summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--tools/virtiofsd/fuse_virtio.c359
1 files changed, 201 insertions, 158 deletions
diff --git a/tools/virtiofsd/fuse_virtio.c b/tools/virtiofsd/fuse_virtio.c
index f6242f9338..0dcf2ef57a 100644
--- a/tools/virtiofsd/fuse_virtio.c
+++ b/tools/virtiofsd/fuse_virtio.c
@@ -22,6 +22,7 @@
 
 #include <assert.h>
 #include <errno.h>
+#include <glib.h>
 #include <stdint.h>
 #include <stdio.h>
 #include <stdlib.h>
@@ -37,17 +38,28 @@
 struct fv_VuDev;
 struct fv_QueueInfo {
     pthread_t thread;
+    /*
+     * This lock protects the VuVirtq preventing races between
+     * fv_queue_thread() and fv_queue_worker().
+     */
+    pthread_mutex_t vq_lock;
+
     struct fv_VuDev *virtio_dev;
 
     /* Our queue index, corresponds to array position */
     int qidx;
     int kick_fd;
     int kill_fd; /* For killing the thread */
+};
 
-    /* The element for the command currently being processed */
-    VuVirtqElement *qe;
+/* A FUSE request */
+typedef struct {
+    VuVirtqElement elem;
+    struct fuse_chan ch;
+
+    /* Used to complete requests that involve no reply */
     bool reply_sent;
-};
+} FVRequest;
 
 /*
  * We pass the dev element into libvhost-user
@@ -191,8 +203,11 @@ static void copy_iov(struct iovec *src_iov, int src_count,
 int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
                     struct iovec *iov, int count)
 {
-    VuVirtqElement *elem;
-    VuVirtq *q;
+    FVRequest *req = container_of(ch, FVRequest, ch);
+    struct fv_QueueInfo *qi = ch->qi;
+    VuDev *dev = &se->virtio_dev->dev;
+    VuVirtq *q = vu_get_queue(dev, qi->qidx);
+    VuVirtqElement *elem = &req->elem;
     int ret = 0;
 
     assert(count >= 1);
@@ -205,11 +220,7 @@ int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
 
     /* unique == 0 is notification, which we don't support */
     assert(out->unique);
-    /* For virtio we always have ch */
-    assert(ch);
-    assert(!ch->qi->reply_sent);
-    elem = ch->qi->qe;
-    q = &ch->qi->virtio_dev->dev.vq[ch->qi->qidx];
+    assert(!req->reply_sent);
 
     /* The 'in' part of the elem is to qemu */
     unsigned int in_num = elem->in_num;
@@ -236,9 +247,15 @@ int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
     }
 
     copy_iov(iov, count, in_sg, in_num, tosend_len);
-    vu_queue_push(&se->virtio_dev->dev, q, elem, tosend_len);
-    vu_queue_notify(&se->virtio_dev->dev, q);
-    ch->qi->reply_sent = true;
+
+    pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
+    pthread_mutex_lock(&qi->vq_lock);
+    vu_queue_push(dev, q, elem, tosend_len);
+    vu_queue_notify(dev, q);
+    pthread_mutex_unlock(&qi->vq_lock);
+    pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
+
+    req->reply_sent = true;
 
 err:
     return ret;
