summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--MAINTAINERS9
-rw-r--r--block/backup.c6
-rw-r--r--block/block-backend.c12
-rw-r--r--block/dirty-bitmap.c26
-rw-r--r--block/export/export.c6
-rw-r--r--block/export/meson.build7
-rw-r--r--block/export/vduse-blk.c374
-rw-r--r--block/export/vduse-blk.h20
-rw-r--r--block/export/vhost-user-blk-server.c263
-rw-r--r--block/export/virtio-blk-handler.c240
-rw-r--r--block/export/virtio-blk-handler.h37
-rw-r--r--block/gluster.c2
-rw-r--r--block/io.c15
-rw-r--r--block/monitor/bitmap-qmp-cmds.c40
-rw-r--r--block/nbd.c8
-rw-r--r--block/rbd.c24
-rw-r--r--docs/tools/qemu-storage-daemon.rst22
-rw-r--r--hw/block/virtio-blk.c1
-rw-r--r--hw/block/xen-block.c1
-rw-r--r--hw/ide/core.c1
-rw-r--r--hw/scsi/scsi-disk.c1
-rw-r--r--hw/scsi/scsi-generic.c1
-rw-r--r--include/block/aio-wait.h2
-rw-r--r--include/block/block-io.h1
-rw-r--r--include/block/block_int-io.h2
-rw-r--r--include/qemu/hbitmap.h15
-rw-r--r--include/sysemu/block-backend-io.h1
-rw-r--r--linux-headers/linux/vduse.h306
-rw-r--r--meson.build34
-rw-r--r--meson_options.txt4
-rw-r--r--qapi/block-export.json29
-rw-r--r--scripts/meson-buildoptions.sh7
-rwxr-xr-xscripts/update-linux-headers.sh2
-rw-r--r--storage-daemon/qemu-storage-daemon.c10
l---------subprojects/libvduse/include/atomic.h1
l---------subprojects/libvduse/include/compiler.h1
-rw-r--r--subprojects/libvduse/libvduse.c1375
-rw-r--r--subprojects/libvduse/libvduse.h247
l---------subprojects/libvduse/linux-headers/linux1
-rw-r--r--subprojects/libvduse/meson.build10
l---------subprojects/libvduse/standard-headers/linux1
-rw-r--r--util/aio-wait.c16
-rw-r--r--util/hbitmap.c25
43 files changed, 2852 insertions, 354 deletions
diff --git a/MAINTAINERS b/MAINTAINERS
index aaa649a50d..1cbd6b72fa 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -3580,6 +3580,8 @@ M: Coiby Xu <Coiby.Xu@gmail.com>
 S: Maintained
 F: block/export/vhost-user-blk-server.c
 F: block/export/vhost-user-blk-server.h
+F: block/export/virtio-blk-handler.c
+F: block/export/virtio-blk-handler.h
 F: include/qemu/vhost-user-server.h
 F: tests/qtest/libqos/vhost-user-blk.c
 F: tests/qtest/libqos/vhost-user-blk.h
@@ -3592,6 +3594,13 @@ L: qemu-block@nongnu.org
 S: Supported
 F: block/export/fuse.c
 
+VDUSE library and block device exports
+M: Xie Yongji <xieyongji@bytedance.com>
+S: Maintained
+F: subprojects/libvduse/
+F: block/export/vduse-blk.c
+F: block/export/vduse-blk.h
+
 Replication
 M: Wen Congyang <wencongyang2@huawei.com>
 M: Xie Changlong <xiechanglong.d@gmail.com>
