summary refs log tree commit diff stats
path: root/tests/functional/qemu_test
diff options
context:
space:
mode:
Diffstat (limited to 'tests/functional/qemu_test')
-rw-r--r--tests/functional/qemu_test/__init__.py9
-rw-r--r--tests/functional/qemu_test/archive.py117
-rw-r--r--tests/functional/qemu_test/asset.py26
-rw-r--r--tests/functional/qemu_test/cmd.py76
-rw-r--r--tests/functional/qemu_test/decorators.py107
-rw-r--r--tests/functional/qemu_test/linuxkernel.py29
-rw-r--r--tests/functional/qemu_test/tesseract.py21
-rw-r--r--tests/functional/qemu_test/testcase.py205
-rw-r--r--tests/functional/qemu_test/tuxruntest.py19
-rw-r--r--tests/functional/qemu_test/uncompress.py83
-rw-r--r--tests/functional/qemu_test/utils.py52
11 files changed, 559 insertions, 185 deletions
diff --git a/tests/functional/qemu_test/__init__.py b/tests/functional/qemu_test/__init__.py
index 67f87be9c4..da1830286d 100644
--- a/tests/functional/qemu_test/__init__.py
+++ b/tests/functional/qemu_test/__init__.py
@@ -8,8 +8,13 @@
 
 from .asset import Asset
 from .config import BUILD_DIR
-from .cmd import has_cmd, has_cmds, run_cmd, is_readable_executable_file, \
+from .cmd import is_readable_executable_file, \
     interrupt_interactive_console_until_pattern, wait_for_console_pattern, \
-    exec_command, exec_command_and_wait_for_pattern, get_qemu_img
+    exec_command, exec_command_and_wait_for_pattern, get_qemu_img, which
 from .testcase import QemuBaseTest, QemuUserTest, QemuSystemTest
 from .linuxkernel import LinuxKernelTest
+from .decorators import skipIfMissingCommands, skipIfNotMachine, \
+    skipFlakyTest, skipUntrustedTest, skipBigDataTest, \
+    skipIfMissingImports
+from .archive import archive_extract
+from .uncompress import uncompress
diff --git a/tests/functional/qemu_test/archive.py b/tests/functional/qemu_test/archive.py
new file mode 100644
index 0000000000..c803fdaf6d
--- /dev/null
+++ b/tests/functional/qemu_test/archive.py
@@ -0,0 +1,117 @@
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+# Utilities for python-based QEMU tests
+#
+# Copyright 2024 Red Hat, Inc.
+#
+# Authors:
+#  Thomas Huth <thuth@redhat.com>
+
+import os
+from subprocess import check_call, run, DEVNULL
+import tarfile
+from urllib.parse import urlparse
+import zipfile
+
+from .asset import Asset
+
+
+def tar_extract(archive, dest_dir, member=None):
+    with tarfile.open(archive) as tf:
+        if hasattr(tarfile, 'data_filter'):
+            tf.extraction_filter = getattr(tarfile, 'data_filter',
+                                           (lambda member, path: member))
+        if member:
+            tf.extract(member=member, path=dest_dir)
+        else:
+            tf.extractall(path=dest_dir)
+
+def cpio_extract(archive, output_path):
+    cwd = os.getcwd()
+    os.chdir(output_path)
+    # Not passing 'check=True' as cpio exits with non-zero
+    # status if the archive contains any device nodes :-(
+    if type(archive) == str:
+        run(['cpio', '-i', '-F', archive],
+            stdout=DEVNULL, stderr=DEVNULL)
+    else:
+        run(['cpio', '-i'],
+            input=archive.read(),
+            stdout=DEVNULL, stderr=DEVNULL)
+    os.chdir(cwd)
+
+def zip_extract(archive, dest_dir, member=None):
+    with zipfile.ZipFile(archive, 'r') as zf:
+        if member:
+            zf.extract(member=member, path=dest_dir)
+        else:
+            zf.extractall(path=dest_dir)
+
+def deb_extract(archive, dest_dir, member=None):
+    cwd = os.getcwd()
+    os.chdir(dest_dir)
+    try:
+        proc = run(['ar', 't', archive],
+                   check=True, capture_output=True, encoding='utf8')
+        file_path = proc.stdout.split()[2]
+        check_call(['ar', 'x', archive, file_path],
+                   stdout=DEVNULL, stderr=DEVNULL)
+        tar_extract(file_path, dest_dir, member)
+    finally:
+        os.chdir(cwd)
+
+'''
+@params archive: filename, Asset, or file-like object to extract
+@params dest_dir: target directory to extract into
+@params member: optional member file to limit extraction to
+
+Extracts @archive into @dest_dir. All files are extracted
+unless @member specifies a limit.
+
+If @format is None, heuristics will be applied to guess the format
+from the filename or Asset URL. @format must be non-None if @archive
+is a file-like object.
+'''
+def archive_extract(archive, dest_dir, format=None, member=None):
+    if format is None:
+        format = guess_archive_format(archive)
+    if type(archive) == Asset:
+        archive = str(archive)
+
+    if format == "tar":
+        tar_extract(archive, dest_dir, member)
+    elif format == "zip":
+        zip_extract(archive, dest_dir, member)
+    elif format == "cpio":
+        if member is not None:
+            raise Exception("Unable to filter cpio extraction")
+        cpio_extract(archive, dest_dir)
+    elif format == "deb":
+        if type(archive) != str:
+            raise Exception("Unable to use file-like object with deb archives")
+        deb_extract(archive, dest_dir, "./" + member)
+    else:
+        raise Exception(f"Unknown archive format {format}")
+
+'''
+@params archive: filename, or Asset to guess
+
+Guess the format of @compressed, raising an exception if
+no format can be determined
+'''
+def guess_archive_format(archive):
+    if type(archive) == Asset:
+        archive = urlparse(archive.url).path
+    elif type(archive) != str:
+        raise Exception(f"Unable to guess archive format for {archive}")
+
+    if ".tar." in archive or archive.endswith("tgz"):
+        return "tar"
+    elif archive.endswith(".zip"):
+        return "zip"
+    elif archive.endswith(".cpio"):
+        return "cpio"
+    elif archive.endswith(".deb") or archive.endswith(".udeb"):
+        return "deb"
+    else:
+        raise Exception(f"Unknown archive format for {archive}")
diff --git a/tests/functional/qemu_test/asset.py b/tests/functional/qemu_test/asset.py
index f126cd5863..f0730695f0 100644
--- a/tests/functional/qemu_test/asset.py
+++ b/tests/functional/qemu_test/asset.py
@@ -9,13 +9,13 @@ import hashlib
 import logging
 import os
 import stat
