summary refs log tree commit diff stats
path: root/tests/qemu-iotests/iotests.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/qemu-iotests/iotests.py')
-rw-r--r--tests/qemu-iotests/iotests.py366
1 files changed, 219 insertions, 147 deletions
diff --git a/tests/qemu-iotests/iotests.py b/tests/qemu-iotests/iotests.py
index 5f8c263d59..6c0e781af7 100644
--- a/tests/qemu-iotests/iotests.py
+++ b/tests/qemu-iotests/iotests.py
@@ -16,26 +16,39 @@
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 #
 
-import errno
+import atexit
+from collections import OrderedDict
+import faulthandler
+import io
+import json
+import logging
 import os
 import re
+import signal
+import struct
 import subprocess
-import string
-import unittest
 import sys
-import struct
-import json
-import signal
-import logging
-import atexit
-import io
-from collections import OrderedDict
-import faulthandler
+from typing import (Any, Callable, Dict, Iterable,
+                    List, Optional, Sequence, TypeVar)
+import unittest
 
+# pylint: disable=import-error, wrong-import-position
 sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
 from qemu import qtest
 
-assert sys.version_info >= (3,6)
+assert sys.version_info >= (3, 6)
+
+# Type Aliases
+QMPResponse = Dict[str, Any]
+
+
+# Use this logger for logging messages directly from the iotests module
+logger = logging.getLogger('qemu.iotests')
+logger.addHandler(logging.NullHandler())
+
+# Use this logger for messages that ought to be used for diff output.
+test_logger = logging.getLogger('qemu.iotests.diff_io')
+
 
 faulthandler.enable()
 
@@ -80,9 +93,11 @@ luks_default_key_secret_opt = 'key-secret=keysec0'
 def qemu_img(*args):
     '''Run qemu-img and return the exit code'''
     devnull = open('/dev/null', 'r+')
-    exitcode = subprocess.call(qemu_img_args + list(args), stdin=devnull, stdout=devnull)
+    exitcode = subprocess.call(qemu_img_args + list(args),
+                               stdin=devnull, stdout=devnull)
     if exitcode < 0:
-        sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
+        sys.stderr.write('qemu-img received signal %i: %s\n'
+                         % (-exitcode, ' '.join(qemu_img_args + list(args))))
     return exitcode
 
 def ordered_qmp(qmsg, conv_keys=True):
@@ -121,7 +136,8 @@ def qemu_img_verbose(*args):
     '''Run qemu-img without suppressing its output and return the exit code'''
     exitcode = subprocess.call(qemu_img_args + list(args))
     if exitcode < 0:
-        sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
+        sys.stderr.write('qemu-img received signal %i: %s\n'
+                         % (-exitcode, ' '.join(qemu_img_args + list(args))))
     return exitcode
 
 def qemu_img_pipe(*args):
@@ -132,7 +148,8 @@ def qemu_img_pipe(*args):
                             universal_newlines=True)
     exitcode = subp.wait()
     if exitcode < 0:
-        sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
+        sys.stderr.write('qemu-img received signal %i: %s\n'
+                         % (-exitcode, ' '.join(qemu_img_args + list(args))))
     return subp.communicate()[0]
 
 def qemu_img_log(*args):
@@ -140,12 +157,12 @@ def qemu_img_log(*args):
     log(result, filters=[filter_testfiles])
     return result
 
-def img_info_log(filename, filter_path=None, imgopts=False, extra_args=[]):
-    args = [ 'info' ]
+def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
+    args = ['info']
     if imgopts:
         args.append('--image-opts')
     else:
-        args += [ '-f', imgfmt ]
+        args += ['-f', imgfmt]
     args += extra_args
     args.append(filename)
 
@@ -162,7 +179,8 @@ def qemu_io(*args):
                             universal_newlines=True)
     exitcode = subp.wait()
     if exitcode < 0:
-        sys.stderr.write('qemu-io received signal %i: %s\n' % (-exitcode, ' '.join(args)))
+        sys.stderr.write('qemu-io received signal %i: %s\n'
+                         % (-exitcode, ' '.join(args)))
     return subp.communicate()[0]
 
 def qemu_io_log(*args):
@@ -224,7 +242,7 @@ class QemuIoInteractive:
         # quit command is in close(), '\n' is added automatically
         assert '\n' not in cmd
         cmd = cmd.strip()
