about summary refs log tree commit diff stats
path: root/symbolic.py
diff options
context:
space:
mode:
authorTheofilos Augoustis <theofilos.augoustis@gmail.com>2023-12-14 17:03:59 +0100
committerTheofilos Augoustis <theofilos.augoustis@gmail.com>2023-12-14 17:03:59 +0100
commit194f3d6f2ebdc7b0631fdaaeb8451142b052ccb0 (patch)
tree20806871433db331bdfed66ddadfadecbea2b7c4 /symbolic.py
parent4a5584d8f69d8ff511285387971d8cbf803f16b7 (diff)
downloadfocaccia-194f3d6f2ebdc7b0631fdaaeb8451142b052ccb0.tar.gz
focaccia-194f3d6f2ebdc7b0631fdaaeb8451142b052ccb0.zip
Implement symbolic comparison and match traces via Miasm
Co-authored-by: Theofilos Augoustis <theofilos.augoustis@gmail.com>
Co-authored-by: Nicola Crivellin <nicola.crivellin98@gmail.com>
Diffstat (limited to '')
-rw-r--r--symbolic.py171
1 files changed, 115 insertions, 56 deletions
diff --git a/symbolic.py b/symbolic.py
index 2a328fd..b005c5e 100644
--- a/symbolic.py
+++ b/symbolic.py
@@ -8,9 +8,10 @@ from miasm.analysis.machine import Machine
 from miasm.core.asmblock import AsmCFG
 from miasm.core.locationdb import LocationDB
 from miasm.ir.symbexec import SymbolicExecutionEngine
+from miasm.ir.ir import IRBlock
 from miasm.expression.expression import Expr, ExprId, ExprMem, ExprInt
 
-from lldb_target import LLDBConcreteTarget, record_snapshot
+from lldb_target import LLDBConcreteTarget
 from miasm_util import MiasmConcreteState, eval_expr
 from snapshot import ProgramState
 from arch import Arch, supported_architectures
@@ -36,11 +37,14 @@ class SymbolicTransform:
             -> dict[int, bytes]:
         raise NotImplementedError('calc_memory_transform is abstract.')
 
+    def __repr__(self) -> str:
+        start, end = self.range
+        return f'Symbolic state transformation {hex(start)} -> {hex(end)}'
+
 class MiasmSymbolicTransform(SymbolicTransform):
     def __init__(self,
-                 transform: dict[ExprId, Expr],
+                 transform: dict[Expr, Expr],
                  arch: Arch,
-                 loc_db: LocationDB,
                  start_addr: int,
                  end_addr: int):
         """
@@ -55,6 +59,8 @@ class MiasmSymbolicTransform(SymbolicTransform):
         self.regs_diff: dict[str, Expr] = {}
         self.mem_diff: dict[ExprMem, Expr] = {}
         for dst, expr in transform.items():
+            assert(isinstance(dst, ExprMem) or isinstance(dst, ExprId))
+
             if isinstance(dst, ExprMem):
                 self.mem_diff[dst] = expr
             else:
@@ -64,14 +70,16 @@ class MiasmSymbolicTransform(SymbolicTransform):
                     self.regs_diff[regname] = expr
 
         self.arch = arch
-        self.loc_db = loc_db
 
     def concat(self, other: MiasmSymbolicTransform) -> Self:
-        class MiasmSymbolicState:
+        class MiasmSymbolicState(MiasmConcreteState):
             """Drop-in replacement for MiasmConcreteState in eval_expr that
             returns the current transform's symbolic equations instead of
             symbolic values. Calling eval_expr with this effectively nests the
             transformation into the concatenated transformation.
