summary refs log tree commit diff stats
path: root/migration/ram.c
diff options
context:
space:
mode:
Diffstat (limited to 'migration/ram.c')
-rw-r--r--migration/ram.c500
1 files changed, 467 insertions, 33 deletions
diff --git a/migration/ram.c b/migration/ram.c
index 912810c18e..5bcbf7a9f9 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -36,6 +36,7 @@
 #include "xbzrle.h"
 #include "ram.h"
 #include "migration.h"
+#include "socket.h"
 #include "migration/register.h"
 #include "migration/misc.h"
 #include "qemu-file.h"
@@ -51,6 +52,9 @@
 #include "qemu/rcu_queue.h"
 #include "migration/colo.h"
 #include "migration/block.h"
+#include "sysemu/sysemu.h"
+#include "qemu/uuid.h"
+#include "savevm.h"
 
 /***********************************************************/
 /* ram save/restore */
@@ -187,6 +191,70 @@ void ramblock_recv_bitmap_set_range(RAMBlock *rb, void *host_addr,
                       nr);
 }
 
+#define  RAMBLOCK_RECV_BITMAP_ENDING  (0x0123456789abcdefULL)
+
+/*
+ * Format: bitmap_size (8 bytes) + whole_bitmap (N bytes).
+ *
+ * Returns >0 if success with sent bytes, or <0 if error.
+ */
+int64_t ramblock_recv_bitmap_send(QEMUFile *file,
+                                  const char *block_name)
+{
+    RAMBlock *block = qemu_ram_block_by_name(block_name);
+    unsigned long *le_bitmap, nbits;
+    uint64_t size;
+
+    if (!block) {
+        error_report("%s: invalid block name: %s", __func__, block_name);
+        return -1;
+    }
+
+    nbits = block->used_length >> TARGET_PAGE_BITS;
+
+    /*
+     * Make sure the tmp bitmap buffer is big enough, e.g., on 32bit
+     * machines we may need 4 more bytes for padding (see below
+     * comment). So extend it a bit before hand.
+     */
+    le_bitmap = bitmap_new(nbits + BITS_PER_LONG);
+
+    /*
+     * Always use little endian when sending the bitmap. This is
+     * required that when source and destination VMs are not using the
+     * same endianess. (Note: big endian won't work.)
+     */
+    bitmap_to_le(le_bitmap, block->receivedmap, nbits);
+
+    /* Size of the bitmap, in bytes */
+    size = nbits / 8;
+
+    /*
+     * size is always aligned to 8 bytes for 64bit machines, but it
+     * may not be true for 32bit machines. We need this padding to
+     * make sure the migration can survive even between 32bit and
+     * 64bit machines.
+     */
+    size = ROUND_UP(size, 8);
+
+    qemu_put_be64(file, size);
+    qemu_put_buffer(file, (const uint8_t *)le_bitmap, size);
+    /*
+     * Mark as an end, in case the middle part is screwed up due to
+     * some "misterious" reason.
+     */
+    qemu_put_be64(file, RAMBLOCK_RECV_BITMAP_ENDING);
+    qemu_fflush(file);
+
+    free(le_bitmap);
+
+    if (qemu_file_get_error(file)) {
+        return qemu_file_get_error(file);
+    }
+
+    return size + sizeof(size);
+}
+
 /*
  * An outstanding page request, on the source, having been received
  * and queued
@@ -432,15 +500,117 @@ exit:
 
 /* Multiple fd's */
 
