about summary refs log tree commit diff stats
path: root/miasm2/core/asmbloc.py
diff options
context:
space:
mode:
Diffstat (limited to 'miasm2/core/asmbloc.py')
-rw-r--r--miasm2/core/asmbloc.py412
1 files changed, 210 insertions, 202 deletions
diff --git a/miasm2/core/asmbloc.py b/miasm2/core/asmbloc.py
index 96c2f4ec..54cd51cf 100644
--- a/miasm2/core/asmbloc.py
+++ b/miasm2/core/asmbloc.py
@@ -410,183 +410,6 @@ class asm_symbol_pool:
         return label
 
 
-def dis_bloc(mnemo, pool_bin, label, offset, job_done, symbol_pool,
-             dont_dis=None, split_dis=None, follow_call=False,
-             dontdis_retcall=False, lines_wd=None, dis_bloc_callback=None,
-             dont_dis_nulstart_bloc=False, attrib=None):
-    # pool_bin.offset = offset
-    if dont_dis is None:
-        dont_dis = []
-    if split_dis is None:
-        split_dis = []
-    if attrib is None:
-        attrib = {}
-    lines_cpt = 0
-    in_delayslot = False
-    delayslot_count = mnemo.delayslot
-    offsets_to_dis = set()
-    add_next_offset = False
-    cur_block = asm_bloc(label)
-    log_asmbloc.debug("dis at %X", int(offset))
-    while not in_delayslot or delayslot_count > 0:
-        if in_delayslot:
-            delayslot_count -= 1
-
-        if offset in dont_dis:
-            if not cur_block.lines:
-                job_done.add(offset)
-                # Block is empty -> bad block
-                cur_block = asm_block_bad(label, errno=2)
-            else:
-                # Block is not empty, stop the desassembly pass and add a
-                # constraint to the next block
-                cur_block.add_cst(offset, asm_constraint.c_next, symbol_pool)
-            break
-
-        if lines_cpt > 0 and offset in split_dis:
-            cur_block.add_cst(offset, asm_constraint.c_next, symbol_pool)
-            offsets_to_dis.add(offset)
-            break
-
-        lines_cpt += 1
-        if lines_wd is not None and lines_cpt > lines_wd:
-            # log_asmbloc.warning( "lines watchdog reached at %X"%int(offset))
-            break
-
-        if offset in job_done:
-            cur_block.add_cst(offset, asm_constraint.c_next, symbol_pool)
-            break
-
-        off_i = offset
-        try:
-            instr = mnemo.dis(pool_bin, attrib, offset)
-        except (Disasm_Exception, IOError), e:
-            log_asmbloc.warning(e)
-            instr = None
-
-        if instr is None:
-            log_asmbloc.warning("cannot disasm at %X", int(off_i))
-            if not cur_block.lines:
-                job_done.add(offset)
-                # Block is empty -> bad block
-                cur_block = asm_block_bad(label, errno=0)
-            else:
-                # Block is not empty, stop the desassembly pass and add a
-                # constraint to the next block
-                cur_block.add_cst(off_i, asm_constraint.c_next, symbol_pool)
-            break
-
-        # XXX TODO nul start block option
-        if dont_dis_nulstart_bloc and instr.b.count('\x00') == instr.l:
-            log_asmbloc.warning("reach nul instr at %X", int(off_i))
-            if not cur_block.lines:
-                # Block is empty -> bad block
-                cur_block = asm_block_bad(label, errno=1)
-            else:
-                # Block is not empty, stop the desassembly pass and add a
-                # constraint to the next block
-                cur_block.add_cst(off_i, asm_constraint.c_next, symbol_pool)
-            break
-
-        # special case: flow graph modificator in delayslot
-        if in_delayslot and instr and (instr.splitflow() or instr.breakflow()):
-            add_next_offset = True
-            break
-
-        job_done.add(offset)
-        log_asmbloc.debug("dis at %X", int(offset))
-
-        offset += instr.l
-        log_asmbloc.debug(instr)
-        log_asmbloc.debug(instr.args)
-
-        cur_block.addline(instr)
-        if not instr.breakflow():
-            continue
-        # test split
-        if instr.splitflow() and not (instr.is_subcall() and dontdis_retcall):
-            add_next_offset = True
-            # cur_bloc.add_cst(n, asm_constraint.c_next, symbol_pool)
-            pass
-        if instr.dstflow():
-            instr.dstflow2label(symbol_pool)
-            dst = instr.getdstflow(symbol_pool)
-            dstn = []
-            for d in dst:
-                if isinstance(d, m2_expr.ExprId) and \
-                        isinstance(d.name, asm_label):
-                    dstn.append(d.name)
-            dst = dstn
-            if (not instr.is_subcall()) or follow_call:
-                cur_block.bto.update(
-                    [asm_constraint(x, asm_constraint.c_to) for x in dst])
-
-        # get in delayslot mode
-        in_delayslot = True
-        delayslot_count = instr.delayslot
-
-    for c in cur_block.bto:
-        offsets_to_dis.add(c.label.offset)
-
-    if add_next_offset:
-        cur_block.add_cst(offset, asm_constraint.c_next, symbol_pool)
-        offsets_to_dis.add(offset)
-
-    # Fix multiple constraints
-    cur_block.fix_constraints()
-
-    if dis_bloc_callback is not None:
-        dis_bloc_callback(mn=mnemo, attrib=attrib, pool_bin=pool_bin,
-                          cur_bloc=cur_block, offsets_to_dis=offsets_to_dis,
-                          symbol_pool=symbol_pool)
-    # print 'dst', [hex(x) for x in offsets_to_dis]
-    return cur_block, offsets_to_dis
-
-
-def dis_bloc_all(mnemo, pool_bin, offset, job_done, symbol_pool, dont_dis=None,
-                 split_dis=None, follow_call=False, dontdis_retcall=False,
-                 blocs_wd=None, lines_wd=None, blocs=None,
-                 dis_bloc_callback=None, dont_dis_nulstart_bloc=False,
-                 attrib=None):
-    log_asmbloc.info("dis bloc all")
-    if dont_dis is None:
-        dont_dis = []
-    if split_dis is None:
-        split_dis = []
-    if attrib is None:
-        attrib = {}
-    if blocs is None:
-        blocs = AsmCFG()
-    todo = [offset]
-
-    bloc_cpt = 0
-    while len(todo):
-        bloc_cpt += 1
-        if blocs_wd is not None and bloc_cpt > blocs_wd:
-            log_asmbloc.debug("blocs watchdog reached at %X", int(offset))
-            break
-
-        n = int(todo.pop(0))
-        if n is None:
-            continue
-        if n in job_done:
-            continue
-        label = symbol_pool.getby_offset_create(n)
-        cur_block, nexts = dis_bloc(mnemo, pool_bin, label, n, job_done,
-                                    symbol_pool, dont_dis, split_dis,
-                                    follow_call, dontdis_retcall,
-                                    dis_bloc_callback=dis_bloc_callback,
-                                    lines_wd=lines_wd,
-                                    dont_dis_nulstart_bloc=dont_dis_nulstart_bloc,
-                                    attrib=attrib)
-        todo += nexts
-        blocs.add_node(cur_block)
-
-    blocs.apply_splitting(symbol_pool, dis_block_callback=dis_bloc_callback,
-                          mn=mnemo, attrib=attrib, pool_bin=pool_bin)
-    return blocs
-
-
 class AsmCFG(DiGraph):
 
     """Directed graph standing for a ASM Control Flow Graph with:
@@ -1433,11 +1256,50 @@ def asm_resolve_final(mnemo, blocks, symbol_pool, dst_interval=None):
 
 class disasmEngine(object):
 
-    def __init__(self, arch, attrib, bs=None, **kwargs):
+    """Disassembly engine, taking care of disassembler options and mutli-block
+    strategy.
+
+    Engine options:
+
+    + Object supporting membership test (offset in ..)
+     - dont_dis: stop the current disassembly branch if reached
+     - split_dis: force a basic block end if reached,
+                  with a next constraint on its successor
+
+    + On/Off
+     - follow_call: recursively disassemble CALL destinations
+     - dontdis_retcall: stop on CALL return addresses
+     - dont_dis_nulstart_bloc: stop if a block begin with a few \x00
+
+    + Number
+     - lines_wd: maximum block's size (in number of instruction)
+     - blocs_wd: maximum number of distinct disassembled block
+
+    + callback(arch, attrib, pool_bin, cur_bloc, offsets_to_dis,
+               symbol_pool)
+     - dis_bloc_callback: callback after each new disassembled block
+
+    The engine also tracks already handled block, for performance and to avoid
+    infinite cycling.
+    Addresses of disassembled block is in the attribute `job_done`.
+    To force a new disassembly, the targeted offset must first be removed from
+    this structure.
+    """
+
+    def __init__(self, arch, attrib, bin_stream, **kwargs):
+        """Instanciate a new disassembly engine
+        @arch: targeted architecture
+        @attrib: architecture attribute
+        @bin_stream: bytes source
+        @kwargs: (optional) custom options
+        """
         self.arch = arch
         self.attrib = attrib
-        self.bs = bs
+        self.bin_stream = bin_stream
         self.symbol_pool = asm_symbol_pool()
+        self.job_done = set()
+
+        # Setup options
         self.dont_dis = []
         self.split_dis = []
         self.follow_call = False
@@ -1446,33 +1308,179 @@ class disasmEngine(object):
         self.blocs_wd = None
         self.dis_bloc_callback = None
         self.dont_dis_nulstart_bloc = False
-        self.job_done = set()
+
+        # Override options if needed
         self.__dict__.update(kwargs)
 
-    def dis_bloc(self, offset):
+    def _dis_bloc(self, offset):
+        """Disassemble the block at offset @offset
+        Return the created asm_bloc and future offsets to disassemble
+        """
+
+        lines_cpt = 0
+        in_delayslot = False
+        delayslot_count = self.arch.delayslot
+        offsets_to_dis = set()
+        add_next_offset = False
         label = self.symbol_pool.getby_offset_create(offset)
-        current_block, _ = dis_bloc(self.arch, self.bs, label, offset,
-                                    self.job_done, self.symbol_pool,
-                                    dont_dis=self.dont_dis,
-                                    split_dis=self.split_dis,
-                                    follow_call=self.follow_call,
-                                    dontdis_retcall=self.dontdis_retcall,
-                                    lines_wd=self.lines_wd,
-                                    dis_bloc_callback=self.dis_bloc_callback,
-                                    dont_dis_nulstart_bloc=self.dont_dis_nulstart_bloc,
-                                    attrib=self.attrib)
+        cur_block = asm_bloc(label)
+        log_asmbloc.debug("dis at %X", int(offset))
+        while not in_delayslot or delayslot_count > 0:
+            if in_delayslot:
+                delayslot_count -= 1
+
+            if offset in self.dont_dis:
+                if not cur_block.lines:
+                    self.job_done.add(offset)
+                    # Block is empty -> bad block
+                    cur_block = asm_block_bad(label, errno=2)
+                else:
+                    # Block is not empty, stop the desassembly pass and add a
+                    # constraint to the next block
+                    cur_block.add_cst(offset, asm_constraint.c_next,
+                                      self.symbol_pool)
+                break
+
+            if lines_cpt > 0 and offset in self.split_dis:
+                cur_block.add_cst(offset, asm_constraint.c_next,
+                                  self.symbol_pool)
+                offsets_to_dis.add(offset)
+                break
+
+            lines_cpt += 1
+            if self.lines_wd is not None and lines_cpt > self.lines_wd:
+                log_asmbloc.debug("lines watchdog reached at %X", int(offset))
+                break
+
+            if offset in self.job_done:
+                cur_block.add_cst(offset, asm_constraint.c_next,
+                                  self.symbol_pool)
+                break
+
+            off_i = offset
+            try:
+                instr = self.arch.dis(self.bin_stream, self.attrib, offset)
+            except (Disasm_Exception, IOError), e:
+                log_asmbloc.warning(e)
+                instr = None
+
+            if instr is None:
+                log_asmbloc.warning("cannot disasm at %X", int(off_i))
+                if not cur_block.lines:
+                    self.job_done.add(offset)
+                    # Block is empty -> bad block
+                    cur_block = asm_block_bad(label, errno=0)
+                else:
+                    # Block is not empty, stop the desassembly pass and add a
+                    # constraint to the next block
+                    cur_block.add_cst(off_i, asm_constraint.c_next,
+                                      self.symbol_pool)
+                break
+
+            # XXX TODO nul start block option
+            if self.dont_dis_nulstart_bloc and instr.b.count('\x00') == instr.l:
+                log_asmbloc.warning("reach nul instr at %X", int(off_i))
+                if not cur_block.lines:
+                    # Block is empty -> bad block
+                    cur_block = asm_block_bad(label, errno=1)
+                else:
+                    # Block is not empty, stop the desassembly pass and add a
+                    # constraint to the next block
+                    cur_block.add_cst(off_i, asm_constraint.c_next,
+                                      self.symbol_pool)
+                break
+
+            # special case: flow graph modificator in delayslot
+            if in_delayslot and instr and (instr.splitflow() or instr.breakflow()):
+                add_next_offset = True
+                break
+
+            self.job_done.add(offset)
+            log_asmbloc.debug("dis at %X", int(offset))
+
+            offset += instr.l
+            log_asmbloc.debug(instr)
+            log_asmbloc.debug(instr.args)
+
+            cur_block.addline(instr)
+            if not instr.breakflow():
+                continue
+            # test split
+            if instr.splitflow() and not (instr.is_subcall() and self.dontdis_retcall):
+                add_next_offset = True
+                pass
+            if instr.dstflow():
+                instr.dstflow2label(self.symbol_pool)
+                dst = instr.getdstflow(self.symbol_pool)
+                dstn = []
+                for d in dst:
+                    if isinstance(d, m2_expr.ExprId) and \
+                            isinstance(d.name, asm_label):
+                        dstn.append(d.name)
+                dst = dstn
+                if (not instr.is_subcall()) or self.follow_call:
+                    cur_block.bto.update(
+                        [asm_constraint(x, asm_constraint.c_to) for x in dst])
+
+            # get in delayslot mode
+            in_delayslot = True
+            delayslot_count = instr.delayslot
+
+        for c in cur_block.bto:
+            offsets_to_dis.add(c.label.offset)
+
+        if add_next_offset:
+            cur_block.add_cst(offset, asm_constraint.c_next, self.symbol_pool)
+            offsets_to_dis.add(offset)
+
+        # Fix multiple constraints
+        cur_block.fix_constraints()
+
+        if self.dis_bloc_callback is not None:
+            self.dis_bloc_callback(mn=self.arch, attrib=self.attrib,
+                                   pool_bin=self.bin_stream, cur_bloc=cur_block,
+                                   offsets_to_dis=offsets_to_dis,
+                                   symbol_pool=self.symbol_pool)
+        return cur_block, offsets_to_dis
+
+    def dis_bloc(self, offset):
+        """Disassemble the block at offset @offset and return the created
+        asm_bloc
+        @offset: targeted offset to disassemble
+        """
+        current_block, _ = self._dis_bloc(offset)
         return current_block
 
     def dis_multibloc(self, offset, blocs=None):
-        blocs = dis_bloc_all(self.arch, self.bs, offset, self.job_done,
-                             self.symbol_pool,
-                             dont_dis=self.dont_dis, split_dis=self.split_dis,
-                             follow_call=self.follow_call,
-                             dontdis_retcall=self.dontdis_retcall,
-                             blocs_wd=self.blocs_wd,
-                             lines_wd=self.lines_wd,
-                             blocs=blocs,
-                             dis_bloc_callback=self.dis_bloc_callback,
-                             dont_dis_nulstart_bloc=self.dont_dis_nulstart_bloc,
-                             attrib=self.attrib)
+        """Disassemble every block reachable from @offset regarding
+        specific disasmEngine conditions
+        Return an AsmCFG instance containing disassembled blocks
+        @offset: starting offset
+        @blocs: (optional) AsmCFG instance of already disassembled blocks to
+                merge with
+        """
+        log_asmbloc.info("dis bloc all")
+        if blocs is None:
+            blocs = AsmCFG()
+        todo = [offset]
+
+        bloc_cpt = 0
+        while len(todo):
+            bloc_cpt += 1
+            if self.blocs_wd is not None and bloc_cpt > self.blocs_wd:
+                log_asmbloc.debug("blocs watchdog reached at %X", int(offset))
+                break
+
+            target_offset = int(todo.pop(0))
+            if (target_offset is None or
+                    target_offset in self.job_done):
+                continue
+            cur_block, nexts = self._dis_bloc(target_offset)
+            todo += nexts
+            blocs.add_node(cur_block)
+
+        blocs.apply_splitting(self.symbol_pool,
+                              dis_block_callback=self.dis_bloc_callback,
+                              mn=self.arch, attrib=self.attrib,
+                              pool_bin=self.bin_stream)
         return blocs