summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--block/sheepdog.c2
-rw-r--r--block/ssh.c2
-rw-r--r--include/io/channel.h92
-rw-r--r--include/qemu/sockets.h12
-rw-r--r--io/channel-socket.c2
-rw-r--r--io/channel.c94
-rw-r--r--tests/io-channel-helpers.c102
-rw-r--r--tests/test-io-channel-tls.c6
-rw-r--r--util/qemu-sockets.c205
9 files changed, 225 insertions, 292 deletions
diff --git a/block/sheepdog.c b/block/sheepdog.c
index 1052098ec5..696a71442a 100644
--- a/block/sheepdog.c
+++ b/block/sheepdog.c
@@ -591,7 +591,7 @@ static int connect_to_sdog(BDRVSheepdogState *s, Error **errp)
 {
     int fd;
 
-    fd = socket_connect(s->addr, NULL, NULL, errp);
+    fd = socket_connect(s->addr, errp);
 
     if (s->addr->type == SOCKET_ADDRESS_TYPE_INET && fd >= 0) {
         int ret = socket_set_nodelay(fd);
diff --git a/block/ssh.c b/block/ssh.c
index e8f0404c03..b049a16eb9 100644
--- a/block/ssh.c
+++ b/block/ssh.c
@@ -678,7 +678,7 @@ static int connect_to_ssh(BDRVSSHState *s, QDict *options,
     }
 
     /* Open the socket and connect. */
-    s->sock = inet_connect_saddr(s->inet, NULL, NULL, errp);
+    s->sock = inet_connect_saddr(s->inet, errp);
     if (s->sock < 0) {
         ret = -EIO;
         goto err;
diff --git a/include/io/channel.h b/include/io/channel.h
index db9bb022a1..8f25893c45 100644
--- a/include/io/channel.h
+++ b/include/io/channel.h
@@ -269,6 +269,58 @@ ssize_t qio_channel_writev_full(QIOChannel *ioc,
                                 Error **errp);
 
 /**
+ * qio_channel_readv_all:
+ * @ioc: the channel object
+ * @iov: the array of memory regions to read data into
+ * @niov: the length of the @iov array
+ * @errp: pointer to a NULL-initialized error object
+ *
+ * Read data from the IO channel, storing it in the
+ * memory regions referenced by @iov. Each element
+ * in the @iov will be fully populated with data
+ * before the next one is used. The @niov parameter
+ * specifies the total number of elements in @iov.
+ *
+ * The function will wait for all requested data
+ * to be read, yielding from the current coroutine
+ * if required.
+ *
+ * If end-of-file occurs before all requested data
+ * has been read, an error will be reported.
+ *
+ * Returns: 0 if all bytes were read, or -1 on error
+ */
+int qio_channel_readv_all(QIOChannel *ioc,
+                          const struct iovec *iov,
+                          size_t niov,
+                          Error **errp);
+
+
+/**
+ * qio_channel_writev_all:
+ * @ioc: the channel object
+ * @iov: the array of memory regions to write data from
+ * @niov: the length of the @iov array
+ * @errp: pointer to a NULL-initialized error object
+ *
+ * Write data to the IO channel, reading it from the
+ * memory regions referenced by @iov. Each element
+ * in the @iov will be fully sent, before the next
+ * one is used. The @niov parameter specifies the
+ * total number of elements in @iov.
+ *
+ * The function will wait for all requested data
+ * to be written, yielding from the current coroutine
+ * if required.
+ *
+ * Returns: 0 if all bytes were written, or -1 on error
+ */
+int qio_channel_writev_all(QIOChannel *ioc,
+                           const struct iovec *iov,
+                           size_t niov,
+                           Error **erp);
+
+/**
  * qio_channel_readv:
  * @ioc: the channel object
  * @iov: the array of memory regions to read data into
@@ -299,7 +351,7 @@ ssize_t qio_channel_writev(QIOChannel *ioc,
                            Error **errp);
 
 /**
- * qio_channel_readv:
+ * qio_channel_read:
  * @ioc: the channel object
  * @buf: the memory region to read data into
  * @buflen: the length of @buf
@@ -331,6 +383,44 @@ ssize_t qio_channel_write(QIOChannel *ioc,
                           Error **errp);
 
 /**
+ * qio_channel_read_all:
+ * @ioc: the channel object
+ * @buf: the memory region to read data into
+ * @buflen: the number of bytes to @buf
+ * @errp: pointer to a NULL-initialized error object
+ *
+ * Reads @buflen bytes into @buf, possibly blocking or (if the
+ * channel is non-blocking) yielding from the current coroutine
+ * multiple times until the entire content is read. If end-of-file
+ * occurs it will return an error rather than a short-read. Otherwise
+ * behaves as qio_channel_read().
+ *
+ * Returns: 0 if all bytes were read, or -1 on error
+ */
+int qio_channel_read_all(QIOChannel *ioc,
+                         char *buf,
+                         size_t buflen,
+                         Error **errp);
+/**
+ * qio_channel_write_all:
+ * @ioc: the channel object
+ * @buf: the memory region to write data into
+ * @buflen: the number of bytes to @buf
+ * @errp: pointer to a NULL-initialized error object
+ *
+ * Writes @buflen bytes from @buf, possibly blocking or (if the
+ * channel is non-blocking) yielding from the current coroutine
+ * multiple times until the entire content is written.  Otherwise
+ * behaves as qio_channel_write().
+ *
+ * Returns: 0 if all bytes were written, or -1 on error
+ */
+int qio_channel_write_all(QIOChannel *ioc,
+                          const char *buf,
+                          size_t buflen,
+                          Error **errp);
+
+/**
  * qio_channel_set_blocking:
  * @ioc: the channel object
  * @enabled: the blocking flag state
diff --git a/include/qemu/sockets.h b/include/qemu/sockets.h
index ef6b5591f7..639cc079d9 100644
--- a/include/qemu/sockets.h
+++ b/include/qemu/sockets.h
@@ -27,18 +27,11 @@ int socket_set_fast_reuse(int fd);
 #define SHUT_RDWR 2
 #endif
 
-/* callback function for nonblocking connect
- * valid fd on success, negative error code on failure
- */
-typedef void NonBlockingConnectHandler(int fd, Error *err, void *opaque);
-
 int inet_ai_family_from_address(InetSocketAddress *addr,
                                 Error **errp);
 int inet_parse(InetSocketAddress *addr, const char *str, Error **errp);
 int inet_connect(const char *str, Error **errp);
-int inet_connect_saddr(InetSocketAddress *saddr,
-                       NonBlockingConnectHandler *callback, void *opaque,
-                       Error **errp);
+int inet_connect_saddr(InetSocketAddress *saddr, Error **errp);
 
 NetworkAddressFamily inet_netfamily(int family);
 
@@ -46,8 +39,7 @@ int unix_listen(const char *path, char *ostr, int olen, Error **errp);
 int unix_connect(const char *path, Error **errp);
 
 SocketAddress *socket_parse(const char *str, Error **errp);
-int socket_connect(SocketAddress *addr, NonBlockingConnectHandler *callback,
-                   void *opaque, Error **errp);
+int socket_connect(SocketAddress *addr, Error **errp);
 int socket_listen(SocketAddress *addr, Error **errp);
 void socket_listen_cleanup(int fd, Error **errp);
 int socket_dgram(SocketAddress *remote, SocketAddress *local, Error **errp);
diff --git a/io/channel-socket.c b/io/channel-socket.c
index 591d27e8c3..563e297357 100644
--- a/io/channel-socket.c
+++ b/io/channel-socket.c
@@ -140,7 +140,7 @@ int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
     int fd;
 
     trace_qio_channel_socket_connect_sync(ioc, addr);
-    fd = socket_connect(addr, NULL, NULL, errp);
+    fd = socket_connect(addr, errp);
     if (fd < 0) {
         trace_qio_channel_socket_connect_fail(ioc);
         return -1;
diff --git a/io/channel.c b/io/channel.c
index 1cfb8b33a2..5e8c2f0a91 100644
--- a/io/channel.c
+++ b/io/channel.c
@@ -22,6 +22,7 @@
 #include "io/channel.h"
 #include "qapi/error.h"
 #include "qemu/main-loop.h"
+#include "qemu/iov.h"
 
 bool qio_channel_has_feature(QIOChannel *ioc,
                              QIOChannelFeature feature)
@@ -85,6 +86,79 @@ ssize_t qio_channel_writev_full(QIOChannel *ioc,
 }
 
 
+
+int qio_channel_readv_all(QIOChannel *ioc,
+                          const struct iovec *iov,
+                          size_t niov,
+                          Error **errp)
+{
+    int ret = -1;
+    struct iovec *local_iov = g_new(struct iovec, niov);
+    struct iovec *local_iov_head = local_iov;
+    unsigned int nlocal_iov = niov;
+
+    nlocal_iov = iov_copy(local_iov, nlocal_iov,
+                          iov, niov,
+                          0, iov_size(iov, niov));
+
+    while (nlocal_iov > 0) {
+        ssize_t len;
+        len = qio_channel_readv(ioc, local_iov, nlocal_iov, errp);
+        if (len == QIO_CHANNEL_ERR_BLOCK) {
+            qio_channel_wait(ioc, G_IO_IN);
+            continue;
+        } else if (len < 0) {
+            goto cleanup;
+        } else if (len == 0) {
+            error_setg(errp,
+                       "Unexpected end-of-file before all bytes were read");
+            goto cleanup;
+        }
+
+        iov_discard_front(&local_iov, &nlocal_iov, len);
+    }
+
+    ret = 0;
+
+ cleanup:
+    g_free(local_iov_head);
+    return ret;
+}
+
+int qio_channel_writev_all(QIOChannel *ioc,
+                           const struct iovec *iov,
+                           size_t niov,
+                           Error **errp)
+{
+    int ret = -1;
+    struct iovec *local_iov = g_new(struct iovec, niov);
+    struct iovec *local_iov_head = local_iov;
+    unsigned int nlocal_iov = niov;
+
+    nlocal_iov = iov_copy(local_iov, nlocal_iov,
+                          iov, niov,
+                          0, iov_size(iov, niov));
+
+    while (nlocal_iov > 0) {
+        ssize_t len;
+        len = qio_channel_writev(ioc, local_iov, nlocal_iov, errp);
+        if (len == QIO_CHANNEL_ERR_BLOCK) {
+            qio_channel_wait(ioc, G_IO_OUT);
+            continue;
+        }
+        if (len < 0) {
+            goto cleanup;
+        }
+
+        iov_discard_front(&local_iov, &nlocal_iov, len);
+    }
+
+    ret = 0;
+ cleanup:
+    g_free(local_iov_head);
+    return ret;
+}
+
 ssize_t qio_channel_readv(QIOChannel *ioc,
                           const struct iovec *iov,
                           size_t niov,
@@ -123,6 +197,26 @@ ssize_t qio_channel_write(QIOChannel *ioc,
 }
 
 
+int qio_channel_read_all(QIOChannel *ioc,
+                         char *buf,
+                         size_t buflen,
+                         Error **errp)
+{
+    struct iovec iov = { .iov_base = buf, .iov_len = buflen };
+    return qio_channel_readv_all(ioc, &iov, 1, errp);
+}
+
+
+int qio_channel_write_all(QIOChannel *ioc,
+                          const char *buf,
+                          size_t buflen,
+                          Error **errp)
+{
+    struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
+    return qio_channel_writev_all(ioc, &iov, 1, errp);
+}
+
+
 int qio_channel_set_blocking(QIOChannel *ioc,
                               bool enabled,
                               Error **errp)
diff --git a/tests/io-channel-helpers.c b/tests/io-channel-helpers.c
index 05e5579cf8..5430e1389d 100644
--- a/tests/io-channel-helpers.c
+++ b/tests/io-channel-helpers.c
@@ -21,6 +21,7 @@
 #include "qemu/osdep.h"
 #include "io-channel-helpers.h"
 #include "qapi/error.h"
+#include "qemu/iov.h"
 
 struct QIOChannelTest {
     QIOChannel *src;
@@ -37,77 +38,17 @@ struct QIOChannelTest {
 };
 
 
-static void test_skip_iovec(struct iovec **iov,
-                            size_t *niov,
-                            size_t skip,
-                            struct iovec *old)
-{
-    size_t offset = 0;
-    size_t i;
-
-    for (i = 0; i < *niov; i++) {
-        if (skip < (*iov)[i].iov_len) {
-            old->iov_len = (*iov)[i].iov_len;
-            old->iov_base = (*iov)[i].iov_base;
-
-            (*iov)[i].iov_len -= skip;
-            (*iov)[i].iov_base += skip;
-            break;
-        } else {
-            skip -= (*iov)[i].iov_len;
-
-            if (i == 0 && old->iov_base) {
-                (*iov)[i].iov_len = old->iov_len;
-                (*iov)[i].iov_base = old->iov_base;
-                old->iov_len = 0;
-                old->iov_base = NULL;
-            }
-
-            offset++;
-        }
-    }
-
-    *iov = *iov + offset;
-    *niov -= offset;
-}
-
-
 /* This thread sends all data using iovecs */
 static gpointer test_io_thread_writer(gpointer opaque)
 {
     QIOChannelTest *data = opaque;
-    struct iovec *iov = data->inputv;
-    size_t niov = data->niov;
-    struct iovec old = { 0 };
 
     qio_channel_set_blocking(data->src, data->blocking, NULL);
 
-    while (niov) {
-        ssize_t ret;
-        ret = qio_channel_writev(data->src,
-                                 iov,
-                                 niov,
-                                 &data->writeerr);
-        if (ret == QIO_CHANNEL_ERR_BLOCK) {
-            if (data->blocking) {
-                error_setg(&data->writeerr,
-                           "Unexpected I/O blocking");
-                break;
-            } else {
-                qio_channel_wait(data->src,
-                                 G_IO_OUT);
-                continue;
-            }
-        } else if (ret < 0) {
-            break;
-        } else if (ret == 0) {
-            error_setg(&data->writeerr,
-                       "Unexpected zero length write");
-            break;
-        }
-
-        test_skip_iovec(&iov, &niov, ret, &old);
-    }
+    qio_channel_writev_all(data->src,
+                           data->inputv,
+                           data->niov,
+                           &data->writeerr);
 
     return NULL;
 }
@@ -117,38 +58,13 @@ static gpointer test_io_thread_writer(gpointer opaque)
 static gpointer test_io_thread_reader(gpointer opaque)
 {
     QIOChannelTest *data = opaque;
-    struct iovec *iov = data->outputv;
-    size_t niov = data->niov;
-    struct iovec old = { 0 };
 
     qio_channel_set_blocking(data->dst, data->blocking, NULL);
 
-    while (niov) {
-        ssize_t ret;
-
-        ret = qio_channel_readv(data->dst,
-                                iov,
-                                niov,
-                                &data->readerr);
-
-        if (ret == QIO_CHANNEL_ERR_BLOCK) {
-            if (data->blocking) {
-                error_setg(&data->readerr,
-                           "Unexpected I/O blocking");
-                break;
-            } else {
-                qio_channel_wait(data->dst,
-                                 G_IO_IN);
-                continue;
-            }
-        } else if (ret < 0) {
-            break;
-        } else if (ret == 0) {
-            break;
-        }
-
-        test_skip_iovec(&iov, &niov, ret, &old);
-    }
+    qio_channel_readv_all(data->dst,
+                          data->outputv,
+                          data->niov,
+                          &data->readerr);
 
     return NULL;
 }
diff --git a/tests/test-io-channel-tls.c b/tests/test-io-channel-tls.c
index 8eaa208e1b..a210d01ba5 100644
--- a/tests/test-io-channel-tls.c
+++ b/tests/test-io-channel-tls.c
@@ -127,8 +127,8 @@ static void test_io_channel_tls(const void *opaque)
     /* We'll use this for our fake client-server connection */
     g_assert(socketpair(AF_UNIX, SOCK_STREAM, 0, channel) == 0);
 
-#define CLIENT_CERT_DIR "tests/test-crypto-tlssession-client/"
-#define SERVER_CERT_DIR "tests/test-crypto-tlssession-server/"
+#define CLIENT_CERT_DIR "tests/test-io-channel-tls-client/"
+#define SERVER_CERT_DIR "tests/test-io-channel-tls-server/"
     mkdir(CLIENT_CERT_DIR, 0700);
     mkdir(SERVER_CERT_DIR, 0700);
 
@@ -218,7 +218,7 @@ static void test_io_channel_tls(const void *opaque)
     mainloop = g_main_context_default();
     do {
         g_main_context_iteration(mainloop, TRUE);
-    } while (!clientHandshake.finished &&
+    } while (!clientHandshake.finished ||
              !serverHandshake.finished);
 
     g_assert(clientHandshake.failed == data->expectClientFail);
diff --git a/util/qemu-sockets.c b/util/qemu-sockets.c
index 1358c81bcc..d149383bb9 100644
--- a/util/qemu-sockets.c
+++ b/util/qemu-sockets.c
@@ -300,88 +300,19 @@ listen:
     ((rc) == -EINPROGRESS)
 #endif
 
-/* Struct to store connect state for non blocking connect */
-typedef struct ConnectState {
-    int fd;
-    struct addrinfo *addr_list;
-    struct addrinfo *current_addr;
-    NonBlockingConnectHandler *callback;
-    void *opaque;
-} ConnectState;
-
-static int inet_connect_addr(struct addrinfo *addr, bool *in_progress,
-                             ConnectState *connect_state, Error **errp);
+static int inet_connect_addr(struct addrinfo *addr, Error **errp);
 
-static void wait_for_connect(void *opaque)
-{
-    ConnectState *s = opaque;
-    int val = 0, rc = 0;
-    socklen_t valsize = sizeof(val);
-    bool in_progress;
-    Error *err = NULL;
-
-    qemu_set_fd_handler(s->fd, NULL, NULL, NULL);
-
-    do {
-        rc = qemu_getsockopt(s->fd, SOL_SOCKET, SO_ERROR, &val, &valsize);
-    } while (rc == -1 && errno == EINTR);
-
-    /* update rc to contain error */
-    if (!rc && val) {
-        rc = -1;
-        errno = val;
-    }
-
-    /* connect error */
-    if (rc < 0) {
-        error_setg_errno(&err, errno, "Error connecting to socket");
-        closesocket(s->fd);
-        s->fd = rc;
-    }
-
-    /* try to connect to the next address on the list */
-    if (s->current_addr) {
-        while (s->current_addr->ai_next != NULL && s->fd < 0) {
-            s->current_addr = s->current_addr->ai_next;
-            s->fd = inet_connect_addr(s->current_addr, &in_progress, s, NULL);
-            if (s->fd < 0) {
-                error_free(err);
-                err = NULL;
-                error_setg_errno(&err, errno, "Unable to start socket connect");
-            }
-            /* connect in progress */
-            if (in_progress) {
-                goto out;
-            }
-        }
-
-        freeaddrinfo(s->addr_list);
-    }
-
-    if (s->callback) {
-        s->callback(s->fd, err, s->opaque);
-    }
-    g_free(s);
-out:
-    error_free(err);
-}
-
-static int inet_connect_addr(struct addrinfo *addr, bool *in_progress,
-                             ConnectState *connect_state, Error **errp)
+static int inet_connect_addr(struct addrinfo *addr, Error **errp)
 {
     int sock, rc;
 
-    *in_progress = false;
-
     sock = qemu_socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
     if (sock < 0) {
         error_setg_errno(errp, errno, "Failed to create socket");
         return -1;
     }
     socket_set_fast_reuse(sock);
-    if (connect_state != NULL) {
-        qemu_set_nonblock(sock);
-    }
+
     /* connect to peer */
     do {
         rc = 0;
@@ -390,15 +321,12 @@ static int inet_connect_addr(struct addrinfo *addr, bool *in_progress,
         }
     } while (rc == -EINTR);
 
-    if (connect_state != NULL && QEMU_SOCKET_RC_INPROGRESS(rc)) {
-        connect_state->fd = sock;
-        qemu_set_fd_handler(sock, NULL, wait_for_connect, connect_state);
-        *in_progress = true;
-    } else if (rc < 0) {
+    if (rc < 0) {
         error_setg_errno(errp, errno, "Failed to connect socket");
         closesocket(sock);
         return -1;
     }
+
     return sock;
 }
 
@@ -456,44 +384,24 @@ static struct addrinfo *inet_parse_connect_saddr(InetSocketAddress *saddr,
  *
  * @saddr: Inet socket address specification
  * @errp: set on error
- * @callback: callback function for non-blocking connect
- * @opaque: opaque for callback function
  *
  * Returns: -1 on error, file descriptor on success.
- *
- * If @callback is non-null, the connect is non-blocking.  If this
- * function succeeds, callback will be called when the connection
- * completes, with the file descriptor on success, or -1 on error.
  */
-int inet_connect_saddr(InetSocketAddress *saddr,
-                       NonBlockingConnectHandler *callback, void *opaque,
-                       Error **errp)
+int inet_connect_saddr(InetSocketAddress *saddr, Error **errp)
 {
     Error *local_err = NULL;
     struct addrinfo *res, *e;
     int sock = -1;
-    bool in_progress;
-    ConnectState *connect_state = NULL;
 
     res = inet_parse_connect_saddr(saddr, errp);
     if (!res) {
         return -1;
     }
 
-    if (callback != NULL) {
-        connect_state = g_malloc0(sizeof(*connect_state));
-        connect_state->addr_list = res;
-        connect_state->callback = callback;
-        connect_state->opaque = opaque;
-    }
-
     for (e = res; e != NULL; e = e->ai_next) {
         error_free(local_err);
         local_err = NULL;
-        if (connect_state != NULL) {
-            connect_state->current_addr = e;
-        }
-        sock = inet_connect_addr(e, &in_progress, connect_state, &local_err);
+        sock = inet_connect_addr(e, &local_err);
         if (sock >= 0) {
             break;
         }
@@ -501,15 +409,8 @@ int inet_connect_saddr(InetSocketAddress *saddr,
 
     if (sock < 0) {
         error_propagate(errp, local_err);
-    } else if (in_progress) {
-        /* wait_for_connect() will do the rest */
-        return sock;
-    } else {
-        if (callback) {
-            callback(sock, NULL, opaque);
-        }
     }
-    g_free(connect_state);
+
     freeaddrinfo(res);
     return sock;
 }
@@ -688,7 +589,7 @@ int inet_connect(const char *str, Error **errp)
     InetSocketAddress *addr = g_new(InetSocketAddress, 1);
 
     if (!inet_parse(addr, str, errp)) {
-        sock = inet_connect_saddr(addr, NULL, NULL, errp);
+        sock = inet_connect_saddr(addr, errp);
     }
     qapi_free_InetSocketAddress(addr);
     return sock;
@@ -721,21 +622,16 @@ static bool vsock_parse_vaddr_to_sockaddr(const VsockSocketAddress *vaddr,
     return true;
 }
 
-static int vsock_connect_addr(const struct sockaddr_vm *svm, bool *in_progress,
-                              ConnectState *connect_state, Error **errp)
+static int vsock_connect_addr(const struct sockaddr_vm *svm, Error **errp)
 {
     int sock, rc;
 
-    *in_progress = false;
-
     sock = qemu_socket(AF_VSOCK, SOCK_STREAM, 0);
     if (sock < 0) {
         error_setg_errno(errp, errno, "Failed to create socket");
         return -1;
     }
-    if (connect_state != NULL) {
-        qemu_set_nonblock(sock);
-    }
+
     /* connect to peer */
     do {
         rc = 0;
@@ -744,50 +640,26 @@ static int vsock_connect_addr(const struct sockaddr_vm *svm, bool *in_progress,
         }
     } while (rc == -EINTR);
 
-    if (connect_state != NULL && QEMU_SOCKET_RC_INPROGRESS(rc)) {
-        connect_state->fd = sock;
-        qemu_set_fd_handler(sock, NULL, wait_for_connect, connect_state);
-        *in_progress = true;
-    } else if (rc < 0) {
+    if (rc < 0) {
         error_setg_errno(errp, errno, "Failed to connect socket");
         closesocket(sock);
         return -1;
     }
+
     return sock;
 }
 
-static int vsock_connect_saddr(VsockSocketAddress *vaddr,
-                               NonBlockingConnectHandler *callback,
-                               void *opaque,
-                               Error **errp)
+static int vsock_connect_saddr(VsockSocketAddress *vaddr, Error **errp)
 {
     struct sockaddr_vm svm;
     int sock = -1;
-    bool in_progress;
-    ConnectState *connect_state = NULL;
 
     if (!vsock_parse_vaddr_to_sockaddr(vaddr, &svm, errp)) {
         return -1;
     }
 
-    if (callback != NULL) {
-        connect_state = g_malloc0(sizeof(*connect_state));
-        connect_state->callback = callback;
-        connect_state->opaque = opaque;
-    }
+    sock = vsock_connect_addr(&svm, errp);
 
-    sock = vsock_connect_addr(&svm, &in_progress, connect_state, errp);
-    if (sock < 0) {
-        /* do nothing */
-    } else if (in_progress) {
-        /* wait_for_connect() will do the rest */
-        return sock;
-    } else {
-        if (callback) {
-            callback(sock, NULL, opaque);
-        }
-    }
-    g_free(connect_state);
     return sock;
 }
 
@@ -847,9 +719,7 @@ static void vsock_unsupported(Error **errp)
     error_setg(errp, "socket family AF_VSOCK unsupported");
 }
 
-static int vsock_connect_saddr(VsockSocketAddress *vaddr,
-                               NonBlockingConnectHandler *callback,
-                               void *opaque, Error **errp)
+static int vsock_connect_saddr(VsockSocketAddress *vaddr, Error **errp)
 {
     vsock_unsupported(errp);
     return -1;
@@ -952,12 +822,9 @@ err:
     return -1;
 }
 
-static int unix_connect_saddr(UnixSocketAddress *saddr,
-                              NonBlockingConnectHandler *callback, void *opaque,
-                              Error **errp)
+static int unix_connect_saddr(UnixSocketAddress *saddr, Error **errp)
 {
     struct sockaddr_un un;
-    ConnectState *connect_state = NULL;
     int sock, rc;
 
     if (saddr->path == NULL) {
@@ -970,12 +837,6 @@ static int unix_connect_saddr(UnixSocketAddress *saddr,
         error_setg_errno(errp, errno, "Failed to create socket");
         return -1;
     }
-    if (callback != NULL) {
-        connect_state = g_malloc0(sizeof(*connect_state));
-        connect_state->callback = callback;
-        connect_state->opaque = opaque;
-        qemu_set_nonblock(sock);
-    }
 
     if (strlen(saddr->path) > sizeof(un.sun_path)) {
         error_setg(errp, "UNIX socket path '%s' is too long", saddr->path);
@@ -996,29 +857,16 @@ static int unix_connect_saddr(UnixSocketAddress *saddr,
         }
     } while (rc == -EINTR);
 
-    if (connect_state != NULL && QEMU_SOCKET_RC_INPROGRESS(rc)) {
-        connect_state->fd = sock;
-        qemu_set_fd_handler(sock, NULL, wait_for_connect, connect_state);
-        return sock;
-    } else if (rc >= 0) {
-        /* non blocking socket immediate success, call callback */
-        if (callback != NULL) {
-            callback(sock, NULL, opaque);
-        }
-    }
-
     if (rc < 0) {
         error_setg_errno(errp, -rc, "Failed to connect socket %s",
                          saddr->path);
         goto err;
     }
 
-    g_free(connect_state);
     return sock;
 
  err:
     close(sock);
-    g_free(connect_state);
     return -1;
 }
 
@@ -1033,9 +881,7 @@ static int unix_listen_saddr(UnixSocketAddress *saddr,
     return -1;
 }
 
-static int unix_connect_saddr(UnixSocketAddress *saddr,
-                              NonBlockingConnectHandler *callback, void *opaque,
-                              Error **errp)
+static int unix_connect_saddr(UnixSocketAddress *saddr, Error **errp)
 {
     error_setg(errp, "unix sockets are not available on windows");
     errno = ENOTSUP;
@@ -1081,7 +927,7 @@ int unix_connect(const char *path, Error **errp)
 
     saddr = g_new0(UnixSocketAddress, 1);
     saddr->path = g_strdup(path);
-    sock = unix_connect_saddr(saddr, NULL, NULL, errp);
+    sock = unix_connect_saddr(saddr, errp);
     qapi_free_UnixSocketAddress(saddr);
     return sock;
 }
@@ -1126,30 +972,25 @@ fail:
     return NULL;
 }
 
-int socket_connect(SocketAddress *addr, NonBlockingConnectHandler *callback,
-                   void *opaque, Error **errp)
+int socket_connect(SocketAddress *addr, Error **errp)
 {
     int fd;
 
     switch (addr->type) {
     case SOCKET_ADDRESS_TYPE_INET:
-        fd = inet_connect_saddr(&addr->u.inet, callback, opaque, errp);
+        fd = inet_connect_saddr(&addr->u.inet, errp);
         break;
 
     case SOCKET_ADDRESS_TYPE_UNIX:
-        fd = unix_connect_saddr(&addr->u.q_unix, callback, opaque, errp);
+        fd = unix_connect_saddr(&addr->u.q_unix, errp);
         break;
 
     case SOCKET_ADDRESS_TYPE_FD:
         fd = monitor_get_fd(cur_mon, addr->u.fd.str, errp);
-        if (fd >= 0 && callback) {
-            qemu_set_nonblock(fd);
-            callback(fd, NULL, opaque);
-        }
         break;
 
     case SOCKET_ADDRESS_TYPE_VSOCK:
-        fd = vsock_connect_saddr(&addr->u.vsock, callback, opaque, errp);
+        fd = vsock_connect_saddr(&addr->u.vsock, errp);
         break;
 
     default: