1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
|
"""Parsing of JSON files containing snapshot data."""
import base64
import json
import re
from typing import TextIO
from arch import supported_architectures, Arch
from snapshot import ProgramState
class ParseError(Exception):
    """Error raised when input data does not have the expected structure."""
def _get_or_throw(obj: dict, key: str):
    """Look up `key` in `obj` and insist that a value is present.

    A key that is absent and a key that maps to `None` are treated alike:
    both count as "no value".

    :param obj: The dictionary to read from.
    :param key: The key whose value is required.
    :return: The value stored at `key`.
    :raise ParseError: If `obj` holds no value (or `None`) at `key`.
    """
    found = obj.get(key)
    if found is None:
        raise ParseError(f'Expected value at key {key}, but found none.')
    return found
def parse_snapshots(json_stream: TextIO) -> list[ProgramState]:
    """Parse snapshots from our JSON format.

    The document is an object with an 'architecture' key (used to look up
    an entry in `supported_architectures`) and a 'snapshots' list. Each
    snapshot has a 'registers' object mapping register names to values and
    a 'memory' list whose entries carry a two-element 'range' (start, end)
    and base64-encoded 'data'.

    :param json_stream: Text stream to read the JSON document from.
    :return: One `ProgramState` per entry of the 'snapshots' list, in
             document order. An empty JSON object yields an empty list.
    :raise ParseError: If an expected key is missing, or if the decoded
                       data of a memory entry does not have the length
                       announced by its 'range'.
    """
    json_data = json.load(json_stream)

    # `serialize_snapshots` writes an empty object for an empty snapshot
    # list. Accept it here so that this case round-trips instead of failing
    # on the missing 'architecture' key.
    if json_data == {}:
        return []

    arch = supported_architectures[_get_or_throw(json_data, 'architecture')]
    snapshots = []
    for snapshot in _get_or_throw(json_data, 'snapshots'):
        state = ProgramState(arch)
        for reg, val in _get_or_throw(snapshot, 'registers').items():
            state.set(reg, val)
        for mem in _get_or_throw(snapshot, 'memory'):
            start, end = _get_or_throw(mem, 'range')
            data = base64.b64decode(_get_or_throw(mem, 'data'))
            # Validate with an explicit raise rather than `assert`: asserts
            # are removed when Python runs with -O, which would let
            # inconsistent input through unnoticed.
            if len(data) != end - start:
                raise ParseError(
                    f'Memory range [{start}, {end}) announces'
                    f' {end - start} bytes, but its data decodes to'
                    f' {len(data)} bytes.')
            state.write_memory(start, data)
        snapshots.append(state)
    return snapshots
def serialize_snapshots(snapshots: list[ProgramState], out_stream: TextIO):
    """Serialize a list of snapshots to our JSON format.

    An empty list is written as an empty JSON object. Otherwise the output
    is an object with the architecture name of the first snapshot under
    'architecture' and one entry per snapshot under 'snapshots'. Each entry
    holds the registers whose value is not `None` and one memory record
    (address range plus base64-encoded data) per item of the snapshot's
    memory pages.

    All snapshots are asserted to share the architecture of the first one.

    :param snapshots: The program states to serialize.
    :param out_stream: Text stream that receives the JSON document.
    """
    if not snapshots:
        return json.dump({}, out_stream)

    arch = snapshots[0].arch
    document = {'architecture': arch.archname, 'snapshots': []}
    for state in snapshots:
        assert state.arch == arch

        # Registers without a value are left out of the output.
        registers = {
            name: value
            for name, value in state.regs.items()
            if value is not None
        }
        memory = [
            {
                'range': [start, start + len(chunk)],
                'data': base64.b64encode(chunk).decode('ascii'),
            }
            for start, chunk in state.mem._pages.items()
        ]
        document['snapshots'].append({'registers': registers, 'memory': memory})

    json.dump(document, out_stream)
def parse_qemu(stream: TextIO, arch: Arch) -> list[ProgramState]:
    """Parse a QEMU log from a stream.

    Every line starting with 'Trace' opens a new program state. All lines
    that follow are handed to `_parse_qemu_line` to fill that state, until
    the next 'Trace' line. Lines before the first 'Trace' line are ignored.

    :param stream: The log to read, line by line.
    :param arch: The architecture passed to each created `ProgramState`.
    :return: A list of parsed program states, in order of occurrence in the
             log.
    """
    parsed = []
    current = None
    for log_line in stream:
        if log_line.startswith('Trace'):
            current = ProgramState(arch)
            parsed.append(current)
        elif current is not None:
            _parse_qemu_line(log_line, current)
    return parsed
