about summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--test/utils/__init__.py1
-rw-r--r--test/utils/cosmetics.py47
-rw-r--r--test/utils/monothread.py20
-rw-r--r--test/utils/screendisplay.py115
-rw-r--r--test/utils/test.py29
-rw-r--r--test/utils/testset.py226
6 files changed, 438 insertions, 0 deletions
diff --git a/test/utils/__init__.py b/test/utils/__init__.py
new file mode 100644
index 00000000..d34ce5f7
--- /dev/null
+++ b/test/utils/__init__.py
@@ -0,0 +1 @@
+__all__ = ["test", "testset", "cosmetics"]
diff --git a/test/utils/cosmetics.py b/test/utils/cosmetics.py
new file mode 100644
index 00000000..d870507b
--- /dev/null
+++ b/test/utils/cosmetics.py
@@ -0,0 +1,47 @@
+import os
+
+
+def getTerminalSize():
+    "Return the size of the terminal : COLUMNS, LINES"
+
+    env = os.environ
+
+    def ioctl_GWINSZ(fd):
+        try:
+            import fcntl
+            import termios
+            import struct
+            import os
+            cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ,
+                                                 '1234'))
+        except:
+            return
+        return cr
+    cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
+    if not cr:
+        try:
+            fd = os.open(os.ctermid(), os.O_RDONLY)
+            cr = ioctl_GWINSZ(fd)
+            os.close(fd)
+        except:
+            pass
+    if not cr:
+        cr = (env.get('LINES', 25), env.get('COLUMNS', 80))
+    return int(cr[1]), int(cr[0])
+
+
+WIDTH = getTerminalSize()[0]
+colors = {"red": "\033[91;1m",
+          "end": "\033[0m",
+          "green": "\033[92;1m",
+          "lightcyan": "\033[96m",
+          "blue": "\033[94;1m"}
+
+
+def write_colored(text, color, already_printed=0):
+    text_colored = colors[color] + text + colors["end"]
+    print " " * (WIDTH - already_printed - len(text)) + text_colored
+
+
+def write_underline(text):
+    print "\033[4m" + text + colors["end"]
diff --git a/test/utils/monothread.py b/test/utils/monothread.py
new file mode 100644
index 00000000..ae64f3c5
--- /dev/null
+++ b/test/utils/monothread.py
@@ -0,0 +1,20 @@
+import sys
+import cosmetics
+
+
+def task_done(test, error):
+    s = "[%s] Running tests on %s ..." % (test.base_dir.upper(),
+                                          " ".join(test.command_line))
+    already_printed = len(s)
+    if error is not None:
+        cosmetics.write_colored("ERROR", "red", already_printed)
+        print error
+    else:
+        cosmetics.write_colored("OK", "green", already_printed)
+
+
+def task_new(test):
+    s = "[%s] Running tests on %s ..." % (test.base_dir.upper(),
+                                          " ".join(test.command_line))
+    sys.stdout.write(s)
+    sys.stdout.flush()
diff --git a/test/utils/screendisplay.py b/test/utils/screendisplay.py
new file mode 100644
index 00000000..7c7bfde1
--- /dev/null
+++ b/test/utils/screendisplay.py
@@ -0,0 +1,115 @@
+import time
+import signal
+from cosmetics import getTerminalSize, colors
+
+
+global_state = {"termSize": getTerminalSize(),
+                "message": "",
+                "pstate": []}
+
+
+def print_conf(conf, value):
+    "Print a configuration line"
+    return colors["green"] + conf + ": " + colors["end"] + str(value)
+
+
+def clr_screen():
+    "Update the screen to display some information"
+
+    # Header
+    to_print = []
+    to_print.append(" " * (global_state["termSize"][0] / 2 - 10) + colors[
+                    "blue"] + "Miasm2 Regression tests" + colors["end"])
+    to_print.append("")
+    to_print.append("=" * global_state["termSize"][0])
+    to_print.append("")
+    to_print.append(print_conf("Current mode", "Multiprocessing"))
+    to_print.append(print_conf("Nb CPU detected", global_state["cpu_c"]))
+    to_print.append("")
+    to_print.append("=" * global_state["termSize"][0])
+    to_print.append("")
+    test_done = 0
+    test_failed = 0
+    message = global_state["message"] + "\n"
+    for v in global_state["pstate"]:
+        if v["status"] != "running":
+            test_done += 1
+            if v["status"] != 0:
+                test_failed += 1
+                cmd_line = " ".join(v["test"].command_line)
+                message += colors["red"] + "FAIL:" + colors["end"] + cmd_line
+                message += "\n" + v["message"] + "\n"
+
+    to_print.append(print_conf("Success rate", "%d/%d" %
+                    (test_done - test_failed, test_done)))
+    printed_time = time.strftime(
+        "%M:%S", time.gmtime(time.time() - global_state["init_time"]))
+    to_print.append(print_conf("Cumulated time", printed_time))
+    to_print.append("")
+    to_print.append("=" * global_state["termSize"][0])
+
+    cur = "\n".join(to_print)
+    cur += "\n"
+
+    # Message
+    cur += message
+    print cur
+    already_printed = cur.count("\n")
+
+    # Current state
+    current_job = []
+    for process in global_state["pstate"]:
+        if process["status"] == "running":
+            current_job.append(process)
+    print "\n" * (global_state["termSize"][1] - already_printed - 3 - len(current_job))
+
+    for job in current_job:
+        command_line = " ".join(job["test"].command_line)
+        base_dir = job["test"].base_dir.upper()
+        s = "[" + colors["lightcyan"] + command_line + colors["end"]
+        s_end = base_dir
+        cur_time = time.strftime(
+            "%M:%Ss", time.gmtime(time.time() - job["init_time"]))
+        l = len(command_line) + len(s_end) + 4 + len(str(cur_time)) + 2
+        s_end += "    " + colors["blue"] + cur_time + colors["end"] + "]"
+        print "%s%s%s" % (s, " " * (global_state["termSize"][0] - l), s_end)
+
+
+def on_signal(sig1, sig2):
+    "Update view every second"
+    clr_screen()
+    signal.alarm(1)
+
+
+def init(cpu_c):
+    """Initialize global state
+    @cpu_c: number of cpu (for conf displaying)
+    """
+    # Init global_state
+    global_state["cpu_c"] = cpu_c
+    global_state["init_time"] = time.time()
+
+    # Launch view updater
+    signal.signal(signal.SIGALRM, on_signal)
+    signal.alarm(1)
+
+
+def task_done(test, error):
+    "Report a test done"
+    for task in global_state["pstate"]:
+        if task["test"] == test:
+            if error is not None:
+                task["status"] = -1
+                task["message"] = error
+            else:
+                task["status"] = 0
+            break
+    clr_screen()
+
+
+def task_new(test):
+    "Report a new test"
+    global_state["pstate"].append({"status": "running",
+                                   "test": test,
+                                   "init_time": time.time()})
+    clr_screen()
diff --git a/test/utils/test.py b/test/utils/test.py
new file mode 100644
index 00000000..1caf1013
--- /dev/null
+++ b/test/utils/test.py
@@ -0,0 +1,29 @@
+class Test(object):
+    "Stand for a test to run"
+
+    def __init__(self, command_line, base_dir="", depends=None,
+                 products=None):
+        """Create a Test instance.
+        @command_line: list of string standing for arguments to launch
+        @base_dir: base directory for launch
+        @depends: list of Test instance indicating dependencies
+        @products: elements produced to remove after tests
+        """
+        self.command_line = command_line
+        self.base_dir = base_dir
+        self.depends = depends if depends else []
+        self.products = products if products else []
+
+    def __repr__(self):
+        displayed = ["command_line", "base_dir", "depends", "products"]
+        return "<Test " + \
+            " ".join("%s=%s" % (n, getattr(self,n)) for n in displayed ) + ">"
+
+    def __eq__(self, test):
+        if not isinstance(test, Test):
+            return False
+
+        return all([self.command_line == test.command_line,
+                    self.base_dir == test.base_dir,
+                    self.depends == test.depends,
+                    self.products == test.products])
diff --git a/test/utils/testset.py b/test/utils/testset.py
new file mode 100644
index 00000000..ca7022fb
--- /dev/null
+++ b/test/utils/testset.py
@@ -0,0 +1,226 @@
+import os
+import subprocess
+from multiprocessing import cpu_count, Queue, Process
+from test import Test
+
+
+class Message(object):
+    "Message exchanged in the TestSet message queue"
+    pass
+
+
+class MessageTaskNew(object):
+    "Stand for a new task"
+    def __init__(self, task):
+        self.task = task
+
+
+class MessageTaskDone(object):
+    "Stand for a task done"
+    def __init__(self, task, error):
+        self.task = task
+        self.error = error
+
+
+class MessageClose(object):
+    "Close the channel"
+    pass
+
+
+class TestSet(object):
+    "Manage a set of test"
+
+    def __init__(self, base_dir):
+        """Initalise a test set
+        @base_dir: base directory for tests
+        """
+        # Parse arguments
+        self.base_dir = base_dir
+
+        # Init internals
+        self.task_done_cb = lambda tst, err: None # On task done callback
+        self.task_new_cb = lambda tst: None       # On new task callback
+        self.todo_queue = Queue()                 # Tasks to do
+        self.message_queue = Queue()              # Messages with workers
+        self.tests = []                           # Tests to run
+        self.tests_done = []                      # Tasks done
+        self.cpu_c = cpu_count()                  # CPUs available
+        self.errorcode = 0                        # Non-zero if a test failed
+        self.additional_args = []                 # Arguments to always add
+
+    def __add__(self, test):
+        "Same as TestSet.add"
+        self.add(test)
+        return self
+
+    def add(self, test):
+        "Add a test instance to the current test set"
+        if not isinstance(test, Test):
+            raise ValueError("%s is not a valid test instance" % (repr(test)))
+        self.tests.append(test)
+
+    def set_cpu_numbers(self, cpu_c):
+        """Set the number of cpu to use
+        @cpu_c: Number of CPU to use (default is maximum)
+        """
+        self.cpu_c = cpu_c
+
+    def set_callback(self, task_done=None, task_new=None):
+        """Set callbacks for task information retrieval
+        @task_done: function(Test, Error message)
+        @task_new: function(Test)
+        """
+        if task_done:
+            self.task_done_cb = task_done
+        if task_new:
+            self.task_new_cb = task_new
+
+    def add_tasks(self):
+        "Add tests to do, regarding to dependencies"
+        for test in self.tests:
+            # Check dependencies
+            launchable = True
+            for dependency in test.depends:
+                if dependency not in self.tests_done:
+                    launchable = False
+                    break
+
+            if launchable:
+                # Add task
+                self.tests.remove(test)
+                self.todo_queue.put(test)
+
+        if len(self.tests) == 0:
+            # Poison pills
+            for _ in xrange(self.cpu_c):
+                self.todo_queue.put(None)
+
+        # All tasks done
+        if len(self.tests_done) == self.init_tests_number:
+            self.message_queue.put(MessageClose())
+
+    def messages_handler(self):
+        "Manage message between Master and Workers"
+
+        # Main loop
+        while True:
+            message = self.message_queue.get()
+            if isinstance(message, MessageClose):
+                # Poison pill
+                break
+            elif isinstance(message, MessageTaskNew):
+                # A task begins
+                self.task_new_cb(message.task)
+            elif isinstance(message, MessageTaskDone):
+                # A task has been done
+                self.tests_done.append(message.task)
+                self.add_tasks()
+                self.task_done_cb(message.task, message.error)
+                if message.error is not None:
+                    self.errorcode = -1
+            else:
+                raise ValueError("Unknown message type %s" % type(message))
+
+    @staticmethod
+    def worker(todo_queue, message_queue, init_args):
+        """Worker launched in parrallel
+        @todo_queue: task to do
+        @message_queue: communication with Host
+        @init_args: additionnal arguments for command line
+        """
+
+        # Main loop
+        while True:
+            # Acquire a task
+            test = todo_queue.get()
+            if test is None:
+                break
+            message_queue.put(MessageTaskNew(test))
+
+            # Go to the expected directory
+            current_directory = os.getcwd()
+            os.chdir(test.base_dir)
+
+            # Launch test
+            testpy = subprocess.Popen(["python"] + init_args + test.command_line,
+                                      stdout=subprocess.PIPE,
+                                      stderr=subprocess.PIPE)
+            outputs = testpy.communicate()
+
+            # Check result
+            error = None
+            if testpy.returncode != 0:
+                error = outputs[1]
+
+            # Restore directory
+            os.chdir(current_directory)
+
+            # Report task finish
+            message_queue.put(MessageTaskDone(test, error))
+
+    def clean(self):
+        "Remove produced files"
+
+        for test in self.tests_done:
+            # Go to the expected directory
+            current_directory = os.getcwd()
+            os.chdir(test.base_dir)
+
+            # Remove files
+            for product in test.products:
+                try:
+                    os.remove(product)
+                except OSError:
+                    print "Cleanning error: Unable to remove %s" % product
+
+            # Restore directory
+            os.chdir(current_directory)
+
+    def add_additionnal_args(self, args):
+        """Add arguments to used on the test command line
+        @args: list of str
+        """
+        self.add_additionnal_args += args
+
+    def run(self):
+        "Launch tests"
+
+        # Go in the right directory
+        current_directory = os.getcwd()
+        os.chdir(self.base_dir)
+
+        # Launch workers
+        processes = []
+        for _ in xrange(self.cpu_c):
+            p = Process(target=TestSet.worker, args=(self.todo_queue,
+                                                     self.message_queue,
+                                                     self.additional_args))
+
+            processes.append(p)
+            p.start()
+
+        # Add initial tasks
+        self.init_tests_number = len(self.tests)
+        # Initial tasks
+        self.add_tasks()
+
+        # Handle messages
+        self.messages_handler()
+
+        # Close queue and join processes
+        self.todo_queue.close()
+        self.todo_queue.join_thread()
+        self.message_queue.close()
+        self.message_queue.join_thread()
+        for p in processes:
+            p.join()
+
+        # Clean
+        self.clean()
+
+        # Restore directory
+        os.chdir(current_directory)
+
+    def tests_passed(self):
+        "Return a non zero value if at least one test failed"
+        return self.errorcode