diff options
Diffstat (limited to 'tools/_qemu_tool.py')
| -rw-r--r-- | tools/_qemu_tool.py | 158 |
1 files changed, 84 insertions, 74 deletions
diff --git a/tools/_qemu_tool.py b/tools/_qemu_tool.py index 9659f0e..38715dc 100644 --- a/tools/_qemu_tool.py +++ b/tools/_qemu_tool.py @@ -15,7 +15,8 @@ from focaccia.compare import compare_symbolic from focaccia.snapshot import ProgramState, ReadableProgramState, \ RegisterAccessError, MemoryAccessError from focaccia.symbolic import SymbolicTransform, eval_symbol, ExprMem -from focaccia.utils import print_result +from focaccia.trace import Trace, TraceEnvironment +from focaccia.utils import get_envp, print_result from verify_qemu import make_argparser, verbosity @@ -76,6 +77,7 @@ class GDBServerStateIterator: exit(1) self.arch = supported_architectures[archname] + self.binary = self._process.progspace.filename def __iter__(self): return self @@ -98,6 +100,77 @@ class GDBServerStateIterator: return GDBProgramState(self._process, gdb.selected_frame()) +def record_minimal_snapshot(prev_state: ReadableProgramState, + cur_state: ReadableProgramState, + prev_transform: SymbolicTransform, + cur_transform: SymbolicTransform) \ + -> ProgramState: + """Record a minimal snapshot. + + A minimal snapshot must include values (registers and memory) that are + accessed by two transformations: + 1. The values produced by the previous transformation (the + transformation that is producing this snapshot) to check these + values against expected values calculated from the previous + program state. + 2. The values that act as inputs to the transformation acting on this + snapshot, to calculate the expected values of the next snapshot. + + :param prev_transform: The symbolic transformation generating, or + leading to, `cur_state`. Values generated by + this transformation are included in the + snapshot. + :param transform: The symbolic transformation operating on this + snapshot. Input values to this transformation are + included in the snapshot. + """ + assert(cur_state.read_register('pc') == cur_transform.addr) + assert(prev_transform.arch == cur_transform.arch) + + def get_written_addresses(t: SymbolicTransform): + """Get all output memory accesses of a symbolic transformation.""" + return [ExprMem(a, v.size) for a, v in t.changed_mem.items()] + + def set_values(regs: Iterable[str], mems: Iterable[ExprMem], + cur_state: ReadableProgramState, + prev_state: ReadableProgramState, + out_state: ProgramState): + """ + :param prev_state: Addresses of memory included in the snapshot are + resolved relative to this state. + """ + for regname in regs: + regval = cur_state.read_register(regname) + try: + out_state.set_register(regname, regval) + except RegisterAccessError: + pass + for mem in mems: + assert(mem.size % 8 == 0) + addr = eval_symbol(mem.ptr, prev_state) + try: + mem = cur_state.read_memory(addr, int(mem.size / 8)) + out_state.write_memory(addr, mem) + except MemoryAccessError: + pass + + state = ProgramState(cur_transform.arch) + state.set_register('PC', cur_transform.addr) + + set_values(prev_transform.changed_regs.keys(), + get_written_addresses(prev_transform), + cur_state, + prev_state, # Evaluate memory addresses based on previous + # state because they are that state's output + # addresses. + state) + set_values(cur_transform.get_used_registers(), + cur_transform.get_used_memory_addresses(), + cur_state, + cur_state, + state) + return state + def collect_conc_trace(gdb: GDBServerStateIterator, \ strace: list[SymbolicTransform]) \ -> tuple[list[ProgramState], list[SymbolicTransform]]: @@ -115,75 +188,6 @@ def collect_conc_trace(gdb: GDBServerStateIterator, \ :return: A list of concrete states and a list of corresponding symbolic transformations. The lists are guaranteed to have the same length. """ - def record_snapshot(prev_state: ReadableProgramState, - cur_state: GDBProgramState, - prev_transform: SymbolicTransform, - cur_transform: SymbolicTransform) \ - -> ProgramState: - """Record a minimal snapshot. - - A minimal snapshot must include values (registers and memory) that are - accessed by two transformations: - 1. The values produced by the previous transformation (the - transformation that is producing this snapshot) to check these - values against expected values calculated from the previous - program state. - 2. The values that act as inputs to the transformation acting on this - snapshot, to calculate the expected values of the next snapshot. - - :param prev_transform: The symbolic transformation generating, or - leading to, `gdb_state`. Values generated by - this transformation are included in the - snapshot. - :param transform: The symbolic transformation operating on this - snapshot. Input values to this transformation are - included in the snapshot. - """ - assert(cur_state.read_register('pc') == cur_transform.addr) - - def get_written_addresses(t: SymbolicTransform): - """Get all output memory accesses of a symbolic transformation.""" - return [ExprMem(a, v.size) for a, v in t.changed_mem.items()] - - def set_values(regs: Iterable[str], mems: Iterable[ExprMem], - cur_state: GDBProgramState, prev_state: ReadableProgramState, - out_state: ProgramState): - """ - :param prev_state: Addresses of memory included in the snapshot are - resolved relative to this state. - """ - for regname in regs: - regval = cur_state.read_register(regname) - try: - out_state.set_register(regname, regval) - except RegisterAccessError: - pass - for mem in mems: - assert(mem.size % 8 == 0) - addr = eval_symbol(mem.ptr, prev_state) - try: - mem = cur_state.read_memory(addr, int(mem.size / 8)) - out_state.write_memory(addr, mem) - except MemoryAccessError: - pass - - state = ProgramState(gdb.arch) - state.set_register('PC', cur_transform.addr) - - set_values(prev_transform.changed_regs.keys(), - get_written_addresses(prev_transform), - cur_state, - prev_state, # Evaluate memory addresses based on previous - # state because they are that state's output - # addresses. - state) - set_values(cur_transform.get_used_registers(), - cur_transform.get_used_memory_addresses(), - cur_state, - cur_state, - state) - return state - def find_index(seq, target, access=lambda el: el): for i, el in enumerate(seq): if access(el) == target: @@ -222,7 +226,7 @@ def collect_conc_trace(gdb: GDBServerStateIterator, \ symb_i += next_i + 1 assert(cur_state.read_register('pc') == strace[symb_i].addr) - states.append(record_snapshot( + states.append(record_minimal_snapshot( states[-1] if states else cur_state, cur_state, matched_transforms[-1] if matched_transforms else strace[symb_i], @@ -240,6 +244,12 @@ def main(): gdbserver_addr = 'localhost' gdbserver_port = args.port + gdb_server = GDBServerStateIterator(gdbserver_addr, gdbserver_port) + + executable = gdb_server.binary + argv = [] # QEMU's GDB stub does not support 'info proc cmdline' + envp = [] # Can't get the remote target's environment + env = TraceEnvironment(executable, argv, envp, '?') # Read pre-computed symbolic trace with open(args.symb_trace, 'r') as strace: @@ -247,8 +257,8 @@ def main(): # Use symbolic trace to collect concrete trace from QEMU conc_states, matched_transforms = collect_conc_trace( - GDBServerStateIterator(gdbserver_addr, gdbserver_port), - symb_transforms) + gdb_server, + symb_transforms.states) # Verify and print result if not args.quiet: @@ -258,7 +268,7 @@ def main(): if args.output: from focaccia.parser import serialize_snapshots with open(args.output, 'w') as file: - serialize_snapshots(conc_states, file) + serialize_snapshots(Trace(conc_states, env), file) if __name__ == "__main__": main() |