diff options
| -rw-r--r-- | focaccia/compare.py | 33 | ||||
| -rw-r--r-- | focaccia/lldb_target.py | 16 | ||||
| -rw-r--r-- | focaccia/match.py | 2 | ||||
| -rw-r--r-- | focaccia/miasm_util.py | 1 | ||||
| -rw-r--r-- | focaccia/parser.py | 49 | ||||
| -rw-r--r-- | focaccia/symbolic.py | 211 | ||||
| -rw-r--r-- | focaccia/trace.py | 74 | ||||
| -rw-r--r-- | focaccia/utils.py | 60 | ||||
| -rw-r--r-- | tools/_qemu_tool.py | 158 | ||||
| -rwxr-xr-x | tools/capture_transforms.py | 7 |
10 files changed, 413 insertions, 198 deletions
diff --git a/focaccia/compare.py b/focaccia/compare.py index 65c0f49..13f965c 100644 --- a/focaccia/compare.py +++ b/focaccia/compare.py @@ -1,36 +1,9 @@ -from functools import total_ordering -from typing import Iterable, Self +from __future__ import annotations +from typing import Iterable from .snapshot import ProgramState, MemoryAccessError, RegisterAccessError from .symbolic import SymbolicTransform - -@total_ordering -class ErrorSeverity: - def __init__(self, num: int, name: str): - """Construct an error severity. - - :param num: A numerical value that orders the severity with respect - to other `ErrorSeverity` objects. Smaller values are less - severe. - :param name: A descriptive name for the error severity, e.g. 'fatal' - or 'info'. - """ - self._numeral = num - self.name = name - - def __repr__(self) -> str: - return f'[{self.name}]' - - def __eq__(self, other: object) -> bool: - if not isinstance(other, Self): - return False - return self._numeral == other._numeral - - def __lt__(self, other: Self) -> bool: - return self._numeral < other._numeral - - def __hash__(self) -> int: - return hash(self._numeral) +from .utils import ErrorSeverity class ErrorTypes: INFO = ErrorSeverity(0, 'INFO') diff --git a/focaccia/lldb_target.py b/focaccia/lldb_target.py index b51ec3d..a4e96a8 100644 --- a/focaccia/lldb_target.py +++ b/focaccia/lldb_target.py @@ -1,3 +1,5 @@ +import os + import lldb from .arch import supported_architectures, x86 @@ -30,11 +32,21 @@ class ConcreteSectionError(Exception): pass class LLDBConcreteTarget: - def __init__(self, executable: str, argv: list[str] = []): + def __init__(self, + executable: str, + argv: list[str] = [], + envp: list[str] | None = None): """Construct an LLDB concrete target. Stop at entry. + :param argv: List of arguements. Does NOT include the conventional + executable name as the first entry. + :param envp: List of environment entries. Defaults to current + `os.environ` if `None`. :raises RuntimeError: If the process is unable to launch. """ + if envp is None: + envp = [f'{k}={v}' for k, v in os.environ.items()] + self.debugger = lldb.SBDebugger.Create() self.debugger.SetAsync(False) self.target = self.debugger.CreateTargetWithFileAndArch(executable, @@ -46,7 +58,7 @@ class LLDBConcreteTarget: self.error = lldb.SBError() self.listener = self.debugger.GetListener() self.process = self.target.Launch(self.listener, - argv, None, # argv, envp + argv, envp, # argv, envp None, None, None, # stdin, stdout, stderr None, # working directory 0, diff --git a/focaccia/match.py b/focaccia/match.py index 4c8071f..be88176 100644 --- a/focaccia/match.py +++ b/focaccia/match.py @@ -54,6 +54,8 @@ def fold_traces(ctrace: list[ProgramState], while i + 1 < len(strace): strace[i].concat(strace.pop(i + 1)) + return ctrace, strace + def match_traces(ctrace: list[ProgramState], \ strace: list[SymbolicTransform]): """Try to match traces that don't follow the same program flow. diff --git a/focaccia/miasm_util.py b/focaccia/miasm_util.py index 4299f87..24a0e11 100644 --- a/focaccia/miasm_util.py +++ b/focaccia/miasm_util.py @@ -158,6 +158,7 @@ def _eval_exprmem(expr: ExprMem, state: MiasmSymbolResolver): if not addr.is_int(): return expr + assert(isinstance(addr, ExprInt)) mem = state.resolve_memory(int(addr), expr.size // 8) if mem is None: return expr diff --git a/focaccia/parser.py b/focaccia/parser.py index 9fb83d8..c37c07a 100644 --- a/focaccia/parser.py +++ b/focaccia/parser.py @@ -4,11 +4,11 @@ import base64 import json import re from typing import TextIO -import lldb from .arch import supported_architectures, Arch from .snapshot import ProgramState from .symbolic import SymbolicTransform +from .trace import Trace, TraceEnvironment class ParseError(Exception): """A parse error.""" @@ -20,25 +20,30 @@ def _get_or_throw(obj: dict, key: str): return val raise ParseError(f'Expected value at key {key}, but found none.') -def parse_transformations(json_stream: TextIO) -> list[SymbolicTransform]: +def parse_transformations(json_stream: TextIO) -> Trace[SymbolicTransform]: """Parse symbolic transformations from a text stream.""" - from .symbolic import parse_symbolic_transform + data = json.load(json_stream) - json_data = json.load(json_stream) - return [parse_symbolic_transform(item) for item in json_data] + env = TraceEnvironment.from_json(_get_or_throw(data, 'env')) + strace = [SymbolicTransform.from_json(item) \ + for item in _get_or_throw(data, 'states')] + + return Trace(strace, env) -def serialize_transformations(transforms: list[SymbolicTransform], +def serialize_transformations(transforms: Trace[SymbolicTransform], out_stream: TextIO): """Serialize symbolic transformations to a text stream.""" - from .symbolic import serialize_symbolic_transform - - json.dump([serialize_symbolic_transform(t) for t in transforms], out_stream) + json.dump({ + 'env': transforms.env.to_json(), + 'states': [t.to_json() for t in transforms], + }, out_stream) -def parse_snapshots(json_stream: TextIO) -> list[ProgramState]: +def parse_snapshots(json_stream: TextIO) -> Trace[ProgramState]: """Parse snapshots from our JSON format.""" json_data = json.load(json_stream) arch = supported_architectures[_get_or_throw(json_data, 'architecture')] + env = TraceEnvironment.from_json(_get_or_throw(json_data, 'env')) snapshots = [] for snapshot in _get_or_throw(json_data, 'snapshots'): state = ProgramState(arch) @@ -52,15 +57,19 @@ def parse_snapshots(json_stream: TextIO) -> list[ProgramState]: snapshots.append(state) - return snapshots + return Trace(snapshots, env) -def serialize_snapshots(snapshots: list[ProgramState], out_stream: TextIO): +def serialize_snapshots(snapshots: Trace[ProgramState], out_stream: TextIO): """Serialize a list of snapshots to out JSON format.""" if not snapshots: return json.dump({}, out_stream) arch = snapshots[0].arch - res = { 'architecture': arch.archname, 'snapshots': [] } + res = { + 'architecture': arch.archname, + 'env': snapshots.env.to_json(), + 'snapshots': [] + } for snapshot in snapshots: assert(snapshot.arch == arch) regs = {r: v for r, v in snapshot.regs.items() if v is not None} @@ -74,9 +83,15 @@ def serialize_snapshots(snapshots: list[ProgramState], out_stream: TextIO): json.dump(res, out_stream) -def parse_qemu(stream: TextIO, arch: Arch) -> list[ProgramState]: +def _make_unknown_env() -> TraceEnvironment: + return TraceEnvironment('', [], [], '?') + +def parse_qemu(stream: TextIO, arch: Arch) -> Trace[ProgramState]: """Parse a QEMU log from a stream. + Recommended QEMU log option: `qemu -d exec,cpu,fpu,vpu,nochain`. The `exec` + flag is strictly necessary for the log to be parseable. + :return: A list of parsed program states, in order of occurrence in the log. """ @@ -88,7 +103,7 @@ def parse_qemu(stream: TextIO, arch: Arch) -> list[ProgramState]: if states: _parse_qemu_line(line, states[-1]) - return states + return Trace(states, _make_unknown_env()) def _parse_qemu_line(line: str, cur_state: ProgramState): """Try to parse a single register-assignment line from a QEMU log. @@ -129,7 +144,7 @@ def _parse_qemu_line(line: str, cur_state: ProgramState): if regname is not None: cur_state.set_register(regname, int(value, 16)) -def parse_arancini(stream: TextIO, arch: Arch) -> list[ProgramState]: +def parse_arancini(stream: TextIO, arch: Arch) -> Trace[ProgramState]: aliases = { 'Program counter': 'RIP', 'flag ZF': 'ZF', @@ -154,4 +169,4 @@ def parse_arancini(stream: TextIO, arch: Arch) -> list[ProgramState]: if regname is not None: states[-1].set_register(regname, int(value, 16)) - return states + return Trace(states, _make_unknown_env()) diff --git a/focaccia/symbolic.py b/focaccia/symbolic.py index 029e979..dfc2966 100644 --- a/focaccia/symbolic.py +++ b/focaccia/symbolic.py @@ -20,6 +20,7 @@ from .lldb_target import LLDBConcreteTarget, \ from .miasm_util import MiasmSymbolResolver, eval_expr from .snapshot import ProgramState, ReadableProgramState, \ RegisterAccessError, MemoryAccessError +from .trace import Trace, TraceEnvironment warn = logging.warning @@ -342,6 +343,49 @@ class SymbolicTransform: res[addr] = eval_symbol(expr, conc_state).to_bytes(length, byteorder='big') return res + @classmethod + def from_json(cls, data: dict) -> SymbolicTransform: + """Parse a symbolic transformation from a JSON object. + + :raise KeyError: if a parse error occurs. + """ + from miasm.expression.parser import str_to_expr as parse + + def decode_inst(obj: Iterable[int], arch: Arch): + b = b''.join(i.to_bytes(1, byteorder='big') for i in obj) + return Instruction.from_bytecode(b, arch) + + arch = supported_architectures[data['arch']] + start_addr = int(data['from_addr']) + end_addr = int(data['to_addr']) + + t = SymbolicTransform({}, [], arch, start_addr, end_addr) + t.changed_regs = { name: parse(val) for name, val in data['regs'].items() } + t.changed_mem = { parse(addr): parse(val) for addr, val in data['mem'].items() } + t.instructions = [decode_inst(b, arch) for b in data['instructions']] + + # Recover the instructions' address information + addr = t.addr + for inst in t.instructions: + inst.addr = addr + addr += inst.length + + return t + + def to_json(self) -> dict: + """Serialize a symbolic transformation as a JSON object.""" + def encode_inst(inst: Instruction): + return [int(b) for b in inst.to_bytecode()] + + return { + 'arch': self.arch.archname, + 'from_addr': self.range[0], + 'to_addr': self.range[1], + 'instructions': [encode_inst(inst) for inst in self.instructions], + 'regs': { name: repr(expr) for name, expr in self.changed_regs.items() }, + 'mem': { repr(addr): repr(val) for addr, val in self.changed_mem.items() }, + } + def __repr__(self) -> str: start, end = self.range res = f'Symbolic state transformation {hex(start)} -> {hex(end)}:\n' @@ -356,51 +400,6 @@ class SymbolicTransform: return res[:-1] # Remove trailing newline -def parse_symbolic_transform(string: str) -> SymbolicTransform: - """Parse a symbolic transformation from a string. - :raise KeyError: if a parse error occurs. - """ - import json - from miasm.expression.parser import str_to_expr as parse - - def decode_inst(obj: Iterable[int], arch: Arch): - b = b''.join(i.to_bytes(1, byteorder='big') for i in obj) - return Instruction.from_bytecode(b, arch) - - data = json.loads(string) - arch = supported_architectures[data['arch']] - start_addr = int(data['from_addr']) - end_addr = int(data['to_addr']) - - t = SymbolicTransform({}, [], arch, start_addr, end_addr) - t.changed_regs = { name: parse(val) for name, val in data['regs'].items() } - t.changed_mem = { parse(addr): parse(val) for addr, val in data['mem'].items() } - t.instructions = [decode_inst(b, arch) for b in data['instructions']] - - # Recover the instructions' address information - addr = t.addr - for inst in t.instructions: - inst.addr = addr - addr += inst.length - - return t - -def serialize_symbolic_transform(t: SymbolicTransform) -> str: - """Serialize a symbolic transformation.""" - import json - - def encode_inst(inst: Instruction): - return [int(b) for b in inst.to_bytecode()] - - return json.dumps({ - 'arch': t.arch.archname, - 'from_addr': t.range[0], - 'to_addr': t.range[1], - 'instructions': [encode_inst(inst) for inst in t.instructions], - 'regs': { name: repr(expr) for name, expr in t.changed_regs.items() }, - 'mem': { repr(addr): repr(val) for addr, val in t.changed_mem.items() }, - }) - class DisassemblyContext: def __init__(self, binary): self.loc_db = LocationDB() @@ -419,6 +418,7 @@ class DisassemblyContext: """Focaccia's description of an instruction set architecture.""" # Create disassembly/lifting context + assert(self.machine.dis_engine is not None) self.mdis = self.machine.dis_engine(cont.bin_stream, loc_db=self.loc_db) self.mdis.follow_call = True self.lifter = self.machine.lifter(self.loc_db) @@ -427,56 +427,122 @@ def run_instruction(instr: miasm_instr, conc_state: MiasmSymbolResolver, lifter: Lifter) \ -> tuple[ExprInt | None, dict[Expr, Expr]]: - """Run a basic block. + """Compute the symbolic equation of a single instruction. - Tries to run IR blocks until the end of an ASM block/basic block is - reached. Skips 'virtual' blocks that purely exist in the IR. + The concolic engine tries to express the instruction's equation as + independent of the concrete state as possible. + + May fail if the instruction is not supported. Failure is signalled by + returning `None` as the next program counter. :param instr: The instruction to run. :param conc_state: A concrete reference state at `pc = instr.offset`. Used - to resolve symbolic program counters, i.e. to 'guide' the - symbolic execution on the correct path. This is the + to resolve symbolic program counters, i.e. to 'guide' + the symbolic execution on the correct path. This is the concrete part of our concolic execution. - :param lifter: A lifter of the appropriate architecture. Get this from a - `DisassemblyContext` or a `Machine`. + :param lifter: A lifter of the appropriate architecture. Get this from + a `DisassemblyContext` or a `Machine`. - :return: The next program counter and a symbolic state. None if no next - program counter can be found. This happens when an error occurs or - when the program exits. The state represents the transformation - which the instruction applies to a program state. + :return: The next program counter and a symbolic state. The PC is None if + an error occurs or when the program exits. The returned state + is `instr`'s symbolic transformation. """ - def execute_location(loc, base_state: dict | None) -> tuple[ExprInt | None, dict]: + from miasm.expression.expression import ExprCond, LocKey + from miasm.expression.simplifications import expr_simp + + def create_cond_state(cond: Expr, iftrue: dict, iffalse: dict) -> dict: + """Combines states that are to be reached conditionally. + + Example: + State A: + RAX = 0x42 + @[RBP - 0x4] = 0x123 + State B: + RDI = -0x777 + @[RBP - 0x4] = 0x5c32 + Condition: + RCX > 0x4 ? A : B + + Result State: + RAX = (RCX > 0x4) ? 0x42 : RAX + RDI = (RCX > 0x4) ? RDI : -0x777 + @[RBP - 0x4] = (RCX > 0x4) ? 0x123 : 0x5c32 + """ + res = {} + for dst, v in iftrue.items(): + if dst not in iffalse: + res[dst] = expr_simp(ExprCond(cond, v, dst)) + else: + res[dst] = expr_simp(ExprCond(cond, v, iffalse[dst])) + for dst, v in iffalse.items(): + if dst not in iftrue: + res[dst] = expr_simp(ExprCond(cond, dst, v)) + return res + + def _execute_location(loc, base_state: dict | None) \ + -> tuple[Expr, dict]: + """Execute a single IR block via symbolic engine. No fancy stuff.""" # Query the location's IR block irblock = ircfg.get_block(loc) if irblock is None: - # Weirdly, this never seems to happen. I don't know why, though. - return None, base_state if base_state is not None else {} + return loc, base_state if base_state is not None else {} # Apply IR block to the current state engine = SymbolicExecutionEngine(lifter, state=base_state) new_pc = engine.eval_updt_irblock(irblock) modified = dict(engine.modified()) + return new_pc, modified + + def execute_location(loc: Expr | LocKey) -> tuple[ExprInt, dict]: + """Execute chains of IR blocks until a concrete program counter is + reached.""" + seen_locs = set() # To break out of loop instructions + new_pc, modified = _execute_location(loc, None) + + # Run chained IR blocks until a real program counter is reached. + # This used to be recursive (and much more elegant), but large RCX + # values for 'REP ...' instructions could make the stack overflow. + while not new_pc.is_int(): + seen_locs.add(new_pc) + + if new_pc.is_loc(): + # Jump to the next location. + new_pc, modified = _execute_location(new_pc, modified) + elif new_pc.is_cond(): + # Explore conditional paths manually by constructing + # conditional states based on the possible outcomes. + assert(isinstance(new_pc, ExprCond)) + cond = new_pc.cond + pc_iftrue, pc_iffalse = new_pc.src1, new_pc.src2 + + pc_t, state_t = _execute_location(pc_iftrue, modified.copy()) + pc_f, state_f = _execute_location(pc_iffalse, modified.copy()) + modified = create_cond_state(cond, state_t, state_f) + new_pc = expr_simp(ExprCond(cond, pc_t, pc_f)) + else: + # Concretisize PC in case it is, e.g., a memory expression + new_pc = eval_expr(new_pc, conc_state) - # Resolve the symbolic PC to a concrete value based on the - # concrete program state before this instruction - new_pc = eval_expr(new_pc, conc_state) - if new_pc.is_loc(): - # Run chained IR blocks until a real program counter is reached - new_pc, modified = execute_location(new_pc, modified) + # Avoid infinite loops for loop instructions (REP ...) by making + # the jump to the next loop iteration (or exit) concrete. + if new_pc in seen_locs: + new_pc = eval_expr(new_pc, conc_state) + seen_locs.clear() - assert(new_pc is not None and isinstance(new_pc, ExprInt)) + assert(isinstance(new_pc, ExprInt)) return new_pc, modified # Lift instruction to IR ircfg = lifter.new_ircfg() try: loc = lifter.add_instr_to_ircfg(instr, ircfg, None, False) + assert(isinstance(loc, Expr) or isinstance(loc, LocKey)) except NotImplementedError as err: warn(f'[WARNING] Unable to lift instruction {instr}: {err}. Skipping.') return None, {} # Create an empty transform for the instruction # Execute instruction symbolically - new_pc, modified = execute_location(loc, None) + new_pc, modified = execute_location(loc) modified[lifter.pc] = new_pc # Add PC update to state return new_pc, modified @@ -514,21 +580,22 @@ class _LLDBConcreteState(ReadableProgramState): except ConcreteMemoryError: raise MemoryAccessError(addr, size, 'Unable to read memory from LLDB.') -def collect_symbolic_trace(binary: str, - args: list[str], +def collect_symbolic_trace(env: TraceEnvironment, start_addr: int | None = None - ) -> list[SymbolicTransform]: + ) -> Trace[SymbolicTransform]: """Execute a program and compute state transformations between executed instructions. :param binary: The binary to trace. :param args: Arguments to the program. """ + binary = env.binary_name + ctx = DisassemblyContext(binary) arch = ctx.arch # Set up concrete reference state - target = LLDBConcreteTarget(binary, args) + target = LLDBConcreteTarget(binary, env.argv, env.envp) if start_addr is not None: target.run_until(start_addr) lldb_state = _LLDBConcreteState(target, arch) @@ -563,4 +630,4 @@ def collect_symbolic_trace(binary: str, # Step forward target.step() - return strace + return Trace(strace, env) diff --git a/focaccia/trace.py b/focaccia/trace.py new file mode 100644 index 0000000..094358f --- /dev/null +++ b/focaccia/trace.py @@ -0,0 +1,74 @@ +from __future__ import annotations +from typing import Generic, TypeVar + +from .utils import file_hash + +T = TypeVar('T') + +class TraceEnvironment: + """Data that defines the environment in which a trace was recorded.""" + def __init__(self, + binary: str, + argv: list[str], + envp: list[str], + binary_hash: str | None = None): + self.argv = argv + self.envp = envp + self.binary_name = binary + if binary_hash is None: + self.binary_hash = file_hash(binary) + else: + self.binary_hash = binary_hash + + @classmethod + def from_json(cls, json: dict) -> TraceEnvironment: + """Parse a JSON object into a TraceEnvironment.""" + return cls( + json['binary_name'], + json['argv'], + json['envp'], + json['binary_hash'], + ) + + def to_json(self) -> dict: + """Serialize a TraceEnvironment to a JSON object.""" + return { + 'binary_name': self.binary_name, + 'binary_hash': self.binary_hash, + 'argv': self.argv, + 'envp': self.envp, + } + + def __eq__(self, other: object) -> bool: + if not isinstance(other, TraceEnvironment): + return False + + return self.binary_name == other.binary_name \ + and self.binary_hash == other.binary_hash \ + and self.argv == other.argv \ + and self.envp == other.envp + + def __repr__(self) -> str: + return f'{self.binary_name} {" ".join(self.argv)}' \ + f'\n bin-hash={self.binary_hash}' \ + f'\n envp={repr(self.envp)}' + +class Trace(Generic[T]): + def __init__(self, + trace_states: list[T], + env: TraceEnvironment): + self.states = trace_states + self.env = env + + def __len__(self) -> int: + return len(self.states) + + def __getitem__(self, i: int) -> T: + return self.states[i] + + def __iter__(self): + return iter(self.states) + + def __repr__(self) -> str: + return f'Trace with {len(self.states)} trace points.' \ + f' Environment: {repr(self.env)}' diff --git a/focaccia/utils.py b/focaccia/utils.py index 50251a1..c4f6a74 100644 --- a/focaccia/utils.py +++ b/focaccia/utils.py @@ -1,8 +1,39 @@ +from __future__ import annotations + import ctypes -import sys +import os import shutil +import sys +from functools import total_ordering +from hashlib import sha256 + +@total_ordering +class ErrorSeverity: + def __init__(self, num: int, name: str): + """Construct an error severity. + + :param num: A numerical value that orders the severity with respect + to other `ErrorSeverity` objects. Smaller values are less + severe. + :param name: A descriptive name for the error severity, e.g. 'fatal' + or 'info'. + """ + self._numeral = num + self.name = name + + def __repr__(self) -> str: + return f'[{self.name}]' -from .compare import ErrorSeverity + def __eq__(self, other: object) -> bool: + if not isinstance(other, ErrorSeverity): + return False + return self._numeral == other._numeral + + def __lt__(self, other: ErrorSeverity) -> bool: + return self._numeral < other._numeral + + def __hash__(self) -> int: + return hash(self._numeral) def float_bits_to_uint(v: float) -> int: """Bit-cast a float to a 32-bit integer.""" @@ -20,6 +51,31 @@ def uint_bits_to_double(v: int) -> float: """Bit-cast a 64-bit integer to a double.""" return ctypes.c_double.from_buffer(ctypes.c_uint64(v)).value +def file_hash(filename: str, hash = sha256(), chunksize: int = 65536) -> str: + """Calculate a file's hash. + + :param filename: Name of the file to hash. + :param hash: The hash algorithm to use. + :param chunksize: Optimization option. Size of contiguous chunks to read + from the file and feed into the hashing algorithm. + :return: A hex digest. + """ + with open(filename, 'rb') as file: + while True: + data = file.read(chunksize) + if not data: + break + hash.update(data) + return hash.hexdigest() + +def get_envp() -> list[str]: + """Return current environment array. + + Merge dict-like `os.environ` struct to the traditional list-like + environment array. + """ + return [f'{k}={v}' for k, v in os.environ.items()] + def print_separator(separator: str = '-', stream=sys.stdout, count: int = 80): maxtermsize = count termsize = shutil.get_terminal_size((80, 20)).columns diff --git a/tools/_qemu_tool.py b/tools/_qemu_tool.py index 9659f0e..38715dc 100644 --- a/tools/_qemu_tool.py +++ b/tools/_qemu_tool.py @@ -15,7 +15,8 @@ from focaccia.compare import compare_symbolic from focaccia.snapshot import ProgramState, ReadableProgramState, \ RegisterAccessError, MemoryAccessError from focaccia.symbolic import SymbolicTransform, eval_symbol, ExprMem -from focaccia.utils import print_result +from focaccia.trace import Trace, TraceEnvironment +from focaccia.utils import get_envp, print_result from verify_qemu import make_argparser, verbosity @@ -76,6 +77,7 @@ class GDBServerStateIterator: exit(1) self.arch = supported_architectures[archname] + self.binary = self._process.progspace.filename def __iter__(self): return self @@ -98,6 +100,77 @@ class GDBServerStateIterator: return GDBProgramState(self._process, gdb.selected_frame()) +def record_minimal_snapshot(prev_state: ReadableProgramState, + cur_state: ReadableProgramState, + prev_transform: SymbolicTransform, + cur_transform: SymbolicTransform) \ + -> ProgramState: + """Record a minimal snapshot. + + A minimal snapshot must include values (registers and memory) that are + accessed by two transformations: + 1. The values produced by the previous transformation (the + transformation that is producing this snapshot) to check these + values against expected values calculated from the previous + program state. + 2. The values that act as inputs to the transformation acting on this + snapshot, to calculate the expected values of the next snapshot. + + :param prev_transform: The symbolic transformation generating, or + leading to, `cur_state`. Values generated by + this transformation are included in the + snapshot. + :param transform: The symbolic transformation operating on this + snapshot. Input values to this transformation are + included in the snapshot. + """ + assert(cur_state.read_register('pc') == cur_transform.addr) + assert(prev_transform.arch == cur_transform.arch) + + def get_written_addresses(t: SymbolicTransform): + """Get all output memory accesses of a symbolic transformation.""" + return [ExprMem(a, v.size) for a, v in t.changed_mem.items()] + + def set_values(regs: Iterable[str], mems: Iterable[ExprMem], + cur_state: ReadableProgramState, + prev_state: ReadableProgramState, + out_state: ProgramState): + """ + :param prev_state: Addresses of memory included in the snapshot are + resolved relative to this state. + """ + for regname in regs: + regval = cur_state.read_register(regname) + try: + out_state.set_register(regname, regval) + except RegisterAccessError: + pass + for mem in mems: + assert(mem.size % 8 == 0) + addr = eval_symbol(mem.ptr, prev_state) + try: + mem = cur_state.read_memory(addr, int(mem.size / 8)) + out_state.write_memory(addr, mem) + except MemoryAccessError: + pass + + state = ProgramState(cur_transform.arch) + state.set_register('PC', cur_transform.addr) + + set_values(prev_transform.changed_regs.keys(), + get_written_addresses(prev_transform), + cur_state, + prev_state, # Evaluate memory addresses based on previous + # state because they are that state's output + # addresses. + state) + set_values(cur_transform.get_used_registers(), + cur_transform.get_used_memory_addresses(), + cur_state, + cur_state, + state) + return state + def collect_conc_trace(gdb: GDBServerStateIterator, \ strace: list[SymbolicTransform]) \ -> tuple[list[ProgramState], list[SymbolicTransform]]: @@ -115,75 +188,6 @@ def collect_conc_trace(gdb: GDBServerStateIterator, \ :return: A list of concrete states and a list of corresponding symbolic transformations. The lists are guaranteed to have the same length. """ - def record_snapshot(prev_state: ReadableProgramState, - cur_state: GDBProgramState, - prev_transform: SymbolicTransform, - cur_transform: SymbolicTransform) \ - -> ProgramState: - """Record a minimal snapshot. - - A minimal snapshot must include values (registers and memory) that are - accessed by two transformations: - 1. The values produced by the previous transformation (the - transformation that is producing this snapshot) to check these - values against expected values calculated from the previous - program state. - 2. The values that act as inputs to the transformation acting on this - snapshot, to calculate the expected values of the next snapshot. - - :param prev_transform: The symbolic transformation generating, or - leading to, `gdb_state`. Values generated by - this transformation are included in the - snapshot. - :param transform: The symbolic transformation operating on this - snapshot. Input values to this transformation are - included in the snapshot. - """ - assert(cur_state.read_register('pc') == cur_transform.addr) - - def get_written_addresses(t: SymbolicTransform): - """Get all output memory accesses of a symbolic transformation.""" - return [ExprMem(a, v.size) for a, v in t.changed_mem.items()] - - def set_values(regs: Iterable[str], mems: Iterable[ExprMem], - cur_state: GDBProgramState, prev_state: ReadableProgramState, - out_state: ProgramState): - """ - :param prev_state: Addresses of memory included in the snapshot are - resolved relative to this state. - """ - for regname in regs: - regval = cur_state.read_register(regname) - try: - out_state.set_register(regname, regval) - except RegisterAccessError: - pass - for mem in mems: - assert(mem.size % 8 == 0) - addr = eval_symbol(mem.ptr, prev_state) - try: - mem = cur_state.read_memory(addr, int(mem.size / 8)) - out_state.write_memory(addr, mem) - except MemoryAccessError: - pass - - state = ProgramState(gdb.arch) - state.set_register('PC', cur_transform.addr) - - set_values(prev_transform.changed_regs.keys(), - get_written_addresses(prev_transform), - cur_state, - prev_state, # Evaluate memory addresses based on previous - # state because they are that state's output - # addresses. - state) - set_values(cur_transform.get_used_registers(), - cur_transform.get_used_memory_addresses(), - cur_state, - cur_state, - state) - return state - def find_index(seq, target, access=lambda el: el): for i, el in enumerate(seq): if access(el) == target: @@ -222,7 +226,7 @@ def collect_conc_trace(gdb: GDBServerStateIterator, \ symb_i += next_i + 1 assert(cur_state.read_register('pc') == strace[symb_i].addr) - states.append(record_snapshot( + states.append(record_minimal_snapshot( states[-1] if states else cur_state, cur_state, matched_transforms[-1] if matched_transforms else strace[symb_i], @@ -240,6 +244,12 @@ def main(): gdbserver_addr = 'localhost' gdbserver_port = args.port + gdb_server = GDBServerStateIterator(gdbserver_addr, gdbserver_port) + + executable = gdb_server.binary + argv = [] # QEMU's GDB stub does not support 'info proc cmdline' + envp = [] # Can't get the remote target's environment + env = TraceEnvironment(executable, argv, envp, '?') # Read pre-computed symbolic trace with open(args.symb_trace, 'r') as strace: @@ -247,8 +257,8 @@ def main(): # Use symbolic trace to collect concrete trace from QEMU conc_states, matched_transforms = collect_conc_trace( - GDBServerStateIterator(gdbserver_addr, gdbserver_port), - symb_transforms) + gdb_server, + symb_transforms.states) # Verify and print result if not args.quiet: @@ -258,7 +268,7 @@ def main(): if args.output: from focaccia.parser import serialize_snapshots with open(args.output, 'w') as file: - serialize_snapshots(conc_states, file) + serialize_snapshots(Trace(conc_states, env), file) if __name__ == "__main__": main() diff --git a/tools/capture_transforms.py b/tools/capture_transforms.py index 5439b05..5a104c0 100755 --- a/tools/capture_transforms.py +++ b/tools/capture_transforms.py @@ -2,9 +2,11 @@ import argparse import logging +import os from focaccia import parser from focaccia.symbolic import collect_symbolic_trace +from focaccia.trace import Trace, TraceEnvironment def main(): prog = argparse.ArgumentParser() @@ -19,7 +21,10 @@ def main(): args = prog.parse_args() logging.disable(logging.CRITICAL) - trace = collect_symbolic_trace(args.binary, args.args, None) + env = TraceEnvironment(args.binary, + args.args, + [f'{k}={v}' for k, v in os.environ.items()]) + trace = collect_symbolic_trace(env, None) with open(args.output, 'w') as file: parser.serialize_transformations(trace, file) |