summary refs log tree commit diff stats
path: root/net
diff options
context:
space:
mode:
Diffstat (limited to 'net')
-rw-r--r--net/hub.c6
-rw-r--r--net/queue.c40
-rw-r--r--net/queue.h2
-rw-r--r--net/socket.c110
4 files changed, 119 insertions, 39 deletions
diff --git a/net/hub.c b/net/hub.c
index ac157e32ee..650a8b4a40 100644
--- a/net/hub.c
+++ b/net/hub.c
@@ -97,12 +97,12 @@ static int net_hub_port_can_receive(NetClientState *nc)
             continue;
         }
 
-        if (!qemu_can_send_packet(&port->nc)) {
-            return 0;
+        if (qemu_can_send_packet(&port->nc)) {
+            return 1;
         }
     }
 
-    return 1;
+    return 0;
 }
 
 static ssize_t net_hub_port_receive(NetClientState *nc,
diff --git a/net/queue.c b/net/queue.c
index e8030aafe4..254f28013a 100644
--- a/net/queue.c
+++ b/net/queue.c
@@ -83,12 +83,12 @@ void qemu_del_net_queue(NetQueue *queue)
     g_free(queue);
 }
 
-static ssize_t qemu_net_queue_append(NetQueue *queue,
-                                     NetClientState *sender,
-                                     unsigned flags,
-                                     const uint8_t *buf,
-                                     size_t size,
-                                     NetPacketSent *sent_cb)
+static void qemu_net_queue_append(NetQueue *queue,
+                                  NetClientState *sender,
+                                  unsigned flags,
+                                  const uint8_t *buf,
+                                  size_t size,
+                                  NetPacketSent *sent_cb)
 {
     NetPacket *packet;
 
@@ -100,16 +100,14 @@ static ssize_t qemu_net_queue_append(NetQueue *queue,
     memcpy(packet->data, buf, size);
 
     QTAILQ_INSERT_TAIL(&queue->packets, packet, entry);
-
-    return size;
 }
 
-static ssize_t qemu_net_queue_append_iov(NetQueue *queue,
-                                         NetClientState *sender,
-                                         unsigned flags,
-                                         const struct iovec *iov,
-                                         int iovcnt,
-                                         NetPacketSent *sent_cb)
+static void qemu_net_queue_append_iov(NetQueue *queue,
+                                      NetClientState *sender,
+                                      unsigned flags,
+                                      const struct iovec *iov,
+                                      int iovcnt,
+                                      NetPacketSent *sent_cb)
 {
     NetPacket *packet;
     size_t max_len = 0;
@@ -133,8 +131,6 @@ static ssize_t qemu_net_queue_append_iov(NetQueue *queue,
     }
 
     QTAILQ_INSERT_TAIL(&queue->packets, packet, entry);
-
-    return packet->size;
 }
 
 static ssize_t qemu_net_queue_deliver(NetQueue *queue,
@@ -177,7 +173,8 @@ ssize_t qemu_net_queue_send(NetQueue *queue,
     ssize_t ret;
 
     if (queue->delivering || !qemu_can_send_packet(sender)) {
-        return qemu_net_queue_append(queue, sender, flags, data, size, sent_cb);
+        qemu_net_queue_append(queue, sender, flags, data, size, sent_cb);
+        return 0;
     }
 
     ret = qemu_net_queue_deliver(queue, sender, flags, data, size);
@@ -201,8 +198,8 @@ ssize_t qemu_net_queue_send_iov(NetQueue *queue,
     ssize_t ret;
 
     if (queue->delivering || !qemu_can_send_packet(sender)) {
-        return qemu_net_queue_append_iov(queue, sender, flags,
-                                         iov, iovcnt, sent_cb);
+        qemu_net_queue_append_iov(queue, sender, flags, iov, iovcnt, sent_cb);
+        return 0;
     }
 
     ret = qemu_net_queue_deliver_iov(queue, sender, flags, iov, iovcnt);
@@ -228,7 +225,7 @@ void qemu_net_queue_purge(NetQueue *queue, NetClientState *from)
     }
 }
 
-void qemu_net_queue_flush(NetQueue *queue)
+bool qemu_net_queue_flush(NetQueue *queue)
 {
     while (!QTAILQ_EMPTY(&queue->packets)) {
         NetPacket *packet;
@@ -244,7 +241,7 @@ void qemu_net_queue_flush(NetQueue *queue)
                                      packet->size);
         if (ret == 0) {
             QTAILQ_INSERT_HEAD(&queue->packets, packet, entry);
-            break;
+            return false;
         }
 
         if (packet->sent_cb) {
@@ -253,4 +250,5 @@ void qemu_net_queue_flush(NetQueue *queue)
 
         g_free(packet);
     }
+    return true;
 }
diff --git a/net/queue.h b/net/queue.h
index 9d44a9b3b8..fc02b33915 100644
--- a/net/queue.h
+++ b/net/queue.h
@@ -53,6 +53,6 @@ ssize_t qemu_net_queue_send_iov(NetQueue *queue,
                                 NetPacketSent *sent_cb);
 
 void qemu_net_queue_purge(NetQueue *queue, NetClientState *from);
-void qemu_net_queue_flush(NetQueue *queue);
+bool qemu_net_queue_flush(NetQueue *queue);
 
 #endif /* QEMU_NET_QUEUE_H */
diff --git a/net/socket.c b/net/socket.c
index 7c602e4c3a..5e0c92e062 100644
--- a/net/socket.c
+++ b/net/socket.c
@@ -32,6 +32,7 @@
 #include "qemu-error.h"
 #include "qemu-option.h"
 #include "qemu_socket.h"
+#include "iov.h"
 
 typedef struct NetSocketState {
     NetClientState nc;
@@ -40,29 +41,106 @@ typedef struct NetSocketState {
     int state; /* 0 = getting length, 1 = getting data */
     unsigned int index;
     unsigned int packet_len;
+    unsigned int send_index;      /* number of bytes sent (only SOCK_STREAM) */
     uint8_t buf[4096];
     struct sockaddr_in dgram_dst; /* contains inet host and port destination iff connectionless (SOCK_DGRAM) */
+    IOHandler *send_fn;           /* differs between SOCK_STREAM/SOCK_DGRAM */
+    bool read_poll;               /* waiting to receive data? */
+    bool write_poll;              /* waiting to transmit data? */
 } NetSocketState;
 
 static void net_socket_accept(void *opaque);
+static void net_socket_writable(void *opaque);
+
+/* Only read packets from socket when peer can receive them */
+static int net_socket_can_send(void *opaque)
+{
+    NetSocketState *s = opaque;
+
+    return qemu_can_send_packet(&s->nc);
+}
+
+static void net_socket_update_fd_handler(NetSocketState *s)
+{
+    qemu_set_fd_handler2(s->fd,
+                         s->read_poll  ? net_socket_can_send : NULL,
+                         s->read_poll  ? s->send_fn : NULL,
+                         s->write_poll ? net_socket_writable : NULL,
+                         s);
+}
+
+static void net_socket_read_poll(NetSocketState *s, bool enable)
+{
+    s->read_poll = enable;
+    net_socket_update_fd_handler(s);
+}
+
+static void net_socket_write_poll(NetSocketState *s, bool enable)
+{
+    s->write_poll = enable;
+    net_socket_update_fd_handler(s);
+}
+
+static void net_socket_writable(void *opaque)
+{
+    NetSocketState *s = opaque;
+
+    net_socket_write_poll(s, false);
+
+    qemu_flush_queued_packets(&s->nc);
+}
 
-/* XXX: we consider we can send the whole packet without blocking */
 static ssize_t net_socket_receive(NetClientState *nc, const uint8_t *buf, size_t size)
 {
     NetSocketState *s = DO_UPCAST(NetSocketState, nc, nc);
-    uint32_t len;
-    len = htonl(size);
-
-    send_all(s->fd, (const uint8_t *)&len, sizeof(len));
-    return send_all(s->fd, buf, size);
+    uint32_t len = htonl(size);
+    struct iovec iov[] = {
+        {
+            .iov_base = &len,
+            .iov_len  = sizeof(len),
+        }, {
+            .iov_base = (void *)buf,
+            .iov_len  = size,
+        },
+    };
+    size_t remaining;
+    ssize_t ret;
+
+    remaining = iov_size(iov, 2) - s->send_index;
+    ret = iov_send(s->fd, iov, 2, s->send_index, remaining);
+
+    if (ret == -1 && errno == EAGAIN) {
+        ret = 0; /* handled further down */
+    }
+    if (ret == -1) {
+        s->send_index = 0;
+        return -errno;
+    }
+    if (ret < (ssize_t)remaining) {
+        s->send_index += ret;
+        net_socket_write_poll(s, true);
+        return 0;
+    }
+    s->send_index = 0;
+    return size;
 }
 
 static ssize_t net_socket_receive_dgram(NetClientState *nc, const uint8_t *buf, size_t size)
 {
     NetSocketState *s = DO_UPCAST(NetSocketState, nc, nc);
+    ssize_t ret;
+
+    do {
+        ret = sendto(s->fd, buf, size, 0,
+                     (struct sockaddr *)&s->dgram_dst,
+                     sizeof(s->dgram_dst));
+    } while (ret == -1 && errno == EINTR);
 
-    return sendto(s->fd, (const void *)buf, size, 0,
-                  (struct sockaddr *)&s->dgram_dst, sizeof(s->dgram_dst));
+    if (ret == -1 && errno == EAGAIN) {
+        net_socket_write_poll(s, true);
+        return 0;
+    }
+    return ret;
 }
 
 static void net_socket_send(void *opaque)
@@ -81,7 +159,8 @@ static void net_socket_send(void *opaque)
     } else if (size == 0) {
         /* end of connection */
     eoc:
-        qemu_set_fd_handler(s->fd, NULL, NULL, NULL);
+        net_socket_read_poll(s, false);
+        net_socket_write_poll(s, false);
         if (s->listen_fd != -1) {
             qemu_set_fd_handler(s->listen_fd, net_socket_accept, NULL, s);
         }
@@ -152,7 +231,8 @@ static void net_socket_send_dgram(void *opaque)
         return;
     if (size == 0) {
         /* end of connection */
-        qemu_set_fd_handler(s->fd, NULL, NULL, NULL);
+        net_socket_read_poll(s, false);
+        net_socket_write_poll(s, false);
         return;
     }
     qemu_send_packet(&s->nc, s->buf, size);
@@ -243,7 +323,8 @@ static void net_socket_cleanup(NetClientState *nc)
 {
     NetSocketState *s = DO_UPCAST(NetSocketState, nc, nc);
     if (s->fd != -1) {
-        qemu_set_fd_handler(s->fd, NULL, NULL, NULL);
+        net_socket_read_poll(s, false);
+        net_socket_write_poll(s, false);
         close(s->fd);
         s->fd = -1;
     }
@@ -314,8 +395,8 @@ static NetSocketState *net_socket_fd_init_dgram(NetClientState *peer,
 
     s->fd = fd;
     s->listen_fd = -1;
-
-    qemu_set_fd_handler(s->fd, net_socket_send_dgram, NULL, s);
+    s->send_fn = net_socket_send_dgram;
+    net_socket_read_poll(s, true);
 
     /* mcast: save bound address as dst */
     if (is_connected) {
@@ -332,7 +413,8 @@ err:
 static void net_socket_connect(void *opaque)
 {
     NetSocketState *s = opaque;
-    qemu_set_fd_handler(s->fd, net_socket_send, NULL, s);
+    s->send_fn = net_socket_send;
+    net_socket_read_poll(s, true);
 }
 
 static NetClientInfo net_socket_info = {