diff options
-rw-r--r-- | gio/gtask.c | 127 | ||||
-rw-r--r-- | gio/tests/task.c | 110 |
2 files changed, 222 insertions, 15 deletions
diff --git a/gio/gtask.c b/gio/gtask.c index 9950bb36b..5502d5695 100644 --- a/gio/gtask.c +++ b/gio/gtask.c @@ -558,7 +558,6 @@ struct _GTask { GTaskThreadFunc task_func; GMutex lock; - GCond cond; /* This can’t be in the bit field because we access it from TRACE(). */ gboolean thread_cancelled; @@ -616,6 +615,46 @@ static GSource *task_pool_manager; static guint64 task_wait_time; static gint tasks_running; +static GHashTable *completion_synchronizer_table; /* GTask -> ThreadedTaskCompletionSynchronizer */ +static GMutex completion_synchronizer_table_mutex; + +typedef struct { + GMutex mutex; + GCond condition; + gboolean finished; + gatomicrefcount refcount; +} ThreadedTaskCompletionSynchronizer; + +static ThreadedTaskCompletionSynchronizer * +threaded_task_completion_synchronizer_new (void) +{ + ThreadedTaskCompletionSynchronizer *synchronizer = g_new (ThreadedTaskCompletionSynchronizer, 1); + + g_mutex_init (&synchronizer->mutex); + g_cond_init (&synchronizer->condition); + synchronizer->finished = FALSE; + g_atomic_ref_count_init (&synchronizer->refcount); + + return synchronizer; +} + +static void +threaded_task_completion_synchronizer_ref (ThreadedTaskCompletionSynchronizer *synchronizer) +{ + g_atomic_ref_count_inc (&synchronizer->refcount); +} + +static void +threaded_task_completion_synchronizer_unref (ThreadedTaskCompletionSynchronizer *synchronizer) +{ + if (g_atomic_ref_count_dec (&synchronizer->refcount)) + { + g_mutex_clear (&synchronizer->mutex); + g_cond_clear (&synchronizer->condition); + g_free (synchronizer); + } +} + /* When the task pool fills up and blocks, and the program keeps * queueing more tasks, we will slowly add more threads to the pool * (in case the existing tasks are trying to queue subtasks of their @@ -660,13 +699,10 @@ g_task_finalize (GObject *object) task->result_destroy (task->result.pointer); if (task->error) - g_error_free (task->error); + g_error_free (task->error); if (G_TASK_IS_THREADED (task)) - { - g_mutex_clear (&task->lock); - g_cond_clear (&task->cond); - } + g_mutex_clear (&task->lock); G_OBJECT_CLASS (g_task_parent_class)->finalize (object); } @@ -1220,9 +1256,38 @@ g_task_return_now (GTask *task) g_main_context_pop_thread_default (task->context); } +static void +wait_for_threaded_task_completion (GTask *task) +{ + ThreadedTaskCompletionSynchronizer *synchronizer; + + /* Here we ensure that the task has already been unreffed on the task + * thread, to ensure we never destroy the task's source object or user + * data on the task thread. + */ + g_mutex_lock (&completion_synchronizer_table_mutex); + synchronizer = g_hash_table_lookup (completion_synchronizer_table, task); + g_mutex_unlock (&completion_synchronizer_table_mutex); + + g_assert (synchronizer != NULL); + g_mutex_lock (&synchronizer->mutex); + while (!synchronizer->finished) + g_cond_wait (&synchronizer->condition, &synchronizer->mutex); + g_mutex_unlock (&synchronizer->mutex); + + g_mutex_lock (&completion_synchronizer_table_mutex); + g_hash_table_remove (completion_synchronizer_table, task); + g_mutex_unlock (&completion_synchronizer_table_mutex); +} + static gboolean -complete_in_idle_cb (gpointer task) +complete_in_idle_cb (gpointer user_data) { + GTask *task = user_data; + + if (G_TASK_IS_THREADED (task)) + wait_for_threaded_task_completion (task); + g_task_return_now (task); g_object_unref (task); return FALSE; @@ -1343,9 +1408,7 @@ g_task_thread_complete (GTask *task) if (task->cancellable) g_signal_handlers_disconnect_by_func (task->cancellable, task_thread_cancelled, task); - if (task->synchronous) - g_cond_signal (&task->cond); - else + if (!task->synchronous) g_task_return (task, G_TASK_RETURN_FROM_THREAD); } @@ -1404,6 +1467,7 @@ g_task_thread_pool_thread (gpointer thread_data, gpointer pool_data) { GTask *task = thread_data; + ThreadedTaskCompletionSynchronizer *synchronizer; g_task_thread_setup (); @@ -1412,6 +1476,26 @@ g_task_thread_pool_thread (gpointer thread_data, g_task_thread_complete (task); g_object_unref (task); + /* Now we have to ensure that the task is not destroyed in this thread, since + * that would cause its source object and user data to be destroyed, and that + * needs to happen back in the original thread. So signal to the original + * thread that we are done. Note we cannot use task at this point except for + * its address. + */ + g_mutex_lock (&completion_synchronizer_table_mutex); + synchronizer = g_hash_table_lookup (completion_synchronizer_table, task); + g_mutex_unlock (&completion_synchronizer_table_mutex); + + g_assert (synchronizer != NULL); + threaded_task_completion_synchronizer_ref (synchronizer); + + g_mutex_lock (&synchronizer->mutex); + synchronizer->finished = TRUE; + g_cond_signal (&synchronizer->condition); + g_mutex_unlock (&synchronizer->mutex); + + threaded_task_completion_synchronizer_unref (synchronizer); + g_task_thread_cleanup (); } @@ -1454,8 +1538,19 @@ static void g_task_start_task_thread (GTask *task, GTaskThreadFunc task_func) { + g_mutex_lock (&completion_synchronizer_table_mutex); + if (g_hash_table_contains (completion_synchronizer_table, task)) + { + g_critical ("Task is already running in thread"); + g_mutex_unlock (&completion_synchronizer_table_mutex); + return; + } + + g_hash_table_insert (completion_synchronizer_table, task, + threaded_task_completion_synchronizer_new ()); + g_mutex_unlock (&completion_synchronizer_table_mutex); + g_mutex_init (&task->lock); - g_cond_init (&task->cond); g_mutex_lock (&task->lock); @@ -1571,12 +1666,10 @@ g_task_run_in_thread_sync (GTask *task, task->synchronous = TRUE; g_task_start_task_thread (task, task_func); - - while (!task->thread_complete) - g_cond_wait (&task->cond, &task->lock); - g_mutex_unlock (&task->lock); + wait_for_threaded_task_completion (task); + TRACE (GIO_TASK_BEFORE_RETURN (task, task->source_object, NULL /* callback */, NULL /* callback data */)); @@ -2067,6 +2160,10 @@ g_task_thread_pool_init (void) g_source_attach (task_pool_manager, GLIB_PRIVATE_CALL (g_get_worker_context ())); g_source_unref (task_pool_manager); + + completion_synchronizer_table = g_hash_table_new_full (g_direct_hash, g_direct_equal, + NULL, + (GDestroyNotify)threaded_task_completion_synchronizer_unref); } static void diff --git a/gio/tests/task.c b/gio/tests/task.c index 9b2c6912c..284cbb255 100644 --- a/gio/tests/task.c +++ b/gio/tests/task.c @@ -1408,6 +1408,114 @@ test_run_in_thread_overflow (void) g_assert_cmpint (i + strspn (buf + i, "X"), ==, NUM_OVERFLOW_TASKS); } +/* test_run_in_thread_destruction: source object and user data must be + * destroyed on the original thread, not the task thread + */ + +#define SOURCE_OBJECT_TYPE_DESTRUCTION_THREAD_CHECKER (source_object_destruction_thread_checker_get_type()) + +G_DECLARE_FINAL_TYPE (SourceObjectDestructionThreadChecker, source_object_destruction_thread_checker, SOURCE_OBJECT, DESTRUCTION_THREAD_CHECKER, GObject) + +struct _SourceObjectDestructionThreadChecker +{ + GObject parent_instance; +}; + +G_DEFINE_TYPE (SourceObjectDestructionThreadChecker, source_object_destruction_thread_checker, G_TYPE_OBJECT) + +static void +source_object_destruction_thread_checker_finalize (GObject *object) +{ + g_assert_true (main_thread == g_thread_self ()); + + G_OBJECT_CLASS (source_object_destruction_thread_checker_parent_class)->finalize (object); +} + +static void +source_object_destruction_thread_checker_class_init (SourceObjectDestructionThreadCheckerClass *klass) +{ + GObjectClass *object_class = G_OBJECT_CLASS (klass); + + object_class->finalize = source_object_destruction_thread_checker_finalize; +} + +static void +source_object_destruction_thread_checker_init (SourceObjectDestructionThreadChecker *self) +{ +} + +static void +user_data_destruction_thread_checker (gpointer user_data) +{ + g_assert_true (main_thread == g_thread_self ()); +} + +static void +destruction_thread_test_cb (GObject *source_object, + GAsyncResult *res, + gpointer user_data) +{ + g_assert_true (main_thread == g_thread_self ()); + g_main_loop_quit ((GMainLoop *)user_data); +} + +static void +destruction_thread_test_thread (GTask *task, + gpointer source_object, + gpointer task_data, + GCancellable *cancellable) +{ + g_assert_false (main_thread == g_thread_self ()); + g_object_unref (source_object); +} + +static void +test_run_in_thread_destruction (void) +{ + SourceObjectDestructionThreadChecker *source_object; + GMainLoop *main_loop; + GTask *task; + int i; + + g_test_bug ("https://gitlab.gnome.org/GNOME/glib/issues/1346"); + + main_loop = g_main_loop_new (NULL, FALSE); + + for (i = 0; i < 1000000; i++) + { + source_object = g_object_new (SOURCE_OBJECT_TYPE_DESTRUCTION_THREAD_CHECKER, NULL); + task = g_task_new (source_object, NULL, destruction_thread_test_cb, main_loop); + g_task_set_task_data (task, NULL, user_data_destruction_thread_checker); + g_task_run_in_thread (task, destruction_thread_test_thread); + g_main_loop_run (main_loop); + g_object_unref (task); + } + + g_main_loop_unref (main_loop); +} + +/* test_run_in_thread_sync_destruction: source object and user data + * must be destroyed on the original thread, not the task thread + */ +static void +test_run_in_thread_sync_destruction (void) +{ + SourceObjectDestructionThreadChecker *source_object; + GTask *task; + int i; + + g_test_bug ("https://gitlab.gnome.org/GNOME/glib/issues/1346"); + + for (i = 0; i < 1000000; i++) + { + source_object = g_object_new (SOURCE_OBJECT_TYPE_DESTRUCTION_THREAD_CHECKER, NULL); + task = g_task_new (source_object, NULL, NULL, NULL); + g_task_set_task_data (task, NULL, user_data_destruction_thread_checker); + g_task_run_in_thread_sync (task, destruction_thread_test_thread); + g_object_unref (task); + } +} + /* test_return_on_cancel */ GMutex roc_init_mutex, roc_finish_mutex; @@ -2345,6 +2453,8 @@ main (int argc, char **argv) g_test_add_func ("/gtask/run-in-thread-priority", test_run_in_thread_priority); g_test_add_func ("/gtask/run-in-thread-nested", test_run_in_thread_nested); g_test_add_func ("/gtask/run-in-thread-overflow", test_run_in_thread_overflow); + g_test_add_func ("/gtask/run-in-thread-destruction", test_run_in_thread_destruction); + g_test_add_func ("/gtask/run-in-thread-sync-destruction", test_run_in_thread_sync_destruction); g_test_add_func ("/gtask/return-on-cancel", test_return_on_cancel); g_test_add_func ("/gtask/return-on-cancel-sync", test_return_on_cancel_sync); g_test_add_func ("/gtask/return-on-cancel-atomic", test_return_on_cancel_atomic); |