summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--hw/core/qdev-properties.c32
-rw-r--r--include/hw/qdev-properties.h3
-rw-r--r--include/migration/colo.h1
-rw-r--r--migration/colo.c5
-rw-r--r--migration/migration.c268
-rw-r--r--migration/rdma.c137
6 files changed, 343 insertions, 103 deletions
diff --git a/hw/core/qdev-properties.c b/hw/core/qdev-properties.c
index 58a8f92d92..078fc5d239 100644
--- a/hw/core/qdev-properties.c
+++ b/hw/core/qdev-properties.c
@@ -404,6 +404,31 @@ static void set_uint64(Object *obj, Visitor *v, const char *name,
     visit_type_uint64(v, name, ptr, errp);
 }
 
+static void get_int64(Object *obj, Visitor *v, const char *name,
+                      void *opaque, Error **errp)
+{
+    DeviceState *dev = DEVICE(obj);
+    Property *prop = opaque;
+    int64_t *ptr = qdev_get_prop_ptr(dev, prop);
+
+    visit_type_int64(v, name, ptr, errp);
+}
+
+static void set_int64(Object *obj, Visitor *v, const char *name,
+                      void *opaque, Error **errp)
+{
+    DeviceState *dev = DEVICE(obj);
+    Property *prop = opaque;
+    int64_t *ptr = qdev_get_prop_ptr(dev, prop);
+
+    if (dev->realized) {
+        qdev_prop_set_after_realize(dev, name, errp);
+        return;
+    }
+
+    visit_type_int64(v, name, ptr, errp);
+}
+
 const PropertyInfo qdev_prop_uint64 = {
     .name  = "uint64",
     .get   = get_uint64,
@@ -411,6 +436,13 @@ const PropertyInfo qdev_prop_uint64 = {
     .set_default_value = set_default_value_uint,
 };
 
+const PropertyInfo qdev_prop_int64 = {
+    .name  = "int64",
+    .get   = get_int64,
+    .set   = set_int64,
+    .set_default_value = set_default_value_int,
+};
+
 /* --- string --- */
 
 static void release_string(Object *obj, const char *name, void *opaque)
diff --git a/include/hw/qdev-properties.h b/include/hw/qdev-properties.h
index 39297961f3..e2321f1cc1 100644
--- a/include/hw/qdev-properties.h
+++ b/include/hw/qdev-properties.h
@@ -13,6 +13,7 @@ extern const PropertyInfo qdev_prop_uint16;
 extern const PropertyInfo qdev_prop_uint32;
 extern const PropertyInfo qdev_prop_int32;
 extern const PropertyInfo qdev_prop_uint64;
+extern const PropertyInfo qdev_prop_int64;
 extern const PropertyInfo qdev_prop_size;
 extern const PropertyInfo qdev_prop_string;
 extern const PropertyInfo qdev_prop_chr;
@@ -157,6 +158,8 @@ extern const PropertyInfo qdev_prop_link;
     DEFINE_PROP_SIGNED(_n, _s, _f, _d, qdev_prop_int32, int32_t)
 #define DEFINE_PROP_UINT64(_n, _s, _f, _d)                      \
     DEFINE_PROP_UNSIGNED(_n, _s, _f, _d, qdev_prop_uint64, uint64_t)
+#define DEFINE_PROP_INT64(_n, _s, _f, _d)                      \
+    DEFINE_PROP_SIGNED(_n, _s, _f, _d, qdev_prop_int64, int64_t)
 #define DEFINE_PROP_SIZE(_n, _s, _f, _d)                       \
     DEFINE_PROP_UNSIGNED(_n, _s, _f, _d, qdev_prop_size, uint64_t)
 #define DEFINE_PROP_PCI_DEVFN(_n, _s, _f, _d)                   \
diff --git a/include/migration/colo.h b/include/migration/colo.h
index be6beba301..ff9874ea16 100644
--- a/include/migration/colo.h
+++ b/include/migration/colo.h
@@ -15,7 +15,6 @@
 
 #include "qemu-common.h"
 
-bool colo_supported(void);
 void colo_info_init(void);
 
 void migrate_start_colo_process(MigrationState *s);
diff --git a/migration/colo.c b/migration/colo.c
index ef35f00c9a..a4255432ac 100644
--- a/migration/colo.c
+++ b/migration/colo.c
@@ -29,11 +29,6 @@ static bool vmstate_loading;
 
 #define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
 
-bool colo_supported(void)
-{
-    return true;
-}
-
 bool migration_in_colo_state(void)
 {
     MigrationState *s = migrate_get_current();
diff --git a/migration/migration.c b/migration/migration.c
index a0db40d364..76153914d1 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -102,14 +102,22 @@ enum mig_rp_message_type {
 
 static MigrationState *current_migration;
 
+static bool migration_object_check(MigrationState *ms, Error **errp);
+
 void migration_object_init(void)
 {
     MachineState *ms = MACHINE(qdev_get_machine());
+    Error *err = NULL;
 
     /* This can only be called once. */
     assert(!current_migration);
     current_migration = MIGRATION_OBJ(object_new(TYPE_MIGRATION));
 
+    if (!migration_object_check(current_migration, &err)) {
+        error_report_err(err);
+        exit(1);
+    }
+
     /*
      * We cannot really do this in migration_instance_init() since at
      * that time global properties are not yet applied, then this
@@ -348,6 +356,7 @@ static void process_incoming_migration_co(void *opaque)
         migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
                           MIGRATION_STATUS_FAILED);
         error_report("load of migration failed: %s", strerror(-ret));
+        qemu_fclose(mis->from_src_file);
         exit(EXIT_FAILURE);
     }
     mis->bh = qemu_bh_new(process_incoming_migration_bh, mis);
@@ -403,9 +412,6 @@ MigrationCapabilityStatusList *qmp_query_migrate_capabilities(Error **errp)
             continue;
         }
 #endif
-        if (i == MIGRATION_CAPABILITY_X_COLO && !colo_supported()) {
-            continue;
-        }
         if (head == NULL) {
             head = g_malloc0(sizeof(*caps));
             caps = head;
@@ -582,51 +588,49 @@ MigrationInfo *qmp_query_migrate(Error **errp)
     return info;
 }
 
-void qmp_migrate_set_capabilities(MigrationCapabilityStatusList *params,
-                                  Error **errp)
+/**
+ * @migration_caps_check - check capability validity
+ *
+ * @cap_list: old capability list, array of bool
+ * @params: new capabilities to be applied soon
+ * @errp: set *errp if the check failed, with reason
+ *
+ * Returns true if check passed, otherwise false.
+ */
+static bool migrate_caps_check(bool *cap_list,
+                               MigrationCapabilityStatusList *params,
+                               Error **errp)
 {
-    MigrationState *s = migrate_get_current();
     MigrationCapabilityStatusList *cap;
-    bool old_postcopy_cap = migrate_postcopy_ram();
+    bool old_postcopy_cap;
 
-    if (migration_is_setup_or_active(s->state)) {
-        error_setg(errp, QERR_MIGRATION_ACTIVE);
-        return;
-    }
+    old_postcopy_cap = cap_list[MIGRATION_CAPABILITY_POSTCOPY_RAM];
 
     for (cap = params; cap; cap = cap->next) {
+        cap_list[cap->value->capability] = cap->value->state;
+    }
+
 #ifndef CONFIG_LIVE_BLOCK_MIGRATION
-        if (cap->value->capability == MIGRATION_CAPABILITY_BLOCK
-            && cap->value->state) {
-            error_setg(errp, "QEMU compiled without old-style (blk/-b, inc/-i) "
-                       "block migration");
-            error_append_hint(errp, "Use drive_mirror+NBD instead.\n");
-            continue;
-        }
-#endif
-        if (cap->value->capability == MIGRATION_CAPABILITY_X_COLO) {
-            if (!colo_supported()) {
-                error_setg(errp, "COLO is not currently supported, please"
-                             " configure with --enable-colo option in order to"
-                             " support COLO feature");
-                continue;
-            }
-        }
-        s->enabled_capabilities[cap->value->capability] = cap->value->state;
+    if (cap_list[MIGRATION_CAPABILITY_BLOCK]) {
+        error_setg(errp, "QEMU compiled without old-style (blk/-b, inc/-i) "
+                   "block migration");
+        error_append_hint(errp, "Use drive_mirror+NBD instead.\n");
+        return false;
     }
+#endif
 
-    if (migrate_postcopy_ram()) {
-        if (migrate_use_compression()) {
+    if (cap_list[MIGRATION_CAPABILITY_POSTCOPY_RAM]) {
+        if (cap_list[MIGRATION_CAPABILITY_COMPRESS]) {
             /* The decompression threads asynchronously write into RAM
              * rather than use the atomic copies needed to avoid
              * userfaulting.  It should be possible to fix the decompression
              * threads for compatibility in future.
              */
-            error_report("Postcopy is not currently compatible with "
-                         "compression");
-            s->enabled_capabilities[MIGRATION_CAPABILITY_POSTCOPY_RAM] =
-                false;
+            error_setg(errp, "Postcopy is not currently compatible "
+                       "with compression");
+            return false;
         }
+
         /* This check is reasonably expensive, so only when it's being
          * set the first time, also it's only the destination that needs
          * special support.
@@ -636,96 +640,141 @@ void qmp_migrate_set_capabilities(MigrationCapabilityStatusList *params,
             /* postcopy_ram_supported_by_host will have emitted a more
              * detailed message
              */
-            error_report("Postcopy is not supported");
-            s->enabled_capabilities[MIGRATION_CAPABILITY_POSTCOPY_RAM] =
-                false;
+            error_setg(errp, "Postcopy is not supported");
+            return false;
         }
     }
+
+    return true;
 }
 
-void qmp_migrate_set_parameters(MigrationParameters *params, Error **errp)
+void qmp_migrate_set_capabilities(MigrationCapabilityStatusList *params,
+                                  Error **errp)
 {
     MigrationState *s = migrate_get_current();
+    MigrationCapabilityStatusList *cap;
 
+    if (migration_is_setup_or_active(s->state)) {
+        error_setg(errp, QERR_MIGRATION_ACTIVE);
+        return;
+    }
+
+    if (!migrate_caps_check(s->enabled_capabilities, params, errp)) {
+        return;
+    }
+
+    for (cap = params; cap; cap = cap->next) {
+        s->enabled_capabilities[cap->value->capability] = cap->value->state;
+    }
+}
+
+/*
+ * Check whether the parameters are valid. Error will be put into errp
+ * (if provided). Return true if valid, otherwise false.
+ */
+static bool migrate_params_check(MigrationParameters *params, Error **errp)
+{
     if (params->has_compress_level &&
         (params->compress_level < 0 || params->compress_level > 9)) {
         error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "compress_level",
                    "is invalid, it should be in the range of 0 to 9");
-        return;
+        return false;
     }
+
     if (params->has_compress_threads &&
         (params->compress_threads < 1 || params->compress_threads > 255)) {
         error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
                    "compress_threads",
                    "is invalid, it should be in the range of 1 to 255");
-        return;
+        return false;
     }
+
     if (params->has_decompress_threads &&
         (params->decompress_threads < 1 || params->decompress_threads > 255)) {
         error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
                    "decompress_threads",
                    "is invalid, it should be in the range of 1 to 255");
-        return;
+        return false;
     }
+
     if (params->has_cpu_throttle_initial &&
         (params->cpu_throttle_initial < 1 ||
          params->cpu_throttle_initial > 99)) {
         error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
                    "cpu_throttle_initial",
                    "an integer in the range of 1 to 99");
-        return;
+        return false;
     }
+
     if (params->has_cpu_throttle_increment &&
         (params->cpu_throttle_increment < 1 ||
          params->cpu_throttle_increment > 99)) {
         error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
                    "cpu_throttle_increment",
                    "an integer in the range of 1 to 99");
-        return;
+        return false;
     }
+
     if (params->has_max_bandwidth &&
         (params->max_bandwidth < 0 || params->max_bandwidth > SIZE_MAX)) {
         error_setg(errp, "Parameter 'max_bandwidth' expects an integer in the"
                          " range of 0 to %zu bytes/second", SIZE_MAX);
-        return;
+        return false;
     }
+
     if (params->has_downtime_limit &&
         (params->downtime_limit < 0 ||
          params->downtime_limit > MAX_MIGRATE_DOWNTIME)) {
         error_setg(errp, "Parameter 'downtime_limit' expects an integer in "
                          "the range of 0 to %d milliseconds",
                          MAX_MIGRATE_DOWNTIME);
-        return;
+        return false;
     }
+
     if (params->has_x_checkpoint_delay && (params->x_checkpoint_delay < 0)) {
         error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
                     "x_checkpoint_delay",
                     "is invalid, it should be positive");
+        return false;
     }
 
