summary refs log tree commit diff stats
path: root/net/stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/stream.c')
-rw-r--r--net/stream.c282
1 files changed, 64 insertions, 218 deletions
diff --git a/net/stream.c b/net/stream.c
index 6152d2a05e..d893f02cab 100644
--- a/net/stream.c
+++ b/net/stream.c
@@ -27,173 +27,50 @@
 
 #include "net/net.h"
 #include "clients.h"
-#include "monitor/monitor.h"
 #include "qapi/error.h"
-#include "qemu/error-report.h"
-#include "qemu/option.h"
-#include "qemu/sockets.h"
-#include "qemu/iov.h"
-#include "qemu/main-loop.h"
-#include "qemu/cutils.h"
-#include "io/channel.h"
-#include "io/channel-socket.h"
 #include "io/net-listener.h"
 #include "qapi/qapi-events-net.h"
 #include "qapi/qapi-visit-sockets.h"
 #include "qapi/clone-visitor.h"
 
+#include "stream_data.h"
+
 typedef struct NetStreamState {
-    NetClientState nc;
-    QIOChannel *listen_ioc;
-    QIONetListener *listener;
-    QIOChannel *ioc;
-    guint ioc_read_tag;
-    guint ioc_write_tag;
-    SocketReadState rs;
-    unsigned int send_index;      /* number of bytes sent*/
+    NetStreamData data;
     uint32_t reconnect_ms;
     guint timer_tag;
     SocketAddress *addr;
 } NetStreamState;
 
-static void net_stream_listen(QIONetListener *listener,
-                              QIOChannelSocket *cioc,
-                              void *opaque);
 static void net_stream_arm_reconnect(NetStreamState *s);
 
-static gboolean net_stream_writable(QIOChannel *ioc,
-                                    GIOCondition condition,
-                                    gpointer data)
-{
-    NetStreamState *s = data;
-
-    s->ioc_write_tag = 0;
-
-    qemu_flush_queued_packets(&s->nc);
-
-    return G_SOURCE_REMOVE;
-}
-
 static ssize_t net_stream_receive(NetClientState *nc, const uint8_t *buf,
                                   size_t size)
 {
-    NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
-    uint32_t len = htonl(size);
-    struct iovec iov[] = {
-        {
-            .iov_base = &len,
-            .iov_len  = sizeof(len),
-        }, {
-            .iov_base = (void *)buf,
-            .iov_len  = size,
-        },
-    };
-    struct iovec local_iov[2];
-    unsigned int nlocal_iov;
-    size_t remaining;
-    ssize_t ret;
-
-    remaining = iov_size(iov, 2) - s->send_index;
-    nlocal_iov = iov_copy(local_iov, 2, iov, 2, s->send_index, remaining);
-    ret = qio_channel_writev(s->ioc, local_iov, nlocal_iov, NULL);
-    if (ret == QIO_CHANNEL_ERR_BLOCK) {
-        ret = 0; /* handled further down */
-    }
-    if (ret == -1) {
-        s->send_index = 0;
-        return -errno;
-    }
-    if (ret < (ssize_t)remaining) {
-        s->send_index += ret;
-        s->ioc_write_tag = qio_channel_add_watch(s->ioc, G_IO_OUT,
-                                                 net_stream_writable, s, NULL);
-        return 0;
-    }
-    s->send_index = 0;
-    return size;
-}
-
-static gboolean net_stream_send(QIOChannel *ioc,
-                                GIOCondition condition,
-                                gpointer data);
-
-static void net_stream_send_completed(NetClientState *nc, ssize_t len)
-{
-    NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
-
-    if (!s->ioc_read_tag) {
-        s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN,
-                                                net_stream_send, s, NULL);
-    }
-}
+    NetStreamData *d = DO_UPCAST(NetStreamData, nc, nc);
 
