 block/io.c            |  18
 block/nbd.c           | 195
 block/stream.c        |  24
 include/block/block.h |   8
 include/block/nbd.h   |   3
 nbd/client.c          |  16
 nbd/server.c          |  43
 qapi/block-core.json  |  11
 qapi/sockets.json     |   6
 qemu-nbd.c            |   2
 util/qemu-sockets.c   |  28
 11 files changed, 233 insertions(+), 121 deletions(-)
diff --git a/block/io.c b/block/io.c
index 17a243cde9..56bbf195bb 100644
--- a/block/io.c
+++ b/block/io.c
@@ -1168,7 +1168,8 @@ bdrv_driver_pwritev_compressed(BlockDriverState *bs, uint64_t offset,
 }
 
 static int coroutine_fn bdrv_co_do_copy_on_readv(BdrvChild *child,
-        int64_t offset, unsigned int bytes, QEMUIOVector *qiov)
+        int64_t offset, unsigned int bytes, QEMUIOVector *qiov,
+        int flags)
 {
     BlockDriverState *bs = child->bs;
 
@@ -1279,9 +1280,11 @@ static int coroutine_fn bdrv_co_do_copy_on_readv(BdrvChild *child,
                 goto err;
             }
 
-            qemu_iovec_from_buf(qiov, progress, bounce_buffer + skip_bytes,
-                                pnum - skip_bytes);
-        } else {
+            if (!(flags & BDRV_REQ_PREFETCH)) {
+                qemu_iovec_from_buf(qiov, progress, bounce_buffer + skip_bytes,
+                                    pnum - skip_bytes);
+            }
+        } else if (!(flags & BDRV_REQ_PREFETCH)) {
             /* Read directly into the destination */
             qemu_iovec_init(&local_qiov, qiov->niov);
             qemu_iovec_concat(&local_qiov, qiov, progress, pnum - skip_bytes);
@@ -1332,7 +1335,8 @@ static int coroutine_fn bdrv_aligned_preadv(BdrvChild *child,
      * potential fallback support, if we ever implement any read flags
      * to pass through to drivers.  For now, there aren't any
      * passthrough flags.  */
-    assert(!(flags & ~(BDRV_REQ_NO_SERIALISING | BDRV_REQ_COPY_ON_READ)));
+    assert(!(flags & ~(BDRV_REQ_NO_SERIALISING | BDRV_REQ_COPY_ON_READ |
+                       BDRV_REQ_PREFETCH)));
 
     /* Handle Copy on Read and associated serialisation */
     if (flags & BDRV_REQ_COPY_ON_READ) {
@@ -1360,7 +1364,9 @@ static int coroutine_fn bdrv_aligned_preadv(BdrvChild *child,
         }
 
         if (!ret || pnum != bytes) {
-            ret = bdrv_co_do_copy_on_readv(child, offset, bytes, qiov);
+            ret = bdrv_co_do_copy_on_readv(child, offset, bytes, qiov, flags);
+            goto out;
+        } else if (flags & BDRV_REQ_PREFETCH) {
             goto out;
         }
     }
diff --git a/block/nbd.c b/block/nbd.c
index 56b1c6ec74..beed46fb34 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -54,6 +54,11 @@ typedef struct {
     bool receiving;         /* waiting for connection_co? */
 } NBDClientRequest;
 
+typedef enum NBDClientState {
+    NBD_CLIENT_CONNECTED,
+    NBD_CLIENT_QUIT
+} NBDClientState;
+
 typedef struct BDRVNBDState {
     QIOChannelSocket *sioc; /* The master data channel */
     QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
@@ -63,17 +68,27 @@ typedef struct BDRVNBDState {
     CoQueue free_sema;
     Coroutine *connection_co;
     int in_flight;
+    NBDClientState state;
 
     NBDClientRequest requests[MAX_NBD_REQUESTS];
     NBDReply reply;
     BlockDriverState *bs;
-    bool quit;
 
-    /* For nbd_refresh_filename() */
+    /* Connection parameters */
+    uint32_t reconnect_delay;
     SocketAddress *saddr;
     char *export, *tlscredsid;
+    QCryptoTLSCreds *tlscreds;
+    const char *hostname;
+    char *x_dirty_bitmap;
 } BDRVNBDState;
 
+/* @ret will be used for reconnect in future */
+static void nbd_channel_error(BDRVNBDState *s, int ret)
+{
+    s->state = NBD_CLIENT_QUIT;
+}
+
 static void nbd_recv_coroutines_wake_all(BDRVNBDState *s)
 {
     int i;
@@ -152,7 +167,7 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
     int ret = 0;
     Error *local_err = NULL;
 
-    while (!s->quit) {
+    while (s->state != NBD_CLIENT_QUIT) {
         /*
          * The NBD client can only really be considered idle when it has
          * yielded from qio_channel_readv_all_eof(), waiting for data. This is
@@ -170,6 +185,7 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
             error_free(local_err);
         }
         if (ret <= 0) {
+            nbd_channel_error(s, ret ? ret : -EIO);
             break;
         }
 
@@ -184,6 +200,7 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
             !s->requests[i].receiving ||
             (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply))
         {
+            nbd_channel_error(s, -EINVAL);
             break;
         }
 
@@ -203,7 +220,6 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
         qemu_coroutine_yield();
     }
 
-    s->quit = true;
     nbd_recv_coroutines_wake_all(s);
     bdrv_dec_in_flight(s->bs);
 
@@ -216,12 +232,18 @@ static int nbd_co_send_request(BlockDriverState *bs,
                                QEMUIOVector *qiov)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
-    int rc, i;
+    int rc, i = -1;
 
     qemu_co_mutex_lock(&s->send_mutex);
     while (s->in_flight == MAX_NBD_REQUESTS) {
         qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
     }
+
+    if (s->state != NBD_CLIENT_CONNECTED) {
+        rc = -EIO;
+        goto err;
+    }
+
     s->in_flight++;
 
     for (i = 0; i < MAX_NBD_REQUESTS; i++) {
@@ -239,16 +261,12 @@ static int nbd_co_send_request(BlockDriverState *bs,
 
     request->handle = INDEX_TO_HANDLE(s, i);
 
-    if (s->quit) {
-        rc = -EIO;
-        goto err;
-    }
     assert(s->ioc);
 
     if (qiov) {
         qio_channel_set_cork(s->ioc, true);
         rc = nbd_send_request(s->ioc, request);
-        if (rc >= 0 && !s->quit) {
+        if (rc >= 0 && s->state == NBD_CLIENT_CONNECTED) {
             if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
                                        NULL) < 0) {
                 rc = -EIO;
@@ -263,9 +281,11 @@ static int nbd_co_send_request(BlockDriverState *bs,
 
 err:
     if (rc < 0) {
-        s->quit = true;
-        s->requests[i].coroutine = NULL;
-        s->in_flight--;
+        nbd_channel_error(s, rc);
+        if (i != -1) {
+            s->requests[i].coroutine = NULL;
+            s->in_flight--;
+        }
         qemu_co_queue_next(&s->free_sema);
     }
     qemu_co_mutex_unlock(&s->send_mutex);
@@ -557,7 +577,7 @@ static coroutine_fn int nbd_co_do_receive_one_chunk(
     s->requests[i].receiving = true;
     qemu_coroutine_yield();
     s->requests[i].receiving = false;
-    if (s->quit) {
+    if (s->state != NBD_CLIENT_CONNECTED) {
         error_setg(errp, "Connection closed");
         return -EIO;
     }
@@ -642,7 +662,7 @@ static coroutine_fn int nbd_co_receive_one_chunk(
 
     if (ret < 0) {
         memset(reply, 0, sizeof(*reply));
-        s->quit = true;
+        nbd_channel_error(s, ret);
     } else {
         /* For assert at loop start in nbd_connection_entry */
         *reply = s->reply;
@@ -710,7 +730,7 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
     NBDReply local_reply;
     NBDStructuredReplyChunk *chunk;
     Error *local_err = NULL;
-    if (s->quit) {
+    if (s->state != NBD_CLIENT_CONNECTED) {
         error_setg(&local_err, "Connection closed");
         nbd_iter_channel_error(iter, -EIO, &local_err);
         goto break_loop;
@@ -735,7 +755,7 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
     }
 
     /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
-    if (nbd_reply_is_simple(reply) || s->quit) {
+    if (nbd_reply_is_simple(reply) || s->state != NBD_CLIENT_CONNECTED) {
         goto break_loop;
     }
 
@@ -809,14 +829,14 @@ static int nbd_co_receive_cmdread_reply(BDRVNBDState *s, uint64_t handle,
             ret = nbd_parse_offset_hole_payload(s, &reply.structured, payload,
                                                 offset, qiov, &local_err);
             if (ret < 0) {
-                s->quit = true;
+                nbd_channel_error(s, ret);
                 nbd_iter_channel_error(&iter, ret, &local_err);
             }
             break;
         default:
             if (!nbd_reply_type_is_error(chunk->type)) {
                 /* not allowed reply type */
-                s->quit = true;
+                nbd_channel_error(s, -EINVAL);
                 error_setg(&local_err,
                            "Unexpected reply type: %d (%s) for CMD_READ",
                            chunk->type, nbd_reply_type_lookup(chunk->type));
@@ -854,7 +874,7 @@ static int nbd_co_receive_blockstatus_reply(BDRVNBDState *s,
         switch (chunk->type) {
         case NBD_REPLY_TYPE_BLOCK_STATUS:
             if (received) {
-                s->quit = true;
+                nbd_channel_error(s, -EINVAL);
                 error_setg(&local_err, "Several BLOCK_STATUS chunks in reply");
                 nbd_iter_channel_error(&iter, -EINVAL, &local_err);
             }
@@ -864,13 +884,13 @@ static int nbd_co_receive_blockstatus_reply(BDRVNBDState *s,
                                                 payload, length, extent,
                                                 &local_err);
             if (ret < 0) {
-                s->quit = true;
+                nbd_channel_error(s, ret);
                 nbd_iter_channel_error(&iter, ret, &local_err);
             }
             break;
         default:
             if (!nbd_reply_type_is_error(chunk->type)) {
-                s->quit = true;
+                nbd_channel_error(s, -EINVAL);
                 error_setg(&local_err,
                            "Unexpected reply type: %d (%s) "
                            "for CMD_BLOCK_STATUS",
@@ -1167,47 +1187,43 @@ static QIOChannelSocket *nbd_establish_connection(SocketAddress *saddr,
     return sioc;
 }
 
-static int nbd_client_connect(BlockDriverState *bs,
-                              SocketAddress *saddr,
-                              const char *export,
-                              QCryptoTLSCreds *tlscreds,
-                              const char *hostname,
-                              const char *x_dirty_bitmap,
-                              Error **errp)
+static int nbd_client_connect(BlockDriverState *bs, Error **errp)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
+    AioContext *aio_context = bdrv_get_aio_context(bs);
     int ret;
 
     /*
      * establish TCP connection, return error if it fails
      * TODO: Configurable retry-until-timeout behaviour.
      */
-    QIOChannelSocket *sioc = nbd_establish_connection(saddr, errp);
+    QIOChannelSocket *sioc = nbd_establish_connection(s->saddr, errp);
 
     if (!sioc) {
         return -ECONNREFUSED;
     }
 
     /* NBD handshake */
-    trace_nbd_client_connect(export);
-    qio_channel_set_blocking(QIO_CHANNEL(sioc), true, NULL);
+    trace_nbd_client_connect(s->export);
+    qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
+    qio_channel_attach_aio_context(QIO_CHANNEL(sioc), aio_context);
 
     s->info.request_sizes = true;
     s->info.structured_reply = true;
     s->info.base_allocation = true;
-    s->info.x_dirty_bitmap = g_strdup(x_dirty_bitmap);
-    s->info.name = g_strdup(export ?: "");
-    ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), tlscreds, hostname,
-                                &s->ioc, &s->info, errp);
+    s->info.x_dirty_bitmap = g_strdup(s->x_dirty_bitmap);
+    s->info.name = g_strdup(s->export ?: "");
+    ret = nbd_receive_negotiate(aio_context, QIO_CHANNEL(sioc), s->tlscreds,
+                                s->hostname, &s->ioc, &s->info, errp);
     g_free(s->info.x_dirty_bitmap);
     g_free(s->info.name);
     if (ret < 0) {
         object_unref(OBJECT(sioc));
         return ret;
     }
-    if (x_dirty_bitmap && !s->info.base_allocation) {
+    if (s->x_dirty_bitmap && !s->info.base_allocation) {
         error_setg(errp, "requested x-dirty-bitmap %s not found",
-                   x_dirty_bitmap);
+                   s->x_dirty_bitmap);
         ret = -EINVAL;
         goto fail;
     }
@@ -1232,24 +1248,14 @@ static int nbd_client_connect(BlockDriverState *bs,
         object_ref(OBJECT(s->ioc));
     }
 
-    /*
-     * Now that we're connected, set the socket to be non-blocking and
-     * kick the reply mechanism.
-     */
-    qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
-    s->connection_co = qemu_coroutine_create(nbd_connection_entry, s);
-    bdrv_inc_in_flight(bs);
-    nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs));
-
-    trace_nbd_client_connect_success(export);
+    trace_nbd_client_connect_success(s->export);
 
     return 0;
 
  fail:
     /*
-     * We have connected, but must fail for other reasons. The
-     * connection is still blocking; send NBD_CMD_DISC as a courtesy
-     * to the server.
+     * We have connected, but must fail for other reasons.
+     * Send NBD_CMD_DISC as a courtesy to the server.
      */
     {
         NBDRequest request = { .type = NBD_CMD_DISC };
@@ -1262,23 +1268,9 @@ static int nbd_client_connect(BlockDriverState *bs,
     }
 }
 
-static int nbd_client_init(BlockDriverState *bs,
-                           SocketAddress *saddr,
-                           const char *export,
-                           QCryptoTLSCreds *tlscreds,
-                           const char *hostname,
-                           const char *x_dirty_bitmap,
-                           Error **errp)
-{
-    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
-
-    s->bs = bs;
-    qemu_co_mutex_init(&s->send_mutex);
-    qemu_co_queue_init(&s->free_sema);
-
-    return nbd_client_connect(bs, saddr, export, tlscreds, hostname,
-                              x_dirty_bitmap, errp);
-}
+/*
+ * Parse nbd_open options
+ */
 
 static int nbd_parse_uri(const char *filename, QDict *options)
 {
@@ -1583,18 +1575,27 @@ static QemuOptsList nbd_runtime_opts = {
             .help = "experimental: expose named dirty bitmap in place of "
                     "block status",
         },
+        {
+            .name = "reconnect-delay",
+            .type = QEMU_OPT_NUMBER,
+            .help = "On an unexpected disconnect, the nbd client tries to "
+                    "connect again until succeeding or encountering a serious "
+                    "error.  During the first @reconnect-delay seconds, all "
+                    "requests are paused and will be rerun on a successful "
+                    "reconnect. After that time, any delayed requests and all "
+                    "future requests before a successful reconnect will "
+                    "immediately fail. Default 0",
+        },
         { /* end of list */ }
     },
 };
 
-static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
-                    Error **errp)
+static int nbd_process_options(BlockDriverState *bs, QDict *options,
+                               Error **errp)
 {
     BDRVNBDState *s = bs->opaque;
-    QemuOpts *opts = NULL;
+    QemuOpts *opts;
     Error *local_err = NULL;
-    QCryptoTLSCreds *tlscreds = NULL;
-    const char *hostname = NULL;
     int ret = -EINVAL;
 
     opts = qemu_opts_create(&nbd_runtime_opts, NULL, 0, &error_abort);
@@ -1619,8 +1620,8 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
 
     s->tlscredsid = g_strdup(qemu_opt_get(opts, "tls-creds"));
     if (s->tlscredsid) {
-        tlscreds = nbd_get_tls_creds(s->tlscredsid, errp);
-        if (!tlscreds) {
+        s->tlscreds = nbd_get_tls_creds(s->tlscredsid, errp);
+        if (!s->tlscreds) {
             goto error;
         }
 
@@ -1629,18 +1630,17 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
             error_setg(errp, "TLS only supported over IP sockets");
             goto error;
         }
-        hostname = s->saddr->u.inet.host;
+        s->hostname = s->saddr->u.inet.host;
     }
 
-    /* NBD handshake */
-    ret = nbd_client_init(bs, s->saddr, s->export, tlscreds, hostname,
-                          qemu_opt_get(opts, "x-dirty-bitmap"), errp);
+    s->x_dirty_bitmap = g_strdup(qemu_opt_get(opts, "x-dirty-bitmap"));
+    s->reconnect_delay = qemu_opt_get_number(opts, "reconnect-delay", 0);
+
+    ret = 0;
 
  error:
-    if (tlscreds) {
-        object_unref(OBJECT(tlscreds));
-    }
     if (ret < 0) {
+        object_unref(OBJECT(s->tlscreds));
         qapi_free_SocketAddress(s->saddr);
         g_free(s->export);
         g_free(s->tlscredsid);
@@ -1649,6 +1649,35 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
     return ret;
 }
 
+static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
+                    Error **errp)
+{
+    int ret;
+    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
+
+    ret = nbd_process_options(bs, options, errp);
+    if (ret < 0) {
+        return ret;
+    }
+
+    s->bs = bs;
+    qemu_co_mutex_init(&s->send_mutex);
+    qemu_co_queue_init(&s->free_sema);
+
+    ret = nbd_client_connect(bs, errp);
+    if (ret < 0) {
+        return ret;
+    }
+    /* successfully connected */
+    s->state = NBD_CLIENT_CONNECTED;
+
+    s->connection_co = qemu_coroutine_create(nbd_connection_entry, s);
+    bdrv_inc_in_flight(bs);
+    aio_co_schedule(bdrv_get_aio_context(bs), s->connection_co);
+
+    return 0;
+}
+
 static int nbd_co_flush(BlockDriverState *bs)
 {
     return nbd_client_co_flush(bs);
@@ -1694,9 +1723,11 @@ static void nbd_close(BlockDriverState *bs)
 
     nbd_client_close(bs);
 
+    object_unref(OBJECT(s->tlscreds));
     qapi_free_SocketAddress(s->saddr);
     g_free(s->export);
     g_free(s->tlscredsid);
+    g_free(s->x_dirty_bitmap);
 }
 
 static int64_t nbd_getlength(BlockDriverState *bs)
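
Taken together, the block/nbd.c hunks replace the old boolean s->quit with an explicit per-connection state and funnel every error path through nbd_channel_error(). The new reconnect-delay option is only parsed and stored here; the state machine so far knows just CONNECTED and QUIT, and actual reconnection is left for a follow-up (hence the "@ret will be used for reconnect in future" comment). A condensed, self-contained model of the pattern, assembled from the hunks above rather than copied verbatim:

#include <errno.h>

/* Reduced model of the state handling introduced above; the real code and
 * type names live in block/nbd.c. */
typedef enum NBDClientState {
    NBD_CLIENT_CONNECTED,
    NBD_CLIENT_QUIT,
} NBDClientState;

typedef struct NBDClientModel {
    NBDClientState state;
} NBDClientModel;

/* Every error path calls this instead of flipping a bool; @ret is unused for
 * now and reserved for the future reconnect logic. */
static void nbd_channel_error(NBDClientModel *s, int ret)
{
    s->state = NBD_CLIENT_QUIT;
}

/* Request submission refuses to touch the channel once the state is no longer
 * CONNECTED, mirroring the check added at the top of nbd_co_send_request(). */
static int nbd_submit_request(NBDClientModel *s)
{
    if (s->state != NBD_CLIENT_CONNECTED) {
        return -EIO;
    }
    /* ... actually send the request ... */
    return 0;
}
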
diff --git a/block/stream.c b/block/stream.c
index 6ac1e7bec4..0d3a6ac7c3 100644
--- a/block/stream.c
+++ b/block/stream.c
@@ -22,11 +22,11 @@
 
 enum {
     /*
-     * Size of data buffer for populating the image file.  This should be large
-     * enough to process multiple clusters in a single call, so that populating
-     * contiguous regions of the image is efficient.
+     * Maximum chunk size to feed to copy-on-read.  This should be
+     * large enough to process multiple clusters in a single call, so
+     * that populating contiguous regions of the image is efficient.
      */
-    STREAM_BUFFER_SIZE = 512 * 1024, /* in bytes */
+    STREAM_CHUNK = 512 * 1024, /* in bytes */
 };
 
 typedef struct StreamBlockJob {
@@ -39,13 +39,12 @@ typedef struct StreamBlockJob {
 } StreamBlockJob;
 
 static int coroutine_fn stream_populate(BlockBackend *blk,
-                                        int64_t offset, uint64_t bytes,
-                                        void *buf)
+                                        int64_t offset, uint64_t bytes)
 {
     assert(bytes < SIZE_MAX);
 
-    /* Copy-on-read the unallocated clusters */
-    return blk_co_pread(blk, offset, bytes, buf, BDRV_REQ_COPY_ON_READ);
+    return blk_co_preadv(blk, offset, bytes, NULL,
+                         BDRV_REQ_COPY_ON_READ | BDRV_REQ_PREFETCH);
 }
 
 static void stream_abort(Job *job)
@@ -117,7 +116,6 @@ static int coroutine_fn stream_run(Job *job, Error **errp)
     int error = 0;
     int ret = 0;
     int64_t n = 0; /* bytes */
-    void *buf;
 
     if (bs == s->bottom) {
         /* Nothing to stream */
@@ -130,8 +128,6 @@ static int coroutine_fn stream_run(Job *job, Error **errp)
     }
     job_progress_set_remaining(&s->common.job, len);
 
-    buf = qemu_blockalign(bs, STREAM_BUFFER_SIZE);
-
     /* Turn on copy-on-read for the whole block device so that guest read
      * requests help us make progress.  Only do this when copying the entire
      * backing chain since the copy-on-read operation does not take base into
@@ -154,7 +150,7 @@ static int coroutine_fn stream_run(Job *job, Error **errp)
 
         copy = false;
 
-        ret = bdrv_is_allocated(bs, offset, STREAM_BUFFER_SIZE, &n);
+        ret = bdrv_is_allocated(bs, offset, STREAM_CHUNK, &n);
         if (ret == 1) {
             /* Allocated in the top, no need to copy.  */
         } else if (ret >= 0) {
@@ -171,7 +167,7 @@ static int coroutine_fn stream_run(Job *job, Error **errp)
         }
         trace_stream_one_iteration(s, offset, n, ret);
         if (copy) {
-            ret = stream_populate(blk, offset, n, buf);
+            ret = stream_populate(blk, offset, n);
         }
         if (ret < 0) {
             BlockErrorAction action =
@@ -202,8 +198,6 @@ static int coroutine_fn stream_run(Job *job, Error **errp)
         bdrv_disable_copy_on_read(bs);
     }
 
-    qemu_vfree(buf);
-
     /* Do not remove the backing file if an error was there but ignored. */
     return error;
 }
diff --git a/include/block/block.h b/include/block/block.h
index ae79b70e2d..89e40318cf 100644
--- a/include/block/block.h
+++ b/include/block/block.h
@@ -87,8 +87,14 @@ typedef enum {
      * fallback. */
     BDRV_REQ_NO_FALLBACK        = 0x100,
 
+    /*
+     * BDRV_REQ_PREFETCH may be used only together with BDRV_REQ_COPY_ON_READ
+     * on read request and means that caller doesn't really need data to be
+     * written to qiov parameter which may be NULL.
+     */
+    BDRV_REQ_PREFETCH  = 0x200,
     /* Mask of valid flags */
-    BDRV_REQ_MASK               = 0x1ff,
+    BDRV_REQ_MASK               = 0x3ff,
 } BdrvRequestFlags;
 
 typedef struct BlockSizes {
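
The comment above is the contract for the new flag: it is only valid together with BDRV_REQ_COPY_ON_READ, and with it set the caller may pass qiov == NULL because it does not want the data back. That is exactly how the block/stream.c and nbd/server.c hunks in this patch use it; a minimal caller-side sketch (include paths as in the QEMU tree of this period, error handling omitted):

#include "qemu/osdep.h"
#include "qemu/coroutine.h"
#include "block/block.h"              /* BDRV_REQ_* flags */
#include "sysemu/block-backend.h"     /* BlockBackend, blk_co_preadv() */

/* Populate the top layer for [offset, offset + bytes) without handing the
 * data back: COPY_ON_READ writes it into the top image, PREFETCH allows a
 * NULL qiov. */
static int coroutine_fn prefetch_range(BlockBackend *blk,
                                       int64_t offset, uint64_t bytes)
{
    return blk_co_preadv(blk, offset, bytes, NULL,
                         BDRV_REQ_COPY_ON_READ | BDRV_REQ_PREFETCH);
}
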
diff --git a/include/block/nbd.h b/include/block/nbd.h
index bb9f5bc021..7b36d672f0 100644
--- a/include/block/nbd.h
+++ b/include/block/nbd.h
@@ -304,7 +304,8 @@ struct NBDExportInfo {
 };
 typedef struct NBDExportInfo NBDExportInfo;
 
-int nbd_receive_negotiate(QIOChannel *ioc, QCryptoTLSCreds *tlscreds,
+int nbd_receive_negotiate(AioContext *aio_context, QIOChannel *ioc,
+                          QCryptoTLSCreds *tlscreds,
                           const char *hostname, QIOChannel **outioc,
                           NBDExportInfo *info, Error **errp);
 void nbd_free_export_list(NBDExportInfo *info, int count);
diff --git a/nbd/client.c b/nbd/client.c
index d554ae353d..49bf9906f9 100644
--- a/nbd/client.c
+++ b/nbd/client.c
@@ -868,7 +868,8 @@ static int nbd_list_meta_contexts(QIOChannel *ioc,
  *          2: server is newstyle, but lacks structured replies
  *          3: server is newstyle and set up for structured replies
  */
-static int nbd_start_negotiate(QIOChannel *ioc, QCryptoTLSCreds *tlscreds,
+static int nbd_start_negotiate(AioContext *aio_context, QIOChannel *ioc,
+                               QCryptoTLSCreds *tlscreds,
                                const char *hostname, QIOChannel **outioc,
                                bool structured_reply, bool *zeroes,
                                Error **errp)
@@ -935,6 +936,10 @@ static int nbd_start_negotiate(QIOChannel *ioc, QCryptoTLSCreds *tlscreds,
                     return -EINVAL;
                 }
                 ioc = *outioc;
+                if (aio_context) {
+                    qio_channel_set_blocking(ioc, false, NULL);
+                    qio_channel_attach_aio_context(ioc, aio_context);
+                }
             } else {
                 error_setg(errp, "Server does not support STARTTLS");
                 return -EINVAL;
@@ -999,7 +1004,8 @@ static int nbd_negotiate_finish_oldstyle(QIOChannel *ioc, NBDExportInfo *info,
  * Returns: negative errno: failure talking to server
  *          0: server is connected
  */
-int nbd_receive_negotiate(QIOChannel *ioc, QCryptoTLSCreds *tlscreds,
+int nbd_receive_negotiate(AioContext *aio_context, QIOChannel *ioc,
+                          QCryptoTLSCreds *tlscreds,
                           const char *hostname, QIOChannel **outioc,
                           NBDExportInfo *info, Error **errp)
 {
@@ -1010,7 +1016,7 @@ int nbd_receive_negotiate(QIOChannel *ioc, QCryptoTLSCreds *tlscreds,
     assert(info->name);
     trace_nbd_receive_negotiate_name(info->name);
 
-    result = nbd_start_negotiate(ioc, tlscreds, hostname, outioc,
+    result = nbd_start_negotiate(aio_context, ioc, tlscreds, hostname, outioc,
                                  info->structured_reply, &zeroes, errp);
 
     info->structured_reply = false;
@@ -1130,8 +1136,8 @@ int nbd_receive_export_list(QIOChannel *ioc, QCryptoTLSCreds *tlscreds,
     QIOChannel *sioc = NULL;
 
     *info = NULL;
-    result = nbd_start_negotiate(ioc, tlscreds, hostname, &sioc, true, NULL,
-                                 errp);
+    result = nbd_start_negotiate(NULL, ioc, tlscreds, hostname, &sioc, true,
+                                 NULL, errp);
     if (tlscreds && sioc) {
         ioc = sioc;
     }
diff --git a/nbd/server.c b/nbd/server.c
index dbd2ff8555..3eacb89875 100644
--- a/nbd/server.c
+++ b/nbd/server.c
@@ -2105,12 +2105,15 @@ static int nbd_co_receive_request(NBDRequestData *req, NBDRequest *request,
             return -EINVAL;
         }
 
-        req->data = blk_try_blockalign(client->exp->blk, request->len);
-        if (req->data == NULL) {
-            error_setg(errp, "No memory");
-            return -ENOMEM;
+        if (request->type != NBD_CMD_CACHE) {
+            req->data = blk_try_blockalign(client->exp->blk, request->len);
+            if (req->data == NULL) {
+                error_setg(errp, "No memory");
+                return -ENOMEM;
+            }
         }
     }
+
     if (request->type == NBD_CMD_WRITE) {
         if (nbd_read(client->ioc, req->data, request->len, "CMD_WRITE data",
                      errp) < 0)
@@ -2195,7 +2198,7 @@ static coroutine_fn int nbd_do_cmd_read(NBDClient *client, NBDRequest *request,
     int ret;
     NBDExport *exp = client->exp;
 
-    assert(request->type == NBD_CMD_READ || request->type == NBD_CMD_CACHE);
+    assert(request->type == NBD_CMD_READ);
 
     /* XXX: NBD Protocol only documents use of FUA with WRITE */
     if (request->flags & NBD_CMD_FLAG_FUA) {
@@ -2207,7 +2210,7 @@ static coroutine_fn int nbd_do_cmd_read(NBDClient *client, NBDRequest *request,
     }
 
     if (client->structured_reply && !(request->flags & NBD_CMD_FLAG_DF) &&
-        request->len && request->type != NBD_CMD_CACHE)
+        request->len)
     {
         return nbd_co_send_sparse_read(client, request->handle, request->from,
                                        data, request->len, errp);
@@ -2215,7 +2218,7 @@ static coroutine_fn int nbd_do_cmd_read(NBDClient *client, NBDRequest *request,
 
     ret = blk_pread(exp->blk, request->from + exp->dev_offset, data,
                     request->len);
-    if (ret < 0 || request->type == NBD_CMD_CACHE) {
+    if (ret < 0) {
         return nbd_send_generic_reply(client, request->handle, ret,
                                       "reading from file failed", errp);
     }
@@ -2234,6 +2237,28 @@ static coroutine_fn int nbd_do_cmd_read(NBDClient *client, NBDRequest *request,
     }
 }
 
+/*
+ * nbd_do_cmd_cache
+ *
+ * Handle NBD_CMD_CACHE request.
+ * Return -errno if sending fails. Other errors are reported directly to the
+ * client as an error reply.
+ */
+static coroutine_fn int nbd_do_cmd_cache(NBDClient *client, NBDRequest *request,
+                                         Error **errp)
+{
+    int ret;
+    NBDExport *exp = client->exp;
+
+    assert(request->type == NBD_CMD_CACHE);
+
+    ret = blk_co_preadv(exp->blk, request->from + exp->dev_offset, request->len,
+                        NULL, BDRV_REQ_COPY_ON_READ | BDRV_REQ_PREFETCH);
+
+    return nbd_send_generic_reply(client, request->handle, ret,
+                                  "caching data failed", errp);
+}
+
 /* Handle NBD request.
  * Return -errno if sending fails. Other errors are reported directly to the
  * client as an error reply. */
@@ -2247,8 +2272,10 @@ static coroutine_fn int nbd_handle_request(NBDClient *client,
     char *msg;
 
     switch (request->type) {
-    case NBD_CMD_READ:
     case NBD_CMD_CACHE:
+        return nbd_do_cmd_cache(client, request, errp);
+
+    case NBD_CMD_READ:
         return nbd_do_cmd_read(client, request, data, errp);
 
     case NBD_CMD_WRITE:
diff --git a/qapi/block-core.json b/qapi/block-core.json
index 0d43d4f37c..f1e7701fbe 100644
--- a/qapi/block-core.json
+++ b/qapi/block-core.json
@@ -3860,13 +3860,22 @@
 #                  traditional "base:allocation" block status (see
 #                  NBD_OPT_LIST_META_CONTEXT in the NBD protocol) (since 3.0)
 #
+# @reconnect-delay: On an unexpected disconnect, the nbd client tries to
+#                   connect again until succeeding or encountering a serious
+#                   error.  During the first @reconnect-delay seconds, all
+#                   requests are paused and will be rerun on a successful
+#                   reconnect. After that time, any delayed requests and all
+#                   future requests before a successful reconnect will
+#                   immediately fail. Default 0 (Since 4.2)
+#
 # Since: 2.9
 ##
 { 'struct': 'BlockdevOptionsNbd',
   'data': { 'server': 'SocketAddress',
             '*export': 'str',
             '*tls-creds': 'str',
-            '*x-dirty-bitmap': 'str' } }
+            '*x-dirty-bitmap': 'str',
+            '*reconnect-delay': 'uint32' } }
 
 ##
 # @BlockdevOptionsRaw:
diff --git a/qapi/sockets.json b/qapi/sockets.json
index fc81d8d5e8..32375f3a36 100644
--- a/qapi/sockets.json
+++ b/qapi/sockets.json
@@ -53,6 +53,9 @@
 #
 # @ipv6: whether to accept IPv6 addresses, default try both IPv4 and IPv6
 #
+# @keep-alive: enable keep-alive when connecting to this socket. Not supported
+#              for passive sockets. (Since 4.2)
+#
 # Since: 1.3
 ##
 { 'struct': 'InetSocketAddress',
@@ -61,7 +64,8 @@
     '*numeric':  'bool',
     '*to': 'uint16',
     '*ipv4': 'bool',
-    '*ipv6': 'bool' } }
+    '*ipv6': 'bool',
+    '*keep-alive': 'bool' } }
 
 ##
 # @UnixSocketAddress:
diff --git a/qemu-nbd.c b/qemu-nbd.c
index a8cb39e510..049645491d 100644
--- a/qemu-nbd.c
+++ b/qemu-nbd.c
@@ -362,7 +362,7 @@ static void *nbd_client_thread(void *arg)
         goto out;
     }
 
-    ret = nbd_receive_negotiate(QIO_CHANNEL(sioc),
+    ret = nbd_receive_negotiate(NULL, QIO_CHANNEL(sioc),
                                 NULL, NULL, NULL, &info, &local_error);
     if (ret < 0) {
         if (local_error) {
diff --git a/util/qemu-sockets.c b/util/qemu-sockets.c
index a5092dbd12..e3a1666578 100644
--- a/util/qemu-sockets.c
+++ b/util/qemu-sockets.c
@@ -219,6 +219,12 @@ static int inet_listen_saddr(InetSocketAddress *saddr,
     bool socket_created = false;
     Error *err = NULL;
 
+    if (saddr->keep_alive) {
+        error_setg(errp, "keep-alive option is not supported for passive "
+                   "sockets");
+        return -1;
+    }
+
     memset(&ai,0, sizeof(ai));
     ai.ai_flags = AI_PASSIVE;
     if (saddr->has_numeric && saddr->numeric) {
@@ -458,6 +464,19 @@ int inet_connect_saddr(InetSocketAddress *saddr, Error **errp)
     }
 
     freeaddrinfo(res);
+
+    if (saddr->keep_alive) {
+        int val = 1;
+        int ret = qemu_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
+                                  &val, sizeof(val));
+
+        if (ret < 0) {
+            error_setg_errno(errp, errno, "Unable to set KEEPALIVE");
+            close(sock);
+            return -1;
+        }
+    }
+
     return sock;
 }
 
@@ -653,6 +672,15 @@ int inet_parse(InetSocketAddress *addr, const char *str, Error **errp)
         }
         addr->has_ipv6 = true;
     }
+    begin = strstr(optstr, ",keep-alive");
+    if (begin) {
+        if (inet_parse_flag("keep-alive", begin + strlen(",keep-alive"),
+                            &addr->keep_alive, errp) < 0)
+        {
+            return -1;
+        }
+        addr->has_keep_alive = true;
+    }
     return 0;
 }
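
On the implementation side, the connect path above sets SO_KEEPALIVE right after the TCP connection is established, the listen path rejects the option, and inet_parse() accepts a ",keep-alive" suffix in host:port strings. A hedged sketch of enabling it programmatically through the QAPI-generated InetSocketAddress (the generated header name and field layout are assumptions based on the schema change in qapi/sockets.json):

#include "qemu/osdep.h"
#include "qapi/error.h"
#include "qapi/qapi-types-sockets.h"  /* InetSocketAddress (QAPI-generated) */
#include "qemu/sockets.h"             /* inet_connect_saddr() */

/* Open an outgoing TCP connection with keep-alive enabled.  Returns the
 * connected fd or -1 with @errp set. */
static int connect_with_keepalive(const char *host, const char *port,
                                  Error **errp)
{
    InetSocketAddress addr = {
        .host = (char *)host,
        .port = (char *)port,
        .has_keep_alive = true,
        .keep_alive = true,           /* the field added by this patch */
    };

    /* For an outgoing connection this ends up as
     * setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, ...) on the new socket;
     * passing the same address to inet_listen_saddr() would instead fail with
     * "keep-alive option is not supported for passive sockets". */
    return inet_connect_saddr(&addr, errp);
}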