summary refs log tree commit diff stats
path: root/migration/ram.c
diff options
context:
space:
mode:
Diffstat (limited to 'migration/ram.c')
-rw-r--r--migration/ram.c482
1 files changed, 286 insertions, 196 deletions
diff --git a/migration/ram.c b/migration/ram.c
index 0e90efa092..912810c18e 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -269,6 +269,10 @@ struct CompressParam {
     QemuCond cond;
     RAMBlock *block;
     ram_addr_t offset;
+
+    /* internally used fields */
+    z_stream stream;
+    uint8_t *originbuf;
 };
 typedef struct CompressParam CompressParam;
 
@@ -280,6 +284,7 @@ struct DecompressParam {
     void *des;
     uint8_t *compbuf;
     int len;
+    z_stream stream;
 };
 typedef struct DecompressParam DecompressParam;
 
@@ -294,13 +299,14 @@ static QemuCond comp_done_cond;
 /* The empty QEMUFileOps will be used by file in CompressParam */
 static const QEMUFileOps empty_ops = { };
 
+static QEMUFile *decomp_file;
 static DecompressParam *decomp_param;
 static QemuThread *decompress_threads;
 static QemuMutex decomp_done_lock;
 static QemuCond decomp_done_cond;
 
-static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
-                                ram_addr_t offset);
+static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
+                                ram_addr_t offset, uint8_t *source_buf);
 
 static void *do_data_compress(void *opaque)
 {
@@ -316,7 +322,8 @@ static void *do_data_compress(void *opaque)
             param->block = NULL;
             qemu_mutex_unlock(&param->mutex);
 
-            do_compress_ram_page(param->file, block, offset);
+            do_compress_ram_page(param->file, &param->stream, block, offset,
+                                 param->originbuf);
 
             qemu_mutex_lock(&comp_done_lock);
             param->done = true;
@@ -357,10 +364,20 @@ static void compress_threads_save_cleanup(void)
     terminate_compression_threads();
     thread_count = migrate_compress_threads();
     for (i = 0; i < thread_count; i++) {
+        /*
+         * we use it as a indicator which shows if the thread is
+         * properly init'd or not
+         */
+        if (!comp_param[i].file) {
+            break;
+        }
         qemu_thread_join(compress_threads + i);
-        qemu_fclose(comp_param[i].file);
         qemu_mutex_destroy(&comp_param[i].mutex);
         qemu_cond_destroy(&comp_param[i].cond);
+        deflateEnd(&comp_param[i].stream);
+        g_free(comp_param[i].originbuf);
+        qemu_fclose(comp_param[i].file);
+        comp_param[i].file = NULL;
     }
     qemu_mutex_destroy(&comp_done_lock);
     qemu_cond_destroy(&comp_done_cond);
@@ -370,12 +387,12 @@ static void compress_threads_save_cleanup(void)
     comp_param = NULL;
 }
 
-static void compress_threads_save_setup(void)
+static int compress_threads_save_setup(void)
 {
     int i, thread_count;
 
     if (!migrate_use_compression()) {
-        return;
+        return 0;
     }
     thread_count = migrate_compress_threads();
     compress_threads = g_new0(QemuThread, thread_count);
@@ -383,6 +400,17 @@ static void compress_threads_save_setup(void)
     qemu_cond_init(&comp_done_cond);
     qemu_mutex_init(&comp_done_lock);
     for (i = 0; i < thread_count; i++) {
+        comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE);
+        if (!comp_param[i].originbuf) {
+            goto exit;
+        }
+
+        if (deflateInit(&comp_param[i].stream,
+                        migrate_compress_level()) != Z_OK) {
+            g_free(comp_param[i].originbuf);
+            goto exit;
+        }
+
         /* comp_param[i].file is just used as a dummy buffer to save data,
          * set its ops to empty.
          */
@@ -395,6 +423,11 @@ static void compress_threads_save_setup(void)
                            do_data_compress, comp_param + i,
                            QEMU_THREAD_JOINABLE);
     }
+    return 0;
+
+exit:
+    compress_threads_save_cleanup();
+    return -1;
 }
 
 /* Multiple fd's */
