summaryrefslogtreecommitdiff
path: root/libsoup/soup-message-io.c
diff options
context:
space:
mode:
Diffstat (limited to 'libsoup/soup-message-io.c')
-rw-r--r--libsoup/soup-message-io.c223
1 files changed, 140 insertions, 83 deletions
diff --git a/libsoup/soup-message-io.c b/libsoup/soup-message-io.c
index eeb67553..be5cb2d2 100644
--- a/libsoup/soup-message-io.c
+++ b/libsoup/soup-message-io.c
@@ -16,8 +16,8 @@
#include "soup-body-output-stream.h"
#include "soup-client-input-stream.h"
#include "soup-connection.h"
+#include "soup-content-processor.h"
#include "soup-content-sniffer-stream.h"
-#include "soup-converter-wrapper.h"
#include "soup-filter-input-stream.h"
#include "soup-message-private.h"
#include "soup-message-queue.h"
@@ -60,7 +60,6 @@ typedef struct {
GOutputStream *ostream;
GOutputStream *body_ostream;
GMainContext *async_context;
- gboolean blocking;
SoupMessageIOState read_state;
SoupEncoding read_encoding;
@@ -156,8 +155,14 @@ soup_message_io_finished (SoupMessage *msg)
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io = priv->io_data;
- SoupMessageCompletionFn completion_cb = io->completion_cb;
- gpointer completion_data = io->completion_data;
+ SoupMessageCompletionFn completion_cb;
+ gpointer completion_data;
+
+ if (!io)
+ return;
+
+ completion_cb = io->completion_cb;
+ completion_data = io->completion_data;
g_object_ref (msg);
soup_message_io_cleanup (msg);
@@ -167,7 +172,8 @@ soup_message_io_finished (SoupMessage *msg)
}
static gboolean
-read_headers (SoupMessage *msg, GCancellable *cancellable, GError **error)
+read_headers (SoupMessage *msg, gboolean blocking,
+ GCancellable *cancellable, GError **error)
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io = priv->io_data;
@@ -180,7 +186,7 @@ read_headers (SoupMessage *msg, GCancellable *cancellable, GError **error)
nread = soup_filter_input_stream_read_line (io->istream,
io->read_header_buf->data + old_len,
RESPONSE_BLOCK_SIZE,
- io->blocking,
+ blocking,
&got_lf,
cancellable, error);
io->read_header_buf->len = old_len + MAX (nread, 0);
@@ -221,37 +227,53 @@ read_headers (SoupMessage *msg, GCancellable *cancellable, GError **error)
return TRUE;
}
-static void
-setup_body_istream (SoupMessage *msg)
+static gint
+processing_stage_cmp (gconstpointer a,
+ gconstpointer b)
{
- SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
- SoupMessageIOData *io = priv->io_data;
- GConverter *decoder, *wrapper;
- GInputStream *filter;
- GSList *d;
-
- io->body_istream =
- soup_body_input_stream_new (io->istream,
- io->read_encoding,
- io->read_length);
-
- for (d = priv->decoders; d; d = d->next) {
- decoder = d->data;
- wrapper = soup_converter_wrapper_new (decoder, msg);
- filter = g_object_new (G_TYPE_CONVERTER_INPUT_STREAM,
- "base-stream", io->body_istream,
- "converter", wrapper,
- NULL);
- g_object_unref (io->body_istream);
- io->body_istream = filter;
- }
+ SoupProcessingStage stage_a = soup_content_processor_get_processing_stage (SOUP_CONTENT_PROCESSOR (a));
+ SoupProcessingStage stage_b = soup_content_processor_get_processing_stage (SOUP_CONTENT_PROCESSOR (b));
+
+ if (stage_a > stage_b)
+ return 1;
+ if (stage_a == stage_b)
+ return 0;
+ return -1;
+}
- if (priv->sniffer) {
- filter = soup_content_sniffer_stream_new (priv->sniffer,
- msg, io->body_istream);
- g_object_unref (io->body_istream);
- io->body_istream = filter;
+GInputStream *
+soup_message_setup_body_istream (GInputStream *body_stream,
+ SoupMessage *msg,
+ SoupSession *session,
+ SoupProcessingStage start_at_stage)
+{
+ GInputStream *istream;
+ GSList *p, *processors;
+
+ istream = g_object_ref (body_stream);
+
+ processors = soup_session_get_features (session, SOUP_TYPE_CONTENT_PROCESSOR);
+ processors = g_slist_sort (processors, processing_stage_cmp);
+
+ for (p = processors; p; p = p->next) {
+ GInputStream *wrapper;
+ SoupContentProcessor *processor;
+
+ processor = SOUP_CONTENT_PROCESSOR (p->data);
+ if (soup_message_disables_feature (msg, p->data) ||
+ soup_content_processor_get_processing_stage (processor) < start_at_stage)
+ continue;
+
+ wrapper = soup_content_processor_wrap_input (processor, istream, msg, NULL);
+ if (wrapper) {
+ g_object_unref (istream);
+ istream = wrapper;
+ }
}
+
+ g_slist_free (processors);
+
+ return istream;
}
/*
@@ -287,7 +309,8 @@ setup_body_istream (SoupMessage *msg)
* socket not writable, write is complete, etc).
*/
static gboolean
-io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
+io_write (SoupMessage *msg, gboolean blocking,
+ GCancellable *cancellable, GError **error)
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io = priv->io_data;
@@ -306,7 +329,7 @@ io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
nwrote = g_pollable_stream_write (io->ostream,
io->write_buf->str + io->written,
io->write_buf->len - io->written,
- io->blocking,
+ blocking,
cancellable, error);
if (nwrote == -1)
return FALSE;
@@ -398,7 +421,7 @@ io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
nwrote = g_pollable_stream_write (io->body_ostream,
io->write_chunk->data + io->written,
io->write_chunk->length - io->written,
- io->blocking,
+ blocking,
cancellable, error);
if (nwrote == -1)
return FALSE;
@@ -470,7 +493,8 @@ io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
* socket not readable, read is complete, etc).
*/
static gboolean
-io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
+io_read (SoupMessage *msg, gboolean blocking,
+ GCancellable *cancellable, GError **error)
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io = priv->io_data;
@@ -481,13 +505,13 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
switch (io->read_state) {
case SOUP_MESSAGE_IO_STATE_HEADERS:
- if (!read_headers (msg, cancellable, error))
+ if (!read_headers (msg, blocking, cancellable, error))
return FALSE;
status = io->parse_headers_cb (msg, (char *)io->read_header_buf->data,
io->read_header_buf->len,
&io->read_encoding,
- io->header_data);
+ io->header_data, error);
g_byte_array_set_size (io->read_header_buf, 0);
if (status != SOUP_STATUS_OK) {
@@ -569,15 +593,31 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
case SOUP_MESSAGE_IO_STATE_BODY_START:
- if (!io->body_istream)
- setup_body_istream (msg);
+ if (!io->body_istream) {
+ GInputStream *body_istream = soup_body_input_stream_new (G_INPUT_STREAM (io->istream),
+ io->read_encoding,
+ io->read_length);
+
+ /* TODO: server-side messages do not have a io->item. This means
+ * that we cannot use content processors for them right now.
+ */
+ if (io->mode == SOUP_MESSAGE_IO_CLIENT) {
+ io->body_istream = soup_message_setup_body_istream (body_istream, msg,
+ io->item->session,
+ SOUP_STAGE_MESSAGE_BODY);
+ g_object_unref (body_istream);
+ } else {
+ io->body_istream = body_istream;
+ }
+ }
if (priv->sniffer) {
SoupContentSnifferStream *sniffer_stream = SOUP_CONTENT_SNIFFER_STREAM (io->body_istream);
const char *content_type;
GHashTable *params;
- if (!soup_content_sniffer_stream_is_ready (sniffer_stream, io->blocking, cancellable, error))
+ if (!soup_content_sniffer_stream_is_ready (sniffer_stream, blocking,
+ cancellable, error))
return FALSE;
content_type = soup_content_sniffer_stream_sniff (sniffer_stream, &params);
@@ -607,7 +647,7 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
nread = g_pollable_stream_read (io->body_istream,
(guchar *)buffer->data,
buffer->length,
- io->blocking,
+ blocking,
cancellable, error);
if (nread > 0) {
buffer->length = nread;
@@ -628,8 +668,6 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
case SOUP_MESSAGE_IO_STATE_BODY_DONE:
io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
- if (io->item && io->item->conn)
- soup_connection_set_reusable (io->item->conn);
soup_message_got_body (msg);
break;
@@ -800,7 +838,7 @@ request_is_restartable (SoupMessage *msg, GError *error)
}
static gboolean
-io_run_until (SoupMessage *msg,
+io_run_until (SoupMessage *msg, gboolean blocking,
SoupMessageIOState read_state, SoupMessageIOState write_state,
GCancellable *cancellable, GError **error)
{
@@ -824,9 +862,9 @@ io_run_until (SoupMessage *msg,
(io->read_state < read_state || io->write_state < write_state)) {
if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
- progress = io_read (msg, cancellable, &my_error);
+ progress = io_read (msg, blocking, cancellable, &my_error);
else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state))
- progress = io_write (msg, cancellable, &my_error);
+ progress = io_write (msg, blocking, cancellable, &my_error);
else
progress = FALSE;
}
@@ -858,7 +896,7 @@ io_run_until (SoupMessage *msg,
done = (io->read_state >= read_state &&
io->write_state >= write_state);
- if (io->paused && !done) {
+ if (!blocking && !done) {
g_set_error_literal (error, G_IO_ERROR,
G_IO_ERROR_WOULD_BLOCK,
_("Operation would block"));
@@ -870,8 +908,17 @@ io_run_until (SoupMessage *msg,
return done;
}
+static void io_run (SoupMessage *msg, gboolean blocking);
+
static gboolean
-io_run (SoupMessage *msg, gpointer user_data)
+io_run_ready (SoupMessage *msg, gpointer user_data)
+{
+ io_run (msg, FALSE);
+ return FALSE;
+}
+
+static void
+io_run (SoupMessage *msg, gboolean blocking)
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io = priv->io_data;
@@ -887,14 +934,14 @@ io_run (SoupMessage *msg, gpointer user_data)
g_object_ref (msg);
cancellable = io->cancellable ? g_object_ref (io->cancellable) : NULL;
- if (io_run_until (msg,
+ if (io_run_until (msg, blocking,
SOUP_MESSAGE_IO_STATE_DONE,
SOUP_MESSAGE_IO_STATE_DONE,
cancellable, &error)) {
soup_message_io_finished (msg);
} else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
g_clear_error (&error);
- io->io_source = soup_message_io_get_source (msg, NULL, io_run, msg);
+ io->io_source = soup_message_io_get_source (msg, NULL, io_run_ready, msg);
g_source_attach (io->io_source, io->async_context);
} else if (error && priv->io_data == io) {
if (g_error_matches (error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN))
@@ -908,29 +955,28 @@ io_run (SoupMessage *msg, gpointer user_data)
g_error_free (error);
soup_message_io_finished (msg);
- }
+ } else if (error)
+ g_error_free (error);
g_object_unref (msg);
g_clear_object (&cancellable);
-
- return FALSE;
}
gboolean
-soup_message_io_run_until_write (SoupMessage *msg,
+soup_message_io_run_until_write (SoupMessage *msg, gboolean blocking,
GCancellable *cancellable, GError **error)
{
- return io_run_until (msg,
+ return io_run_until (msg, blocking,
SOUP_MESSAGE_IO_STATE_ANY,
SOUP_MESSAGE_IO_STATE_BODY,
cancellable, error);
}
gboolean
-soup_message_io_run_until_read (SoupMessage *msg,
+soup_message_io_run_until_read (SoupMessage *msg, gboolean blocking,
GCancellable *cancellable, GError **error)
{
- return io_run_until (msg,
+ return io_run_until (msg, blocking,
SOUP_MESSAGE_IO_STATE_BODY,
SOUP_MESSAGE_IO_STATE_ANY,
cancellable, error);
@@ -938,20 +984,30 @@ soup_message_io_run_until_read (SoupMessage *msg,
gboolean
soup_message_io_run_until_finish (SoupMessage *msg,
+ gboolean blocking,
GCancellable *cancellable,
GError **error)
{
+ SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
+ SoupMessageIOData *io = priv->io_data;
+ gboolean success;
+
g_object_ref (msg);
- if (!io_run_until (msg,
- SOUP_MESSAGE_IO_STATE_DONE,
- SOUP_MESSAGE_IO_STATE_DONE,
- cancellable, error))
- return FALSE;
+ if (io) {
+ g_return_val_if_fail (io->mode == SOUP_MESSAGE_IO_CLIENT, FALSE);
+
+ if (io->read_state < SOUP_MESSAGE_IO_STATE_BODY_DONE)
+ io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
+ }
+
+ success = io_run_until (msg, blocking,
+ SOUP_MESSAGE_IO_STATE_DONE,
+ SOUP_MESSAGE_IO_STATE_DONE,
+ cancellable, error);
- soup_message_io_finished (msg);
g_object_unref (msg);
- return TRUE;
+ return success;
}
static void
@@ -1013,11 +1069,8 @@ new_iostate (SoupMessage *msg, GIOStream *iostream,
io->istream = SOUP_FILTER_INPUT_STREAM (g_io_stream_get_input_stream (iostream));
io->ostream = g_io_stream_get_output_stream (iostream);
- if (async_context) {
+ if (async_context)
io->async_context = g_main_context_ref (async_context);
- io->blocking = FALSE;
- } else
- io->blocking = TRUE;
io->read_header_buf = g_byte_array_new ();
io->write_buf = g_string_new (NULL);
@@ -1056,8 +1109,13 @@ soup_message_io_client (SoupMessageQueueItem *item,
io->write_body = item->msg->request_body;
io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
- if (!item->new_api)
- io_run (item->msg, NULL);
+
+ if (!item->new_api) {
+ gboolean blocking =
+ SOUP_IS_SESSION_SYNC (item->session) ||
+ (!SOUP_IS_SESSION_ASYNC (item->session) && !item->async);
+ io_run (item->msg, blocking);
+ }
}
void
@@ -1080,7 +1138,7 @@ soup_message_io_server (SoupMessage *msg,
io->write_body = msg->response_body;
io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
- io_run (msg, NULL);
+ io_run (msg, FALSE);
}
void
@@ -1102,6 +1160,7 @@ soup_message_io_pause (SoupMessage *msg)
if (io->unpause_source) {
g_source_destroy (io->unpause_source);
+ g_source_unref (io->unpause_source);
io->unpause_source = NULL;
}
@@ -1115,13 +1174,14 @@ io_unpause_internal (gpointer msg)
SoupMessageIOData *io = priv->io_data;
g_return_val_if_fail (io != NULL, FALSE);
- io->unpause_source = NULL;
+
+ g_clear_pointer (&io->unpause_source, g_source_unref);
io->paused = FALSE;
if (io->io_source)
return FALSE;
- io_run (msg, NULL);
+ io_run (msg, FALSE);
return FALSE;
}
@@ -1139,13 +1199,10 @@ soup_message_io_unpause (SoupMessage *msg)
return;
}
- if (!io->blocking) {
- if (!io->unpause_source) {
- io->unpause_source = soup_add_completion (
- io->async_context, io_unpause_internal, msg);
- }
- } else
- io_unpause_internal (msg);
+ if (!io->unpause_source) {
+ io->unpause_source = soup_add_completion_reffed (io->async_context,
+ io_unpause_internal, msg);
+ }
}
/**