about summary refs log tree commit diff stats
path: root/src
diff options
context:
space:
mode:
authorTheofilos Augoustis <theofilos.augoustis@gmail.com>2025-11-05 14:23:48 +0000
committerTheofilos Augoustis <theofilos.augoustis@gmail.com>2025-11-06 15:30:13 +0000
commitd57d639908a19c3dfcc31829eb7996cf3bfc8b4e (patch)
tree6e2543279973974e2d618f001ed01e25b61a61a0 /src
parent5f2fbf712e222258d5e939dcf474e8039a93fa87 (diff)
downloadfocaccia-ta/uniformize-qemu.tar.gz
focaccia-ta/uniformize-qemu.zip
Integrate QEMU plugin directly into Focaccia ta/uniformize-qemu
Diffstat (limited to 'src')
-rw-r--r--src/focaccia/tools/_qemu_tool.py20
-rwxr-xr-xsrc/focaccia/tools/validate_qemu.py124
-rwxr-xr-xsrc/focaccia/tools/validation_server.py66
3 files changed, 112 insertions, 98 deletions
diff --git a/src/focaccia/tools/_qemu_tool.py b/src/focaccia/tools/_qemu_tool.py
index 706a9fe..631c79b 100644
--- a/src/focaccia/tools/_qemu_tool.py
+++ b/src/focaccia/tools/_qemu_tool.py
@@ -106,11 +106,11 @@ class GDBProgramState(ReadableProgramState):
             raise MemoryAccessError(addr, size, str(err))
 
 class GDBServerStateIterator:
-    def __init__(self, address: str, port: int):
+    def __init__(self, remote: str):
         gdb.execute('set pagination 0')
         gdb.execute('set sysroot')
         gdb.execute('set python print-stack full') # enable complete Python tracebacks
-        gdb.execute(f'target remote {address}:{port}')
+        gdb.execute(f'target remote {remote}')
         self._process = gdb.selected_inferior()
         self._first_next = True
 
