summary refs log tree commit diff stats
path: root/migration/ram.c
diff options
context:
space:
mode:
Diffstat (limited to 'migration/ram.c')
-rw-r--r--migration/ram.c202
1 files changed, 139 insertions, 63 deletions
diff --git a/migration/ram.c b/migration/ram.c
index fa79d0a5b9..79c89425a3 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -341,6 +341,7 @@ typedef struct PageSearchStatus PageSearchStatus;
 struct CompressParam {
     bool done;
     bool quit;
+    bool zero_page;
     QEMUFile *file;
     QemuMutex mutex;
     QemuCond cond;
@@ -382,14 +383,15 @@ static QemuThread *decompress_threads;
 static QemuMutex decomp_done_lock;
 static QemuCond decomp_done_cond;
 
-static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
-                                ram_addr_t offset, uint8_t *source_buf);
+static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
+                                 ram_addr_t offset, uint8_t *source_buf);
 
 static void *do_data_compress(void *opaque)
 {
     CompressParam *param = opaque;
     RAMBlock *block;
     ram_addr_t offset;
+    bool zero_page;
 
     qemu_mutex_lock(&param->mutex);
     while (!param->quit) {
@@ -399,11 +401,12 @@ static void *do_data_compress(void *opaque)
             param->block = NULL;
             qemu_mutex_unlock(&param->mutex);
 
-            do_compress_ram_page(param->file, &param->stream, block, offset,
-                                 param->originbuf);
+            zero_page = do_compress_ram_page(param->file, &param->stream,
+                                             block, offset, param->originbuf);
 
             qemu_mutex_lock(&comp_done_lock);
             param->done = true;
+            param->zero_page = zero_page;
             qemu_cond_signal(&comp_done_cond);
             qemu_mutex_unlock(&comp_done_lock);
 
@@ -989,6 +992,7 @@ static void *multifd_send_thread(void *opaque)
     int ret;
 
     trace_multifd_send_thread_start(p->id);
+    rcu_register_thread();
 
     if (multifd_send_initial_packet(p, &local_err) < 0) {
         goto out;
@@ -1051,6 +1055,7 @@ out:
     p->running = false;
     qemu_mutex_unlock(&p->mutex);
 
+    rcu_unregister_thread();
     trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages);
 
     return NULL;
@@ -1220,6 +1225,7 @@ static void *multifd_recv_thread(void *opaque)
     int ret;
 
     trace_multifd_recv_thread_start(p->id);
+    rcu_register_thread();
 
     while (true) {
         uint32_t used;
@@ -1266,6 +1272,7 @@ static void *multifd_recv_thread(void *opaque)
     p->running = false;
     qemu_mutex_unlock(&p->mutex);
 
+    rcu_unregister_thread();
     trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages);
 
     return NULL;
@@ -1391,13 +1398,15 @@ static void mig_throttle_guest_down(void)
     MigrationState *s = migrate_get_current();
     uint64_t pct_initial = s->parameters.cpu_throttle_initial;
     uint64_t pct_icrement = s->parameters.cpu_throttle_increment;
+    int pct_max = s->parameters.max_cpu_throttle;
 
     /* We have not started throttling yet. Let's start it. */
     if (!cpu_throttle_active()) {
         cpu_throttle_set(pct_initial);
     } else {
         /* Throttling already on, just increase the rate */
-        cpu_throttle_set(cpu_throttle_get_percentage() + pct_icrement);
+        cpu_throttle_set(MIN(cpu_throttle_get_percentage() + pct_icrement,
+                         pct_max));
     }
 }
 
@@ -1666,6 +1675,31 @@ static void migration_bitmap_sync(RAMState *rs)
 }
 
 /**
+ * save_zero_page_to_file: send the zero page to the file
+ *
+ * Returns the size of data written to the file, 0 means the page is not
+ * a zero page
+ *
+ * @rs: current RAM state
+ * @file: the file where the data is saved
+ * @block: block that contains the page we want to send
+ * @offset: offset inside the block for the page
+ */
+static int save_zero_page_to_file(RAMState *rs, QEMUFile *file,
+                                  RAMBlock *block, ram_addr_t offset)
+{
+    uint8_t *p = block->host + offset;
+    int len = 0;
+
+    if (is_zero_range(p, TARGET_PAGE_SIZE)) {
+        len += save_page_header(rs, file, block, offset | RAM_SAVE_FLAG_ZERO);
+        qemu_put_byte(file, 0);
+        len += 1;
+    }
+    return len;
+}
+
+/**
  * save_zero_page: send the zero page to the stream
  *
  * Returns the number of pages written.
@@ -1676,19 +1710,14 @@ static void migration_bitmap_sync(RAMState *rs)
  */
 static int save_zero_page(RAMState *rs, RAMBlock *block, ram_addr_t offset)
 {
-    uint8_t *p = block->host + offset;
-    int pages = -1;
+    int len = save_zero_page_to_file(rs, rs->f, block, offset);
 
-    if (is_zero_range(p, TARGET_PAGE_SIZE)) {
+    if (len) {
         ram_counters.duplicate++;
-        ram_counters.transferred +=
-            save_page_header(rs, rs->f, block, offset | RAM_SAVE_FLAG_ZERO);
-        qemu_put_byte(rs->f, 0);
-        ram_counters.transferred += 1;
-        pages = 1;
+        ram_counters.transferred += len;
+        return 1;
     }
-
-    return pages;
+    return -1;
 }
 
 static void ram_release_pages(const char *rbname, uint64_t offset, int pages)
@@ -1823,15 +1852,20 @@ static int ram_save_multifd_page(RAMState *rs, RAMBlock *block,
     return 1;
 }
 
-static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
-                                ram_addr_t offset, uint8_t *source_buf)
+static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
+                                 ram_addr_t offset, uint8_t *source_buf)
 {
     RAMState *rs = ram_state;
-    int bytes_sent, blen;
     uint8_t *p = block->host + (offset & TARGET_PAGE_MASK);
+    bool zero_page = false;
+    int ret;
+
+    if (save_zero_page_to_file(rs, f, block, offset)) {
+        zero_page = true;
+        goto exit;
+    }
 
-    bytes_sent = save_page_header(rs, f, block, offset |
-                                  RAM_SAVE_FLAG_COMPRESS_PAGE);
+    save_page_header(rs, f, block, offset | RAM_SAVE_FLAG_COMPRESS_PAGE);
 
     /*
      * copy it to a internal buffer to avoid it being modified by VM
@@ -1839,17 +1873,25 @@ static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
      * decompression
      */
     memcpy(source_buf, p, TARGET_PAGE_SIZE);
-    blen = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE);
-    if (blen < 0) {
-        bytes_sent = 0;
-        qemu_file_set_error(migrate_get_current()->to_dst_file, blen);
+    ret = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE);
+    if (ret < 0) {
+        qemu_file_set_error(migrate_get_current()->to_dst_file, ret);
         error_report("compressed data failed!");
-    } else {
-        bytes_sent += blen;
-        ram_release_pages(block->idstr, offset & TARGET_PAGE_MASK, 1);
+        return false;
     }
 
