about summary refs log tree commit diff stats
diff options
context:
space:
mode:
authorTheofilos Augoustis <theofilos.augoustis@gmail.com>2023-11-28 15:47:47 +0100
committerTheofilos Augoustis <theofilos.augoustis@gmail.com>2023-11-28 15:47:47 +0100
commitffcae80c2167f271a7d733d424fbd72db8c98a93 (patch)
treeb4d72a5b7522ffe7e10f5cf625be93347cab419f
parent836e42215fda0cbd330caef2dc5fc93336d4722c (diff)
downloadfocaccia-ffcae80c2167f271a7d733d424fbd72db8c98a93.tar.gz
focaccia-ffcae80c2167f271a7d733d424fbd72db8c98a93.zip
Record symbolic transform for single instructions
Step manually through single instructions instead of full basic blocks.
Record the transformation performed by each instruction as symbolic
equations.

Co-authored-by: Theofilos Augoustis <theofilos.augoustis@gmail.com>
Co-authored-by: Nicola Crivellin <nicola.crivellin98@gmail.com>
-rw-r--r--README.md18
-rw-r--r--arch/x86.py9
-rw-r--r--miasm_test.py206
-rw-r--r--miasm_util.py23
4 files changed, 178 insertions, 78 deletions
diff --git a/README.md b/README.md
index 0397afe..65fe4ce 100644
--- a/README.md
+++ b/README.md
@@ -3,6 +3,24 @@
 This repository contains initial code for comprehensive testing of binary
 translators.
 
+## Requirements
+
+We require at least LLDB version 17 for `fs_base`/`gs_base` register support.
+
+I had to compile LLDB myself; these are the steps I had to take (you also need swig version >= 4):
+
+```
+git clone https://github.com/llvm/llvm-project <llvm-path>
+cd <llvm-path>
+cmake -S llvm -B build -DCMAKE_BUILD_TYPE=Release -DLLVM_ENABLE_PROJECTS="clang;lldb" -DLLDB_ENABLE_PYTHON=TRUE -DLLDB_ENABLE_SWIG=TRUE
+cmake --build build/ --parallel $(nproc)
+
+# Add the built LLDB python bindings to your PYTHONPATH:
+PYTHONPATH="$PYTHONPATH:$(./build/bin/lldb -P)"
+```
+
+It will take a while to compile.
+
 ## Snapshot-comparison framework
 
 The following files belong to a rough framework for the snapshot comparison engine:
