about summary refs log tree commit diff stats
path: root/parser.py
diff options
context:
space:
mode:
Diffstat (limited to 'parser.py')
-rw-r--r--parser.py124
1 files changed, 124 insertions, 0 deletions
diff --git a/parser.py b/parser.py
new file mode 100644
index 0000000..d2fcf13
--- /dev/null
+++ b/parser.py
@@ -0,0 +1,124 @@
+"""Parsing of JSON files containing snapshot data."""
+
+import json
+import re
+from typing import TextIO
+
+from arch import supported_architectures, Arch
+from snapshot import ProgramState
+
+class ParseError(Exception):
+    """A parse error."""
+
+def _get_or_throw(obj: dict, key: str):
+    """Get a value from a dict or throw a ParseError if not present."""
+    val = obj.get(key)
+    if val is not None:
+        return val
+    raise ParseError(f'Expected value at key {key}, but found none.')
+
+def parse_snapshots(json_stream: TextIO) -> list[ProgramState]:
+    """Parse snapshots from our JSON format."""
+    json_data = json.load(json_stream)
+
+    arch = supported_architectures[_get_or_throw(json_data, 'architecture')]
+    snapshots = []
+    for snapshot in _get_or_throw(json_data, 'snapshots'):
+        state = ProgramState(arch)
+        for reg, val in _get_or_throw(snapshot, 'registers').items():
+            state.set(reg, val)
+        for mem in _get_or_throw(snapshot, 'memory'):
+            start, end = _get_or_throw(mem, 'range')
+            data = _get_or_throw(mem, 'data').encode()
+            assert(len(data) == end - start)
+            state.write_memory(start, data)
+
+        snapshots.append(state)
+
+    return snapshots
+
+def serialize_snapshots(snapshots: list[ProgramState], out_stream: TextIO):
+    """Serialize a list of snapshots to out JSON format."""
+    if not snapshots:
+        return json.dump({}, out_stream)
+
+    arch = snapshots[0].arch
+    res = { 'architecture': arch.archname, 'snapshots': [] }
+    for snapshot in snapshots:
+        assert(snapshot.arch == arch)
+        regs = {r: v for r, v in snapshot.regs.items() if v is not None}
+        mem = []
+        for addr, data in snapshot.mem._pages.items():
+            mem.append({
+                'range': [addr, addr + len(data)],
+                'data': data.decode(),
+            })
+        res['snapshots'].append({ 'registers': regs, 'memory': mem })
+
+    json.dump(res, out_stream)
+
+def parse_qemu(stream: TextIO, arch: Arch) -> list[ProgramState]:
+    states = []
+    for line in stream:
+        if line.startswith('Trace'):
+            states.append(ProgramState(arch))
+            continue
+
+        line = line.strip()
+
+        # Remove padding spaces around equality signs
+        line = re.sub(' =', '=', line)
+        line = re.sub('= +', '=', line)
+
+        # Standardize register names
+        line = re.sub('YMM0([0-9])',   lambda m: f'YMM{m.group(1)}', line)
+        line = re.sub('FPR([0-9])',    lambda m: f'ST{m.group(1)}', line)
+
+        # Bring each register assignment into a new line
+        line = re.sub(' ([A-Z0-9]+)=', lambda m: f'\n{m.group(1)}=', line)
+
+        # Remove all trailing information from register assignments
+        line = re.sub('^([A-Z0-9]+)=([0-9a-f ]+).*$',
+                      lambda m: f'{m.group(1)}={m.group(2)}',
+                      line,
+                      0, re.MULTILINE)
+
+        # Now parse registers and their values from the resulting lines
+        lines = line.split('\n')
+        for line in lines:
+            split = line.split('=')
+            if len(split) == 2:
+                regname, value = split
+                value = value.replace(' ', '')
+                regname = arch.to_regname(regname)
+                if regname is not None:
+                    states[-1].set(regname, int(value, 16))
+
+    return states
+
+def parse_arancini(stream: TextIO, arch: Arch) -> list[ProgramState]:
+    aliases = {
+        'Program counter': 'RIP',
+        'flag ZF': 'ZF',
+        'flag CF': 'CF',
+        'flag OF': 'OF',
+        'flag SF': 'SF',
+        'flag PF': 'PF',
+        'flag DF': 'DF',
+    }
+
+    states = []
+    for line in stream:
+        if line.startswith('INVOKE PC='):
+            states.append(ProgramState(arch))
+            continue
+
+        # Parse a register assignment
+        split = line.split(':')
+        if len(split) == 2 and states:
+            regname, value = split
+            regname = arch.to_regname(aliases.get(regname, regname))
+            if regname is not None:
+                states[-1].set(regname, int(value, 16))
+
+    return states