diff options
Diffstat (limited to 'miasm_test.py')
| -rw-r--r-- | miasm_test.py | 172 |
1 files changed, 72 insertions, 100 deletions
diff --git a/miasm_test.py b/miasm_test.py index 7ec76a9..64dc04a 100644 --- a/miasm_test.py +++ b/miasm_test.py @@ -1,21 +1,16 @@ import sys -from typing import Any from miasm.arch.x86.sem import Lifter_X86_64 from miasm.analysis.machine import Machine -from miasm.analysis.binary import Container, ContainerELF -from miasm.core.asmblock import disasmEngine, AsmCFG -from miasm.core.interval import interval +from miasm.analysis.binary import ContainerELF from miasm.core.locationdb import LocationDB -from miasm.expression.expression import ExprId, ExprInt, ExprLoc from miasm.ir.symbexec import SymbolicExecutionEngine, SymbolicState -from miasm.ir.ir import IRBlock, AsmBlock -from miasm.analysis.dse import DSEEngine +from arch import x86 from lldb_target import LLDBConcreteTarget, SimConcreteMemoryError, \ SimConcreteRegisterError -from arch import x86 -from miasm_util import MiasmProgramState, eval_expr +from miasm_util import MiasmConcreteState, eval_expr +from snapshot import ProgramState def print_blocks(asmcfg, file=sys.stdout): print('=' * 80, file=file) @@ -30,95 +25,54 @@ def print_state(state: SymbolicState): print(f'{str(reg):10s} = {val}') print('=' * 80) -def flag_names_to_miasm(regs: dict[str, Any]) -> dict: - """Convert standard flag names to Miasm's names. - - :param regs: Modified in-place. - :return: Returns `regs`. - """ - regs['NF'] = regs.pop('SF') - regs['I_F'] = regs.pop('IF') - regs['IOPL_F'] = regs.pop('IOPL') - regs['I_D'] = regs.pop('ID') - return regs +def create_state(target: LLDBConcreteTarget) -> ProgramState: + def standardize_flag_name(regname: str) -> str: + regname = regname.upper() + if regname in MiasmConcreteState.miasm_flag_aliases: + return MiasmConcreteState.miasm_flag_aliases[regname] + return regname -def disasm_elf(addr, mdis: disasmEngine) -> AsmCFG: - """Try to disassemble all contents of an ELF file. - - Based on the full-disassembly algorithm in - `https://github.com/cea-sec/miasm/blob/master/example/disasm/full.py` - (as of commit `a229f4e`). - - :return: An asmcfg. - """ - # Settings for the engine - mdis.follow_call = True - - # Initial run - asmcfg = mdis.dis_multiblock(addr) - - todo = [addr] - done = set() - done_interval = interval() - - while todo: - while todo: - ad = todo.pop(0) - if ad in done: - continue - done.add(ad) - asmcfg = mdis.dis_multiblock(ad, asmcfg) - - for block in asmcfg.blocks: - for l in block.lines: - done_interval += interval([(l.offset, l.offset + l.l)]) - - # Process recursive functions - for block in asmcfg.blocks: - instr = block.get_subcall_instr() - if not instr: - continue - for dest in instr.getdstflow(mdis.loc_db): - if not dest.is_loc(): - continue - offset = mdis.loc_db.get_location_offset(dest.loc_key) - todo.append(offset) - - # Disassemble all: - for _, b in done_interval.intervals: - if b in done: - continue - todo.append(b) - - return asmcfg - -def create_state(target: LLDBConcreteTarget) -> MiasmProgramState: - regs: dict[ExprId, ExprInt] = {} - mem = [] + state = ProgramState(x86.ArchX86()) # Query and store register state - rflags = target.read_register('rflags') - rflags = flag_names_to_miasm(x86.decompose_rflags(rflags)) + rflags = x86.decompose_rflags(target.read_register('rflags')) for reg in machine.mn.regs.all_regs_ids_no_alias: - regname = reg.name.upper() # Make flag names upper case, too + regname = reg.name try: conc_val = target.read_register(regname) - regs[reg] = ExprInt(conc_val, reg.size) + state.set(regname, conc_val) + except KeyError: + pass except SimConcreteRegisterError: + regname = standardize_flag_name(regname) if regname in rflags: - regs[reg] = ExprInt(rflags[regname], reg.size) + state.set(regname, rflags[regname]) # Query and store memory state for mapping in target.get_mappings(): assert(mapping.end_address > mapping.start_address) size = mapping.end_address - mapping.start_address try: - mem_state = target.read_memory(mapping.start_address, size) + data = target.read_memory(mapping.start_address, size) + state.write_memory(mapping.start_address, data) except SimConcreteMemoryError: - mem_state = f'<unable to access "{mapping.name}">' - mem.append((mapping, mem_state)) + # Unable to read memory from mapping + pass + + return state - return MiasmProgramState(regs, mem) +def record_concrete_states(binary) -> list[tuple[int, ProgramState]]: + """Record a trace of concrete program states by stepping through an + executable. + """ + addrs = set() + states = [] + target = LLDBConcreteTarget(binary) + while not target.is_exited(): + addrs.add(target.read_register('pc')) + states.append((target.read_register('pc'), create_state(target))) + target.step() + return states binary = 'test_program' @@ -131,7 +85,9 @@ pc = int(cont.entry_point) # Disassemble binary print(f'Disassembling "{binary}"...') mdis = machine.dis_engine(cont.bin_stream, loc_db=loc_db) -asmcfg = disasm_elf(pc, mdis) +mdis.follow_call = True +asmcfg = mdis.dis_multiblock(pc) + with open('full_disasm', 'w') as file: print(f'Entry point: {hex(pc)}\n', file=file) print_blocks(asmcfg, file) @@ -149,19 +105,13 @@ with open('full_ir', 'w') as file: print('=' * 80, file=file) print(f'--- Lifted disassembly to IR. Log written to "full_ir.log".') -def record_concrete_states(binary): - states = {} - target = LLDBConcreteTarget(binary) - while not target.is_exited(): - states[target.read_register('pc')] = create_state(target) - target.step() - return states - +# Record concrete reference states to guide symbolic execution print(f'Recording concrete program trace...') -conc_states = record_concrete_states(binary) -print(f'Recorded {len(conc_states)} trace points.') +conc_trace = record_concrete_states(binary) +conc_trace = [(a, MiasmConcreteState(s, loc_db)) for a, s in conc_trace] +print(f'Recorded {len(conc_trace)} trace points.') -def run_block(pc: int, conc_state: MiasmProgramState) -> int | None: +def run_block(pc: int, conc_state: MiasmConcreteState) -> int | None: """Run a basic block. Tries to run IR blocks until the end of an ASM block/basic block is @@ -186,11 +136,19 @@ def run_block(pc: int, conc_state: MiasmProgramState) -> int | None: # The new program counter might be a symbolic value. Try to evaluate # it based on the last recorded concrete state at the start of the # current basic block. - pc = eval_expr(symbolic_pc, conc_state, loc_db) + pc = eval_expr(symbolic_pc, conc_state) + + # Initial disassembly might not find all blocks in the binary. + # Disassemble code ad-hoc if the new PC has not yet been disassembled. if ircfg.get_block(pc) is None: - print(f'Unable to access IR block at PC {pc}' - f' (evaluated from the expression PC = {symbolic_pc}).') - return None + addr = int(pc) + cfg = mdis.dis_multiblock(addr) + for block in cfg.blocks: + lifter.add_asmblock_to_ircfg(block, ircfg) + assert(ircfg.get_block(pc) is not None) + + print(f'Disassembled {len(cfg.blocks):4} new blocks at {hex(addr)}' + f' (evaluated from symbolic PC {symbolic_pc}).') # If the resulting PC is an integer, i.e. a concrete address that can # be mapped to the assembly code, we return as we have reached the end @@ -207,14 +165,28 @@ last_pc = None # Debugging info # Run until no more states can be reached print(f'Re-tracing symbolically...') while pc is not None: + def step_trace(trace, pc: int): + for i, (addr, _) in enumerate(trace): + if addr == pc: + return trace[i:] + return [] + assert(type(pc) is int) - if pc not in conc_states: + + # Find next trace point (the concrete trace may have stopped at more + # states than the symbolic trace does) + conc_trace = step_trace(conc_trace, pc) + if not conc_trace: print(f'Next PC {hex(pc)} is not contained in the concrete program' f' trace. Last valid PC: {hex(last_pc)}') break last_pc = pc - initial_state = conc_states[pc] + addr, initial_state = conc_trace[0] + assert(addr == pc) + conc_trace.pop(0) + + # Run symbolic execution pc = run_block(pc, initial_state) print(f'--- No new PC found. Exiting.') |