diff options
Diffstat (limited to 'migration')
| -rw-r--r-- | migration/block-dirty-bitmap.c | 7 | ||||
| -rw-r--r-- | migration/block.c | 20 | ||||
| -rw-r--r-- | migration/colo-failover.c | 3 | ||||
| -rw-r--r-- | migration/colo.c | 1 | ||||
| -rw-r--r-- | migration/migration-hmp-cmds.c | 1 | ||||
| -rw-r--r-- | migration/migration.c | 158 | ||||
| -rw-r--r-- | migration/migration.h | 15 | ||||
| -rw-r--r-- | migration/multifd.c | 87 | ||||
| -rw-r--r-- | migration/multifd.c.orig | 1274 | ||||
| -rw-r--r-- | migration/multifd.h | 3 | ||||
| -rw-r--r-- | migration/postcopy-ram.c | 31 | ||||
| -rw-r--r-- | migration/postcopy-ram.h | 4 | ||||
| -rw-r--r-- | migration/qemu-file.c | 34 | ||||
| -rw-r--r-- | migration/qemu-file.h | 1 | ||||
| -rw-r--r-- | migration/ram.c | 168 | ||||
| -rw-r--r-- | migration/savevm.c | 38 | ||||
| -rw-r--r-- | migration/savevm.h | 10 | ||||
| -rw-r--r-- | migration/trace-events | 5 | ||||
| -rw-r--r-- | migration/xbzrle.c | 124 | ||||
| -rw-r--r-- | migration/xbzrle.h | 4 |
20 files changed, 470 insertions, 1518 deletions
diff --git a/migration/block-dirty-bitmap.c b/migration/block-dirty-bitmap.c index 5a621419d3..fe73aa94b1 100644 --- a/migration/block-dirty-bitmap.c +++ b/migration/block-dirty-bitmap.c @@ -763,9 +763,8 @@ static int dirty_bitmap_save_complete(QEMUFile *f, void *opaque) } static void dirty_bitmap_state_pending(void *opaque, - uint64_t *res_precopy_only, - uint64_t *res_compatible, - uint64_t *res_postcopy_only) + uint64_t *must_precopy, + uint64_t *can_postcopy) { DBMSaveState *s = &((DBMState *)opaque)->save; SaveBitmapState *dbms; @@ -785,7 +784,7 @@ static void dirty_bitmap_state_pending(void *opaque, trace_dirty_bitmap_state_pending(pending); - *res_postcopy_only += pending; + *can_postcopy += pending; } /* First occurrence of this bitmap. It should be created if doesn't exist */ diff --git a/migration/block.c b/migration/block.c index 29f69025af..426a25bb19 100644 --- a/migration/block.c +++ b/migration/block.c @@ -42,16 +42,6 @@ #define MAX_IO_BUFFERS 512 #define MAX_PARALLEL_IO 16 -/* #define DEBUG_BLK_MIGRATION */ - -#ifdef DEBUG_BLK_MIGRATION -#define DPRINTF(fmt, ...) \ - do { printf("blk_migration: " fmt, ## __VA_ARGS__); } while (0) -#else -#define DPRINTF(fmt, ...) \ - do { } while (0) -#endif - typedef struct BlkMigDevState { /* Written during setup phase. Can be read without a lock. */ BlockBackend *blk; @@ -502,7 +492,7 @@ static int blk_mig_save_bulked_block(QEMUFile *f) block_mig_state.prev_progress = progress; qemu_put_be64(f, (progress << BDRV_SECTOR_BITS) | BLK_MIG_FLAG_PROGRESS); - DPRINTF("Completed %d %%\r", progress); + trace_migration_block_progression(progress); } return ret; @@ -863,10 +853,8 @@ static int block_save_complete(QEMUFile *f, void *opaque) return 0; } -static void block_state_pending(void *opaque, - uint64_t *res_precopy_only, - uint64_t *res_compatible, - uint64_t *res_postcopy_only) +static void block_state_pending(void *opaque, uint64_t *must_precopy, + uint64_t *can_postcopy) { /* Estimate pending number of bytes to send */ uint64_t pending; @@ -887,7 +875,7 @@ static void block_state_pending(void *opaque, trace_migration_block_state_pending(pending); /* We don't do postcopy */ - *res_precopy_only += pending; + *must_precopy += pending; } static int block_load(QEMUFile *f, void *opaque, int version_id) diff --git a/migration/colo-failover.c b/migration/colo-failover.c index 42453481c4..6cb6f90357 100644 --- a/migration/colo-failover.c +++ b/migration/colo-failover.c @@ -17,7 +17,6 @@ #include "migration.h" #include "qapi/error.h" #include "qapi/qapi-commands-migration.h" -#include "qapi/qmp/qerror.h" #include "qemu/error-report.h" #include "trace.h" @@ -78,7 +77,7 @@ FailoverStatus failover_get_state(void) void qmp_x_colo_lost_heartbeat(Error **errp) { if (get_colo_mode() == COLO_MODE_NONE) { - error_setg(errp, QERR_FEATURE_DISABLED, "colo"); + error_setg(errp, "VM is not in COLO mode"); return; } diff --git a/migration/colo.c b/migration/colo.c index 232c8d44b1..0716e64689 100644 --- a/migration/colo.c +++ b/migration/colo.c @@ -33,7 +33,6 @@ #include "net/colo.h" #include "block/block.h" #include "qapi/qapi-events-migration.h" -#include "qapi/qmp/qerror.h" #include "sysemu/cpus.h" #include "sysemu/runstate.h" #include "net/filter.h" diff --git a/migration/migration-hmp-cmds.c b/migration/migration-hmp-cmds.c index ef25bc8929..72519ea99f 100644 --- a/migration/migration-hmp-cmds.c +++ b/migration/migration-hmp-cmds.c @@ -23,7 +23,6 @@ #include "qapi/qapi-commands-migration.h" #include "qapi/qapi-visit-migration.h" #include "qapi/qmp/qdict.h" -#include "qapi/qmp/qerror.h" #include "qapi/string-input-visitor.h" #include "qapi/string-output-visitor.h" #include "qemu/cutils.h" diff --git a/migration/migration.c b/migration/migration.c index 7a14aa98d8..ae2025d9d8 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -184,16 +184,27 @@ static int migration_maybe_pause(MigrationState *s, int new_state); static void migrate_fd_cancel(MigrationState *s); -static bool migrate_allow_multi_channels = true; +static bool migration_needs_multiple_sockets(void) +{ + return migrate_use_multifd() || migrate_postcopy_preempt(); +} -void migrate_protocol_allow_multi_channels(bool allow) +static bool uri_supports_multi_channels(const char *uri) { - migrate_allow_multi_channels = allow; + return strstart(uri, "tcp:", NULL) || strstart(uri, "unix:", NULL) || + strstart(uri, "vsock:", NULL); } -bool migrate_multi_channels_is_allowed(void) +static bool +migration_channels_and_uri_compatible(const char *uri, Error **errp) { - return migrate_allow_multi_channels; + if (migration_needs_multiple_sockets() && + !uri_supports_multi_channels(uri)) { + error_setg(errp, "Migration requires multi-channel URIs (e.g. tcp)"); + return false; + } + + return true; } static gint page_request_addr_cmp(gconstpointer ap, gconstpointer bp) @@ -224,6 +235,8 @@ void migration_object_init(void) qemu_sem_init(¤t_incoming->postcopy_pause_sem_dst, 0); qemu_sem_init(¤t_incoming->postcopy_pause_sem_fault, 0); qemu_sem_init(¤t_incoming->postcopy_pause_sem_fast_load, 0); + qemu_sem_init(¤t_incoming->postcopy_qemufile_dst_done, 0); + qemu_mutex_init(¤t_incoming->page_request_mutex); current_incoming->page_requested = g_tree_new(page_request_addr_cmp); @@ -302,6 +315,8 @@ void migration_incoming_state_destroy(void) { struct MigrationIncomingState *mis = migration_incoming_get_current(); + multifd_load_cleanup(); + if (mis->to_src_file) { /* Tell source that we are done */ migrate_send_rp_shut(mis, qemu_file_get_error(mis->from_src_file) != 0); @@ -493,12 +508,15 @@ static void qemu_start_incoming_migration(const char *uri, Error **errp) { const char *p = NULL; - migrate_protocol_allow_multi_channels(false); /* reset it anyway */ + /* URI is not suitable for migration? */ + if (!migration_channels_and_uri_compatible(uri, errp)) { + return; + } + qapi_event_send_migration(MIGRATION_STATUS_SETUP); if (strstart(uri, "tcp:", &p) || strstart(uri, "unix:", NULL) || strstart(uri, "vsock:", NULL)) { - migrate_protocol_allow_multi_channels(true); socket_start_incoming_migration(p ? p : uri, errp); #ifdef CONFIG_RDMA } else if (strstart(uri, "rdma:", &p)) { @@ -543,13 +561,7 @@ static void process_incoming_migration_bh(void *opaque) */ qemu_announce_self(&mis->announce_timer, migrate_announce_params()); - if (multifd_load_cleanup(&local_err) != 0) { - error_report_err(local_err); - autostart = false; - } - /* If global state section was not received or we are in running - state, we need to obey autostart. Any other state is set with - runstate_set. */ + multifd_load_shutdown(); dirty_bitmap_mig_before_vm_start(); @@ -649,9 +661,9 @@ fail: migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE, MIGRATION_STATUS_FAILED); qemu_fclose(mis->from_src_file); - if (multifd_load_cleanup(&local_err) != 0) { - error_report_err(local_err); - } + + multifd_load_cleanup(); + exit(EXIT_FAILURE); } @@ -723,9 +735,29 @@ void migration_fd_process_incoming(QEMUFile *f, Error **errp) migration_incoming_process(); } -static bool migration_needs_multiple_sockets(void) +/* + * Returns true when we want to start a new incoming migration process, + * false otherwise. + */ +static bool migration_should_start_incoming(bool main_channel) { - return migrate_use_multifd() || migrate_postcopy_preempt(); + /* Multifd doesn't start unless all channels are established */ + if (migrate_use_multifd()) { + return migration_has_all_channels(); + } + + /* Preempt channel only starts when the main channel is created */ + if (migrate_postcopy_preempt()) { + return main_channel; + } + + /* + * For all the rest types of migration, we should only reach here when + * it's the main channel that's being created, and we should always + * proceed with this channel. + */ + assert(main_channel); + return true; } void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp) @@ -789,7 +821,7 @@ void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp) } } - if (migration_has_all_channels()) { + if (migration_should_start_incoming(default_channel)) { /* If it's a recovery, we're done */ if (postcopy_try_recover()) { return; @@ -1378,15 +1410,6 @@ static bool migrate_caps_check(bool *cap_list, } #endif - - /* incoming side only */ - if (runstate_check(RUN_STATE_INMIGRATE) && - !migrate_multi_channels_is_allowed() && - cap_list[MIGRATION_CAPABILITY_MULTIFD]) { - error_setg(errp, "multifd is not supported by current protocol"); - return false; - } - if (cap_list[MIGRATION_CAPABILITY_POSTCOPY_PREEMPT]) { if (!cap_list[MIGRATION_CAPABILITY_POSTCOPY_RAM]) { error_setg(errp, "Postcopy preempt requires postcopy-ram"); @@ -2471,6 +2494,11 @@ void qmp_migrate(const char *uri, bool has_blk, bool blk, MigrationState *s = migrate_get_current(); const char *p = NULL; + /* URI is not suitable for migration? */ + if (!migration_channels_and_uri_compatible(uri, errp)) { + return; + } + if (!migrate_prepare(s, has_blk && blk, has_inc && inc, has_resume && resume, errp)) { /* Error detected, put into errp */ @@ -2483,11 +2511,9 @@ void qmp_migrate(const char *uri, bool has_blk, bool blk, } } - migrate_protocol_allow_multi_channels(false); if (strstart(uri, "tcp:", &p) || strstart(uri, "unix:", NULL) || strstart(uri, "vsock:", NULL)) { - migrate_protocol_allow_multi_channels(true); socket_start_outgoing_migration(s, p ? p : uri, &local_err); #ifdef CONFIG_RDMA } else if (strstart(uri, "rdma:", &p)) { @@ -3022,6 +3048,7 @@ retry: case MIG_RP_MSG_PONG: tmp32 = ldl_be_p(buf); trace_source_return_path_thread_pong(tmp32); + qemu_sem_post(&ms->rp_state.rp_pong_acks); break; case MIG_RP_MSG_REQ_PAGES: @@ -3155,6 +3182,13 @@ static int await_return_path_close_on_source(MigrationState *ms) return ms->rp_state.error; } +static inline void +migration_wait_main_channel(MigrationState *ms) +{ + /* Wait until one PONG message received */ + qemu_sem_wait(&ms->rp_state.rp_pong_acks); +} + /* * Switch from normal iteration to postcopy * Returns non-0 on error @@ -3169,9 +3203,12 @@ static int postcopy_start(MigrationState *ms) bool restart_block = false; int cur_state = MIGRATION_STATUS_ACTIVE; - if (postcopy_preempt_wait_channel(ms)) { - migrate_set_state(&ms->state, ms->state, MIGRATION_STATUS_FAILED); - return -1; + if (migrate_postcopy_preempt()) { + migration_wait_main_channel(ms); + if (postcopy_preempt_establish_channel(ms)) { + migrate_set_state(&ms->state, ms->state, MIGRATION_STATUS_FAILED); + return -1; + } } if (!migrate_pause_before_switchover()) { @@ -3583,6 +3620,20 @@ static int postcopy_do_resume(MigrationState *s) } /* + * If preempt is enabled, re-establish the preempt channel. Note that + * we do it after resume prepare to make sure the main channel will be + * created before the preempt channel. E.g. with weak network, the + * dest QEMU may get messed up with the preempt and main channels on + * the order of connection setup. This guarantees the correct order. + */ + ret = postcopy_preempt_establish_channel(s); + if (ret) { + error_report("%s: postcopy_preempt_establish_channel(): %d", + __func__, ret); + return ret; + } + + /* * Last handshake with destination on the resume (destination will * switch to postcopy-active afterwards) */ @@ -3643,14 +3694,6 @@ static MigThrError postcopy_pause(MigrationState *s) if (s->state == MIGRATION_STATUS_POSTCOPY_RECOVER) { /* Woken up by a recover procedure. Give it a shot */ - if (postcopy_preempt_wait_channel(s)) { - /* - * Preempt enabled, and new channel create failed; loop - * back to wait for another recovery. - */ - continue; - } - /* * Firstly, let's wake up the return path now, with a new * return path channel. @@ -3820,20 +3863,18 @@ typedef enum { */ static MigIterateState migration_iteration_run(MigrationState *s) { - uint64_t pend_pre, pend_compat, pend_post; + uint64_t must_precopy, can_postcopy; bool in_postcopy = s->state == MIGRATION_STATUS_POSTCOPY_ACTIVE; - qemu_savevm_state_pending_estimate(&pend_pre, &pend_compat, &pend_post); - uint64_t pending_size = pend_pre + pend_compat + pend_post; + qemu_savevm_state_pending_estimate(&must_precopy, &can_postcopy); + uint64_t pending_size = must_precopy + can_postcopy; - trace_migrate_pending_estimate(pending_size, - pend_pre, pend_compat, pend_post); + trace_migrate_pending_estimate(pending_size, must_precopy, can_postcopy); - if (pend_pre + pend_compat <= s->threshold_size) { - qemu_savevm_state_pending_exact(&pend_pre, &pend_compat, &pend_post); - pending_size = pend_pre + pend_compat + pend_post; - trace_migrate_pending_exact(pending_size, - pend_pre, pend_compat, pend_post); + if (must_precopy <= s->threshold_size) { + qemu_savevm_state_pending_exact(&must_precopy, &can_postcopy); + pending_size = must_precopy + can_postcopy; + trace_migrate_pending_exact(pending_size, must_precopy, can_postcopy); } if (!pending_size || pending_size < s->threshold_size) { @@ -3843,7 +3884,7 @@ static MigIterateState migration_iteration_run(MigrationState *s) } /* Still a significant amount to transfer */ - if (!in_postcopy && pend_pre <= s->threshold_size && + if (!in_postcopy && must_precopy <= s->threshold_size && qatomic_read(&s->start_postcopy)) { if (postcopy_start(s)) { error_report("%s: postcopy failed to start", __func__); @@ -4343,15 +4384,6 @@ void migrate_fd_connect(MigrationState *s, Error *error_in) } } - /* This needs to be done before resuming a postcopy */ - if (postcopy_preempt_setup(s, &local_err)) { - error_report_err(local_err); - migrate_set_state(&s->state, MIGRATION_STATUS_SETUP, - MIGRATION_STATUS_FAILED); - migrate_fd_cleanup(s); - return; - } - if (resume) { /* Wakeup the main migration thread to do the recovery */ migrate_set_state(&s->state, MIGRATION_STATUS_POSTCOPY_PAUSED, @@ -4525,6 +4557,7 @@ static void migration_instance_finalize(Object *obj) qemu_sem_destroy(&ms->postcopy_pause_sem); qemu_sem_destroy(&ms->postcopy_pause_rp_sem); qemu_sem_destroy(&ms->rp_state.rp_sem); + qemu_sem_destroy(&ms->rp_state.rp_pong_acks); qemu_sem_destroy(&ms->postcopy_qemufile_src_sem); error_free(ms->error); } @@ -4571,6 +4604,7 @@ static void migration_instance_init(Object *obj) qemu_sem_init(&ms->postcopy_pause_sem, 0); qemu_sem_init(&ms->postcopy_pause_rp_sem, 0); qemu_sem_init(&ms->rp_state.rp_sem, 0); + qemu_sem_init(&ms->rp_state.rp_pong_acks, 0); qemu_sem_init(&ms->rate_limit_sem, 0); qemu_sem_init(&ms->wait_unplug_sem, 0); qemu_sem_init(&ms->postcopy_qemufile_src_sem, 0); diff --git a/migration/migration.h b/migration/migration.h index 66511ce532..2da2f8a164 100644 --- a/migration/migration.h +++ b/migration/migration.h @@ -116,6 +116,12 @@ struct MigrationIncomingState { unsigned int postcopy_channels; /* QEMUFile for postcopy only; it'll be handled by a separate thread */ QEMUFile *postcopy_qemufile_dst; + /* + * When postcopy_qemufile_dst is properly setup, this sem is posted. + * One can wait on this semaphore to wait until the preempt channel is + * properly setup. + */ + QemuSemaphore postcopy_qemufile_dst_done; /* Postcopy priority thread is used to receive postcopy requested pages */ QemuThread postcopy_prio_thread; bool postcopy_prio_thread_created; @@ -276,6 +282,12 @@ struct MigrationState { */ bool rp_thread_created; QemuSemaphore rp_sem; + /* + * We post to this when we got one PONG from dest. So far it's an + * easy way to know the main channel has successfully established + * on dest QEMU. + */ + QemuSemaphore rp_pong_acks; } rp_state; double mbps; @@ -474,7 +486,4 @@ void migration_cancel(const Error *error); void populate_vfio_info(MigrationInfo *info); void postcopy_temp_page_reset(PostcopyTmpPage *tmp_page); -bool migrate_multi_channels_is_allowed(void); -void migrate_protocol_allow_multi_channels(bool allow); - #endif diff --git a/migration/multifd.c b/migration/multifd.c index b7ad7002e0..5e85c3ea9b 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -516,7 +516,7 @@ void multifd_save_cleanup(void) { int i; - if (!migrate_use_multifd() || !migrate_multi_channels_is_allowed()) { + if (!migrate_use_multifd()) { return; } multifd_send_terminate_threads(NULL); @@ -843,30 +843,29 @@ static bool multifd_channel_connect(MultiFDSendParams *p, ioc, object_get_typename(OBJECT(ioc)), migrate_get_current()->hostname, error); - if (!error) { - if (migrate_channel_requires_tls_upgrade(ioc)) { - multifd_tls_channel_connect(p, ioc, &error); - if (!error) { - /* - * tls_channel_connect will call back to this - * function after the TLS handshake, - * so we mustn't call multifd_send_thread until then - */ - return true; - } else { - return false; - } + if (error) { + return false; + } + if (migrate_channel_requires_tls_upgrade(ioc)) { + multifd_tls_channel_connect(p, ioc, &error); + if (!error) { + /* + * tls_channel_connect will call back to this + * function after the TLS handshake, + * so we mustn't call multifd_send_thread until then + */ + return true; } else { - migration_ioc_register_yank(ioc); - p->registered_yank = true; - p->c = ioc; - qemu_thread_create(&p->thread, p->name, multifd_send_thread, p, - QEMU_THREAD_JOINABLE); - } - return true; + return false; + } + } else { + migration_ioc_register_yank(ioc); + p->registered_yank = true; + p->c = ioc; + qemu_thread_create(&p->thread, p->name, multifd_send_thread, p, + QEMU_THREAD_JOINABLE); } - - return false; + return true; } static void multifd_new_send_channel_cleanup(MultiFDSendParams *p, @@ -893,19 +892,15 @@ static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) Error *local_err = NULL; trace_multifd_new_send_channel_async(p->id); - if (qio_task_propagate_error(task, &local_err)) { - goto cleanup; - } else { + if (!qio_task_propagate_error(task, &local_err)) { p->c = QIO_CHANNEL(sioc); qio_channel_set_delay(p->c, false); p->running = true; - if (!multifd_channel_connect(p, sioc, local_err)) { - goto cleanup; + if (multifd_channel_connect(p, sioc, local_err)) { + return; } - return; } -cleanup: multifd_new_send_channel_cleanup(p, sioc, local_err); } @@ -918,10 +913,6 @@ int multifd_save_setup(Error **errp) if (!migrate_use_multifd()) { return 0; } - if (!migrate_multi_channels_is_allowed()) { - error_setg(errp, "multifd is not supported by current protocol"); - return -1; - } thread_count = migrate_multifd_channels(); multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); @@ -1022,26 +1013,33 @@ static void multifd_recv_terminate_threads(Error *err) } } -int multifd_load_cleanup(Error **errp) +void multifd_load_shutdown(void) +{ + if (migrate_use_multifd()) { + multifd_recv_terminate_threads(NULL); + } +} + +void multifd_load_cleanup(void) { int i; - if (!migrate_use_multifd() || !migrate_multi_channels_is_allowed()) { - return 0; + if (!migrate_use_multifd()) { + return; } multifd_recv_terminate_threads(NULL); for (i = 0; i < migrate_multifd_channels(); i++) { MultiFDRecvParams *p = &multifd_recv_state->params[i]; if (p->running) { - p->quit = true; /* * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code, * however try to wakeup it without harm in cleanup phase. */ qemu_sem_post(&p->sem_sync); - qemu_thread_join(&p->thread); } + + qemu_thread_join(&p->thread); } for (i = 0; i < migrate_multifd_channels(); i++) { MultiFDRecvParams *p = &multifd_recv_state->params[i]; @@ -1067,8 +1065,6 @@ int multifd_load_cleanup(Error **errp) multifd_recv_state->params = NULL; g_free(multifd_recv_state); multifd_recv_state = NULL; - - return 0; } void multifd_recv_sync_main(void) @@ -1116,10 +1112,7 @@ static void *multifd_recv_thread(void *opaque) ret = qio_channel_read_all_eof(p->c, (void *)p->packet, p->packet_len, &local_err); - if (ret == 0) { /* EOF */ - break; - } - if (ret == -1) { /* Error */ + if (ret == 0 || ret == -1) { /* 0: EOF -1: Error */ break; } @@ -1180,10 +1173,6 @@ int multifd_load_setup(Error **errp) return 0; } - if (!migrate_multi_channels_is_allowed()) { - error_setg(errp, "multifd is not supported by current protocol"); - return -1; - } thread_count = migrate_multifd_channels(); multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); diff --git a/migration/multifd.c.orig b/migration/multifd.c.orig deleted file mode 100644 index ad89293b4e..0000000000 --- a/migration/multifd.c.orig +++ /dev/null @@ -1,1274 +0,0 @@ -/* - * Multifd common code - * - * Copyright (c) 2019-2020 Red Hat Inc - * - * Authors: - * Juan Quintela <quintela@redhat.com> - * - * This work is licensed under the terms of the GNU GPL, version 2 or later. - * See the COPYING file in the top-level directory. - */ - -#include "qemu/osdep.h" -#include "qemu/rcu.h" -#include "exec/target_page.h" -#include "sysemu/sysemu.h" -#include "exec/ramblock.h" -#include "qemu/error-report.h" -#include "qapi/error.h" -#include "ram.h" -#include "migration.h" -#include "socket.h" -#include "tls.h" -#include "qemu-file.h" -#include "trace.h" -#include "multifd.h" - -#include "qemu/yank.h" -#include "io/channel-socket.h" -#include "yank_functions.h" - -/* Multiple fd's */ - -#define MULTIFD_MAGIC 0x11223344U -#define MULTIFD_VERSION 1 - -typedef struct { - uint32_t magic; - uint32_t version; - unsigned char uuid[16]; /* QemuUUID */ - uint8_t id; - uint8_t unused1[7]; /* Reserved for future use */ - uint64_t unused2[4]; /* Reserved for future use */ -} __attribute__((packed)) MultiFDInit_t; - -/* Multifd without compression */ - -/** - * nocomp_send_setup: setup send side - * - * For no compression this function does nothing. - * - * Returns 0 for success or -1 for error - * - * @p: Params for the channel that we are using - * @errp: pointer to an error - */ -static int nocomp_send_setup(MultiFDSendParams *p, Error **errp) -{ - return 0; -} - -/** - * nocomp_send_cleanup: cleanup send side - * - * For no compression this function does nothing. - * - * @p: Params for the channel that we are using - * @errp: pointer to an error - */ -static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp) -{ - return; -} - -/** - * nocomp_send_prepare: prepare date to be able to send - * - * For no compression we just have to calculate the size of the - * packet. - * - * Returns 0 for success or -1 for error - * - * @p: Params for the channel that we are using - * @errp: pointer to an error - */ -static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp) -{ - MultiFDPages_t *pages = p->pages; - - for (int i = 0; i < p->normal_num; i++) { - p->iov[p->iovs_num].iov_base = pages->block->host + p->normal[i]; - p->iov[p->iovs_num].iov_len = p->page_size; - p->iovs_num++; - } - - p->next_packet_size = p->normal_num * p->page_size; - p->flags |= MULTIFD_FLAG_NOCOMP; - return 0; -} - -/** - * nocomp_recv_setup: setup receive side - * - * For no compression this function does nothing. - * - * Returns 0 for success or -1 for error - * - * @p: Params for the channel that we are using - * @errp: pointer to an error - */ -static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp) -{ - return 0; -} - -/** - * nocomp_recv_cleanup: setup receive side - * - * For no compression this function does nothing. - * - * @p: Params for the channel that we are using - */ -static void nocomp_recv_cleanup(MultiFDRecvParams *p) -{ -} - -/** - * nocomp_recv_pages: read the data from the channel into actual pages - * - * For no compression we just need to read things into the correct place. - * - * Returns 0 for success or -1 for error - * - * @p: Params for the channel that we are using - * @errp: pointer to an error - */ -static int nocomp_recv_pages(MultiFDRecvParams *p, Error **errp) -{ - uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK; - - if (flags != MULTIFD_FLAG_NOCOMP) { - error_setg(errp, "multifd %u: flags received %x flags expected %x", - p->id, flags, MULTIFD_FLAG_NOCOMP); - return -1; - } - for (int i = 0; i < p->normal_num; i++) { - p->iov[i].iov_base = p->host + p->normal[i]; - p->iov[i].iov_len = p->page_size; - } - return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp); -} - -static MultiFDMethods multifd_nocomp_ops = { - .send_setup = nocomp_send_setup, - .send_cleanup = nocomp_send_cleanup, - .send_prepare = nocomp_send_prepare, - .recv_setup = nocomp_recv_setup, - .recv_cleanup = nocomp_recv_cleanup, - .recv_pages = nocomp_recv_pages -}; - -static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = { - [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops, -}; - -void multifd_register_ops(int method, MultiFDMethods *ops) -{ - assert(0 < method && method < MULTIFD_COMPRESSION__MAX); - multifd_ops[method] = ops; -} - -static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) -{ - MultiFDInit_t msg = {}; - int ret; - - msg.magic = cpu_to_be32(MULTIFD_MAGIC); - msg.version = cpu_to_be32(MULTIFD_VERSION); - msg.id = p->id; - memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid)); - - ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp); - if (ret != 0) { - return -1; - } - return 0; -} - -static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) -{ - MultiFDInit_t msg; - int ret; - - ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp); - if (ret != 0) { - return -1; - } - - msg.magic = be32_to_cpu(msg.magic); - msg.version = be32_to_cpu(msg.version); - - if (msg.magic != MULTIFD_MAGIC) { - error_setg(errp, "multifd: received packet magic %x " - "expected %x", msg.magic, MULTIFD_MAGIC); - return -1; - } - - if (msg.version != MULTIFD_VERSION) { - error_setg(errp, "multifd: received packet version %u " - "expected %u", msg.version, MULTIFD_VERSION); - return -1; - } - - if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) { - char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid); - char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid); - - error_setg(errp, "multifd: received uuid '%s' and expected " - "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id); - g_free(uuid); - g_free(msg_uuid); - return -1; - } - - if (msg.id > migrate_multifd_channels()) { - error_setg(errp, "multifd: received channel version %u " - "expected %u", msg.version, MULTIFD_VERSION); - return -1; - } - - return msg.id; -} - -static MultiFDPages_t *multifd_pages_init(size_t size) -{ - MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1); - - pages->allocated = size; - pages->offset = g_new0(ram_addr_t, size); - - return pages; -} - -static void multifd_pages_clear(MultiFDPages_t *pages) -{ - pages->num = 0; - pages->allocated = 0; - pages->packet_num = 0; - pages->block = NULL; - g_free(pages->offset); - pages->offset = NULL; - g_free(pages); -} - -static void multifd_send_fill_packet(MultiFDSendParams *p) -{ - MultiFDPacket_t *packet = p->packet; - int i; - - packet->flags = cpu_to_be32(p->flags); - packet->pages_alloc = cpu_to_be32(p->pages->allocated); - packet->normal_pages = cpu_to_be32(p->normal_num); - packet->next_packet_size = cpu_to_be32(p->next_packet_size); - packet->packet_num = cpu_to_be64(p->packet_num); - - if (p->pages->block) { - strncpy(packet->ramblock, p->pages->block->idstr, 256); - } - - for (i = 0; i < p->normal_num; i++) { - /* there are architectures where ram_addr_t is 32 bit */ - uint64_t temp = p->normal[i]; - - packet->offset[i] = cpu_to_be64(temp); - } -} - -static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) -{ - MultiFDPacket_t *packet = p->packet; - RAMBlock *block; - int i; - - packet->magic = be32_to_cpu(packet->magic); - if (packet->magic != MULTIFD_MAGIC) { - error_setg(errp, "multifd: received packet " - "magic %x and expected magic %x", - packet->magic, MULTIFD_MAGIC); - return -1; - } - - packet->version = be32_to_cpu(packet->version); - if (packet->version != MULTIFD_VERSION) { - error_setg(errp, "multifd: received packet " - "version %u and expected version %u", - packet->version, MULTIFD_VERSION); - return -1; - } - - p->flags = be32_to_cpu(packet->flags); - - packet->pages_alloc = be32_to_cpu(packet->pages_alloc); - /* - * If we received a packet that is 100 times bigger than expected - * just stop migration. It is a magic number. - */ - if (packet->pages_alloc > p->page_count) { - error_setg(errp, "multifd: received packet " - "with size %u and expected a size of %u", - packet->pages_alloc, p->page_count) ; - return -1; - } - - p->normal_num = be32_to_cpu(packet->normal_pages); - if (p->normal_num > packet->pages_alloc) { - error_setg(errp, "multifd: received packet " - "with %u pages and expected maximum pages are %u", - p->normal_num, packet->pages_alloc) ; - return -1; - } - - p->next_packet_size = be32_to_cpu(packet->next_packet_size); - p->packet_num = be64_to_cpu(packet->packet_num); - - if (p->normal_num == 0) { - return 0; - } - - /* make sure that ramblock is 0 terminated */ - packet->ramblock[255] = 0; - block = qemu_ram_block_by_name(packet->ramblock); - if (!block) { - error_setg(errp, "multifd: unknown ram block %s", - packet->ramblock); - return -1; - } - - p->host = block->host; - for (i = 0; i < p->normal_num; i++) { - uint64_t offset = be64_to_cpu(packet->offset[i]); - - if (offset > (block->used_length - p->page_size)) { - error_setg(errp, "multifd: offset too long %" PRIu64 - " (max " RAM_ADDR_FMT ")", - offset, block->used_length); - return -1; - } - p->normal[i] = offset; - } - - return 0; -} - -struct { - MultiFDSendParams *params; - /* array of pages to sent */ - MultiFDPages_t *pages; - /* global number of generated multifd packets */ - uint64_t packet_num; - /* send channels ready */ - QemuSemaphore channels_ready; - /* - * Have we already run terminate threads. There is a race when it - * happens that we got one error while we are exiting. - * We will use atomic operations. Only valid values are 0 and 1. - */ - int exiting; - /* multifd ops */ - MultiFDMethods *ops; -} *multifd_send_state; - -/* - * How we use multifd_send_state->pages and channel->pages? - * - * We create a pages for each channel, and a main one. Each time that - * we need to send a batch of pages we interchange the ones between - * multifd_send_state and the channel that is sending it. There are - * two reasons for that: - * - to not have to do so many mallocs during migration - * - to make easier to know what to free at the end of migration - * - * This way we always know who is the owner of each "pages" struct, - * and we don't need any locking. It belongs to the migration thread - * or to the channel thread. Switching is safe because the migration - * thread is using the channel mutex when changing it, and the channel - * have to had finish with its own, otherwise pending_job can't be - * false. - */ - -static int multifd_send_pages(QEMUFile *f) -{ - int i; - static int next_channel; - MultiFDSendParams *p = NULL; /* make happy gcc */ - MultiFDPages_t *pages = multifd_send_state->pages; - uint64_t transferred; - - if (qatomic_read(&multifd_send_state->exiting)) { - return -1; - } - - qemu_sem_wait(&multifd_send_state->channels_ready); - /* - * next_channel can remain from a previous migration that was - * using more channels, so ensure it doesn't overflow if the - * limit is lower now. - */ - next_channel %= migrate_multifd_channels(); - for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { - p = &multifd_send_state->params[i]; - - qemu_mutex_lock(&p->mutex); - if (p->quit) { - error_report("%s: channel %d has already quit!", __func__, i); - qemu_mutex_unlock(&p->mutex); - return -1; - } - if (!p->pending_job) { - p->pending_job++; - next_channel = (i + 1) % migrate_multifd_channels(); - break; - } - qemu_mutex_unlock(&p->mutex); - } - assert(!p->pages->num); - assert(!p->pages->block); - - p->packet_num = multifd_send_state->packet_num++; - multifd_send_state->pages = p->pages; - p->pages = pages; - transferred = ((uint64_t) pages->num) * p->page_size + p->packet_len; - qemu_file_acct_rate_limit(f, transferred); - ram_counters.multifd_bytes += transferred; - stat64_add(&ram_atomic_counters.transferred, transferred); - qemu_mutex_unlock(&p->mutex); - qemu_sem_post(&p->sem); - - return 1; -} - -int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset) -{ - MultiFDPages_t *pages = multifd_send_state->pages; - bool changed = false; - - if (!pages->block) { - pages->block = block; - } - - if (pages->block == block) { - pages->offset[pages->num] = offset; - pages->num++; - - if (pages->num < pages->allocated) { - return 1; - } - } else { - changed = true; - } - - if (multifd_send_pages(f) < 0) { - return -1; - } - - if (changed) { - return multifd_queue_page(f, block, offset); - } - - return 1; -} - -static void multifd_send_terminate_threads(Error *err) -{ - int i; - - trace_multifd_send_terminate_threads(err != NULL); - - if (err) { - MigrationState *s = migrate_get_current(); - migrate_set_error(s, err); - if (s->state == MIGRATION_STATUS_SETUP || - s->state == MIGRATION_STATUS_PRE_SWITCHOVER || - s->state == MIGRATION_STATUS_DEVICE || - s->state == MIGRATION_STATUS_ACTIVE) { - migrate_set_state(&s->state, s->state, - MIGRATION_STATUS_FAILED); - } - } - - /* - * We don't want to exit each threads twice. Depending on where - * we get the error, or if there are two independent errors in two - * threads at the same time, we can end calling this function - * twice. - */ - if (qatomic_xchg(&multifd_send_state->exiting, 1)) { - return; - } - - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDSendParams *p = &multifd_send_state->params[i]; - - qemu_mutex_lock(&p->mutex); - p->quit = true; - qemu_sem_post(&p->sem); - if (p->c) { - qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); - } - qemu_mutex_unlock(&p->mutex); - } -} - -void multifd_save_cleanup(void) -{ - int i; - - if (!migrate_use_multifd() || !migrate_multi_channels_is_allowed()) { - return; - } - multifd_send_terminate_threads(NULL); - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDSendParams *p = &multifd_send_state->params[i]; - - if (p->running) { - qemu_thread_join(&p->thread); - } - } - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDSendParams *p = &multifd_send_state->params[i]; - Error *local_err = NULL; - - if (p->registered_yank) { - migration_ioc_unregister_yank(p->c); - } - socket_send_channel_destroy(p->c); - p->c = NULL; - qemu_mutex_destroy(&p->mutex); - qemu_sem_destroy(&p->sem); - qemu_sem_destroy(&p->sem_sync); - g_free(p->name); - p->name = NULL; - multifd_pages_clear(p->pages); - p->pages = NULL; - p->packet_len = 0; - g_free(p->packet); - p->packet = NULL; - g_free(p->iov); - p->iov = NULL; - g_free(p->normal); - p->normal = NULL; - multifd_send_state->ops->send_cleanup(p, &local_err); - if (local_err) { - migrate_set_error(migrate_get_current(), local_err); - error_free(local_err); - } - } - qemu_sem_destroy(&multifd_send_state->channels_ready); - g_free(multifd_send_state->params); - multifd_send_state->params = NULL; - multifd_pages_clear(multifd_send_state->pages); - multifd_send_state->pages = NULL; - g_free(multifd_send_state); - multifd_send_state = NULL; -} - -static int multifd_zero_copy_flush(QIOChannel *c) -{ - int ret; - Error *err = NULL; - - ret = qio_channel_flush(c, &err); - if (ret < 0) { - error_report_err(err); - return -1; - } - if (ret == 1) { - dirty_sync_missed_zero_copy(); - } - - return ret; -} - -int multifd_send_sync_main(QEMUFile *f) -{ - int i; - bool flush_zero_copy; - - if (!migrate_use_multifd()) { - return 0; - } - if (multifd_send_state->pages->num) { - if (multifd_send_pages(f) < 0) { - error_report("%s: multifd_send_pages fail", __func__); - return -1; - } - } - - /* - * When using zero-copy, it's necessary to flush the pages before any of - * the pages can be sent again, so we'll make sure the new version of the - * pages will always arrive _later_ than the old pages. - * - * Currently we achieve this by flushing the zero-page requested writes - * per ram iteration, but in the future we could potentially optimize it - * to be less frequent, e.g. only after we finished one whole scanning of - * all the dirty bitmaps. - */ - - flush_zero_copy = migrate_use_zero_copy_send(); - - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDSendParams *p = &multifd_send_state->params[i]; - - trace_multifd_send_sync_main_signal(p->id); - - qemu_mutex_lock(&p->mutex); - - if (p->quit) { - error_report("%s: channel %d has already quit", __func__, i); - qemu_mutex_unlock(&p->mutex); - return -1; - } - - p->packet_num = multifd_send_state->packet_num++; - p->flags |= MULTIFD_FLAG_SYNC; - p->pending_job++; - qemu_file_acct_rate_limit(f, p->packet_len); - ram_counters.multifd_bytes += p->packet_len; - stat64_add(&ram_atomic_counters.transferred, p->packet_len); - qemu_mutex_unlock(&p->mutex); - qemu_sem_post(&p->sem); - - if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) { - return -1; - } - } - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDSendParams *p = &multifd_send_state->params[i]; - - trace_multifd_send_sync_main_wait(p->id); - qemu_sem_wait(&p->sem_sync); - } - trace_multifd_send_sync_main(multifd_send_state->packet_num); - - return 0; -} - -static void *multifd_send_thread(void *opaque) -{ - MultiFDSendParams *p = opaque; - Error *local_err = NULL; - int ret = 0; - bool use_zero_copy_send = migrate_use_zero_copy_send(); - - trace_multifd_send_thread_start(p->id); - rcu_register_thread(); - - if (multifd_send_initial_packet(p, &local_err) < 0) { - ret = -1; - goto out; - } - /* initial packet */ - p->num_packets = 1; - - while (true) { - qemu_sem_wait(&p->sem); - - if (qatomic_read(&multifd_send_state->exiting)) { - break; - } - qemu_mutex_lock(&p->mutex); - - if (p->pending_job) { - uint64_t packet_num = p->packet_num; - uint32_t flags = p->flags; - p->normal_num = 0; - - if (use_zero_copy_send) { - p->iovs_num = 0; - } else { - p->iovs_num = 1; - } - - for (int i = 0; i < p->pages->num; i++) { - p->normal[p->normal_num] = p->pages->offset[i]; - p->normal_num++; - } - - if (p->normal_num) { - ret = multifd_send_state->ops->send_prepare(p, &local_err); - if (ret != 0) { - qemu_mutex_unlock(&p->mutex); - break; - } - } - multifd_send_fill_packet(p); - p->flags = 0; - p->num_packets++; - p->total_normal_pages += p->normal_num; - p->pages->num = 0; - p->pages->block = NULL; - qemu_mutex_unlock(&p->mutex); - - trace_multifd_send(p->id, packet_num, p->normal_num, flags, - p->next_packet_size); - - if (use_zero_copy_send) { - /* Send header first, without zerocopy */ - ret = qio_channel_write_all(p->c, (void *)p->packet, - p->packet_len, &local_err); - if (ret != 0) { - break; - } - } else { - /* Send header using the same writev call */ - p->iov[0].iov_len = p->packet_len; - p->iov[0].iov_base = p->packet; - } - - ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL, - 0, p->write_flags, &local_err); - if (ret != 0) { - break; - } - - qemu_mutex_lock(&p->mutex); - p->pending_job--; - qemu_mutex_unlock(&p->mutex); - - if (flags & MULTIFD_FLAG_SYNC) { - qemu_sem_post(&p->sem_sync); - } - qemu_sem_post(&multifd_send_state->channels_ready); - } else if (p->quit) { - qemu_mutex_unlock(&p->mutex); - break; - } else { - qemu_mutex_unlock(&p->mutex); - /* sometimes there are spurious wakeups */ - } - } - -out: - if (local_err) { - trace_multifd_send_error(p->id); - multifd_send_terminate_threads(local_err); - error_free(local_err); - } - - /* - * Error happen, I will exit, but I can't just leave, tell - * who pay attention to me. - */ - if (ret != 0) { - qemu_sem_post(&p->sem_sync); - qemu_sem_post(&multifd_send_state->channels_ready); - } - - qemu_mutex_lock(&p->mutex); - p->running = false; - qemu_mutex_unlock(&p->mutex); - - rcu_unregister_thread(); - trace_multifd_send_thread_end(p->id, p->num_packets, p->total_normal_pages); - - return NULL; -} - -static bool multifd_channel_connect(MultiFDSendParams *p, - QIOChannel *ioc, - Error *error); - -static void multifd_tls_outgoing_handshake(QIOTask *task, - gpointer opaque) -{ - MultiFDSendParams *p = opaque; - QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task)); - Error *err = NULL; - - if (qio_task_propagate_error(task, &err)) { - trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err)); - } else { - trace_multifd_tls_outgoing_handshake_complete(ioc); - } - - if (!multifd_channel_connect(p, ioc, err)) { - /* - * Error happen, mark multifd_send_thread status as 'quit' although it - * is not created, and then tell who pay attention to me. - */ - p->quit = true; - qemu_sem_post(&multifd_send_state->channels_ready); - qemu_sem_post(&p->sem_sync); - } -} - -static void *multifd_tls_handshake_thread(void *opaque) -{ - MultiFDSendParams *p = opaque; - QIOChannelTLS *tioc = QIO_CHANNEL_TLS(p->c); - - qio_channel_tls_handshake(tioc, - multifd_tls_outgoing_handshake, - p, - NULL, - NULL); - return NULL; -} - -static void multifd_tls_channel_connect(MultiFDSendParams *p, - QIOChannel *ioc, - Error **errp) -{ - MigrationState *s = migrate_get_current(); - const char *hostname = s->hostname; - QIOChannelTLS *tioc; - - tioc = migration_tls_client_create(s, ioc, hostname, errp); - if (!tioc) { - return; - } - - object_unref(OBJECT(ioc)); - trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname); - qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing"); - p->c = QIO_CHANNEL(tioc); - qemu_thread_create(&p->thread, "multifd-tls-handshake-worker", - multifd_tls_handshake_thread, p, - QEMU_THREAD_JOINABLE); -} - -static bool multifd_channel_connect(MultiFDSendParams *p, - QIOChannel *ioc, - Error *error) -{ - trace_multifd_set_outgoing_channel( - ioc, object_get_typename(OBJECT(ioc)), - migrate_get_current()->hostname, error); - - if (!error) { - if (migrate_channel_requires_tls_upgrade(ioc)) { - multifd_tls_channel_connect(p, ioc, &error); - if (!error) { - /* - * tls_channel_connect will call back to this - * function after the TLS handshake, - * so we mustn't call multifd_send_thread until then - */ - return true; - } else { - return false; - } - } else { - migration_ioc_register_yank(ioc); - p->registered_yank = true; - p->c = ioc; - qemu_thread_create(&p->thread, p->name, multifd_send_thread, p, - QEMU_THREAD_JOINABLE); - } - return true; - } - - return false; -} - -static void multifd_new_send_channel_cleanup(MultiFDSendParams *p, - QIOChannel *ioc, Error *err) -{ - migrate_set_error(migrate_get_current(), err); - /* Error happen, we need to tell who pay attention to me */ - qemu_sem_post(&multifd_send_state->channels_ready); - qemu_sem_post(&p->sem_sync); - /* - * Although multifd_send_thread is not created, but main migration - * thread neet to judge whether it is running, so we need to mark - * its status. - */ - p->quit = true; - object_unref(OBJECT(ioc)); - error_free(err); -} - -static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) -{ - MultiFDSendParams *p = opaque; - QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task)); - Error *local_err = NULL; - - trace_multifd_new_send_channel_async(p->id); - if (qio_task_propagate_error(task, &local_err)) { - goto cleanup; - } else { - p->c = QIO_CHANNEL(sioc); - qio_channel_set_delay(p->c, false); - p->running = true; - if (!multifd_channel_connect(p, sioc, local_err)) { - goto cleanup; - } - return; - } - -cleanup: - multifd_new_send_channel_cleanup(p, sioc, local_err); -} - -int multifd_save_setup(Error **errp) -{ - int thread_count; - uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); - uint8_t i; - - if (!migrate_use_multifd()) { - return 0; - } - if (!migrate_multi_channels_is_allowed()) { - error_setg(errp, "multifd is not supported by current protocol"); - return -1; - } - - thread_count = migrate_multifd_channels(); - multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); - multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); - multifd_send_state->pages = multifd_pages_init(page_count); - qemu_sem_init(&multifd_send_state->channels_ready, 0); - qatomic_set(&multifd_send_state->exiting, 0); - multifd_send_state->ops = multifd_ops[migrate_multifd_compression()]; - - for (i = 0; i < thread_count; i++) { - MultiFDSendParams *p = &multifd_send_state->params[i]; - - qemu_mutex_init(&p->mutex); - qemu_sem_init(&p->sem, 0); - qemu_sem_init(&p->sem_sync, 0); - p->quit = false; - p->pending_job = 0; - p->id = i; - p->pages = multifd_pages_init(page_count); - p->packet_len = sizeof(MultiFDPacket_t) - + sizeof(uint64_t) * page_count; - p->packet = g_malloc0(p->packet_len); - p->packet->magic = cpu_to_be32(MULTIFD_MAGIC); - p->packet->version = cpu_to_be32(MULTIFD_VERSION); - p->name = g_strdup_printf("multifdsend_%d", i); - /* We need one extra place for the packet header */ - p->iov = g_new0(struct iovec, page_count + 1); - p->normal = g_new0(ram_addr_t, page_count); - p->page_size = qemu_target_page_size(); - p->page_count = page_count; - - if (migrate_use_zero_copy_send()) { - p->write_flags = QIO_CHANNEL_WRITE_FLAG_ZERO_COPY; - } else { - p->write_flags = 0; - } - - socket_send_channel_create(multifd_new_send_channel_async, p); - } - - for (i = 0; i < thread_count; i++) { - MultiFDSendParams *p = &multifd_send_state->params[i]; - Error *local_err = NULL; - int ret; - - ret = multifd_send_state->ops->send_setup(p, &local_err); - if (ret) { - error_propagate(errp, local_err); - return ret; - } - } - return 0; -} - -struct { - MultiFDRecvParams *params; - /* number of created threads */ - int count; - /* syncs main thread and channels */ - QemuSemaphore sem_sync; - /* global number of generated multifd packets */ - uint64_t packet_num; - /* multifd ops */ - MultiFDMethods *ops; -} *multifd_recv_state; - -static void multifd_recv_terminate_threads(Error *err) -{ - int i; - - trace_multifd_recv_terminate_threads(err != NULL); - - if (err) { - MigrationState *s = migrate_get_current(); - migrate_set_error(s, err); - if (s->state == MIGRATION_STATUS_SETUP || - s->state == MIGRATION_STATUS_ACTIVE) { - migrate_set_state(&s->state, s->state, - MIGRATION_STATUS_FAILED); - } - } - - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDRecvParams *p = &multifd_recv_state->params[i]; - - qemu_mutex_lock(&p->mutex); - p->quit = true; - /* - * We could arrive here for two reasons: - * - normal quit, i.e. everything went fine, just finished - * - error quit: We close the channels so the channel threads - * finish the qio_channel_read_all_eof() - */ - if (p->c) { - qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); - } - qemu_mutex_unlock(&p->mutex); - } -} - -int multifd_load_cleanup(Error **errp) -{ - int i; - - if (!migrate_use_multifd() || !migrate_multi_channels_is_allowed()) { - return 0; - } - multifd_recv_terminate_threads(NULL); - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDRecvParams *p = &multifd_recv_state->params[i]; - - if (p->running) { - p->quit = true; - /* - * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code, - * however try to wakeup it without harm in cleanup phase. - */ - qemu_sem_post(&p->sem_sync); - qemu_thread_join(&p->thread); - } - } - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDRecvParams *p = &multifd_recv_state->params[i]; - - migration_ioc_unregister_yank(p->c); - object_unref(OBJECT(p->c)); - p->c = NULL; - qemu_mutex_destroy(&p->mutex); - qemu_sem_destroy(&p->sem_sync); - g_free(p->name); - p->name = NULL; - p->packet_len = 0; - g_free(p->packet); - p->packet = NULL; - g_free(p->iov); - p->iov = NULL; - g_free(p->normal); - p->normal = NULL; - multifd_recv_state->ops->recv_cleanup(p); - } - qemu_sem_destroy(&multifd_recv_state->sem_sync); - g_free(multifd_recv_state->params); - multifd_recv_state->params = NULL; - g_free(multifd_recv_state); - multifd_recv_state = NULL; - - return 0; -} - -void multifd_recv_sync_main(void) -{ - int i; - - if (!migrate_use_multifd()) { - return; - } - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDRecvParams *p = &multifd_recv_state->params[i]; - - trace_multifd_recv_sync_main_wait(p->id); - qemu_sem_wait(&multifd_recv_state->sem_sync); - } - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDRecvParams *p = &multifd_recv_state->params[i]; - - WITH_QEMU_LOCK_GUARD(&p->mutex) { - if (multifd_recv_state->packet_num < p->packet_num) { - multifd_recv_state->packet_num = p->packet_num; - } - } - trace_multifd_recv_sync_main_signal(p->id); - qemu_sem_post(&p->sem_sync); - } - trace_multifd_recv_sync_main(multifd_recv_state->packet_num); -} - -static void *multifd_recv_thread(void *opaque) -{ - MultiFDRecvParams *p = opaque; - Error *local_err = NULL; - int ret; - - trace_multifd_recv_thread_start(p->id); - rcu_register_thread(); - - while (true) { - uint32_t flags; - - if (p->quit) { - break; - } - - ret = qio_channel_read_all_eof(p->c, (void *)p->packet, - p->packet_len, &local_err); - if (ret == 0) { /* EOF */ - break; - } - if (ret == -1) { /* Error */ - break; - } - - qemu_mutex_lock(&p->mutex); - ret = multifd_recv_unfill_packet(p, &local_err); - if (ret) { - qemu_mutex_unlock(&p->mutex); - break; - } - - flags = p->flags; - /* recv methods don't know how to handle the SYNC flag */ - p->flags &= ~MULTIFD_FLAG_SYNC; - trace_multifd_recv(p->id, p->packet_num, p->normal_num, flags, - p->next_packet_size); - p->num_packets++; - p->total_normal_pages += p->normal_num; - qemu_mutex_unlock(&p->mutex); - - if (p->normal_num) { - ret = multifd_recv_state->ops->recv_pages(p, &local_err); - if (ret != 0) { - break; - } - } - - if (flags & MULTIFD_FLAG_SYNC) { - qemu_sem_post(&multifd_recv_state->sem_sync); - qemu_sem_wait(&p->sem_sync); - } - } - - if (local_err) { - multifd_recv_terminate_threads(local_err); - error_free(local_err); - } - qemu_mutex_lock(&p->mutex); - p->running = false; - qemu_mutex_unlock(&p->mutex); - - rcu_unregister_thread(); - trace_multifd_recv_thread_end(p->id, p->num_packets, p->total_normal_pages); - - return NULL; -} - -int multifd_load_setup(Error **errp) -{ - int thread_count; - uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); - uint8_t i; - - /* - * Return successfully if multiFD recv state is already initialised - * or multiFD is not enabled. - */ - if (multifd_recv_state || !migrate_use_multifd()) { - return 0; - } - - if (!migrate_multi_channels_is_allowed()) { - error_setg(errp, "multifd is not supported by current protocol"); - return -1; - } - thread_count = migrate_multifd_channels(); - multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); - multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); - qatomic_set(&multifd_recv_state->count, 0); - qemu_sem_init(&multifd_recv_state->sem_sync, 0); - multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()]; - - for (i = 0; i < thread_count; i++) { - MultiFDRecvParams *p = &multifd_recv_state->params[i]; - - qemu_mutex_init(&p->mutex); - qemu_sem_init(&p->sem_sync, 0); - p->quit = false; - p->id = i; - p->packet_len = sizeof(MultiFDPacket_t) - + sizeof(uint64_t) * page_count; - p->packet = g_malloc0(p->packet_len); - p->name = g_strdup_printf("multifdrecv_%d", i); - p->iov = g_new0(struct iovec, page_count); - p->normal = g_new0(ram_addr_t, page_count); - p->page_count = page_count; - p->page_size = qemu_target_page_size(); - } - - for (i = 0; i < thread_count; i++) { - MultiFDRecvParams *p = &multifd_recv_state->params[i]; - Error *local_err = NULL; - int ret; - - ret = multifd_recv_state->ops->recv_setup(p, &local_err); - if (ret) { - error_propagate(errp, local_err); - return ret; - } - } - return 0; -} - -bool multifd_recv_all_channels_created(void) -{ - int thread_count = migrate_multifd_channels(); - - if (!migrate_use_multifd()) { - return true; - } - - if (!multifd_recv_state) { - /* Called before any connections created */ - return false; - } - - return thread_count == qatomic_read(&multifd_recv_state->count); -} - -/* - * Try to receive all multifd channels to get ready for the migration. - * Sets @errp when failing to receive the current channel. - */ -void multifd_recv_new_channel(QIOChannel *ioc, Error **errp) -{ - MultiFDRecvParams *p; - Error *local_err = NULL; - int id; - - id = multifd_recv_initial_packet(ioc, &local_err); - if (id < 0) { - multifd_recv_terminate_threads(local_err); - error_propagate_prepend(errp, local_err, - "failed to receive packet" - " via multifd channel %d: ", - qatomic_read(&multifd_recv_state->count)); - return; - } - trace_multifd_recv_new_channel(id); - - p = &multifd_recv_state->params[id]; - if (p->c != NULL) { - error_setg(&local_err, "multifd: received id '%d' already setup'", - id); - multifd_recv_terminate_threads(local_err); - error_propagate(errp, local_err); - return; - } - p->c = ioc; - object_ref(OBJECT(ioc)); - /* initial packet */ - p->num_packets = 1; - - p->running = true; - qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, - QEMU_THREAD_JOINABLE); - qatomic_inc(&multifd_recv_state->count); -} diff --git a/migration/multifd.h b/migration/multifd.h index ff3aa2e2e9..7cfc265148 100644 --- a/migration/multifd.h +++ b/migration/multifd.h @@ -16,7 +16,8 @@ int multifd_save_setup(Error **errp); void multifd_save_cleanup(void); int multifd_load_setup(Error **errp); -int multifd_load_cleanup(Error **errp); +void multifd_load_cleanup(void); +void multifd_load_shutdown(void); bool multifd_recv_all_channels_created(void); void multifd_recv_new_channel(QIOChannel *ioc, Error **errp); void multifd_recv_sync_main(void); diff --git a/migration/postcopy-ram.c b/migration/postcopy-ram.c index 53299b7a5e..f54f44d899 100644 --- a/migration/postcopy-ram.c +++ b/migration/postcopy-ram.c @@ -1198,6 +1198,11 @@ int postcopy_ram_incoming_setup(MigrationIncomingState *mis) if (migrate_postcopy_preempt()) { /* + * The preempt channel is established in asynchronous way. Wait + * for its completion. + */ + qemu_sem_wait(&mis->postcopy_qemufile_dst_done); + /* * This thread needs to be created after the temp pages because * it'll fetch RAM_CHANNEL_POSTCOPY PostcopyTmpPage immediately. */ @@ -1544,6 +1549,7 @@ void postcopy_preempt_new_channel(MigrationIncomingState *mis, QEMUFile *file) */ qemu_file_set_blocking(file, true); mis->postcopy_qemufile_dst = file; + qemu_sem_post(&mis->postcopy_qemufile_dst_done); trace_postcopy_preempt_new_channel(); } @@ -1612,14 +1618,21 @@ out: postcopy_preempt_send_channel_done(s, ioc, local_err); } -/* Returns 0 if channel established, -1 for error. */ -int postcopy_preempt_wait_channel(MigrationState *s) +/* + * This function will kick off an async task to establish the preempt + * channel, and wait until the connection setup completed. Returns 0 if + * channel established, -1 for error. + */ +int postcopy_preempt_establish_channel(MigrationState *s) { /* If preempt not enabled, no need to wait */ if (!migrate_postcopy_preempt()) { return 0; } + /* Kick off async task to establish preempt channel */ + postcopy_preempt_setup(s); + /* * We need the postcopy preempt channel to be established before * starting doing anything. @@ -1629,22 +1642,10 @@ int postcopy_preempt_wait_channel(MigrationState *s) return s->postcopy_qemufile_src ? 0 : -1; } -int postcopy_preempt_setup(MigrationState *s, Error **errp) +void postcopy_preempt_setup(MigrationState *s) { - if (!migrate_postcopy_preempt()) { - return 0; - } - - if (!migrate_multi_channels_is_allowed()) { - error_setg(errp, "Postcopy preempt is not supported as current " - "migration stream does not support multi-channels."); - return -1; - } - /* Kick an async task to connect */ socket_send_channel_create(postcopy_preempt_send_channel_new, s); - - return 0; } static void postcopy_pause_ram_fast_load(MigrationIncomingState *mis) diff --git a/migration/postcopy-ram.h b/migration/postcopy-ram.h index 25881c4127..b4867a32d5 100644 --- a/migration/postcopy-ram.h +++ b/migration/postcopy-ram.h @@ -191,7 +191,7 @@ enum PostcopyChannels { }; void postcopy_preempt_new_channel(MigrationIncomingState *mis, QEMUFile *file); -int postcopy_preempt_setup(MigrationState *s, Error **errp); -int postcopy_preempt_wait_channel(MigrationState *s); +void postcopy_preempt_setup(MigrationState *s); +int postcopy_preempt_establish_channel(MigrationState *s); #endif diff --git a/migration/qemu-file.c b/migration/qemu-file.c index 2d5f74ffc2..102ab3b439 100644 --- a/migration/qemu-file.c +++ b/migration/qemu-file.c @@ -940,3 +940,37 @@ QIOChannel *qemu_file_get_ioc(QEMUFile *file) { return file->ioc; } + +/* + * Read size bytes from QEMUFile f and write them to fd. + */ +int qemu_file_get_to_fd(QEMUFile *f, int fd, size_t size) +{ + while (size) { + size_t pending = f->buf_size - f->buf_index; + ssize_t rc; + + if (!pending) { + rc = qemu_fill_buffer(f); + if (rc < 0) { + return rc; + } + if (rc == 0) { + return -EIO; + } + continue; + } + + rc = write(fd, f->buf + f->buf_index, MIN(pending, size)); + if (rc < 0) { + return -errno; + } + if (rc == 0) { + return -EIO; + } + f->buf_index += rc; + size -= rc; + } + + return 0; +} diff --git a/migration/qemu-file.h b/migration/qemu-file.h index fa13d04d78..9d0155a2a1 100644 --- a/migration/qemu-file.h +++ b/migration/qemu-file.h @@ -148,6 +148,7 @@ int qemu_file_shutdown(QEMUFile *f); QEMUFile *qemu_file_get_return_path(QEMUFile *f); void qemu_fflush(QEMUFile *f); void qemu_file_set_blocking(QEMUFile *f, bool block); +int qemu_file_get_to_fd(QEMUFile *f, int fd, size_t size); void ram_control_before_iterate(QEMUFile *f, uint64_t flags); void ram_control_after_iterate(QEMUFile *f, uint64_t flags); diff --git a/migration/ram.c b/migration/ram.c index b966e148c2..96e8a19a58 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -67,21 +67,53 @@ /***********************************************************/ /* ram save/restore */ -/* RAM_SAVE_FLAG_ZERO used to be named RAM_SAVE_FLAG_COMPRESS, it - * worked for pages that where filled with the same char. We switched +/* + * RAM_SAVE_FLAG_ZERO used to be named RAM_SAVE_FLAG_COMPRESS, it + * worked for pages that were filled with the same char. We switched * it to only search for the zero value. And to avoid confusion with - * RAM_SSAVE_FLAG_COMPRESS_PAGE just rename it. + * RAM_SAVE_FLAG_COMPRESS_PAGE just rename it. */ - -#define RAM_SAVE_FLAG_FULL 0x01 /* Obsolete, not used anymore */ +/* + * RAM_SAVE_FLAG_FULL was obsoleted in 2009, it can be reused now + */ +#define RAM_SAVE_FLAG_FULL 0x01 #define RAM_SAVE_FLAG_ZERO 0x02 #define RAM_SAVE_FLAG_MEM_SIZE 0x04 #define RAM_SAVE_FLAG_PAGE 0x08 #define RAM_SAVE_FLAG_EOS 0x10 #define RAM_SAVE_FLAG_CONTINUE 0x20 #define RAM_SAVE_FLAG_XBZRLE 0x40 -/* 0x80 is reserved in migration.h start with 0x100 next */ +/* 0x80 is reserved in qemu-file.h for RAM_SAVE_FLAG_HOOK */ #define RAM_SAVE_FLAG_COMPRESS_PAGE 0x100 +/* We can't use any flag that is bigger than 0x200 */ + +int (*xbzrle_encode_buffer_func)(uint8_t *, uint8_t *, int, + uint8_t *, int) = xbzrle_encode_buffer; +#if defined(CONFIG_AVX512BW_OPT) +#include "qemu/cpuid.h" +static void __attribute__((constructor)) init_cpu_flag(void) +{ + unsigned max = __get_cpuid_max(0, NULL); + int a, b, c, d; + if (max >= 1) { + __cpuid(1, a, b, c, d); + /* We must check that AVX is not just available, but usable. */ + if ((c & bit_OSXSAVE) && (c & bit_AVX) && max >= 7) { + int bv; + __asm("xgetbv" : "=a"(bv), "=d"(d) : "c"(0)); + __cpuid_count(7, 0, a, b, c, d); + /* 0xe6: + * XCR0[7:5] = 111b (OPMASK state, upper 256-bit of ZMM0-ZMM15 + * and ZMM16-ZMM31 state are enabled by OS) + * XCR0[2:1] = 11b (XMM state and YMM state are enabled by OS) + */ + if ((bv & 0xe6) == 0xe6 && (b & bit_AVX512BW)) { + xbzrle_encode_buffer_func = xbzrle_encode_buffer_avx512; + } + } + } +} +#endif XBZRLECacheStats xbzrle_counters; @@ -330,6 +362,8 @@ struct RAMState { PageSearchStatus pss[RAM_CHANNEL_MAX]; /* UFFD file descriptor, used in 'write-tracking' migration */ int uffdio_fd; + /* total ram size in bytes */ + uint64_t ram_bytes_total; /* Last block that we have visited searching for dirty pages */ RAMBlock *last_seen_block; /* Last dirty target page we have sent */ @@ -450,6 +484,13 @@ void dirty_sync_missed_zero_copy(void) ram_counters.dirty_sync_missed_zero_copy++; } +struct MigrationOps { + int (*ram_save_target_page)(RAMState *rs, PageSearchStatus *pss); +}; +typedef struct MigrationOps MigrationOps; + +MigrationOps *migration_ops; + CompressionStats compression_counters; struct CompressParam { @@ -797,9 +838,9 @@ static int save_xbzrle_page(RAMState *rs, PageSearchStatus *pss, memcpy(XBZRLE.current_buf, *current_data, TARGET_PAGE_SIZE); /* XBZRLE encoding (if there is no overflow) */ - encoded_len = xbzrle_encode_buffer(prev_cached_page, XBZRLE.current_buf, - TARGET_PAGE_SIZE, XBZRLE.encoded_buf, - TARGET_PAGE_SIZE); + encoded_len = xbzrle_encode_buffer_func(prev_cached_page, XBZRLE.current_buf, + TARGET_PAGE_SIZE, XBZRLE.encoded_buf, + TARGET_PAGE_SIZE); /* * Update the cache contents, so that it corresponds to the data @@ -1546,17 +1587,23 @@ retry: return pages; } +#define PAGE_ALL_CLEAN 0 +#define PAGE_TRY_AGAIN 1 +#define PAGE_DIRTY_FOUND 2 /** * find_dirty_block: find the next dirty page and update any state * associated with the search process. * - * Returns true if a page is found + * Returns: + * PAGE_ALL_CLEAN: no dirty page found, give up + * PAGE_TRY_AGAIN: no dirty page found, retry for next block + * PAGE_DIRTY_FOUND: dirty page found * * @rs: current RAM state * @pss: data about the state of the current dirty page scan * @again: set to false if the search has scanned the whole of RAM */ -static bool find_dirty_block(RAMState *rs, PageSearchStatus *pss, bool *again) +static int find_dirty_block(RAMState *rs, PageSearchStatus *pss) { /* Update pss->page for the next dirty bit in ramblock */ pss_find_next_dirty(pss); @@ -1567,8 +1614,7 @@ static bool find_dirty_block(RAMState *rs, PageSearchStatus *pss, bool *again) * We've been once around the RAM and haven't found anything. * Give up. */ - *again = false; - return false; + return PAGE_ALL_CLEAN; } if (!offset_in_ramblock(pss->block, ((ram_addr_t)pss->page) << TARGET_PAGE_BITS)) { @@ -1597,13 +1643,10 @@ static bool find_dirty_block(RAMState *rs, PageSearchStatus *pss, bool *again) } } /* Didn't find anything this time, but try again on the new block */ - *again = true; - return false; + return PAGE_TRY_AGAIN; } else { - /* Can go around again, but... */ - *again = true; - /* We've found something so probably don't need to */ - return true; + /* We've found something */ + return PAGE_DIRTY_FOUND; } } @@ -2291,14 +2334,14 @@ static bool save_compress_page(RAMState *rs, PageSearchStatus *pss, } /** - * ram_save_target_page: save one target page + * ram_save_target_page_legacy: save one target page * * Returns the number of pages written * * @rs: current RAM state * @pss: data about the page we want to send */ -static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss) +static int ram_save_target_page_legacy(RAMState *rs, PageSearchStatus *pss) { RAMBlock *block = pss->block; ram_addr_t offset = ((ram_addr_t)pss->page) << TARGET_PAGE_BITS; @@ -2424,7 +2467,7 @@ static int ram_save_host_page_urgent(PageSearchStatus *pss) if (page_dirty) { /* Be strict to return code; it must be 1, or what else? */ - if (ram_save_target_page(rs, pss) != 1) { + if (migration_ops->ram_save_target_page(rs, pss) != 1) { error_report_once("%s: ram_save_target_page failed", __func__); ret = -1; goto out; @@ -2493,7 +2536,7 @@ static int ram_save_host_page(RAMState *rs, PageSearchStatus *pss) if (preempt_active) { qemu_mutex_unlock(&rs->bitmap_mutex); } - tmppages = ram_save_target_page(rs, pss); + tmppages = migration_ops->ram_save_target_page(rs, pss); if (tmppages >= 0) { pages += tmppages; /* @@ -2542,10 +2585,9 @@ static int ram_find_and_save_block(RAMState *rs) { PageSearchStatus *pss = &rs->pss[RAM_CHANNEL_PRECOPY]; int pages = 0; - bool again, found; /* No dirty page as there is zero RAM */ - if (!ram_bytes_total()) { + if (!rs->ram_bytes_total) { return pages; } @@ -2563,19 +2605,23 @@ static int ram_find_and_save_block(RAMState *rs) pss_init(pss, rs->last_seen_block, rs->last_page); - do { - again = true; - found = get_queued_page(rs, pss); - - if (!found) { + while (true){ + if (!get_queued_page(rs, pss)) { /* priority queue empty, so just search for something dirty */ - found = find_dirty_block(rs, pss, &again); + int res = find_dirty_block(rs, pss); + if (res != PAGE_DIRTY_FOUND) { + if (res == PAGE_ALL_CLEAN) { + break; + } else if (res == PAGE_TRY_AGAIN) { + continue; + } + } } - - if (found) { - pages = ram_save_host_page(rs, pss); + pages = ram_save_host_page(rs, pss); + if (pages) { + break; } - } while (!pages && again); + } rs->last_seen_block = pss->block; rs->last_page = pss->page; @@ -2596,28 +2642,30 @@ void acct_update_position(QEMUFile *f, size_t size, bool zero) } } -static uint64_t ram_bytes_total_common(bool count_ignored) +static uint64_t ram_bytes_total_with_ignored(void) { RAMBlock *block; uint64_t total = 0; RCU_READ_LOCK_GUARD(); - if (count_ignored) { - RAMBLOCK_FOREACH_MIGRATABLE(block) { - total += block->used_length; - } - } else { - RAMBLOCK_FOREACH_NOT_IGNORED(block) { - total += block->used_length; - } + RAMBLOCK_FOREACH_MIGRATABLE(block) { + total += block->used_length; } return total; } uint64_t ram_bytes_total(void) { - return ram_bytes_total_common(false); + RAMBlock *block; + uint64_t total = 0; + + RCU_READ_LOCK_GUARD(); + + RAMBLOCK_FOREACH_NOT_IGNORED(block) { + total += block->used_length; + } + return total; } static void xbzrle_load_setup(void) @@ -2688,6 +2736,8 @@ static void ram_save_cleanup(void *opaque) xbzrle_cleanup(); compress_threads_save_cleanup(); ram_state_cleanup(rsp); + g_free(migration_ops); + migration_ops = NULL; } static void ram_state_reset(RAMState *rs) @@ -3002,13 +3052,14 @@ static int ram_state_init(RAMState **rsp) qemu_mutex_init(&(*rsp)->bitmap_mutex); qemu_mutex_init(&(*rsp)->src_page_req_mutex); QSIMPLEQ_INIT(&(*rsp)->src_page_requests); + (*rsp)->ram_bytes_total = ram_bytes_total(); /* * Count the total number of pages used by ram blocks not including any * gaps due to alignment or unplugs. * This must match with the initial values of dirty bitmap. */ - (*rsp)->migration_dirty_pages = ram_bytes_total() >> TARGET_PAGE_BITS; + (*rsp)->migration_dirty_pages = (*rsp)->ram_bytes_total >> TARGET_PAGE_BITS; ram_state_reset(*rsp); return 0; @@ -3222,7 +3273,8 @@ static int ram_save_setup(QEMUFile *f, void *opaque) (*rsp)->pss[RAM_CHANNEL_PRECOPY].pss_channel = f; WITH_RCU_READ_LOCK_GUARD() { - qemu_put_be64(f, ram_bytes_total_common(true) | RAM_SAVE_FLAG_MEM_SIZE); + qemu_put_be64(f, ram_bytes_total_with_ignored() + | RAM_SAVE_FLAG_MEM_SIZE); RAMBLOCK_FOREACH_MIGRATABLE(block) { qemu_put_byte(f, strlen(block->idstr)); @@ -3241,6 +3293,8 @@ static int ram_save_setup(QEMUFile *f, void *opaque) ram_control_before_iterate(f, RAM_CONTROL_SETUP); ram_control_after_iterate(f, RAM_CONTROL_SETUP); + migration_ops = g_malloc0(sizeof(MigrationOps)); + migration_ops->ram_save_target_page = ram_save_target_page_legacy; ret = multifd_send_sync_main(f); if (ret < 0) { return ret; @@ -3435,10 +3489,8 @@ static int ram_save_complete(QEMUFile *f, void *opaque) return 0; } -static void ram_state_pending_estimate(void *opaque, - uint64_t *res_precopy_only, - uint64_t *res_compatible, - uint64_t *res_postcopy_only) +static void ram_state_pending_estimate(void *opaque, uint64_t *must_precopy, + uint64_t *can_postcopy) { RAMState **temp = opaque; RAMState *rs = *temp; @@ -3447,16 +3499,14 @@ static void ram_state_pending_estimate(void *opaque, if (migrate_postcopy_ram()) { /* We can do postcopy, and all the data is postcopiable */ - *res_postcopy_only += remaining_size; + *can_postcopy += remaining_size; } else { - *res_precopy_only += remaining_size; + *must_precopy += remaining_size; } } -static void ram_state_pending_exact(void *opaque, - uint64_t *res_precopy_only, - uint64_t *res_compatible, - uint64_t *res_postcopy_only) +static void ram_state_pending_exact(void *opaque, uint64_t *must_precopy, + uint64_t *can_postcopy) { RAMState **temp = opaque; RAMState *rs = *temp; @@ -3474,9 +3524,9 @@ static void ram_state_pending_exact(void *opaque, if (migrate_postcopy_ram()) { /* We can do postcopy, and all the data is postcopiable */ - *res_compatible += remaining_size; + *can_postcopy += remaining_size; } else { - *res_precopy_only += remaining_size; + *must_precopy += remaining_size; } } diff --git a/migration/savevm.c b/migration/savevm.c index e9cf4999ad..aa54a67fda 100644 --- a/migration/savevm.c +++ b/migration/savevm.c @@ -1541,18 +1541,16 @@ flush: * the result is split into the amount for units that can and * for units that can't do postcopy. */ -void qemu_savevm_state_pending_estimate(uint64_t *res_precopy_only, - uint64_t *res_compatible, - uint64_t *res_postcopy_only) +void qemu_savevm_state_pending_estimate(uint64_t *must_precopy, + uint64_t *can_postcopy) { SaveStateEntry *se; - *res_precopy_only = 0; - *res_compatible = 0; - *res_postcopy_only = 0; + *must_precopy = 0; + *can_postcopy = 0; QTAILQ_FOREACH(se, &savevm_state.handlers, entry) { - if (!se->ops || !se->ops->state_pending_exact) { + if (!se->ops || !se->ops->state_pending_estimate) { continue; } if (se->ops->is_active) { @@ -1560,24 +1558,20 @@ void qemu_savevm_state_pending_estimate(uint64_t *res_precopy_only, continue; } } - se->ops->state_pending_exact(se->opaque, - res_precopy_only, res_compatible, - res_postcopy_only); + se->ops->state_pending_estimate(se->opaque, must_precopy, can_postcopy); } } -void qemu_savevm_state_pending_exact(uint64_t *res_precopy_only, - uint64_t *res_compatible, - uint64_t *res_postcopy_only) +void qemu_savevm_state_pending_exact(uint64_t *must_precopy, + uint64_t *can_postcopy) { SaveStateEntry *se; - *res_precopy_only = 0; - *res_compatible = 0; - *res_postcopy_only = 0; + *must_precopy = 0; + *can_postcopy = 0; QTAILQ_FOREACH(se, &savevm_state.handlers, entry) { - if (!se->ops || !se->ops->state_pending_estimate) { + if (!se->ops || !se->ops->state_pending_exact) { continue; } if (se->ops->is_active) { @@ -1585,9 +1579,7 @@ void qemu_savevm_state_pending_exact(uint64_t *res_precopy_only, continue; } } - se->ops->state_pending_estimate(se->opaque, - res_precopy_only, res_compatible, - res_postcopy_only); + se->ops->state_pending_exact(se->opaque, must_precopy, can_postcopy); } } @@ -2200,7 +2192,11 @@ static int loadvm_postcopy_handle_resume(MigrationIncomingState *mis) qemu_sem_post(&mis->postcopy_pause_sem_fault); if (migrate_postcopy_preempt()) { - /* The channel should already be setup again; make sure of it */ + /* + * The preempt channel will be created in async manner, now let's + * wait for it and make sure it's created. + */ + qemu_sem_wait(&mis->postcopy_qemufile_dst_done); assert(mis->postcopy_qemufile_dst); /* Kick the fast ram load thread too */ qemu_sem_post(&mis->postcopy_pause_sem_fast_load); diff --git a/migration/savevm.h b/migration/savevm.h index b1901e68d5..fb636735f0 100644 --- a/migration/savevm.h +++ b/migration/savevm.h @@ -40,12 +40,10 @@ void qemu_savevm_state_cleanup(void); void qemu_savevm_state_complete_postcopy(QEMUFile *f); int qemu_savevm_state_complete_precopy(QEMUFile *f, bool iterable_only, bool inactivate_disks); -void qemu_savevm_state_pending_exact(uint64_t *res_precopy_only, - uint64_t *res_compatible, - uint64_t *res_postcopy_only); -void qemu_savevm_state_pending_estimate(uint64_t *res_precopy_only, - uint64_t *res_compatible, - uint64_t *res_postcopy_only); +void qemu_savevm_state_pending_exact(uint64_t *must_precopy, + uint64_t *can_postcopy); +void qemu_savevm_state_pending_estimate(uint64_t *must_precopy, + uint64_t *can_postcopy); void qemu_savevm_send_ping(QEMUFile *f, uint32_t value); void qemu_savevm_send_open_return_path(QEMUFile *f); int qemu_savevm_send_packaged(QEMUFile *f, const uint8_t *buf, size_t len); diff --git a/migration/trace-events b/migration/trace-events index 67b65a70ff..92161eeac5 100644 --- a/migration/trace-events +++ b/migration/trace-events @@ -150,8 +150,8 @@ migrate_fd_cleanup(void) "" migrate_fd_error(const char *error_desc) "error=%s" migrate_fd_cancel(void) "" migrate_handle_rp_req_pages(const char *rbname, size_t start, size_t len) "in %s at 0x%zx len 0x%zx" -migrate_pending_exact(uint64_t size, uint64_t pre, uint64_t compat, uint64_t post) "exact pending size %" PRIu64 " (pre = %" PRIu64 " compat=%" PRIu64 " post=%" PRIu64 ")" -migrate_pending_estimate(uint64_t size, uint64_t pre, uint64_t compat, uint64_t post) "estimate pending size %" PRIu64 " (pre = %" PRIu64 " compat=%" PRIu64 " post=%" PRIu64 ")" +migrate_pending_exact(uint64_t size, uint64_t pre, uint64_t post) "exact pending size %" PRIu64 " (pre = %" PRIu64 " post=%" PRIu64 ")" +migrate_pending_estimate(uint64_t size, uint64_t pre, uint64_t post) "estimate pending size %" PRIu64 " (pre = %" PRIu64 " post=%" PRIu64 ")" migrate_send_rp_message(int msg_type, uint16_t len) "%d: len %d" migrate_send_rp_recv_bitmap(char *name, int64_t size) "block '%s' size 0x%"PRIi64 migration_completion_file_err(void) "" @@ -357,6 +357,7 @@ migration_block_flush_blks(const char *action, int submitted, int read_done, int migration_block_save(const char *mig_stage, int submitted, int transferred) "Enter save live %s submitted %d transferred %d" migration_block_save_complete(void) "Block migration completed" migration_block_state_pending(uint64_t pending) "Enter save live pending %" PRIu64 +migration_block_progression(unsigned percent) "Completed %u%%" # page_cache.c migration_pagecache_init(int64_t max_num_items) "Setting cache buckets to %" PRId64 diff --git a/migration/xbzrle.c b/migration/xbzrle.c index 1ba482ded9..05366e86c0 100644 --- a/migration/xbzrle.c +++ b/migration/xbzrle.c @@ -174,3 +174,127 @@ int xbzrle_decode_buffer(uint8_t *src, int slen, uint8_t *dst, int dlen) return d; } + +#if defined(CONFIG_AVX512BW_OPT) +#pragma GCC push_options +#pragma GCC target("avx512bw") +#include <immintrin.h> +int xbzrle_encode_buffer_avx512(uint8_t *old_buf, uint8_t *new_buf, int slen, + uint8_t *dst, int dlen) +{ + uint32_t zrun_len = 0, nzrun_len = 0; + int d = 0, i = 0, num = 0; + uint8_t *nzrun_start = NULL; + /* add 1 to include residual part in main loop */ + uint32_t count512s = (slen >> 6) + 1; + /* countResidual is tail of data, i.e., countResidual = slen % 64 */ + uint32_t count_residual = slen & 0b111111; + bool never_same = true; + uint64_t mask_residual = 1; + mask_residual <<= count_residual; + mask_residual -= 1; + __m512i r = _mm512_set1_epi32(0); + + while (count512s) { + if (d + 2 > dlen) { + return -1; + } + + int bytes_to_check = 64; + uint64_t mask = 0xffffffffffffffff; + if (count512s == 1) { + bytes_to_check = count_residual; + mask = mask_residual; + } + __m512i old_data = _mm512_mask_loadu_epi8(r, + mask, old_buf + i); + __m512i new_data = _mm512_mask_loadu_epi8(r, + mask, new_buf + i); + uint64_t comp = _mm512_cmpeq_epi8_mask(old_data, new_data); + count512s--; + + bool is_same = (comp & 0x1); + while (bytes_to_check) { + if (is_same) { + if (nzrun_len) { + d += uleb128_encode_small(dst + d, nzrun_len); + if (d + nzrun_len > dlen) { + return -1; + } + nzrun_start = new_buf + i - nzrun_len; + memcpy(dst + d, nzrun_start, nzrun_len); + d += nzrun_len; + nzrun_len = 0; + } + /* 64 data at a time for speed */ + if (count512s && (comp == 0xffffffffffffffff)) { + i += 64; + zrun_len += 64; + break; + } + never_same = false; + num = __builtin_ctzll(~comp); + num = (num < bytes_to_check) ? num : bytes_to_check; + zrun_len += num; + bytes_to_check -= num; + comp >>= num; + i += num; + if (bytes_to_check) { + /* still has different data after same data */ + d += uleb128_encode_small(dst + d, zrun_len); + zrun_len = 0; + } else { + break; + } + } + if (never_same || zrun_len) { + /* + * never_same only acts if + * data begins with diff in first count512s + */ + d += uleb128_encode_small(dst + d, zrun_len); + zrun_len = 0; + never_same = false; + } + /* has diff, 64 data at a time for speed */ + if ((bytes_to_check == 64) && (comp == 0x0)) { + i += 64; + nzrun_len += 64; + break; + } + num = __builtin_ctzll(comp); + num = (num < bytes_to_check) ? num : bytes_to_check; + nzrun_len += num; + bytes_to_check -= num; + comp >>= num; + i += num; + if (bytes_to_check) { + /* mask like 111000 */ + d += uleb128_encode_small(dst + d, nzrun_len); + /* overflow */ + if (d + nzrun_len > dlen) { + return -1; + } + nzrun_start = new_buf + i - nzrun_len; + memcpy(dst + d, nzrun_start, nzrun_len); + d += nzrun_len; + nzrun_len = 0; + is_same = true; + } + } + } + + if (nzrun_len != 0) { + d += uleb128_encode_small(dst + d, nzrun_len); + /* overflow */ + if (d + nzrun_len > dlen) { + return -1; + } + nzrun_start = new_buf + i - nzrun_len; + memcpy(dst + d, nzrun_start, nzrun_len); + d += nzrun_len; + } + return d; +} +#pragma GCC pop_options +#endif diff --git a/migration/xbzrle.h b/migration/xbzrle.h index a0db507b9c..6feb49160a 100644 --- a/migration/xbzrle.h +++ b/migration/xbzrle.h @@ -18,4 +18,8 @@ int xbzrle_encode_buffer(uint8_t *old_buf, uint8_t *new_buf, int slen, uint8_t *dst, int dlen); int xbzrle_decode_buffer(uint8_t *src, int slen, uint8_t *dst, int dlen); +#if defined(CONFIG_AVX512BW_OPT) +int xbzrle_encode_buffer_avx512(uint8_t *old_buf, uint8_t *new_buf, int slen, + uint8_t *dst, int dlen); +#endif #endif |