about summary refs log tree commit diff stats
diff options
context:
space:
mode:
authorTheofilos Augoustis <theofilos.augoustis@gmail.com>2023-11-07 17:11:49 +0100
committerTheofilos Augoustis <theofilos.augoustis@gmail.com>2023-11-07 17:11:49 +0100
commitca7044cdc7fe99d8065594d455b7f41505f796ab (patch)
tree4d0aaa4ccf98ccaf6d213d7487b2464561d3caa9
parent6f01367f9c8ad4c3d641cc63dbb1a3977ff4ec56 (diff)
downloadfocaccia-ca7044cdc7fe99d8065594d455b7f41505f796ab.tar.gz
focaccia-ca7044cdc7fe99d8065594d455b7f41505f796ab.zip
Implement symbolic tracing in trace_symbols.py using Angr
Diffstat (limited to '')
-rw-r--r--README.md48
-rw-r--r--test.py180
-rw-r--r--trace_symbols.py110
3 files changed, 158 insertions, 180 deletions
diff --git a/README.md b/README.md
index 7cf64cd..04ef446 100644
--- a/README.md
+++ b/README.md
@@ -3,3 +3,51 @@
 This repository contains initial code for comprehensive testing of binary
 translators.
 
+## Snapshot-comparison framework
+
+The following files belong to a rough framework for the snapshot comparison engine:
+
+ - `main.py`: Entry point to the tool. Handling of command line arguments, pre-processing of input
+logs, etc.
+
+ - `snapshot.py`: Internal structures used to work with snapshots. Contains the previous
+`ContextBlock` class, which has been renamed to `ProgramState` to make its purpose as a snapshot of
+the program state clearer.
+
+ - `compare.py`: The central algorithms that work on snapshots.
+
+ - `run.py`: Tools to execute native programs and capture their state via an external debugger.
+
+ - `arancini.py`: Functionality specific to working with arancini. Parsing of arancini's logs into our
+snapshot structures.
+
+ - `arch/`: Abstractions over different processor architectures. Will be used to integrate support for
+more architectures later. Currently, we only have X86.
+
+## Symbolic execution
+
+The following files belong to a prototype of a data-dependency generator based on symbolic
+execution:
+
+ - `gen_trace.py`: An invokable tool that generates an instruction trace for an executable's native
+execution. Is imported into `trace_symbols.py`, which uses the core function that records a trace.
+
+ - `trace_symbols.py`: A simple proof of concept for symbolic data-dependency tracking. Takes an
+executable as an argument and does the following:
+
+    1. Executes the program natively (starting at `main`) and records a trace of every instruction
+executed, stopping when exiting `main`.
+
+    2. Tries to follow this trace of instructions concolically (keeps a concrete program state from
+a native execution in parallel to a symbolic program state), recording after each instruction the
+changes it has made to the program state before that instruction.
+
+    3. Writes the program state at each instruction to log files; writes the concrete state of the
+real execution to 'concrete.log' and the symbolic difference to 'symbolic.log'.
+
+    This first version is very fragile. As soon as angr can't handle a branch instruction (which is
+the case for almost any branch instruction), it aborts with an error.
+
+## Helpers
+
+ - `lldb_target.py`: Implements angr's `ConcreteTarget` interface for [LLDB](https://lldb.llvm.org/).
diff --git a/test.py b/test.py
deleted file mode 100644
index 72a1438..0000000
--- a/test.py
+++ /dev/null
@@ -1,180 +0,0 @@
-import angr
-import angr_targets
-import claripy as cp
-import sys
-
-from lldb_target import LLDBConcreteTarget
-
-from arancini import parse_break_addresses
-from arch import x86
-
-def print_state(state, file=sys.stdout):
-    for reg in x86.regnames:
-        try:
-            val = state.regs.__getattr__(reg.lower())
-            print(f'{reg} = {val}', file=file)
-        except angr.SimConcreteRegisterError:
-            print(f'Unable to read value of register {reg}: register error',
-                  file=file)
-        except angr.SimConcreteMemoryError:
-            print(f'Unable to read value of register {reg}: memory error',
-                  file=file)
-        except AttributeError:
-            print(f'Unable to read value of register {reg}: AttributeError',
-                  file=file)
-        except KeyError:
-            print(f'Unable to read value of register {reg}: KeyError',
-                  file=file)
-
-def copy_state(src: angr_targets.ConcreteTarget, dst: angr.SimState):
-    """Copy a concrete program state to an `angr.SimState` object."""
-    # Copy register contents
-    for reg in x86.regnames:
-        regname = reg.lower()
-        try:
-            dst.regs.__setattr__(regname, src.read_register(regname))
-        except angr.SimConcreteRegisterError:
-            # Register does not exist (i.e. "flag ZF")
-            pass
-
-    # Copy memory contents
-    for mapping in src.get_mappings():
-        addr = mapping.start_address
-        size = mapping.end_address - mapping.start_address
-        try:
-            dst.memory.store(addr, src.read_memory(addr, size), size)
-        except angr.SimConcreteMemoryError:
-            # Invalid memory access
-            pass
-
-def symbolize_state(state: angr.SimState):
-    for reg in x86.regnames:
-        if reg != 'PC':
-            symb_val = cp.BVS(reg, 64)
-            try:
-                state.regs.__setattr__(reg.lower(), symb_val)
-            except AttributeError:
-                pass
-
-def output_truth(breakpoints: set[int]):
-    import run
-    res = run.run_native_execution(BINARY, breakpoints)
-    with open('truth.log', 'w') as file:
-        for snapshot in res:
-            print(cp.BVV(snapshot.regs['PC'], 64), file=file)
-
-BINARY = "hello-static-musl"
-BREAKPOINT_LOG = "emulator-log.txt"
-
-# Read breakpoint addresses from a file
-with open(BREAKPOINT_LOG, "r") as file:
-    breakpoints = parse_break_addresses(file.readlines())
-
-print(f'Found {len(breakpoints)} breakpoints.')
-
-class ConcreteExecution:
-    def __init__(self, executable: str, breakpoints: list[int]):
-        self.target = LLDBConcreteTarget(executable)
-        self.proj = angr.Project(executable,
-                                 concrete_target=self.target,
-                                 use_sim_procedures=False)
-
-        # Set the initial state
-        state = self.proj.factory.entry_state()
-        state.options.add(angr.options.SYMBION_SYNC_CLE)
-        state.options.add(angr.options.SYMBION_KEEP_STUBS_ON_SYNC)
-        self.simgr = self.proj.factory.simgr(state)
-        self.simgr.use_technique(
-            angr.exploration_techniques.Symbion(find=breakpoints))
-
-    def is_running(self):
-        return not self.target.is_exited()
-
-    def step(self) -> angr.SimState | None:
-        self.simgr.run()
-        self.simgr.unstash(to_stash='active', from_stash='found')
-        if len(self.simgr.active) > 0:
-            state = self.simgr.active[0]
-            print(f'-- Concrete execution hit a breakpoint at {state.regs.pc}!')
-            return state
-        return None
-
-class SymbolicExecution:
-    def __init__(self, executable: str):
-        self.proj = angr.Project(executable, use_sim_procedures=False)
-        self.simgr = self.proj.factory.simgr(self.proj.factory.entry_state())
-
-    def is_running(self):
-        return len(self.simgr.active) > 0
-
-    def step(self, find) -> angr.SimState | None:
-        self.simgr.explore(find=find)
-        self.simgr.unstash(to_stash='active', from_stash='found')
-        if len(self.simgr.active) == 0:
-            print(f'No states found. Stashes: {self.simgr.stashes}')
-            return None
-
-        state = self.simgr.active[0]
-        assert(len(self.simgr.active) == 1)
-        print(f'-- Symbolic execution stopped at {state.regs.pc}!')
-        print(f'   Found the following stashes: {self.simgr.stashes}')
-
-        return state
-
-output_truth(breakpoints)
-
-conc = ConcreteExecution(BINARY, list(breakpoints))
-symb = SymbolicExecution(BINARY)
-
-conc_log = open('concrete.log', 'w')
-symb_log = open('symbolic.log', 'w')
-
-while True:
-    if not (conc.is_running() and symb.is_running()):
-        assert(not conc.is_running() and not symb.is_running())
-        print(f'Execution has exited.')
-        exit(0)
-
-    # It seems that we have to copy the program's state manually to the state
-    # handed to the symbolic engine, otherwise the program emulation is
-    # incorrect. Something in angr's emulation is scuffed.
-    copy_state(conc.target, symb.simgr.active[0])
-
-    # angr performs a sanity check to ensure that the address at which the
-    # concrete engine stops actually is one of the breakpoints specified by
-    # the user. This sanity check is faulty because it is performed before the
-    # user has a chance determine whether the program has exited. If the
-    # program counter is read after the concrete execution has exited, LLDB
-    # returns a null value and the check fails, resulting in a crash. This
-    # try/catch block prevents that.
-    #
-    # As of angr commit `cbeace5d7`, this faulty read of the program counter
-    # can be found at `angr/engines/concrete.py:148`.
-    try:
-        conc_state = conc.step()
-        if conc_state is None:
-            print(f'Execution has exited: ConcreteExecution.step() returned null.')
-            exit(0)
-    except angr.SimConcreteRegisterError:
-        print(f'Done.')
-        exit(0)
-
-    pc = conc_state.solver.eval(conc_state.regs.pc)
-    print(f'-- Trying to find address {hex(pc)} with symbolic execution...')
-
-    # TODO:
-    #symbolize_state(symb.simgr.active[0])
-    symb_state = symb.step(pc)
-
-    # Check exit conditions
-    if symb_state is None:
-        print(f'Execution has exited: SymbolicExecution.step() returned null.')
-        exit(0)
-    assert(pc == symb_state.solver.eval(symb_state.regs.pc))
-
-    # Log some stuff
-    print(f'-- Concrete breakpoint {conc_state.regs.pc}'
-          f' vs symbolic breakpoint {symb_state.regs.pc}')
-
-    print(conc_state.regs.pc, file=conc_log)
-    print(symb_state.regs.pc, file=symb_log)
diff --git a/trace_symbols.py b/trace_symbols.py
new file mode 100644
index 0000000..c16cd6e
--- /dev/null
+++ b/trace_symbols.py
@@ -0,0 +1,110 @@
+import angr
+import argparse
+import claripy as cp
+import sys
+
+from angr.exploration_techniques import Symbion
+from arch import x86
+from gen_trace import record_trace
+from lldb_target import LLDBConcreteTarget
+
+def print_state(state: angr.SimState, file=sys.stdout):
+    """Print a program state in a fancy way."""
+    print('-' * 80, file=file)
+    print(f'State at {hex(state.addr)}:', file=file)
+    print('-' * 80, file=file)
+    for reg in x86.regnames:
+        try:
+            val = state.regs.__getattr__(reg.lower())
+            print(f'{reg} = {val}', file=file)
+        except angr.SimConcreteRegisterError: pass
+        except angr.SimConcreteMemoryError: pass
+        except AttributeError: pass
+        except KeyError: pass
+
+    # Print some of the stack
+    print('\nStack:', file=file)
+    try:
+        rbp = state.regs.rbp
+        stack_size = 0xc
+        stack_mem = state.memory.load(rbp - stack_size, stack_size)
+        print(stack_mem, file=file)
+        stack = state.solver.eval(stack_mem, cast_to=bytes)
+        print(' '.join(f'{b:02x}' for b in stack[::-1]), file=file)
+    except angr.SimConcreteMemoryError:
+        print('<unable to read memory>', file=file)
+    print('-' * 80, file=file)
+
+def symbolize_state(state: angr.SimState,
+                    exclude: list[str] = ['PC', 'RBP', 'RSP']) \
+        -> angr.SimState:
+    """Create a copy of a SimState and replace most of it with symbolic
+    values.
+
+    Leaves pc, rbp, and rsp concrete by default. This can be configured with
+    the `exclude` parameter.
+
+    :return: A symbolized SymState object.
+    """
+    state = state.copy()
+
+    stack_size = 0xc
+    symb_stack = cp.BVS('stack', stack_size * 8)
+    state.memory.store(state.regs.rbp - stack_size, symb_stack)
+
+    _exclude = set(exclude)
+    for reg in x86.regnames:
+        if reg not in _exclude:
+            symb_val = cp.BVS(reg, 64)
+            try:
+                state.regs.__setattr__(reg.lower(), symb_val)
+            except AttributeError:
+                pass
+    return state
+
+def parse_args():
+    prog = argparse.ArgumentParser()
+    prog.add_argument('binary', type=str)
+    return prog.parse_args()
+
+def main():
+    args = parse_args()
+    binary = args.binary
+
+    conc_log = open('concrete.log', 'w')
+    symb_log = open('symbolic.log', 'w')
+
+    # Generate a program trace from a real execution
+    trace = record_trace(binary)
+    print(f'Found {len(trace)} trace points.')
+
+    target = LLDBConcreteTarget(binary)
+    proj = angr.Project(binary,
+                        concrete_target=target,
+                        use_sim_procedures=False)
+
+    entry_state = proj.factory.entry_state()
+    entry_state.options.add(angr.options.SYMBION_KEEP_STUBS_ON_SYNC)
+    entry_state.options.add(angr.options.SYMBION_SYNC_CLE)
+
+    for cur_inst, next_inst in zip(trace[0:-1], trace[1:]):
+        symbion = proj.factory.simgr(entry_state)
+        symbion.use_technique(Symbion(find=[cur_inst]))
+
+        conc_exploration = symbion.run()
+        conc_state = conc_exploration.found[0]
+
+        # Start symbolic execution with the concrete ('truth') state and try
+        # to reach the next instruction in the trace
+        simgr = proj.factory.simgr(symbolize_state(conc_state))
+        symb_exploration = simgr.explore(find=next_inst)
+        if len(symb_exploration.found) == 0:
+            print(f'Symbolic execution can\'t reach address {hex(next_inst)}'
+                  f' from {hex(cur_inst)}. Exiting.')
+            exit(1)
+
+        print_state(conc_state, file=conc_log)
+        print_state(symb_exploration.found[0], file=symb_log)
+
+if __name__ == "__main__":
+    main()