@@ -941,6 +974,72 @@ static void ram_release_pages(const char *rbname, uint64_t offset, int pages)
     ram_discard_range(rbname, offset, pages << TARGET_PAGE_BITS);
 }
 
+/*
+ * @pages: the number of pages written by the control path,
+ *        < 0 - error
+ *        > 0 - number of pages written
+ *
+ * Return true if the pages has been saved, otherwise false is returned.
+ */
+static bool control_save_page(RAMState *rs, RAMBlock *block, ram_addr_t offset,
+                              int *pages)
+{
+    uint64_t bytes_xmit = 0;
+    int ret;
+
+    *pages = -1;
+    ret = ram_control_save_page(rs->f, block->offset, offset, TARGET_PAGE_SIZE,
+                                &bytes_xmit);
+    if (ret == RAM_SAVE_CONTROL_NOT_SUPP) {
+        return false;
+    }
+
+    if (bytes_xmit) {
+        ram_counters.transferred += bytes_xmit;
+        *pages = 1;
+    }
+
+    if (ret == RAM_SAVE_CONTROL_DELAYED) {
+        return true;
+    }
+
+    if (bytes_xmit > 0) {
+        ram_counters.normal++;
+    } else if (bytes_xmit == 0) {
+        ram_counters.duplicate++;
+    }
+
+    return true;
+}
+
+/*
+ * directly send the page to the stream
+ *
+ * Returns the number of pages written.
+ *
+ * @rs: current RAM state
+ * @block: block that contains the page we want to send
+ * @offset: offset inside the block for the page
+ * @buf: the page to be sent
+ * @async: send to page asyncly
+ */
+static int save_normal_page(RAMState *rs, RAMBlock *block, ram_addr_t offset,
+                            uint8_t *buf, bool async)
+{
+    ram_counters.transferred += save_page_header(rs, rs->f, block,
+                                                 offset | RAM_SAVE_FLAG_PAGE);
+    if (async) {
+        qemu_put_buffer_async(rs->f, buf, TARGET_PAGE_SIZE,
+                              migrate_release_ram() &
+                              migration_in_postcopy());
+    } else {
+        qemu_put_buffer(rs->f, buf, TARGET_PAGE_SIZE);
+    }
+    ram_counters.transferred += TARGET_PAGE_SIZE;
+    ram_counters.normal++;
+    return 1;
+}
+
 /**
  * ram_save_page: send the given page to the stream
  *
@@ -957,73 +1056,31 @@ static void ram_release_pages(const char *rbname, uint64_t offset, int pages)
 static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
 {
     int pages = -1;
-    uint64_t bytes_xmit;
-    ram_addr_t current_addr;
     uint8_t *p;
-    int ret;
     bool send_async = true;
     RAMBlock *block = pss->block;
     ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
+    ram_addr_t current_addr = block->offset + offset;
 
     p = block->host + offset;
     trace_ram_save_page(block->idstr, (uint64_t)offset, p);
 
-    /* In doubt sent page as normal */
-    bytes_xmit = 0;
-    ret = ram_control_save_page(rs->f, block->offset,
-                           offset, TARGET_PAGE_SIZE, &bytes_xmit);
-    if (bytes_xmit) {
-        ram_counters.transferred += bytes_xmit;
-        pages = 1;
-    }
-
     XBZRLE_cache_lock();
