summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--block/nbd.c375
-rw-r--r--nbd/client.c2
2 files changed, 100 insertions, 277 deletions
diff --git a/block/nbd.c b/block/nbd.c
index 709c2499e3..8ff6daf43d 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -57,7 +57,7 @@
 typedef struct {
     Coroutine *coroutine;
     uint64_t offset;        /* original offset of the request */
-    bool receiving;         /* waiting for connection_co? */
+    bool receiving;         /* sleeping in the yield in nbd_receive_replies */
 } NBDClientRequest;
 
 typedef enum NBDClientState {
@@ -73,14 +73,10 @@ typedef struct BDRVNBDState {
 
     CoMutex send_mutex;
     CoQueue free_sema;
-    Coroutine *connection_co;
-    Coroutine *teardown_co;
-    QemuCoSleep reconnect_sleep;
-    bool drained;
-    bool wait_drained_end;
+
+    CoMutex receive_mutex;
     int in_flight;
     NBDClientState state;
-    bool wait_in_flight;
 
     QEMUTimer *reconnect_delay_timer;
 
@@ -163,6 +159,8 @@ static void nbd_channel_error(BDRVNBDState *s, int ret)
     } else {
         s->state = NBD_CLIENT_QUIT;
     }
+
+    nbd_recv_coroutines_wake(s, true);
 }
 
 static void reconnect_delay_timer_del(BDRVNBDState *s)
@@ -179,6 +177,7 @@ static void reconnect_delay_timer_cb(void *opaque)
 
     if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
         s->state = NBD_CLIENT_CONNECTING_NOWAIT;
+        nbd_co_establish_connection_cancel(s->conn);
         while (qemu_co_enter_next(&s->free_sema, NULL)) {
             /* Resume all queued requests */
         }
@@ -201,113 +200,21 @@ static void reconnect_delay_timer_init(BDRVNBDState *s, uint64_t expire_time_ns)
     timer_mod(s->reconnect_delay_timer, expire_time_ns);
 }
 
-static void nbd_client_detach_aio_context(BlockDriverState *bs)
-{
-    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
-
-    /* Timer is deleted in nbd_client_co_drain_begin() */
-    assert(!s->reconnect_delay_timer);
-    /*
-     * If reconnect is in progress we may have no ->ioc.  It will be
-     * re-instantiated in the proper aio context once the connection is
-     * reestablished.
-     */
-    if (s->ioc) {
-        qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
-    }
-}
-
-static void nbd_client_attach_aio_context_bh(void *opaque)
-{
-    BlockDriverState *bs = opaque;
-    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
-
-    if (s->connection_co) {
-        /*
-         * The node is still drained, so we know the coroutine has yielded in
-         * nbd_read_eof(), the only place where bs->in_flight can reach 0, or
-         * it is entered for the first time. Both places are safe for entering
-         * the coroutine.
-         */
-        qemu_aio_coroutine_enter(bs->aio_context, s->connection_co);
-    }
-    bdrv_dec_in_flight(bs);
-}
-
-static void nbd_client_attach_aio_context(BlockDriverState *bs,
-                                          AioContext *new_context)
-{
-    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
-
-    /*
-     * s->connection_co is either yielded from nbd_receive_reply or from
-     * nbd_co_reconnect_loop()
-     */
-    if (nbd_client_connected(s)) {
-        qio_channel_attach_aio_context(QIO_CHANNEL(s->ioc), new_context);
-    }
-
-    bdrv_inc_in_flight(bs);
-
-    /*
-     * Need to wait here for the BH to run because the BH must run while the
-     * node is still drained.
-     */
-    aio_wait_bh_oneshot(new_context, nbd_client_attach_aio_context_bh, bs);
-}
-
-static void coroutine_fn nbd_client_co_drain_begin(BlockDriverState *bs)
-{
-    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
-
-    s->drained = true;
-    qemu_co_sleep_wake(&s->reconnect_sleep);
-
-    nbd_co_establish_connection_cancel(s->conn);
-
-    reconnect_delay_timer_del(s);
-
-    if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
-        s->state = NBD_CLIENT_CONNECTING_NOWAIT;
-        qemu_co_queue_restart_all(&s->free_sema);
-    }
-}
-
-static void coroutine_fn nbd_client_co_drain_end(BlockDriverState *bs)
-{
-    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
-
-    s->drained = false;
-    if (s->wait_drained_end) {
-        s->wait_drained_end = false;
-        aio_co_wake(s->connection_co);
-    }
-}
-
-
 static void nbd_teardown_connection(BlockDriverState *bs)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
 
