diff options
author | Carlos Garcia Campos <cgarcia@igalia.com> | 2022-04-01 13:47:39 +0200 |
---|---|---|
committer | Carlos Garcia Campos <cgarcia@igalia.com> | 2022-06-08 12:36:17 +0200 |
commit | f3cdcf8dfb58864b1e8d040f52fa67410dfdfc93 (patch) | |
tree | c6072e2f4096306c6d9cacac93d0937ea6a3593b /libsoup/soup-session.c | |
parent | a81f7567dd344245dc077c105eb02ffd1ef32013 (diff) | |
download | libsoup-f3cdcf8dfb58864b1e8d040f52fa67410dfdfc93.tar.gz |
session: make message queue handling thread safe
Ensure that queue items are only processed in the same thread they were
created.
Diffstat (limited to 'libsoup/soup-session.c')
-rw-r--r-- | libsoup/soup-session.c | 178 |
1 files changed, 156 insertions, 22 deletions
diff --git a/libsoup/soup-session.c b/libsoup/soup-session.c index 9f40f8f8..349e563b 100644 --- a/libsoup/soup-session.c +++ b/libsoup/soup-session.c @@ -83,9 +83,12 @@ typedef struct { SoupSocketProperties *socket_props; + GMainContext *context; + GMutex queue_mutex; GQueue *queue; - GSource *queue_source; - guint16 in_async_run_queue; + GMutex queue_sources_mutex; + GHashTable *queue_sources; + guint in_async_run_queue; gboolean needs_queue_sort; char *user_agent; @@ -168,6 +171,7 @@ G_DEFINE_QUARK (soup-session-error-quark, soup_session_error) typedef struct { GSource source; SoupSession* session; + guint num_items; } SoupMessageQueueSource; static gboolean @@ -190,19 +194,89 @@ static GSourceFuncs queue_source_funcs = { }; static void +soup_session_add_queue_source (SoupSession *session, + GMainContext *context) +{ + SoupSessionPrivate *priv = soup_session_get_instance_private (session); + SoupMessageQueueSource *queue_source; + + queue_source = g_hash_table_lookup (priv->queue_sources, context); + if (!queue_source) { + GSource *source; + + source = g_source_new (&queue_source_funcs, sizeof (SoupMessageQueueSource)); + queue_source = (SoupMessageQueueSource *)source; + queue_source->session = session; + queue_source->num_items = 0; + g_source_set_name (source, "SoupMessageQueue"); + g_source_set_can_recurse (source, TRUE); + g_source_attach (source, context); + g_hash_table_insert (priv->queue_sources, context, source); + } + + queue_source->num_items++; + +} + +static void +soup_session_add_queue_source_for_item (SoupSession *session, + SoupMessageQueueItem *item) +{ + SoupSessionPrivate *priv = soup_session_get_instance_private (session); + + if (item->context == priv->context) + return; + + g_mutex_lock (&priv->queue_sources_mutex); + soup_session_add_queue_source (session, item->context); + g_mutex_unlock (&priv->queue_sources_mutex); +} + +static void +soup_session_remove_queue_source (SoupSession *session, + GMainContext *context) +{ + SoupSessionPrivate *priv = soup_session_get_instance_private (session); + SoupMessageQueueSource *queue_source; + + queue_source = g_hash_table_lookup (priv->queue_sources, context); + if (!queue_source) + return; + + if (--queue_source->num_items > 0) + return; + + g_source_destroy ((GSource *)queue_source); + g_hash_table_remove (priv->queue_sources, context); +} + +static void +soup_session_remove_queue_source_for_item (SoupSession *session, + SoupMessageQueueItem *item) +{ + SoupSessionPrivate *priv = soup_session_get_instance_private (session); + + if (item->context == priv->context) + return; + + g_mutex_lock (&priv->queue_sources_mutex); + soup_session_remove_queue_source (session, item->context); + g_mutex_unlock (&priv->queue_sources_mutex); +} + +static void soup_session_init (SoupSession *session) { SoupSessionPrivate *priv = soup_session_get_instance_private (session); SoupAuthManager *auth_manager; - SoupMessageQueueSource *source; + priv->context = g_main_context_ref_thread_default (); + g_mutex_init (&priv->queue_mutex); priv->queue = g_queue_new (); - priv->queue_source = g_source_new (&queue_source_funcs, sizeof (SoupMessageQueueSource)); - source = (SoupMessageQueueSource *)priv->queue_source; - source->session = session; - g_source_set_name (priv->queue_source, "SoupMessageQueue"); - g_source_set_can_recurse (priv->queue_source, TRUE); - g_source_attach (priv->queue_source, g_main_context_get_thread_default ()); + g_mutex_init (&priv->queue_sources_mutex); + priv->queue_sources = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify)g_source_unref); + soup_session_add_queue_source (session, priv->context); + priv->io_timeout = priv->idle_timeout = 60; priv->conn_manager = soup_connection_manager_new (session, @@ -230,11 +304,20 @@ soup_session_init (SoupSession *session) } static void +destroy_queue_source (gpointer key, + GSource *source) +{ + g_source_destroy (source); +} + +static void soup_session_dispose (GObject *object) { SoupSession *session = SOUP_SESSION (object); SoupSessionPrivate *priv = soup_session_get_instance_private (session); + g_assert (priv->context == soup_thread_default_context ()); + priv->disposed = TRUE; soup_session_abort (session); g_warn_if_fail (soup_connection_manager_get_num_conns (priv->conn_manager) == 0); @@ -242,7 +325,7 @@ soup_session_dispose (GObject *object) while (priv->features) soup_session_remove_feature (session, priv->features->data); - g_source_destroy (priv->queue_source); + g_hash_table_foreach (priv->queue_sources, (GHFunc)destroy_queue_source, NULL); G_OBJECT_CLASS (soup_session_parent_class)->dispose (object); } @@ -253,9 +336,14 @@ soup_session_finalize (GObject *object) SoupSession *session = SOUP_SESSION (object); SoupSessionPrivate *priv = soup_session_get_instance_private (session); + g_assert (priv->context == soup_thread_default_context ()); + g_warn_if_fail (g_queue_is_empty (priv->queue)); g_queue_free (priv->queue); - g_source_unref (priv->queue_source); + g_mutex_clear (&priv->queue_mutex); + g_hash_table_destroy (priv->queue_sources); + g_mutex_clear (&priv->queue_sources_mutex); + g_main_context_unref (priv->context); g_clear_pointer (&priv->conn_manager, soup_connection_manager_free); @@ -958,7 +1046,9 @@ soup_session_lookup_queue (SoupSession *session, SoupSessionPrivate *priv = soup_session_get_instance_private (session); GList *link; + g_mutex_lock (&priv->queue_mutex); link = g_queue_find_custom (priv->queue, data, compare_func); + g_mutex_unlock (&priv->queue_mutex); return link ? (SoupMessageQueueItem *)link->data : NULL; } @@ -1192,13 +1282,15 @@ message_priority_changed (SoupMessage *msg, { SoupSessionPrivate *priv = soup_session_get_instance_private (item->session); - if (priv->in_async_run_queue) { - priv->needs_queue_sort = TRUE; + if (g_atomic_int_get (&priv->in_async_run_queue)) { + g_atomic_int_set (&priv->needs_queue_sort, TRUE); return; } + g_mutex_lock (&priv->queue_mutex); g_queue_sort (priv->queue, (GCompareDataFunc)compare_queue_item, NULL); - priv->needs_queue_sort = FALSE; + g_mutex_unlock (&priv->queue_mutex); + g_atomic_int_set (&priv->needs_queue_sort, FALSE); } static SoupMessageQueueItem * @@ -1216,9 +1308,13 @@ soup_session_append_queue_item (SoupSession *session, soup_message_set_is_preconnect (msg, FALSE); item = soup_message_queue_item_new (session, msg, async, cancellable); + g_mutex_lock (&priv->queue_mutex); g_queue_insert_sorted (priv->queue, soup_message_queue_item_ref (item), (GCompareDataFunc)compare_queue_item, NULL); + g_mutex_unlock (&priv->queue_mutex); + + soup_session_add_queue_source_for_item (session, item); if (!soup_message_query_flags (msg, SOUP_MESSAGE_NO_REDIRECT)) { soup_message_add_header_handler ( @@ -1253,6 +1349,8 @@ soup_session_send_queue_item (SoupSession *session, SoupMessageHeaders *request_headers; const char *method; + g_assert (item->context == soup_thread_default_context ()); + request_headers = soup_message_get_request_headers (item->msg); if (priv->user_agent) soup_message_headers_replace_common (request_headers, SOUP_HEADER_USER_AGENT, priv->user_agent); @@ -1297,7 +1395,11 @@ soup_session_unqueue_item (SoupSession *session, return; } + g_mutex_lock (&priv->queue_mutex); g_queue_remove (priv->queue, item); + g_mutex_unlock (&priv->queue_mutex); + + soup_session_remove_queue_source_for_item (session, item); /* g_signal_handlers_disconnect_by_func doesn't work if you * have a metamarshal, meaning it doesn't work with @@ -1321,6 +1423,8 @@ message_completed (SoupMessage *msg, SoupMessageIOCompletion completion, gpointe { SoupMessageQueueItem *item = user_data; + g_assert (item->context == soup_thread_default_context ()); + if (item->async) soup_session_kick_queue (item->session); @@ -1389,6 +1493,8 @@ tunnel_message_completed (SoupMessage *msg, SoupMessageIOCompletion completion, SoupSession *session = tunnel_item->session; guint status; + g_assert (tunnel_item->context == soup_thread_default_context ()); + if (tunnel_item->state == SOUP_MESSAGE_REQUEUED) tunnel_item->state = SOUP_MESSAGE_RESTARTING; @@ -1583,6 +1689,7 @@ soup_session_process_queue_item (SoupSession *session, gboolean loop) { g_assert (item->session == session); + g_assert (item->context == soup_thread_default_context ()); do { if (item->paused) @@ -1660,33 +1767,49 @@ soup_session_process_queue_item (SoupSession *session, } static void -process_queue_item (SoupMessageQueueItem *item) +collect_queue_item (SoupMessageQueueItem *item, + GList **items) { if (!item->async) return; + if (item->context != soup_thread_default_context ()) + return; + /* CONNECT messages are handled specially */ if (soup_message_get_method (item->msg) == SOUP_METHOD_CONNECT) return; - soup_session_process_queue_item (item->session, item, TRUE); + *items = g_list_prepend (*items, item); } static void async_run_queue (SoupSession *session) { SoupSessionPrivate *priv = soup_session_get_instance_private (session); + GList *items = NULL; + GList *i; g_object_ref (session); - priv->in_async_run_queue++; + g_atomic_int_inc (&priv->in_async_run_queue); soup_connection_manager_cleanup (priv->conn_manager, FALSE); - g_queue_foreach (priv->queue, (GFunc)process_queue_item, NULL); + g_mutex_lock (&priv->queue_mutex); + g_queue_foreach (priv->queue, (GFunc)collect_queue_item, &items); + g_mutex_unlock (&priv->queue_mutex); - priv->in_async_run_queue--; - if (!priv->in_async_run_queue && priv->needs_queue_sort) { + for (i = g_list_reverse (items); i != NULL; i = g_list_next (i)) { + SoupMessageQueueItem *item = (SoupMessageQueueItem *)i->data; + soup_session_process_queue_item (item->session, item, TRUE); + } + + g_list_free (items); + + if (g_atomic_int_dec_and_test (&priv->in_async_run_queue) && g_atomic_int_get (&priv->needs_queue_sort)) { + g_mutex_lock (&priv->queue_mutex); g_queue_sort (priv->queue, (GCompareDataFunc)compare_queue_item, NULL); - priv->needs_queue_sort = FALSE; + g_mutex_unlock (&priv->queue_mutex); + g_atomic_int_set (&priv->needs_queue_sort, FALSE); } g_object_unref (session); @@ -1734,12 +1857,21 @@ soup_session_pause_message (SoupSession *session, soup_message_io_pause (msg); } +static void +kick_queue_source (gpointer key, + GSource *source) +{ + g_source_set_ready_time (source, 0); +} + void soup_session_kick_queue (SoupSession *session) { SoupSessionPrivate *priv = soup_session_get_instance_private (session); - g_source_set_ready_time (priv->queue_source, 0); + g_mutex_lock (&priv->queue_sources_mutex); + g_hash_table_foreach (priv->queue_sources, (GHFunc)kick_queue_source, NULL); + g_mutex_unlock (&priv->queue_sources_mutex); } /** @@ -1804,7 +1936,9 @@ soup_session_abort (SoupSession *session) priv = soup_session_get_instance_private (session); /* Cancel everything */ + g_mutex_lock (&priv->queue_mutex); g_queue_foreach (priv->queue, (GFunc)soup_message_queue_item_cancel, NULL); + g_mutex_unlock (&priv->queue_mutex); /* Close all idle connections */ soup_connection_manager_cleanup (priv->conn_manager, TRUE); |