summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r-- block/coroutines.h | 6
-rw-r--r-- block/nbd.c | 553
-rw-r--r-- include/block/aio.h | 5
-rw-r--r-- include/block/nbd.h | 18
-rw-r--r-- include/qemu/coroutine.h | 6
-rw-r--r-- include/qemu/sockets.h | 11
-rw-r--r-- iothread.c | 9
-rw-r--r-- nbd/client-connection.c | 388
-rw-r--r-- nbd/meson.build | 1
-rw-r--r-- scripts/block-coroutine-wrapper.py | 7
-rw-r--r-- stubs/iothread-lock.c | 2
-rw-r--r-- stubs/iothread.c | 8
-rw-r--r-- stubs/meson.build | 1
-rw-r--r-- tests/unit/iothread.c | 9
-rw-r--r-- tests/unit/test-aio.c | 37
-rw-r--r-- util/async.c | 20
-rw-r--r-- util/main-loop.c | 1
-rw-r--r-- util/qemu-sockets.c | 19
18 files changed, 626 insertions, 475 deletions
diff --git a/block/coroutines.h b/block/coroutines.h
index 4cfb4946e6..514d169d23 100644
--- a/block/coroutines.h
+++ b/block/coroutines.h
@@ -66,4 +66,10 @@ int coroutine_fn bdrv_co_readv_vmstate(BlockDriverState *bs,
 int coroutine_fn bdrv_co_writev_vmstate(BlockDriverState *bs,
                                         QEMUIOVector *qiov, int64_t pos);
 
+int generated_co_wrapper
+nbd_do_establish_connection(BlockDriverState *bs, Error **errp);
+int coroutine_fn
+nbd_co_do_establish_connection(BlockDriverState *bs, Error **errp);
+
+
 #endif /* BLOCK_COROUTINES_INT_H */
diff --git a/block/nbd.c b/block/nbd.c
index 616f9ae6c4..3cbee762de 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -44,6 +44,7 @@
 #include "block/qdict.h"
 #include "block/nbd.h"
 #include "block/block_int.h"
+#include "block/coroutines.h"
 
 #include "qemu/yank.h"
 
@@ -66,50 +67,8 @@ typedef enum NBDClientState {
     NBD_CLIENT_QUIT
 } NBDClientState;
 
-typedef enum NBDConnectThreadState {
-    /* No thread, no pending results */
-    CONNECT_THREAD_NONE,
-
-    /* Thread is running, no results for now */
-    CONNECT_THREAD_RUNNING,
-
-    /*
-     * Thread is running, but requestor exited. Thread should close
-     * the new socket and free the connect state on exit.
-     */
-    CONNECT_THREAD_RUNNING_DETACHED,
-
-    /* Thread finished, results are stored in a state */
-    CONNECT_THREAD_FAIL,
-    CONNECT_THREAD_SUCCESS
-} NBDConnectThreadState;
-
-typedef struct NBDConnectThread {
-    /* Initialization constants */
-    SocketAddress *saddr; /* address to connect to */
-    /*
-     * Bottom half to schedule on completion. Scheduled only if bh_ctx is not
-     * NULL
-     */
-    QEMUBHFunc *bh_func;
-    void *bh_opaque;
-
-    /*
-     * Result of last attempt. Valid in FAIL and SUCCESS states.
-     * If you want to steal error, don't forget to set pointer to NULL.
-     */
-    QIOChannelSocket *sioc;
-    Error *err;
-
-    /* state and bh_ctx are protected by mutex */
-    QemuMutex mutex;
-    NBDConnectThreadState state; /* current state of the thread */
-    AioContext *bh_ctx; /* where to schedule bh (NULL means don't schedule) */
-} NBDConnectThread;
-
 typedef struct BDRVNBDState {
-    QIOChannelSocket *sioc; /* The master data channel */
-    QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
+    QIOChannel *ioc; /* The current I/O channel */
     NBDExportInfo info;
 
     CoMutex send_mutex;
@@ -121,8 +80,6 @@ typedef struct BDRVNBDState {
     bool wait_drained_end;
     int in_flight;
     NBDClientState state;
-    int connect_status;
-    Error *connect_err;
     bool wait_in_flight;
 
     QEMUTimer *reconnect_delay_timer;
@@ -140,20 +97,20 @@ typedef struct BDRVNBDState {
     char *x_dirty_bitmap;
     bool alloc_depth;
 
-    bool wait_connect;
-    NBDConnectThread *connect_thread;
+    NBDClientConnection *conn;
 } BDRVNBDState;
 
-static int nbd_establish_connection(BlockDriverState *bs, SocketAddress *saddr,
-                                    Error **errp);
-static int nbd_co_establish_connection(BlockDriverState *bs, Error **errp);
-static void nbd_co_establish_connection_cancel(BlockDriverState *bs,
-                                               bool detach);
-static int nbd_client_handshake(BlockDriverState *bs, Error **errp);
 static void nbd_yank(void *opaque);
 
-static void nbd_clear_bdrvstate(BDRVNBDState *s)
+static void nbd_clear_bdrvstate(BlockDriverState *bs)
 {
+    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
+
+    nbd_client_connection_release(s->conn);
+    s->conn = NULL;
+
+    yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name));
+
     object_unref(OBJECT(s->tlscreds));
     qapi_free_SocketAddress(s->saddr);
     s->saddr = NULL;
@@ -165,15 +122,20 @@ static void nbd_clear_bdrvstate(BDRVNBDState *s)
     s->x_dirty_bitmap = NULL;
 }
 
+static bool nbd_client_connected(BDRVNBDState *s)
+{
+    return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED;
+}
+
 static void nbd_channel_error(BDRVNBDState *s, int ret)
 {
     if (ret == -EIO) {
-        if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED) {
+        if (nbd_client_connected(s)) {
             s->state = s->reconnect_delay ? NBD_CLIENT_CONNECTING_WAIT :
                                             NBD_CLIENT_CONNECTING_NOWAIT;
         }
     } else {
-        if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED) {
+        if (nbd_client_connected(s)) {
             qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
         }
         s->state = NBD_CLIENT_QUIT;
@@ -188,6 +150,7 @@ static void nbd_recv_coroutines_wake_all(BDRVNBDState *s)
         NBDClientRequest *req = &s->requests[i];
 
         if (req->coroutine && req->receiving) {
+            req->receiving = false;
             aio_co_wake(req->coroutine);
         }
     }
@@ -271,7 +234,7 @@ static void nbd_client_attach_aio_context(BlockDriverState *bs,
      * s->connection_co is either yielded from nbd_receive_reply or from
      * nbd_co_reconnect_loop()
      */
-    if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED) {
+    if (nbd_client_connected(s)) {
         qio_channel_attach_aio_context(QIO_CHANNEL(s->ioc), new_context);
     }
 
@@ -291,7 +254,7 @@ static void coroutine_fn nbd_client_co_drain_begin(BlockDriverState *bs)
     s->drained = true;
     qemu_co_sleep_wake(&s->reconnect_sleep);
 
-    nbd_co_establish_connection_cancel(bs, false);
+    nbd_co_establish_connection_cancel(s->conn);
 
     reconnect_delay_timer_del(s);
 
@@ -320,16 +283,12 @@ static void nbd_teardown_connection(BlockDriverState *bs)
     if (s->ioc) {
         /* finish any pending coroutines */
         qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
-    } else if (s->sioc) {
-        /* abort negotiation */
-        qio_channel_shutdown(QIO_CHANNEL(s->sioc), QIO_CHANNEL_SHUTDOWN_BOTH,
-                             NULL);
     }
 
     s->state = NBD_CLIENT_QUIT;
     if (s->connection_co) {
         qemu_co_sleep_wake(&s->reconnect_sleep);
-        nbd_co_establish_connection_cancel(bs, true);
+        nbd_co_establish_connection_cancel(s->conn);
     }
     if (qemu_in_coroutine()) {
         s->teardown_co = qemu_coroutine_self();
@@ -354,239 +313,95 @@ static bool nbd_client_connecting_wait(BDRVNBDState *s)
     return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT;
 }
 
-static void connect_bh(void *opaque)
-{
-    BDRVNBDState *state = opaque;
-
-    assert(state->wait_connect);
-    state->wait_connect = false;
-    aio_co_wake(state->connection_co);
-}
-
-static void nbd_init_connect_thread(BDRVNBDState *s)
-{
-    s->connect_thread = g_new(NBDConnectThread, 1);
-
-    *s->connect_thread = (NBDConnectThread) {
-        .saddr = QAPI_CLONE(SocketAddress, s->saddr),
-        .state = CONNECT_THREAD_NONE,
-        .bh_func = connect_bh,
-        .bh_opaque = s,
-    };
-
-    qemu_mutex_init(&s->connect_thread->mutex);
-}
-
-static void nbd_free_connect_thread(NBDConnectThread *thr)
-{
-    if (thr->sioc) {
-        qio_channel_close(QIO_CHANNEL(thr->sioc), NULL);
-    }
-    error_free(thr->err);
-    qapi_free_SocketAddress(thr->saddr);
-    g_free(thr);
-}
-
-static void *connect_thread_func(void *opaque)
+/*
+ * Update @bs with information learned during a completed negotiation process.
+ * Return failure if the server's advertised options are incompatible with the
+ * client's needs.
+ */
+static int nbd_handle_updated_info(BlockDriverState *bs, Error **errp)
 {
-    NBDConnectThread *thr = opaque;
+    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     int ret;
-    bool do_free = false;
-
-    thr->sioc = qio_channel_socket_new();
 
-    error_free(thr->err);
-    thr->err = NULL;
-    ret = qio_channel_socket_connect_sync(thr->sioc, thr->saddr, &thr->err);
-    if (ret < 0) {
-        object_unref(OBJECT(thr->sioc));
-        thr->sioc = NULL;
+    if (s->x_dirty_bitmap) {
+        if (!s->info.base_allocation) {
+            error_setg(errp, "requested x-dirty-bitmap %s not found",
+                       s->x_dirty_bitmap);
+            return -EINVAL;
+        }
+        if (strcmp(s->x_dirty_bitmap, "qemu:allocation-depth") == 0) {
+            s->alloc_depth = true;
+        }
     }
 
-    qemu_mutex_lock(&thr->mutex);
-
-    switch (thr->state) {
-    case CONNECT_THREAD_RUNNING:
-        thr->state = ret < 0 ? CONNECT_THREAD_FAIL : CONNECT_THREAD_SUCCESS;
-        if (thr->bh_ctx) {
-            aio_bh_schedule_oneshot(thr->bh_ctx, thr->bh_func, thr->bh_opaque);
-
-            /* play safe, don't reuse bh_ctx on further connection attempts */
-            thr->bh_ctx = NULL;
+    if (s->info.flags & NBD_FLAG_READ_ONLY) {
+        ret = bdrv_apply_auto_read_only(bs, "NBD export is read-only", errp);
+        if (ret < 0) {
+            return ret;
         }
-        break;
-    case CONNECT_THREAD_RUNNING_DETACHED:
-        do_free = true;
-        break;
-    default:
-        abort();
     }
 
-    qemu_mutex_unlock(&thr->mutex);
+    if (s->info.flags & NBD_FLAG_SEND_FUA) {
+        bs->supported_write_flags = BDRV_REQ_FUA;
+        bs->supported_zero_flags |= BDRV_REQ_FUA;
+    }
 
-    if (do_free) {
-        nbd_free_connect_thread(thr);
+    if (s->info.flags & NBD_FLAG_SEND_WRITE_ZEROES) {
+        bs->supported_zero_flags |= BDRV_REQ_MAY_UNMAP;
+        if (s->info.flags & NBD_FLAG_SEND_FAST_ZERO) {
+            bs->supported_zero_flags |= BDRV_REQ_NO_FALLBACK;
+        }
     }
 
-    return NULL;
+    trace_nbd_client_handshake_success(s->export);
+
+    return 0;
 }
 
-static int coroutine_fn
-nbd_co_establish_connection(BlockDriverState *bs, Error **errp)
+int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
+                                                Error **errp)
 {
+    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     int ret;
-    QemuThread thread;
-    BDRVNBDState *s = bs->opaque;
-    NBDConnectThread *thr = s->connect_thread;
-
-    if (!thr) {
-        /* detached */
-        return -1;
-    }
-
-    qemu_mutex_lock(&thr->mutex);
-
-    switch (thr->state) {
-    case CONNECT_THREAD_FAIL:
-    case CONNECT_THREAD_NONE:
-        error_free(thr->err);
-        thr->err = NULL;
-        thr->state = CONNECT_THREAD_RUNNING;
-        qemu_thread_create(&thread, "nbd-connect",
-                           connect_thread_func, thr, QEMU_THREAD_DETACHED);
-        break;
-    case CONNECT_THREAD_SUCCESS:
-        /* Previous attempt finally succeeded in background */
-        thr->state = CONNECT_THREAD_NONE;
-        s->sioc = thr->sioc;
-        thr->sioc = NULL;
-        yank_register_function(BLOCKDEV_YANK_INSTANCE(bs->node_name),
-                               nbd_yank, bs);
-        qemu_mutex_unlock(&thr->mutex);
-        return 0;
-    case CONNECT_THREAD_RUNNING:
-        /* Already running, will wait */
-        break;
-    default:
-        abort();
-    }
-
-    thr->bh_ctx = qemu_get_current_aio_context();
-
-    qemu_mutex_unlock(&thr->mutex);
 
+    assert(!s->ioc);
 
-    /*
-     * We are going to wait for connect-thread finish, but
-     * nbd_client_co_drain_begin() can interrupt.
-     *
-     * Note that wait_connect variable is not visible for connect-thread. It
-     * doesn't need mutex protection, it used only inside home aio context of
-     * bs.
-     */
-    s->wait_connect = true;
-    qemu_coroutine_yield();
-
-    if (!s->connect_thread) {
-        /* detached */
-        return -1;
-    }
-    assert(thr == s->connect_thread);
-
-    qemu_mutex_lock(&thr->mutex);
-
-    switch (thr->state) {
-    case CONNECT_THREAD_SUCCESS:
-    case CONNECT_THREAD_FAIL:
-        thr->state = CONNECT_THREAD_NONE;
-        error_propagate(errp, thr->err);
-        thr->err = NULL;
-        s->sioc = thr->sioc;
-        thr->sioc = NULL;
-        if (s->sioc) {
-            yank_register_function(BLOCKDEV_YANK_INSTANCE(bs->node_name),
-                                   nbd_yank, bs);
-        }
-        ret = (s->sioc ? 0 : -1);
-        break;
-    case CONNECT_THREAD_RUNNING:
-    case CONNECT_THREAD_RUNNING_DETACHED:
-        /*
-         * Obviously, drained section wants to start. Report the attempt as
-         * failed. Still connect thread is executing in background, and its
-         * result may be used for next connection attempt.
-         */
-        ret = -1;
-        error_setg(errp, "Connection attempt cancelled by other operation");
-        break;
+    s->ioc = nbd_co_establish_connection(s->conn, &s->info, true, errp);
+    if (!s->ioc) {
+        return -ECONNREFUSED;
+    }
 
-    case CONNECT_THREAD_NONE:
+    ret = nbd_handle_updated_info(s->bs, NULL);
+    if (ret < 0) {
         /*
-         * Impossible. We've seen this thread running. So it should be
-         * running or at least give some results.
+         * We have connected, but must fail for other reasons.
+         * Send NBD_CMD_DISC as a courtesy to the server.
          */
-        abort();
-
-    default:
-        abort();
-    }
+        NBDRequest request = { .type = NBD_CMD_DISC };
 
-    qemu_mutex_unlock(&thr->mutex);
+        nbd_send_request(s->ioc, &request);
 
-    return ret;
-}
+        object_unref(OBJECT(s->ioc));
+        s->ioc = NULL;
 
-/*
- * nbd_co_establish_connection_cancel
- * Cancel nbd_co_establish_connection asynchronously: it will finish soon, to
- * allow drained section to begin.
- *
- * If detach is true, also cleanup the state (or if thread is running, move it
- * to CONNECT_THREAD_RUNNING_DETACHED state). s->connect_thread becomes NULL if
- * detach is true.
- */
-static void nbd_co_establish_connection_cancel(BlockDriverState *bs,
-                                               bool detach)
-{
-    BDRVNBDState *s = bs->opaque;
-    NBDConnectThread *thr = s->connect_thread;
-    bool wake = false;
-    bool do_free = false;
-
-    qemu_mutex_lock(&thr->mutex);
-
-    if (thr->state == CONNECT_THREAD_RUNNING) {
-        /* We can cancel only in running state, when bh is not yet scheduled */
-        thr->bh_ctx = NULL;
-        if (s->wait_connect) {
-            s->wait_connect = false;
-            wake = true;
-        }
-        if (detach) {
-            thr->state = CONNECT_THREAD_RUNNING_DETACHED;
-            s->connect_thread = NULL;
-        }
-    } else if (detach) {
-        do_free = true;
+        return ret;
     }
 
-    qemu_mutex_unlock(&thr->mutex);
+    qio_channel_set_blocking(s->ioc, false, NULL);
+    qio_channel_attach_aio_context(s->ioc, bdrv_get_aio_context(bs));
 
-    if (do_free) {
-        nbd_free_connect_thread(thr);
-        s->connect_thread = NULL;
-    }
+    yank_register_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name), nbd_yank,
+                           bs);
 
-    if (wake) {
-        aio_co_wake(s->connection_co);
-    }
+    /* successfully connected */
+    s->state = NBD_CLIENT_CONNECTED;
+    qemu_co_queue_restart_all(&s->free_sema);
+
+    return 0;
 }
 
 static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
 {
-    int ret;
-    Error *local_err = NULL;
-
     if (!nbd_client_connecting(s)) {
         return;
     }
@@ -620,44 +435,11 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
         qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
         yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
                                  nbd_yank, s->bs);
-        object_unref(OBJECT(s->sioc));
-        s->sioc = NULL;
         object_unref(OBJECT(s->ioc));
         s->ioc = NULL;
     }
 
-    if (nbd_co_establish_connection(s->bs, &local_err) < 0) {
-        ret = -ECONNREFUSED;
-        goto out;
-    }
-
-    bdrv_dec_in_flight(s->bs);
-
-    ret = nbd_client_handshake(s->bs, &local_err);
-
-    if (s->drained) {
-        s->wait_drained_end = true;
-        while (s->drained) {
-            /*
-             * We may be entered once from nbd_client_attach_aio_context_bh
-             * and then from nbd_client_co_drain_end. So here is a loop.
-             */
-            qemu_coroutine_yield();
-        }
-    }
-    bdrv_inc_in_flight(s->bs);
-
-out:
-    s->connect_status = ret;
-    error_free(s->connect_err);
-    s->connect_err = NULL;
-    error_propagate(&s->connect_err, local_err);
-
-    if (ret >= 0) {
-        /* successfully connected */
-        s->state = NBD_CLIENT_CONNECTED;
-        qemu_co_queue_restart_all(&s->free_sema);
-    }
+    nbd_co_do_establish_connection(s->bs, NULL);
 }
 
 static coroutine_fn void nbd_co_reconnect_loop(BDRVNBDState *s)
@@ -723,7 +505,7 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
             nbd_co_reconnect_loop(s);
         }
 
-        if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
+        if (!nbd_client_connected(s)) {
             continue;
         }
 
@@ -767,6 +549,7 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
          *   connection_co happens through a bottom half, which can only
          *   run after we yield.
          */
+        s->requests[i].receiving = false;
         aio_co_wake(s->requests[i].coroutine);
         qemu_coroutine_yield();
     }
@@ -780,8 +563,6 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
         qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
         yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
                                  nbd_yank, s->bs);
