about summary refs log tree commit diff stats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/focaccia/tools/_qemu_tool.py314
-rwxr-xr-xsrc/focaccia/tools/validate_qemu.py111
2 files changed, 425 insertions, 0 deletions
diff --git a/src/focaccia/tools/_qemu_tool.py b/src/focaccia/tools/_qemu_tool.py
new file mode 100644
index 0000000..b365d39
--- /dev/null
+++ b/src/focaccia/tools/_qemu_tool.py
@@ -0,0 +1,314 @@
+"""Invocable like this:
+
+    gdb -n --batch -x qemu_tool.py
+
+But please use `tools/verify_qemu.py` instead because we have some more setup
+work to do.
+"""
+
+import gdb
+from typing import Iterable
+
+import focaccia.parser as parser
+from focaccia.arch import supported_architectures, Arch
+from focaccia.compare import compare_symbolic
+from focaccia.snapshot import ProgramState, ReadableProgramState, \
+                              RegisterAccessError, MemoryAccessError
+from focaccia.symbolic import SymbolicTransform, eval_symbol, ExprMem
+from focaccia.trace import Trace, TraceEnvironment
+from focaccia.utils import print_result
+
+from verify_qemu import make_argparser, verbosity
+
+class GDBProgramState(ReadableProgramState):
+    from focaccia.arch import aarch64, x86
+
+    flag_register_names = {
+        aarch64.archname: 'cpsr',
+        x86.archname: 'eflags',
+    }
+
+    flag_register_decompose = {
+        aarch64.archname: aarch64.decompose_cpsr,
+        x86.archname: x86.decompose_rflags,
+    }
+
+    def __init__(self, process: gdb.Inferior, frame: gdb.Frame, arch: Arch):
+        super().__init__(arch)
+        self._proc = process
+        self._frame = frame
+
+    @staticmethod
+    def _read_vector_reg_aarch64(val, size) -> int:
+        return int(str(val['u']), 10)
+
+    @staticmethod
+    def _read_vector_reg_x86(val, size) -> int:
+        num_longs = size // 64
+        vals = val[f'v{num_longs}_int64']
+        res = 0
+        for i in range(num_longs):
+            val = int(vals[i].cast(gdb.lookup_type('unsigned long')))
+            res += val << i * 64
+        return res
+
+    read_vector_reg = {
+        aarch64.archname: _read_vector_reg_aarch64,
+        x86.archname: _read_vector_reg_x86,
+    }
+
+    def read_register(self, reg: str) -> int:
+        if reg == 'RFLAGS':
+            reg = 'EFLAGS'
+
+        try:
+            val = self._frame.read_register(reg.lower())
+            size = val.type.sizeof * 8
+
+            # For vector registers, we need to apply architecture-specific
+            # logic because GDB's interface is not consistent.
+            if size >= 128:  # Value is a vector
+                if self.arch.archname not in self.read_vector_reg:
+                    raise NotImplementedError(
+                        f'Reading vector registers is not implemented for'
+                        f' architecture {self.arch.archname}.')
+                return self.read_vector_reg[self.arch.archname](val, size)
+            elif size < 64:
+                return int(val.cast(gdb.lookup_type('unsigned int')))
+            # For non-vector values, just return the 64-bit value
+            return int(val.cast(gdb.lookup_type('unsigned long')))
+        except ValueError as err:
+            # Try to access the flags register with `reg` as a logical flag name
+            if self.arch.archname in self.flag_register_names:
+                flags_reg = self.flag_register_names[self.arch.archname]
+                flags = int(self._frame.read_register(flags_reg))
+                flags = self.flag_register_decompose[self.arch.archname](flags)
+                if reg in flags:
+                    return flags[reg]
+            raise RegisterAccessError(reg,
+                                      f'[GDB] Unable to access {reg}: {err}')
+
+    def read_memory(self, addr: int, size: int) -> bytes:
+        try:
+            mem = self._proc.read_memory(addr, size).tobytes()
+            if self.arch.endianness == 'little':
+                return mem
+            else:
+                return bytes(reversed(mem))  # Convert to big endian
+        except gdb.MemoryError as err:
+            raise MemoryAccessError(addr, size, str(err))
+
+class GDBServerStateIterator:
+    def __init__(self, address: str, port: int):
+        gdb.execute('set pagination 0')
+        gdb.execute('set sysroot')
+        gdb.execute(f'target remote {address}:{port}')
+        self._process = gdb.selected_inferior()
+        self._first_next = True
+
+        # Try to determine the guest architecture. This is a bit hacky and
+        # tailored to GDB's naming for the x86-64 architecture.
+        split = self._process.architecture().name().split(':')
+        archname = split[1] if len(split) > 1 else split[0]
+        archname = archname.replace('-', '_')
+        if archname not in supported_architectures:
+            print(f'Error: Current platform ({archname}) is not'
+                  f' supported by Focaccia. Exiting.')
+            exit(1)
+
+        self.arch = supported_architectures[archname]
+        self.binary = self._process.progspace.filename
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        # The first call to __next__ should yield the first program state,
+        # i.e. before stepping the first time
+        if self._first_next:
+            self._first_next = False
+            return GDBProgramState(self._process, gdb.selected_frame(), self.arch)
+
+        # Step
+        pc = gdb.selected_frame().read_register('pc')
+        new_pc = pc
+        while pc == new_pc:  # Skip instruction chains from REP STOS etc.
+            gdb.execute('si', to_string=True)
+            if not self._process.is_valid() or len(self._process.threads()) == 0:
+                raise StopIteration
+            new_pc = gdb.selected_frame().read_register('pc')
+
+        return GDBProgramState(self._process, gdb.selected_frame(), self.arch)
+
+def record_minimal_snapshot(prev_state: ReadableProgramState,
+                            cur_state: ReadableProgramState,
+                            prev_transform: SymbolicTransform,
+                            cur_transform: SymbolicTransform) \
+        -> ProgramState:
+    """Record a minimal snapshot.
+
+    A minimal snapshot must include values (registers and memory) that are
+    accessed by two transformations:
+      1. The values produced by the previous transformation (the
+         transformation that is producing this snapshot) to check these
+         values against expected values calculated from the previous
+         program state.
+      2. The values that act as inputs to the transformation acting on this
+         snapshot, to calculate the expected values of the next snapshot.
+
+    :param prev_transform: The symbolic transformation generating, or
+                           leading to, `cur_state`. Values generated by
+                           this transformation are included in the
+                           snapshot.
+    :param transform: The symbolic transformation operating on this
+                      snapshot. Input values to this transformation are
+                      included in the snapshot.
+    """
+    assert(cur_state.read_register('pc') == cur_transform.addr)
+    assert(prev_transform.arch == cur_transform.arch)
+
+    def get_written_addresses(t: SymbolicTransform):
+        """Get all output memory accesses of a symbolic transformation."""
+        return [ExprMem(a, v.size) for a, v in t.changed_mem.items()]
+
+    def set_values(regs: Iterable[str], mems: Iterable[ExprMem],
+                   cur_state: ReadableProgramState,
+                   prev_state: ReadableProgramState,
+                   out_state: ProgramState):
+        """
+        :param prev_state: Addresses of memory included in the snapshot are
+                           resolved relative to this state.
+        """
+        for regname in regs:
+            try:
+                regval = cur_state.read_register(regname)
+                out_state.set_register(regname, regval)
+            except RegisterAccessError:
+                pass
+        for mem in mems:
+            assert(mem.size % 8 == 0)
+            addr = eval_symbol(mem.ptr, prev_state)
+            try:
+                mem = cur_state.read_memory(addr, int(mem.size / 8))
+                out_state.write_memory(addr, mem)
+            except MemoryAccessError:
+                pass
+
+    state = ProgramState(cur_transform.arch)
+    state.set_register('PC', cur_transform.addr)
+
+    set_values(prev_transform.changed_regs.keys(),
+               get_written_addresses(prev_transform),
+               cur_state,
+               prev_state,  # Evaluate memory addresses based on previous
+                            # state because they are that state's output
+                            # addresses.
+               state)
+    set_values(cur_transform.get_used_registers(),
+               cur_transform.get_used_memory_addresses(),
+               cur_state,
+               cur_state,
+               state)
+    return state
+
+def collect_conc_trace(gdb: GDBServerStateIterator, \
+                       strace: list[SymbolicTransform]) \
+        -> tuple[list[ProgramState], list[SymbolicTransform]]:
+    """Collect a trace of concrete states from GDB.
+
+    Records minimal concrete states from GDB by using symbolic trace
+    information to determine which register/memory values are required to
+    verify the correctness of the program running in GDB.
+
+    May drop symbolic transformations if the symbolic trace and the GDB trace
+    diverge (e.g. because of differences in environment, etc.). Returns the
+    new, possibly modified, symbolic trace that matches the returned concrete
+    trace.
+
+    :return: A list of concrete states and a list of corresponding symbolic
+             transformations. The lists are guaranteed to have the same length.
+    """
+    def find_index(seq, target, access=lambda el: el):
+        for i, el in enumerate(seq):
+            if access(el) == target:
+                return i
+        return None
+
+    if not strace:
+        return [], []
+
+    states = []
+    matched_transforms = []
+
+    state_iter = iter(gdb)
+    cur_state = next(state_iter)
+    symb_i = 0
+
+    # An online trace matching algorithm.
+    while True:
+        try:
+            pc = cur_state.read_register('pc')
+
+            while pc != strace[symb_i].addr:
+                next_i = find_index(strace[symb_i+1:], pc, lambda t: t.addr)
+
+                # Drop the concrete state if no address in the symbolic trace
+                # matches
+                if next_i is None:
+                    print(f'Warning: Dropping concrete state {hex(pc)}, as no'
+                          f' matching instruction can be found in the symbolic'
+                          f' reference trace.')
+                    cur_state = next(state_iter)
+                    pc = cur_state.read_register('pc')
+                    continue
+
+                # Otherwise, jump to the next matching symbolic state
+                symb_i += next_i + 1
+
+            assert(cur_state.read_register('pc') == strace[symb_i].addr)
+            states.append(record_minimal_snapshot(
+                states[-1] if states else cur_state,
+                cur_state,
+                matched_transforms[-1] if matched_transforms else strace[symb_i],
+                strace[symb_i]))
+            matched_transforms.append(strace[symb_i])
+            cur_state = next(state_iter)
+            symb_i += 1
+        except StopIteration:
+            break
+
+    return states, matched_transforms
+
+def main():
+    args = make_argparser().parse_args()
+
+    gdbserver_addr = 'localhost'
+    gdbserver_port = args.port
+    gdb_server = GDBServerStateIterator(gdbserver_addr, gdbserver_port)
+
+    executable = gdb_server.binary
+    argv = []  # QEMU's GDB stub does not support 'info proc cmdline'
+    envp = []  # Can't get the remote target's environment
+    env = TraceEnvironment(executable, argv, envp, '?')
+
+    # Read pre-computed symbolic trace
+    with open(args.symb_trace, 'r') as strace:
+        symb_transforms = parser.parse_transformations(strace)
+
+    # Use symbolic trace to collect concrete trace from QEMU
+    conc_states, matched_transforms = collect_conc_trace(
+        gdb_server,
+        symb_transforms.states)
+
+    # Verify and print result
+    if not args.quiet:
+        res = compare_symbolic(conc_states, matched_transforms)
+        print_result(res, verbosity[args.error_level])
+
+    if args.output:
+        from focaccia.parser import serialize_snapshots
+        with open(args.output, 'w') as file:
+            serialize_snapshots(Trace(conc_states, env), file)
+
+if __name__ == "__main__":
+    main()
diff --git a/src/focaccia/tools/validate_qemu.py b/src/focaccia/tools/validate_qemu.py
new file mode 100755
index 0000000..f5d0bb2
--- /dev/null
+++ b/src/focaccia/tools/validate_qemu.py
@@ -0,0 +1,111 @@
+#!/usr/bin/env python3
+
+"""
+Spawn GDB, connect to QEMU's GDB server, and read test states from that.
+
+We need two scripts (this one and the primary `qemu_tool.py`) because we can't
+pass arguments to scripts executed via `gdb -x <script>`.
+
+This script (`verify_qemu.py`) is the one the user interfaces with. It
+eventually calls `execv` to spawn a GDB process that calls the main
+`qemu_tool.py` script; `python verify_qemu.py` essentially behaves as if
+something like `gdb --batch -x qemu_tool.py` were executed instead. Before it
+starts GDB, though, it parses command line arguments and applies some weird but
+necessary logic to pass them to `qemu_tool.py`.
+"""
+
+import argparse
+import os
+import subprocess
+import sys
+
+import focaccia
+from focaccia.compare import ErrorTypes
+
+verbosity = {
+    'info':    ErrorTypes.INFO,
+    'warning': ErrorTypes.POSSIBLE,
+    'error':   ErrorTypes.CONFIRMED,
+}
+
+def make_argparser():
+    """This is also used by the GDB-invoked script to parse its args."""
+    prog = argparse.ArgumentParser()
+    prog.description = """Use Focaccia to test QEMU.
+
+Uses QEMU's GDB-server feature to read QEMU's emulated state and test its
+transformation during emulation against a symbolic truth.
+
+In fact, this tool could be used to test any emulator that provides a
+GDB-server interface. The server must support reading registers, reading
+memory, and stepping forward by single instructions.
+"""
+    prog.add_argument('hostname',
+                      help='The hostname at which to find the GDB server.')
+    prog.add_argument('port',
+                      type=int,
+                      help='The port at which to find the GDB server.')
+    prog.add_argument('--symb-trace',
+                      required=True,
+                      help='A pre-computed symbolic transformation trace to' \
+                           ' be used for verification. Generate this with' \
+                           ' the `tools/capture_transforms.py` tool.')
+    prog.add_argument('-q', '--quiet',
+                      default=False,
+                      action='store_true',
+                      help='Don\'t print a verification result.')
+    prog.add_argument('-o', '--output',
+                      help='If specified with a file name, the recorded'
+                           ' emulator states will be written to that file.')
+    prog.add_argument('--error-level',
+                      default='warning',
+                      choices=list(verbosity.keys()))
+    return prog
+
+def quoted(s: str) -> str:
+    return f'"{s}"'
+
+def try_remove(l: list, v):
+    try:
+        l.remove(v)
+    except ValueError:
+        pass
+
+def main():
+    prog = make_argparser()
+    prog.add_argument('--gdb', default='gdb',
+                      help='GDB binary to invoke.')
+    args = prog.parse_args()
+
+    filepath = focaccia.__file__
+    qemu_tool_path = os.path.join(os.path.dirname(filepath), '_qemu_tool.py')
+
+    # We have to remove all arguments we don't want to pass to the qemu tool
+    # manually here. Not nice, but what can you do..
+    argv = sys.argv
+    try_remove(argv, '--gdb')
+    try_remove(argv, args.gdb)
+
+    # Assemble the argv array passed to the qemu tool. GDB does not have a
+    # mechanism to pass arguments to a script that it executes, so we
+    # overwrite `sys.argv` manually before invoking the script.
+    argv_str = f'[{", ".join(quoted(a) for a in argv)}]'
+    path_str = f'[{", ".join(quoted(s) for s in sys.path)}]'
+
+    gdb_cmd = [
+        args.gdb,
+        '-nx',  # Don't parse any .gdbinits
+        '--batch',
+        '-ex', f'py import sys',
+        '-ex', f'py sys.argv = {argv_str}',
+        '-ex', f'py sys.path = {path_str}',
+        '-x', qemu_tool_path
+    ]
+    proc = subprocess.Popen(gdb_cmd)
+
+    ret = proc.wait()
+    exit(ret)
+
+if __name__ == "__main__":
+    main()
+