summary refs log tree commit diff stats
diff options
context:
space:
mode:
authorPeter Maydell <peter.maydell@linaro.org>2018-09-24 14:35:58 +0100
committerPeter Maydell <peter.maydell@linaro.org>2018-09-24 14:35:58 +0100
commitd6f71af65410d3e003ba331c5e57eddcf716cbcf (patch)
treec839d777f3d290b8f430e2a64aca44d1ed4271eb
parent539c251b64d2cdd6aeafde1c6283688915c9aad0 (diff)
parente21a1c9831fc80ae3f3c1affdfa43350035d8588 (diff)
downloadfocaccia-qemu-d6f71af65410d3e003ba331c5e57eddcf716cbcf.tar.gz
focaccia-qemu-d6f71af65410d3e003ba331c5e57eddcf716cbcf.zip
Merge remote-tracking branch 'remotes/xanclic/tags/pull-block-2018-08-31-v2' into staging
Block patches:
- (Block) job exit refactoring, part 1
  (removing job_defer_to_main_loop())
- test-bdrv-drain leak fix

# gpg: Signature made Fri 31 Aug 2018 15:30:33 BST
# gpg:                using RSA key F407DB0061D5CF40
# gpg: Good signature from "Max Reitz <mreitz@redhat.com>"
# Primary key fingerprint: 91BE B60A 30DB 3E88 57D1  1829 F407 DB00 61D5 CF40

* remotes/xanclic/tags/pull-block-2018-08-31-v2:
  jobs: remove job_defer_to_main_loop
  jobs: remove ret argument to job_completed; privatize it
  block/backup: make function variables consistently named
  jobs: utilize job_exit shim
  block/mirror: utilize job_exit shim
  block/commit: utilize job_exit shim
  jobs: add exit shim
  jobs: canonize Error object
  jobs: change start callback to run callback
  tests: fix bdrv-drain leak

Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
-rw-r--r--block/backup.c81
-rw-r--r--block/commit.c29
-rw-r--r--block/create.c19
-rw-r--r--block/mirror.c39
-rw-r--r--block/stream.c29
-rw-r--r--include/qemu/job.h70
-rw-r--r--job-qmp.c5
-rw-r--r--job.c73
-rw-r--r--tests/test-bdrv-drain.c14
-rw-r--r--tests/test-blockjob-txn.c25
-rw-r--r--tests/test-blockjob.c17
-rw-r--r--trace-events2
12 files changed, 161 insertions, 242 deletions
diff --git a/block/backup.c b/block/backup.c
index 8630d32926..4d084f6ca6 100644
--- a/block/backup.c
+++ b/block/backup.c
@@ -380,18 +380,6 @@ static BlockErrorAction backup_error_action(BackupBlockJob *job,
     }
 }
 
-typedef struct {
-    int ret;
-} BackupCompleteData;
-
-static void backup_complete(Job *job, void *opaque)
-{
-    BackupCompleteData *data = opaque;
-
-    job_completed(job, data->ret, NULL);
-    g_free(data);
-}
-
 static bool coroutine_fn yield_and_check(BackupBlockJob *job)
 {
     uint64_t delay_ns;
@@ -480,60 +468,59 @@ static void backup_incremental_init_copy_bitmap(BackupBlockJob *job)
     bdrv_dirty_iter_free(dbi);
 }
 
