about summary refs log tree commit diff stats
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--tools/_qemu_tool.py251
-rw-r--r--tools/qemu_tool.py125
-rw-r--r--tools/verify_qemu.py47
3 files changed, 288 insertions, 135 deletions
diff --git a/tools/_qemu_tool.py b/tools/_qemu_tool.py
new file mode 100644
index 0000000..ff7dc38
--- /dev/null
+++ b/tools/_qemu_tool.py
@@ -0,0 +1,251 @@
+"""Invocable like this:
+
+    gdb -n --batch -x qemu_tool.py
+
+But please use `tools/verify_qemu.py` instead because we have some more setup
+work to do.
+"""
+
+import gdb
+import platform
+from typing import Iterable
+
+import focaccia.parser as parser
+from focaccia.arch import supported_architectures, Arch
+from focaccia.compare import compare_symbolic
+from focaccia.snapshot import ProgramState, ReadableProgramState, \
+                              RegisterAccessError, MemoryAccessError
+from focaccia.symbolic import SymbolicTransform, eval_symbol, ExprMem
+from focaccia.utils import print_result
+
+from verify_qemu import make_argparser, verbosity
+
+class GDBProgramState(ReadableProgramState):
+    def __init__(self, process: gdb.Inferior, frame: gdb.Frame):
+        self._proc = process
+        self._frame = frame
+
+    def read_register(self, reg: str) -> int:
+        try:
+            val = self._frame.read_register(reg.lower())
+            return int(val) & 0xffffffffffffffff  # force int to be unsigned
+        except ValueError as err:
+            from focaccia.arch import x86
+            rflags = int(self._frame.read_register('eflags'))
+            rflags = x86.decompose_rflags(rflags)
+            if reg in rflags:
+                return rflags[reg]
+            raise RegisterAccessError(reg, str(err))
+
+    def read_memory(self, addr: int, size: int) -> bytes:
+        try:
+            mem = self._proc.read_memory(addr, size).tobytes()
+            return bytes(reversed(mem))  # Convert to big endian
+        except gdb.MemoryError as err:
+            raise MemoryAccessError(addr, size, str(err))
+
+class GDBServerStateIterator:
+    def __init__(self, address: str, port: int):
+        gdb.execute('set pagination 0')
+        gdb.execute('set sysroot')
+        gdb.execute(f'target remote {address}:{port}')
+        self._process = gdb.selected_inferior()
+        self._first_next = True
+
+        # Try to determine the guest architecture. This is a bit hacky and
+        # tailored to GDB's naming for the x86-64 architecture.
+        split = self._process.architecture().name().split(':')
+        archname = split[1] if len(split) > 1 else split[0]
+        archname = archname.replace('-', '_')
+        if archname not in supported_architectures:
+            print(f'Error: Current platform ({archname}) is not'
+                  f' supported by Focaccia. Exiting.')
+            exit(1)
+
+        self.arch = supported_architectures[archname]
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        # The first call to __next__ should yield the first program state,
+        # i.e. before stepping the first time
+        if self._first_next:
+            self._first_next = False
+            return GDBProgramState(self._process, gdb.selected_frame())
+
+        # Step
+        pc = gdb.selected_frame().read_register('pc')
+        new_pc = pc
+        while pc == new_pc:  # Skip instruction chains from REP STOS etc.
+            gdb.execute('si', to_string=True)
+            if not self._process.is_valid() or len(self._process.threads()) == 0:
+                raise StopIteration
+            new_pc = gdb.selected_frame().read_register('pc')
+
+        return GDBProgramState(self._process, gdb.selected_frame())
+
+def collect_conc_trace(gdb: GDBServerStateIterator, \
+                       strace: list[SymbolicTransform]) \
+        -> tuple[list[ProgramState], list[SymbolicTransform]]:
+    """Collect a trace of concrete states from GDB.
+
+    Records minimal concrete states from GDB by using symbolic trace
+    information to determine which register/memory values are required to
+    verify the correctness of the program running in GDB.
+
+    May drop symbolic transformations if the symbolic trace and the GDB trace
+    diverge (e.g. because of differences in environment, etc.). Returns the
+    new, possibly modified, symbolic trace that matches the returned concrete
+    trace.
+
+    :return: A list of concrete states and a list of corresponding symbolic
+             transformations. The lists are guaranteed to have the same length.
+    """
+    def record_snapshot(prev_state: ReadableProgramState,
+                        cur_state: GDBProgramState,
+                        prev_transform: SymbolicTransform,
+                        cur_transform: SymbolicTransform) \
+            -> ProgramState:
+        """Record a minimal snapshot.
+
+        A minimal snapshot must include values (registers and memory) that are
+        accessed by two transformations:
+          1. The values produced by the previous transformation (the
+             transformation that is producing this snapshot) to check these
+             values against expected values calculated from the previous
+             program state.
+          2. The values that act as inputs to the transformation acting on this
+             snapshot, to calculate the expected values of the next snapshot.
+
+        :param prev_transform: The symbolic transformation generating, or
+                               leading to, `gdb_state`. Values generated by
+                               this transformation are included in the
+                               snapshot.
+        :param transform: The symbolic transformation operating on this
+                          snapshot. Input values to this transformation are
+                          included in the snapshot.
+        """
+        assert(cur_state.read_register('pc') == cur_transform.addr)
+
+        def get_written_addresses(t: SymbolicTransform):
+            """Get all output memory accesses of a symbolic transformation."""
+            return [ExprMem(a, v.size) for a, v in t.changed_mem.items()]
+
+        def set_values(regs: Iterable[str], mems: Iterable[ExprMem],
+                       cur_state: GDBProgramState, prev_state: ReadableProgramState,
+                       out_state: ProgramState):
+            """
+            :param prev_state: Addresses of memory included in the snapshot are
+                               resolved relative to this state.
+            """
+            for regname in regs:
+                regval = cur_state.read_register(regname)
+                try:
+                    out_state.set_register(regname, regval)
+                except RegisterAccessError:
+                    pass
+            for mem in mems:
+                assert(mem.size % 8 == 0)
+                addr = eval_symbol(mem.ptr, prev_state)
+                try:
+                    mem = cur_state.read_memory(addr, int(mem.size / 8))
+                    out_state.write_memory(addr, mem)
+                except MemoryAccessError:
+                    pass
+
+        state = ProgramState(gdb.arch)
+        state.set_register('PC', cur_transform.addr)
+
+        set_values(prev_transform.changed_regs.keys(),
+                   get_written_addresses(prev_transform),
+                   cur_state,
+                   prev_state,  # Evaluate memory addresses based on previous
+                                # state because they are that state's output
+                                # addresses.
+                   state)
+        set_values(cur_transform.get_used_registers(),
+                   cur_transform.get_used_memory_addresses(),
+                   cur_state,
+                   cur_state,
+                   state)
+        return state
+
+    def find_index(seq, target, access=lambda el: el):
+        for i, el in enumerate(seq):
+            if access(el) == target:
+                return i
+        return None
+
+    if not strace:
+        return [], []
+
+    states = []
+    matched_transforms = []
+
+    state_iter = iter(gdb)
+    cur_state = next(state_iter)
+    symb_i = 0
+    while True:
+        try:
+            pc = cur_state.read_register('pc')
+            assert(pc is not None)
+
+            while pc != strace[symb_i].addr:
+                next_i = find_index(strace[symb_i+1:], pc, lambda t: t.addr)
+
+                # Drop the concrete state if no address in the symbolic trace
+                # matches
+                if next_i is None:
+                    print(f'Warning: Dropping concrete state {hex(pc)}, as no'
+                          f' matching instruction can be found in the symbolic'
+                          f' reference trace.')
+                    cur_state = next(state_iter)
+                    pc = cur_state.read_register('pc')
+                    assert(pc is not None)
+                    continue
+
+                # Otherwise, jump to the next matching symbolic state
+                symb_i += next_i + 1
+
+            assert(cur_state.read_register('pc') == strace[symb_i].addr)
+            states.append(record_snapshot(
+                states[-1] if states else cur_state,
+                cur_state,
+                matched_transforms[-1] if matched_transforms else strace[symb_i],
+                strace[symb_i]))
+            matched_transforms.append(strace[symb_i])
+            cur_state = next(state_iter)
+            symb_i += 1
+        except StopIteration:
+            break
+
+    return states, matched_transforms
+
+def main():
+    args = make_argparser().parse_args()
+
+    gdbserver_addr = 'localhost'
+    gdbserver_port = args.port
+
+    # Read pre-computed symbolic trace
+    with open(args.symb_trace, 'r') as strace:
+        symb_transforms = parser.parse_transformations(strace)
+
+    # Use symbolic trace to collect concrete trace from QEMU
+    conc_states, matched_transforms = collect_conc_trace(
+        GDBServerStateIterator(gdbserver_addr, gdbserver_port),
+        symb_transforms)
+
+    # Verify and print result
+    if not args.quiet:
+        res = compare_symbolic(conc_states, matched_transforms)
+        print_result(res, verbosity[args.error_level])
+
+    if args.output:
+        from focaccia.parser import serialize_snapshots
+        with open(args.output, 'w') as file:
+            serialize_snapshots(conc_states, file)
+
+if __name__ == "__main__":
+    main()
diff --git a/tools/qemu_tool.py b/tools/qemu_tool.py
deleted file mode 100644
index d670692..0000000
--- a/tools/qemu_tool.py
+++ /dev/null
@@ -1,125 +0,0 @@
-"""Invocable like this:
-
-    gdb -n --batch -x qemu_tool.py
-"""
-
-import gdb
-import platform
-
-import focaccia.parser as parser
-from focaccia.arch import supported_architectures, Arch
-from focaccia.compare import compare_symbolic, ErrorTypes
-from focaccia.snapshot import ProgramState
-from focaccia.symbolic import SymbolicTransform, eval_symbol
-from focaccia.utils import print_result
-
-from verify_qemu import make_argparser
-
-class GDBProgramState:
-    def __init__(self, process: gdb.Inferior, frame: gdb.Frame):
-        self._proc = process
-        self._frame = frame
-
-    def read_register(self, regname: str) -> int | None:
-        try:
-            return int(self._frame.read_register(regname.lower()))
-        except ValueError as err:
-            from focaccia.arch import x86
-            rflags = int(self._frame.read_register('eflags'))
-            rflags = x86.decompose_rflags(rflags)
-            if regname in rflags:
-                return rflags[regname]
-
-            print(f'{regname}: {err}')
-            return None
-
-    def read_memory(self, addr: int, size: int) -> bytes | None:
-        try:
-            return self._proc.read_memory(addr, size).tobytes()
-        except gdb.MemoryError as err:
-            print(f'@{size}[{hex(addr)}]: {err}')
-            return None
-
-class GDBServerStateIterator:
-    def __init__(self, address: str, port: int):
-        gdb.execute('set pagination 0')
-        gdb.execute('set sysroot')
-        gdb.execute(f'target remote {address}:{port}')
-        self._process = gdb.selected_inferior()
-        self._first_next = True
-
-    def __iter__(self):
-        return self
-
-    def __next__(self):
-        # The first call to __next__ should yield the first program state,
-        # i.e. before stepping the first time
-        if self._first_next:
-            self._first_next = False
-            return GDBProgramState(self._process, gdb.selected_frame())
-
-        # Step
-        pc = gdb.selected_frame().read_register('pc')
-        new_pc = pc
-        while pc == new_pc:
-            gdb.execute('si', to_string=True)
-            if not self._process.is_valid() or len(self._process.threads()) == 0:
-                raise StopIteration
-            new_pc = gdb.selected_frame().read_register('pc')
-
-        return GDBProgramState(self._process, gdb.selected_frame())
-
-def collect_conc_trace(arch: Arch, \
-                       gdb: GDBServerStateIterator, \
-                       strace: list[SymbolicTransform]) \
-        -> list[ProgramState]:
-    states = []
-    for qemu, transform in zip(gdb, strace):
-        qemu_pc = qemu.read_register('pc')
-        assert(qemu_pc is not None)
-
-        if qemu_pc != transform.addr:
-            print(f'Fatal error: QEMU\'s program counter'
-                  f' ({hex(qemu_pc)}) does not match the'
-                  f' expected program counter in the symbolic trace'
-                  f' ({hex(transform.addr)}).')
-            print(f'Processing only partial trace up to this instruction.')
-            return states
-
-        state = ProgramState(arch)
-        state.set_register('PC', transform.addr)
-
-        accessed_regs = transform.get_used_registers()
-        accessed_mems = transform.get_used_memory_addresses()
-        for regname in accessed_regs:
-            regval = qemu.read_register(regname)
-            if regval is not None:
-                state.set_register(regname, regval)
-        for mem in accessed_mems:
-            assert(mem.size % 8 == 0)
-            addr = eval_symbol(mem.ptr, qemu)
-            mem = qemu.read_memory(addr, int(mem.size / 8))
-            if mem is not None:
-                state.write_memory(addr, mem)
-        states.append(state)
-
-    return states
-
-def main():
-    args = make_argparser().parse_args()
-
-    gdbserver_port = args.gdbserver_port
-    with open(args.symb_trace, 'r') as strace:
-        symb_transforms = parser.parse_transformations(strace)
-
-    arch = supported_architectures[platform.machine()]
-    conc_states = collect_conc_trace(
-        arch,
-        GDBServerStateIterator('localhost', gdbserver_port),
-        symb_transforms)
-
-    res = compare_symbolic(conc_states, symb_transforms)
-    print_result(res, ErrorTypes.POSSIBLE)
-
-if __name__ == "__main__":
-    main()
diff --git a/tools/verify_qemu.py b/tools/verify_qemu.py
index 98437cb..da2e985 100644
--- a/tools/verify_qemu.py
+++ b/tools/verify_qemu.py
@@ -17,15 +17,46 @@ import os
 import subprocess
 import sys
 
