summary refs log tree commit diff stats
path: root/tests/functional/qemu_test/uncompress.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/functional/qemu_test/uncompress.py')
-rw-r--r--tests/functional/qemu_test/uncompress.py83
1 files changed, 83 insertions, 0 deletions
diff --git a/tests/functional/qemu_test/uncompress.py b/tests/functional/qemu_test/uncompress.py
new file mode 100644
index 0000000000..6d02ded066
--- /dev/null
+++ b/tests/functional/qemu_test/uncompress.py
@@ -0,0 +1,83 @@
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+# Utilities for python-based QEMU tests
+#
+# Copyright 2024 Red Hat, Inc.
+#
+# Authors:
+#  Thomas Huth <thuth@redhat.com>
+
+import gzip
+import lzma
+import os
+import shutil
+from urllib.parse import urlparse
+
+from .asset import Asset
+
+
+def gzip_uncompress(gz_path, output_path):
+    if os.path.exists(output_path):
+        return
+    with gzip.open(gz_path, 'rb') as gz_in:
+        try:
+            with open(output_path, 'wb') as raw_out:
+                shutil.copyfileobj(gz_in, raw_out)
+        except:
+            os.remove(output_path)
+            raise
+
+def lzma_uncompress(xz_path, output_path):
+    if os.path.exists(output_path):
+        return
+    with lzma.open(xz_path, 'rb') as lzma_in:
+        try:
+            with open(output_path, 'wb') as raw_out:
+                shutil.copyfileobj(lzma_in, raw_out)
+        except:
+            os.remove(output_path)
+            raise
+
+'''
+@params compressed: filename, Asset, or file-like object to uncompress
+@params uncompressed: filename to uncompress into
+@params format: optional compression format (gzip, lzma)
+
+Uncompresses @compressed into @uncompressed
+
+If @format is None, heuristics will be applied to guess the format
+from the filename or Asset URL. @format must be non-None if @uncompressed
+is a file-like object.
+
+Returns the fully qualified path to the uncompessed file
+'''
+def uncompress(compressed, uncompressed, format=None):
+    if format is None:
+        format = guess_uncompress_format(compressed)
+
+    if format == "xz":
+        lzma_uncompress(str(compressed), uncompressed)
+    elif format == "gz":
+        gzip_uncompress(str(compressed), uncompressed)
+    else:
+        raise Exception(f"Unknown compression format {format}")
+
+'''
+@params compressed: filename, Asset, or file-like object to guess
+
+Guess the format of @compressed, raising an exception if
+no format can be determined
+'''
+def guess_uncompress_format(compressed):
+    if type(compressed) == Asset:
+        compressed = urlparse(compressed.url).path
+    elif type(compressed) != str:
+        raise Exception(f"Unable to guess compression cformat for {compressed}")
+
+    (name, ext) = os.path.splitext(compressed)
+    if ext == ".xz":
+        return "xz"
+    elif ext == ".gz":
+        return "gz"
+    else:
+        raise Exception(f"Unknown compression format for {compressed}")