summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--include/io/task.h29
-rw-r--r--io/task.c41
-rw-r--r--io/trace-events2
3 files changed, 67 insertions, 5 deletions
diff --git a/include/io/task.h b/include/io/task.h
index 9e09b95b2e..57d8ba835e 100644
--- a/include/io/task.h
+++ b/include/io/task.h
@@ -232,7 +232,8 @@ QIOTask *qio_task_new(Object *source,
  *
  * Run a task in a background thread. When @worker
  * returns it will call qio_task_complete() in
- * the event thread context that provided.
+ * the thread that is running the main loop associated
+ * with @context.
  */
 void qio_task_run_in_thread(QIOTask *task,
                             QIOTaskWorker worker,
@@ -240,6 +241,32 @@ void qio_task_run_in_thread(QIOTask *task,
                             GDestroyNotify destroy,
                             GMainContext *context);
 
+
+/**
+ * qio_task_wait_thread:
+ * @task: the task struct
+ *
+ * Wait for completion of a task that was previously
+ * invoked using qio_task_run_in_thread. This MUST
+ * ONLY be invoked if the task has not already
+ * completed, since after the completion callback
+ * is invoked, @task will have been freed.
+ *
+ * To avoid racing with execution of the completion
+ * callback provided with qio_task_new, this method
+ * MUST ONLY be invoked from the thread that is
+ * running the main loop associated with @context
+ * parameter to qio_task_run_in_thread.
+ *
+ * When the thread has completed, the completion
+ * callback provided to qio_task_new will be invoked.
+ * When that callback returns @task will be freed,
+ * so @task must not be referenced after this
+ * method completes.
+ */
+void qio_task_wait_thread(QIOTask *task);
+
+
 /**
  * qio_task_complete:
  * @task: the task struct
diff --git a/io/task.c b/io/task.c
index 396866b10f..64c4c7126a 100644
--- a/io/task.c
+++ b/io/task.c
@@ -29,6 +29,7 @@ struct QIOTaskThreadData {
     gpointer opaque;
     GDestroyNotify destroy;
     GMainContext *context;
+    GSource *completion;
 };
 
 
@@ -40,6 +41,8 @@ struct QIOTask {
     Error *err;
     gpointer result;
     GDestroyNotify destroyResult;
+    QemuMutex thread_lock;
+    QemuCond thread_cond;
     struct QIOTaskThreadData *thread;
 };
 
@@ -58,6 +61,8 @@ QIOTask *qio_task_new(Object *source,
     task->func = func;
     task->opaque = opaque;
     task->destroy = destroy;
+    qemu_mutex_init(&task->thread_lock);
+    qemu_cond_init(&task->thread_cond);
 
     trace_qio_task_new(task, source, func, opaque);
 
@@ -66,6 +71,7 @@ QIOTask *qio_task_new(Object *source,
 
 static void qio_task_free(QIOTask *task)
 {
+    qemu_mutex_lock(&task->thread_lock);
     if (task->thread) {
         if (task->thread->destroy) {
             task->thread->destroy(task->thread->opaque);
@@ -89,6 +95,10 @@ static void qio_task_free(QIOTask *task)
     }
     object_unref(task->source);
 
+    qemu_mutex_unlock(&task->thread_lock);
+    qemu_mutex_destroy(&task->thread_lock);
+    qemu_cond_destroy(&task->thread_cond);
+
     g_free(task);
 }
 
@@ -107,7 +117,6 @@ static gboolean qio_task_thread_result(gpointer opaque)
 static gpointer qio_task_thread_worker(gpointer opaque)
 {
     QIOTask *task = opaque;
-    GSource *idle;
 
     trace_qio_task_thread_run(task);
 
@@ -120,9 +129,17 @@ static gpointer qio_task_thread_worker(gpointer opaque)
      */
     trace_qio_task_thread_exit(task);
 
-    idle = g_idle_source_new();
-    g_source_set_callback(idle, qio_task_thread_result, task, NULL);
-    g_source_attach(idle, task->thread->context);
+    qemu_mutex_lock(&task->thread_lock);
+
+    task->thread->completion = g_idle_source_new();
+    g_source_set_callback(task->thread->completion,
+                          qio_task_thread_result, task, NULL);
+    g_source_attach(task->thread->completion,
+                    task->thread->context);
+    trace_qio_task_thread_source_attach(task, task->thread->completion);
+
+    qemu_cond_signal(&task->thread_cond);
+    qemu_mutex_unlock(&task->thread_lock);
 
     return NULL;
 }
@@ -157,6 +174,22 @@ void qio_task_run_in_thread(QIOTask *task,
 }
 
 
+void qio_task_wait_thread(QIOTask *task)
+{
+    qemu_mutex_lock(&task->thread_lock);
+    g_assert(task->thread != NULL);
+    while (task->thread->completion == NULL) {
+        qemu_cond_wait(&task->thread_cond, &task->thread_lock);
+    }
+
+    trace_qio_task_thread_source_cancel(task, task->thread->completion);
+    g_source_destroy(task->thread->completion);
+    qemu_mutex_unlock(&task->thread_lock);
+
+    qio_task_thread_result(task);
+}
+
+
 void qio_task_complete(QIOTask *task)
 {
     task->func(task, task->opaque);
diff --git a/io/trace-events b/io/trace-events
index f70bad7cbe..07a7bbec6a 100644
--- a/io/trace-events
+++ b/io/trace-events
@@ -7,6 +7,8 @@ qio_task_thread_start(void *task, void *worker, void *opaque) "Task thread start
 qio_task_thread_run(void *task) "Task thread run task=%p"
 qio_task_thread_exit(void *task) "Task thread exit task=%p"
 qio_task_thread_result(void *task) "Task thread result task=%p"
+qio_task_thread_source_attach(void *task, void *source) "Task thread source attach task=%p source=%p"
+qio_task_thread_source_cancel(void *task, void *source) "Task thread source cancel task=%p source=%p"
 
 # io/channel-socket.c
 qio_channel_socket_new(void *ioc) "Socket new ioc=%p"