summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--aio-posix.c116
-rw-r--r--aio-win32.c83
-rw-r--r--async.c45
-rw-r--r--block/io.c41
-rw-r--r--docs/lockcnt.txt277
-rw-r--r--docs/multiple-iothreads.txt13
-rw-r--r--include/block/aio.h38
-rw-r--r--include/block/block.h2
-rw-r--r--include/block/block_int.h3
-rw-r--r--include/qemu/futex.h36
-rw-r--r--include/qemu/thread.h112
-rw-r--r--util/Makefile.objs1
-rw-r--r--util/lockcnt.c397
-rw-r--r--util/qemu-thread-posix.c35
-rw-r--r--util/qemu-thread-win32.c2
-rw-r--r--util/trace-events10
16 files changed, 1009 insertions, 202 deletions
diff --git a/aio-posix.c b/aio-posix.c
index 15855715d4..9453d83743 100644
--- a/aio-posix.c
+++ b/aio-posix.c
@@ -16,7 +16,7 @@
 #include "qemu/osdep.h"
 #include "qemu-common.h"
 #include "block/block.h"
-#include "qemu/queue.h"
+#include "qemu/rcu_queue.h"
 #include "qemu/sockets.h"
 #include "qemu/cutils.h"
 #include "trace.h"
@@ -66,7 +66,7 @@ static bool aio_epoll_try_enable(AioContext *ctx)
     AioHandler *node;
     struct epoll_event event;
 
-    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
         int r;
         if (node->deleted || !node->pfd.events) {
             continue;
@@ -212,24 +212,27 @@ void aio_set_fd_handler(AioContext *ctx,
     bool is_new = false;
     bool deleted = false;
 
+    qemu_lockcnt_lock(&ctx->list_lock);
+
     node = find_aio_handler(ctx, fd);
 
     /* Are we deleting the fd handler? */
     if (!io_read && !io_write && !io_poll) {
         if (node == NULL) {
+            qemu_lockcnt_unlock(&ctx->list_lock);
             return;
         }
 
         g_source_remove_poll(&ctx->source, &node->pfd);
 
         /* If the lock is held, just mark the node as deleted */
-        if (ctx->walking_handlers) {
+        if (qemu_lockcnt_count(&ctx->list_lock)) {
             node->deleted = 1;
             node->pfd.revents = 0;
         } else {
             /* Otherwise, delete it for real.  We can't just mark it as
-             * deleted because deleted nodes are only cleaned up after
-             * releasing the walking_handlers lock.
+             * deleted because deleted nodes are only cleaned up while
+             * no one is walking the handlers list.
              */
             QLIST_REMOVE(node, node);
             deleted = true;
@@ -243,7 +246,7 @@ void aio_set_fd_handler(AioContext *ctx,
             /* Alloc and insert if it's not already there */
             node = g_new0(AioHandler, 1);
             node->pfd.fd = fd;
-            QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
+            QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
 
             g_source_add_poll(&ctx->source, &node->pfd);
             is_new = true;
@@ -265,6 +268,7 @@ void aio_set_fd_handler(AioContext *ctx,
     }
 
     aio_epoll_update(ctx, node, is_new);
+    qemu_lockcnt_unlock(&ctx->list_lock);
     aio_notify(ctx);
 
     if (deleted) {
@@ -316,8 +320,8 @@ static void poll_set_started(AioContext *ctx, bool started)
 
     ctx->poll_started = started;
 
-    ctx->walking_handlers++;
-    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+    qemu_lockcnt_inc(&ctx->list_lock);
+    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
         IOHandler *fn;
 
         if (node->deleted) {
@@ -334,7 +338,7 @@ static void poll_set_started(AioContext *ctx, bool started)
             fn(node->opaque);
         }
     }
-    ctx->walking_handlers--;
+    qemu_lockcnt_dec(&ctx->list_lock);
 }
 
 
@@ -349,54 +353,47 @@ bool aio_prepare(AioContext *ctx)
 bool aio_pending(AioContext *ctx)
 {
     AioHandler *node;
+    bool result = false;
 
-    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+    /*
+     * We have to walk very carefully in case aio_set_fd_handler is
+     * called while we're walking.
+     */
+    qemu_lockcnt_inc(&ctx->list_lock);
+
+    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
         int revents;
 
         revents = node->pfd.revents & node->pfd.events;
         if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read &&
             aio_node_check(ctx, node->is_external)) {
-            return true;
+            result = true;
+            break;
         }
         if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write &&
             aio_node_check(ctx, node->is_external)) {
-            return true;
+            result = true;
+            break;
         }
     }
+    qemu_lockcnt_dec(&ctx->list_lock);
 
-    return false;
+    return result;
 }
 
-/*
- * Note that dispatch_fds == false has the side-effect of post-poning the
- * freeing of deleted handlers.
- */
-bool aio_dispatch(AioContext *ctx, bool dispatch_fds)
+static bool aio_dispatch_handlers(AioContext *ctx)
 {
-    AioHandler *node = NULL;
+    AioHandler *node, *tmp;
     bool progress = false;
 
     /*
-     * If there are callbacks left that have been queued, we need to call them.
-     * Do not call select in this case, because it is possible that the caller
-     * does not need a complete flush (as is the case for aio_poll loops).
-     */
-    if (aio_bh_poll(ctx)) {
-        progress = true;
-    }
-
-    /*
      * We have to walk very carefully in case aio_set_fd_handler is
      * called while we're walking.
      */
-    if (dispatch_fds) {
-        node = QLIST_FIRST(&ctx->aio_handlers);
-    }
-    while (node) {
-        AioHandler *tmp;
-        int revents;
+    qemu_lockcnt_inc(&ctx->list_lock);
 
-        ctx->walking_handlers++;
+    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
+        int revents;
 
         revents = node->pfd.revents & node->pfd.events;
         node->pfd.revents = 0;
@@ -420,15 +417,36 @@ bool aio_dispatch(AioContext *ctx, bool dispatch_fds)
             progress = true;
         }
 
-        tmp = node;
-        node = QLIST_NEXT(node, node);
+        if (node->deleted) {
+            if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
+                QLIST_REMOVE(node, node);
+                g_free(node);
+                qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
+            }
+        }
+    }
 
-        ctx->walking_handlers--;
+    qemu_lockcnt_dec(&ctx->list_lock);
+    return progress;
+}
 
