summaryrefslogtreecommitdiff
path: root/libsoup/http2
diff options
context:
space:
mode:
Diffstat (limited to 'libsoup/http2')
-rw-r--r--libsoup/http2/soup-client-message-io-http2.c106
1 files changed, 65 insertions, 41 deletions
diff --git a/libsoup/http2/soup-client-message-io-http2.c b/libsoup/http2/soup-client-message-io-http2.c
index 15794c89..686ab864 100644
--- a/libsoup/http2/soup-client-message-io-http2.c
+++ b/libsoup/http2/soup-client-message-io-http2.c
@@ -94,6 +94,8 @@ typedef struct {
GCancellable *cancellable;
GInputStream *decoded_data_istream;
GInputStream *body_istream;
+ GTask *task;
+ gboolean in_run_until_read_async;
/* Request body logger */
SoupLogger *logger;
@@ -120,10 +122,12 @@ typedef struct {
gboolean can_be_restarted;
} SoupHTTP2MessageData;
+static void io_run_until_read_async (SoupHTTP2MessageData *data);
static gboolean io_read (SoupClientMessageIOHTTP2 *, gboolean, GCancellable *, GError **);
static void io_write_until_stream_reset_is_sent (SoupHTTP2MessageData *data);
static void io_idle_read (SoupClientMessageIOHTTP2 *io);
static void io_close (SoupClientMessageIOHTTP2 *io);
+static void io_poll (SoupHTTP2MessageData *data);
static void
NGCHECK (int return_code)
@@ -592,10 +596,23 @@ on_stream_close_callback (nghttp2_session *session,
void *user_data)
{
SoupHTTP2MessageData *data = nghttp2_session_get_stream_user_data (session, stream_id);
+
h2_debug (user_data, data, "[SESSION] Closed: %s", nghttp2_http2_strerror (error_code));
- if (error_code == NGHTTP2_REFUSED_STREAM && data && data->state < STATE_READ_DATA)
+ if (!data)
+ return 0;
+
+ if (error_code == NGHTTP2_REFUSED_STREAM && data->state < STATE_READ_DATA)
data->can_be_restarted = TRUE;
+ if (data->state < STATE_READ_DATA && !data->in_run_until_read_async) {
+ /* Start polling the decoded data stream instead of the network input stream. */
+ if (data->io_source) {
+ g_source_destroy (data->io_source);
+ g_clear_pointer (&data->io_source, g_source_unref);
+ }
+ io_poll (data);
+ }
+
return 0;
}
@@ -1114,14 +1131,27 @@ message_source_check (GSource *source)
return FALSE;
}
-static GSource *
-soup_client_message_io_http2_get_source (SoupHTTP2MessageData *data,
- SoupMessage *msg,
- GCancellable *cancellable,
- SoupMessageIOSourceFunc callback,
- gpointer user_data)
+static gboolean
+io_poll_ready (SoupMessage *msg,
+ gpointer user_data)
+{
+ SoupHTTP2MessageData *data = user_data;
+
+ io_run_until_read_async (data);
+
+ return G_SOURCE_REMOVE;
+}
+
+static void
+io_poll (SoupHTTP2MessageData *data)
{
GSource *base_source;
+ GCancellable *cancellable;
+
+ g_assert (data->task);
+ g_assert (!data->io_source);
+
+ cancellable = g_task_get_cancellable (data->task);
/* TODO: Handle mixing writes in? */
if (data->paused)
@@ -1139,11 +1169,14 @@ soup_client_message_io_http2_get_source (SoupHTTP2MessageData *data,
base_source = g_timeout_source_new (0);
}
- GSource *source = soup_message_io_source_new (base_source, G_OBJECT (msg), data->paused, message_source_check);
- g_source_set_callback (source, (GSourceFunc)callback, user_data, NULL);
- return source;
+ data->io_source = soup_message_io_source_new (base_source, G_OBJECT (data->msg),
+ data->paused, message_source_check);
+ g_source_set_callback (data->io_source, (GSourceFunc)io_poll_ready, data, NULL);
+ g_source_set_priority (data->io_source, g_task_get_priority (data->task));
+ g_source_attach (data->io_source, data->io->async_context);
}
+
static void
client_stream_eof (SoupClientInputStream *stream,
gpointer user_data)
@@ -1461,26 +1494,11 @@ soup_client_message_io_http2_run (SoupClientMessageIO *iface,
g_assert_not_reached ();
}
-static void io_run_until_read_async (SoupMessage *msg,
- GTask *task);
-
-static gboolean
-io_run_until_read_ready (SoupMessage *msg,
- gpointer user_data)
-{
- GTask *task = user_data;
-
- io_run_until_read_async (msg, task);
-
- return G_SOURCE_REMOVE;
-}
-
static void
-io_run_until_read_async (SoupMessage *msg,
- GTask *task)
+io_run_until_read_async (SoupHTTP2MessageData *data)
{
- SoupClientMessageIOHTTP2 *io = get_io_data (msg);
- SoupHTTP2MessageData *data = get_data_for_message (io, msg);
+ SoupClientMessageIOHTTP2 *io = data->io;
+ GTask *task = data->task;
GError *error = NULL;
if (data->io_source) {
@@ -1488,10 +1506,15 @@ io_run_until_read_async (SoupMessage *msg,
g_clear_pointer (&data->io_source, g_source_unref);
}
- if (io_run_until (io, msg, FALSE,
+ data->in_run_until_read_async = TRUE;
+
+ if (io_run_until (io, data->msg, FALSE,
STATE_READ_DATA,
g_task_get_cancellable (task),
&error)) {
+ data->task = NULL;
+ data->in_run_until_read_async = FALSE;
+
g_task_return_boolean (task, TRUE);
g_object_unref (task);
return;
@@ -1499,23 +1522,23 @@ io_run_until_read_async (SoupMessage *msg,
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
g_error_free (error);
- data->io_source = soup_client_message_io_http2_get_source (data, msg, g_task_get_cancellable (task),
- (SoupMessageIOSourceFunc)io_run_until_read_ready,
- task);
- g_source_set_priority (data->io_source, g_task_get_priority (task));
- g_source_attach (data->io_source, io->async_context);
+ io_poll (data);
+ data->in_run_until_read_async = FALSE;
return;
}
- if (get_io_data (msg) == io) {
+ if (get_io_data (data->msg) == io) {
if (data->can_be_restarted)
data->item->state = SOUP_MESSAGE_RESTARTING;
else
- soup_message_set_metrics_timestamp (msg, SOUP_MESSAGE_METRICS_RESPONSE_END);
+ soup_message_set_metrics_timestamp (data->msg, SOUP_MESSAGE_METRICS_RESPONSE_END);
- soup_client_message_io_http2_finished ((SoupClientMessageIO *)io, msg);
+ soup_client_message_io_http2_finished ((SoupClientMessageIO *)data->io, data->msg);
}
+ data->task = NULL;
+ data->in_run_until_read_async = FALSE;
+
g_task_return_error (task, error);
g_object_unref (task);
}
@@ -1528,11 +1551,12 @@ soup_client_message_io_http2_run_until_read_async (SoupClientMessageIO *iface,
GAsyncReadyCallback callback,
gpointer user_data)
{
- GTask *task;
+ SoupClientMessageIOHTTP2 *io = (SoupClientMessageIOHTTP2 *)iface;
+ SoupHTTP2MessageData *data = get_data_for_message (io, msg);
- task = g_task_new (msg, cancellable, callback, user_data);
- g_task_set_priority (task, io_priority);
- io_run_until_read_async (msg, task);
+ data->task = g_task_new (msg, cancellable, callback, user_data);
+ g_task_set_priority (data->task, io_priority);
+ io_run_until_read_async (data);
}
static gboolean