about summary refs log tree commit diff stats
path: root/example
diff options
context:
space:
mode:
authorserpilliere <serpilliere@users.noreply.github.com>2017-09-05 20:50:21 +0200
committerGitHub <noreply@github.com>2017-09-05 20:50:21 +0200
commitc4e78df2485a8368f74ad18a88cd3ab168258c3d (patch)
tree60a78801b8f774ac6fb95c501317d08ea201c8f4 /example
parent93a6b40af9fc16c662798f051187f4b0ddaff176 (diff)
parentc08ea372f47f79c448acf3415397c700e2925579 (diff)
downloadmiasm-c4e78df2485a8368f74ad18a88cd3ab168258c3d.tar.gz
miasm-c4e78df2485a8368f74ad18a88cd3ab168258c3d.zip
Merge pull request #611 from commial/feature/dse-politics
Feature/dse politics
Diffstat (limited to 'example')
-rw-r--r--example/symbol_exec/dse_crackme.py64
-rw-r--r--example/symbol_exec/dse_strategies.py129
2 files changed, 158 insertions, 35 deletions
diff --git a/example/symbol_exec/dse_crackme.py b/example/symbol_exec/dse_crackme.py
index 34c39138..f4b42176 100644
--- a/example/symbol_exec/dse_crackme.py
+++ b/example/symbol_exec/dse_crackme.py
@@ -57,6 +57,10 @@ def xxx_fclose(jitter):
 # Create sandbox
 parser = Sandbox_Linux_x86_64.parser(description="ELF sandboxer")
 parser.add_argument("filename", help="ELF Filename")
+parser.add_argument("--strategy",
+                    choices=["code-cov", "branch-cov", "path-cov"],
+                    help="Strategy to use for solution creation",
+                    default="code-cov")
 options = parser.parse_args()
 options.mimic_env = True
 sb = Sandbox_Linux_x86_64(options.filename, options, globals())
@@ -209,44 +213,17 @@ def xxx_puts_symb(dse):
     raise FinishOn(string)
 
 
-done = set([]) # Set of jump address already handled
 todo = set([""]) # Set of file content to test
 
-class DSEGenFile(DSEPathConstraint):
-    """DSE with a specific solution creation:
-    The solution is the content of the FILE to be read
-
-    The politics of exploration is the branch coverage: create a solution only
-    if the target address has never been seen
-    """
-
-    def handle_solution(self, model, destination):
-        global todo, done
-
-        if destination in done:
-            # Skip this path, already treated
-            return
-
-        finfo = FILE_to_info_symb[FILE_stream]
-
-        # Build corresponding file
-        out = ""
-        fsize = max(model.eval(self.z3_trans.from_expr(FILE_size)).as_long(),
-                    len(finfo.gen_bytes))
-        for index in xrange(fsize):
-            try:
-                byteid = finfo.gen_bytes[index]
-                out += chr(model.eval(self.z3_trans.from_expr(byteid)).as_long())
-            except (KeyError, AttributeError) as _:
-                # Default value if there is no constraint on current byte
-                out += "\x00"
-
-        todo.add(out)
-        done.add(destination)
-
 # Instanciate the DSE engine
 machine = Machine("x86_64")
-dse = DSEGenFile(machine)
+# Convert strategy to the correct value
+strategy = {
+    "code-cov": DSEPathConstraint.PRODUCE_SOLUTION_CODE_COV,
+    "branch-cov": DSEPathConstraint.PRODUCE_SOLUTION_BRANCH_COV,
+    "path-cov": DSEPathConstraint.PRODUCE_SOLUTION_PATH_COV,
+}[options.strategy]
+dse = DSEPathConstraint(machine, produce_solution=strategy)
 
 # Attach to the jitter
 dse.attach(sb.jitter)
@@ -280,7 +257,7 @@ while todo:
     file_content = todo.pop()
     print "CUR: %r" % file_content
     open("test.txt", "w").write(file_content)
-    dse.restore_snapshot(snapshot)
+    dse.restore_snapshot(snapshot, keep_known_solutions=True)
     FILE_to_info.clear()
     FILE_to_info_symb.clear()
 
@@ -294,6 +271,23 @@ while todo:
             found = True
             break
 
+    finfo = FILE_to_info_symb[FILE_stream]
+    for sol_ident, model in dse.new_solutions.iteritems():
+        # Build the file corresponding to solution in 'model'
+
+        out = ""
+        fsize = max(model.eval(dse.z3_trans.from_expr(FILE_size)).as_long(),
+                    len(finfo.gen_bytes))
+        for index in xrange(fsize):
+            try:
+                byteid = finfo.gen_bytes[index]
+                out += chr(model.eval(dse.z3_trans.from_expr(byteid)).as_long())
+            except (KeyError, AttributeError) as _:
+                # Default value if there is no constraint on current byte
+                out += "\x00"
+
+        todo.add(out)
+
 # Assert that the result has been found
 assert found == True
 print "FOUND !"
