summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r-- block.c 6
-rw-r--r-- block/block-backend.c 31
-rw-r--r-- block/io.c 30
-rw-r--r-- block/linux-aio.c 2
-rw-r--r-- blockdev.c 32
-rw-r--r-- blockjob.c 9
-rw-r--r-- include/block/aio-wait.h 28
-rw-r--r-- include/block/block.h 6
-rw-r--r-- include/block/block_int.h 3
-rw-r--r-- include/block/blockjob.h 3
-rw-r--r-- include/qemu/coroutine.h 5
-rw-r--r-- include/qemu/job.h 12
-rw-r--r-- job.c 67
-rw-r--r-- qapi/block-core.json 24
-rwxr-xr-x tests/qemu-iotests/040 52
-rw-r--r-- tests/qemu-iotests/040.out 4
-rwxr-xr-x tests/qemu-iotests/051 3
-rw-r--r-- tests/qemu-iotests/051.out 3
-rw-r--r-- tests/qemu-iotests/051.pc.out 3
-rw-r--r-- tests/test-bdrv-drain.c 294
-rw-r--r-- tests/test-blockjob.c 6
-rw-r--r-- util/aio-wait.c 11
-rw-r--r-- util/async.c 2
-rw-r--r-- util/qemu-coroutine.c 5
24 files changed, 527 insertions, 114 deletions
diff --git a/block.c b/block.c
index 0dbb1fcc7b..c298ca6a19 100644
--- a/block.c
+++ b/block.c
@@ -2792,6 +2792,7 @@ static BlockDriverState *bdrv_open_inherit(const char *filename,
     bdrv_parent_cb_change_media(bs, true);
 
     qobject_unref(options);
+    options = NULL;
 
     /* For snapshot=on, create a temporary qcow2 overlay. bs points to the
      * temporary snapshot afterwards. */
@@ -4885,11 +4886,6 @@ AioContext *bdrv_get_aio_context(BlockDriverState *bs)
     return bs ? bs->aio_context : qemu_get_aio_context();
 }
 
