diff options
| -rw-r--r-- | miasm2/ir/symbexec_top.py | 221 | ||||
| -rw-r--r-- | miasm2/ir/symbexec_types.py | 243 |
2 files changed, 464 insertions, 0 deletions
# ============================================================================
# File: miasm2/ir/symbexec_top.py  (new file)
# ============================================================================
from miasm2.ir.symbexec import SymbolicExecutionEngine, StateEngine
from miasm2.expression.simplifications import expr_simp
from miasm2.expression.expression import ExprId, ExprInt, ExprSlice,\
    ExprMem, ExprCond, ExprCompose, ExprOp
from miasm2.core import asmblock


# Name of the identifier used to represent an unknown ("TOP") value
TOPSTR = "TOP"


def exprid_top(expr):
    """Return a TOP expression (ExprId("TOP")) of size @expr.size
    @expr: expression to replace with TOP
    """
    return ExprId(TOPSTR, expr.size)


class SymbolicStateTop(StateEngine):
    """Hashable snapshot of a symbolic state: the known symbols and the set
    of expressions whose value is TOP"""

    def __init__(self, dct, regstop):
        """
        @dct: dictionary (destination expression -> source expression)
        @regstop: iterable of expressions whose value is TOP
        """
        # Frozen copies, so that the state can be hashed and compared
        self._symbols = frozenset(dct.items())
        self._regstop = frozenset(regstop)

    def __hash__(self):
        return hash((self.__class__, self._symbols, self._regstop))

    def __str__(self):
        out = []
        for dst, src in sorted(self._symbols):
            out.append("%s = %s" % (dst, src))
        # Sorted like the symbols above, to get a reproducible output
        for dst in sorted(self._regstop):
            out.append('TOP %s' %dst)
        return "\n".join(out)

    def __eq__(self, other):
        if self is other:
            return True
        if self.__class__ != other.__class__:
            return False
        return (self.symbols == other.symbols and
                self.regstop == other.regstop)

    def __iter__(self):
        """Iterate on the (destination, source) couples of the state"""
        for dst, src in self._symbols:
            yield dst, src

    def merge(self, other):
        """Merge two symbolic states
        Only equal expressions are kept in both states; identifiers known in
        only one state, and destinations with different values, become TOP
        @other: second symbolic state
        """
        symb_a = self.symbols
        symb_b = other.symbols
        intersection = set(symb_a.keys()).intersection(symb_b.keys())
        diff = set(symb_a.keys()).union(symb_b.keys()).difference(intersection)
        symbols = {}
        regstop = set()
        # Destinations present in only one state: identifiers become TOP,
        # the other destinations are dropped
        for dst in diff:
            if dst.is_id():
                regstop.add(dst)
        # Destinations present in both states: keep them only if they agree
        for dst in intersection:
            if symb_a[dst] == symb_b[dst]:
                symbols[dst] = symb_a[dst]
            else:
                regstop.add(dst)
        return self.__class__(symbols, regstop)

    @property
    def symbols(self):
        """Return the dictionary of known symbols"""
        return dict(self._symbols)

    @property
    def regstop(self):
        """Return the set of expressions with TOP values"""
        return self._regstop