-struct MultiFDSendParams {
+#define MULTIFD_MAGIC 0x11223344U
+#define MULTIFD_VERSION 1
+
+typedef struct {
+    uint32_t magic;
+    uint32_t version;
+    unsigned char uuid[16]; /* QemuUUID */
+    uint8_t id;
+} __attribute__((packed)) MultiFDInit_t;
+
+typedef struct {
+    /* this fields are not changed once the thread is created */
+    /* channel number */
     uint8_t id;
+    /* channel thread name */
     char *name;
+    /* channel thread id */
     QemuThread thread;
+    /* communication channel */
+    QIOChannel *c;
+    /* sem where to wait for more work */
     QemuSemaphore sem;
+    /* this mutex protects the following parameters */
     QemuMutex mutex;
+    /* is this channel thread running */
+    bool running;
+    /* should this thread finish */
     bool quit;
-};
-typedef struct MultiFDSendParams MultiFDSendParams;
+}  MultiFDSendParams;
+
+typedef struct {
+    /* this fields are not changed once the thread is created */
+    /* channel number */
+    uint8_t id;
+    /* channel thread name */
+    char *name;
+    /* channel thread id */
+    QemuThread thread;
+    /* communication channel */
+    QIOChannel *c;
+    /* sem where to wait for more work */
+    QemuSemaphore sem;
+    /* this mutex protects the following parameters */
+    QemuMutex mutex;
+    /* is this channel thread running */
+    bool running;
+    /* should this thread finish */
+    bool quit;
+} MultiFDRecvParams;
+
+static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
+{
+    MultiFDInit_t msg;
+    int ret;
+
+    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
+    msg.version = cpu_to_be32(MULTIFD_VERSION);
+    msg.id = p->id;
+    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
+
+    ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp);
+    if (ret != 0) {
+        return -1;
+    }
+    return 0;
+}
+
+static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
+{
+    MultiFDInit_t msg;
+    int ret;
+
+    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
+    if (ret != 0) {
+        return -1;
+    }
+
+    be32_to_cpus(&msg.magic);
+    be32_to_cpus(&msg.version);
+
+    if (msg.magic != MULTIFD_MAGIC) {
+        error_setg(errp, "multifd: received packet magic %x "
+                   "expected %x", msg.magic, MULTIFD_MAGIC);
+        return -1;
+    }
+
+    if (msg.version != MULTIFD_VERSION) {
+        error_setg(errp, "multifd: received packet version %d "
+                   "expected %d", msg.version, MULTIFD_VERSION);
+        return -1;
+    }
+
+    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
+        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
+        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
+
+        error_setg(errp, "multifd: received uuid '%s' and expected "
+                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
+        g_free(uuid);
+        g_free(msg_uuid);
+        return -1;
+    }
+
+    if (msg.id > migrate_multifd_channels()) {
+        error_setg(errp, "multifd: received channel version %d "
+                   "expected %d", msg.version, MULTIFD_VERSION);
+        return -1;
+    }
+
+    return msg.id;
+}
 
 struct {
     MultiFDSendParams *params;
@@ -448,11 +618,23 @@ struct {
     int count;
 } *multifd_send_state;
 
-static void terminate_multifd_send_threads(Error *errp)
+static void multifd_send_terminate_threads(Error *err)
 {
     int i;
 
-    for (i = 0; i < multifd_send_state->count; i++) {
+    if (err) {
+        MigrationState *s = migrate_get_current();
+        migrate_set_error(s, err);
+        if (s->state == MIGRATION_STATUS_SETUP ||
+            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
+            s->state == MIGRATION_STATUS_DEVICE ||
+            s->state == MIGRATION_STATUS_ACTIVE) {
+            migrate_set_state(&s->state, s->state,
+                              MIGRATION_STATUS_FAILED);
+        }
+    }
+
+    for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
         qemu_mutex_lock(&p->mutex);
@@ -470,11 +652,15 @@ int multifd_save_cleanup(Error **errp)
     if (!migrate_use_multifd()) {
         return 0;
     }
-    terminate_multifd_send_threads(NULL);
-    for (i = 0; i < multifd_send_state->count; i++) {
+    multifd_send_terminate_threads(NULL);
+    for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
-        qemu_thread_join(&p->thread);
+        if (p->running) {
+            qemu_thread_join(&p->thread);
+        }
+        socket_send_channel_destroy(p->c);
+        p->c = NULL;
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
         g_free(p->name);
@@ -490,6 +676,11 @@ int multifd_save_cleanup(Error **errp)
 static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *p = opaque;
+    Error *local_err = NULL;
+
+    if (multifd_send_initial_packet(p, &local_err) < 0) {
+        goto out;
+    }
 
     while (true) {
         qemu_mutex_lock(&p->mutex);
@@ -501,9 +692,39 @@ static void *multifd_send_thread(void *opaque)
         qemu_sem_wait(&p->sem);
     }
 
+out:
+    if (local_err) {
+        multifd_send_terminate_threads(local_err);
+    }
+
+    qemu_mutex_lock(&p->mutex);
+    p->running = false;
+    qemu_mutex_unlock(&p->mutex);
+
     return NULL;
 }
 
+static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
+{
+    MultiFDSendParams *p = opaque;
+    QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
+    Error *local_err = NULL;
+
+    if (qio_task_propagate_error(task, &local_err)) {
+        if (multifd_save_cleanup(&local_err) != 0) {
+            migrate_set_error(migrate_get_current(), local_err);
+        }
+    } else {
+        p->c = QIO_CHANNEL(sioc);
+        qio_channel_set_delay(p->c, false);
+        p->running = true;
+        qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
+                           QEMU_THREAD_JOINABLE);
+
+        atomic_inc(&multifd_send_state->count);
+    }
+}
+
 int multifd_save_setup(void)
 {
     int thread_count;
@@ -515,7 +736,7 @@ int multifd_save_setup(void)
     thread_count = migrate_multifd_channels();
     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
-    multifd_send_state->count = 0;
+    atomic_set(&multifd_send_state->count, 0);
     for (i = 0; i < thread_count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
@@ -524,35 +745,32 @@ int multifd_save_setup(void)
         p->quit = false;
         p->id = i;
         p->name = g_strdup_printf("multifdsend_%d", i);
-        qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
-                           QEMU_THREAD_JOINABLE);
-
-        multifd_send_state->count++;
+        socket_send_channel_create(multifd_new_send_channel_async, p);
     }
     return 0;
 }
 
-struct MultiFDRecvParams {
-    uint8_t id;
-    char *name;
-    QemuThread thread;
-    QemuSemaphore sem;
-    QemuMutex mutex;
-    bool quit;
-};
-typedef struct MultiFDRecvParams MultiFDRecvParams;
-
 struct {
     MultiFDRecvParams *params;
     /* number of created threads */
     int count;
 } *multifd_recv_state;
 
-static void terminate_multifd_recv_threads(Error *errp)
+static void multifd_recv_terminate_threads(Error *err)
 {
     int i;
 
-    for (i = 0; i < multifd_recv_state->count; i++) {
+    if (err) {
+        MigrationState *s = migrate_get_current();
+        migrate_set_error(s, err);
+        if (s->state == MIGRATION_STATUS_SETUP ||
+            s->state == MIGRATION_STATUS_ACTIVE) {
+            migrate_set_state(&s->state, s->state,
+                              MIGRATION_STATUS_FAILED);
+        }
+    }
+
+    for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
         qemu_mutex_lock(&p->mutex);
@@ -570,11 +788,15 @@ int multifd_load_cleanup(Error **errp)
     if (!migrate_use_multifd()) {
         return 0;
     }
-    terminate_multifd_recv_threads(NULL);
-    for (i = 0; i < multifd_recv_state->count; i++) {
+    multifd_recv_terminate_threads(NULL);
+    for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
-        qemu_thread_join(&p->thread);
+        if (p->running) {
+            qemu_thread_join(&p->thread);
+        }
+        object_unref(OBJECT(p->c));
+        p->c = NULL;
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
         g_free(p->name);
@@ -602,6 +824,10 @@ static void *multifd_recv_thread(void *opaque)
         qemu_sem_wait(&p->sem);
     }
 
+    qemu_mutex_lock(&p->mutex);
+    p->running = false;
+    qemu_mutex_unlock(&p->mutex);
+
     return NULL;
 }
 
@@ -616,7 +842,7 @@ int multifd_load_setup(void)
     thread_count = migrate_multifd_channels();
     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
-    multifd_recv_state->count = 0;
+    atomic_set(&multifd_recv_state->count, 0);
     for (i = 0; i < thread_count; i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
@@ -625,13 +851,52 @@ int multifd_load_setup(void)
         p->quit = false;
         p->id = i;
         p->name = g_strdup_printf("multifdrecv_%d", i);
-        qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
-                           QEMU_THREAD_JOINABLE);
-        multifd_recv_state->count++;
     }
     return 0;
 }
 
