about summary refs log tree commit diff stats
diff options
context:
space:
mode:
authorTheofilos Augoustis <theofilos.augoustis@gmail.com>2024-02-14 15:37:37 +0100
committerTheofilos Augoustis <theofilos.augoustis@gmail.com>2024-02-14 15:37:37 +0100
commit403d5e60ad6f438d9e844819258fbbd2724fab57 (patch)
treeda8c79c84eea81a107c9571f1a650bc35dc36709
parentf66f6edbd7d8918b497ccc17b78d4a77425fcb66 (diff)
downloadfocaccia-403d5e60ad6f438d9e844819258fbbd2724fab57.tar.gz
focaccia-403d5e60ad6f438d9e844819258fbbd2724fab57.zip
Rewrite symbolic tracing algorithm
Co-authored-by: Theofilos Augoustis <theofilos.augoustis@gmail.com>
Co-authored-by: Nicola Crivellin <nicola.crivellin98@gmail.com>
-rw-r--r--focaccia/miasm_util.py37
-rw-r--r--focaccia/symbolic.py276
-rw-r--r--focaccia/utils.py17
3 files changed, 139 insertions, 191 deletions
diff --git a/focaccia/miasm_util.py b/focaccia/miasm_util.py
index 0e0c7e8..4299f87 100644
--- a/focaccia/miasm_util.py
+++ b/focaccia/miasm_util.py
@@ -26,9 +26,35 @@ def simp_segm(expr_simp, expr: ExprOp):
         return expr_simp(base_regs[segm] + addr)
     return expr
 
+def simp_fadd(expr_simp, expr: ExprOp):
+    from .utils import float_bits_to_uint, uint_bits_to_float, \
+                       double_bits_to_uint, uint_bits_to_double
+
+    if expr.op != 'fadd':
+        return expr
+
+    assert(len(expr.args) == 2)
+    lhs, rhs = expr.args
+    if lhs.is_int() and rhs.is_int():
+        assert(lhs.size == rhs.size)
+        if lhs.size == 32:
+            uint_to_float = uint_bits_to_float
+            float_to_uint = float_bits_to_uint
+        elif lhs.size == 64:
+            uint_to_float = uint_bits_to_double
+            float_to_uint = double_bits_to_uint
+        else:
+            raise NotImplementedError('fadd on values of size not in {32, 64}')
+
+        res = float_to_uint(uint_to_float(lhs.arg) + uint_to_float(rhs.arg))
+        return expr_simp(ExprInt(res, expr.size))
+    return expr
+
 # The expression simplifier used in this module
 expr_simp = expr_simp_explicit
-expr_simp.enable_passes({ExprOp: [simp_segm]})
+expr_simp.enable_passes({
+    ExprOp: [simp_segm, simp_fadd],
+})
 
 class MiasmSymbolResolver:
     """Resolves atomic symbols to some state."""
@@ -126,23 +152,18 @@ def _eval_exprmem(expr: ExprMem, state: MiasmSymbolResolver):
     """Evaluate an ExprMem using the current state.
     This function first evaluates the memory pointer value.
     """
-    # TODO: Implement cases with more than 64 bytes.
-    #
-    # The symbolic memory class used in SymbolicExecutionEngine may return
-    # ExprCompose objects here. Maybe I should use that.
-    assert(expr.size <= 64)
     assert(expr.size % 8 == 0)
 
     addr = eval_expr(expr.ptr, state)
     if not addr.is_int():
         return expr
 