-
-    current_addr = block->offset + offset;
-
-    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
-        if (ret != RAM_SAVE_CONTROL_DELAYED) {
-            if (bytes_xmit > 0) {
-                ram_counters.normal++;
-            } else if (bytes_xmit == 0) {
-                ram_counters.duplicate++;
-            }
-        }
-    } else {
-        pages = save_zero_page(rs, block, offset);
-        if (pages > 0) {
-            /* Must let xbzrle know, otherwise a previous (now 0'd) cached
-             * page would be stale
+    if (!rs->ram_bulk_stage && !migration_in_postcopy() &&
+        migrate_use_xbzrle()) {
+        pages = save_xbzrle_page(rs, &p, current_addr, block,
+                                 offset, last_stage);
+        if (!last_stage) {
+            /* Can't send this cached data async, since the cache page
+             * might get updated before it gets to the wire
              */
-            xbzrle_cache_zero_page(rs, current_addr);
-            ram_release_pages(block->idstr, offset, pages);
-        } else if (!rs->ram_bulk_stage &&
-                   !migration_in_postcopy() && migrate_use_xbzrle()) {
-            pages = save_xbzrle_page(rs, &p, current_addr, block,
-                                     offset, last_stage);
-            if (!last_stage) {
-                /* Can't send this cached data async, since the cache page
-                 * might get updated before it gets to the wire
-                 */
-                send_async = false;
-            }
+            send_async = false;
         }
     }
 
     /* XBZRLE overflow or normal page */
     if (pages == -1) {
-        ram_counters.transferred +=
-            save_page_header(rs, rs->f, block, offset | RAM_SAVE_FLAG_PAGE);
-        if (send_async) {
-            qemu_put_buffer_async(rs->f, p, TARGET_PAGE_SIZE,
-                                  migrate_release_ram() &
-                                  migration_in_postcopy());
-        } else {
-            qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE);
-        }
-        ram_counters.transferred += TARGET_PAGE_SIZE;
-        pages = 1;
-        ram_counters.normal++;
+        pages = save_normal_page(rs, block, offset, p, send_async);
     }
 
     XBZRLE_cache_unlock();
@@ -1031,8 +1088,8 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
     return pages;
 }
 
