about summary refs log tree commit diff stats
path: root/parser.py
diff options
context:
space:
mode:
author Theofilos Augoustis <theofilos.augoustis@gmail.com> 2023-12-31 18:29:31 +0100
committer Theofilos Augoustis <theofilos.augoustis@gmail.com> 2023-12-31 18:29:31 +0100
commit eae0b3b08bd078ad2f621ce2ef201e656da3f16a (patch)
tree eb93252f39543c46146297264ff548d9925178e0 /parser.py
parent d26ae0a7d583da5034cd6271f953b6253119ceae (diff)
download focaccia-eae0b3b08bd078ad2f621ce2ef201e656da3f16a.tar.gz
focaccia-eae0b3b08bd078ad2f621ce2ef201e656da3f16a.zip
Refactor project structure
Read concrete state on demand during concolic exec

During concolic tracing, don't record full program snapshots at each
basic block, but instead read concrete values directly from the concrete
target when they are needed.
Diffstat (limited to 'parser.py')
-rw-r--r-- parser.py 156
1 files changed, 0 insertions, 156 deletions
diff --git a/parser.py b/parser.py
deleted file mode 100644
index 391d58a..0000000
--- a/parser.py
+++ /dev/null
@@ -1,156 +0,0 @@
-"""Parsing of JSON files containing snapshot data."""
-
-import base64
-import json
-import re
-from typing import TextIO
-
-from arch import supported_architectures, Arch
-from snapshot import ProgramState
-
-class ParseError(Exception):
-    """A parse error."""
-
-def _get_or_throw(obj: dict, key: str):
-    """Get a value from a dict or throw a ParseError if not present."""
-    val = obj.get(key)
-    if val is not None:
-        return val
-    raise ParseError(f'Expected value at key {key}, but found none.')
-
-def parse_snapshots(json_stream: TextIO) -> list[ProgramState]:
-    """Parse snapshots from our JSON format."""
-    json_data = json.load(json_stream)
-
-    arch = supported_architectures[_get_or_throw(json_data, 'architecture')]
-    snapshots = []
-    for snapshot in _get_or_throw(json_data, 'snapshots'):
-        state = ProgramState(arch)
-        for reg, val in _get_or_throw(snapshot, 'registers').items():
-            state.set(reg, val)
-        for mem in _get_or_throw(snapshot, 'memory'):
-            start, end = _get_or_throw(mem, 'range')
-            data = base64.b64decode(_get_or_throw(mem, 'data'))
-            assert(len(data) == end - start)
-            state.write_memory(start, data)
-
-        snapshots.append(state)
-
-    return snapshots
-
-def serialize_snapshots(snapshots: list[ProgramState], out_stream: TextIO):
-    """Serialize a list of snapshots to out JSON format."""
-    if not snapshots:
-        return json.dump({}, out_stream)
-
-    arch = snapshots[0].arch
-    res = { 'architecture': arch.archname, 'snapshots': [] }
-    for snapshot in snapshots:
-        assert(snapshot.arch == arch)
-        regs = {r: v for r, v in snapshot.regs.items() if v is not None}
-        mem = []
-        for addr, data in snapshot.mem._pages.items():
-            mem.append({
-                'range': [addr, addr + len(data)],
-                'data': base64.b64encode(data).decode('ascii')
-            })
-        res['snapshots'].append({ 'registers': regs, 'memory': mem })
-
-    json.dump(res, out_stream)
-
-def parse_qemu(stream: TextIO, arch: Arch) -> list[ProgramState]:
-    """Parse a QEMU log from a stream.
-
-    :return: A list of parsed program states, in order of occurrence in the
-             log.
-    """
-    states = []
-    for line in stream:
-        if line.startswith('Trace'):
-            states.append(ProgramState(arch))
-            continue
-        if states:
-            _parse_qemu_line(line, states[-1])
-
-    return states
-
-def _parse_qemu_line(line: str, cur_state: ProgramState):
-    """Try to parse a single register-assignment line from a QEMU log.
-
-    Set all registers for which the line specified values in a `ProgramState`
-    object.
-
-    :param line:      The log line to parse.
-    :param cur_state: The state on which to set parsed register values.
-    """
-    line = line.strip()
-
-    # Remove padding spaces around equality signs
-    line = re.sub(' =', '=', line)
-    line = re.sub('= +', '=', line)
-
-    # Standardize register names
-    line = re.sub('YMM0([0-9])',   lambda m: f'YMM{m.group(1)}', line)
-    line = re.sub('FPR([0-9])',    lambda m: f'ST{m.group(1)}', line)
-
-    # Bring each register assignment into a new line
-    line = re.sub(' ([A-Z0-9]+)=', lambda m: f'\n{m.group(1)}=', line)
-
-    # Remove all trailing information from register assignments
-    line = re.sub('^([A-Z0-9]+)=([0-9a-f ]+).*$',
-                  lambda m: f'{m.group(1)}={m.group(2)}',
-                  line,
-                  0, re.MULTILINE)
-
-    # Now parse registers and their values from the resulting lines
-    lines = line.split('\n')
-    for line in lines:
-        split = line.split('=')
-        if len(split) == 2:
-            regname, value = split
-            value = value.replace(' ', '')
-            regname = cur_state.arch.to_regname(regname)
-            if regname is not None:
-                cur_state.set(regname, int(value, 16))
-
-def parse_arancini(stream: TextIO, arch: Arch) -> list[ProgramState]:
-    aliases = {
-        'Program counter': 'RIP',
-        'flag ZF': 'ZF',
-        'flag CF': 'CF',
-        'flag OF': 'OF',
-        'flag SF': 'SF',
-        'flag PF': 'PF',
-        'flag DF': 'DF',
-    }
-
-    states = []
-    for line in stream:
-        if line.startswith('INVOKE PC='):
-            states.append(ProgramState(arch))
-            continue
-
-        # Parse a register assignment
-        split = line.split(':')
-        if len(split) == 2 and states:
-            regname, value = split
-            regname = arch.to_regname(aliases.get(regname, regname))
-            if regname is not None:
-                states[-1].set(regname, int(value, 16))
-
-    return states
-
-if __name__ == "__main__":
-    from arch import x86
-    with open('qemu.log', 'r') as file:
-        states = parse_qemu(file, x86.ArchX86())
-        print(f'Parsed {len(states)} states from QEMU log.')
-    with open('dump.qemu', 'w') as file:
-        serialize_snapshots(states, file)
-
-    with open('emulator-log.txt', 'r') as file:
-        states = parse_arancini(file, x86.ArchX86())
-        print(f'Parsed {len(states)} states from Arancini log.')
-    with open('dump.arancini', 'w') as file:
-        serialize_snapshots(states, file)
-    exit(0)