-import subprocess
 import sys
 import unittest
 import urllib.request
 from time import sleep
 from pathlib import Path
 from shutil import copyfileobj
+from urllib.error import HTTPError
 
 
 # Instances of this class must be declared as class level variables
@@ -40,6 +40,9 @@ class Asset:
         return "Asset: url=%s hash=%s cache=%s" % (
             self.url, self.hash, self.cache_file)
 
+    def __str__(self):
+        return str(self.cache_file)
+
     def _check(self, cache_file):
         if self.hash is None:
             return True
@@ -63,6 +66,12 @@ class Asset:
     def valid(self):
         return self.cache_file.exists() and self._check(self.cache_file)
 
+    def fetchable(self):
+        return not os.environ.get("QEMU_TEST_NO_DOWNLOAD", False)
+
+    def available(self):
+        return self.valid() or self.fetchable()
+
     def _wait_for_other_download(self, tmp_cache_file):
         # Another thread already seems to download the asset, so wait until
         # it is done, while also checking the size to see whether it is stuck
@@ -101,7 +110,7 @@ class Asset:
                            self.cache_file, self.url)
             return str(self.cache_file)
 
-        if os.environ.get("QEMU_TEST_NO_DOWNLOAD", False):
+        if not self.fetchable():
             raise Exception("Asset cache is invalid and downloads disabled")
 
         self.log.info("Downloading %s to %s...", self.url, self.cache_file)
@@ -162,7 +171,18 @@ class Asset:
         for name, asset in vars(test.__class__).items():
             if name.startswith("ASSET_") and type(asset) == Asset:
                 log.info("Attempting to cache '%s'" % asset)
-                asset.fetch()
+                try:
+                    asset.fetch()
+                except HTTPError as e:
+                    # Treat 404 as fatal, since it is highly likely to
+                    # indicate a broken test rather than a transient
+                    # server or networking problem
+                    if e.code == 404:
+                        raise
+
+                    log.debug(f"HTTP error {e.code} from {asset.url} " +
+                              "skipping asset precache")
+
         log.removeHandler(handler)
 
     def precache_suite(suite):
diff --git a/tests/functional/qemu_test/cmd.py b/tests/functional/qemu_test/cmd.py
index 11c8334a7c..dc5f422b77 100644
--- a/tests/functional/qemu_test/cmd.py
+++ b/tests/functional/qemu_test/cmd.py
@@ -14,66 +14,18 @@
 import logging
 import os
 import os.path
