From 3a308914423d4aa16e5546c1484c983c69fd2780 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Sat, 28 Sep 2013 16:09:06 -0400 Subject: Fix asynchronicity of soup_session_queue_message() on plain SoupSession Messages sent via soup_session_queue_message() on a plain SoupSession accidentally ended up using blocking I/O. Fix this (and also make switching between sync and async ops during a streaming operation work better). https://bugzilla.gnome.org/show_bug.cgi?id=707711 --- libsoup/soup-client-input-stream.c | 4 +- libsoup/soup-message-client-io.c | 2 +- libsoup/soup-message-io.c | 88 +++++++++++++++++++++----------------- libsoup/soup-message-private.h | 3 ++ libsoup/soup-session.c | 4 +- 5 files changed, 57 insertions(+), 44 deletions(-) diff --git a/libsoup/soup-client-input-stream.c b/libsoup/soup-client-input-stream.c index 3e533ca8..d73fb007 100644 --- a/libsoup/soup-client-input-stream.c +++ b/libsoup/soup-client-input-stream.c @@ -129,7 +129,7 @@ soup_client_input_stream_close_fn (GInputStream *stream, { SoupClientInputStream *cistream = SOUP_CLIENT_INPUT_STREAM (stream); - return soup_message_io_run_until_finish (cistream->priv->msg, + return soup_message_io_run_until_finish (cistream->priv->msg, TRUE, cancellable, error); } @@ -150,7 +150,7 @@ close_async_ready (SoupMessage *msg, gpointer user_data) SoupClientInputStream *cistream = g_task_get_source_object (task); GError *error = NULL; - if (!soup_message_io_run_until_finish (cistream->priv->msg, + if (!soup_message_io_run_until_finish (cistream->priv->msg, FALSE, g_task_get_cancellable (task), &error) && g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { diff --git a/libsoup/soup-message-client-io.c b/libsoup/soup-message-client-io.c index 1d96729d..b145bbaf 100644 --- a/libsoup/soup-message-client-io.c +++ b/libsoup/soup-message-client-io.c @@ -150,7 +150,7 @@ soup_message_send_request (SoupMessageQueueItem *item, GMainContext *async_context; GIOStream *iostream; - if (SOUP_IS_SESSION_ASYNC (item->session)) { + if (!SOUP_IS_SESSION_SYNC (item->session)) { async_context = soup_session_get_async_context (item->session); if (!async_context) async_context = g_main_context_default (); diff --git a/libsoup/soup-message-io.c b/libsoup/soup-message-io.c index 497fd063..f5f4c514 100644 --- a/libsoup/soup-message-io.c +++ b/libsoup/soup-message-io.c @@ -60,7 +60,6 @@ typedef struct { GOutputStream *ostream; GOutputStream *body_ostream; GMainContext *async_context; - gboolean blocking; SoupMessageIOState read_state; SoupEncoding read_encoding; @@ -167,7 +166,8 @@ soup_message_io_finished (SoupMessage *msg) } static gboolean -read_headers (SoupMessage *msg, GCancellable *cancellable, GError **error) +read_headers (SoupMessage *msg, gboolean blocking, + GCancellable *cancellable, GError **error) { SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg); SoupMessageIOData *io = priv->io_data; @@ -180,7 +180,7 @@ read_headers (SoupMessage *msg, GCancellable *cancellable, GError **error) nread = soup_filter_input_stream_read_line (io->istream, io->read_header_buf->data + old_len, RESPONSE_BLOCK_SIZE, - io->blocking, + blocking, &got_lf, cancellable, error); io->read_header_buf->len = old_len + MAX (nread, 0); @@ -303,7 +303,8 @@ soup_message_setup_body_istream (GInputStream *body_stream, * socket not writable, write is complete, etc). */ static gboolean -io_write (SoupMessage *msg, GCancellable *cancellable, GError **error) +io_write (SoupMessage *msg, gboolean blocking, + GCancellable *cancellable, GError **error) { SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg); SoupMessageIOData *io = priv->io_data; @@ -322,7 +323,7 @@ io_write (SoupMessage *msg, GCancellable *cancellable, GError **error) nwrote = g_pollable_stream_write (io->ostream, io->write_buf->str + io->written, io->write_buf->len - io->written, - io->blocking, + blocking, cancellable, error); if (nwrote == -1) return FALSE; @@ -414,7 +415,7 @@ io_write (SoupMessage *msg, GCancellable *cancellable, GError **error) nwrote = g_pollable_stream_write (io->body_ostream, io->write_chunk->data + io->written, io->write_chunk->length - io->written, - io->blocking, + blocking, cancellable, error); if (nwrote == -1) return FALSE; @@ -486,7 +487,8 @@ io_write (SoupMessage *msg, GCancellable *cancellable, GError **error) * socket not readable, read is complete, etc). */ static gboolean -io_read (SoupMessage *msg, GCancellable *cancellable, GError **error) +io_read (SoupMessage *msg, gboolean blocking, + GCancellable *cancellable, GError **error) { SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg); SoupMessageIOData *io = priv->io_data; @@ -497,7 +499,7 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error) switch (io->read_state) { case SOUP_MESSAGE_IO_STATE_HEADERS: - if (!read_headers (msg, cancellable, error)) + if (!read_headers (msg, blocking, cancellable, error)) return FALSE; status = io->parse_headers_cb (msg, (char *)io->read_header_buf->data, @@ -608,7 +610,8 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error) const char *content_type; GHashTable *params; - if (!soup_content_sniffer_stream_is_ready (sniffer_stream, io->blocking, cancellable, error)) + if (!soup_content_sniffer_stream_is_ready (sniffer_stream, blocking, + cancellable, error)) return FALSE; content_type = soup_content_sniffer_stream_sniff (sniffer_stream, ¶ms); @@ -638,7 +641,7 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error) nread = g_pollable_stream_read (io->body_istream, (guchar *)buffer->data, buffer->length, - io->blocking, + blocking, cancellable, error); if (nread > 0) { buffer->length = nread; @@ -829,7 +832,7 @@ request_is_restartable (SoupMessage *msg, GError *error) } static gboolean -io_run_until (SoupMessage *msg, +io_run_until (SoupMessage *msg, gboolean blocking, SoupMessageIOState read_state, SoupMessageIOState write_state, GCancellable *cancellable, GError **error) { @@ -853,9 +856,9 @@ io_run_until (SoupMessage *msg, (io->read_state < read_state || io->write_state < write_state)) { if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state)) - progress = io_read (msg, cancellable, &my_error); + progress = io_read (msg, blocking, cancellable, &my_error); else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state)) - progress = io_write (msg, cancellable, &my_error); + progress = io_write (msg, blocking, cancellable, &my_error); else progress = FALSE; } @@ -887,7 +890,7 @@ io_run_until (SoupMessage *msg, done = (io->read_state >= read_state && io->write_state >= write_state); - if (io->paused && !done) { + if (!blocking && !done) { g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, _("Operation would block")); @@ -899,8 +902,17 @@ io_run_until (SoupMessage *msg, return done; } +static void io_run (SoupMessage *msg, gboolean blocking); + static gboolean -io_run (SoupMessage *msg, gpointer user_data) +io_run_ready (SoupMessage *msg, gpointer user_data) +{ + io_run (msg, FALSE); + return FALSE; +} + +static void +io_run (SoupMessage *msg, gboolean blocking) { SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg); SoupMessageIOData *io = priv->io_data; @@ -916,14 +928,14 @@ io_run (SoupMessage *msg, gpointer user_data) g_object_ref (msg); cancellable = io->cancellable ? g_object_ref (io->cancellable) : NULL; - if (io_run_until (msg, + if (io_run_until (msg, blocking, SOUP_MESSAGE_IO_STATE_DONE, SOUP_MESSAGE_IO_STATE_DONE, cancellable, &error)) { soup_message_io_finished (msg); } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { g_clear_error (&error); - io->io_source = soup_message_io_get_source (msg, NULL, io_run, msg); + io->io_source = soup_message_io_get_source (msg, NULL, io_run_ready, msg); g_source_attach (io->io_source, io->async_context); } else if (error && priv->io_data == io) { if (g_error_matches (error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN)) @@ -942,25 +954,23 @@ io_run (SoupMessage *msg, gpointer user_data) g_object_unref (msg); g_clear_object (&cancellable); - - return FALSE; } gboolean -soup_message_io_run_until_write (SoupMessage *msg, +soup_message_io_run_until_write (SoupMessage *msg, gboolean blocking, GCancellable *cancellable, GError **error) { - return io_run_until (msg, + return io_run_until (msg, blocking, SOUP_MESSAGE_IO_STATE_ANY, SOUP_MESSAGE_IO_STATE_BODY, cancellable, error); } gboolean -soup_message_io_run_until_read (SoupMessage *msg, +soup_message_io_run_until_read (SoupMessage *msg, gboolean blocking, GCancellable *cancellable, GError **error) { - return io_run_until (msg, + return io_run_until (msg, blocking, SOUP_MESSAGE_IO_STATE_BODY, SOUP_MESSAGE_IO_STATE_ANY, cancellable, error); @@ -968,12 +978,13 @@ soup_message_io_run_until_read (SoupMessage *msg, gboolean soup_message_io_run_until_finish (SoupMessage *msg, + gboolean blocking, GCancellable *cancellable, GError **error) { g_object_ref (msg); - if (!io_run_until (msg, + if (!io_run_until (msg, blocking, SOUP_MESSAGE_IO_STATE_DONE, SOUP_MESSAGE_IO_STATE_DONE, cancellable, error)) { @@ -1045,11 +1056,8 @@ new_iostate (SoupMessage *msg, GIOStream *iostream, io->istream = SOUP_FILTER_INPUT_STREAM (g_io_stream_get_input_stream (iostream)); io->ostream = g_io_stream_get_output_stream (iostream); - if (async_context) { + if (async_context) io->async_context = g_main_context_ref (async_context); - io->blocking = FALSE; - } else - io->blocking = TRUE; io->read_header_buf = g_byte_array_new (); io->write_buf = g_string_new (NULL); @@ -1088,8 +1096,13 @@ soup_message_io_client (SoupMessageQueueItem *item, io->write_body = item->msg->request_body; io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS; - if (!item->new_api) - io_run (item->msg, NULL); + + if (!item->new_api) { + gboolean blocking = + SOUP_IS_SESSION_SYNC (item->session) || + (!SOUP_IS_SESSION_ASYNC (item->session) && !item->async); + io_run (item->msg, blocking); + } } void @@ -1112,7 +1125,7 @@ soup_message_io_server (SoupMessage *msg, io->write_body = msg->response_body; io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS; - io_run (msg, NULL); + io_run (msg, FALSE); } void @@ -1155,7 +1168,7 @@ io_unpause_internal (gpointer msg) if (io->io_source) return FALSE; - io_run (msg, NULL); + io_run (msg, FALSE); return FALSE; } @@ -1173,13 +1186,10 @@ soup_message_io_unpause (SoupMessage *msg) return; } - if (!io->blocking) { - if (!io->unpause_source) { - io->unpause_source = soup_add_completion_reffed ( - io->async_context, io_unpause_internal, msg); - } - } else - io_unpause_internal (msg); + if (!io->unpause_source) { + io->unpause_source = soup_add_completion_reffed (io->async_context, + io_unpause_internal, msg); + } } /** diff --git a/libsoup/soup-message-private.h b/libsoup/soup-message-private.h index 356b96db..35cc9887 100644 --- a/libsoup/soup-message-private.h +++ b/libsoup/soup-message-private.h @@ -105,12 +105,15 @@ void soup_message_io_unpause (SoupMessage *msg); gboolean soup_message_io_in_progress (SoupMessage *msg); gboolean soup_message_io_run_until_write (SoupMessage *msg, + gboolean blocking, GCancellable *cancellable, GError **error); gboolean soup_message_io_run_until_read (SoupMessage *msg, + gboolean blocking, GCancellable *cancellable, GError **error); gboolean soup_message_io_run_until_finish (SoupMessage *msg, + gboolean blocking, GCancellable *cancellable, GError **error); diff --git a/libsoup/soup-session.c b/libsoup/soup-session.c index 85ef1b25..a9258ef7 100644 --- a/libsoup/soup-session.c +++ b/libsoup/soup-session.c @@ -3767,7 +3767,7 @@ try_run_until_read (SoupMessageQueueItem *item) GError *error = NULL; GInputStream *stream = NULL; - if (soup_message_io_run_until_read (item->msg, item->cancellable, &error)) + if (soup_message_io_run_until_read (item->msg, FALSE, item->cancellable, &error)) stream = soup_message_io_get_response_istream (item->msg, &error); if (stream) { send_async_maybe_complete (item, stream); @@ -4157,7 +4157,7 @@ soup_session_send (SoupSession *session, break; /* Send request, read headers */ - if (!soup_message_io_run_until_read (msg, item->cancellable, &my_error)) { + if (!soup_message_io_run_until_read (msg, TRUE, item->cancellable, &my_error)) { if (g_error_matches (my_error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN)) { item->state = SOUP_MESSAGE_RESTARTING; soup_message_io_finished (item->msg); -- cgit v1.2.1