diff options
Diffstat (limited to 'src/miasm/ir/symbexec.py')
| -rw-r--r-- | src/miasm/ir/symbexec.py | 1132 |
1 files changed, 1132 insertions, 0 deletions
diff --git a/src/miasm/ir/symbexec.py b/src/miasm/ir/symbexec.py new file mode 100644 index 00000000..2e6b07df --- /dev/null +++ b/src/miasm/ir/symbexec.py @@ -0,0 +1,1132 @@ +from __future__ import print_function +from builtins import range +import logging +try: + from collections.abc import MutableMapping +except ImportError: + from collections import MutableMapping + +from future.utils import viewitems + +from miasm.expression.expression import ExprOp, ExprId, ExprLoc, ExprInt, \ + ExprMem, ExprCompose, ExprSlice, ExprCond +from miasm.expression.simplifications import expr_simp_explicit +from miasm.ir.ir import AssignBlock + +log = logging.getLogger("symbexec") +console_handler = logging.StreamHandler() +console_handler.setFormatter(logging.Formatter("[%(levelname)-8s]: %(message)s")) +log.addHandler(console_handler) +log.setLevel(logging.INFO) + + +def get_block(lifter, ircfg, mdis, addr): + """Get IRBlock at address @addr""" + loc_key = ircfg.get_or_create_loc_key(addr) + if not loc_key in ircfg.blocks: + offset = mdis.loc_db.get_location_offset(loc_key) + block = mdis.dis_block(offset) + lifter.add_asmblock_to_ircfg(block, ircfg) + irblock = ircfg.get_block(loc_key) + if irblock is None: + raise LookupError('No block found at that address: %s' % lifter.loc_db.pretty_str(loc_key)) + return irblock + + +class StateEngine(object): + """Stores an Engine state""" + + def merge(self, other): + """Generate a new state, representing the merge of self and @other + @other: a StateEngine instance""" + + raise NotImplementedError("Abstract method") + + +class SymbolicState(StateEngine): + """Stores a SymbolicExecutionEngine state""" + + def __init__(self, dct): + self._symbols = frozenset(viewitems(dct)) + + def __hash__(self): + return hash((self.__class__, self._symbols)) + + def __eq__(self, other): + if self is other: + return True + if self.__class__ != other.__class__: + return False + return self.symbols == other.symbols + + def __ne__(self, other): + return not self == other + + def __iter__(self): + for dst, src in self._symbols: + yield dst, src + + def iteritems(self): + """Iterate on stored memory/values""" + return self.__iter__() + + def merge(self, other): + """Merge two symbolic states + Only equal expressions are kept in both states + @other: second symbolic state + """ + + symb_a = self.symbols + symb_b = other.symbols + intersection = set(symb_a).intersection(set(symb_b)) + out = {} + for dst in intersection: + if symb_a[dst] == symb_b[dst]: + out[dst] = symb_a[dst] + return self.__class__(out) + + @property + def symbols(self): + """Return the dictionary of known symbols""" + return dict(self._symbols) + + +INTERNAL_INTBASE_NAME = "__INTERNAL_INTBASE__" + + +def get_expr_base_offset(expr): + """Return a couple representing the symbolic/concrete part of an addition + expression. + + If there is no symbolic part, ExprId(INTERNAL_INTBASE_NAME) is used + If there is not concrete part, 0 is used + @expr: Expression instance + + """ + if expr.is_int(): + internal_intbase = ExprId(INTERNAL_INTBASE_NAME, expr.size) + return internal_intbase, int(expr) + + if not expr.is_op('+'): + return expr, 0 + if expr.args[-1].is_int(): + args, offset = expr.args[:-1], int(expr.args[-1]) + if len(args) == 1: + return args[0], offset + return ExprOp('+', *args), offset + return expr, 0 + + +class MemArray(MutableMapping): + """Link between base and its (offset, Expr) + + Given an expression (say *base*), this structure will store every memory + content relatively to an integer offset from *base*. + + The value associated to a given offset is a description of the slice of a + stored expression. The slice size depends on the configuration of the + MemArray. For example, for a slice size of 8 bits, the assignment: + - @32[EAX+0x10] = EBX + + will store for the base EAX: + - 0x10: (EBX, 0) + - 0x11: (EBX, 1) + - 0x12: (EBX, 2) + - 0x13: (EBX, 3) + + If the *base* is EAX+EBX, this structure can store the following contents: + - @32[EAX+EBX] + - @8[EAX+EBX+0x100] + But not: + - @32[EAX+0x10] (which is stored in another MemArray based on EAX) + - @32[EAX+EBX+ECX] (which is stored in another MemArray based on + EAX+EBX+ECX) + + """ + + def __init__(self, base, expr_simp=expr_simp_explicit): + self._base = base + self.expr_simp = expr_simp + self._mask = int(base.mask) + self._offset_to_expr = {} + + @property + def base(self): + """Expression representing the symbolic base address""" + return self._base + + @property + def mask(self): + """Mask offset""" + return self._mask + + def __contains__(self, offset): + return offset in self._offset_to_expr + + def __getitem__(self, offset): + assert 0 <= offset <= self._mask + return self._offset_to_expr.__getitem__(offset) + + def __setitem__(self, offset, value): + raise RuntimeError("Use write api to update keys") + + def __delitem__(self, offset): + assert 0 <= offset <= self._mask + return self._offset_to_expr.__delitem__(offset) + + def __iter__(self): + for offset, _ in viewitems(self._offset_to_expr): + yield offset + + def __len__(self): + return len(self._offset_to_expr) + + def __repr__(self): + out = [] + out.append("Base: %s" % self.base) + for offset, (index, value) in sorted(viewitems(self._offset_to_expr)): + out.append("%16X %d %s" % (offset, index, value)) + return '\n'.join(out) + + def copy(self): + """Copy object instance""" + obj = MemArray(self.base, self.expr_simp) + obj._offset_to_expr = self._offset_to_expr.copy() + return obj + + @staticmethod + def offset_to_ptr(base, offset): + """ + Return an expression representing the @base + @offset + @base: symbolic base address + @offset: relative offset integer to the @base address + """ + if base.is_id(INTERNAL_INTBASE_NAME): + ptr = ExprInt(offset, base.size) + elif offset == 0: + ptr = base + else: + ptr = base + ExprInt(offset, base.size) + return ptr.canonize() + + def read(self, offset, size): + """ + Return memory at @offset with @size as an Expr list + @offset: integer (in bytes) + @size: integer (in bits), byte aligned + + Consider the following state: + - 0x10: (EBX, 0) + - 0x11: (EBX, 1) + - 0x12: (EBX, 2) + - 0x13: (EBX, 3) + + A read at 0x10 of 32 bits should return: EBX + """ + + assert size % 8 == 0 + # Parts is (Expr's offset, size, Expr) + parts = [] + for index in range(size // 8): + # Wrap read: + # @32[EAX+0xFFFFFFFF] is ok and will read at 0xFFFFFFFF, 0, 1, 2 + request_offset = (offset + index) & self._mask + if request_offset in self._offset_to_expr: + # Known memory portion + off, data = self._offset_to_expr[request_offset] + parts.append((off, 1, data)) + continue + + # Unknown memory portion + ptr = self.offset_to_ptr(self.base, request_offset) + data = ExprMem(ptr, 8) + parts.append((0, 1, data)) + + # Group similar data + # XXX TODO: only little endian here + index = 0 + while index + 1 < len(parts): + off_a, size_a, data_a = parts[index] + off_b, size_b, data_b = parts[index+1] + if data_a == data_b and off_a + size_a == off_b: + # Read consecutive bytes of a variable + # [(0, 8, x), (1, 8, x)] => (0, 16, x) + parts[index:index+2] = [(off_a, size_a + size_b, data_a)] + continue + if data_a.is_int() and data_b.is_int(): + # Read integer parts + # [(0, 8, 0x11223344), (1, 8, 0x55667788)] => (0, 16, 0x7744) + int1 = self.expr_simp(data_a[off_a*8:(off_a+size_a)*8]) + int2 = self.expr_simp(data_b[off_b*8:(off_b+size_b)*8]) + assert int1.is_int() and int2.is_int() + int1, int2 = int(int1), int(int2) + result = ExprInt((int2 << (size_a * 8)) | int1, (size_a + size_b) * 8) + parts[index:index+2] = [(0, size_a + size_b, result)] + continue + if data_a.is_mem() and data_b.is_mem(): + # Read consecutive bytes of a memory variable + ptr_base_a, ptr_offset_a = get_expr_base_offset(data_a.ptr) + ptr_base_b, ptr_offset_b = get_expr_base_offset(data_b.ptr) + if ptr_base_a != ptr_base_b: + index += 1 + continue + if (ptr_offset_a + off_a + size_a) & self._mask == (ptr_offset_b + off_b) & self._mask: + assert size_a <= data_a.size // 8 - off_a + assert size_b <= data_b.size // 8 - off_b + # Successive comparable symbolic pointers + # [(0, 8, @8[ptr]), (0, 8, @8[ptr+1])] => (0, 16, @16[ptr]) + ptr = self.offset_to_ptr(ptr_base_a, (ptr_offset_a + off_a) & self._mask) + + data = ExprMem(ptr, (size_a + size_b) * 8) + parts[index:index+2] = [(0, size_a + size_b, data)] + + continue + + index += 1 + + # Slice data + read_mem = [] + for off, bytesize, data in parts: + if data.size // 8 != bytesize: + data = data[off * 8: (off + bytesize) * 8] + read_mem.append(data) + + return read_mem + + def write(self, offset, expr): + """ + Write @expr at @offset + @offset: integer (in bytes) + @expr: Expr instance value + """ + assert expr.size % 8 == 0 + assert offset <= self._mask + for index in range(expr.size // 8): + # Wrap write: + # @32[EAX+0xFFFFFFFF] is ok and will write at 0xFFFFFFFF, 0, 1, 2 + request_offset = (offset + index) & self._mask + # XXX TODO: only little endian here + self._offset_to_expr[request_offset] = (index, expr) + + tmp = self.expr_simp(expr[index * 8: (index + 1) * 8]) + # Special case: Simplify slice of pointer (simplification is ok + # here, as we won't store the simplified expression) + if tmp.is_slice() and tmp.arg.is_mem() and tmp.start % 8 == 0: + new_ptr = self.expr_simp( + tmp.arg.ptr + ExprInt(tmp.start // 8, tmp.arg.ptr.size) + ) + tmp = ExprMem(new_ptr, tmp.stop - tmp.start) + # Test if write to original value + if tmp.is_mem(): + src_ptr, src_off = get_expr_base_offset(tmp.ptr) + if src_ptr == self.base and src_off == request_offset: + del self._offset_to_expr[request_offset] + + + def _get_variable_parts(self, index, known_offsets, forward=True): + """ + Find consecutive memory parts representing the same variable. The part + starts at offset known_offsets[@index] and search is in offset direction + determined by @forward + Return the number of consecutive parts of the same variable. + + @index: index of the memory offset in known_offsets + @known_offsets: sorted offsets + @forward: Search in offset growing direction if True, else in reverse + order + """ + + offset = known_offsets[index] + value_byte_index, value = self._offset_to_expr[offset] + assert value.size % 8 == 0 + + if forward: + start, end, step = value_byte_index + 1, value.size // 8, 1 + else: + start, end, step = value_byte_index - 1, -1, -1 + + partnum = 1 + for value_offset in range(start, end, step): + offset += step + # Check if next part is in known_offsets + next_index = index + step * partnum + if not 0 <= next_index < len(known_offsets): + break + + offset_next = known_offsets[next_index] + if offset_next != offset: + break + + # Check if next part is a part of the searched value + byte_index, value_next = self._offset_to_expr[offset_next] + if byte_index != value_offset: + break + if value != value_next: + break + partnum += 1 + + return partnum + + + def _build_value_at_offset(self, value, offset, start, length): + """ + Return a couple. The first element is the memory Expression representing + the value at @offset, the second is its value. The value is truncated + at byte @start with @length + + @value: Expression to truncate + @offset: offset in bytes of the variable (integer) + @start: value's byte offset (integer) + @length: length in bytes (integer) + """ + + ptr = self.offset_to_ptr(self.base, offset) + size = length * 8 + if start == 0 and size == value.size: + result = value + else: + result = self.expr_simp(value[start * 8: start * 8 + size]) + + return ExprMem(ptr, size), result + + + def memory(self): + """ + Iterate on stored memory/values + + The goal here is to group entities. + + Consider the following state: + EAX + 0x10 = (0, EDX) + EAX + 0x11 = (1, EDX) + EAX + 0x12 = (2, EDX) + EAX + 0x13 = (3, EDX) + + The function should return: + @32[EAX + 0x10] = EDX + """ + + if not self._offset_to_expr: + return + known_offsets = sorted(self._offset_to_expr) + index = 0 + # Test if the first element is the continuation of the last byte. If + # yes, merge and output it first. + min_int = 0 + max_int = (1 << self.base.size) - 1 + limit_index = len(known_offsets) + + first_element = None + # Special case where a variable spreads on max_int/min_int + if known_offsets[0] == min_int and known_offsets[-1] == max_int: + min_offset, max_offset = known_offsets[0], known_offsets[-1] + min_byte_index, min_value = self._offset_to_expr[min_offset] + max_byte_index, max_value = self._offset_to_expr[max_offset] + if min_value == max_value and max_byte_index + 1 == min_byte_index: + # Look for current variable start + partnum_before = self._get_variable_parts(len(known_offsets) - 1, known_offsets, False) + # Look for current variable end + partnum_after = self._get_variable_parts(0, known_offsets) + + partnum = partnum_before + partnum_after + offset = known_offsets[-partnum_before] + index_value, value = self._offset_to_expr[offset] + + mem, result = self._build_value_at_offset(value, offset, index_value, partnum) + first_element = mem, result + index = partnum_after + limit_index = len(known_offsets) - partnum_before + + # Special cases are done, walk and merge variables + while index < limit_index: + offset = known_offsets[index] + index_value, value = self._offset_to_expr[offset] + partnum = self._get_variable_parts(index, known_offsets) + mem, result = self._build_value_at_offset(value, offset, index_value, partnum) + yield mem, result + index += partnum + + if first_element is not None: + yield first_element + + def dump(self): + """Display MemArray content""" + for mem, value in self.memory(): + print("%s = %s" % (mem, value)) + + +class MemSparse(object): + """Link a symbolic memory pointer to its MemArray. + + For each symbolic memory object, this object will extract the memory pointer + *ptr*. It then splits *ptr* into a symbolic and an integer part. For + example, the memory @[ESP+4] will give ESP+4 for *ptr*. *ptr* is then split + into its base ESP and its offset 4. Each symbolic base address uses a + different MemArray. + + Example: + - @32[EAX+EBX] + - @8[EAX+EBX+0x100] + Will be stored in the same MemArray with a EAX+EBX base + + """ + + def __init__(self, addrsize, expr_simp=expr_simp_explicit): + """ + @addrsize: size (in bits) of the addresses manipulated by the MemSparse + @expr_simp: an ExpressionSimplifier instance + """ + self.addrsize = addrsize + self.expr_simp = expr_simp + self.base_to_memarray = {} + + def __contains__(self, expr): + """ + Return True if the whole @expr is present + For partial check, use 'contains_partial' + """ + if not expr.is_mem(): + return False + ptr = expr.ptr + base, offset = get_expr_base_offset(ptr) + memarray = self.base_to_memarray.get(base, None) + if memarray is None: + return False + for i in range(expr.size // 8): + if offset + i not in memarray: + return False + return True + + def contains_partial(self, expr): + """ + Return True if a part of @expr is present in memory + """ + if not expr.is_mem(): + return False + ptr = expr.ptr + base, offset = get_expr_base_offset(ptr) + memarray = self.base_to_memarray.get(base, None) + if memarray is None: + return False + for i in range(expr.size // 8): + if offset + i in memarray: + return True + return False + + def clear(self): + """Reset the current object content""" + self.base_to_memarray.clear() + + def copy(self): + """Copy the current object instance""" + base_to_memarray = {} + for base, memarray in viewitems(self.base_to_memarray): + base_to_memarray[base] = memarray.copy() + obj = MemSparse(self.addrsize, self.expr_simp) + obj.base_to_memarray = base_to_memarray + return obj + + def __delitem__(self, expr): + """ + Delete a value @expr *fully* present in memory + For partial delete, use delete_partial + """ + ptr = expr.ptr + base, offset = get_expr_base_offset(ptr) + memarray = self.base_to_memarray.get(base, None) + if memarray is None: + raise KeyError + # Check if whole entity is in the MemArray before deleting it + for i in range(expr.size // 8): + if (offset + i) & memarray.mask not in memarray: + raise KeyError + for i in range(expr.size // 8): + del memarray[(offset + i) & memarray.mask] + + def delete_partial(self, expr): + """ + Delete @expr from memory. Skip parts of @expr which are not present in + memory. + """ + ptr = expr.ptr + base, offset = get_expr_base_offset(ptr) + memarray = self.base_to_memarray.get(base, None) + if memarray is None: + raise KeyError + # Check if whole entity is in the MemArray before deleting it + for i in range(expr.size // 8): + real_offset = (offset + i) & memarray.mask + if real_offset in memarray: + del memarray[real_offset] + + def read(self, ptr, size): + """ + Return the value associated with the Expr at address @ptr + @ptr: Expr representing the memory address + @size: memory size (in bits), byte aligned + """ + assert size % 8 == 0 + base, offset = get_expr_base_offset(ptr) + memarray = self.base_to_memarray.get(base, None) + if memarray is not None: + mems = memarray.read(offset, size) + ret = mems[0] if len(mems) == 1 else ExprCompose(*mems) + else: + ret = ExprMem(ptr, size) + return ret + + def write(self, ptr, expr): + """ + Update the corresponding Expr @expr at address @ptr + @ptr: Expr representing the memory address + @expr: Expr instance + """ + assert ptr.size == self.addrsize + base, offset = get_expr_base_offset(ptr) + memarray = self.base_to_memarray.get(base, None) + if memarray is None: + memarray = MemArray(base, self.expr_simp) + self.base_to_memarray[base] = memarray + memarray.write(offset, expr) + + def iteritems(self): + """Iterate on stored memory variables and their values.""" + for _, memarray in viewitems(self.base_to_memarray): + for mem, value in memarray.memory(): + yield mem, value + + def items(self): + """Return stored memory variables and their values.""" + return list(self.iteritems()) + + def dump(self): + """Display MemSparse content""" + for mem, value in viewitems(self): + print("%s = %s" % (mem, value)) + + def __repr__(self): + out = [] + for _, memarray in sorted(viewitems(self.base_to_memarray)): + out.append(repr(memarray)) + return '\n'.join(out) + + +class SymbolMngr(object): + """Symbolic store manager (IDs and MEMs)""" + + def __init__(self, init=None, addrsize=None, expr_simp=expr_simp_explicit): + assert addrsize is not None + if init is None: + init = {} + self.addrsize = addrsize + self.expr_simp = expr_simp + self.symbols_id = {} + self.symbols_mem = MemSparse(addrsize, expr_simp) + self.mask = (1 << addrsize) - 1 + for expr, value in viewitems(init): + self.write(expr, value) + + def __contains__(self, expr): + if expr.is_id(): + return self.symbols_id.__contains__(expr) + if expr.is_mem(): + return self.symbols_mem.__contains__(expr) + return False + + def __getitem__(self, expr): + return self.read(expr) + + def __setitem__(self, expr, value): + self.write(expr, value) + + def __delitem__(self, expr): + if expr.is_id(): + del self.symbols_id[expr] + elif expr.is_mem(): + del self.symbols_mem[expr] + else: + raise TypeError("Bad source expr") + + def copy(self): + """Copy object instance""" + obj = SymbolMngr(self, addrsize=self.addrsize, expr_simp=self.expr_simp) + return obj + + def clear(self): + """Forget every variables values""" + self.symbols_id.clear() + self.symbols_mem.clear() + + def read(self, src): + """ + Return the value corresponding to Expr @src + @src: ExprId or ExprMem instance + """ + if src.is_id(): + return self.symbols_id.get(src, src) + elif src.is_mem(): + # Only byte aligned accesses are supported for now + assert src.size % 8 == 0 + return self.symbols_mem.read(src.ptr, src.size) + else: + raise TypeError("Bad source expr") + + def write(self, dst, src): + """ + Update @dst with @src expression + @dst: ExprId or ExprMem instance + @src: Expression instance + """ + assert dst.size == src.size + if dst.is_id(): + if dst == src: + if dst in self.symbols_id: + del self.symbols_id[dst] + else: + self.symbols_id[dst] = src + elif dst.is_mem(): + # Only byte aligned accesses are supported for now + assert dst.size % 8 == 0 + self.symbols_mem.write(dst.ptr, src) + else: + raise TypeError("Bad destination expr") + + def dump(self, ids=True, mems=True): + """Display memory content""" + if ids: + for variable, value in self.ids(): + print('%s = %s' % (variable, value)) + if mems: + for mem, value in self.memory(): + print('%s = %s' % (mem, value)) + + def __repr__(self): + out = [] + for variable, value in viewitems(self): + out.append('%s = %s' % (variable, value)) + return "\n".join(out) + + def iteritems(self): + """ExprId/ExprMem iteritems of the current state""" + for variable, value in self.ids(): + yield variable, value + for variable, value in self.memory(): + yield variable, value + + def items(self): + """Return variables/values of the current state""" + return list(self.iteritems()) + + def __iter__(self): + for expr, _ in self.iteritems(): + yield expr + + def ids(self): + """Iterate on variables and their values.""" + for expr, value in viewitems(self.symbols_id): + yield expr, value + + def memory(self): + """Iterate on memory variables and their values.""" + for mem, value in viewitems(self.symbols_mem): + yield mem, value + + def keys(self): + """Variables of the current state""" + return list(self) + + +def merge_ptr_read(known, ptrs): + """ + Merge common memory parts in a multiple byte memory. + @ptrs: memory bytes list + @known: ptrs' associated boolean for present/unpresent memory part in the + store + """ + assert known + out = [] + known.append(None) + ptrs.append(None) + last, value, size = known[0], ptrs[0], 8 + for index, part in enumerate(known[1:], 1): + if part == last: + size += 8 + else: + out.append((last, value, size)) + last, value, size = part, ptrs[index], 8 + return out + + +class SymbolicExecutionEngine(object): + """ + Symbolic execution engine + Allow IR code emulation in symbolic domain + + + Examples: + from miasm.ir.symbexec import SymbolicExecutionEngine + from miasm.ir.ir import AssignBlock + + lifter = Lifter_X86_32() + + init_state = { + lifter.arch.regs.EAX: lifter.arch.regs.EBX, + ExprMem(id_x+ExprInt(0x10, 32), 32): id_a, + } + + sb_exec = SymbolicExecutionEngine(lifter, init_state) + + >>> sb_exec.dump() + EAX = a + @32[x + 0x10] = a + >>> sb_exec.dump(mems=False) + EAX = a + + >>> print sb_exec.eval_expr(lifter.arch.regs.EAX + lifter.arch.regs.ECX) + EBX + ECX + + Inspecting state: + - dump + - modified + State manipulation: + - '.state' (rw) + + Evaluation (read only): + - eval_expr + - eval_assignblk + Evaluation with state update: + - eval_updt_expr + - eval_updt_assignblk + - eval_updt_irblock + + Start a symbolic execution based on provisioned '.lifter' blocks: + - run_block_at + - run_at + """ + + StateEngine = SymbolicState + + def __init__(self, lifter, state=None, + sb_expr_simp=expr_simp_explicit): + + self.expr_to_visitor = { + ExprInt: self.eval_exprint, + ExprId: self.eval_exprid, + ExprLoc: self.eval_exprloc, + ExprMem: self.eval_exprmem, + ExprSlice: self.eval_exprslice, + ExprCond: self.eval_exprcond, + ExprOp: self.eval_exprop, + ExprCompose: self.eval_exprcompose, + } + + if state is None: + state = {} + + self.symbols = SymbolMngr(addrsize=lifter.addrsize, expr_simp=sb_expr_simp) + + for dst, src in viewitems(state): + self.symbols.write(dst, src) + + self.lifter = lifter + self.expr_simp = sb_expr_simp + + @property + def ir_arch(self): + warnings.warn('DEPRECATION WARNING: use ".lifter" instead of ".ir_arch"') + return self.lifter + + def get_state(self): + """Return the current state of the SymbolicEngine""" + state = self.StateEngine(dict(self.symbols)) + return state + + def set_state(self, state): + """Restaure the @state of the engine + @state: StateEngine instance + """ + self.symbols = SymbolMngr(addrsize=self.lifter.addrsize, expr_simp=self.expr_simp) + for dst, src in viewitems(dict(state)): + self.symbols[dst] = src + + state = property(get_state, set_state) + + def eval_expr_visitor(self, expr, cache=None): + """ + [DEV]: Override to change the behavior of an Expr evaluation. + This function recursively applies 'eval_expr*' to @expr. + This function uses @cache to speedup re-evaluation of expression. + """ + if cache is None: + cache = {} + + ret = cache.get(expr, None) + if ret is not None: + return ret + + new_expr = self.expr_simp(expr) + ret = cache.get(new_expr, None) + if ret is not None: + return ret + + func = self.expr_to_visitor.get(new_expr.__class__, None) + if func is None: + raise TypeError("Unknown expr type") + + ret = func(new_expr, cache=cache) + ret = self.expr_simp(ret) + assert ret is not None + + cache[expr] = ret + cache[new_expr] = ret + return ret + + def eval_exprint(self, expr, **kwargs): + """[DEV]: Evaluate an ExprInt using the current state""" + return expr + + def eval_exprid(self, expr, **kwargs): + """[DEV]: Evaluate an ExprId using the current state""" + ret = self.symbols.read(expr) + return ret + + def eval_exprloc(self, expr, **kwargs): + """[DEV]: Evaluate an ExprLoc using the current state""" + offset = self.lifter.loc_db.get_location_offset(expr.loc_key) + if offset is not None: + ret = ExprInt(offset, expr.size) + else: + ret = expr + return ret + + def eval_exprmem(self, expr, **kwargs): + """[DEV]: Evaluate an ExprMem using the current state + This function first evaluate the memory pointer value. + Override 'mem_read' to modify the effective memory accesses + """ + ptr = self.eval_expr_visitor(expr.ptr, **kwargs) + mem = ExprMem(ptr, expr.size) + ret = self.mem_read(mem) + return ret + + def eval_exprcond(self, expr, **kwargs): + """[DEV]: Evaluate an ExprCond using the current state""" + cond = self.eval_expr_visitor(expr.cond, **kwargs) + src1 = self.eval_expr_visitor(expr.src1, **kwargs) + src2 = self.eval_expr_visitor(expr.src2, **kwargs) + ret = ExprCond(cond, src1, src2) + return ret + + def eval_exprslice(self, expr, **kwargs): + """[DEV]: Evaluate an ExprSlice using the current state""" + arg = self.eval_expr_visitor(expr.arg, **kwargs) + ret = ExprSlice(arg, expr.start, expr.stop) + return ret + + def eval_exprop(self, expr, **kwargs): + """[DEV]: Evaluate an ExprOp using the current state""" + args = [] + for oarg in expr.args: + arg = self.eval_expr_visitor(oarg, **kwargs) + args.append(arg) + ret = ExprOp(expr.op, *args) + return ret + + def eval_exprcompose(self, expr, **kwargs): + """[DEV]: Evaluate an ExprCompose using the current state""" + args = [] + for arg in expr.args: + args.append(self.eval_expr_visitor(arg, **kwargs)) + ret = ExprCompose(*args) + return ret + + def eval_expr(self, expr, eval_cache=None): + """ + Evaluate @expr + @expr: Expression instance to evaluate + @cache: None or dictionary linking variables to their values + """ + if eval_cache is None: + eval_cache = {} + ret = self.eval_expr_visitor(expr, cache=eval_cache) + assert ret is not None + return ret + + def modified(self, init_state=None, ids=True, mems=True): + """ + Return the modified variables. + @init_state: a base dictionary linking variables to their initial values + to diff. Can be None. + @ids: track ids only + @mems: track mems only + """ + if init_state is None: + init_state = {} + if ids: + for variable, value in viewitems(self.symbols.symbols_id): + if variable in init_state and init_state[variable] == value: + continue + yield variable, value + if mems: + for mem, value in self.symbols.memory(): + if mem in init_state and init_state[mem] == value: + continue + yield mem, value + + def dump(self, ids=True, mems=True): + """ + Display modififed variables + @ids: display modified ids + @mems: display modified memory + """ + + for variable, value in self.modified(None, ids, mems): + print("%-18s" % variable, "=", "%s" % value) + + def eval_assignblk(self, assignblk): + """ + Evaluate AssignBlock using the current state + + Returns a dictionary containing modified keys associated to their values + + @assignblk: AssignBlock instance + """ + pool_out = {} + eval_cache = {} + for dst, src in viewitems(assignblk): + src = self.eval_expr(src, eval_cache) + if dst.is_mem(): + ptr = self.eval_expr(dst.ptr, eval_cache) + # Test if mem lookup is known + tmp = ExprMem(ptr, dst.size) + pool_out[tmp] = src + elif dst.is_id(): + pool_out[dst] = src + else: + raise ValueError("Unknown destination type", str(dst)) + + return pool_out + + def apply_change(self, dst, src): + """ + Apply @dst = @src on the current state WITHOUT evaluating both side + @dst: Expr, destination + @src: Expr, source + """ + if dst.is_mem(): + self.mem_write(dst, src) + else: + self.symbols.write(dst, src) + + def eval_updt_assignblk(self, assignblk): + """ + Apply an AssignBlock on the current state + @assignblk: AssignBlock instance + """ + mem_dst = [] + dst_src = self.eval_assignblk(assignblk) + for dst, src in viewitems(dst_src): + self.apply_change(dst, src) + if dst.is_mem(): + mem_dst.append(dst) + return mem_dst + + def eval_updt_irblock(self, irb, step=False): + """ + Symbolic execution of the @irb on the current state + @irb: irbloc instance + @step: display intermediate steps + """ + for assignblk in irb: + if step: + print('Instr', assignblk.instr) + print('Assignblk:') + print(assignblk) + print('_' * 80) + self.eval_updt_assignblk(assignblk) + if step: + self.dump(mems=False) + self.dump(ids=False) + print('_' * 80) + dst = self.eval_expr(self.lifter.IRDst) + + return dst + + def run_block_at(self, ircfg, addr, step=False): + """ + Symbolic execution of the block at @addr + @addr: address to execute (int or ExprInt or label) + @step: display intermediate steps + """ + irblock = ircfg.get_block(addr) + if irblock is not None: + addr = self.eval_updt_irblock(irblock, step=step) + return addr + + def run_at(self, ircfg, addr, lbl_stop=None, step=False): + """ + Symbolic execution starting at @addr + @addr: address to execute (int or ExprInt or label) + @lbl_stop: LocKey to stop execution on + @step: display intermediate steps + """ + while True: + irblock = ircfg.get_block(addr) + if irblock is None: + break + if irblock.loc_key == lbl_stop: + break + addr = self.eval_updt_irblock(irblock, step=step) + return addr + + def del_mem_above_stack(self, stack_ptr): + """ + Remove all stored memory values with following properties: + * pointer based on initial stack value + * pointer below current stack pointer + """ + stack_ptr = self.eval_expr(stack_ptr) + base, stk_offset = get_expr_base_offset(stack_ptr) + memarray = self.symbols.symbols_mem.base_to_memarray.get(base, None) + if memarray: + to_del = set() + for offset in memarray: + if ((offset - stk_offset) & int(stack_ptr.mask)) >> (stack_ptr.size - 1) != 0: + to_del.add(offset) + + for offset in to_del: + del memarray[offset] + + def eval_updt_expr(self, expr): + """ + Evaluate @expr and apply side effect if needed (ie. if expr is an + assignment). Return the evaluated value + """ + + # Update value if needed + if expr.is_assign(): + ret = self.eval_expr(expr.src) + self.eval_updt_assignblk(AssignBlock([expr])) + else: + ret = self.eval_expr(expr) + + return ret + + def mem_read(self, expr): + """ + [DEV]: Override to modify the effective memory reads + + Read symbolic value at ExprMem @expr + @expr: ExprMem + """ + return self.symbols.read(expr) + + def mem_write(self, dst, src): + """ + [DEV]: Override to modify the effective memory writes + + Write symbolic value @src at ExprMem @dst + @dst: destination ExprMem + @src: source Expression + """ + self.symbols.write(dst, src) |