-        object_unref(OBJECT(s->sioc));
-        s->sioc = NULL;
         object_unref(OBJECT(s->ioc));
         s->ioc = NULL;
     }
@@ -804,7 +585,7 @@ static int nbd_co_send_request(BlockDriverState *bs,
         qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
     }
 
-    if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
+    if (!nbd_client_connected(s)) {
         rc = -EIO;
         goto err;
     }
@@ -831,8 +612,7 @@ static int nbd_co_send_request(BlockDriverState *bs,
     if (qiov) {
         qio_channel_set_cork(s->ioc, true);
         rc = nbd_send_request(s->ioc, request);
-        if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED &&
-            rc >= 0) {
+        if (nbd_client_connected(s) && rc >= 0) {
             if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
                                        NULL) < 0) {
                 rc = -EIO;
@@ -1156,8 +936,8 @@ static coroutine_fn int nbd_co_do_receive_one_chunk(
     /* Wait until we're woken up by nbd_connection_entry.  */
     s->requests[i].receiving = true;
     qemu_coroutine_yield();
-    s->requests[i].receiving = false;
-    if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
+    assert(!s->requests[i].receiving);
+    if (!nbd_client_connected(s)) {
         error_setg(errp, "Connection closed");
         return -EIO;
     }
@@ -1316,7 +1096,7 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
     NBDReply local_reply;
     NBDStructuredReplyChunk *chunk;
     Error *local_err = NULL;
-    if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
+    if (!nbd_client_connected(s)) {
         error_setg(&local_err, "Connection closed");
         nbd_iter_channel_error(iter, -EIO, &local_err);
         goto break_loop;
@@ -1341,8 +1121,7 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
     }
 
     /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
-    if (nbd_reply_is_simple(reply) ||
-        qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
+    if (nbd_reply_is_simple(reply) || !nbd_client_connected(s)) {
         goto break_loop;
     }
 
@@ -1780,7 +1559,7 @@ static void nbd_yank(void *opaque)
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
 
     qatomic_store_release(&s->state, NBD_CLIENT_QUIT);
-    qio_channel_shutdown(QIO_CHANNEL(s->sioc), QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
+    qio_channel_shutdown(QIO_CHANNEL(s->ioc), QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
 }
 
 static void nbd_client_close(BlockDriverState *bs)
@@ -1795,111 +1574,6 @@ static void nbd_client_close(BlockDriverState *bs)
     nbd_teardown_connection(bs);
 }
 
-static int nbd_establish_connection(BlockDriverState *bs,
-                                    SocketAddress *saddr,
-                                    Error **errp)
-{
-    ERRP_GUARD();
-    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
-
-    s->sioc = qio_channel_socket_new();
-    qio_channel_set_name(QIO_CHANNEL(s->sioc), "nbd-client");
-
-    qio_channel_socket_connect_sync(s->sioc, saddr, errp);
-    if (*errp) {
-        object_unref(OBJECT(s->sioc));
-        s->sioc = NULL;
-        return -1;
-    }
-
-    yank_register_function(BLOCKDEV_YANK_INSTANCE(bs->node_name), nbd_yank, bs);
-    qio_channel_set_delay(QIO_CHANNEL(s->sioc), false);
-
-    return 0;
-}
-
-/* nbd_client_handshake takes ownership on s->sioc. On failure it's unref'ed. */
-static int nbd_client_handshake(BlockDriverState *bs, Error **errp)
-{
-    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
-    AioContext *aio_context = bdrv_get_aio_context(bs);
-    int ret;
-
-    trace_nbd_client_handshake(s->export);
-    qio_channel_set_blocking(QIO_CHANNEL(s->sioc), false, NULL);
-    qio_channel_attach_aio_context(QIO_CHANNEL(s->sioc), aio_context);
-
-    s->info.request_sizes = true;
-    s->info.structured_reply = true;
-    s->info.base_allocation = true;
-    s->info.x_dirty_bitmap = g_strdup(s->x_dirty_bitmap);
-    s->info.name = g_strdup(s->export ?: "");
-    ret = nbd_receive_negotiate(aio_context, QIO_CHANNEL(s->sioc), s->tlscreds,
-                                s->hostname, &s->ioc, &s->info, errp);
-    g_free(s->info.x_dirty_bitmap);
-    g_free(s->info.name);
-    if (ret < 0) {
-        yank_unregister_function(BLOCKDEV_YANK_INSTANCE(bs->node_name),
-                                 nbd_yank, bs);
-        object_unref(OBJECT(s->sioc));
-        s->sioc = NULL;
-        return ret;
-    }
-    if (s->x_dirty_bitmap) {
-        if (!s->info.base_allocation) {
-            error_setg(errp, "requested x-dirty-bitmap %s not found",
-                       s->x_dirty_bitmap);
-            ret = -EINVAL;
-            goto fail;
-        }
-        if (strcmp(s->x_dirty_bitmap, "qemu:allocation-depth") == 0) {
-            s->alloc_depth = true;
-        }
-    }
-    if (s->info.flags & NBD_FLAG_READ_ONLY) {
-        ret = bdrv_apply_auto_read_only(bs, "NBD export is read-only", errp);
-        if (ret < 0) {
-            goto fail;
-        }
-    }
-    if (s->info.flags & NBD_FLAG_SEND_FUA) {
-        bs->supported_write_flags = BDRV_REQ_FUA;
-        bs->supported_zero_flags |= BDRV_REQ_FUA;
-    }
-    if (s->info.flags & NBD_FLAG_SEND_WRITE_ZEROES) {
-        bs->supported_zero_flags |= BDRV_REQ_MAY_UNMAP;
-        if (s->info.flags & NBD_FLAG_SEND_FAST_ZERO) {
-            bs->supported_zero_flags |= BDRV_REQ_NO_FALLBACK;
-        }
-    }
-
-    if (!s->ioc) {
-        s->ioc = QIO_CHANNEL(s->sioc);
-        object_ref(OBJECT(s->ioc));
-    }
-
-    trace_nbd_client_handshake_success(s->export);
-
-    return 0;
-
- fail:
-    /*
-     * We have connected, but must fail for other reasons.
-     * Send NBD_CMD_DISC as a courtesy to the server.
-     */
-    {
-        NBDRequest request = { .type = NBD_CMD_DISC };
-
-        nbd_send_request(s->ioc ?: QIO_CHANNEL(s->sioc), &request);
-
-        yank_unregister_function(BLOCKDEV_YANK_INSTANCE(bs->node_name),
-                                 nbd_yank, bs);
-        object_unref(OBJECT(s->sioc));
-        s->sioc = NULL;
-
-        return ret;
-    }
-}
 
 /*
  * Parse nbd_open options
@@ -2133,6 +1807,12 @@ static SocketAddress *nbd_config(BDRVNBDState *s, QDict *options,
         goto done;
     }
 
+    if (socket_address_parse_named_fd(saddr, errp) < 0) {
+        qapi_free_SocketAddress(saddr);
+        saddr = NULL;
+        goto done;
+    }
+
 done:
     qobject_unref(addr);
     visit_free(iv);
@@ -2274,9 +1954,6 @@ static int nbd_process_options(BlockDriverState *bs, QDict *options,
     ret = 0;
 
  error:
-    if (ret < 0) {
-        nbd_clear_bdrvstate(s);
-    }
     qemu_opts_del(opts);
     return ret;
 }
@@ -2287,11 +1964,6 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
     int ret;
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
 
-    ret = nbd_process_options(bs, options, errp);
-    if (ret < 0) {
-        return ret;
-    }
-
     s->bs = bs;
     qemu_co_mutex_init(&s->send_mutex);
     qemu_co_queue_init(&s->free_sema);
@@ -2300,31 +1972,29 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
         return -EEXIST;
     }
 
-    /*
-     * establish TCP connection, return error if it fails
-     * TODO: Configurable retry-until-timeout behaviour.
-     */
-    if (nbd_establish_connection(bs, s->saddr, errp) < 0) {
-        yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name));
-        return -ECONNREFUSED;
+    ret = nbd_process_options(bs, options, errp);
+    if (ret < 0) {
+        goto fail;
     }
 
-    ret = nbd_client_handshake(bs, errp);
+    s->conn = nbd_client_connection_new(s->saddr, true, s->export,
+                                        s->x_dirty_bitmap, s->tlscreds);
+
+    /* TODO: Configurable retry-until-timeout behaviour. */
+    ret = nbd_do_establish_connection(bs, errp);
     if (ret < 0) {
-        yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name));
-        nbd_clear_bdrvstate(s);
-        return ret;
+        goto fail;
     }
-    /* successfully connected */
-    s->state = NBD_CLIENT_CONNECTED;
-
-    nbd_init_connect_thread(s);
 
     s->connection_co = qemu_coroutine_create(nbd_connection_entry, s);
     bdrv_inc_in_flight(bs);
     aio_co_schedule(bdrv_get_aio_context(bs), s->connection_co);
 
     return 0;
+
+fail:
+    nbd_clear_bdrvstate(bs);
+    return ret;
 }
 
 static int nbd_co_flush(BlockDriverState *bs)
