summary refs log tree commit diff stats
path: root/migration/savevm.c
diff options
context:
space:
mode:
Diffstat (limited to 'migration/savevm.c')
-rw-r--r-- migration/savevm.c 201
1 files changed, 198 insertions, 3 deletions
diff --git a/migration/savevm.c b/migration/savevm.c
index 4046faf009..5c4fdfd95e 100644
--- a/migration/savevm.c
+++ b/migration/savevm.c
@@ -37,6 +37,7 @@
 #include "migration/register.h"
 #include "migration/global_state.h"
 #include "migration/channel-block.h"
+#include "multifd.h"
 #include "ram.h"
 #include "qemu-file.h"
 #include "savevm.h"
@@ -54,6 +55,7 @@
 #include "qemu/job.h"
 #include "qemu/main-loop.h"
 #include "block/snapshot.h"
+#include "block/thread-pool.h"
 #include "qemu/cutils.h"
 #include "io/channel-buffer.h"
 #include "io/channel-file.h"
@@ -90,6 +92,7 @@ enum qemu_vm_cmd {
     MIG_CMD_ENABLE_COLO,       /* Enable COLO */
     MIG_CMD_POSTCOPY_RESUME,   /* resume postcopy on dest */
     MIG_CMD_RECV_BITMAP,       /* Request for recved bitmap on dst */
+    MIG_CMD_SWITCHOVER_START,  /* Switchover start notification */
     MIG_CMD_MAX
 };
 
@@ -109,6 +112,7 @@ static struct mig_cmd_args {
     [MIG_CMD_POSTCOPY_RESUME]  = { .len =  0, .name = "POSTCOPY_RESUME" },
     [MIG_CMD_PACKAGED]         = { .len =  4, .name = "PACKAGED" },
     [MIG_CMD_RECV_BITMAP]      = { .len = -1, .name = "RECV_BITMAP" },
+    [MIG_CMD_SWITCHOVER_START] = { .len =  0, .name = "SWITCHOVER_START" },
     [MIG_CMD_MAX]              = { .len = -1, .name = "MAX" },
 };
 
@@ -130,6 +134,35 @@ static struct mig_cmd_args {
  */
 
 /***********************************************************/
+/* Optional load threads pool support */
+
+static void qemu_loadvm_thread_pool_create(MigrationIncomingState *mis)
+{
+    assert(!mis->load_threads);
+    mis->load_threads = thread_pool_new();
+    mis->load_threads_abort = false;
+}
+
+static void qemu_loadvm_thread_pool_destroy(MigrationIncomingState *mis)
+{
+    qatomic_set(&mis->load_threads_abort, true);
+
+    bql_unlock(); /* Load threads might be waiting for BQL */
+    g_clear_pointer(&mis->load_threads, thread_pool_free);
+    bql_lock();
+}
+
+static bool qemu_loadvm_thread_pool_wait(MigrationState *s,
+                                         MigrationIncomingState *mis)
+{
+    bql_unlock(); /* Let load threads do work requiring BQL */
+    thread_pool_wait(mis->load_threads);
+    bql_lock();
+
+    return !migrate_has_error(s);
+}
+
+/***********************************************************/
 /* savevm/loadvm support */
 
 static QEMUFile *qemu_fopen_bdrv(BlockDriverState *bs, int is_writable)
@@ -1201,6 +1234,19 @@ void qemu_savevm_send_recv_bitmap(QEMUFile *f, char *block_name)
     qemu_savevm_command_send(f, MIG_CMD_RECV_BITMAP, len + 1, (uint8_t *)buf);
 }
 