-    return bytes_sent;
+exit:
+    ram_release_pages(block->idstr, offset & TARGET_PAGE_MASK, 1);
+    return zero_page;
+}
+
+static void
+update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
+{
+    if (param->zero_page) {
+        ram_counters.duplicate++;
+    }
+    ram_counters.transferred += bytes_xmit;
 }
 
 static void flush_compressed_data(RAMState *rs)
@@ -1873,7 +1915,12 @@ static void flush_compressed_data(RAMState *rs)
         qemu_mutex_lock(&comp_param[idx].mutex);
         if (!comp_param[idx].quit) {
             len = qemu_put_qemu_file(rs->f, comp_param[idx].file);
-            ram_counters.transferred += len;
+            /*
+             * it's safe to fetch zero_page without holding comp_done_lock
+             * as there is no further request submitted to the thread,
+             * i.e, the thread should be waiting for a request at this point.
+             */
+            update_compress_thread_counts(&comp_param[idx], len);
         }
         qemu_mutex_unlock(&comp_param[idx].mutex);
     }
@@ -1890,30 +1937,33 @@ static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block,
                                            ram_addr_t offset)
 {
     int idx, thread_count, bytes_xmit = -1, pages = -1;
+    bool wait = migrate_compress_wait_thread();
 
     thread_count = migrate_compress_threads();
     qemu_mutex_lock(&comp_done_lock);
-    while (true) {
-        for (idx = 0; idx < thread_count; idx++) {
-            if (comp_param[idx].done) {
-                comp_param[idx].done = false;
-                bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file);
-                qemu_mutex_lock(&comp_param[idx].mutex);
-                set_compress_params(&comp_param[idx], block, offset);
-                qemu_cond_signal(&comp_param[idx].cond);
-                qemu_mutex_unlock(&comp_param[idx].mutex);
-                pages = 1;
-                ram_counters.normal++;
-                ram_counters.transferred += bytes_xmit;
-                break;
-            }
-        }
-        if (pages > 0) {
+retry:
+    for (idx = 0; idx < thread_count; idx++) {
+        if (comp_param[idx].done) {
+            comp_param[idx].done = false;
+            bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file);
+            qemu_mutex_lock(&comp_param[idx].mutex);
+            set_compress_params(&comp_param[idx], block, offset);
+            qemu_cond_signal(&comp_param[idx].cond);
+            qemu_mutex_unlock(&comp_param[idx].mutex);
+            pages = 1;
+            update_compress_thread_counts(&comp_param[idx], bytes_xmit);
             break;
-        } else {
-            qemu_cond_wait(&comp_done_cond, &comp_done_lock);
         }
     }
+
+    /*
+     * wait for the free thread if the user specifies 'compress-wait-thread',
+     * otherwise we will post the page out in the main thread as normal page.
+     */
+    if (pages < 0 && wait) {
+        qemu_cond_wait(&comp_done_cond, &comp_done_lock);
+        goto retry;
+    }
     qemu_mutex_unlock(&comp_done_lock);
 
     return pages;
@@ -1983,6 +2033,10 @@ static RAMBlock *unqueue_page(RAMState *rs, ram_addr_t *offset)
 {
     RAMBlock *block = NULL;
 
+    if (QSIMPLEQ_EMPTY_ATOMIC(&rs->src_page_requests)) {
+        return NULL;
+    }
+
     qemu_mutex_lock(&rs->src_page_req_mutex);
     if (!QSIMPLEQ_EMPTY(&rs->src_page_requests)) {
         struct RAMSrcPageRequest *entry =
@@ -2175,6 +2229,39 @@ static bool save_page_use_compression(RAMState *rs)
     return false;
 }
 
+/*
+ * try to compress the page before posting it out, return true if the page
+ * has been properly handled by compression, otherwise needs other
+ * paths to handle it
+ */
+static bool save_compress_page(RAMState *rs, RAMBlock *block, ram_addr_t offset)
+{
+    if (!save_page_use_compression(rs)) {
+        return false;
+    }
+
+    /*
+     * When starting the process of a new block, the first page of
+     * the block should be sent out before other pages in the same
+     * block, and all the pages in last block should have been sent
+     * out, keeping this order is important, because the 'cont' flag
+     * is used to avoid resending the block name.
+     *
+     * We post the fist page as normal page as compression will take
+     * much CPU resource.
+     */
+    if (block != rs->last_sent_block) {
+        flush_compressed_data(rs);
+        return false;
+    }
+
+    if (compress_page_with_multi_thread(rs, block, offset) > 0) {
+        return true;
+    }
+
+    return false;
+}
+
 /**
  * ram_save_target_page: save one target page
  *
@@ -2195,15 +2282,8 @@ static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
         return res;
     }
 
-    /*
-     * When starting the process of a new block, the first page of
-     * the block should be sent out before other pages in the same
-     * block, and all the pages in last block should have been sent
-     * out, keeping this order is important, because the 'cont' flag
-     * is used to avoid resending the block name.
-     */
-    if (block != rs->last_sent_block && save_page_use_compression(rs)) {
-            flush_compressed_data(rs);
+    if (save_compress_page(rs, block, offset)) {
+        return 1;
     }
 
     res = save_zero_page(rs, block, offset);
@@ -2221,14 +2301,10 @@ static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
     }
 
     /*
-     * Make sure the first page is sent out before other pages.
-     *
-     * we post it as normal page as compression will take much
-     * CPU resource.
+     * do not use multifd for compression as the first page in the new
+     * block should be posted out before sending the compressed page
      */
-    if (block == rs->last_sent_block && save_page_use_compression(rs)) {
-        return compress_page_with_multi_thread(rs, block, offset);
-    } else if (migrate_use_multifd()) {
+    if (!save_page_use_compression(rs) && migrate_use_multifd()) {
         return ram_save_multifd_page(rs, block, offset);
     }