summaryrefslogtreecommitdiff
path: root/libsoup/http2
diff options
context:
space:
mode:
authorCarlos Garcia Campos <cgarcia@igalia.com>2022-04-18 15:53:01 +0200
committerCarlos Garcia Campos <cgarcia@igalia.com>2022-06-08 12:36:17 +0200
commite3333be150200eba59220ad1ff82c3e3507bf78c (patch)
tree9427978bec4c07c2d67cccbbb1d96616f2465acf /libsoup/http2
parent522f8c5030da689d745558fb1fd02504d60d86ac (diff)
downloadlibsoup-e3333be150200eba59220ad1ff82c3e3507bf78c.tar.gz
http2: make message IO thread safe
nghttp2 session can't be used by multiple threads at the same time, so we need to ensure that only messages from the same thread share the connection. Connections in idle state can be reused from other threads, though but we need to ensure all the pending IO is completed before switching to another thread. When the connection switches to IN_USE state, the current thread becomes the owner of the connection IO. In the case of HTTP/2 there might be session IO not related to a particular message, in that case a thread with no default context is considered synchronous and all IO that is not explicitly sync or async will be sync.
Diffstat (limited to 'libsoup/http2')
-rw-r--r--libsoup/http2/soup-client-message-io-http2.c90
1 files changed, 69 insertions, 21 deletions
diff --git a/libsoup/http2/soup-client-message-io-http2.c b/libsoup/http2/soup-client-message-io-http2.c
index 16e46b02..fe6aeaab 100644
--- a/libsoup/http2/soup-client-message-io-http2.c
+++ b/libsoup/http2/soup-client-message-io-http2.c
@@ -62,6 +62,8 @@ typedef enum {
typedef struct {
SoupClientMessageIO iface;
+ GThread *owner;
+ gboolean async;
SoupConnection *conn;
GIOStream *stream;
GInputStream *istream;
@@ -429,7 +431,7 @@ io_try_write (SoupClientMessageIOHTTP2 *io,
io_write (io, blocking, NULL, &error);
}
- if (io->in_callback || g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+ if (!blocking && (io->in_callback || g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))) {
g_clear_error (&error);
io->write_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (io->ostream), NULL);
g_source_set_name (io->write_source, "Soup HTTP/2 write source");
@@ -546,7 +548,7 @@ soup_client_message_io_http2_terminate_session (SoupClientMessageIOHTTP2 *io)
io->session_terminated = TRUE;
NGCHECK (nghttp2_session_terminate_session (io->session, NGHTTP2_NO_ERROR));
- io_try_write (io, FALSE);
+ io_try_write (io, !io->async);
}
/* HTTP2 read callbacks */
@@ -908,7 +910,11 @@ on_frame_send_callback (nghttp2_session *session,
break;
case NGHTTP2_RST_STREAM:
h2_debug (io, data, "[SEND] [RST_STREAM] stream_id=%u", frame->hd.stream_id);
- g_hash_table_foreach_remove (io->closed_messages, (GHRFunc)remove_closed_stream, (gpointer)frame);
+ if (g_hash_table_foreach_remove (io->closed_messages, (GHRFunc)remove_closed_stream, (gpointer)frame)) {
+ if (io->conn)
+ soup_connection_set_in_use (io->conn, FALSE);
+ }
+
break;
case NGHTTP2_GOAWAY:
h2_debug (io, data, "[SEND] [%s]", frame_type_to_string (frame->hd.type));
@@ -920,7 +926,7 @@ on_frame_send_callback (nghttp2_session *session,
source = g_idle_source_new ();
g_source_set_name (source, "Soup HTTP/2 close source");
g_source_set_callback (source, (GSourceFunc)close_in_idle_cb, io, NULL);
- g_source_attach (source, g_main_context_get_thread_default ());
+ g_source_attach (source, g_task_get_context (io->close_task));
g_source_unref (source);
}
break;
@@ -1428,6 +1434,11 @@ soup_client_message_io_http2_finished (SoupClientMessageIO *iface,
g_warn_if_reached ();
if (!g_hash_table_add (io->closed_messages, data))
g_warn_if_reached ();
+
+ if (io->conn)
+ soup_connection_set_in_use (io->conn, TRUE);
+
+ io_try_write (io, !io->async);
} else {
if (!g_hash_table_remove (io->messages, msg))
g_warn_if_reached ();
@@ -1438,12 +1449,8 @@ soup_client_message_io_http2_finished (SoupClientMessageIO *iface,
g_object_unref (msg);
- if (io->is_shutdown) {
+ if (io->is_shutdown)
soup_client_message_io_http2_terminate_session (io);
- return;
- }
-
- io_try_write (io, FALSE);
}
static void
@@ -1578,12 +1585,13 @@ io_run (SoupHTTP2MessageData *data,
GCancellable *cancellable,
GError **error)
{
+ SoupClientMessageIOHTTP2 *io = data->io;
gboolean progress = FALSE;
- if (data->state < STATE_WRITE_DONE && nghttp2_session_want_write (data->io->session))
- progress = io_write (data->io, TRUE, cancellable, error);
- else if (data->state < STATE_READ_DONE && nghttp2_session_want_read (data->io->session))
- progress = io_read (data->io, TRUE, cancellable, error);
+ if (data->state < STATE_WRITE_DONE && !io->in_callback && nghttp2_session_want_write (io->session))
+ progress = io_write (io, TRUE, cancellable, error);
+ else if (data->state < STATE_READ_DONE && !io->in_callback && nghttp2_session_want_read (io->session))
+ progress = io_read (io, TRUE, cancellable, error);
return progress;
}
@@ -1712,6 +1720,31 @@ soup_client_message_io_http2_run_until_read_async (SoupClientMessageIO *iface,
soup_http2_message_data_check_status (data);
}
+static void
+soup_client_message_io_http2_set_owner (SoupClientMessageIOHTTP2 *io,
+ GThread *owner)
+{
+ if (owner == io->owner)
+ return;
+
+ io->owner = owner;
+ g_assert (!io->write_source);
+ if (io->read_source) {
+ g_source_destroy (io->read_source);
+ g_source_unref (io->read_source);
+ io->read_source = NULL;
+ }
+
+ io->async = g_main_context_is_owner (g_main_context_get_thread_default ());
+ if (!io->async)
+ return;
+
+ io->read_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (io->istream), NULL);
+ g_source_set_name (io->read_source, "Soup HTTP/2 read source");
+ g_source_set_callback (io->read_source, (GSourceFunc)io_read_ready, io, NULL);
+ g_source_attach (io->read_source, g_main_context_get_thread_default ());
+}
+
static gboolean
soup_client_message_io_http2_close_async (SoupClientMessageIO *iface,
SoupConnection *conn,
@@ -1722,9 +1755,18 @@ soup_client_message_io_http2_close_async (SoupClientMessageIO *iface,
if (io->goaway_sent)
return FALSE;
- g_assert (!io->close_task);
- io->close_task = g_task_new (conn, NULL, callback, NULL);
+ soup_client_message_io_http2_set_owner (io, g_thread_self ());
+ if (io->async) {
+ g_assert (!io->close_task);
+ io->close_task = g_task_new (conn, NULL, callback, NULL);
+ }
+
soup_client_message_io_http2_terminate_session (io);
+ if (!io->async) {
+ g_assert (io->goaway_sent);
+ return FALSE;
+ }
+
return TRUE;
}
@@ -1755,6 +1797,14 @@ soup_client_message_io_http2_destroy (SoupClientMessageIO *iface)
g_free (io);
}
+static void
+soup_client_message_io_http2_owner_changed (SoupClientMessageIO *iface)
+{
+ SoupClientMessageIOHTTP2 *io = (SoupClientMessageIOHTTP2 *)iface;
+
+ soup_client_message_io_http2_set_owner (io, g_thread_self ());
+}
+
static const SoupClientMessageIOFuncs io_funcs = {
soup_client_message_io_http2_destroy,
soup_client_message_io_http2_finished,
@@ -1772,7 +1822,8 @@ static const SoupClientMessageIOFuncs io_funcs = {
soup_client_message_io_http2_is_open,
soup_client_message_io_http2_in_progress,
soup_client_message_io_http2_is_reusable,
- soup_client_message_io_http2_get_cancellable
+ soup_client_message_io_http2_get_cancellable,
+ soup_client_message_io_http2_owner_changed
};
G_GNUC_PRINTF(1, 0)
@@ -1840,10 +1891,7 @@ soup_client_message_io_http2_new (SoupConnection *conn)
io->ostream = g_io_stream_get_output_stream (io->stream);
io->connection_id = soup_connection_get_id (conn);
- io->read_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (io->istream), NULL);
- g_source_set_name (io->read_source, "Soup HTTP/2 read source");
- g_source_set_callback (io->read_source, (GSourceFunc)io_read_ready, io, NULL);
- g_source_attach (io->read_source, g_main_context_get_thread_default ());
+ soup_client_message_io_http2_set_owner (io, soup_connection_get_owner (conn));
NGCHECK (nghttp2_session_set_local_window_size (io->session, NGHTTP2_FLAG_NONE, 0, INITIAL_WINDOW_SIZE));
@@ -1853,7 +1901,7 @@ soup_client_message_io_http2_new (SoupConnection *conn)
{ NGHTTP2_SETTINGS_ENABLE_PUSH, 0 },
};
NGCHECK (nghttp2_submit_settings (io->session, NGHTTP2_FLAG_NONE, settings, G_N_ELEMENTS (settings)));
- io_try_write (io, FALSE);
+ io_try_write (io, !io->async);
return (SoupClientMessageIO *)io;
}