summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r-- block.c 96
-rw-r--r-- block/backup.c 2
-rw-r--r-- block/block-backend.c 5
-rw-r--r-- block/dirty-bitmap.c 57
-rw-r--r-- block/io.c 332
-rw-r--r-- block/mirror.c 613
-rw-r--r-- block/vvfat.c 1
-rw-r--r-- blockdev.c 9
-rw-r--r-- blockjob.c 23
-rw-r--r-- include/block/aio-wait.h 25
-rw-r--r-- include/block/block.h 31
-rw-r--r-- include/block/block_int.h 18
-rw-r--r-- include/block/blockjob_int.h 8
-rw-r--r-- include/block/dirty-bitmap.h 2
-rw-r--r-- include/qemu/hbitmap.h 5
-rw-r--r-- include/qemu/job.h 15
-rw-r--r-- job.c 5
-rw-r--r-- qapi/block-core.json 29
-rwxr-xr-x tests/qemu-iotests/151 120
-rw-r--r-- tests/qemu-iotests/151.out 5
-rw-r--r-- tests/qemu-iotests/group 1
-rw-r--r-- tests/test-bdrv-drain.c 705
-rw-r--r-- tests/test-hbitmap.c 38
-rw-r--r-- util/hbitmap.c 10
24 files changed, 1836 insertions, 319 deletions
diff --git a/block.c b/block.c
index afe30caac3..1b8147c1b3 100644
--- a/block.c
+++ b/block.c
@@ -333,6 +333,10 @@ BlockDriverState *bdrv_new(void)
 
     qemu_co_queue_init(&bs->flush_queue);
 
+    for (i = 0; i < bdrv_drain_all_count; i++) {
+        bdrv_drained_begin(bs);
+    }
+
     QTAILQ_INSERT_TAIL(&all_bdrv_states, bs, bs_list);
 
     return bs;
@@ -818,7 +822,13 @@ static char *bdrv_child_get_parent_desc(BdrvChild *c)
 static void bdrv_child_cb_drained_begin(BdrvChild *child)
 {
     BlockDriverState *bs = child->opaque;
-    bdrv_drained_begin(bs);
+    bdrv_do_drained_begin_quiesce(bs, NULL, false);
+}
+
+static bool bdrv_child_cb_drained_poll(BdrvChild *child)
+{
+    BlockDriverState *bs = child->opaque;
+    return bdrv_drain_poll(bs, false, NULL, false);
 }
 
 static void bdrv_child_cb_drained_end(BdrvChild *child)
