summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--event-loop-base.c23
-rw-r--r--include/block/aio.h10
-rw-r--r--include/block/thread-pool.h3
-rw-r--r--include/sysemu/event-loop-base.h4
-rw-r--r--iothread.c3
-rw-r--r--qapi/qom.json10
-rw-r--r--util/aio-posix.c1
-rw-r--r--util/async.c20
-rw-r--r--util/main-loop.c9
-rw-r--r--util/thread-pool.c55
10 files changed, 133 insertions, 5 deletions
diff --git a/event-loop-base.c b/event-loop-base.c
index e7f99a6ec8..d5be4dc6fc 100644
--- a/event-loop-base.c
+++ b/event-loop-base.c
@@ -14,6 +14,7 @@
 #include "qemu/osdep.h"
 #include "qom/object_interfaces.h"
 #include "qapi/error.h"
+#include "block/thread-pool.h"
 #include "sysemu/event-loop-base.h"
 
 typedef struct {
@@ -21,9 +22,22 @@ typedef struct {
     ptrdiff_t offset; /* field's byte offset in EventLoopBase struct */
 } EventLoopBaseParamInfo;
 
+static void event_loop_base_instance_init(Object *obj)
+{
+    EventLoopBase *base = EVENT_LOOP_BASE(obj);
+
+    base->thread_pool_max = THREAD_POOL_MAX_THREADS_DEFAULT;
+}
+
 static EventLoopBaseParamInfo aio_max_batch_info = {
     "aio-max-batch", offsetof(EventLoopBase, aio_max_batch),
 };
+static EventLoopBaseParamInfo thread_pool_min_info = {
+    "thread-pool-min", offsetof(EventLoopBase, thread_pool_min),
+};
+static EventLoopBaseParamInfo thread_pool_max_info = {
+    "thread-pool-max", offsetof(EventLoopBase, thread_pool_max),
+};
 
 static void event_loop_base_get_param(Object *obj, Visitor *v,
         const char *name, void *opaque, Error **errp)
@@ -95,12 +109,21 @@ static void event_loop_base_class_init(ObjectClass *klass, void *class_data)
                               event_loop_base_get_param,
                               event_loop_base_set_param,
                               NULL, &aio_max_batch_info);
+    object_class_property_add(klass, "thread-pool-min", "int",
+                              event_loop_base_get_param,
+                              event_loop_base_set_param,
+                              NULL, &thread_pool_min_info);
+    object_class_property_add(klass, "thread-pool-max", "int",
+                              event_loop_base_get_param,
+                              event_loop_base_set_param,
+                              NULL, &thread_pool_max_info);
 }
 
 static const TypeInfo event_loop_base_info = {
     .name = TYPE_EVENT_LOOP_BASE,
     .parent = TYPE_OBJECT,
     .instance_size = sizeof(EventLoopBase),
+    .instance_init = event_loop_base_instance_init,
     .class_size = sizeof(EventLoopBaseClass),
     .class_init = event_loop_base_class_init,
     .abstract = true,
diff --git a/include/block/aio.h b/include/block/aio.h
index 5634173b12..d128558f1d 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -192,6 +192,8 @@ struct AioContext {
     QSLIST_HEAD(, Coroutine) scheduled_coroutines;
     QEMUBH *co_schedule_bh;
 
+    int thread_pool_min;
+    int thread_pool_max;
     /* Thread pool for performing work and receiving completion callbacks.
      * Has its own locking.
      */
@@ -769,4 +771,12 @@ void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
 void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch,
                                 Error **errp);
 
+/**
+ * aio_context_set_thread_pool_params:
+ * @ctx: the aio context
+ * @min: min number of threads to have readily available in the thread pool
+ * @min: max number of threads the thread pool can contain
+ */
+void aio_context_set_thread_pool_params(AioContext *ctx, int64_t min,
+                                        int64_t max, Error **errp);
 #endif
diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
index 7dd7d730a0..2020bcc92d 100644
--- a/include/block/thread-pool.h
+++ b/include/block/thread-pool.h
@@ -20,6 +20,8 @@
 
 #include "block/block.h"
 
+#define THREAD_POOL_MAX_THREADS_DEFAULT         64
+
 typedef int ThreadPoolFunc(void *opaque);
 
 typedef struct ThreadPool ThreadPool;
@@ -33,5 +35,6 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
 int coroutine_fn thread_pool_submit_co(ThreadPool *pool,
         ThreadPoolFunc *func, void *arg);
 void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg);
