diff options
Diffstat (limited to 'migration/multifd.c')
| -rw-r--r-- | migration/multifd.c | 142 |
1 files changed, 119 insertions, 23 deletions
diff --git a/migration/multifd.c b/migration/multifd.c index 2de5263c32..68b171fb61 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -20,6 +20,7 @@ #include "ram.h" #include "migration.h" #include "socket.h" +#include "tls.h" #include "qemu-file.h" #include "trace.h" #include "multifd.h" @@ -410,7 +411,7 @@ static int multifd_send_pages(QEMUFile *f) MultiFDPages_t *pages = multifd_send_state->pages; uint64_t transferred; - if (atomic_read(&multifd_send_state->exiting)) { + if (qatomic_read(&multifd_send_state->exiting)) { return -1; } @@ -508,7 +509,7 @@ static void multifd_send_terminate_threads(Error *err) * threads at the same time, we can end calling this function * twice. */ - if (atomic_xchg(&multifd_send_state->exiting, 1)) { + if (qatomic_xchg(&multifd_send_state->exiting, 1)) { return; } @@ -548,6 +549,8 @@ void multifd_save_cleanup(void) qemu_sem_destroy(&p->sem_sync); g_free(p->name); p->name = NULL; + g_free(p->tls_hostname); + p->tls_hostname = NULL; multifd_pages_clear(p->pages); p->pages = NULL; p->packet_len = 0; @@ -632,7 +635,7 @@ static void *multifd_send_thread(void *opaque) while (true) { qemu_sem_wait(&p->sem); - if (atomic_read(&multifd_send_state->exiting)) { + if (qatomic_read(&multifd_send_state->exiting)) { break; } qemu_mutex_lock(&p->mutex); @@ -717,6 +720,102 @@ out: return NULL; } +static bool multifd_channel_connect(MultiFDSendParams *p, + QIOChannel *ioc, + Error *error); + +static void multifd_tls_outgoing_handshake(QIOTask *task, + gpointer opaque) +{ + MultiFDSendParams *p = opaque; + QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task)); + Error *err = NULL; + + if (qio_task_propagate_error(task, &err)) { + trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err)); + } else { + trace_multifd_tls_outgoing_handshake_complete(ioc); + } + multifd_channel_connect(p, ioc, err); +} + +static void multifd_tls_channel_connect(MultiFDSendParams *p, + QIOChannel *ioc, + Error **errp) +{ + MigrationState *s = migrate_get_current(); + const char *hostname = p->tls_hostname; + QIOChannelTLS *tioc; + + tioc = migration_tls_client_create(s, ioc, hostname, errp); + if (!tioc) { + return; + } + + trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname); + qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing"); + qio_channel_tls_handshake(tioc, + multifd_tls_outgoing_handshake, + p, + NULL, + NULL); + +} + +static bool multifd_channel_connect(MultiFDSendParams *p, + QIOChannel *ioc, + Error *error) +{ + MigrationState *s = migrate_get_current(); + + trace_multifd_set_outgoing_channel( + ioc, object_get_typename(OBJECT(ioc)), p->tls_hostname, error); + + if (!error) { + if (s->parameters.tls_creds && + *s->parameters.tls_creds && + !object_dynamic_cast(OBJECT(ioc), + TYPE_QIO_CHANNEL_TLS)) { + multifd_tls_channel_connect(p, ioc, &error); + if (!error) { + /* + * tls_channel_connect will call back to this + * function after the TLS handshake, + * so we mustn't call multifd_send_thread until then + */ + return false; + } else { + return true; + } + } else { + /* update for tls qio channel */ + p->c = ioc; + qemu_thread_create(&p->thread, p->name, multifd_send_thread, p, + QEMU_THREAD_JOINABLE); + } + return false; + } + + return true; +} + +static void multifd_new_send_channel_cleanup(MultiFDSendParams *p, + QIOChannel *ioc, Error *err) +{ + migrate_set_error(migrate_get_current(), err); + /* Error happen, we need to tell who pay attention to me */ + qemu_sem_post(&multifd_send_state->channels_ready); + qemu_sem_post(&p->sem_sync); + /* + * Although multifd_send_thread is not created, but main migration + * thread neet to judge whether it is running, so we need to mark + * its status. + */ + p->quit = true; + object_unref(OBJECT(ioc)); + error_free(err); +} + static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) { MultiFDSendParams *p = opaque; @@ -725,25 +824,19 @@ static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) trace_multifd_new_send_channel_async(p->id); if (qio_task_propagate_error(task, &local_err)) { - migrate_set_error(migrate_get_current(), local_err); - /* Error happen, we need to tell who pay attention to me */ - qemu_sem_post(&multifd_send_state->channels_ready); - qemu_sem_post(&p->sem_sync); - /* - * Although multifd_send_thread is not created, but main migration - * thread needs to judge whether it is running, so we need to mark - * its status. - */ - p->quit = true; - object_unref(OBJECT(sioc)); - error_free(local_err); + goto cleanup; } else { p->c = QIO_CHANNEL(sioc); qio_channel_set_delay(p->c, false); p->running = true; - qemu_thread_create(&p->thread, p->name, multifd_send_thread, p, - QEMU_THREAD_JOINABLE); + if (multifd_channel_connect(p, sioc, local_err)) { + goto cleanup; + } + return; } + +cleanup: + multifd_new_send_channel_cleanup(p, sioc, local_err); } int multifd_save_setup(Error **errp) @@ -751,16 +844,18 @@ int multifd_save_setup(Error **errp) int thread_count; uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); uint8_t i; + MigrationState *s; if (!migrate_use_multifd()) { return 0; } + s = migrate_get_current(); thread_count = migrate_multifd_channels(); multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); multifd_send_state->pages = multifd_pages_init(page_count); qemu_sem_init(&multifd_send_state->channels_ready, 0); - atomic_set(&multifd_send_state->exiting, 0); + qatomic_set(&multifd_send_state->exiting, 0); multifd_send_state->ops = multifd_ops[migrate_multifd_compression()]; for (i = 0; i < thread_count; i++) { @@ -779,6 +874,7 @@ int multifd_save_setup(Error **errp) p->packet->magic = cpu_to_be32(MULTIFD_MAGIC); p->packet->version = cpu_to_be32(MULTIFD_VERSION); p->name = g_strdup_printf("multifdsend_%d", i); + p->tls_hostname = g_strdup(s->hostname); socket_send_channel_create(multifd_new_send_channel_async, p); } @@ -997,7 +1093,7 @@ int multifd_load_setup(Error **errp) thread_count = migrate_multifd_channels(); multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); - atomic_set(&multifd_recv_state->count, 0); + qatomic_set(&multifd_recv_state->count, 0); qemu_sem_init(&multifd_recv_state->sem_sync, 0); multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()]; @@ -1037,7 +1133,7 @@ bool multifd_recv_all_channels_created(void) return true; } - return thread_count == atomic_read(&multifd_recv_state->count); + return thread_count == qatomic_read(&multifd_recv_state->count); } /* @@ -1058,7 +1154,7 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp) error_propagate_prepend(errp, local_err, "failed to receive packet" " via multifd channel %d: ", - atomic_read(&multifd_recv_state->count)); + qatomic_read(&multifd_recv_state->count)); return false; } trace_multifd_recv_new_channel(id); @@ -1079,7 +1175,7 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp) p->running = true; qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, QEMU_THREAD_JOINABLE); - atomic_inc(&multifd_recv_state->count); - return atomic_read(&multifd_recv_state->count) == + qatomic_inc(&multifd_recv_state->count); + return qatomic_read(&multifd_recv_state->count) == migrate_multifd_channels(); } |