-    mem = state.resolve_memory(int(addr), int(expr.size / 8))
+    mem = state.resolve_memory(int(addr), expr.size // 8)
     if mem is None:
         return expr
 
     assert(len(mem) * 8 == expr.size)
-    return ExprInt(int.from_bytes(mem), expr.size)
+    return ExprInt(int.from_bytes(mem, byteorder='big'), expr.size)
 
 def _eval_exprcond(expr, state: MiasmSymbolResolver):
     """Evaluate an ExprCond using the current state"""
diff --git a/focaccia/symbolic.py b/focaccia/symbolic.py
index fdc8ea1..029e979 100644
--- a/focaccia/symbolic.py
+++ b/focaccia/symbolic.py
@@ -2,13 +2,15 @@
 
 from __future__ import annotations
 from typing import Iterable
+import logging
+import sys
 
 from miasm.analysis.binary import ContainerELF
 from miasm.analysis.machine import Machine
 from miasm.core.cpu import instruction as miasm_instr
 from miasm.core.locationdb import LocationDB
 from miasm.expression.expression import Expr, ExprId, ExprMem, ExprInt
-from miasm.ir.ir import IRBlock
+from miasm.ir.ir import Lifter
 from miasm.ir.symbexec import SymbolicExecutionEngine
 
 from .arch import Arch, supported_architectures
@@ -19,6 +21,8 @@ from .miasm_util import MiasmSymbolResolver, eval_expr
 from .snapshot import ProgramState, ReadableProgramState, \
                       RegisterAccessError, MemoryAccessError
 
+warn = logging.warning
+
 def eval_symbol(symbol: Expr, conc_state: ReadableProgramState) -> int:
     """Evaluate a symbol based on a concrete reference state.
 
@@ -76,11 +80,13 @@ class Instruction:
     def from_bytecode(asm: bytes, arch: Arch) -> Instruction:
         """Disassemble an instruction."""
         machine = Machine(arch.archname)
+        assert(machine.mn is not None)
         _instr = machine.mn.dis(asm, arch.ptr_size)
         return Instruction(_instr, machine, arch, None)
 
     def to_bytecode(self) -> bytes:
         """Assemble the instruction to byte code."""
+        assert(self.machine.mn is not None)
         return self.machine.mn.asm(self.instr)[0]
 
     def to_string(self) -> str:
@@ -333,7 +339,7 @@ class SymbolicTransform:
         for addr, expr in self.changed_mem.items():
             addr = eval_symbol(addr, conc_state)
             length = int(expr.size / 8)
-            res[addr] = eval_symbol(expr, conc_state).to_bytes(length)
+            res[addr] = eval_symbol(expr, conc_state).to_bytes(length, byteorder='big')
         return res
 
     def __repr__(self) -> str:
@@ -358,7 +364,7 @@ def parse_symbolic_transform(string: str) -> SymbolicTransform:
     from miasm.expression.parser import str_to_expr as parse
 
     def decode_inst(obj: Iterable[int], arch: Arch):
-        b = b''.join(i.to_bytes(1) for i in obj)
+        b = b''.join(i.to_bytes(1, byteorder='big') for i in obj)
         return Instruction.from_bytecode(b, arch)
 
     data = json.loads(string)
@@ -395,17 +401,6 @@ def serialize_symbolic_transform(t: SymbolicTransform) -> str:
         'mem': { repr(addr): repr(val) for addr, val in t.changed_mem.items() },
     })
 
-def _step_until(target: LLDBConcreteTarget, addr: int) -> list[int]:
-    """Step a concrete target to a specific instruction.
-    :return: Trace of all instructions executed.
-    """
-    trace = [target.read_register('pc')]
-    target.step()
-    while not target.is_exited() and target.read_register('pc') != addr:
-        trace.append(target.read_register('pc'))
-        target.step()
-    return trace
-
 class DisassemblyContext:
     def __init__(self, binary):
         self.loc_db = LocationDB()
@@ -413,122 +408,78 @@ class DisassemblyContext:
         # Load the binary
         with open(binary, 'rb') as bin_file:
             cont = ContainerELF.from_stream(bin_file, self.loc_db)
+        self.entry_point = cont.entry_point
 
+        # Determine the binary's architecture
         self.machine = Machine(cont.arch)
         if self.machine.name not in supported_architectures:
             raise NotImplementedError(f'[ERROR] {self.machine.name} is not'
                                       f' supported.')
         self.arch = supported_architectures[self.machine.name]
-
-        self.entry_point = cont.entry_point
+        """Focaccia's description of an instruction set architecture."""
 
         # Create disassembly/lifting context
-        self.lifter = self.machine.lifter(self.loc_db)
         self.mdis = self.machine.dis_engine(cont.bin_stream, loc_db=self.loc_db)
         self.mdis.follow_call = True
-        self.ircfg = self.lifter.new_ircfg()
-
-    def get_irblock(self, addr: int) -> IRBlock | None:
-        irblock = self.ircfg.get_block(addr)
+        self.lifter = self.machine.lifter(self.loc_db)
 
-        # Initial disassembly might not find all blocks in the binary.
-        # Disassemble code ad-hoc if the current address has not yet been
-        # disassembled.
-        if irblock is None:
-            cfg = self.mdis.dis_multiblock(addr)
-            for asmblock in cfg.blocks:
-                try:
-                    self.lifter.add_asmblock_to_ircfg(asmblock, self.ircfg)
-                except NotImplementedError as err:
-                    print(f'[WARNING] Unable to disassemble block at'
-                          f' {hex(asmblock.get_range()[0])}:'
-                          f' [Not implemented] {err}')
-                    pass
-            print(f'Disassembled {len(cfg.blocks):5} new blocks at {hex(int(addr))}.')
-            irblock = self.ircfg.get_block(addr)
-
-        # Might still be None if disassembly/lifting failed for the block
-        # at `addr`.
-        return irblock
-
-class DisassemblyError(Exception):
-    def __init__(self,
-                 partial_trace: list[tuple[Instruction, dict]],
-                 faulty_pc: int,
-                 err_msg: str):
-        self.partial_trace = partial_trace
-        self.faulty_pc = faulty_pc
-        self.err_msg = err_msg
-
-def _run_block(pc: int, conc_state: MiasmSymbolResolver, ctx: DisassemblyContext) \
-        -> tuple[int | None, list[tuple[Instruction, dict]]]:
+def run_instruction(instr: miasm_instr,
+                    conc_state: MiasmSymbolResolver,
+                    lifter: Lifter) \
+        -> tuple[ExprInt | None, dict[Expr, Expr]]:
     """Run a basic block.
 
     Tries to run IR blocks until the end of an ASM block/basic block is
     reached. Skips 'virtual' blocks that purely exist in the IR.
 
-    :param pc:         A program counter at which we start executing.
-    :param conc_state: A concrete reference state at `pc`. Used to resolve
-                       symbolic program counters, i.e. to 'guide' the symbolic
-                       execution on the correct path. This is the concrete part
-                       of our concolic execution.
-
-    :return: The next program counter. None if no next program counter can be
-             found. This happens when an error occurs or when the program
-             exits.
+    :param instr:      The instruction to run.
+    :param conc_state: A concrete reference state at `pc = instr.offset`. Used
+                       to resolve symbolic program counters, i.e. to 'guide' the
+                       symbolic execution on the correct path. This is the
+                       concrete part of our concolic execution.
+    :param lifter:     A lifter of the appropriate architecture. Get this from a
+                       `DisassemblyContext` or a `Machine`.
+
+    :return: The next program counter and a symbolic state. None if no next
+             program counter can be found. This happens when an error occurs or
+             when the program exits. The state represents the transformation
+             which the instruction applies to a program state.
     """
-    # Start with a clean, purely symbolic state
-    engine = SymbolicExecutionEngine(ctx.lifter)
-
-    # A list of symbolic transformation for each single instruction
-    symb_trace = []
-
-    while True:
-        irblock = ctx.get_irblock(pc)
+    def execute_location(loc, base_state: dict | None) -> tuple[ExprInt | None, dict]:
+        # Query the location's IR block
+        irblock = ircfg.get_block(loc)
         if irblock is None:
-            raise DisassemblyError(
-                symb_trace,
-                pc,
-                f'[ERROR] Unable to disassemble block at {hex(pc)}.'
-            )
-
-        # Execute each instruction in the current basic block and record the
-        # resulting change in program state.
-        for assignblk in irblock:
-            # A clean engine for the single-instruction diff, otherwise
-            # it concatenates the current instruction to the previous ones in
-            # the block.
-            _engine = SymbolicExecutionEngine(ctx.lifter)
-            modified = _engine.eval_assignblk(assignblk)
-            instr = Instruction(assignblk.instr, ctx.machine, ctx.arch, ctx.loc_db)
-            symb_trace.append((instr, modified))
-
-            # Run a single instruction
-            engine.eval_updt_assignblk(assignblk)
-
-        # Obtain the next program counter after the basic block.
-        symbolic_pc = engine.eval_expr(engine.lifter.IRDst)
-
-        # The new program counter might be a symbolic value. Try to evaluate
-        # it based on the last recorded concrete state at the start of the
-        # current basic block.
-        pc = eval_expr(symbolic_pc, conc_state)
-
-        # If the resulting PC is an integer, i.e. a concrete address that can
-        # be mapped to the assembly code, we return as we have reached the end
-        # of a basic block. Otherwise we might have reached the end of an IR
-        # block, in which case we keep executing until we reach the end of an
-        # ASM block.
-        #
-        # Example: This happens for the REP STOS instruction, for which Miasm
-        # generates multiple IR blocks.
-        try:
-            return int(pc), symb_trace
-        except:
-            # We reach this point when the program counter is an IR block
-            # location (not an integer). That happens when single ASM
-            # instructions are translated to multiple IR instructions.
-            pass
+            # Weirdly, this never seems to happen. I don't know why, though.
+            return None, base_state if base_state is not None else {}
+
+        # Apply IR block to the current state
+        engine = SymbolicExecutionEngine(lifter, state=base_state)
+        new_pc = engine.eval_updt_irblock(irblock)
+        modified = dict(engine.modified())
+
+        # Resolve the symbolic PC to a concrete value based on the
+        # concrete program state before this instruction
+        new_pc = eval_expr(new_pc, conc_state)
+        if new_pc.is_loc():
+            # Run chained IR blocks until a real program counter is reached
+            new_pc, modified = execute_location(new_pc, modified)
+
+        assert(new_pc is not None and isinstance(new_pc, ExprInt))
+        return new_pc, modified
+
+    # Lift instruction to IR
+    ircfg = lifter.new_ircfg()
+    try:
+        loc = lifter.add_instr_to_ircfg(instr, ircfg, None, False)
+    except NotImplementedError as err:
+        warn(f'[WARNING] Unable to lift instruction {instr}: {err}. Skipping.')
+        return None, {}  # Create an empty transform for the instruction
+
+    # Execute instruction symbolically
+    new_pc, modified = execute_location(loc, None)
+    modified[lifter.pc] = new_pc  # Add PC update to state
+
+    return new_pc, modified
 
 class _LLDBConcreteState(ReadableProgramState):
     """A wrapper around `LLDBConcreteTarget` that provides access via a
@@ -576,81 +527,40 @@ def collect_symbolic_trace(binary: str,
     ctx = DisassemblyContext(binary)
     arch = ctx.arch
 
-    if start_addr is None:
-        pc = ctx.entry_point
-    else:
-        pc = start_addr
-
+    # Set up concrete reference state
     target = LLDBConcreteTarget(binary, args)
-    if target.read_register('pc') != pc:
-        target.set_breakpoint(pc)
-        target.run()
-        target.remove_breakpoint(pc)
-    conc_state = _LLDBConcreteState(target, arch)
-
-    symb_trace: list[tuple[Instruction, dict]] = [] # The resulting list of symbolic transforms per instruction
+    if start_addr is not None:
+        target.run_until(start_addr)
+    lldb_state = _LLDBConcreteState(target, arch)
 
-    # Run until no more states can be reached
-    while pc is not None:
-        assert(target.read_register('pc') == pc)
+    # Trace concolically
+    strace: list[SymbolicTransform] = []
+    while not target.is_exited():
+        pc = target.read_register('pc')
 
-        # Run symbolic execution
-        # It uses the concrete state to resolve symbolic program counters to
-        # concrete values.
+        # Disassemble instruction at the current PC
         try:
-            pc, strace = _run_block(
-                pc,
-                MiasmSymbolResolver(conc_state, ctx.loc_db),
-                ctx)
-        except DisassemblyError as err:
-            # This happens if we encounter an instruction that is not
-            # implemented by Miasm. Try to skip that instruction and continue
-            # at the next one.
-            print(f'[WARNING] Skipping instruction at {hex(err.faulty_pc)}...')
-
-            # First, catch up to symbolic trace if required
-            if err.faulty_pc != pc:
-                ctrace = _step_until(target, err.faulty_pc)
-                symb_trace.extend(err.partial_trace)
-                assert(len(ctrace) - 1 == len(err.partial_trace))  # no ghost instr
-
-            # Now step one more time to skip the faulty instruction
+            instr = ctx.mdis.dis_instr(pc)
+        except:
+            err = sys.exc_info()[1]
+            warn(f'[WARNING] Unable to disassemble instruction at {hex(pc)}:'
+                 f' {err}. Skipping.')
             target.step()
-            if target.is_exited():
-                break
-
-            # Do not generate an entry in the symbolic trace at all
-            pc = target.read_register('pc')
             continue
 
-        if pc is None:
-            break
-
-        # Step concrete target forward.
-        #
-        # The concrete target now lags behind the symbolic execution by exactly
-        # one basic block: the one that we just executed. Run the concrete
-        # execution until it reaches the new PC.
-        ctrace = _step_until(target, pc)
-
-        # Sometimes, miasm generates ghost instructions at the end of basic
-        # blocks. Don't include them in the symbolic trace.
-        strace = strace[:len(ctrace)]
-        symb_trace.extend(strace)
-
-        # Use this for extensive trace debugging
-        #if [a for a, _ in strace] != ctrace:
-        #    print(f'[WARNING] Symbolic trace and concrete trace are not equal!'
-        #          f'\n    symbolic: {[hex(a) for a, _ in strace]}'
-        #          f'\n    concrete: {[hex(a) for a in ctrace]}')
-
-        if target.is_exited():
-            break
-
-    res = []
-    for (start, diff), (end, _) in zip(symb_trace[:-1], symb_trace[1:]):
-        res.append(SymbolicTransform(diff, [start], arch, start.addr, end.addr))
-    start, diff = symb_trace[-1]
-    res.append(SymbolicTransform(diff, [start], arch, start.addr, start.addr))
-
-    return res
+        # Run instruction
+        conc_state = MiasmSymbolResolver(lldb_state, ctx.loc_db)
+        new_pc, modified = run_instruction(instr, conc_state, ctx.lifter)
+
+        # Create symbolic transform
+        instruction = Instruction(instr, ctx.machine, ctx.arch, ctx.loc_db)
+        if new_pc is None:
+            new_pc = pc + instruction.length
+        else:
+            new_pc = int(new_pc)
+        strace.append(SymbolicTransform(modified, [instruction], arch, pc, new_pc))
+
+        # Step forward
+        target.step()
+
+    return strace
diff --git a/focaccia/utils.py b/focaccia/utils.py
index 7173169..50251a1 100644
--- a/focaccia/utils.py
+++ b/focaccia/utils.py
@@ -1,8 +1,25 @@
+import ctypes
 import sys
 import shutil
 
 from .compare import ErrorSeverity
 
+def float_bits_to_uint(v: float) -> int:
+    """Bit-cast a float to a 32-bit integer."""
+    return ctypes.c_uint32.from_buffer(ctypes.c_float(v)).value
+
+def uint_bits_to_float(v: int) -> float:
+    """Bit-cast a 32-bit integer to a float."""
+    return ctypes.c_float.from_buffer(ctypes.c_uint32(v)).value
+
+def double_bits_to_uint(v: float) -> int:
+    """Bit-cast a double to a 64-bit integer."""
+    return ctypes.c_uint64.from_buffer(ctypes.c_double(v)).value
+
+def uint_bits_to_double(v: int) -> float:
+    """Bit-cast a 64-bit integer to a double."""
+    return ctypes.c_double.from_buffer(ctypes.c_uint64(v)).value
+
 def print_separator(separator: str = '-', stream=sys.stdout, count: int = 80):
     maxtermsize = count
     termsize = shutil.get_terminal_size((80, 20)).columns