diff options
| author | Theofilos Augoustis <theofilos.augoustis@gmail.com> | 2023-12-14 17:03:59 +0100 |
|---|---|---|
| committer | Theofilos Augoustis <theofilos.augoustis@gmail.com> | 2023-12-14 17:03:59 +0100 |
| commit | 194f3d6f2ebdc7b0631fdaaeb8451142b052ccb0 (patch) | |
| tree | 20806871433db331bdfed66ddadfadecbea2b7c4 /symbolic.py | |
| parent | 4a5584d8f69d8ff511285387971d8cbf803f16b7 (diff) | |
| download | focaccia-194f3d6f2ebdc7b0631fdaaeb8451142b052ccb0.tar.gz focaccia-194f3d6f2ebdc7b0631fdaaeb8451142b052ccb0.zip | |
Implement symbolic comparison and match traces via Miasm
Co-authored-by: Theofilos Augoustis <theofilos.augoustis@gmail.com> Co-authored-by: Nicola Crivellin <nicola.crivellin98@gmail.com>
Diffstat (limited to '')
| -rw-r--r-- | symbolic.py | 171 |
1 files changed, 115 insertions, 56 deletions
diff --git a/symbolic.py b/symbolic.py index 2a328fd..b005c5e 100644 --- a/symbolic.py +++ b/symbolic.py @@ -8,9 +8,10 @@ from miasm.analysis.machine import Machine from miasm.core.asmblock import AsmCFG from miasm.core.locationdb import LocationDB from miasm.ir.symbexec import SymbolicExecutionEngine +from miasm.ir.ir import IRBlock from miasm.expression.expression import Expr, ExprId, ExprMem, ExprInt -from lldb_target import LLDBConcreteTarget, record_snapshot +from lldb_target import LLDBConcreteTarget from miasm_util import MiasmConcreteState, eval_expr from snapshot import ProgramState from arch import Arch, supported_architectures @@ -36,11 +37,14 @@ class SymbolicTransform: -> dict[int, bytes]: raise NotImplementedError('calc_memory_transform is abstract.') + def __repr__(self) -> str: + start, end = self.range + return f'Symbolic state transformation {hex(start)} -> {hex(end)}' + class MiasmSymbolicTransform(SymbolicTransform): def __init__(self, - transform: dict[ExprId, Expr], + transform: dict[Expr, Expr], arch: Arch, - loc_db: LocationDB, start_addr: int, end_addr: int): """ @@ -55,6 +59,8 @@ class MiasmSymbolicTransform(SymbolicTransform): self.regs_diff: dict[str, Expr] = {} self.mem_diff: dict[ExprMem, Expr] = {} for dst, expr in transform.items(): + assert(isinstance(dst, ExprMem) or isinstance(dst, ExprId)) + if isinstance(dst, ExprMem): self.mem_diff[dst] = expr else: @@ -64,14 +70,16 @@ class MiasmSymbolicTransform(SymbolicTransform): self.regs_diff[regname] = expr self.arch = arch - self.loc_db = loc_db def concat(self, other: MiasmSymbolicTransform) -> Self: - class MiasmSymbolicState: + class MiasmSymbolicState(MiasmConcreteState): """Drop-in replacement for MiasmConcreteState in eval_expr that returns the current transform's symbolic equations instead of symbolic values. Calling eval_expr with this effectively nests the transformation into the concatenated transformation. + + We inherit from `MiasmSymbolicTransform` only for the purpose of + correct type checking. """ def __init__(self, transform: MiasmSymbolicTransform): self.transform = transform @@ -110,7 +118,9 @@ class MiasmSymbolicTransform(SymbolicTransform): def calc_register_transform(self, conc_state: ProgramState) \ -> dict[str, int]: - ref_state = MiasmConcreteState(conc_state, self.loc_db) + # Construct a dummy location DB. At this point, expressions should + # never contain IR locations. + ref_state = MiasmConcreteState(conc_state, LocationDB()) res = {} for regname, expr in self.regs_diff.items(): @@ -119,7 +129,9 @@ class MiasmSymbolicTransform(SymbolicTransform): def calc_memory_transform(self, conc_state: ProgramState) \ -> dict[int, bytes]: - ref_state = MiasmConcreteState(conc_state, self.loc_db) + # Construct a dummy location DB. At this point, expressions should + # never contain IR locations. + ref_state = MiasmConcreteState(conc_state, LocationDB()) res = {} for addr, expr in self.mem_diff.items(): @@ -149,8 +161,58 @@ def _step_until(target: LLDBConcreteTarget, addr: int) -> list[int]: target.step() return trace -def _run_block(pc: int, conc_state: MiasmConcreteState, lifter, ircfg, mdis) \ - -> tuple[int | None, list]: +class DisassemblyContext: + def __init__(self, binary): + self.loc_db = LocationDB() + + # Load the binary + with open(binary, 'rb') as bin_file: + cont = ContainerELF.from_stream(bin_file, self.loc_db) + + self.machine = Machine(cont.arch) + self.entry_point = cont.entry_point + + # Create disassembly/lifting context + self.lifter = self.machine.lifter(self.loc_db) + self.mdis = self.machine.dis_engine(cont.bin_stream, loc_db=self.loc_db) + self.mdis.follow_call = True + self.asmcfg = AsmCFG(self.loc_db) + self.ircfg = self.lifter.new_ircfg_from_asmcfg(self.asmcfg) + + def get_irblock(self, addr: int) -> IRBlock | None: + irblock = self.ircfg.get_block(addr) + + # Initial disassembly might not find all blocks in the binary. + # Disassemble code ad-hoc if the current address has not yet been + # disassembled. + if irblock is None: + cfg = self.mdis.dis_multiblock(addr) + for asmblock in cfg.blocks: + try: + self.lifter.add_asmblock_to_ircfg(asmblock, self.ircfg) + except NotImplementedError as err: + print(f'[WARNING] Unable to disassemble block at' + f' {hex(asmblock.get_range()[0])}:' + f' [Not implemented] {err}') + pass + print(f'Disassembled {len(cfg.blocks):5} new blocks at {hex(int(addr))}.') + irblock = self.ircfg.get_block(addr) + + # Might still be None if disassembly/lifting failed for the block + # at `addr`. + return irblock + +class DisassemblyError(Exception): + def __init__(self, + partial_trace: list[tuple[int, MiasmSymbolicTransform]], + faulty_pc: int, + err_msg: str): + self.partial_trace = partial_trace + self.faulty_pc = faulty_pc + self.err_msg = err_msg + +def _run_block(pc: int, conc_state: MiasmConcreteState, ctx: DisassemblyContext) \ + -> tuple[int | None, list[dict]]: """Run a basic block. Tries to run IR blocks until the end of an ASM block/basic block is @@ -166,37 +228,20 @@ def _run_block(pc: int, conc_state: MiasmConcreteState, lifter, ircfg, mdis) \ found. This happens when an error occurs or when the program exits. """ - global disasm_time - global symb_exec_time - # Start with a clean, purely symbolic state - engine = SymbolicExecutionEngine(lifter) + engine = SymbolicExecutionEngine(ctx.lifter) # A list of symbolic transformation for each single instruction symb_trace = [] while True: - irblock = ircfg.get_block(pc) - - # Initial disassembly might not find all blocks in the binary. - # Disassemble code ad-hoc if the current PC has not yet been - # disassembled. + irblock = ctx.get_irblock(pc) if irblock is None: - cfg = mdis.dis_multiblock(pc) - for asmblock in cfg.blocks: - try: - lifter.add_asmblock_to_ircfg(asmblock, ircfg) - except NotImplementedError as err: - print(f'[ERROR] Unable to disassemble block at' - f' {hex(asmblock.get_range()[0])}:' - f' [Not implemented] {err}') - pass - - irblock = ircfg.get_block(pc) - if irblock is None: - print(f'[ERROR] Unable to disassemble block(s) at {hex(pc)}.') - raise RuntimeError() - print(f'Disassembled {len(cfg.blocks):4} new blocks at {hex(int(pc))}.') + raise DisassemblyError( + symb_trace, + pc, + f'[ERROR] Unable to disassemble block at {hex(pc)}.' + ) # Execute each instruction in the current basic block and record the # resulting change in program state. @@ -240,27 +285,17 @@ def collect_symbolic_trace(binary: str, :param binary: The binary to trace. """ - loc_db = LocationDB() - with open(binary, 'rb') as bin_file: - cont = ContainerELF.from_stream(bin_file, loc_db) - machine = Machine(cont.arch) + ctx = DisassemblyContext(binary) # Find corresponding architecture - if machine.name not in supported_architectures: - print(f'[ERROR] {machine.name} is not supported. Returning.') + mach_name = ctx.machine.name + if mach_name not in supported_architectures: + print(f'[ERROR] {mach_name} is not supported. Returning.') return [] - arch = supported_architectures[machine.name] - - # Create disassembly/lifting context - mdis = machine.dis_engine(cont.bin_stream, loc_db=loc_db) - mdis.follow_call = True - asmcfg = AsmCFG(loc_db) - - lifter = machine.lifter(loc_db) - ircfg = lifter.new_ircfg_from_asmcfg(asmcfg) + arch = supported_architectures[mach_name] if start_addr is None: - pc = cont.entry_point + pc = ctx.entry_point else: pc = start_addr @@ -273,16 +308,40 @@ def collect_symbolic_trace(binary: str, symb_trace = [] # The resulting list of symbolic transforms per instruction # Run until no more states can be reached - initial_state = record_snapshot(target) + initial_state = target.record_snapshot() while pc is not None: assert(target.read_register('pc') == pc) # Run symbolic execution # It uses the concrete state to resolve symbolic program counters to # concrete values. - pc, strace = _run_block( - pc, MiasmConcreteState(initial_state, loc_db), - lifter, ircfg, mdis) + try: + pc, strace = _run_block( + pc, + MiasmConcreteState(initial_state, ctx.loc_db), + ctx + ) + except DisassemblyError as err: + # This happens if we encounter an instruction that is not + # implemented by Miasm. Try to skip that instruction and continue + # at the next one. + print(f'[WARNING] Skipping instruction at {hex(err.faulty_pc)}...') + + # First, catch up to symbolic trace if required + if err.faulty_pc != pc: + ctrace = _step_until(target, err.faulty_pc) + symb_trace.extend(err.partial_trace) + assert(len(ctrace) - 1 == len(err.partial_trace)) # no ghost instr + + # Now step one more time to skip the faulty instruction + target.step() + if target.is_exited(): + break + + symb_trace.append((err.faulty_pc, {})) # Generate empty transform + pc = target.read_register('pc') + initial_state = target.record_snapshot() + continue if pc is None: break @@ -309,12 +368,12 @@ def collect_symbolic_trace(binary: str, break # Query the new reference state for symbolic execution - initial_state = record_snapshot(target) + initial_state = target.record_snapshot() res = [] for (start, diff), (end, _) in zip(symb_trace[:-1], symb_trace[1:]): - res.append(MiasmSymbolicTransform(diff, arch, loc_db, start, end)) + res.append(MiasmSymbolicTransform(diff, arch, start, end)) start, diff = symb_trace[-1] - res.append(MiasmSymbolicTransform(diff, arch, loc_db, start, start)) + res.append(MiasmSymbolicTransform(diff, arch, start, start)) return res |