@@ -902,9 +912,11 @@ static void bdrv_inherited_options(int *child_flags, QDict *child_options,
 }
 
 const BdrvChildRole child_file = {
+    .parent_is_bds   = true,
     .get_parent_desc = bdrv_child_get_parent_desc,
     .inherit_options = bdrv_inherited_options,
     .drained_begin   = bdrv_child_cb_drained_begin,
+    .drained_poll    = bdrv_child_cb_drained_poll,
     .drained_end     = bdrv_child_cb_drained_end,
     .attach          = bdrv_child_cb_attach,
     .detach          = bdrv_child_cb_detach,
@@ -926,9 +938,11 @@ static void bdrv_inherited_fmt_options(int *child_flags, QDict *child_options,
 }
 
 const BdrvChildRole child_format = {
+    .parent_is_bds   = true,
     .get_parent_desc = bdrv_child_get_parent_desc,
     .inherit_options = bdrv_inherited_fmt_options,
     .drained_begin   = bdrv_child_cb_drained_begin,
+    .drained_poll    = bdrv_child_cb_drained_poll,
     .drained_end     = bdrv_child_cb_drained_end,
     .attach          = bdrv_child_cb_attach,
     .detach          = bdrv_child_cb_detach,
@@ -1043,11 +1057,13 @@ static int bdrv_backing_update_filename(BdrvChild *c, BlockDriverState *base,
 }
 
 const BdrvChildRole child_backing = {
+    .parent_is_bds   = true,
     .get_parent_desc = bdrv_child_get_parent_desc,
     .attach          = bdrv_backing_attach,
     .detach          = bdrv_backing_detach,
     .inherit_options = bdrv_backing_options,
     .drained_begin   = bdrv_child_cb_drained_begin,
+    .drained_poll    = bdrv_child_cb_drained_poll,
     .drained_end     = bdrv_child_cb_drained_end,
     .inactivate      = bdrv_child_cb_inactivate,
     .update_filename = bdrv_backing_update_filename,
@@ -1152,7 +1168,7 @@ static int bdrv_open_driver(BlockDriverState *bs, BlockDriver *drv,
                             int open_flags, Error **errp)
 {
     Error *local_err = NULL;
-    int ret;
+    int i, ret;
 
     bdrv_assign_node_name(bs, node_name, &local_err);
     if (local_err) {
@@ -1200,6 +1216,12 @@ static int bdrv_open_driver(BlockDriverState *bs, BlockDriver *drv,
     assert(bdrv_min_mem_align(bs) != 0);
     assert(is_power_of_2(bs->bl.request_alignment));
 
+    for (i = 0; i < bs->quiesce_counter; i++) {
+        if (drv->bdrv_co_drain_begin) {
+            drv->bdrv_co_drain_begin(bs);
+        }
+    }
+
     return 0;
 open_failed:
     bs->drv = NULL;
@@ -2021,7 +2043,12 @@ static void bdrv_replace_child_noperm(BdrvChild *child,
             child->role->detach(child);
         }
         if (old_bs->quiesce_counter && child->role->drained_end) {
-            for (i = 0; i < old_bs->quiesce_counter; i++) {
+            int num = old_bs->quiesce_counter;
+            if (child->role->parent_is_bds) {
+                num -= bdrv_drain_all_count;
+            }
+            assert(num >= 0);
+            for (i = 0; i < num; i++) {
                 child->role->drained_end(child);
             }
         }
@@ -2033,7 +2060,12 @@ static void bdrv_replace_child_noperm(BdrvChild *child,
     if (new_bs) {
         QLIST_INSERT_HEAD(&new_bs->parents, child, next_parent);
         if (new_bs->quiesce_counter && child->role->drained_begin) {
-            for (i = 0; i < new_bs->quiesce_counter; i++) {
+            int num = new_bs->quiesce_counter;
+            if (child->role->parent_is_bds) {
+                num -= bdrv_drain_all_count;
+            }
+            assert(num >= 0);
+            for (i = 0; i < num; i++) {
                 child->role->drained_begin(child);
             }
         }
@@ -3395,16 +3427,39 @@ static bool should_update_child(BdrvChild *c, BlockDriverState *to)
         return false;
     }
 
-    if (c->role == &child_backing) {
-        /* If @from is a backing file of @to, ignore the child to avoid
-         * creating a loop. We only want to change the pointer of other
-         * parents. */
-        QLIST_FOREACH(to_c, &to->children, next) {
-            if (to_c == c) {
-                break;
-            }
-        }
-        if (to_c) {
+    /* If the child @c belongs to the BDS @to, replacing the current
+     * c->bs by @to would mean to create a loop.
+     *
+     * Such a case occurs when appending a BDS to a backing chain.
+     * For instance, imagine the following chain:
+     *
+     *   guest device -> node A -> further backing chain...
+     *
+     * Now we create a new BDS B which we want to put on top of this
+     * chain, so we first attach A as its backing node:
+     *
+     *                   node B
+     *                     |
+     *                     v
+     *   guest device -> node A -> further backing chain...
+     *
+     * Finally we want to replace A by B.  When doing that, we want to
+     * replace all pointers to A by pointers to B -- except for the
+     * pointer from B because (1) that would create a loop, and (2)
+     * that pointer should simply stay intact:
+     *
+     *   guest device -> node B
+     *                     |
+     *                     v
+     *                   node A -> further backing chain...
+     *
+     * In general, when replacing a node A (c->bs) by a node B (@to),
+     * if A is a child of B, that means we cannot replace A by B there
+     * because that would create a loop.  Silently detaching A from B
+     * is also not really an option.  So overall just leaving A in
+     * place there is the most sensible choice. */
+    QLIST_FOREACH(to_c, &to->children, next) {
+        if (to_c == c) {
             return false;
         }
     }
@@ -3430,6 +3485,7 @@ void bdrv_replace_node(BlockDriverState *from, BlockDriverState *to,
 
     /* Put all parents into @list and calculate their cumulative permissions */
     QLIST_FOREACH_SAFE(c, &from->parents, next_parent, next) {
+        assert(c->bs == from);
         if (!should_update_child(c, to)) {
             continue;
         }
@@ -4037,6 +4093,14 @@ BlockDriverState *bdrv_next_node(BlockDriverState *bs)
     return QTAILQ_NEXT(bs, node_list);
 }
 
+BlockDriverState *bdrv_next_all_states(BlockDriverState *bs)
+{
+    if (!bs) {
+        return QTAILQ_FIRST(&all_bdrv_states);
+    }
+    return QTAILQ_NEXT(bs, bs_list);
+}
+
 const char *bdrv_get_node_name(const BlockDriverState *bs)
 {
     return bs->node_name;
@@ -4948,7 +5012,7 @@ void bdrv_set_aio_context(BlockDriverState *bs, AioContext *new_context)
     AioContext *ctx = bdrv_get_aio_context(bs);
 
     aio_disable_external(ctx);
-    bdrv_parent_drained_begin(bs, NULL);
+    bdrv_parent_drained_begin(bs, NULL, false);
     bdrv_drain(bs); /* ensure there are no in-flight requests */
 
     while (aio_poll(ctx, false)) {
@@ -4962,7 +5026,7 @@ void bdrv_set_aio_context(BlockDriverState *bs, AioContext *new_context)
      */
     aio_context_acquire(new_context);
     bdrv_attach_aio_context(bs, new_context);
-    bdrv_parent_drained_end(bs, NULL);
+    bdrv_parent_drained_end(bs, NULL, false);
     aio_enable_external(ctx);
     aio_context_release(new_context);
 }
diff --git a/block/backup.c b/block/backup.c
index 5661435675..d18be40caf 100644
--- a/block/backup.c
+++ b/block/backup.c
@@ -354,7 +354,7 @@ static int coroutine_fn backup_run_incremental(BackupBlockJob *job)
     HBitmapIter hbi;
 
     hbitmap_iter_init(&hbi, job->copy_bitmap, 0);
-    while ((cluster = hbitmap_iter_next(&hbi)) != -1) {
+    while ((cluster = hbitmap_iter_next(&hbi, true)) != -1) {
         do {
             if (yield_and_check(job)) {
                 return 0;
diff --git a/block/block-backend.c b/block/block-backend.c
index 2d1a3463e8..6b75bca317 100644
--- a/block/block-backend.c
+++ b/block/block-backend.c
@@ -767,6 +767,11 @@ void blk_remove_bs(BlockBackend *blk)
 
     blk_update_root_state(blk);
 
+    /* bdrv_root_unref_child() will cause blk->root to become stale and may
+     * switch to a completion coroutine later on. Let's drain all I/O here
+     * to avoid that and a potential QEMU crash.
+     */
+    blk_drain(blk);
     bdrv_root_unref_child(blk->root);
     blk->root = NULL;
 }
diff --git a/block/dirty-bitmap.c b/block/dirty-bitmap.c
index 383d742cdb..db1782ec1f 100644
--- a/block/dirty-bitmap.c
+++ b/block/dirty-bitmap.c
@@ -519,7 +519,62 @@ void bdrv_dirty_iter_free(BdrvDirtyBitmapIter *iter)
 
 int64_t bdrv_dirty_iter_next(BdrvDirtyBitmapIter *iter)
 {
-    return hbitmap_iter_next(&iter->hbi);
+    return hbitmap_iter_next(&iter->hbi, true);
+}
+
+/**
+ * Return the next consecutively dirty area in the dirty bitmap
+ * belonging to the given iterator @iter.
+ *
+ * @max_offset: Maximum value that may be returned for
+ *              *offset + *bytes
+ * @offset:     Will contain the start offset of the next dirty area
+ * @bytes:      Will contain the length of the next dirty area
+ *
+ * Returns: True if a dirty area could be found before max_offset
+ *          (which means that *offset and *bytes then contain valid
+ *          values), false otherwise.
+ *
+ * Note that @iter is never advanced if false is returned.  If an area
+ * is found (which means that true is returned), it will be advanced
+ * past that area.
+ */
+bool bdrv_dirty_iter_next_area(BdrvDirtyBitmapIter *iter, uint64_t max_offset,
+                               uint64_t *offset, int *bytes)
+{
+    uint32_t granularity = bdrv_dirty_bitmap_granularity(iter->bitmap);
+    uint64_t gran_max_offset;
+    int64_t ret;
+    int size;
+
+    if (max_offset == iter->bitmap->size) {
+        /* If max_offset points to the image end, round it up by the
+         * bitmap granularity */
+        gran_max_offset = ROUND_UP(max_offset, granularity);
+    } else {
+        gran_max_offset = max_offset;
+    }
+
+    ret = hbitmap_iter_next(&iter->hbi, false);
+    if (ret < 0 || ret + granularity > gran_max_offset) {
+        return false;
+    }
+
+    *offset = ret;
+    size = 0;
+
+    assert(granularity <= INT_MAX);
+
+    do {
+        /* Advance iterator */
+        ret = hbitmap_iter_next(&iter->hbi, true);
+        size += granularity;
+    } while (ret + granularity <= gran_max_offset &&
+             hbitmap_iter_next(&iter->hbi, false) == ret + granularity &&
+             size <= INT_MAX - granularity);
+
+    *bytes = MIN(size, max_offset - *offset);
+    return true;
 }
 
 /* Called within bdrv_dirty_bitmap_lock..unlock */
diff --git a/block/io.c b/block/io.c
index b7beaeeb9f..ef4fedd364 100644
--- a/block/io.c
+++ b/block/io.c
@@ -38,15 +38,18 @@
 /* Maximum bounce buffer for copy-on-read and write zeroes, in bytes */
 #define MAX_BOUNCE_BUFFER (32768 << BDRV_SECTOR_BITS)
 
+static AioWait drain_all_aio_wait;
+
 static int coroutine_fn bdrv_co_do_pwrite_zeroes(BlockDriverState *bs,
     int64_t offset, int bytes, BdrvRequestFlags flags);
 
-void bdrv_parent_drained_begin(BlockDriverState *bs, BdrvChild *ignore)
+void bdrv_parent_drained_begin(BlockDriverState *bs, BdrvChild *ignore,
+                               bool ignore_bds_parents)
 {
     BdrvChild *c, *next;
 
     QLIST_FOREACH_SAFE(c, &bs->parents, next_parent, next) {
-        if (c == ignore) {
+        if (c == ignore || (ignore_bds_parents && c->role->parent_is_bds)) {
             continue;
         }
         if (c->role->drained_begin) {
@@ -55,12 +58,13 @@ void bdrv_parent_drained_begin(BlockDriverState *bs, BdrvChild *ignore)
     }
 }
 
-void bdrv_parent_drained_end(BlockDriverState *bs, BdrvChild *ignore)
+void bdrv_parent_drained_end(BlockDriverState *bs, BdrvChild *ignore,
+                             bool ignore_bds_parents)
 {
     BdrvChild *c, *next;
 
     QLIST_FOREACH_SAFE(c, &bs->parents, next_parent, next) {
-        if (c == ignore) {
+        if (c == ignore || (ignore_bds_parents && c->role->parent_is_bds)) {
             continue;
         }
         if (c->role->drained_end) {
@@ -69,6 +73,24 @@ void bdrv_parent_drained_end(BlockDriverState *bs, BdrvChild *ignore)
     }
 }
 
+static bool bdrv_parent_drained_poll(BlockDriverState *bs, BdrvChild *ignore,
+                                     bool ignore_bds_parents)
+{
+    BdrvChild *c, *next;
+    bool busy = false;
+
+    QLIST_FOREACH_SAFE(c, &bs->parents, next_parent, next) {
+        if (c == ignore || (ignore_bds_parents && c->role->parent_is_bds)) {
+            continue;
+        }
+        if (c->role->drained_poll) {
+            busy |= c->role->drained_poll(c);
+        }
+    }
+
+    return busy;
+}
+
 static void bdrv_merge_limits(BlockLimits *dst, const BlockLimits *src)
 {
     dst->opt_transfer = MAX(dst->opt_transfer, src->opt_transfer);
@@ -148,7 +170,9 @@ typedef struct {
     bool done;
     bool begin;
     bool recursive;
+    bool poll;
     BdrvChild *parent;
+    bool ignore_bds_parents;
 } BdrvCoDrainData;
 
 static void coroutine_fn bdrv_drain_invoke_entry(void *opaque)
@@ -164,67 +188,83 @@ static void coroutine_fn bdrv_drain_invoke_entry(void *opaque)
 
     /* Set data->done before reading bs->wakeup.  */
     atomic_mb_set(&data->done, true);
-    bdrv_wakeup(bs);
+    bdrv_dec_in_flight(bs);
+
+    if (data->begin) {
+        g_free(data);
+    }
 }
 
 /* Recursively call BlockDriver.bdrv_co_drain_begin/end callbacks */
-static void bdrv_drain_invoke(BlockDriverState *bs, bool begin, bool recursive)
+static void bdrv_drain_invoke(BlockDriverState *bs, bool begin)
 {
-    BdrvChild *child, *tmp;
-    BdrvCoDrainData data = { .bs = bs, .done = false, .begin = begin};
+    BdrvCoDrainData *data;
 
     if (!bs->drv || (begin && !bs->drv->bdrv_co_drain_begin) ||
             (!begin && !bs->drv->bdrv_co_drain_end)) {
         return;
     }
 
-    data.co = qemu_coroutine_create(bdrv_drain_invoke_entry, &data);
-    bdrv_coroutine_enter(bs, data.co);
-    BDRV_POLL_WHILE(bs, !data.done);
+    data = g_new(BdrvCoDrainData, 1);
+    *data = (BdrvCoDrainData) {
+        .bs = bs,
+        .done = false,
+        .begin = begin
+    };
 
-    if (recursive) {
-        QLIST_FOREACH_SAFE(child, &bs->children, next, tmp) {
-            bdrv_drain_invoke(child->bs, begin, true);
-        }
+    /* Make sure the driver callback completes during the polling phase for
+     * drain_begin. */
+    bdrv_inc_in_flight(bs);
+    data->co = qemu_coroutine_create(bdrv_drain_invoke_entry, data);
+    aio_co_schedule(bdrv_get_aio_context(bs), data->co);
+
+    if (!begin) {
+        BDRV_POLL_WHILE(bs, !data->done);
+        g_free(data);
     }
 }
 
-static bool bdrv_drain_recurse(BlockDriverState *bs)
+/* Returns true if BDRV_POLL_WHILE() should go into a blocking aio_poll() */
+bool bdrv_drain_poll(BlockDriverState *bs, bool recursive,
+                     BdrvChild *ignore_parent, bool ignore_bds_parents)
 {
-    BdrvChild *child, *tmp;
-    bool waited;
+    BdrvChild *child, *next;
 
-    /* Wait for drained requests to finish */
-    waited = BDRV_POLL_WHILE(bs, atomic_read(&bs->in_flight) > 0);
-
-    QLIST_FOREACH_SAFE(child, &bs->children, next, tmp) {
-        BlockDriverState *bs = child->bs;
-        bool in_main_loop =
-            qemu_get_current_aio_context() == qemu_get_aio_context();
-        assert(bs->refcnt > 0);
-        if (in_main_loop) {
-            /* In case the recursive bdrv_drain_recurse processes a
-             * block_job_defer_to_main_loop BH and modifies the graph,
-             * let's hold a reference to bs until we are done.
-             *
-             * IOThread doesn't have such a BH, and it is not safe to call
-             * bdrv_unref without BQL, so skip doing it there.
-             */
-            bdrv_ref(bs);
-        }
-        waited |= bdrv_drain_recurse(bs);
-        if (in_main_loop) {
-            bdrv_unref(bs);
+    if (bdrv_parent_drained_poll(bs, ignore_parent, ignore_bds_parents)) {
+        return true;
+    }
+
+    if (atomic_read(&bs->in_flight)) {
+        return true;
+    }
+
+    if (recursive) {
+        assert(!ignore_bds_parents);
+        QLIST_FOREACH_SAFE(child, &bs->children, next, next) {
+            if (bdrv_drain_poll(child->bs, recursive, child, false)) {
+                return true;
+            }
         }
     }
 
-    return waited;
+    return false;
+}
+
+static bool bdrv_drain_poll_top_level(BlockDriverState *bs, bool recursive,
+                                      BdrvChild *ignore_parent)
+{
+    /* Execute pending BHs first and check everything else only after the BHs
+     * have executed. */
+    while (aio_poll(bs->aio_context, false));
+
+    return bdrv_drain_poll(bs, recursive, ignore_parent, false);
 }
 
 static void bdrv_do_drained_begin(BlockDriverState *bs, bool recursive,
-                                  BdrvChild *parent);
+                                  BdrvChild *parent, bool ignore_bds_parents,
+                                  bool poll);
 static void bdrv_do_drained_end(BlockDriverState *bs, bool recursive,
-                                BdrvChild *parent);
+                                BdrvChild *parent, bool ignore_bds_parents);
 
 static void bdrv_co_drain_bh_cb(void *opaque)
 {
@@ -232,11 +272,18 @@ static void bdrv_co_drain_bh_cb(void *opaque)
     Coroutine *co = data->co;
     BlockDriverState *bs = data->bs;
 
-    bdrv_dec_in_flight(bs);
-    if (data->begin) {
-        bdrv_do_drained_begin(bs, data->recursive, data->parent);
+    if (bs) {
+        bdrv_dec_in_flight(bs);
+        if (data->begin) {
+            bdrv_do_drained_begin(bs, data->recursive, data->parent,
+                                  data->ignore_bds_parents, data->poll);
+        } else {
+            bdrv_do_drained_end(bs, data->recursive, data->parent,
+                                data->ignore_bds_parents);
+        }
     } else {
-        bdrv_do_drained_end(bs, data->recursive, data->parent);
+        assert(data->begin);
+        bdrv_drain_all_begin();
     }
 
     data->done = true;
@@ -245,7 +292,9 @@ static void bdrv_co_drain_bh_cb(void *opaque)
 
 static void coroutine_fn bdrv_co_yield_to_drain(BlockDriverState *bs,
                                                 bool begin, bool recursive,
-                                                BdrvChild *parent)
+                                                BdrvChild *parent,
+                                                bool ignore_bds_parents,
+                                                bool poll)
 {
     BdrvCoDrainData data;
 
@@ -260,8 +309,12 @@ static void coroutine_fn bdrv_co_yield_to_drain(BlockDriverState *bs,
         .begin = begin,
         .recursive = recursive,
         .parent = parent,
+        .ignore_bds_parents = ignore_bds_parents,
+        .poll = poll,
     };
-    bdrv_inc_in_flight(bs);
+    if (bs) {
+        bdrv_inc_in_flight(bs);
+    }
     aio_bh_schedule_oneshot(bdrv_get_aio_context(bs),
                             bdrv_co_drain_bh_cb, &data);
 
@@ -271,79 +324,106 @@ static void coroutine_fn bdrv_co_yield_to_drain(BlockDriverState *bs,
     assert(data.done);
 }
 
-void bdrv_do_drained_begin(BlockDriverState *bs, bool recursive,
-                           BdrvChild *parent)
+void bdrv_do_drained_begin_quiesce(BlockDriverState *bs,
+                                   BdrvChild *parent, bool ignore_bds_parents)
 {
-    BdrvChild *child, *next;
-
-    if (qemu_in_coroutine()) {
-        bdrv_co_yield_to_drain(bs, true, recursive, parent);
-        return;
-    }
+    assert(!qemu_in_coroutine());
 
     /* Stop things in parent-to-child order */
     if (atomic_fetch_inc(&bs->quiesce_counter) == 0) {
         aio_disable_external(bdrv_get_aio_context(bs));
     }
 
-    bdrv_parent_drained_begin(bs, parent);
-    bdrv_drain_invoke(bs, true, false);
-    bdrv_drain_recurse(bs);
+    bdrv_parent_drained_begin(bs, parent, ignore_bds_parents);
+    bdrv_drain_invoke(bs, true);
+}
+
+static void bdrv_do_drained_begin(BlockDriverState *bs, bool recursive,
+                                  BdrvChild *parent, bool ignore_bds_parents,
+                                  bool poll)
+{
+    BdrvChild *child, *next;
+
+    if (qemu_in_coroutine()) {
+        bdrv_co_yield_to_drain(bs, true, recursive, parent, ignore_bds_parents,
+                               poll);
+        return;
+    }
+
+    bdrv_do_drained_begin_quiesce(bs, parent, ignore_bds_parents);
 
     if (recursive) {
+        assert(!ignore_bds_parents);
         bs->recursive_quiesce_counter++;
         QLIST_FOREACH_SAFE(child, &bs->children, next, next) {
-            bdrv_do_drained_begin(child->bs, true, child);
+            bdrv_do_drained_begin(child->bs, true, child, ignore_bds_parents,
+                                  false);
         }
     }
+
+    /*
+     * Wait for drained requests to finish.
+     *
+     * Calling BDRV_POLL_WHILE() only once for the top-level node is okay: The
+     * call is needed so things in this AioContext can make progress even
+     * though we don't return to the main AioContext loop - this automatically
+     * includes other nodes in the same AioContext and therefore all child
+     * nodes.
+     */
+    if (poll) {
+        assert(!ignore_bds_parents);
+        BDRV_POLL_WHILE(bs, bdrv_drain_poll_top_level(bs, recursive, parent));
+    }
 }
 
 void bdrv_drained_begin(BlockDriverState *bs)
 {
-    bdrv_do_drained_begin(bs, false, NULL);
+    bdrv_do_drained_begin(bs, false, NULL, false, true);
 }
 
 void bdrv_subtree_drained_begin(BlockDriverState *bs)
 {
-    bdrv_do_drained_begin(bs, true, NULL);
+    bdrv_do_drained_begin(bs, true, NULL, false, true);
 }
 
-void bdrv_do_drained_end(BlockDriverState *bs, bool recursive,
-                         BdrvChild *parent)
+static void bdrv_do_drained_end(BlockDriverState *bs, bool recursive,
+                                BdrvChild *parent, bool ignore_bds_parents)
 {
     BdrvChild *child, *next;
     int old_quiesce_counter;
 
     if (qemu_in_coroutine()) {
-        bdrv_co_yield_to_drain(bs, false, recursive, parent);
+        bdrv_co_yield_to_drain(bs, false, recursive, parent, ignore_bds_parents,
+                               false);
         return;
     }
     assert(bs->quiesce_counter > 0);
     old_quiesce_counter = atomic_fetch_dec(&bs->quiesce_counter);
 
     /* Re-enable things in child-to-parent order */
-    bdrv_drain_invoke(bs, false, false);
-    bdrv_parent_drained_end(bs, parent);
+    bdrv_drain_invoke(bs, false);
+    bdrv_parent_drained_end(bs, parent, ignore_bds_parents);
     if (old_quiesce_counter == 1) {
         aio_enable_external(bdrv_get_aio_context(bs));
     }
 
     if (recursive) {
+        assert(!ignore_bds_parents);
         bs->recursive_quiesce_counter--;
         QLIST_FOREACH_SAFE(child, &bs->children, next, next) {
-            bdrv_do_drained_end(child->bs, true, child);
+            bdrv_do_drained_end(child->bs, true, child, ignore_bds_parents);
         }
     }
 }
 
 void bdrv_drained_end(BlockDriverState *bs)
 {
-    bdrv_do_drained_end(bs, false, NULL);
+    bdrv_do_drained_end(bs, false, NULL, false);
 }
 
 void bdrv_subtree_drained_end(BlockDriverState *bs)
 {
-    bdrv_do_drained_end(bs, true, NULL);
+    bdrv_do_drained_end(bs, true, NULL, false);
 }
 
 void bdrv_apply_subtree_drain(BdrvChild *child, BlockDriverState *new_parent)
@@ -351,7 +431,7 @@ void bdrv_apply_subtree_drain(BdrvChild *child, BlockDriverState *new_parent)
     int i;
 
     for (i = 0; i < new_parent->recursive_quiesce_counter; i++) {
-        bdrv_do_drained_begin(child->bs, true, child);
+        bdrv_do_drained_begin(child->bs, true, child, false, true);
     }
 }
 
@@ -360,7 +440,7 @@ void bdrv_unapply_subtree_drain(BdrvChild *child, BlockDriverState *old_parent)
     int i;
 
     for (i = 0; i < old_parent->recursive_quiesce_counter; i++) {
-        bdrv_do_drained_end(child->bs, true, child);
+        bdrv_do_drained_end(child->bs, true, child, false);
     }
 }
 
@@ -370,10 +450,6 @@ void bdrv_unapply_subtree_drain(BdrvChild *child, BlockDriverState *old_parent)
  *
  * Note that unlike bdrv_drain_all(), the caller must hold the BlockDriverState
  * AioContext.
- *
- * Only this BlockDriverState's AioContext is run, so in-flight requests must
- * not depend on events in other AioContexts.  In that case, use
- * bdrv_drain_all() instead.
  */
 void coroutine_fn bdrv_co_drain(BlockDriverState *bs)
 {
@@ -388,6 +464,39 @@ void bdrv_drain(BlockDriverState *bs)
     bdrv_drained_end(bs);
 }
 
+static void bdrv_drain_assert_idle(BlockDriverState *bs)
+{
+    BdrvChild *child, *next;
+
+    assert(atomic_read(&bs->in_flight) == 0);
+    QLIST_FOREACH_SAFE(child, &bs->children, next, next) {
+        bdrv_drain_assert_idle(child->bs);
+    }
+}
+
+unsigned int bdrv_drain_all_count = 0;
+
+static bool bdrv_drain_all_poll(void)
+{
+    BlockDriverState *bs = NULL;
+    bool result = false;
+
+    /* Execute pending BHs first (may modify the graph) and check everything
+     * else only after the BHs have executed. */
+    while (aio_poll(qemu_get_aio_context(), false));
+
+    /* bdrv_drain_poll() can't make changes to the graph and we are holding the
+     * main AioContext lock, so iterating bdrv_next_all_states() is safe. */
+    while ((bs = bdrv_next_all_states(bs))) {
+        AioContext *aio_context = bdrv_get_aio_context(bs);
+        aio_context_acquire(aio_context);
+        result |= bdrv_drain_poll(bs, false, NULL, true);
+        aio_context_release(aio_context);
+    }
+
+    return result;
+}
+
 /*
  * Wait for pending requests to complete across all BlockDriverStates
  *
@@ -402,73 +511,51 @@ void bdrv_drain(BlockDriverState *bs)
  */
 void bdrv_drain_all_begin(void)
 {
-    /* Always run first iteration so any pending completion BHs run */
-    bool waited = true;
-    BlockDriverState *bs;
-    BdrvNextIterator it;
-    GSList *aio_ctxs = NULL, *ctx;
+    BlockDriverState *bs = NULL;
+
+    if (qemu_in_coroutine()) {
+        bdrv_co_yield_to_drain(NULL, true, false, NULL, true, true);
+        return;
+    }
 
-    /* BDRV_POLL_WHILE() for a node can only be called from its own I/O thread
-     * or the main loop AioContext. We potentially use BDRV_POLL_WHILE() on
-     * nodes in several different AioContexts, so make sure we're in the main
-     * context. */
+    /* AIO_WAIT_WHILE() with a NULL context can only be called from the main
+     * loop AioContext, so make sure we're in the main context. */
     assert(qemu_get_current_aio_context() == qemu_get_aio_context());
+    assert(bdrv_drain_all_count < INT_MAX);
+    bdrv_drain_all_count++;
 
-    for (bs = bdrv_first(&it); bs; bs = bdrv_next(&it)) {
+    /* Quiesce all nodes, without polling in-flight requests yet. The graph
+     * cannot change during this loop. */
+    while ((bs = bdrv_next_all_states(bs))) {
         AioContext *aio_context = bdrv_get_aio_context(bs);
 
-        /* Stop things in parent-to-child order */
         aio_context_acquire(aio_context);
-        aio_disable_external(aio_context);
-        bdrv_parent_drained_begin(bs, NULL);
-        bdrv_drain_invoke(bs, true, true);
+        bdrv_do_drained_begin(bs, false, NULL, true, false);
         aio_context_release(aio_context);
-
-        if (!g_slist_find(aio_ctxs, aio_context)) {
-            aio_ctxs = g_slist_prepend(aio_ctxs, aio_context);
-        }
     }
 
-    /* Note that completion of an asynchronous I/O operation can trigger any
-     * number of other I/O operations on other devices---for example a
-     * coroutine can submit an I/O request to another device in response to
-     * request completion.  Therefore we must keep looping until there was no
-     * more activity rather than simply draining each device independently.
-     */
-    while (waited) {
-        waited = false;
-
-        for (ctx = aio_ctxs; ctx != NULL; ctx = ctx->next) {
-            AioContext *aio_context = ctx->data;
+    /* Now poll the in-flight requests */
+    AIO_WAIT_WHILE(&drain_all_aio_wait, NULL, bdrv_drain_all_poll());
 
-            aio_context_acquire(aio_context);
-            for (bs = bdrv_first(&it); bs; bs = bdrv_next(&it)) {
-                if (aio_context == bdrv_get_aio_context(bs)) {
-                    waited |= bdrv_drain_recurse(bs);
-                }
-            }
-            aio_context_release(aio_context);
-        }
+    while ((bs = bdrv_next_all_states(bs))) {
+        bdrv_drain_assert_idle(bs);
     }
-
-    g_slist_free(aio_ctxs);
 }
 
 void bdrv_drain_all_end(void)
 {
-    BlockDriverState *bs;
-    BdrvNextIterator it;
+    BlockDriverState *bs = NULL;
 
-    for (bs = bdrv_first(&it); bs; bs = bdrv_next(&it)) {
+    while ((bs = bdrv_next_all_states(bs))) {
         AioContext *aio_context = bdrv_get_aio_context(bs);
 
-        /* Re-enable things in child-to-parent order */
         aio_context_acquire(aio_context);
-        bdrv_drain_invoke(bs, false, true);
-        bdrv_parent_drained_end(bs, NULL);
-        aio_enable_external(aio_context);
+        bdrv_do_drained_end(bs, false, NULL, true);
         aio_context_release(aio_context);
     }
+
+    assert(bdrv_drain_all_count > 0);
+    bdrv_drain_all_count--;
 }
 
 void bdrv_drain_all(void)
@@ -591,6 +678,7 @@ void bdrv_inc_in_flight(BlockDriverState *bs)
 void bdrv_wakeup(BlockDriverState *bs)
 {
     aio_wait_kick(bdrv_get_aio_wait(bs));
+    aio_wait_kick(&drain_all_aio_wait);
 }
 
 void bdrv_dec_in_flight(BlockDriverState *bs)
diff --git a/block/mirror.c b/block/mirror.c
index 435268bbbf..61bd9f3cf1 100644
--- a/block/mirror.c
+++ b/block/mirror.c
@@ -13,6 +13,8 @@
 
 #include "qemu/osdep.h"
 #include "qemu/cutils.h"
+#include "qemu/coroutine.h"
+#include "qemu/range.h"
 #include "trace.h"
 #include "block/blockjob_int.h"
 #include "block/block_int.h"
@@ -33,11 +35,12 @@ typedef struct MirrorBuffer {
     QSIMPLEQ_ENTRY(MirrorBuffer) next;
 } MirrorBuffer;
 
+typedef struct MirrorOp MirrorOp;
+
 typedef struct MirrorBlockJob {
     BlockJob common;
     BlockBackend *target;
     BlockDriverState *mirror_top_bs;
-    BlockDriverState *source;
     BlockDriverState *base;
 
     /* The name of the graph node to replace */
@@ -48,8 +51,12 @@ typedef struct MirrorBlockJob {
     Error *replace_blocker;
     bool is_none_mode;
     BlockMirrorBackingMode backing_mode;
+    MirrorCopyMode copy_mode;
     BlockdevOnError on_source_error, on_target_error;
     bool synced;
+    /* Set when the target is synced (dirty bitmap is clean, nothing
+     * in flight) and the job is running in active mode */
+    bool actively_synced;
     bool should_complete;
     int64_t granularity;
     size_t buf_size;
@@ -65,25 +72,47 @@ typedef struct MirrorBlockJob {
     unsigned long *in_flight_bitmap;
     int in_flight;
     int64_t bytes_in_flight;
+    QTAILQ_HEAD(MirrorOpList, MirrorOp) ops_in_flight;
     int ret;
     bool unmap;
-    bool waiting_for_io;
     int target_cluster_size;
     int max_iov;
     bool initial_zeroing_ongoing;
+    int in_active_write_counter;
 } MirrorBlockJob;
 
-typedef struct MirrorOp {
+typedef struct MirrorBDSOpaque {
+    MirrorBlockJob *job;
+} MirrorBDSOpaque;
+
+struct MirrorOp {
     MirrorBlockJob *s;
     QEMUIOVector qiov;
     int64_t offset;
     uint64_t bytes;
-} MirrorOp;
+
+    /* The pointee is set by mirror_co_read(), mirror_co_zero(), and
+     * mirror_co_discard() before yielding for the first time */
+    int64_t *bytes_handled;
+
+    bool is_pseudo_op;
+    bool is_active_write;
+    CoQueue waiting_requests;
+
+    QTAILQ_ENTRY(MirrorOp) next;
+};
+
+typedef enum MirrorMethod {
+    MIRROR_METHOD_COPY,
+    MIRROR_METHOD_ZERO,
+    MIRROR_METHOD_DISCARD,
+} MirrorMethod;
 
 static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
                                             int error)
 {
     s->synced = false;
+    s->actively_synced = false;
     if (read) {
         return block_job_error_action(&s->common, s->on_source_error,
                                       true, error);
@@ -93,7 +122,42 @@ static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
     }
 }
 
-static void mirror_iteration_done(MirrorOp *op, int ret)
+static void coroutine_fn mirror_wait_on_conflicts(MirrorOp *self,
+                                                  MirrorBlockJob *s,
+                                                  uint64_t offset,
+                                                  uint64_t bytes)
+{
+    uint64_t self_start_chunk = offset / s->granularity;
+    uint64_t self_end_chunk = DIV_ROUND_UP(offset + bytes, s->granularity);
+    uint64_t self_nb_chunks = self_end_chunk - self_start_chunk;
+
+    while (find_next_bit(s->in_flight_bitmap, self_end_chunk,
+                         self_start_chunk) < self_end_chunk &&
+           s->ret >= 0)
+    {
+        MirrorOp *op;
+
+        QTAILQ_FOREACH(op, &s->ops_in_flight, next) {
+            uint64_t op_start_chunk = op->offset / s->granularity;
+            uint64_t op_nb_chunks = DIV_ROUND_UP(op->offset + op->bytes,
+                                                 s->granularity) -
+                                    op_start_chunk;
+
+            if (op == self) {
+                continue;
+            }
+
+            if (ranges_overlap(self_start_chunk, self_nb_chunks,
+                               op_start_chunk, op_nb_chunks))
+            {
+                qemu_co_queue_wait(&op->waiting_requests, NULL);
+                break;
+            }
+        }
+    }
+}
+
+static void coroutine_fn mirror_iteration_done(MirrorOp *op, int ret)
 {
     MirrorBlockJob *s = op->s;
     struct iovec *iov;
@@ -113,7 +177,9 @@ static void mirror_iteration_done(MirrorOp *op, int ret)
 
     chunk_num = op->offset / s->granularity;
     nb_chunks = DIV_ROUND_UP(op->bytes, s->granularity);
+
     bitmap_clear(s->in_flight_bitmap, chunk_num, nb_chunks);
+    QTAILQ_REMOVE(&s->ops_in_flight, op, next);
     if (ret >= 0) {
         if (s->cow_bitmap) {
             bitmap_set(s->cow_bitmap, chunk_num, nb_chunks);
@@ -123,16 +189,13 @@ static void mirror_iteration_done(MirrorOp *op, int ret)
         }
     }
     qemu_iovec_destroy(&op->qiov);
-    g_free(op);
 
-    if (s->waiting_for_io) {
-        qemu_coroutine_enter(s->common.job.co);
-    }
+    qemu_co_queue_restart_all(&op->waiting_requests);
+    g_free(op);
 }
 
-static void mirror_write_complete(void *opaque, int ret)
+static void coroutine_fn mirror_write_complete(MirrorOp *op, int ret)
 {
-    MirrorOp *op = opaque;
     MirrorBlockJob *s = op->s;
 
     aio_context_acquire(blk_get_aio_context(s->common.blk));
@@ -149,9 +212,8 @@ static void mirror_write_complete(void *opaque, int ret)
     aio_context_release(blk_get_aio_context(s->common.blk));
 }
 
-static void mirror_read_complete(void *opaque, int ret)
+static void coroutine_fn mirror_read_complete(MirrorOp *op, int ret)
 {
-    MirrorOp *op = opaque;
     MirrorBlockJob *s = op->s;
 
     aio_context_acquire(blk_get_aio_context(s->common.blk));
@@ -166,8 +228,9 @@ static void mirror_read_complete(void *opaque, int ret)
 
         mirror_iteration_done(op, ret);
     } else {
-        blk_aio_pwritev(s->target, op->offset, &op->qiov,
-                        0, mirror_write_complete, op);
+        ret = blk_co_pwritev(s->target, op->offset,
+                             op->qiov.size, &op->qiov, 0);
+        mirror_write_complete(op, ret);
     }
     aio_context_release(blk_get_aio_context(s->common.blk));
 }
@@ -216,68 +279,80 @@ static int mirror_cow_align(MirrorBlockJob *s, int64_t *offset,
     return ret;
 }
 
-static inline void mirror_wait_for_io(MirrorBlockJob *s)
+static inline void mirror_wait_for_any_operation(MirrorBlockJob *s, bool active)
 {
-    assert(!s->waiting_for_io);
-    s->waiting_for_io = true;
-    qemu_coroutine_yield();
-    s->waiting_for_io = false;
+    MirrorOp *op;
+
+    QTAILQ_FOREACH(op, &s->ops_in_flight, next) {
+        /* Do not wait on pseudo ops, because they may in turn wait on
+         * some other operation to start, which may in fact be the
+         * caller of this function.  Since there is only one pseudo op
+         * at any given time, we will always find some real operation
+         * to wait on. */
+        if (!op->is_pseudo_op && op->is_active_write == active) {
+            qemu_co_queue_wait(&op->waiting_requests, NULL);
+            return;
+        }
+    }
+    abort();
 }
 
-/* Submit async read while handling COW.
- * Returns: The number of bytes copied after and including offset,
- *          excluding any bytes copied prior to offset due to alignment.
- *          This will be @bytes if no alignment is necessary, or
- *          (new_end - offset) if tail is rounded up or down due to
- *          alignment or buffer limit.
+static inline void mirror_wait_for_free_in_flight_slot(MirrorBlockJob *s)
+{
+    /* Only non-active operations use up in-flight slots */
+    mirror_wait_for_any_operation(s, false);
+}
+
+/* Perform a mirror copy operation.
+ *
+ * *op->bytes_handled is set to the number of bytes copied after and
+ * including offset, excluding any bytes copied prior to offset due
+ * to alignment.  This will be op->bytes if no alignment is necessary,
+ * or (new_end - op->offset) if the tail is rounded up or down due to
+ * alignment or buffer limit.
  */
-static uint64_t mirror_do_read(MirrorBlockJob *s, int64_t offset,
-                               uint64_t bytes)
+static void coroutine_fn mirror_co_read(void *opaque)
 {
-    BlockBackend *source = s->common.blk;
+    MirrorOp *op = opaque;
+    MirrorBlockJob *s = op->s;
     int nb_chunks;
     uint64_t ret;
-    MirrorOp *op;
     uint64_t max_bytes;
 
     max_bytes = s->granularity * s->max_iov;
 
     /* We can only handle as much as buf_size at a time. */
-    bytes = MIN(s->buf_size, MIN(max_bytes, bytes));
-    assert(bytes);
-    assert(bytes < BDRV_REQUEST_MAX_BYTES);
-    ret = bytes;
+    op->bytes = MIN(s->buf_size, MIN(max_bytes, op->bytes));
+    assert(op->bytes);
+    assert(op->bytes < BDRV_REQUEST_MAX_BYTES);
+    *op->bytes_handled = op->bytes;
 
     if (s->cow_bitmap) {
-        ret += mirror_cow_align(s, &offset, &bytes);
+        *op->bytes_handled += mirror_cow_align(s, &op->offset, &op->bytes);
     }
-    assert(bytes <= s->buf_size);
+    /* Cannot exceed BDRV_REQUEST_MAX_BYTES + INT_MAX */
+    assert(*op->bytes_handled <= UINT_MAX);
+    assert(op->bytes <= s->buf_size);
     /* The offset is granularity-aligned because:
      * 1) Caller passes in aligned values;
      * 2) mirror_cow_align is used only when target cluster is larger. */
-    assert(QEMU_IS_ALIGNED(offset, s->granularity));
+    assert(QEMU_IS_ALIGNED(op->offset, s->granularity));
     /* The range is sector-aligned, since bdrv_getlength() rounds up. */
-    assert(QEMU_IS_ALIGNED(bytes, BDRV_SECTOR_SIZE));
-    nb_chunks = DIV_ROUND_UP(bytes, s->granularity);
+    assert(QEMU_IS_ALIGNED(op->bytes, BDRV_SECTOR_SIZE));
+    nb_chunks = DIV_ROUND_UP(op->bytes, s->granularity);
 
     while (s->buf_free_count < nb_chunks) {
-        trace_mirror_yield_in_flight(s, offset, s->in_flight);
-        mirror_wait_for_io(s);
+        trace_mirror_yield_in_flight(s, op->offset, s->in_flight);
+        mirror_wait_for_free_in_flight_slot(s);
     }
 
-    /* Allocate a MirrorOp that is used as an AIO callback.  */
-    op = g_new(MirrorOp, 1);
-    op->s = s;
-    op->offset = offset;
-    op->bytes = bytes;
-
     /* Now make a QEMUIOVector taking enough granularity-sized chunks
      * from s->buf_free.
      */
     qemu_iovec_init(&op->qiov, nb_chunks);
     while (nb_chunks-- > 0) {
         MirrorBuffer *buf = QSIMPLEQ_FIRST(&s->buf_free);
-        size_t remaining = bytes - op->qiov.size;
+        size_t remaining = op->bytes - op->qiov.size;
 
         QSIMPLEQ_REMOVE_HEAD(&s->buf_free, next);
         s->buf_free_count--;
@@ -286,44 +361,92 @@ static uint64_t mirror_do_read(MirrorBlockJob *s, int64_t offset,
 
     /* Copy the dirty cluster.  */
     s->in_flight++;
-    s->bytes_in_flight += bytes;
-    trace_mirror_one_iteration(s, offset, bytes);
+    s->bytes_in_flight += op->bytes;
+    trace_mirror_one_iteration(s, op->offset, op->bytes);
 
-    blk_aio_preadv(source, offset, &op->qiov, 0, mirror_read_complete, op);
-    return ret;
+    ret = bdrv_co_preadv(s->mirror_top_bs->backing, op->offset, op->bytes,
+                         &op->qiov, 0);
+    mirror_read_complete(op, ret);
 }
 
-static void mirror_do_zero_or_discard(MirrorBlockJob *s,
-                                      int64_t offset,
-                                      uint64_t bytes,
-                                      bool is_discard)
+static void coroutine_fn mirror_co_zero(void *opaque)
 {
-    MirrorOp *op;
+    MirrorOp *op = opaque;
+    int ret;
 
-    /* Allocate a MirrorOp that is used as an AIO callback. The qiov is zeroed
-     * so the freeing in mirror_iteration_done is nop. */
-    op = g_new0(MirrorOp, 1);
-    op->s = s;
-    op->offset = offset;
-    op->bytes = bytes;
+    op->s->in_flight++;
+    op->s->bytes_in_flight += op->bytes;
+    *op->bytes_handled = op->bytes;
 
-    s->in_flight++;
-    s->bytes_in_flight += bytes;
-    if (is_discard) {
-        blk_aio_pdiscard(s->target, offset,
-                         op->bytes, mirror_write_complete, op);
-    } else {
-        blk_aio_pwrite_zeroes(s->target, offset,
-                              op->bytes, s->unmap ? BDRV_REQ_MAY_UNMAP : 0,
-                              mirror_write_complete, op);
+    ret = blk_co_pwrite_zeroes(op->s->target, op->offset, op->bytes,
+                               op->s->unmap ? BDRV_REQ_MAY_UNMAP : 0);
+    mirror_write_complete(op, ret);
+}
+
+static void coroutine_fn mirror_co_discard(void *opaque)
+{
+    MirrorOp *op = opaque;
+    int ret;
+
+    op->s->in_flight++;
+    op->s->bytes_in_flight += op->bytes;
+    *op->bytes_handled = op->bytes;
+
+    ret = blk_co_pdiscard(op->s->target, op->offset, op->bytes);
+    mirror_write_complete(op, ret);
+}
+
+static unsigned mirror_perform(MirrorBlockJob *s, int64_t offset,
+                               unsigned bytes, MirrorMethod mirror_method)
+{
+    MirrorOp *op;
+    Coroutine *co;
+    int64_t bytes_handled = -1;
+
+    op = g_new(MirrorOp, 1);
+    *op = (MirrorOp){
+        .s              = s,
+        .offset         = offset,
+        .bytes          = bytes,
+        .bytes_handled  = &bytes_handled,
+    };
+    qemu_co_queue_init(&op->waiting_requests);
+
+    switch (mirror_method) {
+    case MIRROR_METHOD_COPY:
+        co = qemu_coroutine_create(mirror_co_read, op);
+        break;
+    case MIRROR_METHOD_ZERO:
+        co = qemu_coroutine_create(mirror_co_zero, op);
+        break;
+    case MIRROR_METHOD_DISCARD:
+        co = qemu_coroutine_create(mirror_co_discard, op);
+        break;
+    default:
+        abort();
     }
+
+    QTAILQ_INSERT_TAIL(&s->ops_in_flight, op, next);
+    qemu_coroutine_enter(co);
+    /* At this point, ownership of op has been moved to the coroutine
+     * and the object may already be freed */
+
+    /* Assert that this value has been set */
+    assert(bytes_handled >= 0);
+
+    /* Same assertion as in mirror_co_read() (and for mirror_co_zero()
+     * and mirror_co_discard(), bytes_handled == op->bytes, which
+     * is the @bytes parameter given to this function) */
+    assert(bytes_handled <= UINT_MAX);
+    return bytes_handled;
 }
 
 static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s)
 {
-    BlockDriverState *source = s->source;
-    int64_t offset, first_chunk;
-    uint64_t delay_ns = 0;
+    BlockDriverState *source = s->mirror_top_bs->backing->bs;
+    MirrorOp *pseudo_op;
+    int64_t offset;
+    uint64_t delay_ns = 0, ret = 0;
     /* At least the first dirty chunk is mirrored in one iteration. */
     int nb_chunks = 1;
     bool write_zeroes_ok = bdrv_can_write_zeroes_with_unmap(blk_bs(s->target));
@@ -339,11 +462,7 @@ static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s)
     }
     bdrv_dirty_bitmap_unlock(s->dirty_bitmap);
 
-    first_chunk = offset / s->granularity;
-    while (test_bit(first_chunk, s->in_flight_bitmap)) {
-        trace_mirror_yield_in_flight(s, offset, s->in_flight);
-        mirror_wait_for_io(s);
-    }
+    mirror_wait_on_conflicts(NULL, s, offset, 1);
 
     job_pause_point(&s->common.job);
 
@@ -380,16 +499,27 @@ static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s)
                                    nb_chunks * s->granularity);
     bdrv_dirty_bitmap_unlock(s->dirty_bitmap);
 
+    /* Before claiming an area in the in-flight bitmap, we have to
+     * create a MirrorOp for it so that conflicting requests can wait
+     * for it.  mirror_perform() will create the real MirrorOps later,
+     * for now we just create a pseudo operation that will wake up all
+     * conflicting requests once all real operations have been
+     * launched. */
+    pseudo_op = g_new(MirrorOp, 1);
+    *pseudo_op = (MirrorOp){
+        .offset         = offset,
+        .bytes          = nb_chunks * s->granularity,
+        .is_pseudo_op   = true,
+    };
+    qemu_co_queue_init(&pseudo_op->waiting_requests);
+    QTAILQ_INSERT_TAIL(&s->ops_in_flight, pseudo_op, next);
+
     bitmap_set(s->in_flight_bitmap, offset / s->granularity, nb_chunks);
     while (nb_chunks > 0 && offset < s->bdev_length) {
         int ret;
         int64_t io_bytes;
         int64_t io_bytes_acct;
-        enum MirrorMethod {
-            MIRROR_METHOD_COPY,
-            MIRROR_METHOD_ZERO,
-            MIRROR_METHOD_DISCARD
-        } mirror_method = MIRROR_METHOD_COPY;
+        MirrorMethod mirror_method = MIRROR_METHOD_COPY;
 
         assert(!(offset % s->granularity));
         ret = bdrv_block_status_above(source, NULL, offset,
@@ -419,37 +549,34 @@ static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s)
 
         while (s->in_flight >= MAX_IN_FLIGHT) {
             trace_mirror_yield_in_flight(s, offset, s->in_flight);
-            mirror_wait_for_io(s);
+            mirror_wait_for_free_in_flight_slot(s);
         }
 
         if (s->ret < 0) {
-            return 0;
+            ret = 0;
+            goto fail;
         }
 
         io_bytes = mirror_clip_bytes(s, offset, io_bytes);
-        switch (mirror_method) {
-        case MIRROR_METHOD_COPY:
-            io_bytes = io_bytes_acct = mirror_do_read(s, offset, io_bytes);
-            break;
-        case MIRROR_METHOD_ZERO:
-        case MIRROR_METHOD_DISCARD:
-            mirror_do_zero_or_discard(s, offset, io_bytes,
-                                      mirror_method == MIRROR_METHOD_DISCARD);
-            if (write_zeroes_ok) {
-                io_bytes_acct = 0;
-            } else {
-                io_bytes_acct = io_bytes;
-            }
-            break;
-        default:
-            abort();
+        io_bytes = mirror_perform(s, offset, io_bytes, mirror_method);
+        if (mirror_method != MIRROR_METHOD_COPY && write_zeroes_ok) {
+            io_bytes_acct = 0;
+        } else {
+            io_bytes_acct = io_bytes;
         }
         assert(io_bytes);
         offset += io_bytes;
         nb_chunks -= DIV_ROUND_UP(io_bytes, s->granularity);
         delay_ns = block_job_ratelimit_get_delay(&s->common, io_bytes_acct);
     }
-    return delay_ns;
+
+    ret = delay_ns;
+fail:
+    QTAILQ_REMOVE(&s->ops_in_flight, pseudo_op, next);
+    qemu_co_queue_restart_all(&pseudo_op->waiting_requests);
+    g_free(pseudo_op);
+
+    return ret;
 }
 
 static void mirror_free_init(MirrorBlockJob *s)
@@ -476,7 +603,7 @@ static void mirror_free_init(MirrorBlockJob *s)
 static void mirror_wait_for_all_io(MirrorBlockJob *s)
 {
     while (s->in_flight > 0) {
-        mirror_wait_for_io(s);
+        mirror_wait_for_free_in_flight_slot(s);
     }
 }
 
@@ -489,8 +616,9 @@ static void mirror_exit(Job *job, void *opaque)
     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
     BlockJob *bjob = &s->common;
     MirrorExitData *data = opaque;
+    MirrorBDSOpaque *bs_opaque = s->mirror_top_bs->opaque;
     AioContext *replace_aio_context = NULL;
-    BlockDriverState *src = s->source;
+    BlockDriverState *src = s->mirror_top_bs->backing->bs;
     BlockDriverState *target_bs = blk_bs(s->target);
     BlockDriverState *mirror_top_bs = s->mirror_top_bs;
     Error *local_err = NULL;
@@ -581,6 +709,7 @@ static void mirror_exit(Job *job, void *opaque)
     blk_set_perm(bjob->blk, 0, BLK_PERM_ALL, &error_abort);
     blk_insert_bs(bjob->blk, mirror_top_bs, &error_abort);
 
+    bs_opaque->job = NULL;
     job_completed(job, data->ret, NULL);
 
     g_free(data);
@@ -605,7 +734,7 @@ static int coroutine_fn mirror_dirty_init(MirrorBlockJob *s)
 {
     int64_t offset;
     BlockDriverState *base = s->base;
-    BlockDriverState *bs = s->source;
+    BlockDriverState *bs = s->mirror_top_bs->backing->bs;
     BlockDriverState *target_bs = blk_bs(s->target);
     int ret;
     int64_t count;
@@ -631,11 +760,11 @@ static int coroutine_fn mirror_dirty_init(MirrorBlockJob *s)
             if (s->in_flight >= MAX_IN_FLIGHT) {
                 trace_mirror_yield(s, UINT64_MAX, s->buf_free_count,
                                    s->in_flight);
-                mirror_wait_for_io(s);
+                mirror_wait_for_free_in_flight_slot(s);
                 continue;
             }
 
-            mirror_do_zero_or_discard(s, offset, bytes, false);
+            mirror_perform(s, offset, bytes, MIRROR_METHOD_ZERO);
             offset += bytes;
         }
 
@@ -687,7 +816,7 @@ static void coroutine_fn mirror_run(void *opaque)
 {
     MirrorBlockJob *s = opaque;
     MirrorExitData *data;
-    BlockDriverState *bs = s->source;
+    BlockDriverState *bs = s->mirror_top_bs->backing->bs;
     BlockDriverState *target_bs = blk_bs(s->target);
     bool need_drain = true;
     int64_t length;
@@ -730,6 +859,7 @@ static void coroutine_fn mirror_run(void *opaque)
         /* Transition to the READY state and wait for complete. */
         job_transition_to_ready(&s->common.job);
         s->synced = true;
+        s->actively_synced = true;
         while (!job_is_cancelled(&s->common.job) && !s->should_complete) {
             job_yield(&s->common.job);
         }
@@ -781,6 +911,12 @@ static void coroutine_fn mirror_run(void *opaque)
         int64_t cnt, delta;
         bool should_complete;
 
+        /* Do not start passive operations while there are active
+         * writes in progress */
+        while (s->in_active_write_counter) {
+            mirror_wait_for_any_operation(s, true);
+        }
+
         if (s->ret < 0) {
             ret = s->ret;
             goto immediate_exit;
@@ -804,7 +940,7 @@ static void coroutine_fn mirror_run(void *opaque)
             if (s->in_flight >= MAX_IN_FLIGHT || s->buf_free_count == 0 ||
                 (cnt == 0 && s->in_flight > 0)) {
                 trace_mirror_yield(s, cnt, s->buf_free_count, s->in_flight);
-                mirror_wait_for_io(s);
+                mirror_wait_for_free_in_flight_slot(s);
                 continue;
             } else if (cnt != 0) {
                 delay_ns = mirror_iteration(s);
@@ -826,6 +962,9 @@ static void coroutine_fn mirror_run(void *opaque)
                  */
                 job_transition_to_ready(&s->common.job);
                 s->synced = true;
+                if (s->copy_mode != MIRROR_COPY_MODE_BACKGROUND) {
+                    s->actively_synced = true;
+                }
             }
 
             should_complete = s->should_complete ||
@@ -964,6 +1103,12 @@ static void mirror_pause(Job *job)
     mirror_wait_for_all_io(s);
 }
 
+static bool mirror_drained_poll(BlockJob *job)
+{
+    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
+    return !!s->in_flight;
+}
+
 static void mirror_attached_aio_context(BlockJob *job, AioContext *new_context)
 {
     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
@@ -997,6 +1142,7 @@ static const BlockJobDriver mirror_job_driver = {
         .pause                  = mirror_pause,
         .complete               = mirror_complete,
     },
+    .drained_poll           = mirror_drained_poll,
     .attached_aio_context   = mirror_attached_aio_context,
     .drain                  = mirror_drain,
 };
@@ -1012,20 +1158,237 @@ static const BlockJobDriver commit_active_job_driver = {
         .pause                  = mirror_pause,
         .complete               = mirror_complete,
     },
+    .drained_poll           = mirror_drained_poll,
     .attached_aio_context   = mirror_attached_aio_context,
     .drain                  = mirror_drain,
 };
 
+static void do_sync_target_write(MirrorBlockJob *job, MirrorMethod method,
+                                 uint64_t offset, uint64_t bytes,
+                                 QEMUIOVector *qiov, int flags)
+{
+    BdrvDirtyBitmapIter *iter;
+    QEMUIOVector target_qiov;
+    uint64_t dirty_offset;
+    int dirty_bytes;
+
+    if (qiov) {
+        qemu_iovec_init(&target_qiov, qiov->niov);
+    }
+
+    iter = bdrv_dirty_iter_new(job->dirty_bitmap);
+    bdrv_set_dirty_iter(iter, offset);
+
+    while (true) {
+        bool valid_area;
+        int ret;
+
+        bdrv_dirty_bitmap_lock(job->dirty_bitmap);
+        valid_area = bdrv_dirty_iter_next_area(iter, offset + bytes,
+                                               &dirty_offset, &dirty_bytes);
+        if (!valid_area) {
+            bdrv_dirty_bitmap_unlock(job->dirty_bitmap);
+            break;
+        }
+
+        bdrv_reset_dirty_bitmap_locked(job->dirty_bitmap,
+                                       dirty_offset, dirty_bytes);
+        bdrv_dirty_bitmap_unlock(job->dirty_bitmap);
+
+        job_progress_increase_remaining(&job->common.job, dirty_bytes);
+
+        assert(dirty_offset - offset <= SIZE_MAX);
+        if (qiov) {
+            qemu_iovec_reset(&target_qiov);
+            qemu_iovec_concat(&target_qiov, qiov,
+                              dirty_offset - offset, dirty_bytes);
+        }
+
+        switch (method) {
+        case MIRROR_METHOD_COPY:
+            ret = blk_co_pwritev(job->target, dirty_offset, dirty_bytes,
+                                 qiov ? &target_qiov : NULL, flags);
+            break;
+
+        case MIRROR_METHOD_ZERO:
+            assert(!qiov);
+            ret = blk_co_pwrite_zeroes(job->target, dirty_offset, dirty_bytes,
+                                       flags);
+            break;
+
+        case MIRROR_METHOD_DISCARD:
+            assert(!qiov);
+            ret = blk_co_pdiscard(job->target, dirty_offset, dirty_bytes);
+            break;
+
+        default:
+            abort();
+        }
+
+        if (ret >= 0) {
+            job_progress_update(&job->common.job, dirty_bytes);
+        } else {
+            BlockErrorAction action;
+
+            bdrv_set_dirty_bitmap(job->dirty_bitmap, dirty_offset, dirty_bytes);
+            job->actively_synced = false;
+
+            action = mirror_error_action(job, false, -ret);
+            if (action == BLOCK_ERROR_ACTION_REPORT) {
+                if (!job->ret) {
+                    job->ret = ret;
+                }
+                break;
+            }
+        }
+    }
+
+    bdrv_dirty_iter_free(iter);
+    if (qiov) {
+        qemu_iovec_destroy(&target_qiov);
+    }
+}
+
+static MirrorOp *coroutine_fn active_write_prepare(MirrorBlockJob *s,
+                                                   uint64_t offset,
+                                                   uint64_t bytes)
+{
+    MirrorOp *op;
+    uint64_t start_chunk = offset / s->granularity;
+    uint64_t end_chunk = DIV_ROUND_UP(offset + bytes, s->granularity);
+
+    op = g_new(MirrorOp, 1);
+    *op = (MirrorOp){
+        .s                  = s,
+        .offset             = offset,
+        .bytes              = bytes,
+        .is_active_write    = true,
+    };
+    qemu_co_queue_init(&op->waiting_requests);
+    QTAILQ_INSERT_TAIL(&s->ops_in_flight, op, next);
+
+    s->in_active_write_counter++;
+
+    mirror_wait_on_conflicts(op, s, offset, bytes);
+
+    bitmap_set(s->in_flight_bitmap, start_chunk, end_chunk - start_chunk);
+
+    return op;
+}
+
+static void coroutine_fn active_write_settle(MirrorOp *op)
+{
+    uint64_t start_chunk = op->offset / op->s->granularity;
+    uint64_t end_chunk = DIV_ROUND_UP(op->offset + op->bytes,
+                                      op->s->granularity);
+
+    if (!--op->s->in_active_write_counter && op->s->actively_synced) {
+        BdrvChild *source = op->s->mirror_top_bs->backing;
+
+        if (QLIST_FIRST(&source->bs->parents) == source &&
+            QLIST_NEXT(source, next_parent) == NULL)
+        {
+            /* Assert that we are back in sync once all active write
+             * operations are settled.
+             * Note that we can only assert this if the mirror node
+             * is the source node's only parent. */
+            assert(!bdrv_get_dirty_count(op->s->dirty_bitmap));
+        }
+    }
+    bitmap_clear(op->s->in_flight_bitmap, start_chunk, end_chunk - start_chunk);
+    QTAILQ_REMOVE(&op->s->ops_in_flight, op, next);
+    qemu_co_queue_restart_all(&op->waiting_requests);
+    g_free(op);
+}
+
 static int coroutine_fn bdrv_mirror_top_preadv(BlockDriverState *bs,
     uint64_t offset, uint64_t bytes, QEMUIOVector *qiov, int flags)
 {
     return bdrv_co_preadv(bs->backing, offset, bytes, qiov, flags);
 }
 
+static int coroutine_fn bdrv_mirror_top_do_write(BlockDriverState *bs,
+    MirrorMethod method, uint64_t offset, uint64_t bytes, QEMUIOVector *qiov,
+    int flags)
+{
+    MirrorOp *op = NULL;
+    MirrorBDSOpaque *s = bs->opaque;
+    int ret = 0;
+    bool copy_to_target;
+
+    copy_to_target = s->job->ret >= 0 &&
+                     s->job->copy_mode == MIRROR_COPY_MODE_WRITE_BLOCKING;
+
+    if (copy_to_target) {
+        op = active_write_prepare(s->job, offset, bytes);
+    }
+
+    switch (method) {
+    case MIRROR_METHOD_COPY:
+        ret = bdrv_co_pwritev(bs->backing, offset, bytes, qiov, flags);
+        break;
+
+    case MIRROR_METHOD_ZERO:
+        ret = bdrv_co_pwrite_zeroes(bs->backing, offset, bytes, flags);
+        break;
+
+    case MIRROR_METHOD_DISCARD:
+        ret = bdrv_co_pdiscard(bs->backing->bs, offset, bytes);
+        break;
+
+    default:
+        abort();
+    }
+
+    if (ret < 0) {
+        goto out;
+    }
+
+    if (copy_to_target) {
+        do_sync_target_write(s->job, method, offset, bytes, qiov, flags);
+    }
+
+out:
+    if (copy_to_target) {
+        active_write_settle(op);
+    }
+    return ret;
+}
+
 static int coroutine_fn bdrv_mirror_top_pwritev(BlockDriverState *bs,
     uint64_t offset, uint64_t bytes, QEMUIOVector *qiov, int flags)
 {
-    return bdrv_co_pwritev(bs->backing, offset, bytes, qiov, flags);
+    MirrorBDSOpaque *s = bs->opaque;
+    QEMUIOVector bounce_qiov;
+    void *bounce_buf;
+    int ret = 0;
+    bool copy_to_target;
+
+    copy_to_target = s->job->ret >= 0 &&
+                     s->job->copy_mode == MIRROR_COPY_MODE_WRITE_BLOCKING;
+
+    if (copy_to_target) {
+        /* The guest might concurrently modify the data to write; but
+         * the data on source and destination must match, so we have
+         * to use a bounce buffer if we are going to write to the
+         * target now. */
+        bounce_buf = qemu_blockalign(bs, bytes);
+        iov_to_buf_full(qiov->iov, qiov->niov, 0, bounce_buf, bytes);
+
+        qemu_iovec_init(&bounce_qiov, 1);
+        qemu_iovec_add(&bounce_qiov, bounce_buf, bytes);
+        qiov = &bounce_qiov;
+    }
+
+    ret = bdrv_mirror_top_do_write(bs, MIRROR_METHOD_COPY, offset, bytes, qiov,
+                                   flags);
+
+    if (copy_to_target) {
+        qemu_iovec_destroy(&bounce_qiov);
+        qemu_vfree(bounce_buf);
+    }
+
+    return ret;
 }
 
 static int coroutine_fn bdrv_mirror_top_flush(BlockDriverState *bs)
@@ -1040,13 +1403,15 @@ static int coroutine_fn bdrv_mirror_top_flush(BlockDriverState *bs)
 static int coroutine_fn bdrv_mirror_top_pwrite_zeroes(BlockDriverState *bs,
     int64_t offset, int bytes, BdrvRequestFlags flags)
 {
-    return bdrv_co_pwrite_zeroes(bs->backing, offset, bytes, flags);
+    return bdrv_mirror_top_do_write(bs, MIRROR_METHOD_ZERO, offset, bytes, NULL,
+                                    flags);
 }
 
 static int coroutine_fn bdrv_mirror_top_pdiscard(BlockDriverState *bs,
     int64_t offset, int bytes)
 {
-    return bdrv_co_pdiscard(bs->backing->bs, offset, bytes);
+    return bdrv_mirror_top_do_write(bs, MIRROR_METHOD_DISCARD, offset, bytes,
+                                    NULL, 0);
 }
 
 static void bdrv_mirror_top_refresh_filename(BlockDriverState *bs, QDict *opts)
@@ -1108,10 +1473,11 @@ static void mirror_start_job(const char *job_id, BlockDriverState *bs,
                              const BlockJobDriver *driver,
                              bool is_none_mode, BlockDriverState *base,
                              bool auto_complete, const char *filter_node_name,
-                             bool is_mirror,
+                             bool is_mirror, MirrorCopyMode copy_mode,
                              Error **errp)
 {
     MirrorBlockJob *s;
+    MirrorBDSOpaque *bs_opaque;
     BlockDriverState *mirror_top_bs;
     bool target_graph_mod;
     bool target_is_backing;
@@ -1147,6 +1513,8 @@ static void mirror_start_job(const char *job_id, BlockDriverState *bs,
     mirror_top_bs->total_sectors = bs->total_sectors;
     mirror_top_bs->supported_write_flags = BDRV_REQ_WRITE_UNCHANGED;
     mirror_top_bs->supported_zero_flags = BDRV_REQ_WRITE_UNCHANGED;
+    bs_opaque = g_new0(MirrorBDSOpaque, 1);
+    mirror_top_bs->opaque = bs_opaque;
     bdrv_set_aio_context(mirror_top_bs, bdrv_get_aio_context(bs));
 
     /* bdrv_append takes ownership of the mirror_top_bs reference, need to keep
@@ -1171,10 +1539,11 @@ static void mirror_start_job(const char *job_id, BlockDriverState *bs,
     if (!s) {
         goto fail;
     }
+    bs_opaque->job = s;
+
     /* The block job now has a reference to this node */
     bdrv_unref(mirror_top_bs);
 
-    s->source = bs;
     s->mirror_top_bs = mirror_top_bs;
 
     /* No resize for the target either; while the mirror is still running, a
@@ -1212,6 +1581,7 @@ static void mirror_start_job(const char *job_id, BlockDriverState *bs,
     s->on_target_error = on_target_error;
     s->is_none_mode = is_none_mode;
     s->backing_mode = backing_mode;
+    s->copy_mode = copy_mode;
     s->base = base;
     s->granularity = granularity;
     s->buf_size = ROUND_UP(buf_size, granularity);
@@ -1247,6 +1617,8 @@ static void mirror_start_job(const char *job_id, BlockDriverState *bs,
         }
     }
 
+    QTAILQ_INIT(&s->ops_in_flight);
+
     trace_mirror_start(bs, s, opaque);
     job_start(&s->common.job);
     return;
@@ -1259,6 +1631,7 @@ fail:
 
         g_free(s->replaces);
         blk_unref(s->target);
+        bs_opaque->job = NULL;
         job_early_fail(&s->common.job);
     }
 
@@ -1275,7 +1648,8 @@ void mirror_start(const char *job_id, BlockDriverState *bs,
                   MirrorSyncMode mode, BlockMirrorBackingMode backing_mode,
                   BlockdevOnError on_source_error,
                   BlockdevOnError on_target_error,
-                  bool unmap, const char *filter_node_name, Error **errp)
+                  bool unmap, const char *filter_node_name,
+                  MirrorCopyMode copy_mode, Error **errp)
 {
     bool is_none_mode;
     BlockDriverState *base;
@@ -1290,7 +1664,7 @@ void mirror_start(const char *job_id, BlockDriverState *bs,
                      speed, granularity, buf_size, backing_mode,
                      on_source_error, on_target_error, unmap, NULL, NULL,
                      &mirror_job_driver, is_none_mode, base, false,
-                     filter_node_name, true, errp);
+                     filter_node_name, true, copy_mode, errp);
 }
 
 void commit_active_start(const char *job_id, BlockDriverState *bs,
@@ -1313,7 +1687,8 @@ void commit_active_start(const char *job_id, BlockDriverState *bs,
                      MIRROR_LEAVE_BACKING_CHAIN,
                      on_error, on_error, true, cb, opaque,
                      &commit_active_job_driver, false, base, auto_complete,
-                     filter_node_name, false, &local_err);
+                     filter_node_name, false, MIRROR_COPY_MODE_BACKGROUND,
+                     &local_err);
     if (local_err) {
         error_propagate(errp, local_err);
         goto error_restore_flags;
diff --git a/block/vvfat.c b/block/vvfat.c
index 4595f335b8..c7d2ed2d96 100644
--- a/block/vvfat.c
+++ b/block/vvfat.c
@@ -3134,6 +3134,7 @@ static void vvfat_qcow_options(int *child_flags, QDict *child_options,
 }
 
 static const BdrvChildRole child_vvfat_qcow = {
+    .parent_is_bds      = true,
     .inherit_options    = vvfat_qcow_options,
 };
 
diff --git a/blockdev.c b/blockdev.c
index 7f65cd7497..58d7570932 100644
--- a/blockdev.c
+++ b/blockdev.c
@@ -3586,6 +3586,7 @@ static void blockdev_mirror_common(const char *job_id, BlockDriverState *bs,
                                    bool has_unmap, bool unmap,
                                    bool has_filter_node_name,
                                    const char *filter_node_name,
+                                   bool has_copy_mode, MirrorCopyMode copy_mode,
                                    Error **errp)
 {
 
@@ -3610,6 +3611,9 @@ static void blockdev_mirror_common(const char *job_id, BlockDriverState *bs,
     if (!has_filter_node_name) {
         filter_node_name = NULL;
     }
+    if (!has_copy_mode) {
+        copy_mode = MIRROR_COPY_MODE_BACKGROUND;
+    }
 
     if (granularity != 0 && (granularity < 512 || granularity > 1048576 * 64)) {
         error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "granularity",
@@ -3640,7 +3644,7 @@ static void blockdev_mirror_common(const char *job_id, BlockDriverState *bs,
                  has_replaces ? replaces : NULL,
                  speed, granularity, buf_size, sync, backing_mode,
                  on_source_error, on_target_error, unmap, filter_node_name,
-                 errp);
+                 copy_mode, errp);
 }
 
 void qmp_drive_mirror(DriveMirror *arg, Error **errp)
@@ -3786,6 +3790,7 @@ void qmp_drive_mirror(DriveMirror *arg, Error **errp)
                            arg->has_on_target_error, arg->on_target_error,
                            arg->has_unmap, arg->unmap,
                            false, NULL,
+                           arg->has_copy_mode, arg->copy_mode,
                            &local_err);
     bdrv_unref(target_bs);
     error_propagate(errp, local_err);
@@ -3806,6 +3811,7 @@ void qmp_blockdev_mirror(bool has_job_id, const char *job_id,
                          BlockdevOnError on_target_error,
                          bool has_filter_node_name,
                          const char *filter_node_name,
+                         bool has_copy_mode, MirrorCopyMode copy_mode,
                          Error **errp)
 {
     BlockDriverState *bs;
@@ -3838,6 +3844,7 @@ void qmp_blockdev_mirror(bool has_job_id, const char *job_id,
                            has_on_target_error, on_target_error,
                            true, true,
                            has_filter_node_name, filter_node_name,
+                           has_copy_mode, copy_mode,
                            &local_err);
     error_propagate(errp, local_err);
 
diff --git a/blockjob.c b/blockjob.c
index 0306533a2e..be5903aa96 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -155,6 +155,28 @@ static void child_job_drained_begin(BdrvChild *c)
     job_pause(&job->job);
 }
 
+static bool child_job_drained_poll(BdrvChild *c)
+{
+    BlockJob *bjob = c->opaque;
+    Job *job = &bjob->job;
+    const BlockJobDriver *drv = block_job_driver(bjob);
+
+    /* An inactive or completed job doesn't have any pending requests. Jobs
+     * with !job->busy are either already paused or have a pause point after
+     * being reentered, so no job driver code will run before they pause. */
+    if (!job->busy || job_is_completed(job) || job->deferred_to_main_loop) {
+        return false;
+    }
+
+    /* Otherwise, assume that it isn't fully stopped yet, but allow the job to
+     * override this assumption. */
+    if (drv->drained_poll) {
+        return drv->drained_poll(bjob);
+    } else {
+        return true;
+    }
+}
+
 static void child_job_drained_end(BdrvChild *c)
 {
     BlockJob *job = c->opaque;
@@ -164,6 +186,7 @@ static void child_job_drained_end(BdrvChild *c)
 static const BdrvChildRole child_job = {
     .get_parent_desc    = child_job_get_parent_desc,
     .drained_begin      = child_job_drained_begin,
+    .drained_poll       = child_job_drained_poll,
     .drained_end        = child_job_drained_end,
     .stay_at_node       = true,
 };
diff --git a/include/block/aio-wait.h b/include/block/aio-wait.h
index 8c90a2e66e..c85a62f798 100644
--- a/include/block/aio-wait.h
+++ b/include/block/aio-wait.h
@@ -57,7 +57,8 @@ typedef struct {
 /**
  * AIO_WAIT_WHILE:
  * @wait: the aio wait object
- * @ctx: the aio context
+ * @ctx: the aio context, or NULL if multiple aio contexts (for which the
+ *       caller does not hold a lock) are involved in the polling condition.
  * @cond: wait while this conditional expression is true
  *
  * Wait while a condition is true.  Use this to implement synchronous
@@ -73,29 +74,27 @@ typedef struct {
  */
 #define AIO_WAIT_WHILE(wait, ctx, cond) ({                         \
     bool waited_ = false;                                          \
-    bool busy_ = true;                                             \
     AioWait *wait_ = (wait);                                       \
     AioContext *ctx_ = (ctx);                                      \
-    if (in_aio_context_home_thread(ctx_)) {                        \
-        while ((cond) || busy_) {                                  \
-            busy_ = aio_poll(ctx_, (cond));                        \
-            waited_ |= !!(cond) | busy_;                           \
+    if (ctx_ && in_aio_context_home_thread(ctx_)) {                \
+        while ((cond)) {                                           \
+            aio_poll(ctx_, true);                                  \
+            waited_ = true;                                        \
         }                                                          \
     } else {                                                       \
         assert(qemu_get_current_aio_context() ==                   \
                qemu_get_aio_context());                            \
         /* Increment wait_->num_waiters before evaluating cond. */ \
         atomic_inc(&wait_->num_waiters);                           \
-        while (busy_) {                                            \
-            if ((cond)) {                                          \
-                waited_ = busy_ = true;                            \
+        while ((cond)) {                                           \
+            if (ctx_) {                                            \
                 aio_context_release(ctx_);                         \
-                aio_poll(qemu_get_aio_context(), true);            \
+            }                                                      \
+            aio_poll(qemu_get_aio_context(), true);                \
+            if (ctx_) {                                            \
                 aio_context_acquire(ctx_);                         \
-            } else {                                               \
-                busy_ = aio_poll(ctx_, false);                     \
-                waited_ |= busy_;                                  \
             }                                                      \
+            waited_ = true;                                        \
         }                                                          \
         atomic_dec(&wait_->num_waiters);                           \
     }                                                              \
diff --git a/include/block/block.h b/include/block/block.h
index e677080c4e..b1d6fdb97a 100644
--- a/include/block/block.h
+++ b/include/block/block.h
@@ -421,6 +421,7 @@ BlockDriverState *bdrv_lookup_bs(const char *device,
                                  Error **errp);
 bool bdrv_chain_contains(BlockDriverState *top, BlockDriverState *base);
 BlockDriverState *bdrv_next_node(BlockDriverState *bs);
+BlockDriverState *bdrv_next_all_states(BlockDriverState *bs);
 
 typedef struct BdrvNextIterator {
     enum {
@@ -557,7 +558,8 @@ void bdrv_io_unplug(BlockDriverState *bs);
  * Begin a quiesced section of all users of @bs. This is part of
  * bdrv_drained_begin.
  */
-void bdrv_parent_drained_begin(BlockDriverState *bs, BdrvChild *ignore);
+void bdrv_parent_drained_begin(BlockDriverState *bs, BdrvChild *ignore,
+                               bool ignore_bds_parents);
 
 /**
  * bdrv_parent_drained_end:
@@ -565,7 +567,23 @@ void bdrv_parent_drained_begin(BlockDriverState *bs, BdrvChild *ignore);
  * End a quiesced section of all users of @bs. This is part of
  * bdrv_drained_end.
  */
-void bdrv_parent_drained_end(BlockDriverState *bs, BdrvChild *ignore);
+void bdrv_parent_drained_end(BlockDriverState *bs, BdrvChild *ignore,
+                             bool ignore_bds_parents);
+
+/**
+ * bdrv_drain_poll:
+ *
+ * Poll for pending requests in @bs, its parents (except for @ignore_parent),
+ * and if @recursive is true its children as well (used for subtree drain).
+ *
+ * If @ignore_bds_parents is true, parents that are BlockDriverStates must
+ * ignore the drain request because they will be drained separately (used for
+ * drain_all).
+ *
+ * This is part of bdrv_drained_begin.
+ */
+bool bdrv_drain_poll(BlockDriverState *bs, bool recursive,
+                     BdrvChild *ignore_parent, bool ignore_bds_parents);
 
 /**
  * bdrv_drained_begin:
@@ -580,6 +598,15 @@ void bdrv_parent_drained_end(BlockDriverState *bs, BdrvChild *ignore);
 void bdrv_drained_begin(BlockDriverState *bs);
 
 /**
+ * bdrv_do_drained_begin_quiesce:
+ *
+ * Quiesces a BDS like bdrv_drained_begin(), but does not wait for already
+ * running requests to complete.
+ */
+void bdrv_do_drained_begin_quiesce(BlockDriverState *bs,
+                                   BdrvChild *parent, bool ignore_bds_parents);
+
+/**
  * Like bdrv_drained_begin, but recursively begins a quiesced section for
  * exclusive access to all child nodes as well.
  */
diff --git a/include/block/block_int.h b/include/block/block_int.h
index 327e478a73..74646ed722 100644
--- a/include/block/block_int.h
+++ b/include/block/block_int.h
@@ -577,6 +577,12 @@ struct BdrvChildRole {
      * points to. */
     bool stay_at_node;
 
+    /* If true, the parent is a BlockDriverState and bdrv_next_all_states()
+     * will return it. This information is used for drain_all, where every node
+     * will be drained separately, so the drain only needs to be propagated to
+     * non-BDS parents. */
+    bool parent_is_bds;
+
     void (*inherit_options)(int *child_flags, QDict *child_options,
                             int parent_flags, QDict *parent_options);
 
@@ -605,6 +611,13 @@ struct BdrvChildRole {
     void (*drained_begin)(BdrvChild *child);
     void (*drained_end)(BdrvChild *child);
 
+    /*
+     * Returns whether the parent has pending requests for the child. This
+     * callback is polled after .drained_begin() has been called until all
+     * activity on the child has stopped.
+     */
+    bool (*drained_poll)(BdrvChild *child);
+
     /* Notifies the parent that the child has been activated/inactivated (e.g.
      * when migration is completing) and it can start/stop requesting
      * permissions and doing I/O on it. */
@@ -841,6 +854,7 @@ int coroutine_fn bdrv_co_pwritev(BdrvChild *child,
     int64_t offset, unsigned int bytes, QEMUIOVector *qiov,
     BdrvRequestFlags flags);
 
+extern unsigned int bdrv_drain_all_count;
 void bdrv_apply_subtree_drain(BdrvChild *child, BlockDriverState *new_parent);
 void bdrv_unapply_subtree_drain(BdrvChild *child, BlockDriverState *old_parent);
 
@@ -1017,6 +1031,7 @@ void commit_active_start(const char *job_id, BlockDriverState *bs,
  * @filter_node_name: The node name that should be assigned to the filter
  * driver that the mirror job inserts into the graph above @bs. NULL means that
  * a node name should be autogenerated.
+ * @copy_mode: When to trigger writes to the target.
  * @errp: Error object.
  *
  * Start a mirroring operation on @bs.  Clusters that are allocated
@@ -1030,7 +1045,8 @@ void mirror_start(const char *job_id, BlockDriverState *bs,
                   MirrorSyncMode mode, BlockMirrorBackingMode backing_mode,
                   BlockdevOnError on_source_error,
                   BlockdevOnError on_target_error,
-                  bool unmap, const char *filter_node_name, Error **errp);
+                  bool unmap, const char *filter_node_name,
+                  MirrorCopyMode copy_mode, Error **errp);
 
 /*
  * backup_job_create:
diff --git a/include/block/blockjob_int.h b/include/block/blockjob_int.h
index 5cd50c6639..e4a318dd15 100644
--- a/include/block/blockjob_int.h
+++ b/include/block/blockjob_int.h
@@ -39,6 +39,14 @@ struct BlockJobDriver {
     JobDriver job_driver;
 
     /*
+     * Returns whether the job has pending requests for the child or will
+     * submit new requests before the next pause point. This callback is polled
+     * in the context of draining a job node after requesting that the job be
+     * paused, until all activity on the child has stopped.
+     */
+    bool (*drained_poll)(BlockJob *job);
+
+    /*
      * If the callback is not NULL, it will be invoked before the job is
      * resumed in a new AioContext.  This is the place to move any resources
      * besides job->blk to the new AioContext.
diff --git a/include/block/dirty-bitmap.h b/include/block/dirty-bitmap.h
index 02e0cbabd2..288dc6adb6 100644
--- a/include/block/dirty-bitmap.h
+++ b/include/block/dirty-bitmap.h
@@ -82,6 +82,8 @@ void bdrv_set_dirty_bitmap_locked(BdrvDirtyBitmap *bitmap,
 void bdrv_reset_dirty_bitmap_locked(BdrvDirtyBitmap *bitmap,
                                     int64_t offset, int64_t bytes);
 int64_t bdrv_dirty_iter_next(BdrvDirtyBitmapIter *iter);
+bool bdrv_dirty_iter_next_area(BdrvDirtyBitmapIter *iter, uint64_t max_offset,
+                               uint64_t *offset, int *bytes);
 void bdrv_set_dirty_iter(BdrvDirtyBitmapIter *hbi, int64_t offset);
 int64_t bdrv_get_dirty_count(BdrvDirtyBitmap *bitmap);
 int64_t bdrv_get_meta_dirty_count(BdrvDirtyBitmap *bitmap);
diff --git a/include/qemu/hbitmap.h b/include/qemu/hbitmap.h
index 6b6490ecad..ddca52c48e 100644
--- a/include/qemu/hbitmap.h
+++ b/include/qemu/hbitmap.h
@@ -324,11 +324,14 @@ void hbitmap_free_meta(HBitmap *hb);
 /**
  * hbitmap_iter_next:
  * @hbi: HBitmapIter to operate on.
+ * @advance: If true, advance the iterator.  Otherwise, the next call
+ *           of this function will return the same result (if that
+ *           position is still dirty).
  *
  * Return the next bit that is set in @hbi's associated HBitmap,
  * or -1 if all remaining bits are zero.
  */
-int64_t hbitmap_iter_next(HBitmapIter *hbi);
+int64_t hbitmap_iter_next(HBitmapIter *hbi, bool advance);
 
 /**
  * hbitmap_iter_next_word:
diff --git a/include/qemu/job.h b/include/qemu/job.h
index 1d820530fa..18c9223e31 100644
--- a/include/qemu/job.h
+++ b/include/qemu/job.h
@@ -335,6 +335,21 @@ void job_progress_update(Job *job, uint64_t done);
  */
 void job_progress_set_remaining(Job *job, uint64_t remaining);
 
+/**
+ * @job: The job whose expected progress end value is updated
+ * @delta: Value which is to be added to the current expected end
+ *         value
+ *
+ * Increases the expected end value of the progress counter of a job.
+ * This is useful for parenthesis operations: If a job has to
+ * conditionally perform a high-priority operation as part of its
+ * progress, it calls this function with the expected operation's
+ * length before, and job_progress_update() afterwards.
+ * (So the operation acts as a parenthesis with regard to the main job
+ * operation running in background.)
+ */
+void job_progress_increase_remaining(Job *job, uint64_t delta);
+
 /** To be called when a cancelled job is finalised. */
 void job_event_cancelled(Job *job);
 
diff --git a/job.c b/job.c
index 84e140238b..fa671b431a 100644
--- a/job.c
+++ b/job.c
@@ -385,6 +385,11 @@ void job_progress_set_remaining(Job *job, uint64_t remaining)
     job->progress_total = job->progress_current + remaining;
 }
 
+void job_progress_increase_remaining(Job *job, uint64_t delta)
+{
+    job->progress_total += delta;
+}
+
 void job_event_cancelled(Job *job)
 {
     notifier_list_notify(&job->on_finalize_cancelled, job);
diff --git a/qapi/block-core.json b/qapi/block-core.json
index ab629d1647..cc3ede0630 100644
--- a/qapi/block-core.json
+++ b/qapi/block-core.json
@@ -1051,6 +1051,24 @@
   'data': ['top', 'full', 'none', 'incremental'] }
 
 ##
+# @MirrorCopyMode:
+#
+# An enumeration whose values tell the mirror block job when to
+# trigger writes to the target.
+#
+# @background: copy data in background only.
+#
+# @write-blocking: when data is written to the source, write it
+#                  (synchronously) to the target as well.  In
+#                  addition, data is copied in background just like in
+#                  @background mode.
+#
+# Since: 3.0
+##
+{ 'enum': 'MirrorCopyMode',
+  'data': ['background', 'write-blocking'] }
+
+##
 # @BlockJobInfo:
 #
 # Information about a long-running block device operation.
@@ -1692,6 +1710,9 @@
 #         written. Both will result in identical contents.
 #         Default is true. (Since 2.4)
 #
+# @copy-mode: when to copy data to the destination; defaults to 'background'
+#             (Since: 3.0)
+#
 # Since: 1.3
 ##
 { 'struct': 'DriveMirror',
@@ -1701,7 +1722,7 @@
             '*speed': 'int', '*granularity': 'uint32',
             '*buf-size': 'int', '*on-source-error': 'BlockdevOnError',
             '*on-target-error': 'BlockdevOnError',
-            '*unmap': 'bool' } }
+            '*unmap': 'bool', '*copy-mode': 'MirrorCopyMode' } }
 
 ##
 # @BlockDirtyBitmap:
@@ -1964,6 +1985,9 @@
 #                    above @device. If this option is not given, a node name is
 #                    autogenerated. (Since: 2.9)
 #
+# @copy-mode: when to copy data to the destination; defaults to 'background'
+#             (Since: 3.0)
+#
 # Returns: nothing on success.
 #
 # Since: 2.6
@@ -1984,7 +2008,8 @@
             '*speed': 'int', '*granularity': 'uint32',
             '*buf-size': 'int', '*on-source-error': 'BlockdevOnError',
             '*on-target-error': 'BlockdevOnError',
-            '*filter-node-name': 'str' } }
+            '*filter-node-name': 'str',
+            '*copy-mode': 'MirrorCopyMode' } }
 
 ##
 # @block_set_io_throttle:
diff --git a/tests/qemu-iotests/151 b/tests/qemu-iotests/151
new file mode 100755
index 0000000000..fe53b9f446
--- /dev/null
+++ b/tests/qemu-iotests/151
@@ -0,0 +1,120 @@
+#!/usr/bin/env python
+#
+# Tests for active mirroring
+#
+# Copyright (C) 2018 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+
+import os
+import iotests
+from iotests import qemu_img
+
+source_img = os.path.join(iotests.test_dir, 'source.' + iotests.imgfmt)
+target_img = os.path.join(iotests.test_dir, 'target.' + iotests.imgfmt)
+
+class TestActiveMirror(iotests.QMPTestCase):
+    image_len = 128 * 1024 * 1024 # 128 MB, in bytes
+    potential_writes_in_flight = True
+
+    def setUp(self):
+        qemu_img('create', '-f', iotests.imgfmt, source_img, '128M')
+        qemu_img('create', '-f', iotests.imgfmt, target_img, '128M')
+
+        blk_source = {'id': 'source',
+                      'if': 'none',
+                      'node-name': 'source-node',
+                      'driver': iotests.imgfmt,
+                      'file': {'driver': 'file',
+                               'filename': source_img}}
+
+        blk_target = {'node-name': 'target-node',
+                      'driver': iotests.imgfmt,
+                      'file': {'driver': 'file',
+                               'filename': target_img}}
+
+        self.vm = iotests.VM()
+        self.vm.add_drive_raw(self.vm.qmp_to_opts(blk_source))
+        self.vm.add_blockdev(self.vm.qmp_to_opts(blk_target))
+        self.vm.add_device('virtio-blk,drive=source')
+        self.vm.launch()
+
+    def tearDown(self):
+        self.vm.shutdown()
+
+        if not self.potential_writes_in_flight:
+            self.assertTrue(iotests.compare_images(source_img, target_img),
+                            'mirror target does not match source')
+
+        os.remove(source_img)
+        os.remove(target_img)
+
+    def doActiveIO(self, sync_source_and_target):
+        # Fill the source image
+        self.vm.hmp_qemu_io('source',
+                            'write -P 1 0 %i' % self.image_len);
+
+        # Start some background requests
+        for offset in range(1 * self.image_len / 8, 3 * self.image_len / 8, 1024 * 1024):
+            self.vm.hmp_qemu_io('source', 'aio_write -P 2 %i 1M' % offset)
+        for offset in range(2 * self.image_len / 8, 3 * self.image_len / 8, 1024 * 1024):
+            self.vm.hmp_qemu_io('source', 'aio_write -z %i 1M' % offset)
+
+        # Start the block job
+        result = self.vm.qmp('blockdev-mirror',
+                             job_id='mirror',
+                             filter_node_name='mirror-node',
+                             device='source-node',
+                             target='target-node',
+                             sync='full',
+                             copy_mode='write-blocking')
+        self.assert_qmp(result, 'return', {})
+
+        # Start some more requests
+        for offset in range(3 * self.image_len / 8, 5 * self.image_len / 8, 1024 * 1024):
+            self.vm.hmp_qemu_io('source', 'aio_write -P 3 %i 1M' % offset)
+        for offset in range(4 * self.image_len / 8, 5 * self.image_len / 8, 1024 * 1024):
+            self.vm.hmp_qemu_io('source', 'aio_write -z %i 1M' % offset)
+
+        # Wait for the READY event
+        self.wait_ready(drive='mirror')
+
+        # Now start some final requests; all of these (which land on
+        # the source) should be settled using the active mechanism.
+        # The mirror code itself asserts that the source BDS's dirty
+        # bitmap will stay clean between READY and COMPLETED.
+        for offset in range(5 * self.image_len / 8, 7 * self.image_len / 8, 1024 * 1024):
+            self.vm.hmp_qemu_io('source', 'aio_write -P 3 %i 1M' % offset)
+        for offset in range(6 * self.image_len / 8, 7 * self.image_len / 8, 1024 * 1024):
+            self.vm.hmp_qemu_io('source', 'aio_write -z %i 1M' % offset)
+
+        if sync_source_and_target:
+            # If source and target should be in sync after the mirror,
+            # we have to flush before completion
+            self.vm.hmp_qemu_io('source', 'aio_flush')
+            self.potential_writes_in_flight = False
+
+        self.complete_and_wait(drive='mirror', wait_ready=False)
+
+    def testActiveIO(self):
+        self.doActiveIO(False)
+
+    def testActiveIOFlushed(self):
+        self.doActiveIO(True)
+
+
+
+if __name__ == '__main__':
+    iotests.main(supported_fmts=['qcow2', 'raw'])
diff --git a/tests/qemu-iotests/151.out b/tests/qemu-iotests/151.out
new file mode 100644
index 0000000000..fbc63e62f8
--- /dev/null
+++ b/tests/qemu-iotests/151.out
@@ -0,0 +1,5 @@
+..
+----------------------------------------------------------------------
+Ran 2 tests
+
+OK
diff --git a/tests/qemu-iotests/group b/tests/qemu-iotests/group
index 937a3d0a4d..eea75819d2 100644
--- a/tests/qemu-iotests/group
+++ b/tests/qemu-iotests/group
@@ -157,6 +157,7 @@
 148 rw auto quick
 149 rw auto sudo
 150 rw auto quick
+151 rw auto
 152 rw auto quick
 153 rw auto quick
 154 rw auto backing quick
diff --git a/tests/test-bdrv-drain.c b/tests/test-bdrv-drain.c
index a11c4cfbf2..291a050f86 100644
--- a/tests/test-bdrv-drain.c
+++ b/tests/test-bdrv-drain.c
@@ -27,15 +27,23 @@
 #include "block/blockjob_int.h"
 #include "sysemu/block-backend.h"
 #include "qapi/error.h"
+#include "iothread.h"
+
+static QemuEvent done_event;
 
 typedef struct BDRVTestState {
     int drain_count;
+    AioContext *bh_indirection_ctx;
+    bool sleep_in_drain_begin;
 } BDRVTestState;
 
 static void coroutine_fn bdrv_test_co_drain_begin(BlockDriverState *bs)
 {
     BDRVTestState *s = bs->opaque;
     s->drain_count++;
+    if (s->sleep_in_drain_begin) {
+        qemu_co_sleep_ns(QEMU_CLOCK_REALTIME, 100000);
+    }
 }
 
 static void coroutine_fn bdrv_test_co_drain_end(BlockDriverState *bs)
@@ -50,19 +58,48 @@ static void bdrv_test_close(BlockDriverState *bs)
     g_assert_cmpint(s->drain_count, >, 0);
 }
 
+static void co_reenter_bh(void *opaque)
+{
+    aio_co_wake(opaque);
+}
+
 static int coroutine_fn bdrv_test_co_preadv(BlockDriverState *bs,
                                             uint64_t offset, uint64_t bytes,
                                             QEMUIOVector *qiov, int flags)
 {
+    BDRVTestState *s = bs->opaque;
+
     /* We want this request to stay until the polling loop in drain waits for
      * it to complete. We need to sleep a while as bdrv_drain_invoke() comes
      * first and polls its result, too, but it shouldn't accidentally complete
      * this request yet. */
     qemu_co_sleep_ns(QEMU_CLOCK_REALTIME, 100000);
 
+    if (s->bh_indirection_ctx) {
+        aio_bh_schedule_oneshot(s->bh_indirection_ctx, co_reenter_bh,
+                                qemu_coroutine_self());
+        qemu_coroutine_yield();
+    }
+
     return 0;
 }
 
+static void bdrv_test_child_perm(BlockDriverState *bs, BdrvChild *c,
+                                 const BdrvChildRole *role,
+                                 BlockReopenQueue *reopen_queue,
+                                 uint64_t perm, uint64_t shared,
+                                 uint64_t *nperm, uint64_t *nshared)
+{
+    /* bdrv_format_default_perms() accepts only these two, so disguise
+     * detach_by_driver_cb_role as one of them. */
+    if (role != &child_file && role != &child_backing) {
+        role = &child_file;
+    }
+
+    bdrv_format_default_perms(bs, c, role, reopen_queue, perm, shared,
+                              nperm, nshared);
+}
+
 static BlockDriver bdrv_test = {
     .format_name            = "test",
     .instance_size          = sizeof(BDRVTestState),
@@ -73,7 +110,7 @@ static BlockDriver bdrv_test = {
     .bdrv_co_drain_begin    = bdrv_test_co_drain_begin,
     .bdrv_co_drain_end      = bdrv_test_co_drain_end,
 
-    .bdrv_child_perm        = bdrv_format_default_perms,
+    .bdrv_child_perm        = bdrv_test_child_perm,
 };
 
 static void aio_ret_cb(void *opaque, int ret)
@@ -216,6 +253,11 @@ static void test_drv_cb_drain_subtree(void)
     test_drv_cb_common(BDRV_SUBTREE_DRAIN, true);
 }
 
+static void test_drv_cb_co_drain_all(void)
+{
+    call_in_coroutine(test_drv_cb_drain_all);
+}
+
 static void test_drv_cb_co_drain(void)
 {
     call_in_coroutine(test_drv_cb_drain);
@@ -259,8 +301,7 @@ static void test_quiesce_common(enum drain_type drain_type, bool recursive)
 
 static void test_quiesce_drain_all(void)
 {
-    // XXX drain_all doesn't quiesce
-    //test_quiesce_common(BDRV_DRAIN_ALL, true);
+    test_quiesce_common(BDRV_DRAIN_ALL, true);
 }
 
 static void test_quiesce_drain(void)
@@ -273,6 +314,11 @@ static void test_quiesce_drain_subtree(void)
     test_quiesce_common(BDRV_SUBTREE_DRAIN, true);
 }
 
+static void test_quiesce_co_drain_all(void)
+{
+    call_in_coroutine(test_quiesce_drain_all);
+}
+
 static void test_quiesce_co_drain(void)
 {
     call_in_coroutine(test_quiesce_drain);
@@ -302,12 +348,7 @@ static void test_nested(void)
 
     for (outer = 0; outer < DRAIN_TYPE_MAX; outer++) {
         for (inner = 0; inner < DRAIN_TYPE_MAX; inner++) {
-            /* XXX bdrv_drain_all() doesn't increase the quiesce_counter */
-            int bs_quiesce      = (outer != BDRV_DRAIN_ALL) +
-                                  (inner != BDRV_DRAIN_ALL);
-            int backing_quiesce = (outer == BDRV_SUBTREE_DRAIN) +
-                                  (inner == BDRV_SUBTREE_DRAIN);
-            int backing_cb_cnt  = (outer != BDRV_DRAIN) +
+            int backing_quiesce = (outer != BDRV_DRAIN) +
                                   (inner != BDRV_DRAIN);
 
             g_assert_cmpint(bs->quiesce_counter, ==, 0);
@@ -318,10 +359,10 @@ static void test_nested(void)
             do_drain_begin(outer, bs);
             do_drain_begin(inner, bs);
 
-            g_assert_cmpint(bs->quiesce_counter, ==, bs_quiesce);
+            g_assert_cmpint(bs->quiesce_counter, ==, 2);
             g_assert_cmpint(backing->quiesce_counter, ==, backing_quiesce);
             g_assert_cmpint(s->drain_count, ==, 2);
-            g_assert_cmpint(backing_s->drain_count, ==, backing_cb_cnt);
+            g_assert_cmpint(backing_s->drain_count, ==, backing_quiesce);
 
             do_drain_end(inner, bs);
             do_drain_end(outer, bs);
@@ -411,7 +452,7 @@ static void test_multiparent(void)
     blk_unref(blk_b);
 }
 
-static void test_graph_change(void)
+static void test_graph_change_drain_subtree(void)
 {
     BlockBackend *blk_a, *blk_b;
     BlockDriverState *bs_a, *bs_b, *backing;
@@ -490,6 +531,221 @@ static void test_graph_change(void)
     blk_unref(blk_b);
 }
 
+static void test_graph_change_drain_all(void)
+{
+    BlockBackend *blk_a, *blk_b;
+    BlockDriverState *bs_a, *bs_b;
+    BDRVTestState *a_s, *b_s;
+
+    /* Create node A with a BlockBackend */
+    blk_a = blk_new(BLK_PERM_ALL, BLK_PERM_ALL);
+    bs_a = bdrv_new_open_driver(&bdrv_test, "test-node-a", BDRV_O_RDWR,
+                                &error_abort);
+    a_s = bs_a->opaque;
+    blk_insert_bs(blk_a, bs_a, &error_abort);
+
+    g_assert_cmpint(bs_a->quiesce_counter, ==, 0);
+    g_assert_cmpint(a_s->drain_count, ==, 0);
+
+    /* Call bdrv_drain_all_begin() */
+    bdrv_drain_all_begin();
+
+    g_assert_cmpint(bs_a->quiesce_counter, ==, 1);
+    g_assert_cmpint(a_s->drain_count, ==, 1);
+
+    /* Create node B with a BlockBackend */
+    blk_b = blk_new(BLK_PERM_ALL, BLK_PERM_ALL);
+    bs_b = bdrv_new_open_driver(&bdrv_test, "test-node-b", BDRV_O_RDWR,
+                                &error_abort);
+    b_s = bs_b->opaque;
+    blk_insert_bs(blk_b, bs_b, &error_abort);
+
+    g_assert_cmpint(bs_a->quiesce_counter, ==, 1);
+    g_assert_cmpint(bs_b->quiesce_counter, ==, 1);
+    g_assert_cmpint(a_s->drain_count, ==, 1);
+    g_assert_cmpint(b_s->drain_count, ==, 1);
+
+    /* Unref and finally delete node A */
+    blk_unref(blk_a);
+
+    g_assert_cmpint(bs_a->quiesce_counter, ==, 1);
+    g_assert_cmpint(bs_b->quiesce_counter, ==, 1);
+    g_assert_cmpint(a_s->drain_count, ==, 1);
+    g_assert_cmpint(b_s->drain_count, ==, 1);
+
+    bdrv_unref(bs_a);
+
+    g_assert_cmpint(bs_b->quiesce_counter, ==, 1);
+    g_assert_cmpint(b_s->drain_count, ==, 1);
+
+    /* End the drained section */
+    bdrv_drain_all_end();
+
+    g_assert_cmpint(bs_b->quiesce_counter, ==, 0);
+    g_assert_cmpint(b_s->drain_count, ==, 0);
+
+    bdrv_unref(bs_b);
+    blk_unref(blk_b);
+}
+
+struct test_iothread_data {
+    BlockDriverState *bs;
+    enum drain_type drain_type;
+    int *aio_ret;
+};
+
+static void test_iothread_drain_entry(void *opaque)
+{
+    struct test_iothread_data *data = opaque;
+
+    aio_context_acquire(bdrv_get_aio_context(data->bs));
+    do_drain_begin(data->drain_type, data->bs);
+    g_assert_cmpint(*data->aio_ret, ==, 0);
+    do_drain_end(data->drain_type, data->bs);
+    aio_context_release(bdrv_get_aio_context(data->bs));
+
+    qemu_event_set(&done_event);
+}
+
+static void test_iothread_aio_cb(void *opaque, int ret)
+{
+    int *aio_ret = opaque;
+    *aio_ret = ret;
+    qemu_event_set(&done_event);
+}
+
+/*
+ * Starts an AIO request on a BDS that runs in the AioContext of iothread 1.
+ * The request involves a BH on iothread 2 before it can complete.
+ *
+ * @drain_thread = 0 means that do_drain_begin/end are called from the main
+ * thread, @drain_thread = 1 means that they are called from iothread 1. Drain
+ * for this BDS cannot be called from iothread 2 because only the main thread
+ * may do cross-AioContext polling.
+ */
+static void test_iothread_common(enum drain_type drain_type, int drain_thread)
+{
+    BlockBackend *blk;
+    BlockDriverState *bs;
+    BDRVTestState *s;
+    BlockAIOCB *acb;
+    int aio_ret;
+    struct test_iothread_data data;
+
+    IOThread *a = iothread_new();
+    IOThread *b = iothread_new();
+    AioContext *ctx_a = iothread_get_aio_context(a);
+    AioContext *ctx_b = iothread_get_aio_context(b);
+
+    QEMUIOVector qiov;
+    struct iovec iov = {
+        .iov_base = NULL,
+        .iov_len = 0,
+    };
+    qemu_iovec_init_external(&qiov, &iov, 1);
+
+    /* bdrv_drain_all() may only be called from the main loop thread */
+    if (drain_type == BDRV_DRAIN_ALL && drain_thread != 0) {
+        goto out;
+    }
+
+    blk = blk_new(BLK_PERM_ALL, BLK_PERM_ALL);
+    bs = bdrv_new_open_driver(&bdrv_test, "test-node", BDRV_O_RDWR,
+                              &error_abort);
+    s = bs->opaque;
+    blk_insert_bs(blk, bs, &error_abort);
+
+    blk_set_aio_context(blk, ctx_a);
+    aio_context_acquire(ctx_a);
+
+    s->bh_indirection_ctx = ctx_b;
+
+    aio_ret = -EINPROGRESS;
+    if (drain_thread == 0) {
+        acb = blk_aio_preadv(blk, 0, &qiov, 0, test_iothread_aio_cb, &aio_ret);
+    } else {
+        acb = blk_aio_preadv(blk, 0, &qiov, 0, aio_ret_cb, &aio_ret);
+    }
+    g_assert(acb != NULL);
+    g_assert_cmpint(aio_ret, ==, -EINPROGRESS);
+
+    aio_context_release(ctx_a);
+
+    data = (struct test_iothread_data) {
+        .bs         = bs,
+        .drain_type = drain_type,
+        .aio_ret    = &aio_ret,
+    };
+
+    switch (drain_thread) {
+    case 0:
+        if (drain_type != BDRV_DRAIN_ALL) {
+            aio_context_acquire(ctx_a);
+        }
+
+        /* The request is running on the IOThread a. Draining its block device
+         * will make sure that it has completed as far as the BDS is concerned,
+         * but the drain in this thread can continue immediately after
+         * bdrv_dec_in_flight() and aio_ret might be assigned only slightly
+         * later. */
+        qemu_event_reset(&done_event);
+        do_drain_begin(drain_type, bs);
+        g_assert_cmpint(bs->in_flight, ==, 0);
+
+        if (drain_type != BDRV_DRAIN_ALL) {
+            aio_context_release(ctx_a);
+        }
+        qemu_event_wait(&done_event);
+        if (drain_type != BDRV_DRAIN_ALL) {
+            aio_context_acquire(ctx_a);
+        }
+
+        g_assert_cmpint(aio_ret, ==, 0);
+        do_drain_end(drain_type, bs);
+
+        if (drain_type != BDRV_DRAIN_ALL) {
+            aio_context_release(ctx_a);
+        }
+        break;
+    case 1:
+        qemu_event_reset(&done_event);
+        aio_bh_schedule_oneshot(ctx_a, test_iothread_drain_entry, &data);
+        qemu_event_wait(&done_event);
+        break;
+    default:
+        g_assert_not_reached();
+    }
+
+    aio_context_acquire(ctx_a);
+    blk_set_aio_context(blk, qemu_get_aio_context());
+    aio_context_release(ctx_a);
+
+    bdrv_unref(bs);
+    blk_unref(blk);
+
+out:
+    iothread_join(a);
+    iothread_join(b);
+}
+
+static void test_iothread_drain_all(void)
+{
+    test_iothread_common(BDRV_DRAIN_ALL, 0);
+    test_iothread_common(BDRV_DRAIN_ALL, 1);
+}
+
+static void test_iothread_drain(void)
+{
+    test_iothread_common(BDRV_DRAIN, 0);
+    test_iothread_common(BDRV_DRAIN, 1);
+}
+
+static void test_iothread_drain_subtree(void)
+{
+    test_iothread_common(BDRV_SUBTREE_DRAIN, 0);
+    test_iothread_common(BDRV_SUBTREE_DRAIN, 1);
+}
+
 
 typedef struct TestBlockJob {
     BlockJob common;
@@ -507,7 +763,11 @@ static void coroutine_fn test_job_start(void *opaque)
 
     job_transition_to_ready(&s->common.job);
     while (!s->should_complete) {
-        job_sleep_ns(&s->common.job, 100000);
+        /* Avoid job_sleep_ns() because it marks the job as !busy. We
+         * want to emulate some actual activity (probably some I/O) here so
+         * that drain has to wait for this activity to stop. */
+        qemu_co_sleep_ns(QEMU_CLOCK_REALTIME, 100000);
+        job_pause_point(&s->common.job);
     }
 
     job_defer_to_main_loop(&s->common.job, test_job_completed, NULL);
@@ -554,7 +814,7 @@ static void test_blockjob_common(enum drain_type drain_type)
 
     g_assert_cmpint(job->job.pause_count, ==, 0);
     g_assert_false(job->job.paused);
-    g_assert_false(job->job.busy); /* We're in job_sleep_ns() */
+    g_assert_true(job->job.busy); /* We're in qemu_co_sleep_ns() */
 
     do_drain_begin(drain_type, src);
 
@@ -564,15 +824,14 @@ static void test_blockjob_common(enum drain_type drain_type)
     } else {
         g_assert_cmpint(job->job.pause_count, ==, 1);
     }
-    /* XXX We don't wait until the job is actually paused. Is this okay? */
-    /* g_assert_true(job->job.paused); */
+    g_assert_true(job->job.paused);
     g_assert_false(job->job.busy); /* The job is paused */
 
     do_drain_end(drain_type, src);
 
     g_assert_cmpint(job->job.pause_count, ==, 0);
     g_assert_false(job->job.paused);
-    g_assert_false(job->job.busy); /* We're in job_sleep_ns() */
+    g_assert_true(job->job.busy); /* We're in qemu_co_sleep_ns() */
 
     do_drain_begin(drain_type, target);
 
@@ -582,15 +841,14 @@ static void test_blockjob_common(enum drain_type drain_type)
     } else {
         g_assert_cmpint(job->job.pause_count, ==, 1);
     }
-    /* XXX We don't wait until the job is actually paused. Is this okay? */
-    /* g_assert_true(job->job.paused); */
+    g_assert_true(job->job.paused);
     g_assert_false(job->job.busy); /* The job is paused */
 
     do_drain_end(drain_type, target);
 
     g_assert_cmpint(job->job.pause_count, ==, 0);
     g_assert_false(job->job.paused);
-    g_assert_false(job->job.busy); /* We're in job_sleep_ns() */
+    g_assert_true(job->job.busy); /* We're in qemu_co_sleep_ns() */
 
     ret = job_complete_sync(&job->job, &error_abort);
     g_assert_cmpint(ret, ==, 0);
@@ -616,19 +874,399 @@ static void test_blockjob_drain_subtree(void)
     test_blockjob_common(BDRV_SUBTREE_DRAIN);
 }
 
+
+typedef struct BDRVTestTopState {
+    BdrvChild *wait_child;
+} BDRVTestTopState;
+
+static void bdrv_test_top_close(BlockDriverState *bs)
+{
+    BdrvChild *c, *next_c;
+    QLIST_FOREACH_SAFE(c, &bs->children, next, next_c) {
+        bdrv_unref_child(bs, c);
+    }
+}
+
+static int coroutine_fn bdrv_test_top_co_preadv(BlockDriverState *bs,
+                                                uint64_t offset, uint64_t bytes,
+                                                QEMUIOVector *qiov, int flags)
+{
+    BDRVTestTopState *tts = bs->opaque;
+    return bdrv_co_preadv(tts->wait_child, offset, bytes, qiov, flags);
+}
+
+static BlockDriver bdrv_test_top_driver = {
+    .format_name            = "test_top_driver",
+    .instance_size          = sizeof(BDRVTestTopState),
+
+    .bdrv_close             = bdrv_test_top_close,
+    .bdrv_co_preadv         = bdrv_test_top_co_preadv,
+
+    .bdrv_child_perm        = bdrv_format_default_perms,
+};
+
+typedef struct TestCoDeleteByDrainData {
+    BlockBackend *blk;
+    bool detach_instead_of_delete;
+    bool done;
+} TestCoDeleteByDrainData;
+
+/* Read from the wait child, then drop the BB or unref all children of bs. */
+static void coroutine_fn test_co_delete_by_drain(void *opaque)
+{
+    TestCoDeleteByDrainData *dbdd = opaque;
+    BlockBackend *blk = dbdd->blk;
+    BlockDriverState *bs = blk_bs(blk);
+    BDRVTestTopState *tts = bs->opaque;
+    void *buffer = g_malloc(65536);
+    QEMUIOVector qiov;
+    struct iovec iov = {
+        .iov_base = buffer,
+        .iov_len  = 65536,
+    };
+
+    qemu_iovec_init_external(&qiov, &iov, 1);
+
+    /* Pretend some internal read operation from parent to child.
+     * Important: We have to read from the child, not from the parent!
+     * Draining works by first propagating it all up the tree to the
+     * root and then waiting for drainage from root to the leaves
+     * (protocol nodes).  If we have a request waiting on the root,
+     * everything will be drained before we go back down the tree, but
+     * we do not want that.  We want to be in the middle of draining
+     * when the following request returns. */
+    bdrv_co_preadv(tts->wait_child, 0, 65536, &qiov, 0);
+    g_free(buffer); /* the read has returned; do not leak the buffer */
+
+    g_assert_cmpint(bs->refcnt, ==, 1);
+    if (!dbdd->detach_instead_of_delete) {
+        blk_unref(blk);
+    } else {
+        BdrvChild *c, *next_c;
+        QLIST_FOREACH_SAFE(c, &bs->children, next, next_c) {
+            bdrv_unref_child(bs, c);
+        }
+    }
+    dbdd->done = true;
+}
+
+/**
+ * Test what happens when some BDS has some children, you drain one of
+ * them and this results in the BDS being deleted.
+ *
+ * If @detach_instead_of_delete is set, the BDS is not going to be
+ * deleted but will only detach all of its children.
+ */
+static void do_test_delete_by_drain(bool detach_instead_of_delete,
+                                    enum drain_type drain_type)
+{
+    BlockBackend *blk;
+    BlockDriverState *bs, *child_bs, *null_bs;
+    BDRVTestTopState *tts;
+    TestCoDeleteByDrainData dbdd;
+    Coroutine *co;
+
+    bs = bdrv_new_open_driver(&bdrv_test_top_driver, "top", BDRV_O_RDWR,
+                              &error_abort);
+    bs->total_sectors = 65536 >> BDRV_SECTOR_BITS;
+    tts = bs->opaque;
+
+    null_bs = bdrv_open("null-co://", NULL, NULL, BDRV_O_RDWR | BDRV_O_PROTOCOL,
+                        &error_abort);
+    bdrv_attach_child(bs, null_bs, "null-child", &child_file, &error_abort);
+
+    /* This child will be the one to pass requests through to, and
+     * it will stall until a drain occurs */
+    child_bs = bdrv_new_open_driver(&bdrv_test, "child", BDRV_O_RDWR,
+                                    &error_abort);
+    child_bs->total_sectors = 65536 >> BDRV_SECTOR_BITS;
+    /* Takes our reference to child_bs */
+    tts->wait_child = bdrv_attach_child(bs, child_bs, "wait-child", &child_file,
+                                        &error_abort);
+
+    /* This child is just there to be deleted
+     * (for detach_instead_of_delete == true) */
+    null_bs = bdrv_open("null-co://", NULL, NULL, BDRV_O_RDWR | BDRV_O_PROTOCOL,
+                        &error_abort);
+    bdrv_attach_child(bs, null_bs, "null-child", &child_file, &error_abort);
+
+    blk = blk_new(BLK_PERM_ALL, BLK_PERM_ALL);
+    blk_insert_bs(blk, bs, &error_abort);
+
+    /* Referenced by blk now */
+    bdrv_unref(bs);
+
+    g_assert_cmpint(bs->refcnt, ==, 1);
+    g_assert_cmpint(child_bs->refcnt, ==, 1);
+    g_assert_cmpint(null_bs->refcnt, ==, 1);
+
+
+    dbdd = (TestCoDeleteByDrainData){
+        .blk = blk,
+        .detach_instead_of_delete = detach_instead_of_delete,
+        .done = false,
+    };
+    co = qemu_coroutine_create(test_co_delete_by_drain, &dbdd);
+    qemu_coroutine_enter(co);
+
+    /* Drain the child while the read operation is still pending.
+     * This should result in the operation finishing and
+     * test_co_delete_by_drain() resuming.  Thus, @bs will be deleted
+     * and the coroutine will exit while this drain operation is still
+     * in progress. */
+    switch (drain_type) {
+    case BDRV_DRAIN:
+        bdrv_ref(child_bs);
+        bdrv_drain(child_bs);
+        bdrv_unref(child_bs);
+        break;
+    case BDRV_SUBTREE_DRAIN:
+        /* Would have to ref/unref bs here for !detach_instead_of_delete, but
+         * then the whole test becomes pointless because the graph changes
+         * don't occur during the drain any more. */
+        assert(detach_instead_of_delete);
+        bdrv_subtree_drained_begin(bs);
+        bdrv_subtree_drained_end(bs);
+        break;
+    case BDRV_DRAIN_ALL:
+        bdrv_drain_all_begin();
+        bdrv_drain_all_end();
+        break;
+    default:
+        g_assert_not_reached();
+    }
+
+    while (!dbdd.done) {
+        aio_poll(qemu_get_aio_context(), true);
+    }
+
+    if (detach_instead_of_delete) {
+        /* Here, the reference has not passed over to the coroutine,
+         * so we have to delete the BB ourselves */
+        blk_unref(blk);
+    }
+}
+
+static void test_delete_by_drain(void)
+{
+    do_test_delete_by_drain(false, BDRV_DRAIN);
+}
+
+static void test_detach_by_drain_all(void)
+{
+    do_test_delete_by_drain(true, BDRV_DRAIN_ALL);
+}
+
+static void test_detach_by_drain(void)
+{
+    do_test_delete_by_drain(true, BDRV_DRAIN);
+}
+
+static void test_detach_by_drain_subtree(void)
+{
+    do_test_delete_by_drain(true, BDRV_SUBTREE_DRAIN);
+}
+
+
+struct detach_by_parent_data {
+    BlockDriverState *parent_b;
+    BdrvChild *child_b;
+    BlockDriverState *c;
+    BdrvChild *child_c;
+    bool by_parent_cb;
+};
+static struct detach_by_parent_data detach_by_parent_data;
+
+static void detach_indirect_bh(void *opaque)
+{
+    struct detach_by_parent_data *data = opaque;
+
+    bdrv_unref_child(data->parent_b, data->child_b);
+
+    bdrv_ref(data->c);
+    data->child_c = bdrv_attach_child(data->parent_b, data->c, "PB-C",
+                                      &child_file, &error_abort);
+}
+
+static void detach_by_parent_aio_cb(void *opaque, int ret)
+{
+    struct detach_by_parent_data *data = &detach_by_parent_data;
+
+    g_assert_cmpint(ret, ==, 0);
+    if (data->by_parent_cb) {
+        detach_indirect_bh(data);
+    }
+}
+
+static void detach_by_driver_cb_drained_begin(BdrvChild *child)
+{
+    aio_bh_schedule_oneshot(qemu_get_current_aio_context(),
+                            detach_indirect_bh, &detach_by_parent_data);
+    child_file.drained_begin(child);
+}
+
+static BdrvChildRole detach_by_driver_cb_role;
+
+/*
+ * Initial graph:
+ *
+ * PA     PB
+ *    \ /   \
+ *     A     B     C
+ *
+ * by_parent_cb == true:  Test that parent callbacks don't poll
+ *
+ *     PA has a pending write request whose callback changes the child nodes of
+ *     PB: It removes B and adds C instead. The subtree of PB is drained, which
+ *     will indirectly drain the write request, too.
+ *
+ * by_parent_cb == false: Test that bdrv_drain_invoke() doesn't poll
+ *
+ *     PA's BdrvChildRole has a .drained_begin callback that schedules a BH
+ *     that does the same graph change. If bdrv_drain_invoke() calls it, the
+ *     state is messed up, but if it is only polled in the single
+ *     BDRV_POLL_WHILE() at the end of the drain, this should work fine.
+ */
+static void test_detach_indirect(bool by_parent_cb)
+{
+    BlockBackend *blk;
+    BlockDriverState *parent_a, *parent_b, *a, *b, *c;
+    BdrvChild *child_a, *child_b;
+    BlockAIOCB *acb;
+
+    QEMUIOVector qiov;
+    struct iovec iov = {
+        .iov_base = NULL,
+        .iov_len = 0,
+    };
+    qemu_iovec_init_external(&qiov, &iov, 1);
+
+    if (!by_parent_cb) {
+        detach_by_driver_cb_role = child_file;
+        detach_by_driver_cb_role.drained_begin =
+            detach_by_driver_cb_drained_begin;
+    }
+
+    /* Create all involved nodes */
+    parent_a = bdrv_new_open_driver(&bdrv_test, "parent-a", BDRV_O_RDWR,
+                                    &error_abort);
+    parent_b = bdrv_new_open_driver(&bdrv_test, "parent-b", 0,
+                                    &error_abort);
+
+    a = bdrv_new_open_driver(&bdrv_test, "a", BDRV_O_RDWR, &error_abort);
+    b = bdrv_new_open_driver(&bdrv_test, "b", BDRV_O_RDWR, &error_abort);
+    c = bdrv_new_open_driver(&bdrv_test, "c", BDRV_O_RDWR, &error_abort);
+
+    /* blk is a BB for parent-a */
+    blk = blk_new(BLK_PERM_ALL, BLK_PERM_ALL);
+    blk_insert_bs(blk, parent_a, &error_abort);
+    bdrv_unref(parent_a);
+
+    /* If we want to get bdrv_drain_invoke() to call aio_poll(), the driver
+     * callback must not return immediately. */
+    if (!by_parent_cb) {
+        BDRVTestState *s = parent_a->opaque;
+        s->sleep_in_drain_begin = true;
+    }
+
+    /* Set child relationships */
+    bdrv_ref(b);
+    bdrv_ref(a);
+    child_b = bdrv_attach_child(parent_b, b, "PB-B", &child_file, &error_abort);
+    child_a = bdrv_attach_child(parent_b, a, "PB-A", &child_backing, &error_abort);
+
+    bdrv_ref(a);
+    bdrv_attach_child(parent_a, a, "PA-A",
+                      by_parent_cb ? &child_file : &detach_by_driver_cb_role,
+                      &error_abort);
+
+    g_assert_cmpint(parent_a->refcnt, ==, 1);
+    g_assert_cmpint(parent_b->refcnt, ==, 1);
+    g_assert_cmpint(a->refcnt, ==, 3);
+    g_assert_cmpint(b->refcnt, ==, 2);
+    g_assert_cmpint(c->refcnt, ==, 1);
+
+    g_assert(QLIST_FIRST(&parent_b->children) == child_a);
+    g_assert(QLIST_NEXT(child_a, next) == child_b);
+    g_assert(QLIST_NEXT(child_b, next) == NULL);
+
+    /* Start the evil write request */
+    detach_by_parent_data = (struct detach_by_parent_data) {
+        .parent_b = parent_b,
+        .child_b = child_b,
+        .c = c,
+        .by_parent_cb = by_parent_cb,
+    };
+    acb = blk_aio_preadv(blk, 0, &qiov, 0, detach_by_parent_aio_cb, NULL);
+    g_assert(acb != NULL);
+
+    /* Drain and check the expected result */
+    bdrv_subtree_drained_begin(parent_b);
+
+    g_assert(detach_by_parent_data.child_c != NULL);
+
+    g_assert_cmpint(parent_a->refcnt, ==, 1);
+    g_assert_cmpint(parent_b->refcnt, ==, 1);
+    g_assert_cmpint(a->refcnt, ==, 3);
+    g_assert_cmpint(b->refcnt, ==, 1);
+    g_assert_cmpint(c->refcnt, ==, 2);
+
+    g_assert(QLIST_FIRST(&parent_b->children) == detach_by_parent_data.child_c);
+    g_assert(QLIST_NEXT(detach_by_parent_data.child_c, next) == child_a);
+    g_assert(QLIST_NEXT(child_a, next) == NULL);
+
+    g_assert_cmpint(parent_a->quiesce_counter, ==, 1);
+    g_assert_cmpint(parent_b->quiesce_counter, ==, 1);
+    g_assert_cmpint(a->quiesce_counter, ==, 1);
+    g_assert_cmpint(b->quiesce_counter, ==, 0);
+    g_assert_cmpint(c->quiesce_counter, ==, 1);
+
+    bdrv_subtree_drained_end(parent_b);
+
+    bdrv_unref(parent_b);
+    blk_unref(blk);
+
+    /* XXX Once bdrv_close() unref's children instead of just detaching them,
+     * this won't be necessary any more. */
+    bdrv_unref(a);
+    bdrv_unref(a);
+    bdrv_unref(c);
+
+    g_assert_cmpint(a->refcnt, ==, 1);
+    g_assert_cmpint(b->refcnt, ==, 1);
+    g_assert_cmpint(c->refcnt, ==, 1);
+    bdrv_unref(a);
+    bdrv_unref(b);
+    bdrv_unref(c);
+}
+
+static void test_detach_by_parent_cb(void)
+{
+    test_detach_indirect(true);
+}
+
+static void test_detach_by_driver_cb(void)
+{
+    test_detach_indirect(false);
+}
+
 int main(int argc, char **argv)
 {
+    int ret;
+
     bdrv_init();
     qemu_init_main_loop(&error_abort);
 
     g_test_init(&argc, &argv, NULL);
+    qemu_event_init(&done_event, false);
 
     g_test_add_func("/bdrv-drain/driver-cb/drain_all", test_drv_cb_drain_all);
     g_test_add_func("/bdrv-drain/driver-cb/drain", test_drv_cb_drain);
     g_test_add_func("/bdrv-drain/driver-cb/drain_subtree",
                     test_drv_cb_drain_subtree);
 
-    // XXX bdrv_drain_all() doesn't work in coroutine context
+    g_test_add_func("/bdrv-drain/driver-cb/co/drain_all",
+                    test_drv_cb_co_drain_all);
     g_test_add_func("/bdrv-drain/driver-cb/co/drain", test_drv_cb_co_drain);
     g_test_add_func("/bdrv-drain/driver-cb/co/drain_subtree",
                     test_drv_cb_co_drain_subtree);
@@ -639,19 +1277,38 @@ int main(int argc, char **argv)
     g_test_add_func("/bdrv-drain/quiesce/drain_subtree",
                     test_quiesce_drain_subtree);
 
-    // XXX bdrv_drain_all() doesn't work in coroutine context
+    g_test_add_func("/bdrv-drain/quiesce/co/drain_all",
+                    test_quiesce_co_drain_all);
     g_test_add_func("/bdrv-drain/quiesce/co/drain", test_quiesce_co_drain);
     g_test_add_func("/bdrv-drain/quiesce/co/drain_subtree",
                     test_quiesce_co_drain_subtree);
 
     g_test_add_func("/bdrv-drain/nested", test_nested);
     g_test_add_func("/bdrv-drain/multiparent", test_multiparent);
-    g_test_add_func("/bdrv-drain/graph-change", test_graph_change);
+
+    g_test_add_func("/bdrv-drain/graph-change/drain_subtree",
+                    test_graph_change_drain_subtree);
+    g_test_add_func("/bdrv-drain/graph-change/drain_all",
+                    test_graph_change_drain_all);
+
+    g_test_add_func("/bdrv-drain/iothread/drain_all", test_iothread_drain_all);
+    g_test_add_func("/bdrv-drain/iothread/drain", test_iothread_drain);
+    g_test_add_func("/bdrv-drain/iothread/drain_subtree",
+                    test_iothread_drain_subtree);
 
     g_test_add_func("/bdrv-drain/blockjob/drain_all", test_blockjob_drain_all);
     g_test_add_func("/bdrv-drain/blockjob/drain", test_blockjob_drain);
     g_test_add_func("/bdrv-drain/blockjob/drain_subtree",
                     test_blockjob_drain_subtree);
 
-    return g_test_run();
+    g_test_add_func("/bdrv-drain/deletion/drain", test_delete_by_drain);
+    g_test_add_func("/bdrv-drain/detach/drain_all", test_detach_by_drain_all);
+    g_test_add_func("/bdrv-drain/detach/drain", test_detach_by_drain);
+    g_test_add_func("/bdrv-drain/detach/drain_subtree", test_detach_by_drain_subtree);
+    g_test_add_func("/bdrv-drain/detach/parent_cb", test_detach_by_parent_cb);
+    g_test_add_func("/bdrv-drain/detach/driver_cb", test_detach_by_driver_cb);
+
+    ret = g_test_run();
+    qemu_event_destroy(&done_event);
+    return ret;
 }
diff --git a/tests/test-hbitmap.c b/tests/test-hbitmap.c
index f29631f939..5e67ac1d3a 100644
--- a/tests/test-hbitmap.c
+++ b/tests/test-hbitmap.c
@@ -30,6 +30,18 @@ typedef struct TestHBitmapData {
 } TestHBitmapData;
 
 
+/* Fetch the next item twice (peek, then advance) and check both agree. */
+static int64_t check_hbitmap_iter_next(HBitmapIter *hbi)
+{
+    int64_t next0, next1; /* same width as hbitmap_iter_next()'s result */
+
+    next0 = hbitmap_iter_next(hbi, false);
+    next1 = hbitmap_iter_next(hbi, true);
+    g_assert_cmpint(next0, ==, next1);
+
+    return next0;
+}
+
 /* Check that the HBitmap and the shadow bitmap contain the same data,
  * ignoring the same "first" bits.
  */
@@ -46,7 +58,7 @@ static void hbitmap_test_check(TestHBitmapData *data,
 
     i = first;
     for (;;) {
-        next = hbitmap_iter_next(&hbi);
+        next = check_hbitmap_iter_next(&hbi);
         if (next < 0) {
             next = data->size;
         }
@@ -435,25 +447,25 @@ static void test_hbitmap_iter_granularity(TestHBitmapData *data,
     /* Note that hbitmap_test_check has to be invoked manually in this test.  */
     hbitmap_test_init(data, 131072 << 7, 7);
     hbitmap_iter_init(&hbi, data->hb, 0);
-    g_assert_cmpint(hbitmap_iter_next(&hbi), <, 0);
+    g_assert_cmpint(check_hbitmap_iter_next(&hbi), <, 0);
 
     hbitmap_test_set(data, ((L2 + L1 + 1) << 7) + 8, 8);
     hbitmap_iter_init(&hbi, data->hb, 0);
-    g_assert_cmpint(hbitmap_iter_next(&hbi), ==, (L2 + L1 + 1) << 7);
-    g_assert_cmpint(hbitmap_iter_next(&hbi), <, 0);
+    g_assert_cmpint(check_hbitmap_iter_next(&hbi), ==, (L2 + L1 + 1) << 7);
+    g_assert_cmpint(check_hbitmap_iter_next(&hbi), <, 0);
 
     hbitmap_iter_init(&hbi, data->hb, (L2 + L1 + 2) << 7);
-    g_assert_cmpint(hbitmap_iter_next(&hbi), <, 0);
+    g_assert_cmpint(hbitmap_iter_next(&hbi, true), <, 0);
 
     hbitmap_test_set(data, (131072 << 7) - 8, 8);
     hbitmap_iter_init(&hbi, data->hb, 0);
-    g_assert_cmpint(hbitmap_iter_next(&hbi), ==, (L2 + L1 + 1) << 7);
-    g_assert_cmpint(hbitmap_iter_next(&hbi), ==, 131071 << 7);
-    g_assert_cmpint(hbitmap_iter_next(&hbi), <, 0);
+    g_assert_cmpint(check_hbitmap_iter_next(&hbi), ==, (L2 + L1 + 1) << 7);
+    g_assert_cmpint(check_hbitmap_iter_next(&hbi), ==, 131071 << 7);
+    g_assert_cmpint(check_hbitmap_iter_next(&hbi), <, 0);
 
     hbitmap_iter_init(&hbi, data->hb, (L2 + L1 + 2) << 7);
-    g_assert_cmpint(hbitmap_iter_next(&hbi), ==, 131071 << 7);
-    g_assert_cmpint(hbitmap_iter_next(&hbi), <, 0);
+    g_assert_cmpint(check_hbitmap_iter_next(&hbi), ==, 131071 << 7);
+    g_assert_cmpint(check_hbitmap_iter_next(&hbi), <, 0);
 }
 
 static void hbitmap_test_set_boundary_bits(TestHBitmapData *data, ssize_t diff)
@@ -893,7 +905,7 @@ static void test_hbitmap_serialize_zeroes(TestHBitmapData *data,
     for (i = 0; i < num_positions; i++) {
         hbitmap_deserialize_zeroes(data->hb, positions[i], min_l1, true);
         hbitmap_iter_init(&iter, data->hb, 0);
-        next = hbitmap_iter_next(&iter);
+        next = check_hbitmap_iter_next(&iter);
         if (i == num_positions - 1) {
             g_assert_cmpint(next, ==, -1);
         } else {
@@ -919,10 +931,10 @@ static void test_hbitmap_iter_and_reset(TestHBitmapData *data,
 
     hbitmap_iter_init(&hbi, data->hb, BITS_PER_LONG - 1);
 
-    hbitmap_iter_next(&hbi);
+    check_hbitmap_iter_next(&hbi);
 
     hbitmap_reset_all(data->hb);
-    hbitmap_iter_next(&hbi);
+    check_hbitmap_iter_next(&hbi);
 }
 
 static void test_hbitmap_next_zero_check(TestHBitmapData *data, int64_t start)
diff --git a/util/hbitmap.c b/util/hbitmap.c
index 58a2c93842..bcd304041a 100644
--- a/util/hbitmap.c
+++ b/util/hbitmap.c
@@ -141,7 +141,7 @@ unsigned long hbitmap_iter_skip_words(HBitmapIter *hbi)
     return cur;
 }
 
-int64_t hbitmap_iter_next(HBitmapIter *hbi)
+int64_t hbitmap_iter_next(HBitmapIter *hbi, bool advance)
 {
     unsigned long cur = hbi->cur[HBITMAP_LEVELS - 1] &
             hbi->hb->levels[HBITMAP_LEVELS - 1][hbi->pos];
@@ -154,8 +154,12 @@ int64_t hbitmap_iter_next(HBitmapIter *hbi)
         }
     }
 
-    /* The next call will resume work from the next bit.  */
-    hbi->cur[HBITMAP_LEVELS - 1] = cur & (cur - 1);
+    if (advance) {
+        /* The next call will resume work from the next bit.  */
+        hbi->cur[HBITMAP_LEVELS - 1] = cur & (cur - 1);
+    } else {
+        hbi->cur[HBITMAP_LEVELS - 1] = cur;
+    }
     item = ((uint64_t)hbi->pos << BITS_PER_LEVEL) + ctzl(cur);
 
     return item << hbi->granularity;