-import subprocess
 
-from .config import BUILD_DIR
 
-
-def has_cmd(name, args=None):
-    """
-    This function is for use in a @skipUnless decorator, e.g.:
-
-        @skipUnless(*has_cmd('sudo -n', ('sudo', '-n', 'true')))
-        def test_something_that_needs_sudo(self):
-            ...
-    """
-
-    if args is None:
-        args = ('which', name)
-
-    try:
-        _, stderr, exitcode = run_cmd(args)
-    except Exception as e:
-        exitcode = -1
-        stderr = str(e)
-
-    if exitcode != 0:
-        cmd_line = ' '.join(args)
-        err = f'{name} required, but "{cmd_line}" failed: {stderr.strip()}'
-        return (False, err)
-    else:
-        return (True, '')
-
-def has_cmds(*cmds):
+def which(tool):
+    """ looks up the full path for @tool, returns None if not found
+        or if @tool does not have executable permissions.
     """
-    This function is for use in a @skipUnless decorator and
-    allows checking for the availability of multiple commands, e.g.:
-
-        @skipUnless(*has_cmds(('cmd1', ('cmd1', '--some-parameter')),
-                              'cmd2', 'cmd3'))
-        def test_something_that_needs_cmd1_and_cmd2(self):
-            ...
-    """
-
-    for cmd in cmds:
-        if isinstance(cmd, str):
-            cmd = (cmd,)
-
-        ok, errstr = has_cmd(*cmd)
-        if not ok:
-            return (False, errstr)
-
-    return (True, '')
-
-def run_cmd(args):
-    subp = subprocess.Popen(args,
-                            stdout=subprocess.PIPE,
-                            stderr=subprocess.PIPE,
-                            universal_newlines=True)
-    stdout, stderr = subp.communicate()
-    ret = subp.returncode
-
-    return (stdout, stderr, ret)
+    paths=os.getenv('PATH')
+    for p in paths.split(os.path.pathsep):
+        p = os.path.join(p, tool)
+        if os.access(p, os.X_OK):
+            return p
+    return None
 
 def is_readable_executable_file(path):
     return os.path.isfile(path) and os.access(path, os.R_OK | os.X_OK)
@@ -241,10 +193,10 @@ def get_qemu_img(test):
 
     # If qemu-img has been built, use it, otherwise the system wide one
     # will be used.
-    qemu_img = os.path.join(BUILD_DIR, 'qemu-img')
+    qemu_img = test.build_file('qemu-img')
     if os.path.exists(qemu_img):
         return qemu_img
-    (has_system_qemu_img, errmsg) = has_cmd('qemu-img')
-    if has_system_qemu_img:
-        return 'qemu-img'
-    test.skipTest(errmsg)
+    qemu_img = which('qemu-img')
+    if qemu_img is not None:
+        return qemu_img
+    test.skipTest(f"qemu-img not found in build dir or '$PATH'")
diff --git a/tests/functional/qemu_test/decorators.py b/tests/functional/qemu_test/decorators.py
new file mode 100644
index 0000000000..df088bc090
--- /dev/null
+++ b/tests/functional/qemu_test/decorators.py
@@ -0,0 +1,107 @@
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+# Decorators useful in functional tests
+
+import os
+import platform
+from unittest import skipUnless
+
+from .cmd import which
+
+'''
+Decorator to skip execution of a test if the list
+of command binaries is not available in $PATH.
+Example:
+
+  @skipIfMissingCommands("mkisofs", "losetup")
+'''
+def skipIfMissingCommands(*args):
+    def has_cmds(cmdlist):
+        for cmd in cmdlist:
+            if not which(cmd):
+                return False
+        return True
+
+    return skipUnless(lambda: has_cmds(args),
+                      'required command(s) "%s" not installed' %
+                      ", ".join(args))
+
+'''
+Decorator to skip execution of a test if the current
+host machine does not match one of the permitted
+machines.
+Example
+
+  @skipIfNotMachine("x86_64", "aarch64")
+'''
+def skipIfNotMachine(*args):
+    return skipUnless(lambda: platform.machine() in args,
+                        'not running on one of the required machine(s) "%s"' %
+                        ", ".join(args))
+
+'''
+Decorator to skip execution of flaky tests, unless
+the $QEMU_TEST_FLAKY_TESTS environment variable is set.
+A bug URL must be provided that documents the observed
+failure behaviour, so it can be tracked & re-evaluated
+in future.
+
+Historical tests may be providing "None" as the bug_url
+but this should not be done for new test.
+
+Example:
+
+  @skipFlakyTest("https://gitlab.com/qemu-project/qemu/-/issues/NNN")
+'''
+def skipFlakyTest(bug_url):
+    if bug_url is None:
+        bug_url = "FIXME: reproduce flaky test and file bug report or remove"
+    return skipUnless(os.getenv('QEMU_TEST_FLAKY_TESTS'),
+                      f'Test is unstable: {bug_url}')
+
+'''
+Decorator to skip execution of tests which are likely
+to execute untrusted commands on the host, or commands
+which process untrusted code, unless the
+$QEMU_TEST_ALLOW_UNTRUSTED_CODE env var is set.
+Example:
+
+  @skipUntrustedTest()
+'''
+def skipUntrustedTest():
+    return skipUnless(os.getenv('QEMU_TEST_ALLOW_UNTRUSTED_CODE'),
+                      'Test runs untrusted code / processes untrusted data')
+
+'''
+Decorator to skip execution of tests which need large
+data storage (over around 500MB-1GB mark) on the host,
+unless the $QEMU_TEST_ALLOW_LARGE_STORAGE environment
+variable is set
+
+Example:
+
+  @skipBigDataTest()
+'''
+def skipBigDataTest():
+    return skipUnless(os.getenv('QEMU_TEST_ALLOW_LARGE_STORAGE'),
+                      'Test requires large host storage space')
+
+'''
+Decorator to skip execution of a test if the list
+of python imports is not available.
+Example:
+
+  @skipIfMissingImports("numpy", "cv2")
+'''
+def skipIfMissingImports(*args):
+    def has_imports(importlist):
+        for impname in importlist:
+            try:
+                import impname
+            except ImportError:
+                return False
+        return True
+
+    return skipUnless(lambda: has_imports(args),
+                      'required import(s) "%s" not installed' %
+                      ", ".join(args))
diff --git a/tests/functional/qemu_test/linuxkernel.py b/tests/functional/qemu_test/linuxkernel.py
index 2b5b9a5fda..2c9598102d 100644
--- a/tests/functional/qemu_test/linuxkernel.py
+++ b/tests/functional/qemu_test/linuxkernel.py
@@ -3,11 +3,9 @@
 # This work is licensed under the terms of the GNU GPL, version 2 or
 # later.  See the COPYING file in the top-level directory.
 
-import os
-
 from .testcase import QemuSystemTest
-from .cmd import run_cmd, wait_for_console_pattern
-from .utils import archive_extract
+from .cmd import wait_for_console_pattern
+
 
 class LinuxKernelTest(QemuSystemTest):
     KERNEL_COMMON_COMMAND_LINE = 'printk.time=0 '
@@ -28,26 +26,3 @@ class LinuxKernelTest(QemuSystemTest):
         self.vm.launch()
         if wait_for:
                 self.wait_for_console_pattern(wait_for)
-
-    def extract_from_deb(self, deb_path, path):
-        """
-        Extracts a file from a deb package into the test workdir
-
-        :param deb_path: path to the deb archive
-        :param path: path within the deb archive of the file to be extracted
-        :returns: path of the extracted file
-        """
-        cwd = os.getcwd()
-        os.chdir(self.workdir)
-        (stdout, stderr, ret) = run_cmd(['ar', 't', deb_path])
-        file_path = stdout.split()[2]
-        run_cmd(['ar', 'x', deb_path, file_path])
-        archive_extract(file_path, self.workdir)
-        os.chdir(cwd)
-        # Return complete path to extracted file.  Because callers to
-        # extract_from_deb() specify 'path' with a leading slash, it is
-        # necessary to use os.path.relpath() as otherwise os.path.join()
-        # interprets it as an absolute path and drops the self.workdir part.
-        return os.path.normpath(os.path.join(self.workdir,
-                                             os.path.relpath(path, '/')))
-
diff --git a/tests/functional/qemu_test/tesseract.py b/tests/functional/qemu_test/tesseract.py
index db441027b9..ede6c6501e 100644
--- a/tests/functional/qemu_test/tesseract.py
+++ b/tests/functional/qemu_test/tesseract.py
@@ -5,30 +5,19 @@
 # This work is licensed under the terms of the GNU GPL, version 2 or
 # later. See the COPYING file in the top-level directory.
 
-import re
 import logging
+from subprocess import run
 
-from . import has_cmd, run_cmd
-
-def tesseract_available(expected_version):
-    (has_tesseract, _) = has_cmd('tesseract')
-    if not has_tesseract:
-        return False
-    (stdout, stderr, ret) = run_cmd([ 'tesseract', '--version'])
-    if ret:
-        return False
-    version = stdout.split()[1]
-    return int(version.split('.')[0]) >= expected_version
 
 def tesseract_ocr(image_path, tesseract_args=''):
     console_logger = logging.getLogger('console')
     console_logger.debug(image_path)
-    (stdout, stderr, ret) = run_cmd(['tesseract', image_path,
-                                     'stdout'])
-    if ret:
+    proc = run(['tesseract', image_path, 'stdout'],
+               capture_output=True, encoding='utf8')
+    if proc.returncode:
         return None
     lines = []
-    for line in stdout.split('\n'):
+    for line in proc.stdout.split('\n'):
         sline = line.strip()
         if len(sline):
             console_logger.debug(sline)
diff --git a/tests/functional/qemu_test/testcase.py b/tests/functional/qemu_test/testcase.py
index 90ae59eb54..869f3949fe 100644
--- a/tests/functional/qemu_test/testcase.py
+++ b/tests/functional/qemu_test/testcase.py
@@ -13,19 +13,22 @@
 
 import logging
 import os
+from pathlib import Path
 import pycotap
 import shutil
-import subprocess
+from subprocess import run
 import sys
+import tempfile
 import unittest
 import uuid
 
 from qemu.machine import QEMUMachine
 from qemu.utils import kvm_available, tcg_available
 
+from .archive import archive_extract
 from .asset import Asset
-from .cmd import run_cmd
 from .config import BUILD_DIR
+from .uncompress import uncompress
 
 
 class QemuBaseTest(unittest.TestCase):
@@ -37,17 +40,169 @@ class QemuBaseTest(unittest.TestCase):
     log = None
     logdir = None
 
+    '''
+    @params compressed: filename, Asset, or file-like object to uncompress
+    @params format: optional compression format (gzip, lzma)
+
+    Uncompresses @compressed into the scratch directory.
+
+    If @format is None, heuristics will be applied to guess the format
+    from the filename or Asset URL. @format must be non-None if @uncompressed
+    is a file-like object.
+
+    Returns the fully qualified path to the uncompressed file
+    '''
+    def uncompress(self, compressed, format=None):
+        self.log.debug(f"Uncompress {compressed} format={format}")
+        if type(compressed) == Asset:
+            compressed.fetch()
+
+        (name, ext) = os.path.splitext(str(compressed))
+        uncompressed = self.scratch_file(os.path.basename(name))
+
+        uncompress(compressed, uncompressed, format)
+
+        return uncompressed
+
+    '''
+    @params archive: filename, Asset, or file-like object to extract
+    @params format: optional archive format (tar, zip, deb, cpio)
+    @params sub_dir: optional sub-directory to extract into
+    @params member: optional member file to limit extraction to
+
+    Extracts @archive into the scratch directory, or a directory beneath
+    named by @sub_dir. All files are extracted unless @member specifies
+    a limit.
+
+    If @format is None, heuristics will be applied to guess the format
+    from the filename or Asset URL. @format must be non-None if @archive
+    is a file-like object.
+
+    If @member is non-None, returns the fully qualified path to @member
+    '''
+    def archive_extract(self, archive, format=None, sub_dir=None, member=None):
+        self.log.debug(f"Extract {archive} format={format}" +
+                       f"sub_dir={sub_dir} member={member}")
+        if type(archive) == Asset:
+            archive.fetch()
+        if sub_dir is None:
+            archive_extract(archive, self.scratch_file(), format, member)
+        else:
+            archive_extract(archive, self.scratch_file(sub_dir),
+                            format, member)
+
+        if member is not None:
+            return self.scratch_file(member)
+        return None
+
+    '''
+    Create a temporary directory suitable for storing UNIX
+    socket paths.
+
+    Returns: a tempfile.TemporaryDirectory instance
+    '''
+    def socket_dir(self):
+        if self.socketdir is None:
+            self.socketdir = tempfile.TemporaryDirectory(
+                prefix="qemu_func_test_sock_")
+        return self.socketdir
+
+    '''
+    @params args list of zero or more subdirectories or file
+
+    Construct a path for accessing a data file located
+    relative to the source directory that is the root for
+    functional tests.
+
+    @args may be an empty list to reference the root dir
+    itself, may be a single element to reference a file in
+    the root directory, or may be multiple elements to
+    reference a file nested below. The path components
+    will be joined using the platform appropriate path
+    separator.
+
+    Returns: string representing a file path
+    '''
+    def data_file(self, *args):
+        return str(Path(Path(__file__).parent.parent, *args))
+
+    '''
+    @params args list of zero or more subdirectories or file
+
+    Construct a path for accessing a data file located
+    relative to the build directory root.
+
+    @args may be an empty list to reference the build dir
+    itself, may be a single element to reference a file in
+    the build directory, or may be multiple elements to
+    reference a file nested below. The path components
+    will be joined using the platform appropriate path
+    separator.
+
+    Returns: string representing a file path
+    '''
+    def build_file(self, *args):
+        return str(Path(BUILD_DIR, *args))
+
+    '''
+    @params args list of zero or more subdirectories or file
+
+    Construct a path for accessing/creating a scratch file
+    located relative to a temporary directory dedicated to
+    this test case. The directory and its contents will be
+    purged upon completion of the test.
+
+    @args may be an empty list to reference the scratch dir
+    itself, may be a single element to reference a file in
+    the scratch directory, or may be multiple elements to
+    reference a file nested below. The path components
+    will be joined using the platform appropriate path
+    separator.
+
+    Returns: string representing a file path
+    '''
+    def scratch_file(self, *args):
+        return str(Path(self.workdir, *args))
+
+    '''
+    @params args list of zero or more subdirectories or file
+
+    Construct a path for accessing/creating a log file
+    located relative to a temporary directory dedicated to
+    this test case. The directory and its log files will be
+    preserved upon completion of the test.
+
+    @args may be an empty list to reference the log dir
+    itself, may be a single element to reference a file in
+    the log directory, or may be multiple elements to
+    reference a file nested below. The path components
+    will be joined using the platform appropriate path
+    separator.
+
+    Returns: string representing a file path
+    '''
+    def log_file(self, *args):
+        return str(Path(self.outputdir, *args))
+
+    def assets_available(self):
+        for name, asset in vars(self.__class__).items():
+            if name.startswith("ASSET_") and type(asset) == Asset:
+                if not asset.available():
+                    self.log.debug(f"Asset {asset.url} not available")
+                    return False
+        return True
+
     def setUp(self, bin_prefix):
         self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
         self.arch = self.qemu_bin.split('-')[-1]
+        self.socketdir = None
 
-        self.outputdir = os.path.join(BUILD_DIR, 'tests', 'functional',
-                                      self.arch, self.id())
+        self.outputdir = self.build_file('tests', 'functional',
+                                         self.arch, self.id())
         self.workdir = os.path.join(self.outputdir, 'scratch')
         os.makedirs(self.workdir, exist_ok=True)
 
-        self.logdir = self.outputdir
-        self.log_filename = os.path.join(self.logdir, 'base.log')
+        self.log_filename = self.log_file('base.log')
         self.log = logging.getLogger('qemu-test')
         self.log.setLevel(logging.DEBUG)
         self._log_fh = logging.FileHandler(self.log_filename, mode='w')
@@ -62,9 +217,15 @@ class QemuBaseTest(unittest.TestCase):
         self.machinelog.setLevel(logging.DEBUG)
         self.machinelog.addHandler(self._log_fh)
 
+        if not self.assets_available():
+            self.skipTest('One or more assets is not available')
+
     def tearDown(self):
         if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
             shutil.rmtree(self.workdir)
+        if self.socketdir is not None:
+            shutil.rmtree(self.socketdir.name)
+            self.socketdir = None
         self.machinelog.removeHandler(self._log_fh)
         self.log.removeHandler(self._log_fh)
 
@@ -100,11 +261,11 @@ class QemuUserTest(QemuBaseTest):
         self._ldpath.append(os.path.abspath(ldpath))
 
     def run_cmd(self, bin_path, args=[]):
-        return subprocess.run([self.qemu_bin]
-                              + ["-L %s" % ldpath for ldpath in self._ldpath]
-                              + [bin_path]
-                              + args,
-                              text=True, capture_output=True)
+        return run([self.qemu_bin]
+                   + ["-L %s" % ldpath for ldpath in self._ldpath]
+                   + [bin_path]
+                   + args,
+                   text=True, capture_output=True)
 
 class QemuSystemTest(QemuBaseTest):
     """Facilitates system emulation tests."""
@@ -120,7 +281,7 @@ class QemuSystemTest(QemuBaseTest):
 
         console_log = logging.getLogger('console')
         console_log.setLevel(logging.DEBUG)
-        self.console_log_name = os.path.join(self.logdir, 'console.log')
+        self.console_log_name = self.log_file('console.log')
         self._console_log_fh = logging.FileHandler(self.console_log_name,
                                                    mode='w')
         self._console_log_fh.setLevel(logging.DEBUG)
@@ -131,7 +292,9 @@ class QemuSystemTest(QemuBaseTest):
     def set_machine(self, machinename):
         # TODO: We should use QMP to get the list of available machines
         if not self._machinehelp:
-            self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0];
+            self._machinehelp = run(
+                [self.qemu_bin, '-M', 'help'],
+                capture_output=True, check=True, encoding='utf8').stdout
         if self._machinehelp.find(machinename) < 0:
             self.skipTest('no support for machine ' + machinename)
         self.machine = machinename
@@ -159,22 +322,24 @@ class QemuSystemTest(QemuBaseTest):
                           "available" % accelerator)
 
     def require_netdev(self, netdevname):
-        netdevhelp = run_cmd([self.qemu_bin,
-                             '-M', 'none', '-netdev', 'help'])[0];
-        if netdevhelp.find('\n' + netdevname + '\n') < 0:
+        help = run([self.qemu_bin,
+                    '-M', 'none', '-netdev', 'help'],
+                   capture_output=True, check=True, encoding='utf8').stdout;
+        if help.find('\n' + netdevname + '\n') < 0:
             self.skipTest('no support for " + netdevname + " networking')
 
     def require_device(self, devicename):
-        devhelp = run_cmd([self.qemu_bin,
-                           '-M', 'none', '-device', 'help'])[0];
-        if devhelp.find(devicename) < 0:
+        help = run([self.qemu_bin,
+                    '-M', 'none', '-device', 'help'],
+                   capture_output=True, check=True, encoding='utf8').stdout;
+        if help.find(devicename) < 0:
             self.skipTest('no support for device ' + devicename)
 
     def _new_vm(self, name, *args):
         vm = QEMUMachine(self.qemu_bin,
                          name=name,
                          base_temp_dir=self.workdir,
-                         log_dir=self.logdir)
+                         log_dir=self.log_file())
         self.log.debug('QEMUMachine "%s" created', name)
         self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)
 
diff --git a/tests/functional/qemu_test/tuxruntest.py b/tests/functional/qemu_test/tuxruntest.py
index ab3b27da43..7227a83757 100644
--- a/tests/functional/qemu_test/tuxruntest.py
+++ b/tests/functional/qemu_test/tuxruntest.py
@@ -11,12 +11,12 @@
 
 import os
 import stat
-import time
+from subprocess import check_call, DEVNULL
 
 from qemu_test import QemuSystemTest
-from qemu_test import exec_command, exec_command_and_wait_for_pattern
+from qemu_test import exec_command_and_wait_for_pattern
 from qemu_test import wait_for_console_pattern
-from qemu_test import has_cmd, run_cmd, get_qemu_img
+from qemu_test import which, get_qemu_img
 
 class TuxRunBaselineTest(QemuSystemTest):
 
@@ -39,10 +39,8 @@ class TuxRunBaselineTest(QemuSystemTest):
         super().setUp()
 
         # We need zstd for all the tuxrun tests
-        (has_zstd, msg) = has_cmd('zstd')
-        if has_zstd is False:
-            self.skipTest(msg)
-        self.zstd = 'zstd'
+        if which('zstd') is None:
+            self.skipTest("zstd not found in $PATH")
 
         # Pre-init TuxRun specific settings: Most machines work with
         # reasonable defaults but we sometimes need to tweak the
@@ -77,10 +75,11 @@ class TuxRunBaselineTest(QemuSystemTest):
         kernel_image =  kernel_asset.fetch()
         disk_image_zst = rootfs_asset.fetch()
 
-        disk_image = self.workdir + "/rootfs.ext4"
+        disk_image = self.scratch_file("rootfs.ext4")
 
-        run_cmd([self.zstd, "-f", "-d", disk_image_zst,
-                 "-o", disk_image])
+        check_call(['zstd', "-f", "-d", disk_image_zst,
+                    "-o", disk_image],
+                   stdout=DEVNULL, stderr=DEVNULL)
         # zstd copies source archive permissions for the output
         # file, so must make this writable for QEMU
         os.chmod(disk_image, stat.S_IRUSR | stat.S_IWUSR)
diff --git a/tests/functional/qemu_test/uncompress.py b/tests/functional/qemu_test/uncompress.py
new file mode 100644
index 0000000000..6d02ded066
--- /dev/null
+++ b/tests/functional/qemu_test/uncompress.py
@@ -0,0 +1,83 @@
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+# Utilities for python-based QEMU tests
+#
+# Copyright 2024 Red Hat, Inc.
+#
+# Authors:
+#  Thomas Huth <thuth@redhat.com>
+
+import gzip
+import lzma
+import os
+import shutil
+from urllib.parse import urlparse
+
+from .asset import Asset
+
+
+def gzip_uncompress(gz_path, output_path):
+    if os.path.exists(output_path):
+        return
+    with gzip.open(gz_path, 'rb') as gz_in:
+        try:
+            with open(output_path, 'wb') as raw_out:
+                shutil.copyfileobj(gz_in, raw_out)
+        except:
+            os.remove(output_path)
+            raise
+
+def lzma_uncompress(xz_path, output_path):
+    if os.path.exists(output_path):
+        return
+    with lzma.open(xz_path, 'rb') as lzma_in:
+        try:
+            with open(output_path, 'wb') as raw_out:
+                shutil.copyfileobj(lzma_in, raw_out)
+        except:
+            os.remove(output_path)
+            raise
+
+'''
+@params compressed: filename, Asset, or file-like object to uncompress
+@params uncompressed: filename to uncompress into
+@params format: optional compression format (gzip, lzma)
+
+Uncompresses @compressed into @uncompressed
+
+If @format is None, heuristics will be applied to guess the format
+from the filename or Asset URL. @format must be non-None if @uncompressed
+is a file-like object.
+
+Returns the fully qualified path to the uncompessed file
+'''
+def uncompress(compressed, uncompressed, format=None):
+    if format is None:
+        format = guess_uncompress_format(compressed)
+
+    if format == "xz":
+        lzma_uncompress(str(compressed), uncompressed)
+    elif format == "gz":
+        gzip_uncompress(str(compressed), uncompressed)
+    else:
+        raise Exception(f"Unknown compression format {format}")
+
+'''
+@params compressed: filename, Asset, or file-like object to guess
+
+Guess the format of @compressed, raising an exception if
+no format can be determined
+'''
+def guess_uncompress_format(compressed):
+    if type(compressed) == Asset:
+        compressed = urlparse(compressed.url).path
+    elif type(compressed) != str:
+        raise Exception(f"Unable to guess compression cformat for {compressed}")
+
+    (name, ext) = os.path.splitext(compressed)
+    if ext == ".xz":
+        return "xz"
+    elif ext == ".gz":
+        return "gz"
+    else:
+        raise Exception(f"Unknown compression format for {compressed}")
diff --git a/tests/functional/qemu_test/utils.py b/tests/functional/qemu_test/utils.py
index 1bf1c410d5..e7c8de8165 100644
--- a/tests/functional/qemu_test/utils.py
+++ b/tests/functional/qemu_test/utils.py
@@ -8,12 +8,14 @@
 # This work is licensed under the terms of the GNU GPL, version 2 or
 # later.  See the COPYING file in the top-level directory.
 
-import gzip
-import lzma
 import os
-import shutil
-import subprocess
-import tarfile
+
+from qemu.utils import get_info_usernet_hostfwd_port
+
+
+def get_usernet_hostfwd_port(vm):
+    res = vm.cmd('human-monitor-command', command_line='info usernet')
+    return get_info_usernet_hostfwd_port(res)
 
 """
 Round up to next power of 2
