1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
"""Invocable like this:
gdb -n --batch -x qemu_tool.py
"""
import argparse
import re
import shlex
import subprocess
from typing import TextIO
import focaccia.parser as parser
from focaccia.arch import x86
from focaccia.lldb_target import MemoryMap
from focaccia.snapshot import ProgramState
def parse_memory_maps(stream: TextIO) -> tuple[list[MemoryMap], str]:
    """Parse a block of memory mapping lines from a QEMU log stream.

    Lines are read from `stream` for as long as they consist of exactly three
    space-separated fields whose first field is a lower-case hexadecimal
    address range of the form ``<start>-<end>``. The second field (the size)
    is ignored, because it is implied by the address range.

    :param stream: Text stream to read mapping lines from. Expected to be
                   positioned on the first line after the header of the
                   mapping table.
    :return: Returns the list of parsed memory mappings as well as the first
             line in the stream that does not belong to the memory mapping
             information, i.e. the line that terminates the block of mapping
             information. At the end of the stream this is the empty string.

             The line is returned for the technical reason that the parser
             needs to read a line from the stream in order to determine that
             this line no longer belongs to the mapping information; but it
             might still contain other important information.
    """
    range_pattern = re.compile('^[0-9a-f]+-[0-9a-f]+$')

    mappings = []
    while True:
        line = stream.readline()

        # Split a copy of the line without its terminator. Otherwise the last
        # field (the permissions) would carry a trailing '\n'. The raw line is
        # kept untouched because it is handed back to the caller.
        fields = line.rstrip('\n').split(' ')
        if len(fields) != 3 or not range_pattern.match(fields[0]):
            return mappings, line

        addr_range, _size, perms = fields
        start, end = (int(bound, 16) for bound in addr_range.split('-'))
        mappings.append(MemoryMap(start, end, '[unnamed]', perms))
def copy_memory(proc, state: ProgramState, maps: list[MemoryMap]):
    """Copy memory from a GDB process to a ProgramState object.

    Problem: Reading large mappings via GDB takes way too long (~500ms for ~8MB).

    :param proc: Object whose `read_memory(address, length)` method returns a
                 buffer exposing `contiguous`, `nbytes` and `tobytes()`.
    :param state: Destination; receives one `write_memory(address, data)` call
                  per copied mapping.
    :param maps: Memory mappings to consider. Only mappings whose permission
                 string contains 'rw' are copied.
    """
    for mapping in maps:
        # Only copy read- and writeable memory from the process. This is a
        # heuristic to try to copy only heap and stack.
        if 'rw' not in mapping.perms:
            continue

        map_size = mapping.end_address - mapping.start_address
        mem = proc.read_memory(mapping.start_address, map_size)
        assert mem.contiguous

        # Convert to bytes exactly once: every tobytes() call copies the whole
        # mapping, and the result is needed for both the check and the write.
        data = mem.tobytes()
        assert mem.nbytes == len(data)
        assert mem.nbytes == map_size

        state.write_memory(mapping.start_address, data)
def run_gdb(qemu_log: TextIO, qemu_port: int) -> list[ProgramState]:
    """Single-step a remote target through GDB and record program states.

    GDB is attached to `localhost:<qemu_port>`. For as long as the selected
    inferior is valid and has threads, all lines currently available in
    `qemu_log` are processed and the target is then advanced by one
    instruction (`si`).

    Log lines are handled as follows:

     - A mapping table header replaces the current list of memory mappings
       with the table that follows it.
     - A line starting with 'Trace' begins a new program state, into which
       memory is copied according to the current mappings.
     - Any other line is passed to the QEMU log parser together with the most
       recent program state. Such lines are dropped while no state exists.

    :param qemu_log: Readable text stream of the QEMU log.
    :param qemu_port: Port of the gdbserver to connect to.
    :return: The recorded program states, in order of creation.
    """
    # Only importable when running inside GDB's Python interpreter.
    import gdb

    for command in ('set pagination 0',
                    'set sysroot',
                    f'target remote localhost:{qemu_port}'):
        gdb.execute(command)
    inferior = gdb.selected_inferior()

    arch = x86.ArchX86()
    map_header = re.compile('^start +end +size +prot$')
    current_maps: list[MemoryMap] = []
    snapshots: list[ProgramState] = []

    while inferior.is_valid() and inferior.threads():
        for line in qemu_log:
            if map_header.match(line):
                # The table parser hands back the line that ended the table;
                # it is processed below like any other line.
                current_maps, line = parse_memory_maps(qemu_log)

            if not line.startswith('Trace'):
                if snapshots:
                    parser._parse_qemu_line(line, snapshots[-1])
                continue

            snapshot = ProgramState(arch)
            snapshots.append(snapshot)
            copy_memory(inferior, snapshot, current_maps)

        gdb.execute('si', to_string=True)

    return snapshots