+    return true;
+}
+
+static void migrate_params_apply(MigrationParameters *params)
+{
+    MigrationState *s = migrate_get_current();
+
     if (params->has_compress_level) {
         s->parameters.compress_level = params->compress_level;
     }
+
     if (params->has_compress_threads) {
         s->parameters.compress_threads = params->compress_threads;
     }
+
     if (params->has_decompress_threads) {
         s->parameters.decompress_threads = params->decompress_threads;
     }
+
     if (params->has_cpu_throttle_initial) {
         s->parameters.cpu_throttle_initial = params->cpu_throttle_initial;
     }
+
     if (params->has_cpu_throttle_increment) {
         s->parameters.cpu_throttle_increment = params->cpu_throttle_increment;
     }
+
     if (params->has_tls_creds) {
         g_free(s->parameters.tls_creds);
         s->parameters.tls_creds = g_strdup(params->tls_creds);
     }
+
     if (params->has_tls_hostname) {
         g_free(s->parameters.tls_hostname);
         s->parameters.tls_hostname = g_strdup(params->tls_hostname);
     }
+
     if (params->has_max_bandwidth) {
         s->parameters.max_bandwidth = params->max_bandwidth;
         if (s->to_dst_file) {
@@ -733,6 +782,7 @@ void qmp_migrate_set_parameters(MigrationParameters *params, Error **errp)
                                 s->parameters.max_bandwidth / XFER_LIMIT_RATIO);
         }
     }
