summaryrefslogtreecommitdiff
path: root/libsoup/soup-session.c
diff options
context:
space:
mode:
authorCarlos Garcia Campos <cgarcia@igalia.com>2022-04-01 13:47:39 +0200
committerCarlos Garcia Campos <cgarcia@igalia.com>2022-06-08 12:36:17 +0200
commitf3cdcf8dfb58864b1e8d040f52fa67410dfdfc93 (patch)
treec6072e2f4096306c6d9cacac93d0937ea6a3593b /libsoup/soup-session.c
parenta81f7567dd344245dc077c105eb02ffd1ef32013 (diff)
downloadlibsoup-f3cdcf8dfb58864b1e8d040f52fa67410dfdfc93.tar.gz
session: make message queue handling thread safe
Ensure that queue items are only processed in the same thread they were created.
Diffstat (limited to 'libsoup/soup-session.c')
-rw-r--r--libsoup/soup-session.c178
1 files changed, 156 insertions, 22 deletions
diff --git a/libsoup/soup-session.c b/libsoup/soup-session.c
index 9f40f8f8..349e563b 100644
--- a/libsoup/soup-session.c
+++ b/libsoup/soup-session.c
@@ -83,9 +83,12 @@ typedef struct {
SoupSocketProperties *socket_props;
+ GMainContext *context;
+ GMutex queue_mutex;
GQueue *queue;
- GSource *queue_source;
- guint16 in_async_run_queue;
+ GMutex queue_sources_mutex;
+ GHashTable *queue_sources;
+ guint in_async_run_queue;
gboolean needs_queue_sort;
char *user_agent;
@@ -168,6 +171,7 @@ G_DEFINE_QUARK (soup-session-error-quark, soup_session_error)
typedef struct {
GSource source;
SoupSession* session;
+ guint num_items;
} SoupMessageQueueSource;
static gboolean
@@ -190,19 +194,89 @@ static GSourceFuncs queue_source_funcs = {
};
static void
+soup_session_add_queue_source (SoupSession *session,
+ GMainContext *context)
+{
+ SoupSessionPrivate *priv = soup_session_get_instance_private (session);
+ SoupMessageQueueSource *queue_source;
+
+ queue_source = g_hash_table_lookup (priv->queue_sources, context);
+ if (!queue_source) {
+ GSource *source;
+
+ source = g_source_new (&queue_source_funcs, sizeof (SoupMessageQueueSource));
+ queue_source = (SoupMessageQueueSource *)source;
+ queue_source->session = session;
+ queue_source->num_items = 0;
+ g_source_set_name (source, "SoupMessageQueue");
+ g_source_set_can_recurse (source, TRUE);
+ g_source_attach (source, context);
+ g_hash_table_insert (priv->queue_sources, context, source);
+ }
+
+ queue_source->num_items++;
+
+}
+
+static void
+soup_session_add_queue_source_for_item (SoupSession *session,
+ SoupMessageQueueItem *item)
+{
+ SoupSessionPrivate *priv = soup_session_get_instance_private (session);
+
+ if (item->context == priv->context)
+ return;
+
+ g_mutex_lock (&priv->queue_sources_mutex);
+ soup_session_add_queue_source (session, item->context);
+ g_mutex_unlock (&priv->queue_sources_mutex);
+}
+
+static void
+soup_session_remove_queue_source (SoupSession *session,
+ GMainContext *context)
+{
+ SoupSessionPrivate *priv = soup_session_get_instance_private (session);
+ SoupMessageQueueSource *queue_source;
+
+ queue_source = g_hash_table_lookup (priv->queue_sources, context);
+ if (!queue_source)
+ return;
+
+ if (--queue_source->num_items > 0)
+ return;
+
+ g_source_destroy ((GSource *)queue_source);
+ g_hash_table_remove (priv->queue_sources, context);
+}
+
+static void
+soup_session_remove_queue_source_for_item (SoupSession *session,
+ SoupMessageQueueItem *item)
+{
+ SoupSessionPrivate *priv = soup_session_get_instance_private (session);
+
+ if (item->context == priv->context)
+ return;
+
+ g_mutex_lock (&priv->queue_sources_mutex);
+ soup_session_remove_queue_source (session, item->context);
+ g_mutex_unlock (&priv->queue_sources_mutex);
+}
+
+static void
soup_session_init (SoupSession *session)
{
SoupSessionPrivate *priv = soup_session_get_instance_private (session);
SoupAuthManager *auth_manager;
- SoupMessageQueueSource *source;
+ priv->context = g_main_context_ref_thread_default ();
+ g_mutex_init (&priv->queue_mutex);
priv->queue = g_queue_new ();
- priv->queue_source = g_source_new (&queue_source_funcs, sizeof (SoupMessageQueueSource));
- source = (SoupMessageQueueSource *)priv->queue_source;
- source->session = session;
- g_source_set_name (priv->queue_source, "SoupMessageQueue");
- g_source_set_can_recurse (priv->queue_source, TRUE);
- g_source_attach (priv->queue_source, g_main_context_get_thread_default ());
+ g_mutex_init (&priv->queue_sources_mutex);
+ priv->queue_sources = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify)g_source_unref);
+ soup_session_add_queue_source (session, priv->context);
+
priv->io_timeout = priv->idle_timeout = 60;
priv->conn_manager = soup_connection_manager_new (session,
@@ -230,11 +304,20 @@ soup_session_init (SoupSession *session)
}
static void
+destroy_queue_source (gpointer key,
+ GSource *source)
+{
+ g_source_destroy (source);
+}
+
+static void
soup_session_dispose (GObject *object)
{
SoupSession *session = SOUP_SESSION (object);
SoupSessionPrivate *priv = soup_session_get_instance_private (session);
+ g_assert (priv->context == soup_thread_default_context ());
+
priv->disposed = TRUE;
soup_session_abort (session);
g_warn_if_fail (soup_connection_manager_get_num_conns (priv->conn_manager) == 0);
@@ -242,7 +325,7 @@ soup_session_dispose (GObject *object)
while (priv->features)
soup_session_remove_feature (session, priv->features->data);
- g_source_destroy (priv->queue_source);
+ g_hash_table_foreach (priv->queue_sources, (GHFunc)destroy_queue_source, NULL);
G_OBJECT_CLASS (soup_session_parent_class)->dispose (object);
}
@@ -253,9 +336,14 @@ soup_session_finalize (GObject *object)
SoupSession *session = SOUP_SESSION (object);
SoupSessionPrivate *priv = soup_session_get_instance_private (session);
+ g_assert (priv->context == soup_thread_default_context ());
+
g_warn_if_fail (g_queue_is_empty (priv->queue));
g_queue_free (priv->queue);
- g_source_unref (priv->queue_source);
+ g_mutex_clear (&priv->queue_mutex);
+ g_hash_table_destroy (priv->queue_sources);
+ g_mutex_clear (&priv->queue_sources_mutex);
+ g_main_context_unref (priv->context);
g_clear_pointer (&priv->conn_manager, soup_connection_manager_free);
@@ -958,7 +1046,9 @@ soup_session_lookup_queue (SoupSession *session,
SoupSessionPrivate *priv = soup_session_get_instance_private (session);
GList *link;
+ g_mutex_lock (&priv->queue_mutex);
link = g_queue_find_custom (priv->queue, data, compare_func);
+ g_mutex_unlock (&priv->queue_mutex);
return link ? (SoupMessageQueueItem *)link->data : NULL;
}
@@ -1192,13 +1282,15 @@ message_priority_changed (SoupMessage *msg,
{
SoupSessionPrivate *priv = soup_session_get_instance_private (item->session);
- if (priv->in_async_run_queue) {
- priv->needs_queue_sort = TRUE;
+ if (g_atomic_int_get (&priv->in_async_run_queue)) {
+ g_atomic_int_set (&priv->needs_queue_sort, TRUE);
return;
}
+ g_mutex_lock (&priv->queue_mutex);
g_queue_sort (priv->queue, (GCompareDataFunc)compare_queue_item, NULL);
- priv->needs_queue_sort = FALSE;
+ g_mutex_unlock (&priv->queue_mutex);
+ g_atomic_int_set (&priv->needs_queue_sort, FALSE);
}
static SoupMessageQueueItem *
@@ -1216,9 +1308,13 @@ soup_session_append_queue_item (SoupSession *session,
soup_message_set_is_preconnect (msg, FALSE);
item = soup_message_queue_item_new (session, msg, async, cancellable);
+ g_mutex_lock (&priv->queue_mutex);
g_queue_insert_sorted (priv->queue,
soup_message_queue_item_ref (item),
(GCompareDataFunc)compare_queue_item, NULL);
+ g_mutex_unlock (&priv->queue_mutex);
+
+ soup_session_add_queue_source_for_item (session, item);
if (!soup_message_query_flags (msg, SOUP_MESSAGE_NO_REDIRECT)) {
soup_message_add_header_handler (
@@ -1253,6 +1349,8 @@ soup_session_send_queue_item (SoupSession *session,
SoupMessageHeaders *request_headers;
const char *method;
+ g_assert (item->context == soup_thread_default_context ());
+
request_headers = soup_message_get_request_headers (item->msg);
if (priv->user_agent)
soup_message_headers_replace_common (request_headers, SOUP_HEADER_USER_AGENT, priv->user_agent);
@@ -1297,7 +1395,11 @@ soup_session_unqueue_item (SoupSession *session,
return;
}
+ g_mutex_lock (&priv->queue_mutex);
g_queue_remove (priv->queue, item);
+ g_mutex_unlock (&priv->queue_mutex);
+
+ soup_session_remove_queue_source_for_item (session, item);
/* g_signal_handlers_disconnect_by_func doesn't work if you
* have a metamarshal, meaning it doesn't work with
@@ -1321,6 +1423,8 @@ message_completed (SoupMessage *msg, SoupMessageIOCompletion completion, gpointe
{
SoupMessageQueueItem *item = user_data;
+ g_assert (item->context == soup_thread_default_context ());
+
if (item->async)
soup_session_kick_queue (item->session);
@@ -1389,6 +1493,8 @@ tunnel_message_completed (SoupMessage *msg, SoupMessageIOCompletion completion,
SoupSession *session = tunnel_item->session;
guint status;
+ g_assert (tunnel_item->context == soup_thread_default_context ());
+
if (tunnel_item->state == SOUP_MESSAGE_REQUEUED)
tunnel_item->state = SOUP_MESSAGE_RESTARTING;
@@ -1583,6 +1689,7 @@ soup_session_process_queue_item (SoupSession *session,
gboolean loop)
{
g_assert (item->session == session);
+ g_assert (item->context == soup_thread_default_context ());
do {
if (item->paused)
@@ -1660,33 +1767,49 @@ soup_session_process_queue_item (SoupSession *session,
}
static void
-process_queue_item (SoupMessageQueueItem *item)
+collect_queue_item (SoupMessageQueueItem *item,
+ GList **items)
{
if (!item->async)
return;
+ if (item->context != soup_thread_default_context ())
+ return;
+
/* CONNECT messages are handled specially */
if (soup_message_get_method (item->msg) == SOUP_METHOD_CONNECT)
return;
- soup_session_process_queue_item (item->session, item, TRUE);
+ *items = g_list_prepend (*items, item);
}
static void
async_run_queue (SoupSession *session)
{
SoupSessionPrivate *priv = soup_session_get_instance_private (session);
+ GList *items = NULL;
+ GList *i;
g_object_ref (session);
- priv->in_async_run_queue++;
+ g_atomic_int_inc (&priv->in_async_run_queue);
soup_connection_manager_cleanup (priv->conn_manager, FALSE);
- g_queue_foreach (priv->queue, (GFunc)process_queue_item, NULL);
+ g_mutex_lock (&priv->queue_mutex);
+ g_queue_foreach (priv->queue, (GFunc)collect_queue_item, &items);
+ g_mutex_unlock (&priv->queue_mutex);
- priv->in_async_run_queue--;
- if (!priv->in_async_run_queue && priv->needs_queue_sort) {
+ for (i = g_list_reverse (items); i != NULL; i = g_list_next (i)) {
+ SoupMessageQueueItem *item = (SoupMessageQueueItem *)i->data;
+ soup_session_process_queue_item (item->session, item, TRUE);
+ }
+
+ g_list_free (items);
+
+ if (g_atomic_int_dec_and_test (&priv->in_async_run_queue) && g_atomic_int_get (&priv->needs_queue_sort)) {
+ g_mutex_lock (&priv->queue_mutex);
g_queue_sort (priv->queue, (GCompareDataFunc)compare_queue_item, NULL);
- priv->needs_queue_sort = FALSE;
+ g_mutex_unlock (&priv->queue_mutex);
+ g_atomic_int_set (&priv->needs_queue_sort, FALSE);
}
g_object_unref (session);
@@ -1734,12 +1857,21 @@ soup_session_pause_message (SoupSession *session,
soup_message_io_pause (msg);
}
+static void
+kick_queue_source (gpointer key,
+ GSource *source)
+{
+ g_source_set_ready_time (source, 0);
+}
+
void
soup_session_kick_queue (SoupSession *session)
{
SoupSessionPrivate *priv = soup_session_get_instance_private (session);
- g_source_set_ready_time (priv->queue_source, 0);
+ g_mutex_lock (&priv->queue_sources_mutex);
+ g_hash_table_foreach (priv->queue_sources, (GHFunc)kick_queue_source, NULL);
+ g_mutex_unlock (&priv->queue_sources_mutex);
}
/**
@@ -1804,7 +1936,9 @@ soup_session_abort (SoupSession *session)
priv = soup_session_get_instance_private (session);
/* Cancel everything */
+ g_mutex_lock (&priv->queue_mutex);
g_queue_foreach (priv->queue, (GFunc)soup_message_queue_item_cancel, NULL);
+ g_mutex_unlock (&priv->queue_mutex);
/* Close all idle connections */
soup_connection_manager_cleanup (priv->conn_manager, TRUE);