Diffstat (limited to 'test/test_all.py')
-rw-r--r--   test/test_all.py   443
1 files changed, 443 insertions, 0 deletions
diff --git a/test/test_all.py b/test/test_all.py
new file mode 100644
index 00000000..378755a4
--- /dev/null
+++ b/test/test_all.py
@@ -0,0 +1,443 @@
+import subprocess
+import sys
+import os
+import time
+import argparse
+import tempfile
+
+# Available tests
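+# Each top-level section ("test", "example") maps subsection names to lists of
+# [script, arguments...] command lines; the "order" keys give the order in
+# which sections and subsections are run.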
+
+all_tests = {
+    "test": {
+        "architecture": [
+            ["arch/x86/arch.py"],
+            ["arch/arm/arch.py"],
+            ["arch/arm/sem.py"],
+            ["arch/msp430/arch.py"],
+            ["arch/msp430/sem.py"],
+            ["arch/sh4/arch.py"],
+        ],
+        "core": [
+            ["core/interval.py"],
+            ["core/graph.py"],
+            ["core/parse_asm.py"],
+        ],
+        "expression": [
+            ["expression/modint.py"],
+            ["expression/stp.py"],
+            ["expression/simplifications.py"],
+        ],
+        "ir": [
+            ["ir/ir2C.py"],
+            ["ir/symbexec.py"],
+        ],
+        "order": [
+            "architecture",
+            "core",
+            "expression",
+            "ir",
+        ],
+    },
+    "example": {
+        "assembler": [
+            ["asm_x86.py"],
+            ["asm_arm.py"],
+            ["asm_box_x86_32.py"],
+            ["asm_box_x86_32_enc.py"],
+            ["asm_box_x86_32_mod.py"],
+            ["asm_box_x86_32_mod_self.py"],
+            ["asm_box_x86_32_repmod.py"],
+            ["disasm_01.py"],
+            ["disasm_02.py"],
+            ["disasm_03.py", "box_upx.exe", "0x410f90"],
+        ],
+        "expression": [
+            ["symbol_exec.py"],
+            ["expression/manip_expression1.py"],
+            ["expression/manip_expression2.py"],
+            ["expression/manip_expression3.py"],
+            ["expression/manip_expression4.py",
+                "expression/sc_connect_back.bin", "0x2e"],
+            ["expression/manip_expression5.py"],
+            ["expression/manip_expression6.py"],
+            ["expression/manip_expression7.py"],
+            ["test_dis.py", "-g", "-s", "-m", "arm", "demo_arm.bin", "0"],
+            ["test_dis.py", "-g", "-s", "-m",
+                "x86_32", "box_x86_32.bin", "0x401000"],
+            ["expression/solve_condition_stp.py",
+                "expression/simple_test.bin"],
+        ],
+        "jitter": [
+            ["unpack_upx.py", "--jitter", "tcc", "box_upx.exe"],
+            ["unpack_upx.py", "--jitter", "llvm", "box_upx.exe"],
+            ["test_jit_x86_32.py", "x86_32_sc.bin"],
+            ["test_jit_arm.py", "md5_arm", "A684"],
+            ["sandbox_pe_x86_32.py", "--jitter", "tcc", "box_x86_32.bin"],
+            ["sandbox_pe_x86_32.py", "--jitter", "llvm", "box_x86_32.bin"],
+            ["sandbox_pe_x86_32.py", "--jitter", "tcc", "box_x86_32_enc.bin"],
+            ["sandbox_pe_x86_32.py", "--jitter", "llvm", "box_x86_32_enc.bin"],
+            ["sandbox_pe_x86_32.py", "--jitter", "tcc", "box_x86_32_mod.bin"],
+            ["sandbox_pe_x86_32.py", "--jitter", "llvm", "box_x86_32_mod.bin"],
+            ["sandbox_pe_x86_32.py", "--jitter",
+                "tcc", "box_x86_32_mod_self.bin"],
+            ["sandbox_pe_x86_32.py", "--jitter",
+                "llvm", "box_x86_32_mod_self.bin"],
+            ["sandbox_pe_x86_32.py", "--jitter",
+                "tcc", "box_x86_32_repmod.bin"],
+            ["sandbox_pe_x86_32.py", "--jitter",
+                "llvm", "box_x86_32_repmod.bin"],
+        ],
+        "order": [
+            "assembler",
+            "expression",
+            "jitter",
+        ],
+    },
+    "order": [
+        "test",
+        "example",
+    ],
+}
+
+# Cosmetic
+
+
+def getTerminalSize():
+    "Return the size of the terminal : COLUMNS, LINES"
+
+    env = os.environ
+
+    def ioctl_GWINSZ(fd):
+        try:
+            import fcntl
+            import termios
+            import struct
+            cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ,
+                                                 '1234'))
+        except:
+            return
+        return cr
+    cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
+    if not cr:
+        try:
+            fd = os.open(os.ctermid(), os.O_RDONLY)
+            cr = ioctl_GWINSZ(fd)
+            os.close(fd)
+        except:
+            pass
+    if not cr:
+        cr = (env.get('LINES', 25), env.get('COLUMNS', 80))
+    return int(cr[1]), int(cr[0])
+
+
+WIDTH = getTerminalSize()[0]
+colors = {"red": "\033[91;1m",
+          "end": "\033[0m",
+          "green": "\033[92;1m",
+          "lightcyan": "\033[96m",
+          "blue": "\033[94;1m"}
+
+
+def write_colored(text, color, already_printed=0):
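+    # Right-align the colored text on the terminal line, accounting for the
+    # characters already printed on it.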
+    text_colored = colors[color] + text + colors["end"]
+    print " " * (WIDTH - already_printed - len(text)) + text_colored
+
+
+def write_underline(text):
+    print "\033[4m" + text + colors["end"]
+
+
+def print_conf(conf, value):
+    return colors["green"] + conf + ": " + colors["end"] + str(value)
+
+
+def clr_screen(global_state, pstate):
+    "Update the screen to display some information"
+
+    # Header
+    to_print = []
+    to_print.append(" " * (global_state["termSize"][0] / 2 - 10) + colors[
+                    "blue"] + "Miasm2 Regression tests" + colors["end"])
+    to_print.append("")
+    to_print.append("=" * global_state["termSize"][0])
+    to_print.append("")
+    to_print.append(print_conf("Current mode", "Multiprocessing"))
+    to_print.append(print_conf("Nb CPU detected", global_state["cpu_c"]))
+    to_print.append("")
+    to_print.append("=" * global_state["termSize"][0])
+    to_print.append("")
+    to_print.append(
+        print_conf("Current section", global_state["section"].upper()))
+    to_print.append(
+        print_conf("Current subsection", global_state["subsection"].upper()))
+    test_done = 0
+    test_failed = 0
+    message = global_state["message"] + "\n"
+    for k, v in pstate.items():
+        if v["status"] != "running":
+            test_done += 1
+            if v["status"] != 0:
+                test_failed += 1
+                message += colors["red"] + "FAIL: " + colors["end"] + k
+                message += v["message"] + "\n"
+
+    to_print.append(print_conf("Success rate", "%d/%d" %
+                    (test_done - test_failed, test_done)))
+    printed_time = time.strftime(
+        "%M:%S", time.gmtime(time.time() - global_state["init_time"]))
+    to_print.append(print_conf("Cumulated time", printed_time))
+    to_print.append("")
+    to_print.append("=" * global_state["termSize"][0])
+
+    cur = "\n".join(to_print)
+    cur += "\n"
+
+    # Message
+    cur += message
+    print cur
+    already_printed = cur.count("\n")
+
+    # Current state
+    current_job = []
+    for t in pstate.values():
+        if t["status"] == "running":
+            current_job.append(t)
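+    # Pad with blank lines so the list of running jobs lands at the bottom
+    # of the terminal.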
+    print "\n" * (global_state["termSize"][1] - already_printed - 3 - len(current_job))
+
+    for j in current_job:
+        s = "[" + colors["lightcyan"] + j["command"] + colors["end"]
+        s_end = time.strftime(
+            "%M:%Ss", time.gmtime(time.time() - j["init_time"]))
+        l = len(j["command"]) + len(s_end) + 4 + len(str(j["pid"])) + 2
+        s_end += "    " + colors["blue"] + str(j["pid"]) + colors["end"] + "]"
+        print "%s%s%s" % (s, " " * (global_state["termSize"][0] - l), s_end)
+
+# Tests handling
+
+
+def are_tests_finished(test_names, done):
+    for t in test_names:
+        if t not in done:
+            return False
+    return True
+
+
+def are_tests_finished_multi(test_names, pstate):
+    for t in test_names:
+        t = " ".join(t)
+        if t not in pstate.keys():
+            return False
+        if pstate[t]["status"] == "running":
+            return False
+    return True
+
+
+def test_iter(done):
+    "Return an iterator on next tests, wait for previous sections"
+
+    for section_name in all_tests["order"]:
+        # Go to the right directory
+        os.chdir(os.path.join("..", section_name))
+
+        # Update global state
+        section_content = all_tests[section_name]
+        write_underline(section_name.upper())
+
+        for subsection_name in section_content["order"]:
+            subsection_content = section_content[subsection_name]
+            write_underline("%s > %s" % (section_name.upper(),
+                                         subsection_name.upper()))
+            for test_line in subsection_content:
+                yield test_line
+
+            while not(are_tests_finished(subsection_content, done)):
+                time.sleep(0.050)
+
+
+def test_iter_multi(global_state, pstate):
+    "Multiprocessor version of test_iter"
+
+    # Global message: completed subsections
+    message = ""
+
+    for section_name in all_tests["order"]:
+        # Update global state
+        section_content = all_tests[section_name]
+        global_state["section"] = section_name
+
+        for subsection_name in section_content["order"]:
+            subsection_content = section_content[subsection_name]
+            beg_time = time.time()
+            global_state["subsection"] = subsection_name
+
+            for test_line in subsection_content:
+                yield test_line
+
+            while not(are_tests_finished_multi(subsection_content, pstate)):
+                # Wait for task to finish, update the screen
+                time.sleep(0.100)
+                clr_screen(global_state, pstate)
+
+            message += "%s > %s completed in %.08f seconds\n" % (section_name.upper(),
+                                                                 subsection_name.upper(
+                                                                 ),
+                                                                 time.time() - beg_time)
+            global_state["message"] = message
+
+    # Final update
+    clr_screen(global_state, pstate)
+
+
+def run_test(test, coveragerc=None):
+    s = "Running tests on %s ..." % " ".join(test)
+    sys.stdout.write(s)
+    sys.stdout.flush()
+
+    args = test
+    if coveragerc is not None:
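+        # Run the test under coverage in append mode ("-a") so results from
+        # all tests accumulate in the shared data file.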
+        args = ["-m", "coverage", "run", "--rcfile", coveragerc, "-a"] + test
+
+    # Launch test
+    testpy = subprocess.Popen(["python"] + args, stdout=subprocess.PIPE,
+                              stderr=subprocess.PIPE)
+    outputs = testpy.communicate()
+
+    # Check result
+    if testpy.returncode == 0:
+        write_colored("OK", "green", len(s))
+    else:
+        write_colored("ERROR", "red", len(s))
+        print outputs[1]
+
+
+def run_test_parallel(test, current, global_state):
+
+    pid = os.getpid()
+    test_key = " ".join(test)
+
+    # Keep current PID
+    current[test_key] = {"status": "running",
+                         "pid": pid,
+                         "command": test_key,
+                         "init_time": time.time()}
+
+    # Go to the right directory
+    os.chdir(os.path.join("..", global_state["section"]))
+
+    # Launch test
+    testpy = subprocess.Popen(["python"] + test, stdout=subprocess.PIPE,
+                              stderr=subprocess.PIPE)
+    outputs = testpy.communicate()
+
+    # Check result
+    message = ""
+    if testpy.returncode != 0:
+        message = outputs[1]
+
+    # Update result
+    current[test_key] = {"status": testpy.returncode,
+                         "message": message}
+
+# Multiprocessing handling
+
+try:
+    from multiprocessing import Manager, Pool, cpu_count
+    multiproc = True
+except ImportError:
+    multiproc = False
+
+# Argument parsing
+parser = argparse.ArgumentParser(description="Miasm2 testing tool")
+parser.add_argument("-m", "--mono", help="Force monothreading",
+                    action="store_true")
+parser.add_argument("-c", "--coverage", help="Include code coverage",
+                    action="store_true")
+args = parser.parse_args()
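+# Example invocations (illustrative):
+#   python test_all.py                    -> run all tests, in parallel if possible
+#   python test_all.py --mono --coverage  -> run sequentially with code coverage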
+
+if args.mono is True or args.coverage is True:
+    multiproc = False
+
+# Handle coverage
+coveragerc = None
+if args.coverage is True:
+    try:
+        import coverage
+    except ImportError:
+        print "%(red)s[Coverage]%(end)s Python 'coverage' module is required" % colors
+        exit(-1)
+
+    # Create directory
+    suffix = "_" + str(int(time.time()))
+    cov_dir = tempfile.mkdtemp(suffix, "m2_coverage_")
+
+    # Create configuration file
+    coveragerc = os.path.join(cov_dir, ".coveragerc")
+    coverage_data = os.path.join(cov_dir, ".coverage")
+
+    from ConfigParser import ConfigParser
+    from os.path import expanduser
+
+    config = ConfigParser()
+    config.read(['/etc/coveragerc', expanduser('~/.coveragerc')])
+    if not config.has_section('run'):
+        config.add_section('run')
+    config.set('run', 'data_file', coverage_data)
+    with open(coveragerc, 'w') as config_fd:
+        config.write(config_fd)
+
+    # Inform the user
+    d = {"blue": colors['blue'],
+         "end": colors['end'],
+         "cov_dir": cov_dir}
+    print "[%(blue)sCoverage%(end)s] Report will be written in %(cov_dir)s" % d
+
+# Handle llvm modularity
+
+llvm = True
+try:
+    import llvm
+except ImportError:
+    llvm = False
+
+# if llvm.version != (3,2):
+#    llvm = False
+
+if llvm is False:
+    print "%(red)s[LLVM]%(end)s Python 'py-llvm 3.2' module is required for llvm tests" % colors
+
+    # Remove llvm tests (iterate over a copy, as entries are removed)
+    for test in all_tests["example"]["jitter"][:]:
+        if "llvm" in test:
+            all_tests["example"]["jitter"].remove(test)
+            print "%(red)s[LLVM]%(end)s Remove" % colors, " ".join(test)
+
+    # Let the user see messages
+    time.sleep(0.5)
+
+# Run tests
+
+if multiproc is False:
+    done = list()
+    for test in test_iter(done):
+        run_test(test, coveragerc=coveragerc)
+        done.append(test)
+
+else:
+    # Parallel version
+    cpu_c = cpu_count()
+    global_state = {"cpu_c": cpu_c,
+                    "init_time": time.time(),
+                    "termSize": getTerminalSize(),
+                    "message": ""}
+
+    manager = Manager()
+    pool = Pool(processes=cpu_c)
+    current = manager.dict()
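+    # The managed dict is shared with the worker processes: run_test_parallel
+    # stores each test's live status in it and clr_screen reads it to refresh
+    # the display.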
+
+    for test in test_iter_multi(global_state, current):
+        pool.apply_async(run_test_parallel, (test,
+                                             current,
+                                             global_state))
+
+    pool.close()
+    pool.join()