diff options
Diffstat (limited to 'miasm2/analysis/dse.py')
| -rw-r--r-- | miasm2/analysis/dse.py | 99 |
1 files changed, 61 insertions, 38 deletions
diff --git a/miasm2/analysis/dse.py b/miasm2/analysis/dse.py index 1a704f61..290015ea 100644 --- a/miasm2/analysis/dse.py +++ b/miasm2/analysis/dse.py @@ -45,14 +45,6 @@ Here are a few remainings TODO: MAX_MEMORY_INJECT). One possible solution, not yet implemented, is to call the solver for reducing the possible values thanks to its accumulated constraints. - - - the symbolic execution could stuck on generated label. When path constraints - are computed, the destination taken is required to obtain corresponding - assumptions, but cannot be directly obtained from the concrete execution (the - jitter has not enough granularity). One solution is to simulate a concrete - execution thanks to a symbolic engine in which all symbols are concrete, and - get step by step which path has been followed. - """ from collections import namedtuple @@ -62,6 +54,7 @@ import z3 from miasm2.expression.expression import ExprMem, ExprInt, ExprCompose, \ ExprAff, ExprId from miasm2.core.bin_stream import bin_stream_vm +from miasm2.core.asmblock import expr_is_label from miasm2.jitter.emulatedsymbexec import EmulatedSymbExec from miasm2.expression.expression_helper import possible_values from miasm2.ir.translators import Translator @@ -160,6 +153,7 @@ class DSEEngine(object): # Defined after attachment self.jitter = None # Jitload (concrete execution) self.symb = None # SymbolicExecutionEngine + self.symb_concrete = None # Concrete SymbExec for path desambiguisation self.mdis = None # DisasmEngine def prepare(self): @@ -169,10 +163,12 @@ class DSEEngine(object): lines_wd=1) # Symbexec engine - ## Prepare the symbexec engine + ## Prepare symbexec engines self.symb = self.SYMB_ENGINE(self.jitter.cpu, self.jitter.vm, self.ir_arch, {}) self.symb.enable_emulated_simplifications() + self.symb_concrete = EmulatedSymbExec(self.jitter.cpu, self.jitter.vm, + self.ir_arch, {}) ## Update registers value self.symb.symbols[self.ir_arch.IRDst] = ExprInt(getattr(self.jitter.cpu, @@ -194,7 +190,10 @@ class DSEEngine(object): def handle(self, cur_addr): """Handle destination - @cur_addr: address of the next instruction in concrete execution + @cur_addr: Expr of the next address in concrete execution + /!\ cur_addr may be a lbl_gen + + In this method, self.symb is in the "just before branching" state """ pass @@ -265,7 +264,10 @@ class DSEEngine(object): def callback(self, _): """Called before each instruction""" + # Assert synchronization with concrete execution self._check_state() + + # Call callbacks associated to the current address cur_addr = self.jitter.pc if cur_addr in self.handler: @@ -275,12 +277,14 @@ class DSEEngine(object): if cur_addr in self.instrumentation: self.instrumentation[cur_addr](self) - self.handle(cur_addr) + # Handle current address + self.handle(ExprInt(cur_addr, self.ir_arch.IRDst.size)) + # Avoid memory issue in ExpressionSimplifier if len(self.symb.expr_simp.simplified_exprs) > 100000: self.symb.expr_simp.simplified_exprs.clear() - # Update state + # Get IR blocks if cur_addr in self.addr_to_cacheblocks: self.ir_arch.blocks.clear() self.ir_arch.blocks.update(self.addr_to_cacheblocks[cur_addr]) @@ -295,8 +299,32 @@ class DSEEngine(object): self.ir_arch.add_bloc(asm_block) self.addr_to_cacheblocks[cur_addr] = dict(self.ir_arch.blocks) + # Emulate the current instruction self.symb.reset_modified() - self.symb.emul_ir_blocks(cur_addr) + + # Is the symbolic execution going (potentially) to jump on a lbl_gen? + if len(self.ir_arch.blocks) == 1: + next_addr = self.symb.emul_ir_blocks(cur_addr) + else: + # Emulation could stuck in generated IR blocks + # But concrete execution callback is not enough precise to obtain + # the full IR blocks path + # -> Use a fully concrete execution to get back path + + # Update the concrete execution + self._update_state_from_concrete_symb(self.symb_concrete) + while True: + next_addr_concrete = self.symb_concrete.emul_ir_block(cur_addr) + self.symb.emul_ir_block(cur_addr) + + if not(expr_is_label(next_addr_concrete) and + next_addr_concrete.name.offset is None): + # Not a lbl_gen, exit + break + + # Call handle with lbl_gen state + self.handle(next_addr_concrete) + cur_addr = next_addr_concrete # At this stage, symbolic engine is one instruction after the concrete # engine @@ -348,6 +376,20 @@ class DSEEngine(object): for dst, src in assignblk.iteritems(): self.symb.apply_change(dst, src) + def _update_state_from_concrete_symb(self, symbexec, cpu=True, mem=False): + if mem: + # Values will be retrieved from the concrete execution if they are + # not present + for symbol in symbexec.symbols.symbols_mem.copy(): + del symbexec.symbols[symbol] + if cpu: + regs = self.ir_arch.arch.regs.attrib_to_regs[self.ir_arch.attrib] + for reg in regs: + if hasattr(self.jitter.cpu, reg.name): + value = ExprInt(getattr(self.jitter.cpu, reg.name), + size=reg.size) + symbexec.symbols[reg] = value + def update_state_from_concrete(self, cpu=True, mem=False): """Update the symbolic state with concrete values from the concrete engine @@ -359,18 +401,7 @@ class DSEEngine(object): This function is usually called when states are no more synchronized (at the beginning, returning from an unstubbed syscall, ...) """ - if mem: - # Values will be retrieved from the concrete execution if they are - # not present - for symbol in self.symb.symbols.symbols_mem.copy(): - del self.symb.symbols[symbol] - if cpu: - regs = self.ir_arch.arch.regs.attrib_to_regs[self.ir_arch.attrib] - for reg in regs: - if hasattr(self.jitter.cpu, reg.name): - value = ExprInt(getattr(self.jitter.cpu, reg.name), - size=reg.size) - self.symb.symbols[reg] = value + self._update_state_from_concrete_symb(self.symb, cpu, mem) def eval_expr(self, expr): """Return the evaluation of @expr: @@ -445,15 +476,13 @@ class DSEPathConstraint(DSEEngine): self.new_solutions.setdefault(destination, set()).add(model) def handle(self, cur_addr): - """Handle destination - @cur_addr: address of the next instruction in concrete execution - """ symb_pc = self.eval_expr(self.ir_arch.IRDst) - if symb_pc.is_int(): - assert int(symb_pc) == cur_addr + possibilities = possible_values(symb_pc) + if len(possibilities) == 1: + assert next(iter(possibilities)).value == cur_addr else: cur_path_constraint = set() # path_constraint for the concrete path - for possibility in possible_values(symb_pc): + for possibility in possibilities: path_constraint = set() # Set of ExprAff for the possible path # Get constraint associated to the possible path @@ -493,13 +522,7 @@ class DSEPathConstraint(DSEEngine): "address 0x%x" % address) path_constraint.add(ExprAff(expr_mem, value)) - if possibility.value.is_id(): - # get stuck in lbl_gen - # TODO resolve with a more detailed concrete execution - raise RuntimeError("Not implemented: stuck in a generated "\ - "IR block") - - elif int(possibility.value.arg) == cur_addr: + if possibility.value == cur_addr: # Add path constraint cur_path_constraint = path_constraint |