+
     if (params->has_downtime_limit) {
         s->parameters.downtime_limit = params->downtime_limit;
     }
@@ -743,11 +793,22 @@ void qmp_migrate_set_parameters(MigrationParameters *params, Error **errp)
             colo_checkpoint_notify(s);
         }
     }
+
     if (params->has_block_incremental) {
         s->parameters.block_incremental = params->block_incremental;
     }
 }
 
+void qmp_migrate_set_parameters(MigrationParameters *params, Error **errp)
+{
+    if (!migrate_params_check(params, errp)) {
+        /* Invalid parameter */
+        return;
+    }
+
+    migrate_params_apply(params);
+}
+
 
 void qmp_migrate_start_postcopy(Error **errp)
 {
@@ -781,14 +842,27 @@ void migrate_set_state(int *state, int old_state, int new_state)
     }
 }
 
-void migrate_set_block_enabled(bool value, Error **errp)
+static MigrationCapabilityStatusList *migrate_cap_add(
+    MigrationCapabilityStatusList *list,
+    MigrationCapability index,
+    bool state)
 {
     MigrationCapabilityStatusList *cap;
 
     cap = g_new0(MigrationCapabilityStatusList, 1);
     cap->value = g_new0(MigrationCapabilityStatus, 1);
-    cap->value->capability = MIGRATION_CAPABILITY_BLOCK;
-    cap->value->state = value;
+    cap->value->capability = index;
+    cap->value->state = state;
+    cap->next = list;
+
+    return cap;
+}
+
+void migrate_set_block_enabled(bool value, Error **errp)
+{
+    MigrationCapabilityStatusList *cap;
+
+    cap = migrate_cap_add(NULL, MIGRATION_CAPABILITY_BLOCK, value);
     qmp_migrate_set_capabilities(cap, errp);
     qapi_free_MigrationCapabilityStatusList(cap);
 }
