about summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--focaccia/compare.py33
-rw-r--r--focaccia/lldb_target.py16
-rw-r--r--focaccia/match.py2
-rw-r--r--focaccia/miasm_util.py1
-rw-r--r--focaccia/parser.py49
-rw-r--r--focaccia/symbolic.py211
-rw-r--r--focaccia/trace.py74
-rw-r--r--focaccia/utils.py60
-rw-r--r--tools/_qemu_tool.py158
-rwxr-xr-xtools/capture_transforms.py7
10 files changed, 413 insertions, 198 deletions
diff --git a/focaccia/compare.py b/focaccia/compare.py
index 65c0f49..13f965c 100644
--- a/focaccia/compare.py
+++ b/focaccia/compare.py
@@ -1,36 +1,9 @@
-from functools import total_ordering
-from typing import Iterable, Self
+from __future__ import annotations
+from typing import Iterable
 
 from .snapshot import ProgramState, MemoryAccessError, RegisterAccessError
 from .symbolic import SymbolicTransform
-
-@total_ordering
-class ErrorSeverity:
-    def __init__(self, num: int, name: str):
-        """Construct an error severity.
-
-        :param num:  A numerical value that orders the severity with respect
-                     to other `ErrorSeverity` objects. Smaller values are less
-                     severe.
-        :param name: A descriptive name for the error severity, e.g. 'fatal'
-                     or 'info'.
-        """
-        self._numeral = num
-        self.name = name
-
-    def __repr__(self) -> str:
-        return f'[{self.name}]'
-
-    def __eq__(self, other: object) -> bool:
-        if not isinstance(other, Self):
-            return False
-        return self._numeral == other._numeral
-
-    def __lt__(self, other: Self) -> bool:
-        return self._numeral < other._numeral
-
-    def __hash__(self) -> int:
-        return hash(self._numeral)
+from .utils import ErrorSeverity
 
 class ErrorTypes:
     INFO       = ErrorSeverity(0, 'INFO')
diff --git a/focaccia/lldb_target.py b/focaccia/lldb_target.py
index b51ec3d..a4e96a8 100644
--- a/focaccia/lldb_target.py
+++ b/focaccia/lldb_target.py
@@ -1,3 +1,5 @@
+import os
+
 import lldb
 
 from .arch import supported_architectures, x86
@@ -30,11 +32,21 @@ class ConcreteSectionError(Exception):
     pass
 
 class LLDBConcreteTarget:
-    def __init__(self, executable: str, argv: list[str] = []):
+    def __init__(self,
+                 executable: str,
+                 argv: list[str] = [],
+                 envp: list[str] | None = None):
         """Construct an LLDB concrete target. Stop at entry.
 
+        :param argv: List of arguements. Does NOT include the conventional
+                     executable name as the first entry.
+        :param envp: List of environment entries. Defaults to current
+                     `os.environ` if `None`.
         :raises RuntimeError: If the process is unable to launch.
         """