@@ -254,9 +271,12 @@ int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
                          struct iovec *iov, int count, struct fuse_bufvec *buf,
                          size_t len)
 {
+    FVRequest *req = container_of(ch, FVRequest, ch);
+    struct fv_QueueInfo *qi = ch->qi;
+    VuDev *dev = &se->virtio_dev->dev;
+    VuVirtq *q = vu_get_queue(dev, qi->qidx);
+    VuVirtqElement *elem = &req->elem;
     int ret = 0;
-    VuVirtqElement *elem;
-    VuVirtq *q;
 
     assert(count >= 1);
     assert(iov[0].iov_len >= sizeof(struct fuse_out_header));
@@ -275,11 +295,7 @@ int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
     /* unique == 0 is notification which we don't support */
     assert(out->unique);
 
-    /* For virtio we always have ch */
-    assert(ch);
-    assert(!ch->qi->reply_sent);
-    elem = ch->qi->qe;
-    q = &ch->qi->virtio_dev->dev.vq[ch->qi->qidx];
+    assert(!req->reply_sent);
 
     /* The 'in' part of the elem is to qemu */
     unsigned int in_num = elem->in_num;
@@ -395,33 +411,175 @@ int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
 
     ret = 0;
 
-    vu_queue_push(&se->virtio_dev->dev, q, elem, tosend_len);
-    vu_queue_notify(&se->virtio_dev->dev, q);
+    pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
+    pthread_mutex_lock(&qi->vq_lock);
+    vu_queue_push(dev, q, elem, tosend_len);
+    vu_queue_notify(dev, q);
+    pthread_mutex_unlock(&qi->vq_lock);
+    pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
 
 err:
     if (ret == 0) {
-        ch->qi->reply_sent = true;
+        req->reply_sent = true;
     }
 
     return ret;
 }
 
