diff options
| -rw-r--r-- | focaccia/arch/arch.py | 6 | ||||
| -rw-r--r-- | focaccia/arch/x86.py | 44 | ||||
| -rw-r--r-- | focaccia/miasm_util.py | 15 | ||||
| -rw-r--r-- | focaccia/snapshot.py | 6 | ||||
| -rw-r--r-- | focaccia/symbolic.py | 116 | ||||
| -rw-r--r-- | tools/_qemu_tool.py | 23 |
6 files changed, 151 insertions, 59 deletions
diff --git a/focaccia/arch/arch.py b/focaccia/arch/arch.py index f2be5cb..e64f1fa 100644 --- a/focaccia/arch/arch.py +++ b/focaccia/arch/arch.py @@ -1,9 +1,13 @@ from typing import Iterable class Arch(): - def __init__(self, archname: str, regnames: Iterable[str]): + def __init__(self, + archname: str, + regnames: Iterable[str], + ptr_size: int): self.archname = archname self.regnames = set(name.upper() for name in regnames) + self.ptr_size = ptr_size def to_regname(self, name: str) -> str | None: """Transform a string into a standard register name. diff --git a/focaccia/arch/x86.py b/focaccia/arch/x86.py index ebe74dd..f35783b 100644 --- a/focaccia/arch/x86.py +++ b/focaccia/arch/x86.py @@ -6,32 +6,36 @@ archname = 'x86_64' # Names of registers in the architecture regnames = [ + # 8-bit + 'AL', 'CL', 'DL', 'BL', 'SPL', 'BPL', 'SIL', 'DIL', + 'AH', 'CH', 'DH', 'BH', + + # 16-bit + 'IP', + 'AX', 'BX', 'CX', 'DX', 'SI', 'DI', 'BP', 'SP', + + # 32-bit + 'EIP', + 'EAX', 'EBX', 'ECX', 'EDX', 'ESI', 'EDI', 'EBP', 'ESP', + 'EFLAGS', + + # 64-bit 'RIP', - 'RAX', - 'RBX', - 'RCX', - 'RDX', - 'RSI', - 'RDI', - 'RBP', - 'RSP', - 'R8', - 'R9', - 'R10', - 'R11', - 'R12', - 'R13', - 'R14', - 'R15', + 'RAX', 'RBX', 'RCX', 'RDX', 'RSI', 'RDI', 'RBP', 'RSP', + 'R8', 'R9', 'R10', 'R11', 'R12', 'R13', 'R14', 'R15', 'RFLAGS', # x87 float registers 'ST0', 'ST1', 'ST2', 'ST3', 'ST4', 'ST5', 'ST6', 'ST7', # Vector registers - 'YMM0', 'YMM1', 'YMM2', 'YMM3', 'YMM4', - 'YMM5', 'YMM6', 'YMM7', 'YMM8', 'YMM9', - 'YMM10', 'YMM11', 'YMM12', 'YMM13', 'YMM14', 'YMM15', + 'MM0', 'MM1', 'MM2', 'MM3', 'MM4', 'MM5', 'MM6', 'MM7', + + 'XMM0', 'XMM1', 'XMM2', 'XMM3', 'XMM4', 'XMM5', 'XMM6', 'XMM7', + 'XMM8', 'XMM9', 'XMM10', 'XMM11', 'XMM12', 'XMM13', 'XMM14', 'XMM15', + + 'YMM0', 'YMM1', 'YMM2', 'YMM3', 'YMM4', 'YMM5', 'YMM6', 'YMM7', + 'YMM8', 'YMM9', 'YMM10', 'YMM11', 'YMM12', 'YMM13', 'YMM14', 'YMM15', # Segment registers 'CS', 'DS', 'SS', 'ES', 'FS', 'GS', @@ -122,7 +126,7 @@ def compose_rflags(rflags: dict[str, int]) -> int: class ArchX86(Arch): def __init__(self): - super().__init__(archname, regnames) + super().__init__(archname, regnames, 64) def to_regname(self, name: str) -> str | None: """The X86 override of the standard register name lookup. diff --git a/focaccia/miasm_util.py b/focaccia/miasm_util.py index 7ab9c87..0e0c7e8 100644 --- a/focaccia/miasm_util.py +++ b/focaccia/miasm_util.py @@ -174,16 +174,17 @@ def _eval_cpuid(rax: ExprInt, out_reg: ExprInt): def _eval_exprop(expr, state: MiasmSymbolResolver): """Evaluate an ExprOp using the current state""" - args = [] - for oarg in expr.args: - arg = eval_expr(oarg, state) - args.append(arg) + args = [eval_expr(arg, state) for arg in expr.args] + # Special case: CPUID instruction + # Evaluate the expression to a value obtained from an an actual call to + # the CPUID instruction. Can't do this in an expression simplifier plugin + # because the arguments must be concrete. if expr.op == 'x86_cpuid': - # Can't do this in an expression simplifier plugin because the - # arguments must be concrete. - assert(len(expr.args) == 2) + assert(len(args) == 2) + assert(isinstance(args[0], ExprInt) and isinstance(args[1], ExprInt)) return _eval_cpuid(args[0], args[1]) + return ExprOp(expr.op, *args) def _eval_exprcompose(expr, state: MiasmSymbolResolver): diff --git a/focaccia/snapshot.py b/focaccia/snapshot.py index 592c2bb..104ca0a 100644 --- a/focaccia/snapshot.py +++ b/focaccia/snapshot.py @@ -19,7 +19,7 @@ class SparseMemory: Note that out-of-bound reads are possible when performed on unwritten sections of existing pages and that there is no safeguard check for them. """ - def __init__(self, page_size=4096): + def __init__(self, page_size=256): self.page_size = page_size self._pages: dict[int, bytes] = {} @@ -164,5 +164,5 @@ class ProgramState(ReadableProgramState): self.mem.write(addr, data) def __repr__(self): - return f'Snapshot ({self.arch.archname}): ' \ - + repr({r: hex(v) for r, v in self.regs.items() if v is not None}) + regs = {r: hex(v) for r, v in self.regs.items() if v is not None} + return f'Snapshot ({self.arch.archname}): {regs}' diff --git a/focaccia/symbolic.py b/focaccia/symbolic.py index 66ccaef..fdc8ea1 100644 --- a/focaccia/symbolic.py +++ b/focaccia/symbolic.py @@ -1,14 +1,15 @@ """Tools and utilities for symbolic execution with Miasm.""" from __future__ import annotations +from typing import Iterable from miasm.analysis.binary import ContainerELF from miasm.analysis.machine import Machine -from miasm.core.asmblock import AsmCFG +from miasm.core.cpu import instruction as miasm_instr from miasm.core.locationdb import LocationDB -from miasm.ir.symbexec import SymbolicExecutionEngine -from miasm.ir.ir import IRBlock from miasm.expression.expression import Expr, ExprId, ExprMem, ExprInt +from miasm.ir.ir import IRBlock +from miasm.ir.symbexec import SymbolicExecutionEngine from .arch import Arch, supported_architectures from .lldb_target import LLDBConcreteTarget, \ @@ -51,10 +52,49 @@ def eval_symbol(symbol: Expr, conc_state: ReadableProgramState) -> int: # ConcreteStateWrapper return int(res) +class Instruction: + """An instruction.""" + def __init__(self, + instr: miasm_instr, + machine: Machine, + arch: Arch, + loc_db: LocationDB | None = None): + self.arch = arch + self.machine = machine + + if loc_db is not None: + instr.args = instr.resolve_args_with_symbols(loc_db) + self.instr: miasm_instr = instr + """The underlying Miasm instruction object.""" + + assert(instr.offset is not None) + assert(instr.l is not None) + self.addr: int = instr.offset + self.length: int = instr.l + + @staticmethod + def from_bytecode(asm: bytes, arch: Arch) -> Instruction: + """Disassemble an instruction.""" + machine = Machine(arch.archname) + _instr = machine.mn.dis(asm, arch.ptr_size) + return Instruction(_instr, machine, arch, None) + + def to_bytecode(self) -> bytes: + """Assemble the instruction to byte code.""" + return self.machine.mn.asm(self.instr)[0] + + def to_string(self) -> str: + """Convert the instruction to an Intel-syntax assembly string.""" + return str(self.instr) + + def __repr__(self): + return self.to_string() + class SymbolicTransform: """A symbolic transformation mapping one program state to another.""" def __init__(self, transform: dict[Expr, Expr], + instrs: list[Instruction], arch: Arch, from_addr: int, to_addr: int): @@ -65,6 +105,8 @@ class SymbolicTransform: represents the modifications to the program state performed by this instruction. """ + self.arch = arch + self.addr = from_addr """The instruction address of the program state on which the transformation operates. Equivalent to `self.range[0]`.""" @@ -88,6 +130,10 @@ class SymbolicTransform: written to address `addr`. Memory addresses may depend on other symbolic values, such as register content, and are therefore symbolic themselves.""" + + self.instructions: list[Instruction] = instrs + """The sequence of instructions that comprise this transformation.""" + for dst, expr in transform.items(): assert(isinstance(dst, ExprMem) or isinstance(dst, ExprId)) @@ -179,6 +225,7 @@ class SymbolicTransform: self.changed_regs = new_regs self.changed_mem = new_mem self.range = (self.range[0], other.range[1]) + self.instructions.extend(other.instructions) return self @@ -292,10 +339,15 @@ class SymbolicTransform: def __repr__(self) -> str: start, end = self.range res = f'Symbolic state transformation {hex(start)} -> {hex(end)}:\n' + res += ' [Symbols]\n' for reg, expr in self.changed_regs.items(): - res += f' {reg:6s} = {expr}\n' + res += f' {reg:6s} = {expr}\n' for addr, expr in self.changed_mem.items(): - res += f' {ExprMem(addr, expr.size)} = {expr}\n' + res += f' {ExprMem(addr, expr.size)} = {expr}\n' + res += ' [Instructions]\n' + for inst in self.instructions: + res += f' {inst}\n' + return res[:-1] # Remove trailing newline def parse_symbolic_transform(string: str) -> SymbolicTransform: @@ -305,21 +357,40 @@ def parse_symbolic_transform(string: str) -> SymbolicTransform: import json from miasm.expression.parser import str_to_expr as parse + def decode_inst(obj: Iterable[int], arch: Arch): + b = b''.join(i.to_bytes(1) for i in obj) + return Instruction.from_bytecode(b, arch) + data = json.loads(string) + arch = supported_architectures[data['arch']] + start_addr = int(data['from_addr']) + end_addr = int(data['to_addr']) - # We can use a None-arch because it's only used when the dict is not empty - t = SymbolicTransform({}, None, int(data['from_addr']), int(data['to_addr'])) + t = SymbolicTransform({}, [], arch, start_addr, end_addr) t.changed_regs = { name: parse(val) for name, val in data['regs'].items() } t.changed_mem = { parse(addr): parse(val) for addr, val in data['mem'].items() } + t.instructions = [decode_inst(b, arch) for b in data['instructions']] + + # Recover the instructions' address information + addr = t.addr + for inst in t.instructions: + inst.addr = addr + addr += inst.length return t def serialize_symbolic_transform(t: SymbolicTransform) -> str: """Serialize a symbolic transformation.""" import json + + def encode_inst(inst: Instruction): + return [int(b) for b in inst.to_bytecode()] + return json.dumps({ + 'arch': t.arch.archname, 'from_addr': t.range[0], 'to_addr': t.range[1], + 'instructions': [encode_inst(inst) for inst in t.instructions], 'regs': { name: repr(expr) for name, expr in t.changed_regs.items() }, 'mem': { repr(addr): repr(val) for addr, val in t.changed_mem.items() }, }) @@ -344,14 +415,18 @@ class DisassemblyContext: cont = ContainerELF.from_stream(bin_file, self.loc_db) self.machine = Machine(cont.arch) + if self.machine.name not in supported_architectures: + raise NotImplementedError(f'[ERROR] {self.machine.name} is not' + f' supported.') + self.arch = supported_architectures[self.machine.name] + self.entry_point = cont.entry_point # Create disassembly/lifting context self.lifter = self.machine.lifter(self.loc_db) self.mdis = self.machine.dis_engine(cont.bin_stream, loc_db=self.loc_db) self.mdis.follow_call = True - self.asmcfg = AsmCFG(self.loc_db) - self.ircfg = self.lifter.new_ircfg_from_asmcfg(self.asmcfg) + self.ircfg = self.lifter.new_ircfg() def get_irblock(self, addr: int) -> IRBlock | None: irblock = self.ircfg.get_block(addr) @@ -378,7 +453,7 @@ class DisassemblyContext: class DisassemblyError(Exception): def __init__(self, - partial_trace: list[tuple[int, SymbolicTransform]], + partial_trace: list[tuple[Instruction, dict]], faulty_pc: int, err_msg: str): self.partial_trace = partial_trace @@ -386,7 +461,7 @@ class DisassemblyError(Exception): self.err_msg = err_msg def _run_block(pc: int, conc_state: MiasmSymbolResolver, ctx: DisassemblyContext) \ - -> tuple[int | None, list[dict]]: + -> tuple[int | None, list[tuple[Instruction, dict]]]: """Run a basic block. Tries to run IR blocks until the end of an ASM block/basic block is @@ -425,7 +500,8 @@ def _run_block(pc: int, conc_state: MiasmSymbolResolver, ctx: DisassemblyContext # the block. _engine = SymbolicExecutionEngine(ctx.lifter) modified = _engine.eval_assignblk(assignblk) - symb_trace.append((assignblk.instr.offset, modified)) + instr = Instruction(assignblk.instr, ctx.machine, ctx.arch, ctx.loc_db) + symb_trace.append((instr, modified)) # Run a single instruction engine.eval_updt_assignblk(assignblk) @@ -498,13 +574,7 @@ def collect_symbolic_trace(binary: str, :param args: Arguments to the program. """ ctx = DisassemblyContext(binary) - - # Find corresponding architecture - mach_name = ctx.machine.name - if mach_name not in supported_architectures: - print(f'[ERROR] {mach_name} is not supported. Returning.') - return [] - arch = supported_architectures[mach_name] + arch = ctx.arch if start_addr is None: pc = ctx.entry_point @@ -518,7 +588,7 @@ def collect_symbolic_trace(binary: str, target.remove_breakpoint(pc) conc_state = _LLDBConcreteState(target, arch) - symb_trace = [] # The resulting list of symbolic transforms per instruction + symb_trace: list[tuple[Instruction, dict]] = [] # The resulting list of symbolic transforms per instruction # Run until no more states can be reached while pc is not None: @@ -549,7 +619,7 @@ def collect_symbolic_trace(binary: str, if target.is_exited(): break - symb_trace.append((err.faulty_pc, {})) # Generate empty transform + # Do not generate an entry in the symbolic trace at all pc = target.read_register('pc') continue @@ -579,8 +649,8 @@ def collect_symbolic_trace(binary: str, res = [] for (start, diff), (end, _) in zip(symb_trace[:-1], symb_trace[1:]): - res.append(SymbolicTransform(diff, arch, start, end)) + res.append(SymbolicTransform(diff, [start], arch, start.addr, end.addr)) start, diff = symb_trace[-1] - res.append(SymbolicTransform(diff, arch, start, start)) + res.append(SymbolicTransform(diff, [start], arch, start.addr, start.addr)) return res diff --git a/tools/_qemu_tool.py b/tools/_qemu_tool.py index ff7dc38..9659f0e 100644 --- a/tools/_qemu_tool.py +++ b/tools/_qemu_tool.py @@ -7,11 +7,10 @@ work to do. """ import gdb -import platform from typing import Iterable import focaccia.parser as parser -from focaccia.arch import supported_architectures, Arch +from focaccia.arch import supported_architectures from focaccia.compare import compare_symbolic from focaccia.snapshot import ProgramState, ReadableProgramState, \ RegisterAccessError, MemoryAccessError @@ -28,7 +27,21 @@ class GDBProgramState(ReadableProgramState): def read_register(self, reg: str) -> int: try: val = self._frame.read_register(reg.lower()) - return int(val) & 0xffffffffffffffff # force int to be unsigned + size = val.type.sizeof * 8 + + # For vector registers, we have to assemble Python's + # arbitrary-length integers from GDB's fixed-size integers + # ourselves: + if size > 64: # Value is a vector + num_longs = size // 64 + vals = val[f'v{num_longs}_int64'] + res = 0 + for i in range(num_longs): + val = int(vals[i].cast(gdb.lookup_type('unsigned long'))) + res += val << i * 64 + return res + # For non-vector values, just return the 64-bit value + return int(val.cast(gdb.lookup_type('unsigned long'))) except ValueError as err: from focaccia.arch import x86 rflags = int(self._frame.read_register('eflags')) @@ -186,10 +199,11 @@ def collect_conc_trace(gdb: GDBServerStateIterator, \ state_iter = iter(gdb) cur_state = next(state_iter) symb_i = 0 + + # An online trace matching algorithm. while True: try: pc = cur_state.read_register('pc') - assert(pc is not None) while pc != strace[symb_i].addr: next_i = find_index(strace[symb_i+1:], pc, lambda t: t.addr) @@ -202,7 +216,6 @@ def collect_conc_trace(gdb: GDBServerStateIterator, \ f' reference trace.') cur_state = next(state_iter) pc = cur_state.read_register('pc') - assert(pc is not None) continue # Otherwise, jump to the next matching symbolic state |