diff options
Diffstat (limited to 'miasm/ir/symbexec.py')
| -rw-r--r-- | miasm/ir/symbexec.py | 1132 |
1 files changed, 0 insertions, 1132 deletions
diff --git a/miasm/ir/symbexec.py b/miasm/ir/symbexec.py deleted file mode 100644 index 2e6b07df..00000000 --- a/miasm/ir/symbexec.py +++ /dev/null @@ -1,1132 +0,0 @@ -from __future__ import print_function -from builtins import range -import logging -try: - from collections.abc import MutableMapping -except ImportError: - from collections import MutableMapping - -from future.utils import viewitems - -from miasm.expression.expression import ExprOp, ExprId, ExprLoc, ExprInt, \ - ExprMem, ExprCompose, ExprSlice, ExprCond -from miasm.expression.simplifications import expr_simp_explicit -from miasm.ir.ir import AssignBlock - -log = logging.getLogger("symbexec") -console_handler = logging.StreamHandler() -console_handler.setFormatter(logging.Formatter("[%(levelname)-8s]: %(message)s")) -log.addHandler(console_handler) -log.setLevel(logging.INFO) - - -def get_block(lifter, ircfg, mdis, addr): - """Get IRBlock at address @addr""" - loc_key = ircfg.get_or_create_loc_key(addr) - if not loc_key in ircfg.blocks: - offset = mdis.loc_db.get_location_offset(loc_key) - block = mdis.dis_block(offset) - lifter.add_asmblock_to_ircfg(block, ircfg) - irblock = ircfg.get_block(loc_key) - if irblock is None: - raise LookupError('No block found at that address: %s' % lifter.loc_db.pretty_str(loc_key)) - return irblock - - -class StateEngine(object): - """Stores an Engine state""" - - def merge(self, other): - """Generate a new state, representing the merge of self and @other - @other: a StateEngine instance""" - - raise NotImplementedError("Abstract method") - - -class SymbolicState(StateEngine): - """Stores a SymbolicExecutionEngine state""" - - def __init__(self, dct): - self._symbols = frozenset(viewitems(dct)) - - def __hash__(self): - return hash((self.__class__, self._symbols)) - - def __eq__(self, other): - if self is other: - return True - if self.__class__ != other.__class__: - return False - return self.symbols == other.symbols - - def __ne__(self, other): - return not self == other - - def __iter__(self): - for dst, src in self._symbols: - yield dst, src - - def iteritems(self): - """Iterate on stored memory/values""" - return self.__iter__() - - def merge(self, other): - """Merge two symbolic states - Only equal expressions are kept in both states - @other: second symbolic state - """ - - symb_a = self.symbols - symb_b = other.symbols - intersection = set(symb_a).intersection(set(symb_b)) - out = {} - for dst in intersection: - if symb_a[dst] == symb_b[dst]: - out[dst] = symb_a[dst] - return self.__class__(out) - - @property - def symbols(self): - """Return the dictionary of known symbols""" - return dict(self._symbols) - - -INTERNAL_INTBASE_NAME = "__INTERNAL_INTBASE__" - - -def get_expr_base_offset(expr): - """Return a couple representing the symbolic/concrete part of an addition - expression. - - If there is no symbolic part, ExprId(INTERNAL_INTBASE_NAME) is used - If there is not concrete part, 0 is used - @expr: Expression instance - - """ - if expr.is_int(): - internal_intbase = ExprId(INTERNAL_INTBASE_NAME, expr.size) - return internal_intbase, int(expr) - - if not expr.is_op('+'): - return expr, 0 - if expr.args[-1].is_int(): - args, offset = expr.args[:-1], int(expr.args[-1]) - if len(args) == 1: - return args[0], offset - return ExprOp('+', *args), offset - return expr, 0 - - -class MemArray(MutableMapping): - """Link between base and its (offset, Expr) - - Given an expression (say *base*), this structure will store every memory - content relatively to an integer offset from *base*. - - The value associated to a given offset is a description of the slice of a - stored expression. The slice size depends on the configuration of the - MemArray. For example, for a slice size of 8 bits, the assignment: - - @32[EAX+0x10] = EBX - - will store for the base EAX: - - 0x10: (EBX, 0) - - 0x11: (EBX, 1) - - 0x12: (EBX, 2) - - 0x13: (EBX, 3) - - If the *base* is EAX+EBX, this structure can store the following contents: - - @32[EAX+EBX] - - @8[EAX+EBX+0x100] - But not: - - @32[EAX+0x10] (which is stored in another MemArray based on EAX) - - @32[EAX+EBX+ECX] (which is stored in another MemArray based on - EAX+EBX+ECX) - - """ - - def __init__(self, base, expr_simp=expr_simp_explicit): - self._base = base - self.expr_simp = expr_simp - self._mask = int(base.mask) - self._offset_to_expr = {} - - @property - def base(self): - """Expression representing the symbolic base address""" - return self._base - - @property - def mask(self): - """Mask offset""" - return self._mask - - def __contains__(self, offset): - return offset in self._offset_to_expr - - def __getitem__(self, offset): - assert 0 <= offset <= self._mask - return self._offset_to_expr.__getitem__(offset) - - def __setitem__(self, offset, value): - raise RuntimeError("Use write api to update keys") - - def __delitem__(self, offset): - assert 0 <= offset <= self._mask - return self._offset_to_expr.__delitem__(offset) - - def __iter__(self): - for offset, _ in viewitems(self._offset_to_expr): - yield offset - - def __len__(self): - return len(self._offset_to_expr) - - def __repr__(self): - out = [] - out.append("Base: %s" % self.base) - for offset, (index, value) in sorted(viewitems(self._offset_to_expr)): - out.append("%16X %d %s" % (offset, index, value)) - return '\n'.join(out) - - def copy(self): - """Copy object instance""" - obj = MemArray(self.base, self.expr_simp) - obj._offset_to_expr = self._offset_to_expr.copy() - return obj - - @staticmethod - def offset_to_ptr(base, offset): - """ - Return an expression representing the @base + @offset - @base: symbolic base address - @offset: relative offset integer to the @base address - """ - if base.is_id(INTERNAL_INTBASE_NAME): - ptr = ExprInt(offset, base.size) - elif offset == 0: - ptr = base - else: - ptr = base + ExprInt(offset, base.size) - return ptr.canonize() - - def read(self, offset, size): - """ - Return memory at @offset with @size as an Expr list - @offset: integer (in bytes) - @size: integer (in bits), byte aligned - - Consider the following state: - - 0x10: (EBX, 0) - - 0x11: (EBX, 1) - - 0x12: (EBX, 2) - - 0x13: (EBX, 3) - - A read at 0x10 of 32 bits should return: EBX - """ - - assert size % 8 == 0 - # Parts is (Expr's offset, size, Expr) - parts = [] - for index in range(size // 8): - # Wrap read: - # @32[EAX+0xFFFFFFFF] is ok and will read at 0xFFFFFFFF, 0, 1, 2 - request_offset = (offset + index) & self._mask - if request_offset in self._offset_to_expr: - # Known memory portion - off, data = self._offset_to_expr[request_offset] - parts.append((off, 1, data)) - continue - - # Unknown memory portion - ptr = self.offset_to_ptr(self.base, request_offset) - data = ExprMem(ptr, 8) - parts.append((0, 1, data)) - - # Group similar data - # XXX TODO: only little endian here - index = 0 - while index + 1 < len(parts): - off_a, size_a, data_a = parts[index] - off_b, size_b, data_b = parts[index+1] - if data_a == data_b and off_a + size_a == off_b: - # Read consecutive bytes of a variable - # [(0, 8, x), (1, 8, x)] => (0, 16, x) - parts[index:index+2] = [(off_a, size_a + size_b, data_a)] - continue - if data_a.is_int() and data_b.is_int(): - # Read integer parts - # [(0, 8, 0x11223344), (1, 8, 0x55667788)] => (0, 16, 0x7744) - int1 = self.expr_simp(data_a[off_a*8:(off_a+size_a)*8]) - int2 = self.expr_simp(data_b[off_b*8:(off_b+size_b)*8]) - assert int1.is_int() and int2.is_int() - int1, int2 = int(int1), int(int2) - result = ExprInt((int2 << (size_a * 8)) | int1, (size_a + size_b) * 8) - parts[index:index+2] = [(0, size_a + size_b, result)] - continue - if data_a.is_mem() and data_b.is_mem(): - # Read consecutive bytes of a memory variable - ptr_base_a, ptr_offset_a = get_expr_base_offset(data_a.ptr) - ptr_base_b, ptr_offset_b = get_expr_base_offset(data_b.ptr) - if ptr_base_a != ptr_base_b: - index += 1 - continue - if (ptr_offset_a + off_a + size_a) & self._mask == (ptr_offset_b + off_b) & self._mask: - assert size_a <= data_a.size // 8 - off_a - assert size_b <= data_b.size // 8 - off_b - # Successive comparable symbolic pointers - # [(0, 8, @8[ptr]), (0, 8, @8[ptr+1])] => (0, 16, @16[ptr]) - ptr = self.offset_to_ptr(ptr_base_a, (ptr_offset_a + off_a) & self._mask) - - data = ExprMem(ptr, (size_a + size_b) * 8) - parts[index:index+2] = [(0, size_a + size_b, data)] - - continue - - index += 1 - - # Slice data - read_mem = [] - for off, bytesize, data in parts: - if data.size // 8 != bytesize: - data = data[off * 8: (off + bytesize) * 8] - read_mem.append(data) - - return read_mem - - def write(self, offset, expr): - """ - Write @expr at @offset - @offset: integer (in bytes) - @expr: Expr instance value - """ - assert expr.size % 8 == 0 - assert offset <= self._mask - for index in range(expr.size // 8): - # Wrap write: - # @32[EAX+0xFFFFFFFF] is ok and will write at 0xFFFFFFFF, 0, 1, 2 - request_offset = (offset + index) & self._mask - # XXX TODO: only little endian here - self._offset_to_expr[request_offset] = (index, expr) - - tmp = self.expr_simp(expr[index * 8: (index + 1) * 8]) - # Special case: Simplify slice of pointer (simplification is ok - # here, as we won't store the simplified expression) - if tmp.is_slice() and tmp.arg.is_mem() and tmp.start % 8 == 0: - new_ptr = self.expr_simp( - tmp.arg.ptr + ExprInt(tmp.start // 8, tmp.arg.ptr.size) - ) - tmp = ExprMem(new_ptr, tmp.stop - tmp.start) - # Test if write to original value - if tmp.is_mem(): - src_ptr, src_off = get_expr_base_offset(tmp.ptr) - if src_ptr == self.base and src_off == request_offset: - del self._offset_to_expr[request_offset] - - - def _get_variable_parts(self, index, known_offsets, forward=True): - """ - Find consecutive memory parts representing the same variable. The part - starts at offset known_offsets[@index] and search is in offset direction - determined by @forward - Return the number of consecutive parts of the same variable. - - @index: index of the memory offset in known_offsets - @known_offsets: sorted offsets - @forward: Search in offset growing direction if True, else in reverse - order - """ - - offset = known_offsets[index] - value_byte_index, value = self._offset_to_expr[offset] - assert value.size % 8 == 0 - - if forward: - start, end, step = value_byte_index + 1, value.size // 8, 1 - else: - start, end, step = value_byte_index - 1, -1, -1 - - partnum = 1 - for value_offset in range(start, end, step): - offset += step - # Check if next part is in known_offsets - next_index = index + step * partnum - if not 0 <= next_index < len(known_offsets): - break - - offset_next = known_offsets[next_index] - if offset_next != offset: - break - - # Check if next part is a part of the searched value - byte_index, value_next = self._offset_to_expr[offset_next] - if byte_index != value_offset: - break - if value != value_next: - break - partnum += 1 - - return partnum - - - def _build_value_at_offset(self, value, offset, start, length): - """ - Return a couple. The first element is the memory Expression representing - the value at @offset, the second is its value. The value is truncated - at byte @start with @length - - @value: Expression to truncate - @offset: offset in bytes of the variable (integer) - @start: value's byte offset (integer) - @length: length in bytes (integer) - """ - - ptr = self.offset_to_ptr(self.base, offset) - size = length * 8 - if start == 0 and size == value.size: - result = value - else: - result = self.expr_simp(value[start * 8: start * 8 + size]) - - return ExprMem(ptr, size), result - - - def memory(self): - """ - Iterate on stored memory/values - - The goal here is to group entities. - - Consider the following state: - EAX + 0x10 = (0, EDX) - EAX + 0x11 = (1, EDX) - EAX + 0x12 = (2, EDX) - EAX + 0x13 = (3, EDX) - - The function should return: - @32[EAX + 0x10] = EDX - """ - - if not self._offset_to_expr: - return - known_offsets = sorted(self._offset_to_expr) - index = 0 - # Test if the first element is the continuation of the last byte. If - # yes, merge and output it first. - min_int = 0 - max_int = (1 << self.base.size) - 1 - limit_index = len(known_offsets) - - first_element = None - # Special case where a variable spreads on max_int/min_int - if known_offsets[0] == min_int and known_offsets[-1] == max_int: - min_offset, max_offset = known_offsets[0], known_offsets[-1] - min_byte_index, min_value = self._offset_to_expr[min_offset] - max_byte_index, max_value = self._offset_to_expr[max_offset] - if min_value == max_value and max_byte_index + 1 == min_byte_index: - # Look for current variable start - partnum_before = self._get_variable_parts(len(known_offsets) - 1, known_offsets, False) - # Look for current variable end - partnum_after = self._get_variable_parts(0, known_offsets) - - partnum = partnum_before + partnum_after - offset = known_offsets[-partnum_before] - index_value, value = self._offset_to_expr[offset] - - mem, result = self._build_value_at_offset(value, offset, index_value, partnum) - first_element = mem, result - index = partnum_after - limit_index = len(known_offsets) - partnum_before - - # Special cases are done, walk and merge variables - while index < limit_index: - offset = known_offsets[index] - index_value, value = self._offset_to_expr[offset] - partnum = self._get_variable_parts(index, known_offsets) - mem, result = self._build_value_at_offset(value, offset, index_value, partnum) - yield mem, result - index += partnum - - if first_element is not None: - yield first_element - - def dump(self): - """Display MemArray content""" - for mem, value in self.memory(): - print("%s = %s" % (mem, value)) - - -class MemSparse(object): - """Link a symbolic memory pointer to its MemArray. - - For each symbolic memory object, this object will extract the memory pointer - *ptr*. It then splits *ptr* into a symbolic and an integer part. For - example, the memory @[ESP+4] will give ESP+4 for *ptr*. *ptr* is then split - into its base ESP and its offset 4. Each symbolic base address uses a - different MemArray. - - Example: - - @32[EAX+EBX] - - @8[EAX+EBX+0x100] - Will be stored in the same MemArray with a EAX+EBX base - - """ - - def __init__(self, addrsize, expr_simp=expr_simp_explicit): - """ - @addrsize: size (in bits) of the addresses manipulated by the MemSparse - @expr_simp: an ExpressionSimplifier instance - """ - self.addrsize = addrsize - self.expr_simp = expr_simp - self.base_to_memarray = {} - - def __contains__(self, expr): - """ - Return True if the whole @expr is present - For partial check, use 'contains_partial' - """ - if not expr.is_mem(): - return False - ptr = expr.ptr - base, offset = get_expr_base_offset(ptr) - memarray = self.base_to_memarray.get(base, None) - if memarray is None: - return False - for i in range(expr.size // 8): - if offset + i not in memarray: - return False - return True - - def contains_partial(self, expr): - """ - Return True if a part of @expr is present in memory - """ - if not expr.is_mem(): - return False - ptr = expr.ptr - base, offset = get_expr_base_offset(ptr) - memarray = self.base_to_memarray.get(base, None) - if memarray is None: - return False - for i in range(expr.size // 8): - if offset + i in memarray: - return True - return False - - def clear(self): - """Reset the current object content""" - self.base_to_memarray.clear() - - def copy(self): - """Copy the current object instance""" - base_to_memarray = {} - for base, memarray in viewitems(self.base_to_memarray): - base_to_memarray[base] = memarray.copy() - obj = MemSparse(self.addrsize, self.expr_simp) - obj.base_to_memarray = base_to_memarray - return obj - - def __delitem__(self, expr): - """ - Delete a value @expr *fully* present in memory - For partial delete, use delete_partial - """ - ptr = expr.ptr - base, offset = get_expr_base_offset(ptr) - memarray = self.base_to_memarray.get(base, None) - if memarray is None: - raise KeyError - # Check if whole entity is in the MemArray before deleting it - for i in range(expr.size // 8): - if (offset + i) & memarray.mask not in memarray: - raise KeyError - for i in range(expr.size // 8): - del memarray[(offset + i) & memarray.mask] - - def delete_partial(self, expr): - """ - Delete @expr from memory. Skip parts of @expr which are not present in - memory. - """ - ptr = expr.ptr - base, offset = get_expr_base_offset(ptr) - memarray = self.base_to_memarray.get(base, None) - if memarray is None: - raise KeyError - # Check if whole entity is in the MemArray before deleting it - for i in range(expr.size // 8): - real_offset = (offset + i) & memarray.mask - if real_offset in memarray: - del memarray[real_offset] - - def read(self, ptr, size): - """ - Return the value associated with the Expr at address @ptr - @ptr: Expr representing the memory address - @size: memory size (in bits), byte aligned - """ - assert size % 8 == 0 - base, offset = get_expr_base_offset(ptr) - memarray = self.base_to_memarray.get(base, None) - if memarray is not None: - mems = memarray.read(offset, size) - ret = mems[0] if len(mems) == 1 else ExprCompose(*mems) - else: - ret = ExprMem(ptr, size) - return ret - - def write(self, ptr, expr): - """ - Update the corresponding Expr @expr at address @ptr - @ptr: Expr representing the memory address - @expr: Expr instance - """ - assert ptr.size == self.addrsize - base, offset = get_expr_base_offset(ptr) - memarray = self.base_to_memarray.get(base, None) - if memarray is None: - memarray = MemArray(base, self.expr_simp) - self.base_to_memarray[base] = memarray - memarray.write(offset, expr) - - def iteritems(self): - """Iterate on stored memory variables and their values.""" - for _, memarray in viewitems(self.base_to_memarray): - for mem, value in memarray.memory(): - yield mem, value - - def items(self): - """Return stored memory variables and their values.""" - return list(self.iteritems()) - - def dump(self): - """Display MemSparse content""" - for mem, value in viewitems(self): - print("%s = %s" % (mem, value)) - - def __repr__(self): - out = [] - for _, memarray in sorted(viewitems(self.base_to_memarray)): - out.append(repr(memarray)) - return '\n'.join(out) - - -class SymbolMngr(object): - """Symbolic store manager (IDs and MEMs)""" - - def __init__(self, init=None, addrsize=None, expr_simp=expr_simp_explicit): - assert addrsize is not None - if init is None: - init = {} - self.addrsize = addrsize - self.expr_simp = expr_simp - self.symbols_id = {} - self.symbols_mem = MemSparse(addrsize, expr_simp) - self.mask = (1 << addrsize) - 1 - for expr, value in viewitems(init): - self.write(expr, value) - - def __contains__(self, expr): - if expr.is_id(): - return self.symbols_id.__contains__(expr) - if expr.is_mem(): - return self.symbols_mem.__contains__(expr) - return False - - def __getitem__(self, expr): - return self.read(expr) - - def __setitem__(self, expr, value): - self.write(expr, value) - - def __delitem__(self, expr): - if expr.is_id(): - del self.symbols_id[expr] - elif expr.is_mem(): - del self.symbols_mem[expr] - else: - raise TypeError("Bad source expr") - - def copy(self): - """Copy object instance""" - obj = SymbolMngr(self, addrsize=self.addrsize, expr_simp=self.expr_simp) - return obj - - def clear(self): - """Forget every variables values""" - self.symbols_id.clear() - self.symbols_mem.clear() - - def read(self, src): - """ - Return the value corresponding to Expr @src - @src: ExprId or ExprMem instance - """ - if src.is_id(): - return self.symbols_id.get(src, src) - elif src.is_mem(): - # Only byte aligned accesses are supported for now - assert src.size % 8 == 0 - return self.symbols_mem.read(src.ptr, src.size) - else: - raise TypeError("Bad source expr") - - def write(self, dst, src): - """ - Update @dst with @src expression - @dst: ExprId or ExprMem instance - @src: Expression instance - """ - assert dst.size == src.size - if dst.is_id(): - if dst == src: - if dst in self.symbols_id: - del self.symbols_id[dst] - else: - self.symbols_id[dst] = src - elif dst.is_mem(): - # Only byte aligned accesses are supported for now - assert dst.size % 8 == 0 - self.symbols_mem.write(dst.ptr, src) - else: - raise TypeError("Bad destination expr") - - def dump(self, ids=True, mems=True): - """Display memory content""" - if ids: - for variable, value in self.ids(): - print('%s = %s' % (variable, value)) - if mems: - for mem, value in self.memory(): - print('%s = %s' % (mem, value)) - - def __repr__(self): - out = [] - for variable, value in viewitems(self): - out.append('%s = %s' % (variable, value)) - return "\n".join(out) - - def iteritems(self): - """ExprId/ExprMem iteritems of the current state""" - for variable, value in self.ids(): - yield variable, value - for variable, value in self.memory(): - yield variable, value - - def items(self): - """Return variables/values of the current state""" - return list(self.iteritems()) - - def __iter__(self): - for expr, _ in self.iteritems(): - yield expr - - def ids(self): - """Iterate on variables and their values.""" - for expr, value in viewitems(self.symbols_id): - yield expr, value - - def memory(self): - """Iterate on memory variables and their values.""" - for mem, value in viewitems(self.symbols_mem): - yield mem, value - - def keys(self): - """Variables of the current state""" - return list(self) - - -def merge_ptr_read(known, ptrs): - """ - Merge common memory parts in a multiple byte memory. - @ptrs: memory bytes list - @known: ptrs' associated boolean for present/unpresent memory part in the - store - """ - assert known - out = [] - known.append(None) - ptrs.append(None) - last, value, size = known[0], ptrs[0], 8 - for index, part in enumerate(known[1:], 1): - if part == last: - size += 8 - else: - out.append((last, value, size)) - last, value, size = part, ptrs[index], 8 - return out - - -class SymbolicExecutionEngine(object): - """ - Symbolic execution engine - Allow IR code emulation in symbolic domain - - - Examples: - from miasm.ir.symbexec import SymbolicExecutionEngine - from miasm.ir.ir import AssignBlock - - lifter = Lifter_X86_32() - - init_state = { - lifter.arch.regs.EAX: lifter.arch.regs.EBX, - ExprMem(id_x+ExprInt(0x10, 32), 32): id_a, - } - - sb_exec = SymbolicExecutionEngine(lifter, init_state) - - >>> sb_exec.dump() - EAX = a - @32[x + 0x10] = a - >>> sb_exec.dump(mems=False) - EAX = a - - >>> print sb_exec.eval_expr(lifter.arch.regs.EAX + lifter.arch.regs.ECX) - EBX + ECX - - Inspecting state: - - dump - - modified - State manipulation: - - '.state' (rw) - - Evaluation (read only): - - eval_expr - - eval_assignblk - Evaluation with state update: - - eval_updt_expr - - eval_updt_assignblk - - eval_updt_irblock - - Start a symbolic execution based on provisioned '.lifter' blocks: - - run_block_at - - run_at - """ - - StateEngine = SymbolicState - - def __init__(self, lifter, state=None, - sb_expr_simp=expr_simp_explicit): - - self.expr_to_visitor = { - ExprInt: self.eval_exprint, - ExprId: self.eval_exprid, - ExprLoc: self.eval_exprloc, - ExprMem: self.eval_exprmem, - ExprSlice: self.eval_exprslice, - ExprCond: self.eval_exprcond, - ExprOp: self.eval_exprop, - ExprCompose: self.eval_exprcompose, - } - - if state is None: - state = {} - - self.symbols = SymbolMngr(addrsize=lifter.addrsize, expr_simp=sb_expr_simp) - - for dst, src in viewitems(state): - self.symbols.write(dst, src) - - self.lifter = lifter - self.expr_simp = sb_expr_simp - - @property - def ir_arch(self): - warnings.warn('DEPRECATION WARNING: use ".lifter" instead of ".ir_arch"') - return self.lifter - - def get_state(self): - """Return the current state of the SymbolicEngine""" - state = self.StateEngine(dict(self.symbols)) - return state - - def set_state(self, state): - """Restaure the @state of the engine - @state: StateEngine instance - """ - self.symbols = SymbolMngr(addrsize=self.lifter.addrsize, expr_simp=self.expr_simp) - for dst, src in viewitems(dict(state)): - self.symbols[dst] = src - - state = property(get_state, set_state) - - def eval_expr_visitor(self, expr, cache=None): - """ - [DEV]: Override to change the behavior of an Expr evaluation. - This function recursively applies 'eval_expr*' to @expr. - This function uses @cache to speedup re-evaluation of expression. - """ - if cache is None: - cache = {} - - ret = cache.get(expr, None) - if ret is not None: - return ret - - new_expr = self.expr_simp(expr) - ret = cache.get(new_expr, None) - if ret is not None: - return ret - - func = self.expr_to_visitor.get(new_expr.__class__, None) - if func is None: - raise TypeError("Unknown expr type") - - ret = func(new_expr, cache=cache) - ret = self.expr_simp(ret) - assert ret is not None - - cache[expr] = ret - cache[new_expr] = ret - return ret - - def eval_exprint(self, expr, **kwargs): - """[DEV]: Evaluate an ExprInt using the current state""" - return expr - - def eval_exprid(self, expr, **kwargs): - """[DEV]: Evaluate an ExprId using the current state""" - ret = self.symbols.read(expr) - return ret - - def eval_exprloc(self, expr, **kwargs): - """[DEV]: Evaluate an ExprLoc using the current state""" - offset = self.lifter.loc_db.get_location_offset(expr.loc_key) - if offset is not None: - ret = ExprInt(offset, expr.size) - else: - ret = expr - return ret - - def eval_exprmem(self, expr, **kwargs): - """[DEV]: Evaluate an ExprMem using the current state - This function first evaluate the memory pointer value. - Override 'mem_read' to modify the effective memory accesses - """ - ptr = self.eval_expr_visitor(expr.ptr, **kwargs) - mem = ExprMem(ptr, expr.size) - ret = self.mem_read(mem) - return ret - - def eval_exprcond(self, expr, **kwargs): - """[DEV]: Evaluate an ExprCond using the current state""" - cond = self.eval_expr_visitor(expr.cond, **kwargs) - src1 = self.eval_expr_visitor(expr.src1, **kwargs) - src2 = self.eval_expr_visitor(expr.src2, **kwargs) - ret = ExprCond(cond, src1, src2) - return ret - - def eval_exprslice(self, expr, **kwargs): - """[DEV]: Evaluate an ExprSlice using the current state""" - arg = self.eval_expr_visitor(expr.arg, **kwargs) - ret = ExprSlice(arg, expr.start, expr.stop) - return ret - - def eval_exprop(self, expr, **kwargs): - """[DEV]: Evaluate an ExprOp using the current state""" - args = [] - for oarg in expr.args: - arg = self.eval_expr_visitor(oarg, **kwargs) - args.append(arg) - ret = ExprOp(expr.op, *args) - return ret - - def eval_exprcompose(self, expr, **kwargs): - """[DEV]: Evaluate an ExprCompose using the current state""" - args = [] - for arg in expr.args: - args.append(self.eval_expr_visitor(arg, **kwargs)) - ret = ExprCompose(*args) - return ret - - def eval_expr(self, expr, eval_cache=None): - """ - Evaluate @expr - @expr: Expression instance to evaluate - @cache: None or dictionary linking variables to their values - """ - if eval_cache is None: - eval_cache = {} - ret = self.eval_expr_visitor(expr, cache=eval_cache) - assert ret is not None - return ret - - def modified(self, init_state=None, ids=True, mems=True): - """ - Return the modified variables. - @init_state: a base dictionary linking variables to their initial values - to diff. Can be None. - @ids: track ids only - @mems: track mems only - """ - if init_state is None: - init_state = {} - if ids: - for variable, value in viewitems(self.symbols.symbols_id): - if variable in init_state and init_state[variable] == value: - continue - yield variable, value - if mems: - for mem, value in self.symbols.memory(): - if mem in init_state and init_state[mem] == value: - continue - yield mem, value - - def dump(self, ids=True, mems=True): - """ - Display modififed variables - @ids: display modified ids - @mems: display modified memory - """ - - for variable, value in self.modified(None, ids, mems): - print("%-18s" % variable, "=", "%s" % value) - - def eval_assignblk(self, assignblk): - """ - Evaluate AssignBlock using the current state - - Returns a dictionary containing modified keys associated to their values - - @assignblk: AssignBlock instance - """ - pool_out = {} - eval_cache = {} - for dst, src in viewitems(assignblk): - src = self.eval_expr(src, eval_cache) - if dst.is_mem(): - ptr = self.eval_expr(dst.ptr, eval_cache) - # Test if mem lookup is known - tmp = ExprMem(ptr, dst.size) - pool_out[tmp] = src - elif dst.is_id(): - pool_out[dst] = src - else: - raise ValueError("Unknown destination type", str(dst)) - - return pool_out - - def apply_change(self, dst, src): - """ - Apply @dst = @src on the current state WITHOUT evaluating both side - @dst: Expr, destination - @src: Expr, source - """ - if dst.is_mem(): - self.mem_write(dst, src) - else: - self.symbols.write(dst, src) - - def eval_updt_assignblk(self, assignblk): - """ - Apply an AssignBlock on the current state - @assignblk: AssignBlock instance - """ - mem_dst = [] - dst_src = self.eval_assignblk(assignblk) - for dst, src in viewitems(dst_src): - self.apply_change(dst, src) - if dst.is_mem(): - mem_dst.append(dst) - return mem_dst - - def eval_updt_irblock(self, irb, step=False): - """ - Symbolic execution of the @irb on the current state - @irb: irbloc instance - @step: display intermediate steps - """ - for assignblk in irb: - if step: - print('Instr', assignblk.instr) - print('Assignblk:') - print(assignblk) - print('_' * 80) - self.eval_updt_assignblk(assignblk) - if step: - self.dump(mems=False) - self.dump(ids=False) - print('_' * 80) - dst = self.eval_expr(self.lifter.IRDst) - - return dst - - def run_block_at(self, ircfg, addr, step=False): - """ - Symbolic execution of the block at @addr - @addr: address to execute (int or ExprInt or label) - @step: display intermediate steps - """ - irblock = ircfg.get_block(addr) - if irblock is not None: - addr = self.eval_updt_irblock(irblock, step=step) - return addr - - def run_at(self, ircfg, addr, lbl_stop=None, step=False): - """ - Symbolic execution starting at @addr - @addr: address to execute (int or ExprInt or label) - @lbl_stop: LocKey to stop execution on - @step: display intermediate steps - """ - while True: - irblock = ircfg.get_block(addr) - if irblock is None: - break - if irblock.loc_key == lbl_stop: - break - addr = self.eval_updt_irblock(irblock, step=step) - return addr - - def del_mem_above_stack(self, stack_ptr): - """ - Remove all stored memory values with following properties: - * pointer based on initial stack value - * pointer below current stack pointer - """ - stack_ptr = self.eval_expr(stack_ptr) - base, stk_offset = get_expr_base_offset(stack_ptr) - memarray = self.symbols.symbols_mem.base_to_memarray.get(base, None) - if memarray: - to_del = set() - for offset in memarray: - if ((offset - stk_offset) & int(stack_ptr.mask)) >> (stack_ptr.size - 1) != 0: - to_del.add(offset) - - for offset in to_del: - del memarray[offset] - - def eval_updt_expr(self, expr): - """ - Evaluate @expr and apply side effect if needed (ie. if expr is an - assignment). Return the evaluated value - """ - - # Update value if needed - if expr.is_assign(): - ret = self.eval_expr(expr.src) - self.eval_updt_assignblk(AssignBlock([expr])) - else: - ret = self.eval_expr(expr) - - return ret - - def mem_read(self, expr): - """ - [DEV]: Override to modify the effective memory reads - - Read symbolic value at ExprMem @expr - @expr: ExprMem - """ - return self.symbols.read(expr) - - def mem_write(self, dst, src): - """ - [DEV]: Override to modify the effective memory writes - - Write symbolic value @src at ExprMem @dst - @dst: destination ExprMem - @src: source Expression - """ - self.symbols.write(dst, src) |