diff options
author | Carlos Garcia Campos <cgarcia@igalia.com> | 2021-05-24 16:10:53 +0200 |
---|---|---|
committer | Carlos Garcia Campos <cgarcia@igalia.com> | 2021-05-24 16:10:53 +0200 |
commit | e02749b4d3b3f26c300b537145c444130022c38a (patch) | |
tree | 997273ba94f600736f4bfaab917bca45171ce5a0 | |
parent | c83062d16b04bdcfeb6bb12859ce937ebaa69ba8 (diff) | |
download | libsoup-e02749b4d3b3f26c300b537145c444130022c38a.tar.gz |
io-http2: use the item cancellable for send data operations
And use the data source cancellable to create the io source to wait for
the send data operations.
-rw-r--r-- | libsoup/http2/soup-client-message-io-http2.c | 34 | ||||
-rw-r--r-- | tests/http2-test.c | 2 |
2 files changed, 18 insertions, 18 deletions
diff --git a/libsoup/http2/soup-client-message-io-http2.c b/libsoup/http2/soup-client-message-io-http2.c index 4eaf4d43..a70d41f1 100644 --- a/libsoup/http2/soup-client-message-io-http2.c +++ b/libsoup/http2/soup-client-message-io-http2.c @@ -562,6 +562,9 @@ on_data_readable (GInputStream *stream, { SoupHTTP2MessageData *data = (SoupHTTP2MessageData*)user_data; + g_cancellable_cancel (data->data_source_cancellable); + g_clear_object (&data->data_source_cancellable); + NGCHECK (nghttp2_session_resume_data (data->io->session, data->stream_id)); g_clear_pointer (&data->data_source_poll, g_source_unref); @@ -579,6 +582,9 @@ on_data_read (GInputStream *source, h2_debug (data->io, data, "[SEND_BODY] Read %zd", read); + g_cancellable_cancel (data->data_source_cancellable); + g_clear_object (&data->data_source_cancellable); + /* This operation may have outlived the message data in which case this will have been cancelled. */ if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { @@ -625,13 +631,6 @@ on_data_source_read_callback (nghttp2_session *session, SoupHTTP2MessageData *data = nghttp2_session_get_stream_user_data (session, stream_id); SoupClientMessageIOHTTP2 *io = get_io_data (data->msg); - /* This cancellable is only used for async data source operations, - * only exists while reading is happening, and will be cancelled - * at any point if the data is freed. - */ - if (!data->data_source_cancellable) - data->data_source_cancellable = g_cancellable_new (); - /* We support pollable streams in the best case because they * should perform better with one fewer copy of each buffer and no threading. */ if (G_IS_POLLABLE_INPUT_STREAM (source->ptr) && g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (source->ptr))) { @@ -650,12 +649,14 @@ on_data_source_read_callback (nghttp2_session *session, g_assert (data->data_source_poll == NULL); h2_debug (io, data, "[SEND_BODY] Polling"); - data->data_source_poll = g_pollable_input_stream_create_source (in_stream, data->data_source_cancellable); + data->data_source_poll = g_pollable_input_stream_create_source (in_stream, data->cancellable); g_source_set_callback (data->data_source_poll, (GSourceFunc)on_data_readable, data, NULL); g_source_set_priority (data->data_source_poll, get_data_io_priority (data)); g_source_attach (data->data_source_poll, g_main_context_get_thread_default ()); g_error_free (error); + g_assert (!data->data_source_cancellable); + data->data_source_cancellable = g_cancellable_new (); return NGHTTP2_ERR_DEFERRED; } @@ -678,7 +679,7 @@ on_data_source_read_callback (nghttp2_session *session, if (!data->data_source_buffer) data->data_source_buffer = g_byte_array_new (); - gsize buffer_len = data->data_source_buffer->len; + guint buffer_len = data->data_source_buffer->len; if (buffer_len) { h2_debug (io, data, "[SEND_BODY] Sending %zu", buffer_len); g_assert (buffer_len <= length); /* QUESTION: Maybe not reliable */ @@ -688,19 +689,19 @@ on_data_source_read_callback (nghttp2_session *session, return buffer_len; } else if (data->data_source_eof) { h2_debug (io, data, "[SEND_BODY] EOF"); - g_clear_object (&data->data_source_cancellable); *data_flags |= NGHTTP2_DATA_FLAG_EOF; return 0; } else if (data->data_source_error) { - g_clear_object (&data->data_source_cancellable); set_error_for_data (data, g_steal_pointer (&data->data_source_error)); return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; } else { h2_debug (io, data, "[SEND_BODY] Reading async"); g_byte_array_set_size (data->data_source_buffer, length); + g_assert (!data->data_source_cancellable); + data->data_source_cancellable = g_cancellable_new (); g_input_stream_read_async (in_stream, data->data_source_buffer->data, length, get_data_io_priority (data), - data->data_source_cancellable, + data->cancellable, (GAsyncReadyCallback)on_data_read, data); return NGHTTP2_ERR_DEFERRED; } @@ -754,10 +755,7 @@ soup_http2_message_data_free (SoupHTTP2MessageData *data) g_clear_error (&data->data_source_error); g_clear_pointer (&data->data_source_buffer, g_byte_array_unref); - if (data->data_source_cancellable) { - g_cancellable_cancel (data->data_source_cancellable); - g_clear_object (&data->data_source_cancellable); - } + g_clear_object (&data->data_source_cancellable); g_clear_error (&data->error); @@ -1024,6 +1022,8 @@ soup_client_message_io_http2_get_source (SoupMessage *msg, /* TODO: Handle mixing writes in? */ if (data->paused) base_source = cancellable ? g_cancellable_source_new (cancellable) : NULL; + else if (data->state < STATE_WRITE_DONE && data->data_source_cancellable) + base_source = g_cancellable_source_new (data->data_source_cancellable); else if (data->state < STATE_WRITE_DONE && nghttp2_session_want_write (io->session)) base_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (io->ostream), cancellable); else if (data->state < STATE_READ_DONE && data->decoded_data_istream) @@ -1196,7 +1196,7 @@ io_run_until (SoupClientMessageIOHTTP2 *io, g_object_ref (msg); - while (progress && get_io_data (msg) == io && !data->paused && data->state < state) + while (progress && get_io_data (msg) == io && !data->paused && !data->data_source_cancellable && data->state < state) progress = io_run (data, blocking, cancellable, &my_error); if (my_error) { diff --git a/tests/http2-test.c b/tests/http2-test.c index 13e2e4c6..dd3214a6 100644 --- a/tests/http2-test.c +++ b/tests/http2-test.c @@ -314,7 +314,7 @@ do_post_blocked_async_test (Test *test, gconstpointer data) soup_body_input_stream_http2_add_data (SOUP_BODY_INPUT_STREAM_HTTP2 (in_stream), (guint8*)" Part 2", 8); soup_body_input_stream_http2_complete (SOUP_BODY_INPUT_STREAM_HTTP2 (in_stream)); } - g_main_context_iteration (async_context, FALSE); + g_main_context_iteration (async_context, TRUE); } g_assert_cmpstr (g_bytes_get_data (response, NULL), ==, "Part 1 - Part 2"); |