summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--block/backup.c3
-rw-r--r--block/commit.c5
-rw-r--r--block/mirror.c5
-rw-r--r--block/stream.c5
-rw-r--r--block/trace-events6
-rw-r--r--blockjob.c54
-rw-r--r--include/block/blockjob.h9
-rw-r--r--tests/test-blockjob-txn.c12
8 files changed, 67 insertions, 32 deletions
diff --git a/block/backup.c b/block/backup.c
index 4ed449448a..ae1b99aa84 100644
--- a/block/backup.c
+++ b/block/backup.c
@@ -654,9 +654,8 @@ void backup_start(const char *job_id, BlockDriverState *bs,
 
     block_job_add_bdrv(&job->common, target);
     job->common.len = len;
-    job->common.co = qemu_coroutine_create(job->common.driver->start, job);
     block_job_txn_add_job(txn, &job->common);
-    qemu_coroutine_enter(job->common.co);
+    block_job_start(&job->common);
     return;
 
  error:
diff --git a/block/commit.c b/block/commit.c
index 20d27e27a8..c284e8535d 100644
--- a/block/commit.c
+++ b/block/commit.c
@@ -289,10 +289,9 @@ void commit_start(const char *job_id, BlockDriverState *bs,
     s->backing_file_str = g_strdup(backing_file_str);
 
     s->on_error = on_error;
-    s->common.co = qemu_coroutine_create(s->common.driver->start, s);
 
-    trace_commit_start(bs, base, top, s, s->common.co);
-    qemu_coroutine_enter(s->common.co);
+    trace_commit_start(bs, base, top, s);
+    block_job_start(&s->common);
 }
 
 
diff --git a/block/mirror.c b/block/mirror.c
index 659e09cbaf..62ac87f0c4 100644
--- a/block/mirror.c
+++ b/block/mirror.c
@@ -1009,9 +1009,8 @@ static void mirror_start_job(const char *job_id, BlockDriverState *bs,
         }
     }
 
