diff options
Diffstat (limited to 'src/miasm/ir')
| -rw-r--r-- | src/miasm/ir/__init__.py | 1 | ||||
| -rw-r--r-- | src/miasm/ir/analysis.py | 119 | ||||
| -rw-r--r-- | src/miasm/ir/ir.py | 1052 | ||||
| -rw-r--r-- | src/miasm/ir/symbexec.py | 1132 | ||||
| -rw-r--r-- | src/miasm/ir/symbexec_top.py | 221 | ||||
| -rw-r--r-- | src/miasm/ir/symbexec_types.py | 131 | ||||
| -rw-r--r-- | src/miasm/ir/translators/C.py | 536 | ||||
| -rw-r--r-- | src/miasm/ir/translators/__init__.py | 13 | ||||
| -rw-r--r-- | src/miasm/ir/translators/miasm_ir.py | 45 | ||||
| -rw-r--r-- | src/miasm/ir/translators/python.py | 103 | ||||
| -rw-r--r-- | src/miasm/ir/translators/smt2.py | 330 | ||||
| -rw-r--r-- | src/miasm/ir/translators/translator.py | 127 | ||||
| -rw-r--r-- | src/miasm/ir/translators/z3_ir.py | 285 |
13 files changed, 4095 insertions, 0 deletions
diff --git a/src/miasm/ir/__init__.py b/src/miasm/ir/__init__.py new file mode 100644 index 00000000..0627b488 --- /dev/null +++ b/src/miasm/ir/__init__.py @@ -0,0 +1 @@ +"Intermediate representation methods" diff --git a/src/miasm/ir/analysis.py b/src/miasm/ir/analysis.py new file mode 100644 index 00000000..c08ea13b --- /dev/null +++ b/src/miasm/ir/analysis.py @@ -0,0 +1,119 @@ +#-*- coding:utf-8 -*- + +import warnings +import logging + +from miasm.ir.ir import Lifter, AssignBlock +from miasm.expression.expression import ExprOp, ExprAssign + + +log = logging.getLogger("analysis") +console_handler = logging.StreamHandler() +console_handler.setFormatter(logging.Formatter("[%(levelname)-8s]: %(message)s")) +log.addHandler(console_handler) +log.setLevel(logging.WARNING) + + +class LifterModelCall(Lifter): + """IR Analysis + This class provides higher level manipulations on IR, such as dead + instruction removals. + + This class can be used as a common parent with + `miasm.ir.ir::Lifter` class. + + For instance: + class LifterModelCall_x86_16(Lifter_X86_16, LifterModelCall) + + """ + ret_reg = None + + def call_effects(self, addr, instr): + """Default modelisation of a function call to @addr. This may be used to: + + * insert dependencies to arguments (stack base, registers, ...) + * add some side effects (stack clean, return value, ...) + + Return a couple: + * list of assignments to add to the current irblock + * list of additional irblocks + + @addr: (Expr) address of the called function + @instr: native instruction which is responsible of the call + """ + + call_assignblk = AssignBlock( + [ + ExprAssign(self.ret_reg, ExprOp('call_func_ret', addr, self.sp)), + ExprAssign(self.sp, ExprOp('call_func_stack', addr, self.sp)) + ], + instr + ) + return [call_assignblk], [] + + def add_instr_to_current_state(self, instr, block, assignments, ir_blocks_all, gen_pc_updt): + """ + Add the IR effects of an instruction to the current state. + If the instruction is a function call, replace the original IR by a + model of the sub function + + Returns a bool: + * True if the current assignments list must be split + * False in other cases. + + @instr: native instruction + @block: native block source + @assignments: current irbloc + @ir_blocks_all: list of additional effects + @gen_pc_updt: insert PC update effects between instructions + """ + if instr.is_subcall(): + call_assignblks, extra_irblocks = self.call_effects( + instr.args[0], + instr + ) + assignments += call_assignblks + ir_blocks_all += extra_irblocks + return True + + if gen_pc_updt is not False: + self.gen_pc_update(assignments, instr) + + assignblk, ir_blocks_extra = self.instr2ir(instr) + assignments.append(assignblk) + ir_blocks_all += ir_blocks_extra + if ir_blocks_extra: + return True + return False + + def sizeof_char(self): + "Return the size of a char in bits" + raise NotImplementedError("Abstract method") + + def sizeof_short(self): + "Return the size of a short in bits" + raise NotImplementedError("Abstract method") + + def sizeof_int(self): + "Return the size of an int in bits" + raise NotImplementedError("Abstract method") + + def sizeof_long(self): + "Return the size of a long in bits" + raise NotImplementedError("Abstract method") + + def sizeof_pointer(self): + "Return the size of a void* in bits" + raise NotImplementedError("Abstract method") + + + +class ira(LifterModelCall): + """ + DEPRECATED object + Use LifterModelCall instead of ira + """ + + def __init__(self, arch, attrib, loc_db): + warnings.warn('DEPRECATION WARNING: use "LifterModelCall" instead of "ira"') + super(ira, self).__init__(arch, attrib, loc_db) diff --git a/src/miasm/ir/ir.py b/src/miasm/ir/ir.py new file mode 100644 index 00000000..57bff4db --- /dev/null +++ b/src/miasm/ir/ir.py @@ -0,0 +1,1052 @@ +#-*- coding:utf-8 -*- + +# +# Copyright (C) 2013 Fabrice Desclaux +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# +from builtins import zip +import warnings + +from itertools import chain +from future.utils import viewvalues, viewitems + +import miasm.expression.expression as m2_expr +from miasm.expression.expression_helper import get_missing_interval +from miasm.core.asmblock import AsmBlock, AsmBlockBad, AsmConstraint +from miasm.core.graph import DiGraph +from miasm.ir.translators import Translator +from functools import reduce +from miasm.core import utils +import re + + +def _expr_loc_to_symb(expr, loc_db): + if not expr.is_loc(): + return expr + if loc_db is None: + name = str(expr) + else: + names = loc_db.get_location_names(expr.loc_key) + if not names: + name = loc_db.pretty_str(expr.loc_key) + else: + # Use only one name for readability + name = sorted(names)[0] + return m2_expr.ExprId(name, expr.size) + + +ESCAPE_CHARS = re.compile('[' + re.escape('{}[]') + '&|<>' + ']') + +class TranslatorHtml(Translator): + __LANG__ = "custom_expr_color" + + @staticmethod + def _fix_chars(token): + return "&#%04d;" % ord(token.group()) + + def __init__(self, loc_db=None, **kwargs): + super(TranslatorHtml, self).__init__(**kwargs) + self.loc_db = loc_db + + def str_protected_child(self, child, parent): + return ("(%s)" % ( + self.from_expr(child)) if m2_expr.should_parenthesize_child(child, parent) + else self.from_expr(child) + ) + + def from_ExprInt(self, expr): + out = str(expr) + out = '<font color="%s">%s</font>' % (utils.COLOR_INT, out) + return out + + def from_ExprId(self, expr): + out = str(expr) + out = '<font color="%s">%s</font>' % (utils.COLOR_ID, out) + return out + + def from_ExprLoc(self, expr): + + if self.loc_db is None: + name = ESCAPE_CHARS.sub(self._fix_chars, str((expr))) + else: + names = self.loc_db.get_location_names(expr.loc_key) + if not names: + name = self.loc_db.pretty_str(expr.loc_key) + else: + # Use only one name for readability + name = sorted(names)[0] + name = ESCAPE_CHARS.sub(self._fix_chars, name) + out = '<font color="%s">%s</font>' % (utils.COLOR_LOC, name) + return out + + def from_ExprMem(self, expr): + ptr = self.from_expr(expr.ptr) + size = '@' + str(expr.size) + size = '<font color="%s">%s</font>' % (utils.COLOR_MEM, size) + bracket_left = ESCAPE_CHARS.sub(self._fix_chars, '[') + bracket_right = ESCAPE_CHARS.sub(self._fix_chars, ']') + out = '%s%s%s%s' % (size, bracket_left, ptr, bracket_right) + return out + + def from_ExprSlice(self, expr): + base = self.from_expr(expr.arg) + start = str(expr.start) + stop = str(expr.stop) + bracket_left = ESCAPE_CHARS.sub(self._fix_chars, '[') + bracket_right = ESCAPE_CHARS.sub(self._fix_chars, ']') + out = "(%s)%s%s:%s%s" % (base, bracket_left, start, stop, bracket_right) + return out + + def from_ExprCompose(self, expr): + out = ESCAPE_CHARS.sub(self._fix_chars, "{") + out += ", ".join(["%s, %s, %s" % (self.from_expr(subexpr), + str(idx), + str(idx + subexpr.size)) + for idx, subexpr in expr.iter_args()]) + out += ESCAPE_CHARS.sub(self._fix_chars, "}") + return out + + def from_ExprCond(self, expr): + cond = self.str_protected_child(expr.cond, expr) + src1 = self.from_expr(expr.src1) + src2 = self.from_expr(expr.src2) + out = "%s?(%s,%s)" % (cond, src1, src2) + return out + + def from_ExprOp(self, expr): + op = ESCAPE_CHARS.sub(self._fix_chars, expr._op) + if expr._op == '-': # Unary minus + return '-' + self.str_protected_child(expr._args[0], expr) + if expr.is_associative() or expr.is_infix(): + return (' ' + op + ' ').join([self.str_protected_child(arg, expr) + for arg in expr._args]) + + op = '<font color="%s">%s</font>' % (utils.COLOR_OP_FUNC, op) + return (op + '(' + + ', '.join( + self.from_expr(arg) + for arg in expr._args + ) + ')') + + def from_ExprAssign(self, expr): + return "%s = %s" % tuple(map(expr.from_expr, (expr.dst, expr.src))) + + +def color_expr_html(expr, loc_db): + translator = TranslatorHtml(loc_db=loc_db) + return translator.from_expr(expr) + +def slice_rest(expr): + "Return the completion of the current slice" + size = expr.arg.size + if expr.start >= size or expr.stop > size: + raise ValueError('bad slice rest %s %s %s' % + (size, expr.start, expr.stop)) + + if expr.start == expr.stop: + return [(0, size)] + + rest = [] + if expr.start != 0: + rest.append((0, expr.start)) + if expr.stop < size: + rest.append((expr.stop, size)) + + return rest + + + +class AssignBlock(object): + """Represent parallel IR assignment, such as: + EAX = EBX + EBX = EAX + + -> Exchange between EBX and EAX + + AssignBlock can be seen as a dictionary where keys are the destinations + (ExprId or ExprMem), and values their corresponding sources. + + Also provides common manipulation on this assignments. + + """ + __slots__ = ["_assigns", "_instr"] + + def __init__(self, irs=None, instr=None): + """Create a new AssignBlock + @irs: (optional) sequence of ExprAssign, or dictionary dst (Expr) -> src + (Expr) + @instr: (optional) associate an instruction with this AssignBlock + + """ + if irs is None: + irs = [] + self._instr = instr + self._assigns = {} # ExprAssign.dst -> ExprAssign.src + + # Concurrent assignments are handled in _set + if hasattr(irs, "items"): + for dst, src in viewitems(irs): + self._set(dst, src) + else: + for expraff in irs: + self._set(expraff.dst, expraff.src) + + @property + def instr(self): + """Return the associated instruction, if any""" + return self._instr + + def _set(self, dst, src): + """ + Special cases: + * if dst is an ExprSlice, expand it to assign the full Expression + * if dst already known, sources are merged + """ + if dst.size != src.size: + raise RuntimeError( + "sanitycheck: args must have same size! %s" % + ([(str(arg), arg.size) for arg in [dst, src]])) + + if isinstance(dst, m2_expr.ExprSlice): + # Complete the source with missing slice parts + new_dst = dst.arg + rest = [(m2_expr.ExprSlice(dst.arg, r[0], r[1]), r[0], r[1]) + for r in slice_rest(dst)] + all_a = [(src, dst.start, dst.stop)] + rest + all_a.sort(key=lambda x: x[1]) + args = [expr for (expr, _, _) in all_a] + new_src = m2_expr.ExprCompose(*args) + else: + new_dst, new_src = dst, src + + if new_dst in self._assigns and isinstance(new_src, m2_expr.ExprCompose): + if not isinstance(self[new_dst], m2_expr.ExprCompose): + # prev_RAX = 0x1122334455667788 + # input_RAX[0:8] = 0x89 + # final_RAX -> ? (assignment are in parallel) + raise RuntimeError("Concurrent access on same bit not allowed") + + # Consider slice grouping + expr_list = [(new_dst, new_src), + (new_dst, self[new_dst])] + # Find collision + e_colision = reduce(lambda x, y: x.union(y), + (self.get_modified_slice(dst, src) + for (dst, src) in expr_list), + set()) + + # Sort interval collision + known_intervals = sorted([(x[1], x[2]) for x in e_colision]) + + for i, (_, stop) in enumerate(known_intervals[:-1]): + if stop > known_intervals[i + 1][0]: + raise RuntimeError( + "Concurrent access on same bit not allowed") + + # Fill with missing data + missing_i = get_missing_interval(known_intervals, 0, new_dst.size) + remaining = ((m2_expr.ExprSlice(new_dst, *interval), + interval[0], + interval[1]) + for interval in missing_i) + + # Build the merging expression + args = list(e_colision.union(remaining)) + args.sort(key=lambda x: x[1]) + starts = [start for (_, start, _) in args] + assert len(set(starts)) == len(starts) + args = [expr for (expr, _, _) in args] + new_src = m2_expr.ExprCompose(*args) + + # Sanity check + if not isinstance(new_dst, (m2_expr.ExprId, m2_expr.ExprMem)): + raise TypeError("Destination cannot be a %s" % type(new_dst)) + + self._assigns[new_dst] = new_src + + def __setitem__(self, dst, src): + raise RuntimeError('AssignBlock is immutable') + + def __getitem__(self, key): + return self._assigns[key] + + def __contains__(self, key): + return key in self._assigns + + def iteritems(self): + for dst, src in viewitems(self._assigns): + yield dst, src + + def items(self): + return [(dst, src) for dst, src in viewitems(self._assigns)] + + def itervalues(self): + for src in viewvalues(self._assigns): + yield src + + def keys(self): + return list(self._assigns) + + def values(self): + return list(viewvalues(self._assigns)) + + def __iter__(self): + for dst in self._assigns: + yield dst + + def __delitem__(self, _): + raise RuntimeError('AssignBlock is immutable') + + def update(self, _): + raise RuntimeError('AssignBlock is immutable') + + def __eq__(self, other): + if set(self.keys()) != set(other.keys()): + return False + return all(other[dst] == src for dst, src in viewitems(self)) + + def __ne__(self, other): + return not self == other + + def __len__(self): + return len(self._assigns) + + def get(self, key, default): + return self._assigns.get(key, default) + + @staticmethod + def get_modified_slice(dst, src): + """Return an Expr list of extra expressions needed during the + object instantiation""" + if not isinstance(src, m2_expr.ExprCompose): + raise ValueError("Get mod slice not on expraff slice", str(src)) + modified_s = [] + for index, arg in src.iter_args(): + if not (isinstance(arg, m2_expr.ExprSlice) and + arg.arg == dst and + index == arg.start and + index+arg.size == arg.stop): + # If x is not the initial expression + modified_s.append((arg, index, index+arg.size)) + return modified_s + + def get_w(self): + """Return a set of elements written""" + return set(self.keys()) + + def get_rw(self, mem_read=False, cst_read=False): + """Return a dictionary associating written expressions to a set of + their read requirements + @mem_read: (optional) mem_read argument of `get_r` + @cst_read: (optional) cst_read argument of `get_r` + """ + out = {} + for dst, src in viewitems(self): + src_read = src.get_r(mem_read=mem_read, cst_read=cst_read) + if isinstance(dst, m2_expr.ExprMem) and mem_read: + # Read on destination happens only with ExprMem + src_read.update(dst.ptr.get_r(mem_read=mem_read, + cst_read=cst_read)) + out[dst] = src_read + return out + + def get_r(self, mem_read=False, cst_read=False): + """Return a set of elements reads + @mem_read: (optional) mem_read argument of `get_r` + @cst_read: (optional) cst_read argument of `get_r` + """ + return set( + chain.from_iterable( + viewvalues( + self.get_rw( + mem_read=mem_read, + cst_read=cst_read + ) + ) + ) + ) + + def __str__(self): + out = [] + for dst, src in sorted(viewitems(self._assigns)): + out.append("%s = %s" % (dst, src)) + return "\n".join(out) + + def dst2ExprAssign(self, dst): + """Return an ExprAssign corresponding to @dst equation + @dst: Expr instance""" + return m2_expr.ExprAssign(dst, self[dst]) + + def simplify(self, simplifier): + """ + Return a new AssignBlock with expression simplified + + @simplifier: ExpressionSimplifier instance + """ + new_assignblk = {} + for dst, src in viewitems(self): + if dst == src: + continue + new_src = simplifier(src) + new_dst = simplifier(dst) + new_assignblk[new_dst] = new_src + return AssignBlock(irs=new_assignblk, instr=self.instr) + + def to_string(self, loc_db=None): + out = [] + for dst, src in viewitems(self): + new_src = src.visit(lambda expr:_expr_loc_to_symb(expr, loc_db)) + new_dst = dst.visit(lambda expr:_expr_loc_to_symb(expr, loc_db)) + line = "%s = %s" % (new_dst, new_src) + out.append(line) + out.append("") + return "\n".join(out) + +class IRBlock(object): + """Intermediate representation block object. + + Stand for an intermediate representation basic block. + """ + + __slots__ = ["_loc_db", "_loc_key", "_assignblks", "_dst", "_dst_linenb"] + + def __init__(self, loc_db, loc_key, assignblks): + """ + @loc_key: LocKey of the IR basic block + @assignblks: list of AssignBlock + """ + + assert isinstance(loc_key, m2_expr.LocKey) + self._loc_key = loc_key + self._loc_db = loc_db + for assignblk in assignblks: + assert isinstance(assignblk, AssignBlock) + self._assignblks = tuple(assignblks) + self._dst = None + self._dst_linenb = None + + def __eq__(self, other): + if self.__class__ is not other.__class__: + return False + if self.loc_key != other.loc_key: + return False + if self.loc_db != other.loc_db: + return False + if len(self.assignblks) != len(other.assignblks): + return False + for assignblk1, assignblk2 in zip(self.assignblks, other.assignblks): + if assignblk1 != assignblk2: + return False + return True + + def __ne__(self, other): + return not self == other + + def get_label(self): + warnings.warn('DEPRECATION WARNING: use ".loc_key" instead of ".label"') + return self.loc_key + + loc_key = property(lambda self:self._loc_key) + loc_db = property(lambda self:self._loc_db) + label = property(get_label) + + @property + def assignblks(self): + return self._assignblks + + @property + def irs(self): + warnings.warn('DEPRECATION WARNING: use "irblock.assignblks" instead of "irblock.irs"') + return self._assignblks + + def __iter__(self): + """Iterate on assignblks""" + return self._assignblks.__iter__() + + def __getitem__(self, index): + """Getitem on assignblks""" + return self._assignblks.__getitem__(index) + + def __len__(self): + """Length of assignblks""" + return self._assignblks.__len__() + + def is_dst_set(self): + return self._dst is not None + + def cache_dst(self): + final_dst = None + final_linenb = None + for linenb, assignblk in enumerate(self): + for dst, src in viewitems(assignblk): + if dst.is_id("IRDst"): + if final_dst is not None: + raise ValueError('Multiple destinations!') + final_dst = src + final_linenb = linenb + self._dst = final_dst + self._dst_linenb = final_linenb + return final_dst + + @property + def dst(self): + """Return the value of IRDst for the IRBlock""" + if self.is_dst_set(): + return self._dst + return self.cache_dst() + + def set_dst(self, value): + """Generate a new IRBlock with a dst (IRBlock) fixed to @value""" + irs = [] + dst_found = False + for assignblk in self: + new_assignblk = {} + for dst, src in viewitems(assignblk): + if dst.is_id("IRDst"): + assert dst_found is False + dst_found = True + new_assignblk[dst] = value + else: + new_assignblk[dst] = src + irs.append(AssignBlock(new_assignblk, assignblk.instr)) + return IRBlock(self.loc_db, self.loc_key, irs) + + @property + def dst_linenb(self): + """Line number of the IRDst setting statement in the current irs""" + if not self.is_dst_set(): + self.cache_dst() + return self._dst_linenb + + def to_string(self): + out = [] + names = self.loc_db.get_location_names(self.loc_key) + if not names: + node_name = "%s:" % self.loc_db.pretty_str(self.loc_key) + else: + node_name = "".join("%s:\n" % name for name in names) + out.append(node_name) + + for assignblk in self: + out.append(assignblk.to_string(self.loc_db)) + return '\n'.join(out) + + def __str__(self): + return self.to_string() + + def modify_exprs(self, mod_dst=None, mod_src=None): + """ + Generate a new IRBlock with its AssignBlock expressions modified + according to @mod_dst and @mod_src + @mod_dst: function called to modify Expression destination + @mod_src: function called to modify Expression source + """ + + if mod_dst is None: + mod_dst = lambda expr:expr + if mod_src is None: + mod_src = lambda expr:expr + + assignblks = [] + for assignblk in self: + new_assignblk = {} + for dst, src in viewitems(assignblk): + new_assignblk[mod_dst(dst)] = mod_src(src) + assignblks.append(AssignBlock(new_assignblk, assignblk.instr)) + return IRBlock(self.loc_db, self.loc_key, assignblks) + + def simplify(self, simplifier): + """ + Simplify expressions in each assignblock + @simplifier: ExpressionSimplifier instance + """ + modified = False + assignblks = [] + for assignblk in self: + new_assignblk = assignblk.simplify(simplifier) + if assignblk != new_assignblk: + modified = True + assignblks.append(new_assignblk) + return modified, IRBlock(self.loc_db, self.loc_key, assignblks) + + +class irbloc(IRBlock): + """ + DEPRECATED object + Use IRBlock instead of irbloc + """ + + def __init__(self, loc_key, irs, lines=None): + warnings.warn('DEPRECATION WARNING: use "IRBlock" instead of "irblock"') + super(irbloc, self).__init__(loc_key, irs) + + + +class IRCFG(DiGraph): + + """DiGraph for IR instances""" + + def __init__(self, irdst, loc_db, blocks=None, *args, **kwargs): + """Instantiate a IRCFG + @loc_db: LocationDB instance + @blocks: IR blocks + """ + self.loc_db = loc_db + if blocks is None: + blocks = {} + self._blocks = blocks + self._irdst = irdst + super(IRCFG, self).__init__(*args, **kwargs) + + @property + def IRDst(self): + return self._irdst + + @property + def blocks(self): + return self._blocks + + def add_irblock(self, irblock): + """ + Add the @irblock to the current IRCFG + @irblock: IRBlock instance + """ + self.blocks[irblock.loc_key] = irblock + self.add_node(irblock.loc_key) + + for dst in self.dst_trackback(irblock): + if dst.is_int(): + dst_loc_key = self.loc_db.get_or_create_offset_location(int(dst)) + dst = m2_expr.ExprLoc(dst_loc_key, irblock.dst.size) + if dst.is_loc(): + self.add_uniq_edge(irblock.loc_key, dst.loc_key) + + def escape_text(self, text): + return text + + def node2lines(self, node): + node_name = self.loc_db.pretty_str(node) + yield self.DotCellDescription( + text="%s" % node_name, + attr={ + 'align': 'center', + 'colspan': 2, + 'bgcolor': 'grey', + } + ) + if node not in self._blocks: + yield [self.DotCellDescription(text="NOT PRESENT", attr={'bgcolor': 'red'})] + return + for i, assignblk in enumerate(self._blocks[node]): + for dst, src in viewitems(assignblk): + line = "%s = %s" % ( + color_expr_html(dst, self.loc_db), + color_expr_html(src, self.loc_db) + ) + if self._dot_offset: + yield [self.DotCellDescription(text="%-4d" % i, attr={}), + self.DotCellDescription(text=line, attr={})] + else: + yield self.DotCellDescription(text=line, attr={}) + yield self.DotCellDescription(text="", attr={}) + + def edge_attr(self, src, dst): + if src not in self._blocks or dst not in self._blocks: + return {} + src_irdst = self._blocks[src].dst + edge_color = "blue" + if isinstance(src_irdst, m2_expr.ExprCond): + src1, src2 = src_irdst.src1, src_irdst.src2 + if src1.is_loc(dst): + edge_color = "limegreen" + elif src2.is_loc(dst): + edge_color = "red" + return {"color": edge_color} + + def node_attr(self, node): + if node not in self._blocks: + return {'style': 'filled', 'fillcolor': 'red'} + return {} + + def dot(self, offset=False): + """ + @offset: (optional) if set, add the corresponding line number in each + node + """ + self._dot_offset = offset + return super(IRCFG, self).dot() + + def get_loc_key(self, addr): + """Transforms an ExprId/ExprInt/loc_key/int into a loc_key + @addr: an ExprId/ExprInt/loc_key/int""" + + if isinstance(addr, m2_expr.LocKey): + return addr + elif isinstance(addr, m2_expr.ExprLoc): + return addr.loc_key + + try: + addr = int(addr) + except (ValueError, TypeError): + return None + + return self.loc_db.get_offset_location(addr) + + + def get_or_create_loc_key(self, addr): + """Transforms an ExprId/ExprInt/loc_key/int into a loc_key + If the offset @addr is not in the LocationDB, create it + @addr: an ExprId/ExprInt/loc_key/int""" + + loc_key = self.get_loc_key(addr) + if loc_key is not None: + return loc_key + + return self.loc_db.add_location(offset=int(addr)) + + def get_block(self, addr): + """Returns the irbloc associated to an ExprId/ExprInt/loc_key/int + @addr: an ExprId/ExprInt/loc_key/int""" + + loc_key = self.get_loc_key(addr) + if loc_key is None: + return None + return self.blocks.get(loc_key, None) + + def getby_offset(self, offset): + """ + Return the set of loc_keys of irblocks containing @offset + @offset: address + """ + out = set() + for irb in viewvalues(self.blocks): + for assignblk in irb: + instr = assignblk.instr + if instr is None: + continue + if instr.offset <= offset < instr.offset + instr.l: + out.add(irb.loc_key) + return out + + + def simplify(self, simplifier): + """ + Simplify expressions in each irblocks + @simplifier: ExpressionSimplifier instance + """ + modified = False + for loc_key, block in list(viewitems(self.blocks)): + assignblks = [] + for assignblk in block: + new_assignblk = assignblk.simplify(simplifier) + if assignblk != new_assignblk: + modified = True + assignblks.append(new_assignblk) + self.blocks[loc_key] = IRBlock(self.loc_db, loc_key, assignblks) + return modified + + def _extract_dst(self, todo, done): + """ + Naive extraction of @todo destinations + WARNING: @todo and @done are modified + """ + out = set() + while todo: + dst = todo.pop() + if dst.is_loc(): + done.add(dst) + elif dst.is_mem() or dst.is_int(): + done.add(dst) + elif dst.is_cond(): + todo.add(dst.src1) + todo.add(dst.src2) + elif dst.is_id(): + out.add(dst) + else: + done.add(dst) + return out + + def dst_trackback(self, irb): + """ + Naive backtracking of IRDst + @irb: irbloc instance + """ + todo = set([irb.dst]) + done = set() + + for assignblk in reversed(irb): + if not todo: + break + out = self._extract_dst(todo, done) + found = set() + follow = set() + for dst in out: + if dst in assignblk: + follow.add(assignblk[dst]) + found.add(dst) + + follow.update(out.difference(found)) + todo = follow + + return done + + +class DiGraphIR(IRCFG): + """ + DEPRECATED object + Use IRCFG instead of DiGraphIR + """ + + def __init__(self, *args, **kwargs): + warnings.warn('DEPRECATION WARNING: use "IRCFG" instead of "DiGraphIR"') + raise NotImplementedError("Deprecated") + + +class Lifter(object): + """ + Intermediate representation object + + Allow native assembly to intermediate representation traduction + """ + + def __init__(self, arch, attrib, loc_db): + self.pc = arch.getpc(attrib) + self.sp = arch.getsp(attrib) + self.arch = arch + self.attrib = attrib + self.loc_db = loc_db + self.IRDst = None + + def get_ir(self, instr): + raise NotImplementedError("Abstract Method") + + def new_ircfg(self, *args, **kwargs): + """ + Return a new instance of IRCFG + """ + return IRCFG(self.IRDst, self.loc_db, *args, **kwargs) + + def new_ircfg_from_asmcfg(self, asmcfg, *args, **kwargs): + """ + Return a new instance of IRCFG from an @asmcfg + @asmcfg: AsmCFG instance + """ + + ircfg = IRCFG(self.IRDst, self.loc_db, *args, **kwargs) + for block in asmcfg.blocks: + self.add_asmblock_to_ircfg(block, ircfg) + return ircfg + + def instr2ir(self, instr): + ir_bloc_cur, extra_irblocks = self.get_ir(instr) + for index, irb in enumerate(extra_irblocks): + irs = [] + for assignblk in irb: + irs.append(AssignBlock(assignblk, instr)) + extra_irblocks[index] = IRBlock(self.loc_db, irb.loc_key, irs) + assignblk = AssignBlock(ir_bloc_cur, instr) + return assignblk, extra_irblocks + + def add_instr_to_ircfg(self, instr, ircfg, loc_key=None, gen_pc_updt=False): + """ + Add the native instruction @instr to the @ircfg + @instr: instruction instance + @ircfg: IRCFG instance + @loc_key: loc_key instance of the instruction destination + @gen_pc_updt: insert PC update effects between instructions + """ + + if loc_key is None: + offset = getattr(instr, "offset", None) + loc_key = self.loc_db.get_or_create_offset_location(offset) + block = AsmBlock(self.loc_db, loc_key) + block.lines = [instr] + self.add_asmblock_to_ircfg(block, ircfg, gen_pc_updt) + return loc_key + + def gen_pc_update(self, assignments, instr): + offset = m2_expr.ExprInt(instr.offset, self.pc.size) + assignments.append(AssignBlock({self.pc:offset}, instr)) + + def add_instr_to_current_state(self, instr, block, assignments, ir_blocks_all, gen_pc_updt): + """ + Add the IR effects of an instruction to the current state. + + Returns a bool: + * True if the current assignments list must be split + * False in other cases. + + @instr: native instruction + @block: native block source + @assignments: list of current AssignBlocks + @ir_blocks_all: list of additional effects + @gen_pc_updt: insert PC update effects between instructions + """ + if gen_pc_updt is not False: + self.gen_pc_update(assignments, instr) + + assignblk, ir_blocks_extra = self.instr2ir(instr) + assignments.append(assignblk) + ir_blocks_all += ir_blocks_extra + if ir_blocks_extra: + return True + return False + + def add_asmblock_to_ircfg(self, block, ircfg, gen_pc_updt=False): + """ + Add a native block to the current IR + @block: native assembly block + @ircfg: IRCFG instance + @gen_pc_updt: insert PC update effects between instructions + """ + + loc_key = block.loc_key + ir_blocks_all = [] + + if isinstance(block, AsmBlockBad): + return ir_blocks_all + + assignments = [] + for instr in block.lines: + if loc_key is None: + assignments = [] + loc_key = self.get_loc_key_for_instr(instr) + split = self.add_instr_to_current_state( + instr, block, assignments, + ir_blocks_all, gen_pc_updt + ) + if split: + ir_blocks_all.append(IRBlock(self.loc_db, loc_key, assignments)) + loc_key = None + assignments = [] + if loc_key is not None: + ir_blocks_all.append(IRBlock(self.loc_db, loc_key, assignments)) + + new_ir_blocks_all = self.post_add_asmblock_to_ircfg(block, ircfg, ir_blocks_all) + for irblock in new_ir_blocks_all: + ircfg.add_irblock(irblock) + return new_ir_blocks_all + + def add_block(self, block, gen_pc_updt=False): + """ + DEPRECATED function + Use add_asmblock_to_ircfg instead of add_block + """ + warnings.warn("""DEPRECATION WARNING + ircfg is now out of Lifter + Use: + ircfg = lifter.new_ircfg() + lifter.add_asmblock_to_ircfg(block, ircfg) + """) + raise RuntimeError("API Deprecated") + + def add_bloc(self, block, gen_pc_updt=False): + """ + DEPRECATED function + Use add_asmblock_to_ircfg instead of add_bloc + """ + self.add_block(block, gen_pc_updt) + + def get_next_loc_key(self, instr): + loc_key = self.loc_db.get_or_create_offset_location(instr.offset + instr.l) + return loc_key + + def get_loc_key_for_instr(self, instr): + """Returns the loc_key associated to an instruction + @instr: current instruction""" + return self.loc_db.get_or_create_offset_location(instr.offset) + + def gen_loc_key_and_expr(self, size): + """ + Return a loc_key and it's corresponding ExprLoc + @size: size of expression + """ + loc_key = self.loc_db.add_location() + return loc_key, m2_expr.ExprLoc(loc_key, size) + + def expr_fix_regs_for_mode(self, expr, *args, **kwargs): + return expr + + def expraff_fix_regs_for_mode(self, expr, *args, **kwargs): + return expr + + def irbloc_fix_regs_for_mode(self, irblock, *args, **kwargs): + return irblock + + def is_pc_written(self, block): + """Return the first Assignblk of the @block in which PC is written + @block: IRBlock instance""" + all_pc = viewvalues(self.arch.pc) + for assignblk in block: + if assignblk.dst in all_pc: + return assignblk + return None + + def set_empty_dst_to_next(self, block, ir_blocks): + for index, irblock in enumerate(ir_blocks): + if irblock.dst is not None: + continue + next_loc_key = block.get_next() + if next_loc_key is None: + loc_key = None + if block.lines: + line = block.lines[-1] + if line.offset is not None: + loc_key = self.loc_db.get_or_create_offset_location(line.offset + line.l) + if loc_key is None: + loc_key = self.loc_db.add_location() + block.add_cst(loc_key, AsmConstraint.c_next) + else: + loc_key = next_loc_key + dst = m2_expr.ExprLoc(loc_key, self.pc.size) + if irblock.assignblks: + instr = irblock.assignblks[-1].instr + else: + instr = None + assignblk = AssignBlock({self.IRDst: dst}, instr) + ir_blocks[index] = IRBlock(self.loc_db, irblock.loc_key, list(irblock.assignblks) + [assignblk]) + + def post_add_asmblock_to_ircfg(self, block, ircfg, ir_blocks): + self.set_empty_dst_to_next(block, ir_blocks) + + new_irblocks = [] + for irblock in ir_blocks: + new_irblock = self.irbloc_fix_regs_for_mode(irblock, self.attrib) + ircfg.add_irblock(new_irblock) + new_irblocks.append(new_irblock) + return new_irblocks + + +class IntermediateRepresentation(Lifter): + """ + DEPRECATED object + Use Lifter instead of IntermediateRepresentation + """ + + def __init__(self, arch, attrib, loc_db): + warnings.warn('DEPRECATION WARNING: use "Lifter" instead of "IntermediateRepresentation"') + super(IntermediateRepresentation, self).__init__(arch, attrib, loc_db) + + +class ir(Lifter): + """ + DEPRECATED object + Use Lifter instead of ir + """ + + def __init__(self, loc_key, irs, lines=None): + warnings.warn('DEPRECATION WARNING: use "Lifter" instead of "ir"') + super(ir, self).__init__(loc_key, irs, lines) diff --git a/src/miasm/ir/symbexec.py b/src/miasm/ir/symbexec.py new file mode 100644 index 00000000..2e6b07df --- /dev/null +++ b/src/miasm/ir/symbexec.py @@ -0,0 +1,1132 @@ +from __future__ import print_function +from builtins import range +import logging +try: + from collections.abc import MutableMapping +except ImportError: + from collections import MutableMapping + +from future.utils import viewitems + +from miasm.expression.expression import ExprOp, ExprId, ExprLoc, ExprInt, \ + ExprMem, ExprCompose, ExprSlice, ExprCond +from miasm.expression.simplifications import expr_simp_explicit +from miasm.ir.ir import AssignBlock + +log = logging.getLogger("symbexec") +console_handler = logging.StreamHandler() +console_handler.setFormatter(logging.Formatter("[%(levelname)-8s]: %(message)s")) +log.addHandler(console_handler) +log.setLevel(logging.INFO) + + +def get_block(lifter, ircfg, mdis, addr): + """Get IRBlock at address @addr""" + loc_key = ircfg.get_or_create_loc_key(addr) + if not loc_key in ircfg.blocks: + offset = mdis.loc_db.get_location_offset(loc_key) + block = mdis.dis_block(offset) + lifter.add_asmblock_to_ircfg(block, ircfg) + irblock = ircfg.get_block(loc_key) + if irblock is None: + raise LookupError('No block found at that address: %s' % lifter.loc_db.pretty_str(loc_key)) + return irblock + + +class StateEngine(object): + """Stores an Engine state""" + + def merge(self, other): + """Generate a new state, representing the merge of self and @other + @other: a StateEngine instance""" + + raise NotImplementedError("Abstract method") + + +class SymbolicState(StateEngine): + """Stores a SymbolicExecutionEngine state""" + + def __init__(self, dct): + self._symbols = frozenset(viewitems(dct)) + + def __hash__(self): + return hash((self.__class__, self._symbols)) + + def __eq__(self, other): + if self is other: + return True + if self.__class__ != other.__class__: + return False + return self.symbols == other.symbols + + def __ne__(self, other): + return not self == other + + def __iter__(self): + for dst, src in self._symbols: + yield dst, src + + def iteritems(self): + """Iterate on stored memory/values""" + return self.__iter__() + + def merge(self, other): + """Merge two symbolic states + Only equal expressions are kept in both states + @other: second symbolic state + """ + + symb_a = self.symbols + symb_b = other.symbols + intersection = set(symb_a).intersection(set(symb_b)) + out = {} + for dst in intersection: + if symb_a[dst] == symb_b[dst]: + out[dst] = symb_a[dst] + return self.__class__(out) + + @property + def symbols(self): + """Return the dictionary of known symbols""" + return dict(self._symbols) + + +INTERNAL_INTBASE_NAME = "__INTERNAL_INTBASE__" + + +def get_expr_base_offset(expr): + """Return a couple representing the symbolic/concrete part of an addition + expression. + + If there is no symbolic part, ExprId(INTERNAL_INTBASE_NAME) is used + If there is not concrete part, 0 is used + @expr: Expression instance + + """ + if expr.is_int(): + internal_intbase = ExprId(INTERNAL_INTBASE_NAME, expr.size) + return internal_intbase, int(expr) + + if not expr.is_op('+'): + return expr, 0 + if expr.args[-1].is_int(): + args, offset = expr.args[:-1], int(expr.args[-1]) + if len(args) == 1: + return args[0], offset + return ExprOp('+', *args), offset + return expr, 0 + + +class MemArray(MutableMapping): + """Link between base and its (offset, Expr) + + Given an expression (say *base*), this structure will store every memory + content relatively to an integer offset from *base*. + + The value associated to a given offset is a description of the slice of a + stored expression. The slice size depends on the configuration of the + MemArray. For example, for a slice size of 8 bits, the assignment: + - @32[EAX+0x10] = EBX + + will store for the base EAX: + - 0x10: (EBX, 0) + - 0x11: (EBX, 1) + - 0x12: (EBX, 2) + - 0x13: (EBX, 3) + + If the *base* is EAX+EBX, this structure can store the following contents: + - @32[EAX+EBX] + - @8[EAX+EBX+0x100] + But not: + - @32[EAX+0x10] (which is stored in another MemArray based on EAX) + - @32[EAX+EBX+ECX] (which is stored in another MemArray based on + EAX+EBX+ECX) + + """ + + def __init__(self, base, expr_simp=expr_simp_explicit): + self._base = base + self.expr_simp = expr_simp + self._mask = int(base.mask) + self._offset_to_expr = {} + + @property + def base(self): + """Expression representing the symbolic base address""" + return self._base + + @property + def mask(self): + """Mask offset""" + return self._mask + + def __contains__(self, offset): + return offset in self._offset_to_expr + + def __getitem__(self, offset): + assert 0 <= offset <= self._mask + return self._offset_to_expr.__getitem__(offset) + + def __setitem__(self, offset, value): + raise RuntimeError("Use write api to update keys") + + def __delitem__(self, offset): + assert 0 <= offset <= self._mask + return self._offset_to_expr.__delitem__(offset) + + def __iter__(self): + for offset, _ in viewitems(self._offset_to_expr): + yield offset + + def __len__(self): + return len(self._offset_to_expr) + + def __repr__(self): + out = [] + out.append("Base: %s" % self.base) + for offset, (index, value) in sorted(viewitems(self._offset_to_expr)): + out.append("%16X %d %s" % (offset, index, value)) + return '\n'.join(out) + + def copy(self): + """Copy object instance""" + obj = MemArray(self.base, self.expr_simp) + obj._offset_to_expr = self._offset_to_expr.copy() + return obj + + @staticmethod + def offset_to_ptr(base, offset): + """ + Return an expression representing the @base + @offset + @base: symbolic base address + @offset: relative offset integer to the @base address + """ + if base.is_id(INTERNAL_INTBASE_NAME): + ptr = ExprInt(offset, base.size) + elif offset == 0: + ptr = base + else: + ptr = base + ExprInt(offset, base.size) + return ptr.canonize() + + def read(self, offset, size): + """ + Return memory at @offset with @size as an Expr list + @offset: integer (in bytes) + @size: integer (in bits), byte aligned + + Consider the following state: + - 0x10: (EBX, 0) + - 0x11: (EBX, 1) + - 0x12: (EBX, 2) + - 0x13: (EBX, 3) + + A read at 0x10 of 32 bits should return: EBX + """ + + assert size % 8 == 0 + # Parts is (Expr's offset, size, Expr) + parts = [] + for index in range(size // 8): + # Wrap read: + # @32[EAX+0xFFFFFFFF] is ok and will read at 0xFFFFFFFF, 0, 1, 2 + request_offset = (offset + index) & self._mask + if request_offset in self._offset_to_expr: + # Known memory portion + off, data = self._offset_to_expr[request_offset] + parts.append((off, 1, data)) + continue + + # Unknown memory portion + ptr = self.offset_to_ptr(self.base, request_offset) + data = ExprMem(ptr, 8) + parts.append((0, 1, data)) + + # Group similar data + # XXX TODO: only little endian here + index = 0 + while index + 1 < len(parts): + off_a, size_a, data_a = parts[index] + off_b, size_b, data_b = parts[index+1] + if data_a == data_b and off_a + size_a == off_b: + # Read consecutive bytes of a variable + # [(0, 8, x), (1, 8, x)] => (0, 16, x) + parts[index:index+2] = [(off_a, size_a + size_b, data_a)] + continue + if data_a.is_int() and data_b.is_int(): + # Read integer parts + # [(0, 8, 0x11223344), (1, 8, 0x55667788)] => (0, 16, 0x7744) + int1 = self.expr_simp(data_a[off_a*8:(off_a+size_a)*8]) + int2 = self.expr_simp(data_b[off_b*8:(off_b+size_b)*8]) + assert int1.is_int() and int2.is_int() + int1, int2 = int(int1), int(int2) + result = ExprInt((int2 << (size_a * 8)) | int1, (size_a + size_b) * 8) + parts[index:index+2] = [(0, size_a + size_b, result)] + continue + if data_a.is_mem() and data_b.is_mem(): + # Read consecutive bytes of a memory variable + ptr_base_a, ptr_offset_a = get_expr_base_offset(data_a.ptr) + ptr_base_b, ptr_offset_b = get_expr_base_offset(data_b.ptr) + if ptr_base_a != ptr_base_b: + index += 1 + continue + if (ptr_offset_a + off_a + size_a) & self._mask == (ptr_offset_b + off_b) & self._mask: + assert size_a <= data_a.size // 8 - off_a + assert size_b <= data_b.size // 8 - off_b + # Successive comparable symbolic pointers + # [(0, 8, @8[ptr]), (0, 8, @8[ptr+1])] => (0, 16, @16[ptr]) + ptr = self.offset_to_ptr(ptr_base_a, (ptr_offset_a + off_a) & self._mask) + + data = ExprMem(ptr, (size_a + size_b) * 8) + parts[index:index+2] = [(0, size_a + size_b, data)] + + continue + + index += 1 + + # Slice data + read_mem = [] + for off, bytesize, data in parts: + if data.size // 8 != bytesize: + data = data[off * 8: (off + bytesize) * 8] + read_mem.append(data) + + return read_mem + + def write(self, offset, expr): + """ + Write @expr at @offset + @offset: integer (in bytes) + @expr: Expr instance value + """ + assert expr.size % 8 == 0 + assert offset <= self._mask + for index in range(expr.size // 8): + # Wrap write: + # @32[EAX+0xFFFFFFFF] is ok and will write at 0xFFFFFFFF, 0, 1, 2 + request_offset = (offset + index) & self._mask + # XXX TODO: only little endian here + self._offset_to_expr[request_offset] = (index, expr) + + tmp = self.expr_simp(expr[index * 8: (index + 1) * 8]) + # Special case: Simplify slice of pointer (simplification is ok + # here, as we won't store the simplified expression) + if tmp.is_slice() and tmp.arg.is_mem() and tmp.start % 8 == 0: + new_ptr = self.expr_simp( + tmp.arg.ptr + ExprInt(tmp.start // 8, tmp.arg.ptr.size) + ) + tmp = ExprMem(new_ptr, tmp.stop - tmp.start) + # Test if write to original value + if tmp.is_mem(): + src_ptr, src_off = get_expr_base_offset(tmp.ptr) + if src_ptr == self.base and src_off == request_offset: + del self._offset_to_expr[request_offset] + + + def _get_variable_parts(self, index, known_offsets, forward=True): + """ + Find consecutive memory parts representing the same variable. The part + starts at offset known_offsets[@index] and search is in offset direction + determined by @forward + Return the number of consecutive parts of the same variable. + + @index: index of the memory offset in known_offsets + @known_offsets: sorted offsets + @forward: Search in offset growing direction if True, else in reverse + order + """ + + offset = known_offsets[index] + value_byte_index, value = self._offset_to_expr[offset] + assert value.size % 8 == 0 + + if forward: + start, end, step = value_byte_index + 1, value.size // 8, 1 + else: + start, end, step = value_byte_index - 1, -1, -1 + + partnum = 1 + for value_offset in range(start, end, step): + offset += step + # Check if next part is in known_offsets + next_index = index + step * partnum + if not 0 <= next_index < len(known_offsets): + break + + offset_next = known_offsets[next_index] + if offset_next != offset: + break + + # Check if next part is a part of the searched value + byte_index, value_next = self._offset_to_expr[offset_next] + if byte_index != value_offset: + break + if value != value_next: + break + partnum += 1 + + return partnum + + + def _build_value_at_offset(self, value, offset, start, length): + """ + Return a couple. The first element is the memory Expression representing + the value at @offset, the second is its value. The value is truncated + at byte @start with @length + + @value: Expression to truncate + @offset: offset in bytes of the variable (integer) + @start: value's byte offset (integer) + @length: length in bytes (integer) + """ + + ptr = self.offset_to_ptr(self.base, offset) + size = length * 8 + if start == 0 and size == value.size: + result = value + else: + result = self.expr_simp(value[start * 8: start * 8 + size]) + + return ExprMem(ptr, size), result + + + def memory(self): + """ + Iterate on stored memory/values + + The goal here is to group entities. + + Consider the following state: + EAX + 0x10 = (0, EDX) + EAX + 0x11 = (1, EDX) + EAX + 0x12 = (2, EDX) + EAX + 0x13 = (3, EDX) + + The function should return: + @32[EAX + 0x10] = EDX + """ + + if not self._offset_to_expr: + return + known_offsets = sorted(self._offset_to_expr) + index = 0 + # Test if the first element is the continuation of the last byte. If + # yes, merge and output it first. + min_int = 0 + max_int = (1 << self.base.size) - 1 + limit_index = len(known_offsets) + + first_element = None + # Special case where a variable spreads on max_int/min_int + if known_offsets[0] == min_int and known_offsets[-1] == max_int: + min_offset, max_offset = known_offsets[0], known_offsets[-1] + min_byte_index, min_value = self._offset_to_expr[min_offset] + max_byte_index, max_value = self._offset_to_expr[max_offset] + if min_value == max_value and max_byte_index + 1 == min_byte_index: + # Look for current variable start + partnum_before = self._get_variable_parts(len(known_offsets) - 1, known_offsets, False) + # Look for current variable end + partnum_after = self._get_variable_parts(0, known_offsets) + + partnum = partnum_before + partnum_after + offset = known_offsets[-partnum_before] + index_value, value = self._offset_to_expr[offset] + + mem, result = self._build_value_at_offset(value, offset, index_value, partnum) + first_element = mem, result + index = partnum_after + limit_index = len(known_offsets) - partnum_before + + # Special cases are done, walk and merge variables + while index < limit_index: + offset = known_offsets[index] + index_value, value = self._offset_to_expr[offset] + partnum = self._get_variable_parts(index, known_offsets) + mem, result = self._build_value_at_offset(value, offset, index_value, partnum) + yield mem, result + index += partnum + + if first_element is not None: + yield first_element + + def dump(self): + """Display MemArray content""" + for mem, value in self.memory(): + print("%s = %s" % (mem, value)) + + +class MemSparse(object): + """Link a symbolic memory pointer to its MemArray. + + For each symbolic memory object, this object will extract the memory pointer + *ptr*. It then splits *ptr* into a symbolic and an integer part. For + example, the memory @[ESP+4] will give ESP+4 for *ptr*. *ptr* is then split + into its base ESP and its offset 4. Each symbolic base address uses a + different MemArray. + + Example: + - @32[EAX+EBX] + - @8[EAX+EBX+0x100] + Will be stored in the same MemArray with a EAX+EBX base + + """ + + def __init__(self, addrsize, expr_simp=expr_simp_explicit): + """ + @addrsize: size (in bits) of the addresses manipulated by the MemSparse + @expr_simp: an ExpressionSimplifier instance + """ + self.addrsize = addrsize + self.expr_simp = expr_simp + self.base_to_memarray = {} + + def __contains__(self, expr): + """ + Return True if the whole @expr is present + For partial check, use 'contains_partial' + """ + if not expr.is_mem(): + return False + ptr = expr.ptr + base, offset = get_expr_base_offset(ptr) + memarray = self.base_to_memarray.get(base, None) + if memarray is None: + return False + for i in range(expr.size // 8): + if offset + i not in memarray: + return False + return True + + def contains_partial(self, expr): + """ + Return True if a part of @expr is present in memory + """ + if not expr.is_mem(): + return False + ptr = expr.ptr + base, offset = get_expr_base_offset(ptr) + memarray = self.base_to_memarray.get(base, None) + if memarray is None: + return False + for i in range(expr.size // 8): + if offset + i in memarray: + return True + return False + + def clear(self): + """Reset the current object content""" + self.base_to_memarray.clear() + + def copy(self): + """Copy the current object instance""" + base_to_memarray = {} + for base, memarray in viewitems(self.base_to_memarray): + base_to_memarray[base] = memarray.copy() + obj = MemSparse(self.addrsize, self.expr_simp) + obj.base_to_memarray = base_to_memarray + return obj + + def __delitem__(self, expr): + """ + Delete a value @expr *fully* present in memory + For partial delete, use delete_partial + """ + ptr = expr.ptr + base, offset = get_expr_base_offset(ptr) + memarray = self.base_to_memarray.get(base, None) + if memarray is None: + raise KeyError + # Check if whole entity is in the MemArray before deleting it + for i in range(expr.size // 8): + if (offset + i) & memarray.mask not in memarray: + raise KeyError + for i in range(expr.size // 8): + del memarray[(offset + i) & memarray.mask] + + def delete_partial(self, expr): + """ + Delete @expr from memory. Skip parts of @expr which are not present in + memory. + """ + ptr = expr.ptr + base, offset = get_expr_base_offset(ptr) + memarray = self.base_to_memarray.get(base, None) + if memarray is None: + raise KeyError + # Check if whole entity is in the MemArray before deleting it + for i in range(expr.size // 8): + real_offset = (offset + i) & memarray.mask + if real_offset in memarray: + del memarray[real_offset] + + def read(self, ptr, size): + """ + Return the value associated with the Expr at address @ptr + @ptr: Expr representing the memory address + @size: memory size (in bits), byte aligned + """ + assert size % 8 == 0 + base, offset = get_expr_base_offset(ptr) + memarray = self.base_to_memarray.get(base, None) + if memarray is not None: + mems = memarray.read(offset, size) + ret = mems[0] if len(mems) == 1 else ExprCompose(*mems) + else: + ret = ExprMem(ptr, size) + return ret + + def write(self, ptr, expr): + """ + Update the corresponding Expr @expr at address @ptr + @ptr: Expr representing the memory address + @expr: Expr instance + """ + assert ptr.size == self.addrsize + base, offset = get_expr_base_offset(ptr) + memarray = self.base_to_memarray.get(base, None) + if memarray is None: + memarray = MemArray(base, self.expr_simp) + self.base_to_memarray[base] = memarray + memarray.write(offset, expr) + + def iteritems(self): + """Iterate on stored memory variables and their values.""" + for _, memarray in viewitems(self.base_to_memarray): + for mem, value in memarray.memory(): + yield mem, value + + def items(self): + """Return stored memory variables and their values.""" + return list(self.iteritems()) + + def dump(self): + """Display MemSparse content""" + for mem, value in viewitems(self): + print("%s = %s" % (mem, value)) + + def __repr__(self): + out = [] + for _, memarray in sorted(viewitems(self.base_to_memarray)): + out.append(repr(memarray)) + return '\n'.join(out) + + +class SymbolMngr(object): + """Symbolic store manager (IDs and MEMs)""" + + def __init__(self, init=None, addrsize=None, expr_simp=expr_simp_explicit): + assert addrsize is not None + if init is None: + init = {} + self.addrsize = addrsize + self.expr_simp = expr_simp + self.symbols_id = {} + self.symbols_mem = MemSparse(addrsize, expr_simp) + self.mask = (1 << addrsize) - 1 + for expr, value in viewitems(init): + self.write(expr, value) + + def __contains__(self, expr): + if expr.is_id(): + return self.symbols_id.__contains__(expr) + if expr.is_mem(): + return self.symbols_mem.__contains__(expr) + return False + + def __getitem__(self, expr): + return self.read(expr) + + def __setitem__(self, expr, value): + self.write(expr, value) + + def __delitem__(self, expr): + if expr.is_id(): + del self.symbols_id[expr] + elif expr.is_mem(): + del self.symbols_mem[expr] + else: + raise TypeError("Bad source expr") + + def copy(self): + """Copy object instance""" + obj = SymbolMngr(self, addrsize=self.addrsize, expr_simp=self.expr_simp) + return obj + + def clear(self): + """Forget every variables values""" + self.symbols_id.clear() + self.symbols_mem.clear() + + def read(self, src): + """ + Return the value corresponding to Expr @src + @src: ExprId or ExprMem instance + """ + if src.is_id(): + return self.symbols_id.get(src, src) + elif src.is_mem(): + # Only byte aligned accesses are supported for now + assert src.size % 8 == 0 + return self.symbols_mem.read(src.ptr, src.size) + else: + raise TypeError("Bad source expr") + + def write(self, dst, src): + """ + Update @dst with @src expression + @dst: ExprId or ExprMem instance + @src: Expression instance + """ + assert dst.size == src.size + if dst.is_id(): + if dst == src: + if dst in self.symbols_id: + del self.symbols_id[dst] + else: + self.symbols_id[dst] = src + elif dst.is_mem(): + # Only byte aligned accesses are supported for now + assert dst.size % 8 == 0 + self.symbols_mem.write(dst.ptr, src) + else: + raise TypeError("Bad destination expr") + + def dump(self, ids=True, mems=True): + """Display memory content""" + if ids: + for variable, value in self.ids(): + print('%s = %s' % (variable, value)) + if mems: + for mem, value in self.memory(): + print('%s = %s' % (mem, value)) + + def __repr__(self): + out = [] + for variable, value in viewitems(self): + out.append('%s = %s' % (variable, value)) + return "\n".join(out) + + def iteritems(self): + """ExprId/ExprMem iteritems of the current state""" + for variable, value in self.ids(): + yield variable, value + for variable, value in self.memory(): + yield variable, value + + def items(self): + """Return variables/values of the current state""" + return list(self.iteritems()) + + def __iter__(self): + for expr, _ in self.iteritems(): + yield expr + + def ids(self): + """Iterate on variables and their values.""" + for expr, value in viewitems(self.symbols_id): + yield expr, value + + def memory(self): + """Iterate on memory variables and their values.""" + for mem, value in viewitems(self.symbols_mem): + yield mem, value + + def keys(self): + """Variables of the current state""" + return list(self) + + +def merge_ptr_read(known, ptrs): + """ + Merge common memory parts in a multiple byte memory. + @ptrs: memory bytes list + @known: ptrs' associated boolean for present/unpresent memory part in the + store + """ + assert known + out = [] + known.append(None) + ptrs.append(None) + last, value, size = known[0], ptrs[0], 8 + for index, part in enumerate(known[1:], 1): + if part == last: + size += 8 + else: + out.append((last, value, size)) + last, value, size = part, ptrs[index], 8 + return out + + +class SymbolicExecutionEngine(object): + """ + Symbolic execution engine + Allow IR code emulation in symbolic domain + + + Examples: + from miasm.ir.symbexec import SymbolicExecutionEngine + from miasm.ir.ir import AssignBlock + + lifter = Lifter_X86_32() + + init_state = { + lifter.arch.regs.EAX: lifter.arch.regs.EBX, + ExprMem(id_x+ExprInt(0x10, 32), 32): id_a, + } + + sb_exec = SymbolicExecutionEngine(lifter, init_state) + + >>> sb_exec.dump() + EAX = a + @32[x + 0x10] = a + >>> sb_exec.dump(mems=False) + EAX = a + + >>> print sb_exec.eval_expr(lifter.arch.regs.EAX + lifter.arch.regs.ECX) + EBX + ECX + + Inspecting state: + - dump + - modified + State manipulation: + - '.state' (rw) + + Evaluation (read only): + - eval_expr + - eval_assignblk + Evaluation with state update: + - eval_updt_expr + - eval_updt_assignblk + - eval_updt_irblock + + Start a symbolic execution based on provisioned '.lifter' blocks: + - run_block_at + - run_at + """ + + StateEngine = SymbolicState + + def __init__(self, lifter, state=None, + sb_expr_simp=expr_simp_explicit): + + self.expr_to_visitor = { + ExprInt: self.eval_exprint, + ExprId: self.eval_exprid, + ExprLoc: self.eval_exprloc, + ExprMem: self.eval_exprmem, + ExprSlice: self.eval_exprslice, + ExprCond: self.eval_exprcond, + ExprOp: self.eval_exprop, + ExprCompose: self.eval_exprcompose, + } + + if state is None: + state = {} + + self.symbols = SymbolMngr(addrsize=lifter.addrsize, expr_simp=sb_expr_simp) + + for dst, src in viewitems(state): + self.symbols.write(dst, src) + + self.lifter = lifter + self.expr_simp = sb_expr_simp + + @property + def ir_arch(self): + warnings.warn('DEPRECATION WARNING: use ".lifter" instead of ".ir_arch"') + return self.lifter + + def get_state(self): + """Return the current state of the SymbolicEngine""" + state = self.StateEngine(dict(self.symbols)) + return state + + def set_state(self, state): + """Restaure the @state of the engine + @state: StateEngine instance + """ + self.symbols = SymbolMngr(addrsize=self.lifter.addrsize, expr_simp=self.expr_simp) + for dst, src in viewitems(dict(state)): + self.symbols[dst] = src + + state = property(get_state, set_state) + + def eval_expr_visitor(self, expr, cache=None): + """ + [DEV]: Override to change the behavior of an Expr evaluation. + This function recursively applies 'eval_expr*' to @expr. + This function uses @cache to speedup re-evaluation of expression. + """ + if cache is None: + cache = {} + + ret = cache.get(expr, None) + if ret is not None: + return ret + + new_expr = self.expr_simp(expr) + ret = cache.get(new_expr, None) + if ret is not None: + return ret + + func = self.expr_to_visitor.get(new_expr.__class__, None) + if func is None: + raise TypeError("Unknown expr type") + + ret = func(new_expr, cache=cache) + ret = self.expr_simp(ret) + assert ret is not None + + cache[expr] = ret + cache[new_expr] = ret + return ret + + def eval_exprint(self, expr, **kwargs): + """[DEV]: Evaluate an ExprInt using the current state""" + return expr + + def eval_exprid(self, expr, **kwargs): + """[DEV]: Evaluate an ExprId using the current state""" + ret = self.symbols.read(expr) + return ret + + def eval_exprloc(self, expr, **kwargs): + """[DEV]: Evaluate an ExprLoc using the current state""" + offset = self.lifter.loc_db.get_location_offset(expr.loc_key) + if offset is not None: + ret = ExprInt(offset, expr.size) + else: + ret = expr + return ret + + def eval_exprmem(self, expr, **kwargs): + """[DEV]: Evaluate an ExprMem using the current state + This function first evaluate the memory pointer value. + Override 'mem_read' to modify the effective memory accesses + """ + ptr = self.eval_expr_visitor(expr.ptr, **kwargs) + mem = ExprMem(ptr, expr.size) + ret = self.mem_read(mem) + return ret + + def eval_exprcond(self, expr, **kwargs): + """[DEV]: Evaluate an ExprCond using the current state""" + cond = self.eval_expr_visitor(expr.cond, **kwargs) + src1 = self.eval_expr_visitor(expr.src1, **kwargs) + src2 = self.eval_expr_visitor(expr.src2, **kwargs) + ret = ExprCond(cond, src1, src2) + return ret + + def eval_exprslice(self, expr, **kwargs): + """[DEV]: Evaluate an ExprSlice using the current state""" + arg = self.eval_expr_visitor(expr.arg, **kwargs) + ret = ExprSlice(arg, expr.start, expr.stop) + return ret + + def eval_exprop(self, expr, **kwargs): + """[DEV]: Evaluate an ExprOp using the current state""" + args = [] + for oarg in expr.args: + arg = self.eval_expr_visitor(oarg, **kwargs) + args.append(arg) + ret = ExprOp(expr.op, *args) + return ret + + def eval_exprcompose(self, expr, **kwargs): + """[DEV]: Evaluate an ExprCompose using the current state""" + args = [] + for arg in expr.args: + args.append(self.eval_expr_visitor(arg, **kwargs)) + ret = ExprCompose(*args) + return ret + + def eval_expr(self, expr, eval_cache=None): + """ + Evaluate @expr + @expr: Expression instance to evaluate + @cache: None or dictionary linking variables to their values + """ + if eval_cache is None: + eval_cache = {} + ret = self.eval_expr_visitor(expr, cache=eval_cache) + assert ret is not None + return ret + + def modified(self, init_state=None, ids=True, mems=True): + """ + Return the modified variables. + @init_state: a base dictionary linking variables to their initial values + to diff. Can be None. + @ids: track ids only + @mems: track mems only + """ + if init_state is None: + init_state = {} + if ids: + for variable, value in viewitems(self.symbols.symbols_id): + if variable in init_state and init_state[variable] == value: + continue + yield variable, value + if mems: + for mem, value in self.symbols.memory(): + if mem in init_state and init_state[mem] == value: + continue + yield mem, value + + def dump(self, ids=True, mems=True): + """ + Display modififed variables + @ids: display modified ids + @mems: display modified memory + """ + + for variable, value in self.modified(None, ids, mems): + print("%-18s" % variable, "=", "%s" % value) + + def eval_assignblk(self, assignblk): + """ + Evaluate AssignBlock using the current state + + Returns a dictionary containing modified keys associated to their values + + @assignblk: AssignBlock instance + """ + pool_out = {} + eval_cache = {} + for dst, src in viewitems(assignblk): + src = self.eval_expr(src, eval_cache) + if dst.is_mem(): + ptr = self.eval_expr(dst.ptr, eval_cache) + # Test if mem lookup is known + tmp = ExprMem(ptr, dst.size) + pool_out[tmp] = src + elif dst.is_id(): + pool_out[dst] = src + else: + raise ValueError("Unknown destination type", str(dst)) + + return pool_out + + def apply_change(self, dst, src): + """ + Apply @dst = @src on the current state WITHOUT evaluating both side + @dst: Expr, destination + @src: Expr, source + """ + if dst.is_mem(): + self.mem_write(dst, src) + else: + self.symbols.write(dst, src) + + def eval_updt_assignblk(self, assignblk): + """ + Apply an AssignBlock on the current state + @assignblk: AssignBlock instance + """ + mem_dst = [] + dst_src = self.eval_assignblk(assignblk) + for dst, src in viewitems(dst_src): + self.apply_change(dst, src) + if dst.is_mem(): + mem_dst.append(dst) + return mem_dst + + def eval_updt_irblock(self, irb, step=False): + """ + Symbolic execution of the @irb on the current state + @irb: irbloc instance + @step: display intermediate steps + """ + for assignblk in irb: + if step: + print('Instr', assignblk.instr) + print('Assignblk:') + print(assignblk) + print('_' * 80) + self.eval_updt_assignblk(assignblk) + if step: + self.dump(mems=False) + self.dump(ids=False) + print('_' * 80) + dst = self.eval_expr(self.lifter.IRDst) + + return dst + + def run_block_at(self, ircfg, addr, step=False): + """ + Symbolic execution of the block at @addr + @addr: address to execute (int or ExprInt or label) + @step: display intermediate steps + """ + irblock = ircfg.get_block(addr) + if irblock is not None: + addr = self.eval_updt_irblock(irblock, step=step) + return addr + + def run_at(self, ircfg, addr, lbl_stop=None, step=False): + """ + Symbolic execution starting at @addr + @addr: address to execute (int or ExprInt or label) + @lbl_stop: LocKey to stop execution on + @step: display intermediate steps + """ + while True: + irblock = ircfg.get_block(addr) + if irblock is None: + break + if irblock.loc_key == lbl_stop: + break + addr = self.eval_updt_irblock(irblock, step=step) + return addr + + def del_mem_above_stack(self, stack_ptr): + """ + Remove all stored memory values with following properties: + * pointer based on initial stack value + * pointer below current stack pointer + """ + stack_ptr = self.eval_expr(stack_ptr) + base, stk_offset = get_expr_base_offset(stack_ptr) + memarray = self.symbols.symbols_mem.base_to_memarray.get(base, None) + if memarray: + to_del = set() + for offset in memarray: + if ((offset - stk_offset) & int(stack_ptr.mask)) >> (stack_ptr.size - 1) != 0: + to_del.add(offset) + + for offset in to_del: + del memarray[offset] + + def eval_updt_expr(self, expr): + """ + Evaluate @expr and apply side effect if needed (ie. if expr is an + assignment). Return the evaluated value + """ + + # Update value if needed + if expr.is_assign(): + ret = self.eval_expr(expr.src) + self.eval_updt_assignblk(AssignBlock([expr])) + else: + ret = self.eval_expr(expr) + + return ret + + def mem_read(self, expr): + """ + [DEV]: Override to modify the effective memory reads + + Read symbolic value at ExprMem @expr + @expr: ExprMem + """ + return self.symbols.read(expr) + + def mem_write(self, dst, src): + """ + [DEV]: Override to modify the effective memory writes + + Write symbolic value @src at ExprMem @dst + @dst: destination ExprMem + @src: source Expression + """ + self.symbols.write(dst, src) diff --git a/src/miasm/ir/symbexec_top.py b/src/miasm/ir/symbexec_top.py new file mode 100644 index 00000000..e4b9708b --- /dev/null +++ b/src/miasm/ir/symbexec_top.py @@ -0,0 +1,221 @@ +from future.utils import viewitems + +from miasm.ir.symbexec import SymbolicExecutionEngine, StateEngine +from miasm.expression.simplifications import expr_simp +from miasm.expression.expression import ExprId, ExprInt, ExprSlice,\ + ExprMem, ExprCond, ExprCompose, ExprOp + + +TOPSTR = "TOP" + +def exprid_top(expr): + """Return a TOP expression (ExprId("TOP") of size @expr.size + @expr: expression to replace with TOP + """ + return ExprId(TOPSTR, expr.size) + + +class SymbolicStateTop(StateEngine): + + def __init__(self, dct, regstop): + self._symbols = frozenset(viewitems(dct)) + self._regstop = frozenset(regstop) + + def __hash__(self): + return hash((self.__class__, self._symbols, self._regstop)) + + def __str__(self): + out = [] + for dst, src in sorted(self._symbols): + out.append("%s = %s" % (dst, src)) + for dst in self._regstop: + out.append('TOP %s' %dst) + return "\n".join(out) + + def __eq__(self, other): + if self is other: + return True + if self.__class__ != other.__class__: + return False + return (self.symbols == other.symbols and + self.regstop == other.regstop) + + def __ne__(self, other): + return not self.__eq__(other) + + def __iter__(self): + for dst, src in self._symbols: + yield dst, src + + def merge(self, other): + """Merge two symbolic states + Only equal expressions are kept in both states + @other: second symbolic state + """ + symb_a = self.symbols + symb_b = other.symbols + intersection = set(symb_a).intersection(symb_b) + diff = set(symb_a).union(symb_b).difference(intersection) + symbols = {} + regstop = set() + for dst in diff: + if dst.is_id(): + regstop.add(dst) + for dst in intersection: + if symb_a[dst] == symb_b[dst]: + symbols[dst] = symb_a[dst] + else: + regstop.add(dst) + return self.__class__(symbols, regstop) + + @property + def symbols(self): + """Return the dictionary of known symbols""" + return dict(self._symbols) + + @property + def regstop(self): + """Return the set of expression with TOP values""" + return self._regstop + +class SymbExecTopNoMem(SymbolicExecutionEngine): + """ + Symbolic execution, include TOP value. + ExprMem are not propagated. + Any computation involving a TOP will generate TOP. + """ + + StateEngine = SymbolicStateTop + + def __init__(self, lifter, state, regstop, + sb_expr_simp=expr_simp): + known_symbols = dict(state) + super(SymbExecTopNoMem, self).__init__(lifter, known_symbols, + sb_expr_simp) + self.regstop = set(regstop) + + def get_state(self): + """Return the current state of the SymbolicEngine""" + return self.StateEngine(self.symbols, self.regstop) + + def eval_expr(self, expr, eval_cache=None): + if expr in self.regstop: + return exprid_top(expr) + if eval_cache is None: + eval_cache = {} + ret = self.apply_expr_on_state_visit_cache(expr, self.symbols, eval_cache) + return ret + + def manage_mem(self, expr, state, cache, level): + ptr = self.apply_expr_on_state_visit_cache(expr.arg, state, cache, level+1) + ret = ExprMem(ptr, expr.size) + ret = self.get_mem_state(ret) + if ret.is_mem() and not ret.arg.is_int() and ret.arg == ptr: + ret = exprid_top(expr) + assert expr.size == ret.size + return ret + + + def eval_exprid(self, expr, **kwargs): + """[DEV]: Evaluate an ExprId using the current state""" + if expr in self.regstop: + ret = exprid_top(expr) + else: + ret = self.symbols.read(expr) + return ret + + def eval_exprloc(self, expr, **kwargs): + offset = self.lifter.loc_db.get_location_offset(expr.loc_key) + if offset is not None: + ret = ExprInt(offset, expr.size) + else: + ret = expr + return ret + + def eval_exprcond(self, expr, **kwargs): + """[DEV]: Evaluate an ExprCond using the current state""" + cond = self.eval_expr_visitor(expr.cond, **kwargs) + src1 = self.eval_expr_visitor(expr.src1, **kwargs) + src2 = self.eval_expr_visitor(expr.src2, **kwargs) + if cond.is_id(TOPSTR) or src1.is_id(TOPSTR) or src2.is_id(TOPSTR): + ret = exprid_top(expr) + else: + ret = ExprCond(cond, src1, src2) + return ret + + def eval_exprslice(self, expr, **kwargs): + """[DEV]: Evaluate an ExprSlice using the current state""" + arg = self.eval_expr_visitor(expr.arg, **kwargs) + if arg.is_id(TOPSTR): + ret = exprid_top(expr) + else: + ret = ExprSlice(arg, expr.start, expr.stop) + return ret + + def eval_exprop(self, expr, **kwargs): + """[DEV]: Evaluate an ExprOp using the current state""" + args = [] + for oarg in expr.args: + arg = self.eval_expr_visitor(oarg, **kwargs) + if arg.is_id(TOPSTR): + return exprid_top(expr) + args.append(arg) + ret = ExprOp(expr.op, *args) + return ret + + def eval_exprcompose(self, expr, **kwargs): + """[DEV]: Evaluate an ExprCompose using the current state""" + args = [] + for arg in expr.args: + arg = self.eval_expr_visitor(arg, **kwargs) + if arg.is_id(TOPSTR): + return exprid_top(expr) + args.append(arg) + ret = ExprCompose(*args) + return ret + + def apply_change(self, dst, src): + eval_cache = {} + if dst.is_mem(): + # If Write to TOP, forget all memory information + ret = self.eval_expr(dst.arg, eval_cache) + if ret.is_id(TOPSTR): + to_del = set() + for dst_tmp in self.symbols: + if dst_tmp.is_mem(): + to_del.add(dst_tmp) + for dst_to_del in to_del: + del self.symbols[dst_to_del] + return + src_o = self.expr_simp(src) + + # Force update. Ex: + # EBX += 1 (state: EBX = EBX+1) + # EBX -= 1 (state: EBX = EBX, must be updated) + if dst in self.regstop: + self.regstop.discard(dst) + self.symbols[dst] = src_o + + if dst == src_o: + # Avoid useless X = X information + del self.symbols[dst] + + if src_o.is_id(TOPSTR): + if dst in self.symbols: + del self.symbols[dst] + self.regstop.add(dst) + +class SymbExecTop(SymbExecTopNoMem): + """ + Symbolic execution, include TOP value. + ExprMem are propagated. + Any computation involving a TOP will generate TOP. + WARNING: avoid memory aliases here! + """ + + def manage_mem(self, expr, state, cache, level): + ptr = self.apply_expr_on_state_visit_cache(expr.arg, state, cache, level+1) + ret = ExprMem(ptr, expr.size) + ret = self.get_mem_state(ret) + assert expr.size == ret.size + return ret diff --git a/src/miasm/ir/symbexec_types.py b/src/miasm/ir/symbexec_types.py new file mode 100644 index 00000000..e720f6fd --- /dev/null +++ b/src/miasm/ir/symbexec_types.py @@ -0,0 +1,131 @@ +from __future__ import print_function + +from future.utils import viewitems + +from miasm.ir.symbexec import SymbolicExecutionEngine, StateEngine +from miasm.expression.simplifications import expr_simp +from miasm.expression.expression import ExprId, ExprMem + + +class SymbolicStateCTypes(StateEngine): + """Store C types of symbols""" + + def __init__(self, symbols): + tmp = {} + for expr, types in viewitems(symbols): + tmp[expr] = frozenset(types) + self._symbols = frozenset(viewitems(tmp)) + + def __hash__(self): + return hash((self.__class__, self._symbols)) + + def __str__(self): + out = [] + for dst, src in sorted(self._symbols): + out.append("%s = %s" % (dst, src)) + return "\n".join(out) + + def __eq__(self, other): + if self is other: + return True + if self.__class__ != other.__class__: + return False + return self.symbols == other.symbols + + def __ne__(self, other): + return not self.__eq__(other) + + def __iter__(self): + for dst, src in self._symbols: + yield dst, src + + def merge(self, other): + """Merge two symbolic states + The resulting types are the union of types of both states. + @other: second symbolic state + """ + symb_a = self.symbols + symb_b = other.symbols + symbols = {} + for expr in set(symb_a).union(set(symb_b)): + ctypes = symb_a.get(expr, set()).union(symb_b.get(expr, set())) + if ctypes: + symbols[expr] = ctypes + return self.__class__(symbols) + + @property + def symbols(self): + """Return the dictionary of known symbols'types""" + return dict(self._symbols) + + +class SymbExecCType(SymbolicExecutionEngine): + """Engine of C types propagation + WARNING: avoid memory aliases here! + """ + + StateEngine = SymbolicStateCTypes + OBJC_INTERNAL = "___OBJC___" + + def __init__(self, lifter, + symbols, + chandler, + sb_expr_simp=expr_simp): + self.chandler = chandler + + super(SymbExecCType, self).__init__(lifter, + {}, + sb_expr_simp) + self.symbols = dict(symbols) + + def get_state(self): + """Return the current state of the SymbolicEngine""" + return self.StateEngine(self.symbols) + + def eval_assignblk(self, assignblk): + """ + Evaluate AssignBlock on the current state + @assignblk: AssignBlock instance + """ + pool_out = {} + for dst, src in viewitems(assignblk): + objcs = self.chandler.expr_to_types(src, self.symbols) + if isinstance(dst, ExprMem): + continue + elif isinstance(dst, ExprId): + pool_out[dst] = frozenset(objcs) + else: + raise ValueError("Unsupported assignment", str(dst)) + return pool_out + + def eval_expr(self, expr, eval_cache=None): + return frozenset(self.chandler.expr_to_types(expr, self.symbols)) + + def apply_change(self, dst, src): + if src is None: + if dst in self.symbols: + del self.symbols[dst] + else: + self.symbols[dst] = src + + def del_mem_above_stack(self, stack_ptr): + """No stack deletion""" + return + + def dump_id(self): + """ + Dump modififed registers symbols only + """ + for expr, expr_types in sorted(viewitems(self.symbols)): + if not expr.is_mem(): + print(expr) + for expr_type in expr_types: + print('\t', expr_type) + + def dump_mem(self): + """ + Dump modififed memory symbols + """ + for expr, value in sorted(viewitems(self.symbols)): + if expr.is_mem(): + print(expr, value) diff --git a/src/miasm/ir/translators/C.py b/src/miasm/ir/translators/C.py new file mode 100644 index 00000000..55a229ff --- /dev/null +++ b/src/miasm/ir/translators/C.py @@ -0,0 +1,536 @@ +from miasm.ir.translators.translator import Translator +from miasm.core.utils import size2mask +from miasm.expression.expression import ExprInt, ExprCond, ExprCompose, \ + TOK_EQUAL, \ + TOK_INF_SIGNED, TOK_INF_UNSIGNED, \ + TOK_INF_EQUAL_SIGNED, TOK_INF_EQUAL_UNSIGNED, \ + is_associative + +def int_size_to_bn(value, size): + if size < 32: + int_str = "%.8x" % value + size_nibble = 8 + else: + # size must be multiple of 4 + size = ((size + 31) // 32) * 32 + size_nibble = size // 4 + fmt_str = "%%.%dx" % size_nibble + int_str = fmt_str % value + assert len(int_str) == size_nibble + return int_str, size_nibble + + +TOK_CMP_TO_NATIVE_C = { + TOK_EQUAL: "==", + TOK_INF_SIGNED: "<", + TOK_INF_UNSIGNED: "<", + TOK_INF_EQUAL_SIGNED: "<=", + TOK_INF_EQUAL_UNSIGNED: "<=", +} + +TOK_CMP_TO_BIGNUM_C = { + TOK_EQUAL: "equal", + TOK_INF_SIGNED: "inf_signed", + TOK_INF_UNSIGNED: "inf_unsigned", + TOK_INF_EQUAL_SIGNED: "inf_equal_signed", + TOK_INF_EQUAL_UNSIGNED: "inf_equal_unsigned", +} + + +def get_c_common_next_pow2(size): + # For uncommon expression size, use at least uint8 + size = max(size, 8) + next_power = 1 + while next_power < size: + next_power <<= 1 + size = next_power + return size + + +class TranslatorC(Translator): + "Translate a Miasm expression to an equivalent C code" + + # Implemented language + __LANG__ = "C" + + # Operations translation + dct_shift = {'a>>': "right_arith", + '>>': "right_logic", + '<<': "left_logic", + } + dct_rot = {'<<<': 'rot_left', + '>>>': 'rot_right', + } + + NATIVE_INT_MAX_SIZE = 64 + + def __init__(self, loc_db=None, **kwargs): + """Instance a C translator + @loc_db: LocationDB instance + """ + super(TranslatorC, self).__init__(**kwargs) + # symbol pool + self.loc_db = loc_db + + def _size2mask(self, size): + """Return a C string corresponding to the size2mask operation, with support for + @size <= 64""" + assert size <= 64 + mask = size2mask(size) + return "0x%x" % mask + + def from_ExprId(self, expr): + return str(expr) + + def from_ExprInt(self, expr): + if expr.size <= self.NATIVE_INT_MAX_SIZE: + assert expr.size <= 64 + out = "0x%x" % int(expr) + if expr.size == 64: + out += "ULL" + return out + value, int_size = int_size_to_bn(int(expr), expr.size) + return 'bignum_from_string("%s", %d)' % (value, int_size) + + def from_ExprLoc(self, expr): + loc_key = expr.loc_key + if self.loc_db is None: + return str(loc_key) + offset = self.loc_db.get_location_offset(loc_key) + if offset is None: + return str(loc_key) + + if expr.size <= self.NATIVE_INT_MAX_SIZE: + return "0x%x" % offset + + value, int_size = int_size_to_bn(offset, 64) + return 'bignum_from_string("%s", %d)' % (value, int_size) + + def from_ExprAssign(self, expr): + new_dst = self.from_expr(expr.dst) + new_src = self.from_expr(expr.src) + return "%s = %s" % (new_dst, new_src) + + def from_ExprCond(self, expr): + cond = self.from_expr(expr.cond) + src1 = self.from_expr(expr.src1) + src2 = self.from_expr(expr.src2) + if not expr.cond.size <= self.NATIVE_INT_MAX_SIZE: + cond = "(!bignum_is_zero(%s))" % cond + out = "(%s?%s:%s)" % (cond, src1, src2) + return out + + def from_ExprMem(self, expr): + ptr = expr.ptr + if ptr.size <= self.NATIVE_INT_MAX_SIZE: + new_ptr = self.from_expr(ptr) + if expr.size <= self.NATIVE_INT_MAX_SIZE: + # Native ptr, Native Mem + return "MEM_LOOKUP_%.2d(jitcpu, %s)" % (expr.size, new_ptr) + else: + # Native ptr, BN mem + return "MEM_LOOKUP_INT_BN(jitcpu, %d, %s)" % (expr.size, new_ptr) + # BN ptr + new_ptr = self.from_expr(ptr) + + if expr.size <= self.NATIVE_INT_MAX_SIZE: + # BN ptr, Native Mem + return "MEM_LOOKUP_BN_INT(jitcpu, %d, %s)" % (expr.size, new_ptr) + else: + # BN ptr, BN mem + return "MEM_LOOKUP_BN_BN(jitcpu, %d, %s)" % (expr.size, new_ptr) + + def from_ExprOp(self, expr): + if len(expr.args) == 1: + if expr.op == 'parity': + arg = expr.args[0] + out = self.from_expr(arg) + if arg.size <= self.NATIVE_INT_MAX_SIZE: + out = "(%s&%s)" % (out, self._size2mask(arg.size)) + else: + out = 'bignum_mask(%s, 8)' % (out, 8) + out = 'bignum_to_uint64(%s)' % out + out = 'parity(%s)' % out + return out + + elif expr.op.startswith("zeroExt_"): + arg = expr.args[0] + if expr.size == arg.size: + return arg + return self.from_expr(ExprCompose(arg, ExprInt(0, expr.size - arg.size))) + + elif expr.op.startswith("signExt_"): + arg = expr.args[0] + if expr.size == arg.size: + return arg + add_size = expr.size - arg.size + new_expr = ExprCompose( + arg, + ExprCond( + arg.msb(), + ExprInt(size2mask(add_size), add_size), + ExprInt(0, add_size) + ) + ) + return self.from_expr(new_expr) + + + elif expr.op in ['cntleadzeros', 'cnttrailzeros']: + arg = expr.args[0] + out = self.from_expr(arg) + if arg.size <= self.NATIVE_INT_MAX_SIZE: + out = "%s(0x%x, %s)" % (expr.op, expr.args[0].size, out) + else: + out = "bignum_%s(%s, %d)" % (expr.op, out, arg.size) + return out + + elif expr.op == '!': + arg = expr.args[0] + out = self.from_expr(arg) + if expr.size <= self.NATIVE_INT_MAX_SIZE: + out = "(~ %s)&%s" % (out, self._size2mask(arg.size)) + else: + out = "bignum_not(%s)" % out + out = "bignum_mask(%s, expr.size)" % out + return out + + elif expr.op in [ + "ftan", "frndint", "f2xm1", "fsin", "fsqrt", "fabs", "fcos", + "fchs", + ]: + return "fpu_%s%d(%s)" % ( + expr.op, + expr.size, + self.from_expr(expr.args[0]), + ) + elif (expr.op.startswith("access_") or + expr.op.startswith("load_") or + expr.op.startswith("fxam_c")): + arg = expr.args[0] + out = self.from_expr(arg) + out = "%s(%s)" % (expr.op, out) + return out + + elif expr.op == "-": + arg = expr.args[0] + out = self.from_expr(arg) + if arg.size <= self.NATIVE_INT_MAX_SIZE: + out = "(%s(%s))" % (expr.op, out) + out = "(%s&%s)" % (out, self._size2mask(arg.size)) + else: + out = "bignum_sub(bignum_from_uint64(0), %s)" % out + out = "bignum_mask(%s, %d)"% (out, expr.size) + return out + + elif expr.op.startswith("fpround_"): + return "%s_fp%d(%s)" % ( + expr.op, + expr.size, + self.from_expr(expr.args[0]), + ) + elif expr.op == "sint_to_fp": + size = expr.size + arg = expr.args[0] + if size not in [32, 64]: + raise RuntimeError( + "Unsupported size for sint_to_fp: %r" % size + ) + return "%s_%d(%s)" % (expr.op, size, self.from_expr(arg)) + elif expr.op.startswith("fp_to_sint"): + dest_size = expr.size + arg_size = expr.args[0].size + if (arg_size, dest_size) in [ + (32, 32), (64, 64), (64, 32), + ]: + func = "fp%d_to_sint%d" % (arg_size, dest_size) + else: + raise RuntimeError( + "Unsupported size for fp_to_sint: %r to %r" % ( + arg_size, + dest_size + )) + return "%s(%s)" % (func, self.from_expr(expr.args[0])) + elif expr.op.startswith("fpconvert_fp"): + dest_size = expr.size + arg_size = expr.args[0].size + if (arg_size, dest_size) in [ + (32, 64), (64, 32) + ]: + func = "fp%d_to_fp%d" % (arg_size, dest_size) + else: + raise RuntimeError( + "Unsupported size for fpconvert: %r to %r" % (arg_size, + dest_size) + ) + return "%s(%s)" % (func, self.from_expr(expr.args[0])) + else: + raise NotImplementedError('Unknown op: %r' % expr.op) + + elif len(expr.args) == 2: + if expr.op in self.dct_shift: + arg0 = self.from_expr(expr.args[0]) + arg1 = self.from_expr(expr.args[1]) + if expr.size <= self.NATIVE_INT_MAX_SIZE: + out = 'SHIFT_%s(%d, %s, %s)' % ( + self.dct_shift[expr.op].upper(), + expr.args[0].size, + arg0, + arg1 + ) + else: + op = { + "<<": "lshift", + ">>": "rshift", + "a>>": "a_rshift" + } + out = "bignum_%s(%s, bignum_to_uint64(%s))" % ( + op[expr.op], arg0, arg1 + ) + out = "bignum_mask(%s, %d)"% (out, expr.size) + return out + + elif is_associative(expr): + args = [self.from_expr(arg) + for arg in expr.args] + if expr.size <= self.NATIVE_INT_MAX_SIZE: + out = (" %s " % expr.op).join(args) + out = "((%s)&%s)" % (out, self._size2mask(expr.size)) + else: + op_to_bn_func = { + "+": "add", + "*": "mul", + "|": "or", + "^": "xor", + "&": "and", + } + args = list(expr.args) + out = self.from_expr(args.pop()) + while args: + out = 'bignum_mask(bignum_%s(%s, %s), %d)' % ( + op_to_bn_func[expr.op], + out, + self.from_expr(args.pop()), + expr.size + ) + return out + + elif expr.op in ['-']: + return '(((%s&%s) %s (%s&%s))&%s)' % ( + self.from_expr(expr.args[0]), + self._size2mask(expr.args[0].size), + str(expr.op), + self.from_expr(expr.args[1]), + self._size2mask(expr.args[1].size), + self._size2mask(expr.args[0].size) + ) + elif expr.op in self.dct_rot: + arg0 = self.from_expr(expr.args[0]) + arg1 = self.from_expr(expr.args[1]) + if expr.size <= self.NATIVE_INT_MAX_SIZE: + out = '(%s(%s, %s, %s) &%s)' % ( + self.dct_rot[expr.op], + expr.args[0].size, + arg0, + arg1, + self._size2mask(expr.args[0].size), + ) + else: + op = { + ">>>": "ror", + "<<<": "rol" + } + out = "bignum_%s(%s, %d, bignum_to_uint64(%s))" % ( + op[expr.op], arg0, expr.size, arg1 + ) + out = "bignum_mask(%s, %d)"% (out, expr.size) + return out + + elif expr.op == 'x86_cpuid': + return "%s(%s, %s)" % (expr.op, + self.from_expr(expr.args[0]), + self.from_expr(expr.args[1])) + elif expr.op.startswith("fcom"): + arg0 = self.from_expr(expr.args[0]) + arg1 = self.from_expr(expr.args[1]) + if not expr.args[0].size <= self.NATIVE_INT_MAX_SIZE: + raise ValueError("Bad semantic: fpu do operations do not support such size") + out = "fpu_%s(%s, %s)" % (expr.op, arg0, arg1) + return out + + elif expr.op in ["fadd", "fsub", "fdiv", 'fmul', "fscale", + "fprem", "fyl2x", "fpatan"]: + arg0 = self.from_expr(expr.args[0]) + arg1 = self.from_expr(expr.args[1]) + if not expr.args[0].size <= self.NATIVE_INT_MAX_SIZE: + raise ValueError("Bad semantic: fpu do operations do not support such size") + out = "fpu_%s%d(%s, %s)" % (expr.op, expr.size, arg0, arg1) + return out + + elif expr.op == "segm": + return "segm2addr(jitcpu, %s, %s)" % ( + self.from_expr(expr.args[0]), + self.from_expr(expr.args[1]) + ) + + elif expr.op in ['udiv', 'umod']: + arg0 = self.from_expr(expr.args[0]) + arg1 = self.from_expr(expr.args[1]) + + if expr.size <= self.NATIVE_INT_MAX_SIZE: + out = '%s%d(%s, %s)' % ( + expr.op, + expr.args[0].size, + arg0, + arg1 + ) + else: + out = "bignum_%s(%s, %s)" % ( + expr.op, + arg0, + arg1 + ) + out = "bignum_mask(%s, %d)"% (out, expr.size) + return out + + + + elif expr.op in ['sdiv', 'smod']: + arg0 = self.from_expr(expr.args[0]) + arg1 = self.from_expr(expr.args[1]) + + if expr.size <= self.NATIVE_INT_MAX_SIZE: + out = '%s%d(%s, %s)' % ( + expr.op, + expr.args[0].size, + arg0, + arg1 + ) + else: + out = "bignum_%s(%s, %s, %d)" % ( + expr.op, + arg0, + arg1, + expr.size + ) + out = "bignum_mask(%s, %d)"% (out, expr.size) + return out + + elif expr.op in ["bcdadd", "bcdadd_cf"]: + return "%s_%d(%s, %s)" % ( + expr.op, expr.args[0].size, + self.from_expr(expr.args[0]), + self.from_expr(expr.args[1]) + ) + + + elif expr.op in [ + TOK_EQUAL, + TOK_INF_SIGNED, + TOK_INF_UNSIGNED, + TOK_INF_EQUAL_SIGNED, + TOK_INF_EQUAL_UNSIGNED, + ]: + arg0, arg1 = expr.args + if expr.size <= self.NATIVE_INT_MAX_SIZE: + size = get_c_common_next_pow2(arg0.size) + op = TOK_CMP_TO_NATIVE_C[expr.op] + if expr.op in [TOK_INF_SIGNED, TOK_INF_EQUAL_SIGNED]: + arg0 = arg0.signExtend(size) + arg1 = arg1.signExtend(size) + arg0_C = self.from_expr(arg0) + arg1_C = self.from_expr(arg1) + cast = "(int%d_t)" % size + else: + arg0 = arg0.signExtend(size) + arg1 = arg1.signExtend(size) + arg0_C = self.from_expr(arg0) + arg1_C = self.from_expr(arg1) + cast = "(uint%d_t)" % size + out = '((%s%s %s %s%s)?1:0)' % ( + cast, + arg0_C, + op, + cast, + arg1_C + ) + else: + op = TOK_CMP_TO_BIGNUM_C[expr.op] + out = "bignum_is_%s(%s, %s)" % ( + op, + arg0, + arg1 + ) + out = "bignum_mask(%s, %d)"% (out, expr.size) + return out + + + else: + raise NotImplementedError('Unknown op: %r' % expr.op) + + elif len(expr.args) >= 3 and is_associative(expr): # ????? + oper = ['(%s&%s)' % ( + self.from_expr(arg), + self._size2mask(arg.size), + ) + for arg in expr.args] + oper = str(expr.op).join(oper) + return "((%s)&%s)" % ( + oper, + self._size2mask(expr.args[0].size) + ) + else: + raise NotImplementedError('Unknown op: %s' % expr.op) + + def from_ExprSlice(self, expr): + out = self.from_expr(expr.arg) + if expr.arg.size <= self.NATIVE_INT_MAX_SIZE: + # XXX check mask for 64 bit & 32 bit compat + out = "((%s>>%d) &%s)" % ( + out, expr.start, + self._size2mask(expr.stop - expr.start) + ) + else: + out = "bignum_rshift(%s, %d)" % (out, expr.start) + out = "bignum_mask(%s, %d)" % (out, expr.stop - expr.start) + + if expr.size <= self.NATIVE_INT_MAX_SIZE: + # Convert bignum to int + out = "bignum_to_uint64(%s)" % out + return out + + def from_ExprCompose(self, expr): + if expr.size <= self.NATIVE_INT_MAX_SIZE: + + out = [] + size = get_c_common_next_pow2(expr.size) + dst_cast = "uint%d_t" % size + for index, arg in expr.iter_args(): + out.append("(((%s)(%s & %s)) << %d)" % ( + dst_cast, + self.from_expr(arg), + self._size2mask(arg.size), + index) + ) + out = ' | '.join(out) + return '(' + out + ')' + else: + # Convert all parts to bignum + args = [] + for index, arg in expr.iter_args(): + arg_str = self.from_expr(arg) + if arg.size <= self.NATIVE_INT_MAX_SIZE: + arg_str = '((%s) & %s)' % (arg_str, self._size2mask(arg.size)) + arg_str = 'bignum_from_uint64(%s)' % arg_str + else: + arg_str = 'bignum_mask(%s, %d)' % (arg_str, arg.size) + arg_str = 'bignum_lshift(%s, %d)' % (arg_str, index) + args.append(arg_str) + out = args.pop() + while args: + arg = args.pop() + out = "bignum_or(%s, %s)" % (out, arg) + return out + + +# Register the class +Translator.register(TranslatorC) diff --git a/src/miasm/ir/translators/__init__.py b/src/miasm/ir/translators/__init__.py new file mode 100644 index 00000000..45e19803 --- /dev/null +++ b/src/miasm/ir/translators/__init__.py @@ -0,0 +1,13 @@ +"""IR Translators""" +from miasm.ir.translators.translator import Translator +import miasm.ir.translators.C +import miasm.ir.translators.python +import miasm.ir.translators.miasm_ir +import miasm.ir.translators.smt2 +try: + import miasm.ir.translators.z3_ir +except ImportError: + # Nothing to do, z3 not available + pass + +__all__ = ["Translator"] diff --git a/src/miasm/ir/translators/miasm_ir.py b/src/miasm/ir/translators/miasm_ir.py new file mode 100644 index 00000000..a460d446 --- /dev/null +++ b/src/miasm/ir/translators/miasm_ir.py @@ -0,0 +1,45 @@ +from builtins import map +from miasm.ir.translators.translator import Translator + + +class TranslatorMiasm(Translator): + "Translate a Miasm expression to its Python building form" + + __LANG__ = "Miasm" + + def from_ExprId(self, expr): + return "ExprId(%s, size=%d)" % (repr(expr.name), expr.size) + + def from_ExprInt(self, expr): + return "ExprInt(0x%x, %d)" % (int(expr), expr.size) + + def from_ExprCond(self, expr): + return "ExprCond(%s, %s, %s)" % (self.from_expr(expr.cond), + self.from_expr(expr.src1), + self.from_expr(expr.src2)) + + def from_ExprSlice(self, expr): + return "ExprSlice(%s, %d, %d)" % (self.from_expr(expr.arg), + expr.start, + expr.stop) + + def from_ExprOp(self, expr): + return "ExprOp(%s, %s)" % ( + repr(expr.op), + ", ".join(map(self.from_expr, expr.args)) + ) + + def from_ExprCompose(self, expr): + args = ["%s" % self.from_expr(arg) for arg in expr.args] + return "ExprCompose(%s)" % ", ".join(args) + + def from_ExprAssign(self, expr): + return "ExprAssign(%s, %s)" % (self.from_expr(expr.dst), + self.from_expr(expr.src)) + + def from_ExprMem(self, expr): + return "ExprMem(%s, size=%d)" % (self.from_expr(expr.ptr), expr.size) + + +# Register the class +Translator.register(TranslatorMiasm) diff --git a/src/miasm/ir/translators/python.py b/src/miasm/ir/translators/python.py new file mode 100644 index 00000000..4e5cc5e1 --- /dev/null +++ b/src/miasm/ir/translators/python.py @@ -0,0 +1,103 @@ +from builtins import map +from miasm.expression.expression import ExprInt +from miasm.ir.translators.translator import Translator +from miasm.expression.expression import ExprCond, ExprInt + + +class TranslatorPython(Translator): + """Translate a Miasm expression to an equivalent Python code + + Memory is abstracted using the unimplemented function: + int memory(int address, int size) + """ + + # Implemented language + __LANG__ = "Python" + # Operations translation + op_no_translate = ["+", "-", "/", "%", ">>", "<<", "&", "^", "|", "*"] + + def from_ExprInt(self, expr): + return str(expr) + + def from_ExprId(self, expr): + return str(expr) + + def from_ExprLoc(self, expr): + return str(expr) + + def from_ExprMem(self, expr): + return "memory(%s, 0x%x)" % ( + self.from_expr(expr.ptr), + expr.size // 8 + ) + + def from_ExprSlice(self, expr): + out = self.from_expr(expr.arg) + if expr.start != 0: + out = "(%s >> %d)" % (out, expr.start) + return "(%s & 0x%x)" % (out, (1 << (expr.stop - expr.start)) - 1) + + def from_ExprCompose(self, expr): + out = [] + for index, arg in expr.iter_args(): + out.append( + "((%s & 0x%x) << %d)" % ( + self.from_expr(arg), + (1 << arg.size) - 1, + index + ) + ) + return "(%s)" % ' | '.join(out) + + def from_ExprCond(self, expr): + return "(%s if (%s) else %s)" % ( + self.from_expr(expr.src1), + self.from_expr(expr.cond), + self.from_expr(expr.src2) + ) + + def from_ExprOp(self, expr): + if expr.op in self.op_no_translate: + args = list(map(self.from_expr, expr.args)) + if len(expr.args) == 1: + return "((%s %s) & 0x%x)" % ( + expr.op, + args[0], + (1 << expr.size) - 1 + ) + else: + return "((%s) & 0x%x)" % ( + (" %s " % expr.op).join(args), + (1 << expr.size) - 1 + ) + elif expr.op == "parity": + return "(%s & 0x1)" % self.from_expr(expr.args[0]) + elif expr.op == "==": + return self.from_expr( + ExprCond(expr.args[0] - expr.args[1], ExprInt(0, 1), ExprInt(1, 1)) + ) + + elif expr.op in ["<<<", ">>>"]: + amount_raw = expr.args[1] + amount = expr.args[1] % ExprInt(amount_raw.size, expr.size) + amount_inv = ExprInt(expr.size, expr.size) - amount + if expr.op == "<<<": + amount, amount_inv = amount_inv, amount + part1 = "(%s >> %s)"% (self.from_expr(expr.args[0]), + self.from_expr(amount)) + part2 = "(%s << %s)"% (self.from_expr(expr.args[0]), + self.from_expr(amount_inv)) + + return "((%s | %s) &0x%x)" % (part1, part2, int(expr.mask)) + + raise NotImplementedError("Unknown operator: %s" % expr.op) + + def from_ExprAssign(self, expr): + return "%s = %s" % ( + self.from_expr(expr.dst), + self.from_expr(expr.src) + ) + + +# Register the class +Translator.register(TranslatorPython) diff --git a/src/miasm/ir/translators/smt2.py b/src/miasm/ir/translators/smt2.py new file mode 100644 index 00000000..d3366b8b --- /dev/null +++ b/src/miasm/ir/translators/smt2.py @@ -0,0 +1,330 @@ +from builtins import map +from builtins import range +import logging + +from miasm.ir.translators.translator import Translator +from miasm.expression.smt2_helper import * +from miasm.expression.expression import ExprCond, ExprInt + + +log = logging.getLogger("translator_smt2") +console_handler = logging.StreamHandler() +console_handler.setFormatter(logging.Formatter("[%(levelname)-8s]: %(message)s")) +log.addHandler(console_handler) +log.setLevel(logging.WARNING) + +class SMT2Mem(object): + """ + Memory abstraction for TranslatorSMT2. Memory elements are only accessed, + never written. To give a concrete value for a given memory cell in a solver, + add "mem32.get(address, size) == <value>" constraints to your equation. + The endianness of memory accesses is handled accordingly to the "endianness" + attribute. + Note: Will have one memory space for each addressing size used. + For example, if memory is accessed via 32 bits values and 16 bits values, + these access will not occur in the same address space. + + Adapted from Z3Mem + """ + + def __init__(self, endianness="<", name="mem"): + """Initializes an SMT2Mem object with a given @name and @endianness. + @endianness: Endianness of memory representation. '<' for little endian, + '>' for big endian. + @name: name of memory Arrays generated. They will be named + name+str(address size) (for example mem32, mem16...). + """ + if endianness not in ['<', '>']: + raise ValueError("Endianness should be '>' (big) or '<' (little)") + self.endianness = endianness + self.mems = {} # Address size -> SMT2 memory array + self.name = name + # initialise address size + self.addr_size = 0 + + def get_mem_array(self, size): + """Returns an SMT Array used internally to represent memory for addresses + of size @size. + @size: integer, size in bit of addresses in the memory to get. + Return an string with the name of the SMT array.. + """ + try: + mem = self.mems[size] + except KeyError: + # Lazy instantiation + self.mems[size] = self.name + str(size) + mem = self.mems[size] + return mem + + def __getitem__(self, addr): + """One byte memory access. Different address sizes with the same value + will result in different memory accesses. + @addr: an SMT2 expression, the address to read. + Return an SMT2 expression of size 8 bits representing a memory access. + """ + size = self.addr_size + mem = self.get_mem_array(size) + return array_select(mem, addr) + + def get(self, addr, size, addr_size): + """ Memory access at address @addr of size @size with + address size @addr_size. + @addr: an SMT2 expression, the address to read. + @size: int, size of the read in bits. + @addr_size: int, size of the address + Return a SMT2 expression representing a memory access. + """ + # set address size per read access + self.addr_size = addr_size + + original_size = size + if original_size % 8 != 0: + # Size not aligned on 8bits -> read more than size and extract after + size = ((original_size // 8) + 1) * 8 + res = self[addr] + if self.is_little_endian(): + for i in range(1, size // 8): + index = bvadd(addr, bit_vec_val(i, addr_size)) + res = bv_concat(self[index], res) + else: + for i in range(1, size // 8): + res = bv_concat(res, self[index]) + if size == original_size: + return res + else: + # Size not aligned, extract right sized result + return bv_extract(original_size-1, 0, res) + + def is_little_endian(self): + """True if this memory is little endian.""" + return self.endianness == "<" + + def is_big_endian(self): + """True if this memory is big endian.""" + return not self.is_little_endian() + + +class TranslatorSMT2(Translator): + """Translate a Miasm expression into an equivalent SMT2 + expression. Memory is abstracted via SMT2Mem. + The result of from_expr will be an SMT2 expression. + + If you want to interact with the memory abstraction after the translation, + you can instantiate your own SMT2Mem that will be equivalent to the one + used by TranslatorSMT2. + + TranslatorSMT2 provides the creation of a valid SMT2 file. For this, + it keeps track of the translated bit vectors. + + Adapted from TranslatorZ3 + """ + + # Implemented language + __LANG__ = "smt2" + + def __init__(self, endianness="<", loc_db=None, **kwargs): + """Instance a SMT2 translator + @endianness: (optional) memory endianness + """ + super(TranslatorSMT2, self).__init__(**kwargs) + # memory abstraction + self._mem = SMT2Mem(endianness) + # map of translated bit vectors + self._bitvectors = dict() + # symbol pool + self.loc_db = loc_db + + def from_ExprInt(self, expr): + return bit_vec_val(int(expr), expr.size) + + def from_ExprId(self, expr): + if str(expr) not in self._bitvectors: + self._bitvectors[str(expr)] = expr.size + return str(expr) + + def from_ExprLoc(self, expr): + loc_key = expr.loc_key + if self.loc_db is None or self.loc_db.get_location_offset(loc_key) is None: + if str(loc_key) not in self._bitvectors: + self._bitvectors[str(loc_key)] = expr.size + return str(loc_key) + + offset = self.loc_db.get_location_offset(loc_key) + return bit_vec_val(str(offset), expr.size) + + def from_ExprMem(self, expr): + addr = self.from_expr(expr.ptr) + # size to read from memory + size = expr.size + # size of memory address + addr_size = expr.ptr.size + return self._mem.get(addr, size, addr_size) + + def from_ExprSlice(self, expr): + res = self.from_expr(expr.arg) + res = bv_extract(expr.stop-1, expr.start, res) + return res + + def from_ExprCompose(self, expr): + res = None + for arg in expr.args: + e = bv_extract(arg.size-1, 0, self.from_expr(arg)) + if res: + res = bv_concat(e, res) + else: + res = e + return res + + def from_ExprCond(self, expr): + cond = self.from_expr(expr.cond) + src1 = self.from_expr(expr.src1) + src2 = self.from_expr(expr.src2) + + # (and (distinct cond (_ bv0 <size>)) true) + zero = bit_vec_val(0, expr.cond.size) + distinct = smt2_distinct(cond, zero) + distinct_and = smt2_and(distinct, "true") + + # (ite ((and (distinct cond (_ bv0 <size>)) true) src1 src2)) + return smt2_ite(distinct_and, src1, src2) + + def from_ExprOp(self, expr): + args = list(map(self.from_expr, expr.args)) + res = args[0] + + if len(args) > 1: + for arg in args[1:]: + if expr.op == "+": + res = bvadd(res, arg) + elif expr.op == "-": + res = bvsub(res, arg) + elif expr.op == "*": + res = bvmul(res, arg) + elif expr.op == "/": + res = bvsdiv(res, arg) + elif expr.op == "sdiv": + res = bvsdiv(res, arg) + elif expr.op == "udiv": + res = bvudiv(res, arg) + elif expr.op == "%": + res = bvsmod(res, arg) + elif expr.op == "smod": + res = bvsmod(res, arg) + elif expr.op == "umod": + res = bvurem(res, arg) + elif expr.op == "&": + res = bvand(res, arg) + elif expr.op == "^": + res = bvxor(res, arg) + elif expr.op == "|": + res = bvor(res, arg) + elif expr.op == "<<": + res = bvshl(res, arg) + elif expr.op == ">>": + res = bvlshr(res, arg) + elif expr.op == "a>>": + res = bvashr(res, arg) + elif expr.op == "<<<": + res = bv_rotate_left(res, arg, expr.size) + elif expr.op == ">>>": + res = bv_rotate_right(res, arg, expr.size) + elif expr.op == "==": + res = self.from_expr(ExprCond(expr.args[0] - expr.args[1], ExprInt(0, 1), ExprInt(1, 1))) + else: + raise NotImplementedError("Unsupported OP yet: %s" % expr.op) + elif expr.op == 'parity': + arg = bv_extract(7, 0, res) + res = bit_vec_val(1, 1) + for i in range(8): + res = bvxor(res, bv_extract(i, i, arg)) + elif expr.op == '-': + res = bvneg(res) + elif expr.op == "cnttrailzeros": + src = res + size = expr.size + size_smt2 = bit_vec_val(size, size) + one_smt2 = bit_vec_val(1, size) + zero_smt2 = bit_vec_val(0, size) + # src & (1 << (size - 1)) + op = bvand(src, bvshl(one_smt2, bvsub(size_smt2, one_smt2))) + # op != 0 + cond = smt2_distinct(op, zero_smt2) + # ite(cond, size - 1, src) + res = smt2_ite(cond, bvsub(size_smt2, one_smt2), src) + for i in range(size - 2, -1, -1): + # smt2 expression of i + i_smt2 = bit_vec_val(i, size) + # src & (1 << i) + op = bvand(src, bvshl(one_smt2, i_smt2)) + # op != 0 + cond = smt2_distinct(op, zero_smt2) + # ite(cond, i, res) + res = smt2_ite(cond, i_smt2, res) + elif expr.op == "cntleadzeros": + src = res + size = expr.size + one_smt2 = bit_vec_val(1, size) + zero_smt2 = bit_vec_val(0, size) + # (src & 1) != 0 + cond = smt2_distinct(bvand(src, one_smt2), zero_smt2) + # ite(cond, 0, src) + res= smt2_ite(cond, zero_smt2, src) + for i in range(size - 1, 0, -1): + index = - i % size + index_smt2 = bit_vec_val(index, size) + # src & (1 << index) + op = bvand(src, bvshl(one_smt2, index_smt2)) + # op != 0 + cond = smt2_distinct(op, zero_smt2) + # ite(cond, index, res) + value_smt2 = bit_vec_val(size - (index + 1), size) + res = smt2_ite(cond, value_smt2, res) + else: + raise NotImplementedError("Unsupported OP yet: %s" % expr.op) + + return res + + def from_ExprAssign(self, expr): + src = self.from_expr(expr.src) + dst = self.from_expr(expr.dst) + return smt2_assert(smt2_eq(src, dst)) + + def to_smt2(self, exprs, logic="QF_ABV", model=False): + """ + Converts a valid SMT2 file for a given list of + SMT2 expressions. + + :param exprs: list of SMT2 expressions + :param logic: SMT2 logic + :param model: model generation flag + :return: String of the SMT2 file + """ + ret = "" + ret += "(set-logic {})\n".format(logic) + + # define bit vectors + for bv in self._bitvectors: + size = self._bitvectors[bv] + ret += "{}\n".format(declare_bv(bv, size)) + + # define memory arrays + for size in self._mem.mems: + mem = self._mem.mems[size] + ret += "{}\n".format(declare_array(mem, bit_vec(size), bit_vec(8))) + + # merge SMT2 expressions + for expr in exprs: + ret += expr + "\n" + + # define action + ret += "(check-sat)\n" + + # enable model generation + if model: + ret += "(get-model)\n" + + return ret + + +# Register the class +Translator.register(TranslatorSMT2) diff --git a/src/miasm/ir/translators/translator.py b/src/miasm/ir/translators/translator.py new file mode 100644 index 00000000..c9368f09 --- /dev/null +++ b/src/miasm/ir/translators/translator.py @@ -0,0 +1,127 @@ +from future.utils import viewitems + +import miasm.expression.expression as m2_expr +from miasm.core.utils import BoundedDict + + +class Translator(object): + "Abstract parent class for translators." + + # Registered translators + available_translators = [] + # Implemented language + __LANG__ = "" + + @classmethod + def register(cls, translator): + """Register a translator + @translator: Translator sub-class + """ + cls.available_translators.append(translator) + + @classmethod + def to_language(cls, target_lang, *args, **kwargs): + """Return the corresponding translator instance + @target_lang: str (case insensitive) wanted language + Raise a NotImplementedError in case of unmatched language + """ + target_lang = target_lang.lower() + for translator in cls.available_translators: + if translator.__LANG__.lower() == target_lang: + return translator(*args, **kwargs) + + raise NotImplementedError("Unknown target language: %s" % target_lang) + + @classmethod + def available_languages(cls): + "Return the list of registered languages" + return [translator.__LANG__ for translator in cls.available_translators] + + def __init__(self, cache_size=1000): + """Instance a translator + @cache_size: (optional) Expr cache size + """ + self._cache = BoundedDict(cache_size) + + def from_ExprInt(self, expr): + """Translate an ExprInt + @expr: ExprInt to translate + """ + raise NotImplementedError("Abstract method") + + def from_ExprId(self, expr): + """Translate an ExprId + @expr: ExprId to translate + """ + raise NotImplementedError("Abstract method") + + def from_ExprLoc(self, expr): + """Translate an ExprLoc + @expr: ExprLoc to translate + """ + raise NotImplementedError("Abstract method") + + def from_ExprCompose(self, expr): + """Translate an ExprCompose + @expr: ExprCompose to translate + """ + raise NotImplementedError("Abstract method") + + def from_ExprSlice(self, expr): + """Translate an ExprSlice + @expr: ExprSlice to translate + """ + raise NotImplementedError("Abstract method") + + def from_ExprOp(self, expr): + """Translate an ExprOp + @expr: ExprOp to translate + """ + raise NotImplementedError("Abstract method") + + def from_ExprMem(self, expr): + """Translate an ExprMem + @expr: ExprMem to translate + """ + raise NotImplementedError("Abstract method") + + def from_ExprAssign(self, expr): + """Translate an ExprAssign + @expr: ExprAssign to translate + """ + raise NotImplementedError("Abstract method") + + def from_ExprCond(self, expr): + """Translate an ExprCond + @expr: ExprCond to translate + """ + raise NotImplementedError("Abstract method") + + def from_expr(self, expr): + """Translate an expression according to its type + @expr: expression to translate + """ + # Use cache + if expr in self._cache: + return self._cache[expr] + + # Handle Expr type + handlers = { + m2_expr.ExprInt: self.from_ExprInt, + m2_expr.ExprId: self.from_ExprId, + m2_expr.ExprLoc: self.from_ExprLoc, + m2_expr.ExprCompose: self.from_ExprCompose, + m2_expr.ExprSlice: self.from_ExprSlice, + m2_expr.ExprOp: self.from_ExprOp, + m2_expr.ExprMem: self.from_ExprMem, + m2_expr.ExprAssign: self.from_ExprAssign, + m2_expr.ExprCond: self.from_ExprCond + } + for target, handler in viewitems(handlers): + if isinstance(expr, target): + ## Compute value and update the internal cache + ret = handler(expr) + self._cache[expr] = ret + return ret + raise ValueError("Unhandled type for %s" % expr) + diff --git a/src/miasm/ir/translators/z3_ir.py b/src/miasm/ir/translators/z3_ir.py new file mode 100644 index 00000000..c72ff36f --- /dev/null +++ b/src/miasm/ir/translators/z3_ir.py @@ -0,0 +1,285 @@ +from builtins import map +from builtins import range +import importlib.util +import logging + +# Raise an ImportError if z3 is not available WITHOUT actually importing it +if importlib.util.find_spec("z3") is None: + raise ImportError("No module named 'z3'") + +from miasm.ir.translators.translator import Translator + +log = logging.getLogger("translator_z3") +console_handler = logging.StreamHandler() +console_handler.setFormatter(logging.Formatter("[%(levelname)-8s]: %(message)s")) +log.addHandler(console_handler) +log.setLevel(logging.WARNING) + +class Z3Mem(object): + """Memory abstraction for TranslatorZ3. Memory elements are only accessed, + never written. To give a concrete value for a given memory cell in a solver, + add "mem32.get(address, size) == <value>" constraints to your equation. + The endianness of memory accesses is handled accordingly to the "endianness" + attribute. + + Note: Will have one memory space for each addressing size used. + For example, if memory is accessed via 32 bits values and 16 bits values, + these access will not occur in the same address space. + """ + + def __init__(self, endianness="<", name="mem"): + """Initializes a Z3Mem object with a given @name and @endianness. + @endianness: Endianness of memory representation. '<' for little endian, + '>' for big endian. + @name: name of memory Arrays generated. They will be named + name+str(address size) (for example mem32, mem16...). + """ + # Import z3 only on demand + global z3 + import z3 + + if endianness not in ['<', '>']: + raise ValueError("Endianness should be '>' (big) or '<' (little)") + self.endianness = endianness + self.mems = {} # Address size -> memory z3.Array + self.name = name + + def get_mem_array(self, size): + """Returns a z3 Array used internally to represent memory for addresses + of size @size. + @size: integer, size in bit of addresses in the memory to get. + Return a z3 Array: BitVecSort(size) -> BitVecSort(8). + """ + try: + mem = self.mems[size] + except KeyError: + # Lazy instantiation + self.mems[size] = z3.Array(self.name + str(size), + z3.BitVecSort(size), + z3.BitVecSort(8)) + mem = self.mems[size] + return mem + + def __getitem__(self, addr): + """One byte memory access. Different address sizes with the same value + will result in different memory accesses. + @addr: a z3 BitVec, the address to read. + Return a z3 BitVec of size 8 bits representing a memory access. + """ + size = addr.size() + mem = self.get_mem_array(size) + return mem[addr] + + def get(self, addr, size): + """ Memory access at address @addr of size @size. + @addr: a z3 BitVec, the address to read. + @size: int, size of the read in bits. + Return a z3 BitVec of size @size representing a memory access. + """ + original_size = size + if original_size % 8 != 0: + # Size not aligned on 8bits -> read more than size and extract after + size = ((original_size // 8) + 1) * 8 + res = self[addr] + if self.is_little_endian(): + for i in range(1, size // 8): + res = z3.Concat(self[addr+i], res) + else: + for i in range(1, size //8): + res = z3.Concat(res, self[addr+i]) + if size == original_size: + return res + else: + # Size not aligned, extract right sized result + return z3.Extract(original_size-1, 0, res) + + def is_little_endian(self): + """True if this memory is little endian.""" + return self.endianness == "<" + + def is_big_endian(self): + """True if this memory is big endian.""" + return not self.is_little_endian() + + +class TranslatorZ3(Translator): + """Translate a Miasm expression to an equivalent z3 python binding + expression. Memory is abstracted via z3.Array (see Z3Mem). + The result of from_expr will be a z3 Expr. + + If you want to interact with the memory abstraction after the translation, + you can instantiate your own Z3Mem, that will be equivalent to the one + used by TranslatorZ3. + """ + + # Implemented language + __LANG__ = "z3" + # Operations translation + trivial_ops = ["+", "-", "/", "%", "&", "^", "|", "*", "<<"] + + def __init__(self, endianness="<", loc_db=None, **kwargs): + """Instance a Z3 translator + @endianness: (optional) memory endianness + """ + # Import z3 only on demand + global z3 + import z3 + + super(TranslatorZ3, self).__init__(**kwargs) + self._mem = Z3Mem(endianness) + self.loc_db = loc_db + + def from_ExprInt(self, expr): + return z3.BitVecVal(int(expr), expr.size) + + def from_ExprId(self, expr): + return z3.BitVec(str(expr), expr.size) + + def from_ExprLoc(self, expr): + if self.loc_db is None: + # No loc_db, fallback to default name + return z3.BitVec(str(expr), expr.size) + loc_key = expr.loc_key + offset = self.loc_db.get_location_offset(loc_key) + if offset is not None: + return z3.BitVecVal(offset, expr.size) + # fallback to default name + return z3.BitVec(str(loc_key), expr.size) + + def from_ExprMem(self, expr): + addr = self.from_expr(expr.ptr) + return self._mem.get(addr, expr.size) + + def from_ExprSlice(self, expr): + res = self.from_expr(expr.arg) + res = z3.Extract(expr.stop-1, expr.start, res) + return res + + def from_ExprCompose(self, expr): + res = None + for arg in expr.args: + e = z3.Extract(arg.size-1, 0, self.from_expr(arg)) + if res != None: + res = z3.Concat(e, res) + else: + res = e + return res + + def from_ExprCond(self, expr): + cond = self.from_expr(expr.cond) + src1 = self.from_expr(expr.src1) + src2 = self.from_expr(expr.src2) + return z3.If(cond != 0, src1, src2) + + def _abs(self, z3_value): + return z3.If(z3_value >= 0,z3_value,-z3_value) + + def _sdivC(self, num_expr, den_expr): + """Divide (signed) @num by @den (Expr) as C would + See modint.__div__ for implementation choice + """ + num, den = self.from_expr(num_expr), self.from_expr(den_expr) + num_s = self.from_expr(num_expr.signExtend(num_expr.size * 2)) + den_s = self.from_expr(den_expr.signExtend(den_expr.size * 2)) + result_sign = z3.If(num_s * den_s >= 0, + z3.BitVecVal(1, num.size()), + z3.BitVecVal(-1, num.size()), + ) + return z3.UDiv(self._abs(num), self._abs(den)) * result_sign + + def from_ExprOp(self, expr): + args = list(map(self.from_expr, expr.args)) + res = args[0] + + if len(args) > 1: + for arg in args[1:]: + if expr.op in self.trivial_ops: + res = eval("res %s arg" % expr.op) + elif expr.op == ">>": + res = z3.LShR(res, arg) + elif expr.op == "a>>": + res = res >> arg + elif expr.op == "<<<": + res = z3.RotateLeft(res, arg) + elif expr.op == ">>>": + res = z3.RotateRight(res, arg) + elif expr.op == "sdiv": + res = self._sdivC(expr.args[0], expr.args[1]) + elif expr.op == "udiv": + res = z3.UDiv(res, arg) + elif expr.op == "smod": + res = res - (arg * (self._sdivC(expr.args[0], expr.args[1]))) + elif expr.op == "umod": + res = z3.URem(res, arg) + elif expr.op == "==": + res = z3.If( + args[0] == args[1], + z3.BitVecVal(1, 1), + z3.BitVecVal(0, 1) + ) + elif expr.op == "<u": + res = z3.If( + z3.ULT(args[0], args[1]), + z3.BitVecVal(1, 1), + z3.BitVecVal(0, 1) + ) + elif expr.op == "<s": + res = z3.If( + args[0] < args[1], + z3.BitVecVal(1, 1), + z3.BitVecVal(0, 1) + ) + elif expr.op == "<=u": + res = z3.If( + z3.ULE(args[0], args[1]), + z3.BitVecVal(1, 1), + z3.BitVecVal(0, 1) + ) + elif expr.op == "<=s": + res = z3.If( + args[0] <= args[1], + z3.BitVecVal(1, 1), + z3.BitVecVal(0, 1) + ) + else: + raise NotImplementedError("Unsupported OP yet: %s" % expr.op) + elif expr.op == 'parity': + arg = z3.Extract(7, 0, res) + res = z3.BitVecVal(1, 1) + for i in range(8): + res = res ^ z3.Extract(i, i, arg) + elif expr.op == '-': + res = -res + elif expr.op == "cnttrailzeros": + size = expr.size + src = res + res = z3.If(src == 0, size, src) + for i in range(size - 1, -1, -1): + res = z3.If((src & (1 << i)) != 0, i, res) + elif expr.op == "cntleadzeros": + size = expr.size + src = res + res = z3.If(src == 0, size, src) + for i in range(size, 0, -1): + index = - i % size + out = size - (index + 1) + res = z3.If((src & (1 << index)) != 0, out, res) + elif expr.op.startswith("zeroExt"): + arg, = expr.args + res = z3.ZeroExt(expr.size - arg.size, self.from_expr(arg)) + elif expr.op.startswith("signExt"): + arg, = expr.args + res = z3.SignExt(expr.size - arg.size, self.from_expr(arg)) + else: + raise NotImplementedError("Unsupported OP yet: %s" % expr.op) + + return res + + def from_ExprAssign(self, expr): + src = self.from_expr(expr.src) + dst = self.from_expr(expr.dst) + return (src == dst) + + +# Register the class +Translator.register(TranslatorZ3) |