diff options
| author | Fabrice Desclaux <fabrice.desclaux@cea.fr> | 2014-10-20 13:39:48 +0200 |
|---|---|---|
| committer | Fabrice Desclaux <fabrice.desclaux@cea.fr> | 2014-10-20 13:39:48 +0200 |
| commit | 10664b49b1b419cb7ae0812788870bf5c9c2ebd8 (patch) | |
| tree | 5e80c31abaf4437e657d1a8ab92db956b82c0ec7 | |
| parent | 3ec550cf75e1d7cd95e8237d307934d5a28bad73 (diff) | |
| parent | dd3e3df9113fe1f0c7a339e7b657df5439d8cd1c (diff) | |
| download | miasm-10664b49b1b419cb7ae0812788870bf5c9c2ebd8.tar.gz miasm-10664b49b1b419cb7ae0812788870bf5c9c2ebd8.zip | |
merge
| -rw-r--r-- | test/test_all.py | 676 | ||||
| -rw-r--r-- | test/utils/__init__.py | 1 | ||||
| -rw-r--r-- | test/utils/cosmetics.py | 47 | ||||
| -rw-r--r-- | test/utils/monothread.py | 20 | ||||
| -rw-r--r-- | test/utils/screendisplay.py | 115 | ||||
| -rw-r--r-- | test/utils/test.py | 29 | ||||
| -rw-r--r-- | test/utils/testset.py | 226 |
7 files changed, 656 insertions, 458 deletions
diff --git a/test/test_all.py b/test/test_all.py index fdba4be4..b3059ae7 100644 --- a/test/test_all.py +++ b/test/test_all.py @@ -1,473 +1,233 @@ -import subprocess -import sys -import os -import time import argparse -import tempfile - -# Test derivations -def all_jit(assembly_line): - """Add all available jitter options to assembly_line thanks to '--jitter' - option. - @assembly_line: list(str) - Return a list of assembly lines: list(list(str)). - """ - out = [] - for jitter in ["tcc", "llvm", "python"]: - out.append(assembly_line + ["--jitter", jitter]) - return out - -# Available tests - -all_tests = { - "test": { - "architecture": [ - ["arch/x86/arch.py"], - ["arch/x86/sem.py"], - ["arch/x86/unit/mn_strings.py"], - ["arch/x86/unit/mn_float.py"], - ["arch/arm/arch.py"], - ["arch/arm/sem.py"], - ["arch/msp430/arch.py"], - ["arch/msp430/sem.py"], - ["arch/sh4/arch.py"], - ["arch/mips32/arch.py"], - ], - "core": [ - ["core/interval.py"], - ["core/graph.py"], - ["core/parse_asm.py"], - ], - "expression": [ - ["expression/modint.py"], - ["expression/stp.py"], - ["expression/simplifications.py"], - ], - "ir": [ - ["ir/ir2C.py"], - ["ir/symbexec.py"], - ], - "os_dep": [ - ["os_dep/win_api_x86_32.py"], - ], - "order": [ - "architecture", - "core", - "expression", - "ir", - "os_dep", - ], - }, - "example": { - "assembler": [ - ["asm_x86.py"], - ["asm_arm.py"], - ["asm_armt.py"], - ["asm_box_x86_32.py"], - ["asm_box_x86_32_enc.py"], - ["asm_box_x86_32_mod.py"], - ["asm_box_x86_32_mod_self.py"], - ["asm_box_x86_32_repmod.py"], - ["asm_msp430_sc.py"], - ["asm_mips32.py"], - ["disasm_01.py"], - ["disasm_02.py"], - ["disasm_03.py", "box_upx.exe", "0x410f90"], - ], - "expression": [ - ["symbol_exec.py"], - ["expression/basic_op.py"], - ["expression/get_read_write.py"], - ["expression/basic_simplification.py"], - ["expression/graph_dataflow.py", +import time +from utils.test import Test +from utils.testset import TestSet +from utils import cosmetics, monothread, screendisplay + +testset = TestSet("../") + +# Regression tests +## Architecture +testset += Test(["x86/arch.py"], base_dir="test/arch", + products=["x86_speed_reg_test.bin", + "regression_test16_ia32.bin", + "regression_test32_ia32.bin", + "regression_test64_ia32.bin"]) +for script in ["x86/sem.py", + "x86/unit/mn_strings.py", + "x86/unit/mn_float.py", + "arm/arch.py", + "arm/sem.py", + "msp430/arch.py", + "msp430/sem.py", + "sh4/arch.py", + "mips32/arch.py", + ]: + testset += Test([script], base_dir="test/arch") +## Core +for script in ["interval.py", + "graph.py", + "parse_asm.py", + ]: + testset += Test([script], base_dir="test/core") +## Expression +for script in ["modint.py", + "stp.py", + "simplifications.py", + ]: + testset += Test([script], base_dir="test/expression") +## IR +for script in ["ir2C.py", + "symbexec.py", + ]: + testset += Test([script], base_dir="test/ir") +## OS_DEP +for script in ["win_api_x86_32.py", + ]: + testset += Test([script], base_dir="test/os_dep") +# Examples +## Assembler +testset += Test(['asm_x86.py'], base_dir="example", + products=["demo_x86_32.bin"]) +test_arm = Test(["asm_arm.py"], base_dir="example", + products=["demo_arm_l.bin", "demo_arm_b.bin"]) +test_armt = Test(["asm_armt.py"], base_dir="example", + products=["demo_armt_l.bin", "demo_armt_b.bin"]) +test_box = Test(["asm_box_x86_32.py"], base_dir="example", + products=["box_x86_32.bin"]) +test_box_enc = Test(["asm_box_x86_32_enc.py"], base_dir="example", + products=["box_x86_32_enc.bin"]) +test_box_mod = Test(["asm_box_x86_32_mod.py"], base_dir="example", + products=["box_x86_32_mod.bin"]) +test_box_mod_self = Test(["asm_box_x86_32_mod_self.py"], base_dir="example", + products=["box_x86_32_mod_self.bin"]) +test_box_repmod = Test(["asm_box_x86_32_repmod.py"], base_dir="example", + products=["box_x86_32_repmod.bin"]) +test_msp430 = Test(["asm_msp430_sc.py"], base_dir="example", + products=["msp430_sc.bin"]) +test_mips32 = Test(["asm_mips32.py"], base_dir="example", + products=["mips32_sc_b.bin", "mips32_sc_l.bin"]) + +testset += test_arm +testset += test_armt +testset += test_box +testset += test_box_enc +testset += test_box_mod +testset += test_box_mod_self +testset += test_box_repmod +testset += test_msp430 +testset += test_mips32 +for script in [["disasm_01.py"], + ["disasm_02.py"], + ["disasm_03.py", "box_upx.exe", "0x410f90"], + ]: + testset += Test(script, base_dir="example") +## Expression +testset += Test(["test_dis.py", "-g", "-s", "-m", "arml", "demo_arm_l.bin", "0"], + base_dir = "example", depends=[test_arm]) +testset += Test(["test_dis.py", "-g", "-s", "-m", "armb", "demo_arm_b.bin", "0"], + base_dir = "example", depends=[test_arm]) +testset += Test(["test_dis.py", "-g", "-s", "-m", "armtl", "demo_armt_l.bin", "0"], + base_dir = "example", depends=[test_armt]) +testset += Test(["test_dis.py", "-g", "-s", "-m", "armtb", "demo_armt_b.bin", "0"], + base_dir = "example", depends=[test_armt]) +testset += Test(["test_dis.py", "-g", "-s", "-m", "x86_32", "box_x86_32.bin", + "0x401000"], base_dir="example", depends=[test_box]) +testset += Test(["test_dis.py", "-g", "-s", "-m", "msp430", "msp430_sc.bin", "0"], + base_dir = "example", depends=[test_msp430]) +testset += Test(["test_dis.py", "-g", "-s", "-m", "mips32l", "mips32_sc_l.bin", + "0"], base_dir = "example", depends=[test_mips32]) +testset += Test(["test_dis.py", "-g", "-s", "-m", "mips32b", "mips32_sc_b.bin", + "0"], base_dir = "example", depends=[test_mips32]) +for script in [["symbol_exec.py"], + ["expression/basic_op.py"], + ["expression/get_read_write.py"], + ["expression/basic_simplification.py"], + ["expression/graph_dataflow.py", "expression/sc_connect_back.bin", "0x2e"], - ["expression/simplification_tools.py"], - ["expression/asm_to_ir.py"], - ["expression/expr_grapher.py"], - ["expression/simplification_add.py"], - ["test_dis.py", "-g", "-s", "-m", "arml", "demo_arm_l.bin", "0"], - ["test_dis.py", "-g", "-s", "-m", "armb", "demo_arm_b.bin", "0"], - ["test_dis.py", "-g", "-s", "-m", "armtl", "demo_armt_l.bin", "0"], - ["test_dis.py", "-g", "-s", "-m", "armtb", "demo_armt_b.bin", "0"], - ["test_dis.py", "-g", "-s", "-m", - "x86_32", "box_x86_32.bin", "0x401000"], - ["test_dis.py", "-g", "-s", "-m", "msp430", "msp430_sc.bin", "0"], - ["test_dis.py", "-g", "-s", "-m", "mips32l", "mips32_sc_l.bin", "0"], - ["test_dis.py", "-g", "-s", "-m", "mips32b", "mips32_sc_b.bin", "0"], - ["expression/solve_condition_stp.py", + ["expression/simplification_tools.py"], + ["expression/asm_to_ir.py"], + ["expression/expr_grapher.py"], + ["expression/simplification_add.py"], + ["expression/solve_condition_stp.py", "expression/simple_test.bin"], - ], - "jitter": reduce(lambda x, y: x + y, - map(all_jit, [ - ["unpack_upx.py", "box_upx.exe"], # Take 5 mins on a Core i5 - ["test_jit_x86_32.py", "x86_32_sc.bin"], - ["test_jit_arm.py", "md5_arm", "-a", "A684"], - ["test_jit_msp430.py", "msp430_sc.bin", "0"], - ["test_jit_mips32.py", "mips32_sc_l.bin", "0"], - ["test_jit_arm_sc.py", "0", "demo_arm_b.bin", "b", "-a", "0"], - ["test_jit_arm_sc.py", "0", "demo_arm_l.bin", "l", "-a", "0"], - ["sandbox_pe_x86_32.py", "box_x86_32.bin"], - ["sandbox_pe_x86_32.py", "box_x86_32_enc.bin"], - ["sandbox_pe_x86_32.py", "box_x86_32_mod.bin"], - ["sandbox_pe_x86_32.py", "box_x86_32_repmod.bin"], - ["sandbox_pe_x86_32.py", "box_x86_32_mod_self.bin"], - ])), - "order": [ - "assembler", - "expression", - "jitter", - ], - }, - "order": [ - "test", - "example", - ], -} - -# Cosmetic - - -def getTerminalSize(): - "Return the size of the terminal : COLUMNS, LINES" - - env = os.environ - - def ioctl_GWINSZ(fd): - try: - import fcntl - import termios - import struct - import os - cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, - '1234')) - except: - return - return cr - cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2) - if not cr: - try: - fd = os.open(os.ctermid(), os.O_RDONLY) - cr = ioctl_GWINSZ(fd) - os.close(fd) - except: - pass - if not cr: - cr = (env.get('LINES', 25), env.get('COLUMNS', 80)) - return int(cr[1]), int(cr[0]) - - -WIDTH = getTerminalSize()[0] -colors = {"red": "\033[91;1m", - "end": "\033[0m", - "green": "\033[92;1m", - "lightcyan": "\033[96m", - "blue": "\033[94;1m"} - - -def write_colored(text, color, already_printed=0): - text_colored = colors[color] + text + colors["end"] - print " " * (WIDTH - already_printed - len(text)) + text_colored - - -def write_underline(text): - print "\033[4m" + text + colors["end"] - - -def print_conf(conf, value): - return colors["green"] + conf + ": " + colors["end"] + str(value) - - -def clr_screen(global_state, pstate): - "Update the screen to display some information" - - # Header - to_print = [] - to_print.append(" " * (global_state["termSize"][0] / 2 - 10) + colors[ - "blue"] + "Miasm2 Regression tests" + colors["end"]) - to_print.append("") - to_print.append("=" * global_state["termSize"][0]) - to_print.append("") - to_print.append(print_conf("Current mode", "Multiprocessing")) - to_print.append(print_conf("Nb CPU detected", global_state["cpu_c"])) - to_print.append("") - to_print.append("=" * global_state["termSize"][0]) - to_print.append("") - to_print.append( - print_conf("Current section", global_state["section"].upper())) - to_print.append( - print_conf("Current subsection", global_state["subsection"].upper())) - test_done = 0 - test_failed = 0 - message = global_state["message"] + "\n" - for k, v in pstate.items(): - if v["status"] != "running": - test_done += 1 - if v["status"] != 0: - test_failed += 1 - message += colors["red"] + "FAIL: " + colors["end"] + k - message += v["message"] + "\n" - - to_print.append(print_conf("Success rate", "%d/%d" % - (test_done - test_failed, test_done))) - printed_time = time.strftime( - "%M:%S", time.gmtime(time.time() - global_state["init_time"])) - to_print.append(print_conf("Cumulated time", printed_time)) - to_print.append("") - to_print.append("=" * global_state["termSize"][0]) - - cur = "\n".join(to_print) - cur += "\n" - - # Message - cur += message - print cur - already_printed = cur.count("\n") - - # Current state - current_job = [] - for t in pstate.values(): - if t["status"] == "running": - current_job.append(t) - print "\n" * (global_state["termSize"][1] - already_printed - 3 - len(current_job)) - - for j in current_job: - s = "[" + colors["lightcyan"] + j["command"] + colors["end"] - s_end = time.strftime( - "%M:%Ss", time.gmtime(time.time() - j["init_time"])) - l = len(j["command"]) + len(s_end) + 4 + len(str(j["pid"])) + 2 - s_end += " " + colors["blue"] + str(j["pid"]) + colors["end"] + "]" - print "%s%s%s" % (s, " " * (global_state["termSize"][0] - l), s_end) - -# Tests handling - - -def are_tests_finished(test_names, done): - for t in test_names: - if t not in done: - return False - return True - - -def are_tests_finished_multi(test_names, pstate): - for t in test_names: - t = " ".join(t) - if t not in pstate.keys(): - return False - if pstate[t]["status"] == "running": - return False - return True - - -def test_iter(done): - "Return an iterator on next tests, wait for previous sections" - - for section_name in all_tests["order"]: - # Go to the right directory - os.chdir(os.path.join("..", section_name)) - - # Update global state - section_content = all_tests[section_name] - write_underline(section_name.upper()) - - for subsection_name in section_content["order"]: - subsection_content = section_content[subsection_name] - write_underline("%s > %s" % (section_name.upper(), - subsection_name.upper())) - for test_line in subsection_content: - yield test_line - - while not(are_tests_finished(subsection_content, done)): - time.sleep(0.050) - - -def test_iter_multi(global_state, pstate): - "Multiprocessor version of test_iter" - - # Global message : subsections done - message = "" - - for section_name in all_tests["order"]: - # Update global state - section_content = all_tests[section_name] - global_state["section"] = section_name - - for subsection_name in section_content["order"]: - subsection_content = section_content[subsection_name] - beg_time = time.time() - global_state["subsection"] = subsection_name - - for test_line in subsection_content: - yield test_line - - while not(are_tests_finished_multi(subsection_content, pstate)): - # Wait for task to finish, update the screen - time.sleep(0.100) - clr_screen(global_state, pstate) - - message += "%s > %s completed in %.08f seconds\n" % (section_name.upper(), - subsection_name.upper( - ), - time.time() - beg_time) - global_state["message"] = message - - # Final update - clr_screen(global_state, pstate) - - -def run_test(test, coveragerc=None): - s = "Running tests on %s ..." % " ".join(test) - sys.stdout.write(s) - sys.stdout.flush() - - args = test - if coveragerc is not None: - args = ["-m", "coverage", "run", "--rcfile", coveragerc, "-a"] + test - - # Launch test - testpy = subprocess.Popen(["python"] + args, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - outputs = testpy.communicate() - - # Check result - if testpy.returncode == 0: - write_colored("OK", "green", len(s)) - else: - write_colored("ERROR", "red", len(s)) - print outputs[1] - - return testpy.returncode - -def run_test_parallel(test, current, global_state): - - pid = os.getpid() - test_key = " ".join(test) - - # Keep current PID - current[test_key] = {"status": "running", - "pid": pid, - "command": test_key, - "init_time": time.time()} - - # Go to the right directory - os.chdir(os.path.join("..", global_state["section"])) - - # Launch test - testpy = subprocess.Popen(["python"] + test, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - outputs = testpy.communicate() - - # Check result - message = "" - if testpy.returncode != 0: - message = outputs[1] + ]: + testset += Test(script, base_dir="example") +## Jitter +for script, dep in [(["unpack_upx.py", "box_upx.exe"], []), # Take 5 mins on a Core i5 + (["test_jit_x86_32.py", "x86_32_sc.bin"], []), + (["test_jit_arm.py", "md5_arm", "-a", "A684"], []), + (["test_jit_msp430.py", "msp430_sc.bin", "0"], + [test_msp430]), + (["test_jit_mips32.py", "mips32_sc_l.bin", "0"], + [test_mips32]), + (["test_jit_arm_sc.py", "0", "demo_arm_b.bin", "b", "-a", + "0"], [test_arm]), + (["test_jit_arm_sc.py", "0", "demo_arm_l.bin", "l", "-a", + "0"], [test_arm]), + (["sandbox_pe_x86_32.py", "box_x86_32.bin"], [test_box]), + (["sandbox_pe_x86_32.py", "box_x86_32_enc.bin"], + [test_box_enc]), + (["sandbox_pe_x86_32.py", "box_x86_32_mod.bin"], + [test_box_mod]), + (["sandbox_pe_x86_32.py", "box_x86_32_repmod.bin"], + [test_box_repmod]), + (["sandbox_pe_x86_32.py", "box_x86_32_mod_self.bin"], + [test_box_mod_self]), + ]: + for jitter in ["tcc", "llvm", "python"]: + testset += Test(script + ["--jitter", jitter], base_dir="example", + depends=dep) - # Update result - current[test_key] = {"status": testpy.returncode, - "message": message} -# Multiprocessing handling +if __name__ == "__main__": + # Argument parsing + parser = argparse.ArgumentParser(description="Miasm2 testing tool") + parser.add_argument("-m", "--mono", help="Force monothreading", + action="store_true") + parser.add_argument("-c", "--coverage", help="Include code coverage", + action="store_true") + args = parser.parse_args() -try: - from multiprocessing import Manager, Pool, cpu_count multiproc = True -except ImportError: - multiproc = False - -# Argument parsing -parser = argparse.ArgumentParser(description="Miasm2 testing tool") -parser.add_argument("-m", "--mono", help="Force monothreading", - action="store_true") -parser.add_argument("-c", "--coverage", help="Include code coverage", - action="store_true") -args = parser.parse_args() + if args.mono is True or args.coverage is True: + multiproc = False -if args.mono is True or args.coverage is True: - multiproc = False - -# Handle coverage -coveragerc = None -if args.coverage is True: + # Handle coverage + coveragerc = None + if args.coverage is True: + try: + import coverage + except ImportError: + print "%(red)s[Coverage]%(end)s " % cosmetics.colors + \ + "Python 'coverage' module is required" + exit(-1) + + # Create directory + suffix = "_" + str(int(time.time())) + cov_dir = tempfile.mkdtemp(suffix, "m2_coverage_") + + # Create configuration file + coveragerc = os.path.join(cov_dir, ".coveragerc") + coverage = os.path.join(cov_dir, ".coverage") + + from ConfigParser import ConfigParser + from os.path import expanduser + + config = ConfigParser() + config.read(['/etc/coveragerc', expanduser('~/.coveragerc')]) + if not config.has_section('run'): + config.add_section('run') + config.set('run', 'data_file', coverage) + config.write(open(coveragerc, 'w')) + + # Add arguments to tests command line + testset.add_additionnal_args(["-m", "coverage", "run", "--rcfile", + coveragerc, "-a"]) + + + # Inform the user + d = {"blue": cosmetics.colors['blue'], + "end": cosmetics.colors['end'], + "cov_dir": cov_dir} + print "[%(blue)sCoverage%(end)s] Report will be written in %(cov_dir)s" % d + + # Handle llvm modularity + llvm = True try: - import coverage + import llvm except ImportError: - print "%(red)s[Coverage]%(end)s Python 'coverage' module is required" % colors - exit(-1) - - # Create directory - suffix = "_" + str(int(time.time())) - cov_dir = tempfile.mkdtemp(suffix, "m2_coverage_") - - # Create configuration file - coveragerc = os.path.join(cov_dir, ".coveragerc") - coverage = os.path.join(cov_dir, ".coverage") - - from ConfigParser import ConfigParser - from os.path import expanduser - - config = ConfigParser() - config.read(['/etc/coveragerc', expanduser('~/.coveragerc')]) - if not config.has_section('run'): - config.add_section('run') - config.set('run', 'data_file', coverage) - config.write(open(coveragerc, 'w')) - - # Inform the user - d = {"blue": colors['blue'], - "end": colors['end'], - "cov_dir": cov_dir} - print "[%(blue)sCoverage%(end)s] Report will be written in %(cov_dir)s" % d + llvm = False -# Handle llvm modularity - -llvm = True -try: - import llvm -except ImportError: + # TODO XXX: fix llvm jitter (deactivated for the moment) llvm = False -# TODO XXX: fix llvm jitter (deactivated for the moment) -llvm = False - -if llvm is False: - print "%(red)s[LLVM]%(end)s Python 'py-llvm 3.2' module is required for llvm tests" % colors - - # Remove llvm tests - for test in all_tests["example"]["jitter"]: - if "llvm" in test: - all_tests["example"]["jitter"].remove(test) - print "%(red)s[LLVM]%(end)s Remove" % colors, " ".join(test) - - # Let the user see messages - time.sleep(0.5) - -# Run tests - -if multiproc is False: - done = list() - status = 0 - for test in test_iter(done): - status |= run_test(test, coveragerc=coveragerc) - done.append(test) - - # Return an error code if a test failed - assert(status == 0) - -else: - # Parallel version - cpu_c = cpu_count() - global_state = {"cpu_c": cpu_c, - "init_time": time.time(), - "termSize": getTerminalSize(), - "message": ""} - - manager = Manager() - pool = Pool(processes=cpu_c) - current = manager.dict() + if llvm is False: + print "%(red)s[LLVM]%(end)s Python" % cosmetics.colors + \ + "'py-llvm 3.2' module is required for llvm tests" + + # Remove llvm tests + for test in testset.tests: + if "llvm" in test.command_line: + testset.tests.remove(test) + print "%(red)s[LLVM]%(end)s Remove" % cosmetics.colors, \ + " ".join(test.command_line) + + # Let the user see messages + time.sleep(0.5) + + # Set callbacks + if multiproc is False: + testset.set_callback(task_done=monothread.task_done, + task_new=monothread.task_new) + testset.set_cpu_numbers(1) + else: + screendisplay.init(testset.cpu_c) + testset.set_callback(task_done=screendisplay.task_done, + task_new=screendisplay.task_new) - for test in test_iter_multi(global_state, current): - pool.apply_async(run_test_parallel, (test, - current, - global_state)) + # Run tests + testset.run() - pool.close() - pool.join() + # Exit with an error if at least a test failed + exit(testset.tests_passed()) diff --git a/test/utils/__init__.py b/test/utils/__init__.py new file mode 100644 index 00000000..d34ce5f7 --- /dev/null +++ b/test/utils/__init__.py @@ -0,0 +1 @@ +__all__ = ["test", "testset", "cosmetics"] diff --git a/test/utils/cosmetics.py b/test/utils/cosmetics.py new file mode 100644 index 00000000..d870507b --- /dev/null +++ b/test/utils/cosmetics.py @@ -0,0 +1,47 @@ +import os + + +def getTerminalSize(): + "Return the size of the terminal : COLUMNS, LINES" + + env = os.environ + + def ioctl_GWINSZ(fd): + try: + import fcntl + import termios + import struct + import os + cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, + '1234')) + except: + return + return cr + cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2) + if not cr: + try: + fd = os.open(os.ctermid(), os.O_RDONLY) + cr = ioctl_GWINSZ(fd) + os.close(fd) + except: + pass + if not cr: + cr = (env.get('LINES', 25), env.get('COLUMNS', 80)) + return int(cr[1]), int(cr[0]) + + +WIDTH = getTerminalSize()[0] +colors = {"red": "\033[91;1m", + "end": "\033[0m", + "green": "\033[92;1m", + "lightcyan": "\033[96m", + "blue": "\033[94;1m"} + + +def write_colored(text, color, already_printed=0): + text_colored = colors[color] + text + colors["end"] + print " " * (WIDTH - already_printed - len(text)) + text_colored + + +def write_underline(text): + print "\033[4m" + text + colors["end"] diff --git a/test/utils/monothread.py b/test/utils/monothread.py new file mode 100644 index 00000000..ae64f3c5 --- /dev/null +++ b/test/utils/monothread.py @@ -0,0 +1,20 @@ +import sys +import cosmetics + + +def task_done(test, error): + s = "[%s] Running tests on %s ..." % (test.base_dir.upper(), + " ".join(test.command_line)) + already_printed = len(s) + if error is not None: + cosmetics.write_colored("ERROR", "red", already_printed) + print error + else: + cosmetics.write_colored("OK", "green", already_printed) + + +def task_new(test): + s = "[%s] Running tests on %s ..." % (test.base_dir.upper(), + " ".join(test.command_line)) + sys.stdout.write(s) + sys.stdout.flush() diff --git a/test/utils/screendisplay.py b/test/utils/screendisplay.py new file mode 100644 index 00000000..7c7bfde1 --- /dev/null +++ b/test/utils/screendisplay.py @@ -0,0 +1,115 @@ +import time +import signal +from cosmetics import getTerminalSize, colors + + +global_state = {"termSize": getTerminalSize(), + "message": "", + "pstate": []} + + +def print_conf(conf, value): + "Print a configuration line" + return colors["green"] + conf + ": " + colors["end"] + str(value) + + +def clr_screen(): + "Update the screen to display some information" + + # Header + to_print = [] + to_print.append(" " * (global_state["termSize"][0] / 2 - 10) + colors[ + "blue"] + "Miasm2 Regression tests" + colors["end"]) + to_print.append("") + to_print.append("=" * global_state["termSize"][0]) + to_print.append("") + to_print.append(print_conf("Current mode", "Multiprocessing")) + to_print.append(print_conf("Nb CPU detected", global_state["cpu_c"])) + to_print.append("") + to_print.append("=" * global_state["termSize"][0]) + to_print.append("") + test_done = 0 + test_failed = 0 + message = global_state["message"] + "\n" + for v in global_state["pstate"]: + if v["status"] != "running": + test_done += 1 + if v["status"] != 0: + test_failed += 1 + cmd_line = " ".join(v["test"].command_line) + message += colors["red"] + "FAIL:" + colors["end"] + cmd_line + message += "\n" + v["message"] + "\n" + + to_print.append(print_conf("Success rate", "%d/%d" % + (test_done - test_failed, test_done))) + printed_time = time.strftime( + "%M:%S", time.gmtime(time.time() - global_state["init_time"])) + to_print.append(print_conf("Cumulated time", printed_time)) + to_print.append("") + to_print.append("=" * global_state["termSize"][0]) + + cur = "\n".join(to_print) + cur += "\n" + + # Message + cur += message + print cur + already_printed = cur.count("\n") + + # Current state + current_job = [] + for process in global_state["pstate"]: + if process["status"] == "running": + current_job.append(process) + print "\n" * (global_state["termSize"][1] - already_printed - 3 - len(current_job)) + + for job in current_job: + command_line = " ".join(job["test"].command_line) + base_dir = job["test"].base_dir.upper() + s = "[" + colors["lightcyan"] + command_line + colors["end"] + s_end = base_dir + cur_time = time.strftime( + "%M:%Ss", time.gmtime(time.time() - job["init_time"])) + l = len(command_line) + len(s_end) + 4 + len(str(cur_time)) + 2 + s_end += " " + colors["blue"] + cur_time + colors["end"] + "]" + print "%s%s%s" % (s, " " * (global_state["termSize"][0] - l), s_end) + + +def on_signal(sig1, sig2): + "Update view every second" + clr_screen() + signal.alarm(1) + + +def init(cpu_c): + """Initialize global state + @cpu_c: number of cpu (for conf displaying) + """ + # Init global_state + global_state["cpu_c"] = cpu_c + global_state["init_time"] = time.time() + + # Launch view updater + signal.signal(signal.SIGALRM, on_signal) + signal.alarm(1) + + +def task_done(test, error): + "Report a test done" + for task in global_state["pstate"]: + if task["test"] == test: + if error is not None: + task["status"] = -1 + task["message"] = error + else: + task["status"] = 0 + break + clr_screen() + + +def task_new(test): + "Report a new test" + global_state["pstate"].append({"status": "running", + "test": test, + "init_time": time.time()}) + clr_screen() diff --git a/test/utils/test.py b/test/utils/test.py new file mode 100644 index 00000000..1caf1013 --- /dev/null +++ b/test/utils/test.py @@ -0,0 +1,29 @@ +class Test(object): + "Stand for a test to run" + + def __init__(self, command_line, base_dir="", depends=None, + products=None): + """Create a Test instance. + @command_line: list of string standing for arguments to launch + @base_dir: base directory for launch + @depends: list of Test instance indicating dependencies + @products: elements produced to remove after tests + """ + self.command_line = command_line + self.base_dir = base_dir + self.depends = depends if depends else [] + self.products = products if products else [] + + def __repr__(self): + displayed = ["command_line", "base_dir", "depends", "products"] + return "<Test " + \ + " ".join("%s=%s" % (n, getattr(self,n)) for n in displayed ) + ">" + + def __eq__(self, test): + if not isinstance(test, Test): + return False + + return all([self.command_line == test.command_line, + self.base_dir == test.base_dir, + self.depends == test.depends, + self.products == test.products]) diff --git a/test/utils/testset.py b/test/utils/testset.py new file mode 100644 index 00000000..ca7022fb --- /dev/null +++ b/test/utils/testset.py @@ -0,0 +1,226 @@ +import os +import subprocess +from multiprocessing import cpu_count, Queue, Process +from test import Test + + +class Message(object): + "Message exchanged in the TestSet message queue" + pass + + +class MessageTaskNew(object): + "Stand for a new task" + def __init__(self, task): + self.task = task + + +class MessageTaskDone(object): + "Stand for a task done" + def __init__(self, task, error): + self.task = task + self.error = error + + +class MessageClose(object): + "Close the channel" + pass + + +class TestSet(object): + "Manage a set of test" + + def __init__(self, base_dir): + """Initalise a test set + @base_dir: base directory for tests + """ + # Parse arguments + self.base_dir = base_dir + + # Init internals + self.task_done_cb = lambda tst, err: None # On task done callback + self.task_new_cb = lambda tst: None # On new task callback + self.todo_queue = Queue() # Tasks to do + self.message_queue = Queue() # Messages with workers + self.tests = [] # Tests to run + self.tests_done = [] # Tasks done + self.cpu_c = cpu_count() # CPUs available + self.errorcode = 0 # Non-zero if a test failed + self.additional_args = [] # Arguments to always add + + def __add__(self, test): + "Same as TestSet.add" + self.add(test) + return self + + def add(self, test): + "Add a test instance to the current test set" + if not isinstance(test, Test): + raise ValueError("%s is not a valid test instance" % (repr(test))) + self.tests.append(test) + + def set_cpu_numbers(self, cpu_c): + """Set the number of cpu to use + @cpu_c: Number of CPU to use (default is maximum) + """ + self.cpu_c = cpu_c + + def set_callback(self, task_done=None, task_new=None): + """Set callbacks for task information retrieval + @task_done: function(Test, Error message) + @task_new: function(Test) + """ + if task_done: + self.task_done_cb = task_done + if task_new: + self.task_new_cb = task_new + + def add_tasks(self): + "Add tests to do, regarding to dependencies" + for test in self.tests: + # Check dependencies + launchable = True + for dependency in test.depends: + if dependency not in self.tests_done: + launchable = False + break + + if launchable: + # Add task + self.tests.remove(test) + self.todo_queue.put(test) + + if len(self.tests) == 0: + # Poison pills + for _ in xrange(self.cpu_c): + self.todo_queue.put(None) + + # All tasks done + if len(self.tests_done) == self.init_tests_number: + self.message_queue.put(MessageClose()) + + def messages_handler(self): + "Manage message between Master and Workers" + + # Main loop + while True: + message = self.message_queue.get() + if isinstance(message, MessageClose): + # Poison pill + break + elif isinstance(message, MessageTaskNew): + # A task begins + self.task_new_cb(message.task) + elif isinstance(message, MessageTaskDone): + # A task has been done + self.tests_done.append(message.task) + self.add_tasks() + self.task_done_cb(message.task, message.error) + if message.error is not None: + self.errorcode = -1 + else: + raise ValueError("Unknown message type %s" % type(message)) + + @staticmethod + def worker(todo_queue, message_queue, init_args): + """Worker launched in parrallel + @todo_queue: task to do + @message_queue: communication with Host + @init_args: additionnal arguments for command line + """ + + # Main loop + while True: + # Acquire a task + test = todo_queue.get() + if test is None: + break + message_queue.put(MessageTaskNew(test)) + + # Go to the expected directory + current_directory = os.getcwd() + os.chdir(test.base_dir) + + # Launch test + testpy = subprocess.Popen(["python"] + init_args + test.command_line, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + outputs = testpy.communicate() + + # Check result + error = None + if testpy.returncode != 0: + error = outputs[1] + + # Restore directory + os.chdir(current_directory) + + # Report task finish + message_queue.put(MessageTaskDone(test, error)) + + def clean(self): + "Remove produced files" + + for test in self.tests_done: + # Go to the expected directory + current_directory = os.getcwd() + os.chdir(test.base_dir) + + # Remove files + for product in test.products: + try: + os.remove(product) + except OSError: + print "Cleanning error: Unable to remove %s" % product + + # Restore directory + os.chdir(current_directory) + + def add_additionnal_args(self, args): + """Add arguments to used on the test command line + @args: list of str + """ + self.add_additionnal_args += args + + def run(self): + "Launch tests" + + # Go in the right directory + current_directory = os.getcwd() + os.chdir(self.base_dir) + + # Launch workers + processes = [] + for _ in xrange(self.cpu_c): + p = Process(target=TestSet.worker, args=(self.todo_queue, + self.message_queue, + self.additional_args)) + + processes.append(p) + p.start() + + # Add initial tasks + self.init_tests_number = len(self.tests) + # Initial tasks + self.add_tasks() + + # Handle messages + self.messages_handler() + + # Close queue and join processes + self.todo_queue.close() + self.todo_queue.join_thread() + self.message_queue.close() + self.message_queue.join_thread() + for p in processes: + p.join() + + # Clean + self.clean() + + # Restore directory + os.chdir(current_directory) + + def tests_passed(self): + "Return a non zero value if at least one test failed" + return self.errorcode |