about summary refs log tree commit diff stats
diff options
context:
space:
mode:
authorserpilliere <serpilliere@users.noreply.github.com>2017-09-05 20:50:21 +0200
committerGitHub <noreply@github.com>2017-09-05 20:50:21 +0200
commitc4e78df2485a8368f74ad18a88cd3ab168258c3d (patch)
tree60a78801b8f774ac6fb95c501317d08ea201c8f4
parent93a6b40af9fc16c662798f051187f4b0ddaff176 (diff)
parentc08ea372f47f79c448acf3415397c700e2925579 (diff)
downloadmiasm-c4e78df2485a8368f74ad18a88cd3ab168258c3d.tar.gz
miasm-c4e78df2485a8368f74ad18a88cd3ab168258c3d.zip
Merge pull request #611 from commial/feature/dse-politics
Feature/dse politics
-rw-r--r--example/symbol_exec/dse_crackme.py64
-rw-r--r--example/symbol_exec/dse_strategies.py129
-rw-r--r--miasm2/analysis/dse.py83
-rwxr-xr-xtest/test_all.py14
4 files changed, 240 insertions, 50 deletions
diff --git a/example/symbol_exec/dse_crackme.py b/example/symbol_exec/dse_crackme.py
index 34c39138..f4b42176 100644
--- a/example/symbol_exec/dse_crackme.py
+++ b/example/symbol_exec/dse_crackme.py
@@ -57,6 +57,10 @@ def xxx_fclose(jitter):
 # Create sandbox
 parser = Sandbox_Linux_x86_64.parser(description="ELF sandboxer")
 parser.add_argument("filename", help="ELF Filename")
+parser.add_argument("--strategy",
+                    choices=["code-cov", "branch-cov", "path-cov"],
+                    help="Strategy to use for solution creation",
+                    default="code-cov")
 options = parser.parse_args()
 options.mimic_env = True
 sb = Sandbox_Linux_x86_64(options.filename, options, globals())
@@ -209,44 +213,17 @@ def xxx_puts_symb(dse):
     raise FinishOn(string)
 
 
-done = set([]) # Set of jump address already handled
 todo = set([""]) # Set of file content to test
 
-class DSEGenFile(DSEPathConstraint):
-    """DSE with a specific solution creation:
-    The solution is the content of the FILE to be read
-
-    The politics of exploration is the branch coverage: create a solution only
-    if the target address has never been seen
-    """
-
-    def handle_solution(self, model, destination):
-        global todo, done
-
-        if destination in done:
-            # Skip this path, already treated
-            return
-
-        finfo = FILE_to_info_symb[FILE_stream]
-
-        # Build corresponding file
-        out = ""
-        fsize = max(model.eval(self.z3_trans.from_expr(FILE_size)).as_long(),
-                    len(finfo.gen_bytes))
-        for index in xrange(fsize):
-            try:
-                byteid = finfo.gen_bytes[index]
-                out += chr(model.eval(self.z3_trans.from_expr(byteid)).as_long())
-            except (KeyError, AttributeError) as _:
-                # Default value if there is no constraint on current byte
-                out += "\x00"
-
-        todo.add(out)
-        done.add(destination)
-
 # Instanciate the DSE engine
 machine = Machine("x86_64")
-dse = DSEGenFile(machine)
+# Convert strategy to the correct value
+strategy = {
+    "code-cov": DSEPathConstraint.PRODUCE_SOLUTION_CODE_COV,
+    "branch-cov": DSEPathConstraint.PRODUCE_SOLUTION_BRANCH_COV,
+    "path-cov": DSEPathConstraint.PRODUCE_SOLUTION_PATH_COV,
+}[options.strategy]
+dse = DSEPathConstraint(machine, produce_solution=strategy)
 
 # Attach to the jitter
 dse.attach(sb.jitter)
@@ -280,7 +257,7 @@ while todo:
     file_content = todo.pop()
     print "CUR: %r" % file_content
     open("test.txt", "w").write(file_content)
-    dse.restore_snapshot(snapshot)
+    dse.restore_snapshot(snapshot, keep_known_solutions=True)
     FILE_to_info.clear()
     FILE_to_info_symb.clear()
 
@@ -294,6 +271,23 @@ while todo:
             found = True
             break
 