class SymbExecTopNoMem(SymbolicExecutionEngine):
    """
    Symbolic execution, include TOP value.
    ExprMem are not propagated.
    Any computation involving a TOP will generate TOP.
    """

    StateEngine = SymbolicStateTop

    def __init__(self, ir_arch, state, regstop,
                 func_read=None,
                 func_write=None,
                 sb_expr_simp=expr_simp):
        """
        @ir_arch: forwarded to SymbolicExecutionEngine
        @state: iterable of (destination, source) couples (initial symbols)
        @regstop: iterable of expressions initially set to TOP
        @func_read, @func_write, @sb_expr_simp: forwarded to
        SymbolicExecutionEngine
        """
        known_symbols = dict(state)
        super(SymbExecTopNoMem, self).__init__(ir_arch, known_symbols,
                                               func_read,
                                               func_write,
                                               sb_expr_simp)
        self.regstop = set(regstop)

    def get_state(self):
        """Return the current state of the SymbolicEngine"""
        return self.StateEngine(self.symbols, self.regstop)

    def eval_expr(self, expr, eval_cache=None):
        """Evaluate @expr on the current state; an expression registered as
        TOP directly evaluates to TOP
        @expr: expression to evaluate
        @eval_cache: (optional) evaluation cache
        """
        if expr in self.regstop:
            return exprid_top(expr)
        ret = self.apply_expr_on_state(expr, eval_cache)
        return ret

    def manage_mem(self, expr, state, cache, level):
        """Evaluate the memory access @expr; a memory read which is not
        resolved by the state and whose pointer is not an integer gives TOP
        """
        ptr = self.apply_expr_on_state_visit_cache(expr.arg, state, cache, level+1)
        ret = ExprMem(ptr, expr.size)
        ret = self.get_mem_state(ret)
        # The lookup gave back the very same memory access, with a
        # non-integer pointer: its value is replaced by TOP
        if ret.is_mem() and not ret.arg.is_int() and ret.arg == ptr:
            ret = exprid_top(expr)
        assert expr.size == ret.size
        return ret

    def apply_expr_on_state_visit_cache(self, expr, state, cache, level=0):
        """
        Depth-first evaluation of nodes:
        1. evaluate node's sons
        2. simplify
        Any node with a TOP son evaluates to TOP.
        """

        if expr in cache:
            ret = cache[expr]
        elif expr in state:
            return state[expr]
        elif expr.is_int():
            ret = expr
        elif expr.is_id():
            if isinstance(expr.name, asmblock.asm_label) and expr.name.offset is not None:
                # Label with a known offset: replace it by its integer value
                ret = ExprInt(expr.name.offset, expr.size)
            elif expr in self.regstop:
                ret = exprid_top(expr)
            else:
                ret = state.get(expr, expr)
        elif expr.is_mem():
            ret = self.manage_mem(expr, state, cache, level)
        elif expr.is_cond():
            cond = self.apply_expr_on_state_visit_cache(expr.cond, state, cache, level+1)
            src1 = self.apply_expr_on_state_visit_cache(expr.src1, state, cache, level+1)
            src2 = self.apply_expr_on_state_visit_cache(expr.src2, state, cache, level+1)
            if cond.is_id(TOPSTR) or src1.is_id(TOPSTR) or src2.is_id(TOPSTR):
                ret = exprid_top(expr)
            else:
                ret = ExprCond(cond, src1, src2)
        elif expr.is_slice():
            arg = self.apply_expr_on_state_visit_cache(expr.arg, state, cache, level+1)
            if arg.is_id(TOPSTR):
                ret = exprid_top(expr)
            else:
                ret = ExprSlice(arg, expr.start, expr.stop)
        elif expr.is_op():
            args = []
            for oarg in expr.args:
                arg = self.apply_expr_on_state_visit_cache(oarg, state, cache, level+1)
                assert oarg.size == arg.size
                if arg.is_id(TOPSTR):
                    return exprid_top(expr)
                args.append(arg)
            ret = ExprOp(expr.op, *args)
        elif expr.is_compose():
            args = []
            for oarg in expr.args:
                arg = self.apply_expr_on_state_visit_cache(oarg, state, cache, level+1)
                if arg.is_id(TOPSTR):
                    return exprid_top(expr)
                args.append(arg)
            ret = ExprCompose(*args)
        else:
            raise TypeError("Unknown expr type")
        ret = self.expr_simp(ret)
        assert expr.size == ret.size
        cache[expr] = ret
        return ret

    def apply_change(self, dst, src):
        """Apply the assignment @dst = @src on the current state
        @dst: destination expression
        @src: source expression (already evaluated)
        """
        eval_cache = {}
        if dst.is_mem():
            # If Write to TOP, forget all memory information
            ret = self.eval_expr(dst.arg, eval_cache)
            if ret.is_id(TOPSTR):
                to_del = set()
                for dst_tmp in self.symbols:
                    if dst_tmp.is_mem():
                        to_del.add(dst_tmp)
                for dst_to_del in to_del:
                    del self.symbols[dst_to_del]
                return
        src_o = self.expr_simp(src)

        # Force update. Ex:
        # EBX += 1 (state: EBX = EBX+1)
        # EBX -= 1 (state: EBX = EBX, must be updated)
        if dst in self.regstop:
            self.regstop.discard(dst)
        self.symbols[dst] = src_o

        if dst == src_o:
            # Avoid useless X = X information
            del self.symbols[dst]

        if src_o.is_id(TOPSTR):
            # The destination is now TOP: it must not stay in known symbols
            if dst in self.symbols:
                del self.symbols[dst]
            self.regstop.add(dst)


class SymbExecTop(SymbExecTopNoMem):
    """
    Symbolic execution, include TOP value.
    ExprMem are propagated.
    Any computation involving a TOP will generate TOP.
    WARNING: avoid memory aliases here!
    """

    def manage_mem(self, expr, state, cache, level):
        """Evaluate the memory access @expr; unlike SymbExecTopNoMem, an
        unresolved memory read is kept as is instead of becoming TOP
        """
        ptr = self.apply_expr_on_state_visit_cache(expr.arg, state, cache, level+1)
        ret = ExprMem(ptr, expr.size)
        ret = self.get_mem_state(ret)
        assert expr.size == ret.size
        return ret


