diff options
Diffstat (limited to 'miasm2/analysis/dse.py')
| -rw-r--r-- | miasm2/analysis/dse.py | 50 |
1 files changed, 34 insertions, 16 deletions
diff --git a/miasm2/analysis/dse.py b/miasm2/analysis/dse.py index 427a8bd0..66caffc9 100644 --- a/miasm2/analysis/dse.py +++ b/miasm2/analysis/dse.py @@ -56,15 +56,14 @@ except ImportError: z3 = None from miasm2.expression.expression import ExprMem, ExprInt, ExprCompose, \ - ExprAff, ExprId + ExprAff, ExprId, ExprLoc, LocKey from miasm2.core.bin_stream import bin_stream_vm -from miasm2.core.asmblock import expr_is_label from miasm2.jitter.emulatedsymbexec import EmulatedSymbExec from miasm2.expression.expression_helper import possible_values from miasm2.ir.translators import Translator from miasm2.analysis.expression_range import expr_range from miasm2.analysis.modularintervals import ModularIntervals - +from miasm2.core.asmblock import AsmBlockBad DriftInfo = namedtuple("DriftInfo", ["symbol", "computed", "expected"]) @@ -72,7 +71,7 @@ class DriftException(Exception): """Raised when the emulation drift from the reference engine""" def __init__(self, info): - super(Exception, self).__init__() + super(DriftException, self).__init__() self.info = info def __str__(self): @@ -161,11 +160,14 @@ class DSEEngine(object): self.symb_concrete = None # Concrete SymbExec for path desambiguisation self.mdis = None # DisasmEngine + self.symbol_pool = self.ir_arch.symbol_pool + def prepare(self): """Prepare the environment for attachment with a jitter""" # Disassembler self.mdis = self.machine.dis_engine(bin_stream_vm(self.jitter.vm), - lines_wd=1) + lines_wd=1, + symbol_pool=self.symbol_pool) # Symbexec engine ## Prepare symbexec engines @@ -215,7 +217,7 @@ class DSEEngine(object): self.prepare() def handle(self, cur_addr): - """Handle destination + r"""Handle destination @cur_addr: Expr of the next address in concrete execution /!\ cur_addr may be a lbl_gen @@ -295,6 +297,9 @@ class DSEEngine(object): # Call callbacks associated to the current address cur_addr = self.jitter.pc + if isinstance(cur_addr, LocKey): + lbl = self.ir_arch.symbol_pool.loc_key_to_label(cur_addr) + cur_addr = lbl.offset if cur_addr in self.handler: self.handler[cur_addr](self) @@ -321,7 +326,8 @@ class DSEEngine(object): ## Update current state asm_block = self.mdis.dis_block(cur_addr) - self.ir_arch.add_block(asm_block) + if not isinstance(asm_block, AsmBlockBad): + self.ir_arch.add_block(asm_block) self.addr_to_cacheblocks[cur_addr] = dict(self.ir_arch.blocks) # Emulate the current instruction @@ -329,7 +335,7 @@ class DSEEngine(object): # Is the symbolic execution going (potentially) to jump on a lbl_gen? if len(self.ir_arch.blocks) == 1: - next_addr = self.symb.run_at(cur_addr) + self.symb.run_at(cur_addr) else: # Emulation could stuck in generated IR blocks # But concrete execution callback is not enough precise to obtain @@ -339,18 +345,25 @@ class DSEEngine(object): # Update the concrete execution self._update_state_from_concrete_symb(self.symb_concrete) while True: + next_addr_concrete = self.symb_concrete.run_block_at(cur_addr) self.symb.run_block_at(cur_addr) - if not(expr_is_label(next_addr_concrete) and - next_addr_concrete.name.offset is None): + if not (isinstance(next_addr_concrete, ExprLoc) and + self.ir_arch.symbol_pool.loc_key_to_offset( + next_addr_concrete + ) is None): # Not a lbl_gen, exit break + if self.symb.ir_arch.get_block(cur_addr) is None: + break + # Call handle with lbl_gen state self.handle(next_addr_concrete) cur_addr = next_addr_concrete + # At this stage, symbolic engine is one instruction after the concrete # engine @@ -428,7 +441,7 @@ class DSEEngine(object): symbexec.symbols[reg] = value def update_state_from_concrete(self, cpu=True, mem=False): - """Update the symbolic state with concrete values from the concrete + r"""Update the symbolic state with concrete values from the concrete engine @cpu: (optional) if set, update registers' value @@ -596,13 +609,19 @@ class DSEPathConstraint(DSEEngine): self.cur_solver.add(self.z3_trans.from_expr(cons)) def handle(self, cur_addr): + cur_addr = self.ir_arch.symbol_pool.canonize_to_exprloc(cur_addr) symb_pc = self.eval_expr(self.ir_arch.IRDst) possibilities = possible_values(symb_pc) cur_path_constraint = set() # path_constraint for the concrete path if len(possibilities) == 1: - assert next(iter(possibilities)).value == cur_addr + dst = next(iter(possibilities)).value + dst = self.ir_arch.symbol_pool.canonize_to_exprloc(dst) + assert dst == cur_addr else: for possibility in possibilities: + target_addr = self.ir_arch.symbol_pool.canonize_to_exprloc( + possibility.value + ) path_constraint = set() # Set of ExprAff for the possible path # Get constraint associated to the possible path @@ -642,11 +661,11 @@ class DSEPathConstraint(DSEEngine): "address 0x%x" % address) path_constraint.add(ExprAff(expr_mem, value)) - if possibility.value == cur_addr: + if target_addr == cur_addr: # Add path constraint cur_path_constraint = path_constraint - elif self.produce_solution(possibility.value): + elif self.produce_solution(target_addr): # Looking for a new solution self.cur_solver.push() for cons in path_constraint: @@ -657,8 +676,7 @@ class DSEPathConstraint(DSEEngine): result = self.cur_solver.check() if result == z3.sat: model = self.cur_solver.model() - self.handle_solution(model, possibility.value) + self.handle_solution(model, target_addr) self.cur_solver.pop() self.handle_correct_destination(cur_addr, cur_path_constraint) - |