+    assert(!s->in_flight);
+
     if (s->ioc) {
-        /* finish any pending coroutines */
         qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
+        yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
+                                 nbd_yank, s->bs);
+        object_unref(OBJECT(s->ioc));
+        s->ioc = NULL;
     }
 
     s->state = NBD_CLIENT_QUIT;
-    if (s->connection_co) {
-        qemu_co_sleep_wake(&s->reconnect_sleep);
-        nbd_co_establish_connection_cancel(s->conn);
-    }
-    if (qemu_in_coroutine()) {
-        s->teardown_co = qemu_coroutine_self();
-        /* connection_co resumes us when it terminates */
-        qemu_coroutine_yield();
-        s->teardown_co = NULL;
-    } else {
-        BDRV_POLL_WHILE(bs, s->connection_co);
-    }
-    assert(!s->connection_co);
 }
 
 static bool nbd_client_connecting(BDRVNBDState *s)
@@ -372,10 +279,11 @@ int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     int ret;
+    bool blocking = nbd_client_connecting_wait(s);
 
     assert(!s->ioc);
 
-    s->ioc = nbd_co_establish_connection(s->conn, &s->info, true, errp);
+    s->ioc = nbd_co_establish_connection(s->conn, &s->info, blocking, errp);
     if (!s->ioc) {
         return -ECONNREFUSED;
     }
@@ -411,29 +319,22 @@ int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
     return 0;
 }
 
+/* called under s->send_mutex */
 static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
 {
-    if (!nbd_client_connecting(s)) {
-        return;
-    }
+    assert(nbd_client_connecting(s));
+    assert(s->in_flight == 0);
 
-    /* Wait for completion of all in-flight requests */
-
-    qemu_co_mutex_lock(&s->send_mutex);
-
-    while (s->in_flight > 0) {
-        qemu_co_mutex_unlock(&s->send_mutex);
-        nbd_recv_coroutines_wake(s, true);
-        s->wait_in_flight = true;
-        qemu_coroutine_yield();
-        s->wait_in_flight = false;
-        qemu_co_mutex_lock(&s->send_mutex);
-    }
-
-    qemu_co_mutex_unlock(&s->send_mutex);
-
-    if (!nbd_client_connecting(s)) {
-        return;
+    if (nbd_client_connecting_wait(s) && s->reconnect_delay &&
+        !s->reconnect_delay_timer)
+    {
+        /*
+         * It's first reconnect attempt after switching to
+         * NBD_CLIENT_CONNECTING_WAIT
+         */
+        reconnect_delay_timer_init(s,
+            qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
+            s->reconnect_delay * NANOSECONDS_PER_SECOND);
     }
 
     /*
@@ -453,135 +354,80 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
     nbd_co_do_establish_connection(s->bs, NULL);
 }
 
-static coroutine_fn void nbd_co_reconnect_loop(BDRVNBDState *s)
+static coroutine_fn int nbd_receive_replies(BDRVNBDState *s, uint64_t handle)
 {
-    uint64_t timeout = 1 * NANOSECONDS_PER_SECOND;
-    uint64_t max_timeout = 16 * NANOSECONDS_PER_SECOND;
+    int ret;
+    uint64_t ind = HANDLE_TO_INDEX(s, handle), ind2;
+    QEMU_LOCK_GUARD(&s->receive_mutex);
 
-    if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
-        reconnect_delay_timer_init(s, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
-                                   s->reconnect_delay * NANOSECONDS_PER_SECOND);
-    }
-
-    nbd_reconnect_attempt(s);
-
-    while (nbd_client_connecting(s)) {
-        if (s->drained) {
-            bdrv_dec_in_flight(s->bs);
-            s->wait_drained_end = true;
-            while (s->drained) {
-                /*
-                 * We may be entered once from nbd_client_attach_aio_context_bh
-                 * and then from nbd_client_co_drain_end. So here is a loop.
-                 */
-                qemu_coroutine_yield();
-            }
-            bdrv_inc_in_flight(s->bs);
-        } else {
-            qemu_co_sleep_ns_wakeable(&s->reconnect_sleep,
-                                      QEMU_CLOCK_REALTIME, timeout);
-            if (s->drained) {
-                continue;
-            }
-            if (timeout < max_timeout) {
-                timeout *= 2;
-            }
+    while (true) {
+        if (s->reply.handle == handle) {
+            /* We are done */
+            return 0;
         }
 
-        nbd_reconnect_attempt(s);
-    }
-
-    reconnect_delay_timer_del(s);
-}
+        if (!nbd_client_connected(s)) {
+            return -EIO;
+        }
 
