"""Dynamic symbolic execution module.
Offers a way to have a symbolic execution along a concrete one.
Basically, this is done through DSEEngine class, with scheme:
dse = DSEEngine(Machine("x86_32"))
dse.attach(jitter)
The DSE state can be updated through:
- .update_state_from_concrete: update the values from the CPU, so the symbolic
execution will be completely concrete from this point (until changes)
- .update_state: inject information, for instance RAX = symbolic_RAX
- .symbolize_memory: symbolize (using .memory_to_expr) memory areas (ie,
reading from an address in one of these areas yield a symbol)
The DSE run can be instrumented through:
- .add_handler: register an handler, modifying the state instead of the current
execution. Can be used for stubbing external API
- .add_lib_handler: register handlers for libraries
- .add_instrumentation: register an handler, modifying the state but continuing
the current execution. Can be used for logging facilities
On branch, if the decision is symbolic, one can also collect "path constraints"
and inverse them to produce new inputs potentially reaching new paths.
Basically, this is done through DSEPathConstraint. In order to produce a new
solution, one can extend this class, and override 'handle_solution' to produce a
solution which fit its needs. It could avoid computing new solution by
overriding 'produce_solution'.
If one is only interested in constraints associated to its path, the option
"produce_solution" should be set to False, to speed up emulation.
The constraints are accumulated in the .z3_cur z3.Solver object.
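
A minimal end-to-end sketch (the breakpoint address is an illustrative
placeholder, and the snippet assumes an already configured jitter):

    from miasm.analysis.machine import Machine
    from miasm.core.locationdb import LocationDB
    from miasm.analysis.dse import DSEEngine

    loc_db = LocationDB()
    dse = DSEEngine(Machine("x86_32"), loc_db)

    def on_target(jitter):
        dse.attach(jitter)
        dse.update_state_from_concrete()
        # Breakpoints are honored AFTER the exec callback, so replay it
        jitter.exec_cb(jitter)
        return True

    jitter.add_breakpoint(0x401000, on_target)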

Here are a few remaining TODOs:

 - handle endianness in check_state / atomic reads: currently (but this is also
   true for other Miasm symbolic engines) the endianness is not taken into
   account and is assumed to be little endian

 - too many memory dependencies in constraint tracking: in order to let z3 find
   new solutions, it needs information on memory values (for instance, a lookup
   in a table with a symbolic index). The estimated set of possibly involved
   memory locations could be too large to pass to the solver (threshold named
   MAX_MEMORY_INJECT). One possible solution, not yet implemented, is to call
   the solver to reduce the possible values thanks to its accumulated
   constraints.
"""
from builtins import range
from collections import namedtuple
import warnings
try:
import z3
except ImportError:
z3 = None
from future.utils import viewitems
from miasm.core.utils import encode_hex, force_bytes
from miasm.expression.expression import ExprMem, ExprInt, ExprCompose, \
ExprAssign, ExprId, ExprLoc, LocKey, canonize_to_exprloc
from miasm.core.bin_stream import bin_stream_vm
from miasm.jitter.emulatedsymbexec import EmulatedSymbExec
from miasm.expression.expression_helper import possible_values
from miasm.ir.translators import Translator
from miasm.analysis.expression_range import expr_range
from miasm.analysis.modularintervals import ModularIntervals
DriftInfo = namedtuple("DriftInfo", ["symbol", "computed", "expected"])
class DriftException(Exception):
"""Raised when the emulation drift from the reference engine"""
def __init__(self, info):
super(DriftException, self).__init__()
self.info = info
def __str__(self):
if len(self.info) == 1:
return "Drift of %s: %s instead of %s" % (
self.info[0].symbol,
self.info[0].computed,
self.info[0].expected,
)
else:
return "Drift of:\n\t" + "\n\t".join("%s: %s instead of %s" % (
dinfo.symbol,
dinfo.computed,
dinfo.expected)
for dinfo in self.info)
class ESETrackModif(EmulatedSymbExec):
"""Extension of EmulatedSymbExec to be used by DSE engines
Add the tracking of modified expressions, and the ability to symbolize
memory areas
"""
def __init__(self, *args, **kwargs):
super(ESETrackModif, self).__init__(*args, **kwargs)
self.modified_expr = set() # Expr modified since the last reset
self.dse_memory_range = [] # List/Intervals of memory addresses to
# symbolize
self.dse_memory_to_expr = None # function(addr) -> Expr used to
# symbolize
def mem_read(self, expr_mem):
if not expr_mem.ptr.is_int():
return super(ESETrackModif, self).mem_read(expr_mem)
dst_addr = int(expr_mem.ptr)
# Split access in atomic accesses
out = []
for addr in range(dst_addr, dst_addr + expr_mem.size // 8):
if addr in self.dse_memory_range:
# Symbolize memory access
out.append(self.dse_memory_to_expr(addr))
continue
atomic_access = ExprMem(ExprInt(addr, expr_mem.ptr.size), 8)
if atomic_access in self.symbols:
out.append( super(EmulatedSymbExec, self).mem_read(atomic_access))
else:
# Get concrete value
atomic_access = ExprMem(ExprInt(addr, expr_mem.ptr.size), 8)
out.append(super(ESETrackModif, self).mem_read(atomic_access))
if len(out) == 1:
# Trivial case (optimization)
return out[0]
# Simplify to merge constants (ex: ExprCompose(ExprInt(1, 8), ExprInt(2, 8)) -> ExprInt(0x201, 16))
return self.expr_simp(ExprCompose(*out))
def mem_write(self, expr, data):
# Call Symbolic mem_write (avoid side effects on vm)
return super(EmulatedSymbExec, self).mem_write(expr, data)
def reset_modified(self):
"""Reset modified expression tracker"""
self.modified_expr.clear()
def apply_change(self, dst, src):
super(ESETrackModif, self).apply_change(dst, src)
self.modified_expr.add(dst)
class ESENoVMSideEffects(EmulatedSymbExec):
"""
EmulatedSymbExec variant which does not write back to the VM memory
"""
def mem_write(self, expr, data):
return super(EmulatedSymbExec, self).mem_write(expr, data)
class DSEEngine(object):
"""Dynamic Symbolic Execution Engine
This class aims to be overridden for each specific purpose
"""
SYMB_ENGINE = ESETrackModif
def __init__(self, machine, loc_db):
self.machine = machine
self.loc_db = loc_db
self.handler = {} # addr -> callback(DSEEngine instance)
self.instrumentation = {} # addr -> callback(DSEEngine instance)
self.addr_to_cacheblocks = {} # addr -> {label -> IRBlock}
self.lifter = self.machine.lifter(loc_db=self.loc_db) # corresponding IR
self.ircfg = self.lifter.new_ircfg() # corresponding IR CFG
# Defined after attachment
self.jitter = None # Jitload (concrete execution)
self.symb = None # SymbolicExecutionEngine
self.symb_concrete = None # Concrete SymbExec for path disambiguation
self.mdis = None # DisasmEngine
def prepare(self):
"""Prepare the environment for attachment with a jitter"""
# Disassembler
self.mdis = self.machine.dis_engine(bin_stream_vm(self.jitter.vm),
lines_wd=1,
loc_db=self.loc_db)
# Symbexec engine
## Prepare symbexec engines
self.symb = self.SYMB_ENGINE(self.jitter.cpu, self.jitter.vm,
self.lifter, {})
self.symb.enable_emulated_simplifications()
self.symb_concrete = ESENoVMSideEffects(
self.jitter.cpu, self.jitter.vm,
self.lifter, {}
)
## Update registers value
self.symb.symbols[self.lifter.IRDst] = ExprInt(
getattr(self.jitter.cpu, self.lifter.pc.name),
self.lifter.IRDst.size
)
# Activate callback on each instr
self.jitter.jit.set_options(max_exec_per_call=1, jit_maxline=1)
self.jitter.exec_cb = self.callback
# Clear the jit cache to avoid reusing already-jitted multi-instruction basic blocks
self.jitter.jit.clear_jitted_blocks()
def attach(self, emulator):
"""Attach the DSE to @emulator
@emulator: jitload (or API equivalent) instance
To attach *DURING A BREAKPOINT*, one may consider using the following snippet:
def breakpoint(self, jitter):
...
dse.attach(jitter)
dse.update...
...
# Additional call to the exec callback is necessary, as breakpoints are
# honored AFTER exec callback
jitter.exec_cb(jitter)
return True
Without it, one may encounter a DriftException error due to a
"desynchronization" between the jitter and DSE states. Indeed, on the
'handle' call, the jitter must be one instruction AFTER the DSE.
"""
self.jitter = emulator
self.prepare()
def handle(self, cur_addr):
r"""Handle destination
@cur_addr: Expr of the next address in concrete execution
/!\ cur_addr may be a loc_key
In this method, self.symb is in the "just before branching" state
"""
pass
def add_handler(self, addr, callback):
"""Add a @callback for address @addr before any state update.
The state IS NOT updated after returning from the callback
@addr: int
@callback: func(dse instance)"""
self.handler[addr] = callback
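# Example (sketch): a handler used as a symbolic stub. The 0x401000 address
# is an illustrative placeholder; here the handler simply resynchronizes the
# symbolic state on the concrete one after a call the DSE does not model:
#
#     def resync_after_call(dse):
#         dse.update_state_from_concrete(cpu=True, mem=False)
#
#     dse.add_handler(0x401000, resync_after_call)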
def add_lib_handler(self, libimp, namespace):
"""Add search for handler based on a @libimp libimp instance
Known functions will be looked by {name}_symb or {name}_{ord}_symb in the @namespace
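
Example (sketch), assuming a Sandbox 'sb' exposing its libimp instance as
sb.libs, and that the current module defines the corresponding *_symb stubs:

    dse.add_lib_handler(sb.libs, globals())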
"""
namespace = dict(
(force_bytes(name), func) for name, func in viewitems(namespace)
)
# lambda cannot contain statement
def default_func(dse):
fname = libimp.fad2cname[dse.jitter.pc]
if isinstance(fname, tuple):
fname = b"%s_%d_symb" % (force_bytes(fname[0]), fname[1])
else:
fname = b"%s_symb" % force_bytes(fname)
raise RuntimeError("Symbolic stub '%s' not found" % fname)
for addr, fname in viewitems(libimp.fad2cname):
if isinstance(fname, tuple):
fname = b"%s_%d_symb" % (force_bytes(fname[0]), fname[1])
else:
fname = b"%s_symb" % force_bytes(fname)
func = namespace.get(fname, None)
if func is not None:
self.add_handler(addr, func)
else:
self.add_handler(addr, default_func)
def add_instrumentation(self, addr, callback):
"""Add a @callback for address @addr before any state update.
The state IS updated after returning from the callback
@addr: int
@callback: func(dse instance)"""
self.instrumentation[addr] = callback
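# Example (sketch): logging instrumentation. The address and the x86_32
# register name are illustrative placeholders:
#
#     def log_eax(dse):
#         print("EAX:", dse.eval_expr(dse.lifter.arch.regs.EAX))
#
#     dse.add_instrumentation(0x401234, log_eax)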
def _check_state(self):
"""Check the current state against the concrete one"""
errors = [] # List of DriftInfo
for symbol in self.symb.modified_expr:
# Do not consider PC
if symbol in [self.lifter.pc, self.lifter.IRDst]:
continue
# Consider only concrete values
symb_value = self.eval_expr(symbol)
if not symb_value.is_int():
continue
symb_value = int(symb_value)
# Check computed values against real ones
if symbol.is_id():
if hasattr(self.jitter.cpu, symbol.name):
value = getattr(self.jitter.cpu, symbol.name)
if value != symb_value:
errors.append(DriftInfo(symbol, symb_value, value))
elif symbol.is_mem() and symbol.ptr.is_int():
value_chr = self.jitter.vm.get_mem(
int(symbol.ptr),
symbol.size // 8
)
exp_value = int(encode_hex(value_chr[::-1]), 16)
if exp_value != symb_value:
errors.append(DriftInfo(symbol, symb_value, exp_value))
# Check for drift, and act accordingly
if errors:
raise DriftException(errors)
def callback(self, _):
"""Called before each instruction"""
# Assert synchronization with concrete execution
self._check_state()
# Call callbacks associated to the current address
cur_addr = self.jitter.pc
if isinstance(cur_addr, LocKey):
lbl = self.lifter.loc_db.loc_key_to_label(cur_addr)
cur_addr = lbl.offset
if cur_addr in self.handler:
self.handler[cur_addr](self)
return True
if cur_addr in self.instrumentation:
self.instrumentation[cur_addr](self)
# Handle current address
self.handle(ExprInt(cur_addr, self.lifter.IRDst.size))
# Avoid memory issue in ExpressionSimplifier
if len(self.symb.expr_simp.cache) > 100000:
self.symb.expr_simp.cache.clear()
# Get IR blocks
if cur_addr in self.addr_to_cacheblocks:
self.ircfg.blocks.clear()
self.ircfg.blocks.update(self.addr_to_cacheblocks[cur_addr])
else:
## Reset cache structures
self.ircfg.blocks.clear()
## Update current state
asm_block = self.mdis.dis_block(cur_addr)
self.lifter.add_asmblock_to_ircfg(asm_block, self.ircfg)
self.addr_to_cacheblocks[cur_addr] = dict(self.ircfg.blocks)
# Emulate the current instruction
self.symb.reset_modified()
# Is the symbolic execution going (potentially) to jump on a lbl_gen?
if len(self.ircfg.blocks) == 1:
self.symb.run_at(self.ircfg, cur_addr)
else:
# The emulation could get stuck in generated IR blocks,
# but the concrete execution callback is not precise enough to
# obtain the full path through the IR blocks
# -> use a fully concrete execution to recover that path
# Update the concrete execution
self._update_state_from_concrete_symb(
self.symb_concrete, cpu=True, mem=True
)
while True:
next_addr_concrete = self.symb_concrete.run_block_at(
self.ircfg, cur_addr
)
self.symb.run_block_at(self.ircfg, cur_addr)
if not (isinstance(next_addr_concrete, ExprLoc) and
self.lifter.loc_db.get_location_offset(
next_addr_concrete.loc_key
) is None):
# Not a lbl_gen, exit
break
# Call handle with lbl_gen state
self.handle(next_addr_concrete)
cur_addr = next_addr_concrete
# At this stage, symbolic engine is one instruction after the concrete
# engine
return True
def _get_gpregs(self):
"""Return a dict of regs: value from the jitter
This version use the regs associated to the attrib (!= cpu.get_gpreg())
"""
out = {}
regs = self.lifter.arch.regs.attrib_to_regs[self.lifter.attrib]
for reg in regs:
if hasattr(self.jitter.cpu, reg.name):
out[reg.name] = getattr(self.jitter.cpu, reg.name)
return out
def take_snapshot(self):
"""Return a snapshot of the current state (including jitter state)"""
snapshot = {
"mem": self.jitter.vm.get_all_memory(),
"regs": self._get_gpregs(),
"symb": self.symb.symbols.copy(),
}
return snapshot
def restore_snapshot(self, snapshot, memory=True):
"""Restore a @snapshot taken with .take_snapshot
@snapshot: .take_snapshot output
@memory: (optional) if set, also restore the memory
"""
# Restore memory
if memory:
self.jitter.vm.reset_memory_page_pool()
self.jitter.vm.reset_code_bloc_pool()
for addr, metadata in viewitems(snapshot["mem"]):
self.jitter.vm.add_memory_page(
addr,
metadata["access"],
metadata["data"]
)
# Restore registers
self.jitter.pc = snapshot["regs"][self.lifter.pc.name]
for reg, value in viewitems(snapshot["regs"]):
setattr(self.jitter.cpu, reg, value)
# Reset internal elements
self.jitter.vm.set_exception(0)
self.jitter.cpu.set_exception(0)
self.jitter.bs._atomic_mode = False
# Reset symb exec
for key, _ in list(viewitems(self.symb.symbols)):
del self.symb.symbols[key]
for expr, value in viewitems(snapshot["symb"]):
self.symb.symbols[expr] = value
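# Example (sketch): exploring several runs from a fixed point through
# snapshot/restore; 'run_until' stands for whatever drives the jitter to the
# end of the area of interest:
#
#     snapshot = dse.take_snapshot()
#     run_until(jitter, end_addr)
#     dse.restore_snapshot(snapshot, memory=True)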
def update_state(self, assignblk):
"""From this point, assume @assignblk in the symbolic execution
@assignblk: AssignBlock/{dst -> src}
"""
for dst, src in viewitems(assignblk):
self.symb.apply_change(dst, src)
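# Example (sketch): making a register fully symbolic (x86_64 register name
# used for illustration):
#
#     dse.update_state({
#         dse.lifter.arch.regs.RAX: ExprId("RAX_symb", 64),
#     })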
def _update_state_from_concrete_symb(self, symbexec, cpu=True, mem=False):
if mem:
# Values will be retrieved from the concrete execution if they are
# not present
symbexec.symbols.symbols_mem.base_to_memarray.clear()
if cpu:
regs = self.lifter.arch.regs.attrib_to_regs[self.lifter.attrib]
for reg in regs:
if hasattr(self.jitter.cpu, reg.name):
value = ExprInt(getattr(self.jitter.cpu, reg.name),
size=reg.size)
symbexec.symbols[reg] = value
def update_state_from_concrete(self, cpu=True, mem=False):
r"""Update the symbolic state with concrete values from the concrete
engine
@cpu: (optional) if set, update registers' value
@mem: (optional) if set, update memory value
/!\ the current symbolic state will be lost.
This function is usually called when states are no longer synchronized
(at the beginning, when returning from an unstubbed syscall, ...)
"""
self._update_state_from_concrete_symb(self.symb, cpu, mem)
def eval_expr(self, expr):
"""Return the evaluation of @expr:
@expr: Expr instance"""
return self.symb.eval_expr(expr)
@staticmethod
def memory_to_expr(addr):
"""Translate an address to its corresponding symbolic ID (8bits)
@addr: int"""
return ExprId("MEM_0x%x" % int(addr), 8)
def symbolize_memory(self, memory_range):
"""Register a range of memory addresses to symbolize
@memory_range: object with support of __in__ operation (intervals, list,
...)
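
Example (sketch), assuming an illustrative 16-byte input buffer at 0x1000:

    from miasm.core.interval import interval
    dse.symbolize_memory(interval([(0x1000, 0x100F)]))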
"""
self.symb.dse_memory_range = memory_range
self.symb.dse_memory_to_expr = self.memory_to_expr
class DSEPathConstraint(DSEEngine):
"""Dynamic Symbolic Execution Engine keeping the path constraint
Possible new "solutions" are produced along the path, by inversing concrete
path constraint. Thus, a "solution" is a potential initial context leading
to a new path.
In order to produce a new solution, one can extend this class, and override
'handle_solution' to produce a solution which fit its needs. It could avoid
computing new solution by overriding 'produce_solution'.
If one is only interested in constraints associated to its path, the option
"produce_solution" should be set to False, to speed up emulation.
The constraints are accumulated in the .z3_cur z3.Solver object.
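
Example (sketch): collecting new candidate inputs, assuming the input byte
at an illustrative address 0x1000 was symbolized through memory_to_expr:

    dse = DSEPathConstraint(
        machine, loc_db,
        produce_solution=DSEPathConstraint.PRODUCE_SOLUTION_BRANCH_COV
    )
    # ... run the emulation ...
    for key, model in viewitems(dse.new_solutions):
        symbol = dse.memory_to_expr(0x1000)
        value = model.eval(dse.z3_trans.from_expr(symbol))
        # 'value' may be left free if the byte is unconstrained in the model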
"""
# Maximum memory size to inject in constraints solving
MAX_MEMORY_INJECT = 0x10000
# Produce solution strategies
PRODUCE_NO_SOLUTION = 0
PRODUCE_SOLUTION_CODE_COV = 1
PRODUCE_SOLUTION_BRANCH_COV = 2
PRODUCE_SOLUTION_PATH_COV = 3
def __init__(self, machine, loc_db, produce_solution=PRODUCE_SOLUTION_CODE_COV,
known_solutions=None,
**kwargs):
"""Init a DSEPathConstraint
@machine: Machine instance of the targeted architecture
@loc_db: LocationDB instance
@produce_solution: (optional) one of the PRODUCE_* strategies; use
PRODUCE_NO_SOLUTION to disable the computation of new solutions"""
super(DSEPathConstraint, self).__init__(machine, loc_db, **kwargs)
# Dependency check
assert z3 is not None
# Init PathConstraint specifics structures
self.cur_solver = z3.Solver()
self.new_solutions = {} # solution identifier -> solution's model
self._known_solutions = set() # set of solution identifiers
self.z3_trans = Translator.to_language("z3")
self._produce_solution_strategy = produce_solution
self._previous_addr = None
self._history = None
if produce_solution == self.PRODUCE_SOLUTION_PATH_COV:
self._history = [] # List of addresses in the current path
@property
def ir_arch(self):
warnings.warn('DEPRECATION WARNING: use ".lifter" instead of ".ir_arch"')
return self.lifter
def take_snapshot(self, *args, **kwargs):
snap = super(DSEPathConstraint, self).take_snapshot(*args, **kwargs)
snap["new_solutions"] = {
dst: src.copy
for dst, src in viewitems(self.new_solutions)
}
snap["cur_constraints"] = self.cur_solver.assertions()
if self._produce_solution_strategy == self.PRODUCE_SOLUTION_PATH_COV:
snap["_history"] = list(self._history)
elif self._produce_solution_strategy == self.PRODUCE_SOLUTION_BRANCH_COV:
snap["_previous_addr"] = self._previous_addr
return snap
def restore_snapshot(self, snapshot, keep_known_solutions=True, **kwargs):
"""Restore a DSEPathConstraint snapshot
@keep_known_solutions: if set, do not forget solutions already found.
-> They will not appear in 'new_solutions'
"""
super(DSEPathConstraint, self).restore_snapshot(snapshot, **kwargs)
self.new_solutions.clear()
self.new_solutions.update(snapshot["new_solutions"])
self.cur_solver = z3.Solver()
self.cur_solver.add(snapshot["cur_constraints"])
if not keep_known_solutions:
self._known_solutions.clear()
if self._produce_solution_strategy == self.PRODUCE_SOLUTION_PATH_COV:
self._history = list(snapshot["_history"])
elif self._produce_solution_strategy == self.PRODUCE_SOLUTION_BRANCH_COV:
self._previous_addr = snapshot["_previous_addr"]
def _key_for_solution_strategy(self, destination):
"""Return the associated identifier for the current solution strategy"""
if self._produce_solution_strategy == self.PRODUCE_NO_SOLUTION:
# Never produce a solution
return None
elif self._produce_solution_strategy == self.PRODUCE_SOLUTION_CODE_COV:
# Decision based on code coverage
# -> produce a solution if the destination has never been seen
key = destination
elif self._produce_solution_strategy == self.PRODUCE_SOLUTION_BRANCH_COV:
# Decision based on branch coverage
# -> produce a solution if the current branch has never been taken
key = (self._previous_addr, destination)
elif self._produce_solution_strategy == self.PRODUCE_SOLUTION_PATH_COV:
# Decision based on path coverage
# -> produce a solution if the current path has never been taken
key = tuple(self._history + [destination])
else:
raise ValueError("Unknown produce solution strategy")
return key
def produce_solution(self, destination):
"""Called to determine if a solution for @destination should be test for
satisfiability and computed
@destination: Expr instance of the target @destination
"""
key = self._key_for_solution_strategy(destination)
if key is None:
return False
return key not in self._known_solutions
def handle_solution(self, model, destination):
"""Called when a new solution for destination @destination is founded
@model: z3 model instance
@destination: Expr instance for an addr which is not on the DSE path
"""
key = self._key_for_solution_strategy(destination)
assert key is not None
self.new_solutions[key] = model
self._known_solutions.add(key)
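# Example (sketch): reacting to each new solution by subclassing:
#
#     class MyDSE(DSEPathConstraint):
#         def handle_solution(self, model, destination):
#             super(MyDSE, self).handle_solution(model, destination)
#             print("New reachable destination:", destination)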
def handle_correct_destination(self, destination, path_constraints):
"""[DEV] Called by handle() to update internal structures giving the
correct destination (the concrete execution one).
"""
# Update structure used by produce_solution()
if self._produce_solution_strategy == self.PRODUCE_SOLUTION_PATH_COV:
self._history.append(destination)
elif self._produce_solution_strategy == self.PRODUCE_SOLUTION_BRANCH_COV:
self._previous_addr = destination
# Update current solver
for cons in path_constraints:
self.cur_solver.add(self.z3_trans.from_expr(cons))
def handle(self, cur_addr):
cur_addr = canonize_to_exprloc(self.lifter.loc_db, cur_addr)
symb_pc = self.eval_expr(self.lifter.IRDst)
possibilities = possible_values(symb_pc)
cur_path_constraint = set() # path_constraint for the concrete path
if len(possibilities) == 1:
dst = next(iter(possibilities)).value
dst = canonize_to_exprloc(self.lifter.loc_db, dst)
assert dst == cur_addr
else:
for possibility in possibilities:
target_addr = canonize_to_exprloc(self.lifter.loc_db, possibility.value)
path_constraint = set() # Set of ExprAssign for the possible path
# Get constraint associated to the possible path
memory_to_add = ModularIntervals(symb_pc.size)
for cons in possibility.constraints:
eaff = cons.to_constraint()
# eaff.get_r(mem_read=True) is not enough:
# ExprAssign considers a memory access in dst as a write
mem = eaff.dst.get_r(mem_read=True)
mem.update(eaff.src.get_r(mem_read=True))
for expr in mem:
if expr.is_mem():
addr_range = expr_range(expr.ptr)
# At the upper bound, add the size of the memory access:
# if addr is in [a, b], then the values reachable through
# @size[addr] are in @8[a, b + size[
for start, stop in addr_range:
stop += expr.size // 8 - 1
full_range = ModularIntervals(
symb_pc.size,
[(start, stop)]
)
memory_to_add.update(full_range)
path_constraint.add(eaff)
if memory_to_add.length > self.MAX_MEMORY_INJECT:
# TODO: re-concretize the constraint, or let z3 try anyway
raise RuntimeError("Not implemented: too long memory area")
# Inject memory
for start, stop in memory_to_add:
for address in range(start, stop + 1):
expr_mem = ExprMem(ExprInt(address,
self.lifter.pc.size),
8)
value = self.eval_expr(expr_mem)
if not value.is_int():
raise TypeError("Rely on a symbolic memory case, " \
"address 0x%x" % address)
path_constraint.add(ExprAssign(expr_mem, value))
if target_addr == cur_addr:
# Add path constraint
cur_path_constraint = path_constraint
elif self.produce_solution(target_addr):
# Looking for a new solution
self.cur_solver.push()
for cons in path_constraint:
trans = self.z3_trans.from_expr(cons)
trans = z3.simplify(trans)
self.cur_solver.add(trans)
result = self.cur_solver.check()
if result == z3.sat:
model = self.cur_solver.model()
self.handle_solution(model, target_addr)
self.cur_solver.pop()
self.handle_correct_destination(cur_addr, cur_path_constraint)