+
+            We inherit from `MiasmSymbolicTransform` only for the purpose of
+            correct type checking.
             """
             def __init__(self, transform: MiasmSymbolicTransform):
                 self.transform = transform
@@ -110,7 +118,9 @@ class MiasmSymbolicTransform(SymbolicTransform):
 
     def calc_register_transform(self, conc_state: ProgramState) \
             -> dict[str, int]:
-        ref_state = MiasmConcreteState(conc_state, self.loc_db)
+        # Construct a dummy location DB. At this point, expressions should
+        # never contain IR locations.
+        ref_state = MiasmConcreteState(conc_state, LocationDB())
 
         res = {}
         for regname, expr in self.regs_diff.items():
@@ -119,7 +129,9 @@ class MiasmSymbolicTransform(SymbolicTransform):
 
     def calc_memory_transform(self, conc_state: ProgramState) \
             -> dict[int, bytes]:
-        ref_state = MiasmConcreteState(conc_state, self.loc_db)
+        # Construct a dummy location DB. At this point, expressions should
+        # never contain IR locations.
+        ref_state = MiasmConcreteState(conc_state, LocationDB())
 
         res = {}
         for addr, expr in self.mem_diff.items():
@@ -149,8 +161,58 @@ def _step_until(target: LLDBConcreteTarget, addr: int) -> list[int]:
         target.step()
     return trace
 
-def _run_block(pc: int, conc_state: MiasmConcreteState, lifter, ircfg, mdis) \
-        -> tuple[int | None, list]:
+class DisassemblyContext:
+    def __init__(self, binary):
+        self.loc_db = LocationDB()
+
+        # Load the binary
+        with open(binary, 'rb') as bin_file:
+            cont = ContainerELF.from_stream(bin_file, self.loc_db)
+
+        self.machine = Machine(cont.arch)
+        self.entry_point = cont.entry_point
+
+        # Create disassembly/lifting context
+        self.lifter = self.machine.lifter(self.loc_db)
+        self.mdis = self.machine.dis_engine(cont.bin_stream, loc_db=self.loc_db)
+        self.mdis.follow_call = True
+        self.asmcfg = AsmCFG(self.loc_db)
+        self.ircfg = self.lifter.new_ircfg_from_asmcfg(self.asmcfg)
+
+    def get_irblock(self, addr: int) -> IRBlock | None:
+        irblock = self.ircfg.get_block(addr)
+
+        # Initial disassembly might not find all blocks in the binary.
+        # Disassemble code ad-hoc if the current address has not yet been
+        # disassembled.
+        if irblock is None:
+            cfg = self.mdis.dis_multiblock(addr)
+            for asmblock in cfg.blocks:
+                try:
+                    self.lifter.add_asmblock_to_ircfg(asmblock, self.ircfg)
+                except NotImplementedError as err:
+                    print(f'[WARNING] Unable to disassemble block at'
+                          f' {hex(asmblock.get_range()[0])}:'
+                          f' [Not implemented] {err}')
+                    pass
+            print(f'Disassembled {len(cfg.blocks):5} new blocks at {hex(int(addr))}.')
+            irblock = self.ircfg.get_block(addr)
+
+        # Might still be None if disassembly/lifting failed for the block
+        # at `addr`.
+        return irblock
+
+class DisassemblyError(Exception):
+    def __init__(self,
+                 partial_trace: list[tuple[int, MiasmSymbolicTransform]],
+                 faulty_pc: int,
+                 err_msg: str):
+        self.partial_trace = partial_trace
+        self.faulty_pc = faulty_pc
+        self.err_msg = err_msg
+
+def _run_block(pc: int, conc_state: MiasmConcreteState, ctx: DisassemblyContext) \
+        -> tuple[int | None, list[dict]]:
     """Run a basic block.
 
     Tries to run IR blocks until the end of an ASM block/basic block is
@@ -166,37 +228,20 @@ def _run_block(pc: int, conc_state: MiasmConcreteState, lifter, ircfg, mdis) \
              found. This happens when an error occurs or when the program
              exits.
     """
-    global disasm_time
-    global symb_exec_time
-
     # Start with a clean, purely symbolic state
-    engine = SymbolicExecutionEngine(lifter)
+    engine = SymbolicExecutionEngine(ctx.lifter)
 
     # A list of symbolic transformation for each single instruction
     symb_trace = []
 
     while True:
-        irblock = ircfg.get_block(pc)
-
-        # Initial disassembly might not find all blocks in the binary.
-        # Disassemble code ad-hoc if the current PC has not yet been
-        # disassembled.
+        irblock = ctx.get_irblock(pc)
         if irblock is None:
-            cfg = mdis.dis_multiblock(pc)
-            for asmblock in cfg.blocks:
-                try:
-                    lifter.add_asmblock_to_ircfg(asmblock, ircfg)
-                except NotImplementedError as err:
-                    print(f'[ERROR] Unable to disassemble block at'
-                          f' {hex(asmblock.get_range()[0])}:'
-                          f' [Not implemented] {err}')
-                    pass
-
-            irblock = ircfg.get_block(pc)
-            if irblock is None:
-                print(f'[ERROR] Unable to disassemble block(s) at {hex(pc)}.')
-                raise RuntimeError()
-            print(f'Disassembled {len(cfg.blocks):4} new blocks at {hex(int(pc))}.')
+            raise DisassemblyError(
+                symb_trace,
+                pc,
+                f'[ERROR] Unable to disassemble block at {hex(pc)}.'
+            )
 
         # Execute each instruction in the current basic block and record the
         # resulting change in program state.
@@ -240,27 +285,17 @@ def collect_symbolic_trace(binary: str,
 
     :param binary: The binary to trace.
     """
