diff options
| author | Theofilos Augoustis <theofilos.augoustis@gmail.com> | 2025-11-05 14:23:48 +0000 |
|---|---|---|
| committer | Theofilos Augoustis <theofilos.augoustis@gmail.com> | 2025-11-06 15:30:13 +0000 |
| commit | d57d639908a19c3dfcc31829eb7996cf3bfc8b4e (patch) | |
| tree | 6e2543279973974e2d618f001ed01e25b61a61a0 /src | |
| parent | 5f2fbf712e222258d5e939dcf474e8039a93fa87 (diff) | |
| download | focaccia-ta/uniformize-qemu.tar.gz focaccia-ta/uniformize-qemu.zip | |
Integrate QEMU plugin directly into Focaccia ta/uniformize-qemu
Diffstat (limited to 'src')
| -rw-r--r-- | src/focaccia/tools/_qemu_tool.py | 20 | ||||
| -rwxr-xr-x | src/focaccia/tools/validate_qemu.py | 124 | ||||
| -rwxr-xr-x | src/focaccia/tools/validation_server.py | 66 |
3 files changed, 112 insertions, 98 deletions
diff --git a/src/focaccia/tools/_qemu_tool.py b/src/focaccia/tools/_qemu_tool.py index 706a9fe..631c79b 100644 --- a/src/focaccia/tools/_qemu_tool.py +++ b/src/focaccia/tools/_qemu_tool.py @@ -106,11 +106,11 @@ class GDBProgramState(ReadableProgramState): raise MemoryAccessError(addr, size, str(err)) class GDBServerStateIterator: - def __init__(self, address: str, port: int): + def __init__(self, remote: str): gdb.execute('set pagination 0') gdb.execute('set sysroot') gdb.execute('set python print-stack full') # enable complete Python tracebacks - gdb.execute(f'target remote {address}:{port}') + gdb.execute(f'target remote {remote}') self._process = gdb.selected_inferior() self._first_next = True @@ -291,22 +291,12 @@ def collect_conc_trace(gdb: GDBServerStateIterator, \ return states, matched_transforms def main(): - prog = make_argparser() - prog.add_argument('hostname', - help='The hostname at which to find the GDB server.') - prog.add_argument('port', - type=int, - help='The port at which to find the GDB server.') - - args = prog.parse_args() - - gdbserver_addr = 'localhost' - gdbserver_port = args.port + args = make_argparser().parse_args() try: - gdb_server = GDBServerStateIterator(gdbserver_addr, gdbserver_port) + gdb_server = GDBServerStateIterator(args.remote) except: - raise Exception(f'Unable to perform basic GDB setup') + raise Exception('Unable to perform basic GDB setup') try: if args.executable is None: diff --git a/src/focaccia/tools/validate_qemu.py b/src/focaccia/tools/validate_qemu.py index 2b7e65c..edef9ae 100755 --- a/src/focaccia/tools/validate_qemu.py +++ b/src/focaccia/tools/validate_qemu.py @@ -6,6 +6,8 @@ Spawn GDB, connect to QEMU's GDB server, and read test states from that. We need two scripts (this one and the primary `qemu_tool.py`) because we can't pass arguments to scripts executed via `gdb -x <script>`. +Alternatively, we connect to the Focaccia QEMU plugin when a socket is given. + This script (`validate_qemu.py`) is the one the user interfaces with. It eventually calls `execv` to spawn a GDB process that calls the main `qemu_tool.py` script; `python validate_qemu.py` essentially behaves as if @@ -21,6 +23,8 @@ import sysconfig import subprocess from focaccia.compare import ErrorTypes +from focaccia.arch import supported_architectures +from focaccia.tools.validation_server import start_validation_server verbosity = { 'info': ErrorTypes.INFO, @@ -50,6 +54,7 @@ memory, and stepping forward by single instructions. action='store_true', help='Don\'t print a verification result.') prog.add_argument('-o', '--output', + type=str, help='If specified with a file name, the recorded' ' emulator states will be written to that file.') prog.add_argument('--error-level', @@ -59,6 +64,23 @@ memory, and stepping forward by single instructions. default=None, help='The executable executed under QEMU, overrides the auto-detected' \ 'executable') + prog.add_argument('--use-socket', + type=str, + nargs='?', + const='/tmp/focaccia.sock', + help='Use QEMU plugin interface given by socket instead of GDB') + prog.add_argument('--guest-arch', + type=str, + choices=supported_architectures.keys(), + help='Architecture of the emulated guest' + '(Only required when using --use-socket)') + prog.add_argument('--remote', + type=str, + help='The hostname:port pair at which to find a QEMU GDB server.') + prog.add_argument('--gdb', + type=str, + default='gdb', + help='GDB binary to invoke.') return prog def quoted(s: str) -> str: @@ -71,50 +93,62 @@ def try_remove(l: list, v): pass def main(): - prog = make_argparser() - prog.add_argument('--gdb', default='gdb', - help='GDB binary to invoke.') - args = prog.parse_args() - - script_dirname = os.path.dirname(__file__) - qemu_tool_path = os.path.join(script_dirname, '_qemu_tool.py') - - # We have to remove all arguments we don't want to pass to the qemu tool - # manually here. Not nice, but what can you do.. - argv = sys.argv - try_remove(argv, '--gdb') - try_remove(argv, args.gdb) - - # Assemble the argv array passed to the qemu tool. GDB does not have a - # mechanism to pass arguments to a script that it executes, so we - # overwrite `sys.argv` manually before invoking the script. - argv_str = f'[{", ".join(quoted(a) for a in argv)}]' - path_str = f'[{", ".join(quoted(s) for s in sys.path)}]' - - paths = sysconfig.get_paths() - candidates = [paths["purelib"], paths["platlib"]] - entries = [p for p in candidates if p and os.path.isdir(p)] - venv_site = entries[0] + argparser = make_argparser() + args = argparser.parse_args() + + # Get environment env = os.environ.copy() - env["PYTHONPATH"] = ','.join([script_dirname, venv_site]) - - print(f"GDB started with Python Path: {env['PYTHONPATH']}") - gdb_cmd = [ - args.gdb, - '-nx', # Don't parse any .gdbinits - '--batch', - '-ex', f'py import sys', - '-ex', f'py sys.argv = {argv_str}', - '-ex', f'py sys.path = {path_str}', - "-ex", f"py import site; site.addsitedir({venv_site!r})", - "-ex", f"py import site; site.addsitedir({script_dirname!r})", - '-x', qemu_tool_path - ] - proc = subprocess.Popen(gdb_cmd, env=env) - - ret = proc.wait() - exit(ret) - -if __name__ == "__main__": - main() + + # Differentiate between the QEMU GDB server and QEMU plugin interfaces + if args.use_socket: + if not args.guest_arch: + argparser.error('--guest-arch is required when --use-socket is specified') + + # QEMU plugin interface + start_validation_server(args.symb_trace, + args.output, + args.use_socket, + args.guest_arch, + env, + verbosity[args.error_level], + args.quiet) + else: + # QEMU GDB interface + script_dirname = os.path.dirname(__file__) + qemu_tool_path = os.path.join(script_dirname, '_qemu_tool.py') + + # We have to remove all arguments we don't want to pass to the qemu tool + # manually here. Not nice, but what can you do.. + argv = sys.argv + try_remove(argv, '--gdb') + try_remove(argv, args.gdb) + + # Assemble the argv array passed to the qemu tool. GDB does not have a + # mechanism to pass arguments to a script that it executes, so we + # overwrite `sys.argv` manually before invoking the script. + argv_str = f'[{", ".join(quoted(a) for a in argv)}]' + path_str = f'[{", ".join(quoted(s) for s in sys.path)}]' + + paths = sysconfig.get_paths() + candidates = [paths["purelib"], paths["platlib"]] + entries = [p for p in candidates if p and os.path.isdir(p)] + venv_site = entries[0] + env["PYTHONPATH"] = ','.join([script_dirname, venv_site]) + + print(f"GDB started with Python Path: {env['PYTHONPATH']}") + gdb_cmd = [ + args.gdb, + '-nx', # Don't parse any .gdbinits + '--batch', + '-ex', 'py import sys', + '-ex', f'py sys.argv = {argv_str}', + '-ex', f'py sys.path = {path_str}', + "-ex", f'py import site; site.addsitedir({venv_site!r})', + "-ex", f'py import site; site.addsitedir({script_dirname!r})', + '-x', qemu_tool_path + ] + proc = subprocess.Popen(gdb_cmd, env=env) + + ret = proc.wait() + exit(ret) diff --git a/src/focaccia/tools/validation_server.py b/src/focaccia/tools/validation_server.py index b87048a..db33ff3 100755 --- a/src/focaccia/tools/validation_server.py +++ b/src/focaccia/tools/validation_server.py @@ -1,25 +1,26 @@ #! /usr/bin/env python3 +import os +import socket +import struct +import logging from typing import Iterable import focaccia.parser as parser from focaccia.arch import supported_architectures, Arch -from focaccia.compare import compare_symbolic -from focaccia.snapshot import ProgramState, ReadableProgramState, \ - RegisterAccessError, MemoryAccessError +from focaccia.compare import compare_symbolic, ErrorTypes +from focaccia.snapshot import ProgramState, RegisterAccessError, MemoryAccessError from focaccia.symbolic import SymbolicTransform, eval_symbol, ExprMem -from focaccia.trace import Trace, TraceEnvironment +from focaccia.trace import Trace from focaccia.utils import print_result -from validate_qemu import make_argparser, verbosity -import socket -import struct -import os +logger = logging.getLogger('focaccia-qemu-validation-server') +debug = logger.debug +info = logger.info +warn = logger.warning -SOCK_PATH = '/tmp/focaccia.sock' - def endian_fmt(endianness: str) -> str: if endianness == 'little': return '<' @@ -124,9 +125,8 @@ class PluginProgramState(ProgramState): raise StopIteration if len(resp) < 180: - print(f'Invalid response length: {len(resp)}') - print(f'Response: {resp}') - return 0 + raise RegisterAccessError(reg, f'Invalid response length when reading {reg}: {len(resp)}' + f' for response {resp}') val, size = unmk_register(resp, self.arch.endianness) @@ -283,7 +283,7 @@ def record_minimal_snapshot(prev_state: ProgramState, regval = cur_state.read_register(regname) out_state.set_register(regname, regval) except RegisterAccessError: - pass + out_state.set_register(regname, 0) for mem in mems: assert(mem.size % 8 == 0) addr = eval_symbol(mem.ptr, prev_state) @@ -377,27 +377,20 @@ def collect_conc_trace(qemu: PluginStateIterator, \ return states, matched_transforms -def main(): - prog = make_argparser() - prog.add_argument('--use-socket', - required=True, - type=str, - help='Use QEMU Plugin interface at <socket> instead of GDB') - prog.add_argument('--guest_arch', - required=True, - type=str, - help='Architecture of the guest being emulated. (Only required when using --use-socket)') - - args = prog.parse_args() - +def start_validation_server(symb_trace: str, + output: str, + socket: str, + guest_arch: str, + env, + verbosity: ErrorTypes, + is_quiet: bool = False): # Read pre-computed symbolic trace - with open(args.symb_trace, 'r') as strace: + with open(symb_trace, 'r') as strace: symb_transforms = parser.parse_transformations(strace) - arch = supported_architectures.get(args.guest_arch) - sock_path = args.use_socket + arch = supported_architectures.get(guest_arch) - qemu = PluginStateIterator(sock_path, arch) + qemu = PluginStateIterator(socket, arch) # Use symbolic trace to collect concrete trace from QEMU conc_states, matched_transforms = collect_conc_trace( @@ -405,15 +398,12 @@ def main(): symb_transforms.states) # Verify and print result - if not args.quiet: + if not is_quiet: res = compare_symbolic(conc_states, matched_transforms) - print_result(res, verbosity[args.error_level]) + print_result(res, verbosity) - if args.output: + if output: from focaccia.parser import serialize_snapshots - with open(args.output, 'w') as file: + with open(output, 'w') as file: serialize_snapshots(Trace(conc_states, env), file) -if __name__ == "__main__": - main() - |