diff options
| author | Theofilos Augoustis <theofilos.augoustis@gmail.com> | 2023-12-14 17:03:59 +0100 |
|---|---|---|
| committer | Theofilos Augoustis <theofilos.augoustis@gmail.com> | 2023-12-14 17:03:59 +0100 |
| commit | 194f3d6f2ebdc7b0631fdaaeb8451142b052ccb0 (patch) | |
| tree | 20806871433db331bdfed66ddadfadecbea2b7c4 /parser.py | |
| parent | 4a5584d8f69d8ff511285387971d8cbf803f16b7 (diff) | |
| download | focaccia-194f3d6f2ebdc7b0631fdaaeb8451142b052ccb0.tar.gz focaccia-194f3d6f2ebdc7b0631fdaaeb8451142b052ccb0.zip | |
Implement symbolic comparison and match traces via Miasm
Co-authored-by: Theofilos Augoustis <theofilos.augoustis@gmail.com> Co-authored-by: Nicola Crivellin <nicola.crivellin98@gmail.com>
Diffstat (limited to '')
| -rw-r--r-- | parser.py | 124 |
1 files changed, 124 insertions, 0 deletions
"""Parsing of JSON files containing snapshot data."""

import json
import re
from typing import Any, TextIO

from arch import supported_architectures, Arch
from snapshot import ProgramState

# Patterns used by parse_qemu. They are compiled once here instead of being
# looked up again for every input line.
_SPACE_BEFORE_EQ = re.compile(' =')
_SPACES_AFTER_EQ = re.compile('= +')
_YMM_PADDED_INDEX = re.compile('YMM0([0-9])')
_FPR_NAME = re.compile('FPR([0-9])')
_INLINE_ASSIGNMENT = re.compile(' ([A-Z0-9]+)=')
_ASSIGNMENT_WITH_TRAILER = re.compile('^([A-Z0-9]+)=([0-9a-f ]+).*$',
                                      re.MULTILINE)

# Labels in the Arancini log that are rewritten before being handed to
# Arch.to_regname.
_ARANCINI_ALIASES = {
    'Program counter': 'RIP',
    'flag ZF': 'ZF',
    'flag CF': 'CF',
    'flag OF': 'OF',
    'flag SF': 'SF',
    'flag PF': 'PF',
    'flag DF': 'DF',
}


class ParseError(Exception):
    """A parse error."""


def _get_or_throw(obj: dict, key: str) -> Any:
    """Get a value from a dict or throw a ParseError if not present.

    A key that is present but maps to `None` (JSON `null`) is treated the
    same as a missing key.

    :raise ParseError: If `obj` has no non-None value at `key`.
    """
    val = obj.get(key)
    if val is not None:
        return val
    raise ParseError(f'Expected value at key {key}, but found none.')


def parse_snapshots(json_stream: TextIO) -> list[ProgramState]:
    """Parse snapshots from our JSON format.

    The expected document is an object with an 'architecture' name and a
    'snapshots' list. Each snapshot holds a 'registers' mapping and a
    'memory' list whose entries carry a two-element 'range' ([start, end])
    and a 'data' string.

    :param json_stream: Text stream containing the JSON document.
    :return: One ProgramState per entry in 'snapshots', in file order.
    :raise ParseError: If a required key is missing or a memory entry's data
                       length does not match its range.
    """
    json_data = json.load(json_stream)

    # NOTE(review): an unknown architecture name is not turned into a
    # ParseError; whatever the lookup in supported_architectures raises
    # (presumably KeyError) propagates to the caller.
    # NOTE(review): the empty document `{}` that serialize_snapshots writes
    # for an empty list is rejected here because 'architecture' is missing.
    arch = supported_architectures[_get_or_throw(json_data, 'architecture')]
    snapshots = []
    for snapshot in _get_or_throw(json_data, 'snapshots'):
        state = ProgramState(arch)
        for reg, val in _get_or_throw(snapshot, 'registers').items():
            state.set(reg, val)
        for mem in _get_or_throw(snapshot, 'memory'):
            start, end = _get_or_throw(mem, 'range')
            data = _get_or_throw(mem, 'data').encode()
            # This validates file contents, so it must not be an `assert`:
            # assertions are removed when Python runs with -O.
            if len(data) != end - start:
                raise ParseError(
                    f'Memory range [{start}, {end}) does not match the'
                    f' length of its data ({len(data)} bytes).')
            state.write_memory(start, data)

        snapshots.append(state)

    return snapshots


