summary refs log tree commit diff stats
path: root/block
diff options
context:
space:
mode:
Diffstat (limited to 'block')
-rw-r--r--block/block-backend.c25
-rw-r--r--block/block-copy.c21
-rw-r--r--block/block-gen.h11
-rw-r--r--block/commit.c4
-rw-r--r--block/coroutines.h21
-rw-r--r--block/crypto.c2
-rw-r--r--block/dirty-bitmap.c88
-rw-r--r--block/graph-lock.c275
-rw-r--r--block/io.c367
-rw-r--r--block/meson.build2
-rw-r--r--block/parallels.c2
-rw-r--r--block/qcow.c2
-rw-r--r--block/qcow2.c4
-rw-r--r--block/qed.c28
-rw-r--r--block/raw-format.c2
-rw-r--r--block/replication.c6
-rw-r--r--block/stream.c26
-rw-r--r--block/throttle.c8
-rw-r--r--block/vdi.c2
-rw-r--r--block/vhdx.c2
-rw-r--r--block/vmdk.c38
-rw-r--r--block/vpc.c2
22 files changed, 527 insertions, 411 deletions
diff --git a/block/block-backend.c b/block/block-backend.c
index bf0ea3cfed..ba7bf1d6bc 100644
--- a/block/block-backend.c
+++ b/block/block-backend.c
@@ -129,7 +129,7 @@ static void blk_root_inherit_options(BdrvChildRole role, bool parent_is_format,
 }
 static void blk_root_drained_begin(BdrvChild *child);
 static bool blk_root_drained_poll(BdrvChild *child);
-static void blk_root_drained_end(BdrvChild *child, int *drained_end_counter);
+static void blk_root_drained_end(BdrvChild *child);
 
 static void blk_root_change_media(BdrvChild *child, bool load);
 static void blk_root_resize(BdrvChild *child);
@@ -1424,6 +1424,27 @@ int coroutine_fn blk_co_pwritev(BlockBackend *blk, int64_t offset,
     return blk_co_pwritev_part(blk, offset, bytes, qiov, 0, flags);
 }
 
+int coroutine_fn blk_co_block_status_above(BlockBackend *blk,
+                                           BlockDriverState *base,
+                                           int64_t offset, int64_t bytes,
+                                           int64_t *pnum, int64_t *map,
+                                           BlockDriverState **file)
+{
+    IO_CODE();
+    return bdrv_co_block_status_above(blk_bs(blk), base, offset, bytes, pnum,
+                                      map, file);
+}
+
+int coroutine_fn blk_co_is_allocated_above(BlockBackend *blk,
+                                           BlockDriverState *base,
+                                           bool include_base, int64_t offset,
+                                           int64_t bytes, int64_t *pnum)
+{
+    IO_CODE();
+    return bdrv_co_is_allocated_above(blk_bs(blk), base, include_base, offset,
+                                      bytes, pnum);
+}
+
 typedef struct BlkRwCo {
     BlockBackend *blk;
     int64_t offset;
@@ -2556,7 +2577,7 @@ static bool blk_root_drained_poll(BdrvChild *child)
     return busy || !!blk->in_flight;
 }
 
-static void blk_root_drained_end(BdrvChild *child, int *drained_end_counter)
+static void blk_root_drained_end(BdrvChild *child)
 {
     BlockBackend *blk = child->opaque;
     assert(blk->quiesce_counter);
diff --git a/block/block-copy.c b/block/block-copy.c
index bb947afdda..5e59d6262f 100644
--- a/block/block-copy.c
+++ b/block/block-copy.c
@@ -577,8 +577,9 @@ static coroutine_fn int block_copy_task_entry(AioTask *task)
     return ret;
 }
 