-static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
-                                ram_addr_t offset)
+static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
+                                ram_addr_t offset, uint8_t *source_buf)
 {
     RAMState *rs = ram_state;
     int bytes_sent, blen;
@@ -1040,8 +1097,14 @@ static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
 
     bytes_sent = save_page_header(rs, f, block, offset |
                                   RAM_SAVE_FLAG_COMPRESS_PAGE);
-    blen = qemu_put_compression_data(f, p, TARGET_PAGE_SIZE,
-                                     migrate_compress_level());
+
+    /*
+     * copy it to a internal buffer to avoid it being modified by VM
+     * so that we can catch up the error during compression and
+     * decompression
+     */
+    memcpy(source_buf, p, TARGET_PAGE_SIZE);
+    blen = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE);
     if (blen < 0) {
         bytes_sent = 0;
         qemu_file_set_error(migrate_get_current()->to_dst_file, blen);
@@ -1122,83 +1185,6 @@ static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block,
 }
 
 /**
- * ram_save_compressed_page: compress the given page and send it to the stream
- *
- * Returns the number of pages written.
- *
- * @rs: current RAM state
- * @block: block that contains the page we want to send
- * @offset: offset inside the block for the page
- * @last_stage: if we are at the completion stage
- */
-static int ram_save_compressed_page(RAMState *rs, PageSearchStatus *pss,
-                                    bool last_stage)
-{
-    int pages = -1;
-    uint64_t bytes_xmit = 0;
-    uint8_t *p;
-    int ret, blen;
-    RAMBlock *block = pss->block;
-    ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
-
-    p = block->host + offset;
-
-    ret = ram_control_save_page(rs->f, block->offset,
-                                offset, TARGET_PAGE_SIZE, &bytes_xmit);
-    if (bytes_xmit) {
-        ram_counters.transferred += bytes_xmit;
-        pages = 1;
-    }
-    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
-        if (ret != RAM_SAVE_CONTROL_DELAYED) {
-            if (bytes_xmit > 0) {
-                ram_counters.normal++;
-            } else if (bytes_xmit == 0) {
-                ram_counters.duplicate++;
-            }
-        }
-    } else {
-        /* When starting the process of a new block, the first page of
-         * the block should be sent out before other pages in the same
-         * block, and all the pages in last block should have been sent
-         * out, keeping this order is important, because the 'cont' flag
-         * is used to avoid resending the block name.
-         */
-        if (block != rs->last_sent_block) {
-            flush_compressed_data(rs);
-            pages = save_zero_page(rs, block, offset);
-            if (pages == -1) {
-                /* Make sure the first page is sent out before other pages */
-                bytes_xmit = save_page_header(rs, rs->f, block, offset |
-                                              RAM_SAVE_FLAG_COMPRESS_PAGE);
-                blen = qemu_put_compression_data(rs->f, p, TARGET_PAGE_SIZE,
-                                                 migrate_compress_level());
-                if (blen > 0) {
-                    ram_counters.transferred += bytes_xmit + blen;
-                    ram_counters.normal++;
-                    pages = 1;
-                } else {
-                    qemu_file_set_error(rs->f, blen);
-                    error_report("compressed data failed!");
-                }
-            }
-            if (pages > 0) {
-                ram_release_pages(block->idstr, offset, pages);
-            }
-        } else {
-            pages = save_zero_page(rs, block, offset);
-            if (pages == -1) {
-                pages = compress_page_with_multi_thread(rs, block, offset);
-            } else {
-                ram_release_pages(block->idstr, offset, pages);
-            }
-        }
-    }
-
-    return pages;
-}
-
-/**
  * find_dirty_block: find the next dirty page and update any state
  * associated with the search process.
  *
@@ -1434,44 +1420,80 @@ err:
     return -1;
 }
 
+static bool save_page_use_compression(RAMState *rs)
+{
+    if (!migrate_use_compression()) {
+        return false;
+    }
+
+    /*
+     * If xbzrle is on, stop using the data compression after first
+     * round of migration even if compression is enabled. In theory,
+     * xbzrle can do better than compression.
+     */
+    if (rs->ram_bulk_stage || !migrate_use_xbzrle()) {
+        return true;
+    }
+
+    return false;
+}
+
 /**
  * ram_save_target_page: save one target page
  *
  * Returns the number of pages written
  *
  * @rs: current RAM state
- * @ms: current migration state
  * @pss: data about the page we want to send
  * @last_stage: if we are at the completion stage
  */
 static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
                                 bool last_stage)
 {
-    int res = 0;
+    RAMBlock *block = pss->block;
+    ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
+    int res;
 
-    /* Check the pages is dirty and if it is send it */
-    if (migration_bitmap_clear_dirty(rs, pss->block, pss->page)) {
-        /*
-         * If xbzrle is on, stop using the data compression after first
-         * round of migration even if compression is enabled. In theory,
-         * xbzrle can do better than compression.
+    if (control_save_page(rs, block, offset, &res)) {
+        return res;
+    }
+
+    /*
+     * When starting the process of a new block, the first page of
+     * the block should be sent out before other pages in the same
+     * block, and all the pages in last block should have been sent
+     * out, keeping this order is important, because the 'cont' flag
+     * is used to avoid resending the block name.
+     */
+    if (block != rs->last_sent_block && save_page_use_compression(rs)) {
+            flush_compressed_data(rs);
+    }
+
+    res = save_zero_page(rs, block, offset);
+    if (res > 0) {
+        /* Must let xbzrle know, otherwise a previous (now 0'd) cached
+         * page would be stale
          */
-        if (migrate_use_compression() &&
-            (rs->ram_bulk_stage || !migrate_use_xbzrle())) {
-            res = ram_save_compressed_page(rs, pss, last_stage);
-        } else {
-            res = ram_save_page(rs, pss, last_stage);
+        if (!save_page_use_compression(rs)) {
+            XBZRLE_cache_lock();
+            xbzrle_cache_zero_page(rs, block->offset + offset);
+            XBZRLE_cache_unlock();
         }
+        ram_release_pages(block->idstr, offset, res);
+        return res;
+    }
 
-        if (res < 0) {
-            return res;
-        }
-        if (pss->block->unsentmap) {
-            clear_bit(pss->page, pss->block->unsentmap);
-        }
+    /*
+     * Make sure the first page is sent out before other pages.
+     *
+     * we post it as normal page as compression will take much
+     * CPU resource.
+     */
+    if (block == rs->last_sent_block && save_page_use_compression(rs)) {
+        res = compress_page_with_multi_thread(rs, block, offset);
     }
 
-    return res;
+    return ram_save_page(rs, pss, last_stage);
 }
 
 /**
@@ -1500,12 +1522,22 @@ static int ram_save_host_page(RAMState *rs, PageSearchStatus *pss,
         qemu_ram_pagesize(pss->block) >> TARGET_PAGE_BITS;
 
     do {
+        /* Check the pages is dirty and if it is send it */
+        if (!migration_bitmap_clear_dirty(rs, pss->block, pss->page)) {
+            pss->page++;
+            continue;
+        }
+
         tmppages = ram_save_target_page(rs, pss, last_stage);
         if (tmppages < 0) {
             return tmppages;
         }
 
         pages += tmppages;
+        if (pss->block->unsentmap) {
+            clear_bit(pss->page, pss->block->unsentmap);
+        }
+
         pss->page++;
     } while ((pss->page & (pagesize_bits - 1)) &&
              offset_in_ramblock(pss->block, pss->page << TARGET_PAGE_BITS));
@@ -2214,9 +2246,14 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
     RAMState **rsp = opaque;
     RAMBlock *block;
 
+    if (compress_threads_save_setup()) {
+        return -1;
+    }
+
     /* migration has already setup the bitmap, reuse it. */
     if (!migration_in_colo_state()) {
         if (ram_init_all(rsp) != 0) {
+            compress_threads_save_cleanup();
             return -1;
         }
     }
@@ -2236,7 +2273,6 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
     }
 
     rcu_read_unlock();
