1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
"""Parsing of JSON files containing snapshot data."""
import base64
import json
import re
from typing import TextIO
from arch import supported_architectures, Arch
from snapshot import ProgramState
class ParseError(Exception):
    """Error raised when input being parsed lacks a required value."""
def _get_or_throw(obj: dict, key: str):
"""Get a value from a dict or throw a ParseError if not present."""
val = obj.get(key)
if val is not None:
return val
raise ParseError(f'Expected value at key {key}, but found none.')
def parse_snapshots(json_stream: TextIO) -> list[ProgramState]:
    """Parse snapshots from our JSON format.

    The expected document is an object with an 'architecture' key (used to
    index `supported_architectures`) and a 'snapshots' list. Each snapshot
    has a 'registers' mapping of register name to value and a 'memory' list
    whose entries carry a 'range' pair `[start, end]` and base64-encoded
    'data' of exactly `end - start` bytes.

    An empty JSON object, which is what `serialize_snapshots` writes for an
    empty list, yields an empty list.

    :param json_stream: Text stream containing the JSON document.
    :return: One ProgramState per entry in 'snapshots', in document order.
    :raises ParseError: If a required key is missing (or null), or if a
                        memory block's data length disagrees with its range.
                        An unknown architecture name propagates whatever
                        error the `supported_architectures` lookup raises.
    """
    json_data = json.load(json_stream)
    if json_data == {}:
        # Round-trip support: an empty snapshot list is serialized as '{}'.
        return []

    arch = supported_architectures[_get_or_throw(json_data, 'architecture')]
    snapshots = []
    for snapshot in _get_or_throw(json_data, 'snapshots'):
        state = ProgramState(arch)
        for reg, val in _get_or_throw(snapshot, 'registers').items():
            state.set(reg, val)
        for mem in _get_or_throw(snapshot, 'memory'):
            start, end = _get_or_throw(mem, 'range')
            data = base64.b64decode(_get_or_throw(mem, 'data'))
            # Validate with a real exception: `assert` is stripped under -O
            # and must not guard against malformed input.
            if len(data) != end - start:
                raise ParseError(
                    f'Memory range [{start}, {end}) spans {end - start}'
                    f' bytes, but its data is {len(data)} bytes long.')
            state.write_memory(start, data)
        snapshots.append(state)
    return snapshots
def serialize_snapshots(snapshots: list[ProgramState], out_stream: TextIO):
    """Serialize a list of snapshots to our JSON format.

    An empty list is written as an empty JSON object. Otherwise the document
    names the architecture of the first snapshot and lists, per snapshot,
    the registers whose value is not None and every page in
    `snapshot.mem._pages` as an address range plus base64-encoded data.

    All snapshots are asserted to share the first snapshot's architecture.
    """
    if not snapshots:
        json.dump({}, out_stream)
        return None

    arch = snapshots[0].arch
    document = {'architecture': arch.archname, 'snapshots': []}
    for state in snapshots:
        assert state.arch == arch

        # Registers without a value are left out of the output.
        registers = {}
        for name, value in state.regs.items():
            if value is not None:
                registers[name] = value

        regions = [
            {
                'range': [addr, addr + len(data)],
                'data': base64.b64encode(data).decode('ascii')
            }
            for addr, data in state.mem._pages.items()
        ]
        document['snapshots'].append(
            {'registers': registers, 'memory': regions})

    json.dump(document, out_stream)
def parse_qemu(stream: TextIO, arch: Arch) -> list[ProgramState]:
    """Parse program states from a textual register trace (QEMU log style).

    A line starting with 'Trace' opens a new snapshot. Every other line is
    scanned for `NAME=hexvalue` assignments, which are stored in the most
    recently opened snapshot. Assignments that appear before the first
    'Trace' line belong to no snapshot and are ignored.

    :param stream: Line-iterable text stream containing the trace.
    :param arch:   Architecture used to create each snapshot and to
                   translate register names via `arch.to_regname`; names
                   for which it returns None are ignored.
    :return: One ProgramState per 'Trace' line, in input order.
    :raises ValueError: If the value assigned to a recognized register is
                        not a hexadecimal number.
    """
    states = []
    for raw_line in stream:
        if raw_line.startswith('Trace'):
            states.append(ProgramState(arch))
            continue

        text = raw_line.strip()

        # Remove padding spaces around equality signs
        text = re.sub(r' =', '=', text)
        text = re.sub(r'= +', '=', text)

        # Standardize register names
        text = re.sub(r'YMM0([0-9])', r'YMM\1', text)
        text = re.sub(r'FPR([0-9])', r'ST\1', text)

        # Bring each register assignment into a new line
        text = re.sub(r' ([A-Z0-9]+)=', r'\n\1=', text)

        # Remove all trailing information from register assignments.
        # `flags` is passed by keyword: positional count/flags arguments to
        # re.sub are deprecated.
        text = re.sub(r'^([A-Z0-9]+)=([0-9a-f ]+).*$', r'\1=\2', text,
                      flags=re.MULTILINE)

        # Now parse registers and their values from the resulting lines
        for assignment in text.split('\n'):
            parts = assignment.split('=')
            if len(parts) != 2:
                continue
            regname, value = parts
            regname = arch.to_regname(regname)
            # Without an open snapshot there is nowhere to store the value;
            # skip it instead of failing on states[-1].
            if regname is not None and states:
                states[-1].set(regname, int(value.replace(' ', ''), 16))

    return states
def parse_arancini(stream: TextIO, arch: Arch) -> list[ProgramState]:
    """Parse program states from a line-based `name:value` register log.

    A line starting with 'INVOKE PC=' opens a new snapshot. Any other line
    that contains exactly one ':' is read as a register assignment for the
    most recently opened snapshot; such lines are ignored until the first
    snapshot has been opened. Names are first translated through a small
    alias table, then through `arch.to_regname`; names for which the latter
    returns None are skipped. Values are parsed as base-16 integers.
    """
    # Log labels that differ from the register names known to `arch`.
    aliases = {'Program counter': 'RIP'}
    for flag in ('ZF', 'CF', 'OF', 'SF', 'PF', 'DF'):
        aliases[f'flag {flag}'] = flag

    states = []
    for line in stream:
        if line.startswith('INVOKE PC='):
            states.append(ProgramState(arch))
            continue

        # Parse a register assignment
        fields = line.split(':')
        if len(fields) != 2 or not states:
            continue
        label, value = fields
        regname = arch.to_regname(aliases.get(label, label))
        if regname is None:
            continue
        states[-1].set(regname, int(value, 16))

    return states
|