diff options
| author | Theofilos Augoustis <theofilos.augoustis@gmail.com> | 2025-10-21 15:53:58 +0000 |
|---|---|---|
| committer | Theofilos Augoustis <theofilos.augoustis@gmail.com> | 2025-11-06 17:20:13 +0000 |
| commit | f762c35148bb69cb9ddcea4e95022750e9367e52 (patch) | |
| tree | 3292fd5ef66ebe60daae20f6a4eb0fc7af0a964a | |
| parent | adb2b16164d0f7b049b97e4ac77ed3b866f56e0c (diff) | |
| download | focaccia-f762c35148bb69cb9ddcea4e95022750e9367e52.tar.gz focaccia-f762c35148bb69cb9ddcea4e95022750e9367e52.zip | |
Refactor native tracing to facilitate remote tracing
| -rw-r--r-- | src/focaccia/lldb_target.py | 8 | ||||
| -rw-r--r-- | src/focaccia/symbolic.py | 174 | ||||
| -rwxr-xr-x | src/focaccia/tools/capture_transforms.py | 26 | ||||
| -rw-r--r-- | src/focaccia/trace.py | 8 |
4 files changed, 134 insertions, 82 deletions
diff --git a/src/focaccia/lldb_target.py b/src/focaccia/lldb_target.py index a6f7d6d..6f0011f 100644 --- a/src/focaccia/lldb_target.py +++ b/src/focaccia/lldb_target.py @@ -77,6 +77,14 @@ class LLDBConcreteTarget: raise NotImplementedError(err) return archname + def determine_name(self) -> str: + return self.process.GetProcessInfo().GetName() + + def determine_arguments(self): + launch_info = self.target.GetLaunchInfo() + argc = self.target.GetLaunchInfo().GetNumArguments() + return [launch_info.GetArgumentAtIndex(i) for i in range(argc)] + def is_exited(self): """Signals whether the concrete process has exited. diff --git a/src/focaccia/symbolic.py b/src/focaccia/symbolic.py index aa6ab5e..4480d42 100644 --- a/src/focaccia/symbolic.py +++ b/src/focaccia/symbolic.py @@ -25,6 +25,8 @@ from .snapshot import ProgramState, ReadableProgramState, \ from .trace import Trace, TraceEnvironment logger = logging.getLogger('focaccia-symbolic') +debug = logger.debug +info = logger.info warn = logger.warn # Disable Miasm's disassembly logger @@ -618,84 +620,54 @@ class _LLDBConcreteState(ReadableProgramState): except ConcreteMemoryError: raise MemoryAccessError(addr, size, 'Unable to read memory from LLDB.') -def collect_symbolic_trace(env: TraceEnvironment, - start_addr: int | None = None, - remote: str | None = None, - ) -> Trace[SymbolicTransform]: - """Execute a program and compute state transformations between executed - instructions. - - :param binary: The binary to trace. - :param args: Arguments to the program. - """ - binary = env.binary_name - - # Set up concrete reference state - target = None - if remote: - target = LLDBRemoteTarget(remote) - else: - target = LLDBLocalTarget(binary, env.argv, env.envp) - - if start_addr is not None: - target.run_until(start_addr) - lldb_state = _LLDBConcreteState(target) - - ctx = DisassemblyContext(lldb_state) - arch = ctx.arch - - # Trace concolically - strace: list[SymbolicTransform] = [] - while not target.is_exited(): - pc = target.read_register('pc') - - # Disassemble instruction at the current PC - try: - instr = ctx.mdis.dis_instr(pc) - print(f'Disassembled instruction {instr} at {hex(pc)}') - except: - err = sys.exc_info()[1] - - # Try to get the LLDB disassembly instead to simplify debugging - try: - alt_disas = target.get_disassembly(pc) - except: - warn(f'Unable to disassemble instruction at {hex(pc)}: {err}.' - f' Skipping.') - warn(f'Unable to disassemble instruction {alt_disas} at {hex(pc)}: {err}.' - f' Skipping.') - target.step() - continue - - # Run instruction - conc_state = MiasmSymbolResolver(lldb_state, ctx.loc_db) - new_pc, modified = run_instruction(instr, conc_state, ctx.lifter) - - # Create symbolic transform - instruction = Instruction(instr, ctx.machine, ctx.arch, ctx.loc_db) - if new_pc is None: - new_pc = pc + instruction.length - else: - new_pc = int(new_pc) - transform = SymbolicTransform(modified, [instruction], arch, pc, new_pc) - strace.append(transform) - - # Predict next concrete state. - # We verify the symbolic execution backend on the fly for some - # additional protection from bugs in the backend. - if env.cross_validate: +class SymbolicTracer: + def __init__(self, + env: TraceEnvironment, + remote: str | None=None, + force: bool=False, + cross_validate: bool=False): + self.env = env + self.force = force + self.remote = remote + self.cross_validate = cross_validate + + def create_debug_target(self) -> LLDBConcreteTarget: + binary = self.env.binary_name + if self.remote is False: + debug(f'Launching local debug target {binary} {self.env.argv}') + debug(f'Environment: {self.env}') + return LLDBLocalTarget(binary, self.env.argv, self.env.envp) + + debug(f'Connecting to remote debug target {self.remote}') + target = LLDBRemoteTarget(self.remote) + + module_name = target.determine_name() + if binary is None: + binary, self.env.binary_name = module_name, module_name + if binary != module_name: + warn(f'Discovered binary name {module_name} differs from specified name {binary}') + + binary_args = target.determine_arguments() + if binary_args != self.env.argv: + warn(f'Discovered program arguments {binary_args} differ from those specified {self.env.argv}') + + return target + + def step_to_next(self, target, instruction, transform, lldb_state) -> bool: + if self.cross_validate: + debug(f'Evaluating register and memory transforms for {instruction} to cross-validate') predicted_regs = transform.eval_register_transforms(lldb_state) predicted_mems = transform.eval_memory_transforms(lldb_state) # Step forward target.step() if target.is_exited(): - break + return False # Verify last generated transform by comparing concrete state against # predicted values. - assert(len(strace) > 0) - if env.cross_validate: + if self.cross_validate: + debug('Cross-validating symbolic transforms by comparing actual to predicted values') for reg, val in predicted_regs.items(): conc_val = lldb_state.read_register(reg) if conc_val != val: @@ -714,6 +686,68 @@ def collect_symbolic_trace(env: TraceEnvironment, f' mem[{hex(addr)}:{hex(addr+len(data))}] = {conc_data}.' f'\nFaulty transformation: {transform}') raise Exception() + return True + + def trace(self, + start_addr: int | None = None) -> Trace[SymbolicTransform]: + """Execute a program and compute state transformations between executed + instructions. - return Trace(strace, env) + :param start_addr: Address from which to start tracing. + """ + # Set up concrete reference state + target = self.create_debug_target() + if start_addr is not None: + target.run_until(start_addr) + lldb_state = _LLDBConcreteState(target) + + ctx = DisassemblyContext(lldb_state) + arch = ctx.arch + + # Trace concolically + strace: list[SymbolicTransform] = [] + while not target.is_exited(): + pc = target.read_register('pc') + + # Disassemble instruction at the current PC + try: + instr = ctx.mdis.dis_instr(pc) + info(f'Disassembled instruction {instr} at {hex(pc)}') + except: + err = sys.exc_info()[1] + + # Try to get the LLDB disassembly instead to simplify debugging + try: + alt_disas = target.get_disassembly(pc) + except: + warn(f'Unable to disassemble instruction at {hex(pc)}: {err}.' + f' Skipping.') + continue + + warn(f'Unable to disassemble instruction {alt_disas} at {hex(pc)}: {err}.' + f' Skipping.') + target.step() + continue + + # Run instruction + conc_state = MiasmSymbolResolver(lldb_state, ctx.loc_db) + new_pc, modified = run_instruction(instr, conc_state, ctx.lifter) + + # Create symbolic transform + instruction = Instruction(instr, ctx.machine, ctx.arch, ctx.loc_db) + if new_pc is None: + new_pc = pc + instruction.length + else: + new_pc = int(new_pc) + transform = SymbolicTransform(modified, [instruction], arch, pc, new_pc) + strace.append(transform) + + if len(strace) == 0: + raise Exception(f'Unable to collect trace for instruction {instr}') + + # Predict next concrete state. + # We verify the symbolic execution backend on the fly for some + # additional protection from bugs in the backend. + if not self.step_to_next(target, instr, transform, lldb_state): + return Trace(strace, self.env) diff --git a/src/focaccia/tools/capture_transforms.py b/src/focaccia/tools/capture_transforms.py index da34e0f..fa8c92a 100755 --- a/src/focaccia/tools/capture_transforms.py +++ b/src/focaccia/tools/capture_transforms.py @@ -1,9 +1,10 @@ #!/usr/bin/env python3 import argparse +import logging from focaccia import parser, utils -from focaccia.symbolic import collect_symbolic_trace +from focaccia.symbolic import SymbolicTracer from focaccia.trace import TraceEnvironment def main(): @@ -23,12 +24,27 @@ def main(): prog.add_argument('-r', '--remote', default=False, help='Remote target to trace (e.g. 127.0.0.1:12345)') + prog.add_argument('--log-level', + help='Set the logging level') + prog.add_argument('--debug', + default=False, + action='store_true', + help='Capture transforms in debug mode to identify errors in Focaccia itself') args = prog.parse_args() - env = TraceEnvironment(args.binary, args.args, args.cross_validate, utils.get_envp()) - trace = collect_symbolic_trace(env, None, remote=args.remote) + if args.debug: + logging.basicConfig(level=logging.DEBUG) # will be override by --log-level + + # Set default logging level + if args.log_level: + level = getattr(logging, args.log_level.upper(), logging.INFO) + logging.basicConfig(level=level, force=True) + else: + logging.basicConfig(level=logging.INFO) + + env = TraceEnvironment(args.binary, args.args, utils.get_envp()) + tracer = SymbolicTracer(env, remote=args.remote, cross_validate=args.cross_validate) + trace = tracer.trace() with open(args.output, 'w') as file: parser.serialize_transformations(trace, file) -if __name__ == "__main__": - main() diff --git a/src/focaccia/trace.py b/src/focaccia/trace.py index c72d90f..829b03f 100644 --- a/src/focaccia/trace.py +++ b/src/focaccia/trace.py @@ -10,14 +10,12 @@ class TraceEnvironment: def __init__(self, binary: str, argv: list[str], - cross_validate: bool, envp: list[str], binary_hash: str | None = None): self.argv = argv self.envp = envp self.binary_name = binary - self.cross_validate = cross_validate - if binary_hash is None: + if binary_hash is None and self.binary_name is not None: self.binary_hash = file_hash(binary) else: self.binary_hash = binary_hash @@ -28,7 +26,6 @@ class TraceEnvironment: return cls( json['binary_name'], json['argv'], - json['cross_validate'], json['envp'], json['binary_hash'], ) @@ -39,7 +36,6 @@ class TraceEnvironment: 'binary_name': self.binary_name, 'binary_hash': self.binary_hash, 'argv': self.argv, - 'cross_validate': self.cross_validate, 'envp': self.envp, } @@ -50,13 +46,11 @@ class TraceEnvironment: return self.binary_name == other.binary_name \ and self.binary_hash == other.binary_hash \ and self.argv == other.argv \ - and self.cross_validate == other.cross_validate \ and self.envp == other.envp def __repr__(self) -> str: return f'{self.binary_name} {" ".join(self.argv)}' \ f'\n bin-hash={self.binary_hash}' \ - f'\n options=cross-validate' \ f'\n envp={repr(self.envp)}' class Trace(Generic[T]): |