-    compress_threads_save_setup();
 
     ram_control_before_iterate(f, RAM_CONTROL_SETUP);
     ram_control_after_iterate(f, RAM_CONTROL_SETUP);
@@ -2501,12 +2537,37 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
     }
 }
 
+/* return the size after decompression, or negative value on error */
+static int
+qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
+                     const uint8_t *source, size_t source_len)
+{
+    int err;
+
+    err = inflateReset(stream);
+    if (err != Z_OK) {
+        return -1;
+    }
+
+    stream->avail_in = source_len;
+    stream->next_in = (uint8_t *)source;
+    stream->avail_out = dest_len;
+    stream->next_out = dest;
+
+    err = inflate(stream, Z_NO_FLUSH);
+    if (err != Z_STREAM_END) {
+        return -1;
+    }
+
+    return stream->total_out;
+}
+
 static void *do_data_decompress(void *opaque)
 {
     DecompressParam *param = opaque;
     unsigned long pagesize;
     uint8_t *des;
-    int len;
+    int len, ret;
 
     qemu_mutex_lock(&param->mutex);
     while (!param->quit) {
@@ -2517,13 +2578,13 @@ static void *do_data_decompress(void *opaque)
             qemu_mutex_unlock(&param->mutex);
 
             pagesize = TARGET_PAGE_SIZE;
-            /* uncompress() will return failed in some case, especially
-             * when the page is dirted when doing the compression, it's
-             * not a problem because the dirty page will be retransferred
-             * and uncompress() won't break the data in other pages.
-             */
-            uncompress((Bytef *)des, &pagesize,
-                       (const Bytef *)param->compbuf, len);
+
+            ret = qemu_uncompress_data(&param->stream, des, pagesize,
+                                       param->compbuf, len);
+            if (ret < 0) {
+                error_report("decompress data failed");
+                qemu_file_set_error(decomp_file, ret);
+            }
 
             qemu_mutex_lock(&decomp_done_lock);
             param->done = true;
@@ -2540,12 +2601,12 @@ static void *do_data_decompress(void *opaque)
     return NULL;
 }
 
-static void wait_for_decompress_done(void)
+static int wait_for_decompress_done(void)
 {
     int idx, thread_count;
 
     if (!migrate_use_compression()) {
-        return;
+        return 0;
     }
 
     thread_count = migrate_decompress_threads();
@@ -2556,30 +2617,7 @@ static void wait_for_decompress_done(void)
         }
     }
     qemu_mutex_unlock(&decomp_done_lock);
