-rw-r--r--  block/nbd.c  266
1 file changed, 265 insertions(+), 1 deletion(-)
diff --git a/block/nbd.c b/block/nbd.c
index 7bb881fef4..9daf003bea 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -38,6 +38,7 @@
 
 #include "qapi/qapi-visit-sockets.h"
 #include "qapi/qmp/qstring.h"
+#include "qapi/clone-visitor.h"
 
 #include "block/qdict.h"
 #include "block/nbd.h"
@@ -62,6 +63,47 @@ typedef enum NBDClientState {
     NBD_CLIENT_QUIT
 } NBDClientState;
 
+typedef enum NBDConnectThreadState {
+    /* No thread, no pending results */
+    CONNECT_THREAD_NONE,
+
+    /* Thread is running, no results for now */
+    CONNECT_THREAD_RUNNING,
+
+    /*
+     * Thread is running, but the requester has gone away. On exit the
+     * thread should close the new socket and free the connect state itself.
+     */
+    CONNECT_THREAD_RUNNING_DETACHED,
+
+    /* Thread has finished; its results are stored in the fields below */
+    CONNECT_THREAD_FAIL,
+    CONNECT_THREAD_SUCCESS
+} NBDConnectThreadState;
+
+typedef struct NBDConnectThread {
+    /* Initialization constants */
+    SocketAddress *saddr; /* address to connect to */
+    /*
+     * Bottom half to schedule on completion. Scheduled only if bh_ctx is not
+     * NULL
+     */
+    QEMUBHFunc *bh_func;
+    void *bh_opaque;
+
+    /*
+     * Result of the last attempt. Valid in the FAIL and SUCCESS states.
+     * Whoever steals err must reset the pointer to NULL afterwards.
+     */
+    QIOChannelSocket *sioc;
+    Error *err;
+
+    /* state and bh_ctx are protected by mutex */
+    QemuMutex mutex;
+    NBDConnectThreadState state; /* current state of the thread */
+    AioContext *bh_ctx; /* where to schedule bh (NULL means don't schedule) */
+} NBDConnectThread;
+
 typedef struct BDRVNBDState {
     QIOChannelSocket *sioc; /* The master data channel */
     QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
@@ -91,10 +133,17 @@ typedef struct BDRVNBDState {
     QCryptoTLSCreds *tlscreds;
     const char *hostname;
     char *x_dirty_bitmap;
+
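+    /*
+     * Set while the connection coroutine is parked in
+     * nbd_co_establish_connection() waiting for the connect thread; used
+     * only from the AioContext of bs, so it needs no locking.
+     */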
+    bool wait_connect;
+    NBDConnectThread *connect_thread;
 } BDRVNBDState;
 
 static QIOChannelSocket *nbd_establish_connection(SocketAddress *saddr,
                                                   Error **errp);
+static QIOChannelSocket *nbd_co_establish_connection(BlockDriverState *bs,
+                                                     Error **errp);
+static void nbd_co_establish_connection_cancel(BlockDriverState *bs,
+                                               bool detach);
 static int nbd_client_handshake(BlockDriverState *bs, QIOChannelSocket *sioc,
                                 Error **errp);
 
@@ -191,6 +240,8 @@ static void coroutine_fn nbd_client_co_drain_begin(BlockDriverState *bs)
     if (s->connection_co_sleep_ns_state) {
         qemu_co_sleep_wake(s->connection_co_sleep_ns_state);
     }
+
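+    /*
+     * Wake a connection coroutine that is waiting for the connect thread so
+     * the drained section can begin; the thread itself keeps running.
+     */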
+    nbd_co_establish_connection_cancel(bs, false);
 }
 
 static void coroutine_fn nbd_client_co_drain_end(BlockDriverState *bs)
@@ -223,6 +274,7 @@ static void nbd_teardown_connection(BlockDriverState *bs)
         if (s->connection_co_sleep_ns_state) {
             qemu_co_sleep_wake(s->connection_co_sleep_ns_state);
         }
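+        /*
+         * Detach the connect thread: if it is still running it will free
+         * the connect state itself once the connection attempt completes.
+         */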
+        nbd_co_establish_connection_cancel(bs, true);
     }
     if (qemu_in_coroutine()) {
         s->teardown_co = qemu_coroutine_self();
@@ -246,6 +298,216 @@ static bool nbd_client_connecting_wait(BDRVNBDState *s)
     return s->state == NBD_CLIENT_CONNECTING_WAIT;
 }
 
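+/*
+ * Bottom half scheduled by the connect thread in the AioContext of bs: clear
+ * wait_connect and wake the connection coroutine that yielded in
+ * nbd_co_establish_connection().
+ */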
+static void connect_bh(void *opaque)
+{
+    BDRVNBDState *state = opaque;
+
+    assert(state->wait_connect);
+    state->wait_connect = false;
+    aio_co_wake(state->connection_co);
+}
+
+static void nbd_init_connect_thread(BDRVNBDState *s)
+{
+    s->connect_thread = g_new(NBDConnectThread, 1);
+
+    *s->connect_thread = (NBDConnectThread) {
+        .saddr = QAPI_CLONE(SocketAddress, s->saddr),
+        .state = CONNECT_THREAD_NONE,
+        .bh_func = connect_bh,
+        .bh_opaque = s,
+    };
+
+    qemu_mutex_init(&s->connect_thread->mutex);
+}
+
+static void nbd_free_connect_thread(NBDConnectThread *thr)
+{
+    if (thr->sioc) {
+        qio_channel_close(QIO_CHANNEL(thr->sioc), NULL);
+    }
+    error_free(thr->err);
+    qapi_free_SocketAddress(thr->saddr);
+    g_free(thr);
+}
+
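+/*
+ * Runs in a separate thread so that the blocking
+ * qio_channel_socket_connect_sync() call does not stall the AioContext. On
+ * completion it either publishes the result (FAIL/SUCCESS) and schedules the
+ * bottom half, or, if the requester has detached, frees the state itself.
+ */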
+static void *connect_thread_func(void *opaque)
+{
+    NBDConnectThread *thr = opaque;
+    int ret;
+    bool do_free = false;
+
+    thr->sioc = qio_channel_socket_new();
+
+    error_free(thr->err);
+    thr->err = NULL;
+    ret = qio_channel_socket_connect_sync(thr->sioc, thr->saddr, &thr->err);
+    if (ret < 0) {
+        object_unref(OBJECT(thr->sioc));
+        thr->sioc = NULL;
+    }
+
+    qemu_mutex_lock(&thr->mutex);
+
+    switch (thr->state) {
+    case CONNECT_THREAD_RUNNING:
+        thr->state = ret < 0 ? CONNECT_THREAD_FAIL : CONNECT_THREAD_SUCCESS;
+        if (thr->bh_ctx) {
+            aio_bh_schedule_oneshot(thr->bh_ctx, thr->bh_func, thr->bh_opaque);
+
+            /* Play it safe: don't reuse bh_ctx for further connection attempts */
+            thr->bh_ctx = NULL;
+        }
+        break;
+    case CONNECT_THREAD_RUNNING_DETACHED:
+        do_free = true;
+        break;
+    default:
+        abort();
+    }
+
+    qemu_mutex_unlock(&thr->mutex);
+
+    if (do_free) {
+        nbd_free_connect_thread(thr);
+    }
+
+    return NULL;
+}
+
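+/*
+ * Establish a connection from the connection coroutine: start (or reuse) the
+ * connect thread and yield until it finishes or the attempt is cancelled by
+ * nbd_co_establish_connection_cancel(). Returns the connected socket channel,
+ * or NULL with errp set.
+ */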
+static QIOChannelSocket *coroutine_fn
+nbd_co_establish_connection(BlockDriverState *bs, Error **errp)
+{
+    QemuThread thread;
+    BDRVNBDState *s = bs->opaque;
+    QIOChannelSocket *res;
+    NBDConnectThread *thr = s->connect_thread;
+
+    qemu_mutex_lock(&thr->mutex);
+
+    switch (thr->state) {
+    case CONNECT_THREAD_FAIL:
+    case CONNECT_THREAD_NONE:
+        error_free(thr->err);
+        thr->err = NULL;
+        thr->state = CONNECT_THREAD_RUNNING;
+        qemu_thread_create(&thread, "nbd-connect",
+                           connect_thread_func, thr, QEMU_THREAD_DETACHED);
+        break;
+    case CONNECT_THREAD_SUCCESS:
+        /* Previous attempt finally succeeded in background */
+        thr->state = CONNECT_THREAD_NONE;
+        res = thr->sioc;
+        thr->sioc = NULL;
+        qemu_mutex_unlock(&thr->mutex);
+        return res;
+    case CONNECT_THREAD_RUNNING:
+        /* Already running, will wait */
+        break;
+    default:
+        abort();
+    }
+
+    thr->bh_ctx = qemu_get_current_aio_context();
+
+    qemu_mutex_unlock(&thr->mutex);
+
+    /*
+     * We are going to wait for the connect thread to finish, but
+     * nbd_client_co_drain_begin() may interrupt the wait.
+     *
+     * Note that the wait_connect variable is not visible to the connect
+     * thread. It needs no mutex protection: it is used only from the home
+     * AioContext of bs.
+     */
+    s->wait_connect = true;
+    qemu_coroutine_yield();
+
+    qemu_mutex_lock(&thr->mutex);
+
+    switch (thr->state) {
+    case CONNECT_THREAD_SUCCESS:
+    case CONNECT_THREAD_FAIL:
+        thr->state = CONNECT_THREAD_NONE;
+        error_propagate(errp, thr->err);
+        thr->err = NULL;
+        res = thr->sioc;
+        thr->sioc = NULL;
+        break;
+    case CONNECT_THREAD_RUNNING:
+    case CONNECT_THREAD_RUNNING_DETACHED:
+        /*
+         * A drained section is about to begin, so report this attempt as
+         * failed. The connect thread keeps running in the background, and
+         * its result may be reused for the next connection attempt.
+         */
+        res = NULL;
+        error_setg(errp, "Connection attempt cancelled by other operation");
+        break;
+
+    case CONNECT_THREAD_NONE:
+        /*
+         * Impossible: we have seen the thread in the RUNNING state, so it
+         * must either still be running or have reported a result by now.
+         */
+        abort();
+
+    default:
+        abort();
+    }
+
+    qemu_mutex_unlock(&thr->mutex);
+
+    return res;
+}
+
+/*
+ * nbd_co_establish_connection_cancel
+ * Cancel nbd_co_establish_connection() asynchronously: it will finish soon,
+ * allowing the drained section to begin.
+ *
+ * If detach is true, also clean up the connect state (or, if the thread is
+ * still running, move it to the CONNECT_THREAD_RUNNING_DETACHED state);
+ * s->connect_thread becomes NULL when detach is true.
+ */
+static void nbd_co_establish_connection_cancel(BlockDriverState *bs,
+                                               bool detach)
+{
+    BDRVNBDState *s = bs->opaque;
+    NBDConnectThread *thr = s->connect_thread;
+    bool wake = false;
+    bool do_free = false;
+
+    qemu_mutex_lock(&thr->mutex);
+
+    if (thr->state == CONNECT_THREAD_RUNNING) {
+        /* We can only cancel in the RUNNING state, when the bh is not yet scheduled */
+        thr->bh_ctx = NULL;
+        if (s->wait_connect) {
+            s->wait_connect = false;
+            wake = true;
+        }
+        if (detach) {
+            thr->state = CONNECT_THREAD_RUNNING_DETACHED;
+            s->connect_thread = NULL;
+        }
+    } else if (detach) {
+        do_free = true;
+    }
+
+    qemu_mutex_unlock(&thr->mutex);
+
+    if (do_free) {
+        nbd_free_connect_thread(thr);
+        s->connect_thread = NULL;
+    }
+
+    if (wake) {
+        aio_co_wake(s->connection_co);
+    }
+}
+
 static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
 {
     int ret;
@@ -289,7 +551,7 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
         s->ioc = NULL;
     }
 
-    sioc = nbd_establish_connection(s->saddr, &local_err);
+    sioc = nbd_co_establish_connection(s->bs, &local_err);
     if (!sioc) {
         ret = -ECONNREFUSED;
         goto out;
@@ -1946,6 +2208,8 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
     /* successfully connected */
     s->state = NBD_CLIENT_CONNECTED;
 
+    nbd_init_connect_thread(s);
+
     s->connection_co = qemu_coroutine_create(nbd_connection_entry, s);
     bdrv_inc_in_flight(bs);
     aio_co_schedule(bdrv_get_aio_context(bs), s->connection_co);