+    finfo = FILE_to_info_symb[FILE_stream]
+    for sol_ident, model in dse.new_solutions.iteritems():
+        # Build the file corresponding to solution in 'model'
+
+        out = ""
+        fsize = max(model.eval(dse.z3_trans.from_expr(FILE_size)).as_long(),
+                    len(finfo.gen_bytes))
+        for index in xrange(fsize):
+            try:
+                byteid = finfo.gen_bytes[index]
+                out += chr(model.eval(dse.z3_trans.from_expr(byteid)).as_long())
+            except (KeyError, AttributeError) as _:
+                # Default value if there is no constraint on current byte
+                out += "\x00"
+
+        todo.add(out)
+
 # Assert that the result has been found
 assert found == True
 print "FOUND !"
diff --git a/example/symbol_exec/dse_strategies.py b/example/symbol_exec/dse_strategies.py
new file mode 100644
index 00000000..40998425
--- /dev/null
+++ b/example/symbol_exec/dse_strategies.py
@@ -0,0 +1,129 @@
+"""Example of DynamicSymbolicExecution engine use
+
+This example highlights how coverage can be obtained on a binary
+
+Expected target: 'simple_test.bin'
+
+Global overview:
+ - Prepare a 'jitter' instance with the targeted function
+ - Attach a DSE instance
+ - Make the function argument symbolic, to track constraints on it
+ - Take a snapshot
+ - Initialize the argument candidate list with an arbitrary value, 0
+ - Main loop:
+   - Restore the snapshot (initial state, before running the function)
+   - Take an argument candidate and set it in the jitter
+   - Run the function
+   - Ask the DSE for new candidates, according to its strategy, ie. finding new
+block / branch / path
+"""
+from argparse import ArgumentParser
+
+from miasm2.analysis.machine import Machine
+from miasm2.jitter.csts import PAGE_READ, PAGE_WRITE
+from miasm2.analysis.dse import DSEPathConstraint
+from miasm2.expression.expression import ExprMem, ExprId, ExprInt, ExprAff
+
+# Argument handling
+parser = ArgumentParser("DSE Example")
+parser.add_argument("filename", help="Target x86 shellcode")
+parser.add_argument("strategy", choices=["code-cov", "branch-cov", "path-cov"],
+                    help="Strategy to use for solution creation")
+args = parser.parse_args()
+
+# Convert strategy to the correct value
+strategy = {
+    "code-cov": DSEPathConstraint.PRODUCE_SOLUTION_CODE_COV,
+    "branch-cov": DSEPathConstraint.PRODUCE_SOLUTION_BRANCH_COV,
+    "path-cov": DSEPathConstraint.PRODUCE_SOLUTION_PATH_COV,
+}[args.strategy]
+
+# Map the shellcode
+run_addr = 0x40000
+machine = Machine("x86_32")
+jitter = machine.jitter("python")
+with open(args.filename) as fdesc:
+    jitter.vm.add_memory_page(run_addr, PAGE_READ | PAGE_WRITE, fdesc.read(),
+                              "Binary")
+
+# Expect a binary with one argument on the stack
+jitter.init_stack()
+
+# Argument
+jitter.push_uint32_t(0)
+
+# Handle return
+def code_sentinelle(jitter):
+    jitter.run = False
+    return False
+
+ret_addr = 0x1337beef
+jitter.add_breakpoint(ret_addr, code_sentinelle)
+jitter.push_uint32_t(ret_addr)
+
+# Init the jitter
+jitter.init_run(run_addr)
+
+# Init a DSE instance with a given strategy
+dse = DSEPathConstraint(machine, produce_solution=strategy)
+dse.attach(jitter)
+# Concretize everything exept the argument
+dse.update_state_from_concrete()
+regs = jitter.ir_arch.arch.regs
+arg = ExprId("ARG", 32)
+arg_addr = ExprMem(ExprInt(jitter.cpu.ESP + 4, regs.ESP.size), arg.size)
+dse.update_state({
+    # @[ESP + 4] = ARG
+    arg_addr: arg
+})
+
+# Explore solutions
+todo = set([ExprInt(0, arg.size)])
+done = set()
+snapshot = dse.take_snapshot()
+
+# Only needed for the final output
+reachs = set()
+
+while todo:
+    # Get the next candidate
+    arg_value = todo.pop()
+
+    # Avoid using twice the same input
+    if arg_value in done:
+        continue
+    done.add(arg_value)
+
+    print "Run with ARG = %s" % arg_value
+    # Restore state, while keeping already found solutions
+    dse.restore_snapshot(snapshot, keep_known_solutions=True)
+
+    # Reinit jitter (reset jitter.run, etc.)
+    jitter.init_run(run_addr) # TODO degage avec new PR?
+
+    # Set the argument value in the jitter context
+    jitter.eval_expr(ExprAff(arg_addr, arg_value))
+
+    # Launch
+    jitter.continue_run()
+
+    # Obtained solutions are in dse.new_solutions: identifier -> solution model
+    # The identifier depends on the strategy:
+    # - block address for code coverage
+    # - last edge for branch coverage
+    # - execution path for path coverage
+
+    for sol_ident, model in dse.new_solutions.iteritems():
+        print "Found a solution to reach: %s" % str(sol_ident)
+        # Get the argument to use as a Miasm Expr
+        sol_value = model.eval(dse.z3_trans.from_expr(arg)).as_long()
+        sol_expr = ExprInt(sol_value, arg.size)
+
+        # Display info and update storages
+        print "\tARG = %s" % sol_expr
+        todo.add(sol_expr)
+        reachs.add(sol_ident)
+
+print "Found %d input, to reach %d element of coverage" % (len(done),
+                                                           len(reachs))
+
diff --git a/miasm2/analysis/dse.py b/miasm2/analysis/dse.py
index 74cc87e9..d97897d8 100644
--- a/miasm2/analysis/dse.py
+++ b/miasm2/analysis/dse.py
@@ -26,8 +26,9 @@ On branch, if the decision is symbolic, one can also collect "path constraints"
 and inverse them to produce new inputs potentially reaching new paths.
 
 Basically, this is done through DSEPathConstraint. In order to produce a new
