about summary refs log tree commit diff stats
path: root/test.py
diff options
context:
space:
mode:
authorTheofilos Augoustis <theofilos.augoustis@gmail.com>2023-10-21 16:39:49 +0200
committerTheofilos Augoustis <theofilos.augoustis@gmail.com>2023-10-21 16:39:49 +0200
commit6f01367f9c8ad4c3d641cc63dbb1a3977ff4ec56 (patch)
tree5b9677b9a5cca449497cea4418b2bb10e2ab0509 /test.py
parent83d4b4dbe6f20c2fa7865e4888b89e888d3509f9 (diff)
downloadfocaccia-6f01367f9c8ad4c3d641cc63dbb1a3977ff4ec56.tar.gz
focaccia-6f01367f9c8ad4c3d641cc63dbb1a3977ff4ec56.zip
Support for testing concrete and emulated execution with angr
Diffstat (limited to 'test.py')
-rw-r--r--test.py180
1 files changed, 180 insertions, 0 deletions
diff --git a/test.py b/test.py
new file mode 100644
index 0000000..72a1438
--- /dev/null
+++ b/test.py
@@ -0,0 +1,180 @@
+import angr
+import angr_targets
+import claripy as cp
+import sys
+
+from lldb_target import LLDBConcreteTarget
+
+from arancini import parse_break_addresses
+from arch import x86
+
+def print_state(state, file=sys.stdout):
+    for reg in x86.regnames:
+        try:
+            val = state.regs.__getattr__(reg.lower())
+            print(f'{reg} = {val}', file=file)
+        except angr.SimConcreteRegisterError:
+            print(f'Unable to read value of register {reg}: register error',
+                  file=file)
+        except angr.SimConcreteMemoryError:
+            print(f'Unable to read value of register {reg}: memory error',
+                  file=file)
+        except AttributeError:
+            print(f'Unable to read value of register {reg}: AttributeError',
+                  file=file)
+        except KeyError:
+            print(f'Unable to read value of register {reg}: KeyError',
+                  file=file)
+
+def copy_state(src: angr_targets.ConcreteTarget, dst: angr.SimState):
+    """Copy a concrete program state to an `angr.SimState` object."""
+    # Copy register contents
+    for reg in x86.regnames:
+        regname = reg.lower()
+        try:
+            dst.regs.__setattr__(regname, src.read_register(regname))
+        except angr.SimConcreteRegisterError:
+            # Register does not exist (i.e. "flag ZF")
+            pass
+
+    # Copy memory contents
+    for mapping in src.get_mappings():
+        addr = mapping.start_address
+        size = mapping.end_address - mapping.start_address
+        try:
+            dst.memory.store(addr, src.read_memory(addr, size), size)
+        except angr.SimConcreteMemoryError:
+            # Invalid memory access
+            pass
+
+def symbolize_state(state: angr.SimState):
+    for reg in x86.regnames:
+        if reg != 'PC':
+            symb_val = cp.BVS(reg, 64)
+            try:
+                state.regs.__setattr__(reg.lower(), symb_val)
+            except AttributeError:
+                pass
+
+def output_truth(breakpoints: set[int]):
+    import run
+    res = run.run_native_execution(BINARY, breakpoints)
+    with open('truth.log', 'w') as file:
+        for snapshot in res:
+            print(cp.BVV(snapshot.regs['PC'], 64), file=file)
+
+BINARY = "hello-static-musl"
+BREAKPOINT_LOG = "emulator-log.txt"
+
+# Read breakpoint addresses from a file
+with open(BREAKPOINT_LOG, "r") as file:
+    breakpoints = parse_break_addresses(file.readlines())
+
+print(f'Found {len(breakpoints)} breakpoints.')
+
+class ConcreteExecution:
+    def __init__(self, executable: str, breakpoints: list[int]):
+        self.target = LLDBConcreteTarget(executable)
+        self.proj = angr.Project(executable,
+                                 concrete_target=self.target,
+                                 use_sim_procedures=False)
+
+        # Set the initial state
+        state = self.proj.factory.entry_state()
+        state.options.add(angr.options.SYMBION_SYNC_CLE)
+        state.options.add(angr.options.SYMBION_KEEP_STUBS_ON_SYNC)
+        self.simgr = self.proj.factory.simgr(state)
+        self.simgr.use_technique(
+            angr.exploration_techniques.Symbion(find=breakpoints))
+
+    def is_running(self):
+        return not self.target.is_exited()
+
+    def step(self) -> angr.SimState | None:
+        self.simgr.run()
+        self.simgr.unstash(to_stash='active', from_stash='found')
+        if len(self.simgr.active) > 0:
+            state = self.simgr.active[0]
+            print(f'-- Concrete execution hit a breakpoint at {state.regs.pc}!')
+            return state
+        return None
+
+class SymbolicExecution:
+    def __init__(self, executable: str):
+        self.proj = angr.Project(executable, use_sim_procedures=False)
+        self.simgr = self.proj.factory.simgr(self.proj.factory.entry_state())
+
+    def is_running(self):
+        return len(self.simgr.active) > 0
+
+    def step(self, find) -> angr.SimState | None:
+        self.simgr.explore(find=find)
+        self.simgr.unstash(to_stash='active', from_stash='found')
+        if len(self.simgr.active) == 0:
+            print(f'No states found. Stashes: {self.simgr.stashes}')
+            return None
+
+        state = self.simgr.active[0]
+        assert(len(self.simgr.active) == 1)
+        print(f'-- Symbolic execution stopped at {state.regs.pc}!')
+        print(f'   Found the following stashes: {self.simgr.stashes}')
+
+        return state
+
+output_truth(breakpoints)
+
+conc = ConcreteExecution(BINARY, list(breakpoints))
+symb = SymbolicExecution(BINARY)
+
+conc_log = open('concrete.log', 'w')
+symb_log = open('symbolic.log', 'w')
+
+while True:
+    if not (conc.is_running() and symb.is_running()):
+        assert(not conc.is_running() and not symb.is_running())
+        print(f'Execution has exited.')
+        exit(0)
+
+    # It seems that we have to copy the program's state manually to the state
+    # handed to the symbolic engine, otherwise the program emulation is
+    # incorrect. Something in angr's emulation is scuffed.
+    copy_state(conc.target, symb.simgr.active[0])
+
+    # angr performs a sanity check to ensure that the address at which the
+    # concrete engine stops actually is one of the breakpoints specified by
+    # the user. This sanity check is faulty because it is performed before the
+    # user has a chance determine whether the program has exited. If the
+    # program counter is read after the concrete execution has exited, LLDB
+    # returns a null value and the check fails, resulting in a crash. This
+    # try/catch block prevents that.
+    #
+    # As of angr commit `cbeace5d7`, this faulty read of the program counter
+    # can be found at `angr/engines/concrete.py:148`.
+    try:
+        conc_state = conc.step()
+        if conc_state is None:
+            print(f'Execution has exited: ConcreteExecution.step() returned null.')
+            exit(0)
+    except angr.SimConcreteRegisterError:
+        print(f'Done.')
+        exit(0)
+
+    pc = conc_state.solver.eval(conc_state.regs.pc)
+    print(f'-- Trying to find address {hex(pc)} with symbolic execution...')
+
+    # TODO:
+    #symbolize_state(symb.simgr.active[0])
+    symb_state = symb.step(pc)
+
+    # Check exit conditions
+    if symb_state is None:
+        print(f'Execution has exited: SymbolicExecution.step() returned null.')
+        exit(0)
+    assert(pc == symb_state.solver.eval(symb_state.regs.pc))
+
+    # Log some stuff
+    print(f'-- Concrete breakpoint {conc_state.regs.pc}'
+          f' vs symbolic breakpoint {symb_state.regs.pc}')
+
+    print(conc_state.regs.pc, file=conc_log)
+    print(symb_state.regs.pc, file=symb_log)