+/* Process one FVRequest in a thread pool */
+static void fv_queue_worker(gpointer data, gpointer user_data)
+{
+    struct fv_QueueInfo *qi = user_data;
+    struct fuse_session *se = qi->virtio_dev->se;
+    struct VuDev *dev = &qi->virtio_dev->dev;
+    FVRequest *req = data;
+    VuVirtqElement *elem = &req->elem;
+    struct fuse_buf fbuf = {};
+    bool allocated_bufv = false;
+    struct fuse_bufvec bufv;
+    struct fuse_bufvec *pbufv;
+
+    assert(se->bufsize > sizeof(struct fuse_in_header));
+
+    /*
+     * An element contains one request and the space to send our response
+     * They're spread over multiple descriptors in a scatter/gather set
+     * and we can't trust the guest to keep them still; so copy in/out.
+     */
+    fbuf.mem = malloc(se->bufsize);
+    assert(fbuf.mem);
+
+    fuse_mutex_init(&req->ch.lock);
+    req->ch.fd = -1;
+    req->ch.qi = qi;
+
+    /* The 'out' part of the elem is from qemu */
+    unsigned int out_num = elem->out_num;
+    struct iovec *out_sg = elem->out_sg;
+    size_t out_len = iov_size(out_sg, out_num);
+    fuse_log(FUSE_LOG_DEBUG,
+             "%s: elem %d: with %d out desc of length %zd\n",
+             __func__, elem->index, out_num, out_len);
+
+    /*
+     * The elem should contain a 'fuse_in_header' (in to fuse)
+     * plus the data based on the len in the header.
+     */
+    if (out_len < sizeof(struct fuse_in_header)) {
+        fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for in_header\n",
+                 __func__, elem->index);
+        assert(0); /* TODO */
+    }
+    if (out_len > se->bufsize) {
+        fuse_log(FUSE_LOG_ERR, "%s: elem %d too large for buffer\n", __func__,
+                 elem->index);
+        assert(0); /* TODO */
+    }
+    /* Copy just the first element and look at it */
+    copy_from_iov(&fbuf, 1, out_sg);
+
+    pbufv = NULL; /* Compiler thinks an unitialised path */
+    if (out_num > 2 &&
+        out_sg[0].iov_len == sizeof(struct fuse_in_header) &&
+        ((struct fuse_in_header *)fbuf.mem)->opcode == FUSE_WRITE &&
+        out_sg[1].iov_len == sizeof(struct fuse_write_in)) {
+        /*
+         * For a write we don't actually need to copy the
+         * data, we can just do it straight out of guest memory
+         * but we must still copy the headers in case the guest
+         * was nasty and changed them while we were using them.
+         */
+        fuse_log(FUSE_LOG_DEBUG, "%s: Write special case\n", __func__);
+
+        /* copy the fuse_write_in header afte rthe fuse_in_header */
+        fbuf.mem += out_sg->iov_len;
+        copy_from_iov(&fbuf, 1, out_sg + 1);
+        fbuf.mem -= out_sg->iov_len;
+        fbuf.size = out_sg[0].iov_len + out_sg[1].iov_len;
+
+        /* Allocate the bufv, with space for the rest of the iov */
+        pbufv = malloc(sizeof(struct fuse_bufvec) +
+                       sizeof(struct fuse_buf) * (out_num - 2));
+        if (!pbufv) {
+            fuse_log(FUSE_LOG_ERR, "%s: pbufv malloc failed\n",
+                    __func__);
+            goto out;
+        }
+
+        allocated_bufv = true;
+        pbufv->count = 1;
+        pbufv->buf[0] = fbuf;
+
+        size_t iovindex, pbufvindex;
+        iovindex = 2; /* 2 headers, separate iovs */
+        pbufvindex = 1; /* 2 headers, 1 fusebuf */
+
+        for (; iovindex < out_num; iovindex++, pbufvindex++) {
+            pbufv->count++;
+            pbufv->buf[pbufvindex].pos = ~0; /* Dummy */
+            pbufv->buf[pbufvindex].flags = 0;
+            pbufv->buf[pbufvindex].mem = out_sg[iovindex].iov_base;
+            pbufv->buf[pbufvindex].size = out_sg[iovindex].iov_len;
+        }
+    } else {
+        /* Normal (non fast write) path */
+
+        /* Copy the rest of the buffer */
+        fbuf.mem += out_sg->iov_len;
+        copy_from_iov(&fbuf, out_num - 1, out_sg + 1);
+        fbuf.mem -= out_sg->iov_len;
+        fbuf.size = out_len;
+
+        /* TODO! Endianness of header */
+
+        /* TODO: Add checks for fuse_session_exited */
+        bufv.buf[0] = fbuf;
+        bufv.count = 1;
+        pbufv = &bufv;
+    }
+    pbufv->idx = 0;
+    pbufv->off = 0;
+    fuse_session_process_buf_int(se, pbufv, &req->ch);
+
+out:
+    if (allocated_bufv) {
+        free(pbufv);
+    }
+
+    /* If the request has no reply, still recycle the virtqueue element */
+    if (!req->reply_sent) {
+        struct VuVirtq *q = vu_get_queue(dev, qi->qidx);
+
+        fuse_log(FUSE_LOG_DEBUG, "%s: elem %d no reply sent\n", __func__,
+                 elem->index);
+
+        pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
+        pthread_mutex_lock(&qi->vq_lock);
+        vu_queue_push(dev, q, elem, 0);
+        vu_queue_notify(dev, q);
+        pthread_mutex_unlock(&qi->vq_lock);
+        pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
+    }
+
+    pthread_mutex_destroy(&req->ch.lock);
+    free(fbuf.mem);
+    free(req);
+}
+
 /* Thread function for individual queues, created when a queue is 'started' */
 static void *fv_queue_thread(void *opaque)
 {
     struct fv_QueueInfo *qi = opaque;
     struct VuDev *dev = &qi->virtio_dev->dev;
     struct VuVirtq *q = vu_get_queue(dev, qi->qidx);
-    struct fuse_session *se = qi->virtio_dev->se;
-    struct fuse_chan ch;
-    struct fuse_buf fbuf;
+    GThreadPool *pool;
 
-    fbuf.mem = NULL;
-    fbuf.flags = 0;
-
-    fuse_mutex_init(&ch.lock);
-    ch.fd = (int)0xdaff0d111;
-    ch.qi = qi;
+    pool = g_thread_pool_new(fv_queue_worker, qi, 1 /* TODO max_threads */,
+                             TRUE, NULL);
+    if (!pool) {
+        fuse_log(FUSE_LOG_ERR, "%s: g_thread_pool_new failed\n", __func__);
+        return NULL;
+    }
 
     fuse_log(FUSE_LOG_INFO, "%s: Start for queue %d kick_fd %d\n", __func__,
              qi->qidx, qi->kick_fd);
@@ -478,6 +636,7 @@ static void *fv_queue_thread(void *opaque)
         /* Mutual exclusion with virtio_loop() */
         ret = pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
         assert(ret == 0); /* there is no possible error case */
+        pthread_mutex_lock(&qi->vq_lock);
         /* out is from guest, in is too guest */
         unsigned int in_bytes, out_bytes;
         vu_queue_get_avail_bytes(dev, q, &in_bytes, &out_bytes, ~0, ~0);
@@ -486,141 +645,22 @@ static void *fv_queue_thread(void *opaque)
                  "%s: Queue %d gave evalue: %zx available: in: %u out: %u\n",
                  __func__, qi->qidx, (size_t)evalue, in_bytes, out_bytes);
 