+void thread_pool_update_params(ThreadPool *pool, struct AioContext *ctx);
 
 #endif
diff --git a/include/sysemu/event-loop-base.h b/include/sysemu/event-loop-base.h
index fced4c9fea..2748bf6ae1 100644
--- a/include/sysemu/event-loop-base.h
+++ b/include/sysemu/event-loop-base.h
@@ -33,5 +33,9 @@ struct EventLoopBase {
 
     /* AioContext AIO engine parameters */
     int64_t aio_max_batch;
+
+    /* AioContext thread pool parameters */
+    int64_t thread_pool_min;
+    int64_t thread_pool_max;
 };
 #endif
diff --git a/iothread.c b/iothread.c
index 8fa2f3bfb8..529194a566 100644
--- a/iothread.c
+++ b/iothread.c
@@ -174,6 +174,9 @@ static void iothread_set_aio_context_params(EventLoopBase *base, Error **errp)
     aio_context_set_aio_params(iothread->ctx,
                                iothread->parent_obj.aio_max_batch,
                                errp);
+
+    aio_context_set_thread_pool_params(iothread->ctx, base->thread_pool_min,
+                                       base->thread_pool_max, errp);
 }
 
 
diff --git a/qapi/qom.json b/qapi/qom.json
index 7d4a2ac1b9..6a653c6636 100644
--- a/qapi/qom.json
+++ b/qapi/qom.json
@@ -508,10 +508,18 @@
 #                 0 means that the engine will use its default.
 #                 (default: 0)
 #
+# @thread-pool-min: minimum number of threads reserved in the thread pool
+#                   (default:0)
+#
+# @thread-pool-max: maximum number of threads the thread pool can contain
+#                   (default:64)
+#
 # Since: 7.1
 ##
 { 'struct': 'EventLoopBaseProperties',
-  'data': { '*aio-max-batch': 'int' } }
+  'data': { '*aio-max-batch': 'int',
+            '*thread-pool-min': 'int',
+            '*thread-pool-max': 'int' } }
 
 ##
 # @IothreadProperties:
diff --git a/util/aio-posix.c b/util/aio-posix.c
index be0182a3c6..731f3826c0 100644
--- a/util/aio-posix.c
+++ b/util/aio-posix.c
@@ -15,6 +15,7 @@
 
 #include "qemu/osdep.h"
 #include "block/block.h"
+#include "block/thread-pool.h"
 #include "qemu/main-loop.h"
 #include "qemu/rcu.h"
 #include "qemu/rcu_queue.h"
diff --git a/util/async.c b/util/async.c
index 2ea1172f3e..554ba70cca 100644
--- a/util/async.c
+++ b/util/async.c
@@ -563,6 +563,9 @@ AioContext *aio_context_new(Error **errp)
 
     ctx->aio_max_batch = 0;
 
+    ctx->thread_pool_min = 0;
+    ctx->thread_pool_max = THREAD_POOL_MAX_THREADS_DEFAULT;
+
     return ctx;
 fail:
     g_source_destroy(&ctx->source);
@@ -696,3 +699,20 @@ void qemu_set_current_aio_context(AioContext *ctx)
     assert(!get_my_aiocontext());
     set_my_aiocontext(ctx);
 }
+
+void aio_context_set_thread_pool_params(AioContext *ctx, int64_t min,
+                                        int64_t max, Error **errp)
+{
+
+    if (min > max || !max || min > INT_MAX || max > INT_MAX) {
+        error_setg(errp, "bad thread-pool-min/thread-pool-max values");
+        return;
+    }
+
+    ctx->thread_pool_min = min;
+    ctx->thread_pool_max = max;
+
+    if (ctx->thread_pool) {
+        thread_pool_update_params(ctx->thread_pool, ctx);
+    }
+}
diff --git a/util/main-loop.c b/util/main-loop.c
index e30f034815..f00a25451b 100644
--- a/util/main-loop.c
+++ b/util/main-loop.c
@@ -30,6 +30,7 @@
 #include "sysemu/replay.h"
 #include "qemu/main-loop.h"
 #include "block/aio.h"
