diff options
| author | Theofilos Augoustis <37243696+taugoust@users.noreply.github.com> | 2025-11-19 10:10:21 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-11-19 10:10:21 +0100 |
| commit | dae5b391c2042ca70359cf9c93e62b1d4c903e71 (patch) | |
| tree | c2f56e740ffd13200af31530f6693593bd500ac0 | |
| parent | ba5864bd653a995bc72405f0a5e7c389e4052e44 (diff) | |
| parent | 97d9a0330c3c09023a82d39b10100ff816b0d846 (diff) | |
| download | focaccia-dae5b391c2042ca70359cf9c93e62b1d4c903e71.tar.gz focaccia-dae5b391c2042ca70359cf9c93e62b1d4c903e71.zip | |
Merge pull request #27 from TUM-DSE/ta/emu-replay
Implement basic replaying for QEMU
| -rw-r--r-- | reproducers/issue-508.c | 7 | ||||
| -rw-r--r-- | src/focaccia/_deterministic_impl.py | 391 | ||||
| -rw-r--r-- | src/focaccia/arch/arch.py | 24 | ||||
| -rw-r--r-- | src/focaccia/arch/x86.py | 11 | ||||
| -rw-r--r-- | src/focaccia/compare.py | 2 | ||||
| -rw-r--r-- | src/focaccia/deterministic.py | 536 | ||||
| -rw-r--r-- | src/focaccia/native/__init__.py | 0 | ||||
| -rw-r--r-- | src/focaccia/native/lldb_target.py (renamed from src/focaccia/lldb_target.py) | 4 | ||||
| -rw-r--r-- | src/focaccia/native/tracer.py | 321 | ||||
| -rw-r--r-- | src/focaccia/qemu/__init__.py | 0 | ||||
| -rw-r--r-- | src/focaccia/qemu/_qemu_tool.py (renamed from src/focaccia/tools/_qemu_tool.py) | 236 | ||||
| -rw-r--r-- | src/focaccia/qemu/deterministic.py | 9 | ||||
| -rw-r--r-- | src/focaccia/qemu/syscall.py | 14 | ||||
| -rwxr-xr-x | src/focaccia/qemu/validation_server.py (renamed from src/focaccia/tools/validation_server.py) | 0 | ||||
| -rw-r--r-- | src/focaccia/qemu/x86.py | 10 | ||||
| -rw-r--r-- | src/focaccia/snapshot.py | 9 | ||||
| -rw-r--r-- | src/focaccia/symbolic.py | 359 | ||||
| -rwxr-xr-x | src/focaccia/tools/capture_transforms.py | 21 | ||||
| -rwxr-xr-x | src/focaccia/tools/validate_qemu.py | 10 |
19 files changed, 1185 insertions, 779 deletions
diff --git a/reproducers/issue-508.c b/reproducers/issue-508.c index e143183..c8fd251 100644 --- a/reproducers/issue-508.c +++ b/reproducers/issue-508.c @@ -1,13 +1,18 @@ #include <stdio.h> +#include <stdlib.h> +#include <sys/random.h> int main() { int mem = 0x12345678; + int buf = 0; + getrandom(&buf, sizeof(buf), 0); register long rax asm("rax") = 0x1234567812345678; - register int edi asm("edi") = 0x77777777; + register int edi asm("edi") = buf; asm("cmpxchg %[edi],%[mem]" : [ mem ] "+m"(mem), [ rax ] "+r"(rax) : [ edi ] "r"(edi)); long rax2 = rax; printf("rax2 = %lx\n", rax2); + printf("rand= %d\n", buf); } diff --git a/src/focaccia/_deterministic_impl.py b/src/focaccia/_deterministic_impl.py new file mode 100644 index 0000000..fc85b9a --- /dev/null +++ b/src/focaccia/_deterministic_impl.py @@ -0,0 +1,391 @@ +"""Parsing of JSON files containing snapshot data.""" + +import os +import io +import struct +from typing import Union, Tuple, Optional + +import brotli + +from .deterministic import ( + MemoryWriteHole, + MemoryWrite, + Event, + SyscallBufferFlushEvent, + SyscallExtra, + SyscallEvent, + SignalDescriptor, + SignalEvent, + MemoryMapping, + Task, + CloneTask, + ExecTask, + ExitTask +) + +import capnp +rr_trace = capnp.load(file_name='./rr/src/rr_trace.capnp', + imports=[os.path.dirname(p) for p in capnp.__path__]) + +Frame = rr_trace.Frame +TaskEvent = rr_trace.TaskEvent +MMap = rr_trace.MMap +SerializedObject = Union[Frame, TaskEvent, MMap] + +class DeterministicLogReader(io.RawIOBase): + """ + File-like reader for rr trace files. + + Each block in the file: + uint32_t uncompressed_size + uint32_t compressed_size + [compressed_data...] + Presents the concatenated uncompressed data as a sequential byte stream. + """ + + _HDR = struct.Struct("<II") + + def __init__(self, filename: str): + super().__init__() + self._f = open(filename, "rb", buffering=0) + self._data_buffer = memoryview(b"") + self._pos = 0 + self._eof = False + + def _load_chunk(self) -> None: + """Load and decompress the next Brotli block.""" + header = self._f.read(self._HDR.size) + if not header: + self._eof = True + self._data_buffer = memoryview(b"") + return + if len(header) != self._HDR.size: + raise EOFError("Incomplete RR data block header") + + compressed_length, uncompressed_length = self._HDR.unpack(header) + chunk = self._f.read(compressed_length) + if len(chunk) != compressed_length: + raise EOFError("Incomplete RR data block") + + chunk = brotli.decompress(chunk) + if len(chunk) != uncompressed_length: + raise Exception(f'Malformed deterministic log: uncompressed chunk is not equal' + f'to reported length {hex(uncompressed_length)}') + + self._data_buffer = memoryview(chunk) + self._pos = 0 + + def read(self, n: Optional[int] = -1) -> bytes: + """Read up to n bytes from the uncompressed stream.""" + if n == 0: + return b"" + + chunks = bytearray() + remaining = n if n is not None and n >= 0 else None + + while not self._eof and (remaining is None or remaining > 0): + if self._pos >= len(self._data_buffer): + self._load_chunk() + if self._eof: + break + + available = len(self._data_buffer) - self._pos + take = available if remaining is None else min(available, remaining) + chunks += self._data_buffer[self._pos:self._pos + take] + self._pos += take + if remaining is not None: + remaining -= take + + return bytes(chunks) + + def readable(self) -> bool: + return True + + def close(self) -> None: + if not self.closed: + self._f.close() + super().close() + +def parse_x64_registers(enc_regs: bytes, signed: bool=False) -> dict[str, int]: + idx = 0 + def parse_reg(): + nonlocal idx + enc_reg = enc_regs[idx:(idx := idx + 8)] + return int.from_bytes(enc_reg, byteorder='little', signed=signed) + + regs = {} + + regs['r15'] = parse_reg() + regs['r14'] = parse_reg() + regs['r13'] = parse_reg() + regs['r12'] = parse_reg() + regs['rbp'] = parse_reg() + regs['rbx'] = parse_reg() + + # rcx is unreliable: parsed but ignored + parse_reg() + + regs['r10'] = parse_reg() + regs['r9'] = parse_reg() + regs['r8'] = parse_reg() + + regs['rax'] = parse_reg() + + # rcx is unreliable: parsed but ignored + parse_reg() + + regs['rdx'] = parse_reg() + regs['rsi'] = parse_reg() + regs['rdi'] = parse_reg() + + regs['orig_rax'] = parse_reg() + + regs['rip'] = parse_reg() + regs['cs'] = parse_reg() + + # eflags is unreliable: parsed but ignored + parse_reg() + + regs['rsp'] = parse_reg() + regs['ss'] = parse_reg() + regs['fs_base'] = parse_reg() + regs['ds'] = parse_reg() + regs['es'] = parse_reg() + regs['fs'] = parse_reg() + regs['gs'] = parse_reg() + regs['gs_base'] = parse_reg() + + return regs + +def parse_aarch64_registers(enc_regs: bytes, order: str='little', signed: bool=False) -> dict[str, int]: + idx = 0 + def parse_reg(): + nonlocal idx + enc_reg = enc_regs[idx:(idx := idx + 8)] + return int.from_bytes(enc_reg, byteorder=order, signed=signed) + + regnames = [] + for i in range(32): + regnames.append(f'x{i}') + regnames.append('sp') + regnames.append('pc') + regnames.append('cpsr') + + regs = {} + for i in range(len(regnames)): + regs[regnames[i]] = parse_reg() + + return regs + +class DeterministicLog: + def __init__(self, log_dir: str): + self.base_directory = log_dir + + def _get_file(self, file_name: str) -> str | None: + candidate = os.path.join(self.base_directory, file_name) + if os.path.isfile(candidate): + return candidate + return None + + def events_file(self) -> str | None: + return self._get_file('events') + + def tasks_file(self) -> str | None: + return self._get_file('tasks') + + def mmaps_file(self) -> str | None: + return self._get_file('mmaps') + + def data_file(self) -> str | None: + return self._get_file('data') + + def _read_structure(self, file, obj: SerializedObject) -> list[SerializedObject]: + data = DeterministicLogReader(file).read() + + objects = [] + for deser in obj.read_multiple_bytes_packed(data): + objects.append(deser) + return objects + + def raw_events(self) -> list[Frame]: + return self._read_structure(self.events_file(), Frame) + + def raw_tasks(self) -> list[TaskEvent]: + return self._read_structure(self.tasks_file(), TaskEvent) + + def raw_mmaps(self) -> list[MMap]: + return self._read_structure(self.mmaps_file(), MMap) + + def events(self) -> list[Event]: + def parse_registers(event: Frame) -> Tuple[str, dict[str, int]]: + arch = event.arch + if arch == rr_trace.Arch.x8664: + regs = parse_x64_registers(event.registers.raw) + return 'rip', regs + if arch == rr_trace.Arch.aarch64: + regs = parse_aarch64_registers(event.registers.raw) + return 'pc', regs + raise NotImplementedError(f'Unable to parse registers for architecture {arch}') + + def parse_memory_writes(event: Frame, reader: io.RawIOBase) -> list[MemoryWrite]: + writes = [] + for raw_write in event.memWrites: + # Skip memory writes with 0 bytes + if raw_write.size == 0: + continue + + holes = [] + for raw_hole in raw_write.holes: + holes.append(MemoryWriteHole(raw_hole.offset, raw_hole.size)) + + data = bytearray() + for hole in holes: + until_hole = hole.offset - reader.tell() + data.extend(reader.read(until_hole)) + data.extend(b'\x00' * hole.size) + + # No holes + if len(data) == 0: + data = reader.read(raw_write.size) + + mem_write = MemoryWrite(raw_write.tid, + raw_write.addr, + raw_write.size, + holes, + raw_write.sizeIsConservative, + bytes(data)) + writes.append(mem_write) + return writes + + data_reader = DeterministicLogReader(self.data_file()) + + events = [] + raw_events = self.raw_events() + for raw_event in raw_events: + pc, registers = parse_registers(raw_event) + mem_writes = parse_memory_writes(raw_event, data_reader) + + event = None + + tid = raw_event.tid + arch = raw_event.arch + event_type = raw_event.event.which() + + if event_type == 'syscall': + if raw_event.arch == rr_trace.Arch.x8664: + # On entry: substitute orig_rax for RAX + syscall = raw_event.event.syscall + if syscall.state == rr_trace.SyscallState.entering: + registers['rax'] = registers['orig_rax'] + if syscall.number != 59: + registers[pc] -= 2 + del registers['orig_rax'] + if raw_event.arch == rr_trace.Arch.aarch64: + syscall = raw_event.event.syscall + if syscall.state == rr_trace.SyscallState.entering and syscall.number != 221: + registers[pc] -= 4 + + event = SyscallEvent(registers[pc], + tid, + arch, + registers, + mem_writes, + raw_event.event.syscall.arch, + raw_event.event.syscall.number, + raw_event.event.syscall.state, + raw_event.event.syscall.failedDuringPreparation) + + if event_type == 'syscallbufFlush': + event = SyscallBufferFlushEvent(registers[pc], + tid, + arch, + registers, + mem_writes, + raw_event.event.syscallbufFlush.mprotectRecords) + raise NotImplementedError(f'Cannot support system call buffer events yet: {event}') + if event_type == 'signal': + signal = raw_event.event.signal + signal_descriptor = SignalDescriptor(signal.arch, + signal.siginfo, + signal.deterministic, + signal.disposition) + event = SignalEvent(registers[pc], tid, arch, registers, mem_writes, + signal_number=signal_descriptor) + + if event_type == 'signalDelivery': + signal = raw_event.event.signalDelivery + signal_descriptor = SignalDescriptor(signal.arch, + signal.siginfo, + signal.deterministic, + signal.disposition) + event = SignalEvent(registers[pc], tid, arch, registers, mem_writes, + signal_delivery=signal_descriptor) + + if event_type == 'signalHandler': + signal = raw_event.event.signalHandler + signal_descriptor = SignalDescriptor(signal.arch, + signal.siginfo, + signal.deterministic, + signal.disposition) + event = SignalEvent(registers[pc], tid, arch, registers, mem_writes, + signal_handler=signal_descriptor) + + if event is None: + event = Event(registers[pc], tid, arch, registers, mem_writes, event_type) + + events.append(event) + + return events + + def tasks(self) -> list[Task]: + tasks = [] + raw_tasks = self.raw_tasks() + for raw_task in raw_tasks: + task_type = raw_task.which() + + task = None + if task_type == 'clone': + task = CloneTask(raw_task.frameTime, + raw_task.tid, + raw_task.clone.parentTid, + raw_task.clone.flags, + raw_task.clone.ownNsTid) + if task_type == 'exec': + task = ExecTask(raw_task.frameTime, + raw_task.tid, + raw_task.exec.fileName, + raw_task.exec.cmdLine, + raw_task.exec.exeBase, + raw_task.exec.interpBase, + raw_task.exec.interpName) + if task_type == 'exit': + task = ExitTask(raw_task.frameTime, raw_task.tid, raw_task.exit.exitStatus) + if task_type == 'detach': + task = DetachTask(raw_task.frameTime, raw_task.tid) + tasks.append(task) + return tasks + + def mmaps(self) -> list[MemoryMapping]: + def mapping_source(mmap: MMap) -> str: + source_type = mmap.source.which() + if source_type == 'zero' or source_type == 'trace': + return source_type + elif source_type == 'file': + return mmap.source.file.backingFileName + else: + raise NotImplementedError(f'Unable to handle memory mappings from source type:' + f' {source_type}') + + mmaps = [] + raw_mmaps = self.raw_mmaps() + for raw_mmap in raw_mmaps: + mmap = MemoryMapping(raw_mmap.frameTime, + raw_mmap.start, + raw_mmap.end, + mapping_source(raw_mmap), + raw_mmap.fileOffsetBytes, + raw_mmap.prot, + raw_mmap.flags) + mmaps.append(mmap) + return mmaps + diff --git a/src/focaccia/arch/arch.py b/src/focaccia/arch/arch.py index c220a3b..2652159 100644 --- a/src/focaccia/arch/arch.py +++ b/src/focaccia/arch/arch.py @@ -1,6 +1,17 @@ from typing import Literal from collections.abc import Callable +class SyscallInfo: + def __init__(self, name: str, outputs: list[(str, str, str)]): + """ Describes a syscall by its name and outputs. + + An output is a regname holding the pointer, + the length in bytes--either as a number or as onther register name-- and + the type of the output + """ + self.name = name + self.outputs = outputs + class RegisterAccessor: def __init__(self, regname: str, start_bit: int, end_bit: int): """An accessor that describes a range of bits. @@ -103,6 +114,19 @@ class Arch(): """ return False + def get_em_syscalls(self) -> dict[int, str]: + """Returns an architecture specific set of syscalls that Focaccia needs to purely emulate.""" + raise NotImplementedError("Architecture must implement get_em_syscalls") + + def get_pasthru_syscalls(self) -> dict[int, str]: + """Returns an architecture specific set of syscalls that Focaccia needs to passthrough and + then warns about missmatching values. Examples are memory and lock related syscalls.""" + raise NotImplementedError("Architecture must implement get_pasthru_syscalls") + + def get_syscall_reg(self) -> str: + """Returns the register name that contains the syscall number.""" + raise NotImplementedError("Architecture must implement get_syscall_reg") + def is_instr_syscall(self, instr: str) -> bool: return False diff --git a/src/focaccia/arch/x86.py b/src/focaccia/arch/x86.py index a5d29f5..4609f4c 100644 --- a/src/focaccia/arch/x86.py +++ b/src/focaccia/arch/x86.py @@ -1,6 +1,6 @@ """Architecture-specific configuration.""" -from .arch import Arch, RegisterDescription as _Reg +from .arch import Arch, RegisterDescription as _Reg, SyscallInfo as _Sc archname = 'x86_64' @@ -215,3 +215,12 @@ class ArchX86(Arch): return True return False + def get_em_syscalls(self) -> dict[int, str]: + return emulatedSyscalls + + def get_pasthru_syscalls(self) -> dict[int, str]: + return passthruSyscalls + + def get_syscall_reg(self) -> str: + return 'rax' + diff --git a/src/focaccia/compare.py b/src/focaccia/compare.py index 4fea451..3aa5ab2 100644 --- a/src/focaccia/compare.py +++ b/src/focaccia/compare.py @@ -34,7 +34,7 @@ def _calc_transformation(previous: ProgramState, current: ProgramState): try: prev_val = previous.read_register(reg) cur_val = current.read_register(reg) - transformation.set_register(reg, cur_val - prev_val) + transformation.write_register(reg, cur_val - prev_val) except RegisterAccessError: # Register is not set in either state pass diff --git a/src/focaccia/deterministic.py b/src/focaccia/deterministic.py index e7914a3..ffd519c 100644 --- a/src/focaccia/deterministic.py +++ b/src/focaccia/deterministic.py @@ -1,172 +1,8 @@ -"""Parsing of JSON files containing snapshot data.""" - -import os -import io -import struct -from typing import Union, Optional - -import brotli - from .arch import Arch from .snapshot import ReadableProgramState -try: - import capnp - rr_trace = capnp.load(file_name='./rr/src/rr_trace.capnp', - imports=[os.path.dirname(p) for p in capnp.__path__]) -except Exception as e: - print(f'Cannot load RR trace loader: {e}') - exit(2) - -Frame = rr_trace.Frame -TaskEvent = rr_trace.TaskEvent -MMap = rr_trace.MMap -SerializedObject = Union[Frame, TaskEvent, MMap] - -class DeterministicLogReader(io.RawIOBase): - """ - File-like reader for rr trace files. - - Each block in the file: - uint32_t uncompressed_size - uint32_t compressed_size - [compressed_data...] - Presents the concatenated uncompressed data as a sequential byte stream. - """ - - _HDR = struct.Struct("<II") - - def __init__(self, filename: str): - super().__init__() - self._f = open(filename, "rb", buffering=0) - self._data_buffer = memoryview(b"") - self._pos = 0 - self._eof = False - - def _load_chunk(self) -> None: - """Load and decompress the next Brotli block.""" - header = self._f.read(self._HDR.size) - if not header: - self._eof = True - self._data_buffer = memoryview(b"") - return - if len(header) != self._HDR.size: - raise EOFError("Incomplete RR data block header") - - compressed_length, uncompressed_length = self._HDR.unpack(header) - chunk = self._f.read(compressed_length) - if len(chunk) != compressed_length: - raise EOFError("Incomplete RR data block") - - chunk = brotli.decompress(chunk) - if len(chunk) != uncompressed_length: - raise Exception(f'Malformed deterministic log: uncompressed chunk is not equal' - f'to reported length {hex(uncompressed_length)}') - - self._data_buffer = memoryview(chunk) - self._pos = 0 - - def read(self, n: Optional[int] = -1) -> bytes: - """Read up to n bytes from the uncompressed stream.""" - if n == 0: - return b"" - - chunks = bytearray() - remaining = n if n is not None and n >= 0 else None - - while not self._eof and (remaining is None or remaining > 0): - if self._pos >= len(self._data_buffer): - self._load_chunk() - if self._eof: - break - - available = len(self._data_buffer) - self._pos - take = available if remaining is None else min(available, remaining) - chunks += self._data_buffer[self._pos:self._pos + take] - self._pos += take - if remaining is not None: - remaining -= take - - return bytes(chunks) - - def readable(self) -> bool: - return True - - def close(self) -> None: - if not self.closed: - self._f.close() - super().close() - -def parse_x64_registers(enc_regs: bytes, signed: bool=False) -> dict[str, int]: - idx = 0 - def parse_reg(): - nonlocal idx - enc_reg = enc_regs[idx:(idx := idx + 8)] - return int.from_bytes(enc_reg, byteorder='little', signed=signed) - - regs = {} - - regs['r15'] = parse_reg() - regs['r14'] = parse_reg() - regs['r13'] = parse_reg() - regs['r12'] = parse_reg() - regs['rbp'] = parse_reg() - regs['rbx'] = parse_reg() - - # rcx is unreliable: parsed but ignored - parse_reg() - - regs['r10'] = parse_reg() - regs['r9'] = parse_reg() - regs['r8'] = parse_reg() - - regs['rax'] = parse_reg() - - # rcx is unreliable: parsed but ignored - parse_reg() - - regs['rdx'] = parse_reg() - regs['rsi'] = parse_reg() - regs['rdi'] = parse_reg() - - regs['orig_rax'] = parse_reg() - - regs['rip'] = parse_reg() - regs['cs'] = parse_reg() - - # eflags is unreliable: parsed but ignored - parse_reg() - - regs['rsp'] = parse_reg() - regs['ss'] = parse_reg() - regs['fs_base'] = parse_reg() - regs['ds'] = parse_reg() - regs['es'] = parse_reg() - regs['fs'] = parse_reg() - regs['gs'] = parse_reg() - regs['gs_base'] = parse_reg() - - return regs - -def parse_aarch64_registers(enc_regs: bytes, order: str='little', signed: bool=False) -> dict[str, int]: - idx = 0 - def parse_reg(): - nonlocal idx - enc_reg = enc_regs[idx:(idx := idx + 8)] - return int.from_bytes(enc_reg, byteorder=order, signed=signed) - - regnames = [] - for i in range(32): - regnames.append(f'x{i}') - regnames.append('sp') - regnames.append('pc') - regnames.append('cpsr') - - regs = {} - for i in range(len(regnames)): - regs[regnames[i]] = parse_reg() - - return regs +from reprlib import repr as alt_repr +from typing import Callable, Tuple, Optional class MemoryWriteHole: def __init__(self, offset: int, size: int): @@ -214,18 +50,6 @@ class Event: self.mem_writes = memory_writes self.event_type = event_type - def match(self, pc: int, target: ReadableProgramState) -> bool: - # TODO: match the rest of the state to be sure - if self.pc == pc: - for reg, value in self.registers.items(): - if value == self.pc: - continue - if target.read_register(reg) != value: - print(f'Failed match for {reg}: {hex(value)} != {hex(target.read_register(reg))}') - return False - return True - return False - def __repr__(self) -> str: reg_repr = f'{self.event_type} event\n' for reg, value in self.registers.items(): @@ -233,11 +57,11 @@ class Event: mem_write_repr = '' for mem_write in self.mem_writes: - mem_write_repr += f'{mem_write}\n' + mem_write_repr += f'{alt_repr(mem_write)}\n' repr_str = f'Thread {hex(self.tid)} executed event {self.event_type} at {hex(self.pc)}\n' repr_str += f'Register set:\n{reg_repr}' - + if len(self.mem_writes): repr_str += f'\nMemory writes:\n{mem_write_repr}' @@ -356,7 +180,8 @@ class MemoryMapping: source: str, offset: int, mmap_prot: int, - mmap_flags: int): + mmap_flags: int, + name: str | None = None): self.event_count = event_count self.start_address = start_address self.length = end_address - self.start_address @@ -364,10 +189,13 @@ class MemoryMapping: self.offset = offset self.mmap_prot = mmap_prot self.mmap_flags = mmap_flags + self.name = name def __repr__(self) -> str: - return f'Memory mapping at event {self.event_count}\n' \ - f'start = {hex(self.start_address)}\n' \ + header = f'Memory mapping at event {self.event_count}\n' + if self.name: + header += f'name = {self.name}\n' + return header + f'start = {hex(self.start_address)}\n' \ f'length = {self.length}\n' \ f'source = {self.source}\n' \ f'offset = {self.offset}\n' \ @@ -447,201 +275,151 @@ class DetachTask(Task): def __repr__(self) -> str: return f'Detach task\n{super().__repr__()}' -class DeterministicLog: - def __init__(self, log_dir: str): - self.base_directory = log_dir - - def events_file(self) -> str: - return os.path.join(self.base_directory, 'events') - - def tasks_file(self) -> str: - return os.path.join(self.base_directory, 'tasks') - - def mmaps_file(self) -> str: - return os.path.join(self.base_directory, 'mmaps') - - def data_file(self) -> str: - return os.path.join(self.base_directory, 'data') - - def _read_structure(self, file, obj: SerializedObject) -> list[SerializedObject]: - data = DeterministicLogReader(file).read() - - objects = [] - for deser in obj.read_multiple_bytes_packed(data): - objects.append(deser) - return objects - - def raw_events(self) -> list[Frame]: - return self._read_structure(self.events_file(), Frame) - - def raw_tasks(self) -> list[TaskEvent]: - return self._read_structure(self.tasks_file(), TaskEvent) - - def raw_mmaps(self) -> list[MMap]: - return self._read_structure(self.mmaps_file(), MMap) - - def events(self) -> list[Event]: - def parse_registers(event: Frame) -> Union[int, dict[str, int]]: - arch = event.arch - if arch == rr_trace.Arch.x8664: - regs = parse_x64_registers(event.registers.raw) - return regs['rip'], regs - if arch == rr_trace.Arch.aarch64: - regs = parse_aarch64_registers(event.registers.raw) - return regs['pc'], regs - raise NotImplementedError(f'Unable to parse registers for architecture {arch}') - - def parse_memory_writes(event: Frame, reader: io.RawIOBase) -> list[MemoryWrite]: - writes = [] - for raw_write in event.memWrites: - # Skip memory writes with 0 bytes - if raw_write.size == 0: - continue - - holes = [] - for raw_hole in raw_write.holes: - holes.append(MemoryWriteHole(raw_hole.offset, raw_hole.size)) - - data = bytearray() - for hole in holes: - until_hole = hole.offset - reader.tell() - data.extend(reader.read(until_hole)) - data.extend(b'\x00' * hole.size) - - # No holes - if len(data) == 0: - data = reader.read(raw_write.size) - - mem_write = MemoryWrite(raw_write.tid, - raw_write.addr, - raw_write.size, - holes, - raw_write.sizeIsConservative, - bytes(data)) - writes.append(mem_write) - return writes - - data_reader = DeterministicLogReader(self.data_file()) - - events = [] - raw_events = self.raw_events() - for raw_event in raw_events: - pc, registers = parse_registers(raw_event) - mem_writes = parse_memory_writes(raw_event, data_reader) - - event = None - - tid = raw_event.tid - arch = raw_event.arch - event_type = raw_event.event.which() - - if event_type == 'syscall': - if raw_event.arch == rr_trace.Arch.x8664: - # On entry: substitute orig_rax for RAX - if raw_event.event.syscall.state == rr_trace.SyscallState.entering: - registers['rax'] = registers['orig_rax'] - del registers['orig_rax'] - event = SyscallEvent(pc, - tid, - arch, - registers, - mem_writes, - raw_event.event.syscall.arch, - raw_event.event.syscall.number, - raw_event.event.syscall.state, - raw_event.event.syscall.failedDuringPreparation) - - if event_type == 'syscallbufFlush': - event = SyscallBufferFlushEvent(pc, - tid, - arch, - registers, - mem_writes, - raw_event.event.syscallbufFlush.mprotectRecords) - raise NotImplementedError(f'Cannot support system call buffer events yet: {event}') - if event_type == 'signal': - signal = raw_event.event.signal - signal_descriptor = SignalDescriptor(signal.arch, - signal.siginfo, - signal.deterministic, - signal.disposition) - event = SignalEvent(pc, tid, arch, registers, mem_writes, - signal_number=signal_descriptor) - - if event_type == 'signalDelivery': - signal = raw_event.event.signalDelivery - signal_descriptor = SignalDescriptor(signal.arch, - signal.siginfo, - signal.deterministic, - signal.disposition) - event = SignalEvent(pc, tid, arch, registers, mem_writes, - signal_delivery=signal_descriptor) - - if event_type == 'signalHandler': - signal = raw_event.event.signalHandler - signal_descriptor = SignalDescriptor(signal.arch, - signal.siginfo, - signal.deterministic, - signal.disposition) - event = SignalEvent(pc, tid, arch, registers, mem_writes, - signal_handler=signal_descriptor) - +try: + from ._deterministic_impl import DeterministicLog +except Exception: + class DeterministicLog: + def __init__(self, log_dir: str): + self.base_directory = None + + def events_file(self) -> str | None: return None + def tasks_file(self) -> str | None: return None + def mmaps_file(self) -> str | None: return None + def events(self) -> list[Event]: return [] + def tasks(self) -> list[Task]: return [] + def mmaps(self) -> list[MemoryMapping]: return [] +finally: + class EventMatcher: + def __init__(self, + events: list[Event], + match_fn: Callable, + from_state: ReadableProgramState | None = None): + self.events = events + self.matcher = match_fn + + self.matched_count = None + if from_state: + self.match(from_state) + + def match(self, state: ReadableProgramState) -> Event | None: + if self.matched_count is None: + # Need to synchronize + # Search for match + for idx in range(len(self.events)): + event = self.events[idx] + if self.matcher(event, state): + self.matched_count = idx + 1 + return event + + if self.matched_count is None: + return None + + event = self.events[self.matched_count] + if self.matcher(event, state): + self.matched_count += 1 # proceed to next + return event + + return None + + def next(self): + if self.matched_count is None: + raise ValueError('Cannot get next event with unsynchronized event matcher') + if self.matched_count < len(self.events): + return self.events[self.matched_count] + return None + + def match_pair(self, state: ReadableProgramState): + event = self.match(state) if event is None: - event = Event(pc, tid, arch, registers, mem_writes, event_type) - - events.append(event) - - return events - - def tasks(self) -> list[Task]: - tasks = [] - raw_tasks = self.raw_tasks() - for raw_task in raw_tasks: - task_type = raw_task.which() - - task = None - if task_type == 'clone': - task = CloneTask(raw_task.frameTime, - raw_task.tid, - raw_task.clone.parentTid, - raw_task.clone.flags, - raw_task.clone.ownNsTid) - if task_type == 'exec': - task = ExecTask(raw_task.frameTime, - raw_task.tid, - raw_task.exec.fileName, - raw_task.exec.cmdLine, - raw_task.exec.exeBase, - raw_task.exec.interpBase, - raw_task.exec.interpName) - if task_type == 'exit': - task = ExitTask(raw_task.frameTime, raw_task.tid, raw_task.exit.exitStatus) - if task_type == 'detach': - task = DetachTask(raw_task.frameTime, raw_task.tid) - tasks.append(task) - return tasks - - def mmaps(self) -> list[MemoryMapping]: - def mapping_source(mmap: MMap) -> str: - source_type = mmap.source.which() - if source_type == 'zero' or source_type == 'trace': - return source_type - elif source_type == 'file': - return mmap.source.file.backingFileName - else: - raise NotImplementedError(f'Unable to handle memory mappings from source type:' - f' {source_type}') - - mmaps = [] - raw_mmaps = self.raw_mmaps() - for raw_mmap in raw_mmaps: - mmap = MemoryMapping(raw_mmap.frameTime, - raw_mmap.start, - raw_mmap.end, - mapping_source(raw_mmap), - raw_mmap.fileOffsetBytes, - raw_mmap.prot, - raw_mmap.flags) - mmaps.append(mmap) - return mmaps + return None, None + if isinstance(event, SyscallEvent) and event.syscall_state == 'exiting': + self.matched_count = None + return None, None + assert(self.matched_count is not None) + post_event = self.events[self.matched_count] + self.matched_count += 1 + return event, post_event + + def __bool__(self) -> bool: + return len(self.events) > 0 + + class MappingMatcher: + def __init__(self, memory_mappings: list[MemoryMapping]): + self.memory_mappings = memory_mappings + self.matched_count = None + + def match(self, event_count: int) -> list[MemoryMapping]: + if self.matched_count is None: + # Need to synchronize + # Search for match + mappings = [] + for idx in range(len(self.memory_mappings)): + mapping = self.memory_mappings[idx] + if mapping.event_count == event_count: + self.matched_count = idx + 1 + mappings.append(mapping) + return mappings + + mappings = [] + while self.matched_count < len(self.memory_mappings): + mapping = self.memory_mappings[self.matched_count] + if mapping.event_count == event_count: + self.matched_count += 1 # proceed to next + mappings.append(mapping) + + return mappings + + def next(self): + if self.matched_count is None: + raise ValueError('Cannot get next mapping with unsynchronized mapping matcher') + if self.matched_count < len(self.memory_mappings): + return self.memory_mappings[self.matched_count] + return None + + def __bool__(self) -> bool: + return len(self.memory_mappings) > 0 + + class LogStateMatcher: + def __init__(self, + events: list[Event], + memory_mappings: list[MemoryMapping], + event_match_fn: Callable, + from_state: ReadableProgramState | None = None): + self.event_matcher = EventMatcher(events, event_match_fn, from_state) + self.mapping_matcher = MappingMatcher(memory_mappings) + + def events(self) -> list[Event]: + return self.event_matcher.events + + def mappings(self) -> list[MemoryMapping]: + return self.mapping_matcher.memory_mappings + + def matched_events(self) -> Optional[int]: + return self.event_matcher.matched_count + + def match(self, state: ReadableProgramState) -> Tuple[Optional[Event], list[MemoryMapping]]: + event = self.event_matcher.match(state) + if not event: + return None, [] + assert(self.event_matcher.matched_count is not None) + mapping = self.mapping_matcher.match(self.event_matcher.matched_count) + return event, mapping + + def match_pair(self, state: ReadableProgramState) -> Tuple[Optional[Event], Optional[Event], list[MemoryMapping]]: + event, post_event = self.event_matcher.match_pair(state) + if not event: + return None, None, [] + assert(self.event_matcher.matched_count is not None) + mapping = self.mapping_matcher.match(self.event_matcher.matched_count-1) + return event, post_event, mapping + + def next(self) -> Tuple[Optional[Event], list[MemoryMapping]]: + next_event = self.event_matcher.next() + if not next_event: + return None, [] + assert(self.event_matcher.matched_count is not None) + return next_event, self.mapping_matcher.match(self.event_matcher.matched_count) + + def __bool__(self) -> bool: + return bool(self.event_matcher) diff --git a/src/focaccia/native/__init__.py b/src/focaccia/native/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/focaccia/native/__init__.py diff --git a/src/focaccia/lldb_target.py b/src/focaccia/native/lldb_target.py index 940b3d9..ff43643 100644 --- a/src/focaccia/lldb_target.py +++ b/src/focaccia/native/lldb_target.py @@ -3,8 +3,8 @@ import logging import lldb -from .arch import supported_architectures -from .snapshot import ProgramState +from focaccia.snapshot import ProgramState +from focaccia.arch import supported_architectures logger = logging.getLogger('focaccia-lldb-target') debug = logger.debug diff --git a/src/focaccia/native/tracer.py b/src/focaccia/native/tracer.py new file mode 100644 index 0000000..b2ca0d8 --- /dev/null +++ b/src/focaccia/native/tracer.py @@ -0,0 +1,321 @@ +"""Concolic Tracer for native programs.""" + +from __future__ import annotations + +import sys +import logging + +from pathlib import Path + +from focaccia.utils import timebound, TimeoutError +from focaccia.trace import Trace, TraceEnvironment +from focaccia.miasm_util import MiasmSymbolResolver +from focaccia.snapshot import ReadableProgramState, RegisterAccessError +from focaccia.symbolic import SymbolicTransform, DisassemblyContext, run_instruction +from focaccia.deterministic import Event, EventMatcher + +from .lldb_target import LLDBConcreteTarget, LLDBLocalTarget, LLDBRemoteTarget + +logger = logging.getLogger('focaccia-symbolic') +debug = logger.debug +info = logger.info +warn = logger.warn + +# Disable Miasm's disassembly logger +logging.getLogger('asmblock').setLevel(logging.CRITICAL) + +class ValidationError(Exception): + pass + +def match_event(event: Event, target: ReadableProgramState) -> bool: + # TODO: match the rest of the state to be sure + if event.pc == target.read_pc(): + for reg, value in event.registers.items(): + if value == event.pc: + continue + if target.read_register(reg) != value: + print(f'Failed match for {reg}: {hex(value)} != {hex(target.read_register(reg))}') + return False + return True + return False + +class SpeculativeTracer(ReadableProgramState): + def __init__(self, target: LLDBConcreteTarget): + super().__init__(target.arch) + self.target = target + self.pc = target.read_register('pc') + self.speculative_pc: int | None = None + self.speculative_count: int = 0 + + self.read_cache = {} + + def speculate(self, new_pc): + self.read_cache.clear() + if new_pc is None: + self.progress_execution() + self.target.step() + self.pc = self.target.read_register('pc') + self.speculative_pc = None + self.speculative_count = 0 + return + + new_pc = int(new_pc) + self.speculative_pc = new_pc + self.speculative_count += 1 + + def progress_execution(self) -> None: + if self.speculative_pc is not None and self.speculative_count != 0: + debug(f'Updating PC to {hex(self.speculative_pc)}') + if self.speculative_count == 1: + self.target.step() + else: + self.target.run_until(self.speculative_pc) + + self.pc = self.speculative_pc + self.speculative_pc = None + self.speculative_count = 0 + + self.read_cache.clear() + + def run_until(self, addr: int): + if self.speculative_pc: + raise Exception('Attempting manual execution with speculative execution enabled') + self.target.run_until(addr) + self.pc = addr + + def step(self): + self.progress_execution() + if self.target.is_exited(): + return + self.target.step() + self.pc = self.target.read_register('pc') + + def _cache(self, name: str, value): + self.read_cache[name] = value + return value + + def read_pc(self) -> int: + if self.speculative_pc is not None: + return self.speculative_pc + return self.pc + + def read_flags(self) -> dict[str, int | bool]: + if 'flags' in self.read_cache: + return self.read_cache['flags'] + self.progress_execution() + return self._cache('flags', self.target.read_flags()) + + def read_register(self, reg: str) -> int: + regname = self.arch.to_regname(reg) + if regname is None: + raise RegisterAccessError(reg, f'Not a register name: {reg}') + + if reg in self.read_cache: + return self.read_cache[reg] + + self.progress_execution() + return self._cache(reg, self.target.read_register(regname)) + + def write_register(self, regname: str, value: int): + self.progress_execution() + self.read_cache.pop(regname, None) + self.target.write_register(regname, value) + + def read_instructions(self, addr: int, size: int) -> bytes: + return self.target.read_memory(addr, size) + + def read_memory(self, addr: int, size: int) -> bytes: + self.progress_execution() + cache_name = f'{addr}_{size}' + if cache_name in self.read_cache: + return self.read_cache[cache_name] + return self._cache(cache_name, self.target.read_memory(addr, size)) + + def write_memory(self, addr: int, value: bytes): + self.progress_execution() + self.read_cache.pop(addr, None) + self.target.write_memory(addr, value) + + def __getattr__(self, name: str): + return getattr(self.target, name) + +class SymbolicTracer: + """A symbolic tracer that uses `LLDBConcreteTarget` with Miasm to simultaneously execute a + program with concrete state and collect its symbolic transforms + """ + def __init__(self, + env: TraceEnvironment, + remote: str | None=None, + force: bool=False, + cross_validate: bool=False): + self.env = env + self.force = force + self.remote = remote + self.cross_validate = cross_validate + self.target = SpeculativeTracer(self.create_debug_target()) + + def create_debug_target(self) -> LLDBConcreteTarget: + binary = self.env.binary_name + if self.remote is False: + debug(f'Launching local debug target {binary} {self.env.argv}') + debug(f'Environment: {self.env}') + return LLDBLocalTarget(binary, self.env.argv, self.env.envp) + + debug(f'Connecting to remote debug target {self.remote}') + target = LLDBRemoteTarget(self.remote, binary) + + module_name = target.determine_name() + binary = str(Path(self.env.binary_name).resolve()) + if binary != module_name: + warn(f'Discovered binary name {module_name} differs from specified name {binary}') + + return target + + def predict_next_state(self, instruction: Instruction, transform: SymbolicTransform): + debug(f'Evaluating register and memory transforms for {instruction} to cross-validate') + predicted_regs = transform.eval_register_transforms(self.target) + predicted_mems = transform.eval_memory_transforms(self.target) + return predicted_regs, predicted_mems + + def validate(self, + instruction: Instruction, + transform: SymbolicTransform, + predicted_regs: dict[str, int], + predicted_mems: dict[int, bytes]): + # Verify last generated transform by comparing concrete state against + # predicted values. + if self.target.is_exited(): + return + + debug('Cross-validating symbolic transforms by comparing actual to predicted values') + for reg, val in predicted_regs.items(): + conc_val = self.target.read_register(reg) + if conc_val != val: + raise ValidationError(f'Symbolic execution backend generated false equation for' + f' [{hex(instruction.addr)}]: {instruction}:' + f' Predicted {reg} = {hex(val)}, but the' + f' concrete state has value {reg} = {hex(conc_val)}.' + f'\nFaulty transformation: {transform}') + for addr, data in predicted_mems.items(): + conc_data = self.target.read_memory(addr, len(data)) + if conc_data != data: + raise ValidationError(f'Symbolic execution backend generated false equation for' + f' [{hex(instruction.addr)}]: {instruction}: Predicted' + f' mem[{hex(addr)}:{hex(addr+len(data))}] = {data},' + f' but the concrete state has value' + f' mem[{hex(addr)}:{hex(addr+len(data))}] = {conc_data}.' + f'\nFaulty transformation: {transform}') + + def progress(self, new_pc, step: bool = False) -> int | None: + self.target.speculate(new_pc) + if step: + info(f'Stepping through event at {hex(self.target.read_pc())}') + self.target.progress_execution() + if self.target.is_exited(): + return None + return self.target.read_pc() + + def trace(self, time_limit: int | None = None) -> Trace[SymbolicTransform]: + """Execute a program and compute state transformations between executed + instructions. + + :param start_addr: Address from which to start tracing. + :param stop_addr: Address until which to trace. + """ + # Set up concrete reference state + if self.env.start_address is not None: + self.target.run_until(self.env.start_address) + + ctx = DisassemblyContext(self.target) + arch = ctx.arch + + event_matcher = EventMatcher(self.env.detlog.events(), match_event, self.target) + if logger.isEnabledFor(logging.DEBUG): + debug('Tracing program with the following non-deterministic events') + for event in event_matcher.events: + debug(event) + + # Trace concolically + strace: list[SymbolicTransform] = [] + while not self.target.is_exited(): + pc = self.target.read_pc() + + if self.env.stop_address is not None and pc == self.env.stop_address: + info(f'Reached stop address at {hex(pc)}') + break + + # Disassemble instruction at the current PC + tid = self.target.get_current_tid() + try: + instruction = ctx.disassemble(pc) + info(f'[{tid}] Disassembled instruction {instruction} at {hex(pc)}') + except: + err = sys.exc_info()[1] + + # Try to recovery by using the LLDB disassembly instead + try: + alt_disas = self.target.get_disassembly(pc) + instruction = Instruction.from_string(alt_disas, ctx.arch, pc, + self.target.get_instruction_size(pc)) + info(f'[{tid}] Disassembled instruction {instruction} at {hex(pc)}') + except: + if self.force: + if alt_disas: + warn(f'[{tid}] Unable to handle instruction {alt_disas} at {hex(pc)} in Miasm.' + f' Skipping.') + else: + warn(f'[{tid}] Unable to disassemble instruction {hex(pc)}: {err}.' + f' Skipping.') + self.target.step() + continue + raise # forward exception + + event, post_event = event_matcher.match_pair(self.target) + in_event = (event and event_matcher) or self.target.arch.is_instr_syscall(str(instruction)) + + # Run instruction + conc_state = MiasmSymbolResolver(self.target, ctx.loc_db) + + try: + new_pc, modified = timebound(time_limit, run_instruction, + instruction.instr, conc_state, ctx.lifter) + except TimeoutError: + warn(f'Running instruction {instruction} took longer than {time_limit} second. Skipping') + new_pc, modified = None, {} + + if self.cross_validate and new_pc: + # Predict next concrete state. + # We verify the symbolic execution backend on the fly for some + # additional protection from bugs in the backend. + new_pc = int(new_pc) + transform = SymbolicTransform(tid, modified, [instruction], arch, pc, new_pc) + pred_regs, pred_mems = self.predict_next_state(instruction, transform) + self.progress(new_pc, step=in_event) + + try: + self.validate(instruction, transform, pred_regs, pred_mems) + except ValidationError as e: + if self.force: + warn(f'Cross-validation failed: {e}') + continue + raise + else: + new_pc = self.progress(new_pc, step=in_event) + if new_pc is None: + transform = SymbolicTransform(tid, modified, [instruction], arch, pc, 0) + strace.append(transform) + continue # we're done + transform = SymbolicTransform(tid, modified, [instruction], arch, pc, new_pc) + + strace.append(transform) + + if post_event: + if post_event.pc == 0: + # Exit sequence + debug('Completed exit event') + self.target.run() + + debug(f'Completed handling event: {post_event}') + + return Trace(strace, self.env) + diff --git a/src/focaccia/qemu/__init__.py b/src/focaccia/qemu/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/focaccia/qemu/__init__.py diff --git a/src/focaccia/tools/_qemu_tool.py b/src/focaccia/qemu/_qemu_tool.py index 02d150b..7ca556b 100644 --- a/src/focaccia/tools/_qemu_tool.py +++ b/src/focaccia/qemu/_qemu_tool.py @@ -6,10 +6,11 @@ But please use `tools/validate_qemu.py` instead because we have some more setup work to do. """ +import re import gdb import logging import traceback -from typing import Iterable +from typing import Iterable, Optional import focaccia.parser as parser from focaccia.arch import supported_architectures, Arch @@ -19,8 +20,16 @@ from focaccia.snapshot import ProgramState, ReadableProgramState, \ from focaccia.symbolic import SymbolicTransform, eval_symbol, ExprMem from focaccia.trace import Trace, TraceEnvironment from focaccia.utils import print_result +from focaccia.deterministic import ( + DeterministicLog, + Event, + EventMatcher, + SyscallEvent, + MemoryMapping, +) +from focaccia.qemu.deterministic import emulated_system_calls -from validate_qemu import make_argparser, verbosity +from focaccia.tools.validate_qemu import make_argparser, verbosity logger = logging.getLogger('focaccia-qemu-validator') debug = logger.debug @@ -36,7 +45,14 @@ qemu_crash = { 'snap': None, } -class GDBProgramState(ReadableProgramState): +def match_event(event: Event, target: ReadableProgramState) -> bool: + # Match just on PC + debug(f'Matching for PC {hex(target.read_pc())} with event {hex(event.pc)}') + if event.pc == target.read_pc(): + return True + return False + +class GDBProgramState(ProgramState): from focaccia.arch import aarch64, x86 flag_register_names = { @@ -121,11 +137,12 @@ class GDBProgramState(ReadableProgramState): raise MemoryAccessError(addr, size, str(err)) class GDBServerStateIterator: - def __init__(self, remote: str): + def __init__(self, remote: str, deterministic_log: DeterministicLog): gdb.execute('set pagination 0') gdb.execute('set sysroot') gdb.execute('set python print-stack full') # enable complete Python tracebacks gdb.execute(f'target remote {remote}') + self._deterministic_log = deterministic_log self._process = gdb.selected_inferior() self._first_next = True @@ -135,40 +152,197 @@ class GDBServerStateIterator: archname = split[1] if len(split) > 1 else split[0] archname = archname.replace('-', '_') if archname not in supported_architectures: - print(f'Error: Current platform ({archname}) is not' - f' supported by Focaccia. Exiting.') - exit(1) + raise NotImplementedError(f'Platform {archname} is not supported by Focaccia') self.arch = supported_architectures[archname] self.binary = self._process.progspace.filename + first_state = self.current_state() + self._events = EventMatcher(self._deterministic_log.events(), + match_event, + from_state=first_state) + event = self._events.match(first_state) + info(f'Synchronized at PC={hex(first_state.read_pc())} to event:\n{event}') + + def current_state(self) -> ReadableProgramState: + return GDBProgramState(self._process, gdb.selected_frame(), self.arch) + + def _handle_syscall(self, event: Event, post_event: Event) -> GDBProgramState: + call = event.registers.get(self.arch.get_syscall_reg()) + + syscall = emulated_system_calls[self.arch.archname].get(call, None) + if syscall is not None: + info(f'Replaying system call number {hex(call)}') + + self.skip(post_event.pc) + next_state = self.current_state() + + patchup_regs = [self.arch.get_syscall_reg(), *(syscall.patchup_registers or [])] + for reg in patchup_regs: + gdb.parse_and_eval(f'${reg}={post_event.registers.get(reg)}') + + for mem in post_event.mem_writes: + addr, data = mem.address, mem.data + for reg, value in post_event.registers.items(): + if value == addr: + addr = next_state.read_register(reg) + break + + info(f'Replaying write to {hex(addr)} with data:\n{data.hex(" ")}') + + # Insert holes into data + for hole in mem.holes: + data[hole.offset:hole.offset] = b'\x00' * hole.size + self._process.write_memory(addr, data) + return next_state + + return next_state + + info(f'System call number {hex(call)} not replayed') + self._step() + if self._is_exited(): + raise StopIteration + return GDBProgramState(self._process, gdb.selected_frame(), self.arch) + + def _handle_event(self, event: Event | None, post_event: Event | None) -> GDBProgramState: + if not event: + return self.current_state() + + if isinstance(event, SyscallEvent): + return self._handle_syscall(event, post_event) + + warn(f'Event handling for events of type {event.event_type} not implemented') + return self.current_state() + + def _is_exited(self) -> bool: + return not self._process.is_valid() or len(self._process.threads()) == 0 + def __iter__(self): return self - def __next__(self): + def __next__(self) -> ReadableProgramState: # The first call to __next__ should yield the first program state, # i.e. before stepping the first time if self._first_next: self._first_next = False return GDBProgramState(self._process, gdb.selected_frame(), self.arch) + event, post_event = self._events.match_pair(self.current_state()) + if event: + state = self._handle_event(event, post_event) + if self._is_exited(): + raise StopIteration + + return state + # Step pc = gdb.selected_frame().read_register('pc') new_pc = pc while pc == new_pc: # Skip instruction chains from REP STOS etc. - gdb.execute('si', to_string=True) - if not self._process.is_valid() or len(self._process.threads()) == 0: + self._step() + if self._is_exited(): raise StopIteration new_pc = gdb.selected_frame().read_register('pc') - return GDBProgramState(self._process, gdb.selected_frame(), self.arch) + return self.current_state() + + def run_until(self, addr: int) -> ReadableProgramState: + events_handled = 0 + event = self._events.next() + while event: + state = self._run_until_any([addr, event.pc]) + if state.read_pc() == addr: + # Check if we started at the very _start + self._first_next = events_handled == 0 + return state + + event, post_event = self._events.match_pair(self.current_state()) + self._handle_event(event, post_event) + + event = self._events.next() + events_handled += 1 + return self._run_until_any([addr]) + + def _run_until_any(self, addresses: list[int]) -> ReadableProgramState: + info(f'Executing until {[hex(x) for x in addresses]}') + + breakpoints = [] + for addr in addresses: + breakpoints.append(gdb.Breakpoint(f'*{addr:#x}')) - def run_until(self, addr: int): - breakpoint = gdb.Breakpoint(f'*{addr:#x}') gdb.execute('continue') - breakpoint.delete() + + for bp in breakpoints: + bp.delete() + return GDBProgramState(self._process, gdb.selected_frame(), self.arch) + def skip(self, new_pc: int): + gdb.execute(f'set $pc = {hex(new_pc)}') + + def _step(self): + gdb.execute('si', to_string=True) + + def get_sections(self) -> list[MemoryMapping]: + mappings = [] + + # Skip everything until the header line + started = False + + text = gdb.execute('info proc mappings', to_string=True) + for line in text.splitlines(): + line = line.strip() + if not line: + continue + + # Detect header line once + if line.startswith("Start Addr"): + started = True + continue + + if not started: + continue + + # Lines look like: + # 0x0000000000400000 0x0000000000401000 0x1000 0x0 r--p /path + # or: + # 0x... 0x... 0x... 0x... rw-p [vdso] + parts = line.split(None, 6) + + if len(parts) < 5: + continue + + start = int(parts[0], 16) + end = int(parts[1], 16) + size = int(parts[2], 16) + offset = int(parts[3], 16) + perms = parts[4] + + file_or_tag = None + is_special = False + + if len(parts) >= 6: + tail = parts[5] + + # If it's [tag], mark as special + if tail.startswith("[") and tail.endswith("]"): + file_or_tag = tail.strip() + is_special = True + else: + # Might be a filename or absent + file_or_tag = tail + + mapping = MemoryMapping(0, + start, + end, + '', + offset, + 0, + 0) + mappings.append(mapping) + + return mappings + def record_minimal_snapshot(prev_state: ReadableProgramState, cur_state: ReadableProgramState, prev_transform: SymbolicTransform, @@ -211,7 +385,7 @@ def record_minimal_snapshot(prev_state: ReadableProgramState, for regname in regs: try: regval = cur_state.read_register(regname) - out_state.set_register(regname, regval) + out_state.write_register(regname, regval) except RegisterAccessError: pass for mem in mems: @@ -224,7 +398,7 @@ def record_minimal_snapshot(prev_state: ReadableProgramState, pass state = ProgramState(cur_transform.arch) - state.set_register('PC', cur_transform.addr) + state.write_register('PC', cur_transform.addr) set_values(prev_transform.changed_regs.keys(), get_written_addresses(prev_transform), @@ -275,24 +449,33 @@ def collect_conc_trace(gdb: GDBServerStateIterator, \ cur_state = next(state_iter) symb_i = 0 + if logger.isEnabledFor(logging.DEBUG): + debug('Tracing program with the following non-deterministic events:') + for event in gdb._events.events: + debug(event) + # Skip to start + pc = cur_state.read_register('pc') + start_addr = start_addr if start_addr else pc try: - pc = cur_state.read_register('pc') - if start_addr and pc != start_addr: - info(f'Tracing QEMU from starting address: {hex(start_addr)}') + if pc != start_addr: + info(f'Executing until starting address {hex(start_addr)}') cur_state = state_iter.run_until(start_addr) except Exception as e: - if start_addr: + if pc != start_addr: raise Exception(f'Unable to reach start address {hex(start_addr)}: {e}') raise Exception(f'Unable to trace: {e}') # An online trace matching algorithm. + info(f'Tracing QEMU between {hex(start_addr)}:{hex(stop_addr) if stop_addr else "end"}') while True: try: pc = cur_state.read_register('pc') + if stop_addr and pc == stop_addr: + break while pc != strace[symb_i].addr: - info(f'PC {hex(pc)} does not match next symbolic reference {hex(strace[symb_i].addr)}') + warn(f'PC {hex(pc)} does not match next symbolic reference {hex(strace[symb_i].addr)}') next_i = find_index(strace[symb_i+1:], pc, lambda t: t.addr) @@ -341,17 +524,22 @@ def collect_conc_trace(gdb: GDBServerStateIterator, \ # Note: this may occur when symbolic traces were gathered with a stop address if symb_i >= len(strace): warn(f'QEMU executed more states than native execution: {symb_i} vs {len(strace)-1}') - + return states, matched_transforms def main(): args = make_argparser().parse_args() - + logging_level = getattr(logging, args.error_level.upper(), logging.INFO) logging.basicConfig(level=logging_level, force=True) + detlog = DeterministicLog(args.deterministic_log) + if args.deterministic_log and detlog.base_directory is None: + raise NotImplementedError(f'Deterministic log {args.deterministic_log} specified but ' + 'Focaccia built without deterministic log support') + try: - gdb_server = GDBServerStateIterator(args.remote) + gdb_server = GDBServerStateIterator(args.remote, detlog) except Exception as e: raise Exception(f'Unable to perform basic GDB setup: {e}') diff --git a/src/focaccia/qemu/deterministic.py b/src/focaccia/qemu/deterministic.py new file mode 100644 index 0000000..d2d314e --- /dev/null +++ b/src/focaccia/qemu/deterministic.py @@ -0,0 +1,9 @@ +from focaccia.qemu.x86 import emulated_system_calls as x86_emu_syscalls + +emulated_system_calls = { + 'x86_64': x86_emu_syscalls, + 'aarch64': { }, + 'aarch64l': { }, + 'aarch64b': { } +} + diff --git a/src/focaccia/qemu/syscall.py b/src/focaccia/qemu/syscall.py new file mode 100644 index 0000000..956f5c9 --- /dev/null +++ b/src/focaccia/qemu/syscall.py @@ -0,0 +1,14 @@ +class SyscallInfo: + def __init__(self, + name: str, + patchup_registers: list[str] | None = None, + patchup_address_registers: list[str] | None = None): + """Describes a syscall by its name and outputs. + + :param name: The name of a system call. + :param patchup_registers: Registers that must be replaced with deterministic values. + """ + self.name = name + self.patchup_registers = patchup_registers + self.patchup_address_registers = patchup_address_registers + diff --git a/src/focaccia/tools/validation_server.py b/src/focaccia/qemu/validation_server.py index db33ff3..db33ff3 100755 --- a/src/focaccia/tools/validation_server.py +++ b/src/focaccia/qemu/validation_server.py diff --git a/src/focaccia/qemu/x86.py b/src/focaccia/qemu/x86.py new file mode 100644 index 0000000..347bbc4 --- /dev/null +++ b/src/focaccia/qemu/x86.py @@ -0,0 +1,10 @@ +from focaccia.qemu.syscall import SyscallInfo + +# Incomplete, only the most common ones +emulated_system_calls = { + 34: SyscallInfo('pause', []), + 39: SyscallInfo('getpid', []), + 102: SyscallInfo('getuid', []), + 318: SyscallInfo('getrandom', patchup_address_registers=['rdi']) +} + diff --git a/src/focaccia/snapshot.py b/src/focaccia/snapshot.py index 03a03cd..93241c1 100644 --- a/src/focaccia/snapshot.py +++ b/src/focaccia/snapshot.py @@ -92,6 +92,13 @@ class ReadableProgramState: self.arch = arch self.strict = True + def read_pc(self) -> int: + """Read the PC value. + + :raise RegisterAccessError: If the register has not value. + """ + return self.read_register('pc') + def read_register(self, reg: str) -> int: """Read a register's value. @@ -154,7 +161,7 @@ class ProgramState(ReadableProgramState): return (regval & acc.mask) >> acc.start - def set_register(self, reg: str, value: int): + def write_register(self, reg: str, value: int): """Assign a value to a register. :raise RegisterAccessError: If `reg` is not a register name. diff --git a/src/focaccia/symbolic.py b/src/focaccia/symbolic.py index 2a66a26..b83b289 100644 --- a/src/focaccia/symbolic.py +++ b/src/focaccia/symbolic.py @@ -1,42 +1,17 @@ -"""Tools and utilities for execution with Miasm.""" +"""Tools and utilities for execution with Miasm.""" from __future__ import annotations -import sys -import logging - -from pathlib import Path - +from miasm.ir.ir import Lifter from miasm.analysis.machine import Machine -from miasm.core.cpu import instruction as miasm_instr from miasm.core.locationdb import LocationDB -from miasm.expression.expression import Expr, ExprId, ExprMem, ExprInt -from miasm.ir.ir import Lifter +from miasm.core.cpu import instruction as miasm_instr from miasm.ir.symbexec import SymbolicExecutionEngine +from miasm.expression.expression import Expr, ExprId, ExprMem, ExprInt +from .snapshot import ReadableProgramState from .arch import Arch, supported_architectures -from .lldb_target import ( - LLDBConcreteTarget, - LLDBLocalTarget, - LLDBRemoteTarget, - ConcreteRegisterError, - ConcreteMemoryError, -) from .miasm_util import MiasmSymbolResolver, eval_expr, make_machine -from .snapshot import ReadableProgramState, RegisterAccessError, MemoryAccessError -from .trace import Trace, TraceEnvironment -from .utils import timebound, TimeoutError - -logger = logging.getLogger('focaccia-symbolic') -debug = logger.debug -info = logger.info -warn = logger.warn - -# Disable Miasm's disassembly logger -logging.getLogger('asmblock').setLevel(logging.CRITICAL) - -class ValidationError(Exception): - pass def eval_symbol(symbol: Expr, conc_state: ReadableProgramState) -> int: """Evaluate a symbol based on a concrete reference state. @@ -542,8 +517,7 @@ def run_instruction(instr: miasm_instr, res[dst] = expr_simp(ExprCond(cond, dst, v)) return res - def _execute_location(loc, base_state: dict | None) \ - -> tuple[Expr, dict]: + def _execute_location(loc, base_state: dict | None) -> tuple[Expr, dict]: """Execute a single IR block via symbolic engine. No fancy stuff.""" # Query the location's IR block irblock = ircfg.get_block(loc) @@ -601,12 +575,7 @@ def run_instruction(instr: miasm_instr, loc = lifter.add_instr_to_ircfg(instr, ircfg, None, False) assert(isinstance(loc, Expr) or isinstance(loc, LocKey)) except NotImplementedError as err: - msg = f'Unable to lift instruction {instr}: {err}' - if force: - warn(f'{msg}. Skipping') - return None, {} - else: - raise Exception(msg) + raise Exception(f'Unable to lift instruction {instr}: {err}') # Execute instruction symbolically new_pc, modified = execute_location(loc) @@ -614,317 +583,3 @@ def run_instruction(instr: miasm_instr, return new_pc, modified -class SpeculativeTracer(ReadableProgramState): - def __init__(self, target: LLDBConcreteTarget): - super().__init__(target.arch) - self.target = target - self.pc = target.read_register('pc') - self.speculative_pc: int | None = None - self.speculative_count: int = 0 - - self.read_cache = {} - - def speculate(self, new_pc): - self.read_cache.clear() - if new_pc is None: - self.progress_execution() - self.target.step() - self.pc = self.target.read_register('pc') - self.speculative_pc = None - self.speculative_count = 0 - return - - new_pc = int(new_pc) - self.speculative_pc = new_pc - self.speculative_count += 1 - - def progress_execution(self) -> None: - if self.speculative_pc is not None and self.speculative_count != 0: - debug(f'Updating PC to {hex(self.speculative_pc)}') - if self.speculative_count == 1: - self.target.step() - else: - self.target.run_until(self.speculative_pc) - - self.pc = self.speculative_pc - self.speculative_pc = None - self.speculative_count = 0 - - self.read_cache.clear() - - def run_until(self, addr: int): - if self.speculative_pc: - raise Exception('Attempting manual execution with speculative execution enabled') - self.target.run_until(addr) - self.pc = addr - - def step(self): - self.progress_execution() - if self.target.is_exited(): - return - self.target.step() - self.pc = self.target.read_register('pc') - - def _cache(self, name: str, value): - self.read_cache[name] = value - return value - - def read_pc(self) -> int: - if self.speculative_pc is not None: - return self.speculative_pc - return self.pc - - def read_flags(self) -> dict[str, int | bool]: - if 'flags' in self.read_cache: - return self.read_cache['flags'] - self.progress_execution() - return self._cache('flags', self.target.read_flags()) - - def read_register(self, reg: str) -> int: - regname = self.arch.to_regname(reg) - if regname is None: - raise RegisterAccessError(reg, f'Not a register name: {reg}') - - if reg in self.read_cache: - return self.read_cache[reg] - - self.progress_execution() - return self._cache(reg, self.target.read_register(regname)) - - def write_register(self, regname: str, value: int): - self.progress_execution() - self.read_cache.pop(regname, None) - self.target.write_register(regname, value) - - def read_instructions(self, addr: int, size: int) -> bytes: - return self.target.read_memory(addr, size) - - def read_memory(self, addr: int, size: int) -> bytes: - self.progress_execution() - cache_name = f'{addr}_{size}' - if cache_name in self.read_cache: - return self.read_cache[cache_name] - return self._cache(cache_name, self.target.read_memory(addr, size)) - - def write_memory(self, addr: int, value: bytes): - self.progress_execution() - self.read_cache.pop(addr, None) - self.target.write_memory(addr, value) - - def __getattr__(self, name: str): - return getattr(self.target, name) - -class SymbolicTracer: - """A symbolic tracer that uses `LLDBConcreteTarget` with Miasm to simultaneously execute a - program with concrete state and collect its symbolic transforms - """ - def __init__(self, - env: TraceEnvironment, - remote: str | None=None, - force: bool=False, - cross_validate: bool=False): - self.env = env - self.force = force - self.remote = remote - self.cross_validate = cross_validate - self.target = SpeculativeTracer(self.create_debug_target()) - - self.nondet_events = self.env.detlog.events() - self.next_event: int | None = None - - def create_debug_target(self) -> LLDBConcreteTarget: - binary = self.env.binary_name - if self.remote is False: - debug(f'Launching local debug target {binary} {self.env.argv}') - debug(f'Environment: {self.env}') - return LLDBLocalTarget(binary, self.env.argv, self.env.envp) - - debug(f'Connecting to remote debug target {self.remote}') - target = LLDBRemoteTarget(self.remote, binary) - - module_name = target.determine_name() - binary = str(Path(self.env.binary_name).resolve()) - if binary != module_name: - warn(f'Discovered binary name {module_name} differs from specified name {binary}') - - return target - - def predict_next_state(self, instruction: Instruction, transform: SymbolicTransform): - debug(f'Evaluating register and memory transforms for {instruction} to cross-validate') - predicted_regs = transform.eval_register_transforms(self.target) - predicted_mems = transform.eval_memory_transforms(self.target) - return predicted_regs, predicted_mems - - def validate(self, - instruction: Instruction, - transform: SymbolicTransform, - predicted_regs: dict[str, int], - predicted_mems: dict[int, bytes]): - # Verify last generated transform by comparing concrete state against - # predicted values. - if self.target.is_exited(): - return - - debug('Cross-validating symbolic transforms by comparing actual to predicted values') - for reg, val in predicted_regs.items(): - conc_val = self.target.read_register(reg) - if conc_val != val: - raise ValidationError(f'Symbolic execution backend generated false equation for' - f' [{hex(instruction.addr)}]: {instruction}:' - f' Predicted {reg} = {hex(val)}, but the' - f' concrete state has value {reg} = {hex(conc_val)}.' - f'\nFaulty transformation: {transform}') - for addr, data in predicted_mems.items(): - conc_data = self.target.read_memory(addr, len(data)) - if conc_data != data: - raise ValidationError(f'Symbolic execution backend generated false equation for' - f' [{hex(instruction.addr)}]: {instruction}: Predicted' - f' mem[{hex(addr)}:{hex(addr+len(data))}] = {data},' - f' but the concrete state has value' - f' mem[{hex(addr)}:{hex(addr+len(data))}] = {conc_data}.' - f'\nFaulty transformation: {transform}') - - def progress_event(self) -> None: - if (self.next_event + 1) < len(self.nondet_events): - self.next_event += 1 - debug(f'Next event to handle at index {self.next_event}') - else: - self.next_event = None - - def post_event(self) -> None: - if self.next_event: - if self.nondet_events[self.next_event].pc == 0: - # Exit sequence - debug('Completed exit event') - self.target.run() - - debug(f'Completed handling event at index {self.next_event}') - self.progress_event() - - def is_stepping_instr(self, pc: int, instruction: Instruction) -> bool: - if self.nondet_events: - pc = pc + instruction.length # detlog reports next pc for each event - if self.next_event and self.nondet_events[self.next_event].match(pc, self.target): - debug('Current instruction matches next event; stepping through it') - self.progress_event() - return True - else: - if self.target.arch.is_instr_syscall(str(instruction)): - return True - return False - - def progress(self, new_pc, step: bool = False) -> int | None: - self.target.speculate(new_pc) - if step: - self.target.progress_execution() - if self.target.is_exited(): - return None - return self.target.read_pc() - - def trace(self, time_limit: int | None = None) -> Trace[SymbolicTransform]: - """Execute a program and compute state transformations between executed - instructions. - - :param start_addr: Address from which to start tracing. - :param stop_addr: Address until which to trace. - """ - # Set up concrete reference state - if self.env.start_address is not None: - self.target.run_until(self.env.start_address) - - for i in range(len(self.nondet_events)): - if self.nondet_events[i].pc == self.target.read_pc(): - self.next_event = i+1 - if self.next_event >= len(self.nondet_events): - break - - debug(f'Starting from event {self.nondet_events[i]} onwards') - break - - ctx = DisassemblyContext(self.target) - arch = ctx.arch - - if logger.isEnabledFor(logging.DEBUG): - debug('Tracing program with the following non-deterministic events') - for event in self.nondet_events: - debug(event) - - # Trace concolically - strace: list[SymbolicTransform] = [] - while not self.target.is_exited(): - pc = self.target.read_pc() - - if self.env.stop_address is not None and pc == self.env.stop_address: - break - - assert(pc != 0) - - # Disassemble instruction at the current PC - tid = self.target.get_current_tid() - try: - instruction = ctx.disassemble(pc) - info(f'[{tid}] Disassembled instruction {instruction} at {hex(pc)}') - except: - err = sys.exc_info()[1] - - # Try to recovery by using the LLDB disassembly instead - try: - alt_disas = self.target.get_disassembly(pc) - instruction = Instruction.from_string(alt_disas, ctx.arch, pc, - self.target.get_instruction_size(pc)) - info(f'[{tid}] Disassembled instruction {instruction} at {hex(pc)}') - except: - if self.force: - if alt_disas: - warn(f'[{tid}] Unable to handle instruction {alt_disas} at {hex(pc)} in Miasm.' - f' Skipping.') - else: - warn(f'[{tid}] Unable to disassemble instruction {hex(pc)}: {err}.' - f' Skipping.') - self.target.step() - continue - raise # forward exception - - is_event = self.is_stepping_instr(pc, instruction) - - # Run instruction - conc_state = MiasmSymbolResolver(self.target, ctx.loc_db) - - try: - new_pc, modified = timebound(time_limit, run_instruction, - instruction.instr, conc_state, ctx.lifter) - except TimeoutError: - warn(f'Running instruction {instruction} took longer than {time_limit} second. Skipping') - new_pc, modified = None, {} - - if self.cross_validate and new_pc: - # Predict next concrete state. - # We verify the symbolic execution backend on the fly for some - # additional protection from bugs in the backend. - new_pc = int(new_pc) - transform = SymbolicTransform(tid, modified, [instruction], arch, pc, new_pc) - pred_regs, pred_mems = self.predict_next_state(instruction, transform) - self.progress(new_pc, step=is_event) - - try: - self.validate(instruction, transform, pred_regs, pred_mems) - except ValidationError as e: - if self.force: - warn(f'Cross-validation failed: {e}') - continue - raise - else: - new_pc = self.progress(new_pc, step=is_event) - if new_pc is None: - transform = SymbolicTransform(tid, modified, [instruction], arch, pc, 0) - strace.append(transform) - continue # we're done - transform = SymbolicTransform(tid, modified, [instruction], arch, pc, new_pc) - - strace.append(transform) - - if is_event: - self.post_event() - - return Trace(strace, self.env) - diff --git a/src/focaccia/tools/capture_transforms.py b/src/focaccia/tools/capture_transforms.py index 1208156..268af36 100755 --- a/src/focaccia/tools/capture_transforms.py +++ b/src/focaccia/tools/capture_transforms.py @@ -5,8 +5,9 @@ import argparse import logging from focaccia import parser, utils -from focaccia.symbolic import SymbolicTracer from focaccia.trace import TraceEnvironment +from focaccia.native.tracer import SymbolicTracer +from focaccia.deterministic import DeterministicLog def main(): prog = argparse.ArgumentParser() @@ -62,20 +63,10 @@ def main(): else: logging.basicConfig(level=logging.INFO) - detlog = None - if args.deterministic_log: - from focaccia.deterministic import DeterministicLog - detlog = DeterministicLog(args.deterministic_log) - else: - class NullDeterministicLog: - def __init__(self): pass - def events_file(self): return None - def tasks_file(self): return None - def mmaps_file(self): return None - def events(self): return [] - def tasks(self): return [] - def mmaps(self): return [] - detlog = NullDeterministicLog() + detlog = DeterministicLog(args.deterministic_log) + if args.deterministic_log and detlog.base_directory is None: + raise NotImplementedError(f'Deterministic log {args.deterministic_log} specified but ' + 'Focaccia built without deterministic log support') env = TraceEnvironment(args.binary, args.args, utils.get_envp(), nondeterminism_log=detlog, diff --git a/src/focaccia/tools/validate_qemu.py b/src/focaccia/tools/validate_qemu.py index e834a6d..4b5160f 100755 --- a/src/focaccia/tools/validate_qemu.py +++ b/src/focaccia/tools/validate_qemu.py @@ -23,11 +23,13 @@ import argparse import sysconfig import subprocess +import focaccia.qemu from focaccia.compare import ErrorTypes from focaccia.arch import supported_architectures -from focaccia.tools.validation_server import start_validation_server +from focaccia.qemu.validation_server import start_validation_server verbosity = { + 'debug': ErrorTypes.INFO, 'info': ErrorTypes.INFO, 'warning': ErrorTypes.POSSIBLE, 'error': ErrorTypes.CONFIRMED, @@ -78,10 +80,12 @@ memory, and stepping forward by single instructions. prog.add_argument('--remote', type=str, help='The hostname:port pair at which to find a QEMU GDB server.') - prog.add_argument('--gdb', + prog.add_argument('--gdb', type=str, default='gdb', help='GDB binary to invoke.') + prog.add_argument('--deterministic-log', default=None, + help='The directory containing rr traces') return prog def quoted(s: str) -> str: @@ -118,7 +122,7 @@ def main(): args.quiet) else: # QEMU GDB interface - script_dirname = os.path.dirname(__file__) + script_dirname = os.path.dirname(focaccia.qemu.__file__) qemu_tool_path = os.path.join(script_dirname, '_qemu_tool.py') # We have to remove all arguments we don't want to pass to the qemu tool |