diff options
Diffstat (limited to 'miasm2/analysis/dse.py')
| -rw-r--r-- | miasm2/analysis/dse.py | 545 |
1 files changed, 545 insertions, 0 deletions
diff --git a/miasm2/analysis/dse.py b/miasm2/analysis/dse.py new file mode 100644 index 00000000..290015ea --- /dev/null +++ b/miasm2/analysis/dse.py @@ -0,0 +1,545 @@ +"""Dynamic symbolic execution module. + +Offers a way to have a symbolic execution along a concrete one. +Basically, this is done through DSEEngine class, with scheme: + +dse = DSEEngine(Machine("x86_32")) +dse.attach(jitter) + +The DSE state can be updated through: + + - .update_state_from_concrete: update the values from the CPU, so the symbolic + execution will be completely concrete from this point (until changes) + - .update_state: inject information, for instance RAX = symbolic_RAX + - .symbolize_memory: symbolize (using .memory_to_expr) memory areas (ie, + reading from an address in one of these areas yield a symbol) + +The DSE run can be instrumented through: + - .add_handler: register an handler, modifying the state instead of the current + execution. Can be used for stubbing external API + - .add_lib_handler: register handlers for libraries + - .add_instrumentation: register an handler, modifying the state but continuing + the current execution. Can be used for logging facilities + + +On branch, if the decision is symbolic, one can also collect "path constraints" +and inverse them to produce new inputs potentially reaching new paths. + +Basically, this is done through DSEPathConstraint. In order to produce a new +solution, one can extend this class, and override 'handle_solution' to +produce a solution which fit its needs. + +If one is only interested in constraints associated to its path, the option +"produce_solution" should be set to False, to speed up emulation. +The constraints are accumulated in the .z3_cur z3.Solver object. + +Here are a few remainings TODO: + - handle endianess in check_state / atomic read: currently, but this is also + true for others Miasm2 symbolic engines, the endianness is not take in + account, and assumed to be Little Endian + + - too many memory dependencies in constraint tracking: in order to let z3 find + new solution, it does need information on memory values (for instance, a + lookup in a table with a symbolic index). The estimated possible involved + memory location could be too large to pass to the solver (threshold named + MAX_MEMORY_INJECT). One possible solution, not yet implemented, is to call + the solver for reducing the possible values thanks to its accumulated + constraints. +""" + +from collections import namedtuple + +import z3 + +from miasm2.expression.expression import ExprMem, ExprInt, ExprCompose, \ + ExprAff, ExprId +from miasm2.core.bin_stream import bin_stream_vm +from miasm2.core.asmblock import expr_is_label +from miasm2.jitter.emulatedsymbexec import EmulatedSymbExec +from miasm2.expression.expression_helper import possible_values +from miasm2.ir.translators import Translator +from miasm2.analysis.expression_range import expr_range +from miasm2.analysis.modularintervals import ModularIntervals + + +DriftInfo = namedtuple("DriftInfo", ["symbol", "computed", "expected"]) + +class DriftException(Exception): + """Raised when the emulation drift from the reference engine""" + + def __init__(self, info): + super(Exception, self).__init__() + self.info = info + + def __str__(self): + if len(self.info) == 1: + return "Drift of %s: %s instead of %s" % ( + self.info[0].symbol, + self.info[0].computed, + self.info[0].expected, + ) + else: + return "Drift of:\n\t" + "\n\t".join("%s: %s instead of %s" % ( + dinfo.symbol, + dinfo.computed, + dinfo.expected) + for dinfo in self.info) + + +class ESETrackModif(EmulatedSymbExec): + """Extension of EmulatedSymbExec to be used by DSE engines + + Add the tracking of modified expressions, and the ability to symbolize + memory areas + """ + + def __init__(self, *args, **kwargs): + super(ESETrackModif, self).__init__(*args, **kwargs) + self.modified_expr = set() # Expr modified since the last reset + self.dse_memory_range = [] # List/Intervals of memory addresses to + # symbolize + self.dse_memory_to_expr = None # function(addr) -> Expr used to + # symbolize + + def _func_read(self, expr_mem): + assert expr_mem.arg.is_int() + dst_addr = int(expr_mem.arg) + + if not self.dse_memory_range: + # Trivial case (optimization) + return super(ESETrackModif, self)._func_read(expr_mem) + + # Split access in atomic accesses + out = [] + for addr in xrange(dst_addr, dst_addr + (expr_mem.size / 8)): + if addr in self.dse_memory_range: + # Symbolize memory access + out.append(self.dse_memory_to_expr(addr)) + else: + # Get concrete value + atomic_access = ExprMem(ExprInt(addr, expr_mem.arg.size), 8) + out.append(super(ESETrackModif, self)._func_read(atomic_access)) + + if len(out) == 1: + # Trivial case (optimization) + return out[0] + + # Simplify for constant merging (ex: {ExprInt(1, 8), ExprInt(2, 8)}) + return self.expr_simp(ExprCompose(*out)) + + def reset_modified(self): + """Reset modified expression tracker""" + self.modified_expr.clear() + + def apply_change(self, dst, src): + super(ESETrackModif, self).apply_change(dst, src) + self.modified_expr.add(dst) + + +class DSEEngine(object): + """Dynamic Symbolic Execution Engine + + This class aims to be overrided for each specific purpose + """ + SYMB_ENGINE = ESETrackModif + + def __init__(self, machine): + self.machine = machine + self.handler = {} # addr -> callback(DSEEngine instance) + self.instrumentation = {} # addr -> callback(DSEEngine instance) + self.addr_to_cacheblocks = {} # addr -> {label -> IRBlock} + self.ir_arch = self.machine.ir() # corresponding IR + + # Defined after attachment + self.jitter = None # Jitload (concrete execution) + self.symb = None # SymbolicExecutionEngine + self.symb_concrete = None # Concrete SymbExec for path desambiguisation + self.mdis = None # DisasmEngine + + def prepare(self): + """Prepare the environment for attachment with a jitter""" + # Disassembler + self.mdis = self.machine.dis_engine(bin_stream_vm(self.jitter.vm), + lines_wd=1) + + # Symbexec engine + ## Prepare symbexec engines + self.symb = self.SYMB_ENGINE(self.jitter.cpu, self.jitter.vm, + self.ir_arch, {}) + self.symb.enable_emulated_simplifications() + self.symb_concrete = EmulatedSymbExec(self.jitter.cpu, self.jitter.vm, + self.ir_arch, {}) + + ## Update registers value + self.symb.symbols[self.ir_arch.IRDst] = ExprInt(getattr(self.jitter.cpu, + self.ir_arch.pc.name), + self.ir_arch.IRDst.size) + + # Avoid memory write + self.symb.func_write = None + + # Activate callback on each instr + self.jitter.jit.set_options(max_exec_per_call=1, jit_maxline=1) + self.jitter.exec_cb = self.callback + + def attach(self, emulator): + """Attach the DSE to @emulator + @emulator: jitload (or API equivalent) instance""" + self.jitter = emulator + self.prepare() + + def handle(self, cur_addr): + """Handle destination + @cur_addr: Expr of the next address in concrete execution + /!\ cur_addr may be a lbl_gen + + In this method, self.symb is in the "just before branching" state + """ + pass + + def add_handler(self, addr, callback): + """Add a @callback for address @addr before any state update. + The state IS NOT updated after returning from the callback + @addr: int + @callback: func(dse instance)""" + self.handler[addr] = callback + + def add_lib_handler(self, libimp, namespace): + """Add search for handler based on a @libimp libimp instance + + Known functions will be looked by {name}_symb in the @namespace + """ + + # lambda cannot contain statement + def default_func(dse): + fname = "%s_symb" % libimp.fad2cname[dse.jitter.pc] + raise RuntimeError("Symbolic stub '%s' not found" % fname) + + for addr, fname in libimp.fad2cname.iteritems(): + fname = "%s_symb" % fname + func = namespace.get(fname, None) + if func is not None: + self.add_handler(addr, func) + else: + self.add_handler(addr, default_func) + + def add_instrumentation(self, addr, callback): + """Add a @callback for address @addr before any state update. + The state IS updated after returning from the callback + @addr: int + @callback: func(dse instance)""" + self.instrumentation[addr] = callback + + def _check_state(self): + """Check the current state against the concrete one""" + errors = [] # List of DriftInfo + + for symbol in self.symb.modified_expr: + # Do not consider PC + if symbol in [self.ir_arch.pc, self.ir_arch.IRDst]: + continue + + # Consider only concrete values + symb_value = self.eval_expr(symbol) + if not symb_value.is_int(): + continue + symb_value = int(symb_value) + + # Check computed values against real ones + if symbol.is_id(): + if hasattr(self.jitter.cpu, symbol.name): + value = getattr(self.jitter.cpu, symbol.name) + if value != symb_value: + errors.append(DriftInfo(symbol, symb_value, value)) + elif symbol.is_mem() and symbol.arg.is_int(): + value_chr = self.jitter.vm.get_mem(int(symbol.arg), + symbol.size / 8) + exp_value = int(value_chr[::-1].encode("hex"), 16) + if exp_value != symb_value: + errors.append(DriftInfo(symbol, symb_value, exp_value)) + + # Check for drift, and act accordingly + if errors: + raise DriftException(errors) + + def callback(self, _): + """Called before each instruction""" + # Assert synchronization with concrete execution + self._check_state() + + # Call callbacks associated to the current address + cur_addr = self.jitter.pc + + if cur_addr in self.handler: + self.handler[cur_addr](self) + return True + + if cur_addr in self.instrumentation: + self.instrumentation[cur_addr](self) + + # Handle current address + self.handle(ExprInt(cur_addr, self.ir_arch.IRDst.size)) + + # Avoid memory issue in ExpressionSimplifier + if len(self.symb.expr_simp.simplified_exprs) > 100000: + self.symb.expr_simp.simplified_exprs.clear() + + # Get IR blocks + if cur_addr in self.addr_to_cacheblocks: + self.ir_arch.blocks.clear() + self.ir_arch.blocks.update(self.addr_to_cacheblocks[cur_addr]) + else: + + ## Reset cache structures + self.mdis.job_done.clear() + self.ir_arch.blocks.clear()# = {} + + ## Update current state + asm_block = self.mdis.dis_bloc(cur_addr) + self.ir_arch.add_bloc(asm_block) + self.addr_to_cacheblocks[cur_addr] = dict(self.ir_arch.blocks) + + # Emulate the current instruction + self.symb.reset_modified() + + # Is the symbolic execution going (potentially) to jump on a lbl_gen? + if len(self.ir_arch.blocks) == 1: + next_addr = self.symb.emul_ir_blocks(cur_addr) + else: + # Emulation could stuck in generated IR blocks + # But concrete execution callback is not enough precise to obtain + # the full IR blocks path + # -> Use a fully concrete execution to get back path + + # Update the concrete execution + self._update_state_from_concrete_symb(self.symb_concrete) + while True: + next_addr_concrete = self.symb_concrete.emul_ir_block(cur_addr) + self.symb.emul_ir_block(cur_addr) + + if not(expr_is_label(next_addr_concrete) and + next_addr_concrete.name.offset is None): + # Not a lbl_gen, exit + break + + # Call handle with lbl_gen state + self.handle(next_addr_concrete) + cur_addr = next_addr_concrete + + # At this stage, symbolic engine is one instruction after the concrete + # engine + + return True + + def take_snapshot(self): + """Return a snapshot of the current state (including jitter state)""" + snapshot = { + "mem": self.jitter.vm.get_all_memory(), + "regs": self.jitter.cpu.get_gpreg(), + "symb": self.symb.symbols.copy() + } + return snapshot + + def restore_snapshot(self, snapshot, memory=True): + """Restore a @snapshot taken with .take_snapshot + @snapshot: .take_snapshot output + @memory: (optional) if set, also restore the memory + """ + # Restore memory + if memory: + self.jitter.vm.reset_memory_page_pool() + self.jitter.vm.reset_code_bloc_pool() + for addr, metadata in snapshot["mem"].iteritems(): + self.jitter.vm.add_memory_page(addr, + metadata["access"], + metadata["data"]) + + # Restore registers + self.jitter.pc = snapshot["regs"][self.ir_arch.pc.name] + self.jitter.cpu.set_gpreg(snapshot["regs"]) + + # Reset intern elements + self.jitter.vm.set_exception(0) + self.jitter.cpu.set_exception(0) + self.jitter.bs._atomic_mode = False + + # Reset symb exec + for key, _ in self.symb.symbols.items(): + del self.symb.symbols[key] + for expr, value in snapshot["symb"].items(): + self.symb.symbols[expr] = value + + def update_state(self, assignblk): + """From this point, assume @assignblk in the symbolic execution + @assignblk: AssignBlock/{dst -> src} + """ + for dst, src in assignblk.iteritems(): + self.symb.apply_change(dst, src) + + def _update_state_from_concrete_symb(self, symbexec, cpu=True, mem=False): + if mem: + # Values will be retrieved from the concrete execution if they are + # not present + for symbol in symbexec.symbols.symbols_mem.copy(): + del symbexec.symbols[symbol] + if cpu: + regs = self.ir_arch.arch.regs.attrib_to_regs[self.ir_arch.attrib] + for reg in regs: + if hasattr(self.jitter.cpu, reg.name): + value = ExprInt(getattr(self.jitter.cpu, reg.name), + size=reg.size) + symbexec.symbols[reg] = value + + def update_state_from_concrete(self, cpu=True, mem=False): + """Update the symbolic state with concrete values from the concrete + engine + + @cpu: (optional) if set, update registers' value + @mem: (optional) if set, update memory value + + /!\ all current states will be loss. + This function is usually called when states are no more synchronized + (at the beginning, returning from an unstubbed syscall, ...) + """ + self._update_state_from_concrete_symb(self.symb, cpu, mem) + + def eval_expr(self, expr): + """Return the evaluation of @expr: + @expr: Expr instance""" + return self.symb.eval_expr(expr) + + @staticmethod + def memory_to_expr(addr): + """Translate an address to its corresponding symbolic ID (8bits) + @addr: int""" + return ExprId("MEM_0x%x" % int(addr), 8) + + def symbolize_memory(self, memory_range): + """Register a range of memory addresses to symbolize + @memory_range: object with support of __in__ operation (intervals, list, + ...) + """ + self.symb.dse_memory_range = memory_range + self.symb.dse_memory_to_expr = self.memory_to_expr + + +class DSEPathConstraint(DSEEngine): + """Dynamic Symbolic Execution Engine keeping the path constraint + + Possible new "solutions" are produced along the path, by inversing concrete + path constraint. Thus, a "solution" is a potential initial context leading + to a new path. + + In order to produce a new solution, one can extend this class, and override + 'handle_solution' to produce a solution which fit its needs. + + If one is only interested in constraints associated to its path, the option + "produce_solution" should be set to False, to speed up emulation. + The constraints are accumulated in the .z3_cur z3.Solver object. + + """ + + # Maximum memory size to inject in constraints solving + MAX_MEMORY_INJECT = 0x10000 + + def __init__(self, machine, produce_solution=True, **kwargs): + """Init a DSEPathConstraint + @machine: Machine of the targeted architecture instance + @produce_solution: (optional) if set, new solutions will be computed""" + super(DSEPathConstraint, self).__init__(machine, **kwargs) + self.path_constraint = set() + self.produce_solution = produce_solution + self.cur_solver = z3.Solver() + self.new_solutions = {} # destination -> set of model + self.z3_trans = Translator.to_language("z3") + + def take_snapshot(self, *args, **kwargs): + snap = super(DSEPathConstraint, self).take_snapshot(*args, **kwargs) + snap["new_solutions"] = {dst: src.copy + for dst, src in self.new_solutions.iteritems()} + snap["cur_constraints"] = self.cur_solver.assertions() + return snap + + def restore_snapshot(self, snapshot, *args, **kwargs): + super(DSEPathConstraint, self).restore_snapshot(snapshot, *args, + **kwargs) + self.new_solutions.clear() + self.new_solutions.update(snapshot["new_solutions"]) + self.cur_solver = z3.Solver() + self.cur_solver.add(snapshot["cur_constraints"]) + + def handle_solution(self, model, destination): + """Called when a new solution for destination @destination is founded + @model: z3 model instance + @destination: Expr instance for an addr which is not on the DSE path + """ + self.new_solutions.setdefault(destination, set()).add(model) + + def handle(self, cur_addr): + symb_pc = self.eval_expr(self.ir_arch.IRDst) + possibilities = possible_values(symb_pc) + if len(possibilities) == 1: + assert next(iter(possibilities)).value == cur_addr + else: + cur_path_constraint = set() # path_constraint for the concrete path + for possibility in possibilities: + path_constraint = set() # Set of ExprAff for the possible path + + # Get constraint associated to the possible path + memory_to_add = ModularIntervals(symb_pc.size) + for cons in possibility.constraints: + eaff = cons.to_constraint() + # eaff.get_r(mem_read=True) is not enough + # ExprAff consider a Memory access in dst as a write + mem = eaff.dst.get_r(mem_read=True) + mem.update(eaff.src.get_r(mem_read=True)) + for expr in mem: + if expr.is_mem(): + addr_range = expr_range(expr.arg) + # At upper bounds, add the size of the memory access + # if addr (- [a, b], then @size[addr] reachables + # values are in @8[a, b + size[ + for start, stop in addr_range: + stop += (expr.size / 8) - 1 + full_range = ModularIntervals(symb_pc.size, + [(start, stop)]) + memory_to_add.update(full_range) + path_constraint.add(eaff) + + if memory_to_add.length > self.MAX_MEMORY_INJECT: + # TODO re-croncretize the constraint or z3-try + raise RuntimeError("Not implemented: too long memory area") + + # Inject memory + for start, stop in memory_to_add: + for address in xrange(start, stop + 1): + expr_mem = ExprMem(ExprInt(address, + self.ir_arch.pc.size), + 8) + value = self.eval_expr(expr_mem) + if not value.is_int(): + raise TypeError("Rely on a symbolic memory case, " \ + "address 0x%x" % address) + path_constraint.add(ExprAff(expr_mem, value)) + + if possibility.value == cur_addr: + # Add path constraint + cur_path_constraint = path_constraint + + elif self.produce_solution: + # Looking for a new solution + self.cur_solver.push() + for cons in path_constraint: + trans = self.z3_trans.from_expr(cons) + trans = z3.simplify(trans) + self.cur_solver.add(trans) + + result = self.cur_solver.check() + if result == z3.sat: + model = self.cur_solver.model() + self.handle_solution(model, possibility.value) + self.cur_solver.pop() + + # Update current solver + for cons in cur_path_constraint: + self.cur_solver.add(self.z3_trans.from_expr(cons)) |