-
         while (1) {
-            bool allocated_bufv = false;
-            struct fuse_bufvec bufv;
-            struct fuse_bufvec *pbufv;
-
-            /*
-             * An element contains one request and the space to send our
-             * response They're spread over multiple descriptors in a
-             * scatter/gather set and we can't trust the guest to keep them
-             * still; so copy in/out.
-             */
-            VuVirtqElement *elem = vu_queue_pop(dev, q, sizeof(VuVirtqElement));
-            if (!elem) {
+            FVRequest *req = vu_queue_pop(dev, q, sizeof(FVRequest));
+            if (!req) {
                 break;
             }
 
-            qi->qe = elem;
-            qi->reply_sent = false;
+            req->reply_sent = false;
 
-            if (!fbuf.mem) {
-                fbuf.mem = malloc(se->bufsize);
-                assert(fbuf.mem);
-                assert(se->bufsize > sizeof(struct fuse_in_header));
-            }
-            /* The 'out' part of the elem is from qemu */
-            unsigned int out_num = elem->out_num;
-            struct iovec *out_sg = elem->out_sg;
-            size_t out_len = iov_size(out_sg, out_num);
-            fuse_log(FUSE_LOG_DEBUG,
-                     "%s: elem %d: with %d out desc of length %zd\n", __func__,
-                     elem->index, out_num, out_len);
-
-            /*
-             * The elem should contain a 'fuse_in_header' (in to fuse)
-             * plus the data based on the len in the header.
-             */
-            if (out_len < sizeof(struct fuse_in_header)) {
-                fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for in_header\n",
-                         __func__, elem->index);
-                assert(0); /* TODO */
-            }
-            if (out_len > se->bufsize) {
-                fuse_log(FUSE_LOG_ERR, "%s: elem %d too large for buffer\n",
-                         __func__, elem->index);
-                assert(0); /* TODO */
-            }
-            /* Copy just the first element and look at it */
-            copy_from_iov(&fbuf, 1, out_sg);
-
-            if (out_num > 2 &&
-                out_sg[0].iov_len == sizeof(struct fuse_in_header) &&
-                ((struct fuse_in_header *)fbuf.mem)->opcode == FUSE_WRITE &&
-                out_sg[1].iov_len == sizeof(struct fuse_write_in)) {
-                /*
-                 * For a write we don't actually need to copy the
-                 * data, we can just do it straight out of guest memory
-                 * but we must still copy the headers in case the guest
-                 * was nasty and changed them while we were using them.
-                 */
-                fuse_log(FUSE_LOG_DEBUG, "%s: Write special case\n", __func__);
-
-                /* copy the fuse_write_in header after the fuse_in_header */
-                fbuf.mem += out_sg->iov_len;
-                copy_from_iov(&fbuf, 1, out_sg + 1);
-                fbuf.mem -= out_sg->iov_len;
-                fbuf.size = out_sg[0].iov_len + out_sg[1].iov_len;
-
-                /* Allocate the bufv, with space for the rest of the iov */
-                allocated_bufv = true;
-                pbufv = malloc(sizeof(struct fuse_bufvec) +
-                               sizeof(struct fuse_buf) * (out_num - 2));
-                if (!pbufv) {
-                    vu_queue_unpop(dev, q, elem, 0);
-                    free(elem);
-                    fuse_log(FUSE_LOG_ERR, "%s: pbufv malloc failed\n",
-                             __func__);
-                    goto out;
-                }
-
-                pbufv->count = 1;
-                pbufv->buf[0] = fbuf;
-
-                size_t iovindex, pbufvindex;
-                iovindex = 2; /* 2 headers, separate iovs */
-                pbufvindex = 1; /* 2 headers, 1 fusebuf */
-
-                for (; iovindex < out_num; iovindex++, pbufvindex++) {
-                    pbufv->count++;
-                    pbufv->buf[pbufvindex].pos = ~0; /* Dummy */
-                    pbufv->buf[pbufvindex].flags = 0;
-                    pbufv->buf[pbufvindex].mem = out_sg[iovindex].iov_base;
-                    pbufv->buf[pbufvindex].size = out_sg[iovindex].iov_len;
-                }
-            } else {
-                /* Normal (non fast write) path */
-
-                /* Copy the rest of the buffer */
-                fbuf.mem += out_sg->iov_len;
-                copy_from_iov(&fbuf, out_num - 1, out_sg + 1);
-                fbuf.mem -= out_sg->iov_len;
-                fbuf.size = out_len;
-
-                /* TODO! Endianness of header */
-
-                /* TODO: Add checks for fuse_session_exited */
-                bufv.buf[0] = fbuf;
-                bufv.count = 1;
-                pbufv = &bufv;
-            }
-            pbufv->idx = 0;
-            pbufv->off = 0;
-            fuse_session_process_buf_int(se, pbufv, &ch);
-
-            if (allocated_bufv) {
-                free(pbufv);
-            }
-
-            if (!qi->reply_sent) {
-                fuse_log(FUSE_LOG_DEBUG, "%s: elem %d no reply sent\n",
-                         __func__, elem->index);
-                /* I think we've still got to recycle the element */
-                vu_queue_push(dev, q, elem, 0);
-                vu_queue_notify(dev, q);
-            }
-            qi->qe = NULL;
-            free(elem);
-            elem = NULL;
+            g_thread_pool_push(pool, req, NULL);
         }
 
+        pthread_mutex_unlock(&qi->vq_lock);
         pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
     }
-out:
-    pthread_mutex_destroy(&ch.lock);
-    free(fbuf.mem);
+
+    g_thread_pool_free(pool, FALSE, TRUE);
 
     return NULL;
 }
@@ -643,6 +683,7 @@ static void fv_queue_cleanup_thread(struct fv_VuDev *vud, int qidx)
         fuse_log(FUSE_LOG_ERR, "%s: Failed to join thread idx %d err %d\n",
                  __func__, qidx, ret);
     }
+    pthread_mutex_destroy(&ourqi->vq_lock);
     close(ourqi->kill_fd);
     ourqi->kick_fd = -1;
     free(vud->qi[qidx]);
@@ -696,6 +737,8 @@ static void fv_queue_set_started(VuDev *dev, int qidx, bool started)
 
         ourqi->kill_fd = eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
         assert(ourqi->kill_fd != -1);
+        pthread_mutex_init(&ourqi->vq_lock, NULL);
+
         if (pthread_create(&ourqi->thread, NULL, fv_queue_thread, ourqi)) {
             fuse_log(FUSE_LOG_ERR, "%s: Failed to create thread for queue %d\n",
                      __func__, qidx);