-        if (!ctx->walking_handlers && tmp->deleted) {
-            QLIST_REMOVE(tmp, node);
-            g_free(tmp);
-        }
+/*
+ * Note that dispatch_fds == false has the side-effect of post-poning the
+ * freeing of deleted handlers.
+ */
+bool aio_dispatch(AioContext *ctx, bool dispatch_fds)
+{
+    bool progress;
+
+    /*
+     * If there are callbacks left that have been queued, we need to call them.
+     * Do not call select in this case, because it is possible that the caller
+     * does not need a complete flush (as is the case for aio_poll loops).
+     */
+    progress = aio_bh_poll(ctx);
+
+    if (dispatch_fds) {
+        progress |= aio_dispatch_handlers(ctx);
     }
 
     /* Run our timers */
@@ -488,7 +506,7 @@ static bool run_poll_handlers_once(AioContext *ctx)
     bool progress = false;
     AioHandler *node;
 
-    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
         if (!node->deleted && node->io_poll &&
                 node->io_poll(node->opaque)) {
             progress = true;
@@ -509,7 +527,7 @@ static bool run_poll_handlers_once(AioContext *ctx)
  * Note that ctx->notify_me must be non-zero so this function can detect
  * aio_notify().
  *
- * Note that the caller must have incremented ctx->walking_handlers.
+ * Note that the caller must have incremented ctx->list_lock.
  *
  * Returns: true if progress was made, false otherwise
  */
@@ -519,7 +537,7 @@ static bool run_poll_handlers(AioContext *ctx, int64_t max_ns)
     int64_t end_time;
 
     assert(ctx->notify_me);
-    assert(ctx->walking_handlers > 0);
+    assert(qemu_lockcnt_count(&ctx->list_lock) > 0);
     assert(ctx->poll_disable_cnt == 0);
 
     trace_run_poll_handlers_begin(ctx, max_ns);
@@ -541,7 +559,7 @@ static bool run_poll_handlers(AioContext *ctx, int64_t max_ns)
  *
  * ctx->notify_me must be non-zero so this function can detect aio_notify().
  *
- * Note that the caller must have incremented ctx->walking_handlers.
+ * Note that the caller must have incremented ctx->list_lock.
  *
  * Returns: true if progress was made, false otherwise
  */
@@ -592,7 +610,7 @@ bool aio_poll(AioContext *ctx, bool blocking)
         atomic_add(&ctx->notify_me, 2);
     }
 
-    ctx->walking_handlers++;
+    qemu_lockcnt_inc(&ctx->list_lock);
 
     if (ctx->poll_max_ns) {
         start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
@@ -606,7 +624,7 @@ bool aio_poll(AioContext *ctx, bool blocking)
         /* fill pollfds */
 
         if (!aio_epoll_enabled(ctx)) {
-            QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+            QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
                 if (!node->deleted && node->pfd.events
                     && aio_node_check(ctx, node->is_external)) {
                     add_pollfd(node);
@@ -691,7 +709,7 @@ bool aio_poll(AioContext *ctx, bool blocking)
     }
 
     npfd = 0;
-    ctx->walking_handlers--;
+    qemu_lockcnt_dec(&ctx->list_lock);
 
     /* Run dispatch even if there were no readable fds to run timers */
     if (aio_dispatch(ctx, ret > 0)) {
diff --git a/aio-win32.c b/aio-win32.c
index d19dc429d8..900524c9c2 100644
--- a/aio-win32.c
+++ b/aio-win32.c
@@ -21,6 +21,7 @@
 #include "qemu/queue.h"
 #include "qemu/sockets.h"
 #include "qapi/error.h"
+#include "qemu/rcu_queue.h"
 
 struct AioHandler {
     EventNotifier *e;
@@ -45,6 +46,7 @@ void aio_set_fd_handler(AioContext *ctx,
     /* fd is a SOCKET in our case */
     AioHandler *node;
 
+    qemu_lockcnt_lock(&ctx->list_lock);
     QLIST_FOREACH(node, &ctx->aio_handlers, node) {
         if (node->pfd.fd == fd && !node->deleted) {
             break;
@@ -54,14 +56,14 @@ void aio_set_fd_handler(AioContext *ctx,
     /* Are we deleting the fd handler? */
     if (!io_read && !io_write) {
         if (node) {
-            /* If the lock is held, just mark the node as deleted */
-            if (ctx->walking_handlers) {
+            /* If aio_poll is in progress, just mark the node as deleted */
+            if (qemu_lockcnt_count(&ctx->list_lock)) {
                 node->deleted = 1;
                 node->pfd.revents = 0;
             } else {
                 /* Otherwise, delete it for real.  We can't just mark it as
                  * deleted because deleted nodes are only cleaned up after
-                 * releasing the walking_handlers lock.
+                 * releasing the list_lock.
                  */
                 QLIST_REMOVE(node, node);
                 g_free(node);
@@ -74,7 +76,7 @@ void aio_set_fd_handler(AioContext *ctx,
             /* Alloc and insert if it's not already there */
             node = g_new0(AioHandler, 1);
             node->pfd.fd = fd;
-            QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
+            QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
         }
 
         node->pfd.events = 0;
@@ -99,6 +101,7 @@ void aio_set_fd_handler(AioContext *ctx,
                        FD_CONNECT | FD_WRITE | FD_OOB);
     }
 
+    qemu_lockcnt_unlock(&ctx->list_lock);
     aio_notify(ctx);
 }
 
@@ -117,6 +120,7 @@ void aio_set_event_notifier(AioContext *ctx,
 {
     AioHandler *node;
 
+    qemu_lockcnt_lock(&ctx->list_lock);
     QLIST_FOREACH(node, &ctx->aio_handlers, node) {
         if (node->e == e && !node->deleted) {
             break;
@@ -128,14 +132,14 @@ void aio_set_event_notifier(AioContext *ctx,
         if (node) {
             g_source_remove_poll(&ctx->source, &node->pfd);
 
-            /* If the lock is held, just mark the node as deleted */
-            if (ctx->walking_handlers) {
+            /* aio_poll is in progress, just mark the node as deleted */
+            if (qemu_lockcnt_count(&ctx->list_lock)) {
                 node->deleted = 1;
                 node->pfd.revents = 0;
             } else {
                 /* Otherwise, delete it for real.  We can't just mark it as
                  * deleted because deleted nodes are only cleaned up after
-                 * releasing the walking_handlers lock.
+                 * releasing the list_lock.
                  */
                 QLIST_REMOVE(node, node);
                 g_free(node);
@@ -149,7 +153,7 @@ void aio_set_event_notifier(AioContext *ctx,
             node->pfd.fd = (uintptr_t)event_notifier_get_handle(e);
             node->pfd.events = G_IO_IN;
             node->is_external = is_external;
-            QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
+            QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
 
             g_source_add_poll(&ctx->source, &node->pfd);
         }
@@ -157,6 +161,7 @@ void aio_set_event_notifier(AioContext *ctx,
         node->io_notify = io_notify;
     }
 
+    qemu_lockcnt_unlock(&ctx->list_lock);
     aio_notify(ctx);
 }
 
@@ -175,10 +180,16 @@ bool aio_prepare(AioContext *ctx)
     bool have_select_revents = false;
     fd_set rfds, wfds;
 
+    /*
+     * We have to walk very carefully in case aio_set_fd_handler is
+     * called while we're walking.
+     */
+    qemu_lockcnt_inc(&ctx->list_lock);
+
     /* fill fd sets */
     FD_ZERO(&rfds);
     FD_ZERO(&wfds);
-    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
         if (node->io_read) {
             FD_SET ((SOCKET)node->pfd.fd, &rfds);
         }
@@ -188,7 +199,7 @@ bool aio_prepare(AioContext *ctx)
     }
 
     if (select(0, &rfds, &wfds, NULL, &tv0) > 0) {
-        QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+        QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
             node->pfd.revents = 0;
             if (FD_ISSET(node->pfd.fd, &rfds)) {
                 node->pfd.revents |= G_IO_IN;
@@ -202,45 +213,55 @@ bool aio_prepare(AioContext *ctx)
         }
     }
 
+    qemu_lockcnt_dec(&ctx->list_lock);
     return have_select_revents;
 }
 
 bool aio_pending(AioContext *ctx)
 {
     AioHandler *node;
+    bool result = false;
 
-    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+    /*
+     * We have to walk very carefully in case aio_set_fd_handler is
+     * called while we're walking.
+     */
+    qemu_lockcnt_inc(&ctx->list_lock);
+    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
         if (node->pfd.revents && node->io_notify) {
-            return true;
+            result = true;
+            break;
         }
 
         if ((node->pfd.revents & G_IO_IN) && node->io_read) {
-            return true;
+            result = true;
+            break;
         }
         if ((node->pfd.revents & G_IO_OUT) && node->io_write) {
-            return true;
+            result = true;
+            break;
         }
     }
 
-    return false;
+    qemu_lockcnt_dec(&ctx->list_lock);
+    return result;
 }
 
 static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
 {
     AioHandler *node;
     bool progress = false;
+    AioHandler *tmp;
+
+    qemu_lockcnt_inc(&ctx->list_lock);
 
     /*
      * We have to walk very carefully in case aio_set_fd_handler is
      * called while we're walking.
      */
-    node = QLIST_FIRST(&ctx->aio_handlers);
-    while (node) {
-        AioHandler *tmp;
+    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
         int revents = node->pfd.revents;
 
-        ctx->walking_handlers++;
-
         if (!node->deleted &&
             (revents || event_notifier_get_handle(node->e) == event) &&
             node->io_notify) {
@@ -275,17 +296,16 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
             }
         }
 
-        tmp = node;
-        node = QLIST_NEXT(node, node);
-
-        ctx->walking_handlers--;
-
-        if (!ctx->walking_handlers && tmp->deleted) {
-            QLIST_REMOVE(tmp, node);
-            g_free(tmp);
+        if (node->deleted) {
+            if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
+                QLIST_REMOVE(node, node);
+                g_free(node);
+                qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
+            }
         }
     }
 
+    qemu_lockcnt_dec(&ctx->list_lock);
     return progress;
 }
 
@@ -323,20 +343,19 @@ bool aio_poll(AioContext *ctx, bool blocking)
         atomic_add(&ctx->notify_me, 2);
     }
 
+    qemu_lockcnt_inc(&ctx->list_lock);
     have_select_revents = aio_prepare(ctx);
 
-    ctx->walking_handlers++;
-
     /* fill fd sets */
     count = 0;
-    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
         if (!node->deleted && node->io_notify
             && aio_node_check(ctx, node->is_external)) {
             events[count++] = event_notifier_get_handle(node->e);
         }
     }
 
-    ctx->walking_handlers--;
+    qemu_lockcnt_dec(&ctx->list_lock);
     first = true;
 
     /* ctx->notifier is always registered.  */
diff --git a/async.c b/async.c
index 2960171834..0d218ab0e0 100644
--- a/async.c
+++ b/async.c
@@ -53,14 +53,14 @@ void aio_bh_schedule_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
         .cb = cb,
         .opaque = opaque,
     };
-    qemu_mutex_lock(&ctx->bh_lock);
+    qemu_lockcnt_lock(&ctx->list_lock);
     bh->next = ctx->first_bh;
     bh->scheduled = 1;
     bh->deleted = 1;
     /* Make sure that the members are ready before putting bh into list */
     smp_wmb();
     ctx->first_bh = bh;
-    qemu_mutex_unlock(&ctx->bh_lock);
+    qemu_lockcnt_unlock(&ctx->list_lock);
     aio_notify(ctx);
 }
 
@@ -73,12 +73,12 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
         .cb = cb,
         .opaque = opaque,
     };
-    qemu_mutex_lock(&ctx->bh_lock);
+    qemu_lockcnt_lock(&ctx->list_lock);
     bh->next = ctx->first_bh;
     /* Make sure that the members are ready before putting bh into list */
     smp_wmb();
     ctx->first_bh = bh;
-    qemu_mutex_unlock(&ctx->bh_lock);
+    qemu_lockcnt_unlock(&ctx->list_lock);
     return bh;
 }
 
@@ -92,14 +92,13 @@ int aio_bh_poll(AioContext *ctx)
 {
     QEMUBH *bh, **bhp, *next;
     int ret;
+    bool deleted = false;
 
-    ctx->walking_bh++;
+    qemu_lockcnt_inc(&ctx->list_lock);
 
     ret = 0;
-    for (bh = ctx->first_bh; bh; bh = next) {
-        /* Make sure that fetching bh happens before accessing its members */
-        smp_read_barrier_depends();
-        next = bh->next;
+    for (bh = atomic_rcu_read(&ctx->first_bh); bh; bh = next) {
+        next = atomic_rcu_read(&bh->next);
         /* The atomic_xchg is paired with the one in qemu_bh_schedule.  The
          * implicit memory barrier ensures that the callback sees all writes
          * done by the scheduling thread.  It also ensures that the scheduling
@@ -114,13 +113,18 @@ int aio_bh_poll(AioContext *ctx)
             bh->idle = 0;
             aio_bh_call(bh);
         }
+        if (bh->deleted) {
+            deleted = true;
+        }
     }
 
-    ctx->walking_bh--;
-
     /* remove deleted bhs */
-    if (!ctx->walking_bh) {
-        qemu_mutex_lock(&ctx->bh_lock);
+    if (!deleted) {
+        qemu_lockcnt_dec(&ctx->list_lock);
+        return ret;
+    }
+
+    if (qemu_lockcnt_dec_and_lock(&ctx->list_lock)) {
         bhp = &ctx->first_bh;
         while (*bhp) {
             bh = *bhp;
@@ -131,9 +135,8 @@ int aio_bh_poll(AioContext *ctx)
                 bhp = &bh->next;
             }
         }
-        qemu_mutex_unlock(&ctx->bh_lock);
+        qemu_lockcnt_unlock(&ctx->list_lock);
     }
-
     return ret;
 }
 
@@ -187,7 +190,8 @@ aio_compute_timeout(AioContext *ctx)
     int timeout = -1;
     QEMUBH *bh;
 
-    for (bh = ctx->first_bh; bh; bh = bh->next) {
+    for (bh = atomic_rcu_read(&ctx->first_bh); bh;
+         bh = atomic_rcu_read(&bh->next)) {
         if (bh->scheduled) {
             if (bh->idle) {
                 /* idle bottom halves will be polled at least
@@ -270,7 +274,8 @@ aio_ctx_finalize(GSource     *source)
     }
 #endif
 
-    qemu_mutex_lock(&ctx->bh_lock);
+    qemu_lockcnt_lock(&ctx->list_lock);
+    assert(!qemu_lockcnt_count(&ctx->list_lock));
     while (ctx->first_bh) {
         QEMUBH *next = ctx->first_bh->next;
 
@@ -280,12 +285,12 @@ aio_ctx_finalize(GSource     *source)
         g_free(ctx->first_bh);
         ctx->first_bh = next;
     }
-    qemu_mutex_unlock(&ctx->bh_lock);
+    qemu_lockcnt_unlock(&ctx->list_lock);
 
     aio_set_event_notifier(ctx, &ctx->notifier, false, NULL, NULL);
     event_notifier_cleanup(&ctx->notifier);
     qemu_rec_mutex_destroy(&ctx->lock);
-    qemu_mutex_destroy(&ctx->bh_lock);
+    qemu_lockcnt_destroy(&ctx->list_lock);
     timerlistgroup_deinit(&ctx->tlg);
 }
 
@@ -372,6 +377,7 @@ AioContext *aio_context_new(Error **errp)
         goto fail;
     }
     g_source_set_can_recurse(&ctx->source, true);
+    qemu_lockcnt_init(&ctx->list_lock);
     aio_set_event_notifier(ctx, &ctx->notifier,
                            false,
                            (EventNotifierHandler *)
@@ -381,7 +387,6 @@ AioContext *aio_context_new(Error **errp)
     ctx->linux_aio = NULL;
 #endif
     ctx->thread_pool = NULL;
-    qemu_mutex_init(&ctx->bh_lock);
     qemu_rec_mutex_init(&ctx->lock);
     timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);
 
diff --git a/block/io.c b/block/io.c
index 4f005623f7..c42b34a965 100644
--- a/block/io.c
+++ b/block/io.c
@@ -228,9 +228,7 @@ void bdrv_drained_begin(BlockDriverState *bs)
         bdrv_parent_drained_begin(bs);
     }
 
-    bdrv_io_unplugged_begin(bs);
     bdrv_drain_recurse(bs);
-    bdrv_io_unplugged_end(bs);
 }
 
 void bdrv_drained_end(BlockDriverState *bs)
@@ -302,7 +300,6 @@ void bdrv_drain_all_begin(void)
 
         aio_context_acquire(aio_context);
         bdrv_parent_drained_begin(bs);
-        bdrv_io_unplugged_begin(bs);
         aio_disable_external(aio_context);
         aio_context_release(aio_context);
 
@@ -347,7 +344,6 @@ void bdrv_drain_all_end(void)
 
         aio_context_acquire(aio_context);
         aio_enable_external(aio_context);
-        bdrv_io_unplugged_end(bs);
         bdrv_parent_drained_end(bs);
         aio_context_release(aio_context);
     }
@@ -2650,7 +2646,7 @@ void bdrv_io_plug(BlockDriverState *bs)
         bdrv_io_plug(child->bs);
     }
 
-    if (bs->io_plugged++ == 0 && bs->io_plug_disabled == 0) {
+    if (bs->io_plugged++ == 0) {
         BlockDriver *drv = bs->drv;
         if (drv && drv->bdrv_io_plug) {
             drv->bdrv_io_plug(bs);
@@ -2663,7 +2659,7 @@ void bdrv_io_unplug(BlockDriverState *bs)
     BdrvChild *child;
 
     assert(bs->io_plugged);
-    if (--bs->io_plugged == 0 && bs->io_plug_disabled == 0) {
+    if (--bs->io_plugged == 0) {
         BlockDriver *drv = bs->drv;
         if (drv && drv->bdrv_io_unplug) {
             drv->bdrv_io_unplug(bs);
@@ -2674,36 +2670,3 @@ void bdrv_io_unplug(BlockDriverState *bs)
         bdrv_io_unplug(child->bs);
     }
 }
-
-void bdrv_io_unplugged_begin(BlockDriverState *bs)
-{
-    BdrvChild *child;
-
-    if (bs->io_plug_disabled++ == 0 && bs->io_plugged > 0) {
-        BlockDriver *drv = bs->drv;
-        if (drv && drv->bdrv_io_unplug) {
-            drv->bdrv_io_unplug(bs);
-        }
-    }
-
-    QLIST_FOREACH(child, &bs->children, next) {
-        bdrv_io_unplugged_begin(child->bs);
-    }
-}
-
-void bdrv_io_unplugged_end(BlockDriverState *bs)
-{
-    BdrvChild *child;
-
-    assert(bs->io_plug_disabled);
-    QLIST_FOREACH(child, &bs->children, next) {
-        bdrv_io_unplugged_end(child->bs);
-    }
-
-    if (--bs->io_plug_disabled == 0 && bs->io_plugged > 0) {
-        BlockDriver *drv = bs->drv;
-        if (drv && drv->bdrv_io_plug) {
-            drv->bdrv_io_plug(bs);
-        }
-    }
-}
diff --git a/docs/lockcnt.txt b/docs/lockcnt.txt
new file mode 100644
index 0000000000..2a79b3205b
--- /dev/null
+++ b/docs/lockcnt.txt
@@ -0,0 +1,277 @@
+DOCUMENTATION FOR LOCKED COUNTERS (aka QemuLockCnt)
+===================================================
+
+QEMU often uses reference counts to track data structures that are being
+accessed and should not be freed.  For example, a loop that invoke
+callbacks like this is not safe:
+
+    QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
+        if (ioh->revents & G_IO_OUT) {
+            ioh->fd_write(ioh->opaque);
+        }
+    }
+
+QLIST_FOREACH_SAFE protects against deletion of the current node (ioh)
+by stashing away its "next" pointer.  However, ioh->fd_write could
+actually delete the next node from the list.  The simplest way to
+avoid this is to mark the node as deleted, and remove it from the
+list in the above loop:
+
+    QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
+        if (ioh->deleted) {
+            QLIST_REMOVE(ioh, next);
+            g_free(ioh);
+        } else {
+            if (ioh->revents & G_IO_OUT) {
+                ioh->fd_write(ioh->opaque);
+            }
+        }
+    }
+
+If however this loop must also be reentrant, i.e. it is possible that
+ioh->fd_write invokes the loop again, some kind of counting is needed:
+
+    walking_handlers++;
+    QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
+        if (ioh->deleted) {
+            if (walking_handlers == 1) {
+                QLIST_REMOVE(ioh, next);
+                g_free(ioh);
+            }
+        } else {
+            if (ioh->revents & G_IO_OUT) {
+                ioh->fd_write(ioh->opaque);
+            }
+        }
+    }
+    walking_handlers--;
+
+One may think of using the RCU primitives, rcu_read_lock() and
+rcu_read_unlock(); effectively, the RCU nesting count would take
+the place of the walking_handlers global variable.  Indeed,
+reference counting and RCU have similar purposes, but their usage in
+general is complementary:
+
+- reference counting is fine-grained and limited to a single data
+  structure; RCU delays reclamation of *all* RCU-protected data
+  structures;
+
+- reference counting works even in the presence of code that keeps
+  a reference for a long time; RCU critical sections in principle
+  should be kept short;
+
+- reference counting is often applied to code that is not thread-safe
+  but is reentrant; in fact, usage of reference counting in QEMU predates
+  the introduction of threads by many years.  RCU is generally used to
+  protect readers from other threads freeing memory after concurrent
+  modifications to a data structure.
+
+- reclaiming data can be done by a separate thread in the case of RCU;
+  this can improve performance, but also delay reclamation undesirably.
+  With reference counting, reclamation is deterministic.
+
+This file documents QemuLockCnt, an abstraction for using reference
+counting in code that has to be both thread-safe and reentrant.
+
+
+QemuLockCnt concepts
+--------------------
+
+A QemuLockCnt comprises both a counter and a mutex; it has primitives
+to increment and decrement the counter, and to take and release the
+mutex.  The counter notes how many visits to the data structures are
+taking place (the visits could be from different threads, or there could
+be multiple reentrant visits from the same thread).  The basic rules
+governing the counter/mutex pair then are the following:
+
+- Data protected by the QemuLockCnt must not be freed unless the
+  counter is zero and the mutex is taken.
+
+- A new visit cannot be started while the counter is zero and the
+  mutex is taken.
+
+Most of the time, the mutex protects all writes to the data structure,
+not just frees, though there could be cases where this is not necessary.
+
+Reads, instead, can be done without taking the mutex, as long as the
+readers and writers use the same macros that are used for RCU, for
+example atomic_rcu_read, atomic_rcu_set, QLIST_FOREACH_RCU, etc.  This is
+because the reads are done outside a lock and a set or QLIST_INSERT_HEAD
+can happen concurrently with the read.  The RCU API ensures that the
+processor and the compiler see all required memory barriers.
+
+This could be implemented simply by protecting the counter with the
+mutex, for example:
+
+    // (1)
+    qemu_mutex_lock(&walking_handlers_mutex);
+    walking_handlers++;
+    qemu_mutex_unlock(&walking_handlers_mutex);
+
+    ...
+
+    // (2)
+    qemu_mutex_lock(&walking_handlers_mutex);
+    if (--walking_handlers == 0) {
+        QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
+            if (ioh->deleted) {
+                QLIST_REMOVE(ioh, next);
+                g_free(ioh);
+            }
+        }
+    }
+    qemu_mutex_unlock(&walking_handlers_mutex);
+
+Here, no frees can happen in the code represented by the ellipsis.
+If another thread is executing critical section (2), that part of
+the code cannot be entered, because the thread will not be able
+to increment the walking_handlers variable.  And of course
+during the visit any other thread will see a nonzero value for
+walking_handlers, as in the single-threaded code.
+
+Note that it is possible for multiple concurrent accesses to delay
+the cleanup arbitrarily; in other words, for the walking_handlers
+counter to never become zero.  For this reason, this technique is
+more easily applicable if concurrent access to the structure is rare.
+
+However, critical sections are easy to forget since you have to do
+them for each modification of the counter.  QemuLockCnt ensures that
+all modifications of the counter take the lock appropriately, and it
+can also be more efficient in two ways:
+
+- it avoids taking the lock for many operations (for example
+  incrementing the counter while it is non-zero);
+
+- on some platforms, one can implement QemuLockCnt to hold the lock
+  and the mutex in a single word, making the fast path no more expensive
+  than simply managing a counter using atomic operations (see
+  docs/atomics.txt).  This can be very helpful if concurrent access to
+  the data structure is expected to be rare.
+
+
+Using the same mutex for frees and writes can still incur some small
+inefficiencies; for example, a visit can never start if the counter is
+zero and the mutex is taken---even if the mutex is taken by a write,
+which in principle need not block a visit of the data structure.
+However, these are usually not a problem if any of the following
+assumptions are valid:
+
+- concurrent access is possible but rare
+
+- writes are rare
+
+- writes are frequent, but this kind of write (e.g. appending to a
+  list) has a very small critical section.
+
+For example, QEMU uses QemuLockCnt to manage an AioContext's list of
+bottom halves and file descriptor handlers.  Modifications to the list
+of file descriptor handlers are rare.  Creation of a new bottom half is
+frequent and can happen on a fast path; however: 1) it is almost never
+concurrent with a visit to the list of bottom halves; 2) it only has
+three instructions in the critical path, two assignments and a smp_wmb().
+
+
+QemuLockCnt API
+---------------
+
+The QemuLockCnt API is described in include/qemu/thread.h.
+
+
+QemuLockCnt usage
+-----------------
+
+This section explains the typical usage patterns for QemuLockCnt functions.
+
+Setting a variable to a non-NULL value can be done between
+qemu_lockcnt_lock and qemu_lockcnt_unlock:
+
+    qemu_lockcnt_lock(&xyz_lockcnt);
+    if (!xyz) {
+        new_xyz = g_new(XYZ, 1);
+        ...
+        atomic_rcu_set(&xyz, new_xyz);
+    }
+    qemu_lockcnt_unlock(&xyz_lockcnt);
+
+Accessing the value can be done between qemu_lockcnt_inc and
+qemu_lockcnt_dec:
+
+    qemu_lockcnt_inc(&xyz_lockcnt);
+    if (xyz) {
+        XYZ *p = atomic_rcu_read(&xyz);
+        ...
+        /* Accesses can now be done through "p".  */
+    }
+    qemu_lockcnt_dec(&xyz_lockcnt);
+
+Freeing the object can similarly use qemu_lockcnt_lock and
+qemu_lockcnt_unlock, but you also need to ensure that the count
+is zero (i.e. there is no concurrent visit).  Because qemu_lockcnt_inc
+takes the QemuLockCnt's lock, the count cannot become non-zero while
+the object is being freed.  Freeing an object looks like this:
+
+    qemu_lockcnt_lock(&xyz_lockcnt);
+    if (!qemu_lockcnt_count(&xyz_lockcnt)) {
+        g_free(xyz);
+        xyz = NULL;
+    }
+    qemu_lockcnt_unlock(&xyz_lockcnt);
+
+If an object has to be freed right after a visit, you can combine
+the decrement, the locking and the check on count as follows:
+
+    qemu_lockcnt_inc(&xyz_lockcnt);
+    if (xyz) {
+        XYZ *p = atomic_rcu_read(&xyz);
+        ...
+        /* Accesses can now be done through "p".  */
+    }
+    if (qemu_lockcnt_dec_and_lock(&xyz_lockcnt)) {
+        g_free(xyz);
+        xyz = NULL;
+        qemu_lockcnt_unlock(&xyz_lockcnt);
+    }
+
+QemuLockCnt can also be used to access a list as follows:
+
+    qemu_lockcnt_inc(&io_handlers_lockcnt);
+    QLIST_FOREACH_RCU(ioh, &io_handlers, pioh) {
+        if (ioh->revents & G_IO_OUT) {
+            ioh->fd_write(ioh->opaque);
+        }
+    }
+
+    if (qemu_lockcnt_dec_and_lock(&io_handlers_lockcnt)) {
+        QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
+            if (ioh->deleted) {
+                QLIST_REMOVE(ioh, next);
+                g_free(ioh);
+            }
+        }
+        qemu_lockcnt_unlock(&io_handlers_lockcnt);
+    }
+
+Again, the RCU primitives are used because new items can be added to the
+list during the walk.  QLIST_FOREACH_RCU ensures that the processor and
+the compiler see the appropriate memory barriers.
+
+An alternative pattern uses qemu_lockcnt_dec_if_lock:
+
+    qemu_lockcnt_inc(&io_handlers_lockcnt);
+    QLIST_FOREACH_SAFE_RCU(ioh, &io_handlers, next, pioh) {
+        if (ioh->deleted) {
+            if (qemu_lockcnt_dec_if_lock(&io_handlers_lockcnt)) {
+                QLIST_REMOVE(ioh, next);
+                g_free(ioh);
+                qemu_lockcnt_inc_and_unlock(&io_handlers_lockcnt);
+            }
+        } else {
+            if (ioh->revents & G_IO_OUT) {
+                ioh->fd_write(ioh->opaque);
+            }
+        }
+    }
+    qemu_lockcnt_dec(&io_handlers_lockcnt);
+
+Here you can use qemu_lockcnt_dec instead of qemu_lockcnt_dec_and_lock,
+because there is no special task to do if the count goes from 1 to 0.
diff --git a/docs/multiple-iothreads.txt b/docs/multiple-iothreads.txt
index 0e7cdb2c28..e4d340bbb7 100644
--- a/docs/multiple-iothreads.txt
+++ b/docs/multiple-iothreads.txt
@@ -84,9 +84,8 @@ How to synchronize with an IOThread
 AioContext is not thread-safe so some rules must be followed when using file
 descriptors, event notifiers, timers, or BHs across threads:
 
-1. AioContext functions can be called safely from file descriptor, event
-notifier, timer, or BH callbacks invoked by the AioContext.  No locking is
-necessary.
+1. AioContext functions can always be called safely.  They handle their
+own locking internally.
 
 2. Other threads wishing to access the AioContext must use
 aio_context_acquire()/aio_context_release() for mutual exclusion.  Once the
@@ -94,16 +93,14 @@ context is acquired no other thread can access it or run event loop iterations
 in this AioContext.
 
 aio_context_acquire()/aio_context_release() calls may be nested.  This
-means you can call them if you're not sure whether #1 applies.
+means you can call them if you're not sure whether #2 applies.
 
 There is currently no lock ordering rule if a thread needs to acquire multiple
 AioContexts simultaneously.  Therefore, it is only safe for code holding the
 QEMU global mutex to acquire other AioContexts.
 
-Side note: the best way to schedule a function call across threads is to create
-a BH in the target AioContext beforehand and then call qemu_bh_schedule().  No
-acquire/release or locking is needed for the qemu_bh_schedule() call.  But be
-sure to acquire the AioContext for aio_bh_new() if necessary.
+Side note: the best way to schedule a function call across threads is to call
+aio_bh_schedule_oneshot().  No acquire/release or locking is needed.
 
 AioContext and the block layer
 ------------------------------
diff --git a/include/block/aio.h b/include/block/aio.h
index 4dca54d9c7..7df271d2b9 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -53,18 +53,12 @@ struct LinuxAioState;
 struct AioContext {
     GSource source;
 
-    /* Protects all fields from multi-threaded access */
+    /* Used by AioContext users to protect from multi-threaded access.  */
     QemuRecMutex lock;
 
-    /* The list of registered AIO handlers */
+    /* The list of registered AIO handlers.  Protected by ctx->list_lock. */
     QLIST_HEAD(, AioHandler) aio_handlers;
 
-    /* This is a simple lock used to protect the aio_handlers list.
-     * Specifically, it's used to ensure that no callbacks are removed while
-     * we're walking and dispatching callbacks.
-     */
-    int walking_handlers;
-
     /* Used to avoid unnecessary event_notifier_set calls in aio_notify;
      * accessed with atomic primitives.  If this field is 0, everything
      * (file descriptors, bottom halves, timers) will be re-evaluated
@@ -90,17 +84,15 @@ struct AioContext {
      */
     uint32_t notify_me;
 
-    /* lock to protect between bh's adders and deleter */
-    QemuMutex bh_lock;
+    /* A lock to protect between QEMUBH and AioHandler adders and deleter,
+     * and to ensure that no callbacks are removed while we're walking and
+     * dispatching them.
+     */
+    QemuLockCnt list_lock;
 
     /* Anchor of the list of Bottom Halves belonging to the context */
     struct QEMUBH *first_bh;
 
-    /* A simple lock used to protect the first_bh list, and ensure that
-     * no callbacks are removed while we're walking and dispatching callbacks.
-     */
-    int walking_bh;
-
     /* Used by aio_notify.
      *
      * "notified" is used to avoid expensive event_notifier_test_and_clear
@@ -116,7 +108,9 @@ struct AioContext {
     bool notified;
     EventNotifier notifier;
 
-    /* Thread pool for performing work and receiving completion callbacks */
+    /* Thread pool for performing work and receiving completion callbacks.
+     * Has its own locking.
+     */
     struct ThreadPool *thread_pool;
 
 #ifdef CONFIG_LINUX_AIO
@@ -126,7 +120,9 @@ struct AioContext {
     struct LinuxAioState *linux_aio;
 #endif
 
-    /* TimerLists for calling timers - one per clock type */
+    /* TimerLists for calling timers - one per clock type.  Has its own
+     * locking.
+     */
     QEMUTimerListGroup tlg;
 
     int external_disable_cnt;
@@ -180,9 +176,11 @@ void aio_context_unref(AioContext *ctx);
  * automatically takes care of calling aio_context_acquire and
  * aio_context_release.
  *
- * Access to timers and BHs from a thread that has not acquired AioContext
- * is possible.  Access to callbacks for now must be done while the AioContext
- * is owned by the thread (FIXME).
+ * Note that this is separate from bdrv_drained_begin/bdrv_drained_end.  A
+ * thread still has to call those to avoid being interrupted by the guest.
+ *
+ * Bottom halves, timers and callbacks can be created or removed without
+ * acquiring the AioContext.
  */
 void aio_context_acquire(AioContext *ctx);
 
diff --git a/include/block/block.h b/include/block/block.h
index 49bb0b239a..8b0dcdaa70 100644
--- a/include/block/block.h
+++ b/include/block/block.h
@@ -526,8 +526,6 @@ int bdrv_probe_geometry(BlockDriverState *bs, HDGeometry *geo);
 
 void bdrv_io_plug(BlockDriverState *bs);
 void bdrv_io_unplug(BlockDriverState *bs);
-void bdrv_io_unplugged_begin(BlockDriverState *bs);
-void bdrv_io_unplugged_end(BlockDriverState *bs);
 
 /**
  * bdrv_drained_begin:
diff --git a/include/block/block_int.h b/include/block/block_int.h
index 4e4562d444..2d92d7edfe 100644
--- a/include/block/block_int.h
+++ b/include/block/block_int.h
@@ -526,9 +526,8 @@ struct BlockDriverState {
     uint64_t write_threshold_offset;
     NotifierWithReturn write_threshold_notifier;
 
-    /* counters for nested bdrv_io_plug and bdrv_io_unplugged_begin */
+    /* counter for nested bdrv_io_plug */
     unsigned io_plugged;
-    unsigned io_plug_disabled;
 
     int quiesce_counter;
 };
diff --git a/include/qemu/futex.h b/include/qemu/futex.h
new file mode 100644
index 0000000000..bb7dc9e296
--- /dev/null
+++ b/include/qemu/futex.h
@@ -0,0 +1,36 @@
+/*
+ * Wrappers around Linux futex syscall
+ *
+ * Copyright Red Hat, Inc. 2017
+ *
+ * Author:
+ *  Paolo Bonzini <pbonzini@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include <sys/syscall.h>
+#include <linux/futex.h>
+
+#define qemu_futex(...)              syscall(__NR_futex, __VA_ARGS__)
+
+static inline void qemu_futex_wake(void *f, int n)
+{
+    qemu_futex(f, FUTEX_WAKE, n, NULL, NULL, 0);
+}
+
+static inline void qemu_futex_wait(void *f, unsigned val)
+{
+    while (qemu_futex(f, FUTEX_WAIT, (int) val, NULL, NULL, 0)) {
+        switch (errno) {
+        case EWOULDBLOCK:
+            return;
+        case EINTR:
+            break; /* get out of switch and retry */
+        default:
+            abort();
+        }
+    }
+}
diff --git a/include/qemu/thread.h b/include/qemu/thread.h
index e8e665f020..9910f49b3a 100644
--- a/include/qemu/thread.h
+++ b/include/qemu/thread.h
@@ -8,6 +8,7 @@ typedef struct QemuMutex QemuMutex;
 typedef struct QemuCond QemuCond;
 typedef struct QemuSemaphore QemuSemaphore;
 typedef struct QemuEvent QemuEvent;
+typedef struct QemuLockCnt QemuLockCnt;
 typedef struct QemuThread QemuThread;
 
 #ifdef _WIN32
@@ -98,4 +99,115 @@ static inline void qemu_spin_unlock(QemuSpin *spin)
     __sync_lock_release(&spin->value);
 }
 
+struct QemuLockCnt {
+#ifndef CONFIG_LINUX
+    QemuMutex mutex;
+#endif
+    unsigned count;
+};
+
+/**
+ * qemu_lockcnt_init: initialize a QemuLockcnt
+ * @lockcnt: the lockcnt to initialize
+ *
+ * Initialize lockcnt's counter to zero and prepare its mutex
+ * for usage.
+ */
+void qemu_lockcnt_init(QemuLockCnt *lockcnt);
+
+/**
+ * qemu_lockcnt_destroy: destroy a QemuLockcnt
+ * @lockcnt: the lockcnt to destruct
+ *
+ * Destroy lockcnt's mutex.
+ */
+void qemu_lockcnt_destroy(QemuLockCnt *lockcnt);
+
+/**
+ * qemu_lockcnt_inc: increment a QemuLockCnt's counter
+ * @lockcnt: the lockcnt to operate on
+ *
+ * If the lockcnt's count is zero, wait for critical sections
+ * to finish and increment lockcnt's count to 1.  If the count
+ * is not zero, just increment it.
+ *
+ * Because this function can wait on the mutex, it must not be
+ * called while the lockcnt's mutex is held by the current thread.
+ * For the same reason, qemu_lockcnt_inc can also contribute to
+ * AB-BA deadlocks.  This is a sample deadlock scenario:
+ *
+ *            thread 1                      thread 2
+ *            -------------------------------------------------------
+ *            qemu_lockcnt_lock(&lc1);
+ *                                          qemu_lockcnt_lock(&lc2);
+ *            qemu_lockcnt_inc(&lc2);
+ *                                          qemu_lockcnt_inc(&lc1);
+ */
+void qemu_lockcnt_inc(QemuLockCnt *lockcnt);
+
+/**
+ * qemu_lockcnt_dec: decrement a QemuLockCnt's counter
+ * @lockcnt: the lockcnt to operate on
+ */
+void qemu_lockcnt_dec(QemuLockCnt *lockcnt);
+
+/**
+ * qemu_lockcnt_dec_and_lock: decrement a QemuLockCnt's counter and
+ * possibly lock it.
+ * @lockcnt: the lockcnt to operate on
+ *
+ * Decrement lockcnt's count.  If the new count is zero, lock
+ * the mutex and return true.  Otherwise, return false.
+ */
+bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt);
+
+/**
+ * qemu_lockcnt_dec_if_lock: possibly decrement a QemuLockCnt's counter and
+ * lock it.
+ * @lockcnt: the lockcnt to operate on
+ *
+ * If the count is 1, decrement the count to zero, lock
+ * the mutex and return true.  Otherwise, return false.
+ */
+bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt);
+
+/**
+ * qemu_lockcnt_lock: lock a QemuLockCnt's mutex.
+ * @lockcnt: the lockcnt to operate on
+ *
+ * Remember that concurrent visits are not blocked unless the count is
+ * also zero.  You can use qemu_lockcnt_count to check for this inside a
+ * critical section.
+ */
+void qemu_lockcnt_lock(QemuLockCnt *lockcnt);
+
+/**
+ * qemu_lockcnt_unlock: release a QemuLockCnt's mutex.
+ * @lockcnt: the lockcnt to operate on.
+ */
+void qemu_lockcnt_unlock(QemuLockCnt *lockcnt);
+
+/**
+ * qemu_lockcnt_inc_and_unlock: combined unlock/increment on a QemuLockCnt.
+ * @lockcnt: the lockcnt to operate on.
+ *
+ * This is the same as
+ *
+ *     qemu_lockcnt_unlock(lockcnt);
+ *     qemu_lockcnt_inc(lockcnt);
+ *
+ * but more efficient.
+ */
+void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt);
+
+/**
+ * qemu_lockcnt_count: query a LockCnt's count.
+ * @lockcnt: the lockcnt to query.
+ *
+ * Note that the count can change at any time.  Still, while the
+ * lockcnt is locked, one can usefully check whether the count
+ * is non-zero.
+ */
+unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt);
+
 #endif
diff --git a/util/Makefile.objs b/util/Makefile.objs
index ad0f9c7fe4..c1f247d675 100644
--- a/util/Makefile.objs
+++ b/util/Makefile.objs
@@ -1,5 +1,6 @@
 util-obj-y = osdep.o cutils.o unicode.o qemu-timer-common.o
 util-obj-y += bufferiszero.o
+util-obj-y += lockcnt.o
 util-obj-$(CONFIG_POSIX) += compatfd.o
 util-obj-$(CONFIG_POSIX) += event_notifier-posix.o
 util-obj-$(CONFIG_POSIX) += mmap-alloc.o
diff --git a/util/lockcnt.c b/util/lockcnt.c
new file mode 100644
index 0000000000..4f88dcf8b8
--- /dev/null
+++ b/util/lockcnt.c
@@ -0,0 +1,397 @@
+/*
+ * QemuLockCnt implementation
+ *
+ * Copyright Red Hat, Inc. 2017
+ *
+ * Author:
+ *   Paolo Bonzini <pbonzini@redhat.com>
+ */
+#include "qemu/osdep.h"
+#include "qemu/thread.h"
+#include "qemu/atomic.h"
+#include "trace.h"
+
+#ifdef CONFIG_LINUX
+#include "qemu/futex.h"
+
+/* On Linux, bits 0-1 are a futex-based lock, bits 2-31 are the counter.
+ * For the mutex algorithm see Ulrich Drepper's "Futexes Are Tricky" (ok,
+ * this is not the most relaxing citation I could make...).  It is similar
+ * to mutex2 in the paper.
+ */
+
+#define QEMU_LOCKCNT_STATE_MASK    3
+#define QEMU_LOCKCNT_STATE_FREE    0   /* free, uncontended */
+#define QEMU_LOCKCNT_STATE_LOCKED  1   /* locked, uncontended */
+#define QEMU_LOCKCNT_STATE_WAITING 2   /* locked, contended */
+
+#define QEMU_LOCKCNT_COUNT_STEP    4
+#define QEMU_LOCKCNT_COUNT_SHIFT   2
+
+void qemu_lockcnt_init(QemuLockCnt *lockcnt)
+{
+    lockcnt->count = 0;
+}
+
+void qemu_lockcnt_destroy(QemuLockCnt *lockcnt)
+{
+}
+
+/* *val is the current value of lockcnt->count.
+ *
+ * If the lock is free, try a cmpxchg from *val to new_if_free; return
+ * true and set *val to the old value found by the cmpxchg in
+ * lockcnt->count.
+ *
+ * If the lock is taken, wait for it to be released and return false
+ * *without trying again to take the lock*.  Again, set *val to the
+ * new value of lockcnt->count.
+ *
+ * If *waited is true on return, new_if_free's bottom two bits must not
+ * be QEMU_LOCKCNT_STATE_LOCKED on subsequent calls, because the caller
+ * does not know if there are other waiters.  Furthermore, after *waited
+ * is set the caller has effectively acquired the lock.  If it returns
+ * with the lock not taken, it must wake another futex waiter.
+ */
+static bool qemu_lockcnt_cmpxchg_or_wait(QemuLockCnt *lockcnt, int *val,
+                                         int new_if_free, bool *waited)
+{
+    /* Fast path for when the lock is free.  */
+    if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_FREE) {
+        int expected = *val;
+
+        trace_lockcnt_fast_path_attempt(lockcnt, expected, new_if_free);
+        *val = atomic_cmpxchg(&lockcnt->count, expected, new_if_free);
+        if (*val == expected) {
+            trace_lockcnt_fast_path_success(lockcnt, expected, new_if_free);
+            *val = new_if_free;
+            return true;
+        }
+    }
+
+    /* The slow path moves from locked to waiting if necessary, then
+     * does a futex wait.  Both steps can be repeated ad nauseam,
+     * only getting out of the loop if we can have another shot at the
+     * fast path.  Once we can, get out to compute the new destination
+     * value for the fast path.
+     */
+    while ((*val & QEMU_LOCKCNT_STATE_MASK) != QEMU_LOCKCNT_STATE_FREE) {
+        if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_LOCKED) {
+            int expected = *val;
+            int new = expected - QEMU_LOCKCNT_STATE_LOCKED + QEMU_LOCKCNT_STATE_WAITING;
+
+            trace_lockcnt_futex_wait_prepare(lockcnt, expected, new);
+            *val = atomic_cmpxchg(&lockcnt->count, expected, new);
+            if (*val == expected) {
+                *val = new;
+            }
+            continue;
+        }
+
+        if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_WAITING) {
+            *waited = true;
+            trace_lockcnt_futex_wait(lockcnt, *val);
+            qemu_futex_wait(&lockcnt->count, *val);
+            *val = atomic_read(&lockcnt->count);
+            trace_lockcnt_futex_wait_resume(lockcnt, *val);
+            continue;
+        }
+
+        abort();
+    }
+    return false;
+}
+
+static void lockcnt_wake(QemuLockCnt *lockcnt)
+{
+    trace_lockcnt_futex_wake(lockcnt);
+    qemu_futex_wake(&lockcnt->count, 1);
+}
+
+void qemu_lockcnt_inc(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    bool waited = false;
+
+    for (;;) {
+        if (val >= QEMU_LOCKCNT_COUNT_STEP) {
+            int expected = val;
+            val = atomic_cmpxchg(&lockcnt->count, val, val + QEMU_LOCKCNT_COUNT_STEP);
+            if (val == expected) {
+                break;
+            }
+        } else {
+            /* The fast path is (0, unlocked)->(1, unlocked).  */
+            if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, QEMU_LOCKCNT_COUNT_STEP,
+                                             &waited)) {
+                break;
+            }
+        }
+    }
+
+    /* If we were woken by another thread, we should also wake one because
+     * we are effectively releasing the lock that was given to us.  This is
+     * the case where qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING
+     * in the low bits, and qemu_lockcnt_inc_and_unlock would find it and
+     * wake someone.
+     */
+    if (waited) {
+        lockcnt_wake(lockcnt);
+    }
+}
+
+void qemu_lockcnt_dec(QemuLockCnt *lockcnt)
+{
+    atomic_sub(&lockcnt->count, QEMU_LOCKCNT_COUNT_STEP);
+}
+
+/* Decrement a counter, and return locked if it is decremented to zero.
+ * If the function returns true, it is impossible for the counter to
+ * become nonzero until the next qemu_lockcnt_unlock.
+ */
+bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    int locked_state = QEMU_LOCKCNT_STATE_LOCKED;
+    bool waited = false;
+
+    for (;;) {
+        if (val >= 2 * QEMU_LOCKCNT_COUNT_STEP) {
+            int expected = val;
+            val = atomic_cmpxchg(&lockcnt->count, val, val - QEMU_LOCKCNT_COUNT_STEP);
+            if (val == expected) {
+                break;
+            }
+        } else {
+            /* If count is going 1->0, take the lock. The fast path is
+             * (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting).
+             */
+            if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state, &waited)) {
+                return true;
+            }
+
+            if (waited) {
+                /* At this point we do not know if there are more waiters.  Assume
+                 * there are.
+                 */
+                locked_state = QEMU_LOCKCNT_STATE_WAITING;
+            }
+        }
+    }
+
+    /* If we were woken by another thread, but we're returning in unlocked
+     * state, we should also wake a thread because we are effectively
+     * releasing the lock that was given to us.  This is the case where
+     * qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low
+     * bits, and qemu_lockcnt_unlock would find it and wake someone.
+     */
+    if (waited) {
+        lockcnt_wake(lockcnt);
+    }
+    return false;
+}
+
+/* If the counter is one, decrement it and return locked.  Otherwise do
+ * nothing.
+ *
+ * If the function returns true, it is impossible for the counter to
+ * become nonzero until the next qemu_lockcnt_unlock.
+ */
+bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    int locked_state = QEMU_LOCKCNT_STATE_LOCKED;
+    bool waited = false;
+
+    while (val < 2 * QEMU_LOCKCNT_COUNT_STEP) {
+        /* If count is going 1->0, take the lock. The fast path is
+         * (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting).
+         */
+        if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state, &waited)) {
+            return true;
+        }
+
+        if (waited) {
+            /* At this point we do not know if there are more waiters.  Assume
+             * there are.
+             */
+            locked_state = QEMU_LOCKCNT_STATE_WAITING;
+        }
+    }
+
+    /* If we were woken by another thread, but we're returning in unlocked
+     * state, we should also wake a thread because we are effectively
+     * releasing the lock that was given to us.  This is the case where
+     * qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low
+     * bits, and qemu_lockcnt_inc_and_unlock would find it and wake someone.
+     */
+    if (waited) {
+        lockcnt_wake(lockcnt);
+    }
+    return false;
+}
+
+void qemu_lockcnt_lock(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    int step = QEMU_LOCKCNT_STATE_LOCKED;
+    bool waited = false;
+
+    /* The third argument is only used if the low bits of val are 0
+     * (QEMU_LOCKCNT_STATE_FREE), so just blindly mix in the desired
+     * state.
+     */
+    while (!qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, val + step, &waited)) {
+        if (waited) {
+            /* At this point we do not know if there are more waiters.  Assume
+             * there are.
+             */
+            step = QEMU_LOCKCNT_STATE_WAITING;
+        }
+    }
+}
+
+void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt)
+{
+    int expected, new, val;
+
+    val = atomic_read(&lockcnt->count);
+    do {
+        expected = val;
+        new = (val + QEMU_LOCKCNT_COUNT_STEP) & ~QEMU_LOCKCNT_STATE_MASK;
+        trace_lockcnt_unlock_attempt(lockcnt, val, new);
+        val = atomic_cmpxchg(&lockcnt->count, val, new);
+    } while (val != expected);
+
+    trace_lockcnt_unlock_success(lockcnt, val, new);
+    if (val & QEMU_LOCKCNT_STATE_WAITING) {
+        lockcnt_wake(lockcnt);
+    }
+}
+
+void qemu_lockcnt_unlock(QemuLockCnt *lockcnt)
+{
+    int expected, new, val;
+
+    val = atomic_read(&lockcnt->count);
+    do {
+        expected = val;
+        new = val & ~QEMU_LOCKCNT_STATE_MASK;
+        trace_lockcnt_unlock_attempt(lockcnt, val, new);
+        val = atomic_cmpxchg(&lockcnt->count, val, new);
+    } while (val != expected);
+
+    trace_lockcnt_unlock_success(lockcnt, val, new);
+    if (val & QEMU_LOCKCNT_STATE_WAITING) {
+        lockcnt_wake(lockcnt);
+    }
+}
+
+unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt)
+{
+    return atomic_read(&lockcnt->count) >> QEMU_LOCKCNT_COUNT_SHIFT;
+}
+#else
+void qemu_lockcnt_init(QemuLockCnt *lockcnt)
+{
+    qemu_mutex_init(&lockcnt->mutex);
+    lockcnt->count = 0;
+}
+
+void qemu_lockcnt_destroy(QemuLockCnt *lockcnt)
+{
+    qemu_mutex_destroy(&lockcnt->mutex);
+}
+
+void qemu_lockcnt_inc(QemuLockCnt *lockcnt)
+{
+    int old;
+    for (;;) {
+        old = atomic_read(&lockcnt->count);
+        if (old == 0) {
+            qemu_lockcnt_lock(lockcnt);
+            qemu_lockcnt_inc_and_unlock(lockcnt);
+            return;
+        } else {
+            if (atomic_cmpxchg(&lockcnt->count, old, old + 1) == old) {
+                return;
+            }
+        }
+    }
+}
+
+void qemu_lockcnt_dec(QemuLockCnt *lockcnt)
+{
+    atomic_dec(&lockcnt->count);
+}
+
+/* Decrement a counter, and return locked if it is decremented to zero.
+ * It is impossible for the counter to become nonzero while the mutex
+ * is taken.
+ */
+bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt)
+{
+    int val = atomic_read(&lockcnt->count);
+    while (val > 1) {
+        int old = atomic_cmpxchg(&lockcnt->count, val, val - 1);
+        if (old != val) {
+            val = old;
+            continue;
+        }
+
+        return false;
+    }
+
+    qemu_lockcnt_lock(lockcnt);
+    if (atomic_fetch_dec(&lockcnt->count) == 1) {
+        return true;
+    }
+
+    qemu_lockcnt_unlock(lockcnt);
+    return false;
+}
+
+/* Decrement a counter and return locked if it is decremented to zero.
+ * Otherwise do nothing.
+ *
+ * It is impossible for the counter to become nonzero while the mutex
+ * is taken.
+ */
+bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt)
+{
+    /* No need for acquire semantics if we return false.  */
+    int val = atomic_read(&lockcnt->count);
+    if (val > 1) {
+        return false;
+    }
+
+    qemu_lockcnt_lock(lockcnt);
+    if (atomic_fetch_dec(&lockcnt->count) == 1) {
+        return true;
+    }
+
+    qemu_lockcnt_inc_and_unlock(lockcnt);
+    return false;
+}
+
+void qemu_lockcnt_lock(QemuLockCnt *lockcnt)
+{
+    qemu_mutex_lock(&lockcnt->mutex);
+}
+
+void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt)
+{
+    atomic_inc(&lockcnt->count);
+    qemu_mutex_unlock(&lockcnt->mutex);
+}
+
+void qemu_lockcnt_unlock(QemuLockCnt *lockcnt)
+{
+    qemu_mutex_unlock(&lockcnt->mutex);
+}
+
+unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt)
+{
+    return atomic_read(&lockcnt->count);
+}
+#endif
diff --git a/util/qemu-thread-posix.c b/util/qemu-thread-posix.c
index d20cddec0c..37cd8ba3fe 100644
--- a/util/qemu-thread-posix.c
+++ b/util/qemu-thread-posix.c
@@ -11,10 +11,6 @@
  *
  */
 #include "qemu/osdep.h"
