diff options
author | Carlos Garcia Campos <cgarcia@igalia.com> | 2022-04-18 15:53:01 +0200 |
---|---|---|
committer | Carlos Garcia Campos <cgarcia@igalia.com> | 2022-06-08 12:36:17 +0200 |
commit | e3333be150200eba59220ad1ff82c3e3507bf78c (patch) | |
tree | 9427978bec4c07c2d67cccbbb1d96616f2465acf /libsoup/http2 | |
parent | 522f8c5030da689d745558fb1fd02504d60d86ac (diff) | |
download | libsoup-e3333be150200eba59220ad1ff82c3e3507bf78c.tar.gz |
http2: make message IO thread safe
nghttp2 session can't be used by multiple threads at the same time, so
we need to ensure that only messages from the same thread share the
connection. Connections in idle state can be reused from other threads,
though but we need to ensure all the pending IO is completed before
switching to another thread. When the connection switches to IN_USE
state, the current thread becomes the owner of the connection IO. In the
case of HTTP/2 there might be session IO not related to a particular
message, in that case a thread with no default context is considered
synchronous and all IO that is not explicitly sync or async will be
sync.
Diffstat (limited to 'libsoup/http2')
-rw-r--r-- | libsoup/http2/soup-client-message-io-http2.c | 90 |
1 files changed, 69 insertions, 21 deletions
diff --git a/libsoup/http2/soup-client-message-io-http2.c b/libsoup/http2/soup-client-message-io-http2.c index 16e46b02..fe6aeaab 100644 --- a/libsoup/http2/soup-client-message-io-http2.c +++ b/libsoup/http2/soup-client-message-io-http2.c @@ -62,6 +62,8 @@ typedef enum { typedef struct { SoupClientMessageIO iface; + GThread *owner; + gboolean async; SoupConnection *conn; GIOStream *stream; GInputStream *istream; @@ -429,7 +431,7 @@ io_try_write (SoupClientMessageIOHTTP2 *io, io_write (io, blocking, NULL, &error); } - if (io->in_callback || g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { + if (!blocking && (io->in_callback || g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))) { g_clear_error (&error); io->write_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (io->ostream), NULL); g_source_set_name (io->write_source, "Soup HTTP/2 write source"); @@ -546,7 +548,7 @@ soup_client_message_io_http2_terminate_session (SoupClientMessageIOHTTP2 *io) io->session_terminated = TRUE; NGCHECK (nghttp2_session_terminate_session (io->session, NGHTTP2_NO_ERROR)); - io_try_write (io, FALSE); + io_try_write (io, !io->async); } /* HTTP2 read callbacks */ @@ -908,7 +910,11 @@ on_frame_send_callback (nghttp2_session *session, break; case NGHTTP2_RST_STREAM: h2_debug (io, data, "[SEND] [RST_STREAM] stream_id=%u", frame->hd.stream_id); - g_hash_table_foreach_remove (io->closed_messages, (GHRFunc)remove_closed_stream, (gpointer)frame); + if (g_hash_table_foreach_remove (io->closed_messages, (GHRFunc)remove_closed_stream, (gpointer)frame)) { + if (io->conn) + soup_connection_set_in_use (io->conn, FALSE); + } + break; case NGHTTP2_GOAWAY: h2_debug (io, data, "[SEND] [%s]", frame_type_to_string (frame->hd.type)); @@ -920,7 +926,7 @@ on_frame_send_callback (nghttp2_session *session, source = g_idle_source_new (); g_source_set_name (source, "Soup HTTP/2 close source"); g_source_set_callback (source, (GSourceFunc)close_in_idle_cb, io, NULL); - g_source_attach (source, g_main_context_get_thread_default ()); + g_source_attach (source, g_task_get_context (io->close_task)); g_source_unref (source); } break; @@ -1428,6 +1434,11 @@ soup_client_message_io_http2_finished (SoupClientMessageIO *iface, g_warn_if_reached (); if (!g_hash_table_add (io->closed_messages, data)) g_warn_if_reached (); + + if (io->conn) + soup_connection_set_in_use (io->conn, TRUE); + + io_try_write (io, !io->async); } else { if (!g_hash_table_remove (io->messages, msg)) g_warn_if_reached (); @@ -1438,12 +1449,8 @@ soup_client_message_io_http2_finished (SoupClientMessageIO *iface, g_object_unref (msg); - if (io->is_shutdown) { + if (io->is_shutdown) soup_client_message_io_http2_terminate_session (io); - return; - } - - io_try_write (io, FALSE); } static void @@ -1578,12 +1585,13 @@ io_run (SoupHTTP2MessageData *data, GCancellable *cancellable, GError **error) { + SoupClientMessageIOHTTP2 *io = data->io; gboolean progress = FALSE; - if (data->state < STATE_WRITE_DONE && nghttp2_session_want_write (data->io->session)) - progress = io_write (data->io, TRUE, cancellable, error); - else if (data->state < STATE_READ_DONE && nghttp2_session_want_read (data->io->session)) - progress = io_read (data->io, TRUE, cancellable, error); + if (data->state < STATE_WRITE_DONE && !io->in_callback && nghttp2_session_want_write (io->session)) + progress = io_write (io, TRUE, cancellable, error); + else if (data->state < STATE_READ_DONE && !io->in_callback && nghttp2_session_want_read (io->session)) + progress = io_read (io, TRUE, cancellable, error); return progress; } @@ -1712,6 +1720,31 @@ soup_client_message_io_http2_run_until_read_async (SoupClientMessageIO *iface, soup_http2_message_data_check_status (data); } +static void +soup_client_message_io_http2_set_owner (SoupClientMessageIOHTTP2 *io, + GThread *owner) +{ + if (owner == io->owner) + return; + + io->owner = owner; + g_assert (!io->write_source); + if (io->read_source) { + g_source_destroy (io->read_source); + g_source_unref (io->read_source); + io->read_source = NULL; + } + + io->async = g_main_context_is_owner (g_main_context_get_thread_default ()); + if (!io->async) + return; + + io->read_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (io->istream), NULL); + g_source_set_name (io->read_source, "Soup HTTP/2 read source"); + g_source_set_callback (io->read_source, (GSourceFunc)io_read_ready, io, NULL); + g_source_attach (io->read_source, g_main_context_get_thread_default ()); +} + static gboolean soup_client_message_io_http2_close_async (SoupClientMessageIO *iface, SoupConnection *conn, @@ -1722,9 +1755,18 @@ soup_client_message_io_http2_close_async (SoupClientMessageIO *iface, if (io->goaway_sent) return FALSE; - g_assert (!io->close_task); - io->close_task = g_task_new (conn, NULL, callback, NULL); + soup_client_message_io_http2_set_owner (io, g_thread_self ()); + if (io->async) { + g_assert (!io->close_task); + io->close_task = g_task_new (conn, NULL, callback, NULL); + } + soup_client_message_io_http2_terminate_session (io); + if (!io->async) { + g_assert (io->goaway_sent); + return FALSE; + } + return TRUE; } @@ -1755,6 +1797,14 @@ soup_client_message_io_http2_destroy (SoupClientMessageIO *iface) g_free (io); } +static void +soup_client_message_io_http2_owner_changed (SoupClientMessageIO *iface) +{ + SoupClientMessageIOHTTP2 *io = (SoupClientMessageIOHTTP2 *)iface; + + soup_client_message_io_http2_set_owner (io, g_thread_self ()); +} + static const SoupClientMessageIOFuncs io_funcs = { soup_client_message_io_http2_destroy, soup_client_message_io_http2_finished, @@ -1772,7 +1822,8 @@ static const SoupClientMessageIOFuncs io_funcs = { soup_client_message_io_http2_is_open, soup_client_message_io_http2_in_progress, soup_client_message_io_http2_is_reusable, - soup_client_message_io_http2_get_cancellable + soup_client_message_io_http2_get_cancellable, + soup_client_message_io_http2_owner_changed }; G_GNUC_PRINTF(1, 0) @@ -1840,10 +1891,7 @@ soup_client_message_io_http2_new (SoupConnection *conn) io->ostream = g_io_stream_get_output_stream (io->stream); io->connection_id = soup_connection_get_id (conn); - io->read_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (io->istream), NULL); - g_source_set_name (io->read_source, "Soup HTTP/2 read source"); - g_source_set_callback (io->read_source, (GSourceFunc)io_read_ready, io, NULL); - g_source_attach (io->read_source, g_main_context_get_thread_default ()); + soup_client_message_io_http2_set_owner (io, soup_connection_get_owner (conn)); NGCHECK (nghttp2_session_set_local_window_size (io->session, NGHTTP2_FLAG_NONE, 0, INITIAL_WINDOW_SIZE)); @@ -1853,7 +1901,7 @@ soup_client_message_io_http2_new (SoupConnection *conn) { NGHTTP2_SETTINGS_ENABLE_PUSH, 0 }, }; NGCHECK (nghttp2_submit_settings (io->session, NGHTTP2_FLAG_NONE, settings, G_N_ELEMENTS (settings))); - io_try_write (io, FALSE); + io_try_write (io, !io->async); return (SoupClientMessageIO *)io; } |