diff options
Diffstat (limited to 'migration/multifd.c')
| -rw-r--r-- | migration/multifd.c | 111 |
1 files changed, 60 insertions, 51 deletions
diff --git a/migration/multifd.c b/migration/multifd.c index adfe8c9a0a..6c07f19af1 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -79,6 +79,19 @@ struct { MultiFDMethods *ops; } *multifd_send_state; +struct { + MultiFDRecvParams *params; + /* number of created threads */ + int count; + /* syncs main thread and channels */ + QemuSemaphore sem_sync; + /* global number of generated multifd packets */ + uint64_t packet_num; + int exiting; + /* multifd ops */ + MultiFDMethods *ops; +} *multifd_recv_state; + /* Multifd without compression */ /** @@ -440,6 +453,11 @@ static bool multifd_send_should_exit(void) return qatomic_read(&multifd_send_state->exiting); } +static bool multifd_recv_should_exit(void) +{ + return qatomic_read(&multifd_recv_state->exiting); +} + /* * The migration thread can wait on either of the two semaphores. This * function can be used to kick the main thread out of waiting on either of @@ -641,18 +659,13 @@ static void multifd_send_terminate_threads(void) } } -static int multifd_send_channel_destroy(QIOChannel *send) -{ - return socket_send_channel_destroy(send); -} - static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp) { - if (p->registered_yank) { + if (p->c) { migration_ioc_unregister_yank(p->c); + object_unref(OBJECT(p->c)); + p->c = NULL; } - multifd_send_channel_destroy(p->c); - p->c = NULL; qemu_sem_destroy(&p->sem); qemu_sem_destroy(&p->sem_sync); g_free(p->name); @@ -671,6 +684,7 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp) static void multifd_send_cleanup_state(void) { + socket_cleanup_outgoing_migration(); qemu_sem_destroy(&multifd_send_state->channels_created); qemu_sem_destroy(&multifd_send_state->channels_ready); g_free(multifd_send_state->params); @@ -873,16 +887,22 @@ out: static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque); +typedef struct { + MultiFDSendParams *p; + QIOChannelTLS *tioc; +} MultiFDTLSThreadArgs; + static void *multifd_tls_handshake_thread(void *opaque) { - MultiFDSendParams *p = opaque; - QIOChannelTLS *tioc = QIO_CHANNEL_TLS(p->c); + MultiFDTLSThreadArgs *args = opaque; - qio_channel_tls_handshake(tioc, + qio_channel_tls_handshake(args->tioc, multifd_new_send_channel_async, - p, + args->p, NULL, NULL); + g_free(args); + return NULL; } @@ -892,6 +912,7 @@ static bool multifd_tls_channel_connect(MultiFDSendParams *p, { MigrationState *s = migrate_get_current(); const char *hostname = s->hostname; + MultiFDTLSThreadArgs *args; QIOChannelTLS *tioc; tioc = migration_tls_client_create(ioc, hostname, errp); @@ -906,29 +927,29 @@ static bool multifd_tls_channel_connect(MultiFDSendParams *p, object_unref(OBJECT(ioc)); trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname); qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing"); - p->c = QIO_CHANNEL(tioc); + + args = g_new0(MultiFDTLSThreadArgs, 1); + args->tioc = tioc; + args->p = p; p->tls_thread_created = true; qemu_thread_create(&p->tls_thread, "multifd-tls-handshake-worker", - multifd_tls_handshake_thread, p, + multifd_tls_handshake_thread, args, QEMU_THREAD_JOINABLE); return true; } -static bool multifd_channel_connect(MultiFDSendParams *p, - QIOChannel *ioc, - Error **errp) +static void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc) { qio_channel_set_delay(ioc, false); migration_ioc_register_yank(ioc); - p->registered_yank = true; + /* Setup p->c only if the channel is completely setup */ p->c = ioc; p->thread_created = true; qemu_thread_create(&p->thread, p->name, multifd_send_thread, p, QEMU_THREAD_JOINABLE); - return true; } /* @@ -960,7 +981,8 @@ static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) return; } } else { - ret = multifd_channel_connect(p, ioc, &local_err); + multifd_channel_connect(p, ioc); + ret = true; } out: @@ -976,14 +998,12 @@ out: trace_multifd_new_send_channel_async_error(p->id, local_err); multifd_send_set_error(local_err); - if (!p->c) { - /* - * If no channel has been created, drop the initial - * reference. Otherwise cleanup happens at - * multifd_send_channel_destroy() - */ - object_unref(OBJECT(ioc)); - } + /* + * For error cases (TLS or non-TLS), IO channel is always freed here + * rather than when cleanup multifd: since p->c is not set, multifd + * cleanup code doesn't even know its existence. + */ + object_unref(OBJECT(ioc)); error_free(local_err); } @@ -1063,24 +1083,16 @@ bool multifd_send_setup(void) return true; } -struct { - MultiFDRecvParams *params; - /* number of created threads */ - int count; - /* syncs main thread and channels */ - QemuSemaphore sem_sync; - /* global number of generated multifd packets */ - uint64_t packet_num; - /* multifd ops */ - MultiFDMethods *ops; -} *multifd_recv_state; - static void multifd_recv_terminate_threads(Error *err) { int i; trace_multifd_recv_terminate_threads(err != NULL); + if (qatomic_xchg(&multifd_recv_state->exiting, 1)) { + return; + } + if (err) { MigrationState *s = migrate_get_current(); migrate_set_error(s, err); @@ -1094,8 +1106,12 @@ static void multifd_recv_terminate_threads(Error *err) for (i = 0; i < migrate_multifd_channels(); i++) { MultiFDRecvParams *p = &multifd_recv_state->params[i]; - qemu_mutex_lock(&p->mutex); - p->quit = true; + /* + * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code, + * however try to wakeup it without harm in cleanup phase. + */ + qemu_sem_post(&p->sem_sync); + /* * We could arrive here for two reasons: * - normal quit, i.e. everything went fine, just finished @@ -1105,7 +1121,6 @@ static void multifd_recv_terminate_threads(Error *err) if (p->c) { qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); } - qemu_mutex_unlock(&p->mutex); } } @@ -1155,12 +1170,6 @@ void multifd_recv_cleanup(void) for (i = 0; i < migrate_multifd_channels(); i++) { MultiFDRecvParams *p = &multifd_recv_state->params[i]; - /* - * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code, - * however try to wakeup it without harm in cleanup phase. - */ - qemu_sem_post(&p->sem_sync); - if (p->thread_created) { qemu_thread_join(&p->thread); } @@ -1210,7 +1219,7 @@ static void *multifd_recv_thread(void *opaque) while (true) { uint32_t flags; - if (p->quit) { + if (multifd_recv_should_exit()) { break; } @@ -1274,6 +1283,7 @@ int multifd_recv_setup(Error **errp) multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); qatomic_set(&multifd_recv_state->count, 0); + qatomic_set(&multifd_recv_state->exiting, 0); qemu_sem_init(&multifd_recv_state->sem_sync, 0); multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()]; @@ -1282,7 +1292,6 @@ int multifd_recv_setup(Error **errp) qemu_mutex_init(&p->mutex); qemu_sem_init(&p->sem_sync, 0); - p->quit = false; p->id = i; p->packet_len = sizeof(MultiFDPacket_t) + sizeof(uint64_t) * page_count; |