diff options
| author | serpilliere <serpilliere@users.noreply.github.com> | 2017-09-05 20:50:21 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2017-09-05 20:50:21 +0200 |
| commit | c4e78df2485a8368f74ad18a88cd3ab168258c3d (patch) | |
| tree | 60a78801b8f774ac6fb95c501317d08ea201c8f4 | |
| parent | 93a6b40af9fc16c662798f051187f4b0ddaff176 (diff) | |
| parent | c08ea372f47f79c448acf3415397c700e2925579 (diff) | |
| download | miasm-c4e78df2485a8368f74ad18a88cd3ab168258c3d.tar.gz miasm-c4e78df2485a8368f74ad18a88cd3ab168258c3d.zip | |
Merge pull request #611 from commial/feature/dse-politics
Feature/dse politics
| -rw-r--r-- | example/symbol_exec/dse_crackme.py | 64 | ||||
| -rw-r--r-- | example/symbol_exec/dse_strategies.py | 129 | ||||
| -rw-r--r-- | miasm2/analysis/dse.py | 83 | ||||
| -rwxr-xr-x | test/test_all.py | 14 |
4 files changed, 240 insertions, 50 deletions
diff --git a/example/symbol_exec/dse_crackme.py b/example/symbol_exec/dse_crackme.py index 34c39138..f4b42176 100644 --- a/example/symbol_exec/dse_crackme.py +++ b/example/symbol_exec/dse_crackme.py @@ -57,6 +57,10 @@ def xxx_fclose(jitter): # Create sandbox parser = Sandbox_Linux_x86_64.parser(description="ELF sandboxer") parser.add_argument("filename", help="ELF Filename") +parser.add_argument("--strategy", + choices=["code-cov", "branch-cov", "path-cov"], + help="Strategy to use for solution creation", + default="code-cov") options = parser.parse_args() options.mimic_env = True sb = Sandbox_Linux_x86_64(options.filename, options, globals()) @@ -209,44 +213,17 @@ def xxx_puts_symb(dse): raise FinishOn(string) -done = set([]) # Set of jump address already handled todo = set([""]) # Set of file content to test -class DSEGenFile(DSEPathConstraint): - """DSE with a specific solution creation: - The solution is the content of the FILE to be read - - The politics of exploration is the branch coverage: create a solution only - if the target address has never been seen - """ - - def handle_solution(self, model, destination): - global todo, done - - if destination in done: - # Skip this path, already treated - return - - finfo = FILE_to_info_symb[FILE_stream] - - # Build corresponding file - out = "" - fsize = max(model.eval(self.z3_trans.from_expr(FILE_size)).as_long(), - len(finfo.gen_bytes)) - for index in xrange(fsize): - try: - byteid = finfo.gen_bytes[index] - out += chr(model.eval(self.z3_trans.from_expr(byteid)).as_long()) - except (KeyError, AttributeError) as _: - # Default value if there is no constraint on current byte - out += "\x00" - - todo.add(out) - done.add(destination) - # Instanciate the DSE engine machine = Machine("x86_64") -dse = DSEGenFile(machine) +# Convert strategy to the correct value +strategy = { + "code-cov": DSEPathConstraint.PRODUCE_SOLUTION_CODE_COV, + "branch-cov": DSEPathConstraint.PRODUCE_SOLUTION_BRANCH_COV, + "path-cov": DSEPathConstraint.PRODUCE_SOLUTION_PATH_COV, +}[options.strategy] +dse = DSEPathConstraint(machine, produce_solution=strategy) # Attach to the jitter dse.attach(sb.jitter) @@ -280,7 +257,7 @@ while todo: file_content = todo.pop() print "CUR: %r" % file_content open("test.txt", "w").write(file_content) - dse.restore_snapshot(snapshot) + dse.restore_snapshot(snapshot, keep_known_solutions=True) FILE_to_info.clear() FILE_to_info_symb.clear() @@ -294,6 +271,23 @@ while todo: found = True break + finfo = FILE_to_info_symb[FILE_stream] + for sol_ident, model in dse.new_solutions.iteritems(): + # Build the file corresponding to solution in 'model' + + out = "" + fsize = max(model.eval(dse.z3_trans.from_expr(FILE_size)).as_long(), + len(finfo.gen_bytes)) + for index in xrange(fsize): + try: + byteid = finfo.gen_bytes[index] + out += chr(model.eval(dse.z3_trans.from_expr(byteid)).as_long()) + except (KeyError, AttributeError) as _: + # Default value if there is no constraint on current byte + out += "\x00" + + todo.add(out) + # Assert that the result has been found assert found == True print "FOUND !" diff --git a/example/symbol_exec/dse_strategies.py b/example/symbol_exec/dse_strategies.py new file mode 100644 index 00000000..40998425 --- /dev/null +++ b/example/symbol_exec/dse_strategies.py @@ -0,0 +1,129 @@ +"""Example of DynamicSymbolicExecution engine use + +This example highlights how coverage can be obtained on a binary + +Expected target: 'simple_test.bin' + +Global overview: + - Prepare a 'jitter' instance with the targeted function + - Attach a DSE instance + - Make the function argument symbolic, to track constraints on it + - Take a snapshot + - Initialize the argument candidate list with an arbitrary value, 0 + - Main loop: + - Restore the snapshot (initial state, before running the function) + - Take an argument candidate and set it in the jitter + - Run the function + - Ask the DSE for new candidates, according to its strategy, ie. finding new +block / branch / path +""" +from argparse import ArgumentParser + +from miasm2.analysis.machine import Machine +from miasm2.jitter.csts import PAGE_READ, PAGE_WRITE +from miasm2.analysis.dse import DSEPathConstraint +from miasm2.expression.expression import ExprMem, ExprId, ExprInt, ExprAff + +# Argument handling +parser = ArgumentParser("DSE Example") +parser.add_argument("filename", help="Target x86 shellcode") +parser.add_argument("strategy", choices=["code-cov", "branch-cov", "path-cov"], + help="Strategy to use for solution creation") +args = parser.parse_args() + +# Convert strategy to the correct value +strategy = { + "code-cov": DSEPathConstraint.PRODUCE_SOLUTION_CODE_COV, + "branch-cov": DSEPathConstraint.PRODUCE_SOLUTION_BRANCH_COV, + "path-cov": DSEPathConstraint.PRODUCE_SOLUTION_PATH_COV, +}[args.strategy] + +# Map the shellcode +run_addr = 0x40000 +machine = Machine("x86_32") +jitter = machine.jitter("python") +with open(args.filename) as fdesc: + jitter.vm.add_memory_page(run_addr, PAGE_READ | PAGE_WRITE, fdesc.read(), + "Binary") + +# Expect a binary with one argument on the stack +jitter.init_stack() + +# Argument +jitter.push_uint32_t(0) + +# Handle return +def code_sentinelle(jitter): + jitter.run = False + return False + +ret_addr = 0x1337beef +jitter.add_breakpoint(ret_addr, code_sentinelle) +jitter.push_uint32_t(ret_addr) + +# Init the jitter +jitter.init_run(run_addr) + +# Init a DSE instance with a given strategy +dse = DSEPathConstraint(machine, produce_solution=strategy) +dse.attach(jitter) +# Concretize everything exept the argument +dse.update_state_from_concrete() +regs = jitter.ir_arch.arch.regs +arg = ExprId("ARG", 32) +arg_addr = ExprMem(ExprInt(jitter.cpu.ESP + 4, regs.ESP.size), arg.size) +dse.update_state({ + # @[ESP + 4] = ARG + arg_addr: arg +}) + +# Explore solutions +todo = set([ExprInt(0, arg.size)]) +done = set() +snapshot = dse.take_snapshot() + +# Only needed for the final output +reachs = set() + +while todo: + # Get the next candidate + arg_value = todo.pop() + + # Avoid using twice the same input + if arg_value in done: + continue + done.add(arg_value) + + print "Run with ARG = %s" % arg_value + # Restore state, while keeping already found solutions + dse.restore_snapshot(snapshot, keep_known_solutions=True) + + # Reinit jitter (reset jitter.run, etc.) + jitter.init_run(run_addr) # TODO degage avec new PR? + + # Set the argument value in the jitter context + jitter.eval_expr(ExprAff(arg_addr, arg_value)) + + # Launch + jitter.continue_run() + + # Obtained solutions are in dse.new_solutions: identifier -> solution model + # The identifier depends on the strategy: + # - block address for code coverage + # - last edge for branch coverage + # - execution path for path coverage + + for sol_ident, model in dse.new_solutions.iteritems(): + print "Found a solution to reach: %s" % str(sol_ident) + # Get the argument to use as a Miasm Expr + sol_value = model.eval(dse.z3_trans.from_expr(arg)).as_long() + sol_expr = ExprInt(sol_value, arg.size) + + # Display info and update storages + print "\tARG = %s" % sol_expr + todo.add(sol_expr) + reachs.add(sol_ident) + +print "Found %d input, to reach %d element of coverage" % (len(done), + len(reachs)) + diff --git a/miasm2/analysis/dse.py b/miasm2/analysis/dse.py index 74cc87e9..d97897d8 100644 --- a/miasm2/analysis/dse.py +++ b/miasm2/analysis/dse.py @@ -26,8 +26,9 @@ On branch, if the decision is symbolic, one can also collect "path constraints" and inverse them to produce new inputs potentially reaching new paths. Basically, this is done through DSEPathConstraint. In order to produce a new -solution, one can extend this class, and override 'handle_solution' to -produce a solution which fit its needs. +solution, one can extend this class, and override 'handle_solution' to produce a +solution which fit its needs. It could avoid computing new solution by +overriding 'produce_solution'. If one is only interested in constraints associated to its path, the option "produce_solution" should be set to False, to speed up emulation. @@ -448,7 +449,8 @@ class DSEPathConstraint(DSEEngine): to a new path. In order to produce a new solution, one can extend this class, and override - 'handle_solution' to produce a solution which fit its needs. + 'handle_solution' to produce a solution which fit its needs. It could avoid + computing new solution by overriding 'produce_solution'. If one is only interested in constraints associated to its path, the option "produce_solution" should be set to False, to speed up emulation. @@ -459,7 +461,15 @@ class DSEPathConstraint(DSEEngine): # Maximum memory size to inject in constraints solving MAX_MEMORY_INJECT = 0x10000 - def __init__(self, machine, produce_solution=True, **kwargs): + # Produce solution strategies + PRODUCE_NO_SOLUTION = 0 + PRODUCE_SOLUTION_CODE_COV = 1 + PRODUCE_SOLUTION_BRANCH_COV = 2 + PRODUCE_SOLUTION_PATH_COV = 3 + + def __init__(self, machine, produce_solution=PRODUCE_SOLUTION_CODE_COV, + known_solutions=None, + **kwargs): """Init a DSEPathConstraint @machine: Machine of the targeted architecture instance @produce_solution: (optional) if set, new solutions will be computed""" @@ -469,10 +479,14 @@ class DSEPathConstraint(DSEEngine): assert z3 is not None # Init PathConstraint specifics structures - self.produce_solution = produce_solution self.cur_solver = z3.Solver() - self.new_solutions = {} # destination -> set of model + self.new_solutions = {} # solution identifier -> solution's model + self._known_solutions = set() # set of solution identifiers self.z3_trans = Translator.to_language("z3") + self._produce_solution_strategy = produce_solution + self._history = None + if produce_solution == self.PRODUCE_SOLUTION_PATH_COV: + self._history = [] # List of addresses in the current path def take_snapshot(self, *args, **kwargs): snap = super(DSEPathConstraint, self).take_snapshot(*args, **kwargs) @@ -481,22 +495,69 @@ class DSEPathConstraint(DSEEngine): snap["cur_constraints"] = self.cur_solver.assertions() return snap - def restore_snapshot(self, snapshot, *args, **kwargs): - super(DSEPathConstraint, self).restore_snapshot(snapshot, *args, - **kwargs) + def restore_snapshot(self, snapshot, keep_known_solutions=True, **kwargs): + """Restore a DSEPathConstraint snapshot + @keep_known_solutions: if set, do not forget solutions already found. + -> They will not appear in 'new_solutions' + """ + super(DSEPathConstraint, self).restore_snapshot(snapshot, **kwargs) self.new_solutions.clear() self.new_solutions.update(snapshot["new_solutions"]) self.cur_solver = z3.Solver() self.cur_solver.add(snapshot["cur_constraints"]) + if not keep_known_solutions: + self._known_solutions.clear() + + def _key_for_solution_strategy(self, destination): + """Return the associated identifier for the current solution strategy""" + if self._produce_solution_strategy == self.PRODUCE_NO_SOLUTION: + # Never produce a solution + return None + elif self._produce_solution_strategy == self.PRODUCE_SOLUTION_CODE_COV: + # Decision based on code coverage + # -> produce a solution if the destination has never been seen + key = destination + + elif self._produce_solution_strategy == self.PRODUCE_SOLUTION_BRANCH_COV: + # Decision based on branch coverage + # -> produce a solution if the current branch has never been take + cur_addr = ExprInt(self.jitter.pc, self.ir_arch.IRDst.size) + key = (cur_addr, destination) + + elif self._produce_solution_strategy == self.PRODUCE_SOLUTION_PATH_COV: + # Decision based on path coverage + # -> produce a solution if the current path has never been take + key = tuple(self._history + [destination]) + else: + raise ValueError("Unknown produce solution strategy") + + return key + + def produce_solution(self, destination): + """Called to determine if a solution for @destination should be test for + satisfiability and computed + @destination: Expr instance of the target @destination + """ + key = self._key_for_solution_strategy(destination) + if key is None: + return False + return key not in self._known_solutions def handle_solution(self, model, destination): """Called when a new solution for destination @destination is founded @model: z3 model instance @destination: Expr instance for an addr which is not on the DSE path """ - self.new_solutions.setdefault(destination, set()).add(model) + key = self._key_for_solution_strategy(destination) + assert key is not None + self.new_solutions[key] = model + self._known_solutions.add(key) def handle(self, cur_addr): + # Update history if needed + if self._produce_solution_strategy == self.PRODUCE_SOLUTION_PATH_COV: + self._history.append(cur_addr) + symb_pc = self.eval_expr(self.ir_arch.IRDst) possibilities = possible_values(symb_pc) if len(possibilities) == 1: @@ -547,7 +608,7 @@ class DSEPathConstraint(DSEEngine): # Add path constraint cur_path_constraint = path_constraint - elif self.produce_solution: + elif self.produce_solution(possibility.value): # Looking for a new solution self.cur_solver.push() for cons in path_constraint: diff --git a/test/test_all.py b/test/test_all.py index 70ad62b8..30408ee2 100755 --- a/test/test_all.py +++ b/test/test_all.py @@ -631,10 +631,16 @@ dse_crackme = ExampleSymbolExec([Example.get_sample("dse_crackme.c"), products=[dse_crackme_out], executable="cc") testset += dse_crackme -testset += ExampleSymbolExec(["dse_crackme.py", dse_crackme_out], - depends=[dse_crackme], - products=["test.txt"], - tags=[TAGS["z3"]]) +for strategy in ["code-cov", "branch-cov", "path-cov"]: + testset += ExampleSymbolExec(["dse_crackme.py", dse_crackme_out, + "--strategy", strategy], + depends=[dse_crackme], + products=["test.txt"], + tags=[TAGS["z3"]]) + testset += ExampleSymbolExec(["dse_strategies.py", + Example.get_sample("simple_test.bin"), + strategy], + tags=[TAGS["z3"]]) ## Jitter class ExampleJitter(Example): |