diff options
Diffstat (limited to 'migration/multifd.c')
| -rw-r--r-- | migration/multifd.c | 248 |
1 files changed, 206 insertions, 42 deletions
diff --git a/migration/multifd.c b/migration/multifd.c index 215ad0414a..dfb5189f0e 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -12,6 +12,7 @@ #include "qemu/osdep.h" #include "qemu/cutils.h" +#include "qemu/iov.h" #include "qemu/rcu.h" #include "exec/target_page.h" #include "system/system.h" @@ -19,8 +20,10 @@ #include "qemu/error-report.h" #include "qapi/error.h" #include "file.h" +#include "migration/misc.h" #include "migration.h" #include "migration-stats.h" +#include "savevm.h" #include "socket.h" #include "tls.h" #include "qemu-file.h" @@ -49,6 +52,10 @@ typedef struct { struct { MultiFDSendParams *params; + + /* multifd_send() body is not thread safe, needs serialization */ + QemuMutex multifd_send_mutex; + /* * Global number of generated multifd packets. * @@ -98,24 +105,44 @@ struct { MultiFDSendData *multifd_send_data_alloc(void) { - size_t max_payload_size, size_minus_payload; + MultiFDSendData *new = g_new0(MultiFDSendData, 1); - /* - * MultiFDPages_t has a flexible array at the end, account for it - * when allocating MultiFDSendData. Use max() in case other types - * added to the union in the future are larger than - * (MultiFDPages_t + flex array). - */ - max_payload_size = MAX(multifd_ram_payload_size(), sizeof(MultiFDPayload)); + multifd_ram_payload_alloc(&new->u.ram); + /* Device state allocates its payload on-demand */ - /* - * Account for any holes the compiler might insert. We can't pack - * the structure because that misaligns the members and triggers - * Waddress-of-packed-member. - */ - size_minus_payload = sizeof(MultiFDSendData) - sizeof(MultiFDPayload); + return new; +} - return g_malloc0(size_minus_payload + max_payload_size); +void multifd_send_data_clear(MultiFDSendData *data) +{ + if (multifd_payload_empty(data)) { + return; + } + + switch (data->type) { + case MULTIFD_PAYLOAD_DEVICE_STATE: + multifd_send_data_clear_device_state(&data->u.device_state); + break; + default: + /* Nothing to do */ + break; + } + + data->type = MULTIFD_PAYLOAD_NONE; +} + +void multifd_send_data_free(MultiFDSendData *data) +{ + if (!data) { + return; + } + + /* This also free's device state payload */ + multifd_send_data_clear(data); + + multifd_ram_payload_free(&data->u.ram); + + g_free(data); } static bool multifd_use_packets(void) @@ -201,6 +228,7 @@ static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) return msg.id; } +/* Fills a RAM multifd packet */ void multifd_send_fill_packet(MultiFDSendParams *p) { MultiFDPacket_t *packet = p->packet; @@ -209,10 +237,10 @@ void multifd_send_fill_packet(MultiFDSendParams *p) memset(packet, 0, p->packet_len); - packet->magic = cpu_to_be32(MULTIFD_MAGIC); - packet->version = cpu_to_be32(MULTIFD_VERSION); + packet->hdr.magic = cpu_to_be32(MULTIFD_MAGIC); + packet->hdr.version = cpu_to_be32(MULTIFD_VERSION); - packet->flags = cpu_to_be32(p->flags); + packet->hdr.flags = cpu_to_be32(p->flags); packet->next_packet_size = cpu_to_be32(p->next_packet_size); packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num); @@ -228,12 +256,12 @@ void multifd_send_fill_packet(MultiFDSendParams *p) p->flags, p->next_packet_size); } -static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) +static int multifd_recv_unfill_packet_header(MultiFDRecvParams *p, + const MultiFDPacketHdr_t *hdr, + Error **errp) { - const MultiFDPacket_t *packet = p->packet; - uint32_t magic = be32_to_cpu(packet->magic); - uint32_t version = be32_to_cpu(packet->version); - int ret = 0; + uint32_t magic = be32_to_cpu(hdr->magic); + uint32_t version = be32_to_cpu(hdr->version); if (magic != MULTIFD_MAGIC) { error_setg(errp, "multifd: received packet magic %x, expected %x", @@ -247,10 +275,29 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) return -1; } - p->flags = be32_to_cpu(packet->flags); + p->flags = be32_to_cpu(hdr->flags); + + return 0; +} + +static int multifd_recv_unfill_packet_device_state(MultiFDRecvParams *p, + Error **errp) +{ + MultiFDPacketDeviceState_t *packet = p->packet_dev_state; + + packet->instance_id = be32_to_cpu(packet->instance_id); + p->next_packet_size = be32_to_cpu(packet->next_packet_size); + + return 0; +} + +static int multifd_recv_unfill_packet_ram(MultiFDRecvParams *p, Error **errp) +{ + const MultiFDPacket_t *packet = p->packet; + int ret = 0; + p->next_packet_size = be32_to_cpu(packet->next_packet_size); p->packet_num = be64_to_cpu(packet->packet_num); - p->packets_recved++; /* Always unfill, old QEMUs (<9.0) send data along with SYNC */ ret = multifd_ram_unfill_packet(p, errp); @@ -261,6 +308,17 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) return ret; } +static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) +{ + p->packets_recved++; + + if (p->flags & MULTIFD_FLAG_DEVICE_STATE) { + return multifd_recv_unfill_packet_device_state(p, errp); + } + + return multifd_recv_unfill_packet_ram(p, errp); +} + static bool multifd_send_should_exit(void) { return qatomic_read(&multifd_send_state->exiting); @@ -308,6 +366,8 @@ bool multifd_send(MultiFDSendData **send_data) return false; } + QEMU_LOCK_GUARD(&multifd_send_state->multifd_send_mutex); + /* We wait here, until at least one channel is ready */ qemu_sem_wait(&multifd_send_state->channels_ready); @@ -459,9 +519,9 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp) qemu_sem_destroy(&p->sem_sync); g_free(p->name); p->name = NULL; - g_free(p->data); - p->data = NULL; + g_clear_pointer(&p->data, multifd_send_data_free); p->packet_len = 0; + g_clear_pointer(&p->packet_device_state, g_free); g_free(p->packet); p->packet = NULL; multifd_send_state->ops->send_cleanup(p, errp); @@ -474,8 +534,10 @@ static void multifd_send_cleanup_state(void) { file_cleanup_outgoing_migration(); socket_cleanup_outgoing_migration(); + multifd_device_state_send_cleanup(); qemu_sem_destroy(&multifd_send_state->channels_created); qemu_sem_destroy(&multifd_send_state->channels_ready); + qemu_mutex_destroy(&multifd_send_state->multifd_send_mutex); g_free(multifd_send_state->params); multifd_send_state->params = NULL; g_free(multifd_send_state); @@ -631,16 +693,32 @@ static void *multifd_send_thread(void *opaque) * qatomic_store_release() in multifd_send(). */ if (qatomic_load_acquire(&p->pending_job)) { + bool is_device_state = multifd_payload_device_state(p->data); + size_t total_size; + p->flags = 0; p->iovs_num = 0; assert(!multifd_payload_empty(p->data)); - ret = multifd_send_state->ops->send_prepare(p, &local_err); - if (ret != 0) { - break; + if (is_device_state) { + multifd_device_state_send_prepare(p); + } else { + ret = multifd_send_state->ops->send_prepare(p, &local_err); + if (ret != 0) { + break; + } } + /* + * The packet header in the zerocopy RAM case is accounted for + * in multifd_nocomp_send_prepare() - where it is actually + * being sent. + */ + total_size = iov_size(p->iov, p->iovs_num); + if (migrate_mapped_ram()) { + assert(!is_device_state); + ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num, &p->data->u.ram, &local_err); } else { @@ -653,11 +731,10 @@ static void *multifd_send_thread(void *opaque) break; } - stat64_add(&mig_stats.multifd_bytes, - (uint64_t)p->next_packet_size + p->packet_len); + stat64_add(&mig_stats.multifd_bytes, total_size); p->next_packet_size = 0; - multifd_set_payload_type(p->data, MULTIFD_PAYLOAD_NONE); + multifd_send_data_clear(p->data); /* * Making sure p->data is published before saying "we're @@ -856,6 +933,7 @@ bool multifd_send_setup(void) thread_count = migrate_multifd_channels(); multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); + qemu_mutex_init(&multifd_send_state->multifd_send_mutex); qemu_sem_init(&multifd_send_state->channels_created, 0); qemu_sem_init(&multifd_send_state->channels_ready, 0); qatomic_set(&multifd_send_state->exiting, 0); @@ -874,6 +952,9 @@ bool multifd_send_setup(void) p->packet_len = sizeof(MultiFDPacket_t) + sizeof(uint64_t) * page_count; p->packet = g_malloc0(p->packet_len); + p->packet_device_state = g_malloc0(sizeof(*p->packet_device_state)); + p->packet_device_state->hdr.magic = cpu_to_be32(MULTIFD_MAGIC); + p->packet_device_state->hdr.version = cpu_to_be32(MULTIFD_VERSION); } p->name = g_strdup_printf(MIGRATION_THREAD_SRC_MULTIFD, i); p->write_flags = 0; @@ -909,6 +990,8 @@ bool multifd_send_setup(void) assert(p->iov); } + multifd_device_state_send_setup(); + return true; err: @@ -1048,6 +1131,7 @@ static void multifd_recv_cleanup_channel(MultiFDRecvParams *p) p->packet_len = 0; g_free(p->packet); p->packet = NULL; + g_clear_pointer(&p->packet_dev_state, g_free); g_free(p->normal); p->normal = NULL; g_free(p->zero); @@ -1149,6 +1233,34 @@ void multifd_recv_sync_main(void) trace_multifd_recv_sync_main(multifd_recv_state->packet_num); } +static int multifd_device_state_recv(MultiFDRecvParams *p, Error **errp) +{ + g_autofree char *dev_state_buf = NULL; + int ret; + + dev_state_buf = g_malloc(p->next_packet_size); + + ret = qio_channel_read_all(p->c, dev_state_buf, p->next_packet_size, errp); + if (ret != 0) { + return ret; + } + + if (p->packet_dev_state->idstr[sizeof(p->packet_dev_state->idstr) - 1] + != 0) { + error_setg(errp, "unterminated multifd device state idstr"); + return -1; + } + + if (!qemu_loadvm_load_state_buffer(p->packet_dev_state->idstr, + p->packet_dev_state->instance_id, + dev_state_buf, p->next_packet_size, + errp)) { + ret = -1; + } + + return ret; +} + static void *multifd_recv_thread(void *opaque) { MigrationState *s = migrate_get_current(); @@ -1165,14 +1277,19 @@ static void *multifd_recv_thread(void *opaque) } while (true) { + MultiFDPacketHdr_t hdr; uint32_t flags = 0; + bool is_device_state = false; bool has_data = false; + uint8_t *pkt_buf; + size_t pkt_len; + p->normal_num = 0; if (use_packets) { struct iovec iov = { - .iov_base = (void *)p->packet, - .iov_len = p->packet_len + .iov_base = (void *)&hdr, + .iov_len = sizeof(hdr) }; if (multifd_recv_should_exit()) { @@ -1191,6 +1308,32 @@ static void *multifd_recv_thread(void *opaque) break; } + ret = multifd_recv_unfill_packet_header(p, &hdr, &local_err); + if (ret) { + break; + } + + is_device_state = p->flags & MULTIFD_FLAG_DEVICE_STATE; + if (is_device_state) { + pkt_buf = (uint8_t *)p->packet_dev_state + sizeof(hdr); + pkt_len = sizeof(*p->packet_dev_state) - sizeof(hdr); + } else { + pkt_buf = (uint8_t *)p->packet + sizeof(hdr); + pkt_len = p->packet_len - sizeof(hdr); + } + + ret = qio_channel_read_all_eof(p->c, (char *)pkt_buf, pkt_len, + &local_err); + if (!ret) { + /* EOF */ + error_setg(&local_err, "multifd: unexpected EOF after packet header"); + break; + } + + if (ret == -1) { + break; + } + qemu_mutex_lock(&p->mutex); ret = multifd_recv_unfill_packet(p, &local_err); if (ret) { @@ -1202,12 +1345,17 @@ static void *multifd_recv_thread(void *opaque) /* recv methods don't know how to handle the SYNC flag */ p->flags &= ~MULTIFD_FLAG_SYNC; - /* - * Even if it's a SYNC packet, this needs to be set - * because older QEMUs (<9.0) still send data along with - * the SYNC packet. - */ - has_data = p->normal_num || p->zero_num; + if (is_device_state) { + has_data = p->next_packet_size > 0; + } else { + /* + * Even if it's a SYNC packet, this needs to be set + * because older QEMUs (<9.0) still send data along with + * the SYNC packet. + */ + has_data = p->normal_num || p->zero_num; + } + qemu_mutex_unlock(&p->mutex); } else { /* @@ -1236,14 +1384,29 @@ static void *multifd_recv_thread(void *opaque) } if (has_data) { - ret = multifd_recv_state->ops->recv(p, &local_err); + if (is_device_state) { + assert(use_packets); + ret = multifd_device_state_recv(p, &local_err); + } else { + ret = multifd_recv_state->ops->recv(p, &local_err); + } if (ret != 0) { break; } + } else if (is_device_state) { + error_setg(&local_err, + "multifd: received empty device state packet"); + break; } if (use_packets) { if (flags & MULTIFD_FLAG_SYNC) { + if (is_device_state) { + error_setg(&local_err, + "multifd: received SYNC device state packet"); + break; + } + qemu_sem_post(&multifd_recv_state->sem_sync); qemu_sem_wait(&p->sem_sync); } @@ -1312,6 +1475,7 @@ int multifd_recv_setup(Error **errp) p->packet_len = sizeof(MultiFDPacket_t) + sizeof(uint64_t) * page_count; p->packet = g_malloc0(p->packet_len); + p->packet_dev_state = g_malloc0(sizeof(*p->packet_dev_state)); } p->name = g_strdup_printf(MIGRATION_THREAD_DST_MULTIFD, i); p->normal = g_new0(ram_addr_t, page_count); |