+from focaccia.compare import ErrorTypes
+
+verbosity = {
+    'info':    ErrorTypes.INFO,
+    'warning': ErrorTypes.POSSIBLE,
+    'error':   ErrorTypes.CONFIRMED,
+}
+
 def make_argparser():
     """This is also used by the GDB-invoked script to parse its args."""
     prog = argparse.ArgumentParser()
+    prog.description = """Use Focaccia to test QEMU.
+
+Uses QEMU's GDB-server feature to read QEMU's emulated state and test its
+transformation during emulation against a symbolic truth.
+
+In fact, this tool could be used to test any emulator that provides a
+GDB-server interface. The server must support reading registers, reading
+memory, and stepping forward by single instructions.
+
+The GDB server is assumed to be at 'localhost'.
+"""
+    prog.add_argument('port',
+                      type=int,
+                      help='The port at which QEMU\'s GDB server resides.')
     prog.add_argument('--symb-trace',
                       required=True,
-                      help='A symbolic transformation trace to be used for' \
-                           ' verification.')
-    prog.add_argument('--output', '-o', help='Name of output file.')
-    prog.add_argument('gdbserver_port', type=int)
+                      help='A pre-computed symbolic transformation trace to' \
+                           ' be used for verification. Generate this with' \
+                           ' the `tools/capture_transforms.py` tool.')
+    prog.add_argument('-q', '--quiet',
+                      default=False,
+                      action='store_true',
+                      help='Don\'t print a verification result.')
+    prog.add_argument('-o', '--output',
+                      help='If specified with a file name, the recorded trace'
+                           ' of QEMU states will be written to that file.')
+    prog.add_argument('--error-level',
+                      default='warning',
+                      choices=list(verbosity.keys()))
     return prog
 
 def quoted(s: str) -> str:
