summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--migration/qemu-file.c39
-rw-r--r--migration/qemu-file.h6
-rw-r--r--migration/ram.c41
3 files changed, 68 insertions, 18 deletions
diff --git a/migration/qemu-file.c b/migration/qemu-file.c
index bb63c779cc..bafe3a0c0d 100644
--- a/migration/qemu-file.c
+++ b/migration/qemu-file.c
@@ -658,8 +658,32 @@ uint64_t qemu_get_be64(QEMUFile *f)
     return v;
 }
 
-/* Compress size bytes of data start at p with specific compression
- * level and store the compressed data to the buffer of f.
+/* return the size after compression, or negative value on error */
+static int qemu_compress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
+                              const uint8_t *source, size_t source_len)
+{
+    int err;
+
+    err = deflateReset(stream);
+    if (err != Z_OK) {
+        return -1;
+    }
+
+    stream->avail_in = source_len;
+    stream->next_in = (uint8_t *)source;
+    stream->avail_out = dest_len;
+    stream->next_out = dest;
+
+    err = deflate(stream, Z_FINISH);
+    if (err != Z_STREAM_END) {
+        return -1;
+    }
+
+    return stream->next_out - dest;
+}
+
+/* Compress size bytes of data start at p and store the compressed
+ * data to the buffer of f.
  *
  * When f is not writable, return -1 if f has no space to save the
  * compressed data.
@@ -667,9 +691,8 @@ uint64_t qemu_get_be64(QEMUFile *f)
  * do fflush first, if f still has no space to save the compressed
  * data, return -1.
  */
-
-ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size,
-                                  int level)
+ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
+                                  const uint8_t *p, size_t size)
 {
     ssize_t blen = IO_BUF_SIZE - f->buf_index - sizeof(int32_t);
 
@@ -683,8 +706,10 @@ ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size,
             return -1;
         }
     }
-    if (compress2(f->buf + f->buf_index + sizeof(int32_t), (uLongf *)&blen,
-                  (Bytef *)p, size, level) != Z_OK) {
+
+    blen = qemu_compress_data(stream, f->buf + f->buf_index + sizeof(int32_t),
+                              blen, p, size);
+    if (blen < 0) {
         error_report("Compress Failed!");
         return 0;
     }
diff --git a/migration/qemu-file.h b/migration/qemu-file.h
index f4f356ab12..2ccfcfb2a8 100644
--- a/migration/qemu-file.h
+++ b/migration/qemu-file.h
@@ -25,6 +25,8 @@
 #ifndef MIGRATION_QEMU_FILE_H
 #define MIGRATION_QEMU_FILE_H
 
+#include <zlib.h>
+
 /* Read a chunk of data from a file at the given position.  The pos argument
  * can be ignored if the file is only be used for streaming.  The number of
  * bytes actually read should be returned.
@@ -132,8 +134,8 @@ bool qemu_file_is_writable(QEMUFile *f);
 
 size_t qemu_peek_buffer(QEMUFile *f, uint8_t **buf, size_t size, size_t offset);
 size_t qemu_get_buffer_in_place(QEMUFile *f, uint8_t **buf, size_t size);
-ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size,
-                                  int level);
+ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
+                                  const uint8_t *p, size_t size);
 int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src);
 
 /*
diff --git a/migration/ram.c b/migration/ram.c
index 409c847a76..a21514a469 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -269,6 +269,7 @@ struct CompressParam {
     QemuCond cond;
     RAMBlock *block;
     ram_addr_t offset;
+    z_stream stream;
 };
 typedef struct CompressParam CompressParam;
 
@@ -299,7 +300,7 @@ static QemuThread *decompress_threads;
 static QemuMutex decomp_done_lock;
 static QemuCond decomp_done_cond;
 
-static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
+static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
                                 ram_addr_t offset);
 
 static void *do_data_compress(void *opaque)
@@ -316,7 +317,7 @@ static void *do_data_compress(void *opaque)
             param->block = NULL;
             qemu_mutex_unlock(&param->mutex);
 
-            do_compress_ram_page(param->file, block, offset);
+            do_compress_ram_page(param->file, &param->stream, block, offset);
 
             qemu_mutex_lock(&comp_done_lock);
             param->done = true;
@@ -357,10 +358,19 @@ static void compress_threads_save_cleanup(void)
     terminate_compression_threads();
     thread_count = migrate_compress_threads();
     for (i = 0; i < thread_count; i++) {
+        /*
+         * we use it as a indicator which shows if the thread is
+         * properly init'd or not
+         */
+        if (!comp_param[i].file) {
+            break;
+        }
         qemu_thread_join(compress_threads + i);
-        qemu_fclose(comp_param[i].file);
         qemu_mutex_destroy(&comp_param[i].mutex);
         qemu_cond_destroy(&comp_param[i].cond);
+        deflateEnd(&comp_param[i].stream);
+        qemu_fclose(comp_param[i].file);
+        comp_param[i].file = NULL;
     }
     qemu_mutex_destroy(&comp_done_lock);
     qemu_cond_destroy(&comp_done_cond);
@@ -370,12 +380,12 @@ static void compress_threads_save_cleanup(void)
     comp_param = NULL;
 }
 
-static void compress_threads_save_setup(void)
+static int compress_threads_save_setup(void)
 {
     int i, thread_count;
 
     if (!migrate_use_compression()) {
-        return;
+        return 0;
     }
     thread_count = migrate_compress_threads();
     compress_threads = g_new0(QemuThread, thread_count);
@@ -383,6 +393,11 @@ static void compress_threads_save_setup(void)
     qemu_cond_init(&comp_done_cond);
     qemu_mutex_init(&comp_done_lock);
     for (i = 0; i < thread_count; i++) {
+        if (deflateInit(&comp_param[i].stream,
+                        migrate_compress_level()) != Z_OK) {
+            goto exit;
+        }
+
         /* comp_param[i].file is just used as a dummy buffer to save data,
          * set its ops to empty.
          */
@@ -395,6 +410,11 @@ static void compress_threads_save_setup(void)
                            do_data_compress, comp_param + i,
                            QEMU_THREAD_JOINABLE);
     }
+    return 0;
+
+exit:
+    compress_threads_save_cleanup();
+    return -1;
 }
 
 /* Multiple fd's */
@@ -1031,7 +1051,7 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
     return pages;
 }
 
-static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
+static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
                                 ram_addr_t offset)
 {
     RAMState *rs = ram_state;
@@ -1040,8 +1060,7 @@ static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
 
     bytes_sent = save_page_header(rs, f, block, offset |
                                   RAM_SAVE_FLAG_COMPRESS_PAGE);
-    blen = qemu_put_compression_data(f, p, TARGET_PAGE_SIZE,
-                                     migrate_compress_level());
+    blen = qemu_put_compression_data(f, stream, p, TARGET_PAGE_SIZE);
     if (blen < 0) {
         bytes_sent = 0;
         qemu_file_set_error(migrate_get_current()->to_dst_file, blen);
@@ -2214,9 +2233,14 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
     RAMState **rsp = opaque;
     RAMBlock *block;
 
+    if (compress_threads_save_setup()) {
+        return -1;
+    }
+
     /* migration has already setup the bitmap, reuse it. */
     if (!migration_in_colo_state()) {
         if (ram_init_all(rsp) != 0) {
+            compress_threads_save_cleanup();
             return -1;
         }
     }
@@ -2236,7 +2260,6 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
     }
 
     rcu_read_unlock();
-    compress_threads_save_setup();
 
     ram_control_before_iterate(f, RAM_CONTROL_SETUP);
     ram_control_after_iterate(f, RAM_CONTROL_SETUP);