diff options
Diffstat (limited to 'miasm/analysis')
| -rw-r--r-- | miasm/analysis/cst_propag.py | 36 | ||||
| -rw-r--r-- | miasm/analysis/data_analysis.py | 30 | ||||
| -rw-r--r-- | miasm/analysis/data_flow.py | 58 | ||||
| -rw-r--r-- | miasm/analysis/depgraph.py | 12 | ||||
| -rw-r--r-- | miasm/analysis/disasm_cb.py | 12 | ||||
| -rw-r--r-- | miasm/analysis/simplifier.py | 1 |
6 files changed, 74 insertions, 75 deletions
diff --git a/miasm/analysis/cst_propag.py b/miasm/analysis/cst_propag.py index dd7733b0..cdb62d3c 100644 --- a/miasm/analysis/cst_propag.py +++ b/miasm/analysis/cst_propag.py @@ -19,8 +19,8 @@ class SymbExecState(SymbolicExecutionEngine): """ State manager for SymbolicExecution """ - def __init__(self, ir_arch, ircfg, state): - super(SymbExecState, self).__init__(ir_arch, {}) + def __init__(self, lifter, ircfg, state): + super(SymbExecState, self).__init__(lifter, {}) self.set_state(state) @@ -40,16 +40,16 @@ def add_state(ircfg, todo, states, addr, state): states[addr] = states[addr].merge(state) -def is_expr_cst(ir_arch, expr): +def is_expr_cst(lifter, expr): """Return true if @expr is only composed of ExprInt and init_regs - @ir_arch: IR instance + @lifter: Lifter instance @expr: Expression to test""" elements = expr.get_r(mem_read=True) for element in elements: if element.is_mem(): continue - if element.is_id() and element in ir_arch.arch.regs.all_regs_ids_init: + if element.is_id() and element in lifter.arch.regs.all_regs_ids_init: continue if element.is_int(): continue @@ -65,11 +65,11 @@ class SymbExecStateFix(SymbolicExecutionEngine): """ # Function used to test if an Expression is considered as a constant - is_expr_cst = lambda _, ir_arch, expr: is_expr_cst(ir_arch, expr) + is_expr_cst = lambda _, lifter, expr: is_expr_cst(lifter, expr) - def __init__(self, ir_arch, ircfg, state, cst_propag_link): + def __init__(self, lifter, ircfg, state, cst_propag_link): self.ircfg = ircfg - super(SymbExecStateFix, self).__init__(ir_arch, {}) + super(SymbExecStateFix, self).__init__(lifter, {}) self.set_state(state) self.cst_propag_link = cst_propag_link @@ -83,7 +83,7 @@ class SymbExecStateFix(SymbolicExecutionEngine): if not element.is_id(): continue value = self.eval_expr(element) - if self.is_expr_cst(self.ir_arch, value): + if self.is_expr_cst(self.lifter, value): to_propag[element] = value return expr_simp(expr.replace_expr(to_propag)) @@ -116,13 +116,13 @@ class SymbExecStateFix(SymbolicExecutionEngine): self.ircfg.blocks[irb.loc_key] = IRBlock(irb.loc_db, irb.loc_key, assignblks) -def compute_cst_propagation_states(ir_arch, ircfg, init_addr, init_infos): +def compute_cst_propagation_states(lifter, ircfg, init_addr, init_infos): """ Propagate "constant expressions" in a function. The attribute "constant expression" is true if the expression is based on constants or "init" regs values. - @ir_arch: Lifter instance + @lifter: Lifter instance @init_addr: analysis start address @init_infos: dictionary linking expressions to their values at @init_addr """ @@ -144,9 +144,9 @@ def compute_cst_propagation_states(ir_arch, ircfg, init_addr, init_infos): if lbl not in ircfg.blocks: continue - symbexec_engine = SymbExecState(ir_arch, ircfg, state) + symbexec_engine = SymbExecState(lifter, ircfg, state) addr = symbexec_engine.run_block_at(ircfg, lbl) - symbexec_engine.del_mem_above_stack(ir_arch.sp) + symbexec_engine.del_mem_above_stack(lifter.sp) for dst in possible_values(addr): value = dst.value @@ -163,23 +163,23 @@ def compute_cst_propagation_states(ir_arch, ircfg, init_addr, init_infos): return states -def propagate_cst_expr(ir_arch, ircfg, addr, init_infos): +def propagate_cst_expr(lifter, ircfg, addr, init_infos): """ - Propagate "constant expressions" in a @ir_arch. + Propagate "constant expressions" in a @lifter. The attribute "constant expression" is true if the expression is based on constants or "init" regs values. - @ir_arch: Lifter instance + @lifter: Lifter instance @addr: analysis start address @init_infos: dictionary linking expressions to their values at @init_addr Returns a mapping between replaced Expression and their new values. """ - states = compute_cst_propagation_states(ir_arch, ircfg, addr, init_infos) + states = compute_cst_propagation_states(lifter, ircfg, addr, init_infos) cst_propag_link = {} for lbl, state in viewitems(states): if lbl not in ircfg.blocks: continue - symbexec = SymbExecStateFix(ir_arch, ircfg, state, cst_propag_link) + symbexec = SymbExecStateFix(lifter, ircfg, state, cst_propag_link) symbexec.eval_updt_irblock(ircfg.blocks[lbl]) return cst_propag_link diff --git a/miasm/analysis/data_analysis.py b/miasm/analysis/data_analysis.py index ae06c59b..cd290d67 100644 --- a/miasm/analysis/data_analysis.py +++ b/miasm/analysis/data_analysis.py @@ -15,7 +15,7 @@ def get_node_name(label, i, n): return n_name -def intra_block_flow_raw(ir_arch, ircfg, flow_graph, irb, in_nodes, out_nodes): +def intra_block_flow_raw(lifter, ircfg, flow_graph, irb, in_nodes, out_nodes): """ Create data flow for an irbloc using raw IR expressions """ @@ -66,7 +66,7 @@ def intra_block_flow_raw(ir_arch, ircfg, flow_graph, irb, in_nodes, out_nodes): -def inter_block_flow_link(ir_arch, ircfg, flow_graph, irb_in_nodes, irb_out_nodes, todo, link_exec_to_data): +def inter_block_flow_link(lifter, ircfg, flow_graph, irb_in_nodes, irb_out_nodes, todo, link_exec_to_data): lbl, current_nodes, exec_nodes = todo current_nodes = dict(current_nodes) @@ -106,19 +106,19 @@ def inter_block_flow_link(ir_arch, ircfg, flow_graph, irb_in_nodes, irb_out_node return todo -def create_implicit_flow(ir_arch, flow_graph, irb_in_nodes, irb_out_ndes): +def create_implicit_flow(lifter, flow_graph, irb_in_nodes, irb_out_ndes): # first fix IN/OUT # If a son read a node which in not in OUT, add it - todo = set(ir_arch.blocks.keys()) + todo = set(lifter.blocks.keys()) while todo: lbl = todo.pop() - irb = ir_arch.blocks[lbl] - for lbl_son in ir_arch.graph.successors(irb.loc_key): - if not lbl_son in ir_arch.blocks: + irb = lifter.blocks[lbl] + for lbl_son in lifter.graph.successors(irb.loc_key): + if not lbl_son in lifter.blocks: print("cannot find block!!", lbl) continue - irb_son = ir_arch.blocks[lbl_son] + irb_son = lifter.blocks[lbl_son] for n_r in irb_in_nodes[irb_son.loc_key]: if n_r in irb_out_nodes[irb.loc_key]: continue @@ -130,13 +130,13 @@ def create_implicit_flow(ir_arch, flow_graph, irb_in_nodes, irb_out_ndes): if not n_r in irb_in_nodes[irb.loc_key]: irb_in_nodes[irb.loc_key][n_r] = irb.loc_key, 0, n_r node_n_r = irb_in_nodes[irb.loc_key][n_r] - for lbl_p in ir_arch.graph.predecessors(irb.loc_key): + for lbl_p in lifter.graph.predecessors(irb.loc_key): todo.add(lbl_p) flow_graph.add_uniq_edge(node_n_r, node_n_w) -def inter_block_flow(ir_arch, ircfg, flow_graph, irb_0, irb_in_nodes, irb_out_nodes, link_exec_to_data=True): +def inter_block_flow(lifter, ircfg, flow_graph, irb_0, irb_in_nodes, irb_out_nodes, link_exec_to_data=True): todo = set() done = set() @@ -147,7 +147,7 @@ def inter_block_flow(ir_arch, ircfg, flow_graph, irb_0, irb_in_nodes, irb_out_no if state in done: continue done.add(state) - out = inter_block_flow_link(ir_arch, ircfg, flow_graph, irb_in_nodes, irb_out_nodes, state, link_exec_to_data) + out = inter_block_flow_link(lifter, ircfg, flow_graph, irb_in_nodes, irb_out_nodes, state, link_exec_to_data) todo.update(out) @@ -161,20 +161,20 @@ class symb_exec_func(object): There is no real magic here, loops and complex merging will certainly fail. """ - def __init__(self, ir_arch): + def __init__(self, lifter): self.todo = set() self.stateby_ad = {} self.cpt = {} self.states_var_done = set() self.states_done = set() self.total_done = 0 - self.ir_arch = ir_arch + self.lifter = lifter def add_state(self, parent, ad, state): variables = dict(state.symbols) # get block dead, and remove from state - b = self.ir_arch.get_block(ad) + b = self.lifter.get_block(ad) if b is None: raise ValueError("unknown block! %s" % ad) s = parent, ad, tuple(sorted(viewitems(variables))) @@ -198,7 +198,7 @@ class symb_exec_func(object): self.states_done.add(state) self.states_var_done.add(state) - sb = SymbolicExecutionEngine(self.ir_arch, dict(s)) + sb = SymbolicExecutionEngine(self.lifter, dict(s)) return parent, ad, sb return None diff --git a/miasm/analysis/data_flow.py b/miasm/analysis/data_flow.py index 6ace4f4e..4ff96caa 100644 --- a/miasm/analysis/data_flow.py +++ b/miasm/analysis/data_flow.py @@ -213,8 +213,8 @@ class DeadRemoval(object): Do dead removal """ - def __init__(self, ir_arch, expr_to_original_expr=None): - self.ir_arch = ir_arch + def __init__(self, lifter, expr_to_original_expr=None): + self.lifter = lifter if expr_to_original_expr is None: expr_to_original_expr = {} self.expr_to_original_expr = expr_to_original_expr @@ -226,7 +226,7 @@ class DeadRemoval(object): def is_unkillable_destination(self, lval, rval): if ( lval.is_mem() or - self.ir_arch.IRDst == lval or + self.lifter.IRDst == lval or lval.is_id("exception_flags") or is_function_call(rval) ): @@ -290,7 +290,7 @@ class DeadRemoval(object): Find definitions of out regs starting from @block """ worklist = set() - for reg in self.ir_arch.get_out_regs(block): + for reg in self.lifter.get_out_regs(block): worklist.add((reg, block.loc_key)) ret = self.find_definitions_from_worklist(worklist, ircfg) return ret @@ -743,7 +743,7 @@ def expr_has_mem(expr): def stack_to_reg(expr): if expr.is_mem(): ptr = expr.arg - SP = ir_arch_a.sp + SP = lifter.sp if ptr == SP: return ExprId("STACK.0", expr.size) elif (ptr.is_op('+') and @@ -757,26 +757,26 @@ def stack_to_reg(expr): return False -def is_stack_access(ir_arch_a, expr): +def is_stack_access(lifter, expr): if not expr.is_mem(): return False ptr = expr.ptr - diff = expr_simp(ptr - ir_arch_a.sp) + diff = expr_simp(ptr - lifter.sp) if not diff.is_int(): return False return expr -def visitor_get_stack_accesses(ir_arch_a, expr, stack_vars): - if is_stack_access(ir_arch_a, expr): +def visitor_get_stack_accesses(lifter, expr, stack_vars): + if is_stack_access(lifter, expr): stack_vars.add(expr) return expr -def get_stack_accesses(ir_arch_a, expr): +def get_stack_accesses(lifter, expr): result = set() def get_stack(expr_to_test): - visitor_get_stack_accesses(ir_arch_a, expr_to_test, result) + visitor_get_stack_accesses(lifter, expr_to_test, result) return None visitor = ExprWalk(get_stack) visitor.visit(expr) @@ -790,14 +790,14 @@ def get_interval_length(interval_in): return length -def check_expr_below_stack(ir_arch_a, expr): +def check_expr_below_stack(lifter, expr): """ Return False if expr pointer is below original stack pointer - @ir_arch_a: lifter_model_call instance + @lifter: lifter_model_call instance @expr: Expression instance """ ptr = expr.ptr - diff = expr_simp(ptr - ir_arch_a.sp) + diff = expr_simp(ptr - lifter.sp) if not diff.is_int(): return True if int(diff) == 0 or int(expr_simp(diff.msb())) == 0: @@ -805,20 +805,20 @@ def check_expr_below_stack(ir_arch_a, expr): return True -def retrieve_stack_accesses(ir_arch_a, ircfg): +def retrieve_stack_accesses(lifter, ircfg): """ Walk the ssa graph and find stack based variables. Return a dictionary linking stack base address to its size/name - @ir_arch_a: lifter_model_call instance + @lifter: lifter_model_call instance @ircfg: IRCFG instance """ stack_vars = set() for block in viewvalues(ircfg.blocks): for assignblk in block: for dst, src in viewitems(assignblk): - stack_vars.update(get_stack_accesses(ir_arch_a, dst)) - stack_vars.update(get_stack_accesses(ir_arch_a, src)) - stack_vars = [expr for expr in stack_vars if check_expr_below_stack(ir_arch_a, expr)] + stack_vars.update(get_stack_accesses(lifter, dst)) + stack_vars.update(get_stack_accesses(lifter, src)) + stack_vars = [expr for expr in stack_vars if check_expr_below_stack(lifter, expr)] base_to_var = {} for var in stack_vars: @@ -829,7 +829,7 @@ def retrieve_stack_accesses(ir_arch_a, ircfg): for addr, vars in viewitems(base_to_var): var_interval = interval() for var in vars: - offset = expr_simp(addr - ir_arch_a.sp) + offset = expr_simp(addr - lifter.sp) if not offset.is_int(): # skip non linear stack offset continue @@ -879,7 +879,7 @@ def replace_mem_stack_vars(expr, base_to_info): return expr.visit(lambda expr:fix_stack_vars(expr, base_to_info)) -def replace_stack_vars(ir_arch_a, ircfg): +def replace_stack_vars(lifter, ircfg): """ Try to replace stack based memory accesses by variables. @@ -889,11 +889,11 @@ def replace_stack_vars(ir_arch_a, ircfg): WARNING: may fail - @ir_arch_a: lifter_model_call instance + @lifter: lifter_model_call instance @ircfg: IRCFG instance """ - base_to_info = retrieve_stack_accesses(ir_arch_a, ircfg) + base_to_info = retrieve_stack_accesses(lifter, ircfg) modified = False for block in list(viewvalues(ircfg.blocks)): assignblks = [] @@ -946,16 +946,16 @@ def read_mem(bs, expr): return ExprInt(value, expr.size) -def load_from_int(ir_arch, bs, is_addr_ro_variable): +def load_from_int(ircfg, bs, is_addr_ro_variable): """ Replace memory read based on constant with static value - @ir_arch: lifter_model_call instance + @ircfg: IRCFG instance @bs: binstream instance @is_addr_ro_variable: callback(addr, size) to test memory candidate """ modified = False - for block in list(viewvalues(ir_arch.blocks)): + for block in list(viewvalues(ircfg.blocks)): assignblks = list() for assignblk in block: out = {} @@ -988,7 +988,7 @@ def load_from_int(ir_arch, bs, is_addr_ro_variable): out = AssignBlock(out, assignblk.instr) assignblks.append(out) block = IRBlock(block.loc_db, block.loc_key, assignblks) - ir_arch.blocks[block.loc_key] = block + ircfg.blocks[block.loc_key] = block return modified @@ -1188,14 +1188,14 @@ class DiGraphLivenessIRA(DiGraphLiveness): DiGraph representing variable liveness for IRA """ - def init_var_info(self, ir_arch_a): + def init_var_info(self, lifter): """Add ircfg out regs""" for node in self.leaves(): irblock = self.ircfg.blocks.get(node, None) if irblock is None: continue - var_out = ir_arch_a.get_out_regs(irblock) + var_out = lifter.get_out_regs(irblock) irblock_liveness = self.blocks[node] irblock_liveness.infos[-1].var_out = var_out diff --git a/miasm/analysis/depgraph.py b/miasm/analysis/depgraph.py index 8b5c87e2..7fadd9bf 100644 --- a/miasm/analysis/depgraph.py +++ b/miasm/analysis/depgraph.py @@ -282,10 +282,10 @@ class DependencyResult(DependencyState): return IRBlock(irb.loc_db, irb.loc_key, assignblks) - def emul(self, ir_arch, ctx=None, step=False): + def emul(self, lifter, ctx=None, step=False): """Symbolic execution of relevant nodes according to the history Return the values of inputs nodes' elements - @ir_arch: Lifter instance + @lifter: Lifter instance @ctx: (optional) Initial context as dictionary @step: (optional) Verbose execution Warning: The emulation is not sound if the inputs nodes depend on loop @@ -308,9 +308,9 @@ class DependencyResult(DependencyState): line_nb).assignblks # Eval the block - loc_db = ir_arch.loc_db + loc_db = lifter.loc_db temp_loc = loc_db.get_or_create_name_location("Temp") - symb_exec = SymbolicExecutionEngine(ir_arch, ctx_init) + symb_exec = SymbolicExecutionEngine(lifter, ctx_init) symb_exec.eval_updt_irblock(IRBlock(loc_db, temp_loc, assignblks), step=step) # Return only inputs values (others could be wrongs) @@ -361,13 +361,13 @@ class DependencyResultImplicit(DependencyResult): conds = translator.from_expr(self.unsat_expr) return conds - def emul(self, ir_arch, ctx=None, step=False): + def emul(self, lifter, ctx=None, step=False): # Init ctx_init = {} if ctx is not None: ctx_init.update(ctx) solver = z3.Solver() - symb_exec = SymbolicExecutionEngine(ir_arch, ctx_init) + symb_exec = SymbolicExecutionEngine(lifter, ctx_init) history = self.history[::-1] history_size = len(history) translator = Translator.to_language("z3") diff --git a/miasm/analysis/disasm_cb.py b/miasm/analysis/disasm_cb.py index a3d7a68c..f180f0a2 100644 --- a/miasm/analysis/disasm_cb.py +++ b/miasm/analysis/disasm_cb.py @@ -29,11 +29,11 @@ def arm_guess_subcall(dis_engine, cur_block, offsets_to_dis): loc_db = dis_engine.loc_db lifter_model_call = get_lifter_model_call(arch, dis_engine.attrib) - ir_arch = lifter_model_call(loc_db) + lifter = lifter_model_call(loc_db) ircfg = lifter_model_call.new_ircfg() print('###') print(cur_block) - ir_arch.add_asmblock_to_ircfg(cur_block, ircfg) + lifter.add_asmblock_to_ircfg(cur_block, ircfg) to_add = set() for irblock in viewvalues(ircfg.blocks): @@ -41,7 +41,7 @@ def arm_guess_subcall(dis_engine, cur_block, offsets_to_dis): lr_val = None for exprs in irblock: for e in exprs: - if e.dst == ir_arch.pc: + if e.dst == lifter.pc: pc_val = e.src if e.dst == arch.regs.LR: lr_val = e.src @@ -71,15 +71,15 @@ def arm_guess_jump_table(dis_engine, cur_block, offsets_to_dis): jra = ExprId('jra') jrb = ExprId('jrb') - ir_arch = lifter_model_call(loc_db) + lifter = lifter_model_call(loc_db) ircfg = lifter_model_call.new_ircfg() - ir_arch.add_asmblock_to_ircfg(cur_block, ircfg) + lifter.add_asmblock_to_ircfg(cur_block, ircfg) for irblock in viewvalues(ircfg.blocks): pc_val = None for exprs in irblock: for e in exprs: - if e.dst == ir_arch.pc: + if e.dst == lifter.pc: pc_val = e.src if pc_val is None: continue diff --git a/miasm/analysis/simplifier.py b/miasm/analysis/simplifier.py index 193b329c..a7c29b06 100644 --- a/miasm/analysis/simplifier.py +++ b/miasm/analysis/simplifier.py @@ -53,7 +53,6 @@ class IRCFGSimplifier(object): @property def ir_arch(self): - fds warnings.warn('DEPRECATION WARNING: use ".lifter" instead of ".ir_arch"') return self.lifter |