Diffstat (limited to 'migration')
-rw-r--r--  migration/block.c                 7
-rw-r--r--  migration/colo.c                 11
-rw-r--r--  migration/migration-hmp-cmds.c   10
-rw-r--r--  migration/migration-stats.c      10
-rw-r--r--  migration/migration-stats.h      16
-rw-r--r--  migration/migration.c            27
-rw-r--r--  migration/multifd.c               3
-rw-r--r--  migration/options.c              36
-rw-r--r--  migration/qemu-file.c            43
-rw-r--r--  migration/qemu-file.h            27
-rw-r--r--  migration/ram-compress.c        112
-rw-r--r--  migration/ram-compress.h         10
-rw-r--r--  migration/ram.c                 150
-rw-r--r--  migration/ram.h                   3
-rw-r--r--  migration/rdma.c                 12
-rw-r--r--  migration/savevm.c               11
-rw-r--r--  migration/vmstate.c               9
17 files changed, 245 insertions, 252 deletions
diff --git a/migration/block.c b/migration/block.c
index b60698d6e2..a15f9bddcb 100644
--- a/migration/block.c
+++ b/migration/block.c
@@ -731,6 +731,9 @@ static int block_save_setup(QEMUFile *f, void *opaque)
     trace_migration_block_save("setup", block_mig_state.submitted,
                                block_mig_state.transferred);
 
+    warn_report("block migration is deprecated;"
+                " use blockdev-mirror with NBD instead");
+
     ret = init_blk_migration(f);
     if (ret < 0) {
         return ret;
@@ -752,7 +755,7 @@ static int block_save_setup(QEMUFile *f, void *opaque)
 static int block_save_iterate(QEMUFile *f, void *opaque)
 {
     int ret;
-    uint64_t last_bytes = qemu_file_transferred_noflush(f);
+    uint64_t last_bytes = qemu_file_transferred(f);
 
     trace_migration_block_save("iterate", block_mig_state.submitted,
                                block_mig_state.transferred);
@@ -804,7 +807,7 @@ static int block_save_iterate(QEMUFile *f, void *opaque)
     }
 
     qemu_put_be64(f, BLK_MIG_FLAG_EOS);
-    uint64_t delta_bytes = qemu_file_transferred_noflush(f) - last_bytes;
+    uint64_t delta_bytes = qemu_file_transferred(f) - last_bytes;
     return (delta_bytes > 0);
 }
 
diff --git a/migration/colo.c b/migration/colo.c
index 72f4f7b37e..4447e34914 100644
--- a/migration/colo.c
+++ b/migration/colo.c
@@ -314,9 +314,7 @@ static void colo_send_message(QEMUFile *f, COLOMessage msg,
         return;
     }
     qemu_put_be32(f, msg);
-    qemu_fflush(f);
-
-    ret = qemu_file_get_error(f);
+    ret = qemu_fflush(f);
     if (ret < 0) {
         error_setg_errno(errp, -ret, "Can't send COLO message");
     }
@@ -335,9 +333,7 @@ static void colo_send_message_value(QEMUFile *f, COLOMessage msg,
         return;
     }
     qemu_put_be64(f, value);
-    qemu_fflush(f);
-
-    ret = qemu_file_get_error(f);
+    ret = qemu_fflush(f);
     if (ret < 0) {
         error_setg_errno(errp, -ret, "Failed to send value for message:%s",
                          COLOMessage_str(msg));
@@ -483,8 +479,7 @@ static int colo_do_checkpoint_transaction(MigrationState *s,
     }
 
     qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage);
-    qemu_fflush(s->to_dst_file);
-    ret = qemu_file_get_error(s->to_dst_file);
+    ret = qemu_fflush(s->to_dst_file);
     if (ret < 0) {
         goto out;
     }
diff --git a/migration/migration-hmp-cmds.c b/migration/migration-hmp-cmds.c
index a82597f18e..dfe98da355 100644
--- a/migration/migration-hmp-cmds.c
+++ b/migration/migration-hmp-cmds.c
@@ -745,6 +745,16 @@ void hmp_migrate(Monitor *mon, const QDict *qdict)
     const char *uri = qdict_get_str(qdict, "uri");
     Error *err = NULL;
 
+    if (inc) {
+        warn_report("option '-i' is deprecated;"
+                    " use blockdev-mirror with NBD instead");
+    }
+
+    if (blk) {
+        warn_report("option '-b' is deprecated;"
+                    " use blockdev-mirror with NBD instead");
+    }
+
     qmp_migrate(uri, !!blk, blk, !!inc, inc,
                 false, false, true, resume, &err);
     if (hmp_handle_error(mon, err)) {
diff --git a/migration/migration-stats.c b/migration/migration-stats.c
index 4cc989d975..f690b98a03 100644
--- a/migration/migration-stats.c
+++ b/migration/migration-stats.c
@@ -30,7 +30,7 @@ bool migration_rate_exceeded(QEMUFile *f)
     }
 
     uint64_t rate_limit_start = stat64_get(&mig_stats.rate_limit_start);
-    uint64_t rate_limit_current = migration_transferred_bytes(f);
+    uint64_t rate_limit_current = migration_transferred_bytes();
     uint64_t rate_limit_used = rate_limit_current - rate_limit_start;
 
     if (rate_limit_max > 0 && rate_limit_used > rate_limit_max) {
@@ -54,16 +54,16 @@ void migration_rate_set(uint64_t limit)
     stat64_set(&mig_stats.rate_limit_max, limit / XFER_LIMIT_RATIO);
 }
 
-void migration_rate_reset(QEMUFile *f)
+void migration_rate_reset(void)
 {
-    stat64_set(&mig_stats.rate_limit_start, migration_transferred_bytes(f));
+    stat64_set(&mig_stats.rate_limit_start, migration_transferred_bytes());
 }
 
-uint64_t migration_transferred_bytes(QEMUFile *f)
+uint64_t migration_transferred_bytes(void)
 {
     uint64_t multifd = stat64_get(&mig_stats.multifd_bytes);
     uint64_t rdma = stat64_get(&mig_stats.rdma_bytes);
-    uint64_t qemu_file = qemu_file_transferred(f);
+    uint64_t qemu_file = stat64_get(&mig_stats.qemu_file_transferred);
 
     trace_migration_transferred_bytes(qemu_file, multifd, rdma);
     return qemu_file + multifd + rdma;
diff --git a/migration/migration-stats.h b/migration/migration-stats.h
index 2358caad63..05290ade76 100644
--- a/migration/migration-stats.h
+++ b/migration/migration-stats.h
@@ -82,6 +82,10 @@ typedef struct {
      */
     Stat64 precopy_bytes;
     /*
+     * Number of bytes transferred with QEMUFile.
+     */
+    Stat64 qemu_file_transferred;
+    /*
      * Amount of transferred data at the start of current cycle.
      */
     Stat64 rate_limit_start;
@@ -94,10 +98,6 @@ typedef struct {
      */
     Stat64 rdma_bytes;
     /*
-     * Total number of bytes transferred.
-     */
-    Stat64 transferred;
-    /*
      * Number of pages transferred that were full of zeros.
      */
     Stat64 zero_pages;
@@ -116,10 +116,8 @@ uint64_t migration_rate_get(void);
  * migration_rate_reset: Reset the rate limit counter.
  *
  * This is called when we know we start a new transfer cycle.
- *
- * @f: QEMUFile used for main migration channel
  */
-void migration_rate_reset(QEMUFile *f);
+void migration_rate_reset(void);
 
 /**
  * migration_rate_set: Set the maximum amount that can be transferred.
@@ -133,11 +131,9 @@ void migration_rate_set(uint64_t new_rate);
 /**
  * migration_transferred_bytes: Return number of bytes transferred
  *
- * @f: QEMUFile used for main migration channel
- *
  * Returns how many bytes have we transferred since the beginning of
  * the migration.  It accounts for bytes sent through any migration
  * channel, multifd, qemu_file, rdma, ....
  */
-uint64_t migration_transferred_bytes(QEMUFile *f);
+uint64_t migration_transferred_bytes(void);
 #endif
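
The QEMUFile argument disappears from migration_transferred_bytes() because
the total is now assembled purely from atomic per-channel counters. A minimal
standalone sketch of that accounting pattern, using C11 atomics as stand-ins
for QEMU's Stat64 (the names below are illustrative, not QEMU's):

    #include <inttypes.h>
    #include <stdatomic.h>
    #include <stdint.h>
    #include <stdio.h>

    /* Stand-ins for the Stat64 counters kept in mig_stats. */
    static _Atomic uint64_t qemu_file_bytes;
    static _Atomic uint64_t multifd_bytes;
    static _Atomic uint64_t rdma_bytes;

    /* Each channel bumps its own counter when it puts bytes on the wire. */
    static void account(_Atomic uint64_t *counter, uint64_t size)
    {
        atomic_fetch_add_explicit(counter, size, memory_order_relaxed);
    }

    /* With per-channel counters the total needs no QEMUFile: just sum them. */
    static uint64_t transferred_bytes(void)
    {
        return atomic_load_explicit(&qemu_file_bytes, memory_order_relaxed) +
               atomic_load_explicit(&multifd_bytes, memory_order_relaxed) +
               atomic_load_explicit(&rdma_bytes, memory_order_relaxed);
    }

    int main(void)
    {
        account(&qemu_file_bytes, 4096);
        account(&multifd_bytes, 8192);
        printf("total transferred: %" PRIu64 " bytes\n", transferred_bytes());
        return 0;
    }
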
diff --git a/migration/migration.c b/migration/migration.c
index 67547eb6a1..6abcbefd9c 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -305,12 +305,7 @@ static int migrate_send_rp_message(MigrationIncomingState *mis,
     qemu_put_be16(mis->to_src_file, (unsigned int)message_type);
     qemu_put_be16(mis->to_src_file, len);
     qemu_put_buffer(mis->to_src_file, data, len);
-    qemu_fflush(mis->to_src_file);
-
-    /* It's possible that qemu file got error during sending */
-    ret = qemu_file_get_error(mis->to_src_file);
-
-    return ret;
+    return qemu_fflush(mis->to_src_file);
 }
 
 /* Request one page from the source VM at the given start address.
@@ -942,7 +937,7 @@ static void populate_ram_info(MigrationInfo *info, MigrationState *s)
     size_t page_size = qemu_target_page_size();
 
     info->ram = g_malloc0(sizeof(*info->ram));
-    info->ram->transferred = stat64_get(&mig_stats.transferred);
+    info->ram->transferred = migration_transferred_bytes();
     info->ram->total = ram_bytes_total();
     info->ram->duplicate = stat64_get(&mig_stats.zero_pages);
     /* legacy value.  It is not used anymore */
@@ -1620,6 +1615,16 @@ static bool migrate_prepare(MigrationState *s, bool blk, bool blk_inc,
 {
     Error *local_err = NULL;
 
+    if (blk_inc) {
+        warn_report("parameter 'inc' is deprecated;"
+                    " use blockdev-mirror with NBD instead");
+    }
+
+    if (blk) {
+        warn_report("parameter 'blk' is deprecated;"
+                    " use blockdev-mirror with NBD instead");
+    }
+
     if (resume) {
         if (s->state != MIGRATION_STATUS_POSTCOPY_PAUSED) {
             error_setg(errp, "Cannot resume if there is no "
@@ -2694,7 +2699,7 @@ static MigThrError migration_detect_error(MigrationState *s)
 
 static void migration_calculate_complete(MigrationState *s)
 {
-    uint64_t bytes = migration_transferred_bytes(s->to_dst_file);
+    uint64_t bytes = migration_transferred_bytes();
     int64_t end_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
     int64_t transfer_time;
 
@@ -2720,7 +2725,7 @@ static void update_iteration_initial_status(MigrationState *s)
      * wrong speed calculation.
      */
     s->iteration_start_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
-    s->iteration_initial_bytes = migration_transferred_bytes(s->to_dst_file);
+    s->iteration_initial_bytes = migration_transferred_bytes();
     s->iteration_initial_pages = ram_get_total_transferred_pages();
 }
 
@@ -2739,7 +2744,7 @@ static void migration_update_counters(MigrationState *s,
     }
 
     switchover_bw = migrate_avail_switchover_bandwidth();
-    current_bytes = migration_transferred_bytes(s->to_dst_file);
+    current_bytes = migration_transferred_bytes();
     transferred = current_bytes - s->iteration_initial_bytes;
     time_spent = current_time - s->iteration_start_time;
     bandwidth = (double)transferred / time_spent;
@@ -2775,7 +2780,7 @@ static void migration_update_counters(MigrationState *s,
             stat64_get(&mig_stats.dirty_bytes_last_sync) / expected_bw_per_ms;
     }
 
-    migration_rate_reset(s->to_dst_file);
+    migration_rate_reset();
 
     update_iteration_initial_status(s);
 
diff --git a/migration/multifd.c b/migration/multifd.c
index e2a45c667a..ec58c58082 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -188,7 +188,6 @@ static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
         return -1;
     }
     stat64_add(&mig_stats.multifd_bytes, size);
-    stat64_add(&mig_stats.transferred, size);
     return 0;
 }
 
@@ -733,8 +732,6 @@ static void *multifd_send_thread(void *opaque)
 
             stat64_add(&mig_stats.multifd_bytes,
                        p->next_packet_size + p->packet_len);
-            stat64_add(&mig_stats.transferred,
-                       p->next_packet_size + p->packet_len);
             p->next_packet_size = 0;
             qemu_mutex_lock(&p->mutex);
             p->pending_job--;
diff --git a/migration/options.c b/migration/options.c
index 42fb818956..9a39826ca5 100644
--- a/migration/options.c
+++ b/migration/options.c
@@ -12,6 +12,7 @@
  */
 
 #include "qemu/osdep.h"
+#include "qemu/error-report.h"
 #include "exec/target_page.h"
 #include "qapi/clone-visitor.h"
 #include "qapi/error.h"
@@ -473,10 +474,19 @@ bool migrate_caps_check(bool *old_caps, bool *new_caps, Error **errp)
     if (new_caps[MIGRATION_CAPABILITY_BLOCK]) {
         error_setg(errp, "QEMU compiled without old-style (blk/-b, inc/-i) "
                    "block migration");
-        error_append_hint(errp, "Use drive_mirror+NBD instead.\n");
+        error_append_hint(errp, "Use blockdev-mirror with NBD instead.\n");
         return false;
     }
 #endif
+    if (new_caps[MIGRATION_CAPABILITY_BLOCK]) {
+        warn_report("block migration is deprecated;"
+                    " use blockdev-mirror with NBD instead");
+    }
+
+    if (new_caps[MIGRATION_CAPABILITY_COMPRESS]) {
+        warn_report("old compression method is deprecated;"
+                    " use multifd compression methods instead");
+    }
 
 #ifndef CONFIG_REPLICATION
     if (new_caps[MIGRATION_CAPABILITY_X_COLO]) {
@@ -618,6 +628,20 @@ bool migrate_caps_check(bool *old_caps, bool *new_caps, Error **errp)
         }
     }
 
+    if (new_caps[MIGRATION_CAPABILITY_MULTIFD]) {
+        if (new_caps[MIGRATION_CAPABILITY_XBZRLE]) {
+            error_setg(errp, "Multifd is not compatible with xbzrle");
+            return false;
+        }
+    }
+
+    if (new_caps[MIGRATION_CAPABILITY_COMPRESS]) {
+        if (new_caps[MIGRATION_CAPABILITY_XBZRLE]) {
+            error_setg(errp, "Compression is not compatible with xbzrle");
+            return false;
+        }
+    }
+
     return true;
 }
 
@@ -1316,18 +1340,26 @@ static void migrate_params_apply(MigrateSetParameters *params, Error **errp)
     /* TODO use QAPI_CLONE() instead of duplicating it inline */
 
     if (params->has_compress_level) {
+        warn_report("old compression is deprecated;"
+                    " use multifd compression methods instead");
         s->parameters.compress_level = params->compress_level;
     }
 
     if (params->has_compress_threads) {
+        warn_report("old compression is deprecated;"
+                    " use multifd compression methods instead");
         s->parameters.compress_threads = params->compress_threads;
     }
 
     if (params->has_compress_wait_thread) {
+        warn_report("old compression is deprecated;"
+                    " use multifd compression methods instead");
         s->parameters.compress_wait_thread = params->compress_wait_thread;
     }
 
     if (params->has_decompress_threads) {
+        warn_report("old compression is deprecated;"
+                    " use multifd compression methods instead");
         s->parameters.decompress_threads = params->decompress_threads;
     }
 
@@ -1386,6 +1418,8 @@ static void migrate_params_apply(MigrateSetParameters *params, Error **errp)
     }
 
     if (params->has_block_incremental) {
+        warn_report("block migration is deprecated;"
+                    " use blockdev-mirror with NBD instead");
         s->parameters.block_incremental = params->block_incremental;
     }
     if (params->has_multifd_channels) {
diff --git a/migration/qemu-file.c b/migration/qemu-file.c
index 3fb25148d1..d64500310d 100644
--- a/migration/qemu-file.c
+++ b/migration/qemu-file.c
@@ -41,9 +41,6 @@ struct QEMUFile {
     QIOChannel *ioc;
     bool is_writable;
 
-    /* The sum of bytes transferred on the wire */
-    uint64_t total_transferred;
-
     int buf_index;
     int buf_size; /* 0 when writing */
     uint8_t buf[IO_BUF_SIZE];
@@ -207,7 +204,7 @@ void qemu_file_set_error_obj(QEMUFile *f, int ret, Error *err)
  */
 int qemu_file_get_error(QEMUFile *f)
 {
-    return qemu_file_get_error_obj(f, NULL);
+    return f->last_error;
 }
 
 /*
@@ -265,14 +262,14 @@ static void qemu_iovec_release_ram(QEMUFile *f)
  * This will flush all pending data. If data was only partially flushed, it
  * will set an error state.
  */
-void qemu_fflush(QEMUFile *f)
+int qemu_fflush(QEMUFile *f)
 {
     if (!qemu_file_is_writable(f)) {
-        return;
+        return f->last_error;
     }
 
-    if (qemu_file_get_error(f)) {
-        return;
+    if (f->last_error) {
+        return f->last_error;
     }
     if (f->iovcnt > 0) {
         Error *local_error = NULL;
@@ -282,7 +279,7 @@ void qemu_fflush(QEMUFile *f)
             qemu_file_set_error_obj(f, -EIO, local_error);
         } else {
             uint64_t size = iov_size(f->iov, f->iovcnt);
-            f->total_transferred += size;
+            stat64_add(&mig_stats.qemu_file_transferred, size);
         }
 
         qemu_iovec_release_ram(f);
@@ -290,6 +287,7 @@ void qemu_fflush(QEMUFile *f)
 
     f->buf_index = 0;
     f->iovcnt = 0;
+    return f->last_error;
 }
 
 /*
@@ -337,7 +335,6 @@ static ssize_t coroutine_mixed_fn qemu_fill_buffer(QEMUFile *f)
 
     if (len > 0) {
         f->buf_size += len;
-        f->total_transferred += len;
     } else if (len == 0) {
         qemu_file_set_error_obj(f, -EIO, local_error);
     } else {
@@ -357,22 +354,12 @@ static ssize_t coroutine_mixed_fn qemu_fill_buffer(QEMUFile *f)
  */
 int qemu_fclose(QEMUFile *f)
 {
-    int ret, ret2;
-    qemu_fflush(f);
-    ret = qemu_file_get_error(f);
-
-    ret2 = qio_channel_close(f->ioc, NULL);
+    int ret = qemu_fflush(f);
+    int ret2 = qio_channel_close(f->ioc, NULL);
     if (ret >= 0) {
         ret = ret2;
     }
     g_clear_pointer(&f->ioc, object_unref);
-
-    /* If any error was spotted before closing, we should report it
-     * instead of the close() return value.
-     */
-    if (f->last_error) {
-        ret = f->last_error;
-    }
     error_free(f->last_error_obj);
     g_free(f);
     trace_qemu_file_fclose();
@@ -622,11 +609,13 @@ int coroutine_mixed_fn qemu_get_byte(QEMUFile *f)
     return result;
 }
 
-uint64_t qemu_file_transferred_noflush(QEMUFile *f)
+uint64_t qemu_file_transferred(QEMUFile *f)
 {
-    uint64_t ret = f->total_transferred;
+    uint64_t ret = stat64_get(&mig_stats.qemu_file_transferred);
     int i;
 
+    g_assert(qemu_file_is_writable(f));
+
     for (i = 0; i < f->iovcnt; i++) {
         ret += f->iov[i].iov_len;
     }
@@ -634,12 +623,6 @@ uint64_t qemu_file_transferred_noflush(QEMUFile *f)
     return ret;
 }
 
-uint64_t qemu_file_transferred(QEMUFile *f)
-{
-    qemu_fflush(f);
-    return f->total_transferred;
-}
-
 void qemu_put_be16(QEMUFile *f, unsigned int v)
 {
     qemu_put_byte(f, v >> 8);
diff --git a/migration/qemu-file.h b/migration/qemu-file.h
index a29c37b0d0..1774116f79 100644
--- a/migration/qemu-file.h
+++ b/migration/qemu-file.h
@@ -36,31 +36,12 @@ int qemu_fclose(QEMUFile *f);
 /*
  * qemu_file_transferred:
  *
- * Report the total number of bytes transferred with
- * this file.
- *
- * For writable files, any pending buffers will be
- * flushed, so the reported value will be equal to
- * the number of bytes transferred on the wire.
- *
- * For readable files, the reported value will be
- * equal to the number of bytes transferred on the
- * wire.
- *
- * Returns: the total bytes transferred
- */
-uint64_t qemu_file_transferred(QEMUFile *f);
-
-/*
- * qemu_file_transferred_noflush:
- *
- * As qemu_file_transferred except for writable files, where no flush
- * is performed and the reported amount will include the size of any
- * queued buffers, on top of the amount actually transferred.
+ * No flush is performed and the reported amount will include the size
+ * of any queued buffers, on top of the amount actually transferred.
  *
  * Returns: the total bytes transferred and queued
  */
-uint64_t qemu_file_transferred_noflush(QEMUFile *f);
+uint64_t qemu_file_transferred(QEMUFile *f);
 
 /*
  * put_buffer without copying the buffer.
@@ -90,7 +71,7 @@ void qemu_file_set_error_obj(QEMUFile *f, int ret, Error *err);
 void qemu_file_set_error(QEMUFile *f, int ret);
 int qemu_file_shutdown(QEMUFile *f);
 QEMUFile *qemu_file_get_return_path(QEMUFile *f);
-void qemu_fflush(QEMUFile *f);
+int qemu_fflush(QEMUFile *f);
 void qemu_file_set_blocking(QEMUFile *f, bool block);
 int qemu_file_get_to_fd(QEMUFile *f, int fd, size_t size);
 
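
Changing qemu_fflush() to return int lets callers collapse the old
"flush, then qemu_file_get_error()" pair into a single call, as the colo.c and
migration.c hunks above show. A simplified standalone sketch of the
sticky-error flush pattern (the File type and helpers are stand-ins, not
QEMU's API):

    #include <errno.h>
    #include <stdio.h>
    #include <string.h>

    /* Simplified stand-in for QEMUFile: an output buffer plus a sticky error. */
    typedef struct {
        FILE *out;
        char buf[256];
        size_t used;
        int last_error;            /* first error seen, 0 if none */
    } File;

    /* Flush buffered data; return the sticky error, as the new qemu_fflush() does. */
    static int file_flush(File *f)
    {
        if (f->last_error) {
            return f->last_error;  /* already broken, nothing more to do */
        }
        if (f->used && fwrite(f->buf, 1, f->used, f->out) != f->used) {
            f->last_error = -EIO;  /* latch the first failure */
        }
        f->used = 0;
        return f->last_error;
    }

    /* Callers now write "return file_flush(f);" instead of flush-then-check. */
    static int send_message(File *f, const char *msg)
    {
        size_t len = strlen(msg);
        if (len > sizeof(f->buf) - f->used) {
            return -ENOSPC;        /* keep the sketch simple: no partial writes */
        }
        memcpy(f->buf + f->used, msg, len);
        f->used += len;
        return file_flush(f);
    }

    int main(void)
    {
        File f = { .out = stdout };
        return send_message(&f, "hello\n") < 0;
    }
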
diff --git a/migration/ram-compress.c b/migration/ram-compress.c
index d037dfe6cf..fa4388f6a6 100644
--- a/migration/ram-compress.c
+++ b/migration/ram-compress.c
@@ -41,7 +41,20 @@
 #include "ram.h"
 #include "migration-stats.h"
 
-CompressionStats compression_counters;
+static struct {
+    int64_t pages;
+    int64_t busy;
+    double busy_rate;
+    int64_t compressed_size;
+    double compression_rate;
+    /* compression statistics since the beginning of the period */
+    /* number of times no free thread was available to compress data */
+    uint64_t compress_thread_busy_prev;
+    /* number of bytes produced after compression */
+    uint64_t compressed_size_prev;
+    /* number of compressed pages */
+    uint64_t compress_pages_prev;
+} compression_counters;
 
 static CompressParam *comp_param;
 static QemuThread *compress_threads;
@@ -228,10 +241,14 @@ static inline void compress_reset_result(CompressParam *param)
     param->offset = 0;
 }
 
-void flush_compressed_data(int (send_queued_data(CompressParam *)))
+void compress_flush_data(void)
 {
     int thread_count = migrate_compress_threads();
 
+    if (!migrate_compress()) {
+        return;
+    }
+
     qemu_mutex_lock(&comp_done_lock);
     for (int i = 0; i < thread_count; i++) {
         while (!comp_param[i].done) {
@@ -244,7 +261,7 @@ void flush_compressed_data(int (send_queued_data(CompressParam *)))
         qemu_mutex_lock(&comp_param[i].mutex);
         if (!comp_param[i].quit) {
             CompressParam *param = &comp_param[i];
-            send_queued_data(param);
+            compress_send_queued_data(param);
             assert(qemu_file_buffer_empty(param->file));
             compress_reset_result(param);
         }
@@ -260,43 +277,47 @@ static inline void set_compress_params(CompressParam *param, RAMBlock *block,
     param->trigger = true;
 }
 
-int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset,
-                                int (send_queued_data(CompressParam *)))
+/*
+ * Return true when it compresses a page
+ */
+bool compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset,
+                                     int (send_queued_data(CompressParam *)))
 {
-    int  thread_count, pages = -1;
+    int thread_count;
     bool wait = migrate_compress_wait_thread();
 
     thread_count = migrate_compress_threads();
     qemu_mutex_lock(&comp_done_lock);
-retry:
-    for (int i = 0; i < thread_count; i++) {
-        if (comp_param[i].done) {
-            CompressParam *param = &comp_param[i];
-            qemu_mutex_lock(&param->mutex);
-            param->done = false;
-            send_queued_data(param);
-            assert(qemu_file_buffer_empty(param->file));
-            compress_reset_result(param);
-            set_compress_params(param, block, offset);
 
-            qemu_cond_signal(&param->cond);
-            qemu_mutex_unlock(&param->mutex);
-            pages = 1;
-            break;
+    while (true) {
+        for (int i = 0; i < thread_count; i++) {
+            if (comp_param[i].done) {
+                CompressParam *param = &comp_param[i];
+                qemu_mutex_lock(&param->mutex);
+                param->done = false;
+                send_queued_data(param);
+                assert(qemu_file_buffer_empty(param->file));
+                compress_reset_result(param);
+                set_compress_params(param, block, offset);
+
+                qemu_cond_signal(&param->cond);
+                qemu_mutex_unlock(&param->mutex);
+                qemu_mutex_unlock(&comp_done_lock);
+                return true;
+            }
         }
-    }
-
-    /*
-     * wait for the free thread if the user specifies 'compress-wait-thread',
-     * otherwise we will post the page out in the main thread as normal page.
-     */
-    if (pages < 0 && wait) {
+        if (!wait) {
+            qemu_mutex_unlock(&comp_done_lock);
+            compression_counters.busy++;
+            return false;
+        }
+        /*
+         * wait for a free thread if the user specifies
+         * 'compress-wait-thread', otherwise we will post the page out
+         * in the main thread as a normal page.
+         */
         qemu_cond_wait(&comp_done_cond, &comp_done_lock);
-        goto retry;
     }
-    qemu_mutex_unlock(&comp_done_lock);
-
-    return pages;
 }
 
 /* return the size after decompression, or negative value on error */
@@ -495,7 +516,7 @@ void populate_compress(MigrationInfo *info)
     info->compression->compression_rate = compression_counters.compression_rate;
 }
 
-uint64_t ram_compressed_pages(void)
+uint64_t compress_ram_pages(void)
 {
     return compression_counters.pages;
 }
@@ -514,3 +535,30 @@ void update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
     compression_counters.pages++;
 }
 
+void compress_update_rates(uint64_t page_count)
+{
+    if (!migrate_compress()) {
+        return;
+    }
+    compression_counters.busy_rate = (double)(compression_counters.busy -
+            compression_counters.compress_thread_busy_prev) / page_count;
+    compression_counters.compress_thread_busy_prev =
+            compression_counters.busy;
+
+    double compressed_size = compression_counters.compressed_size -
+        compression_counters.compressed_size_prev;
+    if (compressed_size) {
+        double uncompressed_size = (compression_counters.pages -
+                                    compression_counters.compress_pages_prev) *
+            qemu_target_page_size();
+
+        /* Compression-Ratio = Uncompressed-size / Compressed-size */
+        compression_counters.compression_rate =
+            uncompressed_size / compressed_size;
+
+        compression_counters.compress_pages_prev =
+            compression_counters.pages;
+        compression_counters.compressed_size_prev =
+            compression_counters.compressed_size;
+    }
+}
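
compress_update_rates() keeps the *_prev snapshots so each call reports rates
for the interval since the previous bitmap sync. A small standalone sketch of
the same arithmetic with made-up sample numbers (4 KiB target pages assumed):

    #include <stdint.h>
    #include <stdio.h>

    #define TARGET_PAGE_SIZE 4096           /* assumption: 4 KiB target pages */

    int main(void)
    {
        /* Counter values as they might look at the end of one sync period. */
        uint64_t busy = 150, busy_prev = 50;        /* "no free thread" events */
        uint64_t pages = 1000, pages_prev = 200;    /* compressed pages so far */
        uint64_t size = 1 << 20, size_prev = 0;     /* bytes after compression */
        uint64_t page_count = 2000;                 /* pages handled this period */

        double busy_rate = (double)(busy - busy_prev) / page_count;
        double uncompressed = (double)(pages - pages_prev) * TARGET_PAGE_SIZE;

        /* Compression-Ratio = Uncompressed-size / Compressed-size */
        double compression_rate = uncompressed / (double)(size - size_prev);

        printf("busy_rate = %.2f, compression_rate = %.2f\n",
               busy_rate, compression_rate);        /* 0.05 and ~3.13 here */
        return 0;
    }
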
diff --git a/migration/ram-compress.h b/migration/ram-compress.h
index e55d3b50bd..0d89a2f55e 100644
--- a/migration/ram-compress.h
+++ b/migration/ram-compress.h
@@ -59,9 +59,8 @@ typedef struct CompressParam CompressParam;
 void compress_threads_save_cleanup(void);
 int compress_threads_save_setup(void);
 
-void flush_compressed_data(int (send_queued_data(CompressParam *)));
-int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset,
-                                int (send_queued_data(CompressParam *)));
+bool compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset,
+                                      int (send_queued_data(CompressParam *)));
 
 int wait_for_decompress_done(void);
 void compress_threads_load_cleanup(void);
@@ -69,7 +68,10 @@ int compress_threads_load_setup(QEMUFile *f);
 void decompress_data_with_multi_threads(QEMUFile *f, void *host, int len);
 
 void populate_compress(MigrationInfo *info);
-uint64_t ram_compressed_pages(void);
+uint64_t compress_ram_pages(void);
 void update_compress_thread_counts(const CompressParam *param, int bytes_xmit);
+void compress_update_rates(uint64_t page_count);
+int compress_send_queued_data(CompressParam *param);
+void compress_flush_data(void);
 
 #endif
diff --git a/migration/ram.c b/migration/ram.c
index 92769902bb..34724e8fe8 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -305,17 +305,15 @@ int64_t ramblock_recv_bitmap_send(QEMUFile *file,
 
     qemu_put_be64(file, size);
     qemu_put_buffer(file, (const uint8_t *)le_bitmap, size);
+    g_free(le_bitmap);
     /*
      * Mark as an end, in case the middle part is screwed up due to
      * some "mysterious" reason.
      */
     qemu_put_be64(file, RAMBLOCK_RECV_BITMAP_ENDING);
-    qemu_fflush(file);
-
-    g_free(le_bitmap);
-
-    if (qemu_file_get_error(file)) {
-        return qemu_file_get_error(file);
+    int ret = qemu_fflush(file);
+    if (ret) {
+        return ret;
     }
 
     return size + sizeof(size);
@@ -369,13 +367,6 @@ struct RAMState {
     bool xbzrle_started;
     /* Are we on the last stage of migration */
     bool last_stage;
-    /* compression statistics since the beginning of the period */
-    /* amount of count that no free thread to compress data */
-    uint64_t compress_thread_busy_prev;
-    /* amount bytes after compression */
-    uint64_t compressed_size_prev;
-    /* amount of compressed pages */
-    uint64_t compress_pages_prev;
 
     /* total handled target pages at the beginning of period */
     uint64_t target_page_count_prev;
@@ -455,7 +446,6 @@ void ram_transferred_add(uint64_t bytes)
     } else {
         stat64_add(&mig_stats.downtime_bytes, bytes);
     }
-    stat64_add(&mig_stats.transferred, bytes);
 }
 
 struct MigrationOps {
@@ -564,7 +554,7 @@ void mig_throttle_counter_reset(void)
 
     rs->time_last_bitmap_sync = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
     rs->num_dirty_pages_period = 0;
-    rs->bytes_xfer_prev = stat64_get(&mig_stats.transferred);
+    rs->bytes_xfer_prev = migration_transferred_bytes();
 }
 
 /**
@@ -939,13 +929,12 @@ uint64_t ram_get_total_transferred_pages(void)
 {
     return stat64_get(&mig_stats.normal_pages) +
         stat64_get(&mig_stats.zero_pages) +
-        ram_compressed_pages() + xbzrle_counters.pages;
+        compress_ram_pages() + xbzrle_counters.pages;
 }
 
 static void migration_update_rates(RAMState *rs, int64_t end_time)
 {
     uint64_t page_count = rs->target_page_count - rs->target_page_count_prev;
-    double compressed_size;
 
     /* calculate period counters */
     stat64_set(&mig_stats.dirty_pages_rate,
@@ -973,26 +962,7 @@ static void migration_update_rates(RAMState *rs, int64_t end_time)
         rs->xbzrle_pages_prev = xbzrle_counters.pages;
         rs->xbzrle_bytes_prev = xbzrle_counters.bytes;
     }
-
-    if (migrate_compress()) {
-        compression_counters.busy_rate = (double)(compression_counters.busy -
-            rs->compress_thread_busy_prev) / page_count;
-        rs->compress_thread_busy_prev = compression_counters.busy;
-
-        compressed_size = compression_counters.compressed_size -
-                          rs->compressed_size_prev;
-        if (compressed_size) {
-            double uncompressed_size = (compression_counters.pages -
-                                    rs->compress_pages_prev) * TARGET_PAGE_SIZE;
-
-            /* Compression-Ratio = Uncompressed-size / Compressed-size */
-            compression_counters.compression_rate =
-                                        uncompressed_size / compressed_size;
-
-            rs->compress_pages_prev = compression_counters.pages;
-            rs->compressed_size_prev = compression_counters.compressed_size;
-        }
-    }
+    compress_update_rates(page_count);
 }
 
 /*
@@ -1030,7 +1000,7 @@ static void migration_trigger_throttle(RAMState *rs)
 {
     uint64_t threshold = migrate_throttle_trigger_threshold();
     uint64_t bytes_xfer_period =
-        stat64_get(&mig_stats.transferred) - rs->bytes_xfer_prev;
+        migration_transferred_bytes() - rs->bytes_xfer_prev;
     uint64_t bytes_dirty_period = rs->num_dirty_pages_period * TARGET_PAGE_SIZE;
     uint64_t bytes_dirty_threshold = bytes_xfer_period * threshold / 100;
 
@@ -1100,7 +1070,7 @@ static void migration_bitmap_sync(RAMState *rs, bool last_stage)
         /* reset period counters */
         rs->time_last_bitmap_sync = end_time;
         rs->num_dirty_pages_period = 0;
-        rs->bytes_xfer_prev = stat64_get(&mig_stats.transferred);
+        rs->bytes_xfer_prev = migration_transferred_bytes();
     }
     if (migrate_events()) {
         uint64_t generation = stat64_get(&mig_stats.dirty_sync_count);
@@ -1291,9 +1261,7 @@ static int ram_save_multifd_page(QEMUFile *file, RAMBlock *block,
     return 1;
 }
 
-static bool save_page_use_compression(RAMState *rs);
-
-static int send_queued_data(CompressParam *param)
+int compress_send_queued_data(CompressParam *param)
 {
     PageSearchStatus *pss = &ram_state->pss[RAM_CHANNEL_PRECOPY];
     MigrationState *ms = migrate_get_current();
@@ -1329,15 +1297,6 @@ static int send_queued_data(CompressParam *param)
     return len;
 }
 
-static void ram_flush_compressed_data(RAMState *rs)
-{
-    if (!save_page_use_compression(rs)) {
-        return;
-    }
-
-    flush_compressed_data(send_queued_data);
-}
-
 #define PAGE_ALL_CLEAN 0
 #define PAGE_TRY_AGAIN 1
 #define PAGE_DIRTY_FOUND 2
@@ -1393,7 +1352,7 @@ static int find_dirty_block(RAMState *rs, PageSearchStatus *pss)
              * Also If xbzrle is on, stop using the data compression at this
              * point. In theory, xbzrle can do better than compression.
              */
-            ram_flush_compressed_data(rs);
+            compress_flush_data();
 
             /* Hit the end of the list */
             pss->block = QLIST_FIRST_RCU(&ram_list.blocks);
@@ -2042,24 +2001,6 @@ int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len)
     return 0;
 }
 
-static bool save_page_use_compression(RAMState *rs)
-{
-    if (!migrate_compress()) {
-        return false;
-    }
-
-    /*
-     * If xbzrle is enabled (e.g., after first round of migration), stop
-     * using the data compression. In theory, xbzrle can do better than
-     * compression.
-     */
-    if (rs->xbzrle_started) {
-        return false;
-    }
-
-    return true;
-}
-
 /*
  * try to compress the page before posting it out, return true if the page
  * has been properly handled by compression, otherwise needs other
@@ -2068,7 +2009,7 @@ static bool save_page_use_compression(RAMState *rs)
 static bool save_compress_page(RAMState *rs, PageSearchStatus *pss,
                                ram_addr_t offset)
 {
-    if (!save_page_use_compression(rs)) {
+    if (!migrate_compress()) {
         return false;
     }
 
@@ -2083,17 +2024,12 @@ static bool save_compress_page(RAMState *rs, PageSearchStatus *pss,
      * much CPU resource.
      */
     if (pss->block != pss->last_sent_block) {
-        ram_flush_compressed_data(rs);
+        compress_flush_data();
         return false;
     }
 
-    if (compress_page_with_multi_thread(pss->block, offset,
-                                        send_queued_data) > 0) {
-        return true;
-    }
-
-    compression_counters.busy++;
-    return false;
+    return compress_page_with_multi_thread(pss->block, offset,
+                                           compress_send_queued_data);
 }
 
 /**
@@ -3034,11 +2970,13 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
     ret = rdma_registration_start(f, RAM_CONTROL_SETUP);
     if (ret < 0) {
         qemu_file_set_error(f, ret);
+        return ret;
     }
 
     ret = rdma_registration_stop(f, RAM_CONTROL_SETUP);
     if (ret < 0) {
         qemu_file_set_error(f, ret);
+        return ret;
     }
 
     migration_ops = g_malloc0(sizeof(MigrationOps));
@@ -3056,9 +2994,7 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
     }
 
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
-    qemu_fflush(f);
-
-    return 0;
+    return qemu_fflush(f);
 }
 
 /**
@@ -3104,6 +3040,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
         ret = rdma_registration_start(f, RAM_CONTROL_ROUND);
         if (ret < 0) {
             qemu_file_set_error(f, ret);
+            goto out;
         }
 
         t0 = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
@@ -3135,7 +3072,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
              * page is sent in one chunk.
              */
             if (migrate_postcopy_ram()) {
-                ram_flush_compressed_data(rs);
+                compress_flush_data();
             }
 
             /*
@@ -3177,10 +3114,8 @@ out:
         }
 
         qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
-        qemu_fflush(f);
         ram_transferred_add(8);
-
-        ret = qemu_file_get_error(f);
+        ret = qemu_fflush(f);
     }
     if (ret < 0) {
         return ret;
@@ -3215,6 +3150,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
         ret = rdma_registration_start(f, RAM_CONTROL_FINISH);
         if (ret < 0) {
             qemu_file_set_error(f, ret);
+            return ret;
         }
 
         /* try transferring iterative blocks of memory */
@@ -3230,24 +3166,21 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
                 break;
             }
             if (pages < 0) {
-                ret = pages;
-                break;
+                qemu_mutex_unlock(&rs->bitmap_mutex);
+                return pages;
             }
         }
         qemu_mutex_unlock(&rs->bitmap_mutex);
 
-        ram_flush_compressed_data(rs);
+        compress_flush_data();
 
-        int ret = rdma_registration_stop(f, RAM_CONTROL_FINISH);
+        ret = rdma_registration_stop(f, RAM_CONTROL_FINISH);
         if (ret < 0) {
             qemu_file_set_error(f, ret);
+            return ret;
         }
     }
 
-    if (ret < 0) {
-        return ret;
-    }
-
     ret = multifd_send_sync_main(rs->pss[RAM_CHANNEL_PRECOPY].pss_channel);
     if (ret < 0) {
         return ret;
@@ -3257,9 +3190,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
         qemu_put_be64(f, RAM_SAVE_FLAG_MULTIFD_FLUSH);
     }
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
-    qemu_fflush(f);
-
-    return 0;
+    return qemu_fflush(f);
 }
 
 static void ram_state_pending_estimate(void *opaque, uint64_t *must_precopy,
@@ -3446,7 +3377,7 @@ static inline void *colo_cache_from_block_offset(RAMBlock *block,
 }
 
 /**
- * ram_handle_compressed: handle the zero page case
+ * ram_handle_zero: handle the zero page case
  *
  * If a page (or a whole RDMA chunk) has been
  * determined to be zero, then zap it.
@@ -3455,10 +3386,10 @@ static inline void *colo_cache_from_block_offset(RAMBlock *block,
  * @ch: what the page is filled from.  We only support zero
  * @size: size of the zero page
  */
-void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
+void ram_handle_zero(void *host, uint64_t size)
 {
-    if (ch != 0 || !buffer_is_zero(host, size)) {
-        memset(host, ch, size);
+    if (!buffer_is_zero(host, size)) {
+        memset(host, 0, size);
     }
 }
 
@@ -3715,16 +3646,18 @@ int ram_load_postcopy(QEMUFile *f, int channel)
         switch (flags & ~RAM_SAVE_FLAG_CONTINUE) {
         case RAM_SAVE_FLAG_ZERO:
             ch = qemu_get_byte(f);
+            if (ch != 0) {
+                error_report("Found a zero page with value %d", ch);
+                ret = -EINVAL;
+                break;
+            }
             /*
              * Can skip to set page_buffer when
              * this is a zero page and (block->page_size == TARGET_PAGE_SIZE).
              */
-            if (ch || !matches_target_page_size) {
+            if (!matches_target_page_size) {
                 memset(page_buffer, ch, TARGET_PAGE_SIZE);
             }
-            if (ch) {
-                tmp_page->all_zero = false;
-            }
             break;
 
         case RAM_SAVE_FLAG_PAGE:
@@ -4030,7 +3963,12 @@ static int ram_load_precopy(QEMUFile *f)
 
         case RAM_SAVE_FLAG_ZERO:
             ch = qemu_get_byte(f);
-            ram_handle_compressed(host, ch, TARGET_PAGE_SIZE);
+            if (ch != 0) {
+                error_report("Found a zero page with value %d", ch);
+                ret = -EINVAL;
+                break;
+            }
+            ram_handle_zero(host, TARGET_PAGE_SIZE);
             break;
 
         case RAM_SAVE_FLAG_PAGE:
diff --git a/migration/ram.h b/migration/ram.h
index 145c915ca7..9f3ad1ee81 100644
--- a/migration/ram.h
+++ b/migration/ram.h
@@ -34,7 +34,6 @@
 #include "io/channel.h"
 
 extern XBZRLECacheStats xbzrle_counters;
-extern CompressionStats compression_counters;
 
 /* Should be holding either ram_list.mutex, or the RCU lock. */
 #define RAMBLOCK_FOREACH_NOT_IGNORED(block)            \
@@ -60,7 +59,7 @@ int ram_discard_range(const char *block_name, uint64_t start, size_t length);
 int ram_postcopy_incoming_init(MigrationIncomingState *mis);
 int ram_load_postcopy(QEMUFile *f, int channel);
 
-void ram_handle_compressed(void *host, uint8_t ch, uint64_t size);
+void ram_handle_zero(void *host, uint64_t size);
 
 void ram_transferred_add(uint64_t bytes);
 void ram_release_page(const char *rbname, uint64_t offset);
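
With the fill byte gone, ram_handle_zero() only has to zap pages that are not
already zero, which avoids dirtying destination pages that are already clean.
A standalone sketch of that check, with a naive stand-in for QEMU's optimized
buffer_is_zero():

    #include <stdbool.h>
    #include <stddef.h>
    #include <stdio.h>
    #include <string.h>

    /* Naive stand-in for QEMU's vectorized buffer_is_zero(). */
    static bool buffer_is_zero(const void *buf, size_t len)
    {
        const unsigned char *p = buf;
        for (size_t i = 0; i < len; i++) {
            if (p[i]) {
                return false;
            }
        }
        return true;
    }

    /* Zap the page only when needed, so already-clean pages stay untouched. */
    static void handle_zero(void *host, size_t size)
    {
        if (!buffer_is_zero(host, size)) {
            memset(host, 0, size);
        }
    }

    int main(void)
    {
        unsigned char page[4096] = { [100] = 0xab };
        handle_zero(page, sizeof(page));
        printf("page[100] = %d\n", page[100]);   /* prints 0 */
        return 0;
    }
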
diff --git a/migration/rdma.c b/migration/rdma.c
index 2a1852ec7f..2938db4f64 100644
--- a/migration/rdma.c
+++ b/migration/rdma.c
@@ -3592,8 +3592,12 @@ int rdma_registration_handle(QEMUFile *f)
 
             host_addr = block->local_host_addr +
                             (comp->offset - block->offset);
-
-            ram_handle_compressed(host_addr, comp->value, comp->length);
+            if (comp->value) {
+                error_report("rdma: Zero page with non-zero (%d) value",
+                             comp->value);
+                goto err;
+            }
+            ram_handle_zero(host_addr, comp->length);
             break;
 
         case RDMA_CONTROL_REGISTER_FINISHED:
@@ -3849,9 +3853,7 @@ int rdma_registration_start(QEMUFile *f, uint64_t flags)
 
     trace_rdma_registration_start(flags);
     qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
-    qemu_fflush(f);
-
-    return 0;
+    return qemu_fflush(f);
 }
 
 /*
diff --git a/migration/savevm.c b/migration/savevm.c
index 8622f229e5..c7835e9c73 100644
--- a/migration/savevm.c
+++ b/migration/savevm.c
@@ -927,9 +927,9 @@ static int vmstate_load(QEMUFile *f, SaveStateEntry *se)
 static void vmstate_save_old_style(QEMUFile *f, SaveStateEntry *se,
                                    JSONWriter *vmdesc)
 {
-    uint64_t old_offset = qemu_file_transferred_noflush(f);
+    uint64_t old_offset = qemu_file_transferred(f);
     se->ops->save_state(f, se->opaque);
-    uint64_t size = qemu_file_transferred_noflush(f) - old_offset;
+    uint64_t size = qemu_file_transferred(f) - old_offset;
 
     if (vmdesc) {
         json_writer_int64(vmdesc, "size", size);
@@ -985,7 +985,7 @@ static int vmstate_save(QEMUFile *f, SaveStateEntry *se, JSONWriter *vmdesc)
     if ((!se->ops || !se->ops->save_state) && !se->vmsd) {
         return 0;
     }
-    if (se->vmsd && !vmstate_save_needed(se->vmsd, se->opaque)) {
+    if (se->vmsd && !vmstate_section_needed(se->vmsd, se->opaque)) {
         trace_savevm_section_skip(se->idstr, se->section_id);
         return 0;
     }
@@ -1583,8 +1583,7 @@ int qemu_savevm_state_complete_precopy(QEMUFile *f, bool iterable_only,
     }
 
 flush:
-    qemu_fflush(f);
-    return 0;
+    return qemu_fflush(f);
 }
 
 /* Give an estimate of the amount left to be transferred,
@@ -3053,7 +3052,7 @@ bool save_snapshot(const char *name, bool overwrite, const char *vmstate,
         goto the_end;
     }
     ret = qemu_savevm_state(f, errp);
-    vm_state_size = qemu_file_transferred_noflush(f);
+    vm_state_size = qemu_file_transferred(f);
     ret2 = qemu_fclose(f);
     if (ret < 0) {
         goto the_end;
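
vmstate_save_old_style() above, like vmstate_save_state_v() in the next file,
sizes a section by sampling the transferred counter before and after the state
is written; with the rename, qemu_file_transferred() is that sample, queued
bytes included. A minimal standalone sketch of the measure-by-difference
pattern (the counter and writer below are stand-ins, not QEMU's API):

    #include <inttypes.h>
    #include <stdint.h>
    #include <stdio.h>

    static uint64_t transferred;     /* running byte counter, like qemu_file_transferred() */

    /* Every write bumps the running total. */
    static void put_buffer(const void *buf, size_t len)
    {
        (void)buf;                   /* the sketch only accounts, it does not send */
        transferred += len;
    }

    /* Stand-in for a device's save_state hook. */
    static void save_device_state(void)
    {
        put_buffer("some state", 10);
        put_buffer("more state", 10);
    }

    int main(void)
    {
        uint64_t old_offset = transferred;          /* sample before the section */
        save_device_state();
        uint64_t size = transferred - old_offset;   /* bytes this section wrote */
        printf("section size: %" PRIu64 "\n", size);   /* prints 20 */
        return 0;
    }
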
diff --git a/migration/vmstate.c b/migration/vmstate.c
index 1cf9e45b85..b7723a4187 100644
--- a/migration/vmstate.c
+++ b/migration/vmstate.c
@@ -179,6 +179,7 @@ int vmstate_load_state(QEMUFile *f, const VMStateDescription *vmsd,
     assert(field->flags == VMS_END);
     ret = vmstate_subsection_load(f, vmsd, opaque);
     if (ret != 0) {
+        qemu_file_set_error(f, ret);
         return ret;
     }
     if (vmsd->post_load) {
@@ -324,7 +325,7 @@ static void vmsd_desc_field_end(const VMStateDescription *vmsd,
 }
 
 
-bool vmstate_save_needed(const VMStateDescription *vmsd, void *opaque)
+bool vmstate_section_needed(const VMStateDescription *vmsd, void *opaque)
 {
     if (vmsd->needed && !vmsd->needed(opaque)) {
         /* optional section not needed */
@@ -386,7 +387,7 @@ int vmstate_save_state_v(QEMUFile *f, const VMStateDescription *vmsd,
                 void *curr_elem = first_elem + size * i;
 
                 vmsd_desc_field_start(vmsd, vmdesc_loop, field, i, n_elems);
-                old_offset = qemu_file_transferred_noflush(f);
+                old_offset = qemu_file_transferred(f);
                 if (field->flags & VMS_ARRAY_OF_POINTER) {
                     assert(curr_elem);
                     curr_elem = *(void **)curr_elem;
@@ -416,7 +417,7 @@ int vmstate_save_state_v(QEMUFile *f, const VMStateDescription *vmsd,
                     return ret;
                 }
 
-                written_bytes = qemu_file_transferred_noflush(f) - old_offset;
+                written_bytes = qemu_file_transferred(f) - old_offset;
                 vmsd_desc_field_end(vmsd, vmdesc_loop, field, written_bytes, i);
 
                 /* Compressed arrays only care about the first element */
@@ -522,7 +523,7 @@ static int vmstate_subsection_save(QEMUFile *f, const VMStateDescription *vmsd,
 
     trace_vmstate_subsection_save_top(vmsd->name);
     while (sub && *sub) {
-        if (vmstate_save_needed(*sub, opaque)) {
+        if (vmstate_section_needed(*sub, opaque)) {
             const VMStateDescription *vmsdsub = *sub;
             uint8_t len;