diff --git a/arch/x86.py b/arch/x86.py
index 25213a0..88e6d1a 100644
--- a/arch/x86.py
+++ b/arch/x86.py
@@ -24,6 +24,7 @@ regnames = [
     'RFLAGS',
     # Segment registers
     'CS', 'DS', 'SS', 'ES', 'FS', 'GS',
+    'FS_BASE', 'GS_BASE',
     # FLAGS
     'CF', 'PF', 'AF', 'ZF', 'SF', 'TF', 'IF', 'DF', 'OF', 'IOPL', 'NT',
     # EFLAGS
@@ -80,6 +81,8 @@ class ArchX86(Arch):
         Applies certain register name aliases.
         """
         reg = super().to_regname(name)
-        if reg in regname_aliases:
-            return regname_aliases[reg]
-        return reg
+        if reg is not None:
+            return reg
+
+        # Apply custom register alias rules
+        return regname_aliases.get(name.upper(), None)
diff --git a/miasm_test.py b/miasm_test.py
index 64dc04a..36246af 100644
--- a/miasm_test.py
+++ b/miasm_test.py
@@ -1,8 +1,10 @@
 import sys
+import time
 
-from miasm.arch.x86.sem import Lifter_X86_64
-from miasm.analysis.machine import Machine
 from miasm.analysis.binary import ContainerELF
+from miasm.analysis.machine import Machine
+from miasm.arch.x86.sem import Lifter_X86_64
+from miasm.core.asmblock import AsmCFG
 from miasm.core.locationdb import LocationDB
 from miasm.ir.symbexec import SymbolicExecutionEngine, SymbolicState
 
@@ -25,6 +27,17 @@ def print_state(state: SymbolicState):
         print(f'{str(reg):10s} = {val}')
     print('=' * 80)
 
+def step_until(target: LLDBConcreteTarget, addr: int) -> list[int]:
+    """Step a concrete target to a specific instruction.
+    :return: Trace of all instructions executed.
+    """
+    trace = [target.read_register('pc')]
+    target.step()
+    while not target.is_exited() and target.read_register('pc') != addr:
+        trace.append(target.read_register('pc'))
+        target.step()
+    return trace
+
 def create_state(target: LLDBConcreteTarget) -> ProgramState:
     def standardize_flag_name(regname: str) -> str:
         regname = regname.upper()
@@ -36,8 +49,7 @@ def create_state(target: LLDBConcreteTarget) -> ProgramState:
 
     # Query and store register state
     rflags = x86.decompose_rflags(target.read_register('rflags'))
-    for reg in machine.mn.regs.all_regs_ids_no_alias:
-        regname = reg.name
+    for regname in x86.regnames:
         try:
             conc_val = target.read_register(regname)
             state.set(regname, conc_val)
@@ -61,57 +73,39 @@ def create_state(target: LLDBConcreteTarget) -> ProgramState:
 
     return state
 
-def record_concrete_states(binary) -> list[tuple[int, ProgramState]]:
-    """Record a trace of concrete program states by stepping through an
-    executable.
-    """
-    addrs = set()
-    states = []
-    target = LLDBConcreteTarget(binary)
-    while not target.is_exited():
-        addrs.add(target.read_register('pc'))
-        states.append((target.read_register('pc'), create_state(target)))
-        target.step()
-    return states
+symb_exec_time = 0
+conc_exec_time = 0
+disasm_time = 0
+
+total_time_start = time.perf_counter_ns()
 
-binary = 'test_program'
+binary = sys.argv[1]
 
 loc_db = LocationDB()
 cont = ContainerELF.from_stream(open(binary, 'rb'), loc_db)
 machine = Machine(cont.arch)
 
 pc = int(cont.entry_point)
+if len(sys.argv) > 2:
+    pc = int(sys.argv[2], 16)
 
-# Disassemble binary
-print(f'Disassembling "{binary}"...')
+# Create disassembly/lifting context
 mdis = machine.dis_engine(cont.bin_stream, loc_db=loc_db)
 mdis.follow_call = True
-asmcfg = mdis.dis_multiblock(pc)
+asmcfg = AsmCFG(loc_db)
 
-with open('full_disasm', 'w') as file:
-    print(f'Entry point: {hex(pc)}\n', file=file)
-    print_blocks(asmcfg, file)
-print(f'--- Disassembled "{binary}". Log written to "full_disasm.log".')
-
-# Lift disassembly to IR
-print(f'Lifting disassembly to IR...')
 lifter: Lifter_X86_64 = machine.lifter(loc_db)
 ircfg = lifter.new_ircfg_from_asmcfg(asmcfg)
-with open('full_ir', 'w') as file:
-    print('=' * 80, file=file)
-    for block in ircfg.blocks.values():
-        print(block, file=file)
-        print('-' * 60, file=file)
-    print('=' * 80, file=file)
-print(f'--- Lifted disassembly to IR. Log written to "full_ir.log".')
 
-# Record concrete reference states to guide symbolic execution
-print(f'Recording concrete program trace...')
-conc_trace = record_concrete_states(binary)
-conc_trace = [(a, MiasmConcreteState(s, loc_db)) for a, s in conc_trace]
-print(f'Recorded {len(conc_trace)} trace points.')
+# TODO: To implement support for unimplemented instructions, add their
+# ASM->IR implementations to the `mnemo_func` array in
+# `miasm/arch/x86/sem.py:5142`.
+#
+# For XGETBV, I might have to add the extended control register XCR0 first.
+# This might be a nontrivial patch to Miasm.
 
-def run_block(pc: int, conc_state: MiasmConcreteState) -> int | None:
+def run_block(pc: int, conc_state: MiasmConcreteState) \
+        -> tuple[int | None, list]:
     """Run a basic block.
 
     Tries to run IR blocks until the end of an ASM block/basic block is
@@ -127,66 +121,130 @@ def run_block(pc: int, conc_state: MiasmConcreteState) -> int | None:
              found. This happens when an error occurs or when the program
              exits.
     """
+    global disasm_time
+    global symb_exec_time
+
     # Start with a clean, purely symbolic state
     engine = SymbolicExecutionEngine(lifter)
 
+    # A list of symbolic transformation for each single instruction
+    symb_trace = []
+
     while True:
-        symbolic_pc = engine.run_block_at(ircfg, pc)
+        irblock = ircfg.get_block(pc)
+
+        # Initial disassembly might not find all blocks in the binary.
+        # Disassemble code ad-hoc if the current PC has not yet been
+        # disassembled.
+        if irblock is None:
+            disasm_time_start = time.perf_counter_ns()
+            cfg = mdis.dis_multiblock(pc)
+            for irblock in cfg.blocks:
+                lifter.add_asmblock_to_ircfg(irblock, ircfg)
+            disasm_time += time.perf_counter_ns() - disasm_time_start
+            print(f'Disassembled {len(cfg.blocks):4} new blocks at {hex(int(pc))}.')
+
+            irblock = ircfg.get_block(pc)
+            assert(irblock is not None)
+
+        # Execute each instruction in the current basic block and record the
+        # resulting change in program state.
+        symb_exec_time_start = time.perf_counter_ns()
+        for assignblk in irblock:
+            modified = engine.eval_assignblk(assignblk)
+            symb_trace.append((assignblk.instr.offset, modified))
+
+            # Run a single instruction
+            engine.eval_updt_assignblk(assignblk)
+
+        # Obtain the next program counter after the basic block.
+        symbolic_pc = engine.eval_expr(engine.lifter.IRDst)
 
         # The new program counter might be a symbolic value. Try to evaluate
         # it based on the last recorded concrete state at the start of the
         # current basic block.
         pc = eval_expr(symbolic_pc, conc_state)
 
-        # Initial disassembly might not find all blocks in the binary.
-        # Disassemble code ad-hoc if the new PC has not yet been disassembled.
-        if ircfg.get_block(pc) is None:
-            addr = int(pc)
-            cfg = mdis.dis_multiblock(addr)
-            for block in cfg.blocks:
-                lifter.add_asmblock_to_ircfg(block, ircfg)
-            assert(ircfg.get_block(pc) is not None)
-
-            print(f'Disassembled {len(cfg.blocks):4} new blocks at {hex(addr)}'
-                  f' (evaluated from symbolic PC {symbolic_pc}).')
+        symb_exec_time += time.perf_counter_ns() - symb_exec_time_start
 
         # If the resulting PC is an integer, i.e. a concrete address that can
         # be mapped to the assembly code, we return as we have reached the end
         # of a basic block. Otherwise we might have reached the end of an IR
         # block, in which case we keep executing until we reach the end of an
         # ASM block.
+        #
+        # Example: This happens for the REP STOS instruction, for which Miasm
+        # generates multiple IR blocks.
         try:
-            return int(symbolic_pc)
+            return int(pc), symb_trace
         except:
+            # We reach this point when the program counter is an IR block
+            # location (not an integer). That happens when single ASM
+            # instructions are translated to multiple IR instructions.
             pass
 
-last_pc = None  # Debugging info
+symb_trace = [] # The list of generated symbolic transforms per instruction
+
+conc_exec_time_start = time.perf_counter_ns()
+target = LLDBConcreteTarget(binary)
+initial_state = create_state(target)
+conc_exec_time += time.perf_counter_ns() - conc_exec_time_start
+
+if target.read_register('pc') != pc:
+    target.set_breakpoint(pc)
+    target.run()
+    target.remove_breakpoint(pc)
 
 # Run until no more states can be reached
 print(f'Re-tracing symbolically...')
 while pc is not None:
-    def step_trace(trace, pc: int):
-        for i, (addr, _) in enumerate(trace):
-            if addr == pc:
-                return trace[i:]
-        return []
-
-    assert(type(pc) is int)
-
-    # Find next trace point (the concrete trace may have stopped at more
-    # states than the symbolic trace does)
-    conc_trace = step_trace(conc_trace, pc)
-    if not conc_trace:
-        print(f'Next PC {hex(pc)} is not contained in the concrete program'
-              f' trace. Last valid PC: {hex(last_pc)}')
+    assert(target.read_register('pc') == pc)
+
+    # Run symbolic execution
+    # It uses the concrete state to resolve symbolic program counters to
+    # concrete values.
+    pc, strace = run_block(pc, MiasmConcreteState(initial_state, loc_db))
+
+    if pc is None:
         break
-    last_pc = pc
 
-    addr, initial_state = conc_trace[0]
-    assert(addr == pc)
-    conc_trace.pop(0)
+    # Step concrete target forward.
+    #
+    # The concrete target now lags behind the symbolic execution by exactly
+    # one basic block: the one that we just executed. Run the concrete
+    # execution until it reaches the new PC.
+    conc_exec_time_start = time.perf_counter_ns()
+    ctrace = step_until(target, pc)
+    conc_exec_time += time.perf_counter_ns() - conc_exec_time_start
+
+    # Sometimes, miasm generates ghost instructions at the end of basic blocks.
+    # Don't include them in the symbolic trace.
+    strace = strace[:len(ctrace)]
+    symb_trace.extend(strace)
+
+    # Use this for extensive trace debugging
+    if [a for a, _ in strace] != ctrace:
+        print(f'[WARNING] Symbolic trace and concrete trace are not equal!'
+              f'\n    symbolic: {[hex(a) for a, _ in strace]}'
+              f'\n    concrete: {[hex(a) for a in ctrace]}')
+
+    if target.is_exited():
+        print(f'Next PC {hex(pc)} is not contained in the concrete trace.')
+        break
 
-    # Run symbolic execution
-    pc = run_block(pc, initial_state)
+    # Query the new reference state for symbolic execution
+    conc_exec_time_start = time.perf_counter_ns()
+    initial_state = create_state(target)
+    conc_exec_time += time.perf_counter_ns() - conc_exec_time_start
+
+total_time = time.perf_counter_ns() - total_time_start
+other_time = total_time - symb_exec_time - conc_exec_time - disasm_time
 
+print(f'--- {len(symb_trace)} instructions traced.')
 print(f'--- No new PC found. Exiting.')
+print()
+print(f' Total time:              {total_time * 1e-6:10.3f} ms')
+print(f' Disassembly time:        {disasm_time * 1e-6:10.3f} ms')
+print(f' Symbolic execution time: {symb_exec_time * 1e-6:10.3f} ms')
+print(f' Concrete execution time: {conc_exec_time * 1e-6:10.3f} ms')
+print(f' Other:                   {other_time * 1e-6:10.3f} ms')
diff --git a/miasm_util.py b/miasm_util.py
index 31083d9..55dfad0 100644
--- a/miasm_util.py
+++ b/miasm_util.py
@@ -6,6 +6,27 @@ from miasm.expression.simplifications import expr_simp_explicit
 
 from snapshot import ProgramState
 
+def simp_segm(expr_simp, expr: ExprOp):
+    """Simplify a segmentation expression to an addition of the segment
+    register's base value and the address argument.
+    """
+    import miasm.arch.x86.regs as regs
+
+    base_regs = {
+        regs.FS: ExprId('fs_base', 64),
+        regs.GS: ExprId('gs_base', 64),
+    }
+
+    if expr.op == 'segm':
+        segm, addr = expr.args
+        assert(segm == regs.FS or segm == regs.GS)
+        return expr_simp(base_regs[segm] + addr)
+    return expr
+
+# The expression simplifier used in this module
+expr_simp = expr_simp_explicit
+expr_simp.enable_passes({ExprOp: [simp_segm]})
+
 class MiasmConcreteState:
     miasm_flag_aliases = {
         'NF':     'SF',
@@ -49,7 +70,7 @@ def eval_expr(expr: Expr, conc_state: MiasmConcreteState) -> int:
         raise TypeError("Unknown expr type")
 
     ret = visitor(expr, conc_state)
-    ret = expr_simp_explicit(ret)
+    ret = expr_simp(ret)
     assert(ret is not None)
 
     return ret