-static coroutine_fn void nbd_connection_entry(void *opaque)
-{
-    BDRVNBDState *s = opaque;
-    uint64_t i;
-    int ret = 0;
-    Error *local_err = NULL;
+        if (s->reply.handle != 0) {
+            /*
+             * Some other request is being handled now. It should already be
+             * woken by whoever set s->reply.handle (or never wait in this
+             * yield). So, we should not wake it here.
+             */
+            ind2 = HANDLE_TO_INDEX(s, s->reply.handle);
+            assert(!s->requests[ind2].receiving);
 
-    while (qatomic_load_acquire(&s->state) != NBD_CLIENT_QUIT) {
-        /*
-         * The NBD client can only really be considered idle when it has
-         * yielded from qio_channel_readv_all_eof(), waiting for data. This is
-         * the point where the additional scheduled coroutine entry happens
-         * after nbd_client_attach_aio_context().
-         *
-         * Therefore we keep an additional in_flight reference all the time and
-         * only drop it temporarily here.
-         */
+            s->requests[ind].receiving = true;
+            qemu_co_mutex_unlock(&s->receive_mutex);
 
-        if (nbd_client_connecting(s)) {
-            nbd_co_reconnect_loop(s);
-        }
+            qemu_coroutine_yield();
+            /*
+             * We may be woken for 3 reasons:
+             * 1. From this function, executing in parallel coroutine, when our
+             *    handle is received.
+             * 2. From nbd_channel_error(), when connection is lost.
+             * 3. From nbd_co_receive_one_chunk(), when previous request is
+             *    finished and s->reply.handle set to 0.
+             * Anyway, it's OK to lock the mutex and go to the next iteration.
+             */
 
-        if (!nbd_client_connected(s)) {
+            qemu_co_mutex_lock(&s->receive_mutex);
+            assert(!s->requests[ind].receiving);
             continue;
         }
 
+        /* We are under mutex and handle is 0. We have to do the dirty work. */
         assert(s->reply.handle == 0);
-        ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, &local_err);
-
-        if (local_err) {
-            trace_nbd_read_reply_entry_fail(ret, error_get_pretty(local_err));
-            error_free(local_err);
-            local_err = NULL;
-        }
+        ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, NULL);
         if (ret <= 0) {
-            nbd_channel_error(s, ret ? ret : -EIO);
-            continue;
+            ret = ret ? ret : -EIO;
+            nbd_channel_error(s, ret);
+            return ret;
         }
-
-        /*
-         * There's no need for a mutex on the receive side, because the
-         * handler acts as a synchronization point and ensures that only
-         * one coroutine is called until the reply finishes.
-         */
-        i = HANDLE_TO_INDEX(s, s->reply.handle);
-        if (i >= MAX_NBD_REQUESTS ||
-            !s->requests[i].coroutine ||
-            !s->requests[i].receiving ||
-            (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply))
-        {
+        if (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply) {
             nbd_channel_error(s, -EINVAL);
-            continue;
+            return -EINVAL;
         }
-
-        /*
-         * We're woken up again by the request itself.  Note that there
-         * is no race between yielding and reentering connection_co.  This
-         * is because:
-         *
-         * - if the request runs on the same AioContext, it is only
-         *   entered after we yield
-         *
-         * - if the request runs on a different AioContext, reentering
-         *   connection_co happens through a bottom half, which can only
-         *   run after we yield.
-         */
-        s->requests[i].receiving = false;
-        aio_co_wake(s->requests[i].coroutine);
-        qemu_coroutine_yield();
-    }
-
-    qemu_co_queue_restart_all(&s->free_sema);
-    nbd_recv_coroutines_wake(s, true);
-    bdrv_dec_in_flight(s->bs);
-
-    s->connection_co = NULL;
-    if (s->ioc) {
-        qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
-        yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
-                                 nbd_yank, s->bs);
-        object_unref(OBJECT(s->ioc));
-        s->ioc = NULL;
-    }
-
-    if (s->teardown_co) {
-        aio_co_wake(s->teardown_co);
+        if (s->reply.handle == handle) {
+            /* We are done */
+            return 0;
+        }
+        ind2 = HANDLE_TO_INDEX(s, s->reply.handle);
+        if (ind2 >= MAX_NBD_REQUESTS || !s->requests[ind2].coroutine) {
+            /*
+             * We only check that ind2 request exists. But don't check
+             * whether it is now waiting for the reply header or
+             * not. We can't just check s->requests[ind2].receiving:
+             * ind2 request may wait in trying to lock
+             * receive_mutex. So that's a TODO.
+             */
+            nbd_channel_error(s, -EINVAL);
+            return -EINVAL;
+        }
+        nbd_recv_coroutine_wake_one(&s->requests[ind2]);
     }
