1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
"""Invocable like this:
gdb -n --batch -x qemu_tool.py
"""
import gdb
import platform
import focaccia.parser as parser
from focaccia.arch import supported_architectures, Arch
from focaccia.compare import compare_symbolic, ErrorTypes
from focaccia.snapshot import ProgramState
from focaccia.symbolic import SymbolicTransform, eval_symbol
from focaccia.utils import print_result
from verify_qemu import make_argparser
class GDBProgramState:
def __init__(self, process: gdb.Inferior, frame: gdb.Frame):
self._proc = process
self._frame = frame
def read_register(self, regname: str) -> int | None:
try:
return int(self._frame.read_register(regname.lower()))
except ValueError as err:
from focaccia.arch import x86
rflags = int(self._frame.read_register('eflags'))
rflags = x86.decompose_rflags(rflags)
if regname in rflags:
return rflags[regname]
print(f'{regname}: {err}')
return None
def read_memory(self, addr: int, size: int) -> bytes | None:
try:
return self._proc.read_memory(addr, size).tobytes()
except gdb.MemoryError as err:
print(f'@{size}[{hex(addr)}]: {err}')
return None
class GDBServerStateIterator:
def __init__(self, address: str, port: int):
gdb.execute('set pagination 0')
gdb.execute('set sysroot')
gdb.execute(f'target remote {address}:{port}')
self._process = gdb.selected_inferior()
self._first_next = True
def __iter__(self):
return self
def __next__(self):
# The first call to __next__ should yield the first program state,
# i.e. before stepping the first time
if self._first_next:
self._first_next = False
return GDBProgramState(self._process, gdb.selected_frame())
# Step
pc = gdb.selected_frame().read_register('pc')
new_pc = pc
while pc == new_pc:
gdb.execute('si', to_string=True)
if not self._process.is_valid() or len(self._process.threads()) == 0:
raise StopIteration
new_pc = gdb.selected_frame().read_register('pc')
return GDBProgramState(self._process, gdb.selected_frame())
def collect_conc_trace(arch: Arch, \
gdb: GDBServerStateIterator, \
strace: list[SymbolicTransform]) \
-> list[ProgramState]:
states = []
for qemu, transform in zip(gdb, strace):
qemu_pc = qemu.read_register('pc')
assert(qemu_pc is not None)
if qemu_pc != transform.addr:
print(f'Fatal error: QEMU\'s program counter'
f' ({hex(qemu_pc)}) does not match the'
f' expected program counter in the symbolic trace'
f' ({hex(transform.addr)}).')
print(f'Processing only partial trace up to this instruction.')
return states
state = ProgramState(arch)
state.set_register('PC', transform.addr)
accessed_regs = transform.get_used_registers()
accessed_mems = transform.get_used_memory_addresses()
for regname in accessed_regs:
regval = qemu.read_register(regname)
if regval is not None:
state.set_register(regname, regval)
for mem in accessed_mems:
assert(mem.size % 8 == 0)
addr = eval_symbol(mem.ptr, qemu)
mem = qemu.read_memory(addr, int(mem.size / 8))
if mem is not None:
state.write_memory(addr, mem)
states.append(state)
return states
def main():
args = make_argparser().parse_args()
gdbserver_port = args.gdbserver_port
with open(args.symb_trace, 'r') as strace:
symb_transforms = parser.parse_transformations(strace)
arch = supported_architectures[platform.machine()]
conc_states = collect_conc_trace(
arch,
GDBServerStateIterator('localhost', gdbserver_port),
symb_transforms)
res = compare_symbolic(conc_states, symb_transforms)
print_result(res, ErrorTypes.POSSIBLE)
if __name__ == "__main__":
main()
|