diff options
| -rw-r--r-- | focaccia/miasm_util.py | 37 | ||||
| -rw-r--r-- | focaccia/symbolic.py | 276 | ||||
| -rw-r--r-- | focaccia/utils.py | 17 |
3 files changed, 139 insertions, 191 deletions
diff --git a/focaccia/miasm_util.py b/focaccia/miasm_util.py index 0e0c7e8..4299f87 100644 --- a/focaccia/miasm_util.py +++ b/focaccia/miasm_util.py @@ -26,9 +26,35 @@ def simp_segm(expr_simp, expr: ExprOp): return expr_simp(base_regs[segm] + addr) return expr +def simp_fadd(expr_simp, expr: ExprOp): + from .utils import float_bits_to_uint, uint_bits_to_float, \ + double_bits_to_uint, uint_bits_to_double + + if expr.op != 'fadd': + return expr + + assert(len(expr.args) == 2) + lhs, rhs = expr.args + if lhs.is_int() and rhs.is_int(): + assert(lhs.size == rhs.size) + if lhs.size == 32: + uint_to_float = uint_bits_to_float + float_to_uint = float_bits_to_uint + elif lhs.size == 64: + uint_to_float = uint_bits_to_double + float_to_uint = double_bits_to_uint + else: + raise NotImplementedError('fadd on values of size not in {32, 64}') + + res = float_to_uint(uint_to_float(lhs.arg) + uint_to_float(rhs.arg)) + return expr_simp(ExprInt(res, expr.size)) + return expr + # The expression simplifier used in this module expr_simp = expr_simp_explicit -expr_simp.enable_passes({ExprOp: [simp_segm]}) +expr_simp.enable_passes({ + ExprOp: [simp_segm, simp_fadd], +}) class MiasmSymbolResolver: """Resolves atomic symbols to some state.""" @@ -126,23 +152,18 @@ def _eval_exprmem(expr: ExprMem, state: MiasmSymbolResolver): """Evaluate an ExprMem using the current state. This function first evaluates the memory pointer value. """ - # TODO: Implement cases with more than 64 bytes. - # - # The symbolic memory class used in SymbolicExecutionEngine may return - # ExprCompose objects here. Maybe I should use that. - assert(expr.size <= 64) assert(expr.size % 8 == 0) addr = eval_expr(expr.ptr, state) if not addr.is_int(): return expr - mem = state.resolve_memory(int(addr), int(expr.size / 8)) + mem = state.resolve_memory(int(addr), expr.size // 8) if mem is None: return expr assert(len(mem) * 8 == expr.size) - return ExprInt(int.from_bytes(mem), expr.size) + return ExprInt(int.from_bytes(mem, byteorder='big'), expr.size) def _eval_exprcond(expr, state: MiasmSymbolResolver): """Evaluate an ExprCond using the current state""" diff --git a/focaccia/symbolic.py b/focaccia/symbolic.py index fdc8ea1..029e979 100644 --- a/focaccia/symbolic.py +++ b/focaccia/symbolic.py @@ -2,13 +2,15 @@ from __future__ import annotations from typing import Iterable +import logging +import sys from miasm.analysis.binary import ContainerELF from miasm.analysis.machine import Machine from miasm.core.cpu import instruction as miasm_instr from miasm.core.locationdb import LocationDB from miasm.expression.expression import Expr, ExprId, ExprMem, ExprInt -from miasm.ir.ir import IRBlock +from miasm.ir.ir import Lifter from miasm.ir.symbexec import SymbolicExecutionEngine from .arch import Arch, supported_architectures @@ -19,6 +21,8 @@ from .miasm_util import MiasmSymbolResolver, eval_expr from .snapshot import ProgramState, ReadableProgramState, \ RegisterAccessError, MemoryAccessError +warn = logging.warning + def eval_symbol(symbol: Expr, conc_state: ReadableProgramState) -> int: """Evaluate a symbol based on a concrete reference state. @@ -76,11 +80,13 @@ class Instruction: def from_bytecode(asm: bytes, arch: Arch) -> Instruction: """Disassemble an instruction.""" machine = Machine(arch.archname) + assert(machine.mn is not None) _instr = machine.mn.dis(asm, arch.ptr_size) return Instruction(_instr, machine, arch, None) def to_bytecode(self) -> bytes: """Assemble the instruction to byte code.""" + assert(self.machine.mn is not None) return self.machine.mn.asm(self.instr)[0] def to_string(self) -> str: @@ -333,7 +339,7 @@ class SymbolicTransform: for addr, expr in self.changed_mem.items(): addr = eval_symbol(addr, conc_state) length = int(expr.size / 8) - res[addr] = eval_symbol(expr, conc_state).to_bytes(length) + res[addr] = eval_symbol(expr, conc_state).to_bytes(length, byteorder='big') return res def __repr__(self) -> str: @@ -358,7 +364,7 @@ def parse_symbolic_transform(string: str) -> SymbolicTransform: from miasm.expression.parser import str_to_expr as parse def decode_inst(obj: Iterable[int], arch: Arch): - b = b''.join(i.to_bytes(1) for i in obj) + b = b''.join(i.to_bytes(1, byteorder='big') for i in obj) return Instruction.from_bytecode(b, arch) data = json.loads(string) @@ -395,17 +401,6 @@ def serialize_symbolic_transform(t: SymbolicTransform) -> str: 'mem': { repr(addr): repr(val) for addr, val in t.changed_mem.items() }, }) -def _step_until(target: LLDBConcreteTarget, addr: int) -> list[int]: - """Step a concrete target to a specific instruction. - :return: Trace of all instructions executed. - """ - trace = [target.read_register('pc')] - target.step() - while not target.is_exited() and target.read_register('pc') != addr: - trace.append(target.read_register('pc')) - target.step() - return trace - class DisassemblyContext: def __init__(self, binary): self.loc_db = LocationDB() @@ -413,122 +408,78 @@ class DisassemblyContext: # Load the binary with open(binary, 'rb') as bin_file: cont = ContainerELF.from_stream(bin_file, self.loc_db) + self.entry_point = cont.entry_point + # Determine the binary's architecture self.machine = Machine(cont.arch) if self.machine.name not in supported_architectures: raise NotImplementedError(f'[ERROR] {self.machine.name} is not' f' supported.') self.arch = supported_architectures[self.machine.name] - - self.entry_point = cont.entry_point + """Focaccia's description of an instruction set architecture.""" # Create disassembly/lifting context - self.lifter = self.machine.lifter(self.loc_db) self.mdis = self.machine.dis_engine(cont.bin_stream, loc_db=self.loc_db) self.mdis.follow_call = True - self.ircfg = self.lifter.new_ircfg() - - def get_irblock(self, addr: int) -> IRBlock | None: - irblock = self.ircfg.get_block(addr) + self.lifter = self.machine.lifter(self.loc_db) - # Initial disassembly might not find all blocks in the binary. - # Disassemble code ad-hoc if the current address has not yet been - # disassembled. - if irblock is None: - cfg = self.mdis.dis_multiblock(addr) - for asmblock in cfg.blocks: - try: - self.lifter.add_asmblock_to_ircfg(asmblock, self.ircfg) - except NotImplementedError as err: - print(f'[WARNING] Unable to disassemble block at' - f' {hex(asmblock.get_range()[0])}:' - f' [Not implemented] {err}') - pass - print(f'Disassembled {len(cfg.blocks):5} new blocks at {hex(int(addr))}.') - irblock = self.ircfg.get_block(addr) - - # Might still be None if disassembly/lifting failed for the block - # at `addr`. - return irblock - -class DisassemblyError(Exception): - def __init__(self, - partial_trace: list[tuple[Instruction, dict]], - faulty_pc: int, - err_msg: str): - self.partial_trace = partial_trace - self.faulty_pc = faulty_pc - self.err_msg = err_msg - -def _run_block(pc: int, conc_state: MiasmSymbolResolver, ctx: DisassemblyContext) \ - -> tuple[int | None, list[tuple[Instruction, dict]]]: +def run_instruction(instr: miasm_instr, + conc_state: MiasmSymbolResolver, + lifter: Lifter) \ + -> tuple[ExprInt | None, dict[Expr, Expr]]: """Run a basic block. Tries to run IR blocks until the end of an ASM block/basic block is reached. Skips 'virtual' blocks that purely exist in the IR. - :param pc: A program counter at which we start executing. - :param conc_state: A concrete reference state at `pc`. Used to resolve - symbolic program counters, i.e. to 'guide' the symbolic - execution on the correct path. This is the concrete part - of our concolic execution. - - :return: The next program counter. None if no next program counter can be - found. This happens when an error occurs or when the program - exits. + :param instr: The instruction to run. + :param conc_state: A concrete reference state at `pc = instr.offset`. Used + to resolve symbolic program counters, i.e. to 'guide' the + symbolic execution on the correct path. This is the + concrete part of our concolic execution. + :param lifter: A lifter of the appropriate architecture. Get this from a + `DisassemblyContext` or a `Machine`. + + :return: The next program counter and a symbolic state. None if no next + program counter can be found. This happens when an error occurs or + when the program exits. The state represents the transformation + which the instruction applies to a program state. """ - # Start with a clean, purely symbolic state - engine = SymbolicExecutionEngine(ctx.lifter) - - # A list of symbolic transformation for each single instruction - symb_trace = [] - - while True: - irblock = ctx.get_irblock(pc) + def execute_location(loc, base_state: dict | None) -> tuple[ExprInt | None, dict]: + # Query the location's IR block + irblock = ircfg.get_block(loc) if irblock is None: - raise DisassemblyError( - symb_trace, - pc, - f'[ERROR] Unable to disassemble block at {hex(pc)}.' - ) - - # Execute each instruction in the current basic block and record the - # resulting change in program state. - for assignblk in irblock: - # A clean engine for the single-instruction diff, otherwise - # it concatenates the current instruction to the previous ones in - # the block. - _engine = SymbolicExecutionEngine(ctx.lifter) - modified = _engine.eval_assignblk(assignblk) - instr = Instruction(assignblk.instr, ctx.machine, ctx.arch, ctx.loc_db) - symb_trace.append((instr, modified)) - - # Run a single instruction - engine.eval_updt_assignblk(assignblk) - - # Obtain the next program counter after the basic block. - symbolic_pc = engine.eval_expr(engine.lifter.IRDst) - - # The new program counter might be a symbolic value. Try to evaluate - # it based on the last recorded concrete state at the start of the - # current basic block. - pc = eval_expr(symbolic_pc, conc_state) - - # If the resulting PC is an integer, i.e. a concrete address that can - # be mapped to the assembly code, we return as we have reached the end - # of a basic block. Otherwise we might have reached the end of an IR - # block, in which case we keep executing until we reach the end of an - # ASM block. - # - # Example: This happens for the REP STOS instruction, for which Miasm - # generates multiple IR blocks. - try: - return int(pc), symb_trace - except: - # We reach this point when the program counter is an IR block - # location (not an integer). That happens when single ASM - # instructions are translated to multiple IR instructions. - pass + # Weirdly, this never seems to happen. I don't know why, though. + return None, base_state if base_state is not None else {} + + # Apply IR block to the current state + engine = SymbolicExecutionEngine(lifter, state=base_state) + new_pc = engine.eval_updt_irblock(irblock) + modified = dict(engine.modified()) + + # Resolve the symbolic PC to a concrete value based on the + # concrete program state before this instruction + new_pc = eval_expr(new_pc, conc_state) + if new_pc.is_loc(): + # Run chained IR blocks until a real program counter is reached + new_pc, modified = execute_location(new_pc, modified) + + assert(new_pc is not None and isinstance(new_pc, ExprInt)) + return new_pc, modified + + # Lift instruction to IR + ircfg = lifter.new_ircfg() + try: + loc = lifter.add_instr_to_ircfg(instr, ircfg, None, False) + except NotImplementedError as err: + warn(f'[WARNING] Unable to lift instruction {instr}: {err}. Skipping.') + return None, {} # Create an empty transform for the instruction + + # Execute instruction symbolically + new_pc, modified = execute_location(loc, None) + modified[lifter.pc] = new_pc # Add PC update to state + + return new_pc, modified class _LLDBConcreteState(ReadableProgramState): """A wrapper around `LLDBConcreteTarget` that provides access via a @@ -576,81 +527,40 @@ def collect_symbolic_trace(binary: str, ctx = DisassemblyContext(binary) arch = ctx.arch - if start_addr is None: - pc = ctx.entry_point - else: - pc = start_addr - + # Set up concrete reference state target = LLDBConcreteTarget(binary, args) - if target.read_register('pc') != pc: - target.set_breakpoint(pc) - target.run() - target.remove_breakpoint(pc) - conc_state = _LLDBConcreteState(target, arch) - - symb_trace: list[tuple[Instruction, dict]] = [] # The resulting list of symbolic transforms per instruction + if start_addr is not None: + target.run_until(start_addr) + lldb_state = _LLDBConcreteState(target, arch) - # Run until no more states can be reached - while pc is not None: - assert(target.read_register('pc') == pc) + # Trace concolically + strace: list[SymbolicTransform] = [] + while not target.is_exited(): + pc = target.read_register('pc') - # Run symbolic execution - # It uses the concrete state to resolve symbolic program counters to - # concrete values. + # Disassemble instruction at the current PC try: - pc, strace = _run_block( - pc, - MiasmSymbolResolver(conc_state, ctx.loc_db), - ctx) - except DisassemblyError as err: - # This happens if we encounter an instruction that is not - # implemented by Miasm. Try to skip that instruction and continue - # at the next one. - print(f'[WARNING] Skipping instruction at {hex(err.faulty_pc)}...') - - # First, catch up to symbolic trace if required - if err.faulty_pc != pc: - ctrace = _step_until(target, err.faulty_pc) - symb_trace.extend(err.partial_trace) - assert(len(ctrace) - 1 == len(err.partial_trace)) # no ghost instr - - # Now step one more time to skip the faulty instruction + instr = ctx.mdis.dis_instr(pc) + except: + err = sys.exc_info()[1] + warn(f'[WARNING] Unable to disassemble instruction at {hex(pc)}:' + f' {err}. Skipping.') target.step() - if target.is_exited(): - break - - # Do not generate an entry in the symbolic trace at all - pc = target.read_register('pc') continue - if pc is None: - break - - # Step concrete target forward. - # - # The concrete target now lags behind the symbolic execution by exactly - # one basic block: the one that we just executed. Run the concrete - # execution until it reaches the new PC. - ctrace = _step_until(target, pc) - - # Sometimes, miasm generates ghost instructions at the end of basic - # blocks. Don't include them in the symbolic trace. - strace = strace[:len(ctrace)] - symb_trace.extend(strace) - - # Use this for extensive trace debugging - #if [a for a, _ in strace] != ctrace: - # print(f'[WARNING] Symbolic trace and concrete trace are not equal!' - # f'\n symbolic: {[hex(a) for a, _ in strace]}' - # f'\n concrete: {[hex(a) for a in ctrace]}') - - if target.is_exited(): - break - - res = [] - for (start, diff), (end, _) in zip(symb_trace[:-1], symb_trace[1:]): - res.append(SymbolicTransform(diff, [start], arch, start.addr, end.addr)) - start, diff = symb_trace[-1] - res.append(SymbolicTransform(diff, [start], arch, start.addr, start.addr)) - - return res + # Run instruction + conc_state = MiasmSymbolResolver(lldb_state, ctx.loc_db) + new_pc, modified = run_instruction(instr, conc_state, ctx.lifter) + + # Create symbolic transform + instruction = Instruction(instr, ctx.machine, ctx.arch, ctx.loc_db) + if new_pc is None: + new_pc = pc + instruction.length + else: + new_pc = int(new_pc) + strace.append(SymbolicTransform(modified, [instruction], arch, pc, new_pc)) + + # Step forward + target.step() + + return strace diff --git a/focaccia/utils.py b/focaccia/utils.py index 7173169..50251a1 100644 --- a/focaccia/utils.py +++ b/focaccia/utils.py @@ -1,8 +1,25 @@ +import ctypes import sys import shutil from .compare import ErrorSeverity +def float_bits_to_uint(v: float) -> int: + """Bit-cast a float to a 32-bit integer.""" + return ctypes.c_uint32.from_buffer(ctypes.c_float(v)).value + +def uint_bits_to_float(v: int) -> float: + """Bit-cast a 32-bit integer to a float.""" + return ctypes.c_float.from_buffer(ctypes.c_uint32(v)).value + +def double_bits_to_uint(v: float) -> int: + """Bit-cast a double to a 64-bit integer.""" + return ctypes.c_uint64.from_buffer(ctypes.c_double(v)).value + +def uint_bits_to_double(v: int) -> float: + """Bit-cast a 64-bit integer to a double.""" + return ctypes.c_double.from_buffer(ctypes.c_uint64(v)).value + def print_separator(separator: str = '-', stream=sys.stdout, count: int = 80): maxtermsize = count termsize = shutil.get_terminal_size((80, 20)).columns |