summary refs log tree commit diff stats
path: root/migration/multifd.c
diff options
context:
space:
mode:
Diffstat (limited to 'migration/multifd.c')
-rw-r--r--migration/multifd.c111
1 files changed, 60 insertions, 51 deletions
diff --git a/migration/multifd.c b/migration/multifd.c
index adfe8c9a0a..6c07f19af1 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -79,6 +79,19 @@ struct {
     MultiFDMethods *ops;
 } *multifd_send_state;
 
+struct {
+    MultiFDRecvParams *params;
+    /* number of created threads */
+    int count;
+    /* syncs main thread and channels */
+    QemuSemaphore sem_sync;
+    /* global number of generated multifd packets */
+    uint64_t packet_num;
+    int exiting;
+    /* multifd ops */
+    MultiFDMethods *ops;
+} *multifd_recv_state;
+
 /* Multifd without compression */
 
 /**
@@ -440,6 +453,11 @@ static bool multifd_send_should_exit(void)
     return qatomic_read(&multifd_send_state->exiting);
 }
 
+static bool multifd_recv_should_exit(void)
+{
+    return qatomic_read(&multifd_recv_state->exiting);
+}
+
 /*
  * The migration thread can wait on either of the two semaphores.  This
  * function can be used to kick the main thread out of waiting on either of
@@ -641,18 +659,13 @@ static void multifd_send_terminate_threads(void)
     }
 }
 
-static int multifd_send_channel_destroy(QIOChannel *send)
-{
-    return socket_send_channel_destroy(send);
-}
-
 static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
 {
-    if (p->registered_yank) {
+    if (p->c) {
         migration_ioc_unregister_yank(p->c);
+        object_unref(OBJECT(p->c));
+        p->c = NULL;
     }
-    multifd_send_channel_destroy(p->c);
-    p->c = NULL;
     qemu_sem_destroy(&p->sem);
     qemu_sem_destroy(&p->sem_sync);
     g_free(p->name);
@@ -671,6 +684,7 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
 
 static void multifd_send_cleanup_state(void)
 {
+    socket_cleanup_outgoing_migration();
     qemu_sem_destroy(&multifd_send_state->channels_created);
     qemu_sem_destroy(&multifd_send_state->channels_ready);
     g_free(multifd_send_state->params);
@@ -873,16 +887,22 @@ out:
 
 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);
 
+typedef struct {
+    MultiFDSendParams *p;
+    QIOChannelTLS *tioc;
+} MultiFDTLSThreadArgs;
+
 static void *multifd_tls_handshake_thread(void *opaque)
 {
-    MultiFDSendParams *p = opaque;
-    QIOChannelTLS *tioc = QIO_CHANNEL_TLS(p->c);
+    MultiFDTLSThreadArgs *args = opaque;
 
-    qio_channel_tls_handshake(tioc,
+    qio_channel_tls_handshake(args->tioc,
                               multifd_new_send_channel_async,
-                              p,
+                              args->p,
                               NULL,
                               NULL);
+    g_free(args);
+
     return NULL;
 }
 
@@ -892,6 +912,7 @@ static bool multifd_tls_channel_connect(MultiFDSendParams *p,
 {
     MigrationState *s = migrate_get_current();
     const char *hostname = s->hostname;
+    MultiFDTLSThreadArgs *args;
     QIOChannelTLS *tioc;
 
     tioc = migration_tls_client_create(ioc, hostname, errp);
@@ -906,29 +927,29 @@ static bool multifd_tls_channel_connect(MultiFDSendParams *p,
     object_unref(OBJECT(ioc));
     trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
     qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");
-    p->c = QIO_CHANNEL(tioc);
+
+    args = g_new0(MultiFDTLSThreadArgs, 1);
+    args->tioc = tioc;
+    args->p = p;
 
     p->tls_thread_created = true;
     qemu_thread_create(&p->tls_thread, "multifd-tls-handshake-worker",
-                       multifd_tls_handshake_thread, p,
+                       multifd_tls_handshake_thread, args,
                        QEMU_THREAD_JOINABLE);
     return true;
 }
 
-static bool multifd_channel_connect(MultiFDSendParams *p,
-                                    QIOChannel *ioc,
-                                    Error **errp)
+static void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
 {
     qio_channel_set_delay(ioc, false);
 
     migration_ioc_register_yank(ioc);
-    p->registered_yank = true;
+    /* Setup p->c only if the channel is completely setup */
     p->c = ioc;
 
     p->thread_created = true;
     qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                        QEMU_THREAD_JOINABLE);
