summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--Makefile.objs1
-rw-r--r--async.c18
-rw-r--r--block.c61
-rw-r--r--block/blkverify.c17
-rw-r--r--block/qcow2-cluster.c4
-rw-r--r--block/qcow2-refcount.c3
-rw-r--r--block/qcow2.c17
-rw-r--r--block/qed.c3
-rw-r--r--block/quorum.c3
-rw-r--r--block/raw-posix.c47
-rw-r--r--block/raw-win32.c10
-rw-r--r--hw/block/dataplane/virtio-blk.c96
-rw-r--r--hw/core/qdev-properties-system.c70
-rw-r--r--include/block/aio.h18
-rw-r--r--include/block/block.h9
-rw-r--r--include/block/block_int.h8
-rw-r--r--include/hw/qdev-properties.h3
-rw-r--r--include/hw/virtio/virtio-blk.h8
-rw-r--r--include/qemu-io.h2
-rw-r--r--include/qemu/rfifolock.h54
-rw-r--r--include/qom/object.h8
-rw-r--r--include/sysemu/iothread.h30
-rw-r--r--iothread.c178
-rw-r--r--qapi-schema.json29
-rw-r--r--qemu-io-cmds.c2
-rw-r--r--qemu-io.c7
-rw-r--r--qmp-commands.hx39
-rw-r--r--qom/object.c54
-rw-r--r--tests/Makefile2
-rwxr-xr-xtests/qemu-iotests/06026
-rw-r--r--tests/qemu-iotests/060.out15
-rw-r--r--tests/test-aio.c59
-rw-r--r--tests/test-rfifolock.c91
-rw-r--r--util/Makefile.objs1
-rw-r--r--util/rfifolock.c78
35 files changed, 938 insertions, 133 deletions
diff --git a/Makefile.objs b/Makefile.objs
index 5cd3d816ff..a6e0e2aacc 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -39,6 +39,7 @@ libcacard-y += libcacard/vcardt.o
 
 ifeq ($(CONFIG_SOFTMMU),y)
 common-obj-y = blockdev.o blockdev-nbd.o block/
+common-obj-y += iothread.o
 common-obj-y += net/
 common-obj-y += qdev-monitor.o device-hotplug.o
 common-obj-$(CONFIG_WIN32) += os-win32.o
diff --git a/async.c b/async.c
index 5fb3fa61df..6930185e64 100644
--- a/async.c
+++ b/async.c
@@ -214,6 +214,7 @@ aio_ctx_finalize(GSource     *source)
     thread_pool_free(ctx->thread_pool);
     aio_set_event_notifier(ctx, &ctx->notifier, NULL);
     event_notifier_cleanup(&ctx->notifier);
+    rfifolock_destroy(&ctx->lock);
     qemu_mutex_destroy(&ctx->bh_lock);
     g_array_free(ctx->pollfds, TRUE);
     timerlistgroup_deinit(&ctx->tlg);
@@ -250,6 +251,12 @@ static void aio_timerlist_notify(void *opaque)
     aio_notify(opaque);
 }
 
