From 4af7a62b6c9cf876d78d4fd229b3c3c7060681f3 Mon Sep 17 00:00:00 2001 From: Theofilos Augoustis Date: Thu, 13 Nov 2025 13:57:30 +0000 Subject: Move native backend under dedicated module --- src/focaccia/lldb_target.py | 418 ------------------------------- src/focaccia/native/__init__.py | 0 src/focaccia/native/lldb_target.py | 418 +++++++++++++++++++++++++++++++ src/focaccia/native/tracer.py | 342 +++++++++++++++++++++++++ src/focaccia/symbolic.py | 359 +------------------------- src/focaccia/tools/capture_transforms.py | 2 +- 6 files changed, 768 insertions(+), 771 deletions(-) delete mode 100644 src/focaccia/lldb_target.py create mode 100644 src/focaccia/native/__init__.py create mode 100644 src/focaccia/native/lldb_target.py create mode 100644 src/focaccia/native/tracer.py diff --git a/src/focaccia/lldb_target.py b/src/focaccia/lldb_target.py deleted file mode 100644 index 940b3d9..0000000 --- a/src/focaccia/lldb_target.py +++ /dev/null @@ -1,418 +0,0 @@ -import os -import logging - -import lldb - -from .arch import supported_architectures -from .snapshot import ProgramState - -logger = logging.getLogger('focaccia-lldb-target') -debug = logger.debug -info = logger.info -warn = logger.warn - -class MemoryMap: - """Description of a range of mapped memory. - - Inspired by https://github.com/angr/angr-targets/blob/master/angr_targets/memory_map.py, - meaning we initially used angr and I wanted to keep the interface when we - switched to a different tool. - """ - def __init__(self, start_address, end_address, name, perms): - self.start_address = start_address - self.end_address = end_address - self.name = name - self.perms = perms - - def __str__(self): - return f'MemoryMap[0x{self.start_address:x}, 0x{self.end_address:x}]' \ - f': {self.name}' - -class ConcreteRegisterError(Exception): - pass - -class ConcreteMemoryError(Exception): - pass - -class ConcreteSectionError(Exception): - pass - -class LLDBConcreteTarget: - from focaccia.arch import aarch64, x86 - - flag_register_names = { - aarch64.archname: 'cpsr', - x86.archname: 'rflags', - } - - flag_register_decompose = { - aarch64.archname: aarch64.decompose_cpsr, - x86.archname: x86.decompose_rflags, - } - - register_retries = { - aarch64.archname: {}, - x86.archname: { - "rflags": ["eflags"] - } - } - - def __init__(self, - debugger: lldb.SBDebugger, - target: lldb.SBTarget, - process: lldb.SBProcess): - """Construct an LLDB concrete target. Stop at entry. - - :param debugger: LLDB SBDebugger object representing an initialized debug session. - :param target: LLDB SBTarget object representing an initialized target for the debugger. - :param process: LLDB SBProcess object representing an initialized process (either local or remote). - """ - self.debugger = debugger - self.target = target - self.process = process - - self.module = self.target.FindModule(self.target.GetExecutable()) - self.interpreter = self.debugger.GetCommandInterpreter() - - # Set up objects for process execution - self.listener = self.debugger.GetListener() - - # Determine current arch - self.archname = self.determine_arch() - self.arch = supported_architectures[self.archname] - - def determine_arch(self): - archname = self.target.GetPlatform().GetTriple().split('-')[0] - if archname not in supported_architectures: - err = f'LLDBConcreteTarget: Architecture {archname} is not' \ - f' supported by Focaccia.' 
- print(f'[ERROR] {err}') - raise NotImplementedError(err) - return archname - - def determine_name(self) -> str: - return self.process.GetTarget().GetExecutable().fullpath - - def determine_arguments(self): - launch_info = self.target.GetLaunchInfo() - argc = self.target.GetLaunchInfo().GetNumArguments() - return [launch_info.GetArgumentAtIndex(i) for i in range(argc)] - - def is_exited(self): - """Signals whether the concrete process has exited. - - :return: True if the process has exited. False otherwise. - """ - return self.process.GetState() == lldb.eStateExited - - def run(self): - """Continue execution of the concrete process.""" - state = self.process.GetState() - if state == lldb.eStateExited: - raise RuntimeError('Tried to resume process execution, but the' - ' process has already exited.') - self.process.Continue() - - def step(self): - """Step forward by a single instruction.""" - thread: lldb.SBThread = self.process.GetSelectedThread() - thread.StepInstruction(False) - - def run_until(self, address: int) -> None: - """Continue execution until the address is arrived, ignores other breakpoints""" - bp = self.target.BreakpointCreateByAddress(address) - while True: - self.run() - if self.is_exited(): - return - if self.read_register('pc') == address: - break - self.target.BreakpointDelete(bp.GetID()) - - def record_snapshot(self) -> ProgramState: - """Record the concrete target's state in a ProgramState object.""" - state = ProgramState(self.arch) - - # Query and store register state - for regname in self.arch.regnames: - try: - conc_val = self.read_register(regname) - state.set_register(regname, conc_val) - except KeyError: - pass - except ConcreteRegisterError: - pass - - # Query and store memory state - for mapping in self.get_mappings(): - assert(mapping.end_address > mapping.start_address) - size = mapping.end_address - mapping.start_address - try: - data = self.read_memory(mapping.start_address, size) - state.write_memory(mapping.start_address, data) - except ConcreteMemoryError: - pass - - return state - - def _get_register(self, regname: str) -> lldb.SBValue: - """Find a register by name. - - :raise ConcreteRegisterError: If no register with the specified name - can be found. - """ - debug(f'Accessing register {regname}') - - frame = self.process.GetSelectedThread().GetFrameAtIndex(0) - - retry_list = self.register_retries[self.archname].get(regname, []) - error_msg = f'[In LLDBConcreteTarget._get_register]: Register {regname} not found' - - reg = None - for name in [regname, *retry_list]: - reg = frame.FindRegister(name) - if reg.IsValid(): - break - if not reg.IsValid(): - raise ConcreteRegisterError(error_msg) - return reg - - def read_flags(self) -> dict[str, int | bool]: - """Read the current state flags. - - If the concrete target's architecture has state flags, read and return - their current values. - - This handles the conversion from implementation details like flags - registers to the logical flag values. For example: On X86, this reads - the RFLAGS register and extracts the flag bits from its value. - - :return: Dictionary mapping flag names to values. The values may be - booleans in the case of true binary flags or integers in the - case of multi-byte flags. Is empty if the current architecture - does not have state flags of the access is not implemented for - it. 
- """ - if self.archname not in self.flag_register_names: - return {} - - flags_reg = self.flag_register_names[self.archname] - flags_val = self._get_register(flags_reg).GetValueAsUnsigned() - return self.flag_register_decompose[self.archname](flags_val) - - def read_register(self, regname: str) -> int: - """Read the value of a register. - - :raise ConcreteRegisterError: If `regname` is not a valid register name - or the target is otherwise unable to read - the register's value. - """ - try: - reg = self._get_register(regname) - assert(reg.IsValid()) - if reg.size > 8: # reg is a vector register - reg.data.byte_order = lldb.eByteOrderLittle - val = 0 - for ui64 in reversed(reg.data.uint64s): - val <<= 64 - val |= ui64 - return val - return reg.GetValueAsUnsigned() - except ConcreteRegisterError as err: - flags = self.read_flags() - if regname in flags: - return flags[regname] - reader = self.arch.get_reg_reader(regname) - if reader: - return reader() - raise ConcreteRegisterError( - f'[In LLDBConcreteTarget.read_register]: Unable to read' - f' register {regname}: {err}') - - def write_register(self, regname: str, value: int): - """Write a value to a register. - - :raise ConcreteRegisterError: If `regname` is not a valid register name - or the target is otherwise unable to set - the register's value. - """ - reg = self._get_register(regname) - error = lldb.SBError() - reg.SetValueFromCString(hex(value), error) - if not error.success: - raise ConcreteRegisterError( - f'[In LLDBConcreteTarget.write_register]: Unable to set' - f' {regname} to value {hex(value)}!') - - def read_memory(self, addr: int, size: int) -> bytes: - """Read bytes from memory. - - :raise ConcreteMemoryError: If unable to read `size` bytes from `addr`. - """ - err = lldb.SBError() - content = self.process.ReadMemory(addr, size, err) - if not err.success: - raise ConcreteMemoryError(f'Error when reading {size} bytes at' - f' address {hex(addr)}: {err}') - if self.arch.endianness == 'little': - return content - else: - return bytes(reversed(content)) - - def write_memory(self, addr: int, value: bytes): - """Write bytes to memory. - - :raise ConcreteMemoryError: If unable to write at `addr`. - """ - err = lldb.SBError() - res = self.process.WriteMemory(addr, value, err) - if not err.success or res != len(value): - raise ConcreteMemoryError(f'Error when writing to address' - f' {hex(addr)}: {err}') - - def get_mappings(self) -> list[MemoryMap]: - mmap = [] - - region_list = self.process.GetMemoryRegions() - for i in range(region_list.GetSize()): - region = lldb.SBMemoryRegionInfo() - region_list.GetMemoryRegionAtIndex(i, region) - - perms = f'{"r" if region.IsReadable() else "-"}' \ - f'{"w" if region.IsWritable() else "-"}' \ - f'{"x" if region.IsExecutable() else "-"}' - name = region.GetName() - - mmap.append(MemoryMap(region.GetRegionBase(), - region.GetRegionEnd(), - name if name is not None else '', - perms)) - return mmap - - def set_breakpoint(self, addr): - command = f'b -a {addr} -s {self.module.GetFileSpec().GetFilename()}' - result = lldb.SBCommandReturnObject() - self.interpreter.HandleCommand(command, result) - - def remove_breakpoint(self, addr): - command = f'breakpoint delete {addr}' - result = lldb.SBCommandReturnObject() - self.interpreter.HandleCommand(command, result) - - def get_basic_block(self, addr: int) -> list[lldb.SBInstruction]: - """Returns a basic block pointed by addr - a code section is considered a basic block only if - the last instruction is a brach, e.g. 
JUMP, CALL, RET - """ - block = [] - while not self.target.ReadInstructions(lldb.SBAddress(addr, self.target), 1)[0].is_branch: - block.append(self.target.ReadInstructions(lldb.SBAddress(addr, self.target), 1)[0]) - addr += self.target.ReadInstructions(lldb.SBAddress(addr, self.target), 1)[0].size - block.append(self.target.ReadInstructions(lldb.SBAddress(addr, self.target), 1)[0]) - - return block - - def get_basic_block_inst(self, addr: int) -> list[str]: - inst = [] - for bb in self.get_basic_block(addr): - inst.append(f'{bb.GetMnemonic(self.target)} {bb.GetOperands(self.target)}') - return inst - - def get_next_basic_block(self) -> list[lldb.SBInstruction]: - return self.get_basic_block(self.read_register("pc")) - - def get_symbol(self, addr: int) -> lldb.SBSymbol: - """Returns the symbol that belongs to the addr - """ - for s in self.module.symbols: - if (s.GetType() == lldb.eSymbolTypeCode and s.GetStartAddress().GetLoadAddress(self.target) <= addr < s.GetEndAddress().GetLoadAddress(self.target)): - return s - raise ConcreteSectionError(f'Error getting the symbol to which address {hex(addr)} belongs to') - - def get_symbol_limit(self) -> int: - """Returns the address after all the symbols""" - addr = 0 - for s in self.module.symbols: - if s.GetStartAddress().IsValid(): - if s.GetStartAddress().GetLoadAddress(self.target) > addr: - addr = s.GetEndAddress().GetLoadAddress(self.target) - return addr - - def get_disassembly(self, addr: int) -> str: - inst: lldb.SBInstruction = self.target.ReadInstructions(lldb.SBAddress(addr, self.target), 1, 'intel')[0] - mnemonic: str = inst.GetMnemonic(self.target).upper() - operands: str = inst.GetOperands(self.target).upper() - operands = operands.replace("0X", "0x") - return f'{mnemonic} {operands}' - - def get_disassembly_bytes(self, addr: int): - error = lldb.SBError() - buf = self.process.ReadMemory(addr, 64, error) - inst = self.target.GetInstructions(lldb.SBAddress(addr, self.target), buf)[0] - return inst.GetData(self.target).ReadRawData(error, 0, inst.GetByteSize()) - - def get_instruction_size(self, addr: int) -> int: - inst = self.target.ReadInstructions(lldb.SBAddress(addr, self.target), 1, 'intel')[0] - return inst.GetByteSize() - - def get_current_tid(self) -> int: - thread: lldb.SBThread = self.process.GetSelectedThread() - return thread.GetThreadID() - -class LLDBLocalTarget(LLDBConcreteTarget): - def __init__(self, - executable: str, - argv: list[str] = [], - envp: list[str] | None = None): - """Construct an LLDB local target. Stop at entry. - - :param executable: Name of executable to run under LLDB. - :param argv: List of arguements. Does NOT include the conventional - executable name as the first entry. - :param envp: List of environment entries. Defaults to current - `os.environ` if `None`. - :raises RuntimeError: If the process is unable to launch. 
- """ - if envp is None: - envp = [f'{k}={v}' for k, v in os.environ.items()] - - debugger = lldb.SBDebugger.Create() - debugger.SetAsync(False) - target = debugger.CreateTargetWithFileAndArch(executable, lldb.LLDB_ARCH_DEFAULT) - - # Set up objects for process execution - error = lldb.SBError() - process = target.Launch(debugger.GetListener(), - argv, envp, # argv, envp - None, None, None, # stdin, stdout, stderr - None, # working directory - 0, - True, error) - - if not target.process.IsValid(): - raise RuntimeError(f'Failed to launch LLDB target: {error.GetCString()}') - - super().__init__(debugger, target, process) - -class LLDBRemoteTarget(LLDBConcreteTarget): - def __init__(self, remote: str, executable: str | None = None): - """Construct an LLDB remote target. Stop at entry. - - :param remote: String of the form : (e.g. localhost:12345). - :raises RuntimeError: If failing to attach to a remote debug session. - """ - debugger = lldb.SBDebugger.Create() - debugger.SetAsync(False) - target = debugger.CreateTarget(executable) - - # Set up objects for process execution - error = lldb.SBError() - process = target.ConnectRemote(debugger.GetListener(), - f'connect://{remote}', - None, - error) - if not target.process.IsValid(): - raise RuntimeError(f'Failed to connect via LLDB to remote target: {error.GetCString()}') - - super().__init__(debugger, target, process) - diff --git a/src/focaccia/native/__init__.py b/src/focaccia/native/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/focaccia/native/lldb_target.py b/src/focaccia/native/lldb_target.py new file mode 100644 index 0000000..ff43643 --- /dev/null +++ b/src/focaccia/native/lldb_target.py @@ -0,0 +1,418 @@ +import os +import logging + +import lldb + +from focaccia.snapshot import ProgramState +from focaccia.arch import supported_architectures + +logger = logging.getLogger('focaccia-lldb-target') +debug = logger.debug +info = logger.info +warn = logger.warn + +class MemoryMap: + """Description of a range of mapped memory. + + Inspired by https://github.com/angr/angr-targets/blob/master/angr_targets/memory_map.py, + meaning we initially used angr and I wanted to keep the interface when we + switched to a different tool. + """ + def __init__(self, start_address, end_address, name, perms): + self.start_address = start_address + self.end_address = end_address + self.name = name + self.perms = perms + + def __str__(self): + return f'MemoryMap[0x{self.start_address:x}, 0x{self.end_address:x}]' \ + f': {self.name}' + +class ConcreteRegisterError(Exception): + pass + +class ConcreteMemoryError(Exception): + pass + +class ConcreteSectionError(Exception): + pass + +class LLDBConcreteTarget: + from focaccia.arch import aarch64, x86 + + flag_register_names = { + aarch64.archname: 'cpsr', + x86.archname: 'rflags', + } + + flag_register_decompose = { + aarch64.archname: aarch64.decompose_cpsr, + x86.archname: x86.decompose_rflags, + } + + register_retries = { + aarch64.archname: {}, + x86.archname: { + "rflags": ["eflags"] + } + } + + def __init__(self, + debugger: lldb.SBDebugger, + target: lldb.SBTarget, + process: lldb.SBProcess): + """Construct an LLDB concrete target. Stop at entry. + + :param debugger: LLDB SBDebugger object representing an initialized debug session. + :param target: LLDB SBTarget object representing an initialized target for the debugger. + :param process: LLDB SBProcess object representing an initialized process (either local or remote). 
+ """ + self.debugger = debugger + self.target = target + self.process = process + + self.module = self.target.FindModule(self.target.GetExecutable()) + self.interpreter = self.debugger.GetCommandInterpreter() + + # Set up objects for process execution + self.listener = self.debugger.GetListener() + + # Determine current arch + self.archname = self.determine_arch() + self.arch = supported_architectures[self.archname] + + def determine_arch(self): + archname = self.target.GetPlatform().GetTriple().split('-')[0] + if archname not in supported_architectures: + err = f'LLDBConcreteTarget: Architecture {archname} is not' \ + f' supported by Focaccia.' + print(f'[ERROR] {err}') + raise NotImplementedError(err) + return archname + + def determine_name(self) -> str: + return self.process.GetTarget().GetExecutable().fullpath + + def determine_arguments(self): + launch_info = self.target.GetLaunchInfo() + argc = self.target.GetLaunchInfo().GetNumArguments() + return [launch_info.GetArgumentAtIndex(i) for i in range(argc)] + + def is_exited(self): + """Signals whether the concrete process has exited. + + :return: True if the process has exited. False otherwise. + """ + return self.process.GetState() == lldb.eStateExited + + def run(self): + """Continue execution of the concrete process.""" + state = self.process.GetState() + if state == lldb.eStateExited: + raise RuntimeError('Tried to resume process execution, but the' + ' process has already exited.') + self.process.Continue() + + def step(self): + """Step forward by a single instruction.""" + thread: lldb.SBThread = self.process.GetSelectedThread() + thread.StepInstruction(False) + + def run_until(self, address: int) -> None: + """Continue execution until the address is arrived, ignores other breakpoints""" + bp = self.target.BreakpointCreateByAddress(address) + while True: + self.run() + if self.is_exited(): + return + if self.read_register('pc') == address: + break + self.target.BreakpointDelete(bp.GetID()) + + def record_snapshot(self) -> ProgramState: + """Record the concrete target's state in a ProgramState object.""" + state = ProgramState(self.arch) + + # Query and store register state + for regname in self.arch.regnames: + try: + conc_val = self.read_register(regname) + state.set_register(regname, conc_val) + except KeyError: + pass + except ConcreteRegisterError: + pass + + # Query and store memory state + for mapping in self.get_mappings(): + assert(mapping.end_address > mapping.start_address) + size = mapping.end_address - mapping.start_address + try: + data = self.read_memory(mapping.start_address, size) + state.write_memory(mapping.start_address, data) + except ConcreteMemoryError: + pass + + return state + + def _get_register(self, regname: str) -> lldb.SBValue: + """Find a register by name. + + :raise ConcreteRegisterError: If no register with the specified name + can be found. + """ + debug(f'Accessing register {regname}') + + frame = self.process.GetSelectedThread().GetFrameAtIndex(0) + + retry_list = self.register_retries[self.archname].get(regname, []) + error_msg = f'[In LLDBConcreteTarget._get_register]: Register {regname} not found' + + reg = None + for name in [regname, *retry_list]: + reg = frame.FindRegister(name) + if reg.IsValid(): + break + if not reg.IsValid(): + raise ConcreteRegisterError(error_msg) + return reg + + def read_flags(self) -> dict[str, int | bool]: + """Read the current state flags. + + If the concrete target's architecture has state flags, read and return + their current values. 
+ + This handles the conversion from implementation details like flags + registers to the logical flag values. For example: On X86, this reads + the RFLAGS register and extracts the flag bits from its value. + + :return: Dictionary mapping flag names to values. The values may be + booleans in the case of true binary flags or integers in the + case of multi-byte flags. Is empty if the current architecture + does not have state flags of the access is not implemented for + it. + """ + if self.archname not in self.flag_register_names: + return {} + + flags_reg = self.flag_register_names[self.archname] + flags_val = self._get_register(flags_reg).GetValueAsUnsigned() + return self.flag_register_decompose[self.archname](flags_val) + + def read_register(self, regname: str) -> int: + """Read the value of a register. + + :raise ConcreteRegisterError: If `regname` is not a valid register name + or the target is otherwise unable to read + the register's value. + """ + try: + reg = self._get_register(regname) + assert(reg.IsValid()) + if reg.size > 8: # reg is a vector register + reg.data.byte_order = lldb.eByteOrderLittle + val = 0 + for ui64 in reversed(reg.data.uint64s): + val <<= 64 + val |= ui64 + return val + return reg.GetValueAsUnsigned() + except ConcreteRegisterError as err: + flags = self.read_flags() + if regname in flags: + return flags[regname] + reader = self.arch.get_reg_reader(regname) + if reader: + return reader() + raise ConcreteRegisterError( + f'[In LLDBConcreteTarget.read_register]: Unable to read' + f' register {regname}: {err}') + + def write_register(self, regname: str, value: int): + """Write a value to a register. + + :raise ConcreteRegisterError: If `regname` is not a valid register name + or the target is otherwise unable to set + the register's value. + """ + reg = self._get_register(regname) + error = lldb.SBError() + reg.SetValueFromCString(hex(value), error) + if not error.success: + raise ConcreteRegisterError( + f'[In LLDBConcreteTarget.write_register]: Unable to set' + f' {regname} to value {hex(value)}!') + + def read_memory(self, addr: int, size: int) -> bytes: + """Read bytes from memory. + + :raise ConcreteMemoryError: If unable to read `size` bytes from `addr`. + """ + err = lldb.SBError() + content = self.process.ReadMemory(addr, size, err) + if not err.success: + raise ConcreteMemoryError(f'Error when reading {size} bytes at' + f' address {hex(addr)}: {err}') + if self.arch.endianness == 'little': + return content + else: + return bytes(reversed(content)) + + def write_memory(self, addr: int, value: bytes): + """Write bytes to memory. + + :raise ConcreteMemoryError: If unable to write at `addr`. 
+ """ + err = lldb.SBError() + res = self.process.WriteMemory(addr, value, err) + if not err.success or res != len(value): + raise ConcreteMemoryError(f'Error when writing to address' + f' {hex(addr)}: {err}') + + def get_mappings(self) -> list[MemoryMap]: + mmap = [] + + region_list = self.process.GetMemoryRegions() + for i in range(region_list.GetSize()): + region = lldb.SBMemoryRegionInfo() + region_list.GetMemoryRegionAtIndex(i, region) + + perms = f'{"r" if region.IsReadable() else "-"}' \ + f'{"w" if region.IsWritable() else "-"}' \ + f'{"x" if region.IsExecutable() else "-"}' + name = region.GetName() + + mmap.append(MemoryMap(region.GetRegionBase(), + region.GetRegionEnd(), + name if name is not None else '', + perms)) + return mmap + + def set_breakpoint(self, addr): + command = f'b -a {addr} -s {self.module.GetFileSpec().GetFilename()}' + result = lldb.SBCommandReturnObject() + self.interpreter.HandleCommand(command, result) + + def remove_breakpoint(self, addr): + command = f'breakpoint delete {addr}' + result = lldb.SBCommandReturnObject() + self.interpreter.HandleCommand(command, result) + + def get_basic_block(self, addr: int) -> list[lldb.SBInstruction]: + """Returns a basic block pointed by addr + a code section is considered a basic block only if + the last instruction is a brach, e.g. JUMP, CALL, RET + """ + block = [] + while not self.target.ReadInstructions(lldb.SBAddress(addr, self.target), 1)[0].is_branch: + block.append(self.target.ReadInstructions(lldb.SBAddress(addr, self.target), 1)[0]) + addr += self.target.ReadInstructions(lldb.SBAddress(addr, self.target), 1)[0].size + block.append(self.target.ReadInstructions(lldb.SBAddress(addr, self.target), 1)[0]) + + return block + + def get_basic_block_inst(self, addr: int) -> list[str]: + inst = [] + for bb in self.get_basic_block(addr): + inst.append(f'{bb.GetMnemonic(self.target)} {bb.GetOperands(self.target)}') + return inst + + def get_next_basic_block(self) -> list[lldb.SBInstruction]: + return self.get_basic_block(self.read_register("pc")) + + def get_symbol(self, addr: int) -> lldb.SBSymbol: + """Returns the symbol that belongs to the addr + """ + for s in self.module.symbols: + if (s.GetType() == lldb.eSymbolTypeCode and s.GetStartAddress().GetLoadAddress(self.target) <= addr < s.GetEndAddress().GetLoadAddress(self.target)): + return s + raise ConcreteSectionError(f'Error getting the symbol to which address {hex(addr)} belongs to') + + def get_symbol_limit(self) -> int: + """Returns the address after all the symbols""" + addr = 0 + for s in self.module.symbols: + if s.GetStartAddress().IsValid(): + if s.GetStartAddress().GetLoadAddress(self.target) > addr: + addr = s.GetEndAddress().GetLoadAddress(self.target) + return addr + + def get_disassembly(self, addr: int) -> str: + inst: lldb.SBInstruction = self.target.ReadInstructions(lldb.SBAddress(addr, self.target), 1, 'intel')[0] + mnemonic: str = inst.GetMnemonic(self.target).upper() + operands: str = inst.GetOperands(self.target).upper() + operands = operands.replace("0X", "0x") + return f'{mnemonic} {operands}' + + def get_disassembly_bytes(self, addr: int): + error = lldb.SBError() + buf = self.process.ReadMemory(addr, 64, error) + inst = self.target.GetInstructions(lldb.SBAddress(addr, self.target), buf)[0] + return inst.GetData(self.target).ReadRawData(error, 0, inst.GetByteSize()) + + def get_instruction_size(self, addr: int) -> int: + inst = self.target.ReadInstructions(lldb.SBAddress(addr, self.target), 1, 'intel')[0] + return inst.GetByteSize() + + def 
get_current_tid(self) -> int: + thread: lldb.SBThread = self.process.GetSelectedThread() + return thread.GetThreadID() + +class LLDBLocalTarget(LLDBConcreteTarget): + def __init__(self, + executable: str, + argv: list[str] = [], + envp: list[str] | None = None): + """Construct an LLDB local target. Stop at entry. + + :param executable: Name of executable to run under LLDB. + :param argv: List of arguements. Does NOT include the conventional + executable name as the first entry. + :param envp: List of environment entries. Defaults to current + `os.environ` if `None`. + :raises RuntimeError: If the process is unable to launch. + """ + if envp is None: + envp = [f'{k}={v}' for k, v in os.environ.items()] + + debugger = lldb.SBDebugger.Create() + debugger.SetAsync(False) + target = debugger.CreateTargetWithFileAndArch(executable, lldb.LLDB_ARCH_DEFAULT) + + # Set up objects for process execution + error = lldb.SBError() + process = target.Launch(debugger.GetListener(), + argv, envp, # argv, envp + None, None, None, # stdin, stdout, stderr + None, # working directory + 0, + True, error) + + if not target.process.IsValid(): + raise RuntimeError(f'Failed to launch LLDB target: {error.GetCString()}') + + super().__init__(debugger, target, process) + +class LLDBRemoteTarget(LLDBConcreteTarget): + def __init__(self, remote: str, executable: str | None = None): + """Construct an LLDB remote target. Stop at entry. + + :param remote: String of the form : (e.g. localhost:12345). + :raises RuntimeError: If failing to attach to a remote debug session. + """ + debugger = lldb.SBDebugger.Create() + debugger.SetAsync(False) + target = debugger.CreateTarget(executable) + + # Set up objects for process execution + error = lldb.SBError() + process = target.ConnectRemote(debugger.GetListener(), + f'connect://{remote}', + None, + error) + if not target.process.IsValid(): + raise RuntimeError(f'Failed to connect via LLDB to remote target: {error.GetCString()}') + + super().__init__(debugger, target, process) + diff --git a/src/focaccia/native/tracer.py b/src/focaccia/native/tracer.py new file mode 100644 index 0000000..9dbc32a --- /dev/null +++ b/src/focaccia/native/tracer.py @@ -0,0 +1,342 @@ +"""Concolic Tracer for native programs.""" + +from __future__ import annotations + +import sys +import logging + +from pathlib import Path + +from focaccia.utils import timebound, TimeoutError +from focaccia.trace import Trace, TraceEnvironment +from focaccia.miasm_util import MiasmSymbolResolver +from focaccia.snapshot import ReadableProgramState, RegisterAccessError +from focaccia.symbolic import SymbolicTransform, DisassemblyContext, run_instruction + +from .lldb_target import LLDBConcreteTarget, LLDBLocalTarget, LLDBRemoteTarget + +logger = logging.getLogger('focaccia-symbolic') +debug = logger.debug +info = logger.info +warn = logger.warn + +# Disable Miasm's disassembly logger +logging.getLogger('asmblock').setLevel(logging.CRITICAL) + +class ValidationError(Exception): + pass + +class SpeculativeTracer(ReadableProgramState): + def __init__(self, target: LLDBConcreteTarget): + super().__init__(target.arch) + self.target = target + self.pc = target.read_register('pc') + self.speculative_pc: int | None = None + self.speculative_count: int = 0 + + self.read_cache = {} + + def speculate(self, new_pc): + self.read_cache.clear() + if new_pc is None: + self.progress_execution() + self.target.step() + self.pc = self.target.read_register('pc') + self.speculative_pc = None + self.speculative_count = 0 + return + + new_pc = 
int(new_pc) + self.speculative_pc = new_pc + self.speculative_count += 1 + + def progress_execution(self) -> None: + if self.speculative_pc is not None and self.speculative_count != 0: + debug(f'Updating PC to {hex(self.speculative_pc)}') + if self.speculative_count == 1: + self.target.step() + else: + self.target.run_until(self.speculative_pc) + + self.pc = self.speculative_pc + self.speculative_pc = None + self.speculative_count = 0 + + self.read_cache.clear() + + def run_until(self, addr: int): + if self.speculative_pc: + raise Exception('Attempting manual execution with speculative execution enabled') + self.target.run_until(addr) + self.pc = addr + + def step(self): + self.progress_execution() + if self.target.is_exited(): + return + self.target.step() + self.pc = self.target.read_register('pc') + + def _cache(self, name: str, value): + self.read_cache[name] = value + return value + + def read_pc(self) -> int: + if self.speculative_pc is not None: + return self.speculative_pc + return self.pc + + def read_flags(self) -> dict[str, int | bool]: + if 'flags' in self.read_cache: + return self.read_cache['flags'] + self.progress_execution() + return self._cache('flags', self.target.read_flags()) + + def read_register(self, reg: str) -> int: + regname = self.arch.to_regname(reg) + if regname is None: + raise RegisterAccessError(reg, f'Not a register name: {reg}') + + if reg in self.read_cache: + return self.read_cache[reg] + + self.progress_execution() + return self._cache(reg, self.target.read_register(regname)) + + def write_register(self, regname: str, value: int): + self.progress_execution() + self.read_cache.pop(regname, None) + self.target.write_register(regname, value) + + def read_instructions(self, addr: int, size: int) -> bytes: + return self.target.read_memory(addr, size) + + def read_memory(self, addr: int, size: int) -> bytes: + self.progress_execution() + cache_name = f'{addr}_{size}' + if cache_name in self.read_cache: + return self.read_cache[cache_name] + return self._cache(cache_name, self.target.read_memory(addr, size)) + + def write_memory(self, addr: int, value: bytes): + self.progress_execution() + self.read_cache.pop(addr, None) + self.target.write_memory(addr, value) + + def __getattr__(self, name: str): + return getattr(self.target, name) + +class SymbolicTracer: + """A symbolic tracer that uses `LLDBConcreteTarget` with Miasm to simultaneously execute a + program with concrete state and collect its symbolic transforms + """ + def __init__(self, + env: TraceEnvironment, + remote: str | None=None, + force: bool=False, + cross_validate: bool=False): + self.env = env + self.force = force + self.remote = remote + self.cross_validate = cross_validate + self.target = SpeculativeTracer(self.create_debug_target()) + + self.nondet_events = self.env.detlog.events() + self.next_event: int | None = None + + def create_debug_target(self) -> LLDBConcreteTarget: + binary = self.env.binary_name + if self.remote is False: + debug(f'Launching local debug target {binary} {self.env.argv}') + debug(f'Environment: {self.env}') + return LLDBLocalTarget(binary, self.env.argv, self.env.envp) + + debug(f'Connecting to remote debug target {self.remote}') + target = LLDBRemoteTarget(self.remote, binary) + + module_name = target.determine_name() + binary = str(Path(self.env.binary_name).resolve()) + if binary != module_name: + warn(f'Discovered binary name {module_name} differs from specified name {binary}') + + return target + + def predict_next_state(self, instruction: Instruction, transform: 
SymbolicTransform): + debug(f'Evaluating register and memory transforms for {instruction} to cross-validate') + predicted_regs = transform.eval_register_transforms(self.target) + predicted_mems = transform.eval_memory_transforms(self.target) + return predicted_regs, predicted_mems + + def validate(self, + instruction: Instruction, + transform: SymbolicTransform, + predicted_regs: dict[str, int], + predicted_mems: dict[int, bytes]): + # Verify last generated transform by comparing concrete state against + # predicted values. + if self.target.is_exited(): + return + + debug('Cross-validating symbolic transforms by comparing actual to predicted values') + for reg, val in predicted_regs.items(): + conc_val = self.target.read_register(reg) + if conc_val != val: + raise ValidationError(f'Symbolic execution backend generated false equation for' + f' [{hex(instruction.addr)}]: {instruction}:' + f' Predicted {reg} = {hex(val)}, but the' + f' concrete state has value {reg} = {hex(conc_val)}.' + f'\nFaulty transformation: {transform}') + for addr, data in predicted_mems.items(): + conc_data = self.target.read_memory(addr, len(data)) + if conc_data != data: + raise ValidationError(f'Symbolic execution backend generated false equation for' + f' [{hex(instruction.addr)}]: {instruction}: Predicted' + f' mem[{hex(addr)}:{hex(addr+len(data))}] = {data},' + f' but the concrete state has value' + f' mem[{hex(addr)}:{hex(addr+len(data))}] = {conc_data}.' + f'\nFaulty transformation: {transform}') + + def progress_event(self) -> None: + if (self.next_event + 1) < len(self.nondet_events): + self.next_event += 1 + debug(f'Next event to handle at index {self.next_event}') + else: + self.next_event = None + + def post_event(self) -> None: + if self.next_event: + if self.nondet_events[self.next_event].pc == 0: + # Exit sequence + debug('Completed exit event') + self.target.run() + + debug(f'Completed handling event at index {self.next_event}') + self.progress_event() + + def is_stepping_instr(self, pc: int, instruction: Instruction) -> bool: + if self.nondet_events: + pc = pc + instruction.length # detlog reports next pc for each event + if self.next_event and self.nondet_events[self.next_event].match(pc, self.target): + debug('Current instruction matches next event; stepping through it') + self.progress_event() + return True + else: + if self.target.arch.is_instr_syscall(str(instruction)): + return True + return False + + def progress(self, new_pc, step: bool = False) -> int | None: + self.target.speculate(new_pc) + if step: + self.target.progress_execution() + if self.target.is_exited(): + return None + return self.target.read_pc() + + def trace(self, time_limit: int | None = None) -> Trace[SymbolicTransform]: + """Execute a program and compute state transformations between executed + instructions. + + :param start_addr: Address from which to start tracing. + :param stop_addr: Address until which to trace. 
+ """ + # Set up concrete reference state + if self.env.start_address is not None: + self.target.run_until(self.env.start_address) + + for i in range(len(self.nondet_events)): + if self.nondet_events[i].pc == self.target.read_pc(): + self.next_event = i+1 + if self.next_event >= len(self.nondet_events): + break + + debug(f'Starting from event {self.nondet_events[i]} onwards') + break + + ctx = DisassemblyContext(self.target) + arch = ctx.arch + + if logger.isEnabledFor(logging.DEBUG): + debug('Tracing program with the following non-deterministic events') + for event in self.nondet_events: + debug(event) + + # Trace concolically + strace: list[SymbolicTransform] = [] + while not self.target.is_exited(): + pc = self.target.read_pc() + + if self.env.stop_address is not None and pc == self.env.stop_address: + break + + assert(pc != 0) + + # Disassemble instruction at the current PC + tid = self.target.get_current_tid() + try: + instruction = ctx.disassemble(pc) + info(f'[{tid}] Disassembled instruction {instruction} at {hex(pc)}') + except: + err = sys.exc_info()[1] + + # Try to recovery by using the LLDB disassembly instead + try: + alt_disas = self.target.get_disassembly(pc) + instruction = Instruction.from_string(alt_disas, ctx.arch, pc, + self.target.get_instruction_size(pc)) + info(f'[{tid}] Disassembled instruction {instruction} at {hex(pc)}') + except: + if self.force: + if alt_disas: + warn(f'[{tid}] Unable to handle instruction {alt_disas} at {hex(pc)} in Miasm.' + f' Skipping.') + else: + warn(f'[{tid}] Unable to disassemble instruction {hex(pc)}: {err}.' + f' Skipping.') + self.target.step() + continue + raise # forward exception + + is_event = self.is_stepping_instr(pc, instruction) + + # Run instruction + conc_state = MiasmSymbolResolver(self.target, ctx.loc_db) + + try: + new_pc, modified = timebound(time_limit, run_instruction, + instruction.instr, conc_state, ctx.lifter) + except TimeoutError: + warn(f'Running instruction {instruction} took longer than {time_limit} second. Skipping') + new_pc, modified = None, {} + + if self.cross_validate and new_pc: + # Predict next concrete state. + # We verify the symbolic execution backend on the fly for some + # additional protection from bugs in the backend. 
+ new_pc = int(new_pc) + transform = SymbolicTransform(tid, modified, [instruction], arch, pc, new_pc) + pred_regs, pred_mems = self.predict_next_state(instruction, transform) + self.progress(new_pc, step=is_event) + + try: + self.validate(instruction, transform, pred_regs, pred_mems) + except ValidationError as e: + if self.force: + warn(f'Cross-validation failed: {e}') + continue + raise + else: + new_pc = self.progress(new_pc, step=is_event) + if new_pc is None: + transform = SymbolicTransform(tid, modified, [instruction], arch, pc, 0) + strace.append(transform) + continue # we're done + transform = SymbolicTransform(tid, modified, [instruction], arch, pc, new_pc) + + strace.append(transform) + + if is_event: + self.post_event() + + return Trace(strace, self.env) + diff --git a/src/focaccia/symbolic.py b/src/focaccia/symbolic.py index 2a66a26..b83b289 100644 --- a/src/focaccia/symbolic.py +++ b/src/focaccia/symbolic.py @@ -1,42 +1,17 @@ -"""Tools and utilities for execution with Miasm.""" +"""Tools and utilities for execution with Miasm.""" from __future__ import annotations -import sys -import logging - -from pathlib import Path - +from miasm.ir.ir import Lifter from miasm.analysis.machine import Machine -from miasm.core.cpu import instruction as miasm_instr from miasm.core.locationdb import LocationDB -from miasm.expression.expression import Expr, ExprId, ExprMem, ExprInt -from miasm.ir.ir import Lifter +from miasm.core.cpu import instruction as miasm_instr from miasm.ir.symbexec import SymbolicExecutionEngine +from miasm.expression.expression import Expr, ExprId, ExprMem, ExprInt +from .snapshot import ReadableProgramState from .arch import Arch, supported_architectures -from .lldb_target import ( - LLDBConcreteTarget, - LLDBLocalTarget, - LLDBRemoteTarget, - ConcreteRegisterError, - ConcreteMemoryError, -) from .miasm_util import MiasmSymbolResolver, eval_expr, make_machine -from .snapshot import ReadableProgramState, RegisterAccessError, MemoryAccessError -from .trace import Trace, TraceEnvironment -from .utils import timebound, TimeoutError - -logger = logging.getLogger('focaccia-symbolic') -debug = logger.debug -info = logger.info -warn = logger.warn - -# Disable Miasm's disassembly logger -logging.getLogger('asmblock').setLevel(logging.CRITICAL) - -class ValidationError(Exception): - pass def eval_symbol(symbol: Expr, conc_state: ReadableProgramState) -> int: """Evaluate a symbol based on a concrete reference state. @@ -542,8 +517,7 @@ def run_instruction(instr: miasm_instr, res[dst] = expr_simp(ExprCond(cond, dst, v)) return res - def _execute_location(loc, base_state: dict | None) \ - -> tuple[Expr, dict]: + def _execute_location(loc, base_state: dict | None) -> tuple[Expr, dict]: """Execute a single IR block via symbolic engine. No fancy stuff.""" # Query the location's IR block irblock = ircfg.get_block(loc) @@ -601,12 +575,7 @@ def run_instruction(instr: miasm_instr, loc = lifter.add_instr_to_ircfg(instr, ircfg, None, False) assert(isinstance(loc, Expr) or isinstance(loc, LocKey)) except NotImplementedError as err: - msg = f'Unable to lift instruction {instr}: {err}' - if force: - warn(f'{msg}. 
Skipping') - return None, {} - else: - raise Exception(msg) + raise Exception(f'Unable to lift instruction {instr}: {err}') # Execute instruction symbolically new_pc, modified = execute_location(loc) @@ -614,317 +583,3 @@ def run_instruction(instr: miasm_instr, return new_pc, modified -class SpeculativeTracer(ReadableProgramState): - def __init__(self, target: LLDBConcreteTarget): - super().__init__(target.arch) - self.target = target - self.pc = target.read_register('pc') - self.speculative_pc: int | None = None - self.speculative_count: int = 0 - - self.read_cache = {} - - def speculate(self, new_pc): - self.read_cache.clear() - if new_pc is None: - self.progress_execution() - self.target.step() - self.pc = self.target.read_register('pc') - self.speculative_pc = None - self.speculative_count = 0 - return - - new_pc = int(new_pc) - self.speculative_pc = new_pc - self.speculative_count += 1 - - def progress_execution(self) -> None: - if self.speculative_pc is not None and self.speculative_count != 0: - debug(f'Updating PC to {hex(self.speculative_pc)}') - if self.speculative_count == 1: - self.target.step() - else: - self.target.run_until(self.speculative_pc) - - self.pc = self.speculative_pc - self.speculative_pc = None - self.speculative_count = 0 - - self.read_cache.clear() - - def run_until(self, addr: int): - if self.speculative_pc: - raise Exception('Attempting manual execution with speculative execution enabled') - self.target.run_until(addr) - self.pc = addr - - def step(self): - self.progress_execution() - if self.target.is_exited(): - return - self.target.step() - self.pc = self.target.read_register('pc') - - def _cache(self, name: str, value): - self.read_cache[name] = value - return value - - def read_pc(self) -> int: - if self.speculative_pc is not None: - return self.speculative_pc - return self.pc - - def read_flags(self) -> dict[str, int | bool]: - if 'flags' in self.read_cache: - return self.read_cache['flags'] - self.progress_execution() - return self._cache('flags', self.target.read_flags()) - - def read_register(self, reg: str) -> int: - regname = self.arch.to_regname(reg) - if regname is None: - raise RegisterAccessError(reg, f'Not a register name: {reg}') - - if reg in self.read_cache: - return self.read_cache[reg] - - self.progress_execution() - return self._cache(reg, self.target.read_register(regname)) - - def write_register(self, regname: str, value: int): - self.progress_execution() - self.read_cache.pop(regname, None) - self.target.write_register(regname, value) - - def read_instructions(self, addr: int, size: int) -> bytes: - return self.target.read_memory(addr, size) - - def read_memory(self, addr: int, size: int) -> bytes: - self.progress_execution() - cache_name = f'{addr}_{size}' - if cache_name in self.read_cache: - return self.read_cache[cache_name] - return self._cache(cache_name, self.target.read_memory(addr, size)) - - def write_memory(self, addr: int, value: bytes): - self.progress_execution() - self.read_cache.pop(addr, None) - self.target.write_memory(addr, value) - - def __getattr__(self, name: str): - return getattr(self.target, name) - -class SymbolicTracer: - """A symbolic tracer that uses `LLDBConcreteTarget` with Miasm to simultaneously execute a - program with concrete state and collect its symbolic transforms - """ - def __init__(self, - env: TraceEnvironment, - remote: str | None=None, - force: bool=False, - cross_validate: bool=False): - self.env = env - self.force = force - self.remote = remote - self.cross_validate = cross_validate - 
self.target = SpeculativeTracer(self.create_debug_target()) - - self.nondet_events = self.env.detlog.events() - self.next_event: int | None = None - - def create_debug_target(self) -> LLDBConcreteTarget: - binary = self.env.binary_name - if self.remote is False: - debug(f'Launching local debug target {binary} {self.env.argv}') - debug(f'Environment: {self.env}') - return LLDBLocalTarget(binary, self.env.argv, self.env.envp) - - debug(f'Connecting to remote debug target {self.remote}') - target = LLDBRemoteTarget(self.remote, binary) - - module_name = target.determine_name() - binary = str(Path(self.env.binary_name).resolve()) - if binary != module_name: - warn(f'Discovered binary name {module_name} differs from specified name {binary}') - - return target - - def predict_next_state(self, instruction: Instruction, transform: SymbolicTransform): - debug(f'Evaluating register and memory transforms for {instruction} to cross-validate') - predicted_regs = transform.eval_register_transforms(self.target) - predicted_mems = transform.eval_memory_transforms(self.target) - return predicted_regs, predicted_mems - - def validate(self, - instruction: Instruction, - transform: SymbolicTransform, - predicted_regs: dict[str, int], - predicted_mems: dict[int, bytes]): - # Verify last generated transform by comparing concrete state against - # predicted values. - if self.target.is_exited(): - return - - debug('Cross-validating symbolic transforms by comparing actual to predicted values') - for reg, val in predicted_regs.items(): - conc_val = self.target.read_register(reg) - if conc_val != val: - raise ValidationError(f'Symbolic execution backend generated false equation for' - f' [{hex(instruction.addr)}]: {instruction}:' - f' Predicted {reg} = {hex(val)}, but the' - f' concrete state has value {reg} = {hex(conc_val)}.' - f'\nFaulty transformation: {transform}') - for addr, data in predicted_mems.items(): - conc_data = self.target.read_memory(addr, len(data)) - if conc_data != data: - raise ValidationError(f'Symbolic execution backend generated false equation for' - f' [{hex(instruction.addr)}]: {instruction}: Predicted' - f' mem[{hex(addr)}:{hex(addr+len(data))}] = {data},' - f' but the concrete state has value' - f' mem[{hex(addr)}:{hex(addr+len(data))}] = {conc_data}.' 
- f'\nFaulty transformation: {transform}') - - def progress_event(self) -> None: - if (self.next_event + 1) < len(self.nondet_events): - self.next_event += 1 - debug(f'Next event to handle at index {self.next_event}') - else: - self.next_event = None - - def post_event(self) -> None: - if self.next_event: - if self.nondet_events[self.next_event].pc == 0: - # Exit sequence - debug('Completed exit event') - self.target.run() - - debug(f'Completed handling event at index {self.next_event}') - self.progress_event() - - def is_stepping_instr(self, pc: int, instruction: Instruction) -> bool: - if self.nondet_events: - pc = pc + instruction.length # detlog reports next pc for each event - if self.next_event and self.nondet_events[self.next_event].match(pc, self.target): - debug('Current instruction matches next event; stepping through it') - self.progress_event() - return True - else: - if self.target.arch.is_instr_syscall(str(instruction)): - return True - return False - - def progress(self, new_pc, step: bool = False) -> int | None: - self.target.speculate(new_pc) - if step: - self.target.progress_execution() - if self.target.is_exited(): - return None - return self.target.read_pc() - - def trace(self, time_limit: int | None = None) -> Trace[SymbolicTransform]: - """Execute a program and compute state transformations between executed - instructions. - - :param start_addr: Address from which to start tracing. - :param stop_addr: Address until which to trace. - """ - # Set up concrete reference state - if self.env.start_address is not None: - self.target.run_until(self.env.start_address) - - for i in range(len(self.nondet_events)): - if self.nondet_events[i].pc == self.target.read_pc(): - self.next_event = i+1 - if self.next_event >= len(self.nondet_events): - break - - debug(f'Starting from event {self.nondet_events[i]} onwards') - break - - ctx = DisassemblyContext(self.target) - arch = ctx.arch - - if logger.isEnabledFor(logging.DEBUG): - debug('Tracing program with the following non-deterministic events') - for event in self.nondet_events: - debug(event) - - # Trace concolically - strace: list[SymbolicTransform] = [] - while not self.target.is_exited(): - pc = self.target.read_pc() - - if self.env.stop_address is not None and pc == self.env.stop_address: - break - - assert(pc != 0) - - # Disassemble instruction at the current PC - tid = self.target.get_current_tid() - try: - instruction = ctx.disassemble(pc) - info(f'[{tid}] Disassembled instruction {instruction} at {hex(pc)}') - except: - err = sys.exc_info()[1] - - # Try to recovery by using the LLDB disassembly instead - try: - alt_disas = self.target.get_disassembly(pc) - instruction = Instruction.from_string(alt_disas, ctx.arch, pc, - self.target.get_instruction_size(pc)) - info(f'[{tid}] Disassembled instruction {instruction} at {hex(pc)}') - except: - if self.force: - if alt_disas: - warn(f'[{tid}] Unable to handle instruction {alt_disas} at {hex(pc)} in Miasm.' - f' Skipping.') - else: - warn(f'[{tid}] Unable to disassemble instruction {hex(pc)}: {err}.' - f' Skipping.') - self.target.step() - continue - raise # forward exception - - is_event = self.is_stepping_instr(pc, instruction) - - # Run instruction - conc_state = MiasmSymbolResolver(self.target, ctx.loc_db) - - try: - new_pc, modified = timebound(time_limit, run_instruction, - instruction.instr, conc_state, ctx.lifter) - except TimeoutError: - warn(f'Running instruction {instruction} took longer than {time_limit} second. 
Skipping') - new_pc, modified = None, {} - - if self.cross_validate and new_pc: - # Predict next concrete state. - # We verify the symbolic execution backend on the fly for some - # additional protection from bugs in the backend. - new_pc = int(new_pc) - transform = SymbolicTransform(tid, modified, [instruction], arch, pc, new_pc) - pred_regs, pred_mems = self.predict_next_state(instruction, transform) - self.progress(new_pc, step=is_event) - - try: - self.validate(instruction, transform, pred_regs, pred_mems) - except ValidationError as e: - if self.force: - warn(f'Cross-validation failed: {e}') - continue - raise - else: - new_pc = self.progress(new_pc, step=is_event) - if new_pc is None: - transform = SymbolicTransform(tid, modified, [instruction], arch, pc, 0) - strace.append(transform) - continue # we're done - transform = SymbolicTransform(tid, modified, [instruction], arch, pc, new_pc) - - strace.append(transform) - - if is_event: - self.post_event() - - return Trace(strace, self.env) - diff --git a/src/focaccia/tools/capture_transforms.py b/src/focaccia/tools/capture_transforms.py index 1208156..a178ba0 100755 --- a/src/focaccia/tools/capture_transforms.py +++ b/src/focaccia/tools/capture_transforms.py @@ -5,8 +5,8 @@ import argparse import logging from focaccia import parser, utils -from focaccia.symbolic import SymbolicTracer from focaccia.trace import TraceEnvironment +from focaccia.native.tracer import SymbolicTracer def main(): prog = argparse.ArgumentParser() -- cgit 1.4.1 From 35a90ec74bc2e0c74b848ac1bb70a05d779de973 Mon Sep 17 00:00:00 2001 From: Theofilos Augoustis Date: Thu, 13 Nov 2025 14:37:57 +0000 Subject: Move QEMU to its own backend directory --- src/focaccia/qemu/__init__.py | 0 src/focaccia/qemu/_qemu_tool.py | 414 ++++++++++++++++++++++++++++++++ src/focaccia/qemu/validation_server.py | 409 +++++++++++++++++++++++++++++++ src/focaccia/tools/_qemu_tool.py | 414 -------------------------------- src/focaccia/tools/validate_qemu.py | 5 +- src/focaccia/tools/validation_server.py | 409 ------------------------------- 6 files changed, 826 insertions(+), 825 deletions(-) create mode 100644 src/focaccia/qemu/__init__.py create mode 100644 src/focaccia/qemu/_qemu_tool.py create mode 100755 src/focaccia/qemu/validation_server.py delete mode 100644 src/focaccia/tools/_qemu_tool.py delete mode 100755 src/focaccia/tools/validation_server.py diff --git a/src/focaccia/qemu/__init__.py b/src/focaccia/qemu/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/focaccia/qemu/_qemu_tool.py b/src/focaccia/qemu/_qemu_tool.py new file mode 100644 index 0000000..93849bd --- /dev/null +++ b/src/focaccia/qemu/_qemu_tool.py @@ -0,0 +1,414 @@ +"""Invocable like this: + + gdb -n --batch -x qemu_tool.py + +But please use `tools/validate_qemu.py` instead because we have some more setup +work to do. 
+""" + +import gdb +import logging +import traceback +from typing import Iterable + +import focaccia.parser as parser +from focaccia.arch import supported_architectures, Arch +from focaccia.compare import compare_symbolic, Error, ErrorTypes +from focaccia.snapshot import ProgramState, ReadableProgramState, \ + RegisterAccessError, MemoryAccessError +from focaccia.symbolic import SymbolicTransform, eval_symbol, ExprMem +from focaccia.trace import Trace, TraceEnvironment +from focaccia.utils import print_result + +from focaccia.tools.validate_qemu import make_argparser, verbosity + +logger = logging.getLogger('focaccia-qemu-validator') +debug = logger.debug +info = logger.info +warn = logger.warning + +qemu_crash = { + "crashed": False, + "pc": None, + 'txl': None, + 'ref': None, + 'errors': [Error(ErrorTypes.CONFIRMED, "QEMU crashed")], + 'snap': None, +} + +class GDBProgramState(ReadableProgramState): + from focaccia.arch import aarch64, x86 + + flag_register_names = { + aarch64.archname: 'cpsr', + x86.archname: 'eflags', + } + + flag_register_decompose = { + aarch64.archname: aarch64.decompose_cpsr, + x86.archname: x86.decompose_rflags, + } + + def __init__(self, process: gdb.Inferior, frame: gdb.Frame, arch: Arch): + super().__init__(arch) + self._proc = process + self._frame = frame + + @staticmethod + def _read_vector_reg_aarch64(val: gdb.Value, size) -> int: + try: + return int(str(val['d']['u']), 10) + except: + try: + return int(str(val['u']), 10) + except: + return int(str(val['q']['u']), 10) + + @staticmethod + def _read_vector_reg_x86(val: gdb.Value, size) -> int: + num_longs = size // 64 + vals = val[f'v{num_longs}_int64'] + res = 0 + for i in range(num_longs): + val = int(vals[i].cast(gdb.lookup_type('unsigned long'))) + res += val << i * 64 + return res + + read_vector_reg = { + aarch64.archname: _read_vector_reg_aarch64, + x86.archname: _read_vector_reg_x86, + } + + def read_register(self, reg: str) -> int: + if reg == 'RFLAGS': + reg = 'EFLAGS' + + try: + val = self._frame.read_register(reg.lower()) + size = val.type.sizeof * 8 + + # For vector registers, we need to apply architecture-specific + # logic because GDB's interface is not consistent. 
+ if size >= 128: # Value is a vector + if self.arch.archname not in self.read_vector_reg: + raise NotImplementedError( + f'Reading vector registers is not implemented for' + f' architecture {self.arch.archname}.') + return self.read_vector_reg[self.arch.archname](val, size) + elif size < 64: + return int(val.cast(gdb.lookup_type('unsigned int'))) + # For non-vector values, just return the 64-bit value + return int(val.cast(gdb.lookup_type('unsigned long'))) + except ValueError as err: + # Try to access the flags register with `reg` as a logical flag name + if self.arch.archname in self.flag_register_names: + flags_reg = self.flag_register_names[self.arch.archname] + flags = int(self._frame.read_register(flags_reg)) + flags = self.flag_register_decompose[self.arch.archname](flags) + if reg in flags: + return flags[reg] + raise RegisterAccessError(reg, + f'[GDB] Unable to access {reg}: {err}') + + def read_memory(self, addr: int, size: int) -> bytes: + try: + mem = self._proc.read_memory(addr, size).tobytes() + if self.arch.endianness == 'little': + return mem + else: + return bytes(reversed(mem)) # Convert to big endian + except gdb.MemoryError as err: + raise MemoryAccessError(addr, size, str(err)) + +class GDBServerStateIterator: + def __init__(self, remote: str): + gdb.execute('set pagination 0') + gdb.execute('set sysroot') + gdb.execute('set python print-stack full') # enable complete Python tracebacks + gdb.execute(f'target remote {remote}') + self._process = gdb.selected_inferior() + self._first_next = True + + # Try to determine the guest architecture. This is a bit hacky and + # tailored to GDB's naming for the x86-64 architecture. + split = self._process.architecture().name().split(':') + archname = split[1] if len(split) > 1 else split[0] + archname = archname.replace('-', '_') + if archname not in supported_architectures: + print(f'Error: Current platform ({archname}) is not' + f' supported by Focaccia. Exiting.') + exit(1) + + self.arch = supported_architectures[archname] + self.binary = self._process.progspace.filename + + def __iter__(self): + return self + + def __next__(self): + # The first call to __next__ should yield the first program state, + # i.e. before stepping the first time + if self._first_next: + self._first_next = False + return GDBProgramState(self._process, gdb.selected_frame(), self.arch) + + # Step + pc = gdb.selected_frame().read_register('pc') + new_pc = pc + while pc == new_pc: # Skip instruction chains from REP STOS etc. + gdb.execute('si', to_string=True) + if not self._process.is_valid() or len(self._process.threads()) == 0: + raise StopIteration + new_pc = gdb.selected_frame().read_register('pc') + + return GDBProgramState(self._process, gdb.selected_frame(), self.arch) + + def run_until(self, addr: int): + breakpoint = gdb.Breakpoint(f'*{addr:#x}') + gdb.execute('continue') + breakpoint.delete() + return GDBProgramState(self._process, gdb.selected_frame(), self.arch) + +def record_minimal_snapshot(prev_state: ReadableProgramState, + cur_state: ReadableProgramState, + prev_transform: SymbolicTransform, + cur_transform: SymbolicTransform) \ + -> ProgramState: + """Record a minimal snapshot. + + A minimal snapshot must include values (registers and memory) that are + accessed by two transformations: + 1. The values produced by the previous transformation (the + transformation that is producing this snapshot) to check these + values against expected values calculated from the previous + program state. + 2. 
The values that act as inputs to the transformation acting on this + snapshot, to calculate the expected values of the next snapshot. + + :param prev_transform: The symbolic transformation generating, or + leading to, `cur_state`. Values generated by + this transformation are included in the + snapshot. + :param transform: The symbolic transformation operating on this + snapshot. Input values to this transformation are + included in the snapshot. + """ + assert(cur_state.read_register('pc') == cur_transform.addr) + assert(prev_transform.arch == cur_transform.arch) + + def get_written_addresses(t: SymbolicTransform): + """Get all output memory accesses of a symbolic transformation.""" + return [ExprMem(a, v.size) for a, v in t.changed_mem.items()] + + def set_values(regs: Iterable[str], mems: Iterable[ExprMem], + cur_state: ReadableProgramState, + prev_state: ReadableProgramState, + out_state: ProgramState): + """ + :param prev_state: Addresses of memory included in the snapshot are + resolved relative to this state. + """ + for regname in regs: + try: + regval = cur_state.read_register(regname) + out_state.set_register(regname, regval) + except RegisterAccessError: + pass + for mem in mems: + assert(mem.size % 8 == 0) + addr = eval_symbol(mem.ptr, prev_state) + try: + mem = cur_state.read_memory(addr, int(mem.size / 8)) + out_state.write_memory(addr, mem) + except MemoryAccessError: + pass + + state = ProgramState(cur_transform.arch) + state.set_register('PC', cur_transform.addr) + + set_values(prev_transform.changed_regs.keys(), + get_written_addresses(prev_transform), + cur_state, + prev_state, # Evaluate memory addresses based on previous + # state because they are that state's output + # addresses. + state) + set_values(cur_transform.get_used_registers(), + cur_transform.get_used_memory_addresses(), + cur_state, + cur_state, + state) + return state + +def collect_conc_trace(gdb: GDBServerStateIterator, \ + strace: list[SymbolicTransform], + start_addr: int | None = None, + stop_addr: int | None = None) \ + -> tuple[list[ProgramState], list[SymbolicTransform]]: + """Collect a trace of concrete states from GDB. + + Records minimal concrete states from GDB by using symbolic trace + information to determine which register/memory values are required to + verify the correctness of the program running in GDB. + + May drop symbolic transformations if the symbolic trace and the GDB trace + diverge (e.g. because of differences in environment, etc.). Returns the + new, possibly modified, symbolic trace that matches the returned concrete + trace. + + :return: A list of concrete states and a list of corresponding symbolic + transformations. The lists are guaranteed to have the same length. + """ + def find_index(seq, target, access=lambda el: el): + for i, el in enumerate(seq): + if access(el) == target: + return i + return None + + if not strace: + return [], [] + + states = [] + matched_transforms = [] + + state_iter = iter(gdb) + cur_state = next(state_iter) + symb_i = 0 + + # Skip to start + try: + pc = cur_state.read_register('pc') + if start_addr and pc != start_addr: + info(f'Tracing QEMU from starting address: {hex(start_addr)}') + cur_state = state_iter.run_until(start_addr) + except Exception as e: + if start_addr: + raise Exception(f'Unable to reach start address {hex(start_addr)}: {e}') + raise Exception(f'Unable to trace: {e}') + + # An online trace matching algorithm. 
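+    # Sketch of the loop below: `symb_i` walks the symbolic reference trace
+    # while `cur_state` walks QEMU's execution. When the concrete PC does not
+    # match strace[symb_i].addr, symb_i is fast-forwarded to the next
+    # transform with that address, or the concrete state is dropped and QEMU
+    # single-stepped again if no such transform exists. Matched pairs are
+    # appended to `states` and `matched_transforms`, keeping both aligned.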
+ while True: + try: + pc = cur_state.read_register('pc') + + while pc != strace[symb_i].addr: + info(f'PC {hex(pc)} does not match next symbolic reference {hex(strace[symb_i].addr)}') + + next_i = find_index(strace[symb_i+1:], pc, lambda t: t.addr) + + # Drop the concrete state if no address in the symbolic trace + # matches + if next_i is None: + warn(f'Dropping concrete state {hex(pc)}, as no' + f' matching instruction can be found in the symbolic' + f' reference trace.') + cur_state = next(state_iter) + pc = cur_state.read_register('pc') + continue + + # Otherwise, jump to the next matching symbolic state + symb_i += next_i + 1 + if symb_i >= len(strace): + break + + assert(cur_state.read_register('pc') == strace[symb_i].addr) + info(f'Validating instruction at address {hex(pc)}') + states.append(record_minimal_snapshot( + states[-1] if states else cur_state, + cur_state, + matched_transforms[-1] if matched_transforms else strace[symb_i], + strace[symb_i])) + matched_transforms.append(strace[symb_i]) + cur_state = next(state_iter) + symb_i += 1 + if symb_i >= len(strace): + break + except StopIteration: + # TODO: The conditions may test for the same + if stop_addr and pc != stop_addr: + raise Exception(f'QEMU stopped at {hex(pc)} before reaching the stop address' + f' {hex(stop_addr)}') + if symb_i+1 < len(strace): + qemu_crash["crashed"] = True + qemu_crash["pc"] = strace[symb_i].addr + qemu_crash["ref"] = strace[symb_i] + qemu_crash["snap"] = states[-1] + break + except Exception as e: + print(traceback.format_exc()) + raise e + + # Note: this may occur when symbolic traces were gathered with a stop address + if symb_i >= len(strace): + warn(f'QEMU executed more states than native execution: {symb_i} vs {len(strace)-1}') + + return states, matched_transforms + +def main(): + args = make_argparser().parse_args() + + logging_level = getattr(logging, args.error_level.upper(), logging.INFO) + logging.basicConfig(level=logging_level, force=True) + + try: + gdb_server = GDBServerStateIterator(args.remote) + except Exception as e: + raise Exception(f'Unable to perform basic GDB setup: {e}') + + try: + executable: str | None = None + if args.executable is None: + executable = gdb_server.binary + else: + executable = args.executable + + argv = [] # QEMU's GDB stub does not support 'info proc cmdline' + envp = [] # Can't get the remote target's environment + env = TraceEnvironment(executable, argv, envp, '?') + except Exception as e: + raise Exception(f'Unable to create trace environment for executable {executable}: {e}') + + # Read pre-computed symbolic trace + try: + with open(args.symb_trace, 'r') as strace: + symb_transforms = parser.parse_transformations(strace) + except Exception as e: + raise Exception(f'Failed to parse state transformations from native trace: {e}') + + # Use symbolic trace to collect concrete trace from QEMU + try: + conc_states, matched_transforms = collect_conc_trace( + gdb_server, + symb_transforms.states, + symb_transforms.env.start_address, + symb_transforms.env.stop_address) + except Exception as e: + raise Exception(f'Failed to collect concolic trace from QEMU: {e}') + + # Verify and print result + if not args.quiet: + try: + res = compare_symbolic(conc_states, matched_transforms) + if qemu_crash["crashed"]: + res.append({ + 'pc': qemu_crash["pc"], + 'txl': None, + 'ref': qemu_crash["ref"], + 'errors': qemu_crash["errors"], + 'snap': qemu_crash["snap"], + }) + print_result(res, verbosity[args.error_level]) + except Exception as e: + raise Exception('Error 
occured when comparing with symbolic equations: {e}') + + if args.output: + from focaccia.parser import serialize_snapshots + try: + with open(args.output, 'w') as file: + serialize_snapshots(Trace(conc_states, env), file) + except Exception as e: + raise Exception(f'Unable to serialize snapshots to file {args.output}: {e}') + +if __name__ == "__main__": + main() + diff --git a/src/focaccia/qemu/validation_server.py b/src/focaccia/qemu/validation_server.py new file mode 100755 index 0000000..db33ff3 --- /dev/null +++ b/src/focaccia/qemu/validation_server.py @@ -0,0 +1,409 @@ +#! /usr/bin/env python3 + +import os +import socket +import struct +import logging +from typing import Iterable + +import focaccia.parser as parser +from focaccia.arch import supported_architectures, Arch +from focaccia.compare import compare_symbolic, ErrorTypes +from focaccia.snapshot import ProgramState, RegisterAccessError, MemoryAccessError +from focaccia.symbolic import SymbolicTransform, eval_symbol, ExprMem +from focaccia.trace import Trace +from focaccia.utils import print_result + + +logger = logging.getLogger('focaccia-qemu-validation-server') +debug = logger.debug +info = logger.info +warn = logger.warning + + +def endian_fmt(endianness: str) -> str: + if endianness == 'little': + return '<' + else: + return '>' + +def mk_command(cmd: str, endianness: str, reg: str="", addr: int=0, size: int=0) -> bytes: + # char[16]:regname | long long:addr long long:size | long long:unused + # READ REG | READ MEM | STEP ONE + + if cmd == 'read register': + fmt = f'{endian_fmt(endianness)}16s9s' + return struct.pack(fmt,reg.encode('utf-8'),"READ REG".encode('utf-8')) + elif cmd == 'read memory': + fmt = f'{endian_fmt(endianness)}QQ9s' + return struct.pack(fmt, addr, size, "READ MEM".encode('utf-8')) + elif cmd == 'step': + fmt = f'{endian_fmt(endianness)}qq9s' + return struct.pack(fmt, 0, 0, "STEP ONE".encode('utf-8')) + else: + raise ValueError(f'Unknown command {cmd}') +def unmk_memory(msg: bytes, endianness: str) -> tuple: + # packed! + # unsigned long long: addr + # unsigned long: length + fmt = f'{endian_fmt(endianness)}QQ' + addr, length = struct.unpack(fmt, msg) + + return addr, length + +def unmk_register(msg: bytes, endianness: str) -> tuple: + # packed! 
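+    # Register read responses are a fixed 180 bytes (108 + 8 + 64), matching
+    # the CONN.recv(180) in PluginProgramState.read_register below: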
+ # char[108]:regname | unsigned long:bytes | char[64]:value + fmt = f'{endian_fmt(endianness)}108sQ64s' + reg_name, size, val = struct.unpack(fmt, msg) + reg_name = reg_name.decode('utf-8').rstrip('\x00') + + if reg_name == "UNKNOWN": + raise RegisterAccessError(reg_name, + f'[QEMU Plugin] Unable to access register {reg_name}.') + + val = val[:size] + val = int.from_bytes(val, endianness) + return val, size + +class PluginProgramState(ProgramState): + from focaccia.arch import aarch64, x86 + + flag_register_names = { + aarch64.archname: 'cpsr', + x86.archname: 'eflags', + } + + flag_register_decompose = { + aarch64.archname: aarch64.decompose_cpsr, + x86.archname: x86.decompose_rflags, + } + + def _flush_caches(self): + for r in self.regs.keys(): + self.regs[r] = None + self.mem.drop_all() + + + def __init__(self, arch: Arch): + super().__init__(arch) + self.strict = False + + def read_register(self, reg: str, no_cached: bool=False) -> int: + global CONN + + if reg == 'RFLAGS': + reg = 'EFLAGS' + + flags = self.flag_register_decompose[self.arch.archname](0).keys() + if reg in flags and self.arch.archname in self.flag_register_names: + reg_name = self.flag_register_names[self.arch.archname] + else: + reg_name = self.arch.to_regname(reg) + + if reg_name is None: + raise RegisterAccessError(reg, f'Not a register name: {reg}') + + reg_acc = self.arch.get_reg_accessor(reg_name) + if reg_acc is None: + raise RegisterAccessError(reg, f'Not a enclosing register name: {reg}') + exit(-1) + reg_name = reg_acc.base_reg.lower() + + val = None + from_cache = False + if not no_cached and super().test_register(reg_name): + val = super().read_register(reg_name) + from_cache = True + else: + msg = mk_command("read register", self.arch.endianness, reg=reg_name) + CONN.send(msg) + + try: + resp = CONN.recv(180) + except ConnectionResetError: + raise StopIteration + + if len(resp) < 180: + raise RegisterAccessError(reg, f'Invalid response length when reading {reg}: {len(resp)}' + f' for response {resp}') + + val, size = unmk_register(resp, self.arch.endianness) + + # Try to access the flags register with `reg` as a logical flag name + if reg in flags and self.arch.archname in self.flag_register_names: + flags_reg = self.flag_register_names[self.arch.archname] + _flags = self.flag_register_decompose[self.arch.archname](val) + if reg in _flags: + if not from_cache: + self.set_register(reg, _flags[reg]) + return _flags[reg] + raise RegisterAccessError(f'Unable to access flag {reg}.') + + if not from_cache: + self.set_register(reg, val) + return val & reg_acc.mask >> reg_acc.start + + def read_memory(self, addr: int, size: int) -> bytes: + global CONN + + if self.mem.test(addr): + return super().read_memory(addr, size) + + # print(f'Reading memory at {addr:x}, size={size}') + + msg = mk_command("read memory", self.arch.endianness, addr=addr, size=size) + CONN.send(msg) + + try: + resp = CONN.recv(16) + except ConnectionResetError: + raise StopIteration + _addr, length = unmk_memory(resp, self.arch.endianness) + + if _addr != addr or length == 0: + raise MemoryAccessError( + _addr, size, + f'Unable to access memory at address {addr:x}, size={size}.') + return b'' + + mem = b'' + while len(mem) < length: + try: + resp = CONN.recv(length - len(mem)) + except ConnectionResetError: + raise StopIteration + mem += resp + + self.write_memory(addr, mem) + return mem + + def step(self): + global CONN + + self._flush_caches() + msg = mk_command("step", self.arch.endianness) + CONN.send(msg) + + + return + +class 
PluginStateIterator: + + def __init__(self, sock_path: str, arch: Arch): + global SOCK + global CONN + + self.sock_path = sock_path + self.arch = arch + self._first_next = True + + + # Start the server that waits for QEMU to connect + try: + os.unlink(self.sock_path) + except FileNotFoundError: + pass + # TODO: allow new connections when QEMU clones + SOCK = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + + print(f'Listening for QEMU Plugin connection at {self.sock_path}...') + SOCK.bind(self.sock_path) + SOCK.listen(1) + + CONN, qemu_addr = SOCK.accept() + + # Handshake with QEMU + pid_b = CONN.recv(4) + pid = struct.unpack('i', pid_b)[0] + print(f'Connected to QEMU instance with PID {pid}.') + + def __iter__(self): + return self + + def __next__(self): + # The first call to __next__ should yield the first program state, + # i.e. after stepping the first time + if self._first_next: + self._first_next = False + self.state = PluginProgramState(self.arch) + #self.state.step() + return self.state + + # Step + pc = self.state.read_register('pc') + new_pc = pc + while pc == new_pc: # Skip instruction chains from REP STOS etc. + self.state.step() + new_pc = self.state.read_register('pc', True) + + return self.state + +def record_minimal_snapshot(prev_state: ProgramState, + cur_state: PluginProgramState, + prev_transform: SymbolicTransform, + cur_transform: SymbolicTransform) \ + -> ProgramState: + """Record a minimal snapshot. + + A minimal snapshot must include values (registers and memory) that are + accessed by two transformations: + 1. The values produced by the previous transformation (the + transformation that is producing this snapshot) to check these + values against expected values calculated from the previous + program state. + 2. The values that act as inputs to the transformation acting on this + snapshot, to calculate the expected values of the next snapshot. + + :param prev_transform: The symbolic transformation generating, or + leading to, `cur_state`. Values generated by + this transformation are included in the + snapshot. + :param transform: The symbolic transformation operating on this + snapshot. Input values to this transformation are + included in the snapshot. + """ + assert(cur_state.read_register('pc') == cur_transform.addr) + assert(prev_transform.arch == cur_transform.arch) + + def get_written_addresses(t: SymbolicTransform) -> Iterable[ExprMem]: + """Get all output memory accesses of a symbolic transformation.""" + return [ExprMem(a, v.size) for a, v in t.changed_mem.items()] + + def set_values(regs: Iterable[str], mems: Iterable[ExprMem], + cur_state: PluginProgramState, + prev_state: PluginProgramState, + out_state: ProgramState): + """ + :param prev_state: Addresses of memory included in the snapshot are + resolved relative to this state. 
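+        Note: registers that cannot be read from the plugin are recorded
+        as 0 in the snapshot (see the RegisterAccessError handler below)
+        rather than being skipped as in the GDB-based tool.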
+ """ + for regname in regs: + try: + regval = cur_state.read_register(regname) + out_state.set_register(regname, regval) + except RegisterAccessError: + out_state.set_register(regname, 0) + for mem in mems: + assert(mem.size % 8 == 0) + addr = eval_symbol(mem.ptr, prev_state) + try: + mem = cur_state.read_memory(addr, int(mem.size / 8)) + out_state.write_memory(addr, mem) + except MemoryAccessError: + pass + + state = ProgramState(cur_transform.arch) + state.set_register('pc', cur_transform.addr) + + set_values(prev_transform.changed_regs.keys(), + get_written_addresses(prev_transform), + cur_state, + prev_state, + state) + set_values(cur_transform.get_used_registers(), + cur_transform.get_used_memory_addresses(), + cur_state, + cur_state, + state) + return state + +def collect_conc_trace(qemu: PluginStateIterator, \ + strace: list[SymbolicTransform]) \ + -> tuple[list[ProgramState], list[SymbolicTransform]]: + """Collect a trace of concrete states from QEMU. + + Records minimal concrete states from QEMU by using symbolic trace + information to determine which register/memory values are required to + verify the correctness of QEMU. + + May drop symbolic transformations if the symbolic trace and the QEMU trace + diverge (e.g. because of differences in environment, etc.). Returns the + new, possibly modified, symbolic trace that matches the returned concrete + trace. + + :return: A list of concrete states and a list of corresponding symbolic + transformations. The lists are guaranteed to have the same length. + """ + def find_index(seq, target, access=lambda el: el): + for i, el in enumerate(seq): + if access(el) == target: + return i + return None + + if not strace: + return [], [] + + states = [] + matched_transforms = [] + + state_iter = iter(qemu) + cur_state = next(state_iter) + symb_i = 0 + + # An online trace matching algorithm. 
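+    # Same matching loop as the GDB-based tool above, except that concrete
+    # states are served by the QEMU plugin over the Unix socket and the PC
+    # is re-read with no_cached=True after each step to bypass the state's
+    # register cache.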
+ while True: + try: + pc = cur_state.read_register('pc') + + while pc != strace[symb_i].addr: + next_i = find_index(strace[symb_i+1:], pc, lambda t: t.addr) + + # Drop the concrete state if no address in the symbolic trace + # matches + if next_i is None: + print(f'Warning: Dropping concrete state {hex(pc)}, as no' + f' matching instruction can be found in the symbolic' + f' reference trace.') + cur_state = next(state_iter) + pc = cur_state.read_register('pc', True) + continue + + # Otherwise, jump to the next matching symbolic state + symb_i += next_i + 1 + + assert(cur_state.read_register('pc') == strace[symb_i].addr) + states.append(record_minimal_snapshot( + states[-1] if states else cur_state, + cur_state, + matched_transforms[-1] if matched_transforms else strace[symb_i], + strace[symb_i])) + matched_transforms.append(strace[symb_i]) + cur_state = next(state_iter) + symb_i += 1 + except StopIteration: + break + + return states, matched_transforms + + +def start_validation_server(symb_trace: str, + output: str, + socket: str, + guest_arch: str, + env, + verbosity: ErrorTypes, + is_quiet: bool = False): + # Read pre-computed symbolic trace + with open(symb_trace, 'r') as strace: + symb_transforms = parser.parse_transformations(strace) + + arch = supported_architectures.get(guest_arch) + + qemu = PluginStateIterator(socket, arch) + + # Use symbolic trace to collect concrete trace from QEMU + conc_states, matched_transforms = collect_conc_trace( + qemu, + symb_transforms.states) + + # Verify and print result + if not is_quiet: + res = compare_symbolic(conc_states, matched_transforms) + print_result(res, verbosity) + + if output: + from focaccia.parser import serialize_snapshots + with open(output, 'w') as file: + serialize_snapshots(Trace(conc_states, env), file) + diff --git a/src/focaccia/tools/_qemu_tool.py b/src/focaccia/tools/_qemu_tool.py deleted file mode 100644 index 02d150b..0000000 --- a/src/focaccia/tools/_qemu_tool.py +++ /dev/null @@ -1,414 +0,0 @@ -"""Invocable like this: - - gdb -n --batch -x qemu_tool.py - -But please use `tools/validate_qemu.py` instead because we have some more setup -work to do. 
-""" - -import gdb -import logging -import traceback -from typing import Iterable - -import focaccia.parser as parser -from focaccia.arch import supported_architectures, Arch -from focaccia.compare import compare_symbolic, Error, ErrorTypes -from focaccia.snapshot import ProgramState, ReadableProgramState, \ - RegisterAccessError, MemoryAccessError -from focaccia.symbolic import SymbolicTransform, eval_symbol, ExprMem -from focaccia.trace import Trace, TraceEnvironment -from focaccia.utils import print_result - -from validate_qemu import make_argparser, verbosity - -logger = logging.getLogger('focaccia-qemu-validator') -debug = logger.debug -info = logger.info -warn = logger.warning - -qemu_crash = { - "crashed": False, - "pc": None, - 'txl': None, - 'ref': None, - 'errors': [Error(ErrorTypes.CONFIRMED, "QEMU crashed")], - 'snap': None, -} - -class GDBProgramState(ReadableProgramState): - from focaccia.arch import aarch64, x86 - - flag_register_names = { - aarch64.archname: 'cpsr', - x86.archname: 'eflags', - } - - flag_register_decompose = { - aarch64.archname: aarch64.decompose_cpsr, - x86.archname: x86.decompose_rflags, - } - - def __init__(self, process: gdb.Inferior, frame: gdb.Frame, arch: Arch): - super().__init__(arch) - self._proc = process - self._frame = frame - - @staticmethod - def _read_vector_reg_aarch64(val: gdb.Value, size) -> int: - try: - return int(str(val['d']['u']), 10) - except: - try: - return int(str(val['u']), 10) - except: - return int(str(val['q']['u']), 10) - - @staticmethod - def _read_vector_reg_x86(val: gdb.Value, size) -> int: - num_longs = size // 64 - vals = val[f'v{num_longs}_int64'] - res = 0 - for i in range(num_longs): - val = int(vals[i].cast(gdb.lookup_type('unsigned long'))) - res += val << i * 64 - return res - - read_vector_reg = { - aarch64.archname: _read_vector_reg_aarch64, - x86.archname: _read_vector_reg_x86, - } - - def read_register(self, reg: str) -> int: - if reg == 'RFLAGS': - reg = 'EFLAGS' - - try: - val = self._frame.read_register(reg.lower()) - size = val.type.sizeof * 8 - - # For vector registers, we need to apply architecture-specific - # logic because GDB's interface is not consistent. 
- if size >= 128: # Value is a vector - if self.arch.archname not in self.read_vector_reg: - raise NotImplementedError( - f'Reading vector registers is not implemented for' - f' architecture {self.arch.archname}.') - return self.read_vector_reg[self.arch.archname](val, size) - elif size < 64: - return int(val.cast(gdb.lookup_type('unsigned int'))) - # For non-vector values, just return the 64-bit value - return int(val.cast(gdb.lookup_type('unsigned long'))) - except ValueError as err: - # Try to access the flags register with `reg` as a logical flag name - if self.arch.archname in self.flag_register_names: - flags_reg = self.flag_register_names[self.arch.archname] - flags = int(self._frame.read_register(flags_reg)) - flags = self.flag_register_decompose[self.arch.archname](flags) - if reg in flags: - return flags[reg] - raise RegisterAccessError(reg, - f'[GDB] Unable to access {reg}: {err}') - - def read_memory(self, addr: int, size: int) -> bytes: - try: - mem = self._proc.read_memory(addr, size).tobytes() - if self.arch.endianness == 'little': - return mem - else: - return bytes(reversed(mem)) # Convert to big endian - except gdb.MemoryError as err: - raise MemoryAccessError(addr, size, str(err)) - -class GDBServerStateIterator: - def __init__(self, remote: str): - gdb.execute('set pagination 0') - gdb.execute('set sysroot') - gdb.execute('set python print-stack full') # enable complete Python tracebacks - gdb.execute(f'target remote {remote}') - self._process = gdb.selected_inferior() - self._first_next = True - - # Try to determine the guest architecture. This is a bit hacky and - # tailored to GDB's naming for the x86-64 architecture. - split = self._process.architecture().name().split(':') - archname = split[1] if len(split) > 1 else split[0] - archname = archname.replace('-', '_') - if archname not in supported_architectures: - print(f'Error: Current platform ({archname}) is not' - f' supported by Focaccia. Exiting.') - exit(1) - - self.arch = supported_architectures[archname] - self.binary = self._process.progspace.filename - - def __iter__(self): - return self - - def __next__(self): - # The first call to __next__ should yield the first program state, - # i.e. before stepping the first time - if self._first_next: - self._first_next = False - return GDBProgramState(self._process, gdb.selected_frame(), self.arch) - - # Step - pc = gdb.selected_frame().read_register('pc') - new_pc = pc - while pc == new_pc: # Skip instruction chains from REP STOS etc. - gdb.execute('si', to_string=True) - if not self._process.is_valid() or len(self._process.threads()) == 0: - raise StopIteration - new_pc = gdb.selected_frame().read_register('pc') - - return GDBProgramState(self._process, gdb.selected_frame(), self.arch) - - def run_until(self, addr: int): - breakpoint = gdb.Breakpoint(f'*{addr:#x}') - gdb.execute('continue') - breakpoint.delete() - return GDBProgramState(self._process, gdb.selected_frame(), self.arch) - -def record_minimal_snapshot(prev_state: ReadableProgramState, - cur_state: ReadableProgramState, - prev_transform: SymbolicTransform, - cur_transform: SymbolicTransform) \ - -> ProgramState: - """Record a minimal snapshot. - - A minimal snapshot must include values (registers and memory) that are - accessed by two transformations: - 1. The values produced by the previous transformation (the - transformation that is producing this snapshot) to check these - values against expected values calculated from the previous - program state. - 2. 
The values that act as inputs to the transformation acting on this - snapshot, to calculate the expected values of the next snapshot. - - :param prev_transform: The symbolic transformation generating, or - leading to, `cur_state`. Values generated by - this transformation are included in the - snapshot. - :param transform: The symbolic transformation operating on this - snapshot. Input values to this transformation are - included in the snapshot. - """ - assert(cur_state.read_register('pc') == cur_transform.addr) - assert(prev_transform.arch == cur_transform.arch) - - def get_written_addresses(t: SymbolicTransform): - """Get all output memory accesses of a symbolic transformation.""" - return [ExprMem(a, v.size) for a, v in t.changed_mem.items()] - - def set_values(regs: Iterable[str], mems: Iterable[ExprMem], - cur_state: ReadableProgramState, - prev_state: ReadableProgramState, - out_state: ProgramState): - """ - :param prev_state: Addresses of memory included in the snapshot are - resolved relative to this state. - """ - for regname in regs: - try: - regval = cur_state.read_register(regname) - out_state.set_register(regname, regval) - except RegisterAccessError: - pass - for mem in mems: - assert(mem.size % 8 == 0) - addr = eval_symbol(mem.ptr, prev_state) - try: - mem = cur_state.read_memory(addr, int(mem.size / 8)) - out_state.write_memory(addr, mem) - except MemoryAccessError: - pass - - state = ProgramState(cur_transform.arch) - state.set_register('PC', cur_transform.addr) - - set_values(prev_transform.changed_regs.keys(), - get_written_addresses(prev_transform), - cur_state, - prev_state, # Evaluate memory addresses based on previous - # state because they are that state's output - # addresses. - state) - set_values(cur_transform.get_used_registers(), - cur_transform.get_used_memory_addresses(), - cur_state, - cur_state, - state) - return state - -def collect_conc_trace(gdb: GDBServerStateIterator, \ - strace: list[SymbolicTransform], - start_addr: int | None = None, - stop_addr: int | None = None) \ - -> tuple[list[ProgramState], list[SymbolicTransform]]: - """Collect a trace of concrete states from GDB. - - Records minimal concrete states from GDB by using symbolic trace - information to determine which register/memory values are required to - verify the correctness of the program running in GDB. - - May drop symbolic transformations if the symbolic trace and the GDB trace - diverge (e.g. because of differences in environment, etc.). Returns the - new, possibly modified, symbolic trace that matches the returned concrete - trace. - - :return: A list of concrete states and a list of corresponding symbolic - transformations. The lists are guaranteed to have the same length. - """ - def find_index(seq, target, access=lambda el: el): - for i, el in enumerate(seq): - if access(el) == target: - return i - return None - - if not strace: - return [], [] - - states = [] - matched_transforms = [] - - state_iter = iter(gdb) - cur_state = next(state_iter) - symb_i = 0 - - # Skip to start - try: - pc = cur_state.read_register('pc') - if start_addr and pc != start_addr: - info(f'Tracing QEMU from starting address: {hex(start_addr)}') - cur_state = state_iter.run_until(start_addr) - except Exception as e: - if start_addr: - raise Exception(f'Unable to reach start address {hex(start_addr)}: {e}') - raise Exception(f'Unable to trace: {e}') - - # An online trace matching algorithm. 
- while True: - try: - pc = cur_state.read_register('pc') - - while pc != strace[symb_i].addr: - info(f'PC {hex(pc)} does not match next symbolic reference {hex(strace[symb_i].addr)}') - - next_i = find_index(strace[symb_i+1:], pc, lambda t: t.addr) - - # Drop the concrete state if no address in the symbolic trace - # matches - if next_i is None: - warn(f'Dropping concrete state {hex(pc)}, as no' - f' matching instruction can be found in the symbolic' - f' reference trace.') - cur_state = next(state_iter) - pc = cur_state.read_register('pc') - continue - - # Otherwise, jump to the next matching symbolic state - symb_i += next_i + 1 - if symb_i >= len(strace): - break - - assert(cur_state.read_register('pc') == strace[symb_i].addr) - info(f'Validating instruction at address {hex(pc)}') - states.append(record_minimal_snapshot( - states[-1] if states else cur_state, - cur_state, - matched_transforms[-1] if matched_transforms else strace[symb_i], - strace[symb_i])) - matched_transforms.append(strace[symb_i]) - cur_state = next(state_iter) - symb_i += 1 - if symb_i >= len(strace): - break - except StopIteration: - # TODO: The conditions may test for the same - if stop_addr and pc != stop_addr: - raise Exception(f'QEMU stopped at {hex(pc)} before reaching the stop address' - f' {hex(stop_addr)}') - if symb_i+1 < len(strace): - qemu_crash["crashed"] = True - qemu_crash["pc"] = strace[symb_i].addr - qemu_crash["ref"] = strace[symb_i] - qemu_crash["snap"] = states[-1] - break - except Exception as e: - print(traceback.format_exc()) - raise e - - # Note: this may occur when symbolic traces were gathered with a stop address - if symb_i >= len(strace): - warn(f'QEMU executed more states than native execution: {symb_i} vs {len(strace)-1}') - - return states, matched_transforms - -def main(): - args = make_argparser().parse_args() - - logging_level = getattr(logging, args.error_level.upper(), logging.INFO) - logging.basicConfig(level=logging_level, force=True) - - try: - gdb_server = GDBServerStateIterator(args.remote) - except Exception as e: - raise Exception(f'Unable to perform basic GDB setup: {e}') - - try: - executable: str | None = None - if args.executable is None: - executable = gdb_server.binary - else: - executable = args.executable - - argv = [] # QEMU's GDB stub does not support 'info proc cmdline' - envp = [] # Can't get the remote target's environment - env = TraceEnvironment(executable, argv, envp, '?') - except Exception as e: - raise Exception(f'Unable to create trace environment for executable {executable}: {e}') - - # Read pre-computed symbolic trace - try: - with open(args.symb_trace, 'r') as strace: - symb_transforms = parser.parse_transformations(strace) - except Exception as e: - raise Exception(f'Failed to parse state transformations from native trace: {e}') - - # Use symbolic trace to collect concrete trace from QEMU - try: - conc_states, matched_transforms = collect_conc_trace( - gdb_server, - symb_transforms.states, - symb_transforms.env.start_address, - symb_transforms.env.stop_address) - except Exception as e: - raise Exception(f'Failed to collect concolic trace from QEMU: {e}') - - # Verify and print result - if not args.quiet: - try: - res = compare_symbolic(conc_states, matched_transforms) - if qemu_crash["crashed"]: - res.append({ - 'pc': qemu_crash["pc"], - 'txl': None, - 'ref': qemu_crash["ref"], - 'errors': qemu_crash["errors"], - 'snap': qemu_crash["snap"], - }) - print_result(res, verbosity[args.error_level]) - except Exception as e: - raise Exception('Error 
occured when comparing with symbolic equations: {e}') - - if args.output: - from focaccia.parser import serialize_snapshots - try: - with open(args.output, 'w') as file: - serialize_snapshots(Trace(conc_states, env), file) - except Exception as e: - raise Exception(f'Unable to serialize snapshots to file {args.output}: {e}') - -if __name__ == "__main__": - main() - diff --git a/src/focaccia/tools/validate_qemu.py b/src/focaccia/tools/validate_qemu.py index e834a6d..48b3f1c 100755 --- a/src/focaccia/tools/validate_qemu.py +++ b/src/focaccia/tools/validate_qemu.py @@ -23,9 +23,10 @@ import argparse import sysconfig import subprocess +import focaccia.qemu from focaccia.compare import ErrorTypes from focaccia.arch import supported_architectures -from focaccia.tools.validation_server import start_validation_server +from focaccia.qemu.validation_server import start_validation_server verbosity = { 'info': ErrorTypes.INFO, @@ -118,7 +119,7 @@ def main(): args.quiet) else: # QEMU GDB interface - script_dirname = os.path.dirname(__file__) + script_dirname = os.path.dirname(focaccia.qemu.__file__) qemu_tool_path = os.path.join(script_dirname, '_qemu_tool.py') # We have to remove all arguments we don't want to pass to the qemu tool diff --git a/src/focaccia/tools/validation_server.py b/src/focaccia/tools/validation_server.py deleted file mode 100755 index db33ff3..0000000 --- a/src/focaccia/tools/validation_server.py +++ /dev/null @@ -1,409 +0,0 @@ -#! /usr/bin/env python3 - -import os -import socket -import struct -import logging -from typing import Iterable - -import focaccia.parser as parser -from focaccia.arch import supported_architectures, Arch -from focaccia.compare import compare_symbolic, ErrorTypes -from focaccia.snapshot import ProgramState, RegisterAccessError, MemoryAccessError -from focaccia.symbolic import SymbolicTransform, eval_symbol, ExprMem -from focaccia.trace import Trace -from focaccia.utils import print_result - - -logger = logging.getLogger('focaccia-qemu-validation-server') -debug = logger.debug -info = logger.info -warn = logger.warning - - -def endian_fmt(endianness: str) -> str: - if endianness == 'little': - return '<' - else: - return '>' - -def mk_command(cmd: str, endianness: str, reg: str="", addr: int=0, size: int=0) -> bytes: - # char[16]:regname | long long:addr long long:size | long long:unused - # READ REG | READ MEM | STEP ONE - - if cmd == 'read register': - fmt = f'{endian_fmt(endianness)}16s9s' - return struct.pack(fmt,reg.encode('utf-8'),"READ REG".encode('utf-8')) - elif cmd == 'read memory': - fmt = f'{endian_fmt(endianness)}QQ9s' - return struct.pack(fmt, addr, size, "READ MEM".encode('utf-8')) - elif cmd == 'step': - fmt = f'{endian_fmt(endianness)}qq9s' - return struct.pack(fmt, 0, 0, "STEP ONE".encode('utf-8')) - else: - raise ValueError(f'Unknown command {cmd}') -def unmk_memory(msg: bytes, endianness: str) -> tuple: - # packed! - # unsigned long long: addr - # unsigned long: length - fmt = f'{endian_fmt(endianness)}QQ' - addr, length = struct.unpack(fmt, msg) - - return addr, length - -def unmk_register(msg: bytes, endianness: str) -> tuple: - # packed! 
- # char[108]:regname | unsigned long:bytes | char[64]:value - fmt = f'{endian_fmt(endianness)}108sQ64s' - reg_name, size, val = struct.unpack(fmt, msg) - reg_name = reg_name.decode('utf-8').rstrip('\x00') - - if reg_name == "UNKNOWN": - raise RegisterAccessError(reg_name, - f'[QEMU Plugin] Unable to access register {reg_name}.') - - val = val[:size] - val = int.from_bytes(val, endianness) - return val, size - -class PluginProgramState(ProgramState): - from focaccia.arch import aarch64, x86 - - flag_register_names = { - aarch64.archname: 'cpsr', - x86.archname: 'eflags', - } - - flag_register_decompose = { - aarch64.archname: aarch64.decompose_cpsr, - x86.archname: x86.decompose_rflags, - } - - def _flush_caches(self): - for r in self.regs.keys(): - self.regs[r] = None - self.mem.drop_all() - - - def __init__(self, arch: Arch): - super().__init__(arch) - self.strict = False - - def read_register(self, reg: str, no_cached: bool=False) -> int: - global CONN - - if reg == 'RFLAGS': - reg = 'EFLAGS' - - flags = self.flag_register_decompose[self.arch.archname](0).keys() - if reg in flags and self.arch.archname in self.flag_register_names: - reg_name = self.flag_register_names[self.arch.archname] - else: - reg_name = self.arch.to_regname(reg) - - if reg_name is None: - raise RegisterAccessError(reg, f'Not a register name: {reg}') - - reg_acc = self.arch.get_reg_accessor(reg_name) - if reg_acc is None: - raise RegisterAccessError(reg, f'Not a enclosing register name: {reg}') - exit(-1) - reg_name = reg_acc.base_reg.lower() - - val = None - from_cache = False - if not no_cached and super().test_register(reg_name): - val = super().read_register(reg_name) - from_cache = True - else: - msg = mk_command("read register", self.arch.endianness, reg=reg_name) - CONN.send(msg) - - try: - resp = CONN.recv(180) - except ConnectionResetError: - raise StopIteration - - if len(resp) < 180: - raise RegisterAccessError(reg, f'Invalid response length when reading {reg}: {len(resp)}' - f' for response {resp}') - - val, size = unmk_register(resp, self.arch.endianness) - - # Try to access the flags register with `reg` as a logical flag name - if reg in flags and self.arch.archname in self.flag_register_names: - flags_reg = self.flag_register_names[self.arch.archname] - _flags = self.flag_register_decompose[self.arch.archname](val) - if reg in _flags: - if not from_cache: - self.set_register(reg, _flags[reg]) - return _flags[reg] - raise RegisterAccessError(f'Unable to access flag {reg}.') - - if not from_cache: - self.set_register(reg, val) - return val & reg_acc.mask >> reg_acc.start - - def read_memory(self, addr: int, size: int) -> bytes: - global CONN - - if self.mem.test(addr): - return super().read_memory(addr, size) - - # print(f'Reading memory at {addr:x}, size={size}') - - msg = mk_command("read memory", self.arch.endianness, addr=addr, size=size) - CONN.send(msg) - - try: - resp = CONN.recv(16) - except ConnectionResetError: - raise StopIteration - _addr, length = unmk_memory(resp, self.arch.endianness) - - if _addr != addr or length == 0: - raise MemoryAccessError( - _addr, size, - f'Unable to access memory at address {addr:x}, size={size}.') - return b'' - - mem = b'' - while len(mem) < length: - try: - resp = CONN.recv(length - len(mem)) - except ConnectionResetError: - raise StopIteration - mem += resp - - self.write_memory(addr, mem) - return mem - - def step(self): - global CONN - - self._flush_caches() - msg = mk_command("step", self.arch.endianness) - CONN.send(msg) - - - return - -class 
PluginStateIterator: - - def __init__(self, sock_path: str, arch: Arch): - global SOCK - global CONN - - self.sock_path = sock_path - self.arch = arch - self._first_next = True - - - # Start the server that waits for QEMU to connect - try: - os.unlink(self.sock_path) - except FileNotFoundError: - pass - # TODO: allow new connections when QEMU clones - SOCK = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - - print(f'Listening for QEMU Plugin connection at {self.sock_path}...') - SOCK.bind(self.sock_path) - SOCK.listen(1) - - CONN, qemu_addr = SOCK.accept() - - # Handshake with QEMU - pid_b = CONN.recv(4) - pid = struct.unpack('i', pid_b)[0] - print(f'Connected to QEMU instance with PID {pid}.') - - def __iter__(self): - return self - - def __next__(self): - # The first call to __next__ should yield the first program state, - # i.e. after stepping the first time - if self._first_next: - self._first_next = False - self.state = PluginProgramState(self.arch) - #self.state.step() - return self.state - - # Step - pc = self.state.read_register('pc') - new_pc = pc - while pc == new_pc: # Skip instruction chains from REP STOS etc. - self.state.step() - new_pc = self.state.read_register('pc', True) - - return self.state - -def record_minimal_snapshot(prev_state: ProgramState, - cur_state: PluginProgramState, - prev_transform: SymbolicTransform, - cur_transform: SymbolicTransform) \ - -> ProgramState: - """Record a minimal snapshot. - - A minimal snapshot must include values (registers and memory) that are - accessed by two transformations: - 1. The values produced by the previous transformation (the - transformation that is producing this snapshot) to check these - values against expected values calculated from the previous - program state. - 2. The values that act as inputs to the transformation acting on this - snapshot, to calculate the expected values of the next snapshot. - - :param prev_transform: The symbolic transformation generating, or - leading to, `cur_state`. Values generated by - this transformation are included in the - snapshot. - :param transform: The symbolic transformation operating on this - snapshot. Input values to this transformation are - included in the snapshot. - """ - assert(cur_state.read_register('pc') == cur_transform.addr) - assert(prev_transform.arch == cur_transform.arch) - - def get_written_addresses(t: SymbolicTransform) -> Iterable[ExprMem]: - """Get all output memory accesses of a symbolic transformation.""" - return [ExprMem(a, v.size) for a, v in t.changed_mem.items()] - - def set_values(regs: Iterable[str], mems: Iterable[ExprMem], - cur_state: PluginProgramState, - prev_state: PluginProgramState, - out_state: ProgramState): - """ - :param prev_state: Addresses of memory included in the snapshot are - resolved relative to this state. 
- """ - for regname in regs: - try: - regval = cur_state.read_register(regname) - out_state.set_register(regname, regval) - except RegisterAccessError: - out_state.set_register(regname, 0) - for mem in mems: - assert(mem.size % 8 == 0) - addr = eval_symbol(mem.ptr, prev_state) - try: - mem = cur_state.read_memory(addr, int(mem.size / 8)) - out_state.write_memory(addr, mem) - except MemoryAccessError: - pass - - state = ProgramState(cur_transform.arch) - state.set_register('pc', cur_transform.addr) - - set_values(prev_transform.changed_regs.keys(), - get_written_addresses(prev_transform), - cur_state, - prev_state, - state) - set_values(cur_transform.get_used_registers(), - cur_transform.get_used_memory_addresses(), - cur_state, - cur_state, - state) - return state - -def collect_conc_trace(qemu: PluginStateIterator, \ - strace: list[SymbolicTransform]) \ - -> tuple[list[ProgramState], list[SymbolicTransform]]: - """Collect a trace of concrete states from QEMU. - - Records minimal concrete states from QEMU by using symbolic trace - information to determine which register/memory values are required to - verify the correctness of QEMU. - - May drop symbolic transformations if the symbolic trace and the QEMU trace - diverge (e.g. because of differences in environment, etc.). Returns the - new, possibly modified, symbolic trace that matches the returned concrete - trace. - - :return: A list of concrete states and a list of corresponding symbolic - transformations. The lists are guaranteed to have the same length. - """ - def find_index(seq, target, access=lambda el: el): - for i, el in enumerate(seq): - if access(el) == target: - return i - return None - - if not strace: - return [], [] - - states = [] - matched_transforms = [] - - state_iter = iter(qemu) - cur_state = next(state_iter) - symb_i = 0 - - # An online trace matching algorithm. 
- while True: - try: - pc = cur_state.read_register('pc') - - while pc != strace[symb_i].addr: - next_i = find_index(strace[symb_i+1:], pc, lambda t: t.addr) - - # Drop the concrete state if no address in the symbolic trace - # matches - if next_i is None: - print(f'Warning: Dropping concrete state {hex(pc)}, as no' - f' matching instruction can be found in the symbolic' - f' reference trace.') - cur_state = next(state_iter) - pc = cur_state.read_register('pc', True) - continue - - # Otherwise, jump to the next matching symbolic state - symb_i += next_i + 1 - - assert(cur_state.read_register('pc') == strace[symb_i].addr) - states.append(record_minimal_snapshot( - states[-1] if states else cur_state, - cur_state, - matched_transforms[-1] if matched_transforms else strace[symb_i], - strace[symb_i])) - matched_transforms.append(strace[symb_i]) - cur_state = next(state_iter) - symb_i += 1 - except StopIteration: - break - - return states, matched_transforms - - -def start_validation_server(symb_trace: str, - output: str, - socket: str, - guest_arch: str, - env, - verbosity: ErrorTypes, - is_quiet: bool = False): - # Read pre-computed symbolic trace - with open(symb_trace, 'r') as strace: - symb_transforms = parser.parse_transformations(strace) - - arch = supported_architectures.get(guest_arch) - - qemu = PluginStateIterator(socket, arch) - - # Use symbolic trace to collect concrete trace from QEMU - conc_states, matched_transforms = collect_conc_trace( - qemu, - symb_transforms.states) - - # Verify and print result - if not is_quiet: - res = compare_symbolic(conc_states, matched_transforms) - print_result(res, verbosity) - - if output: - from focaccia.parser import serialize_snapshots - with open(output, 'w') as file: - serialize_snapshots(Trace(conc_states, env), file) - -- cgit 1.4.1 From 917edeb7ecfc3335b1be9bc1ef23b7122405b1fb Mon Sep 17 00:00:00 2001 From: Theofilos Augoustis Date: Thu, 13 Nov 2025 14:43:02 +0000 Subject: Move matching functionality out of event (cannot rely on entire program state to match) --- src/focaccia/deterministic.py | 12 ------------ src/focaccia/native/tracer.py | 15 ++++++++++++++- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/src/focaccia/deterministic.py b/src/focaccia/deterministic.py index e7914a3..4d52086 100644 --- a/src/focaccia/deterministic.py +++ b/src/focaccia/deterministic.py @@ -214,18 +214,6 @@ class Event: self.mem_writes = memory_writes self.event_type = event_type - def match(self, pc: int, target: ReadableProgramState) -> bool: - # TODO: match the rest of the state to be sure - if self.pc == pc: - for reg, value in self.registers.items(): - if value == self.pc: - continue - if target.read_register(reg) != value: - print(f'Failed match for {reg}: {hex(value)} != {hex(target.read_register(reg))}') - return False - return True - return False - def __repr__(self) -> str: reg_repr = f'{self.event_type} event\n' for reg, value in self.registers.items(): diff --git a/src/focaccia/native/tracer.py b/src/focaccia/native/tracer.py index 9dbc32a..47ac7e2 100644 --- a/src/focaccia/native/tracer.py +++ b/src/focaccia/native/tracer.py @@ -12,6 +12,7 @@ from focaccia.trace import Trace, TraceEnvironment from focaccia.miasm_util import MiasmSymbolResolver from focaccia.snapshot import ReadableProgramState, RegisterAccessError from focaccia.symbolic import SymbolicTransform, DisassemblyContext, run_instruction +from focaccia.deterministic import Event from .lldb_target import LLDBConcreteTarget, LLDBLocalTarget, LLDBRemoteTarget @@ -26,6 
+27,18 @@ logging.getLogger('asmblock').setLevel(logging.CRITICAL) class ValidationError(Exception): pass +def match_event(event: Event, pc: int, target: ReadableProgramState) -> bool: + # TODO: match the rest of the state to be sure + if event.pc == pc: + for reg, value in event.registers.items(): + if value == event.pc: + continue + if target.read_register(reg) != value: + print(f'Failed match for {reg}: {hex(value)} != {hex(target.read_register(reg))}') + return False + return True + return False + class SpeculativeTracer(ReadableProgramState): def __init__(self, target: LLDBConcreteTarget): super().__init__(target.arch) @@ -216,7 +229,7 @@ class SymbolicTracer: def is_stepping_instr(self, pc: int, instruction: Instruction) -> bool: if self.nondet_events: pc = pc + instruction.length # detlog reports next pc for each event - if self.next_event and self.nondet_events[self.next_event].match(pc, self.target): + if self.next_event and match_event(self.nondet_events[self.next_event], pc, self.target): debug('Current instruction matches next event; stepping through it') self.progress_event() return True -- cgit 1.4.1 From 82027de31eb5ec497a4b56ee94986316b0491890 Mon Sep 17 00:00:00 2001 From: Theofilos Augoustis Date: Wed, 12 Nov 2025 09:16:30 +0000 Subject: Use a stub module to abstract-away handling of the deterministic log as an optional feature --- src/focaccia/_deterministic_impl.py | 383 +++++++++++++++++++++++++++++++ src/focaccia/deterministic.py | 378 ++---------------------------- src/focaccia/tools/capture_transforms.py | 19 +- 3 files changed, 401 insertions(+), 379 deletions(-) create mode 100644 src/focaccia/_deterministic_impl.py diff --git a/src/focaccia/_deterministic_impl.py b/src/focaccia/_deterministic_impl.py new file mode 100644 index 0000000..1d784cb --- /dev/null +++ b/src/focaccia/_deterministic_impl.py @@ -0,0 +1,383 @@ +"""Parsing of JSON files containing snapshot data.""" + +import os +import io +import struct +from typing import Union, Optional + +import brotli + +from .deterministic import ( + MemoryWriteHole, + MemoryWrite, + Event, + SyscallBufferFlushEvent, + SyscallExtra, + SyscallEvent, + SignalDescriptor, + SignalEvent, + MemoryMapping, + Task, + CloneTask, + ExecTask, + ExitTask +) + +import capnp +rr_trace = capnp.load(file_name='./rr/src/rr_trace.capnp', + imports=[os.path.dirname(p) for p in capnp.__path__]) + +Frame = rr_trace.Frame +TaskEvent = rr_trace.TaskEvent +MMap = rr_trace.MMap +SerializedObject = Union[Frame, TaskEvent, MMap] + +class DeterministicLogReader(io.RawIOBase): + """ + File-like reader for rr trace files. + + Each block in the file: + uint32_t uncompressed_size + uint32_t compressed_size + [compressed_data...] + Presents the concatenated uncompressed data as a sequential byte stream. 
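+
+    Illustrative use (a sketch; the reader is constructed with a path to one
+    of the rr trace files, as done elsewhere in this module):
+
+        reader = DeterministicLogReader(log.data_file())
+        payload = reader.read()  # all blocks, decompressed and concatenated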
+ """ + + _HDR = struct.Struct(" None: + """Load and decompress the next Brotli block.""" + header = self._f.read(self._HDR.size) + if not header: + self._eof = True + self._data_buffer = memoryview(b"") + return + if len(header) != self._HDR.size: + raise EOFError("Incomplete RR data block header") + + compressed_length, uncompressed_length = self._HDR.unpack(header) + chunk = self._f.read(compressed_length) + if len(chunk) != compressed_length: + raise EOFError("Incomplete RR data block") + + chunk = brotli.decompress(chunk) + if len(chunk) != uncompressed_length: + raise Exception(f'Malformed deterministic log: uncompressed chunk is not equal' + f'to reported length {hex(uncompressed_length)}') + + self._data_buffer = memoryview(chunk) + self._pos = 0 + + def read(self, n: Optional[int] = -1) -> bytes: + """Read up to n bytes from the uncompressed stream.""" + if n == 0: + return b"" + + chunks = bytearray() + remaining = n if n is not None and n >= 0 else None + + while not self._eof and (remaining is None or remaining > 0): + if self._pos >= len(self._data_buffer): + self._load_chunk() + if self._eof: + break + + available = len(self._data_buffer) - self._pos + take = available if remaining is None else min(available, remaining) + chunks += self._data_buffer[self._pos:self._pos + take] + self._pos += take + if remaining is not None: + remaining -= take + + return bytes(chunks) + + def readable(self) -> bool: + return True + + def close(self) -> None: + if not self.closed: + self._f.close() + super().close() + +def parse_x64_registers(enc_regs: bytes, signed: bool=False) -> dict[str, int]: + idx = 0 + def parse_reg(): + nonlocal idx + enc_reg = enc_regs[idx:(idx := idx + 8)] + return int.from_bytes(enc_reg, byteorder='little', signed=signed) + + regs = {} + + regs['r15'] = parse_reg() + regs['r14'] = parse_reg() + regs['r13'] = parse_reg() + regs['r12'] = parse_reg() + regs['rbp'] = parse_reg() + regs['rbx'] = parse_reg() + + # rcx is unreliable: parsed but ignored + parse_reg() + + regs['r10'] = parse_reg() + regs['r9'] = parse_reg() + regs['r8'] = parse_reg() + + regs['rax'] = parse_reg() + + # rcx is unreliable: parsed but ignored + parse_reg() + + regs['rdx'] = parse_reg() + regs['rsi'] = parse_reg() + regs['rdi'] = parse_reg() + + regs['orig_rax'] = parse_reg() + + regs['rip'] = parse_reg() + regs['cs'] = parse_reg() + + # eflags is unreliable: parsed but ignored + parse_reg() + + regs['rsp'] = parse_reg() + regs['ss'] = parse_reg() + regs['fs_base'] = parse_reg() + regs['ds'] = parse_reg() + regs['es'] = parse_reg() + regs['fs'] = parse_reg() + regs['gs'] = parse_reg() + regs['gs_base'] = parse_reg() + + return regs + +def parse_aarch64_registers(enc_regs: bytes, order: str='little', signed: bool=False) -> dict[str, int]: + idx = 0 + def parse_reg(): + nonlocal idx + enc_reg = enc_regs[idx:(idx := idx + 8)] + return int.from_bytes(enc_reg, byteorder=order, signed=signed) + + regnames = [] + for i in range(32): + regnames.append(f'x{i}') + regnames.append('sp') + regnames.append('pc') + regnames.append('cpsr') + + regs = {} + for i in range(len(regnames)): + regs[regnames[i]] = parse_reg() + + return regs + +class DeterministicLog: + def __init__(self, log_dir: str): + self.base_directory = log_dir + + def _get_file(self, file_name: str) -> str | None: + candidate = os.path.join(self.base_directory, file_name) + if os.path.isfile(candidate): + return candidate + return None + + def events_file(self) -> str | None: + return self._get_file('events') + + def tasks_file(self) -> str | 
None: + return self._get_file('tasks') + + def mmaps_file(self) -> str | None: + return self._get_file('mmaps') + + def data_file(self) -> str | None: + return self._get_file('data') + + def _read_structure(self, file, obj: SerializedObject) -> list[SerializedObject]: + data = DeterministicLogReader(file).read() + + objects = [] + for deser in obj.read_multiple_bytes_packed(data): + objects.append(deser) + return objects + + def raw_events(self) -> list[Frame]: + return self._read_structure(self.events_file(), Frame) + + def raw_tasks(self) -> list[TaskEvent]: + return self._read_structure(self.tasks_file(), TaskEvent) + + def raw_mmaps(self) -> list[MMap]: + return self._read_structure(self.mmaps_file(), MMap) + + def events(self) -> list[Event]: + def parse_registers(event: Frame) -> Union[int, dict[str, int]]: + arch = event.arch + if arch == rr_trace.Arch.x8664: + regs = parse_x64_registers(event.registers.raw) + return regs['rip'], regs + if arch == rr_trace.Arch.aarch64: + regs = parse_aarch64_registers(event.registers.raw) + return regs['pc'], regs + raise NotImplementedError(f'Unable to parse registers for architecture {arch}') + + def parse_memory_writes(event: Frame, reader: io.RawIOBase) -> list[MemoryWrite]: + writes = [] + for raw_write in event.memWrites: + # Skip memory writes with 0 bytes + if raw_write.size == 0: + continue + + holes = [] + for raw_hole in raw_write.holes: + holes.append(MemoryWriteHole(raw_hole.offset, raw_hole.size)) + + data = bytearray() + for hole in holes: + until_hole = hole.offset - reader.tell() + data.extend(reader.read(until_hole)) + data.extend(b'\x00' * hole.size) + + # No holes + if len(data) == 0: + data = reader.read(raw_write.size) + + mem_write = MemoryWrite(raw_write.tid, + raw_write.addr, + raw_write.size, + holes, + raw_write.sizeIsConservative, + bytes(data)) + writes.append(mem_write) + return writes + + data_reader = DeterministicLogReader(self.data_file()) + + events = [] + raw_events = self.raw_events() + for raw_event in raw_events: + pc, registers = parse_registers(raw_event) + mem_writes = parse_memory_writes(raw_event, data_reader) + + event = None + + tid = raw_event.tid + arch = raw_event.arch + event_type = raw_event.event.which() + + if event_type == 'syscall': + if raw_event.arch == rr_trace.Arch.x8664: + # On entry: substitute orig_rax for RAX + if raw_event.event.syscall.state == rr_trace.SyscallState.entering: + registers['rax'] = registers['orig_rax'] + del registers['orig_rax'] + event = SyscallEvent(pc, + tid, + arch, + registers, + mem_writes, + raw_event.event.syscall.arch, + raw_event.event.syscall.number, + raw_event.event.syscall.state, + raw_event.event.syscall.failedDuringPreparation) + + if event_type == 'syscallbufFlush': + event = SyscallBufferFlushEvent(pc, + tid, + arch, + registers, + mem_writes, + raw_event.event.syscallbufFlush.mprotectRecords) + raise NotImplementedError(f'Cannot support system call buffer events yet: {event}') + if event_type == 'signal': + signal = raw_event.event.signal + signal_descriptor = SignalDescriptor(signal.arch, + signal.siginfo, + signal.deterministic, + signal.disposition) + event = SignalEvent(pc, tid, arch, registers, mem_writes, + signal_number=signal_descriptor) + + if event_type == 'signalDelivery': + signal = raw_event.event.signalDelivery + signal_descriptor = SignalDescriptor(signal.arch, + signal.siginfo, + signal.deterministic, + signal.disposition) + event = SignalEvent(pc, tid, arch, registers, mem_writes, + signal_delivery=signal_descriptor) + + if 
event_type == 'signalHandler': + signal = raw_event.event.signalHandler + signal_descriptor = SignalDescriptor(signal.arch, + signal.siginfo, + signal.deterministic, + signal.disposition) + event = SignalEvent(pc, tid, arch, registers, mem_writes, + signal_handler=signal_descriptor) + + if event is None: + event = Event(pc, tid, arch, registers, mem_writes, event_type) + + events.append(event) + + return events + + def tasks(self) -> list[Task]: + tasks = [] + raw_tasks = self.raw_tasks() + for raw_task in raw_tasks: + task_type = raw_task.which() + + task = None + if task_type == 'clone': + task = CloneTask(raw_task.frameTime, + raw_task.tid, + raw_task.clone.parentTid, + raw_task.clone.flags, + raw_task.clone.ownNsTid) + if task_type == 'exec': + task = ExecTask(raw_task.frameTime, + raw_task.tid, + raw_task.exec.fileName, + raw_task.exec.cmdLine, + raw_task.exec.exeBase, + raw_task.exec.interpBase, + raw_task.exec.interpName) + if task_type == 'exit': + task = ExitTask(raw_task.frameTime, raw_task.tid, raw_task.exit.exitStatus) + if task_type == 'detach': + task = DetachTask(raw_task.frameTime, raw_task.tid) + tasks.append(task) + return tasks + + def mmaps(self) -> list[MemoryMapping]: + def mapping_source(mmap: MMap) -> str: + source_type = mmap.source.which() + if source_type == 'zero' or source_type == 'trace': + return source_type + elif source_type == 'file': + return mmap.source.file.backingFileName + else: + raise NotImplementedError(f'Unable to handle memory mappings from source type:' + f' {source_type}') + + mmaps = [] + raw_mmaps = self.raw_mmaps() + for raw_mmap in raw_mmaps: + mmap = MemoryMapping(raw_mmap.frameTime, + raw_mmap.start, + raw_mmap.end, + mapping_source(raw_mmap), + raw_mmap.fileOffsetBytes, + raw_mmap.prot, + raw_mmap.flags) + mmaps.append(mmap) + return mmaps + diff --git a/src/focaccia/deterministic.py b/src/focaccia/deterministic.py index 4d52086..4ebcf9e 100644 --- a/src/focaccia/deterministic.py +++ b/src/focaccia/deterministic.py @@ -1,172 +1,4 @@ -"""Parsing of JSON files containing snapshot data.""" - -import os -import io -import struct -from typing import Union, Optional - -import brotli - from .arch import Arch -from .snapshot import ReadableProgramState - -try: - import capnp - rr_trace = capnp.load(file_name='./rr/src/rr_trace.capnp', - imports=[os.path.dirname(p) for p in capnp.__path__]) -except Exception as e: - print(f'Cannot load RR trace loader: {e}') - exit(2) - -Frame = rr_trace.Frame -TaskEvent = rr_trace.TaskEvent -MMap = rr_trace.MMap -SerializedObject = Union[Frame, TaskEvent, MMap] - -class DeterministicLogReader(io.RawIOBase): - """ - File-like reader for rr trace files. - - Each block in the file: - uint32_t uncompressed_size - uint32_t compressed_size - [compressed_data...] - Presents the concatenated uncompressed data as a sequential byte stream. 
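For illustration only (not part of the patch): a minimal round trip of the block framing that _load_chunk() consumes, one fixed-size header followed by a Brotli payload. The "<II" layout (two little-endian uint32 fields) and the helper names write_block and read_block are assumptions introduced here; the field order follows the unpack call in the reader (compressed length first), while the docstring lists the sizes in the other order.

    import io
    import struct

    import brotli

    HDR = struct.Struct("<II")  # assumed: two little-endian uint32_t fields

    def write_block(out: io.BytesIO, payload: bytes) -> None:
        # Frame one block: header(compressed_len, uncompressed_len) + Brotli data.
        compressed = brotli.compress(payload)
        out.write(HDR.pack(len(compressed), len(payload)))
        out.write(compressed)

    def read_block(src: io.BytesIO) -> bytes:
        # Mirror of _load_chunk(): read the header, then decompress and verify.
        compressed_length, uncompressed_length = HDR.unpack(src.read(HDR.size))
        data = brotli.decompress(src.read(compressed_length))
        assert len(data) == uncompressed_length
        return data

    buf = io.BytesIO()
    write_block(buf, b"hello rr")
    buf.seek(0)
    assert read_block(buf) == b"hello rr"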
- """ - - _HDR = struct.Struct(" None: - """Load and decompress the next Brotli block.""" - header = self._f.read(self._HDR.size) - if not header: - self._eof = True - self._data_buffer = memoryview(b"") - return - if len(header) != self._HDR.size: - raise EOFError("Incomplete RR data block header") - - compressed_length, uncompressed_length = self._HDR.unpack(header) - chunk = self._f.read(compressed_length) - if len(chunk) != compressed_length: - raise EOFError("Incomplete RR data block") - - chunk = brotli.decompress(chunk) - if len(chunk) != uncompressed_length: - raise Exception(f'Malformed deterministic log: uncompressed chunk is not equal' - f'to reported length {hex(uncompressed_length)}') - - self._data_buffer = memoryview(chunk) - self._pos = 0 - - def read(self, n: Optional[int] = -1) -> bytes: - """Read up to n bytes from the uncompressed stream.""" - if n == 0: - return b"" - - chunks = bytearray() - remaining = n if n is not None and n >= 0 else None - - while not self._eof and (remaining is None or remaining > 0): - if self._pos >= len(self._data_buffer): - self._load_chunk() - if self._eof: - break - - available = len(self._data_buffer) - self._pos - take = available if remaining is None else min(available, remaining) - chunks += self._data_buffer[self._pos:self._pos + take] - self._pos += take - if remaining is not None: - remaining -= take - - return bytes(chunks) - - def readable(self) -> bool: - return True - - def close(self) -> None: - if not self.closed: - self._f.close() - super().close() - -def parse_x64_registers(enc_regs: bytes, signed: bool=False) -> dict[str, int]: - idx = 0 - def parse_reg(): - nonlocal idx - enc_reg = enc_regs[idx:(idx := idx + 8)] - return int.from_bytes(enc_reg, byteorder='little', signed=signed) - - regs = {} - - regs['r15'] = parse_reg() - regs['r14'] = parse_reg() - regs['r13'] = parse_reg() - regs['r12'] = parse_reg() - regs['rbp'] = parse_reg() - regs['rbx'] = parse_reg() - - # rcx is unreliable: parsed but ignored - parse_reg() - - regs['r10'] = parse_reg() - regs['r9'] = parse_reg() - regs['r8'] = parse_reg() - - regs['rax'] = parse_reg() - - # rcx is unreliable: parsed but ignored - parse_reg() - - regs['rdx'] = parse_reg() - regs['rsi'] = parse_reg() - regs['rdi'] = parse_reg() - - regs['orig_rax'] = parse_reg() - - regs['rip'] = parse_reg() - regs['cs'] = parse_reg() - - # eflags is unreliable: parsed but ignored - parse_reg() - - regs['rsp'] = parse_reg() - regs['ss'] = parse_reg() - regs['fs_base'] = parse_reg() - regs['ds'] = parse_reg() - regs['es'] = parse_reg() - regs['fs'] = parse_reg() - regs['gs'] = parse_reg() - regs['gs_base'] = parse_reg() - - return regs - -def parse_aarch64_registers(enc_regs: bytes, order: str='little', signed: bool=False) -> dict[str, int]: - idx = 0 - def parse_reg(): - nonlocal idx - enc_reg = enc_regs[idx:(idx := idx + 8)] - return int.from_bytes(enc_reg, byteorder=order, signed=signed) - - regnames = [] - for i in range(32): - regnames.append(f'x{i}') - regnames.append('sp') - regnames.append('pc') - regnames.append('cpsr') - - regs = {} - for i in range(len(regnames)): - regs[regnames[i]] = parse_reg() - - return regs class MemoryWriteHole: def __init__(self, offset: int, size: int): @@ -435,201 +267,17 @@ class DetachTask(Task): def __repr__(self) -> str: return f'Detach task\n{super().__repr__()}' -class DeterministicLog: - def __init__(self, log_dir: str): - self.base_directory = log_dir - - def events_file(self) -> str: - return os.path.join(self.base_directory, 'events') - - def 
tasks_file(self) -> str: - return os.path.join(self.base_directory, 'tasks') - - def mmaps_file(self) -> str: - return os.path.join(self.base_directory, 'mmaps') - - def data_file(self) -> str: - return os.path.join(self.base_directory, 'data') - - def _read_structure(self, file, obj: SerializedObject) -> list[SerializedObject]: - data = DeterministicLogReader(file).read() - - objects = [] - for deser in obj.read_multiple_bytes_packed(data): - objects.append(deser) - return objects - - def raw_events(self) -> list[Frame]: - return self._read_structure(self.events_file(), Frame) - - def raw_tasks(self) -> list[TaskEvent]: - return self._read_structure(self.tasks_file(), TaskEvent) - - def raw_mmaps(self) -> list[MMap]: - return self._read_structure(self.mmaps_file(), MMap) - - def events(self) -> list[Event]: - def parse_registers(event: Frame) -> Union[int, dict[str, int]]: - arch = event.arch - if arch == rr_trace.Arch.x8664: - regs = parse_x64_registers(event.registers.raw) - return regs['rip'], regs - if arch == rr_trace.Arch.aarch64: - regs = parse_aarch64_registers(event.registers.raw) - return regs['pc'], regs - raise NotImplementedError(f'Unable to parse registers for architecture {arch}') - - def parse_memory_writes(event: Frame, reader: io.RawIOBase) -> list[MemoryWrite]: - writes = [] - for raw_write in event.memWrites: - # Skip memory writes with 0 bytes - if raw_write.size == 0: - continue - - holes = [] - for raw_hole in raw_write.holes: - holes.append(MemoryWriteHole(raw_hole.offset, raw_hole.size)) - - data = bytearray() - for hole in holes: - until_hole = hole.offset - reader.tell() - data.extend(reader.read(until_hole)) - data.extend(b'\x00' * hole.size) - - # No holes - if len(data) == 0: - data = reader.read(raw_write.size) - - mem_write = MemoryWrite(raw_write.tid, - raw_write.addr, - raw_write.size, - holes, - raw_write.sizeIsConservative, - bytes(data)) - writes.append(mem_write) - return writes - - data_reader = DeterministicLogReader(self.data_file()) - - events = [] - raw_events = self.raw_events() - for raw_event in raw_events: - pc, registers = parse_registers(raw_event) - mem_writes = parse_memory_writes(raw_event, data_reader) - - event = None - - tid = raw_event.tid - arch = raw_event.arch - event_type = raw_event.event.which() - - if event_type == 'syscall': - if raw_event.arch == rr_trace.Arch.x8664: - # On entry: substitute orig_rax for RAX - if raw_event.event.syscall.state == rr_trace.SyscallState.entering: - registers['rax'] = registers['orig_rax'] - del registers['orig_rax'] - event = SyscallEvent(pc, - tid, - arch, - registers, - mem_writes, - raw_event.event.syscall.arch, - raw_event.event.syscall.number, - raw_event.event.syscall.state, - raw_event.event.syscall.failedDuringPreparation) - - if event_type == 'syscallbufFlush': - event = SyscallBufferFlushEvent(pc, - tid, - arch, - registers, - mem_writes, - raw_event.event.syscallbufFlush.mprotectRecords) - raise NotImplementedError(f'Cannot support system call buffer events yet: {event}') - if event_type == 'signal': - signal = raw_event.event.signal - signal_descriptor = SignalDescriptor(signal.arch, - signal.siginfo, - signal.deterministic, - signal.disposition) - event = SignalEvent(pc, tid, arch, registers, mem_writes, - signal_number=signal_descriptor) - - if event_type == 'signalDelivery': - signal = raw_event.event.signalDelivery - signal_descriptor = SignalDescriptor(signal.arch, - signal.siginfo, - signal.deterministic, - signal.disposition) - event = SignalEvent(pc, tid, arch, registers, 
mem_writes, - signal_delivery=signal_descriptor) - - if event_type == 'signalHandler': - signal = raw_event.event.signalHandler - signal_descriptor = SignalDescriptor(signal.arch, - signal.siginfo, - signal.deterministic, - signal.disposition) - event = SignalEvent(pc, tid, arch, registers, mem_writes, - signal_handler=signal_descriptor) - - if event is None: - event = Event(pc, tid, arch, registers, mem_writes, event_type) - - events.append(event) - - return events - - def tasks(self) -> list[Task]: - tasks = [] - raw_tasks = self.raw_tasks() - for raw_task in raw_tasks: - task_type = raw_task.which() - - task = None - if task_type == 'clone': - task = CloneTask(raw_task.frameTime, - raw_task.tid, - raw_task.clone.parentTid, - raw_task.clone.flags, - raw_task.clone.ownNsTid) - if task_type == 'exec': - task = ExecTask(raw_task.frameTime, - raw_task.tid, - raw_task.exec.fileName, - raw_task.exec.cmdLine, - raw_task.exec.exeBase, - raw_task.exec.interpBase, - raw_task.exec.interpName) - if task_type == 'exit': - task = ExitTask(raw_task.frameTime, raw_task.tid, raw_task.exit.exitStatus) - if task_type == 'detach': - task = DetachTask(raw_task.frameTime, raw_task.tid) - tasks.append(task) - return tasks - - def mmaps(self) -> list[MemoryMapping]: - def mapping_source(mmap: MMap) -> str: - source_type = mmap.source.which() - if source_type == 'zero' or source_type == 'trace': - return source_type - elif source_type == 'file': - return mmap.source.file.backingFileName - else: - raise NotImplementedError(f'Unable to handle memory mappings from source type:' - f' {source_type}') - - mmaps = [] - raw_mmaps = self.raw_mmaps() - for raw_mmap in raw_mmaps: - mmap = MemoryMapping(raw_mmap.frameTime, - raw_mmap.start, - raw_mmap.end, - mapping_source(raw_mmap), - raw_mmap.fileOffsetBytes, - raw_mmap.prot, - raw_mmap.flags) - mmaps.append(mmap) - return mmaps +try: + from ._deterministic_impl import DeterministicLog +except Exception: + class DeterministicLog: + def __init__(self, log_dir: str): + self.base_directory = None + + def events_file(self) -> str | None: return None + def tasks_file(self) -> str | None: return None + def mmaps_file(self) -> str | None: return None + def events(self) -> list[Event]: return [] + def tasks(self) -> list[Task]: return [] + def mmaps(self) -> list[MemoryMapping]: return [] diff --git a/src/focaccia/tools/capture_transforms.py b/src/focaccia/tools/capture_transforms.py index a178ba0..268af36 100755 --- a/src/focaccia/tools/capture_transforms.py +++ b/src/focaccia/tools/capture_transforms.py @@ -7,6 +7,7 @@ import logging from focaccia import parser, utils from focaccia.trace import TraceEnvironment from focaccia.native.tracer import SymbolicTracer +from focaccia.deterministic import DeterministicLog def main(): prog = argparse.ArgumentParser() @@ -62,20 +63,10 @@ def main(): else: logging.basicConfig(level=logging.INFO) - detlog = None - if args.deterministic_log: - from focaccia.deterministic import DeterministicLog - detlog = DeterministicLog(args.deterministic_log) - else: - class NullDeterministicLog: - def __init__(self): pass - def events_file(self): return None - def tasks_file(self): return None - def mmaps_file(self): return None - def events(self): return [] - def tasks(self): return [] - def mmaps(self): return [] - detlog = NullDeterministicLog() + detlog = DeterministicLog(args.deterministic_log) + if args.deterministic_log and detlog.base_directory is None: + raise NotImplementedError(f'Deterministic log {args.deterministic_log} specified but ' + 
'Focaccia built without deterministic log support') env = TraceEnvironment(args.binary, args.args, utils.get_envp(), nondeterminism_log=detlog, -- cgit 1.4.1 From 014e07a7128e23c0130fef3d74459f4159d0fe4d Mon Sep 17 00:00:00 2001 From: ReimersS Date: Fri, 7 Nov 2025 15:45:32 +0000 Subject: Deterministic replay (single threaded, no memory) --- reproducers/issue-508.c | 7 +++- src/focaccia/arch/arch.py | 24 ++++++++++++ src/focaccia/arch/x86.py | 22 ++++++++++- src/focaccia/deterministic.py | 2 +- src/focaccia/qemu/_qemu_tool.py | 74 +++++++++++++++++++++++++++++++++++-- src/focaccia/tools/validate_qemu.py | 4 +- 6 files changed, 125 insertions(+), 8 deletions(-) diff --git a/reproducers/issue-508.c b/reproducers/issue-508.c index e143183..c8fd251 100644 --- a/reproducers/issue-508.c +++ b/reproducers/issue-508.c @@ -1,13 +1,18 @@ #include +#include +#include int main() { int mem = 0x12345678; + int buf = 0; + getrandom(&buf, sizeof(buf), 0); register long rax asm("rax") = 0x1234567812345678; - register int edi asm("edi") = 0x77777777; + register int edi asm("edi") = buf; asm("cmpxchg %[edi],%[mem]" : [ mem ] "+m"(mem), [ rax ] "+r"(rax) : [ edi ] "r"(edi)); long rax2 = rax; printf("rax2 = %lx\n", rax2); + printf("rand= %d\n", buf); } diff --git a/src/focaccia/arch/arch.py b/src/focaccia/arch/arch.py index c220a3b..2652159 100644 --- a/src/focaccia/arch/arch.py +++ b/src/focaccia/arch/arch.py @@ -1,6 +1,17 @@ from typing import Literal from collections.abc import Callable +class SyscallInfo: + def __init__(self, name: str, outputs: list[(str, str, str)]): + """ Describes a syscall by its name and outputs. + + An output is a regname holding the pointer, + the length in bytes--either as a number or as onther register name-- and + the type of the output + """ + self.name = name + self.outputs = outputs + class RegisterAccessor: def __init__(self, regname: str, start_bit: int, end_bit: int): """An accessor that describes a range of bits. @@ -103,6 +114,19 @@ class Arch(): """ return False + def get_em_syscalls(self) -> dict[int, str]: + """Returns an architecture specific set of syscalls that Focaccia needs to purely emulate.""" + raise NotImplementedError("Architecture must implement get_em_syscalls") + + def get_pasthru_syscalls(self) -> dict[int, str]: + """Returns an architecture specific set of syscalls that Focaccia needs to passthrough and + then warns about missmatching values. Examples are memory and lock related syscalls.""" + raise NotImplementedError("Architecture must implement get_pasthru_syscalls") + + def get_syscall_reg(self) -> str: + """Returns the register name that contains the syscall number.""" + raise NotImplementedError("Architecture must implement get_syscall_reg") + def is_instr_syscall(self, instr: str) -> bool: return False diff --git a/src/focaccia/arch/x86.py b/src/focaccia/arch/x86.py index a5d29f5..b33c6bf 100644 --- a/src/focaccia/arch/x86.py +++ b/src/focaccia/arch/x86.py @@ -1,6 +1,6 @@ """Architecture-specific configuration.""" -from .arch import Arch, RegisterDescription as _Reg +from .arch import Arch, RegisterDescription as _Reg, SyscallInfo as _Sc archname = 'x86_64' @@ -183,6 +183,18 @@ def compose_rflags(rflags: dict[str, int]) -> int: (0x00200000 if rflags.get('ID', 0) else 0) ) +# Incomplete, only the most common ones +emulatedSyscalls = { + 34: _Sc('pause', []), + 39: _Sc('getpid', []), + 102: _Sc('getuid', []), + 318: _Sc('getrandom', [('rdi', 'rsi', 'char')]), +} + +# Focaccia will do scheduling (and locking ???) 
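For illustration only (not part of the patch): how a SyscallInfo output triple such as ('rdi', 'rsi', 'char') from the emulatedSyscalls table could be resolved against a register snapshot to find the byte range a replayed getrandom() filled. resolve_output is a hypothetical helper, and the register-name check is a simplification of the arch.to_regname() test used in the QEMU tool.

    def resolve_output(output: tuple[str, str, str],
                       regs: dict[str, int]) -> tuple[int, int]:
        # (pointer register, length as register name or literal, element type)
        ptr_reg, length, _elem_type = output
        size = regs[length] if length in regs else int(length)
        return regs[ptr_reg], size

    regs = {'rdi': 0x7ffd0000, 'rsi': 4, 'rax': 318}
    addr, size = resolve_output(('rdi', 'rsi', 'char'), regs)
    assert (addr, size) == (0x7ffd0000, 4)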
+passthruSyscalls = { +} + class ArchX86(Arch): def __init__(self): super().__init__(archname, registers, 64) @@ -215,3 +227,11 @@ class ArchX86(Arch): return True return False + def get_em_syscalls(self) -> dict[int, str]: + return emulatedSyscalls + + def get_pasthru_syscalls(self) -> dict[int, str]: + return passthruSyscalls + + def get_syscall_reg(self) -> str: + return 'rax' diff --git a/src/focaccia/deterministic.py b/src/focaccia/deterministic.py index 4ebcf9e..2a15430 100644 --- a/src/focaccia/deterministic.py +++ b/src/focaccia/deterministic.py @@ -57,7 +57,7 @@ class Event: repr_str = f'Thread {hex(self.tid)} executed event {self.event_type} at {hex(self.pc)}\n' repr_str += f'Register set:\n{reg_repr}' - + if len(self.mem_writes): repr_str += f'\nMemory writes:\n{mem_write_repr}' diff --git a/src/focaccia/qemu/_qemu_tool.py b/src/focaccia/qemu/_qemu_tool.py index 93849bd..73857a4 100644 --- a/src/focaccia/qemu/_qemu_tool.py +++ b/src/focaccia/qemu/_qemu_tool.py @@ -19,6 +19,7 @@ from focaccia.snapshot import ProgramState, ReadableProgramState, \ from focaccia.symbolic import SymbolicTransform, eval_symbol, ExprMem from focaccia.trace import Trace, TraceEnvironment from focaccia.utils import print_result +from focaccia.deterministic import DeterministicLog, Event from focaccia.tools.validate_qemu import make_argparser, verbosity @@ -121,11 +122,13 @@ class GDBProgramState(ReadableProgramState): raise MemoryAccessError(addr, size, str(err)) class GDBServerStateIterator: - def __init__(self, remote: str): + def __init__(self, remote: str, replay_log: list[Event] | None): gdb.execute('set pagination 0') gdb.execute('set sysroot') gdb.execute('set python print-stack full') # enable complete Python tracebacks gdb.execute(f'target remote {remote}') + self._replay_log = replay_log + self._replay_idx = 0 self._process = gdb.selected_inferior() self._first_next = True @@ -142,6 +145,55 @@ class GDBServerStateIterator: self.arch = supported_architectures[archname] self.binary = self._process.progspace.filename + def _handle_sync_point(self, call: int, addr: int, length: int, arch: Arch): + def _search_next_event(addr: int, idx: int) -> Event | None: + if self._replay_log is None: + return idx, None + for i in range(idx, len(self._replay_log)): + event = self._replay_log[i] + if event.pc == addr: + return i, event + return idx, None + + _new_pc = addr + length + print(f'Handling syscall at {hex(_new_pc)} with call number {call}') + if int(call) in arch.get_em_syscalls().keys(): + + #print(f'Events: {self._replay_log[self._replay_idx:]}') + i, e = _search_next_event(_new_pc, self._replay_idx) + if e is None: + raise Exception(f'No matching event found in deterministic log \ + for syscall at {hex(_new_pc)}') + + e = self._replay_log[i+1] + print(f'Adjusting w/ Event: {e}') + gdb.execute(f'set $pc = {hex(_new_pc)}') + self._replay_idx = i+2 + + reg_name = arch.get_syscall_reg() + gdb.execute(f'set $rax = {hex(e.registers.get("{reg_name}", 0))}') + + assert(len(arch.get_em_syscalls()[int(call)].outputs) == len(e.mem_writes)) + + w_idx = 0 + for _reg, _size, _type in arch.get_em_syscalls()[int(call)].outputs: + if arch.to_regname(_size) is not None: + _size = e.registers[_size] + else: + _size = int(_size) + + _addr_rr = e.registers[_reg] + _size_rr = e.mem_writes[_addr_rr] + + assert (_size == _size_rr), f'{_size} != {_size_rr}' + _addr = gdb.selected_frame().read_register(_reg) + # TODO + gdb.execute(f'set {{{_type}[_src]}}{_addr} = *({_type}[{_size}] *){_addr}') + + return _new_pc + + return 
addr + def __iter__(self): return self @@ -160,6 +212,11 @@ class GDBServerStateIterator: if not self._process.is_valid() or len(self._process.threads()) == 0: raise StopIteration new_pc = gdb.selected_frame().read_register('pc') + if self._replay_log is not None: + asm = gdb.selected_frame().architecture().disassemble(new_pc, count=1)[0] + if 'syscall' in asm['asm']: + call_reg = self.arch.get_syscall_reg() + new_pc = self._handle_sync_point(gdb.selected_frame().read_register(call_reg), asm['addr'], asm['length'], self.arch) return GDBProgramState(self._process, gdb.selected_frame(), self.arch) @@ -341,17 +398,26 @@ def collect_conc_trace(gdb: GDBServerStateIterator, \ # Note: this may occur when symbolic traces were gathered with a stop address if symb_i >= len(strace): warn(f'QEMU executed more states than native execution: {symb_i} vs {len(strace)-1}') - + return states, matched_transforms def main(): args = make_argparser().parse_args() - + logging_level = getattr(logging, args.error_level.upper(), logging.INFO) logging.basicConfig(level=logging_level, force=True) + if args.deterministic is not None: + replay_log = DeterministicLog(log_dir=args.deterministic) + + if args.deterministic is not None: + replay_log = DeterministicLog(log_dir=args.deterministic) + + print(f'Events: {list(replay_log.raw_events())}') + print(f'Maps: {list(replay_log.raw_mmaps())}') + exit(0) try: - gdb_server = GDBServerStateIterator(args.remote) + gdb_server = GDBServerStateIterator(args.remote, replay_log.events()) except Exception as e: raise Exception(f'Unable to perform basic GDB setup: {e}') diff --git a/src/focaccia/tools/validate_qemu.py b/src/focaccia/tools/validate_qemu.py index 48b3f1c..3e0db89 100755 --- a/src/focaccia/tools/validate_qemu.py +++ b/src/focaccia/tools/validate_qemu.py @@ -79,10 +79,12 @@ memory, and stepping forward by single instructions. 
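For illustration only (not part of the patch): the shape of the GDB command used to inject a recorded memory write into the QEMU guest. The TODO expression above is replaced in the rebase commit below by a command of the form "set {char[N]}ADDR = 0xBYTES"; build_set_cmd is a hypothetical helper that only assembles that string.

    def build_set_cmd(addr: int, data: bytes) -> str:
        # Write len(data) recorded bytes at addr in the debuggee, via gdb.execute().
        return f'set {{char[{len(data)}]}}{hex(addr)} = 0x{data.hex()}'

    cmd = build_set_cmd(0x7ffd0000, b'\xde\xad\xbe\xef')
    assert cmd == 'set {char[4]}0x7ffd0000 = 0xdeadbeef'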
prog.add_argument('--remote', type=str, help='The hostname:port pair at which to find a QEMU GDB server.') - prog.add_argument('--gdb', + prog.add_argument('--gdb', type=str, default='gdb', help='GDB binary to invoke.') + prog.add_argument('--deterministic', default=None, + help='The directory containing rr traces') return prog def quoted(s: str) -> str: -- cgit 1.4.1 From 3a7af86d1f027b9d0788faea36923729a8a8b28f Mon Sep 17 00:00:00 2001 From: ReimersS Date: Tue, 11 Nov 2025 13:27:08 +0000 Subject: Rebase syscall overwrites --- src/focaccia/qemu/_qemu_tool.py | 38 +++++++++++++++++--------------------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/src/focaccia/qemu/_qemu_tool.py b/src/focaccia/qemu/_qemu_tool.py index 73857a4..ccdf2fb 100644 --- a/src/focaccia/qemu/_qemu_tool.py +++ b/src/focaccia/qemu/_qemu_tool.py @@ -122,13 +122,13 @@ class GDBProgramState(ReadableProgramState): raise MemoryAccessError(addr, size, str(err)) class GDBServerStateIterator: - def __init__(self, remote: str, replay_log: list[Event] | None): + def __init__(self, remote: str, deterministic_log: list[Event] | None): gdb.execute('set pagination 0') gdb.execute('set sysroot') gdb.execute('set python print-stack full') # enable complete Python tracebacks gdb.execute(f'target remote {remote}') - self._replay_log = replay_log - self._replay_idx = 0 + self._deterministic_log = deterministic_log + self._deterministic_idx = 0 self._process = gdb.selected_inferior() self._first_next = True @@ -147,10 +147,10 @@ class GDBServerStateIterator: def _handle_sync_point(self, call: int, addr: int, length: int, arch: Arch): def _search_next_event(addr: int, idx: int) -> Event | None: - if self._replay_log is None: + if self._deterministic_log is None: return idx, None - for i in range(idx, len(self._replay_log)): - event = self._replay_log[i] + for i in range(idx, len(self._deterministic_log)): + event = self._deterministic_log[i] if event.pc == addr: return i, event return idx, None @@ -159,16 +159,16 @@ class GDBServerStateIterator: print(f'Handling syscall at {hex(_new_pc)} with call number {call}') if int(call) in arch.get_em_syscalls().keys(): - #print(f'Events: {self._replay_log[self._replay_idx:]}') - i, e = _search_next_event(_new_pc, self._replay_idx) + #print(f'Events: {self._deterministic_log[self._deterministic_idx:]}') + i, e = _search_next_event(_new_pc, self._deterministic_idx) if e is None: raise Exception(f'No matching event found in deterministic log \ for syscall at {hex(_new_pc)}') - e = self._replay_log[i+1] + e = self._deterministic_log[i+1] print(f'Adjusting w/ Event: {e}') gdb.execute(f'set $pc = {hex(_new_pc)}') - self._replay_idx = i+2 + self._deterministic_idx = i+2 reg_name = arch.get_syscall_reg() gdb.execute(f'set $rax = {hex(e.registers.get("{reg_name}", 0))}') @@ -183,12 +183,14 @@ class GDBServerStateIterator: _size = int(_size) _addr_rr = e.registers[_reg] - _size_rr = e.mem_writes[_addr_rr] + _w_rr = e.mem_writes[w_idx] + w_idx += 1 - assert (_size == _size_rr), f'{_size} != {_size_rr}' + assert (_size == _w_rr.size), f'{_size} != {_w_rr.size}' _addr = gdb.selected_frame().read_register(_reg) - # TODO - gdb.execute(f'set {{{_type}[_src]}}{_addr} = *({_type}[{_size}] *){_addr}') + cmd = f'set {{char[{_size}]}}{hex(_addr)} = 0x{_w_rr.data.hex()}' + # print(f'GDB: {cmd}') + gdb.execute(cmd) return _new_pc @@ -212,7 +214,7 @@ class GDBServerStateIterator: if not self._process.is_valid() or len(self._process.threads()) == 0: raise StopIteration new_pc = 
gdb.selected_frame().read_register('pc') - if self._replay_log is not None: + if self._deterministic_log is not None: asm = gdb.selected_frame().architecture().disassemble(new_pc, count=1)[0] if 'syscall' in asm['asm']: call_reg = self.arch.get_syscall_reg() @@ -410,12 +412,6 @@ def main(): if args.deterministic is not None: replay_log = DeterministicLog(log_dir=args.deterministic) - if args.deterministic is not None: - replay_log = DeterministicLog(log_dir=args.deterministic) - - print(f'Events: {list(replay_log.raw_events())}') - print(f'Maps: {list(replay_log.raw_mmaps())}') - exit(0) try: gdb_server = GDBServerStateIterator(args.remote, replay_log.events()) except Exception as e: -- cgit 1.4.1 From d103a0532b3cb7c73a2522022ff6c971af98f650 Mon Sep 17 00:00:00 2001 From: Theofilos Augoustis Date: Wed, 12 Nov 2025 14:40:39 +0000 Subject: Refactor iteration over events in native tracer --- src/focaccia/_deterministic_impl.py | 30 +++++++++++++-------- src/focaccia/deterministic.py | 52 +++++++++++++++++++++++++++++++++++++ src/focaccia/native/tracer.py | 52 ++++++++++++------------------------- src/focaccia/snapshot.py | 7 +++++ 4 files changed, 95 insertions(+), 46 deletions(-) diff --git a/src/focaccia/_deterministic_impl.py b/src/focaccia/_deterministic_impl.py index 1d784cb..fc85b9a 100644 --- a/src/focaccia/_deterministic_impl.py +++ b/src/focaccia/_deterministic_impl.py @@ -3,7 +3,7 @@ import os import io import struct -from typing import Union, Optional +from typing import Union, Tuple, Optional import brotli @@ -217,14 +217,14 @@ class DeterministicLog: return self._read_structure(self.mmaps_file(), MMap) def events(self) -> list[Event]: - def parse_registers(event: Frame) -> Union[int, dict[str, int]]: + def parse_registers(event: Frame) -> Tuple[str, dict[str, int]]: arch = event.arch if arch == rr_trace.Arch.x8664: regs = parse_x64_registers(event.registers.raw) - return regs['rip'], regs + return 'rip', regs if arch == rr_trace.Arch.aarch64: regs = parse_aarch64_registers(event.registers.raw) - return regs['pc'], regs + return 'pc', regs raise NotImplementedError(f'Unable to parse registers for architecture {arch}') def parse_memory_writes(event: Frame, reader: io.RawIOBase) -> list[MemoryWrite]: @@ -274,10 +274,18 @@ class DeterministicLog: if event_type == 'syscall': if raw_event.arch == rr_trace.Arch.x8664: # On entry: substitute orig_rax for RAX - if raw_event.event.syscall.state == rr_trace.SyscallState.entering: + syscall = raw_event.event.syscall + if syscall.state == rr_trace.SyscallState.entering: registers['rax'] = registers['orig_rax'] + if syscall.number != 59: + registers[pc] -= 2 del registers['orig_rax'] - event = SyscallEvent(pc, + if raw_event.arch == rr_trace.Arch.aarch64: + syscall = raw_event.event.syscall + if syscall.state == rr_trace.SyscallState.entering and syscall.number != 221: + registers[pc] -= 4 + + event = SyscallEvent(registers[pc], tid, arch, registers, @@ -288,7 +296,7 @@ class DeterministicLog: raw_event.event.syscall.failedDuringPreparation) if event_type == 'syscallbufFlush': - event = SyscallBufferFlushEvent(pc, + event = SyscallBufferFlushEvent(registers[pc], tid, arch, registers, @@ -301,7 +309,7 @@ class DeterministicLog: signal.siginfo, signal.deterministic, signal.disposition) - event = SignalEvent(pc, tid, arch, registers, mem_writes, + event = SignalEvent(registers[pc], tid, arch, registers, mem_writes, signal_number=signal_descriptor) if event_type == 'signalDelivery': @@ -310,7 +318,7 @@ class DeterministicLog: signal.siginfo, 
signal.deterministic, signal.disposition) - event = SignalEvent(pc, tid, arch, registers, mem_writes, + event = SignalEvent(registers[pc], tid, arch, registers, mem_writes, signal_delivery=signal_descriptor) if event_type == 'signalHandler': @@ -319,11 +327,11 @@ class DeterministicLog: signal.siginfo, signal.deterministic, signal.disposition) - event = SignalEvent(pc, tid, arch, registers, mem_writes, + event = SignalEvent(registers[pc], tid, arch, registers, mem_writes, signal_handler=signal_descriptor) if event is None: - event = Event(pc, tid, arch, registers, mem_writes, event_type) + event = Event(registers[pc], tid, arch, registers, mem_writes, event_type) events.append(event) diff --git a/src/focaccia/deterministic.py b/src/focaccia/deterministic.py index 2a15430..91afd4e 100644 --- a/src/focaccia/deterministic.py +++ b/src/focaccia/deterministic.py @@ -1,4 +1,7 @@ from .arch import Arch +from .snapshot import ReadableProgramState + +from typing import Callable class MemoryWriteHole: def __init__(self, offset: int, size: int): @@ -280,4 +283,53 @@ except Exception: def events(self) -> list[Event]: return [] def tasks(self) -> list[Task]: return [] def mmaps(self) -> list[MemoryMapping]: return [] +finally: + class DeterministicEventIterator: + def __init__(self, deterministic_log: DeterministicLog, match_fn: Callable): + self._detlog = deterministic_log + self._events = self._detlog.events() + self._pc_to_event = {} + self._match = match_fn + self._idx: int | None = None # None represents no current event + self._in_event: bool = False + + idx = 0 + for event in self._events: + self._pc_to_event.setdefault(event.pc, []).append((event, idx)) + + def events(self) -> list[Event]: + return self._events + + def current_event(self) -> Event | None: + # No event when not synchronized + if self._idx is None or not self._in_event: + return None + return self._events[self._idx] + + def update(self, target: ReadableProgramState) -> Event | None: + # Quick check + candidates = self._pc_to_event.get(target.read_pc(), []) + if len(candidates) == 0: + self._in_event = False + return None + + # Find synchronization point + if self._idx is None: + for event, idx in candidates: + if self._match(event, target): + self._idx = idx + self._in_event = True + return self.current_event() + + return self.next() + + def next(self) -> Event | None: + if self._idx is None: + raise ValueError('Attempted to get next event without synchronizing') + + self._idx += 1 + return self.current_event() + + def __bool__(self) -> bool: + return len(self.events()) > 0 diff --git a/src/focaccia/native/tracer.py b/src/focaccia/native/tracer.py index 47ac7e2..b369b22 100644 --- a/src/focaccia/native/tracer.py +++ b/src/focaccia/native/tracer.py @@ -12,7 +12,7 @@ from focaccia.trace import Trace, TraceEnvironment from focaccia.miasm_util import MiasmSymbolResolver from focaccia.snapshot import ReadableProgramState, RegisterAccessError from focaccia.symbolic import SymbolicTransform, DisassemblyContext, run_instruction -from focaccia.deterministic import Event +from focaccia.deterministic import Event, DeterministicEventIterator from .lldb_target import LLDBConcreteTarget, LLDBLocalTarget, LLDBRemoteTarget @@ -27,9 +27,9 @@ logging.getLogger('asmblock').setLevel(logging.CRITICAL) class ValidationError(Exception): pass -def match_event(event: Event, pc: int, target: ReadableProgramState) -> bool: +def match_event(event: Event, target: ReadableProgramState) -> bool: # TODO: match the rest of the state to be sure - if event.pc == 
pc: + if event.pc == target.read_pc(): for reg, value in event.registers.items(): if value == event.pc: continue @@ -154,8 +154,7 @@ class SymbolicTracer: self.cross_validate = cross_validate self.target = SpeculativeTracer(self.create_debug_target()) - self.nondet_events = self.env.detlog.events() - self.next_event: int | None = None + self.nondet_events = DeterministicEventIterator(self.env.detlog, match_event) def create_debug_target(self) -> LLDBConcreteTarget: binary = self.env.binary_name @@ -209,30 +208,22 @@ class SymbolicTracer: f' mem[{hex(addr)}:{hex(addr+len(data))}] = {conc_data}.' f'\nFaulty transformation: {transform}') - def progress_event(self) -> None: - if (self.next_event + 1) < len(self.nondet_events): - self.next_event += 1 - debug(f'Next event to handle at index {self.next_event}') - else: - self.next_event = None - def post_event(self) -> None: - if self.next_event: - if self.nondet_events[self.next_event].pc == 0: + current_event = self.nondet_events.current_event() + if current_event: + if current_event.pc == 0: # Exit sequence debug('Completed exit event') self.target.run() - debug(f'Completed handling event at index {self.next_event}') - self.progress_event() + debug(f'Completed handling event: {current_event}') + self.nondet_events.next() - def is_stepping_instr(self, pc: int, instruction: Instruction) -> bool: - if self.nondet_events: - pc = pc + instruction.length # detlog reports next pc for each event - if self.next_event and match_event(self.nondet_events[self.next_event], pc, self.target): - debug('Current instruction matches next event; stepping through it') - self.progress_event() - return True + def is_stepping_instr(self, instruction: Instruction) -> bool: + if self.nondet_events.current_event(): + debug('Current instruction matches next event; stepping through it') + self.nondet_events.next() + return True else: if self.target.arch.is_instr_syscall(str(instruction)): return True @@ -257,21 +248,12 @@ class SymbolicTracer: if self.env.start_address is not None: self.target.run_until(self.env.start_address) - for i in range(len(self.nondet_events)): - if self.nondet_events[i].pc == self.target.read_pc(): - self.next_event = i+1 - if self.next_event >= len(self.nondet_events): - break - - debug(f'Starting from event {self.nondet_events[i]} onwards') - break - ctx = DisassemblyContext(self.target) arch = ctx.arch if logger.isEnabledFor(logging.DEBUG): debug('Tracing program with the following non-deterministic events') - for event in self.nondet_events: + for event in self.nondet_events.events(): debug(event) # Trace concolically @@ -282,7 +264,7 @@ class SymbolicTracer: if self.env.stop_address is not None and pc == self.env.stop_address: break - assert(pc != 0) + self.nondet_events.update(self.target) # Disassemble instruction at the current PC tid = self.target.get_current_tid() @@ -310,7 +292,7 @@ class SymbolicTracer: continue raise # forward exception - is_event = self.is_stepping_instr(pc, instruction) + is_event = self.is_stepping_instr(instruction) # Run instruction conc_state = MiasmSymbolResolver(self.target, ctx.loc_db) diff --git a/src/focaccia/snapshot.py b/src/focaccia/snapshot.py index 03a03cd..f40ac5a 100644 --- a/src/focaccia/snapshot.py +++ b/src/focaccia/snapshot.py @@ -92,6 +92,13 @@ class ReadableProgramState: self.arch = arch self.strict = True + def read_pc(self) -> int: + """Read the PC value. + + :raise RegisterAccessError: If the register has not value. 
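For illustration only (not part of the patch): the matching rule that match_event() sketches, an event matches the target when the program counters agree and every recorded register the target can also report holds the same value. TargetState and matches are stand-ins introduced for this example; the patched implementation additionally skips registers whose recorded value equals the event PC.

    class TargetState:
        def __init__(self, regs: dict[str, int]):
            self._regs = regs

        def read_pc(self) -> int:
            return self._regs['pc']

        def read_register(self, name: str) -> int:
            return self._regs[name]   # raises KeyError for untracked registers

    def matches(event_pc: int, event_regs: dict[str, int], target: TargetState) -> bool:
        if event_pc != target.read_pc():
            return False
        for reg, value in event_regs.items():
            try:
                if target.read_register(reg) != value:
                    return False
            except KeyError:
                continue   # register not tracked by this target
        return True

    target = TargetState({'pc': 0x401000, 'rax': 0, 'rdi': 1})
    assert matches(0x401000, {'rax': 0}, target)
    assert not matches(0x401000, {'rax': 7}, target)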
+ """ + return self.read_register('pc') + def read_register(self, reg: str) -> int: """Read a register's value. -- cgit 1.4.1 From 6247fb055a5c1e3eddb9948b3abf4c1e766edc08 Mon Sep 17 00:00:00 2001 From: Theofilos Augoustis Date: Thu, 13 Nov 2025 18:47:40 +0000 Subject: Refactor event tracing in QEMU tool --- src/focaccia/arch/x86.py | 13 +-- src/focaccia/compare.py | 2 +- src/focaccia/deterministic.py | 17 ++- src/focaccia/native/tracer.py | 4 +- src/focaccia/qemu/_qemu_tool.py | 208 ++++++++++++++++++++++-------------- src/focaccia/qemu/deterministic.py | 9 ++ src/focaccia/qemu/syscall.py | 14 +++ src/focaccia/qemu/x86.py | 10 ++ src/focaccia/snapshot.py | 2 +- src/focaccia/tools/validate_qemu.py | 3 +- 10 files changed, 182 insertions(+), 100 deletions(-) create mode 100644 src/focaccia/qemu/deterministic.py create mode 100644 src/focaccia/qemu/syscall.py create mode 100644 src/focaccia/qemu/x86.py diff --git a/src/focaccia/arch/x86.py b/src/focaccia/arch/x86.py index b33c6bf..4609f4c 100644 --- a/src/focaccia/arch/x86.py +++ b/src/focaccia/arch/x86.py @@ -183,18 +183,6 @@ def compose_rflags(rflags: dict[str, int]) -> int: (0x00200000 if rflags.get('ID', 0) else 0) ) -# Incomplete, only the most common ones -emulatedSyscalls = { - 34: _Sc('pause', []), - 39: _Sc('getpid', []), - 102: _Sc('getuid', []), - 318: _Sc('getrandom', [('rdi', 'rsi', 'char')]), -} - -# Focaccia will do scheduling (and locking ???) -passthruSyscalls = { -} - class ArchX86(Arch): def __init__(self): super().__init__(archname, registers, 64) @@ -235,3 +223,4 @@ class ArchX86(Arch): def get_syscall_reg(self) -> str: return 'rax' + diff --git a/src/focaccia/compare.py b/src/focaccia/compare.py index 4fea451..3aa5ab2 100644 --- a/src/focaccia/compare.py +++ b/src/focaccia/compare.py @@ -34,7 +34,7 @@ def _calc_transformation(previous: ProgramState, current: ProgramState): try: prev_val = previous.read_register(reg) cur_val = current.read_register(reg) - transformation.set_register(reg, cur_val - prev_val) + transformation.write_register(reg, cur_val - prev_val) except RegisterAccessError: # Register is not set in either state pass diff --git a/src/focaccia/deterministic.py b/src/focaccia/deterministic.py index 91afd4e..6d76457 100644 --- a/src/focaccia/deterministic.py +++ b/src/focaccia/deterministic.py @@ -1,6 +1,7 @@ from .arch import Arch from .snapshot import ReadableProgramState +from reprlib import repr as alt_repr from typing import Callable class MemoryWriteHole: @@ -56,7 +57,7 @@ class Event: mem_write_repr = '' for mem_write in self.mem_writes: - mem_write_repr += f'{mem_write}\n' + mem_write_repr += f'{alt_repr(mem_write)}\n' repr_str = f'Thread {hex(self.tid)} executed event {self.event_type} at {hex(self.pc)}\n' repr_str += f'Register set:\n{reg_repr}' @@ -306,6 +307,13 @@ finally: return None return self._events[self._idx] + def next_event(self) -> Event | None: + if self._idx is None: + raise ValueError('Attempted to get next event without synchronizing') + if self._idx + 1 >= len(self._events): + return None + return self._events[self._idx+1] + def update(self, target: ReadableProgramState) -> Event | None: # Quick check candidates = self._pc_to_event.get(target.read_pc(), []) @@ -321,13 +329,14 @@ finally: self._in_event = True return self.current_event() - return self.next() + return self.update_to_next() - def next(self) -> Event | None: + def update_to_next(self, count: int = 1) -> Event | None: if self._idx is None: raise ValueError('Attempted to get next event without synchronizing') - self._idx 
+= 1 + self._in_event = True + self._idx += count return self.current_event() def __bool__(self) -> bool: diff --git a/src/focaccia/native/tracer.py b/src/focaccia/native/tracer.py index b369b22..4376f41 100644 --- a/src/focaccia/native/tracer.py +++ b/src/focaccia/native/tracer.py @@ -217,12 +217,12 @@ class SymbolicTracer: self.target.run() debug(f'Completed handling event: {current_event}') - self.nondet_events.next() + self.nondet_events.update_to_next() def is_stepping_instr(self, instruction: Instruction) -> bool: if self.nondet_events.current_event(): debug('Current instruction matches next event; stepping through it') - self.nondet_events.next() + self.nondet_events.update_to_next() return True else: if self.target.arch.is_instr_syscall(str(instruction)): diff --git a/src/focaccia/qemu/_qemu_tool.py b/src/focaccia/qemu/_qemu_tool.py index ccdf2fb..75b142e 100644 --- a/src/focaccia/qemu/_qemu_tool.py +++ b/src/focaccia/qemu/_qemu_tool.py @@ -19,7 +19,8 @@ from focaccia.snapshot import ProgramState, ReadableProgramState, \ from focaccia.symbolic import SymbolicTransform, eval_symbol, ExprMem from focaccia.trace import Trace, TraceEnvironment from focaccia.utils import print_result -from focaccia.deterministic import DeterministicLog, Event +from focaccia.deterministic import DeterministicLog, DeterministicEventIterator, Event, SyscallEvent +from focaccia.qemu.deterministic import emulated_system_calls from focaccia.tools.validate_qemu import make_argparser, verbosity @@ -37,7 +38,13 @@ qemu_crash = { 'snap': None, } -class GDBProgramState(ReadableProgramState): +def match_event(event: Event, target: ReadableProgramState) -> bool: + # Match just on PC + if event.pc == target.read_pc(): + return True + return False + +class GDBProgramState(ProgramState): from focaccia.arch import aarch64, x86 flag_register_names = { @@ -122,13 +129,12 @@ class GDBProgramState(ReadableProgramState): raise MemoryAccessError(addr, size, str(err)) class GDBServerStateIterator: - def __init__(self, remote: str, deterministic_log: list[Event] | None): + def __init__(self, remote: str, deterministic_log: DeterministicLog): gdb.execute('set pagination 0') gdb.execute('set sysroot') gdb.execute('set python print-stack full') # enable complete Python tracebacks gdb.execute(f'target remote {remote}') self._deterministic_log = deterministic_log - self._deterministic_idx = 0 self._process = gdb.selected_inferior() self._first_next = True @@ -138,96 +144,129 @@ class GDBServerStateIterator: archname = split[1] if len(split) > 1 else split[0] archname = archname.replace('-', '_') if archname not in supported_architectures: - print(f'Error: Current platform ({archname}) is not' - f' supported by Focaccia. 
Exiting.') - exit(1) + raise NotImplementedError(f'Platform {archname} is not supported by Focaccia') self.arch = supported_architectures[archname] self.binary = self._process.progspace.filename - def _handle_sync_point(self, call: int, addr: int, length: int, arch: Arch): - def _search_next_event(addr: int, idx: int) -> Event | None: - if self._deterministic_log is None: - return idx, None - for i in range(idx, len(self._deterministic_log)): - event = self._deterministic_log[i] - if event.pc == addr: - return i, event - return idx, None - - _new_pc = addr + length - print(f'Handling syscall at {hex(_new_pc)} with call number {call}') - if int(call) in arch.get_em_syscalls().keys(): - - #print(f'Events: {self._deterministic_log[self._deterministic_idx:]}') - i, e = _search_next_event(_new_pc, self._deterministic_idx) - if e is None: - raise Exception(f'No matching event found in deterministic log \ - for syscall at {hex(_new_pc)}') - - e = self._deterministic_log[i+1] - print(f'Adjusting w/ Event: {e}') - gdb.execute(f'set $pc = {hex(_new_pc)}') - self._deterministic_idx = i+2 - - reg_name = arch.get_syscall_reg() - gdb.execute(f'set $rax = {hex(e.registers.get("{reg_name}", 0))}') - - assert(len(arch.get_em_syscalls()[int(call)].outputs) == len(e.mem_writes)) - - w_idx = 0 - for _reg, _size, _type in arch.get_em_syscalls()[int(call)].outputs: - if arch.to_regname(_size) is not None: - _size = e.registers[_size] - else: - _size = int(_size) - - _addr_rr = e.registers[_reg] - _w_rr = e.mem_writes[w_idx] - w_idx += 1 - - assert (_size == _w_rr.size), f'{_size} != {_w_rr.size}' - _addr = gdb.selected_frame().read_register(_reg) - cmd = f'set {{char[{_size}]}}{hex(_addr)} = 0x{_w_rr.data.hex()}' - # print(f'GDB: {cmd}') - gdb.execute(cmd) - - return _new_pc - - return addr + self._deterministic_events = DeterministicEventIterator(self._deterministic_log, match_event) + + # Filter non-deterministic events for event after start + self._deterministic_events.update(self.current_state()) + self._deterministic_events.update_to_next() + + def current_state(self) -> ReadableProgramState: + return GDBProgramState(self._process, gdb.selected_frame(), self.arch) + + def _handle_syscall(self) -> GDBProgramState: + cur_event = self._deterministic_events.current_event() + call = cur_event.registers.get(self.arch.get_syscall_reg()) + + post_event = self._deterministic_events.update_to_next() + syscall = emulated_system_calls[self.arch.archname].get(call, None) + debug(f'Handling event:\n{cur_event}') + if syscall is not None: + info(f'Replaying system call number {hex(call)}') + + self.skip(post_event.pc) + next_state = GDBProgramState(self._process, gdb.selected_frame(), self.arch) + + patchup_regs = [self.arch.get_syscall_reg(), *(syscall.patchup_registers or [])] + for reg in patchup_regs: + next_state.write_register(reg, post_event.registers.get(reg)) + + for mem in post_event.mem_writes: + # TODO: handle holes + # TODO: address mapping + addr, data = mem.address, mem.data + next_state.write_memory(addr, data) + + return next_state + + info(f'System call number {hex(call)} not replayed') + self._step() + if self._is_exited(): + raise StopIteration + return GDBProgramState(self._process, gdb.selected_frame(), self.arch) + + def _handle_event(self) -> GDBProgramState: + current_event = self._deterministic_events.current_event() + if not current_event: + return self.current_state() + + if isinstance(current_event, SyscallEvent): + return self._handle_syscall() + + warn(f'Event handling for events of type 
{current_event.event_type} not implemented') + return self.current_state() + + def _is_exited(self) -> bool: + return not self._process.is_valid() or len(self._process.threads()) == 0 def __iter__(self): return self - def __next__(self): + def __next__(self) -> ReadableProgramState: # The first call to __next__ should yield the first program state, # i.e. before stepping the first time if self._first_next: self._first_next = False return GDBProgramState(self._process, gdb.selected_frame(), self.arch) + if match_event(self._deterministic_events.current_event(), self.current_state()): + state = self._handle_event() + if self._is_exited(): + raise StopIteration + self._deterministic_events.update_to_next() + return state + # Step pc = gdb.selected_frame().read_register('pc') new_pc = pc while pc == new_pc: # Skip instruction chains from REP STOS etc. - gdb.execute('si', to_string=True) - if not self._process.is_valid() or len(self._process.threads()) == 0: + self._step() + if self._is_exited(): raise StopIteration new_pc = gdb.selected_frame().read_register('pc') - if self._deterministic_log is not None: - asm = gdb.selected_frame().architecture().disassemble(new_pc, count=1)[0] - if 'syscall' in asm['asm']: - call_reg = self.arch.get_syscall_reg() - new_pc = self._handle_sync_point(gdb.selected_frame().read_register(call_reg), asm['addr'], asm['length'], self.arch) - return GDBProgramState(self._process, gdb.selected_frame(), self.arch) + return self.current_state() + + def run_until(self, addr: int) -> ReadableProgramState: + events_handled = 0 + event = self._deterministic_events.current_event() + while event: + state = self._run_until_any([addr, event.pc]) + if state.read_pc() == addr: + # Check if we started at the very _start + self._first_next = events_handled == 0 + return state + + self._handle_event() + + event = self._deterministic_events.update_to_next() + events_handled += 1 + return self._run_until_any([addr]) + + def _run_until_any(self, addresses: list[int]) -> ReadableProgramState: + info(f'Executing until {[hex(x) for x in addresses]}') + + breakpoints = [] + for addr in addresses: + breakpoints.append(gdb.Breakpoint(f'*{addr:#x}')) - def run_until(self, addr: int): - breakpoint = gdb.Breakpoint(f'*{addr:#x}') gdb.execute('continue') - breakpoint.delete() + + for bp in breakpoints: + bp.delete() + return GDBProgramState(self._process, gdb.selected_frame(), self.arch) + def skip(self, new_pc: int): + gdb.execute(f'set $pc = {hex(new_pc)}') + + def _step(self): + gdb.execute('si', to_string=True) + def record_minimal_snapshot(prev_state: ReadableProgramState, cur_state: ReadableProgramState, prev_transform: SymbolicTransform, @@ -270,7 +309,7 @@ def record_minimal_snapshot(prev_state: ReadableProgramState, for regname in regs: try: regval = cur_state.read_register(regname) - out_state.set_register(regname, regval) + out_state.write_register(regname, regval) except RegisterAccessError: pass for mem in mems: @@ -283,7 +322,7 @@ def record_minimal_snapshot(prev_state: ReadableProgramState, pass state = ProgramState(cur_transform.arch) - state.set_register('PC', cur_transform.addr) + state.write_register('PC', cur_transform.addr) set_values(prev_transform.changed_regs.keys(), get_written_addresses(prev_transform), @@ -334,24 +373,33 @@ def collect_conc_trace(gdb: GDBServerStateIterator, \ cur_state = next(state_iter) symb_i = 0 + if logger.isEnabledFor(logging.DEBUG): + debug('Tracing program with the following non-deterministic events:') + for event in 
gdb._deterministic_events.events(): + debug(event) + # Skip to start + pc = cur_state.read_register('pc') + start_addr = start_addr if start_addr else pc try: - pc = cur_state.read_register('pc') - if start_addr and pc != start_addr: - info(f'Tracing QEMU from starting address: {hex(start_addr)}') + if pc != start_addr: + info(f'Executing until starting address {hex(start_addr)}') cur_state = state_iter.run_until(start_addr) except Exception as e: - if start_addr: + if pc != start_addr: raise Exception(f'Unable to reach start address {hex(start_addr)}: {e}') raise Exception(f'Unable to trace: {e}') # An online trace matching algorithm. + info(f'Tracing QEMU between {hex(start_addr)}:{hex(stop_addr) if stop_addr else "end"}') while True: try: pc = cur_state.read_register('pc') + if stop_addr and pc == stop_addr: + break while pc != strace[symb_i].addr: - info(f'PC {hex(pc)} does not match next symbolic reference {hex(strace[symb_i].addr)}') + warn(f'PC {hex(pc)} does not match next symbolic reference {hex(strace[symb_i].addr)}') next_i = find_index(strace[symb_i+1:], pc, lambda t: t.addr) @@ -409,11 +457,13 @@ def main(): logging_level = getattr(logging, args.error_level.upper(), logging.INFO) logging.basicConfig(level=logging_level, force=True) - if args.deterministic is not None: - replay_log = DeterministicLog(log_dir=args.deterministic) + detlog = DeterministicLog(args.deterministic_log) + if args.deterministic_log and detlog.base_directory is None: + raise NotImplementedError(f'Deterministic log {args.deterministic_log} specified but ' + 'Focaccia built without deterministic log support') try: - gdb_server = GDBServerStateIterator(args.remote, replay_log.events()) + gdb_server = GDBServerStateIterator(args.remote, detlog) except Exception as e: raise Exception(f'Unable to perform basic GDB setup: {e}') diff --git a/src/focaccia/qemu/deterministic.py b/src/focaccia/qemu/deterministic.py new file mode 100644 index 0000000..d2d314e --- /dev/null +++ b/src/focaccia/qemu/deterministic.py @@ -0,0 +1,9 @@ +from focaccia.qemu.x86 import emulated_system_calls as x86_emu_syscalls + +emulated_system_calls = { + 'x86_64': x86_emu_syscalls, + 'aarch64': { }, + 'aarch64l': { }, + 'aarch64b': { } +} + diff --git a/src/focaccia/qemu/syscall.py b/src/focaccia/qemu/syscall.py new file mode 100644 index 0000000..956f5c9 --- /dev/null +++ b/src/focaccia/qemu/syscall.py @@ -0,0 +1,14 @@ +class SyscallInfo: + def __init__(self, + name: str, + patchup_registers: list[str] | None = None, + patchup_address_registers: list[str] | None = None): + """Describes a syscall by its name and outputs. + + :param name: The name of a system call. + :param patchup_registers: Registers that must be replaced with deterministic values. 
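For illustration only (not part of the patch): how the per-architecture emulated_system_calls table introduced below in qemu/x86.py could be consulted to decide whether a call is replayed and which registers must be patched from the recorded exit event, mirroring the patchup_regs list built in _handle_syscall(). SyscallInfo is re-declared here only so the snippet runs on its own; registers_to_patch is a hypothetical helper.

    class SyscallInfo:
        def __init__(self, name, patchup_registers=None, patchup_address_registers=None):
            self.name = name
            self.patchup_registers = patchup_registers
            self.patchup_address_registers = patchup_address_registers

    emulated = {
        39: SyscallInfo('getpid', []),
        318: SyscallInfo('getrandom', patchup_address_registers=['rdi']),
    }

    def registers_to_patch(call: int, syscall_reg: str = 'rax') -> list[str]:
        info = emulated.get(call)
        if info is None:
            return []   # not replayed: let the target execute the call itself
        return [syscall_reg, *(info.patchup_registers or [])]

    assert registers_to_patch(318) == ['rax']   # only the return-value register
    assert registers_to_patch(1) == []          # write() is not in the table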
+ """ + self.name = name + self.patchup_registers = patchup_registers + self.patchup_address_registers = patchup_address_registers + diff --git a/src/focaccia/qemu/x86.py b/src/focaccia/qemu/x86.py new file mode 100644 index 0000000..347bbc4 --- /dev/null +++ b/src/focaccia/qemu/x86.py @@ -0,0 +1,10 @@ +from focaccia.qemu.syscall import SyscallInfo + +# Incomplete, only the most common ones +emulated_system_calls = { + 34: SyscallInfo('pause', []), + 39: SyscallInfo('getpid', []), + 102: SyscallInfo('getuid', []), + 318: SyscallInfo('getrandom', patchup_address_registers=['rdi']) +} + diff --git a/src/focaccia/snapshot.py b/src/focaccia/snapshot.py index f40ac5a..93241c1 100644 --- a/src/focaccia/snapshot.py +++ b/src/focaccia/snapshot.py @@ -161,7 +161,7 @@ class ProgramState(ReadableProgramState): return (regval & acc.mask) >> acc.start - def set_register(self, reg: str, value: int): + def write_register(self, reg: str, value: int): """Assign a value to a register. :raise RegisterAccessError: If `reg` is not a register name. diff --git a/src/focaccia/tools/validate_qemu.py b/src/focaccia/tools/validate_qemu.py index 3e0db89..4b5160f 100755 --- a/src/focaccia/tools/validate_qemu.py +++ b/src/focaccia/tools/validate_qemu.py @@ -29,6 +29,7 @@ from focaccia.arch import supported_architectures from focaccia.qemu.validation_server import start_validation_server verbosity = { + 'debug': ErrorTypes.INFO, 'info': ErrorTypes.INFO, 'warning': ErrorTypes.POSSIBLE, 'error': ErrorTypes.CONFIRMED, @@ -83,7 +84,7 @@ memory, and stepping forward by single instructions. type=str, default='gdb', help='GDB binary to invoke.') - prog.add_argument('--deterministic', default=None, + prog.add_argument('--deterministic-log', default=None, help='The directory containing rr traces') return prog -- cgit 1.4.1 From fe56719f06e6fd53ae0d897cf29cee6456a0e1db Mon Sep 17 00:00:00 2001 From: Theofilos Augoustis Date: Mon, 17 Nov 2025 19:06:25 +0000 Subject: Refactor iteration interface over events --- src/focaccia/deterministic.py | 182 ++++++++++++++++++++++++++++------------ src/focaccia/native/tracer.py | 48 ++++------- src/focaccia/qemu/_qemu_tool.py | 57 +++++++------ 3 files changed, 176 insertions(+), 111 deletions(-) diff --git a/src/focaccia/deterministic.py b/src/focaccia/deterministic.py index 6d76457..41d49b8 100644 --- a/src/focaccia/deterministic.py +++ b/src/focaccia/deterministic.py @@ -2,7 +2,7 @@ from .arch import Arch from .snapshot import ReadableProgramState from reprlib import repr as alt_repr -from typing import Callable +from typing import Callable, Tuple, Optional class MemoryWriteHole: def __init__(self, offset: int, size: int): @@ -285,60 +285,136 @@ except Exception: def tasks(self) -> list[Task]: return [] def mmaps(self) -> list[MemoryMapping]: return [] finally: - class DeterministicEventIterator: - def __init__(self, deterministic_log: DeterministicLog, match_fn: Callable): - self._detlog = deterministic_log - self._events = self._detlog.events() - self._pc_to_event = {} - self._match = match_fn - self._idx: int | None = None # None represents no current event - self._in_event: bool = False - - idx = 0 - for event in self._events: - self._pc_to_event.setdefault(event.pc, []).append((event, idx)) + class EventMatcher: + def __init__(self, + events: list[Event], + match_fn: Callable, + from_state: ReadableProgramState | None = None): + self.events = events + self.matcher = match_fn + + self.matched_count = None + if from_state: + self.match(from_state) + + def match(self, state: 
ReadableProgramState) -> Event | None: + if self.matched_count is None: + # Need to synchronize + # Search for match + for idx in range(len(self.events)): + event = self.events[idx] + if self.matcher(event, state): + self.matched_count = idx + 1 + return event + + if self.matched_count is None: + return None + + event = self.events[self.matched_count] + if self.matcher(event, state): + self.matched_count += 1 # proceed to next + return event + + return None + + def next(self): + if self.matched_count is None: + raise ValueError('Cannot get next event with unsynchronized event matcher') + if self.matched_count < len(self.events): + return self.events[self.matched_count] + return None + + def match_pair(self, state: ReadableProgramState): + event = self.match(state) + if event is None: + return None, None + if isinstance(event, SyscallEvent) and event.syscall_state == 'exiting': + self.matched_count = None + return None, None + assert(self.matched_count is not None) + post_event = self.events[self.matched_count] + self.matched_count += 1 + return event, post_event + + def __bool__(self) -> bool: + return len(self.events) > 0 + + class MappingMatcher: + def __init__(self, memory_mappings: list[MemoryMapping]): + self.memory_mappings = memory_mappings + self.matched_count = None + + def match(self, event_count: int) -> MemoryMapping | None: + if self.matched_count is None: + # Need to synchronize + # Search for match + for idx in range(len(self.memory_mappings)): + mapping = self.memory_mappings[idx] + if mapping.event_count == event_count: + self.matched_count = idx + 1 + return mapping + + if self.matched_count is None: + return None + + mapping = self.memory_mappings[self.matched_count] + if mapping.event_count == event_count: + self.matched_count += 1 # proceed to next + return mapping + + return None + + def next(self): + if self.matched_count is None: + raise ValueError('Cannot get next mapping with unsynchronized mapping matcher') + if self.matched_count < len(self.memory_mappings): + return self.memory_mappings[self.matched_count] + return None + + def __bool__(self) -> bool: + return len(self.memory_mappings) > 0 + + class LogStateMatcher: + def __init__(self, + events: list[Event], + memory_mappings: list[MemoryMapping], + event_match_fn: Callable, + from_state: ReadableProgramState | None = None): + self.event_matcher = EventMatcher(events, event_match_fn, from_state) + self.mapping_matcher = MappingMatcher(memory_mappings) def events(self) -> list[Event]: - return self._events - - def current_event(self) -> Event | None: - # No event when not synchronized - if self._idx is None or not self._in_event: - return None - return self._events[self._idx] - - def next_event(self) -> Event | None: - if self._idx is None: - raise ValueError('Attempted to get next event without synchronizing') - if self._idx + 1 >= len(self._events): - return None - return self._events[self._idx+1] - - def update(self, target: ReadableProgramState) -> Event | None: - # Quick check - candidates = self._pc_to_event.get(target.read_pc(), []) - if len(candidates) == 0: - self._in_event = False - return None - - # Find synchronization point - if self._idx is None: - for event, idx in candidates: - if self._match(event, target): - self._idx = idx - self._in_event = True - return self.current_event() - - return self.update_to_next() - - def update_to_next(self, count: int = 1) -> Event | None: - if self._idx is None: - raise ValueError('Attempted to get next event without synchronizing') - - self._in_event = True - 
self._idx += count - return self.current_event() + return self.event_matcher.events + + def mappings(self) -> list[MemoryMapping]: + return self.mapping_matcher.memory_mappings + + def matched_events(self) -> Optional[int]: + return self.event_matcher.matched_count + + def match(self, state: ReadableProgramState) -> Tuple[Optional[Event], Optional[MemoryMapping]]: + event = self.event_matcher.match(state) + if not event: + return None, None + assert(self.event_matcher.matched_count is not None) + mapping = self.mapping_matcher.match(self.event_matcher.matched_count) + return event, mapping + + def match_pair(self, state: ReadableProgramState) -> Tuple[Optional[Event], Optional[Event], Optional[MemoryMapping]]: + event, post_event = self.event_matcher.match_pair(state) + if not event: + return None, None, None + assert(self.event_matcher.matched_count is not None) + mapping = self.mapping_matcher.match(self.event_matcher.matched_count-1) + return event, post_event, mapping + + def next(self) -> Tuple[Optional[Event], Optional[MemoryMapping]]: + next_event = self.event_matcher.next() + if not next_event: + return None, None + assert(self.event_matcher.matched_count is not None) + return next_event, self.mapping_matcher.match(self.event_matcher.matched_count) def __bool__(self) -> bool: - return len(self.events()) > 0 + return bool(self.event_matcher) diff --git a/src/focaccia/native/tracer.py b/src/focaccia/native/tracer.py index 4376f41..b2ca0d8 100644 --- a/src/focaccia/native/tracer.py +++ b/src/focaccia/native/tracer.py @@ -12,7 +12,7 @@ from focaccia.trace import Trace, TraceEnvironment from focaccia.miasm_util import MiasmSymbolResolver from focaccia.snapshot import ReadableProgramState, RegisterAccessError from focaccia.symbolic import SymbolicTransform, DisassemblyContext, run_instruction -from focaccia.deterministic import Event, DeterministicEventIterator +from focaccia.deterministic import Event, EventMatcher from .lldb_target import LLDBConcreteTarget, LLDBLocalTarget, LLDBRemoteTarget @@ -154,8 +154,6 @@ class SymbolicTracer: self.cross_validate = cross_validate self.target = SpeculativeTracer(self.create_debug_target()) - self.nondet_events = DeterministicEventIterator(self.env.detlog, match_event) - def create_debug_target(self) -> LLDBConcreteTarget: binary = self.env.binary_name if self.remote is False: @@ -208,30 +206,10 @@ class SymbolicTracer: f' mem[{hex(addr)}:{hex(addr+len(data))}] = {conc_data}.' 
f'\nFaulty transformation: {transform}') - def post_event(self) -> None: - current_event = self.nondet_events.current_event() - if current_event: - if current_event.pc == 0: - # Exit sequence - debug('Completed exit event') - self.target.run() - - debug(f'Completed handling event: {current_event}') - self.nondet_events.update_to_next() - - def is_stepping_instr(self, instruction: Instruction) -> bool: - if self.nondet_events.current_event(): - debug('Current instruction matches next event; stepping through it') - self.nondet_events.update_to_next() - return True - else: - if self.target.arch.is_instr_syscall(str(instruction)): - return True - return False - def progress(self, new_pc, step: bool = False) -> int | None: self.target.speculate(new_pc) if step: + info(f'Stepping through event at {hex(self.target.read_pc())}') self.target.progress_execution() if self.target.is_exited(): return None @@ -251,9 +229,10 @@ class SymbolicTracer: ctx = DisassemblyContext(self.target) arch = ctx.arch + event_matcher = EventMatcher(self.env.detlog.events(), match_event, self.target) if logger.isEnabledFor(logging.DEBUG): debug('Tracing program with the following non-deterministic events') - for event in self.nondet_events.events(): + for event in event_matcher.events: debug(event) # Trace concolically @@ -262,10 +241,9 @@ class SymbolicTracer: pc = self.target.read_pc() if self.env.stop_address is not None and pc == self.env.stop_address: + info(f'Reached stop address at {hex(pc)}') break - self.nondet_events.update(self.target) - # Disassemble instruction at the current PC tid = self.target.get_current_tid() try: @@ -292,7 +270,8 @@ class SymbolicTracer: continue raise # forward exception - is_event = self.is_stepping_instr(instruction) + event, post_event = event_matcher.match_pair(self.target) + in_event = (event and event_matcher) or self.target.arch.is_instr_syscall(str(instruction)) # Run instruction conc_state = MiasmSymbolResolver(self.target, ctx.loc_db) @@ -311,7 +290,7 @@ class SymbolicTracer: new_pc = int(new_pc) transform = SymbolicTransform(tid, modified, [instruction], arch, pc, new_pc) pred_regs, pred_mems = self.predict_next_state(instruction, transform) - self.progress(new_pc, step=is_event) + self.progress(new_pc, step=in_event) try: self.validate(instruction, transform, pred_regs, pred_mems) @@ -321,7 +300,7 @@ class SymbolicTracer: continue raise else: - new_pc = self.progress(new_pc, step=is_event) + new_pc = self.progress(new_pc, step=in_event) if new_pc is None: transform = SymbolicTransform(tid, modified, [instruction], arch, pc, 0) strace.append(transform) @@ -330,8 +309,13 @@ class SymbolicTracer: strace.append(transform) - if is_event: - self.post_event() + if post_event: + if post_event.pc == 0: + # Exit sequence + debug('Completed exit event') + self.target.run() + + debug(f'Completed handling event: {post_event}') return Trace(strace, self.env) diff --git a/src/focaccia/qemu/_qemu_tool.py b/src/focaccia/qemu/_qemu_tool.py index 75b142e..188ecf2 100644 --- a/src/focaccia/qemu/_qemu_tool.py +++ b/src/focaccia/qemu/_qemu_tool.py @@ -9,7 +9,7 @@ work to do. 
import gdb import logging import traceback -from typing import Iterable +from typing import Iterable, Optional import focaccia.parser as parser from focaccia.arch import supported_architectures, Arch @@ -19,7 +19,13 @@ from focaccia.snapshot import ProgramState, ReadableProgramState, \ from focaccia.symbolic import SymbolicTransform, eval_symbol, ExprMem from focaccia.trace import Trace, TraceEnvironment from focaccia.utils import print_result -from focaccia.deterministic import DeterministicLog, DeterministicEventIterator, Event, SyscallEvent +from focaccia.deterministic import ( + DeterministicLog, + LogStateMatcher, + Event, + SyscallEvent, + MemoryMapping, +) from focaccia.qemu.deterministic import emulated_system_calls from focaccia.tools.validate_qemu import make_argparser, verbosity @@ -40,6 +46,7 @@ qemu_crash = { def match_event(event: Event, target: ReadableProgramState) -> bool: # Match just on PC + debug(f'Matching for PC {hex(target.read_pc())} with event {hex(event.pc)}') if event.pc == target.read_pc(): return True return False @@ -149,23 +156,20 @@ class GDBServerStateIterator: self.arch = supported_architectures[archname] self.binary = self._process.progspace.filename - self._deterministic_events = DeterministicEventIterator(self._deterministic_log, match_event) - - # Filter non-deterministic events for event after start - self._deterministic_events.update(self.current_state()) - self._deterministic_events.update_to_next() + self._log_matcher = LogStateMatcher(self._deterministic_log.events(), + self._deterministic_log.mmaps(), + match_event, + from_state=self.current_state()) + info(f'Synchronizing at PC {hex(self.current_state().read_pc())} with {self._log_matcher.matched_events()}') def current_state(self) -> ReadableProgramState: return GDBProgramState(self._process, gdb.selected_frame(), self.arch) - def _handle_syscall(self) -> GDBProgramState: - cur_event = self._deterministic_events.current_event() - call = cur_event.registers.get(self.arch.get_syscall_reg()) + def _handle_syscall(self, event: Event, post_event: Event) -> GDBProgramState: + call = event.registers.get(self.arch.get_syscall_reg()) - post_event = self._deterministic_events.update_to_next() syscall = emulated_system_calls[self.arch.archname].get(call, None) - debug(f'Handling event:\n{cur_event}') - if syscall is not None: + if syscall is not None and False: info(f'Replaying system call number {hex(call)}') self.skip(post_event.pc) @@ -189,15 +193,14 @@ class GDBServerStateIterator: raise StopIteration return GDBProgramState(self._process, gdb.selected_frame(), self.arch) - def _handle_event(self) -> GDBProgramState: - current_event = self._deterministic_events.current_event() - if not current_event: + def _handle_event(self, event: Event | None, post_event: Event | None) -> GDBProgramState: + if not event: return self.current_state() - if isinstance(current_event, SyscallEvent): - return self._handle_syscall() + if isinstance(event, SyscallEvent): + return self._handle_syscall(event, post_event) - warn(f'Event handling for events of type {current_event.event_type} not implemented') + warn(f'Event handling for events of type {event.event_type} not implemented') return self.current_state() def _is_exited(self) -> bool: @@ -213,11 +216,12 @@ class GDBServerStateIterator: self._first_next = False return GDBProgramState(self._process, gdb.selected_frame(), self.arch) - if match_event(self._deterministic_events.current_event(), self.current_state()): - state = self._handle_event() + event, post_event, _ = 
self._log_matcher.match_pair(self.current_state()) + if event: + state = self._handle_event(event, post_event) if self._is_exited(): raise StopIteration - self._deterministic_events.update_to_next() + return state # Step @@ -233,7 +237,7 @@ class GDBServerStateIterator: def run_until(self, addr: int) -> ReadableProgramState: events_handled = 0 - event = self._deterministic_events.current_event() + event, _ = self._log_matcher.next() while event: state = self._run_until_any([addr, event.pc]) if state.read_pc() == addr: @@ -241,9 +245,10 @@ class GDBServerStateIterator: self._first_next = events_handled == 0 return state - self._handle_event() + event, post_event, _ = self._log_matcher.match_pair(self.current_state()) + self._handle_event(event, post_event) - event = self._deterministic_events.update_to_next() + event, _ = self._log_matcher.next() events_handled += 1 return self._run_until_any([addr]) @@ -375,7 +380,7 @@ def collect_conc_trace(gdb: GDBServerStateIterator, \ if logger.isEnabledFor(logging.DEBUG): debug('Tracing program with the following non-deterministic events:') - for event in gdb._deterministic_events.events(): + for event in gdb._log_matcher.events(): debug(event) # Skip to start -- cgit 1.4.1 From 0ef1c1f2a5a905623288ea0f8d03b416b200c6ff Mon Sep 17 00:00:00 2001 From: Theofilos Augoustis Date: Tue, 18 Nov 2025 09:01:15 +0000 Subject: Retrieve all mappings at once --- src/focaccia/deterministic.py | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/src/focaccia/deterministic.py b/src/focaccia/deterministic.py index 41d49b8..1cfda0d 100644 --- a/src/focaccia/deterministic.py +++ b/src/focaccia/deterministic.py @@ -344,25 +344,26 @@ finally: self.memory_mappings = memory_mappings self.matched_count = None - def match(self, event_count: int) -> MemoryMapping | None: + def match(self, event_count: int) -> list[MemoryMapping]: if self.matched_count is None: # Need to synchronize # Search for match + mappings = [] for idx in range(len(self.memory_mappings)): mapping = self.memory_mappings[idx] if mapping.event_count == event_count: self.matched_count = idx + 1 - return mapping + mappings.append(mapping) + return mappings - if self.matched_count is None: - return None + mappings = [] + while self.matched_count < len(self.memory_mappings): + mapping = self.memory_mappings[self.matched_count] + if mapping.event_count == event_count: + self.matched_count += 1 # proceed to next + mappings.append(mapping) - mapping = self.memory_mappings[self.matched_count] - if mapping.event_count == event_count: - self.matched_count += 1 # proceed to next - return mapping - - return None + return mappings def next(self): if self.matched_count is None: @@ -392,26 +393,26 @@ finally: def matched_events(self) -> Optional[int]: return self.event_matcher.matched_count - def match(self, state: ReadableProgramState) -> Tuple[Optional[Event], Optional[MemoryMapping]]: + def match(self, state: ReadableProgramState) -> Tuple[Optional[Event], list[MemoryMapping]]: event = self.event_matcher.match(state) if not event: - return None, None + return None, [] assert(self.event_matcher.matched_count is not None) mapping = self.mapping_matcher.match(self.event_matcher.matched_count) return event, mapping - def match_pair(self, state: ReadableProgramState) -> Tuple[Optional[Event], Optional[Event], Optional[MemoryMapping]]: + def match_pair(self, state: ReadableProgramState) -> Tuple[Optional[Event], Optional[Event], list[MemoryMapping]]: event, post_event = 
self.event_matcher.match_pair(state) if not event: - return None, None, None + return None, None, [] assert(self.event_matcher.matched_count is not None) mapping = self.mapping_matcher.match(self.event_matcher.matched_count-1) return event, post_event, mapping - def next(self) -> Tuple[Optional[Event], Optional[MemoryMapping]]: + def next(self) -> Tuple[Optional[Event], list[MemoryMapping]]: next_event = self.event_matcher.next() if not next_event: - return None, None + return None, [] assert(self.event_matcher.matched_count is not None) return next_event, self.mapping_matcher.match(self.event_matcher.matched_count) -- cgit 1.4.1 From dcaeea54a6bef313c82dc55359deb8c27c1ecb38 Mon Sep 17 00:00:00 2001 From: Theofilos Augoustis Date: Tue, 18 Nov 2025 12:18:38 +0000 Subject: Add basic QEMU section reading capabilities --- src/focaccia/deterministic.py | 10 ++++-- src/focaccia/qemu/_qemu_tool.py | 67 +++++++++++++++++++++++++++++++++++++++-- 2 files changed, 72 insertions(+), 5 deletions(-) diff --git a/src/focaccia/deterministic.py b/src/focaccia/deterministic.py index 1cfda0d..ffd519c 100644 --- a/src/focaccia/deterministic.py +++ b/src/focaccia/deterministic.py @@ -180,7 +180,8 @@ class MemoryMapping: source: str, offset: int, mmap_prot: int, - mmap_flags: int): + mmap_flags: int, + name: str | None = None): self.event_count = event_count self.start_address = start_address self.length = end_address - self.start_address @@ -188,10 +189,13 @@ class MemoryMapping: self.offset = offset self.mmap_prot = mmap_prot self.mmap_flags = mmap_flags + self.name = name def __repr__(self) -> str: - return f'Memory mapping at event {self.event_count}\n' \ - f'start = {hex(self.start_address)}\n' \ + header = f'Memory mapping at event {self.event_count}\n' + if self.name: + header += f'name = {self.name}\n' + return header + f'start = {hex(self.start_address)}\n' \ f'length = {self.length}\n' \ f'source = {self.source}\n' \ f'offset = {self.offset}\n' \ diff --git a/src/focaccia/qemu/_qemu_tool.py b/src/focaccia/qemu/_qemu_tool.py index 188ecf2..e6394e5 100644 --- a/src/focaccia/qemu/_qemu_tool.py +++ b/src/focaccia/qemu/_qemu_tool.py @@ -6,6 +6,7 @@ But please use `tools/validate_qemu.py` instead because we have some more setup work to do. 
""" +import re import gdb import logging import traceback @@ -156,11 +157,13 @@ class GDBServerStateIterator: self.arch = supported_architectures[archname] self.binary = self._process.progspace.filename + first_state = self.current_state() self._log_matcher = LogStateMatcher(self._deterministic_log.events(), self._deterministic_log.mmaps(), match_event, - from_state=self.current_state()) - info(f'Synchronizing at PC {hex(self.current_state().read_pc())} with {self._log_matcher.matched_events()}') + from_state=first_state) + event, _ = self._log_matcher.match(first_state) + info(f'Synchronized at PC={hex(first_state.read_pc())} to event:\n{event}') def current_state(self) -> ReadableProgramState: return GDBProgramState(self._process, gdb.selected_frame(), self.arch) @@ -272,6 +275,66 @@ class GDBServerStateIterator: def _step(self): gdb.execute('si', to_string=True) + def get_sections(self) -> list[MemoryMapping]: + mappings = [] + + # Skip everything until the header line + started = False + + text = gdb.execute('info proc mappings', to_string=True) + for line in text.splitlines(): + line = line.strip() + if not line: + continue + + # Detect header line once + if line.startswith("Start Addr"): + started = True + continue + + if not started: + continue + + # Lines look like: + # 0x0000000000400000 0x0000000000401000 0x1000 0x0 r--p /path + # or: + # 0x... 0x... 0x... 0x... rw-p [vdso] + parts = line.split(None, 6) + + if len(parts) < 5: + continue + + start = int(parts[0], 16) + end = int(parts[1], 16) + size = int(parts[2], 16) + offset = int(parts[3], 16) + perms = parts[4] + + file_or_tag = None + is_special = False + + if len(parts) >= 6: + tail = parts[5] + + # If it's [tag], mark as special + if tail.startswith("[") and tail.endswith("]"): + file_or_tag = tail.strip() + is_special = True + else: + # Might be a filename or absent + file_or_tag = tail + + mapping = MemoryMapping(0, + start, + end, + '', + offset, + 0, + 0) + mappings.append(mapping) + + return mappings + def record_minimal_snapshot(prev_state: ReadableProgramState, cur_state: ReadableProgramState, prev_transform: SymbolicTransform, -- cgit 1.4.1 From 5fa6cc2c48aa60ed7d6f001f32526db396b8b871 Mon Sep 17 00:00:00 2001 From: Theofilos Augoustis Date: Tue, 18 Nov 2025 14:11:01 +0000 Subject: Reintroduce support for emulating getrandom --- src/focaccia/qemu/_qemu_tool.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/src/focaccia/qemu/_qemu_tool.py b/src/focaccia/qemu/_qemu_tool.py index e6394e5..4a51dce 100644 --- a/src/focaccia/qemu/_qemu_tool.py +++ b/src/focaccia/qemu/_qemu_tool.py @@ -172,21 +172,30 @@ class GDBServerStateIterator: call = event.registers.get(self.arch.get_syscall_reg()) syscall = emulated_system_calls[self.arch.archname].get(call, None) - if syscall is not None and False: + if syscall is not None: info(f'Replaying system call number {hex(call)}') self.skip(post_event.pc) - next_state = GDBProgramState(self._process, gdb.selected_frame(), self.arch) + next_state = self.current_state() patchup_regs = [self.arch.get_syscall_reg(), *(syscall.patchup_registers or [])] for reg in patchup_regs: - next_state.write_register(reg, post_event.registers.get(reg)) + gdb.parse_and_eval(f'${reg}={post_event.registers.get(reg)}') for mem in post_event.mem_writes: - # TODO: handle holes - # TODO: address mapping addr, data = mem.address, mem.data - next_state.write_memory(addr, data) + for reg, value in post_event.registers.items(): + if value == addr: + addr = 
next_state.read_register(reg) + break + + info(f'Replaying write to {hex(addr)} with data:\n{data.hex(" ")}') + + # Insert holes into data + for hole in mem.holes: + data[hole.offset:hole.offset] = b'\x00' * hole.size + self._process.write_memory(addr, data) + return next_state return next_state -- cgit 1.4.1 From 97d9a0330c3c09023a82d39b10100ff816b0d846 Mon Sep 17 00:00:00 2001 From: Theofilos Augoustis Date: Tue, 18 Nov 2025 15:37:26 +0000 Subject: Only handle events when tracing --- src/focaccia/qemu/_qemu_tool.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/src/focaccia/qemu/_qemu_tool.py b/src/focaccia/qemu/_qemu_tool.py index 4a51dce..7ca556b 100644 --- a/src/focaccia/qemu/_qemu_tool.py +++ b/src/focaccia/qemu/_qemu_tool.py @@ -22,8 +22,8 @@ from focaccia.trace import Trace, TraceEnvironment from focaccia.utils import print_result from focaccia.deterministic import ( DeterministicLog, - LogStateMatcher, Event, + EventMatcher, SyscallEvent, MemoryMapping, ) @@ -158,11 +158,10 @@ class GDBServerStateIterator: self.binary = self._process.progspace.filename first_state = self.current_state() - self._log_matcher = LogStateMatcher(self._deterministic_log.events(), - self._deterministic_log.mmaps(), - match_event, - from_state=first_state) - event, _ = self._log_matcher.match(first_state) + self._events = EventMatcher(self._deterministic_log.events(), + match_event, + from_state=first_state) + event = self._events.match(first_state) info(f'Synchronized at PC={hex(first_state.read_pc())} to event:\n{event}') def current_state(self) -> ReadableProgramState: @@ -228,7 +227,7 @@ class GDBServerStateIterator: self._first_next = False return GDBProgramState(self._process, gdb.selected_frame(), self.arch) - event, post_event, _ = self._log_matcher.match_pair(self.current_state()) + event, post_event = self._events.match_pair(self.current_state()) if event: state = self._handle_event(event, post_event) if self._is_exited(): @@ -249,7 +248,7 @@ class GDBServerStateIterator: def run_until(self, addr: int) -> ReadableProgramState: events_handled = 0 - event, _ = self._log_matcher.next() + event = self._events.next() while event: state = self._run_until_any([addr, event.pc]) if state.read_pc() == addr: @@ -257,10 +256,10 @@ class GDBServerStateIterator: self._first_next = events_handled == 0 return state - event, post_event, _ = self._log_matcher.match_pair(self.current_state()) + event, post_event = self._events.match_pair(self.current_state()) self._handle_event(event, post_event) - event, _ = self._log_matcher.next() + event = self._events.next() events_handled += 1 return self._run_until_any([addr]) @@ -452,7 +451,7 @@ def collect_conc_trace(gdb: GDBServerStateIterator, \ if logger.isEnabledFor(logging.DEBUG): debug('Tracing program with the following non-deterministic events:') - for event in gdb._log_matcher.events(): + for event in gdb._events.events: debug(event) # Skip to start -- cgit 1.4.1
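Note: a minimal usage sketch of the EventMatcher interface introduced by "Refactor iteration interface over events" and adopted by _qemu_tool.py in "Only handle events when tracing". The FakeState class, the match_on_pc helper, the SimpleNamespace events, and the concrete addresses below are illustrative stand-ins, not part of the series; only the EventMatcher constructor, match_pair(), matched_count, and next() come from the patches, and the snippet assumes the post-series focaccia package is importable.

    from types import SimpleNamespace

    from focaccia.deterministic import EventMatcher

    class FakeState:
        """Illustrative stand-in for ReadableProgramState; only read_pc() is needed here."""
        def __init__(self, pc: int):
            self._pc = pc

        def read_pc(self) -> int:
            return self._pc

    def match_on_pc(event, state) -> bool:
        # Same criterion as match_event() in _qemu_tool.py: compare program counters only.
        return event.pc == state.read_pc()

    # Two recorded events, standing in for the pre/post records around one syscall.
    events = [SimpleNamespace(pc=0x401000), SimpleNamespace(pc=0x401002)]

    matcher = EventMatcher(events, match_on_pc)

    # The first concrete stop whose PC coincides with a recorded event both
    # synchronizes the matcher and yields the (event, post_event) pair.
    pre, post = matcher.match_pair(FakeState(0x401000))
    assert pre.pc == 0x401000 and post.pc == 0x401002
    assert matcher.matched_count == 2   # both records consumed
    assert matcher.next() is None       # nothing left to replay

The point of the refactor, as the diffs show, is that callers now hand the matcher the current concrete state and receive the matching event lazily, instead of driving the old DeterministicEventIterator's explicit update()/update_to_next() bookkeeping themselves.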