diff options
Diffstat (limited to 'miasm2/analysis/dse.py')
| -rw-r--r-- | miasm2/analysis/dse.py | 81 |
1 files changed, 50 insertions, 31 deletions
diff --git a/miasm2/analysis/dse.py b/miasm2/analysis/dse.py index 427a8bd0..0c01610f 100644 --- a/miasm2/analysis/dse.py +++ b/miasm2/analysis/dse.py @@ -56,15 +56,14 @@ except ImportError: z3 = None from miasm2.expression.expression import ExprMem, ExprInt, ExprCompose, \ - ExprAff, ExprId + ExprAff, ExprId, ExprLoc, LocKey from miasm2.core.bin_stream import bin_stream_vm -from miasm2.core.asmblock import expr_is_label from miasm2.jitter.emulatedsymbexec import EmulatedSymbExec from miasm2.expression.expression_helper import possible_values from miasm2.ir.translators import Translator from miasm2.analysis.expression_range import expr_range from miasm2.analysis.modularintervals import ModularIntervals - +from miasm2.core.locationdb import LocationDB DriftInfo = namedtuple("DriftInfo", ["symbol", "computed", "expected"]) @@ -72,7 +71,7 @@ class DriftException(Exception): """Raised when the emulation drift from the reference engine""" def __init__(self, info): - super(Exception, self).__init__() + super(DriftException, self).__init__() self.info = info def __str__(self): @@ -150,10 +149,12 @@ class DSEEngine(object): def __init__(self, machine): self.machine = machine + self.loc_db = LocationDB() self.handler = {} # addr -> callback(DSEEngine instance) self.instrumentation = {} # addr -> callback(DSEEngine instance) self.addr_to_cacheblocks = {} # addr -> {label -> IRBlock} - self.ir_arch = self.machine.ir() # corresponding IR + self.ir_arch = self.machine.ir(loc_db=self.loc_db) # corresponding IR + self.ircfg = self.ir_arch.new_ircfg() # corresponding IR # Defined after attachment self.jitter = None # Jitload (concrete execution) @@ -165,20 +166,24 @@ class DSEEngine(object): """Prepare the environment for attachment with a jitter""" # Disassembler self.mdis = self.machine.dis_engine(bin_stream_vm(self.jitter.vm), - lines_wd=1) + lines_wd=1, + loc_db=self.loc_db) # Symbexec engine ## Prepare symbexec engines self.symb = self.SYMB_ENGINE(self.jitter.cpu, self.jitter.vm, self.ir_arch, {}) self.symb.enable_emulated_simplifications() - self.symb_concrete = EmulatedSymbExec(self.jitter.cpu, self.jitter.vm, - self.ir_arch, {}) + self.symb_concrete = EmulatedSymbExec( + self.jitter.cpu, self.jitter.vm, + self.ir_arch, {} + ) ## Update registers value - self.symb.symbols[self.ir_arch.IRDst] = ExprInt(getattr(self.jitter.cpu, - self.ir_arch.pc.name), - self.ir_arch.IRDst.size) + self.symb.symbols[self.ir_arch.IRDst] = ExprInt( + getattr(self.jitter.cpu, self.ir_arch.pc.name), + self.ir_arch.IRDst.size + ) # Avoid memory write self.symb.func_write = None @@ -188,7 +193,7 @@ class DSEEngine(object): self.jitter.exec_cb = self.callback # Clean jit cache to avoid multi-line basic blocks already jitted - self.jitter.jit.lbl2jitbloc.clear() + self.jitter.jit.clear_jitted_blocks() def attach(self, emulator): """Attach the DSE to @emulator @@ -215,9 +220,9 @@ class DSEEngine(object): self.prepare() def handle(self, cur_addr): - """Handle destination + r"""Handle destination @cur_addr: Expr of the next address in concrete execution - /!\ cur_addr may be a lbl_gen + /!\ cur_addr may be a loc_key In this method, self.symb is in the "just before branching" state """ @@ -295,6 +300,9 @@ class DSEEngine(object): # Call callbacks associated to the current address cur_addr = self.jitter.pc + if isinstance(cur_addr, LocKey): + lbl = self.ir_arch.loc_db.loc_key_to_label(cur_addr) + cur_addr = lbl.offset if cur_addr in self.handler: self.handler[cur_addr](self) @@ -312,24 +320,24 @@ class DSEEngine(object): # Get IR blocks if cur_addr in self.addr_to_cacheblocks: - self.ir_arch.blocks.clear() - self.ir_arch.blocks.update(self.addr_to_cacheblocks[cur_addr]) + self.ircfg.blocks.clear() + self.ircfg.blocks.update(self.addr_to_cacheblocks[cur_addr]) else: ## Reset cache structures - self.ir_arch.blocks.clear()# = {} + self.ircfg.blocks.clear()# = {} ## Update current state asm_block = self.mdis.dis_block(cur_addr) - self.ir_arch.add_block(asm_block) - self.addr_to_cacheblocks[cur_addr] = dict(self.ir_arch.blocks) + self.ir_arch.add_asmblock_to_ircfg(asm_block, self.ircfg) + self.addr_to_cacheblocks[cur_addr] = dict(self.ircfg.blocks) # Emulate the current instruction self.symb.reset_modified() # Is the symbolic execution going (potentially) to jump on a lbl_gen? - if len(self.ir_arch.blocks) == 1: - next_addr = self.symb.run_at(cur_addr) + if len(self.ircfg.blocks) == 1: + self.symb.run_at(self.ircfg, cur_addr) else: # Emulation could stuck in generated IR blocks # But concrete execution callback is not enough precise to obtain @@ -339,11 +347,16 @@ class DSEEngine(object): # Update the concrete execution self._update_state_from_concrete_symb(self.symb_concrete) while True: - next_addr_concrete = self.symb_concrete.run_block_at(cur_addr) - self.symb.run_block_at(cur_addr) - if not(expr_is_label(next_addr_concrete) and - next_addr_concrete.name.offset is None): + next_addr_concrete = self.symb_concrete.run_block_at( + self.ircfg, cur_addr + ) + self.symb.run_block_at(self.ircfg, cur_addr) + + if not (isinstance(next_addr_concrete, ExprLoc) and + self.ir_arch.loc_db.get_location_offset( + next_addr_concrete.loc_key + ) is None): # Not a lbl_gen, exit break @@ -351,6 +364,7 @@ class DSEEngine(object): self.handle(next_addr_concrete) cur_addr = next_addr_concrete + # At this stage, symbolic engine is one instruction after the concrete # engine @@ -428,7 +442,7 @@ class DSEEngine(object): symbexec.symbols[reg] = value def update_state_from_concrete(self, cpu=True, mem=False): - """Update the symbolic state with concrete values from the concrete + r"""Update the symbolic state with concrete values from the concrete engine @cpu: (optional) if set, update registers' value @@ -596,13 +610,19 @@ class DSEPathConstraint(DSEEngine): self.cur_solver.add(self.z3_trans.from_expr(cons)) def handle(self, cur_addr): + cur_addr = self.ir_arch.loc_db.canonize_to_exprloc(cur_addr) symb_pc = self.eval_expr(self.ir_arch.IRDst) possibilities = possible_values(symb_pc) cur_path_constraint = set() # path_constraint for the concrete path if len(possibilities) == 1: - assert next(iter(possibilities)).value == cur_addr + dst = next(iter(possibilities)).value + dst = self.ir_arch.loc_db.canonize_to_exprloc(dst) + assert dst == cur_addr else: for possibility in possibilities: + target_addr = self.ir_arch.loc_db.canonize_to_exprloc( + possibility.value + ) path_constraint = set() # Set of ExprAff for the possible path # Get constraint associated to the possible path @@ -642,11 +662,11 @@ class DSEPathConstraint(DSEEngine): "address 0x%x" % address) path_constraint.add(ExprAff(expr_mem, value)) - if possibility.value == cur_addr: + if target_addr == cur_addr: # Add path constraint cur_path_constraint = path_constraint - elif self.produce_solution(possibility.value): + elif self.produce_solution(target_addr): # Looking for a new solution self.cur_solver.push() for cons in path_constraint: @@ -657,8 +677,7 @@ class DSEPathConstraint(DSEEngine): result = self.cur_solver.check() if result == z3.sat: model = self.cur_solver.model() - self.handle_solution(model, possibility.value) + self.handle_solution(model, target_addr) self.cur_solver.pop() self.handle_correct_destination(cur_addr, cur_path_constraint) - |