diff options
Diffstat (limited to 'miasm2/core/asmblock.py')
| -rw-r--r-- | miasm2/core/asmblock.py | 169 |
1 files changed, 88 insertions, 81 deletions
diff --git a/miasm2/core/asmblock.py b/miasm2/core/asmblock.py index 2829737e..1aa057bd 100644 --- a/miasm2/core/asmblock.py +++ b/miasm2/core/asmblock.py @@ -36,7 +36,7 @@ class AsmRaw(object): def __str__(self): return repr(self.raw) - def to_string(self, symbol_pool): + def to_string(self, loc_db): return str(self) @@ -68,13 +68,13 @@ class AsmConstraint(object): label = property(get_label, set_label) - def to_string(self, symbol_pool=None): - if symbol_pool is None: + def to_string(self, loc_db=None): + if loc_db is None: return "%s:%s" % (self.c_t, self.loc_key) else: return "%s:%s" % ( self.c_t, - symbol_pool.str_loc_key(self.loc_key) + loc_db.str_loc_key(self.loc_key) ) def __str__(self): @@ -137,22 +137,22 @@ class AsmBlock(object): label = property(get_label) - def to_string(self, symbol_pool=None): + def to_string(self, loc_db=None): out = [] - if symbol_pool is None: + if loc_db is None: out.append(str(self.loc_key)) else: - out.append(symbol_pool.str_loc_key(self.loc_key)) + out.append(loc_db.str_loc_key(self.loc_key)) for instr in self.lines: - out.append(instr.to_string(symbol_pool)) + out.append(instr.to_string(loc_db)) if self.bto: lbls = ["->"] for dst in self.bto: if dst is None: lbls.append("Unknown? ") else: - lbls.append(dst.to_string(symbol_pool) + " ") + lbls.append(dst.to_string(loc_db) + " ") lbls = '\t'.join(lbls) out.append(lbls) return '\n'.join(out) @@ -167,12 +167,12 @@ class AsmBlock(object): assert isinstance(self.bto, set) self.bto.add(c) - def split(self, symbol_pool, offset): - loc_key = symbol_pool.getby_offset_create(offset) + def split(self, loc_db, offset): + loc_key = loc_db.getby_offset_create(offset) log_asmblock.debug('split at %x', offset) i = -1 offsets = [x.offset for x in self.lines] - offset = symbol_pool.loc_key_to_offset(loc_key) + offset = loc_db.loc_key_to_offset(loc_key) if offset not in offsets: log_asmblock.warning( 'cannot split bloc at %X ' % offset + @@ -376,7 +376,7 @@ class AsmCFG(DiGraph): AsmCFGPending = namedtuple("AsmCFGPending", ["waiter", "constraint"]) - def __init__(self, symbol_pool=None, *args, **kwargs): + def __init__(self, loc_db=None, *args, **kwargs): super(AsmCFG, self).__init__(*args, **kwargs) # Edges -> constraint self.edges2constraint = {} @@ -384,13 +384,13 @@ class AsmCFG(DiGraph): self._pendings = {} # Loc_Key2block built on the fly self._loc_key_to_block = {} - # symbol_pool - self.symbol_pool = symbol_pool + # loc_db + self.loc_db = loc_db def copy(self): """Copy the current graph instance""" - graph = self.__class__(self.symbol_pool) + graph = self.__class__(self.loc_db) return graph + self @@ -537,10 +537,10 @@ class AsmCFG(DiGraph): def node2lines(self, node): - if self.symbol_pool is None: + if self.loc_db is None: loc_key_name = str(node) else: - loc_key_name = self.symbol_pool.str_loc_key(node) + loc_key_name = self.loc_db.str_loc_key(node) yield self.DotCellDescription(text=loc_key_name, attr={'align': 'center', 'colspan': 2, @@ -561,9 +561,9 @@ class AsmCFG(DiGraph): if self._dot_offset: yield [self.DotCellDescription(text="%.8X" % line.offset, attr={}), - self.DotCellDescription(text=line.to_string(self.symbol_pool), attr={})] + self.DotCellDescription(text=line.to_string(self.loc_db), attr={})] else: - yield self.DotCellDescription(text=line.to_string(self.symbol_pool), attr={}) + yield self.DotCellDescription(text=line.to_string(self.loc_db), attr={}) def node_attr(self, node): block = self._loc_key_to_block.get(node, None) @@ -762,13 +762,13 @@ class AsmCFG(DiGraph): block.max_size = size log_asmblock.info("size: %d max: %d", block.size, block.max_size) - def apply_splitting(self, symbol_pool, dis_block_callback=None, **kwargs): + def apply_splitting(self, loc_db, dis_block_callback=None, **kwargs): """Consider @self' bto destinations and split block in @self if one of these destinations jumps in the middle of this block. In order to work, they must be only one block in @self per loc_key in - @symbol_pool (which is true if @self come from the same disasmEngine). + @loc_db (which is true if @self come from the same disasmEngine). - @symbol_pool: AsmSymbolPool instance associated with @self'loc_keys + @loc_db: LocationDB instance associated with @self'loc_keys @dis_block_callback: (optional) if set, this callback will be called on new block destinations @kwargs: (optional) named arguments to pass to dis_block_callback @@ -777,7 +777,7 @@ class AsmCFG(DiGraph): # offset block_dst = [] for loc_key in self.pendings: - offset = symbol_pool.loc_key_to_offset(loc_key) + offset = loc_db.loc_key_to_offset(loc_key) if offset is not None: block_dst.append(offset) @@ -793,8 +793,9 @@ class AsmCFG(DiGraph): if not (off > range_start and off < range_stop): continue - # `cur_block` must be splitted at offset `off` - new_b = cur_block.split(symbol_pool, off) + # `cur_block` must be splitted at offset `off`from miasm2.core.locationdb import LocationDB + + new_b = cur_block.split(loc_db, off) log_asmblock.debug("Split block %x", off) if new_b is None: log_asmblock.error("Cannot split %x!!", off) @@ -811,12 +812,12 @@ class AsmCFG(DiGraph): # The new block destinations may need to be disassembled if dis_block_callback: offsets_to_dis = set( - self.symbol_pool.loc_key_to_offset(constraint.loc_key) + self.loc_db.loc_key_to_offset(constraint.loc_key) for constraint in new_b.bto ) dis_block_callback(cur_bloc=new_b, offsets_to_dis=offsets_to_dis, - symbol_pool=symbol_pool, **kwargs) + loc_db=loc_db, **kwargs) # Update structure rebuild_needed = True @@ -929,16 +930,16 @@ def fix_expr_val(expr, symbols): return result -def fix_loc_offset(symbol_pool, loc_key, offset, modified): +def fix_loc_offset(loc_db, loc_key, offset, modified): """ Fix the @loc_key offset to @offset. If the @offset has changed, add @loc_key to @modified - @symbol_pool: current symbol_pool + @loc_db: current loc_db """ - loc_offset = symbol_pool.loc_key_to_offset(loc_key) + loc_offset = loc_db.loc_key_to_offset(loc_key) if loc_offset == offset: return - symbol_pool.set_offset(loc_key, offset) + loc_db.set_offset(loc_key, offset) modified.add(loc_key) @@ -946,8 +947,8 @@ class BlockChain(object): """Manage blocks linked with an asm_constraint_next""" - def __init__(self, symbol_pool, blocks): - self.symbol_pool = symbol_pool + def __init__(self, loc_db, blocks): + self.loc_db = loc_db self.blocks = blocks self.place() @@ -960,7 +961,7 @@ class BlockChain(object): self.pinned_block_idx = None for i, block in enumerate(self.blocks): loc_key = block.loc_key - if self.symbol_pool.loc_key_to_offset(loc_key) is not None: + if self.loc_db.loc_key_to_offset(loc_key) is not None: if self.pinned_block_idx is not None: raise ValueError("Multiples pinned block detected") self.pinned_block_idx = i @@ -979,7 +980,7 @@ class BlockChain(object): return loc = self.blocks[self.pinned_block_idx].loc_key - offset_base = self.symbol_pool.loc_key_to_offset(loc) + offset_base = self.loc_db.loc_key_to_offset(loc) assert(offset_base % self.blocks[self.pinned_block_idx].alignment == 0) self.offset_min = offset_base @@ -1008,25 +1009,25 @@ class BlockChain(object): # Propagate offset to blocks before pinned block pinned_block = self.blocks[self.pinned_block_idx] - offset = self.symbol_pool.loc_key_to_offset(pinned_block.loc_key) + offset = self.loc_db.loc_key_to_offset(pinned_block.loc_key) if offset % pinned_block.alignment != 0: raise RuntimeError('Bad alignment') for block in self.blocks[:self.pinned_block_idx - 1:-1]: new_offset = offset - block.size new_offset = new_offset - new_offset % pinned_block.alignment - fix_loc_offset(self.symbol_pool, + fix_loc_offset(self.loc_db, block.loc_key, new_offset, modified_loc_keys) # Propagate offset to blocks after pinned block - offset = self.symbol_pool.loc_key_to_offset(pinned_block.loc_key) + pinned_block.size + offset = self.loc_db.loc_key_to_offset(pinned_block.loc_key) + pinned_block.size last_block = pinned_block for block in self.blocks[self.pinned_block_idx + 1:]: offset += (- offset) % last_block.alignment - fix_loc_offset(self.symbol_pool, + fix_loc_offset(self.loc_db, block.loc_key, offset, modified_loc_keys) @@ -1039,8 +1040,8 @@ class BlockChainWedge(object): """Stand for wedges between blocks""" - def __init__(self, symbol_pool, offset, size): - self.symbol_pool = symbol_pool + def __init__(self, loc_db, offset, size): + self.loc_db = loc_db self.offset = offset self.max_size = size self.offset_min = offset @@ -1049,12 +1050,12 @@ class BlockChainWedge(object): def merge(self, chain): """Best effort merge two block chains Return the list of resulting blockchains""" - self.symbol_pool.set_offset(chain.blocks[0].loc_key, self.offset_max) + self.loc_db.set_offset(chain.blocks[0].loc_key, self.offset_max) chain.place() return [self, chain] -def group_constrained_blocks(symbol_pool, asmcfg): +def group_constrained_blocks(loc_db, asmcfg): """ Return the BlockChains list built from grouped blocks in asmcfg linked by asm_constraint_next @@ -1093,7 +1094,7 @@ def group_constrained_blocks(symbol_pool, asmcfg): out_block_chains = [] for loc_key in known_block_chains: - chain = BlockChain(symbol_pool, known_block_chains[loc_key]) + chain = BlockChain(loc_db, known_block_chains[loc_key]) out_block_chains.append(chain) return out_block_chains @@ -1113,7 +1114,7 @@ def get_blockchains_address_interval(blockChains, dst_interval): return allocated_interval -def resolve_symbol(blockChains, symbol_pool, dst_interval=None): +def resolve_symbol(blockChains, loc_db, dst_interval=None): """Place @blockChains in the @dst_interval""" log_asmblock.info('resolve_symbol') @@ -1131,7 +1132,7 @@ def resolve_symbol(blockChains, symbol_pool, dst_interval=None): # Add wedge in forbidden intervals for start, stop in forbidden_interval.intervals: wedge = BlockChainWedge( - symbol_pool, offset=start, size=stop + 1 - start) + loc_db, offset=start, size=stop + 1 - start) pinned_chains.append(wedge) # Try to place bigger blockChains first @@ -1174,8 +1175,8 @@ def get_block_loc_keys(block): return symbols -def assemble_block(mnemo, block, symbol_pool, conservative=False): - """Assemble a @block using @symbol_pool +def assemble_block(mnemo, block, loc_db, conservative=False): + """Assemble a @block using @loc_db @conservative: (optional) use original bytes when possible """ offset_i = 0 @@ -1186,7 +1187,7 @@ def assemble_block(mnemo, block, symbol_pool, conservative=False): # Fix special AsmRaw data = "" for expr in instr.raw: - expr_int = fix_expr_val(expr, symbol_pool) + expr_int = fix_expr_val(expr, loc_db) data += pck[expr_int.size](expr_int.arg) instr.data = data @@ -1196,16 +1197,16 @@ def assemble_block(mnemo, block, symbol_pool, conservative=False): # Assemble an instruction saved_args = list(instr.args) - instr.offset = symbol_pool.loc_key_to_offset(block.loc_key) + offset_i + instr.offset = loc_db.loc_key_to_offset(block.loc_key) + offset_i # Replace instruction's arguments by resolved ones - instr.args = instr.resolve_args_with_symbols(symbol_pool) + instr.args = instr.resolve_args_with_symbols(loc_db) if instr.dstflow(): instr.fixDstOffset() old_l = instr.l - cached_candidate, _ = conservative_asm(mnemo, instr, symbol_pool, + cached_candidate, _ = conservative_asm(mnemo, instr, loc_db, conservative) # Restore original arguments @@ -1219,8 +1220,8 @@ def assemble_block(mnemo, block, symbol_pool, conservative=False): offset_i += instr.l -def asmblock_final(mnemo, asmcfg, blockChains, symbol_pool, conservative=False): - """Resolve and assemble @blockChains using @symbol_pool until fixed point is +def asmblock_final(mnemo, asmcfg, blockChains, loc_db, conservative=False): + """Resolve and assemble @blockChains using @loc_db until fixed point is reached""" log_asmblock.debug("asmbloc_final") @@ -1267,36 +1268,36 @@ def asmblock_final(mnemo, asmcfg, blockChains, symbol_pool, conservative=False): while blocks_to_rework: block = blocks_to_rework.pop() - assemble_block(mnemo, block, symbol_pool, conservative) + assemble_block(mnemo, block, loc_db, conservative) -def asmbloc_final(mnemo, blocks, blockChains, symbol_pool, conservative=False): - """Resolve and assemble @blockChains using @symbol_pool until fixed point is +def asmbloc_final(mnemo, blocks, blockChains, loc_db, conservative=False): + """Resolve and assemble @blockChains using @loc_db until fixed point is reached""" warnings.warn('DEPRECATION WARNING: use "asmblock_final" instead of "asmbloc_final"') - asmblock_final(mnemo, blocks, blockChains, symbol_pool, conservative) + asmblock_final(mnemo, blocks, blockChains, loc_db, conservative) -def asm_resolve_final(mnemo, asmcfg, symbol_pool, dst_interval=None): - """Resolve and assemble @asmcfg using @symbol_pool into interval +def asm_resolve_final(mnemo, asmcfg, loc_db, dst_interval=None): + """Resolve and assemble @asmcfg using @loc_db into interval @dst_interval""" asmcfg.sanity_check() asmcfg.guess_blocks_size(mnemo) - blockChains = group_constrained_blocks(symbol_pool, asmcfg) + blockChains = group_constrained_blocks(loc_db, asmcfg) resolved_blockChains = resolve_symbol( blockChains, - symbol_pool, + loc_db, dst_interval ) - asmblock_final(mnemo, asmcfg, resolved_blockChains, symbol_pool) + asmblock_final(mnemo, asmcfg, resolved_blockChains, loc_db) patches = {} output_interval = interval() for block in asmcfg.blocks: - offset = symbol_pool.loc_key_to_offset(block.loc_key) + offset = loc_db.loc_key_to_offset(block.loc_key) for instr in block.lines: if not instr.data: # Empty line @@ -1335,7 +1336,7 @@ class disasmEngine(object): - blocs_wd: maximum number of distinct disassembled block + callback(arch, attrib, pool_bin, cur_bloc, offsets_to_dis, - symbol_pool) + loc_db) - dis_block_callback: callback after each new disassembled block """ @@ -1349,7 +1350,7 @@ class disasmEngine(object): self.arch = arch self.attrib = attrib self.bin_stream = bin_stream - self.symbol_pool = AsmSymbolPool() + self.loc_db = LocationDB() # Setup options self.dont_dis = [] @@ -1381,6 +1382,10 @@ class disasmEngine(object): warnings.warn("""DEPRECATION WARNING: "dis_bloc_callback" use dis_block_callback.""") self.dis_block_callback = function + @property + def symbol_pool(self): + warnings.warn("""DEPRECATION WARNING: use 'loc_db'""") + return self.loc_db # Deprecated job_done = property(get_job_done, set_job_done) @@ -1399,7 +1404,7 @@ class disasmEngine(object): delayslot_count = self.arch.delayslot offsets_to_dis = set() add_next_offset = False - loc_key = self.symbol_pool.getby_offset_create(offset) + loc_key = self.loc_db.getby_offset_create(offset) cur_block = AsmBlock(loc_key) log_asmblock.debug("dis at %X", int(offset)) while not in_delayslot or delayslot_count > 0: @@ -1414,12 +1419,12 @@ class disasmEngine(object): else: # Block is not empty, stop the desassembly pass and add a # constraint to the next block - loc_key_cst = self.symbol_pool.getby_offset_create(offset) + loc_key_cst = self.loc_db.getby_offset_create(offset) cur_block.add_cst(loc_key_cst, AsmConstraint.c_next) break if lines_cpt > 0 and offset in self.split_dis: - loc_key_cst = self.symbol_pool.getby_offset_create(offset) + loc_key_cst = self.loc_db.getby_offset_create(offset) cur_block.add_cst(loc_key_cst, AsmConstraint.c_next) offsets_to_dis.add(offset) break @@ -1430,7 +1435,7 @@ class disasmEngine(object): break if offset in job_done: - loc_key_cst = self.symbol_pool.getby_offset_create(offset) + loc_key_cst = self.loc_db.getby_offset_create(offset) cur_block.add_cst(loc_key_cst, AsmConstraint.c_next) break @@ -1457,7 +1462,7 @@ class disasmEngine(object): else: # Block is not empty, stop the desassembly pass and add a # constraint to the next block - loc_key_cst = self.symbol_pool.getby_offset_create(off_i) + loc_key_cst = self.loc_db.getby_offset_create(off_i) cur_block.add_cst(loc_key_cst, AsmConstraint.c_next) break @@ -1470,7 +1475,7 @@ class disasmEngine(object): else: # Block is not empty, stop the desassembly pass and add a # constraint to the next block - loc_key_cst = self.symbol_pool.getby_offset_create(off_i) + loc_key_cst = self.loc_db.getby_offset_create(off_i) cur_block.add_cst(loc_key_cst, AsmConstraint.c_next) break @@ -1493,14 +1498,14 @@ class disasmEngine(object): if instr.splitflow() and not (instr.is_subcall() and self.dontdis_retcall): add_next_offset = True if instr.dstflow(): - instr.dstflow2label(self.symbol_pool) - destinations = instr.getdstflow(self.symbol_pool) + instr.dstflow2label(self.loc_db) + destinations = instr.getdstflow(self.loc_db) known_dsts = [] for dst in destinations: if not dst.is_loc(): continue loc_key = dst.loc_key - loc_key_offset = self.symbol_pool.loc_key_to_offset(loc_key) + loc_key_offset = self.loc_db.loc_key_to_offset(loc_key) known_dsts.append(loc_key) if loc_key_offset in self.dont_dis_retcall_funcs: add_next_offset = False @@ -1512,11 +1517,11 @@ class disasmEngine(object): delayslot_count = instr.delayslot for c in cur_block.bto: - loc_key_offset = self.symbol_pool.loc_key_to_offset(c.loc_key) + loc_key_offset = self.loc_db.loc_key_to_offset(c.loc_key) offsets_to_dis.add(loc_key_offset) if add_next_offset: - loc_key_cst = self.symbol_pool.getby_offset_create(offset) + loc_key_cst = self.loc_db.getby_offset_create(offset) cur_block.add_cst(loc_key_cst, AsmConstraint.c_next) offsets_to_dis.add(offset) @@ -1527,7 +1532,9 @@ class disasmEngine(object): self.dis_block_callback(mn=self.arch, attrib=self.attrib, pool_bin=self.bin_stream, cur_bloc=cur_block, offsets_to_dis=offsets_to_dis, - symbol_pool=self.symbol_pool) + loc_db=self.loc_db, + # Deprecated API + symbol_pool=self.loc_db) return cur_block, offsets_to_dis def dis_block(self, offset): @@ -1557,7 +1564,7 @@ class disasmEngine(object): log_asmblock.info("dis bloc all") job_done = set() if blocks is None: - blocks = AsmCFG(self.symbol_pool) + blocks = AsmCFG(self.loc_db) todo = [offset] bloc_cpt = 0 @@ -1575,7 +1582,7 @@ class disasmEngine(object): todo += nexts blocks.add_block(cur_block) - blocks.apply_splitting(self.symbol_pool, + blocks.apply_splitting(self.loc_db, dis_block_callback=self.dis_block_callback, mn=self.arch, attrib=self.attrib, pool_bin=self.bin_stream) |