diff options
| author | Theofilos Augoustis <theofilos.augoustis@tum.de> | 2025-08-28 13:29:25 +0000 |
|---|---|---|
| committer | Theofilos Augoustis <theofilos.augoustis@tum.de> | 2025-08-28 13:29:25 +0000 |
| commit | 4bcee933ed1894b3ead6e5dc4c6f993cf769b523 (patch) | |
| tree | 30ecfa2ce98474fd4447d2dcf1814d4c16661602 /tools/_qemu_tool.py | |
| parent | 7e46de2da5f4d99ab672f5609b0142a4915498fc (diff) | |
| download | focaccia-4bcee933ed1894b3ead6e5dc4c6f993cf769b523.tar.gz focaccia-4bcee933ed1894b3ead6e5dc4c6f993cf769b523.zip | |
Move qemu validator tool to our current setup ta/tool-refactor
Diffstat (limited to 'tools/_qemu_tool.py')
| -rw-r--r-- | tools/_qemu_tool.py | 314 |
1 files changed, 0 insertions, 314 deletions
diff --git a/tools/_qemu_tool.py b/tools/_qemu_tool.py deleted file mode 100644 index b365d39..0000000 --- a/tools/_qemu_tool.py +++ /dev/null @@ -1,314 +0,0 @@ -"""Invocable like this: - - gdb -n --batch -x qemu_tool.py - -But please use `tools/verify_qemu.py` instead because we have some more setup -work to do. -""" - -import gdb -from typing import Iterable - -import focaccia.parser as parser -from focaccia.arch import supported_architectures, Arch -from focaccia.compare import compare_symbolic -from focaccia.snapshot import ProgramState, ReadableProgramState, \ - RegisterAccessError, MemoryAccessError -from focaccia.symbolic import SymbolicTransform, eval_symbol, ExprMem -from focaccia.trace import Trace, TraceEnvironment -from focaccia.utils import print_result - -from verify_qemu import make_argparser, verbosity - -class GDBProgramState(ReadableProgramState): - from focaccia.arch import aarch64, x86 - - flag_register_names = { - aarch64.archname: 'cpsr', - x86.archname: 'eflags', - } - - flag_register_decompose = { - aarch64.archname: aarch64.decompose_cpsr, - x86.archname: x86.decompose_rflags, - } - - def __init__(self, process: gdb.Inferior, frame: gdb.Frame, arch: Arch): - super().__init__(arch) - self._proc = process - self._frame = frame - - @staticmethod - def _read_vector_reg_aarch64(val, size) -> int: - return int(str(val['u']), 10) - - @staticmethod - def _read_vector_reg_x86(val, size) -> int: - num_longs = size // 64 - vals = val[f'v{num_longs}_int64'] - res = 0 - for i in range(num_longs): - val = int(vals[i].cast(gdb.lookup_type('unsigned long'))) - res += val << i * 64 - return res - - read_vector_reg = { - aarch64.archname: _read_vector_reg_aarch64, - x86.archname: _read_vector_reg_x86, - } - - def read_register(self, reg: str) -> int: - if reg == 'RFLAGS': - reg = 'EFLAGS' - - try: - val = self._frame.read_register(reg.lower()) - size = val.type.sizeof * 8 - - # For vector registers, we need to apply architecture-specific - # logic because GDB's interface is not consistent. - if size >= 128: # Value is a vector - if self.arch.archname not in self.read_vector_reg: - raise NotImplementedError( - f'Reading vector registers is not implemented for' - f' architecture {self.arch.archname}.') - return self.read_vector_reg[self.arch.archname](val, size) - elif size < 64: - return int(val.cast(gdb.lookup_type('unsigned int'))) - # For non-vector values, just return the 64-bit value - return int(val.cast(gdb.lookup_type('unsigned long'))) - except ValueError as err: - # Try to access the flags register with `reg` as a logical flag name - if self.arch.archname in self.flag_register_names: - flags_reg = self.flag_register_names[self.arch.archname] - flags = int(self._frame.read_register(flags_reg)) - flags = self.flag_register_decompose[self.arch.archname](flags) - if reg in flags: - return flags[reg] - raise RegisterAccessError(reg, - f'[GDB] Unable to access {reg}: {err}') - - def read_memory(self, addr: int, size: int) -> bytes: - try: - mem = self._proc.read_memory(addr, size).tobytes() - if self.arch.endianness == 'little': - return mem - else: - return bytes(reversed(mem)) # Convert to big endian - except gdb.MemoryError as err: - raise MemoryAccessError(addr, size, str(err)) - -class GDBServerStateIterator: - def __init__(self, address: str, port: int): - gdb.execute('set pagination 0') - gdb.execute('set sysroot') - gdb.execute(f'target remote {address}:{port}') - self._process = gdb.selected_inferior() - self._first_next = True - - # Try to determine the guest architecture. This is a bit hacky and - # tailored to GDB's naming for the x86-64 architecture. - split = self._process.architecture().name().split(':') - archname = split[1] if len(split) > 1 else split[0] - archname = archname.replace('-', '_') - if archname not in supported_architectures: - print(f'Error: Current platform ({archname}) is not' - f' supported by Focaccia. Exiting.') - exit(1) - - self.arch = supported_architectures[archname] - self.binary = self._process.progspace.filename - - def __iter__(self): - return self - - def __next__(self): - # The first call to __next__ should yield the first program state, - # i.e. before stepping the first time - if self._first_next: - self._first_next = False - return GDBProgramState(self._process, gdb.selected_frame(), self.arch) - - # Step - pc = gdb.selected_frame().read_register('pc') - new_pc = pc - while pc == new_pc: # Skip instruction chains from REP STOS etc. - gdb.execute('si', to_string=True) - if not self._process.is_valid() or len(self._process.threads()) == 0: - raise StopIteration - new_pc = gdb.selected_frame().read_register('pc') - - return GDBProgramState(self._process, gdb.selected_frame(), self.arch) - -def record_minimal_snapshot(prev_state: ReadableProgramState, - cur_state: ReadableProgramState, - prev_transform: SymbolicTransform, - cur_transform: SymbolicTransform) \ - -> ProgramState: - """Record a minimal snapshot. - - A minimal snapshot must include values (registers and memory) that are - accessed by two transformations: - 1. The values produced by the previous transformation (the - transformation that is producing this snapshot) to check these - values against expected values calculated from the previous - program state. - 2. The values that act as inputs to the transformation acting on this - snapshot, to calculate the expected values of the next snapshot. - - :param prev_transform: The symbolic transformation generating, or - leading to, `cur_state`. Values generated by - this transformation are included in the - snapshot. - :param transform: The symbolic transformation operating on this - snapshot. Input values to this transformation are - included in the snapshot. - """ - assert(cur_state.read_register('pc') == cur_transform.addr) - assert(prev_transform.arch == cur_transform.arch) - - def get_written_addresses(t: SymbolicTransform): - """Get all output memory accesses of a symbolic transformation.""" - return [ExprMem(a, v.size) for a, v in t.changed_mem.items()] - - def set_values(regs: Iterable[str], mems: Iterable[ExprMem], - cur_state: ReadableProgramState, - prev_state: ReadableProgramState, - out_state: ProgramState): - """ - :param prev_state: Addresses of memory included in the snapshot are - resolved relative to this state. - """ - for regname in regs: - try: - regval = cur_state.read_register(regname) - out_state.set_register(regname, regval) - except RegisterAccessError: - pass - for mem in mems: - assert(mem.size % 8 == 0) - addr = eval_symbol(mem.ptr, prev_state) - try: - mem = cur_state.read_memory(addr, int(mem.size / 8)) - out_state.write_memory(addr, mem) - except MemoryAccessError: - pass - - state = ProgramState(cur_transform.arch) - state.set_register('PC', cur_transform.addr) - - set_values(prev_transform.changed_regs.keys(), - get_written_addresses(prev_transform), - cur_state, - prev_state, # Evaluate memory addresses based on previous - # state because they are that state's output - # addresses. - state) - set_values(cur_transform.get_used_registers(), - cur_transform.get_used_memory_addresses(), - cur_state, - cur_state, - state) - return state - -def collect_conc_trace(gdb: GDBServerStateIterator, \ - strace: list[SymbolicTransform]) \ - -> tuple[list[ProgramState], list[SymbolicTransform]]: - """Collect a trace of concrete states from GDB. - - Records minimal concrete states from GDB by using symbolic trace - information to determine which register/memory values are required to - verify the correctness of the program running in GDB. - - May drop symbolic transformations if the symbolic trace and the GDB trace - diverge (e.g. because of differences in environment, etc.). Returns the - new, possibly modified, symbolic trace that matches the returned concrete - trace. - - :return: A list of concrete states and a list of corresponding symbolic - transformations. The lists are guaranteed to have the same length. - """ - def find_index(seq, target, access=lambda el: el): - for i, el in enumerate(seq): - if access(el) == target: - return i - return None - - if not strace: - return [], [] - - states = [] - matched_transforms = [] - - state_iter = iter(gdb) - cur_state = next(state_iter) - symb_i = 0 - - # An online trace matching algorithm. - while True: - try: - pc = cur_state.read_register('pc') - - while pc != strace[symb_i].addr: - next_i = find_index(strace[symb_i+1:], pc, lambda t: t.addr) - - # Drop the concrete state if no address in the symbolic trace - # matches - if next_i is None: - print(f'Warning: Dropping concrete state {hex(pc)}, as no' - f' matching instruction can be found in the symbolic' - f' reference trace.') - cur_state = next(state_iter) - pc = cur_state.read_register('pc') - continue - - # Otherwise, jump to the next matching symbolic state - symb_i += next_i + 1 - - assert(cur_state.read_register('pc') == strace[symb_i].addr) - states.append(record_minimal_snapshot( - states[-1] if states else cur_state, - cur_state, - matched_transforms[-1] if matched_transforms else strace[symb_i], - strace[symb_i])) - matched_transforms.append(strace[symb_i]) - cur_state = next(state_iter) - symb_i += 1 - except StopIteration: - break - - return states, matched_transforms - -def main(): - args = make_argparser().parse_args() - - gdbserver_addr = 'localhost' - gdbserver_port = args.port - gdb_server = GDBServerStateIterator(gdbserver_addr, gdbserver_port) - - executable = gdb_server.binary - argv = [] # QEMU's GDB stub does not support 'info proc cmdline' - envp = [] # Can't get the remote target's environment - env = TraceEnvironment(executable, argv, envp, '?') - - # Read pre-computed symbolic trace - with open(args.symb_trace, 'r') as strace: - symb_transforms = parser.parse_transformations(strace) - - # Use symbolic trace to collect concrete trace from QEMU - conc_states, matched_transforms = collect_conc_trace( - gdb_server, - symb_transforms.states) - - # Verify and print result - if not args.quiet: - res = compare_symbolic(conc_states, matched_transforms) - print_result(res, verbosity[args.error_level]) - - if args.output: - from focaccia.parser import serialize_snapshots - with open(args.output, 'w') as file: - serialize_snapshots(Trace(conc_states, env), file) - -if __name__ == "__main__": - main() |