diff options
| author | Theofilos Augoustis <theofilos.augoustis@gmail.com> | 2024-02-19 16:26:22 +0100 |
|---|---|---|
| committer | Theofilos Augoustis <theofilos.augoustis@gmail.com> | 2024-02-19 16:26:22 +0100 |
| commit | 67dbe17ec3dff6f4c9a508ace3f1d25cb5e9f9c8 (patch) | |
| tree | 5f3281228e39c072daee10dd6aea2267e4612f57 | |
| parent | 86b42b57d322837c18a47daa917a597c79ad2bbb (diff) | |
| download | focaccia-67dbe17ec3dff6f4c9a508ace3f1d25cb5e9f9c8.tar.gz focaccia-67dbe17ec3dff6f4c9a508ace3f1d25cb5e9f9c8.zip | |
Make symbolic equations more symbolic
Reduce the impact of concrete guidance on the process of calculating an instruction's symbolic equation. The resulting equations will contain less assumptions about the concrete state and thus be more generic.
| -rw-r--r-- | focaccia/compare.py | 33 | ||||
| -rw-r--r-- | focaccia/lldb_target.py | 16 | ||||
| -rw-r--r-- | focaccia/match.py | 2 | ||||
| -rw-r--r-- | focaccia/miasm_util.py | 1 | ||||
| -rw-r--r-- | focaccia/parser.py | 49 | ||||
| -rw-r--r-- | focaccia/symbolic.py | 211 | ||||
| -rw-r--r-- | focaccia/trace.py | 74 | ||||
| -rw-r--r-- | focaccia/utils.py | 60 | ||||
| -rw-r--r-- | tools/_qemu_tool.py | 158 | ||||
| -rwxr-xr-x | tools/capture_transforms.py | 7 |
10 files changed, 413 insertions, 198 deletions
diff --git a/focaccia/compare.py b/focaccia/compare.py index 65c0f49..13f965c 100644 --- a/focaccia/compare.py +++ b/focaccia/compare.py @@ -1,36 +1,9 @@ -from functools import total_ordering -from typing import Iterable, Self +from __future__ import annotations +from typing import Iterable from .snapshot import ProgramState, MemoryAccessError, RegisterAccessError from .symbolic import SymbolicTransform - -@total_ordering -class ErrorSeverity: - def __init__(self, num: int, name: str): - """Construct an error severity. - - :param num: A numerical value that orders the severity with respect - to other `ErrorSeverity` objects. Smaller values are less - severe. - :param name: A descriptive name for the error severity, e.g. 'fatal' - or 'info'. - """ - self._numeral = num - self.name = name - - def __repr__(self) -> str: - return f'[{self.name}]' - - def __eq__(self, other: object) -> bool: - if not isinstance(other, Self): - return False - return self._numeral == other._numeral - - def __lt__(self, other: Self) -> bool: - return self._numeral < other._numeral - - def __hash__(self) -> int: - return hash(self._numeral) +from .utils import ErrorSeverity class ErrorTypes: INFO = ErrorSeverity(0, 'INFO') diff --git a/focaccia/lldb_target.py b/focaccia/lldb_target.py index b51ec3d..a4e96a8 100644 --- a/focaccia/lldb_target.py +++ b/focaccia/lldb_target.py @@ -1,3 +1,5 @@ +import os + import lldb from .arch import supported_architectures, x86 @@ -30,11 +32,21 @@ class ConcreteSectionError(Exception): pass class LLDBConcreteTarget: - def __init__(self, executable: str, argv: list[str] = []): + def __init__(self, + executable: str, + argv: list[str] = [], + envp: list[str] | None = None): """Construct an LLDB concrete target. Stop at entry. + :param argv: List of arguements. Does NOT include the conventional + executable name as the first entry. + :param envp: List of environment entries. Defaults to current + `os.environ` if `None`. :raises RuntimeError: If the process is unable to launch. """ + if envp is None: + envp = [f'{k}={v}' for k, v in os.environ.items()] + self.debugger = lldb.SBDebugger.Create() self.debugger.SetAsync(False) self.target = self.debugger.CreateTargetWithFileAndArch(executable, @@ -46,7 +58,7 @@ class LLDBConcreteTarget: self.error = lldb.SBError() self.listener = self.debugger.GetListener() self.process = self.target.Launch(self.listener, - argv, None, # argv, envp + argv, envp, # argv, envp None, None, None, # stdin, stdout, stderr None, # working directory 0, diff --git a/focaccia/match.py b/focaccia/match.py index 4c8071f..be88176 100644 --- a/focaccia/match.py +++ b/focaccia/match.py @@ -54,6 +54,8 @@ def fold_traces(ctrace: list[ProgramState], while i + 1 < len(strace): strace[i].concat(strace.pop(i + 1)) + return ctrace, strace + def match_traces(ctrace: list[ProgramState], \ strace: list[SymbolicTransform]): """Try to match traces that don't follow the same program flow. diff --git a/focaccia/miasm_util.py b/focaccia/miasm_util.py index 4299f87..24a0e11 100644 --- a/focaccia/miasm_util.py +++ b/focaccia/miasm_util.py @@ -158,6 +158,7 @@ def _eval_exprmem(expr: ExprMem, state: MiasmSymbolResolver): if not addr.is_int(): return expr + assert(isinstance(addr, ExprInt)) mem = state.resolve_memory(int(addr), expr.size // 8) if mem is None: return expr diff --git a/focaccia/parser.py b/focaccia/parser.py index 9fb83d8..c37c07a 100644 --- a/focaccia/parser.py +++ b/focaccia/parser.py @@ -4,11 +4,11 @@ import base64 import json import re from typing import TextIO -import lldb from .arch import supported_architectures, Arch from .snapshot import ProgramState from .symbolic import SymbolicTransform +from .trace import Trace, TraceEnvironment class ParseError(Exception): """A parse error.""" @@ -20,25 +20,30 @@ def _get_or_throw(obj: dict, key: str): return val raise ParseError(f'Expected value at key {key}, but found none.') -def parse_transformations(json_stream: TextIO) -> list[SymbolicTransform]: +def parse_transformations(json_stream: TextIO) -> Trace[SymbolicTransform]: """Parse symbolic transformations from a text stream.""" - from .symbolic import parse_symbolic_transform + data = json.load(json_stream) - json_data = json.load(json_stream) - return [parse_symbolic_transform(item) for item in json_data] + env = TraceEnvironment.from_json(_get_or_throw(data, 'env')) + strace = [SymbolicTransform.from_json(item) \ + for item in _get_or_throw(data, 'states')] + + return Trace(strace, env) -def serialize_transformations(transforms: list[SymbolicTransform], +def serialize_transformations(transforms: Trace[SymbolicTransform], out_stream: TextIO): """Serialize symbolic transformations to a text stream.""" - from .symbolic import serialize_symbolic_transform - - json.dump([serialize_symbolic_transform(t) for t in transforms], out_stream) + json.dump({ + 'env': transforms.env.to_json(), + 'states': [t.to_json() for t in transforms], + }, out_stream) -def parse_snapshots(json_stream: TextIO) -> list[ProgramState]: +def parse_snapshots(json_stream: TextIO) -> Trace[ProgramState]: """Parse snapshots from our JSON format.""" json_data = json.load(json_stream) arch = supported_architectures[_get_or_throw(json_data, 'architecture')] + env = TraceEnvironment.from_json(_get_or_throw(json_data, 'env')) snapshots = [] for snapshot in _get_or_throw(json_data, 'snapshots'): state = ProgramState(arch) @@ -52,15 +57,19 @@ def parse_snapshots(json_stream: TextIO) -> list[ProgramState]: snapshots.append(state) - return snapshots + return Trace(snapshots, env) -def serialize_snapshots(snapshots: list[ProgramState], out_stream: TextIO): +def serialize_snapshots(snapshots: Trace[ProgramState], out_stream: TextIO): """Serialize a list of snapshots to out JSON format.""" if not snapshots: return json.dump({}, out_stream) arch = snapshots[0].arch - res = { 'architecture': arch.archname, 'snapshots': [] } + res = { + 'architecture': arch.archname, + 'env': snapshots.env.to_json(), + 'snapshots': [] + } for snapshot in snapshots: assert(snapshot.arch == arch) regs = {r: v for r, v in snapshot.regs.items() if v is not None} @@ -74,9 +83,15 @@ def serialize_snapshots(snapshots: list[ProgramState], out_stream: TextIO): json.dump(res, out_stream) -def parse_qemu(stream: TextIO, arch: Arch) -> list[ProgramState]: +def _make_unknown_env() -> TraceEnvironment: + return TraceEnvironment('', [], [], '?') + +def parse_qemu(stream: TextIO, arch: Arch) -> Trace[ProgramState]: """Parse a QEMU log from a stream. + Recommended QEMU log option: `qemu -d exec,cpu,fpu,vpu,nochain`. The `exec` + flag is strictly necessary for the log to be parseable. + :return: A list of parsed program states, in order of occurrence in the log. """ @@ -88,7 +103,7 @@ def parse_qemu(stream: TextIO, arch: Arch) -> list[ProgramState]: if states: _parse_qemu_line(line, states[-1]) - return states + return Trace(states, _make_unknown_env()) def _parse_qemu_line(line: str, cur_state: ProgramState): """Try to parse a single register-assignment line from a QEMU log. @@ -129,7 +144,7 @@ def _parse_qemu_line(line: str, cur_state: ProgramState): if regname is not None: cur_state.set_register(regname, int(value, 16)) -def parse_arancini(stream: TextIO, arch: Arch) -> list[ProgramState]: +def parse_arancini(stream: TextIO, arch: Arch) -> Trace[ProgramState]: aliases = { 'Program counter': 'RIP', 'flag ZF': 'ZF', @@ -154,4 +169,4 @@ def parse_arancini(stream: TextIO, arch: Arch) -> list[ProgramState]: if regname is not None: states[-1].set_register(regname, int(value, 16)) - return states + return Trace(states, _make_unknown_env()) diff --git a/focaccia/symbolic.py b/focaccia/symbolic.py index 029e979..dfc2966 100644 --- a/focaccia/symbolic.py +++ b/focaccia/symbolic.py @@ -20,6 +20,7 @@ from .lldb_target import LLDBConcreteTarget, \ from .miasm_util import MiasmSymbolResolver, eval_expr from .snapshot import ProgramState, ReadableProgramState, \ RegisterAccessError, MemoryAccessError +from .trace import Trace, TraceEnvironment warn = logging.warning @@ -342,6 +343,49 @@ class SymbolicTransform: res[addr] = eval_symbol(expr, conc_state).to_bytes(length, byteorder='big') return res + @classmethod + def from_json(cls, data: dict) -> SymbolicTransform: + """Parse a symbolic transformation from a JSON object. + + :raise KeyError: if a parse error occurs. + """ + from miasm.expression.parser import str_to_expr as parse + + def decode_inst(obj: Iterable[int], arch: Arch): + b = b''.join(i.to_bytes(1, byteorder='big') for i in obj) + return Instruction.from_bytecode(b, arch) + + arch = supported_architectures[data['arch']] + start_addr = int(data['from_addr']) + end_addr = int(data['to_addr']) + + t = SymbolicTransform({}, [], arch, start_addr, end_addr) + t.changed_regs = { name: parse(val) for name, val in data['regs'].items() } + t.changed_mem = { parse(addr): parse(val) for addr, val in data['mem'].items() } + t.instructions = [decode_inst(b, arch) for b in data['instructions']] + + # Recover the instructions' address information + addr = t.addr + for inst in t.instructions: + inst.addr = addr + addr += inst.length + + return t + + def to_json(self) -> dict: + """Serialize a symbolic transformation as a JSON object.""" + def encode_inst(inst: Instruction): + return [int(b) for b in inst.to_bytecode()] + + return { + 'arch': self.arch.archname, + 'from_addr': self.range[0], + 'to_addr': self.range[1], + 'instructions': [encode_inst(inst) for inst in self.instructions], + 'regs': { name: repr(expr) for name, expr in self.changed_regs.items() }, + 'mem': { repr(addr): repr(val) for addr, val in self.changed_mem.items() }, + } + def __repr__(self) -> str: start, end = self.range res = f'Symbolic state transformation {hex(start)} -> {hex(end)}:\n' @@ -356,51 +400,6 @@ class SymbolicTransform: return res[:-1] # Remove trailing newline -def parse_symbolic_transform(string: str) -> SymbolicTransform: - """Parse a symbolic transformation from a string. - :raise KeyError: if a parse error occurs. - """ - import json - from miasm.expression.parser import str_to_expr as parse - - def decode_inst(obj: Iterable[int], arch: Arch): - b = b''.join(i.to_bytes(1, byteorder='big') for i in obj) - return Instruction.from_bytecode(b, arch) - - data = json.loads(string) - arch = supported_architectures[data['arch']] - start_addr = int(data['from_addr']) - end_addr = int(data['to_addr']) - - t = SymbolicTransform({}, [], arch, start_addr, end_addr) - t.changed_regs = { name: parse(val) for name, val in data['regs'].items() } - t.changed_mem = { parse(addr): parse(val) for addr, val in data['mem'].items() } - t.instructions = [decode_inst(b, arch) for b in data['instructions']] - - # Recover the instructions' address information - addr = t.addr - for inst in t.instructions: - inst.addr = addr - addr += inst.length - - return t - -def serialize_symbolic_transform(t: SymbolicTransform) -> str: - """Serialize a symbolic transformation.""" - import json - - def encode_inst(inst: Instruction): - return [int(b) for b in inst.to_bytecode()] - - return json.dumps({ - 'arch': t.arch.archname, - 'from_addr': t.range[0], - 'to_addr': t.range[1], - 'instructions': [encode_inst(inst) for inst in t.instructions], - 'regs': { name: repr(expr) for name, expr in t.changed_regs.items() }, - 'mem': { repr(addr): repr(val) for addr, val in t.changed_mem.items() }, - }) - class DisassemblyContext: def __init__(self, binary): self.loc_db = LocationDB() @@ -419,6 +418,7 @@ class DisassemblyContext: """Focaccia's description of an instruction set architecture.""" # Create disassembly/lifting context + assert(self.machine.dis_engine is not None) self.mdis = self.machine.dis_engine(cont.bin_stream, loc_db=self.loc_db) self.mdis.follow_call = True self.lifter = self.machine.lifter(self.loc_db) @@ -427,56 +427,122 @@ def run_instruction(instr: miasm_instr, conc_state: MiasmSymbolResolver, lifter: Lifter) \ -> tuple[ExprInt | None, dict[Expr, Expr]]: - """Run a basic block. + """Compute the symbolic equation of a single instruction. - Tries to run IR blocks until the end of an ASM block/basic block is - reached. Skips 'virtual' blocks that purely exist in the IR. + The concolic engine tries to express the instruction's equation as + independent of the concrete state as possible. + + May fail if the instruction is not supported. Failure is signalled by + returning `None` as the next program counter. :param instr: The instruction to run. :param conc_state: A concrete reference state at `pc = instr.offset`. Used - to resolve symbolic program counters, i.e. to 'guide' the - symbolic execution on the correct path. This is the + to resolve symbolic program counters, i.e. to 'guide' + the symbolic execution on the correct path. This is the concrete part of our concolic execution. - :param lifter: A lifter of the appropriate architecture. Get this from a - `DisassemblyContext` or a `Machine`. + :param lifter: A lifter of the appropriate architecture. Get this from + a `DisassemblyContext` or a `Machine`. - :return: The next program counter and a symbolic state. None if no next - program counter can be found. This happens when an error occurs or - when the program exits. The state represents the transformation - which the instruction applies to a program state. + :return: The next program counter and a symbolic state. The PC is None if + an error occurs or when the program exits. The returned state + is `instr`'s symbolic transformation. """ - def execute_location(loc, base_state: dict | None) -> tuple[ExprInt | None, dict]: + from miasm.expression.expression import ExprCond, LocKey + from miasm.expression.simplifications import expr_simp + + def create_cond_state(cond: Expr, iftrue: dict, iffalse: dict) -> dict: + """Combines states that are to be reached conditionally. + + Example: + State A: + RAX = 0x42 + @[RBP - 0x4] = 0x123 + State B: + RDI = -0x777 + @[RBP - 0x4] = 0x5c32 + Condition: + RCX > 0x4 ? A : B + + Result State: + RAX = (RCX > 0x4) ? 0x42 : RAX + RDI = (RCX > 0x4) ? RDI : -0x777 + @[RBP - 0x4] = (RCX > 0x4) ? 0x123 : 0x5c32 + """ + res = {} + for dst, v in iftrue.items(): + if dst not in iffalse: + res[dst] = expr_simp(ExprCond(cond, v, dst)) + else: + res[dst] = expr_simp(ExprCond(cond, v, iffalse[dst])) + for dst, v in iffalse.items(): + if dst not in iftrue: + res[dst] = expr_simp(ExprCond(cond, dst, v)) + return res + + def _execute_location(loc, base_state: dict | None) \ + -> tuple[Expr, dict]: + """Execute a single IR block via symbolic engine. No fancy stuff.""" # Query the location's IR block irblock = ircfg.get_block(loc) if irblock is None: - # Weirdly, this never seems to happen. I don't know why, though. - return None, base_state if base_state is not None else {} + return loc, base_state if base_state is not None else {} # Apply IR block to the current state engine = SymbolicExecutionEngine(lifter, state=base_state) new_pc = engine.eval_updt_irblock(irblock) modified = dict(engine.modified()) + return new_pc, modified + + def execute_location(loc: Expr | LocKey) -> tuple[ExprInt, dict]: + """Execute chains of IR blocks until a concrete program counter is + reached.""" + seen_locs = set() # To break out of loop instructions + new_pc, modified = _execute_location(loc, None) + + # Run chained IR blocks until a real program counter is reached. + # This used to be recursive (and much more elegant), but large RCX + # values for 'REP ...' instructions could make the stack overflow. + while not new_pc.is_int(): + seen_locs.add(new_pc) + + if new_pc.is_loc(): + # Jump to the next location. + new_pc, modified = _execute_location(new_pc, modified) + elif new_pc.is_cond(): + # Explore conditional paths manually by constructing + # conditional states based on the possible outcomes. + assert(isinstance(new_pc, ExprCond)) + cond = new_pc.cond + pc_iftrue, pc_iffalse = new_pc.src1, new_pc.src2 + + pc_t, state_t = _execute_location(pc_iftrue, modified.copy()) + pc_f, state_f = _execute_location(pc_iffalse, modified.copy()) + modified = create_cond_state(cond, state_t, state_f) + new_pc = expr_simp(ExprCond(cond, pc_t, pc_f)) + else: + # Concretisize PC in case it is, e.g., a memory expression + new_pc = eval_expr(new_pc, conc_state) - # Resolve the symbolic PC to a concrete value based on the - # concrete program state before this instruction - new_pc = eval_expr(new_pc, conc_state) - if new_pc.is_loc(): - # Run chained IR blocks until a real program counter is reached - new_pc, modified = execute_location(new_pc, modified) + # Avoid infinite loops for loop instructions (REP ...) by making + # the jump to the next loop iteration (or exit) concrete. + if new_pc in seen_locs: + new_pc = eval_expr(new_pc, conc_state) + seen_locs.clear() - assert(new_pc is not None and isinstance(new_pc, ExprInt)) + assert(isinstance(new_pc, ExprInt)) return new_pc, modified # Lift instruction to IR ircfg = lifter.new_ircfg() try: loc = lifter.add_instr_to_ircfg(instr, ircfg, None, False) + assert(isinstance(loc, Expr) or isinstance(loc, LocKey)) except NotImplementedError as err: warn(f'[WARNING] Unable to lift instruction {instr}: {err}. Skipping.') return None, {} # Create an empty transform for the instruction # Execute instruction symbolically - new_pc, modified = execute_location(loc, None) + new_pc, modified = execute_location(loc) modified[lifter.pc] = new_pc # Add PC update to state return new_pc, modified @@ -514,21 +580,22 @@ class _LLDBConcreteState(ReadableProgramState): except ConcreteMemoryError: raise MemoryAccessError(addr, size, 'Unable to read memory from LLDB.') -def collect_symbolic_trace(binary: str, - args: list[str], +def collect_symbolic_trace(env: TraceEnvironment, start_addr: int | None = None - ) -> list[SymbolicTransform]: + ) -> Trace[SymbolicTransform]: """Execute a program and compute state transformations between executed instructions. :param binary: The binary to trace. :param args: Arguments to the program. """ + binary = env.binary_name + ctx = DisassemblyContext(binary) arch = ctx.arch # Set up concrete reference state - target = LLDBConcreteTarget(binary, args) + target = LLDBConcreteTarget(binary, env.argv, env.envp) if start_addr is not None: target.run_until(start_addr) lldb_state = _LLDBConcreteState(target, arch) @@ -563,4 +630,4 @@ def collect_symbolic_trace(binary: str, # Step forward target.step() - return strace + return Trace(strace, env) diff --git a/focaccia/trace.py b/focaccia/trace.py new file mode 100644 index 0000000..094358f --- /dev/null +++ b/focaccia/trace.py @@ -0,0 +1,74 @@ +from __future__ import annotations +from typing import Generic, TypeVar + +from .utils import file_hash + +T = TypeVar('T') + +class TraceEnvironment: + """Data that defines the environment in which a trace was recorded.""" + def __init__(self, + binary: str, + argv: list[str], + envp: list[str], + binary_hash: str | None = None): + self.argv = argv + self.envp = envp + self.binary_name = binary + if binary_hash is None: + self.binary_hash = file_hash(binary) + else: + self.binary_hash = binary_hash + + @classmethod + def from_json(cls, json: dict) -> TraceEnvironment: + """Parse a JSON object into a TraceEnvironment.""" + return cls( + json['binary_name'], + json['argv'], + json['envp'], + json['binary_hash'], + ) + + def to_json(self) -> dict: + """Serialize a TraceEnvironment to a JSON object.""" + return { + 'binary_name': self.binary_name, + 'binary_hash': self.binary_hash, + 'argv': self.argv, + 'envp': self.envp, + } + + def __eq__(self, other: object) -> bool: + if not isinstance(other, TraceEnvironment): + return False + + return self.binary_name == other.binary_name \ + and self.binary_hash == other.binary_hash \ + and self.argv == other.argv \ + and self.envp == other.envp + + def __repr__(self) -> str: + return f'{self.binary_name} {" ".join(self.argv)}' \ + f'\n bin-hash={self.binary_hash}' \ + f'\n envp={repr(self.envp)}' + +class Trace(Generic[T]): + def __init__(self, + trace_states: list[T], + env: TraceEnvironment): + self.states = trace_states + self.env = env + + def __len__(self) -> int: + return len(self.states) + + def __getitem__(self, i: int) -> T: + return self.states[i] + + def __iter__(self): + return iter(self.states) + + def __repr__(self) -> str: + return f'Trace with {len(self.states)} trace points.' \ + f' Environment: {repr(self.env)}' diff --git a/focaccia/utils.py b/focaccia/utils.py index 50251a1..c4f6a74 100644 --- a/focaccia/utils.py +++ b/focaccia/utils.py @@ -1,8 +1,39 @@ +from __future__ import annotations + import ctypes -import sys +import os import shutil +import sys +from functools import total_ordering +from hashlib import sha256 + +@total_ordering +class ErrorSeverity: + def __init__(self, num: int, name: str): + """Construct an error severity. + + :param num: A numerical value that orders the severity with respect + to other `ErrorSeverity` objects. Smaller values are less + severe. + :param name: A descriptive name for the error severity, e.g. 'fatal' + or 'info'. + """ + self._numeral = num + self.name = name + + def __repr__(self) -> str: + return f'[{self.name}]' -from .compare import ErrorSeverity + def __eq__(self, other: object) -> bool: + if not isinstance(other, ErrorSeverity): + return False + return self._numeral == other._numeral + + def __lt__(self, other: ErrorSeverity) -> bool: + return self._numeral < other._numeral + + def __hash__(self) -> int: + return hash(self._numeral) def float_bits_to_uint(v: float) -> int: """Bit-cast a float to a 32-bit integer.""" @@ -20,6 +51,31 @@ def uint_bits_to_double(v: int) -> float: """Bit-cast a 64-bit integer to a double.""" return ctypes.c_double.from_buffer(ctypes.c_uint64(v)).value +def file_hash(filename: str, hash = sha256(), chunksize: int = 65536) -> str: + """Calculate a file's hash. + + :param filename: Name of the file to hash. + :param hash: The hash algorithm to use. + :param chunksize: Optimization option. Size of contiguous chunks to read + from the file and feed into the hashing algorithm. + :return: A hex digest. + """ + with open(filename, 'rb') as file: + while True: + data = file.read(chunksize) + if not data: + break + hash.update(data) + return hash.hexdigest() + +def get_envp() -> list[str]: + """Return current environment array. + + Merge dict-like `os.environ` struct to the traditional list-like + environment array. + """ + return [f'{k}={v}' for k, v in os.environ.items()] + def print_separator(separator: str = '-', stream=sys.stdout, count: int = 80): maxtermsize = count termsize = shutil.get_terminal_size((80, 20)).columns diff --git a/tools/_qemu_tool.py b/tools/_qemu_tool.py index 9659f0e..38715dc 100644 --- a/tools/_qemu_tool.py +++ b/tools/_qemu_tool.py @@ -15,7 +15,8 @@ from focaccia.compare import compare_symbolic from focaccia.snapshot import ProgramState, ReadableProgramState, \ RegisterAccessError, MemoryAccessError from focaccia.symbolic import SymbolicTransform, eval_symbol, ExprMem -from focaccia.utils import print_result +from focaccia.trace import Trace, TraceEnvironment +from focaccia.utils import get_envp, print_result from verify_qemu import make_argparser, verbosity @@ -76,6 +77,7 @@ class GDBServerStateIterator: exit(1) self.arch = supported_architectures[archname] + self.binary = self._process.progspace.filename def __iter__(self): return self @@ -98,6 +100,77 @@ class GDBServerStateIterator: return GDBProgramState(self._process, gdb.selected_frame()) +def record_minimal_snapshot(prev_state: ReadableProgramState, + cur_state: ReadableProgramState, + prev_transform: SymbolicTransform, + cur_transform: SymbolicTransform) \ + -> ProgramState: + """Record a minimal snapshot. + + A minimal snapshot must include values (registers and memory) that are + accessed by two transformations: + 1. The values produced by the previous transformation (the + transformation that is producing this snapshot) to check these + values against expected values calculated from the previous + program state. + 2. The values that act as inputs to the transformation acting on this + snapshot, to calculate the expected values of the next snapshot. + + :param prev_transform: The symbolic transformation generating, or + leading to, `cur_state`. Values generated by + this transformation are included in the + snapshot. + :param transform: The symbolic transformation operating on this + snapshot. Input values to this transformation are + included in the snapshot. + """ + assert(cur_state.read_register('pc') == cur_transform.addr) + assert(prev_transform.arch == cur_transform.arch) + + def get_written_addresses(t: SymbolicTransform): + """Get all output memory accesses of a symbolic transformation.""" + return [ExprMem(a, v.size) for a, v in t.changed_mem.items()] + + def set_values(regs: Iterable[str], mems: Iterable[ExprMem], + cur_state: ReadableProgramState, + prev_state: ReadableProgramState, + out_state: ProgramState): + """ + :param prev_state: Addresses of memory included in the snapshot are + resolved relative to this state. + """ + for regname in regs: + regval = cur_state.read_register(regname) + try: + out_state.set_register(regname, regval) + except RegisterAccessError: + pass + for mem in mems: + assert(mem.size % 8 == 0) + addr = eval_symbol(mem.ptr, prev_state) + try: + mem = cur_state.read_memory(addr, int(mem.size / 8)) + out_state.write_memory(addr, mem) + except MemoryAccessError: + pass + + state = ProgramState(cur_transform.arch) + state.set_register('PC', cur_transform.addr) + + set_values(prev_transform.changed_regs.keys(), + get_written_addresses(prev_transform), + cur_state, + prev_state, # Evaluate memory addresses based on previous + # state because they are that state's output + # addresses. + state) + set_values(cur_transform.get_used_registers(), + cur_transform.get_used_memory_addresses(), + cur_state, + cur_state, + state) + return state + def collect_conc_trace(gdb: GDBServerStateIterator, \ strace: list[SymbolicTransform]) \ -> tuple[list[ProgramState], list[SymbolicTransform]]: @@ -115,75 +188,6 @@ def collect_conc_trace(gdb: GDBServerStateIterator, \ :return: A list of concrete states and a list of corresponding symbolic transformations. The lists are guaranteed to have the same length. """ - def record_snapshot(prev_state: ReadableProgramState, - cur_state: GDBProgramState, - prev_transform: SymbolicTransform, - cur_transform: SymbolicTransform) \ - -> ProgramState: - """Record a minimal snapshot. - - A minimal snapshot must include values (registers and memory) that are - accessed by two transformations: - 1. The values produced by the previous transformation (the - transformation that is producing this snapshot) to check these - values against expected values calculated from the previous - program state. - 2. The values that act as inputs to the transformation acting on this - snapshot, to calculate the expected values of the next snapshot. - - :param prev_transform: The symbolic transformation generating, or - leading to, `gdb_state`. Values generated by - this transformation are included in the - snapshot. - :param transform: The symbolic transformation operating on this - snapshot. Input values to this transformation are - included in the snapshot. - """ - assert(cur_state.read_register('pc') == cur_transform.addr) - - def get_written_addresses(t: SymbolicTransform): - """Get all output memory accesses of a symbolic transformation.""" - return [ExprMem(a, v.size) for a, v in t.changed_mem.items()] - - def set_values(regs: Iterable[str], mems: Iterable[ExprMem], - cur_state: GDBProgramState, prev_state: ReadableProgramState, - out_state: ProgramState): - """ - :param prev_state: Addresses of memory included in the snapshot are - resolved relative to this state. - """ - for regname in regs: - regval = cur_state.read_register(regname) - try: - out_state.set_register(regname, regval) - except RegisterAccessError: - pass - for mem in mems: - assert(mem.size % 8 == 0) - addr = eval_symbol(mem.ptr, prev_state) - try: - mem = cur_state.read_memory(addr, int(mem.size / 8)) - out_state.write_memory(addr, mem) - except MemoryAccessError: - pass - - state = ProgramState(gdb.arch) - state.set_register('PC', cur_transform.addr) - - set_values(prev_transform.changed_regs.keys(), - get_written_addresses(prev_transform), - cur_state, - prev_state, # Evaluate memory addresses based on previous - # state because they are that state's output - # addresses. - state) - set_values(cur_transform.get_used_registers(), - cur_transform.get_used_memory_addresses(), - cur_state, - cur_state, - state) - return state - def find_index(seq, target, access=lambda el: el): for i, el in enumerate(seq): if access(el) == target: @@ -222,7 +226,7 @@ def collect_conc_trace(gdb: GDBServerStateIterator, \ symb_i += next_i + 1 assert(cur_state.read_register('pc') == strace[symb_i].addr) - states.append(record_snapshot( + states.append(record_minimal_snapshot( states[-1] if states else cur_state, cur_state, matched_transforms[-1] if matched_transforms else strace[symb_i], @@ -240,6 +244,12 @@ def main(): gdbserver_addr = 'localhost' gdbserver_port = args.port + gdb_server = GDBServerStateIterator(gdbserver_addr, gdbserver_port) + + executable = gdb_server.binary + argv = [] # QEMU's GDB stub does not support 'info proc cmdline' + envp = [] # Can't get the remote target's environment + env = TraceEnvironment(executable, argv, envp, '?') # Read pre-computed symbolic trace with open(args.symb_trace, 'r') as strace: @@ -247,8 +257,8 @@ def main(): # Use symbolic trace to collect concrete trace from QEMU conc_states, matched_transforms = collect_conc_trace( - GDBServerStateIterator(gdbserver_addr, gdbserver_port), - symb_transforms) + gdb_server, + symb_transforms.states) # Verify and print result if not args.quiet: @@ -258,7 +268,7 @@ def main(): if args.output: from focaccia.parser import serialize_snapshots with open(args.output, 'w') as file: - serialize_snapshots(conc_states, file) + serialize_snapshots(Trace(conc_states, env), file) if __name__ == "__main__": main() diff --git a/tools/capture_transforms.py b/tools/capture_transforms.py index 5439b05..5a104c0 100755 --- a/tools/capture_transforms.py +++ b/tools/capture_transforms.py @@ -2,9 +2,11 @@ import argparse import logging +import os from focaccia import parser from focaccia.symbolic import collect_symbolic_trace +from focaccia.trace import Trace, TraceEnvironment def main(): prog = argparse.ArgumentParser() @@ -19,7 +21,10 @@ def main(): args = prog.parse_args() logging.disable(logging.CRITICAL) - trace = collect_symbolic_trace(args.binary, args.args, None) + env = TraceEnvironment(args.binary, + args.args, + [f'{k}={v}' for k, v in os.environ.items()]) + trace = collect_symbolic_trace(env, None) with open(args.output, 'w') as file: parser.serialize_transformations(trace, file) |