+bool multifd_recv_all_channels_created(void)
+{
+    int thread_count = migrate_multifd_channels();
+
+    if (!migrate_use_multifd()) {
+        return true;
+    }
+
+    return thread_count == atomic_read(&multifd_recv_state->count);
+}
+
+void multifd_recv_new_channel(QIOChannel *ioc)
+{
+    MultiFDRecvParams *p;
+    Error *local_err = NULL;
+    int id;
+
+    id = multifd_recv_initial_packet(ioc, &local_err);
+    if (id < 0) {
+        multifd_recv_terminate_threads(local_err);
+        return;
+    }
+
+    p = &multifd_recv_state->params[id];
+    if (p->c != NULL) {
+        error_setg(&local_err, "multifd: received id '%d' already setup'",
+                   id);
+        multifd_recv_terminate_threads(local_err);
+        return;
+    }
+    p->c = ioc;
+    object_ref(OBJECT(ioc));
+
+    p->running = true;
+    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
+                       QEMU_THREAD_JOINABLE);
+    atomic_inc(&multifd_recv_state->count);
+    if (multifd_recv_state->count == migrate_multifd_channels()) {
+        migration_incoming_process();
+    }
+}
+
 /**
  * save_page_header: write page header to wire
  *
@@ -1490,7 +1755,7 @@ static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
      * CPU resource.
      */
     if (block == rs->last_sent_block && save_page_use_compression(rs)) {
-        res = compress_page_with_multi_thread(rs, block, offset);
+        return compress_page_with_multi_thread(rs, block, offset);
     }
 
     return ram_save_page(rs, pss, last_stage);
