1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
import time
import signal
from cosmetics import getTerminalSize, colors
global_state = {"termSize": getTerminalSize(),
"message": "",
"pstate": []}
def print_conf(conf, value):
"Print a configuration line"
return colors["green"] + conf + ": " + colors["end"] + str(value)
def clr_screen():
"Update the screen to display some information"
# Header
to_print = []
to_print.append(" " * (global_state["termSize"][0] / 2 - 10) + colors[
"blue"] + "Miasm2 Regression tests" + colors["end"])
to_print.append("")
to_print.append("=" * global_state["termSize"][0])
to_print.append("")
to_print.append(print_conf("Current mode", "Multiprocessing"))
to_print.append(print_conf("Nb CPU detected", global_state["cpu_c"]))
to_print.append("")
to_print.append("=" * global_state["termSize"][0])
to_print.append("")
test_done = 0
test_failed = 0
message = global_state["message"] + "\n"
for v in global_state["pstate"]:
if v["status"] != "running":
test_done += 1
if v["status"] != 0:
test_failed += 1
cmd_line = " ".join(v["test"].command_line)
message += colors["red"] + "FAIL:" + colors["end"] + cmd_line
message += "\n" + v["message"] + "\n"
to_print.append(print_conf("Success rate", "%d/%d" %
(test_done - test_failed, test_done)))
printed_time = time.strftime(
"%M:%S", time.gmtime(time.time() - global_state["init_time"]))
to_print.append(print_conf("Cumulated time", printed_time))
to_print.append("")
to_print.append("=" * global_state["termSize"][0])
cur = "\n".join(to_print)
cur += "\n"
# Message
cur += message
print cur
already_printed = cur.count("\n")
# Current state
current_job = []
for process in global_state["pstate"]:
if process["status"] == "running":
current_job.append(process)
print "\n" * (global_state["termSize"][1] - already_printed - 3 - len(current_job))
for job in current_job:
command_line = " ".join(job["test"].command_line)
base_dir = job["test"].base_dir.upper()
s = "[" + colors["lightcyan"] + command_line + colors["end"]
s_end = base_dir
cur_time = time.strftime(
"%M:%Ss", time.gmtime(time.time() - job["init_time"]))
l = len(command_line) + len(s_end) + 4 + len(str(cur_time)) + 2
s_end += " " + colors["blue"] + cur_time + colors["end"] + "]"
print "%s%s%s" % (s, " " * (global_state["termSize"][0] - l), s_end)
def on_signal(sig1, sig2):
"Update view every second"
clr_screen()
signal.alarm(1)
def init(cpu_c):
"""Initialize global state
@cpu_c: number of cpu (for conf displaying)
"""
# Init global_state
global_state["cpu_c"] = cpu_c
global_state["init_time"] = time.time()
# Launch view updater
signal.signal(signal.SIGALRM, on_signal)
signal.alarm(1)
def task_done(test, error):
"Report a test done"
for task in global_state["pstate"]:
if task["test"] == test:
if error is not None:
task["status"] = -1
task["message"] = error
else:
task["status"] = 0
break
clr_screen()
def task_new(test):
"Report a new test"
global_state["pstate"].append({"status": "running",
"test": test,
"init_time": time.time()})
clr_screen()
|