diff options
Diffstat (limited to 'migration')
| -rw-r--r-- | migration/fd.c | 44 | ||||
| -rw-r--r-- | migration/fd.h | 2 | ||||
| -rw-r--r-- | migration/file.c | 149 | ||||
| -rw-r--r-- | migration/file.h | 8 | ||||
| -rw-r--r-- | migration/migration.c | 56 | ||||
| -rw-r--r-- | migration/multifd-zlib.c | 26 | ||||
| -rw-r--r-- | migration/multifd-zstd.c | 26 | ||||
| -rw-r--r-- | migration/multifd.c | 417 | ||||
| -rw-r--r-- | migration/multifd.h | 27 | ||||
| -rw-r--r-- | migration/options.c | 47 | ||||
| -rw-r--r-- | migration/options.h | 1 | ||||
| -rw-r--r-- | migration/qemu-file.c | 106 | ||||
| -rw-r--r-- | migration/qemu-file.h | 6 | ||||
| -rw-r--r-- | migration/ram.c | 373 | ||||
| -rw-r--r-- | migration/ram.h | 1 | ||||
| -rw-r--r-- | migration/savevm.c | 1 | ||||
| -rw-r--r-- | migration/trace-events | 2 |
17 files changed, 1146 insertions, 146 deletions
diff --git a/migration/fd.c b/migration/fd.c index 0eb677dcae..d4ae72d132 100644 --- a/migration/fd.c +++ b/migration/fd.c @@ -15,18 +15,41 @@ */ #include "qemu/osdep.h" +#include "qapi/error.h" #include "channel.h" #include "fd.h" #include "migration.h" #include "monitor/monitor.h" +#include "io/channel-file.h" #include "io/channel-util.h" +#include "options.h" #include "trace.h" +static struct FdOutgoingArgs { + int fd; +} outgoing_args; + +int fd_args_get_fd(void) +{ + return outgoing_args.fd; +} + +void fd_cleanup_outgoing_migration(void) +{ + if (outgoing_args.fd > 0) { + close(outgoing_args.fd); + outgoing_args.fd = -1; + } +} + void fd_start_outgoing_migration(MigrationState *s, const char *fdname, Error **errp) { QIOChannel *ioc; int fd = monitor_get_fd(monitor_cur(), fdname, errp); + + outgoing_args.fd = -1; + if (fd == -1) { return; } @@ -38,6 +61,8 @@ void fd_start_outgoing_migration(MigrationState *s, const char *fdname, Error ** return; } + outgoing_args.fd = fd; + qio_channel_set_name(ioc, "migration-fd-outgoing"); migration_channel_connect(s, ioc, NULL, NULL); object_unref(OBJECT(ioc)); @@ -73,4 +98,23 @@ void fd_start_incoming_migration(const char *fdname, Error **errp) fd_accept_incoming_migration, NULL, NULL, g_main_context_get_thread_default()); + + if (migrate_multifd()) { + int channels = migrate_multifd_channels(); + + while (channels--) { + ioc = QIO_CHANNEL(qio_channel_file_new_fd(dup(fd))); + + if (QIO_CHANNEL_FILE(ioc)->fd == -1) { + error_setg(errp, "Failed to duplicate fd %d", fd); + return; + } + + qio_channel_set_name(ioc, "migration-fd-incoming"); + qio_channel_add_watch_full(ioc, G_IO_IN, + fd_accept_incoming_migration, + NULL, NULL, + g_main_context_get_thread_default()); + } + } } diff --git a/migration/fd.h b/migration/fd.h index b901bc014e..0c0a18d9e7 100644 --- a/migration/fd.h +++ b/migration/fd.h @@ -20,4 +20,6 @@ void fd_start_incoming_migration(const char *fdname, Error **errp); void fd_start_outgoing_migration(MigrationState *s, const char *fdname, Error **errp); +void fd_cleanup_outgoing_migration(void); +int fd_args_get_fd(void); #endif diff --git a/migration/file.c b/migration/file.c index 5d4975f43e..164b079966 100644 --- a/migration/file.c +++ b/migration/file.c @@ -6,17 +6,25 @@ */ #include "qemu/osdep.h" +#include "exec/ramblock.h" #include "qemu/cutils.h" +#include "qemu/error-report.h" #include "qapi/error.h" #include "channel.h" +#include "fd.h" #include "file.h" #include "migration.h" #include "io/channel-file.h" #include "io/channel-util.h" +#include "options.h" #include "trace.h" #define OFFSET_OPTION ",offset=" +static struct FileOutgoingArgs { + char *fname; +} outgoing_args; + /* Remove the offset option from @filespec and return it in @offsetp. */ int file_parse_offset(char *filespec, uint64_t *offsetp, Error **errp) @@ -36,6 +44,41 @@ int file_parse_offset(char *filespec, uint64_t *offsetp, Error **errp) return 0; } +void file_cleanup_outgoing_migration(void) +{ + g_free(outgoing_args.fname); + outgoing_args.fname = NULL; +} + +bool file_send_channel_create(gpointer opaque, Error **errp) +{ + QIOChannelFile *ioc; + int flags = O_WRONLY; + bool ret = false; + int fd = fd_args_get_fd(); + + if (fd && fd != -1) { + ioc = qio_channel_file_new_fd(dup(fd)); + } else { + ioc = qio_channel_file_new_path(outgoing_args.fname, flags, 0, errp); + if (!ioc) { + goto out; + } + } + + multifd_channel_connect(opaque, QIO_CHANNEL(ioc)); + ret = true; + +out: + /* + * File channel creation is synchronous. However posting this + * semaphore here is simpler than adding a special case. + */ + multifd_send_channel_created(); + + return ret; +} + void file_start_outgoing_migration(MigrationState *s, FileMigrationArgs *file_args, Error **errp) { @@ -52,6 +95,8 @@ void file_start_outgoing_migration(MigrationState *s, return; } + outgoing_args.fname = g_strdup(filename); + ioc = QIO_CHANNEL(fioc); if (offset && qio_channel_io_seek(ioc, offset, SEEK_SET, errp) < 0) { return; @@ -74,7 +119,8 @@ void file_start_incoming_migration(FileMigrationArgs *file_args, Error **errp) g_autofree char *filename = g_strdup(file_args->filename); QIOChannelFile *fioc = NULL; uint64_t offset = file_args->offset; - QIOChannel *ioc; + int channels = 1; + int i = 0; trace_migration_file_incoming(filename); @@ -83,13 +129,100 @@ void file_start_incoming_migration(FileMigrationArgs *file_args, Error **errp) return; } - ioc = QIO_CHANNEL(fioc); - if (offset && qio_channel_io_seek(ioc, offset, SEEK_SET, errp) < 0) { + if (offset && + qio_channel_io_seek(QIO_CHANNEL(fioc), offset, SEEK_SET, errp) < 0) { return; } - qio_channel_set_name(QIO_CHANNEL(ioc), "migration-file-incoming"); - qio_channel_add_watch_full(ioc, G_IO_IN, - file_accept_incoming_migration, - NULL, NULL, - g_main_context_get_thread_default()); + + if (migrate_multifd()) { + channels += migrate_multifd_channels(); + } + + do { + QIOChannel *ioc = QIO_CHANNEL(fioc); + + qio_channel_set_name(ioc, "migration-file-incoming"); + qio_channel_add_watch_full(ioc, G_IO_IN, + file_accept_incoming_migration, + NULL, NULL, + g_main_context_get_thread_default()); + + fioc = qio_channel_file_new_fd(dup(fioc->fd)); + + if (!fioc || fioc->fd == -1) { + error_setg(errp, "Error creating migration incoming channel"); + break; + } + } while (++i < channels); +} + +int file_write_ramblock_iov(QIOChannel *ioc, const struct iovec *iov, + int niov, RAMBlock *block, Error **errp) +{ + ssize_t ret = -1; + int i, slice_idx, slice_num; + uintptr_t base, next, offset; + size_t len; + + slice_idx = 0; + slice_num = 1; + + /* + * If the iov array doesn't have contiguous elements, we need to + * split it in slices because we only have one file offset for the + * whole iov. Do this here so callers don't need to break the iov + * array themselves. + */ + for (i = 0; i < niov; i++, slice_num++) { + base = (uintptr_t) iov[i].iov_base; + + if (i != niov - 1) { + len = iov[i].iov_len; + next = (uintptr_t) iov[i + 1].iov_base; + + if (base + len == next) { + continue; + } + } + + /* + * Use the offset of the first element of the segment that + * we're sending. + */ + offset = (uintptr_t) iov[slice_idx].iov_base - (uintptr_t) block->host; + if (offset >= block->used_length) { + error_setg(errp, "offset " RAM_ADDR_FMT + "outside of ramblock %s range", offset, block->idstr); + ret = -1; + break; + } + + ret = qio_channel_pwritev(ioc, &iov[slice_idx], slice_num, + block->pages_offset + offset, errp); + if (ret < 0) { + break; + } + + slice_idx += slice_num; + slice_num = 0; + } + + return (ret < 0) ? ret : 0; +} + +int multifd_file_recv_data(MultiFDRecvParams *p, Error **errp) +{ + MultiFDRecvData *data = p->data; + size_t ret; + + ret = qio_channel_pread(p->c, (char *) data->opaque, + data->size, data->file_offset, errp); + if (ret != data->size) { + error_prepend(errp, + "multifd recv (%u): read 0x%zx, expected 0x%zx", + p->id, ret, data->size); + return -1; + } + + return 0; } diff --git a/migration/file.h b/migration/file.h index 37d6a08bfc..9f71e87f74 100644 --- a/migration/file.h +++ b/migration/file.h @@ -9,10 +9,18 @@ #define QEMU_MIGRATION_FILE_H #include "qapi/qapi-types-migration.h" +#include "io/task.h" +#include "channel.h" +#include "multifd.h" void file_start_incoming_migration(FileMigrationArgs *file_args, Error **errp); void file_start_outgoing_migration(MigrationState *s, FileMigrationArgs *file_args, Error **errp); int file_parse_offset(char *filespec, uint64_t *offsetp, Error **errp); +void file_cleanup_outgoing_migration(void); +bool file_send_channel_create(gpointer opaque, Error **errp); +int file_write_ramblock_iov(QIOChannel *ioc, const struct iovec *iov, + int niov, RAMBlock *block, Error **errp); +int multifd_file_recv_data(MultiFDRecvParams *p, Error **errp); #endif diff --git a/migration/migration.c b/migration/migration.c index bab68bcbef..a49fcd53ee 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -140,9 +140,38 @@ static bool transport_supports_multi_channels(MigrationAddress *addr) if (addr->transport == MIGRATION_ADDRESS_TYPE_SOCKET) { SocketAddress *saddr = &addr->u.socket; - return saddr->type == SOCKET_ADDRESS_TYPE_INET || - saddr->type == SOCKET_ADDRESS_TYPE_UNIX || - saddr->type == SOCKET_ADDRESS_TYPE_VSOCK; + if (saddr->type == SOCKET_ADDRESS_TYPE_FD) { + return migrate_mapped_ram(); + } + + return (saddr->type == SOCKET_ADDRESS_TYPE_INET || + saddr->type == SOCKET_ADDRESS_TYPE_UNIX || + saddr->type == SOCKET_ADDRESS_TYPE_VSOCK); + } else if (addr->transport == MIGRATION_ADDRESS_TYPE_FILE) { + return migrate_mapped_ram(); + } else { + return false; + } +} + +static bool migration_needs_seekable_channel(void) +{ + return migrate_mapped_ram(); +} + +static bool transport_supports_seeking(MigrationAddress *addr) +{ + if (addr->transport == MIGRATION_ADDRESS_TYPE_FILE) { + return true; + } + + /* + * At this point, the user might not yet have passed the file + * descriptor to QEMU, so we cannot know for sure whether it + * refers to a plain file or a socket. Let it through anyway. + */ + if (addr->transport == MIGRATION_ADDRESS_TYPE_SOCKET) { + return addr->u.socket.type == SOCKET_ADDRESS_TYPE_FD; } return false; @@ -152,6 +181,12 @@ static bool migration_channels_and_transport_compatible(MigrationAddress *addr, Error **errp) { + if (migration_needs_seekable_channel() && + !transport_supports_seeking(addr)) { + error_setg(errp, "Migration requires seekable transport (e.g. file)"); + return false; + } + if (migration_needs_multiple_sockets() && !transport_supports_multi_channels(addr)) { error_setg(errp, "Migration requires multi-channel URIs (e.g. tcp)"); @@ -881,7 +916,8 @@ void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp) uint32_t channel_magic = 0; int ret = 0; - if (migrate_multifd() && !migrate_postcopy_ram() && + if (migrate_multifd() && !migrate_mapped_ram() && + !migrate_postcopy_ram() && qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_READ_MSG_PEEK)) { /* * With multiple channels, it is possible that we receive channels @@ -1950,6 +1986,18 @@ static bool migrate_prepare(MigrationState *s, bool blk, bool blk_inc, return false; } + if (migrate_mapped_ram()) { + if (migrate_tls()) { + error_setg(errp, "Cannot use TLS with mapped-ram"); + return false; + } + + if (migrate_multifd_compression()) { + error_setg(errp, "Cannot use compression with mapped-ram"); + return false; + } + } + if (migrate_mode_is_cpr(s)) { const char *conflict = NULL; diff --git a/migration/multifd-zlib.c b/migration/multifd-zlib.c index 012e3bdea1..6120faad65 100644 --- a/migration/multifd-zlib.c +++ b/migration/multifd-zlib.c @@ -69,7 +69,7 @@ static int zlib_send_setup(MultiFDSendParams *p, Error **errp) err_msg = "out of memory for buf"; goto err_free_zbuff; } - p->data = z; + p->compress_data = z; return 0; err_free_zbuff: @@ -92,15 +92,15 @@ err_free_z: */ static void zlib_send_cleanup(MultiFDSendParams *p, Error **errp) { - struct zlib_data *z = p->data; + struct zlib_data *z = p->compress_data; deflateEnd(&z->zs); g_free(z->zbuff); z->zbuff = NULL; g_free(z->buf); z->buf = NULL; - g_free(p->data); - p->data = NULL; + g_free(p->compress_data); + p->compress_data = NULL; } /** @@ -117,7 +117,7 @@ static void zlib_send_cleanup(MultiFDSendParams *p, Error **errp) static int zlib_send_prepare(MultiFDSendParams *p, Error **errp) { MultiFDPages_t *pages = p->pages; - struct zlib_data *z = p->data; + struct zlib_data *z = p->compress_data; z_stream *zs = &z->zs; uint32_t out_size = 0; int ret; @@ -194,7 +194,7 @@ static int zlib_recv_setup(MultiFDRecvParams *p, Error **errp) struct zlib_data *z = g_new0(struct zlib_data, 1); z_stream *zs = &z->zs; - p->data = z; + p->compress_data = z; zs->zalloc = Z_NULL; zs->zfree = Z_NULL; zs->opaque = Z_NULL; @@ -224,17 +224,17 @@ static int zlib_recv_setup(MultiFDRecvParams *p, Error **errp) */ static void zlib_recv_cleanup(MultiFDRecvParams *p) { - struct zlib_data *z = p->data; + struct zlib_data *z = p->compress_data; inflateEnd(&z->zs); g_free(z->zbuff); z->zbuff = NULL; - g_free(p->data); - p->data = NULL; + g_free(p->compress_data); + p->compress_data = NULL; } /** - * zlib_recv_pages: read the data from the channel into actual pages + * zlib_recv: read the data from the channel into actual pages * * Read the compressed buffer, and uncompress it into the actual * pages. @@ -244,9 +244,9 @@ static void zlib_recv_cleanup(MultiFDRecvParams *p) * @p: Params for the channel that we are using * @errp: pointer to an error */ -static int zlib_recv_pages(MultiFDRecvParams *p, Error **errp) +static int zlib_recv(MultiFDRecvParams *p, Error **errp) { - struct zlib_data *z = p->data; + struct zlib_data *z = p->compress_data; z_stream *zs = &z->zs; uint32_t in_size = p->next_packet_size; /* we measure the change of total_out */ @@ -319,7 +319,7 @@ static MultiFDMethods multifd_zlib_ops = { .send_prepare = zlib_send_prepare, .recv_setup = zlib_recv_setup, .recv_cleanup = zlib_recv_cleanup, - .recv_pages = zlib_recv_pages + .recv = zlib_recv }; static void multifd_zlib_register(void) diff --git a/migration/multifd-zstd.c b/migration/multifd-zstd.c index dc8fe43e94..cac236833d 100644 --- a/migration/multifd-zstd.c +++ b/migration/multifd-zstd.c @@ -52,7 +52,7 @@ static int zstd_send_setup(MultiFDSendParams *p, Error **errp) struct zstd_data *z = g_new0(struct zstd_data, 1); int res; - p->data = z; + p->compress_data = z; z->zcs = ZSTD_createCStream(); if (!z->zcs) { g_free(z); @@ -90,14 +90,14 @@ static int zstd_send_setup(MultiFDSendParams *p, Error **errp) */ static void zstd_send_cleanup(MultiFDSendParams *p, Error **errp) { - struct zstd_data *z = p->data; + struct zstd_data *z = p->compress_data; ZSTD_freeCStream(z->zcs); z->zcs = NULL; g_free(z->zbuff); z->zbuff = NULL; - g_free(p->data); - p->data = NULL; + g_free(p->compress_data); + p->compress_data = NULL; } /** @@ -114,7 +114,7 @@ static void zstd_send_cleanup(MultiFDSendParams *p, Error **errp) static int zstd_send_prepare(MultiFDSendParams *p, Error **errp) { MultiFDPages_t *pages = p->pages; - struct zstd_data *z = p->data; + struct zstd_data *z = p->compress_data; int ret; uint32_t i; @@ -183,7 +183,7 @@ static int zstd_recv_setup(MultiFDRecvParams *p, Error **errp) struct zstd_data *z = g_new0(struct zstd_data, 1); int ret; - p->data = z; + p->compress_data = z; z->zds = ZSTD_createDStream(); if (!z->zds) { g_free(z); @@ -221,18 +221,18 @@ static int zstd_recv_setup(MultiFDRecvParams *p, Error **errp) */ static void zstd_recv_cleanup(MultiFDRecvParams *p) { - struct zstd_data *z = p->data; + struct zstd_data *z = p->compress_data; ZSTD_freeDStream(z->zds); z->zds = NULL; g_free(z->zbuff); z->zbuff = NULL; - g_free(p->data); - p->data = NULL; + g_free(p->compress_data); + p->compress_data = NULL; } /** - * zstd_recv_pages: read the data from the channel into actual pages + * zstd_recv: read the data from the channel into actual pages * * Read the compressed buffer, and uncompress it into the actual * pages. @@ -242,13 +242,13 @@ static void zstd_recv_cleanup(MultiFDRecvParams *p) * @p: Params for the channel that we are using * @errp: pointer to an error */ -static int zstd_recv_pages(MultiFDRecvParams *p, Error **errp) +static int zstd_recv(MultiFDRecvParams *p, Error **errp) { uint32_t in_size = p->next_packet_size; uint32_t out_size = 0; uint32_t expected_size = p->normal_num * p->page_size; uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK; - struct zstd_data *z = p->data; + struct zstd_data *z = p->compress_data; int ret; int i; @@ -310,7 +310,7 @@ static MultiFDMethods multifd_zstd_ops = { .send_prepare = zstd_send_prepare, .recv_setup = zstd_recv_setup, .recv_cleanup = zstd_recv_cleanup, - .recv_pages = zstd_recv_pages + .recv = zstd_recv }; static void multifd_zstd_register(void) diff --git a/migration/multifd.c b/migration/multifd.c index 6c07f19af1..d4a44da559 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -17,7 +17,8 @@ #include "exec/ramblock.h" #include "qemu/error-report.h" #include "qapi/error.h" -#include "ram.h" +#include "fd.h" +#include "file.h" #include "migration.h" #include "migration-stats.h" #include "socket.h" @@ -28,6 +29,7 @@ #include "threadinfo.h" #include "options.h" #include "qemu/yank.h" +#include "io/channel-file.h" #include "io/channel-socket.h" #include "yank_functions.h" @@ -81,9 +83,13 @@ struct { struct { MultiFDRecvParams *params; + MultiFDRecvData *data; /* number of created threads */ int count; - /* syncs main thread and channels */ + /* + * This is always posted by the recv threads, the migration thread + * uses it to wait for recv threads to finish assigned tasks. + */ QemuSemaphore sem_sync; /* global number of generated multifd packets */ uint64_t packet_num; @@ -92,6 +98,27 @@ struct { MultiFDMethods *ops; } *multifd_recv_state; +static bool multifd_use_packets(void) +{ + return !migrate_mapped_ram(); +} + +void multifd_send_channel_created(void) +{ + qemu_sem_post(&multifd_send_state->channels_created); +} + +static void multifd_set_file_bitmap(MultiFDSendParams *p) +{ + MultiFDPages_t *pages = p->pages; + + assert(pages->block); + + for (int i = 0; i < p->pages->num; i++) { + ramblock_set_file_bmap_atomic(pages->block, pages->offset[i]); + } +} + /* Multifd without compression */ /** @@ -122,6 +149,19 @@ static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp) return; } +static void multifd_send_prepare_iovs(MultiFDSendParams *p) +{ + MultiFDPages_t *pages = p->pages; + + for (int i = 0; i < pages->num; i++) { + p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i]; + p->iov[p->iovs_num].iov_len = p->page_size; + p->iovs_num++; + } + + p->next_packet_size = pages->num * p->page_size; +} + /** * nocomp_send_prepare: prepare date to be able to send * @@ -136,9 +176,15 @@ static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp) static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp) { bool use_zero_copy_send = migrate_zero_copy_send(); - MultiFDPages_t *pages = p->pages; int ret; + if (!multifd_use_packets()) { + multifd_send_prepare_iovs(p); + multifd_set_file_bitmap(p); + + return 0; + } + if (!use_zero_copy_send) { /* * Only !zerocopy needs the header in IOV; zerocopy will @@ -147,13 +193,7 @@ static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp) multifd_send_prepare_header(p); } - for (int i = 0; i < pages->num; i++) { - p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i]; - p->iov[p->iovs_num].iov_len = p->page_size; - p->iovs_num++; - } - - p->next_packet_size = pages->num * p->page_size; + multifd_send_prepare_iovs(p); p->flags |= MULTIFD_FLAG_NOCOMP; multifd_send_fill_packet(p); @@ -197,7 +237,7 @@ static void nocomp_recv_cleanup(MultiFDRecvParams *p) } /** - * nocomp_recv_pages: read the data from the channel into actual pages + * nocomp_recv: read the data from the channel * * For no compression we just need to read things into the correct place. * @@ -206,9 +246,15 @@ static void nocomp_recv_cleanup(MultiFDRecvParams *p) * @p: Params for the channel that we are using * @errp: pointer to an error */ -static int nocomp_recv_pages(MultiFDRecvParams *p, Error **errp) +static int nocomp_recv(MultiFDRecvParams *p, Error **errp) { - uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK; + uint32_t flags; + + if (!multifd_use_packets()) { + return multifd_file_recv_data(p, errp); + } + + flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK; if (flags != MULTIFD_FLAG_NOCOMP) { error_setg(errp, "multifd %u: flags received %x flags expected %x", @@ -228,7 +274,7 @@ static MultiFDMethods multifd_nocomp_ops = { .send_prepare = nocomp_send_prepare, .recv_setup = nocomp_recv_setup, .recv_cleanup = nocomp_recv_cleanup, - .recv_pages = nocomp_recv_pages + .recv = nocomp_recv }; static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = { @@ -663,6 +709,19 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp) { if (p->c) { migration_ioc_unregister_yank(p->c); + /* + * An explicit close() on the channel here is normally not + * required, but can be helpful for "file:" iochannels, where it + * will include fdatasync() to make sure the data is flushed to the + * disk backend. + * + * The object_unref() cannot guarantee that because: (1) finalize() + * of the iochannel is only triggered on the last reference, and + * it's not guaranteed that we always hold the last refcount when + * reaching here, and, (2) even if finalize() is invoked, it only + * does a close(fd) without data flush. + */ + qio_channel_close(p->c, &error_abort); object_unref(OBJECT(p->c)); p->c = NULL; } @@ -684,6 +743,8 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp) static void multifd_send_cleanup_state(void) { + file_cleanup_outgoing_migration(); + fd_cleanup_outgoing_migration(); socket_cleanup_outgoing_migration(); qemu_sem_destroy(&multifd_send_state->channels_created); qemu_sem_destroy(&multifd_send_state->channels_ready); @@ -795,15 +856,18 @@ static void *multifd_send_thread(void *opaque) MigrationThread *thread = NULL; Error *local_err = NULL; int ret = 0; + bool use_packets = multifd_use_packets(); thread = migration_threads_add(p->name, qemu_get_thread_id()); trace_multifd_send_thread_start(p->id); rcu_register_thread(); - if (multifd_send_initial_packet(p, &local_err) < 0) { - ret = -1; - goto out; + if (use_packets) { + if (multifd_send_initial_packet(p, &local_err) < 0) { + ret = -1; + goto out; + } } while (true) { @@ -829,8 +893,15 @@ static void *multifd_send_thread(void *opaque) break; } - ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL, - 0, p->write_flags, &local_err); + if (migrate_mapped_ram()) { + ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num, + p->pages->block, &local_err); + } else { + ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, + NULL, 0, p->write_flags, + &local_err); + } + if (ret != 0) { break; } @@ -854,16 +925,20 @@ static void *multifd_send_thread(void *opaque) * it doesn't require explicit memory barriers. */ assert(qatomic_read(&p->pending_sync)); - p->flags = MULTIFD_FLAG_SYNC; - multifd_send_fill_packet(p); - ret = qio_channel_write_all(p->c, (void *)p->packet, - p->packet_len, &local_err); - if (ret != 0) { - break; + + if (use_packets) { + p->flags = MULTIFD_FLAG_SYNC; + multifd_send_fill_packet(p); + ret = qio_channel_write_all(p->c, (void *)p->packet, + p->packet_len, &local_err); + if (ret != 0) { + break; + } + /* p->next_packet_size will always be zero for a SYNC packet */ + stat64_add(&mig_stats.multifd_bytes, p->packet_len); + p->flags = 0; } - /* p->next_packet_size will always be zero for a SYNC packet */ - stat64_add(&mig_stats.multifd_bytes, p->packet_len); - p->flags = 0; + qatomic_set(&p->pending_sync, false); qemu_sem_post(&p->sem_sync); } @@ -939,7 +1014,7 @@ static bool multifd_tls_channel_connect(MultiFDSendParams *p, return true; } -static void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc) +void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc) { qio_channel_set_delay(ioc, false); @@ -990,7 +1065,7 @@ out: * Here we're not interested whether creation succeeded, only that * it happened at all. */ - qemu_sem_post(&multifd_send_state->channels_created); + multifd_send_channel_created(); if (ret) { return; @@ -1007,9 +1082,14 @@ out: error_free(local_err); } -static void multifd_new_send_channel_create(gpointer opaque) +static bool multifd_new_send_channel_create(gpointer opaque, Error **errp) { + if (!multifd_use_packets()) { + return file_send_channel_create(opaque, errp); + } + socket_send_channel_create(multifd_new_send_channel_async, opaque); + return true; } bool multifd_send_setup(void) @@ -1018,6 +1098,7 @@ bool multifd_send_setup(void) Error *local_err = NULL; int thread_count, ret = 0; uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); + bool use_packets = multifd_use_packets(); uint8_t i; if (!migrate_multifd()) { @@ -1040,18 +1121,27 @@ bool multifd_send_setup(void) qemu_sem_init(&p->sem_sync, 0); p->id = i; p->pages = multifd_pages_init(page_count); - p->packet_len = sizeof(MultiFDPacket_t) - + sizeof(uint64_t) * page_count; - p->packet = g_malloc0(p->packet_len); - p->packet->magic = cpu_to_be32(MULTIFD_MAGIC); - p->packet->version = cpu_to_be32(MULTIFD_VERSION); + + if (use_packets) { + p->packet_len = sizeof(MultiFDPacket_t) + + sizeof(uint64_t) * page_count; + p->packet = g_malloc0(p->packet_len); + p->packet->magic = cpu_to_be32(MULTIFD_MAGIC); + p->packet->version = cpu_to_be32(MULTIFD_VERSION); + + /* We need one extra place for the packet header */ + p->iov = g_new0(struct iovec, page_count + 1); + } else { + p->iov = g_new0(struct iovec, page_count); + } p->name = g_strdup_printf("multifdsend_%d", i); - /* We need one extra place for the packet header */ - p->iov = g_new0(struct iovec, page_count + 1); p->page_size = qemu_target_page_size(); p->page_count = page_count; p->write_flags = 0; - multifd_new_send_channel_create(p); + + if (!multifd_new_send_channel_create(p, &local_err)) { + return false; + } } /* @@ -1083,6 +1173,57 @@ bool multifd_send_setup(void) return true; } +bool multifd_recv(void) +{ + int i; + static int next_recv_channel; + MultiFDRecvParams *p = NULL; + MultiFDRecvData *data = multifd_recv_state->data; + + /* + * next_channel can remain from a previous migration that was + * using more channels, so ensure it doesn't overflow if the + * limit is lower now. + */ + next_recv_channel %= migrate_multifd_channels(); + for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) { + if (multifd_recv_should_exit()) { + return false; + } + + p = &multifd_recv_state->params[i]; + + if (qatomic_read(&p->pending_job) == false) { + next_recv_channel = (i + 1) % migrate_multifd_channels(); + break; + } + } + + /* + * Order pending_job read before manipulating p->data below. Pairs + * with qatomic_store_release() at multifd_recv_thread(). + */ + smp_mb_acquire(); + + assert(!p->data->size); + multifd_recv_state->data = p->data; + p->data = data; + + /* + * Order p->data update before setting pending_job. Pairs with + * qatomic_load_acquire() at multifd_recv_thread(). + */ + qatomic_store_release(&p->pending_job, true); + qemu_sem_post(&p->sem); + + return true; +} + +MultiFDRecvData *multifd_get_recv_data(void) +{ + return multifd_recv_state->data; +} + static void multifd_recv_terminate_threads(Error *err) { int i; @@ -1107,10 +1248,27 @@ static void multifd_recv_terminate_threads(Error *err) MultiFDRecvParams *p = &multifd_recv_state->params[i]; /* - * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code, - * however try to wakeup it without harm in cleanup phase. + * The migration thread and channels interact differently + * depending on the presence of packets. */ - qemu_sem_post(&p->sem_sync); + if (multifd_use_packets()) { + /* + * The channel receives as long as there are packets. When + * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the + * channel waits for the migration thread to sync. If the + * sync never happens, do it here. + */ + qemu_sem_post(&p->sem_sync); + } else { + /* + * The channel waits for the migration thread to give it + * work. When the migration thread runs out of work, it + * releases the channel and waits for any pending work to + * finish. If we reach here (e.g. due to error) before the + * work runs out, release the channel. + */ + qemu_sem_post(&p->sem); + } /* * We could arrive here for two reasons: @@ -1138,6 +1296,7 @@ static void multifd_recv_cleanup_channel(MultiFDRecvParams *p) p->c = NULL; qemu_mutex_destroy(&p->mutex); qemu_sem_destroy(&p->sem_sync); + qemu_sem_destroy(&p->sem); g_free(p->name); p->name = NULL; p->packet_len = 0; @@ -1155,6 +1314,8 @@ static void multifd_recv_cleanup_state(void) qemu_sem_destroy(&multifd_recv_state->sem_sync); g_free(multifd_recv_state->params); multifd_recv_state->params = NULL; + g_free(multifd_recv_state->data); + multifd_recv_state->data = NULL; g_free(multifd_recv_state); multifd_recv_state = NULL; } @@ -1182,18 +1343,53 @@ void multifd_recv_cleanup(void) void multifd_recv_sync_main(void) { + int thread_count = migrate_multifd_channels(); + bool file_based = !multifd_use_packets(); int i; if (!migrate_multifd()) { return; } - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDRecvParams *p = &multifd_recv_state->params[i]; - trace_multifd_recv_sync_main_wait(p->id); + /* + * File-based channels don't use packets and therefore need to + * wait for more work. Release them to start the sync. + */ + if (file_based) { + for (i = 0; i < thread_count; i++) { + MultiFDRecvParams *p = &multifd_recv_state->params[i]; + + trace_multifd_recv_sync_main_signal(p->id); + qemu_sem_post(&p->sem); + } + } + + /* + * Initiate the synchronization by waiting for all channels. + * + * For socket-based migration this means each channel has received + * the SYNC packet on the stream. + * + * For file-based migration this means each channel is done with + * the work (pending_job=false). + */ + for (i = 0; i < thread_count; i++) { + trace_multifd_recv_sync_main_wait(i); qemu_sem_wait(&multifd_recv_state->sem_sync); } - for (i = 0; i < migrate_multifd_channels(); i++) { + + if (file_based) { + /* + * For file-based loading is done in one iteration. We're + * done. + */ + return; + } + + /* + * Sync done. Release the channels for the next iteration. + */ + for (i = 0; i < thread_count; i++) { MultiFDRecvParams *p = &multifd_recv_state->params[i]; WITH_QEMU_LOCK_GUARD(&p->mutex) { @@ -1211,46 +1407,87 @@ static void *multifd_recv_thread(void *opaque) { MultiFDRecvParams *p = opaque; Error *local_err = NULL; + bool use_packets = multifd_use_packets(); int ret; trace_multifd_recv_thread_start(p->id); rcu_register_thread(); while (true) { - uint32_t flags; + uint32_t flags = 0; + bool has_data = false; + p->normal_num = 0; - if (multifd_recv_should_exit()) { - break; - } + if (use_packets) { + if (multifd_recv_should_exit()) { + break; + } - ret = qio_channel_read_all_eof(p->c, (void *)p->packet, - p->packet_len, &local_err); - if (ret == 0 || ret == -1) { /* 0: EOF -1: Error */ - break; - } + ret = qio_channel_read_all_eof(p->c, (void *)p->packet, + p->packet_len, &local_err); + if (ret == 0 || ret == -1) { /* 0: EOF -1: Error */ + break; + } - qemu_mutex_lock(&p->mutex); - ret = multifd_recv_unfill_packet(p, &local_err); - if (ret) { + qemu_mutex_lock(&p->mutex); + ret = multifd_recv_unfill_packet(p, &local_err); + if (ret) { + qemu_mutex_unlock(&p->mutex); + break; + } + + flags = p->flags; + /* recv methods don't know how to handle the SYNC flag */ + p->flags &= ~MULTIFD_FLAG_SYNC; + has_data = !!p->normal_num; qemu_mutex_unlock(&p->mutex); - break; - } + } else { + /* + * No packets, so we need to wait for the vmstate code to + * give us work. + */ + qemu_sem_wait(&p->sem); + + if (multifd_recv_should_exit()) { + break; + } + + /* pairs with qatomic_store_release() at multifd_recv() */ + if (!qatomic_load_acquire(&p->pending_job)) { + /* + * Migration thread did not send work, this is + * equivalent to pending_sync on the sending + * side. Post sem_sync to notify we reached this + * point. + */ + qemu_sem_post(&multifd_recv_state->sem_sync); + continue; + } - flags = p->flags; - /* recv methods don't know how to handle the SYNC flag */ - p->flags &= ~MULTIFD_FLAG_SYNC; - qemu_mutex_unlock(&p->mutex); + has_data = !!p->data->size; + } - if (p->normal_num) { - ret = multifd_recv_state->ops->recv_pages(p, &local_err); + if (has_data) { + ret = multifd_recv_state->ops->recv(p, &local_err); if (ret != 0) { break; } } - if (flags & MULTIFD_FLAG_SYNC) { - qemu_sem_post(&multifd_recv_state->sem_sync); - qemu_sem_wait(&p->sem_sync); + if (use_packets) { + if (flags & MULTIFD_FLAG_SYNC) { + qemu_sem_post(&multifd_recv_state->sem_sync); + qemu_sem_wait(&p->sem_sync); + } + } else { + p->total_normal_pages += p->data->size / qemu_target_page_size(); + p->data->size = 0; + /* + * Order data->size update before clearing + * pending_job. Pairs with smp_mb_acquire() at + * multifd_recv(). + */ + qatomic_store_release(&p->pending_job, false); } } @@ -1269,6 +1506,7 @@ int multifd_recv_setup(Error **errp) { int thread_count; uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); + bool use_packets = multifd_use_packets(); uint8_t i; /* @@ -1282,6 +1520,10 @@ int multifd_recv_setup(Error **errp) thread_count = migrate_multifd_channels(); multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); + + multifd_recv_state->data = g_new0(MultiFDRecvData, 1); + multifd_recv_state->data->size = 0; + qatomic_set(&multifd_recv_state->count, 0); qatomic_set(&multifd_recv_state->exiting, 0); qemu_sem_init(&multifd_recv_state->sem_sync, 0); @@ -1292,10 +1534,18 @@ int multifd_recv_setup(Error **errp) qemu_mutex_init(&p->mutex); qemu_sem_init(&p->sem_sync, 0); + qemu_sem_init(&p->sem, 0); + p->pending_job = false; p->id = i; - p->packet_len = sizeof(MultiFDPacket_t) - + sizeof(uint64_t) * page_count; - p->packet = g_malloc0(p->packet_len); + + p->data = g_new0(MultiFDRecvData, 1); + p->data->size = 0; + + if (use_packets) { + p->packet_len = sizeof(MultiFDPacket_t) + + sizeof(uint64_t) * page_count; + p->packet = g_malloc0(p->packet_len); + } p->name = g_strdup_printf("multifdrecv_%d", i); p->iov = g_new0(struct iovec, page_count); p->normal = g_new0(ram_addr_t, page_count); @@ -1339,18 +1589,23 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp) { MultiFDRecvParams *p; Error *local_err = NULL; + bool use_packets = multifd_use_packets(); int id; - id = multifd_recv_initial_packet(ioc, &local_err); - if (id < 0) { - multifd_recv_terminate_threads(local_err); - error_propagate_prepend(errp, local_err, - "failed to receive packet" - " via multifd channel %d: ", - qatomic_read(&multifd_recv_state->count)); - return; + if (use_packets) { + id = multifd_recv_initial_packet(ioc, &local_err); + if (id < 0) { + multifd_recv_terminate_threads(local_err); + error_propagate_prepend(errp, local_err, + "failed to receive packet" + " via multifd channel %d: ", + qatomic_read(&multifd_recv_state->count)); + return; + } + trace_multifd_recv_new_channel(id); + } else { + id = qatomic_read(&multifd_recv_state->count); } - trace_multifd_recv_new_channel(id); p = &multifd_recv_state->params[id]; if (p->c != NULL) { diff --git a/migration/multifd.h b/migration/multifd.h index b3fe27ae93..7447c2bea3 100644 --- a/migration/multifd.h +++ b/migration/multifd.h @@ -13,8 +13,13 @@ #ifndef QEMU_MIGRATION_MULTIFD_H #define QEMU_MIGRATION_MULTIFD_H +#include "ram.h" + +typedef struct MultiFDRecvData MultiFDRecvData; + bool multifd_send_setup(void); void multifd_send_shutdown(void); +void multifd_send_channel_created(void); int multifd_recv_setup(Error **errp); void multifd_recv_cleanup(void); void multifd_recv_shutdown(void); @@ -23,6 +28,8 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp); void multifd_recv_sync_main(void); int multifd_send_sync_main(void); bool multifd_queue_page(RAMBlock *block, ram_addr_t offset); +bool multifd_recv(void); +MultiFDRecvData *multifd_get_recv_data(void); /* Multifd Compression flags */ #define MULTIFD_FLAG_SYNC (1 << 0) @@ -63,6 +70,13 @@ typedef struct { RAMBlock *block; } MultiFDPages_t; +struct MultiFDRecvData { + void *opaque; + size_t size; + /* for preadv */ + off_t file_offset; +}; + typedef struct { /* Fields are only written at creating/deletion time */ /* No lock required for them, they are read only */ @@ -127,7 +141,7 @@ typedef struct { /* number of iovs used */ uint32_t iovs_num; /* used for compression methods */ - void *data; + void *compress_data; } MultiFDSendParams; typedef struct { @@ -152,6 +166,8 @@ typedef struct { /* syncs main thread and channels */ QemuSemaphore sem_sync; + /* sem where to wait for more work */ + QemuSemaphore sem; /* this mutex protects the following parameters */ QemuMutex mutex; @@ -161,6 +177,8 @@ typedef struct { uint32_t flags; /* global number of generated multifd packets */ uint64_t packet_num; + int pending_job; + MultiFDRecvData *data; /* thread local variables. No locking required */ @@ -183,7 +201,7 @@ typedef struct { /* num of non zero pages */ uint32_t normal_num; /* used for de-compression methods */ - void *data; + void *compress_data; } MultiFDRecvParams; typedef struct { @@ -197,8 +215,8 @@ typedef struct { int (*recv_setup)(MultiFDRecvParams *p, Error **errp); /* Cleanup for receiving side */ void (*recv_cleanup)(MultiFDRecvParams *p); - /* Read all pages */ - int (*recv_pages)(MultiFDRecvParams *p, Error **errp); + /* Read all data */ + int (*recv)(MultiFDRecvParams *p, Error **errp); } MultiFDMethods; void multifd_register_ops(int method, MultiFDMethods *ops); @@ -211,5 +229,6 @@ static inline void multifd_send_prepare_header(MultiFDSendParams *p) p->iovs_num++; } +void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc); #endif diff --git a/migration/options.c b/migration/options.c index 3e3e0b93b4..40eb930940 100644 --- a/migration/options.c +++ b/migration/options.c @@ -204,6 +204,7 @@ Property migration_properties[] = { DEFINE_PROP_MIG_CAP("x-switchover-ack", MIGRATION_CAPABILITY_SWITCHOVER_ACK), DEFINE_PROP_MIG_CAP("x-dirty-limit", MIGRATION_CAPABILITY_DIRTY_LIMIT), + DEFINE_PROP_MIG_CAP("mapped-ram", MIGRATION_CAPABILITY_MAPPED_RAM), DEFINE_PROP_END_OF_LIST(), }; @@ -263,6 +264,13 @@ bool migrate_events(void) return s->capabilities[MIGRATION_CAPABILITY_EVENTS]; } +bool migrate_mapped_ram(void) +{ + MigrationState *s = migrate_get_current(); + + return s->capabilities[MIGRATION_CAPABILITY_MAPPED_RAM]; +} + bool migrate_ignore_shared(void) { MigrationState *s = migrate_get_current(); @@ -645,6 +653,26 @@ bool migrate_caps_check(bool *old_caps, bool *new_caps, Error **errp) } } + if (new_caps[MIGRATION_CAPABILITY_MAPPED_RAM]) { + if (new_caps[MIGRATION_CAPABILITY_XBZRLE]) { + error_setg(errp, + "Mapped-ram migration is incompatible with xbzrle"); + return false; + } + + if (new_caps[MIGRATION_CAPABILITY_COMPRESS]) { + error_setg(errp, + "Mapped-ram migration is incompatible with compression"); + return false; + } + + if (new_caps[MIGRATION_CAPABILITY_POSTCOPY_RAM]) { + error_setg(errp, + "Mapped-ram migration is incompatible with postcopy"); + return false; + } + } + return true; } @@ -1218,6 +1246,13 @@ bool migrate_params_check(MigrationParameters *params, Error **errp) } #endif + if (migrate_mapped_ram() && + (migrate_multifd_compression() || migrate_tls())) { + error_setg(errp, + "Mapped-ram only available for non-compressed non-TLS multifd migration"); + return false; + } + if (params->has_x_vcpu_dirty_limit_period && (params->x_vcpu_dirty_limit_period < 1 || params->x_vcpu_dirty_limit_period > 1000)) { @@ -1312,6 +1347,12 @@ static void migrate_params_test_apply(MigrateSetParameters *params, if (params->has_multifd_compression) { dest->multifd_compression = params->multifd_compression; } + if (params->has_multifd_zlib_level) { + dest->multifd_zlib_level = params->multifd_zlib_level; + } + if (params->has_multifd_zstd_level) { + dest->multifd_zstd_level = params->multifd_zstd_level; + } if (params->has_xbzrle_cache_size) { dest->xbzrle_cache_size = params->xbzrle_cache_size; } @@ -1447,6 +1488,12 @@ static void migrate_params_apply(MigrateSetParameters *params, Error **errp) if (params->has_multifd_compression) { s->parameters.multifd_compression = params->multifd_compression; } + if (params->has_multifd_zlib_level) { + s->parameters.multifd_zlib_level = params->multifd_zlib_level; + } + if (params->has_multifd_zstd_level) { + s->parameters.multifd_zstd_level = params->multifd_zstd_level; + } if (params->has_xbzrle_cache_size) { s->parameters.xbzrle_cache_size = params->xbzrle_cache_size; xbzrle_cache_resize(params->xbzrle_cache_size, errp); diff --git a/migration/options.h b/migration/options.h index 246c160aee..6ddd8dad9b 100644 --- a/migration/options.h +++ b/migration/options.h @@ -31,6 +31,7 @@ bool migrate_compress(void); bool migrate_dirty_bitmaps(void); bool migrate_dirty_limit(void); bool migrate_events(void); +bool migrate_mapped_ram(void); bool migrate_ignore_shared(void); bool migrate_late_block_activate(void); bool migrate_multifd(void); diff --git a/migration/qemu-file.c b/migration/qemu-file.c index 94231ff295..b10c882629 100644 --- a/migration/qemu-file.c +++ b/migration/qemu-file.c @@ -33,6 +33,7 @@ #include "options.h" #include "qapi/error.h" #include "rdma.h" +#include "io/channel-file.h" #define IO_BUF_SIZE 32768 #define MAX_IOV_SIZE MIN_CONST(IOV_MAX, 64) @@ -255,6 +256,10 @@ static void qemu_iovec_release_ram(QEMUFile *f) memset(f->may_free, 0, sizeof(f->may_free)); } +bool qemu_file_is_seekable(QEMUFile *f) +{ + return qio_channel_has_feature(f->ioc, QIO_CHANNEL_FEATURE_SEEKABLE); +} /** * Flushes QEMUFile buffer @@ -447,6 +452,107 @@ void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, size_t size) } } +void qemu_put_buffer_at(QEMUFile *f, const uint8_t *buf, size_t buflen, + off_t pos) +{ + Error *err = NULL; + size_t ret; + + if (f->last_error) { + return; + } + + qemu_fflush(f); + ret = qio_channel_pwrite(f->ioc, (char *)buf, buflen, pos, &err); + + if (err) { + qemu_file_set_error_obj(f, -EIO, err); + return; + } + + if ((ssize_t)ret == QIO_CHANNEL_ERR_BLOCK) { + qemu_file_set_error_obj(f, -EAGAIN, NULL); + return; + } + + if (ret != buflen) { + error_setg(&err, "Partial write of size %zu, expected %zu", ret, + buflen); + qemu_file_set_error_obj(f, -EIO, err); + return; + } + + stat64_add(&mig_stats.qemu_file_transferred, buflen); + + return; +} + + +size_t qemu_get_buffer_at(QEMUFile *f, const uint8_t *buf, size_t buflen, + off_t pos) +{ + Error *err = NULL; + size_t ret; + + if (f->last_error) { + return 0; + } + + ret = qio_channel_pread(f->ioc, (char *)buf, buflen, pos, &err); + + if ((ssize_t)ret == -1 || err) { + qemu_file_set_error_obj(f, -EIO, err); + return 0; + } + + if ((ssize_t)ret == QIO_CHANNEL_ERR_BLOCK) { + qemu_file_set_error_obj(f, -EAGAIN, NULL); + return 0; + } + + if (ret != buflen) { + error_setg(&err, "Partial read of size %zu, expected %zu", ret, buflen); + qemu_file_set_error_obj(f, -EIO, err); + return 0; + } + + return ret; +} + +void qemu_set_offset(QEMUFile *f, off_t off, int whence) +{ + Error *err = NULL; + off_t ret; + + if (qemu_file_is_writable(f)) { + qemu_fflush(f); + } else { + /* Drop all cached buffers if existed; will trigger a re-fill later */ + f->buf_index = 0; + f->buf_size = 0; + } + + ret = qio_channel_io_seek(f->ioc, off, whence, &err); + if (ret == (off_t)-1) { + qemu_file_set_error_obj(f, -EIO, err); + } +} + +off_t qemu_get_offset(QEMUFile *f) +{ + Error *err = NULL; + off_t ret; + + qemu_fflush(f); + + ret = qio_channel_io_seek(f->ioc, 0, SEEK_CUR, &err); + if (ret == (off_t)-1) { + qemu_file_set_error_obj(f, -EIO, err); + } + return ret; +} + + void qemu_put_byte(QEMUFile *f, int v) { if (f->last_error) { diff --git a/migration/qemu-file.h b/migration/qemu-file.h index 8aec9fabf7..32fd4a34fd 100644 --- a/migration/qemu-file.h +++ b/migration/qemu-file.h @@ -75,6 +75,12 @@ QEMUFile *qemu_file_get_return_path(QEMUFile *f); int qemu_fflush(QEMUFile *f); void qemu_file_set_blocking(QEMUFile *f, bool block); int qemu_file_get_to_fd(QEMUFile *f, int fd, size_t size); +void qemu_set_offset(QEMUFile *f, off_t off, int whence); +off_t qemu_get_offset(QEMUFile *f); +void qemu_put_buffer_at(QEMUFile *f, const uint8_t *buf, size_t buflen, + off_t pos); +size_t qemu_get_buffer_at(QEMUFile *f, const uint8_t *buf, size_t buflen, + off_t pos); QIOChannel *qemu_file_get_ioc(QEMUFile *file); diff --git a/migration/ram.c b/migration/ram.c index 45a00b45ed..003c28e133 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -94,6 +94,24 @@ #define RAM_SAVE_FLAG_MULTIFD_FLUSH 0x200 /* We can't use any flag that is bigger than 0x200 */ +/* + * mapped-ram migration supports O_DIRECT, so we need to make sure the + * userspace buffer, the IO operation size and the file offset are + * aligned according to the underlying device's block size. The first + * two are already aligned to page size, but we need to add padding to + * the file to align the offset. We cannot read the block size + * dynamically because the migration file can be moved between + * different systems, so use 1M to cover most block sizes and to keep + * the file offset aligned at page size as well. + */ +#define MAPPED_RAM_FILE_OFFSET_ALIGNMENT 0x100000 + +/* + * When doing mapped-ram migration, this is the amount we read from + * the pages region in the migration file at a time. + */ +#define MAPPED_RAM_LOAD_BUF_SIZE 0x100000 + XBZRLECacheStats xbzrle_counters; /* used by the search for pages to send */ @@ -1126,12 +1144,18 @@ static int save_zero_page(RAMState *rs, PageSearchStatus *pss, return 0; } + stat64_add(&mig_stats.zero_pages, 1); + + if (migrate_mapped_ram()) { + /* zero pages are not transferred with mapped-ram */ + clear_bit_atomic(offset >> TARGET_PAGE_BITS, pss->block->file_bmap); + return 1; + } + len += save_page_header(pss, file, pss->block, offset | RAM_SAVE_FLAG_ZERO); qemu_put_byte(file, 0); len += 1; ram_release_page(pss->block->idstr, offset); - - stat64_add(&mig_stats.zero_pages, 1); ram_transferred_add(len); /* @@ -1189,14 +1213,20 @@ static int save_normal_page(PageSearchStatus *pss, RAMBlock *block, { QEMUFile *file = pss->pss_channel; - ram_transferred_add(save_page_header(pss, pss->pss_channel, block, - offset | RAM_SAVE_FLAG_PAGE)); - if (async) { - qemu_put_buffer_async(file, buf, TARGET_PAGE_SIZE, - migrate_release_ram() && - migration_in_postcopy()); + if (migrate_mapped_ram()) { + qemu_put_buffer_at(file, buf, TARGET_PAGE_SIZE, + block->pages_offset + offset); + set_bit(offset >> TARGET_PAGE_BITS, block->file_bmap); } else { - qemu_put_buffer(file, buf, TARGET_PAGE_SIZE); + ram_transferred_add(save_page_header(pss, pss->pss_channel, block, + offset | RAM_SAVE_FLAG_PAGE)); + if (async) { + qemu_put_buffer_async(file, buf, TARGET_PAGE_SIZE, + migrate_release_ram() && + migration_in_postcopy()); + } else { + qemu_put_buffer(file, buf, TARGET_PAGE_SIZE); + } } ram_transferred_add(TARGET_PAGE_SIZE); stat64_add(&mig_stats.normal_pages, 1); @@ -1332,14 +1362,18 @@ static int find_dirty_block(RAMState *rs, PageSearchStatus *pss) pss->block = QLIST_NEXT_RCU(pss->block, next); if (!pss->block) { if (migrate_multifd() && - !migrate_multifd_flush_after_each_section()) { + (!migrate_multifd_flush_after_each_section() || + migrate_mapped_ram())) { QEMUFile *f = rs->pss[RAM_CHANNEL_PRECOPY].pss_channel; int ret = multifd_send_sync_main(); if (ret < 0) { return ret; } - qemu_put_be64(f, RAM_SAVE_FLAG_MULTIFD_FLUSH); - qemu_fflush(f); + + if (!migrate_mapped_ram()) { + qemu_put_be64(f, RAM_SAVE_FLAG_MULTIFD_FLUSH); + qemu_fflush(f); + } } /* * If memory migration starts over, we will meet a dirtied page @@ -2778,6 +2812,9 @@ static void ram_list_init_bitmaps(void) */ block->bmap = bitmap_new(pages); bitmap_set(block->bmap, 0, pages); + if (migrate_mapped_ram()) { + block->file_bmap = bitmap_new(pages); + } block->clear_bmap_shift = shift; block->clear_bmap = bitmap_new(clear_bmap_size(pages, shift)); } @@ -2915,6 +2952,89 @@ void qemu_guest_free_page_hint(void *addr, size_t len) } } +#define MAPPED_RAM_HDR_VERSION 1 +struct MappedRamHeader { + uint32_t version; + /* + * The target's page size, so we know how many pages are in the + * bitmap. + */ + uint64_t page_size; + /* + * The offset in the migration file where the pages bitmap is + * stored. + */ + uint64_t bitmap_offset; + /* + * The offset in the migration file where the actual pages (data) + * are stored. + */ + uint64_t pages_offset; +} QEMU_PACKED; +typedef struct MappedRamHeader MappedRamHeader; + +static void mapped_ram_setup_ramblock(QEMUFile *file, RAMBlock *block) +{ + g_autofree MappedRamHeader *header = NULL; + size_t header_size, bitmap_size; + long num_pages; + + header = g_new0(MappedRamHeader, 1); + header_size = sizeof(MappedRamHeader); + + num_pages = block->used_length >> TARGET_PAGE_BITS; + bitmap_size = BITS_TO_LONGS(num_pages) * sizeof(unsigned long); + + /* + * Save the file offsets of where the bitmap and the pages should + * go as they are written at the end of migration and during the + * iterative phase, respectively. + */ + block->bitmap_offset = qemu_get_offset(file) + header_size; + block->pages_offset = ROUND_UP(block->bitmap_offset + + bitmap_size, + MAPPED_RAM_FILE_OFFSET_ALIGNMENT); + + header->version = cpu_to_be32(MAPPED_RAM_HDR_VERSION); + header->page_size = cpu_to_be64(TARGET_PAGE_SIZE); + header->bitmap_offset = cpu_to_be64(block->bitmap_offset); + header->pages_offset = cpu_to_be64(block->pages_offset); + + qemu_put_buffer(file, (uint8_t *) header, header_size); + + /* prepare offset for next ramblock */ + qemu_set_offset(file, block->pages_offset + block->used_length, SEEK_SET); +} + +static bool mapped_ram_read_header(QEMUFile *file, MappedRamHeader *header, + Error **errp) +{ + size_t ret, header_size = sizeof(MappedRamHeader); + + ret = qemu_get_buffer(file, (uint8_t *)header, header_size); + if (ret != header_size) { + error_setg(errp, "Could not read whole mapped-ram migration header " + "(expected %zd, got %zd bytes)", header_size, ret); + return false; + } + + /* migration stream is big-endian */ + header->version = be32_to_cpu(header->version); + + if (header->version > MAPPED_RAM_HDR_VERSION) { + error_setg(errp, "Migration mapped-ram capability version not " + "supported (expected <= %d, got %d)", MAPPED_RAM_HDR_VERSION, + header->version); + return false; + } + + header->page_size = be64_to_cpu(header->page_size); + header->bitmap_offset = be64_to_cpu(header->bitmap_offset); + header->pages_offset = be64_to_cpu(header->pages_offset); + + return true; +} + /* * Each of ram_save_setup, ram_save_iterate and ram_save_complete has * long-running RCU critical section. When rcu-reclaims in the code @@ -2934,7 +3054,7 @@ static int ram_save_setup(QEMUFile *f, void *opaque) { RAMState **rsp = opaque; RAMBlock *block; - int ret; + int ret, max_hg_page_size; if (compress_threads_save_setup()) { return -1; @@ -2949,6 +3069,12 @@ static int ram_save_setup(QEMUFile *f, void *opaque) } (*rsp)->pss[RAM_CHANNEL_PRECOPY].pss_channel = f; + /* + * ??? Mirrors the previous value of qemu_host_page_size, + * but is this really what was intended for the migration? + */ + max_hg_page_size = MAX(qemu_real_host_page_size(), TARGET_PAGE_SIZE); + WITH_RCU_READ_LOCK_GUARD() { qemu_put_be64(f, ram_bytes_total_with_ignored() | RAM_SAVE_FLAG_MEM_SIZE); @@ -2957,13 +3083,17 @@ static int ram_save_setup(QEMUFile *f, void *opaque) qemu_put_byte(f, strlen(block->idstr)); qemu_put_buffer(f, (uint8_t *)block->idstr, strlen(block->idstr)); qemu_put_be64(f, block->used_length); - if (migrate_postcopy_ram() && block->page_size != - qemu_host_page_size) { + if (migrate_postcopy_ram() && + block->page_size != max_hg_page_size) { qemu_put_be64(f, block->page_size); } if (migrate_ignore_shared()) { qemu_put_be64(f, block->mr->addr); } + + if (migrate_mapped_ram()) { + mapped_ram_setup_ramblock(f, block); + } } } @@ -2989,7 +3119,8 @@ static int ram_save_setup(QEMUFile *f, void *opaque) return ret; } - if (migrate_multifd() && !migrate_multifd_flush_after_each_section()) { + if (migrate_multifd() && !migrate_multifd_flush_after_each_section() + && !migrate_mapped_ram()) { qemu_put_be64(f, RAM_SAVE_FLAG_MULTIFD_FLUSH); } @@ -2997,6 +3128,33 @@ static int ram_save_setup(QEMUFile *f, void *opaque) return qemu_fflush(f); } +static void ram_save_file_bmap(QEMUFile *f) +{ + RAMBlock *block; + + RAMBLOCK_FOREACH_MIGRATABLE(block) { + long num_pages = block->used_length >> TARGET_PAGE_BITS; + long bitmap_size = BITS_TO_LONGS(num_pages) * sizeof(unsigned long); + + qemu_put_buffer_at(f, (uint8_t *)block->file_bmap, bitmap_size, + block->bitmap_offset); + ram_transferred_add(bitmap_size); + + /* + * Free the bitmap here to catch any synchronization issues + * with multifd channels. No channels should be sending pages + * after we've written the bitmap to file. + */ + g_free(block->file_bmap); + block->file_bmap = NULL; + } +} + +void ramblock_set_file_bmap_atomic(RAMBlock *block, ram_addr_t offset) +{ + set_bit_atomic(offset >> TARGET_PAGE_BITS, block->file_bmap); +} + /** * ram_save_iterate: iterative stage for migration * @@ -3106,7 +3264,8 @@ static int ram_save_iterate(QEMUFile *f, void *opaque) out: if (ret >= 0 && migration_is_setup_or_active(migrate_get_current()->state)) { - if (migrate_multifd() && migrate_multifd_flush_after_each_section()) { + if (migrate_multifd() && migrate_multifd_flush_after_each_section() && + !migrate_mapped_ram()) { ret = multifd_send_sync_main(); if (ret < 0) { return ret; @@ -3186,7 +3345,20 @@ static int ram_save_complete(QEMUFile *f, void *opaque) return ret; } - if (migrate_multifd() && !migrate_multifd_flush_after_each_section()) { + if (migrate_mapped_ram()) { + ram_save_file_bmap(f); + + if (qemu_file_get_error(f)) { + Error *local_err = NULL; + int err = qemu_file_get_error_obj(f, &local_err); + + error_reportf_err(local_err, "Failed to write bitmap to file: "); + return -err; + } + } + + if (migrate_multifd() && !migrate_multifd_flush_after_each_section() && + !migrate_mapped_ram()) { qemu_put_be64(f, RAM_SAVE_FLAG_MULTIFD_FLUSH); } qemu_put_be64(f, RAM_SAVE_FLAG_EOS); @@ -3786,31 +3958,165 @@ void colo_flush_ram_cache(void) trace_colo_flush_ram_cache_end(); } +static size_t ram_load_multifd_pages(void *host_addr, size_t size, + uint64_t offset) +{ + MultiFDRecvData *data = multifd_get_recv_data(); + + data->opaque = host_addr; + data->file_offset = offset; + data->size = size; + + if (!multifd_recv()) { + return 0; + } + + return size; +} + +static bool read_ramblock_mapped_ram(QEMUFile *f, RAMBlock *block, + long num_pages, unsigned long *bitmap, + Error **errp) +{ + ERRP_GUARD(); + unsigned long set_bit_idx, clear_bit_idx; + ram_addr_t offset; + void *host; + size_t read, unread, size; + + for (set_bit_idx = find_first_bit(bitmap, num_pages); + set_bit_idx < num_pages; + set_bit_idx = find_next_bit(bitmap, num_pages, clear_bit_idx + 1)) { + + clear_bit_idx = find_next_zero_bit(bitmap, num_pages, set_bit_idx + 1); + + unread = TARGET_PAGE_SIZE * (clear_bit_idx - set_bit_idx); + offset = set_bit_idx << TARGET_PAGE_BITS; + + while (unread > 0) { + host = host_from_ram_block_offset(block, offset); + if (!host) { + error_setg(errp, "page outside of ramblock %s range", + block->idstr); + return false; + } + + size = MIN(unread, MAPPED_RAM_LOAD_BUF_SIZE); + + if (migrate_multifd()) { + read = ram_load_multifd_pages(host, size, + block->pages_offset + offset); + } else { + read = qemu_get_buffer_at(f, host, size, + block->pages_offset + offset); + } + + if (!read) { + goto err; + } + offset += read; + unread -= read; + } + } + + return true; + +err: + qemu_file_get_error_obj(f, errp); + error_prepend(errp, "(%s) failed to read page " RAM_ADDR_FMT + "from file offset %" PRIx64 ": ", block->idstr, offset, + block->pages_offset + offset); + return false; +} + +static void parse_ramblock_mapped_ram(QEMUFile *f, RAMBlock *block, + ram_addr_t length, Error **errp) +{ + g_autofree unsigned long *bitmap = NULL; + MappedRamHeader header; + size_t bitmap_size; + long num_pages; + + if (!mapped_ram_read_header(f, &header, errp)) { + return; + } + + block->pages_offset = header.pages_offset; + + /* + * Check the alignment of the file region that contains pages. We + * don't enforce MAPPED_RAM_FILE_OFFSET_ALIGNMENT to allow that + * value to change in the future. Do only a sanity check with page + * size alignment. + */ + if (!QEMU_IS_ALIGNED(block->pages_offset, TARGET_PAGE_SIZE)) { + error_setg(errp, + "Error reading ramblock %s pages, region has bad alignment", + block->idstr); + return; + } + + num_pages = length / header.page_size; + bitmap_size = BITS_TO_LONGS(num_pages) * sizeof(unsigned long); + + bitmap = g_malloc0(bitmap_size); + if (qemu_get_buffer_at(f, (uint8_t *)bitmap, bitmap_size, + header.bitmap_offset) != bitmap_size) { + error_setg(errp, "Error reading dirty bitmap"); + return; + } + + if (!read_ramblock_mapped_ram(f, block, num_pages, bitmap, errp)) { + return; + } + + /* Skip pages array */ + qemu_set_offset(f, block->pages_offset + length, SEEK_SET); + + return; +} + static int parse_ramblock(QEMUFile *f, RAMBlock *block, ram_addr_t length) { int ret = 0; /* ADVISE is earlier, it shows the source has the postcopy capability on */ bool postcopy_advised = migration_incoming_postcopy_advised(); + int max_hg_page_size; + Error *local_err = NULL; assert(block); + if (migrate_mapped_ram()) { + parse_ramblock_mapped_ram(f, block, length, &local_err); + if (local_err) { + error_report_err(local_err); + return -EINVAL; + } + return 0; + } + if (!qemu_ram_is_migratable(block)) { error_report("block %s should not be migrated !", block->idstr); return -EINVAL; } if (length != block->used_length) { - Error *local_err = NULL; - ret = qemu_ram_resize(block, length, &local_err); if (local_err) { error_report_err(local_err); return ret; } } + + /* + * ??? Mirrors the previous value of qemu_host_page_size, + * but is this really what was intended for the migration? + */ + max_hg_page_size = MAX(qemu_real_host_page_size(), TARGET_PAGE_SIZE); + /* For postcopy we need to check hugepage sizes match */ if (postcopy_advised && migrate_postcopy_ram() && - block->page_size != qemu_host_page_size) { + block->page_size != max_hg_page_size) { uint64_t remote_page_size = qemu_get_be64(f); if (remote_page_size != block->page_size) { error_report("Mismatched RAM page size %s " @@ -3885,6 +4191,12 @@ static int ram_load_precopy(QEMUFile *f) invalid_flags |= RAM_SAVE_FLAG_COMPRESS_PAGE; } + if (migrate_mapped_ram()) { + invalid_flags |= (RAM_SAVE_FLAG_HOOK | RAM_SAVE_FLAG_MULTIFD_FLUSH | + RAM_SAVE_FLAG_PAGE | RAM_SAVE_FLAG_XBZRLE | + RAM_SAVE_FLAG_ZERO); + } + while (!ret && !(flags & RAM_SAVE_FLAG_EOS)) { ram_addr_t addr; void *host = NULL, *host_bak = NULL; @@ -3906,6 +4218,8 @@ static int ram_load_precopy(QEMUFile *f) addr &= TARGET_PAGE_MASK; if (flags & invalid_flags) { + error_report("Unexpected RAM flags: %d", flags & invalid_flags); + if (flags & invalid_flags & RAM_SAVE_FLAG_COMPRESS_PAGE) { error_report("Received an unexpected compressed page"); } @@ -3958,6 +4272,16 @@ static int ram_load_precopy(QEMUFile *f) switch (flags & ~RAM_SAVE_FLAG_CONTINUE) { case RAM_SAVE_FLAG_MEM_SIZE: ret = parse_ramblocks(f, addr); + /* + * For mapped-ram migration (to a file) using multifd, we sync + * once and for all here to make sure all tasks we queued to + * multifd threads are completed, so that all the ramblocks + * (including all the guest memory pages within) are fully + * loaded after this sync returns. + */ + if (migrate_mapped_ram()) { + multifd_recv_sync_main(); + } break; case RAM_SAVE_FLAG_ZERO: @@ -3998,7 +4322,12 @@ static int ram_load_precopy(QEMUFile *f) case RAM_SAVE_FLAG_EOS: /* normal exit */ if (migrate_multifd() && - migrate_multifd_flush_after_each_section()) { + migrate_multifd_flush_after_each_section() && + /* + * Mapped-ram migration flushes once and for all after + * parsing ramblocks. Always ignore EOS for it. + */ + !migrate_mapped_ram()) { multifd_recv_sync_main(); } break; diff --git a/migration/ram.h b/migration/ram.h index 9b937a446b..b9ac0da587 100644 --- a/migration/ram.h +++ b/migration/ram.h @@ -75,6 +75,7 @@ bool ram_dirty_bitmap_reload(MigrationState *s, RAMBlock *rb, Error **errp); bool ramblock_page_is_discarded(RAMBlock *rb, ram_addr_t start); void postcopy_preempt_shutdown_file(MigrationState *s); void *postcopy_preempt_thread(void *opaque); +void ramblock_set_file_bmap_atomic(RAMBlock *block, ram_addr_t offset); /* ram cache */ int colo_init_ram_cache(void); diff --git a/migration/savevm.c b/migration/savevm.c index d612c8a902..dc1fb9c0d3 100644 --- a/migration/savevm.c +++ b/migration/savevm.c @@ -245,6 +245,7 @@ static bool should_validate_capability(int capability) /* Validate only new capabilities to keep compatibility. */ switch (capability) { case MIGRATION_CAPABILITY_X_IGNORE_SHARED: + case MIGRATION_CAPABILITY_MAPPED_RAM: return true; default: return false; diff --git a/migration/trace-events b/migration/trace-events index 298ad2b0dd..bf1a069632 100644 --- a/migration/trace-events +++ b/migration/trace-events @@ -132,7 +132,7 @@ multifd_recv(uint8_t id, uint64_t packet_num, uint32_t used, uint32_t flags, uin multifd_recv_new_channel(uint8_t id) "channel %u" multifd_recv_sync_main(long packet_num) "packet num %ld" multifd_recv_sync_main_signal(uint8_t id) "channel %u" -multifd_recv_sync_main_wait(uint8_t id) "channel %u" +multifd_recv_sync_main_wait(uint8_t id) "iter %u" multifd_recv_terminate_threads(bool error) "error %d" multifd_recv_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %u packets %" PRIu64 " pages %" PRIu64 multifd_recv_thread_start(uint8_t id) "%u" |