diff options
Diffstat (limited to 'libsoup/soup-message-io.c')
-rw-r--r-- | libsoup/soup-message-io.c | 223 |
1 files changed, 140 insertions, 83 deletions
diff --git a/libsoup/soup-message-io.c b/libsoup/soup-message-io.c index eeb67553..be5cb2d2 100644 --- a/libsoup/soup-message-io.c +++ b/libsoup/soup-message-io.c @@ -16,8 +16,8 @@ #include "soup-body-output-stream.h" #include "soup-client-input-stream.h" #include "soup-connection.h" +#include "soup-content-processor.h" #include "soup-content-sniffer-stream.h" -#include "soup-converter-wrapper.h" #include "soup-filter-input-stream.h" #include "soup-message-private.h" #include "soup-message-queue.h" @@ -60,7 +60,6 @@ typedef struct { GOutputStream *ostream; GOutputStream *body_ostream; GMainContext *async_context; - gboolean blocking; SoupMessageIOState read_state; SoupEncoding read_encoding; @@ -156,8 +155,14 @@ soup_message_io_finished (SoupMessage *msg) { SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg); SoupMessageIOData *io = priv->io_data; - SoupMessageCompletionFn completion_cb = io->completion_cb; - gpointer completion_data = io->completion_data; + SoupMessageCompletionFn completion_cb; + gpointer completion_data; + + if (!io) + return; + + completion_cb = io->completion_cb; + completion_data = io->completion_data; g_object_ref (msg); soup_message_io_cleanup (msg); @@ -167,7 +172,8 @@ soup_message_io_finished (SoupMessage *msg) } static gboolean -read_headers (SoupMessage *msg, GCancellable *cancellable, GError **error) +read_headers (SoupMessage *msg, gboolean blocking, + GCancellable *cancellable, GError **error) { SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg); SoupMessageIOData *io = priv->io_data; @@ -180,7 +186,7 @@ read_headers (SoupMessage *msg, GCancellable *cancellable, GError **error) nread = soup_filter_input_stream_read_line (io->istream, io->read_header_buf->data + old_len, RESPONSE_BLOCK_SIZE, - io->blocking, + blocking, &got_lf, cancellable, error); io->read_header_buf->len = old_len + MAX (nread, 0); @@ -221,37 +227,53 @@ read_headers (SoupMessage *msg, GCancellable *cancellable, GError **error) return TRUE; } -static void -setup_body_istream (SoupMessage *msg) +static gint +processing_stage_cmp (gconstpointer a, + gconstpointer b) { - SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg); - SoupMessageIOData *io = priv->io_data; - GConverter *decoder, *wrapper; - GInputStream *filter; - GSList *d; - - io->body_istream = - soup_body_input_stream_new (io->istream, - io->read_encoding, - io->read_length); - - for (d = priv->decoders; d; d = d->next) { - decoder = d->data; - wrapper = soup_converter_wrapper_new (decoder, msg); - filter = g_object_new (G_TYPE_CONVERTER_INPUT_STREAM, - "base-stream", io->body_istream, - "converter", wrapper, - NULL); - g_object_unref (io->body_istream); - io->body_istream = filter; - } + SoupProcessingStage stage_a = soup_content_processor_get_processing_stage (SOUP_CONTENT_PROCESSOR (a)); + SoupProcessingStage stage_b = soup_content_processor_get_processing_stage (SOUP_CONTENT_PROCESSOR (b)); + + if (stage_a > stage_b) + return 1; + if (stage_a == stage_b) + return 0; + return -1; +} - if (priv->sniffer) { - filter = soup_content_sniffer_stream_new (priv->sniffer, - msg, io->body_istream); - g_object_unref (io->body_istream); - io->body_istream = filter; +GInputStream * +soup_message_setup_body_istream (GInputStream *body_stream, + SoupMessage *msg, + SoupSession *session, + SoupProcessingStage start_at_stage) +{ + GInputStream *istream; + GSList *p, *processors; + + istream = g_object_ref (body_stream); + + processors = soup_session_get_features (session, SOUP_TYPE_CONTENT_PROCESSOR); + processors = g_slist_sort (processors, processing_stage_cmp); + + for (p = processors; p; p = p->next) { + GInputStream *wrapper; + SoupContentProcessor *processor; + + processor = SOUP_CONTENT_PROCESSOR (p->data); + if (soup_message_disables_feature (msg, p->data) || + soup_content_processor_get_processing_stage (processor) < start_at_stage) + continue; + + wrapper = soup_content_processor_wrap_input (processor, istream, msg, NULL); + if (wrapper) { + g_object_unref (istream); + istream = wrapper; + } } + + g_slist_free (processors); + + return istream; } /* @@ -287,7 +309,8 @@ setup_body_istream (SoupMessage *msg) * socket not writable, write is complete, etc). */ static gboolean -io_write (SoupMessage *msg, GCancellable *cancellable, GError **error) +io_write (SoupMessage *msg, gboolean blocking, + GCancellable *cancellable, GError **error) { SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg); SoupMessageIOData *io = priv->io_data; @@ -306,7 +329,7 @@ io_write (SoupMessage *msg, GCancellable *cancellable, GError **error) nwrote = g_pollable_stream_write (io->ostream, io->write_buf->str + io->written, io->write_buf->len - io->written, - io->blocking, + blocking, cancellable, error); if (nwrote == -1) return FALSE; @@ -398,7 +421,7 @@ io_write (SoupMessage *msg, GCancellable *cancellable, GError **error) nwrote = g_pollable_stream_write (io->body_ostream, io->write_chunk->data + io->written, io->write_chunk->length - io->written, - io->blocking, + blocking, cancellable, error); if (nwrote == -1) return FALSE; @@ -470,7 +493,8 @@ io_write (SoupMessage *msg, GCancellable *cancellable, GError **error) * socket not readable, read is complete, etc). */ static gboolean -io_read (SoupMessage *msg, GCancellable *cancellable, GError **error) +io_read (SoupMessage *msg, gboolean blocking, + GCancellable *cancellable, GError **error) { SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg); SoupMessageIOData *io = priv->io_data; @@ -481,13 +505,13 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error) switch (io->read_state) { case SOUP_MESSAGE_IO_STATE_HEADERS: - if (!read_headers (msg, cancellable, error)) + if (!read_headers (msg, blocking, cancellable, error)) return FALSE; status = io->parse_headers_cb (msg, (char *)io->read_header_buf->data, io->read_header_buf->len, &io->read_encoding, - io->header_data); + io->header_data, error); g_byte_array_set_size (io->read_header_buf, 0); if (status != SOUP_STATUS_OK) { @@ -569,15 +593,31 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error) case SOUP_MESSAGE_IO_STATE_BODY_START: - if (!io->body_istream) - setup_body_istream (msg); + if (!io->body_istream) { + GInputStream *body_istream = soup_body_input_stream_new (G_INPUT_STREAM (io->istream), + io->read_encoding, + io->read_length); + + /* TODO: server-side messages do not have a io->item. This means + * that we cannot use content processors for them right now. + */ + if (io->mode == SOUP_MESSAGE_IO_CLIENT) { + io->body_istream = soup_message_setup_body_istream (body_istream, msg, + io->item->session, + SOUP_STAGE_MESSAGE_BODY); + g_object_unref (body_istream); + } else { + io->body_istream = body_istream; + } + } if (priv->sniffer) { SoupContentSnifferStream *sniffer_stream = SOUP_CONTENT_SNIFFER_STREAM (io->body_istream); const char *content_type; GHashTable *params; - if (!soup_content_sniffer_stream_is_ready (sniffer_stream, io->blocking, cancellable, error)) + if (!soup_content_sniffer_stream_is_ready (sniffer_stream, blocking, + cancellable, error)) return FALSE; content_type = soup_content_sniffer_stream_sniff (sniffer_stream, ¶ms); @@ -607,7 +647,7 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error) nread = g_pollable_stream_read (io->body_istream, (guchar *)buffer->data, buffer->length, - io->blocking, + blocking, cancellable, error); if (nread > 0) { buffer->length = nread; @@ -628,8 +668,6 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error) case SOUP_MESSAGE_IO_STATE_BODY_DONE: io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING; - if (io->item && io->item->conn) - soup_connection_set_reusable (io->item->conn); soup_message_got_body (msg); break; @@ -800,7 +838,7 @@ request_is_restartable (SoupMessage *msg, GError *error) } static gboolean -io_run_until (SoupMessage *msg, +io_run_until (SoupMessage *msg, gboolean blocking, SoupMessageIOState read_state, SoupMessageIOState write_state, GCancellable *cancellable, GError **error) { @@ -824,9 +862,9 @@ io_run_until (SoupMessage *msg, (io->read_state < read_state || io->write_state < write_state)) { if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state)) - progress = io_read (msg, cancellable, &my_error); + progress = io_read (msg, blocking, cancellable, &my_error); else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state)) - progress = io_write (msg, cancellable, &my_error); + progress = io_write (msg, blocking, cancellable, &my_error); else progress = FALSE; } @@ -858,7 +896,7 @@ io_run_until (SoupMessage *msg, done = (io->read_state >= read_state && io->write_state >= write_state); - if (io->paused && !done) { + if (!blocking && !done) { g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, _("Operation would block")); @@ -870,8 +908,17 @@ io_run_until (SoupMessage *msg, return done; } +static void io_run (SoupMessage *msg, gboolean blocking); + static gboolean -io_run (SoupMessage *msg, gpointer user_data) +io_run_ready (SoupMessage *msg, gpointer user_data) +{ + io_run (msg, FALSE); + return FALSE; +} + +static void +io_run (SoupMessage *msg, gboolean blocking) { SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg); SoupMessageIOData *io = priv->io_data; @@ -887,14 +934,14 @@ io_run (SoupMessage *msg, gpointer user_data) g_object_ref (msg); cancellable = io->cancellable ? g_object_ref (io->cancellable) : NULL; - if (io_run_until (msg, + if (io_run_until (msg, blocking, SOUP_MESSAGE_IO_STATE_DONE, SOUP_MESSAGE_IO_STATE_DONE, cancellable, &error)) { soup_message_io_finished (msg); } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { g_clear_error (&error); - io->io_source = soup_message_io_get_source (msg, NULL, io_run, msg); + io->io_source = soup_message_io_get_source (msg, NULL, io_run_ready, msg); g_source_attach (io->io_source, io->async_context); } else if (error && priv->io_data == io) { if (g_error_matches (error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN)) @@ -908,29 +955,28 @@ io_run (SoupMessage *msg, gpointer user_data) g_error_free (error); soup_message_io_finished (msg); - } + } else if (error) + g_error_free (error); g_object_unref (msg); g_clear_object (&cancellable); - - return FALSE; } gboolean -soup_message_io_run_until_write (SoupMessage *msg, +soup_message_io_run_until_write (SoupMessage *msg, gboolean blocking, GCancellable *cancellable, GError **error) { - return io_run_until (msg, + return io_run_until (msg, blocking, SOUP_MESSAGE_IO_STATE_ANY, SOUP_MESSAGE_IO_STATE_BODY, cancellable, error); } gboolean -soup_message_io_run_until_read (SoupMessage *msg, +soup_message_io_run_until_read (SoupMessage *msg, gboolean blocking, GCancellable *cancellable, GError **error) { - return io_run_until (msg, + return io_run_until (msg, blocking, SOUP_MESSAGE_IO_STATE_BODY, SOUP_MESSAGE_IO_STATE_ANY, cancellable, error); @@ -938,20 +984,30 @@ soup_message_io_run_until_read (SoupMessage *msg, gboolean soup_message_io_run_until_finish (SoupMessage *msg, + gboolean blocking, GCancellable *cancellable, GError **error) { + SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg); + SoupMessageIOData *io = priv->io_data; + gboolean success; + g_object_ref (msg); - if (!io_run_until (msg, - SOUP_MESSAGE_IO_STATE_DONE, - SOUP_MESSAGE_IO_STATE_DONE, - cancellable, error)) - return FALSE; + if (io) { + g_return_val_if_fail (io->mode == SOUP_MESSAGE_IO_CLIENT, FALSE); + + if (io->read_state < SOUP_MESSAGE_IO_STATE_BODY_DONE) + io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING; + } + + success = io_run_until (msg, blocking, + SOUP_MESSAGE_IO_STATE_DONE, + SOUP_MESSAGE_IO_STATE_DONE, + cancellable, error); - soup_message_io_finished (msg); g_object_unref (msg); - return TRUE; + return success; } static void @@ -1013,11 +1069,8 @@ new_iostate (SoupMessage *msg, GIOStream *iostream, io->istream = SOUP_FILTER_INPUT_STREAM (g_io_stream_get_input_stream (iostream)); io->ostream = g_io_stream_get_output_stream (iostream); - if (async_context) { + if (async_context) io->async_context = g_main_context_ref (async_context); - io->blocking = FALSE; - } else - io->blocking = TRUE; io->read_header_buf = g_byte_array_new (); io->write_buf = g_string_new (NULL); @@ -1056,8 +1109,13 @@ soup_message_io_client (SoupMessageQueueItem *item, io->write_body = item->msg->request_body; io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS; - if (!item->new_api) - io_run (item->msg, NULL); + + if (!item->new_api) { + gboolean blocking = + SOUP_IS_SESSION_SYNC (item->session) || + (!SOUP_IS_SESSION_ASYNC (item->session) && !item->async); + io_run (item->msg, blocking); + } } void @@ -1080,7 +1138,7 @@ soup_message_io_server (SoupMessage *msg, io->write_body = msg->response_body; io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS; - io_run (msg, NULL); + io_run (msg, FALSE); } void @@ -1102,6 +1160,7 @@ soup_message_io_pause (SoupMessage *msg) if (io->unpause_source) { g_source_destroy (io->unpause_source); + g_source_unref (io->unpause_source); io->unpause_source = NULL; } @@ -1115,13 +1174,14 @@ io_unpause_internal (gpointer msg) SoupMessageIOData *io = priv->io_data; g_return_val_if_fail (io != NULL, FALSE); - io->unpause_source = NULL; + + g_clear_pointer (&io->unpause_source, g_source_unref); io->paused = FALSE; if (io->io_source) return FALSE; - io_run (msg, NULL); + io_run (msg, FALSE); return FALSE; } @@ -1139,13 +1199,10 @@ soup_message_io_unpause (SoupMessage *msg) return; } - if (!io->blocking) { - if (!io->unpause_source) { - io->unpause_source = soup_add_completion ( - io->async_context, io_unpause_internal, msg); - } - } else - io_unpause_internal (msg); + if (!io->unpause_source) { + io->unpause_source = soup_add_completion_reffed (io->async_context, + io_unpause_internal, msg); + } } /** |