-    return true;
 }
 
 /*
@@ -960,7 +981,8 @@ static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
             return;
         }
     } else {
-        ret = multifd_channel_connect(p, ioc, &local_err);
+        multifd_channel_connect(p, ioc);
+        ret = true;
     }
 
 out:
@@ -976,14 +998,12 @@ out:
 
     trace_multifd_new_send_channel_async_error(p->id, local_err);
     multifd_send_set_error(local_err);
-    if (!p->c) {
-        /*
-         * If no channel has been created, drop the initial
-         * reference. Otherwise cleanup happens at
-         * multifd_send_channel_destroy()
-         */
-        object_unref(OBJECT(ioc));
-    }
+    /*
+     * For error cases (TLS or non-TLS), IO channel is always freed here
+     * rather than when cleanup multifd: since p->c is not set, multifd
+     * cleanup code doesn't even know its existence.
+     */
+    object_unref(OBJECT(ioc));
     error_free(local_err);
 }
 
@@ -1063,24 +1083,16 @@ bool multifd_send_setup(void)
     return true;
 }
 
-struct {
-    MultiFDRecvParams *params;
-    /* number of created threads */
-    int count;
-    /* syncs main thread and channels */
-    QemuSemaphore sem_sync;
-    /* global number of generated multifd packets */
-    uint64_t packet_num;
-    /* multifd ops */
-    MultiFDMethods *ops;
-} *multifd_recv_state;
-
 static void multifd_recv_terminate_threads(Error *err)
 {
     int i;
 
     trace_multifd_recv_terminate_threads(err != NULL);
 
+    if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
+        return;
+    }
+
     if (err) {
         MigrationState *s = migrate_get_current();
         migrate_set_error(s, err);
@@ -1094,8 +1106,12 @@ static void multifd_recv_terminate_threads(Error *err)
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
-        qemu_mutex_lock(&p->mutex);
-        p->quit = true;
+        /*
+         * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code,
+         * however try to wakeup it without harm in cleanup phase.
+         */
+        qemu_sem_post(&p->sem_sync);
+
         /*
          * We could arrive here for two reasons:
          *  - normal quit, i.e. everything went fine, just finished
@@ -1105,7 +1121,6 @@ static void multifd_recv_terminate_threads(Error *err)
         if (p->c) {
             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
         }
-        qemu_mutex_unlock(&p->mutex);
     }
 }
 
@@ -1155,12 +1170,6 @@ void multifd_recv_cleanup(void)
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
-        /*
-         * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code,
-         * however try to wakeup it without harm in cleanup phase.
-         */
-        qemu_sem_post(&p->sem_sync);
-
         if (p->thread_created) {
             qemu_thread_join(&p->thread);
         }
@@ -1210,7 +1219,7 @@ static void *multifd_recv_thread(void *opaque)
     while (true) {
         uint32_t flags;
 
-        if (p->quit) {
+        if (multifd_recv_should_exit()) {
             break;
         }
 
@@ -1274,6 +1283,7 @@ int multifd_recv_setup(Error **errp)
     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
     qatomic_set(&multifd_recv_state->count, 0);
+    qatomic_set(&multifd_recv_state->exiting, 0);
     qemu_sem_init(&multifd_recv_state->sem_sync, 0);
     multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
 
@@ -1282,7 +1292,6 @@ int multifd_recv_setup(Error **errp)
 
         qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem_sync, 0);
-        p->quit = false;
         p->id = i;
         p->packet_len = sizeof(MultiFDPacket_t)
                       + sizeof(uint64_t) * page_count;