@@ -2001,6 +2075,9 @@ void migration_global_dump(Monitor *mon)
                    ms->send_configuration, ms->send_section_footer);
 }
 
+#define DEFINE_PROP_MIG_CAP(name, x)             \
+    DEFINE_PROP_BOOL(name, MigrationState, enabled_capabilities[x], false)
+
 static Property migration_properties[] = {
     DEFINE_PROP_BOOL("store-global-state", MigrationState,
                      store_global_state, true),
@@ -2009,6 +2086,45 @@ static Property migration_properties[] = {
                      send_configuration, true),
     DEFINE_PROP_BOOL("send-section-footer", MigrationState,
                      send_section_footer, true),
+
+    /* Migration parameters */
+    DEFINE_PROP_INT64("x-compress-level", MigrationState,
+                      parameters.compress_level,
+                      DEFAULT_MIGRATE_COMPRESS_LEVEL),
+    DEFINE_PROP_INT64("x-compress-threads", MigrationState,
+                      parameters.compress_threads,
+                      DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT),
+    DEFINE_PROP_INT64("x-decompress-threads", MigrationState,
+                      parameters.decompress_threads,
+                      DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT),
+    DEFINE_PROP_INT64("x-cpu-throttle-initial", MigrationState,
+                      parameters.cpu_throttle_initial,
+                      DEFAULT_MIGRATE_CPU_THROTTLE_INITIAL),
+    DEFINE_PROP_INT64("x-cpu-throttle-increment", MigrationState,
+                      parameters.cpu_throttle_increment,
+                      DEFAULT_MIGRATE_CPU_THROTTLE_INCREMENT),
+    DEFINE_PROP_INT64("x-max-bandwidth", MigrationState,
+                      parameters.max_bandwidth, MAX_THROTTLE),
+    DEFINE_PROP_INT64("x-downtime-limit", MigrationState,
+                      parameters.downtime_limit,
+                      DEFAULT_MIGRATE_SET_DOWNTIME),
+    DEFINE_PROP_INT64("x-checkpoint-delay", MigrationState,
+                      parameters.x_checkpoint_delay,
+                      DEFAULT_MIGRATE_X_CHECKPOINT_DELAY),
+
+    /* Migration capabilities */
+    DEFINE_PROP_MIG_CAP("x-xbzrle", MIGRATION_CAPABILITY_XBZRLE),
+    DEFINE_PROP_MIG_CAP("x-rdma-pin-all", MIGRATION_CAPABILITY_RDMA_PIN_ALL),
+    DEFINE_PROP_MIG_CAP("x-auto-converge", MIGRATION_CAPABILITY_AUTO_CONVERGE),
+    DEFINE_PROP_MIG_CAP("x-zero-blocks", MIGRATION_CAPABILITY_ZERO_BLOCKS),
+    DEFINE_PROP_MIG_CAP("x-compress", MIGRATION_CAPABILITY_COMPRESS),
+    DEFINE_PROP_MIG_CAP("x-events", MIGRATION_CAPABILITY_EVENTS),
+    DEFINE_PROP_MIG_CAP("x-postcopy-ram", MIGRATION_CAPABILITY_POSTCOPY_RAM),
+    DEFINE_PROP_MIG_CAP("x-colo", MIGRATION_CAPABILITY_X_COLO),
+    DEFINE_PROP_MIG_CAP("x-release-ram", MIGRATION_CAPABILITY_RELEASE_RAM),
+    DEFINE_PROP_MIG_CAP("x-block", MIGRATION_CAPABILITY_BLOCK),
+    DEFINE_PROP_MIG_CAP("x-return-path", MIGRATION_CAPABILITY_RETURN_PATH),
+
     DEFINE_PROP_END_OF_LIST(),
 };
 
