Diffstat:
 block/block-copy.c                | 396
 block/meson.build                 |   1
 block/progress_meter.c            |  64
 blockjob.c                        |  46
 include/block/block-copy.h        |   2
 include/qemu/co-shared-resource.h |   4
 include/qemu/progress_meter.h     |  34
 include/qemu/ratelimit.h          |  12
 job-qmp.c                         |   8
 job.c                             |   3
 qemu-img.c                        |   9
 util/qemu-co-shared-resource.c    |  24
 12 files changed, 399 insertions(+), 204 deletions(-)
diff --git a/block/block-copy.c b/block/block-copy.c
index 5808cfe657..0becad52da 100644
--- a/block/block-copy.c
+++ b/block/block-copy.c
@@ -28,10 +28,18 @@
 #define BLOCK_COPY_MAX_WORKERS 64
 #define BLOCK_COPY_SLICE_TIME 100000000ULL /* ns */
 
+typedef enum {
+    COPY_READ_WRITE_CLUSTER,
+    COPY_READ_WRITE,
+    COPY_WRITE_ZEROES,
+    COPY_RANGE_SMALL,
+    COPY_RANGE_FULL
+} BlockCopyMethod;
+
 static coroutine_fn int block_copy_task_entry(AioTask *task);
 
 typedef struct BlockCopyCallState {
-    /* IN parameters. Initialized in block_copy_async() and never changed. */
+    /* Fields initialized in block_copy_async() and never changed. */
     BlockCopyState *s;
     int64_t offset;
     int64_t bytes;
@@ -40,34 +48,60 @@ typedef struct BlockCopyCallState {
     bool ignore_ratelimit;
     BlockCopyAsyncCallbackFunc cb;
     void *cb_opaque;
-
     /* Coroutine where async block-copy is running */
     Coroutine *co;
 
+    /* Fields whose state changes throughout the execution */
+    bool finished; /* atomic */
+    QemuCoSleep sleep; /* TODO: protect API with a lock */
+    bool cancelled; /* atomic */
     /* To reference all call states from BlockCopyState */
     QLIST_ENTRY(BlockCopyCallState) list;
 
-    /* State */
-    int ret;
-    bool finished;
-    QemuCoSleep sleep;
-    bool cancelled;
-
-    /* OUT parameters */
+    /*
+     * Fields that report information about return values and errors.
+     * Protected by lock in BlockCopyState.
+     */
     bool error_is_read;
+    /*
+     * @ret is set concurrently by tasks under mutex. Only set once by the
+     * first failed task (and untouched if no task failed).
+     * After finishing (call_state->finished is true), it is not modified
+     * anymore and may be safely read without mutex.
+     */
+    int ret;
 } BlockCopyCallState;
 
 typedef struct BlockCopyTask {
     AioTask task;
 
+    /*
+     * Fields initialized in block_copy_task_create()
+     * and never changed.
+     */
     BlockCopyState *s;
     BlockCopyCallState *call_state;
     int64_t offset;
+    /*
+     * @method can also be set again in the while loop of
+     * block_copy_dirty_clusters(), but it is never accessed concurrently
+     * because the only other function that reads it is
+     * block_copy_task_entry() and it is invoked afterwards in the same
+     * iteration.
+     */
+    BlockCopyMethod method;
+
+    /*
+     * Fields whose state changes throughout the execution.
+     * Protected by lock in BlockCopyState.
+     */
+    CoQueue wait_queue; /* coroutines blocked on this task */
+    /*
+     * Protects only against a parallel read racing with the update of
+     * @bytes in block_copy_task_shrink().
+     */
     int64_t bytes;
-    bool zeroes;
-    bool copy_range;
     QLIST_ENTRY(BlockCopyTask) list;
-    CoQueue wait_queue; /* coroutines blocked on this task */
 } BlockCopyTask;
 
 static int64_t task_end(BlockCopyTask *task)
@@ -83,18 +117,26 @@ typedef struct BlockCopyState {
      */
     BdrvChild *source;
     BdrvChild *target;
-    BdrvDirtyBitmap *copy_bitmap;
-    int64_t in_flight_bytes;
+
+    /*
+     * Fields initialized in block_copy_state_new()
+     * and never changed.
+     */
     int64_t cluster_size;
-    bool use_copy_range;
-    int64_t copy_size;
+    int64_t max_transfer;
     uint64_t len;
-    QLIST_HEAD(, BlockCopyTask) tasks; /* All tasks from all block-copy calls */
-    QLIST_HEAD(, BlockCopyCallState) calls;
-
     BdrvRequestFlags write_flags;
 
     /*
+     * Fields whose state changes throughout the execution.
+     * Protected by lock.
+     */
+    CoMutex lock;
+    int64_t in_flight_bytes;
+    BlockCopyMethod method;
+    QLIST_HEAD(, BlockCopyTask) tasks; /* All tasks from all block-copy calls */
+    QLIST_HEAD(, BlockCopyCallState) calls;
+    /*
      * skip_unallocated:
      *
      * Used by sync=top jobs, which first scan the source node for unallocated
@@ -108,16 +150,15 @@ typedef struct BlockCopyState {
      * skip unallocated regions, clear them in the copy_bitmap, and invoke
      * block_copy_reset_unallocated() every time it does.
      */
-    bool skip_unallocated;
-
+    bool skip_unallocated; /* atomic */
+    /* State fields that use a thread-safe API */
+    BdrvDirtyBitmap *copy_bitmap;
     ProgressMeter *progress;
-
     SharedResource *mem;
-
-    uint64_t speed;
     RateLimit rate_limit;
 } BlockCopyState;
 
+/* Called with lock held */
 static BlockCopyTask *find_conflicting_task(BlockCopyState *s,
                                             int64_t offset, int64_t bytes)
 {
@@ -135,6 +176,9 @@ static BlockCopyTask *find_conflicting_task(BlockCopyState *s,
 /*
  * If there are no intersecting tasks return false. Otherwise, wait for the
  * first found intersecting tasks to finish and return true.
+ *
+ * Called with lock held. May temporarily release the lock.
+ * A return value of 0 proves that the lock was NOT released.
  */
 static bool coroutine_fn block_copy_wait_one(BlockCopyState *s, int64_t offset,
                                              int64_t bytes)
@@ -145,22 +189,43 @@ static bool coroutine_fn block_copy_wait_one(BlockCopyState *s, int64_t offset,
         return false;
     }
 
-    qemu_co_queue_wait(&task->wait_queue, NULL);
+    qemu_co_queue_wait(&task->wait_queue, &s->lock);
 
     return true;
 }
 
+/* Called with lock held */
+static int64_t block_copy_chunk_size(BlockCopyState *s)
+{
+    switch (s->method) {
+    case COPY_READ_WRITE_CLUSTER:
+        return s->cluster_size;
+    case COPY_READ_WRITE:
+    case COPY_RANGE_SMALL:
+        return MIN(MAX(s->cluster_size, BLOCK_COPY_MAX_BUFFER),
+                   s->max_transfer);
+    case COPY_RANGE_FULL:
+        return MIN(MAX(s->cluster_size, BLOCK_COPY_MAX_COPY_RANGE),
+                   s->max_transfer);
+    default:
+        /* Cannot have COPY_WRITE_ZEROES here.  */
+        abort();
+    }
+}
+
 /*
  * Search for the first dirty area in offset/bytes range and create task at
  * the beginning of it.
  */
-static BlockCopyTask *block_copy_task_create(BlockCopyState *s,
-                                             BlockCopyCallState *call_state,
-                                             int64_t offset, int64_t bytes)
+static coroutine_fn BlockCopyTask *
+block_copy_task_create(BlockCopyState *s, BlockCopyCallState *call_state,
+                       int64_t offset, int64_t bytes)
 {
     BlockCopyTask *task;
-    int64_t max_chunk = MIN_NON_ZERO(s->copy_size, call_state->max_chunk);
+    int64_t max_chunk;
 
+    QEMU_LOCK_GUARD(&s->lock);
+    max_chunk = MIN_NON_ZERO(block_copy_chunk_size(s), call_state->max_chunk);
     if (!bdrv_dirty_bitmap_next_dirty_area(s->copy_bitmap,
                                            offset, offset + bytes,
                                            max_chunk, &offset, &bytes))
@@ -184,7 +249,7 @@ static BlockCopyTask *block_copy_task_create(BlockCopyState *s,
         .call_state = call_state,
         .offset = offset,
         .bytes = bytes,
-        .copy_range = s->use_copy_range,
+        .method = s->method,
     };
     qemu_co_queue_init(&task->wait_queue);
     QLIST_INSERT_HEAD(&s->tasks, task, list);
@@ -202,6 +267,7 @@ static BlockCopyTask *block_copy_task_create(BlockCopyState *s,
 static void coroutine_fn block_copy_task_shrink(BlockCopyTask *task,
                                                 int64_t new_bytes)
 {
+    QEMU_LOCK_GUARD(&task->s->lock);
     if (new_bytes == task->bytes) {
         return;
     }
@@ -218,11 +284,15 @@ static void coroutine_fn block_copy_task_shrink(BlockCopyTask *task,
 
 static void coroutine_fn block_copy_task_end(BlockCopyTask *task, int ret)
 {
+    QEMU_LOCK_GUARD(&task->s->lock);
     task->s->in_flight_bytes -= task->bytes;
     if (ret < 0) {
         bdrv_set_dirty_bitmap(task->s->copy_bitmap, task->offset, task->bytes);
     }
     QLIST_REMOVE(task, list);
+    progress_set_remaining(task->s->progress,
+                           bdrv_get_dirty_count(task->s->copy_bitmap) +
+                           task->s->in_flight_bytes);
     qemu_co_queue_restart_all(&task->wait_queue);
 }
 
@@ -268,37 +338,39 @@ BlockCopyState *block_copy_state_new(BdrvChild *source, BdrvChild *target,
         .len = bdrv_dirty_bitmap_size(copy_bitmap),
         .write_flags = write_flags,
         .mem = shres_create(BLOCK_COPY_MAX_MEM),
+        .max_transfer = QEMU_ALIGN_DOWN(
+                                    block_copy_max_transfer(source, target),
+                                    cluster_size),
     };
 
-    if (block_copy_max_transfer(source, target) < cluster_size) {
+    if (s->max_transfer < cluster_size) {
         /*
          * copy_range does not respect max_transfer. We don't want to bother
         * with requests smaller than the block-copy cluster size, so fall
         * back to buffered copying (read and write respect max_transfer on
         * their own).
          */
-        s->use_copy_range = false;
-        s->copy_size = cluster_size;
+        s->method = COPY_READ_WRITE_CLUSTER;
     } else if (write_flags & BDRV_REQ_WRITE_COMPRESSED) {
         /* Compression supports only cluster-size writes and no copy-range. */
-        s->use_copy_range = false;
-        s->copy_size = cluster_size;
+        s->method = COPY_READ_WRITE_CLUSTER;
     } else {
         /*
-         * We enable copy-range, but keep small copy_size, until first
+         * If copy_range is enabled, start with COPY_RANGE_SMALL until the first
          * successful copy_range (look at block_copy_do_copy).
          */
-        s->use_copy_range = use_copy_range;
-        s->copy_size = MAX(s->cluster_size, BLOCK_COPY_MAX_BUFFER);
+        s->method = use_copy_range ? COPY_RANGE_SMALL : COPY_READ_WRITE;
     }
 
     ratelimit_init(&s->rate_limit);
+    qemu_co_mutex_init(&s->lock);
     QLIST_INIT(&s->tasks);
     QLIST_INIT(&s->calls);
 
     return s;
 }
 
+/* Only set before running the job, no need for locking. */
 void block_copy_set_progress_meter(BlockCopyState *s, ProgressMeter *pm)
 {
     s->progress = pm;
@@ -344,17 +416,14 @@ static coroutine_fn int block_copy_task_run(AioTaskPool *pool,
  *
 * No sync here: neither bitmap nor intersecting request handling, only copy.
  *
- * @copy_range is an in-out argument: if *copy_range is false, copy_range is not
- * done. If *copy_range is true, copy_range is attempted. If the copy_range
- * attempt fails, the function falls back to the usual read+write and
- * *copy_range is set to false. *copy_range and zeroes must not be true
- * simultaneously.
- *
+ * @method is an in-out argument, so that copy_range can be either extended to
+ * a full-size buffer or disabled if the copy_range attempt fails.  The output
+ * value of @method should be used for subsequent tasks.
  * Returns 0 on success.
  */
 static int coroutine_fn block_copy_do_copy(BlockCopyState *s,
                                            int64_t offset, int64_t bytes,
-                                           bool zeroes, bool *copy_range,
+                                           BlockCopyMethod *method,
                                            bool *error_is_read)
 {
     int ret;
@@ -368,9 +437,9 @@ static int coroutine_fn block_copy_do_copy(BlockCopyState *s,
     assert(offset + bytes <= s->len ||
            offset + bytes == QEMU_ALIGN_UP(s->len, s->cluster_size));
     assert(nbytes < INT_MAX);
-    assert(!(*copy_range && zeroes));
 
-    if (zeroes) {
+    switch (*method) {
+    case COPY_WRITE_ZEROES:
         ret = bdrv_co_pwrite_zeroes(s->target, offset, nbytes, s->write_flags &
                                     ~BDRV_REQ_WRITE_COMPRESSED);
         if (ret < 0) {
@@ -378,99 +447,86 @@ static int coroutine_fn block_copy_do_copy(BlockCopyState *s,
             *error_is_read = false;
         }
         return ret;
-    }
 
-    if (*copy_range) {
+    case COPY_RANGE_SMALL:
+    case COPY_RANGE_FULL:
         ret = bdrv_co_copy_range(s->source, offset, s->target, offset, nbytes,
                                  0, s->write_flags);
-        if (ret < 0) {
-            trace_block_copy_copy_range_fail(s, offset, ret);
-            *copy_range = false;
-            /* Fallback to read+write with allocated buffer */
-        } else {
+        if (ret >= 0) {
+            /* Successful copy-range, increase chunk size.  */
+            *method = COPY_RANGE_FULL;
             return 0;
         }
-    }
 
-    /*
-     * In case of failed copy_range request above, we may proceed with buffered
-     * request larger than BLOCK_COPY_MAX_BUFFER. Still, further requests will
-     * be properly limited, so don't care too much. Moreover the most likely
-     * case (copy_range is unsupported for the configuration, so the very first
-     * copy_range request fails) is handled by setting large copy_size only
-     * after first successful copy_range.
-     */
+        trace_block_copy_copy_range_fail(s, offset, ret);
+        *method = COPY_READ_WRITE;
+        /* Fall through to read+write with allocated buffer */
 
-    bounce_buffer = qemu_blockalign(s->source->bs, nbytes);
+    case COPY_READ_WRITE_CLUSTER:
+    case COPY_READ_WRITE:
+        /*
+         * In case of a failed copy_range request above, we may proceed with
+         * a buffered request larger than BLOCK_COPY_MAX_BUFFER. Still,
+         * further requests will be properly limited, so this is not a big
+         * concern. Moreover, the most likely case (copy_range is unsupported
+         * for the configuration, so the very first copy_range request fails)
+         * is handled by setting the large copy_size only after the first
+         * successful copy_range.
+         */
 
-    ret = bdrv_co_pread(s->source, offset, nbytes, bounce_buffer, 0);
-    if (ret < 0) {
-        trace_block_copy_read_fail(s, offset, ret);
-        *error_is_read = true;
-        goto out;
-    }
+        bounce_buffer = qemu_blockalign(s->source->bs, nbytes);
 
-    ret = bdrv_co_pwrite(s->target, offset, nbytes, bounce_buffer,
-                         s->write_flags);
-    if (ret < 0) {
-        trace_block_copy_write_fail(s, offset, ret);
-        *error_is_read = false;
-        goto out;
-    }
+        ret = bdrv_co_pread(s->source, offset, nbytes, bounce_buffer, 0);
+        if (ret < 0) {
+            trace_block_copy_read_fail(s, offset, ret);
+            *error_is_read = true;
+            goto out;
+        }
 
-out:
-    qemu_vfree(bounce_buffer);
+        ret = bdrv_co_pwrite(s->target, offset, nbytes, bounce_buffer,
+                             s->write_flags);
+        if (ret < 0) {
+            trace_block_copy_write_fail(s, offset, ret);
+            *error_is_read = false;
+            goto out;
+        }
 
-    return ret;
-}
+    out:
+        qemu_vfree(bounce_buffer);
+        break;
 
-static void block_copy_handle_copy_range_result(BlockCopyState *s,
-                                                bool is_success)
-{
-    if (!s->use_copy_range) {
-        /* already disabled */
-        return;
+    default:
+        abort();
     }
 
-    if (is_success) {
-        /*
-         * Successful copy-range. Now increase copy_size.  copy_range
-         * does not respect max_transfer (it's a TODO), so we factor
-         * that in here.
-         */
-        s->copy_size =
-                MIN(MAX(s->cluster_size, BLOCK_COPY_MAX_COPY_RANGE),
-                    QEMU_ALIGN_DOWN(block_copy_max_transfer(s->source,
-                                                            s->target),
-                                    s->cluster_size));
-    } else {
-        /* Copy-range failed, disable it. */
-        s->use_copy_range = false;
-        s->copy_size = MAX(s->cluster_size, BLOCK_COPY_MAX_BUFFER);
-    }
+    return ret;
 }
 
 static coroutine_fn int block_copy_task_entry(AioTask *task)
 {
     BlockCopyTask *t = container_of(task, BlockCopyTask, task);
+    BlockCopyState *s = t->s;
     bool error_is_read = false;
-    bool copy_range = t->copy_range;
+    BlockCopyMethod method = t->method;
     int ret;
 
-    ret = block_copy_do_copy(t->s, t->offset, t->bytes, t->zeroes,
-                             &copy_range, &error_is_read);
-    if (t->copy_range) {
-        block_copy_handle_copy_range_result(t->s, copy_range);
-    }
-    if (ret < 0) {
-        if (!t->call_state->ret) {
-            t->call_state->ret = ret;
-            t->call_state->error_is_read = error_is_read;
+    ret = block_copy_do_copy(s, t->offset, t->bytes, &method, &error_is_read);
+
+    WITH_QEMU_LOCK_GUARD(&s->lock) {
+        if (s->method == t->method) {
+            s->method = method;
+        }
+
+        if (ret < 0) {
+            if (!t->call_state->ret) {
+                t->call_state->ret = ret;
+                t->call_state->error_is_read = error_is_read;
+            }
+        } else {
+            progress_work_done(s->progress, t->bytes);
         }
-    } else {
-        progress_work_done(t->s->progress, t->bytes);
     }
-    co_put_to_shres(t->s->mem, t->bytes);
+    co_put_to_shres(s->mem, t->bytes);
     block_copy_task_end(t, ret);
 
     return ret;
@@ -483,7 +539,7 @@ static int block_copy_block_status(BlockCopyState *s, int64_t offset,
     BlockDriverState *base;
     int ret;
 
-    if (s->skip_unallocated) {
+    if (qatomic_read(&s->skip_unallocated)) {
         base = bdrv_backing_chain_next(s->source->bs);
     } else {
         base = NULL;
@@ -570,10 +626,12 @@ int64_t block_copy_reset_unallocated(BlockCopyState *s,
     bytes = clusters * s->cluster_size;
 
     if (!ret) {
+        qemu_co_mutex_lock(&s->lock);
         bdrv_reset_dirty_bitmap(s->copy_bitmap, offset, bytes);
         progress_set_remaining(s->progress,
                                bdrv_get_dirty_count(s->copy_bitmap) +
                                s->in_flight_bytes);
+        qemu_co_mutex_unlock(&s->lock);
     }
 
     *count = bytes;
@@ -609,7 +667,8 @@ block_copy_dirty_clusters(BlockCopyCallState *call_state)
     assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
     assert(QEMU_IS_ALIGNED(bytes, s->cluster_size));
 
-    while (bytes && aio_task_pool_status(aio) == 0 && !call_state->cancelled) {
+    while (bytes && aio_task_pool_status(aio) == 0 &&
+           !qatomic_read(&call_state->cancelled)) {
         BlockCopyTask *task;
         int64_t status_bytes;
 
@@ -631,11 +690,9 @@ block_copy_dirty_clusters(BlockCopyCallState *call_state)
         if (status_bytes < task->bytes) {
             block_copy_task_shrink(task, status_bytes);
         }
-        if (s->skip_unallocated && !(ret & BDRV_BLOCK_ALLOCATED)) {
+        if (qatomic_read(&s->skip_unallocated) &&
+            !(ret & BDRV_BLOCK_ALLOCATED)) {
             block_copy_task_end(task, 0);
-            progress_set_remaining(s->progress,
-                                   bdrv_get_dirty_count(s->copy_bitmap) +
-                                   s->in_flight_bytes);
             trace_block_copy_skip_range(s, task->offset, task->bytes);
             offset = task_end(task);
             bytes = end - offset;
@@ -643,25 +700,22 @@ block_copy_dirty_clusters(BlockCopyCallState *call_state)
             continue;
         }
         if (ret & BDRV_BLOCK_ZERO) {
-            task->zeroes = true;
-            task->copy_range = false;
+            task->method = COPY_WRITE_ZEROES;
         }
 
-        if (s->speed) {
-            if (!call_state->ignore_ratelimit) {
-                uint64_t ns = ratelimit_calculate_delay(&s->rate_limit, 0);
-                if (ns > 0) {
-                    block_copy_task_end(task, -EAGAIN);
-                    g_free(task);
-                    qemu_co_sleep_ns_wakeable(&call_state->sleep,
-                                              QEMU_CLOCK_REALTIME, ns);
-                    continue;
-                }
+        if (!call_state->ignore_ratelimit) {
+            uint64_t ns = ratelimit_calculate_delay(&s->rate_limit, 0);
+            if (ns > 0) {
+                block_copy_task_end(task, -EAGAIN);
+                g_free(task);
+                qemu_co_sleep_ns_wakeable(&call_state->sleep,
+                                          QEMU_CLOCK_REALTIME, ns);
+                continue;
             }
-
-            ratelimit_calculate_delay(&s->rate_limit, task->bytes);
         }
 
+        ratelimit_calculate_delay(&s->rate_limit, task->bytes);
+
         trace_block_copy_process(s, task->offset);
 
         co_get_from_shres(s->mem, task->bytes);
@@ -717,15 +771,40 @@ void block_copy_kick(BlockCopyCallState *call_state)
 static int coroutine_fn block_copy_common(BlockCopyCallState *call_state)
 {
     int ret;
+    BlockCopyState *s = call_state->s;
 
-    QLIST_INSERT_HEAD(&call_state->s->calls, call_state, list);
+    qemu_co_mutex_lock(&s->lock);
+    QLIST_INSERT_HEAD(&s->calls, call_state, list);
+    qemu_co_mutex_unlock(&s->lock);
 
     do {
         ret = block_copy_dirty_clusters(call_state);
 
-        if (ret == 0 && !call_state->cancelled) {
-            ret = block_copy_wait_one(call_state->s, call_state->offset,
-                                      call_state->bytes);
+        if (ret == 0 && !qatomic_read(&call_state->cancelled)) {
+            WITH_QEMU_LOCK_GUARD(&s->lock) {
+                /*
+                 * Check whether there is a task whose completion
+                 * we still need to wait for
+                 */
+                ret = block_copy_wait_one(s, call_state->offset,
+                                          call_state->bytes);
+                if (ret == 0) {
+                    /*
+                     * No pending tasks, but check again the bitmap in this
+                     * same critical section, since a task might have failed
+                     * between this and the critical section in
+                     * block_copy_dirty_clusters().
+                     *
+                     * A return value of 0 from block_copy_wait_one() also
+                     * means that it didn't release the lock. So, we are
+                     * still in the same critical section, not interrupted
+                     * by any concurrent access to the state.
+                     */
+                    ret = bdrv_dirty_bitmap_next_dirty(s->copy_bitmap,
+                                                       call_state->offset,
+                                                       call_state->bytes) >= 0;
+                }
+            }
         }
 
         /*
@@ -737,15 +816,17 @@ static int coroutine_fn block_copy_common(BlockCopyCallState *call_state)
          * 2. We have waited for some intersecting block-copy request
          *    It may have failed and produced new dirty bits.
          */
-    } while (ret > 0 && !call_state->cancelled);
+    } while (ret > 0 && !qatomic_read(&call_state->cancelled));
 
-    call_state->finished = true;
+    qatomic_store_release(&call_state->finished, true);
 
     if (call_state->cb) {
         call_state->cb(call_state->cb_opaque);
     }
 
+    qemu_co_mutex_lock(&s->lock);
     QLIST_REMOVE(call_state, list);
+    qemu_co_mutex_unlock(&s->lock);
 
     return ret;
 }
@@ -800,44 +881,50 @@ void block_copy_call_free(BlockCopyCallState *call_state)
         return;
     }
 
-    assert(call_state->finished);
+    assert(qatomic_read(&call_state->finished));
     g_free(call_state);
 }
 
 bool block_copy_call_finished(BlockCopyCallState *call_state)
 {
-    return call_state->finished;
+    return qatomic_read(&call_state->finished);
 }
 
 bool block_copy_call_succeeded(BlockCopyCallState *call_state)
 {
-    return call_state->finished && !call_state->cancelled &&
-        call_state->ret == 0;
+    return qatomic_load_acquire(&call_state->finished) &&
+           !qatomic_read(&call_state->cancelled) &&
+           call_state->ret == 0;
 }
 
 bool block_copy_call_failed(BlockCopyCallState *call_state)
 {
-    return call_state->finished && !call_state->cancelled &&
-        call_state->ret < 0;
+    return qatomic_load_acquire(&call_state->finished) &&
+           !qatomic_read(&call_state->cancelled) &&
+           call_state->ret < 0;
 }
 
 bool block_copy_call_cancelled(BlockCopyCallState *call_state)
 {
-    return call_state->cancelled;
+    return qatomic_read(&call_state->cancelled);
 }
 
 int block_copy_call_status(BlockCopyCallState *call_state, bool *error_is_read)
 {
-    assert(call_state->finished);
+    assert(qatomic_load_acquire(&call_state->finished));
     if (error_is_read) {
         *error_is_read = call_state->error_is_read;
     }
     return call_state->ret;
 }
 
+/*
+ * Note that cancelling and finishing are racy.
+ * A user can cancel a block-copy that is already finished.
+ */
 void block_copy_call_cancel(BlockCopyCallState *call_state)
 {
-    call_state->cancelled = true;
+    qatomic_set(&call_state->cancelled, true);
     block_copy_kick(call_state);
 }
 
@@ -848,15 +935,12 @@ BdrvDirtyBitmap *block_copy_dirty_bitmap(BlockCopyState *s)
 
 void block_copy_set_skip_unallocated(BlockCopyState *s, bool skip)
 {
-    s->skip_unallocated = skip;
+    qatomic_set(&s->skip_unallocated, skip);
 }
 
 void block_copy_set_speed(BlockCopyState *s, uint64_t speed)
 {
-    s->speed = speed;
-    if (speed > 0) {
-        ratelimit_set_speed(&s->rate_limit, speed, BLOCK_COPY_SLICE_TIME);
-    }
+    ratelimit_set_speed(&s->rate_limit, speed, BLOCK_COPY_SLICE_TIME);
 
     /*
      * Note: it's good to kick all call states from here, but it should be done
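
The pattern the patch applies throughout block-copy.c is to split each struct
into fields that are initialized once and never written again (safe to read
without synchronization) and fields protected by the new CoMutex, with every
access to the latter wrapped in a lock guard. A minimal sketch of the same
pattern, using a made-up ExampleState rather than the real BlockCopyState:

    typedef struct ExampleState {
        /* Initialized in example_new() and never changed. */
        int64_t cluster_size;

        /* Protected by @lock. */
        CoMutex lock;
        int64_t in_flight_bytes;
    } ExampleState;

    static void example_account(ExampleState *s, int64_t bytes)
    {
        /* The lock is released automatically when the scope is left. */
        QEMU_LOCK_GUARD(&s->lock);
        s->in_flight_bytes += bytes;
    }

QEMU_LOCK_GUARD() couples the critical section to the C scope, which is why
the patch can add early returns (e.g. in block_copy_task_create()) without
explicit unlock calls.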
diff --git a/block/meson.build b/block/meson.build
index 01861e1545..ef1ba3d973 100644
--- a/block/meson.build
+++ b/block/meson.build
@@ -13,6 +13,7 @@ block_ss.add(files(
   'commit.c',
   'copy-on-read.c',
   'preallocate.c',
+  'progress_meter.c',
   'create.c',
   'crypto.c',
   'dirty-bitmap.c',
diff --git a/block/progress_meter.c b/block/progress_meter.c
new file mode 100644
index 0000000000..aa2e60248c
--- /dev/null
+++ b/block/progress_meter.c
@@ -0,0 +1,64 @@
+/*
+ * Helper functionality for tracking the progress of long-running processes.
+ *
+ * Copyright (c) 2011 IBM Corp.
+ * Copyright (c) 2012, 2018 Red Hat, Inc.
+ * Copyright (c) 2020 Virtuozzo International GmbH
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+#include "qemu/osdep.h"
+#include "qemu/progress_meter.h"
+
+void progress_init(ProgressMeter *pm)
+{
+    qemu_mutex_init(&pm->lock);
+}
+
+void progress_destroy(ProgressMeter *pm)
+{
+    qemu_mutex_destroy(&pm->lock);
+}
+
+void progress_get_snapshot(ProgressMeter *pm, uint64_t *current,
+                           uint64_t *total)
+{
+    QEMU_LOCK_GUARD(&pm->lock);
+
+    *current = pm->current;
+    *total = pm->total;
+}
+
+void progress_work_done(ProgressMeter *pm, uint64_t done)
+{
+    QEMU_LOCK_GUARD(&pm->lock);
+    pm->current += done;
+}
+
+void progress_set_remaining(ProgressMeter *pm, uint64_t remaining)
+{
+    QEMU_LOCK_GUARD(&pm->lock);
+    pm->total = pm->current + remaining;
+}
+
+void progress_increase_remaining(ProgressMeter *pm, uint64_t delta)
+{
+    QEMU_LOCK_GUARD(&pm->lock);
+    pm->total += delta;
+}
diff --git a/blockjob.c b/blockjob.c
index dc1d9e0e46..4bad1408cb 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -300,28 +300,29 @@ bool block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
 
 int64_t block_job_ratelimit_get_delay(BlockJob *job, uint64_t n)
 {
-    if (!job->speed) {
-        return 0;
-    }
-
     return ratelimit_calculate_delay(&job->limit, n);
 }
 
 BlockJobInfo *block_job_query(BlockJob *job, Error **errp)
 {
     BlockJobInfo *info;
+    uint64_t progress_current, progress_total;
 
     if (block_job_is_internal(job)) {
         error_setg(errp, "Cannot query QEMU internal jobs");
         return NULL;
     }
+
+    progress_get_snapshot(&job->job.progress, &progress_current,
+                          &progress_total);
+
     info = g_new0(BlockJobInfo, 1);
     info->type      = g_strdup(job_type_str(&job->job));
     info->device    = g_strdup(job->job.id);
     info->busy      = qatomic_read(&job->job.busy);
     info->paused    = job->job.pause_count > 0;
-    info->offset    = job->job.progress.current;
-    info->len       = job->job.progress.total;
+    info->offset    = progress_current;
+    info->len       = progress_total;
     info->speed     = job->speed;
     info->io_status = job->iostatus;
     info->ready     = job_is_ready(&job->job),
@@ -348,15 +349,19 @@ static void block_job_iostatus_set_err(BlockJob *job, int error)
 static void block_job_event_cancelled(Notifier *n, void *opaque)
 {
     BlockJob *job = opaque;
+    uint64_t progress_current, progress_total;
 
     if (block_job_is_internal(job)) {
         return;
     }
 
+    progress_get_snapshot(&job->job.progress, &progress_current,
+                          &progress_total);
+
     qapi_event_send_block_job_cancelled(job_type(&job->job),
                                         job->job.id,
-                                        job->job.progress.total,
-                                        job->job.progress.current,
+                                        progress_total,
+                                        progress_current,
                                         job->speed);
 }
 
@@ -364,6 +369,7 @@ static void block_job_event_completed(Notifier *n, void *opaque)
 {
     BlockJob *job = opaque;
     const char *msg = NULL;
+    uint64_t progress_current, progress_total;
 
     if (block_job_is_internal(job)) {
         return;
@@ -373,10 +379,13 @@ static void block_job_event_completed(Notifier *n, void *opaque)
         msg = error_get_pretty(job->job.err);
     }
 
+    progress_get_snapshot(&job->job.progress, &progress_current,
+                          &progress_total);
+
     qapi_event_send_block_job_completed(job_type(&job->job),
                                         job->job.id,
-                                        job->job.progress.total,
-                                        job->job.progress.current,
+                                        progress_total,
+                                        progress_current,
                                         job->speed,
                                         !!msg,
                                         msg);
@@ -397,15 +406,19 @@ static void block_job_event_pending(Notifier *n, void *opaque)
 static void block_job_event_ready(Notifier *n, void *opaque)
 {
     BlockJob *job = opaque;
+    uint64_t progress_current, progress_total;
 
     if (block_job_is_internal(job)) {
         return;
     }
 
+    progress_get_snapshot(&job->job.progress, &progress_current,
+                          &progress_total);
+
     qapi_event_send_block_job_ready(job_type(&job->job),
                                     job->job.id,
-                                    job->job.progress.total,
-                                    job->job.progress.current,
+                                    progress_total,
+                                    progress_current,
                                     job->speed);
 }
 
@@ -472,12 +485,9 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
     blk_set_disable_request_queuing(blk, true);
     blk_set_allow_aio_context_change(blk, true);
 
-    /* Only set speed when necessary to avoid NotSupported error */
-    if (speed != 0) {
-        if (!block_job_set_speed(job, speed, errp)) {
-            job_early_fail(&job->job);
-            return NULL;
-        }
+    if (!block_job_set_speed(job, speed, errp)) {
+        job_early_fail(&job->job);
+        return NULL;
     }
 
     return job;
diff --git a/include/block/block-copy.h b/include/block/block-copy.h
index 338f2ea7fd..5c8278895c 100644
--- a/include/block/block-copy.h
+++ b/include/block/block-copy.h
@@ -18,6 +18,8 @@
 #include "block/block.h"
 #include "qemu/co-shared-resource.h"
 
+/* All APIs are thread-safe */
+
 typedef void (*BlockCopyAsyncCallbackFunc)(void *opaque);
 typedef struct BlockCopyState BlockCopyState;
 typedef struct BlockCopyCallState BlockCopyCallState;
diff --git a/include/qemu/co-shared-resource.h b/include/qemu/co-shared-resource.h
index 4e4503004c..78ca5850f8 100644
--- a/include/qemu/co-shared-resource.h
+++ b/include/qemu/co-shared-resource.h
@@ -26,15 +26,13 @@
 #ifndef QEMU_CO_SHARED_RESOURCE_H
 #define QEMU_CO_SHARED_RESOURCE_H
 
-
+/* Accesses to the co-shared-resource API are thread-safe */
 typedef struct SharedResource SharedResource;
 
 /*
  * Create SharedResource structure
  *
  * @total: total amount of some resource to be shared between clients
- *
- * Note: this API is not thread-safe.
  */
 SharedResource *shres_create(uint64_t total);
 
diff --git a/include/qemu/progress_meter.h b/include/qemu/progress_meter.h
index 9a23ff071c..dadf822bbf 100644
--- a/include/qemu/progress_meter.h
+++ b/include/qemu/progress_meter.h
@@ -27,6 +27,8 @@
 #ifndef QEMU_PROGRESS_METER_H
 #define QEMU_PROGRESS_METER_H
 
+#include "qemu/lockable.h"
+
 typedef struct ProgressMeter {
     /**
      * Current progress. The unit is arbitrary as long as the ratio between
@@ -37,22 +39,24 @@ typedef struct ProgressMeter {
 
     /** Estimated current value at the completion of the process */
     uint64_t total;
+
+    QemuMutex lock; /* protects concurrent access to above fields */
 } ProgressMeter;
 
-static inline void progress_work_done(ProgressMeter *pm, uint64_t done)
-{
-    pm->current += done;
-}
-
-static inline void progress_set_remaining(ProgressMeter *pm, uint64_t remaining)
-{
-    pm->total = pm->current + remaining;
-}
-
-static inline void progress_increase_remaining(ProgressMeter *pm,
-                                               uint64_t delta)
-{
-    pm->total += delta;
-}
+void progress_init(ProgressMeter *pm);
+void progress_destroy(ProgressMeter *pm);
+
+/* Get a snapshot of the internal current and total values */
+void progress_get_snapshot(ProgressMeter *pm, uint64_t *current,
+                           uint64_t *total);
+
+/* Increases the amount of work done so far by @done */
+void progress_work_done(ProgressMeter *pm, uint64_t done);
+
+/* Sets the remaining amount of work to be done to @remaining */
+void progress_set_remaining(ProgressMeter *pm, uint64_t remaining);
+
+/* Increases the total work to do by @delta */
+void progress_increase_remaining(ProgressMeter *pm, uint64_t delta);
 
 #endif /* QEMU_PROGRESS_METER_H */
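
Taken together, the header now describes a small mutex-protected accounting
object. A usage sketch, assuming a zero-initialized meter (Job gets this for
free from g_new0()):

    ProgressMeter pm = { 0 };
    uint64_t current, total;

    progress_init(&pm);                 /* initializes pm.lock */
    progress_set_remaining(&pm, 1000);  /* total = current + 1000 = 1000 */
    progress_work_done(&pm, 250);       /* current += 250 */

    progress_get_snapshot(&pm, &current, &total);  /* 250 of 1000 */

    progress_destroy(&pm);              /* destroys pm.lock */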
diff --git a/include/qemu/ratelimit.h b/include/qemu/ratelimit.h
index 003ea6d5a3..48bf59e857 100644
--- a/include/qemu/ratelimit.h
+++ b/include/qemu/ratelimit.h
@@ -43,7 +43,11 @@ static inline int64_t ratelimit_calculate_delay(RateLimit *limit, uint64_t n)
     double delay_slices;
 
     QEMU_LOCK_GUARD(&limit->lock);
-    assert(limit->slice_quota && limit->slice_ns);
+    if (!limit->slice_quota) {
+        /* Throttling disabled.  */
+        return 0;
+    }
+    assert(limit->slice_ns);
 
     if (limit->slice_end_time < now) {
         /* Previous, possibly extended, time slice finished; reset the
@@ -83,7 +87,11 @@ static inline void ratelimit_set_speed(RateLimit *limit, uint64_t speed,
 {
     QEMU_LOCK_GUARD(&limit->lock);
     limit->slice_ns = slice_ns;
-    limit->slice_quota = MAX(((double)speed * slice_ns) / 1000000000ULL, 1);
+    if (speed == 0) {
+        limit->slice_quota = 0;
+    } else {
+        limit->slice_quota = MAX(((double)speed * slice_ns) / 1000000000ULL, 1);
+    }
 }
 
 #endif
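
With slice_quota == 0 now meaning "throttling disabled",
ratelimit_calculate_delay() simply returns 0 for an unset speed, which is what
lets blockjob.c and block-copy.c above drop their job->speed and s->speed
guards. The calling pattern, mirroring the loop in block_copy_dirty_clusters():

    /* First query the current delay without accounting any work... */
    uint64_t ns = ratelimit_calculate_delay(&s->rate_limit, 0);
    if (ns > 0) {
        /* ...over quota: postpone the chunk and sleep for @ns nanoseconds */
        qemu_co_sleep_ns_wakeable(&call_state->sleep, QEMU_CLOCK_REALTIME, ns);
    } else {
        /* ...otherwise account the chunk's bytes and dispatch it */
        ratelimit_calculate_delay(&s->rate_limit, task->bytes);
    }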
diff --git a/job-qmp.c b/job-qmp.c
index 34c4da094f..829a28aa70 100644
--- a/job-qmp.c
+++ b/job-qmp.c
@@ -144,16 +144,20 @@ void qmp_job_dismiss(const char *id, Error **errp)
 static JobInfo *job_query_single(Job *job, Error **errp)
 {
     JobInfo *info;
+    uint64_t progress_current;
+    uint64_t progress_total;
 
     assert(!job_is_internal(job));
+    progress_get_snapshot(&job->progress, &progress_current,
+                          &progress_total);
 
     info = g_new(JobInfo, 1);
     *info = (JobInfo) {
         .id                 = g_strdup(job->id),
         .type               = job_type(job),
         .status             = job->status,
-        .current_progress   = job->progress.current,
-        .total_progress     = job->progress.total,
+        .current_progress   = progress_current,
+        .total_progress     = progress_total,
         .has_error          = !!job->err,
         .error              = job->err ? \
                               g_strdup(error_get_pretty(job->err)) : NULL,
diff --git a/job.c b/job.c
index 8775c1803b..e7a5d28854 100644
--- a/job.c
+++ b/job.c
@@ -339,6 +339,8 @@ void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
     job->cb            = cb;
     job->opaque        = opaque;
 
+    progress_init(&job->progress);
+
     notifier_list_init(&job->on_finalize_cancelled);
     notifier_list_init(&job->on_finalize_completed);
     notifier_list_init(&job->on_pending);
@@ -382,6 +384,7 @@ void job_unref(Job *job)
 
         QLIST_REMOVE(job, job_list);
 
+        progress_destroy(&job->progress);
         error_free(job->err);
         g_free(job->id);
         g_free(job);
diff --git a/qemu-img.c b/qemu-img.c
index a5993682aa..7956a89965 100644
--- a/qemu-img.c
+++ b/qemu-img.c
@@ -900,6 +900,7 @@ static void common_block_job_cb(void *opaque, int ret)
 
 static void run_block_job(BlockJob *job, Error **errp)
 {
+    uint64_t progress_current, progress_total;
     AioContext *aio_context = blk_get_aio_context(job->blk);
     int ret = 0;
 
@@ -908,9 +909,11 @@ static void run_block_job(BlockJob *job, Error **errp)
     do {
         float progress = 0.0f;
         aio_poll(aio_context, true);
-        if (job->job.progress.total) {
-            progress = (float)job->job.progress.current /
-                       job->job.progress.total * 100.f;
+
+        progress_get_snapshot(&job->job.progress, &progress_current,
+                              &progress_total);
+        if (progress_total) {
+            progress = (float)progress_current / progress_total * 100.f;
         }
         qemu_progress_print(progress, 0);
     } while (!job_is_ready(&job->job) && !job_is_completed(&job->job));
diff --git a/util/qemu-co-shared-resource.c b/util/qemu-co-shared-resource.c
index 1c83cd9d29..a66cc07e75 100644
--- a/util/qemu-co-shared-resource.c
+++ b/util/qemu-co-shared-resource.c
@@ -28,10 +28,13 @@
 #include "qemu/co-shared-resource.h"
 
 struct SharedResource {
-    uint64_t total;
-    uint64_t available;
+    uint64_t total; /* Set in shres_create() and not changed anymore */
 
+    /* State fields protected by lock */
+    uint64_t available;
     CoQueue queue;
+
+    QemuMutex lock;
 };
 
 SharedResource *shres_create(uint64_t total)
@@ -40,6 +43,7 @@ SharedResource *shres_create(uint64_t total)
 
     s->total = s->available = total;
     qemu_co_queue_init(&s->queue);
+    qemu_mutex_init(&s->lock);
 
     return s;
 }
@@ -47,10 +51,12 @@ SharedResource *shres_create(uint64_t total)
 void shres_destroy(SharedResource *s)
 {
     assert(s->available == s->total);
+    qemu_mutex_destroy(&s->lock);
     g_free(s);
 }
 
-bool co_try_get_from_shres(SharedResource *s, uint64_t n)
+/* Called with lock held. */
+static bool co_try_get_from_shres_locked(SharedResource *s, uint64_t n)
 {
     if (s->available >= n) {
         s->available -= n;
@@ -60,16 +66,24 @@ bool co_try_get_from_shres(SharedResource *s, uint64_t n)
     return false;
 }
 
+bool co_try_get_from_shres(SharedResource *s, uint64_t n)
+{
+    QEMU_LOCK_GUARD(&s->lock);
+    return co_try_get_from_shres_locked(s, n);
+}
+
 void coroutine_fn co_get_from_shres(SharedResource *s, uint64_t n)
 {
     assert(n <= s->total);
-    while (!co_try_get_from_shres(s, n)) {
-        qemu_co_queue_wait(&s->queue, NULL);
+    QEMU_LOCK_GUARD(&s->lock);
+    while (!co_try_get_from_shres_locked(s, n)) {
+        qemu_co_queue_wait(&s->queue, &s->lock);
     }
 }
 
 void coroutine_fn co_put_to_shres(SharedResource *s, uint64_t n)
 {
+    QEMU_LOCK_GUARD(&s->lock);
     assert(s->total - s->available >= n);
     s->available += n;
     qemu_co_queue_restart_all(&s->queue);
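
The resulting SharedResource can be shared between coroutines without external
serialization: the QemuMutex protects @available, and waiters are queued on
@queue under that same lock. A sketch of the consumer pattern, mirroring how
block-copy budgets task memory (do_work() is a hypothetical stand-in for the
actual task body):

    co_get_from_shres(s->mem, task->bytes);  /* may yield until budget frees up */
    ret = do_work(task);
    co_put_to_shres(s->mem, task->bytes);    /* wakes coroutines waiting in queue */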