about summary refs log tree commit diff stats
path: root/miasm_test.py
diff options
context:
space:
mode:
author Theofilos Augoustis <theofilos.augoustis@gmail.com> 2023-11-27 14:08:55 +0100
committer Theofilos Augoustis <theofilos.augoustis@gmail.com> 2023-11-27 14:08:55 +0100
commit 836e42215fda0cbd330caef2dc5fc93336d4722c (patch)
tree 0b9ce5cca67c511b74b9ae91a8fda2fc0a35e65c /miasm_test.py
parent 5d51b4fe0bb41bc9e86c5775de35a9aef023fec5 (diff)
download focaccia-836e42215fda0cbd330caef2dc5fc93336d4722c.tar.gz
download focaccia-836e42215fda0cbd330caef2dc5fc93336d4722c.zip
Add memory storage capabilities to `ProgramState`
The `SparseMemory` class represents a program's memory. While the user
can read from and write to arbitrary memory addresses, it manages its
memory in pages/chunks internally. This is a tradeoff between space
consumption (this solution might have a memory overhead) and lookup
speed of individual memory addresses.

Add two small unit tests for `SparseMemory`.
Diffstat (limited to 'miasm_test.py')
-rw-r--r-- miasm_test.py 172
1 file changed, 72 insertions, 100 deletions
diff --git a/miasm_test.py b/miasm_test.py
index 7ec76a9..64dc04a 100644
--- a/miasm_test.py
+++ b/miasm_test.py
@@ -1,21 +1,16 @@
 import sys
-from typing import Any
 
 from miasm.arch.x86.sem import Lifter_X86_64
 from miasm.analysis.machine import Machine
-from miasm.analysis.binary import Container, ContainerELF
-from miasm.core.asmblock import disasmEngine, AsmCFG
-from miasm.core.interval import interval
+from miasm.analysis.binary import ContainerELF
 from miasm.core.locationdb import LocationDB
-from miasm.expression.expression import ExprId, ExprInt, ExprLoc
 from miasm.ir.symbexec import SymbolicExecutionEngine, SymbolicState
-from miasm.ir.ir import IRBlock, AsmBlock
-from miasm.analysis.dse import DSEEngine
 
+from arch import x86
 from lldb_target import LLDBConcreteTarget, SimConcreteMemoryError, \
                         SimConcreteRegisterError
-from arch import x86
-from miasm_util import MiasmProgramState, eval_expr
+from miasm_util import MiasmConcreteState, eval_expr
+from snapshot import ProgramState
 
 def print_blocks(asmcfg, file=sys.stdout):
     print('=' * 80, file=file)
@@ -30,95 +25,54 @@ def print_state(state: SymbolicState):
         print(f'{str(reg):10s} = {val}')
     print('=' * 80)
 
-def flag_names_to_miasm(regs: dict[str, Any]) -> dict:
-    """Convert standard flag names to Miasm's names.
-
-    :param regs: Modified in-place.
-    :return: Returns `regs`.
-    """
-    regs['NF']     = regs.pop('SF')
-    regs['I_F']    = regs.pop('IF')
-    regs['IOPL_F'] = regs.pop('IOPL')
-    regs['I_D']    = regs.pop('ID')
-    return regs
+def create_state(target: LLDBConcreteTarget) -> ProgramState:
+    def standardize_flag_name(regname: str) -> str:
+        regname = regname.upper()
+        if regname in MiasmConcreteState.miasm_flag_aliases:
+            return MiasmConcreteState.miasm_flag_aliases[regname]
+        return regname
 
-def disasm_elf(addr, mdis: disasmEngine) -> AsmCFG:
-    """Try to disassemble all contents of an ELF file.
-
-    Based on the full-disassembly algorithm in
-    `https://github.com/cea-sec/miasm/blob/master/example/disasm/full.py`
-    (as of commit `a229f4e`).
-
-    :return: An asmcfg.
-    """
-    # Settings for the engine
-    mdis.follow_call = True
-
-    # Initial run
-    asmcfg = mdis.dis_multiblock(addr)
-
-    todo = [addr]
-    done = set()
-    done_interval = interval()
-
-    while todo:
-        while todo:
-            ad = todo.pop(0)
-            if ad in done:
-                continue
-            done.add(ad)
-            asmcfg = mdis.dis_multiblock(ad, asmcfg)
-
-            for block in asmcfg.blocks:
-                for l in block.lines:
-                    done_interval += interval([(l.offset, l.offset + l.l)])
-
-            # Process recursive functions
-            for block in asmcfg.blocks:
-                instr = block.get_subcall_instr()
-                if not instr:
-                    continue
-                for dest in instr.getdstflow(mdis.loc_db):
-                    if not dest.is_loc():
-                        continue
-                    offset = mdis.loc_db.get_location_offset(dest.loc_key)
-                    todo.append(offset)
-
-        # Disassemble all:
-        for _, b in done_interval.intervals:
-            if b in done:
-                continue
-            todo.append(b)
-
-    return asmcfg
-
-def create_state(target: LLDBConcreteTarget) -> MiasmProgramState:
-    regs: dict[ExprId, ExprInt] = {}
-    mem = []
+    state = ProgramState(x86.ArchX86())
 
     # Query and store register state
-    rflags = target.read_register('rflags')
-    rflags = flag_names_to_miasm(x86.decompose_rflags(rflags))
+    rflags = x86.decompose_rflags(target.read_register('rflags'))
     for reg in machine.mn.regs.all_regs_ids_no_alias:
