diff options
Diffstat (limited to 'miasm2/analysis')
| -rw-r--r-- | miasm2/analysis/depgraph.py | 54 | ||||
| -rw-r--r-- | miasm2/analysis/dse.py | 50 |
2 files changed, 61 insertions, 43 deletions
diff --git a/miasm2/analysis/depgraph.py b/miasm2/analysis/depgraph.py index f7949c88..49368508 100644 --- a/miasm2/analysis/depgraph.py +++ b/miasm2/analysis/depgraph.py @@ -1,8 +1,7 @@ """Provide dependency graph""" -import miasm2.expression.expression as m2_expr +from miasm2.expression.expression import ExprInt, ExprLoc, ExprAff from miasm2.core.graph import DiGraph -from miasm2.core.asmblock import AsmLabel, expr_is_int_or_label, expr_is_label from miasm2.expression.simplifications import expr_simp from miasm2.ir.symbexec import SymbolicExecutionEngine from miasm2.ir.ir import IRBlock, AssignBlock @@ -61,7 +60,7 @@ class DependencyNode(object): def __str__(self): """Returns a string representation of DependencyNode""" return "<%s %s %s %s>" % (self.__class__.__name__, - self.label.name, self.element, + self.label, self.element, self.line_nb) def __repr__(self): @@ -297,9 +296,10 @@ class DependencyResult(DependencyState): line_nb).assignblks # Eval the block - temp_label = AsmLabel("Temp") + symbol_pool = AsmSymbolPool() + temp_label = symbol_pool.getby_name_create("Temp") symb_exec = SymbolicExecutionEngine(self._ira, ctx_init) - symb_exec.eval_updt_irblock(IRBlock(temp_label, assignblks), step=step) + symb_exec.eval_updt_irblock(IRBlock(temp_label.loc_key, assignblks), step=step) # Return only inputs values (others could be wrongs) return {element: symb_exec.symbols[element] @@ -314,30 +314,31 @@ class DependencyResultImplicit(DependencyResult): # Z3 Solver instance _solver = None - unsat_expr = m2_expr.ExprAff(m2_expr.ExprInt(0, 1), - m2_expr.ExprInt(1, 1)) + unsat_expr = ExprAff(ExprInt(0, 1), ExprInt(1, 1)) def _gen_path_constraints(self, translator, expr, expected): """Generate path constraint from @expr. Handle special case with generated labels """ out = [] - expected_is_label = expr_is_label(expected) + expected = self._ira.symbol_pool.canonize_to_exprloc(expected) + expected_is_label = expected.is_label() for consval in possible_values(expr): - if (expected_is_label and - consval.value != expected): + value = self._ira.symbol_pool.canonize_to_exprloc(consval.value) + if expected_is_label and value != expected: continue - if (not expected_is_label and - expr_is_label(consval.value)): + if not expected_is_label and value.is_label(): continue conds = z3.And(*[translator.from_expr(cond.to_constraint()) for cond in consval.constraints]) - if expected != consval.value: - conds = z3.And(conds, - translator.from_expr( - m2_expr.ExprAff(consval.value, - expected))) + if expected != value: + conds = z3.And( + conds, + translator.from_expr( + ExprAff(value, + expected)) + ) out.append(conds) if out: @@ -373,10 +374,8 @@ class DependencyResultImplicit(DependencyResult): # Add constraint if hist_nb < history_size: next_label = history[hist_nb] - expected = symb_exec.eval_expr(m2_expr.ExprId(next_label, - size)) - solver.add( - self._gen_path_constraints(translator, dst, expected)) + expected = symb_exec.eval_expr(ExprLoc(next_label, size)) + solver.add(self._gen_path_constraints(translator, dst, expected)) # Save the solver self._solver = solver @@ -491,11 +490,11 @@ class DependencyGraph(object): @follow: set of nodes to follow @nofollow: set of nodes not to follow """ - if isinstance(expr, m2_expr.ExprId): + if expr.is_id(): follow.add(expr) - elif isinstance(expr, m2_expr.ExprInt): + elif expr.is_int(): nofollow.add(expr) - elif isinstance(expr, m2_expr.ExprMem): + elif expr.is_mem(): follow.add(expr) return expr @@ -508,7 +507,7 @@ class DependencyGraph(object): @follow_mem: force the visit of memory sub expressions @follow_call: force the visit of call sub expressions """ - if not follow_mem and isinstance(expr, m2_expr.ExprMem): + if not follow_mem and expr.is_mem(): nofollow.add(expr) return False if not follow_call and expr.is_function_call(): @@ -534,8 +533,9 @@ class DependencyGraph(object): """Do not follow labels""" follow = set() for expr in exprs: - if not expr_is_int_or_label(expr): - follow.add(expr) + if expr.is_int() or expr.is_label(): + continue + follow.add(expr) return follow, set() diff --git a/miasm2/analysis/dse.py b/miasm2/analysis/dse.py index 427a8bd0..66caffc9 100644 --- a/miasm2/analysis/dse.py +++ b/miasm2/analysis/dse.py @@ -56,15 +56,14 @@ except ImportError: z3 = None from miasm2.expression.expression import ExprMem, ExprInt, ExprCompose, \ - ExprAff, ExprId + ExprAff, ExprId, ExprLoc, LocKey from miasm2.core.bin_stream import bin_stream_vm -from miasm2.core.asmblock import expr_is_label from miasm2.jitter.emulatedsymbexec import EmulatedSymbExec from miasm2.expression.expression_helper import possible_values from miasm2.ir.translators import Translator from miasm2.analysis.expression_range import expr_range from miasm2.analysis.modularintervals import ModularIntervals - +from miasm2.core.asmblock import AsmBlockBad DriftInfo = namedtuple("DriftInfo", ["symbol", "computed", "expected"]) @@ -72,7 +71,7 @@ class DriftException(Exception): """Raised when the emulation drift from the reference engine""" def __init__(self, info): - super(Exception, self).__init__() + super(DriftException, self).__init__() self.info = info def __str__(self): @@ -161,11 +160,14 @@ class DSEEngine(object): self.symb_concrete = None # Concrete SymbExec for path desambiguisation self.mdis = None # DisasmEngine + self.symbol_pool = self.ir_arch.symbol_pool + def prepare(self): """Prepare the environment for attachment with a jitter""" # Disassembler self.mdis = self.machine.dis_engine(bin_stream_vm(self.jitter.vm), - lines_wd=1) + lines_wd=1, + symbol_pool=self.symbol_pool) # Symbexec engine ## Prepare symbexec engines @@ -215,7 +217,7 @@ class DSEEngine(object): self.prepare() def handle(self, cur_addr): - """Handle destination + r"""Handle destination @cur_addr: Expr of the next address in concrete execution /!\ cur_addr may be a lbl_gen @@ -295,6 +297,9 @@ class DSEEngine(object): # Call callbacks associated to the current address cur_addr = self.jitter.pc + if isinstance(cur_addr, LocKey): + lbl = self.ir_arch.symbol_pool.loc_key_to_label(cur_addr) + cur_addr = lbl.offset if cur_addr in self.handler: self.handler[cur_addr](self) @@ -321,7 +326,8 @@ class DSEEngine(object): ## Update current state asm_block = self.mdis.dis_block(cur_addr) - self.ir_arch.add_block(asm_block) + if not isinstance(asm_block, AsmBlockBad): + self.ir_arch.add_block(asm_block) self.addr_to_cacheblocks[cur_addr] = dict(self.ir_arch.blocks) # Emulate the current instruction @@ -329,7 +335,7 @@ class DSEEngine(object): # Is the symbolic execution going (potentially) to jump on a lbl_gen? if len(self.ir_arch.blocks) == 1: - next_addr = self.symb.run_at(cur_addr) + self.symb.run_at(cur_addr) else: # Emulation could stuck in generated IR blocks # But concrete execution callback is not enough precise to obtain @@ -339,18 +345,25 @@ class DSEEngine(object): # Update the concrete execution self._update_state_from_concrete_symb(self.symb_concrete) while True: + next_addr_concrete = self.symb_concrete.run_block_at(cur_addr) self.symb.run_block_at(cur_addr) - if not(expr_is_label(next_addr_concrete) and - next_addr_concrete.name.offset is None): + if not (isinstance(next_addr_concrete, ExprLoc) and + self.ir_arch.symbol_pool.loc_key_to_offset( + next_addr_concrete + ) is None): # Not a lbl_gen, exit break + if self.symb.ir_arch.get_block(cur_addr) is None: + break + # Call handle with lbl_gen state self.handle(next_addr_concrete) cur_addr = next_addr_concrete + # At this stage, symbolic engine is one instruction after the concrete # engine @@ -428,7 +441,7 @@ class DSEEngine(object): symbexec.symbols[reg] = value def update_state_from_concrete(self, cpu=True, mem=False): - """Update the symbolic state with concrete values from the concrete + r"""Update the symbolic state with concrete values from the concrete engine @cpu: (optional) if set, update registers' value @@ -596,13 +609,19 @@ class DSEPathConstraint(DSEEngine): self.cur_solver.add(self.z3_trans.from_expr(cons)) def handle(self, cur_addr): + cur_addr = self.ir_arch.symbol_pool.canonize_to_exprloc(cur_addr) symb_pc = self.eval_expr(self.ir_arch.IRDst) possibilities = possible_values(symb_pc) cur_path_constraint = set() # path_constraint for the concrete path if len(possibilities) == 1: - assert next(iter(possibilities)).value == cur_addr + dst = next(iter(possibilities)).value + dst = self.ir_arch.symbol_pool.canonize_to_exprloc(dst) + assert dst == cur_addr else: for possibility in possibilities: + target_addr = self.ir_arch.symbol_pool.canonize_to_exprloc( + possibility.value + ) path_constraint = set() # Set of ExprAff for the possible path # Get constraint associated to the possible path @@ -642,11 +661,11 @@ class DSEPathConstraint(DSEEngine): "address 0x%x" % address) path_constraint.add(ExprAff(expr_mem, value)) - if possibility.value == cur_addr: + if target_addr == cur_addr: # Add path constraint cur_path_constraint = path_constraint - elif self.produce_solution(possibility.value): + elif self.produce_solution(target_addr): # Looking for a new solution self.cur_solver.push() for cons in path_constraint: @@ -657,8 +676,7 @@ class DSEPathConstraint(DSEEngine): result = self.cur_solver.check() if result == z3.sat: model = self.cur_solver.model() - self.handle_solution(model, possibility.value) + self.handle_solution(model, target_addr) self.cur_solver.pop() self.handle_correct_destination(cur_addr, cur_path_constraint) - |