-}
-
-static void compress_threads_load_setup(void)
-{
-    int i, thread_count;
-
-    if (!migrate_use_compression()) {
-        return;
-    }
-    thread_count = migrate_decompress_threads();
-    decompress_threads = g_new0(QemuThread, thread_count);
-    decomp_param = g_new0(DecompressParam, thread_count);
-    qemu_mutex_init(&decomp_done_lock);
-    qemu_cond_init(&decomp_done_cond);
-    for (i = 0; i < thread_count; i++) {
-        qemu_mutex_init(&decomp_param[i].mutex);
-        qemu_cond_init(&decomp_param[i].cond);
-        decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
-        decomp_param[i].done = true;
-        decomp_param[i].quit = false;
-        qemu_thread_create(decompress_threads + i, "decompress",
-                           do_data_decompress, decomp_param + i,
-                           QEMU_THREAD_JOINABLE);
-    }
+    return qemu_file_get_error(decomp_file);
 }
 
 static void compress_threads_load_cleanup(void)
@@ -2591,21 +2629,70 @@ static void compress_threads_load_cleanup(void)
     }
     thread_count = migrate_decompress_threads();
     for (i = 0; i < thread_count; i++) {
+        /*
+         * we use it as a indicator which shows if the thread is
+         * properly init'd or not
+         */
+        if (!decomp_param[i].compbuf) {
+            break;
+        }
+
         qemu_mutex_lock(&decomp_param[i].mutex);
         decomp_param[i].quit = true;
         qemu_cond_signal(&decomp_param[i].cond);
         qemu_mutex_unlock(&decomp_param[i].mutex);
     }
     for (i = 0; i < thread_count; i++) {
+        if (!decomp_param[i].compbuf) {
+            break;
+        }
+
         qemu_thread_join(decompress_threads + i);
         qemu_mutex_destroy(&decomp_param[i].mutex);
         qemu_cond_destroy(&decomp_param[i].cond);
+        inflateEnd(&decomp_param[i].stream);
         g_free(decomp_param[i].compbuf);
+        decomp_param[i].compbuf = NULL;
     }
     g_free(decompress_threads);
     g_free(decomp_param);
     decompress_threads = NULL;
     decomp_param = NULL;
+    decomp_file = NULL;
+}
+
+static int compress_threads_load_setup(QEMUFile *f)
+{
+    int i, thread_count;
+
+    if (!migrate_use_compression()) {
+        return 0;
+    }
+
+    thread_count = migrate_decompress_threads();
+    decompress_threads = g_new0(QemuThread, thread_count);
+    decomp_param = g_new0(DecompressParam, thread_count);
+    qemu_mutex_init(&decomp_done_lock);
+    qemu_cond_init(&decomp_done_cond);
+    decomp_file = f;
+    for (i = 0; i < thread_count; i++) {
+        if (inflateInit(&decomp_param[i].stream) != Z_OK) {
+            goto exit;
+        }
+
+        decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
+        qemu_mutex_init(&decomp_param[i].mutex);
+        qemu_cond_init(&decomp_param[i].cond);
+        decomp_param[i].done = true;
+        decomp_param[i].quit = false;
+        qemu_thread_create(decompress_threads + i, "decompress",
+                           do_data_decompress, decomp_param + i,
+                           QEMU_THREAD_JOINABLE);
+    }
+    return 0;
+exit:
+    compress_threads_load_cleanup();
+    return -1;
 }
 
 static void decompress_data_with_multi_threads(QEMUFile *f,
@@ -2647,8 +2734,11 @@ static void decompress_data_with_multi_threads(QEMUFile *f,
  */
 static int ram_load_setup(QEMUFile *f, void *opaque)
 {
+    if (compress_threads_load_setup(f)) {
+        return -1;
+    }
+
     xbzrle_load_setup();
-    compress_threads_load_setup();
     ramblock_recv_map_init();
     return 0;
 }
@@ -2999,7 +3089,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
         }
     }
 
-    wait_for_decompress_done();
+    ret |= wait_for_decompress_done();
     rcu_read_unlock();
     trace_ram_load_complete(ret, seq_iter);
     return ret;