diff options
| author | Theofilos Augoustis <theofilos.augoustis@gmail.com> | 2023-11-28 15:47:47 +0100 |
|---|---|---|
| committer | Theofilos Augoustis <theofilos.augoustis@gmail.com> | 2023-11-28 15:47:47 +0100 |
| commit | ffcae80c2167f271a7d733d424fbd72db8c98a93 (patch) | |
| tree | b4d72a5b7522ffe7e10f5cf625be93347cab419f /miasm_test.py | |
| parent | 836e42215fda0cbd330caef2dc5fc93336d4722c (diff) | |
| download | focaccia-ffcae80c2167f271a7d733d424fbd72db8c98a93.tar.gz focaccia-ffcae80c2167f271a7d733d424fbd72db8c98a93.zip | |
Record symbolic transform for single instructions
Step manually through single instructions instead of full basic blocks. Record the transformation performed by each instruction as symbolic equations. Co-authored-by: Theofilos Augoustis <theofilos.augoustis@gmail.com> Co-authored-by: Nicola Crivellin <nicola.crivellin98@gmail.com>
Diffstat (limited to 'miasm_test.py')
| -rw-r--r-- | miasm_test.py | 206 |
1 files changed, 132 insertions, 74 deletions
diff --git a/miasm_test.py b/miasm_test.py index 64dc04a..36246af 100644 --- a/miasm_test.py +++ b/miasm_test.py @@ -1,8 +1,10 @@ import sys +import time -from miasm.arch.x86.sem import Lifter_X86_64 -from miasm.analysis.machine import Machine from miasm.analysis.binary import ContainerELF +from miasm.analysis.machine import Machine +from miasm.arch.x86.sem import Lifter_X86_64 +from miasm.core.asmblock import AsmCFG from miasm.core.locationdb import LocationDB from miasm.ir.symbexec import SymbolicExecutionEngine, SymbolicState @@ -25,6 +27,17 @@ def print_state(state: SymbolicState): print(f'{str(reg):10s} = {val}') print('=' * 80) +def step_until(target: LLDBConcreteTarget, addr: int) -> list[int]: + """Step a concrete target to a specific instruction. + :return: Trace of all instructions executed. + """ + trace = [target.read_register('pc')] + target.step() + while not target.is_exited() and target.read_register('pc') != addr: + trace.append(target.read_register('pc')) + target.step() + return trace + def create_state(target: LLDBConcreteTarget) -> ProgramState: def standardize_flag_name(regname: str) -> str: regname = regname.upper() @@ -36,8 +49,7 @@ def create_state(target: LLDBConcreteTarget) -> ProgramState: # Query and store register state rflags = x86.decompose_rflags(target.read_register('rflags')) - for reg in machine.mn.regs.all_regs_ids_no_alias: - regname = reg.name + for regname in x86.regnames: try: conc_val = target.read_register(regname) state.set(regname, conc_val) @@ -61,57 +73,39 @@ def create_state(target: LLDBConcreteTarget) -> ProgramState: return state -def record_concrete_states(binary) -> list[tuple[int, ProgramState]]: - """Record a trace of concrete program states by stepping through an - executable. - """ - addrs = set() - states = [] - target = LLDBConcreteTarget(binary) - while not target.is_exited(): - addrs.add(target.read_register('pc')) - states.append((target.read_register('pc'), create_state(target))) - target.step() - return states +symb_exec_time = 0 +conc_exec_time = 0 +disasm_time = 0 + +total_time_start = time.perf_counter_ns() -binary = 'test_program' +binary = sys.argv[1] loc_db = LocationDB() cont = ContainerELF.from_stream(open(binary, 'rb'), loc_db) machine = Machine(cont.arch) pc = int(cont.entry_point) +if len(sys.argv) > 2: + pc = int(sys.argv[2], 16) -# Disassemble binary -print(f'Disassembling "{binary}"...') +# Create disassembly/lifting context mdis = machine.dis_engine(cont.bin_stream, loc_db=loc_db) mdis.follow_call = True -asmcfg = mdis.dis_multiblock(pc) +asmcfg = AsmCFG(loc_db) -with open('full_disasm', 'w') as file: - print(f'Entry point: {hex(pc)}\n', file=file) - print_blocks(asmcfg, file) -print(f'--- Disassembled "{binary}". Log written to "full_disasm.log".') - -# Lift disassembly to IR -print(f'Lifting disassembly to IR...') lifter: Lifter_X86_64 = machine.lifter(loc_db) ircfg = lifter.new_ircfg_from_asmcfg(asmcfg) -with open('full_ir', 'w') as file: - print('=' * 80, file=file) - for block in ircfg.blocks.values(): - print(block, file=file) - print('-' * 60, file=file) - print('=' * 80, file=file) -print(f'--- Lifted disassembly to IR. Log written to "full_ir.log".') -# Record concrete reference states to guide symbolic execution -print(f'Recording concrete program trace...') -conc_trace = record_concrete_states(binary) -conc_trace = [(a, MiasmConcreteState(s, loc_db)) for a, s in conc_trace] -print(f'Recorded {len(conc_trace)} trace points.') +# TODO: To implement support for unimplemented instructions, add their +# ASM->IR implementations to the `mnemo_func` array in +# `miasm/arch/x86/sem.py:5142`. +# +# For XGETBV, I might have to add the extended control register XCR0 first. +# This might be a nontrivial patch to Miasm. -def run_block(pc: int, conc_state: MiasmConcreteState) -> int | None: +def run_block(pc: int, conc_state: MiasmConcreteState) \ + -> tuple[int | None, list]: """Run a basic block. Tries to run IR blocks until the end of an ASM block/basic block is @@ -127,66 +121,130 @@ def run_block(pc: int, conc_state: MiasmConcreteState) -> int | None: found. This happens when an error occurs or when the program exits. """ + global disasm_time + global symb_exec_time + # Start with a clean, purely symbolic state engine = SymbolicExecutionEngine(lifter) + # A list of symbolic transformation for each single instruction + symb_trace = [] + while True: - symbolic_pc = engine.run_block_at(ircfg, pc) + irblock = ircfg.get_block(pc) + + # Initial disassembly might not find all blocks in the binary. + # Disassemble code ad-hoc if the current PC has not yet been + # disassembled. + if irblock is None: + disasm_time_start = time.perf_counter_ns() + cfg = mdis.dis_multiblock(pc) + for irblock in cfg.blocks: + lifter.add_asmblock_to_ircfg(irblock, ircfg) + disasm_time += time.perf_counter_ns() - disasm_time_start + print(f'Disassembled {len(cfg.blocks):4} new blocks at {hex(int(pc))}.') + + irblock = ircfg.get_block(pc) + assert(irblock is not None) + + # Execute each instruction in the current basic block and record the + # resulting change in program state. + symb_exec_time_start = time.perf_counter_ns() + for assignblk in irblock: + modified = engine.eval_assignblk(assignblk) + symb_trace.append((assignblk.instr.offset, modified)) + + # Run a single instruction + engine.eval_updt_assignblk(assignblk) + + # Obtain the next program counter after the basic block. + symbolic_pc = engine.eval_expr(engine.lifter.IRDst) # The new program counter might be a symbolic value. Try to evaluate # it based on the last recorded concrete state at the start of the # current basic block. pc = eval_expr(symbolic_pc, conc_state) - # Initial disassembly might not find all blocks in the binary. - # Disassemble code ad-hoc if the new PC has not yet been disassembled. - if ircfg.get_block(pc) is None: - addr = int(pc) - cfg = mdis.dis_multiblock(addr) - for block in cfg.blocks: - lifter.add_asmblock_to_ircfg(block, ircfg) - assert(ircfg.get_block(pc) is not None) - - print(f'Disassembled {len(cfg.blocks):4} new blocks at {hex(addr)}' - f' (evaluated from symbolic PC {symbolic_pc}).') + symb_exec_time += time.perf_counter_ns() - symb_exec_time_start # If the resulting PC is an integer, i.e. a concrete address that can # be mapped to the assembly code, we return as we have reached the end # of a basic block. Otherwise we might have reached the end of an IR # block, in which case we keep executing until we reach the end of an # ASM block. + # + # Example: This happens for the REP STOS instruction, for which Miasm + # generates multiple IR blocks. try: - return int(symbolic_pc) + return int(pc), symb_trace except: + # We reach this point when the program counter is an IR block + # location (not an integer). That happens when single ASM + # instructions are translated to multiple IR instructions. pass -last_pc = None # Debugging info +symb_trace = [] # The list of generated symbolic transforms per instruction + +conc_exec_time_start = time.perf_counter_ns() +target = LLDBConcreteTarget(binary) +initial_state = create_state(target) +conc_exec_time += time.perf_counter_ns() - conc_exec_time_start + +if target.read_register('pc') != pc: + target.set_breakpoint(pc) + target.run() + target.remove_breakpoint(pc) # Run until no more states can be reached print(f'Re-tracing symbolically...') while pc is not None: - def step_trace(trace, pc: int): - for i, (addr, _) in enumerate(trace): - if addr == pc: - return trace[i:] - return [] - - assert(type(pc) is int) - - # Find next trace point (the concrete trace may have stopped at more - # states than the symbolic trace does) - conc_trace = step_trace(conc_trace, pc) - if not conc_trace: - print(f'Next PC {hex(pc)} is not contained in the concrete program' - f' trace. Last valid PC: {hex(last_pc)}') + assert(target.read_register('pc') == pc) + + # Run symbolic execution + # It uses the concrete state to resolve symbolic program counters to + # concrete values. + pc, strace = run_block(pc, MiasmConcreteState(initial_state, loc_db)) + + if pc is None: break - last_pc = pc - addr, initial_state = conc_trace[0] - assert(addr == pc) - conc_trace.pop(0) + # Step concrete target forward. + # + # The concrete target now lags behind the symbolic execution by exactly + # one basic block: the one that we just executed. Run the concrete + # execution until it reaches the new PC. + conc_exec_time_start = time.perf_counter_ns() + ctrace = step_until(target, pc) + conc_exec_time += time.perf_counter_ns() - conc_exec_time_start + + # Sometimes, miasm generates ghost instructions at the end of basic blocks. + # Don't include them in the symbolic trace. + strace = strace[:len(ctrace)] + symb_trace.extend(strace) + + # Use this for extensive trace debugging + if [a for a, _ in strace] != ctrace: + print(f'[WARNING] Symbolic trace and concrete trace are not equal!' + f'\n symbolic: {[hex(a) for a, _ in strace]}' + f'\n concrete: {[hex(a) for a in ctrace]}') + + if target.is_exited(): + print(f'Next PC {hex(pc)} is not contained in the concrete trace.') + break - # Run symbolic execution - pc = run_block(pc, initial_state) + # Query the new reference state for symbolic execution + conc_exec_time_start = time.perf_counter_ns() + initial_state = create_state(target) + conc_exec_time += time.perf_counter_ns() - conc_exec_time_start + +total_time = time.perf_counter_ns() - total_time_start +other_time = total_time - symb_exec_time - conc_exec_time - disasm_time +print(f'--- {len(symb_trace)} instructions traced.') print(f'--- No new PC found. Exiting.') +print() +print(f' Total time: {total_time * 1e-6:10.3f} ms') +print(f' Disassembly time: {disasm_time * 1e-6:10.3f} ms') +print(f' Symbolic execution time: {symb_exec_time * 1e-6:10.3f} ms') +print(f' Concrete execution time: {conc_exec_time * 1e-6:10.3f} ms') +print(f' Other: {other_time * 1e-6:10.3f} ms') |