-        assert cmd != 'q' and cmd != 'quit'
+        assert cmd not in ('q', 'quit')
         self._p.stdin.write(cmd + '\n')
         self._p.stdin.flush()
         return self._read_output()
@@ -246,10 +264,8 @@ def qemu_nbd_early_pipe(*args):
         sys.stderr.write('qemu-nbd received signal %i: %s\n' %
                          (-exitcode,
                           ' '.join(qemu_nbd_args + ['--fork'] + list(args))))
-    if exitcode == 0:
-        return exitcode, ''
-    else:
-        return exitcode, subp.communicate()[0]
+
+    return exitcode, subp.communicate()[0] if exitcode else ''
 
 def qemu_nbd_popen(*args):
     '''Run qemu-nbd in daemon mode and return the parent's exit code'''
@@ -286,10 +302,13 @@ win32_re = re.compile(r"\r")
 def filter_win32(msg):
     return win32_re.sub("", msg)
 
-qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* \([0-9\/.inf]* [EPTGMKiBbytes]*\/sec and [0-9\/.inf]* ops\/sec\)")
+qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
+                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
+                        r"and [0-9\/.inf]* ops\/sec\)")
 def filter_qemu_io(msg):
     msg = filter_win32(msg)
-    return qemu_io_re.sub("X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)", msg)
+    return qemu_io_re.sub("X ops; XX:XX:XX.X "
+                          "(XXX YYY/sec and XXX ops/sec)", msg)
 
 chown_re = re.compile(r"chown [0-9]+:[0-9]+")
 def filter_chown(msg):
@@ -313,7 +332,7 @@ def filter_qmp(qmsg, filter_fn):
         items = qmsg.items()
 
     for k, v in items:
-        if isinstance(v, list) or isinstance(v, dict):
+        if isinstance(v, (dict, list)):
             qmsg[k] = filter_qmp(v, filter_fn)
         else:
             qmsg[k] = filter_fn(k, v)
@@ -324,7 +343,7 @@ def filter_testfiles(msg):
     return msg.replace(prefix, 'TEST_DIR/PID-')
 
 def filter_qmp_testfiles(qmsg):
-    def _filter(key, value):
+    def _filter(_key, value):
         if is_str(value):
             return filter_testfiles(value)
         return value
@@ -342,7 +361,9 @@ def filter_img_info(output, filename):
         line = filter_testfiles(line)
         line = line.replace(imgfmt, 'IMGFMT')
         line = re.sub('iters: [0-9]+', 'iters: XXX', line)
-        line = re.sub('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', line)
+        line = re.sub('uuid: [-a-f0-9]+',
+                      'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
+                      line)
         line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
         lines.append(line)
     return '\n'.join(lines)
@@ -351,36 +372,40 @@ def filter_imgfmt(msg):
     return msg.replace(imgfmt, 'IMGFMT')
 
 def filter_qmp_imgfmt(qmsg):
-    def _filter(key, value):
+    def _filter(_key, value):
         if is_str(value):
             return filter_imgfmt(value)
         return value
     return filter_qmp(qmsg, _filter)
 
-def log(msg, filters=[], indent=None):
-    '''Logs either a string message or a JSON serializable message (like QMP).
-    If indent is provided, JSON serializable messages are pretty-printed.'''
+
+Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)
+
+def log(msg: Msg,
+        filters: Iterable[Callable[[Msg], Msg]] = (),
+        indent: Optional[int] = None) -> None:
+    """
+    Logs either a string message or a JSON serializable message (like QMP).
+    If indent is provided, JSON serializable messages are pretty-printed.
+    """
     for flt in filters:
         msg = flt(msg)
-    if isinstance(msg, dict) or isinstance(msg, list):
-        # Python < 3.4 needs to know not to add whitespace when pretty-printing:
-        separators = (', ', ': ') if indent is None else (',', ': ')
+    if isinstance(msg, (dict, list)):
         # Don't sort if it's already sorted
         do_sort = not isinstance(msg, OrderedDict)
-        print(json.dumps(msg, sort_keys=do_sort,
-                         indent=indent, separators=separators))
+        test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
     else:
-        print(msg)
+        test_logger.info(msg)
 
 class Timeout:
-    def __init__(self, seconds, errmsg = "Timeout"):
+    def __init__(self, seconds, errmsg="Timeout"):
         self.seconds = seconds
         self.errmsg = errmsg
     def __enter__(self):
         signal.signal(signal.SIGALRM, self.timeout)
         signal.setitimer(signal.ITIMER_REAL, self.seconds)
         return self