-    aio_wait_kick();
 }
 
 static int nbd_co_send_request(BlockDriverState *bs,
@@ -592,10 +438,17 @@ static int nbd_co_send_request(BlockDriverState *bs,
     int rc, i = -1;
 
     qemu_co_mutex_lock(&s->send_mutex);
-    while (s->in_flight == MAX_NBD_REQUESTS || nbd_client_connecting_wait(s)) {
+
+    while (s->in_flight == MAX_NBD_REQUESTS ||
+           (!nbd_client_connected(s) && s->in_flight > 0))
+    {
         qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
     }
 
+    if (nbd_client_connecting(s)) {
+        nbd_reconnect_attempt(s);
+    }
+
     if (!nbd_client_connected(s)) {
         rc = -EIO;
         goto err;
@@ -642,10 +495,6 @@ err:
         if (i != -1) {
             s->requests[i].coroutine = NULL;
             s->in_flight--;
-        }
-        if (s->in_flight == 0 && s->wait_in_flight) {
-            aio_co_wake(s->connection_co);
-        } else {
             qemu_co_queue_next(&s->free_sema);
         }
     }
@@ -944,10 +793,7 @@ static coroutine_fn int nbd_co_do_receive_one_chunk(
     }
     *request_ret = 0;
 
-    /* Wait until we're woken up by nbd_connection_entry.  */
-    s->requests[i].receiving = true;
-    qemu_coroutine_yield();
-    assert(!s->requests[i].receiving);
+    nbd_receive_replies(s, handle);
     if (!nbd_client_connected(s)) {
         error_setg(errp, "Connection closed");
         return -EIO;
@@ -1040,14 +886,7 @@ static coroutine_fn int nbd_co_receive_one_chunk(
     }
     s->reply.handle = 0;
 
-    if (s->connection_co && !s->wait_in_flight) {
-        /*
-         * We must check s->wait_in_flight, because we may entered by
-         * nbd_recv_coroutines_wake(), in this case we should not
-         * wake connection_co here, it will woken by last request.
-         */
-        aio_co_wake(s->connection_co);
-    }
+    nbd_recv_coroutines_wake(s, false);
 
     return ret;
 }
@@ -1158,11 +997,7 @@ break_loop:
 
     qemu_co_mutex_lock(&s->send_mutex);
     s->in_flight--;
-    if (s->in_flight == 0 && s->wait_in_flight) {
-        aio_co_wake(s->connection_co);
-    } else {
-        qemu_co_queue_next(&s->free_sema);
-    }
+    qemu_co_queue_next(&s->free_sema);
     qemu_co_mutex_unlock(&s->send_mutex);
 
     return false;
@@ -1984,6 +1819,7 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
     s->bs = bs;
     qemu_co_mutex_init(&s->send_mutex);
     qemu_co_queue_init(&s->free_sema);
+    qemu_co_mutex_init(&s->receive_mutex);
 
     if (!yank_register_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name), errp)) {
         return -EEXIST;
@@ -1998,14 +1834,13 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
                                         s->x_dirty_bitmap, s->tlscreds);
 
     /* TODO: Configurable retry-until-timeout behaviour. */
+    s->state = NBD_CLIENT_CONNECTING_WAIT;
     ret = nbd_do_establish_connection(bs, errp);
     if (ret < 0) {
         goto fail;
     }
 
-    s->connection_co = qemu_coroutine_create(nbd_connection_entry, s);
-    bdrv_inc_in_flight(bs);
-    aio_co_schedule(bdrv_get_aio_context(bs), s->connection_co);
+    nbd_client_connection_enable_retry(s->conn);
 
     return 0;
 
@@ -2159,6 +1994,8 @@ static void nbd_cancel_in_flight(BlockDriverState *bs)
         s->state = NBD_CLIENT_CONNECTING_NOWAIT;
         qemu_co_queue_restart_all(&s->free_sema);
     }
