Diffstat (limited to 'src')
-rw-r--r--  src/focaccia/arch/aarch64.py              |   4
-rw-r--r--  src/focaccia/arch/arch.py                 |  19
-rw-r--r--  src/focaccia/arch/x86.py                  |  13
-rw-r--r--  src/focaccia/compare.py                   |  34
-rw-r--r--  src/focaccia/deterministic.py             | 226
-rw-r--r--  src/focaccia/lldb_target.py               | 180
-rw-r--r--  src/focaccia/miasm_util.py                |   6
-rw-r--r--  src/focaccia/parser.py                    |   1
-rw-r--r--  src/focaccia/symbolic.py                  | 488
-rw-r--r--  src/focaccia/tools/_qemu_tool.py          |  49
-rwxr-xr-x  src/focaccia/tools/capture_transforms.py  |  69
-rwxr-xr-x  src/focaccia/tools/validate_qemu.py       |   4
-rw-r--r--  src/focaccia/trace.py                     |  12
-rw-r--r--  src/focaccia/utils.py                     |  33
14 files changed, 928 insertions(+), 210 deletions(-)
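Taken together, the hunks below replace the old `collect_symbolic_trace` entry point with a `SymbolicTracer` class and add remote LLDB targets, rr-based deterministic logs, and a per-instruction time limit. A minimal driver sketch, modelled on the updated `capture_transforms.py` hunk further down, is shown here; the binary path, its argument list, the output file, the two-second limit and the `_NullDetLog` stub are placeholders rather than part of the commit, and the binary must exist on disk because `TraceEnvironment` hashes it on construction.

```python
from focaccia import parser, utils
from focaccia.symbolic import SymbolicTracer
from focaccia.trace import TraceEnvironment

class _NullDetLog:
    """Stand-in for an rr deterministic log when there is nothing to replay,
    mirroring the NullDeterministicLog defined inline in capture_transforms.py."""
    def events(self):
        return []

# './mybinary' and its arguments are placeholders; the file must exist,
# since TraceEnvironment hashes the binary on construction.
env = TraceEnvironment('./mybinary', ['--some-arg'], utils.get_envp(),
                       nondeterminism_log=_NullDetLog())

# remote=False (not None) selects a local LLDB launch, matching the CLI
# default in capture_transforms.py; pass 'host:port' to attach remotely.
tracer = SymbolicTracer(env, remote=False, cross_validate=False, force=False)

# Trace the whole run, skipping any instruction whose symbolic execution
# takes longer than two seconds.
trace = tracer.trace(start_addr=None, stop_addr=None, time_limit=2.0)
with open('transforms.json', 'w') as out:
    parser.serialize_transformations(trace, out)
```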
diff --git a/src/focaccia/arch/aarch64.py b/src/focaccia/arch/aarch64.py index 0e7d98c..76f7bd4 100644 --- a/src/focaccia/arch/aarch64.py +++ b/src/focaccia/arch/aarch64.py @@ -179,3 +179,7 @@ class ArchAArch64(Arch): from . import aarch64_dczid as dczid return dczid.read return None + + def is_instr_syscall(self, instr: str) -> bool: + return instr.upper().startswith('SVC') + diff --git a/src/focaccia/arch/arch.py b/src/focaccia/arch/arch.py index 234b0d9..c220a3b 100644 --- a/src/focaccia/arch/arch.py +++ b/src/focaccia/arch/arch.py @@ -80,7 +80,7 @@ class Arch(): return self._accessors.get(_regname, None) def get_reg_reader(self, regname: str) -> Callable[[], int] | None: - """Read a register directly from Focaccia + """Read a register directly from Focaccia. :param name: The register to read. :return: The register value. @@ -90,8 +90,25 @@ class Arch(): """ return None + def is_instr_uarch_dep(self, instr: str) -> bool: + """Returns true when an instruction is microarchitecturally-dependent. + + :param name: The instruction. + :return: True if microarchitecturally-dependent. + + Microarchitecturally-dependent instructions may have different output on different + microarchitectures, without it representing an error. Such instructions usually denote feature + support, output differences are representative of an error only if a program relies on the + particular microarchitectural features. + """ + return False + + def is_instr_syscall(self, instr: str) -> bool: + return False + def __eq__(self, other): return self.archname == other.archname def __repr__(self) -> str: return self.archname + diff --git a/src/focaccia/arch/x86.py b/src/focaccia/arch/x86.py index 809055e..a5d29f5 100644 --- a/src/focaccia/arch/x86.py +++ b/src/focaccia/arch/x86.py @@ -202,3 +202,16 @@ class ArchX86(Arch): # Apply custom register alias rules return regname_aliases.get(name.upper(), None) + + def is_instr_uarch_dep(self, instr: str) -> bool: + if "XGETBV" in instr.upper(): + return True + return False + + def is_instr_syscall(self, instr: str) -> bool: + if instr.upper().startswith("SYSCALL"): + return True + if instr.upper().startswith("INT"): + return True + return False + diff --git a/src/focaccia/compare.py b/src/focaccia/compare.py index 13f965c..4fea451 100644 --- a/src/focaccia/compare.py +++ b/src/focaccia/compare.py @@ -131,7 +131,8 @@ def compare_simple(test_states: list[ProgramState], def _find_register_errors(txl_from: ProgramState, txl_to: ProgramState, - transform_truth: SymbolicTransform) \ + transform_truth: SymbolicTransform, + is_uarch_dep: bool = False) \ -> list[Error]: """Find errors in register values. @@ -155,6 +156,14 @@ def _find_register_errors(txl_from: ProgramState, )] except RegisterAccessError as err: s, e = transform_truth.range + if is_uarch_dep: + return [Error(ErrorTypes.INCOMPLETE, + f'Register transformations {hex(s)} -> {hex(e)} depend' + f' on the value of microarchitecturally-dependent register {err.regname}, which is not' + f' set in the tested state. Incorrect or missing values for such registers' + f'are errors only if the program relies on them. 
Such registers generally' + f'denote microarchitectural feature support and not all programs depend on' + f'all features exposed by a microarchitecture')] return [Error(ErrorTypes.INCOMPLETE, f'Register transformations {hex(s)} -> {hex(e)} depend' f' on the value of register {err.regname}, which is not' @@ -172,10 +181,21 @@ def _find_register_errors(txl_from: ProgramState, continue if txl_val != truth_val: - errors.append(Error(ErrorTypes.CONFIRMED, - f'Content of register {regname} is false.' - f' Expected value: {hex(truth_val)}, actual' - f' value in the translation: {hex(txl_val)}.')) + if is_uarch_dep: + errors.append( + Error( + ErrorTypes.POSSIBLE, + f"Content of microarchitecture-specific register {regname} is different." + f"This denotes an error only when relied upon" + f" Expected value: {hex(truth_val)}, actual" + f" value in the translation: {hex(txl_val)}.", + ) + ) + else: + errors.append(Error(ErrorTypes.CONFIRMED, + f'Content of register {regname} is false.' + f' Expected value: {hex(truth_val)}, actual' + f' value in the translation: {hex(txl_val)}.')) return errors def _find_memory_errors(txl_from: ProgramState, @@ -252,8 +272,10 @@ def _find_errors_symbolic(txl_from: ProgramState, to_pc = txl_to.read_register('PC') assert((from_pc, to_pc) == transform_truth.range) + is_uarch_dep = txl_from.arch.is_instr_uarch_dep(transform_truth.instructions[0].to_string()) + errors = [] - errors.extend(_find_register_errors(txl_from, txl_to, transform_truth)) + errors.extend(_find_register_errors(txl_from, txl_to, transform_truth, is_uarch_dep)) errors.extend(_find_memory_errors(txl_from, txl_to, transform_truth)) return errors diff --git a/src/focaccia/deterministic.py b/src/focaccia/deterministic.py new file mode 100644 index 0000000..5a2b411 --- /dev/null +++ b/src/focaccia/deterministic.py @@ -0,0 +1,226 @@ +"""Parsing of JSON files containing snapshot data.""" + +import os +import itertools +from typing import Union, Iterable + +import brotli + +from .arch import Arch +from .snapshot import ReadableProgramState + +try: + import capnp + rr_trace = capnp.load(file_name='./rr/src/rr_trace.capnp', + imports=[os.path.dirname(p) for p in capnp.__path__]) +except Exception as e: + print(f'Cannot load RR trace loader: {e}') + exit(2) + +Frame = rr_trace.Frame +TaskEvent = rr_trace.TaskEvent +MMap = rr_trace.MMap +SerializedObject = Union[Frame, TaskEvent, MMap] + +def parse_x64_registers(enc_regs: bytes, signed: bool=False) -> dict[str, int]: + idx = 0 + def parse_reg(): + nonlocal idx + enc_reg = enc_regs[idx:(idx := idx + 8)] + return int.from_bytes(enc_reg, byteorder='little', signed=signed) + + regs = {} + + regs['r15'] = parse_reg() + regs['r14'] = parse_reg() + regs['r13'] = parse_reg() + regs['r12'] = parse_reg() + regs['rbp'] = parse_reg() + regs['rbx'] = parse_reg() + + # rcx is unreliable: parsed but ignored + parse_reg() + + regs['r10'] = parse_reg() + regs['r9'] = parse_reg() + regs['r8'] = parse_reg() + + regs['rax'] = parse_reg() + + # rcx is unreliable: parsed but ignored + parse_reg() + + regs['rdx'] = parse_reg() + regs['rsi'] = parse_reg() + regs['rdi'] = parse_reg() + + regs['orig_rax'] = parse_reg() + + regs['rip'] = parse_reg() + regs['cs'] = parse_reg() + + # eflags is unreliable: parsed but ignored + parse_reg() + + regs['rsp'] = parse_reg() + regs['ss'] = parse_reg() + regs['fs_base'] = parse_reg() + regs['ds'] = parse_reg() + regs['es'] = parse_reg() + regs['fs'] = parse_reg() + regs['gs'] = parse_reg() + regs['gs_base'] = parse_reg() + + return regs + +def 
parse_aarch64_registers(enc_regs: bytes, order: str='little', signed: bool=False) -> dict[str, int]: + idx = 0 + def parse_reg(): + nonlocal idx + enc_reg = enc_regs[idx:(idx := idx + 8)] + return int.from_bytes(enc_reg, byteorder=order, signed=signed) + + regnames = [] + for i in range(32): + regnames.append(f'x{i}') + regnames.append('sp') + regnames.append('pc') + regnames.append('cpsr') + + regs = {} + for i in range(len(regnames)): + regs[regnames[i]] = parse_reg() + + return regs + +class Event: + def __init__(self, + pc: int, + tid: int, + arch: Arch, + event_type: str, + registers: dict[str, int], + memory_writes: dict[int, int]): + self.pc = pc + self.tid = tid + self.arch = arch + self.event_type = event_type + + self.registers = registers + self.mem_writes = memory_writes + + def match(self, pc: int, target: ReadableProgramState) -> bool: + # TODO: match the rest of the state to be sure + if self.pc == pc: + for reg, value in self.registers.items(): + if value == self.pc: + continue + if target.read_register(reg) != value: + print(f'Failed match for {reg}: {hex(value)} != {hex(target.read_register(reg))}') + return False + return True + return False + + def __repr__(self) -> str: + reg_repr = '' + for reg, value in self.registers.items(): + reg_repr += f'{reg} = {hex(value)}\n' + + mem_write_repr = '' + for addr, size in self.mem_writes.items(): + mem_write_repr += f'{hex(addr)}:{hex(addr+size)}\n' + + repr_str = f'Thread {hex(self.tid)} executed event {self.event_type} at {hex(self.pc)}\n' + repr_str += f'Register set:\n{reg_repr}' + + if len(self.mem_writes): + repr_str += f'\nMemory writes:\n{mem_write_repr}' + + return repr_str + +class DeterministicLog: + def __init__(self, log_dir: str): + self.base_directory = log_dir + + def events_file(self) -> str: + return os.path.join(self.base_directory, 'events') + + def tasks_file(self) -> str: + return os.path.join(self.base_directory, 'tasks') + + def mmaps_file(self) -> str: + return os.path.join(self.base_directory, 'mmaps') + + def _read(self, file, obj: SerializedObject) -> list[SerializedObject]: + data = bytearray() + objects = [] + with open(file, 'rb') as f: + while True: + try: + compressed_len = int.from_bytes(f.read(4), byteorder='little') + uncompressed_len = int.from_bytes(f.read(4), byteorder='little') + except Exception as e: + raise Exception(f'Malformed deterministic log: {e}') from None + + chunk = f.read(compressed_len) + if not chunk: + break + + chunk = brotli.decompress(chunk) + if len(chunk) != uncompressed_len: + raise Exception(f'Malformed deterministic log: uncompressed chunk is not equal' + f'to reported length {hex(uncompressed_len)}') + data.extend(chunk) + + for deser in obj.read_multiple_bytes_packed(data): + objects.append(deser) + return objects + + def raw_events(self) -> list[SerializedObject]: + return self._read(self.events_file(), Frame) + + def raw_tasks(self) -> list[SerializedObject]: + return self._read(self.tasks_file(), TaskEvent) + + def raw_mmaps(self) -> list[SerializedObject]: + return self._read(self.mmaps_file(), MMap) + + def events(self) -> list[Event]: + def parse_registers(event: Frame) -> Union[int, dict[str, int]]: + arch = event.arch + if arch == rr_trace.Arch.x8664: + regs = parse_x64_registers(event.registers.raw) + return regs['rip'], regs + if arch == rr_trace.Arch.aarch64: + regs = parse_aarch64_registers(event.registers.raw) + return regs['pc'], regs + raise NotImplementedError(f'Unable to parse registers for architecture {arch}') + + def parse_memory_writes(event: 
Frame) -> dict[int, int]: + writes = {} + for raw_write in event.memWrites: + writes[int(raw_write.addr)] = int(raw_write.size) + return writes + + events = [] + raw_events = self.raw_events() + for raw_event in raw_events: + pc, registers = parse_registers(raw_event) + mem_writes = parse_memory_writes(raw_event) + + event_type = raw_event.event.which() + if event_type == 'syscall' and raw_event.arch == rr_trace.Arch.x8664: + # On entry: substitute orig_rax for RAX + if raw_event.event.syscall.state == rr_trace.SyscallState.entering: + registers['rax'] = registers['orig_rax'] + del registers['orig_rax'] + + event = Event(pc, + raw_event.tid, + raw_event.arch, + event_type, + registers, mem_writes) + events.append(event) + + return events + diff --git a/src/focaccia/lldb_target.py b/src/focaccia/lldb_target.py index c5042d5..940b3d9 100644 --- a/src/focaccia/lldb_target.py +++ b/src/focaccia/lldb_target.py @@ -1,10 +1,16 @@ import os +import logging import lldb from .arch import supported_architectures from .snapshot import ProgramState +logger = logging.getLogger('focaccia-lldb-target') +debug = logger.debug +info = logger.info +warn = logger.warn + class MemoryMap: """Description of a range of mapped memory. @@ -44,49 +50,53 @@ class LLDBConcreteTarget: x86.archname: x86.decompose_rflags, } + register_retries = { + aarch64.archname: {}, + x86.archname: { + "rflags": ["eflags"] + } + } + def __init__(self, - executable: str, - argv: list[str] = [], - envp: list[str] | None = None): + debugger: lldb.SBDebugger, + target: lldb.SBTarget, + process: lldb.SBProcess): """Construct an LLDB concrete target. Stop at entry. - :param argv: List of arguements. Does NOT include the conventional - executable name as the first entry. - :param envp: List of environment entries. Defaults to current - `os.environ` if `None`. - :raises RuntimeError: If the process is unable to launch. + :param debugger: LLDB SBDebugger object representing an initialized debug session. + :param target: LLDB SBTarget object representing an initialized target for the debugger. + :param process: LLDB SBProcess object representing an initialized process (either local or remote). 
""" - if envp is None: - envp = [f'{k}={v}' for k, v in os.environ.items()] + self.debugger = debugger + self.target = target + self.process = process - self.debugger = lldb.SBDebugger.Create() - self.debugger.SetAsync(False) - self.target = self.debugger.CreateTargetWithFileAndArch(executable, - lldb.LLDB_ARCH_DEFAULT) self.module = self.target.FindModule(self.target.GetExecutable()) self.interpreter = self.debugger.GetCommandInterpreter() # Set up objects for process execution - self.error = lldb.SBError() self.listener = self.debugger.GetListener() - self.process = self.target.Launch(self.listener, - argv, envp, # argv, envp - None, None, None, # stdin, stdout, stderr - None, # working directory - 0, - True, self.error) - if not self.process.IsValid(): - raise RuntimeError(f'[In LLDBConcreteTarget.__init__]: Failed to' - f' launch process.') # Determine current arch - self.archname = self.target.GetPlatform().GetTriple().split('-')[0] - if self.archname not in supported_architectures: - err = f'LLDBConcreteTarget: Architecture {self.archname} is not' \ + self.archname = self.determine_arch() + self.arch = supported_architectures[self.archname] + + def determine_arch(self): + archname = self.target.GetPlatform().GetTriple().split('-')[0] + if archname not in supported_architectures: + err = f'LLDBConcreteTarget: Architecture {archname} is not' \ f' supported by Focaccia.' print(f'[ERROR] {err}') raise NotImplementedError(err) - self.arch = supported_architectures[self.archname] + return archname + + def determine_name(self) -> str: + return self.process.GetTarget().GetExecutable().fullpath + + def determine_arguments(self): + launch_info = self.target.GetLaunchInfo() + argc = self.target.GetLaunchInfo().GetNumArguments() + return [launch_info.GetArgumentAtIndex(i) for i in range(argc)] def is_exited(self): """Signals whether the concrete process has exited. @@ -99,21 +109,24 @@ class LLDBConcreteTarget: """Continue execution of the concrete process.""" state = self.process.GetState() if state == lldb.eStateExited: - raise RuntimeError(f'Tried to resume process execution, but the' - f' process has already exited.') - assert(state == lldb.eStateStopped) + raise RuntimeError('Tried to resume process execution, but the' + ' process has already exited.') self.process.Continue() def step(self): """Step forward by a single instruction.""" - thread: lldb.SBThread = self.process.GetThreadAtIndex(0) + thread: lldb.SBThread = self.process.GetSelectedThread() thread.StepInstruction(False) def run_until(self, address: int) -> None: """Continue execution until the address is arrived, ignores other breakpoints""" bp = self.target.BreakpointCreateByAddress(address) - while self.read_register("pc") != address: + while True: self.run() + if self.is_exited(): + return + if self.read_register('pc') == address: + break self.target.BreakpointDelete(bp.GetID()) def record_snapshot(self) -> ProgramState: @@ -148,12 +161,20 @@ class LLDBConcreteTarget: :raise ConcreteRegisterError: If no register with the specified name can be found. 
""" - frame = self.process.GetThreadAtIndex(0).GetFrameAtIndex(0) - reg = frame.FindRegister(regname) + debug(f'Accessing register {regname}') + + frame = self.process.GetSelectedThread().GetFrameAtIndex(0) + + retry_list = self.register_retries[self.archname].get(regname, []) + error_msg = f'[In LLDBConcreteTarget._get_register]: Register {regname} not found' + + reg = None + for name in [regname, *retry_list]: + reg = frame.FindRegister(name) + if reg.IsValid(): + break if not reg.IsValid(): - raise ConcreteRegisterError( - f'[In LLDBConcreteTarget._get_register]: Register {regname}' - f' not found.') + raise ConcreteRegisterError(error_msg) return reg def read_flags(self) -> dict[str, int | bool]: @@ -223,7 +244,7 @@ class LLDBConcreteTarget: f'[In LLDBConcreteTarget.write_register]: Unable to set' f' {regname} to value {hex(value)}!') - def read_memory(self, addr, size): + def read_memory(self, addr: int, size: int) -> bytes: """Read bytes from memory. :raise ConcreteMemoryError: If unable to read `size` bytes from `addr`. @@ -238,7 +259,7 @@ class LLDBConcreteTarget: else: return bytes(reversed(content)) - def write_memory(self, addr, value: bytes): + def write_memory(self, addr: int, value: bytes): """Write bytes to memory. :raise ConcreteMemoryError: If unable to write at `addr`. @@ -316,3 +337,82 @@ class LLDBConcreteTarget: if s.GetStartAddress().GetLoadAddress(self.target) > addr: addr = s.GetEndAddress().GetLoadAddress(self.target) return addr + + def get_disassembly(self, addr: int) -> str: + inst: lldb.SBInstruction = self.target.ReadInstructions(lldb.SBAddress(addr, self.target), 1, 'intel')[0] + mnemonic: str = inst.GetMnemonic(self.target).upper() + operands: str = inst.GetOperands(self.target).upper() + operands = operands.replace("0X", "0x") + return f'{mnemonic} {operands}' + + def get_disassembly_bytes(self, addr: int): + error = lldb.SBError() + buf = self.process.ReadMemory(addr, 64, error) + inst = self.target.GetInstructions(lldb.SBAddress(addr, self.target), buf)[0] + return inst.GetData(self.target).ReadRawData(error, 0, inst.GetByteSize()) + + def get_instruction_size(self, addr: int) -> int: + inst = self.target.ReadInstructions(lldb.SBAddress(addr, self.target), 1, 'intel')[0] + return inst.GetByteSize() + + def get_current_tid(self) -> int: + thread: lldb.SBThread = self.process.GetSelectedThread() + return thread.GetThreadID() + +class LLDBLocalTarget(LLDBConcreteTarget): + def __init__(self, + executable: str, + argv: list[str] = [], + envp: list[str] | None = None): + """Construct an LLDB local target. Stop at entry. + + :param executable: Name of executable to run under LLDB. + :param argv: List of arguements. Does NOT include the conventional + executable name as the first entry. + :param envp: List of environment entries. Defaults to current + `os.environ` if `None`. + :raises RuntimeError: If the process is unable to launch. 
+ """ + if envp is None: + envp = [f'{k}={v}' for k, v in os.environ.items()] + + debugger = lldb.SBDebugger.Create() + debugger.SetAsync(False) + target = debugger.CreateTargetWithFileAndArch(executable, lldb.LLDB_ARCH_DEFAULT) + + # Set up objects for process execution + error = lldb.SBError() + process = target.Launch(debugger.GetListener(), + argv, envp, # argv, envp + None, None, None, # stdin, stdout, stderr + None, # working directory + 0, + True, error) + + if not target.process.IsValid(): + raise RuntimeError(f'Failed to launch LLDB target: {error.GetCString()}') + + super().__init__(debugger, target, process) + +class LLDBRemoteTarget(LLDBConcreteTarget): + def __init__(self, remote: str, executable: str | None = None): + """Construct an LLDB remote target. Stop at entry. + + :param remote: String of the form <remote_name>:<port> (e.g. localhost:12345). + :raises RuntimeError: If failing to attach to a remote debug session. + """ + debugger = lldb.SBDebugger.Create() + debugger.SetAsync(False) + target = debugger.CreateTarget(executable) + + # Set up objects for process execution + error = lldb.SBError() + process = target.ConnectRemote(debugger.GetListener(), + f'connect://{remote}', + None, + error) + if not target.process.IsValid(): + raise RuntimeError(f'Failed to connect via LLDB to remote target: {error.GetCString()}') + + super().__init__(debugger, target, process) + diff --git a/src/focaccia/miasm_util.py b/src/focaccia/miasm_util.py index 8e9d1ed..cacc6e8 100644 --- a/src/focaccia/miasm_util.py +++ b/src/focaccia/miasm_util.py @@ -136,11 +136,7 @@ class MiasmSymbolResolver: return regname def resolve_register(self, regname: str) -> int | None: - try: - return self._state.read_register(self._miasm_to_regname(regname)) - except RegisterAccessError as err: - print(f'Not a register: {regname} ({err})') - return None + return self._state.read_register(self._miasm_to_regname(regname)) def resolve_memory(self, addr: int, size: int) -> bytes | None: try: diff --git a/src/focaccia/parser.py b/src/focaccia/parser.py index e9e5e0c..c157a36 100644 --- a/src/focaccia/parser.py +++ b/src/focaccia/parser.py @@ -201,3 +201,4 @@ def parse_box64(stream: TextIO, arch: Arch) -> Trace[ProgramState]: states[-1].set_register(regname, int(value, 16)) return Trace(states, _make_unknown_env()) + diff --git a/src/focaccia/symbolic.py b/src/focaccia/symbolic.py index 7b6098e..b46d9e2 100644 --- a/src/focaccia/symbolic.py +++ b/src/focaccia/symbolic.py @@ -1,11 +1,12 @@ -"""Tools and utilities for symbolic execution with Miasm.""" +"""Tools and utilities for execution with Miasm.""" from __future__ import annotations -from typing import Iterable -import logging + import sys +import logging + +from pathlib import Path -from miasm.analysis.binary import ContainerELF from miasm.analysis.machine import Machine from miasm.core.cpu import instruction as miasm_instr from miasm.core.locationdb import LocationDB @@ -14,20 +15,29 @@ from miasm.ir.ir import Lifter from miasm.ir.symbexec import SymbolicExecutionEngine from .arch import Arch, supported_architectures -from .lldb_target import LLDBConcreteTarget, \ - ConcreteRegisterError, \ - ConcreteMemoryError +from .lldb_target import ( + LLDBConcreteTarget, + LLDBLocalTarget, + LLDBRemoteTarget, + ConcreteRegisterError, + ConcreteMemoryError, +) from .miasm_util import MiasmSymbolResolver, eval_expr, make_machine -from .snapshot import ProgramState, ReadableProgramState, \ - RegisterAccessError, MemoryAccessError +from .snapshot import ReadableProgramState, 
RegisterAccessError, MemoryAccessError from .trace import Trace, TraceEnvironment +from .utils import timebound, TimeoutError logger = logging.getLogger('focaccia-symbolic') +debug = logger.debug +info = logger.info warn = logger.warn # Disable Miasm's disassembly logger logging.getLogger('asmblock').setLevel(logging.CRITICAL) +class ValidationError(Exception): + pass + def eval_symbol(symbol: Expr, conc_state: ReadableProgramState) -> int: """Evaluate a symbol based on a concrete reference state. @@ -52,8 +62,8 @@ def eval_symbol(symbol: Expr, conc_state: ReadableProgramState) -> int: return self._state.read_memory(addr, size) def resolve_location(self, loc): - raise ValueError(f'[In eval_symbol]: Unable to evaluate symbols' - f' that contain IR location expressions.') + raise ValueError('[In eval_symbol]: Unable to evaluate symbols' + ' that contain IR location expressions.') res = eval_expr(symbol, ConcreteStateWrapper(conc_state)) @@ -61,7 +71,8 @@ def eval_symbol(symbol: Expr, conc_state: ReadableProgramState) -> int: # but ExprLocs are disallowed by the # ConcreteStateWrapper if not isinstance(res, ExprInt): - raise Exception(f'{res} from symbol {symbol} is not an instance of ExprInt but only ExprInt can be evaluated') + raise Exception(f'{res} from symbol {symbol} is not an instance of ExprInt' + f' but only ExprInt can be evaluated') return int(res) class Instruction: @@ -116,18 +127,24 @@ class Instruction: class SymbolicTransform: """A symbolic transformation mapping one program state to another.""" def __init__(self, + tid: int, transform: dict[Expr, Expr], instrs: list[Instruction], arch: Arch, from_addr: int, to_addr: int): """ - :param state: The symbolic transformation in the form of a SimState - object. - :param first_inst: An instruction address. The transformation - represents the modifications to the program state - performed by this instruction. + :param tid: The thread ID that executed the instructions effecting the transformation. + :param transform: A map of input symbolic expressions and output symbolic expressions. + :param instrs: A list of instructions. The transformation + represents the collective modifications to the program state + performed by these instructions. + :param arch: The architecture of the symbolic transformation. + :param from_addr: The starting address of the instruction effecting the symbolic + transformation. + :param to_addr: The final address of the last instruction in the instructions list. 
""" + self.tid = tid self.arch = arch self.addr = from_addr @@ -376,15 +393,16 @@ class SymbolicTransform: try: return Instruction.from_string(text, arch, offset=0, length=length) except Exception as err: - warn(f'[In SymbolicTransform.from_json] Unable to parse' - f' instruction string "{text}": {err}.') - return None + # Note: from None disables chaining in traceback + raise ValueError(f'[In SymbolicTransform.from_json] Unable to parse' + f' instruction string "{text}": {err}.') from None + tid = int(data['tid']) arch = supported_architectures[data['arch']] start_addr = int(data['from_addr']) end_addr = int(data['to_addr']) - t = SymbolicTransform({}, [], arch, start_addr, end_addr) + t = SymbolicTransform(tid, {}, [], arch, start_addr, end_addr) t.changed_regs = { name: parse(val) for name, val in data['regs'].items() } t.changed_mem = { parse(addr): parse(val) for addr, val in data['mem'].items() } instrs = [decode_inst(b, arch) for b in data['instructions']] @@ -404,15 +422,15 @@ class SymbolicTransform: try: return [inst.length, inst.to_string()] except Exception as err: - warn(f'[In SymbolicTransform.to_json] Unable to serialize' - f' "{inst}" as string: {err}. This instruction will not' - f' be serialized.') - return None + # Note: from None disables chaining in traceback + raise Exception(f'[In SymbolicTransform.to_json] Unable to serialize' + f' "{inst}" as string: {err}') from None instrs = [encode_inst(inst) for inst in self.instructions] instrs = [inst for inst in instrs if inst is not None] return { 'arch': self.arch.archname, + 'tid': self.tid, 'from_addr': self.range[0], 'to_addr': self.range[1], 'instructions': instrs, @@ -422,7 +440,7 @@ class SymbolicTransform: def __repr__(self) -> str: start, end = self.range - res = f'Symbolic state transformation {hex(start)} -> {hex(end)}:\n' + res = f'Symbolic state transformation [{self.tid}] {start} -> {end}:\n' res += ' [Symbols]\n' for reg, expr in self.changed_regs.items(): res += f' {reg:6s} = {expr}\n' @@ -445,8 +463,8 @@ class MemoryBinstream: def __getitem__(self, key: int | slice): if isinstance(key, slice): - return self._state.read_memory(key.start, key.stop - key.start) - return self._state.read_memory(key, 1) + return self._state.read_instructions(key.start, key.stop - key.start) + return self._state.read_instructions(key, 1) class DisassemblyContext: def __init__(self, target: ReadableProgramState): @@ -463,9 +481,14 @@ class DisassemblyContext: self.mdis.follow_call = True self.lifter = self.machine.lifter(self.loc_db) + def disassemble(self, address: int) -> Instruction: + miasm_instr = self.mdis.dis_instr(address) + return Instruction(miasm_instr, self.machine, self.arch, self.loc_db) + def run_instruction(instr: miasm_instr, conc_state: MiasmSymbolResolver, - lifter: Lifter) \ + lifter: Lifter, + force: bool = False) \ -> tuple[ExprInt | None, dict[Expr, Expr]]: """Compute the symbolic equation of a single instruction. @@ -578,8 +601,12 @@ def run_instruction(instr: miasm_instr, loc = lifter.add_instr_to_ircfg(instr, ircfg, None, False) assert(isinstance(loc, Expr) or isinstance(loc, LocKey)) except NotImplementedError as err: - warn(f'[WARNING] Unable to lift instruction {instr}: {err}. Skipping.') - return None, {} # Create an empty transform for the instruction + msg = f'Unable to lift instruction {instr}: {err}' + if force: + warn(f'{msg}. 
Skipping') + return None, {} + else: + raise Exception(msg) # Execute instruction symbolically new_pc, modified = execute_location(loc) @@ -587,114 +614,319 @@ def run_instruction(instr: miasm_instr, return new_pc, modified -class _LLDBConcreteState(ReadableProgramState): - """A wrapper around `LLDBConcreteTarget` that provides access via a - `ReadableProgramState` interface. Reads values directly from an LLDB - target. This saves us the trouble of recording a full program state, and - allows us instead to read values from LLDB on demand. - """ +class SpeculativeTracer(ReadableProgramState): def __init__(self, target: LLDBConcreteTarget): super().__init__(target.arch) - self._target = target + self.target = target + self.pc = target.read_register('pc') + self.speculative_pc: int | None = None + self.speculative_count: int = 0 + + self.read_cache = {} + + def speculate(self, new_pc): + self.read_cache.clear() + if new_pc is None: + self.progress_execution() + self.target.step() + self.pc = self.target.read_register('pc') + self.speculative_pc = None + self.speculative_count = 0 + return + + new_pc = int(new_pc) + self.speculative_pc = new_pc + self.speculative_count += 1 + + def progress_execution(self) -> None: + if self.speculative_pc is not None and self.speculative_count != 0: + debug(f'Updating PC to {hex(self.speculative_pc)}') + if self.speculative_count == 1: + self.target.step() + else: + self.target.run_until(self.speculative_pc) + + self.pc = self.speculative_pc + self.speculative_pc = None + self.speculative_count = 0 + + self.read_cache.clear() + + def run_until(self, addr: int): + if self.speculative_pc: + raise Exception('Attempting manual execution with speculative execution enabled') + self.target.run_until(addr) + self.pc = addr + + def step(self): + self.progress_execution() + if self.target.is_exited(): + return + self.target.step() + self.pc = self.target.read_register('pc') + + def _cache(self, name: str, value): + self.read_cache[name] = value + return value + + def read_pc(self) -> int: + if self.speculative_pc is not None: + return self.speculative_pc + return self.pc + + def read_flags(self) -> dict[str, int | bool]: + if 'flags' in self.read_cache: + return self.read_cache['flags'] + self.progress_execution() + return self._cache('flags', self.target.read_flags()) def read_register(self, reg: str) -> int: regname = self.arch.to_regname(reg) if regname is None: raise RegisterAccessError(reg, f'Not a register name: {reg}') - try: - return self._target.read_register(regname) - except ConcreteRegisterError: - raise RegisterAccessError(regname, '') + if reg in self.read_cache: + return self.read_cache[reg] + + self.progress_execution() + return self._cache(reg, self.target.read_register(regname)) + + def write_register(self, regname: str, value: int): + self.progress_execution() + self.read_cache.pop(regname, None) + self.target.write_register(regname, value) + + def read_instructions(self, addr: int, size: int) -> bytes: + return self.target.read_memory(addr, size) def read_memory(self, addr: int, size: int) -> bytes: - try: - return self._target.read_memory(addr, size) - except ConcreteMemoryError: - raise MemoryAccessError(addr, size, 'Unable to read memory from LLDB.') - -def collect_symbolic_trace(env: TraceEnvironment, - start_addr: int | None = None - ) -> Trace[SymbolicTransform]: - """Execute a program and compute state transformations between executed - instructions. - - :param binary: The binary to trace. - :param args: Arguments to the program. 
+ self.progress_execution() + cache_name = f'{addr}_{size}' + if cache_name in self.read_cache: + return self.read_cache[cache_name] + return self._cache(cache_name, self.target.read_memory(addr, size)) + + def write_memory(self, addr: int, value: bytes): + self.progress_execution() + self.read_cache.pop(addr, None) + self.target.write_memory(addr, value) + + def __getattr__(self, name: str): + return getattr(self.target, name) + +class SymbolicTracer: + """A symbolic tracer that uses `LLDBConcreteTarget` with Miasm to simultaneously execute a + program with concrete state and collect its symbolic transforms """ - binary = env.binary_name - - # Set up concrete reference state - target = LLDBConcreteTarget(binary, env.argv, env.envp) - if start_addr is not None: - target.run_until(start_addr) - lldb_state = _LLDBConcreteState(target) - - ctx = DisassemblyContext(lldb_state) - arch = ctx.arch - - # Trace concolically - strace: list[SymbolicTransform] = [] - while not target.is_exited(): - pc = target.read_register('pc') - - # Disassemble instruction at the current PC - try: - instr = ctx.mdis.dis_instr(pc) - except: - err = sys.exc_info()[1] - warn(f'Unable to disassemble instruction at {hex(pc)}: {err}.' - f' Skipping.') - target.step() - continue - - # Run instruction - conc_state = MiasmSymbolResolver(lldb_state, ctx.loc_db) - new_pc, modified = run_instruction(instr, conc_state, ctx.lifter) - - # Create symbolic transform - instruction = Instruction(instr, ctx.machine, ctx.arch, ctx.loc_db) - if new_pc is None: - new_pc = pc + instruction.length - else: - new_pc = int(new_pc) - transform = SymbolicTransform(modified, [instruction], arch, pc, new_pc) - strace.append(transform) - - # Predict next concrete state. - # We verify the symbolic execution backend on the fly for some - # additional protection from bugs in the backend. 
- if env.cross_validate: - predicted_regs = transform.eval_register_transforms(lldb_state) - predicted_mems = transform.eval_memory_transforms(lldb_state) - - # Step forward - target.step() - if target.is_exited(): - break - + def __init__(self, + env: TraceEnvironment, + remote: str | None=None, + force: bool=False, + cross_validate: bool=False): + self.env = env + self.force = force + self.remote = remote + self.cross_validate = cross_validate + self.target = SpeculativeTracer(self.create_debug_target()) + + self.nondet_events = self.env.detlog.events() + self.next_event: int | None = None + + def create_debug_target(self) -> LLDBConcreteTarget: + binary = self.env.binary_name + if self.remote is False: + debug(f'Launching local debug target {binary} {self.env.argv}') + debug(f'Environment: {self.env}') + return LLDBLocalTarget(binary, self.env.argv, self.env.envp) + + debug(f'Connecting to remote debug target {self.remote}') + target = LLDBRemoteTarget(self.remote, binary) + + module_name = target.determine_name() + binary = str(Path(self.env.binary_name).resolve()) + if binary != module_name: + warn(f'Discovered binary name {module_name} differs from specified name {binary}') + + return target + + def predict_next_state(self, instruction: Instruction, transform: SymbolicTransform): + debug(f'Evaluating register and memory transforms for {instruction} to cross-validate') + predicted_regs = transform.eval_register_transforms(self.target) + predicted_mems = transform.eval_memory_transforms(self.target) + return predicted_regs, predicted_mems + + def validate(self, + instruction: Instruction, + transform: SymbolicTransform, + predicted_regs: dict[str, int], + predicted_mems: dict[int, bytes]): # Verify last generated transform by comparing concrete state against # predicted values. - assert(len(strace) > 0) - if env.cross_validate: - for reg, val in predicted_regs.items(): - conc_val = lldb_state.read_register(reg) - if conc_val != val: - warn(f'Symbolic execution backend generated false equation for' - f' [{hex(instruction.addr)}]: {instruction}:' - f' Predicted {reg} = {hex(val)}, but the' - f' concrete state has value {reg} = {hex(conc_val)}.' - f'\nFaulty transformation: {transform}') - for addr, data in predicted_mems.items(): - conc_data = lldb_state.read_memory(addr, len(data)) - if conc_data != data: - warn(f'Symbolic execution backend generated false equation for' - f' [{hex(instruction.addr)}]: {instruction}: Predicted' - f' mem[{hex(addr)}:{hex(addr+len(data))}] = {data},' - f' but the concrete state has value' - f' mem[{hex(addr)}:{hex(addr+len(data))}] = {conc_data}.' - f'\nFaulty transformation: {transform}') - raise Exception() - - return Trace(strace, env) + if self.target.is_exited(): + return + + debug('Cross-validating symbolic transforms by comparing actual to predicted values') + for reg, val in predicted_regs.items(): + conc_val = self.target.read_register(reg) + if conc_val != val: + raise ValidationError(f'Symbolic execution backend generated false equation for' + f' [{hex(instruction.addr)}]: {instruction}:' + f' Predicted {reg} = {hex(val)}, but the' + f' concrete state has value {reg} = {hex(conc_val)}.' 
+ f'\nFaulty transformation: {transform}') + for addr, data in predicted_mems.items(): + conc_data = self.target.read_memory(addr, len(data)) + if conc_data != data: + raise ValidationError(f'Symbolic execution backend generated false equation for' + f' [{hex(instruction.addr)}]: {instruction}: Predicted' + f' mem[{hex(addr)}:{hex(addr+len(data))}] = {data},' + f' but the concrete state has value' + f' mem[{hex(addr)}:{hex(addr+len(data))}] = {conc_data}.' + f'\nFaulty transformation: {transform}') + + def progress_event(self) -> None: + if (self.next_event + 1) < len(self.nondet_events): + self.next_event += 1 + debug(f'Next event to handle at index {self.next_event}') + else: + self.next_event = None + + def post_event(self) -> None: + if self.next_event: + if self.nondet_events[self.next_event].pc == 0: + # Exit sequence + debug('Completed exit event') + self.target.run() + + debug(f'Completed handling event at index {self.next_event}') + self.progress_event() + + def is_stepping_instr(self, pc: int, instruction: Instruction) -> bool: + if self.nondet_events: + pc = pc + instruction.length # detlog reports next pc for each event + if self.next_event and self.nondet_events[self.next_event].match(pc, self.target): + debug('Current instruction matches next event; stepping through it') + self.progress_event() + return True + else: + if self.target.arch.is_instr_syscall(str(instruction)): + return True + return False + + def progress(self, new_pc, step: bool = False) -> int | None: + self.target.speculate(new_pc) + if step: + self.target.progress_execution() + if self.target.is_exited(): + return None + return self.target.read_pc() + + def trace(self, + start_addr: int | None = None, + stop_addr: int | None = None, + time_limit: int | None = None) -> Trace[SymbolicTransform]: + """Execute a program and compute state transformations between executed + instructions. + + :param start_addr: Address from which to start tracing. + :param stop_addr: Address until which to trace. + """ + # Set up concrete reference state + if start_addr is not None: + self.target.run_until(start_addr) + + for i in range(len(self.nondet_events)): + if self.nondet_events[i].pc == self.target.read_pc(): + self.next_event = i+1 + if self.next_event >= len(self.nondet_events): + break + + debug(f'Starting from event {self.nondet_events[i]} onwards') + break + + ctx = DisassemblyContext(self.target) + arch = ctx.arch + + if logger.isEnabledFor(logging.DEBUG): + debug('Tracing program with the following non-deterministic events') + for event in self.nondet_events: + debug(event) + + # Trace concolically + strace: list[SymbolicTransform] = [] + while not self.target.is_exited(): + pc = self.target.read_pc() + + if stop_addr is not None and pc == stop_addr: + break + + assert(pc != 0) + + # Disassemble instruction at the current PC + tid = self.target.get_current_tid() + try: + instruction = ctx.disassemble(pc) + info(f'[{tid}] Disassembled instruction {instruction} at {hex(pc)}') + except: + err = sys.exc_info()[1] + + # Try to recovery by using the LLDB disassembly instead + try: + alt_disas = self.target.get_disassembly(pc) + instruction = Instruction.from_string(alt_disas, ctx.arch, pc, + self.target.get_instruction_size(pc)) + info(f'[{tid}] Disassembled instruction {instruction} at {hex(pc)}') + except: + if self.force: + if alt_disas: + warn(f'[{tid}] Unable to handle instruction {alt_disas} at {hex(pc)} in Miasm.' + f' Skipping.') + else: + warn(f'[{tid}] Unable to disassemble instruction {hex(pc)}: {err}.' 
+ f' Skipping.') + self.target.step() + continue + raise # forward exception + + is_event = self.is_stepping_instr(pc, instruction) + + # Run instruction + conc_state = MiasmSymbolResolver(self.target, ctx.loc_db) + + try: + new_pc, modified = timebound(time_limit, run_instruction, instruction.instr, conc_state, ctx.lifter) + except TimeoutError: + warn(f'Running instruction {instruction} took longer than {time_limit} second. Skipping') + new_pc, modified = None, {} + + if self.cross_validate and new_pc: + # Predict next concrete state. + # We verify the symbolic execution backend on the fly for some + # additional protection from bugs in the backend. + new_pc = int(new_pc) + transform = SymbolicTransform(tid, modified, [instruction], arch, pc, new_pc) + pred_regs, pred_mems = self.predict_next_state(instruction, transform) + self.progress(new_pc, step=is_event) + + try: + self.validate(instruction, transform, pred_regs, pred_mems) + except ValidationError as e: + if self.force: + warn(f'Cross-validation failed: {e}') + continue + raise + else: + new_pc = self.progress(new_pc, step=is_event) + if new_pc is None: + transform = SymbolicTransform(tid, modified, [instruction], arch, pc, 0) + strace.append(transform) + continue # we're done + transform = SymbolicTransform(tid, modified, [instruction], arch, pc, new_pc) + + strace.append(transform) + + if is_event: + self.post_event() + + return Trace(strace, self.env) diff --git a/src/focaccia/tools/_qemu_tool.py b/src/focaccia/tools/_qemu_tool.py index 631c79b..2c7a1de 100644 --- a/src/focaccia/tools/_qemu_tool.py +++ b/src/focaccia/tools/_qemu_tool.py @@ -7,6 +7,7 @@ work to do. """ import gdb +import logging import traceback from typing import Iterable @@ -21,6 +22,11 @@ from focaccia.utils import print_result from validate_qemu import make_argparser, verbosity +logger = logging.getLogger('focaccia-qemu-validator') +debug = logger.debug +info = logger.info +warn = logger.warning + class GDBProgramState(ReadableProgramState): from focaccia.arch import aarch64, x86 @@ -263,15 +269,17 @@ def collect_conc_trace(gdb: GDBServerStateIterator, \ # Drop the concrete state if no address in the symbolic trace # matches if next_i is None: - print(f'Warning: Dropping concrete state {hex(pc)}, as no' - f' matching instruction can be found in the symbolic' - f' reference trace.') + warn(f'Dropping concrete state {hex(pc)}, as no' + f' matching instruction can be found in the symbolic' + f' reference trace.') cur_state = next(state_iter) pc = cur_state.read_register('pc') continue # Otherwise, jump to the next matching symbolic state symb_i += next_i + 1 + if symb_i >= len(strace): + break assert(cur_state.read_register('pc') == strace[symb_i].addr) states.append(record_minimal_snapshot( @@ -282,23 +290,33 @@ def collect_conc_trace(gdb: GDBServerStateIterator, \ matched_transforms.append(strace[symb_i]) cur_state = next(state_iter) symb_i += 1 + if symb_i >= len(strace): + break except StopIteration: break except Exception as e: print(traceback.format_exc()) raise e + # Note: this may occur when symbolic traces were gathered with a stop address + if symb_i >= len(strace): + warn(f'QEMU executed more states than native execution: {symb_i} vs {len(strace)-1}') + return states, matched_transforms def main(): args = make_argparser().parse_args() + + logging_level = getattr(logging, args.error_level.upper(), logging.INFO) + logging.basicConfig(level=logging_level, force=True) try: gdb_server = GDBServerStateIterator(args.remote) - except: - raise 
Exception('Unable to perform basic GDB setup') + except Exception as e: + raise Exception(f'Unable to perform basic GDB setup: {e}') from None try: + executable: str | None = None if args.executable is None: executable = gdb_server.binary else: @@ -307,39 +325,40 @@ def main(): argv = [] # QEMU's GDB stub does not support 'info proc cmdline' envp = [] # Can't get the remote target's environment env = TraceEnvironment(executable, argv, envp, '?') - except: - raise Exception(f'Unable to create trace environment for executable {executable}') + except Exception as e: + raise Exception(f'Unable to create trace environment for executable {executable}: {e}') from None # Read pre-computed symbolic trace try: with open(args.symb_trace, 'r') as strace: symb_transforms = parser.parse_transformations(strace) - except: - raise Exception('Failed to parse state transformations from native trace') + except Exception as e: + raise Exception(f'Failed to parse state transformations from native trace: {e}') from None # Use symbolic trace to collect concrete trace from QEMU try: conc_states, matched_transforms = collect_conc_trace( gdb_server, symb_transforms.states) - except: - raise Exception(f'Failed to collect concolic trace from QEMU') + except Exception as e: + raise Exception(f'Failed to collect concolic trace from QEMU: {e}') from None # Verify and print result if not args.quiet: try: res = compare_symbolic(conc_states, matched_transforms) print_result(res, verbosity[args.error_level]) - except: - raise Exception('Error occured when comparing with symbolic equations') + except Exception as e: + raise Exception('Error occured when comparing with symbolic equations: {e}') from None if args.output: from focaccia.parser import serialize_snapshots try: with open(args.output, 'w') as file: serialize_snapshots(Trace(conc_states, env), file) - except: - raise Exception(f'Unable to serialize snapshots to file {args.output}') + except Exception as e: + raise Exception(f'Unable to serialize snapshots to file {args.output}: {e}') from None if __name__ == "__main__": main() + diff --git a/src/focaccia/tools/capture_transforms.py b/src/focaccia/tools/capture_transforms.py index 6ef0eaa..10dfdb2 100755 --- a/src/focaccia/tools/capture_transforms.py +++ b/src/focaccia/tools/capture_transforms.py @@ -1,9 +1,11 @@ #!/usr/bin/env python3 +import sys import argparse +import logging from focaccia import parser, utils -from focaccia.symbolic import collect_symbolic_trace +from focaccia.symbolic import SymbolicTracer from focaccia.trace import TraceEnvironment def main(): @@ -20,12 +22,69 @@ def main(): default=False, action='store_true', help='Cross-validate symbolic equations with concrete values') + prog.add_argument('-r', '--remote', + default=False, + help='Remote target to trace (e.g. 
127.0.0.1:12345)') + prog.add_argument('-l', '--deterministic-log', + help='Path of the directory storing the deterministic log produced by RR') + prog.add_argument('--log-level', + help='Set the logging level') + prog.add_argument('--force', + default=False, + action='store_true', + help='Force Focaccia to continue tracing even when something goes wrong') + prog.add_argument('--debug', + default=False, + action='store_true', + help='Capture transforms in debug mode to identify errors in Focaccia itself') + prog.add_argument('--start-address', + default=None, + type=utils.to_int, + help='Set a starting address from which to collect the symoblic trace') + prog.add_argument('--stop-address', + default=None, + type=utils.to_int, + help='Set a final address up until which to collect the symoblic trace') + prog.add_argument('--insn-time-limit', + default=None, + type=utils.to_num, + help='Set a time limit for executing an instruction symbolically, skip' + 'instruction when limit is exceeded') args = prog.parse_args() - env = TraceEnvironment(args.binary, args.args, args.cross_validate, utils.get_envp()) - trace = collect_symbolic_trace(env, None) + if args.debug: + logging.basicConfig(level=logging.DEBUG) # will be override by --log-level + + # Set default logging level + if args.log_level: + level = getattr(logging, args.log_level.upper(), logging.INFO) + logging.basicConfig(level=level, force=True) + else: + logging.basicConfig(level=logging.INFO) + + detlog = None + if args.deterministic_log: + from focaccia.deterministic import DeterministicLog + detlog = DeterministicLog(args.deterministic_log) + else: + class NullDeterministicLog: + def __init__(self): pass + def events_file(self): return None + def tasks_file(self): return None + def mmaps_file(self): return None + def events(self): return [] + def tasks(self): return [] + def mmaps(self): return [] + detlog = NullDeterministicLog() + + env = TraceEnvironment(args.binary, args.args, utils.get_envp(), nondeterminism_log=detlog) + tracer = SymbolicTracer(env, remote=args.remote, cross_validate=args.debug, + force=args.force) + + trace = tracer.trace(start_addr=args.start_address, + stop_addr=args.stop_address, + time_limit=args.insn_time_limit) + with open(args.output, 'w') as file: parser.serialize_transformations(trace, file) -if __name__ == "__main__": - main() diff --git a/src/focaccia/tools/validate_qemu.py b/src/focaccia/tools/validate_qemu.py index edef9ae..e834a6d 100755 --- a/src/focaccia/tools/validate_qemu.py +++ b/src/focaccia/tools/validate_qemu.py @@ -18,6 +18,7 @@ necessary logic to pass them to `qemu_tool.py`. 
import os import sys +import logging import argparse import sysconfig import subprocess @@ -104,6 +105,9 @@ def main(): if not args.guest_arch: argparser.error('--guest-arch is required when --use-socket is specified') + logging_level = getattr(logging, args.error_level.upper(), logging.INFO) + logging.basicConfig(level=logging_level, force=True) + # QEMU plugin interface start_validation_server(args.symb_trace, args.output, diff --git a/src/focaccia/trace.py b/src/focaccia/trace.py index c72d90f..4a4509d 100644 --- a/src/focaccia/trace.py +++ b/src/focaccia/trace.py @@ -10,14 +10,14 @@ class TraceEnvironment: def __init__(self, binary: str, argv: list[str], - cross_validate: bool, envp: list[str], - binary_hash: str | None = None): + binary_hash: str | None = None, + nondeterminism_log = None): self.argv = argv self.envp = envp self.binary_name = binary - self.cross_validate = cross_validate - if binary_hash is None: + self.detlog = nondeterminism_log + if binary_hash is None and self.binary_name is not None: self.binary_hash = file_hash(binary) else: self.binary_hash = binary_hash @@ -28,7 +28,6 @@ class TraceEnvironment: return cls( json['binary_name'], json['argv'], - json['cross_validate'], json['envp'], json['binary_hash'], ) @@ -39,7 +38,6 @@ class TraceEnvironment: 'binary_name': self.binary_name, 'binary_hash': self.binary_hash, 'argv': self.argv, - 'cross_validate': self.cross_validate, 'envp': self.envp, } @@ -50,13 +48,11 @@ class TraceEnvironment: return self.binary_name == other.binary_name \ and self.binary_hash == other.binary_hash \ and self.argv == other.argv \ - and self.cross_validate == other.cross_validate \ and self.envp == other.envp def __repr__(self) -> str: return f'{self.binary_name} {" ".join(self.argv)}' \ f'\n bin-hash={self.binary_hash}' \ - f'\n options=cross-validate' \ f'\n envp={repr(self.envp)}' class Trace(Generic[T]): diff --git a/src/focaccia/utils.py b/src/focaccia/utils.py index c4f6a74..c648d41 100644 --- a/src/focaccia/utils.py +++ b/src/focaccia/utils.py @@ -1,9 +1,10 @@ from __future__ import annotations -import ctypes import os -import shutil import sys +import shutil +import ctypes +import signal from functools import total_ordering from hashlib import sha256 @@ -114,3 +115,31 @@ def print_result(result, min_severity: ErrorSeverity): f' (showing {min_severity} and higher).') print('#' * 60) print() + +def to_int(value: str) -> int: + return int(value, 0) + +def to_num(value: str) -> int | float: + try: + return int(value, 0) + except: + return float(value) + +class TimeoutError(Exception): + pass + +def timebound(timeout: int | float | None, func, *args, **kwargs): + if timeout is None: + return func(*args, **kwargs) + + def _handle_timeout(signum, frame): + raise TimeoutError(f'Function exceeded {timeout} limit') + + old_handler = signal.signal(signal.SIGALRM, _handle_timeout) + signal.setitimer(signal.ITIMER_REAL, timeout) + try: + return func(*args, **kwargs) + finally: + signal.setitimer(signal.ITIMER_REAL, 0) + signal.signal(signal.SIGALRM, old_handler) + |
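The `timebound` helper added to `src/focaccia/utils.py` above wraps a single call in a `SIGALRM` interval timer and restores the previous handler afterwards; it is what enforces `--insn-time-limit` in the tracer. A small usage sketch follows, assuming the package from this commit is importable. The `slow_lift` function and the time budgets are illustrative only, and the mechanism works only on the main thread of a Unix process.

```python
import time

from focaccia.utils import TimeoutError, timebound

def slow_lift(seconds: float) -> str:
    """Illustrative stand-in for a symbolic lift that may take too long."""
    time.sleep(seconds)
    return 'done'

if __name__ == '__main__':
    # Finishes well inside the one-second budget and returns normally.
    print(timebound(1.0, slow_lift, 0.1))

    # Exceeds the 0.2-second budget: timebound raises focaccia's own
    # TimeoutError, which SymbolicTracer.trace() catches in order to skip
    # the instruction instead of hanging on it.
    try:
        timebound(0.2, slow_lift, 5.0)
    except TimeoutError as err:
        print(f'skipped: {err}')
```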
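For the new rr support, `focaccia.deterministic.DeterministicLog` decompresses the Brotli-framed Cap'n Proto records and returns `Event` objects carrying a PC, thread id, event type, register set and memory writes. A reading sketch, under stated assumptions: `'rr-trace-dir'` is a placeholder path, and the module additionally needs `brotli`, `pycapnp` and the rr schema at `./rr/src/rr_trace.capnp` relative to the working directory (it exits at import time otherwise).

```python
from focaccia.deterministic import DeterministicLog

# 'rr-trace-dir' is a placeholder for the directory holding one recording's
# 'events', 'tasks' and 'mmaps' files.
log = DeterministicLog('rr-trace-dir')

for event in log.events():
    print(f'[tid {event.tid}] {event.event_type} at {hex(event.pc)}')
    for addr, size in event.mem_writes.items():
        print(f'    wrote {size} bytes at {hex(addr)}..{hex(addr + size)}')
```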