diff options
| author | serpilliere <serpilliere@users.noreply.github.com> | 2017-09-05 20:50:21 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2017-09-05 20:50:21 +0200 |
| commit | c4e78df2485a8368f74ad18a88cd3ab168258c3d (patch) | |
| tree | 60a78801b8f774ac6fb95c501317d08ea201c8f4 /example | |
| parent | 93a6b40af9fc16c662798f051187f4b0ddaff176 (diff) | |
| parent | c08ea372f47f79c448acf3415397c700e2925579 (diff) | |
| download | miasm-c4e78df2485a8368f74ad18a88cd3ab168258c3d.tar.gz miasm-c4e78df2485a8368f74ad18a88cd3ab168258c3d.zip | |
Merge pull request #611 from commial/feature/dse-politics
Feature/dse politics
Diffstat (limited to 'example')
| -rw-r--r-- | example/symbol_exec/dse_crackme.py | 64 | ||||
| -rw-r--r-- | example/symbol_exec/dse_strategies.py | 129 |
2 files changed, 158 insertions, 35 deletions
diff --git a/example/symbol_exec/dse_crackme.py b/example/symbol_exec/dse_crackme.py index 34c39138..f4b42176 100644 --- a/example/symbol_exec/dse_crackme.py +++ b/example/symbol_exec/dse_crackme.py @@ -57,6 +57,10 @@ def xxx_fclose(jitter): # Create sandbox parser = Sandbox_Linux_x86_64.parser(description="ELF sandboxer") parser.add_argument("filename", help="ELF Filename") +parser.add_argument("--strategy", + choices=["code-cov", "branch-cov", "path-cov"], + help="Strategy to use for solution creation", + default="code-cov") options = parser.parse_args() options.mimic_env = True sb = Sandbox_Linux_x86_64(options.filename, options, globals()) @@ -209,44 +213,17 @@ def xxx_puts_symb(dse): raise FinishOn(string) -done = set([]) # Set of jump address already handled todo = set([""]) # Set of file content to test -class DSEGenFile(DSEPathConstraint): - """DSE with a specific solution creation: - The solution is the content of the FILE to be read - - The politics of exploration is the branch coverage: create a solution only - if the target address has never been seen - """ - - def handle_solution(self, model, destination): - global todo, done - - if destination in done: - # Skip this path, already treated - return - - finfo = FILE_to_info_symb[FILE_stream] - - # Build corresponding file - out = "" - fsize = max(model.eval(self.z3_trans.from_expr(FILE_size)).as_long(), - len(finfo.gen_bytes)) - for index in xrange(fsize): - try: - byteid = finfo.gen_bytes[index] - out += chr(model.eval(self.z3_trans.from_expr(byteid)).as_long()) - except (KeyError, AttributeError) as _: - # Default value if there is no constraint on current byte - out += "\x00" - - todo.add(out) - done.add(destination) - # Instanciate the DSE engine machine = Machine("x86_64") -dse = DSEGenFile(machine) +# Convert strategy to the correct value +strategy = { + "code-cov": DSEPathConstraint.PRODUCE_SOLUTION_CODE_COV, + "branch-cov": DSEPathConstraint.PRODUCE_SOLUTION_BRANCH_COV, + "path-cov": DSEPathConstraint.PRODUCE_SOLUTION_PATH_COV, +}[options.strategy] +dse = DSEPathConstraint(machine, produce_solution=strategy) # Attach to the jitter dse.attach(sb.jitter) @@ -280,7 +257,7 @@ while todo: file_content = todo.pop() print "CUR: %r" % file_content open("test.txt", "w").write(file_content) - dse.restore_snapshot(snapshot) + dse.restore_snapshot(snapshot, keep_known_solutions=True) FILE_to_info.clear() FILE_to_info_symb.clear() @@ -294,6 +271,23 @@ while todo: found = True break + finfo = FILE_to_info_symb[FILE_stream] + for sol_ident, model in dse.new_solutions.iteritems(): + # Build the file corresponding to solution in 'model' + + out = "" + fsize = max(model.eval(dse.z3_trans.from_expr(FILE_size)).as_long(), + len(finfo.gen_bytes)) + for index in xrange(fsize): + try: + byteid = finfo.gen_bytes[index] + out += chr(model.eval(dse.z3_trans.from_expr(byteid)).as_long()) + except (KeyError, AttributeError) as _: + # Default value if there is no constraint on current byte + out += "\x00" + + todo.add(out) + # Assert that the result has been found assert found == True print "FOUND !" diff --git a/example/symbol_exec/dse_strategies.py b/example/symbol_exec/dse_strategies.py new file mode 100644 index 00000000..40998425 --- /dev/null +++ b/example/symbol_exec/dse_strategies.py @@ -0,0 +1,129 @@ +"""Example of DynamicSymbolicExecution engine use + +This example highlights how coverage can be obtained on a binary + +Expected target: 'simple_test.bin' + +Global overview: + - Prepare a 'jitter' instance with the targeted function + - Attach a DSE instance + - Make the function argument symbolic, to track constraints on it + - Take a snapshot + - Initialize the argument candidate list with an arbitrary value, 0 + - Main loop: + - Restore the snapshot (initial state, before running the function) + - Take an argument candidate and set it in the jitter + - Run the function + - Ask the DSE for new candidates, according to its strategy, ie. finding new +block / branch / path +""" +from argparse import ArgumentParser + +from miasm2.analysis.machine import Machine +from miasm2.jitter.csts import PAGE_READ, PAGE_WRITE +from miasm2.analysis.dse import DSEPathConstraint +from miasm2.expression.expression import ExprMem, ExprId, ExprInt, ExprAff + +# Argument handling +parser = ArgumentParser("DSE Example") +parser.add_argument("filename", help="Target x86 shellcode") +parser.add_argument("strategy", choices=["code-cov", "branch-cov", "path-cov"], + help="Strategy to use for solution creation") +args = parser.parse_args() + +# Convert strategy to the correct value +strategy = { + "code-cov": DSEPathConstraint.PRODUCE_SOLUTION_CODE_COV, + "branch-cov": DSEPathConstraint.PRODUCE_SOLUTION_BRANCH_COV, + "path-cov": DSEPathConstraint.PRODUCE_SOLUTION_PATH_COV, +}[args.strategy] + +# Map the shellcode +run_addr = 0x40000 +machine = Machine("x86_32") +jitter = machine.jitter("python") +with open(args.filename) as fdesc: + jitter.vm.add_memory_page(run_addr, PAGE_READ | PAGE_WRITE, fdesc.read(), + "Binary") + +# Expect a binary with one argument on the stack +jitter.init_stack() + +# Argument +jitter.push_uint32_t(0) + +# Handle return +def code_sentinelle(jitter): + jitter.run = False + return False + +ret_addr = 0x1337beef +jitter.add_breakpoint(ret_addr, code_sentinelle) +jitter.push_uint32_t(ret_addr) + +# Init the jitter +jitter.init_run(run_addr) + +# Init a DSE instance with a given strategy +dse = DSEPathConstraint(machine, produce_solution=strategy) +dse.attach(jitter) +# Concretize everything exept the argument +dse.update_state_from_concrete() +regs = jitter.ir_arch.arch.regs +arg = ExprId("ARG", 32) +arg_addr = ExprMem(ExprInt(jitter.cpu.ESP + 4, regs.ESP.size), arg.size) +dse.update_state({ + # @[ESP + 4] = ARG + arg_addr: arg +}) + +# Explore solutions +todo = set([ExprInt(0, arg.size)]) +done = set() +snapshot = dse.take_snapshot() + +# Only needed for the final output +reachs = set() + +while todo: + # Get the next candidate + arg_value = todo.pop() + + # Avoid using twice the same input + if arg_value in done: + continue + done.add(arg_value) + + print "Run with ARG = %s" % arg_value + # Restore state, while keeping already found solutions + dse.restore_snapshot(snapshot, keep_known_solutions=True) + + # Reinit jitter (reset jitter.run, etc.) + jitter.init_run(run_addr) # TODO degage avec new PR? + + # Set the argument value in the jitter context + jitter.eval_expr(ExprAff(arg_addr, arg_value)) + + # Launch + jitter.continue_run() + + # Obtained solutions are in dse.new_solutions: identifier -> solution model + # The identifier depends on the strategy: + # - block address for code coverage + # - last edge for branch coverage + # - execution path for path coverage + + for sol_ident, model in dse.new_solutions.iteritems(): + print "Found a solution to reach: %s" % str(sol_ident) + # Get the argument to use as a Miasm Expr + sol_value = model.eval(dse.z3_trans.from_expr(arg)).as_long() + sol_expr = ExprInt(sol_value, arg.size) + + # Display info and update storages + print "\tARG = %s" % sol_expr + todo.add(sol_expr) + reachs.add(sol_ident) + +print "Found %d input, to reach %d element of coverage" % (len(done), + len(reachs)) + |