summary refs log tree commit diff stats
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--migration/multifd.c39
-rw-r--r--migration/multifd.h13
2 files changed, 36 insertions, 16 deletions
diff --git a/migration/multifd.c b/migration/multifd.c
index 8bb1fd95cf..ea25bbe6bd 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -442,8 +442,8 @@ static int multifd_send_pages(void)
         }
         p = &multifd_send_state->params[i];
         qemu_mutex_lock(&p->mutex);
-        if (!p->pending_job) {
-            p->pending_job++;
+        if (qatomic_read(&p->pending_job) == false) {
+            qatomic_set(&p->pending_job, true);
             next_channel = (i + 1) % migrate_multifd_channels();
             break;
         }
@@ -631,8 +631,12 @@ int multifd_send_sync_main(void)
 
         qemu_mutex_lock(&p->mutex);
         p->packet_num = multifd_send_state->packet_num++;
-        p->flags |= MULTIFD_FLAG_SYNC;
-        p->pending_job++;
+        /*
+         * We should be the only user so far, so not possible to be set by
+         * others concurrently.
+         */
+        assert(qatomic_read(&p->pending_sync) == false);
+        qatomic_set(&p->pending_sync, true);
         qemu_mutex_unlock(&p->mutex);
         qemu_sem_post(&p->sem);
     }
@@ -685,10 +689,9 @@ static void *multifd_send_thread(void *opaque)
         }
         qemu_mutex_lock(&p->mutex);
 
-        if (p->pending_job) {
+        if (qatomic_read(&p->pending_job)) {
             uint64_t packet_num = p->packet_num;
             MultiFDPages_t *pages = p->pages;
-            uint32_t flags;
 
             if (use_zero_copy_send) {
                 p->iovs_num = 0;
@@ -704,13 +707,11 @@ static void *multifd_send_thread(void *opaque)
                 }
             }
             multifd_send_fill_packet(p);
-            flags = p->flags;
-            p->flags = 0;
             p->num_packets++;
             p->total_normal_pages += pages->num;
             qemu_mutex_unlock(&p->mutex);
 
-            trace_multifd_send(p->id, packet_num, pages->num, flags,
+            trace_multifd_send(p->id, packet_num, pages->num, p->flags,
                                p->next_packet_size);
 
             if (use_zero_copy_send) {
@@ -738,12 +739,23 @@ static void *multifd_send_thread(void *opaque)
             multifd_pages_reset(p->pages);
             p->next_packet_size = 0;
             qemu_mutex_lock(&p->mutex);
-            p->pending_job--;
+            qatomic_set(&p->pending_job, false);
             qemu_mutex_unlock(&p->mutex);
-
-            if (flags & MULTIFD_FLAG_SYNC) {
-                qemu_sem_post(&p->sem_sync);
+        } else if (qatomic_read(&p->pending_sync)) {
+            p->flags = MULTIFD_FLAG_SYNC;
+            multifd_send_fill_packet(p);
+            ret = qio_channel_write_all(p->c, (void *)p->packet,
+                                        p->packet_len, &local_err);
+            if (ret != 0) {
+                qemu_mutex_unlock(&p->mutex);
+                break;
             }
+            /* p->next_packet_size will always be zero for a SYNC packet */
+            stat64_add(&mig_stats.multifd_bytes, p->packet_len);
+            p->flags = 0;
+            qatomic_set(&p->pending_sync, false);
+            qemu_mutex_unlock(&p->mutex);
+            qemu_sem_post(&p->sem_sync);
         } else {
             qemu_mutex_unlock(&p->mutex);
             /* sometimes there are spurious wakeups */
@@ -907,7 +919,6 @@ int multifd_save_setup(Error **errp)
         qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem, 0);
         qemu_sem_init(&p->sem_sync, 0);
-        p->pending_job = 0;
         p->id = i;
         p->pages = multifd_pages_init(page_count);
         p->packet_len = sizeof(MultiFDPacket_t)
diff --git a/migration/multifd.h b/migration/multifd.h
index 3920bdbcf1..08f26ef3fe 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -99,8 +99,17 @@ typedef struct {
     uint32_t flags;
     /* global number of generated multifd packets */
     uint64_t packet_num;
-    /* thread has work to do */
-    int pending_job;
+    /*
+     * The sender thread has work to do if either of below boolean is set.
+     *
+     * @pending_job:  a job is pending
+     * @pending_sync: a sync request is pending
+     *
+     * For both of these fields, they're only set by the requesters, and
+     * cleared by the multifd sender threads.
+     */
+    bool pending_job;
+    bool pending_sync;
     /* array of pages to sent.
      * The owner of 'pages' depends of 'pending_job' value:
      * pending_job == 0 -> migration_thread can use it.