about summary refs log tree commit diff stats
path: root/example/expression/solve_condition_stp.py
diff options
context:
space:
mode:
authorserpilliere <devnull@localhost>2014-06-03 10:27:56 +0200
committerserpilliere <devnull@localhost>2014-06-03 10:27:56 +0200
commited5c3668cc9f545b52674ad699fc2b0ed1ccb575 (patch)
tree07faf97d7e4d083173a1f7e1bfd249baed2d74f9 /example/expression/solve_condition_stp.py
parenta183e1ebd525453710306695daa8c410fd0cb2af (diff)
downloadfocaccia-miasm-ed5c3668cc9f545b52674ad699fc2b0ed1ccb575.tar.gz
focaccia-miasm-ed5c3668cc9f545b52674ad699fc2b0ed1ccb575.zip
Miasm v2
* API has changed, so old scripts need updates
* See example for API usage
* Use tcc or llvm for jit emulation
* Go to test and run test_all.py to check install

Enjoy !
Diffstat (limited to 'example/expression/solve_condition_stp.py')
-rw-r--r--example/expression/solve_condition_stp.py245
1 files changed, 245 insertions, 0 deletions
diff --git a/example/expression/solve_condition_stp.py b/example/expression/solve_condition_stp.py
new file mode 100644
index 00000000..828629fc
--- /dev/null
+++ b/example/expression/solve_condition_stp.py
@@ -0,0 +1,245 @@
+import os
+import sys
+from miasm2.arch.x86.arch import *
+from miasm2.arch.x86.regs import *
+from miasm2.arch.x86.sem import *
+from miasm2.core.bin_stream import bin_stream_str
+from miasm2.core import asmbloc
+from miasm2.expression.expression import get_rw
+from miasm2.ir.symbexec import symbexec
+from miasm2.expression.simplifications import expr_simp
+from miasm2.expression import stp
+from collections import defaultdict
+from optparse import OptionParser
+import subprocess
+from miasm2.core import parse_asm
+from elfesteem.strpatchwork import StrPatchwork
+
+from miasm2.arch.x86.disasm import dis_x86_32 as dis_engine
+
+
+filename = os.environ.get('PYTHONSTARTUP')
+if filename and os.path.isfile(filename):
+    execfile(filename)
+
+
+mn = mn_x86
+
+parser = OptionParser(usage="usage: %prog [options] file")
+parser.add_option('-a', "--address", dest="address", metavar="ADDRESS",
+                  help="address to disasemble", default="0")
+
+(options, args) = parser.parse_args(sys.argv[1:])
+if not args:
+    parser.print_help()
+    sys.exit(0)
+
+
+def get_bloc(my_ir, mdis, ad):
+    if isinstance(ad, asmbloc.asm_label):
+        l = ad
+    else:
+        l = mdis.symbol_pool.getby_offset_create(ad)
+    if not l in my_ir.blocs:
+        ad = l.offset
+        b = mdis.dis_bloc(ad)
+        my_ir.add_bloc(b)
+    b = my_ir.get_bloc(l)
+    if b is None:
+        raise LookupError('no bloc found at that address: %s' % l)
+    return b
+
+
+def emul_symb(my_ir, mdis, states_todo, states_done):
+    while states_todo:
+        ad, symbols, conds = states_todo.pop()
+        print '*' * 40, "addr", ad, '*' * 40
+        if (ad, symbols, conds) in states_done:
+            print 'skip', ad
+            continue
+        states_done.add((ad, symbols, conds))
+        sb = symbexec(mn, {})
+        sb.symbols = symbols.copy()
+        if my_ir.pc in sb.symbols:
+            del(sb.symbols[my_ir.pc])
+        b = get_bloc(my_ir, mdis, ad)
+
+        print 'run bloc'
+        print b
+        # print blocs[ad]
+        ad = sb.emulbloc(b)
+        print 'final state'
+        sb.dump_id()
+        print 'dataflow'
+        # data_flow_graph_from_expr(sb)
+
+        assert(ad is not None)
+        print "DST", ad
+
+        if isinstance(ad, ExprCond):
+            # Create 2 states, each including complementary conditions
+            p1 = sb.symbols.copy()
+            p2 = sb.symbols.copy()
+            c1 = {ad.cond: ExprInt_from(ad.cond, 0)}
+            c2 = {ad.cond: ExprInt_from(ad.cond, 1)}
+            print ad.cond
+            p1[ad.cond] = ExprInt_from(ad.cond, 0)
+            p2[ad.cond] = ExprInt_from(ad.cond, 1)
+            ad1 = expr_simp(sb.eval_expr(ad.replace_expr(c1), {}))
+            ad2 = expr_simp(sb.eval_expr(ad.replace_expr(c2), {}))
+            if not (isinstance(ad1, ExprInt) or (isinstance(ad1, ExprId) and isinstance(ad1.name, asmbloc.asm_label)) and
+                    isinstance(ad2, ExprInt) or (isinstance(ad2, ExprId) and isinstance(ad2.name, asmbloc.asm_label))):
+                print str(ad1), str(ad2)
+                raise ValueError("zarb condition")
+            conds1 = list(conds) + c1.items()
+            conds2 = list(conds) + c2.items()
+            if isinstance(ad1, ExprId):
+                ad1 = ad1.name
+            if isinstance(ad2, ExprId):
+                ad2 = ad2.name
+            if isinstance(ad1, ExprInt):
+                ad1 = ad1.arg
+            if isinstance(ad2, ExprInt):
+                ad2 = ad2.arg
+            states_todo.add((ad1, p1, tuple(conds1)))
+            states_todo.add((ad2, p2, tuple(conds2)))
+        elif isinstance(ad, ExprInt):
+            ad = int(ad.arg)
+            states_todo.add((ad, sb.symbols.copy(), tuple(conds)))
+        elif isinstance(ad, ExprId) and isinstance(ad.name, asmbloc.asm_label):
+            if isinstance(ad, ExprId):
+                ad = ad.name
+            states_todo.add((ad, sb.symbols.copy(), tuple(conds)))
+        elif ad == ret_addr:
+            print 'ret reached'
+            continue
+        else:
+            raise ValueError("zarb eip")
+
+
+if __name__ == '__main__':
+
+    data = open(args[0]).read()
+    bs = bin_stream_str(data)
+
+    mdis = dis_engine(bs)
+
+    ad = int(options.address, 16)
+
+    symbols_init = {}
+    for i, r in enumerate(all_regs_ids):
+        symbols_init[r] = all_regs_ids_init[i]
+
+    # config parser for 32 bit
+    reg_and_id = dict(mn_x86.regs.all_regs_ids_byname)
+
+    def my_ast_int2expr(a):
+        return ExprInt32(a)
+
+    def my_ast_id2expr(t):
+        if t in reg_and_id:
+            r = reg_and_id[t]
+        else:
+            r = ExprId(t, size=32)
+        return r
+    my_var_parser = parse_ast(my_ast_id2expr, my_ast_int2expr)
+    base_expr.setParseAction(my_var_parser)
+
+    argc = ExprId('argc', 32)
+    argv = ExprId('argv', 32)
+    ret_addr = ExprId('ret_addr')
+    reg_and_id[argc.name] = argc
+    reg_and_id[argv.name] = argv
+    reg_and_id[ret_addr.name] = ret_addr
+
+    my_symbols = [argc, argv, ret_addr]
+    my_symbols = dict([(x.name, x) for x in my_symbols])
+    my_symbols.update(mn_x86.regs.all_regs_ids_byname)
+
+    sb = symbexec(mn, symbols_init)
+
+    blocs, symbol_pool = parse_asm.parse_txt(mn_x86, 32, '''
+    PUSH argv
+    PUSH argc
+    PUSH ret_addr
+    ''')
+
+    my_ir = ir_x86_32(mdis.symbol_pool)
+
+    b = blocs[0][0]
+    print b
+    # add fake address and len to parsed instructions
+    for i, l in enumerate(b.lines):
+        l.offset, l.l = i, 1
+    my_ir.add_bloc(b)
+    irb = get_bloc(my_ir, mdis, 0)
+    sb.emulbloc(irb)
+    sb.dump_mem()
+
+    # reset my_ir blocs
+    my_ir.blocs = {}
+
+    states_todo = set()
+    states_done = set()
+    states_todo.add((uint32(ad), sb.symbols, ()))
+
+    # emul blocs, propagate states
+    emul_symb(my_ir, mdis, states_todo, states_done)
+
+    all_info = []
+
+    print '*' * 40, 'conditions to match', '*' * 40
+    for ad, symbols, conds in sorted(states_done):
+        print '*' * 40, ad, '*' * 40
+        reqs = []
+        for k, v in conds:
+            print k, v
+            reqs.append((k, v))
+        all_info.append((ad, reqs))
+
+    all_cases = set()
+
+    sb = symbexec(mn, symbols_init)
+    for ad, reqs_cond in all_info:
+        all_ids = set()
+        for k, v in reqs_cond:
+            all_ids.update(get_expr_ids(k))
+
+        out = []
+
+        # declare variables
+        for v in all_ids:
+            out.append(str(v) + ":" + "BITVECTOR(%d);" % v.size)
+
+        all_csts = []
+        for k, v in reqs_cond:
+            cst = k.strcst()
+            val = v.arg
+            assert(val in [0, 1])
+            inv = ""
+            if val == 1:
+                inv = "NOT "
+            val = "0" * v.size
+            all_csts.append("(%s%s=0bin%s)" % (inv, cst, val))
+        if not all_csts:
+            continue
+        rez = " AND ".join(all_csts)
+        out.append("QUERY(NOT (%s));" % rez)
+        end = "\n".join(out)
+        open('out.txt', 'w').write(end)
+        try:
+            cases = subprocess.check_output(["/home/serpilliere/tools/stp/stp",
+                                             "-p",
+                                             "out.txt"])
+        except OSError:
+            print "ERF, cannot find stp"
+            break
+        for c in cases.split('\n'):
+            if c.startswith('ASSERT'):
+                all_cases.add((ad, c))
+
+    print '*' * 40, 'ALL COND', '*' * 40
+    all_cases = list(all_cases)
+    all_cases.sort(key=lambda x: (x[0], x[1]))
+    for ad, val in all_cases:
+        print 'address', ad, 'is reachable using argc', val