-#ifdef __linux__
-#include <sys/syscall.h>
-#include <linux/futex.h>
-#endif
 #include "qemu/thread.h"
 #include "qemu/atomic.h"
 #include "qemu/notify.h"
@@ -294,28 +290,9 @@ void qemu_sem_wait(QemuSemaphore *sem)
 }
 
 #ifdef __linux__
-#define futex(...)              syscall(__NR_futex, __VA_ARGS__)
-
-static inline void futex_wake(QemuEvent *ev, int n)
-{
-    futex(ev, FUTEX_WAKE, n, NULL, NULL, 0);
-}
-
-static inline void futex_wait(QemuEvent *ev, unsigned val)
-{
-    while (futex(ev, FUTEX_WAIT, (int) val, NULL, NULL, 0)) {
-        switch (errno) {
-        case EWOULDBLOCK:
-            return;
-        case EINTR:
-            break; /* get out of switch and retry */
-        default:
-            abort();
-        }
-    }
-}
+#include "qemu/futex.h"
 #else
-static inline void futex_wake(QemuEvent *ev, int n)
+static inline void qemu_futex_wake(QemuEvent *ev, int n)
 {
     pthread_mutex_lock(&ev->lock);
     if (n == 1) {
@@ -326,7 +303,7 @@ static inline void futex_wake(QemuEvent *ev, int n)
     pthread_mutex_unlock(&ev->lock);
 }
 
-static inline void futex_wait(QemuEvent *ev, unsigned val)
+static inline void qemu_futex_wait(QemuEvent *ev, unsigned val)
 {
     pthread_mutex_lock(&ev->lock);
     if (ev->value == val) {
@@ -338,7 +315,7 @@ static inline void futex_wait(QemuEvent *ev, unsigned val)
 
 /* Valid transitions:
  * - free->set, when setting the event
- * - busy->set, when setting the event, followed by futex_wake
+ * - busy->set, when setting the event, followed by qemu_futex_wake
  * - set->free, when resetting the event
  * - free->busy, when waiting
  *
@@ -381,7 +358,7 @@ void qemu_event_set(QemuEvent *ev)
     if (atomic_read(&ev->value) != EV_SET) {
         if (atomic_xchg(&ev->value, EV_SET) == EV_BUSY) {
             /* There were waiters, wake them up.  */
-            futex_wake(ev, INT_MAX);
+            qemu_futex_wake(ev, INT_MAX);
         }
     }
 }
