Diffstat (limited to 'migration/ram.c')
-rw-r--r--  migration/ram.c | 694 +++++++------------------------
 1 file changed, 148 insertions(+), 546 deletions(-)
diff --git a/migration/ram.c b/migration/ram.c
index 79d881f735..f69d8d42b0 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -32,10 +32,11 @@
 #include "qemu/bitmap.h"
 #include "qemu/madvise.h"
 #include "qemu/main-loop.h"
-#include "io/channel-null.h"
 #include "xbzrle.h"
+#include "ram-compress.h"
 #include "ram.h"
 #include "migration.h"
+#include "migration-stats.h"
 #include "migration/register.h"
 #include "migration/misc.h"
 #include "qemu-file.h"
@@ -57,6 +58,7 @@
 #include "qemu/iov.h"
 #include "multifd.h"
 #include "sysemu/runstate.h"
+#include "options.h"
 
 #include "hw/boards.h" /* for machine_dump_guest_core() */
 
@@ -85,6 +87,7 @@
 #define RAM_SAVE_FLAG_XBZRLE   0x40
 /* 0x80 is reserved in qemu-file.h for RAM_SAVE_FLAG_HOOK */
 #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
+#define RAM_SAVE_FLAG_MULTIFD_FLUSH    0x200
 /* We can't use any flag that is bigger than 0x200 */
 
 int (*xbzrle_encode_buffer_func)(uint8_t *, uint8_t *, int,
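The new RAM_SAVE_FLAG_MULTIFD_FLUSH claims the last usable bit: on the wire each
RAM record is a single be64 whose high bits carry the page-aligned address and
whose low bits carry the flags, which is why nothing above 0x200 is available.
A minimal standalone sketch of that packing (not QEMU code; the 4 KiB target
page size is an assumption here):

    #include <assert.h>
    #include <stdint.h>

    #define TARGET_PAGE_BITS 12                 /* assumed 4 KiB pages */
    #define RAM_SAVE_FLAG_MULTIFD_FLUSH 0x200

    /* Pack a page-aligned address and its flags into one be64 slot. */
    static uint64_t pack_addr_flags(uint64_t addr, uint64_t flags)
    {
        assert((addr & ((1ULL << TARGET_PAGE_BITS) - 1)) == 0);
        assert(flags < (1ULL << TARGET_PAGE_BITS));
        return addr | flags;
    }

    int main(void)
    {
        uint64_t word  = pack_addr_flags(0x7f0000, RAM_SAVE_FLAG_MULTIFD_FLUSH);
        uint64_t flags = word & ((1ULL << TARGET_PAGE_BITS) - 1);
        uint64_t addr  = word & ~((1ULL << TARGET_PAGE_BITS) - 1);

        assert(flags == RAM_SAVE_FLAG_MULTIFD_FLUSH && addr == 0x7f0000);
        return 0;
    }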
@@ -155,14 +158,14 @@ static struct {
 
 static void XBZRLE_cache_lock(void)
 {
-    if (migrate_use_xbzrle()) {
+    if (migrate_xbzrle()) {
         qemu_mutex_lock(&XBZRLE.lock);
     }
 }
 
 static void XBZRLE_cache_unlock(void)
 {
-    if (migrate_use_xbzrle()) {
+    if (migrate_xbzrle()) {
         qemu_mutex_unlock(&XBZRLE.lock);
     }
 }
@@ -385,8 +388,8 @@ struct RAMState {
     uint64_t xbzrle_pages_prev;
     /* Amount of xbzrle encoded bytes since the beginning of the period */
     uint64_t xbzrle_bytes_prev;
-    /* Start using XBZRLE (e.g., after the first round). */
-    bool xbzrle_enabled;
+    /* Are we really using XBZRLE (e.g., after the first round). */
+    bool xbzrle_started;
     /* Are we on the last stage of migration */
     bool last_stage;
     /* compression statistics since the beginning of the period */
@@ -458,30 +461,16 @@ uint64_t ram_bytes_remaining(void)
                        0;
 }
 
-/*
- * NOTE: not all stats in ram_counters are used in reality.  See comments
- * for struct MigrationAtomicStats.  The ultimate result of ram migration
- * counters will be a merged version with both ram_counters and the atomic
- * fields in ram_atomic_counters.
- */
-MigrationStats ram_counters;
-MigrationAtomicStats ram_atomic_counters;
-
 void ram_transferred_add(uint64_t bytes)
 {
     if (runstate_is_running()) {
-        ram_counters.precopy_bytes += bytes;
+        stat64_add(&mig_stats.precopy_bytes, bytes);
     } else if (migration_in_postcopy()) {
-        stat64_add(&ram_atomic_counters.postcopy_bytes, bytes);
+        stat64_add(&mig_stats.postcopy_bytes, bytes);
     } else {
-        ram_counters.downtime_bytes += bytes;
+        stat64_add(&mig_stats.downtime_bytes, bytes);
     }
-    stat64_add(&ram_atomic_counters.transferred, bytes);
-}
-
-void dirty_sync_missed_zero_copy(void)
-{
-    ram_counters.dirty_sync_missed_zero_copy++;
+    stat64_add(&mig_stats.transferred, bytes);
 }
 
 struct MigrationOps {
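Throughout the patch, the ram_counters/ram_atomic_counters split is replaced by
a single mig_stats structure (from the new migration-stats.h) whose fields are
all updated through stat64_add()/stat64_set()/stat64_get(), so every counter is
safe to touch from any thread. A standalone approximation of that pattern using
C11 atomics (this Stat64 and the trimmed field list are illustrative, not
QEMU's util/stats64.h):

    #include <stdatomic.h>
    #include <stdint.h>
    #include <stdio.h>

    /* Illustrative stand-in for QEMU's Stat64: an atomic 64-bit counter. */
    typedef struct { _Atomic uint64_t value; } Stat64;

    static void stat64_add(Stat64 *s, uint64_t v)
    {
        atomic_fetch_add_explicit(&s->value, v, memory_order_relaxed);
    }

    static uint64_t stat64_get(Stat64 *s)
    {
        return atomic_load_explicit(&s->value, memory_order_relaxed);
    }

    /* Trimmed-down mig_stats carrying only the fields used above. */
    static struct {
        Stat64 precopy_bytes;
        Stat64 downtime_bytes;
        Stat64 postcopy_bytes;
        Stat64 transferred;
    } mig_stats;

    int main(void)
    {
        stat64_add(&mig_stats.precopy_bytes, 4096);
        stat64_add(&mig_stats.transferred, 4096);
        printf("transferred: %llu\n",
               (unsigned long long)stat64_get(&mig_stats.transferred));
        return 0;
    }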
@@ -491,56 +480,8 @@ typedef struct MigrationOps MigrationOps;
 
 MigrationOps *migration_ops;
 
-CompressionStats compression_counters;
-
-struct CompressParam {
-    bool done;
-    bool quit;
-    bool zero_page;
-    QEMUFile *file;
-    QemuMutex mutex;
-    QemuCond cond;
-    RAMBlock *block;
-    ram_addr_t offset;
-
-    /* internally used fields */
-    z_stream stream;
-    uint8_t *originbuf;
-};
-typedef struct CompressParam CompressParam;
-
-struct DecompressParam {
-    bool done;
-    bool quit;
-    QemuMutex mutex;
-    QemuCond cond;
-    void *des;
-    uint8_t *compbuf;
-    int len;
-    z_stream stream;
-};
-typedef struct DecompressParam DecompressParam;
-
-static CompressParam *comp_param;
-static QemuThread *compress_threads;
-/* comp_done_cond is used to wake up the migration thread when
- * one of the compression threads has finished the compression.
- * comp_done_lock is used to co-work with comp_done_cond.
- */
-static QemuMutex comp_done_lock;
-static QemuCond comp_done_cond;
-
-static QEMUFile *decomp_file;
-static DecompressParam *decomp_param;
-static QemuThread *decompress_threads;
-static QemuMutex decomp_done_lock;
-static QemuCond decomp_done_cond;
-
 static int ram_save_host_page_urgent(PageSearchStatus *pss);
 
-static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
-                                 ram_addr_t offset, uint8_t *source_buf);
-
 /* NOTE: page is the PFN not real ram_addr_t. */
 static void pss_init(PageSearchStatus *pss, RAMBlock *rb, ram_addr_t page)
 {
@@ -559,123 +500,6 @@ static bool pss_overlap(PageSearchStatus *pss1, PageSearchStatus *pss2)
         (pss1->host_page_start == pss2->host_page_start);
 }
 
-static void *do_data_compress(void *opaque)
-{
-    CompressParam *param = opaque;
-    RAMBlock *block;
-    ram_addr_t offset;
-    bool zero_page;
-
-    qemu_mutex_lock(&param->mutex);
-    while (!param->quit) {
-        if (param->block) {
-            block = param->block;
-            offset = param->offset;
-            param->block = NULL;
-            qemu_mutex_unlock(&param->mutex);
-
-            zero_page = do_compress_ram_page(param->file, &param->stream,
-                                             block, offset, param->originbuf);
-
-            qemu_mutex_lock(&comp_done_lock);
-            param->done = true;
-            param->zero_page = zero_page;
-            qemu_cond_signal(&comp_done_cond);
-            qemu_mutex_unlock(&comp_done_lock);
-
-            qemu_mutex_lock(&param->mutex);
-        } else {
-            qemu_cond_wait(&param->cond, &param->mutex);
-        }
-    }
-    qemu_mutex_unlock(&param->mutex);
-
-    return NULL;
-}
-
-static void compress_threads_save_cleanup(void)
-{
-    int i, thread_count;
-
-    if (!migrate_use_compression() || !comp_param) {
-        return;
-    }
-
-    thread_count = migrate_compress_threads();
-    for (i = 0; i < thread_count; i++) {
-        /*
-         * we use it as an indicator which shows if the thread is
-         * properly init'd or not
-         */
-        if (!comp_param[i].file) {
-            break;
-        }
-
-        qemu_mutex_lock(&comp_param[i].mutex);
-        comp_param[i].quit = true;
-        qemu_cond_signal(&comp_param[i].cond);
-        qemu_mutex_unlock(&comp_param[i].mutex);
-
-        qemu_thread_join(compress_threads + i);
-        qemu_mutex_destroy(&comp_param[i].mutex);
-        qemu_cond_destroy(&comp_param[i].cond);
-        deflateEnd(&comp_param[i].stream);
-        g_free(comp_param[i].originbuf);
-        qemu_fclose(comp_param[i].file);
-        comp_param[i].file = NULL;
-    }
-    qemu_mutex_destroy(&comp_done_lock);
-    qemu_cond_destroy(&comp_done_cond);
-    g_free(compress_threads);
-    g_free(comp_param);
-    compress_threads = NULL;
-    comp_param = NULL;
-}
-
-static int compress_threads_save_setup(void)
-{
-    int i, thread_count;
-
-    if (!migrate_use_compression()) {
-        return 0;
-    }
-    thread_count = migrate_compress_threads();
-    compress_threads = g_new0(QemuThread, thread_count);
-    comp_param = g_new0(CompressParam, thread_count);
-    qemu_cond_init(&comp_done_cond);
-    qemu_mutex_init(&comp_done_lock);
-    for (i = 0; i < thread_count; i++) {
-        comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE);
-        if (!comp_param[i].originbuf) {
-            goto exit;
-        }
-
-        if (deflateInit(&comp_param[i].stream,
-                        migrate_compress_level()) != Z_OK) {
-            g_free(comp_param[i].originbuf);
-            goto exit;
-        }
-
-        /* comp_param[i].file is just used as a dummy buffer to save data,
-         * set its ops to empty.
-         */
-        comp_param[i].file = qemu_file_new_output(
-            QIO_CHANNEL(qio_channel_null_new()));
-        comp_param[i].done = true;
-        comp_param[i].quit = false;
-        qemu_mutex_init(&comp_param[i].mutex);
-        qemu_cond_init(&comp_param[i].cond);
-        qemu_thread_create(compress_threads + i, "compress",
-                           do_data_compress, comp_param + i,
-                           QEMU_THREAD_JOINABLE);
-    }
-    return 0;
-
-exit:
-    compress_threads_save_cleanup();
-    return -1;
-}
-
 /**
  * save_page_header: write page header to wire
  *
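The compression worker machinery deleted above moves to ram-compress.c rather
than disappearing. Its handoff pattern is worth keeping in mind when reading
send_queued_data() later in this patch: work is published under a per-worker
mutex, and completion is reported through a shared done lock/condvar pair. A
self-contained sketch of that pattern with plain pthreads (names and the
stubbed compression step are illustrative, not the moved code):

    #include <pthread.h>
    #include <stdbool.h>

    typedef struct {
        pthread_mutex_t mutex;
        pthread_cond_t cond;      /* "new work" signal for this worker */
        bool quit;
        bool done;
        const void *block;        /* non-NULL means a request is pending */
    } WorkerParam;

    static pthread_mutex_t done_lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t done_cond = PTHREAD_COND_INITIALIZER;

    static void *worker_thread(void *opaque)
    {
        WorkerParam *param = opaque;

        pthread_mutex_lock(&param->mutex);
        while (!param->quit) {
            if (param->block) {
                param->block = NULL;          /* claim the request */
                pthread_mutex_unlock(&param->mutex);

                /* ... compress the page outside the lock ... */

                pthread_mutex_lock(&done_lock);
                param->done = true;
                pthread_cond_signal(&done_cond);
                pthread_mutex_unlock(&done_lock);

                pthread_mutex_lock(&param->mutex);
            } else {
                pthread_cond_wait(&param->cond, &param->mutex);
            }
        }
        pthread_mutex_unlock(&param->mutex);
        return NULL;
    }

    int main(void)
    {
        static WorkerParam p = { .mutex = PTHREAD_MUTEX_INITIALIZER,
                                 .cond  = PTHREAD_COND_INITIALIZER };
        pthread_t tid;

        pthread_create(&tid, NULL, worker_thread, &p);

        pthread_mutex_lock(&p.mutex);         /* publish one request */
        p.block = "page";
        pthread_cond_signal(&p.cond);
        pthread_mutex_unlock(&p.mutex);

        pthread_mutex_lock(&done_lock);       /* wait for it to finish */
        while (!p.done) {
            pthread_cond_wait(&done_cond, &done_lock);
        }
        pthread_mutex_unlock(&done_lock);

        pthread_mutex_lock(&p.mutex);         /* then ask the worker to exit */
        p.quit = true;
        pthread_cond_signal(&p.cond);
        pthread_mutex_unlock(&p.mutex);
        pthread_join(tid, NULL);
        return 0;
    }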
@@ -722,11 +546,10 @@ static size_t save_page_header(PageSearchStatus *pss, QEMUFile *f,
 static void mig_throttle_guest_down(uint64_t bytes_dirty_period,
                                     uint64_t bytes_dirty_threshold)
 {
-    MigrationState *s = migrate_get_current();
-    uint64_t pct_initial = s->parameters.cpu_throttle_initial;
-    uint64_t pct_increment = s->parameters.cpu_throttle_increment;
-    bool pct_tailslow = s->parameters.cpu_throttle_tailslow;
-    int pct_max = s->parameters.max_cpu_throttle;
+    uint64_t pct_initial = migrate_cpu_throttle_initial();
+    uint64_t pct_increment = migrate_cpu_throttle_increment();
+    bool pct_tailslow = migrate_cpu_throttle_tailslow();
+    int pct_max = migrate_max_cpu_throttle();
 
     uint64_t throttle_now = cpu_throttle_get_percentage();
     uint64_t cpu_now, cpu_ideal, throttle_inc;
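mig_throttle_guest_down() now reads its knobs through the migrate_*() option
helpers instead of poking MigrationState->parameters directly; the logic is
unchanged: the throttle starts at cpu_throttle_initial, grows by
cpu_throttle_increment up to max_cpu_throttle, and with cpu_throttle_tailslow
the step shrinks as the dirty rate nears what the link absorbs. A toy model of
that step (simplified arithmetic, not QEMU's exact formula):

    #include <stdint.h>
    #include <stdio.h>

    /* Toy model of the throttle step; not QEMU's exact arithmetic. */
    static int throttle_step(int now, uint64_t dirty_bytes, uint64_t xfer_bytes,
                             int initial, int increment, int max, int tailslow)
    {
        int next;

        if (now == 0) {
            return initial;           /* first trigger: jump to initial */
        }
        if (tailslow) {
            /* Shrink the step as the dirty rate approaches what the link
             * absorbs, so the tail converges instead of overshooting. */
            uint64_t ratio = xfer_bytes * 100 / (dirty_bytes + 1);
            uint64_t ideal = ratio >= 100 ? 0 : 100 - ratio;

            if ((uint64_t)now + increment > ideal) {
                increment = ideal > (uint64_t)now ? (int)(ideal - now) : 1;
            }
        }
        next = now + increment;
        return next > max ? max : next;
    }

    int main(void)
    {
        int pct = 0;

        for (int i = 0; i < 5; i++) {
            pct = throttle_step(pct, 1ULL << 30, 1ULL << 28, 20, 10, 99, 1);
            printf("throttle: %d%%\n", pct);
        }
        return 0;
    }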
@@ -756,7 +579,7 @@ void mig_throttle_counter_reset(void)
 
     rs->time_last_bitmap_sync = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
     rs->num_dirty_pages_period = 0;
-    rs->bytes_xfer_prev = stat64_get(&ram_atomic_counters.transferred);
+    rs->bytes_xfer_prev = stat64_get(&mig_stats.transferred);
 }
 
 /**
@@ -776,7 +599,7 @@ static void xbzrle_cache_zero_page(RAMState *rs, ram_addr_t current_addr)
     /* We don't care if this fails to allocate a new cache page
      * as long as it updated an old one */
     cache_insert(XBZRLE.cache, current_addr, XBZRLE.zero_target_page,
-                 ram_counters.dirty_sync_count);
+                 stat64_get(&mig_stats.dirty_sync_count));
 }
 
 #define ENCODING_FLAG_XBZRLE 0x1
@@ -802,13 +625,13 @@ static int save_xbzrle_page(RAMState *rs, PageSearchStatus *pss,
     int encoded_len = 0, bytes_xbzrle;
     uint8_t *prev_cached_page;
     QEMUFile *file = pss->pss_channel;
+    uint64_t generation = stat64_get(&mig_stats.dirty_sync_count);
 
-    if (!cache_is_cached(XBZRLE.cache, current_addr,
-                         ram_counters.dirty_sync_count)) {
+    if (!cache_is_cached(XBZRLE.cache, current_addr, generation)) {
         xbzrle_counters.cache_miss++;
         if (!rs->last_stage) {
             if (cache_insert(XBZRLE.cache, current_addr, *current_data,
-                             ram_counters.dirty_sync_count) == -1) {
+                             generation) == -1) {
                 return -1;
             } else {
                 /* update *current_data when the page has been
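save_xbzrle_page() now samples dirty_sync_count once into a local generation
and keys both the lookup and the insert on it, so an entry cached during an
earlier sync round counts as a miss. A minimal generation-tagged cache showing
the idea (single slot, illustrative API, not QEMU's page cache):

    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>

    /* One-slot, generation-tagged cache: an entry only hits if it was
     * inserted during the current dirty-bitmap sync generation. */
    typedef struct {
        uint64_t addr;
        uint64_t generation;
        bool valid;
    } CacheSlot;

    static bool cache_is_cached(const CacheSlot *c, uint64_t addr, uint64_t gen)
    {
        return c->valid && c->addr == addr && c->generation == gen;
    }

    static void cache_insert(CacheSlot *c, uint64_t addr, uint64_t gen)
    {
        c->addr = addr;
        c->generation = gen;
        c->valid = true;
    }

    int main(void)
    {
        CacheSlot slot = {0};
        uint64_t generation = 1;                 /* e.g. dirty_sync_count */

        cache_insert(&slot, 0x1000, generation);
        printf("hit: %d\n", cache_is_cached(&slot, 0x1000, generation));
        generation++;                /* next sync round: the entry is stale */
        printf("hit: %d\n", cache_is_cached(&slot, 0x1000, generation));
        return 0;
    }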
@@ -1130,8 +953,8 @@ uint64_t ram_pagesize_summary(void)
 
 uint64_t ram_get_total_transferred_pages(void)
 {
-    return  stat64_get(&ram_atomic_counters.normal) +
-        stat64_get(&ram_atomic_counters.duplicate) +
+    return stat64_get(&mig_stats.normal_pages) +
+        stat64_get(&mig_stats.zero_pages) +
         compression_counters.pages + xbzrle_counters.pages;
 }
 
@@ -1141,14 +964,15 @@ static void migration_update_rates(RAMState *rs, int64_t end_time)
     double compressed_size;
 
     /* calculate period counters */
-    ram_counters.dirty_pages_rate = rs->num_dirty_pages_period * 1000
-                / (end_time - rs->time_last_bitmap_sync);
+    stat64_set(&mig_stats.dirty_pages_rate,
+               rs->num_dirty_pages_period * 1000 /
+               (end_time - rs->time_last_bitmap_sync));
 
     if (!page_count) {
         return;
     }
 
-    if (migrate_use_xbzrle()) {
+    if (migrate_xbzrle()) {
         double encoded_size, unencoded_size;
 
         xbzrle_counters.cache_miss_rate = (double)(xbzrle_counters.cache_miss -
@@ -1166,7 +990,7 @@ static void migration_update_rates(RAMState *rs, int64_t end_time)
         rs->xbzrle_bytes_prev = xbzrle_counters.bytes;
     }
 
-    if (migrate_use_compression()) {
+    if (migrate_compress()) {
         compression_counters.busy_rate = (double)(compression_counters.busy -
             rs->compress_thread_busy_prev) / page_count;
         rs->compress_thread_busy_prev = compression_counters.busy;
@@ -1189,10 +1013,9 @@ static void migration_update_rates(RAMState *rs, int64_t end_time)
 
 static void migration_trigger_throttle(RAMState *rs)
 {
-    MigrationState *s = migrate_get_current();
-    uint64_t threshold = s->parameters.throttle_trigger_threshold;
+    uint64_t threshold = migrate_throttle_trigger_threshold();
     uint64_t bytes_xfer_period =
-        stat64_get(&ram_atomic_counters.transferred) - rs->bytes_xfer_prev;
+        stat64_get(&mig_stats.transferred) - rs->bytes_xfer_prev;
     uint64_t bytes_dirty_period = rs->num_dirty_pages_period * TARGET_PAGE_SIZE;
     uint64_t bytes_dirty_threshold = bytes_xfer_period * threshold / 100;
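The trigger rule itself is unchanged, only its inputs now come from mig_stats
and the option helpers: throttle when the bytes dirtied in a period exceed
throttle-trigger-threshold percent of the bytes actually transferred in that
period. A worked check of the arithmetic (the numbers are invented):

    #include <assert.h>
    #include <stdbool.h>
    #include <stdint.h>

    /* Throttle when dirtying outpaces the configured share of what the
     * migration actually pushed out during the same period. */
    static bool should_throttle(uint64_t bytes_dirty_period,
                                uint64_t bytes_xfer_period,
                                uint64_t threshold_pct)
    {
        uint64_t bytes_dirty_threshold = bytes_xfer_period * threshold_pct / 100;

        return bytes_dirty_period > bytes_dirty_threshold;
    }

    int main(void)
    {
        /* 300 MiB dirtied vs 512 MiB sent at a 50% threshold:
         * 300 MiB > 256 MiB, so throttling kicks in. */
        assert(should_throttle(300ULL << 20, 512ULL << 20, 50));
        assert(!should_throttle(100ULL << 20, 512ULL << 20, 50));
        return 0;
    }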
 
@@ -1221,7 +1044,7 @@ static void migration_bitmap_sync(RAMState *rs)
     RAMBlock *block;
     int64_t end_time;
 
-    ram_counters.dirty_sync_count++;
+    stat64_add(&mig_stats.dirty_sync_count, 1);
 
     if (!rs->time_last_bitmap_sync) {
         rs->time_last_bitmap_sync = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
@@ -1235,7 +1058,7 @@ static void migration_bitmap_sync(RAMState *rs)
         RAMBLOCK_FOREACH_NOT_IGNORED(block) {
             ramblock_sync_dirty_bitmap(rs, block);
         }
-        ram_counters.remaining = ram_bytes_remaining();
+        stat64_set(&mig_stats.dirty_bytes_last_sync, ram_bytes_remaining());
     }
     qemu_mutex_unlock(&rs->bitmap_mutex);
 
@@ -1255,10 +1078,11 @@ static void migration_bitmap_sync(RAMState *rs)
         /* reset period counters */
         rs->time_last_bitmap_sync = end_time;
         rs->num_dirty_pages_period = 0;
-        rs->bytes_xfer_prev = stat64_get(&ram_atomic_counters.transferred);
+        rs->bytes_xfer_prev = stat64_get(&mig_stats.transferred);
     }
-    if (migrate_use_events()) {
-        qapi_event_send_migration_pass(ram_counters.dirty_sync_count);
+    if (migrate_events()) {
+        uint64_t generation = stat64_get(&mig_stats.dirty_sync_count);
+        qapi_event_send_migration_pass(generation);
     }
 }
 
@@ -1331,7 +1155,7 @@ static int save_zero_page(PageSearchStatus *pss, QEMUFile *f, RAMBlock *block,
     int len = save_zero_page_to_file(pss, f, block, offset);
 
     if (len) {
-        stat64_add(&ram_atomic_counters.duplicate, 1);
+        stat64_add(&mig_stats.zero_pages, 1);
         ram_transferred_add(len);
         return 1;
     }
@@ -1368,9 +1192,9 @@ static bool control_save_page(PageSearchStatus *pss, RAMBlock *block,
     }
 
     if (bytes_xmit > 0) {
-        stat64_add(&ram_atomic_counters.normal, 1);
+        stat64_add(&mig_stats.normal_pages, 1);
     } else if (bytes_xmit == 0) {
-        stat64_add(&ram_atomic_counters.duplicate, 1);
+        stat64_add(&mig_stats.zero_pages, 1);
     }
 
     return true;
@@ -1402,7 +1226,7 @@ static int save_normal_page(PageSearchStatus *pss, RAMBlock *block,
         qemu_put_buffer(file, buf, TARGET_PAGE_SIZE);
     }
     ram_transferred_add(TARGET_PAGE_SIZE);
-    stat64_add(&ram_atomic_counters.normal, 1);
+    stat64_add(&mig_stats.normal_pages, 1);
     return 1;
 }
 
@@ -1431,7 +1255,7 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss)
     trace_ram_save_page(block->idstr, (uint64_t)offset, p);
 
     XBZRLE_cache_lock();
-    if (rs->xbzrle_enabled && !migration_in_postcopy()) {
+    if (rs->xbzrle_started && !migration_in_postcopy()) {
         pages = save_xbzrle_page(rs, pss, &p, current_addr,
                                  block, offset);
         if (!rs->last_stage) {
@@ -1458,46 +1282,18 @@ static int ram_save_multifd_page(QEMUFile *file, RAMBlock *block,
     if (multifd_queue_page(file, block, offset) < 0) {
         return -1;
     }
-    stat64_add(&ram_atomic_counters.normal, 1);
+    stat64_add(&mig_stats.normal_pages, 1);
 
     return 1;
 }
 
-static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
-                                 ram_addr_t offset, uint8_t *source_buf)
-{
-    RAMState *rs = ram_state;
-    PageSearchStatus *pss = &rs->pss[RAM_CHANNEL_PRECOPY];
-    uint8_t *p = block->host + offset;
-    int ret;
-
-    if (save_zero_page_to_file(pss, f, block, offset)) {
-        return true;
-    }
-
-    save_page_header(pss, f, block, offset | RAM_SAVE_FLAG_COMPRESS_PAGE);
-
-    /*
-     * copy it to an internal buffer to avoid it being modified by the VM,
-     * so that we can catch errors during compression and
-     * decompression
-     */
-    memcpy(source_buf, p, TARGET_PAGE_SIZE);
-    ret = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE);
-    if (ret < 0) {
-        qemu_file_set_error(migrate_get_current()->to_dst_file, ret);
-        error_report("compressed data failed!");
-    }
-    return false;
-}
-
 static void
 update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
 {
     ram_transferred_add(bytes_xmit);
 
-    if (param->zero_page) {
-        stat64_add(&ram_atomic_counters.duplicate, 1);
+    if (param->result == RES_ZEROPAGE) {
+        stat64_add(&mig_stats.zero_pages, 1);
         return;
     }
 
@@ -1508,81 +1304,49 @@ update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
 
 static bool save_page_use_compression(RAMState *rs);
 
-static void flush_compressed_data(RAMState *rs)
+static int send_queued_data(CompressParam *param)
 {
+    PageSearchStatus *pss = &ram_state->pss[RAM_CHANNEL_PRECOPY];
     MigrationState *ms = migrate_get_current();
-    int idx, len, thread_count;
+    QEMUFile *file = ms->to_dst_file;
+    int len = 0;
 
-    if (!save_page_use_compression(rs)) {
-        return;
-    }
-    thread_count = migrate_compress_threads();
+    RAMBlock *block = param->block;
+    ram_addr_t offset = param->offset;
 
-    qemu_mutex_lock(&comp_done_lock);
-    for (idx = 0; idx < thread_count; idx++) {
-        while (!comp_param[idx].done) {
-            qemu_cond_wait(&comp_done_cond, &comp_done_lock);
-        }
+    if (param->result == RES_NONE) {
+        return 0;
     }
-    qemu_mutex_unlock(&comp_done_lock);
 
-    for (idx = 0; idx < thread_count; idx++) {
-        qemu_mutex_lock(&comp_param[idx].mutex);
-        if (!comp_param[idx].quit) {
-            len = qemu_put_qemu_file(ms->to_dst_file, comp_param[idx].file);
-            /*
-             * it's safe to fetch zero_page without holding comp_done_lock
-             * as there is no further request submitted to the thread,
-             * i.e, the thread should be waiting for a request at this point.
-             */
-            update_compress_thread_counts(&comp_param[idx], len);
-        }
-        qemu_mutex_unlock(&comp_param[idx].mutex);
+    assert(block == pss->last_sent_block);
+
+    if (param->result == RES_ZEROPAGE) {
+        assert(qemu_file_buffer_empty(param->file));
+        len += save_page_header(pss, file, block, offset | RAM_SAVE_FLAG_ZERO);
+        qemu_put_byte(file, 0);
+        len += 1;
+        ram_release_page(block->idstr, offset);
+    } else if (param->result == RES_COMPRESS) {
+        assert(!qemu_file_buffer_empty(param->file));
+        len += save_page_header(pss, file, block,
+                                offset | RAM_SAVE_FLAG_COMPRESS_PAGE);
+        len += qemu_put_qemu_file(file, param->file);
+    } else {
+        abort();
     }
-}
 
-static inline void set_compress_params(CompressParam *param, RAMBlock *block,
-                                       ram_addr_t offset)
-{
-    param->block = block;
-    param->offset = offset;
+    update_compress_thread_counts(param, len);
+
+    return len;
 }
 
-static int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset)
+static void ram_flush_compressed_data(RAMState *rs)
 {
-    int idx, thread_count, bytes_xmit = -1, pages = -1;
-    bool wait = migrate_compress_wait_thread();
-    MigrationState *ms = migrate_get_current();
-
-    thread_count = migrate_compress_threads();
-    qemu_mutex_lock(&comp_done_lock);
-retry:
-    for (idx = 0; idx < thread_count; idx++) {
-        if (comp_param[idx].done) {
-            comp_param[idx].done = false;
-            bytes_xmit = qemu_put_qemu_file(ms->to_dst_file,
-                                            comp_param[idx].file);
-            qemu_mutex_lock(&comp_param[idx].mutex);
-            set_compress_params(&comp_param[idx], block, offset);
-            qemu_cond_signal(&comp_param[idx].cond);
-            qemu_mutex_unlock(&comp_param[idx].mutex);
-            pages = 1;
-            update_compress_thread_counts(&comp_param[idx], bytes_xmit);
-            break;
-        }
-    }
-
-    /*
-     * wait for a free thread if the user specifies 'compress-wait-thread',
-     * otherwise we will post the page out in the main thread as a normal page.
-     */
-    if (pages < 0 && wait) {
-        qemu_cond_wait(&comp_done_cond, &comp_done_lock);
-        goto retry;
+    if (!save_page_use_compression(rs)) {
+        return;
     }
-    qemu_mutex_unlock(&comp_done_lock);
 
-    return pages;
+    flush_compressed_data(send_queued_data);
 }
 
 #define PAGE_ALL_CLEAN 0
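send_queued_data() runs on the migration thread and serializes whatever a
compression worker produced, dispatching on the tri-state result that the new
ram-compress.h exposes: RES_NONE (nothing queued), RES_ZEROPAGE (emit the
zero-page marker), RES_COMPRESS (emit the compressed payload). A schematic of
that dispatch with the I/O stubbed out (enum names mirror the patch; the byte
counts are invented):

    #include <stdio.h>
    #include <stdlib.h>

    /* Result of one compression job, mirroring the tri-state the diff
     * switches on (names as used in the patch; values illustrative). */
    typedef enum {
        RES_NONE,        /* worker had nothing queued */
        RES_ZEROPAGE,    /* page was all zeroes: send a 1-byte marker */
        RES_COMPRESS,    /* page compressed into the worker's buffer */
    } CompressResult;

    static int send_queued(CompressResult result)
    {
        int len = 0;

        switch (result) {
        case RES_NONE:
            return 0;                 /* nothing to flush */
        case RES_ZEROPAGE:
            len += 8 + 1;             /* header word + zero byte (stub) */
            break;
        case RES_COMPRESS:
            len += 8 + 123;           /* header word + payload (stub) */
            break;
        default:
            abort();                  /* corrupted result: bail loudly */
        }
        return len;
    }

    int main(void)
    {
        printf("%d %d %d\n", send_queued(RES_NONE),
               send_queued(RES_ZEROPAGE), send_queued(RES_COMPRESS));
        return 0;
    }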
@@ -1593,6 +1357,7 @@ retry:
  * associated with the search process.
  *
  * Returns:
+ *         <0: An error happened
  *         PAGE_ALL_CLEAN: no dirty page found, give up
  *         PAGE_TRY_AGAIN: no dirty page found, retry for next block
  *         PAGE_DIRTY_FOUND: dirty page found
@@ -1620,6 +1385,15 @@ static int find_dirty_block(RAMState *rs, PageSearchStatus *pss)
         pss->page = 0;
         pss->block = QLIST_NEXT_RCU(pss->block, next);
         if (!pss->block) {
+            if (!migrate_multifd_flush_after_each_section()) {
+                QEMUFile *f = rs->pss[RAM_CHANNEL_PRECOPY].pss_channel;
+                int ret = multifd_send_sync_main(f);
+                if (ret < 0) {
+                    return ret;
+                }
+                qemu_put_be64(f, RAM_SAVE_FLAG_MULTIFD_FLUSH);
+                qemu_fflush(f);
+            }
             /*
              * If memory migration starts over, we will meet a dirtied page
              * which may still exist in the compression threads' ring, so we
@@ -1629,15 +1403,15 @@ static int find_dirty_block(RAMState *rs, PageSearchStatus *pss)
              * Also, if xbzrle is on, stop using the data compression at this
              * point. In theory, xbzrle can do better than compression.
              */
-            flush_compressed_data(rs);
+            ram_flush_compressed_data(rs);
 
             /* Hit the end of the list */
             pss->block = QLIST_FIRST_RCU(&ram_list.blocks);
             /* Flag that we've looped */
             pss->complete_round = true;
             /* After the first round, enable XBZRLE. */
-            if (migrate_use_xbzrle()) {
-                rs->xbzrle_enabled = true;
+            if (migrate_xbzrle()) {
+                rs->xbzrle_started = true;
             }
         }
         /* Didn't find anything this time, but try again on the new block */
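The hunk above is the sender half of the new flush protocol: with
multifd-flush-after-each-section off, the channels are synced and
RAM_SAVE_FLAG_MULTIFD_FLUSH is written once per complete pass over RAM, instead
of syncing at every section boundary. Schematically (print stubs stand in for
multifd_send_sync_main()/qemu_put_be64(); this is not QEMU API):

    #include <stdbool.h>
    #include <stdio.h>

    #define RAM_SAVE_FLAG_MULTIFD_FLUSH 0x200
    #define RAM_SAVE_FLAG_EOS           0x10

    /* Sender side: where the multifd sync point gets emitted. */
    static void end_of_round(bool flush_after_each_section)
    {
        if (!flush_after_each_section) {
            /* one channel sync + one flush mark per full pass over RAM */
            printf("multifd_send_sync_main(); put_be64(0x%x)\n",
                   RAM_SAVE_FLAG_MULTIFD_FLUSH);
        }
    }

    static void end_of_section(bool flush_after_each_section)
    {
        if (flush_after_each_section) {
            printf("multifd_send_sync_main()\n"); /* legacy: every section */
        }
        printf("put_be64(0x%x)\n", RAM_SAVE_FLAG_EOS);
    }

    int main(void)
    {
        end_of_round(false);       /* new scheme: flush once per round */
        end_of_section(false);     /* ...and EOS alone ends a section */
        return 0;
    }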
@@ -2180,7 +1954,7 @@ int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len)
     RAMBlock *ramblock;
     RAMState *rs = ram_state;
 
-    ram_counters.postcopy_requests++;
+    stat64_add(&mig_stats.postcopy_requests, 1);
     RCU_READ_LOCK_GUARD();
 
     if (!rbname) {
@@ -2280,7 +2054,7 @@ int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len)
 
 static bool save_page_use_compression(RAMState *rs)
 {
-    if (!migrate_use_compression()) {
+    if (!migrate_compress()) {
         return false;
     }
 
@@ -2289,7 +2063,7 @@ static bool save_page_use_compression(RAMState *rs)
      * using the data compression. In theory, xbzrle can do better than
      * compression.
      */
-    if (rs->xbzrle_enabled) {
+    if (rs->xbzrle_started) {
         return false;
     }
 
@@ -2319,11 +2093,11 @@ static bool save_compress_page(RAMState *rs, PageSearchStatus *pss,
      * much CPU resource.
      */
     if (block != pss->last_sent_block) {
-        flush_compressed_data(rs);
+        ram_flush_compressed_data(rs);
         return false;
     }
 
-    if (compress_page_with_multi_thread(block, offset) > 0) {
+    if (compress_page_with_multi_thread(block, offset, send_queued_data) > 0) {
         return true;
     }
 
@@ -2358,7 +2132,7 @@ static int ram_save_target_page_legacy(RAMState *rs, PageSearchStatus *pss)
         /* Must let xbzrle know, otherwise a previous (now 0'd) cached
          * page would be stale
          */
-        if (rs->xbzrle_enabled) {
+        if (rs->xbzrle_started) {
             XBZRLE_cache_lock();
             xbzrle_cache_zero_page(rs, block->offset + offset);
             XBZRLE_cache_unlock();
@@ -2372,7 +2146,7 @@ static int ram_save_target_page_legacy(RAMState *rs, PageSearchStatus *pss)
      * if host page size == guest page size the dest guest during run may
      * still see partially copied pages which is data corruption.
      */
-    if (migrate_use_multifd() && !migration_in_postcopy()) {
+    if (migrate_multifd() && !migration_in_postcopy()) {
         return ram_save_multifd_page(pss->pss_channel, block, offset);
     }
 
@@ -2612,6 +2386,9 @@ static int ram_find_and_save_block(RAMState *rs)
                     break;
                 } else if (res == PAGE_TRY_AGAIN) {
                     continue;
+                } else if (res < 0) {
+                    pages = res;
+                    break;
                 }
             }
         }
@@ -2627,19 +2404,6 @@ static int ram_find_and_save_block(RAMState *rs)
     return pages;
 }
 
-void acct_update_position(QEMUFile *f, size_t size, bool zero)
-{
-    uint64_t pages = size / TARGET_PAGE_SIZE;
-
-    if (zero) {
-        stat64_add(&ram_atomic_counters.duplicate, pages);
-    } else {
-        stat64_add(&ram_atomic_counters.normal, pages);
-        ram_transferred_add(size);
-        qemu_file_credit_transfer(f, size);
-    }
-}
-
 static uint64_t ram_bytes_total_with_ignored(void)
 {
     RAMBlock *block;
@@ -2749,7 +2513,7 @@ static void ram_state_reset(RAMState *rs)
     rs->last_seen_block = NULL;
     rs->last_page = 0;
     rs->last_version = ram_list.version;
-    rs->xbzrle_enabled = false;
+    rs->xbzrle_started = false;
 }
 
 #define MAX_WAIT 50 /* ms, half buffered_file limit */
@@ -2989,7 +2753,7 @@ static int xbzrle_init(void)
 {
     Error *local_err = NULL;
 
-    if (!migrate_use_xbzrle()) {
+    if (!migrate_xbzrle()) {
         return 0;
     }
 
@@ -3293,11 +3057,15 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
 
     migration_ops = g_malloc0(sizeof(MigrationOps));
     migration_ops->ram_save_target_page = ram_save_target_page_legacy;
-    ret =  multifd_send_sync_main(f);
+    ret = multifd_send_sync_main(f);
     if (ret < 0) {
         return ret;
     }
 
+    if (!migrate_multifd_flush_after_each_section()) {
+        qemu_put_be64(f, RAM_SAVE_FLAG_MULTIFD_FLUSH);
+    }
+
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
     qemu_fflush(f);
 
@@ -3375,7 +3143,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
              * page is sent in one chunk.
              */
             if (migrate_postcopy_ram()) {
-                flush_compressed_data(rs);
+                ram_flush_compressed_data(rs);
             }
 
             /*
@@ -3406,9 +3174,11 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
 out:
     if (ret >= 0
         && migration_is_setup_or_active(migrate_get_current()->state)) {
-        ret = multifd_send_sync_main(rs->pss[RAM_CHANNEL_PRECOPY].pss_channel);
-        if (ret < 0) {
-            return ret;
+        if (migrate_multifd_flush_after_each_section()) {
+            ret = multifd_send_sync_main(rs->pss[RAM_CHANNEL_PRECOPY].pss_channel);
+            if (ret < 0) {
+                return ret;
+            }
         }
 
         qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
@@ -3468,7 +3238,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
         }
         qemu_mutex_unlock(&rs->bitmap_mutex);
 
-        flush_compressed_data(rs);
+        ram_flush_compressed_data(rs);
         ram_control_after_iterate(f, RAM_CONTROL_FINISH);
     }
 
@@ -3481,6 +3251,9 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
         return ret;
     }
 
+    if (!migrate_multifd_flush_after_each_section()) {
+        qemu_put_be64(f, RAM_SAVE_FLAG_MULTIFD_FLUSH);
+    }
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
     qemu_fflush(f);
 
@@ -3635,6 +3408,18 @@ static ram_addr_t host_page_offset_from_ram_block_offset(RAMBlock *block,
     return ((uintptr_t)block->host + offset) & (block->page_size - 1);
 }
 
+void colo_record_bitmap(RAMBlock *block, ram_addr_t *normal, uint32_t pages)
+{
+    qemu_mutex_lock(&ram_state->bitmap_mutex);
+    for (int i = 0; i < pages; i++) {
+        ram_addr_t offset = normal[i];
+        ram_state->migration_dirty_pages += !test_and_set_bit(
+                                                offset >> TARGET_PAGE_BITS,
+                                                block->bmap);
+    }
+    qemu_mutex_unlock(&ram_state->bitmap_mutex);
+}
+
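colo_record_bitmap() counts only pages that were not already dirty, by adding
the negation of test_and_set_bit()'s old value; QEMU does this under
bitmap_mutex, which this sketch omits. A standalone rendering of the counting
trick (flat bitmap, single-threaded, helper written from scratch):

    #include <stdint.h>
    #include <stdio.h>

    #define BITS_PER_LONG (8 * sizeof(unsigned long))

    /* Minimal test_and_set_bit over a flat bitmap: returns the old bit, so
     * "+= !old" counts only pages that were not already marked dirty. */
    static int test_and_set_bit(unsigned long nr, unsigned long *map)
    {
        unsigned long mask = 1UL << (nr % BITS_PER_LONG);
        unsigned long *word = &map[nr / BITS_PER_LONG];
        int old = (*word & mask) != 0;

        *word |= mask;
        return old;
    }

    int main(void)
    {
        unsigned long bmap[4] = {0};
        uint64_t migration_dirty_pages = 0;
        unsigned long pages[] = {3, 17, 3};      /* page 3 reported twice */

        for (int i = 0; i < 3; i++) {
            migration_dirty_pages += !test_and_set_bit(pages[i], bmap);
        }
        printf("dirty pages: %llu\n",
               (unsigned long long)migration_dirty_pages);    /* prints 2 */
        return 0;
    }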
 static inline void *colo_cache_from_block_offset(RAMBlock *block,
                              ram_addr_t offset, bool record_bitmap)
 {
@@ -3652,9 +3437,8 @@ static inline void *colo_cache_from_block_offset(RAMBlock *block,
     * It helps us to decide which pages in the ram cache should be flushed
     * into VM's RAM later.
     */
-    if (record_bitmap &&
-        !test_and_set_bit(offset >> TARGET_PAGE_BITS, block->bmap)) {
-        ram_state->migration_dirty_pages++;
+    if (record_bitmap) {
+        colo_record_bitmap(block, &offset, 1);
     }
     return block->colo_cache + offset;
 }
@@ -3676,192 +3460,6 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
     }
 }
 
-/* return the size after decompression, or negative value on error */
-static int
-qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
-                     const uint8_t *source, size_t source_len)
-{
-    int err;
-
-    err = inflateReset(stream);
-    if (err != Z_OK) {
-        return -1;
-    }
-
-    stream->avail_in = source_len;
-    stream->next_in = (uint8_t *)source;
-    stream->avail_out = dest_len;
-    stream->next_out = dest;
-
-    err = inflate(stream, Z_NO_FLUSH);
-    if (err != Z_STREAM_END) {
-        return -1;
-    }
-
-    return stream->total_out;
-}
-
-static void *do_data_decompress(void *opaque)
-{
-    DecompressParam *param = opaque;
-    unsigned long pagesize;
-    uint8_t *des;
-    int len, ret;
-
-    qemu_mutex_lock(&param->mutex);
-    while (!param->quit) {
-        if (param->des) {
-            des = param->des;
-            len = param->len;
-            param->des = 0;
-            qemu_mutex_unlock(&param->mutex);
-
-            pagesize = TARGET_PAGE_SIZE;
-
-            ret = qemu_uncompress_data(&param->stream, des, pagesize,
-                                       param->compbuf, len);
-            if (ret < 0 && migrate_get_current()->decompress_error_check) {
-                error_report("decompress data failed");
-                qemu_file_set_error(decomp_file, ret);
-            }
-
-            qemu_mutex_lock(&decomp_done_lock);
-            param->done = true;
-            qemu_cond_signal(&decomp_done_cond);
-            qemu_mutex_unlock(&decomp_done_lock);
-
-            qemu_mutex_lock(&param->mutex);
-        } else {
-            qemu_cond_wait(&param->cond, &param->mutex);
-        }
-    }
-    qemu_mutex_unlock(&param->mutex);
-
-    return NULL;
-}
-
-static int wait_for_decompress_done(void)
-{
-    int idx, thread_count;
-
-    if (!migrate_use_compression()) {
-        return 0;
-    }
-
-    thread_count = migrate_decompress_threads();
-    qemu_mutex_lock(&decomp_done_lock);
-    for (idx = 0; idx < thread_count; idx++) {
-        while (!decomp_param[idx].done) {
-            qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
-        }
-    }
-    qemu_mutex_unlock(&decomp_done_lock);
-    return qemu_file_get_error(decomp_file);
-}
-
-static void compress_threads_load_cleanup(void)
-{
-    int i, thread_count;
-
-    if (!migrate_use_compression()) {
-        return;
-    }
-    thread_count = migrate_decompress_threads();
-    for (i = 0; i < thread_count; i++) {
-        /*
-         * we use it as an indicator which shows if the thread is
-         * properly init'd or not
-         */
-        if (!decomp_param[i].compbuf) {
-            break;
-        }
-
-        qemu_mutex_lock(&decomp_param[i].mutex);
-        decomp_param[i].quit = true;
-        qemu_cond_signal(&decomp_param[i].cond);
-        qemu_mutex_unlock(&decomp_param[i].mutex);
-    }
-    for (i = 0; i < thread_count; i++) {
-        if (!decomp_param[i].compbuf) {
-            break;
-        }
-
-        qemu_thread_join(decompress_threads + i);
-        qemu_mutex_destroy(&decomp_param[i].mutex);
-        qemu_cond_destroy(&decomp_param[i].cond);
-        inflateEnd(&decomp_param[i].stream);
-        g_free(decomp_param[i].compbuf);
-        decomp_param[i].compbuf = NULL;
-    }
-    g_free(decompress_threads);
-    g_free(decomp_param);
-    decompress_threads = NULL;
-    decomp_param = NULL;
-    decomp_file = NULL;
-}
-
-static int compress_threads_load_setup(QEMUFile *f)
-{
-    int i, thread_count;
-
-    if (!migrate_use_compression()) {
-        return 0;
-    }
-
-    thread_count = migrate_decompress_threads();
-    decompress_threads = g_new0(QemuThread, thread_count);
-    decomp_param = g_new0(DecompressParam, thread_count);
-    qemu_mutex_init(&decomp_done_lock);
-    qemu_cond_init(&decomp_done_cond);
-    decomp_file = f;
-    for (i = 0; i < thread_count; i++) {
-        if (inflateInit(&decomp_param[i].stream) != Z_OK) {
-            goto exit;
-        }
-
-        decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
-        qemu_mutex_init(&decomp_param[i].mutex);
-        qemu_cond_init(&decomp_param[i].cond);
-        decomp_param[i].done = true;
-        decomp_param[i].quit = false;
-        qemu_thread_create(decompress_threads + i, "decompress",
-                           do_data_decompress, decomp_param + i,
-                           QEMU_THREAD_JOINABLE);
-    }
-    return 0;
-exit:
-    compress_threads_load_cleanup();
-    return -1;
-}
-
-static void decompress_data_with_multi_threads(QEMUFile *f,
-                                               void *host, int len)
-{
-    int idx, thread_count;
-
-    thread_count = migrate_decompress_threads();
-    QEMU_LOCK_GUARD(&decomp_done_lock);
-    while (true) {
-        for (idx = 0; idx < thread_count; idx++) {
-            if (decomp_param[idx].done) {
-                decomp_param[idx].done = false;
-                qemu_mutex_lock(&decomp_param[idx].mutex);
-                qemu_get_buffer(f, decomp_param[idx].compbuf, len);
-                decomp_param[idx].des = host;
-                decomp_param[idx].len = len;
-                qemu_cond_signal(&decomp_param[idx].cond);
-                qemu_mutex_unlock(&decomp_param[idx].mutex);
-                break;
-            }
-        }
-        if (idx < thread_count) {
-            break;
-        } else {
-            qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
-        }
-    }
-}
-
 static void colo_init_ram_state(void)
 {
     ram_state_init(&ram_state);
@@ -3971,10 +3569,6 @@ void colo_release_ram_cache(void)
  */
 static int ram_load_setup(QEMUFile *f, void *opaque)
 {
-    if (compress_threads_load_setup(f)) {
-        return -1;
-    }
-
     xbzrle_load_setup();
     ramblock_recv_map_init();
 
@@ -3990,7 +3584,6 @@ static int ram_load_cleanup(void *opaque)
     }
 
     xbzrle_load_cleanup();
-    compress_threads_load_cleanup();
 
     RAMBLOCK_FOREACH_NOT_IGNORED(rb) {
         g_free(rb->receivedmap);
@@ -4162,10 +3755,14 @@ int ram_load_postcopy(QEMUFile *f, int channel)
             }
             decompress_data_with_multi_threads(f, page_buffer, len);
             break;
-
+        case RAM_SAVE_FLAG_MULTIFD_FLUSH:
+            multifd_recv_sync_main();
+            break;
         case RAM_SAVE_FLAG_EOS:
             /* normal exit */
-            multifd_recv_sync_main();
+            if (migrate_multifd_flush_after_each_section()) {
+                multifd_recv_sync_main();
+            }
             break;
         default:
             error_report("Unknown combination of migration flags: 0x%x"
@@ -4217,6 +3814,7 @@ void colo_flush_ram_cache(void)
     unsigned long offset = 0;
 
     memory_global_dirty_log_sync();
+    qemu_mutex_lock(&ram_state->bitmap_mutex);
     WITH_RCU_READ_LOCK_GUARD() {
         RAMBLOCK_FOREACH_NOT_IGNORED(block) {
             ramblock_sync_dirty_bitmap(ram_state, block);
@@ -4251,6 +3849,7 @@ void colo_flush_ram_cache(void)
             }
         }
     }
+    qemu_mutex_unlock(&ram_state->bitmap_mutex);
     trace_colo_flush_ram_cache_end();
 }
 
@@ -4270,7 +3869,7 @@ static int ram_load_precopy(QEMUFile *f)
     int flags = 0, ret = 0, invalid_flags = 0, len = 0, i = 0;
    /* ADVISE is earlier; it shows the source has the postcopy capability on */
     bool postcopy_advised = migration_incoming_postcopy_advised();
-    if (!migrate_use_compression()) {
+    if (!migrate_compress()) {
         invalid_flags |= RAM_SAVE_FLAG_COMPRESS_PAGE;
     }
 
@@ -4434,18 +4033,21 @@ static int ram_load_precopy(QEMUFile *f)
                 break;
             }
             break;
+        case RAM_SAVE_FLAG_MULTIFD_FLUSH:
+            multifd_recv_sync_main();
+            break;
         case RAM_SAVE_FLAG_EOS:
             /* normal exit */
-            multifd_recv_sync_main();
+            if (migrate_multifd_flush_after_each_section()) {
+                multifd_recv_sync_main();
+            }
+            break;
+        case RAM_SAVE_FLAG_HOOK:
+            ram_control_load_hook(f, RAM_CONTROL_HOOK, NULL);
             break;
         default:
-            if (flags & RAM_SAVE_FLAG_HOOK) {
-                ram_control_load_hook(f, RAM_CONTROL_HOOK, NULL);
-            } else {
-                error_report("Unknown combination of migration flags: 0x%x",
-                             flags);
-                ret = -EINVAL;
-            }
+            error_report("Unknown combination of migration flags: 0x%x", flags);
+            ret = -EINVAL;
         }
         if (!ret) {
             ret = qemu_file_get_error(f);
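The receive side mirrors the sender: RAM_SAVE_FLAG_MULTIFD_FLUSH becomes an
explicit sync point, RAM_SAVE_FLAG_EOS only syncs in the legacy per-section
mode, and RAM_SAVE_FLAG_HOOK is promoted from the default branch to a proper
case so any remaining unknown flag is a hard error. A schematic of the new
dispatch (print stubs in place of multifd_recv_sync_main(); not QEMU API):

    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>

    #define RAM_SAVE_FLAG_EOS           0x10
    #define RAM_SAVE_FLAG_MULTIFD_FLUSH 0x200

    /* Receiver side: sync the multifd channels either on the explicit
     * flush mark (new scheme) or on every EOS (legacy scheme). */
    static int handle_flag(uint64_t flags, bool flush_after_each_section)
    {
        switch (flags) {
        case RAM_SAVE_FLAG_MULTIFD_FLUSH:
            printf("multifd_recv_sync_main()\n");
            return 0;
        case RAM_SAVE_FLAG_EOS:
            if (flush_after_each_section) {
                printf("multifd_recv_sync_main()\n");
            }
            return 0;
        default:
            fprintf(stderr, "unknown flag 0x%llx\n",
                    (unsigned long long)flags);
            return -1;                   /* -EINVAL in the real code */
        }
    }

    int main(void)
    {
        handle_flag(RAM_SAVE_FLAG_MULTIFD_FLUSH, false);
        return handle_flag(RAM_SAVE_FLAG_EOS, false);
    }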