summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--arch_init.c77
-rw-r--r--include/migration/migration.h4
-rw-r--r--migration/migration.c19
3 files changed, 100 insertions, 0 deletions
diff --git a/arch_init.c b/arch_init.c
index 179c58c852..874948195a 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -24,6 +24,7 @@
 #include <stdint.h>
 #include <stdarg.h>
 #include <stdlib.h>
+#include <zlib.h>
 #ifndef _WIN32
 #include <sys/types.h>
 #include <sys/mman.h>
@@ -127,6 +128,7 @@ static uint64_t bitmap_sync_count;
 #define RAM_SAVE_FLAG_CONTINUE 0x20
 #define RAM_SAVE_FLAG_XBZRLE   0x40
 /* 0x80 is reserved in migration.h start with 0x100 next */
+#define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
 
 static struct defconfig_file {
     const char *filename;
@@ -321,9 +323,18 @@ struct CompressParam {
 };
 typedef struct CompressParam CompressParam;
 
+struct DecompressParam {
+    /* To be done */
+};
+typedef struct DecompressParam DecompressParam;
+
 static CompressParam *comp_param;
 static QemuThread *compress_threads;
 static bool quit_comp_thread;
+static bool quit_decomp_thread;
+static DecompressParam *decomp_param;
+static QemuThread *decompress_threads;
+static uint8_t *compressed_data_buf;
 
 static void *do_data_compress(void *opaque)
 {
@@ -1203,10 +1214,59 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
     }
 }
 
+static void *do_data_decompress(void *opaque)
+{
+    while (!quit_decomp_thread) {
+        /* To be done */
+    }
+
+    return NULL;
+}
+
+void migrate_decompress_threads_create(void)
+{
+    int i, thread_count;
+
+    thread_count = migrate_decompress_threads();
+    decompress_threads = g_new0(QemuThread, thread_count);
+    decomp_param = g_new0(DecompressParam, thread_count);
+    compressed_data_buf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
+    quit_decomp_thread = false;
+    for (i = 0; i < thread_count; i++) {
+        qemu_thread_create(decompress_threads + i, "decompress",
+                           do_data_decompress, decomp_param + i,
+                           QEMU_THREAD_JOINABLE);
+    }
+}
+
+void migrate_decompress_threads_join(void)
+{
+    int i, thread_count;
+
+    quit_decomp_thread = true;
+    thread_count = migrate_decompress_threads();
+    for (i = 0; i < thread_count; i++) {
+        qemu_thread_join(decompress_threads + i);
+    }
+    g_free(decompress_threads);
+    g_free(decomp_param);
+    g_free(compressed_data_buf);
+    decompress_threads = NULL;
+    decomp_param = NULL;
+    compressed_data_buf = NULL;
+}
+
+static void decompress_data_with_multi_threads(uint8_t *compbuf,
+                                               void *host, int len)
+{
+    /* To be done */
+}
+
 static int ram_load(QEMUFile *f, void *opaque, int version_id)
 {
     int flags = 0, ret = 0;
     static uint64_t seq_iter;
+    int len = 0;
 
     seq_iter++;
 
@@ -1286,6 +1346,23 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
             }
             qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
             break;