# ============================================================================
# File: miasm2/ir/symbexec_types.py  (new file)
# ============================================================================
from miasm2.ir.symbexec import SymbolicExecutionEngine, StateEngine
from miasm2.expression.simplifications import expr_simp
from miasm2.expression.expression import ExprId, ExprInt, ExprSlice,\
    ExprMem, ExprCond, ExprCompose, ExprOp

from miasm2.core.ctypesmngr import CTypeId


class SymbolicStateCTypes(StateEngine):
    """Store C types of symbols"""

    def __init__(self, dct, infos_types):
        """
        @dct: dictionary (expression -> C type)
        @infos_types: dictionary of known types
        """
        # Frozen copies, so that the state can be hashed and compared
        self._symbols = frozenset(dct.items())
        self._infos_types = frozenset(infos_types.items())

    def __hash__(self):
        return hash((self.__class__, self._symbols, self._infos_types))

    def __str__(self):
        out = []
        for dst, src in sorted(self._symbols):
            out.append("%s = %s" % (dst, src))
        return "\n".join(out)

    def __eq__(self, other):
        if self is other:
            return True
        if self.__class__ != other.__class__:
            return False
        return (self.symbols == other.symbols and
                self.infos_types == other.infos_types)

    def __iter__(self):
        """Iterate on the (expression, C type) couples of the state"""
        for dst, src in self._symbols:
            yield dst, src

    def merge(self, other):
        """Merge two symbolic states
        Only expressions with equal C types in both states are kept.
        @other: second symbolic state
        """
        symb_a = self.symbols
        symb_b = other.symbols
        types_a = set(self.infos_types.items())
        types_b = set(other.infos_types.items())
        intersection = set(symb_a.keys()).intersection(symb_b.keys())
        symbols = {}
        # Keep only the type information shared by both states
        infos_types = dict(types_a.intersection(types_b))
        for dst in intersection:
            if symb_a[dst] == symb_b[dst]:
                symbols[dst] = symb_a[dst]
        return self.__class__(symbols, infos_types)

    @property
    def symbols(self):
        """Return the dictionary of known symbols' types"""
        return dict(self._symbols)

    @property
    def infos_types(self):
        """Return known types of the state"""
        return dict(self._infos_types)


