diff options
| author | ajax <devnull@localhost> | 2014-10-17 23:07:10 +0200 |
|---|---|---|
| committer | ajax <devnull@localhost> | 2014-10-17 23:07:10 +0200 |
| commit | dd3e3df9113fe1f0c7a339e7b657df5439d8cd1c (patch) | |
| tree | 6299c7929bc7e92b452486767b045f772136b172 | |
| parent | 473f60bbd719911e56865f87e5be1b98e6c96448 (diff) | |
| download | miasm-dd3e3df9113fe1f0c7a339e7b657df5439d8cd1c.tar.gz miasm-dd3e3df9113fe1f0c7a339e7b657df5439d8cd1c.zip | |
TestAll: Update test all to use test utils
| -rw-r--r-- | test/test_all.py | 676 |
1 files changed, 218 insertions, 458 deletions
diff --git a/test/test_all.py b/test/test_all.py index fdba4be4..b3059ae7 100644 --- a/test/test_all.py +++ b/test/test_all.py @@ -1,473 +1,233 @@ -import subprocess -import sys -import os -import time import argparse -import tempfile - -# Test derivations -def all_jit(assembly_line): - """Add all available jitter options to assembly_line thanks to '--jitter' - option. - @assembly_line: list(str) - Return a list of assembly lines: list(list(str)). - """ - out = [] - for jitter in ["tcc", "llvm", "python"]: - out.append(assembly_line + ["--jitter", jitter]) - return out - -# Available tests - -all_tests = { - "test": { - "architecture": [ - ["arch/x86/arch.py"], - ["arch/x86/sem.py"], - ["arch/x86/unit/mn_strings.py"], - ["arch/x86/unit/mn_float.py"], - ["arch/arm/arch.py"], - ["arch/arm/sem.py"], - ["arch/msp430/arch.py"], - ["arch/msp430/sem.py"], - ["arch/sh4/arch.py"], - ["arch/mips32/arch.py"], - ], - "core": [ - ["core/interval.py"], - ["core/graph.py"], - ["core/parse_asm.py"], - ], - "expression": [ - ["expression/modint.py"], - ["expression/stp.py"], - ["expression/simplifications.py"], - ], - "ir": [ - ["ir/ir2C.py"], - ["ir/symbexec.py"], - ], - "os_dep": [ - ["os_dep/win_api_x86_32.py"], - ], - "order": [ - "architecture", - "core", - "expression", - "ir", - "os_dep", - ], - }, - "example": { - "assembler": [ - ["asm_x86.py"], - ["asm_arm.py"], - ["asm_armt.py"], - ["asm_box_x86_32.py"], - ["asm_box_x86_32_enc.py"], - ["asm_box_x86_32_mod.py"], - ["asm_box_x86_32_mod_self.py"], - ["asm_box_x86_32_repmod.py"], - ["asm_msp430_sc.py"], - ["asm_mips32.py"], - ["disasm_01.py"], - ["disasm_02.py"], - ["disasm_03.py", "box_upx.exe", "0x410f90"], - ], - "expression": [ - ["symbol_exec.py"], - ["expression/basic_op.py"], - ["expression/get_read_write.py"], - ["expression/basic_simplification.py"], - ["expression/graph_dataflow.py", +import time +from utils.test import Test +from utils.testset import TestSet +from utils import cosmetics, monothread, screendisplay + +testset = TestSet("../") + +# Regression tests +## Architecture +testset += Test(["x86/arch.py"], base_dir="test/arch", + products=["x86_speed_reg_test.bin", + "regression_test16_ia32.bin", + "regression_test32_ia32.bin", + "regression_test64_ia32.bin"]) +for script in ["x86/sem.py", + "x86/unit/mn_strings.py", + "x86/unit/mn_float.py", + "arm/arch.py", + "arm/sem.py", + "msp430/arch.py", + "msp430/sem.py", + "sh4/arch.py", + "mips32/arch.py", + ]: + testset += Test([script], base_dir="test/arch") +## Core +for script in ["interval.py", + "graph.py", + "parse_asm.py", + ]: + testset += Test([script], base_dir="test/core") +## Expression +for script in ["modint.py", + "stp.py", + "simplifications.py", + ]: + testset += Test([script], base_dir="test/expression") +## IR +for script in ["ir2C.py", + "symbexec.py", + ]: + testset += Test([script], base_dir="test/ir") +## OS_DEP +for script in ["win_api_x86_32.py", + ]: + testset += Test([script], base_dir="test/os_dep") +# Examples +## Assembler +testset += Test(['asm_x86.py'], base_dir="example", + products=["demo_x86_32.bin"]) +test_arm = Test(["asm_arm.py"], base_dir="example", + products=["demo_arm_l.bin", "demo_arm_b.bin"]) +test_armt = Test(["asm_armt.py"], base_dir="example", + products=["demo_armt_l.bin", "demo_armt_b.bin"]) +test_box = Test(["asm_box_x86_32.py"], base_dir="example", + products=["box_x86_32.bin"]) +test_box_enc = Test(["asm_box_x86_32_enc.py"], base_dir="example", + products=["box_x86_32_enc.bin"]) +test_box_mod = Test(["asm_box_x86_32_mod.py"], base_dir="example", + products=["box_x86_32_mod.bin"]) +test_box_mod_self = Test(["asm_box_x86_32_mod_self.py"], base_dir="example", + products=["box_x86_32_mod_self.bin"]) +test_box_repmod = Test(["asm_box_x86_32_repmod.py"], base_dir="example", + products=["box_x86_32_repmod.bin"]) +test_msp430 = Test(["asm_msp430_sc.py"], base_dir="example", + products=["msp430_sc.bin"]) +test_mips32 = Test(["asm_mips32.py"], base_dir="example", + products=["mips32_sc_b.bin", "mips32_sc_l.bin"]) + +testset += test_arm +testset += test_armt +testset += test_box +testset += test_box_enc +testset += test_box_mod +testset += test_box_mod_self +testset += test_box_repmod +testset += test_msp430 +testset += test_mips32 +for script in [["disasm_01.py"], + ["disasm_02.py"], + ["disasm_03.py", "box_upx.exe", "0x410f90"], + ]: + testset += Test(script, base_dir="example") +## Expression +testset += Test(["test_dis.py", "-g", "-s", "-m", "arml", "demo_arm_l.bin", "0"], + base_dir = "example", depends=[test_arm]) +testset += Test(["test_dis.py", "-g", "-s", "-m", "armb", "demo_arm_b.bin", "0"], + base_dir = "example", depends=[test_arm]) +testset += Test(["test_dis.py", "-g", "-s", "-m", "armtl", "demo_armt_l.bin", "0"], + base_dir = "example", depends=[test_armt]) +testset += Test(["test_dis.py", "-g", "-s", "-m", "armtb", "demo_armt_b.bin", "0"], + base_dir = "example", depends=[test_armt]) +testset += Test(["test_dis.py", "-g", "-s", "-m", "x86_32", "box_x86_32.bin", + "0x401000"], base_dir="example", depends=[test_box]) +testset += Test(["test_dis.py", "-g", "-s", "-m", "msp430", "msp430_sc.bin", "0"], + base_dir = "example", depends=[test_msp430]) +testset += Test(["test_dis.py", "-g", "-s", "-m", "mips32l", "mips32_sc_l.bin", + "0"], base_dir = "example", depends=[test_mips32]) +testset += Test(["test_dis.py", "-g", "-s", "-m", "mips32b", "mips32_sc_b.bin", + "0"], base_dir = "example", depends=[test_mips32]) +for script in [["symbol_exec.py"], + ["expression/basic_op.py"], + ["expression/get_read_write.py"], + ["expression/basic_simplification.py"], + ["expression/graph_dataflow.py", "expression/sc_connect_back.bin", "0x2e"], - ["expression/simplification_tools.py"], - ["expression/asm_to_ir.py"], - ["expression/expr_grapher.py"], - ["expression/simplification_add.py"], - ["test_dis.py", "-g", "-s", "-m", "arml", "demo_arm_l.bin", "0"], - ["test_dis.py", "-g", "-s", "-m", "armb", "demo_arm_b.bin", "0"], - ["test_dis.py", "-g", "-s", "-m", "armtl", "demo_armt_l.bin", "0"], - ["test_dis.py", "-g", "-s", "-m", "armtb", "demo_armt_b.bin", "0"], - ["test_dis.py", "-g", "-s", "-m", - "x86_32", "box_x86_32.bin", "0x401000"], - ["test_dis.py", "-g", "-s", "-m", "msp430", "msp430_sc.bin", "0"], - ["test_dis.py", "-g", "-s", "-m", "mips32l", "mips32_sc_l.bin", "0"], - ["test_dis.py", "-g", "-s", "-m", "mips32b", "mips32_sc_b.bin", "0"], - ["expression/solve_condition_stp.py", + ["expression/simplification_tools.py"], + ["expression/asm_to_ir.py"], + ["expression/expr_grapher.py"], + ["expression/simplification_add.py"], + ["expression/solve_condition_stp.py", "expression/simple_test.bin"], - ], - "jitter": reduce(lambda x, y: x + y, - map(all_jit, [ - ["unpack_upx.py", "box_upx.exe"], # Take 5 mins on a Core i5 - ["test_jit_x86_32.py", "x86_32_sc.bin"], - ["test_jit_arm.py", "md5_arm", "-a", "A684"], - ["test_jit_msp430.py", "msp430_sc.bin", "0"], - ["test_jit_mips32.py", "mips32_sc_l.bin", "0"], - ["test_jit_arm_sc.py", "0", "demo_arm_b.bin", "b", "-a", "0"], - ["test_jit_arm_sc.py", "0", "demo_arm_l.bin", "l", "-a", "0"], - ["sandbox_pe_x86_32.py", "box_x86_32.bin"], - ["sandbox_pe_x86_32.py", "box_x86_32_enc.bin"], - ["sandbox_pe_x86_32.py", "box_x86_32_mod.bin"], - ["sandbox_pe_x86_32.py", "box_x86_32_repmod.bin"], - ["sandbox_pe_x86_32.py", "box_x86_32_mod_self.bin"], - ])), - "order": [ - "assembler", - "expression", - "jitter", - ], - }, - "order": [ - "test", - "example", - ], -} - -# Cosmetic - - -def getTerminalSize(): - "Return the size of the terminal : COLUMNS, LINES" - - env = os.environ - - def ioctl_GWINSZ(fd): - try: - import fcntl - import termios - import struct - import os - cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, - '1234')) - except: - return - return cr - cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2) - if not cr: - try: - fd = os.open(os.ctermid(), os.O_RDONLY) - cr = ioctl_GWINSZ(fd) - os.close(fd) - except: - pass - if not cr: - cr = (env.get('LINES', 25), env.get('COLUMNS', 80)) - return int(cr[1]), int(cr[0]) - - -WIDTH = getTerminalSize()[0] -colors = {"red": "\033[91;1m", - "end": "\033[0m", - "green": "\033[92;1m", - "lightcyan": "\033[96m", - "blue": "\033[94;1m"} - - -def write_colored(text, color, already_printed=0): - text_colored = colors[color] + text + colors["end"] - print " " * (WIDTH - already_printed - len(text)) + text_colored - - -def write_underline(text): - print "\033[4m" + text + colors["end"] - - -def print_conf(conf, value): - return colors["green"] + conf + ": " + colors["end"] + str(value) - - -def clr_screen(global_state, pstate): - "Update the screen to display some information" - - # Header - to_print = [] - to_print.append(" " * (global_state["termSize"][0] / 2 - 10) + colors[ - "blue"] + "Miasm2 Regression tests" + colors["end"]) - to_print.append("") - to_print.append("=" * global_state["termSize"][0]) - to_print.append("") - to_print.append(print_conf("Current mode", "Multiprocessing")) - to_print.append(print_conf("Nb CPU detected", global_state["cpu_c"])) - to_print.append("") - to_print.append("=" * global_state["termSize"][0]) - to_print.append("") - to_print.append( - print_conf("Current section", global_state["section"].upper())) - to_print.append( - print_conf("Current subsection", global_state["subsection"].upper())) - test_done = 0 - test_failed = 0 - message = global_state["message"] + "\n" - for k, v in pstate.items(): - if v["status"] != "running": - test_done += 1 - if v["status"] != 0: - test_failed += 1 - message += colors["red"] + "FAIL: " + colors["end"] + k - message += v["message"] + "\n" - - to_print.append(print_conf("Success rate", "%d/%d" % - (test_done - test_failed, test_done))) - printed_time = time.strftime( - "%M:%S", time.gmtime(time.time() - global_state["init_time"])) - to_print.append(print_conf("Cumulated time", printed_time)) - to_print.append("") - to_print.append("=" * global_state["termSize"][0]) - - cur = "\n".join(to_print) - cur += "\n" - - # Message - cur += message - print cur - already_printed = cur.count("\n") - - # Current state - current_job = [] - for t in pstate.values(): - if t["status"] == "running": - current_job.append(t) - print "\n" * (global_state["termSize"][1] - already_printed - 3 - len(current_job)) - - for j in current_job: - s = "[" + colors["lightcyan"] + j["command"] + colors["end"] - s_end = time.strftime( - "%M:%Ss", time.gmtime(time.time() - j["init_time"])) - l = len(j["command"]) + len(s_end) + 4 + len(str(j["pid"])) + 2 - s_end += " " + colors["blue"] + str(j["pid"]) + colors["end"] + "]" - print "%s%s%s" % (s, " " * (global_state["termSize"][0] - l), s_end) - -# Tests handling - - -def are_tests_finished(test_names, done): - for t in test_names: - if t not in done: - return False - return True - - -def are_tests_finished_multi(test_names, pstate): - for t in test_names: - t = " ".join(t) - if t not in pstate.keys(): - return False - if pstate[t]["status"] == "running": - return False - return True - - -def test_iter(done): - "Return an iterator on next tests, wait for previous sections" - - for section_name in all_tests["order"]: - # Go to the right directory - os.chdir(os.path.join("..", section_name)) - - # Update global state - section_content = all_tests[section_name] - write_underline(section_name.upper()) - - for subsection_name in section_content["order"]: - subsection_content = section_content[subsection_name] - write_underline("%s > %s" % (section_name.upper(), - subsection_name.upper())) - for test_line in subsection_content: - yield test_line - - while not(are_tests_finished(subsection_content, done)): - time.sleep(0.050) - - -def test_iter_multi(global_state, pstate): - "Multiprocessor version of test_iter" - - # Global message : subsections done - message = "" - - for section_name in all_tests["order"]: - # Update global state - section_content = all_tests[section_name] - global_state["section"] = section_name - - for subsection_name in section_content["order"]: - subsection_content = section_content[subsection_name] - beg_time = time.time() - global_state["subsection"] = subsection_name - - for test_line in subsection_content: - yield test_line - - while not(are_tests_finished_multi(subsection_content, pstate)): - # Wait for task to finish, update the screen - time.sleep(0.100) - clr_screen(global_state, pstate) - - message += "%s > %s completed in %.08f seconds\n" % (section_name.upper(), - subsection_name.upper( - ), - time.time() - beg_time) - global_state["message"] = message - - # Final update - clr_screen(global_state, pstate) - - -def run_test(test, coveragerc=None): - s = "Running tests on %s ..." % " ".join(test) - sys.stdout.write(s) - sys.stdout.flush() - - args = test - if coveragerc is not None: - args = ["-m", "coverage", "run", "--rcfile", coveragerc, "-a"] + test - - # Launch test - testpy = subprocess.Popen(["python"] + args, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - outputs = testpy.communicate() - - # Check result - if testpy.returncode == 0: - write_colored("OK", "green", len(s)) - else: - write_colored("ERROR", "red", len(s)) - print outputs[1] - - return testpy.returncode - -def run_test_parallel(test, current, global_state): - - pid = os.getpid() - test_key = " ".join(test) - - # Keep current PID - current[test_key] = {"status": "running", - "pid": pid, - "command": test_key, - "init_time": time.time()} - - # Go to the right directory - os.chdir(os.path.join("..", global_state["section"])) - - # Launch test - testpy = subprocess.Popen(["python"] + test, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - outputs = testpy.communicate() - - # Check result - message = "" - if testpy.returncode != 0: - message = outputs[1] + ]: + testset += Test(script, base_dir="example") +## Jitter +for script, dep in [(["unpack_upx.py", "box_upx.exe"], []), # Take 5 mins on a Core i5 + (["test_jit_x86_32.py", "x86_32_sc.bin"], []), + (["test_jit_arm.py", "md5_arm", "-a", "A684"], []), + (["test_jit_msp430.py", "msp430_sc.bin", "0"], + [test_msp430]), + (["test_jit_mips32.py", "mips32_sc_l.bin", "0"], + [test_mips32]), + (["test_jit_arm_sc.py", "0", "demo_arm_b.bin", "b", "-a", + "0"], [test_arm]), + (["test_jit_arm_sc.py", "0", "demo_arm_l.bin", "l", "-a", + "0"], [test_arm]), + (["sandbox_pe_x86_32.py", "box_x86_32.bin"], [test_box]), + (["sandbox_pe_x86_32.py", "box_x86_32_enc.bin"], + [test_box_enc]), + (["sandbox_pe_x86_32.py", "box_x86_32_mod.bin"], + [test_box_mod]), + (["sandbox_pe_x86_32.py", "box_x86_32_repmod.bin"], + [test_box_repmod]), + (["sandbox_pe_x86_32.py", "box_x86_32_mod_self.bin"], + [test_box_mod_self]), + ]: + for jitter in ["tcc", "llvm", "python"]: + testset += Test(script + ["--jitter", jitter], base_dir="example", + depends=dep) - # Update result - current[test_key] = {"status": testpy.returncode, - "message": message} -# Multiprocessing handling +if __name__ == "__main__": + # Argument parsing + parser = argparse.ArgumentParser(description="Miasm2 testing tool") + parser.add_argument("-m", "--mono", help="Force monothreading", + action="store_true") + parser.add_argument("-c", "--coverage", help="Include code coverage", + action="store_true") + args = parser.parse_args() -try: - from multiprocessing import Manager, Pool, cpu_count multiproc = True -except ImportError: - multiproc = False - -# Argument parsing -parser = argparse.ArgumentParser(description="Miasm2 testing tool") -parser.add_argument("-m", "--mono", help="Force monothreading", - action="store_true") -parser.add_argument("-c", "--coverage", help="Include code coverage", - action="store_true") -args = parser.parse_args() + if args.mono is True or args.coverage is True: + multiproc = False -if args.mono is True or args.coverage is True: - multiproc = False - -# Handle coverage -coveragerc = None -if args.coverage is True: + # Handle coverage + coveragerc = None + if args.coverage is True: + try: + import coverage + except ImportError: + print "%(red)s[Coverage]%(end)s " % cosmetics.colors + \ + "Python 'coverage' module is required" + exit(-1) + + # Create directory + suffix = "_" + str(int(time.time())) + cov_dir = tempfile.mkdtemp(suffix, "m2_coverage_") + + # Create configuration file + coveragerc = os.path.join(cov_dir, ".coveragerc") + coverage = os.path.join(cov_dir, ".coverage") + + from ConfigParser import ConfigParser + from os.path import expanduser + + config = ConfigParser() + config.read(['/etc/coveragerc', expanduser('~/.coveragerc')]) + if not config.has_section('run'): + config.add_section('run') + config.set('run', 'data_file', coverage) + config.write(open(coveragerc, 'w')) + + # Add arguments to tests command line + testset.add_additionnal_args(["-m", "coverage", "run", "--rcfile", + coveragerc, "-a"]) + + + # Inform the user + d = {"blue": cosmetics.colors['blue'], + "end": cosmetics.colors['end'], + "cov_dir": cov_dir} + print "[%(blue)sCoverage%(end)s] Report will be written in %(cov_dir)s" % d + + # Handle llvm modularity + llvm = True try: - import coverage + import llvm except ImportError: - print "%(red)s[Coverage]%(end)s Python 'coverage' module is required" % colors - exit(-1) - - # Create directory - suffix = "_" + str(int(time.time())) - cov_dir = tempfile.mkdtemp(suffix, "m2_coverage_") - - # Create configuration file - coveragerc = os.path.join(cov_dir, ".coveragerc") - coverage = os.path.join(cov_dir, ".coverage") - - from ConfigParser import ConfigParser - from os.path import expanduser - - config = ConfigParser() - config.read(['/etc/coveragerc', expanduser('~/.coveragerc')]) - if not config.has_section('run'): - config.add_section('run') - config.set('run', 'data_file', coverage) - config.write(open(coveragerc, 'w')) - - # Inform the user - d = {"blue": colors['blue'], - "end": colors['end'], - "cov_dir": cov_dir} - print "[%(blue)sCoverage%(end)s] Report will be written in %(cov_dir)s" % d + llvm = False -# Handle llvm modularity - -llvm = True -try: - import llvm -except ImportError: + # TODO XXX: fix llvm jitter (deactivated for the moment) llvm = False -# TODO XXX: fix llvm jitter (deactivated for the moment) -llvm = False - -if llvm is False: - print "%(red)s[LLVM]%(end)s Python 'py-llvm 3.2' module is required for llvm tests" % colors - - # Remove llvm tests - for test in all_tests["example"]["jitter"]: - if "llvm" in test: - all_tests["example"]["jitter"].remove(test) - print "%(red)s[LLVM]%(end)s Remove" % colors, " ".join(test) - - # Let the user see messages - time.sleep(0.5) - -# Run tests - -if multiproc is False: - done = list() - status = 0 - for test in test_iter(done): - status |= run_test(test, coveragerc=coveragerc) - done.append(test) - - # Return an error code if a test failed - assert(status == 0) - -else: - # Parallel version - cpu_c = cpu_count() - global_state = {"cpu_c": cpu_c, - "init_time": time.time(), - "termSize": getTerminalSize(), - "message": ""} - - manager = Manager() - pool = Pool(processes=cpu_c) - current = manager.dict() + if llvm is False: + print "%(red)s[LLVM]%(end)s Python" % cosmetics.colors + \ + "'py-llvm 3.2' module is required for llvm tests" + + # Remove llvm tests + for test in testset.tests: + if "llvm" in test.command_line: + testset.tests.remove(test) + print "%(red)s[LLVM]%(end)s Remove" % cosmetics.colors, \ + " ".join(test.command_line) + + # Let the user see messages + time.sleep(0.5) + + # Set callbacks + if multiproc is False: + testset.set_callback(task_done=monothread.task_done, + task_new=monothread.task_new) + testset.set_cpu_numbers(1) + else: + screendisplay.init(testset.cpu_c) + testset.set_callback(task_done=screendisplay.task_done, + task_new=screendisplay.task_new) - for test in test_iter_multi(global_state, current): - pool.apply_async(run_test_parallel, (test, - current, - global_state)) + # Run tests + testset.run() - pool.close() - pool.join() + # Exit with an error if at least a test failed + exit(testset.tests_passed()) |