+static void qemu_savevm_send_switchover_start(QEMUFile *f)
+{
+    trace_savevm_send_switchover_start();
+    qemu_savevm_command_send(f, MIG_CMD_SWITCHOVER_START, 0, NULL);
+}
+
+void qemu_savevm_maybe_send_switchover_start(QEMUFile *f)
+{
+    if (migrate_send_switchover_start()) {
+        qemu_savevm_send_switchover_start(f);
+    }
+}
+
 bool qemu_savevm_state_blocked(Error **errp)
 {
     SaveStateEntry *se;
@@ -1482,6 +1528,24 @@ int qemu_savevm_state_complete_precopy_iterable(QEMUFile *f, bool in_postcopy)
     int64_t start_ts_each, end_ts_each;
     SaveStateEntry *se;
     int ret;
+    bool multifd_device_state = multifd_device_state_supported();
+
+    if (multifd_device_state) {
+        QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
+            SaveLiveCompletePrecopyThreadHandler hdlr;
+
+            if (!se->ops || (in_postcopy && se->ops->has_postcopy &&
+                             se->ops->has_postcopy(se->opaque)) ||
+                !se->ops->save_live_complete_precopy_thread) {
+                continue;
+            }
+
+            hdlr = se->ops->save_live_complete_precopy_thread;
+            multifd_spawn_device_state_save_thread(hdlr,
+                                                   se->idstr, se->instance_id,
+                                                   se->opaque);
+        }
+    }
 
     QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
         if (!se->ops ||
@@ -1507,16 +1571,35 @@ int qemu_savevm_state_complete_precopy_iterable(QEMUFile *f, bool in_postcopy)
         save_section_footer(f, se);
         if (ret < 0) {
             qemu_file_set_error(f, ret);
-            return -1;
+            goto ret_fail_abort_threads;
         }
         end_ts_each = qemu_clock_get_us(QEMU_CLOCK_REALTIME);
         trace_vmstate_downtime_save("iterable", se->idstr, se->instance_id,
                                     end_ts_each - start_ts_each);
     }
 
+    if (multifd_device_state) {
+        if (migrate_has_error(migrate_get_current())) {
+            multifd_abort_device_state_save_threads();
+        }
+
+        if (!multifd_join_device_state_save_threads()) {
+            qemu_file_set_error(f, -EINVAL);
+            return -1;
+        }
+    }
+
     trace_vmstate_downtime_checkpoint("src-iterable-saved");
 
     return 0;
+
+ret_fail_abort_threads:
+    if (multifd_device_state) {
+        multifd_abort_device_state_save_threads();
+        multifd_join_device_state_save_threads();
+    }
+
+    return -1;
 }
 
 int qemu_savevm_state_complete_precopy_non_iterable(QEMUFile *f,
@@ -1687,6 +1770,7 @@ static int qemu_savevm_state(QEMUFile *f, Error **errp)
 
     ret = qemu_file_get_error(f);
     if (ret == 0) {
+        qemu_savevm_maybe_send_switchover_start(f);
         qemu_savevm_state_complete_precopy(f, false);
         ret = qemu_file_get_error(f);
     }
@@ -1970,6 +2054,8 @@ static void *postcopy_ram_listen_thread(void *opaque)
      * in qemu_file, and thus we must be blocking now.
      */
     qemu_file_set_blocking(f, true);
+
+    /* TODO: sanity check that only postcopiable data will be loaded here */
     load_res = qemu_loadvm_state_main(f, mis);
 
     /*
@@ -2030,7 +2116,9 @@ static void *postcopy_ram_listen_thread(void *opaque)
      * (If something broke then qemu will have to exit anyway since it's
      * got a bad migration state).
      */
+    bql_lock();
     migration_incoming_state_destroy();
+    bql_unlock();
 
     rcu_unregister_thread();
     mis->have_listen_thread = false;
@@ -2383,6 +2471,26 @@ static int loadvm_process_enable_colo(MigrationIncomingState *mis)
     return ret;
 }
 
