summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--net/meson.build3
-rw-r--r--net/stream.c282
-rw-r--r--net/stream_data.c193
-rw-r--r--net/stream_data.h31
4 files changed, 290 insertions, 219 deletions
diff --git a/net/meson.build b/net/meson.build
index bb97b4dcbe..bb3c011e5a 100644
--- a/net/meson.build
+++ b/net/meson.build
@@ -1,6 +1,7 @@
 system_ss.add(files(
   'announce.c',
   'checksum.c',
+  'dgram.c',
   'dump.c',
   'eth.c',
   'filter-buffer.c',
@@ -12,7 +13,7 @@ system_ss.add(files(
   'queue.c',
   'socket.c',
   'stream.c',
-  'dgram.c',
+  'stream_data.c',
   'util.c',
 ))
 
diff --git a/net/stream.c b/net/stream.c
index 6152d2a05e..d893f02cab 100644
--- a/net/stream.c
+++ b/net/stream.c
@@ -27,173 +27,50 @@
 
 #include "net/net.h"
 #include "clients.h"
-#include "monitor/monitor.h"
 #include "qapi/error.h"
-#include "qemu/error-report.h"
-#include "qemu/option.h"
-#include "qemu/sockets.h"
-#include "qemu/iov.h"
-#include "qemu/main-loop.h"
-#include "qemu/cutils.h"
-#include "io/channel.h"
-#include "io/channel-socket.h"
 #include "io/net-listener.h"
 #include "qapi/qapi-events-net.h"
 #include "qapi/qapi-visit-sockets.h"
 #include "qapi/clone-visitor.h"
 
+#include "stream_data.h"
+
 typedef struct NetStreamState {
-    NetClientState nc;
-    QIOChannel *listen_ioc;
-    QIONetListener *listener;
-    QIOChannel *ioc;
-    guint ioc_read_tag;
-    guint ioc_write_tag;
-    SocketReadState rs;
-    unsigned int send_index;      /* number of bytes sent*/
+    NetStreamData data;
     uint32_t reconnect_ms;
     guint timer_tag;
     SocketAddress *addr;
 } NetStreamState;
 
-static void net_stream_listen(QIONetListener *listener,
-                              QIOChannelSocket *cioc,
-                              void *opaque);
 static void net_stream_arm_reconnect(NetStreamState *s);
 
-static gboolean net_stream_writable(QIOChannel *ioc,
-                                    GIOCondition condition,
-                                    gpointer data)
-{
-    NetStreamState *s = data;
-
-    s->ioc_write_tag = 0;
-
-    qemu_flush_queued_packets(&s->nc);
-
-    return G_SOURCE_REMOVE;
-}
-
 static ssize_t net_stream_receive(NetClientState *nc, const uint8_t *buf,
                                   size_t size)
 {
-    NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
-    uint32_t len = htonl(size);
-    struct iovec iov[] = {
-        {
-            .iov_base = &len,
-            .iov_len  = sizeof(len),
-        }, {
-            .iov_base = (void *)buf,
-            .iov_len  = size,
-        },
-    };
-    struct iovec local_iov[2];
-    unsigned int nlocal_iov;
-    size_t remaining;
-    ssize_t ret;
-
-    remaining = iov_size(iov, 2) - s->send_index;
-    nlocal_iov = iov_copy(local_iov, 2, iov, 2, s->send_index, remaining);
-    ret = qio_channel_writev(s->ioc, local_iov, nlocal_iov, NULL);
-    if (ret == QIO_CHANNEL_ERR_BLOCK) {
-        ret = 0; /* handled further down */
-    }
-    if (ret == -1) {
-        s->send_index = 0;
-        return -errno;
-    }
-    if (ret < (ssize_t)remaining) {
-        s->send_index += ret;
-        s->ioc_write_tag = qio_channel_add_watch(s->ioc, G_IO_OUT,
-                                                 net_stream_writable, s, NULL);
-        return 0;
-    }
-    s->send_index = 0;
-    return size;
-}
-
-static gboolean net_stream_send(QIOChannel *ioc,
-                                GIOCondition condition,
-                                gpointer data);
-
-static void net_stream_send_completed(NetClientState *nc, ssize_t len)
-{
-    NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
-
-    if (!s->ioc_read_tag) {
-        s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN,
-                                                net_stream_send, s, NULL);
-    }
-}
+    NetStreamData *d = DO_UPCAST(NetStreamData, nc, nc);
 
-static void net_stream_rs_finalize(SocketReadState *rs)
-{
-    NetStreamState *s = container_of(rs, NetStreamState, rs);
-
-    if (qemu_send_packet_async(&s->nc, rs->buf,
-                               rs->packet_len,
-                               net_stream_send_completed) == 0) {
-        if (s->ioc_read_tag) {
-            g_source_remove(s->ioc_read_tag);
-            s->ioc_read_tag = 0;
-        }
-    }
+    return net_stream_data_receive(d, buf, size);
 }
 
 static gboolean net_stream_send(QIOChannel *ioc,
                                 GIOCondition condition,
                                 gpointer data)
 {
-    NetStreamState *s = data;
-    int size;
-    int ret;
-    QEMU_UNINITIALIZED char buf1[NET_BUFSIZE];
-    const char *buf;
-
-    size = qio_channel_read(s->ioc, buf1, sizeof(buf1), NULL);
-    if (size < 0) {
-        if (errno != EWOULDBLOCK) {
-            goto eoc;
-        }
-    } else if (size == 0) {
-        /* end of connection */
-    eoc:
-        s->ioc_read_tag = 0;
-        if (s->ioc_write_tag) {
-            g_source_remove(s->ioc_write_tag);
-            s->ioc_write_tag = 0;
-        }
-        if (s->listener) {
-            qemu_set_info_str(&s->nc, "listening");
-            qio_net_listener_set_client_func(s->listener, net_stream_listen,
-                                             s, NULL);
-        }
-        object_unref(OBJECT(s->ioc));
-        s->ioc = NULL;
-
-        net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
-        s->nc.link_down = true;
+    if (net_stream_data_send(ioc, condition, data) == G_SOURCE_REMOVE) {
+        NetStreamState *s = DO_UPCAST(NetStreamState, data, data);
 
-        qapi_event_send_netdev_stream_disconnected(s->nc.name);
+        qapi_event_send_netdev_stream_disconnected(s->data.nc.name);
         net_stream_arm_reconnect(s);
 
         return G_SOURCE_REMOVE;
     }
-    buf = buf1;
-
-    ret = net_fill_rstate(&s->rs, (const uint8_t *)buf, size);
-
-    if (ret == -1) {
-        goto eoc;
-    }
 
     return G_SOURCE_CONTINUE;
 }
 
 static void net_stream_cleanup(NetClientState *nc)
 {
-    NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
+    NetStreamState *s = DO_UPCAST(NetStreamState, data.nc, nc);
     if (s->timer_tag) {
         g_source_remove(s->timer_tag);
         s->timer_tag = 0;
@@ -202,28 +79,28 @@ static void net_stream_cleanup(NetClientState *nc)
         qapi_free_SocketAddress(s->addr);
         s->addr = NULL;
     }
-    if (s->ioc) {
-        if (QIO_CHANNEL_SOCKET(s->ioc)->fd != -1) {
-            if (s->ioc_read_tag) {
-                g_source_remove(s->ioc_read_tag);
-                s->ioc_read_tag = 0;
+    if (s->data.ioc) {
+        if (QIO_CHANNEL_SOCKET(s->data.ioc)->fd != -1) {
+            if (s->data.ioc_read_tag) {
+                g_source_remove(s->data.ioc_read_tag);
+                s->data.ioc_read_tag = 0;
             }
-            if (s->ioc_write_tag) {
-                g_source_remove(s->ioc_write_tag);
-                s->ioc_write_tag = 0;
+            if (s->data.ioc_write_tag) {
+                g_source_remove(s->data.ioc_write_tag);
+                s->data.ioc_write_tag = 0;
             }
         }
-        object_unref(OBJECT(s->ioc));
-        s->ioc = NULL;
+        object_unref(OBJECT(s->data.ioc));
+        s->data.ioc = NULL;
     }
-    if (s->listen_ioc) {
-        if (s->listener) {
-            qio_net_listener_disconnect(s->listener);
-            object_unref(OBJECT(s->listener));
-            s->listener = NULL;
+    if (s->data.listen_ioc) {
+        if (s->data.listener) {
+            qio_net_listener_disconnect(s->data.listener);
+            object_unref(OBJECT(s->data.listener));
+            s->data.listener = NULL;
         }
-        object_unref(OBJECT(s->listen_ioc));
-        s->listen_ioc = NULL;
+        object_unref(OBJECT(s->data.listen_ioc));
+        s->data.listen_ioc = NULL;
     }
 }
 
@@ -235,23 +112,13 @@ static NetClientInfo net_stream_info = {
 };
 
 static void net_stream_listen(QIONetListener *listener,
-                              QIOChannelSocket *cioc,
-                              void *opaque)
+                                  QIOChannelSocket *cioc, gpointer data)
 {
-    NetStreamState *s = opaque;
+    NetStreamData *d = data;
     SocketAddress *addr;
     char *uri;
 
-    object_ref(OBJECT(cioc));
-
-    qio_net_listener_set_client_func(s->listener, NULL, s, NULL);
-
-    s->ioc = QIO_CHANNEL(cioc);
-    qio_channel_set_name(s->ioc, "stream-server");
-    s->nc.link_down = false;
-
-    s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send,
-                                            s, NULL);
+    net_stream_data_listen(listener, cioc, data);
 
     if (cioc->localAddr.ss_family == AF_UNIX) {
         addr = qio_channel_socket_get_local_address(cioc, NULL);
@@ -260,22 +127,22 @@ static void net_stream_listen(QIONetListener *listener,
     }
     g_assert(addr != NULL);
     uri = socket_uri(addr);
-    qemu_set_info_str(&s->nc, "%s", uri);
+    qemu_set_info_str(&d->nc, "%s", uri);
     g_free(uri);
-    qapi_event_send_netdev_stream_connected(s->nc.name, addr);
+    qapi_event_send_netdev_stream_connected(d->nc.name, addr);
     qapi_free_SocketAddress(addr);
 }
 
 static void net_stream_server_listening(QIOTask *task, gpointer opaque)
 {
-    NetStreamState *s = opaque;
-    QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(s->listen_ioc);
+    NetStreamData *d = opaque;
+    QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(d->listen_ioc);
     SocketAddress *addr;
     int ret;
     Error *err = NULL;
 
     if (qio_task_propagate_error(task, &err)) {
-        qemu_set_info_str(&s->nc, "error: %s", error_get_pretty(err));
+        qemu_set_info_str(&d->nc, "error: %s", error_get_pretty(err));
         error_free(err);
         return;
     }
@@ -284,20 +151,21 @@ static void net_stream_server_listening(QIOTask *task, gpointer opaque)
     g_assert(addr != NULL);
     ret = qemu_socket_try_set_nonblock(listen_sioc->fd);
     if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
-        qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)",
+        qemu_set_info_str(&d->nc, "can't use file descriptor %s (errno %d)",
                           addr->u.fd.str, -ret);
         return;
     }
     g_assert(ret == 0);
     qapi_free_SocketAddress(addr);
 
-    s->nc.link_down = true;
-    s->listener = qio_net_listener_new();
+    d->nc.link_down = true;
+    d->listener = qio_net_listener_new();
 
-    qemu_set_info_str(&s->nc, "listening");
-    net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
-    qio_net_listener_set_client_func(s->listener, net_stream_listen, s, NULL);
-    qio_net_listener_add(s->listener, listen_sioc);
+    qemu_set_info_str(&d->nc, "listening");
+    net_socket_rs_init(&d->rs, net_stream_data_rs_finalize, false);
+    qio_net_listener_set_client_func(d->listener, d->listen, d,
+                                     NULL);
+    qio_net_listener_add(d->listener, listen_sioc);
 }
 
 static int net_stream_server_init(NetClientState *peer,
@@ -307,16 +175,18 @@ static int net_stream_server_init(NetClientState *peer,
                                   Error **errp)
 {
     NetClientState *nc;
-    NetStreamState *s;
+    NetStreamData *d;
     QIOChannelSocket *listen_sioc = qio_channel_socket_new();
 
     nc = qemu_new_net_client(&net_stream_info, peer, model, name);
-    s = DO_UPCAST(NetStreamState, nc, nc);
-    qemu_set_info_str(&s->nc, "initializing");
+    d = DO_UPCAST(NetStreamData, nc, nc);
+    d->send = net_stream_send;
+    d->listen = net_stream_listen;
+    qemu_set_info_str(&d->nc, "initializing");
 
-    s->listen_ioc = QIO_CHANNEL(listen_sioc);
+    d->listen_ioc = QIO_CHANNEL(listen_sioc);
     qio_channel_socket_listen_async(listen_sioc, addr, 0,
-                                    net_stream_server_listening, s,
+                                    net_stream_server_listening, d,
                                     NULL, NULL);
 
     return 0;
@@ -325,49 +195,23 @@ static int net_stream_server_init(NetClientState *peer,
 static void net_stream_client_connected(QIOTask *task, gpointer opaque)
 {
     NetStreamState *s = opaque;
-    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(s->ioc);
+    NetStreamData *d = &s->data;
+    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(d->ioc);
     SocketAddress *addr;
     gchar *uri;
-    int ret;
-    Error *err = NULL;
 
-    if (qio_task_propagate_error(task, &err)) {
-        qemu_set_info_str(&s->nc, "error: %s", error_get_pretty(err));
-        error_free(err);
-        goto error;
+    if (net_stream_data_client_connected(task, d) == -1) {
+        net_stream_arm_reconnect(s);
+        return;
     }
 
     addr = qio_channel_socket_get_remote_address(sioc, NULL);
     g_assert(addr != NULL);
     uri = socket_uri(addr);
-    qemu_set_info_str(&s->nc, "%s", uri);
+    qemu_set_info_str(&d->nc, "%s", uri);
     g_free(uri);
-
-    ret = qemu_socket_try_set_nonblock(sioc->fd);
-    if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
-        qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)",
-                          addr->u.fd.str, -ret);
-        qapi_free_SocketAddress(addr);
-        goto error;
-    }
-    g_assert(ret == 0);
-
-    net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
-
-    /* Disable Nagle algorithm on TCP sockets to reduce latency */
-    qio_channel_set_delay(s->ioc, false);
-
-    s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send,
-                                            s, NULL);
-    s->nc.link_down = false;
-    qapi_event_send_netdev_stream_connected(s->nc.name, addr);
+    qapi_event_send_netdev_stream_connected(d->nc.name, addr);
     qapi_free_SocketAddress(addr);
-
-    return;
-error:
-    object_unref(OBJECT(s->ioc));
-    s->ioc = NULL;
-    net_stream_arm_reconnect(s);
 }
 
 static gboolean net_stream_reconnect(gpointer data)
@@ -378,7 +222,7 @@ static gboolean net_stream_reconnect(gpointer data)
     s->timer_tag = 0;
 
     sioc = qio_channel_socket_new();
-    s->ioc = QIO_CHANNEL(sioc);
+    s->data.ioc = QIO_CHANNEL(sioc);
     qio_channel_socket_connect_async(sioc, s->addr,
                                      net_stream_client_connected, s,
                                      NULL, NULL);
@@ -388,7 +232,7 @@ static gboolean net_stream_reconnect(gpointer data)
 static void net_stream_arm_reconnect(NetStreamState *s)
 {
     if (s->reconnect_ms && s->timer_tag == 0) {
-        qemu_set_info_str(&s->nc, "connecting");
+        qemu_set_info_str(&s->data.nc, "connecting");
         s->timer_tag = g_timeout_add(s->reconnect_ms, net_stream_reconnect, s);
     }
 }
@@ -405,11 +249,13 @@ static int net_stream_client_init(NetClientState *peer,
     QIOChannelSocket *sioc = qio_channel_socket_new();
 
     nc = qemu_new_net_client(&net_stream_info, peer, model, name);
-    s = DO_UPCAST(NetStreamState, nc, nc);
-    qemu_set_info_str(&s->nc, "connecting");
+    s = DO_UPCAST(NetStreamState, data.nc, nc);
+    qemu_set_info_str(&s->data.nc, "connecting");
 
-    s->ioc = QIO_CHANNEL(sioc);
-    s->nc.link_down = true;
+    s->data.ioc = QIO_CHANNEL(sioc);
+    s->data.nc.link_down = true;
+    s->data.send = net_stream_send;
+    s->data.listen = net_stream_listen;
 
     s->reconnect_ms = reconnect_ms;
     if (reconnect_ms) {
diff --git a/net/stream_data.c b/net/stream_data.c
new file mode 100644
index 0000000000..5af27e0d1d
--- /dev/null
+++ b/net/stream_data.c
@@ -0,0 +1,193 @@
+/*
+ * net stream generic functions
+ *
+ * Copyright Red Hat
+ *
+ * SPDX-License-Identifier: GPL-2.0-or-later
+ */
+
+#include "qemu/osdep.h"
+#include "qemu/iov.h"
+#include "qapi/error.h"
+#include "net/net.h"
+#include "io/channel.h"
+#include "io/net-listener.h"
+
+#include "stream_data.h"
+
+static gboolean net_stream_data_writable(QIOChannel *ioc,
+                                         GIOCondition condition, gpointer data)
+{
+    NetStreamData *d = data;
+
+    d->ioc_write_tag = 0;
+
+    qemu_flush_queued_packets(&d->nc);
+
+    return G_SOURCE_REMOVE;
+}
+
+ssize_t net_stream_data_receive(NetStreamData *d, const uint8_t *buf,
+                                size_t size)
+{
+    uint32_t len = htonl(size);
+    struct iovec iov[] = {
+        {
+            .iov_base = &len,
+            .iov_len  = sizeof(len),
+        }, {
+            .iov_base = (void *)buf,
+            .iov_len  = size,
+        },
+    };
+    struct iovec local_iov[2];
+    unsigned int nlocal_iov;
+    size_t remaining;
+    ssize_t ret;
+
+    remaining = iov_size(iov, 2) - d->send_index;
+    nlocal_iov = iov_copy(local_iov, 2, iov, 2, d->send_index, remaining);
+    ret = qio_channel_writev(d->ioc, local_iov, nlocal_iov, NULL);
+    if (ret == QIO_CHANNEL_ERR_BLOCK) {
+        ret = 0; /* handled further down */
+    }
+    if (ret == -1) {
+        d->send_index = 0;
+        return -errno;
+    }
+    if (ret < (ssize_t)remaining) {
+        d->send_index += ret;
+        d->ioc_write_tag = qio_channel_add_watch(d->ioc, G_IO_OUT,
+                                                 net_stream_data_writable, d,
+                                                 NULL);
+        return 0;
+    }
+    d->send_index = 0;
+    return size;
+}
+
+static void net_stream_data_send_completed(NetClientState *nc, ssize_t len)
+{
+    NetStreamData *d = DO_UPCAST(NetStreamData, nc, nc);
+
+    if (!d->ioc_read_tag) {
+        d->ioc_read_tag = qio_channel_add_watch(d->ioc, G_IO_IN, d->send, d,
+                                                NULL);
+    }
+}
+
+void net_stream_data_rs_finalize(SocketReadState *rs)
+{
+    NetStreamData *d = container_of(rs, NetStreamData, rs);
+
+    if (qemu_send_packet_async(&d->nc, rs->buf,
+                               rs->packet_len,
+                               net_stream_data_send_completed) == 0) {
+        if (d->ioc_read_tag) {
+            g_source_remove(d->ioc_read_tag);
+            d->ioc_read_tag = 0;
+        }
+    }
+}
+
+gboolean net_stream_data_send(QIOChannel *ioc, GIOCondition condition,
+                              NetStreamData *d)
+{
+    int size;
+    int ret;
+    QEMU_UNINITIALIZED char buf1[NET_BUFSIZE];
+    const char *buf;
+
+    size = qio_channel_read(d->ioc, buf1, sizeof(buf1), NULL);
+    if (size < 0) {
+        if (errno != EWOULDBLOCK) {
+            goto eoc;
+        }
+    } else if (size == 0) {
+        /* end of connection */
+    eoc:
+        d->ioc_read_tag = 0;
+        if (d->ioc_write_tag) {
+            g_source_remove(d->ioc_write_tag);
+            d->ioc_write_tag = 0;
+        }
+        if (d->listener) {
+            qemu_set_info_str(&d->nc, "listening");
+            qio_net_listener_set_client_func(d->listener,
+                                             d->listen, d, NULL);
+        }
+        object_unref(OBJECT(d->ioc));
+        d->ioc = NULL;
+
+        net_socket_rs_init(&d->rs, net_stream_data_rs_finalize, false);
+        d->nc.link_down = true;
+
+        return G_SOURCE_REMOVE;
+    }
+    buf = buf1;
+
+    ret = net_fill_rstate(&d->rs, (const uint8_t *)buf, size);
+
+    if (ret == -1) {
+        goto eoc;
+    }
+
+    return G_SOURCE_CONTINUE;
+}
+
+void net_stream_data_listen(QIONetListener *listener,
+                            QIOChannelSocket *cioc,
+                            NetStreamData *d)
+{
+    object_ref(OBJECT(cioc));
+
+    qio_net_listener_set_client_func(d->listener, NULL, d, NULL);
+
+    d->ioc = QIO_CHANNEL(cioc);
+    qio_channel_set_name(d->ioc, "stream-server");
+    d->nc.link_down = false;
+
+    d->ioc_read_tag = qio_channel_add_watch(d->ioc, G_IO_IN, d->send, d, NULL);
+}
+
+int net_stream_data_client_connected(QIOTask *task, NetStreamData *d)
+{
+    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(d->ioc);
+    SocketAddress *addr;
+    int ret;
+    Error *err = NULL;
+
+    if (qio_task_propagate_error(task, &err)) {
+        qemu_set_info_str(&d->nc, "error: %s", error_get_pretty(err));
+        error_free(err);
+        goto error;
+    }
+
+    addr = qio_channel_socket_get_remote_address(sioc, NULL);
+    g_assert(addr != NULL);
+
+    ret = qemu_socket_try_set_nonblock(sioc->fd);
+    if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
+        qemu_set_info_str(&d->nc, "can't use file descriptor %s (errno %d)",
+                          addr->u.fd.str, -ret);
+        qapi_free_SocketAddress(addr);
+        goto error;
+    }
+    g_assert(ret == 0);
+    qapi_free_SocketAddress(addr);
+
+    net_socket_rs_init(&d->rs, net_stream_data_rs_finalize, false);
+
+    /* Disable Nagle algorithm on TCP sockets to reduce latency */
+    qio_channel_set_delay(d->ioc, false);
+
+    d->ioc_read_tag = qio_channel_add_watch(d->ioc, G_IO_IN, d->send, d, NULL);
+    d->nc.link_down = false;
+
+    return 0;
+error:
+    object_unref(OBJECT(d->ioc));
+    d->ioc = NULL;
+
+    return -1;
+}
diff --git a/net/stream_data.h b/net/stream_data.h
new file mode 100644
index 0000000000..b868625665
--- /dev/null
+++ b/net/stream_data.h
@@ -0,0 +1,31 @@
+/*
+ * net stream generic functions
+ *
+ * Copyright Red Hat
+ *
+ * SPDX-License-Identifier: GPL-2.0-or-later
+ */
+
+typedef struct NetStreamData {
+    NetClientState nc;
+    QIOChannel *ioc;
+    guint ioc_read_tag;
+    guint ioc_write_tag;
+    SocketReadState rs;
+    unsigned int send_index;      /* number of bytes sent*/
+    QIOChannelFunc send;
+    /* server data */
+    QIOChannel *listen_ioc;
+    QIONetListener *listener;
+    QIONetListenerClientFunc listen;
+} NetStreamData;
+
+ssize_t net_stream_data_receive(NetStreamData *d, const uint8_t *buf,
+                                size_t size);
+void net_stream_data_rs_finalize(SocketReadState *rs);
+gboolean net_stream_data_send(QIOChannel *ioc, GIOCondition condition,
+                              NetStreamData *d);
+int net_stream_data_client_connected(QIOTask *task, NetStreamData *d);
+void net_stream_data_listen(QIONetListener *listener,
+                            QIOChannelSocket *cioc,
+                            NetStreamData *d);