diff --git a/block/backup.c b/block/backup.c
index 5cfd0b999c..b2b649e305 100644
--- a/block/backup.c
+++ b/block/backup.c
@@ -228,15 +228,13 @@ out:
 
 static void backup_init_bcs_bitmap(BackupBlockJob *job)
 {
-    bool ret;
     uint64_t estimate;
     BdrvDirtyBitmap *bcs_bitmap = block_copy_dirty_bitmap(job->bcs);
 
     if (job->sync_mode == MIRROR_SYNC_MODE_BITMAP) {
         bdrv_clear_dirty_bitmap(bcs_bitmap, NULL);
-        ret = bdrv_dirty_bitmap_merge_internal(bcs_bitmap, job->sync_bitmap,
-                                               NULL, true);
-        assert(ret);
+        bdrv_dirty_bitmap_merge_internal(bcs_bitmap, job->sync_bitmap, NULL,
+                                         true);
     } else if (job->sync_mode == MIRROR_SYNC_MODE_TOP) {
         /*
          * We can't hog the coroutine to initialize this thoroughly.
diff --git a/block/block-backend.c b/block/block-backend.c
index e0e1aff4b1..f425b00793 100644
--- a/block/block-backend.c
+++ b/block/block-backend.c
@@ -56,9 +56,6 @@ struct BlockBackend {
     const BlockDevOps *dev_ops;
     void *dev_opaque;
 
-    /* the block size for which the guest device expects atomicity */
-    int guest_block_size;
-
     /* If the BDS tree is removed, some of its options are stored here (which
      * can be used to restore those options in the new BDS on insert) */
     BlockBackendRootState root_state;
@@ -998,7 +995,6 @@ void blk_detach_dev(BlockBackend *blk, DeviceState *dev)
     blk->dev = NULL;
     blk->dev_ops = NULL;
     blk->dev_opaque = NULL;
-    blk->guest_block_size = 512;
     blk_set_perm(blk, 0, BLK_PERM_ALL, &error_abort);
     blk_unref(blk);
 }
@@ -1062,7 +1058,7 @@ void blk_set_dev_ops(BlockBackend *blk, const BlockDevOps *ops,
     blk->dev_opaque = opaque;
 
     /* Are we currently quiesced? Should we enforce this right now? */
-    if (blk->quiesce_counter && ops->drained_begin) {
+    if (blk->quiesce_counter && ops && ops->drained_begin) {
         ops->drained_begin(opaque);
     }
 }
@@ -2100,12 +2096,6 @@ int blk_get_max_iov(BlockBackend *blk)
     return blk->root->bs->bl.max_iov;
 }
 
-void blk_set_guest_block_size(BlockBackend *blk, int align)
-{
-    IO_CODE();
-    blk->guest_block_size = align;
-}
-
 void *blk_try_blockalign(BlockBackend *blk, size_t size)
 {
     IO_CODE();
diff --git a/block/dirty-bitmap.c b/block/dirty-bitmap.c
index da1b91166f..bf3dc0512a 100644
--- a/block/dirty-bitmap.c
+++ b/block/dirty-bitmap.c
@@ -309,10 +309,7 @@ BdrvDirtyBitmap *bdrv_reclaim_dirty_bitmap_locked(BdrvDirtyBitmap *parent,
         return NULL;
     }
 
-    if (!hbitmap_merge(parent->bitmap, successor->bitmap, parent->bitmap)) {
-        error_setg(errp, "Merging of parent and successor bitmap failed");
-        return NULL;
-    }
+    hbitmap_merge(parent->bitmap, successor->bitmap, parent->bitmap);
 
     parent->disabled = successor->disabled;
     parent->busy = false;
@@ -912,13 +909,15 @@ bool bdrv_merge_dirty_bitmap(BdrvDirtyBitmap *dest, const BdrvDirtyBitmap *src,
         goto out;
     }
 
-    if (!hbitmap_can_merge(dest->bitmap, src->bitmap)) {
-        error_setg(errp, "Bitmaps are incompatible and can't be merged");
+    if (bdrv_dirty_bitmap_size(src) != bdrv_dirty_bitmap_size(dest)) {
+        error_setg(errp, "Bitmaps are of different sizes (destination size is %"
+                   PRId64 ", source size is %" PRId64 ") and can't be merged",
+                   bdrv_dirty_bitmap_size(dest), bdrv_dirty_bitmap_size(src));
         goto out;
     }
 
-    ret = bdrv_dirty_bitmap_merge_internal(dest, src, backup, false);
-    assert(ret);
+    bdrv_dirty_bitmap_merge_internal(dest, src, backup, false);
+    ret = true;
 
 out:
     bdrv_dirty_bitmaps_unlock(dest->bs);
@@ -932,17 +931,16 @@ out:
 /**
  * bdrv_dirty_bitmap_merge_internal: merge src into dest.
  * Does NOT check bitmap permissions; not suitable for use as public API.
+ * @dest, @src and @backup (if not NULL) must have same size.
  *
  * @backup: If provided, make a copy of dest here prior to merge.
  * @lock: If true, lock and unlock bitmaps on the way in/out.
- * returns true if the merge succeeded; false if unattempted.
  */
-bool bdrv_dirty_bitmap_merge_internal(BdrvDirtyBitmap *dest,
+void bdrv_dirty_bitmap_merge_internal(BdrvDirtyBitmap *dest,
                                       const BdrvDirtyBitmap *src,
                                       HBitmap **backup,
                                       bool lock)
 {
-    bool ret;
     IO_CODE();
 
     assert(!bdrv_dirty_bitmap_readonly(dest));
@@ -959,9 +957,9 @@ bool bdrv_dirty_bitmap_merge_internal(BdrvDirtyBitmap *dest,
     if (backup) {
         *backup = dest->bitmap;
         dest->bitmap = hbitmap_alloc(dest->size, hbitmap_granularity(*backup));
-        ret = hbitmap_merge(*backup, src->bitmap, dest->bitmap);
+        hbitmap_merge(*backup, src->bitmap, dest->bitmap);
     } else {
-        ret = hbitmap_merge(dest->bitmap, src->bitmap, dest->bitmap);
+        hbitmap_merge(dest->bitmap, src->bitmap, dest->bitmap);
     }
 
     if (lock) {
@@ -970,6 +968,4 @@ bool bdrv_dirty_bitmap_merge_internal(BdrvDirtyBitmap *dest,
             bdrv_dirty_bitmaps_unlock(src->bs);
         }
     }
-
-    return ret;
 }
diff --git a/block/export/export.c b/block/export/export.c
index 7253af3bc3..4744862915 100644
--- a/block/export/export.c
+++ b/block/export/export.c
@@ -26,6 +26,9 @@
 #ifdef CONFIG_VHOST_USER_BLK_SERVER
 #include "vhost-user-blk-server.h"
 #endif
+#ifdef CONFIG_VDUSE_BLK_EXPORT
+#include "vduse-blk.h"
+#endif
 
 static const BlockExportDriver *blk_exp_drivers[] = {
     &blk_exp_nbd,
@@ -35,6 +38,9 @@ static const BlockExportDriver *blk_exp_drivers[] = {
 #ifdef CONFIG_FUSE
     &blk_exp_fuse,
 #endif
+#ifdef CONFIG_VDUSE_BLK_EXPORT
+    &blk_exp_vduse_blk,
+#endif
 };
 
 /* Only accessed from the main thread */
diff --git a/block/export/meson.build b/block/export/meson.build
index 0a08e384c7..c60116f455 100644
--- a/block/export/meson.build
+++ b/block/export/meson.build
@@ -1,7 +1,12 @@
 blockdev_ss.add(files('export.c'))
 
 if have_vhost_user_blk_server
-    blockdev_ss.add(files('vhost-user-blk-server.c'))
+    blockdev_ss.add(files('vhost-user-blk-server.c', 'virtio-blk-handler.c'))
 endif
 
 blockdev_ss.add(when: fuse, if_true: files('fuse.c'))
+
+if have_vduse_blk_export
+    blockdev_ss.add(files('vduse-blk.c', 'virtio-blk-handler.c'))
+    blockdev_ss.add(libvduse)
+endif
diff --git a/block/export/vduse-blk.c b/block/export/vduse-blk.c
new file mode 100644
index 0000000000..f101c24c3f
--- /dev/null
+++ b/block/export/vduse-blk.c
@@ -0,0 +1,374 @@
+/*
+ * Export QEMU block device via VDUSE
+ *
+ * Copyright (C) 2022 Bytedance Inc. and/or its affiliates. All rights reserved.
+ *
+ * Author:
+ *   Xie Yongji <xieyongji@bytedance.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or
+ * later.  See the COPYING file in the top-level directory.
+ */
+
+#include <sys/eventfd.h>
+
+#include "qemu/osdep.h"
+#include "qapi/error.h"
+#include "block/export.h"
+#include "qemu/error-report.h"
+#include "util/block-helpers.h"
+#include "subprojects/libvduse/libvduse.h"
+#include "virtio-blk-handler.h"
+
+#include "standard-headers/linux/virtio_blk.h"
+
+#define VDUSE_DEFAULT_NUM_QUEUE 1
+#define VDUSE_DEFAULT_QUEUE_SIZE 256
+
+typedef struct VduseBlkExport {
+    BlockExport export;
+    VirtioBlkHandler handler;
+    VduseDev *dev;
+    uint16_t num_queues;
+    char *recon_file;
+    unsigned int inflight;
+} VduseBlkExport;
+
+typedef struct VduseBlkReq {
+    VduseVirtqElement elem;
+    VduseVirtq *vq;
+} VduseBlkReq;
+
+static void vduse_blk_inflight_inc(VduseBlkExport *vblk_exp)
+{
+    vblk_exp->inflight++;
+}
+
+static void vduse_blk_inflight_dec(VduseBlkExport *vblk_exp)
+{
+    if (--vblk_exp->inflight == 0) {
+        aio_wait_kick();
+    }
+}
+
+static void vduse_blk_req_complete(VduseBlkReq *req, size_t in_len)
+{
+    vduse_queue_push(req->vq, &req->elem, in_len);
+    vduse_queue_notify(req->vq);
+
+    free(req);
+}
+
+static void coroutine_fn vduse_blk_virtio_process_req(void *opaque)
+{
+    VduseBlkReq *req = opaque;
+    VduseVirtq *vq = req->vq;
+    VduseDev *dev = vduse_queue_get_dev(vq);
+    VduseBlkExport *vblk_exp = vduse_dev_get_priv(dev);
+    VirtioBlkHandler *handler = &vblk_exp->handler;
+    VduseVirtqElement *elem = &req->elem;
+    struct iovec *in_iov = elem->in_sg;
+    struct iovec *out_iov = elem->out_sg;
+    unsigned in_num = elem->in_num;
+    unsigned out_num = elem->out_num;
+    int in_len;
+
+    in_len = virtio_blk_process_req(handler, in_iov,
+                                    out_iov, in_num, out_num);
+    if (in_len < 0) {
+        free(req);
+        return;
+    }
+
+    vduse_blk_req_complete(req, in_len);
+    vduse_blk_inflight_dec(vblk_exp);
+}
+
+static void vduse_blk_vq_handler(VduseDev *dev, VduseVirtq *vq)
+{
+    VduseBlkExport *vblk_exp = vduse_dev_get_priv(dev);
+
+    while (1) {
+        VduseBlkReq *req;
+
+        req = vduse_queue_pop(vq, sizeof(VduseBlkReq));
+        if (!req) {
+            break;
+        }
+        req->vq = vq;
+
+        Coroutine *co =
+            qemu_coroutine_create(vduse_blk_virtio_process_req, req);
+
+        vduse_blk_inflight_inc(vblk_exp);
+        qemu_coroutine_enter(co);
+    }
+}
+
+static void on_vduse_vq_kick(void *opaque)
+{
+    VduseVirtq *vq = opaque;
+    VduseDev *dev = vduse_queue_get_dev(vq);
+    int fd = vduse_queue_get_fd(vq);
+    eventfd_t kick_data;
+
+    if (eventfd_read(fd, &kick_data) == -1) {
+        error_report("failed to read data from eventfd");
+        return;
+    }
+
+    vduse_blk_vq_handler(dev, vq);
+}
+
+static void vduse_blk_enable_queue(VduseDev *dev, VduseVirtq *vq)
+{
+    VduseBlkExport *vblk_exp = vduse_dev_get_priv(dev);
+
+    aio_set_fd_handler(vblk_exp->export.ctx, vduse_queue_get_fd(vq),
+                       true, on_vduse_vq_kick, NULL, NULL, NULL, vq);
+    /* Make sure we don't miss any kick afer reconnecting */
+    eventfd_write(vduse_queue_get_fd(vq), 1);
+}
+
+static void vduse_blk_disable_queue(VduseDev *dev, VduseVirtq *vq)
+{
+    VduseBlkExport *vblk_exp = vduse_dev_get_priv(dev);
+
+    aio_set_fd_handler(vblk_exp->export.ctx, vduse_queue_get_fd(vq),
+                       true, NULL, NULL, NULL, NULL, NULL);
+}
+
+static const VduseOps vduse_blk_ops = {
+    .enable_queue = vduse_blk_enable_queue,
+    .disable_queue = vduse_blk_disable_queue,
+};
+
+static void on_vduse_dev_kick(void *opaque)
+{
+    VduseDev *dev = opaque;
+
+    vduse_dev_handler(dev);
+}
+
+static void vduse_blk_attach_ctx(VduseBlkExport *vblk_exp, AioContext *ctx)
+{
+    int i;
+
+    aio_set_fd_handler(vblk_exp->export.ctx, vduse_dev_get_fd(vblk_exp->dev),
+                       true, on_vduse_dev_kick, NULL, NULL, NULL,
+                       vblk_exp->dev);
+
+    for (i = 0; i < vblk_exp->num_queues; i++) {
+        VduseVirtq *vq = vduse_dev_get_queue(vblk_exp->dev, i);
+        int fd = vduse_queue_get_fd(vq);
+
+        if (fd < 0) {
+            continue;
+        }
+        aio_set_fd_handler(vblk_exp->export.ctx, fd, true,
+                           on_vduse_vq_kick, NULL, NULL, NULL, vq);
+    }
+}
+
+static void vduse_blk_detach_ctx(VduseBlkExport *vblk_exp)
+{
+    int i;
+
+    for (i = 0; i < vblk_exp->num_queues; i++) {
+        VduseVirtq *vq = vduse_dev_get_queue(vblk_exp->dev, i);
+        int fd = vduse_queue_get_fd(vq);
+
+        if (fd < 0) {
+            continue;
+        }
+        aio_set_fd_handler(vblk_exp->export.ctx, fd,
+                           true, NULL, NULL, NULL, NULL, NULL);
+    }
+    aio_set_fd_handler(vblk_exp->export.ctx, vduse_dev_get_fd(vblk_exp->dev),
+                       true, NULL, NULL, NULL, NULL, NULL);
+
+    AIO_WAIT_WHILE(vblk_exp->export.ctx, vblk_exp->inflight > 0);
+}
+
+
+static void blk_aio_attached(AioContext *ctx, void *opaque)
+{
+    VduseBlkExport *vblk_exp = opaque;
+
+    vblk_exp->export.ctx = ctx;
+    vduse_blk_attach_ctx(vblk_exp, ctx);
+}
+
+static void blk_aio_detach(void *opaque)
+{
+    VduseBlkExport *vblk_exp = opaque;
+
+    vduse_blk_detach_ctx(vblk_exp);
+    vblk_exp->export.ctx = NULL;
+}
+
+static void vduse_blk_resize(void *opaque)
+{
+    BlockExport *exp = opaque;
+    VduseBlkExport *vblk_exp = container_of(exp, VduseBlkExport, export);
+    struct virtio_blk_config config;
+
+    config.capacity =
+            cpu_to_le64(blk_getlength(exp->blk) >> VIRTIO_BLK_SECTOR_BITS);
+    vduse_dev_update_config(vblk_exp->dev, sizeof(config.capacity),
+                            offsetof(struct virtio_blk_config, capacity),
+                            (char *)&config.capacity);
+}
+
+static const BlockDevOps vduse_block_ops = {
+    .resize_cb = vduse_blk_resize,
+};
+
+static int vduse_blk_exp_create(BlockExport *exp, BlockExportOptions *opts,
+                                Error **errp)
+{
+    VduseBlkExport *vblk_exp = container_of(exp, VduseBlkExport, export);
+    BlockExportOptionsVduseBlk *vblk_opts = &opts->u.vduse_blk;
+    uint64_t logical_block_size = VIRTIO_BLK_SECTOR_SIZE;
+    uint16_t num_queues = VDUSE_DEFAULT_NUM_QUEUE;
+    uint16_t queue_size = VDUSE_DEFAULT_QUEUE_SIZE;
+    Error *local_err = NULL;
+    struct virtio_blk_config config = { 0 };
+    uint64_t features;
+    int i, ret;
+
+    if (vblk_opts->has_num_queues) {
+        num_queues = vblk_opts->num_queues;
+        if (num_queues == 0) {
+            error_setg(errp, "num-queues must be greater than 0");
+            return -EINVAL;
+        }
+    }
+
+    if (vblk_opts->has_queue_size) {
+        queue_size = vblk_opts->queue_size;
+        if (queue_size <= 2 || !is_power_of_2(queue_size) ||
+            queue_size > VIRTQUEUE_MAX_SIZE) {
+            error_setg(errp, "queue-size is invalid");
+            return -EINVAL;
+        }
+    }
+
+    if (vblk_opts->has_logical_block_size) {
+        logical_block_size = vblk_opts->logical_block_size;
+        check_block_size(exp->id, "logical-block-size", logical_block_size,
+                         &local_err);
+        if (local_err) {
+            error_propagate(errp, local_err);
+            return -EINVAL;
+        }
+    }
+    vblk_exp->num_queues = num_queues;
+    vblk_exp->handler.blk = exp->blk;
+    vblk_exp->handler.serial = g_strdup(vblk_opts->has_serial ?
+                                        vblk_opts->serial : "");
+    vblk_exp->handler.logical_block_size = logical_block_size;
+    vblk_exp->handler.writable = opts->writable;
+
+    config.capacity =
+            cpu_to_le64(blk_getlength(exp->blk) >> VIRTIO_BLK_SECTOR_BITS);
+    config.seg_max = cpu_to_le32(queue_size - 2);
+    config.min_io_size = cpu_to_le16(1);
+    config.opt_io_size = cpu_to_le32(1);
+    config.num_queues = cpu_to_le16(num_queues);
+    config.blk_size = cpu_to_le32(logical_block_size);
+    config.max_discard_sectors = cpu_to_le32(VIRTIO_BLK_MAX_DISCARD_SECTORS);
+    config.max_discard_seg = cpu_to_le32(1);
+    config.discard_sector_alignment =
+        cpu_to_le32(logical_block_size >> VIRTIO_BLK_SECTOR_BITS);
+    config.max_write_zeroes_sectors =
+        cpu_to_le32(VIRTIO_BLK_MAX_WRITE_ZEROES_SECTORS);
+    config.max_write_zeroes_seg = cpu_to_le32(1);
+
+    features = vduse_get_virtio_features() |
+               (1ULL << VIRTIO_BLK_F_SEG_MAX) |
+               (1ULL << VIRTIO_BLK_F_TOPOLOGY) |
+               (1ULL << VIRTIO_BLK_F_BLK_SIZE) |
+               (1ULL << VIRTIO_BLK_F_FLUSH) |
+               (1ULL << VIRTIO_BLK_F_DISCARD) |
+               (1ULL << VIRTIO_BLK_F_WRITE_ZEROES);
+
+    if (num_queues > 1) {
+        features |= 1ULL << VIRTIO_BLK_F_MQ;
+    }
+    if (!opts->writable) {
+        features |= 1ULL << VIRTIO_BLK_F_RO;
+    }
+
+    vblk_exp->dev = vduse_dev_create(vblk_opts->name, VIRTIO_ID_BLOCK, 0,
+                                     features, num_queues,
+                                     sizeof(struct virtio_blk_config),
+                                     (char *)&config, &vduse_blk_ops,
+                                     vblk_exp);
+    if (!vblk_exp->dev) {
+        error_setg(errp, "failed to create vduse device");
+        ret = -ENOMEM;
+        goto err_dev;
+    }
+
+    vblk_exp->recon_file = g_strdup_printf("%s/vduse-blk-%s",
+                                           g_get_tmp_dir(), vblk_opts->name);
+    if (vduse_set_reconnect_log_file(vblk_exp->dev, vblk_exp->recon_file)) {
+        error_setg(errp, "failed to set reconnect log file");
+        ret = -EINVAL;
+        goto err;
+    }
+
+    for (i = 0; i < num_queues; i++) {
+        vduse_dev_setup_queue(vblk_exp->dev, i, queue_size);
+    }
+
+    aio_set_fd_handler(exp->ctx, vduse_dev_get_fd(vblk_exp->dev), true,
+                       on_vduse_dev_kick, NULL, NULL, NULL, vblk_exp->dev);
+
+    blk_add_aio_context_notifier(exp->blk, blk_aio_attached, blk_aio_detach,
+                                 vblk_exp);
+
+    blk_set_dev_ops(exp->blk, &vduse_block_ops, exp);
+
+    return 0;
+err:
+    vduse_dev_destroy(vblk_exp->dev);
+    g_free(vblk_exp->recon_file);
+err_dev:
+    g_free(vblk_exp->handler.serial);
+    return ret;
+}
+
+static void vduse_blk_exp_delete(BlockExport *exp)
+{
+    VduseBlkExport *vblk_exp = container_of(exp, VduseBlkExport, export);
+    int ret;
+
+    blk_remove_aio_context_notifier(exp->blk, blk_aio_attached, blk_aio_detach,
+                                    vblk_exp);
+    blk_set_dev_ops(exp->blk, NULL, NULL);
+    ret = vduse_dev_destroy(vblk_exp->dev);
+    if (ret != -EBUSY) {
+        unlink(vblk_exp->recon_file);
+    }
+    g_free(vblk_exp->recon_file);
+    g_free(vblk_exp->handler.serial);
+}
+
+static void vduse_blk_exp_request_shutdown(BlockExport *exp)
+{
+    VduseBlkExport *vblk_exp = container_of(exp, VduseBlkExport, export);
+
+    aio_context_acquire(vblk_exp->export.ctx);
+    vduse_blk_detach_ctx(vblk_exp);
+    aio_context_acquire(vblk_exp->export.ctx);
+}
+
+const BlockExportDriver blk_exp_vduse_blk = {
+    .type               = BLOCK_EXPORT_TYPE_VDUSE_BLK,
+    .instance_size      = sizeof(VduseBlkExport),
+    .create             = vduse_blk_exp_create,
+    .delete             = vduse_blk_exp_delete,
+    .request_shutdown   = vduse_blk_exp_request_shutdown,
+};
diff --git a/block/export/vduse-blk.h b/block/export/vduse-blk.h
new file mode 100644
index 0000000000..c4eeb1b70e
--- /dev/null
+++ b/block/export/vduse-blk.h
@@ -0,0 +1,20 @@
+/*
+ * Export QEMU block device via VDUSE
+ *
+ * Copyright (C) 2022 Bytedance Inc. and/or its affiliates. All rights reserved.
+ *
+ * Author:
+ *   Xie Yongji <xieyongji@bytedance.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or
+ * later.  See the COPYING file in the top-level directory.
+ */
+
+#ifndef VDUSE_BLK_H
+#define VDUSE_BLK_H
+
+#include "block/export.h"
+
+extern const BlockExportDriver blk_exp_vduse_blk;
+
+#endif /* VDUSE_BLK_H */
diff --git a/block/export/vhost-user-blk-server.c b/block/export/vhost-user-blk-server.c
index a129204c44..3409d9e02e 100644
--- a/block/export/vhost-user-blk-server.c
+++ b/block/export/vhost-user-blk-server.c
@@ -17,31 +17,15 @@
 #include "vhost-user-blk-server.h"
 #include "qapi/error.h"
 #include "qom/object_interfaces.h"
-#include "sysemu/block-backend.h"
 #include "util/block-helpers.h"
-
-/*
- * Sector units are 512 bytes regardless of the
- * virtio_blk_config->blk_size value.
- */
-#define VIRTIO_BLK_SECTOR_BITS 9
-#define VIRTIO_BLK_SECTOR_SIZE (1ull << VIRTIO_BLK_SECTOR_BITS)
+#include "virtio-blk-handler.h"
 
 enum {
     VHOST_USER_BLK_NUM_QUEUES_DEFAULT = 1,
-    VHOST_USER_BLK_MAX_DISCARD_SECTORS = 32768,
-    VHOST_USER_BLK_MAX_WRITE_ZEROES_SECTORS = 32768,
-};
-struct virtio_blk_inhdr {
-    unsigned char status;
 };
 
 typedef struct VuBlkReq {
     VuVirtqElement elem;
-    int64_t sector_num;
-    size_t size;
-    struct virtio_blk_inhdr *in;
-    struct virtio_blk_outhdr out;
     VuServer *server;
     struct VuVirtq *vq;
 } VuBlkReq;
@@ -50,248 +34,44 @@ typedef struct VuBlkReq {
 typedef struct {
     BlockExport export;
     VuServer vu_server;
-    uint32_t blk_size;
+    VirtioBlkHandler handler;
     QIOChannelSocket *sioc;
     struct virtio_blk_config blkcfg;
-    bool writable;
 } VuBlkExport;
 
-static void vu_blk_req_complete(VuBlkReq *req)
+static void vu_blk_req_complete(VuBlkReq *req, size_t in_len)
 {
     VuDev *vu_dev = &req->server->vu_dev;
 
-    /* IO size with 1 extra status byte */
-    vu_queue_push(vu_dev, req->vq, &req->elem, req->size + 1);
+    vu_queue_push(vu_dev, req->vq, &req->elem, in_len);
     vu_queue_notify(vu_dev, req->vq);
 
     free(req);
 }
 
-static bool vu_blk_sect_range_ok(VuBlkExport *vexp, uint64_t sector,
-                                 size_t size)
-{
-    uint64_t nb_sectors;
-    uint64_t total_sectors;
-
-    if (size % VIRTIO_BLK_SECTOR_SIZE) {
-        return false;
-    }
-
-    nb_sectors = size >> VIRTIO_BLK_SECTOR_BITS;
-
-    QEMU_BUILD_BUG_ON(BDRV_SECTOR_SIZE != VIRTIO_BLK_SECTOR_SIZE);
-    if (nb_sectors > BDRV_REQUEST_MAX_SECTORS) {
-        return false;
-    }
-    if ((sector << VIRTIO_BLK_SECTOR_BITS) % vexp->blk_size) {
-        return false;
-    }
-    blk_get_geometry(vexp->export.blk, &total_sectors);
-    if (sector > total_sectors || nb_sectors > total_sectors - sector) {
-        return false;
-    }
-    return true;
-}
-
-static int coroutine_fn
-vu_blk_discard_write_zeroes(VuBlkExport *vexp, struct iovec *iov,
-                            uint32_t iovcnt, uint32_t type)
-{
-    BlockBackend *blk = vexp->export.blk;
-    struct virtio_blk_discard_write_zeroes desc;
-    ssize_t size;
-    uint64_t sector;
-    uint32_t num_sectors;
-    uint32_t max_sectors;
-    uint32_t flags;
-    int bytes;
-
-    /* Only one desc is currently supported */
-    if (unlikely(iov_size(iov, iovcnt) > sizeof(desc))) {
-        return VIRTIO_BLK_S_UNSUPP;
-    }
-
-    size = iov_to_buf(iov, iovcnt, 0, &desc, sizeof(desc));
-    if (unlikely(size != sizeof(desc))) {
-        error_report("Invalid size %zd, expected %zu", size, sizeof(desc));
-        return VIRTIO_BLK_S_IOERR;
-    }
-
-    sector = le64_to_cpu(desc.sector);
-    num_sectors = le32_to_cpu(desc.num_sectors);
-    flags = le32_to_cpu(desc.flags);
-    max_sectors = (type == VIRTIO_BLK_T_WRITE_ZEROES) ?
-                  VHOST_USER_BLK_MAX_WRITE_ZEROES_SECTORS :
-                  VHOST_USER_BLK_MAX_DISCARD_SECTORS;
-
-    /* This check ensures that 'bytes' fits in an int */
-    if (unlikely(num_sectors > max_sectors)) {
-        return VIRTIO_BLK_S_IOERR;
-    }
-
-    bytes = num_sectors << VIRTIO_BLK_SECTOR_BITS;
-
-    if (unlikely(!vu_blk_sect_range_ok(vexp, sector, bytes))) {
-        return VIRTIO_BLK_S_IOERR;
-    }
-
-    /*
-     * The device MUST set the status byte to VIRTIO_BLK_S_UNSUPP for discard
-     * and write zeroes commands if any unknown flag is set.
-     */
-    if (unlikely(flags & ~VIRTIO_BLK_WRITE_ZEROES_FLAG_UNMAP)) {
-        return VIRTIO_BLK_S_UNSUPP;
-    }
-
-    if (type == VIRTIO_BLK_T_WRITE_ZEROES) {
-        int blk_flags = 0;
-
-        if (flags & VIRTIO_BLK_WRITE_ZEROES_FLAG_UNMAP) {
-            blk_flags |= BDRV_REQ_MAY_UNMAP;
-        }
-
-        if (blk_co_pwrite_zeroes(blk, sector << VIRTIO_BLK_SECTOR_BITS,
-                                 bytes, blk_flags) == 0) {
-            return VIRTIO_BLK_S_OK;
-        }
-    } else if (type == VIRTIO_BLK_T_DISCARD) {
-        /*
-         * The device MUST set the status byte to VIRTIO_BLK_S_UNSUPP for
-         * discard commands if the unmap flag is set.
-         */
-        if (unlikely(flags & VIRTIO_BLK_WRITE_ZEROES_FLAG_UNMAP)) {
-            return VIRTIO_BLK_S_UNSUPP;
-        }
-
-        if (blk_co_pdiscard(blk, sector << VIRTIO_BLK_SECTOR_BITS,
-                            bytes) == 0) {
-            return VIRTIO_BLK_S_OK;
-        }
-    }
-
-    return VIRTIO_BLK_S_IOERR;
-}
-
 /* Called with server refcount increased, must decrease before returning */
 static void coroutine_fn vu_blk_virtio_process_req(void *opaque)
 {
     VuBlkReq *req = opaque;
     VuServer *server = req->server;
     VuVirtqElement *elem = &req->elem;
-    uint32_t type;
-
     VuBlkExport *vexp = container_of(server, VuBlkExport, vu_server);
-    BlockBackend *blk = vexp->export.blk;
-
+    VirtioBlkHandler *handler = &vexp->handler;
     struct iovec *in_iov = elem->in_sg;
     struct iovec *out_iov = elem->out_sg;
     unsigned in_num = elem->in_num;
     unsigned out_num = elem->out_num;
-
-    /* refer to hw/block/virtio_blk.c */
-    if (elem->out_num < 1 || elem->in_num < 1) {
-        error_report("virtio-blk request missing headers");
-        goto err;
-    }
-
-    if (unlikely(iov_to_buf(out_iov, out_num, 0, &req->out,
-                            sizeof(req->out)) != sizeof(req->out))) {
-        error_report("virtio-blk request outhdr too short");
-        goto err;
-    }
-
-    iov_discard_front(&out_iov, &out_num, sizeof(req->out));
-
-    if (in_iov[in_num - 1].iov_len < sizeof(struct virtio_blk_inhdr)) {
-        error_report("virtio-blk request inhdr too short");
-        goto err;
-    }
-
-    /* We always touch the last byte, so just see how big in_iov is.  */
-    req->in = (void *)in_iov[in_num - 1].iov_base
-              + in_iov[in_num - 1].iov_len
-              - sizeof(struct virtio_blk_inhdr);
-    iov_discard_back(in_iov, &in_num, sizeof(struct virtio_blk_inhdr));
-
-    type = le32_to_cpu(req->out.type);
-    switch (type & ~VIRTIO_BLK_T_BARRIER) {
-    case VIRTIO_BLK_T_IN:
-    case VIRTIO_BLK_T_OUT: {
-        QEMUIOVector qiov;
-        int64_t offset;
-        ssize_t ret = 0;
-        bool is_write = type & VIRTIO_BLK_T_OUT;
-        req->sector_num = le64_to_cpu(req->out.sector);
-
-        if (is_write && !vexp->writable) {
-            req->in->status = VIRTIO_BLK_S_IOERR;
-            break;
-        }
-
-        if (is_write) {
-            qemu_iovec_init_external(&qiov, out_iov, out_num);
-        } else {
-            qemu_iovec_init_external(&qiov, in_iov, in_num);
-        }
-
-        if (unlikely(!vu_blk_sect_range_ok(vexp,
-                                           req->sector_num,
-                                           qiov.size))) {
-            req->in->status = VIRTIO_BLK_S_IOERR;
-            break;
-        }
-
-        offset = req->sector_num << VIRTIO_BLK_SECTOR_BITS;
-
-        if (is_write) {
-            ret = blk_co_pwritev(blk, offset, qiov.size, &qiov, 0);
-        } else {
-            ret = blk_co_preadv(blk, offset, qiov.size, &qiov, 0);
-        }
-        if (ret >= 0) {
-            req->in->status = VIRTIO_BLK_S_OK;
-        } else {
-            req->in->status = VIRTIO_BLK_S_IOERR;
-        }
-        break;
-    }
-    case VIRTIO_BLK_T_FLUSH:
-        if (blk_co_flush(blk) == 0) {
-            req->in->status = VIRTIO_BLK_S_OK;
-        } else {
-            req->in->status = VIRTIO_BLK_S_IOERR;
-        }
-        break;
-    case VIRTIO_BLK_T_GET_ID: {
-        size_t size = MIN(iov_size(&elem->in_sg[0], in_num),
-                          VIRTIO_BLK_ID_BYTES);
-        snprintf(elem->in_sg[0].iov_base, size, "%s", "vhost_user_blk");
-        req->in->status = VIRTIO_BLK_S_OK;
-        req->size = elem->in_sg[0].iov_len;
-        break;
+    int in_len;
+
+    in_len = virtio_blk_process_req(handler, in_iov, out_iov,
+                                    in_num, out_num);
+    if (in_len < 0) {
+        free(req);
+        vhost_user_server_unref(server);
+        return;
     }
-    case VIRTIO_BLK_T_DISCARD:
-    case VIRTIO_BLK_T_WRITE_ZEROES: {
-        if (!vexp->writable) {
-            req->in->status = VIRTIO_BLK_S_IOERR;
-            break;
-        }
-
-        req->in->status = vu_blk_discard_write_zeroes(vexp, out_iov, out_num,
-                                                      type);
-        break;
-    }
-    default:
-        req->in->status = VIRTIO_BLK_S_UNSUPP;
-        break;
-    }
-
-    vu_blk_req_complete(req);
-    vhost_user_server_unref(server);
-    return;
 
-err:
-    free(req);
+    vu_blk_req_complete(req, in_len);
     vhost_user_server_unref(server);
 }
 
@@ -348,7 +128,7 @@ static uint64_t vu_blk_get_features(VuDev *dev)
                1ull << VIRTIO_RING_F_EVENT_IDX |
                1ull << VHOST_USER_F_PROTOCOL_FEATURES;
 
-    if (!vexp->writable) {
+    if (!vexp->handler.writable) {
         features |= 1ull << VIRTIO_BLK_F_RO;
     }
 
@@ -455,12 +235,12 @@ vu_blk_initialize_config(BlockDriverState *bs,
     config->opt_io_size = cpu_to_le32(1);
     config->num_queues = cpu_to_le16(num_queues);
     config->max_discard_sectors =
-        cpu_to_le32(VHOST_USER_BLK_MAX_DISCARD_SECTORS);
+        cpu_to_le32(VIRTIO_BLK_MAX_DISCARD_SECTORS);
     config->max_discard_seg = cpu_to_le32(1);
     config->discard_sector_alignment =
         cpu_to_le32(blk_size >> VIRTIO_BLK_SECTOR_BITS);
     config->max_write_zeroes_sectors
-        = cpu_to_le32(VHOST_USER_BLK_MAX_WRITE_ZEROES_SECTORS);
+        = cpu_to_le32(VIRTIO_BLK_MAX_WRITE_ZEROES_SECTORS);
     config->max_write_zeroes_seg = cpu_to_le32(1);
 }
 
@@ -480,7 +260,6 @@ static int vu_blk_exp_create(BlockExport *exp, BlockExportOptions *opts,
     uint64_t logical_block_size;
     uint16_t num_queues = VHOST_USER_BLK_NUM_QUEUES_DEFAULT;
 
-    vexp->writable = opts->writable;
     vexp->blkcfg.wce = 0;
 
     if (vu_opts->has_logical_block_size) {
@@ -494,8 +273,6 @@ static int vu_blk_exp_create(BlockExport *exp, BlockExportOptions *opts,
         error_propagate(errp, local_err);
         return -EINVAL;
     }
-    vexp->blk_size = logical_block_size;
-    blk_set_guest_block_size(exp->blk, logical_block_size);
 
     if (vu_opts->has_num_queues) {
         num_queues = vu_opts->num_queues;
@@ -504,6 +281,10 @@ static int vu_blk_exp_create(BlockExport *exp, BlockExportOptions *opts,
         error_setg(errp, "num-queues must be greater than 0");
         return -EINVAL;
     }
+    vexp->handler.blk = exp->blk;
+    vexp->handler.serial = g_strdup("vhost_user_blk");
+    vexp->handler.logical_block_size = logical_block_size;
+    vexp->handler.writable = opts->writable;
 
     vu_blk_initialize_config(blk_bs(exp->blk), &vexp->blkcfg,
                              logical_block_size, num_queues);
@@ -515,6 +296,7 @@ static int vu_blk_exp_create(BlockExport *exp, BlockExportOptions *opts,
                                  num_queues, &vu_blk_iface, errp)) {
         blk_remove_aio_context_notifier(exp->blk, blk_aio_attached,
                                         blk_aio_detach, vexp);
+        g_free(vexp->handler.serial);
         return -EADDRNOTAVAIL;
     }
 
@@ -527,6 +309,7 @@ static void vu_blk_exp_delete(BlockExport *exp)
 
     blk_remove_aio_context_notifier(exp->blk, blk_aio_attached, blk_aio_detach,
                                     vexp);
+    g_free(vexp->handler.serial);
 }
 
 const BlockExportDriver blk_exp_vhost_user_blk = {
diff --git a/block/export/virtio-blk-handler.c b/block/export/virtio-blk-handler.c
new file mode 100644
index 0000000000..313666e8ab
--- /dev/null
+++ b/block/export/virtio-blk-handler.c
@@ -0,0 +1,240 @@
+/*
+ * Handler for virtio-blk I/O
+ *
+ * Copyright (c) 2020 Red Hat, Inc.
+ * Copyright (C) 2022 Bytedance Inc. and/or its affiliates. All rights reserved.
+ *
+ * Author:
+ *   Coiby Xu <coiby.xu@gmail.com>
+ *   Xie Yongji <xieyongji@bytedance.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or
+ * later.  See the COPYING file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+#include "qemu/error-report.h"
+#include "virtio-blk-handler.h"
+
+#include "standard-headers/linux/virtio_blk.h"
+
+struct virtio_blk_inhdr {
+    unsigned char status;
+};
+
+static bool virtio_blk_sect_range_ok(BlockBackend *blk, uint32_t block_size,
+                                     uint64_t sector, size_t size)
+{
+    uint64_t nb_sectors;
+    uint64_t total_sectors;
+
+    if (size % VIRTIO_BLK_SECTOR_SIZE) {
+        return false;
+    }
+
+    nb_sectors = size >> VIRTIO_BLK_SECTOR_BITS;
+
+    QEMU_BUILD_BUG_ON(BDRV_SECTOR_SIZE != VIRTIO_BLK_SECTOR_SIZE);
+    if (nb_sectors > BDRV_REQUEST_MAX_SECTORS) {
+        return false;
+    }
+    if ((sector << VIRTIO_BLK_SECTOR_BITS) % block_size) {
+        return false;
+    }
+    blk_get_geometry(blk, &total_sectors);
+    if (sector > total_sectors || nb_sectors > total_sectors - sector) {
+        return false;
+    }
+    return true;
+}
+
+static int coroutine_fn
+virtio_blk_discard_write_zeroes(VirtioBlkHandler *handler, struct iovec *iov,
+                                uint32_t iovcnt, uint32_t type)
+{
+    BlockBackend *blk = handler->blk;
+    struct virtio_blk_discard_write_zeroes desc;
+    ssize_t size;
+    uint64_t sector;
+    uint32_t num_sectors;
+    uint32_t max_sectors;
+    uint32_t flags;
+    int bytes;
+
+    /* Only one desc is currently supported */
+    if (unlikely(iov_size(iov, iovcnt) > sizeof(desc))) {
+        return VIRTIO_BLK_S_UNSUPP;
+    }
+
+    size = iov_to_buf(iov, iovcnt, 0, &desc, sizeof(desc));
+    if (unlikely(size != sizeof(desc))) {
+        error_report("Invalid size %zd, expected %zu", size, sizeof(desc));
+        return VIRTIO_BLK_S_IOERR;
+    }
+
+    sector = le64_to_cpu(desc.sector);
+    num_sectors = le32_to_cpu(desc.num_sectors);
+    flags = le32_to_cpu(desc.flags);
+    max_sectors = (type == VIRTIO_BLK_T_WRITE_ZEROES) ?
+                  VIRTIO_BLK_MAX_WRITE_ZEROES_SECTORS :
+                  VIRTIO_BLK_MAX_DISCARD_SECTORS;
+
+    /* This check ensures that 'bytes' fits in an int */
+    if (unlikely(num_sectors > max_sectors)) {
+        return VIRTIO_BLK_S_IOERR;
+    }
+
+    bytes = num_sectors << VIRTIO_BLK_SECTOR_BITS;
+
+    if (unlikely(!virtio_blk_sect_range_ok(blk, handler->logical_block_size,
+                                           sector, bytes))) {
+        return VIRTIO_BLK_S_IOERR;
+    }
+
+    /*
+     * The device MUST set the status byte to VIRTIO_BLK_S_UNSUPP for discard
+     * and write zeroes commands if any unknown flag is set.
+     */
+    if (unlikely(flags & ~VIRTIO_BLK_WRITE_ZEROES_FLAG_UNMAP)) {
+        return VIRTIO_BLK_S_UNSUPP;
+    }
+
+    if (type == VIRTIO_BLK_T_WRITE_ZEROES) {
+        int blk_flags = 0;
+
+        if (flags & VIRTIO_BLK_WRITE_ZEROES_FLAG_UNMAP) {
+            blk_flags |= BDRV_REQ_MAY_UNMAP;
+        }
+
+        if (blk_co_pwrite_zeroes(blk, sector << VIRTIO_BLK_SECTOR_BITS,
+                                 bytes, blk_flags) == 0) {
+            return VIRTIO_BLK_S_OK;
+        }
+    } else if (type == VIRTIO_BLK_T_DISCARD) {
+        /*
+         * The device MUST set the status byte to VIRTIO_BLK_S_UNSUPP for
+         * discard commands if the unmap flag is set.
+         */
+        if (unlikely(flags & VIRTIO_BLK_WRITE_ZEROES_FLAG_UNMAP)) {
+            return VIRTIO_BLK_S_UNSUPP;
+        }
+
+        if (blk_co_pdiscard(blk, sector << VIRTIO_BLK_SECTOR_BITS,
+                            bytes) == 0) {
+            return VIRTIO_BLK_S_OK;
+        }
+    }
+
+    return VIRTIO_BLK_S_IOERR;
+}
+
+int coroutine_fn virtio_blk_process_req(VirtioBlkHandler *handler,
+                                        struct iovec *in_iov,
+                                        struct iovec *out_iov,
+                                        unsigned int in_num,
+                                        unsigned int out_num)
+{
+    BlockBackend *blk = handler->blk;
+    struct virtio_blk_inhdr *in;
+    struct virtio_blk_outhdr out;
+    uint32_t type;
+    int in_len;
+
+    if (out_num < 1 || in_num < 1) {
+        error_report("virtio-blk request missing headers");
+        return -EINVAL;
+    }
+
+    if (unlikely(iov_to_buf(out_iov, out_num, 0, &out,
+                            sizeof(out)) != sizeof(out))) {
+        error_report("virtio-blk request outhdr too short");
+        return -EINVAL;
+    }
+
+    iov_discard_front(&out_iov, &out_num, sizeof(out));
+
+    if (in_iov[in_num - 1].iov_len < sizeof(struct virtio_blk_inhdr)) {
+        error_report("virtio-blk request inhdr too short");
+        return -EINVAL;
+    }
+
+    /* We always touch the last byte, so just see how big in_iov is. */
+    in_len = iov_size(in_iov, in_num);
+    in = (void *)in_iov[in_num - 1].iov_base
+                 + in_iov[in_num - 1].iov_len
+                 - sizeof(struct virtio_blk_inhdr);
+    iov_discard_back(in_iov, &in_num, sizeof(struct virtio_blk_inhdr));
+
+    type = le32_to_cpu(out.type);
+    switch (type & ~VIRTIO_BLK_T_BARRIER) {
+    case VIRTIO_BLK_T_IN:
+    case VIRTIO_BLK_T_OUT: {
+        QEMUIOVector qiov;
+        int64_t offset;
+        ssize_t ret = 0;
+        bool is_write = type & VIRTIO_BLK_T_OUT;
+        int64_t sector_num = le64_to_cpu(out.sector);
+
+        if (is_write && !handler->writable) {
+            in->status = VIRTIO_BLK_S_IOERR;
+            break;
+        }
+
+        if (is_write) {
+            qemu_iovec_init_external(&qiov, out_iov, out_num);
+        } else {
+            qemu_iovec_init_external(&qiov, in_iov, in_num);
+        }
+
+        if (unlikely(!virtio_blk_sect_range_ok(blk,
+                                               handler->logical_block_size,
+                                               sector_num, qiov.size))) {
+            in->status = VIRTIO_BLK_S_IOERR;
+            break;
+        }
+
+        offset = sector_num << VIRTIO_BLK_SECTOR_BITS;
+
+        if (is_write) {
+            ret = blk_co_pwritev(blk, offset, qiov.size, &qiov, 0);
+        } else {
+            ret = blk_co_preadv(blk, offset, qiov.size, &qiov, 0);
+        }
+        if (ret >= 0) {
+            in->status = VIRTIO_BLK_S_OK;
+        } else {
+            in->status = VIRTIO_BLK_S_IOERR;
+        }
+        break;
+    }
+    case VIRTIO_BLK_T_FLUSH:
+        if (blk_co_flush(blk) == 0) {
+            in->status = VIRTIO_BLK_S_OK;
+        } else {
+            in->status = VIRTIO_BLK_S_IOERR;
+        }
+        break;
+    case VIRTIO_BLK_T_GET_ID: {
+        size_t size = MIN(strlen(handler->serial) + 1,
+                          MIN(iov_size(in_iov, in_num),
+                              VIRTIO_BLK_ID_BYTES));
+        iov_from_buf(in_iov, in_num, 0, handler->serial, size);
+        in->status = VIRTIO_BLK_S_OK;
+        break;
+    }
+    case VIRTIO_BLK_T_DISCARD:
+    case VIRTIO_BLK_T_WRITE_ZEROES:
+        if (!handler->writable) {
+            in->status = VIRTIO_BLK_S_IOERR;
+            break;
+        }
+        in->status = virtio_blk_discard_write_zeroes(handler, out_iov,
+                                                     out_num, type);
+        break;
+    default:
+        in->status = VIRTIO_BLK_S_UNSUPP;
+        break;
+    }
+
+    return in_len;
+}
diff --git a/block/export/virtio-blk-handler.h b/block/export/virtio-blk-handler.h
new file mode 100644
index 0000000000..150d44cff2
--- /dev/null
+++ b/block/export/virtio-blk-handler.h
@@ -0,0 +1,37 @@
+/*
+ * Handler for virtio-blk I/O
+ *
+ * Copyright (C) 2022 Bytedance Inc. and/or its affiliates. All rights reserved.
+ *
+ * Author:
+ *   Xie Yongji <xieyongji@bytedance.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or
+ * later.  See the COPYING file in the top-level directory.
+ */
+
+#ifndef VIRTIO_BLK_HANDLER_H
+#define VIRTIO_BLK_HANDLER_H
+
+#include "sysemu/block-backend.h"
+
+#define VIRTIO_BLK_SECTOR_BITS 9
+#define VIRTIO_BLK_SECTOR_SIZE (1ULL << VIRTIO_BLK_SECTOR_BITS)
+
+#define VIRTIO_BLK_MAX_DISCARD_SECTORS 32768
+#define VIRTIO_BLK_MAX_WRITE_ZEROES_SECTORS 32768
+
+typedef struct {
+    BlockBackend *blk;
+    char *serial;
+    uint32_t logical_block_size;
+    bool writable;
+} VirtioBlkHandler;
+
+int coroutine_fn virtio_blk_process_req(VirtioBlkHandler *handler,
+                                        struct iovec *in_iov,
+                                        struct iovec *out_iov,
+                                        unsigned int in_num,
+                                        unsigned int out_num);
+
+#endif /* VIRTIO_BLK_HANDLER_H */
diff --git a/block/gluster.c b/block/gluster.c
index 398976bc66..b60213ab80 100644
--- a/block/gluster.c
+++ b/block/gluster.c
@@ -891,7 +891,7 @@ out:
 static void qemu_gluster_refresh_limits(BlockDriverState *bs, Error **errp)
 {
     bs->bl.max_transfer = GLUSTER_MAX_TRANSFER;
-    bs->bl.max_pdiscard = SIZE_MAX;
+    bs->bl.max_pdiscard = MIN(SIZE_MAX, INT64_MAX);
 }
 
 static int qemu_gluster_reopen_prepare(BDRVReopenState *state,
diff --git a/block/io.c b/block/io.c
index 789e6373d5..1e9bf09a49 100644
--- a/block/io.c
+++ b/block/io.c
@@ -588,21 +588,6 @@ void bdrv_unapply_subtree_drain(BdrvChild *child, BlockDriverState *old_parent)
     BDRV_POLL_WHILE(child->bs, qatomic_read(&drained_end_counter) > 0);
 }
 
-/*
- * Wait for pending requests to complete on a single BlockDriverState subtree,
- * and suspend block driver's internal I/O until next request arrives.
- *
- * Note that unlike bdrv_drain_all(), the caller must hold the BlockDriverState
- * AioContext.
- */
-void coroutine_fn bdrv_co_drain(BlockDriverState *bs)
-{
-    IO_OR_GS_CODE();
-    assert(qemu_in_coroutine());
-    bdrv_drained_begin(bs);
-    bdrv_drained_end(bs);
-}
-
 void bdrv_drain(BlockDriverState *bs)
 {
     IO_OR_GS_CODE();
diff --git a/block/monitor/bitmap-qmp-cmds.c b/block/monitor/bitmap-qmp-cmds.c
index 2b677c4a2f..282363606f 100644
--- a/block/monitor/bitmap-qmp-cmds.c
+++ b/block/monitor/bitmap-qmp-cmds.c
@@ -261,8 +261,9 @@ BdrvDirtyBitmap *block_dirty_bitmap_merge(const char *node, const char *target,
                                           HBitmap **backup, Error **errp)
 {
     BlockDriverState *bs;
-    BdrvDirtyBitmap *dst, *src, *anon;
+    BdrvDirtyBitmap *dst, *src;
     BlockDirtyBitmapOrStrList *lst;
+    HBitmap *local_backup = NULL;
 
     GLOBAL_STATE_CODE();
 
@@ -271,12 +272,6 @@ BdrvDirtyBitmap *block_dirty_bitmap_merge(const char *node, const char *target,
         return NULL;
     }
 
-    anon = bdrv_create_dirty_bitmap(bs, bdrv_dirty_bitmap_granularity(dst),
-                                    NULL, errp);
-    if (!anon) {
-        return NULL;
-    }
-
     for (lst = bms; lst; lst = lst->next) {
         switch (lst->value->type) {
             const char *name, *node;
@@ -285,8 +280,7 @@ BdrvDirtyBitmap *block_dirty_bitmap_merge(const char *node, const char *target,
             src = bdrv_find_dirty_bitmap(bs, name);
             if (!src) {
                 error_setg(errp, "Dirty bitmap '%s' not found", name);
-                dst = NULL;
-                goto out;
+                goto fail;
             }
             break;
         case QTYPE_QDICT:
@@ -294,26 +288,36 @@ BdrvDirtyBitmap *block_dirty_bitmap_merge(const char *node, const char *target,
             name = lst->value->u.external.name;
             src = block_dirty_bitmap_lookup(node, name, NULL, errp);
             if (!src) {
-                dst = NULL;
-                goto out;
+                goto fail;
             }
             break;
         default:
             abort();
         }
 
-        if (!bdrv_merge_dirty_bitmap(anon, src, NULL, errp)) {
-            dst = NULL;
-            goto out;
+        /* We do backup only for first merge operation */
+        if (!bdrv_merge_dirty_bitmap(dst, src,
+                                     local_backup ? NULL : &local_backup,
+                                     errp))
+        {
+            goto fail;
         }
     }
 
-    /* Merge into dst; dst is unchanged on failure. */
-    bdrv_merge_dirty_bitmap(dst, anon, backup, errp);
+    if (backup) {
+        *backup = local_backup;
+    } else {
+        hbitmap_free(local_backup);
+    }
 
- out:
-    bdrv_release_dirty_bitmap(anon);
     return dst;
+
+fail:
+    if (local_backup) {
+        bdrv_restore_dirty_bitmap(dst, local_backup);
+    }
+
+    return NULL;
 }
 
 void qmp_block_dirty_bitmap_merge(const char *node, const char *target,
diff --git a/block/nbd.c b/block/nbd.c
index 6085ab1d2c..7f5f50ec46 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -521,12 +521,8 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
     if (qiov) {
         qio_channel_set_cork(s->ioc, true);
         rc = nbd_send_request(s->ioc, request);
-        if (rc >= 0) {
-            if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
-                                       NULL) < 0) {
-                rc = -EIO;
-            }
-        } else if (rc >= 0) {
+        if (rc >= 0 && qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
+                                              NULL) < 0) {
             rc = -EIO;
         }
         qio_channel_set_cork(s->ioc, false);
diff --git a/block/rbd.c b/block/rbd.c
index 6caf35cbba..f826410f40 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -831,6 +831,26 @@ static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
         error_setg_errno(errp, -r, "error opening pool %s", opts->pool);
         goto failed_shutdown;
     }
+
+#ifdef HAVE_RBD_NAMESPACE_EXISTS
+    if (opts->has_q_namespace && strlen(opts->q_namespace) > 0) {
+        bool exists;
+
+        r = rbd_namespace_exists(*io_ctx, opts->q_namespace, &exists);
+        if (r < 0) {
+            error_setg_errno(errp, -r, "error checking namespace");
+            goto failed_ioctx_destroy;
+        }
+
+        if (!exists) {
+            error_setg(errp, "namespace '%s' does not exist",
+                       opts->q_namespace);
+            r = -ENOENT;
+            goto failed_ioctx_destroy;
+        }
+    }
+#endif
+
     /*
      * Set the namespace after opening the io context on the pool,
      * if nspace == NULL or if nspace == "", it is just as we did nothing
@@ -840,6 +860,10 @@ static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
     r = 0;
     goto out;
 
+#ifdef HAVE_RBD_NAMESPACE_EXISTS
+failed_ioctx_destroy:
+    rados_ioctx_destroy(*io_ctx);
+#endif
 failed_shutdown:
     rados_shutdown(*cluster);
 out:
diff --git a/docs/tools/qemu-storage-daemon.rst b/docs/tools/qemu-storage-daemon.rst
index 8b97592663..ea00149a63 100644
--- a/docs/tools/qemu-storage-daemon.rst
+++ b/docs/tools/qemu-storage-daemon.rst
@@ -77,6 +77,7 @@ Standard options:
   --export [type=]vhost-user-blk,id=<id>,node-name=<node-name>,addr.type=unix,addr.path=<socket-path>[,writable=on|off][,logical-block-size=<block-size>][,num-queues=<num-queues>]
   --export [type=]vhost-user-blk,id=<id>,node-name=<node-name>,addr.type=fd,addr.str=<fd>[,writable=on|off][,logical-block-size=<block-size>][,num-queues=<num-queues>]
   --export [type=]fuse,id=<id>,node-name=<node-name>,mountpoint=<file>[,growable=on|off][,writable=on|off][,allow-other=on|off|auto]
+  --export [type=]vduse-blk,id=<id>,node-name=<node-name>,name=<vduse-name>[,writable=on|off][,num-queues=<num-queues>][,queue-size=<queue-size>][,logical-block-size=<block-size>][,serial=<serial-number>]
 
   is a block export definition. ``node-name`` is the block node that should be
   exported. ``writable`` determines whether or not the export allows write
@@ -110,6 +111,27 @@ Standard options:
   ``allow-other`` to auto (the default) will try enabling this option, and on
   error fall back to disabling it.
 
+  The ``vduse-blk`` export type takes a ``name`` (must be unique across the host)
+  to create the VDUSE device.
+  ``num-queues`` sets the number of virtqueues (the default is 1).
+  ``queue-size`` sets the virtqueue descriptor table size (the default is 256).
+
+  The instantiated VDUSE device must then be added to the vDPA bus using the
+  vdpa(8) command from the iproute2 project::
+
+  # vdpa dev add name <id> mgmtdev vduse
+
+  The device can be removed from the vDPA bus later as follows::
+
+  # vdpa dev del <id>
+
+  For more information about attaching vDPA devices to the host with
+  virtio_vdpa.ko or attaching them to guests with vhost_vdpa.ko, see
+  https://vdpa-dev.gitlab.io/.
+
+  For more information about VDUSE, see
+  https://docs.kernel.org/userspace-api/vduse.html.
+
 .. option:: --monitor MONITORDEF
 
   is a QMP monitor definition. See the :manpage:`qemu(1)` manual page for
diff --git a/hw/block/virtio-blk.c b/hw/block/virtio-blk.c
index cd804795c6..e9ba752f6b 100644
--- a/hw/block/virtio-blk.c
+++ b/hw/block/virtio-blk.c
@@ -1228,7 +1228,6 @@ static void virtio_blk_device_realize(DeviceState *dev, Error **errp)
 
     s->change = qemu_add_vm_change_state_handler(virtio_blk_dma_restart_cb, s);
     blk_set_dev_ops(s->blk, &virtio_block_ops, s);
-    blk_set_guest_block_size(s->blk, s->conf.conf.logical_block_size);
 
     blk_iostatus_enable(s->blk);
 
diff --git a/hw/block/xen-block.c b/hw/block/xen-block.c
index 674953f1ad..345b284d70 100644
--- a/hw/block/xen-block.c
+++ b/hw/block/xen-block.c
@@ -243,7 +243,6 @@ static void xen_block_realize(XenDevice *xendev, Error **errp)
     }
 
     blk_set_dev_ops(blk, &xen_block_dev_ops, blockdev);
-    blk_set_guest_block_size(blk, conf->logical_block_size);
 
     if (conf->discard_granularity == -1) {
         conf->discard_granularity = conf->physical_block_size;
diff --git a/hw/ide/core.c b/hw/ide/core.c
index c2caa54285..7cbc0a54a7 100644
--- a/hw/ide/core.c
+++ b/hw/ide/core.c
@@ -2548,7 +2548,6 @@ int ide_init_drive(IDEState *s, BlockBackend *blk, IDEDriveKind kind,
     s->smart_selftest_count = 0;
     if (kind == IDE_CD) {
         blk_set_dev_ops(blk, &ide_cd_block_ops, s);
-        blk_set_guest_block_size(blk, 2048);
     } else {
         if (!blk_is_inserted(s->blk)) {
             error_setg(errp, "Device needs media, but drive is empty");
diff --git a/hw/scsi/scsi-disk.c b/hw/scsi/scsi-disk.c
index 072686ed58..91acb5c0ce 100644
--- a/hw/scsi/scsi-disk.c
+++ b/hw/scsi/scsi-disk.c
@@ -2419,7 +2419,6 @@ static void scsi_realize(SCSIDevice *dev, Error **errp)
     } else {
         blk_set_dev_ops(s->qdev.conf.blk, &scsi_disk_block_ops, s);
     }
-    blk_set_guest_block_size(s->qdev.conf.blk, s->qdev.blocksize);
 
     blk_iostatus_enable(s->qdev.conf.blk);
 
diff --git a/hw/scsi/scsi-generic.c b/hw/scsi/scsi-generic.c
index 0ab00ef85c..ada24d7486 100644
--- a/hw/scsi/scsi-generic.c
+++ b/hw/scsi/scsi-generic.c
@@ -321,7 +321,6 @@ static void scsi_read_complete(void * opaque, int ret)
         s->blocksize = ldl_be_p(&r->buf[8]);
         s->max_lba = ldq_be_p(&r->buf[0]);
     }
-    blk_set_guest_block_size(s->conf.blk, s->blocksize);
 
     /*
      * Patch MODE SENSE device specific parameters if the BDS is opened
diff --git a/include/block/aio-wait.h b/include/block/aio-wait.h
index b39eefb38d..54840f8622 100644
--- a/include/block/aio-wait.h
+++ b/include/block/aio-wait.h
@@ -81,6 +81,8 @@ extern AioWait global_aio_wait;
     AioContext *ctx_ = (ctx);                                      \
     /* Increment wait_->num_waiters before evaluating cond. */     \
     qatomic_inc(&wait_->num_waiters);                              \
+    /* Paired with smp_mb in aio_wait_kick(). */                   \
+    smp_mb();                                                      \
     if (ctx_ && in_aio_context_home_thread(ctx_)) {                \
         while ((cond)) {                                           \
             aio_poll(ctx_, true);                                  \
diff --git a/include/block/block-io.h b/include/block/block-io.h
index 62c84f0519..053a27141a 100644
--- a/include/block/block-io.h
+++ b/include/block/block-io.h
@@ -270,7 +270,6 @@ void bdrv_drained_end_no_poll(BlockDriverState *bs, int *drained_end_counter);
                    cond); })
 
 void bdrv_drain(BlockDriverState *bs);
-void coroutine_fn bdrv_co_drain(BlockDriverState *bs);
 
 int generated_co_wrapper
 bdrv_truncate(BdrvChild *child, int64_t offset, bool exact,
diff --git a/include/block/block_int-io.h b/include/block/block_int-io.h
index bb454200e5..ded29e7494 100644
--- a/include/block/block_int-io.h
+++ b/include/block/block_int-io.h
@@ -102,7 +102,7 @@ bool blk_dev_is_tray_open(BlockBackend *blk);
 void bdrv_set_dirty(BlockDriverState *bs, int64_t offset, int64_t bytes);
 
 void bdrv_clear_dirty_bitmap(BdrvDirtyBitmap *bitmap, HBitmap **out);
-bool bdrv_dirty_bitmap_merge_internal(BdrvDirtyBitmap *dest,
+void bdrv_dirty_bitmap_merge_internal(BdrvDirtyBitmap *dest,
                                       const BdrvDirtyBitmap *src,
                                       HBitmap **backup, bool lock);
 
diff --git a/include/qemu/hbitmap.h b/include/qemu/hbitmap.h
index 5bd986aa44..af4e4ab746 100644
--- a/include/qemu/hbitmap.h
+++ b/include/qemu/hbitmap.h
@@ -76,20 +76,9 @@ void hbitmap_truncate(HBitmap *hb, uint64_t size);
  *
  * Store result of merging @a and @b into @result.
  * @result is allowed to be equal to @a or @b.
- *
- * Return true if the merge was successful,
- *        false if it was not attempted.
- */
-bool hbitmap_merge(const HBitmap *a, const HBitmap *b, HBitmap *result);
-
-/**
- * hbitmap_can_merge:
- *
- * hbitmap_can_merge(a, b) && hbitmap_can_merge(a, result) is sufficient and
- * necessary for hbitmap_merge will not fail.
- *
+ * All bitmaps must have same size.
  */
-bool hbitmap_can_merge(const HBitmap *a, const HBitmap *b);
+void hbitmap_merge(const HBitmap *a, const HBitmap *b, HBitmap *result);
 
 /**
  * hbitmap_empty:
diff --git a/include/sysemu/block-backend-io.h b/include/sysemu/block-backend-io.h
index 6517c39295..ccef514023 100644
--- a/include/sysemu/block-backend-io.h
+++ b/include/sysemu/block-backend-io.h
@@ -72,7 +72,6 @@ void blk_error_action(BlockBackend *blk, BlockErrorAction action,
 void blk_iostatus_set_err(BlockBackend *blk, int error);
 int blk_get_max_iov(BlockBackend *blk);
 int blk_get_max_hw_iov(BlockBackend *blk);
-void blk_set_guest_block_size(BlockBackend *blk, int align);
 
 void blk_io_plug(BlockBackend *blk);
 void blk_io_unplug(BlockBackend *blk);
diff --git a/linux-headers/linux/vduse.h b/linux-headers/linux/vduse.h
new file mode 100644
index 0000000000..d47b004ce6
--- /dev/null
+++ b/linux-headers/linux/vduse.h
@@ -0,0 +1,306 @@
+/* SPDX-License-Identifier: GPL-2.0 WITH Linux-syscall-note */
+#ifndef _VDUSE_H_
+#define _VDUSE_H_
+
+#include <linux/types.h>
+
+#define VDUSE_BASE	0x81
+
+/* The ioctls for control device (/dev/vduse/control) */
+
+#define VDUSE_API_VERSION	0
+
+/*
+ * Get the version of VDUSE API that kernel supported (VDUSE_API_VERSION).
+ * This is used for future extension.
+ */
+#define VDUSE_GET_API_VERSION	_IOR(VDUSE_BASE, 0x00, __u64)
+
+/* Set the version of VDUSE API that userspace supported. */
+#define VDUSE_SET_API_VERSION	_IOW(VDUSE_BASE, 0x01, __u64)
+
+/**
+ * struct vduse_dev_config - basic configuration of a VDUSE device
+ * @name: VDUSE device name, needs to be NUL terminated
+ * @vendor_id: virtio vendor id
+ * @device_id: virtio device id
+ * @features: virtio features
+ * @vq_num: the number of virtqueues
+ * @vq_align: the allocation alignment of virtqueue's metadata
+ * @reserved: for future use, needs to be initialized to zero
+ * @config_size: the size of the configuration space
+ * @config: the buffer of the configuration space
+ *
+ * Structure used by VDUSE_CREATE_DEV ioctl to create VDUSE device.
+ */
+struct vduse_dev_config {
+#define VDUSE_NAME_MAX	256
+	char name[VDUSE_NAME_MAX];
+	__u32 vendor_id;
+	__u32 device_id;
+	__u64 features;
+	__u32 vq_num;
+	__u32 vq_align;
+	__u32 reserved[13];
+	__u32 config_size;
+	__u8 config[];
+};
+
+/* Create a VDUSE device which is represented by a char device (/dev/vduse/$NAME) */
+#define VDUSE_CREATE_DEV	_IOW(VDUSE_BASE, 0x02, struct vduse_dev_config)
+
+/*
+ * Destroy a VDUSE device. Make sure there are no more references
+ * to the char device (/dev/vduse/$NAME).
+ */
+#define VDUSE_DESTROY_DEV	_IOW(VDUSE_BASE, 0x03, char[VDUSE_NAME_MAX])
+
+/* The ioctls for VDUSE device (/dev/vduse/$NAME) */
+
+/**
+ * struct vduse_iotlb_entry - entry of IOTLB to describe one IOVA region [start, last]
+ * @offset: the mmap offset on returned file descriptor
+ * @start: start of the IOVA region
+ * @last: last of the IOVA region
+ * @perm: access permission of the IOVA region
+ *
+ * Structure used by VDUSE_IOTLB_GET_FD ioctl to find an overlapped IOVA region.
+ */
+struct vduse_iotlb_entry {
+	__u64 offset;
+	__u64 start;
+	__u64 last;
+#define VDUSE_ACCESS_RO 0x1
+#define VDUSE_ACCESS_WO 0x2
+#define VDUSE_ACCESS_RW 0x3
+	__u8 perm;
+};
+
+/*
+ * Find the first IOVA region that overlaps with the range [start, last]
+ * and return the corresponding file descriptor. Return -EINVAL means the
+ * IOVA region doesn't exist. Caller should set start and last fields.
+ */
+#define VDUSE_IOTLB_GET_FD	_IOWR(VDUSE_BASE, 0x10, struct vduse_iotlb_entry)
+
+/*
+ * Get the negotiated virtio features. It's a subset of the features in
+ * struct vduse_dev_config which can be accepted by virtio driver. It's
+ * only valid after FEATURES_OK status bit is set.
+ */
+#define VDUSE_DEV_GET_FEATURES	_IOR(VDUSE_BASE, 0x11, __u64)
+
+/**
+ * struct vduse_config_data - data used to update configuration space
+ * @offset: the offset from the beginning of configuration space
+ * @length: the length to write to configuration space
+ * @buffer: the buffer used to write from
+ *
+ * Structure used by VDUSE_DEV_SET_CONFIG ioctl to update device
+ * configuration space.
+ */
+struct vduse_config_data {
+	__u32 offset;
+	__u32 length;
+	__u8 buffer[];
+};
+
+/* Set device configuration space */
+#define VDUSE_DEV_SET_CONFIG	_IOW(VDUSE_BASE, 0x12, struct vduse_config_data)
+
+/*
+ * Inject a config interrupt. It's usually used to notify virtio driver
+ * that device configuration space has changed.
+ */
+#define VDUSE_DEV_INJECT_CONFIG_IRQ	_IO(VDUSE_BASE, 0x13)
+
+/**
+ * struct vduse_vq_config - basic configuration of a virtqueue
+ * @index: virtqueue index
+ * @max_size: the max size of virtqueue
+ * @reserved: for future use, needs to be initialized to zero
+ *
+ * Structure used by VDUSE_VQ_SETUP ioctl to setup a virtqueue.
+ */
+struct vduse_vq_config {
+	__u32 index;
+	__u16 max_size;
+	__u16 reserved[13];
+};
+
+/*
+ * Setup the specified virtqueue. Make sure all virtqueues have been
+ * configured before the device is attached to vDPA bus.
+ */
+#define VDUSE_VQ_SETUP		_IOW(VDUSE_BASE, 0x14, struct vduse_vq_config)
+
+/**
+ * struct vduse_vq_state_split - split virtqueue state
+ * @avail_index: available index
+ */
+struct vduse_vq_state_split {
+	__u16 avail_index;
+};
+
+/**
+ * struct vduse_vq_state_packed - packed virtqueue state
+ * @last_avail_counter: last driver ring wrap counter observed by device
+ * @last_avail_idx: device available index
+ * @last_used_counter: device ring wrap counter
+ * @last_used_idx: used index
+ */
+struct vduse_vq_state_packed {
+	__u16 last_avail_counter;
+	__u16 last_avail_idx;
+	__u16 last_used_counter;
+	__u16 last_used_idx;
+};
+
+/**
+ * struct vduse_vq_info - information of a virtqueue
+ * @index: virtqueue index
+ * @num: the size of virtqueue
+ * @desc_addr: address of desc area
+ * @driver_addr: address of driver area
+ * @device_addr: address of device area
+ * @split: split virtqueue state
+ * @packed: packed virtqueue state
+ * @ready: ready status of virtqueue
+ *
+ * Structure used by VDUSE_VQ_GET_INFO ioctl to get virtqueue's information.
+ */
+struct vduse_vq_info {
+	__u32 index;
+	__u32 num;
+	__u64 desc_addr;
+	__u64 driver_addr;
+	__u64 device_addr;
+	union {
+		struct vduse_vq_state_split split;
+		struct vduse_vq_state_packed packed;
+	};
+	__u8 ready;
+};
+
+/* Get the specified virtqueue's information. Caller should set index field. */
+#define VDUSE_VQ_GET_INFO	_IOWR(VDUSE_BASE, 0x15, struct vduse_vq_info)
+
+/**
+ * struct vduse_vq_eventfd - eventfd configuration for a virtqueue
+ * @index: virtqueue index
+ * @fd: eventfd, -1 means de-assigning the eventfd
+ *
+ * Structure used by VDUSE_VQ_SETUP_KICKFD ioctl to setup kick eventfd.
+ */
+struct vduse_vq_eventfd {
+	__u32 index;
+#define VDUSE_EVENTFD_DEASSIGN -1
+	int fd;
+};
+
+/*
+ * Setup kick eventfd for specified virtqueue. The kick eventfd is used
+ * by VDUSE kernel module to notify userspace to consume the avail vring.
+ */
+#define VDUSE_VQ_SETUP_KICKFD	_IOW(VDUSE_BASE, 0x16, struct vduse_vq_eventfd)
+
+/*
+ * Inject an interrupt for specific virtqueue. It's used to notify virtio driver
+ * to consume the used vring.
+ */
+#define VDUSE_VQ_INJECT_IRQ	_IOW(VDUSE_BASE, 0x17, __u32)
+
+/* The control messages definition for read(2)/write(2) on /dev/vduse/$NAME */
+
+/**
+ * enum vduse_req_type - request type
+ * @VDUSE_GET_VQ_STATE: get the state for specified virtqueue from userspace
+ * @VDUSE_SET_STATUS: set the device status
+ * @VDUSE_UPDATE_IOTLB: Notify userspace to update the memory mapping for
+ *                      specified IOVA range via VDUSE_IOTLB_GET_FD ioctl
+ */
+enum vduse_req_type {
+	VDUSE_GET_VQ_STATE,
+	VDUSE_SET_STATUS,
+	VDUSE_UPDATE_IOTLB,
+};
+
+/**
+ * struct vduse_vq_state - virtqueue state
+ * @index: virtqueue index
+ * @split: split virtqueue state
+ * @packed: packed virtqueue state
+ */
+struct vduse_vq_state {
+	__u32 index;
+	union {
+		struct vduse_vq_state_split split;
+		struct vduse_vq_state_packed packed;
+	};
+};
+
+/**
+ * struct vduse_dev_status - device status
+ * @status: device status
+ */
+struct vduse_dev_status {
+	__u8 status;
+};
+
+/**
+ * struct vduse_iova_range - IOVA range [start, last]
+ * @start: start of the IOVA range
+ * @last: last of the IOVA range
+ */
+struct vduse_iova_range {
+	__u64 start;
+	__u64 last;
+};
+
+/**
+ * struct vduse_dev_request - control request
+ * @type: request type
+ * @request_id: request id
+ * @reserved: for future use
+ * @vq_state: virtqueue state, only index field is available
+ * @s: device status
+ * @iova: IOVA range for updating
+ * @padding: padding
+ *
+ * Structure used by read(2) on /dev/vduse/$NAME.
+ */
+struct vduse_dev_request {
+	__u32 type;
+	__u32 request_id;
+	__u32 reserved[4];
+	union {
+		struct vduse_vq_state vq_state;
+		struct vduse_dev_status s;
+		struct vduse_iova_range iova;
+		__u32 padding[32];
+	};
+};
+
+/**
+ * struct vduse_dev_response - response to control request
+ * @request_id: corresponding request id
+ * @result: the result of request
+ * @reserved: for future use, needs to be initialized to zero
+ * @vq_state: virtqueue state
+ * @padding: padding
+ *
+ * Structure used by write(2) on /dev/vduse/$NAME.
+ */
+struct vduse_dev_response {
+	__u32 request_id;
+#define VDUSE_REQ_RESULT_OK	0x00
+#define VDUSE_REQ_RESULT_FAILED	0x01
+	__u32 result;
+	__u32 reserved[4];
+	union {
+		struct vduse_vq_state vq_state;
+		__u32 padding[32];
+	};
+};
+
+#endif /* _VDUSE_H_ */
diff --git a/meson.build b/meson.build
index 9efcb175d1..a113078f1a 100644
--- a/meson.build
+++ b/meson.build
@@ -1541,6 +1541,26 @@ if get_option('fuse_lseek').allowed()
   endif
 endif
 
+have_libvduse = (targetos == 'linux')
+if get_option('libvduse').enabled()
+    if targetos != 'linux'
+        error('libvduse requires linux')
+    endif
+elif get_option('libvduse').disabled()
+    have_libvduse = false
+endif
+
+have_vduse_blk_export = (have_libvduse and targetos == 'linux')
+if get_option('vduse_blk_export').enabled()
+    if targetos != 'linux'
+        error('vduse_blk_export requires linux')
+    elif not have_libvduse
+        error('vduse_blk_export requires libvduse support')
+    endif
+elif get_option('vduse_blk_export').disabled()
+    have_vduse_blk_export = false
+endif
+
 # libbpf
 libbpf = dependency('libbpf', required: get_option('bpf'), method: 'pkg-config')
 if libbpf.found() and not cc.links('''
@@ -1783,6 +1803,7 @@ config_host_data.set('CONFIG_VHOST_CRYPTO', have_vhost_user_crypto)
 config_host_data.set('CONFIG_VHOST_VDPA', have_vhost_vdpa)
 config_host_data.set('CONFIG_VMNET', vmnet.found())
 config_host_data.set('CONFIG_VHOST_USER_BLK_SERVER', have_vhost_user_blk_server)
+config_host_data.set('CONFIG_VDUSE_BLK_EXPORT', have_vduse_blk_export)
 config_host_data.set('CONFIG_PNG', png.found())
 config_host_data.set('CONFIG_VNC', vnc.found())
 config_host_data.set('CONFIG_VNC_JPEG', jpeg.found())
@@ -1882,6 +1903,12 @@ config_host_data.set('HAVE_GETIFADDRS', cc.has_function('getifaddrs'))
 config_host_data.set('HAVE_OPENPTY', cc.has_function('openpty', dependencies: util))
 config_host_data.set('HAVE_STRCHRNUL', cc.has_function('strchrnul'))
 config_host_data.set('HAVE_SYSTEM_FUNCTION', cc.has_function('system', prefix: '#include <stdlib.h>'))
+if rbd.found()
+  config_host_data.set('HAVE_RBD_NAMESPACE_EXISTS',
+                       cc.has_function('rbd_namespace_exists',
+                                       dependencies: rbd,
+                                       prefix: '#include <rbd/librbd.h>'))
+endif
 if rdma.found()
   config_host_data.set('HAVE_IBV_ADVISE_MR',
                        cc.has_function('ibv_advise_mr',
@@ -2986,6 +3013,12 @@ if targetos == 'linux' and have_vhost_user
   vhost_user = libvhost_user.get_variable('vhost_user_dep')
 endif
 
+libvduse = not_found
+if have_libvduse
+  libvduse_proj = subproject('libvduse')
+  libvduse = libvduse_proj.get_variable('libvduse_dep')
+endif
+
 # NOTE: the trace/ subdirectory needs the qapi_trace_events variable
 # that is filled in by qapi/.
 subdir('qapi')
@@ -3842,6 +3875,7 @@ if have_block
   summary_info += {'qed support':       get_option('qed').allowed()}
   summary_info += {'parallels support': get_option('parallels').allowed()}
   summary_info += {'FUSE exports':      fuse}
+  summary_info += {'VDUSE block exports': have_vduse_blk_export}
 endif
 summary(summary_info, bool_yn: true, section: 'Block layer support')
 
diff --git a/meson_options.txt b/meson_options.txt
index f3e2f22c1e..97c38109b1 100644
--- a/meson_options.txt
+++ b/meson_options.txt
@@ -257,6 +257,10 @@ option('virtfs', type: 'feature', value: 'auto',
        description: 'virtio-9p support')
 option('virtiofsd', type: 'feature', value: 'auto',
        description: 'build virtiofs daemon (virtiofsd)')
+option('libvduse', type: 'feature', value: 'auto',
+       description: 'build VDUSE Library')
+option('vduse_blk_export', type: 'feature', value: 'auto',
+       description: 'VDUSE block export support')
 
 option('capstone', type: 'feature', value: 'auto',
        description: 'Whether and how to find the capstone library')
diff --git a/qapi/block-export.json b/qapi/block-export.json
index 8afb1b65b3..4627bbc4e6 100644
--- a/qapi/block-export.json
+++ b/qapi/block-export.json
@@ -179,6 +179,27 @@
   'if': 'CONFIG_FUSE' }
 
 ##
+# @BlockExportOptionsVduseBlk:
+#
+# A vduse-blk block export.
+#
+# @name: the name of VDUSE device (must be unique across the host).
+# @num-queues: the number of virtqueues. Defaults to 1.
+# @queue-size: the size of virtqueue. Defaults to 256.
+# @logical-block-size: Logical block size in bytes. Range [512, PAGE_SIZE]
+#                      and must be power of 2. Defaults to 512 bytes.
+# @serial: the serial number of virtio block device. Defaults to empty string.
+#
+# Since: 7.1
+##
+{ 'struct': 'BlockExportOptionsVduseBlk',
+  'data': { 'name': 'str',
+            '*num-queues': 'uint16',
+            '*queue-size': 'uint16',
+            '*logical-block-size': 'size',
+            '*serial': 'str' } }
+
+##
 # @NbdServerAddOptions:
 #
 # An NBD block export, per legacy nbd-server-add command.
@@ -284,6 +305,7 @@
 # @nbd: NBD export
 # @vhost-user-blk: vhost-user-blk export (since 5.2)
 # @fuse: FUSE export (since: 6.0)
+# @vduse-blk: vduse-blk export (since 7.1)
 #
 # Since: 4.2
 ##
@@ -291,7 +313,8 @@
   'data': [ 'nbd',
             { 'name': 'vhost-user-blk',
               'if': 'CONFIG_VHOST_USER_BLK_SERVER' },
-            { 'name': 'fuse', 'if': 'CONFIG_FUSE' } ] }
+            { 'name': 'fuse', 'if': 'CONFIG_FUSE' },
+            { 'name': 'vduse-blk', 'if': 'CONFIG_VDUSE_BLK_EXPORT' } ] }
 
 ##
 # @BlockExportOptions:
@@ -335,7 +358,9 @@
       'vhost-user-blk': { 'type': 'BlockExportOptionsVhostUserBlk',
                           'if': 'CONFIG_VHOST_USER_BLK_SERVER' },
       'fuse': { 'type': 'BlockExportOptionsFuse',
-                'if': 'CONFIG_FUSE' }
+                'if': 'CONFIG_FUSE' },
+      'vduse-blk': { 'type': 'BlockExportOptionsVduseBlk',
+                     'if': 'CONFIG_VDUSE_BLK_EXPORT' }
    } }
 
 ##
diff --git a/scripts/meson-buildoptions.sh b/scripts/meson-buildoptions.sh
index 24eb5f35ea..d0e14fd6de 100644
--- a/scripts/meson-buildoptions.sh
+++ b/scripts/meson-buildoptions.sh
@@ -110,6 +110,7 @@ meson_options_help() {
   printf "%s\n" '  libssh          ssh block device support'
   printf "%s\n" '  libudev         Use libudev to enumerate host devices'
   printf "%s\n" '  libusb          libusb support for USB passthrough'
+  printf "%s\n" '  libvduse        build VDUSE Library'
   printf "%s\n" '  linux-aio       Linux AIO support'
   printf "%s\n" '  linux-io-uring  Linux io_uring support'
   printf "%s\n" '  live-block-migration'
@@ -161,6 +162,8 @@ meson_options_help() {
   printf "%s\n" '  vhost-user      vhost-user backend support'
   printf "%s\n" '  vhost-user-blk-server'
   printf "%s\n" '                  build vhost-user-blk server'
+  printf "%s\n" '  vduse-blk-export'
+  printf "%s\n" '                  VDUSE block export support'
   printf "%s\n" '  vhost-vdpa      vhost-vdpa kernel backend support'
   printf "%s\n" '  virglrenderer   virgl rendering support'
   printf "%s\n" '  virtfs          virtio-9p support'
@@ -307,6 +310,8 @@ _meson_option_parse() {
     --disable-libudev) printf "%s" -Dlibudev=disabled ;;
     --enable-libusb) printf "%s" -Dlibusb=enabled ;;
     --disable-libusb) printf "%s" -Dlibusb=disabled ;;
+    --enable-libvduse) printf "%s" -Dlibvduse=enabled ;;
+    --disable-libvduse) printf "%s" -Dlibvduse=disabled ;;
     --enable-linux-aio) printf "%s" -Dlinux_aio=enabled ;;
     --disable-linux-aio) printf "%s" -Dlinux_aio=disabled ;;
     --enable-linux-io-uring) printf "%s" -Dlinux_io_uring=enabled ;;
@@ -429,6 +434,8 @@ _meson_option_parse() {
     --disable-vhost-user) printf "%s" -Dvhost_user=disabled ;;
     --enable-vhost-user-blk-server) printf "%s" -Dvhost_user_blk_server=enabled ;;
     --disable-vhost-user-blk-server) printf "%s" -Dvhost_user_blk_server=disabled ;;
+    --enable-vduse-blk-export) printf "%s" -Dvduse_blk_export=enabled ;;
+    --disable-vduse-blk-export) printf "%s" -Dvduse_blk_export=disabled ;;
     --enable-vhost-vdpa) printf "%s" -Dvhost_vdpa=enabled ;;
     --disable-vhost-vdpa) printf "%s" -Dvhost_vdpa=disabled ;;
     --enable-virglrenderer) printf "%s" -Dvirglrenderer=enabled ;;
diff --git a/scripts/update-linux-headers.sh b/scripts/update-linux-headers.sh
index 839a5ec614..b1ad99cba8 100755
--- a/scripts/update-linux-headers.sh
+++ b/scripts/update-linux-headers.sh
@@ -161,7 +161,7 @@ done
 rm -rf "$output/linux-headers/linux"
 mkdir -p "$output/linux-headers/linux"
 for header in kvm.h vfio.h vfio_ccw.h vfio_zdev.h vhost.h \
-              psci.h psp-sev.h userfaultfd.h mman.h; do
+              psci.h psp-sev.h userfaultfd.h mman.h vduse.h; do
     cp "$tmpdir/include/linux/$header" "$output/linux-headers/linux"
 done
 
diff --git a/storage-daemon/qemu-storage-daemon.c b/storage-daemon/qemu-storage-daemon.c
index c104817cdd..b8e910f220 100644
--- a/storage-daemon/qemu-storage-daemon.c
+++ b/storage-daemon/qemu-storage-daemon.c
@@ -121,6 +121,16 @@ static void help(void)
 "                         vhost-user-blk device over file descriptor\n"
 "\n"
 #endif /* CONFIG_VHOST_USER_BLK_SERVER */
+#ifdef CONFIG_VDUSE_BLK_EXPORT
+"  --export [type=]vduse-blk,id=<id>,node-name=<node-name>\n"
+"           ,name=<vduse-name>[,writable=on|off]\n"
+"           [,num-queues=<num-queues>][,queue-size=<queue-size>]\n"
+"           [,logical-block-size=<logical-block-size>]\n"
+"           [,serial=<serial-number>]\n"
+"                         export the specified block node as a\n"
+"                         vduse-blk device\n"
+"\n"
+#endif /* CONFIG_VDUSE_BLK_EXPORT */
 "  --monitor [chardev=]name[,mode=control][,pretty[=on|off]]\n"
 "                         configure a QMP monitor\n"
 "\n"
diff --git a/subprojects/libvduse/include/atomic.h b/subprojects/libvduse/include/atomic.h
new file mode 120000
index 0000000000..8c2be64f7b
--- /dev/null
+++ b/subprojects/libvduse/include/atomic.h
@@ -0,0 +1 @@
+../../../include/qemu/atomic.h
\ No newline at end of file
diff --git a/subprojects/libvduse/include/compiler.h b/subprojects/libvduse/include/compiler.h
new file mode 120000
index 0000000000..de7b70697c
--- /dev/null
+++ b/subprojects/libvduse/include/compiler.h
@@ -0,0 +1 @@
+../../../include/qemu/compiler.h
\ No newline at end of file
diff --git a/subprojects/libvduse/libvduse.c b/subprojects/libvduse/libvduse.c
new file mode 100644
index 0000000000..9a2bcec282
--- /dev/null
+++ b/subprojects/libvduse/libvduse.c
@@ -0,0 +1,1375 @@
+/*
+ * VDUSE (vDPA Device in Userspace) library
+ *
+ * Copyright (C) 2022 Bytedance Inc. and/or its affiliates. All rights reserved.
+ *   Portions of codes and concepts borrowed from libvhost-user.c, so:
+ *     Copyright IBM, Corp. 2007
+ *     Copyright (c) 2016 Red Hat, Inc.
+ *
+ * Author:
+ *   Xie Yongji <xieyongji@bytedance.com>
+ *   Anthony Liguori <aliguori@us.ibm.com>
+ *   Marc-André Lureau <mlureau@redhat.com>
+ *   Victor Kaplansky <victork@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or
+ * later.  See the COPYING file in the top-level directory.
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <stdbool.h>
+#include <stddef.h>
+#include <errno.h>
+#include <string.h>
+#include <assert.h>
+#include <endian.h>
+#include <unistd.h>
+#include <limits.h>
+#include <fcntl.h>
+#include <inttypes.h>
+
+#include <sys/ioctl.h>
+#include <sys/eventfd.h>
+#include <sys/mman.h>
+
+#include "include/atomic.h"
+#include "linux-headers/linux/virtio_ring.h"
+#include "linux-headers/linux/virtio_config.h"
+#include "linux-headers/linux/vduse.h"
+#include "libvduse.h"
+
+#define VDUSE_VQ_ALIGN 4096
+#define MAX_IOVA_REGIONS 256
+
+#define LOG_ALIGNMENT 64
+
+/* Round number down to multiple */
+#define ALIGN_DOWN(n, m) ((n) / (m) * (m))
+
+/* Round number up to multiple */
+#define ALIGN_UP(n, m) ALIGN_DOWN((n) + (m) - 1, (m))
+
+#ifndef unlikely
+#define unlikely(x)   __builtin_expect(!!(x), 0)
+#endif
+
+typedef struct VduseDescStateSplit {
+    uint8_t inflight;
+    uint8_t padding[5];
+    uint16_t next;
+    uint64_t counter;
+} VduseDescStateSplit;
+
+typedef struct VduseVirtqLogInflight {
+    uint64_t features;
+    uint16_t version;
+    uint16_t desc_num;
+    uint16_t last_batch_head;
+    uint16_t used_idx;
+    VduseDescStateSplit desc[];
+} VduseVirtqLogInflight;
+
+typedef struct VduseVirtqLog {
+    VduseVirtqLogInflight inflight;
+} VduseVirtqLog;
+
+typedef struct VduseVirtqInflightDesc {
+    uint16_t index;
+    uint64_t counter;
+} VduseVirtqInflightDesc;
+
+typedef struct VduseRing {
+    unsigned int num;
+    uint64_t desc_addr;
+    uint64_t avail_addr;
+    uint64_t used_addr;
+    struct vring_desc *desc;
+    struct vring_avail *avail;
+    struct vring_used *used;
+} VduseRing;
+
+struct VduseVirtq {
+    VduseRing vring;
+    uint16_t last_avail_idx;
+    uint16_t shadow_avail_idx;
+    uint16_t used_idx;
+    uint16_t signalled_used;
+    bool signalled_used_valid;
+    int index;
+    int inuse;
+    bool ready;
+    int fd;
+    VduseDev *dev;
+    VduseVirtqInflightDesc *resubmit_list;
+    uint16_t resubmit_num;
+    uint64_t counter;
+    VduseVirtqLog *log;
+};
+
+typedef struct VduseIovaRegion {
+    uint64_t iova;
+    uint64_t size;
+    uint64_t mmap_offset;
+    uint64_t mmap_addr;
+} VduseIovaRegion;
+
+struct VduseDev {
+    VduseVirtq *vqs;
+    VduseIovaRegion regions[MAX_IOVA_REGIONS];
+    int num_regions;
+    char *name;
+    uint32_t device_id;
+    uint32_t vendor_id;
+    uint16_t num_queues;
+    uint16_t queue_size;
+    uint64_t features;
+    const VduseOps *ops;
+    int fd;
+    int ctrl_fd;
+    void *priv;
+    void *log;
+};
+
+static inline size_t vduse_vq_log_size(uint16_t queue_size)
+{
+    return ALIGN_UP(sizeof(VduseDescStateSplit) * queue_size +
+                    sizeof(VduseVirtqLogInflight), LOG_ALIGNMENT);
+}
+
+static void *vduse_log_get(const char *filename, size_t size)
+{
+    void *ptr = MAP_FAILED;
+    int fd;
+
+    fd = open(filename, O_RDWR | O_CREAT, 0600);
+    if (fd == -1) {
+        return MAP_FAILED;
+    }
+
+    if (ftruncate(fd, size) == -1) {
+        goto out;
+    }
+
+    ptr = mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+
+out:
+    close(fd);
+    return ptr;
+}
+
+static inline bool has_feature(uint64_t features, unsigned int fbit)
+{
+    assert(fbit < 64);
+    return !!(features & (1ULL << fbit));
+}
+
+static inline bool vduse_dev_has_feature(VduseDev *dev, unsigned int fbit)
+{
+    return has_feature(dev->features, fbit);
+}
+
+uint64_t vduse_get_virtio_features(void)
+{
+    return (1ULL << VIRTIO_F_IOMMU_PLATFORM) |
+           (1ULL << VIRTIO_F_VERSION_1) |
+           (1ULL << VIRTIO_F_NOTIFY_ON_EMPTY) |
+           (1ULL << VIRTIO_RING_F_EVENT_IDX) |
+           (1ULL << VIRTIO_RING_F_INDIRECT_DESC);
+}
+
+VduseDev *vduse_queue_get_dev(VduseVirtq *vq)
+{
+    return vq->dev;
+}
+
+int vduse_queue_get_fd(VduseVirtq *vq)
+{
+    return vq->fd;
+}
+
+void *vduse_dev_get_priv(VduseDev *dev)
+{
+    return dev->priv;
+}
+
+VduseVirtq *vduse_dev_get_queue(VduseDev *dev, int index)
+{
+    return &dev->vqs[index];
+}
+
+int vduse_dev_get_fd(VduseDev *dev)
+{
+    return dev->fd;
+}
+
+static int vduse_inject_irq(VduseDev *dev, int index)
+{
+    return ioctl(dev->fd, VDUSE_VQ_INJECT_IRQ, &index);
+}
+
+static int inflight_desc_compare(const void *a, const void *b)
+{
+    VduseVirtqInflightDesc *desc0 = (VduseVirtqInflightDesc *)a,
+                           *desc1 = (VduseVirtqInflightDesc *)b;
+
+    if (desc1->counter > desc0->counter &&
+        (desc1->counter - desc0->counter) < VIRTQUEUE_MAX_SIZE * 2) {
+        return 1;
+    }
+
+    return -1;
+}
+
+static int vduse_queue_check_inflights(VduseVirtq *vq)
+{
+    int i = 0;
+    VduseDev *dev = vq->dev;
+
+    vq->used_idx = le16toh(vq->vring.used->idx);
+    vq->resubmit_num = 0;
+    vq->resubmit_list = NULL;
+    vq->counter = 0;
+
+    if (unlikely(vq->log->inflight.used_idx != vq->used_idx)) {
+        if (vq->log->inflight.last_batch_head > VIRTQUEUE_MAX_SIZE) {
+            return -1;
+        }
+
+        vq->log->inflight.desc[vq->log->inflight.last_batch_head].inflight = 0;
+
+        barrier();
+
+        vq->log->inflight.used_idx = vq->used_idx;
+    }
+
+    for (i = 0; i < vq->log->inflight.desc_num; i++) {
+        if (vq->log->inflight.desc[i].inflight == 1) {
+            vq->inuse++;
+        }
+    }
+
+    vq->shadow_avail_idx = vq->last_avail_idx = vq->inuse + vq->used_idx;
+
+    if (vq->inuse) {
+        vq->resubmit_list = calloc(vq->inuse, sizeof(VduseVirtqInflightDesc));
+        if (!vq->resubmit_list) {
+            return -1;
+        }
+
+        for (i = 0; i < vq->log->inflight.desc_num; i++) {
+            if (vq->log->inflight.desc[i].inflight) {
+                vq->resubmit_list[vq->resubmit_num].index = i;
+                vq->resubmit_list[vq->resubmit_num].counter =
+                                        vq->log->inflight.desc[i].counter;
+                vq->resubmit_num++;
+            }
+        }
+
+        if (vq->resubmit_num > 1) {
+            qsort(vq->resubmit_list, vq->resubmit_num,
+                  sizeof(VduseVirtqInflightDesc), inflight_desc_compare);
+        }
+        vq->counter = vq->resubmit_list[0].counter + 1;
+    }
+
+    vduse_inject_irq(dev, vq->index);
+
+    return 0;
+}
+
+static int vduse_queue_inflight_get(VduseVirtq *vq, int desc_idx)
+{
+    vq->log->inflight.desc[desc_idx].counter = vq->counter++;
+
+    barrier();
+
+    vq->log->inflight.desc[desc_idx].inflight = 1;
+
+    return 0;
+}
+
+static int vduse_queue_inflight_pre_put(VduseVirtq *vq, int desc_idx)
+{
+    vq->log->inflight.last_batch_head = desc_idx;
+
+    return 0;
+}
+
+static int vduse_queue_inflight_post_put(VduseVirtq *vq, int desc_idx)
+{
+    vq->log->inflight.desc[desc_idx].inflight = 0;
+
+    barrier();
+
+    vq->log->inflight.used_idx = vq->used_idx;
+
+    return 0;
+}
+
+static void vduse_iova_remove_region(VduseDev *dev, uint64_t start,
+                                     uint64_t last)
+{
+    int i;
+
+    if (last == start) {
+        return;
+    }
+
+    for (i = 0; i < MAX_IOVA_REGIONS; i++) {
+        if (!dev->regions[i].mmap_addr) {
+            continue;
+        }
+
+        if (start <= dev->regions[i].iova &&
+            last >= (dev->regions[i].iova + dev->regions[i].size - 1)) {
+            munmap((void *)(uintptr_t)dev->regions[i].mmap_addr,
+                   dev->regions[i].mmap_offset + dev->regions[i].size);
+            dev->regions[i].mmap_addr = 0;
+            dev->num_regions--;
+        }
+    }
+}
+
+static int vduse_iova_add_region(VduseDev *dev, int fd,
+                                 uint64_t offset, uint64_t start,
+                                 uint64_t last, int prot)
+{
+    int i;
+    uint64_t size = last - start + 1;
+    void *mmap_addr = mmap(0, size + offset, prot, MAP_SHARED, fd, 0);
+
+    if (mmap_addr == MAP_FAILED) {
+        close(fd);
+        return -EINVAL;
+    }
+
+    for (i = 0; i < MAX_IOVA_REGIONS; i++) {
+        if (!dev->regions[i].mmap_addr) {
+            dev->regions[i].mmap_addr = (uint64_t)(uintptr_t)mmap_addr;
+            dev->regions[i].mmap_offset = offset;
+            dev->regions[i].iova = start;
+            dev->regions[i].size = size;
+            dev->num_regions++;
+            break;
+        }
+    }
+    assert(i < MAX_IOVA_REGIONS);
+    close(fd);
+
+    return 0;
+}
+
+static int perm_to_prot(uint8_t perm)
+{
+    int prot = 0;
+
+    switch (perm) {
+    case VDUSE_ACCESS_WO:
+        prot |= PROT_WRITE;
+        break;
+    case VDUSE_ACCESS_RO:
+        prot |= PROT_READ;
+        break;
+    case VDUSE_ACCESS_RW:
+        prot |= PROT_READ | PROT_WRITE;
+        break;
+    default:
+        break;
+    }
+
+    return prot;
+}
+
+static inline void *iova_to_va(VduseDev *dev, uint64_t *plen, uint64_t iova)
+{
+    int i, ret;
+    struct vduse_iotlb_entry entry;
+
+    for (i = 0; i < MAX_IOVA_REGIONS; i++) {
+        VduseIovaRegion *r = &dev->regions[i];
+
+        if (!r->mmap_addr) {
+            continue;
+        }
+
+        if ((iova >= r->iova) && (iova < (r->iova + r->size))) {
+            if ((iova + *plen) > (r->iova + r->size)) {
+                *plen = r->iova + r->size - iova;
+            }
+            return (void *)(uintptr_t)(iova - r->iova +
+                   r->mmap_addr + r->mmap_offset);
+        }
+    }
+
+    entry.start = iova;
+    entry.last = iova + 1;
+    ret = ioctl(dev->fd, VDUSE_IOTLB_GET_FD, &entry);
+    if (ret < 0) {
+        return NULL;
+    }
+
+    if (!vduse_iova_add_region(dev, ret, entry.offset, entry.start,
+                               entry.last, perm_to_prot(entry.perm))) {
+        return iova_to_va(dev, plen, iova);
+    }
+
+    return NULL;
+}
+
+static inline uint16_t vring_avail_flags(VduseVirtq *vq)
+{
+    return le16toh(vq->vring.avail->flags);
+}
+
+static inline uint16_t vring_avail_idx(VduseVirtq *vq)
+{
+    vq->shadow_avail_idx = le16toh(vq->vring.avail->idx);
+
+    return vq->shadow_avail_idx;
+}
+
+static inline uint16_t vring_avail_ring(VduseVirtq *vq, int i)
+{
+    return le16toh(vq->vring.avail->ring[i]);
+}
+
+static inline uint16_t vring_get_used_event(VduseVirtq *vq)
+{
+    return vring_avail_ring(vq, vq->vring.num);
+}
+
+static bool vduse_queue_get_head(VduseVirtq *vq, unsigned int idx,
+                                 unsigned int *head)
+{
+    /*
+     * Grab the next descriptor number they're advertising, and increment
+     * the index we've seen.
+     */
+    *head = vring_avail_ring(vq, idx % vq->vring.num);
+
+    /* If their number is silly, that's a fatal mistake. */
+    if (*head >= vq->vring.num) {
+        fprintf(stderr, "Guest says index %u is available\n", *head);
+        return false;
+    }
+
+    return true;
+}
+
+static int
+vduse_queue_read_indirect_desc(VduseDev *dev, struct vring_desc *desc,
+                               uint64_t addr, size_t len)
+{
+    struct vring_desc *ori_desc;
+    uint64_t read_len;
+
+    if (len > (VIRTQUEUE_MAX_SIZE * sizeof(struct vring_desc))) {
+        return -1;
+    }
+
+    if (len == 0) {
+        return -1;
+    }
+
+    while (len) {
+        read_len = len;
+        ori_desc = iova_to_va(dev, &read_len, addr);
+        if (!ori_desc) {
+            return -1;
+        }
+
+        memcpy(desc, ori_desc, read_len);
+        len -= read_len;
+        addr += read_len;
+        desc += read_len;
+    }
+
+    return 0;
+}
+
+enum {
+    VIRTQUEUE_READ_DESC_ERROR = -1,
+    VIRTQUEUE_READ_DESC_DONE = 0,   /* end of chain */
+    VIRTQUEUE_READ_DESC_MORE = 1,   /* more buffers in chain */
+};
+
+static int vduse_queue_read_next_desc(struct vring_desc *desc, int i,
+                                      unsigned int max, unsigned int *next)
+{
+    /* If this descriptor says it doesn't chain, we're done. */
+    if (!(le16toh(desc[i].flags) & VRING_DESC_F_NEXT)) {
+        return VIRTQUEUE_READ_DESC_DONE;
+    }
+
+    /* Check they're not leading us off end of descriptors. */
+    *next = desc[i].next;
+    /* Make sure compiler knows to grab that: we don't want it changing! */
+    smp_wmb();
+
+    if (*next >= max) {
+        fprintf(stderr, "Desc next is %u\n", *next);
+        return VIRTQUEUE_READ_DESC_ERROR;
+    }
+
+    return VIRTQUEUE_READ_DESC_MORE;
+}
+
+/*
+ * Fetch avail_idx from VQ memory only when we really need to know if
+ * guest has added some buffers.
+ */
+static bool vduse_queue_empty(VduseVirtq *vq)
+{
+    if (unlikely(!vq->vring.avail)) {
+        return true;
+    }
+
+    if (vq->shadow_avail_idx != vq->last_avail_idx) {
+        return false;
+    }
+
+    return vring_avail_idx(vq) == vq->last_avail_idx;
+}
+
+static bool vduse_queue_should_notify(VduseVirtq *vq)
+{
+    VduseDev *dev = vq->dev;
+    uint16_t old, new;
+    bool v;
+
+    /* We need to expose used array entries before checking used event. */
+    smp_mb();
+
+    /* Always notify when queue is empty (when feature acknowledge) */
+    if (vduse_dev_has_feature(dev, VIRTIO_F_NOTIFY_ON_EMPTY) &&
+        !vq->inuse && vduse_queue_empty(vq)) {
+        return true;
+    }
+
+    if (!vduse_dev_has_feature(dev, VIRTIO_RING_F_EVENT_IDX)) {
+        return !(vring_avail_flags(vq) & VRING_AVAIL_F_NO_INTERRUPT);
+    }
+
+    v = vq->signalled_used_valid;
+    vq->signalled_used_valid = true;
+    old = vq->signalled_used;
+    new = vq->signalled_used = vq->used_idx;
+    return !v || vring_need_event(vring_get_used_event(vq), new, old);
+}
+
+void vduse_queue_notify(VduseVirtq *vq)
+{
+    VduseDev *dev = vq->dev;
+
+    if (unlikely(!vq->vring.avail)) {
+        return;
+    }
+
+    if (!vduse_queue_should_notify(vq)) {
+        return;
+    }
+
+    if (vduse_inject_irq(dev, vq->index) < 0) {
+        fprintf(stderr, "Error inject irq for vq %d: %s\n",
+                vq->index, strerror(errno));
+    }
+}
+
+static inline void vring_set_avail_event(VduseVirtq *vq, uint16_t val)
+{
+    *((uint16_t *)&vq->vring.used->ring[vq->vring.num]) = htole16(val);
+}
+
+static bool vduse_queue_map_single_desc(VduseVirtq *vq, unsigned int *p_num_sg,
+                                   struct iovec *iov, unsigned int max_num_sg,
+                                   bool is_write, uint64_t pa, size_t sz)
+{
+    unsigned num_sg = *p_num_sg;
+    VduseDev *dev = vq->dev;
+
+    assert(num_sg <= max_num_sg);
+
+    if (!sz) {
+        fprintf(stderr, "virtio: zero sized buffers are not allowed\n");
+        return false;
+    }
+
+    while (sz) {
+        uint64_t len = sz;
+
+        if (num_sg == max_num_sg) {
+            fprintf(stderr,
+                    "virtio: too many descriptors in indirect table\n");
+            return false;
+        }
+
+        iov[num_sg].iov_base = iova_to_va(dev, &len, pa);
+        if (iov[num_sg].iov_base == NULL) {
+            fprintf(stderr, "virtio: invalid address for buffers\n");
+            return false;
+        }
+        iov[num_sg++].iov_len = len;
+        sz -= len;
+        pa += len;
+    }
+
+    *p_num_sg = num_sg;
+    return true;
+}
+
+static void *vduse_queue_alloc_element(size_t sz, unsigned out_num,
+                                       unsigned in_num)
+{
+    VduseVirtqElement *elem;
+    size_t in_sg_ofs = ALIGN_UP(sz, __alignof__(elem->in_sg[0]));
+    size_t out_sg_ofs = in_sg_ofs + in_num * sizeof(elem->in_sg[0]);
+    size_t out_sg_end = out_sg_ofs + out_num * sizeof(elem->out_sg[0]);
+
+    assert(sz >= sizeof(VduseVirtqElement));
+    elem = malloc(out_sg_end);
+    if (!elem) {
+        return NULL;
+    }
+    elem->out_num = out_num;
+    elem->in_num = in_num;
+    elem->in_sg = (void *)elem + in_sg_ofs;
+    elem->out_sg = (void *)elem + out_sg_ofs;
+    return elem;
+}
+
+static void *vduse_queue_map_desc(VduseVirtq *vq, unsigned int idx, size_t sz)
+{
+    struct vring_desc *desc = vq->vring.desc;
+    VduseDev *dev = vq->dev;
+    uint64_t desc_addr, read_len;
+    unsigned int desc_len;
+    unsigned int max = vq->vring.num;
+    unsigned int i = idx;
+    VduseVirtqElement *elem;
+    struct iovec iov[VIRTQUEUE_MAX_SIZE];
+    struct vring_desc desc_buf[VIRTQUEUE_MAX_SIZE];
+    unsigned int out_num = 0, in_num = 0;
+    int rc;
+
+    if (le16toh(desc[i].flags) & VRING_DESC_F_INDIRECT) {
+        if (le32toh(desc[i].len) % sizeof(struct vring_desc)) {
+            fprintf(stderr, "Invalid size for indirect buffer table\n");
+            return NULL;
+        }
+
+        /* loop over the indirect descriptor table */
+        desc_addr = le64toh(desc[i].addr);
+        desc_len = le32toh(desc[i].len);
+        max = desc_len / sizeof(struct vring_desc);
+        read_len = desc_len;
+        desc = iova_to_va(dev, &read_len, desc_addr);
+        if (unlikely(desc && read_len != desc_len)) {
+            /* Failed to use zero copy */
+            desc = NULL;
+            if (!vduse_queue_read_indirect_desc(dev, desc_buf,
+                                                desc_addr,
+                                                desc_len)) {
+                desc = desc_buf;
+            }
+        }
+        if (!desc) {
+            fprintf(stderr, "Invalid indirect buffer table\n");
+            return NULL;
+        }
+        i = 0;
+    }
+
+    /* Collect all the descriptors */
+    do {
+        if (le16toh(desc[i].flags) & VRING_DESC_F_WRITE) {
+            if (!vduse_queue_map_single_desc(vq, &in_num, iov + out_num,
+                                             VIRTQUEUE_MAX_SIZE - out_num,
+                                             true, le64toh(desc[i].addr),
+                                             le32toh(desc[i].len))) {
+                return NULL;
+            }
+        } else {
+            if (in_num) {
+                fprintf(stderr, "Incorrect order for descriptors\n");
+                return NULL;
+            }
+            if (!vduse_queue_map_single_desc(vq, &out_num, iov,
+                                             VIRTQUEUE_MAX_SIZE, false,
+                                             le64toh(desc[i].addr),
+                                             le32toh(desc[i].len))) {
+                return NULL;
+            }
+        }
+
+        /* If we've got too many, that implies a descriptor loop. */
+        if ((in_num + out_num) > max) {
+            fprintf(stderr, "Looped descriptor\n");
+            return NULL;
+        }
+        rc = vduse_queue_read_next_desc(desc, i, max, &i);
+    } while (rc == VIRTQUEUE_READ_DESC_MORE);
+
+    if (rc == VIRTQUEUE_READ_DESC_ERROR) {
+        fprintf(stderr, "read descriptor error\n");
+        return NULL;
+    }
+
+    /* Now copy what we have collected and mapped */
+    elem = vduse_queue_alloc_element(sz, out_num, in_num);
+    if (!elem) {
+        fprintf(stderr, "read descriptor error\n");
+        return NULL;
+    }
+    elem->index = idx;
+    for (i = 0; i < out_num; i++) {
+        elem->out_sg[i] = iov[i];
+    }
+    for (i = 0; i < in_num; i++) {
+        elem->in_sg[i] = iov[out_num + i];
+    }
+
+    return elem;
+}
+
+void *vduse_queue_pop(VduseVirtq *vq, size_t sz)
+{
+    unsigned int head;
+    VduseVirtqElement *elem;
+    VduseDev *dev = vq->dev;
+    int i;
+
+    if (unlikely(!vq->vring.avail)) {
+        return NULL;
+    }
+
+    if (unlikely(vq->resubmit_list && vq->resubmit_num > 0)) {
+        i = (--vq->resubmit_num);
+        elem = vduse_queue_map_desc(vq, vq->resubmit_list[i].index, sz);
+
+        if (!vq->resubmit_num) {
+            free(vq->resubmit_list);
+            vq->resubmit_list = NULL;
+        }
+
+        return elem;
+    }
+
+    if (vduse_queue_empty(vq)) {
+        return NULL;
+    }
+    /* Needed after virtio_queue_empty() */
+    smp_rmb();
+
+    if (vq->inuse >= vq->vring.num) {
+        fprintf(stderr, "Virtqueue size exceeded: %d\n", vq->inuse);
+        return NULL;
+    }
+
+    if (!vduse_queue_get_head(vq, vq->last_avail_idx++, &head)) {
+        return NULL;
+    }
+
+    if (vduse_dev_has_feature(dev, VIRTIO_RING_F_EVENT_IDX)) {
+        vring_set_avail_event(vq, vq->last_avail_idx);
+    }
+
+    elem = vduse_queue_map_desc(vq, head, sz);
+
+    if (!elem) {
+        return NULL;
+    }
+
+    vq->inuse++;
+
+    vduse_queue_inflight_get(vq, head);
+
+    return elem;
+}
+
+static inline void vring_used_write(VduseVirtq *vq,
+                                    struct vring_used_elem *uelem, int i)
+{
+    struct vring_used *used = vq->vring.used;
+
+    used->ring[i] = *uelem;
+}
+
+static void vduse_queue_fill(VduseVirtq *vq, const VduseVirtqElement *elem,
+                             unsigned int len, unsigned int idx)
+{
+    struct vring_used_elem uelem;
+
+    if (unlikely(!vq->vring.used)) {
+        return;
+    }
+
+    idx = (idx + vq->used_idx) % vq->vring.num;
+
+    uelem.id = htole32(elem->index);
+    uelem.len = htole32(len);
+    vring_used_write(vq, &uelem, idx);
+}
+
+static inline void vring_used_idx_set(VduseVirtq *vq, uint16_t val)
+{
+    vq->vring.used->idx = htole16(val);
+    vq->used_idx = val;
+}
+
+static void vduse_queue_flush(VduseVirtq *vq, unsigned int count)
+{
+    uint16_t old, new;
+
+    if (unlikely(!vq->vring.used)) {
+        return;
+    }
+
+    /* Make sure buffer is written before we update index. */
+    smp_wmb();
+
+    old = vq->used_idx;
+    new = old + count;
+    vring_used_idx_set(vq, new);
+    vq->inuse -= count;
+    if (unlikely((int16_t)(new - vq->signalled_used) < (uint16_t)(new - old))) {
+        vq->signalled_used_valid = false;
+    }
+}
+
+void vduse_queue_push(VduseVirtq *vq, const VduseVirtqElement *elem,
+                      unsigned int len)
+{
+    vduse_queue_fill(vq, elem, len, 0);
+    vduse_queue_inflight_pre_put(vq, elem->index);
+    vduse_queue_flush(vq, 1);
+    vduse_queue_inflight_post_put(vq, elem->index);
+}
+
+static int vduse_queue_update_vring(VduseVirtq *vq, uint64_t desc_addr,
+                                    uint64_t avail_addr, uint64_t used_addr)
+{
+    struct VduseDev *dev = vq->dev;
+    uint64_t len;
+
+    len = sizeof(struct vring_desc);
+    vq->vring.desc = iova_to_va(dev, &len, desc_addr);
+    if (len != sizeof(struct vring_desc)) {
+        return -EINVAL;
+    }
+
+    len = sizeof(struct vring_avail);
+    vq->vring.avail = iova_to_va(dev, &len, avail_addr);
+    if (len != sizeof(struct vring_avail)) {
+        return -EINVAL;
+    }
+
+    len = sizeof(struct vring_used);
+    vq->vring.used = iova_to_va(dev, &len, used_addr);
+    if (len != sizeof(struct vring_used)) {
+        return -EINVAL;
+    }
+
+    if (!vq->vring.desc || !vq->vring.avail || !vq->vring.used) {
+        fprintf(stderr, "Failed to get vq[%d] iova mapping\n", vq->index);
+        return -EINVAL;
+    }
+
+    return 0;
+}
+
+static void vduse_queue_enable(VduseVirtq *vq)
+{
+    struct VduseDev *dev = vq->dev;
+    struct vduse_vq_info vq_info;
+    struct vduse_vq_eventfd vq_eventfd;
+    int fd;
+
+    vq_info.index = vq->index;
+    if (ioctl(dev->fd, VDUSE_VQ_GET_INFO, &vq_info)) {
+        fprintf(stderr, "Failed to get vq[%d] info: %s\n",
+                vq->index, strerror(errno));
+        return;
+    }
+
+    if (!vq_info.ready) {
+        return;
+    }
+
+    vq->vring.num = vq_info.num;
+    vq->vring.desc_addr = vq_info.desc_addr;
+    vq->vring.avail_addr = vq_info.driver_addr;
+    vq->vring.used_addr = vq_info.device_addr;
+
+    if (vduse_queue_update_vring(vq, vq_info.desc_addr,
+                                 vq_info.driver_addr, vq_info.device_addr)) {
+        fprintf(stderr, "Failed to update vring for vq[%d]\n", vq->index);
+        return;
+    }
+
+    fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
+    if (fd < 0) {
+        fprintf(stderr, "Failed to init eventfd for vq[%d]\n", vq->index);
+        return;
+    }
+
+    vq_eventfd.index = vq->index;
+    vq_eventfd.fd = fd;
+    if (ioctl(dev->fd, VDUSE_VQ_SETUP_KICKFD, &vq_eventfd)) {
+        fprintf(stderr, "Failed to setup kick fd for vq[%d]\n", vq->index);
+        close(fd);
+        return;
+    }
+
+    vq->fd = fd;
+    vq->signalled_used_valid = false;
+    vq->ready = true;
+
+    if (vduse_queue_check_inflights(vq)) {
+        fprintf(stderr, "Failed to check inflights for vq[%d]\n", vq->index);
+        close(fd);
+        return;
+    }
+
+    dev->ops->enable_queue(dev, vq);
+}
+
+static void vduse_queue_disable(VduseVirtq *vq)
+{
+    struct VduseDev *dev = vq->dev;
+    struct vduse_vq_eventfd eventfd;
+
+    if (!vq->ready) {
+        return;
+    }
+
+    dev->ops->disable_queue(dev, vq);
+
+    eventfd.index = vq->index;
+    eventfd.fd = VDUSE_EVENTFD_DEASSIGN;
+    ioctl(dev->fd, VDUSE_VQ_SETUP_KICKFD, &eventfd);
+    close(vq->fd);
+
+    assert(vq->inuse == 0);
+
+    vq->vring.num = 0;
+    vq->vring.desc_addr = 0;
+    vq->vring.avail_addr = 0;
+    vq->vring.used_addr = 0;
+    vq->vring.desc = 0;
+    vq->vring.avail = 0;
+    vq->vring.used = 0;
+    vq->ready = false;
+    vq->fd = -1;
+}
+
+static void vduse_dev_start_dataplane(VduseDev *dev)
+{
+    int i;
+
+    if (ioctl(dev->fd, VDUSE_DEV_GET_FEATURES, &dev->features)) {
+        fprintf(stderr, "Failed to get features: %s\n", strerror(errno));
+        return;
+    }
+    assert(vduse_dev_has_feature(dev, VIRTIO_F_VERSION_1));
+
+    for (i = 0; i < dev->num_queues; i++) {
+        vduse_queue_enable(&dev->vqs[i]);
+    }
+}
+
+static void vduse_dev_stop_dataplane(VduseDev *dev)
+{
+    size_t log_size = dev->num_queues * vduse_vq_log_size(VIRTQUEUE_MAX_SIZE);
+    int i;
+
+    for (i = 0; i < dev->num_queues; i++) {
+        vduse_queue_disable(&dev->vqs[i]);
+    }
+    if (dev->log) {
+        memset(dev->log, 0, log_size);
+    }
+    dev->features = 0;
+    vduse_iova_remove_region(dev, 0, ULONG_MAX);
+}
+
+int vduse_dev_handler(VduseDev *dev)
+{
+    struct vduse_dev_request req;
+    struct vduse_dev_response resp = { 0 };
+    VduseVirtq *vq;
+    int i, ret;
+
+    ret = read(dev->fd, &req, sizeof(req));
+    if (ret != sizeof(req)) {
+        fprintf(stderr, "Read request error [%d]: %s\n",
+                ret, strerror(errno));
+        return -errno;
+    }
+    resp.request_id = req.request_id;
+
+    switch (req.type) {
+    case VDUSE_GET_VQ_STATE:
+        vq = &dev->vqs[req.vq_state.index];
+        resp.vq_state.split.avail_index = vq->last_avail_idx;
+        resp.result = VDUSE_REQ_RESULT_OK;
+        break;
+    case VDUSE_SET_STATUS:
+        if (req.s.status & VIRTIO_CONFIG_S_DRIVER_OK) {
+            vduse_dev_start_dataplane(dev);
+        } else if (req.s.status == 0) {
+            vduse_dev_stop_dataplane(dev);
+        }
+        resp.result = VDUSE_REQ_RESULT_OK;
+        break;
+    case VDUSE_UPDATE_IOTLB:
+        /* The iova will be updated by iova_to_va() later, so just remove it */
+        vduse_iova_remove_region(dev, req.iova.start, req.iova.last);
+        for (i = 0; i < dev->num_queues; i++) {
+            VduseVirtq *vq = &dev->vqs[i];
+            if (vq->ready) {
+                if (vduse_queue_update_vring(vq, vq->vring.desc_addr,
+                                             vq->vring.avail_addr,
+                                             vq->vring.used_addr)) {
+                    fprintf(stderr, "Failed to update vring for vq[%d]\n",
+                            vq->index);
+                }
+            }
+        }
+        resp.result = VDUSE_REQ_RESULT_OK;
+        break;
+    default:
+        resp.result = VDUSE_REQ_RESULT_FAILED;
+        break;
+    }
+
+    ret = write(dev->fd, &resp, sizeof(resp));
+    if (ret != sizeof(resp)) {
+        fprintf(stderr, "Write request %d error [%d]: %s\n",
+                req.type, ret, strerror(errno));
+        return -errno;
+    }
+    return 0;
+}
+
+int vduse_dev_update_config(VduseDev *dev, uint32_t size,
+                            uint32_t offset, char *buffer)
+{
+    int ret;
+    struct vduse_config_data *data;
+
+    data = malloc(offsetof(struct vduse_config_data, buffer) + size);
+    if (!data) {
+        return -ENOMEM;
+    }
+
+    data->offset = offset;
+    data->length = size;
+    memcpy(data->buffer, buffer, size);
+
+    ret = ioctl(dev->fd, VDUSE_DEV_SET_CONFIG, data);
+    free(data);
+
+    if (ret) {
+        return -errno;
+    }
+
+    if (ioctl(dev->fd, VDUSE_DEV_INJECT_CONFIG_IRQ)) {
+        return -errno;
+    }
+
+    return 0;
+}
+
+int vduse_dev_setup_queue(VduseDev *dev, int index, int max_size)
+{
+    VduseVirtq *vq = &dev->vqs[index];
+    struct vduse_vq_config vq_config = { 0 };
+
+    if (max_size > VIRTQUEUE_MAX_SIZE) {
+        return -EINVAL;
+    }
+
+    vq_config.index = vq->index;
+    vq_config.max_size = max_size;
+
+    if (ioctl(dev->fd, VDUSE_VQ_SETUP, &vq_config)) {
+        return -errno;
+    }
+
+    vduse_queue_enable(vq);
+
+    return 0;
+}
+
+int vduse_set_reconnect_log_file(VduseDev *dev, const char *filename)
+{
+
+    size_t log_size = dev->num_queues * vduse_vq_log_size(VIRTQUEUE_MAX_SIZE);
+    void *log;
+    int i;
+
+    dev->log = log = vduse_log_get(filename, log_size);
+    if (log == MAP_FAILED) {
+        fprintf(stderr, "Failed to get vduse log\n");
+        return -EINVAL;
+    }
+
+    for (i = 0; i < dev->num_queues; i++) {
+        dev->vqs[i].log = log;
+        dev->vqs[i].log->inflight.desc_num = VIRTQUEUE_MAX_SIZE;
+        log = (void *)((char *)log + vduse_vq_log_size(VIRTQUEUE_MAX_SIZE));
+    }
+
+    return 0;
+}
+
+static int vduse_dev_init_vqs(VduseDev *dev, uint16_t num_queues)
+{
+    VduseVirtq *vqs;
+    int i;
+
+    vqs = calloc(sizeof(VduseVirtq), num_queues);
+    if (!vqs) {
+        return -ENOMEM;
+    }
+
+    for (i = 0; i < num_queues; i++) {
+        vqs[i].index = i;
+        vqs[i].dev = dev;
+        vqs[i].fd = -1;
+    }
+    dev->vqs = vqs;
+
+    return 0;
+}
+
+static int vduse_dev_init(VduseDev *dev, const char *name,
+                          uint16_t num_queues, const VduseOps *ops,
+                          void *priv)
+{
+    char *dev_path, *dev_name;
+    int ret, fd;
+
+    dev_path = malloc(strlen(name) + strlen("/dev/vduse/") + 1);
+    if (!dev_path) {
+        return -ENOMEM;
+    }
+    sprintf(dev_path, "/dev/vduse/%s", name);
+
+    fd = open(dev_path, O_RDWR);
+    free(dev_path);
+    if (fd < 0) {
+        fprintf(stderr, "Failed to open vduse dev %s: %s\n",
+                name, strerror(errno));
+        return -errno;
+    }
+
+    if (ioctl(fd, VDUSE_DEV_GET_FEATURES, &dev->features)) {
+        fprintf(stderr, "Failed to get features: %s\n", strerror(errno));
+        close(fd);
+        return -errno;
+    }
+
+    dev_name = strdup(name);
+    if (!dev_name) {
+        close(fd);
+        return -ENOMEM;
+    }
+
+    ret = vduse_dev_init_vqs(dev, num_queues);
+    if (ret) {
+        free(dev_name);
+        close(fd);
+        return ret;
+    }
+
+    dev->name = dev_name;
+    dev->num_queues = num_queues;
+    dev->fd = fd;
+    dev->ops = ops;
+    dev->priv = priv;
+
+    return 0;
+}
+
+static inline bool vduse_name_is_valid(const char *name)
+{
+    return strlen(name) >= VDUSE_NAME_MAX || strstr(name, "..");
+}
+
+VduseDev *vduse_dev_create_by_fd(int fd, uint16_t num_queues,
+                                 const VduseOps *ops, void *priv)
+{
+    VduseDev *dev;
+    int ret;
+
+    if (!ops || !ops->enable_queue || !ops->disable_queue) {
+        fprintf(stderr, "Invalid parameter for vduse\n");
+        return NULL;
+    }
+
+    dev = calloc(sizeof(VduseDev), 1);
+    if (!dev) {
+        fprintf(stderr, "Failed to allocate vduse device\n");
+        return NULL;
+    }
+
+    if (ioctl(fd, VDUSE_DEV_GET_FEATURES, &dev->features)) {
+        fprintf(stderr, "Failed to get features: %s\n", strerror(errno));
+        free(dev);
+        return NULL;
+    }
+
+    ret = vduse_dev_init_vqs(dev, num_queues);
+    if (ret) {
+        fprintf(stderr, "Failed to init vqs\n");
+        free(dev);
+        return NULL;
+    }
+
+    dev->num_queues = num_queues;
+    dev->fd = fd;
+    dev->ops = ops;
+    dev->priv = priv;
+
+    return dev;
+}
+
+VduseDev *vduse_dev_create_by_name(const char *name, uint16_t num_queues,
+                                   const VduseOps *ops, void *priv)
+{
+    VduseDev *dev;
+    int ret;
+
+    if (!name || vduse_name_is_valid(name) || !ops ||
+        !ops->enable_queue || !ops->disable_queue) {
+        fprintf(stderr, "Invalid parameter for vduse\n");
+        return NULL;
+    }
+
+    dev = calloc(sizeof(VduseDev), 1);
+    if (!dev) {
+        fprintf(stderr, "Failed to allocate vduse device\n");
+        return NULL;
+    }
+
+    ret = vduse_dev_init(dev, name, num_queues, ops, priv);
+    if (ret < 0) {
+        fprintf(stderr, "Failed to init vduse device %s: %s\n",
+                name, strerror(ret));
+        free(dev);
+        return NULL;
+    }
+
+    return dev;
+}
+
+VduseDev *vduse_dev_create(const char *name, uint32_t device_id,
+                           uint32_t vendor_id, uint64_t features,
+                           uint16_t num_queues, uint32_t config_size,
+                           char *config, const VduseOps *ops, void *priv)
+{
+    VduseDev *dev;
+    int ret, ctrl_fd;
+    uint64_t version;
+    struct vduse_dev_config *dev_config;
+    size_t size = offsetof(struct vduse_dev_config, config);
+
+    if (!name || vduse_name_is_valid(name) ||
+        !has_feature(features,  VIRTIO_F_VERSION_1) || !config ||
+        !config_size || !ops || !ops->enable_queue || !ops->disable_queue) {
+        fprintf(stderr, "Invalid parameter for vduse\n");
+        return NULL;
+    }
+
+    dev = calloc(sizeof(VduseDev), 1);
+    if (!dev) {
+        fprintf(stderr, "Failed to allocate vduse device\n");
+        return NULL;
+    }
+
+    ctrl_fd = open("/dev/vduse/control", O_RDWR);
+    if (ctrl_fd < 0) {
+        fprintf(stderr, "Failed to open /dev/vduse/control: %s\n",
+                strerror(errno));
+        goto err_ctrl;
+    }
+
+    version = VDUSE_API_VERSION;
+    if (ioctl(ctrl_fd, VDUSE_SET_API_VERSION, &version)) {
+        fprintf(stderr, "Failed to set api version %" PRIu64 ": %s\n",
+                version, strerror(errno));
+        goto err_dev;
+    }
+
+    dev_config = calloc(size + config_size, 1);
+    if (!dev_config) {
+        fprintf(stderr, "Failed to allocate config space\n");
+        goto err_dev;
+    }
+
+    strcpy(dev_config->name, name);
+    dev_config->device_id = device_id;
+    dev_config->vendor_id = vendor_id;
+    dev_config->features = features;
+    dev_config->vq_num = num_queues;
+    dev_config->vq_align = VDUSE_VQ_ALIGN;
+    dev_config->config_size = config_size;
+    memcpy(dev_config->config, config, config_size);
+
+    ret = ioctl(ctrl_fd, VDUSE_CREATE_DEV, dev_config);
+    free(dev_config);
+    if (ret && errno != EEXIST) {
+        fprintf(stderr, "Failed to create vduse device %s: %s\n",
+                name, strerror(errno));
+        goto err_dev;
+    }
+    dev->ctrl_fd = ctrl_fd;
+
+    ret = vduse_dev_init(dev, name, num_queues, ops, priv);
+    if (ret < 0) {
+        fprintf(stderr, "Failed to init vduse device %s: %s\n",
+                name, strerror(ret));
+        goto err;
+    }
+
+    return dev;
+err:
+    ioctl(ctrl_fd, VDUSE_DESTROY_DEV, name);
+err_dev:
+    close(ctrl_fd);
+err_ctrl:
+    free(dev);
+
+    return NULL;
+}
+
+int vduse_dev_destroy(VduseDev *dev)
+{
+    size_t log_size = dev->num_queues * vduse_vq_log_size(VIRTQUEUE_MAX_SIZE);
+    int i, ret = 0;
+
+    if (dev->log) {
+        munmap(dev->log, log_size);
+    }
+    for (i = 0; i < dev->num_queues; i++) {
+        free(dev->vqs[i].resubmit_list);
+    }
+    free(dev->vqs);
+    if (dev->fd >= 0) {
+        close(dev->fd);
+        dev->fd = -1;
+    }
+    if (dev->ctrl_fd >= 0) {
+        if (ioctl(dev->ctrl_fd, VDUSE_DESTROY_DEV, dev->name)) {
+            ret = -errno;
+        }
+        close(dev->ctrl_fd);
+        dev->ctrl_fd = -1;
+    }
+    free(dev->name);
+    free(dev);
+
+    return ret;
+}
diff --git a/subprojects/libvduse/libvduse.h b/subprojects/libvduse/libvduse.h
new file mode 100644
index 0000000000..32f19e7b48
--- /dev/null
+++ b/subprojects/libvduse/libvduse.h
@@ -0,0 +1,247 @@
+/*
+ * VDUSE (vDPA Device in Userspace) library
+ *
+ * Copyright (C) 2022 Bytedance Inc. and/or its affiliates. All rights reserved.
+ *
+ * Author:
+ *   Xie Yongji <xieyongji@bytedance.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or
+ * later.  See the COPYING file in the top-level directory.
+ */
+
+#ifndef LIBVDUSE_H
+#define LIBVDUSE_H
+
+#include <stdint.h>
+#include <sys/uio.h>
+
+#define VIRTQUEUE_MAX_SIZE 1024
+
+/* VDUSE device structure */
+typedef struct VduseDev VduseDev;
+
+/* Virtqueue structure */
+typedef struct VduseVirtq VduseVirtq;
+
+/* Some operation of VDUSE backend */
+typedef struct VduseOps {
+    /* Called when virtqueue can be processed */
+    void (*enable_queue)(VduseDev *dev, VduseVirtq *vq);
+    /* Called when virtqueue processing should be stopped */
+    void (*disable_queue)(VduseDev *dev, VduseVirtq *vq);
+} VduseOps;
+
+/* Describing elements of the I/O buffer */
+typedef struct VduseVirtqElement {
+    /* Descriptor table index */
+    unsigned int index;
+    /* Number of physically-contiguous device-readable descriptors */
+    unsigned int out_num;
+    /* Number of physically-contiguous device-writable descriptors */
+    unsigned int in_num;
+    /* Array to store physically-contiguous device-writable descriptors */
+    struct iovec *in_sg;
+    /* Array to store physically-contiguous device-readable descriptors */
+    struct iovec *out_sg;
+} VduseVirtqElement;
+
+
+/**
+ * vduse_get_virtio_features:
+ *
+ * Get supported virtio features
+ *
+ * Returns: supported feature bits
+ */
+uint64_t vduse_get_virtio_features(void);
+
+/**
+ * vduse_queue_get_dev:
+ * @vq: specified virtqueue
+ *
+ * Get corresponding VDUSE device from the virtqueue.
+ *
+ * Returns: a pointer to VDUSE device on success, NULL on failure.
+ */
+VduseDev *vduse_queue_get_dev(VduseVirtq *vq);
+
+/**
+ * vduse_queue_get_fd:
+ * @vq: specified virtqueue
+ *
+ * Get the kick fd for the virtqueue.
+ *
+ * Returns: file descriptor on success, -1 on failure.
+ */
+int vduse_queue_get_fd(VduseVirtq *vq);
+
+/**
+ * vduse_queue_pop:
+ * @vq: specified virtqueue
+ * @sz: the size of struct to return (must be >= VduseVirtqElement)
+ *
+ * Pop an element from virtqueue available ring.
+ *
+ * Returns: a pointer to a structure containing VduseVirtqElement on success,
+ * NULL on failure.
+ */
+void *vduse_queue_pop(VduseVirtq *vq, size_t sz);
+
+/**
+ * vduse_queue_push:
+ * @vq: specified virtqueue
+ * @elem: pointer to VduseVirtqElement returned by vduse_queue_pop()
+ * @len: length in bytes to write
+ *
+ * Push an element to virtqueue used ring.
+ */
+void vduse_queue_push(VduseVirtq *vq, const VduseVirtqElement *elem,
+                      unsigned int len);
+/**
+ * vduse_queue_notify:
+ * @vq: specified virtqueue
+ *
+ * Request to notify the queue.
+ */
+void vduse_queue_notify(VduseVirtq *vq);
+
+/**
+ * vduse_dev_get_priv:
+ * @dev: VDUSE device
+ *
+ * Get the private pointer passed to vduse_dev_create().
+ *
+ * Returns: private pointer on success, NULL on failure.
+ */
+void *vduse_dev_get_priv(VduseDev *dev);
+
+/**
+ * vduse_dev_get_queue:
+ * @dev: VDUSE device
+ * @index: virtqueue index
+ *
+ * Get the specified virtqueue.
+ *
+ * Returns: a pointer to the virtqueue on success, NULL on failure.
+ */
+VduseVirtq *vduse_dev_get_queue(VduseDev *dev, int index);
+
+/**
+ * vduse_dev_get_fd:
+ * @dev: VDUSE device
+ *
+ * Get the control message fd for the VDUSE device.
+ *
+ * Returns: file descriptor on success, -1 on failure.
+ */
+int vduse_dev_get_fd(VduseDev *dev);
+
+/**
+ * vduse_dev_handler:
+ * @dev: VDUSE device
+ *
+ * Used to process the control message.
+ *
+ * Returns: file descriptor on success, -errno on failure.
+ */
+int vduse_dev_handler(VduseDev *dev);
+
+/**
+ * vduse_dev_update_config:
+ * @dev: VDUSE device
+ * @size: the size to write to configuration space
+ * @offset: the offset from the beginning of configuration space
+ * @buffer: the buffer used to write from
+ *
+ * Update device configuration space and inject a config interrupt.
+ *
+ * Returns: 0 on success, -errno on failure.
+ */
+int vduse_dev_update_config(VduseDev *dev, uint32_t size,
+                            uint32_t offset, char *buffer);
+
+/**
+ * vduse_dev_setup_queue:
+ * @dev: VDUSE device
+ * @index: virtqueue index
+ * @max_size: the max size of virtqueue
+ *
+ * Setup the specified virtqueue.
+ *
+ * Returns: 0 on success, -errno on failure.
+ */
+int vduse_dev_setup_queue(VduseDev *dev, int index, int max_size);
+
+/**
+ * vduse_set_reconnect_log_file:
+ * @dev: VDUSE device
+ * @file: filename of reconnect log
+ *
+ * Specify the file to store log for reconnecting. It should
+ * be called before vduse_dev_setup_queue().
+ *
+ * Returns: 0 on success, -errno on failure.
+ */
+int vduse_set_reconnect_log_file(VduseDev *dev, const char *filename);
+
+/**
+ * vduse_dev_create_by_fd:
+ * @fd: passed file descriptor
+ * @num_queues: the number of virtqueues
+ * @ops: the operation of VDUSE backend
+ * @priv: private pointer
+ *
+ * Create VDUSE device from a passed file descriptor.
+ *
+ * Returns: pointer to VDUSE device on success, NULL on failure.
+ */
+VduseDev *vduse_dev_create_by_fd(int fd, uint16_t num_queues,
+                                 const VduseOps *ops, void *priv);
+
+/**
+ * vduse_dev_create_by_name:
+ * @name: VDUSE device name
+ * @num_queues: the number of virtqueues
+ * @ops: the operation of VDUSE backend
+ * @priv: private pointer
+ *
+ * Create VDUSE device on /dev/vduse/$NAME.
+ *
+ * Returns: pointer to VDUSE device on success, NULL on failure.
+ */
+VduseDev *vduse_dev_create_by_name(const char *name, uint16_t num_queues,
+                                   const VduseOps *ops, void *priv);
+
+/**
+ * vduse_dev_create:
+ * @name: VDUSE device name
+ * @device_id: virtio device id
+ * @vendor_id: virtio vendor id
+ * @features: virtio features
+ * @num_queues: the number of virtqueues
+ * @config_size: the size of the configuration space
+ * @config: the buffer of the configuration space
+ * @ops: the operation of VDUSE backend
+ * @priv: private pointer
+ *
+ * Create VDUSE device.
+ *
+ * Returns: pointer to VDUSE device on success, NULL on failure.
+ */
+VduseDev *vduse_dev_create(const char *name, uint32_t device_id,
+                           uint32_t vendor_id, uint64_t features,
+                           uint16_t num_queues, uint32_t config_size,
+                           char *config, const VduseOps *ops, void *priv);
+
+/**
+ * vduse_dev_destroy:
+ * @dev: VDUSE device
+ *
+ * Destroy the VDUSE device.
+ *
+ * Returns: 0 on success, -errno on failure.
+ */
+int vduse_dev_destroy(VduseDev *dev);
+
+#endif
diff --git a/subprojects/libvduse/linux-headers/linux b/subprojects/libvduse/linux-headers/linux
new file mode 120000
index 0000000000..04f3304f79
--- /dev/null
+++ b/subprojects/libvduse/linux-headers/linux
@@ -0,0 +1 @@
+../../../linux-headers/linux/
\ No newline at end of file
diff --git a/subprojects/libvduse/meson.build b/subprojects/libvduse/meson.build
new file mode 100644
index 0000000000..ba08f5ee1a
--- /dev/null
+++ b/subprojects/libvduse/meson.build
@@ -0,0 +1,10 @@
+project('libvduse', 'c',
+        license: 'GPL-2.0-or-later',
+        default_options: ['c_std=gnu99'])
+
+libvduse = static_library('vduse',
+                          files('libvduse.c'),
+                          c_args: '-D_GNU_SOURCE')
+
+libvduse_dep = declare_dependency(link_with: libvduse,
+                                  include_directories: include_directories('.'))
diff --git a/subprojects/libvduse/standard-headers/linux b/subprojects/libvduse/standard-headers/linux
new file mode 120000
index 0000000000..c416f068ac
--- /dev/null
+++ b/subprojects/libvduse/standard-headers/linux
@@ -0,0 +1 @@
+../../../include/standard-headers/linux/
\ No newline at end of file
diff --git a/util/aio-wait.c b/util/aio-wait.c
index bdb3d3af22..98c5accd29 100644
--- a/util/aio-wait.c
+++ b/util/aio-wait.c
@@ -35,7 +35,21 @@ static void dummy_bh_cb(void *opaque)
 
 void aio_wait_kick(void)
 {
-    /* The barrier (or an atomic op) is in the caller.  */
+    /*
+     * Paired with smp_mb in AIO_WAIT_WHILE. Here we have:
+     * write(condition);
+     * aio_wait_kick() {
+     *      smp_mb();
+     *      read(num_waiters);
+     * }
+     *
+     * And in AIO_WAIT_WHILE:
+     * write(num_waiters);
+     * smp_mb();
+     * read(condition);
+     */
+    smp_mb();
+
     if (qatomic_read(&global_aio_wait.num_waiters)) {
         aio_bh_schedule_oneshot(qemu_get_aio_context(), dummy_bh_cb, NULL);
     }
diff --git a/util/hbitmap.c b/util/hbitmap.c
index ea989e1f0e..297db35fb1 100644
--- a/util/hbitmap.c
+++ b/util/hbitmap.c
@@ -873,11 +873,6 @@ void hbitmap_truncate(HBitmap *hb, uint64_t size)
     }
 }
 
-bool hbitmap_can_merge(const HBitmap *a, const HBitmap *b)
-{
-    return (a->orig_size == b->orig_size);
-}
-
 /**
  * hbitmap_sparse_merge: performs dst = dst | src
  * works with differing granularities.
@@ -901,28 +896,24 @@ static void hbitmap_sparse_merge(HBitmap *dst, const HBitmap *src)
  * Given HBitmaps A and B, let R := A (BITOR) B.
  * Bitmaps A and B will not be modified,
  *     except when bitmap R is an alias of A or B.
- *
- * @return true if the merge was successful,
- *         false if it was not attempted.
+ * Bitmaps must have same size.
  */
-bool hbitmap_merge(const HBitmap *a, const HBitmap *b, HBitmap *result)
+void hbitmap_merge(const HBitmap *a, const HBitmap *b, HBitmap *result)
 {
     int i;
     uint64_t j;
 
-    if (!hbitmap_can_merge(a, b) || !hbitmap_can_merge(a, result)) {
-        return false;
-    }
-    assert(hbitmap_can_merge(b, result));
+    assert(a->orig_size == result->orig_size);
+    assert(b->orig_size == result->orig_size);
 
     if ((!hbitmap_count(a) && result == b) ||
         (!hbitmap_count(b) && result == a)) {
-        return true;
+        return;
     }
 
     if (!hbitmap_count(a) && !hbitmap_count(b)) {
         hbitmap_reset_all(result);
-        return true;
+        return;
     }
 
     if (a->granularity != b->granularity) {
@@ -935,7 +926,7 @@ bool hbitmap_merge(const HBitmap *a, const HBitmap *b, HBitmap *result)
         if (b != result) {
             hbitmap_sparse_merge(result, b);
         }
-        return true;
+        return;
     }
 
     /* This merge is O(size), as BITS_PER_LONG and HBITMAP_LEVELS are constant.
@@ -951,8 +942,6 @@ bool hbitmap_merge(const HBitmap *a, const HBitmap *b, HBitmap *result)
 
     /* Recompute the dirty count */
     result->count = hb_count_between(result, 0, result->size - 1);
-
-    return true;
 }
 
 char *hbitmap_sha256(const HBitmap *bitmap, Error **errp)