class SymbExecCType(SymbolicExecutionEngine):
    """Engine of C types propagation
    WARNING: avoid memory aliases here!
    """

    StateEngine = SymbolicStateCTypes
    # Name of the temporary identifier registered in the type analyzer when
    # the type of a sub-expression has to be given to the C handler
    OBJC_INTERNAL = "___OBJC___"

    def __init__(self, ir_arch,
                 symbols, infos_types,
                 chandler,
                 func_read=None,
                 func_write=None,
                 sb_expr_simp=expr_simp):
        """
        @ir_arch: forwarded to SymbolicExecutionEngine
        @symbols: dictionary (expression -> C type) of initial known types
        @infos_types: dictionary of known types
        @chandler: C handler; this engine uses its `type_analyzer` attribute
        and its `expr_to_types` method
        @func_read, @func_write, @sb_expr_simp: forwarded to
        SymbolicExecutionEngine
        """
        self.chandler = chandler
        self.infos_types = dict(infos_types)
        super(SymbExecCType, self).__init__(ir_arch,
                                            {},
                                            func_read,
                                            func_write,
                                            sb_expr_simp)
        self.symbols = dict(symbols)
        # C types considered as "offset" (integer) types
        offset_types = []
        for name in [('int',), ('long',),
                     ('long', 'long'),
                     ('char',), ('short',),

                     ('unsigned', 'char',), ('unsigned', 'short',),
                     ('unsigned', 'int',), ('unsigned', 'long',),
                     ('unsigned', 'long', 'long')]:
            objc = self.chandler.type_analyzer.types_mngr.get_objc(CTypeId(*name))
            offset_types.append(objc)
        self.offset_types = offset_types

    def is_type_offset(self, objc):
        """Return True if @objc is one of the integer types stored in
        self.offset_types (char/short/int/long, signed or unsigned)"""
        return objc in self.offset_types

    def get_tpye_int_by_size(self, size):
        """Return a char/short/int/long type with the size equal to @size,
        or None if there is no such type
        @size: size in bit"""

        # Explicit integer division: `/` would give a float on interpreters
        # with true division
        size_bytes = size // 8
        for objc in self.offset_types:
            if objc.size == size_bytes:
                return objc
        return None

    def is_offset_list(self, types, size):
        """Return the corresponding char/short/int/long type of @size, if
        every type in the list @types is an offset type; None otherwise
        @types: a list of c types
        @size: size in bit"""

        for arg_type in types:
            if not self.is_type_offset(arg_type):
                return None
        objc = self.get_tpye_int_by_size(size)
        if objc:
            return objc
        # default size
        objc = self.offset_types[0]
        return objc

    def apply_expr_on_state_visit_cache(self, expr, state, cache, level=0):
        """
        Depth-first evaluation of nodes:
        1. evaluate node's sons
        2. simplify
        Return the C type of @expr, or None if it is unknown.
        """

        expr = self.expr_simp(expr)

        if expr in cache:
            return cache[expr]
        elif expr in state:
            return state[expr]
        elif isinstance(expr, ExprInt):
            objc = self.get_tpye_int_by_size(expr.size)
            if objc is None:
                objc = self.chandler.type_analyzer.types_mngr.get_objc(CTypeId('int'))
            return objc
        elif isinstance(expr, ExprId):
            if expr in state:
                return state[expr]
            return None
        elif isinstance(expr, ExprMem):
            ptr = self.apply_expr_on_state_visit_cache(expr.arg, state, cache, level + 1)
            if ptr is None:
                return None
            # Register the pointer type under a temporary identifier, then
            # ask the C handler for the type of the dereference
            self.chandler.type_analyzer.expr_types[self.OBJC_INTERNAL] = ptr
            ptr_expr = ExprId(self.OBJC_INTERNAL, expr.arg.size)
            objcs = self.chandler.expr_to_types(ExprMem(ptr_expr, expr.size))
            if objcs is None:
                return None
            objc = objcs[0]
            return objc
        elif isinstance(expr, ExprCond):
            src1 = self.apply_expr_on_state_visit_cache(expr.src1, state, cache, level + 1)
            src2 = self.apply_expr_on_state_visit_cache(expr.src2, state, cache, level + 1)
            types = [src1, src2]
            objc = self.is_offset_list(types, expr.size)
            if objc:
                return objc
            return None
        elif isinstance(expr, ExprSlice):
            objc = self.get_tpye_int_by_size(expr.size)
            if objc is None:
                # default size
                objc = self.offset_types[0]
            return objc
        elif isinstance(expr, ExprOp):
            types = []
            for oarg in expr.args:
                arg = self.apply_expr_on_state_visit_cache(oarg, state, cache, level + 1)
                types.append(arg)
            if None in types:
                return None
            objc = self.is_offset_list(types, expr.size)
            if objc:
                return objc
            # Find Base + int
            if expr.op != '+':
                return None
            args = list(expr.args)
            if not args[-1].is_int():
                # No integer offset: not a "Base + int" pattern
                return None
            offset = args.pop()
            types.pop()
            if len(args) != 1:
                return None
            arg, arg_type = args.pop(), types.pop()
            # Register the base type under a temporary identifier, then ask
            # the C handler for the type of "base + offset"
            self.chandler.type_analyzer.expr_types[self.OBJC_INTERNAL] = arg_type
            ptr_expr = ExprId(self.OBJC_INTERNAL, arg.size)
            objcs = self.chandler.expr_to_types(ptr_expr + offset)
            if objcs is None:
                # Same guard as the ExprMem case above: no type was found
                return None
            return objcs[0]
        elif isinstance(expr, ExprCompose):
            types = set()
            for oarg in expr.args:
                arg = self.apply_expr_on_state_visit_cache(oarg, state, cache, level + 1)
                types.add(arg)
            objc = self.is_offset_list(types, expr.size)
            if objc:
                return objc
            return None
        else:
            raise TypeError("Unknown expr type")

    def get_state(self):
        """Return the current state of the SymbolicEngine"""
        return self.StateEngine(self.symbols, self.infos_types)

    def eval_ir_expr(self, assignblk):
        """
        Evaluate AssignBlock on the current state
        Assignments to memory are ignored; any destination which is neither
        an ExprMem nor an ExprId raises a ValueError.
        @assignblk: AssignBlock instance
        """
        pool_out = {}
        eval_cache = {}
        for dst, src in assignblk.iteritems():
            src = self.eval_expr(src, eval_cache)
            if isinstance(dst, ExprMem):
                continue
            elif isinstance(dst, ExprId):
                pool_out[dst] = src
            else:
                raise ValueError("affected zarb", str(dst))
        return pool_out.iteritems()

    def apply_change(self, dst, src):
        """Record that @dst now has the C type @src
        @dst: destination expression
        @src: C type, or None if the type is unknown
        """
        objc = src
        if objc is None:
            # Unknown type: forget @dst instead of storing a None type
            self.symbols.pop(dst, None)
        else:
            self.symbols[dst] = objc

    def del_mem_above_stack(self, stack_ptr):
        """No stack deletion"""
        return