-    def __exit__(self, type, value, traceback):
+    def __exit__(self, exc_type, value, traceback):
         signal.setitimer(signal.ITIMER_REAL, 0)
         return False
     def timeout(self, signum, frame):
@@ -389,7 +414,7 @@ class Timeout:
 def file_pattern(name):
     return "{0}-{1}".format(os.getpid(), name)
 
-class FilePaths(object):
+class FilePaths:
     """
     FilePaths is an auto-generated filename that cleans itself up.
 
@@ -490,21 +515,21 @@ class VM(qtest.QEMUQtestMachine):
         self._args.append(opts)
         return self
 
-    def add_drive(self, path, opts='', interface='virtio', format=imgfmt):
+    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
         '''Add a virtio-blk drive to the VM'''
         options = ['if=%s' % interface,
                    'id=drive%d' % self._num_drives]
 
         if path is not None:
             options.append('file=%s' % path)
-            options.append('format=%s' % format)
+            options.append('format=%s' % img_format)
             options.append('cache=%s' % cachemode)
             options.append('aio=%s' % aiomode)
 
         if opts:
             options.append(opts)
 
-        if format == 'luks' and 'key-secret' not in opts:
+        if img_format == 'luks' and 'key-secret' not in opts:
             # default luks support
             if luks_default_secret_object not in self._args:
                 self.add_object(luks_default_secret_object)
@@ -529,30 +554,37 @@ class VM(qtest.QEMUQtestMachine):
         self._args.append(addr)
         return self
 
-    def pause_drive(self, drive, event=None):
-        '''Pause drive r/w operations'''
+    def hmp(self, command_line: str, use_log: bool = False) -> QMPResponse:
+        cmd = 'human-monitor-command'
+        kwargs = {'command-line': command_line}
+        if use_log:
+            return self.qmp_log(cmd, **kwargs)
+        else:
+            return self.qmp(cmd, **kwargs)
+
+    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
+        """Pause drive r/w operations"""
         if not event:
             self.pause_drive(drive, "read_aio")
             self.pause_drive(drive, "write_aio")
             return
-        self.qmp('human-monitor-command',
-                    command_line='qemu-io %s "break %s bp_%s"' % (drive, event, drive))
+        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
 
-    def resume_drive(self, drive):
-        self.qmp('human-monitor-command',
-                    command_line='qemu-io %s "remove_break bp_%s"' % (drive, drive))
+    def resume_drive(self, drive: str) -> None:
+        """Resume drive r/w operations"""
+        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
 
-    def hmp_qemu_io(self, drive, cmd):
-        '''Write to a given drive using an HMP command'''
-        return self.qmp('human-monitor-command',
-                        command_line='qemu-io %s "%s"' % (drive, cmd))
+    def hmp_qemu_io(self, drive: str, cmd: str,
+                    use_log: bool = False) -> QMPResponse:
+        """Write to a given drive using an HMP command"""
+        return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)
 
     def flatten_qmp_object(self, obj, output=None, basestr=''):
         if output is None:
             output = dict()
         if isinstance(obj, list):
-            for i in range(len(obj)):
-                self.flatten_qmp_object(obj[i], output, basestr + str(i) + '.')
+            for i, item in enumerate(obj):
+                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
         elif isinstance(obj, dict):
             for key in obj:
                 self.flatten_qmp_object(obj[key], output, basestr + key + '.')
@@ -573,7 +605,7 @@ class VM(qtest.QEMUQtestMachine):
             result.append(filter_qmp_event(ev))
         return result
 
-    def qmp_log(self, cmd, filters=[], indent=None, **kwargs):
+    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
         full_cmd = OrderedDict((
             ("execute", cmd),
             ("arguments", ordered_qmp(kwargs))
@@ -585,7 +617,7 @@ class VM(qtest.QEMUQtestMachine):
 
     # Returns None on success, and an error string on failure
     def run_job(self, job, auto_finalize=True, auto_dismiss=False,
-                pre_finalize=None, cancel=False, use_log=True, wait=60.0):
+                pre_finalize=None, cancel=False, wait=60.0):
         """
         run_job moves a job from creation through to dismissal.
 
