from __future__ import print_function import sys import subprocess from optparse import OptionParser from pdb import pm from future.utils import viewitems from miasm.analysis.machine import Machine from miasm.analysis.binary import Container from miasm.expression.expression import ExprInt, ExprCond, ExprId, \ get_expr_ids, ExprAssign, ExprLoc from miasm.core.bin_stream import bin_stream_str from miasm.ir.symbexec import SymbolicExecutionEngine, get_block from miasm.expression.simplifications import expr_simp from miasm.core import parse_asm from miasm.ir.translators.translator import Translator from miasm.core.locationdb import LocationDB machine = Machine("x86_32") parser = OptionParser(usage="usage: %prog [options] file") parser.add_option('-a', "--address", dest="address", metavar="ADDRESS", help="address to disasemble", default="0") (options, args) = parser.parse_args(sys.argv[1:]) if not args: parser.print_help() sys.exit(0) def emul_symb(ir_arch, ircfg, mdis, states_todo, states_done): while states_todo: addr, symbols, conds = states_todo.pop() print('*' * 40, "addr", addr, '*' * 40) if (addr, symbols, conds) in states_done: print('Known state, skipping', addr) continue states_done.add((addr, symbols, conds)) symbexec = SymbolicExecutionEngine(ir_arch) symbexec.symbols = symbols.copy() if ir_arch.pc in symbexec.symbols: del symbexec.symbols[ir_arch.pc] irblock = get_block(ir_arch, ircfg, mdis, addr) print('Run block:') print(irblock) addr = symbexec.eval_updt_irblock(irblock) print('Final state:') symbexec.dump(mems=False) assert addr is not None if isinstance(addr, ExprCond): # Create 2 states, each including complementary conditions cond_group_a = {addr.cond: ExprInt(0, addr.cond.size)} cond_group_b = {addr.cond: ExprInt(1, addr.cond.size)} addr_a = expr_simp(symbexec.eval_expr(addr.replace_expr(cond_group_a), {})) addr_b = expr_simp(symbexec.eval_expr(addr.replace_expr(cond_group_b), {})) if not (addr_a.is_int() or addr_a.is_loc() and addr_b.is_int() or addr_b.is_loc()): print(str(addr_a), str(addr_b)) raise ValueError("Unsupported condition") if isinstance(addr_a, ExprInt): addr_a = int(addr_a.arg) if isinstance(addr_b, ExprInt): addr_b = int(addr_b.arg) states_todo.add((addr_a, symbexec.symbols.copy(), tuple(list(conds) + list(viewitems(cond_group_a))))) states_todo.add((addr_b, symbexec.symbols.copy(), tuple(list(conds) + list(viewitems(cond_group_b))))) elif addr == ret_addr: print('Return address reached') continue elif addr.is_int(): addr = int(addr.arg) states_todo.add((addr, symbexec.symbols.copy(), tuple(conds))) elif addr.is_loc(): states_todo.add((addr, symbexec.symbols.copy(), tuple(conds))) else: raise ValueError("Unsupported destination") if __name__ == '__main__': loc_db = LocationDB() translator_smt2 = Translator.to_language("smt2") addr = int(options.address, 16) cont = Container.from_stream(open(args[0], 'rb'), loc_db) mdis = machine.dis_engine(cont.bin_stream, loc_db=loc_db) ir_arch = machine.ir(mdis.loc_db) ircfg = ir_arch.new_ircfg() symbexec = SymbolicExecutionEngine(ir_arch) asmcfg = parse_asm.parse_txt( machine.mn, 32, ''' init: PUSH argv PUSH argc PUSH ret_addr ''', loc_db ) argc_lbl = loc_db.get_name_location('argc') argv_lbl = loc_db.get_name_location('argv') ret_addr_lbl = loc_db.get_name_location('ret_addr') init_lbl = loc_db.get_name_location('init') argc_loc = ExprLoc(argc_lbl, 32) argv_loc = ExprLoc(argv_lbl, 32) ret_addr_loc = ExprLoc(ret_addr_lbl, 32) ret_addr = ExprId("ret_addr", ret_addr_loc.size) fix_args = { argc_loc: ExprId("argc", argc_loc.size), argv_loc: ExprId("argv", argv_loc.size), ret_addr_loc: ret_addr, } block = asmcfg.loc_key_to_block(init_lbl) for instr in block.lines: for i, arg in enumerate(instr.args): instr.args[i]= arg.replace_expr(fix_args) print(block) # add fake address and len to parsed instructions ir_arch.add_asmblock_to_ircfg(block, ircfg) irb = ircfg.blocks[init_lbl] symbexec.eval_updt_irblock(irb) symbexec.dump(ids=False) # reset ir_arch blocks ir_arch.blocks = {} states_todo = set() states_done = set() states_todo.add((addr, symbexec.symbols, ())) # emul blocks, propagate states emul_symb(ir_arch, ircfg, mdis, states_todo, states_done) all_info = [] print('*' * 40, 'conditions to match', '*' * 40) for addr, symbols, conds in sorted(states_done, key=str): print('*' * 40, addr, '*' * 40) reqs = [] for k, v in conds: print(k, v) reqs.append((k, v)) all_info.append((addr, reqs)) all_cases = set() symbexec = SymbolicExecutionEngine(ir_arch) for addr, reqs_cond in all_info: out = ['(set-logic QF_ABV)', '(set-info :smt-lib-version 2.0)'] conditions = [] all_ids = set() for expr, value in reqs_cond: all_ids.update(get_expr_ids(expr)) expr_test = ExprCond(expr, ExprInt(1, value.size), ExprInt(0, value.size)) cond = translator_smt2.from_expr(ExprAssign(expr_test, value)) conditions.append(cond) for name in all_ids: out.append("(declare-fun %s () (_ BitVec %d))" % (name, name.size)) if not out: continue out += conditions out.append('(check-sat)') open('out.dot', 'w').write('\n'.join(out)) try: cases = subprocess.check_output(["/home/serpilliere/tools/stp/stp", "-p", '--SMTLIB2', "out.dot"]) except OSError: print("Cannot find stp binary!") break for c in cases.split('\n'): if c.startswith('ASSERT'): all_cases.add((addr, c)) print('*' * 40, 'ALL COND', '*' * 40) all_cases = list(all_cases) all_cases.sort(key=lambda x: (x[0], x[1])) for addr, val in all_cases: print('Address:', addr, 'is reachable using argc', val)