+static void aio_rfifolock_cb(void *opaque)
+{
+    /* Kick owner thread in case they are blocked in aio_poll() */
+    aio_notify(opaque);
+}
+
 AioContext *aio_context_new(void)
 {
     AioContext *ctx;
@@ -257,6 +264,7 @@ AioContext *aio_context_new(void)
     ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
     ctx->thread_pool = NULL;
     qemu_mutex_init(&ctx->bh_lock);
+    rfifolock_init(&ctx->lock, aio_rfifolock_cb, ctx);
     event_notifier_init(&ctx->notifier, false);
     aio_set_event_notifier(ctx, &ctx->notifier, 
                            (EventNotifierHandler *)
@@ -275,3 +283,13 @@ void aio_context_unref(AioContext *ctx)
 {
     g_source_unref(&ctx->source);
 }
+
+void aio_context_acquire(AioContext *ctx)
+{
+    rfifolock_lock(&ctx->lock);
+}
+
+void aio_context_release(AioContext *ctx)
+{
+    rfifolock_unlock(&ctx->lock);
+}
diff --git a/block.c b/block.c
index f1ef4b0109..fae50c95b5 100644
--- a/block.c
+++ b/block.c
@@ -1321,7 +1321,7 @@ int bdrv_open(BlockDriverState **pbs, const char *filename,
                           bdrv_open_flags(bs, flags | BDRV_O_UNMAP) |
                           BDRV_O_PROTOCOL, true, &local_err);
     if (ret < 0) {
-        goto fail;
+        goto unlink_and_fail;
     }
 
     /* Find the right image format driver */
@@ -4055,7 +4055,7 @@ int bdrv_debug_remove_breakpoint(BlockDriverState *bs, const char *tag)
 
 int bdrv_debug_resume(BlockDriverState *bs, const char *tag)
 {
-    while (bs && bs->drv && !bs->drv->bdrv_debug_resume) {
+    while (bs && (!bs->drv || !bs->drv->bdrv_debug_resume)) {
         bs = bs->file;
     }
 
@@ -4776,9 +4776,17 @@ flush_parent:
 
 void bdrv_invalidate_cache(BlockDriverState *bs)
 {
-    if (bs->drv && bs->drv->bdrv_invalidate_cache) {
+    if (!bs->drv)  {
+        return;
+    }
+
+    if (bs->drv->bdrv_invalidate_cache) {
         bs->drv->bdrv_invalidate_cache(bs);
+    } else if (bs->file) {
+        bdrv_invalidate_cache(bs->file);
     }
+
+    refresh_total_sectors(bs, bs->total_sectors);
 }
 
 void bdrv_invalidate_cache_all(void)
@@ -5390,43 +5398,37 @@ int bdrv_amend_options(BlockDriverState *bs, QEMUOptionParameter *options)
     return bs->drv->bdrv_amend_options(bs, options);
 }
 
-/* Used to recurse on single child block filters.
- * Single child block filter will store their child in bs->file.
+/* This function will be called by the bdrv_recurse_is_first_non_filter method
+ * of block filter and by bdrv_is_first_non_filter.
+ * It is used to test if the given bs is the candidate or recurse more in the
+ * node graph.
  */
-bool bdrv_generic_is_first_non_filter(BlockDriverState *bs,
+bool bdrv_recurse_is_first_non_filter(BlockDriverState *bs,
                                       BlockDriverState *candidate)
 {
-    if (!bs->drv) {
-        return false;
-    }
-
-    if (!bs->drv->authorizations[BS_IS_A_FILTER]) {
-        if (bs == candidate) {
-            return true;
-        } else {
-            return false;
-        }
-    }
-
-    if (!bs->drv->authorizations[BS_FILTER_PASS_DOWN]) {
+    /* return false if basic checks fails */
+    if (!bs || !bs->drv) {
         return false;
     }
 
-    if (!bs->file) {
-        return false;
+    /* the code reached a non block filter driver -> check if the bs is
+     * the same as the candidate. It's the recursion termination condition.
+     */
+    if (!bs->drv->is_filter) {
+        return bs == candidate;
     }
+    /* Down this path the driver is a block filter driver */
 
-    return bdrv_recurse_is_first_non_filter(bs->file, candidate);
-}
-
-bool bdrv_recurse_is_first_non_filter(BlockDriverState *bs,
-                                      BlockDriverState *candidate)
-{
-    if (bs->drv && bs->drv->bdrv_recurse_is_first_non_filter) {
+    /* If the block filter recursion method is defined use it to recurse down
+     * the node graph.
+     */
+    if (bs->drv->bdrv_recurse_is_first_non_filter) {
         return bs->drv->bdrv_recurse_is_first_non_filter(bs, candidate);
     }
 
-    return bdrv_generic_is_first_non_filter(bs, candidate);
+    /* the driver is a block filter but don't allow to recurse -> return false
+     */
+    return false;
 }
 
 /* This function checks if the candidate is the first non filter bs down it's
@@ -5441,6 +5443,7 @@ bool bdrv_is_first_non_filter(BlockDriverState *candidate)
     QTAILQ_FOREACH(bs, &bdrv_states, device_list) {
         bool perm;
 
+        /* try to recurse in this top level bs */
         perm = bdrv_recurse_is_first_non_filter(bs, candidate);
 
         /* candidate is the first non filter */
diff --git a/block/blkverify.c b/block/blkverify.c
index b98b08bedf..e1c31171c3 100644
--- a/block/blkverify.c
+++ b/block/blkverify.c
@@ -288,6 +288,20 @@ static BlockDriverAIOCB *blkverify_aio_flush(BlockDriverState *bs,
     return bdrv_aio_flush(s->test_file, cb, opaque);
 }
 
+static bool blkverify_recurse_is_first_non_filter(BlockDriverState *bs,
+                                                  BlockDriverState *candidate)
+{
+    BDRVBlkverifyState *s = bs->opaque;
+
+    bool perm = bdrv_recurse_is_first_non_filter(bs->file, candidate);
+
+    if (perm) {
+        return true;
+    }
+
+    return bdrv_recurse_is_first_non_filter(s->test_file, candidate);
+}
+
 static BlockDriver bdrv_blkverify = {
     .format_name            = "blkverify",
     .protocol_name          = "blkverify",
@@ -302,7 +316,8 @@ static BlockDriver bdrv_blkverify = {
     .bdrv_aio_writev        = blkverify_aio_writev,
     .bdrv_aio_flush         = blkverify_aio_flush,
 
-    .authorizations         = { true, false },
+    .is_filter              = true,
+    .bdrv_recurse_is_first_non_filter = blkverify_recurse_is_first_non_filter,
 };
 
 static void bdrv_blkverify_init(void)
diff --git a/block/qcow2-cluster.c b/block/qcow2-cluster.c
index 36c1bed350..9499df9ef2 100644
--- a/block/qcow2-cluster.c
+++ b/block/qcow2-cluster.c
@@ -380,6 +380,10 @@ static int coroutine_fn copy_sectors(BlockDriverState *bs,
 
     BLKDBG_EVENT(bs->file, BLKDBG_COW_READ);
 
+    if (!bs->drv) {
+        return -ENOMEDIUM;
+    }
+
     /* Call .bdrv_co_readv() directly instead of using the public block-layer
      * interface.  This avoids double I/O throttling and request tracking,
      * which can lead to deadlock when block layer copy-on-read is enabled.
diff --git a/block/qcow2-refcount.c b/block/qcow2-refcount.c
index 8712d8bd54..6151148507 100644
--- a/block/qcow2-refcount.c
+++ b/block/qcow2-refcount.c
@@ -96,7 +96,8 @@ static int get_refcount(BlockDriverState *bs, int64_t cluster_index)
     refcount_table_index = cluster_index >> (s->cluster_bits - REFCOUNT_SHIFT);
     if (refcount_table_index >= s->refcount_table_size)
         return 0;
-    refcount_block_offset = s->refcount_table[refcount_table_index];
+    refcount_block_offset =
+        s->refcount_table[refcount_table_index] & REFT_OFFSET_MASK;
     if (!refcount_block_offset)
         return 0;
 
diff --git a/block/qcow2.c b/block/qcow2.c
index cfe80befa0..945c9d6334 100644
--- a/block/qcow2.c
+++ b/block/qcow2.c
@@ -644,7 +644,7 @@ static int qcow2_open(BlockDriverState *bs, QDict *options, int flags,
     }
 
     /* Clear unknown autoclear feature bits */
-    if (!bs->read_only && s->autoclear_features != 0) {
+    if (!bs->read_only && !(flags & BDRV_O_INCOMING) && s->autoclear_features) {
         s->autoclear_features = 0;
         ret = qcow2_update_header(bs);
         if (ret < 0) {
@@ -657,7 +657,7 @@ static int qcow2_open(BlockDriverState *bs, QDict *options, int flags,
     qemu_co_mutex_init(&s->lock);
 
     /* Repair image if dirty */
-    if (!(flags & BDRV_O_CHECK) && !bs->read_only &&
+    if (!(flags & (BDRV_O_CHECK | BDRV_O_INCOMING)) && !bs->read_only &&
         (s->incompatible_features & QCOW2_INCOMPAT_DIRTY)) {
         BdrvCheckResult result = {0};
 
@@ -1137,10 +1137,12 @@ static void qcow2_close(BlockDriverState *bs)
     /* else pre-write overlap checks in cache_destroy may crash */
     s->l1_table = NULL;
 
-    qcow2_cache_flush(bs, s->l2_table_cache);
-    qcow2_cache_flush(bs, s->refcount_block_cache);
+    if (!(bs->open_flags & BDRV_O_INCOMING)) {
+        qcow2_cache_flush(bs, s->l2_table_cache);
+        qcow2_cache_flush(bs, s->refcount_block_cache);
 
-    qcow2_mark_clean(bs);
+        qcow2_mark_clean(bs);
+    }
 
     qcow2_cache_destroy(bs, s->l2_table_cache);
     qcow2_cache_destroy(bs, s->refcount_block_cache);
@@ -1176,11 +1178,10 @@ static void qcow2_invalidate_cache(BlockDriverState *bs)
 
     qcow2_close(bs);
 
-    options = qdict_new();
-    qdict_put(options, QCOW2_OPT_LAZY_REFCOUNTS,
-              qbool_from_int(s->use_lazy_refcounts));
+    bdrv_invalidate_cache(bs->file);
 
     memset(s, 0, sizeof(BDRVQcowState));
+    options = qdict_clone_shallow(bs->options);
     qcow2_open(bs, options, flags, NULL);
 
     QDECREF(options);
diff --git a/block/qed.c b/block/qed.c
index 8802ad3845..837accd39b 100644
--- a/block/qed.c
+++ b/block/qed.c
@@ -1563,6 +1563,9 @@ static void bdrv_qed_invalidate_cache(BlockDriverState *bs)
     BDRVQEDState *s = bs->opaque;
 
     bdrv_qed_close(bs);
+
+    bdrv_invalidate_cache(bs->file);
+
     memset(s, 0, sizeof(BDRVQEDState));
     bdrv_qed_open(bs, NULL, bs->open_flags, NULL);
 }
diff --git a/block/quorum.c b/block/quorum.c
index bd997b7322..33bf2ae6a7 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -852,8 +852,6 @@ static BlockDriver bdrv_quorum = {
     .bdrv_file_open     = quorum_open,
     .bdrv_close         = quorum_close,
 
-    .authorizations     = { true, true },
-
     .bdrv_co_flush_to_disk = quorum_co_flush,
 
     .bdrv_getlength     = quorum_getlength,
@@ -862,6 +860,7 @@ static BlockDriver bdrv_quorum = {
     .bdrv_aio_writev    = quorum_aio_writev,
     .bdrv_invalidate_cache = quorum_invalidate_cache,
 
+    .is_filter           = true,
     .bdrv_recurse_is_first_non_filter = quorum_recurse_is_first_non_filter,
 };
 
diff --git a/block/raw-posix.c b/block/raw-posix.c
index e6b4c1fe02..1688e16c64 100644
--- a/block/raw-posix.c
+++ b/block/raw-posix.c
@@ -1561,6 +1561,15 @@ static int check_hdev_writable(BDRVRawState *s)
     return 0;
 }
 
+static void hdev_parse_filename(const char *filename, QDict *options,
+                                Error **errp)
+{
+    /* The prefix is optional, just as for "file". */
+    strstart(filename, "host_device:", &filename);
+
+    qdict_put_obj(options, "filename", QOBJECT(qstring_from_str(filename)));
+}
+
 static int hdev_open(BlockDriverState *bs, QDict *options, int flags,
                      Error **errp)
 {
@@ -1767,6 +1776,18 @@ static int hdev_create(const char *filename, QEMUOptionParameter *options,
     int ret = 0;
     struct stat stat_buf;
     int64_t total_size = 0;
+    bool has_prefix;
+
+    /* This function is used by all three protocol block drivers and therefore
+     * any of these three prefixes may be given.
+     * The return value has to be stored somewhere, otherwise this is an error
+     * due to -Werror=unused-value. */
+    has_prefix =
+        strstart(filename, "host_device:", &filename) ||
+        strstart(filename, "host_cdrom:" , &filename) ||
+        strstart(filename, "host_floppy:", &filename);
+
+    (void)has_prefix;
 
     /* Read out options */
     while (options && options->name) {
@@ -1805,6 +1826,7 @@ static BlockDriver bdrv_host_device = {
     .instance_size      = sizeof(BDRVRawState),
     .bdrv_needs_filename = true,
     .bdrv_probe_device  = hdev_probe_device,
+    .bdrv_parse_filename = hdev_parse_filename,
     .bdrv_file_open     = hdev_open,
     .bdrv_close         = raw_close,
     .bdrv_reopen_prepare = raw_reopen_prepare,
@@ -1834,6 +1856,15 @@ static BlockDriver bdrv_host_device = {
 };
 
 #ifdef __linux__
+static void floppy_parse_filename(const char *filename, QDict *options,
+                                  Error **errp)
+{
+    /* The prefix is optional, just as for "file". */
+    strstart(filename, "host_floppy:", &filename);
+
+    qdict_put_obj(options, "filename", QOBJECT(qstring_from_str(filename)));
+}
+
 static int floppy_open(BlockDriverState *bs, QDict *options, int flags,
                        Error **errp)
 {
@@ -1939,6 +1970,7 @@ static BlockDriver bdrv_host_floppy = {
     .instance_size      = sizeof(BDRVRawState),
     .bdrv_needs_filename = true,
     .bdrv_probe_device	= floppy_probe_device,
+    .bdrv_parse_filename = floppy_parse_filename,
     .bdrv_file_open     = floppy_open,
     .bdrv_close         = raw_close,
     .bdrv_reopen_prepare = raw_reopen_prepare,
@@ -1963,7 +1995,20 @@ static BlockDriver bdrv_host_floppy = {
     .bdrv_media_changed = floppy_media_changed,
     .bdrv_eject         = floppy_eject,
 };
+#endif
+
+#if defined(__linux__) || defined(__FreeBSD__) || defined(__FreeBSD_kernel__)
+static void cdrom_parse_filename(const char *filename, QDict *options,
+                                 Error **errp)
+{
+    /* The prefix is optional, just as for "file". */
+    strstart(filename, "host_cdrom:", &filename);
 
+    qdict_put_obj(options, "filename", QOBJECT(qstring_from_str(filename)));
+}
+#endif
+
+#ifdef __linux__
 static int cdrom_open(BlockDriverState *bs, QDict *options, int flags,
                       Error **errp)
 {
@@ -2050,6 +2095,7 @@ static BlockDriver bdrv_host_cdrom = {
     .instance_size      = sizeof(BDRVRawState),
     .bdrv_needs_filename = true,
     .bdrv_probe_device	= cdrom_probe_device,
+    .bdrv_parse_filename = cdrom_parse_filename,
     .bdrv_file_open     = cdrom_open,
     .bdrv_close         = raw_close,
     .bdrv_reopen_prepare = raw_reopen_prepare,
@@ -2180,6 +2226,7 @@ static BlockDriver bdrv_host_cdrom = {
     .instance_size      = sizeof(BDRVRawState),
     .bdrv_needs_filename = true,
     .bdrv_probe_device	= cdrom_probe_device,
+    .bdrv_parse_filename = cdrom_parse_filename,
     .bdrv_file_open     = cdrom_open,
     .bdrv_close         = raw_close,
     .bdrv_reopen_prepare = raw_reopen_prepare,
diff --git a/block/raw-win32.c b/block/raw-win32.c
index 99547488e4..48cb2c2258 100644
--- a/block/raw-win32.c
+++ b/block/raw-win32.c
@@ -593,6 +593,15 @@ static int hdev_probe_device(const char *filename)
     return 0;
 }
 
+static void hdev_parse_filename(const char *filename, QDict *options,
+                                Error **errp)
+{
+    /* The prefix is optional, just as for "file". */
+    strstart(filename, "host_device:", &filename);
+
+    qdict_put_obj(options, "filename", QOBJECT(qstring_from_str(filename)));
+}
+
 static int hdev_open(BlockDriverState *bs, QDict *options, int flags,
                      Error **errp)
 {
@@ -663,6 +672,7 @@ static BlockDriver bdrv_host_device = {
     .protocol_name	= "host_device",
     .instance_size	= sizeof(BDRVRawState),
     .bdrv_needs_filename = true,
+    .bdrv_parse_filename = hdev_parse_filename,
     .bdrv_probe_device	= hdev_probe_device,
     .bdrv_file_open	= hdev_open,
     .bdrv_close		= raw_close,
diff --git a/hw/block/dataplane/virtio-blk.c b/hw/block/dataplane/virtio-blk.c
index d1c7ad4574..a5afc217c0 100644
--- a/hw/block/dataplane/virtio-blk.c
+++ b/hw/block/dataplane/virtio-blk.c
@@ -23,6 +23,7 @@
 #include "virtio-blk.h"
 #include "block/aio.h"
 #include "hw/virtio/virtio-bus.h"
+#include "monitor/monitor.h" /* for object_add() */
 
 enum {
     SEG_MAX = 126,                  /* maximum number of I/O segments */
@@ -44,8 +45,6 @@ struct VirtIOBlockDataPlane {
     bool started;
     bool starting;
     bool stopping;
-    QEMUBH *start_bh;
-    QemuThread thread;
 
     VirtIOBlkConf *blk;
     int fd;                         /* image file descriptor */
@@ -59,12 +58,14 @@ struct VirtIOBlockDataPlane {
      * (because you don't own the file descriptor or handle; you just
      * use it).
      */
+    IOThread *iothread;
+    bool internal_iothread;
     AioContext *ctx;
     EventNotifier io_notifier;      /* Linux AIO completion */
     EventNotifier host_notifier;    /* doorbell */
 
     IOQueue ioqueue;                /* Linux AIO queue (should really be per
-                                       dataplane thread) */
+                                       IOThread) */
     VirtIOBlockRequest requests[REQ_MAX]; /* pool of requests, managed by the
                                              queue */
 
@@ -342,26 +343,7 @@ static void handle_io(EventNotifier *e)
     }
 }
 
-static void *data_plane_thread(void *opaque)
-{
-    VirtIOBlockDataPlane *s = opaque;
-
-    while (!s->stopping || s->num_reqs > 0) {
-        aio_poll(s->ctx, true);
-    }
-    return NULL;
-}
-
-static void start_data_plane_bh(void *opaque)
-{
-    VirtIOBlockDataPlane *s = opaque;
-
-    qemu_bh_delete(s->start_bh);
-    s->start_bh = NULL;
-    qemu_thread_create(&s->thread, "data_plane", data_plane_thread,
-                       s, QEMU_THREAD_JOINABLE);
-}
-
+/* Context: QEMU global mutex held */
 void virtio_blk_data_plane_create(VirtIODevice *vdev, VirtIOBlkConf *blk,
                                   VirtIOBlockDataPlane **dataplane,
                                   Error **errp)
@@ -408,12 +390,33 @@ void virtio_blk_data_plane_create(VirtIODevice *vdev, VirtIOBlkConf *blk,
     s->fd = fd;
     s->blk = blk;
 
+    if (blk->iothread) {
+        s->internal_iothread = false;
+        s->iothread = blk->iothread;
+        object_ref(OBJECT(s->iothread));
+    } else {
+        /* Create per-device IOThread if none specified */
+        Error *local_err = NULL;
+
+        s->internal_iothread = true;
+        object_add(TYPE_IOTHREAD, vdev->name, NULL, NULL, &local_err);
+        if (error_is_set(&local_err)) {
+            error_propagate(errp, local_err);
+            g_free(s);
+            return;
+        }
+        s->iothread = iothread_find(vdev->name);
+        assert(s->iothread);
+    }
+    s->ctx = iothread_get_aio_context(s->iothread);
+
     /* Prevent block operations that conflict with data plane thread */
     bdrv_set_in_use(blk->conf.bs, 1);
 
     *dataplane = s;
 }
 
+/* Context: QEMU global mutex held */
 void virtio_blk_data_plane_destroy(VirtIOBlockDataPlane *s)
 {
     if (!s) {
@@ -422,9 +425,14 @@ void virtio_blk_data_plane_destroy(VirtIOBlockDataPlane *s)
 
     virtio_blk_data_plane_stop(s);
     bdrv_set_in_use(s->blk->conf.bs, 0);
+    object_unref(OBJECT(s->iothread));
+    if (s->internal_iothread) {
+        object_unparent(OBJECT(s->iothread));
+    }
     g_free(s);
 }
 
+/* Context: QEMU global mutex held */
 void virtio_blk_data_plane_start(VirtIOBlockDataPlane *s)
 {
     BusState *qbus = BUS(qdev_get_parent_bus(DEVICE(s->vdev)));
@@ -448,8 +456,6 @@ void virtio_blk_data_plane_start(VirtIOBlockDataPlane *s)
         return;
     }
 
-    s->ctx = aio_context_new();
-
     /* Set up guest notifier (irq) */
     if (k->set_guest_notifiers(qbus->parent, 1, true) != 0) {
         fprintf(stderr, "virtio-blk failed to set guest notifier, "
@@ -464,7 +470,6 @@ void virtio_blk_data_plane_start(VirtIOBlockDataPlane *s)
         exit(1);
     }
     s->host_notifier = *virtio_queue_get_host_notifier(vq);
-    aio_set_event_notifier(s->ctx, &s->host_notifier, handle_notify);
 
     /* Set up ioqueue */
     ioq_init(&s->ioqueue, s->fd, REQ_MAX);
@@ -472,7 +477,6 @@ void virtio_blk_data_plane_start(VirtIOBlockDataPlane *s)
         ioq_put_iocb(&s->ioqueue, &s->requests[i].iocb);
     }
     s->io_notifier = *ioq_get_notifier(&s->ioqueue);
-    aio_set_event_notifier(s->ctx, &s->io_notifier, handle_io);
 
     s->starting = false;
     s->started = true;
@@ -481,11 +485,14 @@ void virtio_blk_data_plane_start(VirtIOBlockDataPlane *s)
     /* Kick right away to begin processing requests already in vring */
     event_notifier_set(virtio_queue_get_host_notifier(vq));
 
-    /* Spawn thread in BH so it inherits iothread cpusets */
-    s->start_bh = qemu_bh_new(start_data_plane_bh, s);
-    qemu_bh_schedule(s->start_bh);
+    /* Get this show started by hooking up our callbacks */
+    aio_context_acquire(s->ctx);
+    aio_set_event_notifier(s->ctx, &s->host_notifier, handle_notify);
+    aio_set_event_notifier(s->ctx, &s->io_notifier, handle_io);
+    aio_context_release(s->ctx);
 }
 
+/* Context: QEMU global mutex held */
 void virtio_blk_data_plane_stop(VirtIOBlockDataPlane *s)
 {
     BusState *qbus = BUS(qdev_get_parent_bus(DEVICE(s->vdev)));
@@ -496,27 +503,32 @@ void virtio_blk_data_plane_stop(VirtIOBlockDataPlane *s)
     s->stopping = true;
     trace_virtio_blk_data_plane_stop(s);
 
-    /* Stop thread or cancel pending thread creation BH */
-    if (s->start_bh) {
-        qemu_bh_delete(s->start_bh);
-        s->start_bh = NULL;
-    } else {
-        aio_notify(s->ctx);
-        qemu_thread_join(&s->thread);
+    aio_context_acquire(s->ctx);
+
+    /* Stop notifications for new requests from guest */
+    aio_set_event_notifier(s->ctx, &s->host_notifier, NULL);
+
+    /* Complete pending requests */
+    while (s->num_reqs > 0) {
+        aio_poll(s->ctx, true);
     }
 
+    /* Stop ioq callbacks (there are no pending requests left) */
     aio_set_event_notifier(s->ctx, &s->io_notifier, NULL);
-    ioq_cleanup(&s->ioqueue);
 
-    aio_set_event_notifier(s->ctx, &s->host_notifier, NULL);
-    k->set_host_notifier(qbus->parent, 0, false);
+    aio_context_release(s->ctx);
 
-    aio_context_unref(s->ctx);
+    /* Sync vring state back to virtqueue so that non-dataplane request
+     * processing can continue when we disable the host notifier below.
+     */
+    vring_teardown(&s->vring, s->vdev, 0);
+
+    ioq_cleanup(&s->ioqueue);
+    k->set_host_notifier(qbus->parent, 0, false);
 
     /* Clean up guest notifier (irq) */
     k->set_guest_notifiers(qbus->parent, 1, false);
 
-    vring_teardown(&s->vring, s->vdev, 0);
     s->started = false;
     s->stopping = false;
 }
diff --git a/hw/core/qdev-properties-system.c b/hw/core/qdev-properties-system.c
index 5f5957ed8e..de835612f0 100644
--- a/hw/core/qdev-properties-system.c
+++ b/hw/core/qdev-properties-system.c
@@ -18,17 +18,19 @@
 #include "net/hub.h"
 #include "qapi/visitor.h"
 #include "sysemu/char.h"
+#include "sysemu/iothread.h"
 
 static void get_pointer(Object *obj, Visitor *v, Property *prop,
-                        const char *(*print)(void *ptr),
+                        char *(*print)(void *ptr),
                         const char *name, Error **errp)
 {
     DeviceState *dev = DEVICE(obj);
     void **ptr = qdev_get_prop_ptr(dev, prop);
     char *p;
 
-    p = (char *) (*ptr ? print(*ptr) : "");
+    p = *ptr ? print(*ptr) : g_strdup("");
     visit_type_str(v, &p, name, errp);
+    g_free(p);
 }
 
 static void set_pointer(Object *obj, Visitor *v, Property *prop,
@@ -91,9 +93,9 @@ static void release_drive(Object *obj, const char *name, void *opaque)
     }
 }
 
-static const char *print_drive(void *ptr)
+static char *print_drive(void *ptr)
 {
-    return bdrv_get_device_name(ptr);
+    return g_strdup(bdrv_get_device_name(ptr));
 }
 
 static void get_drive(Object *obj, Visitor *v, void *opaque,
@@ -145,11 +147,12 @@ static void release_chr(Object *obj, const char *name, void *opaque)
 }
 
 
-static const char *print_chr(void *ptr)
+static char *print_chr(void *ptr)
 {
     CharDriverState *chr = ptr;
+    const char *val = chr->label ? chr->label : "";
 
-    return chr->label ? chr->label : "";
+    return g_strdup(val);
 }
 
 static void get_chr(Object *obj, Visitor *v, void *opaque,
@@ -224,11 +227,12 @@ err:
     return ret;
 }
 
-static const char *print_netdev(void *ptr)
+static char *print_netdev(void *ptr)
 {
     NetClientState *netdev = ptr;
+    const char *val = netdev->name ? netdev->name : "";
 
-    return netdev->name ? netdev->name : "";
+    return g_strdup(val);
 }
 
 static void get_netdev(Object *obj, Visitor *v, void *opaque,
@@ -382,6 +386,56 @@ void qdev_set_nic_properties(DeviceState *dev, NICInfo *nd)
     nd->instantiated = 1;
 }
 
+/* --- iothread --- */
+
+static char *print_iothread(void *ptr)
+{
+    return iothread_get_id(ptr);
+}
+
+static int parse_iothread(DeviceState *dev, const char *str, void **ptr)
+{
+    IOThread *iothread;
+
+    iothread = iothread_find(str);
+    if (!iothread) {
+        return -ENOENT;
+    }
+    object_ref(OBJECT(iothread));
+    *ptr = iothread;
+    return 0;
+}
+
+static void get_iothread(Object *obj, struct Visitor *v, void *opaque,
+                         const char *name, Error **errp)
+{
+    get_pointer(obj, v, opaque, print_iothread, name, errp);
+}
+
+static void set_iothread(Object *obj, struct Visitor *v, void *opaque,
+                         const char *name, Error **errp)
+{
+    set_pointer(obj, v, opaque, parse_iothread, name, errp);
+}
+
+static void release_iothread(Object *obj, const char *name, void *opaque)
+{
+    DeviceState *dev = DEVICE(obj);
+    Property *prop = opaque;
+    IOThread **ptr = qdev_get_prop_ptr(dev, prop);
+
+    if (*ptr) {
+        object_unref(OBJECT(*ptr));
+    }
+}
+
+PropertyInfo qdev_prop_iothread = {
+    .name = "iothread",
+    .get = get_iothread,
+    .set = set_iothread,
+    .release = release_iothread,
+};
+
 static int qdev_add_one_global(QemuOpts *opts, void *opaque)
 {
     GlobalProperty *g;
diff --git a/include/block/aio.h b/include/block/aio.h
index 2efdf416cf..a92511bd3b 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -19,6 +19,7 @@
 #include "qemu/queue.h"
 #include "qemu/event_notifier.h"
 #include "qemu/thread.h"
+#include "qemu/rfifolock.h"
 #include "qemu/timer.h"
 
 typedef struct BlockDriverAIOCB BlockDriverAIOCB;
@@ -47,6 +48,9 @@ typedef void IOHandler(void *opaque);
 struct AioContext {
     GSource source;
 
+    /* Protects all fields from multi-threaded access */
+    RFifoLock lock;
+
     /* The list of registered AIO handlers */
     QLIST_HEAD(, AioHandler) aio_handlers;
 
@@ -104,6 +108,20 @@ void aio_context_ref(AioContext *ctx);
  */
 void aio_context_unref(AioContext *ctx);
 
+/* Take ownership of the AioContext.  If the AioContext will be shared between
+ * threads, a thread must have ownership when calling aio_poll().
+ *
+ * Note that multiple threads calling aio_poll() means timers, BHs, and
+ * callbacks may be invoked from a different thread than they were registered
+ * from.  Therefore, code must use AioContext acquire/release or use
+ * fine-grained synchronization to protect shared state if other threads will
+ * be accessing it simultaneously.
+ */
+void aio_context_acquire(AioContext *ctx);
+
+/* Relinquish ownership of the AioContext. */
+void aio_context_release(AioContext *ctx);
+
 /**
  * aio_bh_new: Allocate a new bottom half structure.
  *
diff --git a/include/block/block.h b/include/block/block.h
index 780f48b7b3..bd34d14109 100644
--- a/include/block/block.h
+++ b/include/block/block.h
@@ -286,15 +286,6 @@ int bdrv_check(BlockDriverState *bs, BdrvCheckResult *res, BdrvCheckMode fix);
 int bdrv_amend_options(BlockDriverState *bs_new, QEMUOptionParameter *options);
 
 /* external snapshots */
-
-typedef enum {
-    BS_IS_A_FILTER,
-    BS_FILTER_PASS_DOWN,
-    BS_AUTHORIZATION_COUNT,
-} BsAuthorization;
-
-bool bdrv_generic_is_first_non_filter(BlockDriverState *bs,
-                                      BlockDriverState *candidate);
 bool bdrv_recurse_is_first_non_filter(BlockDriverState *bs,
                                       BlockDriverState *candidate);
 bool bdrv_is_first_non_filter(BlockDriverState *candidate);
diff --git a/include/block/block_int.h b/include/block/block_int.h
index 0bcf1c9b8c..4fc5ea8a65 100644
--- a/include/block/block_int.h
+++ b/include/block/block_int.h
@@ -76,10 +76,10 @@ struct BlockDriver {
     const char *format_name;
     int instance_size;
 
-    /* this table of boolean contains authorizations for the block operations */
-    bool authorizations[BS_AUTHORIZATION_COUNT];
-    /* for snapshots complex block filter like Quorum can implement the
-     * following recursive callback instead of BS_IS_A_FILTER.
+    /* set to true if the BlockDriver is a block filter */
+    bool is_filter;
+    /* for snapshots block filter like Quorum can implement the
+     * following recursive callback.
      * It's purpose is to recurse on the filter children while calling
      * bdrv_recurse_is_first_non_filter on them.
      * For a sample implementation look in the future Quorum block filter.
diff --git a/include/hw/qdev-properties.h b/include/hw/qdev-properties.h
index 0c0babfa6a..3c000eea75 100644
--- a/include/hw/qdev-properties.h
+++ b/include/hw/qdev-properties.h
@@ -22,6 +22,7 @@ extern PropertyInfo qdev_prop_bios_chs_trans;
 extern PropertyInfo qdev_prop_drive;
 extern PropertyInfo qdev_prop_netdev;
 extern PropertyInfo qdev_prop_vlan;
+extern PropertyInfo qdev_prop_iothread;
 extern PropertyInfo qdev_prop_pci_devfn;
 extern PropertyInfo qdev_prop_blocksize;
 extern PropertyInfo qdev_prop_pci_host_devaddr;
@@ -142,6 +143,8 @@ extern PropertyInfo qdev_prop_arraylen;
     DEFINE_PROP(_n, _s, _f, qdev_prop_vlan, NICPeers)
 #define DEFINE_PROP_DRIVE(_n, _s, _f) \
     DEFINE_PROP(_n, _s, _f, qdev_prop_drive, BlockDriverState *)
+#define DEFINE_PROP_IOTHREAD(_n, _s, _f)             \
+    DEFINE_PROP(_n, _s, _f, qdev_prop_iothread, IOThread *)
 #define DEFINE_PROP_MACADDR(_n, _s, _f)         \
     DEFINE_PROP(_n, _s, _f, qdev_prop_macaddr, MACAddr)
 #define DEFINE_PROP_LOSTTICKPOLICY(_n, _s, _f, _d) \
diff --git a/include/hw/virtio/virtio-blk.h b/include/hw/virtio/virtio-blk.h
index 41885da1a0..e4c41ff2ef 100644
--- a/include/hw/virtio/virtio-blk.h
+++ b/include/hw/virtio/virtio-blk.h
@@ -16,6 +16,7 @@
 
 #include "hw/virtio/virtio.h"
 #include "hw/block/block.h"
+#include "sysemu/iothread.h"
 
 #define TYPE_VIRTIO_BLK "virtio-blk-device"
 #define VIRTIO_BLK(obj) \
@@ -106,6 +107,7 @@ struct virtio_scsi_inhdr
 struct VirtIOBlkConf
 {
     BlockConf conf;
+    IOThread *iothread;
     char *serial;
     uint32_t scsi;
     uint32_t config_wce;
@@ -140,13 +142,15 @@ typedef struct VirtIOBlock {
         DEFINE_BLOCK_CHS_PROPERTIES(_state, _field.conf),                     \
         DEFINE_PROP_STRING("serial", _state, _field.serial),                  \
         DEFINE_PROP_BIT("config-wce", _state, _field.config_wce, 0, true),    \
-        DEFINE_PROP_BIT("scsi", _state, _field.scsi, 0, true)
+        DEFINE_PROP_BIT("scsi", _state, _field.scsi, 0, true),                \
+        DEFINE_PROP_IOTHREAD("x-iothread", _state, _field.iothread)
 #else
 #define DEFINE_VIRTIO_BLK_PROPERTIES(_state, _field)                          \
         DEFINE_BLOCK_PROPERTIES(_state, _field.conf),                         \
         DEFINE_BLOCK_CHS_PROPERTIES(_state, _field.conf),                     \
         DEFINE_PROP_STRING("serial", _state, _field.serial),                  \
-        DEFINE_PROP_BIT("config-wce", _state, _field.config_wce, 0, true)
+        DEFINE_PROP_BIT("config-wce", _state, _field.config_wce, 0, true),    \
+        DEFINE_PROP_IOTHREAD("x-iothread", _state, _field.iothread)
 #endif /* __linux__ */
 
 void virtio_blk_set_conf(DeviceState *dev, VirtIOBlkConf *blk);
diff --git a/include/qemu-io.h b/include/qemu-io.h
index 7e7c07c09b..5d6006f73b 100644
--- a/include/qemu-io.h
+++ b/include/qemu-io.h
@@ -38,6 +38,8 @@ typedef struct cmdinfo {
     helpfunc_t  help;
 } cmdinfo_t;
 
+extern bool qemuio_misalign;
+
 bool qemuio_command(BlockDriverState *bs, const char *cmd);
 
 void qemuio_add_command(const cmdinfo_t *ci);
diff --git a/include/qemu/rfifolock.h b/include/qemu/rfifolock.h
new file mode 100644
index 0000000000..b23ab538a6
--- /dev/null
+++ b/include/qemu/rfifolock.h
@@ -0,0 +1,54 @@
+/*
+ * Recursive FIFO lock
+ *
+ * Copyright Red Hat, Inc. 2013
+ *
+ * Authors:
+ *  Stefan Hajnoczi   <stefanha@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#ifndef QEMU_RFIFOLOCK_H
+#define QEMU_RFIFOLOCK_H
+
+#include "qemu/thread.h"
+
+/* Recursive FIFO lock
+ *
+ * This lock provides more features than a plain mutex:
+ *
+ * 1. Fairness - enforces FIFO order.
+ * 2. Nesting - can be taken recursively.
+ * 3. Contention callback - optional, called when thread must wait.
+ *
+ * The recursive FIFO lock is heavyweight so prefer other synchronization
+ * primitives if you do not need its features.
+ */
+typedef struct {
+    QemuMutex lock;             /* protects all fields */
+
+    /* FIFO order */
+    unsigned int head;          /* active ticket number */
+    unsigned int tail;          /* waiting ticket number */
+    QemuCond cond;              /* used to wait for our ticket number */
+
+    /* Nesting */
+    QemuThread owner_thread;    /* thread that currently has ownership */
+    unsigned int nesting;       /* amount of nesting levels */
+
+    /* Contention callback */
+    void (*cb)(void *);         /* called when thread must wait, with ->lock
+                                 * held so it may not recursively lock/unlock
+                                 */
+    void *cb_opaque;
+} RFifoLock;
+
+void rfifolock_init(RFifoLock *r, void (*cb)(void *), void *opaque);
+void rfifolock_destroy(RFifoLock *r);
+void rfifolock_lock(RFifoLock *r);
+void rfifolock_unlock(RFifoLock *r);
+
+#endif /* QEMU_RFIFOLOCK_H */
diff --git a/include/qom/object.h b/include/qom/object.h
index 9c7c361d30..4cd77049e4 100644
--- a/include/qom/object.h
+++ b/include/qom/object.h
@@ -974,6 +974,14 @@ const char *object_property_get_type(Object *obj, const char *name,
 Object *object_get_root(void);
 
 /**
+ * object_get_canonical_path_component:
+ *
+ * Returns: The final component in the object's canonical path.  The canonical
+ * path is the path within the composition tree starting from the root.
+ */
+gchar *object_get_canonical_path_component(Object *obj);
+
+/**
  * object_get_canonical_path:
  *
  * Returns: The canonical path for a object.  This is the path within the
diff --git a/include/sysemu/iothread.h b/include/sysemu/iothread.h
new file mode 100644
index 0000000000..a32214a647
--- /dev/null
+++ b/include/sysemu/iothread.h
@@ -0,0 +1,30 @@
+/*
+ * Event loop thread
+ *
+ * Copyright Red Hat Inc., 2013
+ *
+ * Authors:
+ *  Stefan Hajnoczi   <stefanha@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#ifndef IOTHREAD_H
+#define IOTHREAD_H
+
+#include "block/aio.h"
+
+#define TYPE_IOTHREAD "iothread"
+
+typedef struct IOThread IOThread;
+
+#define IOTHREAD(obj) \
+   OBJECT_CHECK(IOThread, obj, TYPE_IOTHREAD)
+
+IOThread *iothread_find(const char *id);
+char *iothread_get_id(IOThread *iothread);
+AioContext *iothread_get_aio_context(IOThread *iothread);
+
+#endif /* IOTHREAD_H */
diff --git a/iothread.c b/iothread.c
new file mode 100644
index 0000000000..cb5986b6c9
--- /dev/null
+++ b/iothread.c
@@ -0,0 +1,178 @@
+/*
+ * Event loop thread
+ *
+ * Copyright Red Hat Inc., 2013
+ *
+ * Authors:
+ *  Stefan Hajnoczi   <stefanha@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include "qom/object.h"
+#include "qom/object_interfaces.h"
+#include "qemu/module.h"
+#include "qemu/thread.h"
+#include "block/aio.h"
+#include "sysemu/iothread.h"
+#include "qmp-commands.h"
+
+#define IOTHREADS_PATH "/objects"
+
+typedef ObjectClass IOThreadClass;
+struct IOThread {
+    Object parent_obj;
+
+    QemuThread thread;
+    AioContext *ctx;
+    QemuMutex init_done_lock;
+    QemuCond init_done_cond;    /* is thread initialization done? */
+    bool stopping;
+    int thread_id;
+};
+
+#define IOTHREAD_GET_CLASS(obj) \
+   OBJECT_GET_CLASS(IOThreadClass, obj, TYPE_IOTHREAD)
+#define IOTHREAD_CLASS(klass) \
+   OBJECT_CLASS_CHECK(IOThreadClass, klass, TYPE_IOTHREAD)
+
+static void *iothread_run(void *opaque)
+{
+    IOThread *iothread = opaque;
+
+    qemu_mutex_lock(&iothread->init_done_lock);
+    iothread->thread_id = qemu_get_thread_id();
+    qemu_cond_signal(&iothread->init_done_cond);
+    qemu_mutex_unlock(&iothread->init_done_lock);
+
+    while (!iothread->stopping) {
+        aio_context_acquire(iothread->ctx);
+        while (!iothread->stopping && aio_poll(iothread->ctx, true)) {
+            /* Progress was made, keep going */
+        }
+        aio_context_release(iothread->ctx);
+    }
+    return NULL;
+}
+
+static void iothread_instance_finalize(Object *obj)
+{
+    IOThread *iothread = IOTHREAD(obj);
+
+    iothread->stopping = true;
+    aio_notify(iothread->ctx);
+    qemu_thread_join(&iothread->thread);
+    qemu_cond_destroy(&iothread->init_done_cond);
+    qemu_mutex_destroy(&iothread->init_done_lock);
+    aio_context_unref(iothread->ctx);
+}
+
+static void iothread_complete(UserCreatable *obj, Error **errp)
+{
+    IOThread *iothread = IOTHREAD(obj);
+
+    iothread->stopping = false;
+    iothread->ctx = aio_context_new();
+    iothread->thread_id = -1;
+
+    qemu_mutex_init(&iothread->init_done_lock);
+    qemu_cond_init(&iothread->init_done_cond);
+
+    /* This assumes we are called from a thread with useful CPU affinity for us
+     * to inherit.
+     */
+    qemu_thread_create(&iothread->thread, "iothread", iothread_run,
+                       iothread, QEMU_THREAD_JOINABLE);
+
+    /* Wait for initialization to complete */
+    qemu_mutex_lock(&iothread->init_done_lock);
+    while (iothread->thread_id == -1) {
+        qemu_cond_wait(&iothread->init_done_cond,
+                       &iothread->init_done_lock);
+    }
+    qemu_mutex_unlock(&iothread->init_done_lock);
+}
+
+static void iothread_class_init(ObjectClass *klass, void *class_data)
+{
+    UserCreatableClass *ucc = USER_CREATABLE_CLASS(klass);
+    ucc->complete = iothread_complete;
+}
+
+static const TypeInfo iothread_info = {
+    .name = TYPE_IOTHREAD,
+    .parent = TYPE_OBJECT,
+    .class_init = iothread_class_init,
+    .instance_size = sizeof(IOThread),
+    .instance_finalize = iothread_instance_finalize,
+    .interfaces = (InterfaceInfo[]) {
+        {TYPE_USER_CREATABLE},
+        {}
+    },
+};
+
+static void iothread_register_types(void)
+{
+    type_register_static(&iothread_info);
+}
+
+type_init(iothread_register_types)
+
+IOThread *iothread_find(const char *id)
+{
+    Object *container = container_get(object_get_root(), IOTHREADS_PATH);
+    Object *child;
+
+    child = object_property_get_link(container, id, NULL);
+    if (!child) {
+        return NULL;
+    }
+    return (IOThread *)object_dynamic_cast(child, TYPE_IOTHREAD);
+}
+
+char *iothread_get_id(IOThread *iothread)
+{
+    return object_get_canonical_path_component(OBJECT(iothread));
+}
+
+AioContext *iothread_get_aio_context(IOThread *iothread)
+{
+    return iothread->ctx;
+}
+
+static int query_one_iothread(Object *object, void *opaque)
+{
+    IOThreadInfoList ***prev = opaque;
+    IOThreadInfoList *elem;
+    IOThreadInfo *info;
+    IOThread *iothread;
+
+    iothread = (IOThread *)object_dynamic_cast(object, TYPE_IOTHREAD);
+    if (!iothread) {
+        return 0;
+    }
+
+    info = g_new0(IOThreadInfo, 1);
+    info->id = iothread_get_id(iothread);
+    info->thread_id = iothread->thread_id;
+
+    elem = g_new0(IOThreadInfoList, 1);
+    elem->value = info;
+    elem->next = NULL;
+
+    **prev = elem;
+    *prev = &elem->next;
+    return 0;
+}
+
+IOThreadInfoList *qmp_query_iothreads(Error **errp)
+{
+    IOThreadInfoList *head = NULL;
+    IOThreadInfoList **prev = &head;
+    Object *container = container_get(object_get_root(), IOTHREADS_PATH);
+
+    object_child_foreach(container, query_one_iothread, &prev);
+    return head;
+}
diff --git a/qapi-schema.json b/qapi-schema.json
index f4f9439fe6..b68cd44ebd 100644
--- a/qapi-schema.json
+++ b/qapi-schema.json
@@ -883,6 +883,35 @@
 { 'command': 'query-cpus', 'returns': ['CpuInfo'] }
 
 ##
+# @IOThreadInfo:
+#
+# Information about an iothread
+#
+# @id: the identifier of the iothread
+#
+# @thread-id: ID of the underlying host thread
+#
+# Since: 2.0
+##
+{ 'type': 'IOThreadInfo',
+  'data': {'id': 'str', 'thread-id': 'int'} }
+
+##
+# @query-iothreads:
+#
+# Returns a list of information about each iothread.
+#
+# Note this list excludes the QEMU main loop thread, which is not declared
+# using the -object iothread command-line option.  It is always the main thread
+# of the process.
+#
+# Returns: a list of @IOThreadInfo for each iothread
+#
+# Since: 2.0
+##
+{ 'command': 'query-iothreads', 'returns': ['IOThreadInfo'] }
+
+##
 # @BlockDeviceInfo:
 #
 # Information about the backing device for a block device.
diff --git a/qemu-io-cmds.c b/qemu-io-cmds.c
index f1de24c91c..fb1db53c6b 100644
--- a/qemu-io-cmds.c
+++ b/qemu-io-cmds.c
@@ -16,7 +16,7 @@
 
 #define CMD_NOFILE_OK   0x01
 
-int qemuio_misalign;
+bool qemuio_misalign;
 
 static cmdinfo_t *cmdtab;
 static int ncmds;
diff --git a/qemu-io.c b/qemu-io.c
index fc3860884c..2d119c28a6 100644
--- a/qemu-io.c
+++ b/qemu-io.c
@@ -24,10 +24,9 @@
 
 #define CMD_NOFILE_OK   0x01
 
-char *progname;
+static char *progname;
 
-BlockDriverState *qemuio_bs;
-extern int qemuio_misalign;
+static BlockDriverState *qemuio_bs;
 
 /* qemu-io commands passed using -c */
 static int ncmdline;
@@ -408,7 +407,7 @@ int main(int argc, char **argv)
             readonly = 1;
             break;
         case 'm':
-            qemuio_misalign = 1;
+            qemuio_misalign = true;
             break;
         case 'g':
             growable = 1;
diff --git a/qmp-commands.hx b/qmp-commands.hx
index d982cd62b9..a22621fd44 100644
--- a/qmp-commands.hx
+++ b/qmp-commands.hx
@@ -2327,6 +2327,45 @@ EQMP
     },
 
 SQMP
+query-iothreads
+---------------
+
+Returns a list of information about each iothread.
+
+Note this list excludes the QEMU main loop thread, which is not declared
+using the -object iothread command-line option.  It is always the main thread
+of the process.
+
+Return a json-array. Each iothread is represented by a json-object, which contains:
+
+- "id": name of iothread (json-str)
+- "thread-id": ID of the underlying host thread (json-int)
+
+Example:
+
+-> { "execute": "query-iothreads" }
+<- {
+      "return":[
+         {
+            "id":"iothread0",
+            "thread-id":3134
+         },
+         {
+            "id":"iothread1",
+            "thread-id":3135
+         }
+      ]
+   }
+
+EQMP
+
+    {
+        .name       = "query-iothreads",
+        .args_type  = "",
+        .mhandler.cmd_new = qmp_marshal_input_query_iothreads,
+    },
+
+SQMP
 query-pci
 ---------
 
diff --git a/qom/object.c b/qom/object.c
index c88909c6b6..a2a1ffa1b3 100644
--- a/qom/object.c
+++ b/qom/object.c
@@ -1102,39 +1102,49 @@ void object_property_add_link(Object *obj, const char *name,
     g_free(full_type);
 }
 
+gchar *object_get_canonical_path_component(Object *obj)
+{
+    ObjectProperty *prop = NULL;
+
+    g_assert(obj);
+    g_assert(obj->parent != NULL);
+
+    QTAILQ_FOREACH(prop, &obj->parent->properties, node) {
+        if (!object_property_is_child(prop)) {
+            continue;
+        }
+
+        if (prop->opaque == obj) {
+            return g_strdup(prop->name);
+        }
+    }
+
+    /* obj had a parent but was not a child, should never happen */
+    g_assert_not_reached();
+    return NULL;
+}
+
 gchar *object_get_canonical_path(Object *obj)
 {
     Object *root = object_get_root();
-    char *newpath = NULL, *path = NULL;
+    char *newpath, *path = NULL;
 
     while (obj != root) {
-        ObjectProperty *prop = NULL;
-
-        g_assert(obj->parent != NULL);
-
-        QTAILQ_FOREACH(prop, &obj->parent->properties, node) {
-            if (!object_property_is_child(prop)) {
-                continue;
-            }
+        char *component = object_get_canonical_path_component(obj);
 
-            if (prop->opaque == obj) {
-                if (path) {
-                    newpath = g_strdup_printf("%s/%s", prop->name, path);
-                    g_free(path);
-                    path = newpath;
-                } else {
-                    path = g_strdup(prop->name);
-                }
-                break;
-            }
+        if (path) {
+            newpath = g_strdup_printf("%s/%s", component, path);
+            g_free(component);
+            g_free(path);
+            path = newpath;
+        } else {
+            path = component;
         }
 
-        g_assert(prop != NULL);
-
         obj = obj->parent;
     }
 
-    newpath = g_strdup_printf("/%s", path);
+    newpath = g_strdup_printf("/%s", path ? path : "");
     g_free(path);
 
     return newpath;
diff --git a/tests/Makefile b/tests/Makefile
index 7bc3999ecc..471b4c8785 100644
--- a/tests/Makefile
+++ b/tests/Makefile
@@ -35,6 +35,7 @@ check-unit-y += tests/test-visitor-serialization$(EXESUF)
 check-unit-y += tests/test-iov$(EXESUF)
 gcov-files-test-iov-y = util/iov.c
 check-unit-y += tests/test-aio$(EXESUF)
+check-unit-y += tests/test-rfifolock$(EXESUF)
 check-unit-y += tests/test-throttle$(EXESUF)
 gcov-files-test-aio-$(CONFIG_WIN32) = aio-win32.c
 gcov-files-test-aio-$(CONFIG_POSIX) = aio-posix.c
@@ -193,6 +194,7 @@ tests/check-qjson$(EXESUF): tests/check-qjson.o libqemuutil.a libqemustub.a
 tests/check-qom-interface$(EXESUF): tests/check-qom-interface.o $(qom-core-obj) libqemuutil.a libqemustub.a
 tests/test-coroutine$(EXESUF): tests/test-coroutine.o $(block-obj-y) libqemuutil.a libqemustub.a
 tests/test-aio$(EXESUF): tests/test-aio.o $(block-obj-y) libqemuutil.a libqemustub.a
+tests/test-rfifolock$(EXESUF): tests/test-rfifolock.o libqemuutil.a libqemustub.a
 tests/test-throttle$(EXESUF): tests/test-throttle.o $(block-obj-y) libqemuutil.a libqemustub.a
 tests/test-thread-pool$(EXESUF): tests/test-thread-pool.o $(block-obj-y) libqemuutil.a libqemustub.a
 tests/test-iov$(EXESUF): tests/test-iov.o libqemuutil.a
diff --git a/tests/qemu-iotests/060 b/tests/qemu-iotests/060
index af8ed9f39a..f0116aab1d 100755
--- a/tests/qemu-iotests/060
+++ b/tests/qemu-iotests/060
@@ -138,6 +138,32 @@ $QEMU_IMG snapshot -a foo "$TEST_IMG"
 _check_test_img
 $QEMU_IO -c "$OPEN_RO" -c "read -P 1 0 512" | _filter_qemu_io
 
+echo
+echo "=== Testing overlap while COW is in flight ==="
+echo
+# compat=0.10 is required in order to make the following discard actually
+# unallocate the sector rather than make it a zero sector - we want COW, after
+# all.
+IMGOPTS='compat=0.10' _make_test_img 1G
+# Write two clusters, the second one enforces creation of an L2 table after
+# the first data cluster.
+$QEMU_IO -c 'write 0k 64k' -c 'write 512M 64k' "$TEST_IMG" | _filter_qemu_io
+# Discard the first cluster. This cluster will soon enough be reallocated and
+# used for COW.
+$QEMU_IO -c 'discard 0k 64k' "$TEST_IMG" | _filter_qemu_io
+# Now, corrupt the image by marking the second L2 table cluster as free.
+poke_file "$TEST_IMG" '131084' "\x00\x00" # 0x2000c
+# Start a write operation requiring COW on the image stopping it right before
+# doing the read; then, trigger the corruption prevention by writing anything to
+# any unallocated cluster, leading to an attempt to overwrite the second L2
+# table. Finally, resume the COW write and see it fail (but not crash).
+echo "open -o file.driver=blkdebug $TEST_IMG
+break cow_read 0
+aio_write 0k 1k
+wait_break 0
+write 64k 64k
+resume 0" | $QEMU_IO | _filter_qemu_io
+
 # success, all done
 echo "*** done"
 rm -f $seq.full
diff --git a/tests/qemu-iotests/060.out b/tests/qemu-iotests/060.out
index 6c7bdbb2f2..a517948036 100644
--- a/tests/qemu-iotests/060.out
+++ b/tests/qemu-iotests/060.out
@@ -78,4 +78,19 @@ read 512/512 bytes at offset 0
 No errors were found on the image.
 read 512/512 bytes at offset 0
 512 bytes, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
+
+=== Testing overlap while COW is in flight ===
+
+Formatting 'TEST_DIR/t.IMGFMT', fmt=IMGFMT size=1073741824 
+wrote 65536/65536 bytes at offset 0
+64 KiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
+wrote 65536/65536 bytes at offset 536870912
+64 KiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
+discard 65536/65536 bytes at offset 0
+64 KiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
+qcow2: Preventing invalid write on metadata (overlaps with active L2 table); image marked as corrupt.
+blkdebug: Suspended request '0'
+write failed: Input/output error
+blkdebug: Resuming request '0'
+aio_write failed: No medium found
 *** done
diff --git a/tests/test-aio.c b/tests/test-aio.c
index 592721ed3f..56f4288ca8 100644
--- a/tests/test-aio.c
+++ b/tests/test-aio.c
@@ -112,6 +112,64 @@ static void test_notify(void)
     g_assert(!aio_poll(ctx, false));
 }
 
+typedef struct {
+    QemuMutex start_lock;
+    bool thread_acquired;
+} AcquireTestData;
+
+static void *test_acquire_thread(void *opaque)
+{
+    AcquireTestData *data = opaque;
+
+    /* Wait for other thread to let us start */
+    qemu_mutex_lock(&data->start_lock);
+    qemu_mutex_unlock(&data->start_lock);
+
+    aio_context_acquire(ctx);
+    aio_context_release(ctx);
+
+    data->thread_acquired = true; /* success, we got here */
+
+    return NULL;
+}
+
+static void dummy_notifier_read(EventNotifier *unused)
+{
+    g_assert(false); /* should never be invoked */
+}
+
+static void test_acquire(void)
+{
+    QemuThread thread;
+    EventNotifier notifier;
+    AcquireTestData data;
+
+    /* Dummy event notifier ensures aio_poll() will block */
+    event_notifier_init(&notifier, false);
+    aio_set_event_notifier(ctx, &notifier, dummy_notifier_read);
+    g_assert(!aio_poll(ctx, false)); /* consume aio_notify() */
+
+    qemu_mutex_init(&data.start_lock);
+    qemu_mutex_lock(&data.start_lock);
+    data.thread_acquired = false;
+
+    qemu_thread_create(&thread, "test_acquire_thread",
+                       test_acquire_thread,
+                       &data, QEMU_THREAD_JOINABLE);
+
+    /* Block in aio_poll(), let other thread kick us and acquire context */
+    aio_context_acquire(ctx);
+    qemu_mutex_unlock(&data.start_lock); /* let the thread run */
+    g_assert(!aio_poll(ctx, true));
+    aio_context_release(ctx);
+
+    qemu_thread_join(&thread);
+    aio_set_event_notifier(ctx, &notifier, NULL);
+    event_notifier_cleanup(&notifier);
+
+    g_assert(data.thread_acquired);
+}
+
 static void test_bh_schedule(void)
 {
     BHTestData data = { .n = 0 };
@@ -775,6 +833,7 @@ int main(int argc, char **argv)
 
     g_test_init(&argc, &argv, NULL);
     g_test_add_func("/aio/notify",                  test_notify);
+    g_test_add_func("/aio/acquire",                 test_acquire);
     g_test_add_func("/aio/bh/schedule",             test_bh_schedule);
     g_test_add_func("/aio/bh/schedule10",           test_bh_schedule10);
     g_test_add_func("/aio/bh/cancel",               test_bh_cancel);
diff --git a/tests/test-rfifolock.c b/tests/test-rfifolock.c
new file mode 100644
index 0000000000..0572ebb42a
--- /dev/null
+++ b/tests/test-rfifolock.c
@@ -0,0 +1,91 @@
+/*
+ * RFifoLock tests
+ *
+ * Copyright Red Hat, Inc. 2013
+ *
+ * Authors:
+ *  Stefan Hajnoczi    <stefanha@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+
+#include <glib.h>
+#include "qemu-common.h"
+#include "qemu/rfifolock.h"
+
+static void test_nesting(void)
+{
+    RFifoLock lock;
+
+    /* Trivial test, ensure the lock is recursive */
+    rfifolock_init(&lock, NULL, NULL);
+    rfifolock_lock(&lock);
+    rfifolock_lock(&lock);
+    rfifolock_lock(&lock);
+    rfifolock_unlock(&lock);
+    rfifolock_unlock(&lock);
+    rfifolock_unlock(&lock);
+    rfifolock_destroy(&lock);
+}
+
+typedef struct {
+    RFifoLock lock;
+    int fd[2];
+} CallbackTestData;
+
+static void rfifolock_cb(void *opaque)
+{
+    CallbackTestData *data = opaque;
+    int ret;
+    char c = 0;
+
+    ret = write(data->fd[1], &c, sizeof(c));
+    g_assert(ret == 1);
+}
+
+static void *callback_thread(void *opaque)
+{
+    CallbackTestData *data = opaque;
+
+    /* The other thread holds the lock so the contention callback will be
+     * invoked...
+     */
+    rfifolock_lock(&data->lock);
+    rfifolock_unlock(&data->lock);
+    return NULL;
+}
+
+static void test_callback(void)
+{
+    CallbackTestData data;
+    QemuThread thread;
+    int ret;
+    char c;
+
+    rfifolock_init(&data.lock, rfifolock_cb, &data);
+    ret = qemu_pipe(data.fd);
+    g_assert(ret == 0);
+
+    /* Hold lock but allow the callback to kick us by writing to the pipe */
+    rfifolock_lock(&data.lock);
+    qemu_thread_create(&thread, "callback_thread",
+                       callback_thread, &data, QEMU_THREAD_JOINABLE);
+    ret = read(data.fd[0], &c, sizeof(c));
+    g_assert(ret == 1);
+    rfifolock_unlock(&data.lock);
+    /* If we got here then the callback was invoked, as expected */
+
+    qemu_thread_join(&thread);
+    close(data.fd[0]);
+    close(data.fd[1]);
+    rfifolock_destroy(&data.lock);
+}
+
+int main(int argc, char **argv)
+{
+    g_test_init(&argc, &argv, NULL);
+    g_test_add_func("/nesting", test_nesting);
+    g_test_add_func("/callback", test_callback);
+    return g_test_run();
+}
diff --git a/util/Makefile.objs b/util/Makefile.objs
index 937376b082..df83b629a0 100644
--- a/util/Makefile.objs
+++ b/util/Makefile.objs
@@ -14,3 +14,4 @@ util-obj-y += crc32c.o
 util-obj-y += throttle.o
 util-obj-y += getauxval.o
 util-obj-y += readline.o
+util-obj-y += rfifolock.o
diff --git a/util/rfifolock.c b/util/rfifolock.c
new file mode 100644
index 0000000000..afbf7488df
--- /dev/null
+++ b/util/rfifolock.c
@@ -0,0 +1,78 @@
+/*
+ * Recursive FIFO lock
+ *
+ * Copyright Red Hat, Inc. 2013
+ *
+ * Authors:
+ *  Stefan Hajnoczi   <stefanha@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ *
+ */
+
+#include <assert.h>
+#include "qemu/rfifolock.h"
+
+void rfifolock_init(RFifoLock *r, void (*cb)(void *), void *opaque)
+{
+    qemu_mutex_init(&r->lock);
+    r->head = 0;
+    r->tail = 0;
+    qemu_cond_init(&r->cond);
+    r->nesting = 0;
+    r->cb = cb;
+    r->cb_opaque = opaque;
+}
+
+void rfifolock_destroy(RFifoLock *r)
+{
+    qemu_cond_destroy(&r->cond);
+    qemu_mutex_destroy(&r->lock);
+}
+
+/*
+ * Theory of operation:
+ *
+ * In order to ensure FIFO ordering, implement a ticketlock.  Threads acquiring
+ * the lock enqueue themselves by incrementing the tail index.  When the lock
+ * is unlocked, the head is incremented and waiting threads are notified.
+ *
+ * Recursive locking does not take a ticket since the head is only incremented
+ * when the outermost recursive caller unlocks.
+ */
+void rfifolock_lock(RFifoLock *r)
+{
+    qemu_mutex_lock(&r->lock);
+
+    /* Take a ticket */
+    unsigned int ticket = r->tail++;
+
+    if (r->nesting > 0 && qemu_thread_is_self(&r->owner_thread)) {
+        r->tail--; /* put ticket back, we're nesting */
+    } else {
+        while (ticket != r->head) {
+            /* Invoke optional contention callback */
+            if (r->cb) {
+                r->cb(r->cb_opaque);
+            }
+            qemu_cond_wait(&r->cond, &r->lock);
+        }
+    }
+
+    qemu_thread_get_self(&r->owner_thread);
+    r->nesting++;
+    qemu_mutex_unlock(&r->lock);
+}
+
+void rfifolock_unlock(RFifoLock *r)
+{
+    qemu_mutex_lock(&r->lock);
+    assert(r->nesting > 0);
+    assert(qemu_thread_is_self(&r->owner_thread));
+    if (--r->nesting == 0) {
+        r->head++;
+        qemu_cond_broadcast(&r->cond);
+    }
+    qemu_mutex_unlock(&r->lock);
+}