def serialize_snapshots(snapshots: list[ProgramState], out_stream: TextIO):
    """Serialize a list of snapshots to our JSON format.

    An empty list is written as the empty JSON object `{}`. Registers whose
    value is `None` are omitted from the output.

    :param snapshots: Snapshots to write; all must share one architecture.
    :param out_stream: Text stream that receives the JSON document.
    :raise ValueError: If the snapshots do not all have the same architecture.
    """
    if not snapshots:
        json.dump({}, out_stream)
        return

    arch = snapshots[0].arch
    res = {'architecture': arch.archname, 'snapshots': []}
    for snapshot in snapshots:
        # Raised explicitly rather than asserted so that the check survives
        # running Python with -O.
        if snapshot.arch != arch:
            raise ValueError(
                'All snapshots must have the same architecture.')
        regs = {r: v for r, v in snapshot.regs.items() if v is not None}
        # NOTE(review): bytes.decode() defaults to UTF-8 and raises
        # UnicodeDecodeError on byte sequences that are not valid UTF-8.
        # Confirm that memory contents are guaranteed to be decodable.
        mem = [
            {
                'range': [addr, addr + len(data)],
                'data': data.decode(),
            }
            for addr, data in snapshot.mem._pages.items()
        ]
        res['snapshots'].append({'registers': regs, 'memory': mem})

    json.dump(res, out_stream)


def parse_qemu(stream: TextIO, arch: Arch) -> list[ProgramState]:
    """Parse program states from a textual register dump.

    Every line that starts with 'Trace' begins a new ProgramState. All other
    lines are searched for `NAME=hexvalue` register assignments, which are
    applied to the most recently started state. Names are resolved with
    `arch.to_regname`; assignments whose name resolves to `None` are ignored.
    Lines that appear before the first 'Trace' line are skipped, because
    there is no state yet that they could belong to.

    NOTE(review): presumably the input is a QEMU CPU-state log -- confirm
    the exact log options this is meant to consume.

    :param stream: Text stream to read line by line.
    :param arch: Architecture used to create states and resolve names.
    :return: The parsed states in the order their 'Trace' lines appeared.
    :raise ValueError: If a recognised register has a value that is not a
                       hexadecimal number.
    """
    states = []
    for line in stream:
        if line.startswith('Trace'):
            states.append(ProgramState(arch))
            continue

        # Register lines before the first 'Trace' line have no state to be
        # stored in; skip them instead of failing on states[-1].
        if not states:
            continue

        text = line.strip()

        # Remove padding spaces around equality signs
        text = _SPACE_BEFORE_EQ.sub('=', text)
        text = _SPACES_AFTER_EQ.sub('=', text)

        # Standardize register names
        text = _YMM_PADDED_INDEX.sub(r'YMM\1', text)
        text = _FPR_NAME.sub(r'ST\1', text)

        # Bring each register assignment into a new line (the `\n` escape in
        # a replacement template is expanded to a newline by `re`)
        text = _INLINE_ASSIGNMENT.sub(r'\n\1=', text)

        # Remove all trailing information from register assignments
        text = _ASSIGNMENT_WITH_TRAILER.sub(r'\1=\2', text)

        # Now parse registers and their values from the resulting lines
        for assignment in text.split('\n'):
            parts = assignment.split('=')
            if len(parts) != 2:
                continue
            regname, value = parts
            regname = arch.to_regname(regname)
            if regname is not None:
                # Values may contain spaces between groups of hex digits.
                states[-1].set(regname, int(value.replace(' ', ''), 16))

    return states


def parse_arancini(stream: TextIO, arch: Arch) -> list[ProgramState]:
    """Parse program states from a textual `name:value` log.

    Every line that starts with 'INVOKE PC=' begins a new ProgramState. Any
    other line that contains exactly one ':' is read as a `name:value`
    assignment with a hexadecimal value and applied to the most recently
    started state. A few descriptive labels (e.g. 'Program counter',
    'flag ZF') are mapped to register names first. Names that
    `arch.to_regname` resolves to `None` are ignored, as are assignments
    that appear before the first 'INVOKE PC=' line.

    NOTE(review): presumably the input is a log written by Arancini --
    confirm the producer and its exact format.

    :param stream: Text stream to read line by line.
    :param arch: Architecture used to create states and resolve names.
    :return: The parsed states in the order they were started.
    :raise ValueError: If a recognised register has a value that is not a
                       hexadecimal number.
    """
    states = []
    for line in stream:
        if line.startswith('INVOKE PC='):
            states.append(ProgramState(arch))
            continue

        # Parse a register assignment
        parts = line.split(':')
        if len(parts) == 2 and states:
            regname, value = parts
            regname = arch.to_regname(_ARANCINI_ALIASES.get(regname, regname))
            if regname is not None:
                # int() tolerates the surrounding whitespace and the line's
                # trailing newline.
                states[-1].set(regname, int(value, 16))

    return states