diff options
| author | Theofilos Augoustis <theofilos.augoustis@gmail.com> | 2023-11-07 17:11:49 +0100 |
|---|---|---|
| committer | Theofilos Augoustis <theofilos.augoustis@gmail.com> | 2023-11-07 17:11:49 +0100 |
| commit | ca7044cdc7fe99d8065594d455b7f41505f796ab (patch) | |
| tree | 4d0aaa4ccf98ccaf6d213d7487b2464561d3caa9 | |
| parent | 6f01367f9c8ad4c3d641cc63dbb1a3977ff4ec56 (diff) | |
| download | focaccia-ca7044cdc7fe99d8065594d455b7f41505f796ab.tar.gz focaccia-ca7044cdc7fe99d8065594d455b7f41505f796ab.zip | |
Implement symbolic tracing in trace_symbols.py using Angr
| -rw-r--r-- | README.md | 48 | ||||
| -rw-r--r-- | test.py | 180 | ||||
| -rw-r--r-- | trace_symbols.py | 110 |
3 files changed, 158 insertions, 180 deletions
diff --git a/README.md b/README.md index 7cf64cd..04ef446 100644 --- a/README.md +++ b/README.md @@ -3,3 +3,51 @@ This repository contains initial code for comprehensive testing of binary translators. +## Snapshot-comparison framework + +The following files belong to a rough framework for the snapshot comparison engine: + + - `main.py`: Entry point to the tool. Handling of command line arguments, pre-processing of input +logs, etc. + + - `snapshot.py`: Internal structures used to work with snapshots. Contains the previous +`ContextBlock` class, which has been renamed to `ProgramState` to make its purpose as a snapshot of +the program state clearer. + + - `compare.py`: The central algorithms that work on snapshots. + + - `run.py`: Tools to execute native programs and capture their state via an external debugger. + + - `arancini.py`: Functionality specific to working with arancini. Parsing of arancini's logs into our +snapshot structures. + + - `arch/`: Abstractions over different processor architectures. Will be used to integrate support for +more architectures later. Currently, we only have X86. + +## Symbolic execution + +The following files belong to a prototype of a data-dependency generator based on symbolic +execution: + + - `gen_trace.py`: An invokable tool that generates an instruction trace for an executable's native +execution. Is imported into `trace_symbols.py`, which uses the core function that records a trace. + + - `trace_symbols.py`: A simple proof of concept for symbolic data-dependency tracking. Takes an +executable as an argument and does the following: + + 1. Executes the program natively (starting at `main`) and records a trace of every instruction +executed, stopping when exiting `main`. + + 2. Tries to follow this trace of instructions concolically (keeps a concrete program state from +a native execution in parallel to a symbolic program state), recording after each instruction the +changes it has made to the program state before that instruction. + + 3. Writes the program state at each instruction to log files; writes the concrete state of the +real execution to 'concrete.log' and the symbolic difference to 'symbolic.log'. + + This first version is very fragile. As soon as angr can't handle a branch instruction (which is +the case for almost any branch instruction), it aborts with an error. + +## Helpers + + - `lldb_target.py`: Implements angr's `ConcreteTarget` interface for [LLDB](https://lldb.llvm.org/). diff --git a/test.py b/test.py deleted file mode 100644 index 72a1438..0000000 --- a/test.py +++ /dev/null @@ -1,180 +0,0 @@ -import angr -import angr_targets -import claripy as cp -import sys - -from lldb_target import LLDBConcreteTarget - -from arancini import parse_break_addresses -from arch import x86 - -def print_state(state, file=sys.stdout): - for reg in x86.regnames: - try: - val = state.regs.__getattr__(reg.lower()) - print(f'{reg} = {val}', file=file) - except angr.SimConcreteRegisterError: - print(f'Unable to read value of register {reg}: register error', - file=file) - except angr.SimConcreteMemoryError: - print(f'Unable to read value of register {reg}: memory error', - file=file) - except AttributeError: - print(f'Unable to read value of register {reg}: AttributeError', - file=file) - except KeyError: - print(f'Unable to read value of register {reg}: KeyError', - file=file) - -def copy_state(src: angr_targets.ConcreteTarget, dst: angr.SimState): - """Copy a concrete program state to an `angr.SimState` object.""" - # Copy register contents - for reg in x86.regnames: - regname = reg.lower() - try: - dst.regs.__setattr__(regname, src.read_register(regname)) - except angr.SimConcreteRegisterError: - # Register does not exist (i.e. "flag ZF") - pass - - # Copy memory contents - for mapping in src.get_mappings(): - addr = mapping.start_address - size = mapping.end_address - mapping.start_address - try: - dst.memory.store(addr, src.read_memory(addr, size), size) - except angr.SimConcreteMemoryError: - # Invalid memory access - pass - -def symbolize_state(state: angr.SimState): - for reg in x86.regnames: - if reg != 'PC': - symb_val = cp.BVS(reg, 64) - try: - state.regs.__setattr__(reg.lower(), symb_val) - except AttributeError: - pass - -def output_truth(breakpoints: set[int]): - import run - res = run.run_native_execution(BINARY, breakpoints) - with open('truth.log', 'w') as file: - for snapshot in res: - print(cp.BVV(snapshot.regs['PC'], 64), file=file) - -BINARY = "hello-static-musl" -BREAKPOINT_LOG = "emulator-log.txt" - -# Read breakpoint addresses from a file -with open(BREAKPOINT_LOG, "r") as file: - breakpoints = parse_break_addresses(file.readlines()) - -print(f'Found {len(breakpoints)} breakpoints.') - -class ConcreteExecution: - def __init__(self, executable: str, breakpoints: list[int]): - self.target = LLDBConcreteTarget(executable) - self.proj = angr.Project(executable, - concrete_target=self.target, - use_sim_procedures=False) - - # Set the initial state - state = self.proj.factory.entry_state() - state.options.add(angr.options.SYMBION_SYNC_CLE) - state.options.add(angr.options.SYMBION_KEEP_STUBS_ON_SYNC) - self.simgr = self.proj.factory.simgr(state) - self.simgr.use_technique( - angr.exploration_techniques.Symbion(find=breakpoints)) - - def is_running(self): - return not self.target.is_exited() - - def step(self) -> angr.SimState | None: - self.simgr.run() - self.simgr.unstash(to_stash='active', from_stash='found') - if len(self.simgr.active) > 0: - state = self.simgr.active[0] - print(f'-- Concrete execution hit a breakpoint at {state.regs.pc}!') - return state - return None - -class SymbolicExecution: - def __init__(self, executable: str): - self.proj = angr.Project(executable, use_sim_procedures=False) - self.simgr = self.proj.factory.simgr(self.proj.factory.entry_state()) - - def is_running(self): - return len(self.simgr.active) > 0 - - def step(self, find) -> angr.SimState | None: - self.simgr.explore(find=find) - self.simgr.unstash(to_stash='active', from_stash='found') - if len(self.simgr.active) == 0: - print(f'No states found. Stashes: {self.simgr.stashes}') - return None - - state = self.simgr.active[0] - assert(len(self.simgr.active) == 1) - print(f'-- Symbolic execution stopped at {state.regs.pc}!') - print(f' Found the following stashes: {self.simgr.stashes}') - - return state - -output_truth(breakpoints) - -conc = ConcreteExecution(BINARY, list(breakpoints)) -symb = SymbolicExecution(BINARY) - -conc_log = open('concrete.log', 'w') -symb_log = open('symbolic.log', 'w') - -while True: - if not (conc.is_running() and symb.is_running()): - assert(not conc.is_running() and not symb.is_running()) - print(f'Execution has exited.') - exit(0) - - # It seems that we have to copy the program's state manually to the state - # handed to the symbolic engine, otherwise the program emulation is - # incorrect. Something in angr's emulation is scuffed. - copy_state(conc.target, symb.simgr.active[0]) - - # angr performs a sanity check to ensure that the address at which the - # concrete engine stops actually is one of the breakpoints specified by - # the user. This sanity check is faulty because it is performed before the - # user has a chance determine whether the program has exited. If the - # program counter is read after the concrete execution has exited, LLDB - # returns a null value and the check fails, resulting in a crash. This - # try/catch block prevents that. - # - # As of angr commit `cbeace5d7`, this faulty read of the program counter - # can be found at `angr/engines/concrete.py:148`. - try: - conc_state = conc.step() - if conc_state is None: - print(f'Execution has exited: ConcreteExecution.step() returned null.') - exit(0) - except angr.SimConcreteRegisterError: - print(f'Done.') - exit(0) - - pc = conc_state.solver.eval(conc_state.regs.pc) - print(f'-- Trying to find address {hex(pc)} with symbolic execution...') - - # TODO: - #symbolize_state(symb.simgr.active[0]) - symb_state = symb.step(pc) - - # Check exit conditions - if symb_state is None: - print(f'Execution has exited: SymbolicExecution.step() returned null.') - exit(0) - assert(pc == symb_state.solver.eval(symb_state.regs.pc)) - - # Log some stuff - print(f'-- Concrete breakpoint {conc_state.regs.pc}' - f' vs symbolic breakpoint {symb_state.regs.pc}') - - print(conc_state.regs.pc, file=conc_log) - print(symb_state.regs.pc, file=symb_log) diff --git a/trace_symbols.py b/trace_symbols.py new file mode 100644 index 0000000..c16cd6e --- /dev/null +++ b/trace_symbols.py @@ -0,0 +1,110 @@ +import angr +import argparse +import claripy as cp +import sys + +from angr.exploration_techniques import Symbion +from arch import x86 +from gen_trace import record_trace +from lldb_target import LLDBConcreteTarget + +def print_state(state: angr.SimState, file=sys.stdout): + """Print a program state in a fancy way.""" + print('-' * 80, file=file) + print(f'State at {hex(state.addr)}:', file=file) + print('-' * 80, file=file) + for reg in x86.regnames: + try: + val = state.regs.__getattr__(reg.lower()) + print(f'{reg} = {val}', file=file) + except angr.SimConcreteRegisterError: pass + except angr.SimConcreteMemoryError: pass + except AttributeError: pass + except KeyError: pass + + # Print some of the stack + print('\nStack:', file=file) + try: + rbp = state.regs.rbp + stack_size = 0xc + stack_mem = state.memory.load(rbp - stack_size, stack_size) + print(stack_mem, file=file) + stack = state.solver.eval(stack_mem, cast_to=bytes) + print(' '.join(f'{b:02x}' for b in stack[::-1]), file=file) + except angr.SimConcreteMemoryError: + print('<unable to read memory>', file=file) + print('-' * 80, file=file) + +def symbolize_state(state: angr.SimState, + exclude: list[str] = ['PC', 'RBP', 'RSP']) \ + -> angr.SimState: + """Create a copy of a SimState and replace most of it with symbolic + values. + + Leaves pc, rbp, and rsp concrete by default. This can be configured with + the `exclude` parameter. + + :return: A symbolized SymState object. + """ + state = state.copy() + + stack_size = 0xc + symb_stack = cp.BVS('stack', stack_size * 8) + state.memory.store(state.regs.rbp - stack_size, symb_stack) + + _exclude = set(exclude) + for reg in x86.regnames: + if reg not in _exclude: + symb_val = cp.BVS(reg, 64) + try: + state.regs.__setattr__(reg.lower(), symb_val) + except AttributeError: + pass + return state + +def parse_args(): + prog = argparse.ArgumentParser() + prog.add_argument('binary', type=str) + return prog.parse_args() + +def main(): + args = parse_args() + binary = args.binary + + conc_log = open('concrete.log', 'w') + symb_log = open('symbolic.log', 'w') + + # Generate a program trace from a real execution + trace = record_trace(binary) + print(f'Found {len(trace)} trace points.') + + target = LLDBConcreteTarget(binary) + proj = angr.Project(binary, + concrete_target=target, + use_sim_procedures=False) + + entry_state = proj.factory.entry_state() + entry_state.options.add(angr.options.SYMBION_KEEP_STUBS_ON_SYNC) + entry_state.options.add(angr.options.SYMBION_SYNC_CLE) + + for cur_inst, next_inst in zip(trace[0:-1], trace[1:]): + symbion = proj.factory.simgr(entry_state) + symbion.use_technique(Symbion(find=[cur_inst])) + + conc_exploration = symbion.run() + conc_state = conc_exploration.found[0] + + # Start symbolic execution with the concrete ('truth') state and try + # to reach the next instruction in the trace + simgr = proj.factory.simgr(symbolize_state(conc_state)) + symb_exploration = simgr.explore(find=next_inst) + if len(symb_exploration.found) == 0: + print(f'Symbolic execution can\'t reach address {hex(next_inst)}' + f' from {hex(cur_inst)}. Exiting.') + exit(1) + + print_state(conc_state, file=conc_log) + print_state(symb_exploration.found[0], file=symb_log) + +if __name__ == "__main__": + main() |