summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilip Withnall <pwithnall@endlessos.org>2023-04-25 15:58:46 +0100
committerPhilip Withnall <pwithnall@endlessos.org>2023-04-27 12:23:25 +0100
commit7b18e6205a03058541391e02c013c893b144e1ed (patch)
tree6f1f51c0db42e65e580d6c213bd03196842fdb06
parent84074ce757d090f52f6701f43b99a41517131433 (diff)
downloadglib-7b18e6205a03058541391e02c013c893b144e1ed.tar.gz
gthreadedresolver: Switch to using a separate thread pool
Rather than running lookups in the global shared thread pool belonging to `GTask`, run them in a private thread pool. This is needed because the global shared thread pool is constrained to only 14 threads. If there are 14 ongoing calls to `g_task_run_in_thread()` from any library/code in the process, and then one of them asks to do a DNS lookup, the lookup will block forever. Under certain circumstances, particularly where there are a couple of deep chains of dependent tasks running with `g_task_run_in_thread()`, this can livelock the program. Since `GResolver` is likely to be called as a frequent leaf call in certain workloads, and in particular there are likely to be several lookups requested at the same time, it makes sense to move resolver lookups to a private thread pool. Signed-off-by: Philip Withnall <pwithnall@endlessos.org>
-rw-r--r--gio/gthreadedresolver.c198
1 files changed, 162 insertions, 36 deletions
diff --git a/gio/gthreadedresolver.c b/gio/gthreadedresolver.c
index 4e5b20751..1277907e7 100644
--- a/gio/gthreadedresolver.c
+++ b/gio/gthreadedresolver.c
@@ -41,18 +41,39 @@
struct _GThreadedResolver
{
GResolver parent_instance;
+
+ GThreadPool *thread_pool; /* (owned) */
};
G_DEFINE_TYPE (GThreadedResolver, g_threaded_resolver, G_TYPE_RESOLVER)
-static void threaded_resolver_worker_cb (GTask *task,
- gpointer source_object,
- gpointer task_data,
- GCancellable *cancellable);
+static void run_task_in_thread_pool_async (GThreadedResolver *self,
+ GTask *task);
+static void run_task_in_thread_pool_sync (GThreadedResolver *self,
+ GTask *task);
+static void threaded_resolver_worker_cb (gpointer task_data,
+ gpointer user_data);
static void
-g_threaded_resolver_init (GThreadedResolver *gtr)
+g_threaded_resolver_init (GThreadedResolver *self)
{
+ self->thread_pool = g_thread_pool_new_full (threaded_resolver_worker_cb,
+ self,
+ (GDestroyNotify) g_object_unref,
+ 20,
+ FALSE,
+ NULL);
+}
+
+static void
+g_threaded_resolver_finalize (GObject *object)
+{
+ GThreadedResolver *self = G_THREADED_RESOLVER (object);
+
+ g_thread_pool_free (self->thread_pool, TRUE, FALSE);
+ self->thread_pool = NULL;
+
+ G_OBJECT_CLASS (g_threaded_resolver_parent_class)->finalize (object);
}
static GResolverError
@@ -95,6 +116,23 @@ typedef struct {
GResolverRecordType record_type;
} lookup_records;
};
+
+ GCond cond; /* used for signalling completion of the task when running it sync */
+ GMutex lock;
+
+ /* This enum indicates that a particular code path has claimed the
+ * task and is shortly about to call g_task_return_*() on it.
+ * This must be accessed with GThreadedResolver.lock held. */
+ enum
+ {
+ NOT_YET,
+ COMPLETED, /* libc lookup call has completed successfully or errored */
+ } will_return;
+
+ /* Whether the thread pool thread executing this lookup has finished executing
+ * it and g_task_return_*() has been called on it already.
+ * This must be accessed with GThreadedResolver.lock held. */
+ gboolean has_returned;
} LookupData;
static LookupData *
@@ -103,6 +141,8 @@ lookup_data_new_by_name (const char *hostname,
{
LookupData *data = g_new0 (LookupData, 1);
data->lookup_type = LOOKUP_BY_NAME;
+ g_cond_init (&data->cond);
+ g_mutex_init (&data->lock);
data->lookup_by_name.hostname = g_strdup (hostname);
data->lookup_by_name.address_family = address_family;
return g_steal_pointer (&data);
@@ -113,6 +153,8 @@ lookup_data_new_by_address (GInetAddress *address)
{
LookupData *data = g_new0 (LookupData, 1);
data->lookup_type = LOOKUP_BY_ADDRESS;
+ g_cond_init (&data->cond);
+ g_mutex_init (&data->lock);
data->lookup_by_address.address = g_object_ref (address);
return g_steal_pointer (&data);
}
@@ -123,6 +165,8 @@ lookup_data_new_records (const gchar *rrname,
{
LookupData *data = g_new0 (LookupData, 1);
data->lookup_type = LOOKUP_RECORDS;
+ g_cond_init (&data->cond);
+ g_mutex_init (&data->lock);
data->lookup_records.rrname = g_strdup (rrname);
data->lookup_records.record_type = record_type;
return g_steal_pointer (&data);
@@ -145,6 +189,9 @@ lookup_data_free (LookupData *data)
g_assert_not_reached ();
}
+ g_mutex_clear (&data->lock);
+ g_cond_clear (&data->cond);
+
g_free (data);
}
@@ -243,6 +290,7 @@ lookup_by_name (GResolver *resolver,
GCancellable *cancellable,
GError **error)
{
+ GThreadedResolver *self = G_THREADED_RESOLVER (resolver);
GTask *task;
GList *addresses;
LookupData *data;
@@ -252,8 +300,9 @@ lookup_by_name (GResolver *resolver,
g_task_set_source_tag (task, lookup_by_name);
g_task_set_name (task, "[gio] resolver lookup");
g_task_set_task_data (task, g_steal_pointer (&data), (GDestroyNotify) lookup_data_free);
- g_task_set_return_on_cancel (task, TRUE);
- g_task_run_in_thread_sync (task, threaded_resolver_worker_cb);
+
+ run_task_in_thread_pool_sync (self, task);
+
addresses = g_task_propagate_pointer (task, error);
g_object_unref (task);
@@ -285,6 +334,7 @@ lookup_by_name_with_flags (GResolver *resolver,
GCancellable *cancellable,
GError **error)
{
+ GThreadedResolver *self = G_THREADED_RESOLVER (resolver);
GTask *task;
GList *addresses;
LookupData *data;
@@ -294,8 +344,9 @@ lookup_by_name_with_flags (GResolver *resolver,
g_task_set_source_tag (task, lookup_by_name_with_flags);
g_task_set_name (task, "[gio] resolver lookup");
g_task_set_task_data (task, g_steal_pointer (&data), (GDestroyNotify) lookup_data_free);
- g_task_set_return_on_cancel (task, TRUE);
- g_task_run_in_thread_sync (task, threaded_resolver_worker_cb);
+
+ run_task_in_thread_pool_sync (self, task);
+
addresses = g_task_propagate_pointer (task, error);
g_object_unref (task);
@@ -310,6 +361,7 @@ lookup_by_name_with_flags_async (GResolver *resolver,
GAsyncReadyCallback callback,
gpointer user_data)
{
+ GThreadedResolver *self = G_THREADED_RESOLVER (resolver);
GTask *task;
LookupData *data;
@@ -322,8 +374,9 @@ lookup_by_name_with_flags_async (GResolver *resolver,
g_task_set_source_tag (task, lookup_by_name_with_flags_async);
g_task_set_name (task, "[gio] resolver lookup");
g_task_set_task_data (task, g_steal_pointer (&data), (GDestroyNotify) lookup_data_free);
- g_task_set_return_on_cancel (task, TRUE);
- g_task_run_in_thread (task, threaded_resolver_worker_cb);
+
+ run_task_in_thread_pool_async (self, task);
+
g_object_unref (task);
}
@@ -415,6 +468,7 @@ lookup_by_address (GResolver *resolver,
GCancellable *cancellable,
GError **error)
{
+ GThreadedResolver *self = G_THREADED_RESOLVER (resolver);
LookupData *data = NULL;
GTask *task;
gchar *name;
@@ -424,8 +478,9 @@ lookup_by_address (GResolver *resolver,
g_task_set_source_tag (task, lookup_by_address);
g_task_set_name (task, "[gio] resolver lookup");
g_task_set_task_data (task, g_steal_pointer (&data), (GDestroyNotify) lookup_data_free);
- g_task_set_return_on_cancel (task, TRUE);
- g_task_run_in_thread_sync (task, threaded_resolver_worker_cb);
+
+ run_task_in_thread_pool_sync (self, task);
+
name = g_task_propagate_pointer (task, error);
g_object_unref (task);
@@ -439,6 +494,7 @@ lookup_by_address_async (GResolver *resolver,
GAsyncReadyCallback callback,
gpointer user_data)
{
+ GThreadedResolver *self = G_THREADED_RESOLVER (resolver);
LookupData *data = NULL;
GTask *task;
@@ -447,8 +503,9 @@ lookup_by_address_async (GResolver *resolver,
g_task_set_source_tag (task, lookup_by_address_async);
g_task_set_name (task, "[gio] resolver lookup");
g_task_set_task_data (task, g_steal_pointer (&data), (GDestroyNotify) lookup_data_free);
- g_task_set_return_on_cancel (task, TRUE);
- g_task_run_in_thread (task, threaded_resolver_worker_cb);
+
+ run_task_in_thread_pool_async (self, task);
+
g_object_unref (task);
}
@@ -1247,6 +1304,7 @@ lookup_records (GResolver *resolver,
GCancellable *cancellable,
GError **error)
{
+ GThreadedResolver *self = G_THREADED_RESOLVER (resolver);
GTask *task;
GList *records;
LookupData *data = NULL;
@@ -1258,8 +1316,8 @@ lookup_records (GResolver *resolver,
data = lookup_data_new_records (rrname, record_type);
g_task_set_task_data (task, g_steal_pointer (&data), (GDestroyNotify) lookup_data_free);
- g_task_set_return_on_cancel (task, TRUE);
- g_task_run_in_thread_sync (task, threaded_resolver_worker_cb);
+ run_task_in_thread_pool_sync (self, task);
+
records = g_task_propagate_pointer (task, error);
g_object_unref (task);
@@ -1274,6 +1332,7 @@ lookup_records_async (GResolver *resolver,
GAsyncReadyCallback callback,
gpointer user_data)
{
+ GThreadedResolver *self = G_THREADED_RESOLVER (resolver);
GTask *task;
LookupData *data = NULL;
@@ -1284,8 +1343,8 @@ lookup_records_async (GResolver *resolver,
data = lookup_data_new_records (rrname, record_type);
g_task_set_task_data (task, g_steal_pointer (&data), (GDestroyNotify) lookup_data_free);
- g_task_set_return_on_cancel (task, TRUE);
- g_task_run_in_thread (task, threaded_resolver_worker_cb);
+ run_task_in_thread_pool_async (self, task);
+
g_object_unref (task);
}
@@ -1300,13 +1359,41 @@ lookup_records_finish (GResolver *resolver,
}
static void
-threaded_resolver_worker_cb (GTask *task,
- gpointer source_object,
- gpointer task_data,
- GCancellable *cancellable)
+run_task_in_thread_pool_async (GThreadedResolver *self,
+ GTask *task)
+{
+ LookupData *data = g_task_get_task_data (task);
+
+ g_mutex_lock (&data->lock);
+
+ g_thread_pool_push (self->thread_pool, g_object_ref (task), NULL);
+
+ g_mutex_unlock (&data->lock);
+}
+
+static void
+run_task_in_thread_pool_sync (GThreadedResolver *self,
+ GTask *task)
+{
+ LookupData *data = g_task_get_task_data (task);
+
+ run_task_in_thread_pool_async (self, task);
+
+ g_mutex_lock (&data->lock);
+ while (!data->has_returned)
+ g_cond_wait (&data->cond, &data->lock);
+ g_mutex_unlock (&data->lock);
+}
+
+static void
+threaded_resolver_worker_cb (gpointer task_data,
+ gpointer user_data)
{
- LookupData *data = task_data;
+ GTask *task = G_TASK (g_steal_pointer (&task_data));
+ LookupData *data = g_task_get_task_data (task);
+ GCancellable *cancellable = g_task_get_cancellable (task);
GError *local_error = NULL;
+ gboolean should_return;
switch (data->lookup_type) {
case LOOKUP_BY_NAME:
@@ -1316,10 +1403,19 @@ threaded_resolver_worker_cb (GTask *task,
cancellable,
&local_error);
- if (addresses != NULL)
- g_task_return_pointer (task, g_steal_pointer (&addresses), (GDestroyNotify) g_resolver_free_addresses);
- else
- g_task_return_error (task, g_steal_pointer (&local_error));
+ g_mutex_lock (&data->lock);
+ should_return = g_atomic_int_compare_and_exchange (&data->will_return, NOT_YET, COMPLETED);
+ g_mutex_unlock (&data->lock);
+
+ if (should_return)
+ {
+ if (addresses != NULL)
+ g_task_return_pointer (task, g_steal_pointer (&addresses), (GDestroyNotify) g_resolver_free_addresses);
+ else
+ g_task_return_error (task, g_steal_pointer (&local_error));
+ }
+
+ g_clear_pointer (&addresses, g_resolver_free_addresses);
}
break;
case LOOKUP_BY_ADDRESS:
@@ -1328,10 +1424,19 @@ threaded_resolver_worker_cb (GTask *task,
cancellable,
&local_error);
- if (name != NULL)
- g_task_return_pointer (task, g_steal_pointer (&name), g_free);
- else
- g_task_return_error (task, g_steal_pointer (&local_error));
+ g_mutex_lock (&data->lock);
+ should_return = g_atomic_int_compare_and_exchange (&data->will_return, NOT_YET, COMPLETED);
+ g_mutex_unlock (&data->lock);
+
+ if (should_return)
+ {
+ if (name != NULL)
+ g_task_return_pointer (task, g_steal_pointer (&name), g_free);
+ else
+ g_task_return_error (task, g_steal_pointer (&local_error));
+ }
+
+ g_clear_pointer (&name, g_free);
}
break;
case LOOKUP_RECORDS:
@@ -1341,22 +1446,43 @@ threaded_resolver_worker_cb (GTask *task,
cancellable,
&local_error);
- if (records != NULL)
- g_task_return_pointer (task, g_steal_pointer (&records), (GDestroyNotify) free_records);
- else
- g_task_return_error (task, g_steal_pointer (&local_error));
+ g_mutex_lock (&data->lock);
+ should_return = g_atomic_int_compare_and_exchange (&data->will_return, NOT_YET, COMPLETED);
+ g_mutex_unlock (&data->lock);
+
+ if (should_return)
+ {
+ if (records != NULL)
+ g_task_return_pointer (task, g_steal_pointer (&records), (GDestroyNotify) free_records);
+ else
+ g_task_return_error (task, g_steal_pointer (&local_error));
+ }
+
+ g_clear_pointer (&records, free_records);
}
break;
default:
g_assert_not_reached ();
}
+
+ /* Signal completion of a task. */
+ g_mutex_lock (&data->lock);
+ g_assert (!data->has_returned);
+ data->has_returned = TRUE;
+ g_cond_broadcast (&data->cond);
+ g_mutex_unlock (&data->lock);
+
+ g_object_unref (task);
}
static void
g_threaded_resolver_class_init (GThreadedResolverClass *threaded_class)
{
+ GObjectClass *object_class = G_OBJECT_CLASS (threaded_class);
GResolverClass *resolver_class = G_RESOLVER_CLASS (threaded_class);
+ object_class->finalize = g_threaded_resolver_finalize;
+
resolver_class->lookup_by_name = lookup_by_name;
resolver_class->lookup_by_name_async = lookup_by_name_async;
resolver_class->lookup_by_name_finish = lookup_by_name_finish;