+
+    nbd_co_establish_connection_cancel(s->conn);
 }
 
 static BlockDriver bdrv_nbd = {
@@ -2179,10 +2016,6 @@ static BlockDriver bdrv_nbd = {
     .bdrv_refresh_limits        = nbd_refresh_limits,
     .bdrv_co_truncate           = nbd_co_truncate,
     .bdrv_getlength             = nbd_getlength,
-    .bdrv_detach_aio_context    = nbd_client_detach_aio_context,
-    .bdrv_attach_aio_context    = nbd_client_attach_aio_context,
-    .bdrv_co_drain_begin        = nbd_client_co_drain_begin,
-    .bdrv_co_drain_end          = nbd_client_co_drain_end,
     .bdrv_refresh_filename      = nbd_refresh_filename,
     .bdrv_co_block_status       = nbd_client_co_block_status,
     .bdrv_dirname               = nbd_dirname,
@@ -2208,10 +2041,6 @@ static BlockDriver bdrv_nbd_tcp = {
     .bdrv_refresh_limits        = nbd_refresh_limits,
     .bdrv_co_truncate           = nbd_co_truncate,
     .bdrv_getlength             = nbd_getlength,
-    .bdrv_detach_aio_context    = nbd_client_detach_aio_context,
-    .bdrv_attach_aio_context    = nbd_client_attach_aio_context,
-    .bdrv_co_drain_begin        = nbd_client_co_drain_begin,
-    .bdrv_co_drain_end          = nbd_client_co_drain_end,
     .bdrv_refresh_filename      = nbd_refresh_filename,
     .bdrv_co_block_status       = nbd_client_co_block_status,
     .bdrv_dirname               = nbd_dirname,
@@ -2237,10 +2066,6 @@ static BlockDriver bdrv_nbd_unix = {
     .bdrv_refresh_limits        = nbd_refresh_limits,
     .bdrv_co_truncate           = nbd_co_truncate,
     .bdrv_getlength             = nbd_getlength,
-    .bdrv_detach_aio_context    = nbd_client_detach_aio_context,
-    .bdrv_attach_aio_context    = nbd_client_attach_aio_context,
-    .bdrv_co_drain_begin        = nbd_client_co_drain_begin,
-    .bdrv_co_drain_end          = nbd_client_co_drain_end,
     .bdrv_refresh_filename      = nbd_refresh_filename,
     .bdrv_co_block_status       = nbd_client_co_block_status,
     .bdrv_dirname               = nbd_dirname,
diff --git a/nbd/client.c b/nbd/client.c
index 0c2db4bcba..30d5383cb1 100644
--- a/nbd/client.c
+++ b/nbd/client.c
@@ -1434,9 +1434,7 @@ nbd_read_eof(BlockDriverState *bs, QIOChannel *ioc, void *buffer, size_t size,
 
         len = qio_channel_readv(ioc, &iov, 1, errp);
         if (len == QIO_CHANNEL_ERR_BLOCK) {
-            bdrv_dec_in_flight(bs);
             qio_channel_yield(ioc, G_IO_IN);
-            bdrv_inc_in_flight(bs);
             continue;
         } else if (len < 0) {
             return -EIO;