-AioWait *bdrv_get_aio_wait(BlockDriverState *bs)
-{
-    return bs ? &bs->wait : NULL;
-}
-
 void bdrv_coroutine_enter(BlockDriverState *bs, Coroutine *co)
 {
     aio_co_enter(bdrv_get_aio_context(bs), co);
diff --git a/block/block-backend.c b/block/block-backend.c
index 14a1b7ac6a..7b1ec5071b 100644
--- a/block/block-backend.c
+++ b/block/block-backend.c
@@ -88,7 +88,6 @@ struct BlockBackend {
      * Accessed with atomic ops.
      */
     unsigned int in_flight;
-    AioWait wait;
 };
 
 typedef struct BlockBackendAIOCB {
@@ -121,6 +120,7 @@ static void blk_root_inherit_options(int *child_flags, QDict *child_options,
     abort();
 }
 static void blk_root_drained_begin(BdrvChild *child);
+static bool blk_root_drained_poll(BdrvChild *child);
 static void blk_root_drained_end(BdrvChild *child);
 
 static void blk_root_change_media(BdrvChild *child, bool load);
@@ -294,6 +294,7 @@ static const BdrvChildRole child_root = {
     .get_parent_desc    = blk_root_get_parent_desc,
 
     .drained_begin      = blk_root_drained_begin,
+    .drained_poll       = blk_root_drained_poll,
     .drained_end        = blk_root_drained_end,
 
     .activate           = blk_root_activate,
@@ -433,6 +434,7 @@ int blk_get_refcnt(BlockBackend *blk)
  */
 void blk_ref(BlockBackend *blk)
 {
+    assert(blk->refcnt > 0);
     blk->refcnt++;
 }
 
@@ -445,7 +447,13 @@ void blk_unref(BlockBackend *blk)
 {
     if (blk) {
         assert(blk->refcnt > 0);
-        if (!--blk->refcnt) {
+        if (blk->refcnt > 1) {
+            blk->refcnt--;
+        } else {
+            blk_drain(blk);
+            /* blk_drain() cannot resurrect blk, nobody held a reference */
+            assert(blk->refcnt == 1);
+            blk->refcnt = 0;
             blk_delete(blk);
         }
     }
@@ -1289,7 +1297,7 @@ static void blk_inc_in_flight(BlockBackend *blk)
 static void blk_dec_in_flight(BlockBackend *blk)
 {
     atomic_dec(&blk->in_flight);
-    aio_wait_kick(&blk->wait);
+    aio_wait_kick();
 }
 
 static void error_callback_bh(void *opaque)
@@ -1330,8 +1338,8 @@ static const AIOCBInfo blk_aio_em_aiocb_info = {
 static void blk_aio_complete(BlkAioEmAIOCB *acb)
 {
     if (acb->has_returned) {
-        blk_dec_in_flight(acb->rwco.blk);
         acb->common.cb(acb->common.opaque, acb->rwco.ret);
+        blk_dec_in_flight(acb->rwco.blk);
         qemu_aio_unref(acb);
     }
 }
@@ -1590,9 +1598,8 @@ void blk_drain(BlockBackend *blk)
     }
 
     /* We may have -ENOMEDIUM completions in flight */
-    AIO_WAIT_WHILE(&blk->wait,
-            blk_get_aio_context(blk),
-            atomic_mb_read(&blk->in_flight) > 0);
+    AIO_WAIT_WHILE(blk_get_aio_context(blk),
+                   atomic_mb_read(&blk->in_flight) > 0);
 
     if (bs) {
         bdrv_drained_end(bs);
@@ -1611,8 +1618,7 @@ void blk_drain_all(void)
         aio_context_acquire(ctx);
 
         /* We may have -ENOMEDIUM completions in flight */
-        AIO_WAIT_WHILE(&blk->wait, ctx,
-                atomic_mb_read(&blk->in_flight) > 0);
+        AIO_WAIT_WHILE(ctx, atomic_mb_read(&blk->in_flight) > 0);
 
         aio_context_release(ctx);
     }
@@ -2189,6 +2195,13 @@ static void blk_root_drained_begin(BdrvChild *child)
     }
 }
 
+static bool blk_root_drained_poll(BdrvChild *child)
+{
+    BlockBackend *blk = child->opaque;
+    assert(blk->quiesce_counter);
+    return !!blk->in_flight;
+}
+
 static void blk_root_drained_end(BdrvChild *child)
 {
     BlockBackend *blk = child->opaque;
diff --git a/block/io.c b/block/io.c
index 7100344c7b..bd9d688f8b 100644
--- a/block/io.c
+++ b/block/io.c
@@ -38,8 +38,6 @@
 /* Maximum bounce buffer for copy-on-read and write zeroes, in bytes */
 #define MAX_BOUNCE_BUFFER (32768 << BDRV_SECTOR_BITS)
 
-static AioWait drain_all_aio_wait;
-
 static void bdrv_parent_cb_resize(BlockDriverState *bs);
 static int coroutine_fn bdrv_co_do_pwrite_zeroes(BlockDriverState *bs,
     int64_t offset, int bytes, BdrvRequestFlags flags);
@@ -268,10 +266,6 @@ bool bdrv_drain_poll(BlockDriverState *bs, bool recursive,
 static bool bdrv_drain_poll_top_level(BlockDriverState *bs, bool recursive,
                                       BdrvChild *ignore_parent)
 {
-    /* Execute pending BHs first and check everything else only after the BHs
-     * have executed. */
-    while (aio_poll(bs->aio_context, false));
-
     return bdrv_drain_poll(bs, recursive, ignore_parent, false);
 }
 
@@ -288,6 +282,18 @@ static void bdrv_co_drain_bh_cb(void *opaque)
     BlockDriverState *bs = data->bs;
 
     if (bs) {
+        AioContext *ctx = bdrv_get_aio_context(bs);
+        AioContext *co_ctx = qemu_coroutine_get_aio_context(co);
+
+        /*
+         * When the coroutine yielded, the lock for its home context was
+         * released, so we need to re-acquire it here. If it explicitly
+         * acquired a different context, the lock is still held and we don't
+         * want to lock it a second time (or AIO_WAIT_WHILE() would hang).
+         */
+        if (ctx == co_ctx) {
+            aio_context_acquire(ctx);
+        }
         bdrv_dec_in_flight(bs);
         if (data->begin) {
             bdrv_do_drained_begin(bs, data->recursive, data->parent,
@@ -296,6 +302,9 @@ static void bdrv_co_drain_bh_cb(void *opaque)
             bdrv_do_drained_end(bs, data->recursive, data->parent,
                                 data->ignore_bds_parents);
         }
+        if (ctx == co_ctx) {
+            aio_context_release(ctx);
+        }
     } else {
         assert(data->begin);
         bdrv_drain_all_begin();
@@ -496,10 +505,6 @@ static bool bdrv_drain_all_poll(void)
     BlockDriverState *bs = NULL;
     bool result = false;
 
-    /* Execute pending BHs first (may modify the graph) and check everything
-     * else only after the BHs have executed. */
-    while (aio_poll(qemu_get_aio_context(), false));
-
     /* bdrv_drain_poll() can't make changes to the graph and we are holding the
      * main AioContext lock, so iterating bdrv_next_all_states() is safe. */
     while ((bs = bdrv_next_all_states(bs))) {
@@ -550,7 +555,7 @@ void bdrv_drain_all_begin(void)
     }
 
     /* Now poll the in-flight requests */
-    AIO_WAIT_WHILE(&drain_all_aio_wait, NULL, bdrv_drain_all_poll());
+    AIO_WAIT_WHILE(NULL, bdrv_drain_all_poll());
 
     while ((bs = bdrv_next_all_states(bs))) {
         bdrv_drain_assert_idle(bs);
@@ -706,8 +711,7 @@ void bdrv_inc_in_flight(BlockDriverState *bs)
 
 void bdrv_wakeup(BlockDriverState *bs)
 {
-    aio_wait_kick(bdrv_get_aio_wait(bs));
-    aio_wait_kick(&drain_all_aio_wait);
+    aio_wait_kick();
 }
 
 void bdrv_dec_in_flight(BlockDriverState *bs)
diff --git a/block/linux-aio.c b/block/linux-aio.c
index 19eb922fdd..217ce60138 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -234,9 +234,9 @@ static void qemu_laio_process_completions(LinuxAioState *s)
 
 static void qemu_laio_process_completions_and_submit(LinuxAioState *s)
 {
+    aio_context_acquire(s->aio_context);
     qemu_laio_process_completions(s);
 
-    aio_context_acquire(s->aio_context);
     if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
         ioq_submit(s);
     }
diff --git a/blockdev.c b/blockdev.c
index d4b42403df..a8755bd908 100644
--- a/blockdev.c
+++ b/blockdev.c
@@ -3214,7 +3214,9 @@ out:
 }
 
 void qmp_block_commit(bool has_job_id, const char *job_id, const char *device,
+                      bool has_base_node, const char *base_node,
                       bool has_base, const char *base,
+                      bool has_top_node, const char *top_node,
                       bool has_top, const char *top,
                       bool has_backing_file, const char *backing_file,
                       bool has_speed, int64_t speed,
@@ -3275,7 +3277,20 @@ void qmp_block_commit(bool has_job_id, const char *job_id, const char *device,
     /* default top_bs is the active layer */
     top_bs = bs;
 
-    if (has_top && top) {
+    if (has_top_node && has_top) {
+        error_setg(errp, "'top-node' and 'top' are mutually exclusive");
+        goto out;
+    } else if (has_top_node) {
+        top_bs = bdrv_lookup_bs(NULL, top_node, errp);
+        if (top_bs == NULL) {
+            goto out;
+        }
+        if (!bdrv_chain_contains(bs, top_bs)) {
+            error_setg(errp, "'%s' is not in this backing file chain",
+                       top_node);
+            goto out;
+        }
+    } else if (has_top && top) {
         if (strcmp(bs->filename, top) != 0) {
             top_bs = bdrv_find_backing_image(bs, top);
         }
@@ -3288,7 +3303,20 @@ void qmp_block_commit(bool has_job_id, const char *job_id, const char *device,
 
     assert(bdrv_get_aio_context(top_bs) == aio_context);
 
-    if (has_base && base) {
+    if (has_base_node && has_base) {
+        error_setg(errp, "'base-node' and 'base' are mutually exclusive");
+        goto out;
+    } else if (has_base_node) {
+        base_bs = bdrv_lookup_bs(NULL, base_node, errp);
+        if (base_bs == NULL) {
+            goto out;
+        }
+        if (!bdrv_chain_contains(top_bs, base_bs)) {
+            error_setg(errp, "'%s' is not in this backing file chain",
+                       base_node);
+            goto out;
+        }
+    } else if (has_base && base) {
         base_bs = bdrv_find_backing_image(top_bs, base);
     } else {
         base_bs = bdrv_find_base(top_bs);
diff --git a/blockjob.c b/blockjob.c
index bf7ef48f98..58de8cb024 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -164,7 +164,7 @@ static bool child_job_drained_poll(BdrvChild *c)
     /* An inactive or completed job doesn't have any pending requests. Jobs
      * with !job->busy are either already paused or have a pause point after
      * being reentered, so no job driver code will run before they pause. */
-    if (!job->busy || job_is_completed(job) || job->deferred_to_main_loop) {
+    if (!job->busy || job_is_completed(job)) {
         return false;
     }
 
@@ -221,6 +221,11 @@ int block_job_add_bdrv(BlockJob *job, const char *name, BlockDriverState *bs,
     return 0;
 }
 
+static void block_job_on_idle(Notifier *n, void *opaque)
+{
+    aio_wait_kick();
+}
+
 bool block_job_is_internal(BlockJob *job)
 {
     return (job->job.id == NULL);
@@ -416,6 +421,7 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
     job->finalize_completed_notifier.notify = block_job_event_completed;
     job->pending_notifier.notify = block_job_event_pending;
     job->ready_notifier.notify = block_job_event_ready;
+    job->idle_notifier.notify = block_job_on_idle;
 
     notifier_list_add(&job->job.on_finalize_cancelled,
                       &job->finalize_cancelled_notifier);
@@ -423,6 +429,7 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
                       &job->finalize_completed_notifier);
     notifier_list_add(&job->job.on_pending, &job->pending_notifier);
     notifier_list_add(&job->job.on_ready, &job->ready_notifier);
+    notifier_list_add(&job->job.on_idle, &job->idle_notifier);
 
     error_setg(&job->blocker, "block device is in use by block job: %s",
                job_type_str(&job->job));
diff --git a/include/block/aio-wait.h b/include/block/aio-wait.h
index c85a62f798..afd0ff7eb8 100644
--- a/include/block/aio-wait.h
+++ b/include/block/aio-wait.h
@@ -30,14 +30,15 @@
 /**
  * AioWait:
  *
- * An object that facilitates synchronous waiting on a condition.  The main
- * loop can wait on an operation running in an IOThread as follows:
+ * An object that facilitates synchronous waiting on a condition. A single
+ * global AioWait object (global_aio_wait) is used internally.
+ *
+ * The main loop can wait on an operation running in an IOThread as follows:
  *
- *   AioWait *wait = ...;
  *   AioContext *ctx = ...;
  *   MyWork work = { .done = false };
  *   schedule_my_work_in_iothread(ctx, &work);
- *   AIO_WAIT_WHILE(wait, ctx, !work.done);
+ *   AIO_WAIT_WHILE(ctx, !work.done);
  *
  * The IOThread must call aio_wait_kick() to notify the main loop when
  * work.done changes:
@@ -46,7 +47,7 @@
  *   {
  *       ...
  *       work.done = true;
- *       aio_wait_kick(wait);
+ *       aio_wait_kick();
  *   }
  */
 typedef struct {
@@ -54,9 +55,10 @@ typedef struct {
     unsigned num_waiters;
 } AioWait;
 
+extern AioWait global_aio_wait;
+
 /**
  * AIO_WAIT_WHILE:
- * @wait: the aio wait object
  * @ctx: the aio context, or NULL if multiple aio contexts (for which the
  *       caller does not hold a lock) are involved in the polling condition.
  * @cond: wait while this conditional expression is true
@@ -72,10 +74,12 @@ typedef struct {
  * wait on conditions between two IOThreads since that could lead to deadlock,
  * go via the main loop instead.
  */
-#define AIO_WAIT_WHILE(wait, ctx, cond) ({                         \
+#define AIO_WAIT_WHILE(ctx, cond) ({                               \
     bool waited_ = false;                                          \
-    AioWait *wait_ = (wait);                                       \
+    AioWait *wait_ = &global_aio_wait;                             \
     AioContext *ctx_ = (ctx);                                      \
+    /* Increment wait_->num_waiters before evaluating cond. */     \
+    atomic_inc(&wait_->num_waiters);                               \
     if (ctx_ && in_aio_context_home_thread(ctx_)) {                \
         while ((cond)) {                                           \
             aio_poll(ctx_, true);                                  \
@@ -84,8 +88,6 @@ typedef struct {
     } else {                                                       \
         assert(qemu_get_current_aio_context() ==                   \
                qemu_get_aio_context());                            \
-        /* Increment wait_->num_waiters before evaluating cond. */ \
-        atomic_inc(&wait_->num_waiters);                           \
         while ((cond)) {                                           \
             if (ctx_) {                                            \
                 aio_context_release(ctx_);                         \
@@ -96,20 +98,18 @@ typedef struct {
             }                                                      \
             waited_ = true;                                        \
         }                                                          \
-        atomic_dec(&wait_->num_waiters);                           \
     }                                                              \
+    atomic_dec(&wait_->num_waiters);                               \
     waited_; })
 
 /**
  * aio_wait_kick:
- * @wait: the aio wait object that should re-evaluate its condition
- *
  * Wake up the main thread if it is waiting on AIO_WAIT_WHILE().  During
  * synchronous operations performed in an IOThread, the main thread lets the
  * IOThread's event loop run, waiting for the operation to complete.  A
  * aio_wait_kick() call will wake up the main thread.
  */
-void aio_wait_kick(AioWait *wait);
+void aio_wait_kick(void);
 
 /**
  * aio_wait_bh_oneshot:
diff --git a/include/block/block.h b/include/block/block.h
index 4e0871aaf9..4edc1e8afa 100644
--- a/include/block/block.h
+++ b/include/block/block.h
@@ -410,13 +410,9 @@ void bdrv_drain_all_begin(void);
 void bdrv_drain_all_end(void);
 void bdrv_drain_all(void);
 
-/* Returns NULL when bs == NULL */
-AioWait *bdrv_get_aio_wait(BlockDriverState *bs);
-
 #define BDRV_POLL_WHILE(bs, cond) ({                       \
     BlockDriverState *bs_ = (bs);                          \
-    AIO_WAIT_WHILE(bdrv_get_aio_wait(bs_),                 \
-                   bdrv_get_aio_context(bs_),              \
+    AIO_WAIT_WHILE(bdrv_get_aio_context(bs_),              \
                    cond); })
 
 int bdrv_pdiscard(BdrvChild *child, int64_t offset, int bytes);
diff --git a/include/block/block_int.h b/include/block/block_int.h
index 4000d2af45..92ecbd866e 100644
--- a/include/block/block_int.h
+++ b/include/block/block_int.h
@@ -794,9 +794,6 @@ struct BlockDriverState {
     unsigned int in_flight;
     unsigned int serialising_in_flight;
 
-    /* Kicked to signal main loop when a request completes. */
-    AioWait wait;
-
     /* counter for nested bdrv_io_plug.
      * Accessed with atomic ops.
     */
diff --git a/include/block/blockjob.h b/include/block/blockjob.h
index 32c00b7dc0..ede0bd8dcb 100644
--- a/include/block/blockjob.h
+++ b/include/block/blockjob.h
@@ -70,6 +70,9 @@ typedef struct BlockJob {
     /** Called when the job transitions to READY */
     Notifier ready_notifier;
 
+    /** Called when the job coroutine yields or terminates */
+    Notifier idle_notifier;
+
     /** BlockDriverStates that are involved in this block job */
     GSList *nodes;
 } BlockJob;
diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
index 6f8a487041..9801e7f5a4 100644
--- a/include/qemu/coroutine.h
+++ b/include/qemu/coroutine.h
@@ -90,6 +90,11 @@ void qemu_aio_coroutine_enter(AioContext *ctx, Coroutine *co);
 void coroutine_fn qemu_coroutine_yield(void);
 
 /**
+ * Get the AioContext of the given coroutine
+ */
+AioContext *coroutine_fn qemu_coroutine_get_aio_context(Coroutine *co);
+
+/**
  * Get the currently executing coroutine
  */
 Coroutine *coroutine_fn qemu_coroutine_self(void);
diff --git a/include/qemu/job.h b/include/qemu/job.h
index 5cb0681834..9e7cd1e4a0 100644
--- a/include/qemu/job.h
+++ b/include/qemu/job.h
@@ -76,6 +76,9 @@ typedef struct Job {
      * Set to false by the job while the coroutine has yielded and may be
      * re-entered by job_enter(). There may still be I/O or event loop activity
      * pending. Accessed under block_job_mutex (in blockjob.c).
+     *
+     * When the job is deferred to the main loop, busy is true as long as the
+     * bottom half is still pending.
      */
     bool busy;
 
@@ -156,6 +159,9 @@ typedef struct Job {
     /** Notifiers called when the job transitions to READY */
     NotifierList on_ready;
 
+    /** Notifiers called when the job coroutine yields or terminates */
+    NotifierList on_idle;
+
     /** Element of the list of jobs */
     QLIST_ENTRY(Job) job_list;
 
@@ -521,6 +527,8 @@ void job_user_cancel(Job *job, bool force, Error **errp);
  *
  * Returns the return value from the job if the job actually completed
  * during the call, or -ECANCELED if it was canceled.
+ *
+ * Callers must hold the AioContext lock of job->aio_context.
  */
 int job_cancel_sync(Job *job);
 
@@ -538,6 +546,8 @@ void job_cancel_sync_all(void);
  * function).
  *
  * Returns the return value from the job.
+ *
+ * Callers must hold the AioContext lock of job->aio_context.
  */
 int job_complete_sync(Job *job, Error **errp);
 
@@ -563,6 +573,8 @@ void job_dismiss(Job **job, Error **errp);
  *
  * Returns 0 if the job is successfully completed, -ECANCELED if the job was
  * cancelled before completing, and -errno in other error cases.
+ *
+ * Callers must hold the AioContext lock of job->aio_context.
  */
 int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp), Error **errp);
 
diff --git a/job.c b/job.c
index 192f168423..c65e01bbfa 100644
--- a/job.c
+++ b/job.c
@@ -29,6 +29,7 @@
 #include "qemu/job.h"
 #include "qemu/id.h"
 #include "qemu/main-loop.h"
+#include "block/aio-wait.h"
 #include "trace-root.h"
 #include "qapi/qapi-events-job.h"
 
@@ -136,21 +137,13 @@ static void job_txn_del_job(Job *job)
     }
 }
 
-static int job_txn_apply(JobTxn *txn, int fn(Job *), bool lock)
+static int job_txn_apply(JobTxn *txn, int fn(Job *))
 {
-    AioContext *ctx;
     Job *job, *next;
     int rc = 0;
 
     QLIST_FOREACH_SAFE(job, &txn->jobs, txn_list, next) {
-        if (lock) {
-            ctx = job->aio_context;
-            aio_context_acquire(ctx);
-        }
         rc = fn(job);
-        if (lock) {
-            aio_context_release(ctx);
-        }
         if (rc) {
             break;
         }
@@ -410,6 +403,11 @@ static void job_event_ready(Job *job)
     notifier_list_notify(&job->on_ready, job);
 }
 
+static void job_event_idle(Job *job)
+{
+    notifier_list_notify(&job->on_idle, job);
+}
+
 void job_enter_cond(Job *job, bool(*fn)(Job *job))
 {
     if (!job_started(job)) {
@@ -455,6 +453,7 @@ static void coroutine_fn job_do_yield(Job *job, uint64_t ns)
         timer_mod(&job->sleep_timer, ns);
     }
     job->busy = false;
+    job_event_idle(job);
     job_unlock();
     qemu_coroutine_yield();
 
@@ -719,6 +718,7 @@ static void job_cancel_async(Job *job, bool force)
 
 static void job_completed_txn_abort(Job *job)
 {
+    AioContext *outer_ctx = job->aio_context;
     AioContext *ctx;
     JobTxn *txn = job->txn;
     Job *other_job;
@@ -732,23 +732,26 @@ static void job_completed_txn_abort(Job *job)
     txn->aborting = true;
     job_txn_ref(txn);
 
-    /* We are the first failed job. Cancel other jobs. */
-    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
-        ctx = other_job->aio_context;
-        aio_context_acquire(ctx);
-    }
+    /* We can only hold the single job's AioContext lock while calling
+     * job_finalize_single() because the finalization callbacks can involve
+     * calls of AIO_WAIT_WHILE(), which could deadlock otherwise. */
+    aio_context_release(outer_ctx);
 
     /* Other jobs are effectively cancelled by us, set the status for
      * them; this job, however, may or may not be cancelled, depending
      * on the caller, so leave it. */
     QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
         if (other_job != job) {
+            ctx = other_job->aio_context;
+            aio_context_acquire(ctx);
             job_cancel_async(other_job, false);
+            aio_context_release(ctx);
         }
     }
     while (!QLIST_EMPTY(&txn->jobs)) {
         other_job = QLIST_FIRST(&txn->jobs);
         ctx = other_job->aio_context;
+        aio_context_acquire(ctx);
         if (!job_is_completed(other_job)) {
             assert(job_is_cancelled(other_job));
             job_finish_sync(other_job, NULL, NULL);
@@ -757,6 +760,8 @@ static void job_completed_txn_abort(Job *job)
         aio_context_release(ctx);
     }
 
+    aio_context_acquire(outer_ctx);
+
     job_txn_unref(txn);
 }
 
@@ -780,11 +785,11 @@ static void job_do_finalize(Job *job)
     assert(job && job->txn);
 
     /* prepare the transaction to complete */
-    rc = job_txn_apply(job->txn, job_prepare, true);
+    rc = job_txn_apply(job->txn, job_prepare);
     if (rc) {
         job_completed_txn_abort(job);
     } else {
-        job_txn_apply(job->txn, job_finalize_single, true);
+        job_txn_apply(job->txn, job_finalize_single);
     }
 }
 
@@ -830,10 +835,10 @@ static void job_completed_txn_success(Job *job)
         assert(other_job->ret == 0);
     }
 
-    job_txn_apply(txn, job_transition_to_pending, false);
+    job_txn_apply(txn, job_transition_to_pending);
 
     /* If no jobs need manual finalization, automatically do so */
-    if (job_txn_apply(txn, job_needs_finalize, false) == 0) {
+    if (job_txn_apply(txn, job_needs_finalize) == 0) {
         job_do_finalize(job);
     }
 }
@@ -855,7 +860,20 @@ static void job_completed(Job *job)
 static void job_exit(void *opaque)
 {
     Job *job = (Job *)opaque;
+    AioContext *ctx = job->aio_context;
+
+    aio_context_acquire(ctx);
+
+    /* This is a lie, we're not quiescent, but still doing the completion
+     * callbacks. However, completion callbacks tend to involve operations that
+     * drain block nodes, and if .drained_poll still returned true, we would
+     * deadlock. */
+    job->busy = false;
+    job_event_idle(job);
+
     job_completed(job);
+
+    aio_context_release(ctx);
 }
 
 /**
@@ -870,6 +888,7 @@ static void coroutine_fn job_co_entry(void *opaque)
     job_pause_point(job);
     job->ret = job->driver->run(job, &job->err);
     job->deferred_to_main_loop = true;
+    job->busy = true;
     aio_bh_schedule_oneshot(qemu_get_aio_context(), job_exit, job);
 }
 
@@ -971,14 +990,10 @@ int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp), Error **errp)
         job_unref(job);
         return -EBUSY;
     }
-    /* job_drain calls job_enter, and it should be enough to induce progress
-     * until the job completes or moves to the main thread. */
-    while (!job->deferred_to_main_loop && !job_is_completed(job)) {
-        job_drain(job);
-    }
-    while (!job_is_completed(job)) {
-        aio_poll(qemu_get_aio_context(), true);
-    }
+
+    AIO_WAIT_WHILE(job->aio_context,
+                   (job_drain(job), !job_is_completed(job)));
+
     ret = (job_is_cancelled(job) && job->ret == 0) ? -ECANCELED : job->ret;
     job_unref(job);
     return ret;
diff --git a/qapi/block-core.json b/qapi/block-core.json
index c0b3d33dbb..ac3b48ee54 100644
--- a/qapi/block-core.json
+++ b/qapi/block-core.json
@@ -1457,12 +1457,23 @@
 #
 # @device:  the device name or node-name of a root node
 #
-# @base:   The file name of the backing image to write data into.
-#                    If not specified, this is the deepest backing image.
+# @base-node: The node name of the backing image to write data into.
+#             If not specified, this is the deepest backing image.
+#             (since: 3.1)
 #
-# @top:    The file name of the backing image within the image chain,
-#                    which contains the topmost data to be committed down. If
-#                    not specified, this is the active layer.
+# @base: Same as @base-node, except that it is a file name rather than a node
+#        name. This must be the exact filename string that was used to open the
+#        node; other strings, even if addressing the same file, are not
+#        accepted (deprecated, use @base-node instead)
+#
+# @top-node: The node name of the backing image within the image chain
+#            which contains the topmost data to be committed down. If
+#            not specified, this is the active layer. (since: 3.1)
+#
+# @top: Same as @top-node, except that it is a file name rather than a node
+#       name. This must be the exact filename string that was used to open the
+#       node; other strings, even if addressing the same file, are not
+#       accepted (deprecated, use @top-node instead)
 #
 # @backing-file:  The backing file string to write into the overlay
 #                           image of 'top'.  If 'top' is the active layer,
@@ -1528,7 +1539,8 @@
 #
 ##
 { 'command': 'block-commit',
-  'data': { '*job-id': 'str', 'device': 'str', '*base': 'str', '*top': 'str',
+  'data': { '*job-id': 'str', 'device': 'str', '*base-node': 'str',
+            '*base': 'str', '*top-node': 'str', '*top': 'str',
             '*backing-file': 'str', '*speed': 'int',
             '*filter-node-name': 'str',
             '*auto-finalize': 'bool', '*auto-dismiss': 'bool' } }
diff --git a/tests/qemu-iotests/040 b/tests/qemu-iotests/040
index 1beb5e6dab..1cb1ceeb33 100755
--- a/tests/qemu-iotests/040
+++ b/tests/qemu-iotests/040
@@ -57,9 +57,12 @@ class ImageCommitTestCase(iotests.QMPTestCase):
         self.assert_no_active_block_jobs()
         self.vm.shutdown()
 
-    def run_commit_test(self, top, base, need_ready=False):
+    def run_commit_test(self, top, base, need_ready=False, node_names=False):
         self.assert_no_active_block_jobs()
-        result = self.vm.qmp('block-commit', device='drive0', top=top, base=base)
+        if node_names:
+            result = self.vm.qmp('block-commit', device='drive0', top_node=top, base_node=base)
+        else:
+            result = self.vm.qmp('block-commit', device='drive0', top=top, base=base)
         self.assert_qmp(result, 'return', {})
         self.wait_for_complete(need_ready)
 
@@ -101,6 +104,11 @@ class TestSingleDrive(ImageCommitTestCase):
         self.assertEqual(-1, qemu_io('-f', 'raw', '-c', 'read -P 0xab 0 524288', backing_img).find("verification failed"))
         self.assertEqual(-1, qemu_io('-f', 'raw', '-c', 'read -P 0xef 524288 524288', backing_img).find("verification failed"))
 
+    def test_commit_node(self):
+        self.run_commit_test("mid", "base", node_names=True)
+        self.assertEqual(-1, qemu_io('-f', 'raw', '-c', 'read -P 0xab 0 524288', backing_img).find("verification failed"))
+        self.assertEqual(-1, qemu_io('-f', 'raw', '-c', 'read -P 0xef 524288 524288', backing_img).find("verification failed"))
+
     def test_device_not_found(self):
         result = self.vm.qmp('block-commit', device='nonexistent', top='%s' % mid_img)
         self.assert_qmp(result, 'error/class', 'DeviceNotFound')
@@ -123,6 +131,30 @@ class TestSingleDrive(ImageCommitTestCase):
         self.assert_qmp(result, 'error/class', 'GenericError')
         self.assert_qmp(result, 'error/desc', 'Base \'badfile\' not found')
 
+    def test_top_node_invalid(self):
+        self.assert_no_active_block_jobs()
+        result = self.vm.qmp('block-commit', device='drive0', top_node='badfile', base_node='base')
+        self.assert_qmp(result, 'error/class', 'GenericError')
+        self.assert_qmp(result, 'error/desc', "Cannot find device= nor node_name=badfile")
+
+    def test_base_node_invalid(self):
+        self.assert_no_active_block_jobs()
+        result = self.vm.qmp('block-commit', device='drive0', top_node='mid', base_node='badfile')
+        self.assert_qmp(result, 'error/class', 'GenericError')
+        self.assert_qmp(result, 'error/desc', "Cannot find device= nor node_name=badfile")
+
+    def test_top_path_and_node(self):
+        self.assert_no_active_block_jobs()
+        result = self.vm.qmp('block-commit', device='drive0', top_node='mid', base_node='base', top='%s' % mid_img)
+        self.assert_qmp(result, 'error/class', 'GenericError')
+        self.assert_qmp(result, 'error/desc', "'top-node' and 'top' are mutually exclusive")
+
+    def test_base_path_and_node(self):
+        self.assert_no_active_block_jobs()
+        result = self.vm.qmp('block-commit', device='drive0', top_node='mid', base_node='base', base='%s' % backing_img)
+        self.assert_qmp(result, 'error/class', 'GenericError')
+        self.assert_qmp(result, 'error/desc', "'base-node' and 'base' are mutually exclusive")
+
     def test_top_is_active(self):
         self.run_commit_test(test_img, backing_img, need_ready=True)
         self.assertEqual(-1, qemu_io('-f', 'raw', '-c', 'read -P 0xab 0 524288', backing_img).find("verification failed"))
@@ -139,6 +171,22 @@ class TestSingleDrive(ImageCommitTestCase):
         self.assert_qmp(result, 'error/class', 'GenericError')
         self.assert_qmp(result, 'error/desc', 'Base \'%s\' not found' % mid_img)
 
+    def test_top_and_base_node_reversed(self):
+        self.assert_no_active_block_jobs()
+        result = self.vm.qmp('block-commit', device='drive0', top_node='base', base_node='top')
+        self.assert_qmp(result, 'error/class', 'GenericError')
+        self.assert_qmp(result, 'error/desc', "'top' is not in this backing file chain")
+
+    def test_top_node_in_wrong_chain(self):
+        self.assert_no_active_block_jobs()
+
+        result = self.vm.qmp('blockdev-add', driver='null-co', node_name='null')
+        self.assert_qmp(result, 'return', {})
+
+        result = self.vm.qmp('block-commit', device='drive0', top_node='null', base_node='base')
+        self.assert_qmp(result, 'error/class', 'GenericError')
+        self.assert_qmp(result, 'error/desc', "'null' is not in this backing file chain")
+
     # When the job is running on a BB that is automatically deleted on hot
     # unplug, the job is cancelled when the device disappears
     def test_hot_unplug(self):
diff --git a/tests/qemu-iotests/040.out b/tests/qemu-iotests/040.out
index e20a75ce4f..802ffaa0c0 100644
--- a/tests/qemu-iotests/040.out
+++ b/tests/qemu-iotests/040.out
@@ -1,5 +1,5 @@
-.............................
+...........................................
 ----------------------------------------------------------------------
-Ran 29 tests
+Ran 43 tests
 
 OK
diff --git a/tests/qemu-iotests/051 b/tests/qemu-iotests/051
index ee9c820d0f..25d3b2d478 100755
--- a/tests/qemu-iotests/051
+++ b/tests/qemu-iotests/051
@@ -354,6 +354,9 @@ printf %b "qemu-io $device_id \"write -P 0x33 0 4k\"\ncommit $device_id\n" |
 
 $QEMU_IO -c "read -P 0x33 0 4k" "$TEST_IMG" | _filter_qemu_io
 
+# Using snapshot=on with a non-existent TMPDIR
+TMPDIR=/nonexistent run_qemu -drive driver=null-co,snapshot=on
+
 # success, all done
 echo "*** done"
 rm -f $seq.full
diff --git a/tests/qemu-iotests/051.out b/tests/qemu-iotests/051.out
index b7273505c7..793af2ab96 100644
--- a/tests/qemu-iotests/051.out
+++ b/tests/qemu-iotests/051.out
@@ -455,4 +455,7 @@ wrote 4096/4096 bytes at offset 0
 
 read 4096/4096 bytes at offset 0
 4 KiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
+Testing: -drive driver=null-co,snapshot=on
+QEMU_PROG: -drive driver=null-co,snapshot=on: Could not get temporary filename: No such file or directory
+
 *** done
diff --git a/tests/qemu-iotests/051.pc.out b/tests/qemu-iotests/051.pc.out
index e9257fe318..ca64edae6a 100644
--- a/tests/qemu-iotests/051.pc.out
+++ b/tests/qemu-iotests/051.pc.out
@@ -527,4 +527,7 @@ wrote 4096/4096 bytes at offset 0
 
 read 4096/4096 bytes at offset 0
 4 KiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
+Testing: -drive driver=null-co,snapshot=on
+QEMU_PROG: -drive driver=null-co,snapshot=on: Could not get temporary filename: No such file or directory
+
 *** done
diff --git a/tests/test-bdrv-drain.c b/tests/test-bdrv-drain.c
index 89ac15e88a..c9f29c8b10 100644
--- a/tests/test-bdrv-drain.c
+++ b/tests/test-bdrv-drain.c
@@ -174,6 +174,28 @@ static void do_drain_end(enum drain_type drain_type, BlockDriverState *bs)
     }
 }
 
+static void do_drain_begin_unlocked(enum drain_type drain_type, BlockDriverState *bs)
+{
+    if (drain_type != BDRV_DRAIN_ALL) {
+        aio_context_acquire(bdrv_get_aio_context(bs));
+    }
+    do_drain_begin(drain_type, bs);
+    if (drain_type != BDRV_DRAIN_ALL) {
+        aio_context_release(bdrv_get_aio_context(bs));
+    }
+}
+
+static void do_drain_end_unlocked(enum drain_type drain_type, BlockDriverState *bs)
+{
+    if (drain_type != BDRV_DRAIN_ALL) {
+        aio_context_acquire(bdrv_get_aio_context(bs));
+    }
+    do_drain_end(drain_type, bs);
+    if (drain_type != BDRV_DRAIN_ALL) {
+        aio_context_release(bdrv_get_aio_context(bs));
+    }
+}
+
 static void test_drv_cb_common(enum drain_type drain_type, bool recursive)
 {
     BlockBackend *blk;
@@ -614,6 +636,17 @@ static void test_iothread_aio_cb(void *opaque, int ret)
     qemu_event_set(&done_event);
 }
 
+static void test_iothread_main_thread_bh(void *opaque)
+{
+    struct test_iothread_data *data = opaque;
+
+    /* Test that the AioContext is not yet locked in a random BH that is
+     * executed during drain, otherwise this would deadlock. */
+    aio_context_acquire(bdrv_get_aio_context(data->bs));
+    bdrv_flush(data->bs);
+    aio_context_release(bdrv_get_aio_context(data->bs));
+}
+
 /*
  * Starts an AIO request on a BDS that runs in the AioContext of iothread 1.
  * The request involves a BH on iothread 2 before it can complete.
@@ -683,6 +716,8 @@ static void test_iothread_common(enum drain_type drain_type, int drain_thread)
             aio_context_acquire(ctx_a);
         }
 
+        aio_bh_schedule_oneshot(ctx_a, test_iothread_main_thread_bh, &data);
+
         /* The request is running on the IOThread a. Draining its block device
          * will make sure that it has completed as far as the BDS is concerned,
          * but the drain in this thread can continue immediately after
@@ -749,23 +784,56 @@ static void test_iothread_drain_subtree(void)
 
 typedef struct TestBlockJob {
     BlockJob common;
+    int run_ret;
+    int prepare_ret;
+    bool running;
     bool should_complete;
 } TestBlockJob;
 
+static int test_job_prepare(Job *job)
+{
+    TestBlockJob *s = container_of(job, TestBlockJob, common.job);
+
+    /* Provoke an AIO_WAIT_WHILE() call to verify there is no deadlock */
+    blk_flush(s->common.blk);
+    return s->prepare_ret;
+}
+
+static void test_job_commit(Job *job)
+{
+    TestBlockJob *s = container_of(job, TestBlockJob, common.job);
+
+    /* Provoke an AIO_WAIT_WHILE() call to verify there is no deadlock */
+    blk_flush(s->common.blk);
+}
+
+static void test_job_abort(Job *job)
+{
+    TestBlockJob *s = container_of(job, TestBlockJob, common.job);
+
+    /* Provoke an AIO_WAIT_WHILE() call to verify there is no deadlock */
+    blk_flush(s->common.blk);
+}
+
 static int coroutine_fn test_job_run(Job *job, Error **errp)
 {
     TestBlockJob *s = container_of(job, TestBlockJob, common.job);
 
+    /* We are running the actual job code past the pause point in
+     * job_co_entry(). */
+    s->running = true;
+
     job_transition_to_ready(&s->common.job);
     while (!s->should_complete) {
-        /* Avoid block_job_sleep_ns() because it marks the job as !busy. We
-         * want to emulate some actual activity (probably some I/O) here so
-         * that drain has to wait for this acitivity to stop. */
-        qemu_co_sleep_ns(QEMU_CLOCK_REALTIME, 100000);
+        /* Avoid job_sleep_ns() because it marks the job as !busy. We want to
+         * emulate some actual activity (probably some I/O) here so that drain
+         * has to wait for this activity to stop. */
+        qemu_co_sleep_ns(QEMU_CLOCK_REALTIME, 1000000);
+
         job_pause_point(&s->common.job);
     }
 
-    return 0;
+    return s->run_ret;
 }
 
 static void test_job_complete(Job *job, Error **errp)
@@ -782,36 +850,115 @@ BlockJobDriver test_job_driver = {
         .drain          = block_job_drain,
         .run            = test_job_run,
         .complete       = test_job_complete,
+        .prepare        = test_job_prepare,
+        .commit         = test_job_commit,
+        .abort          = test_job_abort,
     },
 };
 
-static void test_blockjob_common(enum drain_type drain_type)
+enum test_job_result {
+    TEST_JOB_SUCCESS,
+    TEST_JOB_FAIL_RUN,
+    TEST_JOB_FAIL_PREPARE,
+};
+
+enum test_job_drain_node {
+    TEST_JOB_DRAIN_SRC,
+    TEST_JOB_DRAIN_SRC_CHILD,
+    TEST_JOB_DRAIN_SRC_PARENT,
+};
+
+static void test_blockjob_common_drain_node(enum drain_type drain_type,
+                                            bool use_iothread,
+                                            enum test_job_result result,
+                                            enum test_job_drain_node drain_node)
 {
     BlockBackend *blk_src, *blk_target;
-    BlockDriverState *src, *target;
+    BlockDriverState *src, *src_backing, *src_overlay, *target, *drain_bs;
     BlockJob *job;
+    TestBlockJob *tjob;
+    IOThread *iothread = NULL;
+    AioContext *ctx;
     int ret;
 
     src = bdrv_new_open_driver(&bdrv_test, "source", BDRV_O_RDWR,
                                &error_abort);
+    src_backing = bdrv_new_open_driver(&bdrv_test, "source-backing",
+                                       BDRV_O_RDWR, &error_abort);
+    src_overlay = bdrv_new_open_driver(&bdrv_test, "source-overlay",
+                                       BDRV_O_RDWR, &error_abort);
+
+    bdrv_set_backing_hd(src_overlay, src, &error_abort);
+    bdrv_unref(src);
+    bdrv_set_backing_hd(src, src_backing, &error_abort);
+    bdrv_unref(src_backing);
+
     blk_src = blk_new(BLK_PERM_ALL, BLK_PERM_ALL);
-    blk_insert_bs(blk_src, src, &error_abort);
+    blk_insert_bs(blk_src, src_overlay, &error_abort);
+
+    switch (drain_node) {
+    case TEST_JOB_DRAIN_SRC:
+        drain_bs = src;
+        break;
+    case TEST_JOB_DRAIN_SRC_CHILD:
+        drain_bs = src_backing;
+        break;
+    case TEST_JOB_DRAIN_SRC_PARENT:
+        drain_bs = src_overlay;
+        break;
+    default:
+        g_assert_not_reached();
+    }
+
+    if (use_iothread) {
+        iothread = iothread_new();
+        ctx = iothread_get_aio_context(iothread);
+        blk_set_aio_context(blk_src, ctx);
+    } else {
+        ctx = qemu_get_aio_context();
+    }
 
     target = bdrv_new_open_driver(&bdrv_test, "target", BDRV_O_RDWR,
                                   &error_abort);
     blk_target = blk_new(BLK_PERM_ALL, BLK_PERM_ALL);
     blk_insert_bs(blk_target, target, &error_abort);
 
-    job = block_job_create("job0", &test_job_driver, NULL, src, 0, BLK_PERM_ALL,
-                           0, 0, NULL, NULL, &error_abort);
+    aio_context_acquire(ctx);
+    tjob = block_job_create("job0", &test_job_driver, NULL, src,
+                            0, BLK_PERM_ALL,
+                            0, 0, NULL, NULL, &error_abort);
+    job = &tjob->common;
     block_job_add_bdrv(job, "target", target, 0, BLK_PERM_ALL, &error_abort);
+
+    switch (result) {
+    case TEST_JOB_SUCCESS:
+        break;
+    case TEST_JOB_FAIL_RUN:
+        tjob->run_ret = -EIO;
+        break;
+    case TEST_JOB_FAIL_PREPARE:
+        tjob->prepare_ret = -EIO;
+        break;
+    }
+
     job_start(&job->job);
+    aio_context_release(ctx);
+
+    if (use_iothread) {
+        /* job_co_entry() is run in the I/O thread, wait for the actual job
+         * code to start (we don't want to catch the job in the pause point in
+         * job_co_entry()). */
+        while (!tjob->running) {
+            aio_poll(qemu_get_aio_context(), false);
+        }
+    }
 
     g_assert_cmpint(job->job.pause_count, ==, 0);
     g_assert_false(job->job.paused);
-    g_assert_true(job->job.busy); /* We're in job_sleep_ns() */
+    g_assert_true(tjob->running);
+    g_assert_true(job->job.busy); /* We're in qemu_co_sleep_ns() */
 
-    do_drain_begin(drain_type, src);
+    do_drain_begin_unlocked(drain_type, drain_bs);
 
     if (drain_type == BDRV_DRAIN_ALL) {
         /* bdrv_drain_all() drains both src and target */
@@ -822,7 +969,14 @@ static void test_blockjob_common(enum drain_type drain_type)
     g_assert_true(job->job.paused);
     g_assert_false(job->job.busy); /* The job is paused */
 
-    do_drain_end(drain_type, src);
+    do_drain_end_unlocked(drain_type, drain_bs);
+
+    if (use_iothread) {
+        /* paused is reset in the I/O thread, wait for it */
+        while (job->job.paused) {
+            aio_poll(qemu_get_aio_context(), false);
+        }
+    }
 
     g_assert_cmpint(job->job.pause_count, ==, 0);
     g_assert_false(job->job.paused);
@@ -841,32 +995,113 @@ static void test_blockjob_common(enum drain_type drain_type)
 
     do_drain_end(drain_type, target);
 
+    if (use_iothread) {
+        /* paused is reset in the I/O thread, wait for it */
+        while (job->job.paused) {
+            aio_poll(qemu_get_aio_context(), false);
+        }
+    }
+
     g_assert_cmpint(job->job.pause_count, ==, 0);
     g_assert_false(job->job.paused);
-    g_assert_true(job->job.busy); /* We're in job_sleep_ns() */
+    g_assert_true(job->job.busy); /* We're in qemu_co_sleep_ns() */
 
+    aio_context_acquire(ctx);
     ret = job_complete_sync(&job->job, &error_abort);
-    g_assert_cmpint(ret, ==, 0);
+    g_assert_cmpint(ret, ==, (result == TEST_JOB_SUCCESS ? 0 : -EIO));
+
+    if (use_iothread) {
+        blk_set_aio_context(blk_src, qemu_get_aio_context());
+    }
+    aio_context_release(ctx);
 
     blk_unref(blk_src);
     blk_unref(blk_target);
-    bdrv_unref(src);
+    bdrv_unref(src_overlay);
     bdrv_unref(target);
+
+    if (iothread) {
+        iothread_join(iothread);
+    }
+}
+
+static void test_blockjob_common(enum drain_type drain_type, bool use_iothread,
+                                 enum test_job_result result)
+{
+    test_blockjob_common_drain_node(drain_type, use_iothread, result,
+                                    TEST_JOB_DRAIN_SRC);
+    test_blockjob_common_drain_node(drain_type, use_iothread, result,
+                                    TEST_JOB_DRAIN_SRC_CHILD);
+    if (drain_type == BDRV_SUBTREE_DRAIN) {
+        test_blockjob_common_drain_node(drain_type, use_iothread, result,
+                                        TEST_JOB_DRAIN_SRC_PARENT);
+    }
 }
 
 static void test_blockjob_drain_all(void)
 {
-    test_blockjob_common(BDRV_DRAIN_ALL);
+    test_blockjob_common(BDRV_DRAIN_ALL, false, TEST_JOB_SUCCESS);
 }
 
 static void test_blockjob_drain(void)
 {
-    test_blockjob_common(BDRV_DRAIN);
+    test_blockjob_common(BDRV_DRAIN, false, TEST_JOB_SUCCESS);
 }
 
 static void test_blockjob_drain_subtree(void)
 {
-    test_blockjob_common(BDRV_SUBTREE_DRAIN);
+    test_blockjob_common(BDRV_SUBTREE_DRAIN, false, TEST_JOB_SUCCESS);
+}
+
+static void test_blockjob_error_drain_all(void)
+{
+    test_blockjob_common(BDRV_DRAIN_ALL, false, TEST_JOB_FAIL_RUN);
+    test_blockjob_common(BDRV_DRAIN_ALL, false, TEST_JOB_FAIL_PREPARE);
+}
+
+static void test_blockjob_error_drain(void)
+{
+    test_blockjob_common(BDRV_DRAIN, false, TEST_JOB_FAIL_RUN);
+    test_blockjob_common(BDRV_DRAIN, false, TEST_JOB_FAIL_PREPARE);
+}
+
+static void test_blockjob_error_drain_subtree(void)
+{
+    test_blockjob_common(BDRV_SUBTREE_DRAIN, false, TEST_JOB_FAIL_RUN);
+    test_blockjob_common(BDRV_SUBTREE_DRAIN, false, TEST_JOB_FAIL_PREPARE);
+}
+
+static void test_blockjob_iothread_drain_all(void)
+{
+    test_blockjob_common(BDRV_DRAIN_ALL, true, TEST_JOB_SUCCESS);
+}
+
+static void test_blockjob_iothread_drain(void)
+{
+    test_blockjob_common(BDRV_DRAIN, true, TEST_JOB_SUCCESS);
+}
+
+static void test_blockjob_iothread_drain_subtree(void)
+{
+    test_blockjob_common(BDRV_SUBTREE_DRAIN, true, TEST_JOB_SUCCESS);
+}
+
+static void test_blockjob_iothread_error_drain_all(void)
+{
+    test_blockjob_common(BDRV_DRAIN_ALL, true, TEST_JOB_FAIL_RUN);
+    test_blockjob_common(BDRV_DRAIN_ALL, true, TEST_JOB_FAIL_PREPARE);
+}
+
+static void test_blockjob_iothread_error_drain(void)
+{
+    test_blockjob_common(BDRV_DRAIN, true, TEST_JOB_FAIL_RUN);
+    test_blockjob_common(BDRV_DRAIN, true, TEST_JOB_FAIL_PREPARE);
+}
+
+static void test_blockjob_iothread_error_drain_subtree(void)
+{
+    test_blockjob_common(BDRV_SUBTREE_DRAIN, true, TEST_JOB_FAIL_RUN);
+    test_blockjob_common(BDRV_SUBTREE_DRAIN, true, TEST_JOB_FAIL_PREPARE);
 }
 
 
@@ -1338,6 +1573,27 @@ int main(int argc, char **argv)
     g_test_add_func("/bdrv-drain/blockjob/drain_subtree",
                     test_blockjob_drain_subtree);
 
+    g_test_add_func("/bdrv-drain/blockjob/error/drain_all",
+                    test_blockjob_error_drain_all);
+    g_test_add_func("/bdrv-drain/blockjob/error/drain",
+                    test_blockjob_error_drain);
+    g_test_add_func("/bdrv-drain/blockjob/error/drain_subtree",
+                    test_blockjob_error_drain_subtree);
+
+    g_test_add_func("/bdrv-drain/blockjob/iothread/drain_all",
+                    test_blockjob_iothread_drain_all);
+    g_test_add_func("/bdrv-drain/blockjob/iothread/drain",
+                    test_blockjob_iothread_drain);
+    g_test_add_func("/bdrv-drain/blockjob/iothread/drain_subtree",
+                    test_blockjob_iothread_drain_subtree);
+
+    g_test_add_func("/bdrv-drain/blockjob/iothread/error/drain_all",
+                    test_blockjob_iothread_error_drain_all);
+    g_test_add_func("/bdrv-drain/blockjob/iothread/error/drain",
+                    test_blockjob_iothread_error_drain);
+    g_test_add_func("/bdrv-drain/blockjob/iothread/error/drain_subtree",
+                    test_blockjob_iothread_error_drain_subtree);
+
     g_test_add_func("/bdrv-drain/deletion/drain", test_delete_by_drain);
     g_test_add_func("/bdrv-drain/detach/drain_all", test_detach_by_drain_all);
     g_test_add_func("/bdrv-drain/detach/drain", test_detach_by_drain);
diff --git a/tests/test-blockjob.c b/tests/test-blockjob.c
index de4c1c20aa..652d1e8359 100644
--- a/tests/test-blockjob.c
+++ b/tests/test-blockjob.c
@@ -223,6 +223,10 @@ static void cancel_common(CancelJob *s)
     BlockJob *job = &s->common;
     BlockBackend *blk = s->blk;
     JobStatus sts = job->job.status;
+    AioContext *ctx;
+
+    ctx = job->job.aio_context;
+    aio_context_acquire(ctx);
 
     job_cancel_sync(&job->job);
     if (sts != JOB_STATUS_CREATED && sts != JOB_STATUS_CONCLUDED) {
@@ -232,6 +236,8 @@ static void cancel_common(CancelJob *s)
     assert(job->job.status == JOB_STATUS_NULL);
     job_unref(&job->job);
     destroy_blk(blk);
+
+    aio_context_release(ctx);
 }
 
 static void test_cancel_created(void)
diff --git a/util/aio-wait.c b/util/aio-wait.c
index b8a8f86dba..b4877493f8 100644
--- a/util/aio-wait.c
+++ b/util/aio-wait.c
@@ -26,21 +26,22 @@
 #include "qemu/main-loop.h"
 #include "block/aio-wait.h"
 
+AioWait global_aio_wait;
+
 static void dummy_bh_cb(void *opaque)
 {
     /* The point is to make AIO_WAIT_WHILE()'s aio_poll() return */
 }
 
-void aio_wait_kick(AioWait *wait)
+void aio_wait_kick(void)
 {
     /* The barrier (or an atomic op) is in the caller.  */
-    if (atomic_read(&wait->num_waiters)) {
+    if (atomic_read(&global_aio_wait.num_waiters)) {
         aio_bh_schedule_oneshot(qemu_get_aio_context(), dummy_bh_cb, NULL);
     }
 }
 
 typedef struct {
-    AioWait wait;
     bool done;
     QEMUBHFunc *cb;
     void *opaque;
@@ -54,7 +55,7 @@ static void aio_wait_bh(void *opaque)
     data->cb(data->opaque);
 
     data->done = true;
-    aio_wait_kick(&data->wait);
+    aio_wait_kick();
 }
 
 void aio_wait_bh_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
@@ -67,5 +68,5 @@ void aio_wait_bh_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
     assert(qemu_get_current_aio_context() == qemu_get_aio_context());
 
     aio_bh_schedule_oneshot(ctx, aio_wait_bh, &data);
-    AIO_WAIT_WHILE(&data.wait, ctx, !data.done);
+    AIO_WAIT_WHILE(ctx, !data.done);
 }
diff --git a/util/async.c b/util/async.c
index 05979f8014..c10642a385 100644
--- a/util/async.c
+++ b/util/async.c
@@ -400,7 +400,7 @@ static void co_schedule_bh_cb(void *opaque)
 
         /* Protected by write barrier in qemu_aio_coroutine_enter */
         atomic_set(&co->scheduled, NULL);
-        qemu_coroutine_enter(co);
+        qemu_aio_coroutine_enter(ctx, co);
         aio_context_release(ctx);
     }
 }
diff --git a/util/qemu-coroutine.c b/util/qemu-coroutine.c
index 1ba4191b84..2295928d33 100644
--- a/util/qemu-coroutine.c
+++ b/util/qemu-coroutine.c
@@ -198,3 +198,8 @@ bool qemu_coroutine_entered(Coroutine *co)
 {
     return co->caller;
 }
+
+AioContext *coroutine_fn qemu_coroutine_get_aio_context(Coroutine *co)
+{
+    return co->ctx;
+}