summary refs log tree commit diff stats
path: root/tests/qemu-iotests/iotests.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/qemu-iotests/iotests.py')
-rw-r--r--tests/qemu-iotests/iotests.py145
1 files changed, 81 insertions, 64 deletions
diff --git a/tests/qemu-iotests/iotests.py b/tests/qemu-iotests/iotests.py
index 5af0182895..777fa2ec0e 100644
--- a/tests/qemu-iotests/iotests.py
+++ b/tests/qemu-iotests/iotests.py
@@ -20,7 +20,6 @@ import atexit
 import bz2
 from collections import OrderedDict
 import faulthandler
-import io
 import json
 import logging
 import os
@@ -32,7 +31,7 @@ import subprocess
 import sys
 import time
 from typing import (Any, Callable, Dict, Iterable,
-                    List, Optional, Sequence, Tuple, TypeVar)
+                    List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
 import unittest
 
 from contextlib import contextmanager
@@ -113,15 +112,14 @@ def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
     Run a tool and return both its output and its exit code
     """
     stderr = subprocess.STDOUT if connect_stderr else None
-    subp = subprocess.Popen(args,
-                            stdout=subprocess.PIPE,
-                            stderr=stderr,
-                            universal_newlines=True)
-    output = subp.communicate()[0]
-    if subp.returncode < 0:
-        cmd = ' '.join(args)
-        sys.stderr.write(f'{tool} received signal {-subp.returncode}: {cmd}\n')
-    return (output, subp.returncode)
+    with subprocess.Popen(args, stdout=subprocess.PIPE,
+                          stderr=stderr, universal_newlines=True) as subp:
+        output = subp.communicate()[0]
+        if subp.returncode < 0:
+            cmd = ' '.join(args)
+            sys.stderr.write(f'{tool} received signal \
+                               {-subp.returncode}: {cmd}\n')
+        return (output, subp.returncode)
 
 def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
     """
@@ -237,6 +235,9 @@ def qemu_io_silent_check(*args):
 class QemuIoInteractive:
     def __init__(self, *args):
         self.args = qemu_io_args_no_fmt + list(args)
+        # We need to keep the Popen objext around, and not
+        # close it immediately. Therefore, disable the pylint check:
+        # pylint: disable=consider-using-with
         self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.STDOUT,
@@ -310,22 +311,22 @@ def qemu_nbd_popen(*args):
     cmd.extend(args)
 
     log('Start NBD server')
-    p = subprocess.Popen(cmd)
-    try:
-        while not os.path.exists(pid_file):
-            if p.poll() is not None:
-                raise RuntimeError(
-                    "qemu-nbd terminated with exit code {}: {}"
-                    .format(p.returncode, ' '.join(cmd)))
-
-            time.sleep(0.01)
-        yield
-    finally:
-        if os.path.exists(pid_file):
-            os.remove(pid_file)
-        log('Kill NBD server')
-        p.kill()
-        p.wait()
+    with subprocess.Popen(cmd) as p:
+        try:
+            while not os.path.exists(pid_file):
+                if p.poll() is not None:
+                    raise RuntimeError(
+                        "qemu-nbd terminated with exit code {}: {}"
+                        .format(p.returncode, ' '.join(cmd)))
+
+                time.sleep(0.01)
+            yield
+        finally:
+            if os.path.exists(pid_file):
+                os.remove(pid_file)
+            log('Kill NBD server')
+            p.kill()
+            p.wait()
 
 def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
     '''Return True if two image files are identical'''
@@ -334,13 +335,12 @@ def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
 
 def create_image(name, size):
     '''Create a fully-allocated raw image with sector markers'''
-    file = open(name, 'wb')
-    i = 0
-    while i < size:
-        sector = struct.pack('>l504xl', i // 512, i // 512)
-        file.write(sector)
-        i = i + 512
-    file.close()
+    with open(name, 'wb') as file:
+        i = 0
+        while i < size:
+            sector = struct.pack('>l504xl', i // 512, i // 512)
+            file.write(sector)
+            i = i + 512
 
 def image_size(img):
     '''Return image's virtual size'''
@@ -1271,37 +1271,54 @@ def skip_if_user_is_root(func):
             return func(*args, **kwargs)
     return func_wrapper
 
-def execute_unittest(debug=False):
+# We need to filter out the time taken from the output so that
+# qemu-iotest can reliably diff the results against master output,
+# and hide skipped tests from the reference output.
+
+class ReproducibleTestResult(unittest.TextTestResult):
+    def addSkip(self, test, reason):
+        # Same as TextTestResult, but print dot instead of "s"
+        unittest.TestResult.addSkip(self, test, reason)
+        if self.showAll:
+            self.stream.writeln("skipped {0!r}".format(reason))
+        elif self.dots:
+            self.stream.write(".")
+            self.stream.flush()
+
+class ReproducibleStreamWrapper:
+    def __init__(self, stream: TextIO):
+        self.stream = stream
+
+    def __getattr__(self, attr):
+        if attr in ('stream', '__getstate__'):
+            raise AttributeError(attr)
+        return getattr(self.stream, attr)
+
+    def write(self, arg=None):
+        arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg)
+        arg = re.sub(r' \(skipped=\d+\)', r'', arg)
+        self.stream.write(arg)
+
+class ReproducibleTestRunner(unittest.TextTestRunner):
+    def __init__(self, stream: Optional[TextIO] = None,
+             resultclass: Type[unittest.TestResult] = ReproducibleTestResult,
+             **kwargs: Any) -> None:
+        rstream = ReproducibleStreamWrapper(stream or sys.stdout)
+        super().__init__(stream=rstream,           # type: ignore
+                         descriptions=True,
+                         resultclass=resultclass,
+                         **kwargs)
+
+def execute_unittest(argv: List[str], debug: bool = False) -> None:
     """Executes unittests within the calling module."""
 
-    verbosity = 2 if debug else 1
-
-    if debug:
-        output = sys.stdout
-    else:
-        # We need to filter out the time taken from the output so that
-        # qemu-iotest can reliably diff the results against master output.
-        output = io.StringIO()
-
-    runner = unittest.TextTestRunner(stream=output, descriptions=True,
-                                     verbosity=verbosity)
-    try:
-        # unittest.main() will use sys.exit(); so expect a SystemExit
-        # exception
-        unittest.main(testRunner=runner)
-    finally:
-        # We need to filter out the time taken from the output so that
-        # qemu-iotest can reliably diff the results against master output.
-        if not debug:
-            out = output.getvalue()
-            out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)
-
-            # Hide skipped tests from the reference output
-            out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)
-            out_first_line, out_rest = out.split('\n', 1)
-            out = out_first_line.replace('s', '.') + '\n' + out_rest
-
-            sys.stderr.write(out)
+    # Some tests have warnings, especially ResourceWarnings for unclosed
+    # files and sockets.  Ignore them for now to ensure reproducibility of
+    # the test output.
+    unittest.main(argv=argv,
+                  testRunner=ReproducibleTestRunner,
+                  verbosity=2 if debug else 1,
+                  warnings=None if sys.warnoptions else 'ignore')
 
 def execute_setup_common(supported_fmts: Sequence[str] = (),
                          supported_platforms: Sequence[str] = (),
@@ -1338,7 +1355,7 @@ def execute_test(*args, test_function=None, **kwargs):
 
     debug = execute_setup_common(*args, **kwargs)
     if not test_function:
-        execute_unittest(debug)
+        execute_unittest(sys.argv, debug)
     else:
         test_function()