From 605e12fc5cf0fb64e45f68378390a09aa28df2f9 Mon Sep 17 00:00:00 2001 From: Theofilos Augoustis Date: Wed, 17 Jan 2024 17:08:48 +0100 Subject: Refactor symbolic transformation handling --- tools/capture_transforms.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 tools/capture_transforms.py (limited to 'tools/capture_transforms.py') diff --git a/tools/capture_transforms.py b/tools/capture_transforms.py new file mode 100644 index 0000000..de35d86 --- /dev/null +++ b/tools/capture_transforms.py @@ -0,0 +1,27 @@ +#!/usr/bin/env python + +import argparse +import logging + +from focaccia import parser +from focaccia.symbolic import collect_symbolic_trace + +def main(): + prog = argparse.ArgumentParser() + prog.description = 'Trace an executable concolically to capture symbolic' \ + ' transformations among instructions.' + prog.add_argument('binary', help='The program to analyse.') + prog.add_argument('args', action='store', nargs=argparse.REMAINDER, + help='Arguments to the program.') + prog.add_argument('-o', '--output', + default='trace.out', + help='Name of output file. (default: trace.out)') + args = prog.parse_args() + + logging.disable(logging.CRITICAL) + trace = collect_symbolic_trace(args.binary, args.args, None) + with open(args.output, 'w') as file: + parser.serialize_transformations(trace, file) + +if __name__ == "__main__": + main() -- cgit 1.4.1 From a24c5f12d6d909898472f7208cbe16b086a001c9 Mon Sep 17 00:00:00 2001 From: Theofilos Augoustis Date: Fri, 2 Feb 2024 17:54:41 +0100 Subject: Basic reproducer generator setup Co-authored-by: Alp Berkman Co-authored-by: Theofilos Augoustis --- .gitignore | 4 ++ focaccia.py | 54 +++++++++++--- focaccia/compare.py | 3 +- focaccia/lldb_target.py | 38 +++++++--- focaccia/parser.py | 1 + focaccia/reproducer.py | 172 ++++++++++++++++++++++++++++++++++++++++++++ tools/capture_transforms.py | 2 +- tools/convert.py | 2 + tools/verify_qemu.py | 2 + 9 files changed, 257 insertions(+), 21 deletions(-) create mode 100644 focaccia/reproducer.py mode change 100644 => 100755 tools/capture_transforms.py mode change 100644 => 100755 tools/convert.py mode change 100644 => 100755 tools/verify_qemu.py (limited to 'tools/capture_transforms.py') diff --git a/.gitignore b/.gitignore index 39b5bdb..ea2880a 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,7 @@ __pycache__/ # Dev environment .gdbinit + +# Focaccia files +qemu.sym +qemu.trace diff --git a/focaccia.py b/focaccia.py index bf5a9ff..0637690 100755 --- a/focaccia.py +++ b/focaccia.py @@ -1,17 +1,21 @@ -#! /usr/bin/env python3 +#!/usr/bin/env python3 import argparse import platform -from typing import Iterable +from typing import Iterable, Tuple from focaccia.arch import supported_architectures from focaccia.compare import compare_simple, compare_symbolic, ErrorTypes from focaccia.lldb_target import LLDBConcreteTarget -from focaccia.match import fold_traces -from focaccia.parser import parse_arancini +from focaccia.match import fold_traces, match_traces +from focaccia.parser import parse_arancini, parse_snapshots from focaccia.snapshot import ProgramState from focaccia.symbolic import collect_symbolic_trace from focaccia.utils import print_result +from focaccia.reproducer import Reproducer +from focaccia.compare import ErrorSeverity + + verbosity = { 'info': ErrorTypes.INFO, @@ -20,7 +24,7 @@ verbosity = { } def collect_concrete_trace(oracle_program: str, breakpoints: Iterable[int]) \ - -> list[ProgramState]: + -> Tuple[list[ProgramState], list]: """Gather snapshots from a native execution via an external debugger. :param oracle_program: Program to execute. @@ -37,11 +41,13 @@ def collect_concrete_trace(oracle_program: str, breakpoints: Iterable[int]) \ # Execute the native program snapshots = [] + basic_blocks = [] while not target.is_exited(): snapshots.append(target.record_snapshot()) + basic_blocks.append(target.get_next_basic_block()) target.run() - return snapshots + return snapshots, basic_blocks def parse_arguments(): parser = argparse.ArgumentParser(description='Comparator for emulator logs to reference') @@ -77,9 +83,32 @@ def parse_arguments(): ' may as well stem from incomplete input data.' ' \'info\' will report absolutely everything.' ' [Default: warning]') + parser.add_argument('-r', '--reproducer', + action='store_true', + default=False, + help='Enable reproducer to get assembly code' + ' which should replicate the first error.') + parser.add_argument('--trace-type', + type=str, + default='qemu', + choices=['qemu', 'arancini'], + help='Trace type of the emulator.' + ' Currently only Qemu and Arancini traces are accepted.' + ' Use \'qemu\' for Qemu and \'arancini\' for Arancini.' + ' [Default: qemu]') args = parser.parse_args() return args +def print_reproducer(result, min_severity: ErrorSeverity, oracle, oracle_args): + for res in result: + errs = [e for e in res['errors'] if e.severity >= min_severity] + #breakpoint() + if errs: + rep = Reproducer(oracle, oracle_args, res['snap'], res['ref']) + print(rep.asm()) + return + + def main(): args = parse_arguments() @@ -97,13 +126,19 @@ def main(): # Parse reference trace with open(txl_path, "r") as txl_file: - test_states = parse_arancini(txl_file, arch) + if args.trace_type == 'qemu': + test_states = parse_snapshots(txl_file) + elif args.trace_type == 'arancini': + test_states = parse_arancini(txl_file, arch) + else: + test_states = parse_snapshots(txl_file) # Compare reference trace to a truth if args.symbolic: print(f'Tracing {oracle} symbolically with arguments {oracle_args}...') transforms = collect_symbolic_trace(oracle, oracle_args) - fold_traces(test_states, transforms) + test_states, transforms = match_traces(test_states, transforms) + #fold_traces(test_states, transforms) result = compare_symbolic(test_states, transforms) else: # Record truth states from a concrete execution of the oracle @@ -113,5 +148,8 @@ def main(): print_result(result, verbosity[args.error_level]) + if args.reproducer: + print_reproducer(result, verbosity[args.error_level], oracle, oracle_args) + if __name__ == '__main__': main() diff --git a/focaccia/compare.py b/focaccia/compare.py index 43a0133..65c0f49 100644 --- a/focaccia/compare.py +++ b/focaccia/compare.py @@ -316,7 +316,8 @@ def compare_symbolic(test_states: Iterable[ProgramState], 'pc': pc_cur, 'txl': _calc_transformation(cur_state, next_state), 'ref': transform, - 'errors': errors + 'errors': errors, + 'snap': cur_state, }) # Step forward diff --git a/focaccia/lldb_target.py b/focaccia/lldb_target.py index 903e73d..b51ec3d 100644 --- a/focaccia/lldb_target.py +++ b/focaccia/lldb_target.py @@ -77,15 +77,14 @@ class LLDBConcreteTarget: """Step forward by a single instruction.""" thread: lldb.SBThread = self.process.GetThreadAtIndex(0) thread.StepInstruction(False) - + def run_until(self, address: int) -> None: """Continue execution until the address is arrived, ignores other breakpoints""" bp = self.target.BreakpointCreateByAddress(address) while self.read_register("pc") != address: - self.target.run() + self.run() self.target.BreakpointDelete(bp.GetID()) - def record_snapshot(self) -> ProgramState: """Record the concrete target's state in a ProgramState object.""" # Determine current arch @@ -219,8 +218,8 @@ class LLDBConcreteTarget: command = f'breakpoint delete {addr}' result = lldb.SBCommandReturnObject() self.interpreter.HandleCommand(command, result) - - def get_basic_block(self, addr: int) -> [lldb.SBInstruction]: + + def get_basic_block(self, addr: int) -> list[lldb.SBInstruction]: """Returns a basic block pointed by addr a code section is considered a basic block only if the last instruction is a brach, e.g. JUMP, CALL, RET @@ -232,12 +231,29 @@ class LLDBConcreteTarget: block.append(self.target.ReadInstructions(lldb.SBAddress(addr, self.target), 1)[0]) return block - + + def get_basic_block_inst(self, addr: int) -> list[str]: + inst = [] + for bb in self.get_basic_block(addr): + inst.append(f'{bb.GetMnemonic(self.target)} {bb.GetOperands(self.target)}') + return inst + + def get_next_basic_block(self) -> list[lldb.SBInstruction]: + return self.get_basic_block(self.read_register("pc")) + def get_symbol(self, addr: int) -> lldb.SBSymbol: - """Returns the symbol that belongs to the addr""" - for s in self.target.module.symbols: - if (s.GetType() == lldb.eSymbolTypeCode - and s.GetStartAddress().GetLoadAddress(self.target) <= addr - and addr < s.GetEndAddress().GetLoadAddress(self.target)): + """Returns the symbol that belongs to the addr + """ + for s in self.module.symbols: + if (s.GetType() == lldb.eSymbolTypeCode and s.GetStartAddress().GetLoadAddress(self.target) <= addr < s.GetEndAddress().GetLoadAddress(self.target)): return s raise ConcreteSectionError(f'Error getting the symbol to which address {hex(addr)} belongs to') + + def get_symbol_limit(self) -> int: + """Returns the address after all the symbols""" + addr = 0 + for s in self.module.symbols: + if s.GetStartAddress().IsValid(): + if s.GetStartAddress().GetLoadAddress(self.target) > addr: + addr = s.GetEndAddress().GetLoadAddress(self.target) + return addr diff --git a/focaccia/parser.py b/focaccia/parser.py index a5a1014..9fb83d8 100644 --- a/focaccia/parser.py +++ b/focaccia/parser.py @@ -4,6 +4,7 @@ import base64 import json import re from typing import TextIO +import lldb from .arch import supported_architectures, Arch from .snapshot import ProgramState diff --git a/focaccia/reproducer.py b/focaccia/reproducer.py new file mode 100644 index 0000000..90e1378 --- /dev/null +++ b/focaccia/reproducer.py @@ -0,0 +1,172 @@ + +from .lldb_target import LLDBConcreteTarget +from .snapshot import ProgramState +from .symbolic import SymbolicTransform, eval_symbol +from .arch import x86 + +class ReproducerMemoryError(Exception): + pass +class ReproducerBasicBlockError(Exception): + pass +class ReproducerRegisterError(Exception): + pass + +class Reproducer(): + def __init__(self, oracle: str, argv: str, snap: ProgramState, sym: SymbolicTransform) -> None: + + target = LLDBConcreteTarget(oracle) + + self.pc = snap.read_register("pc") + self.bb = target.get_basic_block_inst(self.pc) + self.sl = target.get_symbol_limit() + self.snap = snap + self.sym = sym + + def get_bb(self) -> str: + try: + asm = "" + asm += f'_bb_{hex(self.pc)}:\n' + for i in self.bb[:-1]: + asm += f'{i}\n' + asm += f'ret\n' + asm += f'\n' + + return asm + except: + raise ReproducerBasicBlockError(f'{hex(self.pc)}\n{self.snap}\n{self.sym}\n{self.bb}') + + def get_regs(self) -> str: + general_regs = ['RIP', 'RAX', 'RBX','RCX','RDX', 'RSI','RDI','RBP','RSP','R8','R9','R10','R11','R12','R13','R14','R15',] + flag_regs = ['CF', 'PF', 'AF', 'ZF', 'SF', 'TF', 'IF', 'DF', 'OF', 'IOPL', 'NT',] + eflag_regs = ['RF', 'VM', 'AC', 'VIF', 'VIP', 'ID',] + + try: + asm = "" + asm += f'_setup_regs:\n' + for reg in self.sym.get_used_registers(): + if reg in general_regs: + asm += f'mov ${hex(self.snap.read_register(reg))}, %{reg.lower()}\n' + + if 'RFLAGS' in self.sym.get_used_registers(): + asm += f'pushfq ${hex(self.snap.read_register("RFLAGS"))}\n' + + if any(reg in self.sym.get_used_registers() for reg in flag_regs+eflag_regs): + asm += f'pushfd ${hex(x86.compose_rflags(self.snap.regs))}\n' + asm += f'ret\n' + asm += f'\n' + + return asm + except: + raise ReproducerRegisterError(f'{hex(self.pc)}\n{self.snap}\n{self.sym}\n{self.bb}') + + def get_mem(self) -> str: + try: + asm = "" + asm += f'_setup_mem:\n' + for mem in self.sym.get_used_memory_addresses(): + addr = eval_symbol(mem.ptr, self.snap) + val = self.snap.read_memory(addr, int(mem.size/8)) + + if addr < self.sl: + asm += f'.org {hex(addr)}\n' + for b in val: + asm += f'.byte ${hex(b)}\n' + asm += f'\n' + + return asm + except: + raise ReproducerMemoryError(f'{hex(self.pc)}\n{self.snap}\n{self.sym}\n{self.bb}') + + def get_dyn(self) -> str: + try: + asm = "" + asm += f'_setup_dyn:\n' + for mem in self.sym.get_used_memory_addresses(): + addr = eval_symbol(mem.ptr, self.snap) + val = self.snap.read_memory(addr, int(mem.size/8)) + + if addr >= self.sl: + asm += f'mov ${hex(addr)}, %rdi\n' + asm += f'call _alloc\n' + for b in val: + asm += f'mov ${hex(addr)}, %rax\n' + asm += f'movb ${hex(b)}, (%rax)\n' + addr += 1 + asm += f'ret\n' + asm += f'\n' + + return asm + except: + raise ReproducerMemoryError(f'{hex(self.pc)}\n{self.snap}\n{self.sym}\n{self.bb}') + + def get_start(self) -> str: + asm = "" + asm += f'_start:\n' + asm += f'call _setup_dyn\n' + asm += f'call _setup_regs\n' + asm += f'call _bb_{hex(self.pc)}\n' + asm += f'call _exit\n' + asm += f'\n' + + return asm + + def get_exit(self) -> str: + asm = "" + asm += f'_exit:\n' + asm += f'movq $0, %rdi\n' + asm += f'movq $60, %rax\n' + asm += f'syscall\n' + asm += f'\n' + + return asm + + def get_alloc(self) -> str: + asm = "" + asm += f'_alloc:\n' + asm += f'movq $4096, %rsi\n' + asm += f'movq $(PROT_READ | PROT_WRITE), %rdx\n' + asm += f'movq $(MAP_PRIVATE | MAP_ANONYMOUS), %r10\n' + asm += f'movq $-1, %r8\n' + asm += f'movq $0, %r9\n' + asm += f'movq $syscall_mmap, %rax\n' + asm += f'syscall\n' + asm += f'ret\n' + asm += f'\n' + + return asm + + def get_code(self) -> str: + asm = "" + asm += f'.section .text\n' + asm += f'.global _start\n' + asm += f'\n' + asm += f'.org {hex(self.pc)}\n' + asm += self.get_bb() + asm += self.get_start() + asm += self.get_exit() + asm += self.get_alloc() + asm += self.get_regs() + asm += self.get_dyn() + + return asm + + def get_data(self) -> str: + asm = "" + asm += f'.section .data\n' + asm += f'PROT_READ = 0x1\n' + asm += f'PROT_WRITE = 0x2\n' + asm += f'MAP_PRIVATE = 0x2\n' + asm += f'MAP_ANONYMOUS = 0x20\n' + asm += f'syscall_mmap = 9\n' + asm += f'\n' + + asm += self.get_mem() + + return asm + + def asm(self) -> str: + asm = "" + asm += self.get_code() + asm += self.get_data() + + return asm diff --git a/tools/capture_transforms.py b/tools/capture_transforms.py old mode 100644 new mode 100755 index de35d86..5439b05 --- a/tools/capture_transforms.py +++ b/tools/capture_transforms.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 import argparse import logging diff --git a/tools/convert.py b/tools/convert.py old mode 100644 new mode 100755 index 27a8a4a..f21a2fa --- a/tools/convert.py +++ b/tools/convert.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python3 + import argparse import sys diff --git a/tools/verify_qemu.py b/tools/verify_qemu.py old mode 100644 new mode 100755 index da2e985..779b903 --- a/tools/verify_qemu.py +++ b/tools/verify_qemu.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python3 + """ Spawn GDB, connect to QEMU's GDB server, and read test states from that. -- cgit 1.4.1 From 67dbe17ec3dff6f4c9a508ace3f1d25cb5e9f9c8 Mon Sep 17 00:00:00 2001 From: Theofilos Augoustis Date: Mon, 19 Feb 2024 16:26:22 +0100 Subject: Make symbolic equations more symbolic Reduce the impact of concrete guidance on the process of calculating an instruction's symbolic equation. The resulting equations will contain less assumptions about the concrete state and thus be more generic. --- focaccia/compare.py | 33 +------ focaccia/lldb_target.py | 16 +++- focaccia/match.py | 2 + focaccia/miasm_util.py | 1 + focaccia/parser.py | 49 ++++++---- focaccia/symbolic.py | 211 +++++++++++++++++++++++++++++--------------- focaccia/trace.py | 74 ++++++++++++++++ focaccia/utils.py | 60 ++++++++++++- tools/_qemu_tool.py | 158 +++++++++++++++++---------------- tools/capture_transforms.py | 7 +- 10 files changed, 413 insertions(+), 198 deletions(-) create mode 100644 focaccia/trace.py (limited to 'tools/capture_transforms.py') diff --git a/focaccia/compare.py b/focaccia/compare.py index 65c0f49..13f965c 100644 --- a/focaccia/compare.py +++ b/focaccia/compare.py @@ -1,36 +1,9 @@ -from functools import total_ordering -from typing import Iterable, Self +from __future__ import annotations +from typing import Iterable from .snapshot import ProgramState, MemoryAccessError, RegisterAccessError from .symbolic import SymbolicTransform - -@total_ordering -class ErrorSeverity: - def __init__(self, num: int, name: str): - """Construct an error severity. - - :param num: A numerical value that orders the severity with respect - to other `ErrorSeverity` objects. Smaller values are less - severe. - :param name: A descriptive name for the error severity, e.g. 'fatal' - or 'info'. - """ - self._numeral = num - self.name = name - - def __repr__(self) -> str: - return f'[{self.name}]' - - def __eq__(self, other: object) -> bool: - if not isinstance(other, Self): - return False - return self._numeral == other._numeral - - def __lt__(self, other: Self) -> bool: - return self._numeral < other._numeral - - def __hash__(self) -> int: - return hash(self._numeral) +from .utils import ErrorSeverity class ErrorTypes: INFO = ErrorSeverity(0, 'INFO') diff --git a/focaccia/lldb_target.py b/focaccia/lldb_target.py index b51ec3d..a4e96a8 100644 --- a/focaccia/lldb_target.py +++ b/focaccia/lldb_target.py @@ -1,3 +1,5 @@ +import os + import lldb from .arch import supported_architectures, x86 @@ -30,11 +32,21 @@ class ConcreteSectionError(Exception): pass class LLDBConcreteTarget: - def __init__(self, executable: str, argv: list[str] = []): + def __init__(self, + executable: str, + argv: list[str] = [], + envp: list[str] | None = None): """Construct an LLDB concrete target. Stop at entry. + :param argv: List of arguements. Does NOT include the conventional + executable name as the first entry. + :param envp: List of environment entries. Defaults to current + `os.environ` if `None`. :raises RuntimeError: If the process is unable to launch. """ + if envp is None: + envp = [f'{k}={v}' for k, v in os.environ.items()] + self.debugger = lldb.SBDebugger.Create() self.debugger.SetAsync(False) self.target = self.debugger.CreateTargetWithFileAndArch(executable, @@ -46,7 +58,7 @@ class LLDBConcreteTarget: self.error = lldb.SBError() self.listener = self.debugger.GetListener() self.process = self.target.Launch(self.listener, - argv, None, # argv, envp + argv, envp, # argv, envp None, None, None, # stdin, stdout, stderr None, # working directory 0, diff --git a/focaccia/match.py b/focaccia/match.py index 4c8071f..be88176 100644 --- a/focaccia/match.py +++ b/focaccia/match.py @@ -54,6 +54,8 @@ def fold_traces(ctrace: list[ProgramState], while i + 1 < len(strace): strace[i].concat(strace.pop(i + 1)) + return ctrace, strace + def match_traces(ctrace: list[ProgramState], \ strace: list[SymbolicTransform]): """Try to match traces that don't follow the same program flow. diff --git a/focaccia/miasm_util.py b/focaccia/miasm_util.py index 4299f87..24a0e11 100644 --- a/focaccia/miasm_util.py +++ b/focaccia/miasm_util.py @@ -158,6 +158,7 @@ def _eval_exprmem(expr: ExprMem, state: MiasmSymbolResolver): if not addr.is_int(): return expr + assert(isinstance(addr, ExprInt)) mem = state.resolve_memory(int(addr), expr.size // 8) if mem is None: return expr diff --git a/focaccia/parser.py b/focaccia/parser.py index 9fb83d8..c37c07a 100644 --- a/focaccia/parser.py +++ b/focaccia/parser.py @@ -4,11 +4,11 @@ import base64 import json import re from typing import TextIO -import lldb from .arch import supported_architectures, Arch from .snapshot import ProgramState from .symbolic import SymbolicTransform +from .trace import Trace, TraceEnvironment class ParseError(Exception): """A parse error.""" @@ -20,25 +20,30 @@ def _get_or_throw(obj: dict, key: str): return val raise ParseError(f'Expected value at key {key}, but found none.') -def parse_transformations(json_stream: TextIO) -> list[SymbolicTransform]: +def parse_transformations(json_stream: TextIO) -> Trace[SymbolicTransform]: """Parse symbolic transformations from a text stream.""" - from .symbolic import parse_symbolic_transform + data = json.load(json_stream) - json_data = json.load(json_stream) - return [parse_symbolic_transform(item) for item in json_data] + env = TraceEnvironment.from_json(_get_or_throw(data, 'env')) + strace = [SymbolicTransform.from_json(item) \ + for item in _get_or_throw(data, 'states')] + + return Trace(strace, env) -def serialize_transformations(transforms: list[SymbolicTransform], +def serialize_transformations(transforms: Trace[SymbolicTransform], out_stream: TextIO): """Serialize symbolic transformations to a text stream.""" - from .symbolic import serialize_symbolic_transform - - json.dump([serialize_symbolic_transform(t) for t in transforms], out_stream) + json.dump({ + 'env': transforms.env.to_json(), + 'states': [t.to_json() for t in transforms], + }, out_stream) -def parse_snapshots(json_stream: TextIO) -> list[ProgramState]: +def parse_snapshots(json_stream: TextIO) -> Trace[ProgramState]: """Parse snapshots from our JSON format.""" json_data = json.load(json_stream) arch = supported_architectures[_get_or_throw(json_data, 'architecture')] + env = TraceEnvironment.from_json(_get_or_throw(json_data, 'env')) snapshots = [] for snapshot in _get_or_throw(json_data, 'snapshots'): state = ProgramState(arch) @@ -52,15 +57,19 @@ def parse_snapshots(json_stream: TextIO) -> list[ProgramState]: snapshots.append(state) - return snapshots + return Trace(snapshots, env) -def serialize_snapshots(snapshots: list[ProgramState], out_stream: TextIO): +def serialize_snapshots(snapshots: Trace[ProgramState], out_stream: TextIO): """Serialize a list of snapshots to out JSON format.""" if not snapshots: return json.dump({}, out_stream) arch = snapshots[0].arch - res = { 'architecture': arch.archname, 'snapshots': [] } + res = { + 'architecture': arch.archname, + 'env': snapshots.env.to_json(), + 'snapshots': [] + } for snapshot in snapshots: assert(snapshot.arch == arch) regs = {r: v for r, v in snapshot.regs.items() if v is not None} @@ -74,9 +83,15 @@ def serialize_snapshots(snapshots: list[ProgramState], out_stream: TextIO): json.dump(res, out_stream) -def parse_qemu(stream: TextIO, arch: Arch) -> list[ProgramState]: +def _make_unknown_env() -> TraceEnvironment: + return TraceEnvironment('', [], [], '?') + +def parse_qemu(stream: TextIO, arch: Arch) -> Trace[ProgramState]: """Parse a QEMU log from a stream. + Recommended QEMU log option: `qemu -d exec,cpu,fpu,vpu,nochain`. The `exec` + flag is strictly necessary for the log to be parseable. + :return: A list of parsed program states, in order of occurrence in the log. """ @@ -88,7 +103,7 @@ def parse_qemu(stream: TextIO, arch: Arch) -> list[ProgramState]: if states: _parse_qemu_line(line, states[-1]) - return states + return Trace(states, _make_unknown_env()) def _parse_qemu_line(line: str, cur_state: ProgramState): """Try to parse a single register-assignment line from a QEMU log. @@ -129,7 +144,7 @@ def _parse_qemu_line(line: str, cur_state: ProgramState): if regname is not None: cur_state.set_register(regname, int(value, 16)) -def parse_arancini(stream: TextIO, arch: Arch) -> list[ProgramState]: +def parse_arancini(stream: TextIO, arch: Arch) -> Trace[ProgramState]: aliases = { 'Program counter': 'RIP', 'flag ZF': 'ZF', @@ -154,4 +169,4 @@ def parse_arancini(stream: TextIO, arch: Arch) -> list[ProgramState]: if regname is not None: states[-1].set_register(regname, int(value, 16)) - return states + return Trace(states, _make_unknown_env()) diff --git a/focaccia/symbolic.py b/focaccia/symbolic.py index 029e979..dfc2966 100644 --- a/focaccia/symbolic.py +++ b/focaccia/symbolic.py @@ -20,6 +20,7 @@ from .lldb_target import LLDBConcreteTarget, \ from .miasm_util import MiasmSymbolResolver, eval_expr from .snapshot import ProgramState, ReadableProgramState, \ RegisterAccessError, MemoryAccessError +from .trace import Trace, TraceEnvironment warn = logging.warning @@ -342,6 +343,49 @@ class SymbolicTransform: res[addr] = eval_symbol(expr, conc_state).to_bytes(length, byteorder='big') return res + @classmethod + def from_json(cls, data: dict) -> SymbolicTransform: + """Parse a symbolic transformation from a JSON object. + + :raise KeyError: if a parse error occurs. + """ + from miasm.expression.parser import str_to_expr as parse + + def decode_inst(obj: Iterable[int], arch: Arch): + b = b''.join(i.to_bytes(1, byteorder='big') for i in obj) + return Instruction.from_bytecode(b, arch) + + arch = supported_architectures[data['arch']] + start_addr = int(data['from_addr']) + end_addr = int(data['to_addr']) + + t = SymbolicTransform({}, [], arch, start_addr, end_addr) + t.changed_regs = { name: parse(val) for name, val in data['regs'].items() } + t.changed_mem = { parse(addr): parse(val) for addr, val in data['mem'].items() } + t.instructions = [decode_inst(b, arch) for b in data['instructions']] + + # Recover the instructions' address information + addr = t.addr + for inst in t.instructions: + inst.addr = addr + addr += inst.length + + return t + + def to_json(self) -> dict: + """Serialize a symbolic transformation as a JSON object.""" + def encode_inst(inst: Instruction): + return [int(b) for b in inst.to_bytecode()] + + return { + 'arch': self.arch.archname, + 'from_addr': self.range[0], + 'to_addr': self.range[1], + 'instructions': [encode_inst(inst) for inst in self.instructions], + 'regs': { name: repr(expr) for name, expr in self.changed_regs.items() }, + 'mem': { repr(addr): repr(val) for addr, val in self.changed_mem.items() }, + } + def __repr__(self) -> str: start, end = self.range res = f'Symbolic state transformation {hex(start)} -> {hex(end)}:\n' @@ -356,51 +400,6 @@ class SymbolicTransform: return res[:-1] # Remove trailing newline -def parse_symbolic_transform(string: str) -> SymbolicTransform: - """Parse a symbolic transformation from a string. - :raise KeyError: if a parse error occurs. - """ - import json - from miasm.expression.parser import str_to_expr as parse - - def decode_inst(obj: Iterable[int], arch: Arch): - b = b''.join(i.to_bytes(1, byteorder='big') for i in obj) - return Instruction.from_bytecode(b, arch) - - data = json.loads(string) - arch = supported_architectures[data['arch']] - start_addr = int(data['from_addr']) - end_addr = int(data['to_addr']) - - t = SymbolicTransform({}, [], arch, start_addr, end_addr) - t.changed_regs = { name: parse(val) for name, val in data['regs'].items() } - t.changed_mem = { parse(addr): parse(val) for addr, val in data['mem'].items() } - t.instructions = [decode_inst(b, arch) for b in data['instructions']] - - # Recover the instructions' address information - addr = t.addr - for inst in t.instructions: - inst.addr = addr - addr += inst.length - - return t - -def serialize_symbolic_transform(t: SymbolicTransform) -> str: - """Serialize a symbolic transformation.""" - import json - - def encode_inst(inst: Instruction): - return [int(b) for b in inst.to_bytecode()] - - return json.dumps({ - 'arch': t.arch.archname, - 'from_addr': t.range[0], - 'to_addr': t.range[1], - 'instructions': [encode_inst(inst) for inst in t.instructions], - 'regs': { name: repr(expr) for name, expr in t.changed_regs.items() }, - 'mem': { repr(addr): repr(val) for addr, val in t.changed_mem.items() }, - }) - class DisassemblyContext: def __init__(self, binary): self.loc_db = LocationDB() @@ -419,6 +418,7 @@ class DisassemblyContext: """Focaccia's description of an instruction set architecture.""" # Create disassembly/lifting context + assert(self.machine.dis_engine is not None) self.mdis = self.machine.dis_engine(cont.bin_stream, loc_db=self.loc_db) self.mdis.follow_call = True self.lifter = self.machine.lifter(self.loc_db) @@ -427,56 +427,122 @@ def run_instruction(instr: miasm_instr, conc_state: MiasmSymbolResolver, lifter: Lifter) \ -> tuple[ExprInt | None, dict[Expr, Expr]]: - """Run a basic block. + """Compute the symbolic equation of a single instruction. - Tries to run IR blocks until the end of an ASM block/basic block is - reached. Skips 'virtual' blocks that purely exist in the IR. + The concolic engine tries to express the instruction's equation as + independent of the concrete state as possible. + + May fail if the instruction is not supported. Failure is signalled by + returning `None` as the next program counter. :param instr: The instruction to run. :param conc_state: A concrete reference state at `pc = instr.offset`. Used - to resolve symbolic program counters, i.e. to 'guide' the - symbolic execution on the correct path. This is the + to resolve symbolic program counters, i.e. to 'guide' + the symbolic execution on the correct path. This is the concrete part of our concolic execution. - :param lifter: A lifter of the appropriate architecture. Get this from a - `DisassemblyContext` or a `Machine`. + :param lifter: A lifter of the appropriate architecture. Get this from + a `DisassemblyContext` or a `Machine`. - :return: The next program counter and a symbolic state. None if no next - program counter can be found. This happens when an error occurs or - when the program exits. The state represents the transformation - which the instruction applies to a program state. + :return: The next program counter and a symbolic state. The PC is None if + an error occurs or when the program exits. The returned state + is `instr`'s symbolic transformation. """ - def execute_location(loc, base_state: dict | None) -> tuple[ExprInt | None, dict]: + from miasm.expression.expression import ExprCond, LocKey + from miasm.expression.simplifications import expr_simp + + def create_cond_state(cond: Expr, iftrue: dict, iffalse: dict) -> dict: + """Combines states that are to be reached conditionally. + + Example: + State A: + RAX = 0x42 + @[RBP - 0x4] = 0x123 + State B: + RDI = -0x777 + @[RBP - 0x4] = 0x5c32 + Condition: + RCX > 0x4 ? A : B + + Result State: + RAX = (RCX > 0x4) ? 0x42 : RAX + RDI = (RCX > 0x4) ? RDI : -0x777 + @[RBP - 0x4] = (RCX > 0x4) ? 0x123 : 0x5c32 + """ + res = {} + for dst, v in iftrue.items(): + if dst not in iffalse: + res[dst] = expr_simp(ExprCond(cond, v, dst)) + else: + res[dst] = expr_simp(ExprCond(cond, v, iffalse[dst])) + for dst, v in iffalse.items(): + if dst not in iftrue: + res[dst] = expr_simp(ExprCond(cond, dst, v)) + return res + + def _execute_location(loc, base_state: dict | None) \ + -> tuple[Expr, dict]: + """Execute a single IR block via symbolic engine. No fancy stuff.""" # Query the location's IR block irblock = ircfg.get_block(loc) if irblock is None: - # Weirdly, this never seems to happen. I don't know why, though. - return None, base_state if base_state is not None else {} + return loc, base_state if base_state is not None else {} # Apply IR block to the current state engine = SymbolicExecutionEngine(lifter, state=base_state) new_pc = engine.eval_updt_irblock(irblock) modified = dict(engine.modified()) + return new_pc, modified + + def execute_location(loc: Expr | LocKey) -> tuple[ExprInt, dict]: + """Execute chains of IR blocks until a concrete program counter is + reached.""" + seen_locs = set() # To break out of loop instructions + new_pc, modified = _execute_location(loc, None) + + # Run chained IR blocks until a real program counter is reached. + # This used to be recursive (and much more elegant), but large RCX + # values for 'REP ...' instructions could make the stack overflow. + while not new_pc.is_int(): + seen_locs.add(new_pc) + + if new_pc.is_loc(): + # Jump to the next location. + new_pc, modified = _execute_location(new_pc, modified) + elif new_pc.is_cond(): + # Explore conditional paths manually by constructing + # conditional states based on the possible outcomes. + assert(isinstance(new_pc, ExprCond)) + cond = new_pc.cond + pc_iftrue, pc_iffalse = new_pc.src1, new_pc.src2 + + pc_t, state_t = _execute_location(pc_iftrue, modified.copy()) + pc_f, state_f = _execute_location(pc_iffalse, modified.copy()) + modified = create_cond_state(cond, state_t, state_f) + new_pc = expr_simp(ExprCond(cond, pc_t, pc_f)) + else: + # Concretisize PC in case it is, e.g., a memory expression + new_pc = eval_expr(new_pc, conc_state) - # Resolve the symbolic PC to a concrete value based on the - # concrete program state before this instruction - new_pc = eval_expr(new_pc, conc_state) - if new_pc.is_loc(): - # Run chained IR blocks until a real program counter is reached - new_pc, modified = execute_location(new_pc, modified) + # Avoid infinite loops for loop instructions (REP ...) by making + # the jump to the next loop iteration (or exit) concrete. + if new_pc in seen_locs: + new_pc = eval_expr(new_pc, conc_state) + seen_locs.clear() - assert(new_pc is not None and isinstance(new_pc, ExprInt)) + assert(isinstance(new_pc, ExprInt)) return new_pc, modified # Lift instruction to IR ircfg = lifter.new_ircfg() try: loc = lifter.add_instr_to_ircfg(instr, ircfg, None, False) + assert(isinstance(loc, Expr) or isinstance(loc, LocKey)) except NotImplementedError as err: warn(f'[WARNING] Unable to lift instruction {instr}: {err}. Skipping.') return None, {} # Create an empty transform for the instruction # Execute instruction symbolically - new_pc, modified = execute_location(loc, None) + new_pc, modified = execute_location(loc) modified[lifter.pc] = new_pc # Add PC update to state return new_pc, modified @@ -514,21 +580,22 @@ class _LLDBConcreteState(ReadableProgramState): except ConcreteMemoryError: raise MemoryAccessError(addr, size, 'Unable to read memory from LLDB.') -def collect_symbolic_trace(binary: str, - args: list[str], +def collect_symbolic_trace(env: TraceEnvironment, start_addr: int | None = None - ) -> list[SymbolicTransform]: + ) -> Trace[SymbolicTransform]: """Execute a program and compute state transformations between executed instructions. :param binary: The binary to trace. :param args: Arguments to the program. """ + binary = env.binary_name + ctx = DisassemblyContext(binary) arch = ctx.arch # Set up concrete reference state - target = LLDBConcreteTarget(binary, args) + target = LLDBConcreteTarget(binary, env.argv, env.envp) if start_addr is not None: target.run_until(start_addr) lldb_state = _LLDBConcreteState(target, arch) @@ -563,4 +630,4 @@ def collect_symbolic_trace(binary: str, # Step forward target.step() - return strace + return Trace(strace, env) diff --git a/focaccia/trace.py b/focaccia/trace.py new file mode 100644 index 0000000..094358f --- /dev/null +++ b/focaccia/trace.py @@ -0,0 +1,74 @@ +from __future__ import annotations +from typing import Generic, TypeVar + +from .utils import file_hash + +T = TypeVar('T') + +class TraceEnvironment: + """Data that defines the environment in which a trace was recorded.""" + def __init__(self, + binary: str, + argv: list[str], + envp: list[str], + binary_hash: str | None = None): + self.argv = argv + self.envp = envp + self.binary_name = binary + if binary_hash is None: + self.binary_hash = file_hash(binary) + else: + self.binary_hash = binary_hash + + @classmethod + def from_json(cls, json: dict) -> TraceEnvironment: + """Parse a JSON object into a TraceEnvironment.""" + return cls( + json['binary_name'], + json['argv'], + json['envp'], + json['binary_hash'], + ) + + def to_json(self) -> dict: + """Serialize a TraceEnvironment to a JSON object.""" + return { + 'binary_name': self.binary_name, + 'binary_hash': self.binary_hash, + 'argv': self.argv, + 'envp': self.envp, + } + + def __eq__(self, other: object) -> bool: + if not isinstance(other, TraceEnvironment): + return False + + return self.binary_name == other.binary_name \ + and self.binary_hash == other.binary_hash \ + and self.argv == other.argv \ + and self.envp == other.envp + + def __repr__(self) -> str: + return f'{self.binary_name} {" ".join(self.argv)}' \ + f'\n bin-hash={self.binary_hash}' \ + f'\n envp={repr(self.envp)}' + +class Trace(Generic[T]): + def __init__(self, + trace_states: list[T], + env: TraceEnvironment): + self.states = trace_states + self.env = env + + def __len__(self) -> int: + return len(self.states) + + def __getitem__(self, i: int) -> T: + return self.states[i] + + def __iter__(self): + return iter(self.states) + + def __repr__(self) -> str: + return f'Trace with {len(self.states)} trace points.' \ + f' Environment: {repr(self.env)}' diff --git a/focaccia/utils.py b/focaccia/utils.py index 50251a1..c4f6a74 100644 --- a/focaccia/utils.py +++ b/focaccia/utils.py @@ -1,8 +1,39 @@ +from __future__ import annotations + import ctypes -import sys +import os import shutil +import sys +from functools import total_ordering +from hashlib import sha256 + +@total_ordering +class ErrorSeverity: + def __init__(self, num: int, name: str): + """Construct an error severity. + + :param num: A numerical value that orders the severity with respect + to other `ErrorSeverity` objects. Smaller values are less + severe. + :param name: A descriptive name for the error severity, e.g. 'fatal' + or 'info'. + """ + self._numeral = num + self.name = name + + def __repr__(self) -> str: + return f'[{self.name}]' -from .compare import ErrorSeverity + def __eq__(self, other: object) -> bool: + if not isinstance(other, ErrorSeverity): + return False + return self._numeral == other._numeral + + def __lt__(self, other: ErrorSeverity) -> bool: + return self._numeral < other._numeral + + def __hash__(self) -> int: + return hash(self._numeral) def float_bits_to_uint(v: float) -> int: """Bit-cast a float to a 32-bit integer.""" @@ -20,6 +51,31 @@ def uint_bits_to_double(v: int) -> float: """Bit-cast a 64-bit integer to a double.""" return ctypes.c_double.from_buffer(ctypes.c_uint64(v)).value +def file_hash(filename: str, hash = sha256(), chunksize: int = 65536) -> str: + """Calculate a file's hash. + + :param filename: Name of the file to hash. + :param hash: The hash algorithm to use. + :param chunksize: Optimization option. Size of contiguous chunks to read + from the file and feed into the hashing algorithm. + :return: A hex digest. + """ + with open(filename, 'rb') as file: + while True: + data = file.read(chunksize) + if not data: + break + hash.update(data) + return hash.hexdigest() + +def get_envp() -> list[str]: + """Return current environment array. + + Merge dict-like `os.environ` struct to the traditional list-like + environment array. + """ + return [f'{k}={v}' for k, v in os.environ.items()] + def print_separator(separator: str = '-', stream=sys.stdout, count: int = 80): maxtermsize = count termsize = shutil.get_terminal_size((80, 20)).columns diff --git a/tools/_qemu_tool.py b/tools/_qemu_tool.py index 9659f0e..38715dc 100644 --- a/tools/_qemu_tool.py +++ b/tools/_qemu_tool.py @@ -15,7 +15,8 @@ from focaccia.compare import compare_symbolic from focaccia.snapshot import ProgramState, ReadableProgramState, \ RegisterAccessError, MemoryAccessError from focaccia.symbolic import SymbolicTransform, eval_symbol, ExprMem -from focaccia.utils import print_result +from focaccia.trace import Trace, TraceEnvironment +from focaccia.utils import get_envp, print_result from verify_qemu import make_argparser, verbosity @@ -76,6 +77,7 @@ class GDBServerStateIterator: exit(1) self.arch = supported_architectures[archname] + self.binary = self._process.progspace.filename def __iter__(self): return self @@ -98,6 +100,77 @@ class GDBServerStateIterator: return GDBProgramState(self._process, gdb.selected_frame()) +def record_minimal_snapshot(prev_state: ReadableProgramState, + cur_state: ReadableProgramState, + prev_transform: SymbolicTransform, + cur_transform: SymbolicTransform) \ + -> ProgramState: + """Record a minimal snapshot. + + A minimal snapshot must include values (registers and memory) that are + accessed by two transformations: + 1. The values produced by the previous transformation (the + transformation that is producing this snapshot) to check these + values against expected values calculated from the previous + program state. + 2. The values that act as inputs to the transformation acting on this + snapshot, to calculate the expected values of the next snapshot. + + :param prev_transform: The symbolic transformation generating, or + leading to, `cur_state`. Values generated by + this transformation are included in the + snapshot. + :param transform: The symbolic transformation operating on this + snapshot. Input values to this transformation are + included in the snapshot. + """ + assert(cur_state.read_register('pc') == cur_transform.addr) + assert(prev_transform.arch == cur_transform.arch) + + def get_written_addresses(t: SymbolicTransform): + """Get all output memory accesses of a symbolic transformation.""" + return [ExprMem(a, v.size) for a, v in t.changed_mem.items()] + + def set_values(regs: Iterable[str], mems: Iterable[ExprMem], + cur_state: ReadableProgramState, + prev_state: ReadableProgramState, + out_state: ProgramState): + """ + :param prev_state: Addresses of memory included in the snapshot are + resolved relative to this state. + """ + for regname in regs: + regval = cur_state.read_register(regname) + try: + out_state.set_register(regname, regval) + except RegisterAccessError: + pass + for mem in mems: + assert(mem.size % 8 == 0) + addr = eval_symbol(mem.ptr, prev_state) + try: + mem = cur_state.read_memory(addr, int(mem.size / 8)) + out_state.write_memory(addr, mem) + except MemoryAccessError: + pass + + state = ProgramState(cur_transform.arch) + state.set_register('PC', cur_transform.addr) + + set_values(prev_transform.changed_regs.keys(), + get_written_addresses(prev_transform), + cur_state, + prev_state, # Evaluate memory addresses based on previous + # state because they are that state's output + # addresses. + state) + set_values(cur_transform.get_used_registers(), + cur_transform.get_used_memory_addresses(), + cur_state, + cur_state, + state) + return state + def collect_conc_trace(gdb: GDBServerStateIterator, \ strace: list[SymbolicTransform]) \ -> tuple[list[ProgramState], list[SymbolicTransform]]: @@ -115,75 +188,6 @@ def collect_conc_trace(gdb: GDBServerStateIterator, \ :return: A list of concrete states and a list of corresponding symbolic transformations. The lists are guaranteed to have the same length. """ - def record_snapshot(prev_state: ReadableProgramState, - cur_state: GDBProgramState, - prev_transform: SymbolicTransform, - cur_transform: SymbolicTransform) \ - -> ProgramState: - """Record a minimal snapshot. - - A minimal snapshot must include values (registers and memory) that are - accessed by two transformations: - 1. The values produced by the previous transformation (the - transformation that is producing this snapshot) to check these - values against expected values calculated from the previous - program state. - 2. The values that act as inputs to the transformation acting on this - snapshot, to calculate the expected values of the next snapshot. - - :param prev_transform: The symbolic transformation generating, or - leading to, `gdb_state`. Values generated by - this transformation are included in the - snapshot. - :param transform: The symbolic transformation operating on this - snapshot. Input values to this transformation are - included in the snapshot. - """ - assert(cur_state.read_register('pc') == cur_transform.addr) - - def get_written_addresses(t: SymbolicTransform): - """Get all output memory accesses of a symbolic transformation.""" - return [ExprMem(a, v.size) for a, v in t.changed_mem.items()] - - def set_values(regs: Iterable[str], mems: Iterable[ExprMem], - cur_state: GDBProgramState, prev_state: ReadableProgramState, - out_state: ProgramState): - """ - :param prev_state: Addresses of memory included in the snapshot are - resolved relative to this state. - """ - for regname in regs: - regval = cur_state.read_register(regname) - try: - out_state.set_register(regname, regval) - except RegisterAccessError: - pass - for mem in mems: - assert(mem.size % 8 == 0) - addr = eval_symbol(mem.ptr, prev_state) - try: - mem = cur_state.read_memory(addr, int(mem.size / 8)) - out_state.write_memory(addr, mem) - except MemoryAccessError: - pass - - state = ProgramState(gdb.arch) - state.set_register('PC', cur_transform.addr) - - set_values(prev_transform.changed_regs.keys(), - get_written_addresses(prev_transform), - cur_state, - prev_state, # Evaluate memory addresses based on previous - # state because they are that state's output - # addresses. - state) - set_values(cur_transform.get_used_registers(), - cur_transform.get_used_memory_addresses(), - cur_state, - cur_state, - state) - return state - def find_index(seq, target, access=lambda el: el): for i, el in enumerate(seq): if access(el) == target: @@ -222,7 +226,7 @@ def collect_conc_trace(gdb: GDBServerStateIterator, \ symb_i += next_i + 1 assert(cur_state.read_register('pc') == strace[symb_i].addr) - states.append(record_snapshot( + states.append(record_minimal_snapshot( states[-1] if states else cur_state, cur_state, matched_transforms[-1] if matched_transforms else strace[symb_i], @@ -240,6 +244,12 @@ def main(): gdbserver_addr = 'localhost' gdbserver_port = args.port + gdb_server = GDBServerStateIterator(gdbserver_addr, gdbserver_port) + + executable = gdb_server.binary + argv = [] # QEMU's GDB stub does not support 'info proc cmdline' + envp = [] # Can't get the remote target's environment + env = TraceEnvironment(executable, argv, envp, '?') # Read pre-computed symbolic trace with open(args.symb_trace, 'r') as strace: @@ -247,8 +257,8 @@ def main(): # Use symbolic trace to collect concrete trace from QEMU conc_states, matched_transforms = collect_conc_trace( - GDBServerStateIterator(gdbserver_addr, gdbserver_port), - symb_transforms) + gdb_server, + symb_transforms.states) # Verify and print result if not args.quiet: @@ -258,7 +268,7 @@ def main(): if args.output: from focaccia.parser import serialize_snapshots with open(args.output, 'w') as file: - serialize_snapshots(conc_states, file) + serialize_snapshots(Trace(conc_states, env), file) if __name__ == "__main__": main() diff --git a/tools/capture_transforms.py b/tools/capture_transforms.py index 5439b05..5a104c0 100755 --- a/tools/capture_transforms.py +++ b/tools/capture_transforms.py @@ -2,9 +2,11 @@ import argparse import logging +import os from focaccia import parser from focaccia.symbolic import collect_symbolic_trace +from focaccia.trace import Trace, TraceEnvironment def main(): prog = argparse.ArgumentParser() @@ -19,7 +21,10 @@ def main(): args = prog.parse_args() logging.disable(logging.CRITICAL) - trace = collect_symbolic_trace(args.binary, args.args, None) + env = TraceEnvironment(args.binary, + args.args, + [f'{k}={v}' for k, v in os.environ.items()]) + trace = collect_symbolic_trace(env, None) with open(args.output, 'w') as file: parser.serialize_transformations(trace, file) -- cgit 1.4.1 From 4f005b2917ff83acb966495741487029ab34ab1a Mon Sep 17 00:00:00 2001 From: Theofilos Augoustis Date: Sun, 28 Jul 2024 15:47:00 +0200 Subject: Enable Focaccia's logging in capture_transforms.py Disable Miasm's disassembly logger by default. Enable Focaccia's symbolic execution logger. Also refactor envp construction to use the `utils.get_envp` function. --- focaccia/symbolic.py | 10 +++++++--- tools/capture_transforms.py | 11 +++-------- 2 files changed, 10 insertions(+), 11 deletions(-) (limited to 'tools/capture_transforms.py') diff --git a/focaccia/symbolic.py b/focaccia/symbolic.py index d5475a6..6338a14 100644 --- a/focaccia/symbolic.py +++ b/focaccia/symbolic.py @@ -22,7 +22,11 @@ from .snapshot import ProgramState, ReadableProgramState, \ RegisterAccessError, MemoryAccessError from .trace import Trace, TraceEnvironment -warn = logging.warning +logger = logging.getLogger('focaccia-symbolic') +warn = logger.warn + +# Disable Miasm's disassembly logger +logging.getLogger('asmblock').setLevel(logging.CRITICAL) def eval_symbol(symbol: Expr, conc_state: ReadableProgramState) -> int: """Evaluate a symbol based on a concrete reference state. @@ -619,8 +623,8 @@ def collect_symbolic_trace(env: TraceEnvironment, instr = ctx.mdis.dis_instr(pc) except: err = sys.exc_info()[1] - warn(f'[WARNING] Unable to disassemble instruction at {hex(pc)}:' - f' {err}. Skipping.') + warn(f'Unable to disassemble instruction at {hex(pc)}: {err}.' + f' Skipping.') target.step() continue diff --git a/tools/capture_transforms.py b/tools/capture_transforms.py index 5a104c0..552b855 100755 --- a/tools/capture_transforms.py +++ b/tools/capture_transforms.py @@ -1,12 +1,10 @@ #!/usr/bin/env python3 import argparse -import logging -import os -from focaccia import parser +from focaccia import parser, utils from focaccia.symbolic import collect_symbolic_trace -from focaccia.trace import Trace, TraceEnvironment +from focaccia.trace import TraceEnvironment def main(): prog = argparse.ArgumentParser() @@ -20,10 +18,7 @@ def main(): help='Name of output file. (default: trace.out)') args = prog.parse_args() - logging.disable(logging.CRITICAL) - env = TraceEnvironment(args.binary, - args.args, - [f'{k}={v}' for k, v in os.environ.items()]) + env = TraceEnvironment(args.binary, args.args, utils.get_envp()) trace = collect_symbolic_trace(env, None) with open(args.output, 'w') as file: parser.serialize_transformations(trace, file) -- cgit 1.4.1