summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Catanzaro <mcatanzaro@igalia.com>2019-05-25 18:30:12 -0500
committerMichael Catanzaro <mcatanzaro@igalia.com>2019-05-26 10:17:15 -0500
commit72ed9e04394746495f021133915e6bf7c1d3b5b1 (patch)
tree65070eae0394e5e16c927dc9efa4e402d7ca3b52
parente10eff122cbfbc1285dc3ebcae1f07182da36587 (diff)
downloadglib-mcatanzaro/#1346.tar.gz
gtask: ensure task is destroyed on its context's threadmcatanzaro/#1346
The GTask must be destroyed on the thread that is running its GMainContext, i.e. the thread that started the task. It must never be destroyed on the actual task thread when running with g_task_run_in_thread(), because when it is destroyed, it will unref its source object and destroy its user data (if a GDestroyNotify was set for the data using g_task_set_task_data()). The source object and task data might not be safe to destroy on a secondary thread, though, so this is incorrect. We have to ensure they are destroyed on the task's context's thread. There are different ways we could do this, but the simplest by far is to ensure the task thread has unreffed the task before the context's thread executes the callback. And that is simple enough to do using a condition variable. We have to keep a static global map of all GTasks with outstanding task threads, which is slightly unfortunate, but we already have a bunch of global data in this file for managing the thread pool, and the map will only contain tasks that are currently running in threads, so it should be small. Fixes #1346
-rw-r--r--gio/gtask.c127
-rw-r--r--gio/tests/task.c110
2 files changed, 222 insertions, 15 deletions
diff --git a/gio/gtask.c b/gio/gtask.c
index 9950bb36b..5502d5695 100644
--- a/gio/gtask.c
+++ b/gio/gtask.c
@@ -558,7 +558,6 @@ struct _GTask {
GTaskThreadFunc task_func;
GMutex lock;
- GCond cond;
/* This can’t be in the bit field because we access it from TRACE(). */
gboolean thread_cancelled;
@@ -616,6 +615,46 @@ static GSource *task_pool_manager;
static guint64 task_wait_time;
static gint tasks_running;
+static GHashTable *completion_synchronizer_table; /* GTask -> ThreadedTaskCompletionSynchronizer */
+static GMutex completion_synchronizer_table_mutex;
+
+typedef struct {
+ GMutex mutex;
+ GCond condition;
+ gboolean finished;
+ gatomicrefcount refcount;
+} ThreadedTaskCompletionSynchronizer;
+
+static ThreadedTaskCompletionSynchronizer *
+threaded_task_completion_synchronizer_new (void)
+{
+ ThreadedTaskCompletionSynchronizer *synchronizer = g_new (ThreadedTaskCompletionSynchronizer, 1);
+
+ g_mutex_init (&synchronizer->mutex);
+ g_cond_init (&synchronizer->condition);
+ synchronizer->finished = FALSE;
+ g_atomic_ref_count_init (&synchronizer->refcount);
+
+ return synchronizer;
+}
+
+static void
+threaded_task_completion_synchronizer_ref (ThreadedTaskCompletionSynchronizer *synchronizer)
+{
+ g_atomic_ref_count_inc (&synchronizer->refcount);
+}
+
+static void
+threaded_task_completion_synchronizer_unref (ThreadedTaskCompletionSynchronizer *synchronizer)
+{
+ if (g_atomic_ref_count_dec (&synchronizer->refcount))
+ {
+ g_mutex_clear (&synchronizer->mutex);
+ g_cond_clear (&synchronizer->condition);
+ g_free (synchronizer);
+ }
+}
+
/* When the task pool fills up and blocks, and the program keeps
* queueing more tasks, we will slowly add more threads to the pool
* (in case the existing tasks are trying to queue subtasks of their
@@ -660,13 +699,10 @@ g_task_finalize (GObject *object)
task->result_destroy (task->result.pointer);
if (task->error)
- g_error_free (task->error);
+ g_error_free (task->error);
if (G_TASK_IS_THREADED (task))
- {
- g_mutex_clear (&task->lock);
- g_cond_clear (&task->cond);
- }
+ g_mutex_clear (&task->lock);
G_OBJECT_CLASS (g_task_parent_class)->finalize (object);
}
@@ -1220,9 +1256,38 @@ g_task_return_now (GTask *task)
g_main_context_pop_thread_default (task->context);
}
+static void
+wait_for_threaded_task_completion (GTask *task)
+{
+ ThreadedTaskCompletionSynchronizer *synchronizer;
+
+ /* Here we ensure that the task has already been unreffed on the task
+ * thread, to ensure we never destroy the task's source object or user
+ * data on the task thread.
+ */
+ g_mutex_lock (&completion_synchronizer_table_mutex);
+ synchronizer = g_hash_table_lookup (completion_synchronizer_table, task);
+ g_mutex_unlock (&completion_synchronizer_table_mutex);
+
+ g_assert (synchronizer != NULL);
+ g_mutex_lock (&synchronizer->mutex);
+ while (!synchronizer->finished)
+ g_cond_wait (&synchronizer->condition, &synchronizer->mutex);
+ g_mutex_unlock (&synchronizer->mutex);
+
+ g_mutex_lock (&completion_synchronizer_table_mutex);
+ g_hash_table_remove (completion_synchronizer_table, task);
+ g_mutex_unlock (&completion_synchronizer_table_mutex);
+}
+
static gboolean
-complete_in_idle_cb (gpointer task)
+complete_in_idle_cb (gpointer user_data)
{
+ GTask *task = user_data;
+
+ if (G_TASK_IS_THREADED (task))
+ wait_for_threaded_task_completion (task);
+
g_task_return_now (task);
g_object_unref (task);
return FALSE;
@@ -1343,9 +1408,7 @@ g_task_thread_complete (GTask *task)
if (task->cancellable)
g_signal_handlers_disconnect_by_func (task->cancellable, task_thread_cancelled, task);
- if (task->synchronous)
- g_cond_signal (&task->cond);
- else
+ if (!task->synchronous)
g_task_return (task, G_TASK_RETURN_FROM_THREAD);
}
@@ -1404,6 +1467,7 @@ g_task_thread_pool_thread (gpointer thread_data,
gpointer pool_data)
{
GTask *task = thread_data;
+ ThreadedTaskCompletionSynchronizer *synchronizer;
g_task_thread_setup ();
@@ -1412,6 +1476,26 @@ g_task_thread_pool_thread (gpointer thread_data,
g_task_thread_complete (task);
g_object_unref (task);
+ /* Now we have to ensure that the task is not destroyed in this thread, since
+ * that would cause its source object and user data to be destroyed, and that
+ * needs to happen back in the original thread. So signal to the original
+ * thread that we are done. Note we cannot use task at this point except for
+ * its address.
+ */
+ g_mutex_lock (&completion_synchronizer_table_mutex);
+ synchronizer = g_hash_table_lookup (completion_synchronizer_table, task);
+ g_mutex_unlock (&completion_synchronizer_table_mutex);
+
+ g_assert (synchronizer != NULL);
+ threaded_task_completion_synchronizer_ref (synchronizer);
+
+ g_mutex_lock (&synchronizer->mutex);
+ synchronizer->finished = TRUE;
+ g_cond_signal (&synchronizer->condition);
+ g_mutex_unlock (&synchronizer->mutex);
+
+ threaded_task_completion_synchronizer_unref (synchronizer);
+
g_task_thread_cleanup ();
}
@@ -1454,8 +1538,19 @@ static void
g_task_start_task_thread (GTask *task,
GTaskThreadFunc task_func)
{
+ g_mutex_lock (&completion_synchronizer_table_mutex);
+ if (g_hash_table_contains (completion_synchronizer_table, task))
+ {
+ g_critical ("Task is already running in thread");
+ g_mutex_unlock (&completion_synchronizer_table_mutex);
+ return;
+ }
+
+ g_hash_table_insert (completion_synchronizer_table, task,
+ threaded_task_completion_synchronizer_new ());
+ g_mutex_unlock (&completion_synchronizer_table_mutex);
+
g_mutex_init (&task->lock);
- g_cond_init (&task->cond);
g_mutex_lock (&task->lock);
@@ -1571,12 +1666,10 @@ g_task_run_in_thread_sync (GTask *task,
task->synchronous = TRUE;
g_task_start_task_thread (task, task_func);
-
- while (!task->thread_complete)
- g_cond_wait (&task->cond, &task->lock);
-
g_mutex_unlock (&task->lock);
+ wait_for_threaded_task_completion (task);
+
TRACE (GIO_TASK_BEFORE_RETURN (task, task->source_object,
NULL /* callback */,
NULL /* callback data */));
@@ -2067,6 +2160,10 @@ g_task_thread_pool_init (void)
g_source_attach (task_pool_manager,
GLIB_PRIVATE_CALL (g_get_worker_context ()));
g_source_unref (task_pool_manager);
+
+ completion_synchronizer_table = g_hash_table_new_full (g_direct_hash, g_direct_equal,
+ NULL,
+ (GDestroyNotify)threaded_task_completion_synchronizer_unref);
}
static void
diff --git a/gio/tests/task.c b/gio/tests/task.c
index 9b2c6912c..284cbb255 100644
--- a/gio/tests/task.c
+++ b/gio/tests/task.c
@@ -1408,6 +1408,114 @@ test_run_in_thread_overflow (void)
g_assert_cmpint (i + strspn (buf + i, "X"), ==, NUM_OVERFLOW_TASKS);
}
+/* test_run_in_thread_destruction: source object and user data must be
+ * destroyed on the original thread, not the task thread
+ */
+
+#define SOURCE_OBJECT_TYPE_DESTRUCTION_THREAD_CHECKER (source_object_destruction_thread_checker_get_type())
+
+G_DECLARE_FINAL_TYPE (SourceObjectDestructionThreadChecker, source_object_destruction_thread_checker, SOURCE_OBJECT, DESTRUCTION_THREAD_CHECKER, GObject)
+
+struct _SourceObjectDestructionThreadChecker
+{
+ GObject parent_instance;
+};
+
+G_DEFINE_TYPE (SourceObjectDestructionThreadChecker, source_object_destruction_thread_checker, G_TYPE_OBJECT)
+
+static void
+source_object_destruction_thread_checker_finalize (GObject *object)
+{
+ g_assert_true (main_thread == g_thread_self ());
+
+ G_OBJECT_CLASS (source_object_destruction_thread_checker_parent_class)->finalize (object);
+}
+
+static void
+source_object_destruction_thread_checker_class_init (SourceObjectDestructionThreadCheckerClass *klass)
+{
+ GObjectClass *object_class = G_OBJECT_CLASS (klass);
+
+ object_class->finalize = source_object_destruction_thread_checker_finalize;
+}
+
+static void
+source_object_destruction_thread_checker_init (SourceObjectDestructionThreadChecker *self)
+{
+}
+
+static void
+user_data_destruction_thread_checker (gpointer user_data)
+{
+ g_assert_true (main_thread == g_thread_self ());
+}
+
+static void
+destruction_thread_test_cb (GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ g_assert_true (main_thread == g_thread_self ());
+ g_main_loop_quit ((GMainLoop *)user_data);
+}
+
+static void
+destruction_thread_test_thread (GTask *task,
+ gpointer source_object,
+ gpointer task_data,
+ GCancellable *cancellable)
+{
+ g_assert_false (main_thread == g_thread_self ());
+ g_object_unref (source_object);
+}
+
+static void
+test_run_in_thread_destruction (void)
+{
+ SourceObjectDestructionThreadChecker *source_object;
+ GMainLoop *main_loop;
+ GTask *task;
+ int i;
+
+ g_test_bug ("https://gitlab.gnome.org/GNOME/glib/issues/1346");
+
+ main_loop = g_main_loop_new (NULL, FALSE);
+
+ for (i = 0; i < 1000000; i++)
+ {
+ source_object = g_object_new (SOURCE_OBJECT_TYPE_DESTRUCTION_THREAD_CHECKER, NULL);
+ task = g_task_new (source_object, NULL, destruction_thread_test_cb, main_loop);
+ g_task_set_task_data (task, NULL, user_data_destruction_thread_checker);
+ g_task_run_in_thread (task, destruction_thread_test_thread);
+ g_main_loop_run (main_loop);
+ g_object_unref (task);
+ }
+
+ g_main_loop_unref (main_loop);
+}
+
+/* test_run_in_thread_sync_destruction: source object and user data
+ * must be destroyed on the original thread, not the task thread
+ */
+static void
+test_run_in_thread_sync_destruction (void)
+{
+ SourceObjectDestructionThreadChecker *source_object;
+ GTask *task;
+ int i;
+
+ g_test_bug ("https://gitlab.gnome.org/GNOME/glib/issues/1346");
+
+ for (i = 0; i < 1000000; i++)
+ {
+ source_object = g_object_new (SOURCE_OBJECT_TYPE_DESTRUCTION_THREAD_CHECKER, NULL);
+ task = g_task_new (source_object, NULL, NULL, NULL);
+ g_task_set_task_data (task, NULL, user_data_destruction_thread_checker);
+ g_task_run_in_thread_sync (task, destruction_thread_test_thread);
+ g_object_unref (task);
+ }
+}
+
/* test_return_on_cancel */
GMutex roc_init_mutex, roc_finish_mutex;
@@ -2345,6 +2453,8 @@ main (int argc, char **argv)
g_test_add_func ("/gtask/run-in-thread-priority", test_run_in_thread_priority);
g_test_add_func ("/gtask/run-in-thread-nested", test_run_in_thread_nested);
g_test_add_func ("/gtask/run-in-thread-overflow", test_run_in_thread_overflow);
+ g_test_add_func ("/gtask/run-in-thread-destruction", test_run_in_thread_destruction);
+ g_test_add_func ("/gtask/run-in-thread-sync-destruction", test_run_in_thread_sync_destruction);
g_test_add_func ("/gtask/return-on-cancel", test_return_on_cancel);
g_test_add_func ("/gtask/return-on-cancel-sync", test_return_on_cancel_sync);
g_test_add_func ("/gtask/return-on-cancel-atomic", test_return_on_cancel_atomic);