summary refs log tree commit diff stats
diff options
context:
space:
mode:
authorAnthony Liguori <aliguori@us.ibm.com>2013-03-11 08:30:34 -0500
committerAnthony Liguori <aliguori@us.ibm.com>2013-03-11 08:30:34 -0500
commitfe3cc14fd83e0c8f376d849ccd0fc3433388442d (patch)
tree8a354b91eab8a1ac16b4186f9fbb2a75772b75fe
parentbba18e23f7266d63734fd31045fec7794cc34a38 (diff)
parentee0b44aa9d9450e873a761ca2030b2fa3ec52eb0 (diff)
downloadfocaccia-qemu-fe3cc14fd83e0c8f376d849ccd0fc3433388442d.tar.gz
focaccia-qemu-fe3cc14fd83e0c8f376d849ccd0fc3433388442d.zip
Merge remote-tracking branch 'quintela/migration.next' into staging
# By Paolo Bonzini (40) and others
# Via Juan Quintela
* quintela/migration.next: (46 commits)
  page_cache: dup memory on insert
  page_cache: fix memory leak
  Fix cache_resize to keep old entry age
  Fix page_cache leak in cache_resize
  migration: inline migrate_fd_close
  migration: eliminate s->migration_file
  migration: move contents of migration_close to migrate_fd_cleanup
  migration: move rate limiting to QEMUFile
  migration: small changes around rate-limiting
  migration: use qemu_ftell to compute bandwidth
  migration: use QEMUFile for writing outgoing migration data
  migration: use QEMUFile for migration channel lifetime
  qemu-file: simplify and export qemu_ftell
  qemu-file: add writable socket QEMUFile
  qemu-file: check exit status when closing a pipe QEMUFile
  qemu-file: fsync a writable stdio QEMUFile
  migration: merge qemu_popen_cmd with qemu_popen
  migration: use qemu_file_rate_limit consistently
  migration: remove useless qemu_file_get_error check
  migration: detect error before sleeping
  ...
