summary refs log tree commit diff stats
path: root/migration/multifd.c
diff options
context:
space:
mode:
Diffstat (limited to 'migration/multifd.c')
-rw-r--r--migration/multifd.c148
1 files changed, 70 insertions, 78 deletions
diff --git a/migration/multifd.c b/migration/multifd.c
index 3242f688e5..76b57a7177 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -86,28 +86,21 @@ static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
  */
 static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
 {
-    p->next_packet_size = p->pages->num * qemu_target_page_size();
+    MultiFDPages_t *pages = p->pages;
+    size_t page_size = qemu_target_page_size();
+
+    for (int i = 0; i < p->normal_num; i++) {
+        p->iov[p->iovs_num].iov_base = pages->block->host + p->normal[i];
+        p->iov[p->iovs_num].iov_len = page_size;
+        p->iovs_num++;
+    }
+
+    p->next_packet_size = p->normal_num * page_size;
     p->flags |= MULTIFD_FLAG_NOCOMP;
     return 0;
 }
 
 /**
- * nocomp_send_write: do the actual write of the data
- *
- * For no compression we just have to write the data.
- *
- * Returns 0 for success or -1 for error
- *
- * @p: Params for the channel that we are using
- * @used: number of pages used
- * @errp: pointer to an error
- */
-static int nocomp_send_write(MultiFDSendParams *p, uint32_t used, Error **errp)
-{
-    return qio_channel_writev_all(p->c, p->pages->iov, used, errp);
-}
-
-/**
  * nocomp_recv_setup: setup receive side
  *
  * For no compression this function does nothing.
@@ -146,20 +139,24 @@ static void nocomp_recv_cleanup(MultiFDRecvParams *p)
 static int nocomp_recv_pages(MultiFDRecvParams *p, Error **errp)
 {
     uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
+    size_t page_size = qemu_target_page_size();
 
     if (flags != MULTIFD_FLAG_NOCOMP) {
-        error_setg(errp, "multifd %d: flags received %x flags expected %x",
+        error_setg(errp, "multifd %u: flags received %x flags expected %x",
                    p->id, flags, MULTIFD_FLAG_NOCOMP);
         return -1;
     }
-    return qio_channel_readv_all(p->c, p->pages->iov, p->pages->num, errp);
+    for (int i = 0; i < p->normal_num; i++) {
+        p->iov[i].iov_base = p->host + p->normal[i];
+        p->iov[i].iov_len = page_size;
+    }
+    return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
 }
 
 static MultiFDMethods multifd_nocomp_ops = {
     .send_setup = nocomp_send_setup,
     .send_cleanup = nocomp_send_cleanup,
     .send_prepare = nocomp_send_prepare,
-    .send_write = nocomp_send_write,
     .recv_setup = nocomp_recv_setup,
     .recv_cleanup = nocomp_recv_cleanup,
     .recv_pages = nocomp_recv_pages
@@ -212,8 +209,8 @@ static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
     }
 
     if (msg.version != MULTIFD_VERSION) {
-        error_setg(errp, "multifd: received packet version %d "
-                   "expected %d", msg.version, MULTIFD_VERSION);
+        error_setg(errp, "multifd: received packet version %u "
+                   "expected %u", msg.version, MULTIFD_VERSION);
         return -1;
     }
 
@@ -229,8 +226,8 @@ static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
     }
 
     if (msg.id > migrate_multifd_channels()) {
-        error_setg(errp, "multifd: received channel version %d "
-                   "expected %d", msg.version, MULTIFD_VERSION);
+        error_setg(errp, "multifd: received channel version %u "
+                   "expected %u", msg.version, MULTIFD_VERSION);
         return -1;
     }
 
@@ -242,7 +239,6 @@ static MultiFDPages_t *multifd_pages_init(size_t size)
     MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
 
     pages->allocated = size;
-    pages->iov = g_new0(struct iovec, size);
     pages->offset = g_new0(ram_addr_t, size);
 
     return pages;
@@ -254,8 +250,6 @@ static void multifd_pages_clear(MultiFDPages_t *pages)
     pages->allocated = 0;
     pages->packet_num = 0;
     pages->block = NULL;
-    g_free(pages->iov);
-    pages->iov = NULL;
     g_free(pages->offset);
     pages->offset = NULL;
     g_free(pages);
@@ -268,7 +262,7 @@ static void multifd_send_fill_packet(MultiFDSendParams *p)
 
     packet->flags = cpu_to_be32(p->flags);
     packet->pages_alloc = cpu_to_be32(p->pages->allocated);
-    packet->pages_used = cpu_to_be32(p->pages->num);
+    packet->normal_pages = cpu_to_be32(p->normal_num);
     packet->next_packet_size = cpu_to_be32(p->next_packet_size);
     packet->packet_num = cpu_to_be64(p->packet_num);
 
@@ -276,9 +270,9 @@ static void multifd_send_fill_packet(MultiFDSendParams *p)
         strncpy(packet->ramblock, p->pages->block->idstr, 256);
     }
 
-    for (i = 0; i < p->pages->num; i++) {
+    for (i = 0; i < p->normal_num; i++) {
         /* there are architectures where ram_addr_t is 32 bit */
