summaryrefslogtreecommitdiff
path: root/libsoup/soup-message-io.c
diff options
context:
space:
mode:
authorCarlos Garcia Campos <cgarcia@igalia.com>2020-10-06 14:52:06 +0200
committerCarlos Garcia Campos <cgarcia@igalia.com>2020-10-08 15:18:45 +0200
commit2cebb238316ceac9499be0e0941640df0bd1c51e (patch)
tree5807ab35f808a6ea507c0a6c331b8fe3931503a1 /libsoup/soup-message-io.c
parent150efc63d238a47879748d77fb9261841c9174e5 (diff)
downloadlibsoup-2cebb238316ceac9499be0e0941640df0bd1c51e.tar.gz
message: add support for stream based request body
Add soup_message_set_request_body() that takes a GInputStream and soup_message_set_request_body_from_bytes() for convenience that creates a GMemoryInputStream for the given bytes.
Diffstat (limited to 'libsoup/soup-message-io.c')
-rw-r--r--libsoup/soup-message-io.c118
1 files changed, 96 insertions, 22 deletions
diff --git a/libsoup/soup-message-io.c b/libsoup/soup-message-io.c
index 1d85daa8..cfbba0f7 100644
--- a/libsoup/soup-message-io.c
+++ b/libsoup/soup-message-io.c
@@ -85,8 +85,8 @@ typedef struct {
GSource *unpause_source;
gboolean paused;
- GCancellable *async_close_wait;
- GError *async_close_error;
+ GCancellable *async_wait;
+ GError *async_error;
SoupMessageGetHeadersFn get_headers_cb;
SoupMessageParseHeadersFn parse_headers_cb;
@@ -130,11 +130,11 @@ soup_message_io_cleanup (SoupMessage *msg)
g_string_free (io->write_buf, TRUE);
g_clear_pointer (&io->write_chunk, g_bytes_unref);
- if (io->async_close_wait) {
- g_cancellable_cancel (io->async_close_wait);
- g_clear_object (&io->async_close_wait);
+ if (io->async_wait) {
+ g_cancellable_cancel (io->async_wait);
+ g_clear_object (&io->async_wait);
}
- g_clear_error (&io->async_close_error);
+ g_clear_error (&io->async_error);
g_slice_free (SoupMessageIOData, io);
}
@@ -322,6 +322,50 @@ soup_message_setup_body_istream (GInputStream *body_stream,
}
static void
+request_body_stream_wrote_data_cb (SoupMessage *msg,
+ guint count)
+{
+ GBytes *chunk;
+
+ /* FIXME: Change SoupMessage::wrote-body-data to pass just the size */
+ chunk = g_bytes_new_static ("", count);
+ soup_message_wrote_body_data (msg, chunk);
+ g_bytes_unref (chunk);
+}
+
+static void
+request_body_stream_wrote_cb (GOutputStream *ostream,
+ GAsyncResult *result,
+ SoupMessage *msg)
+{
+ SoupMessageIOData *io;
+ gssize nwrote;
+ GCancellable *async_wait;
+ GError *error = NULL;
+
+ nwrote = g_output_stream_splice_finish (ostream, result, &error);
+
+ io = soup_message_get_io_data (msg);
+ if (!io || !io->async_wait || io->body_ostream != ostream) {
+ g_clear_error (&error);
+ g_object_unref (msg);
+ return;
+ }
+
+ if (nwrote != -1)
+ io->write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH;
+
+ if (error)
+ g_propagate_error (&io->async_error, error);
+ async_wait = io->async_wait;
+ io->async_wait = NULL;
+ g_cancellable_cancel (async_wait);
+ g_object_unref (async_wait);
+
+ g_object_unref (msg);
+}
+
+static void
closed_async (GObject *source,
GAsyncResult *result,
gpointer user_data)
@@ -329,21 +373,21 @@ closed_async (GObject *source,
GOutputStream *body_ostream = G_OUTPUT_STREAM (source);
SoupMessage *msg = user_data;
SoupMessageIOData *io;
- GCancellable *async_close_wait;
+ GCancellable *async_wait;
io = soup_message_get_io_data (msg);
- if (!io || !io->async_close_wait || io->body_ostream != body_ostream) {
+ if (!io || !io->async_wait || io->body_ostream != body_ostream) {
g_object_unref (msg);
return;
}
- g_output_stream_close_finish (body_ostream, result, &io->async_close_error);
+ g_output_stream_close_finish (body_ostream, result, &io->async_error);
g_clear_object (&io->body_ostream);
- async_close_wait = io->async_close_wait;
- io->async_close_wait = NULL;
- g_cancellable_cancel (async_close_wait);
- g_object_unref (async_close_wait);
+ async_wait = io->async_wait;
+ io->async_wait = NULL;
+ g_cancellable_cancel (async_wait);
+ g_object_unref (async_wait);
g_object_unref (msg);
}
@@ -388,11 +432,11 @@ io_write (SoupMessage *msg, gboolean blocking,
GBytes *chunk;
gssize nwrote;
- if (io->async_close_error) {
- g_propagate_error (error, io->async_close_error);
- io->async_close_error = NULL;
+ if (io->async_error) {
+ g_propagate_error (error, io->async_error);
+ io->async_error = NULL;
return FALSE;
- } else if (io->async_close_wait) {
+ } else if (io->async_wait) {
g_set_error_literal (error, G_IO_ERROR,
G_IO_ERROR_WOULD_BLOCK,
_("Operation would block"));
@@ -503,6 +547,36 @@ io_write (SoupMessage *msg, gboolean blocking,
break;
}
+ if (io->mode == SOUP_MESSAGE_IO_CLIENT && msg->request_body_stream) {
+ g_signal_connect_object (io->body_ostream,
+ "wrote-data",
+ G_CALLBACK (request_body_stream_wrote_data_cb),
+ msg, G_CONNECT_SWAPPED);
+ if (blocking) {
+ nwrote = g_output_stream_splice (io->body_ostream,
+ msg->request_body_stream,
+ G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE,
+ cancellable,
+ error);
+ if (nwrote == -1)
+ return FALSE;
+ io->write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH;
+ break;
+ } else {
+ io->async_wait = g_cancellable_new ();
+ g_main_context_push_thread_default (io->async_context);
+ g_output_stream_splice_async (io->body_ostream,
+ msg->request_body_stream,
+ G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE,
+ G_PRIORITY_DEFAULT,
+ cancellable,
+ (GAsyncReadyCallback)request_body_stream_wrote_cb,
+ g_object_ref (msg));
+ g_main_context_pop_thread_default (io->async_context);
+ return FALSE;
+ }
+ }
+
if (!io->write_chunk) {
io->write_chunk = soup_message_body_get_chunk (io->write_body, io->write_body_offset);
if (!io->write_chunk) {
@@ -562,7 +636,7 @@ io_write (SoupMessage *msg, gboolean blocking,
return FALSE;
g_clear_object (&io->body_ostream);
} else {
- io->async_close_wait = g_cancellable_new ();
+ io->async_wait = g_cancellable_new ();
g_main_context_push_thread_default (io->async_context);
g_output_stream_close_async (io->body_ostream,
G_PRIORITY_DEFAULT, cancellable,
@@ -883,8 +957,8 @@ soup_message_io_get_source (SoupMessage *msg, GCancellable *cancellable,
base_source = g_timeout_source_new (0);
} else if (io->paused) {
base_source = NULL;
- } else if (io->async_close_wait) {
- base_source = g_cancellable_source_new (io->async_close_wait);
+ } else if (io->async_wait) {
+ base_source = g_cancellable_source_new (io->async_wait);
} else if (SOUP_MESSAGE_IO_STATE_POLLABLE (io->read_state)) {
GPollableInputStream *istream;
@@ -958,7 +1032,7 @@ io_run_until (SoupMessage *msg, gboolean blocking,
g_object_ref (msg);
- while (progress && soup_message_get_io_data (msg) == io && !io->paused && !io->async_close_wait &&
+ while (progress && soup_message_get_io_data (msg) == io && !io->paused && !io->async_wait &&
(io->read_state < read_state || io->write_state < write_state)) {
if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
@@ -988,7 +1062,7 @@ io_run_until (SoupMessage *msg, gboolean blocking,
_("Operation was cancelled"));
g_object_unref (msg);
return FALSE;
- } else if (!io->async_close_wait &&
+ } else if (!io->async_wait &&
g_cancellable_set_error_if_cancelled (cancellable, error)) {
g_object_unref (msg);
return FALSE;