 -rw-r--r--  example/asm/shellcode.py                   |   3
 -rw-r--r--  example/disasm/callback.py                 |   4
 -rw-r--r--  example/disasm/file.py                     |   6
 -rw-r--r--  example/disasm/full.py                     |  10
 -rw-r--r--  example/disasm/function.py                 |  10
 -rw-r--r--  example/expression/solve_condition_stp.py  |   2
 -rw-r--r--  example/ida/graph_ir.py                    |   3
 -rw-r--r--  example/jitter/unpack_upx.py               |   8
 -rw-r--r--  miasm2/core/asmbloc.py                     | 808
 -rw-r--r--  miasm2/core/graph.py                       |  67
 -rw-r--r--  miasm2/core/parse_asm.py                   |   4
 -rw-r--r--  test/core/asmbloc.py                       | 282
 -rw-r--r--  test/core/graph.py                         |  25
 -rw-r--r--  test/test_all.py                           |   3
 14 files changed, 842 insertions(+), 393 deletions(-)
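The change running through the example scripts below is that dis_multibloc() now returns an AsmCFG (a DiGraph subclass) instead of a plain list of asm_bloc, so the standalone bloc2graph() helper goes away in favour of the graph's own .dot() method. A minimal sketch of the new call sequence, reusing the shellcode from example/disasm/function.py (the output filename is arbitrary):

    from miasm2.arch.x86.disasm import dis_x86_32

    # Shellcode taken from example/disasm/function.py in this diff
    shellcode = '\xb8\xef\xbe7\x13\xb9\x04\x00\x00\x00\xc1\xc0\x08\xe2\xfb\xc3'
    mdis = dis_x86_32(shellcode)

    blocks = mdis.dis_multibloc(0)   # now an AsmCFG, not a list of asm_bloc
    for block in blocks:             # iterating over the blocks still works
        print block

    # Replaces the removed asmbloc.bloc2graph(blocks) helper;
    # label=True prefixes each instruction with its offset
    open('graph.dot', 'w').write(blocks.dot(label=True))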
diff --git a/example/asm/shellcode.py b/example/asm/shellcode.py
index 9dc5c6bc..11cf9a4d 100644
--- a/example/asm/shellcode.py
+++ b/example/asm/shellcode.py
@@ -76,8 +76,7 @@ if args.PE:
 # Print and graph firsts blocs before patching it
 for bloc in blocs:
     print bloc
-graph = asmbloc.bloc2graph(blocs)
-open("graph.dot", "w").write(graph)
+open("graph.dot", "w").write(blocs.dot())
 
 # Apply patches
 patches = asmbloc.asm_resolve_final(machine.mn,
diff --git a/example/disasm/callback.py b/example/disasm/callback.py
index 6c77023e..4a7507dd 100644
--- a/example/disasm/callback.py
+++ b/example/disasm/callback.py
@@ -63,5 +63,5 @@ blocks_after = mdis.dis_multibloc(0)
 print "\n".join(str(block) for block in blocks_after)
 
 # Ensure the callback has been called
-assert blocks[0].lines[0].name == "CALL"
-assert blocks_after[0].lines[0].name == "PUSH"
+assert blocks.heads()[0].lines[0].name == "CALL"
+assert blocks_after.heads()[0].lines[0].name == "PUSH"
diff --git a/example/disasm/file.py b/example/disasm/file.py
index 1b9347d8..db5cd96b 100644
--- a/example/disasm/file.py
+++ b/example/disasm/file.py
@@ -1,6 +1,5 @@
 import sys
 from miasm2.arch.x86.disasm import dis_x86_32
-from miasm2.core.asmbloc import bloc2graph
 from miasm2.analysis.binary import Container
 from pdb import pm
 
@@ -14,7 +13,6 @@ cont = Container.from_stream(open(sys.argv[1]))
 mdis = dis_x86_32(cont.bin_stream)
 # Inform the engine to avoid disassembling null instructions
 mdis.dont_dis_nulstart_bloc = True
-blocs = mdis.dis_multibloc(addr)
+blocks = mdis.dis_multibloc(addr)
 
-graph = bloc2graph(blocs)
-open('graph.dot', 'w').write(graph)
+open('graph.dot', 'w').write(blocks.dot())
diff --git a/example/disasm/full.py b/example/disasm/full.py
index 33b2f41f..0b0069c6 100644
--- a/example/disasm/full.py
+++ b/example/disasm/full.py
@@ -4,7 +4,7 @@ from argparse import ArgumentParser
 from pdb import pm
 
 from miasm2.analysis.binary import Container
-from miasm2.core.asmbloc import log_asmbloc, asm_label, bloc2graph
+from miasm2.core.asmbloc import log_asmbloc, asm_label, AsmCFG
 from miasm2.expression.expression import ExprId
 from miasm2.core.interval import interval
 from miasm2.analysis.machine import Machine
@@ -142,15 +142,13 @@ while not finish and todo:
 
 # Generate dotty graph
-all_blocs = []
+all_blocs = AsmCFG()
 for blocs in all_funcs_blocs.values():
     all_blocs += blocs
-    # for b in blocs:
-    #     print b
+
 log.info('generate graph file')
-g = bloc2graph(all_blocs, True)
-open('graph_execflow.dot', 'w').write(g)
+open('graph_execflow.dot', 'w').write(all_blocs.dot(label=True))
 
 log.info('generate intervals')
diff --git a/example/disasm/function.py b/example/disasm/function.py
index a1a9b393..1fe1754f 100644
--- a/example/disasm/function.py
+++ b/example/disasm/function.py
@@ -1,5 +1,4 @@
 from miasm2.arch.x86.disasm import dis_x86_32
-from miasm2.core.asmbloc import bloc2graph
 
 # MOV EAX, 0x1337BEEF
 # MOV ECX, 0x4
@@ -9,10 +8,9 @@ from miasm2.core.asmbloc import bloc2graph
 # RET
 shellcode = '\xb8\xef\xbe7\x13\xb9\x04\x00\x00\x00\xc1\xc0\x08\xe2\xfb\xc3'
 mdis = dis_x86_32(shellcode)
-blocs = mdis.dis_multibloc(0)
+blocks = mdis.dis_multibloc(0)
 
-for bloc in blocs:
-    print bloc
+for block in blocks:
+    print block
 
-graph = bloc2graph(blocs)
-open('graph.dot', 'w').write(graph)
+open('graph.dot', 'w').write(blocks.dot())
diff --git a/example/expression/solve_condition_stp.py b/example/expression/solve_condition_stp.py
index 034a115f..93c17018 100644
--- a/example/expression/solve_condition_stp.py
+++ b/example/expression/solve_condition_stp.py
@@ -168,7 +168,7 @@ if __name__ == '__main__':
 ''')
 
-    b = blocs[0]
+    b = list(blocs)[0]
     print b
 
     # add fake address and len to parsed instructions
     for i, l in enumerate(b.lines):
diff --git a/example/ida/graph_ir.py b/example/ida/graph_ir.py
index b181f72a..4447cadd 100644
--- a/example/ida/graph_ir.py
+++ b/example/ida/graph_ir.py
@@ -119,8 +119,7 @@ print hex(ad)
 ab = mdis.dis_multibloc(ad)
 
 print "generating graph"
-g = bloc2graph(ab, True)
-open('asm_flow.dot', 'w').write(g)
+open('asm_flow.dot', 'w').write(ab.graph.dot(label=True))
 
 print "generating IR... %x" % ad
diff --git a/example/jitter/unpack_upx.py b/example/jitter/unpack_upx.py
index 72a9feb3..d95c5a18 100644
--- a/example/jitter/unpack_upx.py
+++ b/example/jitter/unpack_upx.py
@@ -3,7 +3,7 @@ import logging
 from pdb import pm
 from elfesteem import pe
 from miasm2.analysis.sandbox import Sandbox_Win_x86_32
-from miasm2.core import asmbloc
+
 
 filename = os.environ.get('PYTHONSTARTUP')
 if filename and os.path.isfile(filename):
@@ -61,8 +61,7 @@ mdis = sb.machine.dis_engine(sb.jitter.bs)
 mdis.dont_dis_nulstart_bloc = True
 ab = mdis.dis_multibloc(sb.entry_point)
-bb = asmbloc.basicblocs(ab)
-leaves = bb.get_bad_dst()
+leaves = list(ab.get_bad_blocks_predecessors())
 assert(len(leaves) == 1)
 l = leaves.pop()
 logging.info(l)
@@ -73,8 +72,7 @@ logging.info(end_label)
 
 # Export CFG graph (dot format)
 if options.graph is True:
-    g = asmbloc.bloc2graph(ab)
-    open("graph.dot", "w").write(g)
+    open("graph.dot", "w").write(ab.graph.dot())
 
 if options.verbose is True:
diff --git a/miasm2/core/asmbloc.py b/miasm2/core/asmbloc.py
index 3517d452..9553d14d 100644
--- a/miasm2/core/asmbloc.py
+++ b/miasm2/core/asmbloc.py
@@ -4,15 +4,16 @@
 import logging
 import inspect
 import re
-
+from collections import namedtuple
 
 import miasm2.expression.expression as m2_expr
 from miasm2.expression.simplifications import expr_simp
 from miasm2.expression.modint import moduint, modint
 from miasm2.core.utils import Disasm_Exception, pck
-from miasm2.core.graph import DiGraph
+from miasm2.core.graph import DiGraph, DiGraphSimplifier
 from miasm2.core.interval import interval
+
 
 log_asmbloc = logging.getLogger("asmblock")
 console_handler = logging.StreamHandler()
 console_handler.setFormatter(logging.Formatter("%(levelname)-5s: %(message)s"))
@@ -524,47 +525,6 @@ def dis_bloc(mnemo, pool_bin, label, offset, job_done, symbol_pool,
     return cur_block, offsets_to_dis
 
 
-def split_bloc(mnemo, attrib, pool_bin, blocs,
-               symbol_pool, more_ref=None, dis_bloc_callback=None):
-    if not more_ref:
-        more_ref = []
-
-    # get all possible dst
-    bloc_dst = [symbol_pool._offset2label[x] for x in more_ref]
-    for b in blocs:
-        if isinstance(b, asm_block_bad):
-            continue
-        for c in b.bto:
-            bloc_dst.append(c.label)
-
-    bloc_dst = [x.offset for x in bloc_dst if x.offset is not None]
-
-    j = -1
-    while j < len(blocs) - 1:
-        j += 1
-        cb = blocs[j]
-        a, b = cb.get_range()
-
-        for off in bloc_dst:
-            if not (off > a and off < b):
-                continue
-            l = symbol_pool.getby_offset_create(off)
-            new_b = cb.split(off, l)
-            log_asmbloc.debug("split bloc %x", off)
-            if new_b is None:
-                log_asmbloc.error("cannot split %x!!", off)
-                continue
-            if dis_bloc_callback:
-                offsets_to_dis = set(x.label.offset for x in new_b.bto)
-                dis_bloc_callback(mn=mnemo, attrib=attrib, pool_bin=pool_bin,
-                                  cur_bloc=new_b, offsets_to_dis=offsets_to_dis,
-                                  symbol_pool=symbol_pool)
-            blocs.append(new_b)
-            a, b = cb.get_range()
-
-    return blocs
-
-
 def dis_bloc_all(mnemo, pool_bin, offset, job_done, symbol_pool, dont_dis=[], split_dis=[],
follow_call=False, dontdis_retcall=False, blocs_wd=None, lines_wd=None, blocs=None, @@ -572,7 +532,7 @@ def dis_bloc_all(mnemo, pool_bin, offset, job_done, symbol_pool, dont_dis=[], attrib={}): log_asmbloc.info("dis bloc all") if blocs is None: - blocs = [] + blocs = AsmCFG() todo = [offset] bloc_cpt = 0 @@ -609,70 +569,461 @@ def dis_bloc_all(mnemo, pool_bin, offset, job_done, symbol_pool, dont_dis=[], dont_dis_nulstart_bloc=dont_dis_nulstart_bloc, attrib=attrib) todo += nexts - blocs.append(cur_block) + blocs.add_node(cur_block) + + blocs.apply_splitting(symbol_pool, dis_block_callback=dis_bloc_callback, + mn=mnemo, attrib=attrib, pool_bin=pool_bin) + return blocs - return split_bloc(mnemo, attrib, pool_bin, blocs, - symbol_pool, dis_bloc_callback=dis_bloc_callback) +class AsmCFG(DiGraph): + """Directed graph standing for a ASM Control Flow Graph with: + - nodes: asm_bloc + - edges: constraints between blocks, synchronized with asm_bloc's "bto" -def bloc2graph(blocks, label=False, lines=True): - """Render dot graph of @blocks""" + Specialized the .dot export and force the relation between block to be uniq, + and associated with a constraint. - escape_chars = re.compile('[' + re.escape('{}') + ']') - label_attr = 'colspan="2" align="center" bgcolor="grey"' - edge_attr = 'label = "%s" color="%s" style="bold"' - td_attr = 'align="left"' - block_attr = 'shape="Mrecord" fontname="Courier New"' + Offer helpers on AsmCFG management, such as research by label, sanity + checking and mnemonic size guessing. + """ - out = ["digraph asm_graph {"] - fix_chars = lambda x: '\\' + x.group() + # Internal structure for pending management + AsmCFGPending = namedtuple("AsmCFGPending", + ["waiter", "constraint"]) + + def __init__(self, *args, **kwargs): + super(AsmCFG, self).__init__(*args, **kwargs) + # Edges -> constraint + self.edges2constraint = {} + # Expected asm_label -> set( (src, dst), constraint ) + self._pendings = {} + # Label2block built on the fly + self._label2block = {} + + # Compatibility with old list API + def append(self, *args, **kwargs): + raise DeprecationWarning("AsmCFG is a graph, use add_node") + + def remove(self, *args, **kwargs): + raise DeprecationWarning("AsmCFG is a graph, use del_node") + + def __getitem__(self, *args, **kwargs): + raise DeprecationWarning("Order of AsmCFG elements is not reliable") + + def __iter__(self): + """Iterator on asm_bloc composing the current graph""" + return iter(self._nodes) + + def __len__(self): + """Return the number of blocks in AsmCFG""" + return len(self._nodes) + + # Manage graph with associated constraints + def add_edge(self, src, dst, constraint): + """Add an edge to the graph + @src: asm_bloc instance, source + @dst: asm_block instance, destination + @constraint: constraint associated to this edge + """ + # Sanity check + assert (src, dst) not in self.edges2constraint + + # Add the edge to src.bto if needed + if dst.label not in [cons.label for cons in src.bto]: + src.bto.add(asm_constraint(dst.label, constraint)) + + # Add edge + self.edges2constraint[(src, dst)] = constraint + super(AsmCFG, self).add_edge(src, dst) + + def add_uniq_edge(self, src, dst, constraint): + """Add an edge from @src to @dst if it doesn't already exist""" + if (src not in self._nodes_succ or + dst not in self._nodes_succ[src]): + self.add_edge(src, dst, constraint) + + def del_edge(self, src, dst): + """Delete the edge @src->@dst and its associated constraint""" + # Delete from src.bto + to_remove = [cons for cons in src.bto if cons.label == dst.label] + if 
to_remove: + assert len(to_remove) == 1 + src.bto.remove(to_remove[0]) + + # Del edge + del self.edges2constraint[(src, dst)] + super(AsmCFG, self).del_edge(src, dst) + + def add_node(self, block): + """Add the block @block to the current instance, if it is not already in + @block: asm_bloc instance + + Edges will be created for @block.bto, if destinations are already in + this instance. If not, they will be resolved when adding these + aforementionned destinations. + `self.pendings` indicates which blocks are not yet resolved. + """ + status = super(AsmCFG, self).add_node(block) + if not status: + return status + + # Update waiters + if block.label in self._pendings: + for bblpend in self._pendings[block.label]: + self.add_edge(bblpend.waiter, block, bblpend.constraint) + del self._pendings[block.label] + + # Synchronize edges with block destinations + self._label2block[block.label] = block + for constraint in block.bto: + dst = self._label2block.get(constraint.label, + None) + if dst is None: + # Block is yet unknown, add it to pendings + to_add = self.AsmCFGPending(waiter=block, + constraint=constraint.c_t) + self._pendings.setdefault(constraint.label, + set()).add(to_add) + else: + # Block is already in known nodes + self.add_edge(block, dst, constraint.c_t) + + return status + + def del_node(self, block): + super(AsmCFG, self).del_node(block) + del self._label2block[block.label] + + def merge(self, graph): + """Merge with @graph, taking in account constraints""" + # -> add_edge(x, y, constraint) + for node in graph._nodes: + self.add_node(node) + for edge in graph._edges: + # Use "_uniq_" beacause the edge can already exist due to add_node + self.add_uniq_edge(*edge, constraint=graph.edges2constraint[edge]) + + def dot(self, label=False, lines=True): + """Render dot graph with HTML + @label: (optional) if set, add the corresponding label in each block + @lines: (optional) if set, includes assembly lines in the output + """ - # Generate basic blocks - out_blocks = [] - for block in blocks: - out_block = '%s [\n' % block.label.name - out_block += "%s " % block_attr - out_block += 'label =<<table border="0" cellborder="0" cellpadding="3">' - - block_label = '<tr><td %s>%s</td></tr>' % ( - label_attr, block.label.name) - block_html_lines = [] - if lines: - for line in block.lines: - if label: - out_render = "%.8X</td><td %s> " % (line.offset, td_attr) - else: - out_render = "" - out_render += escape_chars.sub(fix_chars, str(line)) - block_html_lines.append(out_render) - block_html_lines = ('<tr><td %s>' % td_attr + - ('</td></tr><tr><td %s>' % td_attr).join(block_html_lines) + - '</td></tr>') - out_block += "%s " % block_label - out_block += block_html_lines + "</table>> ];" - out_blocks.append(out_block) - - out += out_blocks - - # Generate links - for block in blocks: - for next_b in block.bto: - src, dst, cst = block.label.name, next_b.label.name, next_b.c_t + escape_chars = re.compile('[' + re.escape('{}') + ']') + label_attr = 'colspan="2" align="center" bgcolor="grey"' + edge_attr = 'label = "%s" color="%s" style="bold"' + td_attr = 'align="left"' + block_attr = 'shape="Mrecord" fontname="Courier New"' + + out = ["digraph asm_graph {"] + fix_chars = lambda x: '\\' + x.group() + + # Generate basic blocks + out_blocks = [] + for block in self.nodes(): + out_block = '%s [\n' % block.label.name + out_block += "%s " % block_attr + if isinstance(block, asm_block_bad): + out_block += 'style=filled fillcolor="red" ' + out_block += 'label =<<table border="0" cellborder="0" cellpadding="3">' 
+ + block_label = '<tr><td %s>%s</td></tr>' % ( + label_attr, block.label.name) + block_html_lines = [] + + if lines: + if isinstance(block, asm_block_bad): + block_html_lines.append(block.ERROR_TYPES.get(block._errno, + block._errno)) + + for line in block.lines: + if label: + out_render = "%.8X</td><td %s> " % (line.offset, + td_attr) + else: + out_render = "" + out_render += escape_chars.sub(fix_chars, str(line)) + block_html_lines.append(out_render) + + block_html_lines = ('<tr><td %s>' % td_attr + + ('</td></tr><tr><td %s>' % td_attr).join(block_html_lines) + + '</td></tr>') + out_block += "%s " % block_label + out_block += block_html_lines + "</table>> ];" + out_blocks.append(out_block) + + out += out_blocks + + # Generate links + for src, dst in self.edges(): + exp_label = dst.label + cst = self.edges2constraint.get((src, dst), None) edge_color = "black" - if next_b.c_t == asm_constraint.c_next: + if cst == asm_constraint.c_next: edge_color = "red" - elif next_b.c_t == asm_constraint.c_to: + elif cst == asm_constraint.c_to: edge_color = "limegreen" # special case - if len(block.bto) == 1: + if len(src.bto) == 1: edge_color = "blue" - out.append('%s -> %s' % (src, dst) + + out.append('%s -> %s' % (src.label.name, dst.label.name) + \ '[' + edge_attr % (cst, edge_color) + '];') - out.append("}") - return '\n'.join(out) + out.append("}") + return '\n'.join(out) + + # Helpers + @property + def pendings(self): + """Dictionnary of label -> set(AsmCFGPending instance) indicating + which label are missing in the current instance. + A label is missing if a block which is already in nodes has constraints + with him (thanks to its .bto) and the corresponding block is not yet in + nodes + """ + return self._pendings + + def _build_label2block(self): + self._label2block = {block.label: block + for block in self._nodes} + + def label2block(self, label): + """Return the block corresponding to label @label + @label: asm_label instance or ExprId(asm_label) instance""" + return self._label2block[label] + + def rebuild_edges(self): + """Consider blocks '.bto' and rebuild edges according to them, ie: + - update constraint type + - add missing edge + - remove no more used edge + + This method should be called if a block's '.bto' in nodes have been + modified without notifying this instance to resynchronize edges. 
+ """ + self._build_label2block() + for block in self._nodes: + edges = [] + # Rebuild edges from bto + for constraint in block.bto: + dst = self._label2block.get(constraint.label, + None) + if dst is None: + # Missing destination, add to pendings + self._pendings.setdefault(constraint.label, + set()).add(self.AsmCFGPending(block, + constraint.c_t)) + continue + edge = (block, dst) + edges.append(edge) + if edge in self._edges: + # Already known edge, constraint may have changed + self.edges2constraint[edge] = constraint.c_t + else: + # An edge is missing + self.add_edge(edge[0], edge[1], constraint.c_t) + + # Remove useless edges + for succ in self.successors(block): + edge = (block, succ) + if edge not in edges: + self.del_edge(*edge) + + def get_bad_blocks(self): + """Iterator on asm_block_bad elements""" + # A bad asm block is always a leaf + for block in self.leaves(): + if isinstance(block, asm_block_bad): + yield block + + def get_bad_blocks_predecessors(self, strict=False): + """Iterator on block with an asm_block_bad destination + @strict: (optional) if set, return block with only bad + successors + """ + # Avoid returning the same block + done = set() + for badblock in self.get_bad_blocks(): + for predecessor in self.predecessors_iter(badblock): + if predecessor not in done: + if (strict and + not all(isinstance(block, asm_block_bad) + for block in self.successors_iter(predecessor))): + continue + yield predecessor + done.add(predecessor) + + def sanity_check(self): + """Do sanity checks on blocks' constraints: + * no pendings + * no multiple next constraint to same block + * no next constraint to self + """ + + if len(self._pendings) != 0: + raise RuntimeError("Some blocks are missing: %s" % map(str, + self._pendings.keys())) + + next_edges = {edge: constraint + for edge, constraint in self.edges2constraint.iteritems() + if constraint == asm_constraint.c_next} + + for block in self._nodes: + # No next constraint to self + if (block, block) in next_edges: + raise RuntimeError('Bad constraint: self in next') + + # No multiple next constraint to same block + pred_next = list(pblock + for (pblock, dblock) in next_edges + if dblock == block) + + if len(pred_next) > 1: + raise RuntimeError("Too many next constraints for bloc %r" + "(%s)" % (block.label, + map(lambda x: x.label, pred_next))) + + def guess_blocks_size(self, mnemo): + """Asm and compute max block size + Add a 'size' and 'max_size' attribute on each block + @mnemo: metamn instance""" + for block in self._nodes: + size = 0 + for instr in block.lines: + if isinstance(instr, asm_raw): + # for special asm_raw, only extract len + if isinstance(instr.raw, list): + data = None + if len(instr.raw) == 0: + l = 0 + else: + l = instr.raw[0].size / 8 * len(instr.raw) + elif isinstance(instr.raw, str): + data = instr.raw + l = len(data) + else: + raise NotImplementedError('asm raw') + else: + # Assemble the instruction to retrieve its len. + # If the instruction uses symbol it will fail + # In this case, the max_instruction_len is used + try: + candidates = mnemo.asm(instr) + l = len(candidates[-1]) + except: + l = mnemo.max_instruction_len + data = None + instr.data = data + instr.l = l + size += l + + block.size = size + block.max_size = size + log_asmbloc.info("size: %d max: %d", block.size, block.max_size) + + def apply_splitting(self, symbol_pool, dis_block_callback=None, **kwargs): + """Consider @self' bto destinations and split block in @self if one of + these destinations jumps in the middle of this block. 
+ In order to work, they must be only one block in @self per label in + @symbol_pool (which is true if @self come from the same disasmEngine). + + @symbol_pool: asm_symbol_pool instance associated with @self'labels + @dis_block_callback: (optional) if set, this callback will be called on + new block destinations + @kwargs: (optional) named arguments to pass to dis_block_callback + """ + # Get all possible destinations not yet resolved, with a resolved offset + block_dst = [label.offset + for label in self.pendings + if label.offset is not None] + + todo = self.nodes().copy() + rebuild_needed = False + + while todo: + # Find a block with a destination inside another one + cur_block = todo.pop() + range_start, range_stop = cur_block.get_range() + + for off in block_dst: + if not (off > range_start and off <= range_stop): + continue + + # `cur_block` must be splitted at offset `off` + label = symbol_pool.getby_offset_create(off) + new_b = cur_block.split(off, label) + log_asmbloc.debug("Split block %x", off) + if new_b is None: + log_asmbloc.error("Cannot split %x!!", off) + continue + + # The new block destinations may need to be disassembled + if dis_block_callback: + offsets_to_dis = set(constraint.label.offset + for constraint in new_b.bto) + dis_block_callback(cur_bloc=new_b, + offsets_to_dis=offsets_to_dis, + symbol_pool=symbol_pool, **kwargs) + + # Update structure + rebuild_needed = True + self.add_node(new_b) + + # The new block must be considered + todo.add(new_b) + range_start, range_stop = cur_block.get_range() + + # Rebuild edges to match new blocks'bto + if rebuild_needed: + self.rebuild_edges() + +def _merge_blocks(dg, graph): + + for block in graph.nodes(): + + # Acceptable block + if isinstance(block, asm_block_bad) or len(block.lines) == 0: + continue + + # Only one son, not me + succs = graph.successors(block) + if len(succs) != 1: + continue + succ = succs[0] + if succ == block: + continue + + # Son has only one parent, me + preds = graph.predecessors(succ) + if len(preds) != 1: + continue + assert preds[0] == block + + # Remove block last instruction if needed + last_instr = block.lines[-1] + if last_instr.delayslot > 0: + # TODO: delayslot + raise RuntimeError("Not implemented yet") + + if last_instr.is_subcall(): + continue + if last_instr.breakflow() and last_instr.dstflow(): + block.lines.pop() + + # Merge block + block.lines += succ.lines + for nextb in graph.successors_iter(succ): + graph.add_edge(block, nextb, graph.edges2constraint[(succ, nextb)]) + + graph.del_node(succ) + break + + +bbl_simplifier = DiGraphSimplifier() +bbl_simplifier.enable_passes([_merge_blocks]) def conservative_asm(mnemo, instr, symbols, conservative): @@ -708,44 +1059,6 @@ def fix_expr_val(expr, symbols): return result -def guess_blocks_size(mnemo, blocks): - """Asm and compute max block size""" - - for block in blocks: - size = 0 - for instr in block.lines: - if isinstance(instr, asm_raw): - # for special asm_raw, only extract len - if isinstance(instr.raw, list): - data = None - if len(instr.raw) == 0: - l = 0 - else: - l = instr.raw[0].size / 8 * len(instr.raw) - elif isinstance(instr.raw, str): - data = instr.raw - l = len(data) - else: - raise NotImplementedError('asm raw') - else: - # Assemble the instruction to retrieve its len. 
- # If the instruction uses symbol it will fail - # In this case, the max_instruction_len is used - try: - candidates = mnemo.asm(instr) - l = len(candidates[-1]) - except: - l = mnemo.max_instruction_len - data = None - instr.data = data - instr.l = l - size += l - - block.size = size - block.max_size = size - log_asmbloc.info("size: %d max: %d", block.size, block.max_size) - - def fix_label_offset(symbol_pool, label, offset, modified): """Fix the @label offset to @offset. If the @offset has changed, add @label to @modified @@ -1089,33 +1402,13 @@ def asmbloc_final(mnemo, blocks, blockChains, symbol_pool, conservative=False): assemble_block(mnemo, block, symbol_pool, conservative) -def sanity_check_blocks(blocks): - """Do sanity checks on blocks' constraints: - * no multiple next constraint to same block - * no next constraint to self""" - - blocks_graph = basicblocs(blocks) - graph = blocks_graph.g - for label in graph.nodes(): - if blocks_graph.blocs[label].get_next() == label: - raise RuntimeError('Bad constraint: self in next') - pred_next = set() - for pred in graph.predecessors(label): - if not pred in blocks_graph.blocs: - continue - if blocks_graph.blocs[pred].get_next() == label: - pred_next.add(pred) - if len(pred_next) > 1: - raise RuntimeError("Too many next constraints for bloc %r" % label) - - def asm_resolve_final(mnemo, blocks, symbol_pool, dst_interval=None): """Resolve and assemble @blocks using @symbol_pool into interval @dst_interval""" - sanity_check_blocks(blocks) + blocks.sanity_check() - guess_blocks_size(mnemo, blocks) + blocks.guess_blocks_size(mnemo) blockChains = group_constrained_blocks(symbol_pool, blocks) resolved_blockChains = resolve_symbol( blockChains, symbol_pool, dst_interval) @@ -1140,215 +1433,6 @@ def asm_resolve_final(mnemo, blocks, symbol_pool, dst_interval=None): return patches -def blist2graph(ab): - """ - ab: list of asmbloc - return: graph of asmbloc - """ - g = DiGraph() - g.lbl2bloc = {} - for b in ab: - g.lbl2bloc[b.label] = b - g.add_node(b.label) - for x in b.bto: - g.add_edge(b.label, x.label) - return g - - -class basicblocs: - - def __init__(self, ab=[]): - self.blocs = {} - self.g = DiGraph() - self.add_blocs(ab) - - def add(self, b): - self.blocs[b.label] = b - self.g.add_node(b.label) - for dst in b.bto: - self.g.add_edge(b.label, dst.label) - - def add_blocs(self, ab): - for b in ab: - self.add(b) - - def get_bad_dst(self): - out = set() - for block in self.blocs.values(): - for constraint in block.bto: - if isinstance(self.blocs[constraint.label], asm_block_bad): - out.add(block) - return out - - -def find_parents(blocs, l): - p = set() - for b in blocs: - if l in [x.label for x in b.bto]: - p.add(b.label) - return p - - -def bloc_blink(blocs): - for b in blocs: - b.parents = find_parents(blocs, b.label) - - -def getbloc_around(blocs, a, level=3, done=None, blocby_label=None): - - if not blocby_label: - blocby_label = {} - for b in blocs: - blocby_label[b.label] = b - if done is None: - done = set() - - done.add(a) - if not level: - return done - for b in a.parents: - b = blocby_label[b] - if b in done: - continue - done.update(getbloc_around(blocs, b, level - 1, done, blocby_label)) - for b in a.bto: - b = blocby_label[b.label] - if b in done: - continue - done.update(getbloc_around(blocs, b, level - 1, done, blocby_label)) - return done - - -def getbloc_parents(blocs, a, level=3, done=None, blocby_label=None): - - if not blocby_label: - blocby_label = {} - for b in blocs: - blocby_label[b.label] = b - if done is None: - done = 
set() - - done.add(a) - if not level: - return done - for b in a.parents: - b = blocby_label[b] - if b in done: - continue - done.update(getbloc_parents(blocs, b, level - 1, done, blocby_label)) - return done - -# get ONLY level_X parents - - -def getbloc_parents_strict( - blocs, a, level=3, rez=None, done=None, blocby_label=None): - - if not blocby_label: - blocby_label = {} - for b in blocs: - blocby_label[b.label] = b - if rez is None: - rez = set() - if done is None: - done = set() - - done.add(a) - if level == 0: - rez.add(a) - if not level: - return rez - for b in a.parents: - b = blocby_label[b] - if b in done: - continue - rez.update(getbloc_parents_strict( - blocs, b, level - 1, rez, done, blocby_label)) - return rez - - -def bloc_find_path_next(blocs, blocby_label, a, b, path=None): - if path == None: - path = [] - if a == b: - return [path] - - all_path = [] - for x in a.bto: - if x.c_t != asm_constraint.c_next: - continue - if not x.label in blocby_label: - log_asmbloc.error('XXX unknown label') - continue - x = blocby_label[x.label] - all_path += bloc_find_path_next(blocs, blocby_label, x, b, path + [a]) - # stop if at least one path found - if all_path: - return all_path - return all_path - - -def bloc_merge(blocs, dont_merge=[]): - blocby_label = {} - for b in blocs: - blocby_label[b.label] = b - b.parents = find_parents(blocs, b.label) - - i = -1 - while i < len(blocs) - 1: - i += 1 - b = blocs[i] - if b.label in dont_merge: - continue - p = set(b.parents) - # if bloc dont self ref - if b.label in p: - continue - # and bloc has only one parent - if len(p) != 1: - continue - # may merge - bpl = p.pop() - # bp = getblocby_label(blocs, bpl) - bp = blocby_label[bpl] - # and parent has only one son - if len(bp.bto) != 1: - continue - # and will not create next loop composed of constraint_next from son to - # parent - - path = bloc_find_path_next(blocs, blocby_label, b, bp) - if path: - continue - if bp.lines: - l = bp.lines[-1] - # jmp opt; jcc opt - if l.is_subcall(): - continue - if l.breakflow() and l.dstflow(): - bp.lines.pop() - # merge - # sons = b.bto[:] - - # update parents - for s in b.bto: - if s.label.name == None: - continue - if not s.label in blocby_label: - log_asmbloc.error("unknown parent XXX") - continue - bs = blocby_label[s.label] - for p in list(bs.parents): - if p == b.label: - bs.parents.discard(p) - bs.parents.add(bp.label) - bp.lines += b.lines - bp.bto = b.bto - - del blocs[i] - i = -1 - - class disasmEngine(object): def __init__(self, arch, attrib, bs=None, **kwargs): diff --git a/miasm2/core/graph.py b/miasm2/core/graph.py index 49fc0817..66dffdba 100644 --- a/miasm2/core/graph.py +++ b/miasm2/core/graph.py @@ -1,5 +1,6 @@ from collections import defaultdict, namedtuple + class DiGraph(object): """Implementation of directed graph""" @@ -25,12 +26,42 @@ class DiGraph(object): def edges(self): return self._edges + def merge(self, graph): + """Merge the current graph with @graph + @graph: DiGraph instance + """ + for node in graph._nodes: + self.add_node(node) + for edge in graph._edges: + self.add_edge(*edge) + + def __add__(self, graph): + """Wrapper on `.merge`""" + self.merge(graph) + return self + + def copy(self): + """Copy the current graph instance""" + graph = self.__class__() + return graph + self + + def __eq__(self, graph): + if not isinstance(graph, self.__class__): + return False + return all((self._nodes == graph.nodes(), + sorted(self._edges) == sorted(graph.edges()))) + def add_node(self, node): + """Add the node @node to the graph. 
+ If the node was already present, return False. + Otherwise, return True + """ if node in self._nodes: - return + return False self._nodes.add(node) self._nodes_succ[node] = [] self._nodes_pred[node] = [] + return True def del_node(self, node): """Delete the @node of the graph; Also delete every edge to/from this @@ -507,3 +538,37 @@ shape = "box" done.add(current.node) yield scc + + +class DiGraphSimplifier(object): + """Wrapper on graph simplification passes. + + Instance handle passes lists. + """ + + def __init__(self): + self.passes = [] + + def enable_passes(self, passes): + """Add @passes to passes to applied + @passes: sequence of function (DiGraphSimplifier, DiGraph) -> None + """ + self.passes += passes + + def apply_simp(self, graph): + """Apply enabled simplifications on graph @graph + @graph: DiGraph instance + """ + while True: + new_graph = graph.copy() + for simp_func in self.passes: + simp_func(self, new_graph) + + if new_graph == graph: + break + graph = new_graph + return new_graph + + def __call__(self, graph): + """Wrapper on 'apply_simp'""" + return self.apply_simp(graph) diff --git a/miasm2/core/parse_asm.py b/miasm2/core/parse_asm.py index e55a9af8..aefa6df9 100644 --- a/miasm2/core/parse_asm.py +++ b/miasm2/core/parse_asm.py @@ -232,7 +232,7 @@ def parse_txt(mnemo, attrib, txt, symbol_pool=None): cur_block = None state = STATE_NO_BLOC i = 0 - blocks = [] + blocks = asmbloc.AsmCFG() block_to_nlink = None block_may_link = False delayslot = 0 @@ -261,7 +261,7 @@ def parse_txt(mnemo, attrib, txt, symbol_pool=None): cur_block = asmbloc.asm_bloc(line, alignment=mnemo.alignment) i += 1 # Generate the current bloc - blocks.append(cur_block) + blocks.add_node(cur_block) state = STATE_IN_BLOC if block_to_nlink: block_to_nlink.addto( diff --git a/test/core/asmbloc.py b/test/core/asmbloc.py new file mode 100644 index 00000000..45f7f27f --- /dev/null +++ b/test/core/asmbloc.py @@ -0,0 +1,282 @@ +from pdb import pm + +from miasm2.arch.x86.disasm import dis_x86_32 +from miasm2.analysis.binary import Container +from miasm2.core.asmbloc import AsmCFG, asm_constraint, asm_bloc, \ + asm_label, asm_block_bad, asm_constraint_to, asm_constraint_next, \ + bbl_simplifier +from miasm2.core.graph import DiGraphSimplifier +from miasm2.expression.expression import ExprId + + +# Initial data: from 'samples/simple_test.bin' +data = "5589e583ec10837d08007509c745fc01100000eb73837d08017709c745fc02100000eb64837d08057709c745fc03100000eb55837d080774138b450801c083f80e7509c745fc04100000eb3c8b450801c083f80e7509c745fc05100000eb298b450883e03085c07409c745fc06100000eb16837d08427509c745fc07100000eb07c745fc081000008b45fcc9c3".decode("hex") +cont = Container.from_string(data) + +# Test Disasm engine +mdis = dis_x86_32(cont.bin_stream) +## Disassembly of one block +first_block = mdis.dis_bloc(0) +assert len(first_block.lines) == 5 +print first_block + +## Disassembly of several block, with cache +blocks = mdis.dis_multibloc(0) +assert len(blocks) == 0 + +## Test cache +mdis.job_done.clear() +blocks = mdis.dis_multibloc(0) +assert len(blocks) == 17 +## Equality between assembly lines is not yet implemented +assert len(blocks.heads()) == 1 +assert len(blocks.heads()[0].lines) == len(first_block.lines) + +# Test AsmCFG +assert isinstance(blocks, AsmCFG) +assert len(blocks.pendings) == 0 +assert len(blocks.nodes()) == 17 +assert len(blocks.edges2constraint) == len(blocks.edges()) +assert len(blocks.edges()) == 24 + +## Convert to dot +open("graph.dot", "w").write(blocks.dot()) + +## Modify the structure: link the 
first and the last block +leaves = blocks.leaves() +assert len(leaves) == 1 +last_block = leaves.pop() + +### Remove first_block for the rest of the graph +first_block = blocks.heads()[0] +assert len(first_block.bto) == 2 +for succ in blocks.successors(first_block): + blocks.del_edge(first_block, succ) + +### Modification must be reported from the graph +assert len(first_block.bto) == 0 +assert last_block in blocks + +### Remove predecessors of last block +for pred in blocks.predecessors(last_block): + blocks.del_edge(pred, last_block) +### Link first and last block +blocks.add_edge(first_block, last_block, asm_constraint.c_next) +### Only one link between two blocks +try: + blocks.add_edge(first_block, last_block, asm_constraint.c_to) + good = False +except AssertionError: + good = True +assert good + +### Check final state +assert len(first_block.bto) == 1 +assert list(first_block.bto)[0].c_t == asm_constraint.c_next + +## Simplify the obtained graph to keep only blocks which reach a block +## finnishing with RET + +def remove_useless_blocks(d_g, graph): + """Remove leaves without a RET""" + for block in graph.leaves(): + if block.lines[-1].name != "RET": + graph.del_node(block) + +### Use a graph simplifier to recursively apply the simplification pass +dg = DiGraphSimplifier() +dg.enable_passes([remove_useless_blocks]) +blocks = dg(blocks) + +### Only two blocks should remain +assert len(blocks) == 2 +assert first_block in blocks +assert last_block in blocks + +## Graph the final output +open("graph2.dot", "w").write(blocks.dot()) + +# Test helper methods +## Label2block should always be updated +assert blocks.label2block(first_block.label) == first_block +my_block = asm_bloc(asm_label("testlabel")) +blocks.add_node(my_block) +assert len(blocks) == 3 +assert blocks.label2block(first_block.label) == first_block +assert blocks.label2block(my_block.label) == my_block + +## Bad blocks +assert len(list(blocks.get_bad_blocks())) == 0 +assert len(list(blocks.get_bad_blocks_predecessors())) == 0 +### Add a bad block, not linked +my_bad_block = asm_block_bad(asm_label("testlabel_bad")) +blocks.add_node(my_bad_block) +assert list(blocks.get_bad_blocks()) == [my_bad_block] +assert len(list(blocks.get_bad_blocks_predecessors())) == 0 +### Link the bad block and update edges +### Indeed, a sub-element has been modified (bto from a block from blocks) +my_block.bto.add(asm_constraint_to(my_bad_block.label)) +blocks.rebuild_edges() +assert list(blocks.get_bad_blocks_predecessors()) == [my_block] +### Test strict option +my_block.bto.add(asm_constraint_to(my_block.label)) +blocks.rebuild_edges() +assert list(blocks.get_bad_blocks_predecessors(strict=False)) == [my_block] +assert len(list(blocks.get_bad_blocks_predecessors(strict=True))) == 0 + +## Sanity check +blocks.sanity_check() +### Next on itself +my_block_ni = asm_bloc(asm_label("testlabel_nextitself")) +my_block_ni.bto.add(asm_constraint_next(my_block_ni.label)) +blocks.add_node(my_block_ni) +error_raised = False +try: + blocks.sanity_check() +except RuntimeError: + error_raised = True +assert error_raised +### Back to a normal state +blocks.del_node(my_block_ni) +blocks.sanity_check() +### Multiple next on the same node +my_block_target = asm_bloc(asm_label("testlabel_target")) +blocks.add_node(my_block_target) +my_block_src1 = asm_bloc(asm_label("testlabel_src1")) +my_block_src2 = asm_bloc(asm_label("testlabel_src2")) +my_block_src1.bto.add(asm_constraint_next(my_block_target.label)) +blocks.add_node(my_block_src1) +### OK for now 
+blocks.sanity_check() +### Add a second next from src2 to target (already src1 -> target) +my_block_src2.bto.add(asm_constraint_next(my_block_target.label)) +blocks.add_node(my_block_src2) +error_raised = False +try: + blocks.sanity_check() +except RuntimeError: + error_raised = True +assert error_raised +blocks.del_node(my_block_src2) +blocks.sanity_check() + +## Guess block size +### Initial state +assert not hasattr(first_block, 'size') +assert not hasattr(first_block, 'max_size') +blocks.guess_blocks_size(mdis.arch) +assert first_block.size == 39 +assert blocks.label2block(my_block_src1.label).size == 0 +assert first_block.max_size == 39 +assert blocks.label2block(my_block_src1.label).max_size == 0 + +## Check pendings +### Create a pending element +my_block_src = asm_bloc(asm_label("testlabel_pend_src")) +my_block_dst = asm_bloc(asm_label("testlabel_pend_dst")) +my_block_src.bto.add(asm_constraint_to(my_block_dst.label)) +blocks.add_node(my_block_src) +### Check resulting state +assert len(blocks) == 7 +assert len(blocks.pendings) == 1 +assert my_block_dst.label in blocks.pendings +assert len(blocks.pendings[my_block_dst.label]) == 1 +pending = list(blocks.pendings[my_block_dst.label])[0] +assert isinstance(pending, blocks.AsmCFGPending) +assert pending.waiter == my_block_src +assert pending.constraint == asm_constraint.c_to +### Sanity check must fail +error_raised = False +try: + blocks.sanity_check() +except RuntimeError: + error_raised = True +assert error_raised +### Pending must disappeared when adding expected block +blocks.add_node(my_block_dst) +assert len(blocks) == 8 +assert len(blocks.pendings) == 0 +blocks.sanity_check() + +# Test block_merge +data2 = "31c0eb0c31c9750c31d2eb0c31ffebf831dbebf031edebfc31f6ebf031e4c3".decode("hex") +cont2 = Container.from_string(data2) +mdis = dis_x86_32(cont2.bin_stream) +## Elements to merge +blocks = mdis.dis_multibloc(0) +## Block alone +blocks.add_node(mdis.dis_bloc(0x1c)) +## Bad block +blocks.add_node(mdis.dis_bloc(len(data2))) +## Dump the graph before merging +open("graph3.dot", "w").write(blocks.dot()) +## Apply merging +blocks = bbl_simplifier(blocks) +## Dump the graph after merging +open("graph4.dot", "w").write(blocks.dot()) +## Check the final state +assert len(blocks) == 5 +assert len(list(blocks.get_bad_blocks())) == 1 +### Check "special" blocks +entry_blocks = blocks.heads() +bad_block = (block for block in entry_blocks + if isinstance(block, asm_block_bad)).next() +entry_blocks.remove(bad_block) +alone_block = (block for block in entry_blocks + if len(blocks.successors(block)) == 0).next() +entry_blocks.remove(alone_block) +assert alone_block.lines[-1].name == "RET" +assert len(alone_block.lines) == 2 +### Check resulting function +entry_block = entry_blocks.pop() +assert len(entry_block.lines) == 4 +assert map(str, entry_block.lines) == ['XOR EAX, EAX', + 'XOR EBX, EBX', + 'XOR ECX, ECX', + 'JNZ loc_0000000000000014:0x00000014'] +assert len(blocks.successors(entry_block)) == 2 +assert len(entry_block.bto) == 2 +nextb = blocks.label2block((cons.label for cons in entry_block.bto + if cons.c_t == asm_constraint.c_next).next()) +tob = blocks.label2block((cons.label for cons in entry_block.bto + if cons.c_t == asm_constraint.c_to).next()) +assert len(nextb.lines) == 4 +assert map(str, nextb.lines) == ['XOR EDX, EDX', + 'XOR ESI, ESI', + 'XOR EDI, EDI', + 'JMP loc_0000000000000008:0x00000008'] +assert blocks.successors(nextb) == [nextb] +assert len(tob.lines) == 2 +assert map(str, tob.lines) == ['XOR EBP, EBP', + 'JMP 
+                           loc_0000000000000014:0x00000014']
+assert blocks.successors(tob) == [tob]
+
+# Check split_block
+## Without condition for a split, no change
+blocks_bef = blocks.copy()
+blocks.apply_splitting(mdis.symbol_pool)
+assert blocks_bef == blocks
+## Create conditions for a block split
+inside_firstbbl = mdis.symbol_pool.getby_offset(4)
+tob.bto.add(asm_constraint_to(inside_firstbbl))
+blocks.rebuild_edges()
+assert len(blocks.pendings) == 1
+assert inside_firstbbl in blocks.pendings
+blocks.apply_splitting(mdis.symbol_pool)
+## Check result
+assert len(blocks) == 6
+assert len(blocks.pendings) == 0
+assert len(entry_block.lines) == 2
+assert map(str, entry_block.lines) == ['XOR EAX, EAX',
+                                       'XOR EBX, EBX']
+assert len(blocks.successors(entry_block)) == 1
+newb = blocks.successors(entry_block)[0]
+assert len(newb.lines) == 2
+assert map(str, newb.lines) == ['XOR ECX, ECX',
+                                'JNZ loc_0000000000000014:0x00000014']
+preds = blocks.predecessors(newb)
+assert len(preds) == 2
+assert entry_block in preds
+assert tob in preds
+assert blocks.edges2constraint[(entry_block, newb)] == asm_constraint.c_next
+assert blocks.edges2constraint[(tob, newb)] == asm_constraint.c_to
diff --git a/test/core/graph.py b/test/core/graph.py
index 86175c91..33a2fc6f 100644
--- a/test/core/graph.py
+++ b/test/core/graph.py
@@ -192,3 +192,28 @@ assert(sccs == {frozenset({6}),
                 frozenset({7, 8}),
                 frozenset({3}),
                 frozenset({1, 2, 4, 5, 9})})
+
+# Equality
+graph = DiGraph()
+graph.add_edge(1, 2)
+graph.add_edge(2, 3)
+graph2 = DiGraph()
+graph2.add_edge(2, 3)
+graph2.add_edge(1, 2)
+assert graph == graph2
+
+# Copy
+graph4 = graph.copy()
+assert graph == graph4
+
+# Merge
+graph3 = DiGraph()
+graph3.add_edge(3, 1)
+graph3.add_edge(1, 4)
+graph4 += graph3
+for node in graph3.nodes():
+    assert node in graph4.nodes()
+for edge in graph3.edges():
+    assert edge in graph4.edges()
+assert graph4.nodes() == graph.nodes().union(graph3.nodes())
+assert sorted(graph4.edges()) == sorted(graph.edges() + graph3.edges())
diff --git a/test/test_all.py b/test/test_all.py
index 6b94ad1f..9d7c1256 100644
--- a/test/test_all.py
+++ b/test/test_all.py
@@ -194,6 +194,9 @@ for script in ["interval.py",
                "test_types.py",
                ]:
     testset += RegressionTest([script], base_dir="core")
+testset += RegressionTest(["asmbloc.py"], base_dir="core",
+                          products=["graph.dot", "graph2.dot",
+                                    "graph3.dot", "graph4.dot"])
 ## Expression
 for script in ["modint.py",
                "expression.py",
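To condense the pendings bookkeeping exercised by the new test/core/asmbloc.py: a constraint that points to a label not yet present in the graph is parked in pendings, makes sanity_check() raise, and becomes a real edge as soon as the missing block is added. A small hand-built sketch (the block labels are made up for the example):

    from miasm2.core.asmbloc import AsmCFG, asm_bloc, asm_label, \
        asm_constraint, asm_constraint_to

    cfg = AsmCFG()
    src = asm_bloc(asm_label("src"))
    dst = asm_bloc(asm_label("dst"))
    src.bto.add(asm_constraint_to(dst.label))   # src -> dst, dst unknown so far

    cfg.add_node(src)
    assert dst.label in cfg.pendings            # edge deferred until dst is known
    try:
        cfg.sanity_check()                      # pendings are reported as errors
    except RuntimeError:
        pass

    cfg.add_node(dst)                           # resolves the pending constraint
    assert not cfg.pendings
    assert cfg.edges2constraint[(src, dst)] == asm_constraint.c_to
    cfg.sanity_check()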
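The simplification machinery added to miasm2/core/graph.py is also worth a short sketch: a pass receives the simplifier and a working copy of the graph, and the enabled passes are re-applied until the graph reaches a fixed point. This mirrors remove_useless_blocks() from the new test; blocks is assumed to be an AsmCFG produced by dis_multibloc():

    from miasm2.core.graph import DiGraphSimplifier

    def remove_useless_blocks(simplifier, graph):
        # Drop leaf blocks whose last instruction is not a RET
        for block in graph.leaves():
            if block.lines[-1].name != "RET":
                graph.del_node(block)

    simp = DiGraphSimplifier()
    simp.enable_passes([remove_useless_blocks])
    blocks = simp(blocks)   # re-applied until the graph stops changing

The commit also ships a ready-made bbl_simplifier instance built on the same class, whose _merge_blocks pass fuses straight-line blocks, as exercised at the end of the new test file.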