+static int loadvm_postcopy_handle_switchover_start(void)
+{
+    SaveStateEntry *se;
+
+    QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
+        int ret;
+
+        if (!se->ops || !se->ops->switchover_start) {
+            continue;
+        }
+
+        ret = se->ops->switchover_start(se->opaque);
+        if (ret < 0) {
+            return ret;
+        }
+    }
+
+    return 0;
+}
+
 /*
  * Process an incoming 'QEMU_VM_COMMAND'
  * 0           just a normal return
@@ -2481,6 +2589,9 @@ static int loadvm_process_command(QEMUFile *f)
 
     case MIG_CMD_ENABLE_COLO:
         return loadvm_process_enable_colo(mis);
+
+    case MIG_CMD_SWITCHOVER_START:
+        return loadvm_postcopy_handle_switchover_start();
     }
 
     return 0;
@@ -2740,16 +2851,68 @@ static int qemu_loadvm_state_setup(QEMUFile *f, Error **errp)
     return 0;
 }
 
-void qemu_loadvm_state_cleanup(void)
+struct LoadThreadData {
+    MigrationLoadThread function;
+    void *opaque;
+};
+
+static int qemu_loadvm_load_thread(void *thread_opaque)
+{
+    struct LoadThreadData *data = thread_opaque;
+    MigrationIncomingState *mis = migration_incoming_get_current();
+    g_autoptr(Error) local_err = NULL;
+
+    if (!data->function(data->opaque, &mis->load_threads_abort, &local_err)) {
+        MigrationState *s = migrate_get_current();
+
+        /*
+         * Can't set load_threads_abort here: the main migration channel may
+         * still be processed and start new load threads, and
+         * qemu_loadvm_start_load_thread() asserts that the flag is clear.
+         */
+
+        assert(local_err);
+
+        /*
+         * If several load threads fail, which thread's error ends up being
+         * the one recorded here is purely arbitrary.
+         */
+        migrate_set_error(s, local_err);
+    }
+
+    return 0;
+}
+
+void qemu_loadvm_start_load_thread(MigrationLoadThread function,
+                                   void *opaque)
+{
+    MigrationIncomingState *mis = migration_incoming_get_current();
+    struct LoadThreadData *data;
+
+    /* We only set it from this thread so it's okay to read it directly */
+    assert(!mis->load_threads_abort);
+
+    data = g_new(struct LoadThreadData, 1);
+    data->function = function;
+    data->opaque = opaque;
+
+    thread_pool_submit_immediate(mis->load_threads, qemu_loadvm_load_thread,
+                                 data, g_free);
+}
+
+void qemu_loadvm_state_cleanup(MigrationIncomingState *mis)
 {
     SaveStateEntry *se;
 
     trace_loadvm_state_cleanup();
+
     QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
         if (se->ops && se->ops->load_cleanup) {
             se->ops->load_cleanup(se->opaque);
         }
     }
+
+    qemu_loadvm_thread_pool_destroy(mis);
 }
 
 /* Return true if we should continue the migration, or false. */
@@ -2900,6 +3063,7 @@ out:
 
 int qemu_loadvm_state(QEMUFile *f)
 {
+    MigrationState *s = migrate_get_current();
     MigrationIncomingState *mis = migration_incoming_get_current();
     Error *local_err = NULL;
     int ret;
@@ -2909,6 +3073,8 @@ int qemu_loadvm_state(QEMUFile *f)
         return -EINVAL;
     }
 
+    qemu_loadvm_thread_pool_create(mis);
+
     ret = qemu_loadvm_state_header(f);
     if (ret) {
         return ret;
@@ -2940,12 +3106,18 @@ int qemu_loadvm_state(QEMUFile *f)
 
     /* When reaching here, it must be precopy */
     if (ret == 0) {
-        if (migrate_has_error(migrate_get_current())) {
+        if (migrate_has_error(migrate_get_current()) ||
+            !qemu_loadvm_thread_pool_wait(s, mis)) {
             ret = -EINVAL;
         } else {
             ret = qemu_file_get_error(f);
         }
     }
+    /*
+     * Set this flag unconditionally so we'll catch further attempts to
+     * start additional threads via an appropriate assert()
+     */
+    qatomic_set(&mis->load_threads_abort, true);
 
     /*
      * Try to read in the VMDESC section as well, so that dumping tools that
@@ -3021,6 +3193,29 @@ int qemu_loadvm_approve_switchover(void)
     return migrate_send_rp_switchover_ack(mis);
 }
 
+bool qemu_loadvm_load_state_buffer(const char *idstr, uint32_t instance_id,
+                                   char *buf, size_t len, Error **errp)
+{
+    SaveStateEntry *se;
+
+    se = find_se(idstr, instance_id);
+    if (!se) {
+        error_setg(errp,
+                   "Unknown idstr %s or instance id %u for load state buffer",
+                   idstr, instance_id);
+        return false;
+    }
+
+    if (!se->ops || !se->ops->load_state_buffer) {
+        error_setg(errp,
+                   "idstr %s / instance %u has no load state buffer operation",
+                   idstr, instance_id);
+        return false;
+    }
+
+    return se->ops->load_state_buffer(se->opaque, buf, len, errp);
+}
+
 bool save_snapshot(const char *name, bool overwrite, const char *vmstate,
                   bool has_devices, strList *devices, Error **errp)
 {