about summary refs log tree commit diff stats
path: root/example/test_ida.py
diff options
context:
space:
mode:
Diffstat (limited to 'example/test_ida.py')
-rw-r--r--example/test_ida.py409
1 files changed, 409 insertions, 0 deletions
diff --git a/example/test_ida.py b/example/test_ida.py
new file mode 100644
index 00000000..449c630c
--- /dev/null
+++ b/example/test_ida.py
@@ -0,0 +1,409 @@
+import sys
+
+# Set your path first!
+sys.path.append("/home/serpilliere/tools/pyparsing/pyparsing-2.0.1/build/lib.linux-x86_64-2.7")
+sys.path.append("/home/serpilliere/projet/m2_devel/build/lib.linux-x86_64-2.7")
+
+from miasm2.core.bin_stream import bin_stream_str
+from miasm2.core.asmbloc import *
+from miasm2.expression.simplifications import expr_simp
+
+from miasm2.analysis.data_analysis import intra_bloc_flow_raw, inter_bloc_flow
+from miasm2.analysis.data_analysis import intra_bloc_flow_symbexec
+
+from idaapi import *
+import idautils
+
+
+class bin_stream_ida(bin_stream_str):
+    # ida should provide Byte function
+
+    def getbytes(self, start, l=1):
+        o = ""
+        for ad in xrange(start - self.shift, start - self.shift + l):
+            o += chr(Byte(ad))
+        return o
+
+    def readbs(self, l=1):
+        if self.offset + l > self.l:
+            raise IOError
+        o = self.getbytes(self.offset)
+        self.offset += l
+        return p
+
+    def writebs(self, l=1):
+        raise ValueError('writebs unsupported')
+
+    def __str__(self):
+        raise NotImplementedError('not fully functional')
+        out = self.bin[self.offset - self.shift:]
+        return out
+
+    def setoffset(self, val):
+        self.offset = val
+
+    def __len__(self):
+        return 0x7FFFFFFF
+
+    def getlen(self):
+        return 0x7FFFFFFF - self.offset - self.shift
+
+
+def expr2colorstr(my_ir, e):
+    # print "XXX", e
+    if isinstance(e, ExprId):
+        s = str(e)
+        if e in my_ir.arch.regs.all_regs_ids:
+            s = idaapi.COLSTR(s, idaapi.SCOLOR_REG)
+    elif isinstance(e, ExprInt):
+        s = str(e)
+        s = idaapi.COLSTR(s, idaapi.SCOLOR_NUMBER)
+    elif isinstance(e, ExprMem):
+        s = '@%d[%s]' % (e.size, expr2colorstr(my_ir, e.arg))
+    elif isinstance(e, ExprOp):
+        out = []
+        for a in e.args:
+            s = expr2colorstr(my_ir, a)
+            if isinstance(a, ExprOp):
+                s = "(%s)" % s
+            out.append(s)
+        if len(out) == 1:
+            s = "%s %s" % (e.op, str(out[0]))
+        else:
+            s = (" " + e.op + " ").join(out)
+    elif isinstance(e, ExprAff):
+        s = "%s = %s" % (
+            expr2colorstr(my_ir, e.dst), expr2colorstr(my_ir, e.src))
+    elif isinstance(e, ExprCond):
+        cond = expr2colorstr(my_ir, e.cond)
+        src1 = expr2colorstr(my_ir, e.src1)
+        src2 = expr2colorstr(my_ir, e.src2)
+        s = "(%s?%s:%s)" % (cond, src1, src2)
+    elif isinstance(e, ExprSlice):
+        s = "(%s)[%d:%d]" % (expr2colorstr(my_ir, e.arg), e.start, e.stop)
+    else:
+        s = str(e)
+    # print repr(s)
+    return s
+
+
+def color_irbloc(irbloc):
+    o = []
+    lbl = '%s' % irbloc.label
+    lbl = idaapi.COLSTR(lbl, idaapi.SCOLOR_INSN)
+    o.append(lbl)
+    for i, expr in enumerate(irbloc.irs):
+        for e in expr:
+            s = expr2colorstr(my_ir, e)
+            s = idaapi.COLSTR(s, idaapi.SCOLOR_INSN)
+            o.append('    %s' % s)
+        o.append("")
+    o.pop()
+    i = len(irbloc.irs)
+    s = str('    Dst: %s' % irbloc.dst)
+    s = idaapi.COLSTR(s, idaapi.SCOLOR_RPTCMT)
+    o.append(s)
+
+    return "\n".join(o)
+
+
+class GraphMiasmIR(GraphViewer):
+
+    def __init__(self, my_ir, title, result):
+        GraphViewer.__init__(self, title)
+        print 'init'
+        self.my_ir = my_ir
+        self.result = result
+        self.names = {}
+
+    def OnRefresh(self):
+        print 'refresh'
+        self.Clear()
+        addr_id = {}
+        for irbloc in self.my_ir.blocs.values():
+            id_irbloc = self.AddNode(color_irbloc(irbloc))
+            addr_id[irbloc] = id_irbloc
+
+        for irbloc in self.my_ir.blocs.values():
+            if not irbloc:
+                continue
+            dst = my_ir.dst_trackback(irbloc)
+            for d in dst:
+                if not self.my_ir.ExprIsLabel(d):
+                    continue
+
+                d = d.name
+                if not d in self.my_ir.blocs:
+                    continue
+                b = self.my_ir.blocs[d]
+                node1 = addr_id[irbloc]
+                node2 = addr_id[b]
+                self.AddEdge(node1, node2)
+        return True
+
+    def OnGetText(self, node_id):
+        b = self[node_id]
+        return str(b)
+
+    def OnSelect(self, node_id):
+        return True
+
+    def OnClick(self, node_id):
+        return True
+
+    def OnCommand(self, cmd_id):
+        if self.cmd_test == cmd_id:
+            print 'TEST!'
+            return
+        print "command:", cmd_id
+
+    def Show(self):
+        if not GraphViewer.Show(self):
+            return False
+        self.cmd_test = self.AddCommand("Test", "F2")
+        if self.cmd_test == 0:
+            print "Failed to add popup menu item!"
+        return True
+
+
+from miasm2.analysis.disasm_cb import guess_funcs, guess_multi_cb
+
+
+processor_name = GetLongPrm(INF_PROCNAME)
+dis_engine = None
+if processor_name == "metapc":
+
+    # HACK: check 32/64 using INF_START_SP
+    max_size = GetLongPrm(INF_START_SP)
+    if max_size == 0x80:  # TODO XXX check
+        from miasm2.arch.x86.disasm import dis_x86_16 as dis_engine
+        from miasm2.arch.x86.x86.ira import ir_a_x86_16 as ira
+    elif max_size == 0xFFFFFFFF:
+        from miasm2.arch.x86.disasm import dis_x86_32 as dis_engine
+        from miasm2.arch.x86.ira import ir_a_x86_32 as ira
+
+    elif max_size == 0xFFFFFFFFFFFFFFFF:
+        from miasm2.arch.x86.disasm import dis_x86_64 as dis_engine
+        from miasm2.arch.x86.ira import ir_a_x86_64 as ira
+
+    else:
+        raise ValueError('cannot guess 32/64 bit! (%x)' % max_size)
+elif processor_name == "ARM":
+    # TODO ARM/thumb
+    # hack for thumb: place armt = True in globals :/
+    is_armt = globals().get('armt', False)
+    if is_armt:
+        from miasm2.arch.arm.disasm import dis_armt as dis_engine
+        from miasm2.arch.arm.ira import ir_a_armt as ira
+    else:
+        from miasm2.arch.arm.disasm import dis_arm as dis_engine
+        from miasm2.arch.arm.ira import ir_a_arm as ira
+
+    from miasm2.analysis.disasm_cb import arm_guess_subcall, arm_guess_jump_table
+    guess_funcs.append(arm_guess_subcall)
+    guess_funcs.append(arm_guess_jump_table)
+
+elif processor_name == "msp430":
+    # TODO ARM/thumb
+    from miasm2.arch.msp430.disasm import dis_msp430 as dis_engine
+    from miasm2.arch.msp430.ira import ir_a_msp430 as ira
+
+else:
+    print repr(processor_name)
+    raise NotImplementedError('not fully functional')
+
+print "Arch", dis_engine
+
+fname = GetInputFile()
+print fname
+
+bs = bin_stream_ida()
+mdis = dis_engine(bs)
+my_ir = ira(mdis.symbol_pool)
+
+# populate symbols with ida names
+for ad, name in Names():
+    # print hex(ad), repr(name)
+    if name is None:
+        continue
+    mdis.symbol_pool.add_label(name, ad)
+
+print "start disasm"
+ad = ScreenEA()
+print hex(ad)
+
+ab = mdis.dis_multibloc(ad)
+
+print "generating graph"
+g = bloc2graph(ab, True)
+open('asm_flow.txt', 'w').write(g)
+
+
+print "generating IR... %x" % ad
+
+for b in ab:
+    print 'ADD'
+    print b
+    my_ir.add_bloc(b)
+
+
+print "IR ok... %x" % ad
+
+for irb in my_ir.blocs.values():
+    for irs in irb.irs:
+        for i, e in enumerate(irs):
+            e.dst, e.src = expr_simp(e.dst), expr_simp(e.src)
+
+my_ir.gen_graph()
+out = my_ir.graph()
+open('/tmp/graph.txt', 'w').write(out)
+
+
+# my_ir.dead_simp()
+
+g = GraphMiasmIR(my_ir, "Miasm IR graph", None)
+
+
+def mycb(*test):
+    print test
+    raise NotImplementedError('not fully functional')
+
+g.cmd_a = g.AddCommand("cmd a", "x")
+g.cmd_b = g.AddCommand("cmd b", "y")
+
+g.Show()
+
+
+def node2str(n):
+    label, i, node = n
+    print n
+    # out = "%s,%s\n%s"%n
+    out = "%s" % node
+    return out
+
+
+def get_node_name(label, i, n):
+    # n_name = "%s_%d_%s"%(label.name, i, n)
+    n_name = (label.name, i, n)
+    return n_name
+
+
+def get_modified_symbols(sb):
+    # get modified IDS
+    ids = sb.symbols.symbols_id.keys()
+    ids.sort()
+    out = {}
+    for i in ids:
+        if i in sb.arch.regs.regs_init and \
+                i in sb.symbols.symbols_id and \
+                sb.symbols.symbols_id[i] == sb.arch.regs.regs_init[i]:
+            continue
+        # print i, sb.symbols.symbols_id[i]
+        out[i] = sb.symbols.symbols_id[i]
+
+    # get mem IDS
+    mems = sb.symbols.symbols_mem.values()
+    for m, v in mems:
+        # print m, v
+        out[m] = v
+    pp([(str(x[0]), str(x[1])) for x in out.items()])
+    return out
+
+
+def gen_bloc_data_flow_graph(my_ir, in_str, ad):  # arch, attrib, pool_bin, bloc, symbol_pool):
+    out_str = ""
+
+    my_ir.gen_graph()
+    # my_ir.dead_simp()
+
+    irbloc_0 = None
+    for irbloc in my_ir.blocs.values():
+        if irbloc.label.offset == ad:
+            irbloc_0 = irbloc
+            break
+    assert(irbloc_0 is not None)
+    flow_graph = DiGraph()
+    done = set()
+    todo = set([irbloc_0.label])
+
+    bloc2w = {}
+
+    for irbloc in my_ir.blocs.values():
+        # intra_bloc_flow_raw(my_ir, flow_graph, irbloc)
+        intra_bloc_flow_symbexec(my_ir, flow_graph, irbloc)
+        # intra_bloc_flow_symb(my_ir, flow_graph, irbloc)
+
+    for irbloc in my_ir.blocs.values():
+        print irbloc
+        print 'IN', [str(x) for x in irbloc.in_nodes]
+        print 'OUT', [str(x) for x in irbloc.out_nodes]
+
+    print '*' * 20, 'interbloc', '*' * 20
+    inter_bloc_flow(my_ir, flow_graph, irbloc_0.label, False)
+
+    print 'Dataflow roots:'
+    for node in flow_graph.roots():
+        lbl, i, n = node
+        if n in my_ir.arch.regs.all_regs_ids:
+            print node
+
+    open('data.txt', 'w').write(flow_graph.dot())
+    return flow_graph
+
+
+class GraphMiasmIRFlow(GraphViewer):
+
+    def __init__(self, flow_graph, title, result):
+        GraphViewer.__init__(self, title)
+        print 'init'
+        self.flow_graph = flow_graph
+        self.result = result
+        self.names = {}
+
+    def OnRefresh(self):
+        print 'refresh'
+        self.Clear()
+        addr_id = {}
+        for n in self.flow_graph.nodes():
+            id_n = self.AddNode(node2str(self.flow_graph, n))
+            addr_id[n] = id_n
+
+        for a, b in self.flow_graph.edges():
+                node1, node2 = addr_id[a], addr_id[b]
+                self.AddEdge(node1, node2)
+        return True
+
+    def OnGetText(self, node_id):
+        b = self[node_id]
+        return str(b).lower()
+
+    def OnSelect(self, node_id):
+        return True
+
+    def OnClick(self, node_id):
+        return True
+
+    def OnCommand(self, cmd_id):
+        if self.cmd_test == cmd_id:
+            print 'TEST!'
+            return
+        print "command:", cmd_id
+
+    def Show(self):
+        if not GraphViewer.Show(self):
+            return False
+        self.cmd_test = self.AddCommand("Test", "F2")
+        if self.cmd_test == 0:
+            print "Failed to add popup menu item!"
+        return True
+
+
+#print "gen bloc data flow"
+#flow_graph = gen_bloc_data_flow_graph(my_ir, bs, ad)
+#def node2str(self, n):
+#    return "%s, %s\\l%s" % n
+#flow_graph.node2str = lambda n: node2str(flow_graph, n)
+#open('data_flow.txt', 'w').write(flow_graph.dot())
+
+# h =  GraphMiasmIRFlow(flow_graph, "Miasm IRFlow graph", None)
+# h.Show()