summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--block.c19
-rw-r--r--block/accounting.c123
-rw-r--r--block/backup.c50
-rw-r--r--block/block-backend.c1
-rw-r--r--block/io.c150
-rw-r--r--block/iscsi.c73
-rw-r--r--block/mirror.c2
-rw-r--r--block/qapi.c51
-rw-r--r--block/qed.c13
-rw-r--r--block/raw-posix.c8
-rw-r--r--block/raw_bsd.c6
-rw-r--r--blockdev.c485
-rw-r--r--blockjob.c189
-rw-r--r--docs/bitmaps.md6
-rw-r--r--hmp.c4
-rw-r--r--hw/block/nvme.c11
-rw-r--r--hw/block/virtio-blk.c4
-rw-r--r--hw/block/xen_disk.c27
-rw-r--r--hw/ide/atapi.c31
-rw-r--r--hw/ide/core.c12
-rw-r--r--hw/ide/macio.c12
-rw-r--r--hw/scsi/scsi-disk.c46
-rw-r--r--include/block/accounting.h28
-rw-r--r--include/block/block.h18
-rw-r--r--include/block/block_int.h23
-rw-r--r--include/block/blockjob.h85
-rw-r--r--include/qemu/timed-average.h64
-rw-r--r--qapi-schema.json56
-rw-r--r--qapi/block-core.json103
-rw-r--r--qemu-img.c3
-rw-r--r--qemu-io-cmds.c9
-rw-r--r--qmp-commands.hx82
-rw-r--r--tests/Makefile7
-rw-r--r--tests/qemu-iotests/124182
-rw-r--r--tests/qemu-iotests/124.out4
-rw-r--r--tests/qemu-iotests/136349
-rw-r--r--tests/qemu-iotests/136.out5
-rw-r--r--tests/qemu-iotests/group1
-rw-r--r--tests/test-blockjob-txn.c250
-rw-r--r--tests/test-timed-average.c90
-rw-r--r--util/Makefile.objs1
-rw-r--r--util/timed-average.c231
42 files changed, 2618 insertions, 296 deletions
diff --git a/block.c b/block.c
index 867520a3fd..3a7324bb05 100644
--- a/block.c
+++ b/block.c
@@ -3404,10 +3404,25 @@ void bdrv_reset_dirty_bitmap(BdrvDirtyBitmap *bitmap,
     hbitmap_reset(bitmap->bitmap, cur_sector, nr_sectors);
 }
 
-void bdrv_clear_dirty_bitmap(BdrvDirtyBitmap *bitmap)
+void bdrv_clear_dirty_bitmap(BdrvDirtyBitmap *bitmap, HBitmap **out)
 {
     assert(bdrv_dirty_bitmap_enabled(bitmap));
-    hbitmap_reset_all(bitmap->bitmap);
+    if (!out) {
+        hbitmap_reset_all(bitmap->bitmap);
+    } else {
+        HBitmap *backup = bitmap->bitmap;
+        bitmap->bitmap = hbitmap_alloc(bitmap->size,
+                                       hbitmap_granularity(backup));
+        *out = backup;
+    }
+}
+
+void bdrv_undo_clear_dirty_bitmap(BdrvDirtyBitmap *bitmap, HBitmap *in)
+{
+    HBitmap *tmp = bitmap->bitmap;
+    assert(bdrv_dirty_bitmap_enabled(bitmap));
+    bitmap->bitmap = in;
+    hbitmap_free(tmp);
 }
 
 void bdrv_set_dirty(BlockDriverState *bs, int64_t cur_sector,
diff --git a/block/accounting.c b/block/accounting.c
index a423560206..185025ec1e 100644
--- a/block/accounting.c
+++ b/block/accounting.c
@@ -2,6 +2,7 @@
  * QEMU System Emulator block accounting
  *
  * Copyright (c) 2011 Christoph Hellwig
+ * Copyright (c) 2015 Igalia, S.L.
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
  * of this software and associated documentation files (the "Software"), to deal
@@ -25,6 +26,54 @@
 #include "block/accounting.h"
 #include "block/block_int.h"
 #include "qemu/timer.h"
+#include "sysemu/qtest.h"
+
+static QEMUClockType clock_type = QEMU_CLOCK_REALTIME;
+static const int qtest_latency_ns = NANOSECONDS_PER_SECOND / 1000;
+
+void block_acct_init(BlockAcctStats *stats, bool account_invalid,
+                     bool account_failed)
+{
+    stats->account_invalid = account_invalid;
+    stats->account_failed = account_failed;
+
+    if (qtest_enabled()) {
+        clock_type = QEMU_CLOCK_VIRTUAL;
+    }
+}
+
+void block_acct_cleanup(BlockAcctStats *stats)
+{
+    BlockAcctTimedStats *s, *next;
+    QSLIST_FOREACH_SAFE(s, &stats->intervals, entries, next) {
+        g_free(s);
+    }
+}
+
+void block_acct_add_interval(BlockAcctStats *stats, unsigned interval_length)
+{
+    BlockAcctTimedStats *s;
+    unsigned i;
+
+    s = g_new0(BlockAcctTimedStats, 1);
+    s->interval_length = interval_length;
+    QSLIST_INSERT_HEAD(&stats->intervals, s, entries);
+
+    for (i = 0; i < BLOCK_MAX_IOTYPE; i++) {
+        timed_average_init(&s->latency[i], clock_type,
+                           (uint64_t) interval_length * NANOSECONDS_PER_SECOND);
+    }
+}
+
+BlockAcctTimedStats *block_acct_interval_next(BlockAcctStats *stats,
+                                              BlockAcctTimedStats *s)
+{
+    if (s == NULL) {
+        return QSLIST_FIRST(&stats->intervals);
+    } else {
+        return QSLIST_NEXT(s, entries);
+    }
+}
 
 void block_acct_start(BlockAcctStats *stats, BlockAcctCookie *cookie,
                       int64_t bytes, enum BlockAcctType type)
@@ -32,20 +81,71 @@ void block_acct_start(BlockAcctStats *stats, BlockAcctCookie *cookie,
     assert(type < BLOCK_MAX_IOTYPE);
 
     cookie->bytes = bytes;
-    cookie->start_time_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
+    cookie->start_time_ns = qemu_clock_get_ns(clock_type);
     cookie->type = type;
 }
 
 void block_acct_done(BlockAcctStats *stats, BlockAcctCookie *cookie)
 {
+    BlockAcctTimedStats *s;
+    int64_t time_ns = qemu_clock_get_ns(clock_type);
+    int64_t latency_ns = time_ns - cookie->start_time_ns;
+
+    if (qtest_enabled()) {
+        latency_ns = qtest_latency_ns;
+    }
+
     assert(cookie->type < BLOCK_MAX_IOTYPE);
 
     stats->nr_bytes[cookie->type] += cookie->bytes;
     stats->nr_ops[cookie->type]++;
-    stats->total_time_ns[cookie->type] +=
-        qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - cookie->start_time_ns;
+    stats->total_time_ns[cookie->type] += latency_ns;
+    stats->last_access_time_ns = time_ns;
+
+    QSLIST_FOREACH(s, &stats->intervals, entries) {
+        timed_average_account(&s->latency[cookie->type], latency_ns);
+    }
+}
+
+void block_acct_failed(BlockAcctStats *stats, BlockAcctCookie *cookie)
+{
+    assert(cookie->type < BLOCK_MAX_IOTYPE);
+
+    stats->failed_ops[cookie->type]++;
+
+    if (stats->account_failed) {
+        BlockAcctTimedStats *s;
+        int64_t time_ns = qemu_clock_get_ns(clock_type);
+        int64_t latency_ns = time_ns - cookie->start_time_ns;
+
+        if (qtest_enabled()) {
+            latency_ns = qtest_latency_ns;
+        }
+
+        stats->total_time_ns[cookie->type] += latency_ns;
+        stats->last_access_time_ns = time_ns;
+
+        QSLIST_FOREACH(s, &stats->intervals, entries) {
+            timed_average_account(&s->latency[cookie->type], latency_ns);
+        }
+    }
 }
 
+void block_acct_invalid(BlockAcctStats *stats, enum BlockAcctType type)
+{
+    assert(type < BLOCK_MAX_IOTYPE);
+
+    /* block_acct_done() and block_acct_failed() update
+     * total_time_ns[], but this one does not. The reason is that
+     * invalid requests are accounted during their submission,
+     * therefore there's no actual I/O involved. */
+
+    stats->invalid_ops[type]++;
+
+    if (stats->account_invalid) {
+        stats->last_access_time_ns = qemu_clock_get_ns(clock_type);
+    }
+}
 
 void block_acct_merge_done(BlockAcctStats *stats, enum BlockAcctType type,
                       int num_requests)
@@ -53,3 +153,20 @@ void block_acct_merge_done(BlockAcctStats *stats, enum BlockAcctType type,
     assert(type < BLOCK_MAX_IOTYPE);
     stats->merged[type] += num_requests;
 }
+
+int64_t block_acct_idle_time_ns(BlockAcctStats *stats)
+{
+    return qemu_clock_get_ns(clock_type) - stats->last_access_time_ns;
+}
+
+double block_acct_queue_depth(BlockAcctTimedStats *stats,
+                              enum BlockAcctType type)
+{
+    uint64_t sum, elapsed;
+
+    assert(type < BLOCK_MAX_IOTYPE);
+
+    sum = timed_average_sum(&stats->latency[type], &elapsed);
+
+    return (double) sum / elapsed;
+}
diff --git a/block/backup.c b/block/backup.c
index ec01db8eff..3b39119256 100644
--- a/block/backup.c
+++ b/block/backup.c
@@ -221,11 +221,45 @@ static void backup_iostatus_reset(BlockJob *job)
     }
 }
 
+static void backup_cleanup_sync_bitmap(BackupBlockJob *job, int ret)
+{
+    BdrvDirtyBitmap *bm;
+    BlockDriverState *bs = job->common.bs;
+
+    if (ret < 0 || block_job_is_cancelled(&job->common)) {
+        /* Merge the successor back into the parent, delete nothing. */
+        bm = bdrv_reclaim_dirty_bitmap(bs, job->sync_bitmap, NULL);
+        assert(bm);
+    } else {
+        /* Everything is fine, delete this bitmap and install the backup. */
+        bm = bdrv_dirty_bitmap_abdicate(bs, job->sync_bitmap, NULL);
+        assert(bm);
+    }
+}
+
+static void backup_commit(BlockJob *job)
+{
+    BackupBlockJob *s = container_of(job, BackupBlockJob, common);
+    if (s->sync_bitmap) {
+        backup_cleanup_sync_bitmap(s, 0);
+    }
+}
+
+static void backup_abort(BlockJob *job)
+{
+    BackupBlockJob *s = container_of(job, BackupBlockJob, common);
+    if (s->sync_bitmap) {
+        backup_cleanup_sync_bitmap(s, -1);
+    }
+}
+
 static const BlockJobDriver backup_job_driver = {
     .instance_size  = sizeof(BackupBlockJob),
     .job_type       = BLOCK_JOB_TYPE_BACKUP,
     .set_speed      = backup_set_speed,
     .iostatus_reset = backup_iostatus_reset,
+    .commit         = backup_commit,
+    .abort          = backup_abort,
 };
 
 static BlockErrorAction backup_error_action(BackupBlockJob *job,
@@ -441,19 +475,6 @@ static void coroutine_fn backup_run(void *opaque)
     /* wait until pending backup_do_cow() calls have completed */
     qemu_co_rwlock_wrlock(&job->flush_rwlock);
     qemu_co_rwlock_unlock(&job->flush_rwlock);
-
-    if (job->sync_bitmap) {
-        BdrvDirtyBitmap *bm;
-        if (ret < 0 || block_job_is_cancelled(&job->common)) {
-            /* Merge the successor back into the parent, delete nothing. */
-            bm = bdrv_reclaim_dirty_bitmap(bs, job->sync_bitmap, NULL);
-            assert(bm);
-        } else {
-            /* Everything is fine, delete this bitmap and install the backup. */
-            bm = bdrv_dirty_bitmap_abdicate(bs, job->sync_bitmap, NULL);
-            assert(bm);
-        }
-    }
     hbitmap_free(job->bitmap);
 
     if (target->blk) {
@@ -472,7 +493,7 @@ void backup_start(BlockDriverState *bs, BlockDriverState *target,
                   BlockdevOnError on_source_error,
                   BlockdevOnError on_target_error,
                   BlockCompletionFunc *cb, void *opaque,
-                  Error **errp)
+                  BlockJobTxn *txn, Error **errp)
 {
     int64_t len;
 
@@ -554,6 +575,7 @@ void backup_start(BlockDriverState *bs, BlockDriverState *target,
                        sync_bitmap : NULL;
     job->common.len = len;
     job->common.co = qemu_coroutine_create(backup_run);
+    block_job_txn_add_job(txn, &job->common);
     qemu_coroutine_enter(job->common.co, job);
     return;
 
diff --git a/block/block-backend.c b/block/block-backend.c
index 6f9309fef4..9889e813b6 100644
--- a/block/block-backend.c
+++ b/block/block-backend.c
@@ -176,6 +176,7 @@ static void blk_delete(BlockBackend *blk)
     }
     g_free(blk->name);
     drive_info_del(blk->legacy_dinfo);
+    block_acct_cleanup(&blk->stats);
     g_free(blk);
 }
 
diff --git a/block/io.c b/block/io.c
index 8dcad3b3fe..adc1eabef0 100644
--- a/block/io.c
+++ b/block/io.c
@@ -237,8 +237,21 @@ bool bdrv_requests_pending(BlockDriverState *bs)
     return false;
 }
 
+static void bdrv_drain_recurse(BlockDriverState *bs)
+{
+    BdrvChild *child;
+
+    if (bs->drv && bs->drv->bdrv_drain) {
+        bs->drv->bdrv_drain(bs);
+    }
+    QLIST_FOREACH(child, &bs->children, next) {
+        bdrv_drain_recurse(child->bs);
+    }
+}
+
 /*
- * Wait for pending requests to complete on a single BlockDriverState subtree
+ * Wait for pending requests to complete on a single BlockDriverState subtree,
+ * and suspend block driver's internal I/O until next request arrives.
  *
  * Note that unlike bdrv_drain_all(), the caller must hold the BlockDriverState
  * AioContext.
@@ -251,6 +264,7 @@ void bdrv_drain(BlockDriverState *bs)
 {
     bool busy = true;
 
+    bdrv_drain_recurse(bs);
     while (busy) {
         /* Keep iterating */
          bdrv_flush_io_queue(bs);
