diff options
Diffstat (limited to 'libsoup/http2')
-rw-r--r-- | libsoup/http2/soup-client-message-io-http2.c | 106 |
1 files changed, 65 insertions, 41 deletions
diff --git a/libsoup/http2/soup-client-message-io-http2.c b/libsoup/http2/soup-client-message-io-http2.c index 15794c89..686ab864 100644 --- a/libsoup/http2/soup-client-message-io-http2.c +++ b/libsoup/http2/soup-client-message-io-http2.c @@ -94,6 +94,8 @@ typedef struct { GCancellable *cancellable; GInputStream *decoded_data_istream; GInputStream *body_istream; + GTask *task; + gboolean in_run_until_read_async; /* Request body logger */ SoupLogger *logger; @@ -120,10 +122,12 @@ typedef struct { gboolean can_be_restarted; } SoupHTTP2MessageData; +static void io_run_until_read_async (SoupHTTP2MessageData *data); static gboolean io_read (SoupClientMessageIOHTTP2 *, gboolean, GCancellable *, GError **); static void io_write_until_stream_reset_is_sent (SoupHTTP2MessageData *data); static void io_idle_read (SoupClientMessageIOHTTP2 *io); static void io_close (SoupClientMessageIOHTTP2 *io); +static void io_poll (SoupHTTP2MessageData *data); static void NGCHECK (int return_code) @@ -592,10 +596,23 @@ on_stream_close_callback (nghttp2_session *session, void *user_data) { SoupHTTP2MessageData *data = nghttp2_session_get_stream_user_data (session, stream_id); + h2_debug (user_data, data, "[SESSION] Closed: %s", nghttp2_http2_strerror (error_code)); - if (error_code == NGHTTP2_REFUSED_STREAM && data && data->state < STATE_READ_DATA) + if (!data) + return 0; + + if (error_code == NGHTTP2_REFUSED_STREAM && data->state < STATE_READ_DATA) data->can_be_restarted = TRUE; + if (data->state < STATE_READ_DATA && !data->in_run_until_read_async) { + /* Start polling the decoded data stream instead of the network input stream. */ + if (data->io_source) { + g_source_destroy (data->io_source); + g_clear_pointer (&data->io_source, g_source_unref); + } + io_poll (data); + } + return 0; } @@ -1114,14 +1131,27 @@ message_source_check (GSource *source) return FALSE; } -static GSource * -soup_client_message_io_http2_get_source (SoupHTTP2MessageData *data, - SoupMessage *msg, - GCancellable *cancellable, - SoupMessageIOSourceFunc callback, - gpointer user_data) +static gboolean +io_poll_ready (SoupMessage *msg, + gpointer user_data) +{ + SoupHTTP2MessageData *data = user_data; + + io_run_until_read_async (data); + + return G_SOURCE_REMOVE; +} + +static void +io_poll (SoupHTTP2MessageData *data) { GSource *base_source; + GCancellable *cancellable; + + g_assert (data->task); + g_assert (!data->io_source); + + cancellable = g_task_get_cancellable (data->task); /* TODO: Handle mixing writes in? */ if (data->paused) @@ -1139,11 +1169,14 @@ soup_client_message_io_http2_get_source (SoupHTTP2MessageData *data, base_source = g_timeout_source_new (0); } - GSource *source = soup_message_io_source_new (base_source, G_OBJECT (msg), data->paused, message_source_check); - g_source_set_callback (source, (GSourceFunc)callback, user_data, NULL); - return source; + data->io_source = soup_message_io_source_new (base_source, G_OBJECT (data->msg), + data->paused, message_source_check); + g_source_set_callback (data->io_source, (GSourceFunc)io_poll_ready, data, NULL); + g_source_set_priority (data->io_source, g_task_get_priority (data->task)); + g_source_attach (data->io_source, data->io->async_context); } + static void client_stream_eof (SoupClientInputStream *stream, gpointer user_data) @@ -1461,26 +1494,11 @@ soup_client_message_io_http2_run (SoupClientMessageIO *iface, g_assert_not_reached (); } -static void io_run_until_read_async (SoupMessage *msg, - GTask *task); - -static gboolean -io_run_until_read_ready (SoupMessage *msg, - gpointer user_data) -{ - GTask *task = user_data; - - io_run_until_read_async (msg, task); - - return G_SOURCE_REMOVE; -} - static void -io_run_until_read_async (SoupMessage *msg, - GTask *task) +io_run_until_read_async (SoupHTTP2MessageData *data) { - SoupClientMessageIOHTTP2 *io = get_io_data (msg); - SoupHTTP2MessageData *data = get_data_for_message (io, msg); + SoupClientMessageIOHTTP2 *io = data->io; + GTask *task = data->task; GError *error = NULL; if (data->io_source) { @@ -1488,10 +1506,15 @@ io_run_until_read_async (SoupMessage *msg, g_clear_pointer (&data->io_source, g_source_unref); } - if (io_run_until (io, msg, FALSE, + data->in_run_until_read_async = TRUE; + + if (io_run_until (io, data->msg, FALSE, STATE_READ_DATA, g_task_get_cancellable (task), &error)) { + data->task = NULL; + data->in_run_until_read_async = FALSE; + g_task_return_boolean (task, TRUE); g_object_unref (task); return; @@ -1499,23 +1522,23 @@ io_run_until_read_async (SoupMessage *msg, if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { g_error_free (error); - data->io_source = soup_client_message_io_http2_get_source (data, msg, g_task_get_cancellable (task), - (SoupMessageIOSourceFunc)io_run_until_read_ready, - task); - g_source_set_priority (data->io_source, g_task_get_priority (task)); - g_source_attach (data->io_source, io->async_context); + io_poll (data); + data->in_run_until_read_async = FALSE; return; } - if (get_io_data (msg) == io) { + if (get_io_data (data->msg) == io) { if (data->can_be_restarted) data->item->state = SOUP_MESSAGE_RESTARTING; else - soup_message_set_metrics_timestamp (msg, SOUP_MESSAGE_METRICS_RESPONSE_END); + soup_message_set_metrics_timestamp (data->msg, SOUP_MESSAGE_METRICS_RESPONSE_END); - soup_client_message_io_http2_finished ((SoupClientMessageIO *)io, msg); + soup_client_message_io_http2_finished ((SoupClientMessageIO *)data->io, data->msg); } + data->task = NULL; + data->in_run_until_read_async = FALSE; + g_task_return_error (task, error); g_object_unref (task); } @@ -1528,11 +1551,12 @@ soup_client_message_io_http2_run_until_read_async (SoupClientMessageIO *iface, GAsyncReadyCallback callback, gpointer user_data) { - GTask *task; + SoupClientMessageIOHTTP2 *io = (SoupClientMessageIOHTTP2 *)iface; + SoupHTTP2MessageData *data = get_data_for_message (io, msg); - task = g_task_new (msg, cancellable, callback, user_data); - g_task_set_priority (task, io_priority); - io_run_until_read_async (msg, task); + data->task = g_task_new (msg, cancellable, callback, user_data); + g_task_set_priority (data->task, io_priority); + io_run_until_read_async (data); } static gboolean |