summary refs log tree commit diff stats
path: root/util/thread-pool.c
diff options
context:
space:
mode:
Diffstat (limited to 'util/thread-pool.c')
-rw-r--r--util/thread-pool.c55
1 files changed, 51 insertions, 4 deletions
diff --git a/util/thread-pool.c b/util/thread-pool.c
index d763cea505..196835b4d3 100644
--- a/util/thread-pool.c
+++ b/util/thread-pool.c
@@ -58,7 +58,6 @@ struct ThreadPool {
     QemuMutex lock;
     QemuCond worker_stopped;
     QemuSemaphore sem;
-    int max_threads;
     QEMUBH *new_thread_bh;
 
     /* The following variables are only accessed from one AioContext. */
@@ -71,8 +70,27 @@ struct ThreadPool {
     int new_threads;     /* backlog of threads we need to create */
     int pending_threads; /* threads created but not running yet */
     bool stopping;
+    int min_threads;
+    int max_threads;
 };
 
+static inline bool back_to_sleep(ThreadPool *pool, int ret)
+{
+    /*
+     * The semaphore timed out, we should exit the loop except when:
+     *  - There is work to do, we raced with the signal.
+     *  - The max threads threshold just changed, we raced with the signal.
+     *  - The thread pool forces a minimum number of readily available threads.
+     */
+    if (ret == -1 && (!QTAILQ_EMPTY(&pool->request_list) ||
+            pool->cur_threads > pool->max_threads ||
+            pool->cur_threads <= pool->min_threads)) {
+            return true;
+    }
+
+    return false;
+}
+
 static void *worker_thread(void *opaque)
 {
     ThreadPool *pool = opaque;
@@ -91,8 +109,9 @@ static void *worker_thread(void *opaque)
             ret = qemu_sem_timedwait(&pool->sem, 10000);
             qemu_mutex_lock(&pool->lock);
             pool->idle_threads--;
-        } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
-        if (ret == -1 || pool->stopping) {
+        } while (back_to_sleep(pool, ret));
+        if (ret == -1 || pool->stopping ||
+            pool->cur_threads > pool->max_threads) {
             break;
         }
 
@@ -294,6 +313,33 @@ void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
     thread_pool_submit_aio(pool, func, arg, NULL, NULL);
 }
 
+void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
+{
+    qemu_mutex_lock(&pool->lock);
+
+    pool->min_threads = ctx->thread_pool_min;
+    pool->max_threads = ctx->thread_pool_max;
+
+    /*
+     * We either have to:
+     *  - Increase the number available of threads until over the min_threads
+     *    threshold.
+     *  - Decrease the number of available threads until under the max_threads
+     *    threshold.
+     *  - Do nothing. The current number of threads fall in between the min and
+     *    max thresholds. We'll let the pool manage itself.
+     */
+    for (int i = pool->cur_threads; i < pool->min_threads; i++) {
+        spawn_thread(pool);
+    }
+
+    for (int i = pool->cur_threads; i > pool->max_threads; i--) {
+        qemu_sem_post(&pool->sem);
+    }
+
+    qemu_mutex_unlock(&pool->lock);
+}
+
 static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
 {
     if (!ctx) {
@@ -306,11 +352,12 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
     qemu_mutex_init(&pool->lock);
     qemu_cond_init(&pool->worker_stopped);
     qemu_sem_init(&pool->sem, 0);
-    pool->max_threads = 64;
     pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
 
     QLIST_INIT(&pool->head);
     QTAILQ_INIT(&pool->request_list);
+
+    thread_pool_update_params(pool, ctx);
 }
 
 ThreadPool *thread_pool_new(AioContext *ctx)