diff options
| -rw-r--r-- | src/focaccia/arch/aarch64.py | 4 | ||||
| -rw-r--r-- | src/focaccia/arch/arch.py | 4 | ||||
| -rw-r--r-- | src/focaccia/arch/x86.py | 7 | ||||
| -rw-r--r-- | src/focaccia/lldb_target.py | 11 | ||||
| -rw-r--r-- | src/focaccia/parser.py | 1 | ||||
| -rw-r--r-- | src/focaccia/symbolic.py | 224 |
6 files changed, 160 insertions, 91 deletions
diff --git a/src/focaccia/arch/aarch64.py b/src/focaccia/arch/aarch64.py index 0e7d98c..76f7bd4 100644 --- a/src/focaccia/arch/aarch64.py +++ b/src/focaccia/arch/aarch64.py @@ -179,3 +179,7 @@ class ArchAArch64(Arch): from . import aarch64_dczid as dczid return dczid.read return None + + def is_instr_syscall(self, instr: str) -> bool: + return instr.upper().startswith('SVC') + diff --git a/src/focaccia/arch/arch.py b/src/focaccia/arch/arch.py index a1b0671..c220a3b 100644 --- a/src/focaccia/arch/arch.py +++ b/src/focaccia/arch/arch.py @@ -103,8 +103,12 @@ class Arch(): """ return False + def is_instr_syscall(self, instr: str) -> bool: + return False + def __eq__(self, other): return self.archname == other.archname def __repr__(self) -> str: return self.archname + diff --git a/src/focaccia/arch/x86.py b/src/focaccia/arch/x86.py index 9ec5c51..a5d29f5 100644 --- a/src/focaccia/arch/x86.py +++ b/src/focaccia/arch/x86.py @@ -208,3 +208,10 @@ class ArchX86(Arch): return True return False + def is_instr_syscall(self, instr: str) -> bool: + if instr.upper().startswith("SYSCALL"): + return True + if instr.upper().startswith("INT"): + return True + return False + diff --git a/src/focaccia/lldb_target.py b/src/focaccia/lldb_target.py index f529f9c..283c5bf 100644 --- a/src/focaccia/lldb_target.py +++ b/src/focaccia/lldb_target.py @@ -111,7 +111,6 @@ class LLDBConcreteTarget: if state == lldb.eStateExited: raise RuntimeError('Tried to resume process execution, but the' ' process has already exited.') - assert(state == lldb.eStateStopped) self.process.Continue() def step(self): @@ -122,8 +121,12 @@ class LLDBConcreteTarget: def run_until(self, address: int) -> None: """Continue execution until the address is arrived, ignores other breakpoints""" bp = self.target.BreakpointCreateByAddress(address) - while self.read_register("pc") != address: + while True: self.run() + if self.is_exited(): + return + if self.read_register('pc') == address: + break self.target.BreakpointDelete(bp.GetID()) def record_snapshot(self) -> ProgramState: @@ -241,7 +244,7 @@ class LLDBConcreteTarget: f'[In LLDBConcreteTarget.write_register]: Unable to set' f' {regname} to value {hex(value)}!') - def read_memory(self, addr, size): + def read_memory(self, addr: int, size: int) -> bytes: """Read bytes from memory. :raise ConcreteMemoryError: If unable to read `size` bytes from `addr`. @@ -256,7 +259,7 @@ class LLDBConcreteTarget: else: return bytes(reversed(content)) - def write_memory(self, addr, value: bytes): + def write_memory(self, addr: int, value: bytes): """Write bytes to memory. :raise ConcreteMemoryError: If unable to write at `addr`. diff --git a/src/focaccia/parser.py b/src/focaccia/parser.py index e9e5e0c..c157a36 100644 --- a/src/focaccia/parser.py +++ b/src/focaccia/parser.py @@ -201,3 +201,4 @@ def parse_box64(stream: TextIO, arch: Arch) -> Trace[ProgramState]: states[-1].set_register(regname, int(value, 16)) return Trace(states, _make_unknown_env()) + diff --git a/src/focaccia/symbolic.py b/src/focaccia/symbolic.py index b4b29f6..270de55 100644 --- a/src/focaccia/symbolic.py +++ b/src/focaccia/symbolic.py @@ -1,4 +1,4 @@ -"""Tools and utilities for symbolic execution with Miasm.""" +"""Tools and utilities for execution with Miasm.""" from __future__ import annotations import logging @@ -429,7 +429,7 @@ class SymbolicTransform: def __repr__(self) -> str: start, end = self.range - res = f'Symbolic state transformation {hex(start)} -> {hex(end)}:\n' + res = f'Symbolic state transformation {start} -> {end}:\n' res += ' [Symbols]\n' for reg, expr in self.changed_regs.items(): res += f' {reg:6s} = {expr}\n' @@ -452,8 +452,8 @@ class MemoryBinstream: def __getitem__(self, key: int | slice): if isinstance(key, slice): - return self._state.read_memory(key.start, key.stop - key.start) - return self._state.read_memory(key, 1) + return self._state.read_instructions(key.start, key.stop - key.start) + return self._state.read_instructions(key, 1) class DisassemblyContext: def __init__(self, target: ReadableProgramState): @@ -597,31 +597,77 @@ def run_instruction(instr: miasm_instr, return new_pc, modified -class _LLDBConcreteState(ReadableProgramState): - """A wrapper around `LLDBConcreteTarget` that provides access via a - `ReadableProgramState` interface. Reads values directly from an LLDB - target. This saves us the trouble of recording a full program state, and - allows us instead to read values from LLDB on demand. - """ +class SpeculativeTracer(ReadableProgramState): def __init__(self, target: LLDBConcreteTarget): super().__init__(target.arch) - self._target = target + self.target = target + self.pc = target.read_register('pc') + self.speculative_pc: int | None = None + self.speculative_count: int = 0 + + def speculate(self, new_pc: int): + self.speculative_pc = new_pc + self.speculative_count += 1 + + def progress_execution(self) -> None: + if self.speculative_pc is not None and self.speculative_count != 0: + debug(f'Updating PC to {hex(self.speculative_pc)}') + if self.speculative_count == 1: + self.target.step() + else: + self.target.run_until(self.speculative_pc) + + self.pc = self.speculative_pc + self.speculative_pc = None + self.speculative_count = 0 + + def run_until(self, addr: int): + if self.speculative_pc: + raise Exception('Attempting manual execution with speculative execution enabled') + self.target.run_until(addr) + self.pc = addr + + def step(self): + self.progress_execution() + if self.target.is_exited(): + return + self.target.step() + self.pc = self.target.read_register('pc') + + def read_pc(self) -> int: + if self.speculative_pc is not None: + return self.speculative_pc + return self.pc + + def read_flags(self) -> dict[str, int | bool]: + self.progress_execution() + return self.target.read_flags() def read_register(self, reg: str) -> int: regname = self.arch.to_regname(reg) if regname is None: raise RegisterAccessError(reg, f'Not a register name: {reg}') - try: - return self._target.read_register(regname) - except ConcreteRegisterError: - raise RegisterAccessError(regname, '') + self.progress_execution() + return self.target.read_register(regname) + + def write_register(self, regname: str, value: int): + self.progress_execution() + self.target.write_register(regname, value) + + def read_instructions(self, addr: int, size: int) -> bytes: + return self.target.read_memory(addr, size) def read_memory(self, addr: int, size: int) -> bytes: - try: - return self._target.read_memory(addr, size) - except ConcreteMemoryError: - raise MemoryAccessError(addr, size, 'Unable to read memory from LLDB.') + self.progress_execution() + return self.target.read_memory(addr, size) + + def write_memory(self, addr: int, value: bytes): + self.progress_execution() + self.target.write_memory(addr, value) + + def __getattr__(self, name: str): + return getattr(self.target, name) class SymbolicTracer: """A symbolic tracer that uses `LLDBConcreteTarget` with Miasm to simultaneously execute a @@ -636,6 +682,7 @@ class SymbolicTracer: self.force = force self.remote = remote self.cross_validate = cross_validate + self.target = SpeculativeTracer(self.create_debug_target()) def create_debug_target(self) -> LLDBConcreteTarget: binary = self.env.binary_name @@ -659,40 +706,50 @@ class SymbolicTracer: return target - def step_to_next(self, target, instruction, transform, lldb_state) -> bool: - if self.cross_validate: - debug(f'Evaluating register and memory transforms for {instruction} to cross-validate') - predicted_regs = transform.eval_register_transforms(lldb_state) - predicted_mems = transform.eval_memory_transforms(lldb_state) - - # Step forward - target.step() - if target.is_exited(): - return False - + def predict_next_state(self, instruction: Instruction, transform: SymbolicTransform): + debug(f'Evaluating register and memory transforms for {instruction} to cross-validate') + predicted_regs = transform.eval_register_transforms(self.target) + predicted_mems = transform.eval_memory_transforms(self.target) + return predicted_regs, predicted_mems + + def validate(self, + instruction: Instruction, + transform: SymbolicTransform, + predicted_regs: dict[str, int], + predicted_mems: dict[int, bytes]): # Verify last generated transform by comparing concrete state against # predicted values. - if self.cross_validate: - debug('Cross-validating symbolic transforms by comparing actual to predicted values') - for reg, val in predicted_regs.items(): - conc_val = lldb_state.read_register(reg) - if conc_val != val: - warn(f'Symbolic execution backend generated false equation for' - f' [{hex(instruction.addr)}]: {instruction}:' - f' Predicted {reg} = {hex(val)}, but the' - f' concrete state has value {reg} = {hex(conc_val)}.' - f'\nFaulty transformation: {transform}') - for addr, data in predicted_mems.items(): - conc_data = lldb_state.read_memory(addr, len(data)) - if conc_data != data: - warn(f'Symbolic execution backend generated false equation for' - f' [{hex(instruction.addr)}]: {instruction}: Predicted' - f' mem[{hex(addr)}:{hex(addr+len(data))}] = {data},' - f' but the concrete state has value' - f' mem[{hex(addr)}:{hex(addr+len(data))}] = {conc_data}.' - f'\nFaulty transformation: {transform}') - raise Exception() - return True + if self.target.is_exited(): + return + + debug('Cross-validating symbolic transforms by comparing actual to predicted values') + for reg, val in predicted_regs.items(): + conc_val = self.target.read_register(reg) + if conc_val != val: + warn(f'Symbolic execution backend generated false equation for' + f' [{hex(instruction.addr)}]: {instruction}:' + f' Predicted {reg} = {hex(val)}, but the' + f' concrete state has value {reg} = {hex(conc_val)}.' + f'\nFaulty transformation: {transform}') + raise Exception() + for addr, data in predicted_mems.items(): + conc_data = self.target.read_memory(addr, len(data)) + if conc_data != data: + warn(f'Symbolic execution backend generated false equation for' + f' [{hex(instruction.addr)}]: {instruction}: Predicted' + f' mem[{hex(addr)}:{hex(addr+len(data))}] = {data},' + f' but the concrete state has value' + f' mem[{hex(addr)}:{hex(addr+len(data))}] = {conc_data}.' + f'\nFaulty transformation: {transform}') + raise Exception() + + def progress(self, new_pc: int, instruction: Instruction) -> int | None: + self.target.speculate(new_pc) + if self.target.arch.is_instr_syscall(str(instruction)): + self.target.progress_execution() + if self.target.is_exited(): + return None + return self.target.read_pc() def trace(self, start_addr: int | None = None) -> Trace[SymbolicTransform]: """Execute a program and compute state transformations between executed @@ -701,21 +758,20 @@ class SymbolicTracer: :param start_addr: Address from which to start tracing. """ # Set up concrete reference state - target = self.create_debug_target() if start_addr is not None: - target.run_until(start_addr) - lldb_state = _LLDBConcreteState(target) + self.target.run_until(start_addr) - ctx = DisassemblyContext(lldb_state) + ctx = DisassemblyContext(self.target) arch = ctx.arch # Trace concolically strace: list[SymbolicTransform] = [] - while not target.is_exited(): - pc = target.read_register('pc') + while not self.target.is_exited(): + pc = self.target.read_pc() + assert(pc != 0) # Disassemble instruction at the current PC - tid = target.get_current_tid() + tid = self.target.get_current_tid() try: instruction = ctx.disassemble(pc) info(f'[{tid}] Disassembled instruction {instruction} at {hex(pc)}') @@ -724,9 +780,9 @@ class SymbolicTracer: # Try to recovery by using the LLDB disassembly instead try: - alt_disas = target.get_disassembly(pc) + alt_disas = self.target.get_disassembly(pc) instruction = Instruction.from_string(alt_disas, ctx.arch, pc, - target.get_instruction_size(pc)) + self.target.get_instruction_size(pc)) info(f'[{tid}] Disassembled instruction {instruction} at {hex(pc)}') except: if self.force: @@ -736,38 +792,32 @@ class SymbolicTracer: else: warn(f'[{tid}] Unable to disassemble instruction {hex(pc)}: {err}.' f' Skipping.') - target.step() + self.target.step() continue raise # forward exception # Run instruction - conc_state = MiasmSymbolResolver(lldb_state, ctx.loc_db) - - try: - new_pc, modified = run_instruction(instruction.instr, conc_state, ctx.lifter) - except: - if not self.force: - raise - new_pc, modified = None, {} - - # Create symbolic transform - if new_pc is None: - new_pc = pc + instruction.length + conc_state = MiasmSymbolResolver(self.target, ctx.loc_db) + + # TODO: handle None + new_pc, modified = run_instruction(instruction.instr, conc_state, ctx.lifter) + new_pc = int(new_pc) + + if self.cross_validate: + # Predict next concrete state. + # We verify the symbolic execution backend on the fly for some + # additional protection from bugs in the backend. + transform = SymbolicTransform(modified, [instruction], arch, pc, new_pc) + pred_regs, pred_mems = self.predict_next_state(instruction, transform) + self.progress(new_pc, instruction) + self.validate(instruction, transform, pred_regs, pred_mems) else: - new_pc = int(new_pc) - transform = SymbolicTransform(modified, [instruction], arch, pc, new_pc) + new_pc = self.progress(new_pc, instruction) + if new_pc is None: + continue # we're done + transform = SymbolicTransform(modified, [instruction], arch, pc, new_pc) + strace.append(transform) - if len(strace) == 0: - msg = f'Unable to collect trace for instruction {instruction}' - if not self.force: - raise Exception(msg) - else: - warn(msg) - - # Predict next concrete state. - # We verify the symbolic execution backend on the fly for some - # additional protection from bugs in the backend. - if not self.step_to_next(target, instruction.instr, transform, lldb_state): - return Trace(strace, self.env) + return Trace(strace, self.env) |