about summary refs log tree commit diff stats
diff options
context:
space:
mode:
authorTheofilos Augoustis <theofilos.augoustis@gmail.com>2025-10-21 15:53:58 +0000
committerTheofilos Augoustis <theofilos.augoustis@gmail.com>2025-10-30 17:22:33 +0000
commit72c2ba26066b8641c3cd22e29a12a96c6cb99aa7 (patch)
tree80f6e5632502a2391fa44cf512b7de1201a5f5bb
parent42284a61cfff656e48b82ed23ef1db39eec687f6 (diff)
downloadfocaccia-72c2ba26066b8641c3cd22e29a12a96c6cb99aa7.tar.gz
focaccia-72c2ba26066b8641c3cd22e29a12a96c6cb99aa7.zip
Refactor native tracing to facilitate remote tracing
-rw-r--r--src/focaccia/lldb_target.py8
-rw-r--r--src/focaccia/symbolic.py174
-rwxr-xr-xsrc/focaccia/tools/capture_transforms.py26
-rw-r--r--src/focaccia/trace.py8
4 files changed, 134 insertions, 82 deletions
diff --git a/src/focaccia/lldb_target.py b/src/focaccia/lldb_target.py
index a6f7d6d..6f0011f 100644
--- a/src/focaccia/lldb_target.py
+++ b/src/focaccia/lldb_target.py
@@ -77,6 +77,14 @@ class LLDBConcreteTarget:
             raise NotImplementedError(err)
         return archname
 