-solution, one can extend this class, and override 'handle_solution' to
-produce a solution which fit its needs.
+solution, one can extend this class, and override 'handle_solution' to produce a
+solution which fit its needs. It could avoid computing new solution by
+overriding 'produce_solution'.
 
 If one is only interested in constraints associated to its path, the option
 "produce_solution" should be set to False, to speed up emulation.
@@ -448,7 +449,8 @@ class DSEPathConstraint(DSEEngine):
     to a new path.
 
     In order to produce a new solution, one can extend this class, and override
-    'handle_solution' to produce a solution which fit its needs.
+    'handle_solution' to produce a solution which fit its needs. It could avoid
+    computing new solution by overriding 'produce_solution'.
 
     If one is only interested in constraints associated to its path, the option
     "produce_solution" should be set to False, to speed up emulation.
@@ -459,7 +461,15 @@ class DSEPathConstraint(DSEEngine):
     # Maximum memory size to inject in constraints solving
     MAX_MEMORY_INJECT = 0x10000
 
-    def __init__(self, machine, produce_solution=True, **kwargs):
+    # Produce solution strategies
+    PRODUCE_NO_SOLUTION = 0
+    PRODUCE_SOLUTION_CODE_COV = 1
+    PRODUCE_SOLUTION_BRANCH_COV = 2
+    PRODUCE_SOLUTION_PATH_COV = 3
+
+    def __init__(self, machine, produce_solution=PRODUCE_SOLUTION_CODE_COV,
+                 known_solutions=None,
+                 **kwargs):
         """Init a DSEPathConstraint
         @machine: Machine of the targeted architecture instance
         @produce_solution: (optional) if set, new solutions will be computed"""
@@ -469,10 +479,14 @@ class DSEPathConstraint(DSEEngine):
         assert z3 is not None
 
         # Init PathConstraint specifics structures
-        self.produce_solution = produce_solution
         self.cur_solver = z3.Solver()
-        self.new_solutions = {} # destination -> set of model
+        self.new_solutions = {} # solution identifier -> solution's model
+        self._known_solutions = set() # set of solution identifiers
         self.z3_trans = Translator.to_language("z3")
+        self._produce_solution_strategy = produce_solution
+        self._history = None
+        if produce_solution == self.PRODUCE_SOLUTION_PATH_COV:
+            self._history = [] # List of addresses in the current path
 
     def take_snapshot(self, *args, **kwargs):
         snap = super(DSEPathConstraint, self).take_snapshot(*args, **kwargs)
@@ -481,22 +495,69 @@ class DSEPathConstraint(DSEEngine):
         snap["cur_constraints"] = self.cur_solver.assertions()
         return snap
 
