summary refs log tree commit diff stats
path: root/tests/functional/qemu_test/testcase.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/functional/qemu_test/testcase.py')
-rw-r--r--tests/functional/qemu_test/testcase.py205
1 files changed, 185 insertions, 20 deletions
diff --git a/tests/functional/qemu_test/testcase.py b/tests/functional/qemu_test/testcase.py
index 90ae59eb54..869f3949fe 100644
--- a/tests/functional/qemu_test/testcase.py
+++ b/tests/functional/qemu_test/testcase.py
@@ -13,19 +13,22 @@
 
 import logging
 import os
+from pathlib import Path
 import pycotap
 import shutil
-import subprocess
+from subprocess import run
 import sys
+import tempfile
 import unittest
 import uuid
 
 from qemu.machine import QEMUMachine
 from qemu.utils import kvm_available, tcg_available
 
+from .archive import archive_extract
 from .asset import Asset
-from .cmd import run_cmd
 from .config import BUILD_DIR
+from .uncompress import uncompress
 
 
 class QemuBaseTest(unittest.TestCase):
@@ -37,17 +40,169 @@ class QemuBaseTest(unittest.TestCase):
     log = None
     logdir = None
 
+    '''
+    @params compressed: filename, Asset, or file-like object to uncompress
+    @params format: optional compression format (gzip, lzma)
+
+    Uncompresses @compressed into the scratch directory.
+
+    If @format is None, heuristics will be applied to guess the format
+    from the filename or Asset URL. @format must be non-None if @uncompressed
+    is a file-like object.
+
+    Returns the fully qualified path to the uncompressed file
+    '''
+    def uncompress(self, compressed, format=None):
+        self.log.debug(f"Uncompress {compressed} format={format}")
+        if type(compressed) == Asset:
+            compressed.fetch()
+
+        (name, ext) = os.path.splitext(str(compressed))
+        uncompressed = self.scratch_file(os.path.basename(name))
+
+        uncompress(compressed, uncompressed, format)
+
+        return uncompressed
+
+    '''
+    @params archive: filename, Asset, or file-like object to extract
+    @params format: optional archive format (tar, zip, deb, cpio)
+    @params sub_dir: optional sub-directory to extract into
+    @params member: optional member file to limit extraction to
+
+    Extracts @archive into the scratch directory, or a directory beneath
+    named by @sub_dir. All files are extracted unless @member specifies
+    a limit.
+
+    If @format is None, heuristics will be applied to guess the format
+    from the filename or Asset URL. @format must be non-None if @archive
+    is a file-like object.
+
+    If @member is non-None, returns the fully qualified path to @member
+    '''
+    def archive_extract(self, archive, format=None, sub_dir=None, member=None):
+        self.log.debug(f"Extract {archive} format={format}" +
+                       f"sub_dir={sub_dir} member={member}")
+        if type(archive) == Asset:
+            archive.fetch()
+        if sub_dir is None:
+            archive_extract(archive, self.scratch_file(), format, member)
+        else:
+            archive_extract(archive, self.scratch_file(sub_dir),
+                            format, member)
+
+        if member is not None:
+            return self.scratch_file(member)
+        return None
+
+    '''
+    Create a temporary directory suitable for storing UNIX
+    socket paths.
+
+    Returns: a tempfile.TemporaryDirectory instance
+    '''
+    def socket_dir(self):
+        if self.socketdir is None:
+            self.socketdir = tempfile.TemporaryDirectory(
+                prefix="qemu_func_test_sock_")
+        return self.socketdir
+
+    '''
+    @params args list of zero or more subdirectories or file
+
+    Construct a path for accessing a data file located
+    relative to the source directory that is the root for
+    functional tests.
+
+    @args may be an empty list to reference the root dir
+    itself, may be a single element to reference a file in
+    the root directory, or may be multiple elements to
+    reference a file nested below. The path components
+    will be joined using the platform appropriate path
+    separator.
+
+    Returns: string representing a file path
+    '''
+    def data_file(self, *args):
+        return str(Path(Path(__file__).parent.parent, *args))
+
+    '''
+    @params args list of zero or more subdirectories or file
+
+    Construct a path for accessing a data file located
+    relative to the build directory root.
+
+    @args may be an empty list to reference the build dir
+    itself, may be a single element to reference a file in
+    the build directory, or may be multiple elements to
+    reference a file nested below. The path components
+    will be joined using the platform appropriate path
+    separator.
+
+    Returns: string representing a file path
+    '''
+    def build_file(self, *args):
+        return str(Path(BUILD_DIR, *args))
+
+    '''
+    @params args list of zero or more subdirectories or file
+
+    Construct a path for accessing/creating a scratch file
+    located relative to a temporary directory dedicated to
+    this test case. The directory and its contents will be
+    purged upon completion of the test.
+
+    @args may be an empty list to reference the scratch dir
+    itself, may be a single element to reference a file in
+    the scratch directory, or may be multiple elements to
+    reference a file nested below. The path components
+    will be joined using the platform appropriate path
+    separator.
+
+    Returns: string representing a file path
+    '''
+    def scratch_file(self, *args):
+        return str(Path(self.workdir, *args))
+
+    '''
+    @params args list of zero or more subdirectories or file
+
+    Construct a path for accessing/creating a log file
+    located relative to a temporary directory dedicated to
+    this test case. The directory and its log files will be
+    preserved upon completion of the test.
+
+    @args may be an empty list to reference the log dir
+    itself, may be a single element to reference a file in
+    the log directory, or may be multiple elements to
+    reference a file nested below. The path components
+    will be joined using the platform appropriate path
+    separator.
+
+    Returns: string representing a file path
+    '''
+    def log_file(self, *args):
+        return str(Path(self.outputdir, *args))
+
+    def assets_available(self):
+        for name, asset in vars(self.__class__).items():
+            if name.startswith("ASSET_") and type(asset) == Asset:
+                if not asset.available():
+                    self.log.debug(f"Asset {asset.url} not available")
+                    return False
+        return True
+
     def setUp(self, bin_prefix):
         self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
         self.arch = self.qemu_bin.split('-')[-1]