-    loc_db = LocationDB()
-    with open(binary, 'rb') as bin_file:
-        cont = ContainerELF.from_stream(bin_file, loc_db)
-    machine = Machine(cont.arch)
+    ctx = DisassemblyContext(binary)
 
     # Find corresponding architecture
-    if machine.name not in supported_architectures:
-        print(f'[ERROR] {machine.name} is not supported. Returning.')
+    mach_name = ctx.machine.name
+    if mach_name not in supported_architectures:
+        print(f'[ERROR] {mach_name} is not supported. Returning.')
         return []
-    arch = supported_architectures[machine.name]
-
-    # Create disassembly/lifting context
-    mdis = machine.dis_engine(cont.bin_stream, loc_db=loc_db)
-    mdis.follow_call = True
-    asmcfg = AsmCFG(loc_db)
-
-    lifter = machine.lifter(loc_db)
-    ircfg = lifter.new_ircfg_from_asmcfg(asmcfg)
+    arch = supported_architectures[mach_name]
 
     if start_addr is None:
-        pc = cont.entry_point
+        pc = ctx.entry_point
     else:
         pc = start_addr
 
@@ -273,16 +308,40 @@ def collect_symbolic_trace(binary: str,
     symb_trace = [] # The resulting list of symbolic transforms per instruction
 
     # Run until no more states can be reached
-    initial_state = record_snapshot(target)
+    initial_state = target.record_snapshot()
     while pc is not None:
         assert(target.read_register('pc') == pc)
 
         # Run symbolic execution
         # It uses the concrete state to resolve symbolic program counters to
         # concrete values.
-        pc, strace = _run_block(
-            pc, MiasmConcreteState(initial_state, loc_db),
-            lifter, ircfg, mdis)
+        try:
+            pc, strace = _run_block(
+                pc,
+                MiasmConcreteState(initial_state, ctx.loc_db),
+                ctx
+            )
+        except DisassemblyError as err:
+            # This happens if we encounter an instruction that is not
+            # implemented by Miasm. Try to skip that instruction and continue
+            # at the next one.
+            print(f'[WARNING] Skipping instruction at {hex(err.faulty_pc)}...')
+
+            # First, catch up to symbolic trace if required
+            if err.faulty_pc != pc:
+                ctrace = _step_until(target, err.faulty_pc)
+                symb_trace.extend(err.partial_trace)
+                assert(len(ctrace) - 1 == len(err.partial_trace))  # no ghost instr
+
+            # Now step one more time to skip the faulty instruction
+            target.step()
+            if target.is_exited():
+                break
+
+            symb_trace.append((err.faulty_pc, {}))  # Generate empty transform
+            pc = target.read_register('pc')
+            initial_state = target.record_snapshot()
+            continue
 
         if pc is None:
             break
@@ -309,12 +368,12 @@ def collect_symbolic_trace(binary: str,
             break
 
         # Query the new reference state for symbolic execution
-        initial_state = record_snapshot(target)
+        initial_state = target.record_snapshot()
 
     res = []
     for (start, diff), (end, _) in zip(symb_trace[:-1], symb_trace[1:]):
-        res.append(MiasmSymbolicTransform(diff, arch, loc_db, start, end))
+        res.append(MiasmSymbolicTransform(diff, arch, start, end))
     start, diff = symb_trace[-1]
-    res.append(MiasmSymbolicTransform(diff, arch, loc_db, start, start))
+    res.append(MiasmSymbolicTransform(diff, arch, start, start))
 
     return res