+#include "block/thread-pool.h"
 #include "qemu/error-report.h"
 #include "qemu/queue.h"
 #include "qemu/compiler.h"
@@ -187,12 +188,20 @@ int qemu_init_main_loop(Error **errp)
 
 static void main_loop_update_params(EventLoopBase *base, Error **errp)
 {
+    ERRP_GUARD();
+
     if (!qemu_aio_context) {
         error_setg(errp, "qemu aio context not ready");
         return;
     }
 
     aio_context_set_aio_params(qemu_aio_context, base->aio_max_batch, errp);
+    if (*errp) {
+        return;
+    }
+
+    aio_context_set_thread_pool_params(qemu_aio_context, base->thread_pool_min,
+                                       base->thread_pool_max, errp);
 }
 
 MainLoop *mloop;
diff --git a/util/thread-pool.c b/util/thread-pool.c
index d763cea505..196835b4d3 100644
--- a/util/thread-pool.c
+++ b/util/thread-pool.c
@@ -58,7 +58,6 @@ struct ThreadPool {
     QemuMutex lock;
     QemuCond worker_stopped;
     QemuSemaphore sem;
-    int max_threads;
     QEMUBH *new_thread_bh;
 
     /* The following variables are only accessed from one AioContext. */
@@ -71,8 +70,27 @@ struct ThreadPool {
     int new_threads;     /* backlog of threads we need to create */
     int pending_threads; /* threads created but not running yet */
     bool stopping;
+    int min_threads;
+    int max_threads;
 };
 
+static inline bool back_to_sleep(ThreadPool *pool, int ret)
+{
+    /*
+     * The semaphore timed out, we should exit the loop except when:
+     *  - There is work to do, we raced with the signal.
+     *  - The max threads threshold just changed, we raced with the signal.
+     *  - The thread pool forces a minimum number of readily available threads.
+     */
+    if (ret == -1 && (!QTAILQ_EMPTY(&pool->request_list) ||
+            pool->cur_threads > pool->max_threads ||
+            pool->cur_threads <= pool->min_threads)) {
+            return true;
+    }
+
+    return false;
+}
+
 static void *worker_thread(void *opaque)
 {
     ThreadPool *pool = opaque;
@@ -91,8 +109,9 @@ static void *worker_thread(void *opaque)
             ret = qemu_sem_timedwait(&pool->sem, 10000);
             qemu_mutex_lock(&pool->lock);
             pool->idle_threads--;
-        } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
-        if (ret == -1 || pool->stopping) {
+        } while (back_to_sleep(pool, ret));
+        if (ret == -1 || pool->stopping ||
+            pool->cur_threads > pool->max_threads) {
             break;
         }
 
@@ -294,6 +313,33 @@ void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
     thread_pool_submit_aio(pool, func, arg, NULL, NULL);
 }
 
+void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
+{
+    qemu_mutex_lock(&pool->lock);
+
+    pool->min_threads = ctx->thread_pool_min;
+    pool->max_threads = ctx->thread_pool_max;
+
+    /*
+     * We either have to:
+     *  - Increase the number available of threads until over the min_threads
+     *    threshold.
+     *  - Decrease the number of available threads until under the max_threads
+     *    threshold.
+     *  - Do nothing. The current number of threads fall in between the min and
+     *    max thresholds. We'll let the pool manage itself.
+     */
+    for (int i = pool->cur_threads; i < pool->min_threads; i++) {
+        spawn_thread(pool);
+    }
+
+    for (int i = pool->cur_threads; i > pool->max_threads; i--) {
+        qemu_sem_post(&pool->sem);
+    }
+
+    qemu_mutex_unlock(&pool->lock);
+}
+
 static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
 {
     if (!ctx) {
@@ -306,11 +352,12 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
     qemu_mutex_init(&pool->lock);
     qemu_cond_init(&pool->worker_stopped);
     qemu_sem_init(&pool->sem, 0);
-    pool->max_threads = 64;
     pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
 
     QLIST_INIT(&pool->head);
     QTAILQ_INIT(&pool->request_list);
+
+    thread_pool_update_params(pool, ctx);
 }
 
 ThreadPool *thread_pool_new(AioContext *ctx)