diff options
Diffstat (limited to 'miasm2/core/asmbloc.py')
| -rw-r--r-- | miasm2/core/asmbloc.py | 412 |
1 files changed, 210 insertions, 202 deletions
diff --git a/miasm2/core/asmbloc.py b/miasm2/core/asmbloc.py index 96c2f4ec..54cd51cf 100644 --- a/miasm2/core/asmbloc.py +++ b/miasm2/core/asmbloc.py @@ -410,183 +410,6 @@ class asm_symbol_pool: return label -def dis_bloc(mnemo, pool_bin, label, offset, job_done, symbol_pool, - dont_dis=None, split_dis=None, follow_call=False, - dontdis_retcall=False, lines_wd=None, dis_bloc_callback=None, - dont_dis_nulstart_bloc=False, attrib=None): - # pool_bin.offset = offset - if dont_dis is None: - dont_dis = [] - if split_dis is None: - split_dis = [] - if attrib is None: - attrib = {} - lines_cpt = 0 - in_delayslot = False - delayslot_count = mnemo.delayslot - offsets_to_dis = set() - add_next_offset = False - cur_block = asm_bloc(label) - log_asmbloc.debug("dis at %X", int(offset)) - while not in_delayslot or delayslot_count > 0: - if in_delayslot: - delayslot_count -= 1 - - if offset in dont_dis: - if not cur_block.lines: - job_done.add(offset) - # Block is empty -> bad block - cur_block = asm_block_bad(label, errno=2) - else: - # Block is not empty, stop the desassembly pass and add a - # constraint to the next block - cur_block.add_cst(offset, asm_constraint.c_next, symbol_pool) - break - - if lines_cpt > 0 and offset in split_dis: - cur_block.add_cst(offset, asm_constraint.c_next, symbol_pool) - offsets_to_dis.add(offset) - break - - lines_cpt += 1 - if lines_wd is not None and lines_cpt > lines_wd: - # log_asmbloc.warning( "lines watchdog reached at %X"%int(offset)) - break - - if offset in job_done: - cur_block.add_cst(offset, asm_constraint.c_next, symbol_pool) - break - - off_i = offset - try: - instr = mnemo.dis(pool_bin, attrib, offset) - except (Disasm_Exception, IOError), e: - log_asmbloc.warning(e) - instr = None - - if instr is None: - log_asmbloc.warning("cannot disasm at %X", int(off_i)) - if not cur_block.lines: - job_done.add(offset) - # Block is empty -> bad block - cur_block = asm_block_bad(label, errno=0) - else: - # Block is not empty, stop the desassembly pass and add a - # constraint to the next block - cur_block.add_cst(off_i, asm_constraint.c_next, symbol_pool) - break - - # XXX TODO nul start block option - if dont_dis_nulstart_bloc and instr.b.count('\x00') == instr.l: - log_asmbloc.warning("reach nul instr at %X", int(off_i)) - if not cur_block.lines: - # Block is empty -> bad block - cur_block = asm_block_bad(label, errno=1) - else: - # Block is not empty, stop the desassembly pass and add a - # constraint to the next block - cur_block.add_cst(off_i, asm_constraint.c_next, symbol_pool) - break - - # special case: flow graph modificator in delayslot - if in_delayslot and instr and (instr.splitflow() or instr.breakflow()): - add_next_offset = True - break - - job_done.add(offset) - log_asmbloc.debug("dis at %X", int(offset)) - - offset += instr.l - log_asmbloc.debug(instr) - log_asmbloc.debug(instr.args) - - cur_block.addline(instr) - if not instr.breakflow(): - continue - # test split - if instr.splitflow() and not (instr.is_subcall() and dontdis_retcall): - add_next_offset = True - # cur_bloc.add_cst(n, asm_constraint.c_next, symbol_pool) - pass - if instr.dstflow(): - instr.dstflow2label(symbol_pool) - dst = instr.getdstflow(symbol_pool) - dstn = [] - for d in dst: - if isinstance(d, m2_expr.ExprId) and \ - isinstance(d.name, asm_label): - dstn.append(d.name) - dst = dstn - if (not instr.is_subcall()) or follow_call: - cur_block.bto.update( - [asm_constraint(x, asm_constraint.c_to) for x in dst]) - - # get in delayslot mode - in_delayslot = True - delayslot_count = instr.delayslot - - for c in cur_block.bto: - offsets_to_dis.add(c.label.offset) - - if add_next_offset: - cur_block.add_cst(offset, asm_constraint.c_next, symbol_pool) - offsets_to_dis.add(offset) - - # Fix multiple constraints - cur_block.fix_constraints() - - if dis_bloc_callback is not None: - dis_bloc_callback(mn=mnemo, attrib=attrib, pool_bin=pool_bin, - cur_bloc=cur_block, offsets_to_dis=offsets_to_dis, - symbol_pool=symbol_pool) - # print 'dst', [hex(x) for x in offsets_to_dis] - return cur_block, offsets_to_dis - - -def dis_bloc_all(mnemo, pool_bin, offset, job_done, symbol_pool, dont_dis=None, - split_dis=None, follow_call=False, dontdis_retcall=False, - blocs_wd=None, lines_wd=None, blocs=None, - dis_bloc_callback=None, dont_dis_nulstart_bloc=False, - attrib=None): - log_asmbloc.info("dis bloc all") - if dont_dis is None: - dont_dis = [] - if split_dis is None: - split_dis = [] - if attrib is None: - attrib = {} - if blocs is None: - blocs = AsmCFG() - todo = [offset] - - bloc_cpt = 0 - while len(todo): - bloc_cpt += 1 - if blocs_wd is not None and bloc_cpt > blocs_wd: - log_asmbloc.debug("blocs watchdog reached at %X", int(offset)) - break - - n = int(todo.pop(0)) - if n is None: - continue - if n in job_done: - continue - label = symbol_pool.getby_offset_create(n) - cur_block, nexts = dis_bloc(mnemo, pool_bin, label, n, job_done, - symbol_pool, dont_dis, split_dis, - follow_call, dontdis_retcall, - dis_bloc_callback=dis_bloc_callback, - lines_wd=lines_wd, - dont_dis_nulstart_bloc=dont_dis_nulstart_bloc, - attrib=attrib) - todo += nexts - blocs.add_node(cur_block) - - blocs.apply_splitting(symbol_pool, dis_block_callback=dis_bloc_callback, - mn=mnemo, attrib=attrib, pool_bin=pool_bin) - return blocs - - class AsmCFG(DiGraph): """Directed graph standing for a ASM Control Flow Graph with: @@ -1433,11 +1256,50 @@ def asm_resolve_final(mnemo, blocks, symbol_pool, dst_interval=None): class disasmEngine(object): - def __init__(self, arch, attrib, bs=None, **kwargs): + """Disassembly engine, taking care of disassembler options and mutli-block + strategy. + + Engine options: + + + Object supporting membership test (offset in ..) + - dont_dis: stop the current disassembly branch if reached + - split_dis: force a basic block end if reached, + with a next constraint on its successor + + + On/Off + - follow_call: recursively disassemble CALL destinations + - dontdis_retcall: stop on CALL return addresses + - dont_dis_nulstart_bloc: stop if a block begin with a few \x00 + + + Number + - lines_wd: maximum block's size (in number of instruction) + - blocs_wd: maximum number of distinct disassembled block + + + callback(arch, attrib, pool_bin, cur_bloc, offsets_to_dis, + symbol_pool) + - dis_bloc_callback: callback after each new disassembled block + + The engine also tracks already handled block, for performance and to avoid + infinite cycling. + Addresses of disassembled block is in the attribute `job_done`. + To force a new disassembly, the targeted offset must first be removed from + this structure. + """ + + def __init__(self, arch, attrib, bin_stream, **kwargs): + """Instanciate a new disassembly engine + @arch: targeted architecture + @attrib: architecture attribute + @bin_stream: bytes source + @kwargs: (optional) custom options + """ self.arch = arch self.attrib = attrib - self.bs = bs + self.bin_stream = bin_stream self.symbol_pool = asm_symbol_pool() + self.job_done = set() + + # Setup options self.dont_dis = [] self.split_dis = [] self.follow_call = False @@ -1446,33 +1308,179 @@ class disasmEngine(object): self.blocs_wd = None self.dis_bloc_callback = None self.dont_dis_nulstart_bloc = False - self.job_done = set() + + # Override options if needed self.__dict__.update(kwargs) - def dis_bloc(self, offset): + def _dis_bloc(self, offset): + """Disassemble the block at offset @offset + Return the created asm_bloc and future offsets to disassemble + """ + + lines_cpt = 0 + in_delayslot = False + delayslot_count = self.arch.delayslot + offsets_to_dis = set() + add_next_offset = False label = self.symbol_pool.getby_offset_create(offset) - current_block, _ = dis_bloc(self.arch, self.bs, label, offset, - self.job_done, self.symbol_pool, - dont_dis=self.dont_dis, - split_dis=self.split_dis, - follow_call=self.follow_call, - dontdis_retcall=self.dontdis_retcall, - lines_wd=self.lines_wd, - dis_bloc_callback=self.dis_bloc_callback, - dont_dis_nulstart_bloc=self.dont_dis_nulstart_bloc, - attrib=self.attrib) + cur_block = asm_bloc(label) + log_asmbloc.debug("dis at %X", int(offset)) + while not in_delayslot or delayslot_count > 0: + if in_delayslot: + delayslot_count -= 1 + + if offset in self.dont_dis: + if not cur_block.lines: + self.job_done.add(offset) + # Block is empty -> bad block + cur_block = asm_block_bad(label, errno=2) + else: + # Block is not empty, stop the desassembly pass and add a + # constraint to the next block + cur_block.add_cst(offset, asm_constraint.c_next, + self.symbol_pool) + break + + if lines_cpt > 0 and offset in self.split_dis: + cur_block.add_cst(offset, asm_constraint.c_next, + self.symbol_pool) + offsets_to_dis.add(offset) + break + + lines_cpt += 1 + if self.lines_wd is not None and lines_cpt > self.lines_wd: + log_asmbloc.debug("lines watchdog reached at %X", int(offset)) + break + + if offset in self.job_done: + cur_block.add_cst(offset, asm_constraint.c_next, + self.symbol_pool) + break + + off_i = offset + try: + instr = self.arch.dis(self.bin_stream, self.attrib, offset) + except (Disasm_Exception, IOError), e: + log_asmbloc.warning(e) + instr = None + + if instr is None: + log_asmbloc.warning("cannot disasm at %X", int(off_i)) + if not cur_block.lines: + self.job_done.add(offset) + # Block is empty -> bad block + cur_block = asm_block_bad(label, errno=0) + else: + # Block is not empty, stop the desassembly pass and add a + # constraint to the next block + cur_block.add_cst(off_i, asm_constraint.c_next, + self.symbol_pool) + break + + # XXX TODO nul start block option + if self.dont_dis_nulstart_bloc and instr.b.count('\x00') == instr.l: + log_asmbloc.warning("reach nul instr at %X", int(off_i)) + if not cur_block.lines: + # Block is empty -> bad block + cur_block = asm_block_bad(label, errno=1) + else: + # Block is not empty, stop the desassembly pass and add a + # constraint to the next block + cur_block.add_cst(off_i, asm_constraint.c_next, + self.symbol_pool) + break + + # special case: flow graph modificator in delayslot + if in_delayslot and instr and (instr.splitflow() or instr.breakflow()): + add_next_offset = True + break + + self.job_done.add(offset) + log_asmbloc.debug("dis at %X", int(offset)) + + offset += instr.l + log_asmbloc.debug(instr) + log_asmbloc.debug(instr.args) + + cur_block.addline(instr) + if not instr.breakflow(): + continue + # test split + if instr.splitflow() and not (instr.is_subcall() and self.dontdis_retcall): + add_next_offset = True + pass + if instr.dstflow(): + instr.dstflow2label(self.symbol_pool) + dst = instr.getdstflow(self.symbol_pool) + dstn = [] + for d in dst: + if isinstance(d, m2_expr.ExprId) and \ + isinstance(d.name, asm_label): + dstn.append(d.name) + dst = dstn + if (not instr.is_subcall()) or self.follow_call: + cur_block.bto.update( + [asm_constraint(x, asm_constraint.c_to) for x in dst]) + + # get in delayslot mode + in_delayslot = True + delayslot_count = instr.delayslot + + for c in cur_block.bto: + offsets_to_dis.add(c.label.offset) + + if add_next_offset: + cur_block.add_cst(offset, asm_constraint.c_next, self.symbol_pool) + offsets_to_dis.add(offset) + + # Fix multiple constraints + cur_block.fix_constraints() + + if self.dis_bloc_callback is not None: + self.dis_bloc_callback(mn=self.arch, attrib=self.attrib, + pool_bin=self.bin_stream, cur_bloc=cur_block, + offsets_to_dis=offsets_to_dis, + symbol_pool=self.symbol_pool) + return cur_block, offsets_to_dis + + def dis_bloc(self, offset): + """Disassemble the block at offset @offset and return the created + asm_bloc + @offset: targeted offset to disassemble + """ + current_block, _ = self._dis_bloc(offset) return current_block def dis_multibloc(self, offset, blocs=None): - blocs = dis_bloc_all(self.arch, self.bs, offset, self.job_done, - self.symbol_pool, - dont_dis=self.dont_dis, split_dis=self.split_dis, - follow_call=self.follow_call, - dontdis_retcall=self.dontdis_retcall, - blocs_wd=self.blocs_wd, - lines_wd=self.lines_wd, - blocs=blocs, - dis_bloc_callback=self.dis_bloc_callback, - dont_dis_nulstart_bloc=self.dont_dis_nulstart_bloc, - attrib=self.attrib) + """Disassemble every block reachable from @offset regarding + specific disasmEngine conditions + Return an AsmCFG instance containing disassembled blocks + @offset: starting offset + @blocs: (optional) AsmCFG instance of already disassembled blocks to + merge with + """ + log_asmbloc.info("dis bloc all") + if blocs is None: + blocs = AsmCFG() + todo = [offset] + + bloc_cpt = 0 + while len(todo): + bloc_cpt += 1 + if self.blocs_wd is not None and bloc_cpt > self.blocs_wd: + log_asmbloc.debug("blocs watchdog reached at %X", int(offset)) + break + + target_offset = int(todo.pop(0)) + if (target_offset is None or + target_offset in self.job_done): + continue + cur_block, nexts = self._dis_bloc(target_offset) + todo += nexts + blocs.add_node(cur_block) + + blocs.apply_splitting(self.symbol_pool, + dis_block_callback=self.dis_bloc_callback, + mn=self.arch, attrib=self.attrib, + pool_bin=self.bin_stream) return blocs |