about summary refs log tree commit diff stats
path: root/symbolic.py
diff options
context:
space:
mode:
Diffstat (limited to 'symbolic.py')
-rw-r--r--symbolic.py392
1 files changed, 234 insertions, 158 deletions
diff --git a/symbolic.py b/symbolic.py
index 56857d7..a5e1f70 100644
--- a/symbolic.py
+++ b/symbolic.py
@@ -1,174 +1,250 @@
-"""Tools and utilities for symbolic execution with angr."""
-
-import angr
-import claripy as cp
-from angr.exploration_techniques import Symbion
-
-from arch import Arch, x86
-from interpreter import eval as eval_symbol, SymbolResolver
-from lldb_target import LLDBConcreteTarget
-
-def symbolize_state(state: angr.SimState,
-                    arch: Arch = x86.ArchX86(),
-                    exclude: list[str] = ['RIP', 'RBP', 'RSP'],
-                    stack_name: str = 'stack',
-                    stack_size: int = 0x10) \
-        -> angr.SimState:
-    """Create a copy of a SimState and replace most of it with symbolic
-    values.
-
-    Leaves pc, rbp, and rsp concrete by default. This can be configured with
-    the `exclude` parameter. Add the string 'stack' to the exclude list to
-    prevent stack memory from being replaced with a symbolic buffer.
-
-    :return: A symbolized SymState object.
-    """
-    _exclude = set(exclude)
-    state = state.copy()
-
-    if stack_name not in _exclude:
-        symb_stack = cp.BVS(stack_name, stack_size * 8, explicit_name=True)
-        state.memory.store(state.regs.rsp - stack_size, symb_stack)
-
-    for reg in arch.regnames:
-        if reg not in _exclude:
-            symb_val = cp.BVS(reg, 64, explicit_name=True)
-            try:
-                state.regs.__setattr__(reg.lower(), symb_val)
-            except AttributeError:
-                pass
-    return state
+"""Tools and utilities for symbolic execution with Miasm."""
+
+from miasm.analysis.binary import ContainerELF
+from miasm.analysis.machine import Machine
+from miasm.core.asmblock import AsmCFG
+from miasm.core.locationdb import LocationDB
+from miasm.ir.symbexec import SymbolicExecutionEngine
+from miasm.expression.expression import Expr, ExprId, ExprMem
+
+from lldb_target import LLDBConcreteTarget, record_snapshot
+from miasm_util import MiasmConcreteState, eval_expr
+from snapshot import ProgramState
 
 class SymbolicTransform:
+    def __init__(self, from_addr: int, to_addr: int):
+        self.addr = from_addr
+        self.range = (from_addr, to_addr)
+
+    def calc_register_transform(self, conc_state: ProgramState) \
+            -> dict[str, int]:
+        raise NotImplementedError('calc_register_transform is abstract.')
+
+    def calc_memory_transform(self, conc_state: ProgramState) \
+            -> dict[int, bytes]:
+        raise NotImplementedError('calc_memory_transform is abstract.')
+
+class MiasmSymbolicTransform(SymbolicTransform):
     def __init__(self,
-                 state: angr.SimState,
-                 first_inst: int,
-                 last_inst: int,
-                 end_inst: int):
+                 transform: dict[ExprId, Expr],
+                 loc_db: LocationDB,
+                 start_addr: int,
+                 end_addr: int):
         """
         :param state: The symbolic transformation in the form of a SimState
                       object.
-        :param first_inst: An instruction address. The transformation operates
-                           on the program state *before* this instruction is
-                           executed.
-        :param last_inst:  An instruction address. The last instruction that
-                           is included in the transformation. This may be equal
-                           to `prev_state` if the `SymbolicTransform`
-                           represents the work done by a single instruction.
-                           The transformation includes all instructions in the
-                           range `[first_inst, last_inst]` (note the inclusive
-                           right bound) of the specific program trace.
-        :param end_inst:   An instruction address. The address of the *next*
-                           instruction executed on the state that results from
-                           the transformation.
+        :param first_inst: An instruction address. The transformation
+                           represents the modifications to the program state
+                           performed by this instruction.
         """
