about summary refs log tree commit diff stats
path: root/miasm2/analysis/dse.py
diff options
context:
space:
mode:
Diffstat (limited to 'miasm2/analysis/dse.py')
-rw-r--r--miasm2/analysis/dse.py81
1 files changed, 50 insertions, 31 deletions
diff --git a/miasm2/analysis/dse.py b/miasm2/analysis/dse.py
index 427a8bd0..0c01610f 100644
--- a/miasm2/analysis/dse.py
+++ b/miasm2/analysis/dse.py
@@ -56,15 +56,14 @@ except ImportError:
     z3 = None
 
 from miasm2.expression.expression import ExprMem, ExprInt, ExprCompose, \
-    ExprAff, ExprId
+    ExprAff, ExprId, ExprLoc, LocKey
 from miasm2.core.bin_stream import bin_stream_vm
-from miasm2.core.asmblock import expr_is_label
 from miasm2.jitter.emulatedsymbexec import EmulatedSymbExec
 from miasm2.expression.expression_helper import possible_values
 from miasm2.ir.translators import Translator
 from miasm2.analysis.expression_range import expr_range
 from miasm2.analysis.modularintervals import ModularIntervals
-
+from miasm2.core.locationdb import LocationDB
 
 DriftInfo = namedtuple("DriftInfo", ["symbol", "computed", "expected"])
 
@@ -72,7 +71,7 @@ class DriftException(Exception):
     """Raised when the emulation drift from the reference engine"""
 
     def __init__(self, info):
-        super(Exception, self).__init__()
+        super(DriftException, self).__init__()
         self.info = info
 
     def __str__(self):
@@ -150,10 +149,12 @@ class DSEEngine(object):
 
     def __init__(self, machine):
         self.machine = machine
+        self.loc_db = LocationDB()
         self.handler = {} # addr -> callback(DSEEngine instance)
         self.instrumentation = {} # addr -> callback(DSEEngine instance)
         self.addr_to_cacheblocks = {} # addr -> {label -> IRBlock}
-        self.ir_arch = self.machine.ir() # corresponding IR
+        self.ir_arch = self.machine.ir(loc_db=self.loc_db) # corresponding IR
+        self.ircfg = self.ir_arch.new_ircfg() # corresponding IR
 
         # Defined after attachment
         self.jitter = None # Jitload (concrete execution)
@@ -165,20 +166,24 @@ class DSEEngine(object):
         """Prepare the environment for attachment with a jitter"""
         # Disassembler
         self.mdis = self.machine.dis_engine(bin_stream_vm(self.jitter.vm),
-                                            lines_wd=1)
+                                            lines_wd=1,
+                                            loc_db=self.loc_db)
 
         # Symbexec engine
         ## Prepare symbexec engines
         self.symb = self.SYMB_ENGINE(self.jitter.cpu, self.jitter.vm,
                                      self.ir_arch, {})
         self.symb.enable_emulated_simplifications()
-        self.symb_concrete = EmulatedSymbExec(self.jitter.cpu, self.jitter.vm,
-                                              self.ir_arch, {})
+        self.symb_concrete = EmulatedSymbExec(
+            self.jitter.cpu, self.jitter.vm,
+            self.ir_arch, {}
+        )
 
         ## Update registers value
-        self.symb.symbols[self.ir_arch.IRDst] = ExprInt(getattr(self.jitter.cpu,
-                                                                self.ir_arch.pc.name),
-                                                        self.ir_arch.IRDst.size)
+        self.symb.symbols[self.ir_arch.IRDst] = ExprInt(
+            getattr(self.jitter.cpu, self.ir_arch.pc.name),
+            self.ir_arch.IRDst.size
+        )
 
         # Avoid memory write
         self.symb.func_write = None
@@ -188,7 +193,7 @@ class DSEEngine(object):
         self.jitter.exec_cb = self.callback
 
         # Clean jit cache to avoid multi-line basic blocks already jitted
-        self.jitter.jit.lbl2jitbloc.clear()
+        self.jitter.jit.clear_jitted_blocks()
 
     def attach(self, emulator):
         """Attach the DSE to @emulator
@@ -215,9 +220,9 @@ class DSEEngine(object):
         self.prepare()
 
     def handle(self, cur_addr):
-        """Handle destination
+        r"""Handle destination
         @cur_addr: Expr of the next address in concrete execution
-        /!\ cur_addr may be a lbl_gen
+        /!\ cur_addr may be a loc_key
 
         In this method, self.symb is in the "just before branching" state
         """
@@ -295,6 +300,9 @@ class DSEEngine(object):
 
         # Call callbacks associated to the current address
         cur_addr = self.jitter.pc
+        if isinstance(cur_addr, LocKey):
+            lbl = self.ir_arch.loc_db.loc_key_to_label(cur_addr)
+            cur_addr = lbl.offset
 
         if cur_addr in self.handler:
             self.handler[cur_addr](self)
