#!/usr/bin/env python
#-*- coding:utf-8 -*-

import logging
import inspect
from collections import namedtuple

import miasm2.expression.expression as m2_expr
from miasm2.expression.simplifications import expr_simp
from miasm2.expression.modint import moduint, modint
from miasm2.core.utils import Disasm_Exception, pck
from miasm2.core.graph import DiGraph, DiGraphSimplifier, MatchGraphJoker
from miasm2.core.interval import interval

log_asmbloc = logging.getLogger("asmblock")
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter("%(levelname)-5s: %(message)s"))
log_asmbloc.addHandler(console_handler)
log_asmbloc.setLevel(logging.WARNING)


def is_int(a):
    return isinstance(a, int) or isinstance(a, long) or \
        isinstance(a, moduint) or isinstance(a, modint)


def expr_is_label(e):
    return isinstance(e, m2_expr.ExprId) and isinstance(e.name, asm_label)


def expr_is_int_or_label(e):
    return isinstance(e, m2_expr.ExprInt) or \
        (isinstance(e, m2_expr.ExprId) and isinstance(e.name, asm_label))


class asm_label:

    "Stand for an assembly label"

    def __init__(self, name="", offset=None):
        self.fixedblocs = False
        if is_int(name):
            name = "loc_%.16X" % (int(name) & 0xFFFFFFFFFFFFFFFF)
        self.name = name
        self.attrib = None
        if offset is None:
            self.offset = offset
        else:
            self.offset = int(offset)

    def __str__(self):
        if isinstance(self.offset, (int, long)):
            return "%s:0x%08x" % (self.name, self.offset)
        else:
            return "%s:%s" % (self.name, str(self.offset))

    def __repr__(self):
        rep = '<%s ' % self.__class__.__name__
        if self.name:
            rep += repr(self.name)
        rep += '>'
        return rep


class asm_raw:

    def __init__(self, raw=""):
        self.raw = raw

    def __str__(self):
        return repr(self.raw)


class asm_constraint(object):
    c_to = "c_to"
    c_next = "c_next"

    def __init__(self, label, c_t=c_to):
        # Sanity check
        assert isinstance(label, asm_label)
        self.label = label
        self.c_t = c_t

    def __str__(self):
        return "%s:%s" % (str(self.c_t), str(self.label))


class asm_constraint_next(asm_constraint):

    def __init__(self, label):
        super(asm_constraint_next, self).__init__(
            label, c_t=asm_constraint.c_next)


class asm_constraint_to(asm_constraint):

    def __init__(self, label):
        super(asm_constraint_to, self).__init__(
            label, c_t=asm_constraint.c_to)

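# Illustrative sketch (comment only, not executed): how the labels and
# constraints defined above are typically built by hand. The name and
# offsets below are arbitrary examples, not values used in this module.
#
#   lbl = asm_label("main", 0x401000)        # pinned label (name + offset)
#   fallthrough = asm_label(0x401005)        # auto-named "loc_..." label
#   cons = asm_constraint(fallthrough, asm_constraint.c_next)
#   # Blocks keep such constraints in their .bto set; c_next marks the
#   # fall-through successor, c_to a jump target.
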
class asm_bloc(object):

    def __init__(self, label=None, alignment=1):
        self.bto = set()
        self.lines = []
        self.label = label
        self.alignment = alignment

    def __str__(self):
        out = []
        out.append(str(self.label))
        for l in self.lines:
            out.append(str(l))
        if self.bto:
            lbls = ["->"]
            for l in self.bto:
                if l is None:
                    lbls.append("Unknown? ")
                else:
                    lbls.append(str(l) + " ")
            lbls = '\t'.join(lbls)
            out.append(lbls)
        return '\n'.join(out)

    def addline(self, l):
        self.lines.append(l)

    def addto(self, c):
        assert isinstance(self.bto, set)
        self.bto.add(c)

    def split(self, offset, l):
        log_asmbloc.debug('split at %x', offset)
        i = -1
        offsets = [x.offset for x in self.lines]
        if not l.offset in offsets:
            log_asmbloc.warning(
                'cannot split bloc at %X ' % offset +
                'middle instruction? default middle')
            offsets.sort()
            return None
        new_bloc = asm_bloc(l)
        i = offsets.index(offset)

        self.lines, new_bloc.lines = self.lines[:i], self.lines[i:]
        flow_mod_instr = self.get_flow_instr()
        log_asmbloc.debug('flow mod %r', flow_mod_instr)
        c = asm_constraint(l, asm_constraint.c_next)
        # Move dst if a flow graph modifier was in the original bloc
        # (use case: split of a delayslot bloc)
        if flow_mod_instr:
            for xx in self.bto:
                log_asmbloc.debug('lbl %s', xx)
            c_next = set(
                [x for x in self.bto if x.c_t == asm_constraint.c_next])
            c_to = [x for x in self.bto if x.c_t != asm_constraint.c_next]
            self.bto = set([c] + c_to)
            new_bloc.bto = c_next
        else:
            new_bloc.bto = self.bto
            self.bto = set([c])
        return new_bloc

    def get_range(self):
        """Returns the offset hull of an asm_bloc"""
        if len(self.lines):
            return (self.lines[0].offset,
                    self.lines[-1].offset + self.lines[-1].l)
        else:
            return 0, 0

    def get_offsets(self):
        return [x.offset for x in self.lines]

    def add_cst(self, offset, c_t, symbol_pool):
        if isinstance(offset, (int, long)):
            l = symbol_pool.getby_offset_create(offset)
        elif isinstance(offset, str):
            l = symbol_pool.getby_name_create(offset)
        elif isinstance(offset, asm_label):
            l = offset
        else:
            raise ValueError('unknown offset type %r' % offset)
        c = asm_constraint(l, c_t)
        self.bto.add(c)

    def get_flow_instr(self):
        if not self.lines:
            return None
        for i in xrange(-1, -1 - self.lines[0].delayslot - 1, -1):
            if not 0 <= i < len(self.lines):
                return None
            l = self.lines[i]
            if l.splitflow() or l.breakflow():
                raise NotImplementedError('not fully functional')

    def get_subcall_instr(self):
        if not self.lines:
            return None
        delayslot = self.lines[0].delayslot
        end_index = len(self.lines) - 1
        ds_max_index = max(end_index - delayslot, 0)
        for i in xrange(end_index, ds_max_index - 1, -1):
            l = self.lines[i]
            if l.is_subcall():
                return l
        return None

    def get_next(self):
        for x in self.bto:
            if x.c_t == asm_constraint.c_next:
                return x.label
        return None

    @staticmethod
    def _filter_constraint(constraints):
        """Sort and filter @constraints for asm_bloc.bto
        @constraints: non-empty set of asm_constraint instances

        Always the same type -> one of the constraints
        c_next and c_to -> c_next
        """
        # Only one constraint
        if len(constraints) == 1:
            return next(iter(constraints))

        # Constraint type -> set of corresponding constraints
        cbytype = {}
        for cons in constraints:
            cbytype.setdefault(cons.c_t, set()).add(cons)

        # Only one type -> any constraint is OK
        if len(cbytype) == 1:
            return next(iter(constraints))

        # At least 2 types -> types = {c_next, c_to}
        # c_to is included in c_next
        return next(iter(cbytype[asm_constraint.c_next]))

    def fix_constraints(self):
        """Fix next block constraints"""
        # Destination -> associated constraints
        dests = {}
        for constraint in self.bto:
            dests.setdefault(constraint.label, set()).add(constraint)

        self.bto = set(self._filter_constraint(constraints)
                       for constraints in dests.itervalues())

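# Illustrative sketch (comment only, not executed): minimal asm_bloc
# manipulation using the helpers above. `pool` is assumed to be an
# asm_symbol_pool (defined below) and `instr` a disassembled instruction.
#
#   block = asm_bloc(pool.getby_offset_create(0x401000))
#   block.addline(instr)                          # append an instruction
#   block.add_cst(0x401005, asm_constraint.c_next, pool)
#   assert block.get_next() is pool.getby_offset(0x401005)
#   start, stop = block.get_range()               # offset hull of the block
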
class asm_block_bad(asm_bloc):

    """Stand for a *bad* ASM block (malformed, unreachable,
    not disassembled, ...)"""

    ERROR_TYPES = {-1: "Unknown error",
                   0: "Unable to disassemble",
                   1: "Null starting block",
                   2: "Address forbidden by dont_dis",
                   }

    def __init__(self, label=None, alignment=1, errno=-1, *args, **kwargs):
        """Instantiate an asm_block_bad.
        @label, @alignment: same as asm_bloc.__init__
        @errno: (optional) specify an error type associated with the block
        """
        super(asm_block_bad, self).__init__(label, alignment, *args, **kwargs)
        self._errno = errno

    def __str__(self):
        error_txt = self.ERROR_TYPES.get(self._errno, self._errno)
        return "\n".join([str(self.label),
                          "\tBad block: %s" % error_txt])

    def addline(self, *args, **kwargs):
        raise RuntimeError("An asm_block_bad cannot have lines")

    def addto(self, *args, **kwargs):
        raise RuntimeError("An asm_block_bad cannot have bto")

    def split(self, *args, **kwargs):
        raise RuntimeError("An asm_block_bad cannot be split")


class asm_symbol_pool:

    def __init__(self):
        self._labels = []
        self._name2label = {}
        self._offset2label = {}
        self._label_num = 0

    def add_label(self, name, offset=None):
        """
        Create and add a label to the symbol_pool
        @name: label's name
        @offset: (optional) label's offset
        """
        label = asm_label(name, offset)

        # Test for collisions
        if (label.offset in self._offset2label and
                label != self._offset2label[label.offset]):
            raise ValueError('symbol %s has same offset as %s' %
                             (label, self._offset2label[label.offset]))
        if (label.name in self._name2label and
                label != self._name2label[label.name]):
            raise ValueError('symbol %s has same name as %s' %
                             (label, self._name2label[label.name]))

        self._labels.append(label)
        if label.offset is not None:
            self._offset2label[label.offset] = label
        if label.name != "":
            self._name2label[label.name] = label
        return label

    def remove_label(self, label):
        """
        Delete a @label
        """
        self._name2label.pop(label.name, None)
        self._offset2label.pop(label.offset, None)
        if label in self._labels:
            self._labels.remove(label)

    def del_label_offset(self, label):
        """Unpin the @label from its offset"""
        self._offset2label.pop(label.offset, None)
        label.offset = None

    def getby_offset(self, offset):
        """Retrieve label using its @offset"""
        return self._offset2label.get(offset, None)

    def getby_name(self, name):
        """Retrieve label using its @name"""
        return self._name2label.get(name, None)

    def getby_name_create(self, name):
        """Get a label from its @name, create it if it doesn't exist"""
        label = self.getby_name(name)
        if label is None:
            label = self.add_label(name)
        return label

    def getby_offset_create(self, offset):
        """Get a label from its @offset, create it if it doesn't exist"""
        label = self.getby_offset(offset)
        if label is None:
            label = self.add_label(offset, offset)
        return label

    def rename_label(self, label, newname):
        """Rename the @label name to @newname"""
        if newname in self._name2label:
            raise ValueError('Symbol already known')
        self._name2label.pop(label.name, None)
        label.name = newname
        self._name2label[label.name] = label

    def set_offset(self, label, offset):
        """Pin the @label at @offset
        Note that there is a special case when the offset is a list:
        it happens when offsets are recomputed in resolve_symbol*
        """
        if label is None:
            raise ValueError('label should not be None')
        if not label.name in self._name2label:
            raise ValueError('label %s not in symbol pool' % label)
        if offset is not None and offset in self._offset2label:
            raise ValueError('Conflict in label %s' % label)
        self._offset2label.pop(label.offset, None)
        label.offset = offset
        if is_int(label.offset):
            self._offset2label[label.offset] = label

    @property
    def items(self):
        """Return all labels"""
        return self._labels

    def __str__(self):
        return reduce(lambda x, y: x + str(y) + '\n', self._labels, "")

    def __getitem__(self, item):
        if item in self._name2label:
            return self._name2label[item]
        if item in self._offset2label:
            return self._offset2label[item]
        raise KeyError('unknown symbol %r' % item)

    def __contains__(self, item):
        return item in self._name2label or item in self._offset2label

    def merge(self, symbol_pool):
        """Merge with another @symbol_pool"""
        self._labels += symbol_pool._labels
        self._name2label.update(symbol_pool._name2label)
        self._offset2label.update(symbol_pool._offset2label)

    def gen_label(self):
        """Generate a new unpinned label"""
        label = self.add_label("lbl_gen_%.8X" % (self._label_num))
        self._label_num += 1
        return label

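# Illustrative sketch (comment only, not executed): typical asm_symbol_pool
# usage. Names and offsets are arbitrary examples.
#
#   pool = asm_symbol_pool()
#   main = pool.add_label("main", 0x401000)   # pinned label
#   tmp = pool.gen_label()                    # unpinned, auto-named label
#   pool.set_offset(tmp, 0x401020)            # pin it afterwards
#   pool.getby_offset_create(0x401040)        # create-on-demand by offset
#   assert "main" in pool and 0x401000 in pool
#   pool.rename_label(main, "entry_point")
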
class AsmCFG(DiGraph):

    """Directed graph standing for an ASM Control Flow Graph with:
     - nodes: asm_bloc
     - edges: constraints between blocks, synchronized with asm_bloc's "bto"

    Specializes the .dot export and forces the relation between blocks to be
    unique, and associated with a constraint.
    Offers helpers on AsmCFG management, such as search by label, sanity
    checking and mnemonic size guessing.
    """

    # Internal structure for pending management
    AsmCFGPending = namedtuple("AsmCFGPending",
                               ["waiter", "constraint"])

    def __init__(self, *args, **kwargs):
        super(AsmCFG, self).__init__(*args, **kwargs)
        # Edges -> constraint
        self.edges2constraint = {}
        # Expected asm_label -> set(AsmCFGPending instance)
        self._pendings = {}
        # Label2block built on the fly
        self._label2block = {}

    # Compatibility with old list API
    def append(self, *args, **kwargs):
        raise DeprecationWarning("AsmCFG is a graph, use add_node")

    def remove(self, *args, **kwargs):
        raise DeprecationWarning("AsmCFG is a graph, use del_node")

    def __getitem__(self, *args, **kwargs):
        raise DeprecationWarning("Order of AsmCFG elements is not reliable")

    def __iter__(self):
        """Iterator on asm_bloc composing the current graph"""
        return iter(self._nodes)

    def __len__(self):
        """Return the number of blocks in AsmCFG"""
        return len(self._nodes)

    # Manage graph with associated constraints
    def add_edge(self, src, dst, constraint):
        """Add an edge to the graph
        @src: asm_bloc instance, source
        @dst: asm_bloc instance, destination
        @constraint: constraint associated to this edge
        """
        # Sanity check
        assert (src, dst) not in self.edges2constraint

        # Add the edge to src.bto if needed
        if dst.label not in [cons.label for cons in src.bto]:
            src.bto.add(asm_constraint(dst.label, constraint))

        # Add the edge
        self.edges2constraint[(src, dst)] = constraint
        super(AsmCFG, self).add_edge(src, dst)

    def add_uniq_edge(self, src, dst, constraint):
        """Add an edge from @src to @dst if it doesn't already exist"""
        if (src not in self._nodes_succ or
                dst not in self._nodes_succ[src]):
            self.add_edge(src, dst, constraint)

    def del_edge(self, src, dst):
        """Delete the edge @src->@dst and its associated constraint"""
        # Delete from src.bto
        to_remove = [cons for cons in src.bto if cons.label == dst.label]
        if to_remove:
            assert len(to_remove) == 1
            src.bto.remove(to_remove[0])

        # Del the edge
        del self.edges2constraint[(src, dst)]
        super(AsmCFG, self).del_edge(src, dst)

    def add_node(self, block):
        """Add the block @block to the current instance, if it is not
        already in
        @block: asm_bloc instance

        Edges will be created for @block.bto, if destinations are already in
        this instance. If not, they will be resolved when adding these
        aforementioned destinations.
        `self.pendings` indicates which blocks are not yet resolved.
        """
        status = super(AsmCFG, self).add_node(block)
        if not status:
            return status

        # Update waiters
        if block.label in self._pendings:
            for bblpend in self._pendings[block.label]:
                self.add_edge(bblpend.waiter, block, bblpend.constraint)
            del self._pendings[block.label]

        # Synchronize edges with block destinations
        self._label2block[block.label] = block
        for constraint in block.bto:
            dst = self._label2block.get(constraint.label, None)
            if dst is None:
                # Block is not yet known, add it to the pendings
                to_add = self.AsmCFGPending(waiter=block,
                                            constraint=constraint.c_t)
                self._pendings.setdefault(constraint.label,
                                          set()).add(to_add)
            else:
                # Block is already among the known nodes
                self.add_edge(block, dst, constraint.c_t)

        return status

    def del_node(self, block):
        super(AsmCFG, self).del_node(block)
        del self._label2block[block.label]

    def merge(self, graph):
        """Merge with @graph, taking constraints into account"""
        # -> add_edge(x, y, constraint)
        for node in graph._nodes:
            self.add_node(node)
        for edge in graph._edges:
            # Use "_uniq_" because the edge can already exist due to add_node
            self.add_uniq_edge(*edge, constraint=graph.edges2constraint[edge])

    def node2lines(self, node):
        yield self.DotCellDescription(text=str(node.label.name),
                                      attr={'align': 'center',
                                            'colspan': 2,
                                            'bgcolor': 'grey'})
        if isinstance(node, asm_block_bad):
            yield [self.DotCellDescription(
                text=node.ERROR_TYPES.get(node._errno, node._errno),
                attr={})]
            raise StopIteration
        for line in node.lines:
            if self._dot_offset:
                yield [self.DotCellDescription(text="%.8X" % line.offset,
                                               attr={}),
                       self.DotCellDescription(text=str(line), attr={})]
            else:
                yield self.DotCellDescription(text=str(line), attr={})

    def node_attr(self, node):
        if isinstance(node, asm_block_bad):
            return {'style': 'filled', 'fillcolor': 'red'}
        return {}

    def edge_attr(self, src, dst):
        cst = self.edges2constraint.get((src, dst), None)
        edge_color = "blue"

        if len(self.successors(src)) > 1:
            if cst == asm_constraint.c_next:
                edge_color = "red"
            else:
                edge_color = "limegreen"

        return {"color": edge_color}

    def dot(self, offset=False):
        """
        @offset: (optional) if set, add the corresponding offsets in each node
        """
        self._dot_offset = offset
        return super(AsmCFG, self).dot()

    # Helpers
    @property
    def pendings(self):
        """Dictionary of label -> set(AsmCFGPending instance) indicating
        which labels are missing in the current instance.
        A label is missing if a block which is already among the nodes has a
        constraint on it (through its .bto) and the corresponding block is
        not yet among the nodes.
        """
        return self._pendings

    def _build_label2block(self):
        self._label2block = {block.label: block
                             for block in self._nodes}

    def label2block(self, label):
        """Return the block corresponding to label @label
        @label: asm_label instance or ExprId(asm_label) instance"""
        return self._label2block[label]

    def rebuild_edges(self):
        """Consider the blocks' '.bto' and rebuild edges according to them,
        ie:
        - update constraint types
        - add missing edges
        - remove edges which are no longer used

        This method should be called if the '.bto' of a block in nodes has
        been modified without notifying this instance, to resynchronize
        edges.
        """
        self._build_label2block()
        for block in self._nodes:
            edges = []
            # Rebuild edges from bto
            for constraint in block.bto:
                dst = self._label2block.get(constraint.label, None)
                if dst is None:
                    # Missing destination, add to pendings
                    self._pendings.setdefault(
                        constraint.label,
                        set()).add(self.AsmCFGPending(block,
                                                      constraint.c_t))
                    continue
                edge = (block, dst)
                edges.append(edge)
                if edge in self._edges:
                    # Already known edge, constraint may have changed
                    self.edges2constraint[edge] = constraint.c_t
                else:
                    # An edge is missing
                    self.add_edge(edge[0], edge[1], constraint.c_t)

            # Remove useless edges
            for succ in self.successors(block):
                edge = (block, succ)
                if edge not in edges:
                    self.del_edge(*edge)

    def get_bad_blocks(self):
        """Iterator on asm_block_bad elements"""
        # A bad asm block is always a leaf
        for block in self.leaves():
            if isinstance(block, asm_block_bad):
                yield block

    def get_bad_blocks_predecessors(self, strict=False):
        """Iterator on blocks with an asm_block_bad destination
        @strict: (optional) if set, only return blocks whose successors are
        all bad
        """
        # Avoid returning the same block twice
        done = set()
        for badblock in self.get_bad_blocks():
            for predecessor in self.predecessors_iter(badblock):
                if predecessor not in done:
                    if (strict and
                        not all(isinstance(block, asm_block_bad)
                                for block in self.successors_iter(predecessor))):
                        continue
                    yield predecessor
                    done.add(predecessor)

    def sanity_check(self):
        """Do sanity checks on blocks' constraints:
        * no pendings
        * no multiple next constraint to the same block
        * no next constraint to self
        """

        if len(self._pendings) != 0:
            raise RuntimeError("Some blocks are missing: %s" %
                               map(str, self._pendings.keys()))

        next_edges = {edge: constraint
                      for edge, constraint in self.edges2constraint.iteritems()
                      if constraint == asm_constraint.c_next}

        for block in self._nodes:
            # No next constraint to self
            if (block, block) in next_edges:
                raise RuntimeError('Bad constraint: self in next')

            # No multiple next constraint to the same block
            pred_next = list(pblock
                             for (pblock, dblock) in next_edges
                             if dblock == block)

            if len(pred_next) > 1:
                raise RuntimeError("Too many next constraints for bloc %r"
                                   "(%s)" % (block.label,
                                             map(lambda x: x.label,
                                                 pred_next)))

    def guess_blocks_size(self, mnemo):
        """Assemble and compute the max block size.
        Add a 'size' and a 'max_size' attribute on each block
        @mnemo: metamn instance"""
        for block in self._nodes:
            size = 0
            for instr in block.lines:
                if isinstance(instr, asm_raw):
                    # For special asm_raw, only extract the length
                    if isinstance(instr.raw, list):
                        data = None
                        if len(instr.raw) == 0:
                            l = 0
                        else:
                            l = instr.raw[0].size / 8 * len(instr.raw)
                    elif isinstance(instr.raw, str):
                        data = instr.raw
                        l = len(data)
                    else:
                        raise NotImplementedError('asm raw')
                else:
                    # Assemble the instruction to retrieve its length.
                    # If the instruction uses a symbol, it will fail.
                    # In this case, the max_instruction_len is used.
                    try:
                        candidates = mnemo.asm(instr)
                        l = len(candidates[-1])
                    except:
                        l = mnemo.max_instruction_len
                    data = None
                instr.data = data
                instr.l = l
                size += l

            block.size = size
            block.max_size = size
            log_asmbloc.info("size: %d max: %d", block.size, block.max_size)

    def apply_splitting(self, symbol_pool, dis_block_callback=None, **kwargs):
        """Consider the destinations of @self's bto and split blocks in @self
        if one of these destinations jumps in the middle of a block.
        In order to work, there must be only one block in @self per label in
        @symbol_pool (which is true if @self comes from the same
        disasmEngine).

        @symbol_pool: asm_symbol_pool instance associated with @self's labels
        @dis_block_callback: (optional) if set, this callback will be called
        on new block destinations
        @kwargs: (optional) named arguments to pass to dis_block_callback
        """
        # Get all possible destinations not yet resolved, with a resolved
        # offset
        block_dst = [label.offset
                     for label in self.pendings
                     if label.offset is not None]

        todo = self.nodes().copy()
        rebuild_needed = False

        while todo:
            # Find a block with a destination inside another one
            cur_block = todo.pop()
            range_start, range_stop = cur_block.get_range()

            for off in block_dst:
                if not (off > range_start and off < range_stop):
                    continue

                # `cur_block` must be split at offset `off`
                label = symbol_pool.getby_offset_create(off)
                new_b = cur_block.split(off, label)
                log_asmbloc.debug("Split block %x", off)
                if new_b is None:
                    log_asmbloc.error("Cannot split %x!!", off)
                    continue

                # Remove pendings from cur_block
                # Links from new_b will be generated in rebuild_edges
                for dst in new_b.bto:
                    if dst.label not in self.pendings:
                        continue
                    self.pendings[dst.label] = set(
                        pending for pending in self.pendings[dst.label]
                        if pending.waiter != cur_block)

                # The new block destinations may need to be disassembled
                if dis_block_callback:
                    offsets_to_dis = set(constraint.label.offset
                                         for constraint in new_b.bto)
                    dis_block_callback(cur_bloc=new_b,
                                       offsets_to_dis=offsets_to_dis,
                                       symbol_pool=symbol_pool, **kwargs)

                # Update the structure
                rebuild_needed = True
                self.add_node(new_b)

                # The new block must be considered
                todo.add(new_b)
                range_start, range_stop = cur_block.get_range()

        # Rebuild edges to match the new blocks' bto
        if rebuild_needed:
            self.rebuild_edges()

# Out of _merge_blocks to be computed only once
_acceptable_block = lambda block: (not isinstance(block, asm_block_bad) and
                                   len(block.lines) > 0)
_parent = MatchGraphJoker(restrict_in=False, filt=_acceptable_block)
_son = MatchGraphJoker(restrict_out=False, filt=_acceptable_block)
_expgraph = _parent >> _son


def _merge_blocks(dg, graph):
    """Graph simplification pass merging an asm_bloc with its one and only
    son, if that son has one and only one parent"""

    # Blocks to ignore, because they have been removed from the graph
    to_ignore = set()

    for match in _expgraph.match(graph):

        # Get the matching blocks
        block, succ = match[_parent], match[_son]

        # Ignore already deleted blocks
        if (block in to_ignore or
                succ in to_ignore):
            continue

        # Remove the block's last instruction if needed
        last_instr = block.lines[-1]
        if last_instr.delayslot > 0:
            # TODO: delayslot
            raise RuntimeError("Not implemented yet")

        if last_instr.is_subcall():
            continue
        if last_instr.breakflow() and last_instr.dstflow():
            block.lines.pop()

        # Merge the blocks
        block.lines += succ.lines
        for nextb in graph.successors_iter(succ):
            graph.add_edge(block, nextb, graph.edges2constraint[(succ, nextb)])

        graph.del_node(succ)
        to_ignore.add(succ)


bbl_simplifier = DiGraphSimplifier()
bbl_simplifier.enable_passes([_merge_blocks])


def conservative_asm(mnemo, instr, symbols, conservative):
    """
    Assemble @instr; try to keep the original instruction bytes if they exist
    """
    candidates = mnemo.asm(instr, symbols)
    if not candidates:
        raise ValueError('cannot asm:%s' % str(instr))
    if not hasattr(instr, "b"):
        return candidates[0], candidates
    if instr.b in candidates:
        return instr.b, candidates
    if conservative:
        for c in candidates:
            if len(c) == len(instr.b):
                return c, candidates
    return candidates[0], candidates


def fix_expr_val(expr, symbols):
    """Resolve an expression @expr using @symbols"""
    def expr_calc(e):
        if isinstance(e, m2_expr.ExprId):
            s = symbols._name2label[e.name]
            e = m2_expr.ExprInt_from(e, s.offset)
        return e
    result = expr.visit(expr_calc)
    result = expr_simp(result)
    if not isinstance(result, m2_expr.ExprInt):
        raise RuntimeError('Cannot resolve symbol %s' % expr)
    return result

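# Illustrative sketch (comment only, not executed): building an AsmCFG by
# hand and applying the merging pass. `blk_a` and `blk_b` are assumed to be
# asm_bloc instances where blk_a's bto references blk_b's label; the last
# line assumes DiGraphSimplifier instances are callable on a graph and
# return the simplified graph.
#
#   cfg = AsmCFG()
#   cfg.add_node(blk_a)     # blk_b not known yet -> recorded in cfg.pendings
#   cfg.add_node(blk_b)     # pending edge blk_a -> blk_b is created here
#   cfg.sanity_check()      # raises if pendings or next constraints are bad
#   simplified_cfg = bbl_simplifier(cfg)
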
def fix_label_offset(symbol_pool, label, offset, modified):
    """Fix the @label offset to @offset. If the @offset has changed, add
    @label to @modified
    @symbol_pool: current symbol_pool
    """
    if label.offset == offset:
        return
    symbol_pool.set_offset(label, offset)
    modified.add(label)


class BlockChain(object):

    """Manage blocks linked with an asm_constraint_next"""

    def __init__(self, symbol_pool, blocks):
        self.symbol_pool = symbol_pool
        self.blocks = blocks
        self.place()

    @property
    def pinned(self):
        """Return True iff at least one block is pinned"""
        return self.pinned_block_idx is not None

    def _set_pinned_block_idx(self):
        self.pinned_block_idx = None
        for i, block in enumerate(self.blocks):
            if is_int(block.label.offset):
                if self.pinned_block_idx is not None:
                    raise ValueError("Multiple pinned blocks detected")
                self.pinned_block_idx = i

    def place(self):
        """Compute the BlockChain min_offset and max_offset using the pinned
        block and the blocks' size
        """
        self._set_pinned_block_idx()
        self.max_size = 0
        for block in self.blocks:
            self.max_size += block.max_size + block.alignment - 1

        # Check if the chain has a pinned block
        if not self.pinned:
            return

        offset_base = self.blocks[self.pinned_block_idx].label.offset
        assert(offset_base % self.blocks[self.pinned_block_idx].alignment == 0)

        self.offset_min = offset_base
        for block in self.blocks[:self.pinned_block_idx - 1:-1]:
            self.offset_min -= block.max_size + \
                (block.alignment - block.max_size) % block.alignment

        self.offset_max = offset_base
        for block in self.blocks[self.pinned_block_idx:]:
            self.offset_max += block.max_size + \
                (block.alignment - block.max_size) % block.alignment

    def merge(self, chain):
        """Best-effort merge of two block chains.
        Return the list of the resulting blockchains"""
        self.blocks += chain.blocks
        self.place()
        return [self]

    def fix_blocks(self, modified_labels):
        """Propagate the pinned label to its neighbouring blocks
        @modified_labels: store the newly pinned labels"""

        if not self.pinned:
            raise ValueError('Trying to fix an unpinned block')

        # Propagate the offset to blocks before the pinned block
        pinned_block = self.blocks[self.pinned_block_idx]
        offset = pinned_block.label.offset
        if offset % pinned_block.alignment != 0:
            raise RuntimeError('Bad alignment')

        for block in self.blocks[:self.pinned_block_idx - 1:-1]:
            new_offset = offset - block.size
            new_offset = new_offset - new_offset % pinned_block.alignment
            fix_label_offset(self.symbol_pool,
                             block.label,
                             new_offset,
                             modified_labels)

        # Propagate the offset to blocks after the pinned block
        offset = pinned_block.label.offset + pinned_block.size

        last_block = pinned_block
        for block in self.blocks[self.pinned_block_idx + 1:]:
            offset += (- offset) % last_block.alignment
            fix_label_offset(self.symbol_pool,
                             block.label,
                             offset,
                             modified_labels)
            offset += block.size
            last_block = block
        return modified_labels


class BlockChainWedge(object):

    """Stand for wedges between blocks"""

    def __init__(self, symbol_pool, offset, size):
        self.symbol_pool = symbol_pool
        self.offset = offset
        self.max_size = size
        self.offset_min = offset
        self.offset_max = offset + size

    def merge(self, chain):
        """Best-effort merge of two block chains.
        Return the list of the resulting blockchains"""
        self.symbol_pool.set_offset(chain.blocks[0].label, self.offset_max)
        chain.place()
        return [self, chain]

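# Illustrative sketch (comment only, not executed): BlockChain pinning. A
# chain is "pinned" as soon as one of its blocks has a label with an integer
# offset; place() then derives the offset_min/offset_max hull of the chain.
# The blocks are expected to carry the size/max_size attributes set by
# AsmCFG.guess_blocks_size.
#
#   chain = BlockChain(pool, [blk_a, blk_b])  # blk_a.label pinned at 0x401000
#   assert chain.pinned
#   chain.fix_blocks(set())                   # propagate the offset to blk_b
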
def group_constrained_blocks(symbol_pool, blocks):
    """
    Return the list of BlockChains built from grouped asm blocks linked by
    asm_constraint_next
    @blocks: a list of asm blocks
    """
    log_asmbloc.info('group_constrained_blocks')

    # Group adjacent blocks
    remaining_blocks = list(blocks)
    known_block_chains = {}
    lbl2block = {block.label: block for block in blocks}

    while remaining_blocks:
        # Create a new block chain
        block_list = [remaining_blocks.pop()]

        # Find sons in the remaining blocks linked with a next constraint
        while True:
            # Get the next block
            next_label = block_list[-1].get_next()
            if next_label is None or next_label not in lbl2block:
                break
            next_block = lbl2block[next_label]

            # Add the block at the end of the current chain
            if next_block not in remaining_blocks:
                break
            block_list.append(next_block)
            remaining_blocks.remove(next_block)

        # Check if the son is in a known block group
        if next_label is not None and next_label in known_block_chains:
            block_list += known_block_chains[next_label]
            del known_block_chains[next_label]

        known_block_chains[block_list[0].label] = block_list

    out_block_chains = []
    for label in known_block_chains:
        chain = BlockChain(symbol_pool, known_block_chains[label])
        out_block_chains.append(chain)
    return out_block_chains


def get_blockchains_address_interval(blockChains, dst_interval):
    """Compute the interval used by the pinned @blockChains
    Check that the placed chains are in the @dst_interval"""

    allocated_interval = interval()
    for chain in blockChains:
        if not chain.pinned:
            continue
        chain_interval = interval([(chain.offset_min, chain.offset_max - 1)])
        if chain_interval not in dst_interval:
            raise ValueError('Chain placed out of destination interval')
        allocated_interval += chain_interval
    return allocated_interval


def resolve_symbol(blockChains, symbol_pool, dst_interval=None):
    """Place @blockChains in the @dst_interval"""

    log_asmbloc.info('resolve_symbol')
    if dst_interval is None:
        dst_interval = interval([(0, 0xFFFFFFFFFFFFFFFF)])

    forbidden_interval = interval(
        [(-1, 0xFFFFFFFFFFFFFFFF + 1)]) - dst_interval
    allocated_interval = get_blockchains_address_interval(blockChains,
                                                          dst_interval)
    log_asmbloc.debug('allocated interval: %s', allocated_interval)

    pinned_chains = [chain for chain in blockChains if chain.pinned]

    # Add a wedge in each forbidden interval
    for start, stop in forbidden_interval.intervals:
        wedge = BlockChainWedge(
            symbol_pool, offset=start, size=stop + 1 - start)
        pinned_chains.append(wedge)

    # Try to place the biggest blockChains first
    pinned_chains.sort(key=lambda x: x.offset_min)
    blockChains.sort(key=lambda x: -x.max_size)

    fixed_chains = list(pinned_chains)

    log_asmbloc.debug("place chains")
    for chain in blockChains:
        if chain.pinned:
            continue
        fixed = False
        for i in xrange(1, len(fixed_chains)):
            prev_chain = fixed_chains[i - 1]
            next_chain = fixed_chains[i]

            if prev_chain.offset_max + chain.max_size < next_chain.offset_min:
                new_chains = prev_chain.merge(chain)
                fixed_chains[i - 1:i] = new_chains
                fixed = True
                break
        if not fixed:
            raise RuntimeError('Cannot find enough space to place blocks')

    return [chain for chain in fixed_chains if isinstance(chain, BlockChain)]


def filter_exprid_label(exprs):
    """Extract labels from a list of ExprId @exprs"""
    return set(expr.name for expr in exprs if isinstance(expr.name, asm_label))


def get_block_labels(block):
    """Extract labels used by @block"""
    symbols = set()
    for instr in block.lines:
        if isinstance(instr, asm_raw):
            if isinstance(instr.raw, list):
                for expr in instr.raw:
                    symbols.update(m2_expr.get_expr_ids(expr))
        else:
            for arg in instr.args:
                symbols.update(m2_expr.get_expr_ids(arg))
    labels = filter_exprid_label(symbols)
    return labels

def assemble_block(mnemo, block, symbol_pool, conservative=False):
    """Assemble a @block using @symbol_pool
    @conservative: (optional) use original bytes when possible
    """
    offset_i = 0

    for instr in block.lines:
        if isinstance(instr, asm_raw):
            if isinstance(instr.raw, list):
                # Fix special asm_raw
                data = ""
                for expr in instr.raw:
                    expr_int = fix_expr_val(expr, symbol_pool)
                    data += pck[expr_int.size](expr_int.arg)
                instr.data = data

            instr.offset = offset_i
            offset_i += instr.l
            continue

        # Assemble an instruction
        saved_args = list(instr.args)
        instr.offset = block.label.offset + offset_i

        # Replace the instruction's arguments by resolved ones
        instr.args = instr.resolve_args_with_symbols(symbol_pool)

        if instr.dstflow():
            instr.fixDstOffset()

        old_l = instr.l
        cached_candidate, _ = conservative_asm(mnemo, instr, symbol_pool,
                                               conservative)

        # Restore the original arguments
        instr.args = saved_args

        # We need to update the block size
        block.size = block.size - old_l + len(cached_candidate)
        instr.data = cached_candidate
        instr.l = len(cached_candidate)

        offset_i += instr.l


def asmbloc_final(mnemo, blocks, blockChains, symbol_pool, conservative=False):
    """Resolve and assemble @blockChains using @symbol_pool until a fixed
    point is reached"""

    log_asmbloc.debug("asmbloc_final")

    # Init structures
    lbl2block = {block.label: block for block in blocks}
    blocks_using_label = {}
    for block in blocks:
        labels = get_block_labels(block)
        for label in labels:
            blocks_using_label.setdefault(label, set()).add(block)

    block2chain = {}
    for chain in blockChains:
        for block in chain.blocks:
            block2chain[block] = chain

    # Init the worklist
    blocks_to_rework = set(blocks)

    # Fix and re-assemble blocks until a fixed point is reached
    while True:

        # Propagate pinned blocks into chains
        modified_labels = set()
        for chain in blockChains:
            chain.fix_blocks(modified_labels)

        for label in modified_labels:
            # Retrieve the block whose reference was modified
            if label in lbl2block:
                blocks_to_rework.add(lbl2block[label])

            # Enqueue blocks referencing a modified label
            if label not in blocks_using_label:
                continue
            for block in blocks_using_label[label]:
                blocks_to_rework.add(block)

        # No more work
        if not blocks_to_rework:
            break

        while blocks_to_rework:
            block = blocks_to_rework.pop()
            assemble_block(mnemo, block, symbol_pool, conservative)


def asm_resolve_final(mnemo, blocks, symbol_pool, dst_interval=None):
    """Resolve and assemble @blocks using @symbol_pool into the interval
    @dst_interval"""

    blocks.sanity_check()

    blocks.guess_blocks_size(mnemo)
    blockChains = group_constrained_blocks(symbol_pool, blocks)
    resolved_blockChains = resolve_symbol(
        blockChains, symbol_pool, dst_interval)

    asmbloc_final(mnemo, blocks, resolved_blockChains, symbol_pool)
    patches = {}
    output_interval = interval()

    for block in blocks:
        offset = block.label.offset
        for instr in block.lines:
            if not instr.data:
                # Empty line
                continue
            assert len(instr.data) == instr.l
            patches[offset] = instr.data
            instruction_interval = interval([(offset, offset + instr.l - 1)])
            if not (instruction_interval & output_interval).empty:
                raise RuntimeError("overlapping bytes %X" % int(offset))
            instr.offset = offset
            offset += instr.l
    return patches

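# Illustrative sketch (comment only, not executed): the assembly pipeline
# driven by asm_resolve_final. `mnemo` is assumed to be the target
# architecture's mnemonic class and `blocks` an AsmCFG whose labels live in
# `pool`.
#
#   patches = asm_resolve_final(mnemo, blocks, pool,
#                               dst_interval=interval([(0x401000, 0x402000)]))
#   for offset, data in patches.iteritems():
#       # write `data` at `offset` in the output binary
#       ...
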
class disasmEngine(object):

    """Disassembly engine, taking care of disassembler options and
    multi-block strategy.

    Engine options:

    + Object supporting membership test (offset in ..)
     - dont_dis: stop the current disassembly branch if reached
     - split_dis: force a basic block end if reached,
                  with a next constraint on its successor

    + On/Off
     - follow_call: recursively disassemble CALL destinations
     - dontdis_retcall: stop on CALL return addresses
     - dont_dis_nulstart_bloc: stop if a block begins with a few \x00

    + Number
     - lines_wd: maximum block size (in number of instructions)
     - blocs_wd: maximum number of distinct disassembled blocks

    + callback(arch, attrib, pool_bin, cur_bloc, offsets_to_dis, symbol_pool)
     - dis_bloc_callback: callback after each new disassembled block

    The engine also tracks already handled blocks, for performance and to
    avoid infinite cycling.
    Addresses of disassembled blocks are kept in the attribute `job_done`. To
    force a new disassembly, the targeted offset must first be removed from
    this structure.
    """

    def __init__(self, arch, attrib, bin_stream, **kwargs):
        """Instantiate a new disassembly engine
        @arch: targeted architecture
        @attrib: architecture attribute
        @bin_stream: bytes source
        @kwargs: (optional) custom options
        """
        self.arch = arch
        self.attrib = attrib
        self.bin_stream = bin_stream
        self.symbol_pool = asm_symbol_pool()
        self.job_done = set()

        # Setup options
        self.dont_dis = []
        self.split_dis = []
        self.follow_call = False
        self.dontdis_retcall = False
        self.lines_wd = None
        self.blocs_wd = None
        self.dis_bloc_callback = None
        self.dont_dis_nulstart_bloc = False

        # Override options if needed
        self.__dict__.update(kwargs)

    def _dis_bloc(self, offset):
        """Disassemble the block at offset @offset
        Return the created asm_bloc and future offsets to disassemble
        """
        lines_cpt = 0
        in_delayslot = False
        delayslot_count = self.arch.delayslot
        offsets_to_dis = set()
        add_next_offset = False
        label = self.symbol_pool.getby_offset_create(offset)
        cur_block = asm_bloc(label)
        log_asmbloc.debug("dis at %X", int(offset))
        while not in_delayslot or delayslot_count > 0:
            if in_delayslot:
                delayslot_count -= 1

            if offset in self.dont_dis:
                if not cur_block.lines:
                    self.job_done.add(offset)
                    # Block is empty -> bad block
                    cur_block = asm_block_bad(label, errno=2)
                else:
                    # Block is not empty, stop the disassembly pass and add a
                    # constraint to the next block
                    cur_block.add_cst(offset, asm_constraint.c_next,
                                      self.symbol_pool)
                break

            if lines_cpt > 0 and offset in self.split_dis:
                cur_block.add_cst(offset, asm_constraint.c_next,
                                  self.symbol_pool)
                offsets_to_dis.add(offset)
                break

            lines_cpt += 1
            if self.lines_wd is not None and lines_cpt > self.lines_wd:
                log_asmbloc.debug("lines watchdog reached at %X", int(offset))
                break

            if offset in self.job_done:
                cur_block.add_cst(offset, asm_constraint.c_next,
                                  self.symbol_pool)
                break

            off_i = offset
            try:
                instr = self.arch.dis(self.bin_stream, self.attrib, offset)
            except (Disasm_Exception, IOError) as e:
                log_asmbloc.warning(e)
                instr = None

            if instr is None:
                log_asmbloc.warning("cannot disasm at %X", int(off_i))
                if not cur_block.lines:
                    self.job_done.add(offset)
                    # Block is empty -> bad block
                    cur_block = asm_block_bad(label, errno=0)
                else:
                    # Block is not empty, stop the disassembly pass and add a
                    # constraint to the next block
                    cur_block.add_cst(off_i, asm_constraint.c_next,
                                      self.symbol_pool)
                break

            # XXX TODO nul start block option
            if self.dont_dis_nulstart_bloc and instr.b.count('\x00') == instr.l:
                log_asmbloc.warning("reach nul instr at %X", int(off_i))
                if not cur_block.lines:
                    # Block is empty -> bad block
                    cur_block = asm_block_bad(label, errno=1)
                else:
                    # Block is not empty, stop the disassembly pass and add a
                    # constraint to the next block
                    cur_block.add_cst(off_i, asm_constraint.c_next,
                                      self.symbol_pool)
                break

            # Special case: flow graph modifier in delayslot
            if in_delayslot and instr and (instr.splitflow() or
                                           instr.breakflow()):
                add_next_offset = True
                break

            self.job_done.add(offset)
            log_asmbloc.debug("dis at %X", int(offset))

            offset += instr.l
            log_asmbloc.debug(instr)
            log_asmbloc.debug(instr.args)
            cur_block.addline(instr)
            if not instr.breakflow():
                continue

            # Test split
            if instr.splitflow() and not (instr.is_subcall() and
                                          self.dontdis_retcall):
                add_next_offset = True
            if instr.dstflow():
                instr.dstflow2label(self.symbol_pool)
                dst = instr.getdstflow(self.symbol_pool)
                dstn = []
                for d in dst:
                    if isinstance(d, m2_expr.ExprId) and \
                            isinstance(d.name, asm_label):
                        dstn.append(d.name)
                dst = dstn
                if (not instr.is_subcall()) or self.follow_call:
                    cur_block.bto.update(
                        [asm_constraint(x, asm_constraint.c_to) for x in dst])

            # Get in delayslot mode
            in_delayslot = True
            delayslot_count = instr.delayslot

        for c in cur_block.bto:
            offsets_to_dis.add(c.label.offset)

        if add_next_offset:
            cur_block.add_cst(offset, asm_constraint.c_next, self.symbol_pool)
            offsets_to_dis.add(offset)

        # Fix multiple constraints
        cur_block.fix_constraints()

        if self.dis_bloc_callback is not None:
            self.dis_bloc_callback(mn=self.arch, attrib=self.attrib,
                                   pool_bin=self.bin_stream,
                                   cur_bloc=cur_block,
                                   offsets_to_dis=offsets_to_dis,
                                   symbol_pool=self.symbol_pool)
        return cur_block, offsets_to_dis

    def dis_bloc(self, offset):
        """Disassemble the block at offset @offset and return the created
        asm_bloc
        @offset: targeted offset to disassemble
        """
        current_block, _ = self._dis_bloc(offset)
        return current_block

    def dis_multibloc(self, offset, blocs=None):
        """Disassemble every block reachable from @offset regarding the
        specific disasmEngine conditions
        Return an AsmCFG instance containing the disassembled blocks
        @offset: starting offset
        @blocs: (optional) AsmCFG instance of already disassembled blocks to
                merge with
        """
        log_asmbloc.info("dis bloc all")
        if blocs is None:
            blocs = AsmCFG()
        todo = [offset]

        bloc_cpt = 0
        while len(todo):
            bloc_cpt += 1
            if self.blocs_wd is not None and bloc_cpt > self.blocs_wd:
                log_asmbloc.debug("blocs watchdog reached at %X", int(offset))
                break

            target_offset = int(todo.pop(0))
            if (target_offset is None or
                    target_offset in self.job_done):
                continue
            cur_block, nexts = self._dis_bloc(target_offset)
            todo += nexts
            blocs.add_node(cur_block)

        blocs.apply_splitting(self.symbol_pool,
                              dis_block_callback=self.dis_bloc_callback,
                              mn=self.arch, attrib=self.attrib,
                              pool_bin=self.bin_stream)
        return blocs

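# Illustrative sketch (comment only, not executed): recursive disassembly
# with disasmEngine. `arch`, `attrib` and `bin_stream` are assumed to be the
# architecture, its attribute and a byte source, as expected by __init__;
# `entry_offset` is an arbitrary example value.
#
#   mdis = disasmEngine(arch, attrib, bin_stream)
#   mdis.follow_call = False                 # engine options are attributes
#   blocks = mdis.dis_multibloc(entry_offset)
#   print blocks.dot()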