diff options
| -rw-r--r-- | README.md | 18 | ||||
| -rw-r--r-- | arch/x86.py | 9 | ||||
| -rw-r--r-- | miasm_test.py | 206 | ||||
| -rw-r--r-- | miasm_util.py | 23 |
4 files changed, 178 insertions, 78 deletions
diff --git a/README.md b/README.md index 0397afe..65fe4ce 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,24 @@ This repository contains initial code for comprehensive testing of binary translators. +## Requirements + +We require at least LLDB version 17 for `fs_base`/`gs_base` register support. + +I had to compile LLDB myself; these are the steps I had to take (you also need swig version >= 4): + +``` +git clone https://github.com/llvm/llvm-project <llvm-path> +cd <llvm-path> +cmake -S llvm -B build -DCMAKE_BUILD_TYPE=Release -DLLVM_ENABLE_PROJECTS="clang;lldb" -DLLDB_ENABLE_PYTHON=TRUE -DLLDB_ENABLE_SWIG=TRUE +cmake --build build/ --parallel $(nproc) + +# Add the built LLDB python bindings to your PYTHONPATH: +PYTHONPATH="$PYTHONPATH:$(./build/bin/lldb -P)" +``` + +It will take a while to compile. + ## Snapshot-comparison framework The following files belong to a rough framework for the snapshot comparison engine: diff --git a/arch/x86.py b/arch/x86.py index 25213a0..88e6d1a 100644 --- a/arch/x86.py +++ b/arch/x86.py @@ -24,6 +24,7 @@ regnames = [ 'RFLAGS', # Segment registers 'CS', 'DS', 'SS', 'ES', 'FS', 'GS', + 'FS_BASE', 'GS_BASE', # FLAGS 'CF', 'PF', 'AF', 'ZF', 'SF', 'TF', 'IF', 'DF', 'OF', 'IOPL', 'NT', # EFLAGS @@ -80,6 +81,8 @@ class ArchX86(Arch): Applies certain register name aliases. """ reg = super().to_regname(name) - if reg in regname_aliases: - return regname_aliases[reg] - return reg + if reg is not None: + return reg + + # Apply custom register alias rules + return regname_aliases.get(name.upper(), None) diff --git a/miasm_test.py b/miasm_test.py index 64dc04a..36246af 100644 --- a/miasm_test.py +++ b/miasm_test.py @@ -1,8 +1,10 @@ import sys +import time -from miasm.arch.x86.sem import Lifter_X86_64 -from miasm.analysis.machine import Machine from miasm.analysis.binary import ContainerELF +from miasm.analysis.machine import Machine +from miasm.arch.x86.sem import Lifter_X86_64 +from miasm.core.asmblock import AsmCFG from miasm.core.locationdb import LocationDB from miasm.ir.symbexec import SymbolicExecutionEngine, SymbolicState @@ -25,6 +27,17 @@ def print_state(state: SymbolicState): print(f'{str(reg):10s} = {val}') print('=' * 80) +def step_until(target: LLDBConcreteTarget, addr: int) -> list[int]: + """Step a concrete target to a specific instruction. + :return: Trace of all instructions executed. + """ + trace = [target.read_register('pc')] + target.step() + while not target.is_exited() and target.read_register('pc') != addr: + trace.append(target.read_register('pc')) + target.step() + return trace + def create_state(target: LLDBConcreteTarget) -> ProgramState: def standardize_flag_name(regname: str) -> str: regname = regname.upper() @@ -36,8 +49,7 @@ def create_state(target: LLDBConcreteTarget) -> ProgramState: # Query and store register state rflags = x86.decompose_rflags(target.read_register('rflags')) - for reg in machine.mn.regs.all_regs_ids_no_alias: - regname = reg.name + for regname in x86.regnames: try: conc_val = target.read_register(regname) state.set(regname, conc_val) @@ -61,57 +73,39 @@ def create_state(target: LLDBConcreteTarget) -> ProgramState: return state -def record_concrete_states(binary) -> list[tuple[int, ProgramState]]: - """Record a trace of concrete program states by stepping through an - executable. - """ - addrs = set() - states = [] - target = LLDBConcreteTarget(binary) - while not target.is_exited(): - addrs.add(target.read_register('pc')) - states.append((target.read_register('pc'), create_state(target))) - target.step() - return states +symb_exec_time = 0 +conc_exec_time = 0 +disasm_time = 0 + +total_time_start = time.perf_counter_ns() -binary = 'test_program' +binary = sys.argv[1] loc_db = LocationDB() cont = ContainerELF.from_stream(open(binary, 'rb'), loc_db) machine = Machine(cont.arch) pc = int(cont.entry_point) +if len(sys.argv) > 2: + pc = int(sys.argv[2], 16) -# Disassemble binary -print(f'Disassembling "{binary}"...') +# Create disassembly/lifting context mdis = machine.dis_engine(cont.bin_stream, loc_db=loc_db) mdis.follow_call = True -asmcfg = mdis.dis_multiblock(pc) +asmcfg = AsmCFG(loc_db) -with open('full_disasm', 'w') as file: - print(f'Entry point: {hex(pc)}\n', file=file) - print_blocks(asmcfg, file) -print(f'--- Disassembled "{binary}". Log written to "full_disasm.log".') - -# Lift disassembly to IR -print(f'Lifting disassembly to IR...') lifter: Lifter_X86_64 = machine.lifter(loc_db) ircfg = lifter.new_ircfg_from_asmcfg(asmcfg) -with open('full_ir', 'w') as file: - print('=' * 80, file=file) - for block in ircfg.blocks.values(): - print(block, file=file) - print('-' * 60, file=file) - print('=' * 80, file=file) -print(f'--- Lifted disassembly to IR. Log written to "full_ir.log".') -# Record concrete reference states to guide symbolic execution -print(f'Recording concrete program trace...') -conc_trace = record_concrete_states(binary) -conc_trace = [(a, MiasmConcreteState(s, loc_db)) for a, s in conc_trace] -print(f'Recorded {len(conc_trace)} trace points.') +# TODO: To implement support for unimplemented instructions, add their +# ASM->IR implementations to the `mnemo_func` array in +# `miasm/arch/x86/sem.py:5142`. +# +# For XGETBV, I might have to add the extended control register XCR0 first. +# This might be a nontrivial patch to Miasm. -def run_block(pc: int, conc_state: MiasmConcreteState) -> int | None: +def run_block(pc: int, conc_state: MiasmConcreteState) \ + -> tuple[int | None, list]: """Run a basic block. Tries to run IR blocks until the end of an ASM block/basic block is @@ -127,66 +121,130 @@ def run_block(pc: int, conc_state: MiasmConcreteState) -> int | None: found. This happens when an error occurs or when the program exits. """ + global disasm_time + global symb_exec_time + # Start with a clean, purely symbolic state engine = SymbolicExecutionEngine(lifter) + # A list of symbolic transformation for each single instruction + symb_trace = [] + while True: - symbolic_pc = engine.run_block_at(ircfg, pc) + irblock = ircfg.get_block(pc) + + # Initial disassembly might not find all blocks in the binary. + # Disassemble code ad-hoc if the current PC has not yet been + # disassembled. + if irblock is None: + disasm_time_start = time.perf_counter_ns() + cfg = mdis.dis_multiblock(pc) + for irblock in cfg.blocks: + lifter.add_asmblock_to_ircfg(irblock, ircfg) + disasm_time += time.perf_counter_ns() - disasm_time_start + print(f'Disassembled {len(cfg.blocks):4} new blocks at {hex(int(pc))}.') + + irblock = ircfg.get_block(pc) + assert(irblock is not None) + + # Execute each instruction in the current basic block and record the + # resulting change in program state. + symb_exec_time_start = time.perf_counter_ns() + for assignblk in irblock: + modified = engine.eval_assignblk(assignblk) + symb_trace.append((assignblk.instr.offset, modified)) + + # Run a single instruction + engine.eval_updt_assignblk(assignblk) + + # Obtain the next program counter after the basic block. + symbolic_pc = engine.eval_expr(engine.lifter.IRDst) # The new program counter might be a symbolic value. Try to evaluate # it based on the last recorded concrete state at the start of the # current basic block. pc = eval_expr(symbolic_pc, conc_state) - # Initial disassembly might not find all blocks in the binary. - # Disassemble code ad-hoc if the new PC has not yet been disassembled. - if ircfg.get_block(pc) is None: - addr = int(pc) - cfg = mdis.dis_multiblock(addr) - for block in cfg.blocks: - lifter.add_asmblock_to_ircfg(block, ircfg) - assert(ircfg.get_block(pc) is not None) - - print(f'Disassembled {len(cfg.blocks):4} new blocks at {hex(addr)}' - f' (evaluated from symbolic PC {symbolic_pc}).') + symb_exec_time += time.perf_counter_ns() - symb_exec_time_start # If the resulting PC is an integer, i.e. a concrete address that can # be mapped to the assembly code, we return as we have reached the end # of a basic block. Otherwise we might have reached the end of an IR # block, in which case we keep executing until we reach the end of an # ASM block. + # + # Example: This happens for the REP STOS instruction, for which Miasm + # generates multiple IR blocks. try: - return int(symbolic_pc) + return int(pc), symb_trace except: + # We reach this point when the program counter is an IR block + # location (not an integer). That happens when single ASM + # instructions are translated to multiple IR instructions. pass -last_pc = None # Debugging info +symb_trace = [] # The list of generated symbolic transforms per instruction + +conc_exec_time_start = time.perf_counter_ns() +target = LLDBConcreteTarget(binary) +initial_state = create_state(target) +conc_exec_time += time.perf_counter_ns() - conc_exec_time_start + +if target.read_register('pc') != pc: + target.set_breakpoint(pc) + target.run() + target.remove_breakpoint(pc) # Run until no more states can be reached print(f'Re-tracing symbolically...') while pc is not None: - def step_trace(trace, pc: int): - for i, (addr, _) in enumerate(trace): - if addr == pc: - return trace[i:] - return [] - - assert(type(pc) is int) - - # Find next trace point (the concrete trace may have stopped at more - # states than the symbolic trace does) - conc_trace = step_trace(conc_trace, pc) - if not conc_trace: - print(f'Next PC {hex(pc)} is not contained in the concrete program' - f' trace. Last valid PC: {hex(last_pc)}') + assert(target.read_register('pc') == pc) + + # Run symbolic execution + # It uses the concrete state to resolve symbolic program counters to + # concrete values. + pc, strace = run_block(pc, MiasmConcreteState(initial_state, loc_db)) + + if pc is None: break - last_pc = pc - addr, initial_state = conc_trace[0] - assert(addr == pc) - conc_trace.pop(0) + # Step concrete target forward. + # + # The concrete target now lags behind the symbolic execution by exactly + # one basic block: the one that we just executed. Run the concrete + # execution until it reaches the new PC. + conc_exec_time_start = time.perf_counter_ns() + ctrace = step_until(target, pc) + conc_exec_time += time.perf_counter_ns() - conc_exec_time_start + + # Sometimes, miasm generates ghost instructions at the end of basic blocks. + # Don't include them in the symbolic trace. + strace = strace[:len(ctrace)] + symb_trace.extend(strace) + + # Use this for extensive trace debugging + if [a for a, _ in strace] != ctrace: + print(f'[WARNING] Symbolic trace and concrete trace are not equal!' + f'\n symbolic: {[hex(a) for a, _ in strace]}' + f'\n concrete: {[hex(a) for a in ctrace]}') + + if target.is_exited(): + print(f'Next PC {hex(pc)} is not contained in the concrete trace.') + break - # Run symbolic execution - pc = run_block(pc, initial_state) + # Query the new reference state for symbolic execution + conc_exec_time_start = time.perf_counter_ns() + initial_state = create_state(target) + conc_exec_time += time.perf_counter_ns() - conc_exec_time_start + +total_time = time.perf_counter_ns() - total_time_start +other_time = total_time - symb_exec_time - conc_exec_time - disasm_time +print(f'--- {len(symb_trace)} instructions traced.') print(f'--- No new PC found. Exiting.') +print() +print(f' Total time: {total_time * 1e-6:10.3f} ms') +print(f' Disassembly time: {disasm_time * 1e-6:10.3f} ms') +print(f' Symbolic execution time: {symb_exec_time * 1e-6:10.3f} ms') +print(f' Concrete execution time: {conc_exec_time * 1e-6:10.3f} ms') +print(f' Other: {other_time * 1e-6:10.3f} ms') diff --git a/miasm_util.py b/miasm_util.py index 31083d9..55dfad0 100644 --- a/miasm_util.py +++ b/miasm_util.py @@ -6,6 +6,27 @@ from miasm.expression.simplifications import expr_simp_explicit from snapshot import ProgramState +def simp_segm(expr_simp, expr: ExprOp): + """Simplify a segmentation expression to an addition of the segment + register's base value and the address argument. + """ + import miasm.arch.x86.regs as regs + + base_regs = { + regs.FS: ExprId('fs_base', 64), + regs.GS: ExprId('gs_base', 64), + } + + if expr.op == 'segm': + segm, addr = expr.args + assert(segm == regs.FS or segm == regs.GS) + return expr_simp(base_regs[segm] + addr) + return expr + +# The expression simplifier used in this module +expr_simp = expr_simp_explicit +expr_simp.enable_passes({ExprOp: [simp_segm]}) + class MiasmConcreteState: miasm_flag_aliases = { 'NF': 'SF', @@ -49,7 +70,7 @@ def eval_expr(expr: Expr, conc_state: MiasmConcreteState) -> int: raise TypeError("Unknown expr type") ret = visitor(expr, conc_state) - ret = expr_simp_explicit(ret) + ret = expr_simp(ret) assert(ret is not None) return ret |