diff options
| -rw-r--r-- | tools/_qemu_tool.py | 251 | ||||
| -rw-r--r-- | tools/qemu_tool.py | 125 | ||||
| -rw-r--r-- | tools/verify_qemu.py | 47 |
3 files changed, 288 insertions, 135 deletions
diff --git a/tools/_qemu_tool.py b/tools/_qemu_tool.py new file mode 100644 index 0000000..ff7dc38 --- /dev/null +++ b/tools/_qemu_tool.py @@ -0,0 +1,251 @@ +"""Invocable like this: + + gdb -n --batch -x qemu_tool.py + +But please use `tools/verify_qemu.py` instead because we have some more setup +work to do. +""" + +import gdb +import platform +from typing import Iterable + +import focaccia.parser as parser +from focaccia.arch import supported_architectures, Arch +from focaccia.compare import compare_symbolic +from focaccia.snapshot import ProgramState, ReadableProgramState, \ + RegisterAccessError, MemoryAccessError +from focaccia.symbolic import SymbolicTransform, eval_symbol, ExprMem +from focaccia.utils import print_result + +from verify_qemu import make_argparser, verbosity + +class GDBProgramState(ReadableProgramState): + def __init__(self, process: gdb.Inferior, frame: gdb.Frame): + self._proc = process + self._frame = frame + + def read_register(self, reg: str) -> int: + try: + val = self._frame.read_register(reg.lower()) + return int(val) & 0xffffffffffffffff # force int to be unsigned + except ValueError as err: + from focaccia.arch import x86 + rflags = int(self._frame.read_register('eflags')) + rflags = x86.decompose_rflags(rflags) + if reg in rflags: + return rflags[reg] + raise RegisterAccessError(reg, str(err)) + + def read_memory(self, addr: int, size: int) -> bytes: + try: + mem = self._proc.read_memory(addr, size).tobytes() + return bytes(reversed(mem)) # Convert to big endian + except gdb.MemoryError as err: + raise MemoryAccessError(addr, size, str(err)) + +class GDBServerStateIterator: + def __init__(self, address: str, port: int): + gdb.execute('set pagination 0') + gdb.execute('set sysroot') + gdb.execute(f'target remote {address}:{port}') + self._process = gdb.selected_inferior() + self._first_next = True + + # Try to determine the guest architecture. This is a bit hacky and + # tailored to GDB's naming for the x86-64 architecture. + split = self._process.architecture().name().split(':') + archname = split[1] if len(split) > 1 else split[0] + archname = archname.replace('-', '_') + if archname not in supported_architectures: + print(f'Error: Current platform ({archname}) is not' + f' supported by Focaccia. Exiting.') + exit(1) + + self.arch = supported_architectures[archname] + + def __iter__(self): + return self + + def __next__(self): + # The first call to __next__ should yield the first program state, + # i.e. before stepping the first time + if self._first_next: + self._first_next = False + return GDBProgramState(self._process, gdb.selected_frame()) + + # Step + pc = gdb.selected_frame().read_register('pc') + new_pc = pc + while pc == new_pc: # Skip instruction chains from REP STOS etc. + gdb.execute('si', to_string=True) + if not self._process.is_valid() or len(self._process.threads()) == 0: + raise StopIteration + new_pc = gdb.selected_frame().read_register('pc') + + return GDBProgramState(self._process, gdb.selected_frame()) + +def collect_conc_trace(gdb: GDBServerStateIterator, \ + strace: list[SymbolicTransform]) \ + -> tuple[list[ProgramState], list[SymbolicTransform]]: + """Collect a trace of concrete states from GDB. + + Records minimal concrete states from GDB by using symbolic trace + information to determine which register/memory values are required to + verify the correctness of the program running in GDB. + + May drop symbolic transformations if the symbolic trace and the GDB trace + diverge (e.g. because of differences in environment, etc.). Returns the + new, possibly modified, symbolic trace that matches the returned concrete + trace. + + :return: A list of concrete states and a list of corresponding symbolic + transformations. The lists are guaranteed to have the same length. + """ + def record_snapshot(prev_state: ReadableProgramState, + cur_state: GDBProgramState, + prev_transform: SymbolicTransform, + cur_transform: SymbolicTransform) \ + -> ProgramState: + """Record a minimal snapshot. + + A minimal snapshot must include values (registers and memory) that are + accessed by two transformations: + 1. The values produced by the previous transformation (the + transformation that is producing this snapshot) to check these + values against expected values calculated from the previous + program state. + 2. The values that act as inputs to the transformation acting on this + snapshot, to calculate the expected values of the next snapshot. + + :param prev_transform: The symbolic transformation generating, or + leading to, `gdb_state`. Values generated by + this transformation are included in the + snapshot. + :param transform: The symbolic transformation operating on this + snapshot. Input values to this transformation are + included in the snapshot. + """ + assert(cur_state.read_register('pc') == cur_transform.addr) + + def get_written_addresses(t: SymbolicTransform): + """Get all output memory accesses of a symbolic transformation.""" + return [ExprMem(a, v.size) for a, v in t.changed_mem.items()] + + def set_values(regs: Iterable[str], mems: Iterable[ExprMem], + cur_state: GDBProgramState, prev_state: ReadableProgramState, + out_state: ProgramState): + """ + :param prev_state: Addresses of memory included in the snapshot are + resolved relative to this state. + """ + for regname in regs: + regval = cur_state.read_register(regname) + try: + out_state.set_register(regname, regval) + except RegisterAccessError: + pass + for mem in mems: + assert(mem.size % 8 == 0) + addr = eval_symbol(mem.ptr, prev_state) + try: + mem = cur_state.read_memory(addr, int(mem.size / 8)) + out_state.write_memory(addr, mem) + except MemoryAccessError: + pass + + state = ProgramState(gdb.arch) + state.set_register('PC', cur_transform.addr) + + set_values(prev_transform.changed_regs.keys(), + get_written_addresses(prev_transform), + cur_state, + prev_state, # Evaluate memory addresses based on previous + # state because they are that state's output + # addresses. + state) + set_values(cur_transform.get_used_registers(), + cur_transform.get_used_memory_addresses(), + cur_state, + cur_state, + state) + return state + + def find_index(seq, target, access=lambda el: el): + for i, el in enumerate(seq): + if access(el) == target: + return i + return None + + if not strace: + return [], [] + + states = [] + matched_transforms = [] + + state_iter = iter(gdb) + cur_state = next(state_iter) + symb_i = 0 + while True: + try: + pc = cur_state.read_register('pc') + assert(pc is not None) + + while pc != strace[symb_i].addr: + next_i = find_index(strace[symb_i+1:], pc, lambda t: t.addr) + + # Drop the concrete state if no address in the symbolic trace + # matches + if next_i is None: + print(f'Warning: Dropping concrete state {hex(pc)}, as no' + f' matching instruction can be found in the symbolic' + f' reference trace.') + cur_state = next(state_iter) + pc = cur_state.read_register('pc') + assert(pc is not None) + continue + + # Otherwise, jump to the next matching symbolic state + symb_i += next_i + 1 + + assert(cur_state.read_register('pc') == strace[symb_i].addr) + states.append(record_snapshot( + states[-1] if states else cur_state, + cur_state, + matched_transforms[-1] if matched_transforms else strace[symb_i], + strace[symb_i])) + matched_transforms.append(strace[symb_i]) + cur_state = next(state_iter) + symb_i += 1 + except StopIteration: + break + + return states, matched_transforms + +def main(): + args = make_argparser().parse_args() + + gdbserver_addr = 'localhost' + gdbserver_port = args.port + + # Read pre-computed symbolic trace + with open(args.symb_trace, 'r') as strace: + symb_transforms = parser.parse_transformations(strace) + + # Use symbolic trace to collect concrete trace from QEMU + conc_states, matched_transforms = collect_conc_trace( + GDBServerStateIterator(gdbserver_addr, gdbserver_port), + symb_transforms) + + # Verify and print result + if not args.quiet: + res = compare_symbolic(conc_states, matched_transforms) + print_result(res, verbosity[args.error_level]) + + if args.output: + from focaccia.parser import serialize_snapshots + with open(args.output, 'w') as file: + serialize_snapshots(conc_states, file) + +if __name__ == "__main__": + main() diff --git a/tools/qemu_tool.py b/tools/qemu_tool.py deleted file mode 100644 index d670692..0000000 --- a/tools/qemu_tool.py +++ /dev/null @@ -1,125 +0,0 @@ -"""Invocable like this: - - gdb -n --batch -x qemu_tool.py -""" - -import gdb -import platform - -import focaccia.parser as parser -from focaccia.arch import supported_architectures, Arch -from focaccia.compare import compare_symbolic, ErrorTypes -from focaccia.snapshot import ProgramState -from focaccia.symbolic import SymbolicTransform, eval_symbol -from focaccia.utils import print_result - -from verify_qemu import make_argparser - -class GDBProgramState: - def __init__(self, process: gdb.Inferior, frame: gdb.Frame): - self._proc = process - self._frame = frame - - def read_register(self, regname: str) -> int | None: - try: - return int(self._frame.read_register(regname.lower())) - except ValueError as err: - from focaccia.arch import x86 - rflags = int(self._frame.read_register('eflags')) - rflags = x86.decompose_rflags(rflags) - if regname in rflags: - return rflags[regname] - - print(f'{regname}: {err}') - return None - - def read_memory(self, addr: int, size: int) -> bytes | None: - try: - return self._proc.read_memory(addr, size).tobytes() - except gdb.MemoryError as err: - print(f'@{size}[{hex(addr)}]: {err}') - return None - -class GDBServerStateIterator: - def __init__(self, address: str, port: int): - gdb.execute('set pagination 0') - gdb.execute('set sysroot') - gdb.execute(f'target remote {address}:{port}') - self._process = gdb.selected_inferior() - self._first_next = True - - def __iter__(self): - return self - - def __next__(self): - # The first call to __next__ should yield the first program state, - # i.e. before stepping the first time - if self._first_next: - self._first_next = False - return GDBProgramState(self._process, gdb.selected_frame()) - - # Step - pc = gdb.selected_frame().read_register('pc') - new_pc = pc - while pc == new_pc: - gdb.execute('si', to_string=True) - if not self._process.is_valid() or len(self._process.threads()) == 0: - raise StopIteration - new_pc = gdb.selected_frame().read_register('pc') - - return GDBProgramState(self._process, gdb.selected_frame()) - -def collect_conc_trace(arch: Arch, \ - gdb: GDBServerStateIterator, \ - strace: list[SymbolicTransform]) \ - -> list[ProgramState]: - states = [] - for qemu, transform in zip(gdb, strace): - qemu_pc = qemu.read_register('pc') - assert(qemu_pc is not None) - - if qemu_pc != transform.addr: - print(f'Fatal error: QEMU\'s program counter' - f' ({hex(qemu_pc)}) does not match the' - f' expected program counter in the symbolic trace' - f' ({hex(transform.addr)}).') - print(f'Processing only partial trace up to this instruction.') - return states - - state = ProgramState(arch) - state.set_register('PC', transform.addr) - - accessed_regs = transform.get_used_registers() - accessed_mems = transform.get_used_memory_addresses() - for regname in accessed_regs: - regval = qemu.read_register(regname) - if regval is not None: - state.set_register(regname, regval) - for mem in accessed_mems: - assert(mem.size % 8 == 0) - addr = eval_symbol(mem.ptr, qemu) - mem = qemu.read_memory(addr, int(mem.size / 8)) - if mem is not None: - state.write_memory(addr, mem) - states.append(state) - - return states - -def main(): - args = make_argparser().parse_args() - - gdbserver_port = args.gdbserver_port - with open(args.symb_trace, 'r') as strace: - symb_transforms = parser.parse_transformations(strace) - - arch = supported_architectures[platform.machine()] - conc_states = collect_conc_trace( - arch, - GDBServerStateIterator('localhost', gdbserver_port), - symb_transforms) - - res = compare_symbolic(conc_states, symb_transforms) - print_result(res, ErrorTypes.POSSIBLE) - -if __name__ == "__main__": - main() diff --git a/tools/verify_qemu.py b/tools/verify_qemu.py index 98437cb..da2e985 100644 --- a/tools/verify_qemu.py +++ b/tools/verify_qemu.py @@ -17,15 +17,46 @@ import os import subprocess import sys +from focaccia.compare import ErrorTypes + +verbosity = { + 'info': ErrorTypes.INFO, + 'warning': ErrorTypes.POSSIBLE, + 'error': ErrorTypes.CONFIRMED, +} + def make_argparser(): """This is also used by the GDB-invoked script to parse its args.""" prog = argparse.ArgumentParser() + prog.description = """Use Focaccia to test QEMU. + +Uses QEMU's GDB-server feature to read QEMU's emulated state and test its +transformation during emulation against a symbolic truth. + +In fact, this tool could be used to test any emulator that provides a +GDB-server interface. The server must support reading registers, reading +memory, and stepping forward by single instructions. + +The GDB server is assumed to be at 'localhost'. +""" + prog.add_argument('port', + type=int, + help='The port at which QEMU\'s GDB server resides.') prog.add_argument('--symb-trace', required=True, - help='A symbolic transformation trace to be used for' \ - ' verification.') - prog.add_argument('--output', '-o', help='Name of output file.') - prog.add_argument('gdbserver_port', type=int) + help='A pre-computed symbolic transformation trace to' \ + ' be used for verification. Generate this with' \ + ' the `tools/capture_transforms.py` tool.') + prog.add_argument('-q', '--quiet', + default=False, + action='store_true', + help='Don\'t print a verification result.') + prog.add_argument('-o', '--output', + help='If specified with a file name, the recorded trace' + ' of QEMU states will be written to that file.') + prog.add_argument('--error-level', + default='warning', + choices=list(verbosity.keys())) return prog def quoted(s: str) -> str: @@ -41,20 +72,16 @@ if __name__ == "__main__": prog = make_argparser() prog.add_argument('--gdb', default='/bin/gdb', help='GDB binary to invoke') - prog.add_argument('--quiet', '-q', action='store_true', - help='Suppress all output') args = prog.parse_args() filepath = os.path.realpath(__file__) - qemu_tool_path = os.path.join(os.path.dirname(filepath), 'qemu_tool.py') + qemu_tool_path = os.path.join(os.path.dirname(filepath), '_qemu_tool.py') # We have to remove all arguments we don't want to pass to the qemu tool # manually here. Not nice, but what can you do.. argv = sys.argv try_remove(argv, '--gdb') try_remove(argv, args.gdb) - try_remove(argv, '--quiet') - try_remove(argv, '-q') # Assemble the argv array passed to the qemu tool. GDB does not have a # mechanism to pass arguments to a script that it executes, so we @@ -65,7 +92,7 @@ if __name__ == "__main__": gdb_cmd = [ args.gdb, '-nx', # Don't parse any .gdbinits - '--batch-silent' if args.quiet else '--batch', + '--batch', '-ex', f'py import sys', '-ex', f'py sys.argv = {argv_str}', '-ex', f'py sys.path = {path_str}', |