author    Peter Maydell <peter.maydell@linaro.org>  2021-03-31 16:38:49 +0100
committer Peter Maydell <peter.maydell@linaro.org>  2021-03-31 16:38:49 +0100
commit    1bd16067b652cce41a9214d0c62c73d5b45ab4b1 (patch)
tree      594b7ebcd1e1e348f5a25f7670af6e7d8d8072ac
parent    6ee55e1d10c25c2f6bf5ce2084ad2327e17affa5 (diff)
parent    b6489ac06695e257ea0a9841364577e247fdee30 (diff)
Merge remote-tracking branch 'remotes/stefanha-gitlab/tags/block-pull-request' into staging
Pull request

A fix for VDI image files and more generally for CoRwlock.

# gpg: Signature made Wed 31 Mar 2021 10:50:39 BST
# gpg:                using RSA key 8695A8BFD3F97CDAAC35775A9CA4ABB381AB73C8
# gpg: Good signature from "Stefan Hajnoczi <stefanha@redhat.com>" [full]
# gpg:                 aka "Stefan Hajnoczi <stefanha@gmail.com>" [full]
# Primary key fingerprint: 8695 A8BF D3F9 7CDA AC35  775A 9CA4 ABB3 81AB 73C8

* remotes/stefanha-gitlab/tags/block-pull-request:
  test-coroutine: Add rwlock downgrade test
  test-coroutine: Add rwlock upgrade test
  coroutine-lock: Reimplement CoRwlock to fix downgrade bug
  coroutine-lock: Store the coroutine in the CoWaitRecord only once
  block/vdi: Don't assume that blocks are larger than VdiHeader
  block/vdi: When writing new bmap entry fails, don't leak the buffer

Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
-rw-r--r--  block/vdi.c                 |  11
-rw-r--r--  include/qemu/coroutine.h    |  17
-rw-r--r--  tests/unit/test-coroutine.c | 161
-rw-r--r--  util/qemu-coroutine-lock.c  | 149
4 files changed, 274 insertions, 64 deletions
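
The centrepiece of the series is the CoRwlock rewrite: the old implementation could hang when a writer downgraded its lock while other coroutines were queued behind it (see test_co_rwlock_downgrade below). A minimal sketch, not part of this series, of the pattern that must now work under contention:

    /* Sketch only; uses the CoRwlock API as it stands after this series.
     * A writer downgrades instead of unlocking and re-acquiring, keeping
     * the lock continuously held while letting other readers in. */
    static CoRwlock lock;

    static void coroutine_fn writer_then_reader(void *opaque)
    {
        qemu_co_rwlock_wrlock(&lock);     /* exclusive access */
        /* ... mutate shared state ... */
        qemu_co_rwlock_downgrade(&lock);  /* atomically become a reader */
        /* ... keep reading the state just written ... */
        qemu_co_rwlock_unlock(&lock);
    }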
diff --git a/block/vdi.c b/block/vdi.c
index 5627e7d764..548f8a057b 100644
--- a/block/vdi.c
+++ b/block/vdi.c
@@ -690,23 +690,26 @@ nonallocating_write:
 
     logout("finished data write\n");
     if (ret < 0) {
+        g_free(block);
         return ret;
     }
 
     if (block) {
         /* One or more new blocks were allocated. */
-        VdiHeader *header = (VdiHeader *) block;
+        VdiHeader *header;
         uint8_t *base;
         uint64_t offset;
         uint32_t n_sectors;
 
+        g_free(block);
+        header = g_malloc(sizeof(*header));
+
         logout("now writing modified header\n");
         assert(VDI_IS_ALLOCATED(bmap_first));
         *header = s->header;
         vdi_header_to_le(header);
-        ret = bdrv_pwrite(bs->file, 0, block, sizeof(VdiHeader));
-        g_free(block);
-        block = NULL;
+        ret = bdrv_pwrite(bs->file, 0, header, sizeof(*header));
+        g_free(header);
 
         if (ret < 0) {
             return ret;
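
The two vdi fixes above address separate bugs on this write path: the error return no longer leaks the block buffer, and the header is written from a dedicated allocation of sizeof(VdiHeader) bytes instead of being reused from 'block', whose size is one image block and is not guaranteed to be at least sizeof(VdiHeader). The fixed write, restated as a condensed sketch of the hunk above:

    VdiHeader *header = g_malloc(sizeof(*header));

    *header = s->header;
    vdi_header_to_le(header);  /* on-disk header is little-endian */
    ret = bdrv_pwrite(bs->file, 0, header, sizeof(*header));
    g_free(header);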
diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
index 84eab6e3bf..ce5b9c6851 100644
--- a/include/qemu/coroutine.h
+++ b/include/qemu/coroutine.h
@@ -237,11 +237,15 @@ bool qemu_co_enter_next_impl(CoQueue *queue, QemuLockable *lock);
 bool qemu_co_queue_empty(CoQueue *queue);
 
 
+typedef struct CoRwTicket CoRwTicket;
 typedef struct CoRwlock {
-    int pending_writer;
-    int reader;
     CoMutex mutex;
-    CoQueue queue;
+
+    /* Number of readers, or -1 if owned for writing.  */
+    int owners;
+
+    /* Waiting coroutines.  */
+    QSIMPLEQ_HEAD(, CoRwTicket) tickets;
 } CoRwlock;
 
 /**
@@ -260,10 +264,9 @@ void qemu_co_rwlock_rdlock(CoRwlock *lock);
 /**
  * Write Locks the CoRwlock from a reader.  This is a bit more efficient than
  * @qemu_co_rwlock_unlock followed by a separate @qemu_co_rwlock_wrlock.
- * However, if the lock cannot be upgraded immediately, control is transferred
- * to the caller of the current coroutine.  Also, @qemu_co_rwlock_upgrade
- * only overrides CoRwlock fairness if there are no concurrent readers, so
- * another writer might run while @qemu_co_rwlock_upgrade blocks.
+ * Note that if the lock cannot be upgraded immediately, control is transferred
+ * to the caller of the current coroutine; another writer might run while
+ * @qemu_co_rwlock_upgrade blocks.
  */
 void qemu_co_rwlock_upgrade(CoRwlock *lock);
 
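
Because an upgrade may block and transfer control, callers cannot assume that state observed under the read lock is still current once qemu_co_rwlock_upgrade() returns. A usage sketch, where needs_update() and do_update() are hypothetical helpers:

    qemu_co_rwlock_rdlock(&lock);
    if (needs_update(state)) {
        qemu_co_rwlock_upgrade(&lock);
        if (needs_update(state)) {  /* re-check: another writer may have run */
            do_update(state);
        }
    }
    qemu_co_rwlock_unlock(&lock);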
diff --git a/tests/unit/test-coroutine.c b/tests/unit/test-coroutine.c
index e946d93a65..aa77a3bcb3 100644
--- a/tests/unit/test-coroutine.c
+++ b/tests/unit/test-coroutine.c
@@ -264,6 +264,165 @@ static void test_co_mutex_lockable(void)
     g_assert(QEMU_MAKE_LOCKABLE(null_pointer) == NULL);
 }
 
+static CoRwlock rwlock;
+
+/* Test that readers are properly sent back to the queue when upgrading,
+ * even if they are the sole readers.  The test scenario is as follows:
+ *
+ *
+ * | c1           | c2         |
+ * |--------------+------------+
+ * | rdlock       |            |
+ * | yield        |            |
+ * |              | wrlock     |
+ * |              | <queued>   |
+ * | upgrade      |            |
+ * | <queued>     | <dequeued> |
+ * |              | unlock     |
+ * | <dequeued>   |            |
+ * | unlock       |            |
+ */
+
+static void coroutine_fn rwlock_yield_upgrade(void *opaque)
+{
+    qemu_co_rwlock_rdlock(&rwlock);
+    qemu_coroutine_yield();
+
+    qemu_co_rwlock_upgrade(&rwlock);
+    qemu_co_rwlock_unlock(&rwlock);
+
+    *(bool *)opaque = true;
+}
+
+static void coroutine_fn rwlock_wrlock_yield(void *opaque)
+{
+    qemu_co_rwlock_wrlock(&rwlock);
+    qemu_coroutine_yield();
+
+    qemu_co_rwlock_unlock(&rwlock);
+    *(bool *)opaque = true;
+}
+
+static void test_co_rwlock_upgrade(void)
+{
+    bool c1_done = false;
+    bool c2_done = false;
+    Coroutine *c1, *c2;
+
+    qemu_co_rwlock_init(&rwlock);
+    c1 = qemu_coroutine_create(rwlock_yield_upgrade, &c1_done);
+    c2 = qemu_coroutine_create(rwlock_wrlock_yield, &c2_done);
+
+    qemu_coroutine_enter(c1);
+    qemu_coroutine_enter(c2);
+
+    /* c1 now should go to sleep.  */
+    qemu_coroutine_enter(c1);
+    g_assert(!c1_done);
+
+    qemu_coroutine_enter(c2);
+    g_assert(c1_done);
+    g_assert(c2_done);
+}
+
+static void coroutine_fn rwlock_rdlock_yield(void *opaque)
+{
+    qemu_co_rwlock_rdlock(&rwlock);
+    qemu_coroutine_yield();
+
+    qemu_co_rwlock_unlock(&rwlock);
+    qemu_coroutine_yield();
+
+    *(bool *)opaque = true;
+}
+
+static void coroutine_fn rwlock_wrlock_downgrade(void *opaque)
+{
+    qemu_co_rwlock_wrlock(&rwlock);
+
+    qemu_co_rwlock_downgrade(&rwlock);
+    qemu_co_rwlock_unlock(&rwlock);
+    *(bool *)opaque = true;
+}
+
+static void coroutine_fn rwlock_rdlock(void *opaque)
+{
+    qemu_co_rwlock_rdlock(&rwlock);
+
+    qemu_co_rwlock_unlock(&rwlock);
+    *(bool *)opaque = true;
+}
+
+static void coroutine_fn rwlock_wrlock(void *opaque)
+{
+    qemu_co_rwlock_wrlock(&rwlock);
+
+    qemu_co_rwlock_unlock(&rwlock);
+    *(bool *)opaque = true;
+}
+
+/*
+ * Check that downgrading a reader-writer lock does not cause a hang.
+ *
+ * Four coroutines are used to produce a situation where there are
+ * both reader and writer hopefuls waiting to acquire an rwlock that
+ * is held by a reader.
+ *
+ * The correct sequence of operations we aim to provoke can be
+ * represented as:
+ *
+ * | c1     | c2         | c3         | c4         |
+ * |--------+------------+------------+------------|
+ * | rdlock |            |            |            |
+ * | yield  |            |            |            |
+ * |        | wrlock     |            |            |
+ * |        | <queued>   |            |            |
+ * |        |            | rdlock     |            |
+ * |        |            | <queued>   |            |
+ * |        |            |            | wrlock     |
+ * |        |            |            | <queued>   |
+ * | unlock |            |            |            |
+ * | yield  |            |            |            |
+ * |        | <dequeued> |            |            |
+ * |        | downgrade  |            |            |
+ * |        |            | <dequeued> |            |
+ * |        |            | unlock     |            |
+ * |        | ...        |            |            |
+ * |        | unlock     |            |            |
+ * |        |            |            | <dequeued> |
+ * |        |            |            | unlock     |
+ */
+static void test_co_rwlock_downgrade(void)
+{
+    bool c1_done = false;
+    bool c2_done = false;
+    bool c3_done = false;
+    bool c4_done = false;
+    Coroutine *c1, *c2, *c3, *c4;
+
+    qemu_co_rwlock_init(&rwlock);
+
+    c1 = qemu_coroutine_create(rwlock_rdlock_yield, &c1_done);
+    c2 = qemu_coroutine_create(rwlock_wrlock_downgrade, &c2_done);
+    c3 = qemu_coroutine_create(rwlock_rdlock, &c3_done);
+    c4 = qemu_coroutine_create(rwlock_wrlock, &c4_done);
+
+    qemu_coroutine_enter(c1);
+    qemu_coroutine_enter(c2);
+    qemu_coroutine_enter(c3);
+    qemu_coroutine_enter(c4);
+
+    qemu_coroutine_enter(c1);
+
+    g_assert(c2_done);
+    g_assert(c3_done);
+    g_assert(c4_done);
+
+    qemu_coroutine_enter(c1);
+
+    g_assert(c1_done);
+}
+
 /*
  * Check that creation, enter, and return work
  */
@@ -501,6 +660,8 @@ int main(int argc, char **argv)
     g_test_add_func("/basic/order", test_order);
     g_test_add_func("/locking/co-mutex", test_co_mutex);
     g_test_add_func("/locking/co-mutex/lockable", test_co_mutex_lockable);
+    g_test_add_func("/locking/co-rwlock/upgrade", test_co_rwlock_upgrade);
+    g_test_add_func("/locking/co-rwlock/downgrade", test_co_rwlock_downgrade);
     if (g_test_perf()) {
         g_test_add_func("/perf/lifecycle", perf_lifecycle);
         g_test_add_func("/perf/nesting", perf_nesting);
diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c
index 5816bf8900..2669403839 100644
--- a/util/qemu-coroutine-lock.c
+++ b/util/qemu-coroutine-lock.c
@@ -204,7 +204,6 @@ static void coroutine_fn qemu_co_mutex_lock_slowpath(AioContext *ctx,
     unsigned old_handoff;
 
     trace_qemu_co_mutex_lock_entry(mutex, self);
-    w.co = self;
     push_waiter(mutex, &w);
 
     /* This is the "Responsibility Hand-Off" protocol; a lock() picks from
@@ -328,11 +327,51 @@ void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
     trace_qemu_co_mutex_unlock_return(mutex, self);
 }
 
+struct CoRwTicket {
+    bool read;
+    Coroutine *co;
+    QSIMPLEQ_ENTRY(CoRwTicket) next;
+};
+
 void qemu_co_rwlock_init(CoRwlock *lock)
 {
-    memset(lock, 0, sizeof(*lock));
-    qemu_co_queue_init(&lock->queue);
     qemu_co_mutex_init(&lock->mutex);
+    lock->owners = 0;
+    QSIMPLEQ_INIT(&lock->tickets);
+}
+
+/* Releases the internal CoMutex.  */
+static void qemu_co_rwlock_maybe_wake_one(CoRwlock *lock)
+{
+    CoRwTicket *tkt = QSIMPLEQ_FIRST(&lock->tickets);
+    Coroutine *co = NULL;
+
+    /*
+     * Setting lock->owners here prevents rdlock and wrlock from
+     * sneaking in between unlock and wake.
+     */
+
+    if (tkt) {
+        if (tkt->read) {
+            if (lock->owners >= 0) {
+                lock->owners++;
+                co = tkt->co;
+            }
+        } else {
+            if (lock->owners == 0) {
+                lock->owners = -1;
+                co = tkt->co;
+            }
+        }
+    }
+
+    if (co) {
+        QSIMPLEQ_REMOVE_HEAD(&lock->tickets, next);
+        qemu_co_mutex_unlock(&lock->mutex);
+        aio_co_wake(co);
+    } else {
+        qemu_co_mutex_unlock(&lock->mutex);
+    }
 }
 
 void qemu_co_rwlock_rdlock(CoRwlock *lock)
@@ -341,13 +380,22 @@ void qemu_co_rwlock_rdlock(CoRwlock *lock)
 
     qemu_co_mutex_lock(&lock->mutex);
     /* For fairness, wait if a writer is in line.  */
-    while (lock->pending_writer) {
-        qemu_co_queue_wait(&lock->queue, &lock->mutex);
+    if (lock->owners == 0 || (lock->owners > 0 && QSIMPLEQ_EMPTY(&lock->tickets))) {
+        lock->owners++;
+        qemu_co_mutex_unlock(&lock->mutex);
+    } else {
+        CoRwTicket my_ticket = { true, self };
+
+        QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
+        qemu_co_mutex_unlock(&lock->mutex);
+        qemu_coroutine_yield();
+        assert(lock->owners >= 1);
+
+        /* Possibly wake another reader, which will wake the next in line.  */
+        qemu_co_mutex_lock(&lock->mutex);
+        qemu_co_rwlock_maybe_wake_one(lock);
     }
-    lock->reader++;
-    qemu_co_mutex_unlock(&lock->mutex);
 
-    /* The rest of the read-side critical section is run without the mutex.  */
     self->locks_held++;
 }
 
@@ -356,69 +404,64 @@ void qemu_co_rwlock_unlock(CoRwlock *lock)
     Coroutine *self = qemu_coroutine_self();
 
     assert(qemu_in_coroutine());
-    if (!lock->reader) {
-        /* The critical section started in qemu_co_rwlock_wrlock.  */
-        qemu_co_queue_restart_all(&lock->queue);
-    } else {
-        self->locks_held--;
+    self->locks_held--;
 
-        qemu_co_mutex_lock(&lock->mutex);
-        lock->reader--;
-        assert(lock->reader >= 0);
-        /* Wakeup only one waiting writer */
-        if (!lock->reader) {
-            qemu_co_queue_next(&lock->queue);
-        }
+    qemu_co_mutex_lock(&lock->mutex);
+    if (lock->owners > 0) {
+        lock->owners--;
+    } else {
+        assert(lock->owners == -1);
+        lock->owners = 0;
     }
-    qemu_co_mutex_unlock(&lock->mutex);
+
+    qemu_co_rwlock_maybe_wake_one(lock);
 }
 
 void qemu_co_rwlock_downgrade(CoRwlock *lock)
 {
-    Coroutine *self = qemu_coroutine_self();
-
-    /* lock->mutex critical section started in qemu_co_rwlock_wrlock or
-     * qemu_co_rwlock_upgrade.
-     */
-    assert(lock->reader == 0);
-    lock->reader++;
-    qemu_co_mutex_unlock(&lock->mutex);
+    qemu_co_mutex_lock(&lock->mutex);
+    assert(lock->owners == -1);
+    lock->owners = 1;
 
-    /* The rest of the read-side critical section is run without the mutex.  */
-    self->locks_held++;
+    /* Possibly wake another reader, which will wake the next in line.  */
+    qemu_co_rwlock_maybe_wake_one(lock);
 }
 
 void qemu_co_rwlock_wrlock(CoRwlock *lock)
 {
+    Coroutine *self = qemu_coroutine_self();
+
     qemu_co_mutex_lock(&lock->mutex);
-    lock->pending_writer++;
-    while (lock->reader) {
-        qemu_co_queue_wait(&lock->queue, &lock->mutex);
+    if (lock->owners == 0) {
+        lock->owners = -1;
+        qemu_co_mutex_unlock(&lock->mutex);
+    } else {
+        CoRwTicket my_ticket = { false, qemu_coroutine_self() };
+
+        QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
+        qemu_co_mutex_unlock(&lock->mutex);
+        qemu_coroutine_yield();
+        assert(lock->owners == -1);
     }
-    lock->pending_writer--;
 
-    /* The rest of the write-side critical section is run with
-     * the mutex taken, so that lock->reader remains zero.
-     * There is no need to update self->locks_held.
-     */
+    self->locks_held++;
 }
 
 void qemu_co_rwlock_upgrade(CoRwlock *lock)
 {
-    Coroutine *self = qemu_coroutine_self();
-
     qemu_co_mutex_lock(&lock->mutex);
-    assert(lock->reader > 0);
-    lock->reader--;
-    lock->pending_writer++;
-    while (lock->reader) {
-        qemu_co_queue_wait(&lock->queue, &lock->mutex);
-    }
-    lock->pending_writer--;
+    assert(lock->owners > 0);
+    /* For fairness, wait if a writer is in line.  */
+    if (lock->owners == 1 && QSIMPLEQ_EMPTY(&lock->tickets)) {
+        lock->owners = -1;
+        qemu_co_mutex_unlock(&lock->mutex);
+    } else {
+        CoRwTicket my_ticket = { false, qemu_coroutine_self() };
 
-    /* The rest of the write-side critical section is run with
-     * the mutex taken, similar to qemu_co_rwlock_wrlock.  Do
-     * not account for the lock twice in self->locks_held.
-     */
-    self->locks_held--;
+        lock->owners--;
+        QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
+        qemu_co_rwlock_maybe_wake_one(lock);
+        qemu_coroutine_yield();
+        assert(lock->owners == -1);
+    }
 }
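
To summarize the new scheme: 'owners' counts readers, or is -1 while a writer holds the lock, and every waiter parks on a stack-allocated CoRwTicket. On unlock or downgrade the head ticket is granted ownership under lock->mutex before its coroutine is woken, so no late rdlock or wrlock can slip in between the unlock and the wake; each woken reader then wakes the next ticket in line. The grant rule, restated as a standalone sketch:

    /* Sketch: whether the head ticket can be granted the lock given the
     * current owner count; readers share, a writer needs exclusivity. */
    static bool can_grant(int owners, bool read)
    {
        return read ? owners >= 0 : owners == 0;
    }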