@@ -2368,11 +2038,8 @@ static void nbd_refresh_limits(BlockDriverState *bs, Error **errp)
 
 static void nbd_close(BlockDriverState *bs)
 {
-    BDRVNBDState *s = bs->opaque;
-
     nbd_client_close(bs);
-    yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name));
-    nbd_clear_bdrvstate(s);
+    nbd_clear_bdrvstate(bs);
 }
 
 /*
diff --git a/include/block/aio.h b/include/block/aio.h
index 5f342267d5..10fcae1515 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -691,10 +691,13 @@ void aio_co_enter(AioContext *ctx, struct Coroutine *co);
  * Return the AioContext whose event loop runs in the current thread.
  *
  * If called from an IOThread this will be the IOThread's AioContext.  If
- * called from another thread it will be the main loop AioContext.
+ * called from the main thread or with the "big QEMU lock" taken it
+ * will be the main loop AioContext.
  */
 AioContext *qemu_get_current_aio_context(void);
 
+void qemu_set_current_aio_context(AioContext *ctx);
+
 /**
  * aio_context_setup:
  * @ctx: the aio context
diff --git a/include/block/nbd.h b/include/block/nbd.h
index 5f34d23bb0..78d101b774 100644
--- a/include/block/nbd.h
+++ b/include/block/nbd.h
@@ -406,4 +406,22 @@ const char *nbd_info_lookup(uint16_t info);
 const char *nbd_cmd_lookup(uint16_t info);
 const char *nbd_err_lookup(int err);
 
+/* nbd/client-connection.c */
+typedef struct NBDClientConnection NBDClientConnection;
+
+void nbd_client_connection_enable_retry(NBDClientConnection *conn);
+
+NBDClientConnection *nbd_client_connection_new(const SocketAddress *saddr,
+                                               bool do_negotiation,
+                                               const char *export_name,
+                                               const char *x_dirty_bitmap,
+                                               QCryptoTLSCreds *tlscreds);
+void nbd_client_connection_release(NBDClientConnection *conn);
+
+QIOChannel *coroutine_fn
+nbd_co_establish_connection(NBDClientConnection *conn, NBDExportInfo *info,
+                            bool blocking, Error **errp);
+
+void coroutine_fn nbd_co_establish_connection_cancel(NBDClientConnection *conn);
+
 #endif
diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
index 292e61aef0..4829ff373d 100644
--- a/include/qemu/coroutine.h
+++ b/include/qemu/coroutine.h
@@ -210,13 +210,15 @@ void coroutine_fn qemu_co_queue_wait_impl(CoQueue *queue, QemuLockable *lock);
 /**
  * Removes the next coroutine from the CoQueue, and wake it up.
  * Returns true if a coroutine was removed, false if the queue is empty.
+ * OK to run from coroutine and non-coroutine context.
  */
-bool coroutine_fn qemu_co_queue_next(CoQueue *queue);
+bool qemu_co_queue_next(CoQueue *queue);
 
 /**
  * Empties the CoQueue; all coroutines are woken up.
+ * OK to run from coroutine and non-coroutine context.
  */
-void coroutine_fn qemu_co_queue_restart_all(CoQueue *queue);
+void qemu_co_queue_restart_all(CoQueue *queue);
 
 /**
  * Removes the next coroutine from the CoQueue, and wake it up.  Unlike
diff --git a/include/qemu/sockets.h b/include/qemu/sockets.h
index 7d1f813576..0c34bf2398 100644
--- a/include/qemu/sockets.h
+++ b/include/qemu/sockets.h
@@ -111,4 +111,15 @@ SocketAddress *socket_remote_address(int fd, Error **errp);
  */
 SocketAddress *socket_address_flatten(SocketAddressLegacy *addr);
 
