diff options
| author | Theofilos Augoustis <theofilos.augoustis@gmail.com> | 2023-12-07 23:39:35 +0100 |
|---|---|---|
| committer | Theofilos Augoustis <theofilos.augoustis@gmail.com> | 2023-12-07 23:39:35 +0100 |
| commit | 0cf4f736fd5d7cd99f00d6c5896af9a608d2df8b (patch) | |
| tree | da605cad6f2afe01acadc04df2088e0952d9e6e2 | |
| parent | ffcae80c2167f271a7d733d424fbd72db8c98a93 (diff) | |
| download | focaccia-0cf4f736fd5d7cd99f00d6c5896af9a608d2df8b.tar.gz focaccia-0cf4f736fd5d7cd99f00d6c5896af9a608d2df8b.zip | |
Replace symbolic execution tools with Miasm
Refactor SymbolicTransform interface a bit to include transformations of memory content. Implement it for Miasm as a backend. Move all symbolic execution things out of the test script (`miasm_test.py`) and move them to `symbolic.py` to replace the angr-based algorithms.
| -rw-r--r-- | lldb_target.py | 48 | ||||
| -rw-r--r-- | miasm_test.py | 257 | ||||
| -rw-r--r-- | miasm_util.py | 20 | ||||
| -rw-r--r-- | symbolic.py | 392 |
4 files changed, 317 insertions, 400 deletions
diff --git a/lldb_target.py b/lldb_target.py index dd0d543..e016005 100644 --- a/lldb_target.py +++ b/lldb_target.py @@ -5,11 +5,18 @@ from angr.errors import SimConcreteMemoryError, \ from angr_targets.concrete import ConcreteTarget from angr_targets.memory_map import MemoryMap +from arch import x86 +from snapshot import ProgramState + class LLDBConcreteTarget(ConcreteTarget): - def __init__(self, executable: str, args: list[str] = []): - # Prepend the executable's path to argv, as is convention - args.insert(0, executable) + def __init__(self, executable: str, argv: list[str] = []): + """Construct an LLDB concrete target. Stop at entry. + + :param argv: The full argv array, including the executable's path as + the first argument (as is convention). + :raises RuntimeError: If the process is unable to launch. + """ self.debugger = lldb.SBDebugger.Create() self.debugger.SetAsync(False) self.target = self.debugger.CreateTargetWithFileAndArch(executable, @@ -21,7 +28,7 @@ class LLDBConcreteTarget(ConcreteTarget): self.error = lldb.SBError() self.listener = self.debugger.GetListener() self.process = self.target.Launch(self.listener, - args, None, None, + argv, None, None, None, None, None, 0, True, self.error) if not self.process.IsValid(): @@ -143,3 +150,36 @@ class LLDBConcreteTarget(ConcreteTarget): name if name is not None else '<none>', perms)) return mmap + +def record_snapshot(target: LLDBConcreteTarget) -> ProgramState: + """Record a concrete target's state in a ProgramState object. + + :param target: The target from which to query state. Currently assumes an + X86 target. + """ + state = ProgramState(x86.ArchX86()) + + # Query and store register state + rflags = x86.decompose_rflags(target.read_register('rflags')) + for regname in x86.regnames: + try: + conc_val = target.read_register(regname) + state.set(regname, conc_val) + except KeyError: + pass + except SimConcreteRegisterError: + if regname in rflags: + state.set(regname, rflags[regname]) + + # Query and store memory state + for mapping in target.get_mappings(): + assert(mapping.end_address > mapping.start_address) + size = mapping.end_address - mapping.start_address + try: + data = target.read_memory(mapping.start_address, size) + state.write_memory(mapping.start_address, data) + except SimConcreteMemoryError: + # Unable to read memory from mapping + pass + + return state diff --git a/miasm_test.py b/miasm_test.py index 36246af..97c23de 100644 --- a/miasm_test.py +++ b/miasm_test.py @@ -1,101 +1,33 @@ -import sys -import time +import argparse -from miasm.analysis.binary import ContainerELF -from miasm.analysis.machine import Machine -from miasm.arch.x86.sem import Lifter_X86_64 -from miasm.core.asmblock import AsmCFG -from miasm.core.locationdb import LocationDB -from miasm.ir.symbexec import SymbolicExecutionEngine, SymbolicState +from symbolic import collect_symbolic_trace -from arch import x86 -from lldb_target import LLDBConcreteTarget, SimConcreteMemoryError, \ - SimConcreteRegisterError -from miasm_util import MiasmConcreteState, eval_expr -from snapshot import ProgramState +def main(): + program = argparse.ArgumentParser() + program.add_argument('binary') + program.add_argument('argv', action='store', nargs=argparse.REMAINDER) + program.add_argument('--start-addr', + help='Instruction at which to start') + args = program.parse_args() -def print_blocks(asmcfg, file=sys.stdout): - print('=' * 80, file=file) - for block in asmcfg.blocks: - print(block, file=file) - print('-' * 60, file=file) - print('=' * 80, file=file) + binary = args.binary + argv = args.argv -def print_state(state: SymbolicState): - print('=' * 80) - for reg, val in state.iteritems(): - print(f'{str(reg):10s} = {val}') - print('=' * 80) - -def step_until(target: LLDBConcreteTarget, addr: int) -> list[int]: - """Step a concrete target to a specific instruction. - :return: Trace of all instructions executed. - """ - trace = [target.read_register('pc')] - target.step() - while not target.is_exited() and target.read_register('pc') != addr: - trace.append(target.read_register('pc')) - target.step() - return trace - -def create_state(target: LLDBConcreteTarget) -> ProgramState: - def standardize_flag_name(regname: str) -> str: - regname = regname.upper() - if regname in MiasmConcreteState.miasm_flag_aliases: - return MiasmConcreteState.miasm_flag_aliases[regname] - return regname - - state = ProgramState(x86.ArchX86()) - - # Query and store register state - rflags = x86.decompose_rflags(target.read_register('rflags')) - for regname in x86.regnames: - try: - conc_val = target.read_register(regname) - state.set(regname, conc_val) - except KeyError: - pass - except SimConcreteRegisterError: - regname = standardize_flag_name(regname) - if regname in rflags: - state.set(regname, rflags[regname]) - - # Query and store memory state - for mapping in target.get_mappings(): - assert(mapping.end_address > mapping.start_address) - size = mapping.end_address - mapping.start_address + pc = None + if args.start_addr: try: - data = target.read_memory(mapping.start_address, size) - state.write_memory(mapping.start_address, data) - except SimConcreteMemoryError: - # Unable to read memory from mapping - pass - - return state - -symb_exec_time = 0 -conc_exec_time = 0 -disasm_time = 0 - -total_time_start = time.perf_counter_ns() - -binary = sys.argv[1] + pc = int(args.start_addr, 16) + except ValueError: + print(f'Start address must be a hexadecimal number. Exiting.') + exit(1) -loc_db = LocationDB() -cont = ContainerELF.from_stream(open(binary, 'rb'), loc_db) -machine = Machine(cont.arch) + strace = collect_symbolic_trace(binary, [binary, *argv], pc) -pc = int(cont.entry_point) -if len(sys.argv) > 2: - pc = int(sys.argv[2], 16) + print(f'--- {len(strace)} instructions traced.') + print(f'--- No new PC found. Exiting.') -# Create disassembly/lifting context -mdis = machine.dis_engine(cont.bin_stream, loc_db=loc_db) -mdis.follow_call = True -asmcfg = AsmCFG(loc_db) - -lifter: Lifter_X86_64 = machine.lifter(loc_db) -ircfg = lifter.new_ircfg_from_asmcfg(asmcfg) +if __name__ == "__main__": + main() # TODO: To implement support for unimplemented instructions, add their # ASM->IR implementations to the `mnemo_func` array in @@ -103,148 +35,3 @@ ircfg = lifter.new_ircfg_from_asmcfg(asmcfg) # # For XGETBV, I might have to add the extended control register XCR0 first. # This might be a nontrivial patch to Miasm. - -def run_block(pc: int, conc_state: MiasmConcreteState) \ - -> tuple[int | None, list]: - """Run a basic block. - - Tries to run IR blocks until the end of an ASM block/basic block is - reached. Skips 'virtual' blocks that purely exist in the IR. - - :param pc: A program counter at which we start executing. - :param conc_state: A concrete reference state at `pc`. Used to resolve - symbolic program counters, i.e. to 'guide' the symbolic - execution on the correct path. This is the concrete part - of our concolic execution. - - :return: The next program counter. None if no next program counter can be - found. This happens when an error occurs or when the program - exits. - """ - global disasm_time - global symb_exec_time - - # Start with a clean, purely symbolic state - engine = SymbolicExecutionEngine(lifter) - - # A list of symbolic transformation for each single instruction - symb_trace = [] - - while True: - irblock = ircfg.get_block(pc) - - # Initial disassembly might not find all blocks in the binary. - # Disassemble code ad-hoc if the current PC has not yet been - # disassembled. - if irblock is None: - disasm_time_start = time.perf_counter_ns() - cfg = mdis.dis_multiblock(pc) - for irblock in cfg.blocks: - lifter.add_asmblock_to_ircfg(irblock, ircfg) - disasm_time += time.perf_counter_ns() - disasm_time_start - print(f'Disassembled {len(cfg.blocks):4} new blocks at {hex(int(pc))}.') - - irblock = ircfg.get_block(pc) - assert(irblock is not None) - - # Execute each instruction in the current basic block and record the - # resulting change in program state. - symb_exec_time_start = time.perf_counter_ns() - for assignblk in irblock: - modified = engine.eval_assignblk(assignblk) - symb_trace.append((assignblk.instr.offset, modified)) - - # Run a single instruction - engine.eval_updt_assignblk(assignblk) - - # Obtain the next program counter after the basic block. - symbolic_pc = engine.eval_expr(engine.lifter.IRDst) - - # The new program counter might be a symbolic value. Try to evaluate - # it based on the last recorded concrete state at the start of the - # current basic block. - pc = eval_expr(symbolic_pc, conc_state) - - symb_exec_time += time.perf_counter_ns() - symb_exec_time_start - - # If the resulting PC is an integer, i.e. a concrete address that can - # be mapped to the assembly code, we return as we have reached the end - # of a basic block. Otherwise we might have reached the end of an IR - # block, in which case we keep executing until we reach the end of an - # ASM block. - # - # Example: This happens for the REP STOS instruction, for which Miasm - # generates multiple IR blocks. - try: - return int(pc), symb_trace - except: - # We reach this point when the program counter is an IR block - # location (not an integer). That happens when single ASM - # instructions are translated to multiple IR instructions. - pass - -symb_trace = [] # The list of generated symbolic transforms per instruction - -conc_exec_time_start = time.perf_counter_ns() -target = LLDBConcreteTarget(binary) -initial_state = create_state(target) -conc_exec_time += time.perf_counter_ns() - conc_exec_time_start - -if target.read_register('pc') != pc: - target.set_breakpoint(pc) - target.run() - target.remove_breakpoint(pc) - -# Run until no more states can be reached -print(f'Re-tracing symbolically...') -while pc is not None: - assert(target.read_register('pc') == pc) - - # Run symbolic execution - # It uses the concrete state to resolve symbolic program counters to - # concrete values. - pc, strace = run_block(pc, MiasmConcreteState(initial_state, loc_db)) - - if pc is None: - break - - # Step concrete target forward. - # - # The concrete target now lags behind the symbolic execution by exactly - # one basic block: the one that we just executed. Run the concrete - # execution until it reaches the new PC. - conc_exec_time_start = time.perf_counter_ns() - ctrace = step_until(target, pc) - conc_exec_time += time.perf_counter_ns() - conc_exec_time_start - - # Sometimes, miasm generates ghost instructions at the end of basic blocks. - # Don't include them in the symbolic trace. - strace = strace[:len(ctrace)] - symb_trace.extend(strace) - - # Use this for extensive trace debugging - if [a for a, _ in strace] != ctrace: - print(f'[WARNING] Symbolic trace and concrete trace are not equal!' - f'\n symbolic: {[hex(a) for a, _ in strace]}' - f'\n concrete: {[hex(a) for a in ctrace]}') - - if target.is_exited(): - print(f'Next PC {hex(pc)} is not contained in the concrete trace.') - break - - # Query the new reference state for symbolic execution - conc_exec_time_start = time.perf_counter_ns() - initial_state = create_state(target) - conc_exec_time += time.perf_counter_ns() - conc_exec_time_start - -total_time = time.perf_counter_ns() - total_time_start -other_time = total_time - symb_exec_time - conc_exec_time - disasm_time - -print(f'--- {len(symb_trace)} instructions traced.') -print(f'--- No new PC found. Exiting.') -print() -print(f' Total time: {total_time * 1e-6:10.3f} ms') -print(f' Disassembly time: {disasm_time * 1e-6:10.3f} ms') -print(f' Symbolic execution time: {symb_exec_time * 1e-6:10.3f} ms') -print(f' Concrete execution time: {conc_exec_time * 1e-6:10.3f} ms') -print(f' Other: {other_time * 1e-6:10.3f} ms') diff --git a/miasm_util.py b/miasm_util.py index 55dfad0..3ceebea 100644 --- a/miasm_util.py +++ b/miasm_util.py @@ -1,3 +1,5 @@ +from typing import Callable + from miasm.core.locationdb import LocationDB, LocKey from miasm.expression.expression import Expr, ExprOp, ExprId, ExprLoc, \ ExprInt, ExprMem, ExprCompose, \ @@ -51,10 +53,21 @@ class MiasmConcreteState: def resolve_location(self, loc: LocKey) -> int | None: return self.loc_db.get_location_offset(loc) -def eval_expr(expr: Expr, conc_state: MiasmConcreteState) -> int: +def eval_expr(expr: Expr, conc_state: MiasmConcreteState): + """Evaluate a symbolic expression with regard to a concrete reference + state. + + :param expr: An expression to evaluate. + :param conc_state: The concrete reference state from which symbolic + register and memory state is resolved. + + :return: The most simplified and concrete representation of `expr` that + is possibly producible. May be either an `ExprInt` or an + `ExprLoc`. + """ # Most of these implementation are just copy-pasted members of # `SymbolicExecutionEngine`. - expr_to_visitor = { + expr_to_visitor: dict[type[Expr], Callable] = { ExprInt: _eval_exprint, ExprId: _eval_exprid, ExprLoc: _eval_exprloc, @@ -105,7 +118,8 @@ def _eval_exprmem(expr: ExprMem, state: MiasmConcreteState): addr = eval_expr(expr.ptr, state) ret = state.resolve_memory(int(addr), int(expr.size / 8)) assert(len(ret) * 8 == expr.size) - return ExprInt(int.from_bytes(ret, byteorder='little'), expr.size) + ival = ExprInt(int.from_bytes(ret, byteorder='little'), expr.size) + return ExprSlice(ival, 0, len(ret) * 8) def _eval_exprcond(expr, state: MiasmConcreteState): """Evaluate an ExprCond using the current state""" diff --git a/symbolic.py b/symbolic.py index 56857d7..a5e1f70 100644 --- a/symbolic.py +++ b/symbolic.py @@ -1,174 +1,250 @@ -"""Tools and utilities for symbolic execution with angr.""" - -import angr -import claripy as cp -from angr.exploration_techniques import Symbion - -from arch import Arch, x86 -from interpreter import eval as eval_symbol, SymbolResolver -from lldb_target import LLDBConcreteTarget - -def symbolize_state(state: angr.SimState, - arch: Arch = x86.ArchX86(), - exclude: list[str] = ['RIP', 'RBP', 'RSP'], - stack_name: str = 'stack', - stack_size: int = 0x10) \ - -> angr.SimState: - """Create a copy of a SimState and replace most of it with symbolic - values. - - Leaves pc, rbp, and rsp concrete by default. This can be configured with - the `exclude` parameter. Add the string 'stack' to the exclude list to - prevent stack memory from being replaced with a symbolic buffer. - - :return: A symbolized SymState object. - """ - _exclude = set(exclude) - state = state.copy() - - if stack_name not in _exclude: - symb_stack = cp.BVS(stack_name, stack_size * 8, explicit_name=True) - state.memory.store(state.regs.rsp - stack_size, symb_stack) - - for reg in arch.regnames: - if reg not in _exclude: - symb_val = cp.BVS(reg, 64, explicit_name=True) - try: - state.regs.__setattr__(reg.lower(), symb_val) - except AttributeError: - pass - return state +"""Tools and utilities for symbolic execution with Miasm.""" + +from miasm.analysis.binary import ContainerELF +from miasm.analysis.machine import Machine +from miasm.core.asmblock import AsmCFG +from miasm.core.locationdb import LocationDB +from miasm.ir.symbexec import SymbolicExecutionEngine +from miasm.expression.expression import Expr, ExprId, ExprMem + +from lldb_target import LLDBConcreteTarget, record_snapshot +from miasm_util import MiasmConcreteState, eval_expr +from snapshot import ProgramState class SymbolicTransform: + def __init__(self, from_addr: int, to_addr: int): + self.addr = from_addr + self.range = (from_addr, to_addr) + + def calc_register_transform(self, conc_state: ProgramState) \ + -> dict[str, int]: + raise NotImplementedError('calc_register_transform is abstract.') + + def calc_memory_transform(self, conc_state: ProgramState) \ + -> dict[int, bytes]: + raise NotImplementedError('calc_memory_transform is abstract.') + +class MiasmSymbolicTransform(SymbolicTransform): def __init__(self, - state: angr.SimState, - first_inst: int, - last_inst: int, - end_inst: int): + transform: dict[ExprId, Expr], + loc_db: LocationDB, + start_addr: int, + end_addr: int): """ :param state: The symbolic transformation in the form of a SimState object. - :param first_inst: An instruction address. The transformation operates - on the program state *before* this instruction is - executed. - :param last_inst: An instruction address. The last instruction that - is included in the transformation. This may be equal - to `prev_state` if the `SymbolicTransform` - represents the work done by a single instruction. - The transformation includes all instructions in the - range `[first_inst, last_inst]` (note the inclusive - right bound) of the specific program trace. - :param end_inst: An instruction address. The address of the *next* - instruction executed on the state that results from - the transformation. + :param first_inst: An instruction address. The transformation + represents the modifications to the program state + performed by this instruction. """ - self.state = state - self.start_addr = first_inst - self.last_inst = last_inst - self.end_addr = end_inst - - def eval_register_transform(self, regname: str, resolver: SymbolResolver): - """ - :param regname: Name of the register to evaluate. - :param resolver: A provider for the values to be plugged into the - symbolic equation. - - :raise angr.SimConcreteRegisterError: If the state contains no register - named `regname`. - """ - return eval_symbol(resolver, self.state.regs.get(regname)) + super().__init__(start_addr, end_addr) + + self.regs_diff: dict[str, Expr] = {} + self.mem_diff: dict[ExprMem, Expr] = {} + for id, expr in transform.items(): + if isinstance(id, ExprMem): + self.mem_diff[id.ptr] = expr + elif id.name != 'IRDst': + assert(isinstance(id, ExprId)) + self.regs_diff[id.name] = expr + + self.loc_db = loc_db + + def calc_register_transform(self, conc_state: ProgramState) \ + -> dict[str, int]: + ref_state = MiasmConcreteState(conc_state, self.loc_db) + + res = {} + for regname, expr in self.regs_diff.items(): + res[regname] = eval_expr(expr, ref_state) + return res + + def calc_memory_transform(self, conc_state: ProgramState) \ + -> dict[int, bytes]: + ref_state = MiasmConcreteState(conc_state, self.loc_db) + + res = {} + for addr, expr in self.mem_diff.items(): + addr = int(eval_expr(addr, ref_state)) + length = int(expr.size / 8) + res[addr] = int(eval_expr(expr, ref_state)).to_bytes(length) + return res def __repr__(self) -> str: - return f'Symbolic state transformation: \ - {hex(self.start_addr)} -> {hex(self.end_addr)}' + return f'Symbolic state transformation for instruction \ + {hex(self.addr)}.' -def collect_symbolic_trace(binary: str, trace: list[int]) \ - -> list[SymbolicTransform]: +def _step_until(target: LLDBConcreteTarget, addr: int) -> list[int]: + """Step a concrete target to a specific instruction. + :return: Trace of all instructions executed. + """ + trace = [target.read_register('pc')] + target.step() + while not target.is_exited() and target.read_register('pc') != addr: + trace.append(target.read_register('pc')) + target.step() + return trace + +def _run_block(pc: int, conc_state: MiasmConcreteState, lifter, ircfg, mdis) \ + -> tuple[int | None, list]: + """Run a basic block. + + Tries to run IR blocks until the end of an ASM block/basic block is + reached. Skips 'virtual' blocks that purely exist in the IR. + + :param pc: A program counter at which we start executing. + :param conc_state: A concrete reference state at `pc`. Used to resolve + symbolic program counters, i.e. to 'guide' the symbolic + execution on the correct path. This is the concrete part + of our concolic execution. + + :return: The next program counter. None if no next program counter can be + found. This happens when an error occurs or when the program + exits. + """ + global disasm_time + global symb_exec_time + + # Start with a clean, purely symbolic state + engine = SymbolicExecutionEngine(lifter) + + # A list of symbolic transformation for each single instruction + symb_trace = [] + + while True: + irblock = ircfg.get_block(pc) + + # Initial disassembly might not find all blocks in the binary. + # Disassemble code ad-hoc if the current PC has not yet been + # disassembled. + if irblock is None: + cfg = mdis.dis_multiblock(pc) + for asmblock in cfg.blocks: + try: + lifter.add_asmblock_to_ircfg(asmblock, ircfg) + except NotImplementedError as err: + print(f'[ERROR] Unable to disassemble block at' + f' {hex(asmblock.get_range()[0])}:' + f' [Not implemented] {err}') + pass + + irblock = ircfg.get_block(pc) + if irblock is None: + print(f'[ERROR] Unable to disassemble block(s) at {hex(pc)}.') + raise RuntimeError() + print(f'Disassembled {len(cfg.blocks):4} new blocks at {hex(int(pc))}.') + + # Execute each instruction in the current basic block and record the + # resulting change in program state. + for assignblk in irblock: + modified = engine.eval_assignblk(assignblk) + symb_trace.append((assignblk.instr.offset, modified)) + + # Run a single instruction + engine.eval_updt_assignblk(assignblk) + + # Obtain the next program counter after the basic block. + symbolic_pc = engine.eval_expr(engine.lifter.IRDst) + + # The new program counter might be a symbolic value. Try to evaluate + # it based on the last recorded concrete state at the start of the + # current basic block. + pc = eval_expr(symbolic_pc, conc_state) + + # If the resulting PC is an integer, i.e. a concrete address that can + # be mapped to the assembly code, we return as we have reached the end + # of a basic block. Otherwise we might have reached the end of an IR + # block, in which case we keep executing until we reach the end of an + # ASM block. + # + # Example: This happens for the REP STOS instruction, for which Miasm + # generates multiple IR blocks. + try: + return int(pc), symb_trace + except: + # We reach this point when the program counter is an IR block + # location (not an integer). That happens when single ASM + # instructions are translated to multiple IR instructions. + pass + +def collect_symbolic_trace(binary: str, + argv: list[str], + start_addr: int | None = None + ) -> list[SymbolicTransform]: """Execute a program and compute state transformations between executed instructions. :param binary: The binary to trace. - :param trace: A program trace that symbolic execution shall follow. """ - target = LLDBConcreteTarget(binary) - proj = angr.Project(binary, - concrete_target=target, - use_sim_procedures=False) - - entry_state = proj.factory.entry_state(addr=trace[0]) - entry_state.options.add(angr.options.SYMBION_KEEP_STUBS_ON_SYNC) - entry_state.options.add(angr.options.SYMBION_SYNC_CLE) - - # We keep a history of concrete states at their addresses because of the - # backtracking approach described below. - concrete_states = {} - - # All recorded symbolic transformations - result = [] - - for (cur_idx, cur_inst), next_inst in zip(enumerate(trace[0:-1]), trace[1:]): - # The last instruction included in the generated transformation - last_inst = cur_inst - - symbion = proj.factory.simgr(entry_state) - symbion.use_technique(Symbion(find=[cur_inst])) - - try: - if cur_inst != entry_state.addr: - conc_exploration = symbion.run() - else: - symbion.move('active', 'found') - conc_exploration = symbion - except angr.AngrError as err: - print(f'Angr error: {err} Returning partial result.') - return result - conc_state = conc_exploration.found[0] - entry_state = conc_state - - concrete_states[conc_state.addr] = conc_state.copy() - - # Start symbolic execution with the concrete ('truth') state and try - # to reach the next instruction in the trace - # - # -- Notes -- - # It does not even work when I feed the entirely concrete state - # `conc_state` that I receive from Symbion into the symbolic simgr. - simgr = proj.factory.simgr(symbolize_state(conc_state)) - symb_exploration = simgr.explore(find=next_inst) - - # Symbolic execution can't handle starting at some jump instructions. - # When this occurs, we re-start symbolic execution at an earlier - # instruction. - # - # Example: - # 0x401155 cmp -0x4(%rbp),%eax - # 0x401158 jle 0x401162 - # ... - # 0x401162 addl $0x1337,-0xc(%rbp) + loc_db = LocationDB() + with open(binary, 'rb') as bin_file: + cont = ContainerELF.from_stream(bin_file, loc_db) + machine = Machine(cont.arch) + + # Create disassembly/lifting context + mdis = machine.dis_engine(cont.bin_stream, loc_db=loc_db) + mdis.follow_call = True + asmcfg = AsmCFG(loc_db) + + lifter = machine.lifter(loc_db) + ircfg = lifter.new_ircfg_from_asmcfg(asmcfg) + + if start_addr is None: + pc = cont.entry_point + else: + pc = start_addr + + target = LLDBConcreteTarget(binary, argv) + if target.read_register('pc') != pc: + target.set_breakpoint(pc) + target.run() + target.remove_breakpoint(pc) + + symb_trace = [] # The resulting list of symbolic transforms per instruction + + # Run until no more states can be reached + initial_state = record_snapshot(target) + while pc is not None: + assert(target.read_register('pc') == pc) + + # Run symbolic execution + # It uses the concrete state to resolve symbolic program counters to + # concrete values. + pc, strace = _run_block( + pc, MiasmConcreteState(initial_state, loc_db), + lifter, ircfg, mdis) + + if pc is None: + break + + # Step concrete target forward. # - # Here, symbolic execution can't find a valid state at `0x401162` when - # starting at `0x401158`, but it finds it successfully when starting at - # `0x401155`. - while len(symb_exploration.found) == 0 and cur_idx > 0: - print(f'[INFO] Symbolic execution can\'t reach address' - f' {hex(next_inst)} from {hex(cur_inst)}.' - f' Attempting to reach it from {hex(trace[cur_idx - 1])}...') - cur_idx -= 1 - cur_inst = trace[cur_idx] - conc_state = concrete_states[cur_inst] - simgr = proj.factory.simgr(symbolize_state(conc_state)) - symb_exploration = simgr.explore(find=next_inst) - - if len(symb_exploration.found) == 0: - print(f'Symbolic execution can\'t reach address {hex(next_inst)}.' - ' Exiting.') - exit(1) - - result.append(SymbolicTransform( - symb_exploration.found[0], - cur_inst, - last_inst, - next_inst - )) - - return result + # The concrete target now lags behind the symbolic execution by exactly + # one basic block: the one that we just executed. Run the concrete + # execution until it reaches the new PC. + ctrace = _step_until(target, pc) + + # Sometimes, miasm generates ghost instructions at the end of basic + # blocks. Don't include them in the symbolic trace. + strace = strace[:len(ctrace)] + symb_trace.extend(strace) + + # Use this for extensive trace debugging + if [a for a, _ in strace] != ctrace: + print(f'[WARNING] Symbolic trace and concrete trace are not equal!' + f'\n symbolic: {[hex(a) for a, _ in strace]}' + f'\n concrete: {[hex(a) for a in ctrace]}') + + if target.is_exited(): + break + + # Query the new reference state for symbolic execution + initial_state = record_snapshot(target) + + res = [] + for (start, diff), (end, _) in zip(symb_trace[:-1], symb_trace[1:]): + res.append(MiasmSymbolicTransform(diff, loc_db, start, end)) + start, diff = symb_trace[-1] + res.append(MiasmSymbolicTransform(diff, loc_db, start, start)) + + return res |