-static void coroutine_fn backup_run(void *opaque)
+static int coroutine_fn backup_run(Job *job, Error **errp)
 {
-    BackupBlockJob *job = opaque;
-    BackupCompleteData *data;
-    BlockDriverState *bs = blk_bs(job->common.blk);
+    BackupBlockJob *s = container_of(job, BackupBlockJob, common.job);
+    BlockDriverState *bs = blk_bs(s->common.blk);
     int64_t offset, nb_clusters;
     int ret = 0;
 
-    QLIST_INIT(&job->inflight_reqs);
-    qemu_co_rwlock_init(&job->flush_rwlock);
+    QLIST_INIT(&s->inflight_reqs);
+    qemu_co_rwlock_init(&s->flush_rwlock);
 
-    nb_clusters = DIV_ROUND_UP(job->len, job->cluster_size);
-    job_progress_set_remaining(&job->common.job, job->len);
+    nb_clusters = DIV_ROUND_UP(s->len, s->cluster_size);
+    job_progress_set_remaining(job, s->len);
 
-    job->copy_bitmap = hbitmap_alloc(nb_clusters, 0);
-    if (job->sync_mode == MIRROR_SYNC_MODE_INCREMENTAL) {
-        backup_incremental_init_copy_bitmap(job);
+    s->copy_bitmap = hbitmap_alloc(nb_clusters, 0);
+    if (s->sync_mode == MIRROR_SYNC_MODE_INCREMENTAL) {
+        backup_incremental_init_copy_bitmap(s);
     } else {
-        hbitmap_set(job->copy_bitmap, 0, nb_clusters);
+        hbitmap_set(s->copy_bitmap, 0, nb_clusters);
     }
 
 
-    job->before_write.notify = backup_before_write_notify;
-    bdrv_add_before_write_notifier(bs, &job->before_write);
+    s->before_write.notify = backup_before_write_notify;
+    bdrv_add_before_write_notifier(bs, &s->before_write);
 
-    if (job->sync_mode == MIRROR_SYNC_MODE_NONE) {
+    if (s->sync_mode == MIRROR_SYNC_MODE_NONE) {
         /* All bits are set in copy_bitmap to allow any cluster to be copied.
          * This does not actually require them to be copied. */
-        while (!job_is_cancelled(&job->common.job)) {
+        while (!job_is_cancelled(job)) {
             /* Yield until the job is cancelled.  We just let our before_write
              * notify callback service CoW requests. */
-            job_yield(&job->common.job);
+            job_yield(job);
         }
-    } else if (job->sync_mode == MIRROR_SYNC_MODE_INCREMENTAL) {
-        ret = backup_run_incremental(job);
+    } else if (s->sync_mode == MIRROR_SYNC_MODE_INCREMENTAL) {
+        ret = backup_run_incremental(s);
     } else {
         /* Both FULL and TOP SYNC_MODE's require copying.. */
-        for (offset = 0; offset < job->len;
-             offset += job->cluster_size) {
+        for (offset = 0; offset < s->len;
+             offset += s->cluster_size) {
             bool error_is_read;
             int alloced = 0;
 
-            if (yield_and_check(job)) {
+            if (yield_and_check(s)) {
                 break;
             }
 
-            if (job->sync_mode == MIRROR_SYNC_MODE_TOP) {
+            if (s->sync_mode == MIRROR_SYNC_MODE_TOP) {
                 int i;
                 int64_t n;
 
                 /* Check to see if these blocks are already in the
                  * backing file. */
 
-                for (i = 0; i < job->cluster_size;) {
+                for (i = 0; i < s->cluster_size;) {
                     /* bdrv_is_allocated() only returns true/false based
                      * on the first set of sectors it comes across that
                      * are are all in the same state.
@@ -542,7 +529,7 @@ static void coroutine_fn backup_run(void *opaque)
                      * needed but at some point that is always the case. */
                     alloced =
                         bdrv_is_allocated(bs, offset + i,
-                                          job->cluster_size - i, &n);
+                                          s->cluster_size - i, &n);
                     i += n;
 
                     if (alloced || n == 0) {
@@ -560,33 +547,31 @@ static void coroutine_fn backup_run(void *opaque)
             if (alloced < 0) {
                 ret = alloced;
             } else {
-                ret = backup_do_cow(job, offset, job->cluster_size,
+                ret = backup_do_cow(s, offset, s->cluster_size,
                                     &error_is_read, false);
             }
             if (ret < 0) {
                 /* Depending on error action, fail now or retry cluster */
                 BlockErrorAction action =
-                    backup_error_action(job, error_is_read, -ret);
+                    backup_error_action(s, error_is_read, -ret);
                 if (action == BLOCK_ERROR_ACTION_REPORT) {
                     break;
                 } else {
-                    offset -= job->cluster_size;
+                    offset -= s->cluster_size;
                     continue;
                 }
             }
         }
     }
 
-    notifier_with_return_remove(&job->before_write);
+    notifier_with_return_remove(&s->before_write);
 
     /* wait until pending backup_do_cow() calls have completed */
-    qemu_co_rwlock_wrlock(&job->flush_rwlock);
-    qemu_co_rwlock_unlock(&job->flush_rwlock);
-    hbitmap_free(job->copy_bitmap);
+    qemu_co_rwlock_wrlock(&s->flush_rwlock);
+    qemu_co_rwlock_unlock(&s->flush_rwlock);
+    hbitmap_free(s->copy_bitmap);
 
-    data = g_malloc(sizeof(*data));
-    data->ret = ret;
-    job_defer_to_main_loop(&job->common.job, backup_complete, data);
+    return ret;
 }
 
 static const BlockJobDriver backup_job_driver = {
@@ -596,7 +581,7 @@ static const BlockJobDriver backup_job_driver = {
         .free                   = block_job_free,
         .user_resume            = block_job_user_resume,
         .drain                  = block_job_drain,
-        .start                  = backup_run,
+        .run                    = backup_run,
         .commit                 = backup_commit,
         .abort                  = backup_abort,
         .clean                  = backup_clean,
diff --git a/block/commit.c b/block/commit.c
index eb414579bd..da69165de3 100644
--- a/block/commit.c
+++ b/block/commit.c
@@ -68,19 +68,13 @@ static int coroutine_fn commit_populate(BlockBackend *bs, BlockBackend *base,
     return 0;
 }
 
-typedef struct {
-    int ret;
-} CommitCompleteData;
-
-static void commit_complete(Job *job, void *opaque)
+static void commit_exit(Job *job)
 {
     CommitBlockJob *s = container_of(job, CommitBlockJob, common.job);
     BlockJob *bjob = &s->common;
-    CommitCompleteData *data = opaque;
     BlockDriverState *top = blk_bs(s->top);
     BlockDriverState *base = blk_bs(s->base);
     BlockDriverState *commit_top_bs = s->commit_top_bs;
-    int ret = data->ret;
     bool remove_commit_top_bs = false;
 
     /* Make sure commit_top_bs and top stay around until bdrv_replace_node() */
@@ -91,10 +85,10 @@ static void commit_complete(Job *job, void *opaque)
      * the normal backing chain can be restored. */
     blk_unref(s->base);
 
-    if (!job_is_cancelled(job) && ret == 0) {
+    if (!job_is_cancelled(job) && job->ret == 0) {
         /* success */
-        ret = bdrv_drop_intermediate(s->commit_top_bs, base,
-                                     s->backing_file_str);
+        job->ret = bdrv_drop_intermediate(s->commit_top_bs, base,
+                                          s->backing_file_str);
     } else {
         /* XXX Can (or should) we somehow keep 'consistent read' blocked even
          * after the failed/cancelled commit job is gone? If we already wrote
@@ -117,9 +111,6 @@ static void commit_complete(Job *job, void *opaque)
      * bdrv_set_backing_hd() to fail. */
     block_job_remove_all_bdrv(bjob);
 
-    job_completed(job, ret, NULL);
-    g_free(data);
-
     /* If bdrv_drop_intermediate() didn't already do that, remove the commit
      * filter driver from the backing chain. Do this as the final step so that
      * the 'consistent read' permission can be granted.  */
@@ -134,10 +125,9 @@ static void commit_complete(Job *job, void *opaque)
     bdrv_unref(top);
 }
 
-static void coroutine_fn commit_run(void *opaque)
+static int coroutine_fn commit_run(Job *job, Error **errp)
 {
-    CommitBlockJob *s = opaque;
-    CommitCompleteData *data;
+    CommitBlockJob *s = container_of(job, CommitBlockJob, common.job);
     int64_t offset;
     uint64_t delay_ns = 0;
     int ret = 0;
@@ -210,9 +200,7 @@ static void coroutine_fn commit_run(void *opaque)
 out:
     qemu_vfree(buf);
 
-    data = g_malloc(sizeof(*data));
-    data->ret = ret;
-    job_defer_to_main_loop(&s->common.job, commit_complete, data);
+    return ret;
 }
 
 static const BlockJobDriver commit_job_driver = {
@@ -222,7 +210,8 @@ static const BlockJobDriver commit_job_driver = {
         .free          = block_job_free,
         .user_resume   = block_job_user_resume,
         .drain         = block_job_drain,
-        .start         = commit_run,
+        .run           = commit_run,
+        .exit          = commit_exit,
     },
 };
 
diff --git a/block/create.c b/block/create.c
index 915cd41bcc..95341219ef 100644
--- a/block/create.c
+++ b/block/create.c
@@ -34,33 +34,26 @@ typedef struct BlockdevCreateJob {
     Job common;
     BlockDriver *drv;
     BlockdevCreateOptions *opts;
-    int ret;
-    Error *err;
 } BlockdevCreateJob;
 
-static void blockdev_create_complete(Job *job, void *opaque)
+static int coroutine_fn blockdev_create_run(Job *job, Error **errp)
 {
     BlockdevCreateJob *s = container_of(job, BlockdevCreateJob, common);
-
-    job_completed(job, s->ret, s->err);
-}
-
-static void coroutine_fn blockdev_create_run(void *opaque)
-{
-    BlockdevCreateJob *s = opaque;
+    int ret;
 
     job_progress_set_remaining(&s->common, 1);
-    s->ret = s->drv->bdrv_co_create(s->opts, &s->err);
+    ret = s->drv->bdrv_co_create(s->opts, errp);
     job_progress_update(&s->common, 1);
 
     qapi_free_BlockdevCreateOptions(s->opts);
-    job_defer_to_main_loop(&s->common, blockdev_create_complete, NULL);
+
+    return ret;
 }
 
 static const JobDriver blockdev_create_job_driver = {
     .instance_size = sizeof(BlockdevCreateJob),
     .job_type      = JOB_TYPE_CREATE,
-    .start         = blockdev_create_run,
+    .run           = blockdev_create_run,
 };
 
 void qmp_blockdev_create(const char *job_id, BlockdevCreateOptions *options,
diff --git a/block/mirror.c b/block/mirror.c
index 6cc10df5c9..b8941db6c1 100644
--- a/block/mirror.c
+++ b/block/mirror.c
@@ -607,26 +607,22 @@ static void mirror_wait_for_all_io(MirrorBlockJob *s)
     }
 }
 
-typedef struct {
-    int ret;
-} MirrorExitData;
-
-static void mirror_exit(Job *job, void *opaque)
+static void mirror_exit(Job *job)
 {
     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
     BlockJob *bjob = &s->common;
-    MirrorExitData *data = opaque;
     MirrorBDSOpaque *bs_opaque = s->mirror_top_bs->opaque;
     AioContext *replace_aio_context = NULL;
     BlockDriverState *src = s->mirror_top_bs->backing->bs;
     BlockDriverState *target_bs = blk_bs(s->target);
     BlockDriverState *mirror_top_bs = s->mirror_top_bs;
     Error *local_err = NULL;
+    int ret = job->ret;
 
     bdrv_release_dirty_bitmap(src, s->dirty_bitmap);
 
-    /* Make sure that the source BDS doesn't go away before we called
-     * job_completed(). */
+    /* Make sure that the source BDS doesn't go away during bdrv_replace_node,
+     * before we can call bdrv_drained_end */
     bdrv_ref(src);
     bdrv_ref(mirror_top_bs);
     bdrv_ref(target_bs);
@@ -652,7 +648,7 @@ static void mirror_exit(Job *job, void *opaque)
             bdrv_set_backing_hd(target_bs, backing, &local_err);
             if (local_err) {
                 error_report_err(local_err);
-                data->ret = -EPERM;
+                ret = -EPERM;
             }
         }
     }
@@ -662,7 +658,7 @@ static void mirror_exit(Job *job, void *opaque)
         aio_context_acquire(replace_aio_context);
     }
 
-    if (s->should_complete && data->ret == 0) {
+    if (s->should_complete && ret == 0) {
         BlockDriverState *to_replace = src;
         if (s->to_replace) {
             to_replace = s->to_replace;
@@ -679,7 +675,7 @@ static void mirror_exit(Job *job, void *opaque)
         bdrv_drained_end(target_bs);
         if (local_err) {
             error_report_err(local_err);
-            data->ret = -EPERM;
+            ret = -EPERM;
         }
     }
     if (s->to_replace) {
@@ -710,12 +706,12 @@ static void mirror_exit(Job *job, void *opaque)
     blk_insert_bs(bjob->blk, mirror_top_bs, &error_abort);
 
     bs_opaque->job = NULL;
-    job_completed(job, data->ret, NULL);
 
-    g_free(data);
     bdrv_drained_end(src);
     bdrv_unref(mirror_top_bs);
     bdrv_unref(src);
+
+    job->ret = ret;
 }
 
 static void mirror_throttle(MirrorBlockJob *s)
@@ -812,10 +808,9 @@ static int mirror_flush(MirrorBlockJob *s)
     return ret;
 }
 
-static void coroutine_fn mirror_run(void *opaque)
+static int coroutine_fn mirror_run(Job *job, Error **errp)
 {
-    MirrorBlockJob *s = opaque;
-    MirrorExitData *data;
+    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
     BlockDriverState *bs = s->mirror_top_bs->backing->bs;
     BlockDriverState *target_bs = blk_bs(s->target);
     bool need_drain = true;
@@ -1035,13 +1030,11 @@ immediate_exit:
     g_free(s->in_flight_bitmap);
     bdrv_dirty_iter_free(s->dbi);
 
-    data = g_malloc(sizeof(*data));
-    data->ret = ret;
-
     if (need_drain) {
         bdrv_drained_begin(bs);
     }
-    job_defer_to_main_loop(&s->common.job, mirror_exit, data);
+
+    return ret;
 }
 
 static void mirror_complete(Job *job, Error **errp)
@@ -1138,7 +1131,8 @@ static const BlockJobDriver mirror_job_driver = {
         .free                   = block_job_free,
         .user_resume            = block_job_user_resume,
         .drain                  = block_job_drain,
-        .start                  = mirror_run,
+        .run                    = mirror_run,
+        .exit                   = mirror_exit,
         .pause                  = mirror_pause,
         .complete               = mirror_complete,
     },
@@ -1154,7 +1148,8 @@ static const BlockJobDriver commit_active_job_driver = {
         .free                   = block_job_free,
         .user_resume            = block_job_user_resume,
         .drain                  = block_job_drain,
-        .start                  = mirror_run,
+        .run                    = mirror_run,
+        .exit                   = mirror_exit,
         .pause                  = mirror_pause,
         .complete               = mirror_complete,
     },
diff --git a/block/stream.c b/block/stream.c
index 9264b68a1e..67e1e72e23 100644
--- a/block/stream.c
+++ b/block/stream.c
@@ -54,20 +54,16 @@ static int coroutine_fn stream_populate(BlockBackend *blk,
     return blk_co_preadv(blk, offset, qiov.size, &qiov, BDRV_REQ_COPY_ON_READ);
 }
 
-typedef struct {
-    int ret;
-} StreamCompleteData;
-
-static void stream_complete(Job *job, void *opaque)
+static void stream_exit(Job *job)
 {
     StreamBlockJob *s = container_of(job, StreamBlockJob, common.job);
     BlockJob *bjob = &s->common;
-    StreamCompleteData *data = opaque;
     BlockDriverState *bs = blk_bs(bjob->blk);
     BlockDriverState *base = s->base;
     Error *local_err = NULL;
+    int ret = job->ret;
 
-    if (!job_is_cancelled(job) && bs->backing && data->ret == 0) {
+    if (!job_is_cancelled(job) && bs->backing && ret == 0) {
         const char *base_id = NULL, *base_fmt = NULL;
         if (base) {
             base_id = s->backing_file_str;
@@ -75,11 +71,11 @@ static void stream_complete(Job *job, void *opaque)
                 base_fmt = base->drv->format_name;
             }
         }
-        data->ret = bdrv_change_backing_file(bs, base_id, base_fmt);
+        ret = bdrv_change_backing_file(bs, base_id, base_fmt);
         bdrv_set_backing_hd(bs, base, &local_err);
         if (local_err) {
             error_report_err(local_err);
-            data->ret = -EPERM;
+            ret = -EPERM;
             goto out;
         }
     }
@@ -93,14 +89,12 @@ out:
     }
 
     g_free(s->backing_file_str);
-    job_completed(job, data->ret, NULL);
-    g_free(data);
+    job->ret = ret;
 }
 
-static void coroutine_fn stream_run(void *opaque)
+static int coroutine_fn stream_run(Job *job, Error **errp)
 {
-    StreamBlockJob *s = opaque;
-    StreamCompleteData *data;
+    StreamBlockJob *s = container_of(job, StreamBlockJob, common.job);
     BlockBackend *blk = s->common.blk;
     BlockDriverState *bs = blk_bs(blk);
     BlockDriverState *base = s->base;
@@ -203,9 +197,7 @@ static void coroutine_fn stream_run(void *opaque)
 
 out:
     /* Modify backing chain and close BDSes in main loop */
-    data = g_malloc(sizeof(*data));
-    data->ret = ret;
-    job_defer_to_main_loop(&s->common.job, stream_complete, data);
+    return ret;
 }
 
 static const BlockJobDriver stream_job_driver = {
@@ -213,7 +205,8 @@ static const BlockJobDriver stream_job_driver = {
         .instance_size = sizeof(StreamBlockJob),
         .job_type      = JOB_TYPE_STREAM,
         .free          = block_job_free,
-        .start         = stream_run,
+        .run           = stream_run,
+        .exit          = stream_exit,
         .user_resume   = block_job_user_resume,
         .drain         = block_job_drain,
     },
diff --git a/include/qemu/job.h b/include/qemu/job.h
index 18c9223e31..e0cff702b7 100644
--- a/include/qemu/job.h
+++ b/include/qemu/job.h
@@ -124,12 +124,20 @@ typedef struct Job {
     /** Estimated progress_current value at the completion of the job */
     int64_t progress_total;
 
-    /** Error string for a failed job (NULL if, and only if, job->ret == 0) */
-    char *error;
-
-    /** ret code passed to job_completed. */
+    /**
+     * Return code from @run and/or @prepare callback(s).
+     * Not final until the job has reached the CONCLUDED status.
+     * 0 on success, -errno on failure.
+     */
     int ret;
 
+    /**
+     * Error object for a failed job.
+     * If job->ret is nonzero and an error object was not set, it will be set
+     * to strerror(-job->ret) during job_completed.
+     */
+    Error *err;
+
     /** The completion function that will be called when the job completes.  */
     BlockCompletionFunc *cb;
 
@@ -168,8 +176,17 @@ struct JobDriver {
     /** Enum describing the operation */
     JobType job_type;
 
-    /** Mandatory: Entrypoint for the Coroutine. */
-    CoroutineEntry *start;
+    /**
+     * Mandatory: Entrypoint for the Coroutine.
+     *
+     * This callback will be invoked when moving from CREATED to RUNNING.
+     *
+     * If this callback returns nonzero, the job transaction it is part of is
+     * aborted. If it returns zero, the job moves into the WAITING state. If it
+     * is the last job to complete in its transaction, all jobs in the
+     * transaction move from WAITING to PENDING.
+     */
+    int coroutine_fn (*run)(Job *job, Error **errp);
 
     /**
      * If the callback is not NULL, it will be invoked when the job transitions
@@ -205,6 +222,17 @@ struct JobDriver {
     void (*drain)(Job *job);
 
     /**
+     * If the callback is not NULL, exit will be invoked from the main thread
+     * when the job's coroutine has finished, but before transactional
+     * convergence; before @prepare or @abort.
+     *
+     * FIXME TODO: This callback is only temporary to transition remaining jobs
+     * to prepare/commit/abort/clean callbacks and will be removed before 3.1.
+     * is released.
+     */
+    void (*exit)(Job *job);
+
+    /**
      * If the callback is not NULL, prepare will be invoked when all the jobs
      * belonging to the same transaction complete; or upon this job's completion
      * if it is not in a transaction.
@@ -481,19 +509,6 @@ void job_early_fail(Job *job);
 /** Moves the @job from RUNNING to READY */
 void job_transition_to_ready(Job *job);
 
-/**
- * @job: The job being completed.
- * @ret: The status code.
- * @error: The error message for a failing job (only with @ret < 0). If @ret is
- *         negative, but NULL is given for @error, strerror() is used.
- *
- * Marks @job as completed. If @ret is non-zero, the job transaction it is part
- * of is aborted. If @ret is zero, the job moves into the WAITING state. If it
- * is the last job to complete in its transaction, all jobs in the transaction
- * move from WAITING to PENDING.
- */
-void job_completed(Job *job, int ret, Error *error);
-
 /** Asynchronously complete the specified @job. */
 void job_complete(Job *job, Error **errp);
 
@@ -553,23 +568,6 @@ void job_finalize(Job *job, Error **errp);
  */
 void job_dismiss(Job **job, Error **errp);
 
-typedef void JobDeferToMainLoopFn(Job *job, void *opaque);
-
-/**
- * @job: The job
- * @fn: The function to run in the main loop
- * @opaque: The opaque value that is passed to @fn
- *
- * This function must be called by the main job coroutine just before it
- * returns.  @fn is executed in the main loop with the job AioContext acquired.
- *
- * Block jobs must call bdrv_unref(), bdrv_close(), and anything that uses
- * bdrv_drain_all() in the main loop.
- *
- * The @job AioContext is held while @fn executes.
- */
-void job_defer_to_main_loop(Job *job, JobDeferToMainLoopFn *fn, void *opaque);
-
 /**
  * Synchronously finishes the given @job. If @finish is given, it is called to
  * trigger completion or cancellation of the job.
diff --git a/job-qmp.c b/job-qmp.c
index 410775df61..a969b2bbf0 100644
--- a/job-qmp.c
+++ b/job-qmp.c
@@ -146,8 +146,9 @@ static JobInfo *job_query_single(Job *job, Error **errp)
         .status             = job->status,
         .current_progress   = job->progress_current,
         .total_progress     = job->progress_total,
-        .has_error          = !!job->error,
-        .error              = g_strdup(job->error),
+        .has_error          = !!job->err,
+        .error              = job->err ? \
+                              g_strdup(error_get_pretty(job->err)) : NULL,
     };
 
     return info;
diff --git a/job.c b/job.c
index b9ebd1c091..2327d790cc 100644
--- a/job.c
+++ b/job.c
@@ -369,7 +369,7 @@ void job_unref(Job *job)
 
         QLIST_REMOVE(job, job_list);
 
-        g_free(job->error);
+        error_free(job->err);
         g_free(job->id);
         g_free(job);
     }
@@ -535,6 +535,20 @@ void job_drain(Job *job)
     }
 }
 
+static void job_completed(Job *job);
+
+static void job_exit(void *opaque)
+{
+    Job *job = (Job *)opaque;
+    AioContext *aio_context = job->aio_context;
+
+    if (job->driver->exit) {
+        aio_context_acquire(aio_context);
+        job->driver->exit(job);
+        aio_context_release(aio_context);
+    }
+    job_completed(job);
+}
 
 /**
  * All jobs must allow a pause point before entering their job proper. This
@@ -544,16 +558,18 @@ static void coroutine_fn job_co_entry(void *opaque)
 {
     Job *job = opaque;
 
-    assert(job && job->driver && job->driver->start);
+    assert(job && job->driver && job->driver->run);
     job_pause_point(job);
-    job->driver->start(job);
+    job->ret = job->driver->run(job, &job->err);
+    job->deferred_to_main_loop = true;
+    aio_bh_schedule_oneshot(qemu_get_aio_context(), job_exit, job);
 }
 
 
 void job_start(Job *job)
 {
     assert(job && !job_started(job) && job->paused &&
-           job->driver && job->driver->start);
+           job->driver && job->driver->run);
     job->co = qemu_coroutine_create(job_co_entry, job);
     job->pause_count--;
     job->busy = true;
@@ -666,8 +682,8 @@ static void job_update_rc(Job *job)
         job->ret = -ECANCELED;
     }
     if (job->ret) {
-        if (!job->error) {
-            job->error = g_strdup(strerror(-job->ret));
+        if (!job->err) {
+            error_setg(&job->err, "%s", strerror(-job->ret));
         }
         job_state_transition(job, JOB_STATUS_ABORTING);
     }
@@ -865,19 +881,12 @@ static void job_completed_txn_success(Job *job)
     }
 }
 
-void job_completed(Job *job, int ret, Error *error)
+static void job_completed(Job *job)
 {
     assert(job && job->txn && !job_is_completed(job));
 
-    job->ret = ret;
-    if (error) {
-        assert(job->ret < 0);
-        job->error = g_strdup(error_get_pretty(error));
-        error_free(error);
-    }
-
     job_update_rc(job);
-    trace_job_completed(job, ret, job->ret);
+    trace_job_completed(job, job->ret);
     if (job->ret) {
         job_completed_txn_abort(job);
     } else {
@@ -893,7 +902,7 @@ void job_cancel(Job *job, bool force)
     }
     job_cancel_async(job, force);
     if (!job_started(job)) {
-        job_completed(job, -ECANCELED, NULL);
+        job_completed(job);
     } else if (job->deferred_to_main_loop) {
         job_completed_txn_abort(job);
     } else {
@@ -956,38 +965,6 @@ void job_complete(Job *job, Error **errp)
     job->driver->complete(job, errp);
 }
 
-
-typedef struct {
-    Job *job;
-    JobDeferToMainLoopFn *fn;
-    void *opaque;
-} JobDeferToMainLoopData;
-
-static void job_defer_to_main_loop_bh(void *opaque)
-{
-    JobDeferToMainLoopData *data = opaque;
-    Job *job = data->job;
-    AioContext *aio_context = job->aio_context;
-
-    aio_context_acquire(aio_context);
-    data->fn(data->job, data->opaque);
-    aio_context_release(aio_context);
-
-    g_free(data);
-}
-
-void job_defer_to_main_loop(Job *job, JobDeferToMainLoopFn *fn, void *opaque)
-{
-    JobDeferToMainLoopData *data = g_malloc(sizeof(*data));
-    data->job = job;
-    data->fn = fn;
-    data->opaque = opaque;
-    job->deferred_to_main_loop = true;
-
-    aio_bh_schedule_oneshot(qemu_get_aio_context(),
-                            job_defer_to_main_loop_bh, data);
-}
-
 int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp), Error **errp)
 {
     Error *local_err = NULL;
diff --git a/tests/test-bdrv-drain.c b/tests/test-bdrv-drain.c
index 17bb8508ae..89ac15e88a 100644
--- a/tests/test-bdrv-drain.c
+++ b/tests/test-bdrv-drain.c
@@ -752,14 +752,9 @@ typedef struct TestBlockJob {
     bool should_complete;
 } TestBlockJob;
 
-static void test_job_completed(Job *job, void *opaque)
+static int coroutine_fn test_job_run(Job *job, Error **errp)
 {
-    job_completed(job, 0, NULL);
-}
-
-static void coroutine_fn test_job_start(void *opaque)
-{
-    TestBlockJob *s = opaque;
+    TestBlockJob *s = container_of(job, TestBlockJob, common.job);
 
     job_transition_to_ready(&s->common.job);
     while (!s->should_complete) {
@@ -770,7 +765,7 @@ static void coroutine_fn test_job_start(void *opaque)
         job_pause_point(&s->common.job);
     }
 
-    job_defer_to_main_loop(&s->common.job, test_job_completed, NULL);
+    return 0;
 }
 
 static void test_job_complete(Job *job, Error **errp)
@@ -785,7 +780,7 @@ BlockJobDriver test_job_driver = {
         .free           = block_job_free,
         .user_resume    = block_job_user_resume,
         .drain          = block_job_drain,
-        .start          = test_job_start,
+        .run            = test_job_run,
         .complete       = test_job_complete,
     },
 };
@@ -948,6 +943,7 @@ static void coroutine_fn test_co_delete_by_drain(void *opaque)
     }
 
     dbdd->done = true;
+    g_free(buffer);
 }
 
 /**
diff --git a/tests/test-blockjob-txn.c b/tests/test-blockjob-txn.c
index 58d9b87fb2..ef29f35e44 100644
--- a/tests/test-blockjob-txn.c
+++ b/tests/test-blockjob-txn.c
@@ -24,39 +24,31 @@ typedef struct {
     int *result;
 } TestBlockJob;
 
-static void test_block_job_complete(Job *job, void *opaque)
+static void test_block_job_exit(Job *job)
 {
     BlockJob *bjob = container_of(job, BlockJob, job);
     BlockDriverState *bs = blk_bs(bjob->blk);
-    int rc = (intptr_t)opaque;
 
-    if (job_is_cancelled(job)) {
-        rc = -ECANCELED;
-    }
-
-    job_completed(job, rc, NULL);
     bdrv_unref(bs);
 }
 
-static void coroutine_fn test_block_job_run(void *opaque)
+static int coroutine_fn test_block_job_run(Job *job, Error **errp)
 {
-    TestBlockJob *s = opaque;
-    BlockJob *job = &s->common;
+    TestBlockJob *s = container_of(job, TestBlockJob, common.job);
 
     while (s->iterations--) {
         if (s->use_timer) {
-            job_sleep_ns(&job->job, 0);
+            job_sleep_ns(job, 0);
         } else {
-            job_yield(&job->job);
+            job_yield(job);
         }
 
-        if (job_is_cancelled(&job->job)) {
+        if (job_is_cancelled(job)) {
             break;
         }
     }
 
-    job_defer_to_main_loop(&job->job, test_block_job_complete,
-                           (void *)(intptr_t)s->rc);
+    return s->rc;
 }
 
 typedef struct {
@@ -80,7 +72,8 @@ static const BlockJobDriver test_block_job_driver = {
         .free          = block_job_free,
         .user_resume   = block_job_user_resume,
         .drain         = block_job_drain,
-        .start         = test_block_job_run,
+        .run           = test_block_job_run,
+        .exit          = test_block_job_exit,
     },
 };
 
diff --git a/tests/test-blockjob.c b/tests/test-blockjob.c
index cb42f06e61..ad4a65bc78 100644
--- a/tests/test-blockjob.c
+++ b/tests/test-blockjob.c
@@ -163,11 +163,10 @@ typedef struct CancelJob {
     bool completed;
 } CancelJob;
 
-static void cancel_job_completed(Job *job, void *opaque)
+static void cancel_job_exit(Job *job)
 {
-    CancelJob *s = opaque;
+    CancelJob *s = container_of(job, CancelJob, common.job);
     s->completed = true;
-    job_completed(job, 0, NULL);
 }
 
 static void cancel_job_complete(Job *job, Error **errp)
@@ -176,13 +175,13 @@ static void cancel_job_complete(Job *job, Error **errp)
     s->should_complete = true;
 }
 
-static void coroutine_fn cancel_job_start(void *opaque)
+static int coroutine_fn cancel_job_run(Job *job, Error **errp)
 {
-    CancelJob *s = opaque;
+    CancelJob *s = container_of(job, CancelJob, common.job);
 
     while (!s->should_complete) {
         if (job_is_cancelled(&s->common.job)) {
-            goto defer;
+            return 0;
         }
 
         if (!job_is_ready(&s->common.job) && s->should_converge) {
@@ -192,8 +191,7 @@ static void coroutine_fn cancel_job_start(void *opaque)
         job_sleep_ns(&s->common.job, 100000);
     }
 
- defer:
-    job_defer_to_main_loop(&s->common.job, cancel_job_completed, s);
+    return 0;
 }
 
 static const BlockJobDriver test_cancel_driver = {
@@ -202,7 +200,8 @@ static const BlockJobDriver test_cancel_driver = {
         .free          = block_job_free,
         .user_resume   = block_job_user_resume,
         .drain         = block_job_drain,
-        .start         = cancel_job_start,
+        .run           = cancel_job_run,
+        .exit          = cancel_job_exit,
         .complete      = cancel_job_complete,
     },
 };
diff --git a/trace-events b/trace-events
index c445f54773..4fd2cb4b97 100644
--- a/trace-events
+++ b/trace-events
@@ -107,7 +107,7 @@ gdbstub_err_checksum_incorrect(uint8_t expected, uint8_t got) "got command packe
 # job.c
 job_state_transition(void *job,  int ret, const char *legal, const char *s0, const char *s1) "job %p (ret: %d) attempting %s transition (%s-->%s)"
 job_apply_verb(void *job, const char *state, const char *verb, const char *legal) "job %p in state %s; applying verb %s (%s)"
-job_completed(void *job, int ret, int jret) "job %p ret %d corrected ret %d"
+job_completed(void *job, int ret) "job %p ret %d"
 
 # job-qmp.c
 qmp_job_cancel(void *job) "job %p"