+/**
+ * socket_address_parse_named_fd:
+ *
+ * Modify @addr, replacing a named fd by its corresponding number.
+ * Needed for callers that plan to pass @addr to a context where the
+ * current monitor is not available.
+ *
+ * Return 0 on success, or a negative value on failure.
+ */
+int socket_address_parse_named_fd(SocketAddress *addr, Error **errp);
+
 #endif /* QEMU_SOCKETS_H */
diff --git a/iothread.c b/iothread.c
index 7f086387be..2c5ccd7367 100644
--- a/iothread.c
+++ b/iothread.c
@@ -39,13 +39,6 @@ DECLARE_CLASS_CHECKERS(IOThreadClass, IOTHREAD,
 #define IOTHREAD_POLL_MAX_NS_DEFAULT 0ULL
 #endif
 
-static __thread IOThread *my_iothread;
-
-AioContext *qemu_get_current_aio_context(void)
-{
-    return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
-}
-
 static void *iothread_run(void *opaque)
 {
     IOThread *iothread = opaque;
@@ -56,7 +49,7 @@ static void *iothread_run(void *opaque)
      * in this new thread uses glib.
      */
     g_main_context_push_thread_default(iothread->worker_context);
-    my_iothread = iothread;
+    qemu_set_current_aio_context(iothread->ctx);
     iothread->thread_id = qemu_get_thread_id();
     qemu_sem_post(&iothread->init_done_sem);
 
diff --git a/nbd/client-connection.c b/nbd/client-connection.c
new file mode 100644
index 0000000000..7123b1e189
--- /dev/null
+++ b/nbd/client-connection.c
@@ -0,0 +1,388 @@
+/*
+ * QEMU Block driver for NBD
+ *
+ * Copyright (c) 2021 Virtuozzo International GmbH.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#include "qemu/osdep.h"
+
+#include "block/nbd.h"
+
+#include "qapi/qapi-visit-sockets.h"
+#include "qapi/clone-visitor.h"
+
+struct NBDClientConnection {
+    /* Initialization constants, never change */
+    SocketAddress *saddr; /* address to connect to */
+    QCryptoTLSCreds *tlscreds;
+    NBDExportInfo initial_info;
+    bool do_negotiation;
+    bool do_retry;
+
+    QemuMutex mutex;
+
+    /*
+     * @sioc and @err represent a connection attempt.  While running
+     * is true, they are only used by the connection thread, and mutex
+     * locking is not needed.  Once the thread finishes,
+     * nbd_co_establish_connection then steals these pointers while
+     * under the mutex.
+     */
+    NBDExportInfo updated_info;
+    QIOChannelSocket *sioc;
+    QIOChannel *ioc;
+    Error *err;
+
+    /* All further fields are accessed only under mutex */
+    bool running; /* thread is running now */
+    bool detached; /* thread is detached and should cleanup the state */
+
+    /*
+     * wait_co: if non-NULL, which coroutine to wake in
+     * nbd_co_establish_connection() after yield()
+     */
+    Coroutine *wait_co;
+};
+
+/*
+ * The function isn't protected by any mutex; only call it when the client
+ * connection attempt has not yet started.
+ */
+void nbd_client_connection_enable_retry(NBDClientConnection *conn)
+{
+    conn->do_retry = true;
+}
+
+NBDClientConnection *nbd_client_connection_new(const SocketAddress *saddr,
+                                               bool do_negotiation,
+                                               const char *export_name,
+                                               const char *x_dirty_bitmap,
+                                               QCryptoTLSCreds *tlscreds)
+{
+    NBDClientConnection *conn = g_new(NBDClientConnection, 1);
+
+    object_ref(OBJECT(tlscreds));
+    *conn = (NBDClientConnection) {
+        .saddr = QAPI_CLONE(SocketAddress, saddr),
+        .tlscreds = tlscreds,
+        .do_negotiation = do_negotiation,
+
+        .initial_info.request_sizes = true,
+        .initial_info.structured_reply = true,
+        .initial_info.base_allocation = true,
+        .initial_info.x_dirty_bitmap = g_strdup(x_dirty_bitmap),
+        .initial_info.name = g_strdup(export_name ?: "")
+    };
+
+    qemu_mutex_init(&conn->mutex);
+
+    return conn;
+}
+
+static void nbd_client_connection_do_free(NBDClientConnection *conn)
+{
+    if (conn->sioc) {
+        qio_channel_close(QIO_CHANNEL(conn->sioc), NULL);
+        object_unref(OBJECT(conn->sioc));
+    }
+    error_free(conn->err);
+    qapi_free_SocketAddress(conn->saddr);
+    object_unref(OBJECT(conn->tlscreds));
+    g_free(conn->initial_info.x_dirty_bitmap);
+    g_free(conn->initial_info.name);
+    g_free(conn);
+}
+
+/*
+ * Connect to @addr and do NBD negotiation if @info is not null. If @tlscreds
+ * are given, @outioc is returned. @outioc is provided only on success.  The
+ * call may be cancelled from another thread by qio_channel_shutdown(sioc).
+ */
+static int nbd_connect(QIOChannelSocket *sioc, SocketAddress *addr,
+                       NBDExportInfo *info, QCryptoTLSCreds *tlscreds,
+                       QIOChannel **outioc, Error **errp)
+{
+    int ret;
+
+    if (outioc) {
+        *outioc = NULL;
+    }
+
+    ret = qio_channel_socket_connect_sync(sioc, addr, errp);
+    if (ret < 0) {
+        return ret;
+    }
+
+    qio_channel_set_delay(QIO_CHANNEL(sioc), false);
+
+    if (!info) {
+        return 0;
+    }
+
+    ret = nbd_receive_negotiate(NULL, QIO_CHANNEL(sioc), tlscreds,
+                                tlscreds ? addr->u.inet.host : NULL,
+                                outioc, info, errp);
+    if (ret < 0) {
+        /*
+         * nbd_receive_negotiate() may set up a TLS ioc and return it even on
+         * the failure path. In this case we should use it instead of the
+         * original channel.
+         */
+        if (outioc && *outioc) {
+            qio_channel_close(QIO_CHANNEL(*outioc), NULL);
+            object_unref(OBJECT(*outioc));
+            *outioc = NULL;
+        } else {
+            qio_channel_close(QIO_CHANNEL(sioc), NULL);
+        }
+
+        return ret;
+    }
+
+    return 0;
+}
+
+static void *connect_thread_func(void *opaque)
+{
+    NBDClientConnection *conn = opaque;
+    int ret;
+    bool do_free;
+    uint64_t timeout = 1;
+    uint64_t max_timeout = 16;
+
+    qemu_mutex_lock(&conn->mutex);
+    while (!conn->detached) {
+        assert(!conn->sioc);
+        conn->sioc = qio_channel_socket_new();
+
+        qemu_mutex_unlock(&conn->mutex);
+
+        error_free(conn->err);
+        conn->err = NULL;
+        conn->updated_info = conn->initial_info;
+
+        ret = nbd_connect(conn->sioc, conn->saddr,
+                          conn->do_negotiation ? &conn->updated_info : NULL,
+                          conn->tlscreds, &conn->ioc, &conn->err);
+
+        /*
+         * conn->updated_info will finally be returned to the user. Clear the
+         * pointers to our internally allocated strings, which are IN parameters
+         * of nbd_receive_negotiate() and therefore nbd_connect(). Caller
+         * shouldn't be interested in these fields.
+         */
+        conn->updated_info.x_dirty_bitmap = NULL;
+        conn->updated_info.name = NULL;
+
+        qemu_mutex_lock(&conn->mutex);
+
+        if (ret < 0) {
+            object_unref(OBJECT(conn->sioc));
+            conn->sioc = NULL;
+            if (conn->do_retry && !conn->detached) {
+                qemu_mutex_unlock(&conn->mutex);
+
+                sleep(timeout);
+                if (timeout < max_timeout) {
+                    timeout *= 2;
+                }
+
+                qemu_mutex_lock(&conn->mutex);
+                continue;
+            }
+        }
+
+        break;
+    }
+
+    /* mutex is locked */
+
+    assert(conn->running);
+    conn->running = false;
+    if (conn->wait_co) {
+        aio_co_wake(conn->wait_co);
+        conn->wait_co = NULL;
+    }
+    do_free = conn->detached;
+
+    qemu_mutex_unlock(&conn->mutex);
+
+    if (do_free) {
+        nbd_client_connection_do_free(conn);
+    }
+
+    return NULL;
+}
+
+void nbd_client_connection_release(NBDClientConnection *conn)
+{
+    bool do_free = false;
+
+    if (!conn) {
+        return;
+    }
+
+    WITH_QEMU_LOCK_GUARD(&conn->mutex) {
+        assert(!conn->detached);
+        if (conn->running) {
+            conn->detached = true;
+        } else {
+            do_free = true;
+        }
+        if (conn->sioc) {
+            qio_channel_shutdown(QIO_CHANNEL(conn->sioc),
+                                 QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
+        }
+    }
+
+    if (do_free) {
+        nbd_client_connection_do_free(conn);
+    }
+}
+
+/*
+ * Get a new connection in context of @conn:
+ *   if the thread is running, wait for completion
+ *   if the thread already succeeded in the background, and user didn't get the
+ *     result, just return it now
+ *   otherwise the thread is not running, so start a thread and wait for
+ *     completion
+ *
+ * If @blocking is false, don't wait for the thread, return immediately.
+ *
+ * If @conn was created with do_negotiation set, NBD negotiation is also done
+ * after a successful connection, and @info must not be NULL.  In this case
+ * @info is used only as an out parameter and is filled in by this function.
+ * "IN" fields of @info, as well as fields related only to
+ * nbd_receive_export_list(), would be zero (see NBDExportInfo in block/nbd.h).
+ */
+QIOChannel *coroutine_fn
+nbd_co_establish_connection(NBDClientConnection *conn, NBDExportInfo *info,
+                            bool blocking, Error **errp)
+{
+    QemuThread thread;
+
+    if (conn->do_negotiation) {
+        assert(info);
+    }
+
+    WITH_QEMU_LOCK_GUARD(&conn->mutex) {
+        /*
+         * Don't call nbd_co_establish_connection() in several coroutines in
+         * parallel. Only one call at once is supported.
+         */
+        assert(!conn->wait_co);
+
+        if (!conn->running) {
+            if (conn->sioc) {
+                /* Previous attempt finally succeeded in background */
+                if (conn->do_negotiation) {
+                    memcpy(info, &conn->updated_info, sizeof(*info));
+                    if (conn->ioc) {
+                        /* TLS channel now has own reference to parent */
+                        object_unref(OBJECT(conn->sioc));
+                        conn->sioc = NULL;
+
+                        return g_steal_pointer(&conn->ioc);
+                    }
+                }
+
+                assert(!conn->ioc);
+
+                return QIO_CHANNEL(g_steal_pointer(&conn->sioc));
+            }
+
+            conn->running = true;
+            error_free(conn->err);
+            conn->err = NULL;
+            qemu_thread_create(&thread, "nbd-connect",
+                               connect_thread_func, conn, QEMU_THREAD_DETACHED);
+        }
+
+        if (!blocking) {
+            return NULL;
+        }
+
+        conn->wait_co = qemu_coroutine_self();
+    }
+
+    /*
+     * We are going to wait for the connect thread to finish, but
+     * nbd_co_establish_connection_cancel() can interrupt the wait.
+     */
+    qemu_coroutine_yield();
+
+    WITH_QEMU_LOCK_GUARD(&conn->mutex) {
+        if (conn->running) {
+            /*
+             * The connection attempt was canceled and the coroutine resumed
+             * before the connection thread finished its job.  Report the
+             * attempt as failed, but leave the connection thread running,
+             * to reuse it for the next connection attempt.
+             */
+            error_setg(errp, "Connection attempt cancelled by other operation");
+            return NULL;
+        } else {
+            error_propagate(errp, conn->err);
+            conn->err = NULL;
+            if (!conn->sioc) {
+                return NULL;
+            }
+            if (conn->do_negotiation) {
+                memcpy(info, &conn->updated_info, sizeof(*info));
+                if (conn->ioc) {
+                    /* TLS channel now has own reference to parent */
+                    object_unref(OBJECT(conn->sioc));
+                    conn->sioc = NULL;
+
+                    return g_steal_pointer(&conn->ioc);
+                }
+            }
+
+            assert(!conn->ioc);
+
+            return QIO_CHANNEL(g_steal_pointer(&conn->sioc));
+        }
+    }
+
+    abort(); /* unreachable */
+}
+
+/*
+ * nbd_co_establish_connection_cancel
+ * Cancel nbd_co_establish_connection() asynchronously.
+ *
+ * Note that this function neither directly stops the thread nor closes the
+ * socket, but rather safely wakes nbd_co_establish_connection(), which is
+ * sleeping in yield().
+ */
+void nbd_co_establish_connection_cancel(NBDClientConnection *conn)
+{
+    Coroutine *wait_co;
+
+    WITH_QEMU_LOCK_GUARD(&conn->mutex) {
+        wait_co = g_steal_pointer(&conn->wait_co);
+    }
+
+    if (wait_co) {
+        aio_co_wake(wait_co);
+    }
+}
diff --git a/nbd/meson.build b/nbd/meson.build
index 2baaa36948..b26d70565e 100644
--- a/nbd/meson.build
+++ b/nbd/meson.build
@@ -1,5 +1,6 @@
 block_ss.add(files(
   'client.c',
+  'client-connection.c',
   'common.c',
 ))
 blockdev_ss.add(files(
diff --git a/scripts/block-coroutine-wrapper.py b/scripts/block-coroutine-wrapper.py
index 0461fd1c45..85dbeb9ecf 100644
--- a/scripts/block-coroutine-wrapper.py
+++ b/scripts/block-coroutine-wrapper.py
@@ -98,12 +98,13 @@ def snake_to_camel(func_name: str) -> str:
 
 
 def gen_wrapper(func: FuncDecl) -> str:
-    assert func.name.startswith('bdrv_')
-    assert not func.name.startswith('bdrv_co_')
+    assert not '_co_' in func.name
     assert func.return_type == 'int'
     assert func.args[0].type in ['BlockDriverState *', 'BdrvChild *']
 
-    name = 'bdrv_co_' + func.name[5:]
+    subsystem, subname = func.name.split('_', 1)
+
+    name = f'{subsystem}_co_{subname}'
     bs = 'bs' if func.args[0].type == 'BlockDriverState *' else 'child->bs'
     struct_name = snake_to_camel(name)
 
diff --git a/stubs/iothread-lock.c b/stubs/iothread-lock.c
index 2a6efad64a..5b45b7fc8b 100644
--- a/stubs/iothread-lock.c
+++ b/stubs/iothread-lock.c
@@ -3,7 +3,7 @@
 
 bool qemu_mutex_iothread_locked(void)
 {
-    return true;
+    return false;
 }
 
 void qemu_mutex_lock_iothread_impl(const char *file, int line)
diff --git a/stubs/iothread.c b/stubs/iothread.c
deleted file mode 100644
index 8cc9e28c55..0000000000
--- a/stubs/iothread.c
+++ /dev/null
@@ -1,8 +0,0 @@
-#include "qemu/osdep.h"
-#include "block/aio.h"
-#include "qemu/main-loop.h"
-
-AioContext *qemu_get_current_aio_context(void)
-{
-    return qemu_get_aio_context();
-}
diff --git a/stubs/meson.build b/stubs/meson.build
index d4e9549dc9..2e79ff9f4d 100644
--- a/stubs/meson.build
+++ b/stubs/meson.build
@@ -16,7 +16,6 @@ stub_ss.add(files('fw_cfg.c'))
 stub_ss.add(files('gdbstub.c'))
 stub_ss.add(files('get-vm-name.c'))
 stub_ss.add(when: 'CONFIG_LINUX_IO_URING', if_true: files('io_uring.c'))
-stub_ss.add(files('iothread.c'))
 stub_ss.add(files('iothread-lock.c'))
 stub_ss.add(files('isa-bus.c'))
 stub_ss.add(files('is-daemonized.c'))
diff --git a/tests/unit/iothread.c b/tests/unit/iothread.c
index afde12b4ef..f9b0791084 100644
--- a/tests/unit/iothread.c
+++ b/tests/unit/iothread.c
@@ -30,13 +30,6 @@ struct IOThread {
     bool stopping;
 };
 
-static __thread IOThread *my_iothread;
-
-AioContext *qemu_get_current_aio_context(void)
-{
-    return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
-}
-
 static void iothread_init_gcontext(IOThread *iothread)
 {
     GSource *source;
@@ -54,9 +47,9 @@ static void *iothread_run(void *opaque)
 
     rcu_register_thread();
 
-    my_iothread = iothread;
     qemu_mutex_lock(&iothread->init_done_lock);
     iothread->ctx = aio_context_new(&error_abort);
+    qemu_set_current_aio_context(iothread->ctx);
 
     /*
      * We must connect the ctx to a GMainContext, because in older versions
diff --git a/tests/unit/test-aio.c b/tests/unit/test-aio.c
index 8a46078463..6feeb9a4a9 100644
--- a/tests/unit/test-aio.c
+++ b/tests/unit/test-aio.c
@@ -877,6 +877,42 @@ static void test_queue_chaining(void)
     g_assert_cmpint(data_b.i, ==, data_b.max);
 }
 
+static void co_check_current_thread(void *opaque)
+{
+    QemuThread *main_thread = opaque;
+    assert(qemu_thread_is_self(main_thread));
+}
+
+static void *test_aio_co_enter(void *co)
+{
+    /*
+     * qemu_get_current_aio_context() should not be the main thread
+     * AioContext, because this is a worker thread that has not taken
+     * the BQL.  So aio_co_enter will schedule the coroutine in the
+     * main thread AioContext.
+     */
+    aio_co_enter(qemu_get_aio_context(), co);
+    return NULL;
+}
+
+static void test_worker_thread_co_enter(void)
+{
+    QemuThread this_thread, worker_thread;
+    Coroutine *co;
+
+    qemu_thread_get_self(&this_thread);
+    co = qemu_coroutine_create(co_check_current_thread, &this_thread);
+
+    qemu_thread_create(&worker_thread, "test_acquire_thread",
+                       test_aio_co_enter,
+                       co, QEMU_THREAD_JOINABLE);
+
+    /* Test aio_co_enter from a worker thread.  */
+    qemu_thread_join(&worker_thread);
+    g_assert(aio_poll(ctx, true));
+    g_assert(!aio_poll(ctx, false));
+}
+
 /* End of tests.  */
 
 int main(int argc, char **argv)
@@ -903,6 +939,7 @@ int main(int argc, char **argv)
     g_test_add_func("/aio/timer/schedule",          test_timer_schedule);
 
     g_test_add_func("/aio/coroutine/queue-chaining", test_queue_chaining);
+    g_test_add_func("/aio/coroutine/worker-thread-co-enter", test_worker_thread_co_enter);
 
     g_test_add_func("/aio-gsource/flush",                   test_source_flush);
     g_test_add_func("/aio-gsource/bh/schedule",             test_source_bh_schedule);
diff --git a/util/async.c b/util/async.c
index 674dbefb7c..5d9b7cc1eb 100644
--- a/util/async.c
+++ b/util/async.c
@@ -649,3 +649,23 @@ void aio_context_release(AioContext *ctx)
 {
     qemu_rec_mutex_unlock(&ctx->lock);
 }
+
+static __thread AioContext *my_aiocontext;
+
+AioContext *qemu_get_current_aio_context(void)
+{
+    if (my_aiocontext) {
+        return my_aiocontext;
+    }
+    if (qemu_mutex_iothread_locked()) {
+        /* Possibly in a vCPU thread.  */
+        return qemu_get_aio_context();
+    }
+    return NULL;
+}
+
+void qemu_set_current_aio_context(AioContext *ctx)
+{
+    assert(!my_aiocontext);
+    my_aiocontext = ctx;
+}
diff --git a/util/main-loop.c b/util/main-loop.c
index d9c55df6f5..4ae5b23e99 100644
--- a/util/main-loop.c
+++ b/util/main-loop.c
@@ -170,6 +170,7 @@ int qemu_init_main_loop(Error **errp)
     if (!qemu_aio_context) {
         return -EMFILE;
     }
+    qemu_set_current_aio_context(qemu_aio_context);
     qemu_notify_bh = qemu_bh_new(notify_event_cb, NULL);
     gpollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
     src = aio_get_g_source(qemu_aio_context);
diff --git a/util/qemu-sockets.c b/util/qemu-sockets.c
index c415c342c1..080a240b74 100644
--- a/util/qemu-sockets.c
+++ b/util/qemu-sockets.c
@@ -1164,6 +1164,25 @@ static int socket_get_fd(const char *fdstr, Error **errp)
     return fd;
 }
 
+int socket_address_parse_named_fd(SocketAddress *addr, Error **errp)
+{
+    int fd;
+
+    if (addr->type != SOCKET_ADDRESS_TYPE_FD) {
+        return 0;
+    }
+
+    fd = socket_get_fd(addr->u.fd.str, errp);
+    if (fd < 0) {
+        return fd;
+    }
+
+    g_free(addr->u.fd.str);
+    addr->u.fd.str = g_strdup_printf("%d", fd);
+
+    return 0;
+}
+
 int socket_connect(SocketAddress *addr, Error **errp)
 {
     int fd;