-        self.state = state
-        self.start_addr = first_inst
-        self.last_inst = last_inst
-        self.end_addr = end_inst
-
-    def eval_register_transform(self, regname: str, resolver: SymbolResolver):
-        """
-        :param regname:  Name of the register to evaluate.
-        :param resolver: A provider for the values to be plugged into the
-                         symbolic equation.
-
-        :raise angr.SimConcreteRegisterError: If the state contains no register
-                                              named `regname`.
-        """
-        return eval_symbol(resolver, self.state.regs.get(regname))
+        super().__init__(start_addr, end_addr)
+
+        self.regs_diff: dict[str, Expr] = {}
+        self.mem_diff: dict[ExprMem, Expr] = {}
+        for id, expr in transform.items():
+            if isinstance(id, ExprMem):
+                self.mem_diff[id.ptr] = expr
+            elif id.name != 'IRDst':
+                assert(isinstance(id, ExprId))
+                self.regs_diff[id.name] = expr
+
+        self.loc_db = loc_db
+
+    def calc_register_transform(self, conc_state: ProgramState) \
+            -> dict[str, int]:
+        ref_state = MiasmConcreteState(conc_state, self.loc_db)
+
+        res = {}
+        for regname, expr in self.regs_diff.items():
+            res[regname] = eval_expr(expr, ref_state)
+        return res
+
+    def calc_memory_transform(self, conc_state: ProgramState) \
+            -> dict[int, bytes]:
+        ref_state = MiasmConcreteState(conc_state, self.loc_db)
+
+        res = {}
+        for addr, expr in self.mem_diff.items():
+            addr = int(eval_expr(addr, ref_state))
+            length = int(expr.size / 8)
+            res[addr] = int(eval_expr(expr, ref_state)).to_bytes(length)
+        return res
 
     def __repr__(self) -> str:
-        return f'Symbolic state transformation: \
-                 {hex(self.start_addr)} -> {hex(self.end_addr)}'
+        return f'Symbolic state transformation for instruction \
+                 {hex(self.addr)}.'
 