-        uint64_t temp = p->pages->offset[i];
+        uint64_t temp = p->normal[i];
 
         packet->offset[i] = cpu_to_be64(temp);
     }
@@ -288,7 +282,7 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
 {
     MultiFDPacket_t *packet = p->packet;
     size_t page_size = qemu_target_page_size();
-    uint32_t pages_max = MULTIFD_PACKET_SIZE / page_size;
+    uint32_t page_count = MULTIFD_PACKET_SIZE / page_size;
     RAMBlock *block;
     int i;
 
@@ -303,7 +297,7 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
     packet->version = be32_to_cpu(packet->version);
     if (packet->version != MULTIFD_VERSION) {
         error_setg(errp, "multifd: received packet "
-                   "version %d and expected version %d",
+                   "version %u and expected version %u",
                    packet->version, MULTIFD_VERSION);
         return -1;
     }
@@ -315,33 +309,25 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
      * If we received a packet that is 100 times bigger than expected
      * just stop migration.  It is a magic number.
      */
-    if (packet->pages_alloc > pages_max * 100) {
+    if (packet->pages_alloc > page_count) {
         error_setg(errp, "multifd: received packet "
-                   "with size %d and expected a maximum size of %d",
-                   packet->pages_alloc, pages_max * 100) ;
+                   "with size %u and expected a size of %u",
+                   packet->pages_alloc, page_count) ;
         return -1;
     }
-    /*
-     * We received a packet that is bigger than expected but inside
-     * reasonable limits (see previous comment).  Just reallocate.
-     */
-    if (packet->pages_alloc > p->pages->allocated) {
-        multifd_pages_clear(p->pages);
-        p->pages = multifd_pages_init(packet->pages_alloc);
-    }
 
-    p->pages->num = be32_to_cpu(packet->pages_used);
-    if (p->pages->num > packet->pages_alloc) {
+    p->normal_num = be32_to_cpu(packet->normal_pages);
+    if (p->normal_num > packet->pages_alloc) {
         error_setg(errp, "multifd: received packet "
-                   "with %d pages and expected maximum pages are %d",
-                   p->pages->num, packet->pages_alloc) ;
+                   "with %u pages and expected maximum pages are %u",
+                   p->normal_num, packet->pages_alloc) ;
         return -1;
     }
 
     p->next_packet_size = be32_to_cpu(packet->next_packet_size);
     p->packet_num = be64_to_cpu(packet->packet_num);
 
-    if (p->pages->num == 0) {
+    if (p->normal_num == 0) {
         return 0;
     }
 
@@ -354,8 +340,8 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
         return -1;
     }
 
-    p->pages->block = block;
-    for (i = 0; i < p->pages->num; i++) {
+    p->host = block->host;
+    for (i = 0; i < p->normal_num; i++) {
         uint64_t offset = be64_to_cpu(packet->offset[i]);
 
         if (offset > (block->used_length - page_size)) {
@@ -364,9 +350,7 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
                        offset, block->used_length);
             return -1;
         }
-        p->pages->offset[i] = offset;
-        p->pages->iov[i].iov_base = block->host + offset;
-        p->pages->iov[i].iov_len = page_size;
+        p->normal[i] = offset;
     }
 
     return 0;
@@ -470,8 +454,6 @@ int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
 
     if (pages->block == block) {
         pages->offset[pages->num] = offset;
-        pages->iov[pages->num].iov_base = block->host + offset;
-        pages->iov[pages->num].iov_len = qemu_target_page_size();
         pages->num++;
 
         if (pages->num < pages->allocated) {
@@ -567,6 +549,10 @@ void multifd_save_cleanup(void)
         p->packet_len = 0;
         g_free(p->packet);
         p->packet = NULL;
+        g_free(p->iov);
+        p->iov = NULL;
+        g_free(p->normal);
+        p->normal = NULL;
         multifd_send_state->ops->send_cleanup(p, &local_err);
         if (local_err) {
             migrate_set_error(migrate_get_current(), local_err);
@@ -651,11 +637,17 @@ static void *multifd_send_thread(void *opaque)
         qemu_mutex_lock(&p->mutex);
 
         if (p->pending_job) {
-            uint32_t used = p->pages->num;
             uint64_t packet_num = p->packet_num;
             uint32_t flags = p->flags;
+            p->iovs_num = 1;
+            p->normal_num = 0;
+
+            for (int i = 0; i < p->pages->num; i++) {
+                p->normal[p->normal_num] = p->pages->offset[i];
+                p->normal_num++;
+            }
 
-            if (used) {
+            if (p->normal_num) {
                 ret = multifd_send_state->ops->send_prepare(p, &local_err);
                 if (ret != 0) {
                     qemu_mutex_unlock(&p->mutex);
@@ -665,27 +657,23 @@ static void *multifd_send_thread(void *opaque)
             multifd_send_fill_packet(p);
             p->flags = 0;
             p->num_packets++;
-            p->num_pages += used;
+            p->total_normal_pages += p->normal_num;
             p->pages->num = 0;
             p->pages->block = NULL;
             qemu_mutex_unlock(&p->mutex);
 
-            trace_multifd_send(p->id, packet_num, used, flags,
+            trace_multifd_send(p->id, packet_num, p->normal_num, flags,
                                p->next_packet_size);
 
-            ret = qio_channel_write_all(p->c, (void *)p->packet,
-                                        p->packet_len, &local_err);
+            p->iov[0].iov_len = p->packet_len;
+            p->iov[0].iov_base = p->packet;
+
+            ret = qio_channel_writev_all(p->c, p->iov, p->iovs_num,
+                                         &local_err);
             if (ret != 0) {
                 break;
             }
 
-            if (used) {
-                ret = multifd_send_state->ops->send_write(p, used, &local_err);
-                if (ret != 0) {
-                    break;
-                }
-            }
-
             qemu_mutex_lock(&p->mutex);
             p->pending_job--;
             qemu_mutex_unlock(&p->mutex);
@@ -724,7 +712,7 @@ out:
     qemu_mutex_unlock(&p->mutex);
 
     rcu_unregister_thread();
-    trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages);
+    trace_multifd_send_thread_end(p->id, p->num_packets, p->total_normal_pages);
 
     return NULL;
 }
@@ -922,6 +910,9 @@ int multifd_save_setup(Error **errp)
         p->packet->version = cpu_to_be32(MULTIFD_VERSION);
         p->name = g_strdup_printf("multifdsend_%d", i);
         p->tls_hostname = g_strdup(s->hostname);
+        /* We need one extra place for the packet header */
+        p->iov = g_new0(struct iovec, page_count + 1);
+        p->normal = g_new0(ram_addr_t, page_count);
         socket_send_channel_create(multifd_new_send_channel_async, p);
     }
 
@@ -1016,11 +1007,13 @@ int multifd_load_cleanup(Error **errp)
         qemu_sem_destroy(&p->sem_sync);
         g_free(p->name);
         p->name = NULL;
-        multifd_pages_clear(p->pages);
-        p->pages = NULL;
         p->packet_len = 0;
         g_free(p->packet);
         p->packet = NULL;
+        g_free(p->iov);
+        p->iov = NULL;
+        g_free(p->normal);
+        p->normal = NULL;
         multifd_recv_state->ops->recv_cleanup(p);
     }
     qemu_sem_destroy(&multifd_recv_state->sem_sync);
@@ -1069,7 +1062,6 @@ static void *multifd_recv_thread(void *opaque)
     rcu_register_thread();
 
     while (true) {
-        uint32_t used;
         uint32_t flags;
 
         if (p->quit) {
@@ -1092,17 +1084,16 @@ static void *multifd_recv_thread(void *opaque)
             break;
         }
 
-        used = p->pages->num;
         flags = p->flags;
         /* recv methods don't know how to handle the SYNC flag */
         p->flags &= ~MULTIFD_FLAG_SYNC;
-        trace_multifd_recv(p->id, p->packet_num, used, flags,
+        trace_multifd_recv(p->id, p->packet_num, p->normal_num, flags,
                            p->next_packet_size);
         p->num_packets++;
-        p->num_pages += used;
+        p->total_normal_pages += p->normal_num;
         qemu_mutex_unlock(&p->mutex);
 
-        if (used) {
+        if (p->normal_num) {
             ret = multifd_recv_state->ops->recv_pages(p, &local_err);
             if (ret != 0) {
                 break;
@@ -1124,7 +1115,7 @@ static void *multifd_recv_thread(void *opaque)
     qemu_mutex_unlock(&p->mutex);
 
     rcu_unregister_thread();
-    trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages);
+    trace_multifd_recv_thread_end(p->id, p->num_packets, p->total_normal_pages);
 
     return NULL;
 }
@@ -1156,11 +1147,12 @@ int multifd_load_setup(Error **errp)
         qemu_sem_init(&p->sem_sync, 0);
         p->quit = false;
         p->id = i;
-        p->pages = multifd_pages_init(page_count);
         p->packet_len = sizeof(MultiFDPacket_t)
                       + sizeof(uint64_t) * page_count;
         p->packet = g_malloc0(p->packet_len);
         p->name = g_strdup_printf("multifdrecv_%d", i);
+        p->iov = g_new0(struct iovec, page_count);
+        p->normal = g_new0(ram_addr_t, page_count);
     }
 
     for (i = 0; i < thread_count; i++) {