diff options
Diffstat (limited to 'migration/ram-compress.c')
| -rw-r--r-- | migration/ram-compress.c | 564 |
1 files changed, 0 insertions, 564 deletions
diff --git a/migration/ram-compress.c b/migration/ram-compress.c deleted file mode 100644 index fa4388f6a6..0000000000 --- a/migration/ram-compress.c +++ /dev/null @@ -1,564 +0,0 @@ -/* - * QEMU System Emulator - * - * Copyright (c) 2003-2008 Fabrice Bellard - * Copyright (c) 2011-2015 Red Hat Inc - * - * Authors: - * Juan Quintela <quintela@redhat.com> - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL - * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ - -#include "qemu/osdep.h" -#include "qemu/cutils.h" - -#include "ram-compress.h" - -#include "qemu/error-report.h" -#include "qemu/stats64.h" -#include "migration.h" -#include "options.h" -#include "io/channel-null.h" -#include "exec/target_page.h" -#include "exec/ramblock.h" -#include "ram.h" -#include "migration-stats.h" - -static struct { - int64_t pages; - int64_t busy; - double busy_rate; - int64_t compressed_size; - double compression_rate; - /* compression statistics since the beginning of the period */ - /* amount of count that no free thread to compress data */ - uint64_t compress_thread_busy_prev; - /* amount bytes after compression */ - uint64_t compressed_size_prev; - /* amount of compressed pages */ - uint64_t compress_pages_prev; -} compression_counters; - -static CompressParam *comp_param; -static QemuThread *compress_threads; -/* comp_done_cond is used to wake up the migration thread when - * one of the compression threads has finished the compression. - * comp_done_lock is used to co-work with comp_done_cond. - */ -static QemuMutex comp_done_lock; -static QemuCond comp_done_cond; - -struct DecompressParam { - bool done; - bool quit; - QemuMutex mutex; - QemuCond cond; - void *des; - uint8_t *compbuf; - int len; - z_stream stream; -}; -typedef struct DecompressParam DecompressParam; - -static QEMUFile *decomp_file; -static DecompressParam *decomp_param; -static QemuThread *decompress_threads; -static QemuMutex decomp_done_lock; -static QemuCond decomp_done_cond; - -static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream, - RAMBlock *block, ram_addr_t offset, - uint8_t *source_buf); - -static void *do_data_compress(void *opaque) -{ - CompressParam *param = opaque; - RAMBlock *block; - ram_addr_t offset; - CompressResult result; - - qemu_mutex_lock(¶m->mutex); - while (!param->quit) { - if (param->trigger) { - block = param->block; - offset = param->offset; - param->trigger = false; - qemu_mutex_unlock(¶m->mutex); - - result = do_compress_ram_page(param->file, ¶m->stream, - block, offset, param->originbuf); - - qemu_mutex_lock(&comp_done_lock); - param->done = true; - param->result = result; - qemu_cond_signal(&comp_done_cond); - qemu_mutex_unlock(&comp_done_lock); - - qemu_mutex_lock(¶m->mutex); - } else { - qemu_cond_wait(¶m->cond, ¶m->mutex); - } - } - qemu_mutex_unlock(¶m->mutex); - - return NULL; -} - -void compress_threads_save_cleanup(void) -{ - int i, thread_count; - - if (!migrate_compress() || !comp_param) { - return; - } - - thread_count = migrate_compress_threads(); - for (i = 0; i < thread_count; i++) { - /* - * we use it as a indicator which shows if the thread is - * properly init'd or not - */ - if (!comp_param[i].file) { - break; - } - - qemu_mutex_lock(&comp_param[i].mutex); - comp_param[i].quit = true; - qemu_cond_signal(&comp_param[i].cond); - qemu_mutex_unlock(&comp_param[i].mutex); - - qemu_thread_join(compress_threads + i); - qemu_mutex_destroy(&comp_param[i].mutex); - qemu_cond_destroy(&comp_param[i].cond); - deflateEnd(&comp_param[i].stream); - g_free(comp_param[i].originbuf); - qemu_fclose(comp_param[i].file); - comp_param[i].file = NULL; - } - qemu_mutex_destroy(&comp_done_lock); - qemu_cond_destroy(&comp_done_cond); - g_free(compress_threads); - g_free(comp_param); - compress_threads = NULL; - comp_param = NULL; -} - -int compress_threads_save_setup(void) -{ - int i, thread_count; - - if (!migrate_compress()) { - return 0; - } - thread_count = migrate_compress_threads(); - compress_threads = g_new0(QemuThread, thread_count); - comp_param = g_new0(CompressParam, thread_count); - qemu_cond_init(&comp_done_cond); - qemu_mutex_init(&comp_done_lock); - for (i = 0; i < thread_count; i++) { - comp_param[i].originbuf = g_try_malloc(qemu_target_page_size()); - if (!comp_param[i].originbuf) { - goto exit; - } - - if (deflateInit(&comp_param[i].stream, - migrate_compress_level()) != Z_OK) { - g_free(comp_param[i].originbuf); - goto exit; - } - - /* comp_param[i].file is just used as a dummy buffer to save data, - * set its ops to empty. - */ - comp_param[i].file = qemu_file_new_output( - QIO_CHANNEL(qio_channel_null_new())); - comp_param[i].done = true; - comp_param[i].quit = false; - qemu_mutex_init(&comp_param[i].mutex); - qemu_cond_init(&comp_param[i].cond); - qemu_thread_create(compress_threads + i, "compress", - do_data_compress, comp_param + i, - QEMU_THREAD_JOINABLE); - } - return 0; - -exit: - compress_threads_save_cleanup(); - return -1; -} - -static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream, - RAMBlock *block, ram_addr_t offset, - uint8_t *source_buf) -{ - uint8_t *p = block->host + offset; - size_t page_size = qemu_target_page_size(); - int ret; - - assert(qemu_file_buffer_empty(f)); - - if (buffer_is_zero(p, page_size)) { - return RES_ZEROPAGE; - } - - /* - * copy it to a internal buffer to avoid it being modified by VM - * so that we can catch up the error during compression and - * decompression - */ - memcpy(source_buf, p, page_size); - ret = qemu_put_compression_data(f, stream, source_buf, page_size); - if (ret < 0) { - qemu_file_set_error(migrate_get_current()->to_dst_file, ret); - error_report("compressed data failed!"); - qemu_fflush(f); - return RES_NONE; - } - return RES_COMPRESS; -} - -static inline void compress_reset_result(CompressParam *param) -{ - param->result = RES_NONE; - param->block = NULL; - param->offset = 0; -} - -void compress_flush_data(void) -{ - int thread_count = migrate_compress_threads(); - - if (!migrate_compress()) { - return; - } - - qemu_mutex_lock(&comp_done_lock); - for (int i = 0; i < thread_count; i++) { - while (!comp_param[i].done) { - qemu_cond_wait(&comp_done_cond, &comp_done_lock); - } - } - qemu_mutex_unlock(&comp_done_lock); - - for (int i = 0; i < thread_count; i++) { - qemu_mutex_lock(&comp_param[i].mutex); - if (!comp_param[i].quit) { - CompressParam *param = &comp_param[i]; - compress_send_queued_data(param); - assert(qemu_file_buffer_empty(param->file)); - compress_reset_result(param); - } - qemu_mutex_unlock(&comp_param[i].mutex); - } -} - -static inline void set_compress_params(CompressParam *param, RAMBlock *block, - ram_addr_t offset) -{ - param->block = block; - param->offset = offset; - param->trigger = true; -} - -/* - * Return true when it compress a page - */ -bool compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset, - int (send_queued_data(CompressParam *))) -{ - int thread_count; - bool wait = migrate_compress_wait_thread(); - - thread_count = migrate_compress_threads(); - qemu_mutex_lock(&comp_done_lock); - - while (true) { - for (int i = 0; i < thread_count; i++) { - if (comp_param[i].done) { - CompressParam *param = &comp_param[i]; - qemu_mutex_lock(¶m->mutex); - param->done = false; - send_queued_data(param); - assert(qemu_file_buffer_empty(param->file)); - compress_reset_result(param); - set_compress_params(param, block, offset); - - qemu_cond_signal(¶m->cond); - qemu_mutex_unlock(¶m->mutex); - qemu_mutex_unlock(&comp_done_lock); - return true; - } - } - if (!wait) { - qemu_mutex_unlock(&comp_done_lock); - compression_counters.busy++; - return false; - } - /* - * wait for a free thread if the user specifies - * 'compress-wait-thread', otherwise we will post the page out - * in the main thread as normal page. - */ - qemu_cond_wait(&comp_done_cond, &comp_done_lock); - } -} - -/* return the size after decompression, or negative value on error */ -static int -qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len, - const uint8_t *source, size_t source_len) -{ - int err; - - err = inflateReset(stream); - if (err != Z_OK) { - return -1; - } - - stream->avail_in = source_len; - stream->next_in = (uint8_t *)source; - stream->avail_out = dest_len; - stream->next_out = dest; - - err = inflate(stream, Z_NO_FLUSH); - if (err != Z_STREAM_END) { - return -1; - } - - return stream->total_out; -} - -static void *do_data_decompress(void *opaque) -{ - DecompressParam *param = opaque; - unsigned long pagesize; - uint8_t *des; - int len, ret; - - qemu_mutex_lock(¶m->mutex); - while (!param->quit) { - if (param->des) { - des = param->des; - len = param->len; - param->des = 0; - qemu_mutex_unlock(¶m->mutex); - - pagesize = qemu_target_page_size(); - - ret = qemu_uncompress_data(¶m->stream, des, pagesize, - param->compbuf, len); - if (ret < 0 && migrate_get_current()->decompress_error_check) { - error_report("decompress data failed"); - qemu_file_set_error(decomp_file, ret); - } - - qemu_mutex_lock(&decomp_done_lock); - param->done = true; - qemu_cond_signal(&decomp_done_cond); - qemu_mutex_unlock(&decomp_done_lock); - - qemu_mutex_lock(¶m->mutex); - } else { - qemu_cond_wait(¶m->cond, ¶m->mutex); - } - } - qemu_mutex_unlock(¶m->mutex); - - return NULL; -} - -int wait_for_decompress_done(void) -{ - if (!migrate_compress()) { - return 0; - } - - int thread_count = migrate_decompress_threads(); - qemu_mutex_lock(&decomp_done_lock); - for (int i = 0; i < thread_count; i++) { - while (!decomp_param[i].done) { - qemu_cond_wait(&decomp_done_cond, &decomp_done_lock); - } - } - qemu_mutex_unlock(&decomp_done_lock); - return qemu_file_get_error(decomp_file); -} - -void compress_threads_load_cleanup(void) -{ - int i, thread_count; - - if (!migrate_compress()) { - return; - } - thread_count = migrate_decompress_threads(); - for (i = 0; i < thread_count; i++) { - /* - * we use it as a indicator which shows if the thread is - * properly init'd or not - */ - if (!decomp_param[i].compbuf) { - break; - } - - qemu_mutex_lock(&decomp_param[i].mutex); - decomp_param[i].quit = true; - qemu_cond_signal(&decomp_param[i].cond); - qemu_mutex_unlock(&decomp_param[i].mutex); - } - for (i = 0; i < thread_count; i++) { - if (!decomp_param[i].compbuf) { - break; - } - - qemu_thread_join(decompress_threads + i); - qemu_mutex_destroy(&decomp_param[i].mutex); - qemu_cond_destroy(&decomp_param[i].cond); - inflateEnd(&decomp_param[i].stream); - g_free(decomp_param[i].compbuf); - decomp_param[i].compbuf = NULL; - } - g_free(decompress_threads); - g_free(decomp_param); - decompress_threads = NULL; - decomp_param = NULL; - decomp_file = NULL; -} - -int compress_threads_load_setup(QEMUFile *f) -{ - int i, thread_count; - - if (!migrate_compress()) { - return 0; - } - - /* - * set compression_counters memory to zero for a new migration - */ - memset(&compression_counters, 0, sizeof(compression_counters)); - - thread_count = migrate_decompress_threads(); - decompress_threads = g_new0(QemuThread, thread_count); - decomp_param = g_new0(DecompressParam, thread_count); - qemu_mutex_init(&decomp_done_lock); - qemu_cond_init(&decomp_done_cond); - decomp_file = f; - for (i = 0; i < thread_count; i++) { - if (inflateInit(&decomp_param[i].stream) != Z_OK) { - goto exit; - } - - size_t compbuf_size = compressBound(qemu_target_page_size()); - decomp_param[i].compbuf = g_malloc0(compbuf_size); - qemu_mutex_init(&decomp_param[i].mutex); - qemu_cond_init(&decomp_param[i].cond); - decomp_param[i].done = true; - decomp_param[i].quit = false; - qemu_thread_create(decompress_threads + i, "decompress", - do_data_decompress, decomp_param + i, - QEMU_THREAD_JOINABLE); - } - return 0; -exit: - compress_threads_load_cleanup(); - return -1; -} - -void decompress_data_with_multi_threads(QEMUFile *f, void *host, int len) -{ - int thread_count = migrate_decompress_threads(); - QEMU_LOCK_GUARD(&decomp_done_lock); - while (true) { - for (int i = 0; i < thread_count; i++) { - if (decomp_param[i].done) { - decomp_param[i].done = false; - qemu_mutex_lock(&decomp_param[i].mutex); - qemu_get_buffer(f, decomp_param[i].compbuf, len); - decomp_param[i].des = host; - decomp_param[i].len = len; - qemu_cond_signal(&decomp_param[i].cond); - qemu_mutex_unlock(&decomp_param[i].mutex); - return; - } - } - qemu_cond_wait(&decomp_done_cond, &decomp_done_lock); - } -} - -void populate_compress(MigrationInfo *info) -{ - if (!migrate_compress()) { - return; - } - info->compression = g_malloc0(sizeof(*info->compression)); - info->compression->pages = compression_counters.pages; - info->compression->busy = compression_counters.busy; - info->compression->busy_rate = compression_counters.busy_rate; - info->compression->compressed_size = compression_counters.compressed_size; - info->compression->compression_rate = compression_counters.compression_rate; -} - -uint64_t compress_ram_pages(void) -{ - return compression_counters.pages; -} - -void update_compress_thread_counts(const CompressParam *param, int bytes_xmit) -{ - ram_transferred_add(bytes_xmit); - - if (param->result == RES_ZEROPAGE) { - stat64_add(&mig_stats.zero_pages, 1); - return; - } - - /* 8 means a header with RAM_SAVE_FLAG_CONTINUE. */ - compression_counters.compressed_size += bytes_xmit - 8; - compression_counters.pages++; -} - -void compress_update_rates(uint64_t page_count) -{ - if (!migrate_compress()) { - return; - } - compression_counters.busy_rate = (double)(compression_counters.busy - - compression_counters.compress_thread_busy_prev) / page_count; - compression_counters.compress_thread_busy_prev = - compression_counters.busy; - - double compressed_size = compression_counters.compressed_size - - compression_counters.compressed_size_prev; - if (compressed_size) { - double uncompressed_size = (compression_counters.pages - - compression_counters.compress_pages_prev) * - qemu_target_page_size(); - - /* Compression-Ratio = Uncompressed-size / Compressed-size */ - compression_counters.compression_rate = - uncompressed_size / compressed_size; - - compression_counters.compress_pages_prev = - compression_counters.pages; - compression_counters.compressed_size_prev = - compression_counters.compressed_size; - } -} |