from __future__ import print_function import logging from argparse import ArgumentParser from pdb import pm from future.utils import viewitems, viewvalues from miasm.analysis.binary import Container from miasm.core.asmblock import log_asmblock, AsmCFG from miasm.core.interval import interval from miasm.analysis.machine import Machine from miasm.analysis.data_flow import \ DiGraphDefUse, ReachingDefinitions, load_from_int from miasm.expression.simplifications import expr_simp from miasm.analysis.ssa import SSADiGraph from miasm.ir.ir import AssignBlock, IRBlock from miasm.analysis.simplifier import IRCFGSimplifierCommon, IRCFGSimplifierSSA from miasm.core.locationdb import LocationDB log = logging.getLogger("dis") console_handler = logging.StreamHandler() console_handler.setFormatter(logging.Formatter("%(levelname)-5s: %(message)s")) log.addHandler(console_handler) log.setLevel(logging.INFO) parser = ArgumentParser("Disassemble a binary") parser.add_argument('filename', help="File to disassemble") parser.add_argument('address', help="Starting address for disassembly engine", nargs="*") parser.add_argument('-m', '--architecture', help="architecture: " + \ ",".join(Machine.available_machine())) parser.add_argument('-f', "--followcall", action="store_true", help="Follow call instructions") parser.add_argument('-b', "--blockwatchdog", default=None, type=int, help="Maximum number of basic block to disassemble") parser.add_argument('-n', "--funcswatchdog", default=None, type=int, help="Maximum number of function to disassemble") parser.add_argument('-r', "--recurfunctions", action="store_true", help="Disassemble founded functions") parser.add_argument('-v', "--verbose", action="count", help="Verbose mode", default=0) parser.add_argument('-g', "--gen_ir", action="store_true", help="Compute the intermediate representation") parser.add_argument('-z', "--dis-nulstart-block", action="store_true", help="Do not disassemble NULL starting block") parser.add_argument('-l', "--dontdis-retcall", action="store_true", help="If set, disassemble only call destinations") parser.add_argument('-s', "--simplify", action="count", help="Apply simplifications rules (liveness, graph simplification, ...)", default=0) parser.add_argument("--base-address", default=0, type=lambda x: int(x, 0), help="Base address of the input binary") parser.add_argument('-a', "--try-disasm-all", action="store_true", help="Try to disassemble the whole binary") parser.add_argument('-i', "--image", action="store_true", help="Display image representation of disasm") parser.add_argument('-c', "--rawbinary", default=False, action="store_true", help="Don't interpret input as ELF/PE/...") parser.add_argument('-d', "--defuse", action="store_true", help="Dump the def-use graph in file 'defuse.dot'." "The defuse is dumped after simplifications if -s option is specified") parser.add_argument('-p', "--ssa", action="store_true", help="Generate the ssa form in 'ssa.dot'.") parser.add_argument('-x', "--propagexpr", action="store_true", help="Do Expression propagation.") parser.add_argument('-e', "--loadint", action="store_true", help="Load integers from binary in fixed memory lookup.") parser.add_argument('-j', "--calldontmodstack", action="store_true", help="Consider stack high is not modified in subcalls") args = parser.parse_args() if args.verbose: log_asmblock.setLevel(logging.DEBUG) loc_db = LocationDB() log.info('Load binary') if args.rawbinary: cont = Container.fallback_container( open(args.filename, "rb").read(), vm=None, addr=args.base_address, loc_db=loc_db, ) else: with open(args.filename, "rb") as fdesc: cont = Container.from_stream( fdesc, addr=args.base_address, loc_db=loc_db, ) default_addr = cont.entry_point bs = cont.bin_stream e = cont.executable log.info('ok') log.info("import machine...") # Use the guessed architecture or the specified one arch = args.architecture if args.architecture else cont.arch if not arch: print("Architecture recognition fail. Please specify it in arguments") exit(-1) # Instance the arch-dependent machine machine = Machine(arch) mn, dis_engine = machine.mn, machine.dis_engine log.info('ok') mdis = dis_engine(bs, loc_db=cont.loc_db) # configure disasm engine mdis.dontdis_retcall = args.dontdis_retcall mdis.blocs_wd = args.blockwatchdog mdis.dont_dis_nulstart_bloc = not args.dis_nulstart_block mdis.follow_call = args.followcall todo = [] addrs = [] for addr in args.address: try: addrs.append(int(addr, 0)) except ValueError: # Second chance, try with symbol loc_key = mdis.loc_db.get_name_location(addr) offset = mdis.loc_db.get_location_offset(loc_key) addrs.append(offset) if len(addrs) == 0 and default_addr is not None: addrs.append(default_addr) for ad in addrs: todo += [(mdis, None, ad)] done = set() all_funcs = set() all_funcs_blocks = {} done_interval = interval() finish = False entry_points = set() # Main disasm loop while not finish and todo: while not finish and todo: mdis, caller, ad = todo.pop(0) if ad in done: continue done.add(ad) asmcfg = mdis.dis_multiblock(ad) entry_points.add(mdis.loc_db.get_offset_location(ad)) log.info('func ok %.16x (%d)' % (ad, len(all_funcs))) all_funcs.add(ad) all_funcs_blocks[ad] = asmcfg for block in asmcfg.blocks: for l in block.lines: done_interval += interval([(l.offset, l.offset + l.l)]) if args.funcswatchdog is not None: args.funcswatchdog -= 1 if args.recurfunctions: for block in asmcfg.blocks: instr = block.get_subcall_instr() if not instr: continue for dest in instr.getdstflow(mdis.loc_db): if not dest.is_loc(): continue offset = mdis.loc_db.get_location_offset(dest.loc_key) todo.append((mdis, instr, offset)) if args.funcswatchdog is not None and args.funcswatchdog <= 0: finish = True if args.try_disasm_all: for a, b in done_interval.intervals: if b in done: continue log.debug('add func %s' % hex(b)) todo.append((mdis, None, b)) # Generate dotty graph all_asmcfg = AsmCFG(mdis.loc_db) for blocks in viewvalues(all_funcs_blocks): all_asmcfg += blocks log.info('generate graph file') open('graph_execflow.dot', 'w').write(all_asmcfg.dot(offset=True)) log.info('generate intervals') all_lines = [] total_l = 0 print(done_interval) if args.image: log.info('build img') done_interval.show() for i, j in done_interval.intervals: log.debug((hex(i), "->", hex(j))) all_lines.sort(key=lambda x: x.offset) open('lines.dot', 'w').write('\n'.join(str(l) for l in all_lines)) log.info('total lines %s' % total_l) if args.propagexpr: args.gen_ir = True class LifterDelModCallStack(machine.lifter_model_call): def call_effects(self, addr, instr): assignblks, extra = super(LifterDelModCallStack, self).call_effects(addr, instr) if not args.calldontmodstack: return assignblks, extra out = [] for assignblk in assignblks: dct = dict(assignblk) dct = { dst:src for (dst, src) in viewitems(dct) if dst != self.sp } out.append(AssignBlock(dct, assignblk.instr)) return out, extra # Bonus, generate IR graph if args.gen_ir: log.info("Lift and Lift with modeled calls") lifter = machine.lifter(mdis.loc_db) lifter_model_call = LifterDelModCallStack(mdis.loc_db) ircfg = lifter.new_ircfg() ircfg_model_call = lifter.new_ircfg() head = list(entry_points)[0] for ad, asmcfg in viewitems(all_funcs_blocks): log.info("generating IR... %x" % ad) for block in asmcfg.blocks: lifter.add_asmblock_to_ircfg(block, ircfg) lifter_model_call.add_asmblock_to_ircfg(block, ircfg_model_call) log.info("Print blocks (without analyse)") for label, block in viewitems(ircfg.blocks): print(block) log.info("Gen Graph... %x" % ad) log.info("Print blocks (with analyse)") for label, block in viewitems(ircfg_model_call.blocks): print(block) if args.simplify > 0: log.info("Simplify...") ircfg_simplifier = IRCFGSimplifierCommon(lifter_model_call) ircfg_simplifier.simplify(ircfg_model_call, head) log.info("ok...") if args.defuse: reachings = ReachingDefinitions(ircfg_model_call) open('graph_defuse.dot', 'w').write(DiGraphDefUse(reachings).dot()) out = ircfg.dot() open('graph_irflow_raw.dot', 'w').write(out) out = ircfg_model_call.dot() open('graph_irflow.dot', 'w').write(out) if args.ssa and not args.propagexpr: if len(entry_points) != 1: raise RuntimeError("Your graph should have only one head") ssa = SSADiGraph(ircfg_model_call) ssa.transform(head) open("ssa.dot", "w").write(ircfg_model_call.dot()) if args.propagexpr: def is_addr_ro_variable(bs, addr, size): """ Return True if address at @addr is a read-only variable. WARNING: Quick & Dirty @addr: integer representing the address of the variable @size: size in bits """ try: _ = bs.getbytes(addr, size // 8) except IOError: return False return True class CustomIRCFGSimplifierSSA(IRCFGSimplifierSSA): def do_simplify(self, ssa, head): modified = super(CustomIRCFGSimplifierSSA, self).do_simplify(ssa, head) if args.loadint: modified |= load_from_int(ssa.graph, bs, is_addr_ro_variable) def simplify(self, ircfg, head): ssa = self.ircfg_to_ssa(ircfg, head) ssa = self.do_simplify_loop(ssa, head) ircfg = self.ssa_to_unssa(ssa, head) ircfg_simplifier = IRCFGSimplifierCommon(self.lifter) ircfg_simplifier.deadremoval.add_expr_to_original_expr(ssa.ssa_variable_to_expr) ircfg_simplifier.simplify(ircfg, head) return ircfg head = list(entry_points)[0] simplifier = CustomIRCFGSimplifierSSA(lifter_model_call) ircfg = simplifier.simplify(ircfg_model_call, head) open('final.dot', 'w').write(ircfg.dot())