From 35a90ec74bc2e0c74b848ac1bb70a05d779de973 Mon Sep 17 00:00:00 2001 From: Theofilos Augoustis Date: Thu, 13 Nov 2025 14:37:57 +0000 Subject: Move QEMU to its own backend directory --- src/focaccia/qemu/__init__.py | 0 src/focaccia/qemu/_qemu_tool.py | 414 ++++++++++++++++++++++++++++++++ src/focaccia/qemu/validation_server.py | 409 +++++++++++++++++++++++++++++++ src/focaccia/tools/_qemu_tool.py | 414 -------------------------------- src/focaccia/tools/validate_qemu.py | 5 +- src/focaccia/tools/validation_server.py | 409 ------------------------------- 6 files changed, 826 insertions(+), 825 deletions(-) create mode 100644 src/focaccia/qemu/__init__.py create mode 100644 src/focaccia/qemu/_qemu_tool.py create mode 100755 src/focaccia/qemu/validation_server.py delete mode 100644 src/focaccia/tools/_qemu_tool.py delete mode 100755 src/focaccia/tools/validation_server.py (limited to 'src') diff --git a/src/focaccia/qemu/__init__.py b/src/focaccia/qemu/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/focaccia/qemu/_qemu_tool.py b/src/focaccia/qemu/_qemu_tool.py new file mode 100644 index 0000000..93849bd --- /dev/null +++ b/src/focaccia/qemu/_qemu_tool.py @@ -0,0 +1,414 @@ +"""Invocable like this: + + gdb -n --batch -x qemu_tool.py + +But please use `tools/validate_qemu.py` instead because we have some more setup +work to do. +""" + +import gdb +import logging +import traceback +from typing import Iterable + +import focaccia.parser as parser +from focaccia.arch import supported_architectures, Arch +from focaccia.compare import compare_symbolic, Error, ErrorTypes +from focaccia.snapshot import ProgramState, ReadableProgramState, \ + RegisterAccessError, MemoryAccessError +from focaccia.symbolic import SymbolicTransform, eval_symbol, ExprMem +from focaccia.trace import Trace, TraceEnvironment +from focaccia.utils import print_result + +from focaccia.tools.validate_qemu import make_argparser, verbosity + +logger = logging.getLogger('focaccia-qemu-validator') +debug = logger.debug +info = logger.info +warn = logger.warning + +qemu_crash = { + "crashed": False, + "pc": None, + 'txl': None, + 'ref': None, + 'errors': [Error(ErrorTypes.CONFIRMED, "QEMU crashed")], + 'snap': None, +} + +class GDBProgramState(ReadableProgramState): + from focaccia.arch import aarch64, x86 + + flag_register_names = { + aarch64.archname: 'cpsr', + x86.archname: 'eflags', + } + + flag_register_decompose = { + aarch64.archname: aarch64.decompose_cpsr, + x86.archname: x86.decompose_rflags, + } + + def __init__(self, process: gdb.Inferior, frame: gdb.Frame, arch: Arch): + super().__init__(arch) + self._proc = process + self._frame = frame + + @staticmethod + def _read_vector_reg_aarch64(val: gdb.Value, size) -> int: + try: + return int(str(val['d']['u']), 10) + except: + try: + return int(str(val['u']), 10) + except: + return int(str(val['q']['u']), 10) + + @staticmethod + def _read_vector_reg_x86(val: gdb.Value, size) -> int: + num_longs = size // 64 + vals = val[f'v{num_longs}_int64'] + res = 0 + for i in range(num_longs): + val = int(vals[i].cast(gdb.lookup_type('unsigned long'))) + res += val << i * 64 + return res + + read_vector_reg = { + aarch64.archname: _read_vector_reg_aarch64, + x86.archname: _read_vector_reg_x86, + } + + def read_register(self, reg: str) -> int: + if reg == 'RFLAGS': + reg = 'EFLAGS' + + try: + val = self._frame.read_register(reg.lower()) + size = val.type.sizeof * 8 + + # For vector registers, we need to apply architecture-specific + # logic because GDB's interface is not consistent. + if size >= 128: # Value is a vector + if self.arch.archname not in self.read_vector_reg: + raise NotImplementedError( + f'Reading vector registers is not implemented for' + f' architecture {self.arch.archname}.') + return self.read_vector_reg[self.arch.archname](val, size) + elif size < 64: + return int(val.cast(gdb.lookup_type('unsigned int'))) + # For non-vector values, just return the 64-bit value + return int(val.cast(gdb.lookup_type('unsigned long'))) + except ValueError as err: + # Try to access the flags register with `reg` as a logical flag name + if self.arch.archname in self.flag_register_names: + flags_reg = self.flag_register_names[self.arch.archname] + flags = int(self._frame.read_register(flags_reg)) + flags = self.flag_register_decompose[self.arch.archname](flags) + if reg in flags: + return flags[reg] + raise RegisterAccessError(reg, + f'[GDB] Unable to access {reg}: {err}') + + def read_memory(self, addr: int, size: int) -> bytes: + try: + mem = self._proc.read_memory(addr, size).tobytes() + if self.arch.endianness == 'little': + return mem + else: + return bytes(reversed(mem)) # Convert to big endian + except gdb.MemoryError as err: + raise MemoryAccessError(addr, size, str(err)) + +class GDBServerStateIterator: + def __init__(self, remote: str): + gdb.execute('set pagination 0') + gdb.execute('set sysroot') + gdb.execute('set python print-stack full') # enable complete Python tracebacks + gdb.execute(f'target remote {remote}') + self._process = gdb.selected_inferior() + self._first_next = True + + # Try to determine the guest architecture. This is a bit hacky and + # tailored to GDB's naming for the x86-64 architecture. + split = self._process.architecture().name().split(':') + archname = split[1] if len(split) > 1 else split[0] + archname = archname.replace('-', '_') + if archname not in supported_architectures: + print(f'Error: Current platform ({archname}) is not' + f' supported by Focaccia. Exiting.') + exit(1) + + self.arch = supported_architectures[archname] + self.binary = self._process.progspace.filename + + def __iter__(self): + return self + + def __next__(self): + # The first call to __next__ should yield the first program state, + # i.e. before stepping the first time + if self._first_next: + self._first_next = False + return GDBProgramState(self._process, gdb.selected_frame(), self.arch) + + # Step + pc = gdb.selected_frame().read_register('pc') + new_pc = pc + while pc == new_pc: # Skip instruction chains from REP STOS etc. + gdb.execute('si', to_string=True) + if not self._process.is_valid() or len(self._process.threads()) == 0: + raise StopIteration + new_pc = gdb.selected_frame().read_register('pc') + + return GDBProgramState(self._process, gdb.selected_frame(), self.arch) + + def run_until(self, addr: int): + breakpoint = gdb.Breakpoint(f'*{addr:#x}') + gdb.execute('continue') + breakpoint.delete() + return GDBProgramState(self._process, gdb.selected_frame(), self.arch) + +def record_minimal_snapshot(prev_state: ReadableProgramState, + cur_state: ReadableProgramState, + prev_transform: SymbolicTransform, + cur_transform: SymbolicTransform) \ + -> ProgramState: + """Record a minimal snapshot. + + A minimal snapshot must include values (registers and memory) that are + accessed by two transformations: + 1. The values produced by the previous transformation (the + transformation that is producing this snapshot) to check these + values against expected values calculated from the previous + program state. + 2. The values that act as inputs to the transformation acting on this + snapshot, to calculate the expected values of the next snapshot. + + :param prev_transform: The symbolic transformation generating, or + leading to, `cur_state`. Values generated by + this transformation are included in the + snapshot. + :param transform: The symbolic transformation operating on this + snapshot. Input values to this transformation are + included in the snapshot. + """ + assert(cur_state.read_register('pc') == cur_transform.addr) + assert(prev_transform.arch == cur_transform.arch) + + def get_written_addresses(t: SymbolicTransform): + """Get all output memory accesses of a symbolic transformation.""" + return [ExprMem(a, v.size) for a, v in t.changed_mem.items()] + + def set_values(regs: Iterable[str], mems: Iterable[ExprMem], + cur_state: ReadableProgramState, + prev_state: ReadableProgramState, + out_state: ProgramState): + """ + :param prev_state: Addresses of memory included in the snapshot are + resolved relative to this state. + """ + for regname in regs: + try: + regval = cur_state.read_register(regname) + out_state.set_register(regname, regval) + except RegisterAccessError: + pass + for mem in mems: + assert(mem.size % 8 == 0) + addr = eval_symbol(mem.ptr, prev_state) + try: + mem = cur_state.read_memory(addr, int(mem.size / 8)) + out_state.write_memory(addr, mem) + except MemoryAccessError: + pass + + state = ProgramState(cur_transform.arch) + state.set_register('PC', cur_transform.addr) + + set_values(prev_transform.changed_regs.keys(), + get_written_addresses(prev_transform), + cur_state, + prev_state, # Evaluate memory addresses based on previous + # state because they are that state's output + # addresses. + state) + set_values(cur_transform.get_used_registers(), + cur_transform.get_used_memory_addresses(), + cur_state, + cur_state, + state) + return state + +def collect_conc_trace(gdb: GDBServerStateIterator, \ + strace: list[SymbolicTransform], + start_addr: int | None = None, + stop_addr: int | None = None) \ + -> tuple[list[ProgramState], list[SymbolicTransform]]: + """Collect a trace of concrete states from GDB. + + Records minimal concrete states from GDB by using symbolic trace + information to determine which register/memory values are required to + verify the correctness of the program running in GDB. + + May drop symbolic transformations if the symbolic trace and the GDB trace + diverge (e.g. because of differences in environment, etc.). Returns the + new, possibly modified, symbolic trace that matches the returned concrete + trace. + + :return: A list of concrete states and a list of corresponding symbolic + transformations. The lists are guaranteed to have the same length. + """ + def find_index(seq, target, access=lambda el: el): + for i, el in enumerate(seq): + if access(el) == target: + return i + return None + + if not strace: + return [], [] + + states = [] + matched_transforms = [] + + state_iter = iter(gdb) + cur_state = next(state_iter) + symb_i = 0 + + # Skip to start + try: + pc = cur_state.read_register('pc') + if start_addr and pc != start_addr: + info(f'Tracing QEMU from starting address: {hex(start_addr)}') + cur_state = state_iter.run_until(start_addr) + except Exception as e: + if start_addr: + raise Exception(f'Unable to reach start address {hex(start_addr)}: {e}') + raise Exception(f'Unable to trace: {e}') + + # An online trace matching algorithm. + while True: + try: + pc = cur_state.read_register('pc') + + while pc != strace[symb_i].addr: + info(f'PC {hex(pc)} does not match next symbolic reference {hex(strace[symb_i].addr)}') + + next_i = find_index(strace[symb_i+1:], pc, lambda t: t.addr) + + # Drop the concrete state if no address in the symbolic trace + # matches + if next_i is None: + warn(f'Dropping concrete state {hex(pc)}, as no' + f' matching instruction can be found in the symbolic' + f' reference trace.') + cur_state = next(state_iter) + pc = cur_state.read_register('pc') + continue + + # Otherwise, jump to the next matching symbolic state + symb_i += next_i + 1 + if symb_i >= len(strace): + break + + assert(cur_state.read_register('pc') == strace[symb_i].addr) + info(f'Validating instruction at address {hex(pc)}') + states.append(record_minimal_snapshot( + states[-1] if states else cur_state, + cur_state, + matched_transforms[-1] if matched_transforms else strace[symb_i], + strace[symb_i])) + matched_transforms.append(strace[symb_i]) + cur_state = next(state_iter) + symb_i += 1 + if symb_i >= len(strace): + break + except StopIteration: + # TODO: The conditions may test for the same + if stop_addr and pc != stop_addr: + raise Exception(f'QEMU stopped at {hex(pc)} before reaching the stop address' + f' {hex(stop_addr)}') + if symb_i+1 < len(strace): + qemu_crash["crashed"] = True + qemu_crash["pc"] = strace[symb_i].addr + qemu_crash["ref"] = strace[symb_i] + qemu_crash["snap"] = states[-1] + break + except Exception as e: + print(traceback.format_exc()) + raise e + + # Note: this may occur when symbolic traces were gathered with a stop address + if symb_i >= len(strace): + warn(f'QEMU executed more states than native execution: {symb_i} vs {len(strace)-1}') + + return states, matched_transforms + +def main(): + args = make_argparser().parse_args() + + logging_level = getattr(logging, args.error_level.upper(), logging.INFO) + logging.basicConfig(level=logging_level, force=True) + + try: + gdb_server = GDBServerStateIterator(args.remote) + except Exception as e: + raise Exception(f'Unable to perform basic GDB setup: {e}') + + try: + executable: str | None = None + if args.executable is None: + executable = gdb_server.binary + else: + executable = args.executable + + argv = [] # QEMU's GDB stub does not support 'info proc cmdline' + envp = [] # Can't get the remote target's environment + env = TraceEnvironment(executable, argv, envp, '?') + except Exception as e: + raise Exception(f'Unable to create trace environment for executable {executable}: {e}') + + # Read pre-computed symbolic trace + try: + with open(args.symb_trace, 'r') as strace: + symb_transforms = parser.parse_transformations(strace) + except Exception as e: + raise Exception(f'Failed to parse state transformations from native trace: {e}') + + # Use symbolic trace to collect concrete trace from QEMU + try: + conc_states, matched_transforms = collect_conc_trace( + gdb_server, + symb_transforms.states, + symb_transforms.env.start_address, + symb_transforms.env.stop_address) + except Exception as e: + raise Exception(f'Failed to collect concolic trace from QEMU: {e}') + + # Verify and print result + if not args.quiet: + try: + res = compare_symbolic(conc_states, matched_transforms) + if qemu_crash["crashed"]: + res.append({ + 'pc': qemu_crash["pc"], + 'txl': None, + 'ref': qemu_crash["ref"], + 'errors': qemu_crash["errors"], + 'snap': qemu_crash["snap"], + }) + print_result(res, verbosity[args.error_level]) + except Exception as e: + raise Exception('Error occured when comparing with symbolic equations: {e}') + + if args.output: + from focaccia.parser import serialize_snapshots + try: + with open(args.output, 'w') as file: + serialize_snapshots(Trace(conc_states, env), file) + except Exception as e: + raise Exception(f'Unable to serialize snapshots to file {args.output}: {e}') + +if __name__ == "__main__": + main() + diff --git a/src/focaccia/qemu/validation_server.py b/src/focaccia/qemu/validation_server.py new file mode 100755 index 0000000..db33ff3 --- /dev/null +++ b/src/focaccia/qemu/validation_server.py @@ -0,0 +1,409 @@ +#! /usr/bin/env python3 + +import os +import socket +import struct +import logging +from typing import Iterable + +import focaccia.parser as parser +from focaccia.arch import supported_architectures, Arch +from focaccia.compare import compare_symbolic, ErrorTypes +from focaccia.snapshot import ProgramState, RegisterAccessError, MemoryAccessError +from focaccia.symbolic import SymbolicTransform, eval_symbol, ExprMem +from focaccia.trace import Trace +from focaccia.utils import print_result + + +logger = logging.getLogger('focaccia-qemu-validation-server') +debug = logger.debug +info = logger.info +warn = logger.warning + + +def endian_fmt(endianness: str) -> str: + if endianness == 'little': + return '<' + else: + return '>' + +def mk_command(cmd: str, endianness: str, reg: str="", addr: int=0, size: int=0) -> bytes: + # char[16]:regname | long long:addr long long:size | long long:unused + # READ REG | READ MEM | STEP ONE + + if cmd == 'read register': + fmt = f'{endian_fmt(endianness)}16s9s' + return struct.pack(fmt,reg.encode('utf-8'),"READ REG".encode('utf-8')) + elif cmd == 'read memory': + fmt = f'{endian_fmt(endianness)}QQ9s' + return struct.pack(fmt, addr, size, "READ MEM".encode('utf-8')) + elif cmd == 'step': + fmt = f'{endian_fmt(endianness)}qq9s' + return struct.pack(fmt, 0, 0, "STEP ONE".encode('utf-8')) + else: + raise ValueError(f'Unknown command {cmd}') +def unmk_memory(msg: bytes, endianness: str) -> tuple: + # packed! + # unsigned long long: addr + # unsigned long: length + fmt = f'{endian_fmt(endianness)}QQ' + addr, length = struct.unpack(fmt, msg) + + return addr, length + +def unmk_register(msg: bytes, endianness: str) -> tuple: + # packed! + # char[108]:regname | unsigned long:bytes | char[64]:value + fmt = f'{endian_fmt(endianness)}108sQ64s' + reg_name, size, val = struct.unpack(fmt, msg) + reg_name = reg_name.decode('utf-8').rstrip('\x00') + + if reg_name == "UNKNOWN": + raise RegisterAccessError(reg_name, + f'[QEMU Plugin] Unable to access register {reg_name}.') + + val = val[:size] + val = int.from_bytes(val, endianness) + return val, size + +class PluginProgramState(ProgramState): + from focaccia.arch import aarch64, x86 + + flag_register_names = { + aarch64.archname: 'cpsr', + x86.archname: 'eflags', + } + + flag_register_decompose = { + aarch64.archname: aarch64.decompose_cpsr, + x86.archname: x86.decompose_rflags, + } + + def _flush_caches(self): + for r in self.regs.keys(): + self.regs[r] = None + self.mem.drop_all() + + + def __init__(self, arch: Arch): + super().__init__(arch) + self.strict = False + + def read_register(self, reg: str, no_cached: bool=False) -> int: + global CONN + + if reg == 'RFLAGS': + reg = 'EFLAGS' + + flags = self.flag_register_decompose[self.arch.archname](0).keys() + if reg in flags and self.arch.archname in self.flag_register_names: + reg_name = self.flag_register_names[self.arch.archname] + else: + reg_name = self.arch.to_regname(reg) + + if reg_name is None: + raise RegisterAccessError(reg, f'Not a register name: {reg}') + + reg_acc = self.arch.get_reg_accessor(reg_name) + if reg_acc is None: + raise RegisterAccessError(reg, f'Not a enclosing register name: {reg}') + exit(-1) + reg_name = reg_acc.base_reg.lower() + + val = None + from_cache = False + if not no_cached and super().test_register(reg_name): + val = super().read_register(reg_name) + from_cache = True + else: + msg = mk_command("read register", self.arch.endianness, reg=reg_name) + CONN.send(msg) + + try: + resp = CONN.recv(180) + except ConnectionResetError: + raise StopIteration + + if len(resp) < 180: + raise RegisterAccessError(reg, f'Invalid response length when reading {reg}: {len(resp)}' + f' for response {resp}') + + val, size = unmk_register(resp, self.arch.endianness) + + # Try to access the flags register with `reg` as a logical flag name + if reg in flags and self.arch.archname in self.flag_register_names: + flags_reg = self.flag_register_names[self.arch.archname] + _flags = self.flag_register_decompose[self.arch.archname](val) + if reg in _flags: + if not from_cache: + self.set_register(reg, _flags[reg]) + return _flags[reg] + raise RegisterAccessError(f'Unable to access flag {reg}.') + + if not from_cache: + self.set_register(reg, val) + return val & reg_acc.mask >> reg_acc.start + + def read_memory(self, addr: int, size: int) -> bytes: + global CONN + + if self.mem.test(addr): + return super().read_memory(addr, size) + + # print(f'Reading memory at {addr:x}, size={size}') + + msg = mk_command("read memory", self.arch.endianness, addr=addr, size=size) + CONN.send(msg) + + try: + resp = CONN.recv(16) + except ConnectionResetError: + raise StopIteration + _addr, length = unmk_memory(resp, self.arch.endianness) + + if _addr != addr or length == 0: + raise MemoryAccessError( + _addr, size, + f'Unable to access memory at address {addr:x}, size={size}.') + return b'' + + mem = b'' + while len(mem) < length: + try: + resp = CONN.recv(length - len(mem)) + except ConnectionResetError: + raise StopIteration + mem += resp + + self.write_memory(addr, mem) + return mem + + def step(self): + global CONN + + self._flush_caches() + msg = mk_command("step", self.arch.endianness) + CONN.send(msg) + + + return + +class PluginStateIterator: + + def __init__(self, sock_path: str, arch: Arch): + global SOCK + global CONN + + self.sock_path = sock_path + self.arch = arch + self._first_next = True + + + # Start the server that waits for QEMU to connect + try: + os.unlink(self.sock_path) + except FileNotFoundError: + pass + # TODO: allow new connections when QEMU clones + SOCK = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + + print(f'Listening for QEMU Plugin connection at {self.sock_path}...') + SOCK.bind(self.sock_path) + SOCK.listen(1) + + CONN, qemu_addr = SOCK.accept() + + # Handshake with QEMU + pid_b = CONN.recv(4) + pid = struct.unpack('i', pid_b)[0] + print(f'Connected to QEMU instance with PID {pid}.') + + def __iter__(self): + return self + + def __next__(self): + # The first call to __next__ should yield the first program state, + # i.e. after stepping the first time + if self._first_next: + self._first_next = False + self.state = PluginProgramState(self.arch) + #self.state.step() + return self.state + + # Step + pc = self.state.read_register('pc') + new_pc = pc + while pc == new_pc: # Skip instruction chains from REP STOS etc. + self.state.step() + new_pc = self.state.read_register('pc', True) + + return self.state + +def record_minimal_snapshot(prev_state: ProgramState, + cur_state: PluginProgramState, + prev_transform: SymbolicTransform, + cur_transform: SymbolicTransform) \ + -> ProgramState: + """Record a minimal snapshot. + + A minimal snapshot must include values (registers and memory) that are + accessed by two transformations: + 1. The values produced by the previous transformation (the + transformation that is producing this snapshot) to check these + values against expected values calculated from the previous + program state. + 2. The values that act as inputs to the transformation acting on this + snapshot, to calculate the expected values of the next snapshot. + + :param prev_transform: The symbolic transformation generating, or + leading to, `cur_state`. Values generated by + this transformation are included in the + snapshot. + :param transform: The symbolic transformation operating on this + snapshot. Input values to this transformation are + included in the snapshot. + """ + assert(cur_state.read_register('pc') == cur_transform.addr) + assert(prev_transform.arch == cur_transform.arch) + + def get_written_addresses(t: SymbolicTransform) -> Iterable[ExprMem]: + """Get all output memory accesses of a symbolic transformation.""" + return [ExprMem(a, v.size) for a, v in t.changed_mem.items()] + + def set_values(regs: Iterable[str], mems: Iterable[ExprMem], + cur_state: PluginProgramState, + prev_state: PluginProgramState, + out_state: ProgramState): + """ + :param prev_state: Addresses of memory included in the snapshot are + resolved relative to this state. + """ + for regname in regs: + try: + regval = cur_state.read_register(regname) + out_state.set_register(regname, regval) + except RegisterAccessError: + out_state.set_register(regname, 0) + for mem in mems: + assert(mem.size % 8 == 0) + addr = eval_symbol(mem.ptr, prev_state) + try: + mem = cur_state.read_memory(addr, int(mem.size / 8)) + out_state.write_memory(addr, mem) + except MemoryAccessError: + pass + + state = ProgramState(cur_transform.arch) + state.set_register('pc', cur_transform.addr) + + set_values(prev_transform.changed_regs.keys(), + get_written_addresses(prev_transform), + cur_state, + prev_state, + state) + set_values(cur_transform.get_used_registers(), + cur_transform.get_used_memory_addresses(), + cur_state, + cur_state, + state) + return state + +def collect_conc_trace(qemu: PluginStateIterator, \ + strace: list[SymbolicTransform]) \ + -> tuple[list[ProgramState], list[SymbolicTransform]]: + """Collect a trace of concrete states from QEMU. + + Records minimal concrete states from QEMU by using symbolic trace + information to determine which register/memory values are required to + verify the correctness of QEMU. + + May drop symbolic transformations if the symbolic trace and the QEMU trace + diverge (e.g. because of differences in environment, etc.). Returns the + new, possibly modified, symbolic trace that matches the returned concrete + trace. + + :return: A list of concrete states and a list of corresponding symbolic + transformations. The lists are guaranteed to have the same length. + """ + def find_index(seq, target, access=lambda el: el): + for i, el in enumerate(seq): + if access(el) == target: + return i + return None + + if not strace: + return [], [] + + states = [] + matched_transforms = [] + + state_iter = iter(qemu) + cur_state = next(state_iter) + symb_i = 0 + + # An online trace matching algorithm. + while True: + try: + pc = cur_state.read_register('pc') + + while pc != strace[symb_i].addr: + next_i = find_index(strace[symb_i+1:], pc, lambda t: t.addr) + + # Drop the concrete state if no address in the symbolic trace + # matches + if next_i is None: + print(f'Warning: Dropping concrete state {hex(pc)}, as no' + f' matching instruction can be found in the symbolic' + f' reference trace.') + cur_state = next(state_iter) + pc = cur_state.read_register('pc', True) + continue + + # Otherwise, jump to the next matching symbolic state + symb_i += next_i + 1 + + assert(cur_state.read_register('pc') == strace[symb_i].addr) + states.append(record_minimal_snapshot( + states[-1] if states else cur_state, + cur_state, + matched_transforms[-1] if matched_transforms else strace[symb_i], + strace[symb_i])) + matched_transforms.append(strace[symb_i]) + cur_state = next(state_iter) + symb_i += 1 + except StopIteration: + break + + return states, matched_transforms + + +def start_validation_server(symb_trace: str, + output: str, + socket: str, + guest_arch: str, + env, + verbosity: ErrorTypes, + is_quiet: bool = False): + # Read pre-computed symbolic trace + with open(symb_trace, 'r') as strace: + symb_transforms = parser.parse_transformations(strace) + + arch = supported_architectures.get(guest_arch) + + qemu = PluginStateIterator(socket, arch) + + # Use symbolic trace to collect concrete trace from QEMU + conc_states, matched_transforms = collect_conc_trace( + qemu, + symb_transforms.states) + + # Verify and print result + if not is_quiet: + res = compare_symbolic(conc_states, matched_transforms) + print_result(res, verbosity) + + if output: + from focaccia.parser import serialize_snapshots + with open(output, 'w') as file: + serialize_snapshots(Trace(conc_states, env), file) + diff --git a/src/focaccia/tools/_qemu_tool.py b/src/focaccia/tools/_qemu_tool.py deleted file mode 100644 index 02d150b..0000000 --- a/src/focaccia/tools/_qemu_tool.py +++ /dev/null @@ -1,414 +0,0 @@ -"""Invocable like this: - - gdb -n --batch -x qemu_tool.py - -But please use `tools/validate_qemu.py` instead because we have some more setup -work to do. -""" - -import gdb -import logging -import traceback -from typing import Iterable - -import focaccia.parser as parser -from focaccia.arch import supported_architectures, Arch -from focaccia.compare import compare_symbolic, Error, ErrorTypes -from focaccia.snapshot import ProgramState, ReadableProgramState, \ - RegisterAccessError, MemoryAccessError -from focaccia.symbolic import SymbolicTransform, eval_symbol, ExprMem -from focaccia.trace import Trace, TraceEnvironment -from focaccia.utils import print_result - -from validate_qemu import make_argparser, verbosity - -logger = logging.getLogger('focaccia-qemu-validator') -debug = logger.debug -info = logger.info -warn = logger.warning - -qemu_crash = { - "crashed": False, - "pc": None, - 'txl': None, - 'ref': None, - 'errors': [Error(ErrorTypes.CONFIRMED, "QEMU crashed")], - 'snap': None, -} - -class GDBProgramState(ReadableProgramState): - from focaccia.arch import aarch64, x86 - - flag_register_names = { - aarch64.archname: 'cpsr', - x86.archname: 'eflags', - } - - flag_register_decompose = { - aarch64.archname: aarch64.decompose_cpsr, - x86.archname: x86.decompose_rflags, - } - - def __init__(self, process: gdb.Inferior, frame: gdb.Frame, arch: Arch): - super().__init__(arch) - self._proc = process - self._frame = frame - - @staticmethod - def _read_vector_reg_aarch64(val: gdb.Value, size) -> int: - try: - return int(str(val['d']['u']), 10) - except: - try: - return int(str(val['u']), 10) - except: - return int(str(val['q']['u']), 10) - - @staticmethod - def _read_vector_reg_x86(val: gdb.Value, size) -> int: - num_longs = size // 64 - vals = val[f'v{num_longs}_int64'] - res = 0 - for i in range(num_longs): - val = int(vals[i].cast(gdb.lookup_type('unsigned long'))) - res += val << i * 64 - return res - - read_vector_reg = { - aarch64.archname: _read_vector_reg_aarch64, - x86.archname: _read_vector_reg_x86, - } - - def read_register(self, reg: str) -> int: - if reg == 'RFLAGS': - reg = 'EFLAGS' - - try: - val = self._frame.read_register(reg.lower()) - size = val.type.sizeof * 8 - - # For vector registers, we need to apply architecture-specific - # logic because GDB's interface is not consistent. - if size >= 128: # Value is a vector - if self.arch.archname not in self.read_vector_reg: - raise NotImplementedError( - f'Reading vector registers is not implemented for' - f' architecture {self.arch.archname}.') - return self.read_vector_reg[self.arch.archname](val, size) - elif size < 64: - return int(val.cast(gdb.lookup_type('unsigned int'))) - # For non-vector values, just return the 64-bit value - return int(val.cast(gdb.lookup_type('unsigned long'))) - except ValueError as err: - # Try to access the flags register with `reg` as a logical flag name - if self.arch.archname in self.flag_register_names: - flags_reg = self.flag_register_names[self.arch.archname] - flags = int(self._frame.read_register(flags_reg)) - flags = self.flag_register_decompose[self.arch.archname](flags) - if reg in flags: - return flags[reg] - raise RegisterAccessError(reg, - f'[GDB] Unable to access {reg}: {err}') - - def read_memory(self, addr: int, size: int) -> bytes: - try: - mem = self._proc.read_memory(addr, size).tobytes() - if self.arch.endianness == 'little': - return mem - else: - return bytes(reversed(mem)) # Convert to big endian - except gdb.MemoryError as err: - raise MemoryAccessError(addr, size, str(err)) - -class GDBServerStateIterator: - def __init__(self, remote: str): - gdb.execute('set pagination 0') - gdb.execute('set sysroot') - gdb.execute('set python print-stack full') # enable complete Python tracebacks - gdb.execute(f'target remote {remote}') - self._process = gdb.selected_inferior() - self._first_next = True - - # Try to determine the guest architecture. This is a bit hacky and - # tailored to GDB's naming for the x86-64 architecture. - split = self._process.architecture().name().split(':') - archname = split[1] if len(split) > 1 else split[0] - archname = archname.replace('-', '_') - if archname not in supported_architectures: - print(f'Error: Current platform ({archname}) is not' - f' supported by Focaccia. Exiting.') - exit(1) - - self.arch = supported_architectures[archname] - self.binary = self._process.progspace.filename - - def __iter__(self): - return self - - def __next__(self): - # The first call to __next__ should yield the first program state, - # i.e. before stepping the first time - if self._first_next: - self._first_next = False - return GDBProgramState(self._process, gdb.selected_frame(), self.arch) - - # Step - pc = gdb.selected_frame().read_register('pc') - new_pc = pc - while pc == new_pc: # Skip instruction chains from REP STOS etc. - gdb.execute('si', to_string=True) - if not self._process.is_valid() or len(self._process.threads()) == 0: - raise StopIteration - new_pc = gdb.selected_frame().read_register('pc') - - return GDBProgramState(self._process, gdb.selected_frame(), self.arch) - - def run_until(self, addr: int): - breakpoint = gdb.Breakpoint(f'*{addr:#x}') - gdb.execute('continue') - breakpoint.delete() - return GDBProgramState(self._process, gdb.selected_frame(), self.arch) - -def record_minimal_snapshot(prev_state: ReadableProgramState, - cur_state: ReadableProgramState, - prev_transform: SymbolicTransform, - cur_transform: SymbolicTransform) \ - -> ProgramState: - """Record a minimal snapshot. - - A minimal snapshot must include values (registers and memory) that are - accessed by two transformations: - 1. The values produced by the previous transformation (the - transformation that is producing this snapshot) to check these - values against expected values calculated from the previous - program state. - 2. The values that act as inputs to the transformation acting on this - snapshot, to calculate the expected values of the next snapshot. - - :param prev_transform: The symbolic transformation generating, or - leading to, `cur_state`. Values generated by - this transformation are included in the - snapshot. - :param transform: The symbolic transformation operating on this - snapshot. Input values to this transformation are - included in the snapshot. - """ - assert(cur_state.read_register('pc') == cur_transform.addr) - assert(prev_transform.arch == cur_transform.arch) - - def get_written_addresses(t: SymbolicTransform): - """Get all output memory accesses of a symbolic transformation.""" - return [ExprMem(a, v.size) for a, v in t.changed_mem.items()] - - def set_values(regs: Iterable[str], mems: Iterable[ExprMem], - cur_state: ReadableProgramState, - prev_state: ReadableProgramState, - out_state: ProgramState): - """ - :param prev_state: Addresses of memory included in the snapshot are - resolved relative to this state. - """ - for regname in regs: - try: - regval = cur_state.read_register(regname) - out_state.set_register(regname, regval) - except RegisterAccessError: - pass - for mem in mems: - assert(mem.size % 8 == 0) - addr = eval_symbol(mem.ptr, prev_state) - try: - mem = cur_state.read_memory(addr, int(mem.size / 8)) - out_state.write_memory(addr, mem) - except MemoryAccessError: - pass - - state = ProgramState(cur_transform.arch) - state.set_register('PC', cur_transform.addr) - - set_values(prev_transform.changed_regs.keys(), - get_written_addresses(prev_transform), - cur_state, - prev_state, # Evaluate memory addresses based on previous - # state because they are that state's output - # addresses. - state) - set_values(cur_transform.get_used_registers(), - cur_transform.get_used_memory_addresses(), - cur_state, - cur_state, - state) - return state - -def collect_conc_trace(gdb: GDBServerStateIterator, \ - strace: list[SymbolicTransform], - start_addr: int | None = None, - stop_addr: int | None = None) \ - -> tuple[list[ProgramState], list[SymbolicTransform]]: - """Collect a trace of concrete states from GDB. - - Records minimal concrete states from GDB by using symbolic trace - information to determine which register/memory values are required to - verify the correctness of the program running in GDB. - - May drop symbolic transformations if the symbolic trace and the GDB trace - diverge (e.g. because of differences in environment, etc.). Returns the - new, possibly modified, symbolic trace that matches the returned concrete - trace. - - :return: A list of concrete states and a list of corresponding symbolic - transformations. The lists are guaranteed to have the same length. - """ - def find_index(seq, target, access=lambda el: el): - for i, el in enumerate(seq): - if access(el) == target: - return i - return None - - if not strace: - return [], [] - - states = [] - matched_transforms = [] - - state_iter = iter(gdb) - cur_state = next(state_iter) - symb_i = 0 - - # Skip to start - try: - pc = cur_state.read_register('pc') - if start_addr and pc != start_addr: - info(f'Tracing QEMU from starting address: {hex(start_addr)}') - cur_state = state_iter.run_until(start_addr) - except Exception as e: - if start_addr: - raise Exception(f'Unable to reach start address {hex(start_addr)}: {e}') - raise Exception(f'Unable to trace: {e}') - - # An online trace matching algorithm. - while True: - try: - pc = cur_state.read_register('pc') - - while pc != strace[symb_i].addr: - info(f'PC {hex(pc)} does not match next symbolic reference {hex(strace[symb_i].addr)}') - - next_i = find_index(strace[symb_i+1:], pc, lambda t: t.addr) - - # Drop the concrete state if no address in the symbolic trace - # matches - if next_i is None: - warn(f'Dropping concrete state {hex(pc)}, as no' - f' matching instruction can be found in the symbolic' - f' reference trace.') - cur_state = next(state_iter) - pc = cur_state.read_register('pc') - continue - - # Otherwise, jump to the next matching symbolic state - symb_i += next_i + 1 - if symb_i >= len(strace): - break - - assert(cur_state.read_register('pc') == strace[symb_i].addr) - info(f'Validating instruction at address {hex(pc)}') - states.append(record_minimal_snapshot( - states[-1] if states else cur_state, - cur_state, - matched_transforms[-1] if matched_transforms else strace[symb_i], - strace[symb_i])) - matched_transforms.append(strace[symb_i]) - cur_state = next(state_iter) - symb_i += 1 - if symb_i >= len(strace): - break - except StopIteration: - # TODO: The conditions may test for the same - if stop_addr and pc != stop_addr: - raise Exception(f'QEMU stopped at {hex(pc)} before reaching the stop address' - f' {hex(stop_addr)}') - if symb_i+1 < len(strace): - qemu_crash["crashed"] = True - qemu_crash["pc"] = strace[symb_i].addr - qemu_crash["ref"] = strace[symb_i] - qemu_crash["snap"] = states[-1] - break - except Exception as e: - print(traceback.format_exc()) - raise e - - # Note: this may occur when symbolic traces were gathered with a stop address - if symb_i >= len(strace): - warn(f'QEMU executed more states than native execution: {symb_i} vs {len(strace)-1}') - - return states, matched_transforms - -def main(): - args = make_argparser().parse_args() - - logging_level = getattr(logging, args.error_level.upper(), logging.INFO) - logging.basicConfig(level=logging_level, force=True) - - try: - gdb_server = GDBServerStateIterator(args.remote) - except Exception as e: - raise Exception(f'Unable to perform basic GDB setup: {e}') - - try: - executable: str | None = None - if args.executable is None: - executable = gdb_server.binary - else: - executable = args.executable - - argv = [] # QEMU's GDB stub does not support 'info proc cmdline' - envp = [] # Can't get the remote target's environment - env = TraceEnvironment(executable, argv, envp, '?') - except Exception as e: - raise Exception(f'Unable to create trace environment for executable {executable}: {e}') - - # Read pre-computed symbolic trace - try: - with open(args.symb_trace, 'r') as strace: - symb_transforms = parser.parse_transformations(strace) - except Exception as e: - raise Exception(f'Failed to parse state transformations from native trace: {e}') - - # Use symbolic trace to collect concrete trace from QEMU - try: - conc_states, matched_transforms = collect_conc_trace( - gdb_server, - symb_transforms.states, - symb_transforms.env.start_address, - symb_transforms.env.stop_address) - except Exception as e: - raise Exception(f'Failed to collect concolic trace from QEMU: {e}') - - # Verify and print result - if not args.quiet: - try: - res = compare_symbolic(conc_states, matched_transforms) - if qemu_crash["crashed"]: - res.append({ - 'pc': qemu_crash["pc"], - 'txl': None, - 'ref': qemu_crash["ref"], - 'errors': qemu_crash["errors"], - 'snap': qemu_crash["snap"], - }) - print_result(res, verbosity[args.error_level]) - except Exception as e: - raise Exception('Error occured when comparing with symbolic equations: {e}') - - if args.output: - from focaccia.parser import serialize_snapshots - try: - with open(args.output, 'w') as file: - serialize_snapshots(Trace(conc_states, env), file) - except Exception as e: - raise Exception(f'Unable to serialize snapshots to file {args.output}: {e}') - -if __name__ == "__main__": - main() - diff --git a/src/focaccia/tools/validate_qemu.py b/src/focaccia/tools/validate_qemu.py index e834a6d..48b3f1c 100755 --- a/src/focaccia/tools/validate_qemu.py +++ b/src/focaccia/tools/validate_qemu.py @@ -23,9 +23,10 @@ import argparse import sysconfig import subprocess +import focaccia.qemu from focaccia.compare import ErrorTypes from focaccia.arch import supported_architectures -from focaccia.tools.validation_server import start_validation_server +from focaccia.qemu.validation_server import start_validation_server verbosity = { 'info': ErrorTypes.INFO, @@ -118,7 +119,7 @@ def main(): args.quiet) else: # QEMU GDB interface - script_dirname = os.path.dirname(__file__) + script_dirname = os.path.dirname(focaccia.qemu.__file__) qemu_tool_path = os.path.join(script_dirname, '_qemu_tool.py') # We have to remove all arguments we don't want to pass to the qemu tool diff --git a/src/focaccia/tools/validation_server.py b/src/focaccia/tools/validation_server.py deleted file mode 100755 index db33ff3..0000000 --- a/src/focaccia/tools/validation_server.py +++ /dev/null @@ -1,409 +0,0 @@ -#! /usr/bin/env python3 - -import os -import socket -import struct -import logging -from typing import Iterable - -import focaccia.parser as parser -from focaccia.arch import supported_architectures, Arch -from focaccia.compare import compare_symbolic, ErrorTypes -from focaccia.snapshot import ProgramState, RegisterAccessError, MemoryAccessError -from focaccia.symbolic import SymbolicTransform, eval_symbol, ExprMem -from focaccia.trace import Trace -from focaccia.utils import print_result - - -logger = logging.getLogger('focaccia-qemu-validation-server') -debug = logger.debug -info = logger.info -warn = logger.warning - - -def endian_fmt(endianness: str) -> str: - if endianness == 'little': - return '<' - else: - return '>' - -def mk_command(cmd: str, endianness: str, reg: str="", addr: int=0, size: int=0) -> bytes: - # char[16]:regname | long long:addr long long:size | long long:unused - # READ REG | READ MEM | STEP ONE - - if cmd == 'read register': - fmt = f'{endian_fmt(endianness)}16s9s' - return struct.pack(fmt,reg.encode('utf-8'),"READ REG".encode('utf-8')) - elif cmd == 'read memory': - fmt = f'{endian_fmt(endianness)}QQ9s' - return struct.pack(fmt, addr, size, "READ MEM".encode('utf-8')) - elif cmd == 'step': - fmt = f'{endian_fmt(endianness)}qq9s' - return struct.pack(fmt, 0, 0, "STEP ONE".encode('utf-8')) - else: - raise ValueError(f'Unknown command {cmd}') -def unmk_memory(msg: bytes, endianness: str) -> tuple: - # packed! - # unsigned long long: addr - # unsigned long: length - fmt = f'{endian_fmt(endianness)}QQ' - addr, length = struct.unpack(fmt, msg) - - return addr, length - -def unmk_register(msg: bytes, endianness: str) -> tuple: - # packed! - # char[108]:regname | unsigned long:bytes | char[64]:value - fmt = f'{endian_fmt(endianness)}108sQ64s' - reg_name, size, val = struct.unpack(fmt, msg) - reg_name = reg_name.decode('utf-8').rstrip('\x00') - - if reg_name == "UNKNOWN": - raise RegisterAccessError(reg_name, - f'[QEMU Plugin] Unable to access register {reg_name}.') - - val = val[:size] - val = int.from_bytes(val, endianness) - return val, size - -class PluginProgramState(ProgramState): - from focaccia.arch import aarch64, x86 - - flag_register_names = { - aarch64.archname: 'cpsr', - x86.archname: 'eflags', - } - - flag_register_decompose = { - aarch64.archname: aarch64.decompose_cpsr, - x86.archname: x86.decompose_rflags, - } - - def _flush_caches(self): - for r in self.regs.keys(): - self.regs[r] = None - self.mem.drop_all() - - - def __init__(self, arch: Arch): - super().__init__(arch) - self.strict = False - - def read_register(self, reg: str, no_cached: bool=False) -> int: - global CONN - - if reg == 'RFLAGS': - reg = 'EFLAGS' - - flags = self.flag_register_decompose[self.arch.archname](0).keys() - if reg in flags and self.arch.archname in self.flag_register_names: - reg_name = self.flag_register_names[self.arch.archname] - else: - reg_name = self.arch.to_regname(reg) - - if reg_name is None: - raise RegisterAccessError(reg, f'Not a register name: {reg}') - - reg_acc = self.arch.get_reg_accessor(reg_name) - if reg_acc is None: - raise RegisterAccessError(reg, f'Not a enclosing register name: {reg}') - exit(-1) - reg_name = reg_acc.base_reg.lower() - - val = None - from_cache = False - if not no_cached and super().test_register(reg_name): - val = super().read_register(reg_name) - from_cache = True - else: - msg = mk_command("read register", self.arch.endianness, reg=reg_name) - CONN.send(msg) - - try: - resp = CONN.recv(180) - except ConnectionResetError: - raise StopIteration - - if len(resp) < 180: - raise RegisterAccessError(reg, f'Invalid response length when reading {reg}: {len(resp)}' - f' for response {resp}') - - val, size = unmk_register(resp, self.arch.endianness) - - # Try to access the flags register with `reg` as a logical flag name - if reg in flags and self.arch.archname in self.flag_register_names: - flags_reg = self.flag_register_names[self.arch.archname] - _flags = self.flag_register_decompose[self.arch.archname](val) - if reg in _flags: - if not from_cache: - self.set_register(reg, _flags[reg]) - return _flags[reg] - raise RegisterAccessError(f'Unable to access flag {reg}.') - - if not from_cache: - self.set_register(reg, val) - return val & reg_acc.mask >> reg_acc.start - - def read_memory(self, addr: int, size: int) -> bytes: - global CONN - - if self.mem.test(addr): - return super().read_memory(addr, size) - - # print(f'Reading memory at {addr:x}, size={size}') - - msg = mk_command("read memory", self.arch.endianness, addr=addr, size=size) - CONN.send(msg) - - try: - resp = CONN.recv(16) - except ConnectionResetError: - raise StopIteration - _addr, length = unmk_memory(resp, self.arch.endianness) - - if _addr != addr or length == 0: - raise MemoryAccessError( - _addr, size, - f'Unable to access memory at address {addr:x}, size={size}.') - return b'' - - mem = b'' - while len(mem) < length: - try: - resp = CONN.recv(length - len(mem)) - except ConnectionResetError: - raise StopIteration - mem += resp - - self.write_memory(addr, mem) - return mem - - def step(self): - global CONN - - self._flush_caches() - msg = mk_command("step", self.arch.endianness) - CONN.send(msg) - - - return - -class PluginStateIterator: - - def __init__(self, sock_path: str, arch: Arch): - global SOCK - global CONN - - self.sock_path = sock_path - self.arch = arch - self._first_next = True - - - # Start the server that waits for QEMU to connect - try: - os.unlink(self.sock_path) - except FileNotFoundError: - pass - # TODO: allow new connections when QEMU clones - SOCK = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - - print(f'Listening for QEMU Plugin connection at {self.sock_path}...') - SOCK.bind(self.sock_path) - SOCK.listen(1) - - CONN, qemu_addr = SOCK.accept() - - # Handshake with QEMU - pid_b = CONN.recv(4) - pid = struct.unpack('i', pid_b)[0] - print(f'Connected to QEMU instance with PID {pid}.') - - def __iter__(self): - return self - - def __next__(self): - # The first call to __next__ should yield the first program state, - # i.e. after stepping the first time - if self._first_next: - self._first_next = False - self.state = PluginProgramState(self.arch) - #self.state.step() - return self.state - - # Step - pc = self.state.read_register('pc') - new_pc = pc - while pc == new_pc: # Skip instruction chains from REP STOS etc. - self.state.step() - new_pc = self.state.read_register('pc', True) - - return self.state - -def record_minimal_snapshot(prev_state: ProgramState, - cur_state: PluginProgramState, - prev_transform: SymbolicTransform, - cur_transform: SymbolicTransform) \ - -> ProgramState: - """Record a minimal snapshot. - - A minimal snapshot must include values (registers and memory) that are - accessed by two transformations: - 1. The values produced by the previous transformation (the - transformation that is producing this snapshot) to check these - values against expected values calculated from the previous - program state. - 2. The values that act as inputs to the transformation acting on this - snapshot, to calculate the expected values of the next snapshot. - - :param prev_transform: The symbolic transformation generating, or - leading to, `cur_state`. Values generated by - this transformation are included in the - snapshot. - :param transform: The symbolic transformation operating on this - snapshot. Input values to this transformation are - included in the snapshot. - """ - assert(cur_state.read_register('pc') == cur_transform.addr) - assert(prev_transform.arch == cur_transform.arch) - - def get_written_addresses(t: SymbolicTransform) -> Iterable[ExprMem]: - """Get all output memory accesses of a symbolic transformation.""" - return [ExprMem(a, v.size) for a, v in t.changed_mem.items()] - - def set_values(regs: Iterable[str], mems: Iterable[ExprMem], - cur_state: PluginProgramState, - prev_state: PluginProgramState, - out_state: ProgramState): - """ - :param prev_state: Addresses of memory included in the snapshot are - resolved relative to this state. - """ - for regname in regs: - try: - regval = cur_state.read_register(regname) - out_state.set_register(regname, regval) - except RegisterAccessError: - out_state.set_register(regname, 0) - for mem in mems: - assert(mem.size % 8 == 0) - addr = eval_symbol(mem.ptr, prev_state) - try: - mem = cur_state.read_memory(addr, int(mem.size / 8)) - out_state.write_memory(addr, mem) - except MemoryAccessError: - pass - - state = ProgramState(cur_transform.arch) - state.set_register('pc', cur_transform.addr) - - set_values(prev_transform.changed_regs.keys(), - get_written_addresses(prev_transform), - cur_state, - prev_state, - state) - set_values(cur_transform.get_used_registers(), - cur_transform.get_used_memory_addresses(), - cur_state, - cur_state, - state) - return state - -def collect_conc_trace(qemu: PluginStateIterator, \ - strace: list[SymbolicTransform]) \ - -> tuple[list[ProgramState], list[SymbolicTransform]]: - """Collect a trace of concrete states from QEMU. - - Records minimal concrete states from QEMU by using symbolic trace - information to determine which register/memory values are required to - verify the correctness of QEMU. - - May drop symbolic transformations if the symbolic trace and the QEMU trace - diverge (e.g. because of differences in environment, etc.). Returns the - new, possibly modified, symbolic trace that matches the returned concrete - trace. - - :return: A list of concrete states and a list of corresponding symbolic - transformations. The lists are guaranteed to have the same length. - """ - def find_index(seq, target, access=lambda el: el): - for i, el in enumerate(seq): - if access(el) == target: - return i - return None - - if not strace: - return [], [] - - states = [] - matched_transforms = [] - - state_iter = iter(qemu) - cur_state = next(state_iter) - symb_i = 0 - - # An online trace matching algorithm. - while True: - try: - pc = cur_state.read_register('pc') - - while pc != strace[symb_i].addr: - next_i = find_index(strace[symb_i+1:], pc, lambda t: t.addr) - - # Drop the concrete state if no address in the symbolic trace - # matches - if next_i is None: - print(f'Warning: Dropping concrete state {hex(pc)}, as no' - f' matching instruction can be found in the symbolic' - f' reference trace.') - cur_state = next(state_iter) - pc = cur_state.read_register('pc', True) - continue - - # Otherwise, jump to the next matching symbolic state - symb_i += next_i + 1 - - assert(cur_state.read_register('pc') == strace[symb_i].addr) - states.append(record_minimal_snapshot( - states[-1] if states else cur_state, - cur_state, - matched_transforms[-1] if matched_transforms else strace[symb_i], - strace[symb_i])) - matched_transforms.append(strace[symb_i]) - cur_state = next(state_iter) - symb_i += 1 - except StopIteration: - break - - return states, matched_transforms - - -def start_validation_server(symb_trace: str, - output: str, - socket: str, - guest_arch: str, - env, - verbosity: ErrorTypes, - is_quiet: bool = False): - # Read pre-computed symbolic trace - with open(symb_trace, 'r') as strace: - symb_transforms = parser.parse_transformations(strace) - - arch = supported_architectures.get(guest_arch) - - qemu = PluginStateIterator(socket, arch) - - # Use symbolic trace to collect concrete trace from QEMU - conc_states, matched_transforms = collect_conc_trace( - qemu, - symb_transforms.states) - - # Verify and print result - if not is_quiet: - res = compare_symbolic(conc_states, matched_transforms) - print_result(res, verbosity) - - if output: - from focaccia.parser import serialize_snapshots - with open(output, 'w') as file: - serialize_snapshots(Trace(conc_states, env), file) - -- cgit 1.4.1