From 0ffcece32519e85a2971cafdc421f4fd3107d766 Mon Sep 17 00:00:00 2001 From: Zhang Chen Date: Mon, 3 Sep 2018 12:38:43 +0800 Subject: colo-compare: implement the process of checkpoint While doing a checkpoint, we need to flush all the unhandled packets. By using the filter notifier mechanism, we can easily notify every compare object to do this process, which runs inside of compare threads as a coroutine. Signed-off-by: zhanghailiang Signed-off-by: Zhang Chen Signed-off-by: Zhang Chen Signed-off-by: Jason Wang --- net/colo-compare.c | 78 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) (limited to 'net/colo-compare.c') diff --git a/net/colo-compare.c b/net/colo-compare.c index dd745a491b..80e6532e8b 100644 --- a/net/colo-compare.c +++ b/net/colo-compare.c @@ -27,11 +27,16 @@ #include "qemu/sockets.h" #include "colo.h" #include "sysemu/iothread.h" +#include "net/colo-compare.h" +#include "migration/colo.h" #define TYPE_COLO_COMPARE "colo-compare" #define COLO_COMPARE(obj) \ OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE) +static QTAILQ_HEAD(, CompareState) net_compares = + QTAILQ_HEAD_INITIALIZER(net_compares); + #define COMPARE_READ_LEN_MAX NET_BUFSIZE #define MAX_QUEUE_SIZE 1024 @@ -41,6 +46,10 @@ /* TODO: Should be configurable */ #define REGULAR_PACKET_CHECK_MS 3000 +static QemuMutex event_mtx; +static QemuCond event_complete_cond; +static int event_unhandled_count; + /* * + CompareState ++ * | | @@ -87,6 +96,11 @@ typedef struct CompareState { IOThread *iothread; GMainContext *worker_context; QEMUTimer *packet_check_timer; + + QEMUBH *event_bh; + enum colo_event event; + + QTAILQ_ENTRY(CompareState) next; } CompareState; typedef struct CompareClass { @@ -736,6 +750,25 @@ static void check_old_packet_regular(void *opaque) REGULAR_PACKET_CHECK_MS); } +/* Public API, Used for COLO frame to notify compare event */ +void colo_notify_compares_event(void *opaque, int event, Error **errp) +{ + CompareState *s; + + 
qemu_mutex_lock(&event_mtx); + QTAILQ_FOREACH(s, &net_compares, next) { + s->event = event; + qemu_bh_schedule(s->event_bh); + event_unhandled_count++; + } + /* Wait all compare threads to finish handling this event */ + while (event_unhandled_count > 0) { + qemu_cond_wait(&event_complete_cond, &event_mtx); + } + + qemu_mutex_unlock(&event_mtx); +} + static void colo_compare_timer_init(CompareState *s) { AioContext *ctx = iothread_get_aio_context(s->iothread); @@ -756,6 +789,30 @@ static void colo_compare_timer_del(CompareState *s) } } +static void colo_flush_packets(void *opaque, void *user_data); + +static void colo_compare_handle_event(void *opaque) +{ + CompareState *s = opaque; + + switch (s->event) { + case COLO_EVENT_CHECKPOINT: + g_queue_foreach(&s->conn_list, colo_flush_packets, s); + break; + case COLO_EVENT_FAILOVER: + break; + default: + break; + } + + assert(event_unhandled_count > 0); + + qemu_mutex_lock(&event_mtx); + event_unhandled_count--; + qemu_cond_broadcast(&event_complete_cond); + qemu_mutex_unlock(&event_mtx); +} + static void colo_compare_iothread(CompareState *s) { object_ref(OBJECT(s->iothread)); @@ -769,6 +826,7 @@ static void colo_compare_iothread(CompareState *s) s, s->worker_context, true); colo_compare_timer_init(s); + s->event_bh = qemu_bh_new(colo_compare_handle_event, s); } static char *compare_get_pri_indev(Object *obj, Error **errp) @@ -926,8 +984,13 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp) net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr); net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr); + QTAILQ_INSERT_TAIL(&net_compares, s, next); + g_queue_init(&s->conn_list); + qemu_mutex_init(&event_mtx); + qemu_cond_init(&event_complete_cond); + s->connection_track_table = g_hash_table_new_full(connection_key_hash, connection_key_equal, g_free, @@ -990,6 +1053,7 @@ static void colo_compare_init(Object *obj) static void colo_compare_finalize(Object *obj) { CompareState *s 
= COLO_COMPARE(obj); + CompareState *tmp = NULL; qemu_chr_fe_deinit(&s->chr_pri_in, false); qemu_chr_fe_deinit(&s->chr_sec_in, false); @@ -997,6 +1061,16 @@ static void colo_compare_finalize(Object *obj) if (s->iothread) { colo_compare_timer_del(s); } + + qemu_bh_delete(s->event_bh); + + QTAILQ_FOREACH(tmp, &net_compares, next) { + if (tmp == s) { + QTAILQ_REMOVE(&net_compares, s, next); + break; + } + } + /* Release all unhandled packets after compare thead exited */ g_queue_foreach(&s->conn_list, colo_flush_packets, s); @@ -1009,6 +1083,10 @@ static void colo_compare_finalize(Object *obj) if (s->iothread) { object_unref(OBJECT(s->iothread)); } + + qemu_mutex_destroy(&event_mtx); + qemu_cond_destroy(&event_complete_cond); + g_free(s->pri_indev); g_free(s->sec_indev); g_free(s->outdev); -- cgit 1.4.1 From dccd0313b69161fe2235c97633f40ecc041542ac Mon Sep 17 00:00:00 2001 From: Zhang Chen Date: Mon, 3 Sep 2018 12:38:44 +0800 Subject: colo-compare: use notifier to notify packets comparing result It's a good idea to use a notifier to notify the COLO frame of inconsistent packet comparisons. 
Signed-off-by: Zhang Chen Signed-off-by: Zhang Chen Signed-off-by: zhanghailiang Signed-off-by: Jason Wang --- net/colo-compare.c | 37 ++++++++++++++++++++++++++----------- net/colo-compare.h | 2 ++ 2 files changed, 28 insertions(+), 11 deletions(-) (limited to 'net/colo-compare.c') diff --git a/net/colo-compare.c b/net/colo-compare.c index 80e6532e8b..3f7e240590 100644 --- a/net/colo-compare.c +++ b/net/colo-compare.c @@ -29,6 +29,7 @@ #include "sysemu/iothread.h" #include "net/colo-compare.h" #include "migration/colo.h" +#include "migration/migration.h" #define TYPE_COLO_COMPARE "colo-compare" #define COLO_COMPARE(obj) \ @@ -37,6 +38,9 @@ static QTAILQ_HEAD(, CompareState) net_compares = QTAILQ_HEAD_INITIALIZER(net_compares); +static NotifierList colo_compare_notifiers = + NOTIFIER_LIST_INITIALIZER(colo_compare_notifiers); + #define COMPARE_READ_LEN_MAX NET_BUFSIZE #define MAX_QUEUE_SIZE 1024 @@ -326,6 +330,12 @@ static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt, return false; } +static void colo_compare_inconsistency_notify(void) +{ + notifier_list_notify(&colo_compare_notifiers, + migrate_get_current()); +} + static void colo_compare_tcp(CompareState *s, Connection *conn) { Packet *ppkt = NULL, *spkt = NULL; @@ -427,10 +437,7 @@ sec: qemu_hexdump((char *)spkt->data, stderr, "colo-compare spkt", spkt->size); - /* - * colo_compare_inconsistent_notify(); - * TODO: notice to checkpoint(); - */ + colo_compare_inconsistency_notify(); } } @@ -561,8 +568,18 @@ static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time) } } +void colo_compare_register_notifier(Notifier *notify) +{ + notifier_list_add(&colo_compare_notifiers, notify); +} + +void colo_compare_unregister_notifier(Notifier *notify) +{ + notifier_remove(notify); +} + static int colo_old_packet_check_one_conn(Connection *conn, - void *user_data) + void *user_data) { GList *result = NULL; int64_t check_time = REGULAR_PACKET_CHECK_MS; @@ -573,10 +590,7 @@ static int 
colo_old_packet_check_one_conn(Connection *conn, if (result) { /* Do checkpoint will flush old packet */ - /* - * TODO: Notify colo frame to do checkpoint. - * colo_compare_inconsistent_notify(); - */ + colo_compare_inconsistency_notify(); return 0; } @@ -620,11 +634,12 @@ static void colo_compare_packet(CompareState *s, Connection *conn, /* * If one packet arrive late, the secondary_list or * primary_list will be empty, so we can't compare it - * until next comparison. + * until next comparison. If the packets in the list are + * timeout, it will trigger a checkpoint request. */ trace_colo_compare_main("packet different"); g_queue_push_head(&conn->primary_list, pkt); - /* TODO: colo_notify_checkpoint();*/ + colo_compare_inconsistency_notify(); break; } } diff --git a/net/colo-compare.h b/net/colo-compare.h index 1b1ce76aea..22ddd512e2 100644 --- a/net/colo-compare.h +++ b/net/colo-compare.h @@ -18,5 +18,7 @@ #define QEMU_COLO_COMPARE_H void colo_notify_compares_event(void *opaque, int event, Error **errp); +void colo_compare_register_notifier(Notifier *notify); +void colo_compare_unregister_notifier(Notifier *notify); #endif /* QEMU_COLO_COMPARE_H */ -- cgit 1.4.1 From 24525e93c17aabdd88df893f1ceecc37e8b289f3 Mon Sep 17 00:00:00 2001 From: Zhang Chen Date: Mon, 3 Sep 2018 12:38:57 +0800 Subject: filter-rewriter: handle checkpoint and failover event After one round of checkpoint, the states of the PVM and SVM become consistent, so it is unnecessary to adjust the sequence of net packets for old connections. Besides, when failover happens, filter-rewriter will enter failover mode, in which it needn't handle new TCP connections. 
Signed-off-by: zhanghailiang Signed-off-by: Zhang Chen Signed-off-by: Zhang Chen Signed-off-by: Jason Wang --- net/colo-compare.c | 12 +++++------ net/colo.c | 8 ++++++++ net/colo.h | 2 ++ net/filter-rewriter.c | 57 +++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 73 insertions(+), 6 deletions(-) (limited to 'net/colo-compare.c') diff --git a/net/colo-compare.c b/net/colo-compare.c index 3f7e240590..a39191d522 100644 --- a/net/colo-compare.c +++ b/net/colo-compare.c @@ -116,6 +116,12 @@ enum { SECONDARY_IN, }; +static void colo_compare_inconsistency_notify(void) +{ + notifier_list_notify(&colo_compare_notifiers, + migrate_get_current()); +} + static int compare_chr_send(CompareState *s, const uint8_t *buf, uint32_t size, @@ -330,12 +336,6 @@ static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt, return false; } -static void colo_compare_inconsistency_notify(void) -{ - notifier_list_notify(&colo_compare_notifiers, - migrate_get_current()); -} - static void colo_compare_tcp(CompareState *s, Connection *conn) { Packet *ppkt = NULL, *spkt = NULL; diff --git a/net/colo.c b/net/colo.c index 97c8fc928f..49176bf07b 100644 --- a/net/colo.c +++ b/net/colo.c @@ -221,3 +221,11 @@ Connection *connection_get(GHashTable *connection_track_table, return conn; } + +bool connection_has_tracked(GHashTable *connection_track_table, + ConnectionKey *key) +{ + Connection *conn = g_hash_table_lookup(connection_track_table, key); + + return conn ? 
true : false; +} diff --git a/net/colo.h b/net/colo.h index 0277e0e9ba..11c5226488 100644 --- a/net/colo.h +++ b/net/colo.h @@ -98,6 +98,8 @@ void connection_destroy(void *opaque); Connection *connection_get(GHashTable *connection_track_table, ConnectionKey *key, GQueue *conn_list); +bool connection_has_tracked(GHashTable *connection_track_table, + ConnectionKey *key); void connection_hashtable_reset(GHashTable *connection_track_table); Packet *packet_new(const void *data, int size, int vnet_hdr_len); void packet_destroy(void *opaque, void *user_data); diff --git a/net/filter-rewriter.c b/net/filter-rewriter.c index dd323faf61..bb8f4d93b1 100644 --- a/net/filter-rewriter.c +++ b/net/filter-rewriter.c @@ -20,11 +20,15 @@ #include "qemu/main-loop.h" #include "qemu/iov.h" #include "net/checksum.h" +#include "net/colo.h" +#include "migration/colo.h" #define FILTER_COLO_REWRITER(obj) \ OBJECT_CHECK(RewriterState, (obj), TYPE_FILTER_REWRITER) #define TYPE_FILTER_REWRITER "filter-rewriter" +#define FAILOVER_MODE_ON true +#define FAILOVER_MODE_OFF false typedef struct RewriterState { NetFilterState parent_obj; @@ -32,8 +36,14 @@ typedef struct RewriterState { /* hashtable to save connection */ GHashTable *connection_track_table; bool vnet_hdr; + bool failover_mode; } RewriterState; +static void filter_rewriter_failover_mode(RewriterState *s) +{ + s->failover_mode = FAILOVER_MODE_ON; +} + static void filter_rewriter_flush(NetFilterState *nf) { RewriterState *s = FILTER_COLO_REWRITER(nf); @@ -273,6 +283,13 @@ static ssize_t colo_rewriter_receive_iov(NetFilterState *nf, */ reverse_connection_key(&key); } + + /* After failover we needn't change new TCP packet */ + if (s->failover_mode && + !connection_has_tracked(s->connection_track_table, &key)) { + goto out; + } + conn = connection_get(s->connection_track_table, &key, NULL); @@ -306,11 +323,49 @@ static ssize_t colo_rewriter_receive_iov(NetFilterState *nf, } } +out: packet_destroy(pkt, NULL); pkt = NULL; return 0; } +static 
void reset_seq_offset(gpointer key, gpointer value, gpointer user_data) +{ + Connection *conn = (Connection *)value; + + conn->offset = 0; +} + +static gboolean offset_is_nonzero(gpointer key, + gpointer value, + gpointer user_data) +{ + Connection *conn = (Connection *)value; + + return conn->offset ? true : false; +} + +static void colo_rewriter_handle_event(NetFilterState *nf, int event, + Error **errp) +{ + RewriterState *rs = FILTER_COLO_REWRITER(nf); + + switch (event) { + case COLO_EVENT_CHECKPOINT: + g_hash_table_foreach(rs->connection_track_table, + reset_seq_offset, NULL); + break; + case COLO_EVENT_FAILOVER: + if (!g_hash_table_find(rs->connection_track_table, + offset_is_nonzero, NULL)) { + filter_rewriter_failover_mode(rs); + } + break; + default: + break; + } +} + static void colo_rewriter_cleanup(NetFilterState *nf) { RewriterState *s = FILTER_COLO_REWRITER(nf); @@ -354,6 +409,7 @@ static void filter_rewriter_init(Object *obj) RewriterState *s = FILTER_COLO_REWRITER(obj); s->vnet_hdr = false; + s->failover_mode = FAILOVER_MODE_OFF; object_property_add_bool(obj, "vnet_hdr_support", filter_rewriter_get_vnet_hdr, filter_rewriter_set_vnet_hdr, NULL); @@ -366,6 +422,7 @@ static void colo_rewriter_class_init(ObjectClass *oc, void *data) nfc->setup = colo_rewriter_setup; nfc->cleanup = colo_rewriter_cleanup; nfc->receive_iov = colo_rewriter_receive_iov; + nfc->handle_event = colo_rewriter_handle_event; } static const TypeInfo colo_rewriter_info = { -- cgit 1.4.1