-def collect_symbolic_trace(binary: str, trace: list[int]) \
-    -> list[SymbolicTransform]:
+def _step_until(target: LLDBConcreteTarget, addr: int) -> list[int]:
+    """Step a concrete target to a specific instruction.
+    :return: Trace of all instructions executed.
+    """
+    trace = [target.read_register('pc')]
+    target.step()
+    while not target.is_exited() and target.read_register('pc') != addr:
+        trace.append(target.read_register('pc'))
+        target.step()
+    return trace
+
+def _run_block(pc: int, conc_state: MiasmConcreteState, lifter, ircfg, mdis) \
+        -> tuple[int | None, list]:
+    """Run a basic block.
+
+    Tries to run IR blocks until the end of an ASM block/basic block is
+    reached. Skips 'virtual' blocks that purely exist in the IR.
+
+    :param pc:         A program counter at which we start executing.
+    :param conc_state: A concrete reference state at `pc`. Used to resolve
+                       symbolic program counters, i.e. to 'guide' the symbolic
+                       execution on the correct path. This is the concrete part
+                       of our concolic execution.
+
+    :return: The next program counter. None if no next program counter can be
+             found. This happens when an error occurs or when the program
+             exits.
+    """
+    global disasm_time
+    global symb_exec_time
+
+    # Start with a clean, purely symbolic state
+    engine = SymbolicExecutionEngine(lifter)
+
+    # A list of symbolic transformation for each single instruction
+    symb_trace = []
+
+    while True:
+        irblock = ircfg.get_block(pc)
+
+        # Initial disassembly might not find all blocks in the binary.
+        # Disassemble code ad-hoc if the current PC has not yet been
+        # disassembled.
+        if irblock is None:
+            cfg = mdis.dis_multiblock(pc)
+            for asmblock in cfg.blocks:
+                try:
+                    lifter.add_asmblock_to_ircfg(asmblock, ircfg)
+                except NotImplementedError as err:
+                    print(f'[ERROR] Unable to disassemble block at'
+                          f' {hex(asmblock.get_range()[0])}:'
+                          f' [Not implemented] {err}')
+                    pass
+
+            irblock = ircfg.get_block(pc)
+            if irblock is None:
+                print(f'[ERROR] Unable to disassemble block(s) at {hex(pc)}.')
+                raise RuntimeError()
+            print(f'Disassembled {len(cfg.blocks):4} new blocks at {hex(int(pc))}.')
+
+        # Execute each instruction in the current basic block and record the
+        # resulting change in program state.
+        for assignblk in irblock:
+            modified = engine.eval_assignblk(assignblk)
+            symb_trace.append((assignblk.instr.offset, modified))
+
+            # Run a single instruction
+            engine.eval_updt_assignblk(assignblk)
+
+        # Obtain the next program counter after the basic block.
+        symbolic_pc = engine.eval_expr(engine.lifter.IRDst)
+
+        # The new program counter might be a symbolic value. Try to evaluate
+        # it based on the last recorded concrete state at the start of the
+        # current basic block.
+        pc = eval_expr(symbolic_pc, conc_state)
+
+        # If the resulting PC is an integer, i.e. a concrete address that can
+        # be mapped to the assembly code, we return as we have reached the end
+        # of a basic block. Otherwise we might have reached the end of an IR
+        # block, in which case we keep executing until we reach the end of an
+        # ASM block.
+        #
+        # Example: This happens for the REP STOS instruction, for which Miasm
+        # generates multiple IR blocks.
+        try:
+            return int(pc), symb_trace
+        except:
+            # We reach this point when the program counter is an IR block
+            # location (not an integer). That happens when single ASM
+            # instructions are translated to multiple IR instructions.
+            pass
+
+def collect_symbolic_trace(binary: str,
+                           argv: list[str],
+                           start_addr: int | None = None
+                           ) -> list[SymbolicTransform]:
     """Execute a program and compute state transformations between executed
     instructions.
 
     :param binary: The binary to trace.
-    :param trace:  A program trace that symbolic execution shall follow.
     """
-    target = LLDBConcreteTarget(binary)
-    proj = angr.Project(binary,
-                        concrete_target=target,
-                        use_sim_procedures=False)
-
-    entry_state = proj.factory.entry_state(addr=trace[0])
-    entry_state.options.add(angr.options.SYMBION_KEEP_STUBS_ON_SYNC)
-    entry_state.options.add(angr.options.SYMBION_SYNC_CLE)
-
-    # We keep a history of concrete states at their addresses because of the
-    # backtracking approach described below.
-    concrete_states = {}
-
-    # All recorded symbolic transformations
-    result = []
-
-    for (cur_idx, cur_inst), next_inst in zip(enumerate(trace[0:-1]), trace[1:]):
-        # The last instruction included in the generated transformation
-        last_inst = cur_inst
-
-        symbion = proj.factory.simgr(entry_state)
-        symbion.use_technique(Symbion(find=[cur_inst]))
-
-        try:
-            if cur_inst != entry_state.addr:
-                conc_exploration = symbion.run()
-            else:
-                symbion.move('active', 'found')
-                conc_exploration = symbion
-        except angr.AngrError as err:
-            print(f'Angr error: {err} Returning partial result.')
-            return result
-        conc_state = conc_exploration.found[0]
-        entry_state = conc_state
-
-        concrete_states[conc_state.addr] = conc_state.copy()
-
-        # Start symbolic execution with the concrete ('truth') state and try
-        # to reach the next instruction in the trace
-        #
-        # -- Notes --
-        # It does not even work when I feed the entirely concrete state
-        # `conc_state` that I receive from Symbion into the symbolic simgr.
-        simgr = proj.factory.simgr(symbolize_state(conc_state))
-        symb_exploration = simgr.explore(find=next_inst)
-
-        # Symbolic execution can't handle starting at some jump instructions.
-        # When this occurs, we re-start symbolic execution at an earlier
-        # instruction.
-        #
-        # Example:
-        #   0x401155      cmp   -0x4(%rbp),%eax
-        #   0x401158      jle   0x401162
-        #   ...
-        #   0x401162      addl  $0x1337,-0xc(%rbp)
+    loc_db = LocationDB()
+    with open(binary, 'rb') as bin_file:
+        cont = ContainerELF.from_stream(bin_file, loc_db)
+    machine = Machine(cont.arch)
+
+    # Create disassembly/lifting context
+    mdis = machine.dis_engine(cont.bin_stream, loc_db=loc_db)
+    mdis.follow_call = True
+    asmcfg = AsmCFG(loc_db)
+
+    lifter = machine.lifter(loc_db)
+    ircfg = lifter.new_ircfg_from_asmcfg(asmcfg)
+
+    if start_addr is None:
+        pc = cont.entry_point
+    else:
+        pc = start_addr
+
+    target = LLDBConcreteTarget(binary, argv)
+    if target.read_register('pc') != pc:
+        target.set_breakpoint(pc)
+        target.run()
+        target.remove_breakpoint(pc)
+
+    symb_trace = [] # The resulting list of symbolic transforms per instruction
+
+    # Run until no more states can be reached
+    initial_state = record_snapshot(target)
+    while pc is not None:
+        assert(target.read_register('pc') == pc)
+
+        # Run symbolic execution
+        # It uses the concrete state to resolve symbolic program counters to
+        # concrete values.
+        pc, strace = _run_block(
+            pc, MiasmConcreteState(initial_state, loc_db),
+            lifter, ircfg, mdis)
+
+        if pc is None:
+            break
+
+        # Step concrete target forward.
         #