+        if envp is None:
+            envp = [f'{k}={v}' for k, v in os.environ.items()]
+
         self.debugger = lldb.SBDebugger.Create()
         self.debugger.SetAsync(False)
         self.target = self.debugger.CreateTargetWithFileAndArch(executable,
@@ -46,7 +58,7 @@ class LLDBConcreteTarget:
         self.error = lldb.SBError()
         self.listener = self.debugger.GetListener()
         self.process = self.target.Launch(self.listener,
-                                          argv, None,        # argv, envp
+                                          argv, envp,        # argv, envp
                                           None, None, None,  # stdin, stdout, stderr
                                           None,              # working directory
                                           0,
diff --git a/focaccia/match.py b/focaccia/match.py
index 4c8071f..be88176 100644
--- a/focaccia/match.py
+++ b/focaccia/match.py
@@ -54,6 +54,8 @@ def fold_traces(ctrace: list[ProgramState],
     while i + 1 < len(strace):
         strace[i].concat(strace.pop(i + 1))
 
+    return ctrace, strace
+
 def match_traces(ctrace: list[ProgramState], \
                  strace: list[SymbolicTransform]):
     """Try to match traces that don't follow the same program flow.
diff --git a/focaccia/miasm_util.py b/focaccia/miasm_util.py
index 4299f87..24a0e11 100644
--- a/focaccia/miasm_util.py
+++ b/focaccia/miasm_util.py
@@ -158,6 +158,7 @@ def _eval_exprmem(expr: ExprMem, state: MiasmSymbolResolver):
     if not addr.is_int():
         return expr
 
+    assert(isinstance(addr, ExprInt))
     mem = state.resolve_memory(int(addr), expr.size // 8)
     if mem is None:
         return expr
diff --git a/focaccia/parser.py b/focaccia/parser.py
index 9fb83d8..c37c07a 100644
--- a/focaccia/parser.py
+++ b/focaccia/parser.py
@@ -4,11 +4,11 @@ import base64
 import json
 import re
 from typing import TextIO
-import lldb
 
 from .arch import supported_architectures, Arch
 from .snapshot import ProgramState
 from .symbolic import SymbolicTransform
+from .trace import Trace, TraceEnvironment
 
 class ParseError(Exception):
     """A parse error."""
@@ -20,25 +20,30 @@ def _get_or_throw(obj: dict, key: str):
         return val
     raise ParseError(f'Expected value at key {key}, but found none.')
 
-def parse_transformations(json_stream: TextIO) -> list[SymbolicTransform]:
+def parse_transformations(json_stream: TextIO) -> Trace[SymbolicTransform]:
     """Parse symbolic transformations from a text stream."""
-    from .symbolic import parse_symbolic_transform
+    data = json.load(json_stream)
 
-    json_data = json.load(json_stream)
-    return [parse_symbolic_transform(item) for item in json_data]
+    env = TraceEnvironment.from_json(_get_or_throw(data, 'env'))
+    strace = [SymbolicTransform.from_json(item) \
+              for item in _get_or_throw(data, 'states')]
+
+    return Trace(strace, env)
 
-def serialize_transformations(transforms: list[SymbolicTransform],
+def serialize_transformations(transforms: Trace[SymbolicTransform],
                               out_stream: TextIO):
     """Serialize symbolic transformations to a text stream."""
-    from .symbolic import serialize_symbolic_transform
-
-    json.dump([serialize_symbolic_transform(t) for t in transforms], out_stream)
+    json.dump({
+        'env': transforms.env.to_json(),
+        'states': [t.to_json() for t in transforms],
+    }, out_stream)
 
-def parse_snapshots(json_stream: TextIO) -> list[ProgramState]:
+def parse_snapshots(json_stream: TextIO) -> Trace[ProgramState]:
     """Parse snapshots from our JSON format."""
     json_data = json.load(json_stream)
 
     arch = supported_architectures[_get_or_throw(json_data, 'architecture')]
+    env = TraceEnvironment.from_json(_get_or_throw(json_data, 'env'))
     snapshots = []
     for snapshot in _get_or_throw(json_data, 'snapshots'):
         state = ProgramState(arch)
@@ -52,15 +57,19 @@ def parse_snapshots(json_stream: TextIO) -> list[ProgramState]:
 
         snapshots.append(state)
 
-    return snapshots
+    return Trace(snapshots, env)
 
-def serialize_snapshots(snapshots: list[ProgramState], out_stream: TextIO):
+def serialize_snapshots(snapshots: Trace[ProgramState], out_stream: TextIO):
     """Serialize a list of snapshots to out JSON format."""
     if not snapshots:
         return json.dump({}, out_stream)
 
     arch = snapshots[0].arch
-    res = { 'architecture': arch.archname, 'snapshots': [] }
+    res = {
+        'architecture': arch.archname,
+        'env': snapshots.env.to_json(),
+        'snapshots': []
+    }
     for snapshot in snapshots:
         assert(snapshot.arch == arch)
         regs = {r: v for r, v in snapshot.regs.items() if v is not None}
@@ -74,9 +83,15 @@ def serialize_snapshots(snapshots: list[ProgramState], out_stream: TextIO):
 
     json.dump(res, out_stream)
 
-def parse_qemu(stream: TextIO, arch: Arch) -> list[ProgramState]:
+def _make_unknown_env() -> TraceEnvironment:
+    return TraceEnvironment('', [], [], '?')
+
+def parse_qemu(stream: TextIO, arch: Arch) -> Trace[ProgramState]:
     """Parse a QEMU log from a stream.
 
+    Recommended QEMU log option: `qemu -d exec,cpu,fpu,vpu,nochain`. The `exec`
+    flag is strictly necessary for the log to be parseable.
+
     :return: A list of parsed program states, in order of occurrence in the
              log.
     """
@@ -88,7 +103,7 @@ def parse_qemu(stream: TextIO, arch: Arch) -> list[ProgramState]:
         if states:
             _parse_qemu_line(line, states[-1])
 
-    return states
+    return Trace(states, _make_unknown_env())
 
 def _parse_qemu_line(line: str, cur_state: ProgramState):
     """Try to parse a single register-assignment line from a QEMU log.
@@ -129,7 +144,7 @@ def _parse_qemu_line(line: str, cur_state: ProgramState):
             if regname is not None:
                 cur_state.set_register(regname, int(value, 16))
 
-def parse_arancini(stream: TextIO, arch: Arch) -> list[ProgramState]:
+def parse_arancini(stream: TextIO, arch: Arch) -> Trace[ProgramState]:
     aliases = {
         'Program counter': 'RIP',
         'flag ZF': 'ZF',
@@ -154,4 +169,4 @@ def parse_arancini(stream: TextIO, arch: Arch) -> list[ProgramState]:
             if regname is not None:
                 states[-1].set_register(regname, int(value, 16))
 
-    return states
+    return Trace(states, _make_unknown_env())
diff --git a/focaccia/symbolic.py b/focaccia/symbolic.py
index 029e979..dfc2966 100644
--- a/focaccia/symbolic.py
+++ b/focaccia/symbolic.py
@@ -20,6 +20,7 @@ from .lldb_target import LLDBConcreteTarget, \
 from .miasm_util import MiasmSymbolResolver, eval_expr
 from .snapshot import ProgramState, ReadableProgramState, \
                       RegisterAccessError, MemoryAccessError
+from .trace import Trace, TraceEnvironment
 
 warn = logging.warning
 
@@ -342,6 +343,49 @@ class SymbolicTransform:
             res[addr] = eval_symbol(expr, conc_state).to_bytes(length, byteorder='big')
         return res
 
+    @classmethod
+    def from_json(cls, data: dict) -> SymbolicTransform:
+        """Parse a symbolic transformation from a JSON object.
+
+        :raise KeyError: if a parse error occurs.
+        """
+        from miasm.expression.parser import str_to_expr as parse
+
+        def decode_inst(obj: Iterable[int], arch: Arch):
+            b = b''.join(i.to_bytes(1, byteorder='big') for i in obj)
+            return Instruction.from_bytecode(b, arch)
+
+        arch = supported_architectures[data['arch']]
+        start_addr = int(data['from_addr'])
+        end_addr = int(data['to_addr'])
+
+        t = SymbolicTransform({}, [], arch, start_addr, end_addr)
+        t.changed_regs = { name: parse(val) for name, val in data['regs'].items() }
+        t.changed_mem = { parse(addr): parse(val) for addr, val in data['mem'].items() }
+        t.instructions = [decode_inst(b, arch) for b in data['instructions']]
+
+        # Recover the instructions' address information
+        addr = t.addr
+        for inst in t.instructions:
+            inst.addr = addr
+            addr += inst.length
+
+        return t
+
+    def to_json(self) -> dict:
+        """Serialize a symbolic transformation as a JSON object."""
+        def encode_inst(inst: Instruction):
+            return [int(b) for b in inst.to_bytecode()]
+
+        return {
+            'arch': self.arch.archname,
+            'from_addr': self.range[0],
+            'to_addr': self.range[1],
+            'instructions': [encode_inst(inst) for inst in self.instructions],
+            'regs': { name: repr(expr) for name, expr in self.changed_regs.items() },
+            'mem': { repr(addr): repr(val) for addr, val in self.changed_mem.items() },
+        }
+
     def __repr__(self) -> str:
         start, end = self.range
         res = f'Symbolic state transformation {hex(start)} -> {hex(end)}:\n'
@@ -356,51 +400,6 @@ class SymbolicTransform:
 
         return res[:-1]  # Remove trailing newline
 
-def parse_symbolic_transform(string: str) -> SymbolicTransform:
-    """Parse a symbolic transformation from a string.
-    :raise KeyError: if a parse error occurs.
-    """
-    import json
-    from miasm.expression.parser import str_to_expr as parse
-
-    def decode_inst(obj: Iterable[int], arch: Arch):
-        b = b''.join(i.to_bytes(1, byteorder='big') for i in obj)
-        return Instruction.from_bytecode(b, arch)
-
-    data = json.loads(string)
-    arch = supported_architectures[data['arch']]
-    start_addr = int(data['from_addr'])
-    end_addr = int(data['to_addr'])
-
-    t = SymbolicTransform({}, [], arch, start_addr, end_addr)
-    t.changed_regs = { name: parse(val) for name, val in data['regs'].items() }
-    t.changed_mem = { parse(addr): parse(val) for addr, val in data['mem'].items() }
-    t.instructions = [decode_inst(b, arch) for b in data['instructions']]
-
-    # Recover the instructions' address information
-    addr = t.addr
-    for inst in t.instructions:
-        inst.addr = addr
-        addr += inst.length
-
-    return t
-
-def serialize_symbolic_transform(t: SymbolicTransform) -> str:
-    """Serialize a symbolic transformation."""
-    import json
-
-    def encode_inst(inst: Instruction):
-        return [int(b) for b in inst.to_bytecode()]
-
-    return json.dumps({
-        'arch': t.arch.archname,
-        'from_addr': t.range[0],
-        'to_addr': t.range[1],
-        'instructions': [encode_inst(inst) for inst in t.instructions],
-        'regs': { name: repr(expr) for name, expr in t.changed_regs.items() },
-        'mem': { repr(addr): repr(val) for addr, val in t.changed_mem.items() },
-    })
-
 class DisassemblyContext:
     def __init__(self, binary):
         self.loc_db = LocationDB()
@@ -419,6 +418,7 @@ class DisassemblyContext:
         """Focaccia's description of an instruction set architecture."""
 
         # Create disassembly/lifting context
+        assert(self.machine.dis_engine is not None)
         self.mdis = self.machine.dis_engine(cont.bin_stream, loc_db=self.loc_db)
         self.mdis.follow_call = True
         self.lifter = self.machine.lifter(self.loc_db)
@@ -427,56 +427,122 @@ def run_instruction(instr: miasm_instr,
                     conc_state: MiasmSymbolResolver,
                     lifter: Lifter) \
         -> tuple[ExprInt | None, dict[Expr, Expr]]:
-    """Run a basic block.
+    """Compute the symbolic equation of a single instruction.
 
-    Tries to run IR blocks until the end of an ASM block/basic block is
-    reached. Skips 'virtual' blocks that purely exist in the IR.
+    The concolic engine tries to express the instruction's equation as
+    independent of the concrete state as possible.
+
+    May fail if the instruction is not supported. Failure is signalled by
+    returning `None` as the next program counter.
 
     :param instr:      The instruction to run.
     :param conc_state: A concrete reference state at `pc = instr.offset`. Used
-                       to resolve symbolic program counters, i.e. to 'guide' the
-                       symbolic execution on the correct path. This is the
+                       to resolve symbolic program counters, i.e. to 'guide'
+                       the symbolic execution on the correct path. This is the
                        concrete part of our concolic execution.
-    :param lifter:     A lifter of the appropriate architecture. Get this from a
-                       `DisassemblyContext` or a `Machine`.
+    :param lifter:     A lifter of the appropriate architecture. Get this from
+                       a `DisassemblyContext` or a `Machine`.
 
-    :return: The next program counter and a symbolic state. None if no next
-             program counter can be found. This happens when an error occurs or
-             when the program exits. The state represents the transformation
-             which the instruction applies to a program state.
+    :return: The next program counter and a symbolic state. The PC is None if
+             an error occurs or when the program exits. The returned state
+             is `instr`'s symbolic transformation.
     """
-    def execute_location(loc, base_state: dict | None) -> tuple[ExprInt | None, dict]:
+    from miasm.expression.expression import ExprCond, LocKey
+    from miasm.expression.simplifications import expr_simp
+
+    def create_cond_state(cond: Expr, iftrue: dict, iffalse: dict) -> dict:
+        """Combines states that are to be reached conditionally.
+
+        Example:
+            State A:
+                RAX          = 0x42
+                @[RBP - 0x4] = 0x123
+            State B:
+                RDI          = -0x777
+                @[RBP - 0x4] = 0x5c32
+            Condition:
+                RCX > 0x4 ? A : B
+
+            Result State:
+                RAX          = (RCX > 0x4) ? 0x42 : RAX
+                RDI          = (RCX > 0x4) ? RDI : -0x777
+                @[RBP - 0x4] = (RCX > 0x4) ? 0x123 : 0x5c32
+        """
+        res = {}
+        for dst, v in iftrue.items():
+            if dst not in iffalse:
+                res[dst] = expr_simp(ExprCond(cond, v, dst))
+            else:
+                res[dst] = expr_simp(ExprCond(cond, v, iffalse[dst]))
+        for dst, v in iffalse.items():
+            if dst not in iftrue:
+                res[dst] = expr_simp(ExprCond(cond, dst, v))
+        return res
+
+    def _execute_location(loc, base_state: dict | None) \
+            -> tuple[Expr, dict]:
+        """Execute a single IR block via symbolic engine. No fancy stuff."""
         # Query the location's IR block
         irblock = ircfg.get_block(loc)
         if irblock is None:
-            # Weirdly, this never seems to happen. I don't know why, though.
-            return None, base_state if base_state is not None else {}
+            return loc, base_state if base_state is not None else {}
 
         # Apply IR block to the current state
         engine = SymbolicExecutionEngine(lifter, state=base_state)
         new_pc = engine.eval_updt_irblock(irblock)
         modified = dict(engine.modified())
+        return new_pc, modified
+
+    def execute_location(loc: Expr | LocKey) -> tuple[ExprInt, dict]:
+        """Execute chains of IR blocks until a concrete program counter is
+        reached."""
+        seen_locs = set()  # To break out of loop instructions
+        new_pc, modified = _execute_location(loc, None)
+
+        # Run chained IR blocks until a real program counter is reached.
+        # This used to be recursive (and much more elegant), but large RCX
+        # values for 'REP ...' instructions could make the stack overflow.
+        while not new_pc.is_int():
+            seen_locs.add(new_pc)
+
+            if new_pc.is_loc():
+                # Jump to the next location.
+                new_pc, modified = _execute_location(new_pc, modified)
+            elif new_pc.is_cond():
+                # Explore conditional paths manually by constructing
+                # conditional states based on the possible outcomes.
+                assert(isinstance(new_pc, ExprCond))
+                cond = new_pc.cond
+                pc_iftrue, pc_iffalse = new_pc.src1, new_pc.src2
+
+                pc_t, state_t = _execute_location(pc_iftrue, modified.copy())
+                pc_f, state_f = _execute_location(pc_iffalse, modified.copy())
+                modified = create_cond_state(cond, state_t, state_f)
+                new_pc = expr_simp(ExprCond(cond, pc_t, pc_f))
+            else:
+                # Concretisize PC in case it is, e.g., a memory expression
+                new_pc = eval_expr(new_pc, conc_state)
 
-        # Resolve the symbolic PC to a concrete value based on the
-        # concrete program state before this instruction
-        new_pc = eval_expr(new_pc, conc_state)
-        if new_pc.is_loc():
-            # Run chained IR blocks until a real program counter is reached
-            new_pc, modified = execute_location(new_pc, modified)
+            # Avoid infinite loops for loop instructions (REP ...) by making
+            # the jump to the next loop iteration (or exit) concrete.
+            if new_pc in seen_locs:
+                new_pc = eval_expr(new_pc, conc_state)
+                seen_locs.clear()
 
-        assert(new_pc is not None and isinstance(new_pc, ExprInt))
+        assert(isinstance(new_pc, ExprInt))
         return new_pc, modified
 
     # Lift instruction to IR
     ircfg = lifter.new_ircfg()
     try:
         loc = lifter.add_instr_to_ircfg(instr, ircfg, None, False)
+        assert(isinstance(loc, Expr) or isinstance(loc, LocKey))
     except NotImplementedError as err:
         warn(f'[WARNING] Unable to lift instruction {instr}: {err}. Skipping.')
         return None, {}  # Create an empty transform for the instruction
 
     # Execute instruction symbolically
-    new_pc, modified = execute_location(loc, None)
+    new_pc, modified = execute_location(loc)
     modified[lifter.pc] = new_pc  # Add PC update to state
 
     return new_pc, modified
@@ -514,21 +580,22 @@ class _LLDBConcreteState(ReadableProgramState):
         except ConcreteMemoryError:
             raise MemoryAccessError(addr, size, 'Unable to read memory from LLDB.')
 
-def collect_symbolic_trace(binary: str,
-                           args: list[str],
+def collect_symbolic_trace(env: TraceEnvironment,
                            start_addr: int | None = None
-                           ) -> list[SymbolicTransform]:
+                           ) -> Trace[SymbolicTransform]:
     """Execute a program and compute state transformations between executed
     instructions.
 
     :param binary: The binary to trace.
     :param args:   Arguments to the program.
     """
+    binary = env.binary_name
+
     ctx = DisassemblyContext(binary)
     arch = ctx.arch
 
     # Set up concrete reference state
-    target = LLDBConcreteTarget(binary, args)
+    target = LLDBConcreteTarget(binary, env.argv, env.envp)
     if start_addr is not None:
         target.run_until(start_addr)
     lldb_state = _LLDBConcreteState(target, arch)
@@ -563,4 +630,4 @@ def collect_symbolic_trace(binary: str,
         # Step forward
         target.step()
 
-    return strace
+    return Trace(strace, env)
diff --git a/focaccia/trace.py b/focaccia/trace.py
new file mode 100644
index 0000000..094358f
--- /dev/null
+++ b/focaccia/trace.py
@@ -0,0 +1,74 @@
+from __future__ import annotations
+from typing import Generic, TypeVar
+
+from .utils import file_hash
+
+T = TypeVar('T')
+
+class TraceEnvironment:
+    """Data that defines the environment in which a trace was recorded."""
+    def __init__(self,
+                 binary: str,
+                 argv: list[str],
+                 envp: list[str],
+                 binary_hash: str | None = None):
+        self.argv = argv
+        self.envp = envp
+        self.binary_name = binary
+        if binary_hash is None:
+            self.binary_hash = file_hash(binary)
+        else:
+            self.binary_hash = binary_hash
+
+    @classmethod
+    def from_json(cls, json: dict) -> TraceEnvironment:
+        """Parse a JSON object into a TraceEnvironment."""
+        return cls(
+            json['binary_name'],
+            json['argv'],
+            json['envp'],
+            json['binary_hash'],
+        )
+
+    def to_json(self) -> dict:
+        """Serialize a TraceEnvironment to a JSON object."""
+        return {
+            'binary_name': self.binary_name,
+            'binary_hash': self.binary_hash,
+            'argv': self.argv,
+            'envp': self.envp,
+        }
+
+    def __eq__(self, other: object) -> bool:
+        if not isinstance(other, TraceEnvironment):
+            return False
+
+        return self.binary_name == other.binary_name \
+            and self.binary_hash == other.binary_hash \
+            and self.argv == other.argv \
+            and self.envp == other.envp
+
+    def __repr__(self) -> str:
+        return f'{self.binary_name} {" ".join(self.argv)}' \
+               f'\n   bin-hash={self.binary_hash}' \
+               f'\n   envp={repr(self.envp)}'
+
+class Trace(Generic[T]):
+    def __init__(self,
+                 trace_states: list[T],
+                 env: TraceEnvironment):
+        self.states = trace_states
+        self.env = env
+
+    def __len__(self) -> int:
+        return len(self.states)
+
+    def __getitem__(self, i: int) -> T:
+        return self.states[i]
+
+    def __iter__(self):
+        return iter(self.states)
+
+    def __repr__(self) -> str:
+        return f'Trace with {len(self.states)} trace points.' \
+               f' Environment: {repr(self.env)}'
diff --git a/focaccia/utils.py b/focaccia/utils.py
index 50251a1..c4f6a74 100644
--- a/focaccia/utils.py
+++ b/focaccia/utils.py
@@ -1,8 +1,39 @@
+from __future__ import annotations
+
 import ctypes
-import sys
+import os
 import shutil
+import sys
+from functools import total_ordering
+from hashlib import sha256
+
+@total_ordering
+class ErrorSeverity:
+    def __init__(self, num: int, name: str):
+        """Construct an error severity.
+
+        :param num:  A numerical value that orders the severity with respect
+                     to other `ErrorSeverity` objects. Smaller values are less
+                     severe.
+        :param name: A descriptive name for the error severity, e.g. 'fatal'
+                     or 'info'.
+        """
+        self._numeral = num
+        self.name = name
+
+    def __repr__(self) -> str:
+        return f'[{self.name}]'
 
-from .compare import ErrorSeverity
+    def __eq__(self, other: object) -> bool:
+        if not isinstance(other, ErrorSeverity):
+            return False
+        return self._numeral == other._numeral
+
+    def __lt__(self, other: ErrorSeverity) -> bool:
+        return self._numeral < other._numeral
+
+    def __hash__(self) -> int:
+        return hash(self._numeral)
 
 def float_bits_to_uint(v: float) -> int:
     """Bit-cast a float to a 32-bit integer."""
@@ -20,6 +51,31 @@ def uint_bits_to_double(v: int) -> float:
     """Bit-cast a 64-bit integer to a double."""
     return ctypes.c_double.from_buffer(ctypes.c_uint64(v)).value
 
+def file_hash(filename: str, hash = sha256(), chunksize: int = 65536) -> str:
+    """Calculate a file's hash.
+
+    :param filename:  Name of the file to hash.
+    :param hash:      The hash algorithm to use.
+    :param chunksize: Optimization option. Size of contiguous chunks to read
+                      from the file and feed into the hashing algorithm.
+    :return: A hex digest.
+    """
+    with open(filename, 'rb') as file:
+        while True:
+            data = file.read(chunksize)
+            if not data:
+                break
+            hash.update(data)
+    return hash.hexdigest()
+
+def get_envp() -> list[str]:
+    """Return current environment array.
+
+    Merge dict-like `os.environ` struct to the traditional list-like
+    environment array.
+    """
+    return [f'{k}={v}' for k, v in os.environ.items()]
+
 def print_separator(separator: str = '-', stream=sys.stdout, count: int = 80):
     maxtermsize = count
     termsize = shutil.get_terminal_size((80, 20)).columns
diff --git a/tools/_qemu_tool.py b/tools/_qemu_tool.py
index 9659f0e..38715dc 100644
--- a/tools/_qemu_tool.py
+++ b/tools/_qemu_tool.py
@@ -15,7 +15,8 @@ from focaccia.compare import compare_symbolic
 from focaccia.snapshot import ProgramState, ReadableProgramState, \
                               RegisterAccessError, MemoryAccessError
 from focaccia.symbolic import SymbolicTransform, eval_symbol, ExprMem
-from focaccia.utils import print_result
+from focaccia.trace import Trace, TraceEnvironment
+from focaccia.utils import get_envp, print_result
 
 from verify_qemu import make_argparser, verbosity
 
@@ -76,6 +77,7 @@ class GDBServerStateIterator:
             exit(1)
 
         self.arch = supported_architectures[archname]
+        self.binary = self._process.progspace.filename
 
     def __iter__(self):
         return self
@@ -98,6 +100,77 @@ class GDBServerStateIterator:
 
         return GDBProgramState(self._process, gdb.selected_frame())
 
+def record_minimal_snapshot(prev_state: ReadableProgramState,
+                            cur_state: ReadableProgramState,
+                            prev_transform: SymbolicTransform,
+                            cur_transform: SymbolicTransform) \
+        -> ProgramState:
+    """Record a minimal snapshot.
+
+    A minimal snapshot must include values (registers and memory) that are
+    accessed by two transformations:
+      1. The values produced by the previous transformation (the
+         transformation that is producing this snapshot) to check these
+         values against expected values calculated from the previous
+         program state.
+      2. The values that act as inputs to the transformation acting on this
+         snapshot, to calculate the expected values of the next snapshot.
+
+    :param prev_transform: The symbolic transformation generating, or
+                           leading to, `cur_state`. Values generated by
+                           this transformation are included in the
+                           snapshot.
+    :param transform: The symbolic transformation operating on this
+                      snapshot. Input values to this transformation are
+                      included in the snapshot.
+    """
+    assert(cur_state.read_register('pc') == cur_transform.addr)
+    assert(prev_transform.arch == cur_transform.arch)
+
+    def get_written_addresses(t: SymbolicTransform):
+        """Get all output memory accesses of a symbolic transformation."""
+        return [ExprMem(a, v.size) for a, v in t.changed_mem.items()]
+
+    def set_values(regs: Iterable[str], mems: Iterable[ExprMem],
+                   cur_state: ReadableProgramState,
+                   prev_state: ReadableProgramState,
+                   out_state: ProgramState):
+        """
+        :param prev_state: Addresses of memory included in the snapshot are
+                           resolved relative to this state.
+        """
+        for regname in regs:
+            regval = cur_state.read_register(regname)
+            try:
+                out_state.set_register(regname, regval)
+            except RegisterAccessError:
+                pass
+        for mem in mems:
+            assert(mem.size % 8 == 0)
+            addr = eval_symbol(mem.ptr, prev_state)
+            try:
+                mem = cur_state.read_memory(addr, int(mem.size / 8))
+                out_state.write_memory(addr, mem)
+            except MemoryAccessError:
+                pass
+
+    state = ProgramState(cur_transform.arch)
+    state.set_register('PC', cur_transform.addr)
+
+    set_values(prev_transform.changed_regs.keys(),
+               get_written_addresses(prev_transform),
+               cur_state,
+               prev_state,  # Evaluate memory addresses based on previous
+                            # state because they are that state's output
+                            # addresses.
+               state)
+    set_values(cur_transform.get_used_registers(),
+               cur_transform.get_used_memory_addresses(),
+               cur_state,
+               cur_state,
+               state)
+    return state
+
 def collect_conc_trace(gdb: GDBServerStateIterator, \
                        strace: list[SymbolicTransform]) \
         -> tuple[list[ProgramState], list[SymbolicTransform]]:
@@ -115,75 +188,6 @@ def collect_conc_trace(gdb: GDBServerStateIterator, \
     :return: A list of concrete states and a list of corresponding symbolic
              transformations. The lists are guaranteed to have the same length.
     """
-    def record_snapshot(prev_state: ReadableProgramState,
-                        cur_state: GDBProgramState,
-                        prev_transform: SymbolicTransform,
-                        cur_transform: SymbolicTransform) \
-            -> ProgramState:
-        """Record a minimal snapshot.
-
-        A minimal snapshot must include values (registers and memory) that are
-        accessed by two transformations:
-          1. The values produced by the previous transformation (the
-             transformation that is producing this snapshot) to check these
-             values against expected values calculated from the previous
-             program state.
-          2. The values that act as inputs to the transformation acting on this
-             snapshot, to calculate the expected values of the next snapshot.
-
-        :param prev_transform: The symbolic transformation generating, or
-                               leading to, `gdb_state`. Values generated by
-                               this transformation are included in the
-                               snapshot.
-        :param transform: The symbolic transformation operating on this
-                          snapshot. Input values to this transformation are
-                          included in the snapshot.
-        """
-        assert(cur_state.read_register('pc') == cur_transform.addr)
-
-        def get_written_addresses(t: SymbolicTransform):
-            """Get all output memory accesses of a symbolic transformation."""
-            return [ExprMem(a, v.size) for a, v in t.changed_mem.items()]
-
-        def set_values(regs: Iterable[str], mems: Iterable[ExprMem],
-                       cur_state: GDBProgramState, prev_state: ReadableProgramState,
-                       out_state: ProgramState):
-            """
-            :param prev_state: Addresses of memory included in the snapshot are
-                               resolved relative to this state.
-            """
-            for regname in regs:
-                regval = cur_state.read_register(regname)
-                try:
-                    out_state.set_register(regname, regval)
-                except RegisterAccessError:
-                    pass
-            for mem in mems:
-                assert(mem.size % 8 == 0)
-                addr = eval_symbol(mem.ptr, prev_state)
-                try:
-                    mem = cur_state.read_memory(addr, int(mem.size / 8))
-                    out_state.write_memory(addr, mem)
-                except MemoryAccessError:
-                    pass
-
-        state = ProgramState(gdb.arch)
-        state.set_register('PC', cur_transform.addr)
-
-        set_values(prev_transform.changed_regs.keys(),
-                   get_written_addresses(prev_transform),
-                   cur_state,
-                   prev_state,  # Evaluate memory addresses based on previous
-                                # state because they are that state's output
-                                # addresses.
-                   state)
-        set_values(cur_transform.get_used_registers(),
-                   cur_transform.get_used_memory_addresses(),
-                   cur_state,
-                   cur_state,
-                   state)
-        return state
-
     def find_index(seq, target, access=lambda el: el):
         for i, el in enumerate(seq):
             if access(el) == target:
@@ -222,7 +226,7 @@ def collect_conc_trace(gdb: GDBServerStateIterator, \
                 symb_i += next_i + 1
 
             assert(cur_state.read_register('pc') == strace[symb_i].addr)
-            states.append(record_snapshot(
+            states.append(record_minimal_snapshot(
                 states[-1] if states else cur_state,
                 cur_state,
                 matched_transforms[-1] if matched_transforms else strace[symb_i],
@@ -240,6 +244,12 @@ def main():
 
     gdbserver_addr = 'localhost'
     gdbserver_port = args.port
+    gdb_server = GDBServerStateIterator(gdbserver_addr, gdbserver_port)
+
+    executable = gdb_server.binary
+    argv = []  # QEMU's GDB stub does not support 'info proc cmdline'
+    envp = []  # Can't get the remote target's environment
+    env = TraceEnvironment(executable, argv, envp, '?')
 
     # Read pre-computed symbolic trace
     with open(args.symb_trace, 'r') as strace:
@@ -247,8 +257,8 @@ def main():
 
     # Use symbolic trace to collect concrete trace from QEMU
     conc_states, matched_transforms = collect_conc_trace(
-        GDBServerStateIterator(gdbserver_addr, gdbserver_port),
-        symb_transforms)
+        gdb_server,
+        symb_transforms.states)
 
     # Verify and print result
     if not args.quiet:
@@ -258,7 +268,7 @@ def main():
     if args.output:
         from focaccia.parser import serialize_snapshots
         with open(args.output, 'w') as file:
-            serialize_snapshots(conc_states, file)
+            serialize_snapshots(Trace(conc_states, env), file)
 
 if __name__ == "__main__":
     main()
diff --git a/tools/capture_transforms.py b/tools/capture_transforms.py
index 5439b05..5a104c0 100755
--- a/tools/capture_transforms.py
+++ b/tools/capture_transforms.py
@@ -2,9 +2,11 @@
 
 import argparse
 import logging
+import os
 
 from focaccia import parser
 from focaccia.symbolic import collect_symbolic_trace
+from focaccia.trace import Trace, TraceEnvironment
 
 def main():
     prog = argparse.ArgumentParser()
@@ -19,7 +21,10 @@ def main():
     args = prog.parse_args()
 
     logging.disable(logging.CRITICAL)
-    trace = collect_symbolic_trace(args.binary, args.args, None)
+    env = TraceEnvironment(args.binary,
+                           args.args,
+                           [f'{k}={v}' for k, v in os.environ.items()])
+    trace = collect_symbolic_trace(env, None)
     with open(args.output, 'w') as file:
         parser.serialize_transformations(trace, file)