about summary refs log tree commit diff stats
diff options
context:
space:
mode:
authorTheofilos Augoustis <theofilos.augoustis@gmail.com>2025-10-24 15:28:15 +0000
committerTheofilos Augoustis <theofilos.augoustis@gmail.com>2025-11-06 17:20:13 +0000
commitae65ae4d0a3013b83ae9bdf0b3b91291748c9d96 (patch)
treef0115bcf775f68cc07157db1ec11180a4f678789
parentaba970d960a4842bbd899f557c8e0229ec20ad0c (diff)
downloadfocaccia-ae65ae4d0a3013b83ae9bdf0b3b91291748c9d96.tar.gz
focaccia-ae65ae4d0a3013b83ae9bdf0b3b91291748c9d96.zip
Enable tracing without single stepping
-rw-r--r--src/focaccia/arch/aarch64.py4
-rw-r--r--src/focaccia/arch/arch.py4
-rw-r--r--src/focaccia/arch/x86.py7
-rw-r--r--src/focaccia/lldb_target.py11
-rw-r--r--src/focaccia/parser.py1
-rw-r--r--src/focaccia/symbolic.py224
6 files changed, 160 insertions, 91 deletions
diff --git a/src/focaccia/arch/aarch64.py b/src/focaccia/arch/aarch64.py
index 0e7d98c..76f7bd4 100644
--- a/src/focaccia/arch/aarch64.py
+++ b/src/focaccia/arch/aarch64.py
@@ -179,3 +179,7 @@ class ArchAArch64(Arch):
             from . import aarch64_dczid as dczid
             return dczid.read
         return None
+
+    def is_instr_syscall(self, instr: str) -> bool:
+        return instr.upper().startswith('SVC')
+
diff --git a/src/focaccia/arch/arch.py b/src/focaccia/arch/arch.py
index a1b0671..c220a3b 100644
--- a/src/focaccia/arch/arch.py
+++ b/src/focaccia/arch/arch.py
@@ -103,8 +103,12 @@ class Arch():
         """
         return False
 
+    def is_instr_syscall(self, instr: str) -> bool:
+        return False
+
     def __eq__(self, other):
         return self.archname == other.archname
 
     def __repr__(self) -> str:
         return self.archname
+
diff --git a/src/focaccia/arch/x86.py b/src/focaccia/arch/x86.py
index 9ec5c51..a5d29f5 100644
--- a/src/focaccia/arch/x86.py
+++ b/src/focaccia/arch/x86.py
@@ -208,3 +208,10 @@ class ArchX86(Arch):
             return True
         return False
 
+    def is_instr_syscall(self, instr: str) -> bool:
+        if instr.upper().startswith("SYSCALL"):
+            return True
+        if instr.upper().startswith("INT"):
+            return True
+        return False
+
diff --git a/src/focaccia/lldb_target.py b/src/focaccia/lldb_target.py
index f529f9c..283c5bf 100644
--- a/src/focaccia/lldb_target.py
+++ b/src/focaccia/lldb_target.py
@@ -111,7 +111,6 @@ class LLDBConcreteTarget:
         if state == lldb.eStateExited:
             raise RuntimeError('Tried to resume process execution, but the'
                                ' process has already exited.')
-        assert(state == lldb.eStateStopped)
         self.process.Continue()
 
     def step(self):
@@ -122,8 +121,12 @@ class LLDBConcreteTarget:
     def run_until(self, address: int) -> None:
         """Continue execution until the address is arrived, ignores other breakpoints"""
         bp = self.target.BreakpointCreateByAddress(address)
-        while self.read_register("pc") != address:
+        while True:
             self.run()
+            if self.is_exited():
+                return
+            if self.read_register('pc') == address:
+                break
         self.target.BreakpointDelete(bp.GetID())
 
     def record_snapshot(self) -> ProgramState:
@@ -241,7 +244,7 @@ class LLDBConcreteTarget:
                 f'[In LLDBConcreteTarget.write_register]: Unable to set'
                 f' {regname} to value {hex(value)}!')
 
-    def read_memory(self, addr, size):
+    def read_memory(self, addr: int, size: int) -> bytes:
         """Read bytes from memory.
 
         :raise ConcreteMemoryError: If unable to read `size` bytes from `addr`.
@@ -256,7 +259,7 @@ class LLDBConcreteTarget:
         else:
             return bytes(reversed(content))
 