@@ -2023,22 +2139,54 @@ static void migration_class_init(ObjectClass *klass, void *data)
 static void migration_instance_init(Object *obj)
 {
     MigrationState *ms = MIGRATION_OBJ(obj);
+    MigrationParameters *params = &ms->parameters;
 
     ms->state = MIGRATION_STATUS_NONE;
     ms->xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE;
     ms->mbps = -1;
-    ms->parameters = (MigrationParameters) {
-        .compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL,
-        .compress_threads = DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
-        .decompress_threads = DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT,
-        .cpu_throttle_initial = DEFAULT_MIGRATE_CPU_THROTTLE_INITIAL,
-        .cpu_throttle_increment = DEFAULT_MIGRATE_CPU_THROTTLE_INCREMENT,
-        .max_bandwidth = MAX_THROTTLE,
-        .downtime_limit = DEFAULT_MIGRATE_SET_DOWNTIME,
-        .x_checkpoint_delay = DEFAULT_MIGRATE_X_CHECKPOINT_DELAY,
-    };
-    ms->parameters.tls_creds = g_strdup("");
-    ms->parameters.tls_hostname = g_strdup("");
+
+    params->tls_hostname = g_strdup("");
+    params->tls_creds = g_strdup("");
+
+    /* Set has_* up only for parameter checks */
+    params->has_compress_level = true;
+    params->has_compress_threads = true;
+    params->has_decompress_threads = true;
+    params->has_cpu_throttle_initial = true;
+    params->has_cpu_throttle_increment = true;
+    params->has_max_bandwidth = true;
+    params->has_downtime_limit = true;
+    params->has_x_checkpoint_delay = true;
+    params->has_block_incremental = true;
+}
+
+/*
+ * Return true if check pass, false otherwise. Error will be put
+ * inside errp if provided.
+ */
+static bool migration_object_check(MigrationState *ms, Error **errp)
+{
+    MigrationCapabilityStatusList *head = NULL;
+    /* Assuming all off */
+    bool cap_list[MIGRATION_CAPABILITY__MAX] = { 0 }, ret;
+    int i;
+
+    if (!migrate_params_check(&ms->parameters, errp)) {
+        return false;
+    }
+
+    for (i = 0; i < MIGRATION_CAPABILITY__MAX; i++) {
+        if (ms->enabled_capabilities[i]) {
+            head = migrate_cap_add(head, i, true);
+        }
+    }
+
+    ret = migrate_caps_check(cap_list, head, errp);
+
+    /* It works with head == NULL */
+    qapi_free_MigrationCapabilityStatusList(head);
+
+    return ret;
 }
 
 static const TypeInfo migration_type = {
diff --git a/migration/rdma.c b/migration/rdma.c
index c6bc607a03..ca56594328 100644
--- a/migration/rdma.c
+++ b/migration/rdma.c
@@ -165,20 +165,6 @@ enum {
     RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
 };
 
-static const char *control_desc[] = {
-    [RDMA_CONTROL_NONE] = "NONE",
-    [RDMA_CONTROL_ERROR] = "ERROR",
-    [RDMA_CONTROL_READY] = "READY",
-    [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
-    [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
-    [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
-    [RDMA_CONTROL_COMPRESS] = "COMPRESS",
-    [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
-    [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
-    [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
-    [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
-    [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
-};
 
 /*
  * Memory and MR structures used to represent an IB Send/Recv work request.
@@ -251,6 +237,30 @@ typedef struct QEMU_PACKED RDMADestBlock {
     uint32_t padding;
 } RDMADestBlock;
 
+static const char *control_desc(unsigned int rdma_control)
+{
+    static const char *strs[] = {
+        [RDMA_CONTROL_NONE] = "NONE",
+        [RDMA_CONTROL_ERROR] = "ERROR",
+        [RDMA_CONTROL_READY] = "READY",
+        [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
+        [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
+        [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
+        [RDMA_CONTROL_COMPRESS] = "COMPRESS",
+        [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
+        [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
+        [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
+        [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
+        [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
+    };
+
+    if (rdma_control > RDMA_CONTROL_UNREGISTER_FINISHED) {
+        return "??BAD CONTROL VALUE??";
+    }
+
+    return strs[rdma_control];
+}
+
 static uint64_t htonll(uint64_t v)
 {
     union { uint32_t lv[2]; uint64_t llv; } u;
@@ -1466,6 +1476,56 @@ static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
     return  0;
 }
 
+/* Wait for activity on the completion channel.
+ * Returns 0 on success, none-0 on error.
+ */
+static int qemu_rdma_wait_comp_channel(RDMAContext *rdma)
+{
+    /*
+     * Coroutine doesn't start until migration_fd_process_incoming()
+     * so don't yield unless we know we're running inside of a coroutine.
+     */
+    if (rdma->migration_started_on_destination) {
+        yield_until_fd_readable(rdma->comp_channel->fd);
+    } else {
+        /* This is the source side, we're in a separate thread
+         * or destination prior to migration_fd_process_incoming()
+         * we can't yield; so we have to poll the fd.
+         * But we need to be able to handle 'cancel' or an error
+         * without hanging forever.
+         */
+        while (!rdma->error_state  && !rdma->received_error) {
+            GPollFD pfds[1];
+            pfds[0].fd = rdma->comp_channel->fd;
+            pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
+            /* 0.1s timeout, should be fine for a 'cancel' */
+            switch (qemu_poll_ns(pfds, 1, 100 * 1000 * 1000)) {
+            case 1: /* fd active */
+                return 0;
+
+            case 0: /* Timeout, go around again */
+                break;
+
+            default: /* Error of some type -
+                      * I don't trust errno from qemu_poll_ns
+                     */
+                error_report("%s: poll failed", __func__);
+                return -EPIPE;
+            }
+
+            if (migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) {
+                /* Bail out and let the cancellation happen */
+                return -EPIPE;
+            }
+        }
+    }
+
+    if (rdma->received_error) {
+        return -EPIPE;
+    }
+    return rdma->error_state;
+}
+
 /*
  * Block until the next work request has completed.
  *
@@ -1513,22 +1573,21 @@ static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
     }
 
     while (1) {
-        /*
-         * Coroutine doesn't start until migration_fd_process_incoming()
-         * so don't yield unless we know we're running inside of a coroutine.
-         */
-        if (rdma->migration_started_on_destination) {
-            yield_until_fd_readable(rdma->comp_channel->fd);
+        ret = qemu_rdma_wait_comp_channel(rdma);
+        if (ret) {
+            goto err_block_for_wrid;
         }
 
-        if (ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx)) {
+        ret = ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx);
+        if (ret) {
             perror("ibv_get_cq_event");
             goto err_block_for_wrid;
         }
 
         num_cq_events++;
 
-        if (ibv_req_notify_cq(cq, 0)) {
+        ret = -ibv_req_notify_cq(cq, 0);
+        if (ret) {
             goto err_block_for_wrid;
         }
 
@@ -1564,6 +1623,8 @@ err_block_for_wrid:
     if (num_cq_events) {
         ibv_ack_cq_events(cq, num_cq_events);
     }
+
+    rdma->error_state = ret;
     return ret;
 }
 
@@ -1590,7 +1651,7 @@ static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
                                    .num_sge = 1,
                                 };
 
-    trace_qemu_rdma_post_send_control(control_desc[head->type]);
+    trace_qemu_rdma_post_send_control(control_desc(head->type));
 
     /*
      * We don't actually need to do a memcpy() in here if we used
@@ -1669,16 +1730,16 @@ static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
     network_to_control((void *) rdma->wr_data[idx].control);
     memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
 
-    trace_qemu_rdma_exchange_get_response_start(control_desc[expecting]);
+    trace_qemu_rdma_exchange_get_response_start(control_desc(expecting));
 
     if (expecting == RDMA_CONTROL_NONE) {
-        trace_qemu_rdma_exchange_get_response_none(control_desc[head->type],
+        trace_qemu_rdma_exchange_get_response_none(control_desc(head->type),
                                              head->type);
     } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
         error_report("Was expecting a %s (%d) control message"
                 ", but got: %s (%d), length: %d",
-                control_desc[expecting], expecting,
-                control_desc[head->type], head->type, head->len);
+                control_desc(expecting), expecting,
+                control_desc(head->type), head->type, head->len);
         if (head->type == RDMA_CONTROL_ERROR) {
             rdma->received_error = true;
         }
@@ -1788,7 +1849,7 @@ static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
             }
         }
 
-        trace_qemu_rdma_exchange_send_waiting(control_desc[resp->type]);
+        trace_qemu_rdma_exchange_send_waiting(control_desc(resp->type));
         ret = qemu_rdma_exchange_get_response(rdma, resp,
                                               resp->type, RDMA_WRID_DATA);
 
@@ -1800,7 +1861,7 @@ static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
         if (resp_idx) {
             *resp_idx = RDMA_WRID_DATA;
         }
-        trace_qemu_rdma_exchange_send_received(control_desc[resp->type]);
+        trace_qemu_rdma_exchange_send_received(control_desc(resp->type));
     }
 
     rdma->control_ready_expected = 1;
@@ -2208,7 +2269,9 @@ static void qemu_rdma_cleanup(RDMAContext *rdma)
     int ret, idx;
 
     if (rdma->cm_id && rdma->connected) {
-        if (rdma->error_state && !rdma->received_error) {
+        if ((rdma->error_state ||
+             migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) &&
+            !rdma->received_error) {
             RDMAControlHeader head = { .len = 0,
                                        .type = RDMA_CONTROL_ERROR,
                                        .repeat = 1,
@@ -2365,6 +2428,12 @@ static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
 
     caps_to_network(&cap);
 
+    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
+    if (ret) {
+        ERROR(errp, "posting second control recv");
+        goto err_rdma_source_connect;
+    }
+
     ret = rdma_connect(rdma->cm_id, &conn_param);
     if (ret) {
         perror("rdma_connect");
@@ -2405,12 +2474,6 @@ static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
 
     rdma_ack_cm_event(cm_event);
 
-    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
-    if (ret) {
-        ERROR(errp, "posting second control recv!");
-        goto err_rdma_source_connect;
-    }
-
     rdma->control_ready_expected = 1;
     rdma->nb_sent = 0;
     return 0;
@@ -3350,7 +3413,7 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
             ret = -EIO;
             goto out;
         default:
-            error_report("Unknown control message %s", control_desc[head.type]);
+            error_report("Unknown control message %s", control_desc(head.type));
             ret = -EIO;
             goto out;
         }