diff options
Diffstat (limited to 'migration')
| -rw-r--r-- | migration/channel.c | 12 | ||||
| -rw-r--r-- | migration/exec.c | 9 | ||||
| -rw-r--r-- | migration/fd.c | 9 | ||||
| -rw-r--r-- | migration/migration.c | 559 | ||||
| -rw-r--r-- | migration/migration.h | 22 | ||||
| -rw-r--r-- | migration/postcopy-ram.c | 54 | ||||
| -rw-r--r-- | migration/ram.c | 500 | ||||
| -rw-r--r-- | migration/ram.h | 6 | ||||
| -rw-r--r-- | migration/rdma.c | 7 | ||||
| -rw-r--r-- | migration/savevm.c | 191 | ||||
| -rw-r--r-- | migration/savevm.h | 3 | ||||
| -rw-r--r-- | migration/socket.c | 39 | ||||
| -rw-r--r-- | migration/socket.h | 7 | ||||
| -rw-r--r-- | migration/trace-events | 21 |
14 files changed, 1333 insertions, 106 deletions
diff --git a/migration/channel.c b/migration/channel.c index c5eaf0fa0e..33e0e9b82f 100644 --- a/migration/channel.c +++ b/migration/channel.c @@ -71,11 +71,21 @@ void migration_channel_connect(MigrationState *s, !object_dynamic_cast(OBJECT(ioc), TYPE_QIO_CHANNEL_TLS)) { migration_tls_channel_connect(s, ioc, hostname, &error); + + if (!error) { + /* tls_channel_connect will call back to this + * function after the TLS handshake, + * so we mustn't call migrate_fd_connect until then + */ + + return; + } } else { QEMUFile *f = qemu_fopen_channel_output(ioc); + qemu_mutex_lock(&s->qemu_file_lock); s->to_dst_file = f; - + qemu_mutex_unlock(&s->qemu_file_lock); } } migrate_fd_connect(s, error); diff --git a/migration/exec.c b/migration/exec.c index 0bc5a427dd..9d0f82f1f0 100644 --- a/migration/exec.c +++ b/migration/exec.c @@ -65,9 +65,8 @@ void exec_start_incoming_migration(const char *command, Error **errp) } qio_channel_set_name(ioc, "migration-exec-incoming"); - qio_channel_add_watch(ioc, - G_IO_IN, - exec_accept_incoming_migration, - NULL, - NULL); + qio_channel_add_watch_full(ioc, G_IO_IN, + exec_accept_incoming_migration, + NULL, NULL, + g_main_context_get_thread_default()); } diff --git a/migration/fd.c b/migration/fd.c index cd06182d1e..9a380bbbc4 100644 --- a/migration/fd.c +++ b/migration/fd.c @@ -66,9 +66,8 @@ void fd_start_incoming_migration(const char *infd, Error **errp) } qio_channel_set_name(QIO_CHANNEL(ioc), "migration-fd-incoming"); - qio_channel_add_watch(ioc, - G_IO_IN, - fd_accept_incoming_migration, - NULL, - NULL); + qio_channel_add_watch_full(ioc, G_IO_IN, + fd_accept_incoming_migration, + NULL, NULL, + g_main_context_get_thread_default()); } diff --git a/migration/migration.c b/migration/migration.c index 35f2781b03..05aec2c905 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -95,6 +95,8 @@ enum mig_rp_message_type { MIG_RP_MSG_REQ_PAGES_ID, /* data (start: be64, len: be32, id: string) */ MIG_RP_MSG_REQ_PAGES, /* data (start: be64, len: be32) */ + MIG_RP_MSG_RECV_BITMAP, /* send recved_bitmap back to source */ + MIG_RP_MSG_RESUME_ACK, /* tell source that we are ready to resume */ MIG_RP_MSG_MAX }; @@ -104,6 +106,7 @@ enum mig_rp_message_type { dynamic creation of migration */ static MigrationState *current_migration; +static MigrationIncomingState *current_incoming; static bool migration_object_check(MigrationState *ms, Error **errp); static int migration_maybe_pause(MigrationState *s, @@ -119,6 +122,22 @@ void migration_object_init(void) assert(!current_migration); current_migration = MIGRATION_OBJ(object_new(TYPE_MIGRATION)); + /* + * Init the migrate incoming object as well no matter whether + * we'll use it or not. + */ + assert(!current_incoming); + current_incoming = g_new0(MigrationIncomingState, 1); + current_incoming->state = MIGRATION_STATUS_NONE; + current_incoming->postcopy_remote_fds = + g_array_new(FALSE, TRUE, sizeof(struct PostCopyFD)); + qemu_mutex_init(¤t_incoming->rp_mutex); + qemu_event_init(¤t_incoming->main_thread_load_event, false); + qemu_sem_init(¤t_incoming->postcopy_pause_sem_dst, 0); + qemu_sem_init(¤t_incoming->postcopy_pause_sem_fault, 0); + + init_dirty_bitmap_incoming_migration(); + if (!migration_object_check(current_migration, &err)) { error_report_err(err); exit(1); @@ -149,22 +168,8 @@ MigrationState *migrate_get_current(void) MigrationIncomingState *migration_incoming_get_current(void) { - static bool once; - static MigrationIncomingState mis_current; - - if (!once) { - mis_current.state = MIGRATION_STATUS_NONE; - memset(&mis_current, 0, sizeof(MigrationIncomingState)); - mis_current.postcopy_remote_fds = g_array_new(FALSE, TRUE, - sizeof(struct PostCopyFD)); - qemu_mutex_init(&mis_current.rp_mutex); - qemu_event_init(&mis_current.main_thread_load_event, false); - - init_dirty_bitmap_incoming_migration(); - - once = true; - } - return &mis_current; + assert(current_incoming); + return current_incoming; } void migration_incoming_state_destroy(void) @@ -430,7 +435,7 @@ static void migration_incoming_setup(QEMUFile *f) qemu_file_set_blocking(f, false); } -static void migration_incoming_process(void) +void migration_incoming_process(void) { Coroutine *co = qemu_coroutine_create(process_incoming_migration_co, NULL); qemu_coroutine_enter(co); @@ -438,8 +443,34 @@ static void migration_incoming_process(void) void migration_fd_process_incoming(QEMUFile *f) { - migration_incoming_setup(f); - migration_incoming_process(); + MigrationIncomingState *mis = migration_incoming_get_current(); + + if (mis->state == MIGRATION_STATUS_POSTCOPY_PAUSED) { + /* Resumed from a paused postcopy migration */ + + mis->from_src_file = f; + /* Postcopy has standalone thread to do vm load */ + qemu_file_set_blocking(f, true); + + /* Re-configure the return path */ + mis->to_src_file = qemu_file_get_return_path(f); + + migrate_set_state(&mis->state, MIGRATION_STATUS_POSTCOPY_PAUSED, + MIGRATION_STATUS_POSTCOPY_RECOVER); + + /* + * Here, we only wake up the main loading thread (while the + * fault thread will still be waiting), so that we can receive + * commands from source now, and answer it if needed. The + * fault thread will be woken up afterwards until we are sure + * that source is ready to reply to page requests. + */ + qemu_sem_post(&mis->postcopy_pause_sem_dst); + } else { + /* New incoming migration */ + migration_incoming_setup(f); + migration_incoming_process(); + } } void migration_ioc_process_incoming(QIOChannel *ioc) @@ -448,9 +479,10 @@ void migration_ioc_process_incoming(QIOChannel *ioc) if (!mis->from_src_file) { QEMUFile *f = qemu_fopen_channel_input(ioc); - migration_fd_process_incoming(f); + migration_incoming_setup(f); + return; } - /* We still only have a single channel. Nothing to do here yet */ + multifd_recv_new_channel(ioc); } /** @@ -461,7 +493,11 @@ void migration_ioc_process_incoming(QIOChannel *ioc) */ bool migration_has_all_channels(void) { - return true; + bool all_channels; + + all_channels = multifd_recv_all_channels_created(); + + return all_channels; } /* @@ -491,6 +527,53 @@ void migrate_send_rp_pong(MigrationIncomingState *mis, migrate_send_rp_message(mis, MIG_RP_MSG_PONG, sizeof(buf), &buf); } +void migrate_send_rp_recv_bitmap(MigrationIncomingState *mis, + char *block_name) +{ + char buf[512]; + int len; + int64_t res; + + /* + * First, we send the header part. It contains only the len of + * idstr, and the idstr itself. + */ + len = strlen(block_name); + buf[0] = len; + memcpy(buf + 1, block_name, len); + + if (mis->state != MIGRATION_STATUS_POSTCOPY_RECOVER) { + error_report("%s: MSG_RP_RECV_BITMAP only used for recovery", + __func__); + return; + } + + migrate_send_rp_message(mis, MIG_RP_MSG_RECV_BITMAP, len + 1, buf); + + /* + * Next, we dump the received bitmap to the stream. + * + * TODO: currently we are safe since we are the only one that is + * using the to_src_file handle (fault thread is still paused), + * and it's ok even not taking the mutex. However the best way is + * to take the lock before sending the message header, and release + * the lock after sending the bitmap. + */ + qemu_mutex_lock(&mis->rp_mutex); + res = ramblock_recv_bitmap_send(mis->to_src_file, block_name); + qemu_mutex_unlock(&mis->rp_mutex); + + trace_migrate_send_rp_recv_bitmap(block_name, res); +} + +void migrate_send_rp_resume_ack(MigrationIncomingState *mis, uint32_t value) +{ + uint32_t buf; + + buf = cpu_to_be32(value); + migrate_send_rp_message(mis, MIG_RP_MSG_RESUME_ACK, sizeof(buf), &buf); +} + MigrationCapabilityStatusList *qmp_query_migrate_capabilities(Error **errp) { MigrationCapabilityStatusList *head = NULL; @@ -569,6 +652,8 @@ static bool migration_is_setup_or_active(int state) switch (state) { case MIGRATION_STATUS_ACTIVE: case MIGRATION_STATUS_POSTCOPY_ACTIVE: + case MIGRATION_STATUS_POSTCOPY_PAUSED: + case MIGRATION_STATUS_POSTCOPY_RECOVER: case MIGRATION_STATUS_SETUP: case MIGRATION_STATUS_PRE_SWITCHOVER: case MIGRATION_STATUS_DEVICE: @@ -649,6 +734,8 @@ static void fill_source_migration_info(MigrationInfo *info) case MIGRATION_STATUS_POSTCOPY_ACTIVE: case MIGRATION_STATUS_PRE_SWITCHOVER: case MIGRATION_STATUS_DEVICE: + case MIGRATION_STATUS_POSTCOPY_PAUSED: + case MIGRATION_STATUS_POSTCOPY_RECOVER: /* TODO add some postcopy stats */ info->has_status = true; info->has_total_time = true; @@ -1147,6 +1234,7 @@ static void migrate_fd_cleanup(void *opaque) if (s->to_dst_file) { Error *local_err = NULL; + QEMUFile *tmp; trace_migrate_fd_cleanup(); qemu_mutex_unlock_iothread(); @@ -1159,8 +1247,15 @@ static void migrate_fd_cleanup(void *opaque) if (multifd_save_cleanup(&local_err) != 0) { error_report_err(local_err); } - qemu_fclose(s->to_dst_file); + qemu_mutex_lock(&s->qemu_file_lock); + tmp = s->to_dst_file; s->to_dst_file = NULL; + qemu_mutex_unlock(&s->qemu_file_lock); + /* + * Close the file handle without the lock to make sure the + * critical section won't block for long. + */ + qemu_fclose(tmp); } assert((s->state != MIGRATION_STATUS_ACTIVE) && @@ -1388,6 +1483,59 @@ void qmp_migrate_incoming(const char *uri, Error **errp) once = false; } +void qmp_migrate_recover(const char *uri, Error **errp) +{ + MigrationIncomingState *mis = migration_incoming_get_current(); + + if (mis->state != MIGRATION_STATUS_POSTCOPY_PAUSED) { + error_setg(errp, "Migrate recover can only be run " + "when postcopy is paused."); + return; + } + + if (atomic_cmpxchg(&mis->postcopy_recover_triggered, + false, true) == true) { + error_setg(errp, "Migrate recovery is triggered already"); + return; + } + + /* + * Note that this call will never start a real migration; it will + * only re-setup the migration stream and poke existing migration + * to continue using that newly established channel. + */ + qemu_start_incoming_migration(uri, errp); +} + +void qmp_migrate_pause(Error **errp) +{ + MigrationState *ms = migrate_get_current(); + MigrationIncomingState *mis = migration_incoming_get_current(); + int ret; + + if (ms->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) { + /* Source side, during postcopy */ + qemu_mutex_lock(&ms->qemu_file_lock); + ret = qemu_file_shutdown(ms->to_dst_file); + qemu_mutex_unlock(&ms->qemu_file_lock); + if (ret) { + error_setg(errp, "Failed to pause source migration"); + } + return; + } + + if (mis->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) { + ret = qemu_file_shutdown(mis->from_src_file); + if (ret) { + error_setg(errp, "Failed to pause destination migration"); + } + return; + } + + error_setg(errp, "migrate-pause is currently only supported " + "during postcopy-active state"); +} + bool migration_is_blocked(Error **errp) { if (qemu_savevm_state_blocked(errp)) { @@ -1402,49 +1550,75 @@ bool migration_is_blocked(Error **errp) return false; } -void qmp_migrate(const char *uri, bool has_blk, bool blk, - bool has_inc, bool inc, bool has_detach, bool detach, - Error **errp) +/* Returns true if continue to migrate, or false if error detected */ +static bool migrate_prepare(MigrationState *s, bool blk, bool blk_inc, + bool resume, Error **errp) { Error *local_err = NULL; - MigrationState *s = migrate_get_current(); - const char *p; + + if (resume) { + if (s->state != MIGRATION_STATUS_POSTCOPY_PAUSED) { + error_setg(errp, "Cannot resume if there is no " + "paused migration"); + return false; + } + /* This is a resume, skip init status */ + return true; + } if (migration_is_setup_or_active(s->state) || s->state == MIGRATION_STATUS_CANCELLING || s->state == MIGRATION_STATUS_COLO) { error_setg(errp, QERR_MIGRATION_ACTIVE); - return; + return false; } + if (runstate_check(RUN_STATE_INMIGRATE)) { error_setg(errp, "Guest is waiting for an incoming migration"); - return; + return false; } if (migration_is_blocked(errp)) { - return; + return false; } - if ((has_blk && blk) || (has_inc && inc)) { + if (blk || blk_inc) { if (migrate_use_block() || migrate_use_block_incremental()) { error_setg(errp, "Command options are incompatible with " "current migration capabilities"); - return; + return false; } migrate_set_block_enabled(true, &local_err); if (local_err) { error_propagate(errp, local_err); - return; + return false; } s->must_remove_block_options = true; } - if (has_inc && inc) { + if (blk_inc) { migrate_set_block_incremental(s, true); } migrate_init(s); + return true; +} + +void qmp_migrate(const char *uri, bool has_blk, bool blk, + bool has_inc, bool inc, bool has_detach, bool detach, + bool has_resume, bool resume, Error **errp) +{ + Error *local_err = NULL; + MigrationState *s = migrate_get_current(); + const char *p; + + if (!migrate_prepare(s, has_blk && blk, has_inc && inc, + has_resume && resume, errp)) { + /* Error detected, put into errp */ + return; + } + if (strstart(uri, "tcp:", &p)) { tcp_start_outgoing_migration(s, p, &local_err); #ifdef CONFIG_RDMA @@ -1739,6 +1913,8 @@ static struct rp_cmd_args { [MIG_RP_MSG_PONG] = { .len = 4, .name = "PONG" }, [MIG_RP_MSG_REQ_PAGES] = { .len = 12, .name = "REQ_PAGES" }, [MIG_RP_MSG_REQ_PAGES_ID] = { .len = -1, .name = "REQ_PAGES_ID" }, + [MIG_RP_MSG_RECV_BITMAP] = { .len = -1, .name = "RECV_BITMAP" }, + [MIG_RP_MSG_RESUME_ACK] = { .len = 4, .name = "RESUME_ACK" }, [MIG_RP_MSG_MAX] = { .len = -1, .name = "MAX" }, }; @@ -1771,6 +1947,51 @@ static void migrate_handle_rp_req_pages(MigrationState *ms, const char* rbname, } } +/* Return true to retry, false to quit */ +static bool postcopy_pause_return_path_thread(MigrationState *s) +{ + trace_postcopy_pause_return_path(); + + qemu_sem_wait(&s->postcopy_pause_rp_sem); + + trace_postcopy_pause_return_path_continued(); + + return true; +} + +static int migrate_handle_rp_recv_bitmap(MigrationState *s, char *block_name) +{ + RAMBlock *block = qemu_ram_block_by_name(block_name); + + if (!block) { + error_report("%s: invalid block name '%s'", __func__, block_name); + return -EINVAL; + } + + /* Fetch the received bitmap and refresh the dirty bitmap */ + return ram_dirty_bitmap_reload(s, block); +} + +static int migrate_handle_rp_resume_ack(MigrationState *s, uint32_t value) +{ + trace_source_return_path_thread_resume_ack(value); + + if (value != MIGRATION_RESUME_ACK_VALUE) { + error_report("%s: illegal resume_ack value %"PRIu32, + __func__, value); + return -1; + } + + /* Now both sides are active. */ + migrate_set_state(&s->state, MIGRATION_STATUS_POSTCOPY_RECOVER, + MIGRATION_STATUS_POSTCOPY_ACTIVE); + + /* Notify send thread that time to continue send pages */ + qemu_sem_post(&s->rp_state.rp_sem); + + return 0; +} + /* * Handles messages sent on the return path towards the source VM * @@ -1787,6 +2008,8 @@ static void *source_return_path_thread(void *opaque) int res; trace_source_return_path_thread_entry(); + +retry: while (!ms->rp_state.error && !qemu_file_get_error(rp) && migration_is_setup_or_active(ms->state)) { trace_source_return_path_thread_loop_top(); @@ -1874,23 +2097,61 @@ static void *source_return_path_thread(void *opaque) migrate_handle_rp_req_pages(ms, (char *)&buf[13], start, len); break; + case MIG_RP_MSG_RECV_BITMAP: + if (header_len < 1) { + error_report("%s: missing block name", __func__); + mark_source_rp_bad(ms); + goto out; + } + /* Format: len (1B) + idstr (<255B). This ends the idstr. */ + buf[buf[0] + 1] = '\0'; + if (migrate_handle_rp_recv_bitmap(ms, (char *)(buf + 1))) { + mark_source_rp_bad(ms); + goto out; + } + break; + + case MIG_RP_MSG_RESUME_ACK: + tmp32 = ldl_be_p(buf); + if (migrate_handle_rp_resume_ack(ms, tmp32)) { + mark_source_rp_bad(ms); + goto out; + } + break; + default: break; } } - if (qemu_file_get_error(rp)) { + +out: + res = qemu_file_get_error(rp); + if (res) { + if (res == -EIO) { + /* + * Maybe there is something we can do: it looks like a + * network down issue, and we pause for a recovery. + */ + if (postcopy_pause_return_path_thread(ms)) { + /* Reload rp, reset the rest */ + rp = ms->rp_state.from_dst_file; + ms->rp_state.error = false; + goto retry; + } + } + trace_source_return_path_thread_bad_end(); mark_source_rp_bad(ms); } trace_source_return_path_thread_end(); -out: ms->rp_state.from_dst_file = NULL; qemu_fclose(rp); return NULL; } -static int open_return_path_on_source(MigrationState *ms) +static int open_return_path_on_source(MigrationState *ms, + bool create_thread) { ms->rp_state.from_dst_file = qemu_file_get_return_path(ms->to_dst_file); @@ -1899,6 +2160,12 @@ static int open_return_path_on_source(MigrationState *ms) } trace_open_return_path_on_source(); + + if (!create_thread) { + /* We're done */ + return 0; + } + qemu_thread_create(&ms->rp_state.rp_thread, "return path", source_return_path_thread, ms, QEMU_THREAD_JOINABLE); @@ -2238,6 +2505,156 @@ bool migrate_colo_enabled(void) return s->enabled_capabilities[MIGRATION_CAPABILITY_X_COLO]; } +typedef enum MigThrError { + /* No error detected */ + MIG_THR_ERR_NONE = 0, + /* Detected error, but resumed successfully */ + MIG_THR_ERR_RECOVERED = 1, + /* Detected fatal error, need to exit */ + MIG_THR_ERR_FATAL = 2, +} MigThrError; + +static int postcopy_resume_handshake(MigrationState *s) +{ + qemu_savevm_send_postcopy_resume(s->to_dst_file); + + while (s->state == MIGRATION_STATUS_POSTCOPY_RECOVER) { + qemu_sem_wait(&s->rp_state.rp_sem); + } + + if (s->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) { + return 0; + } + + return -1; +} + +/* Return zero if success, or <0 for error */ +static int postcopy_do_resume(MigrationState *s) +{ + int ret; + + /* + * Call all the resume_prepare() hooks, so that modules can be + * ready for the migration resume. + */ + ret = qemu_savevm_state_resume_prepare(s); + if (ret) { + error_report("%s: resume_prepare() failure detected: %d", + __func__, ret); + return ret; + } + + /* + * Last handshake with destination on the resume (destination will + * switch to postcopy-active afterwards) + */ + ret = postcopy_resume_handshake(s); + if (ret) { + error_report("%s: handshake failed: %d", __func__, ret); + return ret; + } + + return 0; +} + +/* + * We don't return until we are in a safe state to continue current + * postcopy migration. Returns MIG_THR_ERR_RECOVERED if recovered, or + * MIG_THR_ERR_FATAL if unrecovery failure happened. + */ +static MigThrError postcopy_pause(MigrationState *s) +{ + assert(s->state == MIGRATION_STATUS_POSTCOPY_ACTIVE); + + while (true) { + QEMUFile *file; + + migrate_set_state(&s->state, s->state, + MIGRATION_STATUS_POSTCOPY_PAUSED); + + /* Current channel is possibly broken. Release it. */ + assert(s->to_dst_file); + qemu_mutex_lock(&s->qemu_file_lock); + file = s->to_dst_file; + s->to_dst_file = NULL; + qemu_mutex_unlock(&s->qemu_file_lock); + + qemu_file_shutdown(file); + qemu_fclose(file); + + error_report("Detected IO failure for postcopy. " + "Migration paused."); + + /* + * We wait until things fixed up. Then someone will setup the + * status back for us. + */ + while (s->state == MIGRATION_STATUS_POSTCOPY_PAUSED) { + qemu_sem_wait(&s->postcopy_pause_sem); + } + + if (s->state == MIGRATION_STATUS_POSTCOPY_RECOVER) { + /* Woken up by a recover procedure. Give it a shot */ + + /* + * Firstly, let's wake up the return path now, with a new + * return path channel. + */ + qemu_sem_post(&s->postcopy_pause_rp_sem); + + /* Do the resume logic */ + if (postcopy_do_resume(s) == 0) { + /* Let's continue! */ + trace_postcopy_pause_continued(); + return MIG_THR_ERR_RECOVERED; + } else { + /* + * Something wrong happened during the recovery, let's + * pause again. Pause is always better than throwing + * data away. + */ + continue; + } + } else { + /* This is not right... Time to quit. */ + return MIG_THR_ERR_FATAL; + } + } +} + +static MigThrError migration_detect_error(MigrationState *s) +{ + int ret; + + /* Try to detect any file errors */ + ret = qemu_file_get_error(s->to_dst_file); + + if (!ret) { + /* Everything is fine */ + return MIG_THR_ERR_NONE; + } + + if (s->state == MIGRATION_STATUS_POSTCOPY_ACTIVE && ret == -EIO) { + /* + * For postcopy, we allow the network to be down for a + * while. After that, it can be continued by a + * recovery phase. + */ + return postcopy_pause(s); + } else { + /* + * For precopy (or postcopy with error outside IO), we fail + * with no time. + */ + migrate_set_state(&s->state, s->state, MIGRATION_STATUS_FAILED); + trace_migration_thread_file_err(); + + /* Time to stop the migration, now. */ + return MIG_THR_ERR_FATAL; + } +} + static void migration_calculate_complete(MigrationState *s) { uint64_t bytes = qemu_ftell(s->to_dst_file); @@ -2394,6 +2811,7 @@ static void *migration_thread(void *opaque) { MigrationState *s = opaque; int64_t setup_start = qemu_clock_get_ms(QEMU_CLOCK_HOST); + MigThrError thr_error; rcu_register_thread(); @@ -2443,13 +2861,22 @@ static void *migration_thread(void *opaque) } } - if (qemu_file_get_error(s->to_dst_file)) { - if (migration_is_setup_or_active(s->state)) { - migrate_set_state(&s->state, s->state, - MIGRATION_STATUS_FAILED); - } - trace_migration_thread_file_err(); + /* + * Try to detect any kind of failures, and see whether we + * should stop the migration now. + */ + thr_error = migration_detect_error(s); + if (thr_error == MIG_THR_ERR_FATAL) { + /* Stop migration */ break; + } else if (thr_error == MIG_THR_ERR_RECOVERED) { + /* + * Just recovered from a e.g. network failure, reset all + * the local variables. This is important to avoid + * breaking transferred_bytes and bandwidth calculation + */ + s->iteration_start_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME); + s->iteration_initial_bytes = 0; } current_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME); @@ -2471,6 +2898,9 @@ static void *migration_thread(void *opaque) void migrate_fd_connect(MigrationState *s, Error *error_in) { + int64_t rate_limit; + bool resume = s->state == MIGRATION_STATUS_POSTCOPY_PAUSED; + s->expected_downtime = s->parameters.downtime_limit; s->cleanup_bh = qemu_bh_new(migrate_fd_cleanup, s); if (error_in) { @@ -2479,12 +2909,21 @@ void migrate_fd_connect(MigrationState *s, Error *error_in) return; } - qemu_file_set_blocking(s->to_dst_file, true); - qemu_file_set_rate_limit(s->to_dst_file, - s->parameters.max_bandwidth / XFER_LIMIT_RATIO); + if (resume) { + /* This is a resumed migration */ + rate_limit = INT64_MAX; + } else { + /* This is a fresh new migration */ + rate_limit = s->parameters.max_bandwidth / XFER_LIMIT_RATIO; + s->expected_downtime = s->parameters.downtime_limit; + s->cleanup_bh = qemu_bh_new(migrate_fd_cleanup, s); - /* Notify before starting migration thread */ - notifier_list_notify(&migration_state_notifiers, s); + /* Notify before starting migration thread */ + notifier_list_notify(&migration_state_notifiers, s); + } + + qemu_file_set_rate_limit(s->to_dst_file, rate_limit); + qemu_file_set_blocking(s->to_dst_file, true); /* * Open the return path. For postcopy, it is used exclusively. For @@ -2492,15 +2931,22 @@ void migrate_fd_connect(MigrationState *s, Error *error_in) * QEMU uses the return path. */ if (migrate_postcopy_ram() || migrate_use_return_path()) { - if (open_return_path_on_source(s)) { + if (open_return_path_on_source(s, !resume)) { error_report("Unable to open return-path for postcopy"); - migrate_set_state(&s->state, MIGRATION_STATUS_SETUP, - MIGRATION_STATUS_FAILED); + migrate_set_state(&s->state, s->state, MIGRATION_STATUS_FAILED); migrate_fd_cleanup(s); return; } } + if (resume) { + /* Wakeup the main migration thread to do the recovery */ + migrate_set_state(&s->state, MIGRATION_STATUS_POSTCOPY_PAUSED, + MIGRATION_STATUS_POSTCOPY_RECOVER); + qemu_sem_post(&s->postcopy_pause_sem); + return; + } + if (multifd_save_setup() != 0) { migrate_set_state(&s->state, MIGRATION_STATUS_SETUP, MIGRATION_STATUS_FAILED); @@ -2604,9 +3050,13 @@ static void migration_instance_finalize(Object *obj) MigrationParameters *params = &ms->parameters; qemu_mutex_destroy(&ms->error_mutex); + qemu_mutex_destroy(&ms->qemu_file_lock); g_free(params->tls_hostname); g_free(params->tls_creds); qemu_sem_destroy(&ms->pause_sem); + qemu_sem_destroy(&ms->postcopy_pause_sem); + qemu_sem_destroy(&ms->postcopy_pause_rp_sem); + qemu_sem_destroy(&ms->rp_state.rp_sem); error_free(ms->error); } @@ -2636,6 +3086,11 @@ static void migration_instance_init(Object *obj) params->has_x_multifd_channels = true; params->has_x_multifd_page_count = true; params->has_xbzrle_cache_size = true; + + qemu_sem_init(&ms->postcopy_pause_sem, 0); + qemu_sem_init(&ms->postcopy_pause_rp_sem, 0); + qemu_sem_init(&ms->rp_state.rp_sem, 0); + qemu_mutex_init(&ms->qemu_file_lock); } /* diff --git a/migration/migration.h b/migration/migration.h index 7c69598c54..8f0c82159b 100644 --- a/migration/migration.h +++ b/migration/migration.h @@ -24,6 +24,8 @@ struct PostcopyBlocktimeContext; +#define MIGRATION_RESUME_ACK_VALUE (1) + /* State for the incoming migration */ struct MigrationIncomingState { QEMUFile *from_src_file; @@ -73,6 +75,11 @@ struct MigrationIncomingState { * live migration, to calculate vCPU block time * */ struct PostcopyBlocktimeContext *blocktime_ctx; + + /* notify PAUSED postcopy incoming migrations to try to continue */ + bool postcopy_recover_triggered; + QemuSemaphore postcopy_pause_sem_dst; + QemuSemaphore postcopy_pause_sem_fault; }; MigrationIncomingState *migration_incoming_get_current(void); @@ -107,6 +114,12 @@ struct MigrationState QemuThread thread; QEMUBH *cleanup_bh; QEMUFile *to_dst_file; + /* + * Protects to_dst_file pointer. We need to make sure we won't + * yield or hang during the critical section, since this lock will + * be used in OOB command handler. + */ + QemuMutex qemu_file_lock; /* bytes already send at the beggining of current interation */ uint64_t iteration_initial_bytes; @@ -129,6 +142,7 @@ struct MigrationState QEMUFile *from_dst_file; QemuThread rp_thread; bool error; + QemuSemaphore rp_sem; } rp_state; double mbps; @@ -194,12 +208,17 @@ struct MigrationState bool send_configuration; /* Whether we send section footer during migration */ bool send_section_footer; + + /* Needed by postcopy-pause state */ + QemuSemaphore postcopy_pause_sem; + QemuSemaphore postcopy_pause_rp_sem; }; void migrate_set_state(int *state, int old_state, int new_state); void migration_fd_process_incoming(QEMUFile *f); void migration_ioc_process_incoming(QIOChannel *ioc); +void migration_incoming_process(void); bool migration_has_all_channels(void); @@ -251,6 +270,9 @@ void migrate_send_rp_pong(MigrationIncomingState *mis, uint32_t value); int migrate_send_rp_req_pages(MigrationIncomingState *mis, const char* rbname, ram_addr_t start, size_t len); +void migrate_send_rp_recv_bitmap(MigrationIncomingState *mis, + char *block_name); +void migrate_send_rp_resume_ack(MigrationIncomingState *mis, uint32_t value); void dirty_bitmap_mig_before_vm_start(void); void init_dirty_bitmap_incoming_migration(void); diff --git a/migration/postcopy-ram.c b/migration/postcopy-ram.c index 8ceeaa2a93..658b750a8e 100644 --- a/migration/postcopy-ram.c +++ b/migration/postcopy-ram.c @@ -830,6 +830,17 @@ static void mark_postcopy_blocktime_end(uintptr_t addr) affected_cpu); } +static bool postcopy_pause_fault_thread(MigrationIncomingState *mis) +{ + trace_postcopy_pause_fault_thread(); + + qemu_sem_wait(&mis->postcopy_pause_sem_fault); + + trace_postcopy_pause_fault_thread_continued(); + + return true; +} + /* * Handle faults detected by the USERFAULT markings */ @@ -880,6 +891,22 @@ static void *postcopy_ram_fault_thread(void *opaque) break; } + if (!mis->to_src_file) { + /* + * Possibly someone tells us that the return path is + * broken already using the event. We should hold until + * the channel is rebuilt. + */ + if (postcopy_pause_fault_thread(mis)) { + mis->last_rb = NULL; + /* Continue to read the userfaultfd */ + } else { + error_report("%s: paused but don't allow to continue", + __func__); + break; + } + } + if (pfd[1].revents) { uint64_t tmp64 = 0; @@ -942,18 +969,37 @@ static void *postcopy_ram_fault_thread(void *opaque) (uintptr_t)(msg.arg.pagefault.address), msg.arg.pagefault.feat.ptid, rb); +retry: /* * Send the request to the source - we want to request one * of our host page sizes (which is >= TPS) */ if (rb != mis->last_rb) { mis->last_rb = rb; - migrate_send_rp_req_pages(mis, qemu_ram_get_idstr(rb), - rb_offset, qemu_ram_pagesize(rb)); + ret = migrate_send_rp_req_pages(mis, + qemu_ram_get_idstr(rb), + rb_offset, + qemu_ram_pagesize(rb)); } else { /* Save some space */ - migrate_send_rp_req_pages(mis, NULL, - rb_offset, qemu_ram_pagesize(rb)); + ret = migrate_send_rp_req_pages(mis, + NULL, + rb_offset, + qemu_ram_pagesize(rb)); + } + + if (ret) { + /* May be network failure, try to wait for recovery */ + if (ret == -EIO && postcopy_pause_fault_thread(mis)) { + /* We got reconnected somehow, try to continue */ + mis->last_rb = NULL; + goto retry; + } else { + /* This is a unavoidable fault */ + error_report("%s: migrate_send_rp_req_pages() get %d", + __func__, ret); + break; + } } } diff --git a/migration/ram.c b/migration/ram.c index 912810c18e..5bcbf7a9f9 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -36,6 +36,7 @@ #include "xbzrle.h" #include "ram.h" #include "migration.h" +#include "socket.h" #include "migration/register.h" #include "migration/misc.h" #include "qemu-file.h" @@ -51,6 +52,9 @@ #include "qemu/rcu_queue.h" #include "migration/colo.h" #include "migration/block.h" +#include "sysemu/sysemu.h" +#include "qemu/uuid.h" +#include "savevm.h" /***********************************************************/ /* ram save/restore */ @@ -187,6 +191,70 @@ void ramblock_recv_bitmap_set_range(RAMBlock *rb, void *host_addr, nr); } +#define RAMBLOCK_RECV_BITMAP_ENDING (0x0123456789abcdefULL) + +/* + * Format: bitmap_size (8 bytes) + whole_bitmap (N bytes). + * + * Returns >0 if success with sent bytes, or <0 if error. + */ +int64_t ramblock_recv_bitmap_send(QEMUFile *file, + const char *block_name) +{ + RAMBlock *block = qemu_ram_block_by_name(block_name); + unsigned long *le_bitmap, nbits; + uint64_t size; + + if (!block) { + error_report("%s: invalid block name: %s", __func__, block_name); + return -1; + } + + nbits = block->used_length >> TARGET_PAGE_BITS; + + /* + * Make sure the tmp bitmap buffer is big enough, e.g., on 32bit + * machines we may need 4 more bytes for padding (see below + * comment). So extend it a bit before hand. + */ + le_bitmap = bitmap_new(nbits + BITS_PER_LONG); + + /* + * Always use little endian when sending the bitmap. This is + * required that when source and destination VMs are not using the + * same endianess. (Note: big endian won't work.) + */ + bitmap_to_le(le_bitmap, block->receivedmap, nbits); + + /* Size of the bitmap, in bytes */ + size = nbits / 8; + + /* + * size is always aligned to 8 bytes for 64bit machines, but it + * may not be true for 32bit machines. We need this padding to + * make sure the migration can survive even between 32bit and + * 64bit machines. + */ + size = ROUND_UP(size, 8); + + qemu_put_be64(file, size); + qemu_put_buffer(file, (const uint8_t *)le_bitmap, size); + /* + * Mark as an end, in case the middle part is screwed up due to + * some "misterious" reason. + */ + qemu_put_be64(file, RAMBLOCK_RECV_BITMAP_ENDING); + qemu_fflush(file); + + free(le_bitmap); + + if (qemu_file_get_error(file)) { + return qemu_file_get_error(file); + } + + return size + sizeof(size); +} + /* * An outstanding page request, on the source, having been received * and queued @@ -432,15 +500,117 @@ exit: /* Multiple fd's */ -struct MultiFDSendParams { +#define MULTIFD_MAGIC 0x11223344U +#define MULTIFD_VERSION 1 + +typedef struct { + uint32_t magic; + uint32_t version; + unsigned char uuid[16]; /* QemuUUID */ + uint8_t id; +} __attribute__((packed)) MultiFDInit_t; + +typedef struct { + /* this fields are not changed once the thread is created */ + /* channel number */ uint8_t id; + /* channel thread name */ char *name; + /* channel thread id */ QemuThread thread; + /* communication channel */ + QIOChannel *c; + /* sem where to wait for more work */ QemuSemaphore sem; + /* this mutex protects the following parameters */ QemuMutex mutex; + /* is this channel thread running */ + bool running; + /* should this thread finish */ bool quit; -}; -typedef struct MultiFDSendParams MultiFDSendParams; +} MultiFDSendParams; + +typedef struct { + /* this fields are not changed once the thread is created */ + /* channel number */ + uint8_t id; + /* channel thread name */ + char *name; + /* channel thread id */ + QemuThread thread; + /* communication channel */ + QIOChannel *c; + /* sem where to wait for more work */ + QemuSemaphore sem; + /* this mutex protects the following parameters */ + QemuMutex mutex; + /* is this channel thread running */ + bool running; + /* should this thread finish */ + bool quit; +} MultiFDRecvParams; + +static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) +{ + MultiFDInit_t msg; + int ret; + + msg.magic = cpu_to_be32(MULTIFD_MAGIC); + msg.version = cpu_to_be32(MULTIFD_VERSION); + msg.id = p->id; + memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid)); + + ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp); + if (ret != 0) { + return -1; + } + return 0; +} + +static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) +{ + MultiFDInit_t msg; + int ret; + + ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp); + if (ret != 0) { + return -1; + } + + be32_to_cpus(&msg.magic); + be32_to_cpus(&msg.version); + + if (msg.magic != MULTIFD_MAGIC) { + error_setg(errp, "multifd: received packet magic %x " + "expected %x", msg.magic, MULTIFD_MAGIC); + return -1; + } + + if (msg.version != MULTIFD_VERSION) { + error_setg(errp, "multifd: received packet version %d " + "expected %d", msg.version, MULTIFD_VERSION); + return -1; + } + + if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) { + char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid); + char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid); + + error_setg(errp, "multifd: received uuid '%s' and expected " + "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id); + g_free(uuid); + g_free(msg_uuid); + return -1; + } + + if (msg.id > migrate_multifd_channels()) { + error_setg(errp, "multifd: received channel version %d " + "expected %d", msg.version, MULTIFD_VERSION); + return -1; + } + + return msg.id; +} struct { MultiFDSendParams *params; @@ -448,11 +618,23 @@ struct { int count; } *multifd_send_state; -static void terminate_multifd_send_threads(Error *errp) +static void multifd_send_terminate_threads(Error *err) { int i; - for (i = 0; i < multifd_send_state->count; i++) { + if (err) { + MigrationState *s = migrate_get_current(); + migrate_set_error(s, err); + if (s->state == MIGRATION_STATUS_SETUP || + s->state == MIGRATION_STATUS_PRE_SWITCHOVER || + s->state == MIGRATION_STATUS_DEVICE || + s->state == MIGRATION_STATUS_ACTIVE) { + migrate_set_state(&s->state, s->state, + MIGRATION_STATUS_FAILED); + } + } + + for (i = 0; i < migrate_multifd_channels(); i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; qemu_mutex_lock(&p->mutex); @@ -470,11 +652,15 @@ int multifd_save_cleanup(Error **errp) if (!migrate_use_multifd()) { return 0; } - terminate_multifd_send_threads(NULL); - for (i = 0; i < multifd_send_state->count; i++) { + multifd_send_terminate_threads(NULL); + for (i = 0; i < migrate_multifd_channels(); i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; - qemu_thread_join(&p->thread); + if (p->running) { + qemu_thread_join(&p->thread); + } + socket_send_channel_destroy(p->c); + p->c = NULL; qemu_mutex_destroy(&p->mutex); qemu_sem_destroy(&p->sem); g_free(p->name); @@ -490,6 +676,11 @@ int multifd_save_cleanup(Error **errp) static void *multifd_send_thread(void *opaque) { MultiFDSendParams *p = opaque; + Error *local_err = NULL; + + if (multifd_send_initial_packet(p, &local_err) < 0) { + goto out; + } while (true) { qemu_mutex_lock(&p->mutex); @@ -501,9 +692,39 @@ static void *multifd_send_thread(void *opaque) qemu_sem_wait(&p->sem); } +out: + if (local_err) { + multifd_send_terminate_threads(local_err); + } + + qemu_mutex_lock(&p->mutex); + p->running = false; + qemu_mutex_unlock(&p->mutex); + return NULL; } +static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) +{ + MultiFDSendParams *p = opaque; + QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task)); + Error *local_err = NULL; + + if (qio_task_propagate_error(task, &local_err)) { + if (multifd_save_cleanup(&local_err) != 0) { + migrate_set_error(migrate_get_current(), local_err); + } + } else { + p->c = QIO_CHANNEL(sioc); + qio_channel_set_delay(p->c, false); + p->running = true; + qemu_thread_create(&p->thread, p->name, multifd_send_thread, p, + QEMU_THREAD_JOINABLE); + + atomic_inc(&multifd_send_state->count); + } +} + int multifd_save_setup(void) { int thread_count; @@ -515,7 +736,7 @@ int multifd_save_setup(void) thread_count = migrate_multifd_channels(); multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); - multifd_send_state->count = 0; + atomic_set(&multifd_send_state->count, 0); for (i = 0; i < thread_count; i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; @@ -524,35 +745,32 @@ int multifd_save_setup(void) p->quit = false; p->id = i; p->name = g_strdup_printf("multifdsend_%d", i); - qemu_thread_create(&p->thread, p->name, multifd_send_thread, p, - QEMU_THREAD_JOINABLE); - - multifd_send_state->count++; + socket_send_channel_create(multifd_new_send_channel_async, p); } return 0; } -struct MultiFDRecvParams { - uint8_t id; - char *name; - QemuThread thread; - QemuSemaphore sem; - QemuMutex mutex; - bool quit; -}; -typedef struct MultiFDRecvParams MultiFDRecvParams; - struct { MultiFDRecvParams *params; /* number of created threads */ int count; } *multifd_recv_state; -static void terminate_multifd_recv_threads(Error *errp) +static void multifd_recv_terminate_threads(Error *err) { int i; - for (i = 0; i < multifd_recv_state->count; i++) { + if (err) { + MigrationState *s = migrate_get_current(); + migrate_set_error(s, err); + if (s->state == MIGRATION_STATUS_SETUP || + s->state == MIGRATION_STATUS_ACTIVE) { + migrate_set_state(&s->state, s->state, + MIGRATION_STATUS_FAILED); + } + } + + for (i = 0; i < migrate_multifd_channels(); i++) { MultiFDRecvParams *p = &multifd_recv_state->params[i]; qemu_mutex_lock(&p->mutex); @@ -570,11 +788,15 @@ int multifd_load_cleanup(Error **errp) if (!migrate_use_multifd()) { return 0; } - terminate_multifd_recv_threads(NULL); - for (i = 0; i < multifd_recv_state->count; i++) { + multifd_recv_terminate_threads(NULL); + for (i = 0; i < migrate_multifd_channels(); i++) { MultiFDRecvParams *p = &multifd_recv_state->params[i]; - qemu_thread_join(&p->thread); + if (p->running) { + qemu_thread_join(&p->thread); + } + object_unref(OBJECT(p->c)); + p->c = NULL; qemu_mutex_destroy(&p->mutex); qemu_sem_destroy(&p->sem); g_free(p->name); @@ -602,6 +824,10 @@ static void *multifd_recv_thread(void *opaque) qemu_sem_wait(&p->sem); } + qemu_mutex_lock(&p->mutex); + p->running = false; + qemu_mutex_unlock(&p->mutex); + return NULL; } @@ -616,7 +842,7 @@ int multifd_load_setup(void) thread_count = migrate_multifd_channels(); multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); - multifd_recv_state->count = 0; + atomic_set(&multifd_recv_state->count, 0); for (i = 0; i < thread_count; i++) { MultiFDRecvParams *p = &multifd_recv_state->params[i]; @@ -625,13 +851,52 @@ int multifd_load_setup(void) p->quit = false; p->id = i; p->name = g_strdup_printf("multifdrecv_%d", i); - qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, - QEMU_THREAD_JOINABLE); - multifd_recv_state->count++; } return 0; } +bool multifd_recv_all_channels_created(void) +{ + int thread_count = migrate_multifd_channels(); + + if (!migrate_use_multifd()) { + return true; + } + + return thread_count == atomic_read(&multifd_recv_state->count); +} + +void multifd_recv_new_channel(QIOChannel *ioc) +{ + MultiFDRecvParams *p; + Error *local_err = NULL; + int id; + + id = multifd_recv_initial_packet(ioc, &local_err); + if (id < 0) { + multifd_recv_terminate_threads(local_err); + return; + } + + p = &multifd_recv_state->params[id]; + if (p->c != NULL) { + error_setg(&local_err, "multifd: received id '%d' already setup'", + id); + multifd_recv_terminate_threads(local_err); + return; + } + p->c = ioc; + object_ref(OBJECT(ioc)); + + p->running = true; + qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, + QEMU_THREAD_JOINABLE); + atomic_inc(&multifd_recv_state->count); + if (multifd_recv_state->count == migrate_multifd_channels()) { + migration_incoming_process(); + } +} + /** * save_page_header: write page header to wire * @@ -1490,7 +1755,7 @@ static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss, * CPU resource. */ if (block == rs->last_sent_block && save_page_use_compression(rs)) { - res = compress_page_with_multi_thread(rs, block, offset); + return compress_page_with_multi_thread(rs, block, offset); } return ram_save_page(rs, pss, last_stage); @@ -2226,6 +2491,41 @@ static int ram_init_all(RAMState **rsp) return 0; } +static void ram_state_resume_prepare(RAMState *rs, QEMUFile *out) +{ + RAMBlock *block; + uint64_t pages = 0; + + /* + * Postcopy is not using xbzrle/compression, so no need for that. + * Also, since source are already halted, we don't need to care + * about dirty page logging as well. + */ + + RAMBLOCK_FOREACH(block) { + pages += bitmap_count_one(block->bmap, + block->used_length >> TARGET_PAGE_BITS); + } + + /* This may not be aligned with current bitmaps. Recalculate. */ + rs->migration_dirty_pages = pages; + + rs->last_seen_block = NULL; + rs->last_sent_block = NULL; + rs->last_page = 0; + rs->last_version = ram_list.version; + /* + * Disable the bulk stage, otherwise we'll resend the whole RAM no + * matter what we have sent. + */ + rs->ram_bulk_stage = false; + + /* Update RAMState cache of output QEMUFile */ + rs->f = out; + + trace_ram_state_resume_prepare(pages); +} + /* * Each of ram_save_setup, ram_save_iterate and ram_save_complete has * long-running RCU critical section. When rcu-reclaims in the code @@ -3100,6 +3400,139 @@ static bool ram_has_postcopy(void *opaque) return migrate_postcopy_ram(); } +/* Sync all the dirty bitmap with destination VM. */ +static int ram_dirty_bitmap_sync_all(MigrationState *s, RAMState *rs) +{ + RAMBlock *block; + QEMUFile *file = s->to_dst_file; + int ramblock_count = 0; + + trace_ram_dirty_bitmap_sync_start(); + + RAMBLOCK_FOREACH(block) { + qemu_savevm_send_recv_bitmap(file, block->idstr); + trace_ram_dirty_bitmap_request(block->idstr); + ramblock_count++; + } + + trace_ram_dirty_bitmap_sync_wait(); + + /* Wait until all the ramblocks' dirty bitmap synced */ + while (ramblock_count--) { + qemu_sem_wait(&s->rp_state.rp_sem); + } + + trace_ram_dirty_bitmap_sync_complete(); + + return 0; +} + +static void ram_dirty_bitmap_reload_notify(MigrationState *s) +{ + qemu_sem_post(&s->rp_state.rp_sem); +} + +/* + * Read the received bitmap, revert it as the initial dirty bitmap. + * This is only used when the postcopy migration is paused but wants + * to resume from a middle point. + */ +int ram_dirty_bitmap_reload(MigrationState *s, RAMBlock *block) +{ + int ret = -EINVAL; + QEMUFile *file = s->rp_state.from_dst_file; + unsigned long *le_bitmap, nbits = block->used_length >> TARGET_PAGE_BITS; + uint64_t local_size = nbits / 8; + uint64_t size, end_mark; + + trace_ram_dirty_bitmap_reload_begin(block->idstr); + + if (s->state != MIGRATION_STATUS_POSTCOPY_RECOVER) { + error_report("%s: incorrect state %s", __func__, + MigrationStatus_str(s->state)); + return -EINVAL; + } + + /* + * Note: see comments in ramblock_recv_bitmap_send() on why we + * need the endianess convertion, and the paddings. + */ + local_size = ROUND_UP(local_size, 8); + + /* Add paddings */ + le_bitmap = bitmap_new(nbits + BITS_PER_LONG); + + size = qemu_get_be64(file); + + /* The size of the bitmap should match with our ramblock */ + if (size != local_size) { + error_report("%s: ramblock '%s' bitmap size mismatch " + "(0x%"PRIx64" != 0x%"PRIx64")", __func__, + block->idstr, size, local_size); + ret = -EINVAL; + goto out; + } + + size = qemu_get_buffer(file, (uint8_t *)le_bitmap, local_size); + end_mark = qemu_get_be64(file); + + ret = qemu_file_get_error(file); + if (ret || size != local_size) { + error_report("%s: read bitmap failed for ramblock '%s': %d" + " (size 0x%"PRIx64", got: 0x%"PRIx64")", + __func__, block->idstr, ret, local_size, size); + ret = -EIO; + goto out; + } + + if (end_mark != RAMBLOCK_RECV_BITMAP_ENDING) { + error_report("%s: ramblock '%s' end mark incorrect: 0x%"PRIu64, + __func__, block->idstr, end_mark); + ret = -EINVAL; + goto out; + } + + /* + * Endianess convertion. We are during postcopy (though paused). + * The dirty bitmap won't change. We can directly modify it. + */ + bitmap_from_le(block->bmap, le_bitmap, nbits); + + /* + * What we received is "received bitmap". Revert it as the initial + * dirty bitmap for this ramblock. + */ + bitmap_complement(block->bmap, block->bmap, nbits); + + trace_ram_dirty_bitmap_reload_complete(block->idstr); + + /* + * We succeeded to sync bitmap for current ramblock. If this is + * the last one to sync, we need to notify the main send thread. + */ + ram_dirty_bitmap_reload_notify(s); + + ret = 0; +out: + free(le_bitmap); + return ret; +} + +static int ram_resume_prepare(MigrationState *s, void *opaque) +{ + RAMState *rs = *(RAMState **)opaque; + int ret; + + ret = ram_dirty_bitmap_sync_all(s, rs); + if (ret) { + return ret; + } + + ram_state_resume_prepare(rs, s->to_dst_file); + + return 0; +} + static SaveVMHandlers savevm_ram_handlers = { .save_setup = ram_save_setup, .save_live_iterate = ram_save_iterate, @@ -3111,6 +3544,7 @@ static SaveVMHandlers savevm_ram_handlers = { .save_cleanup = ram_save_cleanup, .load_setup = ram_load_setup, .load_cleanup = ram_load_cleanup, + .resume_prepare = ram_resume_prepare, }; void ram_mig_init(void) diff --git a/migration/ram.h b/migration/ram.h index 5030be110a..d386f4d641 100644 --- a/migration/ram.h +++ b/migration/ram.h @@ -32,6 +32,7 @@ #include "qemu-common.h" #include "qapi/qapi-types-migration.h" #include "exec/cpu-common.h" +#include "io/channel.h" extern MigrationStats ram_counters; extern XBZRLECacheStats xbzrle_counters; @@ -44,6 +45,8 @@ int multifd_save_setup(void); int multifd_save_cleanup(Error **errp); int multifd_load_setup(void); int multifd_load_cleanup(Error **errp); +bool multifd_recv_all_channels_created(void); +void multifd_recv_new_channel(QIOChannel *ioc); uint64_t ram_pagesize_summary(void); int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len); @@ -63,5 +66,8 @@ int ramblock_recv_bitmap_test(RAMBlock *rb, void *host_addr); bool ramblock_recv_bitmap_test_byte_offset(RAMBlock *rb, uint64_t byte_offset); void ramblock_recv_bitmap_set(RAMBlock *rb, void *host_addr); void ramblock_recv_bitmap_set_range(RAMBlock *rb, void *host_addr, size_t nr); +int64_t ramblock_recv_bitmap_send(QEMUFile *file, + const char *block_name); +int ram_dirty_bitmap_reload(MigrationState *s, RAMBlock *rb); #endif diff --git a/migration/rdma.c b/migration/rdma.c index da474fc19f..7d233b0820 100644 --- a/migration/rdma.c +++ b/migration/rdma.c @@ -708,6 +708,9 @@ static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block) memcpy(local->block + block->index, old + (block->index + 1), sizeof(RDMALocalBlock) * (local->nb_blocks - (block->index + 1))); + for (x = block->index; x < local->nb_blocks - 1; x++) { + local->block[x].index--; + } } } else { assert(block == local->block); @@ -3246,6 +3249,10 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque) qsort(rdma->local_ram_blocks.block, rdma->local_ram_blocks.nb_blocks, sizeof(RDMALocalBlock), dest_ram_sort_func); + for (i = 0; i < local->nb_blocks; i++) { + local->block[i].index = i; + } + if (rdma->pin_all) { ret = qemu_rdma_reg_whole_ram_blocks(rdma); if (ret) { diff --git a/migration/savevm.c b/migration/savevm.c index e2be02afe4..4251125831 100644 --- a/migration/savevm.c +++ b/migration/savevm.c @@ -80,7 +80,9 @@ enum qemu_vm_cmd { MIG_CMD_POSTCOPY_RAM_DISCARD, /* A list of pages to discard that were previously sent during precopy but are dirty. */ + MIG_CMD_POSTCOPY_RESUME, /* resume postcopy on dest */ MIG_CMD_PACKAGED, /* Send a wrapped stream within this stream */ + MIG_CMD_RECV_BITMAP, /* Request for recved bitmap on dst */ MIG_CMD_MAX }; @@ -97,7 +99,9 @@ static struct mig_cmd_args { [MIG_CMD_POSTCOPY_RUN] = { .len = 0, .name = "POSTCOPY_RUN" }, [MIG_CMD_POSTCOPY_RAM_DISCARD] = { .len = -1, .name = "POSTCOPY_RAM_DISCARD" }, + [MIG_CMD_POSTCOPY_RESUME] = { .len = 0, .name = "POSTCOPY_RESUME" }, [MIG_CMD_PACKAGED] = { .len = 4, .name = "PACKAGED" }, + [MIG_CMD_RECV_BITMAP] = { .len = -1, .name = "RECV_BITMAP" }, [MIG_CMD_MAX] = { .len = -1, .name = "MAX" }, }; @@ -956,6 +960,25 @@ void qemu_savevm_send_postcopy_run(QEMUFile *f) qemu_savevm_command_send(f, MIG_CMD_POSTCOPY_RUN, 0, NULL); } +void qemu_savevm_send_postcopy_resume(QEMUFile *f) +{ + trace_savevm_send_postcopy_resume(); + qemu_savevm_command_send(f, MIG_CMD_POSTCOPY_RESUME, 0, NULL); +} + +void qemu_savevm_send_recv_bitmap(QEMUFile *f, char *block_name) +{ + size_t len; + char buf[256]; + + trace_savevm_send_recv_bitmap(block_name); + + buf[0] = len = strlen(block_name); + memcpy(buf + 1, block_name, len); + + qemu_savevm_command_send(f, MIG_CMD_RECV_BITMAP, len + 1, (uint8_t *)buf); +} + bool qemu_savevm_state_blocked(Error **errp) { SaveStateEntry *se; @@ -1008,6 +1031,31 @@ void qemu_savevm_state_setup(QEMUFile *f) } } +int qemu_savevm_state_resume_prepare(MigrationState *s) +{ + SaveStateEntry *se; + int ret; + + trace_savevm_state_resume_prepare(); + + QTAILQ_FOREACH(se, &savevm_state.handlers, entry) { + if (!se->ops || !se->ops->resume_prepare) { + continue; + } + if (se->ops && se->ops->is_active) { + if (!se->ops->is_active(se->opaque)) { + continue; + } + } + ret = se->ops->resume_prepare(s, se->opaque); + if (ret < 0) { + return ret; + } + } + + return 0; +} + /* * this function has three return values: * negative: there was one error, and we have -errno. @@ -1564,8 +1612,8 @@ static int loadvm_postcopy_ram_handle_discard(MigrationIncomingState *mis, */ static void *postcopy_ram_listen_thread(void *opaque) { - QEMUFile *f = opaque; MigrationIncomingState *mis = migration_incoming_get_current(); + QEMUFile *f = mis->from_src_file; int load_res; migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE, @@ -1579,6 +1627,14 @@ static void *postcopy_ram_listen_thread(void *opaque) */ qemu_file_set_blocking(f, true); load_res = qemu_loadvm_state_main(f, mis); + + /* + * This is tricky, but, mis->from_src_file can change after it + * returns, when postcopy recovery happened. In the future, we may + * want a wrapper for the QEMUFile handle. + */ + f = mis->from_src_file; + /* And non-blocking again so we don't block in any cleanup */ qemu_file_set_blocking(f, false); @@ -1668,7 +1724,7 @@ static int loadvm_postcopy_handle_listen(MigrationIncomingState *mis) /* Start up the listening thread and wait for it to signal ready */ qemu_sem_init(&mis->listen_thread_sem, 0); qemu_thread_create(&mis->listen_thread, "postcopy/listen", - postcopy_ram_listen_thread, mis->from_src_file, + postcopy_ram_listen_thread, NULL, QEMU_THREAD_DETACHED); qemu_sem_wait(&mis->listen_thread_sem); qemu_sem_destroy(&mis->listen_thread_sem); @@ -1745,6 +1801,31 @@ static int loadvm_postcopy_handle_run(MigrationIncomingState *mis) return LOADVM_QUIT; } +static int loadvm_postcopy_handle_resume(MigrationIncomingState *mis) +{ + if (mis->state != MIGRATION_STATUS_POSTCOPY_RECOVER) { + error_report("%s: illegal resume received", __func__); + /* Don't fail the load, only for this. */ + return 0; + } + + /* + * This means source VM is ready to resume the postcopy migration. + * It's time to switch state and release the fault thread to + * continue service page faults. + */ + migrate_set_state(&mis->state, MIGRATION_STATUS_POSTCOPY_RECOVER, + MIGRATION_STATUS_POSTCOPY_ACTIVE); + qemu_sem_post(&mis->postcopy_pause_sem_fault); + + trace_loadvm_postcopy_handle_resume(); + + /* Tell source that "we are ready" */ + migrate_send_rp_resume_ack(mis, MIGRATION_RESUME_ACK_VALUE); + + return 0; +} + /** * Immediately following this command is a blob of data containing an embedded * chunk of migration stream; read it and load it. @@ -1794,6 +1875,49 @@ static int loadvm_handle_cmd_packaged(MigrationIncomingState *mis) } /* + * Handle request that source requests for recved_bitmap on + * destination. Payload format: + * + * len (1 byte) + ramblock_name (<255 bytes) + */ +static int loadvm_handle_recv_bitmap(MigrationIncomingState *mis, + uint16_t len) +{ + QEMUFile *file = mis->from_src_file; + RAMBlock *rb; + char block_name[256]; + size_t cnt; + + cnt = qemu_get_counted_string(file, block_name); + if (!cnt) { + error_report("%s: failed to read block name", __func__); + return -EINVAL; + } + + /* Validate before using the data */ + if (qemu_file_get_error(file)) { + return qemu_file_get_error(file); + } + + if (len != cnt + 1) { + error_report("%s: invalid payload length (%d)", __func__, len); + return -EINVAL; + } + + rb = qemu_ram_block_by_name(block_name); + if (!rb) { + error_report("%s: block '%s' not found", __func__, block_name); + return -EINVAL; + } + + migrate_send_rp_recv_bitmap(mis, block_name); + + trace_loadvm_handle_recv_bitmap(block_name); + + return 0; +} + +/* * Process an incoming 'QEMU_VM_COMMAND' * 0 just a normal return * LOADVM_QUIT All good, but exit the loop @@ -1866,6 +1990,12 @@ static int loadvm_process_command(QEMUFile *f) case MIG_CMD_POSTCOPY_RAM_DISCARD: return loadvm_postcopy_ram_handle_discard(mis, len); + + case MIG_CMD_POSTCOPY_RESUME: + return loadvm_postcopy_handle_resume(mis); + + case MIG_CMD_RECV_BITMAP: + return loadvm_handle_recv_bitmap(mis, len); } return 0; @@ -2055,11 +2185,50 @@ void qemu_loadvm_state_cleanup(void) } } +/* Return true if we should continue the migration, or false. */ +static bool postcopy_pause_incoming(MigrationIncomingState *mis) +{ + trace_postcopy_pause_incoming(); + + /* Clear the triggered bit to allow one recovery */ + mis->postcopy_recover_triggered = false; + + migrate_set_state(&mis->state, MIGRATION_STATUS_POSTCOPY_ACTIVE, + MIGRATION_STATUS_POSTCOPY_PAUSED); + + assert(mis->from_src_file); + qemu_file_shutdown(mis->from_src_file); + qemu_fclose(mis->from_src_file); + mis->from_src_file = NULL; + + assert(mis->to_src_file); + qemu_file_shutdown(mis->to_src_file); + qemu_mutex_lock(&mis->rp_mutex); + qemu_fclose(mis->to_src_file); + mis->to_src_file = NULL; + qemu_mutex_unlock(&mis->rp_mutex); + + /* Notify the fault thread for the invalidated file handle */ + postcopy_fault_thread_notify(mis); + + error_report("Detected IO failure for postcopy. " + "Migration paused."); + + while (mis->state == MIGRATION_STATUS_POSTCOPY_PAUSED) { + qemu_sem_wait(&mis->postcopy_pause_sem_dst); + } + + trace_postcopy_pause_incoming_continued(); + + return true; +} + static int qemu_loadvm_state_main(QEMUFile *f, MigrationIncomingState *mis) { uint8_t section_type; int ret = 0; +retry: while (true) { section_type = qemu_get_byte(f); @@ -2104,6 +2273,24 @@ static int qemu_loadvm_state_main(QEMUFile *f, MigrationIncomingState *mis) out: if (ret < 0) { qemu_file_set_error(f, ret); + + /* + * Detect whether it is: + * + * 1. postcopy running (after receiving all device data, which + * must be in POSTCOPY_INCOMING_RUNNING state. Note that + * POSTCOPY_INCOMING_LISTENING is still not enough, it's + * still receiving device states). + * 2. network failure (-EIO) + * + * If so, we try to wait for a recovery. + */ + if (postcopy_state_get() == POSTCOPY_INCOMING_RUNNING && + ret == -EIO && postcopy_pause_incoming(mis)) { + /* Reset f to point to the newly created channel */ + f = mis->from_src_file; + goto retry; + } } return ret; } diff --git a/migration/savevm.h b/migration/savevm.h index cf4f0d37ca..a5e65b8ae3 100644 --- a/migration/savevm.h +++ b/migration/savevm.h @@ -31,6 +31,7 @@ bool qemu_savevm_state_blocked(Error **errp); void qemu_savevm_state_setup(QEMUFile *f); +int qemu_savevm_state_resume_prepare(MigrationState *s); void qemu_savevm_state_header(QEMUFile *f); int qemu_savevm_state_iterate(QEMUFile *f, bool postcopy); void qemu_savevm_state_cleanup(void); @@ -47,6 +48,8 @@ int qemu_savevm_send_packaged(QEMUFile *f, const uint8_t *buf, size_t len); void qemu_savevm_send_postcopy_advise(QEMUFile *f); void qemu_savevm_send_postcopy_listen(QEMUFile *f); void qemu_savevm_send_postcopy_run(QEMUFile *f); +void qemu_savevm_send_postcopy_resume(QEMUFile *f); +void qemu_savevm_send_recv_bitmap(QEMUFile *f, char *block_name); void qemu_savevm_send_postcopy_ram_discard(QEMUFile *f, const char *name, uint16_t len, diff --git a/migration/socket.c b/migration/socket.c index 122d8ccfbe..3456eb76e9 100644 --- a/migration/socket.c +++ b/migration/socket.c @@ -28,6 +28,28 @@ #include "trace.h" +struct SocketOutgoingArgs { + SocketAddress *saddr; +} outgoing_args; + +void socket_send_channel_create(QIOTaskFunc f, void *data) +{ + QIOChannelSocket *sioc = qio_channel_socket_new(); + qio_channel_socket_connect_async(sioc, outgoing_args.saddr, + f, data, NULL, NULL); +} + +int socket_send_channel_destroy(QIOChannel *send) +{ + /* Remove channel */ + object_unref(OBJECT(send)); + if (outgoing_args.saddr) { + qapi_free_SocketAddress(outgoing_args.saddr); + outgoing_args.saddr = NULL; + } + return 0; +} + static SocketAddress *tcp_build_address(const char *host_port, Error **errp) { SocketAddress *saddr; @@ -95,6 +117,11 @@ static void socket_start_outgoing_migration(MigrationState *s, struct SocketConnectData *data = g_new0(struct SocketConnectData, 1); data->s = s; + + /* in case previous migration leaked it */ + qapi_free_SocketAddress(outgoing_args.saddr); + outgoing_args.saddr = saddr; + if (saddr->type == SOCKET_ADDRESS_TYPE_INET) { data->hostname = g_strdup(saddr->u.inet.host); } @@ -106,7 +133,6 @@ static void socket_start_outgoing_migration(MigrationState *s, data, socket_connect_data_free, NULL); - qapi_free_SocketAddress(saddr); } void tcp_start_outgoing_migration(MigrationState *s, @@ -144,6 +170,10 @@ static void socket_accept_incoming_migration(QIONetListener *listener, qio_net_listener_disconnect(listener); object_unref(OBJECT(listener)); + + if (!migrate_use_multifd()) { + migration_incoming_process(); + } } } @@ -160,9 +190,10 @@ static void socket_start_incoming_migration(SocketAddress *saddr, return; } - qio_net_listener_set_client_func(listener, - socket_accept_incoming_migration, - NULL, NULL); + qio_net_listener_set_client_func_full(listener, + socket_accept_incoming_migration, + NULL, NULL, + g_main_context_get_thread_default()); } void tcp_start_incoming_migration(const char *host_port, Error **errp) diff --git a/migration/socket.h b/migration/socket.h index 6b91e9db38..528c3b0202 100644 --- a/migration/socket.h +++ b/migration/socket.h @@ -16,6 +16,13 @@ #ifndef QEMU_MIGRATION_SOCKET_H #define QEMU_MIGRATION_SOCKET_H + +#include "io/channel.h" +#include "io/task.h" + +void socket_send_channel_create(QIOTaskFunc f, void *data); +int socket_send_channel_destroy(QIOChannel *send); + void tcp_start_incoming_migration(const char *host_port, Error **errp); void tcp_start_outgoing_migration(MigrationState *s, const char *host_port, diff --git a/migration/trace-events b/migration/trace-events index d6be74b7a7..3c798ddd11 100644 --- a/migration/trace-events +++ b/migration/trace-events @@ -12,11 +12,13 @@ loadvm_state_cleanup(void) "" loadvm_handle_cmd_packaged(unsigned int length) "%u" loadvm_handle_cmd_packaged_main(int ret) "%d" loadvm_handle_cmd_packaged_received(int ret) "%d" +loadvm_handle_recv_bitmap(char *s) "%s" loadvm_postcopy_handle_advise(void) "" loadvm_postcopy_handle_listen(void) "" loadvm_postcopy_handle_run(void) "" loadvm_postcopy_handle_run_cpu_sync(void) "" loadvm_postcopy_handle_run_vmstart(void) "" +loadvm_postcopy_handle_resume(void) "" loadvm_postcopy_ram_handle_discard(void) "" loadvm_postcopy_ram_handle_discard_end(void) "" loadvm_postcopy_ram_handle_discard_header(const char *ramid, uint16_t len) "%s: %ud" @@ -34,7 +36,10 @@ savevm_send_open_return_path(void) "" savevm_send_ping(uint32_t val) "0x%x" savevm_send_postcopy_listen(void) "" savevm_send_postcopy_run(void) "" +savevm_send_postcopy_resume(void) "" +savevm_send_recv_bitmap(char *name) "%s" savevm_state_setup(void) "" +savevm_state_resume_prepare(void) "" savevm_state_header(void) "" savevm_state_iterate(void) "" savevm_state_cleanup(void) "" @@ -77,6 +82,13 @@ ram_load_postcopy_loop(uint64_t addr, int flags) "@%" PRIx64 " %x" ram_postcopy_send_discard_bitmap(void) "" ram_save_page(const char *rbname, uint64_t offset, void *host) "%s: offset: 0x%" PRIx64 " host: %p" ram_save_queue_pages(const char *rbname, size_t start, size_t len) "%s: start: 0x%zx len: 0x%zx" +ram_dirty_bitmap_request(char *str) "%s" +ram_dirty_bitmap_reload_begin(char *str) "%s" +ram_dirty_bitmap_reload_complete(char *str) "%s" +ram_dirty_bitmap_sync_start(void) "" +ram_dirty_bitmap_sync_wait(void) "" +ram_dirty_bitmap_sync_complete(void) "" +ram_state_resume_prepare(uint64_t v) "%" PRId64 # migration/migration.c await_return_path_close_on_source_close(void) "" @@ -88,6 +100,7 @@ migrate_fd_cancel(void) "" migrate_handle_rp_req_pages(const char *rbname, size_t start, size_t len) "in %s at 0x%zx len 0x%zx" migrate_pending(uint64_t size, uint64_t max, uint64_t pre, uint64_t compat, uint64_t post) "pending size %" PRIu64 " max %" PRIu64 " (pre = %" PRIu64 " compat=%" PRIu64 " post=%" PRIu64 ")" migrate_send_rp_message(int msg_type, uint16_t len) "%d: len %d" +migrate_send_rp_recv_bitmap(char *name, int64_t size) "block '%s' size 0x%"PRIi64 migration_completion_file_err(void) "" migration_completion_postcopy_end(void) "" migration_completion_postcopy_end_after_complete(void) "" @@ -99,6 +112,13 @@ migration_thread_setup_complete(void) "" open_return_path_on_source(void) "" open_return_path_on_source_continue(void) "" postcopy_start(void) "" +postcopy_pause_return_path(void) "" +postcopy_pause_return_path_continued(void) "" +postcopy_pause_fault_thread(void) "" +postcopy_pause_fault_thread_continued(void) "" +postcopy_pause_continued(void) "" +postcopy_pause_incoming(void) "" +postcopy_pause_incoming_continued(void) "" postcopy_start_set_run(void) "" source_return_path_thread_bad_end(void) "" source_return_path_thread_end(void) "" @@ -106,6 +126,7 @@ source_return_path_thread_entry(void) "" source_return_path_thread_loop_top(void) "" source_return_path_thread_pong(uint32_t val) "0x%x" source_return_path_thread_shut(uint32_t val) "0x%x" +source_return_path_thread_resume_ack(uint32_t v) "%"PRIu32 migrate_global_state_post_load(const char *state) "loaded state: %s" migrate_global_state_pre_save(const char *state) "saved state: %s" migration_thread_low_pending(uint64_t pending) "%" PRIu64 |