@@ -35,43 +37,3 @@ def image_pow2ceil_expand(path):
         if size != size_aligned:
             with open(path, 'ab+') as fd:
                 fd.truncate(size_aligned)
-
-def archive_extract(archive, dest_dir, member=None):
-    with tarfile.open(archive) as tf:
-        if hasattr(tarfile, 'data_filter'):
-            tf.extraction_filter = getattr(tarfile, 'data_filter',
-                                           (lambda member, path: member))
-        if member:
-            tf.extract(member=member, path=dest_dir)
-        else:
-            tf.extractall(path=dest_dir)
-
-def gzip_uncompress(gz_path, output_path):
-    if os.path.exists(output_path):
-        return
-    with gzip.open(gz_path, 'rb') as gz_in:
-        try:
-            with open(output_path, 'wb') as raw_out:
-                shutil.copyfileobj(gz_in, raw_out)
-        except:
-            os.remove(output_path)
-            raise
-
-def lzma_uncompress(xz_path, output_path):
-    if os.path.exists(output_path):
-        return
-    with lzma.open(xz_path, 'rb') as lzma_in:
-        try:
-            with open(output_path, 'wb') as raw_out:
-                shutil.copyfileobj(lzma_in, raw_out)
-        except:
-            os.remove(output_path)
-            raise
-
-def cpio_extract(cpio_handle, output_path):
-    cwd = os.getcwd()
-    os.chdir(output_path)
-    subprocess.run(['cpio', '-i'],
-                   input=cpio_handle.read(),
-                   stderr=subprocess.DEVNULL)
-    os.chdir(cwd)