diff options
| author | Ajax <commial@gmail.com> | 2016-01-25 11:03:53 +0100 |
|---|---|---|
| committer | Ajax <commial@gmail.com> | 2016-01-26 17:09:18 +0100 |
| commit | 01b1f292c84d4cbda38b6c308fc7b1af52595cec (patch) | |
| tree | 1ed1decf955e8f67bc6c3402e2d38e07a6a68fdd | |
| parent | 7e2ad0109bb191b7e83f4e779b39440664ce4b5c (diff) | |
| download | miasm-01b1f292c84d4cbda38b6c308fc7b1af52595cec.tar.gz miasm-01b1f292c84d4cbda38b6c308fc7b1af52595cec.zip | |
AsmBloc: introduce BasicBlocks, standing for a CFG with associated methods
| -rw-r--r-- | miasm2/core/asmbloc.py | 490 | ||||
| -rw-r--r-- | miasm2/core/parse_asm.py | 4 |
2 files changed, 344 insertions, 150 deletions
diff --git a/miasm2/core/asmbloc.py b/miasm2/core/asmbloc.py index 3517d452..e72cd7f7 100644 --- a/miasm2/core/asmbloc.py +++ b/miasm2/core/asmbloc.py @@ -4,7 +4,7 @@ import logging import inspect import re - +from collections import namedtuple import miasm2.expression.expression as m2_expr from miasm2.expression.simplifications import expr_simp @@ -13,6 +13,7 @@ from miasm2.core.utils import Disasm_Exception, pck from miasm2.core.graph import DiGraph from miasm2.core.interval import interval + log_asmbloc = logging.getLogger("asmblock") console_handler = logging.StreamHandler() console_handler.setFormatter(logging.Formatter("%(levelname)-5s: %(message)s")) @@ -572,7 +573,7 @@ def dis_bloc_all(mnemo, pool_bin, offset, job_done, symbol_pool, dont_dis=[], attrib={}): log_asmbloc.info("dis bloc all") if blocs is None: - blocs = [] + blocs = BasicBlocks() todo = [offset] bloc_cpt = 0 @@ -609,70 +610,360 @@ def dis_bloc_all(mnemo, pool_bin, offset, job_done, symbol_pool, dont_dis=[], dont_dis_nulstart_bloc=dont_dis_nulstart_bloc, attrib=attrib) todo += nexts - blocs.append(cur_block) + blocs.add_node(cur_block) return split_bloc(mnemo, attrib, pool_bin, blocs, symbol_pool, dis_bloc_callback=dis_bloc_callback) + return blocs -def bloc2graph(blocks, label=False, lines=True): - """Render dot graph of @blocks""" +class BasicBlocks(DiGraph): + """Directed graph standing for a ASM Control Flow Graph with: + - nodes: asm_bloc + - edges: constraints between blocks, synchronized with asm_bloc's "bto" - escape_chars = re.compile('[' + re.escape('{}') + ']') - label_attr = 'colspan="2" align="center" bgcolor="grey"' - edge_attr = 'label = "%s" color="%s" style="bold"' - td_attr = 'align="left"' - block_attr = 'shape="Mrecord" fontname="Courier New"' + Specialized the .dot export and force the relation between block to be uniq, + and associated with a constraint. - out = ["digraph asm_graph {"] - fix_chars = lambda x: '\\' + x.group() + Offer helpers on BasicBlocks management, such as research by label, sanity + checking and mnemonic size guessing. + """ - # Generate basic blocks - out_blocks = [] - for block in blocks: - out_block = '%s [\n' % block.label.name - out_block += "%s " % block_attr - out_block += 'label =<<table border="0" cellborder="0" cellpadding="3">' - - block_label = '<tr><td %s>%s</td></tr>' % ( - label_attr, block.label.name) - block_html_lines = [] - if lines: - for line in block.lines: - if label: - out_render = "%.8X</td><td %s> " % (line.offset, td_attr) - else: - out_render = "" - out_render += escape_chars.sub(fix_chars, str(line)) - block_html_lines.append(out_render) - block_html_lines = ('<tr><td %s>' % td_attr + - ('</td></tr><tr><td %s>' % td_attr).join(block_html_lines) + - '</td></tr>') - out_block += "%s " % block_label - out_block += block_html_lines + "</table>> ];" - out_blocks.append(out_block) - - out += out_blocks - - # Generate links - for block in blocks: - for next_b in block.bto: - src, dst, cst = block.label.name, next_b.label.name, next_b.c_t + # Internal structure for pending management + BasicBlocksPending = namedtuple("BasicBlocksPending", + ["waiter", "constraint"]) + + def __init__(self, *args, **kwargs): + super(BasicBlocks, self).__init__(*args, **kwargs) + # Edges -> constraint + self.edges2constraint = {} + # Expected asm_label -> list( (src, dst), constraint ) + self._pendings = {} + # Label2block built on the fly + self._label2block = {} + + # Compatibility with old list API + def append(self, *args, **kwargs): + raise DeprecationWarning("BasicBlocks is a graph, use add_node") + + def remove(self, *args, **kwargs): + raise DeprecationWarning("BasicBlocks is a graph, use del_node") + + def __getitem__(self, *args, **kwargs): + raise DeprecationWarning("Order of BasicBlocks elements is not reliable") + + def __iter__(self): + """Iterator on asm_bloc composing the current graph""" + return iter(self._nodes) + + def __len__(self): + """Return the number of blocks in BasicBlocks""" + return len(self._nodes) + + # Manage graph with associated constraints + def add_edge(self, src, dst, constraint): + """Add an edge to the graph + @src: asm_bloc instance, source + @dst: asm_block instance, destination + @constraint: constraint associated to this edge + """ + # Sanity check + assert (src, dst) not in self.edges2constraint + + # Add the edge to src.bto if needed + if dst.label not in [cons.label for cons in src.bto]: + src.bto.add(asm_constraint(dst.label, constraint)) + + # Add edge + self.edges2constraint[(src, dst)] = constraint + super(BasicBlocks, self).add_edge(src, dst) + + def add_uniq_edge(self, src, dst, constraint): + """Add an edge from @src to @dst if it doesn't already exist""" + if (src not in self._nodes_succ or + dst not in self._nodes_succ[src]): + self.add_edge(src, dst, constraint) + + def del_edge(self, src, dst): + """Delete the edge @src->@dst and its associated constraint""" + # Delete from src.bto + to_remove = [cons for cons in src.bto if cons.label == dst.label] + if to_remove: + assert len(to_remove) == 1 + src.bto.remove(to_remove[0]) + + # Del edge + del self.edges2constraint[(src, dst)] + super(BasicBlocks, self).del_edge(src, dst) + + def add_node(self, block): + """Add the block @block to the current instance, if it is not already in + @block: asm_bloc instance + + Edges will be created for @block.bto, if destinations are already in + this instance. If not, they will be resolved when adding these + aforementionned destinations. + `self.pendings` indicates which blocks are not yet resolved. + """ + status = super(BasicBlocks, self).add_node(block) + if not status: + return status + + # Update waiters + if block.label in self._pendings: + for bblpend in self._pendings[block.label]: + self.add_edge(bblpend.waiter, block, bblpend.constraint) + del self._pendings[block.label] + + # Synchronize edges with block destinations + self._label2block[block.label] = block + for constraint in block.bto: + dst = self._label2block.get(constraint.label, + None) + if dst is None: + # Block is yet unknown, add it to pendings + to_add = self.BasicBlocksPending(waiter=block, + constraint=constraint.c_t) + self._pendings.setdefault(constraint.label, + list()).append(to_add) + else: + # Block is already in known nodes + self.add_edge(block, dst, constraint.c_t) + + return status + + def del_node(self, block): + super(BasicBlocks, self).del_node(block) + del self._label2block[block.label] + + def merge(self, graph): + """Merge with @graph, taking in account constraints""" + # -> add_edge(x, y, constraint) + for node in graph._nodes: + self.add_node(node) + for edge in graph._edges: + # Use "_uniq_" beacause the edge can already exist due to add_node + self.add_uniq_edge(*edge, constraint=graph.edges2constraint[edge]) + + def dot(self, label=False, lines=True): + """Render dot graph with HTML + @label: (optional) if set, add the corresponding label in each block + @lines: (optional) if set, includes assembly lines in the output + """ + + escape_chars = re.compile('[' + re.escape('{}') + ']') + label_attr = 'colspan="2" align="center" bgcolor="grey"' + edge_attr = 'label = "%s" color="%s" style="bold"' + td_attr = 'align="left"' + block_attr = 'shape="Mrecord" fontname="Courier New"' + + out = ["digraph asm_graph {"] + fix_chars = lambda x: '\\' + x.group() + + # Generate basic blocks + out_blocks = [] + for block in self.nodes(): + out_block = '%s [\n' % block.label.name + out_block += "%s " % block_attr + if isinstance(block, asm_block_bad): + out_block += 'style=filled fillcolor="red" ' + out_block += 'label =<<table border="0" cellborder="0" cellpadding="3">' + + block_label = '<tr><td %s>%s</td></tr>' % ( + label_attr, block.label.name) + block_html_lines = [] + + if lines: + if isinstance(block, asm_block_bad): + block_html_lines.append(block.ERROR_TYPES.get(block._errno, + block._errno)) + + for line in block.lines: + if label: + out_render = "%.8X</td><td %s> " % (line.offset, + td_attr) + else: + out_render = "" + out_render += escape_chars.sub(fix_chars, str(line)) + block_html_lines.append(out_render) + + block_html_lines = ('<tr><td %s>' % td_attr + + ('</td></tr><tr><td %s>' % td_attr).join(block_html_lines) + + '</td></tr>') + out_block += "%s " % block_label + out_block += block_html_lines + "</table>> ];" + out_blocks.append(out_block) + + out += out_blocks + + # Generate links + for src, dst in self.edges(): + exp_label = dst.label + cst = self.edges2constraint.get((src, dst), None) edge_color = "black" - if next_b.c_t == asm_constraint.c_next: + if cst == asm_constraint.c_next: edge_color = "red" - elif next_b.c_t == asm_constraint.c_to: + elif cst == asm_constraint.c_to: edge_color = "limegreen" # special case - if len(block.bto) == 1: + if len(src.bto) == 1: edge_color = "blue" - out.append('%s -> %s' % (src, dst) + + out.append('%s -> %s' % (src.label.name, dst.label.name) + \ '[' + edge_attr % (cst, edge_color) + '];') - out.append("}") - return '\n'.join(out) + out.append("}") + return '\n'.join(out) + + # Helpers + @property + def pendings(self): + """Dictionnary of label -> list(BasicBlocksPending instance) indicating + which label are missing in the current instance. + A label is missing if a block which is already in nodes has constraints + with him (thanks to its .bto) and the corresponding block is not yet in + nodes + """ + return self._pendings + + def _build_label2block(self): + self._label2block = {block.label: block + for block in self._nodes} + + def label2block(self, label): + """Return the block corresponding to label @label + @label: asm_label instance or ExprId(asm_label) instance""" + return self._label2block[label] + + def rebuild_edges(self): + """Consider blocks '.bto' and rebuild edges according to them, ie: + - update constraint type + - add missing edge + - remove no more used edge + + This method should be called if a block's '.bto' in nodes have been + modified without notifying this instance to resynchronize edges. + """ + self._build_label2block() + for block in self._nodes: + edges = [] + # Rebuild edges from bto + for constraint in block.bto: + dst = self._label2block.get(constraint.label, + None) + if dst is None: + # Missing destination, add to pendings + self._pendings.setdefault(constraint.label, + list()).append(self.BasicBlocksPending(block, + constraint.c_t)) + continue + edge = (block, dst) + edges.append(edge) + if edge in self._edges: + # Already known edge, constraint may have changed + self.edges2constraint[edge] = constraint.c_t + else: + # An edge is missing + self.add_edge(edge[0], edge[1], constraint.c_t) + + # Remove useless edges + for succ in self.successors(block): + edge = (block, succ) + if edge not in edges: + self.del_edge(*edge) + + def get_bad_blocks(self): + """Iterator on asm_block_bad elements""" + # A bad asm block is always a leaf + for block in self.leaves(): + if isinstance(block, asm_block_bad): + yield block + + def get_bad_blocks_predecessors(self, strict=False): + """Iterator on block with an asm_block_bad destination + @strict: (optional) if set, return block with only bad + successors + """ + # Avoid returning the same block + done = set() + for badblock in self.get_bad_blocks(): + for predecessor in self.predecessors_iter(badblock): + if predecessor not in done: + if (strict and + not all(isinstance(block, asm_block_bad) + for block in self.successors_iter(predecessor))): + continue + yield predecessor + done.add(predecessor) + + def sanity_check(self): + """Do sanity checks on blocks' constraints: + * no pendings + * no multiple next constraint to same block + * no next constraint to self + """ + + if len(self._pendings) != 0: + raise RuntimeError("Some blocks are missing: %s" % map(str, + self._pendings.keys())) + + next_edges = {edge: constraint + for edge, constraint in self.edges2constraint.iteritems() + if constraint == asm_constraint.c_next} + + for block in self._nodes: + # No next constraint to self + if (block, block) in next_edges: + raise RuntimeError('Bad constraint: self in next') + + # No multiple next constraint to same block + pred_next = list(pblock + for (pblock, dblock) in next_edges + if dblock == block) + + if len(pred_next) > 1: + raise RuntimeError("Too many next constraints for bloc %r" + "(%s)" % (block.label, + map(lambda x: x.label, pred_next))) + + def guess_blocks_size(self, mnemo): + """Asm and compute max block size + Add a 'size' and 'max_size' attribute on each block + @mnemo: metamn instance""" + for block in self._nodes: + size = 0 + for instr in block.lines: + if isinstance(instr, asm_raw): + # for special asm_raw, only extract len + if isinstance(instr.raw, list): + data = None + if len(instr.raw) == 0: + l = 0 + else: + l = instr.raw[0].size / 8 * len(instr.raw) + elif isinstance(instr.raw, str): + data = instr.raw + l = len(data) + else: + raise NotImplementedError('asm raw') + else: + # Assemble the instruction to retrieve its len. + # If the instruction uses symbol it will fail + # In this case, the max_instruction_len is used + try: + candidates = mnemo.asm(instr) + l = len(candidates[-1]) + except: + l = mnemo.max_instruction_len + data = None + instr.data = data + instr.l = l + size += l + + block.size = size + block.max_size = size + log_asmbloc.info("size: %d max: %d", block.size, block.max_size) def conservative_asm(mnemo, instr, symbols, conservative): @@ -708,44 +999,6 @@ def fix_expr_val(expr, symbols): return result -def guess_blocks_size(mnemo, blocks): - """Asm and compute max block size""" - - for block in blocks: - size = 0 - for instr in block.lines: - if isinstance(instr, asm_raw): - # for special asm_raw, only extract len - if isinstance(instr.raw, list): - data = None - if len(instr.raw) == 0: - l = 0 - else: - l = instr.raw[0].size / 8 * len(instr.raw) - elif isinstance(instr.raw, str): - data = instr.raw - l = len(data) - else: - raise NotImplementedError('asm raw') - else: - # Assemble the instruction to retrieve its len. - # If the instruction uses symbol it will fail - # In this case, the max_instruction_len is used - try: - candidates = mnemo.asm(instr) - l = len(candidates[-1]) - except: - l = mnemo.max_instruction_len - data = None - instr.data = data - instr.l = l - size += l - - block.size = size - block.max_size = size - log_asmbloc.info("size: %d max: %d", block.size, block.max_size) - - def fix_label_offset(symbol_pool, label, offset, modified): """Fix the @label offset to @offset. If the @offset has changed, add @label to @modified @@ -1089,33 +1342,13 @@ def asmbloc_final(mnemo, blocks, blockChains, symbol_pool, conservative=False): assemble_block(mnemo, block, symbol_pool, conservative) -def sanity_check_blocks(blocks): - """Do sanity checks on blocks' constraints: - * no multiple next constraint to same block - * no next constraint to self""" - - blocks_graph = basicblocs(blocks) - graph = blocks_graph.g - for label in graph.nodes(): - if blocks_graph.blocs[label].get_next() == label: - raise RuntimeError('Bad constraint: self in next') - pred_next = set() - for pred in graph.predecessors(label): - if not pred in blocks_graph.blocs: - continue - if blocks_graph.blocs[pred].get_next() == label: - pred_next.add(pred) - if len(pred_next) > 1: - raise RuntimeError("Too many next constraints for bloc %r" % label) - - def asm_resolve_final(mnemo, blocks, symbol_pool, dst_interval=None): """Resolve and assemble @blocks using @symbol_pool into interval @dst_interval""" - sanity_check_blocks(blocks) + blocks.sanity_check() - guess_blocks_size(mnemo, blocks) + blocks.guess_blocks_size(mnemo) blockChains = group_constrained_blocks(symbol_pool, blocks) resolved_blockChains = resolve_symbol( blockChains, symbol_pool, dst_interval) @@ -1140,45 +1373,6 @@ def asm_resolve_final(mnemo, blocks, symbol_pool, dst_interval=None): return patches -def blist2graph(ab): - """ - ab: list of asmbloc - return: graph of asmbloc - """ - g = DiGraph() - g.lbl2bloc = {} - for b in ab: - g.lbl2bloc[b.label] = b - g.add_node(b.label) - for x in b.bto: - g.add_edge(b.label, x.label) - return g - - -class basicblocs: - - def __init__(self, ab=[]): - self.blocs = {} - self.g = DiGraph() - self.add_blocs(ab) - - def add(self, b): - self.blocs[b.label] = b - self.g.add_node(b.label) - for dst in b.bto: - self.g.add_edge(b.label, dst.label) - - def add_blocs(self, ab): - for b in ab: - self.add(b) - - def get_bad_dst(self): - out = set() - for block in self.blocs.values(): - for constraint in block.bto: - if isinstance(self.blocs[constraint.label], asm_block_bad): - out.add(block) - return out def find_parents(blocs, l): diff --git a/miasm2/core/parse_asm.py b/miasm2/core/parse_asm.py index e55a9af8..304d0673 100644 --- a/miasm2/core/parse_asm.py +++ b/miasm2/core/parse_asm.py @@ -232,7 +232,7 @@ def parse_txt(mnemo, attrib, txt, symbol_pool=None): cur_block = None state = STATE_NO_BLOC i = 0 - blocks = [] + blocks = asmbloc.BasicBlocks() block_to_nlink = None block_may_link = False delayslot = 0 @@ -261,7 +261,7 @@ def parse_txt(mnemo, attrib, txt, symbol_pool=None): cur_block = asmbloc.asm_bloc(line, alignment=mnemo.alignment) i += 1 # Generate the current bloc - blocks.append(cur_block) + blocks.add_node(cur_block) state = STATE_IN_BLOC if block_to_nlink: block_to_nlink.addto( |