@@ -312,24 +320,24 @@ class DSEEngine(object):
 
         # Get IR blocks
         if cur_addr in self.addr_to_cacheblocks:
-            self.ir_arch.blocks.clear()
-            self.ir_arch.blocks.update(self.addr_to_cacheblocks[cur_addr])
+            self.ircfg.blocks.clear()
+            self.ircfg.blocks.update(self.addr_to_cacheblocks[cur_addr])
         else:
 
             ## Reset cache structures
-            self.ir_arch.blocks.clear()# = {}
+            self.ircfg.blocks.clear()# = {}
 
             ## Update current state
             asm_block = self.mdis.dis_block(cur_addr)
-            self.ir_arch.add_block(asm_block)
-            self.addr_to_cacheblocks[cur_addr] = dict(self.ir_arch.blocks)
+            self.ir_arch.add_asmblock_to_ircfg(asm_block, self.ircfg)
+            self.addr_to_cacheblocks[cur_addr] = dict(self.ircfg.blocks)
 
         # Emulate the current instruction
         self.symb.reset_modified()
 
         # Is the symbolic execution going (potentially) to jump on a lbl_gen?
-        if len(self.ir_arch.blocks) == 1:
-            next_addr = self.symb.run_at(cur_addr)
+        if len(self.ircfg.blocks) == 1:
+            self.symb.run_at(self.ircfg, cur_addr)
         else:
             # Emulation could stuck in generated IR blocks
             # But concrete execution callback is not enough precise to obtain
@@ -339,11 +347,16 @@ class DSEEngine(object):
             # Update the concrete execution
             self._update_state_from_concrete_symb(self.symb_concrete)
             while True:
-                next_addr_concrete = self.symb_concrete.run_block_at(cur_addr)
-                self.symb.run_block_at(cur_addr)
 
-                if not(expr_is_label(next_addr_concrete) and
-                       next_addr_concrete.name.offset is None):
+                next_addr_concrete = self.symb_concrete.run_block_at(
+                    self.ircfg, cur_addr
+                )
+                self.symb.run_block_at(self.ircfg, cur_addr)
+
+                if not (isinstance(next_addr_concrete, ExprLoc) and
+                        self.ir_arch.loc_db.get_location_offset(
+                            next_addr_concrete.loc_key
+                        ) is None):
                     # Not a lbl_gen, exit
                     break
 
@@ -351,6 +364,7 @@ class DSEEngine(object):
                 self.handle(next_addr_concrete)
                 cur_addr = next_addr_concrete
 
+
         # At this stage, symbolic engine is one instruction after the concrete
         # engine
 
@@ -428,7 +442,7 @@ class DSEEngine(object):
                     symbexec.symbols[reg] = value
 
     def update_state_from_concrete(self, cpu=True, mem=False):
-        """Update the symbolic state with concrete values from the concrete
+        r"""Update the symbolic state with concrete values from the concrete
         engine
 
         @cpu: (optional) if set, update registers' value
@@ -596,13 +610,19 @@ class DSEPathConstraint(DSEEngine):
             self.cur_solver.add(self.z3_trans.from_expr(cons))
 
     def handle(self, cur_addr):
+        cur_addr = self.ir_arch.loc_db.canonize_to_exprloc(cur_addr)
         symb_pc = self.eval_expr(self.ir_arch.IRDst)
         possibilities = possible_values(symb_pc)
         cur_path_constraint = set() # path_constraint for the concrete path
         if len(possibilities) == 1:
-            assert next(iter(possibilities)).value == cur_addr
+            dst = next(iter(possibilities)).value
+            dst = self.ir_arch.loc_db.canonize_to_exprloc(dst)
+            assert dst == cur_addr
         else:
             for possibility in possibilities:
+                target_addr = self.ir_arch.loc_db.canonize_to_exprloc(
+                    possibility.value
+                )
                 path_constraint = set() # Set of ExprAff for the possible path
 
                 # Get constraint associated to the possible path
@@ -642,11 +662,11 @@ class DSEPathConstraint(DSEEngine):
                                             "address 0x%x" % address)
                         path_constraint.add(ExprAff(expr_mem, value))
 
-                if possibility.value == cur_addr:
+                if target_addr == cur_addr:
                     # Add path constraint
                     cur_path_constraint = path_constraint
 
-                elif self.produce_solution(possibility.value):
+                elif self.produce_solution(target_addr):
                     # Looking for a new solution
                     self.cur_solver.push()
                     for cons in path_constraint:
@@ -657,8 +677,7 @@ class DSEPathConstraint(DSEEngine):
                     result = self.cur_solver.check()
                     if result == z3.sat:
                         model = self.cur_solver.model()
-                        self.handle_solution(model, possibility.value)
+                        self.handle_solution(model, target_addr)
                     self.cur_solver.pop()
 
         self.handle_correct_destination(cur_addr, cur_path_constraint)
-