about summary refs log tree commit diff stats
path: root/miasm/analysis/depgraph.py
diff options
context:
space:
mode:
Diffstat (limited to 'miasm/analysis/depgraph.py')
-rw-r--r--miasm/analysis/depgraph.py659
1 files changed, 0 insertions, 659 deletions
diff --git a/miasm/analysis/depgraph.py b/miasm/analysis/depgraph.py
deleted file mode 100644
index 436e5354..00000000
--- a/miasm/analysis/depgraph.py
+++ /dev/null
@@ -1,659 +0,0 @@
-"""Provide dependency graph"""
-
-from functools import total_ordering
-
-from future.utils import viewitems
-
-from miasm.expression.expression import ExprInt, ExprLoc, ExprAssign, \
-    ExprWalk, canonize_to_exprloc
-from miasm.core.graph import DiGraph
-from miasm.expression.simplifications import expr_simp_explicit
-from miasm.ir.symbexec import SymbolicExecutionEngine
-from miasm.ir.ir import IRBlock, AssignBlock
-from miasm.ir.translators import Translator
-from miasm.expression.expression_helper import possible_values
-
-try:
-    import z3
-except:
-    pass
-
-@total_ordering
-class DependencyNode(object):
-
-    """Node elements of a DependencyGraph
-
-    A dependency node stands for the dependency on the @element at line number
-    @line_nb in the IRblock named @loc_key, *before* the evaluation of this
-    line.
-    """
-
-    __slots__ = ["_loc_key", "_element", "_line_nb", "_hash"]
-
-    def __init__(self, loc_key, element, line_nb):
-        """Create a dependency node with:
-        @loc_key: LocKey instance
-        @element: Expr instance
-        @line_nb: int
-        """
-        self._loc_key = loc_key
-        self._element = element
-        self._line_nb = line_nb
-        self._hash = hash(
-            (self._loc_key, self._element, self._line_nb))
-
-    def __hash__(self):
-        """Returns a hash of @self to uniquely identify @self"""
-        return self._hash
-
-    def __eq__(self, depnode):
-        """Returns True if @self and @depnode are equals."""
-        if not isinstance(depnode, self.__class__):
-            return False
-        return (self.loc_key == depnode.loc_key and
-                self.element == depnode.element and
-                self.line_nb == depnode.line_nb)
-
-    def __ne__(self, depnode):
-        # required Python 2.7.14
-        return not self == depnode
-
-    def __lt__(self, node):
-        """Compares @self with @node."""
-        if not isinstance(node, self.__class__):
-            return NotImplemented
-
-        return ((self.loc_key, self.element, self.line_nb) <
-                (node.loc_key, node.element, node.line_nb))
-
-    def __str__(self):
-        """Returns a string representation of DependencyNode"""
-        return "<%s %s %s %s>" % (self.__class__.__name__,
-                                  self.loc_key, self.element,
-                                  self.line_nb)
-
-    def __repr__(self):
-        """Returns a string representation of DependencyNode"""
-        return self.__str__()
-
-    @property
-    def loc_key(self):
-        "Name of the current IRBlock"
-        return self._loc_key
-
-    @property
-    def element(self):
-        "Current tracked Expr"
-        return self._element
-
-    @property
-    def line_nb(self):
-        "Line in the current IRBlock"
-        return self._line_nb
-
-
-class DependencyState(object):
-
-    """
-    Store intermediate depnodes states during dependencygraph analysis
-    """
-
-    def __init__(self, loc_key, pending, line_nb=None):
-        self.loc_key = loc_key
-        self.history = [loc_key]
-        self.pending = {k: set(v) for k, v in viewitems(pending)}
-        self.line_nb = line_nb
-        self.links = set()
-
-        # Init lazy elements
-        self._graph = None
-
-    def __repr__(self):
-        return "<State: %r (%r) (%r)>" % (
-            self.loc_key,
-            self.pending,
-            self.links
-        )
-
-    def extend(self, loc_key):
-        """Return a copy of itself, with itself in history
-        @loc_key: LocKey instance for the new DependencyState's loc_key
-        """
-        new_state = self.__class__(loc_key, self.pending)
-        new_state.links = set(self.links)
-        new_state.history = self.history + [loc_key]
-        return new_state
-
-    def get_done_state(self):
-        """Returns immutable object representing current state"""
-        return (self.loc_key, frozenset(self.links))
-
-    def as_graph(self):
-        """Generates a Digraph of dependencies"""
-        graph = DiGraph()
-        for node_a, node_b in self.links:
-            if not node_b:
-                graph.add_node(node_a)
-            else:
-                graph.add_edge(node_a, node_b)
-        for parent, sons in viewitems(self.pending):
-            for son in sons:
-                graph.add_edge(parent, son)
-        return graph
-
-    @property
-    def graph(self):
-        """Returns a DiGraph instance representing the DependencyGraph"""
-        if self._graph is None:
-            self._graph = self.as_graph()
-        return self._graph
-
-    def remove_pendings(self, nodes):
-        """Remove resolved @nodes"""
-        for node in nodes:
-            del self.pending[node]
-
-    def add_pendings(self, future_pending):
-        """Add @future_pending to the state"""
-        for node, depnodes in viewitems(future_pending):
-            if node not in self.pending:
-                self.pending[node] = depnodes
-            else:
-                self.pending[node].update(depnodes)
-
-    def link_element(self, element, line_nb):
-        """Link element to its dependencies
-        @element: the element to link
-        @line_nb: the element's line
-        """
-
-        depnode = DependencyNode(self.loc_key, element, line_nb)
-        if not self.pending[element]:
-            # Create start node
-            self.links.add((depnode, None))
-        else:
-            # Link element to its known dependencies
-            for node_son in self.pending[element]:
-                self.links.add((depnode, node_son))
-
-    def link_dependencies(self, element, line_nb, dependencies,
-                          future_pending):
-        """Link unfollowed dependencies and create remaining pending elements.
-        @element: the element to link
-        @line_nb: the element's line
-        @dependencies: the element's dependencies
-        @future_pending: the future dependencies
-        """
-
-        depnode = DependencyNode(self.loc_key, element, line_nb)
-
-        # Update pending, add link to unfollowed nodes
-        for dependency in dependencies:
-            if not dependency.follow:
-                # Add non followed dependencies to the dependency graph
-                parent = DependencyNode(
-                    self.loc_key, dependency.element, line_nb)
-                self.links.add((parent, depnode))
-                continue
-            # Create future pending between new dependency and the current
-            # element
-            future_pending.setdefault(dependency.element, set()).add(depnode)
-
-
-class DependencyResult(DependencyState):
-
-    """Container and methods for DependencyGraph results"""
-
-    def __init__(self, ircfg, initial_state, state, inputs):
-
-        super(DependencyResult, self).__init__(state.loc_key, state.pending)
-        self.initial_state = initial_state
-        self.history = state.history
-        self.pending = state.pending
-        self.line_nb = state.line_nb
-        self.inputs = inputs
-        self.links = state.links
-        self._ircfg = ircfg
-
-        # Init lazy elements
-        self._has_loop = None
-
-    @property
-    def unresolved(self):
-        """Set of nodes whose dependencies weren't found"""
-        return set(element for element in self.pending
-                   if element != self._ircfg.IRDst)
-
-    @property
-    def relevant_nodes(self):
-        """Set of nodes directly and indirectly influencing inputs"""
-        output = set()
-        for node_a, node_b in self.links:
-            output.add(node_a)
-            if node_b is not None:
-                output.add(node_b)
-        return output
-
-    @property
-    def relevant_loc_keys(self):
-        """List of loc_keys containing nodes influencing inputs.
-        The history order is preserved."""
-        # Get used loc_keys
-        used_loc_keys = set(depnode.loc_key for depnode in self.relevant_nodes)
-
-        # Keep history order
-        output = []
-        for loc_key in self.history:
-            if loc_key in used_loc_keys:
-                output.append(loc_key)
-
-        return output
-
-    @property
-    def has_loop(self):
-        """True iff there is at least one data dependencies cycle (regarding
-        the associated depgraph)"""
-        if self._has_loop is None:
-            self._has_loop = self.graph.has_loop()
-        return self._has_loop
-
-    def irblock_slice(self, irb, max_line=None):
-        """Slice of the dependency nodes on the irblock @irb
-        @irb: irbloc instance
-        """
-
-        assignblks = []
-        line2elements = {}
-        for depnode in self.relevant_nodes:
-            if depnode.loc_key != irb.loc_key:
-                continue
-            line2elements.setdefault(depnode.line_nb,
-                                     set()).add(depnode.element)
-
-        for line_nb, elements in sorted(viewitems(line2elements)):
-            if max_line is not None and line_nb >= max_line:
-                break
-            assignmnts = {}
-            for element in elements:
-                if element in irb[line_nb]:
-                    # constants, loc_key, ... are not in destination
-                    assignmnts[element] = irb[line_nb][element]
-            assignblks.append(AssignBlock(assignmnts))
-
-        return IRBlock(irb.loc_db, irb.loc_key, assignblks)
-
-    def emul(self, lifter, ctx=None, step=False):
-        """Symbolic execution of relevant nodes according to the history
-        Return the values of inputs nodes' elements
-        @lifter: Lifter instance
-        @ctx: (optional) Initial context as dictionary
-        @step: (optional) Verbose execution
-        Warning: The emulation is not sound if the inputs nodes depend on loop
-        variant.
-        """
-        # Init
-        ctx_init = {}
-        if ctx is not None:
-            ctx_init.update(ctx)
-        assignblks = []
-
-        # Build a single assignment block according to history
-        last_index = len(self.relevant_loc_keys)
-        for index, loc_key in enumerate(reversed(self.relevant_loc_keys), 1):
-            if index == last_index and loc_key == self.initial_state.loc_key:
-                line_nb = self.initial_state.line_nb
-            else:
-                line_nb = None
-            assignblks += self.irblock_slice(self._ircfg.blocks[loc_key],
-                                             line_nb).assignblks
-
-        # Eval the block
-        loc_db = lifter.loc_db
-        temp_loc = loc_db.get_or_create_name_location("Temp")
-        symb_exec = SymbolicExecutionEngine(lifter, ctx_init)
-        symb_exec.eval_updt_irblock(IRBlock(loc_db, temp_loc, assignblks), step=step)
-
-        # Return only inputs values (others could be wrongs)
-        return {element: symb_exec.symbols[element]
-                for element in self.inputs}
-
-
-class DependencyResultImplicit(DependencyResult):
-
-    """Stand for a result of a DependencyGraph with implicit option
-
-    Provide path constraints using the z3 solver"""
-    # Z3 Solver instance
-    _solver = None
-
-    unsat_expr = ExprAssign(ExprInt(0, 1), ExprInt(1, 1))
-
-    def _gen_path_constraints(self, translator, expr, expected):
-        """Generate path constraint from @expr. Handle special case with
-        generated loc_keys
-        """
-        out = []
-        expected = canonize_to_exprloc(self._ircfg.loc_db, expected)
-        expected_is_loc_key = expected.is_loc()
-        for consval in possible_values(expr):
-            value = canonize_to_exprloc(self._ircfg.loc_db, consval.value)
-            if expected_is_loc_key and value != expected:
-                continue
-            if not expected_is_loc_key and value.is_loc_key():
-                continue
-
-            conds = z3.And(*[translator.from_expr(cond.to_constraint())
-                             for cond in consval.constraints])
-            if expected != value:
-                conds = z3.And(
-                    conds,
-                    translator.from_expr(
-                        ExprAssign(value,
-                                expected))
-                )
-            out.append(conds)
-
-        if out:
-            conds = z3.Or(*out)
-        else:
-            # Ex: expr: lblgen1, expected: 0x1234
-            # -> Avoid inconsistent solution lblgen1 = 0x1234
-            conds = translator.from_expr(self.unsat_expr)
-        return conds
-
-    def emul(self, lifter, ctx=None, step=False):
-        # Init
-        ctx_init = {}
-        if ctx is not None:
-            ctx_init.update(ctx)
-        solver = z3.Solver()
-        symb_exec = SymbolicExecutionEngine(lifter, ctx_init)
-        history = self.history[::-1]
-        history_size = len(history)
-        translator = Translator.to_language("z3")
-        size = self._ircfg.IRDst.size
-
-        for hist_nb, loc_key in enumerate(history, 1):
-            if hist_nb == history_size and loc_key == self.initial_state.loc_key:
-                line_nb = self.initial_state.line_nb
-            else:
-                line_nb = None
-            irb = self.irblock_slice(self._ircfg.blocks[loc_key], line_nb)
-
-            # Emul the block and get back destination
-            dst = symb_exec.eval_updt_irblock(irb, step=step)
-
-            # Add constraint
-            if hist_nb < history_size:
-                next_loc_key = history[hist_nb]
-                expected = symb_exec.eval_expr(ExprLoc(next_loc_key, size))
-                solver.add(self._gen_path_constraints(translator, dst, expected))
-        # Save the solver
-        self._solver = solver
-
-        # Return only inputs values (others could be wrongs)
-        return {
-            element: symb_exec.eval_expr(element)
-            for element in self.inputs
-        }
-
-    @property
-    def is_satisfiable(self):
-        """Return True iff the solution path admits at least one solution
-        PRE: 'emul'
-        """
-        return self._solver.check() == z3.sat
-
-    @property
-    def constraints(self):
-        """If satisfiable, return a valid solution as a Z3 Model instance"""
-        if not self.is_satisfiable:
-            raise ValueError("Unsatisfiable")
-        return self._solver.model()
-
-
-class FollowExpr(object):
-
-    "Stand for an element (expression, depnode, ...) to follow or not"
-    __slots__ = ["follow", "element"]
-
-    def __init__(self, follow, element):
-        self.follow = follow
-        self.element = element
-
-    def __repr__(self):
-        return '%s(%r, %r)' % (self.__class__.__name__, self.follow, self.element)
-
-    @staticmethod
-    def to_depnodes(follow_exprs, loc_key, line):
-        """Build a set of FollowExpr(DependencyNode) from the @follow_exprs set
-        of FollowExpr
-        @follow_exprs: set of FollowExpr
-        @loc_key: LocKey instance
-        @line: integer
-        """
-        dependencies = set()
-        for follow_expr in follow_exprs:
-            dependencies.add(FollowExpr(follow_expr.follow,
-                                        DependencyNode(loc_key,
-                                                       follow_expr.element,
-                                                       line)))
-        return dependencies
-
-    @staticmethod
-    def extract_depnodes(follow_exprs, only_follow=False):
-        """Extract depnodes from a set of FollowExpr(Depnodes)
-        @only_follow: (optional) extract only elements to follow"""
-        return set(follow_expr.element
-                   for follow_expr in follow_exprs
-                   if not(only_follow) or follow_expr.follow)
-
-
-class FilterExprSources(ExprWalk):
-    """
-    Walk Expression to find sources to track
-    @follow_mem: (optional) Track memory syntactically
-    @follow_call: (optional) Track through "call"
-    """
-    def __init__(self, follow_mem, follow_call):
-        super(FilterExprSources, self).__init__(lambda x:None)
-        self.follow_mem = follow_mem
-        self.follow_call = follow_call
-        self.nofollow = set()
-        self.follow = set()
-
-    def visit(self, expr, *args, **kwargs):
-        if expr in self.cache:
-            return None
-        ret = self.visit_inner(expr, *args, **kwargs)
-        self.cache.add(expr)
-        return ret
-
-    def visit_inner(self, expr, *args, **kwargs):
-        if expr.is_id():
-            self.follow.add(expr)
-        elif expr.is_int():
-            self.nofollow.add(expr)
-        elif expr.is_loc():
-            self.nofollow.add(expr)
-        elif expr.is_mem():
-            if self.follow_mem:
-                self.follow.add(expr)
-            else:
-                self.nofollow.add(expr)
-                return None
-        elif expr.is_function_call():
-            if self.follow_call:
-                self.follow.add(expr)
-            else:
-                self.nofollow.add(expr)
-                return None
-
-        ret = super(FilterExprSources, self).visit(expr, *args, **kwargs)
-        return ret
-
-
-class DependencyGraph(object):
-
-    """Implementation of a dependency graph
-
-    A dependency graph contains DependencyNode as nodes. The oriented edges
-    stand for a dependency.
-    The dependency graph is made of the lines of a group of IRblock
-    *explicitly* or *implicitly* involved in the equation of given element.
-    """
-
-    def __init__(self, ircfg,
-                 implicit=False, apply_simp=True, follow_mem=True,
-                 follow_call=True):
-        """Create a DependencyGraph linked to @ircfg
-
-        @ircfg: IRCFG instance
-        @implicit: (optional) Track IRDst for each block in the resulting path
-
-        Following arguments define filters used to generate dependencies
-        @apply_simp: (optional) Apply expr_simp_explicit
-        @follow_mem: (optional) Track memory syntactically
-        @follow_call: (optional) Track through "call"
-        """
-        # Init
-        self._ircfg = ircfg
-        self._implicit = implicit
-
-        # Create callback filters. The order is relevant.
-        self._cb_follow = []
-        if apply_simp:
-            self._cb_follow.append(self._follow_simp_expr)
-        self._cb_follow.append(lambda exprs: self.do_follow(exprs, follow_mem, follow_call))
-
-    @staticmethod
-    def do_follow(exprs, follow_mem, follow_call):
-        visitor = FilterExprSources(follow_mem, follow_call)
-        for expr in exprs:
-            visitor.visit(expr)
-        return visitor.follow, visitor.nofollow
-
-    @staticmethod
-    def _follow_simp_expr(exprs):
-        """Simplify expression so avoid tracking useless elements,
-        as: XOR EAX, EAX
-        """
-        follow = set()
-        for expr in exprs:
-            follow.add(expr_simp_explicit(expr))
-        return follow, set()
-
-    def _follow_apply_cb(self, expr):
-        """Apply callback functions to @expr
-        @expr : FollowExpr instance"""
-        follow = set([expr])
-        nofollow = set()
-
-        for callback in self._cb_follow:
-            follow, nofollow_tmp = callback(follow)
-            nofollow.update(nofollow_tmp)
-
-        out = set(FollowExpr(True, expr) for expr in follow)
-        out.update(set(FollowExpr(False, expr) for expr in nofollow))
-        return out
-
-    def _track_exprs(self, state, assignblk, line_nb):
-        """Track pending expression in an assignblock"""
-        future_pending = {}
-        node_resolved = set()
-        for dst, src in viewitems(assignblk):
-            # Only track pending
-            if dst not in state.pending:
-                continue
-            # Track IRDst in implicit mode only
-            if dst == self._ircfg.IRDst and not self._implicit:
-                continue
-            assert dst not in node_resolved
-            node_resolved.add(dst)
-            dependencies = self._follow_apply_cb(src)
-
-            state.link_element(dst, line_nb)
-            state.link_dependencies(dst, line_nb,
-                                    dependencies, future_pending)
-
-        # Update pending nodes
-        state.remove_pendings(node_resolved)
-        state.add_pendings(future_pending)
-
-    def _compute_intrablock(self, state):
-        """Follow dependencies tracked in @state in the current irbloc
-        @state: instance of DependencyState"""
-
-        irb = self._ircfg.blocks[state.loc_key]
-        line_nb = len(irb) if state.line_nb is None else state.line_nb
-
-        for cur_line_nb, assignblk in reversed(list(enumerate(irb[:line_nb]))):
-            self._track_exprs(state, assignblk, cur_line_nb)
-
-    def get(self, loc_key, elements, line_nb, heads):
-        """Compute the dependencies of @elements at line number @line_nb in
-        the block named @loc_key in the current IRCFG, before the execution of
-        this line. Dependency check stop if one of @heads is reached
-        @loc_key: LocKey instance
-        @element: set of Expr instances
-        @line_nb: int
-        @heads: set of LocKey instances
-        Return an iterator on DiGraph(DependencyNode)
-        """
-        # Init the algorithm
-        inputs = {element: set() for element in elements}
-        initial_state = DependencyState(loc_key, inputs, line_nb)
-        todo = set([initial_state])
-        done = set()
-        dpResultcls = DependencyResultImplicit if self._implicit else DependencyResult
-
-        while todo:
-            state = todo.pop()
-            self._compute_intrablock(state)
-            done_state = state.get_done_state()
-            if done_state in done:
-                continue
-            done.add(done_state)
-            if (not state.pending or
-                    state.loc_key in heads or
-                    not self._ircfg.predecessors(state.loc_key)):
-                yield dpResultcls(self._ircfg, initial_state, state, elements)
-                if not state.pending:
-                    continue
-
-            if self._implicit:
-                # Force IRDst to be tracked, except in the input block
-                state.pending[self._ircfg.IRDst] = set()
-
-            # Propagate state to parents
-            for pred in self._ircfg.predecessors_iter(state.loc_key):
-                todo.add(state.extend(pred))
-
-    def get_from_depnodes(self, depnodes, heads):
-        """Alias for the get() method. Use the attributes of @depnodes as
-        argument.
-        PRE: Loc_Keys and lines of depnodes have to be equals
-        @depnodes: set of DependencyNode instances
-        @heads: set of LocKey instances
-        """
-        lead = list(depnodes)[0]
-        elements = set(depnode.element for depnode in depnodes)
-        return self.get(lead.loc_key, elements, lead.line_nb, heads)
-
-    def address_to_location(self, address):
-        """Helper to retrieve the .get() arguments, ie.
-        assembly address -> irblock's location key and line number
-        """
-        current_loc_key = next(iter(self._ircfg.getby_offset(address)))
-        assignblk_index = 0
-        current_block = self._ircfg.get_block(current_loc_key)
-        for assignblk_index, assignblk in enumerate(current_block):
-            if assignblk.instr.offset == address:
-                break
-        else:
-            return None
-
-        return {
-            "loc_key": current_block.loc_key,
-            "line_nb": assignblk_index,
-        }