diff options
author | Philip Withnall <philip@tecnocode.co.uk> | 2023-04-27 12:46:53 +0000 |
---|---|---|
committer | Philip Withnall <philip@tecnocode.co.uk> | 2023-04-27 12:46:53 +0000 |
commit | 7922d3200cabd4c6ddb85e020b68698bbcc85ffd (patch) | |
tree | 761577670ac0277898599b1e8a548223c991ca95 | |
parent | 6c1cbdff639ba62e482c6fbe59d526c4801ef695 (diff) | |
parent | e73c5ed5e1754205122a7f17838525052fc83e84 (diff) | |
download | glib-7922d3200cabd4c6ddb85e020b68698bbcc85ffd.tar.gz |
Merge branch 'resolver-thread-pool' into 'main'
gthreadedresolver: Switch to using a separate thread pool and support timeouts
See merge request GNOME/glib!3397
-rw-r--r-- | docs/reference/gio/gio-sections-common.txt | 2 | ||||
-rw-r--r-- | gio/gresolver.c | 124 | ||||
-rw-r--r-- | gio/gresolver.h | 5 | ||||
-rw-r--r-- | gio/gthreadedresolver.c | 588 | ||||
-rw-r--r-- | gio/gthreadedresolver.h | 25 | ||||
-rw-r--r-- | gio/tests/resolver.c | 16 |
6 files changed, 626 insertions, 134 deletions
diff --git a/docs/reference/gio/gio-sections-common.txt b/docs/reference/gio/gio-sections-common.txt index 63d167678..f5737ef42 100644 --- a/docs/reference/gio/gio-sections-common.txt +++ b/docs/reference/gio/gio-sections-common.txt @@ -1934,6 +1934,8 @@ g_resolver_lookup_service g_resolver_lookup_service_async g_resolver_lookup_service_finish g_resolver_free_targets +g_resolver_get_timeout +g_resolver_set_timeout <SUBSECTION> GResolverRecordType g_resolver_lookup_records diff --git a/gio/gresolver.c b/gio/gresolver.c index 6a735e8d9..676f1e271 100644 --- a/gio/gresolver.c +++ b/gio/gresolver.c @@ -55,8 +55,18 @@ * #GNetworkAddress and #GNetworkService provide wrappers around * #GResolver functionality that also implement #GSocketConnectable, * making it easy to connect to a remote host/service. + * + * The default resolver (see g_resolver_get_default()) has a timeout of 30s set + * on it since GLib 2.78. Earlier versions of GLib did not support resolver + * timeouts. */ +typedef enum { + PROP_TIMEOUT = 1, +} GResolverProperty; + +static GParamSpec *props[PROP_TIMEOUT + 1] = { NULL, }; + enum { RELOAD, LAST_SIGNAL @@ -65,11 +75,11 @@ enum { static guint signals[LAST_SIGNAL] = { 0 }; struct _GResolverPrivate { + unsigned timeout_ms; + #ifdef G_OS_UNIX GMutex mutex; time_t resolv_conf_timestamp; /* protected by @mutex */ -#else - int dummy; #endif }; @@ -152,6 +162,42 @@ g_resolver_real_lookup_service_finish (GResolver *resolver, } static void +g_resolver_get_property (GObject *object, + guint prop_id, + GValue *value, + GParamSpec *pspec) +{ + GResolver *self = G_RESOLVER (object); + + switch ((GResolverProperty) prop_id) + { + case PROP_TIMEOUT: + g_value_set_uint (value, g_resolver_get_timeout (self)); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + } +} + +static void +g_resolver_set_property (GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + GResolver *self = G_RESOLVER (object); + + switch ((GResolverProperty) prop_id) + { + case PROP_TIMEOUT: + g_resolver_set_timeout (self, g_value_get_uint (value)); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + } +} + +static void g_resolver_finalize (GObject *object) { #ifdef G_OS_UNIX @@ -168,6 +214,8 @@ g_resolver_class_init (GResolverClass *resolver_class) { GObjectClass *object_class = G_OBJECT_CLASS (resolver_class); + object_class->get_property = g_resolver_get_property; + object_class->set_property = g_resolver_set_property; object_class->finalize = g_resolver_finalize; /* Automatically pass these over to the lookup_records methods */ @@ -176,6 +224,31 @@ g_resolver_class_init (GResolverClass *resolver_class) resolver_class->lookup_service_finish = g_resolver_real_lookup_service_finish; /** + * GResolver:timeout: + * + * The timeout applied to all resolver lookups, in milliseconds. + * + * This may be changed through the lifetime of the #GResolver. The new value + * will apply to any lookups started after the change, but not to any + * already-ongoing lookups. + * + * If this is `0`, no timeout is applied to lookups. + * + * No timeout was applied to lookups before this property was added in + * GLib 2.78. + * + * Since: 2.78 + */ + props[PROP_TIMEOUT] = + g_param_spec_uint ("timeout", + P_("Timeout"), + P_("Timeout (ms) applied to all resolver lookups"), + 0, G_MAXUINT, 0, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_EXPLICIT_NOTIFY); + + g_object_class_install_properties (object_class, G_N_ELEMENTS (props), props); + + /** * GResolver::reload: * @resolver: a #GResolver * @@ -230,7 +303,9 @@ g_resolver_get_default (void) G_LOCK (default_resolver); if (!default_resolver) - default_resolver = g_object_new (G_TYPE_THREADED_RESOLVER, NULL); + default_resolver = g_object_new (G_TYPE_THREADED_RESOLVER, + "timeout", 30000, + NULL); ret = g_object_ref (default_resolver); G_UNLOCK (default_resolver); @@ -1245,6 +1320,49 @@ g_resolver_get_serial (GResolver *resolver) } /** + * g_resolver_get_timeout: + * @resolver: a #GResolver + * + * Get the timeout applied to all resolver lookups. See #GResolver:timeout. + * + * Returns: the resolver timeout, in milliseconds, or `0` for no timeout + * Since: 2.78 + */ +unsigned +g_resolver_get_timeout (GResolver *resolver) +{ + GResolverPrivate *priv = g_resolver_get_instance_private (resolver); + + g_return_val_if_fail (G_IS_RESOLVER (resolver), 0); + + return priv->timeout_ms; +} + +/** + * g_resolver_set_timeout: + * @resolver: a #GResolver + * @timeout_ms: timeout in milliseconds, or `0` for no timeouts + * + * Set the timeout applied to all resolver lookups. See #GResolver:timeout. + * + * Since: 2.78 + */ +void +g_resolver_set_timeout (GResolver *resolver, + unsigned timeout_ms) +{ + GResolverPrivate *priv = g_resolver_get_instance_private (resolver); + + g_return_if_fail (G_IS_RESOLVER (resolver)); + + if (priv->timeout_ms == timeout_ms) + return; + + priv->timeout_ms = timeout_ms; + g_object_notify_by_pspec (G_OBJECT (resolver), props[PROP_TIMEOUT]); +} + +/** * g_resolver_error_quark: * * Gets the #GResolver Error Quark. diff --git a/gio/gresolver.h b/gio/gresolver.h index 2dffeadbf..9b9a8a81a 100644 --- a/gio/gresolver.h +++ b/gio/gresolver.h @@ -277,6 +277,11 @@ GList *g_resolver_lookup_records_finish (GResolver GIO_AVAILABLE_IN_ALL void g_resolver_free_targets (GList *targets); +GIO_AVAILABLE_IN_2_78 +unsigned g_resolver_get_timeout (GResolver *resolver); +GIO_AVAILABLE_IN_2_78 +void g_resolver_set_timeout (GResolver *resolver, + unsigned timeout_ms); /** * G_RESOLVER_ERROR: diff --git a/gio/gthreadedresolver.c b/gio/gthreadedresolver.c index 68b5c20d7..b452d1e1b 100644 --- a/gio/gthreadedresolver.c +++ b/gio/gthreadedresolver.c @@ -28,6 +28,7 @@ #include <stdio.h> #include <string.h> +#include "glib/glib-private.h" #include "gthreadedresolver.h" #include "gnetworkingprivate.h" @@ -38,12 +39,83 @@ #include "gsocketaddress.h" #include "gsrvtarget.h" +/* + * GThreadedResolver is a threaded wrapper around the system libc’s + * `getaddrinfo()`. + * + * It has to be threaded, as `getaddrinfo()` is synchronous. libc does provide + * `getaddrinfo_a()` as an asynchronous version of `getaddrinfo()`, but it does + * not integrate with a poll loop. It requires use of sigevent to notify of + * completion of an asynchronous operation. That either emits a signal, or calls + * a callback function in a newly spawned thread. + * + * A signal (`SIGEV_SIGNAL`) can’t be used for completion as (aside from being + * another expensive round trip into the kernel) GLib cannot pick a `SIG*` + * number which is guaranteed to not be in use elsewhere in the process. Various + * other things could be interfering with signal dispositions, such as gdb or + * other libraries in the process. Using a `signalfd()` + * [cannot improve this situation](https://ldpreload.com/blog/signalfd-is-useless). + * + * A callback function in a newly spawned thread (`SIGEV_THREAD`) could be used, + * but that is very expensive. Internally, glibc currently also just implements + * `getaddrinfo_a()` + * [using its own thread pool](https://github.com/bminor/glibc/blob/master/resolv/gai_misc.c), + * and then + * [spawns an additional thread for each completion callback](https://github.com/bminor/glibc/blob/master/resolv/gai_notify.c). + * That is very expensive. + * + * No other appropriate sigevent callback types + * [currently exist](https://sourceware.org/bugzilla/show_bug.cgi?id=30287), and + * [others agree that sigevent is not great](http://davmac.org/davpage/linux/async-io.html#posixaio). + * + * Hence, #GThreadedResolver calls the normal synchronous `getaddrinfo()` in its + * own thread pool. Previously, #GThreadedResolver used the thread pool which is + * internal to #GTask by calling g_task_run_in_thread(). That lead to exhaustion + * of the #GTask thread pool in some situations, though, as DNS lookups are + * quite frequent leaf operations in some use cases. Now, #GThreadedResolver + * uses its own private thread pool. + * + * This is similar to what + * [libasyncns](http://git.0pointer.net/libasyncns.git/tree/libasyncns/asyncns.h) + * and other multi-threaded users of `getaddrinfo()` do. + */ + +struct _GThreadedResolver +{ + GResolver parent_instance; + + GThreadPool *thread_pool; /* (owned) */ +}; G_DEFINE_TYPE (GThreadedResolver, g_threaded_resolver, G_TYPE_RESOLVER) +static void run_task_in_thread_pool_async (GThreadedResolver *self, + GTask *task); +static void run_task_in_thread_pool_sync (GThreadedResolver *self, + GTask *task); +static void threaded_resolver_worker_cb (gpointer task_data, + gpointer user_data); + static void -g_threaded_resolver_init (GThreadedResolver *gtr) +g_threaded_resolver_init (GThreadedResolver *self) { + self->thread_pool = g_thread_pool_new_full (threaded_resolver_worker_cb, + self, + (GDestroyNotify) g_object_unref, + 20, + FALSE, + NULL); +} + +static void +g_threaded_resolver_finalize (GObject *object) +{ + GThreadedResolver *self = G_THREADED_RESOLVER (object); + + g_thread_pool_free (self->thread_pool, TRUE, FALSE); + self->thread_pool = NULL; + + G_OBJECT_CLASS (g_threaded_resolver_parent_class)->finalize (object); } static GResolverError @@ -67,35 +139,127 @@ g_resolver_error_from_addrinfo_error (gint err) } typedef struct { - char *hostname; - int address_family; + enum { + LOOKUP_BY_NAME, + LOOKUP_BY_ADDRESS, + LOOKUP_RECORDS, + } lookup_type; + + union { + struct { + char *hostname; + int address_family; + } lookup_by_name; + struct { + GInetAddress *address; /* (owned) */ + } lookup_by_address; + struct { + char *rrname; + GResolverRecordType record_type; + } lookup_records; + }; + + GCond cond; /* used for signalling completion of the task when running it sync */ + GMutex lock; + + GSource *timeout_source; /* (nullable) (owned) */ + GSource *cancellable_source; /* (nullable) (owned) */ + + /* This enum indicates that a particular code path has claimed the + * task and is shortly about to call g_task_return_*() on it. + * This must be accessed with GThreadedResolver.lock held. */ + enum + { + NOT_YET, + COMPLETED, /* libc lookup call has completed successfully or errored */ + TIMED_OUT, + CANCELLED, + } will_return; + + /* Whether the thread pool thread executing this lookup has finished executing + * it and g_task_return_*() has been called on it already. + * This must be accessed with GThreadedResolver.lock held. */ + gboolean has_returned; } LookupData; static LookupData * -lookup_data_new (const char *hostname, - int address_family) +lookup_data_new_by_name (const char *hostname, + int address_family) { - LookupData *data = g_new (LookupData, 1); - data->hostname = g_strdup (hostname); - data->address_family = address_family; - return data; + LookupData *data = g_new0 (LookupData, 1); + data->lookup_type = LOOKUP_BY_NAME; + g_cond_init (&data->cond); + g_mutex_init (&data->lock); + data->lookup_by_name.hostname = g_strdup (hostname); + data->lookup_by_name.address_family = address_family; + return g_steal_pointer (&data); +} + +static LookupData * +lookup_data_new_by_address (GInetAddress *address) +{ + LookupData *data = g_new0 (LookupData, 1); + data->lookup_type = LOOKUP_BY_ADDRESS; + g_cond_init (&data->cond); + g_mutex_init (&data->lock); + data->lookup_by_address.address = g_object_ref (address); + return g_steal_pointer (&data); +} + +static LookupData * +lookup_data_new_records (const gchar *rrname, + GResolverRecordType record_type) +{ + LookupData *data = g_new0 (LookupData, 1); + data->lookup_type = LOOKUP_RECORDS; + g_cond_init (&data->cond); + g_mutex_init (&data->lock); + data->lookup_records.rrname = g_strdup (rrname); + data->lookup_records.record_type = record_type; + return g_steal_pointer (&data); } static void lookup_data_free (LookupData *data) { - g_free (data->hostname); + switch (data->lookup_type) { + case LOOKUP_BY_NAME: + g_free (data->lookup_by_name.hostname); + break; + case LOOKUP_BY_ADDRESS: + g_clear_object (&data->lookup_by_address.address); + break; + case LOOKUP_RECORDS: + g_free (data->lookup_records.rrname); + break; + default: + g_assert_not_reached (); + } + + if (data->timeout_source != NULL) + { + g_source_destroy (data->timeout_source); + g_clear_pointer (&data->timeout_source, g_source_unref); + } + + if (data->cancellable_source != NULL) + { + g_source_destroy (data->cancellable_source); + g_clear_pointer (&data->cancellable_source, g_source_unref); + } + + g_mutex_clear (&data->lock); + g_cond_clear (&data->cond); + g_free (data); } -static void -do_lookup_by_name (GTask *task, - gpointer source_object, - gpointer task_data, - GCancellable *cancellable) +static GList * +do_lookup_by_name (const gchar *hostname, + int address_family, + GCancellable *cancellable, + GError **error) { - LookupData *lookup_data = task_data; - const char *hostname = lookup_data->hostname; struct addrinfo *res = NULL; GList *addresses; gint retval; @@ -111,7 +275,7 @@ do_lookup_by_name (GTask *task, addrinfo_hints.ai_socktype = SOCK_STREAM; addrinfo_hints.ai_protocol = IPPROTO_TCP; - addrinfo_hints.ai_family = lookup_data->address_family; + addrinfo_hints.ai_family = address_family; retval = getaddrinfo (hostname, NULL, &addrinfo_hints, &res); if (retval == 0) @@ -137,21 +301,23 @@ do_lookup_by_name (GTask *task, g_object_unref (sockaddr); } + g_clear_pointer (&res, freeaddrinfo); + if (addresses != NULL) { addresses = g_list_reverse (addresses); - g_task_return_pointer (task, addresses, - (GDestroyNotify)g_resolver_free_addresses); + return g_steal_pointer (&addresses); } else { /* All addresses failed to be converted to GSocketAddresses. */ - g_task_return_new_error (task, - G_RESOLVER_ERROR, - G_RESOLVER_ERROR_NOT_FOUND, - _("Error resolving “%s”: %s"), - hostname, - _("No valid addresses were found")); + g_set_error (error, + G_RESOLVER_ERROR, + G_RESOLVER_ERROR_NOT_FOUND, + _("Error resolving “%s”: %s"), + hostname, + _("No valid addresses were found")); + return NULL; } } else @@ -164,16 +330,17 @@ do_lookup_by_name (GTask *task, error_message = g_strdup ("[Invalid UTF-8]"); #endif - g_task_return_new_error (task, - G_RESOLVER_ERROR, - g_resolver_error_from_addrinfo_error (retval), - _("Error resolving “%s”: %s"), - hostname, error_message); + g_clear_pointer (&res, freeaddrinfo); + + g_set_error (error, + G_RESOLVER_ERROR, + g_resolver_error_from_addrinfo_error (retval), + _("Error resolving “%s”: %s"), + hostname, error_message); g_free (error_message); - } - if (res) - freeaddrinfo (res); + return NULL; + } } static GList * @@ -182,17 +349,19 @@ lookup_by_name (GResolver *resolver, GCancellable *cancellable, GError **error) { + GThreadedResolver *self = G_THREADED_RESOLVER (resolver); GTask *task; GList *addresses; LookupData *data; - data = lookup_data_new (hostname, AF_UNSPEC); + data = lookup_data_new_by_name (hostname, AF_UNSPEC); task = g_task_new (resolver, cancellable, NULL, NULL); g_task_set_source_tag (task, lookup_by_name); g_task_set_name (task, "[gio] resolver lookup"); - g_task_set_task_data (task, data, (GDestroyNotify)lookup_data_free); - g_task_set_return_on_cancel (task, TRUE); - g_task_run_in_thread_sync (task, do_lookup_by_name); + g_task_set_task_data (task, g_steal_pointer (&data), (GDestroyNotify) lookup_data_free); + + run_task_in_thread_pool_sync (self, task); + addresses = g_task_propagate_pointer (task, error); g_object_unref (task); @@ -224,17 +393,19 @@ lookup_by_name_with_flags (GResolver *resolver, GCancellable *cancellable, GError **error) { + GThreadedResolver *self = G_THREADED_RESOLVER (resolver); GTask *task; GList *addresses; LookupData *data; - data = lookup_data_new (hostname, flags_to_family (flags)); + data = lookup_data_new_by_name (hostname, flags_to_family (flags)); task = g_task_new (resolver, cancellable, NULL, NULL); g_task_set_source_tag (task, lookup_by_name_with_flags); g_task_set_name (task, "[gio] resolver lookup"); - g_task_set_task_data (task, data, (GDestroyNotify)lookup_data_free); - g_task_set_return_on_cancel (task, TRUE); - g_task_run_in_thread_sync (task, do_lookup_by_name); + g_task_set_task_data (task, g_steal_pointer (&data), (GDestroyNotify) lookup_data_free); + + run_task_in_thread_pool_sync (self, task); + addresses = g_task_propagate_pointer (task, error); g_object_unref (task); @@ -249,16 +420,22 @@ lookup_by_name_with_flags_async (GResolver *resolver, GAsyncReadyCallback callback, gpointer user_data) { + GThreadedResolver *self = G_THREADED_RESOLVER (resolver); GTask *task; LookupData *data; - data = lookup_data_new (hostname, flags_to_family (flags)); + data = lookup_data_new_by_name (hostname, flags_to_family (flags)); task = g_task_new (resolver, cancellable, callback, user_data); + + g_debug ("%s: starting new lookup for %s with GTask %p, LookupData %p", + G_STRFUNC, hostname, task, data); + g_task_set_source_tag (task, lookup_by_name_with_flags_async); g_task_set_name (task, "[gio] resolver lookup"); - g_task_set_task_data (task, data, (GDestroyNotify)lookup_data_free); - g_task_set_return_on_cancel (task, TRUE); - g_task_run_in_thread (task, do_lookup_by_name); + g_task_set_task_data (task, g_steal_pointer (&data), (GDestroyNotify) lookup_data_free); + + run_task_in_thread_pool_async (self, task); + g_object_unref (task); } @@ -297,13 +474,11 @@ lookup_by_name_with_flags_finish (GResolver *resolver, return g_task_propagate_pointer (G_TASK (result), error); } -static void -do_lookup_by_address (GTask *task, - gpointer source_object, - gpointer task_data, - GCancellable *cancellable) +static gchar * +do_lookup_by_address (GInetAddress *address, + GCancellable *cancellable, + GError **error) { - GInetAddress *address = task_data; struct sockaddr_storage sockaddr_address; gsize sockaddr_address_size; GSocketAddress *gsockaddr; @@ -319,7 +494,7 @@ do_lookup_by_address (GTask *task, retval = getnameinfo ((struct sockaddr *) &sockaddr_address, sockaddr_address_size, name, sizeof (name), NULL, 0, NI_NAMEREQD); if (retval == 0) - g_task_return_pointer (task, g_strdup (name), g_free); + return g_strdup (name); else { gchar *phys; @@ -333,14 +508,16 @@ do_lookup_by_address (GTask *task, #endif phys = g_inet_address_to_string (address); - g_task_return_new_error (task, - G_RESOLVER_ERROR, - g_resolver_error_from_addrinfo_error (retval), - _("Error reverse-resolving “%s”: %s"), - phys ? phys : "(unknown)", - error_message); + g_set_error (error, + G_RESOLVER_ERROR, + g_resolver_error_from_addrinfo_error (retval), + _("Error reverse-resolving “%s”: %s"), + phys ? phys : "(unknown)", + error_message); g_free (phys); g_free (error_message); + + return NULL; } } @@ -350,15 +527,19 @@ lookup_by_address (GResolver *resolver, GCancellable *cancellable, GError **error) { + GThreadedResolver *self = G_THREADED_RESOLVER (resolver); + LookupData *data = NULL; GTask *task; gchar *name; + data = lookup_data_new_by_address (address); task = g_task_new (resolver, cancellable, NULL, NULL); g_task_set_source_tag (task, lookup_by_address); g_task_set_name (task, "[gio] resolver lookup"); - g_task_set_task_data (task, g_object_ref (address), g_object_unref); - g_task_set_return_on_cancel (task, TRUE); - g_task_run_in_thread_sync (task, do_lookup_by_address); + g_task_set_task_data (task, g_steal_pointer (&data), (GDestroyNotify) lookup_data_free); + + run_task_in_thread_pool_sync (self, task); + name = g_task_propagate_pointer (task, error); g_object_unref (task); @@ -372,14 +553,18 @@ lookup_by_address_async (GResolver *resolver, GAsyncReadyCallback callback, gpointer user_data) { + GThreadedResolver *self = G_THREADED_RESOLVER (resolver); + LookupData *data = NULL; GTask *task; + data = lookup_data_new_by_address (address); task = g_task_new (resolver, cancellable, callback, user_data); g_task_set_source_tag (task, lookup_by_address_async); g_task_set_name (task, "[gio] resolver lookup"); - g_task_set_task_data (task, g_object_ref (address), g_object_unref); - g_task_set_return_on_cancel (task, TRUE); - g_task_run_in_thread (task, do_lookup_by_address); + g_task_set_task_data (task, g_steal_pointer (&data), (GDestroyNotify) lookup_data_free); + + run_task_in_thread_pool_async (self, task); + g_object_unref (task); } @@ -1066,18 +1251,6 @@ g_resolver_records_from_DnsQuery (const gchar *rrname, #endif -typedef struct { - char *rrname; - GResolverRecordType record_type; -} LookupRecordsData; - -static void -free_lookup_records_data (LookupRecordsData *lrd) -{ - g_free (lrd->rrname); - g_slice_free (LookupRecordsData, lrd); -} - static void free_records (GList *records) { @@ -1093,15 +1266,13 @@ int res_query(const char *, int, int, u_char *, int); #endif #endif -static void -do_lookup_records (GTask *task, - gpointer source_object, - gpointer task_data, - GCancellable *cancellable) +static GList * +do_lookup_records (const gchar *rrname, + GResolverRecordType record_type, + GCancellable *cancellable, + GError **error) { - LookupRecordsData *lrd = task_data; GList *records; - GError *error = NULL; #if defined(G_OS_UNIX) gint len = 512; @@ -1125,21 +1296,21 @@ do_lookup_records (GTask *task, struct __res_state res = { 0, }; if (res_ninit (&res) != 0) { - g_task_return_new_error (task, G_RESOLVER_ERROR, G_RESOLVER_ERROR_INTERNAL, - _("Error resolving “%s”"), lrd->rrname); - return; + g_set_error (error, G_RESOLVER_ERROR, G_RESOLVER_ERROR_INTERNAL, + _("Error resolving “%s”"), rrname); + return NULL; } #endif - rrtype = g_resolver_record_type_to_rrtype (lrd->record_type); + rrtype = g_resolver_record_type_to_rrtype (record_type); answer = g_byte_array_new (); for (;;) { g_byte_array_set_size (answer, len * 2); #if defined(HAVE_RES_NQUERY) - len = res_nquery (&res, lrd->rrname, C_IN, rrtype, answer->data, answer->len); + len = res_nquery (&res, rrname, C_IN, rrtype, answer->data, answer->len); #else - len = res_query (lrd->rrname, C_IN, rrtype, answer->data, answer->len); + len = res_query (rrname, C_IN, rrtype, answer->data, answer->len); #endif /* If answer fit in the buffer then we're done */ @@ -1153,7 +1324,7 @@ do_lookup_records (GTask *task, } herr = h_errno; - records = g_resolver_records_from_res_query (lrd->rrname, rrtype, answer->data, len, herr, &error); + records = g_resolver_records_from_res_query (rrname, rrtype, answer->data, len, herr, error); g_byte_array_free (answer, TRUE); #ifdef HAVE_RES_NQUERY @@ -1174,18 +1345,15 @@ do_lookup_records (GTask *task, DNS_RECORD *results = NULL; WORD dnstype; - dnstype = g_resolver_record_type_to_dnstype (lrd->record_type); - status = DnsQuery_A (lrd->rrname, dnstype, DNS_QUERY_STANDARD, NULL, &results, NULL); - records = g_resolver_records_from_DnsQuery (lrd->rrname, dnstype, status, results, &error); + dnstype = g_resolver_record_type_to_dnstype (record_type); + status = DnsQuery_A (rrname, dnstype, DNS_QUERY_STANDARD, NULL, &results, NULL); + records = g_resolver_records_from_DnsQuery (rrname, dnstype, status, results, error); if (results != NULL) DnsRecordListFree (results, DnsFreeRecordList); #endif - if (records) - g_task_return_pointer (task, records, (GDestroyNotify) free_records); - else - g_task_return_error (task, error); + return g_steal_pointer (&records); } static GList * @@ -1195,21 +1363,20 @@ lookup_records (GResolver *resolver, GCancellable *cancellable, GError **error) { + GThreadedResolver *self = G_THREADED_RESOLVER (resolver); GTask *task; GList *records; - LookupRecordsData *lrd; + LookupData *data = NULL; task = g_task_new (resolver, cancellable, NULL, NULL); g_task_set_source_tag (task, lookup_records); g_task_set_name (task, "[gio] resolver lookup records"); - lrd = g_slice_new (LookupRecordsData); - lrd->rrname = g_strdup (rrname); - lrd->record_type = record_type; - g_task_set_task_data (task, lrd, (GDestroyNotify) free_lookup_records_data); + data = lookup_data_new_records (rrname, record_type); + g_task_set_task_data (task, g_steal_pointer (&data), (GDestroyNotify) lookup_data_free); + + run_task_in_thread_pool_sync (self, task); - g_task_set_return_on_cancel (task, TRUE); - g_task_run_in_thread_sync (task, do_lookup_records); records = g_task_propagate_pointer (task, error); g_object_unref (task); @@ -1224,20 +1391,19 @@ lookup_records_async (GResolver *resolver, GAsyncReadyCallback callback, gpointer user_data) { + GThreadedResolver *self = G_THREADED_RESOLVER (resolver); GTask *task; - LookupRecordsData *lrd; + LookupData *data = NULL; task = g_task_new (resolver, cancellable, callback, user_data); g_task_set_source_tag (task, lookup_records_async); g_task_set_name (task, "[gio] resolver lookup records"); - lrd = g_slice_new (LookupRecordsData); - lrd->rrname = g_strdup (rrname); - lrd->record_type = record_type; - g_task_set_task_data (task, lrd, (GDestroyNotify) free_lookup_records_data); + data = lookup_data_new_records (rrname, record_type); + g_task_set_task_data (task, g_steal_pointer (&data), (GDestroyNotify) lookup_data_free); + + run_task_in_thread_pool_async (self, task); - g_task_set_return_on_cancel (task, TRUE); - g_task_run_in_thread (task, do_lookup_records); g_object_unref (task); } @@ -1251,12 +1417,210 @@ lookup_records_finish (GResolver *resolver, return g_task_propagate_pointer (G_TASK (result), error); } +/* Will be called in the GLib worker thread, so must lock all accesses to shared + * data. */ +static gboolean +timeout_cb (gpointer user_data) +{ + GTask *task = G_TASK (user_data); + LookupData *data = g_task_get_task_data (task); + gboolean should_return; + + g_mutex_lock (&data->lock); + + should_return = g_atomic_int_compare_and_exchange (&data->will_return, NOT_YET, TIMED_OUT); + g_clear_pointer (&data->timeout_source, g_source_unref); + + g_mutex_unlock (&data->lock); + + if (should_return) + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_TIMED_OUT, + _("Socket I/O timed out")); + + /* Signal completion of the task. */ + g_mutex_lock (&data->lock); + g_assert (!data->has_returned); + data->has_returned = TRUE; + g_cond_broadcast (&data->cond); + g_mutex_unlock (&data->lock); + + return G_SOURCE_REMOVE; +} + +/* Will be called in the GLib worker thread, so must lock all accesses to shared + * data. */ +static gboolean +cancelled_cb (GCancellable *cancellable, + gpointer user_data) +{ + GTask *task = G_TASK (user_data); + LookupData *data = g_task_get_task_data (task); + gboolean should_return; + + g_mutex_lock (&data->lock); + + g_assert (g_cancellable_is_cancelled (cancellable)); + should_return = g_atomic_int_compare_and_exchange (&data->will_return, NOT_YET, CANCELLED); + g_clear_pointer (&data->cancellable_source, g_source_unref); + + g_mutex_unlock (&data->lock); + + if (should_return) + g_task_return_error_if_cancelled (task); + + /* Signal completion of the task. */ + g_mutex_lock (&data->lock); + g_assert (!data->has_returned); + data->has_returned = TRUE; + g_cond_broadcast (&data->cond); + g_mutex_unlock (&data->lock); + + return G_SOURCE_REMOVE; +} + +static void +run_task_in_thread_pool_async (GThreadedResolver *self, + GTask *task) +{ + LookupData *data = g_task_get_task_data (task); + guint timeout_ms = g_resolver_get_timeout (G_RESOLVER (self)); + GCancellable *cancellable = g_task_get_cancellable (task); + + g_mutex_lock (&data->lock); + + g_thread_pool_push (self->thread_pool, g_object_ref (task), NULL); + + if (timeout_ms != 0) + { + data->timeout_source = g_timeout_source_new (timeout_ms); + g_source_set_static_name (data->timeout_source, "[gio] threaded resolver timeout"); + g_source_set_callback (data->timeout_source, G_SOURCE_FUNC (timeout_cb), task, NULL); + g_source_attach (data->timeout_source, GLIB_PRIVATE_CALL (g_get_worker_context) ()); + } + + if (cancellable != NULL) + { + data->cancellable_source = g_cancellable_source_new (cancellable); + g_source_set_static_name (data->cancellable_source, "[gio] threaded resolver cancellable"); + g_source_set_callback (data->cancellable_source, G_SOURCE_FUNC (cancelled_cb), task, NULL); + g_source_attach (data->cancellable_source, GLIB_PRIVATE_CALL (g_get_worker_context) ()); + } + + g_mutex_unlock (&data->lock); +} + +static void +run_task_in_thread_pool_sync (GThreadedResolver *self, + GTask *task) +{ + LookupData *data = g_task_get_task_data (task); + + run_task_in_thread_pool_async (self, task); + + g_mutex_lock (&data->lock); + while (!data->has_returned) + g_cond_wait (&data->cond, &data->lock); + g_mutex_unlock (&data->lock); +} + +static void +threaded_resolver_worker_cb (gpointer task_data, + gpointer user_data) +{ + GTask *task = G_TASK (g_steal_pointer (&task_data)); + LookupData *data = g_task_get_task_data (task); + GCancellable *cancellable = g_task_get_cancellable (task); + GError *local_error = NULL; + gboolean should_return; + + switch (data->lookup_type) { + case LOOKUP_BY_NAME: + { + GList *addresses = do_lookup_by_name (data->lookup_by_name.hostname, + data->lookup_by_name.address_family, + cancellable, + &local_error); + + g_mutex_lock (&data->lock); + should_return = g_atomic_int_compare_and_exchange (&data->will_return, NOT_YET, COMPLETED); + g_mutex_unlock (&data->lock); + + if (should_return) + { + if (addresses != NULL) + g_task_return_pointer (task, g_steal_pointer (&addresses), (GDestroyNotify) g_resolver_free_addresses); + else + g_task_return_error (task, g_steal_pointer (&local_error)); + } + + g_clear_pointer (&addresses, g_resolver_free_addresses); + } + break; + case LOOKUP_BY_ADDRESS: + { + gchar *name = do_lookup_by_address (data->lookup_by_address.address, + cancellable, + &local_error); + + g_mutex_lock (&data->lock); + should_return = g_atomic_int_compare_and_exchange (&data->will_return, NOT_YET, COMPLETED); + g_mutex_unlock (&data->lock); + + if (should_return) + { + if (name != NULL) + g_task_return_pointer (task, g_steal_pointer (&name), g_free); + else + g_task_return_error (task, g_steal_pointer (&local_error)); + } + + g_clear_pointer (&name, g_free); + } + break; + case LOOKUP_RECORDS: + { + GList *records = do_lookup_records (data->lookup_records.rrname, + data->lookup_records.record_type, + cancellable, + &local_error); + + g_mutex_lock (&data->lock); + should_return = g_atomic_int_compare_and_exchange (&data->will_return, NOT_YET, COMPLETED); + g_mutex_unlock (&data->lock); + + if (should_return) + { + if (records != NULL) + g_task_return_pointer (task, g_steal_pointer (&records), (GDestroyNotify) free_records); + else + g_task_return_error (task, g_steal_pointer (&local_error)); + } + + g_clear_pointer (&records, free_records); + } + break; + default: + g_assert_not_reached (); + } + + /* Signal completion of a task. */ + g_mutex_lock (&data->lock); + g_assert (!data->has_returned); + data->has_returned = TRUE; + g_cond_broadcast (&data->cond); + g_mutex_unlock (&data->lock); + + g_object_unref (task); +} static void g_threaded_resolver_class_init (GThreadedResolverClass *threaded_class) { + GObjectClass *object_class = G_OBJECT_CLASS (threaded_class); GResolverClass *resolver_class = G_RESOLVER_CLASS (threaded_class); + object_class->finalize = g_threaded_resolver_finalize; + resolver_class->lookup_by_name = lookup_by_name; resolver_class->lookup_by_name_async = lookup_by_name_async; resolver_class->lookup_by_name_finish = lookup_by_name_finish; diff --git a/gio/gthreadedresolver.h b/gio/gthreadedresolver.h index ea76e4bfd..099df5b84 100644 --- a/gio/gthreadedresolver.h +++ b/gio/gthreadedresolver.h @@ -21,28 +21,23 @@ #ifndef __G_THREADED_RESOLVER_H__ #define __G_THREADED_RESOLVER_H__ +#include <gio/gio.h> #include <gio/gresolver.h> G_BEGIN_DECLS +/** + * GThreadedResolver: + * + * #GThreadedResolver is an implementation of #GResolver which calls the libc + * lookup functions in threads to allow them to run asynchronously. + * + * Since: 2.20 + */ #define G_TYPE_THREADED_RESOLVER (g_threaded_resolver_get_type ()) -#define G_THREADED_RESOLVER(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), G_TYPE_THREADED_RESOLVER, GThreadedResolver)) -#define G_THREADED_RESOLVER_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), G_TYPE_THREADED_RESOLVER, GThreadedResolverClass)) -#define G_IS_THREADED_RESOLVER(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), G_TYPE_THREADED_RESOLVER)) -#define G_IS_THREADED_RESOLVER_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), G_TYPE_THREADED_RESOLVER)) -#define G_THREADED_RESOLVER_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), G_TYPE_THREADED_RESOLVER, GThreadedResolverClass)) - -typedef struct { - GResolver parent_instance; -} GThreadedResolver; - -typedef struct { - GResolverClass parent_class; - -} GThreadedResolverClass; GIO_AVAILABLE_IN_ALL -GType g_threaded_resolver_get_type (void) G_GNUC_CONST; +G_DECLARE_FINAL_TYPE (GThreadedResolver, g_threaded_resolver, G, THREADED_RESOLVER, GResolver) /* Used for a private test API */ #ifdef G_OS_UNIX diff --git a/gio/tests/resolver.c b/gio/tests/resolver.c index ec9b9e9de..b8adb4a36 100644 --- a/gio/tests/resolver.c +++ b/gio/tests/resolver.c @@ -41,6 +41,7 @@ static int nlookups = 0; static gboolean synchronous = FALSE; static guint connectable_count = 0; static GResolverRecordType record_type = 0; +static gint timeout_ms = 0; static G_NORETURN void usage (void) @@ -688,7 +689,7 @@ static gboolean async_cancel (GIOChannel *source, GIOCondition cond, gpointer cancel) { g_cancellable_cancel (cancel); - return FALSE; + return G_SOURCE_REMOVE; } #endif @@ -722,6 +723,7 @@ static const GOptionEntry option_entries[] = { { "synchronous", 's', 0, G_OPTION_ARG_NONE, &synchronous, "Synchronous connections", NULL }, { "connectable", 'c', 0, G_OPTION_ARG_INT, &connectable_count, "Connectable count", "C" }, { "special-type", 't', 0, G_OPTION_ARG_CALLBACK, record_type_arg, "Record type like MX, TXT, NS or SOA", "RR" }, + { "timeout", 0, 0, G_OPTION_ARG_INT, &timeout_ms, "Timeout (ms)", "ms" }, G_OPTION_ENTRY_NULL, }; @@ -732,7 +734,7 @@ main (int argc, char **argv) GError *error = NULL; #ifdef G_OS_UNIX GIOChannel *chan; - guint watch; + GSource *watch_source = NULL; #endif context = g_option_context_new ("lookups ..."); @@ -749,6 +751,9 @@ main (int argc, char **argv) resolver = g_resolver_get_default (); + if (timeout_ms != 0) + g_resolver_set_timeout (resolver, timeout_ms); + cancellable = g_cancellable_new (); #ifdef G_OS_UNIX @@ -763,7 +768,9 @@ main (int argc, char **argv) exit (1); } chan = g_io_channel_unix_new (cancel_fds[0]); - watch = g_io_add_watch (chan, G_IO_IN, async_cancel, cancellable); + watch_source = g_io_create_watch (chan, G_IO_IN); + g_source_set_callback (watch_source, (GSourceFunc) async_cancel, cancellable, NULL); + g_source_attach (watch_source, NULL); g_io_channel_unref (chan); #endif @@ -787,7 +794,8 @@ main (int argc, char **argv) g_main_loop_unref (loop); #ifdef G_OS_UNIX - g_source_remove (watch); + g_source_destroy (watch_source); + g_clear_pointer (&watch_source, g_source_unref); #endif g_object_unref (cancellable); g_option_context_free (context); |