@@ -598,7 +630,6 @@ class VM(qtest.QEMUQtestMachine):
                              invoked prior to issuing job-finalize, if any.
         :param cancel: Bool. When true, cancels the job after the pre_finalize
                        callback.
-        :param use_log: Bool. When false, does not log QMP messages.
         :param wait: Float. Timeout value specifying how long to wait for any
                      event, in seconds. Defaults to 60.0.
         """
@@ -616,8 +647,7 @@ class VM(qtest.QEMUQtestMachine):
         while True:
             ev = filter_qmp_event(self.events_wait(events, timeout=wait))
             if ev['event'] != 'JOB_STATUS_CHANGE':
-                if use_log:
-                    log(ev)
+                log(ev)
                 continue
             status = ev['data']['status']
             if status == 'aborting':
@@ -625,29 +655,18 @@ class VM(qtest.QEMUQtestMachine):
                 for j in result['return']:
                     if j['id'] == job:
                         error = j['error']
-                        if use_log:
-                            log('Job failed: %s' % (j['error']))
+                        log('Job failed: %s' % (j['error']))
             elif status == 'ready':
-                if use_log:
-                    self.qmp_log('job-complete', id=job)
-                else:
-                    self.qmp('job-complete', id=job)
+                self.qmp_log('job-complete', id=job)
             elif status == 'pending' and not auto_finalize:
                 if pre_finalize:
                     pre_finalize()
-                if cancel and use_log:
+                if cancel:
                     self.qmp_log('job-cancel', id=job)
-                elif cancel:
-                    self.qmp('job-cancel', id=job)
-                elif use_log:
-                    self.qmp_log('job-finalize', id=job)
                 else:
-                    self.qmp('job-finalize', id=job)
+                    self.qmp_log('job-finalize', id=job)
             elif status == 'concluded' and not auto_dismiss:
-                if use_log:
-                    self.qmp_log('job-dismiss', id=job)
-                else:
-                    self.qmp('job-dismiss', id=job)
+                self.qmp_log('job-dismiss', id=job)
             elif status == 'null':
                 return error
 
@@ -710,9 +729,7 @@ class VM(qtest.QEMUQtestMachine):
 
         for bitmap in bitmaps[node_name]:
             if bitmap.get('name', '') == bitmap_name:
-                if recording is None:
-                    return bitmap
-                elif bitmap.get('recording') == recording:
+                if recording is None or bitmap.get('recording') == recording:
                     return bitmap
         return None
 
@@ -763,12 +780,13 @@ class VM(qtest.QEMUQtestMachine):
             assert node is not None, 'Cannot follow path %s%s' % (root, path)
 
             try:
-                node_id = next(edge['child'] for edge in graph['edges'] \
-                                             if edge['parent'] == node['id'] and
-                                                edge['name'] == child_name)
+                node_id = next(edge['child'] for edge in graph['edges']
+                               if (edge['parent'] == node['id'] and
+                                   edge['name'] == child_name))
+
+                node = next(node for node in graph['nodes']
+                            if node['id'] == node_id)
 
-                node = next(node for node in graph['nodes'] \
-                                 if node['id'] == node_id)
             except StopIteration:
                 node = None
 
@@ -786,6 +804,12 @@ index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
 class QMPTestCase(unittest.TestCase):
     '''Abstract base class for QMP test cases'''
 
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        # Many users of this class set a VM property we rely on heavily
+        # in the methods below.
+        self.vm = None
+
     def dictpath(self, d, path):
         '''Traverse a path in a nested dict'''
         for component in path.split('/'):
@@ -795,16 +819,18 @@ class QMPTestCase(unittest.TestCase):
                 idx = int(idx)
 
             if not isinstance(d, dict) or component not in d:
-                self.fail('failed path traversal for "%s" in "%s"' % (path, str(d)))
+                self.fail(f'failed path traversal for "{path}" in "{d}"')
             d = d[component]
 
             if m:
                 if not isinstance(d, list):
-                    self.fail('path component "%s" in "%s" is not a list in "%s"' % (component, path, str(d)))
+                    self.fail(f'path component "{component}" in "{path}" '
+                              f'is not a list in "{d}"')
                 try:
                     d = d[idx]
                 except IndexError:
-                    self.fail('invalid index "%s" in path "%s" in "%s"' % (idx, path, str(d)))
+                    self.fail(f'invalid index "{idx}" in path "{path}" '
+                              f'in "{d}"')
         return d
 
     def assert_qmp_absent(self, d, path):
@@ -831,7 +857,7 @@ class QMPTestCase(unittest.TestCase):
         else:
             self.assertEqual(result, value,
                              '"%s" is "%s", expected "%s"'
-                                 % (path, str(result), str(value)))
+                             % (path, str(result), str(value)))
 
     def assert_no_active_block_jobs(self):
         result = self.vm.qmp('query-block-jobs')
@@ -841,24 +867,27 @@ class QMPTestCase(unittest.TestCase):
         """Issue a query-named-block-nodes and assert node_name and/or
         file_name is present in the result"""
         def check_equal_or_none(a, b):
-            return a == None or b == None or a == b
+            return a is None or b is None or a == b
         assert node_name or file_name
         result = self.vm.qmp('query-named-block-nodes')
         for x in result["return"]:
             if check_equal_or_none(x.get("node-name"), node_name) and \
                     check_equal_or_none(x.get("file"), file_name):
                 return
-        self.assertTrue(False, "Cannot find %s %s in result:\n%s" % \
-                (node_name, file_name, result))
+        self.fail("Cannot find %s %s in result:\n%s" %
+                  (node_name, file_name, result))
 
     def assert_json_filename_equal(self, json_filename, reference):
         '''Asserts that the given filename is a json: filename and that its
            content is equal to the given reference object'''
         self.assertEqual(json_filename[:5], 'json:')
-        self.assertEqual(self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
-                         self.vm.flatten_qmp_object(reference))
+        self.assertEqual(
+            self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
+            self.vm.flatten_qmp_object(reference)
+        )
 
-    def cancel_and_wait(self, drive='drive0', force=False, resume=False, wait=60.0):
+    def cancel_and_wait(self, drive='drive0', force=False,
+                        resume=False, wait=60.0):
         '''Cancel a block job and wait for it to finish, returning the event'''
         result = self.vm.qmp('block-job-cancel', device=drive, force=force)
         self.assert_qmp(result, 'return', {})
@@ -882,8 +911,8 @@ class QMPTestCase(unittest.TestCase):
         self.assert_no_active_block_jobs()
         return result
 
-    def wait_until_completed(self, drive='drive0', check_offset=True, wait=60.0,
-                             error=None):
+    def wait_until_completed(self, drive='drive0', check_offset=True,
+                             wait=60.0, error=None):
         '''Wait for a block job to finish, returning the event'''
         while True:
             for event in self.vm.get_qmp_events(wait=wait):
@@ -898,13 +927,13 @@ class QMPTestCase(unittest.TestCase):
                         self.assert_qmp(event, 'data/error', error)
                     self.assert_no_active_block_jobs()
                     return event
-                elif event['event'] == 'JOB_STATUS_CHANGE':
+                if event['event'] == 'JOB_STATUS_CHANGE':
                     self.assert_qmp(event, 'data/id', drive)
 
     def wait_ready(self, drive='drive0'):
-        '''Wait until a block job BLOCK_JOB_READY event'''
-        f = {'data': {'type': 'mirror', 'device': drive } }
-        event = self.vm.event_wait(name='BLOCK_JOB_READY', match=f)
+        """Wait until a BLOCK_JOB_READY event, and return the event."""
+        f = {'data': {'type': 'mirror', 'device': drive}}
+        return self.vm.event_wait(name='BLOCK_JOB_READY', match=f)
 
     def wait_ready_and_cancel(self, drive='drive0'):
         self.wait_ready(drive=drive)
@@ -933,7 +962,7 @@ class QMPTestCase(unittest.TestCase):
                 for job in result['return']:
                     if job['device'] == job_id:
                         found = True
-                        if job['paused'] == True and job['busy'] == False:
+                        if job['paused'] and not job['busy']:
                             return job
                         break
                 assert found
@@ -957,7 +986,7 @@ def notrun(reason):
     seq = os.path.basename(sys.argv[0])
 
     open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n')
-    print('%s not run: %s' % (seq, reason))
+    logger.warning("%s not run: %s", seq, reason)
     sys.exit(0)
 
 def case_notrun(reason):
@@ -972,7 +1001,8 @@ def case_notrun(reason):
     open('%s/%s.casenotrun' % (output_dir, seq), 'a').write(
         '    [case not run] ' + reason + '\n')
 
-def verify_image_format(supported_fmts=[], unsupported_fmts=[]):
+def _verify_image_format(supported_fmts: Sequence[str] = (),
+                         unsupported_fmts: Sequence[str] = ()) -> None:
     assert not (supported_fmts and unsupported_fmts)
 
     if 'generic' in supported_fmts and \
@@ -986,7 +1016,8 @@ def verify_image_format(supported_fmts=[], unsupported_fmts=[]):
     if not_sup or (imgfmt in unsupported_fmts):
         notrun('not suitable for this image format: %s' % imgfmt)
 
-def verify_protocol(supported=[], unsupported=[]):
+def _verify_protocol(supported: Sequence[str] = (),
+                     unsupported: Sequence[str] = ()) -> None:
     assert not (supported and unsupported)
 
     if 'generic' in supported:
@@ -996,20 +1027,20 @@ def verify_protocol(supported=[], unsupported=[]):
     if not_sup or (imgproto in unsupported):
         notrun('not suitable for this protocol: %s' % imgproto)
 
-def verify_platform(supported=None, unsupported=None):
-    if unsupported is not None:
-        if any((sys.platform.startswith(x) for x in unsupported)):
-            notrun('not suitable for this OS: %s' % sys.platform)
+def _verify_platform(supported: Sequence[str] = (),
+                     unsupported: Sequence[str] = ()) -> None:
+    if any((sys.platform.startswith(x) for x in unsupported)):
+        notrun('not suitable for this OS: %s' % sys.platform)
 
-    if supported is not None:
+    if supported:
         if not any((sys.platform.startswith(x) for x in supported)):
             notrun('not suitable for this OS: %s' % sys.platform)
 
-def verify_cache_mode(supported_cache_modes=[]):
+def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
     if supported_cache_modes and (cachemode not in supported_cache_modes):
         notrun('not suitable for this cache mode: %s' % cachemode)
 
-def verify_aio_mode(supported_aio_modes=[]):
+def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()):
     if supported_aio_modes and (aiomode not in supported_aio_modes):
         notrun('not suitable for this aio mode: %s' % aiomode)
 
@@ -1022,16 +1053,19 @@ def verify_quorum():
         notrun('quorum support missing')
 
 def qemu_pipe(*args):
-    '''Run qemu with an option to print something and exit (e.g. a help option),
-    and return its output'''
+    """
+    Run qemu with an option to print something and exit (e.g. a help option).
+
+    :return: QEMU's stdout output.
+    """
     args = [qemu_prog] + qemu_opts + list(args)
     subp = subprocess.Popen(args, stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT,
                             universal_newlines=True)
     exitcode = subp.wait()
     if exitcode < 0:
-        sys.stderr.write('qemu received signal %i: %s\n' % (-exitcode,
-                         ' '.join(args)))
+        sys.stderr.write('qemu received signal %i: %s\n' %
+                         (-exitcode, ' '.join(args)))
     return subp.communicate()[0]
 
 def supported_formats(read_only=False):
@@ -1049,7 +1083,7 @@ def supported_formats(read_only=False):
 
     return supported_formats.formats[read_only]
 
-def skip_if_unsupported(required_formats=[], read_only=False):
+def skip_if_unsupported(required_formats=(), read_only=False):
     '''Skip Test Decorator
        Runs the test if all the required formats are whitelisted'''
     def skip_test_decorator(func):
@@ -1061,8 +1095,9 @@ def skip_if_unsupported(required_formats=[], read_only=False):
 
             usf_list = list(set(fmts) - set(supported_formats(read_only)))
             if usf_list:
-                test_case.case_skip('{}: formats {} are not whitelisted'.format(
-                    test_case, usf_list))
+                msg = f'{test_case}: formats {usf_list} are not whitelisted'
+                test_case.case_skip(msg)
+                return None
             else:
                 return func(test_case, *args, **kwargs)
         return func_wrapper
@@ -1074,11 +1109,23 @@ def skip_if_user_is_root(func):
     def func_wrapper(*args, **kwargs):
         if os.getuid() == 0:
             case_notrun('{}: cannot be run as root'.format(args[0]))
+            return None
         else:
             return func(*args, **kwargs)
     return func_wrapper
 
-def execute_unittest(output, verbosity, debug):
+def execute_unittest(debug=False):
+    """Executes unittests within the calling module."""
+
+    verbosity = 2 if debug else 1
+
+    if debug:
+        output = sys.stdout
+    else:
+        # We need to filter out the time taken from the output so that
+        # qemu-iotest can reliably diff the results against master output.
+        output = io.StringIO()
+
     runner = unittest.TextTestRunner(stream=output, descriptions=True,
                                      verbosity=verbosity)
     try:
@@ -1086,6 +1133,8 @@ def execute_unittest(output, verbosity, debug):
         # exception
         unittest.main(testRunner=runner)
     finally:
+        # We need to filter out the time taken from the output so that
+        # qemu-iotest can reliably diff the results against master output.
         if not debug:
             out = output.getvalue()
             out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)
@@ -1097,13 +1146,19 @@ def execute_unittest(output, verbosity, debug):
 
             sys.stderr.write(out)
 
-def execute_test(test_function=None,
-                 supported_fmts=[],
-                 supported_platforms=None,
-                 supported_cache_modes=[], supported_aio_modes={},
-                 unsupported_fmts=[], supported_protocols=[],
-                 unsupported_protocols=[]):
-    """Run either unittest or script-style tests."""
+def execute_setup_common(supported_fmts: Sequence[str] = (),
+                         supported_platforms: Sequence[str] = (),
+                         supported_cache_modes: Sequence[str] = (),
+                         supported_aio_modes: Sequence[str] = (),
+                         unsupported_fmts: Sequence[str] = (),
+                         supported_protocols: Sequence[str] = (),
+                         unsupported_protocols: Sequence[str] = ()) -> bool:
+    """
+    Perform necessary setup for either script-style or unittest-style tests.
+
+    :return: Bool; Whether or not debug mode has been requested via the CLI.
+    """
+    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.
 
     # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
     # indicate that we're not being run via "check". There may be
@@ -1113,34 +1168,51 @@ def execute_test(test_function=None,
         sys.stderr.write('Please run this test via the "check" script\n')
         sys.exit(os.EX_USAGE)
 
-    debug = '-d' in sys.argv
-    verbosity = 1
-    verify_image_format(supported_fmts, unsupported_fmts)
-    verify_protocol(supported_protocols, unsupported_protocols)
-    verify_platform(supported=supported_platforms)
-    verify_cache_mode(supported_cache_modes)
-    verify_aio_mode(supported_aio_modes)
+    _verify_image_format(supported_fmts, unsupported_fmts)
+    _verify_protocol(supported_protocols, unsupported_protocols)
+    _verify_platform(supported=supported_platforms)
+    _verify_cache_mode(supported_cache_modes)
+    _verify_aio_mode(supported_aio_modes)
 
+    debug = '-d' in sys.argv
     if debug:
-        output = sys.stdout
-        verbosity = 2
         sys.argv.remove('-d')
-    else:
-        # We need to filter out the time taken from the output so that
-        # qemu-iotest can reliably diff the results against master output.
-        output = io.StringIO()
-
     logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
+    logger.debug("iotests debugging messages active")
+
+    return debug
+
+def execute_test(*args, test_function=None, **kwargs):
+    """Run either unittest or script-style tests."""
 
+    debug = execute_setup_common(*args, **kwargs)
     if not test_function:
-        execute_unittest(output, verbosity, debug)
+        execute_unittest(debug)
     else:
         test_function()
 
+def activate_logging():
+    """Activate iotests.log() output to stdout for script-style tests."""
+    handler = logging.StreamHandler(stream=sys.stdout)
+    formatter = logging.Formatter('%(message)s')
+    handler.setFormatter(formatter)
+    test_logger.addHandler(handler)
+    test_logger.setLevel(logging.INFO)
+    test_logger.propagate = False
+
+# This is called from script-style iotests without a single point of entry
+def script_initialize(*args, **kwargs):
+    """Initialize script-style tests without running any tests."""
+    activate_logging()
+    execute_setup_common(*args, **kwargs)
+
+# This is called from script-style iotests with a single point of entry
 def script_main(test_function, *args, **kwargs):
     """Run script-style tests outside of the unittest framework"""
-    execute_test(test_function, *args, **kwargs)
+    activate_logging()
+    execute_test(*args, test_function=test_function, **kwargs)
 
+# This is called from unittest style iotests
 def main(*args, **kwargs):
     """Run tests using the unittest framework"""
-    execute_test(None, *args, **kwargs)
+    execute_test(*args, **kwargs)