diff options
Diffstat (limited to 'miasm2/ir')
| -rw-r--r-- | miasm2/ir/__init__.py | 1 | ||||
| -rw-r--r-- | miasm2/ir/analysis.py | 113 | ||||
| -rw-r--r-- | miasm2/ir/ir.py | 929 | ||||
| -rw-r--r-- | miasm2/ir/symbexec.py | 1124 | ||||
| -rw-r--r-- | miasm2/ir/symbexec_top.py | 221 | ||||
| -rw-r--r-- | miasm2/ir/symbexec_types.py | 131 | ||||
| -rw-r--r-- | miasm2/ir/translators/C.py | 528 | ||||
| -rw-r--r-- | miasm2/ir/translators/__init__.py | 13 | ||||
| -rw-r--r-- | miasm2/ir/translators/miasm.py | 45 | ||||
| -rw-r--r-- | miasm2/ir/translators/python.py | 98 | ||||
| -rw-r--r-- | miasm2/ir/translators/smt2.py | 326 | ||||
| -rw-r--r-- | miasm2/ir/translators/translator.py | 127 | ||||
| -rw-r--r-- | miasm2/ir/translators/z3_ir.py | 281 |
13 files changed, 0 insertions, 3937 deletions
diff --git a/miasm2/ir/__init__.py b/miasm2/ir/__init__.py deleted file mode 100644 index 0627b488..00000000 --- a/miasm2/ir/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"Intermediate representation methods" diff --git a/miasm2/ir/analysis.py b/miasm2/ir/analysis.py deleted file mode 100644 index 9158aceb..00000000 --- a/miasm2/ir/analysis.py +++ /dev/null @@ -1,113 +0,0 @@ -#-*- coding:utf-8 -*- - -import warnings -import logging - -from miasm2.ir.ir import IntermediateRepresentation, AssignBlock -from miasm2.expression.expression import ExprOp, ExprAssign -from miasm2.analysis.data_flow import dead_simp as new_dead_simp_imp - - -log = logging.getLogger("analysis") -console_handler = logging.StreamHandler() -console_handler.setFormatter(logging.Formatter("%(levelname)-5s: %(message)s")) -log.addHandler(console_handler) -log.setLevel(logging.WARNING) - - -class ira(IntermediateRepresentation): - """IR Analysis - This class provides higher level manipulations on IR, such as dead - instruction removals. - - This class can be used as a common parent with - `miasm2.ir.ir::IntermediateRepresentation` class. - - For instance: - class ira_x86_16(ir_x86_16, ira) - - """ - ret_reg = None - - def call_effects(self, addr, instr): - """Default modelisation of a function call to @addr. This may be used to: - - * insert dependencies to arguments (stack base, registers, ...) - * add some side effects (stack clean, return value, ...) - - Return a couple: - * list of assignments to add to the current irblock - * list of additional irblocks - - @addr: (Expr) address of the called function - @instr: native instruction which is responsible of the call - """ - - call_assignblk = AssignBlock( - [ - ExprAssign(self.ret_reg, ExprOp('call_func_ret', addr, self.sp)), - ExprAssign(self.sp, ExprOp('call_func_stack', addr, self.sp)) - ], - instr - ) - return [call_assignblk], [] - - def add_instr_to_current_state(self, instr, block, assignments, ir_blocks_all, gen_pc_updt): - """ - Add the IR effects of an instruction to the current state. - If the instruction is a function call, replace the original IR by a - model of the sub function - - Returns a bool: - * True if the current assignments list must be split - * False in other cases. - - @instr: native instruction - @block: native block source - @assignments: current irbloc - @ir_blocks_all: list of additional effects - @gen_pc_updt: insert PC update effects between instructions - """ - if instr.is_subcall(): - call_assignblks, extra_irblocks = self.call_effects( - instr.args[0], - instr - ) - assignments += call_assignblks - ir_blocks_all += extra_irblocks - return True - - if gen_pc_updt is not False: - self.gen_pc_update(assignments, instr) - - assignblk, ir_blocks_extra = self.instr2ir(instr) - assignments.append(assignblk) - ir_blocks_all += ir_blocks_extra - if ir_blocks_extra: - return True - return False - - def sizeof_char(self): - "Return the size of a char in bits" - raise NotImplementedError("Abstract method") - - def sizeof_short(self): - "Return the size of a short in bits" - raise NotImplementedError("Abstract method") - - def sizeof_int(self): - "Return the size of an int in bits" - raise NotImplementedError("Abstract method") - - def sizeof_long(self): - "Return the size of a long in bits" - raise NotImplementedError("Abstract method") - - def sizeof_pointer(self): - "Return the size of a void* in bits" - raise NotImplementedError("Abstract method") - - def dead_simp(self, ircfg): - """Deprecated: See miasm2.analysis.data_flow.dead_simp()""" - warnings.warn('DEPRECATION WARNING: Please use miasm2.analysis.data_flow.dead_simp(ira) instead of ira.dead_simp()') - new_dead_simp_imp(self, ircfg) diff --git a/miasm2/ir/ir.py b/miasm2/ir/ir.py deleted file mode 100644 index 82b12dcd..00000000 --- a/miasm2/ir/ir.py +++ /dev/null @@ -1,929 +0,0 @@ -#-*- coding:utf-8 -*- - -# -# Copyright (C) 2013 Fabrice Desclaux -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -# -from builtins import zip -import warnings - -from itertools import chain -from future.utils import viewvalues, viewitems - -import miasm2.expression.expression as m2_expr -from miasm2.expression.expression_helper import get_missing_interval -from miasm2.core.asmblock import AsmBlock, AsmConstraint -from miasm2.core.graph import DiGraph -from functools import reduce - - -def _expr_loc_to_symb(expr, loc_db): - if not expr.is_loc(): - return expr - if loc_db is None: - name = str(expr) - else: - names = loc_db.get_location_names(expr.loc_key) - if not names: - name = loc_db.pretty_str(expr.loc_key) - else: - # Use only one name for readability - name = sorted(names)[0] - return m2_expr.ExprId(name, expr.size) - -class AssignBlock(object): - """Represent parallel IR assignment, such as: - EAX = EBX - EBX = EAX - - -> Exchange between EBX and EAX - - AssignBlock can be seen as a dictionary where keys are the destinations - (ExprId or ExprMem), and values their corresponding sources. - - Also provides common manipulation on this assignments. - - """ - __slots__ = ["_assigns", "_instr"] - - def __init__(self, irs=None, instr=None): - """Create a new AssignBlock - @irs: (optional) sequence of ExprAssign, or dictionary dst (Expr) -> src - (Expr) - @instr: (optional) associate an instruction with this AssignBlock - - """ - if irs is None: - irs = [] - self._instr = instr - self._assigns = {} # ExprAssign.dst -> ExprAssign.src - - # Concurrent assignments are handled in _set - if hasattr(irs, "items"): - for dst, src in viewitems(irs): - self._set(dst, src) - else: - for expraff in irs: - self._set(expraff.dst, expraff.src) - - @property - def instr(self): - """Return the associated instruction, if any""" - return self._instr - - def _set(self, dst, src): - """ - Special cases: - * if dst is an ExprSlice, expand it to assign the full Expression - * if dst already known, sources are merged - """ - if dst.size != src.size: - raise RuntimeError( - "sanitycheck: args must have same size! %s" % - ([(str(arg), arg.size) for arg in [dst, src]])) - - if isinstance(dst, m2_expr.ExprSlice): - # Complete the source with missing slice parts - new_dst = dst.arg - rest = [(m2_expr.ExprSlice(dst.arg, r[0], r[1]), r[0], r[1]) - for r in dst.slice_rest()] - all_a = [(src, dst.start, dst.stop)] + rest - all_a.sort(key=lambda x: x[1]) - args = [expr for (expr, _, _) in all_a] - new_src = m2_expr.ExprCompose(*args) - else: - new_dst, new_src = dst, src - - if new_dst in self._assigns and isinstance(new_src, m2_expr.ExprCompose): - if not isinstance(self[new_dst], m2_expr.ExprCompose): - # prev_RAX = 0x1122334455667788 - # input_RAX[0:8] = 0x89 - # final_RAX -> ? (assignment are in parallel) - raise RuntimeError("Concurrent access on same bit not allowed") - - # Consider slice grouping - expr_list = [(new_dst, new_src), - (new_dst, self[new_dst])] - # Find collision - e_colision = reduce(lambda x, y: x.union(y), - (self.get_modified_slice(dst, src) - for (dst, src) in expr_list), - set()) - - # Sort interval collision - known_intervals = sorted([(x[1], x[2]) for x in e_colision]) - - for i, (_, stop) in enumerate(known_intervals[:-1]): - if stop > known_intervals[i + 1][0]: - raise RuntimeError( - "Concurrent access on same bit not allowed") - - # Fill with missing data - missing_i = get_missing_interval(known_intervals, 0, new_dst.size) - remaining = ((m2_expr.ExprSlice(new_dst, *interval), - interval[0], - interval[1]) - for interval in missing_i) - - # Build the merging expression - args = list(e_colision.union(remaining)) - args.sort(key=lambda x: x[1]) - starts = [start for (_, start, _) in args] - assert len(set(starts)) == len(starts) - args = [expr for (expr, _, _) in args] - new_src = m2_expr.ExprCompose(*args) - - # Sanity check - if not isinstance(new_dst, (m2_expr.ExprId, m2_expr.ExprMem)): - raise TypeError("Destination cannot be a %s" % type(new_dst)) - - self._assigns[new_dst] = new_src - - def __setitem__(self, dst, src): - raise RuntimeError('AssignBlock is immutable') - - def __getitem__(self, key): - return self._assigns[key] - - def __contains__(self, key): - return key in self._assigns - - def iteritems(self): - for dst, src in viewitems(self._assigns): - yield dst, src - - def items(self): - return [(dst, src) for dst, src in viewitems(self._assigns)] - - def itervalues(self): - for src in viewvalues(self._assigns): - yield src - - def keys(self): - return list(self._assigns) - - def values(self): - return list(viewvalues(self._assigns)) - - def __iter__(self): - for dst in self._assigns: - yield dst - - def __delitem__(self, _): - raise RuntimeError('AssignBlock is immutable') - - def update(self, _): - raise RuntimeError('AssignBlock is immutable') - - def __eq__(self, other): - if set(self.keys()) != set(other.keys()): - return False - return all(other[dst] == src for dst, src in viewitems(self)) - - def __ne__(self, other): - return not self == other - - def __len__(self): - return len(self._assigns) - - def get(self, key, default): - return self._assigns.get(key, default) - - @staticmethod - def get_modified_slice(dst, src): - """Return an Expr list of extra expressions needed during the - object instantiation""" - if not isinstance(src, m2_expr.ExprCompose): - raise ValueError("Get mod slice not on expraff slice", str(src)) - modified_s = [] - for index, arg in src.iter_args(): - if not (isinstance(arg, m2_expr.ExprSlice) and - arg.arg == dst and - index == arg.start and - index+arg.size == arg.stop): - # If x is not the initial expression - modified_s.append((arg, index, index+arg.size)) - return modified_s - - def get_w(self): - """Return a set of elements written""" - return set(self.keys()) - - def get_rw(self, mem_read=False, cst_read=False): - """Return a dictionary associating written expressions to a set of - their read requirements - @mem_read: (optional) mem_read argument of `get_r` - @cst_read: (optional) cst_read argument of `get_r` - """ - out = {} - for dst, src in viewitems(self): - src_read = src.get_r(mem_read=mem_read, cst_read=cst_read) - if isinstance(dst, m2_expr.ExprMem) and mem_read: - # Read on destination happens only with ExprMem - src_read.update(dst.ptr.get_r(mem_read=mem_read, - cst_read=cst_read)) - out[dst] = src_read - return out - - def get_r(self, mem_read=False, cst_read=False): - """Return a set of elements reads - @mem_read: (optional) mem_read argument of `get_r` - @cst_read: (optional) cst_read argument of `get_r` - """ - return set( - chain.from_iterable( - viewvalues( - self.get_rw( - mem_read=mem_read, - cst_read=cst_read - ) - ) - ) - ) - - def __str__(self): - out = [] - for dst, src in sorted(viewitems(self._assigns)): - out.append("%s = %s" % (dst, src)) - return "\n".join(out) - - def dst2ExprAssign(self, dst): - """Return an ExprAssign corresponding to @dst equation - @dst: Expr instance""" - return m2_expr.ExprAssign(dst, self[dst]) - - def simplify(self, simplifier): - """ - Return a new AssignBlock with expression simplified - - @simplifier: ExpressionSimplifier instance - """ - new_assignblk = {} - for dst, src in viewitems(self): - if dst == src: - continue - new_src = simplifier(src) - new_dst = simplifier(dst) - new_assignblk[new_dst] = new_src - return AssignBlock(irs=new_assignblk, instr=self.instr) - - def to_string(self, loc_db=None): - out = [] - for dst, src in viewitems(self): - new_src = src.visit(lambda expr:_expr_loc_to_symb(expr, loc_db)) - new_dst = dst.visit(lambda expr:_expr_loc_to_symb(expr, loc_db)) - line = "%s = %s" % (new_dst, new_src) - out.append(line) - out.append("") - return "\n".join(out) - -class IRBlock(object): - """Intermediate representation block object. - - Stand for an intermediate representation basic block. - """ - - __slots__ = ["_loc_key", "_assignblks", "_dst", "_dst_linenb"] - - def __init__(self, loc_key, assignblks): - """ - @loc_key: LocKey of the IR basic block - @assignblks: list of AssignBlock - """ - - assert isinstance(loc_key, m2_expr.LocKey) - self._loc_key = loc_key - for assignblk in assignblks: - assert isinstance(assignblk, AssignBlock) - self._assignblks = tuple(assignblks) - self._dst = None - self._dst_linenb = None - - def __eq__(self, other): - if self.__class__ is not other.__class__: - return False - if self.loc_key != other.loc_key: - return False - if len(self.assignblks) != len(other.assignblks): - return False - for assignblk1, assignblk2 in zip(self.assignblks, other.assignblks): - if assignblk1 != assignblk2: - return False - return True - - def __ne__(self, other): - return not self == other - - def get_label(self): - warnings.warn('DEPRECATION WARNING: use ".loc_key" instead of ".label"') - return self.loc_key - - loc_key = property(lambda self:self._loc_key) - label = property(get_label) - - @property - def assignblks(self): - return self._assignblks - - @property - def irs(self): - warnings.warn('DEPRECATION WARNING: use "irblock.assignblks" instead of "irblock.irs"') - return self._assignblks - - def __iter__(self): - """Iterate on assignblks""" - return self._assignblks.__iter__() - - def __getitem__(self, index): - """Getitem on assignblks""" - return self._assignblks.__getitem__(index) - - def __len__(self): - """Length of assignblks""" - return self._assignblks.__len__() - - def is_dst_set(self): - return self._dst is not None - - def cache_dst(self): - final_dst = None - final_linenb = None - for linenb, assignblk in enumerate(self): - for dst, src in viewitems(assignblk): - if dst.is_id("IRDst"): - if final_dst is not None: - raise ValueError('Multiple destinations!') - final_dst = src - final_linenb = linenb - self._dst = final_dst - self._dst_linenb = final_linenb - return final_dst - - @property - def dst(self): - """Return the value of IRDst for the IRBlock""" - if self.is_dst_set(): - return self._dst - return self.cache_dst() - - def set_dst(self, value): - """Generate a new IRBlock with a dst (IRBlock) fixed to @value""" - irs = [] - dst_found = False - for assignblk in self: - new_assignblk = {} - for dst, src in viewitems(assignblk): - if dst.is_id("IRDst"): - assert dst_found is False - dst_found = True - new_assignblk[dst] = value - else: - new_assignblk[dst] = src - irs.append(AssignBlock(new_assignblk, assignblk.instr)) - return IRBlock(self.loc_key, irs) - - @property - def dst_linenb(self): - """Line number of the IRDst setting statement in the current irs""" - if not self.is_dst_set(): - self.cache_dst() - return self._dst_linenb - - def __str__(self): - out = [] - out.append(str(self.loc_key)) - for assignblk in self: - for dst, src in viewitems(assignblk): - out.append('\t%s = %s' % (dst, src)) - out.append("") - return "\n".join(out) - - - def modify_exprs(self, mod_dst=None, mod_src=None): - """ - Generate a new IRBlock with its AssignBlock expressions modified - according to @mod_dst and @mod_src - @mod_dst: function called to modify Expression destination - @mod_src: function called to modify Expression source - """ - - if mod_dst is None: - mod_dst = lambda expr:expr - if mod_src is None: - mod_src = lambda expr:expr - - assignblks = [] - for assignblk in self: - new_assignblk = {} - for dst, src in viewitems(assignblk): - new_assignblk[mod_dst(dst)] = mod_src(src) - assignblks.append(AssignBlock(new_assignblk, assignblk.instr)) - return IRBlock(self.loc_key, assignblks) - - def to_string(self, loc_db=None): - out = [] - if loc_db is None: - node_name = "%s:" % self.loc_key - else: - names = loc_db.get_location_names(self.loc_key) - if not names: - node_name = "%s:" % loc_db.pretty_str(self.loc_key) - else: - node_name = "".join("%s:\n" % name for name in names) - out.append(node_name) - - for assignblk in self: - out.append(assignblk.to_string(loc_db)) - return '\n'.join(out) - - - def simplify(self, simplifier): - """ - Simplify expressions in each assignblock - @simplifier: ExpressionSimplifier instance - """ - modified = False - assignblks = [] - for assignblk in self: - new_assignblk = assignblk.simplify(simplifier) - if assignblk != new_assignblk: - modified = True - assignblks.append(new_assignblk) - return modified, IRBlock(self.loc_key, assignblks) - - -class irbloc(IRBlock): - """ - DEPRECATED object - Use IRBlock instead of irbloc - """ - - def __init__(self, loc_key, irs, lines=None): - warnings.warn('DEPRECATION WARNING: use "IRBlock" instead of "irblock"') - super(irbloc, self).__init__(loc_key, irs) - - -class IRCFG(DiGraph): - - """DiGraph for IR instances""" - - def __init__(self, irdst, loc_db, blocks=None, *args, **kwargs): - """Instantiate a IRCFG - @loc_db: LocationDB instance - @blocks: IR blocks - """ - self.loc_db = loc_db - if blocks is None: - blocks = {} - self._blocks = blocks - self._irdst = irdst - super(IRCFG, self).__init__(*args, **kwargs) - - @property - def IRDst(self): - return self._irdst - - @property - def blocks(self): - return self._blocks - - def add_irblock(self, irblock): - """ - Add the @irblock to the current IRCFG - @irblock: IRBlock instance - """ - self.blocks[irblock.loc_key] = irblock - self.add_node(irblock.loc_key) - - for dst in self.dst_trackback(irblock): - if dst.is_int(): - dst_loc_key = self.loc_db.get_or_create_offset_location(int(dst)) - dst = m2_expr.ExprLoc(dst_loc_key, irblock.dst.size) - if dst.is_loc(): - self.add_uniq_edge(irblock.loc_key, dst.loc_key) - - def node2lines(self, node): - if self.loc_db is None: - node_name = str(node) - else: - node_name = self.loc_db.pretty_str(node) - yield self.DotCellDescription( - text="%s" % node_name, - attr={ - 'align': 'center', - 'colspan': 2, - 'bgcolor': 'grey', - } - ) - if node not in self._blocks: - yield [self.DotCellDescription(text="NOT PRESENT", attr={})] - return - for i, assignblk in enumerate(self._blocks[node]): - for dst, src in viewitems(assignblk): - - new_src = src.visit(lambda expr:_expr_loc_to_symb(expr, self.loc_db)) - new_dst = dst.visit(lambda expr:_expr_loc_to_symb(expr, self.loc_db)) - line = "%s = %s" % (new_dst, new_src) - if self._dot_offset: - yield [self.DotCellDescription(text="%-4d" % i, attr={}), - self.DotCellDescription(text=line, attr={})] - else: - yield self.DotCellDescription(text=line, attr={}) - yield self.DotCellDescription(text="", attr={}) - - def edge_attr(self, src, dst): - if src not in self._blocks or dst not in self._blocks: - return {} - src_irdst = self._blocks[src].dst - edge_color = "blue" - if isinstance(src_irdst, m2_expr.ExprCond): - src1, src2 = src_irdst.src1, src_irdst.src2 - if src1.is_loc(dst): - edge_color = "limegreen" - elif src2.is_loc(dst): - edge_color = "red" - return {"color": edge_color} - - def node_attr(self, node): - if node not in self._blocks: - return {'style': 'filled', 'fillcolor': 'red'} - return {} - - def dot(self, offset=False): - """ - @offset: (optional) if set, add the corresponding line number in each - node - """ - self._dot_offset = offset - return super(IRCFG, self).dot() - - def get_loc_key(self, addr): - """Transforms an ExprId/ExprInt/loc_key/int into a loc_key - @addr: an ExprId/ExprInt/loc_key/int""" - - if isinstance(addr, m2_expr.LocKey): - return addr - elif isinstance(addr, m2_expr.ExprLoc): - return addr.loc_key - - try: - addr = int(addr) - except (ValueError, TypeError): - return None - - return self.loc_db.get_offset_location(addr) - - - def get_or_create_loc_key(self, addr): - """Transforms an ExprId/ExprInt/loc_key/int into a loc_key - If the offset @addr is not in the LocationDB, create it - @addr: an ExprId/ExprInt/loc_key/int""" - - loc_key = self.get_loc_key(addr) - if loc_key is not None: - return loc_key - - return self.loc_db.add_location(offset=int(addr)) - - def get_block(self, addr): - """Returns the irbloc associated to an ExprId/ExprInt/loc_key/int - @addr: an ExprId/ExprInt/loc_key/int""" - - loc_key = self.get_loc_key(addr) - if loc_key is None: - return None - return self.blocks.get(loc_key, None) - - def getby_offset(self, offset): - """ - Return the set of loc_keys of irblocks containing @offset - @offset: address - """ - out = set() - for irb in viewvalues(self.blocks): - for assignblk in irb: - instr = assignblk.instr - if instr is None: - continue - if instr.offset <= offset < instr.offset + instr.l: - out.add(irb.loc_key) - return out - - - def simplify(self, simplifier): - """ - Simplify expressions in each irblocks - @simplifier: ExpressionSimplifier instance - """ - modified = False - for loc_key, block in list(viewitems(self.blocks)): - assignblks = [] - for assignblk in block: - new_assignblk = assignblk.simplify(simplifier) - if assignblk != new_assignblk: - modified = True - assignblks.append(new_assignblk) - self.blocks[loc_key] = IRBlock(loc_key, assignblks) - return modified - - def get_rw(self, regs_ids=None): - """ - Calls get_rw(irb) for each bloc - @regs_ids : ids of registers used in IR - """ - if regs_ids is None: - regs_ids = [] - for irblock in viewvalues(self.blocks): - irblock.get_rw(regs_ids) - - def _extract_dst(self, todo, done): - """ - Naive extraction of @todo destinations - WARNING: @todo and @done are modified - """ - out = set() - while todo: - dst = todo.pop() - if dst.is_loc(): - done.add(dst) - elif dst.is_mem() or dst.is_int(): - done.add(dst) - elif dst.is_cond(): - todo.add(dst.src1) - todo.add(dst.src2) - elif dst.is_id(): - out.add(dst) - else: - done.add(dst) - return out - - def dst_trackback(self, irb): - """ - Naive backtracking of IRDst - @irb: irbloc instance - """ - todo = set([irb.dst]) - done = set() - - for assignblk in reversed(irb): - if not todo: - break - out = self._extract_dst(todo, done) - found = set() - follow = set() - for dst in out: - if dst in assignblk: - follow.add(assignblk[dst]) - found.add(dst) - - follow.update(out.difference(found)) - todo = follow - - return done - - -class DiGraphIR(IRCFG): - """ - DEPRECATED object - Use IRCFG instead of DiGraphIR - """ - - def __init__(self, *args, **kwargs): - warnings.warn('DEPRECATION WARNING: use "IRCFG" instead of "DiGraphIR"') - raise NotImplementedError("Depreceated") - - -class IntermediateRepresentation(object): - """ - Intermediate representation object - - Allow native assembly to intermediate representation traduction - """ - - def __init__(self, arch, attrib, loc_db): - self.pc = arch.getpc(attrib) - self.sp = arch.getsp(attrib) - self.arch = arch - self.attrib = attrib - self.loc_db = loc_db - self.IRDst = None - - def get_ir(self, instr): - raise NotImplementedError("Abstract Method") - - def new_ircfg(self, *args, **kwargs): - """ - Return a new instance of IRCFG - """ - return IRCFG(self.IRDst, self.loc_db, *args, **kwargs) - - def new_ircfg_from_asmcfg(self, asmcfg, *args, **kwargs): - """ - Return a new instance of IRCFG from an @asmcfg - @asmcfg: AsmCFG instance - """ - - ircfg = IRCFG(self.IRDst, self.loc_db, *args, **kwargs) - for block in asmcfg.blocks: - self.add_asmblock_to_ircfg(block, ircfg) - return ircfg - - def instr2ir(self, instr): - ir_bloc_cur, extra_irblocks = self.get_ir(instr) - for index, irb in enumerate(extra_irblocks): - irs = [] - for assignblk in irb: - irs.append(AssignBlock(assignblk, instr)) - extra_irblocks[index] = IRBlock(irb.loc_key, irs) - assignblk = AssignBlock(ir_bloc_cur, instr) - return assignblk, extra_irblocks - - def add_instr_to_ircfg(self, instr, ircfg, loc_key=None, gen_pc_updt=False): - """ - Add the native instruction @instr to the @ircfg - @instr: instruction instance - @ircfg: IRCFG instance - @loc_key: loc_key instance of the instruction destination - @gen_pc_updt: insert PC update effects between instructions - """ - - if loc_key is None: - offset = getattr(instr, "offset", None) - loc_key = self.loc_db.add_location(offset=offset) - block = AsmBlock(loc_key) - block.lines = [instr] - self.add_asmblock_to_ircfg(block, ircfg, gen_pc_updt) - return loc_key - - def gen_pc_update(self, assignments, instr): - offset = m2_expr.ExprInt(instr.offset, self.pc.size) - assignments.append(AssignBlock({self.pc:offset}, instr)) - - def add_instr_to_current_state(self, instr, block, assignments, ir_blocks_all, gen_pc_updt): - """ - Add the IR effects of an instruction to the current state. - - Returns a bool: - * True if the current assignments list must be split - * False in other cases. - - @instr: native instruction - @block: native block source - @assignments: list of current AssignBlocks - @ir_blocks_all: list of additional effects - @gen_pc_updt: insert PC update effects between instructions - """ - if gen_pc_updt is not False: - self.gen_pc_update(assignments, instr) - - assignblk, ir_blocks_extra = self.instr2ir(instr) - assignments.append(assignblk) - ir_blocks_all += ir_blocks_extra - if ir_blocks_extra: - return True - return False - - def add_asmblock_to_ircfg(self, block, ircfg, gen_pc_updt=False): - """ - Add a native block to the current IR - @block: native assembly block - @ircfg: IRCFG instance - @gen_pc_updt: insert PC update effects between instructions - """ - - loc_key = block.loc_key - ir_blocks_all = [] - - assignments = [] - for instr in block.lines: - if loc_key is None: - assignments = [] - loc_key = self.get_loc_key_for_instr(instr) - split = self.add_instr_to_current_state( - instr, block, assignments, - ir_blocks_all, gen_pc_updt - ) - if split: - ir_blocks_all.append(IRBlock(loc_key, assignments)) - loc_key = None - assignments = [] - if loc_key is not None: - ir_blocks_all.append(IRBlock(loc_key, assignments)) - - new_ir_blocks_all = self.post_add_asmblock_to_ircfg(block, ircfg, ir_blocks_all) - for irblock in new_ir_blocks_all: - ircfg.add_irblock(irblock) - return new_ir_blocks_all - - def add_block(self, block, gen_pc_updt=False): - """ - DEPRECATED function - Use add_block instead of add_block - """ - warnings.warn("""DEPRECATION WARNING - ircfg is now out of IntermediateRepresentation - Use: - ircfg = ir_arch.new_ircfg() - ir_arch.add_asmblock_to_ircfg(block, ircfg) - """) - raise RuntimeError("API Deprecated") - - def add_bloc(self, block, gen_pc_updt=False): - """ - DEPRECATED function - Use add_block instead of add_block - """ - self.add_block(block, gen_pc_updt) - - def get_next_loc_key(self, instr): - loc_key = self.loc_db.get_or_create_offset_location(instr.offset + instr.l) - return loc_key - - def get_loc_key_for_instr(self, instr): - """Returns the loc_key associated to an instruction - @instr: current instruction""" - return self.loc_db.get_or_create_offset_location(instr.offset) - - def gen_loc_key_and_expr(self, size): - """ - Return a loc_key and it's corresponding ExprLoc - @size: size of expression - """ - loc_key = self.loc_db.add_location() - return loc_key, m2_expr.ExprLoc(loc_key, size) - - def expr_fix_regs_for_mode(self, expr, *args, **kwargs): - return expr - - def expraff_fix_regs_for_mode(self, expr, *args, **kwargs): - return expr - - def irbloc_fix_regs_for_mode(self, irblock, *args, **kwargs): - return irblock - - def is_pc_written(self, block): - """Return the first Assignblk of the @blockin which PC is written - @block: IRBlock instance""" - all_pc = viewvalues(self.arch.pc) - for assignblk in block: - if assignblk.dst in all_pc: - return assignblk - return None - - def set_empty_dst_to_next(self, block, ir_blocks): - for index, irblock in enumerate(ir_blocks): - if irblock.dst is not None: - continue - next_loc_key = block.get_next() - if next_loc_key is None: - loc_key = None - if block.lines: - line = block.lines[-1] - if line.offset is not None: - loc_key = self.loc_db.get_or_create_offset_location(line.offset + line.l) - if loc_key is None: - loc_key = self.loc_db.add_location() - block.add_cst(loc_key, AsmConstraint.c_next) - else: - loc_key = next_loc_key - dst = m2_expr.ExprLoc(loc_key, self.pc.size) - if irblock.assignblks: - instr = irblock.assignblks[-1].instr - else: - instr = None - assignblk = AssignBlock({self.IRDst: dst}, instr) - ir_blocks[index] = IRBlock(irblock.loc_key, list(irblock.assignblks) + [assignblk]) - - def post_add_asmblock_to_ircfg(self, block, ircfg, ir_blocks): - self.set_empty_dst_to_next(block, ir_blocks) - - new_irblocks = [] - for irblock in ir_blocks: - new_irblock = self.irbloc_fix_regs_for_mode(irblock, self.attrib) - ircfg.add_irblock(new_irblock) - new_irblocks.append(new_irblock) - return new_irblocks - - -class ir(IntermediateRepresentation): - """ - DEPRECATED object - Use IntermediateRepresentation instead of ir - """ - - def __init__(self, loc_key, irs, lines=None): - warnings.warn('DEPRECATION WARNING: use "IntermediateRepresentation" instead of "ir"') - super(ir, self).__init__(loc_key, irs, lines) diff --git a/miasm2/ir/symbexec.py b/miasm2/ir/symbexec.py deleted file mode 100644 index b945e85c..00000000 --- a/miasm2/ir/symbexec.py +++ /dev/null @@ -1,1124 +0,0 @@ -from __future__ import print_function -from builtins import range -import logging -from collections import MutableMapping - -from future.utils import viewitems - -from miasm2.expression.expression import ExprOp, ExprId, ExprLoc, ExprInt, \ - ExprMem, ExprCompose, ExprSlice, ExprCond -from miasm2.expression.simplifications import expr_simp_explicit -from miasm2.ir.ir import AssignBlock - -log = logging.getLogger("symbexec") -console_handler = logging.StreamHandler() -console_handler.setFormatter(logging.Formatter("%(levelname)-5s: %(message)s")) -log.addHandler(console_handler) -log.setLevel(logging.INFO) - - -def get_block(ir_arch, ircfg, mdis, addr): - """Get IRBlock at address @addr""" - loc_key = ircfg.get_or_create_loc_key(addr) - if not loc_key in ircfg.blocks: - offset = mdis.loc_db.get_location_offset(loc_key) - block = mdis.dis_block(offset) - ir_arch.add_asmblock_to_ircfg(block, ircfg) - irblock = ircfg.get_block(loc_key) - if irblock is None: - raise LookupError('No block found at that address: %s' % ir_arch.loc_db.pretty_str(loc_key)) - return irblock - - -class StateEngine(object): - """Stores an Engine state""" - - def merge(self, other): - """Generate a new state, representing the merge of self and @other - @other: a StateEngine instance""" - - raise NotImplementedError("Abstract method") - - -class SymbolicState(StateEngine): - """Stores a SymbolicExecutionEngine state""" - - def __init__(self, dct): - self._symbols = frozenset(viewitems(dct)) - - def __hash__(self): - return hash((self.__class__, self._symbols)) - - def __eq__(self, other): - if self is other: - return True - if self.__class__ != other.__class__: - return False - return self.symbols == other.symbols - - def __ne__(self, other): - return not self == other - - def __iter__(self): - for dst, src in self._symbols: - yield dst, src - - def iteritems(self): - """Iterate on stored memory/values""" - return self.__iter__() - - def merge(self, other): - """Merge two symbolic states - Only equal expressions are kept in both states - @other: second symbolic state - """ - - symb_a = self.symbols - symb_b = other.symbols - intersection = set(symb_a).intersection(set(symb_b)) - out = {} - for dst in intersection: - if symb_a[dst] == symb_b[dst]: - out[dst] = symb_a[dst] - return self.__class__(out) - - @property - def symbols(self): - """Return the dictionary of known symbols""" - return dict(self._symbols) - - -INTERNAL_INTBASE_NAME = "__INTERNAL_INTBASE__" - - -def get_expr_base_offset(expr): - """Return a couple representing the symbolic/concrete part of an addition - expression. - - If there is no symbolic part, ExprId(INTERNAL_INTBASE_NAME) is used - If there is not concrete part, 0 is used - @expr: Expression instance - - """ - if expr.is_int(): - internal_intbase = ExprId(INTERNAL_INTBASE_NAME, expr.size) - return internal_intbase, int(expr) - - if not expr.is_op('+'): - return expr, 0 - if expr.args[-1].is_int(): - args, offset = expr.args[:-1], int(expr.args[-1]) - if len(args) == 1: - return args[0], offset - return ExprOp('+', *args), offset - return expr, 0 - - -class MemArray(MutableMapping): - """Link between base and its (offset, Expr) - - Given an expression (say *base*), this structure will store every memory - content relatively to an integer offset from *base*. - - The value associated to a given offset is a description of the slice of a - stored expression. The slice size depends on the configutation of the - MemArray. For example, for a slice size of 8 bits, the assignment: - - @32[EAX+0x10] = EBX - - will store for the base EAX: - - 0x10: (EBX, 0) - - 0x11: (EBX, 1) - - 0x12: (EBX, 2) - - 0x13: (EBX, 3) - - If the *base* is EAX+EBX, this structure can store the following contents: - - @32[EAX+EBX] - - @8[EAX+EBX+0x100] - But not: - - @32[EAX+0x10] (which is stored in another MemArray based on EAX) - - @32[EAX+EBX+ECX] (which is stored in another MemArray based on - EAX+EBX+ECX) - - """ - - def __init__(self, base, expr_simp=expr_simp_explicit): - self._base = base - self.expr_simp = expr_simp - self._mask = int(base.mask) - self._offset_to_expr = {} - - @property - def base(self): - """Expression representing the symbolic base address""" - return self._base - - @property - def mask(self): - """Mask offset""" - return self._mask - - def __contains__(self, offset): - return offset in self._offset_to_expr - - def __getitem__(self, offset): - assert 0 <= offset <= self._mask - return self._offset_to_expr.__getitem__(offset) - - def __setitem__(self, offset, value): - raise RuntimeError("Use write api to update keys") - - def __delitem__(self, offset): - assert 0 <= offset <= self._mask - return self._offset_to_expr.__delitem__(offset) - - def __iter__(self): - for offset, _ in viewitems(self._offset_to_expr): - yield offset - - def __len__(self): - return len(self._offset_to_expr) - - def __repr__(self): - out = [] - out.append("Base: %s" % self.base) - for offset, (index, value) in sorted(viewitems(self._offset_to_expr)): - out.append("%16X %d %s" % (offset, index, value)) - return '\n'.join(out) - - def copy(self): - """Copy object instance""" - obj = MemArray(self.base, self.expr_simp) - obj._offset_to_expr = self._offset_to_expr.copy() - return obj - - @staticmethod - def offset_to_ptr(base, offset): - """ - Return an expression representing the @base + @offset - @base: symbolic base address - @offset: relative offset integer to the @base address - """ - if base.is_id(INTERNAL_INTBASE_NAME): - ptr = ExprInt(offset, base.size) - elif offset == 0: - ptr = base - else: - ptr = base + ExprInt(offset, base.size) - return ptr.canonize() - - def read(self, offset, size): - """ - Return memory at @offset with @size as an Expr list - @offset: integer (in bytes) - @size: integer (in bits), byte aligned - - Consider the following state: - - 0x10: (EBX, 0) - - 0x11: (EBX, 1) - - 0x12: (EBX, 2) - - 0x13: (EBX, 3) - - A read at 0x10 of 32 bits should return: EBX - """ - - assert size % 8 == 0 - # Parts is (Expr's offset, size, Expr) - parts = [] - for index in range(size // 8): - # Wrap read: - # @32[EAX+0xFFFFFFFF] is ok and will read at 0xFFFFFFFF, 0, 1, 2 - request_offset = (offset + index) & self._mask - if request_offset in self._offset_to_expr: - # Known memory portion - off, data = self._offset_to_expr[request_offset] - parts.append((off, 1, data)) - continue - - # Unknown memory portion - ptr = self.offset_to_ptr(self.base, request_offset) - data = ExprMem(ptr, 8) - parts.append((0, 1, data)) - - # Group similar datas - # XXX TODO: only little endian here - index = 0 - while index + 1 < len(parts): - off_a, size_a, data_a = parts[index] - off_b, size_b, data_b = parts[index+1] - if data_a == data_b and off_a + size_a == off_b: - # Read consecutive bytes of a variable - # [(0, 8, x), (1, 8, x)] => (0, 16, x) - parts[index:index+2] = [(off_a, size_a + size_b, data_a)] - continue - if data_a.is_int() and data_b.is_int(): - # Read integer parts - # [(0, 8, 0x11223344), (1, 8, 0x55667788)] => (0, 16, 0x7744) - int1 = self.expr_simp(data_a[off_a*8:(off_a+size_a)*8]) - int2 = self.expr_simp(data_b[off_b*8:(off_b+size_b)*8]) - assert int1.is_int() and int2.is_int() - int1, int2 = int(int1), int(int2) - result = ExprInt((int2 << (size_a * 8)) | int1, (size_a + size_b) * 8) - parts[index:index+2] = [(0, size_a + size_b, result)] - continue - if data_a.is_mem() and data_b.is_mem(): - # Read consecutive bytes of a memory variable - ptr_base_a, ptr_offset_a = get_expr_base_offset(data_a.ptr) - ptr_base_b, ptr_offset_b = get_expr_base_offset(data_b.ptr) - if ptr_base_a != ptr_base_b: - index += 1 - continue - if (ptr_offset_a + off_a + size_a) & self._mask == (ptr_offset_b + off_b) & self._mask: - assert size_a <= data_a.size // 8 - off_a - assert size_b <= data_b.size // 8 - off_b - # Successive comparable symbolic pointers - # [(0, 8, @8[ptr]), (0, 8, @8[ptr+1])] => (0, 16, @16[ptr]) - ptr = self.offset_to_ptr(ptr_base_a, (ptr_offset_a + off_a) & self._mask) - - data = ExprMem(ptr, (size_a + size_b) * 8) - parts[index:index+2] = [(0, size_a + size_b, data)] - - continue - - index += 1 - - # Slice datas - read_mem = [] - for off, bytesize, data in parts: - if data.size // 8 != bytesize: - data = data[off * 8: (off + bytesize) * 8] - read_mem.append(data) - - return read_mem - - def write(self, offset, expr): - """ - Write @expr at @offset - @offset: integer (in bytes) - @expr: Expr instance value - """ - assert expr.size % 8 == 0 - assert offset <= self._mask - for index in range(expr.size // 8): - # Wrap write: - # @32[EAX+0xFFFFFFFF] is ok and will write at 0xFFFFFFFF, 0, 1, 2 - request_offset = (offset + index) & self._mask - # XXX TODO: only little endian here - self._offset_to_expr[request_offset] = (index, expr) - - tmp = self.expr_simp(expr[index * 8: (index + 1) * 8]) - # Special case: Simplify slice of pointer (simplification is ok - # here, as we won't store the simplified expression) - if tmp.is_slice() and tmp.arg.is_mem() and tmp.start % 8 == 0: - new_ptr = self.expr_simp( - tmp.arg.ptr + ExprInt(tmp.start // 8, tmp.arg.ptr.size) - ) - tmp = ExprMem(new_ptr, tmp.stop - tmp.start) - # Test if write to original value - if tmp.is_mem(): - src_ptr, src_off = get_expr_base_offset(tmp.ptr) - if src_ptr == self.base and src_off == request_offset: - del self._offset_to_expr[request_offset] - - - def _get_variable_parts(self, index, known_offsets, forward=True): - """ - Find consecutive memory parts representing the same variable. The part - starts at offset known_offsets[@index] and search is in offset direction - determined by @forward - Return the number of consecutive parts of the same variable. - - @index: index of the memory offset in known_offsets - @known_offsets: sorted offsets - @forward: Search in offset growing direction if True, else in reverse - order - """ - - offset = known_offsets[index] - value_byte_index, value = self._offset_to_expr[offset] - assert value.size % 8 == 0 - - if forward: - start, end, step = value_byte_index + 1, value.size // 8, 1 - else: - start, end, step = value_byte_index - 1, -1, -1 - - partnum = 1 - for value_offset in range(start, end, step): - offset += step - # Check if next part is in known_offsets - next_index = index + step * partnum - if not 0 <= next_index < len(known_offsets): - break - - offset_next = known_offsets[next_index] - if offset_next != offset: - break - - # Check if next part is a part of the searched value - byte_index, value_next = self._offset_to_expr[offset_next] - if byte_index != value_offset: - break - if value != value_next: - break - partnum += 1 - - return partnum - - - def _build_value_at_offset(self, value, offset, start, length): - """ - Return a couple. The first element is the memory Expression representing - the value at @offset, the second is its value. The value is truncated - at byte @start with @length - - @value: Expression to truncate - @offset: offset in bytes of the variable (integer) - @start: value's byte offset (integer) - @length: length in bytes (integer) - """ - - ptr = self.offset_to_ptr(self.base, offset) - size = length * 8 - if start == 0 and size == value.size: - result = value - else: - result = self.expr_simp(value[start * 8: start * 8 + size]) - - return ExprMem(ptr, size), result - - - def memory(self): - """ - Iterate on stored memory/values - - The goal here is to group entities. - - Consider the following state: - EAX + 0x10 = (0, EDX) - EAX + 0x11 = (1, EDX) - EAX + 0x12 = (2, EDX) - EAX + 0x13 = (3, EDX) - - The function should return: - @32[EAX + 0x10] = EDX - """ - - if not self._offset_to_expr: - return - known_offsets = sorted(self._offset_to_expr) - index = 0 - # Test if the first element is the continuation of the last byte. If - # yes, merge and output it first. - min_int = 0 - max_int = (1 << self.base.size) - 1 - limit_index = len(known_offsets) - - first_element = None - # Special case where a variable spreads on max_int/min_int - if known_offsets[0] == min_int and known_offsets[-1] == max_int: - min_offset, max_offset = known_offsets[0], known_offsets[-1] - min_byte_index, min_value = self._offset_to_expr[min_offset] - max_byte_index, max_value = self._offset_to_expr[max_offset] - if min_value == max_value and max_byte_index + 1 == min_byte_index: - # Look for current variable start - partnum_before = self._get_variable_parts(len(known_offsets) - 1, known_offsets, False) - # Look for current variable end - partnum_after = self._get_variable_parts(0, known_offsets) - - partnum = partnum_before + partnum_after - offset = known_offsets[-partnum_before] - index_value, value = self._offset_to_expr[offset] - - mem, result = self._build_value_at_offset(value, offset, index_value, partnum) - first_element = mem, result - index = partnum_after - limit_index = len(known_offsets) - partnum_before - - # Special cases are done, walk and merge variables - while index < limit_index: - offset = known_offsets[index] - index_value, value = self._offset_to_expr[offset] - partnum = self._get_variable_parts(index, known_offsets) - mem, result = self._build_value_at_offset(value, offset, index_value, partnum) - yield mem, result - index += partnum - - if first_element is not None: - yield first_element - - def dump(self): - """Display MemArray content""" - for mem, value in self.memory(): - print("%s = %s" % (mem, value)) - - -class MemSparse(object): - """Link a symbolic memory pointer to its MemArray. - - For each symbolic memory object, this object will extract the memory pointer - *ptr*. It then splits *ptr* into a symbolic and an integer part. For - example, the memory @[ESP+4] will give ESP+4 for *ptr*. *ptr* is then split - into its base ESP and its offset 4. Each symbolic base address uses a - different MemArray. - - Example: - - @32[EAX+EBX] - - @8[EAX+EBX+0x100] - Will be stored in the same MemArray with a EAX+EBX base - - """ - - def __init__(self, addrsize, expr_simp=expr_simp_explicit): - """ - @addrsize: size (in bits) of the addresses manipulated by the MemSparse - @expr_simp: an ExpressionSimplifier instance - """ - self.addrsize = addrsize - self.expr_simp = expr_simp - self.base_to_memarray = {} - - def __contains__(self, expr): - """ - Return True if the whole @expr is present - For partial check, use 'contains_partial' - """ - if not expr.is_mem(): - return False - ptr = expr.ptr - base, offset = get_expr_base_offset(ptr) - memarray = self.base_to_memarray.get(base, None) - if memarray is None: - return False - for i in range(expr.size // 8): - if offset + i not in memarray: - return False - return True - - def contains_partial(self, expr): - """ - Return True if a part of @expr is present in memory - """ - if not expr.is_mem(): - return False - ptr = expr.ptr - base, offset = get_expr_base_offset(ptr) - memarray = self.base_to_memarray.get(base, None) - if memarray is None: - return False - for i in range(expr.size // 8): - if offset + i in memarray: - return True - return False - - def clear(self): - """Reset the current object content""" - self.base_to_memarray.clear() - - def copy(self): - """Copy the current object instance""" - base_to_memarray = {} - for base, memarray in viewitems(self.base_to_memarray): - base_to_memarray[base] = memarray.copy() - obj = MemSparse(self.addrsize, self.expr_simp) - obj.base_to_memarray = base_to_memarray - return obj - - def __delitem__(self, expr): - """ - Delete a value @expr *fully* present in memory - For partial delete, use delete_partial - """ - ptr = expr.ptr - base, offset = get_expr_base_offset(ptr) - memarray = self.base_to_memarray.get(base, None) - if memarray is None: - raise KeyError - # Check if whole entity is in the MemArray before deleting it - for i in range(expr.size // 8): - if (offset + i) & memarray.mask not in memarray: - raise KeyError - for i in range(expr.size // 8): - del memarray[(offset + i) & memarray.mask] - - def delete_partial(self, expr): - """ - Delete @expr from memory. Skip parts of @expr which are not present in - memory. - """ - ptr = expr.ptr - base, offset = get_expr_base_offset(ptr) - memarray = self.base_to_memarray.get(base, None) - if memarray is None: - raise KeyError - # Check if whole entity is in the MemArray before deleting it - for i in range(expr.size // 8): - real_offset = (offset + i) & memarray.mask - if real_offset in memarray: - del memarray[real_offset] - - def read(self, ptr, size): - """ - Return the value associated with the Expr at address @ptr - @ptr: Expr representing the memory address - @size: memory size (in bits), byte aligned - """ - assert size % 8 == 0 - base, offset = get_expr_base_offset(ptr) - memarray = self.base_to_memarray.get(base, None) - if memarray is not None: - mems = memarray.read(offset, size) - ret = ExprCompose(*mems) - else: - ret = ExprMem(ptr, size) - return ret - - def write(self, ptr, expr): - """ - Update the corresponding Expr @expr at address @ptr - @ptr: Expr representing the memory address - @expr: Expr instance - """ - assert ptr.size == self.addrsize - base, offset = get_expr_base_offset(ptr) - memarray = self.base_to_memarray.get(base, None) - if memarray is None: - memarray = MemArray(base, self.expr_simp) - self.base_to_memarray[base] = memarray - memarray.write(offset, expr) - - def iteritems(self): - """Iterate on stored memory variables and their values.""" - for _, memarray in viewitems(self.base_to_memarray): - for mem, value in memarray.memory(): - yield mem, value - - def items(self): - """Return stored memory variables and their values.""" - return list(self.iteritems()) - - def dump(self): - """Display MemSparse content""" - for mem, value in viewitems(self): - print("%s = %s" % (mem, value)) - - def __repr__(self): - out = [] - for _, memarray in sorted(viewitems(self.base_to_memarray)): - out.append(repr(memarray)) - return '\n'.join(out) - - -class SymbolMngr(object): - """Symbolic store manager (IDs and MEMs)""" - - def __init__(self, init=None, addrsize=None, expr_simp=expr_simp_explicit): - assert addrsize is not None - if init is None: - init = {} - self.addrsize = addrsize - self.expr_simp = expr_simp - self.symbols_id = {} - self.symbols_mem = MemSparse(addrsize, expr_simp) - self.mask = (1 << addrsize) - 1 - for expr, value in viewitems(init): - self.write(expr, value) - - def __contains__(self, expr): - if expr.is_id(): - return self.symbols_id.__contains__(expr) - if expr.is_mem(): - return self.symbols_mem.__contains__(expr) - return False - - def __getitem__(self, expr): - return self.read(expr) - - def __setitem__(self, expr, value): - self.write(expr, value) - - def __delitem__(self, expr): - if expr.is_id(): - del self.symbols_id[expr] - elif expr.is_mem(): - del self.symbols_mem[expr] - else: - raise TypeError("Bad source expr") - - def copy(self): - """Copy object instance""" - obj = SymbolMngr(self, addrsize=self.addrsize, expr_simp=self.expr_simp) - return obj - - def clear(self): - """Forget every variables values""" - self.symbols_id.clear() - self.symbols_mem.clear() - - def read(self, src): - """ - Return the value corresponding to Expr @src - @src: ExprId or ExprMem instance - """ - if src.is_id(): - return self.symbols_id.get(src, src) - elif src.is_mem(): - # Only byte aligned accesses are supported for now - assert src.size % 8 == 0 - return self.symbols_mem.read(src.ptr, src.size) - else: - raise TypeError("Bad source expr") - - def write(self, dst, src): - """ - Update @dst with @src expression - @dst: ExprId or ExprMem instance - @src: Expression instance - """ - assert dst.size == src.size - if dst.is_id(): - if dst == src: - if dst in self.symbols_id: - del self.symbols_id[dst] - else: - self.symbols_id[dst] = src - elif dst.is_mem(): - # Only byte aligned accesses are supported for now - assert dst.size % 8 == 0 - self.symbols_mem.write(dst.ptr, src) - else: - raise TypeError("Bad destination expr") - - def dump(self, ids=True, mems=True): - """Display memory content""" - if ids: - for variable, value in self.ids(): - print('%s = %s' % (variable, value)) - if mems: - for mem, value in self.memory(): - print('%s = %s' % (mem, value)) - - def __repr__(self): - out = [] - for variable, value in viewitems(self): - out.append('%s = %s' % (variable, value)) - return "\n".join(out) - - def iteritems(self): - """ExprId/ExprMem iteritems of the current state""" - for variable, value in self.ids(): - yield variable, value - for variable, value in self.memory(): - yield variable, value - - def items(self): - """Return variables/values of the current state""" - return list(self.iteritems()) - - def __iter__(self): - for expr, _ in self.iteritems(): - yield expr - - def ids(self): - """Iterate on variables and their values.""" - for expr, value in viewitems(self.symbols_id): - yield expr, value - - def memory(self): - """Iterate on memory variables and their values.""" - for mem, value in viewitems(self.symbols_mem): - yield mem, value - - def keys(self): - """Variables of the current state""" - return list(self) - - -def merge_ptr_read(known, ptrs): - """ - Merge common memory parts in a multiple byte memory. - @ptrs: memory bytes list - @known: ptrs' associated boolean for present/unpresent memory part in the - store - """ - assert known - out = [] - known.append(None) - ptrs.append(None) - last, value, size = known[0], ptrs[0], 8 - for index, part in enumerate(known[1:], 1): - if part == last: - size += 8 - else: - out.append((last, value, size)) - last, value, size = part, ptrs[index], 8 - return out - - -class SymbolicExecutionEngine(object): - """ - Symbolic execution engine - Allow IR code emulation in symbolic domain - - - Examples: - from miasm2.ir.symbexec import SymbolicExecutionEngine - from miasm2.ir.ir import AssignBlock - - ir_arch = ir_x86_32() - - init_state = { - ir_arch.arch.regs.EAX: ir_arch.arch.regs.EBX, - ExprMem(id_x+ExprInt(0x10, 32), 32): id_a, - } - - sb_exec = SymbolicExecutionEngine(ir_arch, init_state) - - >>> sb_exec.dump() - EAX = a - @32[x + 0x10] = a - >>> sb_exec.dump(mems=False) - EAX = a - - >>> print sb_exec.eval_expr(ir_arch.arch.regs.EAX + ir_arch.arch.regs.ECX) - EBX + ECX - - Inspecting state: - - dump - - modified - State manipulation: - - '.state' (rw) - - Evaluation (read only): - - eval_expr - - eval_assignblk - Evaluation with state update: - - eval_updt_expr - - eval_updt_assignblk - - eval_updt_irblock - - Start a symbolic execution based on provisioned '.ir_arch' blocks: - - run_block_at - - run_at - """ - - StateEngine = SymbolicState - - def __init__(self, ir_arch, state=None, - sb_expr_simp=expr_simp_explicit): - - self.expr_to_visitor = { - ExprInt: self.eval_exprint, - ExprId: self.eval_exprid, - ExprLoc: self.eval_exprloc, - ExprMem: self.eval_exprmem, - ExprSlice: self.eval_exprslice, - ExprCond: self.eval_exprcond, - ExprOp: self.eval_exprop, - ExprCompose: self.eval_exprcompose, - } - - if state is None: - state = {} - - self.symbols = SymbolMngr(addrsize=ir_arch.addrsize, expr_simp=sb_expr_simp) - - for dst, src in viewitems(state): - self.symbols.write(dst, src) - - self.ir_arch = ir_arch - self.expr_simp = sb_expr_simp - - def get_state(self): - """Return the current state of the SymbolicEngine""" - state = self.StateEngine(dict(self.symbols)) - return state - - def set_state(self, state): - """Restaure the @state of the engine - @state: StateEngine instance - """ - self.symbols = SymbolMngr(addrsize=self.ir_arch.addrsize, expr_simp=self.expr_simp) - for dst, src in viewitems(dict(state)): - self.symbols[dst] = src - - state = property(get_state, set_state) - - def eval_expr_visitor(self, expr, cache=None): - """ - [DEV]: Override to change the behavior of an Expr evaluation. - This function recursively applies 'eval_expr*' to @expr. - This function uses @cache to speedup re-evaluation of expression. - """ - if cache is None: - cache = {} - - ret = cache.get(expr, None) - if ret is not None: - return ret - - new_expr = self.expr_simp(expr) - ret = cache.get(new_expr, None) - if ret is not None: - return ret - - func = self.expr_to_visitor.get(new_expr.__class__, None) - if func is None: - raise TypeError("Unknown expr type") - - ret = func(new_expr, cache=cache) - ret = self.expr_simp(ret) - assert ret is not None - - cache[expr] = ret - cache[new_expr] = ret - return ret - - def eval_exprint(self, expr, **kwargs): - """[DEV]: Evaluate an ExprInt using the current state""" - return expr - - def eval_exprid(self, expr, **kwargs): - """[DEV]: Evaluate an ExprId using the current state""" - ret = self.symbols.read(expr) - return ret - - def eval_exprloc(self, expr, **kwargs): - """[DEV]: Evaluate an ExprLoc using the current state""" - offset = self.ir_arch.loc_db.get_location_offset(expr.loc_key) - if offset is not None: - ret = ExprInt(offset, expr.size) - else: - ret = expr - return ret - - def eval_exprmem(self, expr, **kwargs): - """[DEV]: Evaluate an ExprMem using the current state - This function first evaluate the memory pointer value. - Override 'mem_read' to modify the effective memory accesses - """ - ptr = self.eval_expr_visitor(expr.ptr, **kwargs) - mem = ExprMem(ptr, expr.size) - ret = self.mem_read(mem) - return ret - - def eval_exprcond(self, expr, **kwargs): - """[DEV]: Evaluate an ExprCond using the current state""" - cond = self.eval_expr_visitor(expr.cond, **kwargs) - src1 = self.eval_expr_visitor(expr.src1, **kwargs) - src2 = self.eval_expr_visitor(expr.src2, **kwargs) - ret = ExprCond(cond, src1, src2) - return ret - - def eval_exprslice(self, expr, **kwargs): - """[DEV]: Evaluate an ExprSlice using the current state""" - arg = self.eval_expr_visitor(expr.arg, **kwargs) - ret = ExprSlice(arg, expr.start, expr.stop) - return ret - - def eval_exprop(self, expr, **kwargs): - """[DEV]: Evaluate an ExprOp using the current state""" - args = [] - for oarg in expr.args: - arg = self.eval_expr_visitor(oarg, **kwargs) - args.append(arg) - ret = ExprOp(expr.op, *args) - return ret - - def eval_exprcompose(self, expr, **kwargs): - """[DEV]: Evaluate an ExprCompose using the current state""" - args = [] - for arg in expr.args: - args.append(self.eval_expr_visitor(arg, **kwargs)) - ret = ExprCompose(*args) - return ret - - def eval_expr(self, expr, eval_cache=None): - """ - Evaluate @expr - @expr: Expression instance to evaluate - @cache: None or dictionary linking variables to their values - """ - if eval_cache is None: - eval_cache = {} - ret = self.eval_expr_visitor(expr, cache=eval_cache) - assert ret is not None - return ret - - def modified(self, init_state=None, ids=True, mems=True): - """ - Return the modified variables. - @init_state: a base dictionary linking variables to their initial values - to diff. Can be None. - @ids: track ids only - @mems: track mems only - """ - if init_state is None: - init_state = {} - if ids: - for variable, value in viewitems(self.symbols.symbols_id): - if variable in init_state and init_state[variable] == value: - continue - yield variable, value - if mems: - for mem, value in self.symbols.memory(): - if mem in init_state and init_state[mem] == value: - continue - yield mem, value - - def dump(self, ids=True, mems=True): - """ - Display modififed variables - @ids: display modified ids - @mems: display modified memory - """ - - for variable, value in self.modified(None, ids, mems): - print("%-18s" % variable, "=", "%s" % value) - - def eval_assignblk(self, assignblk): - """ - Evaluate AssignBlock using the current state - - Returns a dictionary containing modified keys associated to their values - - @assignblk: AssignBlock instance - """ - pool_out = {} - eval_cache = {} - for dst, src in viewitems(assignblk): - src = self.eval_expr(src, eval_cache) - if dst.is_mem(): - ptr = self.eval_expr(dst.ptr, eval_cache) - # Test if mem lookup is known - tmp = ExprMem(ptr, dst.size) - pool_out[tmp] = src - elif dst.is_id(): - pool_out[dst] = src - else: - raise ValueError("Unknown destination type", str(dst)) - - return pool_out - - def apply_change(self, dst, src): - """ - Apply @dst = @src on the current state WITHOUT evaluating both side - @dst: Expr, destination - @src: Expr, source - """ - if dst.is_mem(): - self.mem_write(dst, src) - else: - self.symbols.write(dst, src) - - def eval_updt_assignblk(self, assignblk): - """ - Apply an AssignBlock on the current state - @assignblk: AssignBlock instance - """ - mem_dst = [] - dst_src = self.eval_assignblk(assignblk) - for dst, src in viewitems(dst_src): - self.apply_change(dst, src) - if dst.is_mem(): - mem_dst.append(dst) - return mem_dst - - def eval_updt_irblock(self, irb, step=False): - """ - Symbolic execution of the @irb on the current state - @irb: irbloc instance - @step: display intermediate steps - """ - for assignblk in irb: - if step: - print('Instr', assignblk.instr) - print('Assignblk:') - print(assignblk) - print('_' * 80) - self.eval_updt_assignblk(assignblk) - if step: - self.dump(mems=False) - self.dump(ids=False) - print('_' * 80) - dst = self.eval_expr(self.ir_arch.IRDst) - - return dst - - def run_block_at(self, ircfg, addr, step=False): - """ - Symbolic execution of the block at @addr - @addr: address to execute (int or ExprInt or label) - @step: display intermediate steps - """ - irblock = ircfg.get_block(addr) - if irblock is not None: - addr = self.eval_updt_irblock(irblock, step=step) - return addr - - def run_at(self, ircfg, addr, lbl_stop=None, step=False): - """ - Symbolic execution starting at @addr - @addr: address to execute (int or ExprInt or label) - @lbl_stop: LocKey to stop execution on - @step: display intermediate steps - """ - while True: - irblock = ircfg.get_block(addr) - if irblock is None: - break - if irblock.loc_key == lbl_stop: - break - addr = self.eval_updt_irblock(irblock, step=step) - return addr - - def del_mem_above_stack(self, stack_ptr): - """ - Remove all stored memory values with following properties: - * pointer based on initial stack value - * pointer below current stack pointer - """ - stack_ptr = self.eval_expr(stack_ptr) - base, stk_offset = get_expr_base_offset(stack_ptr) - memarray = self.symbols.symbols_mem.base_to_memarray.get(base, None) - if memarray: - to_del = set() - for offset in memarray: - if ((offset - stk_offset) & int(stack_ptr.mask)) >> (stack_ptr.size - 1) != 0: - to_del.add(offset) - - for offset in to_del: - del memarray[offset] - - def eval_updt_expr(self, expr): - """ - Evaluate @expr and apply side effect if needed (ie. if expr is an - assignment). Return the evaluated value - """ - - # Update value if needed - if expr.is_aff(): - ret = self.eval_expr(expr.src) - self.eval_updt_assignblk(AssignBlock([expr])) - else: - ret = self.eval_expr(expr) - - return ret - - def mem_read(self, expr): - """ - [DEV]: Override to modify the effective memory reads - - Read symbolic value at ExprMem @expr - @expr: ExprMem - """ - return self.symbols.read(expr) - - def mem_write(self, dst, src): - """ - [DEV]: Override to modify the effective memory writes - - Write symbolic value @src at ExprMem @dst - @dst: destination ExprMem - @src: source Expression - """ - self.symbols.write(dst, src) diff --git a/miasm2/ir/symbexec_top.py b/miasm2/ir/symbexec_top.py deleted file mode 100644 index a1a255f8..00000000 --- a/miasm2/ir/symbexec_top.py +++ /dev/null @@ -1,221 +0,0 @@ -from future.utils import viewitems - -from miasm2.ir.symbexec import SymbolicExecutionEngine, StateEngine -from miasm2.expression.simplifications import expr_simp -from miasm2.expression.expression import ExprId, ExprInt, ExprSlice,\ - ExprMem, ExprCond, ExprCompose, ExprOp - - -TOPSTR = "TOP" - -def exprid_top(expr): - """Return a TOP expression (ExprId("TOP") of size @expr.size - @expr: expression to replace with TOP - """ - return ExprId(TOPSTR, expr.size) - - -class SymbolicStateTop(StateEngine): - - def __init__(self, dct, regstop): - self._symbols = frozenset(viewitems(dct)) - self._regstop = frozenset(regstop) - - def __hash__(self): - return hash((self.__class__, self._symbols, self._regstop)) - - def __str__(self): - out = [] - for dst, src in sorted(self._symbols): - out.append("%s = %s" % (dst, src)) - for dst in self._regstop: - out.append('TOP %s' %dst) - return "\n".join(out) - - def __eq__(self, other): - if self is other: - return True - if self.__class__ != other.__class__: - return False - return (self.symbols == other.symbols and - self.regstop == other.regstop) - - def __ne__(self, other): - return not self.__eq__(other) - - def __iter__(self): - for dst, src in self._symbols: - yield dst, src - - def merge(self, other): - """Merge two symbolic states - Only equal expressions are kept in both states - @other: second symbolic state - """ - symb_a = self.symbols - symb_b = other.symbols - intersection = set(symb_a).intersection(symb_b) - diff = set(symb_a).union(symb_b).difference(intersection) - symbols = {} - regstop = set() - for dst in diff: - if dst.is_id(): - regstop.add(dst) - for dst in intersection: - if symb_a[dst] == symb_b[dst]: - symbols[dst] = symb_a[dst] - else: - regstop.add(dst) - return self.__class__(symbols, regstop) - - @property - def symbols(self): - """Return the dictionary of known symbols""" - return dict(self._symbols) - - @property - def regstop(self): - """Return the set of expression with TOP values""" - return self._regstop - -class SymbExecTopNoMem(SymbolicExecutionEngine): - """ - Symbolic execution, include TOP value. - ExprMem are not propagated. - Any computation involving a TOP will generate TOP. - """ - - StateEngine = SymbolicStateTop - - def __init__(self, ir_arch, state, regstop, - sb_expr_simp=expr_simp): - known_symbols = dict(state) - super(SymbExecTopNoMem, self).__init__(ir_arch, known_symbols, - sb_expr_simp) - self.regstop = set(regstop) - - def get_state(self): - """Return the current state of the SymbolicEngine""" - return self.StateEngine(self.symbols, self.regstop) - - def eval_expr(self, expr, eval_cache=None): - if expr in self.regstop: - return exprid_top(expr) - if eval_cache is None: - eval_cache = {} - ret = self.apply_expr_on_state_visit_cache(expr, self.symbols, eval_cache) - return ret - - def manage_mem(self, expr, state, cache, level): - ptr = self.apply_expr_on_state_visit_cache(expr.arg, state, cache, level+1) - ret = ExprMem(ptr, expr.size) - ret = self.get_mem_state(ret) - if ret.is_mem() and not ret.arg.is_int() and ret.arg == ptr: - ret = exprid_top(expr) - assert expr.size == ret.size - return ret - - - def eval_exprid(self, expr, **kwargs): - """[DEV]: Evaluate an ExprId using the current state""" - if expr in self.regstop: - ret = exprid_top(expr) - else: - ret = self.symbols.read(expr) - return ret - - def eval_exprloc(self, expr, **kwargs): - offset = self.ir_arch.loc_db.get_location_offset(expr.loc_key) - if offset is not None: - ret = ExprInt(offset, expr.size) - else: - ret = expr - return ret - - def eval_exprcond(self, expr, **kwargs): - """[DEV]: Evaluate an ExprCond using the current state""" - cond = self.eval_expr_visitor(expr.cond, **kwargs) - src1 = self.eval_expr_visitor(expr.src1, **kwargs) - src2 = self.eval_expr_visitor(expr.src2, **kwargs) - if cond.is_id(TOPSTR) or src1.is_id(TOPSTR) or src2.is_id(TOPSTR): - ret = exprid_top(expr) - else: - ret = ExprCond(cond, src1, src2) - return ret - - def eval_exprslice(self, expr, **kwargs): - """[DEV]: Evaluate an ExprSlice using the current state""" - arg = self.eval_expr_visitor(expr.arg, **kwargs) - if arg.is_id(TOPSTR): - ret = exprid_top(expr) - else: - ret = ExprSlice(arg, expr.start, expr.stop) - return ret - - def eval_exprop(self, expr, **kwargs): - """[DEV]: Evaluate an ExprOp using the current state""" - args = [] - for oarg in expr.args: - arg = self.eval_expr_visitor(oarg, **kwargs) - if arg.is_id(TOPSTR): - return exprid_top(expr) - args.append(arg) - ret = ExprOp(expr.op, *args) - return ret - - def eval_exprcompose(self, expr, **kwargs): - """[DEV]: Evaluate an ExprCompose using the current state""" - args = [] - for arg in expr.args: - arg = self.eval_expr_visitor(arg, **kwargs) - if arg.is_id(TOPSTR): - return exprid_top(expr) - args.append(arg) - ret = ExprCompose(*args) - return ret - - def apply_change(self, dst, src): - eval_cache = {} - if dst.is_mem(): - # If Write to TOP, forget all memory information - ret = self.eval_expr(dst.arg, eval_cache) - if ret.is_id(TOPSTR): - to_del = set() - for dst_tmp in self.symbols: - if dst_tmp.is_mem(): - to_del.add(dst_tmp) - for dst_to_del in to_del: - del self.symbols[dst_to_del] - return - src_o = self.expr_simp(src) - - # Force update. Ex: - # EBX += 1 (state: EBX = EBX+1) - # EBX -= 1 (state: EBX = EBX, must be updated) - if dst in self.regstop: - self.regstop.discard(dst) - self.symbols[dst] = src_o - - if dst == src_o: - # Avoid useless X = X information - del self.symbols[dst] - - if src_o.is_id(TOPSTR): - if dst in self.symbols: - del self.symbols[dst] - self.regstop.add(dst) - -class SymbExecTop(SymbExecTopNoMem): - """ - Symbolic execution, include TOP value. - ExprMem are propagated. - Any computation involving a TOP will generate TOP. - WARNING: avoid memory aliases here! - """ - - def manage_mem(self, expr, state, cache, level): - ptr = self.apply_expr_on_state_visit_cache(expr.arg, state, cache, level+1) - ret = ExprMem(ptr, expr.size) - ret = self.get_mem_state(ret) - assert expr.size == ret.size - return ret diff --git a/miasm2/ir/symbexec_types.py b/miasm2/ir/symbexec_types.py deleted file mode 100644 index 57b7580a..00000000 --- a/miasm2/ir/symbexec_types.py +++ /dev/null @@ -1,131 +0,0 @@ -from __future__ import print_function - -from future.utils import viewitems - -from miasm2.ir.symbexec import SymbolicExecutionEngine, StateEngine -from miasm2.expression.simplifications import expr_simp -from miasm2.expression.expression import ExprId, ExprMem - - -class SymbolicStateCTypes(StateEngine): - """Store C types of symbols""" - - def __init__(self, symbols): - tmp = {} - for expr, types in viewitems(symbols): - tmp[expr] = frozenset(types) - self._symbols = frozenset(viewitems(tmp)) - - def __hash__(self): - return hash((self.__class__, self._symbols)) - - def __str__(self): - out = [] - for dst, src in sorted(self._symbols): - out.append("%s = %s" % (dst, src)) - return "\n".join(out) - - def __eq__(self, other): - if self is other: - return True - if self.__class__ != other.__class__: - return False - return self.symbols == other.symbols - - def __ne__(self, other): - return not self.__eq__(other) - - def __iter__(self): - for dst, src in self._symbols: - yield dst, src - - def merge(self, other): - """Merge two symbolic states - The resulting types are the union of types of both states. - @other: second symbolic state - """ - symb_a = self.symbols - symb_b = other.symbols - symbols = {} - for expr in set(symb_a).union(set(symb_b)): - ctypes = symb_a.get(expr, set()).union(symb_b.get(expr, set())) - if ctypes: - symbols[expr] = ctypes - return self.__class__(symbols) - - @property - def symbols(self): - """Return the dictionary of known symbols'types""" - return dict(self._symbols) - - -class SymbExecCType(SymbolicExecutionEngine): - """Engine of C types propagation - WARNING: avoid memory aliases here! - """ - - StateEngine = SymbolicStateCTypes - OBJC_INTERNAL = "___OBJC___" - - def __init__(self, ir_arch, - symbols, - chandler, - sb_expr_simp=expr_simp): - self.chandler = chandler - - super(SymbExecCType, self).__init__(ir_arch, - {}, - sb_expr_simp) - self.symbols = dict(symbols) - - def get_state(self): - """Return the current state of the SymbolicEngine""" - return self.StateEngine(self.symbols) - - def eval_assignblk(self, assignblk): - """ - Evaluate AssignBlock on the current state - @assignblk: AssignBlock instance - """ - pool_out = {} - for dst, src in viewitems(assignblk): - objcs = self.chandler.expr_to_types(src, self.symbols) - if isinstance(dst, ExprMem): - continue - elif isinstance(dst, ExprId): - pool_out[dst] = frozenset(objcs) - else: - raise ValueError("Unsupported assignment", str(dst)) - return pool_out - - def eval_expr(self, expr, eval_cache=None): - return frozenset(self.chandler.expr_to_types(expr, self.symbols)) - - def apply_change(self, dst, src): - if src is None: - if dst in self.symbols: - del self.symbols[dst] - else: - self.symbols[dst] = src - - def del_mem_above_stack(self, stack_ptr): - """No stack deletion""" - return - - def dump_id(self): - """ - Dump modififed registers symbols only - """ - for expr, expr_types in sorted(viewitems(self.symbols)): - if not expr.is_mem(): - print(expr) - for expr_type in expr_types: - print('\t', expr_type) - - def dump_mem(self): - """ - Dump modififed memory symbols - """ - for expr, value in sorted(viewitems(self.symbols)): - if expr.is_mem(): - print(expr, value) diff --git a/miasm2/ir/translators/C.py b/miasm2/ir/translators/C.py deleted file mode 100644 index e44e859f..00000000 --- a/miasm2/ir/translators/C.py +++ /dev/null @@ -1,528 +0,0 @@ -from miasm2.ir.translators.translator import Translator -from miasm2.expression.modint import size2mask -from miasm2.expression.expression import ExprInt, ExprCond, ExprCompose, \ - TOK_EQUAL, \ - TOK_INF_SIGNED, TOK_INF_UNSIGNED, \ - TOK_INF_EQUAL_SIGNED, TOK_INF_EQUAL_UNSIGNED - -def int_size_to_bn(value, size): - if size < 32: - int_str = "%.8x" % value - size_nibble = 8 - else: - # size must be multiple of 4 - size = ((size + 31) // 32) * 32 - size_nibble = size // 4 - fmt_str = "%%.%dx" % size_nibble - int_str = fmt_str % value - assert len(int_str) == size_nibble - return int_str, size_nibble - - -TOK_CMP_TO_NATIVE_C = { - TOK_EQUAL: "==", - TOK_INF_SIGNED: "<", - TOK_INF_UNSIGNED: "<", - TOK_INF_EQUAL_SIGNED: "<=", - TOK_INF_EQUAL_UNSIGNED: "<=", -} - -TOK_CMP_TO_BIGNUM_C = { - TOK_EQUAL: "equal", - TOK_INF_SIGNED: "inf_signed", - TOK_INF_UNSIGNED: "inf_unsigned", - TOK_INF_EQUAL_SIGNED: "inf_equal_signed", - TOK_INF_EQUAL_UNSIGNED: "inf_equal_unsigned", -} - - -class TranslatorC(Translator): - "Translate a Miasm expression to an equivalent C code" - - # Implemented language - __LANG__ = "C" - - # Operations translation - dct_shift = {'a>>': "right_arith", - '>>': "right_logic", - '<<': "left_logic", - } - dct_rot = {'<<<': 'rot_left', - '>>>': 'rot_right', - } - - NATIVE_INT_MAX_SIZE = 64 - - def __init__(self, loc_db=None, **kwargs): - """Instance a C translator - @loc_db: LocationDB instance - """ - super(TranslatorC, self).__init__(**kwargs) - # symbol pool - self.loc_db = loc_db - - def _size2mask(self, size): - """Return a C string corresponding to the size2mask operation, with support for - @size <= 64""" - assert size <= 64 - mask = size2mask(size) - return "0x%x" % mask - - def from_ExprId(self, expr): - return str(expr) - - def from_ExprInt(self, expr): - if expr.size <= self.NATIVE_INT_MAX_SIZE: - assert expr.size <= 64 - out = "0x%x" % int(expr) - if expr.size == 64: - out += "ULL" - return out - value, int_size = int_size_to_bn(int(expr), expr.size) - return 'bignum_from_string("%s", %d)' % (value, int_size) - - def from_ExprLoc(self, expr): - loc_key = expr.loc_key - if self.loc_db is None: - return str(loc_key) - offset = self.loc_db.get_location_offset(loc_key) - if offset is None: - return str(loc_key) - - if expr.size <= self.NATIVE_INT_MAX_SIZE: - return "0x%x" % offset - - value, int_size = int_size_to_bn(offset, 64) - return 'bignum_from_string("%s", %d)' % (value, int_size) - - def from_ExprAssign(self, expr): - new_dst = self.from_expr(expr.dst) - new_src = self.from_expr(expr.src) - return "%s = %s" % (new_dst, new_src) - - def from_ExprCond(self, expr): - cond = self.from_expr(expr.cond) - src1 = self.from_expr(expr.src1) - src2 = self.from_expr(expr.src2) - if not expr.cond.size <= self.NATIVE_INT_MAX_SIZE: - cond = "(!bignum_is_zero(%s))" % cond - out = "(%s?%s:%s)" % (cond, src1, src2) - return out - - def from_ExprMem(self, expr): - ptr = expr.ptr - if ptr.size <= self.NATIVE_INT_MAX_SIZE: - new_ptr = self.from_expr(ptr) - if expr.size <= self.NATIVE_INT_MAX_SIZE: - # Native ptr, Native Mem - return "MEM_LOOKUP_%.2d(jitcpu, %s)" % (expr.size, new_ptr) - else: - # Native ptr, BN mem - return "MEM_LOOKUP_INT_BN(jitcpu, %d, %s)" % (expr.size, new_ptr) - # BN ptr - new_ptr = self.from_expr(ptr) - - if expr.size <= self.NATIVE_INT_MAX_SIZE: - # BN ptr, Native Mem - return "MEM_LOOKUP_BN_INT(jitcpu, %d, %s)" % (expr.size, new_ptr) - else: - # BN ptr, BN mem - return "MEM_LOOKUP_BN_BN(jitcpu, %d, %s)" % (expr.size, new_ptr) - - def from_ExprOp(self, expr): - if len(expr.args) == 1: - if expr.op == 'parity': - arg = expr.args[0] - out = self.from_expr(arg) - if arg.size <= self.NATIVE_INT_MAX_SIZE: - out = "(%s&%s)" % (out, self._size2mask(arg.size)) - else: - out = 'bignum_mask(%s, 8)' % (out, 8) - out = 'bignum_to_uint64(%s)' % out - out = 'parity(%s)' % out - return out - - elif expr.op.startswith("zeroExt_"): - arg = expr.args[0] - if expr.size == arg.size: - return arg - return self.from_expr(ExprCompose(arg, ExprInt(0, expr.size - arg.size))) - - elif expr.op.startswith("signExt_"): - arg = expr.args[0] - if expr.size == arg.size: - return arg - add_size = expr.size - arg.size - new_expr = ExprCompose( - arg, - ExprCond( - arg.msb(), - ExprInt(size2mask(add_size), add_size), - ExprInt(0, add_size) - ) - ) - return self.from_expr(new_expr) - - - elif expr.op in ['cntleadzeros', 'cnttrailzeros']: - arg = expr.args[0] - out = self.from_expr(arg) - if arg.size <= self.NATIVE_INT_MAX_SIZE: - out = "%s(0x%x, %s)" % (expr.op, expr.args[0].size, out) - else: - out = "bignum_%s(%s, %d)" % (expr.op, out, arg.size) - return out - - elif expr.op == '!': - arg = expr.args[0] - out = self.from_expr(arg) - if expr.size <= self.NATIVE_INT_MAX_SIZE: - out = "(~ %s)&%s" % (out, self._size2mask(arg.size)) - else: - out = "bignum_not(%s)" % out - out = "bignum_mask(%s, expr.size)" % out - return out - - elif expr.op in [ - "ftan", "frndint", "f2xm1", "fsin", "fsqrt", "fabs", "fcos", - "fchs", - ]: - return "fpu_%s%d(%s)" % ( - expr.op, - expr.size, - self.from_expr(expr.args[0]), - ) - elif (expr.op.startswith("access_") or - expr.op.startswith("load_") or - expr.op.startswith("fxam_c")): - arg = expr.args[0] - out = self.from_expr(arg) - out = "%s(%s)" % (expr.op, out) - return out - - elif expr.op == "-": - arg = expr.args[0] - out = self.from_expr(arg) - if arg.size <= self.NATIVE_INT_MAX_SIZE: - out = "(%s(%s))" % (expr.op, out) - out = "(%s&%s)" % (out, self._size2mask(arg.size)) - else: - out = "bignum_sub(bignum_from_uint64(0), %s)" % out - out = "bignum_mask(%s, %d)"% (out, expr.size) - return out - - elif expr.op.startswith("fpround_"): - return "%s_fp%d(%s)" % ( - expr.op, - expr.size, - self.from_expr(expr.args[0]), - ) - elif expr.op == "sint_to_fp": - size = expr.size - arg = expr.args[0] - if size not in [32, 64]: - raise RuntimeError( - "Unsupported size for sint_to_fp: %r" % size - ) - return "%s_%d(%s)" % (expr.op, size, self.from_expr(arg)) - elif expr.op.startswith("fp_to_sint"): - dest_size = expr.size - arg_size = expr.args[0].size - if (arg_size, dest_size) in [ - (32, 32), (64, 64), (64, 32), - ]: - func = "fp%d_to_sint%d" % (arg_size, dest_size) - else: - raise RuntimeError( - "Unsupported size for fp_to_sint: %r to %r" % ( - arg_size, - dest_size - )) - return "%s(%s)" % (func, self.from_expr(expr.args[0])) - elif expr.op.startswith("fpconvert_fp"): - dest_size = expr.size - arg_size = expr.args[0].size - if (arg_size, dest_size) in [ - (32, 64), (64, 32) - ]: - func = "fp%d_to_fp%d" % (arg_size, dest_size) - else: - raise RuntimeError( - "Unsupported size for fpconvert: %r to %r" % (arg_size, - dest_size) - ) - return "%s(%s)" % (func, self.from_expr(expr.args[0])) - else: - raise NotImplementedError('Unknown op: %r' % expr.op) - - elif len(expr.args) == 2: - if expr.op in self.dct_shift: - arg0 = self.from_expr(expr.args[0]) - arg1 = self.from_expr(expr.args[1]) - if expr.size <= self.NATIVE_INT_MAX_SIZE: - out = 'SHIFT_%s(%d, %s, %s)' % ( - self.dct_shift[expr.op].upper(), - expr.args[0].size, - arg0, - arg1 - ) - else: - op = { - "<<": "lshift", - ">>": "rshift", - "a>>": "a_rshift" - } - out = "bignum_%s(%s, bignum_to_uint64(%s))" % ( - op[expr.op], arg0, arg1 - ) - out = "bignum_mask(%s, %d)"% (out, expr.size) - return out - - elif expr.is_associative(): - args = [self.from_expr(arg) - for arg in expr.args] - if expr.size <= self.NATIVE_INT_MAX_SIZE: - out = (" %s " % expr.op).join(args) - out = "((%s)&%s)" % (out, self._size2mask(expr.size)) - else: - op_to_bn_func = { - "+": "add", - "*": "mul", - "|": "or", - "^": "xor", - "&": "and", - } - args = list(expr.args) - out = self.from_expr(args.pop()) - while args: - out = 'bignum_mask(bignum_%s(%s, %s), %d)' % ( - op_to_bn_func[expr.op], - out, - self.from_expr(args.pop()), - expr.size - ) - return out - - elif expr.op in ['-']: - return '(((%s&%s) %s (%s&%s))&%s)' % ( - self.from_expr(expr.args[0]), - self._size2mask(expr.args[0].size), - str(expr.op), - self.from_expr(expr.args[1]), - self._size2mask(expr.args[1].size), - self._size2mask(expr.args[0].size) - ) - elif expr.op in self.dct_rot: - arg0 = self.from_expr(expr.args[0]) - arg1 = self.from_expr(expr.args[1]) - if expr.size <= self.NATIVE_INT_MAX_SIZE: - out = '(%s(%s, %s, %s) &%s)' % ( - self.dct_rot[expr.op], - expr.args[0].size, - arg0, - arg1, - self._size2mask(expr.args[0].size), - ) - else: - op = { - ">>>": "ror", - "<<<": "rol" - } - out = "bignum_%s(%s, %d, bignum_to_uint64(%s))" % ( - op[expr.op], arg0, expr.size, arg1 - ) - out = "bignum_mask(%s, %d)"% (out, expr.size) - return out - - elif expr.op == 'x86_cpuid': - return "%s(%s, %s)" % (expr.op, - self.from_expr(expr.args[0]), - self.from_expr(expr.args[1])) - elif expr.op.startswith("fcom"): - arg0 = self.from_expr(expr.args[0]) - arg1 = self.from_expr(expr.args[1]) - if not expr.args[0].size <= self.NATIVE_INT_MAX_SIZE: - raise ValueError("Bad semantic: fpu do operations do not support such size") - out = "fpu_%s(%s, %s)" % (expr.op, arg0, arg1) - return out - - elif expr.op in ["fadd", "fsub", "fdiv", 'fmul', "fscale", - "fprem", "fyl2x", "fpatan"]: - arg0 = self.from_expr(expr.args[0]) - arg1 = self.from_expr(expr.args[1]) - if not expr.args[0].size <= self.NATIVE_INT_MAX_SIZE: - raise ValueError("Bad semantic: fpu do operations do not support such size") - out = "fpu_%s%d(%s, %s)" % (expr.op, expr.size, arg0, arg1) - return out - - elif expr.op == "segm": - return "segm2addr(jitcpu, %s, %s)" % ( - self.from_expr(expr.args[0]), - self.from_expr(expr.args[1]) - ) - - elif expr.op in ['udiv', 'umod']: - arg0 = self.from_expr(expr.args[0]) - arg1 = self.from_expr(expr.args[1]) - - if expr.size <= self.NATIVE_INT_MAX_SIZE: - out = '%s%d(%s, %s)' % ( - expr.op, - expr.args[0].size, - arg0, - arg1 - ) - else: - out = "bignum_%s(%s, %s)" % ( - expr.op, - arg0, - arg1 - ) - out = "bignum_mask(%s, %d)"% (out, expr.size) - return out - - - - elif expr.op in ['sdiv', 'smod']: - arg0 = self.from_expr(expr.args[0]) - arg1 = self.from_expr(expr.args[1]) - - if expr.size <= self.NATIVE_INT_MAX_SIZE: - out = '%s%d(%s, %s)' % ( - expr.op, - expr.args[0].size, - arg0, - arg1 - ) - else: - out = "bignum_%s(%s, %s, %d)" % ( - expr.op, - arg0, - arg1, - expr.size - ) - out = "bignum_mask(%s, %d)"% (out, expr.size) - return out - - elif expr.op in ["bcdadd", "bcdadd_cf"]: - return "%s_%d(%s, %s)" % ( - expr.op, expr.args[0].size, - self.from_expr(expr.args[0]), - self.from_expr(expr.args[1]) - ) - - - elif expr.op in [ - TOK_EQUAL, - TOK_INF_SIGNED, - TOK_INF_UNSIGNED, - TOK_INF_EQUAL_SIGNED, - TOK_INF_EQUAL_UNSIGNED, - ]: - arg0 = self.from_expr(expr.args[0]) - arg1 = self.from_expr(expr.args[1]) - - if expr.size <= self.NATIVE_INT_MAX_SIZE: - op = TOK_CMP_TO_NATIVE_C[expr.op] - if expr.op in [TOK_INF_SIGNED, TOK_INF_EQUAL_SIGNED]: - cast = "(int%d_t)" % expr.args[0].size - else: - cast = "(uint%d_t)" % expr.args[0].size - out = '((%s%s %s %s%s)?1:0)' % ( - cast, - arg0, - op, - cast, - arg1 - ) - else: - op = TOK_CMP_TO_BIGNUM_C[expr.op] - out = "bignum_is_%s(%s, %s)" % ( - op, - arg0, - arg1 - ) - out = "bignum_mask(%s, %d)"% (out, expr.size) - return out - - - else: - raise NotImplementedError('Unknown op: %r' % expr.op) - - elif len(expr.args) >= 3 and expr.is_associative(): # ????? - oper = ['(%s&%s)' % ( - self.from_expr(arg), - self._size2mask(arg.size), - ) - for arg in expr.args] - oper = str(expr.op).join(oper) - return "((%s)&%s)" % ( - oper, - self._size2mask(expr.args[0].size) - ) - else: - raise NotImplementedError('Unknown op: %s' % expr.op) - - def from_ExprSlice(self, expr): - out = self.from_expr(expr.arg) - if expr.arg.size <= self.NATIVE_INT_MAX_SIZE: - # XXX check mask for 64 bit & 32 bit compat - out = "((%s>>%d) &%s)" % ( - out, expr.start, - self._size2mask(expr.stop - expr.start) - ) - else: - out = "bignum_rshift(%s, %d)" % (out, expr.start) - out = "bignum_mask(%s, %d)" % (out, expr.stop - expr.start) - - if expr.size <= self.NATIVE_INT_MAX_SIZE: - # Convert bignum to int - out = "bignum_to_uint64(%s)" % out - return out - - def from_ExprCompose(self, expr): - if expr.size <= self.NATIVE_INT_MAX_SIZE: - - out = [] - # XXX check mask for 64 bit & 32 bit compat - if expr.size in [8, 16, 32, 64, 128]: - size = expr.size - else: - # Uncommon expression size, use at least uint8 - size = max(expr.size, 8) - next_power = 1 - while next_power <= size: - next_power <<= 1 - size = next_power - - dst_cast = "uint%d_t" % size - for index, arg in expr.iter_args(): - out.append("(((%s)(%s & %s)) << %d)" % ( - dst_cast, - self.from_expr(arg), - self._size2mask(arg.size), - index) - ) - out = ' | '.join(out) - return '(' + out + ')' - else: - # Convert all parts to bignum - args = [] - for index, arg in expr.iter_args(): - arg_str = self.from_expr(arg) - if arg.size <= self.NATIVE_INT_MAX_SIZE: - arg_str = '((%s) & %s)' % (arg_str, self._size2mask(arg.size)) - arg_str = 'bignum_from_uint64(%s)' % arg_str - else: - arg_str = 'bignum_mask(%s, %d)' % (arg_str, arg.size) - arg_str = 'bignum_lshift(%s, %d)' % (arg_str, index) - args.append(arg_str) - out = args.pop() - while args: - arg = args.pop() - out = "bignum_or(%s, %s)" % (out, arg) - return out - - -# Register the class -Translator.register(TranslatorC) diff --git a/miasm2/ir/translators/__init__.py b/miasm2/ir/translators/__init__.py deleted file mode 100644 index d3678ffc..00000000 --- a/miasm2/ir/translators/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -"""IR Translators""" -from miasm2.ir.translators.translator import Translator -import miasm2.ir.translators.C -import miasm2.ir.translators.python -import miasm2.ir.translators.miasm -import miasm2.ir.translators.smt2 -try: - import miasm2.ir.translators.z3_ir -except ImportError: - # Nothing to do, z3 not available - pass - -__all__ = ["Translator"] diff --git a/miasm2/ir/translators/miasm.py b/miasm2/ir/translators/miasm.py deleted file mode 100644 index e93e9499..00000000 --- a/miasm2/ir/translators/miasm.py +++ /dev/null @@ -1,45 +0,0 @@ -from builtins import map -from miasm2.ir.translators.translator import Translator - - -class TranslatorMiasm(Translator): - "Translate a Miasm expression to its Python building form" - - __LANG__ = "Miasm" - - def from_ExprId(self, expr): - return "ExprId(%s, size=%d)" % (repr(expr.name), expr.size) - - def from_ExprInt(self, expr): - return "ExprInt(0x%x, %d)" % (int(expr), expr.size) - - def from_ExprCond(self, expr): - return "ExprCond(%s, %s, %s)" % (self.from_expr(expr.cond), - self.from_expr(expr.src1), - self.from_expr(expr.src2)) - - def from_ExprSlice(self, expr): - return "ExprSlice(%s, %d, %d)" % (self.from_expr(expr.arg), - expr.start, - expr.stop) - - def from_ExprOp(self, expr): - return "ExprOp(%s, %s)" % ( - repr(expr.op), - ", ".join(map(self.from_expr, expr.args)) - ) - - def from_ExprCompose(self, expr): - args = ["%s" % self.from_expr(arg) for arg in expr.args] - return "ExprCompose(%s)" % ", ".join(args) - - def from_ExprAssign(self, expr): - return "ExprAssign(%s, %s)" % (self.from_expr(expr.dst), - self.from_expr(expr.src)) - - def from_ExprMem(self, expr): - return "ExprMem(%s, size=%d)" % (self.from_expr(expr.ptr), expr.size) - - -# Register the class -Translator.register(TranslatorMiasm) diff --git a/miasm2/ir/translators/python.py b/miasm2/ir/translators/python.py deleted file mode 100644 index 4b1b4b52..00000000 --- a/miasm2/ir/translators/python.py +++ /dev/null @@ -1,98 +0,0 @@ -from builtins import map -from miasm2.expression.expression import ExprInt -from miasm2.ir.translators.translator import Translator - - -class TranslatorPython(Translator): - """Translate a Miasm expression to an equivalent Python code - - Memory is abstracted using the unimplemented function: - int memory(int address, int size) - """ - - # Implemented language - __LANG__ = "Python" - # Operations translation - op_no_translate = ["+", "-", "/", "%", ">>", "<<", "&", "^", "|", "*"] - - def from_ExprInt(self, expr): - return str(expr) - - def from_ExprId(self, expr): - return str(expr) - - def from_ExprLoc(self, expr): - return str(expr) - - def from_ExprMem(self, expr): - return "memory(%s, 0x%x)" % ( - self.from_expr(expr.ptr), - expr.size // 8 - ) - - def from_ExprSlice(self, expr): - out = self.from_expr(expr.arg) - if expr.start != 0: - out = "(%s >> %d)" % (out, expr.start) - return "(%s & 0x%x)" % (out, (1 << (expr.stop - expr.start)) - 1) - - def from_ExprCompose(self, expr): - out = [] - for index, arg in expr.iter_args(): - out.append( - "((%s & 0x%x) << %d)" % ( - self.from_expr(arg), - (1 << arg.size) - 1, - index - ) - ) - return "(%s)" % ' | '.join(out) - - def from_ExprCond(self, expr): - return "(%s if (%s) else %s)" % ( - self.from_expr(expr.src1), - self.from_expr(expr.cond), - self.from_expr(expr.src2) - ) - - def from_ExprOp(self, expr): - if expr.op in self.op_no_translate: - args = list(map(self.from_expr, expr.args)) - if len(expr.args) == 1: - return "((%s %s) & 0x%x)" % ( - expr.op, - args[0], - (1 << expr.size) - 1 - ) - else: - return "((%s) & 0x%x)" % ( - (" %s " % expr.op).join(args), - (1 << expr.size) - 1 - ) - elif expr.op == "parity": - return "(%s & 0x1)" % self.from_expr(expr.args[0]) - - elif expr.op in ["<<<", ">>>"]: - amount_raw = expr.args[1] - amount = expr.args[1] % ExprInt(amount_raw.size, expr.size) - amount_inv = ExprInt(expr.size, expr.size) - amount - if expr.op == "<<<": - amount, amount_inv = amount_inv, amount - part1 = "(%s >> %s)"% (self.from_expr(expr.args[0]), - self.from_expr(amount)) - part2 = "(%s << %s)"% (self.from_expr(expr.args[0]), - self.from_expr(amount_inv)) - - return "((%s | %s) &0x%x)" % (part1, part2, int(expr.mask)) - - raise NotImplementedError("Unknown operator: %s" % expr.op) - - def from_ExprAssign(self, expr): - return "%s = %s" % ( - self.from_expr(expr.dst), - self.from_expr(expr.src) - ) - - -# Register the class -Translator.register(TranslatorPython) diff --git a/miasm2/ir/translators/smt2.py b/miasm2/ir/translators/smt2.py deleted file mode 100644 index 7b619457..00000000 --- a/miasm2/ir/translators/smt2.py +++ /dev/null @@ -1,326 +0,0 @@ -from builtins import map -from builtins import range -import logging - -from miasm2.ir.translators.translator import Translator -from miasm2.expression.smt2_helper import * - -log = logging.getLogger("translator_smt2") -console_handler = logging.StreamHandler() -console_handler.setFormatter(logging.Formatter("%(levelname)-5s: %(message)s")) -log.addHandler(console_handler) -log.setLevel(logging.WARNING) - -class SMT2Mem(object): - """ - Memory abstraction for TranslatorSMT2. Memory elements are only accessed, - never written. To give a concrete value for a given memory cell in a solver, - add "mem32.get(address, size) == <value>" constraints to your equation. - The endianness of memory accesses is handled accordingly to the "endianness" - attribute. - Note: Will have one memory space for each addressing size used. - For example, if memory is accessed via 32 bits values and 16 bits values, - these access will not occur in the same address space. - - Adapted from Z3Mem - """ - - def __init__(self, endianness="<", name="mem"): - """Initializes an SMT2Mem object with a given @name and @endianness. - @endianness: Endianness of memory representation. '<' for little endian, - '>' for big endian. - @name: name of memory Arrays generated. They will be named - name+str(address size) (for example mem32, mem16...). - """ - if endianness not in ['<', '>']: - raise ValueError("Endianness should be '>' (big) or '<' (little)") - self.endianness = endianness - self.mems = {} # Address size -> SMT2 memory array - self.name = name - # initialise address size - self.addr_size = 0 - - def get_mem_array(self, size): - """Returns an SMT Array used internally to represent memory for addresses - of size @size. - @size: integer, size in bit of addresses in the memory to get. - Return an string with the name of the SMT array.. - """ - try: - mem = self.mems[size] - except KeyError: - # Lazy instantiation - self.mems[size] = self.name + str(size) - mem = self.mems[size] - return mem - - def __getitem__(self, addr): - """One byte memory access. Different address sizes with the same value - will result in different memory accesses. - @addr: an SMT2 expression, the address to read. - Return an SMT2 expression of size 8 bits representing a memory access. - """ - size = self.addr_size - mem = self.get_mem_array(size) - return array_select(mem, addr) - - def get(self, addr, size, addr_size): - """ Memory access at address @addr of size @size with - address size @addr_size. - @addr: an SMT2 expression, the address to read. - @size: int, size of the read in bits. - @addr_size: int, size of the address - Return a SMT2 expression representing a memory access. - """ - # set address size per read access - self.addr_size = addr_size - - original_size = size - if original_size % 8 != 0: - # Size not aligned on 8bits -> read more than size and extract after - size = ((original_size // 8) + 1) * 8 - res = self[addr] - if self.is_little_endian(): - for i in range(1, size // 8): - index = bvadd(addr, bit_vec_val(i, addr_size)) - res = bv_concat(self[index], res) - else: - for i in range(1, size // 8): - res = bv_concat(res, self[index]) - if size == original_size: - return res - else: - # Size not aligned, extract right sized result - return bv_extract(original_size-1, 0, res) - - def is_little_endian(self): - """True if this memory is little endian.""" - return self.endianness == "<" - - def is_big_endian(self): - """True if this memory is big endian.""" - return not self.is_little_endian() - - -class TranslatorSMT2(Translator): - """Translate a Miasm expression into an equivalent SMT2 - expression. Memory is abstracted via SMT2Mem. - The result of from_expr will be an SMT2 expression. - - If you want to interact with the memory abstraction after the translation, - you can instantiate your own SMT2Mem that will be equivalent to the one - used by TranslatorSMT2. - - TranslatorSMT2 provides the creation of a valid SMT2 file. For this, - it keeps track of the translated bit vectors. - - Adapted from TranslatorZ3 - """ - - # Implemented language - __LANG__ = "smt2" - - def __init__(self, endianness="<", loc_db=None, **kwargs): - """Instance a SMT2 translator - @endianness: (optional) memory endianness - """ - super(TranslatorSMT2, self).__init__(**kwargs) - # memory abstraction - self._mem = SMT2Mem(endianness) - # map of translated bit vectors - self._bitvectors = dict() - # symbol pool - self.loc_db = loc_db - - def from_ExprInt(self, expr): - return bit_vec_val(expr.arg.arg, expr.size) - - def from_ExprId(self, expr): - if str(expr) not in self._bitvectors: - self._bitvectors[str(expr)] = expr.size - return str(expr) - - def from_ExprLoc(self, expr): - loc_key = expr.loc_key - if self.loc_db is None or self.loc_db.get_location_offset(loc_key) is None: - if str(loc_key) not in self._bitvectors: - self._bitvectors[str(loc_key)] = expr.size - return str(loc_key) - - offset = self.loc_db.get_location_offset(loc_key) - return bit_vec_val(str(offset), expr.size) - - def from_ExprMem(self, expr): - addr = self.from_expr(expr.ptr) - # size to read from memory - size = expr.size - # size of memory address - addr_size = expr.ptr.size - return self._mem.get(addr, size, addr_size) - - def from_ExprSlice(self, expr): - res = self.from_expr(expr.arg) - res = bv_extract(expr.stop-1, expr.start, res) - return res - - def from_ExprCompose(self, expr): - res = None - for arg in expr.args: - e = bv_extract(arg.size-1, 0, self.from_expr(arg)) - if res: - res = bv_concat(e, res) - else: - res = e - return res - - def from_ExprCond(self, expr): - cond = self.from_expr(expr.cond) - src1 = self.from_expr(expr.src1) - src2 = self.from_expr(expr.src2) - - # (and (distinct cond (_ bv0 <size>)) true) - zero = bit_vec_val(0, expr.cond.size) - distinct = smt2_distinct(cond, zero) - distinct_and = smt2_and(distinct, "true") - - # (ite ((and (distinct cond (_ bv0 <size>)) true) src1 src2)) - return smt2_ite(distinct_and, src1, src2) - - def from_ExprOp(self, expr): - args = list(map(self.from_expr, expr.args)) - res = args[0] - - if len(args) > 1: - for arg in args[1:]: - if expr.op == "+": - res = bvadd(res, arg) - elif expr.op == "-": - res = bvsub(res, arg) - elif expr.op == "*": - res = bvmul(res, arg) - elif expr.op == "/": - res = bvsdiv(res, arg) - elif expr.op == "sdiv": - res = bvsdiv(res, arg) - elif expr.op == "udiv": - res = bvudiv(res, arg) - elif expr.op == "%": - res = bvsmod(res, arg) - elif expr.op == "smod": - res = bvsmod(res, arg) - elif expr.op == "umod": - res = bvurem(res, arg) - elif expr.op == "&": - res = bvand(res, arg) - elif expr.op == "^": - res = bvxor(res, arg) - elif expr.op == "|": - res = bvor(res, arg) - elif expr.op == "<<": - res = bvshl(res, arg) - elif expr.op == ">>": - res = bvlshr(res, arg) - elif expr.op == "a>>": - res = bvashr(res, arg) - elif expr.op == "<<<": - res = bv_rotate_left(res, arg, expr.size) - elif expr.op == ">>>": - res = bv_rotate_right(res, arg, expr.size) - else: - raise NotImplementedError("Unsupported OP yet: %s" % expr.op) - elif expr.op == 'parity': - arg = bv_extract(7, 0, res) - res = bit_vec_val(1, 1) - for i in range(8): - res = bvxor(res, bv_extract(i, i, arg)) - elif expr.op == '-': - res = bvneg(res) - elif expr.op == "cnttrailzeros": - src = res - size = expr.size - size_smt2 = bit_vec_val(size, size) - one_smt2 = bit_vec_val(1, size) - zero_smt2 = bit_vec_val(0, size) - # src & (1 << (size - 1)) - op = bvand(src, bvshl(one_smt2, bvsub(size_smt2, one_smt2))) - # op != 0 - cond = smt2_distinct(op, zero_smt2) - # ite(cond, size - 1, src) - res = smt2_ite(cond, bvsub(size_smt2, one_smt2), src) - for i in range(size - 2, -1, -1): - # smt2 expression of i - i_smt2 = bit_vec_val(i, size) - # src & (1 << i) - op = bvand(src, bvshl(one_smt2, i_smt2)) - # op != 0 - cond = smt2_distinct(op, zero_smt2) - # ite(cond, i, res) - res = smt2_ite(cond, i_smt2, res) - elif expr.op == "cntleadzeros": - src = res - size = expr.size - one_smt2 = bit_vec_val(1, size) - zero_smt2 = bit_vec_val(0, size) - # (src & 1) != 0 - cond = smt2_distinct(bvand(src, one_smt2), zero_smt2) - # ite(cond, 0, src) - res= smt2_ite(cond, zero_smt2, src) - for i in range(size - 1, 0, -1): - index = - i % size - index_smt2 = bit_vec_val(index, size) - # src & (1 << index) - op = bvand(src, bvshl(one_smt2, index_smt2)) - # op != 0 - cond = smt2_distinct(op, zero_smt2) - # ite(cond, index, res) - value_smt2 = bit_vec_val(size - (index + 1), size) - res = smt2_ite(cond, value_smt2, res) - else: - raise NotImplementedError("Unsupported OP yet: %s" % expr.op) - - return res - - def from_ExprAssign(self, expr): - src = self.from_expr(expr.src) - dst = self.from_expr(expr.dst) - return smt2_assert(smt2_eq(src, dst)) - - def to_smt2(self, exprs, logic="QF_ABV", model=False): - """ - Converts a valid SMT2 file for a given list of - SMT2 expressions. - - :param exprs: list of SMT2 expressions - :param logic: SMT2 logic - :param model: model generation flag - :return: String of the SMT2 file - """ - ret = "" - ret += "(set-logic {})\n".format(logic) - - # define bit vectors - for bv in self._bitvectors: - size = self._bitvectors[bv] - ret += "{}\n".format(declare_bv(bv, size)) - - # define memory arrays - for size in self._mem.mems: - mem = self._mem.mems[size] - ret += "{}\n".format(declare_array(mem, bit_vec(size), bit_vec(8))) - - # merge SMT2 expressions - for expr in exprs: - ret += expr + "\n" - - # define action - ret += "(check-sat)\n" - - # enable model generation - if model: - ret += "(get-model)\n" - - return ret - - -# Register the class -Translator.register(TranslatorSMT2) diff --git a/miasm2/ir/translators/translator.py b/miasm2/ir/translators/translator.py deleted file mode 100644 index 65875072..00000000 --- a/miasm2/ir/translators/translator.py +++ /dev/null @@ -1,127 +0,0 @@ -from future.utils import viewitems - -import miasm2.expression.expression as m2_expr -from miasm2.core.utils import BoundedDict - - -class Translator(object): - "Abstract parent class for translators." - - # Registered translators - available_translators = [] - # Implemented language - __LANG__ = "" - - @classmethod - def register(cls, translator): - """Register a translator - @translator: Translator sub-class - """ - cls.available_translators.append(translator) - - @classmethod - def to_language(cls, target_lang, *args, **kwargs): - """Return the corresponding translator instance - @target_lang: str (case insensitive) wanted language - Raise a NotImplementedError in case of unmatched language - """ - target_lang = target_lang.lower() - for translator in cls.available_translators: - if translator.__LANG__.lower() == target_lang: - return translator(*args, **kwargs) - - raise NotImplementedError("Unknown target language: %s" % target_lang) - - @classmethod - def available_languages(cls): - "Return the list of registered languages" - return [translator.__LANG__ for translator in cls.available_translators] - - def __init__(self, cache_size=1000): - """Instance a translator - @cache_size: (optional) Expr cache size - """ - self._cache = BoundedDict(cache_size) - - def from_ExprInt(self, expr): - """Translate an ExprInt - @expr: ExprInt to translate - """ - raise NotImplementedError("Abstract method") - - def from_ExprId(self, expr): - """Translate an ExprId - @expr: ExprId to translate - """ - raise NotImplementedError("Abstract method") - - def from_ExprLoc(self, expr): - """Translate an ExprLoc - @expr: ExprLoc to translate - """ - raise NotImplementedError("Abstract method") - - def from_ExprCompose(self, expr): - """Translate an ExprCompose - @expr: ExprCompose to translate - """ - raise NotImplementedError("Abstract method") - - def from_ExprSlice(self, expr): - """Translate an ExprSlice - @expr: ExprSlice to translate - """ - raise NotImplementedError("Abstract method") - - def from_ExprOp(self, expr): - """Translate an ExprOp - @expr: ExprOp to translate - """ - raise NotImplementedError("Abstract method") - - def from_ExprMem(self, expr): - """Translate an ExprMem - @expr: ExprMem to translate - """ - raise NotImplementedError("Abstract method") - - def from_ExprAssign(self, expr): - """Translate an ExprAssign - @expr: ExprAssign to translate - """ - raise NotImplementedError("Abstract method") - - def from_ExprCond(self, expr): - """Translate an ExprCond - @expr: ExprCond to translate - """ - raise NotImplementedError("Abstract method") - - def from_expr(self, expr): - """Translate an expression according to its type - @expr: expression to translate - """ - # Use cache - if expr in self._cache: - return self._cache[expr] - - # Handle Expr type - handlers = { - m2_expr.ExprInt: self.from_ExprInt, - m2_expr.ExprId: self.from_ExprId, - m2_expr.ExprLoc: self.from_ExprLoc, - m2_expr.ExprCompose: self.from_ExprCompose, - m2_expr.ExprSlice: self.from_ExprSlice, - m2_expr.ExprOp: self.from_ExprOp, - m2_expr.ExprMem: self.from_ExprMem, - m2_expr.ExprAssign: self.from_ExprAssign, - m2_expr.ExprCond: self.from_ExprCond - } - for target, handler in viewitems(handlers): - if isinstance(expr, target): - ## Compute value and update the internal cache - ret = handler(expr) - self._cache[expr] = ret - return ret - raise ValueError("Unhandled type for %s" % expr) - diff --git a/miasm2/ir/translators/z3_ir.py b/miasm2/ir/translators/z3_ir.py deleted file mode 100644 index 902e72bd..00000000 --- a/miasm2/ir/translators/z3_ir.py +++ /dev/null @@ -1,281 +0,0 @@ -from builtins import map -from builtins import range -import imp -import logging - -# Raise an ImportError if z3 is not available WITHOUT actually importing it -imp.find_module("z3") - -from miasm2.ir.translators.translator import Translator - -log = logging.getLogger("translator_z3") -console_handler = logging.StreamHandler() -console_handler.setFormatter(logging.Formatter("%(levelname)-5s: %(message)s")) -log.addHandler(console_handler) -log.setLevel(logging.WARNING) - -class Z3Mem(object): - """Memory abstration for TranslatorZ3. Memory elements are only accessed, - never written. To give a concrete value for a given memory cell in a solver, - add "mem32.get(address, size) == <value>" constraints to your equation. - The endianness of memory accesses is handled accordingly to the "endianness" - attribute. - - Note: Will have one memory space for each addressing size used. - For example, if memory is accessed via 32 bits values and 16 bits values, - these access will not occur in the same address space. - """ - - def __init__(self, endianness="<", name="mem"): - """Initializes a Z3Mem object with a given @name and @endianness. - @endianness: Endianness of memory representation. '<' for little endian, - '>' for big endian. - @name: name of memory Arrays generated. They will be named - name+str(address size) (for example mem32, mem16...). - """ - # Import z3 only on demand - global z3 - import z3 - - if endianness not in ['<', '>']: - raise ValueError("Endianness should be '>' (big) or '<' (little)") - self.endianness = endianness - self.mems = {} # Address size -> memory z3.Array - self.name = name - - def get_mem_array(self, size): - """Returns a z3 Array used internally to represent memory for addresses - of size @size. - @size: integer, size in bit of addresses in the memory to get. - Return a z3 Array: BitVecSort(size) -> BitVecSort(8). - """ - try: - mem = self.mems[size] - except KeyError: - # Lazy instantiation - self.mems[size] = z3.Array(self.name + str(size), - z3.BitVecSort(size), - z3.BitVecSort(8)) - mem = self.mems[size] - return mem - - def __getitem__(self, addr): - """One byte memory access. Different address sizes with the same value - will result in different memory accesses. - @addr: a z3 BitVec, the address to read. - Return a z3 BitVec of size 8 bits representing a memory access. - """ - size = addr.size() - mem = self.get_mem_array(size) - return mem[addr] - - def get(self, addr, size): - """ Memory access at address @addr of size @size. - @addr: a z3 BitVec, the address to read. - @size: int, size of the read in bits. - Return a z3 BitVec of size @size representing a memory access. - """ - original_size = size - if original_size % 8 != 0: - # Size not aligned on 8bits -> read more than size and extract after - size = ((original_size // 8) + 1) * 8 - res = self[addr] - if self.is_little_endian(): - for i in range(1, size // 8): - res = z3.Concat(self[addr+i], res) - else: - for i in range(1, size //8): - res = z3.Concat(res, self[addr+i]) - if size == original_size: - return res - else: - # Size not aligned, extract right sized result - return z3.Extract(original_size-1, 0, res) - - def is_little_endian(self): - """True if this memory is little endian.""" - return self.endianness == "<" - - def is_big_endian(self): - """True if this memory is big endian.""" - return not self.is_little_endian() - - -class TranslatorZ3(Translator): - """Translate a Miasm expression to an equivalent z3 python binding - expression. Memory is abstracted via z3.Array (see Z3Mem). - The result of from_expr will be a z3 Expr. - - If you want to interact with the memory abstraction after the translation, - you can instantiate your own Z3Mem, that will be equivalent to the one - used by TranslatorZ3. - """ - - # Implemented language - __LANG__ = "z3" - # Operations translation - trivial_ops = ["+", "-", "/", "%", "&", "^", "|", "*", "<<"] - - def __init__(self, endianness="<", loc_db=None, **kwargs): - """Instance a Z3 translator - @endianness: (optional) memory endianness - """ - # Import z3 only on demand - global z3 - import z3 - - super(TranslatorZ3, self).__init__(**kwargs) - self._mem = Z3Mem(endianness) - self.loc_db = loc_db - - def from_ExprInt(self, expr): - return z3.BitVecVal(expr.arg.arg, expr.size) - - def from_ExprId(self, expr): - return z3.BitVec(str(expr), expr.size) - - def from_ExprLoc(self, expr): - if self.loc_db is None: - # No loc_db, fallback to default name - return z3.BitVec(str(expr), expr.size) - loc_key = expr.loc_key - offset = self.loc_db.get_location_offset(loc_key) - if offset is not None: - return z3.BitVecVal(offset, expr.size) - # fallback to default name - return z3.BitVec(str(loc_key), expr.size) - - def from_ExprMem(self, expr): - addr = self.from_expr(expr.ptr) - return self._mem.get(addr, expr.size) - - def from_ExprSlice(self, expr): - res = self.from_expr(expr.arg) - res = z3.Extract(expr.stop-1, expr.start, res) - return res - - def from_ExprCompose(self, expr): - res = None - for arg in expr.args: - e = z3.Extract(arg.size-1, 0, self.from_expr(arg)) - if res != None: - res = z3.Concat(e, res) - else: - res = e - return res - - def from_ExprCond(self, expr): - cond = self.from_expr(expr.cond) - src1 = self.from_expr(expr.src1) - src2 = self.from_expr(expr.src2) - return z3.If(cond != 0, src1, src2) - - def _abs(self, z3_value): - return z3.If(z3_value >= 0,z3_value,-z3_value) - - def _sdivC(self, num, den): - """Divide (signed) @num by @den (z3 values) as C would - See modint.__div__ for implementation choice - """ - result_sign = z3.If(num * den >= 0, - z3.BitVecVal(1, num.size()), - z3.BitVecVal(-1, num.size()), - ) - return z3.UDiv(self._abs(num), self._abs(den)) * result_sign - - def from_ExprOp(self, expr): - args = list(map(self.from_expr, expr.args)) - res = args[0] - - if len(args) > 1: - for arg in args[1:]: - if expr.op in self.trivial_ops: - res = eval("res %s arg" % expr.op) - elif expr.op == ">>": - res = z3.LShR(res, arg) - elif expr.op == "a>>": - res = res >> arg - elif expr.op == "<<<": - res = z3.RotateLeft(res, arg) - elif expr.op == ">>>": - res = z3.RotateRight(res, arg) - elif expr.op == "sdiv": - res = self._sdivC(res, arg) - elif expr.op == "udiv": - res = z3.UDiv(res, arg) - elif expr.op == "smod": - res = res - (arg * (self._sdivC(res, arg))) - elif expr.op == "umod": - res = z3.URem(res, arg) - elif expr.op == "==": - res = z3.If( - args[0] == args[1], - z3.BitVecVal(1, 1), - z3.BitVecVal(0, 1) - ) - elif expr.op == "<u": - res = z3.If( - z3.ULT(args[0], args[1]), - z3.BitVecVal(1, 1), - z3.BitVecVal(0, 1) - ) - elif expr.op == "<s": - res = z3.If( - args[0] < args[1], - z3.BitVecVal(1, 1), - z3.BitVecVal(0, 1) - ) - elif expr.op == "<=u": - res = z3.If( - z3.ULE(args[0], args[1]), - z3.BitVecVal(1, 1), - z3.BitVecVal(0, 1) - ) - elif expr.op == "<=s": - res = z3.If( - args[0] <= args[1], - z3.BitVecVal(1, 1), - z3.BitVecVal(0, 1) - ) - else: - raise NotImplementedError("Unsupported OP yet: %s" % expr.op) - elif expr.op == 'parity': - arg = z3.Extract(7, 0, res) - res = z3.BitVecVal(1, 1) - for i in range(8): - res = res ^ z3.Extract(i, i, arg) - elif expr.op == '-': - res = -res - elif expr.op == "cnttrailzeros": - size = expr.size - src = res - res = z3.If(src == 0, size, src) - for i in range(size - 1, -1, -1): - res = z3.If((src & (1 << i)) != 0, i, res) - elif expr.op == "cntleadzeros": - size = expr.size - src = res - res = z3.If(src == 0, size, src) - for i in range(size, 0, -1): - index = - i % size - out = size - (index + 1) - res = z3.If((src & (1 << index)) != 0, out, res) - elif expr.op.startswith("zeroExt"): - arg, = expr.args - res = z3.ZeroExt(expr.size - arg.size, self.from_expr(arg)) - elif expr.op.startswith("signExt"): - arg, = expr.args - res = z3.SignExt(expr.size - arg.size, self.from_expr(arg)) - else: - raise NotImplementedError("Unsupported OP yet: %s" % expr.op) - - return res - - def from_ExprAssign(self, expr): - src = self.from_expr(expr.src) - dst = self.from_expr(expr.dst) - return (src == dst) - - -# Register the class -Translator.register(TranslatorZ3) |