+        case RAM_SAVE_FLAG_COMPRESS_PAGE:
+            host = host_from_stream_offset(f, addr, flags);
+            if (!host) {
+                error_report("Invalid RAM offset " RAM_ADDR_FMT, addr);
+                ret = -EINVAL;
+                break;
+            }
+
+            len = qemu_get_be32(f);
+            if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) {
+                error_report("Invalid compressed data length: %d", len);
+                ret = -EINVAL;
+                break;
+            }
+            qemu_get_buffer(f, compressed_data_buf, len);
+            decompress_data_with_multi_threads(compressed_data_buf, host, len);
+            break;
         case RAM_SAVE_FLAG_XBZRLE:
             host = host_from_stream_offset(f, addr, flags);
             if (!host) {
diff --git a/include/migration/migration.h b/include/migration/migration.h
index a3ebbf6c6e..d4a10627cd 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -51,6 +51,7 @@ struct MigrationState
     QEMUBH *cleanup_bh;
     QEMUFile *file;
     int compress_thread_count;
+    int decompress_thread_count;
     int compress_level;
 
     int state;
@@ -108,6 +109,8 @@ MigrationState *migrate_get_current(void);
 
 void migrate_compress_threads_create(void);
 void migrate_compress_threads_join(void);
+void migrate_decompress_threads_create(void);
+void migrate_decompress_threads_join(void);
 uint64_t ram_bytes_remaining(void);
 uint64_t ram_bytes_transferred(void);
 uint64_t ram_bytes_total(void);
@@ -159,6 +162,7 @@ int64_t xbzrle_cache_resize(int64_t new_size);
 bool migrate_use_compression(void);
 int migrate_compress_level(void);
 int migrate_compress_threads(void);
+int migrate_decompress_threads(void);
 
 void ram_control_before_iterate(QEMUFile *f, uint64_t flags);
 void ram_control_after_iterate(QEMUFile *f, uint64_t flags);
diff --git a/migration/migration.c b/migration/migration.c
index 5a8b5a7c74..19409e6790 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -35,6 +35,9 @@
 
 /* Default compression thread count */
 #define DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT 8
+/* Default decompression thread count, usually decompression is at
+ * least 4 times as fast as compression.*/
+#define DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT 2
 /*0: means nocompress, 1: best speed, ... 9: best compress ratio */
 #define DEFAULT_MIGRATE_COMPRESS_LEVEL 1
 
@@ -58,6 +61,7 @@ MigrationState *migrate_get_current(void)
         .xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE,
         .mbps = -1,
         .compress_thread_count = DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
+        .decompress_thread_count = DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT,
         .compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL,
     };
 
@@ -113,6 +117,7 @@ static void process_incoming_migration_co(void *opaque)
     free_xbzrle_decoded_buf();
     if (ret < 0) {
         error_report("load of migration failed: %s", strerror(-ret));
+        migrate_decompress_threads_join();
         exit(EXIT_FAILURE);
     }
     qemu_announce_self();
@@ -121,6 +126,7 @@ static void process_incoming_migration_co(void *opaque)
     bdrv_invalidate_cache_all(&local_err);
     if (local_err) {
         error_report_err(local_err);
+        migrate_decompress_threads_join();
         exit(EXIT_FAILURE);
     }
 
@@ -129,6 +135,7 @@ static void process_incoming_migration_co(void *opaque)
     } else {
         runstate_set(RUN_STATE_PAUSED);
     }
+    migrate_decompress_threads_join();
 }
 
 void process_incoming_migration(QEMUFile *f)
@@ -137,6 +144,7 @@ void process_incoming_migration(QEMUFile *f)
     int fd = qemu_get_fd(f);
 
     assert(fd != -1);
+    migrate_decompress_threads_create();
     qemu_set_nonblock(fd);
     qemu_coroutine_enter(co, f);
 }
@@ -400,6 +408,7 @@ static MigrationState *migrate_init(const MigrationParams *params)
     int64_t xbzrle_cache_size = s->xbzrle_cache_size;
     int compress_level = s->compress_level;
     int compress_thread_count = s->compress_thread_count;
+    int decompress_thread_count = s->decompress_thread_count;
 
     memcpy(enabled_capabilities, s->enabled_capabilities,
            sizeof(enabled_capabilities));
@@ -412,6 +421,7 @@ static MigrationState *migrate_init(const MigrationParams *params)
 
     s->compress_level = compress_level;
     s->compress_thread_count = compress_thread_count;
+    s->decompress_thread_count = decompress_thread_count;
     s->bandwidth_limit = bandwidth_limit;
     s->state = MIGRATION_STATUS_SETUP;
     trace_migrate_set_state(MIGRATION_STATUS_SETUP);
@@ -623,6 +633,15 @@ int migrate_compress_threads(void)
     return s->compress_thread_count;
 }
 
+int migrate_decompress_threads(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->decompress_thread_count;
+}
+
 int migrate_use_xbzrle(void)
 {
     MigrationState *s;