diff options
Diffstat (limited to 'migration/multifd.c')
| -rw-r--r-- | migration/multifd.c | 778 |
1 files changed, 440 insertions, 338 deletions
diff --git a/migration/multifd.c b/migration/multifd.c index 25cbc6dc6b..adfe8c9a0a 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -45,20 +45,54 @@ typedef struct { uint64_t unused2[4]; /* Reserved for future use */ } __attribute__((packed)) MultiFDInit_t; +struct { + MultiFDSendParams *params; + /* array of pages to sent */ + MultiFDPages_t *pages; + /* + * Global number of generated multifd packets. + * + * Note that we used 'uintptr_t' because it'll naturally support atomic + * operations on both 32bit / 64 bits hosts. It means on 32bit systems + * multifd will overflow the packet_num easier, but that should be + * fine. + * + * Another option is to use QEMU's Stat64 then it'll be 64 bits on all + * hosts, however so far it does not support atomic fetch_add() yet. + * Make it easy for now. + */ + uintptr_t packet_num; + /* + * Synchronization point past which no more channels will be + * created. + */ + QemuSemaphore channels_created; + /* send channels ready */ + QemuSemaphore channels_ready; + /* + * Have we already run terminate threads. There is a race when it + * happens that we got one error while we are exiting. + * We will use atomic operations. Only valid values are 0 and 1. + */ + int exiting; + /* multifd ops */ + MultiFDMethods *ops; +} *multifd_send_state; + /* Multifd without compression */ /** * nocomp_send_setup: setup send side * - * For no compression this function does nothing. - * - * Returns 0 for success or -1 for error - * * @p: Params for the channel that we are using * @errp: pointer to an error */ static int nocomp_send_setup(MultiFDSendParams *p, Error **errp) { + if (migrate_zero_copy_send()) { + p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY; + } + return 0; } @@ -88,16 +122,38 @@ static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp) */ static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp) { + bool use_zero_copy_send = migrate_zero_copy_send(); MultiFDPages_t *pages = p->pages; + int ret; - for (int i = 0; i < p->normal_num; i++) { - p->iov[p->iovs_num].iov_base = pages->block->host + p->normal[i]; + if (!use_zero_copy_send) { + /* + * Only !zerocopy needs the header in IOV; zerocopy will + * send it separately. + */ + multifd_send_prepare_header(p); + } + + for (int i = 0; i < pages->num; i++) { + p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i]; p->iov[p->iovs_num].iov_len = p->page_size; p->iovs_num++; } - p->next_packet_size = p->normal_num * p->page_size; + p->next_packet_size = pages->num * p->page_size; p->flags |= MULTIFD_FLAG_NOCOMP; + + multifd_send_fill_packet(p); + + if (use_zero_copy_send) { + /* Send header first, without zerocopy */ + ret = qio_channel_write_all(p->c, (void *)p->packet, + p->packet_len, errp); + if (ret != 0) { + return -1; + } + } + return 0; } @@ -172,6 +228,17 @@ void multifd_register_ops(int method, MultiFDMethods *ops) multifd_ops[method] = ops; } +/* Reset a MultiFDPages_t* object for the next use */ +static void multifd_pages_reset(MultiFDPages_t *pages) +{ + /* + * We don't need to touch offset[] array, because it will be + * overwritten later when reused. + */ + pages->num = 0; + pages->block = NULL; +} + static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) { MultiFDInit_t msg = {}; @@ -248,35 +315,44 @@ static MultiFDPages_t *multifd_pages_init(uint32_t n) static void multifd_pages_clear(MultiFDPages_t *pages) { - pages->num = 0; + multifd_pages_reset(pages); pages->allocated = 0; - pages->block = NULL; g_free(pages->offset); pages->offset = NULL; g_free(pages); } -static void multifd_send_fill_packet(MultiFDSendParams *p) +void multifd_send_fill_packet(MultiFDSendParams *p) { MultiFDPacket_t *packet = p->packet; + MultiFDPages_t *pages = p->pages; + uint64_t packet_num; int i; packet->flags = cpu_to_be32(p->flags); packet->pages_alloc = cpu_to_be32(p->pages->allocated); - packet->normal_pages = cpu_to_be32(p->normal_num); + packet->normal_pages = cpu_to_be32(pages->num); packet->next_packet_size = cpu_to_be32(p->next_packet_size); - packet->packet_num = cpu_to_be64(p->packet_num); - if (p->pages->block) { - strncpy(packet->ramblock, p->pages->block->idstr, 256); + packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num); + packet->packet_num = cpu_to_be64(packet_num); + + if (pages->block) { + strncpy(packet->ramblock, pages->block->idstr, 256); } - for (i = 0; i < p->normal_num; i++) { + for (i = 0; i < pages->num; i++) { /* there are architectures where ram_addr_t is 32 bit */ - uint64_t temp = p->normal[i]; + uint64_t temp = pages->offset[i]; packet->offset[i] = cpu_to_be64(temp); } + + p->packets_sent++; + p->total_normal_pages += pages->num; + + trace_multifd_send(p->id, packet_num, pages->num, p->flags, + p->next_packet_size); } static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) @@ -324,6 +400,11 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) p->next_packet_size = be32_to_cpu(packet->next_packet_size); p->packet_num = be64_to_cpu(packet->packet_num); + p->packets_recved++; + p->total_normal_pages += p->normal_num; + + trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->flags, + p->next_packet_size); if (p->normal_num == 0) { return 0; @@ -354,23 +435,22 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) return 0; } -struct { - MultiFDSendParams *params; - /* array of pages to sent */ - MultiFDPages_t *pages; - /* global number of generated multifd packets */ - uint64_t packet_num; - /* send channels ready */ - QemuSemaphore channels_ready; - /* - * Have we already run terminate threads. There is a race when it - * happens that we got one error while we are exiting. - * We will use atomic operations. Only valid values are 0 and 1. - */ - int exiting; - /* multifd ops */ - MultiFDMethods *ops; -} *multifd_send_state; +static bool multifd_send_should_exit(void) +{ + return qatomic_read(&multifd_send_state->exiting); +} + +/* + * The migration thread can wait on either of the two semaphores. This + * function can be used to kick the main thread out of waiting on either of + * them. Should mostly only be called when something wrong happened with + * the current multifd send thread. + */ +static void multifd_send_kick_main(MultiFDSendParams *p) +{ + qemu_sem_post(&p->sem_sync); + qemu_sem_post(&multifd_send_state->channels_ready); +} /* * How we use multifd_send_state->pages and channel->pages? @@ -388,20 +468,23 @@ struct { * thread is using the channel mutex when changing it, and the channel * have to had finish with its own, otherwise pending_job can't be * false. + * + * Returns true if succeed, false otherwise. */ - -static int multifd_send_pages(void) +static bool multifd_send_pages(void) { int i; static int next_channel; MultiFDSendParams *p = NULL; /* make happy gcc */ MultiFDPages_t *pages = multifd_send_state->pages; - if (qatomic_read(&multifd_send_state->exiting)) { - return -1; + if (multifd_send_should_exit()) { + return false; } + /* We wait here, until at least one channel is ready */ qemu_sem_wait(&multifd_send_state->channels_ready); + /* * next_channel can remain from a previous migration that was * using more channels, so ensure it doesn't overflow if the @@ -409,69 +492,100 @@ static int multifd_send_pages(void) */ next_channel %= migrate_multifd_channels(); for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { - p = &multifd_send_state->params[i]; - - qemu_mutex_lock(&p->mutex); - if (p->quit) { - error_report("%s: channel %d has already quit!", __func__, i); - qemu_mutex_unlock(&p->mutex); - return -1; + if (multifd_send_should_exit()) { + return false; } - if (!p->pending_job) { - p->pending_job++; + p = &multifd_send_state->params[i]; + /* + * Lockless read to p->pending_job is safe, because only multifd + * sender thread can clear it. + */ + if (qatomic_read(&p->pending_job) == false) { next_channel = (i + 1) % migrate_multifd_channels(); break; } - qemu_mutex_unlock(&p->mutex); } - assert(!p->pages->num); - assert(!p->pages->block); - p->packet_num = multifd_send_state->packet_num++; + /* + * Make sure we read p->pending_job before all the rest. Pairs with + * qatomic_store_release() in multifd_send_thread(). + */ + smp_mb_acquire(); + assert(!p->pages->num); multifd_send_state->pages = p->pages; p->pages = pages; - qemu_mutex_unlock(&p->mutex); + /* + * Making sure p->pages is setup before marking pending_job=true. Pairs + * with the qatomic_load_acquire() in multifd_send_thread(). + */ + qatomic_store_release(&p->pending_job, true); qemu_sem_post(&p->sem); - return 1; + return true; } -int multifd_queue_page(RAMBlock *block, ram_addr_t offset) +static inline bool multifd_queue_empty(MultiFDPages_t *pages) { - MultiFDPages_t *pages = multifd_send_state->pages; - bool changed = false; + return pages->num == 0; +} - if (!pages->block) { - pages->block = block; - } +static inline bool multifd_queue_full(MultiFDPages_t *pages) +{ + return pages->num == pages->allocated; +} - if (pages->block == block) { - pages->offset[pages->num] = offset; - pages->num++; +static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset) +{ + pages->offset[pages->num++] = offset; +} - if (pages->num < pages->allocated) { - return 1; - } - } else { - changed = true; - } +/* Returns true if enqueue successful, false otherwise */ +bool multifd_queue_page(RAMBlock *block, ram_addr_t offset) +{ + MultiFDPages_t *pages; - if (multifd_send_pages() < 0) { - return -1; +retry: + pages = multifd_send_state->pages; + + /* If the queue is empty, we can already enqueue now */ + if (multifd_queue_empty(pages)) { + pages->block = block; + multifd_enqueue(pages, offset); + return true; } - if (changed) { - return multifd_queue_page(block, offset); + /* + * Not empty, meanwhile we need a flush. It can because of either: + * + * (1) The page is not on the same ramblock of previous ones, or, + * (2) The queue is full. + * + * After flush, always retry. + */ + if (pages->block != block || multifd_queue_full(pages)) { + if (!multifd_send_pages()) { + return false; + } + goto retry; } - return 1; + /* Not empty, and we still have space, do it! */ + multifd_enqueue(pages, offset); + return true; } -static void multifd_send_terminate_threads(Error *err) +/* Multifd send side hit an error; remember it and prepare to quit */ +static void multifd_send_set_error(Error *err) { - int i; - - trace_multifd_send_terminate_threads(err != NULL); + /* + * We don't want to exit each threads twice. Depending on where + * we get the error, or if there are two independent errors in two + * threads at the same time, we can end calling this function + * twice. + */ + if (qatomic_xchg(&multifd_send_state->exiting, 1)) { + return; + } if (err) { MigrationState *s = migrate_get_current(); @@ -484,27 +598,46 @@ static void multifd_send_terminate_threads(Error *err) MIGRATION_STATUS_FAILED); } } +} + +static void multifd_send_terminate_threads(void) +{ + int i; + + trace_multifd_send_terminate_threads(); /* - * We don't want to exit each threads twice. Depending on where - * we get the error, or if there are two independent errors in two - * threads at the same time, we can end calling this function - * twice. + * Tell everyone we're quitting. No xchg() needed here; we simply + * always set it. */ - if (qatomic_xchg(&multifd_send_state->exiting, 1)) { - return; - } + qatomic_set(&multifd_send_state->exiting, 1); + /* + * Firstly, kick all threads out; no matter whether they are just idle, + * or blocked in an IO system call. + */ for (i = 0; i < migrate_multifd_channels(); i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; - qemu_mutex_lock(&p->mutex); - p->quit = true; qemu_sem_post(&p->sem); if (p->c) { qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); } - qemu_mutex_unlock(&p->mutex); + } + + /* + * Finally recycle all the threads. + */ + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDSendParams *p = &multifd_send_state->params[i]; + + if (p->tls_thread_created) { + qemu_thread_join(&p->tls_thread); + } + + if (p->thread_created) { + qemu_thread_join(&p->thread); + } } } @@ -513,57 +646,62 @@ static int multifd_send_channel_destroy(QIOChannel *send) return socket_send_channel_destroy(send); } -void multifd_save_cleanup(void) +static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp) +{ + if (p->registered_yank) { + migration_ioc_unregister_yank(p->c); + } + multifd_send_channel_destroy(p->c); + p->c = NULL; + qemu_sem_destroy(&p->sem); + qemu_sem_destroy(&p->sem_sync); + g_free(p->name); + p->name = NULL; + multifd_pages_clear(p->pages); + p->pages = NULL; + p->packet_len = 0; + g_free(p->packet); + p->packet = NULL; + g_free(p->iov); + p->iov = NULL; + multifd_send_state->ops->send_cleanup(p, errp); + + return *errp == NULL; +} + +static void multifd_send_cleanup_state(void) +{ + qemu_sem_destroy(&multifd_send_state->channels_created); + qemu_sem_destroy(&multifd_send_state->channels_ready); + g_free(multifd_send_state->params); + multifd_send_state->params = NULL; + multifd_pages_clear(multifd_send_state->pages); + multifd_send_state->pages = NULL; + g_free(multifd_send_state); + multifd_send_state = NULL; +} + +void multifd_send_shutdown(void) { int i; if (!migrate_multifd()) { return; } - multifd_send_terminate_threads(NULL); - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDSendParams *p = &multifd_send_state->params[i]; - if (p->running) { - qemu_thread_join(&p->thread); - } - } + multifd_send_terminate_threads(); + for (i = 0; i < migrate_multifd_channels(); i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; Error *local_err = NULL; - if (p->registered_yank) { - migration_ioc_unregister_yank(p->c); - } - multifd_send_channel_destroy(p->c); - p->c = NULL; - qemu_mutex_destroy(&p->mutex); - qemu_sem_destroy(&p->sem); - qemu_sem_destroy(&p->sem_sync); - g_free(p->name); - p->name = NULL; - multifd_pages_clear(p->pages); - p->pages = NULL; - p->packet_len = 0; - g_free(p->packet); - p->packet = NULL; - g_free(p->iov); - p->iov = NULL; - g_free(p->normal); - p->normal = NULL; - multifd_send_state->ops->send_cleanup(p, &local_err); - if (local_err) { + if (!multifd_send_cleanup_channel(p, &local_err)) { migrate_set_error(migrate_get_current(), local_err); error_free(local_err); } } - qemu_sem_destroy(&multifd_send_state->channels_ready); - g_free(multifd_send_state->params); - multifd_send_state->params = NULL; - multifd_pages_clear(multifd_send_state->pages); - multifd_send_state->pages = NULL; - g_free(multifd_send_state); - multifd_send_state = NULL; + + multifd_send_cleanup_state(); } static int multifd_zero_copy_flush(QIOChannel *c) @@ -592,47 +730,38 @@ int multifd_send_sync_main(void) return 0; } if (multifd_send_state->pages->num) { - if (multifd_send_pages() < 0) { + if (!multifd_send_pages()) { error_report("%s: multifd_send_pages fail", __func__); return -1; } } - /* - * When using zero-copy, it's necessary to flush the pages before any of - * the pages can be sent again, so we'll make sure the new version of the - * pages will always arrive _later_ than the old pages. - * - * Currently we achieve this by flushing the zero-page requested writes - * per ram iteration, but in the future we could potentially optimize it - * to be less frequent, e.g. only after we finished one whole scanning of - * all the dirty bitmaps. - */ - flush_zero_copy = migrate_zero_copy_send(); for (i = 0; i < migrate_multifd_channels(); i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; - trace_multifd_send_sync_main_signal(p->id); - - qemu_mutex_lock(&p->mutex); - - if (p->quit) { - error_report("%s: channel %d has already quit", __func__, i); - qemu_mutex_unlock(&p->mutex); + if (multifd_send_should_exit()) { return -1; } - p->packet_num = multifd_send_state->packet_num++; - p->flags |= MULTIFD_FLAG_SYNC; - p->pending_job++; - qemu_mutex_unlock(&p->mutex); + trace_multifd_send_sync_main_signal(p->id); + + /* + * We should be the only user so far, so not possible to be set by + * others concurrently. + */ + assert(qatomic_read(&p->pending_sync) == false); + qatomic_set(&p->pending_sync, true); qemu_sem_post(&p->sem); } for (i = 0; i < migrate_multifd_channels(); i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; + if (multifd_send_should_exit()) { + return -1; + } + qemu_sem_wait(&multifd_send_state->channels_ready); trace_multifd_send_sync_main_wait(p->id); qemu_sem_wait(&p->sem_sync); @@ -652,7 +781,6 @@ static void *multifd_send_thread(void *opaque) MigrationThread *thread = NULL; Error *local_err = NULL; int ret = 0; - bool use_zero_copy_send = migrate_zero_copy_send(); thread = migration_threads_add(p->name, qemu_get_thread_id()); @@ -663,64 +791,28 @@ static void *multifd_send_thread(void *opaque) ret = -1; goto out; } - /* initial packet */ - p->num_packets = 1; while (true) { qemu_sem_post(&multifd_send_state->channels_ready); qemu_sem_wait(&p->sem); - if (qatomic_read(&multifd_send_state->exiting)) { + if (multifd_send_should_exit()) { break; } - qemu_mutex_lock(&p->mutex); - - if (p->pending_job) { - uint64_t packet_num = p->packet_num; - uint32_t flags; - p->normal_num = 0; - - if (use_zero_copy_send) { - p->iovs_num = 0; - } else { - p->iovs_num = 1; - } - for (int i = 0; i < p->pages->num; i++) { - p->normal[p->normal_num] = p->pages->offset[i]; - p->normal_num++; - } + /* + * Read pending_job flag before p->pages. Pairs with the + * qatomic_store_release() in multifd_send_pages(). + */ + if (qatomic_load_acquire(&p->pending_job)) { + MultiFDPages_t *pages = p->pages; - if (p->normal_num) { - ret = multifd_send_state->ops->send_prepare(p, &local_err); - if (ret != 0) { - qemu_mutex_unlock(&p->mutex); - break; - } - } - multifd_send_fill_packet(p); - flags = p->flags; - p->flags = 0; - p->num_packets++; - p->total_normal_pages += p->normal_num; - p->pages->num = 0; - p->pages->block = NULL; - qemu_mutex_unlock(&p->mutex); + p->iovs_num = 0; + assert(pages->num); - trace_multifd_send(p->id, packet_num, p->normal_num, flags, - p->next_packet_size); - - if (use_zero_copy_send) { - /* Send header first, without zerocopy */ - ret = qio_channel_write_all(p->c, (void *)p->packet, - p->packet_len, &local_err); - if (ret != 0) { - break; - } - } else { - /* Send header using the same writev call */ - p->iov[0].iov_len = p->packet_len; - p->iov[0].iov_base = p->packet; + ret = multifd_send_state->ops->send_prepare(p, &local_err); + if (ret != 0) { + break; } ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL, @@ -731,17 +823,35 @@ static void *multifd_send_thread(void *opaque) stat64_add(&mig_stats.multifd_bytes, p->next_packet_size + p->packet_len); + + multifd_pages_reset(p->pages); p->next_packet_size = 0; - qemu_mutex_lock(&p->mutex); - p->pending_job--; - qemu_mutex_unlock(&p->mutex); - if (flags & MULTIFD_FLAG_SYNC) { - qemu_sem_post(&p->sem_sync); - } + /* + * Making sure p->pages is published before saying "we're + * free". Pairs with the smp_mb_acquire() in + * multifd_send_pages(). + */ + qatomic_store_release(&p->pending_job, false); } else { - qemu_mutex_unlock(&p->mutex); - /* sometimes there are spurious wakeups */ + /* + * If not a normal job, must be a sync request. Note that + * pending_sync is a standalone flag (unlike pending_job), so + * it doesn't require explicit memory barriers. + */ + assert(qatomic_read(&p->pending_sync)); + p->flags = MULTIFD_FLAG_SYNC; + multifd_send_fill_packet(p); + ret = qio_channel_write_all(p->c, (void *)p->packet, + p->packet_len, &local_err); + if (ret != 0) { + break; + } + /* p->next_packet_size will always be zero for a SYNC packet */ + stat64_add(&mig_stats.multifd_bytes, p->packet_len); + p->flags = 0; + qatomic_set(&p->pending_sync, false); + qemu_sem_post(&p->sem_sync); } } @@ -749,53 +859,19 @@ out: if (ret) { assert(local_err); trace_multifd_send_error(p->id); - multifd_send_terminate_threads(local_err); - qemu_sem_post(&p->sem_sync); - qemu_sem_post(&multifd_send_state->channels_ready); + multifd_send_set_error(local_err); + multifd_send_kick_main(p); error_free(local_err); } - qemu_mutex_lock(&p->mutex); - p->running = false; - qemu_mutex_unlock(&p->mutex); - rcu_unregister_thread(); migration_threads_remove(thread); - trace_multifd_send_thread_end(p->id, p->num_packets, p->total_normal_pages); + trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages); return NULL; } -static bool multifd_channel_connect(MultiFDSendParams *p, - QIOChannel *ioc, - Error **errp); - -static void multifd_tls_outgoing_handshake(QIOTask *task, - gpointer opaque) -{ - MultiFDSendParams *p = opaque; - QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task)); - Error *err = NULL; - - if (!qio_task_propagate_error(task, &err)) { - trace_multifd_tls_outgoing_handshake_complete(ioc); - if (multifd_channel_connect(p, ioc, &err)) { - return; - } - } - - trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err)); - - migrate_set_error(migrate_get_current(), err); - /* - * Error happen, mark multifd_send_thread status as 'quit' although it - * is not created, and then tell who pay attention to me. - */ - p->quit = true; - qemu_sem_post(&multifd_send_state->channels_ready); - qemu_sem_post(&p->sem_sync); - error_free(err); -} +static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque); static void *multifd_tls_handshake_thread(void *opaque) { @@ -803,7 +879,7 @@ static void *multifd_tls_handshake_thread(void *opaque) QIOChannelTLS *tioc = QIO_CHANNEL_TLS(p->c); qio_channel_tls_handshake(tioc, - multifd_tls_outgoing_handshake, + multifd_new_send_channel_async, p, NULL, NULL); @@ -823,11 +899,17 @@ static bool multifd_tls_channel_connect(MultiFDSendParams *p, return false; } + /* + * Ownership of the socket channel now transfers to the newly + * created TLS channel, which has already taken a reference. + */ object_unref(OBJECT(ioc)); trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname); qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing"); p->c = QIO_CHANNEL(tioc); - qemu_thread_create(&p->thread, "multifd-tls-handshake-worker", + + p->tls_thread_created = true; + qemu_thread_create(&p->tls_thread, "multifd-tls-handshake-worker", multifd_tls_handshake_thread, p, QEMU_THREAD_JOINABLE); return true; @@ -837,61 +919,72 @@ static bool multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc, Error **errp) { - trace_multifd_set_outgoing_channel( - ioc, object_get_typename(OBJECT(ioc)), - migrate_get_current()->hostname); - - if (migrate_channel_requires_tls_upgrade(ioc)) { - /* - * tls_channel_connect will call back to this - * function after the TLS handshake, - * so we mustn't call multifd_send_thread until then - */ - return multifd_tls_channel_connect(p, ioc, errp); - } + qio_channel_set_delay(ioc, false); migration_ioc_register_yank(ioc); p->registered_yank = true; p->c = ioc; + + p->thread_created = true; qemu_thread_create(&p->thread, p->name, multifd_send_thread, p, QEMU_THREAD_JOINABLE); return true; } -static void multifd_new_send_channel_cleanup(MultiFDSendParams *p, - QIOChannel *ioc, Error *err) -{ - migrate_set_error(migrate_get_current(), err); - /* Error happen, we need to tell who pay attention to me */ - qemu_sem_post(&multifd_send_state->channels_ready); - qemu_sem_post(&p->sem_sync); - /* - * Although multifd_send_thread is not created, but main migration - * thread need to judge whether it is running, so we need to mark - * its status. - */ - p->quit = true; - object_unref(OBJECT(ioc)); - error_free(err); -} - +/* + * When TLS is enabled this function is called once to establish the + * TLS connection and a second time after the TLS handshake to create + * the multifd channel. Without TLS it goes straight into the channel + * creation. + */ static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) { MultiFDSendParams *p = opaque; QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task)); Error *local_err = NULL; + bool ret; trace_multifd_new_send_channel_async(p->id); - if (!qio_task_propagate_error(task, &local_err)) { - qio_channel_set_delay(ioc, false); - p->running = true; - if (multifd_channel_connect(p, ioc, &local_err)) { + + if (qio_task_propagate_error(task, &local_err)) { + ret = false; + goto out; + } + + trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)), + migrate_get_current()->hostname); + + if (migrate_channel_requires_tls_upgrade(ioc)) { + ret = multifd_tls_channel_connect(p, ioc, &local_err); + if (ret) { return; } + } else { + ret = multifd_channel_connect(p, ioc, &local_err); + } + +out: + /* + * Here we're not interested whether creation succeeded, only that + * it happened at all. + */ + qemu_sem_post(&multifd_send_state->channels_created); + + if (ret) { + return; } trace_multifd_new_send_channel_async_error(p->id, local_err); - multifd_new_send_channel_cleanup(p, ioc, local_err); + multifd_send_set_error(local_err); + if (!p->c) { + /* + * If no channel has been created, drop the initial + * reference. Otherwise cleanup happens at + * multifd_send_channel_destroy() + */ + object_unref(OBJECT(ioc)); + } + error_free(local_err); } static void multifd_new_send_channel_create(gpointer opaque) @@ -899,20 +992,23 @@ static void multifd_new_send_channel_create(gpointer opaque) socket_send_channel_create(multifd_new_send_channel_async, opaque); } -int multifd_save_setup(Error **errp) +bool multifd_send_setup(void) { - int thread_count; + MigrationState *s = migrate_get_current(); + Error *local_err = NULL; + int thread_count, ret = 0; uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); uint8_t i; if (!migrate_multifd()) { - return 0; + return true; } thread_count = migrate_multifd_channels(); multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); multifd_send_state->pages = multifd_pages_init(page_count); + qemu_sem_init(&multifd_send_state->channels_created, 0); qemu_sem_init(&multifd_send_state->channels_ready, 0); qatomic_set(&multifd_send_state->exiting, 0); multifd_send_state->ops = multifd_ops[migrate_multifd_compression()]; @@ -920,11 +1016,8 @@ int multifd_save_setup(Error **errp) for (i = 0; i < thread_count; i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; - qemu_mutex_init(&p->mutex); qemu_sem_init(&p->sem, 0); qemu_sem_init(&p->sem_sync, 0); - p->quit = false; - p->pending_job = 0; p->id = i; p->pages = multifd_pages_init(page_count); p->packet_len = sizeof(MultiFDPacket_t) @@ -935,29 +1028,39 @@ int multifd_save_setup(Error **errp) p->name = g_strdup_printf("multifdsend_%d", i); /* We need one extra place for the packet header */ p->iov = g_new0(struct iovec, page_count + 1); - p->normal = g_new0(ram_addr_t, page_count); p->page_size = qemu_target_page_size(); p->page_count = page_count; - - if (migrate_zero_copy_send()) { - p->write_flags = QIO_CHANNEL_WRITE_FLAG_ZERO_COPY; - } else { - p->write_flags = 0; - } - + p->write_flags = 0; multifd_new_send_channel_create(p); } + /* + * Wait until channel creation has started for all channels. The + * creation can still fail, but no more channels will be created + * past this point. + */ + for (i = 0; i < thread_count; i++) { + qemu_sem_wait(&multifd_send_state->channels_created); + } + for (i = 0; i < thread_count; i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; - int ret; - ret = multifd_send_state->ops->send_setup(p, errp); + ret = multifd_send_state->ops->send_setup(p, &local_err); if (ret) { - return ret; + break; } } - return 0; + + if (ret) { + migrate_set_error(s, local_err); + error_report_err(local_err); + migrate_set_state(&s->state, MIGRATION_STATUS_SETUP, + MIGRATION_STATUS_FAILED); + return false; + } + + return true; } struct { @@ -1006,14 +1109,42 @@ static void multifd_recv_terminate_threads(Error *err) } } -void multifd_load_shutdown(void) +void multifd_recv_shutdown(void) { if (migrate_multifd()) { multifd_recv_terminate_threads(NULL); } } -void multifd_load_cleanup(void) +static void multifd_recv_cleanup_channel(MultiFDRecvParams *p) +{ + migration_ioc_unregister_yank(p->c); + object_unref(OBJECT(p->c)); + p->c = NULL; + qemu_mutex_destroy(&p->mutex); + qemu_sem_destroy(&p->sem_sync); + g_free(p->name); + p->name = NULL; + p->packet_len = 0; + g_free(p->packet); + p->packet = NULL; + g_free(p->iov); + p->iov = NULL; + g_free(p->normal); + p->normal = NULL; + multifd_recv_state->ops->recv_cleanup(p); +} + +static void multifd_recv_cleanup_state(void) +{ + qemu_sem_destroy(&multifd_recv_state->sem_sync); + g_free(multifd_recv_state->params); + multifd_recv_state->params = NULL; + g_free(multifd_recv_state); + multifd_recv_state = NULL; +} + +void multifd_recv_cleanup(void) { int i; @@ -1024,40 +1155,20 @@ void multifd_load_cleanup(void) for (i = 0; i < migrate_multifd_channels(); i++) { MultiFDRecvParams *p = &multifd_recv_state->params[i]; - if (p->running) { - /* - * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code, - * however try to wakeup it without harm in cleanup phase. - */ - qemu_sem_post(&p->sem_sync); - } + /* + * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code, + * however try to wakeup it without harm in cleanup phase. + */ + qemu_sem_post(&p->sem_sync); - qemu_thread_join(&p->thread); + if (p->thread_created) { + qemu_thread_join(&p->thread); + } } for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDRecvParams *p = &multifd_recv_state->params[i]; - - migration_ioc_unregister_yank(p->c); - object_unref(OBJECT(p->c)); - p->c = NULL; - qemu_mutex_destroy(&p->mutex); - qemu_sem_destroy(&p->sem_sync); - g_free(p->name); - p->name = NULL; - p->packet_len = 0; - g_free(p->packet); - p->packet = NULL; - g_free(p->iov); - p->iov = NULL; - g_free(p->normal); - p->normal = NULL; - multifd_recv_state->ops->recv_cleanup(p); + multifd_recv_cleanup_channel(&multifd_recv_state->params[i]); } - qemu_sem_destroy(&multifd_recv_state->sem_sync); - g_free(multifd_recv_state->params); - multifd_recv_state->params = NULL; - g_free(multifd_recv_state); - multifd_recv_state = NULL; + multifd_recv_cleanup_state(); } void multifd_recv_sync_main(void) @@ -1119,10 +1230,6 @@ static void *multifd_recv_thread(void *opaque) flags = p->flags; /* recv methods don't know how to handle the SYNC flag */ p->flags &= ~MULTIFD_FLAG_SYNC; - trace_multifd_recv(p->id, p->packet_num, p->normal_num, flags, - p->next_packet_size); - p->num_packets++; - p->total_normal_pages += p->normal_num; qemu_mutex_unlock(&p->mutex); if (p->normal_num) { @@ -1142,17 +1249,14 @@ static void *multifd_recv_thread(void *opaque) multifd_recv_terminate_threads(local_err); error_free(local_err); } - qemu_mutex_lock(&p->mutex); - p->running = false; - qemu_mutex_unlock(&p->mutex); rcu_unregister_thread(); - trace_multifd_recv_thread_end(p->id, p->num_packets, p->total_normal_pages); + trace_multifd_recv_thread_end(p->id, p->packets_recved, p->total_normal_pages); return NULL; } -int multifd_load_setup(Error **errp) +int multifd_recv_setup(Error **errp) { int thread_count; uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); @@ -1249,10 +1353,8 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp) } p->c = ioc; object_ref(OBJECT(ioc)); - /* initial packet */ - p->num_packets = 1; - p->running = true; + p->thread_created = true; qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, QEMU_THREAD_JOINABLE); qatomic_inc(&multifd_recv_state->count); |