about summary refs log tree commit diff stats
diff options
context:
space:
mode:
authorTheofilos Augoustis <37243696+taugoust@users.noreply.github.com>2025-11-04 10:37:21 +0100
committerGitHub <noreply@github.com>2025-11-04 10:37:21 +0100
commit5f2fbf712e222258d5e939dcf474e8039a93fa87 (patch)
treea565f331dc468f9f5f6dd70b4fba01ef6f2fd967
parent6de47d0892975f802bcbeacf974f36c5c2dc2a76 (diff)
parent1cac72b8753bf7a5cce5d6b9342aade42773f249 (diff)
downloadfocaccia-5f2fbf712e222258d5e939dcf474e8039a93fa87.tar.gz
focaccia-5f2fbf712e222258d5e939dcf474e8039a93fa87.zip
Merge pull request #12 from TUM-DSE/sr/plugin
Plugin validation
-rw-r--r--README.md12
-rw-r--r--src/focaccia/arch/aarch64.py67
-rw-r--r--src/focaccia/arch/arch.py1
-rw-r--r--src/focaccia/arch/x86.py4
-rw-r--r--src/focaccia/snapshot.py20
-rw-r--r--src/focaccia/symbolic.py2
-rw-r--r--src/focaccia/tools/_qemu_tool.py9
-rwxr-xr-xsrc/focaccia/tools/validate_qemu.py5
-rwxr-xr-xsrc/focaccia/tools/validation_server.py419
9 files changed, 500 insertions, 39 deletions
diff --git a/README.md b/README.md
index 94e6889..68033d9 100644
--- a/README.md
+++ b/README.md
@@ -29,7 +29,7 @@ It will take a while to compile.
 ### QEMU
 
 A number of additional tools are included to simplify use when validating QEMU:
-`capture-transforms`, `convert-log`, `validate-qemu`. They enable the following workflow.
+`capture-transforms`, `convert-log`, `validate-qemu`, `validation_server`. They enable the following workflow.
 
 ```bash
 capture-transforms -o oracle.trace bug.out
@@ -37,6 +37,16 @@ qemu-x86_64 -g 12345 bug.out &
 validate-qemu --symb-trace oracle.trace localhost 12345
 ```
 
+Alternatively if you have access to the focaccia QEMU plugin:
+
+```bash
+validation_server.py --symb-trace oracle.trace --use-socket=/tmp/focaccia.sock --guest_arch=<arch>
+```
+After you see `Listening for QEMU Plugin connection at /tmp/focaccia.sock...` you can start QEMU like this:
+```bash
+qemu-<arch> [-one-insn-per-tb] --plugin build/contrib/plugins/libfocaccia.so <bug.out>
+```
+
 Using this workflow, Focaccia can determine whether a mistranslation occured in that particular QEMU run.
 
 ### Box64
