summary refs log tree commit diff stats
path: root/io
diff options
context:
space:
mode:
Diffstat (limited to 'io')
-rw-r--r--io/channel-command.c10
-rw-r--r--io/channel-file.c9
-rw-r--r--io/channel-null.c3
-rw-r--r--io/channel-socket.c9
-rw-r--r--io/channel-tls.c6
-rw-r--r--io/channel-util.c24
-rw-r--r--io/channel.c118
7 files changed, 138 insertions, 41 deletions
diff --git a/io/channel-command.c b/io/channel-command.c
index 7ed726c802..6d5f64e146 100644
--- a/io/channel-command.c
+++ b/io/channel-command.c
@@ -20,6 +20,7 @@
 
 #include "qemu/osdep.h"
 #include "io/channel-command.h"
+#include "io/channel-util.h"
 #include "io/channel-watch.h"
 #include "qapi/error.h"
 #include "qemu/module.h"
@@ -331,14 +332,17 @@ static int qio_channel_command_close(QIOChannel *ioc,
 
 
 static void qio_channel_command_set_aio_fd_handler(QIOChannel *ioc,
-                                                   AioContext *ctx,
+                                                   AioContext *read_ctx,
                                                    IOHandler *io_read,
+                                                   AioContext *write_ctx,
                                                    IOHandler *io_write,
                                                    void *opaque)
 {
     QIOChannelCommand *cioc = QIO_CHANNEL_COMMAND(ioc);
-    aio_set_fd_handler(ctx, cioc->readfd, io_read, NULL, NULL, NULL, opaque);
-    aio_set_fd_handler(ctx, cioc->writefd, NULL, io_write, NULL, NULL, opaque);
+
+    qio_channel_util_set_aio_fd_handler(cioc->readfd, read_ctx, io_read,
+                                        cioc->writefd, write_ctx, io_write,
+                                        opaque);
 }
 
 
diff --git a/io/channel-file.c b/io/channel-file.c
index 8b5821f452..4a12c61886 100644
--- a/io/channel-file.c
+++ b/io/channel-file.c
@@ -20,6 +20,7 @@
 
 #include "qemu/osdep.h"
 #include "io/channel-file.h"
+#include "io/channel-util.h"
 #include "io/channel-watch.h"
 #include "qapi/error.h"
 #include "qemu/module.h"
@@ -192,13 +193,17 @@ static int qio_channel_file_close(QIOChannel *ioc,
 
 
 static void qio_channel_file_set_aio_fd_handler(QIOChannel *ioc,
-                                                AioContext *ctx,
+                                                AioContext *read_ctx,
                                                 IOHandler *io_read,
+                                                AioContext *write_ctx,
                                                 IOHandler *io_write,
                                                 void *opaque)
 {
     QIOChannelFile *fioc = QIO_CHANNEL_FILE(ioc);
-    aio_set_fd_handler(ctx, fioc->fd, io_read, io_write, NULL, NULL, opaque);
+
+    qio_channel_util_set_aio_fd_handler(fioc->fd, read_ctx, io_read,
+                                        fioc->fd, write_ctx, io_write,
+                                        opaque);
 }
 
 static GSource *qio_channel_file_create_watch(QIOChannel *ioc,
diff --git a/io/channel-null.c b/io/channel-null.c
index 4fafdb770d..ef99586348 100644
--- a/io/channel-null.c
+++ b/io/channel-null.c
@@ -128,8 +128,9 @@ qio_channel_null_close(QIOChannel *ioc,
 
 static void
 qio_channel_null_set_aio_fd_handler(QIOChannel *ioc G_GNUC_UNUSED,
-                                    AioContext *ctx G_GNUC_UNUSED,
+                                    AioContext *read_ctx G_GNUC_UNUSED,
                                     IOHandler *io_read G_GNUC_UNUSED,
+                                    AioContext *write_ctx G_GNUC_UNUSED,
                                     IOHandler *io_write G_GNUC_UNUSED,
                                     void *opaque G_GNUC_UNUSED)
 {
diff --git a/io/channel-socket.c b/io/channel-socket.c
index d99945ebec..02ffb51e99 100644
--- a/io/channel-socket.c
+++ b/io/channel-socket.c
@@ -22,6 +22,7 @@
 #include "qapi/qapi-visit-sockets.h"
 #include "qemu/module.h"
 #include "io/channel-socket.h"
+#include "io/channel-util.h"
 #include "io/channel-watch.h"
 #include "trace.h"
 #include "qapi/clone-visitor.h"
@@ -893,13 +894,17 @@ qio_channel_socket_shutdown(QIOChannel *ioc,
 }
 
 static void qio_channel_socket_set_aio_fd_handler(QIOChannel *ioc,
-                                                  AioContext *ctx,
+                                                  AioContext *read_ctx,
                                                   IOHandler *io_read,
+                                                  AioContext *write_ctx,
                                                   IOHandler *io_write,
                                                   void *opaque)
 {
     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
-    aio_set_fd_handler(ctx, sioc->fd, io_read, io_write, NULL, NULL, opaque);
+
+    qio_channel_util_set_aio_fd_handler(sioc->fd, read_ctx, io_read,
+                                        sioc->fd, write_ctx, io_write,
+                                        opaque);
 }
 
 static GSource *qio_channel_socket_create_watch(QIOChannel *ioc,
diff --git a/io/channel-tls.c b/io/channel-tls.c
index 847d5297c3..58fe1aceee 100644
--- a/io/channel-tls.c
+++ b/io/channel-tls.c
@@ -388,14 +388,16 @@ static int qio_channel_tls_close(QIOChannel *ioc,
 }
 
 static void qio_channel_tls_set_aio_fd_handler(QIOChannel *ioc,
-                                               AioContext *ctx,
+                                               AioContext *read_ctx,
                                                IOHandler *io_read,
+                                               AioContext *write_ctx,
                                                IOHandler *io_write,
                                                void *opaque)
 {
     QIOChannelTLS *tioc = QIO_CHANNEL_TLS(ioc);
 
-    qio_channel_set_aio_fd_handler(tioc->master, ctx, io_read, io_write, opaque);
+    qio_channel_set_aio_fd_handler(tioc->master, read_ctx, io_read,
+            write_ctx, io_write, opaque);
 }
 
 typedef struct QIOChannelTLSSource QIOChannelTLSSource;
diff --git a/io/channel-util.c b/io/channel-util.c
index 848a7a43d6..4b340d46d7 100644
--- a/io/channel-util.c
+++ b/io/channel-util.c
@@ -36,3 +36,27 @@ QIOChannel *qio_channel_new_fd(int fd,
     }
     return ioc;
 }
+
+
+void qio_channel_util_set_aio_fd_handler(int read_fd,
+                                         AioContext *read_ctx,
+                                         IOHandler *io_read,
+                                         int write_fd,
+                                         AioContext *write_ctx,
+                                         IOHandler *io_write,
+                                         void *opaque)
+{
+    if (read_fd == write_fd && read_ctx == write_ctx) {
+        aio_set_fd_handler(read_ctx, read_fd, io_read, io_write,
+                NULL, NULL, opaque);
+    } else {
+        if (read_ctx) {
+            aio_set_fd_handler(read_ctx, read_fd, io_read, NULL,
+                    NULL, NULL, opaque);
+        }
+        if (write_ctx) {
+            aio_set_fd_handler(write_ctx, write_fd, NULL, io_write,
+                    NULL, NULL, opaque);
+        }
+    }
+}
diff --git a/io/channel.c b/io/channel.c
index 72f0066af5..86c5834510 100644
--- a/io/channel.c
+++ b/io/channel.c
@@ -365,6 +365,12 @@ int qio_channel_set_blocking(QIOChannel *ioc,
 }
 
 
+void qio_channel_set_follow_coroutine_ctx(QIOChannel *ioc, bool enabled)
+{
+    ioc->follow_coroutine_ctx = enabled;
+}
+
+
 int qio_channel_close(QIOChannel *ioc,
                       Error **errp)
 {
@@ -388,14 +394,16 @@ GSource *qio_channel_create_watch(QIOChannel *ioc,
 
 
 void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
-                                    AioContext *ctx,
+                                    AioContext *read_ctx,
                                     IOHandler *io_read,
+                                    AioContext *write_ctx,
                                     IOHandler *io_write,
                                     void *opaque)
 {
     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 
-    klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque);
+    klass->io_set_aio_fd_handler(ioc, read_ctx, io_read, write_ctx, io_write,
+            opaque);
 }
 
 guint qio_channel_add_watch_full(QIOChannel *ioc,
@@ -542,56 +550,101 @@ static void qio_channel_restart_write(void *opaque)
     aio_co_wake(co);
 }
 
-static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc)
+static void coroutine_fn
+qio_channel_set_fd_handlers(QIOChannel *ioc, GIOCondition condition)
 {
-    IOHandler *rd_handler = NULL, *wr_handler = NULL;
-    AioContext *ctx;
+    AioContext *ctx = ioc->follow_coroutine_ctx ?
+        qemu_coroutine_get_aio_context(qemu_coroutine_self()) :
+        iohandler_get_aio_context();
+    AioContext *read_ctx = NULL;
+    IOHandler *io_read = NULL;
+    AioContext *write_ctx = NULL;
+    IOHandler *io_write = NULL;
 
-    if (ioc->read_coroutine) {
-        rd_handler = qio_channel_restart_read;
-    }
-    if (ioc->write_coroutine) {
-        wr_handler = qio_channel_restart_write;
+    if (condition == G_IO_IN) {
+        ioc->read_coroutine = qemu_coroutine_self();
+        ioc->read_ctx = ctx;
+        read_ctx = ctx;
+        io_read = qio_channel_restart_read;
+
+        /*
+         * Thread safety: if the other coroutine is set and its AioContext
+         * matches ours, then there is mutual exclusion between read and write
+         * because they share a single thread and it's safe to set both read
+         * and write fd handlers here. If the AioContext does not match ours,
+         * then both threads may run in parallel but there is no shared state
+         * to worry about.
+         */
+        if (ioc->write_coroutine && ioc->write_ctx == ctx) {
+            write_ctx = ctx;
+            io_write = qio_channel_restart_write;
+        }
+    } else if (condition == G_IO_OUT) {
+        ioc->write_coroutine = qemu_coroutine_self();
+        ioc->write_ctx = ctx;
+        write_ctx = ctx;
+        io_write = qio_channel_restart_write;
+        if (ioc->read_coroutine && ioc->read_ctx == ctx) {
+            read_ctx = ctx;
+            io_read = qio_channel_restart_read;
+        }
+    } else {
+        abort();
     }
 
-    ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context();
-    qio_channel_set_aio_fd_handler(ioc, ctx, rd_handler, wr_handler, ioc);
+    qio_channel_set_aio_fd_handler(ioc, read_ctx, io_read,
+            write_ctx, io_write, ioc);
 }
 
-void qio_channel_attach_aio_context(QIOChannel *ioc,
-                                    AioContext *ctx)
+static void coroutine_fn
+qio_channel_clear_fd_handlers(QIOChannel *ioc, GIOCondition condition)
 {
-    assert(!ioc->read_coroutine);
-    assert(!ioc->write_coroutine);
-    ioc->ctx = ctx;
-}
+    AioContext *read_ctx = NULL;
+    IOHandler *io_read = NULL;
+    AioContext *write_ctx = NULL;
+    IOHandler *io_write = NULL;
+    AioContext *ctx;
 
-void qio_channel_detach_aio_context(QIOChannel *ioc)
-{
-    ioc->read_coroutine = NULL;
-    ioc->write_coroutine = NULL;
-    qio_channel_set_aio_fd_handlers(ioc);
-    ioc->ctx = NULL;
+    if (condition == G_IO_IN) {
+        ctx = ioc->read_ctx;
+        read_ctx = ctx;
+        io_read = NULL;
+        if (ioc->write_coroutine && ioc->write_ctx == ctx) {
+            write_ctx = ctx;
+            io_write = qio_channel_restart_write;
+        }
+    } else if (condition == G_IO_OUT) {
+        ctx = ioc->write_ctx;
+        write_ctx = ctx;
+        io_write = NULL;
+        if (ioc->read_coroutine && ioc->read_ctx == ctx) {
+            read_ctx = ctx;
+            io_read = qio_channel_restart_read;
+        }
+    } else {
+        abort();
+    }
+
+    qio_channel_set_aio_fd_handler(ioc, read_ctx, io_read,
+            write_ctx, io_write, ioc);
 }
 
 void coroutine_fn qio_channel_yield(QIOChannel *ioc,
                                     GIOCondition condition)
 {
-    AioContext *ioc_ctx = ioc->ctx ?: qemu_get_aio_context();
+    AioContext *ioc_ctx;
 
     assert(qemu_in_coroutine());
-    assert(in_aio_context_home_thread(ioc_ctx));
+    ioc_ctx = qemu_coroutine_get_aio_context(qemu_coroutine_self());
 
     if (condition == G_IO_IN) {
         assert(!ioc->read_coroutine);
-        ioc->read_coroutine = qemu_coroutine_self();
     } else if (condition == G_IO_OUT) {
         assert(!ioc->write_coroutine);
-        ioc->write_coroutine = qemu_coroutine_self();
     } else {
         abort();
     }
-    qio_channel_set_aio_fd_handlers(ioc);
+    qio_channel_set_fd_handlers(ioc, condition);
     qemu_coroutine_yield();
     assert(in_aio_context_home_thread(ioc_ctx));
 
@@ -599,11 +652,10 @@ void coroutine_fn qio_channel_yield(QIOChannel *ioc,
      * through the aio_fd_handlers. */
     if (condition == G_IO_IN) {
         assert(ioc->read_coroutine == NULL);
-        qio_channel_set_aio_fd_handlers(ioc);
     } else if (condition == G_IO_OUT) {
         assert(ioc->write_coroutine == NULL);
-        qio_channel_set_aio_fd_handlers(ioc);
     }
+    qio_channel_clear_fd_handlers(ioc, condition);
 }
 
 void qio_channel_wake_read(QIOChannel *ioc)
@@ -653,6 +705,10 @@ static void qio_channel_finalize(Object *obj)
 {
     QIOChannel *ioc = QIO_CHANNEL(obj);
 
+    /* Must not have coroutines in qio_channel_yield() */
+    assert(!ioc->read_coroutine);
+    assert(!ioc->write_coroutine);
+
     g_free(ioc->name);
 
 #ifdef _WIN32