def _parse_qemu_line(line: str, cur_state: ProgramState):
    """Try to parse a single register-assignment line from a QEMU log.

    Set all registers for which the line specified values in a `ProgramState`
    object. Assignments whose name `cur_state.arch.to_regname` maps to `None`
    are skipped.

    :param line: The log line to parse.
    :param cur_state: The state on which to set parsed register values.
    """
    text = line.strip()

    # Remove padding spaces around equality signs
    text = re.sub(r' =', '=', text)
    text = re.sub(r'= +', '=', text)

    # Standardize register names
    text = re.sub(r'YMM0([0-9])', lambda m: f'YMM{m.group(1)}', text)
    text = re.sub(r'FPR([0-9])', lambda m: f'ST{m.group(1)}', text)

    # Bring each register assignment into a new line
    text = re.sub(r' ([A-Z0-9]+)=', lambda m: f'\n{m.group(1)}=', text)

    # Remove all trailing information from register assignments.
    # `flags` is passed by keyword: handing `count`/`flags` to `re.sub`
    # positionally is deprecated as of Python 3.13.
    text = re.sub(r'^([A-Z0-9]+)=([0-9a-f ]+).*$',
                  lambda m: f'{m.group(1)}={m.group(2)}',
                  text,
                  flags=re.MULTILINE)

    # Now parse registers and their values from the resulting lines
    for assignment in text.split('\n'):
        parts = assignment.split('=')
        if len(parts) != 2:
            # Not a plain `NAME=value` pair; nothing to set.
            continue
        regname, value = parts
        regname = cur_state.arch.to_regname(regname)
        if regname is not None:
            # Multi-word values are joined into one hexadecimal number.
            cur_state.set(regname, int(value.replace(' ', ''), 16))
def parse_arancini(stream: TextIO, arch: Arch) -> list[ProgramState]:
    """Parse an Arancini log from a stream.

    Every line starting with 'INVOKE PC=' opens a new program state. Any
    other line that contains exactly one ':' is read as `name:value`, where
    the value is parsed as a hexadecimal integer. The name is first mapped
    through a fixed alias table and then through `arch.to_regname`; names
    that resolve to `None` are skipped. Lines before the first
    'INVOKE PC=' line are ignored.

    :param stream: The log to read, line by line.
    :param arch: The architecture used to create states and to resolve
                 register names.
    :return: A list of parsed program states, in order of occurrence in the
             log.
    """
    aliases = {
        'Program counter': 'RIP',
        'flag ZF': 'ZF',
        'flag CF': 'CF',
        'flag OF': 'OF',
        'flag SF': 'SF',
        'flag PF': 'PF',
        'flag DF': 'DF',
    }

    parsed = []
    for log_line in stream:
        if log_line.startswith('INVOKE PC='):
            parsed.append(ProgramState(arch))
            continue

        # Only lines of the form `name:value` are register assignments, and
        # they need a state to be applied to.
        if log_line.count(':') != 1 or not parsed:
            continue

        name, _, raw_value = log_line.partition(':')
        resolved = arch.to_regname(aliases.get(name, name))
        if resolved is None:
            continue
        parsed[-1].set(resolved, int(raw_value, 16))

    return parsed
if __name__ == "__main__":
    from arch import x86

    # Convert a QEMU log into a JSON snapshot dump.
    with open('qemu.log', 'r') as qemu_log:
        states = parse_qemu(qemu_log, x86.ArchX86())
        print(f'Parsed {len(states)} states from QEMU log.')
    with open('dump.qemu', 'w') as qemu_dump:
        serialize_snapshots(states, qemu_dump)

    # Convert an Arancini log into a JSON snapshot dump.
    with open('emulator-log.txt', 'r') as arancini_log:
        states = parse_arancini(arancini_log, x86.ArchX86())
        print(f'Parsed {len(states)} states from Arancini log.')
    with open('dump.arancini', 'w') as arancini_dump:
        serialize_snapshots(states, arancini_dump)

    exit(0)
|