diff options
author | Carlos Garcia Campos <cgarcia@igalia.com> | 2020-10-06 14:52:06 +0200 |
---|---|---|
committer | Carlos Garcia Campos <cgarcia@igalia.com> | 2020-10-08 15:18:45 +0200 |
commit | 2cebb238316ceac9499be0e0941640df0bd1c51e (patch) | |
tree | 5807ab35f808a6ea507c0a6c331b8fe3931503a1 /libsoup/soup-message-io.c | |
parent | 150efc63d238a47879748d77fb9261841c9174e5 (diff) | |
download | libsoup-2cebb238316ceac9499be0e0941640df0bd1c51e.tar.gz |
message: add support for stream based request body
Add soup_message_set_request_body() that takes a GInputStream and
soup_message_set_request_body_from_bytes() for convenience that creates
a GMemoryInputStream for the given bytes.
Diffstat (limited to 'libsoup/soup-message-io.c')
-rw-r--r-- | libsoup/soup-message-io.c | 118 |
1 files changed, 96 insertions, 22 deletions
diff --git a/libsoup/soup-message-io.c b/libsoup/soup-message-io.c index 1d85daa8..cfbba0f7 100644 --- a/libsoup/soup-message-io.c +++ b/libsoup/soup-message-io.c @@ -85,8 +85,8 @@ typedef struct { GSource *unpause_source; gboolean paused; - GCancellable *async_close_wait; - GError *async_close_error; + GCancellable *async_wait; + GError *async_error; SoupMessageGetHeadersFn get_headers_cb; SoupMessageParseHeadersFn parse_headers_cb; @@ -130,11 +130,11 @@ soup_message_io_cleanup (SoupMessage *msg) g_string_free (io->write_buf, TRUE); g_clear_pointer (&io->write_chunk, g_bytes_unref); - if (io->async_close_wait) { - g_cancellable_cancel (io->async_close_wait); - g_clear_object (&io->async_close_wait); + if (io->async_wait) { + g_cancellable_cancel (io->async_wait); + g_clear_object (&io->async_wait); } - g_clear_error (&io->async_close_error); + g_clear_error (&io->async_error); g_slice_free (SoupMessageIOData, io); } @@ -322,6 +322,50 @@ soup_message_setup_body_istream (GInputStream *body_stream, } static void +request_body_stream_wrote_data_cb (SoupMessage *msg, + guint count) +{ + GBytes *chunk; + + /* FIXME: Change SoupMessage::wrote-body-data to pass just the size */ + chunk = g_bytes_new_static ("", count); + soup_message_wrote_body_data (msg, chunk); + g_bytes_unref (chunk); +} + +static void +request_body_stream_wrote_cb (GOutputStream *ostream, + GAsyncResult *result, + SoupMessage *msg) +{ + SoupMessageIOData *io; + gssize nwrote; + GCancellable *async_wait; + GError *error = NULL; + + nwrote = g_output_stream_splice_finish (ostream, result, &error); + + io = soup_message_get_io_data (msg); + if (!io || !io->async_wait || io->body_ostream != ostream) { + g_clear_error (&error); + g_object_unref (msg); + return; + } + + if (nwrote != -1) + io->write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH; + + if (error) + g_propagate_error (&io->async_error, error); + async_wait = io->async_wait; + io->async_wait = NULL; + g_cancellable_cancel (async_wait); + g_object_unref (async_wait); + + g_object_unref (msg); +} + +static void closed_async (GObject *source, GAsyncResult *result, gpointer user_data) @@ -329,21 +373,21 @@ closed_async (GObject *source, GOutputStream *body_ostream = G_OUTPUT_STREAM (source); SoupMessage *msg = user_data; SoupMessageIOData *io; - GCancellable *async_close_wait; + GCancellable *async_wait; io = soup_message_get_io_data (msg); - if (!io || !io->async_close_wait || io->body_ostream != body_ostream) { + if (!io || !io->async_wait || io->body_ostream != body_ostream) { g_object_unref (msg); return; } - g_output_stream_close_finish (body_ostream, result, &io->async_close_error); + g_output_stream_close_finish (body_ostream, result, &io->async_error); g_clear_object (&io->body_ostream); - async_close_wait = io->async_close_wait; - io->async_close_wait = NULL; - g_cancellable_cancel (async_close_wait); - g_object_unref (async_close_wait); + async_wait = io->async_wait; + io->async_wait = NULL; + g_cancellable_cancel (async_wait); + g_object_unref (async_wait); g_object_unref (msg); } @@ -388,11 +432,11 @@ io_write (SoupMessage *msg, gboolean blocking, GBytes *chunk; gssize nwrote; - if (io->async_close_error) { - g_propagate_error (error, io->async_close_error); - io->async_close_error = NULL; + if (io->async_error) { + g_propagate_error (error, io->async_error); + io->async_error = NULL; return FALSE; - } else if (io->async_close_wait) { + } else if (io->async_wait) { g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, _("Operation would block")); @@ -503,6 +547,36 @@ io_write (SoupMessage *msg, gboolean blocking, break; } + if (io->mode == SOUP_MESSAGE_IO_CLIENT && msg->request_body_stream) { + g_signal_connect_object (io->body_ostream, + "wrote-data", + G_CALLBACK (request_body_stream_wrote_data_cb), + msg, G_CONNECT_SWAPPED); + if (blocking) { + nwrote = g_output_stream_splice (io->body_ostream, + msg->request_body_stream, + G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE, + cancellable, + error); + if (nwrote == -1) + return FALSE; + io->write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH; + break; + } else { + io->async_wait = g_cancellable_new (); + g_main_context_push_thread_default (io->async_context); + g_output_stream_splice_async (io->body_ostream, + msg->request_body_stream, + G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE, + G_PRIORITY_DEFAULT, + cancellable, + (GAsyncReadyCallback)request_body_stream_wrote_cb, + g_object_ref (msg)); + g_main_context_pop_thread_default (io->async_context); + return FALSE; + } + } + if (!io->write_chunk) { io->write_chunk = soup_message_body_get_chunk (io->write_body, io->write_body_offset); if (!io->write_chunk) { @@ -562,7 +636,7 @@ io_write (SoupMessage *msg, gboolean blocking, return FALSE; g_clear_object (&io->body_ostream); } else { - io->async_close_wait = g_cancellable_new (); + io->async_wait = g_cancellable_new (); g_main_context_push_thread_default (io->async_context); g_output_stream_close_async (io->body_ostream, G_PRIORITY_DEFAULT, cancellable, @@ -883,8 +957,8 @@ soup_message_io_get_source (SoupMessage *msg, GCancellable *cancellable, base_source = g_timeout_source_new (0); } else if (io->paused) { base_source = NULL; - } else if (io->async_close_wait) { - base_source = g_cancellable_source_new (io->async_close_wait); + } else if (io->async_wait) { + base_source = g_cancellable_source_new (io->async_wait); } else if (SOUP_MESSAGE_IO_STATE_POLLABLE (io->read_state)) { GPollableInputStream *istream; @@ -958,7 +1032,7 @@ io_run_until (SoupMessage *msg, gboolean blocking, g_object_ref (msg); - while (progress && soup_message_get_io_data (msg) == io && !io->paused && !io->async_close_wait && + while (progress && soup_message_get_io_data (msg) == io && !io->paused && !io->async_wait && (io->read_state < read_state || io->write_state < write_state)) { if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state)) @@ -988,7 +1062,7 @@ io_run_until (SoupMessage *msg, gboolean blocking, _("Operation was cancelled")); g_object_unref (msg); return FALSE; - } else if (!io->async_close_wait && + } else if (!io->async_wait && g_cancellable_set_error_if_cancelled (cancellable, error)) { g_object_unref (msg); return FALSE; |