-        regname = reg.name.upper()  # Make flag names upper case, too
+        regname = reg.name
         try:
             conc_val = target.read_register(regname)
-            regs[reg] = ExprInt(conc_val, reg.size)
+            state.set(regname, conc_val)
+        except KeyError:
+            pass
         except SimConcreteRegisterError:
+            regname = standardize_flag_name(regname)
             if regname in rflags:
-                regs[reg] = ExprInt(rflags[regname], reg.size)
+                state.set(regname, rflags[regname])
 
     # Query and store memory state
     for mapping in target.get_mappings():
         assert(mapping.end_address > mapping.start_address)
         size = mapping.end_address - mapping.start_address
         try:
-            mem_state = target.read_memory(mapping.start_address, size)
+            data = target.read_memory(mapping.start_address, size)
+            state.write_memory(mapping.start_address, data)
         except SimConcreteMemoryError:
-            mem_state = f'<unable to access "{mapping.name}">'
-        mem.append((mapping, mem_state))
+            # Unable to read memory from mapping
+            pass
+
+    return state
 
-    return MiasmProgramState(regs, mem)
+def record_concrete_states(binary) -> list[tuple[int, ProgramState]]:
+    """Record a trace of concrete program states by stepping through an
+    executable.
+    """
+    addrs = set()
+    states = []
+    target = LLDBConcreteTarget(binary)
+    while not target.is_exited():
+        addrs.add(target.read_register('pc'))
+        states.append((target.read_register('pc'), create_state(target)))
+        target.step()
+    return states
 
 binary = 'test_program'
 
@@ -131,7 +85,9 @@ pc = int(cont.entry_point)
 # Disassemble binary
 print(f'Disassembling "{binary}"...')
 mdis = machine.dis_engine(cont.bin_stream, loc_db=loc_db)
-asmcfg = disasm_elf(pc, mdis)
+mdis.follow_call = True
+asmcfg = mdis.dis_multiblock(pc)
+
 with open('full_disasm', 'w') as file:
     print(f'Entry point: {hex(pc)}\n', file=file)
     print_blocks(asmcfg, file)
@@ -149,19 +105,13 @@ with open('full_ir', 'w') as file:
     print('=' * 80, file=file)
 print(f'--- Lifted disassembly to IR. Log written to "full_ir.log".')
 
-def record_concrete_states(binary):
-    states = {}
-    target = LLDBConcreteTarget(binary)
-    while not target.is_exited():
-        states[target.read_register('pc')] = create_state(target)
-        target.step()
-    return states
-
+# Record concrete reference states to guide symbolic execution
 print(f'Recording concrete program trace...')
-conc_states = record_concrete_states(binary)
-print(f'Recorded {len(conc_states)} trace points.')
+conc_trace = record_concrete_states(binary)
+conc_trace = [(a, MiasmConcreteState(s, loc_db)) for a, s in conc_trace]
+print(f'Recorded {len(conc_trace)} trace points.')
 
-def run_block(pc: int, conc_state: MiasmProgramState) -> int | None:
+def run_block(pc: int, conc_state: MiasmConcreteState) -> int | None:
     """Run a basic block.
 
     Tries to run IR blocks until the end of an ASM block/basic block is
@@ -186,11 +136,19 @@ def run_block(pc: int, conc_state: MiasmProgramState) -> int | None:
         # The new program counter might be a symbolic value. Try to evaluate
         # it based on the last recorded concrete state at the start of the
         # current basic block.
-        pc = eval_expr(symbolic_pc, conc_state, loc_db)
+        pc = eval_expr(symbolic_pc, conc_state)
+
+        # Initial disassembly might not find all blocks in the binary.
+        # Disassemble code ad-hoc if the new PC has not yet been disassembled.
         if ircfg.get_block(pc) is None:
-            print(f'Unable to access IR block at PC {pc}'
-                  f' (evaluated from the expression PC = {symbolic_pc}).')
-            return None
+            addr = int(pc)
+            cfg = mdis.dis_multiblock(addr)
+            for block in cfg.blocks:
+                lifter.add_asmblock_to_ircfg(block, ircfg)
+            assert(ircfg.get_block(pc) is not None)
+
+            print(f'Disassembled {len(cfg.blocks):4} new blocks at {hex(addr)}'
+                  f' (evaluated from symbolic PC {symbolic_pc}).')
 
         # If the resulting PC is an integer, i.e. a concrete address that can
         # be mapped to the assembly code, we return as we have reached the end
@@ -207,14 +165,28 @@ last_pc = None  # Debugging info
 # Run until no more states can be reached
 print(f'Re-tracing symbolically...')
 while pc is not None:
+    def step_trace(trace, pc: int):
+        for i, (addr, _) in enumerate(trace):
+            if addr == pc:
+                return trace[i:]
+        return []
+
     assert(type(pc) is int)
-    if pc not in conc_states:
+
+    # Find next trace point (the concrete trace may have stopped at more
+    # states than the symbolic trace does)
+    conc_trace = step_trace(conc_trace, pc)
+    if not conc_trace:
         print(f'Next PC {hex(pc)} is not contained in the concrete program'
               f' trace. Last valid PC: {hex(last_pc)}')
         break
     last_pc = pc
 
-    initial_state = conc_states[pc]
+    addr, initial_state = conc_trace[0]
+    assert(addr == pc)
+    conc_trace.pop(0)
+
+    # Run symbolic execution
     pc = run_block(pc, initial_state)
 
 print(f'--- No new PC found. Exiting.')