diff options
| -rw-r--r-- | example/ida/ctype_propagation.py | 53 | ||||
| -rw-r--r-- | miasm2/analysis/cst_propag.py | 178 |
2 files changed, 211 insertions, 20 deletions
diff --git a/example/ida/ctype_propagation.py b/example/ida/ctype_propagation.py index 5b23e6a8..bedaa525 100644 --- a/example/ida/ctype_propagation.py +++ b/example/ida/ctype_propagation.py @@ -18,6 +18,7 @@ from miasm2.expression.expression import ExprMem, ExprId, ExprInt, ExprOp, ExprA from miasm2.ir.symbexec_types import SymbExecCType from miasm2.expression.parser import str_to_expr from miasm2.ir.symbexec import SymbolicExecutionEngine, SymbolicState +from miasm2.analysis.cst_propag import add_state, propagate_cst_expr from utils import guess_machine @@ -37,7 +38,7 @@ Dependency Graph Settings <##Header file :{headerFile}> <Architecture/complator:{arch}> <Types informations:{strTypesInfo}> -<Unalias stack:{rUnaliasStack}>{cMethod}> +<Unalias stack:{rUnaliasStack}>{cUnalias}> """, { 'headerFile': ida_kernwin.Form.FileInput(swidth=20, open=True), 'arch': ida_kernwin.Form.DropdownListControl( @@ -46,9 +47,10 @@ Dependency Graph Settings selval=archs[0]), 'strTypesInfo': ida_kernwin.Form.MultiLineTextControl(text=default_types_info, flags=ida_kernwin.Form.MultiLineTextControl.TXTF_FIXEDFONT), - 'cMethod': ida_kernwin.Form.ChkGroupControl(("rUnaliasStack",)), + 'cUnalias': ida_kernwin.Form.ChkGroupControl(("rUnaliasStack",)), }) form, args = self.Compile() + form.rUnaliasStack.checked = True def get_block(ir_arch, mdis, addr): @@ -99,6 +101,20 @@ class TypePropagationEngine(SymbExecCType): class SymbExecCTypeFix(SymbExecCType): + def __init__(self, ir_arch, + symbols, chandler, + cst_propag_link, + func_read=None, func_write=None, + sb_expr_simp=expr_simp): + super(SymbExecCTypeFix, self).__init__(ir_arch, + symbols, + chandler, + func_read=func_read, + func_write=func_write, + sb_expr_simp=expr_simp) + + self.cst_propag_link = cst_propag_link + def emulbloc(self, irb, step=False): """ Symbolic execution of the @irb on the current state @@ -108,6 +124,9 @@ class SymbExecCTypeFix(SymbExecCType): offset2cmt = {} for index, assignblk in enumerate(irb.irs): + if set(assignblk) == set([self.ir_arch.IRDst, self.ir_arch.pc]): + # Don't display on jxx + continue instr = assignblk.instr tmp_r = assignblk.get_r() tmp_w = assignblk.get_w() @@ -116,10 +135,8 @@ class SymbExecCTypeFix(SymbExecCType): # Replace PC with value to match IR args pc_fixed = {self.ir_arch.pc: m2_expr.ExprInt(instr.offset + instr.l, self.ir_arch.pc.size)} - args = instr.args - for arg in args: + for arg in tmp_r: arg = expr_simp(arg.replace_expr(pc_fixed)) - if arg in tmp_w and not arg.is_mem(): continue todo.add(arg) @@ -127,10 +144,10 @@ class SymbExecCTypeFix(SymbExecCType): for expr in todo: if expr.is_int(): continue - for c_str, c_type in self.chandler.expr_to_c_and_types(expr, self.symbols): + expr = self.cst_propag_link.get((irb.label, index), {}).get(expr, expr) offset2cmt.setdefault(instr.offset, set()).add( - "\n%s\n%s" % (c_str, c_type)) + "\n%s: %s\n%s" % (expr, c_str, c_type)) self.eval_ir(assignblk) for offset, value in offset2cmt.iteritems(): @@ -142,11 +159,12 @@ class SymbExecCTypeFix(SymbExecCType): class CTypeEngineFixer(SymbExecCTypeFix): - def __init__(self, ir_arch, types_mngr, state): + def __init__(self, ir_arch, types_mngr, state, cst_propag_link): mychandler = MyCHandler(types_mngr, state.symbols) super(CTypeEngineFixer, self).__init__(ir_arch, state.symbols, - mychandler) + mychandler, + cst_propag_link) def get_ira_call_fixer(ira): @@ -168,16 +186,6 @@ def get_ira_call_fixer(ira): return iraCallStackFixer -def add_state(ir_arch, todo, states, addr, state): - addr = ir_arch.get_label(addr) - if addr not in states: - states[addr] = state - todo.add(addr) - else: - todo.add(addr) - states[addr] = states[addr].merge(state) - - def analyse_function(): # Init @@ -207,6 +215,11 @@ def analyse_function(): if not ret: return + cst_propag_link = {} + if settings.cUnalias.value: + init_infos = {ir_arch.sp: ir_arch.arch.regs.regs_init[ir_arch.sp] } + cst_propag_link = propagate_cst_expr(ir_arch, addr, init_infos) + types_mngr = get_types_mngr(settings.headerFile.value, settings.arch.value) mychandler = MyCHandler(types_mngr, {}) @@ -264,7 +277,7 @@ def analyse_function(): symbexec_engine.get_state()) for lbl, state in states.iteritems(): - symbexec_engine = CTypeEngineFixer(ir_arch, types_mngr, state) + symbexec_engine = CTypeEngineFixer(ir_arch, types_mngr, state, cst_propag_link) addr = symbexec_engine.emul_ir_block(lbl) symbexec_engine.del_mem_above_stack(ir_arch.sp) diff --git a/miasm2/analysis/cst_propag.py b/miasm2/analysis/cst_propag.py new file mode 100644 index 00000000..d55d7e60 --- /dev/null +++ b/miasm2/analysis/cst_propag.py @@ -0,0 +1,178 @@ +import logging + +from miasm2.ir.symbexec import SymbolicExecutionEngine +from miasm2.expression.expression import ExprMem +from miasm2.expression.expression_helper import possible_values +from miasm2.expression.simplifications import expr_simp +from miasm2.ir.ir import IRBlock, AssignBlock + +LOG_CST_PROPAG = logging.getLogger("cst_propag") +CONSOLE_HANDLER = logging.StreamHandler() +CONSOLE_HANDLER.setFormatter(logging.Formatter("%(levelname)-5s: %(message)s")) +LOG_CST_PROPAG.addHandler(CONSOLE_HANDLER) +LOG_CST_PROPAG.setLevel(logging.WARNING) + + +class SymbExecState(SymbolicExecutionEngine): + """ + State manager for SymbolicExecution + """ + def __init__(self, ir_arch, state): + super(SymbExecState, self).__init__(ir_arch, {}) + self.set_state(state) + + +def add_state(ir_arch, todo, states, addr, state): + """ + Add or merge the computed @state for the block at @addr. Update @todo + @ir_arch: IR instance + @todo: modified block set + @states: dictionnary linking a label to its entering state. + @addr: address of the concidered block + @state: computed state + """ + addr = ir_arch.get_label(addr) + todo.add(addr) + if addr not in states: + states[addr] = state + else: + states[addr] = states[addr].merge(state) + + +def is_expr_cst(ir_arch, expr): + """Return true if @expr is only composed of ExprInt and init_regs + @ir_arch: IR instance + @expr: Expression to test""" + + elements = expr.get_r(mem_read=True) + for element in elements: + if element.is_mem(): + continue + if element.is_id() and element in ir_arch.arch.regs.all_regs_ids_init: + continue + if element.is_int(): + continue + return False + else: + # Expr is a constant + return True + + +class SymbExecStateFix(SymbolicExecutionEngine): + """ + Emul blocks and replace expressions with their corresponding constant if + any. + + """ + # Function used to test if an Expression is considered as a constant + is_expr_cst = lambda _, ir_arch, expr: is_expr_cst(ir_arch, expr) + + def __init__(self, ir_arch, state, cst_propag_link): + super(SymbExecStateFix, self).__init__(ir_arch, {}) + self.set_state(state) + self.cst_propag_link = cst_propag_link + + def propag_expr_cst(self, expr): + """Propagate consttant expressions in @expr + @expr: Expression to update""" + elements = expr.get_r(mem_read=True) + to_propag = {} + for element in elements: + # Only ExprId can be safely propagated + if not element.is_id(): + continue + value = self.eval_expr(element) + if self.is_expr_cst(self.ir_arch, value): + to_propag[element] = value + return expr_simp(expr.replace_expr(to_propag)) + + def emulbloc(self, irb, step=False): + """ + Symbolic execution of the @irb on the current state + @irb: IRBlock instance + @step: display intermediate steps + """ + assignblks = [] + for index, assignblk in enumerate(irb.irs): + new_assignblk = {} + links = {} + for dst, src in assignblk.iteritems(): + src = self.propag_expr_cst(src) + if dst.is_mem(): + ptr = dst.arg + ptr = self.propag_expr_cst(ptr) + dst = ExprMem(ptr, dst.size) + new_assignblk[dst] = src + + for arg in assignblk.instr.args: + new_arg = self.propag_expr_cst(arg) + links[new_arg] = arg + self.cst_propag_link[(irb.label, index)] = links + + self.eval_ir(assignblk) + assignblks.append(AssignBlock(new_assignblk, assignblk.instr)) + self.ir_arch.blocks[irb.label] = IRBlock(irb.label, assignblks) + + +def compute_cst_propagation_states(ir_arch, init_addr, init_infos): + """ + Propagate "constant expressions" in a function. + The attribute "constant expression" is true if the expression is based on + constants or "init" regs values. + + @ir_arch: IntermediateRepresentation instance + @init_addr: analysis start address + @init_infos: dictionnary linking expressions to their values at @init_addr + """ + + done = set() + state = SymbExecState.StateEngine(init_infos) + lbl = ir_arch.get_label(init_addr) + todo = set([lbl]) + states = {lbl: state} + + while todo: + if not todo: + break + lbl = todo.pop() + state = states[lbl] + if (lbl, state) in done: + continue + done.add((lbl, state)) + symbexec_engine = SymbExecState(ir_arch, state) + + assert lbl in ir_arch.blocks + addr = symbexec_engine.emul_ir_block(lbl) + symbexec_engine.del_mem_above_stack(ir_arch.sp) + + for dst in possible_values(addr): + value = dst.value + if value.is_mem(): + LOG_CST_PROPAG.warning('Bad destination: %s', value) + continue + elif value.is_int(): + value = ir_arch.get_label(value) + add_state(ir_arch, todo, states, value, + symbexec_engine.get_state()) + + return states + + +def propagate_cst_expr(ir_arch, addr, init_infos): + """ + Propagate "constant expressions" in a @ir_arch. + The attribute "constant expression" is true if the expression is based on + constants or "init" regs values. + + @ir_arch: IntermediateRepresentation instance + @addr: analysis start address + @init_infos: dictionnary linking expressions to their values at @init_addr + + Returns a mapping between replaced Expression and their new values. + """ + states = compute_cst_propagation_states(ir_arch, addr, init_infos) + cst_propag_link = {} + for lbl, state in states.iteritems(): + symbexec = SymbExecStateFix(ir_arch, state, cst_propag_link) + symbexec.emulbloc(ir_arch.blocks[lbl]) + return cst_propag_link |