-        # Here, symbolic execution can't find a valid state at `0x401162` when
-        # starting at `0x401158`, but it finds it successfully when starting at
-        # `0x401155`.
-        while len(symb_exploration.found) == 0 and cur_idx > 0:
-            print(f'[INFO] Symbolic execution can\'t reach address'
-                  f' {hex(next_inst)} from {hex(cur_inst)}.'
-                  f' Attempting to reach it from {hex(trace[cur_idx - 1])}...')
-            cur_idx -= 1
-            cur_inst = trace[cur_idx]
-            conc_state = concrete_states[cur_inst]
-            simgr = proj.factory.simgr(symbolize_state(conc_state))
-            symb_exploration = simgr.explore(find=next_inst)
-
-        if len(symb_exploration.found) == 0:
-            print(f'Symbolic execution can\'t reach address {hex(next_inst)}.'
-                  ' Exiting.')
-            exit(1)
-
-        result.append(SymbolicTransform(
-            symb_exploration.found[0],
-            cur_inst,
-            last_inst,
-            next_inst
-        ))
-
-    return result
+        # The concrete target now lags behind the symbolic execution by exactly
+        # one basic block: the one that we just executed. Run the concrete
+        # execution until it reaches the new PC.
+        ctrace = _step_until(target, pc)
+
+        # Sometimes, miasm generates ghost instructions at the end of basic
+        # blocks. Don't include them in the symbolic trace.
+        strace = strace[:len(ctrace)]
+        symb_trace.extend(strace)
+
+        # Use this for extensive trace debugging
+        if [a for a, _ in strace] != ctrace:
+            print(f'[WARNING] Symbolic trace and concrete trace are not equal!'
+                  f'\n    symbolic: {[hex(a) for a, _ in strace]}'
+                  f'\n    concrete: {[hex(a) for a in ctrace]}')
+
+        if target.is_exited():
+            break
+
+        # Query the new reference state for symbolic execution
+        initial_state = record_snapshot(target)
+
+    res = []
+    for (start, diff), (end, _) in zip(symb_trace[:-1], symb_trace[1:]):
+        res.append(MiasmSymbolicTransform(diff, loc_db, start, end))
+    start, diff = symb_trace[-1]
+    res.append(MiasmSymbolicTransform(diff, loc_db, start, start))
+
+    return res