-static int block_copy_block_status(BlockCopyState *s, int64_t offset,
-                                   int64_t bytes, int64_t *pnum)
+static coroutine_fn int block_copy_block_status(BlockCopyState *s,
+                                                int64_t offset,
+                                                int64_t bytes, int64_t *pnum)
 {
     int64_t num;
     BlockDriverState *base;
@@ -590,8 +591,8 @@ static int block_copy_block_status(BlockCopyState *s, int64_t offset,
         base = NULL;
     }
 
-    ret = bdrv_block_status_above(s->source->bs, base, offset, bytes, &num,
-                                  NULL, NULL);
+    ret = bdrv_co_block_status_above(s->source->bs, base, offset, bytes, &num,
+                                     NULL, NULL);
     if (ret < 0 || num < s->cluster_size) {
         /*
          * On error or if failed to obtain large enough chunk just fallback to
@@ -613,8 +614,9 @@ static int block_copy_block_status(BlockCopyState *s, int64_t offset,
  * Check if the cluster starting at offset is allocated or not.
  * return via pnum the number of contiguous clusters sharing this allocation.
  */
-static int block_copy_is_cluster_allocated(BlockCopyState *s, int64_t offset,
-                                           int64_t *pnum)
+static int coroutine_fn block_copy_is_cluster_allocated(BlockCopyState *s,
+                                                        int64_t offset,
+                                                        int64_t *pnum)
 {
     BlockDriverState *bs = s->source->bs;
     int64_t count, total_count = 0;
@@ -624,7 +626,7 @@ static int block_copy_is_cluster_allocated(BlockCopyState *s, int64_t offset,
     assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
 
     while (true) {
-        ret = bdrv_is_allocated(bs, offset, bytes, &count);
+        ret = bdrv_co_is_allocated(bs, offset, bytes, &count);
         if (ret < 0) {
             return ret;
         }
@@ -669,8 +671,9 @@ void block_copy_reset(BlockCopyState *s, int64_t offset, int64_t bytes)
  * @return 0 when the cluster at @offset was unallocated,
  *         1 otherwise, and -ret on error.
  */
-int64_t block_copy_reset_unallocated(BlockCopyState *s,
-                                     int64_t offset, int64_t *count)
+int64_t coroutine_fn block_copy_reset_unallocated(BlockCopyState *s,
+                                                  int64_t offset,
+                                                  int64_t *count)
 {
     int ret;
     int64_t clusters, bytes;
diff --git a/block/block-gen.h b/block/block-gen.h
index f80cf4897d..89b7daaa1f 100644
--- a/block/block-gen.h
+++ b/block/block-gen.h
@@ -30,20 +30,17 @@
 
 /* Base structure for argument packing structures */
 typedef struct BdrvPollCo {
-    BlockDriverState *bs;
+    AioContext *ctx;
     bool in_progress;
-    int ret;
     Coroutine *co; /* Keep pointer here for debugging */
 } BdrvPollCo;
 
-static inline int bdrv_poll_co(BdrvPollCo *s)
+static inline void bdrv_poll_co(BdrvPollCo *s)
 {
     assert(!qemu_in_coroutine());
 
-    bdrv_coroutine_enter(s->bs, s->co);
-    BDRV_POLL_WHILE(s->bs, s->in_progress);
-
-    return s->ret;
+    aio_co_enter(s->ctx, s->co);
+    AIO_WAIT_WHILE(s->ctx, s->in_progress);
 }
 
 #endif /* BLOCK_BLOCK_GEN_H */
diff --git a/block/commit.c b/block/commit.c
index 0029b31944..b346341767 100644
--- a/block/commit.c
+++ b/block/commit.c
@@ -155,8 +155,8 @@ static int coroutine_fn commit_run(Job *job, Error **errp)
             break;
         }
         /* Copy if allocated above the base */
-        ret = bdrv_is_allocated_above(blk_bs(s->top), s->base_overlay, true,
-                                      offset, COMMIT_BUFFER_SIZE, &n);
+        ret = blk_co_is_allocated_above(s->top, s->base_overlay, true,
+                                        offset, COMMIT_BUFFER_SIZE, &n);
         copy = (ret > 0);
         trace_commit_one_iteration(s, offset, n, ret);
         if (copy) {
diff --git a/block/coroutines.h b/block/coroutines.h
index 3a2bad564f..2a1e0b3c9d 100644
--- a/block/coroutines.h
+++ b/block/coroutines.h
@@ -37,9 +37,11 @@
  * the I/O API.
  */
 
-int coroutine_fn bdrv_co_check(BlockDriverState *bs,
-                               BdrvCheckResult *res, BdrvCheckMode fix);
-int coroutine_fn bdrv_co_invalidate_cache(BlockDriverState *bs, Error **errp);
+int coroutine_fn GRAPH_RDLOCK
+bdrv_co_check(BlockDriverState *bs, BdrvCheckResult *res, BdrvCheckMode fix);
+
+int coroutine_fn GRAPH_RDLOCK
+bdrv_co_invalidate_cache(BlockDriverState *bs, Error **errp);
 
 int coroutine_fn
 bdrv_co_common_block_status_above(BlockDriverState *bs,
@@ -53,10 +55,11 @@ bdrv_co_common_block_status_above(BlockDriverState *bs,
                                   BlockDriverState **file,
                                   int *depth);
 
-int coroutine_fn bdrv_co_readv_vmstate(BlockDriverState *bs,
-                                       QEMUIOVector *qiov, int64_t pos);
-int coroutine_fn bdrv_co_writev_vmstate(BlockDriverState *bs,
-                                        QEMUIOVector *qiov, int64_t pos);
+int coroutine_fn GRAPH_RDLOCK
+bdrv_co_readv_vmstate(BlockDriverState *bs, QEMUIOVector *qiov, int64_t pos);
+
+int coroutine_fn GRAPH_RDLOCK
+bdrv_co_writev_vmstate(BlockDriverState *bs, QEMUIOVector *qiov, int64_t pos);
 
 int coroutine_fn
 nbd_co_do_establish_connection(BlockDriverState *bs, bool blocking,
@@ -71,7 +74,7 @@ nbd_co_do_establish_connection(BlockDriverState *bs, bool blocking,
  * the "I/O or GS" API.
  */
 
-int generated_co_wrapper
+int co_wrapper_mixed_bdrv_rdlock
 bdrv_common_block_status_above(BlockDriverState *bs,
                                BlockDriverState *base,
                                bool include_base,
@@ -82,7 +85,7 @@ bdrv_common_block_status_above(BlockDriverState *bs,
                                int64_t *map,
                                BlockDriverState **file,
                                int *depth);
-int generated_co_wrapper
+int co_wrapper_mixed
 nbd_do_establish_connection(BlockDriverState *bs, bool blocking, Error **errp);
 
 #endif /* BLOCK_COROUTINES_H */
diff --git a/block/crypto.c b/block/crypto.c
index 2fb8add458..bbeb9f437c 100644
--- a/block/crypto.c
+++ b/block/crypto.c
@@ -703,7 +703,7 @@ static int coroutine_fn block_crypto_co_create_opts_luks(BlockDriver *drv,
     }
 
     /* Create protocol layer */
-    ret = bdrv_create_file(filename, opts, errp);
+    ret = bdrv_co_create_file(filename, opts, errp);
     if (ret < 0) {
         goto fail;
     }
diff --git a/block/dirty-bitmap.c b/block/dirty-bitmap.c
index 9c39550698..956feeb2ae 100644
--- a/block/dirty-bitmap.c
+++ b/block/dirty-bitmap.c
@@ -388,7 +388,7 @@ void bdrv_release_named_dirty_bitmaps(BlockDriverState *bs)
  * not fail.
  * This function doesn't release corresponding BdrvDirtyBitmap.
  */
-static int coroutine_fn
+int coroutine_fn
 bdrv_co_remove_persistent_dirty_bitmap(BlockDriverState *bs, const char *name,
                                        Error **errp)
 {
@@ -399,45 +399,6 @@ bdrv_co_remove_persistent_dirty_bitmap(BlockDriverState *bs, const char *name,
     return 0;
 }
 
-typedef struct BdrvRemovePersistentDirtyBitmapCo {
-    BlockDriverState *bs;
-    const char *name;
-    Error **errp;
-    int ret;
-} BdrvRemovePersistentDirtyBitmapCo;
-
-static void coroutine_fn
-bdrv_co_remove_persistent_dirty_bitmap_entry(void *opaque)
-{
-    BdrvRemovePersistentDirtyBitmapCo *s = opaque;
-
-    s->ret = bdrv_co_remove_persistent_dirty_bitmap(s->bs, s->name, s->errp);
-    aio_wait_kick();
-}
-
-int bdrv_remove_persistent_dirty_bitmap(BlockDriverState *bs, const char *name,
-                                        Error **errp)
-{
-    if (qemu_in_coroutine()) {
-        return bdrv_co_remove_persistent_dirty_bitmap(bs, name, errp);
-    } else {
-        Coroutine *co;
-        BdrvRemovePersistentDirtyBitmapCo s = {
-            .bs = bs,
-            .name = name,
-            .errp = errp,
-            .ret = -EINPROGRESS,
-        };
-
-        co = qemu_coroutine_create(bdrv_co_remove_persistent_dirty_bitmap_entry,
-                                   &s);
-        bdrv_coroutine_enter(bs, co);
-        BDRV_POLL_WHILE(bs, s.ret == -EINPROGRESS);
-
-        return s.ret;
-    }
-}
-
 bool
 bdrv_supports_persistent_dirty_bitmap(BlockDriverState *bs)
 {
@@ -447,7 +408,7 @@ bdrv_supports_persistent_dirty_bitmap(BlockDriverState *bs)
     return false;
 }
 
-static bool coroutine_fn
+bool coroutine_fn
 bdrv_co_can_store_new_dirty_bitmap(BlockDriverState *bs, const char *name,
                                    uint32_t granularity, Error **errp)
 {
@@ -470,51 +431,6 @@ bdrv_co_can_store_new_dirty_bitmap(BlockDriverState *bs, const char *name,
     return drv->bdrv_co_can_store_new_dirty_bitmap(bs, name, granularity, errp);
 }
 
-typedef struct BdrvCanStoreNewDirtyBitmapCo {
-    BlockDriverState *bs;
-    const char *name;
-    uint32_t granularity;
-    Error **errp;
-    bool ret;
-
-    bool in_progress;
-} BdrvCanStoreNewDirtyBitmapCo;
-
-static void coroutine_fn bdrv_co_can_store_new_dirty_bitmap_entry(void *opaque)
-{
-    BdrvCanStoreNewDirtyBitmapCo *s = opaque;
-
-    s->ret = bdrv_co_can_store_new_dirty_bitmap(s->bs, s->name, s->granularity,
-                                                s->errp);
-    s->in_progress = false;
-    aio_wait_kick();
-}
-
-bool bdrv_can_store_new_dirty_bitmap(BlockDriverState *bs, const char *name,
-                                     uint32_t granularity, Error **errp)
-{
-    IO_CODE();
-    if (qemu_in_coroutine()) {
-        return bdrv_co_can_store_new_dirty_bitmap(bs, name, granularity, errp);
-    } else {
-        Coroutine *co;
-        BdrvCanStoreNewDirtyBitmapCo s = {
-            .bs = bs,
-            .name = name,
-            .granularity = granularity,
-            .errp = errp,
-            .in_progress = true,
-        };
-
-        co = qemu_coroutine_create(bdrv_co_can_store_new_dirty_bitmap_entry,
-                                   &s);
-        bdrv_coroutine_enter(bs, co);
-        BDRV_POLL_WHILE(bs, s.in_progress);
-
-        return s.ret;
-    }
-}
-
 void bdrv_disable_dirty_bitmap(BdrvDirtyBitmap *bitmap)
 {
     bdrv_dirty_bitmaps_lock(bitmap->bs);
diff --git a/block/graph-lock.c b/block/graph-lock.c
new file mode 100644
index 0000000000..454c31e691
--- /dev/null
+++ b/block/graph-lock.c
@@ -0,0 +1,275 @@
+/*
+ * Graph lock: rwlock to protect block layer graph manipulations (add/remove
+ * edges and nodes)
+ *
+ *  Copyright (c) 2022 Red Hat
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "qemu/osdep.h"
+#include "qemu/main-loop.h"
+#include "block/graph-lock.h"
+#include "block/block.h"
+#include "block/block_int.h"
+
+/* Dummy lock object to use for Thread Safety Analysis (TSA) */
+BdrvGraphLock graph_lock;
+
+/* Protects the list of aiocontext and orphaned_reader_count */
+static QemuMutex aio_context_list_lock;
+
+/* Written and read with atomic operations. */
+static int has_writer;
+
+/*
+ * A reader coroutine could move from an AioContext to another.
+ * If this happens, there is no problem from the point of view of
+ * counters. The problem is that the total count becomes
+ * unbalanced if one of the two AioContexts gets deleted.
+ * The count of readers must remain correct, so the AioContext's
+ * balance is transferred to this glboal variable.
+ * Protected by aio_context_list_lock.
+ */
+static uint32_t orphaned_reader_count;
+
+/* Queue of readers waiting for the writer to finish */
+static CoQueue reader_queue;
+
+struct BdrvGraphRWlock {
+    /* How many readers are currently reading the graph. */
+    uint32_t reader_count;
+
+    /*
+     * List of BdrvGraphRWlock kept in graph-lock.c
+     * Protected by aio_context_list_lock
+     */
+    QTAILQ_ENTRY(BdrvGraphRWlock) next_aio;
+};
+
+/*
+ * List of BdrvGraphRWlock. This list ensures that each BdrvGraphRWlock
+ * can safely modify only its own counter, avoid reading/writing
+ * others and thus improving performances by avoiding cacheline bounces.
+ */
+static QTAILQ_HEAD(, BdrvGraphRWlock) aio_context_list =
+    QTAILQ_HEAD_INITIALIZER(aio_context_list);
+
+static void __attribute__((__constructor__)) bdrv_init_graph_lock(void)
+{
+    qemu_mutex_init(&aio_context_list_lock);
+    qemu_co_queue_init(&reader_queue);
+}
+
+void register_aiocontext(AioContext *ctx)
+{
+    ctx->bdrv_graph = g_new0(BdrvGraphRWlock, 1);
+    QEMU_LOCK_GUARD(&aio_context_list_lock);
+    assert(ctx->bdrv_graph->reader_count == 0);
+    QTAILQ_INSERT_TAIL(&aio_context_list, ctx->bdrv_graph, next_aio);
+}
+
+void unregister_aiocontext(AioContext *ctx)
+{
+    QEMU_LOCK_GUARD(&aio_context_list_lock);
+    orphaned_reader_count += ctx->bdrv_graph->reader_count;
+    QTAILQ_REMOVE(&aio_context_list, ctx->bdrv_graph, next_aio);
+    g_free(ctx->bdrv_graph);
+}
+
+static uint32_t reader_count(void)
+{
+    BdrvGraphRWlock *brdv_graph;
+    uint32_t rd;
+
+    QEMU_LOCK_GUARD(&aio_context_list_lock);
+
+    /* rd can temporarly be negative, but the total will *always* be >= 0 */
+    rd = orphaned_reader_count;
+    QTAILQ_FOREACH(brdv_graph, &aio_context_list, next_aio) {
+        rd += qatomic_read(&brdv_graph->reader_count);
+    }
+
+    /* shouldn't overflow unless there are 2^31 readers */
+    assert((int32_t)rd >= 0);
+    return rd;
+}
+
+void bdrv_graph_wrlock(void)
+{
+    GLOBAL_STATE_CODE();
+    assert(!qatomic_read(&has_writer));
+
+    /* Make sure that constantly arriving new I/O doesn't cause starvation */
+    bdrv_drain_all_begin_nopoll();
+
+    /*
+     * reader_count == 0: this means writer will read has_reader as 1
+     * reader_count >= 1: we don't know if writer read has_writer == 0 or 1,
+     *                    but we need to wait.
+     * Wait by allowing other coroutine (and possible readers) to continue.
+     */
+    do {
+        /*
+         * has_writer must be 0 while polling, otherwise we get a deadlock if
+         * any callback involved during AIO_WAIT_WHILE() tries to acquire the
+         * reader lock.
+         */
+        qatomic_set(&has_writer, 0);
+        AIO_WAIT_WHILE(qemu_get_aio_context(), reader_count() >= 1);
+        qatomic_set(&has_writer, 1);
+
+        /*
+         * We want to only check reader_count() after has_writer = 1 is visible
+         * to other threads. That way no more readers can sneak in after we've
+         * determined reader_count() == 0.
+         */
+        smp_mb();
+    } while (reader_count() >= 1);
+
+    bdrv_drain_all_end();
+}
+
+void bdrv_graph_wrunlock(void)
+{
+    GLOBAL_STATE_CODE();
+    QEMU_LOCK_GUARD(&aio_context_list_lock);
+    assert(qatomic_read(&has_writer));
+
+    /*
+     * No need for memory barriers, this works in pair with
+     * the slow path of rdlock() and both take the lock.
+     */
+    qatomic_store_release(&has_writer, 0);
+
+    /* Wake up all coroutine that are waiting to read the graph */
+    qemu_co_enter_all(&reader_queue, &aio_context_list_lock);
+}
+
+void coroutine_fn bdrv_graph_co_rdlock(void)
+{
+    BdrvGraphRWlock *bdrv_graph;
+    bdrv_graph = qemu_get_current_aio_context()->bdrv_graph;
+
+    /* Do not lock if in main thread */
+    if (qemu_in_main_thread()) {
+        return;
+    }
+
+    for (;;) {
+        qatomic_set(&bdrv_graph->reader_count,
+                    bdrv_graph->reader_count + 1);
+        /* make sure writer sees reader_count before we check has_writer */
+        smp_mb();
+
+        /*
+         * has_writer == 0: this means writer will read reader_count as >= 1
+         * has_writer == 1: we don't know if writer read reader_count == 0
+         *                  or > 0, but we need to wait anyways because
+         *                  it will write.
+         */
+        if (!qatomic_read(&has_writer)) {
+            break;
+        }
+
+        /*
+         * Synchronize access with reader_count() in bdrv_graph_wrlock().
+         * Case 1:
+         * If this critical section gets executed first, reader_count will
+         * decrease and the reader will go to sleep.
+         * Then the writer will read reader_count that does not take into
+         * account this reader, and if there's no other reader it will
+         * enter the write section.
+         * Case 2:
+         * If reader_count() critical section gets executed first,
+         * then writer will read reader_count >= 1.
+         * It will wait in AIO_WAIT_WHILE(), but once it releases the lock
+         * we will enter this critical section and call aio_wait_kick().
+         */
+        WITH_QEMU_LOCK_GUARD(&aio_context_list_lock) {
+            /*
+             * Additional check when we use the above lock to synchronize
+             * with bdrv_graph_wrunlock().
+             * Case 1:
+             * If this gets executed first, has_writer is still 1, so we reduce
+             * reader_count and go to sleep.
+             * Then the writer will set has_writer to 0 and wake up all readers,
+             * us included.
+             * Case 2:
+             * If bdrv_graph_wrunlock() critical section gets executed first,
+             * then it will set has_writer to 0 and wake up all other readers.
+             * Then we execute this critical section, and therefore must check
+             * again for has_writer, otherwise we sleep without any writer
+             * actually running.
+             */
+            if (!qatomic_read(&has_writer)) {
+                return;
+            }
+
+            /* slow path where reader sleeps */
+            bdrv_graph->reader_count--;
+            aio_wait_kick();
+            qemu_co_queue_wait(&reader_queue, &aio_context_list_lock);
+        }
+    }
+}
+
+void coroutine_fn bdrv_graph_co_rdunlock(void)
+{
+    BdrvGraphRWlock *bdrv_graph;
+    bdrv_graph = qemu_get_current_aio_context()->bdrv_graph;
+
+    /* Do not lock if in main thread */
+    if (qemu_in_main_thread()) {
+        return;
+    }
+
+    qatomic_store_release(&bdrv_graph->reader_count,
+                          bdrv_graph->reader_count - 1);
+    /* make sure writer sees reader_count before we check has_writer */
+    smp_mb();
+
+    /*
+     * has_writer == 0: this means reader will read reader_count decreased
+     * has_writer == 1: we don't know if writer read reader_count old or
+     *                  new. Therefore, kick again so on next iteration
+     *                  writer will for sure read the updated value.
+     */
+    if (qatomic_read(&has_writer)) {
+        aio_wait_kick();
+    }
+}
+
+void bdrv_graph_rdlock_main_loop(void)
+{
+    GLOBAL_STATE_CODE();
+    assert(!qemu_in_coroutine());
+}
+
+void bdrv_graph_rdunlock_main_loop(void)
+{
+    GLOBAL_STATE_CODE();
+    assert(!qemu_in_coroutine());
+}
+
+void assert_bdrv_graph_readable(void)
+{
+    assert(qemu_in_main_thread() || reader_count());
+}
+
+void assert_bdrv_graph_writable(void)
+{
+    assert(qemu_in_main_thread());
+    assert(qatomic_read(&has_writer));
+}
diff --git a/block/io.c b/block/io.c
index b9424024f9..d87788dfbb 100644
--- a/block/io.c
+++ b/block/io.c
@@ -45,53 +45,43 @@ static void bdrv_parent_cb_resize(BlockDriverState *bs);
 static int coroutine_fn bdrv_co_do_pwrite_zeroes(BlockDriverState *bs,
     int64_t offset, int64_t bytes, BdrvRequestFlags flags);
 
-static void bdrv_parent_drained_begin(BlockDriverState *bs, BdrvChild *ignore,
-                                      bool ignore_bds_parents)
+static void bdrv_parent_drained_begin(BlockDriverState *bs, BdrvChild *ignore)
 {
     BdrvChild *c, *next;
 
     QLIST_FOREACH_SAFE(c, &bs->parents, next_parent, next) {
-        if (c == ignore || (ignore_bds_parents && c->klass->parent_is_bds)) {
+        if (c == ignore) {
             continue;
         }
-        bdrv_parent_drained_begin_single(c, false);
-    }
-}
-
-static void bdrv_parent_drained_end_single_no_poll(BdrvChild *c,
-                                                   int *drained_end_counter)
-{
-    assert(c->parent_quiesce_counter > 0);
-    c->parent_quiesce_counter--;
-    if (c->klass->drained_end) {
-        c->klass->drained_end(c, drained_end_counter);
+        bdrv_parent_drained_begin_single(c);
     }
 }
 
 void bdrv_parent_drained_end_single(BdrvChild *c)
 {
-    int drained_end_counter = 0;
-    AioContext *ctx = bdrv_child_get_parent_aio_context(c);
     IO_OR_GS_CODE();
-    bdrv_parent_drained_end_single_no_poll(c, &drained_end_counter);
-    AIO_WAIT_WHILE(ctx, qatomic_read(&drained_end_counter) > 0);
+
+    assert(c->quiesced_parent);
+    c->quiesced_parent = false;
+
+    if (c->klass->drained_end) {
+        c->klass->drained_end(c);
+    }
 }
 
-static void bdrv_parent_drained_end(BlockDriverState *bs, BdrvChild *ignore,
-                                    bool ignore_bds_parents,
-                                    int *drained_end_counter)
+static void bdrv_parent_drained_end(BlockDriverState *bs, BdrvChild *ignore)
 {
     BdrvChild *c;
 
     QLIST_FOREACH(c, &bs->parents, next_parent) {
-        if (c == ignore || (ignore_bds_parents && c->klass->parent_is_bds)) {
+        if (c == ignore) {
             continue;
         }
-        bdrv_parent_drained_end_single_no_poll(c, drained_end_counter);
+        bdrv_parent_drained_end_single(c);
     }
 }
 
-static bool bdrv_parent_drained_poll_single(BdrvChild *c)
+bool bdrv_parent_drained_poll_single(BdrvChild *c)
 {
     if (c->klass->drained_poll) {
         return c->klass->drained_poll(c);
@@ -115,17 +105,16 @@ static bool bdrv_parent_drained_poll(BlockDriverState *bs, BdrvChild *ignore,
     return busy;
 }
 
-void bdrv_parent_drained_begin_single(BdrvChild *c, bool poll)
+void bdrv_parent_drained_begin_single(BdrvChild *c)
 {
-    AioContext *ctx = bdrv_child_get_parent_aio_context(c);
     IO_OR_GS_CODE();
-    c->parent_quiesce_counter++;
+
+    assert(!c->quiesced_parent);
+    c->quiesced_parent = true;
+
     if (c->klass->drained_begin) {
         c->klass->drained_begin(c);
     }
-    if (poll) {
-        AIO_WAIT_WHILE(ctx, bdrv_parent_drained_poll_single(c));
-    }
 }
 
 static void bdrv_merge_limits(BlockLimits *dst, const BlockLimits *src)
@@ -245,69 +234,14 @@ typedef struct {
     BlockDriverState *bs;
     bool done;
     bool begin;
-    bool recursive;
     bool poll;
     BdrvChild *parent;
-    bool ignore_bds_parents;
-    int *drained_end_counter;
 } BdrvCoDrainData;
 
-static void coroutine_fn bdrv_drain_invoke_entry(void *opaque)
-{
-    BdrvCoDrainData *data = opaque;
-    BlockDriverState *bs = data->bs;
-
-    if (data->begin) {
-        bs->drv->bdrv_co_drain_begin(bs);
-    } else {
-        bs->drv->bdrv_co_drain_end(bs);
-    }
-
-    /* Set data->done and decrement drained_end_counter before bdrv_wakeup() */
-    qatomic_mb_set(&data->done, true);
-    if (!data->begin) {
-        qatomic_dec(data->drained_end_counter);
-    }
-    bdrv_dec_in_flight(bs);
-
-    g_free(data);
-}
-
-/* Recursively call BlockDriver.bdrv_co_drain_begin/end callbacks */
-static void bdrv_drain_invoke(BlockDriverState *bs, bool begin,
-                              int *drained_end_counter)
-{
-    BdrvCoDrainData *data;
-
-    if (!bs->drv || (begin && !bs->drv->bdrv_co_drain_begin) ||
-            (!begin && !bs->drv->bdrv_co_drain_end)) {
-        return;
-    }
-
-    data = g_new(BdrvCoDrainData, 1);
-    *data = (BdrvCoDrainData) {
-        .bs = bs,
-        .done = false,
-        .begin = begin,
-        .drained_end_counter = drained_end_counter,
-    };
-
-    if (!begin) {
-        qatomic_inc(drained_end_counter);
-    }
-
-    /* Make sure the driver callback completes during the polling phase for
-     * drain_begin. */
-    bdrv_inc_in_flight(bs);
-    data->co = qemu_coroutine_create(bdrv_drain_invoke_entry, data);
-    aio_co_schedule(bdrv_get_aio_context(bs), data->co);
-}
-
 /* Returns true if BDRV_POLL_WHILE() should go into a blocking aio_poll() */
-bool bdrv_drain_poll(BlockDriverState *bs, bool recursive,
-                     BdrvChild *ignore_parent, bool ignore_bds_parents)
+bool bdrv_drain_poll(BlockDriverState *bs, BdrvChild *ignore_parent,
+                     bool ignore_bds_parents)
 {
-    BdrvChild *child, *next;
     IO_OR_GS_CODE();
 
     if (bdrv_parent_drained_poll(bs, ignore_parent, ignore_bds_parents)) {
@@ -318,30 +252,18 @@ bool bdrv_drain_poll(BlockDriverState *bs, bool recursive,
         return true;
     }
 
-    if (recursive) {
-        assert(!ignore_bds_parents);
-        QLIST_FOREACH_SAFE(child, &bs->children, next, next) {
-            if (bdrv_drain_poll(child->bs, recursive, child, false)) {
-                return true;
-            }
-        }
-    }
-
     return false;
 }
 
-static bool bdrv_drain_poll_top_level(BlockDriverState *bs, bool recursive,
+static bool bdrv_drain_poll_top_level(BlockDriverState *bs,
                                       BdrvChild *ignore_parent)
 {
-    return bdrv_drain_poll(bs, recursive, ignore_parent, false);
+    return bdrv_drain_poll(bs, ignore_parent, false);
 }
 
-static void bdrv_do_drained_begin(BlockDriverState *bs, bool recursive,
-                                  BdrvChild *parent, bool ignore_bds_parents,
+static void bdrv_do_drained_begin(BlockDriverState *bs, BdrvChild *parent,
                                   bool poll);
-static void bdrv_do_drained_end(BlockDriverState *bs, bool recursive,
-                                BdrvChild *parent, bool ignore_bds_parents,
-                                int *drained_end_counter);
+static void bdrv_do_drained_end(BlockDriverState *bs, BdrvChild *parent);
 
 static void bdrv_co_drain_bh_cb(void *opaque)
 {
@@ -354,14 +276,10 @@ static void bdrv_co_drain_bh_cb(void *opaque)
         aio_context_acquire(ctx);
         bdrv_dec_in_flight(bs);
         if (data->begin) {
-            assert(!data->drained_end_counter);
-            bdrv_do_drained_begin(bs, data->recursive, data->parent,
-                                  data->ignore_bds_parents, data->poll);
+            bdrv_do_drained_begin(bs, data->parent, data->poll);
         } else {
             assert(!data->poll);
-            bdrv_do_drained_end(bs, data->recursive, data->parent,
-                                data->ignore_bds_parents,
-                                data->drained_end_counter);
+            bdrv_do_drained_end(bs, data->parent);
         }
         aio_context_release(ctx);
     } else {
@@ -374,11 +292,9 @@ static void bdrv_co_drain_bh_cb(void *opaque)
 }
 
 static void coroutine_fn bdrv_co_yield_to_drain(BlockDriverState *bs,
-                                                bool begin, bool recursive,
+                                                bool begin,
                                                 BdrvChild *parent,
-                                                bool ignore_bds_parents,
-                                                bool poll,
-                                                int *drained_end_counter)
+                                                bool poll)
 {
     BdrvCoDrainData data;
     Coroutine *self = qemu_coroutine_self();
@@ -394,11 +310,8 @@ static void coroutine_fn bdrv_co_yield_to_drain(BlockDriverState *bs,
         .bs = bs,
         .done = false,
         .begin = begin,
-        .recursive = recursive,
         .parent = parent,
-        .ignore_bds_parents = ignore_bds_parents,
         .poll = poll,
-        .drained_end_counter = drained_end_counter,
     };
 
     if (bs) {
@@ -429,41 +342,22 @@ static void coroutine_fn bdrv_co_yield_to_drain(BlockDriverState *bs,
     }
 }
 
-void bdrv_do_drained_begin_quiesce(BlockDriverState *bs,
-                                   BdrvChild *parent, bool ignore_bds_parents)
-{
-    IO_OR_GS_CODE();
-    assert(!qemu_in_coroutine());
-
-    /* Stop things in parent-to-child order */
-    if (qatomic_fetch_inc(&bs->quiesce_counter) == 0) {
-        aio_disable_external(bdrv_get_aio_context(bs));
-    }
-
-    bdrv_parent_drained_begin(bs, parent, ignore_bds_parents);
-    bdrv_drain_invoke(bs, true, NULL);
-}
-
-static void bdrv_do_drained_begin(BlockDriverState *bs, bool recursive,
-                                  BdrvChild *parent, bool ignore_bds_parents,
+static void bdrv_do_drained_begin(BlockDriverState *bs, BdrvChild *parent,
                                   bool poll)
 {
-    BdrvChild *child, *next;
+    IO_OR_GS_CODE();
 
     if (qemu_in_coroutine()) {
-        bdrv_co_yield_to_drain(bs, true, recursive, parent, ignore_bds_parents,
-                               poll, NULL);
+        bdrv_co_yield_to_drain(bs, true, parent, poll);
         return;
     }
 
-    bdrv_do_drained_begin_quiesce(bs, parent, ignore_bds_parents);
-
-    if (recursive) {
-        assert(!ignore_bds_parents);
-        bs->recursive_quiesce_counter++;
-        QLIST_FOREACH_SAFE(child, &bs->children, next, next) {
-            bdrv_do_drained_begin(child->bs, true, child, ignore_bds_parents,
-                                  false);
+    /* Stop things in parent-to-child order */
+    if (qatomic_fetch_inc(&bs->quiesce_counter) == 0) {
+        aio_disable_external(bdrv_get_aio_context(bs));
+        bdrv_parent_drained_begin(bs, parent);
+        if (bs->drv && bs->drv->bdrv_drain_begin) {
+            bs->drv->bdrv_drain_begin(bs);
         }
     }
 
@@ -477,117 +371,50 @@ static void bdrv_do_drained_begin(BlockDriverState *bs, bool recursive,
      * nodes.
      */
     if (poll) {
-        assert(!ignore_bds_parents);
-        BDRV_POLL_WHILE(bs, bdrv_drain_poll_top_level(bs, recursive, parent));
+        BDRV_POLL_WHILE(bs, bdrv_drain_poll_top_level(bs, parent));
     }
 }
 
-void bdrv_drained_begin(BlockDriverState *bs)
+void bdrv_do_drained_begin_quiesce(BlockDriverState *bs, BdrvChild *parent)
 {
-    IO_OR_GS_CODE();
-    bdrv_do_drained_begin(bs, false, NULL, false, true);
+    bdrv_do_drained_begin(bs, parent, false);
 }
 
-void bdrv_subtree_drained_begin(BlockDriverState *bs)
+void bdrv_drained_begin(BlockDriverState *bs)
 {
     IO_OR_GS_CODE();
-    bdrv_do_drained_begin(bs, true, NULL, false, true);
+    bdrv_do_drained_begin(bs, NULL, true);
 }
 
 /**
  * This function does not poll, nor must any of its recursively called
- * functions.  The *drained_end_counter pointee will be incremented
- * once for every background operation scheduled, and decremented once
- * the operation settles.  Therefore, the pointer must remain valid
- * until the pointee reaches 0.  That implies that whoever sets up the
- * pointee has to poll until it is 0.
- *
- * We use atomic operations to access *drained_end_counter, because
- * (1) when called from bdrv_set_aio_context_ignore(), the subgraph of
- *     @bs may contain nodes in different AioContexts,
- * (2) bdrv_drain_all_end() uses the same counter for all nodes,
- *     regardless of which AioContext they are in.
+ * functions.
  */
-static void bdrv_do_drained_end(BlockDriverState *bs, bool recursive,
-                                BdrvChild *parent, bool ignore_bds_parents,
-                                int *drained_end_counter)
+static void bdrv_do_drained_end(BlockDriverState *bs, BdrvChild *parent)
 {
-    BdrvChild *child;
     int old_quiesce_counter;
 
-    assert(drained_end_counter != NULL);
-
     if (qemu_in_coroutine()) {
-        bdrv_co_yield_to_drain(bs, false, recursive, parent, ignore_bds_parents,
-                               false, drained_end_counter);
+        bdrv_co_yield_to_drain(bs, false, parent, false);
         return;
     }
     assert(bs->quiesce_counter > 0);
 
     /* Re-enable things in child-to-parent order */
-    bdrv_drain_invoke(bs, false, drained_end_counter);
-    bdrv_parent_drained_end(bs, parent, ignore_bds_parents,
-                            drained_end_counter);
-
     old_quiesce_counter = qatomic_fetch_dec(&bs->quiesce_counter);
     if (old_quiesce_counter == 1) {
-        aio_enable_external(bdrv_get_aio_context(bs));
-    }
-
-    if (recursive) {
-        assert(!ignore_bds_parents);
-        bs->recursive_quiesce_counter--;
-        QLIST_FOREACH(child, &bs->children, next) {
-            bdrv_do_drained_end(child->bs, true, child, ignore_bds_parents,
-                                drained_end_counter);
+        if (bs->drv && bs->drv->bdrv_drain_end) {
+            bs->drv->bdrv_drain_end(bs);
         }
+        bdrv_parent_drained_end(bs, parent);
+        aio_enable_external(bdrv_get_aio_context(bs));
     }
 }
 
 void bdrv_drained_end(BlockDriverState *bs)
 {
-    int drained_end_counter = 0;
-    IO_OR_GS_CODE();
-    bdrv_do_drained_end(bs, false, NULL, false, &drained_end_counter);
-    BDRV_POLL_WHILE(bs, qatomic_read(&drained_end_counter) > 0);
-}
-
-void bdrv_drained_end_no_poll(BlockDriverState *bs, int *drained_end_counter)
-{
-    IO_CODE();
-    bdrv_do_drained_end(bs, false, NULL, false, drained_end_counter);
-}
-
-void bdrv_subtree_drained_end(BlockDriverState *bs)
-{
-    int drained_end_counter = 0;
-    IO_OR_GS_CODE();
-    bdrv_do_drained_end(bs, true, NULL, false, &drained_end_counter);
-    BDRV_POLL_WHILE(bs, qatomic_read(&drained_end_counter) > 0);
-}
-
-void bdrv_apply_subtree_drain(BdrvChild *child, BlockDriverState *new_parent)
-{
-    int i;
-    IO_OR_GS_CODE();
-
-    for (i = 0; i < new_parent->recursive_quiesce_counter; i++) {
-        bdrv_do_drained_begin(child->bs, true, child, false, true);
-    }
-}
-
-void bdrv_unapply_subtree_drain(BdrvChild *child, BlockDriverState *old_parent)
-{
-    int drained_end_counter = 0;
-    int i;
     IO_OR_GS_CODE();
-
-    for (i = 0; i < old_parent->recursive_quiesce_counter; i++) {
-        bdrv_do_drained_end(child->bs, true, child, false,
-                            &drained_end_counter);
-    }
-
-    BDRV_POLL_WHILE(child->bs, qatomic_read(&drained_end_counter) > 0);
+    bdrv_do_drained_end(bs, NULL);
 }
 
 void bdrv_drain(BlockDriverState *bs)
@@ -620,7 +447,7 @@ static bool bdrv_drain_all_poll(void)
     while ((bs = bdrv_next_all_states(bs))) {
         AioContext *aio_context = bdrv_get_aio_context(bs);
         aio_context_acquire(aio_context);
-        result |= bdrv_drain_poll(bs, false, NULL, true);
+        result |= bdrv_drain_poll(bs, NULL, true);
         aio_context_release(aio_context);
     }
 
@@ -639,16 +466,11 @@ static bool bdrv_drain_all_poll(void)
  * NOTE: no new block jobs or BlockDriverStates can be created between
  * the bdrv_drain_all_begin() and bdrv_drain_all_end() calls.
  */
-void bdrv_drain_all_begin(void)
+void bdrv_drain_all_begin_nopoll(void)
 {
     BlockDriverState *bs = NULL;
     GLOBAL_STATE_CODE();
 
-    if (qemu_in_coroutine()) {
-        bdrv_co_yield_to_drain(NULL, true, false, NULL, true, true, NULL);
-        return;
-    }
-
     /*
      * bdrv queue is managed by record/replay,
      * waiting for finishing the I/O requests may
@@ -670,9 +492,21 @@ void bdrv_drain_all_begin(void)
         AioContext *aio_context = bdrv_get_aio_context(bs);
 
         aio_context_acquire(aio_context);
-        bdrv_do_drained_begin(bs, false, NULL, true, false);
+        bdrv_do_drained_begin(bs, NULL, false);
         aio_context_release(aio_context);
     }
+}
+
+void bdrv_drain_all_begin(void)
+{
+    BlockDriverState *bs = NULL;
+
+    if (qemu_in_coroutine()) {
+        bdrv_co_yield_to_drain(NULL, true, NULL, true);
+        return;
+    }
+
+    bdrv_drain_all_begin_nopoll();
 
     /* Now poll the in-flight requests */
     AIO_WAIT_WHILE(NULL, bdrv_drain_all_poll());
@@ -684,22 +518,19 @@ void bdrv_drain_all_begin(void)
 
 void bdrv_drain_all_end_quiesce(BlockDriverState *bs)
 {
-    int drained_end_counter = 0;
     GLOBAL_STATE_CODE();
 
     g_assert(bs->quiesce_counter > 0);
     g_assert(!bs->refcnt);
 
     while (bs->quiesce_counter) {
-        bdrv_do_drained_end(bs, false, NULL, true, &drained_end_counter);
+        bdrv_do_drained_end(bs, NULL);
     }
-    BDRV_POLL_WHILE(bs, qatomic_read(&drained_end_counter) > 0);
 }
 
 void bdrv_drain_all_end(void)
 {
     BlockDriverState *bs = NULL;
-    int drained_end_counter = 0;
     GLOBAL_STATE_CODE();
 
     /*
@@ -715,13 +546,11 @@ void bdrv_drain_all_end(void)
         AioContext *aio_context = bdrv_get_aio_context(bs);
 
         aio_context_acquire(aio_context);
-        bdrv_do_drained_end(bs, false, NULL, true, &drained_end_counter);
+        bdrv_do_drained_end(bs, NULL);
         aio_context_release(aio_context);
     }
 
     assert(qemu_get_current_aio_context() == qemu_get_aio_context());
-    AIO_WAIT_WHILE(NULL, qatomic_read(&drained_end_counter) > 0);
-
     assert(bdrv_drain_all_count > 0);
     bdrv_drain_all_count--;
 }
@@ -2711,6 +2540,17 @@ bdrv_co_common_block_status_above(BlockDriverState *bs,
     return ret;
 }
 
+int coroutine_fn bdrv_co_block_status_above(BlockDriverState *bs,
+                                            BlockDriverState *base,
+                                            int64_t offset, int64_t bytes,
+                                            int64_t *pnum, int64_t *map,
+                                            BlockDriverState **file)
+{
+    IO_CODE();
+    return bdrv_co_common_block_status_above(bs, base, false, true, offset,
+                                             bytes, pnum, map, file, NULL);
+}
+
 int bdrv_block_status_above(BlockDriverState *bs, BlockDriverState *base,
                             int64_t offset, int64_t bytes, int64_t *pnum,
                             int64_t *map, BlockDriverState **file)
@@ -2756,6 +2596,22 @@ int coroutine_fn bdrv_co_is_zero_fast(BlockDriverState *bs, int64_t offset,
     return (pnum == bytes) && (ret & BDRV_BLOCK_ZERO);
 }
 
+int coroutine_fn bdrv_co_is_allocated(BlockDriverState *bs, int64_t offset,
+                                      int64_t bytes, int64_t *pnum)
+{
+    int ret;
+    int64_t dummy;
+    IO_CODE();
+
+    ret = bdrv_co_common_block_status_above(bs, bs, true, false, offset,
+                                            bytes, pnum ? pnum : &dummy, NULL,
+                                            NULL, NULL);
+    if (ret < 0) {
+        return ret;
+    }
+    return !!(ret & BDRV_BLOCK_ALLOCATED);
+}
+
 int bdrv_is_allocated(BlockDriverState *bs, int64_t offset, int64_t bytes,
                       int64_t *pnum)
 {
@@ -2772,6 +2628,29 @@ int bdrv_is_allocated(BlockDriverState *bs, int64_t offset, int64_t bytes,
     return !!(ret & BDRV_BLOCK_ALLOCATED);
 }
 
+/* See bdrv_is_allocated_above for documentation */
+int coroutine_fn bdrv_co_is_allocated_above(BlockDriverState *top,
+                                            BlockDriverState *base,
+                                            bool include_base, int64_t offset,
+                                            int64_t bytes, int64_t *pnum)
+{
+    int depth;
+    int ret;
+    IO_CODE();
+
+    ret = bdrv_co_common_block_status_above(top, base, include_base, false,
+                                            offset, bytes, pnum, NULL, NULL,
+                                            &depth);
+    if (ret < 0) {
+        return ret;
+    }
+
+    if (ret & BDRV_BLOCK_ALLOCATED) {
+        return depth;
+    }
+    return 0;
+}
+
 /*
  * Given an image chain: ... -> [BASE] -> [INTER1] -> [INTER2] -> [TOP]
  *
@@ -2795,10 +2674,12 @@ int bdrv_is_allocated_above(BlockDriverState *top,
                             int64_t bytes, int64_t *pnum)
 {
     int depth;
-    int ret = bdrv_common_block_status_above(top, base, include_base, false,
-                                             offset, bytes, pnum, NULL, NULL,
-                                             &depth);
+    int ret;
     IO_CODE();
+
+    ret = bdrv_common_block_status_above(top, base, include_base, false,
+                                         offset, bytes, pnum, NULL, NULL,
+                                         &depth);
     if (ret < 0) {
         return ret;
     }
@@ -2816,6 +2697,7 @@ bdrv_co_readv_vmstate(BlockDriverState *bs, QEMUIOVector *qiov, int64_t pos)
     BlockDriverState *child_bs = bdrv_primary_bs(bs);
     int ret;
     IO_CODE();
+    assert_bdrv_graph_readable();
 
     ret = bdrv_check_qiov_request(pos, qiov->size, qiov, 0, NULL);
     if (ret < 0) {
@@ -2848,6 +2730,7 @@ bdrv_co_writev_vmstate(BlockDriverState *bs, QEMUIOVector *qiov, int64_t pos)
     BlockDriverState *child_bs = bdrv_primary_bs(bs);
     int ret;
     IO_CODE();
+    assert_bdrv_graph_readable();
 
     ret = bdrv_check_qiov_request(pos, qiov->size, qiov, 0, NULL);
     if (ret < 0) {
diff --git a/block/meson.build b/block/meson.build
index b7c68b83a3..90011a2805 100644
--- a/block/meson.build
+++ b/block/meson.build
@@ -10,6 +10,7 @@ block_ss.add(files(
   'blkverify.c',
   'block-backend.c',
   'block-copy.c',
+  'graph-lock.c',
   'commit.c',
   'copy-on-read.c',
   'preallocate.c',
@@ -137,6 +138,7 @@ block_gen_c = custom_target('block-gen.c',
                             output: 'block-gen.c',
                             input: files(
                                       '../include/block/block-io.h',
+                                      '../include/block/dirty-bitmap.h',
                                       '../include/block/block-global-state.h',
                                       '../include/sysemu/block-backend-io.h',
                                       'coroutines.h'
diff --git a/block/parallels.c b/block/parallels.c
index fa08c1104b..bbea2f2221 100644
--- a/block/parallels.c
+++ b/block/parallels.c
@@ -646,7 +646,7 @@ static int coroutine_fn parallels_co_create_opts(BlockDriver *drv,
     }
 
     /* Create and open the file (protocol layer) */
-    ret = bdrv_create_file(filename, opts, errp);
+    ret = bdrv_co_create_file(filename, opts, errp);
     if (ret < 0) {
         goto done;
     }
diff --git a/block/qcow.c b/block/qcow.c
index 8bffc41531..5d99f00411 100644
--- a/block/qcow.c
+++ b/block/qcow.c
@@ -973,7 +973,7 @@ static int coroutine_fn qcow_co_create_opts(BlockDriver *drv,
     }
 
     /* Create and open the file (protocol layer) */
-    ret = bdrv_create_file(filename, opts, errp);
+    ret = bdrv_co_create_file(filename, opts, errp);
     if (ret < 0) {
         goto fail;
     }
diff --git a/block/qcow2.c b/block/qcow2.c
index 941782a011..bafbd077b9 100644
--- a/block/qcow2.c
+++ b/block/qcow2.c
@@ -3871,7 +3871,7 @@ static int coroutine_fn qcow2_co_create_opts(BlockDriver *drv,
     }
 
     /* Create and open the file (protocol layer) */
-    ret = bdrv_create_file(filename, opts, errp);
+    ret = bdrv_co_create_file(filename, opts, errp);
     if (ret < 0) {
         goto finish;
     }
@@ -3886,7 +3886,7 @@ static int coroutine_fn qcow2_co_create_opts(BlockDriver *drv,
     /* Create and open an external data file (protocol layer) */
     val = qdict_get_try_str(qdict, BLOCK_OPT_DATA_FILE);
     if (val) {
-        ret = bdrv_create_file(val, opts, errp);
+        ret = bdrv_co_create_file(val, opts, errp);
         if (ret < 0) {
             goto finish;
         }
diff --git a/block/qed.c b/block/qed.c
index 0ab159b468..faa606618e 100644
--- a/block/qed.c
+++ b/block/qed.c
@@ -262,7 +262,7 @@ static bool coroutine_fn qed_plug_allocating_write_reqs(BDRVQEDState *s)
     assert(!s->allocating_write_reqs_plugged);
     if (s->allocating_acb != NULL) {
         /* Another allocating write came concurrently.  This cannot happen
-         * from bdrv_qed_co_drain_begin, but it can happen when the timer runs.
+         * from bdrv_qed_drain_begin, but it can happen when the timer runs.
          */
         qemu_co_mutex_unlock(&s->table_lock);
         return false;
@@ -282,9 +282,8 @@ static void coroutine_fn qed_unplug_allocating_write_reqs(BDRVQEDState *s)
     qemu_co_mutex_unlock(&s->table_lock);
 }
 
-static void coroutine_fn qed_need_check_timer_entry(void *opaque)
+static void coroutine_fn qed_need_check_timer(BDRVQEDState *s)
 {
-    BDRVQEDState *s = opaque;
     int ret;
 
     trace_qed_need_check_timer_cb(s);
@@ -310,9 +309,20 @@ static void coroutine_fn qed_need_check_timer_entry(void *opaque)
     (void) ret;
 }
 
+static void coroutine_fn qed_need_check_timer_entry(void *opaque)
+{
+    BDRVQEDState *s = opaque;
+
+    qed_need_check_timer(opaque);
+    bdrv_dec_in_flight(s->bs);
+}
+
 static void qed_need_check_timer_cb(void *opaque)
 {
+    BDRVQEDState *s = opaque;
     Coroutine *co = qemu_coroutine_create(qed_need_check_timer_entry, opaque);
+
+    bdrv_inc_in_flight(s->bs);
     qemu_coroutine_enter(co);
 }
 
@@ -355,7 +365,7 @@ static void bdrv_qed_attach_aio_context(BlockDriverState *bs,
     }
 }
 
-static void coroutine_fn bdrv_qed_co_drain_begin(BlockDriverState *bs)
+static void bdrv_qed_drain_begin(BlockDriverState *bs)
 {
     BDRVQEDState *s = bs->opaque;
 
@@ -363,8 +373,12 @@ static void coroutine_fn bdrv_qed_co_drain_begin(BlockDriverState *bs)
      * header is flushed.
      */
     if (s->need_check_timer && timer_pending(s->need_check_timer)) {
+        Coroutine *co;
+
         qed_cancel_need_check_timer(s);
-        qed_need_check_timer_entry(s);
+        co = qemu_coroutine_create(qed_need_check_timer_entry, s);
+        bdrv_inc_in_flight(bs);
+        aio_co_enter(bdrv_get_aio_context(bs), co);
     }
 }
 
@@ -764,7 +778,7 @@ static int coroutine_fn bdrv_qed_co_create_opts(BlockDriver *drv,
     }
 
     /* Create and open the file (protocol layer) */
-    ret = bdrv_create_file(filename, opts, errp);
+    ret = bdrv_co_create_file(filename, opts, errp);
     if (ret < 0) {
         goto fail;
     }
@@ -1647,7 +1661,7 @@ static BlockDriver bdrv_qed = {
     .bdrv_co_check            = bdrv_qed_co_check,
     .bdrv_detach_aio_context  = bdrv_qed_detach_aio_context,
     .bdrv_attach_aio_context  = bdrv_qed_attach_aio_context,
-    .bdrv_co_drain_begin      = bdrv_qed_co_drain_begin,
+    .bdrv_drain_begin         = bdrv_qed_drain_begin,
 };
 
 static void bdrv_qed_init(void)
diff --git a/block/raw-format.c b/block/raw-format.c
index a68014ef0b..28905b09ee 100644
--- a/block/raw-format.c
+++ b/block/raw-format.c
@@ -433,7 +433,7 @@ static int coroutine_fn raw_co_create_opts(BlockDriver *drv,
                                            QemuOpts *opts,
                                            Error **errp)
 {
-    return bdrv_create_file(filename, opts, errp);
+    return bdrv_co_create_file(filename, opts, errp);
 }
 
 static int raw_open(BlockDriverState *bs, QDict *options, int flags,
diff --git a/block/replication.c b/block/replication.c
index f1eed25e43..c62f48a874 100644
--- a/block/replication.c
+++ b/block/replication.c
@@ -374,9 +374,6 @@ static void reopen_backing_file(BlockDriverState *bs, bool writable,
         s->orig_secondary_read_only = bdrv_is_read_only(secondary_disk->bs);
     }
 
-    bdrv_subtree_drained_begin(hidden_disk->bs);
-    bdrv_subtree_drained_begin(secondary_disk->bs);
-
     if (s->orig_hidden_read_only) {
         QDict *opts = qdict_new();
         qdict_put_bool(opts, BDRV_OPT_READ_ONLY, !writable);
@@ -401,9 +398,6 @@ static void reopen_backing_file(BlockDriverState *bs, bool writable,
             aio_context_acquire(ctx);
         }
     }
-
-    bdrv_subtree_drained_end(hidden_disk->bs);
-    bdrv_subtree_drained_end(secondary_disk->bs);
 }
 
 static void backup_job_cleanup(BlockDriverState *bs)
diff --git a/block/stream.c b/block/stream.c
index 694709bd25..8744ad103f 100644
--- a/block/stream.c
+++ b/block/stream.c
@@ -64,13 +64,16 @@ static int stream_prepare(Job *job)
     bdrv_cor_filter_drop(s->cor_filter_bs);
     s->cor_filter_bs = NULL;
 
-    bdrv_subtree_drained_begin(s->above_base);
+    /*
+     * bdrv_set_backing_hd() requires that unfiltered_bs is drained. Drain
+     * already here and use bdrv_set_backing_hd_drained() instead because
+     * the polling during drained_begin() might change the graph, and if we do
+     * this only later, we may end up working with the wrong base node (or it
+     * might even have gone away by the time we want to use it).
+     */
+    bdrv_drained_begin(unfiltered_bs);
 
     base = bdrv_filter_or_cow_bs(s->above_base);
-    if (base) {
-        bdrv_ref(base);
-    }
-
     unfiltered_base = bdrv_skip_filters(base);
 
     if (bdrv_cow_child(unfiltered_bs)) {
@@ -82,7 +85,13 @@ static int stream_prepare(Job *job)
             }
         }
 
-        bdrv_set_backing_hd(unfiltered_bs, base, &local_err);
+        bdrv_set_backing_hd_drained(unfiltered_bs, base, &local_err);
+
+        /*
+         * This call will do I/O, so the graph can change again from here on.
+         * We have already completed the graph change, so we are not in danger
+         * of operating on the wrong node any more if this happens.
+         */
         ret = bdrv_change_backing_file(unfiltered_bs, base_id, base_fmt, false);
         if (local_err) {
             error_report_err(local_err);
@@ -92,10 +101,7 @@ static int stream_prepare(Job *job)
     }
 
 out:
-    if (base) {
-        bdrv_unref(base);
-    }
-    bdrv_subtree_drained_end(s->above_base);
+    bdrv_drained_end(unfiltered_bs);
     return ret;
 }
 
diff --git a/block/throttle.c b/block/throttle.c
index 131eba3ab4..88851c84f4 100644
--- a/block/throttle.c
+++ b/block/throttle.c
@@ -214,7 +214,7 @@ static void throttle_reopen_abort(BDRVReopenState *reopen_state)
     reopen_state->opaque = NULL;
 }
 
-static void coroutine_fn throttle_co_drain_begin(BlockDriverState *bs)
+static void throttle_drain_begin(BlockDriverState *bs)
 {
     ThrottleGroupMember *tgm = bs->opaque;
     if (qatomic_fetch_inc(&tgm->io_limits_disabled) == 0) {
@@ -222,7 +222,7 @@ static void coroutine_fn throttle_co_drain_begin(BlockDriverState *bs)
     }
 }
 
-static void coroutine_fn throttle_co_drain_end(BlockDriverState *bs)
+static void throttle_drain_end(BlockDriverState *bs)
 {
     ThrottleGroupMember *tgm = bs->opaque;
     assert(tgm->io_limits_disabled);
@@ -261,8 +261,8 @@ static BlockDriver bdrv_throttle = {
     .bdrv_reopen_commit                 =   throttle_reopen_commit,
     .bdrv_reopen_abort                  =   throttle_reopen_abort,
 
-    .bdrv_co_drain_begin                =   throttle_co_drain_begin,
-    .bdrv_co_drain_end                  =   throttle_co_drain_end,
+    .bdrv_drain_begin                   =   throttle_drain_begin,
+    .bdrv_drain_end                     =   throttle_drain_end,
 
     .is_filter                          =   true,
     .strong_runtime_opts                =   throttle_strong_runtime_opts,
diff --git a/block/vdi.c b/block/vdi.c
index c0c111c4b9..479bcfe820 100644
--- a/block/vdi.c
+++ b/block/vdi.c
@@ -934,7 +934,7 @@ static int coroutine_fn vdi_co_create_opts(BlockDriver *drv,
     qdict = qemu_opts_to_qdict_filtered(opts, NULL, &vdi_create_opts, true);
 
     /* Create and open the file (protocol layer) */
-    ret = bdrv_create_file(filename, opts, errp);
+    ret = bdrv_co_create_file(filename, opts, errp);
     if (ret < 0) {
         goto done;
     }
diff --git a/block/vhdx.c b/block/vhdx.c
index bad9ca691b..4c929800fe 100644
--- a/block/vhdx.c
+++ b/block/vhdx.c
@@ -2084,7 +2084,7 @@ static int coroutine_fn vhdx_co_create_opts(BlockDriver *drv,
     }
 
     /* Create and open the file (protocol layer) */
-    ret = bdrv_create_file(filename, opts, errp);
+    ret = bdrv_co_create_file(filename, opts, errp);
     if (ret < 0) {
         goto fail;
     }
diff --git a/block/vmdk.c b/block/vmdk.c
index bac3d8db50..8894dac2d4 100644
--- a/block/vmdk.c
+++ b/block/vmdk.c
@@ -2285,15 +2285,16 @@ exit:
     return ret;
 }
 
-static int vmdk_create_extent(const char *filename, int64_t filesize,
-                              bool flat, bool compress, bool zeroed_grain,
-                              BlockBackend **pbb,
-                              QemuOpts *opts, Error **errp)
+static int coroutine_fn vmdk_create_extent(const char *filename,
+                                           int64_t filesize, bool flat,
+                                           bool compress, bool zeroed_grain,
+                                           BlockBackend **pbb,
+                                           QemuOpts *opts, Error **errp)
 {
     int ret;
     BlockBackend *blk = NULL;
 
-    ret = bdrv_create_file(filename, opts, errp);
+    ret = bdrv_co_create_file(filename, opts, errp);
     if (ret < 0) {
         goto exit;
     }
@@ -2366,14 +2367,14 @@ static int filename_decompose(const char *filename, char *path, char *prefix,
  *           non-split format.
  * idx >= 1: get the n-th extent if in a split subformat
  */
-typedef BlockBackend *(*vmdk_create_extent_fn)(int64_t size,
-                                               int idx,
-                                               bool flat,
-                                               bool split,
-                                               bool compress,
-                                               bool zeroed_grain,
-                                               void *opaque,
-                                               Error **errp);
+typedef BlockBackend * coroutine_fn (*vmdk_create_extent_fn)(int64_t size,
+                                                             int idx,
+                                                             bool flat,
+                                                             bool split,
+                                                             bool compress,
+                                                             bool zeroed_grain,
+                                                             void *opaque,
+                                                             Error **errp);
 
 static void vmdk_desc_add_extent(GString *desc,
                                  const char *extent_line_fmt,
@@ -2616,7 +2617,7 @@ typedef struct {
     QemuOpts *opts;
 } VMDKCreateOptsData;
 
-static BlockBackend *vmdk_co_create_opts_cb(int64_t size, int idx,
+static BlockBackend * coroutine_fn vmdk_co_create_opts_cb(int64_t size, int idx,
                                             bool flat, bool split, bool compress,
                                             bool zeroed_grain, void *opaque,
                                             Error **errp)
@@ -2768,10 +2769,11 @@ exit:
     return ret;
 }
 
-static BlockBackend *vmdk_co_create_cb(int64_t size, int idx,
-                                       bool flat, bool split, bool compress,
-                                       bool zeroed_grain, void *opaque,
-                                       Error **errp)
+static BlockBackend * coroutine_fn vmdk_co_create_cb(int64_t size, int idx,
+                                                     bool flat, bool split,
+                                                     bool compress,
+                                                     bool zeroed_grain,
+                                                     void *opaque, Error **errp)
 {
     int ret;
     BlockDriverState *bs;
diff --git a/block/vpc.c b/block/vpc.c
index 95841f259a..6ee95dcb96 100644
--- a/block/vpc.c
+++ b/block/vpc.c
@@ -1111,7 +1111,7 @@ static int coroutine_fn vpc_co_create_opts(BlockDriver *drv,
     }
 
     /* Create and open the file (protocol layer) */
-    ret = bdrv_create_file(filename, opts, errp);
+    ret = bdrv_co_create_file(filename, opts, errp);
     if (ret < 0) {
         goto fail;
     }