-rw-r--r--arch_init.c17
-rw-r--r--block-migration.c167
-rw-r--r--docs/migration.txt20
-rw-r--r--include/migration/migration.h12
-rw-r--r--include/migration/page_cache.h3
-rw-r--r--include/migration/qemu-file.h21
-rw-r--r--include/migration/vmstate.h21
-rw-r--r--include/qemu/atomic.h1
-rw-r--r--include/qemu/osdep.h7
-rw-r--r--include/sysemu/sysemu.h6
-rw-r--r--migration-exec.c39
-rw-r--r--migration-fd.c47
-rw-r--r--migration-tcp.c33
-rw-r--r--migration-unix.c33
-rw-r--r--migration.c357
-rw-r--r--page_cache.c25
-rw-r--r--savevm.c214
-rw-r--r--trace-events3
18 files changed, 402 insertions, 624 deletions
diff --git a/arch_init.c b/arch_init.c
index 8daeafaf5c..98e2bc6f55 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -293,8 +293,7 @@ static int save_xbzrle_page(QEMUFile *f, uint8_t *current_data,
 
     if (!cache_is_cached(XBZRLE.cache, current_addr)) {
         if (!last_stage) {
-            cache_insert(XBZRLE.cache, current_addr,
-                         g_memdup(current_data, TARGET_PAGE_SIZE));
+            cache_insert(XBZRLE.cache, current_addr, current_data);
         }
         acct_info.xbzrle_cache_miss++;
         return -1;
@@ -379,6 +378,8 @@ static inline bool migration_bitmap_set_dirty(MemoryRegion *mr,
     return ret;
 }
 
+/* Needs iothread lock! */
+
 static void migration_bitmap_sync(void)
 {
     RAMBlock *block;
@@ -568,10 +569,6 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
     bitmap_set(migration_bitmap, 0, ram_pages);
     migration_dirty_pages = ram_pages;
 
-    qemu_mutex_lock_ramlist();
-    bytes_transferred = 0;
-    reset_ram_globals();
-
     if (migrate_use_xbzrle()) {
         XBZRLE.cache = cache_init(migrate_xbzrle_cache_size() /
                                   TARGET_PAGE_SIZE,
@@ -585,8 +582,14 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
         acct_clear();
     }
 
+    qemu_mutex_lock_iothread();
+    qemu_mutex_lock_ramlist();
+    bytes_transferred = 0;
+    reset_ram_globals();
+
     memory_global_dirty_log_start();
     migration_bitmap_sync();
+    qemu_mutex_unlock_iothread();
 
     qemu_put_be64(f, ram_bytes_total() | RAM_SAVE_FLAG_MEM_SIZE);
 
@@ -690,7 +693,9 @@ static uint64_t ram_save_pending(QEMUFile *f, void *opaque, uint64_t max_size)
     remaining_size = ram_save_remaining() * TARGET_PAGE_SIZE;
 
     if (remaining_size < max_size) {
+        qemu_mutex_lock_iothread();
         migration_bitmap_sync();
+        qemu_mutex_unlock_iothread();
         remaining_size = ram_save_remaining() * TARGET_PAGE_SIZE;
     }
     return remaining_size;
diff --git a/block-migration.c b/block-migration.c
index 43ab2028c0..2fd7699794 100644
--- a/block-migration.c
+++ b/block-migration.c
@@ -43,19 +43,24 @@
 #endif
 
 typedef struct BlkMigDevState {
+    /* Written during setup phase.  Can be read without a lock.  */
     BlockDriverState *bs;
-    int bulk_completed;
     int shared_base;
-    int64_t cur_sector;
-    int64_t cur_dirty;
-    int64_t completed_sectors;
     int64_t total_sectors;
-    int64_t dirty;
     QSIMPLEQ_ENTRY(BlkMigDevState) entry;
+
+    /* Only used by migration thread.  Does not need a lock.  */
+    int bulk_completed;
+    int64_t cur_sector;
+    int64_t cur_dirty;
+
+    /* Protected by block migration lock.  */
     unsigned long *aio_bitmap;
+    int64_t completed_sectors;
 } BlkMigDevState;
 
 typedef struct BlkMigBlock {
+    /* Only used by migration thread.  */
     uint8_t *buf;
     BlkMigDevState *bmds;
     int64_t sector;
@@ -63,26 +68,49 @@ typedef struct BlkMigBlock {
     struct iovec iov;
     QEMUIOVector qiov;
     BlockDriverAIOCB *aiocb;
+
+    /* Protected by block migration lock.  */
     int ret;
     QSIMPLEQ_ENTRY(BlkMigBlock) entry;
 } BlkMigBlock;
 
 typedef struct BlkMigState {
+    /* Written during setup phase.  Can be read without a lock.  */
     int blk_enable;
     int shared_base;
     QSIMPLEQ_HEAD(bmds_list, BlkMigDevState) bmds_list;
+    int64_t total_sector_sum;
+
+    /* Protected by lock.  */
     QSIMPLEQ_HEAD(blk_list, BlkMigBlock) blk_list;
     int submitted;
     int read_done;
+
+    /* Only used by migration thread.  Does not need a lock.  */
     int transferred;
-    int64_t total_sector_sum;
     int prev_progress;
     int bulk_completed;
-    long double prev_time_offset;
+
+    /* Lock must be taken _inside_ the iothread lock.  */
+    QemuMutex lock;
 } BlkMigState;
 
 static BlkMigState block_mig_state;
 
+static void blk_mig_lock(void)
+{
+    qemu_mutex_lock(&block_mig_state.lock);
+}
+
+static void blk_mig_unlock(void)
+{
+    qemu_mutex_unlock(&block_mig_state.lock);
+}
+
+/* Must run outside of the iothread lock during the bulk phase,
+ * or the VM will stall.
+ */
+
 static void blk_send(QEMUFile *f, BlkMigBlock * blk)
 {
     int len;
@@ -109,9 +137,11 @@ uint64_t blk_mig_bytes_transferred(void)
     BlkMigDevState *bmds;
     uint64_t sum = 0;
 
+    blk_mig_lock();
     QSIMPLEQ_FOREACH(bmds, &block_mig_state.bmds_list, entry) {
         sum += bmds->completed_sectors;
     }
+    blk_mig_unlock();
     return sum << BDRV_SECTOR_BITS;
 }
 
@@ -131,6 +161,9 @@ uint64_t blk_mig_bytes_total(void)
     return sum << BDRV_SECTOR_BITS;
 }
 
+
+/* Called with migration lock held.  */
+
 static int bmds_aio_inflight(BlkMigDevState *bmds, int64_t sector)
 {
     int64_t chunk = sector / (int64_t)BDRV_SECTORS_PER_DIRTY_CHUNK;
@@ -143,6 +176,8 @@ static int bmds_aio_inflight(BlkMigDevState *bmds, int64_t sector)
     }
 }
 
+/* Called with migration lock held.  */
+
 static void bmds_set_aio_inflight(BlkMigDevState *bmds, int64_t sector_num,
                              int nb_sectors, int set)
 {
@@ -177,23 +212,26 @@ static void alloc_aio_bitmap(BlkMigDevState *bmds)
     bmds->aio_bitmap = g_malloc0(bitmap_size);
 }
 
+/* Never hold migration lock when yielding to the main loop!  */
+
 static void blk_mig_read_cb(void *opaque, int ret)
 {
-    long double curr_time = qemu_get_clock_ns(rt_clock);
     BlkMigBlock *blk = opaque;
 
+    blk_mig_lock();
     blk->ret = ret;
 
-    block_mig_state.prev_time_offset = curr_time;
-
     QSIMPLEQ_INSERT_TAIL(&block_mig_state.blk_list, blk, entry);
     bmds_set_aio_inflight(blk->bmds, blk->sector, blk->nr_sectors, 0);
 
     block_mig_state.submitted--;
     block_mig_state.read_done++;
     assert(block_mig_state.submitted >= 0);
+    blk_mig_unlock();
 }
 
+/* Called with no lock taken.  */
+
 static int mig_save_device_bulk(QEMUFile *f, BlkMigDevState *bmds)
 {
     int64_t total_sectors = bmds->total_sectors;
@@ -203,11 +241,13 @@ static int mig_save_device_bulk(QEMUFile *f, BlkMigDevState *bmds)
     int nr_sectors;
 
     if (bmds->shared_base) {
+        qemu_mutex_lock_iothread();
         while (cur_sector < total_sectors &&
                !bdrv_is_allocated(bs, cur_sector, MAX_IS_ALLOCATED_SEARCH,
                                   &nr_sectors)) {
             cur_sector += nr_sectors;
         }
+        qemu_mutex_unlock_iothread();
     }
 
     if (cur_sector >= total_sectors) {
@@ -236,20 +276,23 @@ static int mig_save_device_bulk(QEMUFile *f, BlkMigDevState *bmds)
     blk->iov.iov_len = nr_sectors * BDRV_SECTOR_SIZE;
     qemu_iovec_init_external(&blk->qiov, &blk->iov, 1);
 
-    if (block_mig_state.submitted == 0) {
-        block_mig_state.prev_time_offset = qemu_get_clock_ns(rt_clock);
-    }
+    blk_mig_lock();
+    block_mig_state.submitted++;
+    blk_mig_unlock();
 
+    qemu_mutex_lock_iothread();
     blk->aiocb = bdrv_aio_readv(bs, cur_sector, &blk->qiov,
                                 nr_sectors, blk_mig_read_cb, blk);
-    block_mig_state.submitted++;
 
     bdrv_reset_dirty(bs, cur_sector, nr_sectors);
-    bmds->cur_sector = cur_sector + nr_sectors;
+    qemu_mutex_unlock_iothread();
 
+    bmds->cur_sector = cur_sector + nr_sectors;
     return (bmds->cur_sector >= total_sectors);
 }
 
+/* Called with iothread lock taken.  */
+
 static void set_dirty_tracking(int enable)
 {
     BlkMigDevState *bmds;
@@ -305,6 +348,8 @@ static void init_blk_migration(QEMUFile *f)
     bdrv_iterate(init_blk_migration_it, NULL);
 }
 
+/* Called with no lock taken.  */
+
 static int blk_mig_save_bulked_block(QEMUFile *f)
 {
     int64_t completed_sector_sum = 0;
@@ -351,6 +396,8 @@ static void blk_mig_reset_dirty_cursor(void)
     }
 }
 
+/* Called with iothread lock taken.  */
+
 static int mig_save_device_dirty(QEMUFile *f, BlkMigDevState *bmds,
                                  int is_async)
 {
@@ -361,8 +408,12 @@ static int mig_save_device_dirty(QEMUFile *f, BlkMigDevState *bmds,
     int ret = -EIO;
 
     for (sector = bmds->cur_dirty; sector < bmds->total_sectors;) {
+        blk_mig_lock();
         if (bmds_aio_inflight(bmds, sector)) {
+            blk_mig_unlock();
             bdrv_drain_all();
+        } else {
+            blk_mig_unlock();
         }
         if (bdrv_get_dirty(bmds->bs, sector)) {
 
@@ -382,14 +433,13 @@ static int mig_save_device_dirty(QEMUFile *f, BlkMigDevState *bmds,
                 blk->iov.iov_len = nr_sectors * BDRV_SECTOR_SIZE;
                 qemu_iovec_init_external(&blk->qiov, &blk->iov, 1);
 
-                if (block_mig_state.submitted == 0) {
-                    block_mig_state.prev_time_offset = qemu_get_clock_ns(rt_clock);
-                }
-
                 blk->aiocb = bdrv_aio_readv(bmds->bs, sector, &blk->qiov,
                                             nr_sectors, blk_mig_read_cb, blk);
+
+                blk_mig_lock();
                 block_mig_state.submitted++;
                 bmds_set_aio_inflight(bmds, sector, nr_sectors, 1);
+                blk_mig_unlock();
             } else {
                 ret = bdrv_read(bmds->bs, sector, blk->buf, nr_sectors);
                 if (ret < 0) {
@@ -417,7 +467,9 @@ error:
     return ret;
 }
 
-/* return value:
+/* Called with iothread lock taken.
+ *
+ * return value:
  * 0: too much data for max_downtime
  * 1: few enough data for max_downtime
 */
@@ -436,6 +488,8 @@ static int blk_mig_save_dirty_block(QEMUFile *f, int is_async)
     return ret;
 }
 
+/* Called with no locks taken.  */
+
 static int flush_blks(QEMUFile *f)
 {
     BlkMigBlock *blk;
@@ -445,6 +499,7 @@ static int flush_blks(QEMUFile *f)
             __FUNCTION__, block_mig_state.submitted, block_mig_state.read_done,
             block_mig_state.transferred);
 
+    blk_mig_lock();
     while ((blk = QSIMPLEQ_FIRST(&block_mig_state.blk_list)) != NULL) {
         if (qemu_file_rate_limit(f)) {
             break;
@@ -453,9 +508,12 @@ static int flush_blks(QEMUFile *f)
             ret = blk->ret;
             break;
         }
-        blk_send(f, blk);
 
         QSIMPLEQ_REMOVE_HEAD(&block_mig_state.blk_list, entry);
+        blk_mig_unlock();
+        blk_send(f, blk);
+        blk_mig_lock();
+
         g_free(blk->buf);
         g_free(blk);
 
@@ -463,6 +521,7 @@ static int flush_blks(QEMUFile *f)
         block_mig_state.transferred++;
         assert(block_mig_state.read_done >= 0);
     }
+    blk_mig_unlock();
 
     DPRINTF("%s Exit submitted %d read_done %d transferred %d\n", __FUNCTION__,
             block_mig_state.submitted, block_mig_state.read_done,
@@ -470,6 +529,8 @@ static int flush_blks(QEMUFile *f)
     return ret;
 }
 
+/* Called with iothread lock taken.  */
+
 static int64_t get_remaining_dirty(void)
 {
     BlkMigDevState *bmds;
@@ -482,6 +543,8 @@ static int64_t get_remaining_dirty(void)
     return dirty << BDRV_SECTOR_BITS;
 }
 
+/* Called with iothread lock taken.  */
+
 static void blk_mig_cleanup(void)
 {
     BlkMigDevState *bmds;
@@ -491,6 +554,7 @@ static void blk_mig_cleanup(void)
 
     set_dirty_tracking(0);
 
+    blk_mig_lock();
     while ((bmds = QSIMPLEQ_FIRST(&block_mig_state.bmds_list)) != NULL) {
         QSIMPLEQ_REMOVE_HEAD(&block_mig_state.bmds_list, entry);
         bdrv_set_in_use(bmds->bs, 0);
@@ -504,6 +568,7 @@ static void blk_mig_cleanup(void)
         g_free(blk->buf);
         g_free(blk);
     }
+    blk_mig_unlock();
 }
 
 static void block_migration_cancel(void *opaque)
@@ -518,22 +583,18 @@ static int block_save_setup(QEMUFile *f, void *opaque)
     DPRINTF("Enter save live setup submitted %d transferred %d\n",
             block_mig_state.submitted, block_mig_state.transferred);
 
+    qemu_mutex_lock_iothread();
     init_blk_migration(f);
 
     /* start track dirty blocks */
     set_dirty_tracking(1);
+    qemu_mutex_unlock_iothread();
 
     ret = flush_blks(f);
-    if (ret) {
-        blk_mig_cleanup();
-        return ret;
-    }
-
     blk_mig_reset_dirty_cursor();
-
     qemu_put_be64(f, BLK_MIG_FLAG_EOS);
 
-    return 0;
+    return ret;
 }
 
 static int block_save_iterate(QEMUFile *f, void *opaque)
@@ -546,46 +607,54 @@ static int block_save_iterate(QEMUFile *f, void *opaque)
 
     ret = flush_blks(f);
     if (ret) {
-        blk_mig_cleanup();
         return ret;
     }
 
     blk_mig_reset_dirty_cursor();
 
     /* control the rate of transfer */
+    blk_mig_lock();
     while ((block_mig_state.submitted +
             block_mig_state.read_done) * BLOCK_SIZE <
            qemu_file_get_rate_limit(f)) {
+        blk_mig_unlock();
         if (block_mig_state.bulk_completed == 0) {
             /* first finish the bulk phase */
             if (blk_mig_save_bulked_block(f) == 0) {
                 /* finished saving bulk on all devices */
                 block_mig_state.bulk_completed = 1;
             }
+            ret = 0;
         } else {
+            /* Always called with iothread lock taken for
+             * simplicity, block_save_complete also calls it.
+             */
+            qemu_mutex_lock_iothread();
             ret = blk_mig_save_dirty_block(f, 1);
-            if (ret != 0) {
-                /* no more dirty blocks */
-                break;
-            }
+            qemu_mutex_unlock_iothread();
+        }
+        if (ret < 0) {
+            return ret;
+        }
+        blk_mig_lock();
+        if (ret != 0) {
+            /* no more dirty blocks */
+            break;
         }
     }
-    if (ret < 0) {
-        blk_mig_cleanup();
-        return ret;
-    }
+    blk_mig_unlock();
 
     ret = flush_blks(f);
     if (ret) {
-        blk_mig_cleanup();
         return ret;
     }
 
     qemu_put_be64(f, BLK_MIG_FLAG_EOS);
-
     return qemu_ftell(f) - last_ftell;
 }
 
+/* Called with iothread lock taken.  */
+
 static int block_save_complete(QEMUFile *f, void *opaque)
 {
     int ret;
@@ -595,7 +664,6 @@ static int block_save_complete(QEMUFile *f, void *opaque)
 
     ret = flush_blks(f);
     if (ret) {
-        blk_mig_cleanup();
         return ret;
     }
 
@@ -603,16 +671,17 @@ static int block_save_complete(QEMUFile *f, void *opaque)
 
     /* we know for sure that save bulk is completed and
        all async read completed */
+    blk_mig_lock();
     assert(block_mig_state.submitted == 0);
+    blk_mig_unlock();
 
     do {
         ret = blk_mig_save_dirty_block(f, 0);
+        if (ret < 0) {
+            return ret;
+        }
     } while (ret == 0);
 
-    blk_mig_cleanup();
-    if (ret < 0) {
-        return ret;
-    }
     /* report completion */
     qemu_put_be64(f, (100 << BDRV_SECTOR_BITS) | BLK_MIG_FLAG_PROGRESS);
 
@@ -620,13 +689,18 @@ static int block_save_complete(QEMUFile *f, void *opaque)
 
     qemu_put_be64(f, BLK_MIG_FLAG_EOS);
 
+    blk_mig_cleanup();
     return 0;
 }
 
 static uint64_t block_save_pending(QEMUFile *f, void *opaque, uint64_t max_size)
 {
     /* Estimate pending number of bytes to send */
-    uint64_t pending = get_remaining_dirty() +
+    uint64_t pending;
+
+    qemu_mutex_lock_iothread();
+    blk_mig_lock();
+    pending = get_remaining_dirty() +
                        block_mig_state.submitted * BLOCK_SIZE +
                        block_mig_state.read_done * BLOCK_SIZE;
 
@@ -634,6 +708,8 @@ static uint64_t block_save_pending(QEMUFile *f, void *opaque, uint64_t max_size)
     if (pending == 0 && !block_mig_state.bulk_completed) {
         pending = BLOCK_SIZE;
     }
+    blk_mig_unlock();
+    qemu_mutex_unlock_iothread();
 
     DPRINTF("Enter save live pending  %" PRIu64 "\n", pending);
     return pending;
@@ -745,6 +821,7 @@ void blk_mig_init(void)
 {
     QSIMPLEQ_INIT(&block_mig_state.bmds_list);
     QSIMPLEQ_INIT(&block_mig_state.blk_list);
+    qemu_mutex_init(&block_mig_state.lock);
 
     register_savevm_live(NULL, "block", 0, 1, &savevm_block_handlers,
                          &block_mig_state);
diff --git a/docs/migration.txt b/docs/migration.txt
index f3ddd2f1a8..0719a55002 100644
--- a/docs/migration.txt
+++ b/docs/migration.txt
@@ -55,10 +55,7 @@ QEMUFile with:
 QEMUFile *qemu_fopen_ops(void *opaque,
                          QEMUFilePutBufferFunc *put_buffer,
                          QEMUFileGetBufferFunc *get_buffer,
-                         QEMUFileCloseFunc *close,
-                         QEMUFileRateLimit *rate_limit,
-                         QEMUFileSetRateLimit *set_rate_limit,
-                         QEMUFileGetRateLimit *get_rate_limit);
+                         QEMUFileCloseFunc *close);
 
 The functions have the following functionality:
 
@@ -80,24 +77,9 @@ Close a file and return an error code.
 
 typedef int (QEMUFileCloseFunc)(void *opaque);
 
-Called to determine if the file has exceeded its bandwidth allocation.  The
-bandwidth capping is a soft limit, not a hard limit.
-
-typedef int (QEMUFileRateLimit)(void *opaque);
-
-Called to change the current bandwidth allocation. This function must return
-the new actual bandwidth. It should be new_rate if everything goes OK, and
-the old rate otherwise.
-
-typedef size_t (QEMUFileSetRateLimit)(void *opaque, size_t new_rate);
-typedef size_t (QEMUFileGetRateLimit)(void *opaque);
-
 You can use any internal state that you need using the opaque void *
 pointer that is passed to all functions.
 
-The rate limiting functions are used to limit the bandwidth used by
-QEMU migration.
-
 The important functions for us are put_buffer()/get_buffer() that
 allow to write/read a buffer into the QEMUFile.
 
diff --git a/include/migration/migration.h b/include/migration/migration.h
index d1214097fe..bb617fdacf 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -34,18 +34,11 @@ struct MigrationState
     int64_t bandwidth_limit;
     size_t bytes_xfer;
     size_t xfer_limit;
-    uint8_t *buffer;
-    size_t buffer_size;
-    size_t buffer_capacity;
     QemuThread thread;
-
+    QEMUBH *cleanup_bh;
     QEMUFile *file;
-    int fd;
+
     int state;
-    int (*get_error)(MigrationState *s);
-    int (*close)(MigrationState *s);
-    int (*write)(MigrationState *s, const void *buff, size_t size);
-    void *opaque;
     MigrationParams params;
     int64_t total_time;
     int64_t downtime;
@@ -54,7 +47,6 @@ struct MigrationState
     int64_t dirty_bytes_rate;
     bool enabled_capabilities[MIGRATION_CAPABILITY_MAX];
     int64_t xbzrle_cache_size;
-    bool complete;
 };
 
 void process_incoming_migration(QEMUFile *f);
diff --git a/include/migration/page_cache.h b/include/migration/page_cache.h
index 3839ac7726..87894fea9f 100644
--- a/include/migration/page_cache.h
+++ b/include/migration/page_cache.h
@@ -57,7 +57,8 @@ bool cache_is_cached(const PageCache *cache, uint64_t addr);
 uint8_t *get_cached_data(const PageCache *cache, uint64_t addr);
 
 /**
- * cache_insert: insert the page into the cache. the previous value will be overwritten
+ * cache_insert: insert the page into the cache. the page cache
+ * will dup the data on insert. the previous value will be overwritten
  *
  * @cache pointer to the PageCache struct
  * @addr: page address
diff --git a/include/migration/qemu-file.h b/include/migration/qemu-file.h
index 46fc11dc99..df812617f8 100644
--- a/include/migration/qemu-file.h
+++ b/include/migration/qemu-file.h
@@ -51,33 +51,17 @@ typedef int (QEMUFileCloseFunc)(void *opaque);
  */
 typedef int (QEMUFileGetFD)(void *opaque);
 
-/* Called to determine if the file has exceeded its bandwidth allocation.  The
- * bandwidth capping is a soft limit, not a hard limit.
- */
-typedef int (QEMUFileRateLimit)(void *opaque);
-
-/* Called to change the current bandwidth allocation. This function must return
- * the new actual bandwidth. It should be new_rate if everything goes ok, and
- * the old rate otherwise
- */
-typedef int64_t (QEMUFileSetRateLimit)(void *opaque, int64_t new_rate);
-typedef int64_t (QEMUFileGetRateLimit)(void *opaque);
-
 typedef struct QEMUFileOps {
     QEMUFilePutBufferFunc *put_buffer;
     QEMUFileGetBufferFunc *get_buffer;
     QEMUFileCloseFunc *close;
     QEMUFileGetFD *get_fd;
-    QEMUFileRateLimit *rate_limit;
-    QEMUFileSetRateLimit *set_rate_limit;
-    QEMUFileGetRateLimit *get_rate_limit;
 } QEMUFileOps;
 
 QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops);
 QEMUFile *qemu_fopen(const char *filename, const char *mode);
 QEMUFile *qemu_fdopen(int fd, const char *mode);
-QEMUFile *qemu_fopen_socket(int fd);
-QEMUFile *qemu_popen(FILE *popen_file, const char *mode);
+QEMUFile *qemu_fopen_socket(int fd, const char *mode);
 QEMUFile *qemu_popen_cmd(const char *command, const char *mode);
 int qemu_get_fd(QEMUFile *f);
 int qemu_fclose(QEMUFile *f);
@@ -110,7 +94,8 @@ unsigned int qemu_get_be32(QEMUFile *f);
 uint64_t qemu_get_be64(QEMUFile *f);
 
 int qemu_file_rate_limit(QEMUFile *f);
-int64_t qemu_file_set_rate_limit(QEMUFile *f, int64_t new_rate);
+void qemu_file_reset_rate_limit(QEMUFile *f);
+void qemu_file_set_rate_limit(QEMUFile *f, int64_t new_rate);
 int64_t qemu_file_get_rate_limit(QEMUFile *f);
 int qemu_file_get_error(QEMUFile *f);
 
diff --git a/include/migration/vmstate.h b/include/migration/vmstate.h
index 94a409b708..a64db941bc 100644
--- a/include/migration/vmstate.h
+++ b/include/migration/vmstate.h
@@ -32,15 +32,28 @@ typedef void SaveStateHandler(QEMUFile *f, void *opaque);
 typedef int LoadStateHandler(QEMUFile *f, void *opaque, int version_id);
 
 typedef struct SaveVMHandlers {
+    /* This runs inside the iothread lock.  */
     void (*set_params)(const MigrationParams *params, void * opaque);
     SaveStateHandler *save_state;
-    int (*save_live_setup)(QEMUFile *f, void *opaque);
-    int (*save_live_iterate)(QEMUFile *f, void *opaque);
+
+    void (*cancel)(void *opaque);
     int (*save_live_complete)(QEMUFile *f, void *opaque);
+
+    /* This runs both outside and inside the iothread lock.  */
+    bool (*is_active)(void *opaque);
+
+    /* This runs outside the iothread lock in the migration case, and
+     * within the lock in the savevm case.  The callback had better only
+     * use data that is local to the migration thread or protected
+     * by other locks.
+     */
+    int (*save_live_iterate)(QEMUFile *f, void *opaque);
+
+    /* This runs outside the iothread lock!  */
+    int (*save_live_setup)(QEMUFile *f, void *opaque);
     uint64_t (*save_live_pending)(QEMUFile *f, void *opaque, uint64_t max_size);
-    void (*cancel)(void *opaque);
+
     LoadStateHandler *load_state;
-    bool (*is_active)(void *opaque);
 } SaveVMHandlers;
 
 int register_savevm(DeviceState *dev,
diff --git a/include/qemu/atomic.h b/include/qemu/atomic.h
index 96a194bbee..10becb6101 100644
--- a/include/qemu/atomic.h
+++ b/include/qemu/atomic.h
@@ -16,6 +16,7 @@
  */
 #define smp_wmb()   barrier()
 #define smp_rmb()   barrier()
+
 /*
  * We use GCC builtin if it's available, as that can use
  * mfence on 32 bit as well, e.g. if built with -march=pentium-m.
diff --git a/include/qemu/osdep.h b/include/qemu/osdep.h
index 87d3b9cfa8..df244006c7 100644
--- a/include/qemu/osdep.h
+++ b/include/qemu/osdep.h
@@ -9,6 +9,13 @@
 #include <sys/signal.h>
 #endif
 
+#ifndef _WIN32
+#include <sys/wait.h>
+#else
+#define WIFEXITED(x)   1
+#define WEXITSTATUS(x) (x)
+#endif
+
 #include <sys/time.h>
 
 #if defined(CONFIG_SOLARIS) && CONFIG_SOLARIS_VERSION < 10
diff --git a/include/sysemu/sysemu.h b/include/sysemu/sysemu.h
index b19ec952b4..6578782fc3 100644
--- a/include/sysemu/sysemu.h
+++ b/include/sysemu/sysemu.h
@@ -73,10 +73,10 @@ void do_info_snapshots(Monitor *mon, const QDict *qdict);
 void qemu_announce_self(void);
 
 bool qemu_savevm_state_blocked(Error **errp);
-int qemu_savevm_state_begin(QEMUFile *f,
-                            const MigrationParams *params);
+void qemu_savevm_state_begin(QEMUFile *f,
+                             const MigrationParams *params);
 int qemu_savevm_state_iterate(QEMUFile *f);
-int qemu_savevm_state_complete(QEMUFile *f);
+void qemu_savevm_state_complete(QEMUFile *f);
 void qemu_savevm_state_cancel(void);
 uint64_t qemu_savevm_state_pending(QEMUFile *f, uint64_t max_size);
 int qemu_loadvm_state(QEMUFile *f);
diff --git a/migration-exec.c b/migration-exec.c
index a051a6e668..deab4e378e 100644
--- a/migration-exec.c
+++ b/migration-exec.c
@@ -33,49 +33,14 @@
     do { } while (0)
 #endif
 
-static int file_errno(MigrationState *s)
-{
-    return errno;
-}
-
-static int file_write(MigrationState *s, const void * buf, size_t size)
-{
-    return write(s->fd, buf, size);
-}
-
-static int exec_close(MigrationState *s)
-{
-    int ret = 0;
-    DPRINTF("exec_close\n");
-    ret = qemu_fclose(s->opaque);
-    s->opaque = NULL;
-    s->fd = -1;
-    if (ret >= 0 && !(WIFEXITED(ret) && WEXITSTATUS(ret) == 0)) {
-        /* close succeeded, but non-zero exit code: */
-        ret = -EIO; /* fake errno value */
-    }
-    return ret;
-}
-
 void exec_start_outgoing_migration(MigrationState *s, const char *command, Error **errp)
 {
-    FILE *f;
-
-    f = popen(command, "w");
-    if (f == NULL) {
+    s->file = qemu_popen_cmd(command, "w");
+    if (s->file == NULL) {
         error_setg_errno(errp, errno, "failed to popen the migration target");
         return;
     }
 
-    s->fd = fileno(f);
-    assert(s->fd != -1);
-
-    s->opaque = qemu_popen(f, "w");
-
-    s->close = exec_close;
-    s->get_error = file_errno;
-    s->write = file_write;
-
     migrate_fd_connect(s);
 }
 
diff --git a/migration-fd.c b/migration-fd.c
index a99e0e3971..3d4613cbaf 100644
--- a/migration-fd.c
+++ b/migration-fd.c
@@ -30,54 +30,13 @@
     do { } while (0)
 #endif
 
-static int fd_errno(MigrationState *s)
-{
-    return errno;
-}
-
-static int fd_write(MigrationState *s, const void * buf, size_t size)
-{
-    return write(s->fd, buf, size);
-}
-
-static int fd_close(MigrationState *s)
-{
-    struct stat st;
-    int ret;
-
-    DPRINTF("fd_close\n");
-    ret = fstat(s->fd, &st);
-    if (ret == 0 && S_ISREG(st.st_mode)) {
-        /*
-         * If the file handle is a regular file make sure the
-         * data is flushed to disk before signaling success.
-         */
-        ret = fsync(s->fd);
-        if (ret != 0) {
-            ret = -errno;
-            perror("migration-fd: fsync");
-            return ret;
-        }
-    }
-    ret = close(s->fd);
-    s->fd = -1;
-    if (ret != 0) {
-        ret = -errno;
-        perror("migration-fd: close");
-    }
-    return ret;
-}
-
 void fd_start_outgoing_migration(MigrationState *s, const char *fdname, Error **errp)
 {
-    s->fd = monitor_get_fd(cur_mon, fdname, errp);
-    if (s->fd == -1) {
+    int fd = monitor_get_fd(cur_mon, fdname, errp);
+    if (fd == -1) {
         return;
     }
-
-    s->get_error = fd_errno;
-    s->write = fd_write;
-    s->close = fd_close;
+    s->file = qemu_fdopen(fd, "wb");
 
     migrate_fd_connect(s);
 }
diff --git a/migration-tcp.c b/migration-tcp.c
index e78a296137..b20ee58f55 100644
--- a/migration-tcp.c
+++ b/migration-tcp.c
@@ -29,49 +29,24 @@
     do { } while (0)
 #endif
 
-static int socket_errno(MigrationState *s)
-{
-    return socket_error();
-}
-
-static int socket_write(MigrationState *s, const void * buf, size_t size)
-{
-    return send(s->fd, buf, size, 0);
-}
-
-static int tcp_close(MigrationState *s)
-{
-    int r = 0;
-    DPRINTF("tcp_close\n");
-    if (closesocket(s->fd) < 0) {
-        r = -socket_error();
-    }
-    return r;
-}
-
 static void tcp_wait_for_connect(int fd, void *opaque)
 {
     MigrationState *s = opaque;
 
     if (fd < 0) {
         DPRINTF("migrate connect error\n");
-        s->fd = -1;
+        s->file = NULL;
         migrate_fd_error(s);
     } else {
         DPRINTF("migrate connect success\n");
-        s->fd = fd;
-        socket_set_block(s->fd);
+        s->file = qemu_fopen_socket(fd, "wb");
         migrate_fd_connect(s);
     }
 }
 
 void tcp_start_outgoing_migration(MigrationState *s, const char *host_port, Error **errp)
 {
-    s->get_error = socket_errno;
-    s->write = socket_write;
-    s->close = tcp_close;
-
-    s->fd = inet_nonblocking_connect(host_port, tcp_wait_for_connect, s, errp);
+    inet_nonblocking_connect(host_port, tcp_wait_for_connect, s, errp);
 }
 
 static void tcp_accept_incoming_migration(void *opaque)
@@ -95,7 +70,7 @@ static void tcp_accept_incoming_migration(void *opaque)
         goto out;
     }
 
-    f = qemu_fopen_socket(c);
+    f = qemu_fopen_socket(c, "rb");
     if (f == NULL) {
         fprintf(stderr, "could not qemu_fopen socket\n");
         goto out;
diff --git a/migration-unix.c b/migration-unix.c
index 218835a7a4..94b7022fc8 100644
--- a/migration-unix.c
+++ b/migration-unix.c
@@ -29,49 +29,24 @@
     do { } while (0)
 #endif
 
-static int unix_errno(MigrationState *s)
-{
-    return errno;
-}
-
-static int unix_write(MigrationState *s, const void * buf, size_t size)
-{
-    return write(s->fd, buf, size);
-}
-
-static int unix_close(MigrationState *s)
-{
-    int r = 0;
-    DPRINTF("unix_close\n");
-    if (close(s->fd) < 0) {
-        r = -errno;
-    }
-    return r;
-}
-
 static void unix_wait_for_connect(int fd, void *opaque)
 {
     MigrationState *s = opaque;
 
     if (fd < 0) {
         DPRINTF("migrate connect error\n");
-        s->fd = -1;
+        s->file = NULL;
         migrate_fd_error(s);
     } else {
         DPRINTF("migrate connect success\n");
-        s->fd = fd;
-        socket_set_block(s->fd);
+        s->file = qemu_fopen_socket(fd, "wb");
         migrate_fd_connect(s);
     }
 }
 
 void unix_start_outgoing_migration(MigrationState *s, const char *path, Error **errp)
 {
-    s->get_error = unix_errno;
-    s->write = unix_write;
-    s->close = unix_close;
-
-    s->fd = unix_nonblocking_connect(path, unix_wait_for_connect, s, errp);
+    unix_nonblocking_connect(path, unix_wait_for_connect, s, errp);
 }
 
 static void unix_accept_incoming_migration(void *opaque)
@@ -95,7 +70,7 @@ static void unix_accept_incoming_migration(void *opaque)
         goto out;
     }
 
-    f = qemu_fopen_socket(c);
+    f = qemu_fopen_socket(c, "rb");
     if (f == NULL) {
         fprintf(stderr, "could not qemu_fopen socket\n");
         goto out;
diff --git a/migration.c b/migration.c
index 11725ae3fc..185d11260d 100644
--- a/migration.c
+++ b/migration.c
@@ -23,6 +23,7 @@
 #include "migration/block.h"
 #include "qemu/thread.h"
 #include "qmp-commands.h"
+#include "trace.h"
 
 //#define DEBUG_MIGRATION
 
@@ -260,81 +261,54 @@ void qmp_migrate_set_capabilities(MigrationCapabilityStatusList *params,
 
 /* shared migration helpers */
 
-static int migrate_fd_cleanup(MigrationState *s)
+static void migrate_fd_cleanup(void *opaque)
 {
-    int ret = 0;
+    MigrationState *s = opaque;
+
+    qemu_bh_delete(s->cleanup_bh);
+    s->cleanup_bh = NULL;
 
     if (s->file) {
         DPRINTF("closing file\n");
-        ret = qemu_fclose(s->file);
+        qemu_mutex_unlock_iothread();
+        qemu_thread_join(&s->thread);
+        qemu_mutex_lock_iothread();
+
+        qemu_fclose(s->file);
         s->file = NULL;
     }
 
-    assert(s->fd == -1);
-    return ret;
-}
+    assert(s->state != MIG_STATE_ACTIVE);
+
+    if (s->state != MIG_STATE_COMPLETED) {
+        qemu_savevm_state_cancel();
+    }
 
-void migrate_fd_error(MigrationState *s)
-{
-    DPRINTF("setting error state\n");
-    s->state = MIG_STATE_ERROR;
     notifier_list_notify(&migration_state_notifiers, s);
-    migrate_fd_cleanup(s);
 }
 
-static void migrate_fd_completed(MigrationState *s)
+static void migrate_finish_set_state(MigrationState *s, int new_state)
 {
-    DPRINTF("setting completed state\n");
-    if (migrate_fd_cleanup(s) < 0) {
-        s->state = MIG_STATE_ERROR;
-    } else {
-        s->state = MIG_STATE_COMPLETED;
-        runstate_set(RUN_STATE_POSTMIGRATE);
+    if (__sync_val_compare_and_swap(&s->state, MIG_STATE_ACTIVE,
+                                    new_state) == new_state) {
+        trace_migrate_set_state(new_state);
     }
-    notifier_list_notify(&migration_state_notifiers, s);
 }
 
-static ssize_t migrate_fd_put_buffer(MigrationState *s, const void *data,
-                                     size_t size)
+void migrate_fd_error(MigrationState *s)
 {
-    ssize_t ret;
-
-    if (s->state != MIG_STATE_ACTIVE) {
-        return -EIO;
-    }
-
-    do {
-        ret = s->write(s, data, size);
-    } while (ret == -1 && ((s->get_error(s)) == EINTR));
-
-    if (ret == -1)
-        ret = -(s->get_error(s));
-
-    return ret;
+    DPRINTF("setting error state\n");
+    assert(s->file == NULL);
+    s->state = MIG_STATE_ERROR;
+    trace_migrate_set_state(MIG_STATE_ERROR);
+    notifier_list_notify(&migration_state_notifiers, s);
 }
 
 static void migrate_fd_cancel(MigrationState *s)
 {
-    if (s->state != MIG_STATE_ACTIVE)
-        return;
-
     DPRINTF("cancelling migration\n");
 
-    s->state = MIG_STATE_CANCELLED;
-    notifier_list_notify(&migration_state_notifiers, s);
-    qemu_savevm_state_cancel();
-
-    migrate_fd_cleanup(s);
-}
-
-int migrate_fd_close(MigrationState *s)
-{
-    int rc = 0;
-    if (s->fd != -1) {
-        rc = s->close(s);
-        s->fd = -1;
-    }
-    return rc;
+    migrate_finish_set_state(s, MIG_STATE_CANCELLED);
 }
 
 void add_migration_state_change_notifier(Notifier *notify)
@@ -382,8 +356,9 @@ static MigrationState *migrate_init(const MigrationParams *params)
 
     s->bandwidth_limit = bandwidth_limit;
     s->state = MIG_STATE_SETUP;
-    s->total_time = qemu_get_clock_ms(rt_clock);
+    trace_migrate_set_state(MIG_STATE_SETUP);
 
+    s->total_time = qemu_get_clock_ms(rt_clock);
     return s;
 }
 
@@ -480,10 +455,15 @@ void qmp_migrate_set_speed(int64_t value, Error **errp)
     if (value < 0) {
         value = 0;
     }
+    if (value > SIZE_MAX) {
+        value = SIZE_MAX;
+    }
 
     s = migrate_get_current();
     s->bandwidth_limit = value;
-    qemu_file_set_rate_limit(s->file, s->bandwidth_limit);
+    if (s->file) {
+        qemu_file_set_rate_limit(s->file, s->bandwidth_limit / XFER_LIMIT_RATIO);
+    }
 }
 
 void qmp_migrate_set_downtime(double value, Error **errp)
@@ -513,224 +493,53 @@ int64_t migrate_xbzrle_cache_size(void)
 
 /* migration thread support */
 
-
-static ssize_t buffered_flush(MigrationState *s)
-{
-    size_t offset = 0;
-    ssize_t ret = 0;
-
-    DPRINTF("flushing %zu byte(s) of data\n", s->buffer_size);
-
-    while (s->bytes_xfer < s->xfer_limit && offset < s->buffer_size) {
-        size_t to_send = MIN(s->buffer_size - offset, s->xfer_limit - s->bytes_xfer);
-        ret = migrate_fd_put_buffer(s, s->buffer + offset, to_send);
-        if (ret <= 0) {
-            DPRINTF("error flushing data, %zd\n", ret);
-            break;
-        } else {
-            DPRINTF("flushed %zd byte(s)\n", ret);
-            offset += ret;
-            s->bytes_xfer += ret;
-        }
-    }
-
-    DPRINTF("flushed %zu of %zu byte(s)\n", offset, s->buffer_size);
-    memmove(s->buffer, s->buffer + offset, s->buffer_size - offset);
-    s->buffer_size -= offset;
-
-    if (ret < 0) {
-        return ret;
-    }
-    return offset;
-}
-
-static int buffered_put_buffer(void *opaque, const uint8_t *buf,
-                               int64_t pos, int size)
-{
-    MigrationState *s = opaque;
-    ssize_t error;
-
-    DPRINTF("putting %d bytes at %" PRId64 "\n", size, pos);
-
-    error = qemu_file_get_error(s->file);
-    if (error) {
-        DPRINTF("flush when error, bailing: %s\n", strerror(-error));
-        return error;
-    }
-
-    if (size <= 0) {
-        return size;
-    }
-
-    if (size > (s->buffer_capacity - s->buffer_size)) {
-        DPRINTF("increasing buffer capacity from %zu by %zu\n",
-                s->buffer_capacity, size + 1024);
-
-        s->buffer_capacity += size + 1024;
-
-        s->buffer = g_realloc(s->buffer, s->buffer_capacity);
-    }
-
-    memcpy(s->buffer + s->buffer_size, buf, size);
-    s->buffer_size += size;
-
-    return size;
-}
-
-static int buffered_close(void *opaque)
-{
-    MigrationState *s = opaque;
-    ssize_t ret = 0;
-    int ret2;
-
-    DPRINTF("closing\n");
-
-    s->xfer_limit = INT_MAX;
-    while (!qemu_file_get_error(s->file) && s->buffer_size) {
-        ret = buffered_flush(s);
-        if (ret < 0) {
-            break;
-        }
-    }
-
-    ret2 = migrate_fd_close(s);
-    if (ret >= 0) {
-        ret = ret2;
-    }
-    s->complete = true;
-    return ret;
-}
-
-static int buffered_get_fd(void *opaque)
-{
-    MigrationState *s = opaque;
-
-    return s->fd;
-}
-
-/*
- * The meaning of the return values is:
- *   0: We can continue sending
- *   1: Time to stop
- *   negative: There has been an error
- */
-static int buffered_rate_limit(void *opaque)
-{
-    MigrationState *s = opaque;
-    int ret;
-
-    ret = qemu_file_get_error(s->file);
-    if (ret) {
-        return ret;
-    }
-
-    if (s->bytes_xfer >= s->xfer_limit) {
-        return 1;
-    }
-
-    return 0;
-}
-
-static int64_t buffered_set_rate_limit(void *opaque, int64_t new_rate)
-{
-    MigrationState *s = opaque;
-    if (qemu_file_get_error(s->file)) {
-        goto out;
-    }
-    if (new_rate > SIZE_MAX) {
-        new_rate = SIZE_MAX;
-    }
-
-    s->xfer_limit = new_rate / XFER_LIMIT_RATIO;
-
-out:
-    return s->xfer_limit;
-}
-
-static int64_t buffered_get_rate_limit(void *opaque)
-{
-    MigrationState *s = opaque;
-
-    return s->xfer_limit;
-}
-
-static void *buffered_file_thread(void *opaque)
+static void *migration_thread(void *opaque)
 {
     MigrationState *s = opaque;
     int64_t initial_time = qemu_get_clock_ms(rt_clock);
     int64_t sleep_time = 0;
+    int64_t initial_bytes = 0;
     int64_t max_size = 0;
-    bool last_round = false;
-    int ret;
+    int64_t start_time = initial_time;
+    bool old_vm_running = false;
 
-    qemu_mutex_lock_iothread();
     DPRINTF("beginning savevm\n");
-    ret = qemu_savevm_state_begin(s->file, &s->params);
-    if (ret < 0) {
-        DPRINTF("failed, %d\n", ret);
-        qemu_mutex_unlock_iothread();
-        goto out;
-    }
-    qemu_mutex_unlock_iothread();
+    qemu_savevm_state_begin(s->file, &s->params);
 
-    while (true) {
+    while (s->state == MIG_STATE_ACTIVE) {
         int64_t current_time;
         uint64_t pending_size;
 
-        qemu_mutex_lock_iothread();
-        if (s->state != MIG_STATE_ACTIVE) {
-            DPRINTF("put_ready returning because of non-active state\n");
-            qemu_mutex_unlock_iothread();
-            break;
-        }
-        if (s->complete) {
-            qemu_mutex_unlock_iothread();
-            break;
-        }
-        if (s->bytes_xfer < s->xfer_limit) {
+        if (!qemu_file_rate_limit(s->file)) {
             DPRINTF("iterate\n");
             pending_size = qemu_savevm_state_pending(s->file, max_size);
             DPRINTF("pending size %lu max %lu\n", pending_size, max_size);
             if (pending_size && pending_size >= max_size) {
-                ret = qemu_savevm_state_iterate(s->file);
-                if (ret < 0) {
-                    qemu_mutex_unlock_iothread();
-                    break;
-                }
+                qemu_savevm_state_iterate(s->file);
             } else {
-                int old_vm_running = runstate_is_running();
-                int64_t start_time, end_time;
-
                 DPRINTF("done iterating\n");
+                qemu_mutex_lock_iothread();
                 start_time = qemu_get_clock_ms(rt_clock);
                 qemu_system_wakeup_request(QEMU_WAKEUP_REASON_OTHER);
-                if (old_vm_running) {
-                    vm_stop(RUN_STATE_FINISH_MIGRATE);
-                } else {
-                    vm_stop_force_state(RUN_STATE_FINISH_MIGRATE);
-                }
-                ret = qemu_savevm_state_complete(s->file);
-                if (ret < 0) {
-                    qemu_mutex_unlock_iothread();
+                old_vm_running = runstate_is_running();
+                vm_stop_force_state(RUN_STATE_FINISH_MIGRATE);
+                qemu_file_set_rate_limit(s->file, INT_MAX);
+                qemu_savevm_state_complete(s->file);
+                qemu_mutex_unlock_iothread();
+                if (!qemu_file_get_error(s->file)) {
+                    migrate_finish_set_state(s, MIG_STATE_COMPLETED);
                     break;
-                } else {
-                    migrate_fd_completed(s);
                 }
-                end_time = qemu_get_clock_ms(rt_clock);
-                s->total_time = end_time - s->total_time;
-                s->downtime = end_time - start_time;
-                if (s->state != MIG_STATE_COMPLETED) {
-                    if (old_vm_running) {
-                        vm_start();
-                    }
-                }
-                last_round = true;
             }
         }
-        qemu_mutex_unlock_iothread();
+
+        if (qemu_file_get_error(s->file)) {
+            migrate_finish_set_state(s, MIG_STATE_ERROR);
+            break;
+        }
         current_time = qemu_get_clock_ms(rt_clock);
         if (current_time >= initial_time + BUFFER_DELAY) {
-            uint64_t transferred_bytes = s->bytes_xfer;
+            uint64_t transferred_bytes = qemu_ftell(s->file) - initial_bytes;
             uint64_t time_spent = current_time - initial_time - sleep_time;
             double bandwidth = transferred_bytes / time_spent;
             max_size = bandwidth * migrate_max_downtime() / 1000000;
@@ -744,54 +553,48 @@ static void *buffered_file_thread(void *opaque)
                 s->expected_downtime = s->dirty_bytes_rate / bandwidth;
             }
 
-            s->bytes_xfer = 0;
+            qemu_file_reset_rate_limit(s->file);
             sleep_time = 0;
             initial_time = current_time;
+            initial_bytes = qemu_ftell(s->file);
         }
-        if (!last_round && (s->bytes_xfer >= s->xfer_limit)) {
+        if (qemu_file_rate_limit(s->file)) {
             /* usleep expects microseconds */
             g_usleep((initial_time + BUFFER_DELAY - current_time)*1000);
             sleep_time += qemu_get_clock_ms(rt_clock) - current_time;
         }
-        ret = buffered_flush(s);
-        if (ret < 0) {
-            break;
-        }
     }
 
-out:
-    if (ret < 0) {
-        migrate_fd_error(s);
+    qemu_mutex_lock_iothread();
+    if (s->state == MIG_STATE_COMPLETED) {
+        int64_t end_time = qemu_get_clock_ms(rt_clock);
+        s->total_time = end_time - s->total_time;
+        s->downtime = end_time - start_time;
+        runstate_set(RUN_STATE_POSTMIGRATE);
+    } else {
+        if (old_vm_running) {
+            vm_start();
+        }
     }
-    g_free(s->buffer);
+    qemu_bh_schedule(s->cleanup_bh);
+    qemu_mutex_unlock_iothread();
+
     return NULL;
 }
 
-static const QEMUFileOps buffered_file_ops = {
-    .get_fd =         buffered_get_fd,
-    .put_buffer =     buffered_put_buffer,
-    .close =          buffered_close,
-    .rate_limit =     buffered_rate_limit,
-    .get_rate_limit = buffered_get_rate_limit,
-    .set_rate_limit = buffered_set_rate_limit,
-};
-
 void migrate_fd_connect(MigrationState *s)
 {
     s->state = MIG_STATE_ACTIVE;
-    s->bytes_xfer = 0;
-    s->buffer = NULL;
-    s->buffer_size = 0;
-    s->buffer_capacity = 0;
+    trace_migrate_set_state(MIG_STATE_ACTIVE);
+
     /* This is a best 1st approximation. ns to ms */
     s->expected_downtime = max_downtime/1000000;
+    s->cleanup_bh = qemu_bh_new(migrate_fd_cleanup, s);
 
-    s->xfer_limit = s->bandwidth_limit / XFER_LIMIT_RATIO;
-    s->complete = false;
-
-    s->file = qemu_fopen_ops(s, &buffered_file_ops);
+    qemu_file_set_rate_limit(s->file,
+                             s->bandwidth_limit / XFER_LIMIT_RATIO);
 
-    qemu_thread_create(&s->thread, buffered_file_thread, s,
-                       QEMU_THREAD_DETACHED);
+    qemu_thread_create(&s->thread, migration_thread, s,
+                       QEMU_THREAD_JOINABLE);
     notifier_list_notify(&migration_state_notifiers, s);
 }
diff --git a/page_cache.c b/page_cache.c
index ba5640bd73..938a79c9ea 100644
--- a/page_cache.c
+++ b/page_cache.c
@@ -152,11 +152,14 @@ void cache_insert(PageCache *cache, uint64_t addr, uint8_t *pdata)
     /* actual update of entry */
     it = cache_get_by_addr(cache, addr);
 
+    /* free old cached data if any */
+    g_free(it->it_data);
+
     if (!it->it_data) {
         cache->num_items++;
     }
 
-    it->it_data = pdata;
+    it->it_data = g_memdup(pdata, cache->page_size);
     it->it_age = ++cache->max_item_age;
     it->it_addr = addr;
 }
@@ -192,22 +195,22 @@ int64_t cache_resize(PageCache *cache, int64_t new_num_pages)
         if (old_it->it_addr != -1) {
             /* check for collision, if there is, keep MRU page */
             new_it = cache_get_by_addr(new_cache, old_it->it_addr);
-            if (new_it->it_data) {
+            if (new_it->it_data && new_it->it_age >= old_it->it_age) {
                 /* keep the MRU page */
-                if (new_it->it_age >= old_it->it_age) {
-                    g_free(old_it->it_data);
-                } else {
-                    g_free(new_it->it_data);
-                    new_it->it_data = old_it->it_data;
-                    new_it->it_age = old_it->it_age;
-                    new_it->it_addr = old_it->it_addr;
-                }
+                g_free(old_it->it_data);
             } else {
-                cache_insert(new_cache, old_it->it_addr, old_it->it_data);
+                if (!new_it->it_data) {
+                    new_cache->num_items++;
+                }
+                g_free(new_it->it_data);
+                new_it->it_data = old_it->it_data;
+                new_it->it_age = old_it->it_age;
+                new_it->it_addr = old_it->it_addr;
             }
         }
     }
 
+    g_free(cache->page_cache);
     cache->page_cache = new_cache->page_cache;
     cache->max_num_items = new_cache->max_num_items;
     cache->num_items = new_cache->num_items;
diff --git a/savevm.c b/savevm.c
index a8a53efc9b..147e2d232e 100644
--- a/savevm.c
+++ b/savevm.c
@@ -119,8 +119,11 @@ struct QEMUFile {
     void *opaque;
     int is_write;
 
-    int64_t buf_offset; /* start of buffer when writing, end of buffer
-                           when reading */
+    int64_t bytes_xfer;
+    int64_t xfer_limit;
+
+    int64_t pos; /* start of buffer when writing, end of buffer
+                    when reading */
     int buf_index;
     int buf_size; /* 0 when writing */
     uint8_t buf[IO_BUF_SIZE];
@@ -198,6 +201,18 @@ static int socket_get_buffer(void *opaque, uint8_t *buf, int64_t pos, int size)
     return len;
 }
 
+static int socket_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size)
+{
+    QEMUFileSocket *s = opaque;
+    ssize_t len;
+
+    len = qemu_send_full(s->fd, buf, size, 0);
+    if (len < size) {
+        len = -socket_error();
+    }
+    return len;
+}
+
 static int socket_close(void *opaque)
 {
     QEMUFileSocket *s = opaque;
@@ -247,6 +262,9 @@ static int stdio_pclose(void *opaque)
     ret = pclose(s->stdio_file);
     if (ret == -1) {
         ret = -errno;
+    } else if (!WIFEXITED(ret) || WEXITSTATUS(ret) != 0) {
+        /* close succeeded, but non-zero exit code: */
+        ret = -EIO; /* fake errno value */
     }
     g_free(s);
     return ret;
@@ -256,6 +274,24 @@ static int stdio_fclose(void *opaque)
 {
     QEMUFileStdio *s = opaque;
     int ret = 0;
+
+    if (s->file->ops->put_buffer) {
+        int fd = fileno(s->stdio_file);
+        struct stat st;
+
+        ret = fstat(fd, &st);
+        if (ret == 0 && S_ISREG(st.st_mode)) {
+            /*
+             * If the file handle is a regular file make sure the
+             * data is flushed to disk before signaling success.
+             */
+            ret = fsync(fd);
+            if (ret != 0) {
+                ret = -errno;
+                return ret;
+            }
+        }
+    }
     if (fclose(s->stdio_file) == EOF) {
         ret = -errno;
     }
@@ -275,11 +311,17 @@ static const QEMUFileOps stdio_pipe_write_ops = {
     .close =      stdio_pclose
 };
 
-QEMUFile *qemu_popen(FILE *stdio_file, const char *mode)
+QEMUFile *qemu_popen_cmd(const char *command, const char *mode)
 {
+    FILE *stdio_file;
     QEMUFileStdio *s;
 
-    if (stdio_file == NULL || mode == NULL || (mode[0] != 'r' && mode[0] != 'w') || mode[1] != 0) {
+    stdio_file = popen(command, mode);
+    if (stdio_file == NULL) {
+        return NULL;
+    }
+
+    if (mode == NULL || (mode[0] != 'r' && mode[0] != 'w') || mode[1] != 0) {
         fprintf(stderr, "qemu_popen: Argument validity check failed\n");
         return NULL;
     }
@@ -296,18 +338,6 @@ QEMUFile *qemu_popen(FILE *stdio_file, const char *mode)
     return s->file;
 }
 
-QEMUFile *qemu_popen_cmd(const char *command, const char *mode)
-{
-    FILE *popen_file;
-
-    popen_file = popen(command, mode);
-    if(popen_file == NULL) {
-        return NULL;
-    }
-
-    return qemu_popen(popen_file, mode);
-}
-
 static const QEMUFileOps stdio_file_read_ops = {
     .get_fd =     stdio_get_fd,
     .get_buffer = stdio_get_buffer,
@@ -354,12 +384,30 @@ static const QEMUFileOps socket_read_ops = {
     .close =      socket_close
 };
 
-QEMUFile *qemu_fopen_socket(int fd)
+static const QEMUFileOps socket_write_ops = {
+    .get_fd =     socket_get_fd,
+    .put_buffer = socket_put_buffer,
+    .close =      socket_close
+};
+
+QEMUFile *qemu_fopen_socket(int fd, const char *mode)
 {
     QEMUFileSocket *s = g_malloc0(sizeof(QEMUFileSocket));
 
+    if (mode == NULL ||
+        (mode[0] != 'r' && mode[0] != 'w') ||
+        mode[1] != 'b' || mode[2] != 0) {
+        fprintf(stderr, "qemu_fopen: Argument validity check failed\n");
+        return NULL;
+    }
+
     s->fd = fd;
-    s->file = qemu_fopen_ops(s, &socket_read_ops);
+    if (mode[0] == 'w') {
+        socket_set_block(s->fd);
+        s->file = qemu_fopen_ops(s, &socket_write_ops);
+    } else {
+        s->file = qemu_fopen_ops(s, &socket_read_ops);
+    }
     return s->file;
 }
 
@@ -434,7 +482,6 @@ QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops)
     f->opaque = opaque;
     f->ops = ops;
     f->is_write = 0;
-
     return f;
 }
 
@@ -453,21 +500,23 @@ static void qemu_file_set_error(QEMUFile *f, int ret)
 /** Flushes QEMUFile buffer
  *
  */
-static int qemu_fflush(QEMUFile *f)
+static void qemu_fflush(QEMUFile *f)
 {
     int ret = 0;
 
-    if (!f->ops->put_buffer)
-        return 0;
-
+    if (!f->ops->put_buffer) {
+        return;
+    }
     if (f->is_write && f->buf_index > 0) {
-        ret = f->ops->put_buffer(f->opaque, f->buf, f->buf_offset, f->buf_index);
+        ret = f->ops->put_buffer(f->opaque, f->buf, f->pos, f->buf_index);
         if (ret >= 0) {
-            f->buf_offset += f->buf_index;
+            f->pos += f->buf_index;
         }
         f->buf_index = 0;
     }
-    return ret;
+    if (ret < 0) {
+        qemu_file_set_error(f, ret);
+    }
 }
 
 static void qemu_fill_buffer(QEMUFile *f)
@@ -488,11 +537,11 @@ static void qemu_fill_buffer(QEMUFile *f)
     f->buf_index = 0;
     f->buf_size = pending;
 
-    len = f->ops->get_buffer(f->opaque, f->buf + pending, f->buf_offset,
+    len = f->ops->get_buffer(f->opaque, f->buf + pending, f->pos,
                         IO_BUF_SIZE - pending);
     if (len > 0) {
         f->buf_size += len;
-        f->buf_offset += len;
+        f->pos += len;
     } else if (len == 0) {
         qemu_file_set_error(f, -EIO);
     } else if (len != -EAGAIN)
@@ -518,7 +567,8 @@ int qemu_get_fd(QEMUFile *f)
 int qemu_fclose(QEMUFile *f)
 {
     int ret;
-    ret = qemu_fflush(f);
+    qemu_fflush(f);
+    ret = qemu_file_get_error(f);
 
     if (f->ops->close) {
         int ret2 = f->ops->close(f->opaque);
@@ -557,12 +607,12 @@ void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, int size)
         memcpy(f->buf + f->buf_index, buf, l);
         f->is_write = 1;
         f->buf_index += l;
+        f->bytes_xfer += l;
         buf += l;
         size -= l;
         if (f->buf_index >= IO_BUF_SIZE) {
-            int ret = qemu_fflush(f);
-            if (ret < 0) {
-                qemu_file_set_error(f, ret);
+            qemu_fflush(f);
+            if (qemu_file_get_error(f)) {
                 break;
             }
         }
@@ -584,10 +634,7 @@ void qemu_put_byte(QEMUFile *f, int v)
     f->buf[f->buf_index++] = v;
     f->is_write = 1;
     if (f->buf_index >= IO_BUF_SIZE) {
-        int ret = qemu_fflush(f);
-        if (ret < 0) {
-            qemu_file_set_error(f, ret);
-        }
+        qemu_fflush(f);
     }
 }
 
@@ -675,38 +722,34 @@ int qemu_get_byte(QEMUFile *f)
 
 int64_t qemu_ftell(QEMUFile *f)
 {
-    /* buf_offset excludes buffer for writing but includes it for reading */
-    if (f->is_write) {
-        return f->buf_offset + f->buf_index;
-    } else {
-        return f->buf_offset - f->buf_size + f->buf_index;
-    }
+    qemu_fflush(f);
+    return f->pos;
 }
 
 int qemu_file_rate_limit(QEMUFile *f)
 {
-    if (f->ops->rate_limit)
-        return f->ops->rate_limit(f->opaque);
-
+    if (qemu_file_get_error(f)) {
+        return 1;
+    }
+    if (f->xfer_limit > 0 && f->bytes_xfer > f->xfer_limit) {
+        return 1;
+    }
     return 0;
 }
 
 int64_t qemu_file_get_rate_limit(QEMUFile *f)
 {
-    if (f->ops->get_rate_limit)
-        return f->ops->get_rate_limit(f->opaque);
-
-    return 0;
+    return f->xfer_limit;
 }
 
-int64_t qemu_file_set_rate_limit(QEMUFile *f, int64_t new_rate)
+void qemu_file_set_rate_limit(QEMUFile *f, int64_t limit)
 {
-    /* any failed or completed migration keeps its state to allow probing of
-     * migration data, but has no associated file anymore */
-    if (f && f->ops->set_rate_limit)
-        return f->ops->set_rate_limit(f->opaque, new_rate);
+    f->xfer_limit = limit;
+}
 
-    return 0;
+void qemu_file_reset_rate_limit(QEMUFile *f)
+{
+    f->bytes_xfer = 0;
 }
 
 void qemu_put_be16(QEMUFile *f, unsigned int v)
@@ -1580,8 +1623,8 @@ bool qemu_savevm_state_blocked(Error **errp)
     return false;
 }
 
-int qemu_savevm_state_begin(QEMUFile *f,
-                            const MigrationParams *params)
+void qemu_savevm_state_begin(QEMUFile *f,
+                             const MigrationParams *params)
 {
     SaveStateEntry *se;
     int ret;
@@ -1621,17 +1664,10 @@ int qemu_savevm_state_begin(QEMUFile *f,
 
         ret = se->ops->save_live_setup(f, se->opaque);
         if (ret < 0) {
-            qemu_savevm_state_cancel();
-            return ret;
+            qemu_file_set_error(f, ret);
+            break;
         }
     }
-    ret = qemu_file_get_error(f);
-    if (ret != 0) {
-        qemu_savevm_state_cancel();
-    }
-
-    return ret;
-
 }
 
 /*
@@ -1665,6 +1701,9 @@ int qemu_savevm_state_iterate(QEMUFile *f)
         ret = se->ops->save_live_iterate(f, se->opaque);
         trace_savevm_section_end(se->section_id);
 
+        if (ret < 0) {
+            qemu_file_set_error(f, ret);
+        }
         if (ret <= 0) {
             /* Do not proceed to the next vmstate before this one reported
                completion of the current stage. This serializes the migration
@@ -1673,17 +1712,10 @@ int qemu_savevm_state_iterate(QEMUFile *f)
             break;
         }
     }
-    if (ret != 0) {
-        return ret;
-    }
-    ret = qemu_file_get_error(f);
-    if (ret != 0) {
-        qemu_savevm_state_cancel();
-    }
     return ret;
 }
 
-int qemu_savevm_state_complete(QEMUFile *f)
+void qemu_savevm_state_complete(QEMUFile *f)
 {
     SaveStateEntry *se;
     int ret;
@@ -1707,7 +1739,8 @@ int qemu_savevm_state_complete(QEMUFile *f)
         ret = se->ops->save_live_complete(f, se->opaque);
         trace_savevm_section_end(se->section_id);
         if (ret < 0) {
-            return ret;
+            qemu_file_set_error(f, ret);
+            return;
         }
     }
 
@@ -1735,8 +1768,7 @@ int qemu_savevm_state_complete(QEMUFile *f)
     }
 
     qemu_put_byte(f, QEMU_VM_EOF);
-
-    return qemu_file_get_error(f);
+    qemu_fflush(f);
 }
 
 uint64_t qemu_savevm_state_pending(QEMUFile *f, uint64_t max_size)
@@ -1778,27 +1810,27 @@ static int qemu_savevm_state(QEMUFile *f)
     };
 
     if (qemu_savevm_state_blocked(NULL)) {
-        ret = -EINVAL;
-        goto out;
+        return -EINVAL;
     }
 
-    ret = qemu_savevm_state_begin(f, &params);
-    if (ret < 0)
-        goto out;
-
-    do {
-        ret = qemu_savevm_state_iterate(f);
-        if (ret < 0)
-            goto out;
-    } while (ret == 0);
+    qemu_mutex_unlock_iothread();
+    qemu_savevm_state_begin(f, &params);
+    qemu_mutex_lock_iothread();
 
-    ret = qemu_savevm_state_complete(f);
+    while (qemu_file_get_error(f) == 0) {
+        if (qemu_savevm_state_iterate(f) > 0) {
+            break;
+        }
+    }
 
-out:
+    ret = qemu_file_get_error(f);
     if (ret == 0) {
+        qemu_savevm_state_complete(f);
         ret = qemu_file_get_error(f);
     }
-
+    if (ret != 0) {
+        qemu_savevm_state_cancel();
+    }
     return ret;
 }
 
diff --git a/trace-events b/trace-events
index 3064fc7767..8389d83568 100644
--- a/trace-events
+++ b/trace-events
@@ -1091,3 +1091,6 @@ css_io_interrupt(int cssid, int ssid, int schid, uint32_t intparm, uint8_t isc,
 # hw/s390x/virtio-ccw.c
 virtio_ccw_interpret_ccw(int cssid, int ssid, int schid, int cmd_code) "VIRTIO-CCW: %x.%x.%04x: interpret command %x"
 virtio_ccw_new_device(int cssid, int ssid, int schid, int devno, const char *devno_mode) "VIRTIO-CCW: add subchannel %x.%x.%04x, devno %04x (%s)"
+
+# migration.c
+migrate_set_state(int new_state) "new state %d"