about summary refs log tree commit diff stats
path: root/test/utils/testset.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/utils/testset.py')
-rw-r--r--test/utils/testset.py226
1 files changed, 226 insertions, 0 deletions
diff --git a/test/utils/testset.py b/test/utils/testset.py
new file mode 100644
index 00000000..ca7022fb
--- /dev/null
+++ b/test/utils/testset.py
@@ -0,0 +1,226 @@
+import os
+import subprocess
+from multiprocessing import cpu_count, Queue, Process
+from test import Test
+
+
+class Message(object):
+    "Message exchanged in the TestSet message queue"
+    pass
+
+
+class MessageTaskNew(object):
+    "Stand for a new task"
+    def __init__(self, task):
+        self.task = task
+
+
+class MessageTaskDone(object):
+    "Stand for a task done"
+    def __init__(self, task, error):
+        self.task = task
+        self.error = error
+
+
+class MessageClose(object):
+    "Close the channel"
+    pass
+
+
+class TestSet(object):
+    "Manage a set of test"
+
+    def __init__(self, base_dir):
+        """Initalise a test set
+        @base_dir: base directory for tests
+        """
+        # Parse arguments
+        self.base_dir = base_dir
+
+        # Init internals
+        self.task_done_cb = lambda tst, err: None # On task done callback
+        self.task_new_cb = lambda tst: None       # On new task callback
+        self.todo_queue = Queue()                 # Tasks to do
+        self.message_queue = Queue()              # Messages with workers
+        self.tests = []                           # Tests to run
+        self.tests_done = []                      # Tasks done
+        self.cpu_c = cpu_count()                  # CPUs available
+        self.errorcode = 0                        # Non-zero if a test failed
+        self.additional_args = []                 # Arguments to always add
+
+    def __add__(self, test):
+        "Same as TestSet.add"
+        self.add(test)
+        return self
+
+    def add(self, test):
+        "Add a test instance to the current test set"
+        if not isinstance(test, Test):
+            raise ValueError("%s is not a valid test instance" % (repr(test)))
+        self.tests.append(test)
+
+    def set_cpu_numbers(self, cpu_c):
+        """Set the number of cpu to use
+        @cpu_c: Number of CPU to use (default is maximum)
+        """
+        self.cpu_c = cpu_c
+
+    def set_callback(self, task_done=None, task_new=None):
+        """Set callbacks for task information retrieval
+        @task_done: function(Test, Error message)
+        @task_new: function(Test)
+        """
+        if task_done:
+            self.task_done_cb = task_done
+        if task_new:
+            self.task_new_cb = task_new
+
+    def add_tasks(self):
+        "Add tests to do, regarding to dependencies"
+        for test in self.tests:
+            # Check dependencies
+            launchable = True
+            for dependency in test.depends:
+                if dependency not in self.tests_done:
+                    launchable = False
+                    break
+
+            if launchable:
+                # Add task
+                self.tests.remove(test)
+                self.todo_queue.put(test)
+
+        if len(self.tests) == 0:
+            # Poison pills
+            for _ in xrange(self.cpu_c):
+                self.todo_queue.put(None)
+
+        # All tasks done
+        if len(self.tests_done) == self.init_tests_number:
+            self.message_queue.put(MessageClose())
+
+    def messages_handler(self):
+        "Manage message between Master and Workers"
+
+        # Main loop
+        while True:
+            message = self.message_queue.get()
+            if isinstance(message, MessageClose):
+                # Poison pill
+                break
+            elif isinstance(message, MessageTaskNew):
+                # A task begins
+                self.task_new_cb(message.task)
+            elif isinstance(message, MessageTaskDone):
+                # A task has been done
+                self.tests_done.append(message.task)
+                self.add_tasks()
+                self.task_done_cb(message.task, message.error)
+                if message.error is not None:
+                    self.errorcode = -1
+            else:
+                raise ValueError("Unknown message type %s" % type(message))
+
+    @staticmethod
+    def worker(todo_queue, message_queue, init_args):
+        """Worker launched in parrallel
+        @todo_queue: task to do
+        @message_queue: communication with Host
+        @init_args: additionnal arguments for command line
+        """
+
+        # Main loop
+        while True:
+            # Acquire a task
+            test = todo_queue.get()
+            if test is None:
+                break
+            message_queue.put(MessageTaskNew(test))
+
+            # Go to the expected directory
+            current_directory = os.getcwd()
+            os.chdir(test.base_dir)
+
+            # Launch test
+            testpy = subprocess.Popen(["python"] + init_args + test.command_line,
+                                      stdout=subprocess.PIPE,
+                                      stderr=subprocess.PIPE)
+            outputs = testpy.communicate()
+
+            # Check result
+            error = None
+            if testpy.returncode != 0:
+                error = outputs[1]
+
+            # Restore directory
+            os.chdir(current_directory)
+
+            # Report task finish
+            message_queue.put(MessageTaskDone(test, error))
+
+    def clean(self):
+        "Remove produced files"
+
+        for test in self.tests_done:
+            # Go to the expected directory
+            current_directory = os.getcwd()
+            os.chdir(test.base_dir)
+
+            # Remove files
+            for product in test.products:
+                try:
+                    os.remove(product)
+                except OSError:
+                    print "Cleanning error: Unable to remove %s" % product
+
+            # Restore directory
+            os.chdir(current_directory)
+
+    def add_additionnal_args(self, args):
+        """Add arguments to used on the test command line
+        @args: list of str
+        """
+        self.add_additionnal_args += args
+
+    def run(self):
+        "Launch tests"
+
+        # Go in the right directory
+        current_directory = os.getcwd()
+        os.chdir(self.base_dir)
+
+        # Launch workers
+        processes = []
+        for _ in xrange(self.cpu_c):
+            p = Process(target=TestSet.worker, args=(self.todo_queue,
+                                                     self.message_queue,
+                                                     self.additional_args))
+
+            processes.append(p)
+            p.start()
+
+        # Add initial tasks
+        self.init_tests_number = len(self.tests)
+        # Initial tasks
+        self.add_tasks()
+
+        # Handle messages
+        self.messages_handler()
+
+        # Close queue and join processes
+        self.todo_queue.close()
+        self.todo_queue.join_thread()
+        self.message_queue.close()
+        self.message_queue.join_thread()
+        for p in processes:
+            p.join()
+
+        # Clean
+        self.clean()
+
+        # Restore directory
+        os.chdir(current_directory)
+
+    def tests_passed(self):
+        "Return a non zero value if at least one test failed"
+        return self.errorcode