@@ -348,13 +362,14 @@ static void tracked_request_end(BdrvTrackedRequest *req)
 static void tracked_request_begin(BdrvTrackedRequest *req,
                                   BlockDriverState *bs,
                                   int64_t offset,
-                                  unsigned int bytes, bool is_write)
+                                  unsigned int bytes,
+                                  enum BdrvTrackedRequestType type)
 {
     *req = (BdrvTrackedRequest){
         .bs = bs,
         .offset         = offset,
         .bytes          = bytes,
-        .is_write       = is_write,
+        .type           = type,
         .co             = qemu_coroutine_self(),
         .serialising    = false,
         .overlap_offset = offset,
@@ -971,7 +986,7 @@ static int coroutine_fn bdrv_co_do_preadv(BlockDriverState *bs,
         bytes = ROUND_UP(bytes, align);
     }
 
-    tracked_request_begin(&req, bs, offset, bytes, false);
+    tracked_request_begin(&req, bs, offset, bytes, BDRV_TRACKED_READ);
     ret = bdrv_aligned_preadv(bs, &req, offset, bytes, align,
                               use_local_qiov ? &local_qiov : qiov,
                               flags);
@@ -1292,7 +1307,7 @@ static int coroutine_fn bdrv_co_do_pwritev(BlockDriverState *bs,
      * Pad qiov with the read parts and be sure to have a tracked request not
      * only for bdrv_aligned_pwritev, but also for the reads of the RMW cycle.
      */
-    tracked_request_begin(&req, bs, offset, bytes, true);
+    tracked_request_begin(&req, bs, offset, bytes, BDRV_TRACKED_WRITE);
 
     if (!qiov) {
         ret = bdrv_co_do_zero_pwritev(bs, offset, bytes, flags, &req);
@@ -2317,18 +2332,20 @@ static void coroutine_fn bdrv_flush_co_entry(void *opaque)
 int coroutine_fn bdrv_co_flush(BlockDriverState *bs)
 {
     int ret;
+    BdrvTrackedRequest req;
 
     if (!bs || !bdrv_is_inserted(bs) || bdrv_is_read_only(bs) ||
         bdrv_is_sg(bs)) {
         return 0;
     }
 
+    tracked_request_begin(&req, bs, 0, 0, BDRV_TRACKED_FLUSH);
     /* Write back cached data to the OS even with cache=unsafe */
     BLKDBG_EVENT(bs->file, BLKDBG_FLUSH_TO_OS);
     if (bs->drv->bdrv_co_flush_to_os) {
         ret = bs->drv->bdrv_co_flush_to_os(bs);
         if (ret < 0) {
-            return ret;
+            goto out;
         }
     }
 
@@ -2368,14 +2385,17 @@ int coroutine_fn bdrv_co_flush(BlockDriverState *bs)
         ret = 0;
     }
     if (ret < 0) {
-        return ret;
+        goto out;
     }
 
     /* Now flush the underlying protocol.  It will also have BDRV_O_NO_FLUSH
      * in the case of cache=unsafe, so there are no useless flushes.
      */
 flush_parent:
-    return bs->file ? bdrv_co_flush(bs->file->bs) : 0;
+    ret = bs->file ? bdrv_co_flush(bs->file->bs) : 0;
+out:
+    tracked_request_end(&req);
+    return ret;
 }
 
 int bdrv_flush(BlockDriverState *bs)
@@ -2418,6 +2438,7 @@ static void coroutine_fn bdrv_discard_co_entry(void *opaque)
 int coroutine_fn bdrv_co_discard(BlockDriverState *bs, int64_t sector_num,
                                  int nb_sectors)
 {
+    BdrvTrackedRequest req;
     int max_discard, ret;
 
     if (!bs->drv) {
@@ -2440,6 +2461,8 @@ int coroutine_fn bdrv_co_discard(BlockDriverState *bs, int64_t sector_num,
         return 0;
     }
 
+    tracked_request_begin(&req, bs, sector_num, nb_sectors,
+                          BDRV_TRACKED_DISCARD);
     bdrv_set_dirty(bs, sector_num, nb_sectors);
 
     max_discard = MIN_NON_ZERO(bs->bl.max_discard, BDRV_REQUEST_MAX_SECTORS);
@@ -2473,20 +2496,24 @@ int coroutine_fn bdrv_co_discard(BlockDriverState *bs, int64_t sector_num,
             acb = bs->drv->bdrv_aio_discard(bs, sector_num, nb_sectors,
                                             bdrv_co_io_em_complete, &co);
             if (acb == NULL) {
-                return -EIO;
+                ret = -EIO;
+                goto out;
             } else {
                 qemu_coroutine_yield();
                 ret = co.ret;
             }
         }
         if (ret && ret != -ENOTSUP) {
-            return ret;
+            goto out;
         }
 
         sector_num += num;
         nb_sectors -= num;
     }
-    return 0;
+    ret = 0;
+out:
+    tracked_request_end(&req);
+    return ret;
 }
 
 int bdrv_discard(BlockDriverState *bs, int64_t sector_num, int nb_sectors)
@@ -2515,26 +2542,109 @@ int bdrv_discard(BlockDriverState *bs, int64_t sector_num, int nb_sectors)
     return rwco.ret;
 }
 
-/* needed for generic scsi interface */
+typedef struct {
+    CoroutineIOCompletion *co;
+    QEMUBH *bh;
+} BdrvIoctlCompletionData;
 
-int bdrv_ioctl(BlockDriverState *bs, unsigned long int req, void *buf)
+static void bdrv_ioctl_bh_cb(void *opaque)
+{
+    BdrvIoctlCompletionData *data = opaque;
+
+    bdrv_co_io_em_complete(data->co, -ENOTSUP);
+    qemu_bh_delete(data->bh);
+}
+
+static int bdrv_co_do_ioctl(BlockDriverState *bs, int req, void *buf)
 {
     BlockDriver *drv = bs->drv;
+    BdrvTrackedRequest tracked_req;
+    CoroutineIOCompletion co = {
+        .coroutine = qemu_coroutine_self(),
+    };
+    BlockAIOCB *acb;
 
-    if (drv && drv->bdrv_ioctl)
-        return drv->bdrv_ioctl(bs, req, buf);
-    return -ENOTSUP;
+    tracked_request_begin(&tracked_req, bs, 0, 0, BDRV_TRACKED_IOCTL);
+    if (!drv || !drv->bdrv_aio_ioctl) {
+        co.ret = -ENOTSUP;
+        goto out;
+    }
+
+    acb = drv->bdrv_aio_ioctl(bs, req, buf, bdrv_co_io_em_complete, &co);
+    if (!acb) {
+        BdrvIoctlCompletionData *data = g_new(BdrvIoctlCompletionData, 1);
+        data->bh = aio_bh_new(bdrv_get_aio_context(bs),
+                                bdrv_ioctl_bh_cb, data);
+        data->co = &co;
+        qemu_bh_schedule(data->bh);
+    }
+    qemu_coroutine_yield();
+out:
+    tracked_request_end(&tracked_req);
+    return co.ret;
+}
+
+typedef struct {
+    BlockDriverState *bs;
+    int req;
+    void *buf;
+    int ret;
+} BdrvIoctlCoData;
+
+static void coroutine_fn bdrv_co_ioctl_entry(void *opaque)
+{
+    BdrvIoctlCoData *data = opaque;
+    data->ret = bdrv_co_do_ioctl(data->bs, data->req, data->buf);
+}
+
+/* needed for generic scsi interface */
+int bdrv_ioctl(BlockDriverState *bs, unsigned long int req, void *buf)
+{
+    BdrvIoctlCoData data = {
+        .bs = bs,
+        .req = req,
+        .buf = buf,
+        .ret = -EINPROGRESS,
+    };
+
+    if (qemu_in_coroutine()) {
+        /* Fast-path if already in coroutine context */
+        bdrv_co_ioctl_entry(&data);
+    } else {
+        Coroutine *co = qemu_coroutine_create(bdrv_co_ioctl_entry);
+        qemu_coroutine_enter(co, &data);
+    }
+    while (data.ret == -EINPROGRESS) {
+        aio_poll(bdrv_get_aio_context(bs), true);
+    }
+    return data.ret;
+}
+
+static void coroutine_fn bdrv_co_aio_ioctl_entry(void *opaque)
+{
+    BlockAIOCBCoroutine *acb = opaque;
+    acb->req.error = bdrv_co_do_ioctl(acb->common.bs,
+                                      acb->req.req, acb->req.buf);
+    bdrv_co_complete(acb);
 }
 
 BlockAIOCB *bdrv_aio_ioctl(BlockDriverState *bs,
         unsigned long int req, void *buf,
         BlockCompletionFunc *cb, void *opaque)
 {
-    BlockDriver *drv = bs->drv;
+    BlockAIOCBCoroutine *acb = qemu_aio_get(&bdrv_em_co_aiocb_info,
+                                            bs, cb, opaque);
+    Coroutine *co;
 
-    if (drv && drv->bdrv_aio_ioctl)
-        return drv->bdrv_aio_ioctl(bs, req, buf, cb, opaque);
-    return NULL;
+    acb->need_bh = true;
+    acb->req.error = -EINPROGRESS;
+    acb->req.req = req;
+    acb->req.buf = buf;
+    co = qemu_coroutine_create(bdrv_co_aio_ioctl_entry);
+    qemu_coroutine_enter(co, acb);
+
+    bdrv_co_maybe_schedule_bh(acb);
+    return &acb->common;
 }
 
 void *qemu_blockalign(BlockDriverState *bs, size_t size)
diff --git a/block/iscsi.c b/block/iscsi.c
index 080ef52345..bd1f1bfcd1 100644
--- a/block/iscsi.c
+++ b/block/iscsi.c
@@ -97,6 +97,7 @@ typedef struct IscsiAIOCB {
     int status;
     int64_t sector_num;
     int nb_sectors;
+    int ret;
 #ifdef __linux__
     sg_io_hdr_t *ioh;
 #endif
@@ -779,6 +780,38 @@ iscsi_aio_ioctl_cb(struct iscsi_context *iscsi, int status,
     iscsi_schedule_bh(acb);
 }
 
+static void iscsi_ioctl_bh_completion(void *opaque)
+{
+    IscsiAIOCB *acb = opaque;
+
+    qemu_bh_delete(acb->bh);
+    acb->common.cb(acb->common.opaque, acb->ret);
+    qemu_aio_unref(acb);
+}
+
+static void iscsi_ioctl_handle_emulated(IscsiAIOCB *acb, int req, void *buf)
+{
+    BlockDriverState *bs = acb->common.bs;
+    IscsiLun *iscsilun = bs->opaque;
+    int ret = 0;
+
+    switch (req) {
+    case SG_GET_VERSION_NUM:
+        *(int *)buf = 30000;
+        break;
+    case SG_GET_SCSI_ID:
+        ((struct sg_scsi_id *)buf)->scsi_type = iscsilun->type;
+        break;
+    default:
+        ret = -EINVAL;
+    }
+    assert(!acb->bh);
+    acb->bh = aio_bh_new(bdrv_get_aio_context(bs),
+                         iscsi_ioctl_bh_completion, acb);
+    acb->ret = ret;
+    qemu_bh_schedule(acb->bh);
+}
+
 static BlockAIOCB *iscsi_aio_ioctl(BlockDriverState *bs,
         unsigned long int req, void *buf,
         BlockCompletionFunc *cb, void *opaque)
@@ -788,8 +821,6 @@ static BlockAIOCB *iscsi_aio_ioctl(BlockDriverState *bs,
     struct iscsi_data data;
     IscsiAIOCB *acb;
 
-    assert(req == SG_IO);
-
     acb = qemu_aio_get(&iscsi_aiocb_info, bs, cb, opaque);
 
     acb->iscsilun = iscsilun;
@@ -798,6 +829,11 @@ static BlockAIOCB *iscsi_aio_ioctl(BlockDriverState *bs,
     acb->buf         = NULL;
     acb->ioh         = buf;
 
+    if (req != SG_IO) {
+        iscsi_ioctl_handle_emulated(acb, req, buf);
+        return &acb->common;
+    }
+
     acb->task = malloc(sizeof(struct scsi_task));
     if (acb->task == NULL) {
         error_report("iSCSI: Failed to allocate task for scsi command. %s",
@@ -862,38 +898,6 @@ static BlockAIOCB *iscsi_aio_ioctl(BlockDriverState *bs,
     return &acb->common;
 }
 
-static void ioctl_cb(void *opaque, int status)
-{
-    int *p_status = opaque;
-    *p_status = status;
-}
-
-static int iscsi_ioctl(BlockDriverState *bs, unsigned long int req, void *buf)
-{
-    IscsiLun *iscsilun = bs->opaque;
-    int status;
-
-    switch (req) {
-    case SG_GET_VERSION_NUM:
-        *(int *)buf = 30000;
-        break;
-    case SG_GET_SCSI_ID:
-        ((struct sg_scsi_id *)buf)->scsi_type = iscsilun->type;
-        break;
-    case SG_IO:
-        status = -EINPROGRESS;
-        iscsi_aio_ioctl(bs, req, buf, ioctl_cb, &status);
-
-        while (status == -EINPROGRESS) {
-            aio_poll(iscsilun->aio_context, true);
-        }
-
-        return 0;
-    default:
-        return -1;
-    }
-    return 0;
-}
 #endif
 
 static int64_t
@@ -1824,7 +1828,6 @@ static BlockDriver bdrv_iscsi = {
     .bdrv_co_flush_to_disk = iscsi_co_flush,
 
 #ifdef __linux__
-    .bdrv_ioctl       = iscsi_ioctl,
     .bdrv_aio_ioctl   = iscsi_aio_ioctl,
 #endif
 
diff --git a/block/mirror.c b/block/mirror.c
index 60f1cb589d..52c9abfe14 100644
--- a/block/mirror.c
+++ b/block/mirror.c
@@ -742,7 +742,7 @@ static void mirror_start_job(BlockDriverState *bs, BlockDriverState *target,
     s->dirty_bitmap = bdrv_create_dirty_bitmap(bs, granularity, NULL, errp);
     if (!s->dirty_bitmap) {
         g_free(s->replaces);
-        block_job_release(bs);
+        block_job_unref(&s->common);
         return;
     }
 
diff --git a/block/qapi.c b/block/qapi.c
index 89d4274177..d20262decb 100644
--- a/block/qapi.c
+++ b/block/qapi.c
@@ -346,17 +346,68 @@ static BlockStats *bdrv_query_stats(const BlockDriverState *bs,
     s->stats = g_malloc0(sizeof(*s->stats));
     if (bs->blk) {
         BlockAcctStats *stats = blk_get_stats(bs->blk);
+        BlockAcctTimedStats *ts = NULL;
 
         s->stats->rd_bytes = stats->nr_bytes[BLOCK_ACCT_READ];
         s->stats->wr_bytes = stats->nr_bytes[BLOCK_ACCT_WRITE];
         s->stats->rd_operations = stats->nr_ops[BLOCK_ACCT_READ];
         s->stats->wr_operations = stats->nr_ops[BLOCK_ACCT_WRITE];
+
+        s->stats->failed_rd_operations = stats->failed_ops[BLOCK_ACCT_READ];
+        s->stats->failed_wr_operations = stats->failed_ops[BLOCK_ACCT_WRITE];
+        s->stats->failed_flush_operations = stats->failed_ops[BLOCK_ACCT_FLUSH];
+
+        s->stats->invalid_rd_operations = stats->invalid_ops[BLOCK_ACCT_READ];
+        s->stats->invalid_wr_operations = stats->invalid_ops[BLOCK_ACCT_WRITE];
+        s->stats->invalid_flush_operations =
+            stats->invalid_ops[BLOCK_ACCT_FLUSH];
+
         s->stats->rd_merged = stats->merged[BLOCK_ACCT_READ];
         s->stats->wr_merged = stats->merged[BLOCK_ACCT_WRITE];
         s->stats->flush_operations = stats->nr_ops[BLOCK_ACCT_FLUSH];
         s->stats->wr_total_time_ns = stats->total_time_ns[BLOCK_ACCT_WRITE];
         s->stats->rd_total_time_ns = stats->total_time_ns[BLOCK_ACCT_READ];
         s->stats->flush_total_time_ns = stats->total_time_ns[BLOCK_ACCT_FLUSH];
+
+        s->stats->has_idle_time_ns = stats->last_access_time_ns > 0;
+        if (s->stats->has_idle_time_ns) {
+            s->stats->idle_time_ns = block_acct_idle_time_ns(stats);
+        }
+
+        s->stats->account_invalid = stats->account_invalid;
+        s->stats->account_failed = stats->account_failed;
+
+        while ((ts = block_acct_interval_next(stats, ts))) {
+            BlockDeviceTimedStatsList *timed_stats =
+                g_malloc0(sizeof(*timed_stats));
+            BlockDeviceTimedStats *dev_stats = g_malloc0(sizeof(*dev_stats));
+            timed_stats->next = s->stats->timed_stats;
+            timed_stats->value = dev_stats;
+            s->stats->timed_stats = timed_stats;
+
+            TimedAverage *rd = &ts->latency[BLOCK_ACCT_READ];
+            TimedAverage *wr = &ts->latency[BLOCK_ACCT_WRITE];
+            TimedAverage *fl = &ts->latency[BLOCK_ACCT_FLUSH];
+
+            dev_stats->interval_length = ts->interval_length;
+
+            dev_stats->min_rd_latency_ns = timed_average_min(rd);
+            dev_stats->max_rd_latency_ns = timed_average_max(rd);
+            dev_stats->avg_rd_latency_ns = timed_average_avg(rd);
+
+            dev_stats->min_wr_latency_ns = timed_average_min(wr);
+            dev_stats->max_wr_latency_ns = timed_average_max(wr);
+            dev_stats->avg_wr_latency_ns = timed_average_avg(wr);
+
+            dev_stats->min_flush_latency_ns = timed_average_min(fl);
+            dev_stats->max_flush_latency_ns = timed_average_max(fl);
+            dev_stats->avg_flush_latency_ns = timed_average_avg(fl);
+
+            dev_stats->avg_rd_queue_depth =
+                block_acct_queue_depth(ts, BLOCK_ACCT_READ);
+            dev_stats->avg_wr_queue_depth =
+                block_acct_queue_depth(ts, BLOCK_ACCT_WRITE);
+        }
     }
 
     s->stats->wr_highest_offset = bs->wr_highest_offset;
diff --git a/block/qed.c b/block/qed.c
index 5ea05d4909..9b88895038 100644
--- a/block/qed.c
+++ b/block/qed.c
@@ -375,6 +375,18 @@ static void bdrv_qed_attach_aio_context(BlockDriverState *bs,
     }
 }
 
+static void bdrv_qed_drain(BlockDriverState *bs)
+{
+    BDRVQEDState *s = bs->opaque;
+
+    /* Cancel timer and start doing I/O that were meant to happen as if it
+     * fired, that way we get bdrv_drain() taking care of the ongoing requests
+     * correctly. */
+    qed_cancel_need_check_timer(s);
+    qed_plug_allocating_write_reqs(s);
+    bdrv_aio_flush(s->bs, qed_clear_need_check, s);
+}
+
 static int bdrv_qed_open(BlockDriverState *bs, QDict *options, int flags,
                          Error **errp)
 {
@@ -1676,6 +1688,7 @@ static BlockDriver bdrv_qed = {
     .bdrv_check               = bdrv_qed_check,
     .bdrv_detach_aio_context  = bdrv_qed_detach_aio_context,
     .bdrv_attach_aio_context  = bdrv_qed_attach_aio_context,
+    .bdrv_drain               = bdrv_qed_drain,
 };
 
 static void bdrv_qed_init(void)
diff --git a/block/raw-posix.c b/block/raw-posix.c
index 918c756c2e..aec9ec6bbb 100644
--- a/block/raw-posix.c
+++ b/block/raw-posix.c
@@ -2175,12 +2175,6 @@ static int hdev_open(BlockDriverState *bs, QDict *options, int flags,
 }
 
 #if defined(__linux__)
-static int hdev_ioctl(BlockDriverState *bs, unsigned long int req, void *buf)
-{
-    BDRVRawState *s = bs->opaque;
-
-    return ioctl(s->fd, req, buf);
-}
 
 static BlockAIOCB *hdev_aio_ioctl(BlockDriverState *bs,
         unsigned long int req, void *buf,
@@ -2338,7 +2332,6 @@ static BlockDriver bdrv_host_device = {
 
     /* generic scsi device */
 #ifdef __linux__
-    .bdrv_ioctl         = hdev_ioctl,
     .bdrv_aio_ioctl     = hdev_aio_ioctl,
 #endif
 };
@@ -2471,7 +2464,6 @@ static BlockDriver bdrv_host_cdrom = {
     .bdrv_lock_medium   = cdrom_lock_medium,
 
     /* generic scsi device */
-    .bdrv_ioctl         = hdev_ioctl,
     .bdrv_aio_ioctl     = hdev_aio_ioctl,
 };
 #endif /* __linux__ */
diff --git a/block/raw_bsd.c b/block/raw_bsd.c
index 0aded31c22..915d6fd0e6 100644
--- a/block/raw_bsd.c
+++ b/block/raw_bsd.c
@@ -169,11 +169,6 @@ static void raw_lock_medium(BlockDriverState *bs, bool locked)
     bdrv_lock_medium(bs->file->bs, locked);
 }
 
-static int raw_ioctl(BlockDriverState *bs, unsigned long int req, void *buf)
-{
-    return bdrv_ioctl(bs->file->bs, req, buf);
-}
-
 static BlockAIOCB *raw_aio_ioctl(BlockDriverState *bs,
                                  unsigned long int req, void *buf,
                                  BlockCompletionFunc *cb,
@@ -262,7 +257,6 @@ BlockDriver bdrv_raw = {
     .bdrv_media_changed   = &raw_media_changed,
     .bdrv_eject           = &raw_eject,
     .bdrv_lock_medium     = &raw_lock_medium,
-    .bdrv_ioctl           = &raw_ioctl,
     .bdrv_aio_ioctl       = &raw_aio_ioctl,
     .create_opts          = &raw_create_opts,
     .bdrv_has_zero_init   = &raw_has_zero_init
diff --git a/blockdev.c b/blockdev.c
index 8607df90a9..fc85128e94 100644
--- a/blockdev.c
+++ b/blockdev.c
@@ -283,32 +283,6 @@ typedef struct {
     BlockDriverState *bs;
 } BDRVPutRefBH;
 
-static void bdrv_put_ref_bh(void *opaque)
-{
-    BDRVPutRefBH *s = opaque;
-
-    bdrv_unref(s->bs);
-    qemu_bh_delete(s->bh);
-    g_free(s);
-}
-
-/*
- * Release a BDS reference in a BH
- *
- * It is not safe to use bdrv_unref() from a callback function when the callers
- * still need the BlockDriverState.  In such cases we schedule a BH to release
- * the reference.
- */
-static void bdrv_put_ref_bh_schedule(BlockDriverState *bs)
-{
-    BDRVPutRefBH *s;
-
-    s = g_new(BDRVPutRefBH, 1);
-    s->bh = qemu_bh_new(bdrv_put_ref_bh, s);
-    s->bs = bs;
-    qemu_bh_schedule(s->bh);
-}
-
 static int parse_block_error_action(const char *buf, bool is_read, Error **errp)
 {
     if (!strcmp(buf, "ignore")) {
@@ -467,6 +441,8 @@ static BlockBackend *blockdev_init(const char *file, QDict *bs_opts,
     const char *buf;
     int bdrv_flags = 0;
     int on_read_error, on_write_error;
+    bool account_invalid, account_failed;
+    const char *stats_intervals;
     BlockBackend *blk;
     BlockDriverState *bs;
     ThrottleConfig cfg;
@@ -503,6 +479,11 @@ static BlockBackend *blockdev_init(const char *file, QDict *bs_opts,
     /* extract parameters */
     snapshot = qemu_opt_get_bool(opts, "snapshot", 0);
 
+    account_invalid = qemu_opt_get_bool(opts, "stats-account-invalid", true);
+    account_failed = qemu_opt_get_bool(opts, "stats-account-failed", true);
+
+    stats_intervals = qemu_opt_get(opts, "stats-intervals");
+
     extract_common_blockdev_options(opts, &bdrv_flags, &throttling_group, &cfg,
                                     &detect_zeroes, &error);
     if (error) {
@@ -599,6 +580,37 @@ static BlockBackend *blockdev_init(const char *file, QDict *bs_opts,
         if (bdrv_key_required(bs)) {
             autostart = 0;
         }
+
+        block_acct_init(blk_get_stats(blk), account_invalid, account_failed);
+
+        if (stats_intervals) {
+            char **intervals = g_strsplit(stats_intervals, ":", 0);
+            unsigned i;
+
+            if (*stats_intervals == '\0') {
+                error_setg(&error, "stats-intervals can't have an empty value");
+            }
+
+            for (i = 0; !error && intervals[i] != NULL; i++) {
+                unsigned long long val;
+                if (parse_uint_full(intervals[i], &val, 10) == 0 &&
+                    val > 0 && val <= UINT_MAX) {
+                    block_acct_add_interval(blk_get_stats(blk), val);
+                } else {
+                    error_setg(&error, "Invalid interval length: '%s'",
+                               intervals[i]);
+                }
+            }
+
+            g_strfreev(intervals);
+
+            if (error) {
+                error_propagate(errp, error);
+                blk_unref(blk);
+                blk = NULL;
+                goto err_no_bs_opts;
+            }
+        }
     }
 
     blk_set_on_error(blk, on_read_error, on_write_error);
@@ -1157,7 +1169,7 @@ static void blockdev_do_action(TransactionActionKind type, void *data,
     action.u.data = data;
     list.value = &action;
     list.next = NULL;
-    qmp_transaction(&list, errp);
+    qmp_transaction(&list, false, NULL, errp);
 }
 
 void qmp_blockdev_snapshot_sync(bool has_device, const char *device,
@@ -1359,44 +1371,75 @@ static BdrvDirtyBitmap *block_dirty_bitmap_lookup(const char *node,
 
 /* New and old BlockDriverState structs for atomic group operations */
 
-typedef struct BlkTransactionState BlkTransactionState;
+typedef struct BlkActionState BlkActionState;
 
-/* Only prepare() may fail. In a single transaction, only one of commit() or
-   abort() will be called, clean() will always be called if it present. */
-typedef struct BdrvActionOps {
-    /* Size of state struct, in bytes. */
+/**
+ * BlkActionOps:
+ * Table of operations that define an Action.
+ *
+ * @instance_size: Size of state struct, in bytes.
+ * @prepare: Prepare the work, must NOT be NULL.
+ * @commit: Commit the changes, can be NULL.
+ * @abort: Abort the changes on fail, can be NULL.
+ * @clean: Clean up resources after all transaction actions have called
+ *         commit() or abort(). Can be NULL.
+ *
+ * Only prepare() may fail. In a single transaction, only one of commit() or
+ * abort() will be called. clean() will always be called if it is present.
+ */
+typedef struct BlkActionOps {
     size_t instance_size;
-    /* Prepare the work, must NOT be NULL. */
-    void (*prepare)(BlkTransactionState *common, Error **errp);
-    /* Commit the changes, can be NULL. */
-    void (*commit)(BlkTransactionState *common);
-    /* Abort the changes on fail, can be NULL. */
-    void (*abort)(BlkTransactionState *common);
-    /* Clean up resource in the end, can be NULL. */
-    void (*clean)(BlkTransactionState *common);
-} BdrvActionOps;
+    void (*prepare)(BlkActionState *common, Error **errp);
+    void (*commit)(BlkActionState *common);
+    void (*abort)(BlkActionState *common);
+    void (*clean)(BlkActionState *common);
+} BlkActionOps;
 
-/*
- * This structure must be arranged as first member in child type, assuming
- * that compiler will also arrange it to the same address with parent instance.
- * Later it will be used in free().
+/**
+ * BlkActionState:
+ * Describes one Action's state within a Transaction.
+ *
+ * @action: QAPI-defined enum identifying which Action to perform.
+ * @ops: Table of ActionOps this Action can perform.
+ * @block_job_txn: Transaction which this action belongs to.
+ * @entry: List membership for all Actions in this Transaction.
+ *
+ * This structure must be arranged as first member in a subclassed type,
+ * assuming that the compiler will also arrange it to the same offsets as the
+ * base class.
  */
-struct BlkTransactionState {
+struct BlkActionState {
     TransactionAction *action;
-    const BdrvActionOps *ops;
-    QSIMPLEQ_ENTRY(BlkTransactionState) entry;
+    const BlkActionOps *ops;
+    BlockJobTxn *block_job_txn;
+    TransactionProperties *txn_props;
+    QSIMPLEQ_ENTRY(BlkActionState) entry;
 };
 
 /* internal snapshot private data */
 typedef struct InternalSnapshotState {
-    BlkTransactionState common;
+    BlkActionState common;
     BlockDriverState *bs;
     AioContext *aio_context;
     QEMUSnapshotInfo sn;
     bool created;
 } InternalSnapshotState;
 
-static void internal_snapshot_prepare(BlkTransactionState *common,
+
+static int action_check_completion_mode(BlkActionState *s, Error **errp)
+{
+    if (s->txn_props->completion_mode != ACTION_COMPLETION_MODE_INDIVIDUAL) {
+        error_setg(errp,
+                   "Action '%s' does not support Transaction property "
+                   "completion-mode = %s",
+                   TransactionActionKind_lookup[s->action->type],
+                   ActionCompletionMode_lookup[s->txn_props->completion_mode]);
+        return -1;
+    }
+    return 0;
+}
+
+static void internal_snapshot_prepare(BlkActionState *common,
                                       Error **errp)
 {
     Error *local_err = NULL;
@@ -1421,6 +1464,10 @@ static void internal_snapshot_prepare(BlkTransactionState *common,
     name = internal->name;
 
     /* 2. check for validation */
+    if (action_check_completion_mode(common, errp) < 0) {
+        return;
+    }
+
     blk = blk_by_name(device);
     if (!blk) {
         error_set(errp, ERROR_CLASS_DEVICE_NOT_FOUND,
@@ -1495,7 +1542,7 @@ static void internal_snapshot_prepare(BlkTransactionState *common,
     state->created = true;
 }
 
-static void internal_snapshot_abort(BlkTransactionState *common)
+static void internal_snapshot_abort(BlkActionState *common)
 {
     InternalSnapshotState *state =
                              DO_UPCAST(InternalSnapshotState, common, common);
@@ -1518,7 +1565,7 @@ static void internal_snapshot_abort(BlkTransactionState *common)
     }
 }
 
-static void internal_snapshot_clean(BlkTransactionState *common)
+static void internal_snapshot_clean(BlkActionState *common)
 {
     InternalSnapshotState *state = DO_UPCAST(InternalSnapshotState,
                                              common, common);
@@ -1533,13 +1580,13 @@ static void internal_snapshot_clean(BlkTransactionState *common)
 
 /* external snapshot private data */
 typedef struct ExternalSnapshotState {
-    BlkTransactionState common;
+    BlkActionState common;
     BlockDriverState *old_bs;
     BlockDriverState *new_bs;
     AioContext *aio_context;
 } ExternalSnapshotState;
 
-static void external_snapshot_prepare(BlkTransactionState *common,
+static void external_snapshot_prepare(BlkActionState *common,
                                       Error **errp)
 {
     int flags = 0, ret;
@@ -1582,6 +1629,10 @@ static void external_snapshot_prepare(BlkTransactionState *common,
     }
 
     /* start processing */
+    if (action_check_completion_mode(common, errp) < 0) {
+        return;
+    }
+
     state->old_bs = bdrv_lookup_bs(device, node_name, errp);
     if (!state->old_bs) {
         return;
@@ -1686,7 +1737,7 @@ static void external_snapshot_prepare(BlkTransactionState *common,
     }
 }
 
-static void external_snapshot_commit(BlkTransactionState *common)
+static void external_snapshot_commit(BlkActionState *common)
 {
     ExternalSnapshotState *state =
                              DO_UPCAST(ExternalSnapshotState, common, common);
@@ -1702,7 +1753,7 @@ static void external_snapshot_commit(BlkTransactionState *common)
                 NULL);
 }
 
-static void external_snapshot_abort(BlkTransactionState *common)
+static void external_snapshot_abort(BlkActionState *common)
 {
     ExternalSnapshotState *state =
                              DO_UPCAST(ExternalSnapshotState, common, common);
@@ -1711,7 +1762,7 @@ static void external_snapshot_abort(BlkTransactionState *common)
     }
 }
 
-static void external_snapshot_clean(BlkTransactionState *common)
+static void external_snapshot_clean(BlkActionState *common)
 {
     ExternalSnapshotState *state =
                              DO_UPCAST(ExternalSnapshotState, common, common);
@@ -1722,13 +1773,25 @@ static void external_snapshot_clean(BlkTransactionState *common)
 }
 
 typedef struct DriveBackupState {
-    BlkTransactionState common;
+    BlkActionState common;
     BlockDriverState *bs;
     AioContext *aio_context;
     BlockJob *job;
 } DriveBackupState;
 
-static void drive_backup_prepare(BlkTransactionState *common, Error **errp)
+static void do_drive_backup(const char *device, const char *target,
+                            bool has_format, const char *format,
+                            enum MirrorSyncMode sync,
+                            bool has_mode, enum NewImageMode mode,
+                            bool has_speed, int64_t speed,
+                            bool has_bitmap, const char *bitmap,
+                            bool has_on_source_error,
+                            BlockdevOnError on_source_error,
+                            bool has_on_target_error,
+                            BlockdevOnError on_target_error,
+                            BlockJobTxn *txn, Error **errp);
+
+static void drive_backup_prepare(BlkActionState *common, Error **errp)
 {
     DriveBackupState *state = DO_UPCAST(DriveBackupState, common, common);
     BlockBackend *blk;
@@ -1756,15 +1819,15 @@ static void drive_backup_prepare(BlkTransactionState *common, Error **errp)
     bdrv_drained_begin(blk_bs(blk));
     state->bs = blk_bs(blk);
 
-    qmp_drive_backup(backup->device, backup->target,
-                     backup->has_format, backup->format,
-                     backup->sync,
-                     backup->has_mode, backup->mode,
-                     backup->has_speed, backup->speed,
-                     backup->has_bitmap, backup->bitmap,
-                     backup->has_on_source_error, backup->on_source_error,
-                     backup->has_on_target_error, backup->on_target_error,
-                     &local_err);
+    do_drive_backup(backup->device, backup->target,
+                    backup->has_format, backup->format,
+                    backup->sync,
+                    backup->has_mode, backup->mode,
+                    backup->has_speed, backup->speed,
+                    backup->has_bitmap, backup->bitmap,
+                    backup->has_on_source_error, backup->on_source_error,
+                    backup->has_on_target_error, backup->on_target_error,
+                    common->block_job_txn, &local_err);
     if (local_err) {
         error_propagate(errp, local_err);
         return;
@@ -1773,7 +1836,7 @@ static void drive_backup_prepare(BlkTransactionState *common, Error **errp)
     state->job = state->bs->job;
 }
 
-static void drive_backup_abort(BlkTransactionState *common)
+static void drive_backup_abort(BlkActionState *common)
 {
     DriveBackupState *state = DO_UPCAST(DriveBackupState, common, common);
     BlockDriverState *bs = state->bs;
@@ -1784,7 +1847,7 @@ static void drive_backup_abort(BlkTransactionState *common)
     }
 }
 
-static void drive_backup_clean(BlkTransactionState *common)
+static void drive_backup_clean(BlkActionState *common)
 {
     DriveBackupState *state = DO_UPCAST(DriveBackupState, common, common);
 
@@ -1795,13 +1858,22 @@ static void drive_backup_clean(BlkTransactionState *common)
 }
 
 typedef struct BlockdevBackupState {
-    BlkTransactionState common;
+    BlkActionState common;
     BlockDriverState *bs;
     BlockJob *job;
     AioContext *aio_context;
 } BlockdevBackupState;
 
-static void blockdev_backup_prepare(BlkTransactionState *common, Error **errp)
+static void do_blockdev_backup(const char *device, const char *target,
+                               enum MirrorSyncMode sync,
+                               bool has_speed, int64_t speed,
+                               bool has_on_source_error,
+                               BlockdevOnError on_source_error,
+                               bool has_on_target_error,
+                               BlockdevOnError on_target_error,
+                               BlockJobTxn *txn, Error **errp);
+
+static void blockdev_backup_prepare(BlkActionState *common, Error **errp)
 {
     BlockdevBackupState *state = DO_UPCAST(BlockdevBackupState, common, common);
     BlockdevBackup *backup;
@@ -1839,12 +1911,12 @@ static void blockdev_backup_prepare(BlkTransactionState *common, Error **errp)
     state->bs = blk_bs(blk);
     bdrv_drained_begin(state->bs);
 
-    qmp_blockdev_backup(backup->device, backup->target,
-                        backup->sync,
-                        backup->has_speed, backup->speed,
-                        backup->has_on_source_error, backup->on_source_error,
-                        backup->has_on_target_error, backup->on_target_error,
-                        &local_err);
+    do_blockdev_backup(backup->device, backup->target,
+                       backup->sync,
+                       backup->has_speed, backup->speed,
+                       backup->has_on_source_error, backup->on_source_error,
+                       backup->has_on_target_error, backup->on_target_error,
+                       common->block_job_txn, &local_err);
     if (local_err) {
         error_propagate(errp, local_err);
         return;
@@ -1853,7 +1925,7 @@ static void blockdev_backup_prepare(BlkTransactionState *common, Error **errp)
     state->job = state->bs->job;
 }
 
-static void blockdev_backup_abort(BlkTransactionState *common)
+static void blockdev_backup_abort(BlkActionState *common)
 {
     BlockdevBackupState *state = DO_UPCAST(BlockdevBackupState, common, common);
     BlockDriverState *bs = state->bs;
@@ -1864,7 +1936,7 @@ static void blockdev_backup_abort(BlkTransactionState *common)
     }
 }
 
-static void blockdev_backup_clean(BlkTransactionState *common)
+static void blockdev_backup_clean(BlkActionState *common)
 {
     BlockdevBackupState *state = DO_UPCAST(BlockdevBackupState, common, common);
 
@@ -1874,17 +1946,125 @@ static void blockdev_backup_clean(BlkTransactionState *common)
     }
 }
 
-static void abort_prepare(BlkTransactionState *common, Error **errp)
+typedef struct BlockDirtyBitmapState {
+    BlkActionState common;
+    BdrvDirtyBitmap *bitmap;
+    BlockDriverState *bs;
+    AioContext *aio_context;
+    HBitmap *backup;
+    bool prepared;
+} BlockDirtyBitmapState;
+
+static void block_dirty_bitmap_add_prepare(BlkActionState *common,
+                                           Error **errp)
+{
+    Error *local_err = NULL;
+    BlockDirtyBitmapAdd *action;
+    BlockDirtyBitmapState *state = DO_UPCAST(BlockDirtyBitmapState,
+                                             common, common);
+
+    if (action_check_completion_mode(common, errp) < 0) {
+        return;
+    }
+
+    action = common->action->u.block_dirty_bitmap_add;
+    /* AIO context taken and released within qmp_block_dirty_bitmap_add */
+    qmp_block_dirty_bitmap_add(action->node, action->name,
+                               action->has_granularity, action->granularity,
+                               &local_err);
+
+    if (!local_err) {
+        state->prepared = true;
+    } else {
+        error_propagate(errp, local_err);
+    }
+}
+
+static void block_dirty_bitmap_add_abort(BlkActionState *common)
+{
+    BlockDirtyBitmapAdd *action;
+    BlockDirtyBitmapState *state = DO_UPCAST(BlockDirtyBitmapState,
+                                             common, common);
+
+    action = common->action->u.block_dirty_bitmap_add;
+    /* Should not be able to fail: IF the bitmap was added via .prepare(),
+     * then the node reference and bitmap name must have been valid.
+     */
+    if (state->prepared) {
+        qmp_block_dirty_bitmap_remove(action->node, action->name, &error_abort);
+    }
+}
+
+static void block_dirty_bitmap_clear_prepare(BlkActionState *common,
+                                             Error **errp)
+{
+    BlockDirtyBitmapState *state = DO_UPCAST(BlockDirtyBitmapState,
+                                             common, common);
+    BlockDirtyBitmap *action;
+
+    if (action_check_completion_mode(common, errp) < 0) {
+        return;
+    }
+
+    action = common->action->u.block_dirty_bitmap_clear;
+    state->bitmap = block_dirty_bitmap_lookup(action->node,
+                                              action->name,
+                                              &state->bs,
+                                              &state->aio_context,
+                                              errp);
+    if (!state->bitmap) {
+        return;
+    }
+
+    if (bdrv_dirty_bitmap_frozen(state->bitmap)) {
+        error_setg(errp, "Cannot modify a frozen bitmap");
+        return;
+    } else if (!bdrv_dirty_bitmap_enabled(state->bitmap)) {
+        error_setg(errp, "Cannot clear a disabled bitmap");
+        return;
+    }
+
+    bdrv_clear_dirty_bitmap(state->bitmap, &state->backup);
+    /* AioContext is released in .clean() */
+}
+
+static void block_dirty_bitmap_clear_abort(BlkActionState *common)
+{
+    BlockDirtyBitmapState *state = DO_UPCAST(BlockDirtyBitmapState,
+                                             common, common);
+
+    bdrv_undo_clear_dirty_bitmap(state->bitmap, state->backup);
+}
+
+static void block_dirty_bitmap_clear_commit(BlkActionState *common)
+{
+    BlockDirtyBitmapState *state = DO_UPCAST(BlockDirtyBitmapState,
+                                             common, common);
+
+    hbitmap_free(state->backup);
+}
+
+static void block_dirty_bitmap_clear_clean(BlkActionState *common)
+{
+    BlockDirtyBitmapState *state = DO_UPCAST(BlockDirtyBitmapState,
+                                             common, common);
+
+    if (state->aio_context) {
+        aio_context_release(state->aio_context);
+    }
+}
+
+static void abort_prepare(BlkActionState *common, Error **errp)
 {
     error_setg(errp, "Transaction aborted using Abort action");
 }
 
-static void abort_commit(BlkTransactionState *common)
+static void abort_commit(BlkActionState *common)
 {
     g_assert_not_reached(); /* this action never succeeds */
 }
 
-static const BdrvActionOps actions[] = {
+static const BlkActionOps actions[] = {
     [TRANSACTION_ACTION_KIND_BLOCKDEV_SNAPSHOT] = {
         .instance_size = sizeof(ExternalSnapshotState),
         .prepare  = external_snapshot_prepare,
@@ -1911,7 +2091,7 @@ static const BdrvActionOps actions[] = {
         .clean = blockdev_backup_clean,
     },
     [TRANSACTION_ACTION_KIND_ABORT] = {
-        .instance_size = sizeof(BlkTransactionState),
+        .instance_size = sizeof(BlkActionState),
         .prepare = abort_prepare,
         .commit = abort_commit,
     },
@@ -1921,28 +2101,71 @@ static const BdrvActionOps actions[] = {
         .abort = internal_snapshot_abort,
         .clean = internal_snapshot_clean,
     },
+    [TRANSACTION_ACTION_KIND_BLOCK_DIRTY_BITMAP_ADD] = {
+        .instance_size = sizeof(BlockDirtyBitmapState),
+        .prepare = block_dirty_bitmap_add_prepare,
+        .abort = block_dirty_bitmap_add_abort,
+    },
+    [TRANSACTION_ACTION_KIND_BLOCK_DIRTY_BITMAP_CLEAR] = {
+        .instance_size = sizeof(BlockDirtyBitmapState),
+        .prepare = block_dirty_bitmap_clear_prepare,
+        .commit = block_dirty_bitmap_clear_commit,
+        .abort = block_dirty_bitmap_clear_abort,
+        .clean = block_dirty_bitmap_clear_clean,
+    }
 };
 
+/**
+ * Allocate a TransactionProperties structure if necessary, and fill
+ * that structure with desired defaults if they are unset.
+ */
+static TransactionProperties *get_transaction_properties(
+    TransactionProperties *props)
+{
+    if (!props) {
+        props = g_new0(TransactionProperties, 1);
+    }
+
+    if (!props->has_completion_mode) {
+        props->has_completion_mode = true;
+        props->completion_mode = ACTION_COMPLETION_MODE_INDIVIDUAL;
+    }
+
+    return props;
+}
+
 /*
  * 'Atomic' group operations.  The operations are performed as a set, and if
  * any fail then we roll back all operations in the group.
  */
-void qmp_transaction(TransactionActionList *dev_list, Error **errp)
+void qmp_transaction(TransactionActionList *dev_list,
+                     bool has_props,
+                     struct TransactionProperties *props,
+                     Error **errp)
 {
     TransactionActionList *dev_entry = dev_list;
-    BlkTransactionState *state, *next;
+    BlockJobTxn *block_job_txn = NULL;
+    BlkActionState *state, *next;
     Error *local_err = NULL;
 
-    QSIMPLEQ_HEAD(snap_bdrv_states, BlkTransactionState) snap_bdrv_states;
+    QSIMPLEQ_HEAD(snap_bdrv_states, BlkActionState) snap_bdrv_states;
     QSIMPLEQ_INIT(&snap_bdrv_states);
 
+    /* Does this transaction get canceled as a group on failure?
+     * If not, we don't really need to make a BlockJobTxn.
+     */
+    props = get_transaction_properties(props);
+    if (props->completion_mode != ACTION_COMPLETION_MODE_INDIVIDUAL) {
+        block_job_txn = block_job_txn_new();
+    }
+
     /* drain all i/o before any operations */
     bdrv_drain_all();
 
     /* We don't do anything in this loop that commits us to the operations */
     while (NULL != dev_entry) {
         TransactionAction *dev_info = NULL;
-        const BdrvActionOps *ops;
+        const BlkActionOps *ops;
 
         dev_info = dev_entry->value;
         dev_entry = dev_entry->next;
@@ -1955,6 +2178,8 @@ void qmp_transaction(TransactionActionList *dev_list, Error **errp)
         state = g_malloc0(ops->instance_size);
         state->ops = ops;
         state->action = dev_info;
+        state->block_job_txn = block_job_txn;
+        state->txn_props = props;
         QSIMPLEQ_INSERT_TAIL(&snap_bdrv_states, state, entry);
 
         state->ops->prepare(state, &local_err);
@@ -1987,6 +2212,10 @@ exit:
         }
         g_free(state);
     }
+    if (!has_props) {
+        qapi_free_TransactionProperties(props);
+    }
+    block_job_txn_unref(block_job_txn);
 }
 
 void qmp_eject(const char *device, bool has_force, bool force, Error **errp)
@@ -2472,7 +2701,7 @@ void qmp_block_dirty_bitmap_clear(const char *node, const char *name,
         goto out;
     }
 
-    bdrv_clear_dirty_bitmap(bitmap);
+    bdrv_clear_dirty_bitmap(bitmap, NULL);
 
  out:
     aio_context_release(aio_context);
@@ -2615,8 +2844,6 @@ static void block_job_cb(void *opaque, int ret)
     } else {
         block_job_event_completed(bs->job, msg);
     }
-
-    bdrv_put_ref_bh_schedule(bs);
 }
 
 void qmp_block_stream(const char *device,
@@ -2797,15 +3024,17 @@ out:
     aio_context_release(aio_context);
 }
 
-void qmp_drive_backup(const char *device, const char *target,
-                      bool has_format, const char *format,
-                      enum MirrorSyncMode sync,
-                      bool has_mode, enum NewImageMode mode,
-                      bool has_speed, int64_t speed,
-                      bool has_bitmap, const char *bitmap,
-                      bool has_on_source_error, BlockdevOnError on_source_error,
-                      bool has_on_target_error, BlockdevOnError on_target_error,
-                      Error **errp)
+static void do_drive_backup(const char *device, const char *target,
+                            bool has_format, const char *format,
+                            enum MirrorSyncMode sync,
+                            bool has_mode, enum NewImageMode mode,
+                            bool has_speed, int64_t speed,
+                            bool has_bitmap, const char *bitmap,
+                            bool has_on_source_error,
+                            BlockdevOnError on_source_error,
+                            bool has_on_target_error,
+                            BlockdevOnError on_target_error,
+                            BlockJobTxn *txn, Error **errp)
 {
     BlockBackend *blk;
     BlockDriverState *bs;
@@ -2920,7 +3149,7 @@ void qmp_drive_backup(const char *device, const char *target,
 
     backup_start(bs, target_bs, speed, sync, bmap,
                  on_source_error, on_target_error,
-                 block_job_cb, bs, &local_err);
+                 block_job_cb, bs, txn, &local_err);
     if (local_err != NULL) {
         bdrv_unref(target_bs);
         error_propagate(errp, local_err);
@@ -2931,19 +3160,37 @@ out:
     aio_context_release(aio_context);
 }
 
+void qmp_drive_backup(const char *device, const char *target,
+                      bool has_format, const char *format,
+                      enum MirrorSyncMode sync,
+                      bool has_mode, enum NewImageMode mode,
+                      bool has_speed, int64_t speed,
+                      bool has_bitmap, const char *bitmap,
+                      bool has_on_source_error, BlockdevOnError on_source_error,
+                      bool has_on_target_error, BlockdevOnError on_target_error,
+                      Error **errp)
+{
+    return do_drive_backup(device, target, has_format, format, sync,
+                           has_mode, mode, has_speed, speed,
+                           has_bitmap, bitmap,
+                           has_on_source_error, on_source_error,
+                           has_on_target_error, on_target_error,
+                           NULL, errp);
+}
+
 BlockDeviceInfoList *qmp_query_named_block_nodes(Error **errp)
 {
     return bdrv_named_nodes_list(errp);
 }
 
-void qmp_blockdev_backup(const char *device, const char *target,
+void do_blockdev_backup(const char *device, const char *target,
                          enum MirrorSyncMode sync,
                          bool has_speed, int64_t speed,
                          bool has_on_source_error,
                          BlockdevOnError on_source_error,
                          bool has_on_target_error,
                          BlockdevOnError on_target_error,
-                         Error **errp)
+                         BlockJobTxn *txn, Error **errp)
 {
     BlockBackend *blk, *target_blk;
     BlockDriverState *bs;
@@ -2991,7 +3238,7 @@ void qmp_blockdev_backup(const char *device, const char *target,
     bdrv_ref(target_bs);
     bdrv_set_aio_context(target_bs, aio_context);
     backup_start(bs, target_bs, speed, sync, NULL, on_source_error,
-                 on_target_error, block_job_cb, bs, &local_err);
+                 on_target_error, block_job_cb, bs, txn, &local_err);
     if (local_err != NULL) {
         bdrv_unref(target_bs);
         error_propagate(errp, local_err);
@@ -3000,6 +3247,21 @@ out:
     aio_context_release(aio_context);
 }
 
+void qmp_blockdev_backup(const char *device, const char *target,
+                         enum MirrorSyncMode sync,
+                         bool has_speed, int64_t speed,
+                         bool has_on_source_error,
+                         BlockdevOnError on_source_error,
+                         bool has_on_target_error,
+                         BlockdevOnError on_target_error,
+                         Error **errp)
+{
+    do_blockdev_backup(device, target, sync, has_speed, speed,
+                       has_on_source_error, on_source_error,
+                       has_on_target_error, on_target_error,
+                       NULL, errp);
+}
+
 void qmp_drive_mirror(const char *device, const char *target,
                       bool has_format, const char *format,
                       bool has_node_name, const char *node_name,
@@ -3676,6 +3938,21 @@ QemuOptsList qemu_common_drive_opts = {
             .name = "detect-zeroes",
             .type = QEMU_OPT_STRING,
             .help = "try to optimize zero writes (off, on, unmap)",
+        },{
+            .name = "stats-account-invalid",
+            .type = QEMU_OPT_BOOL,
+            .help = "whether to account for invalid I/O operations "
+                    "in the statistics",
+        },{
+            .name = "stats-account-failed",
+            .type = QEMU_OPT_BOOL,
+            .help = "whether to account for failed I/O operations "
+                    "in the statistics",
+        },{
+            .name = "stats-intervals",
+            .type = QEMU_OPT_STRING,
+            .help = "colon-separated list of intervals "
+                    "for collecting I/O statistics, in seconds",
         },
         { /* end of list */ }
     },
diff --git a/blockjob.c b/blockjob.c
index c02fe598b8..80adb9d52a 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -37,6 +37,19 @@
 #include "qemu/timer.h"
 #include "qapi-event.h"
 
+/* Transactional group of block jobs */
+struct BlockJobTxn {
+
+    /* Is this txn being cancelled? */
+    bool aborting;
+
+    /* List of jobs */
+    QLIST_HEAD(, BlockJob) jobs;
+
+    /* Reference count */
+    int refcnt;
+};
+
 void *block_job_create(const BlockJobDriver *driver, BlockDriverState *bs,
                        int64_t speed, BlockCompletionFunc *cb,
                        void *opaque, Error **errp)
@@ -60,6 +73,7 @@ void *block_job_create(const BlockJobDriver *driver, BlockDriverState *bs,
     job->cb            = cb;
     job->opaque        = opaque;
     job->busy          = true;
+    job->refcnt        = 1;
     bs->job = job;
 
     /* Only set speed when necessary to avoid NotSupported error */
@@ -68,7 +82,7 @@ void *block_job_create(const BlockJobDriver *driver, BlockDriverState *bs,
 
         block_job_set_speed(job, speed, &local_err);
         if (local_err) {
-            block_job_release(bs);
+            block_job_unref(job);
             error_propagate(errp, local_err);
             return NULL;
         }
@@ -76,15 +90,101 @@ void *block_job_create(const BlockJobDriver *driver, BlockDriverState *bs,
     return job;
 }
 
-void block_job_release(BlockDriverState *bs)
+void block_job_ref(BlockJob *job)
 {
-    BlockJob *job = bs->job;
+    ++job->refcnt;
+}
 
-    bs->job = NULL;
-    bdrv_op_unblock_all(bs, job->blocker);
-    error_free(job->blocker);
-    g_free(job->id);
-    g_free(job);
+void block_job_unref(BlockJob *job)
+{
+    if (--job->refcnt == 0) {
+        job->bs->job = NULL;
+        bdrv_op_unblock_all(job->bs, job->blocker);
+        bdrv_unref(job->bs);
+        error_free(job->blocker);
+        g_free(job->id);
+        g_free(job);
+    }
+}
+
+static void block_job_completed_single(BlockJob *job)
+{
+    if (!job->ret) {
+        if (job->driver->commit) {
+            job->driver->commit(job);
+        }
+    } else {
+        if (job->driver->abort) {
+            job->driver->abort(job);
+        }
+    }
+    job->cb(job->opaque, job->ret);
+    if (job->txn) {
+        block_job_txn_unref(job->txn);
+    }
+    block_job_unref(job);
+}
+
+static void block_job_completed_txn_abort(BlockJob *job)
+{
+    AioContext *ctx;
+    BlockJobTxn *txn = job->txn;
+    BlockJob *other_job, *next;
+
+    if (txn->aborting) {
+        /*
+         * We are cancelled by another job, which will handle everything.
+         */
+        return;
+    }
+    txn->aborting = true;
+    /* We are the first failed job. Cancel other jobs. */
+    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+        ctx = bdrv_get_aio_context(other_job->bs);
+        aio_context_acquire(ctx);
+    }
+    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+        if (other_job == job || other_job->completed) {
+            /* Other jobs are "effectively" cancelled by us, set the status for
+             * them; this job, however, may or may not be cancelled, depending
+             * on the caller, so leave it. */
+            if (other_job != job) {
+                other_job->cancelled = true;
+            }
+            continue;
+        }
+        block_job_cancel_sync(other_job);
+        assert(other_job->completed);
+    }
+    QLIST_FOREACH_SAFE(other_job, &txn->jobs, txn_list, next) {
+        ctx = bdrv_get_aio_context(other_job->bs);
+        block_job_completed_single(other_job);
+        aio_context_release(ctx);
+    }
+}
+
+static void block_job_completed_txn_success(BlockJob *job)
+{
+    AioContext *ctx;
+    BlockJobTxn *txn = job->txn;
+    BlockJob *other_job, *next;
+    /*
+     * Successful completion, see if there are other running jobs in this
+     * txn.
+     */
+    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+        if (!other_job->completed) {
+            return;
+        }
+    }
+    /* We are the last completed job, commit the transaction. */
+    QLIST_FOREACH_SAFE(other_job, &txn->jobs, txn_list, next) {
+        ctx = bdrv_get_aio_context(other_job->bs);
+        aio_context_acquire(ctx);
+        assert(other_job->ret == 0);
+        block_job_completed_single(other_job);
+        aio_context_release(ctx);
+    }
 }
 
 void block_job_completed(BlockJob *job, int ret)
@@ -92,8 +192,16 @@ void block_job_completed(BlockJob *job, int ret)
     BlockDriverState *bs = job->bs;
 
     assert(bs->job == job);
-    job->cb(job->opaque, ret);
-    block_job_release(bs);
+    assert(!job->completed);
+    job->completed = true;
+    job->ret = ret;
+    if (!job->txn) {
+        block_job_completed_single(job);
+    } else if (ret < 0 || block_job_is_cancelled(job)) {
+        block_job_completed_txn_abort(job);
+    } else {
+        block_job_completed_txn_success(job);
+    }
 }
 
 void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
@@ -178,43 +286,29 @@ struct BlockFinishData {
     int ret;
 };
 
-static void block_job_finish_cb(void *opaque, int ret)
-{
-    struct BlockFinishData *data = opaque;
-
-    data->cancelled = block_job_is_cancelled(data->job);
-    data->ret = ret;
-    data->cb(data->opaque, ret);
-}
-
 static int block_job_finish_sync(BlockJob *job,
                                  void (*finish)(BlockJob *, Error **errp),
                                  Error **errp)
 {
-    struct BlockFinishData data;
     BlockDriverState *bs = job->bs;
     Error *local_err = NULL;
+    int ret;
 
     assert(bs->job == job);
 
-    /* Set up our own callback to store the result and chain to
-     * the original callback.
-     */
-    data.job = job;
-    data.cb = job->cb;
-    data.opaque = job->opaque;
-    data.ret = -EINPROGRESS;
-    job->cb = block_job_finish_cb;
-    job->opaque = &data;
+    block_job_ref(job);
     finish(job, &local_err);
     if (local_err) {
         error_propagate(errp, local_err);
+        block_job_unref(job);
         return -EBUSY;
     }
-    while (data.ret == -EINPROGRESS) {
+    while (!job->completed) {
         aio_poll(bdrv_get_aio_context(bs), true);
     }
-    return (data.cancelled && data.ret == 0) ? -ECANCELED : data.ret;
+    ret = (job->cancelled && job->ret == 0) ? -ECANCELED : job->ret;
+    block_job_unref(job);
+    return ret;
 }
 
 /* A wrapper around block_job_cancel() taking an Error ** parameter so it may be
@@ -406,3 +500,36 @@ void block_job_defer_to_main_loop(BlockJob *job,
 
     qemu_bh_schedule(data->bh);
 }
+
+BlockJobTxn *block_job_txn_new(void)
+{
+    BlockJobTxn *txn = g_new0(BlockJobTxn, 1);
+    QLIST_INIT(&txn->jobs);
+    txn->refcnt = 1;
+    return txn;
+}
+
+static void block_job_txn_ref(BlockJobTxn *txn)
+{
+    txn->refcnt++;
+}
+
+void block_job_txn_unref(BlockJobTxn *txn)
+{
+    if (txn && --txn->refcnt == 0) {
+        g_free(txn);
+    }
+}
+
+void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job)
+{
+    if (!txn) {
+        return;
+    }
+
+    assert(!job->txn);
+    job->txn = txn;
+
+    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
+    block_job_txn_ref(txn);
+}
diff --git a/docs/bitmaps.md b/docs/bitmaps.md
index fa87f077fe..9fd8ea65ea 100644
--- a/docs/bitmaps.md
+++ b/docs/bitmaps.md
@@ -97,11 +97,7 @@ which is included at the end of this document.
 }
 ```
 
-## Transactions (Not yet implemented)
-
-* Transactional commands are forthcoming in a future version,
-  and are not yet available for use. This section serves as
-  documentation of intent for their design and usage.
+## Transactions
 
 ### Justification
 
diff --git a/hmp.c b/hmp.c
index d3812831ce..21406059c7 100644
--- a/hmp.c
+++ b/hmp.c
@@ -522,6 +522,7 @@ void hmp_info_blockstats(Monitor *mon, const QDict *qdict)
                        " flush_total_time_ns=%" PRId64
                        " rd_merged=%" PRId64
                        " wr_merged=%" PRId64
+                       " idle_time_ns=%" PRId64
                        "\n",
                        stats->value->stats->rd_bytes,
                        stats->value->stats->wr_bytes,
@@ -532,7 +533,8 @@ void hmp_info_blockstats(Monitor *mon, const QDict *qdict)
                        stats->value->stats->rd_total_time_ns,
                        stats->value->stats->flush_total_time_ns,
                        stats->value->stats->rd_merged,
-                       stats->value->stats->wr_merged);
+                       stats->value->stats->wr_merged,
+                       stats->value->stats->idle_time_ns);
     }
 
     qapi_free_BlockStatsList(stats_list);
diff --git a/hw/block/nvme.c b/hw/block/nvme.c
index 5da41b23cf..169e4fa7a5 100644
--- a/hw/block/nvme.c
+++ b/hw/block/nvme.c
@@ -201,10 +201,11 @@ static void nvme_rw_cb(void *opaque, int ret)
     NvmeCtrl *n = sq->ctrl;
     NvmeCQueue *cq = n->cq[sq->cqid];
 
-    block_acct_done(blk_get_stats(n->conf.blk), &req->acct);
     if (!ret) {
+        block_acct_done(blk_get_stats(n->conf.blk), &req->acct);
         req->status = NVME_SUCCESS;
     } else {
+        block_acct_failed(blk_get_stats(n->conf.blk), &req->acct);
         req->status = NVME_INTERNAL_DEV_ERROR;
     }
     if (req->has_sg) {
@@ -238,18 +239,22 @@ static uint16_t nvme_rw(NvmeCtrl *n, NvmeNamespace *ns, NvmeCmd *cmd,
     uint64_t data_size = (uint64_t)nlb << data_shift;
     uint64_t aio_slba  = slba << (data_shift - BDRV_SECTOR_BITS);
     int is_write = rw->opcode == NVME_CMD_WRITE ? 1 : 0;
+    enum BlockAcctType acct = is_write ? BLOCK_ACCT_WRITE : BLOCK_ACCT_READ;
 
     if ((slba + nlb) > ns->id_ns.nsze) {
+        block_acct_invalid(blk_get_stats(n->conf.blk), acct);
         return NVME_LBA_RANGE | NVME_DNR;
     }
+
     if (nvme_map_prp(&req->qsg, prp1, prp2, data_size, n)) {
+        block_acct_invalid(blk_get_stats(n->conf.blk), acct);
         return NVME_INVALID_FIELD | NVME_DNR;
     }
+
     assert((nlb << data_shift) == req->qsg.size);
 
     req->has_sg = true;
-    dma_acct_start(n->conf.blk, &req->acct, &req->qsg,
-                   is_write ? BLOCK_ACCT_WRITE : BLOCK_ACCT_READ);
+    dma_acct_start(n->conf.blk, &req->acct, &req->qsg, acct);
     req->aiocb = is_write ?
         dma_blk_write(n->conf.blk, &req->qsg, aio_slba, nvme_rw_cb, req) :
         dma_blk_read(n->conf.blk, &req->qsg, aio_slba, nvme_rw_cb, req);
diff --git a/hw/block/virtio-blk.c b/hw/block/virtio-blk.c
index 093e475dc9..e70fccf80c 100644
--- a/hw/block/virtio-blk.c
+++ b/hw/block/virtio-blk.c
@@ -76,7 +76,7 @@ static int virtio_blk_handle_rw_error(VirtIOBlockReq *req, int error,
         s->rq = req;
     } else if (action == BLOCK_ERROR_ACTION_REPORT) {
         virtio_blk_req_complete(req, VIRTIO_BLK_S_IOERR);
-        block_acct_done(blk_get_stats(s->blk), &req->acct);
+        block_acct_failed(blk_get_stats(s->blk), &req->acct);
         virtio_blk_free_request(req);
     }
 
@@ -536,6 +536,8 @@ void virtio_blk_handle_request(VirtIOBlockReq *req, MultiReqBuffer *mrb)
         if (!virtio_blk_sect_range_ok(req->dev, req->sector_num,
                                       req->qiov.size)) {
             virtio_blk_req_complete(req, VIRTIO_BLK_S_IOERR);
+            block_acct_invalid(blk_get_stats(req->dev->blk),
+                               is_write ? BLOCK_ACCT_WRITE : BLOCK_ACCT_READ);
             virtio_blk_free_request(req);
             return;
         }
diff --git a/hw/block/xen_disk.c b/hw/block/xen_disk.c
index 1bbc111939..02eda6efbc 100644
--- a/hw/block/xen_disk.c
+++ b/hw/block/xen_disk.c
@@ -537,7 +537,11 @@ static void qemu_aio_complete(void *opaque, int ret)
             break;
         }
     case BLKIF_OP_READ:
-        block_acct_done(blk_get_stats(ioreq->blkdev->blk), &ioreq->acct);
+        if (ioreq->status == BLKIF_RSP_OKAY) {
+            block_acct_done(blk_get_stats(ioreq->blkdev->blk), &ioreq->acct);
+        } else {
+            block_acct_failed(blk_get_stats(ioreq->blkdev->blk), &ioreq->acct);
+        }
         break;
     case BLKIF_OP_DISCARD:
     default:
@@ -576,7 +580,9 @@ static int ioreq_runio_qemu_aio(struct ioreq *ioreq)
         }
 
         block_acct_start(blk_get_stats(blkdev->blk), &ioreq->acct,
-                         ioreq->v.size, BLOCK_ACCT_WRITE);
+                         ioreq->v.size,
+                         ioreq->req.operation == BLKIF_OP_WRITE ?
+                         BLOCK_ACCT_WRITE : BLOCK_ACCT_FLUSH);
         ioreq->aio_inflight++;
         blk_aio_writev(blkdev->blk, ioreq->start / BLOCK_SIZE,
                        &ioreq->v, ioreq->v.size / BLOCK_SIZE,
@@ -720,6 +726,23 @@ static void blk_handle_requests(struct XenBlkDev *blkdev)
 
         /* parse them */
         if (ioreq_parse(ioreq) != 0) {
+
+            switch (ioreq->req.operation) {
+            case BLKIF_OP_READ:
+                block_acct_invalid(blk_get_stats(blkdev->blk),
+                                   BLOCK_ACCT_READ);
+                break;
+            case BLKIF_OP_WRITE:
+                block_acct_invalid(blk_get_stats(blkdev->blk),
+                                   BLOCK_ACCT_WRITE);
+                break;
+            case BLKIF_OP_FLUSH_DISKCACHE:
+                block_acct_invalid(blk_get_stats(blkdev->blk),
+                                   BLOCK_ACCT_FLUSH);
+            default:
+                break;
+            };
+
             if (blk_send_response_one(ioreq)) {
                 xen_be_send_notify(&blkdev->xendev);
             }
diff --git a/hw/ide/atapi.c b/hw/ide/atapi.c
index 747f46611e..cf0b78e3a4 100644
--- a/hw/ide/atapi.c
+++ b/hw/ide/atapi.c
@@ -108,27 +108,30 @@ static void cd_data_to_raw(uint8_t *buf, int lba)
 static int cd_read_sector(IDEState *s, int lba, uint8_t *buf, int sector_size)
 {
     int ret;
+    block_acct_start(blk_get_stats(s->blk), &s->acct,
+                     4 * BDRV_SECTOR_SIZE, BLOCK_ACCT_READ);
 
     switch(sector_size) {
     case 2048:
-        block_acct_start(blk_get_stats(s->blk), &s->acct,
-                         4 * BDRV_SECTOR_SIZE, BLOCK_ACCT_READ);
         ret = blk_read(s->blk, (int64_t)lba << 2, buf, 4);
-        block_acct_done(blk_get_stats(s->blk), &s->acct);
         break;
     case 2352:
-        block_acct_start(blk_get_stats(s->blk), &s->acct,
-                         4 * BDRV_SECTOR_SIZE, BLOCK_ACCT_READ);
         ret = blk_read(s->blk, (int64_t)lba << 2, buf + 16, 4);
-        block_acct_done(blk_get_stats(s->blk), &s->acct);
-        if (ret < 0)
-            return ret;
-        cd_data_to_raw(buf, lba);
+        if (ret >= 0) {
+            cd_data_to_raw(buf, lba);
+        }
         break;
     default:
-        ret = -EIO;
-        break;
+        block_acct_invalid(blk_get_stats(s->blk), BLOCK_ACCT_READ);
+        return -EIO;
+    }
+
+    if (ret < 0) {
+        block_acct_failed(blk_get_stats(s->blk), &s->acct);
+    } else {
+        block_acct_done(blk_get_stats(s->blk), &s->acct);
     }
+
     return ret;
 }
 
@@ -357,7 +360,11 @@ static void ide_atapi_cmd_read_dma_cb(void *opaque, int ret)
     return;
 
 eot:
-    block_acct_done(blk_get_stats(s->blk), &s->acct);
+    if (ret < 0) {
+        block_acct_failed(blk_get_stats(s->blk), &s->acct);
+    } else {
+        block_acct_done(blk_get_stats(s->blk), &s->acct);
+    }
     ide_set_inactive(s, false);
 }
 
diff --git a/hw/ide/core.c b/hw/ide/core.c
index 364ba21e0c..2725dd3b81 100644
--- a/hw/ide/core.c
+++ b/hw/ide/core.c
@@ -574,7 +574,6 @@ static void ide_sector_read_cb(void *opaque, int ret)
     if (ret == -ECANCELED) {
         return;
     }
-    block_acct_done(blk_get_stats(s->blk), &s->acct);
     if (ret != 0) {
         if (ide_handle_rw_error(s, -ret, IDE_RETRY_PIO |
                                 IDE_RETRY_READ)) {
@@ -582,6 +581,8 @@ static void ide_sector_read_cb(void *opaque, int ret)
         }
     }
 
+    block_acct_done(blk_get_stats(s->blk), &s->acct);
+
     n = s->nsector;
     if (n > s->req_nb_sectors) {
         n = s->req_nb_sectors;
@@ -621,6 +622,7 @@ static void ide_sector_read(IDEState *s)
 
     if (!ide_sect_range_ok(s, sector_num, n)) {
         ide_rw_error(s);
+        block_acct_invalid(blk_get_stats(s->blk), BLOCK_ACCT_READ);
         return;
     }
 
@@ -672,6 +674,7 @@ static int ide_handle_rw_error(IDEState *s, int error, int op)
         assert(s->bus->retry_unit == s->unit);
         s->bus->error_status = op;
     } else if (action == BLOCK_ERROR_ACTION_REPORT) {
+        block_acct_failed(blk_get_stats(s->blk), &s->acct);
         if (op & IDE_RETRY_DMA) {
             ide_dma_error(s);
         } else {
@@ -750,6 +753,7 @@ static void ide_dma_cb(void *opaque, int ret)
     if ((s->dma_cmd == IDE_DMA_READ || s->dma_cmd == IDE_DMA_WRITE) &&
         !ide_sect_range_ok(s, sector_num, n)) {
         ide_dma_error(s);
+        block_acct_invalid(blk_get_stats(s->blk), s->acct.type);
         return;
     }
 
@@ -826,7 +830,6 @@ static void ide_sector_write_cb(void *opaque, int ret)
     if (ret == -ECANCELED) {
         return;
     }
-    block_acct_done(blk_get_stats(s->blk), &s->acct);
 
     s->pio_aiocb = NULL;
     s->status &= ~BUSY_STAT;
@@ -837,6 +840,8 @@ static void ide_sector_write_cb(void *opaque, int ret)
         }
     }
 
+    block_acct_done(blk_get_stats(s->blk), &s->acct);
+
     n = s->nsector;
     if (n > s->req_nb_sectors) {
         n = s->req_nb_sectors;
@@ -887,6 +892,7 @@ static void ide_sector_write(IDEState *s)
 
     if (!ide_sect_range_ok(s, sector_num, n)) {
         ide_rw_error(s);
+        block_acct_invalid(blk_get_stats(s->blk), BLOCK_ACCT_WRITE);
         return;
     }
 
@@ -895,7 +901,7 @@ static void ide_sector_write(IDEState *s)
     qemu_iovec_init_external(&s->qiov, &s->iov, 1);
 
     block_acct_start(blk_get_stats(s->blk), &s->acct,
-                     n * BDRV_SECTOR_SIZE, BLOCK_ACCT_READ);
+                     n * BDRV_SECTOR_SIZE, BLOCK_ACCT_WRITE);
     s->pio_aiocb = blk_aio_writev(s->blk, sector_num, &s->qiov, n,
                                   ide_sector_write_cb, s);
 }
diff --git a/hw/ide/macio.c b/hw/ide/macio.c
index 893c9b9bae..3ee962f830 100644
--- a/hw/ide/macio.c
+++ b/hw/ide/macio.c
@@ -286,7 +286,11 @@ static void pmac_ide_atapi_transfer_cb(void *opaque, int ret)
     return;
 
 done:
-    block_acct_done(blk_get_stats(s->blk), &s->acct);
+    if (ret < 0) {
+        block_acct_failed(blk_get_stats(s->blk), &s->acct);
+    } else {
+        block_acct_done(blk_get_stats(s->blk), &s->acct);
+    }
     io->dma_end(opaque);
 
     return;
@@ -348,7 +352,11 @@ static void pmac_ide_transfer_cb(void *opaque, int ret)
 
 done:
     if (s->dma_cmd == IDE_DMA_READ || s->dma_cmd == IDE_DMA_WRITE) {
-        block_acct_done(blk_get_stats(s->blk), &s->acct);
+        if (ret < 0) {
+            block_acct_failed(blk_get_stats(s->blk), &s->acct);
+        } else {
+            block_acct_done(blk_get_stats(s->blk), &s->acct);
+        }
     }
     io->dma_end(opaque);
 }
diff --git a/hw/scsi/scsi-disk.c b/hw/scsi/scsi-disk.c
index 707e7349a2..4797d83683 100644
--- a/hw/scsi/scsi-disk.c
+++ b/hw/scsi/scsi-disk.c
@@ -90,7 +90,7 @@ struct SCSIDiskState
     bool tray_locked;
 };
 
-static int scsi_handle_rw_error(SCSIDiskReq *r, int error);
+static int scsi_handle_rw_error(SCSIDiskReq *r, int error, bool acct_failed);
 
 static void scsi_free_request(SCSIRequest *req)
 {
@@ -169,18 +169,18 @@ static void scsi_aio_complete(void *opaque, int ret)
 
     assert(r->req.aiocb != NULL);
     r->req.aiocb = NULL;
-    block_acct_done(blk_get_stats(s->qdev.conf.blk), &r->acct);
     if (r->req.io_canceled) {
         scsi_req_cancel_complete(&r->req);
         goto done;
     }
 
     if (ret < 0) {
-        if (scsi_handle_rw_error(r, -ret)) {
+        if (scsi_handle_rw_error(r, -ret, true)) {
             goto done;
         }
     }
 
+    block_acct_done(blk_get_stats(s->qdev.conf.blk), &r->acct);
     scsi_req_complete(&r->req, GOOD);
 
 done:
@@ -247,7 +247,7 @@ static void scsi_dma_complete_noio(SCSIDiskReq *r, int ret)
     }
 
     if (ret < 0) {
-        if (scsi_handle_rw_error(r, -ret)) {
+        if (scsi_handle_rw_error(r, -ret, false)) {
             goto done;
         }
     }
@@ -273,7 +273,11 @@ static void scsi_dma_complete(void *opaque, int ret)
     assert(r->req.aiocb != NULL);
     r->req.aiocb = NULL;
 
-    block_acct_done(blk_get_stats(s->qdev.conf.blk), &r->acct);
+    if (ret < 0) {
+        block_acct_failed(blk_get_stats(s->qdev.conf.blk), &r->acct);
+    } else {
+        block_acct_done(blk_get_stats(s->qdev.conf.blk), &r->acct);
+    }
     scsi_dma_complete_noio(r, ret);
 }
 
@@ -285,18 +289,18 @@ static void scsi_read_complete(void * opaque, int ret)
 
     assert(r->req.aiocb != NULL);
     r->req.aiocb = NULL;
-    block_acct_done(blk_get_stats(s->qdev.conf.blk), &r->acct);
     if (r->req.io_canceled) {
         scsi_req_cancel_complete(&r->req);
         goto done;
     }
 
     if (ret < 0) {
-        if (scsi_handle_rw_error(r, -ret)) {
+        if (scsi_handle_rw_error(r, -ret, true)) {
             goto done;
         }
     }
 
+    block_acct_done(blk_get_stats(s->qdev.conf.blk), &r->acct);
     DPRINTF("Data ready tag=0x%x len=%zd\n", r->req.tag, r->qiov.size);
 
     n = r->qiov.size / 512;
@@ -322,7 +326,7 @@ static void scsi_do_read(SCSIDiskReq *r, int ret)
     }
 
     if (ret < 0) {
-        if (scsi_handle_rw_error(r, -ret)) {
+        if (scsi_handle_rw_error(r, -ret, false)) {
             goto done;
         }
     }
@@ -355,7 +359,11 @@ static void scsi_do_read_cb(void *opaque, int ret)
     assert (r->req.aiocb != NULL);
     r->req.aiocb = NULL;
 
-    block_acct_done(blk_get_stats(s->qdev.conf.blk), &r->acct);
+    if (ret < 0) {
+        block_acct_failed(blk_get_stats(s->qdev.conf.blk), &r->acct);
+    } else {
+        block_acct_done(blk_get_stats(s->qdev.conf.blk), &r->acct);
+    }
     scsi_do_read(opaque, ret);
 }
 
@@ -407,7 +415,7 @@ static void scsi_read_data(SCSIRequest *req)
  * scsi_handle_rw_error always manages its reference counts, independent
  * of the return value.
  */
-static int scsi_handle_rw_error(SCSIDiskReq *r, int error)
+static int scsi_handle_rw_error(SCSIDiskReq *r, int error, bool acct_failed)
 {
     bool is_read = (r->req.cmd.mode == SCSI_XFER_FROM_DEV);
     SCSIDiskState *s = DO_UPCAST(SCSIDiskState, qdev, r->req.dev);
@@ -415,6 +423,9 @@ static int scsi_handle_rw_error(SCSIDiskReq *r, int error)
                                                    is_read, error);
 
     if (action == BLOCK_ERROR_ACTION_REPORT) {
+        if (acct_failed) {
+            block_acct_failed(blk_get_stats(s->qdev.conf.blk), &r->acct);
+        }
         switch (error) {
         case ENOMEDIUM:
             scsi_check_condition(r, SENSE_CODE(NO_MEDIUM));
@@ -452,7 +463,7 @@ static void scsi_write_complete_noio(SCSIDiskReq *r, int ret)
     }
 
     if (ret < 0) {
-        if (scsi_handle_rw_error(r, -ret)) {
+        if (scsi_handle_rw_error(r, -ret, false)) {
             goto done;
         }
     }
@@ -481,7 +492,11 @@ static void scsi_write_complete(void * opaque, int ret)
     assert (r->req.aiocb != NULL);
     r->req.aiocb = NULL;
 
-    block_acct_done(blk_get_stats(s->qdev.conf.blk), &r->acct);
+    if (ret < 0) {
+        block_acct_failed(blk_get_stats(s->qdev.conf.blk), &r->acct);
+    } else {
+        block_acct_done(blk_get_stats(s->qdev.conf.blk), &r->acct);
+    }
     scsi_write_complete_noio(r, ret);
 }
 
@@ -1592,7 +1607,7 @@ static void scsi_unmap_complete_noio(UnmapCBData *data, int ret)
     }
 
     if (ret < 0) {
-        if (scsi_handle_rw_error(r, -ret)) {
+        if (scsi_handle_rw_error(r, -ret, false)) {
             goto done;
         }
     }
@@ -1696,18 +1711,19 @@ static void scsi_write_same_complete(void *opaque, int ret)
 
     assert(r->req.aiocb != NULL);
     r->req.aiocb = NULL;
-    block_acct_done(blk_get_stats(s->qdev.conf.blk), &r->acct);
     if (r->req.io_canceled) {
         scsi_req_cancel_complete(&r->req);
         goto done;
     }
 
     if (ret < 0) {
-        if (scsi_handle_rw_error(r, -ret)) {
+        if (scsi_handle_rw_error(r, -ret, true)) {
             goto done;
         }
     }
 
+    block_acct_done(blk_get_stats(s->qdev.conf.blk), &r->acct);
+
     data->nb_sectors -= data->iov.iov_len / 512;
     data->sector += data->iov.iov_len / 512;
     data->iov.iov_len = MIN(data->nb_sectors * 512, data->iov.iov_len);
diff --git a/include/block/accounting.h b/include/block/accounting.h
index 66637cdfed..0f46cb4ec1 100644
--- a/include/block/accounting.h
+++ b/include/block/accounting.h
@@ -2,6 +2,7 @@
  * QEMU System Emulator block accounting
  *
  * Copyright (c) 2011 Christoph Hellwig
+ * Copyright (c) 2015 Igalia, S.L.
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
  * of this software and associated documentation files (the "Software"), to deal
@@ -25,8 +26,12 @@
 #define BLOCK_ACCOUNTING_H
 
 #include <stdint.h>
+#include <stdbool.h>
 
 #include "qemu/typedefs.h"
+#include "qemu/timed-average.h"
+
+typedef struct BlockAcctTimedStats BlockAcctTimedStats;
 
 enum BlockAcctType {
     BLOCK_ACCT_READ,
@@ -35,11 +40,23 @@ enum BlockAcctType {
     BLOCK_MAX_IOTYPE,
 };
 
+struct BlockAcctTimedStats {
+    TimedAverage latency[BLOCK_MAX_IOTYPE];
+    unsigned interval_length; /* in seconds */
+    QSLIST_ENTRY(BlockAcctTimedStats) entries;
+};
+
 typedef struct BlockAcctStats {
     uint64_t nr_bytes[BLOCK_MAX_IOTYPE];
     uint64_t nr_ops[BLOCK_MAX_IOTYPE];
+    uint64_t invalid_ops[BLOCK_MAX_IOTYPE];
+    uint64_t failed_ops[BLOCK_MAX_IOTYPE];
     uint64_t total_time_ns[BLOCK_MAX_IOTYPE];
     uint64_t merged[BLOCK_MAX_IOTYPE];
+    int64_t last_access_time_ns;
+    QSLIST_HEAD(, BlockAcctTimedStats) intervals;
+    bool account_invalid;
+    bool account_failed;
 } BlockAcctStats;
 
 typedef struct BlockAcctCookie {
@@ -48,10 +65,21 @@ typedef struct BlockAcctCookie {
     enum BlockAcctType type;
 } BlockAcctCookie;
 
+void block_acct_init(BlockAcctStats *stats, bool account_invalid,
+                     bool account_failed);
+void block_acct_cleanup(BlockAcctStats *stats);
+void block_acct_add_interval(BlockAcctStats *stats, unsigned interval_length);
+BlockAcctTimedStats *block_acct_interval_next(BlockAcctStats *stats,
+                                              BlockAcctTimedStats *s);
 void block_acct_start(BlockAcctStats *stats, BlockAcctCookie *cookie,
                       int64_t bytes, enum BlockAcctType type);
 void block_acct_done(BlockAcctStats *stats, BlockAcctCookie *cookie);
+void block_acct_failed(BlockAcctStats *stats, BlockAcctCookie *cookie);
+void block_acct_invalid(BlockAcctStats *stats, enum BlockAcctType type);
 void block_acct_merge_done(BlockAcctStats *stats, enum BlockAcctType type,
                            int num_requests);
+int64_t block_acct_idle_time_ns(BlockAcctStats *stats);
+double block_acct_queue_depth(BlockAcctTimedStats *stats,
+                              enum BlockAcctType type);
 
 #endif
diff --git a/include/block/block.h b/include/block/block.h
index 610db923d5..73edb1a79c 100644
--- a/include/block/block.h
+++ b/include/block/block.h
@@ -14,6 +14,7 @@ typedef struct BlockDriver BlockDriver;
 typedef struct BlockJob BlockJob;
 typedef struct BdrvChild BdrvChild;
 typedef struct BdrvChildRole BdrvChildRole;
+typedef struct BlockJobTxn BlockJobTxn;
 
 typedef struct BlockDriverInfo {
     /* in bytes, 0 if irrelevant */
@@ -335,10 +336,18 @@ void bdrv_aio_cancel_async(BlockAIOCB *acb);
 
 typedef struct BlockRequest {
     /* Fields to be filled by multiwrite caller */
-    int64_t sector;
-    int nb_sectors;
-    int flags;
-    QEMUIOVector *qiov;
+    union {
+        struct {
+            int64_t sector;
+            int nb_sectors;
+            int flags;
+            QEMUIOVector *qiov;
+        };
+        struct {
+            int req;
+            void *buf;
+        };
+    };
     BlockCompletionFunc *cb;
     void *opaque;
 
@@ -493,7 +502,6 @@ void bdrv_set_dirty_bitmap(BdrvDirtyBitmap *bitmap,
                            int64_t cur_sector, int nr_sectors);
 void bdrv_reset_dirty_bitmap(BdrvDirtyBitmap *bitmap,
                              int64_t cur_sector, int nr_sectors);
-void bdrv_clear_dirty_bitmap(BdrvDirtyBitmap *bitmap);
 void bdrv_dirty_iter_init(BdrvDirtyBitmap *bitmap, struct HBitmapIter *hbi);
 void bdrv_set_dirty_iter(struct HBitmapIter *hbi, int64_t offset);
 int64_t bdrv_get_dirty_count(BdrvDirtyBitmap *bitmap);
diff --git a/include/block/block_int.h b/include/block/block_int.h
index 603145a21d..4012e36437 100644
--- a/include/block/block_int.h
+++ b/include/block/block_int.h
@@ -60,11 +60,19 @@
 
 #define BLOCK_PROBE_BUF_SIZE        512
 
+enum BdrvTrackedRequestType {
+    BDRV_TRACKED_READ,
+    BDRV_TRACKED_WRITE,
+    BDRV_TRACKED_FLUSH,
+    BDRV_TRACKED_IOCTL,
+    BDRV_TRACKED_DISCARD,
+};
+
 typedef struct BdrvTrackedRequest {
     BlockDriverState *bs;
     int64_t offset;
     unsigned int bytes;
-    bool is_write;
+    enum BdrvTrackedRequestType type;
 
     bool serialising;
     int64_t overlap_offset;
@@ -219,7 +227,6 @@ struct BlockDriver {
     void (*bdrv_lock_medium)(BlockDriverState *bs, bool locked);
 
     /* to control generic scsi devices */
-    int (*bdrv_ioctl)(BlockDriverState *bs, unsigned long int req, void *buf);
     BlockAIOCB *(*bdrv_aio_ioctl)(BlockDriverState *bs,
         unsigned long int req, void *buf,
         BlockCompletionFunc *cb, void *opaque);
@@ -288,6 +295,12 @@ struct BlockDriver {
      */
     int (*bdrv_probe_geometry)(BlockDriverState *bs, HDGeometry *geo);
 
+    /**
+     * Drain and stop any internal sources of requests in the driver, and
+     * remain so until next I/O callback (e.g. bdrv_co_writev) is called.
+     */
+    void (*bdrv_drain)(BlockDriverState *bs);
+
     QLIST_ENTRY(BlockDriver) list;
 };
 
@@ -656,6 +669,7 @@ void mirror_start(BlockDriverState *bs, BlockDriverState *target,
  * @on_target_error: The action to take upon error writing to the target.
  * @cb: Completion function for the job.
  * @opaque: Opaque pointer value passed to @cb.
+ * @txn: Transaction that this job is part of (may be NULL).
  *
  * Start a backup operation on @bs.  Clusters in @bs are written to @target
  * until the job is cancelled or manually completed.
@@ -666,7 +680,7 @@ void backup_start(BlockDriverState *bs, BlockDriverState *target,
                   BlockdevOnError on_source_error,
                   BlockdevOnError on_target_error,
                   BlockCompletionFunc *cb, void *opaque,
-                  Error **errp);
+                  BlockJobTxn *txn, Error **errp);
 
 void blk_set_bs(BlockBackend *blk, BlockDriverState *bs);
 
@@ -680,4 +694,7 @@ void blk_dev_resize_cb(BlockBackend *blk);
 void bdrv_set_dirty(BlockDriverState *bs, int64_t cur_sector, int nr_sectors);
 bool bdrv_requests_pending(BlockDriverState *bs);
 
+void bdrv_clear_dirty_bitmap(BdrvDirtyBitmap *bitmap, HBitmap **out);
+void bdrv_undo_clear_dirty_bitmap(BdrvDirtyBitmap *bitmap, HBitmap *in);
+
 #endif /* BLOCK_INT_H */
diff --git a/include/block/blockjob.h b/include/block/blockjob.h
index 289b13f0c0..d84ccd8d2c 100644
--- a/include/block/blockjob.h
+++ b/include/block/blockjob.h
@@ -50,6 +50,26 @@ typedef struct BlockJobDriver {
      * manually.
      */
     void (*complete)(BlockJob *job, Error **errp);
+
+    /**
+     * If the callback is not NULL, it will be invoked when all the jobs
+     * belonging to the same transaction complete; or upon this job's
+     * completion if it is not in a transaction. Skipped if NULL.
+     *
+     * All jobs will complete with a call to either .commit() or .abort() but
+     * never both.
+     */
+    void (*commit)(BlockJob *job);
+
+    /**
+     * If the callback is not NULL, it will be invoked when any job in the
+     * same transaction fails; or upon this job's failure (due to error or
+     * cancellation) if it is not in a transaction. Skipped if NULL.
+     *
+     * All jobs will complete with a call to either .commit() or .abort() but
+     * never both.
+     */
+    void (*abort)(BlockJob *job);
 } BlockJobDriver;
 
 /**
@@ -130,6 +150,21 @@ struct BlockJob {
 
     /** The opaque value that is passed to the completion function.  */
     void *opaque;
+
+    /** Reference count of the block job */
+    int refcnt;
+
+    /* True if this job has reported completion by calling block_job_completed.
+     */
+    bool completed;
+
+    /* ret code passed to block_job_completed.
+     */
+    int ret;
+
+    /** Non-NULL if this job is part of a transaction */
+    BlockJobTxn *txn;
+    QLIST_ENTRY(BlockJob) txn_list;
 };
 
 /**
@@ -174,12 +209,21 @@ void block_job_sleep_ns(BlockJob *job, QEMUClockType type, int64_t ns);
 void block_job_yield(BlockJob *job);
 
 /**
- * block_job_release:
+ * block_job_ref:
  * @bs: The block device.
  *
- * Release job resources when an error occurred or job completed.
+ * Grab a reference to the block job. Should be paired with block_job_unref.
  */
-void block_job_release(BlockDriverState *bs);
+void block_job_ref(BlockJob *job);
+
+/**
+ * block_job_unref:
+ * @bs: The block device.
+ *
+ * Release reference to the block job and release resources if it is the last
+ * reference.
+ */
+void block_job_unref(BlockJob *job);
 
 /**
  * block_job_completed:
@@ -364,4 +408,39 @@ void block_job_defer_to_main_loop(BlockJob *job,
                                   BlockJobDeferToMainLoopFn *fn,
                                   void *opaque);
 
+/**
+ * block_job_txn_new:
+ *
+ * Allocate and return a new block job transaction.  Jobs can be added to the
+ * transaction using block_job_txn_add_job().
+ *
+ * The transaction is automatically freed when the last job completes or is
+ * cancelled.
+ *
+ * All jobs in the transaction either complete successfully or fail/cancel as a
+ * group.  Jobs wait for each other before completing.  Cancelling one job
+ * cancels all jobs in the transaction.
+ */
+BlockJobTxn *block_job_txn_new(void);
+
+/**
+ * block_job_txn_unref:
+ *
+ * Release a reference that was previously acquired with block_job_txn_add_job
+ * or block_job_txn_new. If it's the last reference to the object, it will be
+ * freed.
+ */
+void block_job_txn_unref(BlockJobTxn *txn);
+
+/**
+ * block_job_txn_add_job:
+ * @txn: The transaction (may be NULL)
+ * @job: Job to add to the transaction
+ *
+ * Add @job to the transaction.  The @job must not already be in a transaction.
+ * The caller must call either block_job_txn_unref() or block_job_completed()
+ * to release the reference that is automatically grabbed here.
+ */
+void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job);
+
 #endif
diff --git a/include/qemu/timed-average.h b/include/qemu/timed-average.h
new file mode 100644
index 0000000000..364bf88f70
--- /dev/null
+++ b/include/qemu/timed-average.h
@@ -0,0 +1,64 @@
+/*
+ * QEMU timed average computation
+ *
+ * Copyright (C) Nodalink, EURL. 2014
+ * Copyright (C) Igalia, S.L. 2015
+ *
+ * Authors:
+ *   Benoît Canet <benoit.canet@nodalink.com>
+ *   Alberto Garcia <berto@igalia.com>
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) version 3 or any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef TIMED_AVERAGE_H
+#define TIMED_AVERAGE_H
+
+#include <stdint.h>
+
+#include "qemu/timer.h"
+
+typedef struct TimedAverageWindow TimedAverageWindow;
+typedef struct TimedAverage TimedAverage;
+
+/* All fields of both structures are private */
+
+struct TimedAverageWindow {
+    uint64_t      min;             /* minimum value accounted in the window */
+    uint64_t      max;             /* maximum value accounted in the window */
+    uint64_t      sum;             /* sum of all values */
+    uint64_t      count;           /* number of values */
+    int64_t       expiration;      /* the end of the current window in ns */
+};
+
+struct TimedAverage {
+    uint64_t           period;     /* period in nanoseconds */
+    TimedAverageWindow windows[2]; /* two overlapping windows of with
+                                    * an offset of period / 2 between them */
+    unsigned           current;    /* the current window index: it's also the
+                                    * oldest window index */
+    QEMUClockType      clock_type; /* the clock used */
+};
+
+void timed_average_init(TimedAverage *ta, QEMUClockType clock_type,
+                        uint64_t period);
+
+void timed_average_account(TimedAverage *ta, uint64_t value);
+
+uint64_t timed_average_min(TimedAverage *ta);
+uint64_t timed_average_avg(TimedAverage *ta);
+uint64_t timed_average_max(TimedAverage *ta);
+uint64_t timed_average_sum(TimedAverage *ta, uint64_t *elapsed);
+
+#endif
diff --git a/qapi-schema.json b/qapi-schema.json
index c3f95ab170..36e59b8932 100644
--- a/qapi-schema.json
+++ b/qapi-schema.json
@@ -1534,6 +1534,26 @@
   'data': { } }
 
 ##
+# @ActionCompletionMode
+#
+# An enumeration of Transactional completion modes.
+#
+# @individual: Do not attempt to cancel any other Actions if any Actions fail
+#              after the Transaction request succeeds. All Actions that
+#              can complete successfully will do so without waiting on others.
+#              This is the default.
+#
+# @grouped: If any Action fails after the Transaction succeeds, cancel all
+#           Actions. Actions do not complete until all Actions are ready to
+#           complete. May be rejected by Actions that do not support this
+#           completion mode.
+#
+# Since: 2.5
+##
+{ 'enum': 'ActionCompletionMode',
+  'data': [ 'individual', 'grouped' ] }
+
+##
 # @TransactionAction
 #
 # A discriminated record of operations that can be performed with
@@ -1546,6 +1566,8 @@
 # blockdev-snapshot-internal-sync since 1.7
 # blockdev-backup since 2.3
 # blockdev-snapshot since 2.5
+# block-dirty-bitmap-add since 2.5
+# block-dirty-bitmap-clear since 2.5
 ##
 { 'union': 'TransactionAction',
   'data': {
@@ -1554,18 +1576,41 @@
        'drive-backup': 'DriveBackup',
        'blockdev-backup': 'BlockdevBackup',
        'abort': 'Abort',
-       'blockdev-snapshot-internal-sync': 'BlockdevSnapshotInternal'
+       'blockdev-snapshot-internal-sync': 'BlockdevSnapshotInternal',
+       'block-dirty-bitmap-add': 'BlockDirtyBitmapAdd',
+       'block-dirty-bitmap-clear': 'BlockDirtyBitmap'
    } }
 
 ##
+# @TransactionProperties
+#
+# Optional arguments to modify the behavior of a Transaction.
+#
+# @completion-mode: #optional Controls how jobs launched asynchronously by
+#                   Actions will complete or fail as a group.
+#                   See @ActionCompletionMode for details.
+#
+# Since: 2.5
+##
+{ 'struct': 'TransactionProperties',
+  'data': {
+       '*completion-mode': 'ActionCompletionMode'
+  }
+}
+
+##
 # @transaction
 #
 # Executes a number of transactionable QMP commands atomically. If any
 # operation fails, then the entire set of actions will be abandoned and the
 # appropriate error returned.
 #
-#  List of:
-#  @TransactionAction: information needed for the respective operation
+# @actions: List of @TransactionAction;
+#           information needed for the respective operations.
+#
+# @properties: #optional structure of additional options to control the
+#              execution of the transaction. See @TransactionProperties
+#              for additional detail.
 #
 # Returns: nothing on success
 #          Errors depend on the operations of the transaction
@@ -1577,7 +1622,10 @@
 # Since 1.1
 ##
 { 'command': 'transaction',
-  'data': { 'actions': [ 'TransactionAction' ] } }
+  'data': { 'actions': [ 'TransactionAction' ],
+            '*properties': 'TransactionProperties'
+          }
+}
 
 ##
 # @human-monitor-command:
diff --git a/qapi/block-core.json b/qapi/block-core.json
index 470e86c6df..f97c250ce9 100644
--- a/qapi/block-core.json
+++ b/qapi/block-core.json
@@ -414,6 +414,59 @@
 ##
 { 'command': 'query-block', 'returns': ['BlockInfo'] }
 
+
+##
+# @BlockDeviceTimedStats:
+#
+# Statistics of a block device during a given interval of time.
+#
+# @interval_length: Interval used for calculating the statistics,
+#                   in seconds.
+#
+# @min_rd_latency_ns: Minimum latency of read operations in the
+#                     defined interval, in nanoseconds.
+#
+# @min_wr_latency_ns: Minimum latency of write operations in the
+#                     defined interval, in nanoseconds.
+#
+# @min_flush_latency_ns: Minimum latency of flush operations in the
+#                        defined interval, in nanoseconds.
+#
+# @max_rd_latency_ns: Maximum latency of read operations in the
+#                     defined interval, in nanoseconds.
+#
+# @max_wr_latency_ns: Maximum latency of write operations in the
+#                     defined interval, in nanoseconds.
+#
+# @max_flush_latency_ns: Maximum latency of flush operations in the
+#                        defined interval, in nanoseconds.
+#
+# @avg_rd_latency_ns: Average latency of read operations in the
+#                     defined interval, in nanoseconds.
+#
+# @avg_wr_latency_ns: Average latency of write operations in the
+#                     defined interval, in nanoseconds.
+#
+# @avg_flush_latency_ns: Average latency of flush operations in the
+#                        defined interval, in nanoseconds.
+#
+# @avg_rd_queue_depth: Average number of pending read operations
+#                      in the defined interval.
+#
+# @avg_wr_queue_depth: Average number of pending write operations
+#                      in the defined interval.
+#
+# Since: 2.5
+##
+
+{ 'struct': 'BlockDeviceTimedStats',
+  'data': { 'interval_length': 'int', 'min_rd_latency_ns': 'int',
+            'max_rd_latency_ns': 'int', 'avg_rd_latency_ns': 'int',
+            'min_wr_latency_ns': 'int', 'max_wr_latency_ns': 'int',
+            'avg_wr_latency_ns': 'int', 'min_flush_latency_ns': 'int',
+            'max_flush_latency_ns': 'int', 'avg_flush_latency_ns': 'int',
+            'avg_rd_queue_depth': 'number', 'avg_wr_queue_depth': 'number' } }
+
 ##
 # @BlockDeviceStats:
 #
@@ -448,6 +501,37 @@
 # @wr_merged: Number of write requests that have been merged into another
 #             request (Since 2.3).
 #
+# @idle_time_ns: #optional Time since the last I/O operation, in
+#                nanoseconds. If the field is absent it means that
+#                there haven't been any operations yet (Since 2.5).
+#
+# @failed_rd_operations: The number of failed read operations
+#                        performed by the device (Since 2.5)
+#
+# @failed_wr_operations: The number of failed write operations
+#                        performed by the device (Since 2.5)
+#
+# @failed_flush_operations: The number of failed flush operations
+#                           performed by the device (Since 2.5)
+#
+# @invalid_rd_operations: The number of invalid read operations
+#                          performed by the device (Since 2.5)
+#
+# @invalid_wr_operations: The number of invalid write operations
+#                         performed by the device (Since 2.5)
+#
+# @invalid_flush_operations: The number of invalid flush operations
+#                            performed by the device (Since 2.5)
+#
+# @account_invalid: Whether invalid operations are included in the
+#                   last access statistics (Since 2.5)
+#
+# @account_failed: Whether failed operations are included in the
+#                  latency and last access statistics (Since 2.5)
+#
+# @timed_stats: Statistics specific to the set of previously defined
+#               intervals of time (Since 2.5)
+#
 # Since: 0.14.0
 ##
 { 'struct': 'BlockDeviceStats',
@@ -455,7 +539,12 @@
            'wr_operations': 'int', 'flush_operations': 'int',
            'flush_total_time_ns': 'int', 'wr_total_time_ns': 'int',
            'rd_total_time_ns': 'int', 'wr_highest_offset': 'int',
-           'rd_merged': 'int', 'wr_merged': 'int' } }
+           'rd_merged': 'int', 'wr_merged': 'int', '*idle_time_ns': 'int',
+           'failed_rd_operations': 'int', 'failed_wr_operations': 'int',
+           'failed_flush_operations': 'int', 'invalid_rd_operations': 'int',
+           'invalid_wr_operations': 'int', 'invalid_flush_operations': 'int',
+           'account_invalid': 'bool', 'account_failed': 'bool',
+           'timed_stats': ['BlockDeviceTimedStats'] } }
 
 ##
 # @BlockStats:
@@ -1436,6 +1525,15 @@
 #                 (default: enospc)
 # @read-only:     #optional whether the block device should be read-only
 #                 (default: false)
+# @stats-account-invalid: #optional whether to include invalid
+#                         operations when computing last access statistics
+#                         (default: true) (Since 2.5)
+# @stats-account-failed: #optional whether to include failed
+#                         operations when computing latency and last
+#                         access statistics (default: true) (Since 2.5)
+# @stats-intervals: #optional colon-separated list of intervals for
+#                   collecting I/O statistics, in seconds (default: none)
+#                   (Since 2.5)
 # @detect-zeroes: #optional detect and optimize zero writes (Since 2.1)
 #                 (default: off)
 #
@@ -1451,6 +1549,9 @@
             '*rerror': 'BlockdevOnError',
             '*werror': 'BlockdevOnError',
             '*read-only': 'bool',
+            '*stats-account-invalid': 'bool',
+            '*stats-account-failed': 'bool',
+            '*stats-intervals': 'str',
             '*detect-zeroes': 'BlockdevDetectZeroesOptions' } }
 
 ##
diff --git a/qemu-img.c b/qemu-img.c
index 9831db75ef..033011c4e7 100644
--- a/qemu-img.c
+++ b/qemu-img.c
@@ -645,9 +645,6 @@ static void common_block_job_cb(void *opaque, int ret)
     if (ret < 0) {
         error_setg_errno(cbi->errp, -ret, "Block job failed");
     }
-
-    /* Drop this block job's reference */
-    bdrv_unref(cbi->bs);
 }
 
 static void run_block_job(BlockJob *job, Error **errp)
diff --git a/qemu-io-cmds.c b/qemu-io-cmds.c
index 9c77aafb99..18fc2bdc10 100644
--- a/qemu-io-cmds.c
+++ b/qemu-io-cmds.c
@@ -1428,6 +1428,7 @@ static void aio_write_done(void *opaque, int ret)
 
     if (ret < 0) {
         printf("aio_write failed: %s\n", strerror(-ret));
+        block_acct_failed(blk_get_stats(ctx->blk), &ctx->acct);
         goto out;
     }
 
@@ -1456,6 +1457,7 @@ static void aio_read_done(void *opaque, int ret)
 
     if (ret < 0) {
         printf("readv failed: %s\n", strerror(-ret));
+        block_acct_failed(blk_get_stats(ctx->blk), &ctx->acct);
         goto out;
     }
 
@@ -1569,6 +1571,7 @@ static int aio_read_f(BlockBackend *blk, int argc, char **argv)
     if (ctx->offset & 0x1ff) {
         printf("offset %" PRId64 " is not sector aligned\n",
                ctx->offset);
+        block_acct_invalid(blk_get_stats(blk), BLOCK_ACCT_READ);
         g_free(ctx);
         return 0;
     }
@@ -1576,6 +1579,7 @@ static int aio_read_f(BlockBackend *blk, int argc, char **argv)
     nr_iov = argc - optind;
     ctx->buf = create_iovec(blk, &ctx->qiov, &argv[optind], nr_iov, 0xab);
     if (ctx->buf == NULL) {
+        block_acct_invalid(blk_get_stats(blk), BLOCK_ACCT_READ);
         g_free(ctx);
         return 0;
     }
@@ -1664,6 +1668,7 @@ static int aio_write_f(BlockBackend *blk, int argc, char **argv)
     if (ctx->offset & 0x1ff) {
         printf("offset %" PRId64 " is not sector aligned\n",
                ctx->offset);
+        block_acct_invalid(blk_get_stats(blk), BLOCK_ACCT_WRITE);
         g_free(ctx);
         return 0;
     }
@@ -1671,6 +1676,7 @@ static int aio_write_f(BlockBackend *blk, int argc, char **argv)
     nr_iov = argc - optind;
     ctx->buf = create_iovec(blk, &ctx->qiov, &argv[optind], nr_iov, pattern);
     if (ctx->buf == NULL) {
+        block_acct_invalid(blk_get_stats(blk), BLOCK_ACCT_WRITE);
         g_free(ctx);
         return 0;
     }
@@ -1685,7 +1691,10 @@ static int aio_write_f(BlockBackend *blk, int argc, char **argv)
 
 static int aio_flush_f(BlockBackend *blk, int argc, char **argv)
 {
+    BlockAcctCookie cookie;
+    block_acct_start(blk_get_stats(blk), &cookie, 0, BLOCK_ACCT_FLUSH);
     blk_drain_all();
+    block_acct_done(blk_get_stats(blk), &cookie);
     return 0;
 }
 
diff --git a/qmp-commands.hx b/qmp-commands.hx
index 02c0c5bc42..9d8b42f59a 100644
--- a/qmp-commands.hx
+++ b/qmp-commands.hx
@@ -1281,7 +1281,7 @@ EQMP
     },
     {
         .name       = "transaction",
-        .args_type  = "actions:q",
+        .args_type  = "actions:q,properties:q?",
         .mhandler.cmd_new = qmp_marshal_transaction,
     },
 
@@ -2583,6 +2583,64 @@ Each json-object contain the following:
                    another request (json-int)
     - "wr_merged": number of write requests that have been merged into
                    another request (json-int)
+    - "idle_time_ns": time since the last I/O operation, in
+                      nanoseconds. If the field is absent it means
+                      that there haven't been any operations yet
+                      (json-int, optional)
+    - "failed_rd_operations": number of failed read operations
+                              (json-int)
+    - "failed_wr_operations": number of failed write operations
+                              (json-int)
+    - "failed_flush_operations": number of failed flush operations
+                               (json-int)
+    - "invalid_rd_operations": number of invalid read operations
+                               (json-int)
+    - "invalid_wr_operations": number of invalid write operations
+                               (json-int)
+    - "invalid_flush_operations": number of invalid flush operations
+                                  (json-int)
+    - "account_invalid": whether invalid operations are included in
+                         the last access statistics (json-bool)
+    - "account_failed": whether failed operations are included in the
+                         latency and last access statistics
+                         (json-bool)
+    - "timed_stats": A json-array containing statistics collected in
+                     specific intervals, with the following members:
+        - "interval_length": interval used for calculating the
+                             statistics, in seconds (json-int)
+        - "min_rd_latency_ns": minimum latency of read operations in
+                               the defined interval, in nanoseconds
+                               (json-int)
+        - "min_wr_latency_ns": minimum latency of write operations in
+                               the defined interval, in nanoseconds
+                               (json-int)
+        - "min_flush_latency_ns": minimum latency of flush operations
+                                  in the defined interval, in
+                                  nanoseconds (json-int)
+        - "max_rd_latency_ns": maximum latency of read operations in
+                               the defined interval, in nanoseconds
+                               (json-int)
+        - "max_wr_latency_ns": maximum latency of write operations in
+                               the defined interval, in nanoseconds
+                               (json-int)
+        - "max_flush_latency_ns": maximum latency of flush operations
+                                  in the defined interval, in
+                                  nanoseconds (json-int)
+        - "avg_rd_latency_ns": average latency of read operations in
+                               the defined interval, in nanoseconds
+                               (json-int)
+        - "avg_wr_latency_ns": average latency of write operations in
+                               the defined interval, in nanoseconds
+                               (json-int)
+        - "avg_flush_latency_ns": average latency of flush operations
+                                  in the defined interval, in
+                                  nanoseconds (json-int)
+        - "avg_rd_queue_depth": average number of pending read
+                                operations in the defined interval
+                                (json-number)
+        - "avg_wr_queue_depth": average number of pending write
+                                operations in the defined interval
+                                (json-number).
 - "parent": Contains recursively the statistics of the underlying
             protocol (e.g. the host file for a qcow2 image). If there is
             no underlying protocol, this field is omitted
@@ -2607,7 +2665,10 @@ Example:
                   "flush_total_times_ns":49653
                   "flush_operations":61,
                   "rd_merged":0,
-                  "wr_merged":0
+                  "wr_merged":0,
+                  "idle_time_ns":2953431879,
+                  "account_invalid":true,
+                  "account_failed":false
                }
             },
             "stats":{
@@ -2621,7 +2682,10 @@ Example:
                "rd_total_times_ns":3465673657
                "flush_total_times_ns":49653,
                "rd_merged":0,
-               "wr_merged":0
+               "wr_merged":0,
+               "idle_time_ns":2953431879,
+               "account_invalid":true,
+               "account_failed":false
             }
          },
          {
@@ -2637,7 +2701,9 @@ Example:
                "rd_total_times_ns":0
                "flush_total_times_ns":0,
                "rd_merged":0,
-               "wr_merged":0
+               "wr_merged":0,
+               "account_invalid":false,
+               "account_failed":false
             }
          },
          {
@@ -2653,7 +2719,9 @@ Example:
                "rd_total_times_ns":0
                "flush_total_times_ns":0,
                "rd_merged":0,
-               "wr_merged":0
+               "wr_merged":0,
+               "account_invalid":false,
+               "account_failed":false
             }
          },
          {
@@ -2669,7 +2737,9 @@ Example:
                "rd_total_times_ns":0
                "flush_total_times_ns":0,
                "rd_merged":0,
-               "wr_merged":0
+               "wr_merged":0,
+               "account_invalid":false,
+               "account_failed":false
             }
          }
       ]
diff --git a/tests/Makefile b/tests/Makefile
index 92969e8288..90c4141ac5 100644
--- a/tests/Makefile
+++ b/tests/Makefile
@@ -47,6 +47,8 @@ check-unit-y += tests/test-thread-pool$(EXESUF)
 gcov-files-test-thread-pool-y = thread-pool.c
 gcov-files-test-hbitmap-y = util/hbitmap.c
 check-unit-y += tests/test-hbitmap$(EXESUF)
+gcov-files-test-hbitmap-y = blockjob.c
+check-unit-y += tests/test-blockjob-txn$(EXESUF)
 check-unit-y += tests/test-x86-cpuid$(EXESUF)
 # all code tested by test-x86-cpuid is inside topology.h
 gcov-files-test-x86-cpuid-y =
@@ -81,6 +83,7 @@ check-unit-y += tests/test-crypto-cipher$(EXESUF)
 check-unit-$(CONFIG_GNUTLS) += tests/test-crypto-tlscredsx509$(EXESUF)
 check-unit-$(CONFIG_GNUTLS) += tests/test-crypto-tlssession$(EXESUF)
 check-unit-$(CONFIG_LINUX) += tests/test-qga$(EXESUF)
+check-unit-y += tests/test-timed-average$(EXESUF)
 
 check-block-$(CONFIG_POSIX) += tests/qemu-iotests-quick.sh
 
@@ -390,6 +393,7 @@ tests/test-coroutine$(EXESUF): tests/test-coroutine.o $(test-block-obj-y)
 tests/test-aio$(EXESUF): tests/test-aio.o $(test-block-obj-y)
 tests/test-rfifolock$(EXESUF): tests/test-rfifolock.o $(test-util-obj-y)
 tests/test-throttle$(EXESUF): tests/test-throttle.o $(test-block-obj-y)
+tests/test-blockjob-txn$(EXESUF): tests/test-blockjob-txn.o $(test-block-obj-y) $(test-util-obj-y)
 tests/test-thread-pool$(EXESUF): tests/test-thread-pool.o $(test-block-obj-y)
 tests/test-iov$(EXESUF): tests/test-iov.o $(test-util-obj-y)
 tests/test-hbitmap$(EXESUF): tests/test-hbitmap.o $(test-util-obj-y)
@@ -409,6 +413,9 @@ tests/test-vmstate$(EXESUF): tests/test-vmstate.o \
 	migration/vmstate.o migration/qemu-file.o migration/qemu-file-buf.o \
         migration/qemu-file-unix.o qjson.o \
 	$(test-qom-obj-y)
+tests/test-timed-average$(EXESUF): tests/test-timed-average.o qemu-timer.o \
+	libqemuutil.a stubs/clock-warp.o stubs/cpu-get-icount.o \
+	stubs/notify-event.o stubs/replay.o
 
 tests/test-qapi-types.c tests/test-qapi-types.h :\
 $(SRC_PATH)/tests/qapi-schema/qapi-schema-test.json $(SRC_PATH)/scripts/qapi-types.py $(qapi-py)
diff --git a/tests/qemu-iotests/124 b/tests/qemu-iotests/124
index 9ccd11809f..c928f0101b 100644
--- a/tests/qemu-iotests/124
+++ b/tests/qemu-iotests/124
@@ -36,6 +36,23 @@ def try_remove(img):
         pass
 
 
+def transaction_action(action, **kwargs):
+    return {
+        'type': action,
+        'data': dict((k.replace('_', '-'), v) for k, v in kwargs.iteritems())
+    }
+
+
+def transaction_bitmap_clear(node, name, **kwargs):
+    return transaction_action('block-dirty-bitmap-clear',
+                              node=node, name=name, **kwargs)
+
+
+def transaction_drive_backup(device, target, **kwargs):
+    return transaction_action('drive-backup', device=device, target=target,
+                              **kwargs)
+
+
 class Bitmap:
     def __init__(self, name, drive):
         self.name = name
@@ -122,9 +139,12 @@ class TestIncrementalBackup(iotests.QMPTestCase):
     def do_qmp_backup(self, error='Input/output error', **kwargs):
         res = self.vm.qmp('drive-backup', **kwargs)
         self.assert_qmp(res, 'return', {})
+        return self.wait_qmp_backup(kwargs['device'], error)
+
 
+    def wait_qmp_backup(self, device, error='Input/output error'):
         event = self.vm.event_wait(name="BLOCK_JOB_COMPLETED",
-                                   match={'data': {'device': kwargs['device']}})
+                                   match={'data': {'device': device}})
         self.assertNotEqual(event, None)
 
         try:
@@ -139,6 +159,12 @@ class TestIncrementalBackup(iotests.QMPTestCase):
             return False
 
 
+    def wait_qmp_backup_cancelled(self, device):
+        event = self.vm.event_wait(name='BLOCK_JOB_CANCELLED',
+                                   match={'data': {'device': device}})
+        self.assertNotEqual(event, None)
+
+
     def create_anchor_backup(self, drive=None):
         if drive is None:
             drive = self.drives[-1]
@@ -264,6 +290,43 @@ class TestIncrementalBackup(iotests.QMPTestCase):
         return self.do_incremental_simple(granularity=131072)
 
 
+    def test_incremental_transaction(self):
+        '''Test: Verify backups made from transactionally created bitmaps.
+
+        Create a bitmap "before" VM execution begins, then create a second
+        bitmap AFTER writes have already occurred. Use transactions to create
+        a full backup and synchronize both bitmaps to this backup.
+        Create an incremental backup through both bitmaps and verify that
+        both backups match the current drive0 image.
+        '''
+
+        drive0 = self.drives[0]
+        bitmap0 = self.add_bitmap('bitmap0', drive0)
+        self.hmp_io_writes(drive0['id'], (('0xab', 0, 512),
+                                          ('0xfe', '16M', '256k'),
+                                          ('0x64', '32736k', '64k')))
+        bitmap1 = self.add_bitmap('bitmap1', drive0)
+
+        result = self.vm.qmp('transaction', actions=[
+            transaction_bitmap_clear(bitmap0.drive['id'], bitmap0.name),
+            transaction_bitmap_clear(bitmap1.drive['id'], bitmap1.name),
+            transaction_drive_backup(drive0['id'], drive0['backup'],
+                                     sync='full', format=drive0['fmt'])
+        ])
+        self.assert_qmp(result, 'return', {})
+        self.wait_until_completed(drive0['id'])
+        self.files.append(drive0['backup'])
+
+        self.hmp_io_writes(drive0['id'], (('0x9a', 0, 512),
+                                          ('0x55', '8M', '352k'),
+                                          ('0x78', '15872k', '1M')))
+        # Both bitmaps should be correctly in sync.
+        self.create_incremental(bitmap0)
+        self.create_incremental(bitmap1)
+        self.vm.shutdown()
+        self.check_backups()
+
+
     def test_incremental_failure(self):
         '''Test: Verify backups made after a failure are correct.
 
@@ -321,6 +384,123 @@ class TestIncrementalBackup(iotests.QMPTestCase):
         self.check_backups()
 
 
+    def test_transaction_failure(self):
+        '''Test: Verify backups made from a transaction that partially fails.
+
+        Add a second drive with its own unique pattern, and add a bitmap to each
+        drive. Use blkdebug to interfere with the backup on just one drive and
+        attempt to create a coherent incremental backup across both drives.
+
+        verify a failure in one but not both, then delete the failed stubs and
+        re-run the same transaction.
+
+        verify that both incrementals are created successfully.
+        '''
+
+        # Create a second drive, with pattern:
+        drive1 = self.add_node('drive1')
+        self.img_create(drive1['file'], drive1['fmt'])
+        io_write_patterns(drive1['file'], (('0x14', 0, 512),
+                                           ('0x5d', '1M', '32k'),
+                                           ('0xcd', '32M', '124k')))
+
+        # Create a blkdebug interface to this img as 'drive1'
+        result = self.vm.qmp('blockdev-add', options={
+            'id': drive1['id'],
+            'driver': drive1['fmt'],
+            'file': {
+                'driver': 'blkdebug',
+                'image': {
+                    'driver': 'file',
+                    'filename': drive1['file']
+                },
+                'set-state': [{
+                    'event': 'flush_to_disk',
+                    'state': 1,
+                    'new_state': 2
+                }],
+                'inject-error': [{
+                    'event': 'read_aio',
+                    'errno': 5,
+                    'state': 2,
+                    'immediately': False,
+                    'once': True
+                }],
+            }
+        })
+        self.assert_qmp(result, 'return', {})
+
+        # Create bitmaps and full backups for both drives
+        drive0 = self.drives[0]
+        dr0bm0 = self.add_bitmap('bitmap0', drive0)
+        dr1bm0 = self.add_bitmap('bitmap0', drive1)
+        self.create_anchor_backup(drive0)
+        self.create_anchor_backup(drive1)
+        self.assert_no_active_block_jobs()
+        self.assertFalse(self.vm.get_qmp_events(wait=False))
+
+        # Emulate some writes
+        self.hmp_io_writes(drive0['id'], (('0xab', 0, 512),
+                                          ('0xfe', '16M', '256k'),
+                                          ('0x64', '32736k', '64k')))
+        self.hmp_io_writes(drive1['id'], (('0xba', 0, 512),
+                                          ('0xef', '16M', '256k'),
+                                          ('0x46', '32736k', '64k')))
+
+        # Create incremental backup targets
+        target0 = self.prepare_backup(dr0bm0)
+        target1 = self.prepare_backup(dr1bm0)
+
+        # Ask for a new incremental backup per-each drive,
+        # expecting drive1's backup to fail:
+        transaction = [
+            transaction_drive_backup(drive0['id'], target0, sync='incremental',
+                                     format=drive0['fmt'], mode='existing',
+                                     bitmap=dr0bm0.name),
+            transaction_drive_backup(drive1['id'], target1, sync='incremental',
+                                     format=drive1['fmt'], mode='existing',
+                                     bitmap=dr1bm0.name)
+        ]
+        result = self.vm.qmp('transaction', actions=transaction,
+                             properties={'completion-mode': 'grouped'} )
+        self.assert_qmp(result, 'return', {})
+
+        # Observe that drive0's backup is cancelled and drive1 completes with
+        # an error.
+        self.wait_qmp_backup_cancelled(drive0['id'])
+        self.assertFalse(self.wait_qmp_backup(drive1['id']))
+        error = self.vm.event_wait('BLOCK_JOB_ERROR')
+        self.assert_qmp(error, 'data', {'device': drive1['id'],
+                                        'action': 'report',
+                                        'operation': 'read'})
+        self.assertFalse(self.vm.get_qmp_events(wait=False))
+        self.assert_no_active_block_jobs()
+
+        # Delete drive0's successful target and eliminate our record of the
+        # unsuccessful drive1 target. Then re-run the same transaction.
+        dr0bm0.del_target()
+        dr1bm0.del_target()
+        target0 = self.prepare_backup(dr0bm0)
+        target1 = self.prepare_backup(dr1bm0)
+
+        # Re-run the exact same transaction.
+        result = self.vm.qmp('transaction', actions=transaction,
+                             properties={'completion-mode':'grouped'})
+        self.assert_qmp(result, 'return', {})
+
+        # Both should complete successfully this time.
+        self.assertTrue(self.wait_qmp_backup(drive0['id']))
+        self.assertTrue(self.wait_qmp_backup(drive1['id']))
+        self.make_reference_backup(dr0bm0)
+        self.make_reference_backup(dr1bm0)
+        self.assertFalse(self.vm.get_qmp_events(wait=False))
+        self.assert_no_active_block_jobs()
+
+        # And the images should of course validate.
+        self.vm.shutdown()
+        self.check_backups()
+
+
     def test_sync_dirty_bitmap_missing(self):
         self.assert_no_active_block_jobs()
         self.files.append(self.err_img)
diff --git a/tests/qemu-iotests/124.out b/tests/qemu-iotests/124.out
index 2f7d3902f2..dae404e278 100644
--- a/tests/qemu-iotests/124.out
+++ b/tests/qemu-iotests/124.out
@@ -1,5 +1,5 @@
-.......
+.........
 ----------------------------------------------------------------------
-Ran 7 tests
+Ran 9 tests
 
 OK
diff --git a/tests/qemu-iotests/136 b/tests/qemu-iotests/136
new file mode 100644
index 0000000000..f574d83ff7
--- /dev/null
+++ b/tests/qemu-iotests/136
@@ -0,0 +1,349 @@
+#!/usr/bin/env python
+#
+# Tests for block device statistics
+#
+# Copyright (C) 2015 Igalia, S.L.
+# Author: Alberto Garcia <berto@igalia.com>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+
+import iotests
+import os
+
+interval_length = 10
+nsec_per_sec = 1000000000
+op_latency = nsec_per_sec / 1000 # See qtest_latency_ns in accounting.c
+bad_sector = 8192
+bad_offset = bad_sector * 512
+blkdebug_file = os.path.join(iotests.test_dir, 'blkdebug.conf')
+
+class BlockDeviceStatsTestCase(iotests.QMPTestCase):
+    test_img = "null-aio://"
+    total_rd_bytes = 0
+    total_rd_ops = 0
+    total_wr_bytes = 0
+    total_wr_ops = 0
+    total_wr_merged = 0
+    total_flush_ops = 0
+    failed_rd_ops = 0
+    failed_wr_ops = 0
+    invalid_rd_ops = 0
+    invalid_wr_ops = 0
+    wr_highest_offset = 0
+    account_invalid = False
+    account_failed = False
+
+    def blockstats(self, device):
+        result = self.vm.qmp("query-blockstats")
+        for r in result['return']:
+            if r['device'] == device:
+                return r['stats']
+        raise Exception("Device not found for blockstats: %s" % device)
+
+    def create_blkdebug_file(self):
+        file = open(blkdebug_file, 'w')
+        file.write('''
+[inject-error]
+event = "read_aio"
+errno = "5"
+sector = "%d"
+
+[inject-error]
+event = "write_aio"
+errno = "5"
+sector = "%d"
+''' % (bad_sector, bad_sector))
+        file.close()
+
+    def setUp(self):
+        drive_args = []
+        drive_args.append("stats-intervals=%d" % interval_length)
+        drive_args.append("stats-account-invalid=%s" %
+                          (self.account_invalid and "on" or "off"))
+        drive_args.append("stats-account-failed=%s" %
+                          (self.account_failed and "on" or "off"))
+        self.create_blkdebug_file()
+        self.vm = iotests.VM().add_drive('blkdebug:%s:%s ' %
+                                         (blkdebug_file, self.test_img),
+                                         ','.join(drive_args))
+        self.vm.launch()
+        # Set an initial value for the clock
+        self.vm.qtest("clock_step %d" % nsec_per_sec)
+
+    def tearDown(self):
+        self.vm.shutdown()
+        os.remove(blkdebug_file)
+
+    def accounted_ops(self, read = False, write = False, flush = False):
+        ops = 0
+        if write:
+            ops += self.total_wr_ops
+            if self.account_failed:
+                ops += self.failed_wr_ops
+            if self.account_invalid:
+                ops += self.invalid_wr_ops
+        if read:
+            ops += self.total_rd_ops
+            if self.account_failed:
+                ops += self.failed_rd_ops
+            if self.account_invalid:
+                ops += self.invalid_rd_ops
+        if flush:
+            ops += self.total_flush_ops
+        return ops
+
+    def accounted_latency(self, read = False, write = False, flush = False):
+        latency = 0
+        if write:
+            latency += self.total_wr_ops * op_latency
+            if self.account_failed:
+                latency += self.failed_wr_ops * op_latency
+        if read:
+            latency += self.total_rd_ops * op_latency
+            if self.account_failed:
+                latency += self.failed_rd_ops * op_latency
+        if flush:
+            latency += self.total_flush_ops * op_latency
+        return latency
+
+    def check_values(self):
+        stats = self.blockstats('drive0')
+
+        # Check that the totals match with what we have calculated
+        self.assertEqual(self.total_rd_bytes, stats['rd_bytes'])
+        self.assertEqual(self.total_wr_bytes, stats['wr_bytes'])
+        self.assertEqual(self.total_rd_ops, stats['rd_operations'])
+        self.assertEqual(self.total_wr_ops, stats['wr_operations'])
+        self.assertEqual(self.total_flush_ops, stats['flush_operations'])
+        self.assertEqual(self.wr_highest_offset, stats['wr_highest_offset'])
+        self.assertEqual(self.failed_rd_ops, stats['failed_rd_operations'])
+        self.assertEqual(self.failed_wr_ops, stats['failed_wr_operations'])
+        self.assertEqual(self.invalid_rd_ops, stats['invalid_rd_operations'])
+        self.assertEqual(self.invalid_wr_ops, stats['invalid_wr_operations'])
+        self.assertEqual(self.account_invalid, stats['account_invalid'])
+        self.assertEqual(self.account_failed, stats['account_failed'])
+        self.assertEqual(self.total_wr_merged, stats['wr_merged'])
+
+        # Check that there's exactly one interval with the length we defined
+        self.assertEqual(1, len(stats['timed_stats']))
+        timed_stats = stats['timed_stats'][0]
+        self.assertEqual(interval_length, timed_stats['interval_length'])
+
+        total_rd_latency = self.accounted_latency(read = True)
+        if (total_rd_latency != 0):
+            self.assertEqual(total_rd_latency, stats['rd_total_time_ns'])
+            self.assertEqual(op_latency, timed_stats['min_rd_latency_ns'])
+            self.assertEqual(op_latency, timed_stats['max_rd_latency_ns'])
+            self.assertEqual(op_latency, timed_stats['avg_rd_latency_ns'])
+            self.assertLess(0, timed_stats['avg_rd_queue_depth'])
+        else:
+            self.assertEqual(0, stats['rd_total_time_ns'])
+            self.assertEqual(0, timed_stats['min_rd_latency_ns'])
+            self.assertEqual(0, timed_stats['max_rd_latency_ns'])
+            self.assertEqual(0, timed_stats['avg_rd_latency_ns'])
+            self.assertEqual(0, timed_stats['avg_rd_queue_depth'])
+
+        # min read latency <= avg read latency <= max read latency
+        self.assertLessEqual(timed_stats['min_rd_latency_ns'],
+                             timed_stats['avg_rd_latency_ns'])
+        self.assertLessEqual(timed_stats['avg_rd_latency_ns'],
+                             timed_stats['max_rd_latency_ns'])
+
+        total_wr_latency = self.accounted_latency(write = True)
+        if (total_wr_latency != 0):
+            self.assertEqual(total_wr_latency, stats['wr_total_time_ns'])
+            self.assertEqual(op_latency, timed_stats['min_wr_latency_ns'])
+            self.assertEqual(op_latency, timed_stats['max_wr_latency_ns'])
+            self.assertEqual(op_latency, timed_stats['avg_wr_latency_ns'])
+            self.assertLess(0, timed_stats['avg_wr_queue_depth'])
+        else:
+            self.assertEqual(0, stats['wr_total_time_ns'])
+            self.assertEqual(0, timed_stats['min_wr_latency_ns'])
+            self.assertEqual(0, timed_stats['max_wr_latency_ns'])
+            self.assertEqual(0, timed_stats['avg_wr_latency_ns'])
+            self.assertEqual(0, timed_stats['avg_wr_queue_depth'])
+
+        # min write latency <= avg write latency <= max write latency
+        self.assertLessEqual(timed_stats['min_wr_latency_ns'],
+                             timed_stats['avg_wr_latency_ns'])
+        self.assertLessEqual(timed_stats['avg_wr_latency_ns'],
+                             timed_stats['max_wr_latency_ns'])
+
+        total_flush_latency = self.accounted_latency(flush = True)
+        if (total_flush_latency != 0):
+            self.assertEqual(total_flush_latency, stats['flush_total_time_ns'])
+            self.assertEqual(op_latency, timed_stats['min_flush_latency_ns'])
+            self.assertEqual(op_latency, timed_stats['max_flush_latency_ns'])
+            self.assertEqual(op_latency, timed_stats['avg_flush_latency_ns'])
+        else:
+            self.assertEqual(0, stats['flush_total_time_ns'])
+            self.assertEqual(0, timed_stats['min_flush_latency_ns'])
+            self.assertEqual(0, timed_stats['max_flush_latency_ns'])
+            self.assertEqual(0, timed_stats['avg_flush_latency_ns'])
+
+        # min flush latency <= avg flush latency <= max flush latency
+        self.assertLessEqual(timed_stats['min_flush_latency_ns'],
+                             timed_stats['avg_flush_latency_ns'])
+        self.assertLessEqual(timed_stats['avg_flush_latency_ns'],
+                             timed_stats['max_flush_latency_ns'])
+
+        # idle_time_ns must be > 0 if we have performed any operation
+        if (self.accounted_ops(read = True, write = True, flush = True) != 0):
+            self.assertLess(0, stats['idle_time_ns'])
+        else:
+            self.assertFalse(stats.has_key('idle_time_ns'))
+
+        # This test does not alter these, so they must be all 0
+        self.assertEqual(0, stats['rd_merged'])
+        self.assertEqual(0, stats['failed_flush_operations'])
+        self.assertEqual(0, stats['invalid_flush_operations'])
+
+    def do_test_stats(self, rd_size = 0, rd_ops = 0, wr_size = 0, wr_ops = 0,
+                      flush_ops = 0, invalid_rd_ops = 0, invalid_wr_ops = 0,
+                      failed_rd_ops = 0, failed_wr_ops = 0, wr_merged = 0):
+        # The 'ops' list will contain all the requested I/O operations
+        ops = []
+        for i in range(rd_ops):
+            ops.append("aio_read %d %d" % (i * rd_size, rd_size))
+
+        for i in range(wr_ops):
+            ops.append("aio_write %d %d" % (i * wr_size, wr_size))
+
+        for i in range(flush_ops):
+            ops.append("aio_flush")
+
+        highest_offset = wr_ops * wr_size
+
+        # Two types of invalid operations: unaligned length and unaligned offset
+        for i in range(invalid_rd_ops / 2):
+            ops.append("aio_read 0 511")
+
+        for i in range(invalid_rd_ops / 2, invalid_rd_ops):
+            ops.append("aio_read 13 512")
+
+        for i in range(invalid_wr_ops / 2):
+            ops.append("aio_write 0 511")
+
+        for i in range(invalid_wr_ops / 2, invalid_wr_ops):
+            ops.append("aio_write 13 512")
+
+        for i in range(failed_rd_ops):
+            ops.append("aio_read %d 512" % bad_offset)
+
+        for i in range(failed_wr_ops):
+            ops.append("aio_write %d 512" % bad_offset)
+
+        if failed_wr_ops > 0:
+            highest_offset = max(highest_offset, bad_offset + 512)
+
+        for i in range(wr_merged):
+            first = i * wr_size * 2
+            second = first + wr_size
+            ops.append("multiwrite %d %d ; %d %d" %
+                       (first, wr_size, second, wr_size))
+
+        highest_offset = max(highest_offset, wr_merged * wr_size * 2)
+
+        # Now perform all operations
+        for op in ops:
+            self.vm.hmp_qemu_io("drive0", op)
+
+        # Update the expected totals
+        self.total_rd_bytes += rd_ops * rd_size
+        self.total_rd_ops += rd_ops
+        self.total_wr_bytes += wr_ops * wr_size
+        self.total_wr_ops += wr_ops
+        self.total_wr_merged += wr_merged
+        self.total_flush_ops += flush_ops
+        self.invalid_rd_ops += invalid_rd_ops
+        self.invalid_wr_ops += invalid_wr_ops
+        self.failed_rd_ops += failed_rd_ops
+        self.failed_wr_ops += failed_wr_ops
+
+        self.wr_highest_offset = max(self.wr_highest_offset, highest_offset)
+
+        # Advance the clock so idle_time_ns has a meaningful value
+        self.vm.qtest("clock_step %d" % nsec_per_sec)
+
+        # And check that the actual statistics match the expected ones
+        self.check_values()
+
+    def test_read_only(self):
+        test_values = [[512,    1],
+                       [65536,  1],
+                       [512,   12],
+                       [65536, 12]]
+        for i in test_values:
+            self.do_test_stats(rd_size = i[0], rd_ops = i[1])
+
+    def test_write_only(self):
+        test_values = [[512,    1],
+                       [65536,  1],
+                       [512,   12],
+                       [65536, 12]]
+        for i in test_values:
+            self.do_test_stats(wr_size = i[0], wr_ops = i[1])
+
+    def test_invalid(self):
+        self.do_test_stats(invalid_rd_ops = 7)
+        self.do_test_stats(invalid_wr_ops = 3)
+        self.do_test_stats(invalid_rd_ops = 4, invalid_wr_ops = 5)
+
+    def test_failed(self):
+        self.do_test_stats(failed_rd_ops = 8)
+        self.do_test_stats(failed_wr_ops = 6)
+        self.do_test_stats(failed_rd_ops = 5, failed_wr_ops = 12)
+
+    def test_flush(self):
+        self.do_test_stats(flush_ops = 8)
+
+    def test_merged(self):
+        for i in range(5):
+            self.do_test_stats(wr_merged = i * 3)
+
+    def test_all(self):
+        # rd_size, rd_ops, wr_size, wr_ops, flush_ops
+        # invalid_rd_ops,  invalid_wr_ops,
+        # failed_rd_ops,   failed_wr_ops
+        # wr_merged
+        test_values = [[512,    1, 512,   1, 1, 4, 7, 5, 2, 1],
+                       [65536,  1, 2048, 12, 7, 7, 5, 2, 5, 5],
+                       [32768,  9, 8192,  1, 4, 3, 2, 4, 6, 4],
+                       [16384, 11, 3584, 16, 9, 8, 6, 7, 3, 4]]
+        for i in test_values:
+            self.do_test_stats(*i)
+
+    def test_no_op(self):
+        # All values must be sane before doing any I/O
+        self.check_values()
+
+
+class BlockDeviceStatsTestAccountInvalid(BlockDeviceStatsTestCase):
+    account_invalid = True
+    account_failed = False
+
+class BlockDeviceStatsTestAccountFailed(BlockDeviceStatsTestCase):
+    account_invalid = False
+    account_failed = True
+
+class BlockDeviceStatsTestAccountBoth(BlockDeviceStatsTestCase):
+    account_invalid = True
+    account_failed = True
+
+class BlockDeviceStatsTestCoroutine(BlockDeviceStatsTestCase):
+    test_img = "null-co://"
+
+if __name__ == '__main__':
+    iotests.main(supported_fmts=["raw"])
diff --git a/tests/qemu-iotests/136.out b/tests/qemu-iotests/136.out
new file mode 100644
index 0000000000..0a5e9583a4
--- /dev/null
+++ b/tests/qemu-iotests/136.out
@@ -0,0 +1,5 @@
+........................................
+----------------------------------------------------------------------
+Ran 40 tests
+
+OK
diff --git a/tests/qemu-iotests/group b/tests/qemu-iotests/group
index c69265decd..5a0880893a 100644
--- a/tests/qemu-iotests/group
+++ b/tests/qemu-iotests/group
@@ -136,6 +136,7 @@
 132 rw auto quick
 134 rw auto quick
 135 rw auto
+136 rw auto
 137 rw auto
 138 rw auto quick
 139 rw auto quick
diff --git a/tests/test-blockjob-txn.c b/tests/test-blockjob-txn.c
new file mode 100644
index 0000000000..34747e924d
--- /dev/null
+++ b/tests/test-blockjob-txn.c
@@ -0,0 +1,250 @@
+/*
+ * Blockjob transactions tests
+ *
+ * Copyright Red Hat, Inc. 2015
+ *
+ * Authors:
+ *  Stefan Hajnoczi    <stefanha@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+
+#include <glib.h>
+#include "qapi/error.h"
+#include "qemu/main-loop.h"
+#include "block/blockjob.h"
+
+typedef struct {
+    BlockJob common;
+    unsigned int iterations;
+    bool use_timer;
+    int rc;
+    int *result;
+} TestBlockJob;
+
+static const BlockJobDriver test_block_job_driver = {
+    .instance_size = sizeof(TestBlockJob),
+};
+
+static void test_block_job_complete(BlockJob *job, void *opaque)
+{
+    BlockDriverState *bs = job->bs;
+    int rc = (intptr_t)opaque;
+
+    if (block_job_is_cancelled(job)) {
+        rc = -ECANCELED;
+    }
+
+    block_job_completed(job, rc);
+    bdrv_unref(bs);
+}
+
+static void coroutine_fn test_block_job_run(void *opaque)
+{
+    TestBlockJob *s = opaque;
+    BlockJob *job = &s->common;
+
+    while (s->iterations--) {
+        if (s->use_timer) {
+            block_job_sleep_ns(job, QEMU_CLOCK_REALTIME, 0);
+        } else {
+            block_job_yield(job);
+        }
+
+        if (block_job_is_cancelled(job)) {
+            break;
+        }
+    }
+
+    block_job_defer_to_main_loop(job, test_block_job_complete,
+                                 (void *)(intptr_t)s->rc);
+}
+
+typedef struct {
+    TestBlockJob *job;
+    int *result;
+} TestBlockJobCBData;
+
+static void test_block_job_cb(void *opaque, int ret)
+{
+    TestBlockJobCBData *data = opaque;
+    if (!ret && block_job_is_cancelled(&data->job->common)) {
+        ret = -ECANCELED;
+    }
+    *data->result = ret;
+    g_free(data);
+}
+
+/* Create a block job that completes with a given return code after a given
+ * number of event loop iterations.  The return code is stored in the given
+ * result pointer.
+ *
+ * The event loop iterations can either be handled automatically with a 0 delay
+ * timer, or they can be stepped manually by entering the coroutine.
+ */
+static BlockJob *test_block_job_start(unsigned int iterations,
+                                      bool use_timer,
+                                      int rc, int *result)
+{
+    BlockDriverState *bs;
+    TestBlockJob *s;
+    TestBlockJobCBData *data;
+
+    data = g_new0(TestBlockJobCBData, 1);
+    bs = bdrv_new();
+    s = block_job_create(&test_block_job_driver, bs, 0, test_block_job_cb,
+                         data, &error_abort);
+    s->iterations = iterations;
+    s->use_timer = use_timer;
+    s->rc = rc;
+    s->result = result;
+    s->common.co = qemu_coroutine_create(test_block_job_run);
+    data->job = s;
+    data->result = result;
+    qemu_coroutine_enter(s->common.co, s);
+    return &s->common;
+}
+
+static void test_single_job(int expected)
+{
+    BlockJob *job;
+    BlockJobTxn *txn;
+    int result = -EINPROGRESS;
+
+    txn = block_job_txn_new();
+    job = test_block_job_start(1, true, expected, &result);
+    block_job_txn_add_job(txn, job);
+
+    if (expected == -ECANCELED) {
+        block_job_cancel(job);
+    }
+
+    while (result == -EINPROGRESS) {
+        aio_poll(qemu_get_aio_context(), true);
+    }
+    g_assert_cmpint(result, ==, expected);
+
+    block_job_txn_unref(txn);
+}
+
+static void test_single_job_success(void)
+{
+    test_single_job(0);
+}
+
+static void test_single_job_failure(void)
+{
+    test_single_job(-EIO);
+}
+
+static void test_single_job_cancel(void)
+{
+    test_single_job(-ECANCELED);
+}
+
+static void test_pair_jobs(int expected1, int expected2)
+{
+    BlockJob *job1;
+    BlockJob *job2;
+    BlockJobTxn *txn;
+    int result1 = -EINPROGRESS;
+    int result2 = -EINPROGRESS;
+
+    txn = block_job_txn_new();
+    job1 = test_block_job_start(1, true, expected1, &result1);
+    block_job_txn_add_job(txn, job1);
+    job2 = test_block_job_start(2, true, expected2, &result2);
+    block_job_txn_add_job(txn, job2);
+
+    if (expected1 == -ECANCELED) {
+        block_job_cancel(job1);
+    }
+    if (expected2 == -ECANCELED) {
+        block_job_cancel(job2);
+    }
+
+    while (result1 == -EINPROGRESS || result2 == -EINPROGRESS) {
+        aio_poll(qemu_get_aio_context(), true);
+    }
+
+    /* Failure or cancellation of one job cancels the other job */
+    if (expected1 != 0) {
+        expected2 = -ECANCELED;
+    } else if (expected2 != 0) {
+        expected1 = -ECANCELED;
+    }
+
+    g_assert_cmpint(result1, ==, expected1);
+    g_assert_cmpint(result2, ==, expected2);
+
+    block_job_txn_unref(txn);
+}
+
+static void test_pair_jobs_success(void)
+{
+    test_pair_jobs(0, 0);
+}
+
+static void test_pair_jobs_failure(void)
+{
+    /* Test both orderings.  The two jobs run for a different number of
+     * iterations so the code path is different depending on which job fails
+     * first.
+     */
+    test_pair_jobs(-EIO, 0);
+    test_pair_jobs(0, -EIO);
+}
+
+static void test_pair_jobs_cancel(void)
+{
+    test_pair_jobs(-ECANCELED, 0);
+    test_pair_jobs(0, -ECANCELED);
+}
+
+static void test_pair_jobs_fail_cancel_race(void)
+{
+    BlockJob *job1;
+    BlockJob *job2;
+    BlockJobTxn *txn;
+    int result1 = -EINPROGRESS;
+    int result2 = -EINPROGRESS;
+
+    txn = block_job_txn_new();
+    job1 = test_block_job_start(1, true, -ECANCELED, &result1);
+    block_job_txn_add_job(txn, job1);
+    job2 = test_block_job_start(2, false, 0, &result2);
+    block_job_txn_add_job(txn, job2);
+
+    block_job_cancel(job1);
+
+    /* Now make job2 finish before the main loop kicks jobs.  This simulates
+     * the race between a pending kick and another job completing.
+     */
+    block_job_enter(job2);
+    block_job_enter(job2);
+
+    while (result1 == -EINPROGRESS || result2 == -EINPROGRESS) {
+        aio_poll(qemu_get_aio_context(), true);
+    }
+
+    g_assert_cmpint(result1, ==, -ECANCELED);
+    g_assert_cmpint(result2, ==, -ECANCELED);
+
+    block_job_txn_unref(txn);
+}
+
+int main(int argc, char **argv)
+{
+    qemu_init_main_loop(&error_abort);
+
+    g_test_init(&argc, &argv, NULL);
+    g_test_add_func("/single/success", test_single_job_success);
+    g_test_add_func("/single/failure", test_single_job_failure);
+    g_test_add_func("/single/cancel", test_single_job_cancel);
+    g_test_add_func("/pair/success", test_pair_jobs_success);
+    g_test_add_func("/pair/failure", test_pair_jobs_failure);
+    g_test_add_func("/pair/cancel", test_pair_jobs_cancel);
+    g_test_add_func("/pair/fail-cancel-race", test_pair_jobs_fail_cancel_race);
+    return g_test_run();
+}
diff --git a/tests/test-timed-average.c b/tests/test-timed-average.c
new file mode 100644
index 0000000000..a049799b80
--- /dev/null
+++ b/tests/test-timed-average.c
@@ -0,0 +1,90 @@
+/*
+ * Timed average computation tests
+ *
+ * Copyright Nodalink, EURL. 2014
+ *
+ * Authors:
+ *  Benoît Canet     <benoit.canet@nodalink.com>
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+
+#include <glib.h>
+#include <unistd.h>
+
+#include "qemu/timed-average.h"
+
+/* This is the clock for QEMU_CLOCK_VIRTUAL */
+static int64_t my_clock_value;
+
+int64_t cpu_get_clock(void)
+{
+    return my_clock_value;
+}
+
+static void account(TimedAverage *ta)
+{
+    timed_average_account(ta, 1);
+    timed_average_account(ta, 5);
+    timed_average_account(ta, 2);
+    timed_average_account(ta, 4);
+    timed_average_account(ta, 3);
+}
+
+static void test_average(void)
+{
+    TimedAverage ta;
+    uint64_t result;
+    int i;
+
+    /* we will compute some average on a period of 1 second */
+    timed_average_init(&ta, QEMU_CLOCK_VIRTUAL, NANOSECONDS_PER_SECOND);
+
+    result = timed_average_min(&ta);
+    g_assert(result == 0);
+    result = timed_average_avg(&ta);
+    g_assert(result == 0);
+    result = timed_average_max(&ta);
+    g_assert(result == 0);
+
+    for (i = 0; i < 100; i++) {
+        account(&ta);
+        result = timed_average_min(&ta);
+        g_assert(result == 1);
+        result = timed_average_avg(&ta);
+        g_assert(result == 3);
+        result = timed_average_max(&ta);
+        g_assert(result == 5);
+        my_clock_value += NANOSECONDS_PER_SECOND / 10;
+    }
+
+    my_clock_value += NANOSECONDS_PER_SECOND * 100;
+
+    result = timed_average_min(&ta);
+    g_assert(result == 0);
+    result = timed_average_avg(&ta);
+    g_assert(result == 0);
+    result = timed_average_max(&ta);
+    g_assert(result == 0);
+
+    for (i = 0; i < 100; i++) {
+        account(&ta);
+        result = timed_average_min(&ta);
+        g_assert(result == 1);
+        result = timed_average_avg(&ta);
+        g_assert(result == 3);
+        result = timed_average_max(&ta);
+        g_assert(result == 5);
+        my_clock_value += NANOSECONDS_PER_SECOND / 10;
+    }
+}
+
+int main(int argc, char **argv)
+{
+    /* tests in the same order as the header function declarations */
+    g_test_init(&argc, &argv, NULL);
+    g_test_add_func("/timed-average/average", test_average);
+    return g_test_run();
+}
+
diff --git a/util/Makefile.objs b/util/Makefile.objs
index d7cc39907f..89dd80ef86 100644
--- a/util/Makefile.objs
+++ b/util/Makefile.objs
@@ -29,3 +29,4 @@ util-obj-y += qemu-coroutine.o qemu-coroutine-lock.o qemu-coroutine-io.o
 util-obj-y += qemu-coroutine-sleep.o
 util-obj-y += coroutine-$(CONFIG_COROUTINE_BACKEND).o
 util-obj-y += buffer.o
+util-obj-y += timed-average.o
diff --git a/util/timed-average.c b/util/timed-average.c
new file mode 100644
index 0000000000..a2dfb4834d
--- /dev/null
+++ b/util/timed-average.c
@@ -0,0 +1,231 @@
+/*
+ * QEMU timed average computation
+ *
+ * Copyright (C) Nodalink, EURL. 2014
+ * Copyright (C) Igalia, S.L. 2015
+ *
+ * Authors:
+ *   Benoît Canet <benoit.canet@nodalink.com>
+ *   Alberto Garcia <berto@igalia.com>
+ *
+ * This program is free sofware: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Sofware Foundation, either version 2 of the License, or
+ * (at your option) version 3 or any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <string.h>
+
+#include "qemu/timed-average.h"
+
+/* This module computes an average of a set of values within a time
+ * window.
+ *
+ * Algorithm:
+ *
+ * - Create two windows with a certain expiration period, and
+ *   offsetted by period / 2.
+ * - Each time you want to account a new value, do it in both windows.
+ * - The minimum / maximum / average values are always returned from
+ *   the oldest window.
+ *
+ * Example:
+ *
+ *        t=0          |t=0.5           |t=1          |t=1.5            |t=2
+ *        wnd0: [0,0.5)|wnd0: [0.5,1.5) |             |wnd0: [1.5,2.5)  |
+ *        wnd1: [0,1)  |                |wnd1: [1,2)  |                 |
+ *
+ * Values are returned from:
+ *
+ *        wnd0---------|wnd1------------|wnd0---------|wnd1-------------|
+ */
+
+/* Update the expiration of a time window
+ *
+ * @w:      the window used
+ * @now:    the current time in nanoseconds
+ * @period: the expiration period in nanoseconds
+ */
+static void update_expiration(TimedAverageWindow *w, int64_t now,
+                              int64_t period)
+{
+    /* time elapsed since the last theoretical expiration */
+    int64_t elapsed = (now - w->expiration) % period;
+    /* time remaininging until the next expiration */
+    int64_t remaining = period - elapsed;
+    /* compute expiration */
+    w->expiration = now + remaining;
+}
+
+/* Reset a window
+ *
+ * @w: the window to reset
+ */
+static void window_reset(TimedAverageWindow *w)
+{
+    w->min = UINT64_MAX;
+    w->max = 0;
+    w->sum = 0;
+    w->count = 0;
+}
+
+/* Get the current window (that is, the one with the earliest
+ * expiration time).
+ *
+ * @ta:  the TimedAverage structure
+ * @ret: a pointer to the current window
+ */
+static TimedAverageWindow *current_window(TimedAverage *ta)
+{
+     return &ta->windows[ta->current];
+}
+
+/* Initialize a TimedAverage structure
+ *
+ * @ta:         the TimedAverage structure
+ * @clock_type: the type of clock to use
+ * @period:     the time window period in nanoseconds
+ */
+void timed_average_init(TimedAverage *ta, QEMUClockType clock_type,
+                        uint64_t period)
+{
+    int64_t now = qemu_clock_get_ns(clock_type);
+
+    /* Returned values are from the oldest window, so they belong to
+     * the interval [ta->period/2,ta->period). By adjusting the
+     * requested period by 4/3, we guarantee that they're in the
+     * interval [2/3 period,4/3 period), closer to the requested
+     * period on average */
+    ta->period = (uint64_t) period * 4 / 3;
+    ta->clock_type = clock_type;
+    ta->current = 0;
+
+    window_reset(&ta->windows[0]);
+    window_reset(&ta->windows[1]);
+
+    /* Both windows are offsetted by half a period */
+    ta->windows[0].expiration = now + ta->period / 2;
+    ta->windows[1].expiration = now + ta->period;
+}
+
+/* Check if the time windows have expired, updating their counters and
+ * expiration time if that's the case.
+ *
+ * @ta: the TimedAverage structure
+ * @elapsed: if non-NULL, the elapsed time (in ns) within the current
+ *           window will be stored here
+ */
+static void check_expirations(TimedAverage *ta, uint64_t *elapsed)
+{
+    int64_t now = qemu_clock_get_ns(ta->clock_type);
+    int i;
+
+    assert(ta->period != 0);
+
+    /* Check if the windows have expired */
+    for (i = 0; i < 2; i++) {
+        TimedAverageWindow *w = &ta->windows[i];
+        if (w->expiration <= now) {
+            window_reset(w);
+            update_expiration(w, now, ta->period);
+        }
+    }
+
+    /* Make ta->current point to the oldest window */
+    if (ta->windows[0].expiration < ta->windows[1].expiration) {
+        ta->current = 0;
+    } else {
+        ta->current = 1;
+    }
+
+    /* Calculate the elapsed time within the current window */
+    if (elapsed) {
+        int64_t remaining = ta->windows[ta->current].expiration - now;
+        *elapsed = ta->period - remaining;
+    }
+}
+
+/* Account a value
+ *
+ * @ta:    the TimedAverage structure
+ * @value: the value to account
+ */
+void timed_average_account(TimedAverage *ta, uint64_t value)
+{
+    int i;
+    check_expirations(ta, NULL);
+
+    /* Do the accounting in both windows at the same time */
+    for (i = 0; i < 2; i++) {
+        TimedAverageWindow *w = &ta->windows[i];
+
+        w->sum += value;
+        w->count++;
+
+        if (value < w->min) {
+            w->min = value;
+        }
+
+        if (value > w->max) {
+            w->max = value;
+        }
+    }
+}
+
+/* Get the minimum value
+ *
+ * @ta:  the TimedAverage structure
+ * @ret: the minimum value
+ */
+uint64_t timed_average_min(TimedAverage *ta)
+{
+    TimedAverageWindow *w;
+    check_expirations(ta, NULL);
+    w = current_window(ta);
+    return w->min < UINT64_MAX ? w->min : 0;
+}
+
+/* Get the average value
+ *
+ * @ta:  the TimedAverage structure
+ * @ret: the average value
+ */
+uint64_t timed_average_avg(TimedAverage *ta)
+{
+    TimedAverageWindow *w;
+    check_expirations(ta, NULL);
+    w = current_window(ta);
+    return w->count > 0 ? w->sum / w->count : 0;
+}
+
+/* Get the maximum value
+ *
+ * @ta:  the TimedAverage structure
+ * @ret: the maximum value
+ */
+uint64_t timed_average_max(TimedAverage *ta)
+{
+    check_expirations(ta, NULL);
+    return current_window(ta)->max;
+}
+
+/* Get the sum of all accounted values
+ * @ta:      the TimedAverage structure
+ * @elapsed: if non-NULL, the elapsed time (in ns) will be stored here
+ * @ret:     the sum of all accounted values
+ */
+uint64_t timed_average_sum(TimedAverage *ta, uint64_t *elapsed)
+{
+    TimedAverageWindow *w;
+    check_expirations(ta, elapsed);
+    w = current_window(ta);
+    return w->sum;
+}