-    def restore_snapshot(self, snapshot, *args, **kwargs):
-        super(DSEPathConstraint, self).restore_snapshot(snapshot, *args,
-                                                        **kwargs)
+    def restore_snapshot(self, snapshot, keep_known_solutions=True, **kwargs):
+        """Restore a DSEPathConstraint snapshot
+        @keep_known_solutions: if set, do not forget solutions already found.
+        -> They will not appear in 'new_solutions'
+        """
+        super(DSEPathConstraint, self).restore_snapshot(snapshot, **kwargs)
         self.new_solutions.clear()
         self.new_solutions.update(snapshot["new_solutions"])
         self.cur_solver = z3.Solver()
         self.cur_solver.add(snapshot["cur_constraints"])
+        if not keep_known_solutions:
+            self._known_solutions.clear()
+
+    def _key_for_solution_strategy(self, destination):
+        """Return the associated identifier for the current solution strategy"""
+        if self._produce_solution_strategy == self.PRODUCE_NO_SOLUTION:
+            # Never produce a solution
+            return None
+        elif self._produce_solution_strategy == self.PRODUCE_SOLUTION_CODE_COV:
+            # Decision based on code coverage
+            # -> produce a solution if the destination has never been seen
+            key = destination
+
+        elif self._produce_solution_strategy == self.PRODUCE_SOLUTION_BRANCH_COV:
+            # Decision based on branch coverage
+            # -> produce a solution if the current branch has never been take
+            cur_addr = ExprInt(self.jitter.pc, self.ir_arch.IRDst.size)
+            key = (cur_addr, destination)
+
+        elif self._produce_solution_strategy == self.PRODUCE_SOLUTION_PATH_COV:
+            # Decision based on path coverage
+            # -> produce a solution if the current path has never been take
+            key = tuple(self._history + [destination])
+        else:
+            raise ValueError("Unknown produce solution strategy")
+
+        return key
+
+    def produce_solution(self, destination):
+        """Called to determine if a solution for @destination should be test for
+        satisfiability and computed
+        @destination: Expr instance of the target @destination
+        """
+        key = self._key_for_solution_strategy(destination)
+        if key is None:
+            return False
+        return key not in self._known_solutions
 
     def handle_solution(self, model, destination):
         """Called when a new solution for destination @destination is founded
         @model: z3 model instance
         @destination: Expr instance for an addr which is not on the DSE path
         """
-        self.new_solutions.setdefault(destination, set()).add(model)
+        key = self._key_for_solution_strategy(destination)
+        assert key is not None
+        self.new_solutions[key] = model
+        self._known_solutions.add(key)
 
     def handle(self, cur_addr):
+        # Update history if needed
+        if self._produce_solution_strategy == self.PRODUCE_SOLUTION_PATH_COV:
+            self._history.append(cur_addr)
+
         symb_pc = self.eval_expr(self.ir_arch.IRDst)
         possibilities = possible_values(symb_pc)
         if len(possibilities) == 1:
@@ -547,7 +608,7 @@ class DSEPathConstraint(DSEEngine):
                     # Add path constraint
                     cur_path_constraint = path_constraint
 
-                elif self.produce_solution:
+                elif self.produce_solution(possibility.value):
                     # Looking for a new solution
                     self.cur_solver.push()
                     for cons in path_constraint:
diff --git a/test/test_all.py b/test/test_all.py
index 70ad62b8..30408ee2 100755
--- a/test/test_all.py
+++ b/test/test_all.py
@@ -631,10 +631,16 @@ dse_crackme = ExampleSymbolExec([Example.get_sample("dse_crackme.c"),
                                 products=[dse_crackme_out],
                                 executable="cc")
 testset += dse_crackme
-testset += ExampleSymbolExec(["dse_crackme.py", dse_crackme_out],
-                             depends=[dse_crackme],
-                             products=["test.txt"],
-                             tags=[TAGS["z3"]])
+for strategy in ["code-cov", "branch-cov", "path-cov"]:
+    testset += ExampleSymbolExec(["dse_crackme.py", dse_crackme_out,
+                                  "--strategy", strategy],
+                                 depends=[dse_crackme],
+                                 products=["test.txt"],
+                                 tags=[TAGS["z3"]])
+    testset += ExampleSymbolExec(["dse_strategies.py",
+                                  Example.get_sample("simple_test.bin"),
+                                  strategy],
+                                 tags=[TAGS["z3"]])
 
 ## Jitter
 class ExampleJitter(Example):