-static void net_stream_rs_finalize(SocketReadState *rs)
-{
-    NetStreamState *s = container_of(rs, NetStreamState, rs);
-
-    if (qemu_send_packet_async(&s->nc, rs->buf,
-                               rs->packet_len,
-                               net_stream_send_completed) == 0) {
-        if (s->ioc_read_tag) {
-            g_source_remove(s->ioc_read_tag);
-            s->ioc_read_tag = 0;
-        }
-    }
+    return net_stream_data_receive(d, buf, size);
 }
 
 static gboolean net_stream_send(QIOChannel *ioc,
                                 GIOCondition condition,
                                 gpointer data)
 {
-    NetStreamState *s = data;
-    int size;
-    int ret;
-    QEMU_UNINITIALIZED char buf1[NET_BUFSIZE];
-    const char *buf;
-
-    size = qio_channel_read(s->ioc, buf1, sizeof(buf1), NULL);
-    if (size < 0) {
-        if (errno != EWOULDBLOCK) {
-            goto eoc;
-        }
-    } else if (size == 0) {
-        /* end of connection */
-    eoc:
-        s->ioc_read_tag = 0;
-        if (s->ioc_write_tag) {
-            g_source_remove(s->ioc_write_tag);
-            s->ioc_write_tag = 0;
-        }
-        if (s->listener) {
-            qemu_set_info_str(&s->nc, "listening");
-            qio_net_listener_set_client_func(s->listener, net_stream_listen,
-                                             s, NULL);
-        }
-        object_unref(OBJECT(s->ioc));
-        s->ioc = NULL;
-
-        net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
-        s->nc.link_down = true;
+    if (net_stream_data_send(ioc, condition, data) == G_SOURCE_REMOVE) {
+        NetStreamState *s = DO_UPCAST(NetStreamState, data, data);
 
-        qapi_event_send_netdev_stream_disconnected(s->nc.name);
+        qapi_event_send_netdev_stream_disconnected(s->data.nc.name);
         net_stream_arm_reconnect(s);
 
         return G_SOURCE_REMOVE;
     }
-    buf = buf1;
-
-    ret = net_fill_rstate(&s->rs, (const uint8_t *)buf, size);
-
-    if (ret == -1) {
-        goto eoc;
-    }
 
     return G_SOURCE_CONTINUE;
 }
 
 static void net_stream_cleanup(NetClientState *nc)
 {
-    NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
+    NetStreamState *s = DO_UPCAST(NetStreamState, data.nc, nc);
     if (s->timer_tag) {
         g_source_remove(s->timer_tag);
         s->timer_tag = 0;
@@ -202,28 +79,28 @@ static void net_stream_cleanup(NetClientState *nc)
         qapi_free_SocketAddress(s->addr);
         s->addr = NULL;
     }
-    if (s->ioc) {
-        if (QIO_CHANNEL_SOCKET(s->ioc)->fd != -1) {
-            if (s->ioc_read_tag) {
-                g_source_remove(s->ioc_read_tag);
-                s->ioc_read_tag = 0;
+    if (s->data.ioc) {
+        if (QIO_CHANNEL_SOCKET(s->data.ioc)->fd != -1) {
+            if (s->data.ioc_read_tag) {
+                g_source_remove(s->data.ioc_read_tag);
+                s->data.ioc_read_tag = 0;
             }
-            if (s->ioc_write_tag) {
-                g_source_remove(s->ioc_write_tag);
-                s->ioc_write_tag = 0;
+            if (s->data.ioc_write_tag) {
+                g_source_remove(s->data.ioc_write_tag);
+                s->data.ioc_write_tag = 0;
             }
         }
-        object_unref(OBJECT(s->ioc));
-        s->ioc = NULL;
+        object_unref(OBJECT(s->data.ioc));
+        s->data.ioc = NULL;
     }
-    if (s->listen_ioc) {
-        if (s->listener) {
-            qio_net_listener_disconnect(s->listener);
-            object_unref(OBJECT(s->listener));
-            s->listener = NULL;
+    if (s->data.listen_ioc) {
+        if (s->data.listener) {
+            qio_net_listener_disconnect(s->data.listener);
+            object_unref(OBJECT(s->data.listener));
+            s->data.listener = NULL;
         }
-        object_unref(OBJECT(s->listen_ioc));
-        s->listen_ioc = NULL;
+        object_unref(OBJECT(s->data.listen_ioc));
+        s->data.listen_ioc = NULL;
     }
 }
 
@@ -235,23 +112,13 @@ static NetClientInfo net_stream_info = {
 };
 
 static void net_stream_listen(QIONetListener *listener,
-                              QIOChannelSocket *cioc,
-                              void *opaque)
+                                  QIOChannelSocket *cioc, gpointer data)
 {
-    NetStreamState *s = opaque;
+    NetStreamData *d = data;
     SocketAddress *addr;
     char *uri;
 
-    object_ref(OBJECT(cioc));
-
-    qio_net_listener_set_client_func(s->listener, NULL, s, NULL);
-
-    s->ioc = QIO_CHANNEL(cioc);
-    qio_channel_set_name(s->ioc, "stream-server");
-    s->nc.link_down = false;
-
-    s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send,
-                                            s, NULL);
+    net_stream_data_listen(listener, cioc, data);
 
     if (cioc->localAddr.ss_family == AF_UNIX) {
         addr = qio_channel_socket_get_local_address(cioc, NULL);
@@ -260,22 +127,22 @@ static void net_stream_listen(QIONetListener *listener,
     }
     g_assert(addr != NULL);
     uri = socket_uri(addr);
-    qemu_set_info_str(&s->nc, "%s", uri);
+    qemu_set_info_str(&d->nc, "%s", uri);
     g_free(uri);
-    qapi_event_send_netdev_stream_connected(s->nc.name, addr);
+    qapi_event_send_netdev_stream_connected(d->nc.name, addr);
     qapi_free_SocketAddress(addr);
 }
 
 static void net_stream_server_listening(QIOTask *task, gpointer opaque)
 {
-    NetStreamState *s = opaque;
-    QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(s->listen_ioc);
+    NetStreamData *d = opaque;
+    QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(d->listen_ioc);
     SocketAddress *addr;
     int ret;
     Error *err = NULL;
 
     if (qio_task_propagate_error(task, &err)) {
-        qemu_set_info_str(&s->nc, "error: %s", error_get_pretty(err));
+        qemu_set_info_str(&d->nc, "error: %s", error_get_pretty(err));
         error_free(err);
         return;
     }
@@ -284,20 +151,21 @@ static void net_stream_server_listening(QIOTask *task, gpointer opaque)
     g_assert(addr != NULL);
     ret = qemu_socket_try_set_nonblock(listen_sioc->fd);
     if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
-        qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)",
+        qemu_set_info_str(&d->nc, "can't use file descriptor %s (errno %d)",
                           addr->u.fd.str, -ret);
         return;
     }
     g_assert(ret == 0);
     qapi_free_SocketAddress(addr);
 
-    s->nc.link_down = true;
-    s->listener = qio_net_listener_new();
+    d->nc.link_down = true;
+    d->listener = qio_net_listener_new();
 
-    qemu_set_info_str(&s->nc, "listening");
-    net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
-    qio_net_listener_set_client_func(s->listener, net_stream_listen, s, NULL);
-    qio_net_listener_add(s->listener, listen_sioc);
+    qemu_set_info_str(&d->nc, "listening");
+    net_socket_rs_init(&d->rs, net_stream_data_rs_finalize, false);
+    qio_net_listener_set_client_func(d->listener, d->listen, d,
+                                     NULL);
+    qio_net_listener_add(d->listener, listen_sioc);
 }
 
 static int net_stream_server_init(NetClientState *peer,
@@ -307,16 +175,18 @@ static int net_stream_server_init(NetClientState *peer,
                                   Error **errp)
 {
     NetClientState *nc;
-    NetStreamState *s;
+    NetStreamData *d;
     QIOChannelSocket *listen_sioc = qio_channel_socket_new();
 
     nc = qemu_new_net_client(&net_stream_info, peer, model, name);
-    s = DO_UPCAST(NetStreamState, nc, nc);
-    qemu_set_info_str(&s->nc, "initializing");
+    d = DO_UPCAST(NetStreamData, nc, nc);
+    d->send = net_stream_send;
+    d->listen = net_stream_listen;
+    qemu_set_info_str(&d->nc, "initializing");
 
-    s->listen_ioc = QIO_CHANNEL(listen_sioc);
+    d->listen_ioc = QIO_CHANNEL(listen_sioc);
     qio_channel_socket_listen_async(listen_sioc, addr, 0,
-                                    net_stream_server_listening, s,
+                                    net_stream_server_listening, d,
                                     NULL, NULL);
 
     return 0;
@@ -325,49 +195,23 @@ static int net_stream_server_init(NetClientState *peer,
 static void net_stream_client_connected(QIOTask *task, gpointer opaque)
 {
     NetStreamState *s = opaque;
-    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(s->ioc);
+    NetStreamData *d = &s->data;
+    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(d->ioc);
     SocketAddress *addr;
     gchar *uri;
-    int ret;
-    Error *err = NULL;
 
-    if (qio_task_propagate_error(task, &err)) {
-        qemu_set_info_str(&s->nc, "error: %s", error_get_pretty(err));
-        error_free(err);
-        goto error;
+    if (net_stream_data_client_connected(task, d) == -1) {
+        net_stream_arm_reconnect(s);
+        return;
     }
 
     addr = qio_channel_socket_get_remote_address(sioc, NULL);
     g_assert(addr != NULL);
     uri = socket_uri(addr);
-    qemu_set_info_str(&s->nc, "%s", uri);
+    qemu_set_info_str(&d->nc, "%s", uri);
     g_free(uri);
-
-    ret = qemu_socket_try_set_nonblock(sioc->fd);
-    if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
-        qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)",
-                          addr->u.fd.str, -ret);
-        qapi_free_SocketAddress(addr);
-        goto error;
-    }
-    g_assert(ret == 0);
-
-    net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
-
-    /* Disable Nagle algorithm on TCP sockets to reduce latency */
-    qio_channel_set_delay(s->ioc, false);
-
-    s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send,
-                                            s, NULL);
-    s->nc.link_down = false;
-    qapi_event_send_netdev_stream_connected(s->nc.name, addr);
+    qapi_event_send_netdev_stream_connected(d->nc.name, addr);
     qapi_free_SocketAddress(addr);
-
-    return;
-error:
-    object_unref(OBJECT(s->ioc));
-    s->ioc = NULL;
-    net_stream_arm_reconnect(s);
 }
 
 static gboolean net_stream_reconnect(gpointer data)
@@ -378,7 +222,7 @@ static gboolean net_stream_reconnect(gpointer data)
     s->timer_tag = 0;
 
     sioc = qio_channel_socket_new();
-    s->ioc = QIO_CHANNEL(sioc);
+    s->data.ioc = QIO_CHANNEL(sioc);
     qio_channel_socket_connect_async(sioc, s->addr,
                                      net_stream_client_connected, s,
                                      NULL, NULL);
@@ -388,7 +232,7 @@ static gboolean net_stream_reconnect(gpointer data)
 static void net_stream_arm_reconnect(NetStreamState *s)
 {
     if (s->reconnect_ms && s->timer_tag == 0) {
-        qemu_set_info_str(&s->nc, "connecting");
+        qemu_set_info_str(&s->data.nc, "connecting");
         s->timer_tag = g_timeout_add(s->reconnect_ms, net_stream_reconnect, s);
     }
 }
@@ -405,11 +249,13 @@ static int net_stream_client_init(NetClientState *peer,
     QIOChannelSocket *sioc = qio_channel_socket_new();
 
     nc = qemu_new_net_client(&net_stream_info, peer, model, name);
-    s = DO_UPCAST(NetStreamState, nc, nc);
-    qemu_set_info_str(&s->nc, "connecting");
+    s = DO_UPCAST(NetStreamState, data.nc, nc);
+    qemu_set_info_str(&s->data.nc, "connecting");
 
-    s->ioc = QIO_CHANNEL(sioc);
-    s->nc.link_down = true;
+    s->data.ioc = QIO_CHANNEL(sioc);
+    s->data.nc.link_down = true;
+    s->data.send = net_stream_send;
+    s->data.listen = net_stream_listen;
 
     s->reconnect_ms = reconnect_ms;
     if (reconnect_ms) {