-    def write_memory(self, addr, value: bytes):
+    def write_memory(self, addr: int, value: bytes):
         """Write bytes to memory.
 
         :raise ConcreteMemoryError: If unable to write at `addr`.
diff --git a/src/focaccia/parser.py b/src/focaccia/parser.py
index e9e5e0c..c157a36 100644
--- a/src/focaccia/parser.py
+++ b/src/focaccia/parser.py
@@ -201,3 +201,4 @@ def parse_box64(stream: TextIO, arch: Arch) -> Trace[ProgramState]:
                 states[-1].set_register(regname, int(value, 16))
 
     return Trace(states, _make_unknown_env())
+
diff --git a/src/focaccia/symbolic.py b/src/focaccia/symbolic.py
index b4b29f6..270de55 100644
--- a/src/focaccia/symbolic.py
+++ b/src/focaccia/symbolic.py
@@ -1,4 +1,4 @@
-"""Tools and utilities for symbolic execution with Miasm."""
+"""Tools and utilities for  execution with Miasm."""
 
 from __future__ import annotations
 import logging
@@ -429,7 +429,7 @@ class SymbolicTransform:
 
     def __repr__(self) -> str:
         start, end = self.range
-        res = f'Symbolic state transformation {hex(start)} -> {hex(end)}:\n'
+        res = f'Symbolic state transformation {start} -> {end}:\n'
         res += '  [Symbols]\n'
         for reg, expr in self.changed_regs.items():
             res += f'    {reg:6s} = {expr}\n'
@@ -452,8 +452,8 @@ class MemoryBinstream:
 
     def __getitem__(self, key: int | slice):
         if isinstance(key, slice):
-            return self._state.read_memory(key.start, key.stop - key.start)
-        return self._state.read_memory(key, 1)
+            return self._state.read_instructions(key.start, key.stop - key.start)
+        return self._state.read_instructions(key, 1)
 
 class DisassemblyContext:
     def __init__(self, target: ReadableProgramState):
@@ -597,31 +597,77 @@ def run_instruction(instr: miasm_instr,
 
     return new_pc, modified
 
-class _LLDBConcreteState(ReadableProgramState):
-    """A wrapper around `LLDBConcreteTarget` that provides access via a
-    `ReadableProgramState` interface. Reads values directly from an LLDB
-    target. This saves us the trouble of recording a full program state, and
-    allows us instead to read values from LLDB on demand.
-    """
+class SpeculativeTracer(ReadableProgramState):
     def __init__(self, target: LLDBConcreteTarget):
         super().__init__(target.arch)
-        self._target = target
+        self.target = target
+        self.pc = target.read_register('pc')
+        self.speculative_pc: int | None = None
+        self.speculative_count: int = 0
+
+    def speculate(self, new_pc: int):
+        self.speculative_pc = new_pc
+        self.speculative_count += 1
+
+    def progress_execution(self) -> None:
+        if self.speculative_pc is not None and self.speculative_count != 0:
+            debug(f'Updating PC to {hex(self.speculative_pc)}')
+            if self.speculative_count == 1:
+                self.target.step()
+            else:
+                self.target.run_until(self.speculative_pc)
+
+            self.pc = self.speculative_pc
+            self.speculative_pc = None
+            self.speculative_count = 0
+
+    def run_until(self, addr: int):
+        if self.speculative_pc:
+            raise Exception('Attempting manual execution with speculative execution enabled')
+        self.target.run_until(addr)
+        self.pc = addr
+
+    def step(self):
+        self.progress_execution()
+        if self.target.is_exited():
+            return
+        self.target.step()
+        self.pc = self.target.read_register('pc')
+
+    def read_pc(self) -> int:
+        if self.speculative_pc is not None:
+            return self.speculative_pc
+        return self.pc
+
+    def read_flags(self) -> dict[str, int | bool]:
+        self.progress_execution()
+        return self.target.read_flags()
 
     def read_register(self, reg: str) -> int:
         regname = self.arch.to_regname(reg)
         if regname is None:
             raise RegisterAccessError(reg, f'Not a register name: {reg}')
 
-        try:
-            return self._target.read_register(regname)
-        except ConcreteRegisterError:
-            raise RegisterAccessError(regname, '')
+        self.progress_execution()
+        return self.target.read_register(regname)
+
+    def write_register(self, regname: str, value: int):
+        self.progress_execution()
+        self.target.write_register(regname, value)
+
+    def read_instructions(self, addr: int, size: int) -> bytes:
+        return self.target.read_memory(addr, size)
 
     def read_memory(self, addr: int, size: int) -> bytes:
-        try:
-            return self._target.read_memory(addr, size)
-        except ConcreteMemoryError:
-            raise MemoryAccessError(addr, size, 'Unable to read memory from LLDB.')
+        self.progress_execution()
+        return self.target.read_memory(addr, size)
+
+    def write_memory(self, addr: int, value: bytes):
+        self.progress_execution()
+        self.target.write_memory(addr, value)
+
+    def __getattr__(self, name: str):
+        return getattr(self.target, name)
 
 class SymbolicTracer:
     """A symbolic tracer that uses `LLDBConcreteTarget` with Miasm to simultaneously execute a
