summary refs log tree commit diff stats
path: root/tests/functional/qemu_test/archive.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/functional/qemu_test/archive.py')
-rw-r--r--tests/functional/qemu_test/archive.py117
1 files changed, 117 insertions, 0 deletions
diff --git a/tests/functional/qemu_test/archive.py b/tests/functional/qemu_test/archive.py
new file mode 100644
index 0000000000..c803fdaf6d
--- /dev/null
+++ b/tests/functional/qemu_test/archive.py
@@ -0,0 +1,117 @@
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+# Utilities for python-based QEMU tests
+#
+# Copyright 2024 Red Hat, Inc.
+#
+# Authors:
+#  Thomas Huth <thuth@redhat.com>
+
+import os
+from subprocess import check_call, run, DEVNULL
+import tarfile
+from urllib.parse import urlparse
+import zipfile
+
+from .asset import Asset
+
+
+def tar_extract(archive, dest_dir, member=None):
+    with tarfile.open(archive) as tf:
+        if hasattr(tarfile, 'data_filter'):
+            tf.extraction_filter = getattr(tarfile, 'data_filter',
+                                           (lambda member, path: member))
+        if member:
+            tf.extract(member=member, path=dest_dir)
+        else:
+            tf.extractall(path=dest_dir)
+
+def cpio_extract(archive, output_path):
+    cwd = os.getcwd()
+    os.chdir(output_path)
+    # Not passing 'check=True' as cpio exits with non-zero
+    # status if the archive contains any device nodes :-(
+    if type(archive) == str:
+        run(['cpio', '-i', '-F', archive],
+            stdout=DEVNULL, stderr=DEVNULL)
+    else:
+        run(['cpio', '-i'],
+            input=archive.read(),
+            stdout=DEVNULL, stderr=DEVNULL)
+    os.chdir(cwd)
+
+def zip_extract(archive, dest_dir, member=None):
+    with zipfile.ZipFile(archive, 'r') as zf:
+        if member:
+            zf.extract(member=member, path=dest_dir)
+        else:
+            zf.extractall(path=dest_dir)
+
+def deb_extract(archive, dest_dir, member=None):
+    cwd = os.getcwd()
+    os.chdir(dest_dir)
+    try:
+        proc = run(['ar', 't', archive],
+                   check=True, capture_output=True, encoding='utf8')
+        file_path = proc.stdout.split()[2]
+        check_call(['ar', 'x', archive, file_path],
+                   stdout=DEVNULL, stderr=DEVNULL)
+        tar_extract(file_path, dest_dir, member)
+    finally:
+        os.chdir(cwd)
+
+'''
+@params archive: filename, Asset, or file-like object to extract
+@params dest_dir: target directory to extract into
+@params member: optional member file to limit extraction to
+
+Extracts @archive into @dest_dir. All files are extracted
+unless @member specifies a limit.
+
+If @format is None, heuristics will be applied to guess the format
+from the filename or Asset URL. @format must be non-None if @archive
+is a file-like object.
+'''
+def archive_extract(archive, dest_dir, format=None, member=None):
+    if format is None:
+        format = guess_archive_format(archive)
+    if type(archive) == Asset:
+        archive = str(archive)
+
+    if format == "tar":
+        tar_extract(archive, dest_dir, member)
+    elif format == "zip":
+        zip_extract(archive, dest_dir, member)
+    elif format == "cpio":
+        if member is not None:
+            raise Exception("Unable to filter cpio extraction")
+        cpio_extract(archive, dest_dir)
+    elif format == "deb":
+        if type(archive) != str:
+            raise Exception("Unable to use file-like object with deb archives")
+        deb_extract(archive, dest_dir, "./" + member)
+    else:
+        raise Exception(f"Unknown archive format {format}")
+
+'''
+@params archive: filename, or Asset to guess
+
+Guess the format of @compressed, raising an exception if
+no format can be determined
+'''
+def guess_archive_format(archive):
+    if type(archive) == Asset:
+        archive = urlparse(archive.url).path
+    elif type(archive) != str:
+        raise Exception(f"Unable to guess archive format for {archive}")
+
+    if ".tar." in archive or archive.endswith("tgz"):
+        return "tar"
+    elif archive.endswith(".zip"):
+        return "zip"
+    elif archive.endswith(".cpio"):
+        return "cpio"
+    elif archive.endswith(".deb") or archive.endswith(".udeb"):
+        return "deb"
+    else:
+        raise Exception(f"Unknown archive format for {archive}")