diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/focaccia/arch/aarch64.py | 4 | ||||
| -rw-r--r-- | src/focaccia/arch/arch.py | 1 | ||||
| -rw-r--r-- | src/focaccia/arch/x86.py | 4 | ||||
| -rw-r--r-- | src/focaccia/snapshot.py | 19 | ||||
| -rw-r--r-- | src/focaccia/symbolic.py | 2 | ||||
| -rw-r--r-- | src/focaccia/tools/_qemu_tool.py | 9 | ||||
| -rwxr-xr-x | src/focaccia/tools/validate_qemu.py | 5 | ||||
| -rwxr-xr-x | src/focaccia/tools/validation_server.py | 417 |
8 files changed, 455 insertions, 6 deletions
diff --git a/src/focaccia/arch/aarch64.py b/src/focaccia/arch/aarch64.py index 6c1163e..af173e3 100644 --- a/src/focaccia/arch/aarch64.py +++ b/src/focaccia/arch/aarch64.py @@ -162,6 +162,10 @@ def decompose_cpsr(cpsr: int) -> dict[str, int]: class ArchAArch64(Arch): def __init__(self, endianness: Arch.Endianness): super().__init__(archname, registers, 64, endianness) + self.ignored_regs = [ + 'N', 'Z', 'C', 'V', 'Q', 'SSBS', 'PAN', 'DIT', 'GE', + 'E', 'A', 'I', 'F', 'M' + ] def to_regname(self, name: str) -> str | None: reg = super().to_regname(name) diff --git a/src/focaccia/arch/arch.py b/src/focaccia/arch/arch.py index 8aac8b5..234b0d9 100644 --- a/src/focaccia/arch/arch.py +++ b/src/focaccia/arch/arch.py @@ -46,6 +46,7 @@ class Arch(): self.archname = archname self.ptr_size = ptr_size self.endianness: Literal['little', 'big'] = endianness + self.ignored_regs: list[str] = [] self._accessors = {} for desc in registers: diff --git a/src/focaccia/arch/x86.py b/src/focaccia/arch/x86.py index fefab37..809055e 100644 --- a/src/focaccia/arch/x86.py +++ b/src/focaccia/arch/x86.py @@ -186,6 +186,10 @@ def compose_rflags(rflags: dict[str, int]) -> int: class ArchX86(Arch): def __init__(self): super().__init__(archname, registers, 64) + self.ignored_regs = [ + 'EFLAGS', 'CF', 'PF', 'AF', 'ZF', 'SF', 'TF', 'IF', 'DF', 'OF', + 'CS', 'DS', 'SS', 'ES', 'FS', 'GS', + ] def to_regname(self, name: str) -> str | None: """The X86 override of the standard register name lookup. diff --git a/src/focaccia/snapshot.py b/src/focaccia/snapshot.py index 1945d71..294afb1 100644 --- a/src/focaccia/snapshot.py +++ b/src/focaccia/snapshot.py @@ -27,6 +27,13 @@ class SparseMemory: off = addr % self.page_size return addr - off, off + def drop_all(self): + self._pages.clear() + + def test(self, addr: int) -> bool: + page_addr, off = self._to_page_addr_and_offset(addr) + return page_addr in self._pages + def read(self, addr: int, size: int) -> bytes: """Read a number of bytes from memory. :param addr: The offset from where to read. @@ -113,6 +120,18 @@ class ProgramState(ReadableProgramState): self.regs: dict[str, int | None] = {reg: None for reg in arch.regnames} self.mem = SparseMemory() + def test_register(self, reg: str) -> bool: + """Test whether a register has a value assigned. + + :raise RegisterAccessError: If `reg` is not a register name. + """ + acc = self.arch.get_reg_accessor(reg) + if acc is None: + raise RegisterAccessError(reg, f'Not a register name: {reg}') + + assert(acc.base_reg in self.regs) + return self.regs[acc.base_reg] is not None + def read_register(self, reg: str) -> int: """Read a register's value. diff --git a/src/focaccia/symbolic.py b/src/focaccia/symbolic.py index 6a99b60..3925a06 100644 --- a/src/focaccia/symbolic.py +++ b/src/focaccia/symbolic.py @@ -338,6 +338,8 @@ class SymbolicTransform: """ res = {} for regname, expr in self.changed_regs.items(): + if regname.upper() in self.arch.ignored_regs: + continue res[regname] = eval_symbol(expr, conc_state) return res diff --git a/src/focaccia/tools/_qemu_tool.py b/src/focaccia/tools/_qemu_tool.py index 23c1488..706a9fe 100644 --- a/src/focaccia/tools/_qemu_tool.py +++ b/src/focaccia/tools/_qemu_tool.py @@ -291,7 +291,14 @@ def collect_conc_trace(gdb: GDBServerStateIterator, \ return states, matched_transforms def main(): - args = make_argparser().parse_args() + prog = make_argparser() + prog.add_argument('hostname', + help='The hostname at which to find the GDB server.') + prog.add_argument('port', + type=int, + help='The port at which to find the GDB server.') + + args = prog.parse_args() gdbserver_addr = 'localhost' gdbserver_port = args.port diff --git a/src/focaccia/tools/validate_qemu.py b/src/focaccia/tools/validate_qemu.py index 56e3beb..2b7e65c 100755 --- a/src/focaccia/tools/validate_qemu.py +++ b/src/focaccia/tools/validate_qemu.py @@ -40,11 +40,6 @@ In fact, this tool could be used to test any emulator that provides a GDB-server interface. The server must support reading registers, reading memory, and stepping forward by single instructions. """ - prog.add_argument('hostname', - help='The hostname at which to find the GDB server.') - prog.add_argument('port', - type=int, - help='The port at which to find the GDB server.') prog.add_argument('--symb-trace', required=True, help='A pre-computed symbolic transformation trace to' \ diff --git a/src/focaccia/tools/validation_server.py b/src/focaccia/tools/validation_server.py new file mode 100755 index 0000000..6a62fa3 --- /dev/null +++ b/src/focaccia/tools/validation_server.py @@ -0,0 +1,417 @@ +#! /usr/bin/env python3 + +from typing import Iterable + +import focaccia.parser as parser +from focaccia.arch import supported_architectures, Arch +from focaccia.compare import compare_symbolic +from focaccia.snapshot import ProgramState, ReadableProgramState, \ + RegisterAccessError, MemoryAccessError +from focaccia.symbolic import SymbolicTransform, eval_symbol, ExprMem +from focaccia.trace import Trace, TraceEnvironment +from focaccia.utils import print_result + +from validate_qemu import make_argparser, verbosity + +import socket +import struct +import os + + +SOCK_PATH = '/tmp/focaccia.sock' + +def endian_fmt(endianness: str) -> str: + if endianness == 'little': + return '<' + else: + return '>' + +def mkCommand(cmd: str, endianness: str, reg: str="", addr: int=0, size: int=0) -> bytes: + # char[16]:regname | long long:addr long long:size | long long:unused + # READ REG | READ MEM | STEP ONE + + if cmd == 'read register': + fmt = f'{endian_fmt(endianness)}16s9s' + return struct.pack(fmt,reg.encode('utf-8'),"READ REG".encode('utf-8')) + elif cmd == 'read memory': + fmt = f'{endian_fmt(endianness)}QQ9s' + return struct.pack(fmt, addr, size, "READ MEM".encode('utf-8')) + elif cmd == 'step': + fmt = f'{endian_fmt(endianness)}qq9s' + return struct.pack(fmt, 0, 0, "STEP ONE".encode('utf-8')) + else: + raise ValueError(f'Unknown command {cmd}') +def unmkMemory(msg: bytes, endianness: str) -> tuple: + # packed! + # unsigned long long: addr + # unsigned long: length + fmt = f'{endian_fmt(endianness)}QQ' + addr, length = struct.unpack(fmt, msg) + + return addr, length + +def unmkRegister(msg: bytes, endianness: str) -> tuple: + # packed! + # char[108]:regname | unsigned long:bytes | char[64]:value + fmt = f'{endian_fmt(endianness)}108sQ64s' + reg_name, size, val = struct.unpack(fmt, msg) + reg_name = reg_name.decode('utf-8').rstrip('\x00') + + if reg_name == "UNKNOWN": + raise RegisterAccessError(reg_name, + f'[QEMU Plugin] Unable to access register {reg_name}.') + + val = val[:size] + + val = int.from_bytes(val, endianness) + + return val, size + +class PluginProgramState(ProgramState): + from focaccia.arch import aarch64, x86 + + flag_register_names = { + aarch64.archname: 'cpsr', + x86.archname: 'eflags', + } + + flag_register_decompose = { + aarch64.archname: aarch64.decompose_cpsr, + x86.archname: x86.decompose_rflags, + } + + def _flush_caches(self): + for r in self.regs.keys(): + self.regs[r] = None + self.mem.drop_all() + + + def __init__(self, arch: Arch): + super().__init__(arch) + self.curr_pc = -1 + + def read_register(self, reg: str, no_cached: bool=False) -> int: + global CONN + + if reg == 'RFLAGS': + reg = 'EFLAGS' + + flags = self.flag_register_decompose[self.arch.archname](0).keys() + if reg in flags and self.arch.archname in self.flag_register_names: + reg_name = self.flag_register_names[self.arch.archname] + else: + reg_name = reg + + reg_name = reg_name.lower() + reg_name = self.arch.to_regname(reg_name) + reg_name = reg_name.lower() + + val = None + from_cache = False + if not no_cached and super().test_register(reg_name): + val = super().read_register(reg_name) + from_cache = True + else: + msg = mkCommand("read register", self.arch.endianness, reg=reg_name) + CONN.send(msg) + + try: + resp = CONN.recv(180) + except ConnectionResetError: + raise StopIteration + + if len(resp) < 180: + print(f'Invalid response length: {len(resp)}') + print(f'Response: {resp}') + return 0 + + + val, size = unmkRegister(resp, self.arch.endianness) + + # Try to access the flags register with `reg` as a logical flag name + if reg in flags and self.arch.archname in self.flag_register_names: + flags_reg = self.flag_register_names[self.arch.archname] + _flags = self.flag_register_decompose[self.arch.archname](val) + if reg in _flags: + if not from_cache: + self.set_register(reg, _flags[reg]) + return _flags[reg] + raise RegisterAccessError(f'Unable to access flag {reg}.') + + if not from_cache: + self.set_register(reg, val) + return val + + def read_memory(self, addr: int, size: int) -> bytes: + global CONN + + if self.mem.test(addr): + return super().read_memory(addr, size) + + # print(f'Reading memory at {addr:x}, size={size}') + + msg = mkCommand("read memory", self.arch.endianness, addr=addr, size=size) + CONN.send(msg) + + try: + resp = CONN.recv(16) + except ConnectionResetError: + raise StopIteration + _addr, length = unmkMemory(resp, self.arch.endianness) + + if _addr != addr or length == 0: + raise MemoryAccessError( + _addr, size, + f'Unable to access memory at address {addr:x}, size={size}.') + return b'' + + mem = b'' + while len(mem) < length: + try: + resp = CONN.recv(length - len(mem)) + except ConnectionResetError: + raise StopIteration + mem += resp + + self.write_memory(addr, mem) + return mem + + def step(self): + global CONN + + self._flush_caches() + msg = mkCommand("step", self.arch.endianness) + CONN.send(msg) + + + return + +class PluginStateIterator: + + def __init__(self, sock_path: str, arch: Arch): + global SOCK + global CONN + + self.sock_path = sock_path + self.arch = arch + self._first_next = True + + + # Start the server that waits for QEMU to connect + try: + os.unlink(self.sock_path) + except FileNotFoundError: + pass + # TODO: allow new connections when QEMU clones + SOCK = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + + print(f'Listening for QEMU Plugin connection at {self.sock_path}...') + SOCK.bind(self.sock_path) + SOCK.listen(1) + + CONN, qemu_addr = SOCK.accept() + + # Handshake with QEMU + pid_b = CONN.recv(4) + pid = struct.unpack('i', pid_b)[0] + print(f'Connected to QEMU instance with PID {pid}.') + + def __iter__(self): + return self + + def __next__(self): + # The first call to __next__ should yield the first program state, + # i.e. after stepping the first time + if self._first_next: + self._first_next = False + self.state = PluginProgramState(self.arch) + #self.state.step() + return self.state + + # Step + pc = self.state.read_register('pc') + new_pc = pc + while pc == new_pc: # Skip instruction chains from REP STOS etc. + self.state.step() + new_pc = self.state.read_register('pc', True) + + return self.state + +def record_minimal_snapshot(prev_state: ProgramState, + cur_state: PluginProgramState, + prev_transform: SymbolicTransform, + cur_transform: SymbolicTransform) \ + -> ProgramState: + """Record a minimal snapshot. + + A minimal snapshot must include values (registers and memory) that are + accessed by two transformations: + 1. The values produced by the previous transformation (the + transformation that is producing this snapshot) to check these + values against expected values calculated from the previous + program state. + 2. The values that act as inputs to the transformation acting on this + snapshot, to calculate the expected values of the next snapshot. + + :param prev_transform: The symbolic transformation generating, or + leading to, `cur_state`. Values generated by + this transformation are included in the + snapshot. + :param transform: The symbolic transformation operating on this + snapshot. Input values to this transformation are + included in the snapshot. + """ + assert(cur_state.read_register('pc') == cur_transform.addr) + assert(prev_transform.arch == cur_transform.arch) + + def get_written_addresses(t: SymbolicTransform): + """Get all output memory accesses of a symbolic transformation.""" + return [ExprMem(a, v.size) for a, v in t.changed_mem.items()] + + def set_values(regs: Iterable[str], mems: Iterable[ExprMem], + cur_state: PluginProgramState, + prev_state: PluginProgramState, + out_state: ProgramState): + """ + :param prev_state: Addresses of memory included in the snapshot are + resolved relative to this state. + """ + for regname in regs: + try: + regval = cur_state.read_register(regname) + out_state.set_register(regname, regval) + except RegisterAccessError: + pass + for mem in mems: + assert(mem.size % 8 == 0) + addr = eval_symbol(mem.ptr, prev_state) + try: + mem = cur_state.read_memory(addr, int(mem.size / 8)) + out_state.write_memory(addr, mem) + except MemoryAccessError: + pass + + state = ProgramState(cur_transform.arch) + state.set_register('pc', cur_transform.addr) + + set_values(prev_transform.changed_regs.keys(), + get_written_addresses(prev_transform), + cur_state, + prev_state, # Evaluate memory addresses based on previous + # state because they are that state's output + # addresses. + state) + set_values(cur_transform.get_used_registers(), + cur_transform.get_used_memory_addresses(), + cur_state, + cur_state, + state) + return state + +def collect_conc_trace(qemu: PluginStateIterator, \ + strace: list[SymbolicTransform]) \ + -> tuple[list[ProgramState], list[SymbolicTransform]]: + """Collect a trace of concrete states from QEMU. + + Records minimal concrete states from QEMU by using symbolic trace + information to determine which register/memory values are required to + verify the correctness of QEMU. + + May drop symbolic transformations if the symbolic trace and the QEMU trace + diverge (e.g. because of differences in environment, etc.). Returns the + new, possibly modified, symbolic trace that matches the returned concrete + trace. + + :return: A list of concrete states and a list of corresponding symbolic + transformations. The lists are guaranteed to have the same length. + """ + def find_index(seq, target, access=lambda el: el): + for i, el in enumerate(seq): + if access(el) == target: + return i + return None + + if not strace: + return [], [] + + states = [] + matched_transforms = [] + + state_iter = iter(qemu) + cur_state = next(state_iter) + symb_i = 0 + + # An online trace matching algorithm. + while True: + try: + pc = cur_state.read_register('pc') + + while pc != strace[symb_i].addr: + next_i = find_index(strace[symb_i+1:], pc, lambda t: t.addr) + + # Drop the concrete state if no address in the symbolic trace + # matches + if next_i is None: + print(f'Warning: Dropping concrete state {hex(pc)}, as no' + f' matching instruction can be found in the symbolic' + f' reference trace.') + cur_state = next(state_iter) + pc = cur_state.read_register('pc', True) + continue + + # Otherwise, jump to the next matching symbolic state + symb_i += next_i + 1 + + assert(cur_state.read_register('pc') == strace[symb_i].addr) + states.append(record_minimal_snapshot( + states[-1] if states else cur_state, + cur_state, + matched_transforms[-1] if matched_transforms else strace[symb_i], + strace[symb_i])) + matched_transforms.append(strace[symb_i]) + cur_state = next(state_iter) + symb_i += 1 + except StopIteration: + break + + return states, matched_transforms + + +def main(): + prog = make_argparser() + prog.add_argument('--use-socket', + required=True, + type=str, + help='Use QEMU Plugin interface at <socket> instead of GDB') + prog.add_argument('--guest_arch', + required=True, + type=str, + help='Architecture of the guest being emulated. (Only required when using --use-socket)') + + args = prog.parse_args() + + # Read pre-computed symbolic trace + with open(args.symb_trace, 'r') as strace: + symb_transforms = parser.parse_transformations(strace) + + arch = supported_architectures.get(args.guest_arch) + sock_path = args.use_socket + + qemu = PluginStateIterator(sock_path, arch) + # Use symbolic trace to collect concrete trace from QEMU + conc_states, matched_transforms = collect_conc_trace( + qemu, + symb_transforms.states) + + # Verify and print result + if not args.quiet: + res = compare_symbolic(conc_states, matched_transforms) + print_result(res, verbosity[args.error_level]) + + if args.output: + from focaccia.parser import serialize_snapshots + with open(args.output, 'w') as file: + serialize_snapshots(Trace(conc_states, env), file) + +if __name__ == "__main__": + main() |