diff options
| author | Theofilos Augoustis <theofilos.augoustis@gmail.com> | 2025-11-06 18:31:34 +0000 |
|---|---|---|
| committer | Theofilos Augoustis <theofilos.augoustis@gmail.com> | 2025-11-06 18:31:34 +0000 |
| commit | 9af40eaf852145e66c4a2f981e4897b8edaa07e5 (patch) | |
| tree | 5e1745162c9de218d178d1e6558488f875fc6d07 | |
| parent | 63f7658197adbe435b8e54394a754ba192dd8065 (diff) | |
| download | focaccia-9af40eaf852145e66c4a2f981e4897b8edaa07e5.tar.gz focaccia-9af40eaf852145e66c4a2f981e4897b8edaa07e5.zip | |
Implement start-stop mechanism for partial validation ta/partial-qemu-validation
| -rw-r--r-- | src/focaccia/symbolic.py | 14 | ||||
| -rw-r--r-- | src/focaccia/tools/_qemu_tool.py | 32 | ||||
| -rwxr-xr-x | src/focaccia/tools/capture_transforms.py | 9 | ||||
| -rw-r--r-- | src/focaccia/trace.py | 12 |
4 files changed, 45 insertions, 22 deletions
diff --git a/src/focaccia/symbolic.py b/src/focaccia/symbolic.py index b46d9e2..2a66a26 100644 --- a/src/focaccia/symbolic.py +++ b/src/focaccia/symbolic.py @@ -821,10 +821,7 @@ class SymbolicTracer: return None return self.target.read_pc() - def trace(self, - start_addr: int | None = None, - stop_addr: int | None = None, - time_limit: int | None = None) -> Trace[SymbolicTransform]: + def trace(self, time_limit: int | None = None) -> Trace[SymbolicTransform]: """Execute a program and compute state transformations between executed instructions. @@ -832,8 +829,8 @@ class SymbolicTracer: :param stop_addr: Address until which to trace. """ # Set up concrete reference state - if start_addr is not None: - self.target.run_until(start_addr) + if self.env.start_address is not None: + self.target.run_until(self.env.start_address) for i in range(len(self.nondet_events)): if self.nondet_events[i].pc == self.target.read_pc(): @@ -857,7 +854,7 @@ class SymbolicTracer: while not self.target.is_exited(): pc = self.target.read_pc() - if stop_addr is not None and pc == stop_addr: + if self.env.stop_address is not None and pc == self.env.stop_address: break assert(pc != 0) @@ -894,7 +891,8 @@ class SymbolicTracer: conc_state = MiasmSymbolResolver(self.target, ctx.loc_db) try: - new_pc, modified = timebound(time_limit, run_instruction, instruction.instr, conc_state, ctx.lifter) + new_pc, modified = timebound(time_limit, run_instruction, + instruction.instr, conc_state, ctx.lifter) except TimeoutError: warn(f'Running instruction {instruction} took longer than {time_limit} second. Skipping') new_pc, modified = None, {} diff --git a/src/focaccia/tools/_qemu_tool.py b/src/focaccia/tools/_qemu_tool.py index 2c7a1de..61fbb7c 100644 --- a/src/focaccia/tools/_qemu_tool.py +++ b/src/focaccia/tools/_qemu_tool.py @@ -226,7 +226,9 @@ def record_minimal_snapshot(prev_state: ReadableProgramState, return state def collect_conc_trace(gdb: GDBServerStateIterator, \ - strace: list[SymbolicTransform]) \ + strace: list[SymbolicTransform], + start_addr: int | None = None, + stop_addr: int | None = None) \ -> tuple[list[ProgramState], list[SymbolicTransform]]: """Collect a trace of concrete states from GDB. @@ -258,6 +260,15 @@ def collect_conc_trace(gdb: GDBServerStateIterator, \ cur_state = next(state_iter) symb_i = 0 + # Skip to start + pc = None + while start_addr: + pc = cur_state.read_register('pc') + if pc == start_addr: + info(f'Tracing QEMU from starting address: {start_addr}') + break + next(state_iter) + # An online trace matching algorithm. while True: try: @@ -293,6 +304,9 @@ def collect_conc_trace(gdb: GDBServerStateIterator, \ if symb_i >= len(strace): break except StopIteration: + if stop_addr and pc != stop_addr: + raise Exception(f'QEMU stopped at {hex(pc)} before reaching the stop address' + f' {hex(stop_addr)}') break except Exception as e: print(traceback.format_exc()) @@ -313,7 +327,7 @@ def main(): try: gdb_server = GDBServerStateIterator(args.remote) except Exception as e: - raise Exception(f'Unable to perform basic GDB setup: {e}') from None + raise Exception(f'Unable to perform basic GDB setup: {e}') try: executable: str | None = None @@ -326,22 +340,24 @@ def main(): envp = [] # Can't get the remote target's environment env = TraceEnvironment(executable, argv, envp, '?') except Exception as e: - raise Exception(f'Unable to create trace environment for executable {executable}: {e}') from None + raise Exception(f'Unable to create trace environment for executable {executable}: {e}') # Read pre-computed symbolic trace try: with open(args.symb_trace, 'r') as strace: symb_transforms = parser.parse_transformations(strace) except Exception as e: - raise Exception(f'Failed to parse state transformations from native trace: {e}') from None + raise Exception(f'Failed to parse state transformations from native trace: {e}') # Use symbolic trace to collect concrete trace from QEMU try: conc_states, matched_transforms = collect_conc_trace( gdb_server, - symb_transforms.states) + symb_transforms.states, + env.start_address, + env.stop_address) except Exception as e: - raise Exception(f'Failed to collect concolic trace from QEMU: {e}') from None + raise Exception(f'Failed to collect concolic trace from QEMU: {e}') # Verify and print result if not args.quiet: @@ -349,7 +365,7 @@ def main(): res = compare_symbolic(conc_states, matched_transforms) print_result(res, verbosity[args.error_level]) except Exception as e: - raise Exception('Error occured when comparing with symbolic equations: {e}') from None + raise Exception('Error occured when comparing with symbolic equations: {e}') if args.output: from focaccia.parser import serialize_snapshots @@ -357,7 +373,7 @@ def main(): with open(args.output, 'w') as file: serialize_snapshots(Trace(conc_states, env), file) except Exception as e: - raise Exception(f'Unable to serialize snapshots to file {args.output}: {e}') from None + raise Exception(f'Unable to serialize snapshots to file {args.output}: {e}') if __name__ == "__main__": main() diff --git a/src/focaccia/tools/capture_transforms.py b/src/focaccia/tools/capture_transforms.py index 10dfdb2..1208156 100755 --- a/src/focaccia/tools/capture_transforms.py +++ b/src/focaccia/tools/capture_transforms.py @@ -77,13 +77,14 @@ def main(): def mmaps(self): return [] detlog = NullDeterministicLog() - env = TraceEnvironment(args.binary, args.args, utils.get_envp(), nondeterminism_log=detlog) + env = TraceEnvironment(args.binary, args.args, utils.get_envp(), + nondeterminism_log=detlog, + start_address=args.start_address, + stop_address=args.stop_address) tracer = SymbolicTracer(env, remote=args.remote, cross_validate=args.debug, force=args.force) - trace = tracer.trace(start_addr=args.start_address, - stop_addr=args.stop_address, - time_limit=args.insn_time_limit) + trace = tracer.trace(time_limit=args.insn_time_limit) with open(args.output, 'w') as file: parser.serialize_transformations(trace, file) diff --git a/src/focaccia/trace.py b/src/focaccia/trace.py index 4a4509d..b488e6c 100644 --- a/src/focaccia/trace.py +++ b/src/focaccia/trace.py @@ -12,11 +12,15 @@ class TraceEnvironment: argv: list[str], envp: list[str], binary_hash: str | None = None, - nondeterminism_log = None): + nondeterminism_log = None, + start_address: int | None = None, + stop_address: int | None = None): self.argv = argv self.envp = envp self.binary_name = binary self.detlog = nondeterminism_log + self.start_address = start_address + self.stop_address = stop_address if binary_hash is None and self.binary_name is not None: self.binary_hash = file_hash(binary) else: @@ -30,6 +34,8 @@ class TraceEnvironment: json['argv'], json['envp'], json['binary_hash'], + json['start_address'], + json['stop_address'] ) def to_json(self) -> dict: @@ -39,6 +45,8 @@ class TraceEnvironment: 'binary_hash': self.binary_hash, 'argv': self.argv, 'envp': self.envp, + 'start_address': self.start_address, + 'stop_address': self.stop_address } def __eq__(self, other: object) -> bool: @@ -59,8 +67,8 @@ class Trace(Generic[T]): def __init__(self, trace_states: list[T], env: TraceEnvironment): - self.states = trace_states self.env = env + self.states = trace_states def __len__(self) -> int: return len(self.states) |