diff --git a/example/symbol_exec/dse_strategies.py b/example/symbol_exec/dse_strategies.py
new file mode 100644
index 00000000..40998425
--- /dev/null
+++ b/example/symbol_exec/dse_strategies.py
@@ -0,0 +1,129 @@
+"""Example of DynamicSymbolicExecution engine use
+
+This example highlights how coverage can be obtained on a binary
+
+Expected target: 'simple_test.bin'
+
+Global overview:
+ - Prepare a 'jitter' instance with the targeted function
+ - Attach a DSE instance
+ - Make the function argument symbolic, to track constraints on it
+ - Take a snapshot
+ - Initialize the argument candidate list with an arbitrary value, 0
+ - Main loop:
+   - Restore the snapshot (initial state, before running the function)
+   - Take an argument candidate and set it in the jitter
+   - Run the function
+   - Ask the DSE for new candidates, according to its strategy, ie. finding new
+block / branch / path
+"""
+from argparse import ArgumentParser
+
+from miasm2.analysis.machine import Machine
+from miasm2.jitter.csts import PAGE_READ, PAGE_WRITE
+from miasm2.analysis.dse import DSEPathConstraint
+from miasm2.expression.expression import ExprMem, ExprId, ExprInt, ExprAff
+
+# Argument handling
+parser = ArgumentParser("DSE Example")
+parser.add_argument("filename", help="Target x86 shellcode")
+parser.add_argument("strategy", choices=["code-cov", "branch-cov", "path-cov"],
+                    help="Strategy to use for solution creation")
+args = parser.parse_args()
+
+# Convert strategy to the correct value
+strategy = {
+    "code-cov": DSEPathConstraint.PRODUCE_SOLUTION_CODE_COV,
+    "branch-cov": DSEPathConstraint.PRODUCE_SOLUTION_BRANCH_COV,
+    "path-cov": DSEPathConstraint.PRODUCE_SOLUTION_PATH_COV,
+}[args.strategy]
+
+# Map the shellcode
+run_addr = 0x40000
+machine = Machine("x86_32")
+jitter = machine.jitter("python")
+with open(args.filename) as fdesc:
+    jitter.vm.add_memory_page(run_addr, PAGE_READ | PAGE_WRITE, fdesc.read(),
+                              "Binary")
+
+# Expect a binary with one argument on the stack
+jitter.init_stack()
+
+# Argument
+jitter.push_uint32_t(0)
+
+# Handle return
+def code_sentinelle(jitter):
+    jitter.run = False
+    return False
+
+ret_addr = 0x1337beef
+jitter.add_breakpoint(ret_addr, code_sentinelle)
+jitter.push_uint32_t(ret_addr)
+
+# Init the jitter
+jitter.init_run(run_addr)
+
+# Init a DSE instance with a given strategy
+dse = DSEPathConstraint(machine, produce_solution=strategy)
+dse.attach(jitter)
+# Concretize everything exept the argument
+dse.update_state_from_concrete()
+regs = jitter.ir_arch.arch.regs
+arg = ExprId("ARG", 32)
+arg_addr = ExprMem(ExprInt(jitter.cpu.ESP + 4, regs.ESP.size), arg.size)
+dse.update_state({
+    # @[ESP + 4] = ARG
+    arg_addr: arg
+})
+
+# Explore solutions
+todo = set([ExprInt(0, arg.size)])
+done = set()
+snapshot = dse.take_snapshot()
+
+# Only needed for the final output
+reachs = set()
+
+while todo:
+    # Get the next candidate
+    arg_value = todo.pop()
+
+    # Avoid using twice the same input
+    if arg_value in done:
+        continue
+    done.add(arg_value)
+
+    print "Run with ARG = %s" % arg_value
+    # Restore state, while keeping already found solutions
+    dse.restore_snapshot(snapshot, keep_known_solutions=True)
+
+    # Reinit jitter (reset jitter.run, etc.)
+    jitter.init_run(run_addr) # TODO degage avec new PR?
+
+    # Set the argument value in the jitter context
+    jitter.eval_expr(ExprAff(arg_addr, arg_value))
+
+    # Launch
+    jitter.continue_run()
+
+    # Obtained solutions are in dse.new_solutions: identifier -> solution model
+    # The identifier depends on the strategy:
+    # - block address for code coverage
+    # - last edge for branch coverage
+    # - execution path for path coverage
+
+    for sol_ident, model in dse.new_solutions.iteritems():
+        print "Found a solution to reach: %s" % str(sol_ident)
+        # Get the argument to use as a Miasm Expr
+        sol_value = model.eval(dse.z3_trans.from_expr(arg)).as_long()
+        sol_expr = ExprInt(sol_value, arg.size)
+
+        # Display info and update storages
+        print "\tARG = %s" % sol_expr
+        todo.add(sol_expr)
+        reachs.add(sol_ident)
+
+print "Found %d input, to reach %d element of coverage" % (len(done),
+                                                           len(reachs))
+