@@ -636,6 +682,7 @@ class SymbolicTracer:
         self.force = force
         self.remote = remote
         self.cross_validate = cross_validate
+        self.target = SpeculativeTracer(self.create_debug_target())
 
     def create_debug_target(self) -> LLDBConcreteTarget:
         binary = self.env.binary_name
@@ -659,40 +706,50 @@ class SymbolicTracer:
 
         return target
 
-    def step_to_next(self, target, instruction, transform, lldb_state) -> bool:
-        if self.cross_validate:
-            debug(f'Evaluating register and memory transforms for {instruction} to cross-validate')
-            predicted_regs = transform.eval_register_transforms(lldb_state)
-            predicted_mems = transform.eval_memory_transforms(lldb_state)
-
-        # Step forward
-        target.step()
-        if target.is_exited():
-            return False
-
+    def predict_next_state(self, instruction: Instruction, transform: SymbolicTransform):
+        debug(f'Evaluating register and memory transforms for {instruction} to cross-validate')
+        predicted_regs = transform.eval_register_transforms(self.target)
+        predicted_mems = transform.eval_memory_transforms(self.target)
+        return predicted_regs, predicted_mems
+
+    def validate(self,
+                 instruction: Instruction,
+                 transform: SymbolicTransform,
+                 predicted_regs: dict[str, int],
+                 predicted_mems: dict[int, bytes]):
         # Verify last generated transform by comparing concrete state against
         # predicted values.
-        if self.cross_validate:
-            debug('Cross-validating symbolic transforms by comparing actual to predicted values')
-            for reg, val in predicted_regs.items():
-                conc_val = lldb_state.read_register(reg)
-                if conc_val != val:
-                    warn(f'Symbolic execution backend generated false equation for'
-                         f' [{hex(instruction.addr)}]: {instruction}:'
-                         f' Predicted {reg} = {hex(val)}, but the'
-                         f' concrete state has value {reg} = {hex(conc_val)}.'
-                         f'\nFaulty transformation: {transform}')
-            for addr, data in predicted_mems.items():
-                conc_data = lldb_state.read_memory(addr, len(data))
-                if conc_data != data:
-                    warn(f'Symbolic execution backend generated false equation for'
-                         f' [{hex(instruction.addr)}]: {instruction}: Predicted'
-                         f' mem[{hex(addr)}:{hex(addr+len(data))}] = {data},'
-                         f' but the concrete state has value'
-                         f' mem[{hex(addr)}:{hex(addr+len(data))}] = {conc_data}.'
-                         f'\nFaulty transformation: {transform}')
-                    raise Exception()
-        return True
+        if self.target.is_exited():
+            return
+
+        debug('Cross-validating symbolic transforms by comparing actual to predicted values')
+        for reg, val in predicted_regs.items():
+            conc_val = self.target.read_register(reg)
+            if conc_val != val:
+                warn(f'Symbolic execution backend generated false equation for'
+                     f' [{hex(instruction.addr)}]: {instruction}:'
+                     f' Predicted {reg} = {hex(val)}, but the'
+                     f' concrete state has value {reg} = {hex(conc_val)}.'
+                     f'\nFaulty transformation: {transform}')
+                raise Exception()
+        for addr, data in predicted_mems.items():
+            conc_data = self.target.read_memory(addr, len(data))
+            if conc_data != data:
+                warn(f'Symbolic execution backend generated false equation for'
+                     f' [{hex(instruction.addr)}]: {instruction}: Predicted'
+                     f' mem[{hex(addr)}:{hex(addr+len(data))}] = {data},'
+                     f' but the concrete state has value'
+                     f' mem[{hex(addr)}:{hex(addr+len(data))}] = {conc_data}.'
+                     f'\nFaulty transformation: {transform}')
+                raise Exception()
+
+    def progress(self, new_pc: int, instruction: Instruction) -> int | None:
+        self.target.speculate(new_pc)
+        if self.target.arch.is_instr_syscall(str(instruction)):
+            self.target.progress_execution()
+            if self.target.is_exited():
+                return None
+        return self.target.read_pc()
 
     def trace(self, start_addr: int | None = None) -> Trace[SymbolicTransform]:
         """Execute a program and compute state transformations between executed
@@ -701,21 +758,20 @@ class SymbolicTracer:
         :param start_addr: Address from which to start tracing.
         """
         # Set up concrete reference state
