diff options
Diffstat (limited to 'src/miasm/analysis/ssa.py')
| -rw-r--r-- | src/miasm/analysis/ssa.py | 731 |
1 files changed, 731 insertions, 0 deletions
diff --git a/src/miasm/analysis/ssa.py b/src/miasm/analysis/ssa.py new file mode 100644 index 00000000..5c1964ef --- /dev/null +++ b/src/miasm/analysis/ssa.py @@ -0,0 +1,731 @@ +from collections import deque +from future.utils import viewitems, viewvalues + +from miasm.expression.expression import ExprId, ExprAssign, ExprOp, \ + ExprLoc, get_expr_ids +from miasm.ir.ir import AssignBlock, IRBlock + + +def sanitize_graph_head(ircfg, head): + """ + In multiple algorithm, the @head of the ircfg may not have predecessors. + The function transform the @ircfg in order to ensure this property + @ircfg: IRCFG instance + @head: the location of the graph's head + """ + + if not ircfg.predecessors(head): + return + original_edges = ircfg.predecessors(head) + sub_head = ircfg.loc_db.add_location() + + # Duplicate graph, replacing references to head by sub_head + replaced_expr = { + ExprLoc(head, ircfg.IRDst.size): + ExprLoc(sub_head, ircfg.IRDst.size) + } + ircfg.simplify( + lambda expr:expr.replace_expr(replaced_expr) + ) + # Duplicate head block + ircfg.add_irblock(IRBlock(ircfg.loc_db, sub_head, list(ircfg.blocks[head]))) + + # Remove original head block + ircfg.del_node(head) + + for src in original_edges: + ircfg.add_edge(src, sub_head) + + # Create new head, jumping to sub_head + assignblk = AssignBlock({ircfg.IRDst:ExprLoc(sub_head, ircfg.IRDst.size)}) + new_irblock = IRBlock(ircfg.loc_db, head, [assignblk]) + ircfg.add_irblock(new_irblock) + + +class SSA(object): + """ + Generic class for static single assignment (SSA) transformation + + Handling of + - variable generation + - variable renaming + - conversion of an IRCFG block into SSA + + Variables will be renamed to <variable>.<index>, whereby the + index will be increased in every definition of <variable>. + + Memory expressions are stateless. The addresses are in SSA form, + but memory aliasing will occur. For instance, if it holds + that RAX == RBX.0 + (-0x8) and + + @64[RBX.0 + (-0x8)] = RDX + RCX.0 = @64[RAX], + + then it cannot be tracked that RCX.0 == RDX. + """ + + + def __init__(self, ircfg): + """ + Initialises generic class for SSA + :param ircfg: instance of IRCFG + """ + # IRCFG instance + self.ircfg = ircfg + + # stack for RHS + self._stack_rhs = {} + # stack for LHS + self._stack_lhs = {} + + self.ssa_variable_to_expr = {} + + # dict of SSA expressions + self.expressions = {} + + # dict of SSA to original location + self.ssa_to_location = {} + + # Don't SSA IRDst + self.immutable_ids = set([self.ircfg.IRDst]) + + def get_regs(self, expr): + return get_expr_ids(expr) + + def transform(self, *args, **kwargs): + """Transforms into SSA""" + raise NotImplementedError("Abstract method") + + def get_block(self, loc_key): + """ + Returns an IRBlock + :param loc_key: LocKey instance + :return: IRBlock + """ + irblock = self.ircfg.blocks.get(loc_key, None) + + return irblock + + def reverse_variable(self, ssa_var): + """ + Transforms a variable in SSA form into non-SSA form + :param ssa_var: ExprId, variable in SSA form + :return: ExprId, variable in non-SSA form + """ + expr = self.ssa_variable_to_expr.get(ssa_var, ssa_var) + return expr + + def reset(self): + """Resets SSA transformation""" + self.expressions = {} + self._stack_rhs = {} + self._stack_lhs = {} + self.ssa_to_location = {} + + def _gen_var_expr(self, expr, stack): + """ + Generates a variable expression in SSA form + :param expr: variable expression which will be translated + :param stack: self._stack_rhs or self._stack_lhs + :return: variable expression in SSA form + """ + index = stack[expr] + name = "%s.%d" % (expr.name, index) + ssa_var = ExprId(name, expr.size) + self.ssa_variable_to_expr[ssa_var] = expr + + return ssa_var + + def _transform_var_rhs(self, ssa_var): + """ + Transforms a variable on the right hand side into SSA + :param ssa_var: variable + :return: transformed variable + """ + # variable has never been on the LHS + if ssa_var not in self._stack_rhs: + return ssa_var + # variable has been on the LHS + stack = self._stack_rhs + return self._gen_var_expr(ssa_var, stack) + + def _transform_var_lhs(self, expr): + """ + Transforms a variable on the left hand side into SSA + :param expr: variable + :return: transformed variable + """ + # check if variable has already been on the LHS + if expr not in self._stack_lhs: + self._stack_lhs[expr] = 0 + # save last value for RHS transformation + self._stack_rhs[expr] = self._stack_lhs[expr] + + # generate SSA expression + stack = self._stack_lhs + ssa_var = self._gen_var_expr(expr, stack) + + return ssa_var + + def _transform_expression_lhs(self, dst): + """ + Transforms an expression on the left hand side into SSA + :param dst: expression + :return: expression in SSA form + """ + if dst.is_mem(): + # transform with last RHS instance + ssa_var = self._transform_expression_rhs(dst) + else: + # transform LHS + ssa_var = self._transform_var_lhs(dst) + + # increase SSA variable counter + self._stack_lhs[dst] += 1 + + return ssa_var + + def _transform_expression_rhs(self, src): + """ + Transforms an expression on the right hand side into SSA + :param src: expression + :return: expression in SSA form + """ + # dissect expression in variables + variables = self.get_regs(src) + src_ssa = src + # transform variables + to_replace = {} + for expr in variables: + ssa_var = self._transform_var_rhs(expr) + to_replace[expr] = ssa_var + src_ssa = src_ssa.replace_expr(to_replace) + + return src_ssa + + @staticmethod + def _parallel_instructions(assignblk): + """ + Extracts the instruction from a AssignBlock. + + Since instructions in a AssignBlock are evaluated + in parallel, memory instructions on the left hand + side will be inserted into the start of the list. + Then, memory instruction on the LHS will be + transformed firstly. + + :param assignblk: assignblock + :return: sorted list of expressions + """ + instructions = [] + for dst in assignblk: + # dst = src + aff = assignblk.dst2ExprAssign(dst) + # insert memory expression into start of list + if dst.is_mem(): + instructions.insert(0, aff) + else: + instructions.append(aff) + + return instructions + + @staticmethod + def _convert_block(irblock, ssa_list): + """ + Transforms an IRBlock inplace into SSA + :param irblock: IRBlock to be transformed + :param ssa_list: list of SSA expressions + """ + # iterator over SSA expressions + ssa_iter = iter(ssa_list) + new_irs = [] + # walk over IR blocks' assignblocks + for assignblk in irblock.assignblks: + # list of instructions + instructions = [] + # insert SSA instructions + for _ in assignblk: + instructions.append(next(ssa_iter)) + # replace instructions of assignblock in IRBlock + new_irs.append(AssignBlock(instructions, assignblk.instr)) + return IRBlock(irblock.loc_db, irblock.loc_key, new_irs) + + def _rename_expressions(self, loc_key): + """ + Transforms variables and expressions + of an IRBlock into SSA. + + IR representations of an assembly instruction are evaluated + in parallel. Thus, RHS and LHS instructions will be performed + separately. + :param loc_key: IRBlock loc_key + """ + # list of IRBlock's SSA expressions + ssa_expressions_block = [] + + # retrieve IRBlock + irblock = self.get_block(loc_key) + if irblock is None: + # Incomplete graph + return + + # iterate block's IR expressions + for index, assignblk in enumerate(irblock.assignblks): + # list of parallel instructions + instructions = self._parallel_instructions(assignblk) + # list for transformed RHS expressions + rhs = deque() + + # transform RHS + for expr in instructions: + src = expr.src + src_ssa = self._transform_expression_rhs(src) + # save transformed RHS + rhs.append(src_ssa) + + # transform LHS + for expr in instructions: + if expr.dst in self.immutable_ids or expr.dst in self.ssa_variable_to_expr: + dst_ssa = expr.dst + else: + dst_ssa = self._transform_expression_lhs(expr.dst) + + # retrieve corresponding RHS expression + src_ssa = rhs.popleft() + + # rebuild SSA expression + expr = ExprAssign(dst_ssa, src_ssa) + self.expressions[dst_ssa] = src_ssa + self.ssa_to_location[dst_ssa] = (loc_key, index) + + + # append ssa expression to list + ssa_expressions_block.append(expr) + + # replace blocks IR expressions with corresponding SSA transformations + new_irblock = self._convert_block(irblock, ssa_expressions_block) + self.ircfg.blocks[loc_key] = new_irblock + + +class SSABlock(SSA): + """ + SSA transformation on block level + + It handles + - transformation of a single IRBlock into SSA + - reassembling an SSA expression into a non-SSA + expression through iterative resolving of the RHS + """ + + def transform(self, loc_key): + """ + Transforms a block into SSA form + :param loc_key: IRBlock loc_key + """ + self._rename_expressions(loc_key) + + def reassemble_expr(self, expr): + """ + Reassembles an expression in SSA form into a solely non-SSA expression + :param expr: expression + :return: non-SSA expression + """ + # worklist + todo = {expr.copy()} + + while todo: + # current expression + cur = todo.pop() + # RHS of current expression + cur_rhs = self.expressions[cur] + + # replace cur with RHS in expr + expr = expr.replace_expr({cur: cur_rhs}) + + # parse ExprIDs on RHS + ids_rhs = self.get_regs(cur_rhs) + + # add RHS ids to worklist + for id_rhs in ids_rhs: + if id_rhs in self.expressions: + todo.add(id_rhs) + return expr + + +class SSAPath(SSABlock): + """ + SSA transformation on path level + + It handles + - transformation of a path of IRBlocks into SSA + """ + + def transform(self, path): + """ + Transforms a path into SSA + :param path: list of IRBlock loc_key + """ + for block in path: + self._rename_expressions(block) + + +class SSADiGraph(SSA): + """ + SSA transformation on DiGraph level + + It handles + - transformation of a DiGraph into SSA + - generation, insertion and filling of phi nodes + + The implemented SSA form is known as minimal SSA. + """ + + PHI_STR = 'Phi' + + + def __init__(self, ircfg): + """ + Initialises SSA class for directed graphs + :param ircfg: instance of IRCFG + """ + super(SSADiGraph, self).__init__(ircfg) + + # variable definitions + self.defs = {} + + # dict of blocks' phi nodes + self._phinodes = {} + + # IRCFG control flow graph + self.graph = ircfg + + + def transform(self, head): + """Transforms into SSA""" + sanitize_graph_head(self.graph, head) + self._init_variable_defs(head) + self._place_phi(head) + self._rename(head) + self._insert_phi() + self._convert_phi() + self._fix_no_def_var(head) + + def reset(self): + """Resets SSA transformation""" + super(SSADiGraph, self).reset() + self.defs = {} + self._phinodes = {} + + def _init_variable_defs(self, head): + """ + Initialises all variable definitions and + assigns the corresponding IRBlocks. + + All variable definitions in self.defs contain + a set of IRBlocks in which the variable gets assigned + """ + + visited_loc = set() + for loc_key in self.graph.walk_depth_first_forward(head): + irblock = self.get_block(loc_key) + if irblock is None: + # Incomplete graph + continue + visited_loc.add(loc_key) + # search for block's IR definitions/destinations + for assignblk in irblock.assignblks: + for dst in assignblk: + # enforce ExprId + if dst.is_id(): + # exclude immutable ids + if dst in self.immutable_ids or dst in self.ssa_variable_to_expr: + continue + # map variable definition to blocks + self.defs.setdefault(dst, set()).add(irblock.loc_key) + if visited_loc != set(self.graph.blocks): + raise RuntimeError("Cannot operate on a non connected graph") + + def _place_phi(self, head): + """ + For all blocks, empty phi functions will be placed for every + variable in the block's dominance frontier. + + self.phinodes contains a dict for every block in the + dominance frontier. In this dict, each variable + definition maps to its corresponding phi function. + + Source: Cytron, Ron, et al. + "An efficient method of computing static single assignment form" + Proceedings of the 16th ACM SIGPLAN-SIGACT symposium on + Principles of programming languages (1989), p. 30 + """ + # dominance frontier + frontier = self.graph.compute_dominance_frontier(head) + + for variable in self.defs: + done = set() + todo = set() + intodo = set() + + for loc_key in self.defs[variable]: + todo.add(loc_key) + intodo.add(loc_key) + + while todo: + loc_key = todo.pop() + + # walk through block's dominance frontier + for node in frontier.get(loc_key, []): + if node in done: + continue + # place empty phi functions for a variable + empty_phi = self._gen_empty_phi(variable) + + # add empty phi node for variable in node + self._phinodes.setdefault(node, {})[variable] = empty_phi.src + done.add(node) + + if node not in intodo: + intodo.add(node) + todo.add(node) + + def _gen_empty_phi(self, expr): + """ + Generates an empty phi function for a variable + :param expr: variable + :return: ExprAssign, empty phi function for expr + """ + phi = ExprId(self.PHI_STR, expr.size) + return ExprAssign(expr, phi) + + def _fill_phi(self, *args): + """ + Fills a phi function with variables. + + phi(x.1, x.5, x.6) + + :param args: list of ExprId + :return: ExprOp + """ + return ExprOp(self.PHI_STR, *set(args)) + + def _rename(self, head): + """ + Transforms each variable expression in the CFG into SSA + by traversing the dominator tree in depth-first search. + + 1. Transform variables of phi functions on LHS into SSA + 2. Transform all non-phi expressions into SSA + 3. Update the successor's phi functions' RHS with current SSA variables + 4. Save current SSA variable stack for successors in the dominator tree + + Source: Cytron, Ron, et al. + "An efficient method of computing static single assignment form" + Proceedings of the 16th ACM SIGPLAN-SIGACT symposium on + Principles of programming languages (1989), p. 31 + """ + # compute dominator tree + dominator_tree = self.graph.compute_dominator_tree(head) + + # init SSA variable stack + stack = [self._stack_rhs] + + # walk in DFS over the dominator tree + for loc_key in dominator_tree.walk_depth_first_forward(head): + # restore SSA variable stack of the predecessor in the dominator tree + self._stack_rhs = stack.pop().copy() + + # Transform variables of phi functions on LHS into SSA + self._rename_phi_lhs(loc_key) + + # Transform all non-phi expressions into SSA + self._rename_expressions(loc_key) + + # Update the successor's phi functions' RHS with current SSA variables + # walk over block's successors in the CFG + for successor in self.graph.successors_iter(loc_key): + self._rename_phi_rhs(successor) + + # Save current SSA variable stack for successors in the dominator tree + for _ in dominator_tree.successors_iter(loc_key): + stack.append(self._stack_rhs) + + def _rename_phi_lhs(self, loc_key): + """ + Transforms phi function's expressions of an IRBlock + on the left hand side into SSA + :param loc_key: IRBlock loc_key + """ + if loc_key in self._phinodes: + # create temporary list of phi function assignments for inplace renaming + tmp = list(self._phinodes[loc_key]) + + # iterate over all block's phi nodes + for dst in tmp: + # transform variables on LHS inplace + self._phinodes[loc_key][self._transform_expression_lhs(dst)] = self._phinodes[loc_key].pop(dst) + + def _rename_phi_rhs(self, successor): + """ + Transforms the right hand side of each successor's phi function + into SSA. Each transformed expression of a phi function's + right hand side is of the form + + phi(<var>.<index 1>, <var>.<index 2>, ..., <var>.<index n>) + + :param successor: loc_key of block's direct successor in the CFG + """ + # if successor is in block's dominance frontier + if successor in self._phinodes: + # walk over all variables on LHS + for dst, src in list(viewitems(self._phinodes[successor])): + # transform variable on RHS in non-SSA form + expr = self.reverse_variable(dst) + # transform expr into it's SSA form using current stack + src_ssa = self._transform_expression_rhs(expr) + + # Add src_ssa to phi args + if src.is_id(self.PHI_STR): + # phi function is empty + expr = self._fill_phi(src_ssa) + else: + # phi function contains at least one value + expr = self._fill_phi(src_ssa, *src.args) + + # update phi function + self._phinodes[successor][dst] = expr + + def _insert_phi(self): + """Inserts phi functions into the list of SSA expressions""" + for loc_key in self._phinodes: + for dst in self._phinodes[loc_key]: + self.expressions[dst] = self._phinodes[loc_key][dst] + + def _convert_phi(self): + """Inserts corresponding phi functions inplace + into IRBlock at the beginning""" + for loc_key in self._phinodes: + irblock = self.get_block(loc_key) + if irblock is None: + continue + assignblk = AssignBlock(self._phinodes[loc_key]) + if irblock_has_phi(irblock): + # If first block contains phi, we are updating an existing ssa form + # so update phi + assignblks = list(irblock.assignblks) + out = dict(assignblks[0]) + out.update(dict(assignblk)) + assignblks[0] = AssignBlock(out, assignblk.instr) + new_irblock = IRBlock(self.ircfg.loc_db, loc_key, assignblks) + else: + # insert at the beginning + new_irblock = IRBlock(self.ircfg.loc_db, loc_key, [assignblk] + list(irblock.assignblks)) + self.ircfg.blocks[loc_key] = new_irblock + + def _fix_no_def_var(self, head): + """ + Replace phi source variables which are not ssa vars by ssa vars. + @head: loc_key of the graph head + """ + var_to_insert = set() + for loc_key in self._phinodes: + for dst, sources in viewitems(self._phinodes[loc_key]): + for src in sources.args: + if src in self.ssa_variable_to_expr: + continue + var_to_insert.add(src) + var_to_newname = {} + newname_to_var = {} + for var in var_to_insert: + new_var = self._transform_var_lhs(var) + var_to_newname[var] = new_var + newname_to_var[new_var] = var + + # Replace non modified node used in phi with new variable + self.ircfg.simplify(lambda expr:expr.replace_expr(var_to_newname)) + + if newname_to_var: + irblock = self.ircfg.blocks[head] + assignblks = list(irblock) + assignblks[0:0] = [AssignBlock(newname_to_var, assignblks[0].instr)] + self.ircfg.blocks[head] = IRBlock(self.ircfg.loc_db, head, assignblks) + + # Updt structure + for loc_key in self._phinodes: + for dst, sources in viewitems(self._phinodes[loc_key]): + self._phinodes[loc_key][dst] = sources.replace_expr(var_to_newname) + + for var, (loc_key, index) in list(viewitems(self.ssa_to_location)): + if loc_key == head: + self.ssa_to_location[var] = loc_key, index + 1 + + for newname, var in viewitems(newname_to_var): + self.ssa_to_location[newname] = head, 0 + self.ssa_variable_to_expr[newname] = var + self.expressions[newname] = var + + +def irblock_has_phi(irblock): + """ + Return True if @irblock has Phi assignments + @irblock: IRBlock instance + """ + if not irblock.assignblks: + return False + for src in viewvalues(irblock[0]): + return src.is_op('Phi') + return False + + +class Varinfo(object): + """Store liveness information for a variable""" + __slots__ = ["live_index", "loc_key", "index"] + + def __init__(self, live_index, loc_key, index): + self.live_index = live_index + self.loc_key = loc_key + self.index = index + + +def get_var_assignment_src(ircfg, node, variables): + """ + Return the variable of @variables which is written by the irblock at @node + @node: Location + @variables: a set of variable to test + """ + irblock = ircfg.blocks[node] + for assignblk in irblock: + result = set(assignblk).intersection(variables) + if not result: + continue + assert len(result) == 1 + return list(result)[0] + return None + + +def get_phi_sources_parent_block(ircfg, loc_key, sources): + """ + Return a dictionary linking a variable to it's direct parent label + which belong to a path which affects the node. + @loc_key: the starting node + @sources: set of variables to resolve + """ + source_to_parent = {} + for parent in ircfg.predecessors(loc_key): + done = set() + todo = set([parent]) + found = False + while todo: + node = todo.pop() + if node in done: + continue + done.add(node) + ret = get_var_assignment_src(ircfg, node, sources) + if ret: + source_to_parent.setdefault(ret, set()).add(parent) + found = True + break + for pred in ircfg.predecessors(node): + todo.add(pred) + assert found + return source_to_parent |