diff options
 test/utils/__init__.py      |   1 +
 test/utils/cosmetics.py     |  47 +
 test/utils/monothread.py    |  20 +
 test/utils/screendisplay.py | 115 +
 test/utils/test.py          |  29 +
 test/utils/testset.py       | 226 +
 6 files changed, 438 insertions(+), 0 deletions(-)
# NOTE(review): SOURCE is a cgit diff dump of six new files under test/utils/.
# The reconstruction below keeps every definition, in original order, with a
# marker comment per source file.  Cross-file imports (``import cosmetics``,
# ``from test import Test`` ...) are dropped because all names now live in a
# single namespace; the screendisplay.py definitions of ``task_done`` /
# ``task_new`` therefore shadow the monothread.py ones, exactly as a caller
# choosing the multiprocess view would use them.
import os
import sys
import time
import signal
import subprocess
from multiprocessing import cpu_count, Queue, Process


# --- test/utils/__init__.py ---
# __all__ = ["test", "testset", "cosmetics"]   (package export list)


# --- test/utils/cosmetics.py ---

def getTerminalSize():
    """Return the size of the terminal as (COLUMNS, LINES).

    Tries the TIOCGWINSZ ioctl on stdin/stdout/stderr, then on the
    controlling terminal, and finally falls back to the LINES/COLUMNS
    environment variables (defaults: 25 lines, 80 columns).
    """
    env = os.environ

    def ioctl_GWINSZ(fd):
        # Return (lines, columns) for @fd, or None when it is not a tty.
        try:
            import fcntl
            import termios
            import struct
            return struct.unpack(
                'hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, b'1234'))
        except Exception:
            return None

    cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
    if not cr:
        try:
            fd = os.open(os.ctermid(), os.O_RDONLY)
            cr = ioctl_GWINSZ(fd)
            os.close(fd)
        except Exception:
            pass
    if not cr:
        cr = (env.get('LINES', 25), env.get('COLUMNS', 80))
    # The ioctl reports (lines, columns); callers expect (columns, lines).
    return int(cr[1]), int(cr[0])


# Terminal width, sampled once at import time.
WIDTH = getTerminalSize()[0]
# ANSI escape sequences used for test-status reporting.
colors = {"red": "\033[91;1m",
          "end": "\033[0m",
          "green": "\033[92;1m",
          "lightcyan": "\033[96m",
          "blue": "\033[94;1m"}


def write_colored(text, color, already_printed=0):
    """Right-align @text on the current line and print it in @color.
    @already_printed: number of columns already used on the line.
    """
    text_colored = colors[color] + text + colors["end"]
    print(" " * (WIDTH - already_printed - len(text)) + text_colored)


def write_underline(text):
    "Print @text underlined (ANSI escape)."
    print("\033[4m" + text + colors["end"])


# --- test/utils/monothread.py ---

def task_done(test, error):
    "Monothread view: report @test finished; @error is None on success."
    # task_new already printed this prefix without a newline; recompute its
    # length so the OK/ERROR tag is aligned on the terminal's right edge.
    s = "[%s] Running tests on %s ..." % (test.base_dir.upper(),
                                          " ".join(test.command_line))
    already_printed = len(s)
    if error is not None:
        write_colored("ERROR", "red", already_printed)
        print(error)
    else:
        write_colored("OK", "green", already_printed)


def task_new(test):
    "Monothread view: announce @test starting (no trailing newline)."
    s = "[%s] Running tests on %s ..." % (test.base_dir.upper(),
                                          " ".join(test.command_line))
    sys.stdout.write(s)
    sys.stdout.flush()


# --- test/utils/screendisplay.py ---

# Shared display state: terminal size, pending failure message and the
# per-test status list ("pstate").
global_state = {"termSize": getTerminalSize(),
                "message": "",
                "pstate": []}


def print_conf(conf, value):
    "Return a configuration line '<conf>: <value>' with a colored key."
    return colors["green"] + conf + ": " + colors["end"] + str(value)


def clr_screen():
    "Redraw the screen: header, statistics, failure reports, running jobs."
    width, height = global_state["termSize"]

    # Header
    to_print = []
    to_print.append(" " * (width // 2 - 10) +
                    colors["blue"] + "Miasm2 Regression tests" + colors["end"])
    to_print.append("")
    to_print.append("=" * width)
    to_print.append("")
    to_print.append(print_conf("Current mode", "Multiprocessing"))
    to_print.append(print_conf("Nb CPU detected", global_state["cpu_c"]))
    to_print.append("")
    to_print.append("=" * width)
    to_print.append("")

    # Statistics and failure reports (status is "running", 0 ok, -1 failed)
    test_done = 0
    test_failed = 0
    message = global_state["message"] + "\n"
    for v in global_state["pstate"]:
        if v["status"] != "running":
            test_done += 1
            if v["status"] != 0:
                test_failed += 1
                cmd_line = " ".join(v["test"].command_line)
                message += colors["red"] + "FAIL:" + colors["end"] + cmd_line
                message += "\n" + v["message"] + "\n"

    to_print.append(print_conf("Success rate", "%d/%d" %
                               (test_done - test_failed, test_done)))
    printed_time = time.strftime(
        "%M:%S", time.gmtime(time.time() - global_state["init_time"]))
    to_print.append(print_conf("Cumulated time", printed_time))
    to_print.append("")
    to_print.append("=" * width)

    cur = "\n".join(to_print) + "\n" + message
    print(cur)
    already_printed = cur.count("\n")

    # Currently running jobs, anchored at the bottom of the screen
    current_job = [process for process in global_state["pstate"]
                   if process["status"] == "running"]
    print("\n" * (height - already_printed - 3 - len(current_job)))

    for job in current_job:
        command_line = " ".join(job["test"].command_line)
        base_dir = job["test"].base_dir.upper()
        s = "[" + colors["lightcyan"] + command_line + colors["end"]
        s_end = base_dir
        cur_time = time.strftime(
            "%M:%Ss", time.gmtime(time.time() - job["init_time"]))
        # Visible length only: the ANSI escapes take no columns.
        l = len(command_line) + len(s_end) + 4 + len(str(cur_time)) + 2
        s_end += " " + colors["blue"] + cur_time + colors["end"] + "]"
        print("%s%s%s" % (s, " " * (width - l), s_end))


def on_signal(sig1, sig2):
    "SIGALRM handler: refresh the view and re-arm the one-second alarm."
    clr_screen()
    signal.alarm(1)


def init(cpu_c):
    """Initialize the display global state.
    @cpu_c: number of CPUs (for the configuration header)
    """
    global_state["cpu_c"] = cpu_c
    global_state["init_time"] = time.time()

    # Launch the view updater: redraw every second through SIGALRM
    signal.signal(signal.SIGALRM, on_signal)
    signal.alarm(1)


def task_done(test, error):  # noqa: F811 - shadows the monothread version
    "Multiprocess view: mark @test finished (status 0, or -1 with @error)."
    for task in global_state["pstate"]:
        if task["test"] == test:
            if error is not None:
                task["status"] = -1
                task["message"] = error
            else:
                task["status"] = 0
            break
    clr_screen()


def task_new(test):  # noqa: F811 - shadows the monothread version
    "Multiprocess view: register @test as running."
    global_state["pstate"].append({"status": "running",
                                   "test": test,
                                   "init_time": time.time()})
    clr_screen()


# --- test/utils/test.py ---

class Test(object):
    "Stand for a test to run"

    def __init__(self, command_line, base_dir="", depends=None,
                 products=None):
        """Create a Test instance.
        @command_line: list of strings standing for arguments to launch
        @base_dir: base directory for launch
        @depends: list of Test instances indicating dependencies
        @products: elements produced, to remove after tests
        """
        self.command_line = command_line
        self.base_dir = base_dir
        # None defaults avoid sharing one mutable list between instances.
        self.depends = depends if depends else []
        self.products = products if products else []

    def __repr__(self):
        displayed = ["command_line", "base_dir", "depends", "products"]
        return "<Test " + \
            " ".join("%s=%s" % (name, getattr(self, name))
                     for name in displayed) + ">"

    def __eq__(self, test):
        # Value comparison on every displayed attribute.
        if not isinstance(test, Test):
            return False
        return all([self.command_line == test.command_line,
                    self.base_dir == test.base_dir,
                    self.depends == test.depends,
                    self.products == test.products])

    def __ne__(self, test):
        # Python 2 does not derive __ne__ from __eq__; keep them coherent.
        return not self.__eq__(test)


# --- test/utils/testset.py ---

class Message(object):
    "Message exchanged in the TestSet message queue"
    pass


class MessageTaskNew(Message):
    "Stand for a new task"

    def __init__(self, task):
        self.task = task


class MessageTaskDone(Message):
    "Stand for a task done"

    def __init__(self, task, error):
        self.task = task
        # None on success, the captured stderr text on failure.
        self.error = error


class MessageClose(Message):
    "Close the channel"
    pass


class TestSet(object):
    "Manage a set of tests"

    def __init__(self, base_dir):
        """Initialise a test set
        @base_dir: base directory for tests
        """
        self.base_dir = base_dir

        # Init internals
        self.task_done_cb = lambda tst, err: None  # On task done callback
        self.task_new_cb = lambda tst: None        # On new task callback
        self.todo_queue = Queue()     # Tasks to do
        self.message_queue = Queue()  # Messages with workers
        self.tests = []               # Tests to run
        self.tests_done = []          # Tasks done
        self.cpu_c = cpu_count()      # CPUs available
        self.errorcode = 0            # Non-zero if a test failed
        self.additional_args = []     # Arguments to always add

    def __add__(self, test):
        "Same as TestSet.add"
        self.add(test)
        return self

    def add(self, test):
        "Add a Test instance to the current test set"
        if not isinstance(test, Test):
            raise ValueError("%s is not a valid test instance" % (repr(test)))
        self.tests.append(test)

    def set_cpu_numbers(self, cpu_c):
        """Set the number of CPUs to use
        @cpu_c: number of CPUs to use (default is maximum)
        """
        self.cpu_c = cpu_c

    def set_callback(self, task_done=None, task_new=None):
        """Set callbacks for task information retrieval
        @task_done: function(Test, error message)
        @task_new: function(Test)
        """
        if task_done:
            self.task_done_cb = task_done
        if task_new:
            self.task_new_cb = task_new

    def add_tasks(self):
        "Enqueue every launchable test, according to its dependencies"
        # BUG FIX: the original iterated ``self.tests`` while removing from
        # it, which skips the element following each launched test (and can
        # therefore leave tests unscheduled and never send the poison
        # pills).  Iterate over a copy instead.
        for test in self.tests[:]:
            # A test is launchable once all its dependencies are done
            launchable = all(dependency in self.tests_done
                             for dependency in test.depends)
            if launchable:
                self.tests.remove(test)
                self.todo_queue.put(test)

        if len(self.tests) == 0:
            # Nothing left to schedule: one poison pill per worker
            for _ in range(self.cpu_c):
                self.todo_queue.put(None)

        # All tasks done
        if len(self.tests_done) == self.init_tests_number:
            self.message_queue.put(MessageClose())

    def messages_handler(self):
        "Manage messages between Master and Workers"
        # Main loop, until the MessageClose poison pill
        while True:
            message = self.message_queue.get()
            if isinstance(message, MessageClose):
                # Poison pill
                break
            elif isinstance(message, MessageTaskNew):
                # A task begins
                self.task_new_cb(message.task)
            elif isinstance(message, MessageTaskDone):
                # A task has been done; its completion may unlock dependants
                self.tests_done.append(message.task)
                self.add_tasks()
                self.task_done_cb(message.task, message.error)
                if message.error is not None:
                    self.errorcode = -1
            else:
                raise ValueError("Unknown message type %s" % type(message))
@staticmethod + def worker(todo_queue, message_queue, init_args): + """Worker launched in parrallel + @todo_queue: task to do + @message_queue: communication with Host + @init_args: additionnal arguments for command line + """ + + # Main loop + while True: + # Acquire a task + test = todo_queue.get() + if test is None: + break + message_queue.put(MessageTaskNew(test)) + + # Go to the expected directory + current_directory = os.getcwd() + os.chdir(test.base_dir) + + # Launch test + testpy = subprocess.Popen(["python"] + init_args + test.command_line, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + outputs = testpy.communicate() + + # Check result + error = None + if testpy.returncode != 0: + error = outputs[1] + + # Restore directory + os.chdir(current_directory) + + # Report task finish + message_queue.put(MessageTaskDone(test, error)) + + def clean(self): + "Remove produced files" + + for test in self.tests_done: + # Go to the expected directory + current_directory = os.getcwd() + os.chdir(test.base_dir) + + # Remove files + for product in test.products: + try: + os.remove(product) + except OSError: + print "Cleanning error: Unable to remove %s" % product + + # Restore directory + os.chdir(current_directory) + + def add_additionnal_args(self, args): + """Add arguments to used on the test command line + @args: list of str + """ + self.add_additionnal_args += args + + def run(self): + "Launch tests" + + # Go in the right directory + current_directory = os.getcwd() + os.chdir(self.base_dir) + + # Launch workers + processes = [] + for _ in xrange(self.cpu_c): + p = Process(target=TestSet.worker, args=(self.todo_queue, + self.message_queue, + self.additional_args)) + + processes.append(p) + p.start() + + # Add initial tasks + self.init_tests_number = len(self.tests) + # Initial tasks + self.add_tasks() + + # Handle messages + self.messages_handler() + + # Close queue and join processes + self.todo_queue.close() + self.todo_queue.join_thread() + 
self.message_queue.close() + self.message_queue.join_thread() + for p in processes: + p.join() + + # Clean + self.clean() + + # Restore directory + os.chdir(current_directory) + + def tests_passed(self): + "Return a non zero value if at least one test failed" + return self.errorcode |