summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCarlos Garcia Campos <cgarcia@igalia.com>2021-05-24 16:10:53 +0200
committerCarlos Garcia Campos <cgarcia@igalia.com>2021-05-24 16:10:53 +0200
commite02749b4d3b3f26c300b537145c444130022c38a (patch)
tree997273ba94f600736f4bfaab917bca45171ce5a0
parentc83062d16b04bdcfeb6bb12859ce937ebaa69ba8 (diff)
downloadlibsoup-e02749b4d3b3f26c300b537145c444130022c38a.tar.gz
io-http2: use the item cancellable for send data operations
And use the data source cancellable to create the io source to wait for the send data operations.
-rw-r--r--libsoup/http2/soup-client-message-io-http2.c34
-rw-r--r--tests/http2-test.c2
2 files changed, 18 insertions, 18 deletions
diff --git a/libsoup/http2/soup-client-message-io-http2.c b/libsoup/http2/soup-client-message-io-http2.c
index 4eaf4d43..a70d41f1 100644
--- a/libsoup/http2/soup-client-message-io-http2.c
+++ b/libsoup/http2/soup-client-message-io-http2.c
@@ -562,6 +562,9 @@ on_data_readable (GInputStream *stream,
{
SoupHTTP2MessageData *data = (SoupHTTP2MessageData*)user_data;
+ g_cancellable_cancel (data->data_source_cancellable);
+ g_clear_object (&data->data_source_cancellable);
+
NGCHECK (nghttp2_session_resume_data (data->io->session, data->stream_id));
g_clear_pointer (&data->data_source_poll, g_source_unref);
@@ -579,6 +582,9 @@ on_data_read (GInputStream *source,
h2_debug (data->io, data, "[SEND_BODY] Read %zd", read);
+ g_cancellable_cancel (data->data_source_cancellable);
+ g_clear_object (&data->data_source_cancellable);
+
/* This operation may have outlived the message data in which
case this will have been cancelled. */
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
@@ -625,13 +631,6 @@ on_data_source_read_callback (nghttp2_session *session,
SoupHTTP2MessageData *data = nghttp2_session_get_stream_user_data (session, stream_id);
SoupClientMessageIOHTTP2 *io = get_io_data (data->msg);
- /* This cancellable is only used for async data source operations,
- * only exists while reading is happening, and will be cancelled
- * at any point if the data is freed.
- */
- if (!data->data_source_cancellable)
- data->data_source_cancellable = g_cancellable_new ();
-
/* We support pollable streams in the best case because they
* should perform better with one fewer copy of each buffer and no threading. */
if (G_IS_POLLABLE_INPUT_STREAM (source->ptr) && g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (source->ptr))) {
@@ -650,12 +649,14 @@ on_data_source_read_callback (nghttp2_session *session,
g_assert (data->data_source_poll == NULL);
h2_debug (io, data, "[SEND_BODY] Polling");
- data->data_source_poll = g_pollable_input_stream_create_source (in_stream, data->data_source_cancellable);
+ data->data_source_poll = g_pollable_input_stream_create_source (in_stream, data->cancellable);
g_source_set_callback (data->data_source_poll, (GSourceFunc)on_data_readable, data, NULL);
g_source_set_priority (data->data_source_poll, get_data_io_priority (data));
g_source_attach (data->data_source_poll, g_main_context_get_thread_default ());
g_error_free (error);
+ g_assert (!data->data_source_cancellable);
+ data->data_source_cancellable = g_cancellable_new ();
return NGHTTP2_ERR_DEFERRED;
}
@@ -678,7 +679,7 @@ on_data_source_read_callback (nghttp2_session *session,
if (!data->data_source_buffer)
data->data_source_buffer = g_byte_array_new ();
- gsize buffer_len = data->data_source_buffer->len;
+ guint buffer_len = data->data_source_buffer->len;
if (buffer_len) {
h2_debug (io, data, "[SEND_BODY] Sending %zu", buffer_len);
g_assert (buffer_len <= length); /* QUESTION: Maybe not reliable */
@@ -688,19 +689,19 @@ on_data_source_read_callback (nghttp2_session *session,
return buffer_len;
} else if (data->data_source_eof) {
h2_debug (io, data, "[SEND_BODY] EOF");
- g_clear_object (&data->data_source_cancellable);
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
return 0;
} else if (data->data_source_error) {
- g_clear_object (&data->data_source_cancellable);
set_error_for_data (data, g_steal_pointer (&data->data_source_error));
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
} else {
h2_debug (io, data, "[SEND_BODY] Reading async");
g_byte_array_set_size (data->data_source_buffer, length);
+ g_assert (!data->data_source_cancellable);
+ data->data_source_cancellable = g_cancellable_new ();
g_input_stream_read_async (in_stream, data->data_source_buffer->data, length,
get_data_io_priority (data),
- data->data_source_cancellable,
+ data->cancellable,
(GAsyncReadyCallback)on_data_read, data);
return NGHTTP2_ERR_DEFERRED;
}
@@ -754,10 +755,7 @@ soup_http2_message_data_free (SoupHTTP2MessageData *data)
g_clear_error (&data->data_source_error);
g_clear_pointer (&data->data_source_buffer, g_byte_array_unref);
- if (data->data_source_cancellable) {
- g_cancellable_cancel (data->data_source_cancellable);
- g_clear_object (&data->data_source_cancellable);
- }
+ g_clear_object (&data->data_source_cancellable);
g_clear_error (&data->error);
@@ -1024,6 +1022,8 @@ soup_client_message_io_http2_get_source (SoupMessage *msg,
/* TODO: Handle mixing writes in? */
if (data->paused)
base_source = cancellable ? g_cancellable_source_new (cancellable) : NULL;
+ else if (data->state < STATE_WRITE_DONE && data->data_source_cancellable)
+ base_source = g_cancellable_source_new (data->data_source_cancellable);
else if (data->state < STATE_WRITE_DONE && nghttp2_session_want_write (io->session))
base_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (io->ostream), cancellable);
else if (data->state < STATE_READ_DONE && data->decoded_data_istream)
@@ -1196,7 +1196,7 @@ io_run_until (SoupClientMessageIOHTTP2 *io,
g_object_ref (msg);
- while (progress && get_io_data (msg) == io && !data->paused && data->state < state)
+ while (progress && get_io_data (msg) == io && !data->paused && !data->data_source_cancellable && data->state < state)
progress = io_run (data, blocking, cancellable, &my_error);
if (my_error) {
diff --git a/tests/http2-test.c b/tests/http2-test.c
index 13e2e4c6..dd3214a6 100644
--- a/tests/http2-test.c
+++ b/tests/http2-test.c
@@ -314,7 +314,7 @@ do_post_blocked_async_test (Test *test, gconstpointer data)
soup_body_input_stream_http2_add_data (SOUP_BODY_INPUT_STREAM_HTTP2 (in_stream), (guint8*)" Part 2", 8);
soup_body_input_stream_http2_complete (SOUP_BODY_INPUT_STREAM_HTTP2 (in_stream));
}
- g_main_context_iteration (async_context, FALSE);
+ g_main_context_iteration (async_context, TRUE);
}
g_assert_cmpstr (g_bytes_get_data (response, NULL), ==, "Part 1 - Part 2");