diff options
Diffstat (limited to 'tools/qemu_tool.py')
| -rw-r--r-- | tools/qemu_tool.py | 233 |
1 files changed, 111 insertions, 122 deletions
diff --git a/tools/qemu_tool.py b/tools/qemu_tool.py index c7730fd..d670692 100644 --- a/tools/qemu_tool.py +++ b/tools/qemu_tool.py @@ -3,134 +3,123 @@ gdb -n --batch -x qemu_tool.py """ -import argparse -import re -import shlex -import subprocess -from typing import TextIO +import gdb +import platform import focaccia.parser as parser -from focaccia.arch import x86 -from focaccia.lldb_target import MemoryMap +from focaccia.arch import supported_architectures, Arch +from focaccia.compare import compare_symbolic, ErrorTypes from focaccia.snapshot import ProgramState - -def parse_memory_maps(stream: TextIO) -> tuple[list[MemoryMap], str]: - """ - :return: Returns the list of parsed memory mappings as well as the first - line in the stream that does not belong to the memory mapping - information, i.e. the line that terminates the block of mapping - information. - The line is returned for the technical reason that the parser - needs to read a line from the stream in order to determine that - this line does no longer belong to the mapping information; but it - might still contain other important information. - """ - mappings = [] - while True: - line = stream.readline() - split = line.split(' ') - if len(split) != 3 or not re.match('^[0-9a-f]+-[0-9a-f]+$', split[0]): - return mappings, line - - addr_range, size, perms = split - start, end = addr_range.split('-') - start, end = int(start, 16), int(end, 16) - mappings.append(MemoryMap(start, end, '[unnamed]', perms)) - -def copy_memory(proc, state: ProgramState, maps: list[MemoryMap]): - """Copy memory from a GDB process to a ProgramState object. - - Problem: Reading large mappings via GDB takes way too long (~500ms for ~8MB). - """ - for mapping in maps: - # Only copy read- and writeable memory from the process. This is a - # heuristic to try to copy only heap and stack. - if 'rw' not in mapping.perms: - continue - - map_size = mapping.end_address - mapping.start_address - mem = proc.read_memory(mapping.start_address, map_size) - assert(mem.contiguous) - assert(mem.nbytes == len(mem.tobytes())) - assert(mem.nbytes == map_size) - state.write_memory(mapping.start_address, mem.tobytes()) - -def run_gdb(qemu_log: TextIO, qemu_port: int) -> list[ProgramState]: - import gdb - - gdb.execute('set pagination 0') - gdb.execute('set sysroot') - gdb.execute(f'target remote localhost:{qemu_port}') - process = gdb.selected_inferior() - - arch = x86.ArchX86() - mappings: list[MemoryMap] = [] - states: list[ProgramState] = [] - - while process.is_valid() and len(process.threads()) > 0: - for line in qemu_log: - if re.match('^start +end +size +prot$', line): - mappings, line = parse_memory_maps(qemu_log) - - if line.startswith('Trace'): - states.append(ProgramState(arch)) - copy_memory(process, states[-1], mappings) - continue - - if states: - parser._parse_qemu_line(line, states[-1]) - - gdb.execute('si', to_string=True) +from focaccia.symbolic import SymbolicTransform, eval_symbol +from focaccia.utils import print_result + +from verify_qemu import make_argparser + +class GDBProgramState: + def __init__(self, process: gdb.Inferior, frame: gdb.Frame): + self._proc = process + self._frame = frame + + def read_register(self, regname: str) -> int | None: + try: + return int(self._frame.read_register(regname.lower())) + except ValueError as err: + from focaccia.arch import x86 + rflags = int(self._frame.read_register('eflags')) + rflags = x86.decompose_rflags(rflags) + if regname in rflags: + return rflags[regname] + + print(f'{regname}: {err}') + return None + + def read_memory(self, addr: int, size: int) -> bytes | None: + try: + return self._proc.read_memory(addr, size).tobytes() + except gdb.MemoryError as err: + print(f'@{size}[{hex(addr)}]: {err}') + return None + +class GDBServerStateIterator: + def __init__(self, address: str, port: int): + gdb.execute('set pagination 0') + gdb.execute('set sysroot') + gdb.execute(f'target remote {address}:{port}') + self._process = gdb.selected_inferior() + self._first_next = True + + def __iter__(self): + return self + + def __next__(self): + # The first call to __next__ should yield the first program state, + # i.e. before stepping the first time + if self._first_next: + self._first_next = False + return GDBProgramState(self._process, gdb.selected_frame()) + + # Step + pc = gdb.selected_frame().read_register('pc') + new_pc = pc + while pc == new_pc: + gdb.execute('si', to_string=True) + if not self._process.is_valid() or len(self._process.threads()) == 0: + raise StopIteration + new_pc = gdb.selected_frame().read_register('pc') + + return GDBProgramState(self._process, gdb.selected_frame()) + +def collect_conc_trace(arch: Arch, \ + gdb: GDBServerStateIterator, \ + strace: list[SymbolicTransform]) \ + -> list[ProgramState]: + states = [] + for qemu, transform in zip(gdb, strace): + qemu_pc = qemu.read_register('pc') + assert(qemu_pc is not None) + + if qemu_pc != transform.addr: + print(f'Fatal error: QEMU\'s program counter' + f' ({hex(qemu_pc)}) does not match the' + f' expected program counter in the symbolic trace' + f' ({hex(transform.addr)}).') + print(f'Processing only partial trace up to this instruction.') + return states + + state = ProgramState(arch) + state.set_register('PC', transform.addr) + + accessed_regs = transform.get_used_registers() + accessed_mems = transform.get_used_memory_addresses() + for regname in accessed_regs: + regval = qemu.read_register(regname) + if regval is not None: + state.set_register(regname, regval) + for mem in accessed_mems: + assert(mem.size % 8 == 0) + addr = eval_symbol(mem.ptr, qemu) + mem = qemu.read_memory(addr, int(mem.size / 8)) + if mem is not None: + state.write_memory(addr, mem) + states.append(state) return states -def make_argparser(): - prog = argparse.ArgumentParser() - prog.add_argument('binary', - type=str, - help='The binary to run and record.') - prog.add_argument('--binary-args', - type=str, - help='A string of arguments to be passed to the binary.') - prog.add_argument('--output', '-o', help='Name of output file.') - prog.add_argument('--gdbserver-port', type=int, default=12421) - prog.add_argument('--qemu', type=str, default='qemu-x86_64', - help='QEMU binary to invoke. [Default: qemu-x86_64') - prog.add_argument('--qemu-log', type=str, default='qemu.log') - prog.add_argument('--qemu-extra-args', type=str, default='', - help='Arguments passed to QEMU in addition to the' - ' default ones required by this script.') - return prog - -if __name__ == "__main__": +def main(): args = make_argparser().parse_args() - binary = args.binary - binary_args = shlex.split(args.binary_args) if args.binary_args else '' - - qemu_bin = args.qemu gdbserver_port = args.gdbserver_port - qemu_log_name = args.qemu_log - qemu_args = [ - qemu_bin, - '--trace', 'target_mmap*', - '--trace', 'memory_notdirty_*', - # We write QEMU's output to a log file, then read it from that file. - # This is preferred over reading from the process's stdout pipe because - # we require a non-blocking solution that returns when all available - # lines have been read. - '-D', qemu_log_name, - '-d', 'cpu,fpu,exec,unimp,page,strace', - '-g', str(gdbserver_port), - *shlex.split(args.qemu_extra_args), - binary, - *binary_args, - ] - - qemu = subprocess.Popen(qemu_args) - - with open(qemu_log_name, 'r') as qemu_log: - snapshots = run_gdb(qemu_log, gdbserver_port) - - with open(args.output, 'w') as file: - parser.serialize_snapshots(snapshots, file) + with open(args.symb_trace, 'r') as strace: + symb_transforms = parser.parse_transformations(strace) + + arch = supported_architectures[platform.machine()] + conc_states = collect_conc_trace( + arch, + GDBServerStateIterator('localhost', gdbserver_port), + symb_transforms) + + res = compare_symbolic(conc_states, symb_transforms) + print_result(res, ErrorTypes.POSSIBLE) + +if __name__ == "__main__": + main() |