diff options
Diffstat (limited to 'migration')
| -rw-r--r-- | migration/colo.c | 3 | ||||
| -rw-r--r-- | migration/meson.build | 1 | ||||
| -rw-r--r-- | migration/migration-hmp-cmds.c | 2 | ||||
| -rw-r--r-- | migration/migration.c | 17 | ||||
| -rw-r--r-- | migration/migration.h | 7 | ||||
| -rw-r--r-- | migration/multifd-device-state.c | 212 | ||||
| -rw-r--r-- | migration/multifd-nocomp.c | 30 | ||||
| -rw-r--r-- | migration/multifd.c | 248 | ||||
| -rw-r--r-- | migration/multifd.h | 74 | ||||
| -rw-r--r-- | migration/options.c | 9 | ||||
| -rw-r--r-- | migration/qemu-file.h | 2 | ||||
| -rw-r--r-- | migration/savevm.c | 201 | ||||
| -rw-r--r-- | migration/savevm.h | 6 | ||||
| -rw-r--r-- | migration/trace-events | 1 |
14 files changed, 742 insertions, 71 deletions
diff --git a/migration/colo.c b/migration/colo.c index 9a8e5fbe9b..c976b3ff34 100644 --- a/migration/colo.c +++ b/migration/colo.c @@ -452,6 +452,9 @@ static int colo_do_checkpoint_transaction(MigrationState *s, bql_unlock(); goto out; } + + qemu_savevm_maybe_send_switchover_start(s->to_dst_file); + /* Note: device state is saved into buffer */ ret = qemu_save_device_state(fb); diff --git a/migration/meson.build b/migration/meson.build index d3bfe84d62..9aa48b290e 100644 --- a/migration/meson.build +++ b/migration/meson.build @@ -25,6 +25,7 @@ system_ss.add(files( 'migration-hmp-cmds.c', 'migration.c', 'multifd.c', + 'multifd-device-state.c', 'multifd-nocomp.c', 'multifd-zlib.c', 'multifd-zero-page.c', diff --git a/migration/migration-hmp-cmds.c b/migration/migration-hmp-cmds.c index 3347e34c48..49c26daed3 100644 --- a/migration/migration-hmp-cmds.c +++ b/migration/migration-hmp-cmds.c @@ -46,6 +46,8 @@ static void migration_global_dump(Monitor *mon) ms->send_configuration ? "on" : "off"); monitor_printf(mon, "send-section-footer: %s\n", ms->send_section_footer ? "on" : "off"); + monitor_printf(mon, "send-switchover-start: %s\n", + ms->send_switchover_start ? "on" : "off"); monitor_printf(mon, "clear-bitmap-shift: %u\n", ms->clear_bitmap_shift); } diff --git a/migration/migration.c b/migration/migration.c index c597aa707e..1833cfe358 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -402,11 +402,24 @@ void migration_incoming_state_destroy(void) struct MigrationIncomingState *mis = migration_incoming_get_current(); multifd_recv_cleanup(); + /* * RAM state cleanup needs to happen after multifd cleanup, because * multifd threads can use some of its states (receivedmap). + * The VFIO load_cleanup() implementation is BQL-sensitive. It requires + * BQL must NOT be taken when recycling load threads, so that it won't + * block the load threads from making progress on address space + * modification operations. + * + * To make it work, we could try to not take BQL for all load_cleanup(), + * or conditionally unlock BQL only if bql_locked() in VFIO. + * + * Since most existing call sites take BQL for load_cleanup(), make + * it simple by taking BQL always as the rule, so that VFIO can unlock + * BQL and retake unconditionally. */ - qemu_loadvm_state_cleanup(); + assert(bql_locked()); + qemu_loadvm_state_cleanup(mis); if (mis->to_src_file) { /* Tell source that we are done */ @@ -2891,6 +2904,8 @@ static bool migration_switchover_start(MigrationState *s, Error **errp) precopy_notify_complete(); + qemu_savevm_maybe_send_switchover_start(s->to_dst_file); + return true; } diff --git a/migration/migration.h b/migration/migration.h index 4639e2a7e4..d53f7cad84 100644 --- a/migration/migration.h +++ b/migration/migration.h @@ -43,6 +43,7 @@ #define MIGRATION_THREAD_DST_PREEMPT "mig/dst/preempt" struct PostcopyBlocktimeContext; +typedef struct ThreadPool ThreadPool; #define MIGRATION_RESUME_ACK_VALUE (1) @@ -187,6 +188,10 @@ struct MigrationIncomingState { Coroutine *colo_incoming_co; QemuSemaphore colo_incoming_sem; + /* Optional load threads pool and its thread exit request flag */ + ThreadPool *load_threads; + bool load_threads_abort; + /* * PostcopyBlocktimeContext to keep information for postcopy * live migration, to calculate vCPU block time @@ -400,6 +405,8 @@ struct MigrationState { bool send_configuration; /* Whether we send section footer during migration */ bool send_section_footer; + /* Whether we send switchover start notification during migration */ + bool send_switchover_start; /* Needed by postcopy-pause state */ QemuSemaphore postcopy_pause_sem; diff --git a/migration/multifd-device-state.c b/migration/multifd-device-state.c new file mode 100644 index 0000000000..94222d0eb0 --- /dev/null +++ b/migration/multifd-device-state.c @@ -0,0 +1,212 @@ +/* + * Multifd device state migration + * + * Copyright (C) 2024,2025 Oracle and/or its affiliates. + * + * This work is licensed under the terms of the GNU GPL, version 2 or later. + * See the COPYING file in the top-level directory. + * + * SPDX-License-Identifier: GPL-2.0-or-later + */ + +#include "qemu/osdep.h" +#include "qapi/error.h" +#include "qemu/lockable.h" +#include "block/thread-pool.h" +#include "migration.h" +#include "migration/misc.h" +#include "multifd.h" +#include "options.h" + +static struct { + QemuMutex queue_job_mutex; + + MultiFDSendData *send_data; + + ThreadPool *threads; + bool threads_abort; +} *multifd_send_device_state; + +void multifd_device_state_send_setup(void) +{ + assert(!multifd_send_device_state); + multifd_send_device_state = g_malloc(sizeof(*multifd_send_device_state)); + + qemu_mutex_init(&multifd_send_device_state->queue_job_mutex); + + multifd_send_device_state->send_data = multifd_send_data_alloc(); + + multifd_send_device_state->threads = thread_pool_new(); + multifd_send_device_state->threads_abort = false; +} + +void multifd_device_state_send_cleanup(void) +{ + g_clear_pointer(&multifd_send_device_state->threads, thread_pool_free); + g_clear_pointer(&multifd_send_device_state->send_data, + multifd_send_data_free); + + qemu_mutex_destroy(&multifd_send_device_state->queue_job_mutex); + + g_clear_pointer(&multifd_send_device_state, g_free); +} + +void multifd_send_data_clear_device_state(MultiFDDeviceState_t *device_state) +{ + g_clear_pointer(&device_state->idstr, g_free); + g_clear_pointer(&device_state->buf, g_free); +} + +static void multifd_device_state_fill_packet(MultiFDSendParams *p) +{ + MultiFDDeviceState_t *device_state = &p->data->u.device_state; + MultiFDPacketDeviceState_t *packet = p->packet_device_state; + + packet->hdr.flags = cpu_to_be32(p->flags); + strncpy(packet->idstr, device_state->idstr, sizeof(packet->idstr) - 1); + packet->idstr[sizeof(packet->idstr) - 1] = 0; + packet->instance_id = cpu_to_be32(device_state->instance_id); + packet->next_packet_size = cpu_to_be32(p->next_packet_size); +} + +static void multifd_prepare_header_device_state(MultiFDSendParams *p) +{ + p->iov[0].iov_len = sizeof(*p->packet_device_state); + p->iov[0].iov_base = p->packet_device_state; + p->iovs_num++; +} + +void multifd_device_state_send_prepare(MultiFDSendParams *p) +{ + MultiFDDeviceState_t *device_state = &p->data->u.device_state; + + assert(multifd_payload_device_state(p->data)); + + multifd_prepare_header_device_state(p); + + assert(!(p->flags & MULTIFD_FLAG_SYNC)); + + p->next_packet_size = device_state->buf_len; + if (p->next_packet_size > 0) { + p->iov[p->iovs_num].iov_base = device_state->buf; + p->iov[p->iovs_num].iov_len = p->next_packet_size; + p->iovs_num++; + } + + p->flags |= MULTIFD_FLAG_NOCOMP | MULTIFD_FLAG_DEVICE_STATE; + + multifd_device_state_fill_packet(p); +} + +bool multifd_queue_device_state(char *idstr, uint32_t instance_id, + char *data, size_t len) +{ + /* Device state submissions can come from multiple threads */ + QEMU_LOCK_GUARD(&multifd_send_device_state->queue_job_mutex); + MultiFDDeviceState_t *device_state; + + assert(multifd_payload_empty(multifd_send_device_state->send_data)); + + multifd_set_payload_type(multifd_send_device_state->send_data, + MULTIFD_PAYLOAD_DEVICE_STATE); + device_state = &multifd_send_device_state->send_data->u.device_state; + device_state->idstr = g_strdup(idstr); + device_state->instance_id = instance_id; + device_state->buf = g_memdup2(data, len); + device_state->buf_len = len; + + if (!multifd_send(&multifd_send_device_state->send_data)) { + multifd_send_data_clear(multifd_send_device_state->send_data); + return false; + } + + return true; +} + +bool multifd_device_state_supported(void) +{ + return migrate_multifd() && !migrate_mapped_ram() && + migrate_multifd_compression() == MULTIFD_COMPRESSION_NONE; +} + +static void multifd_device_state_save_thread_data_free(void *opaque) +{ + SaveLiveCompletePrecopyThreadData *data = opaque; + + g_clear_pointer(&data->idstr, g_free); + g_free(data); +} + +static int multifd_device_state_save_thread(void *opaque) +{ + SaveLiveCompletePrecopyThreadData *data = opaque; + g_autoptr(Error) local_err = NULL; + + if (!data->hdlr(data, &local_err)) { + MigrationState *s = migrate_get_current(); + + /* + * Can't call abort_device_state_save_threads() here since new + * save threads could still be in process of being launched + * (if, for example, the very first save thread launched exited + * with an error very quickly). + */ + + assert(local_err); + + /* + * In case of multiple save threads failing which thread error + * return we end setting is purely arbitrary. + */ + migrate_set_error(s, local_err); + } + + return 0; +} + +bool multifd_device_state_save_thread_should_exit(void) +{ + return qatomic_read(&multifd_send_device_state->threads_abort); +} + +void +multifd_spawn_device_state_save_thread(SaveLiveCompletePrecopyThreadHandler hdlr, + char *idstr, uint32_t instance_id, + void *opaque) +{ + SaveLiveCompletePrecopyThreadData *data; + + assert(multifd_device_state_supported()); + assert(multifd_send_device_state); + + assert(!qatomic_read(&multifd_send_device_state->threads_abort)); + + data = g_new(SaveLiveCompletePrecopyThreadData, 1); + data->hdlr = hdlr; + data->idstr = g_strdup(idstr); + data->instance_id = instance_id; + data->handler_opaque = opaque; + + thread_pool_submit_immediate(multifd_send_device_state->threads, + multifd_device_state_save_thread, + data, + multifd_device_state_save_thread_data_free); +} + +void multifd_abort_device_state_save_threads(void) +{ + assert(multifd_device_state_supported()); + + qatomic_set(&multifd_send_device_state->threads_abort, true); +} + +bool multifd_join_device_state_save_threads(void) +{ + MigrationState *s = migrate_get_current(); + + assert(multifd_device_state_supported()); + + thread_pool_wait(multifd_send_device_state->threads); + + return !migrate_has_error(s); +} diff --git a/migration/multifd-nocomp.c b/migration/multifd-nocomp.c index 1325dba97c..ffe75256c9 100644 --- a/migration/multifd-nocomp.c +++ b/migration/multifd-nocomp.c @@ -14,6 +14,7 @@ #include "exec/ramblock.h" #include "exec/target_page.h" #include "file.h" +#include "migration-stats.h" #include "multifd.h" #include "options.h" #include "qapi/error.h" @@ -24,15 +25,14 @@ static MultiFDSendData *multifd_ram_send; -size_t multifd_ram_payload_size(void) +void multifd_ram_payload_alloc(MultiFDPages_t *pages) { - uint32_t n = multifd_ram_page_count(); + pages->offset = g_new0(ram_addr_t, multifd_ram_page_count()); +} - /* - * We keep an array of page offsets at the end of MultiFDPages_t, - * add space for it in the allocation. - */ - return sizeof(MultiFDPages_t) + n * sizeof(ram_addr_t); +void multifd_ram_payload_free(MultiFDPages_t *pages) +{ + g_clear_pointer(&pages->offset, g_free); } void multifd_ram_save_setup(void) @@ -42,8 +42,7 @@ void multifd_ram_save_setup(void) void multifd_ram_save_cleanup(void) { - g_free(multifd_ram_send); - multifd_ram_send = NULL; + g_clear_pointer(&multifd_ram_send, multifd_send_data_free); } static void multifd_set_file_bitmap(MultiFDSendParams *p) @@ -86,6 +85,13 @@ static void multifd_nocomp_send_cleanup(MultiFDSendParams *p, Error **errp) return; } +static void multifd_ram_prepare_header(MultiFDSendParams *p) +{ + p->iov[0].iov_len = p->packet_len; + p->iov[0].iov_base = p->packet; + p->iovs_num++; +} + static void multifd_send_prepare_iovs(MultiFDSendParams *p) { MultiFDPages_t *pages = &p->data->u.ram; @@ -119,7 +125,7 @@ static int multifd_nocomp_send_prepare(MultiFDSendParams *p, Error **errp) * Only !zerocopy needs the header in IOV; zerocopy will * send it separately. */ - multifd_send_prepare_header(p); + multifd_ram_prepare_header(p); } multifd_send_prepare_iovs(p); @@ -134,6 +140,8 @@ static int multifd_nocomp_send_prepare(MultiFDSendParams *p, Error **errp) if (ret != 0) { return -1; } + + stat64_add(&mig_stats.multifd_bytes, p->packet_len); } return 0; @@ -432,7 +440,7 @@ int multifd_ram_flush_and_sync(QEMUFile *f) bool multifd_send_prepare_common(MultiFDSendParams *p) { MultiFDPages_t *pages = &p->data->u.ram; - multifd_send_prepare_header(p); + multifd_ram_prepare_header(p); multifd_send_zero_page_detect(p); if (!pages->normal_num) { diff --git a/migration/multifd.c b/migration/multifd.c index 215ad0414a..dfb5189f0e 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -12,6 +12,7 @@ #include "qemu/osdep.h" #include "qemu/cutils.h" +#include "qemu/iov.h" #include "qemu/rcu.h" #include "exec/target_page.h" #include "system/system.h" @@ -19,8 +20,10 @@ #include "qemu/error-report.h" #include "qapi/error.h" #include "file.h" +#include "migration/misc.h" #include "migration.h" #include "migration-stats.h" +#include "savevm.h" #include "socket.h" #include "tls.h" #include "qemu-file.h" @@ -49,6 +52,10 @@ typedef struct { struct { MultiFDSendParams *params; + + /* multifd_send() body is not thread safe, needs serialization */ + QemuMutex multifd_send_mutex; + /* * Global number of generated multifd packets. * @@ -98,24 +105,44 @@ struct { MultiFDSendData *multifd_send_data_alloc(void) { - size_t max_payload_size, size_minus_payload; + MultiFDSendData *new = g_new0(MultiFDSendData, 1); - /* - * MultiFDPages_t has a flexible array at the end, account for it - * when allocating MultiFDSendData. Use max() in case other types - * added to the union in the future are larger than - * (MultiFDPages_t + flex array). - */ - max_payload_size = MAX(multifd_ram_payload_size(), sizeof(MultiFDPayload)); + multifd_ram_payload_alloc(&new->u.ram); + /* Device state allocates its payload on-demand */ - /* - * Account for any holes the compiler might insert. We can't pack - * the structure because that misaligns the members and triggers - * Waddress-of-packed-member. - */ - size_minus_payload = sizeof(MultiFDSendData) - sizeof(MultiFDPayload); + return new; +} - return g_malloc0(size_minus_payload + max_payload_size); +void multifd_send_data_clear(MultiFDSendData *data) +{ + if (multifd_payload_empty(data)) { + return; + } + + switch (data->type) { + case MULTIFD_PAYLOAD_DEVICE_STATE: + multifd_send_data_clear_device_state(&data->u.device_state); + break; + default: + /* Nothing to do */ + break; + } + + data->type = MULTIFD_PAYLOAD_NONE; +} + +void multifd_send_data_free(MultiFDSendData *data) +{ + if (!data) { + return; + } + + /* This also free's device state payload */ + multifd_send_data_clear(data); + + multifd_ram_payload_free(&data->u.ram); + + g_free(data); } static bool multifd_use_packets(void) @@ -201,6 +228,7 @@ static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) return msg.id; } +/* Fills a RAM multifd packet */ void multifd_send_fill_packet(MultiFDSendParams *p) { MultiFDPacket_t *packet = p->packet; @@ -209,10 +237,10 @@ void multifd_send_fill_packet(MultiFDSendParams *p) memset(packet, 0, p->packet_len); - packet->magic = cpu_to_be32(MULTIFD_MAGIC); - packet->version = cpu_to_be32(MULTIFD_VERSION); + packet->hdr.magic = cpu_to_be32(MULTIFD_MAGIC); + packet->hdr.version = cpu_to_be32(MULTIFD_VERSION); - packet->flags = cpu_to_be32(p->flags); + packet->hdr.flags = cpu_to_be32(p->flags); packet->next_packet_size = cpu_to_be32(p->next_packet_size); packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num); @@ -228,12 +256,12 @@ void multifd_send_fill_packet(MultiFDSendParams *p) p->flags, p->next_packet_size); } -static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) +static int multifd_recv_unfill_packet_header(MultiFDRecvParams *p, + const MultiFDPacketHdr_t *hdr, + Error **errp) { - const MultiFDPacket_t *packet = p->packet; - uint32_t magic = be32_to_cpu(packet->magic); - uint32_t version = be32_to_cpu(packet->version); - int ret = 0; + uint32_t magic = be32_to_cpu(hdr->magic); + uint32_t version = be32_to_cpu(hdr->version); if (magic != MULTIFD_MAGIC) { error_setg(errp, "multifd: received packet magic %x, expected %x", @@ -247,10 +275,29 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) return -1; } - p->flags = be32_to_cpu(packet->flags); + p->flags = be32_to_cpu(hdr->flags); + + return 0; +} + +static int multifd_recv_unfill_packet_device_state(MultiFDRecvParams *p, + Error **errp) +{ + MultiFDPacketDeviceState_t *packet = p->packet_dev_state; + + packet->instance_id = be32_to_cpu(packet->instance_id); + p->next_packet_size = be32_to_cpu(packet->next_packet_size); + + return 0; +} + +static int multifd_recv_unfill_packet_ram(MultiFDRecvParams *p, Error **errp) +{ + const MultiFDPacket_t *packet = p->packet; + int ret = 0; + p->next_packet_size = be32_to_cpu(packet->next_packet_size); p->packet_num = be64_to_cpu(packet->packet_num); - p->packets_recved++; /* Always unfill, old QEMUs (<9.0) send data along with SYNC */ ret = multifd_ram_unfill_packet(p, errp); @@ -261,6 +308,17 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) return ret; } +static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) +{ + p->packets_recved++; + + if (p->flags & MULTIFD_FLAG_DEVICE_STATE) { + return multifd_recv_unfill_packet_device_state(p, errp); + } + + return multifd_recv_unfill_packet_ram(p, errp); +} + static bool multifd_send_should_exit(void) { return qatomic_read(&multifd_send_state->exiting); @@ -308,6 +366,8 @@ bool multifd_send(MultiFDSendData **send_data) return false; } + QEMU_LOCK_GUARD(&multifd_send_state->multifd_send_mutex); + /* We wait here, until at least one channel is ready */ qemu_sem_wait(&multifd_send_state->channels_ready); @@ -459,9 +519,9 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp) qemu_sem_destroy(&p->sem_sync); g_free(p->name); p->name = NULL; - g_free(p->data); - p->data = NULL; + g_clear_pointer(&p->data, multifd_send_data_free); p->packet_len = 0; + g_clear_pointer(&p->packet_device_state, g_free); g_free(p->packet); p->packet = NULL; multifd_send_state->ops->send_cleanup(p, errp); @@ -474,8 +534,10 @@ static void multifd_send_cleanup_state(void) { file_cleanup_outgoing_migration(); socket_cleanup_outgoing_migration(); + multifd_device_state_send_cleanup(); qemu_sem_destroy(&multifd_send_state->channels_created); qemu_sem_destroy(&multifd_send_state->channels_ready); + qemu_mutex_destroy(&multifd_send_state->multifd_send_mutex); g_free(multifd_send_state->params); multifd_send_state->params = NULL; g_free(multifd_send_state); @@ -631,16 +693,32 @@ static void *multifd_send_thread(void *opaque) * qatomic_store_release() in multifd_send(). */ if (qatomic_load_acquire(&p->pending_job)) { + bool is_device_state = multifd_payload_device_state(p->data); + size_t total_size; + p->flags = 0; p->iovs_num = 0; assert(!multifd_payload_empty(p->data)); - ret = multifd_send_state->ops->send_prepare(p, &local_err); - if (ret != 0) { - break; + if (is_device_state) { + multifd_device_state_send_prepare(p); + } else { + ret = multifd_send_state->ops->send_prepare(p, &local_err); + if (ret != 0) { + break; + } } + /* + * The packet header in the zerocopy RAM case is accounted for + * in multifd_nocomp_send_prepare() - where it is actually + * being sent. + */ + total_size = iov_size(p->iov, p->iovs_num); + if (migrate_mapped_ram()) { + assert(!is_device_state); + ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num, &p->data->u.ram, &local_err); } else { @@ -653,11 +731,10 @@ static void *multifd_send_thread(void *opaque) break; } - stat64_add(&mig_stats.multifd_bytes, - (uint64_t)p->next_packet_size + p->packet_len); + stat64_add(&mig_stats.multifd_bytes, total_size); p->next_packet_size = 0; - multifd_set_payload_type(p->data, MULTIFD_PAYLOAD_NONE); + multifd_send_data_clear(p->data); /* * Making sure p->data is published before saying "we're @@ -856,6 +933,7 @@ bool multifd_send_setup(void) thread_count = migrate_multifd_channels(); multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); + qemu_mutex_init(&multifd_send_state->multifd_send_mutex); qemu_sem_init(&multifd_send_state->channels_created, 0); qemu_sem_init(&multifd_send_state->channels_ready, 0); qatomic_set(&multifd_send_state->exiting, 0); @@ -874,6 +952,9 @@ bool multifd_send_setup(void) p->packet_len = sizeof(MultiFDPacket_t) + sizeof(uint64_t) * page_count; p->packet = g_malloc0(p->packet_len); + p->packet_device_state = g_malloc0(sizeof(*p->packet_device_state)); + p->packet_device_state->hdr.magic = cpu_to_be32(MULTIFD_MAGIC); + p->packet_device_state->hdr.version = cpu_to_be32(MULTIFD_VERSION); } p->name = g_strdup_printf(MIGRATION_THREAD_SRC_MULTIFD, i); p->write_flags = 0; @@ -909,6 +990,8 @@ bool multifd_send_setup(void) assert(p->iov); } + multifd_device_state_send_setup(); + return true; err: @@ -1048,6 +1131,7 @@ static void multifd_recv_cleanup_channel(MultiFDRecvParams *p) p->packet_len = 0; g_free(p->packet); p->packet = NULL; + g_clear_pointer(&p->packet_dev_state, g_free); g_free(p->normal); p->normal = NULL; g_free(p->zero); @@ -1149,6 +1233,34 @@ void multifd_recv_sync_main(void) trace_multifd_recv_sync_main(multifd_recv_state->packet_num); } +static int multifd_device_state_recv(MultiFDRecvParams *p, Error **errp) +{ + g_autofree char *dev_state_buf = NULL; + int ret; + + dev_state_buf = g_malloc(p->next_packet_size); + + ret = qio_channel_read_all(p->c, dev_state_buf, p->next_packet_size, errp); + if (ret != 0) { + return ret; + } + + if (p->packet_dev_state->idstr[sizeof(p->packet_dev_state->idstr) - 1] + != 0) { + error_setg(errp, "unterminated multifd device state idstr"); + return -1; + } + + if (!qemu_loadvm_load_state_buffer(p->packet_dev_state->idstr, + p->packet_dev_state->instance_id, + dev_state_buf, p->next_packet_size, + errp)) { + ret = -1; + } + + return ret; +} + static void *multifd_recv_thread(void *opaque) { MigrationState *s = migrate_get_current(); @@ -1165,14 +1277,19 @@ static void *multifd_recv_thread(void *opaque) } while (true) { + MultiFDPacketHdr_t hdr; uint32_t flags = 0; + bool is_device_state = false; bool has_data = false; + uint8_t *pkt_buf; + size_t pkt_len; + p->normal_num = 0; if (use_packets) { struct iovec iov = { - .iov_base = (void *)p->packet, - .iov_len = p->packet_len + .iov_base = (void *)&hdr, + .iov_len = sizeof(hdr) }; if (multifd_recv_should_exit()) { @@ -1191,6 +1308,32 @@ static void *multifd_recv_thread(void *opaque) break; } + ret = multifd_recv_unfill_packet_header(p, &hdr, &local_err); + if (ret) { + break; + } + + is_device_state = p->flags & MULTIFD_FLAG_DEVICE_STATE; + if (is_device_state) { + pkt_buf = (uint8_t *)p->packet_dev_state + sizeof(hdr); + pkt_len = sizeof(*p->packet_dev_state) - sizeof(hdr); + } else { + pkt_buf = (uint8_t *)p->packet + sizeof(hdr); + pkt_len = p->packet_len - sizeof(hdr); + } + + ret = qio_channel_read_all_eof(p->c, (char *)pkt_buf, pkt_len, + &local_err); + if (!ret) { + /* EOF */ + error_setg(&local_err, "multifd: unexpected EOF after packet header"); + break; + } + + if (ret == -1) { + break; + } + qemu_mutex_lock(&p->mutex); ret = multifd_recv_unfill_packet(p, &local_err); if (ret) { @@ -1202,12 +1345,17 @@ static void *multifd_recv_thread(void *opaque) /* recv methods don't know how to handle the SYNC flag */ p->flags &= ~MULTIFD_FLAG_SYNC; - /* - * Even if it's a SYNC packet, this needs to be set - * because older QEMUs (<9.0) still send data along with - * the SYNC packet. - */ - has_data = p->normal_num || p->zero_num; + if (is_device_state) { + has_data = p->next_packet_size > 0; + } else { + /* + * Even if it's a SYNC packet, this needs to be set + * because older QEMUs (<9.0) still send data along with + * the SYNC packet. + */ + has_data = p->normal_num || p->zero_num; + } + qemu_mutex_unlock(&p->mutex); } else { /* @@ -1236,14 +1384,29 @@ static void *multifd_recv_thread(void *opaque) } if (has_data) { - ret = multifd_recv_state->ops->recv(p, &local_err); + if (is_device_state) { + assert(use_packets); + ret = multifd_device_state_recv(p, &local_err); + } else { + ret = multifd_recv_state->ops->recv(p, &local_err); + } if (ret != 0) { break; } + } else if (is_device_state) { + error_setg(&local_err, + "multifd: received empty device state packet"); + break; } if (use_packets) { if (flags & MULTIFD_FLAG_SYNC) { + if (is_device_state) { + error_setg(&local_err, + "multifd: received SYNC device state packet"); + break; + } + qemu_sem_post(&multifd_recv_state->sem_sync); qemu_sem_wait(&p->sem_sync); } @@ -1312,6 +1475,7 @@ int multifd_recv_setup(Error **errp) p->packet_len = sizeof(MultiFDPacket_t) + sizeof(uint64_t) * page_count; p->packet = g_malloc0(p->packet_len); + p->packet_dev_state = g_malloc0(sizeof(*p->packet_dev_state)); } p->name = g_strdup_printf(MIGRATION_THREAD_DST_MULTIFD, i); p->normal = g_new0(ram_addr_t, page_count); diff --git a/migration/multifd.h b/migration/multifd.h index cf408ff721..2d337e7b3b 100644 --- a/migration/multifd.h +++ b/migration/multifd.h @@ -62,6 +62,12 @@ MultiFDRecvData *multifd_get_recv_data(void); #define MULTIFD_FLAG_UADK (8 << 1) #define MULTIFD_FLAG_QATZIP (16 << 1) +/* + * If set it means that this packet contains device state + * (MultiFDPacketDeviceState_t), not RAM data (MultiFDPacket_t). + */ +#define MULTIFD_FLAG_DEVICE_STATE (32 << 1) + /* This value needs to be a multiple of qemu_target_page_size() */ #define MULTIFD_PACKET_SIZE (512 * 1024) @@ -69,6 +75,11 @@ typedef struct { uint32_t magic; uint32_t version; uint32_t flags; +} __attribute__((packed)) MultiFDPacketHdr_t; + +typedef struct { + MultiFDPacketHdr_t hdr; + /* maximum number of allocated pages */ uint32_t pages_alloc; /* non zero pages */ @@ -90,13 +101,27 @@ typedef struct { } __attribute__((packed)) MultiFDPacket_t; typedef struct { + MultiFDPacketHdr_t hdr; + + char idstr[256]; + uint32_t instance_id; + + /* size of the next packet that contains the actual data */ + uint32_t next_packet_size; +} __attribute__((packed)) MultiFDPacketDeviceState_t; + +typedef struct { /* number of used pages */ uint32_t num; /* number of normal pages */ uint32_t normal_num; + /* + * Pointer to the ramblock. NOTE: it's caller's responsibility to make + * sure the pointer is always valid! + */ RAMBlock *block; - /* offset of each page */ - ram_addr_t offset[]; + /* offset array of each page, managed by multifd */ + ram_addr_t *offset; } MultiFDPages_t; struct MultiFDRecvData { @@ -106,13 +131,22 @@ struct MultiFDRecvData { off_t file_offset; }; +typedef struct { + char *idstr; + uint32_t instance_id; + char *buf; + size_t buf_len; +} MultiFDDeviceState_t; + typedef enum { MULTIFD_PAYLOAD_NONE, MULTIFD_PAYLOAD_RAM, + MULTIFD_PAYLOAD_DEVICE_STATE, } MultiFDPayloadType; -typedef union MultiFDPayload { +typedef struct MultiFDPayload { MultiFDPages_t ram; + MultiFDDeviceState_t device_state; } MultiFDPayload; struct MultiFDSendData { @@ -125,9 +159,17 @@ static inline bool multifd_payload_empty(MultiFDSendData *data) return data->type == MULTIFD_PAYLOAD_NONE; } +static inline bool multifd_payload_device_state(MultiFDSendData *data) +{ + return data->type == MULTIFD_PAYLOAD_DEVICE_STATE; +} + static inline void multifd_set_payload_type(MultiFDSendData *data, MultiFDPayloadType type) { + assert(multifd_payload_empty(data)); + assert(type != MULTIFD_PAYLOAD_NONE); + data->type = type; } @@ -174,8 +216,9 @@ typedef struct { /* thread local variables. No locking required */ - /* pointer to the packet */ + /* pointers to the possible packet types */ MultiFDPacket_t *packet; + MultiFDPacketDeviceState_t *packet_device_state; /* size of the next packet that contains pages */ uint32_t next_packet_size; /* packets sent through this channel */ @@ -222,8 +265,9 @@ typedef struct { /* thread local variables. No locking required */ - /* pointer to the packet */ + /* pointers to the possible packet types */ MultiFDPacket_t *packet; + MultiFDPacketDeviceState_t *packet_dev_state; /* size of the next packet that contains pages */ uint32_t next_packet_size; /* packets received through this channel */ @@ -333,16 +377,11 @@ bool multifd_send_prepare_common(MultiFDSendParams *p); void multifd_send_zero_page_detect(MultiFDSendParams *p); void multifd_recv_zero_page_process(MultiFDRecvParams *p); -static inline void multifd_send_prepare_header(MultiFDSendParams *p) -{ - p->iov[0].iov_len = p->packet_len; - p->iov[0].iov_base = p->packet; - p->iovs_num++; -} - void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc); bool multifd_send(MultiFDSendData **send_data); MultiFDSendData *multifd_send_data_alloc(void); +void multifd_send_data_clear(MultiFDSendData *data); +void multifd_send_data_free(MultiFDSendData *data); static inline uint32_t multifd_ram_page_size(void) { @@ -359,7 +398,16 @@ void multifd_ram_save_cleanup(void); int multifd_ram_flush_and_sync(QEMUFile *f); bool multifd_ram_sync_per_round(void); bool multifd_ram_sync_per_section(void); -size_t multifd_ram_payload_size(void); +void multifd_ram_payload_alloc(MultiFDPages_t *pages); +void multifd_ram_payload_free(MultiFDPages_t *pages); void multifd_ram_fill_packet(MultiFDSendParams *p); int multifd_ram_unfill_packet(MultiFDRecvParams *p, Error **errp); + +void multifd_send_data_clear_device_state(MultiFDDeviceState_t *device_state); + +void multifd_device_state_send_setup(void); +void multifd_device_state_send_cleanup(void); + +void multifd_device_state_send_prepare(MultiFDSendParams *p); + #endif diff --git a/migration/options.c b/migration/options.c index bb259d192a..b0ac2ea408 100644 --- a/migration/options.c +++ b/migration/options.c @@ -93,6 +93,8 @@ const Property migration_properties[] = { send_configuration, true), DEFINE_PROP_BOOL("send-section-footer", MigrationState, send_section_footer, true), + DEFINE_PROP_BOOL("send-switchover-start", MigrationState, + send_switchover_start, true), DEFINE_PROP_BOOL("multifd-flush-after-each-section", MigrationState, multifd_flush_after_each_section, false), DEFINE_PROP_UINT8("x-clear-bitmap-shift", MigrationState, @@ -209,6 +211,13 @@ bool migrate_auto_converge(void) return s->capabilities[MIGRATION_CAPABILITY_AUTO_CONVERGE]; } +bool migrate_send_switchover_start(void) +{ + MigrationState *s = migrate_get_current(); + + return s->send_switchover_start; +} + bool migrate_background_snapshot(void) { MigrationState *s = migrate_get_current(); diff --git a/migration/qemu-file.h b/migration/qemu-file.h index 3e47a20621..f5b9f430e0 100644 --- a/migration/qemu-file.h +++ b/migration/qemu-file.h @@ -33,6 +33,8 @@ QEMUFile *qemu_file_new_input(QIOChannel *ioc); QEMUFile *qemu_file_new_output(QIOChannel *ioc); int qemu_fclose(QEMUFile *f); +G_DEFINE_AUTOPTR_CLEANUP_FUNC(QEMUFile, qemu_fclose) + /* * qemu_file_transferred: * diff --git a/migration/savevm.c b/migration/savevm.c index 4046faf009..5c4fdfd95e 100644 --- a/migration/savevm.c +++ b/migration/savevm.c @@ -37,6 +37,7 @@ #include "migration/register.h" #include "migration/global_state.h" #include "migration/channel-block.h" +#include "multifd.h" #include "ram.h" #include "qemu-file.h" #include "savevm.h" @@ -54,6 +55,7 @@ #include "qemu/job.h" #include "qemu/main-loop.h" #include "block/snapshot.h" +#include "block/thread-pool.h" #include "qemu/cutils.h" #include "io/channel-buffer.h" #include "io/channel-file.h" @@ -90,6 +92,7 @@ enum qemu_vm_cmd { MIG_CMD_ENABLE_COLO, /* Enable COLO */ MIG_CMD_POSTCOPY_RESUME, /* resume postcopy on dest */ MIG_CMD_RECV_BITMAP, /* Request for recved bitmap on dst */ + MIG_CMD_SWITCHOVER_START, /* Switchover start notification */ MIG_CMD_MAX }; @@ -109,6 +112,7 @@ static struct mig_cmd_args { [MIG_CMD_POSTCOPY_RESUME] = { .len = 0, .name = "POSTCOPY_RESUME" }, [MIG_CMD_PACKAGED] = { .len = 4, .name = "PACKAGED" }, [MIG_CMD_RECV_BITMAP] = { .len = -1, .name = "RECV_BITMAP" }, + [MIG_CMD_SWITCHOVER_START] = { .len = 0, .name = "SWITCHOVER_START" }, [MIG_CMD_MAX] = { .len = -1, .name = "MAX" }, }; @@ -130,6 +134,35 @@ static struct mig_cmd_args { */ /***********************************************************/ +/* Optional load threads pool support */ + +static void qemu_loadvm_thread_pool_create(MigrationIncomingState *mis) +{ + assert(!mis->load_threads); + mis->load_threads = thread_pool_new(); + mis->load_threads_abort = false; +} + +static void qemu_loadvm_thread_pool_destroy(MigrationIncomingState *mis) +{ + qatomic_set(&mis->load_threads_abort, true); + + bql_unlock(); /* Load threads might be waiting for BQL */ + g_clear_pointer(&mis->load_threads, thread_pool_free); + bql_lock(); +} + +static bool qemu_loadvm_thread_pool_wait(MigrationState *s, + MigrationIncomingState *mis) +{ + bql_unlock(); /* Let load threads do work requiring BQL */ + thread_pool_wait(mis->load_threads); + bql_lock(); + + return !migrate_has_error(s); +} + +/***********************************************************/ /* savevm/loadvm support */ static QEMUFile *qemu_fopen_bdrv(BlockDriverState *bs, int is_writable) @@ -1201,6 +1234,19 @@ void qemu_savevm_send_recv_bitmap(QEMUFile *f, char *block_name) qemu_savevm_command_send(f, MIG_CMD_RECV_BITMAP, len + 1, (uint8_t *)buf); } +static void qemu_savevm_send_switchover_start(QEMUFile *f) +{ + trace_savevm_send_switchover_start(); + qemu_savevm_command_send(f, MIG_CMD_SWITCHOVER_START, 0, NULL); +} + +void qemu_savevm_maybe_send_switchover_start(QEMUFile *f) +{ + if (migrate_send_switchover_start()) { + qemu_savevm_send_switchover_start(f); + } +} + bool qemu_savevm_state_blocked(Error **errp) { SaveStateEntry *se; @@ -1482,6 +1528,24 @@ int qemu_savevm_state_complete_precopy_iterable(QEMUFile *f, bool in_postcopy) int64_t start_ts_each, end_ts_each; SaveStateEntry *se; int ret; + bool multifd_device_state = multifd_device_state_supported(); + + if (multifd_device_state) { + QTAILQ_FOREACH(se, &savevm_state.handlers, entry) { + SaveLiveCompletePrecopyThreadHandler hdlr; + + if (!se->ops || (in_postcopy && se->ops->has_postcopy && + se->ops->has_postcopy(se->opaque)) || + !se->ops->save_live_complete_precopy_thread) { + continue; + } + + hdlr = se->ops->save_live_complete_precopy_thread; + multifd_spawn_device_state_save_thread(hdlr, + se->idstr, se->instance_id, + se->opaque); + } + } QTAILQ_FOREACH(se, &savevm_state.handlers, entry) { if (!se->ops || @@ -1507,16 +1571,35 @@ int qemu_savevm_state_complete_precopy_iterable(QEMUFile *f, bool in_postcopy) save_section_footer(f, se); if (ret < 0) { qemu_file_set_error(f, ret); - return -1; + goto ret_fail_abort_threads; } end_ts_each = qemu_clock_get_us(QEMU_CLOCK_REALTIME); trace_vmstate_downtime_save("iterable", se->idstr, se->instance_id, end_ts_each - start_ts_each); } + if (multifd_device_state) { + if (migrate_has_error(migrate_get_current())) { + multifd_abort_device_state_save_threads(); + } + + if (!multifd_join_device_state_save_threads()) { + qemu_file_set_error(f, -EINVAL); + return -1; + } + } + trace_vmstate_downtime_checkpoint("src-iterable-saved"); return 0; + +ret_fail_abort_threads: + if (multifd_device_state) { + multifd_abort_device_state_save_threads(); + multifd_join_device_state_save_threads(); + } + + return -1; } int qemu_savevm_state_complete_precopy_non_iterable(QEMUFile *f, @@ -1687,6 +1770,7 @@ static int qemu_savevm_state(QEMUFile *f, Error **errp) ret = qemu_file_get_error(f); if (ret == 0) { + qemu_savevm_maybe_send_switchover_start(f); qemu_savevm_state_complete_precopy(f, false); ret = qemu_file_get_error(f); } @@ -1970,6 +2054,8 @@ static void *postcopy_ram_listen_thread(void *opaque) * in qemu_file, and thus we must be blocking now. */ qemu_file_set_blocking(f, true); + + /* TODO: sanity check that only postcopiable data will be loaded here */ load_res = qemu_loadvm_state_main(f, mis); /* @@ -2030,7 +2116,9 @@ static void *postcopy_ram_listen_thread(void *opaque) * (If something broke then qemu will have to exit anyway since it's * got a bad migration state). */ + bql_lock(); migration_incoming_state_destroy(); + bql_unlock(); rcu_unregister_thread(); mis->have_listen_thread = false; @@ -2383,6 +2471,26 @@ static int loadvm_process_enable_colo(MigrationIncomingState *mis) return ret; } +static int loadvm_postcopy_handle_switchover_start(void) +{ + SaveStateEntry *se; + + QTAILQ_FOREACH(se, &savevm_state.handlers, entry) { + int ret; + + if (!se->ops || !se->ops->switchover_start) { + continue; + } + + ret = se->ops->switchover_start(se->opaque); + if (ret < 0) { + return ret; + } + } + + return 0; +} + /* * Process an incoming 'QEMU_VM_COMMAND' * 0 just a normal return @@ -2481,6 +2589,9 @@ static int loadvm_process_command(QEMUFile *f) case MIG_CMD_ENABLE_COLO: return loadvm_process_enable_colo(mis); + + case MIG_CMD_SWITCHOVER_START: + return loadvm_postcopy_handle_switchover_start(); } return 0; @@ -2740,16 +2851,68 @@ static int qemu_loadvm_state_setup(QEMUFile *f, Error **errp) return 0; } -void qemu_loadvm_state_cleanup(void) +struct LoadThreadData { + MigrationLoadThread function; + void *opaque; +}; + +static int qemu_loadvm_load_thread(void *thread_opaque) +{ + struct LoadThreadData *data = thread_opaque; + MigrationIncomingState *mis = migration_incoming_get_current(); + g_autoptr(Error) local_err = NULL; + + if (!data->function(data->opaque, &mis->load_threads_abort, &local_err)) { + MigrationState *s = migrate_get_current(); + + /* + * Can't set load_threads_abort here since processing of main migration + * channel data could still be happening, resulting in launching of new + * load threads. + */ + + assert(local_err); + + /* + * In case of multiple load threads failing which thread error + * return we end setting is purely arbitrary. + */ + migrate_set_error(s, local_err); + } + + return 0; +} + +void qemu_loadvm_start_load_thread(MigrationLoadThread function, + void *opaque) +{ + MigrationIncomingState *mis = migration_incoming_get_current(); + struct LoadThreadData *data; + + /* We only set it from this thread so it's okay to read it directly */ + assert(!mis->load_threads_abort); + + data = g_new(struct LoadThreadData, 1); + data->function = function; + data->opaque = opaque; + + thread_pool_submit_immediate(mis->load_threads, qemu_loadvm_load_thread, + data, g_free); +} + +void qemu_loadvm_state_cleanup(MigrationIncomingState *mis) { SaveStateEntry *se; trace_loadvm_state_cleanup(); + QTAILQ_FOREACH(se, &savevm_state.handlers, entry) { if (se->ops && se->ops->load_cleanup) { se->ops->load_cleanup(se->opaque); } } + + qemu_loadvm_thread_pool_destroy(mis); } /* Return true if we should continue the migration, or false. */ @@ -2900,6 +3063,7 @@ out: int qemu_loadvm_state(QEMUFile *f) { + MigrationState *s = migrate_get_current(); MigrationIncomingState *mis = migration_incoming_get_current(); Error *local_err = NULL; int ret; @@ -2909,6 +3073,8 @@ int qemu_loadvm_state(QEMUFile *f) return -EINVAL; } + qemu_loadvm_thread_pool_create(mis); + ret = qemu_loadvm_state_header(f); if (ret) { return ret; @@ -2940,12 +3106,18 @@ int qemu_loadvm_state(QEMUFile *f) /* When reaching here, it must be precopy */ if (ret == 0) { - if (migrate_has_error(migrate_get_current())) { + if (migrate_has_error(migrate_get_current()) || + !qemu_loadvm_thread_pool_wait(s, mis)) { ret = -EINVAL; } else { ret = qemu_file_get_error(f); } } + /* + * Set this flag unconditionally so we'll catch further attempts to + * start additional threads via an appropriate assert() + */ + qatomic_set(&mis->load_threads_abort, true); /* * Try to read in the VMDESC section as well, so that dumping tools that @@ -3021,6 +3193,29 @@ int qemu_loadvm_approve_switchover(void) return migrate_send_rp_switchover_ack(mis); } +bool qemu_loadvm_load_state_buffer(const char *idstr, uint32_t instance_id, + char *buf, size_t len, Error **errp) +{ + SaveStateEntry *se; + + se = find_se(idstr, instance_id); + if (!se) { + error_setg(errp, + "Unknown idstr %s or instance id %u for load state buffer", + idstr, instance_id); + return false; + } + + if (!se->ops || !se->ops->load_state_buffer) { + error_setg(errp, + "idstr %s / instance %u has no load state buffer operation", + idstr, instance_id); + return false; + } + + return se->ops->load_state_buffer(se->opaque, buf, len, errp); +} + bool save_snapshot(const char *name, bool overwrite, const char *vmstate, bool has_devices, strList *devices, Error **errp) { diff --git a/migration/savevm.h b/migration/savevm.h index 7957460062..138c39a7f9 100644 --- a/migration/savevm.h +++ b/migration/savevm.h @@ -53,6 +53,7 @@ void qemu_savevm_send_postcopy_listen(QEMUFile *f); void qemu_savevm_send_postcopy_run(QEMUFile *f); void qemu_savevm_send_postcopy_resume(QEMUFile *f); void qemu_savevm_send_recv_bitmap(QEMUFile *f, char *block_name); +void qemu_savevm_maybe_send_switchover_start(QEMUFile *f); void qemu_savevm_send_postcopy_ram_discard(QEMUFile *f, const char *name, uint16_t len, @@ -63,11 +64,14 @@ void qemu_savevm_live_state(QEMUFile *f); int qemu_save_device_state(QEMUFile *f); int qemu_loadvm_state(QEMUFile *f); -void qemu_loadvm_state_cleanup(void); +void qemu_loadvm_state_cleanup(MigrationIncomingState *mis); int qemu_loadvm_state_main(QEMUFile *f, MigrationIncomingState *mis); int qemu_load_device_state(QEMUFile *f); int qemu_loadvm_approve_switchover(void); int qemu_savevm_state_complete_precopy_non_iterable(QEMUFile *f, bool in_postcopy); +bool qemu_loadvm_load_state_buffer(const char *idstr, uint32_t instance_id, + char *buf, size_t len, Error **errp); + #endif diff --git a/migration/trace-events b/migration/trace-events index 58c0f07f5b..c506e11a2e 100644 --- a/migration/trace-events +++ b/migration/trace-events @@ -39,6 +39,7 @@ savevm_send_postcopy_run(void) "" savevm_send_postcopy_resume(void) "" savevm_send_colo_enable(void) "" savevm_send_recv_bitmap(char *name) "%s" +savevm_send_switchover_start(void) "" savevm_state_setup(void) "" savevm_state_resume_prepare(void) "" savevm_state_header(void) "" |