+        self.socketdir = None
 
-        self.outputdir = os.path.join(BUILD_DIR, 'tests', 'functional',
-                                      self.arch, self.id())
+        self.outputdir = self.build_file('tests', 'functional',
+                                         self.arch, self.id())
         self.workdir = os.path.join(self.outputdir, 'scratch')
         os.makedirs(self.workdir, exist_ok=True)
 
-        self.logdir = self.outputdir
-        self.log_filename = os.path.join(self.logdir, 'base.log')
+        self.log_filename = self.log_file('base.log')
         self.log = logging.getLogger('qemu-test')
         self.log.setLevel(logging.DEBUG)
         self._log_fh = logging.FileHandler(self.log_filename, mode='w')
@@ -62,9 +217,15 @@ class QemuBaseTest(unittest.TestCase):
         self.machinelog.setLevel(logging.DEBUG)
         self.machinelog.addHandler(self._log_fh)
 
+        if not self.assets_available():
+            self.skipTest('One or more assets is not available')
+
     def tearDown(self):
         if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
             shutil.rmtree(self.workdir)
+        if self.socketdir is not None:
+            shutil.rmtree(self.socketdir.name)
+            self.socketdir = None
         self.machinelog.removeHandler(self._log_fh)
         self.log.removeHandler(self._log_fh)
 
@@ -100,11 +261,11 @@ class QemuUserTest(QemuBaseTest):
         self._ldpath.append(os.path.abspath(ldpath))
 
     def run_cmd(self, bin_path, args=[]):
-        return subprocess.run([self.qemu_bin]
-                              + ["-L %s" % ldpath for ldpath in self._ldpath]
-                              + [bin_path]
-                              + args,
-                              text=True, capture_output=True)
+        return run([self.qemu_bin]
+                   + ["-L %s" % ldpath for ldpath in self._ldpath]
+                   + [bin_path]
+                   + args,
+                   text=True, capture_output=True)
 
 class QemuSystemTest(QemuBaseTest):
     """Facilitates system emulation tests."""
@@ -120,7 +281,7 @@ class QemuSystemTest(QemuBaseTest):
 
         console_log = logging.getLogger('console')
         console_log.setLevel(logging.DEBUG)
-        self.console_log_name = os.path.join(self.logdir, 'console.log')
+        self.console_log_name = self.log_file('console.log')
         self._console_log_fh = logging.FileHandler(self.console_log_name,
                                                    mode='w')
         self._console_log_fh.setLevel(logging.DEBUG)
@@ -131,7 +292,9 @@ class QemuSystemTest(QemuBaseTest):
     def set_machine(self, machinename):
         # TODO: We should use QMP to get the list of available machines
         if not self._machinehelp:
-            self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0];
+            self._machinehelp = run(
+                [self.qemu_bin, '-M', 'help'],
+                capture_output=True, check=True, encoding='utf8').stdout
         if self._machinehelp.find(machinename) < 0:
             self.skipTest('no support for machine ' + machinename)
         self.machine = machinename
@@ -159,22 +322,24 @@ class QemuSystemTest(QemuBaseTest):
                           "available" % accelerator)
 
     def require_netdev(self, netdevname):
-        netdevhelp = run_cmd([self.qemu_bin,
-                             '-M', 'none', '-netdev', 'help'])[0];
-        if netdevhelp.find('\n' + netdevname + '\n') < 0:
+        help = run([self.qemu_bin,
+                    '-M', 'none', '-netdev', 'help'],
+                   capture_output=True, check=True, encoding='utf8').stdout;
+        if help.find('\n' + netdevname + '\n') < 0:
             self.skipTest('no support for " + netdevname + " networking')
 
     def require_device(self, devicename):
-        devhelp = run_cmd([self.qemu_bin,
-                           '-M', 'none', '-device', 'help'])[0];
-        if devhelp.find(devicename) < 0:
+        help = run([self.qemu_bin,
+                    '-M', 'none', '-device', 'help'],
+                   capture_output=True, check=True, encoding='utf8').stdout;
+        if help.find(devicename) < 0:
             self.skipTest('no support for device ' + devicename)
 
     def _new_vm(self, name, *args):
         vm = QEMUMachine(self.qemu_bin,
                          name=name,
                          base_temp_dir=self.workdir,
-                         log_dir=self.logdir)
+                         log_dir=self.log_file())
         self.log.debug('QEMUMachine "%s" created', name)
         self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)