@@ -419,7 +396,7 @@ void qemu_event_wait(QemuEvent *ev)
                 return;
             }
         }
-        futex_wait(ev, EV_BUSY);
+        qemu_futex_wait(ev, EV_BUSY);
     }
 }
 
diff --git a/util/qemu-thread-win32.c b/util/qemu-thread-win32.c
index 728e76b5b2..178e0168a1 100644
--- a/util/qemu-thread-win32.c
+++ b/util/qemu-thread-win32.c
@@ -269,7 +269,7 @@ void qemu_sem_wait(QemuSemaphore *sem)
  *
  * Valid transitions:
  * - free->set, when setting the event
- * - busy->set, when setting the event, followed by futex_wake
+ * - busy->set, when setting the event, followed by SetEvent
  * - set->free, when resetting the event
  * - free->busy, when waiting
  *
diff --git a/util/trace-events b/util/trace-events
index ed06aee2ec..2b8aa30739 100644
--- a/util/trace-events
+++ b/util/trace-events
@@ -30,3 +30,13 @@ qemu_anon_ram_free(void *ptr, size_t size) "ptr %p size %zu"
 hbitmap_iter_skip_words(const void *hb, void *hbi, uint64_t pos, unsigned long cur) "hb %p hbi %p pos %"PRId64" cur 0x%lx"
 hbitmap_reset(void *hb, uint64_t start, uint64_t count, uint64_t sbit, uint64_t ebit) "hb %p items %"PRIu64",%"PRIu64" bits %"PRIu64"..%"PRIu64
 hbitmap_set(void *hb, uint64_t start, uint64_t count, uint64_t sbit, uint64_t ebit) "hb %p items %"PRIu64",%"PRIu64" bits %"PRIu64"..%"PRIu64
+
+# util/lockcnt.c
+lockcnt_fast_path_attempt(const void *lockcnt, int expected, int new) "lockcnt %p fast path %d->%d"
+lockcnt_fast_path_success(const void *lockcnt, int expected, int new) "lockcnt %p fast path %d->%d succeeded"
+lockcnt_unlock_attempt(const void *lockcnt, int expected, int new) "lockcnt %p unlock %d->%d"
+lockcnt_unlock_success(const void *lockcnt, int expected, int new) "lockcnt %p unlock %d->%d succeeded"
+lockcnt_futex_wait_prepare(const void *lockcnt, int expected, int new) "lockcnt %p preparing slow path %d->%d"
+lockcnt_futex_wait(const void *lockcnt, int val) "lockcnt %p waiting on %d"
+lockcnt_futex_wait_resume(const void *lockcnt, int new) "lockcnt %p after wait: %d"
+lockcnt_futex_wake(const void *lockcnt) "lockcnt %p waking up one waiter"