def make_argparser():
    """Build the command line parser of this tool.

    :return: An `argparse.ArgumentParser` with the positional argument
             `binary` and the options `--binary-args`, `--output`/`-o`,
             `--gdbserver-port`, `--qemu`, `--qemu-log` and
             `--qemu-extra-args`.
    """
    prog = argparse.ArgumentParser()
    prog.add_argument('binary',
                      type=str,
                      help='The binary to run and record.')
    prog.add_argument('--binary-args',
                      type=str,
                      help='A string of arguments to be passed to the binary.')
    # The script's entry point opens the output file unconditionally. Require
    # the option so that a missing value is reported right away rather than
    # after the whole recording has run.
    prog.add_argument('--output', '-o',
                      required=True,
                      help='Name of output file.')
    prog.add_argument('--gdbserver-port', type=int, default=12421,
                      help='Port used for the GDB connection to QEMU.'
                           ' [Default: 12421]')
    prog.add_argument('--qemu', type=str, default='qemu-x86_64',
                      help='QEMU binary to invoke. [Default: qemu-x86_64]')
    prog.add_argument('--qemu-log', type=str, default='qemu.log',
                      help='File that QEMU writes its log to.'
                           ' [Default: qemu.log]')
    prog.add_argument('--qemu-extra-args', type=str, default='',
                      help='Arguments passed to QEMU in addition to the'
                           ' default ones required by this script.')
    return prog
if __name__ == "__main__":
    # Script entry point: start the binary under QEMU, record its execution
    # through GDB, and write the recorded snapshots to the output file.
    argparser = make_argparser()
    args = argparser.parse_args()
    if args.output is None:
        # The snapshots are written to this file at the very end. Refuse to
        # start rather than lose a complete recording.
        argparser.error('an output file is required (--output/-o)')

    binary = args.binary
    binary_args = shlex.split(args.binary_args) if args.binary_args else []

    qemu_bin = args.qemu
    gdbserver_port = args.gdbserver_port
    qemu_log_name = args.qemu_log

    qemu_args = [
        qemu_bin,
        '--trace', 'target_mmap*',
        '--trace', 'memory_notdirty_*',
        # We write QEMU's output to a log file, then read it from that file.
        # This is preferred over reading from the process's stdout pipe because
        # we require a non-blocking solution that returns when all available
        # lines have been read.
        '-D', qemu_log_name,
        '-d', 'cpu,fpu,exec,unimp,page,strace',
        '-g', str(gdbserver_port),
        *shlex.split(args.qemu_extra_args),
        binary,
        *binary_args,
    ]

    # Create (or empty) the log file before QEMU is started. QEMU may not have
    # opened it yet by the time we open it for reading below, and a log left
    # over from a previous run must not be mistaken for this run's output.
    with open(qemu_log_name, 'w'):
        pass

    qemu = subprocess.Popen(qemu_args)
    try:
        with open(qemu_log_name, 'r') as qemu_log:
            snapshots = run_gdb(qemu_log, gdbserver_port)
    finally:
        # Never leave the QEMU process behind, also not if recording failed.
        if qemu.poll() is None:
            qemu.terminate()
            try:
                qemu.wait(timeout=5)
            except subprocess.TimeoutExpired:
                # QEMU did not act on the termination request in time.
                qemu.kill()
        qemu.wait()

    with open(args.output, 'w') as file:
        parser.serialize_snapshots(snapshots, file)
|