@@ -291,22 +291,12 @@ def collect_conc_trace(gdb: GDBServerStateIterator, \
     return states, matched_transforms
 
 def main():
-    prog = make_argparser()
-    prog.add_argument('hostname',
-                      help='The hostname at which to find the GDB server.')
-    prog.add_argument('port',
-                      type=int,
-                      help='The port at which to find the GDB server.')
-
-    args = prog.parse_args()
-
-    gdbserver_addr = 'localhost'
-    gdbserver_port = args.port
+    args = make_argparser().parse_args()
 
     try:
-        gdb_server = GDBServerStateIterator(gdbserver_addr, gdbserver_port)
+        gdb_server = GDBServerStateIterator(args.remote)
     except:
-        raise Exception(f'Unable to perform basic GDB setup')
+        raise Exception('Unable to perform basic GDB setup')
 
     try:
         if args.executable is None:
diff --git a/src/focaccia/tools/validate_qemu.py b/src/focaccia/tools/validate_qemu.py
index 2b7e65c..edef9ae 100755
--- a/src/focaccia/tools/validate_qemu.py
+++ b/src/focaccia/tools/validate_qemu.py
@@ -6,6 +6,8 @@ Spawn GDB, connect to QEMU's GDB server, and read test states from that.
 We need two scripts (this one and the primary `qemu_tool.py`) because we can't
 pass arguments to scripts executed via `gdb -x <script>`.
 
+Alternatively, we connect to the Focaccia QEMU plugin when a socket is given.
+
 This script (`validate_qemu.py`) is the one the user interfaces with. It
 eventually calls `execv` to spawn a GDB process that calls the main
 `qemu_tool.py` script; `python validate_qemu.py` essentially behaves as if
@@ -21,6 +23,8 @@ import sysconfig
 import subprocess
 
 from focaccia.compare import ErrorTypes
+from focaccia.arch import supported_architectures
+from focaccia.tools.validation_server import start_validation_server
 
 verbosity = {
     'info':    ErrorTypes.INFO,
@@ -50,6 +54,7 @@ memory, and stepping forward by single instructions.
                       action='store_true',
                       help='Don\'t print a verification result.')
     prog.add_argument('-o', '--output',
+                      type=str,
                       help='If specified with a file name, the recorded'
                            ' emulator states will be written to that file.')
     prog.add_argument('--error-level',
@@ -59,6 +64,23 @@ memory, and stepping forward by single instructions.
                       default=None,
                       help='The executable executed under QEMU, overrides the auto-detected' \
                             'executable')
+    prog.add_argument('--use-socket',
+                      type=str,
+                      nargs='?',
+                      const='/tmp/focaccia.sock',
+                      help='Use QEMU plugin interface given by socket instead of GDB')
+    prog.add_argument('--guest-arch',
+                      type=str,
+                      choices=supported_architectures.keys(),
+                      help='Architecture of the emulated guest'
+                           '(Only required when using --use-socket)')
+    prog.add_argument('--remote',
+                      type=str,
+                      help='The hostname:port pair at which to find a QEMU GDB server.')
+    prog.add_argument('--gdb', 
+                      type=str,
+                      default='gdb',
+                      help='GDB binary to invoke.')
     return prog
 
 def quoted(s: str) -> str:
@@ -71,50 +93,62 @@ def try_remove(l: list, v):
         pass
 
 def main():
-    prog = make_argparser()
-    prog.add_argument('--gdb', default='gdb',
-                      help='GDB binary to invoke.')
-    args = prog.parse_args()
-
-    script_dirname = os.path.dirname(__file__)
-    qemu_tool_path = os.path.join(script_dirname, '_qemu_tool.py')
-
-    # We have to remove all arguments we don't want to pass to the qemu tool
-    # manually here. Not nice, but what can you do..
-    argv = sys.argv
-    try_remove(argv, '--gdb')
-    try_remove(argv, args.gdb)
-
-    # Assemble the argv array passed to the qemu tool. GDB does not have a
-    # mechanism to pass arguments to a script that it executes, so we
-    # overwrite `sys.argv` manually before invoking the script.
-    argv_str = f'[{", ".join(quoted(a) for a in argv)}]'
-    path_str = f'[{", ".join(quoted(s) for s in sys.path)}]'
-
-    paths = sysconfig.get_paths()
-    candidates = [paths["purelib"], paths["platlib"]]
-    entries = [p for p in candidates if p and os.path.isdir(p)]
-    venv_site = entries[0]
+    argparser = make_argparser()
+    args = argparser.parse_args()
+
+    # Get environment
     env = os.environ.copy()
-    env["PYTHONPATH"] = ','.join([script_dirname, venv_site])
-
-    print(f"GDB started with Python Path: {env['PYTHONPATH']}")
-    gdb_cmd = [
-        args.gdb,
-        '-nx',  # Don't parse any .gdbinits
-        '--batch',
-        '-ex', f'py import sys',
-        '-ex', f'py sys.argv = {argv_str}',
-        '-ex', f'py sys.path = {path_str}',
-        "-ex", f"py import site; site.addsitedir({venv_site!r})",
-        "-ex", f"py import site; site.addsitedir({script_dirname!r})",
-        '-x', qemu_tool_path
-    ]
-    proc = subprocess.Popen(gdb_cmd, env=env)
-
-    ret = proc.wait()
-    exit(ret)
-
-if __name__ == "__main__":
-    main()
+
+    # Differentiate between the QEMU GDB server and QEMU plugin interfaces
+    if args.use_socket:
+        if not args.guest_arch:
+            argparser.error('--guest-arch is required when --use-socket is specified')
+
+        # QEMU plugin interface
+        start_validation_server(args.symb_trace,
+                                args.output,
+                                args.use_socket,
+                                args.guest_arch,
+                                env,
+                                verbosity[args.error_level],
+                                args.quiet)
+    else:
+        # QEMU GDB interface
+        script_dirname = os.path.dirname(__file__)
+        qemu_tool_path = os.path.join(script_dirname, '_qemu_tool.py')
+
+        # We have to remove all arguments we don't want to pass to the qemu tool
+        # manually here. Not nice, but what can you do..
+        argv = sys.argv
+        try_remove(argv, '--gdb')
+        try_remove(argv, args.gdb)
+
+        # Assemble the argv array passed to the qemu tool. GDB does not have a
+        # mechanism to pass arguments to a script that it executes, so we
+        # overwrite `sys.argv` manually before invoking the script.
+        argv_str = f'[{", ".join(quoted(a) for a in argv)}]'
+        path_str = f'[{", ".join(quoted(s) for s in sys.path)}]'
+
+        paths = sysconfig.get_paths()
+        candidates = [paths["purelib"], paths["platlib"]]
+        entries = [p for p in candidates if p and os.path.isdir(p)]
+        venv_site = entries[0]
+        env["PYTHONPATH"] = ','.join([script_dirname, venv_site])
+
+        print(f"GDB started with Python Path: {env['PYTHONPATH']}")
+        gdb_cmd = [
+            args.gdb,
+            '-nx',  # Don't parse any .gdbinits
+            '--batch',
+            '-ex',  'py import sys',
+            '-ex', f'py sys.argv = {argv_str}',
+            '-ex', f'py sys.path = {path_str}',
+            "-ex", f'py import site; site.addsitedir({venv_site!r})',
+            "-ex", f'py import site; site.addsitedir({script_dirname!r})',
+            '-x', qemu_tool_path
+        ]
+        proc = subprocess.Popen(gdb_cmd, env=env)
+
+        ret = proc.wait()
+        exit(ret)
 
diff --git a/src/focaccia/tools/validation_server.py b/src/focaccia/tools/validation_server.py
index b87048a..db33ff3 100755
--- a/src/focaccia/tools/validation_server.py
+++ b/src/focaccia/tools/validation_server.py
@@ -1,25 +1,26 @@
 #! /usr/bin/env python3
 
+import os
+import socket
+import struct
+import logging
 from typing import Iterable
 
 import focaccia.parser as parser
 from focaccia.arch import supported_architectures, Arch
-from focaccia.compare import compare_symbolic
-from focaccia.snapshot import ProgramState, ReadableProgramState, \
-                              RegisterAccessError, MemoryAccessError
+from focaccia.compare import compare_symbolic, ErrorTypes
+from focaccia.snapshot import ProgramState, RegisterAccessError, MemoryAccessError
 from focaccia.symbolic import SymbolicTransform, eval_symbol, ExprMem
-from focaccia.trace import Trace, TraceEnvironment
+from focaccia.trace import Trace
 from focaccia.utils import print_result
 
-from validate_qemu import make_argparser, verbosity
 
-import socket
-import struct
-import os
+logger = logging.getLogger('focaccia-qemu-validation-server')
+debug = logger.debug
+info = logger.info
+warn = logger.warning
 
 
-SOCK_PATH = '/tmp/focaccia.sock'
-
 def endian_fmt(endianness: str) -> str:
     if endianness == 'little':
         return '<'
@@ -124,9 +125,8 @@ class PluginProgramState(ProgramState):
                 raise StopIteration
 
             if len(resp) < 180:
-                print(f'Invalid response length: {len(resp)}')
-                print(f'Response: {resp}')
-                return 0
+                raise RegisterAccessError(reg, f'Invalid response length when reading {reg}: {len(resp)}'
+                                          f' for response {resp}')
 
             val, size = unmk_register(resp, self.arch.endianness)
 
@@ -283,7 +283,7 @@ def record_minimal_snapshot(prev_state: ProgramState,
                 regval = cur_state.read_register(regname)
                 out_state.set_register(regname, regval)
             except RegisterAccessError:
-                pass
+                out_state.set_register(regname, 0)
         for mem in mems:
             assert(mem.size % 8 == 0)
             addr = eval_symbol(mem.ptr, prev_state)
@@ -377,27 +377,20 @@ def collect_conc_trace(qemu: PluginStateIterator, \
     return states, matched_transforms
 
 
-def main():
-    prog = make_argparser()
-    prog.add_argument('--use-socket',
-                      required=True,
-                      type=str,
-                      help='Use QEMU Plugin interface at <socket> instead of GDB')
-    prog.add_argument('--guest_arch',
-                      required=True,
-                      type=str,
-                      help='Architecture of the guest being emulated. (Only required when using --use-socket)')
-
-    args = prog.parse_args()
-
+def start_validation_server(symb_trace: str,
+                            output: str,
+                            socket: str,
+                            guest_arch: str,
+                            env,
+                            verbosity: ErrorTypes,
+                            is_quiet: bool = False):
     # Read pre-computed symbolic trace
-    with open(args.symb_trace, 'r') as strace:
+    with open(symb_trace, 'r') as strace:
         symb_transforms = parser.parse_transformations(strace)
 
-    arch = supported_architectures.get(args.guest_arch)
-    sock_path = args.use_socket
+    arch = supported_architectures.get(guest_arch)
 
-    qemu = PluginStateIterator(sock_path, arch)
+    qemu = PluginStateIterator(socket, arch)
 
     # Use symbolic trace to collect concrete trace from QEMU
     conc_states, matched_transforms = collect_conc_trace(
@@ -405,15 +398,12 @@ def main():
         symb_transforms.states)
 
     # Verify and print result
-    if not args.quiet:
+    if not is_quiet:
         res = compare_symbolic(conc_states, matched_transforms)
-        print_result(res, verbosity[args.error_level])
+        print_result(res, verbosity)
 
-    if args.output:
+    if output:
         from focaccia.parser import serialize_snapshots
-        with open(args.output, 'w') as file:
+        with open(output, 'w') as file:
             serialize_snapshots(Trace(conc_states, env), file)
 
-if __name__ == "__main__":
-    main()
-