-    s->common.co = qemu_coroutine_create(s->common.driver->start, s);
-    trace_mirror_start(bs, s, s->common.co, opaque);
-    qemu_coroutine_enter(s->common.co);
+    trace_mirror_start(bs, s, opaque);
+    block_job_start(&s->common);
 }
 
 void mirror_start(const char *job_id, BlockDriverState *bs,
diff --git a/block/stream.c b/block/stream.c
index 92309ffc2e..1523ba7dfb 100644
--- a/block/stream.c
+++ b/block/stream.c
@@ -255,7 +255,6 @@ void stream_start(const char *job_id, BlockDriverState *bs,
     s->bs_flags = orig_bs_flags;
 
     s->on_error = on_error;
-    s->common.co = qemu_coroutine_create(s->common.driver->start, s);
-    trace_stream_start(bs, base, s, s->common.co);
-    qemu_coroutine_enter(s->common.co);
+    trace_stream_start(bs, base, s);
+    block_job_start(&s->common);
 }
diff --git a/block/trace-events b/block/trace-events
index 882c9034c2..cfc05f2478 100644
--- a/block/trace-events
+++ b/block/trace-events
@@ -19,14 +19,14 @@ bdrv_co_do_copy_on_readv(void *bs, int64_t offset, unsigned int bytes, int64_t c
 
 # block/stream.c
 stream_one_iteration(void *s, int64_t sector_num, int nb_sectors, int is_allocated) "s %p sector_num %"PRId64" nb_sectors %d is_allocated %d"
-stream_start(void *bs, void *base, void *s, void *co) "bs %p base %p s %p co %p"
+stream_start(void *bs, void *base, void *s) "bs %p base %p s %p"
 
 # block/commit.c
 commit_one_iteration(void *s, int64_t sector_num, int nb_sectors, int is_allocated) "s %p sector_num %"PRId64" nb_sectors %d is_allocated %d"
-commit_start(void *bs, void *base, void *top, void *s, void *co) "bs %p base %p top %p s %p co %p"
+commit_start(void *bs, void *base, void *top, void *s) "bs %p base %p top %p s %p"
 
 # block/mirror.c
-mirror_start(void *bs, void *s, void *co, void *opaque) "bs %p s %p co %p opaque %p"
+mirror_start(void *bs, void *s, void *opaque) "bs %p s %p opaque %p"
 mirror_restart_iter(void *s, int64_t cnt) "s %p dirty count %"PRId64
 mirror_before_flush(void *s) "s %p"
 mirror_before_drain(void *s, int64_t cnt) "s %p dirty count %"PRId64
diff --git a/blockjob.c b/blockjob.c
index e3c458c021..513620c199 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -174,7 +174,9 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
     job->blk           = blk;
     job->cb            = cb;
     job->opaque        = opaque;
-    job->busy          = true;
+    job->busy          = false;
+    job->paused        = true;
+    job->pause_count   = 1;
     job->refcnt        = 1;
     bs->job = job;
 
@@ -202,6 +204,23 @@ bool block_job_is_internal(BlockJob *job)
     return (job->id == NULL);
 }
 
+static bool block_job_started(BlockJob *job)
+{
+    return job->co;
+}
+
+void block_job_start(BlockJob *job)
+{
+    assert(job && !block_job_started(job) && job->paused &&
+           !job->busy && job->driver->start);
+    job->co = qemu_coroutine_create(job->driver->start, job);
+    if (--job->pause_count == 0) {
+        job->paused = false;
+        job->busy = true;
+        qemu_coroutine_enter(job->co);
+    }
+}
+
 void block_job_ref(BlockJob *job)
 {
     ++job->refcnt;
@@ -248,14 +267,18 @@ static void block_job_completed_single(BlockJob *job)
     if (job->cb) {
         job->cb(job->opaque, job->ret);
     }
-    if (block_job_is_cancelled(job)) {
-        block_job_event_cancelled(job);
-    } else {
-        const char *msg = NULL;
-        if (job->ret < 0) {
-            msg = strerror(-job->ret);
+
+    /* Emit events only if we actually started */
+    if (block_job_started(job)) {
+        if (block_job_is_cancelled(job)) {
+            block_job_event_cancelled(job);
+        } else {
+            const char *msg = NULL;
+            if (job->ret < 0) {
+                msg = strerror(-job->ret);
+            }
+            block_job_event_completed(job, msg);
         }
-        block_job_event_completed(job, msg);
     }
 
     if (job->txn) {
@@ -363,7 +386,8 @@ void block_job_complete(BlockJob *job, Error **errp)
 {
     /* Should not be reachable via external interface for internal jobs */
     assert(job->id);
-    if (job->pause_count || job->cancelled || !job->driver->complete) {
+    if (job->pause_count || job->cancelled ||
+        !block_job_started(job) || !job->driver->complete) {
         error_setg(errp, "The active block job '%s' cannot be completed",
                    job->id);
         return;
@@ -395,6 +419,8 @@ bool block_job_user_paused(BlockJob *job)
 
 void coroutine_fn block_job_pause_point(BlockJob *job)
 {
+    assert(job && block_job_started(job));
+
     if (!block_job_should_pause(job)) {
         return;
     }
@@ -446,9 +472,13 @@ void block_job_enter(BlockJob *job)
 
 void block_job_cancel(BlockJob *job)
 {
-    job->cancelled = true;
-    block_job_iostatus_reset(job);
-    block_job_enter(job);
+    if (block_job_started(job)) {
+        job->cancelled = true;
+        block_job_iostatus_reset(job);
+        block_job_enter(job);
+    } else {
+        block_job_completed(job, -ECANCELED);
+    }
 }
 
 bool block_job_is_cancelled(BlockJob *job)
diff --git a/include/block/blockjob.h b/include/block/blockjob.h
index 356cacf004..1acb256223 100644
--- a/include/block/blockjob.h
+++ b/include/block/blockjob.h
@@ -189,6 +189,15 @@ void block_job_add_bdrv(BlockJob *job, BlockDriverState *bs);
 void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp);
 
 /**
+ * block_job_start:
+ * @job: A job that has not yet been started.
+ *
+ * Begins execution of a block job.
+ * Takes ownership of one reference to the job object.
+ */
+void block_job_start(BlockJob *job);
+
+/**
  * block_job_cancel:
  * @job: The job to be canceled.
  *
diff --git a/tests/test-blockjob-txn.c b/tests/test-blockjob-txn.c
index f9afc3be41..b132e39097 100644
--- a/tests/test-blockjob-txn.c
+++ b/tests/test-blockjob-txn.c
@@ -24,10 +24,6 @@ typedef struct {
     int *result;
 } TestBlockJob;
 
-static const BlockJobDriver test_block_job_driver = {
-    .instance_size = sizeof(TestBlockJob),
-};
-
 static void test_block_job_complete(BlockJob *job, void *opaque)
 {
     BlockDriverState *bs = blk_bs(job->blk);
@@ -77,6 +73,11 @@ static void test_block_job_cb(void *opaque, int ret)
     g_free(data);
 }
 
+static const BlockJobDriver test_block_job_driver = {
+    .instance_size = sizeof(TestBlockJob),
+    .start = test_block_job_run,
+};
+
 /* Create a block job that completes with a given return code after a given
  * number of event loop iterations.  The return code is stored in the given
  * result pointer.
@@ -104,10 +105,9 @@ static BlockJob *test_block_job_start(unsigned int iterations,
     s->use_timer = use_timer;
     s->rc = rc;
     s->result = result;
-    s->common.co = qemu_coroutine_create(test_block_job_run, s);
     data->job = s;
     data->result = result;
-    qemu_coroutine_enter(s->common.co);
+    block_job_start(&s->common);
     return &s->common;
 }