-        target = self.create_debug_target()
         if start_addr is not None:
-            target.run_until(start_addr)
-        lldb_state = _LLDBConcreteState(target)
+            self.target.run_until(start_addr)
 
-        ctx = DisassemblyContext(lldb_state)
+        ctx = DisassemblyContext(self.target)
         arch = ctx.arch
 
         # Trace concolically
         strace: list[SymbolicTransform] = []
-        while not target.is_exited():
-            pc = target.read_register('pc')
+        while not self.target.is_exited():
+            pc = self.target.read_pc()
+            assert(pc != 0)
 
             # Disassemble instruction at the current PC
-            tid = target.get_current_tid()
+            tid = self.target.get_current_tid()
             try:
                 instruction = ctx.disassemble(pc)
                 info(f'[{tid}] Disassembled instruction {instruction} at {hex(pc)}')
@@ -724,9 +780,9 @@ class SymbolicTracer:
 
                 # Try to recovery by using the LLDB disassembly instead
                 try:
-                    alt_disas = target.get_disassembly(pc)
+                    alt_disas = self.target.get_disassembly(pc)
                     instruction = Instruction.from_string(alt_disas, ctx.arch, pc,
-                                                         target.get_instruction_size(pc))
+                                                         self.target.get_instruction_size(pc))
                     info(f'[{tid}] Disassembled instruction {instruction} at {hex(pc)}')
                 except:
                     if self.force:
@@ -736,38 +792,32 @@ class SymbolicTracer:
                         else:
                             warn(f'[{tid}] Unable to disassemble instruction {hex(pc)}: {err}.'
                                  f' Skipping.')
-                        target.step()
+                        self.target.step()
                         continue
                     raise # forward exception
 
             # Run instruction
-            conc_state = MiasmSymbolResolver(lldb_state, ctx.loc_db)
-
-            try:
-                new_pc, modified = run_instruction(instruction.instr, conc_state, ctx.lifter)
-            except:
-                if not self.force:
-                    raise
-                new_pc, modified = None, {}
-
-            # Create symbolic transform
-            if new_pc is None:
-                new_pc = pc + instruction.length
+            conc_state = MiasmSymbolResolver(self.target, ctx.loc_db)
+
+            # TODO: handle None
+            new_pc, modified = run_instruction(instruction.instr, conc_state, ctx.lifter)
+            new_pc = int(new_pc)
+
+            if self.cross_validate:
+                # Predict next concrete state.
+                # We verify the symbolic execution backend on the fly for some
+                # additional protection from bugs in the backend.
+                transform = SymbolicTransform(modified, [instruction], arch, pc, new_pc)
+                pred_regs, pred_mems = self.predict_next_state(instruction, transform)
+                self.progress(new_pc, instruction)
+                self.validate(instruction, transform, pred_regs, pred_mems)
             else:
-                new_pc = int(new_pc)
-            transform = SymbolicTransform(modified, [instruction], arch, pc, new_pc)
+                new_pc = self.progress(new_pc, instruction)
+                if new_pc is None:
+                    continue # we're done
+                transform = SymbolicTransform(modified, [instruction], arch, pc, new_pc)
+
             strace.append(transform)
 
-            if len(strace) == 0:
-                msg = f'Unable to collect trace for instruction {instruction}'
-                if not self.force:
-                    raise Exception(msg)
-                else:
-                    warn(msg)
-
-            # Predict next concrete state.
-            # We verify the symbolic execution backend on the fly for some
-            # additional protection from bugs in the backend.
-            if not self.step_to_next(target, instruction.instr, transform, lldb_state):
-                return Trace(strace, self.env)
+        return Trace(strace, self.env)