diff options
| -rw-r--r-- | miasm2/analysis/depgraph.py | 140 | ||||
| -rw-r--r-- | test/analysis/depgraph.py | 15 |
2 files changed, 93 insertions, 62 deletions
diff --git a/miasm2/analysis/depgraph.py b/miasm2/analysis/depgraph.py index a375af1b..66c3f546 100644 --- a/miasm2/analysis/depgraph.py +++ b/miasm2/analysis/depgraph.py @@ -17,6 +17,7 @@ from miasm2.ir.ir import irbloc from miasm2.ir.translators import Translator class DependencyNode(object): + """Node elements of a DependencyGraph A dependency node stands for the dependency on the @element at line number @@ -41,9 +42,13 @@ class DependencyNode(object): (self._label, self._element, self._line_nb, self._step)) def __hash__(self): + """Returns a hash of @self to uniquely identify @self""" return self._hash def __eq__(self, depnode): + """Returns True if @self and @depnode are equals. + The attribute 'step' is not considered in the comparison. + """ if not isinstance(depnode, self.__class__): return False return (self.label == depnode.label and @@ -52,6 +57,9 @@ class DependencyNode(object): self.step == depnode.step) def __cmp__(self, node): + """Compares @self with @node. The step attribute is not taken into + account in the comparison. + """ if not isinstance(node, self.__class__): raise ValueError("Compare error between %s, %s" % (self.__class__, node.__class__)) @@ -59,11 +67,13 @@ class DependencyNode(object): (node.label, node.element, node.line_nb)) def __str__(self): - return "<%s %s %s %s M:%s>"%(self.__class__.__name__, - self.label.name, self.element, - self.line_nb, self.modifier) + """Returns a string representation of DependencyNode""" + return "<%s %s %s %s M:%s>" % (self.__class__.__name__, + self.label.name, self.element, + self.line_nb, self.modifier) def __repr__(self): + """Returns a string representation of DependencyNode""" return self.__str__() @property @@ -99,6 +109,8 @@ class DependencyNode(object): @modifier.setter def modifier(self, value): + """Evaluating the current line involves a modification of tracked + dependencies""" if not isinstance(value, bool): raise ValueError("Modifier must be a boolean") self._modifier = value @@ -134,8 +146,8 @@ class CacheWrapper(IterableUserDict): def nostep_cache(self): """Dictionnary of DependencyNode and their dependencies, without the step attribute. - The dictionnary is generated once when the method is called for the first time - and not updated afterward. + The dictionnary is generated once when the method is called for the + first time and not updated afterward. """ if self._nostep_cache is None: self._nostep_cache = {} @@ -146,6 +158,7 @@ class CacheWrapper(IterableUserDict): class DependencyDict(object): + """Internal structure for the DependencyGraph algorithm""" def __init__(self, label, history): @@ -247,7 +260,7 @@ class DependencyDict(object): dependencies = self._cache[depnode] out = set() - ## Launch on each depnodes + # Launch on each depnodes parallels = [] for depnode in dependencies: parallels.append(self._get_modifiers_in_cache(depnode)) @@ -268,7 +281,7 @@ class DependencyDict(object): force=True) self._cache = cache_out - def _build_depGraph(self, depnode): + def _build_depgraph(self, depnode): """Recursively build the final list of DiGraph, and clean up unmodifier nodes @depnode: starting node @@ -276,7 +289,7 @@ class DependencyDict(object): if depnode not in self._cache or \ not self._cache[depnode]: - ## There is no dependency + # There is no dependency graph = DiGraph() graph.add_node(depnode) return graph @@ -286,7 +299,7 @@ class DependencyDict(object): graphs = [] for sub_depnode in dependencies: - graphs.append(self._build_depGraph(sub_depnode)) + graphs.append(self._build_depgraph(sub_depnode)) # head(graphs[i]) == dependencies[i] graph = DiGraph() @@ -313,7 +326,7 @@ class DependencyDict(object): # Build subgraph for each starting_node subgraphs = [] for starting_node in starting_nodes: - subgraphs.append(self._build_depGraph(starting_node)) + subgraphs.append(self._build_depgraph(starting_node)) # Merge subgraphs into a final DiGraph graph = DiGraph() @@ -370,7 +383,7 @@ class DependencyDict(object): to_remove = set() for depnode in previous_dict.pending: if (depnode.nostep_repr not in nostep_pending or - implicit and depnode.element == irdst): + implicit and depnode.element == irdst): continue to_remove.update(self._non_modifier_in_loop(depnode)) @@ -419,6 +432,7 @@ class DependencyDict(object): class DependencyResult(object): + """Container and methods for DependencyGraph results""" def __init__(self, ira, final_depdict, input_depnodes): @@ -438,22 +452,27 @@ class DependencyResult(object): @property def graph(self): - "Lazy" + """Returns a DiGraph instance representing the DependencyGraph""" if self._graph is None: self._graph = self._depdict.as_graph(self._input_depnodes) return self._graph @property def history(self): + """List of depdict corresponding to the blocks encountered in the + analysis""" return list(self._depdict.history) + [self._depdict] @property def unresolved(self): + """Set of nodes whose dependencies weren't found""" return set([node.nostep_repr for node in self._depdict.pending if node.element != self._ira.IRDst]) @property def relevant_nodes(self): + """Set of nodes directly and indirectly influencing + @self.input_depnodes""" output = set() for depnodes in self._depdict.cache.values(): output.update(depnodes) @@ -461,6 +480,9 @@ class DependencyResult(object): @property def relevant_labels(self): + """List of labels containing nodes influencing @self.input_depnodes. + The history order is preserved. + """ # Get used labels used_labels = set([depnode.label for depnode in self.relevant_nodes]) @@ -474,11 +496,12 @@ class DependencyResult(object): @property def input(self): + """Set of DependencyGraph start nodes""" return self._input_depnodes @property def has_loop(self): - """True if current depgraph represent a looping path""" + """True if current dictionnary has a loop""" if self._has_loop is None: self._has_loop = (len(self.relevant_labels) != len(set(self.relevant_labels))) @@ -490,7 +513,8 @@ class DependencyResult(object): @ctx: (optional) Initial context as dictionnary @step: (optional) Verbose execution - /!\ The emulation is not safe if there is a loop in the relevant labels + Warning: The emulation is not sound if the input nodes depend on loop + variant. """ # Init ctx_init = self._ira.arch.regs.regs_init @@ -509,15 +533,16 @@ class DependencyResult(object): # Eval the block temp_label = asm_label("Temp") - sb = symbexec(self._ira, ctx_init) - sb.emulbloc(irbloc(temp_label, affects), step=step) + symb_exec = symbexec(self._ira, ctx_init) + symb_exec.emulbloc(irbloc(temp_label, affects), step=step) # Return only inputs values (others could be wrongs) - return {depnode.element: sb.symbols[depnode.element] + return {depnode.element: symb_exec.symbols[depnode.element] for depnode in self.input} class DependencyResultImplicit(DependencyResult): + """Stand for a result of a DependencyGraph with implicit option Provide path constraints using the z3 solver""" @@ -532,7 +557,7 @@ class DependencyResultImplicit(DependencyResult): ctx_init.update(ctx) depnodes = self.relevant_nodes solver = z3.Solver() - sb = symbexec(self._ira, ctx_init) + symb_exec = symbexec(self._ira, ctx_init) temp_label = asm_label("Temp") history = self.relevant_labels[::-1] history_size = len(history) @@ -548,12 +573,12 @@ class DependencyResultImplicit(DependencyResult): affects.append(irs[line_nb]) # Emul the block and get back destination - dst = sb.emulbloc(irbloc(temp_label, affects), step=step) + dst = symb_exec.emulbloc(irbloc(temp_label, affects), step=step) # Add constraint if hist_nb + 1 < history_size: next_label = history[hist_nb + 1] - expected = sb.eval_expr(m2_expr.ExprId(next_label, 32)) + expected = symb_exec.eval_expr(m2_expr.ExprId(next_label, 32)) constraint = m2_expr.ExprAff(dst, expected) solver.add(Translator.to_language("z3").from_expr(constraint)) @@ -561,7 +586,7 @@ class DependencyResultImplicit(DependencyResult): self._solver = solver # Return only inputs values (others could be wrongs) - return {depnode.element: sb.symbols[depnode.element] + return {depnode.element: symb_exec.symbols[depnode.element] for depnode in self.input} @property @@ -580,6 +605,7 @@ class DependencyResultImplicit(DependencyResult): class FollowExpr(object): + "Stand for an element (expression, depnode, ...) to follow or not" def __init__(self, follow, element): @@ -610,6 +636,7 @@ class FollowExpr(object): class DependencyGraph(object): + """Implementation of a dependency graph A dependency graph contains DependencyNode as nodes. The oriented edges @@ -637,7 +664,7 @@ class DependencyGraph(object): self._step_counter = itertools.count() # The IRA graph must be computed - assert(hasattr(self._ira, 'g')) + assert hasattr(self._ira, 'g') # Create callback filters. The order is relevant. self._cb_follow = [] @@ -678,6 +705,7 @@ class DependencyGraph(object): @staticmethod def _follow_mem_wrapper(exprs, mem_read): + """Wrapper to follow or not expression from memory pointer""" follow = set() for expr in exprs: follow.update(expr.get_r(mem_read=mem_read, cst_read=True)) @@ -706,16 +734,17 @@ class DependencyGraph(object): return follow, nofollow def _follow_apply_cb(self, expr): + """Apply callback functions to @expr + @expr : FollowExpr instance""" follow = set([expr]) nofollow = set() - for cb in self._cb_follow: - follow, nofollow_tmp = cb(follow) + for callback in self._cb_follow: + follow, nofollow_tmp = callback(follow) nofollow.update(nofollow_tmp) out = set(FollowExpr(True, expr) for expr in follow) out.update(set(FollowExpr(False, expr) for expr in nofollow)) - return out def _get_irs(self, label): @@ -727,8 +756,9 @@ class DependencyGraph(object): LINE_NB must be > 0""" return self._get_irs(depnode.label)[depnode.line_nb - 1] - def _resolve_depNode(self, depnode): - """Compute and return the dependencies involved by @depnode + def _direct_depnode_dependencies(self, depnode): + """Compute and return the dependencies involved by @depnode, + over the instruction @depnode.line_,. Return a set of FollowExpr""" if isinstance(depnode.element, m2_expr.ExprInt): @@ -741,9 +771,8 @@ class DependencyGraph(object): else: # Intra-block resolving - ## Get dependencies + # Get dependencies read = set() - modifier = False for affect in self._get_affblock(depnode): @@ -752,19 +781,20 @@ class DependencyGraph(object): read.update(elements) modifier = True - ## If it's not a modifier affblock, reinject current element + # If it's not a modifier affblock, reinject current element if not modifier: read = set([FollowExpr(True, depnode.element)]) - ## Build output + # Build output output = FollowExpr.to_depnodes(read, depnode.label, depnode.line_nb - 1, modifier, self.step_counter) return output - def _updateDependencyDict(self, depdict): - """Update DependencyDict until a fixed point is reached + def _resolve_intrablock_dep(self, depdict): + """Resolve the dependencies of nodes in @depdict.pending inside + @depdict.label until a fixed point is reached. @depdict: DependencyDict to update""" # Prepare the work list @@ -785,14 +815,14 @@ class DependencyGraph(object): continue # Find dependency of the current depnode - sub_depnodes = self._resolve_depNode(depnode) + sub_depnodes = self._direct_depnode_dependencies(depnode) depdict.cache[depnode] = FollowExpr.extract_depnodes(sub_depnodes) # Add to the worklist its dependencies todo.update(FollowExpr.extract_depnodes(sub_depnodes, only_follow=True)) - # Pending states will be override in cache + # Pending states will be overriden in cache for depnode in depdict.pending: try: del depdict.cache[depnode] @@ -807,9 +837,9 @@ class DependencyGraph(object): length = len(self._get_irs(pred_label)) yield (pred_label, length) - def _processInterBloc(self, depnodes, heads): - """Create a DependencyDict from @depnodes, and propagate DependencyDicts - through all blocs + def _compute_interblock_dep(self, depnodes, heads): + """Create a DependencyDict from @depnodes, and propagate + DependencyDicts through all blocs """ # Create an DependencyDict which will only contain our depnodes current_depdict = DependencyDict(list(depnodes)[0].label, []) @@ -823,7 +853,7 @@ class DependencyGraph(object): depdict = todo.popleft() # Update the dependencydict until fixed point is reached - self._updateDependencyDict(depdict) + self._resolve_intrablock_dep(depdict) # Clean irrelevant path depdict.filter_unmodifier_loops(self._implicit, self._ira.IRDst) @@ -846,33 +876,32 @@ class DependencyGraph(object): for label, irb_len in self._get_previousblocks(depdict.label): is_final = False - ## Duplicate the DependencyDict + # Duplicate the DependencyDict new_depdict = depdict.extend(label) if self._implicit: - ### Implicit dependencies: IRDst will be link with heads + # Implicit dependencies: IRDst will be link with heads implicit_depnode = DependencyNode(label, self._ira.IRDst, irb_len, next(self.step_counter), modifier=False) - ## Create links between DependencyDict + # Create links between DependencyDict for depnode_head in depdict.pending: - ### Follow the head element in the parent + # Follow the head element in the parent new_depnode = DependencyNode(label, depnode_head.element, irb_len, next(self.step_counter)) - ### The new node has to be computed in _updateDependencyDict + # The new node has to be analysed new_depdict.cache[depnode_head] = set([new_depnode]) new_depdict.pending.add(new_depnode) - ### Handle implicit dependencies + # Handle implicit dependencies if self._implicit: new_depdict.cache[depnode_head].add(implicit_depnode) new_depdict.pending.add(implicit_depnode) - - ## Manage the new element + # Manage the new element todo.append(new_depdict) # Return the node if it's a final one, ie. it's a head (in graph @@ -898,23 +927,26 @@ class DependencyGraph(object): next(self.step_counter))) # Compute final depdicts - depdicts = self._processInterBloc(input_depnodes, heads) + depdicts = self._compute_interblock_dep(input_depnodes, heads) # Unify solutions unified = [] - cls_res = DependencyResultImplicit if self._implicit else DependencyResult + cls_res = DependencyResultImplicit if self._implicit else \ + DependencyResult + for final_depdict in depdicts: - ## Keep only relevant nodes + # Keep only relevant nodes final_depdict.clean_modifiers_in_cache() final_depdict.filter_used_nodes(input_depnodes) - ## Remove duplicate solutions + # Remove duplicate solutions if final_depdict not in unified: unified.append(final_depdict) - ### Return solutions as DiGraph + + # Return solutions as DiGraph yield cls_res(self._ira, final_depdict, input_depnodes) - def get_fromDepNodes(self, depnodes, heads): + def get_from_depnodes(self, depnodes, heads): """Alias for the get() method. Use the attributes of @depnodes as argument. PRE: Labels and lines of depnodes have to be equals @@ -925,7 +957,7 @@ class DependencyGraph(object): elements = set([depnode.element for depnode in depnodes]) return self.get(lead.label, elements, lead.line_nb, heads) - def get_fromEnd(self, label, elements, heads): + def get_from_end(self, label, elements, heads): """Alias for the get() method. Consider that the dependency is asked at the end of the block named @label. @label: asm_label instance diff --git a/test/analysis/depgraph.py b/test/analysis/depgraph.py index 760eeca1..86aa1749 100644 --- a/test/analysis/depgraph.py +++ b/test/analysis/depgraph.py @@ -133,15 +133,15 @@ class DepNodeTest(DiGraph): return True def node2str(self, node): - assert(node.label in self.ira.blocs) - out = "(%s, %s, %s)\\l"%(node.label.name, + assert node.label in self.ira.blocs + out = "(%s, %s, %s)\\l" % (node.label.name, node.element, node.line_nb) - if not (0 <= node.line_nb < len(self.ira.blocs[node.label].irs)): + if not 0 <= node.line_nb < len(self.ira.blocs[node.label].irs): return out exprs = self.ira.blocs[node.label].irs[node.line_nb] exprs_str = '\\l'.join([str(x) for x in exprs]) - return "%s %s"%(out, exprs_str) + return "%s %s" % (out, exprs_str) # Test structures print "[+] Test structures" @@ -1007,7 +1007,6 @@ G14_TEST2_1.add_uniq_edge(G14_TEST2_0_DN7, G14_TEST1_1_DN7) G14_TEST2_1.add_uniq_edge(G14_TEST2_0_DN7, G14_TEST1_1_DN8) - DN14_UR_D = DependencyNode(G14_IRB0.label, D, 0, 0).nostep_repr DN14_UR_C = DependencyNode(G14_IRB0.label, C, 0, 0).nostep_repr @@ -1142,7 +1141,7 @@ for test_nb, test in enumerate([(G1_IRA, G1_INPUT, G1_OUTPUT), # Test public APIs for api_i, g_list in enumerate( - [g_dep.get_fromDepNodes(depnodes, heads), + [g_dep.get_from_depnodes(depnodes, heads), g_dep.get(list(depnodes)[0].label, [depnode.element for depnode in depnodes], @@ -1202,12 +1201,12 @@ for test_nb, test in enumerate([(G1_IRA, G1_INPUT, G1_OUTPUT), error = "emul" found = False for exp_nb in xrange(len(g_list)): - if (emul_result == g_test_output["emul"][exp_nb] and + if (emul_result == g_test_output["emul"][exp_nb] and getattr(result, "unresolved") == g_test_output[unresolved_test_key][exp_nb] and g_test_output["has_loop"][exp_nb] == getattr(result, "has_loop") - ): + ): found = True break assert found |