+    def determine_name(self) -> str:
+        return self.process.GetProcessInfo().GetName()
+
+    def determine_arguments(self):
+        launch_info = self.target.GetLaunchInfo()
+        argc = self.target.GetLaunchInfo().GetNumArguments()
+        return [launch_info.GetArgumentAtIndex(i) for i in range(argc)]
+
     def is_exited(self):
         """Signals whether the concrete process has exited.
 
diff --git a/src/focaccia/symbolic.py b/src/focaccia/symbolic.py
index 9969619..f854392 100644
--- a/src/focaccia/symbolic.py
+++ b/src/focaccia/symbolic.py
@@ -25,6 +25,8 @@ from .snapshot import ProgramState, ReadableProgramState, \
 from .trace import Trace, TraceEnvironment
 
 logger = logging.getLogger('focaccia-symbolic')
+debug = logger.debug
+info = logger.info
 warn = logger.warn
 
 # Disable Miasm's disassembly logger
@@ -616,84 +618,54 @@ class _LLDBConcreteState(ReadableProgramState):
         except ConcreteMemoryError:
             raise MemoryAccessError(addr, size, 'Unable to read memory from LLDB.')
 
-def collect_symbolic_trace(env: TraceEnvironment,
-                           start_addr: int | None = None,
-                           remote: str | None = None,
-                           ) -> Trace[SymbolicTransform]:
-    """Execute a program and compute state transformations between executed
-    instructions.
-
-    :param binary: The binary to trace.
-    :param args:   Arguments to the program.
-    """
-    binary = env.binary_name
-
-    # Set up concrete reference state
-    target = None
-    if remote:
-        target = LLDBRemoteTarget(remote)
-    else:
-        target = LLDBLocalTarget(binary, env.argv, env.envp)
-
-    if start_addr is not None:
-        target.run_until(start_addr)
-    lldb_state = _LLDBConcreteState(target)
-
-    ctx = DisassemblyContext(lldb_state)
-    arch = ctx.arch
-
-    # Trace concolically
-    strace: list[SymbolicTransform] = []
-    while not target.is_exited():
-        pc = target.read_register('pc')
-
-        # Disassemble instruction at the current PC
-        try:
-            instr = ctx.mdis.dis_instr(pc)
-            print(f'Disassembled instruction {instr} at {hex(pc)}')
-        except:
-            err = sys.exc_info()[1]
-
-            # Try to get the LLDB disassembly instead to simplify debugging
-            try:
-                alt_disas = target.get_disassembly(pc)
-            except:
-                warn(f'Unable to disassemble instruction at {hex(pc)}: {err}.'
-                     f' Skipping.')
-            warn(f'Unable to disassemble instruction {alt_disas} at {hex(pc)}: {err}.'
-                 f' Skipping.')
-            target.step()
-            continue
-
-        # Run instruction
-        conc_state = MiasmSymbolResolver(lldb_state, ctx.loc_db)
-        new_pc, modified = run_instruction(instr, conc_state, ctx.lifter)
-
-        # Create symbolic transform
-        instruction = Instruction(instr, ctx.machine, ctx.arch, ctx.loc_db)
-        if new_pc is None:
-            new_pc = pc + instruction.length
-        else:
-            new_pc = int(new_pc)
-        transform = SymbolicTransform(modified, [instruction], arch, pc, new_pc)
-        strace.append(transform)
-
-        # Predict next concrete state.
-        # We verify the symbolic execution backend on the fly for some
-        # additional protection from bugs in the backend.
-        if env.cross_validate:
+class SymbolicTracer:
+    def __init__(self, 
+                 env: TraceEnvironment, 
+                 remote: str | None=None,
+                 force: bool=False,
+                 cross_validate: bool=False):
+        self.env = env
+        self.force = force
+        self.remote = remote
+        self.cross_validate = cross_validate
+
+    def create_debug_target(self) -> LLDBConcreteTarget:
+        binary = self.env.binary_name
+        if self.remote is False:
+            debug(f'Launching local debug target {binary} {self.env.argv}')
+            debug(f'Environment: {self.env}')
+            return LLDBLocalTarget(binary, self.env.argv, self.env.envp)
+
+        debug(f'Connecting to remote debug target {self.remote}')
+        target = LLDBRemoteTarget(self.remote)
+
+        module_name = target.determine_name()
+        if binary is None:
+            binary, self.env.binary_name = module_name, module_name
+        if binary != module_name:
+            warn(f'Discovered binary name {module_name} differs from specified name {binary}')
+
+        binary_args = target.determine_arguments()
+        if binary_args != self.env.argv:
+            warn(f'Discovered program arguments {binary_args} differ from those specified {self.env.argv}')
+
+        return target
+
+    def step_to_next(self, target, instruction, transform, lldb_state) -> bool:
+        if self.cross_validate:
+            debug(f'Evaluating register and memory transforms for {instruction} to cross-validate')
             predicted_regs = transform.eval_register_transforms(lldb_state)
             predicted_mems = transform.eval_memory_transforms(lldb_state)
 
         # Step forward
         target.step()
         if target.is_exited():
-            break
+            return False
 
         # Verify last generated transform by comparing concrete state against
         # predicted values.
-        assert(len(strace) > 0)
-        if env.cross_validate:
+        if self.cross_validate:
+            debug('Cross-validating symbolic transforms by comparing actual to predicted values')
             for reg, val in predicted_regs.items():
                 conc_val = lldb_state.read_register(reg)
                 if conc_val != val:
@@ -712,6 +684,68 @@ def collect_symbolic_trace(env: TraceEnvironment,
                          f' mem[{hex(addr)}:{hex(addr+len(data))}] = {conc_data}.'
                          f'\nFaulty transformation: {transform}')
                     raise Exception()
+        return True
+
+    def trace(self,
+              start_addr: int | None = None) -> Trace[SymbolicTransform]:
+        """Execute a program and compute state transformations between executed
+        instructions.
 
