diff options
Diffstat (limited to 'tests/functional/qemu_test')
| -rw-r--r-- | tests/functional/qemu_test/__init__.py | 9 | ||||
| -rw-r--r-- | tests/functional/qemu_test/archive.py | 117 | ||||
| -rw-r--r-- | tests/functional/qemu_test/asset.py | 26 | ||||
| -rw-r--r-- | tests/functional/qemu_test/cmd.py | 76 | ||||
| -rw-r--r-- | tests/functional/qemu_test/decorators.py | 107 | ||||
| -rw-r--r-- | tests/functional/qemu_test/linuxkernel.py | 29 | ||||
| -rw-r--r-- | tests/functional/qemu_test/tesseract.py | 21 | ||||
| -rw-r--r-- | tests/functional/qemu_test/testcase.py | 205 | ||||
| -rw-r--r-- | tests/functional/qemu_test/tuxruntest.py | 19 | ||||
| -rw-r--r-- | tests/functional/qemu_test/uncompress.py | 83 | ||||
| -rw-r--r-- | tests/functional/qemu_test/utils.py | 52 |
11 files changed, 559 insertions, 185 deletions
diff --git a/tests/functional/qemu_test/__init__.py b/tests/functional/qemu_test/__init__.py index 67f87be9c4..da1830286d 100644 --- a/tests/functional/qemu_test/__init__.py +++ b/tests/functional/qemu_test/__init__.py @@ -8,8 +8,13 @@ from .asset import Asset from .config import BUILD_DIR -from .cmd import has_cmd, has_cmds, run_cmd, is_readable_executable_file, \ +from .cmd import is_readable_executable_file, \ interrupt_interactive_console_until_pattern, wait_for_console_pattern, \ - exec_command, exec_command_and_wait_for_pattern, get_qemu_img + exec_command, exec_command_and_wait_for_pattern, get_qemu_img, which from .testcase import QemuBaseTest, QemuUserTest, QemuSystemTest from .linuxkernel import LinuxKernelTest +from .decorators import skipIfMissingCommands, skipIfNotMachine, \ + skipFlakyTest, skipUntrustedTest, skipBigDataTest, \ + skipIfMissingImports +from .archive import archive_extract +from .uncompress import uncompress diff --git a/tests/functional/qemu_test/archive.py b/tests/functional/qemu_test/archive.py new file mode 100644 index 0000000000..c803fdaf6d --- /dev/null +++ b/tests/functional/qemu_test/archive.py @@ -0,0 +1,117 @@ +# SPDX-License-Identifier: GPL-2.0-or-later +# +# Utilities for python-based QEMU tests +# +# Copyright 2024 Red Hat, Inc. +# +# Authors: +# Thomas Huth <thuth@redhat.com> + +import os +from subprocess import check_call, run, DEVNULL +import tarfile +from urllib.parse import urlparse +import zipfile + +from .asset import Asset + + +def tar_extract(archive, dest_dir, member=None): + with tarfile.open(archive) as tf: + if hasattr(tarfile, 'data_filter'): + tf.extraction_filter = getattr(tarfile, 'data_filter', + (lambda member, path: member)) + if member: + tf.extract(member=member, path=dest_dir) + else: + tf.extractall(path=dest_dir) + +def cpio_extract(archive, output_path): + cwd = os.getcwd() + os.chdir(output_path) + # Not passing 'check=True' as cpio exits with non-zero + # status if the archive contains any device nodes :-( + if type(archive) == str: + run(['cpio', '-i', '-F', archive], + stdout=DEVNULL, stderr=DEVNULL) + else: + run(['cpio', '-i'], + input=archive.read(), + stdout=DEVNULL, stderr=DEVNULL) + os.chdir(cwd) + +def zip_extract(archive, dest_dir, member=None): + with zipfile.ZipFile(archive, 'r') as zf: + if member: + zf.extract(member=member, path=dest_dir) + else: + zf.extractall(path=dest_dir) + +def deb_extract(archive, dest_dir, member=None): + cwd = os.getcwd() + os.chdir(dest_dir) + try: + proc = run(['ar', 't', archive], + check=True, capture_output=True, encoding='utf8') + file_path = proc.stdout.split()[2] + check_call(['ar', 'x', archive, file_path], + stdout=DEVNULL, stderr=DEVNULL) + tar_extract(file_path, dest_dir, member) + finally: + os.chdir(cwd) + +''' +@params archive: filename, Asset, or file-like object to extract +@params dest_dir: target directory to extract into +@params member: optional member file to limit extraction to + +Extracts @archive into @dest_dir. All files are extracted +unless @member specifies a limit. + +If @format is None, heuristics will be applied to guess the format +from the filename or Asset URL. @format must be non-None if @archive +is a file-like object. +''' +def archive_extract(archive, dest_dir, format=None, member=None): + if format is None: + format = guess_archive_format(archive) + if type(archive) == Asset: + archive = str(archive) + + if format == "tar": + tar_extract(archive, dest_dir, member) + elif format == "zip": + zip_extract(archive, dest_dir, member) + elif format == "cpio": + if member is not None: + raise Exception("Unable to filter cpio extraction") + cpio_extract(archive, dest_dir) + elif format == "deb": + if type(archive) != str: + raise Exception("Unable to use file-like object with deb archives") + deb_extract(archive, dest_dir, "./" + member) + else: + raise Exception(f"Unknown archive format {format}") + +''' +@params archive: filename, or Asset to guess + +Guess the format of @compressed, raising an exception if +no format can be determined +''' +def guess_archive_format(archive): + if type(archive) == Asset: + archive = urlparse(archive.url).path + elif type(archive) != str: + raise Exception(f"Unable to guess archive format for {archive}") + + if ".tar." in archive or archive.endswith("tgz"): + return "tar" + elif archive.endswith(".zip"): + return "zip" + elif archive.endswith(".cpio"): + return "cpio" + elif archive.endswith(".deb") or archive.endswith(".udeb"): + return "deb" + else: + raise Exception(f"Unknown archive format for {archive}") diff --git a/tests/functional/qemu_test/asset.py b/tests/functional/qemu_test/asset.py index f126cd5863..f0730695f0 100644 --- a/tests/functional/qemu_test/asset.py +++ b/tests/functional/qemu_test/asset.py @@ -9,13 +9,13 @@ import hashlib import logging import os import stat -import subprocess import sys import unittest import urllib.request from time import sleep from pathlib import Path from shutil import copyfileobj +from urllib.error import HTTPError # Instances of this class must be declared as class level variables @@ -40,6 +40,9 @@ class Asset: return "Asset: url=%s hash=%s cache=%s" % ( self.url, self.hash, self.cache_file) + def __str__(self): + return str(self.cache_file) + def _check(self, cache_file): if self.hash is None: return True @@ -63,6 +66,12 @@ class Asset: def valid(self): return self.cache_file.exists() and self._check(self.cache_file) + def fetchable(self): + return not os.environ.get("QEMU_TEST_NO_DOWNLOAD", False) + + def available(self): + return self.valid() or self.fetchable() + def _wait_for_other_download(self, tmp_cache_file): # Another thread already seems to download the asset, so wait until # it is done, while also checking the size to see whether it is stuck @@ -101,7 +110,7 @@ class Asset: self.cache_file, self.url) return str(self.cache_file) - if os.environ.get("QEMU_TEST_NO_DOWNLOAD", False): + if not self.fetchable(): raise Exception("Asset cache is invalid and downloads disabled") self.log.info("Downloading %s to %s...", self.url, self.cache_file) @@ -162,7 +171,18 @@ class Asset: for name, asset in vars(test.__class__).items(): if name.startswith("ASSET_") and type(asset) == Asset: log.info("Attempting to cache '%s'" % asset) - asset.fetch() + try: + asset.fetch() + except HTTPError as e: + # Treat 404 as fatal, since it is highly likely to + # indicate a broken test rather than a transient + # server or networking problem + if e.code == 404: + raise + + log.debug(f"HTTP error {e.code} from {asset.url} " + + "skipping asset precache") + log.removeHandler(handler) def precache_suite(suite): diff --git a/tests/functional/qemu_test/cmd.py b/tests/functional/qemu_test/cmd.py index 11c8334a7c..dc5f422b77 100644 --- a/tests/functional/qemu_test/cmd.py +++ b/tests/functional/qemu_test/cmd.py @@ -14,66 +14,18 @@ import logging import os import os.path -import subprocess -from .config import BUILD_DIR - -def has_cmd(name, args=None): - """ - This function is for use in a @skipUnless decorator, e.g.: - - @skipUnless(*has_cmd('sudo -n', ('sudo', '-n', 'true'))) - def test_something_that_needs_sudo(self): - ... - """ - - if args is None: - args = ('which', name) - - try: - _, stderr, exitcode = run_cmd(args) - except Exception as e: - exitcode = -1 - stderr = str(e) - - if exitcode != 0: - cmd_line = ' '.join(args) - err = f'{name} required, but "{cmd_line}" failed: {stderr.strip()}' - return (False, err) - else: - return (True, '') - -def has_cmds(*cmds): +def which(tool): + """ looks up the full path for @tool, returns None if not found + or if @tool does not have executable permissions. """ - This function is for use in a @skipUnless decorator and - allows checking for the availability of multiple commands, e.g.: - - @skipUnless(*has_cmds(('cmd1', ('cmd1', '--some-parameter')), - 'cmd2', 'cmd3')) - def test_something_that_needs_cmd1_and_cmd2(self): - ... - """ - - for cmd in cmds: - if isinstance(cmd, str): - cmd = (cmd,) - - ok, errstr = has_cmd(*cmd) - if not ok: - return (False, errstr) - - return (True, '') - -def run_cmd(args): - subp = subprocess.Popen(args, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - universal_newlines=True) - stdout, stderr = subp.communicate() - ret = subp.returncode - - return (stdout, stderr, ret) + paths=os.getenv('PATH') + for p in paths.split(os.path.pathsep): + p = os.path.join(p, tool) + if os.access(p, os.X_OK): + return p + return None def is_readable_executable_file(path): return os.path.isfile(path) and os.access(path, os.R_OK | os.X_OK) @@ -241,10 +193,10 @@ def get_qemu_img(test): # If qemu-img has been built, use it, otherwise the system wide one # will be used. - qemu_img = os.path.join(BUILD_DIR, 'qemu-img') + qemu_img = test.build_file('qemu-img') if os.path.exists(qemu_img): return qemu_img - (has_system_qemu_img, errmsg) = has_cmd('qemu-img') - if has_system_qemu_img: - return 'qemu-img' - test.skipTest(errmsg) + qemu_img = which('qemu-img') + if qemu_img is not None: + return qemu_img + test.skipTest(f"qemu-img not found in build dir or '$PATH'") diff --git a/tests/functional/qemu_test/decorators.py b/tests/functional/qemu_test/decorators.py new file mode 100644 index 0000000000..df088bc090 --- /dev/null +++ b/tests/functional/qemu_test/decorators.py @@ -0,0 +1,107 @@ +# SPDX-License-Identifier: GPL-2.0-or-later +# +# Decorators useful in functional tests + +import os +import platform +from unittest import skipUnless + +from .cmd import which + +''' +Decorator to skip execution of a test if the list +of command binaries is not available in $PATH. +Example: + + @skipIfMissingCommands("mkisofs", "losetup") +''' +def skipIfMissingCommands(*args): + def has_cmds(cmdlist): + for cmd in cmdlist: + if not which(cmd): + return False + return True + + return skipUnless(lambda: has_cmds(args), + 'required command(s) "%s" not installed' % + ", ".join(args)) + +''' +Decorator to skip execution of a test if the current +host machine does not match one of the permitted +machines. +Example + + @skipIfNotMachine("x86_64", "aarch64") +''' +def skipIfNotMachine(*args): + return skipUnless(lambda: platform.machine() in args, + 'not running on one of the required machine(s) "%s"' % + ", ".join(args)) + +''' +Decorator to skip execution of flaky tests, unless +the $QEMU_TEST_FLAKY_TESTS environment variable is set. +A bug URL must be provided that documents the observed +failure behaviour, so it can be tracked & re-evaluated +in future. + +Historical tests may be providing "None" as the bug_url +but this should not be done for new test. + +Example: + + @skipFlakyTest("https://gitlab.com/qemu-project/qemu/-/issues/NNN") +''' +def skipFlakyTest(bug_url): + if bug_url is None: + bug_url = "FIXME: reproduce flaky test and file bug report or remove" + return skipUnless(os.getenv('QEMU_TEST_FLAKY_TESTS'), + f'Test is unstable: {bug_url}') + +''' +Decorator to skip execution of tests which are likely +to execute untrusted commands on the host, or commands +which process untrusted code, unless the +$QEMU_TEST_ALLOW_UNTRUSTED_CODE env var is set. +Example: + + @skipUntrustedTest() +''' +def skipUntrustedTest(): + return skipUnless(os.getenv('QEMU_TEST_ALLOW_UNTRUSTED_CODE'), + 'Test runs untrusted code / processes untrusted data') + +''' +Decorator to skip execution of tests which need large +data storage (over around 500MB-1GB mark) on the host, +unless the $QEMU_TEST_ALLOW_LARGE_STORAGE environment +variable is set + +Example: + + @skipBigDataTest() +''' +def skipBigDataTest(): + return skipUnless(os.getenv('QEMU_TEST_ALLOW_LARGE_STORAGE'), + 'Test requires large host storage space') + +''' +Decorator to skip execution of a test if the list +of python imports is not available. +Example: + + @skipIfMissingImports("numpy", "cv2") +''' +def skipIfMissingImports(*args): + def has_imports(importlist): + for impname in importlist: + try: + import impname + except ImportError: + return False + return True + + return skipUnless(lambda: has_imports(args), + 'required import(s) "%s" not installed' % + ", ".join(args)) diff --git a/tests/functional/qemu_test/linuxkernel.py b/tests/functional/qemu_test/linuxkernel.py index 2b5b9a5fda..2c9598102d 100644 --- a/tests/functional/qemu_test/linuxkernel.py +++ b/tests/functional/qemu_test/linuxkernel.py @@ -3,11 +3,9 @@ # This work is licensed under the terms of the GNU GPL, version 2 or # later. See the COPYING file in the top-level directory. -import os - from .testcase import QemuSystemTest -from .cmd import run_cmd, wait_for_console_pattern -from .utils import archive_extract +from .cmd import wait_for_console_pattern + class LinuxKernelTest(QemuSystemTest): KERNEL_COMMON_COMMAND_LINE = 'printk.time=0 ' @@ -28,26 +26,3 @@ class LinuxKernelTest(QemuSystemTest): self.vm.launch() if wait_for: self.wait_for_console_pattern(wait_for) - - def extract_from_deb(self, deb_path, path): - """ - Extracts a file from a deb package into the test workdir - - :param deb_path: path to the deb archive - :param path: path within the deb archive of the file to be extracted - :returns: path of the extracted file - """ - cwd = os.getcwd() - os.chdir(self.workdir) - (stdout, stderr, ret) = run_cmd(['ar', 't', deb_path]) - file_path = stdout.split()[2] - run_cmd(['ar', 'x', deb_path, file_path]) - archive_extract(file_path, self.workdir) - os.chdir(cwd) - # Return complete path to extracted file. Because callers to - # extract_from_deb() specify 'path' with a leading slash, it is - # necessary to use os.path.relpath() as otherwise os.path.join() - # interprets it as an absolute path and drops the self.workdir part. - return os.path.normpath(os.path.join(self.workdir, - os.path.relpath(path, '/'))) - diff --git a/tests/functional/qemu_test/tesseract.py b/tests/functional/qemu_test/tesseract.py index db441027b9..ede6c6501e 100644 --- a/tests/functional/qemu_test/tesseract.py +++ b/tests/functional/qemu_test/tesseract.py @@ -5,30 +5,19 @@ # This work is licensed under the terms of the GNU GPL, version 2 or # later. See the COPYING file in the top-level directory. -import re import logging +from subprocess import run -from . import has_cmd, run_cmd - -def tesseract_available(expected_version): - (has_tesseract, _) = has_cmd('tesseract') - if not has_tesseract: - return False - (stdout, stderr, ret) = run_cmd([ 'tesseract', '--version']) - if ret: - return False - version = stdout.split()[1] - return int(version.split('.')[0]) >= expected_version def tesseract_ocr(image_path, tesseract_args=''): console_logger = logging.getLogger('console') console_logger.debug(image_path) - (stdout, stderr, ret) = run_cmd(['tesseract', image_path, - 'stdout']) - if ret: + proc = run(['tesseract', image_path, 'stdout'], + capture_output=True, encoding='utf8') + if proc.returncode: return None lines = [] - for line in stdout.split('\n'): + for line in proc.stdout.split('\n'): sline = line.strip() if len(sline): console_logger.debug(sline) diff --git a/tests/functional/qemu_test/testcase.py b/tests/functional/qemu_test/testcase.py index 90ae59eb54..869f3949fe 100644 --- a/tests/functional/qemu_test/testcase.py +++ b/tests/functional/qemu_test/testcase.py @@ -13,19 +13,22 @@ import logging import os +from pathlib import Path import pycotap import shutil -import subprocess +from subprocess import run import sys +import tempfile import unittest import uuid from qemu.machine import QEMUMachine from qemu.utils import kvm_available, tcg_available +from .archive import archive_extract from .asset import Asset -from .cmd import run_cmd from .config import BUILD_DIR +from .uncompress import uncompress class QemuBaseTest(unittest.TestCase): @@ -37,17 +40,169 @@ class QemuBaseTest(unittest.TestCase): log = None logdir = None + ''' + @params compressed: filename, Asset, or file-like object to uncompress + @params format: optional compression format (gzip, lzma) + + Uncompresses @compressed into the scratch directory. + + If @format is None, heuristics will be applied to guess the format + from the filename or Asset URL. @format must be non-None if @uncompressed + is a file-like object. + + Returns the fully qualified path to the uncompressed file + ''' + def uncompress(self, compressed, format=None): + self.log.debug(f"Uncompress {compressed} format={format}") + if type(compressed) == Asset: + compressed.fetch() + + (name, ext) = os.path.splitext(str(compressed)) + uncompressed = self.scratch_file(os.path.basename(name)) + + uncompress(compressed, uncompressed, format) + + return uncompressed + + ''' + @params archive: filename, Asset, or file-like object to extract + @params format: optional archive format (tar, zip, deb, cpio) + @params sub_dir: optional sub-directory to extract into + @params member: optional member file to limit extraction to + + Extracts @archive into the scratch directory, or a directory beneath + named by @sub_dir. All files are extracted unless @member specifies + a limit. + + If @format is None, heuristics will be applied to guess the format + from the filename or Asset URL. @format must be non-None if @archive + is a file-like object. + + If @member is non-None, returns the fully qualified path to @member + ''' + def archive_extract(self, archive, format=None, sub_dir=None, member=None): + self.log.debug(f"Extract {archive} format={format}" + + f"sub_dir={sub_dir} member={member}") + if type(archive) == Asset: + archive.fetch() + if sub_dir is None: + archive_extract(archive, self.scratch_file(), format, member) + else: + archive_extract(archive, self.scratch_file(sub_dir), + format, member) + + if member is not None: + return self.scratch_file(member) + return None + + ''' + Create a temporary directory suitable for storing UNIX + socket paths. + + Returns: a tempfile.TemporaryDirectory instance + ''' + def socket_dir(self): + if self.socketdir is None: + self.socketdir = tempfile.TemporaryDirectory( + prefix="qemu_func_test_sock_") + return self.socketdir + + ''' + @params args list of zero or more subdirectories or file + + Construct a path for accessing a data file located + relative to the source directory that is the root for + functional tests. + + @args may be an empty list to reference the root dir + itself, may be a single element to reference a file in + the root directory, or may be multiple elements to + reference a file nested below. The path components + will be joined using the platform appropriate path + separator. + + Returns: string representing a file path + ''' + def data_file(self, *args): + return str(Path(Path(__file__).parent.parent, *args)) + + ''' + @params args list of zero or more subdirectories or file + + Construct a path for accessing a data file located + relative to the build directory root. + + @args may be an empty list to reference the build dir + itself, may be a single element to reference a file in + the build directory, or may be multiple elements to + reference a file nested below. The path components + will be joined using the platform appropriate path + separator. + + Returns: string representing a file path + ''' + def build_file(self, *args): + return str(Path(BUILD_DIR, *args)) + + ''' + @params args list of zero or more subdirectories or file + + Construct a path for accessing/creating a scratch file + located relative to a temporary directory dedicated to + this test case. The directory and its contents will be + purged upon completion of the test. + + @args may be an empty list to reference the scratch dir + itself, may be a single element to reference a file in + the scratch directory, or may be multiple elements to + reference a file nested below. The path components + will be joined using the platform appropriate path + separator. + + Returns: string representing a file path + ''' + def scratch_file(self, *args): + return str(Path(self.workdir, *args)) + + ''' + @params args list of zero or more subdirectories or file + + Construct a path for accessing/creating a log file + located relative to a temporary directory dedicated to + this test case. The directory and its log files will be + preserved upon completion of the test. + + @args may be an empty list to reference the log dir + itself, may be a single element to reference a file in + the log directory, or may be multiple elements to + reference a file nested below. The path components + will be joined using the platform appropriate path + separator. + + Returns: string representing a file path + ''' + def log_file(self, *args): + return str(Path(self.outputdir, *args)) + + def assets_available(self): + for name, asset in vars(self.__class__).items(): + if name.startswith("ASSET_") and type(asset) == Asset: + if not asset.available(): + self.log.debug(f"Asset {asset.url} not available") + return False + return True + def setUp(self, bin_prefix): self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set') self.arch = self.qemu_bin.split('-')[-1] + self.socketdir = None - self.outputdir = os.path.join(BUILD_DIR, 'tests', 'functional', - self.arch, self.id()) + self.outputdir = self.build_file('tests', 'functional', + self.arch, self.id()) self.workdir = os.path.join(self.outputdir, 'scratch') os.makedirs(self.workdir, exist_ok=True) - self.logdir = self.outputdir - self.log_filename = os.path.join(self.logdir, 'base.log') + self.log_filename = self.log_file('base.log') self.log = logging.getLogger('qemu-test') self.log.setLevel(logging.DEBUG) self._log_fh = logging.FileHandler(self.log_filename, mode='w') @@ -62,9 +217,15 @@ class QemuBaseTest(unittest.TestCase): self.machinelog.setLevel(logging.DEBUG) self.machinelog.addHandler(self._log_fh) + if not self.assets_available(): + self.skipTest('One or more assets is not available') + def tearDown(self): if "QEMU_TEST_KEEP_SCRATCH" not in os.environ: shutil.rmtree(self.workdir) + if self.socketdir is not None: + shutil.rmtree(self.socketdir.name) + self.socketdir = None self.machinelog.removeHandler(self._log_fh) self.log.removeHandler(self._log_fh) @@ -100,11 +261,11 @@ class QemuUserTest(QemuBaseTest): self._ldpath.append(os.path.abspath(ldpath)) def run_cmd(self, bin_path, args=[]): - return subprocess.run([self.qemu_bin] - + ["-L %s" % ldpath for ldpath in self._ldpath] - + [bin_path] - + args, - text=True, capture_output=True) + return run([self.qemu_bin] + + ["-L %s" % ldpath for ldpath in self._ldpath] + + [bin_path] + + args, + text=True, capture_output=True) class QemuSystemTest(QemuBaseTest): """Facilitates system emulation tests.""" @@ -120,7 +281,7 @@ class QemuSystemTest(QemuBaseTest): console_log = logging.getLogger('console') console_log.setLevel(logging.DEBUG) - self.console_log_name = os.path.join(self.logdir, 'console.log') + self.console_log_name = self.log_file('console.log') self._console_log_fh = logging.FileHandler(self.console_log_name, mode='w') self._console_log_fh.setLevel(logging.DEBUG) @@ -131,7 +292,9 @@ class QemuSystemTest(QemuBaseTest): def set_machine(self, machinename): # TODO: We should use QMP to get the list of available machines if not self._machinehelp: - self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0]; + self._machinehelp = run( + [self.qemu_bin, '-M', 'help'], + capture_output=True, check=True, encoding='utf8').stdout if self._machinehelp.find(machinename) < 0: self.skipTest('no support for machine ' + machinename) self.machine = machinename @@ -159,22 +322,24 @@ class QemuSystemTest(QemuBaseTest): "available" % accelerator) def require_netdev(self, netdevname): - netdevhelp = run_cmd([self.qemu_bin, - '-M', 'none', '-netdev', 'help'])[0]; - if netdevhelp.find('\n' + netdevname + '\n') < 0: + help = run([self.qemu_bin, + '-M', 'none', '-netdev', 'help'], + capture_output=True, check=True, encoding='utf8').stdout; + if help.find('\n' + netdevname + '\n') < 0: self.skipTest('no support for " + netdevname + " networking') def require_device(self, devicename): - devhelp = run_cmd([self.qemu_bin, - '-M', 'none', '-device', 'help'])[0]; - if devhelp.find(devicename) < 0: + help = run([self.qemu_bin, + '-M', 'none', '-device', 'help'], + capture_output=True, check=True, encoding='utf8').stdout; + if help.find(devicename) < 0: self.skipTest('no support for device ' + devicename) def _new_vm(self, name, *args): vm = QEMUMachine(self.qemu_bin, name=name, base_temp_dir=self.workdir, - log_dir=self.logdir) + log_dir=self.log_file()) self.log.debug('QEMUMachine "%s" created', name) self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir) diff --git a/tests/functional/qemu_test/tuxruntest.py b/tests/functional/qemu_test/tuxruntest.py index ab3b27da43..7227a83757 100644 --- a/tests/functional/qemu_test/tuxruntest.py +++ b/tests/functional/qemu_test/tuxruntest.py @@ -11,12 +11,12 @@ import os import stat -import time +from subprocess import check_call, DEVNULL from qemu_test import QemuSystemTest -from qemu_test import exec_command, exec_command_and_wait_for_pattern +from qemu_test import exec_command_and_wait_for_pattern from qemu_test import wait_for_console_pattern -from qemu_test import has_cmd, run_cmd, get_qemu_img +from qemu_test import which, get_qemu_img class TuxRunBaselineTest(QemuSystemTest): @@ -39,10 +39,8 @@ class TuxRunBaselineTest(QemuSystemTest): super().setUp() # We need zstd for all the tuxrun tests - (has_zstd, msg) = has_cmd('zstd') - if has_zstd is False: - self.skipTest(msg) - self.zstd = 'zstd' + if which('zstd') is None: + self.skipTest("zstd not found in $PATH") # Pre-init TuxRun specific settings: Most machines work with # reasonable defaults but we sometimes need to tweak the @@ -77,10 +75,11 @@ class TuxRunBaselineTest(QemuSystemTest): kernel_image = kernel_asset.fetch() disk_image_zst = rootfs_asset.fetch() - disk_image = self.workdir + "/rootfs.ext4" + disk_image = self.scratch_file("rootfs.ext4") - run_cmd([self.zstd, "-f", "-d", disk_image_zst, - "-o", disk_image]) + check_call(['zstd', "-f", "-d", disk_image_zst, + "-o", disk_image], + stdout=DEVNULL, stderr=DEVNULL) # zstd copies source archive permissions for the output # file, so must make this writable for QEMU os.chmod(disk_image, stat.S_IRUSR | stat.S_IWUSR) diff --git a/tests/functional/qemu_test/uncompress.py b/tests/functional/qemu_test/uncompress.py new file mode 100644 index 0000000000..6d02ded066 --- /dev/null +++ b/tests/functional/qemu_test/uncompress.py @@ -0,0 +1,83 @@ +# SPDX-License-Identifier: GPL-2.0-or-later +# +# Utilities for python-based QEMU tests +# +# Copyright 2024 Red Hat, Inc. +# +# Authors: +# Thomas Huth <thuth@redhat.com> + +import gzip +import lzma +import os +import shutil +from urllib.parse import urlparse + +from .asset import Asset + + +def gzip_uncompress(gz_path, output_path): + if os.path.exists(output_path): + return + with gzip.open(gz_path, 'rb') as gz_in: + try: + with open(output_path, 'wb') as raw_out: + shutil.copyfileobj(gz_in, raw_out) + except: + os.remove(output_path) + raise + +def lzma_uncompress(xz_path, output_path): + if os.path.exists(output_path): + return + with lzma.open(xz_path, 'rb') as lzma_in: + try: + with open(output_path, 'wb') as raw_out: + shutil.copyfileobj(lzma_in, raw_out) + except: + os.remove(output_path) + raise + +''' +@params compressed: filename, Asset, or file-like object to uncompress +@params uncompressed: filename to uncompress into +@params format: optional compression format (gzip, lzma) + +Uncompresses @compressed into @uncompressed + +If @format is None, heuristics will be applied to guess the format +from the filename or Asset URL. @format must be non-None if @uncompressed +is a file-like object. + +Returns the fully qualified path to the uncompessed file +''' +def uncompress(compressed, uncompressed, format=None): + if format is None: + format = guess_uncompress_format(compressed) + + if format == "xz": + lzma_uncompress(str(compressed), uncompressed) + elif format == "gz": + gzip_uncompress(str(compressed), uncompressed) + else: + raise Exception(f"Unknown compression format {format}") + +''' +@params compressed: filename, Asset, or file-like object to guess + +Guess the format of @compressed, raising an exception if +no format can be determined +''' +def guess_uncompress_format(compressed): + if type(compressed) == Asset: + compressed = urlparse(compressed.url).path + elif type(compressed) != str: + raise Exception(f"Unable to guess compression cformat for {compressed}") + + (name, ext) = os.path.splitext(compressed) + if ext == ".xz": + return "xz" + elif ext == ".gz": + return "gz" + else: + raise Exception(f"Unknown compression format for {compressed}") diff --git a/tests/functional/qemu_test/utils.py b/tests/functional/qemu_test/utils.py index 1bf1c410d5..e7c8de8165 100644 --- a/tests/functional/qemu_test/utils.py +++ b/tests/functional/qemu_test/utils.py @@ -8,12 +8,14 @@ # This work is licensed under the terms of the GNU GPL, version 2 or # later. See the COPYING file in the top-level directory. -import gzip -import lzma import os -import shutil -import subprocess -import tarfile + +from qemu.utils import get_info_usernet_hostfwd_port + + +def get_usernet_hostfwd_port(vm): + res = vm.cmd('human-monitor-command', command_line='info usernet') + return get_info_usernet_hostfwd_port(res) """ Round up to next power of 2 @@ -35,43 +37,3 @@ def image_pow2ceil_expand(path): if size != size_aligned: with open(path, 'ab+') as fd: fd.truncate(size_aligned) - -def archive_extract(archive, dest_dir, member=None): - with tarfile.open(archive) as tf: - if hasattr(tarfile, 'data_filter'): - tf.extraction_filter = getattr(tarfile, 'data_filter', - (lambda member, path: member)) - if member: - tf.extract(member=member, path=dest_dir) - else: - tf.extractall(path=dest_dir) - -def gzip_uncompress(gz_path, output_path): - if os.path.exists(output_path): - return - with gzip.open(gz_path, 'rb') as gz_in: - try: - with open(output_path, 'wb') as raw_out: - shutil.copyfileobj(gz_in, raw_out) - except: - os.remove(output_path) - raise - -def lzma_uncompress(xz_path, output_path): - if os.path.exists(output_path): - return - with lzma.open(xz_path, 'rb') as lzma_in: - try: - with open(output_path, 'wb') as raw_out: - shutil.copyfileobj(lzma_in, raw_out) - except: - os.remove(output_path) - raise - -def cpio_extract(cpio_handle, output_path): - cwd = os.getcwd() - os.chdir(output_path) - subprocess.run(['cpio', '-i'], - input=cpio_handle.read(), - stderr=subprocess.DEVNULL) - os.chdir(cwd) |