summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--block/blklogwrites.c85
1 files changed, 77 insertions, 8 deletions
diff --git a/block/blklogwrites.c b/block/blklogwrites.c
index ba717dab4d..ed38a93f21 100644
--- a/block/blklogwrites.c
+++ b/block/blklogwrites.c
@@ -3,7 +3,7 @@
  *
  * Copyright (c) 2017 Tuomas Tynkkynen <tuomas@tuxera.com>
  * Copyright (c) 2018 Aapo Vienamo <aapo@tuxera.com>
- * Copyright (c) 2018 Ari Sundholm <ari@tuxera.com>
+ * Copyright (c) 2018-2024 Ari Sundholm <ari@tuxera.com>
  *
  * This work is licensed under the terms of the GNU GPL, version 2 or later.
  * See the COPYING file in the top-level directory.
@@ -55,9 +55,34 @@ typedef struct {
     BdrvChild *log_file;
     uint32_t sectorsize;
     uint32_t sectorbits;
+    uint64_t update_interval;
+
+    /*
+     * The mutable state of the driver, consisting of the current log sector
+     * and the number of log entries.
+     *
+     * May be read and/or written from multiple threads, and the mutex must be
+     * held when accessing these fields.
+     */
     uint64_t cur_log_sector;
     uint64_t nr_entries;
-    uint64_t update_interval;
+    QemuMutex mutex;
+
+    /*
+     * The super block sequence number. Non-zero if a super block update is in
+     * progress.
+     *
+     * The mutex must be held when accessing this field.
+     */
+    uint64_t super_update_seq;
+
+    /*
+     * A coroutine-aware queue to serialize super block updates.
+     *
+     * Used with the mutex to ensure that only one thread be updating the super
+     * block at a time.
+     */
+    CoQueue super_update_queue;
 } BDRVBlkLogWritesState;
 
 static QemuOptsList runtime_opts = {
@@ -169,6 +194,9 @@ static int blk_log_writes_open(BlockDriverState *bs, QDict *options, int flags,
         goto fail;
     }
 
+    qemu_mutex_init(&s->mutex);
+    qemu_co_queue_init(&s->super_update_queue);
+
     log_append = qemu_opt_get_bool(opts, "log-append", false);
 
     if (log_append) {
@@ -231,6 +259,8 @@ static int blk_log_writes_open(BlockDriverState *bs, QDict *options, int flags,
         s->nr_entries = 0;
     }
 
+    s->super_update_seq = 0;
+
     if (!blk_log_writes_sector_size_valid(log_sector_size)) {
         ret = -EINVAL;
         error_setg(errp, "Invalid log sector size %"PRIu64, log_sector_size);
@@ -255,6 +285,7 @@ fail_log:
         bdrv_unref_child(bs, s->log_file);
         bdrv_graph_wrunlock();
         s->log_file = NULL;
+        qemu_mutex_destroy(&s->mutex);
     }
 fail:
     qemu_opts_del(opts);
@@ -269,6 +300,7 @@ static void blk_log_writes_close(BlockDriverState *bs)
     bdrv_unref_child(bs, s->log_file);
     s->log_file = NULL;
     bdrv_graph_wrunlock();
+    qemu_mutex_destroy(&s->mutex);
 }
 
 static int64_t coroutine_fn GRAPH_RDLOCK
@@ -295,7 +327,7 @@ static void blk_log_writes_child_perm(BlockDriverState *bs, BdrvChild *c,
 
 static void blk_log_writes_refresh_limits(BlockDriverState *bs, Error **errp)
 {
-    BDRVBlkLogWritesState *s = bs->opaque;
+    const BDRVBlkLogWritesState *s = bs->opaque;
     bs->bl.request_alignment = s->sectorsize;
 }
 
@@ -338,15 +370,18 @@ blk_log_writes_co_do_log(BlkLogWritesLogReq *lr)
      * driver may be modified by other driver operations while waiting for the
      * I/O to complete.
      */
+    qemu_mutex_lock(&s->mutex);
     const uint64_t entry_start_sector = s->cur_log_sector;
     const uint64_t entry_offset = entry_start_sector << s->sectorbits;
     const uint64_t qiov_aligned_size = ROUND_UP(lr->qiov->size, s->sectorsize);
     const uint64_t entry_aligned_size = qiov_aligned_size +
         ROUND_UP(lr->zero_size, s->sectorsize);
     const uint64_t entry_nr_sectors = entry_aligned_size >> s->sectorbits;
+    const uint64_t entry_seq = s->nr_entries + 1;
 
-    s->nr_entries++;
+    s->nr_entries = entry_seq;
     s->cur_log_sector += entry_nr_sectors;
+    qemu_mutex_unlock(&s->mutex);
 
     /*
      * Write the log entry. Note that if this is a "write zeroes" operation,
@@ -366,17 +401,44 @@ blk_log_writes_co_do_log(BlkLogWritesLogReq *lr)
 
     /* Update super block on flush or every update interval */
     if (lr->log_ret == 0 && ((lr->entry.flags & LOG_FLUSH_FLAG)
-        || (s->nr_entries % s->update_interval == 0)))
+        || (entry_seq % s->update_interval == 0)))
     {
         struct log_write_super super = {
             .magic      = cpu_to_le64(WRITE_LOG_MAGIC),
             .version    = cpu_to_le64(WRITE_LOG_VERSION),
-            .nr_entries = cpu_to_le64(s->nr_entries),
+            .nr_entries = 0, /* updated below */
             .sectorsize = cpu_to_le32(s->sectorsize),
         };
-        void *zeroes = g_malloc0(s->sectorsize - sizeof(super));
+        void *zeroes;
         QEMUIOVector qiov;
 
+        /*
+         * Wait if a super block update is already in progress.
+         * Bail out if a newer update got its turn before us.
+         */
+        WITH_QEMU_LOCK_GUARD(&s->mutex) {
+            CoQueueWaitFlags wait_flags = 0;
+            while (s->super_update_seq) {
+                if (entry_seq < s->super_update_seq) {
+                    return;
+                }
+                qemu_co_queue_wait_flags(&s->super_update_queue,
+                    &s->mutex, wait_flags);
+
+                /*
+                 * In case the wait condition remains true after wakeup,
+                 * to avoid starvation, make sure that this request is
+                 * scheduled to rerun next by pushing it to the front of the
+                 * queue.
+                 */
+                wait_flags = CO_QUEUE_WAIT_FRONT;
+            }
+            s->super_update_seq = entry_seq;
+            super.nr_entries = cpu_to_le64(s->nr_entries);
+        }
+
+        zeroes = g_malloc0(s->sectorsize - sizeof(super));
+
         qemu_iovec_init(&qiov, 2);
         qemu_iovec_add(&qiov, &super, sizeof(super));
         qemu_iovec_add(&qiov, zeroes, s->sectorsize - sizeof(super));
@@ -386,6 +448,13 @@ blk_log_writes_co_do_log(BlkLogWritesLogReq *lr)
         if (lr->log_ret == 0) {
             lr->log_ret = bdrv_co_flush(s->log_file->bs);
         }
+
+        /* The super block has been updated. Let another request have a go. */
+        qemu_mutex_lock(&s->mutex);
+        s->super_update_seq = 0;
+        (void) qemu_co_queue_next(&s->super_update_queue);
+        qemu_mutex_unlock(&s->mutex);
+
         qemu_iovec_destroy(&qiov);
         g_free(zeroes);
     }
@@ -405,7 +474,7 @@ blk_log_writes_co_log(BlockDriverState *bs, uint64_t offset, uint64_t bytes,
 {
     QEMUIOVector log_qiov;
     size_t niov = qiov ? qiov->niov : 0;
-    BDRVBlkLogWritesState *s = bs->opaque;
+    const BDRVBlkLogWritesState *s = bs->opaque;
     BlkLogWritesFileReq fr = {
         .bs         = bs,
         .offset     = offset,