diff --git a/src/focaccia/arch/aarch64.py b/src/focaccia/arch/aarch64.py
index 6c1163e..0e7d98c 100644
--- a/src/focaccia/arch/aarch64.py
+++ b/src/focaccia/arch/aarch64.py
@@ -7,37 +7,37 @@ from .arch import Arch, RegisterDescription as _Reg
 archname = 'aarch64'
 
 registers = [
-    _Reg(('R0',  0, 64), ('X0',  0, 64), ('W0',  0, 32)),
-    _Reg(('R1',  0, 64), ('X1',  0, 64), ('W1',  0, 32)),
-    _Reg(('R2',  0, 64), ('X2',  0, 64), ('W2',  0, 32)),
-    _Reg(('R3',  0, 64), ('X3',  0, 64), ('W3',  0, 32)),
-    _Reg(('R4',  0, 64), ('X4',  0, 64), ('W4',  0, 32)),
-    _Reg(('R5',  0, 64), ('X5',  0, 64), ('W5',  0, 32)),
-    _Reg(('R6',  0, 64), ('X6',  0, 64), ('W6',  0, 32)),
-    _Reg(('R7',  0, 64), ('X7',  0, 64), ('W7',  0, 32)),
-    _Reg(('R8',  0, 64), ('X8',  0, 64), ('W8',  0, 32)),
-    _Reg(('R9',  0, 64), ('X9',  0, 64), ('W9',  0, 32)),
-    _Reg(('R10', 0, 64), ('X10', 0, 64), ('W10', 0, 32)),
-    _Reg(('R11', 0, 64), ('X11', 0, 64), ('W11', 0, 32)),
-    _Reg(('R12', 0, 64), ('X12', 0, 64), ('W12', 0, 32)),
-    _Reg(('R13', 0, 64), ('X13', 0, 64), ('W13', 0, 32)),
-    _Reg(('R14', 0, 64), ('X14', 0, 64), ('W14', 0, 32)),
-    _Reg(('R15', 0, 64), ('X15', 0, 64), ('W15', 0, 32)),
-    _Reg(('R16', 0, 64), ('X16', 0, 64), ('W16', 0, 32)),
-    _Reg(('R17', 0, 64), ('X17', 0, 64), ('W17', 0, 32)),
-    _Reg(('R18', 0, 64), ('X18', 0, 64), ('W18', 0, 32)),
-    _Reg(('R19', 0, 64), ('X19', 0, 64), ('W19', 0, 32)),
-    _Reg(('R20', 0, 64), ('X20', 0, 64), ('W20', 0, 32)),
-    _Reg(('R21', 0, 64), ('X21', 0, 64), ('W21', 0, 32)),
-    _Reg(('R22', 0, 64), ('X22', 0, 64), ('W22', 0, 32)),
-    _Reg(('R23', 0, 64), ('X23', 0, 64), ('W23', 0, 32)),
-    _Reg(('R24', 0, 64), ('X24', 0, 64), ('W24', 0, 32)),
-    _Reg(('R25', 0, 64), ('X25', 0, 64), ('W25', 0, 32)),
-    _Reg(('R26', 0, 64), ('X26', 0, 64), ('W26', 0, 32)),
-    _Reg(('R27', 0, 64), ('X27', 0, 64), ('W27', 0, 32)),
-    _Reg(('R28', 0, 64), ('X28', 0, 64), ('W28', 0, 32)),
-    _Reg(('R29', 0, 64), ('X29', 0, 64), ('W29', 0, 32)),
-    _Reg(('R30', 0, 64), ('X30', 0, 64), ('W30', 0, 32), ('LR', 0, 64)),
+    _Reg(('X0',  0, 64), ('W0',  0, 32)),
+    _Reg(('X1',  0, 64), ('W1',  0, 32)),
+    _Reg(('X2',  0, 64), ('W2',  0, 32)),
+    _Reg(('X3',  0, 64), ('W3',  0, 32)),
+    _Reg(('X4',  0, 64), ('W4',  0, 32)),
+    _Reg(('X5',  0, 64), ('W5',  0, 32)),
+    _Reg(('X6',  0, 64), ('W6',  0, 32)),
+    _Reg(('X7',  0, 64), ('W7',  0, 32)),
+    _Reg(('X8',  0, 64), ('W8',  0, 32)),
+    _Reg(('X9',  0, 64), ('W9',  0, 32)),
+    _Reg(('X10', 0, 64), ('W10', 0, 32)),
+    _Reg(('X11', 0, 64), ('W11', 0, 32)),
+    _Reg(('X12', 0, 64), ('W12', 0, 32)),
+    _Reg(('X13', 0, 64), ('W13', 0, 32)),
+    _Reg(('X14', 0, 64), ('W14', 0, 32)),
+    _Reg(('X15', 0, 64), ('W15', 0, 32)),
+    _Reg(('X16', 0, 64), ('W16', 0, 32)),
+    _Reg(('X17', 0, 64), ('W17', 0, 32)),
+    _Reg(('X18', 0, 64), ('W18', 0, 32)),
+    _Reg(('X19', 0, 64), ('W19', 0, 32)),
+    _Reg(('X20', 0, 64), ('W20', 0, 32)),
+    _Reg(('X21', 0, 64), ('W21', 0, 32)),
+    _Reg(('X22', 0, 64), ('W22', 0, 32)),
+    _Reg(('X23', 0, 64), ('W23', 0, 32)),
+    _Reg(('X24', 0, 64), ('W24', 0, 32)),
+    _Reg(('X25', 0, 64), ('W25', 0, 32)),
+    _Reg(('X26', 0, 64), ('W26', 0, 32)),
+    _Reg(('X27', 0, 64), ('W27', 0, 32)),
+    _Reg(('X28', 0, 64), ('W28', 0, 32)),
+    _Reg(('X29', 0, 64), ('W29', 0, 32)),
+    _Reg(('X30', 0, 64), ('W30', 0, 32), ('LR', 0, 64)),
 
     _Reg(('RZR', 0, 64), ('XZR', 0, 64), ('WZR', 0, 32)),
     _Reg(('SP', 0, 64), ('RSP', 0, 64)),
@@ -162,6 +162,10 @@ def decompose_cpsr(cpsr: int) -> dict[str, int]:
 class ArchAArch64(Arch):
     def __init__(self, endianness: Arch.Endianness):
         super().__init__(archname, registers, 64, endianness)
+        self.ignored_regs = [
+            'N', 'Z', 'C', 'V', 'Q', 'SSBS', 'PAN', 'DIT', 'GE',
+            'E', 'A', 'I', 'F', 'M'
+        ]
 
     def to_regname(self, name: str) -> str | None:
         reg = super().to_regname(name)
@@ -175,4 +179,3 @@ class ArchAArch64(Arch):
             from . import aarch64_dczid as dczid
             return dczid.read
         return None
-
diff --git a/src/focaccia/arch/arch.py b/src/focaccia/arch/arch.py
index 8aac8b5..234b0d9 100644
--- a/src/focaccia/arch/arch.py
+++ b/src/focaccia/arch/arch.py
@@ -46,6 +46,7 @@ class Arch():
         self.archname = archname
         self.ptr_size = ptr_size
         self.endianness: Literal['little', 'big'] = endianness
+        self.ignored_regs: list[str] = []
 
         self._accessors = {}
         for desc in registers:
diff --git a/src/focaccia/arch/x86.py b/src/focaccia/arch/x86.py
index fefab37..809055e 100644
--- a/src/focaccia/arch/x86.py
+++ b/src/focaccia/arch/x86.py
@@ -186,6 +186,10 @@ def compose_rflags(rflags: dict[str, int]) -> int:
 class ArchX86(Arch):
     def __init__(self):
         super().__init__(archname, registers, 64)
+        self.ignored_regs = [
+            'EFLAGS', 'CF', 'PF', 'AF', 'ZF', 'SF', 'TF', 'IF', 'DF', 'OF',
+            'CS', 'DS', 'SS', 'ES', 'FS', 'GS',
+        ]
 
     def to_regname(self, name: str) -> str | None:
         """The X86 override of the standard register name lookup.
diff --git a/src/focaccia/snapshot.py b/src/focaccia/snapshot.py
index 1945d71..03a03cd 100644
--- a/src/focaccia/snapshot.py
+++ b/src/focaccia/snapshot.py
@@ -27,6 +27,13 @@ class SparseMemory:
         off = addr % self.page_size
         return addr - off, off
 
+    def drop_all(self):
+        self._pages.clear()
+
+    def test(self, addr: int) -> bool:
+        page_addr, off = self._to_page_addr_and_offset(addr)
+        return page_addr in self._pages
+
     def read(self, addr: int, size: int) -> bytes:
         """Read a number of bytes from memory.
         :param addr: The offset from where to read.
@@ -83,6 +90,7 @@ class ReadableProgramState:
     """Interface for read-only program states."""
     def __init__(self, arch: Arch):
         self.arch = arch
+        self.strict = True
 
     def read_register(self, reg: str) -> int:
         """Read a register's value.
@@ -113,6 +121,18 @@ class ProgramState(ReadableProgramState):
         self.regs: dict[str, int | None] = {reg: None for reg in arch.regnames}
         self.mem = SparseMemory()
 
+    def test_register(self, reg: str) -> bool:
+        """Test whether a register has a value assigned.
+
+        :raise RegisterAccessError: If `reg` is not a register name.
+        """
+        acc = self.arch.get_reg_accessor(reg)
+        if acc is None:
+            raise RegisterAccessError(reg, f'Not a register name: {reg}')
+
+        assert(acc.base_reg in self.regs)
+        return self.regs[acc.base_reg] is not None
+
     def read_register(self, reg: str) -> int:
         """Read a register's value.
 
diff --git a/src/focaccia/symbolic.py b/src/focaccia/symbolic.py
index 6a99b60..7b6098e 100644
--- a/src/focaccia/symbolic.py
+++ b/src/focaccia/symbolic.py
@@ -338,6 +338,8 @@ class SymbolicTransform:
         """
         res = {}
         for regname, expr in self.changed_regs.items():
+            if not conc_state.strict and regname.upper() in self.arch.ignored_regs:
+                continue
             res[regname] = eval_symbol(expr, conc_state)
         return res
 
diff --git a/src/focaccia/tools/_qemu_tool.py b/src/focaccia/tools/_qemu_tool.py
index 23c1488..706a9fe 100644
--- a/src/focaccia/tools/_qemu_tool.py
+++ b/src/focaccia/tools/_qemu_tool.py
@@ -291,7 +291,14 @@ def collect_conc_trace(gdb: GDBServerStateIterator, \
     return states, matched_transforms
 
 def main():
-    args = make_argparser().parse_args()
+    prog = make_argparser()
+    prog.add_argument('hostname',
+                      help='The hostname at which to find the GDB server.')
+    prog.add_argument('port',
+                      type=int,
+                      help='The port at which to find the GDB server.')
+
+    args = prog.parse_args()
 
     gdbserver_addr = 'localhost'
     gdbserver_port = args.port
diff --git a/src/focaccia/tools/validate_qemu.py b/src/focaccia/tools/validate_qemu.py
index 56e3beb..2b7e65c 100755
--- a/src/focaccia/tools/validate_qemu.py
+++ b/src/focaccia/tools/validate_qemu.py
@@ -40,11 +40,6 @@ In fact, this tool could be used to test any emulator that provides a
 GDB-server interface. The server must support reading registers, reading
 memory, and stepping forward by single instructions.
 """
-    prog.add_argument('hostname',
-                      help='The hostname at which to find the GDB server.')
-    prog.add_argument('port',
-                      type=int,
-                      help='The port at which to find the GDB server.')
     prog.add_argument('--symb-trace',
                       required=True,
                       help='A pre-computed symbolic transformation trace to' \
diff --git a/src/focaccia/tools/validation_server.py b/src/focaccia/tools/validation_server.py
new file mode 100755
index 0000000..b87048a
--- /dev/null
+++ b/src/focaccia/tools/validation_server.py
@@ -0,0 +1,419 @@
+#! /usr/bin/env python3
+
+from typing import Iterable
+
+import focaccia.parser as parser
+from focaccia.arch import supported_architectures, Arch
+from focaccia.compare import compare_symbolic
+from focaccia.snapshot import ProgramState, ReadableProgramState, \
+                              RegisterAccessError, MemoryAccessError
+from focaccia.symbolic import SymbolicTransform, eval_symbol, ExprMem
+from focaccia.trace import Trace, TraceEnvironment
+from focaccia.utils import print_result
+
+from validate_qemu import make_argparser, verbosity
+
+import socket
+import struct
+import os
+
+
+SOCK_PATH = '/tmp/focaccia.sock'
+
+def endian_fmt(endianness: str) -> str:
+    if endianness == 'little':
+        return '<'
+    else:
+        return '>'
+
+def mk_command(cmd: str, endianness: str, reg: str="", addr: int=0, size: int=0) -> bytes:
+    # char[16]:regname | long long:addr long long:size | long long:unused
+    # READ REG         | READ MEM                      | STEP ONE
+
+    if cmd == 'read register':
+        fmt = f'{endian_fmt(endianness)}16s9s'
+        return struct.pack(fmt,reg.encode('utf-8'),"READ REG".encode('utf-8'))
+    elif cmd == 'read memory':
+        fmt = f'{endian_fmt(endianness)}QQ9s'
+        return struct.pack(fmt, addr, size, "READ MEM".encode('utf-8'))
+    elif cmd == 'step':
+        fmt = f'{endian_fmt(endianness)}qq9s'
+        return struct.pack(fmt, 0, 0, "STEP ONE".encode('utf-8'))
+    else:
+        raise ValueError(f'Unknown command {cmd}')
+def unmk_memory(msg: bytes, endianness: str) -> tuple:
+    # packed!
+    # unsigned long long: addr
+    # unsigned long: length
+    fmt = f'{endian_fmt(endianness)}QQ'
+    addr, length = struct.unpack(fmt, msg)
+
+    return addr, length
+
+def unmk_register(msg: bytes, endianness: str) -> tuple:
+    # packed!
+    # char[108]:regname | unsigned long:bytes | char[64]:value
+    fmt = f'{endian_fmt(endianness)}108sQ64s'
+    reg_name, size, val = struct.unpack(fmt, msg)
+    reg_name = reg_name.decode('utf-8').rstrip('\x00')
+
+    if reg_name == "UNKNOWN":
+        raise RegisterAccessError(reg_name,
+                                  f'[QEMU Plugin] Unable to access register {reg_name}.')
+
+    val = val[:size]
+    val = int.from_bytes(val, endianness)
+    return val, size
+
+class PluginProgramState(ProgramState):
+    from focaccia.arch import aarch64, x86
+
+    flag_register_names = {
+        aarch64.archname: 'cpsr',
+        x86.archname: 'eflags',
+    }
+
+    flag_register_decompose = {
+        aarch64.archname: aarch64.decompose_cpsr,
+        x86.archname: x86.decompose_rflags,
+    }
+
+    def _flush_caches(self):
+        for r in self.regs.keys():
+            self.regs[r] = None
+        self.mem.drop_all()
+
+
+    def __init__(self, arch: Arch):
+        super().__init__(arch)
+        self.strict = False
+
+    def read_register(self, reg: str, no_cached: bool=False) -> int:
+        global CONN
+
+        if reg == 'RFLAGS':
+            reg = 'EFLAGS'
+
+        flags = self.flag_register_decompose[self.arch.archname](0).keys()
+        if reg in flags and self.arch.archname in self.flag_register_names:
+            reg_name = self.flag_register_names[self.arch.archname]
+        else:
+            reg_name = self.arch.to_regname(reg)
+
+        if reg_name is None:
+            raise RegisterAccessError(reg, f'Not a register name: {reg}')
+
+        reg_acc = self.arch.get_reg_accessor(reg_name)
+        if reg_acc is None:
+            raise RegisterAccessError(reg, f'Not a enclosing register name: {reg}')
+            exit(-1)
+        reg_name = reg_acc.base_reg.lower()
+
+        val = None
+        from_cache = False
+        if not no_cached and super().test_register(reg_name):
+            val = super().read_register(reg_name)
+            from_cache = True
+        else:
+            msg = mk_command("read register", self.arch.endianness, reg=reg_name)
+            CONN.send(msg)
+
+            try:
+                resp = CONN.recv(180)
+            except ConnectionResetError:
+                raise StopIteration
+
+            if len(resp) < 180:
+                print(f'Invalid response length: {len(resp)}')
+                print(f'Response: {resp}')
+                return 0
+
+            val, size = unmk_register(resp, self.arch.endianness)
+
+        # Try to access the flags register with `reg` as a logical flag name
+        if reg in flags and self.arch.archname in self.flag_register_names:
+            flags_reg = self.flag_register_names[self.arch.archname]
+            _flags = self.flag_register_decompose[self.arch.archname](val)
+            if reg in _flags:
+                if not from_cache:
+                    self.set_register(reg, _flags[reg])
+                return _flags[reg]
+            raise RegisterAccessError(f'Unable to access flag {reg}.')
+
+        if not from_cache:
+            self.set_register(reg, val)
+        return val & reg_acc.mask >> reg_acc.start
+
+    def read_memory(self, addr: int, size: int) -> bytes:
+        global CONN
+
+        if self.mem.test(addr):
+            return super().read_memory(addr, size)
+
+        # print(f'Reading memory at {addr:x}, size={size}')
+
+        msg = mk_command("read memory", self.arch.endianness, addr=addr, size=size)
+        CONN.send(msg)
+
+        try:
+            resp = CONN.recv(16)
+        except ConnectionResetError:
+            raise StopIteration
+        _addr, length = unmk_memory(resp, self.arch.endianness)
+
+        if _addr != addr or length == 0:
+            raise MemoryAccessError(
+                _addr, size,
+                f'Unable to access memory at address {addr:x}, size={size}.')
+            return b''
+
+        mem = b''
+        while len(mem) < length:
+            try:
+                resp = CONN.recv(length - len(mem))
+            except ConnectionResetError:
+                raise StopIteration
+            mem += resp
+
+        self.write_memory(addr, mem)
+        return mem
+
+    def step(self):
+        global CONN
+
+        self._flush_caches()
+        msg = mk_command("step", self.arch.endianness)
+        CONN.send(msg)
+
+
+        return
+
+class PluginStateIterator:
+
+    def __init__(self, sock_path: str, arch: Arch):
+        global SOCK
+        global CONN
+
+        self.sock_path = sock_path
+        self.arch = arch
+        self._first_next = True
+
+
+        # Start the server that waits for QEMU to connect
+        try:
+            os.unlink(self.sock_path)
+        except FileNotFoundError:
+            pass
+        # TODO: allow new connections when QEMU clones
+        SOCK = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+
+        print(f'Listening for QEMU Plugin connection at {self.sock_path}...')
+        SOCK.bind(self.sock_path)
+        SOCK.listen(1)
+
+        CONN, qemu_addr = SOCK.accept()
+
+        # Handshake with QEMU
+        pid_b = CONN.recv(4)
+        pid = struct.unpack('i', pid_b)[0]
+        print(f'Connected to QEMU instance with PID {pid}.')
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        # The first call to __next__ should yield the first program state,
+        # i.e. after stepping the first time
+        if self._first_next:
+            self._first_next = False
+            self.state = PluginProgramState(self.arch)
+            #self.state.step()
+            return self.state
+
+        # Step
+        pc = self.state.read_register('pc')
+        new_pc = pc
+        while pc == new_pc:  # Skip instruction chains from REP STOS etc.
+            self.state.step()
+            new_pc = self.state.read_register('pc', True)
+
+        return self.state
+
+def record_minimal_snapshot(prev_state: ProgramState,
+                            cur_state: PluginProgramState,
+                            prev_transform: SymbolicTransform,
+                            cur_transform: SymbolicTransform) \
+        -> ProgramState:
+    """Record a minimal snapshot.
+
+    A minimal snapshot must include values (registers and memory) that are
+    accessed by two transformations:
+      1. The values produced by the previous transformation (the
+         transformation that is producing this snapshot) to check these
+         values against expected values calculated from the previous
+         program state.
+      2. The values that act as inputs to the transformation acting on this
+         snapshot, to calculate the expected values of the next snapshot.
+
+    :param prev_transform: The symbolic transformation generating, or
+                           leading to, `cur_state`. Values generated by
+                           this transformation are included in the
+                           snapshot.
+    :param transform: The symbolic transformation operating on this
+                      snapshot. Input values to this transformation are
+                      included in the snapshot.
+    """
+    assert(cur_state.read_register('pc') == cur_transform.addr)
+    assert(prev_transform.arch == cur_transform.arch)
+
+    def get_written_addresses(t: SymbolicTransform) -> Iterable[ExprMem]:
+        """Get all output memory accesses of a symbolic transformation."""
+        return [ExprMem(a, v.size) for a, v in t.changed_mem.items()]
+
+    def set_values(regs: Iterable[str], mems: Iterable[ExprMem],
+                   cur_state: PluginProgramState,
+                   prev_state: PluginProgramState,
+                   out_state: ProgramState):
+        """
+        :param prev_state: Addresses of memory included in the snapshot are
+                           resolved relative to this state.
+        """
+        for regname in regs:
+            try:
+                regval = cur_state.read_register(regname)
+                out_state.set_register(regname, regval)
+            except RegisterAccessError:
+                pass
+        for mem in mems:
+            assert(mem.size % 8 == 0)
+            addr = eval_symbol(mem.ptr, prev_state)
+            try:
+                mem = cur_state.read_memory(addr, int(mem.size / 8))
+                out_state.write_memory(addr, mem)
+            except MemoryAccessError:
+                pass
+
+    state = ProgramState(cur_transform.arch)
+    state.set_register('pc', cur_transform.addr)
+
+    set_values(prev_transform.changed_regs.keys(),
+               get_written_addresses(prev_transform),
+               cur_state,
+               prev_state,
+               state)
+    set_values(cur_transform.get_used_registers(),
+               cur_transform.get_used_memory_addresses(),
+               cur_state,
+               cur_state,
+               state)
+    return state
+
+def collect_conc_trace(qemu: PluginStateIterator, \
+                       strace: list[SymbolicTransform]) \
+        -> tuple[list[ProgramState], list[SymbolicTransform]]:
+    """Collect a trace of concrete states from QEMU.
+
+    Records minimal concrete states from QEMU by using symbolic trace
+    information to determine which register/memory values are required to
+    verify the correctness of QEMU.
+
+    May drop symbolic transformations if the symbolic trace and the QEMU trace
+    diverge (e.g. because of differences in environment, etc.). Returns the
+    new, possibly modified, symbolic trace that matches the returned concrete
+    trace.
+
+    :return: A list of concrete states and a list of corresponding symbolic
+             transformations. The lists are guaranteed to have the same length.
+    """
+    def find_index(seq, target, access=lambda el: el):
+        for i, el in enumerate(seq):
+            if access(el) == target:
+                return i
+        return None
+
+    if not strace:
+        return [], []
+
+    states = []
+    matched_transforms = []
+
+    state_iter = iter(qemu)
+    cur_state = next(state_iter)
+    symb_i = 0
+
+    # An online trace matching algorithm.
+    while True:
+        try:
+            pc = cur_state.read_register('pc')
+
+            while pc != strace[symb_i].addr:
+                next_i = find_index(strace[symb_i+1:], pc, lambda t: t.addr)
+
+                # Drop the concrete state if no address in the symbolic trace
+                # matches
+                if next_i is None:
+                    print(f'Warning: Dropping concrete state {hex(pc)}, as no'
+                          f' matching instruction can be found in the symbolic'
+                          f' reference trace.')
+                    cur_state = next(state_iter)
+                    pc = cur_state.read_register('pc', True)
+                    continue
+
+                # Otherwise, jump to the next matching symbolic state
+                symb_i += next_i + 1
+
+            assert(cur_state.read_register('pc') == strace[symb_i].addr)
+            states.append(record_minimal_snapshot(
+                states[-1] if states else cur_state,
+                cur_state,
+                matched_transforms[-1] if matched_transforms else strace[symb_i],
+                strace[symb_i]))
+            matched_transforms.append(strace[symb_i])
+            cur_state = next(state_iter)
+            symb_i += 1
+        except StopIteration:
+            break
+
+    return states, matched_transforms
+
+
+def main():
+    prog = make_argparser()
+    prog.add_argument('--use-socket',
+                      required=True,
+                      type=str,
+                      help='Use QEMU Plugin interface at <socket> instead of GDB')
+    prog.add_argument('--guest_arch',
+                      required=True,
+                      type=str,
+                      help='Architecture of the guest being emulated. (Only required when using --use-socket)')
+
+    args = prog.parse_args()
+
+    # Read pre-computed symbolic trace
+    with open(args.symb_trace, 'r') as strace:
+        symb_transforms = parser.parse_transformations(strace)
+
+    arch = supported_architectures.get(args.guest_arch)
+    sock_path = args.use_socket
+
+    qemu = PluginStateIterator(sock_path, arch)
+
+    # Use symbolic trace to collect concrete trace from QEMU
+    conc_states, matched_transforms = collect_conc_trace(
+        qemu,
+        symb_transforms.states)
+
+    # Verify and print result
+    if not args.quiet:
+        res = compare_symbolic(conc_states, matched_transforms)
+        print_result(res, verbosity[args.error_level])
+
+    if args.output:
+        from focaccia.parser import serialize_snapshots
+        with open(args.output, 'w') as file:
+            serialize_snapshots(Trace(conc_states, env), file)
+
+if __name__ == "__main__":
+    main()
+