@@ -2226,6 +2491,41 @@ static int ram_init_all(RAMState **rsp)
     return 0;
 }
 
+static void ram_state_resume_prepare(RAMState *rs, QEMUFile *out)
+{
+    RAMBlock *block;
+    uint64_t pages = 0;
+
+    /*
+     * Postcopy is not using xbzrle/compression, so no need for that.
+     * Also, since source are already halted, we don't need to care
+     * about dirty page logging as well.
+     */
+
+    RAMBLOCK_FOREACH(block) {
+        pages += bitmap_count_one(block->bmap,
+                                  block->used_length >> TARGET_PAGE_BITS);
+    }
+
+    /* This may not be aligned with current bitmaps. Recalculate. */
+    rs->migration_dirty_pages = pages;
+
+    rs->last_seen_block = NULL;
+    rs->last_sent_block = NULL;
+    rs->last_page = 0;
+    rs->last_version = ram_list.version;
+    /*
+     * Disable the bulk stage, otherwise we'll resend the whole RAM no
+     * matter what we have sent.
+     */
+    rs->ram_bulk_stage = false;
+
+    /* Update RAMState cache of output QEMUFile */
+    rs->f = out;
+
+    trace_ram_state_resume_prepare(pages);
+}
+
 /*
  * Each of ram_save_setup, ram_save_iterate and ram_save_complete has
  * long-running RCU critical section.  When rcu-reclaims in the code
@@ -3100,6 +3400,139 @@ static bool ram_has_postcopy(void *opaque)
     return migrate_postcopy_ram();
 }
 
+/* Sync all the dirty bitmap with destination VM.  */
+static int ram_dirty_bitmap_sync_all(MigrationState *s, RAMState *rs)
+{
+    RAMBlock *block;
+    QEMUFile *file = s->to_dst_file;
+    int ramblock_count = 0;
+
+    trace_ram_dirty_bitmap_sync_start();
+
+    RAMBLOCK_FOREACH(block) {
+        qemu_savevm_send_recv_bitmap(file, block->idstr);
+        trace_ram_dirty_bitmap_request(block->idstr);
+        ramblock_count++;
+    }
+
+    trace_ram_dirty_bitmap_sync_wait();
+
+    /* Wait until all the ramblocks' dirty bitmap synced */
+    while (ramblock_count--) {
+        qemu_sem_wait(&s->rp_state.rp_sem);
+    }
+
+    trace_ram_dirty_bitmap_sync_complete();
+
+    return 0;
+}
+
+static void ram_dirty_bitmap_reload_notify(MigrationState *s)
+{
+    qemu_sem_post(&s->rp_state.rp_sem);
+}
+
+/*
+ * Read the received bitmap, revert it as the initial dirty bitmap.
+ * This is only used when the postcopy migration is paused but wants
+ * to resume from a middle point.
+ */
+int ram_dirty_bitmap_reload(MigrationState *s, RAMBlock *block)
+{
+    int ret = -EINVAL;
+    QEMUFile *file = s->rp_state.from_dst_file;
+    unsigned long *le_bitmap, nbits = block->used_length >> TARGET_PAGE_BITS;
+    uint64_t local_size = nbits / 8;
+    uint64_t size, end_mark;
+
+    trace_ram_dirty_bitmap_reload_begin(block->idstr);
+
+    if (s->state != MIGRATION_STATUS_POSTCOPY_RECOVER) {
+        error_report("%s: incorrect state %s", __func__,
+                     MigrationStatus_str(s->state));
+        return -EINVAL;
+    }
+
+    /*
+     * Note: see comments in ramblock_recv_bitmap_send() on why we
+     * need the endianess convertion, and the paddings.
+     */
+    local_size = ROUND_UP(local_size, 8);
+
+    /* Add paddings */
+    le_bitmap = bitmap_new(nbits + BITS_PER_LONG);
+
+    size = qemu_get_be64(file);
+
+    /* The size of the bitmap should match with our ramblock */
+    if (size != local_size) {
+        error_report("%s: ramblock '%s' bitmap size mismatch "
+                     "(0x%"PRIx64" != 0x%"PRIx64")", __func__,
+                     block->idstr, size, local_size);
+        ret = -EINVAL;
+        goto out;
+    }
+
+    size = qemu_get_buffer(file, (uint8_t *)le_bitmap, local_size);
+    end_mark = qemu_get_be64(file);
+
+    ret = qemu_file_get_error(file);
+    if (ret || size != local_size) {
+        error_report("%s: read bitmap failed for ramblock '%s': %d"
+                     " (size 0x%"PRIx64", got: 0x%"PRIx64")",
+                     __func__, block->idstr, ret, local_size, size);
+        ret = -EIO;
+        goto out;
+    }
+
+    if (end_mark != RAMBLOCK_RECV_BITMAP_ENDING) {
+        error_report("%s: ramblock '%s' end mark incorrect: 0x%"PRIu64,
+                     __func__, block->idstr, end_mark);
+        ret = -EINVAL;
+        goto out;
+    }
+
+    /*
+     * Endianess convertion. We are during postcopy (though paused).
+     * The dirty bitmap won't change. We can directly modify it.
+     */
+    bitmap_from_le(block->bmap, le_bitmap, nbits);
+
+    /*
+     * What we received is "received bitmap". Revert it as the initial
+     * dirty bitmap for this ramblock.
+     */
+    bitmap_complement(block->bmap, block->bmap, nbits);
+
+    trace_ram_dirty_bitmap_reload_complete(block->idstr);
+
+    /*
+     * We succeeded to sync bitmap for current ramblock. If this is
+     * the last one to sync, we need to notify the main send thread.
+     */
+    ram_dirty_bitmap_reload_notify(s);
+
+    ret = 0;
+out:
+    free(le_bitmap);
+    return ret;
+}
+
+static int ram_resume_prepare(MigrationState *s, void *opaque)
+{
+    RAMState *rs = *(RAMState **)opaque;
+    int ret;
+
+    ret = ram_dirty_bitmap_sync_all(s, rs);
+    if (ret) {
+        return ret;
+    }
+
+    ram_state_resume_prepare(rs, s->to_dst_file);
+
+    return 0;
+}
+
 static SaveVMHandlers savevm_ram_handlers = {
     .save_setup = ram_save_setup,
     .save_live_iterate = ram_save_iterate,
@@ -3111,6 +3544,7 @@ static SaveVMHandlers savevm_ram_handlers = {
     .save_cleanup = ram_save_cleanup,
     .load_setup = ram_load_setup,
     .load_cleanup = ram_load_cleanup,
+    .resume_prepare = ram_resume_prepare,
 };
 
 void ram_mig_init(void)