@@ -41,20 +72,16 @@ if __name__ == "__main__":
     prog = make_argparser()
     prog.add_argument('--gdb', default='/bin/gdb',
                       help='GDB binary to invoke')
-    prog.add_argument('--quiet', '-q', action='store_true',
-                      help='Suppress all output')
     args = prog.parse_args()
 
     filepath = os.path.realpath(__file__)
-    qemu_tool_path = os.path.join(os.path.dirname(filepath), 'qemu_tool.py')
+    qemu_tool_path = os.path.join(os.path.dirname(filepath), '_qemu_tool.py')
 
     # We have to remove all arguments we don't want to pass to the qemu tool
     # manually here. Not nice, but what can you do..
     argv = sys.argv
     try_remove(argv, '--gdb')
     try_remove(argv, args.gdb)
-    try_remove(argv, '--quiet')
-    try_remove(argv, '-q')
 
     # Assemble the argv array passed to the qemu tool. GDB does not have a
     # mechanism to pass arguments to a script that it executes, so we
@@ -65,7 +92,7 @@ if __name__ == "__main__":
     gdb_cmd = [
         args.gdb,
         '-nx',  # Don't parse any .gdbinits
-        '--batch-silent' if args.quiet else '--batch',
+        '--batch',
         '-ex', f'py import sys',
         '-ex', f'py sys.argv = {argv_str}',
         '-ex', f'py sys.path = {path_str}',