-    return Trace(strace, env)
+        :param start_addr: Address from which to start tracing.
+        """
+        # Set up concrete reference state
+        target = self.create_debug_target()
+        if start_addr is not None:
+            target.run_until(start_addr)
+        lldb_state = _LLDBConcreteState(target)
+
+        ctx = DisassemblyContext(lldb_state)
+        arch = ctx.arch
+
+        # Trace concolically
+        strace: list[SymbolicTransform] = []
+        while not target.is_exited():
+            pc = target.read_register('pc')
+
+            # Disassemble instruction at the current PC
+            try:
+                instr = ctx.mdis.dis_instr(pc)
+                info(f'Disassembled instruction {instr} at {hex(pc)}')
+            except:
+                err = sys.exc_info()[1]
+
+                # Try to get the LLDB disassembly instead to simplify debugging
+                try:
+                    alt_disas = target.get_disassembly(pc)
+                except:
+                    warn(f'Unable to disassemble instruction at {hex(pc)}: {err}.'
+                         f' Skipping.')
+                    continue
+
+                warn(f'Unable to disassemble instruction {alt_disas} at {hex(pc)}: {err}.'
+                     f' Skipping.')
+                target.step()
+                continue
+
+            # Run instruction
+            conc_state = MiasmSymbolResolver(lldb_state, ctx.loc_db)
+            new_pc, modified = run_instruction(instr, conc_state, ctx.lifter)
+
+            # Create symbolic transform
+            instruction = Instruction(instr, ctx.machine, ctx.arch, ctx.loc_db)
+            if new_pc is None:
+                new_pc = pc + instruction.length
+            else:
+                new_pc = int(new_pc)
+            transform = SymbolicTransform(modified, [instruction], arch, pc, new_pc)
+            strace.append(transform)
+
+            if len(strace) == 0:
+                raise Exception(f'Unable to collect trace for instruction {instr}')
+
+            # Predict next concrete state.
+            # We verify the symbolic execution backend on the fly for some
+            # additional protection from bugs in the backend.
+            if not self.step_to_next(target, instr, transform, lldb_state):
+                return Trace(strace, self.env)
 
diff --git a/src/focaccia/tools/capture_transforms.py b/src/focaccia/tools/capture_transforms.py
index da34e0f..fa8c92a 100755
--- a/src/focaccia/tools/capture_transforms.py
+++ b/src/focaccia/tools/capture_transforms.py
@@ -1,9 +1,10 @@
 #!/usr/bin/env python3
 
 import argparse
+import logging
 
 from focaccia import parser, utils
-from focaccia.symbolic import collect_symbolic_trace
+from focaccia.symbolic import SymbolicTracer
 from focaccia.trace import TraceEnvironment
 
 def main():
@@ -23,12 +24,27 @@ def main():
     prog.add_argument('-r', '--remote',
                       default=False,
                       help='Remote target to trace (e.g. 127.0.0.1:12345)')
+    prog.add_argument('--log-level',
+                      help='Set the logging level')
+    prog.add_argument('--debug',
+                      default=False,
+                      action='store_true',
+                      help='Capture transforms in debug mode to identify errors in Focaccia itself')
     args = prog.parse_args()
 
-    env = TraceEnvironment(args.binary, args.args, args.cross_validate, utils.get_envp())
-    trace = collect_symbolic_trace(env, None, remote=args.remote)
+    if args.debug:
+        logging.basicConfig(level=logging.DEBUG) # will be override by --log-level
+
+    # Set default logging level
+    if args.log_level:
+        level = getattr(logging, args.log_level.upper(), logging.INFO)
+        logging.basicConfig(level=level, force=True)
+    else:
+        logging.basicConfig(level=logging.INFO)
+
+    env = TraceEnvironment(args.binary, args.args, utils.get_envp())
+    tracer = SymbolicTracer(env, remote=args.remote, cross_validate=args.cross_validate)
+    trace = tracer.trace()
     with open(args.output, 'w') as file:
         parser.serialize_transformations(trace, file)
 
-if __name__ == "__main__":
-    main()
diff --git a/src/focaccia/trace.py b/src/focaccia/trace.py
index c72d90f..829b03f 100644
--- a/src/focaccia/trace.py
+++ b/src/focaccia/trace.py
@@ -10,14 +10,12 @@ class TraceEnvironment:
     def __init__(self,
                  binary: str,
                  argv: list[str],
-                 cross_validate: bool,
                  envp: list[str],
                  binary_hash: str | None = None):
         self.argv = argv
         self.envp = envp
         self.binary_name = binary
-        self.cross_validate = cross_validate
-        if binary_hash is None:
+        if binary_hash is None and self.binary_name is not None:
             self.binary_hash = file_hash(binary)
         else:
             self.binary_hash = binary_hash
@@ -28,7 +26,6 @@ class TraceEnvironment:
         return cls(
             json['binary_name'],
             json['argv'],
-            json['cross_validate'],
             json['envp'],
             json['binary_hash'],
         )
@@ -39,7 +36,6 @@ class TraceEnvironment:
             'binary_name': self.binary_name,
             'binary_hash': self.binary_hash,
             'argv': self.argv,
-            'cross_validate': self.cross_validate,
             'envp': self.envp,
         }
 
@@ -50,13 +46,11 @@ class TraceEnvironment:
         return self.binary_name == other.binary_name \
             and self.binary_hash == other.binary_hash \
             and self.argv == other.argv \
-            and self.cross_validate == other.cross_validate \
             and self.envp == other.envp
 
     def __repr__(self) -> str:
         return f'{self.binary_name} {" ".join(self.argv)}' \
                f'\n   bin-hash={self.binary_hash}' \
-               f'\n   options=cross-validate' \
                f'\n   envp={repr(self.envp)}'
 
 class Trace(Generic[T]):