summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Winship <danw@gnome.org>2012-01-26 16:25:57 -0500
committerDan Winship <danw@gnome.org>2012-04-17 21:26:26 -0400
commit9effb5ca942412ecde9242c745f2df6da80853a3 (patch)
tree335e8db3779f533cae4bad15cb39c6c9996e27c1
parent3f1180b9453899464b0ae49515cddeb8ae3abc7f (diff)
downloadlibsoup-9effb5ca942412ecde9242c745f2df6da80853a3.tar.gz
SoupHTTPRequest: O brave new world!
Kill SoupHTTPInputStream, and have SoupHTTPRequest return the message's body_istream directly (with a little help from SoupSession and its subclasses). SoupHTTPRequest works synchronously now as well (though it's still the case that async only works with SoupSessionAsync and sync only works with SoupSessionSync). https://bugzilla.gnome.org/show_bug.cgi?id=591739
-rw-r--r--libsoup/Makefile.am4
-rw-r--r--libsoup/soup-body-input-stream.c28
-rw-r--r--libsoup/soup-client-input-stream.c280
-rw-r--r--libsoup/soup-client-input-stream.h46
-rw-r--r--libsoup/soup-http-input-stream.c793
-rw-r--r--libsoup/soup-http-input-stream.h79
-rw-r--r--libsoup/soup-message-io.c310
-rw-r--r--libsoup/soup-message-private.h19
-rw-r--r--libsoup/soup-message-queue.c2
-rw-r--r--libsoup/soup-message-queue.h2
-rw-r--r--libsoup/soup-request-http.c71
-rw-r--r--libsoup/soup-session-async.c239
-rw-r--r--libsoup/soup-session-private.h14
-rw-r--r--libsoup/soup-session-sync.c117
-rw-r--r--libsoup/soup-session.c7
-rw-r--r--po/POTFILES.in1
16 files changed, 1047 insertions, 965 deletions
diff --git a/libsoup/Makefile.am b/libsoup/Makefile.am
index ad85b073..8fd8ee59 100644
--- a/libsoup/Makefile.am
+++ b/libsoup/Makefile.am
@@ -101,6 +101,8 @@ libsoup_2_4_la_SOURCES = \
soup-body-output-stream.c \
soup-cache.c \
soup-cache-private.h \
+ soup-client-input-stream.h \
+ soup-client-input-stream.c \
soup-connection.h \
soup-connection.c \
soup-content-decoder.c \
@@ -121,8 +123,6 @@ libsoup_2_4_la_SOURCES = \
soup-filter-input-stream.h \
soup-form.c \
soup-headers.c \
- soup-http-input-stream.h \
- soup-http-input-stream.c \
soup-logger.c \
soup-marshal.h \
soup-marshal.c \
diff --git a/libsoup/soup-body-input-stream.c b/libsoup/soup-body-input-stream.c
index 2c5d16ea..a635bd58 100644
--- a/libsoup/soup-body-input-stream.c
+++ b/libsoup/soup-body-input-stream.c
@@ -18,6 +18,7 @@
#include "soup-body-input-stream.h"
#include "soup-enum-types.h"
#include "soup-filter-input-stream.h"
+#include "soup-marshal.h"
#include "soup-message-headers.h"
typedef enum {
@@ -38,6 +39,13 @@ struct _SoupBodyInputStreamPrivate {
};
enum {
+ CLOSED,
+ LAST_SIGNAL
+};
+
+static guint signals[LAST_SIGNAL] = { 0 };
+
+enum {
PROP_0,
PROP_ENCODING,
@@ -270,6 +278,16 @@ soup_body_input_stream_read_fn (GInputStream *stream,
}
static gboolean
+soup_body_input_stream_close_fn (GInputStream *stream,
+ GCancellable *cancellable,
+ GError **error)
+{
+ g_signal_emit (stream, signals[CLOSED], 0);
+
+ return G_INPUT_STREAM_CLASS (soup_body_input_stream_parent_class)->close_fn (stream, cancellable, error);
+}
+
+static gboolean
soup_body_input_stream_is_readable (GPollableInputStream *stream)
{
SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (stream);
@@ -321,6 +339,16 @@ soup_body_input_stream_class_init (SoupBodyInputStreamClass *stream_class)
object_class->get_property = get_property;
input_stream_class->read_fn = soup_body_input_stream_read_fn;
+ input_stream_class->close_fn = soup_body_input_stream_close_fn;
+
+ signals[CLOSED] =
+ g_signal_new ("closed",
+ G_OBJECT_CLASS_TYPE (object_class),
+ G_SIGNAL_RUN_LAST,
+ 0,
+ NULL, NULL,
+ _soup_marshal_NONE__NONE,
+ G_TYPE_NONE, 0);
g_object_class_install_property (
object_class, PROP_ENCODING,
diff --git a/libsoup/soup-client-input-stream.c b/libsoup/soup-client-input-stream.c
new file mode 100644
index 00000000..8d1a2ea6
--- /dev/null
+++ b/libsoup/soup-client-input-stream.c
@@ -0,0 +1,280 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * soup-client-input-stream.c
+ *
+ * Copyright 2010-2012 Red Hat, Inc.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <gio/gio.h>
+
+#include "soup-client-input-stream.h"
+#include "soup-marshal.h"
+#include "soup-message.h"
+#include "soup-message-private.h"
+
+struct _SoupClientInputStreamPrivate {
+ SoupMessage *msg;
+};
+
+enum {
+ EOF,
+ LAST_SIGNAL
+};
+
+static guint signals[LAST_SIGNAL] = { 0 };
+
+enum {
+ PROP_0,
+
+ PROP_MESSAGE
+};
+
+static GPollableInputStreamInterface *soup_client_input_stream_parent_pollable_interface;
+static void soup_client_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);
+
+G_DEFINE_TYPE_WITH_CODE (SoupClientInputStream, soup_client_input_stream, SOUP_TYPE_FILTER_INPUT_STREAM,
+ G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
+ soup_client_input_stream_pollable_init))
+
+static void
+soup_client_input_stream_init (SoupClientInputStream *stream)
+{
+ stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
+ SOUP_TYPE_CLIENT_INPUT_STREAM,
+ SoupClientInputStreamPrivate);
+}
+
+static void
+set_property (GObject *object, guint prop_id,
+ const GValue *value, GParamSpec *pspec)
+{
+ SoupClientInputStream *cistream = SOUP_CLIENT_INPUT_STREAM (object);
+
+ switch (prop_id) {
+ case PROP_MESSAGE:
+ cistream->priv->msg = g_value_dup_object (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+get_property (GObject *object, guint prop_id,
+ GValue *value, GParamSpec *pspec)
+{
+ SoupClientInputStream *cistream = SOUP_CLIENT_INPUT_STREAM (object);
+
+ switch (prop_id) {
+ case PROP_MESSAGE:
+ g_value_set_object (value, cistream->priv->msg);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static gssize
+soup_client_input_stream_read_fn (GInputStream *stream,
+ void *buffer,
+ gsize count,
+ GCancellable *cancellable,
+ GError **error)
+{
+ gssize nread;
+
+ nread = G_INPUT_STREAM_CLASS (soup_client_input_stream_parent_class)->
+ read_fn (stream, buffer, count, cancellable, error);
+
+ if (nread == 0)
+ g_signal_emit (stream, signals[EOF], 0);
+
+ return nread;
+}
+
+static gssize
+soup_client_input_stream_read_nonblocking (GPollableInputStream *stream,
+ void *buffer,
+ gsize count,
+ GError **error)
+{
+ gssize nread;
+
+ nread = soup_client_input_stream_parent_pollable_interface->
+ read_nonblocking (stream, buffer, count, error);
+
+ if (nread == 0)
+ g_signal_emit (stream, signals[EOF], 0);
+
+ return nread;
+}
+
+static gboolean
+soup_client_input_stream_close_fn (GInputStream *stream,
+ GCancellable *cancellable,
+ GError **error)
+{
+ SoupClientInputStream *cistream = SOUP_CLIENT_INPUT_STREAM (stream);
+
+ if (!soup_message_io_run_until_finish (cistream->priv->msg,
+ cancellable, error))
+ return FALSE;
+
+ return G_INPUT_STREAM_CLASS (soup_client_input_stream_parent_class)->close_fn (stream, cancellable, error);
+}
+
+typedef struct {
+ SoupClientInputStream *cistream;
+ gint priority;
+ GCancellable *cancellable;
+ GSimpleAsyncResult *result;
+} CloseAsyncData;
+
+static void
+close_async_data_free (CloseAsyncData *cad)
+{
+ if (cad->cancellable)
+ g_object_unref (cad->cancellable);
+ g_object_unref (cad->result);
+ g_slice_free (CloseAsyncData, cad);
+}
+
+static void
+base_stream_closed (GObject *source, GAsyncResult *result, gpointer user_data)
+{
+ CloseAsyncData *cad = user_data;
+ GError *error = NULL;
+
+ if (G_INPUT_STREAM_CLASS (soup_client_input_stream_parent_class)->
+ close_finish (G_INPUT_STREAM (cad->cistream), result, &error))
+ g_simple_async_result_set_op_res_gboolean (cad->result, TRUE);
+ else
+ g_simple_async_result_take_error (cad->result, error);
+
+ g_simple_async_result_complete_in_idle (cad->result);
+ close_async_data_free (cad);
+}
+
+static gboolean
+close_async_ready (SoupMessage *msg, gpointer user_data)
+{
+ CloseAsyncData *cad = user_data;
+ GError *error = NULL;
+
+ if (soup_message_io_run_until_finish (cad->cistream->priv->msg,
+ cad->cancellable, &error)) {
+ G_INPUT_STREAM_CLASS (soup_client_input_stream_parent_class)->
+ close_async (G_INPUT_STREAM (cad->cistream),
+ cad->priority,
+ cad->cancellable,
+ base_stream_closed,
+ cad);
+ return FALSE;
+ } else if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+ g_simple_async_result_take_error (cad->result, error);
+ g_simple_async_result_complete_in_idle (cad->result);
+ close_async_data_free (cad);
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+static void
+soup_client_input_stream_close_async (GInputStream *stream,
+ gint priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ CloseAsyncData *cad;
+ GSource *source;
+
+ cad = g_slice_new (CloseAsyncData);
+ cad->cistream = SOUP_CLIENT_INPUT_STREAM (stream);
+ cad->result = g_simple_async_result_new (G_OBJECT (stream),
+ callback, user_data,
+ soup_client_input_stream_close_async);
+ cad->priority = priority;
+ cad->cancellable = cancellable ? g_object_ref (cancellable) : NULL;
+
+ source = soup_message_io_get_source (cad->cistream->priv->msg,
+ cancellable,
+ close_async_ready, cad);
+ g_source_set_priority (source, priority);
+ g_source_attach (source, g_main_context_get_thread_default ());
+ g_source_unref (source);
+}
+
+static gboolean
+soup_client_input_stream_close_finish (GInputStream *stream,
+ GAsyncResult *result,
+ GError **error)
+{
+ GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
+
+ if (g_simple_async_result_propagate_error (simple, error))
+ return FALSE;
+ else
+ return g_simple_async_result_get_op_res_gboolean (simple);
+}
+
+static void
+soup_client_input_stream_class_init (SoupClientInputStreamClass *stream_class)
+{
+ GObjectClass *object_class = G_OBJECT_CLASS (stream_class);
+ GInputStreamClass *input_stream_class = G_INPUT_STREAM_CLASS (stream_class);
+
+ g_type_class_add_private (stream_class, sizeof (SoupClientInputStreamPrivate));
+
+ object_class->set_property = set_property;
+ object_class->get_property = get_property;
+
+ input_stream_class->read_fn = soup_client_input_stream_read_fn;
+ input_stream_class->close_fn = soup_client_input_stream_close_fn;
+ input_stream_class->close_async = soup_client_input_stream_close_async;
+ input_stream_class->close_finish = soup_client_input_stream_close_finish;
+
+ signals[EOF] =
+ g_signal_new ("eof",
+ G_OBJECT_CLASS_TYPE (object_class),
+ G_SIGNAL_RUN_LAST,
+ 0,
+ NULL, NULL,
+ _soup_marshal_NONE__NONE,
+ G_TYPE_NONE, 0);
+
+ g_object_class_install_property (
+ object_class, PROP_MESSAGE,
+ g_param_spec_object ("message",
+ "Message",
+ "Message",
+ SOUP_TYPE_MESSAGE,
+ G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY));
+}
+
+static void
+soup_client_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface,
+ gpointer interface_data)
+{
+ soup_client_input_stream_parent_pollable_interface =
+ g_type_interface_peek_parent (pollable_interface);
+
+ pollable_interface->read_nonblocking = soup_client_input_stream_read_nonblocking;
+}
+
+GInputStream *
+soup_client_input_stream_new (GInputStream *base_stream,
+ SoupMessage *msg)
+{
+ return g_object_new (SOUP_TYPE_CLIENT_INPUT_STREAM,
+ "base-stream", base_stream,
+ "message", msg,
+ NULL);
+}
diff --git a/libsoup/soup-client-input-stream.h b/libsoup/soup-client-input-stream.h
new file mode 100644
index 00000000..098c6073
--- /dev/null
+++ b/libsoup/soup-client-input-stream.h
@@ -0,0 +1,46 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * Copyright 2010-2012 Red Hat, Inc.
+ */
+
+#ifndef SOUP_CLIENT_INPUT_STREAM_H
+#define SOUP_CLIENT_INPUT_STREAM_H 1
+
+#include "soup-types.h"
+#include "soup-filter-input-stream.h"
+
+G_BEGIN_DECLS
+
+#define SOUP_TYPE_CLIENT_INPUT_STREAM (soup_client_input_stream_get_type ())
+#define SOUP_CLIENT_INPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), SOUP_TYPE_CLIENT_INPUT_STREAM, SoupClientInputStream))
+#define SOUP_CLIENT_INPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), SOUP_TYPE_CLIENT_INPUT_STREAM, SoupClientInputStreamClass))
+#define SOUP_IS_CLIENT_INPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), SOUP_TYPE_CLIENT_INPUT_STREAM))
+#define SOUP_IS_CLIENT_INPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((obj), SOUP_TYPE_CLIENT_INPUT_STREAM))
+#define SOUP_CLIENT_INPUT_STREAM_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), SOUP_TYPE_CLIENT_INPUT_STREAM, SoupClientInputStreamClass))
+
+typedef struct _SoupClientInputStreamPrivate SoupClientInputStreamPrivate;
+
+typedef struct {
+ SoupFilterInputStream parent;
+
+ SoupClientInputStreamPrivate *priv;
+} SoupClientInputStream;
+
+typedef struct {
+ SoupFilterInputStreamClass parent_class;
+
+ /* Padding for future expansion */
+ void (*_libsoup_reserved1) (void);
+ void (*_libsoup_reserved2) (void);
+ void (*_libsoup_reserved3) (void);
+ void (*_libsoup_reserved4) (void);
+} SoupClientInputStreamClass;
+
+GType soup_client_input_stream_get_type (void);
+
+GInputStream *soup_client_input_stream_new (GInputStream *base_stream,
+ SoupMessage *msg);
+
+G_END_DECLS
+
+#endif /* SOUP_CLIENT_INPUT_STREAM_H */
diff --git a/libsoup/soup-http-input-stream.c b/libsoup/soup-http-input-stream.c
deleted file mode 100644
index c0337e9b..00000000
--- a/libsoup/soup-http-input-stream.c
+++ /dev/null
@@ -1,793 +0,0 @@
-/* soup-input-stream.c, based on gsocketinputstream.c
- *
- * Copyright (C) 2006-2007, 2010 Red Hat, Inc.
- * Copyright (C) 2010 Igalia, S.L.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General
- * Public License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
- * Boston, MA 02111-1307, USA.
- */
-
-#include <config.h>
-
-#include <string.h>
-
-#include <glib.h>
-#include <gio/gio.h>
-
-#include "soup-http-input-stream.h"
-#include "soup-headers.h"
-#include "soup-content-sniffer.h"
-#include "soup-session.h"
-
-G_DEFINE_TYPE (SoupHTTPInputStream, soup_http_input_stream, G_TYPE_INPUT_STREAM)
-
-typedef void (*SoupHTTPInputStreamCallback)(GInputStream *);
-
-typedef struct {
- SoupSession *session;
- GMainContext *async_context;
- SoupMessage *msg;
- gboolean got_headers, finished;
- goffset offset;
-
- GCancellable *cancellable;
- guint cancel_id;
- SoupHTTPInputStreamCallback got_headers_cb;
- SoupHTTPInputStreamCallback got_chunk_cb;
- SoupHTTPInputStreamCallback finished_cb;
- SoupHTTPInputStreamCallback cancelled_cb;
-
- GQueue *leftover_queue;
-
- guchar *caller_buffer;
- gsize caller_bufsize, caller_nread;
- GAsyncReadyCallback outstanding_callback;
- GSimpleAsyncResult *result;
-
- char *sniffed_content_type;
-} SoupHTTPInputStreamPrivate;
-#define SOUP_HTTP_INPUT_STREAM_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), SOUP_TYPE_HTTP_INPUT_STREAM, SoupHTTPInputStreamPrivate))
-
-
-static gssize soup_http_input_stream_read (GInputStream *stream,
- void *buffer,
- gsize count,
- GCancellable *cancellable,
- GError **error);
-static gboolean soup_http_input_stream_close (GInputStream *stream,
- GCancellable *cancellable,
- GError **error);
-static void soup_http_input_stream_read_async (GInputStream *stream,
- void *buffer,
- gsize count,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer data);
-static gssize soup_http_input_stream_read_finish (GInputStream *stream,
- GAsyncResult *result,
- GError **error);
-static void soup_http_input_stream_close_async (GInputStream *stream,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer data);
-static gboolean soup_http_input_stream_close_finish (GInputStream *stream,
- GAsyncResult *result,
- GError **error);
-
-static void soup_http_input_stream_got_headers (SoupMessage *msg, gpointer stream);
-static void soup_http_input_stream_content_sniffed (SoupMessage *msg, const char *content_type, GHashTable *params, gpointer stream);
-static void soup_http_input_stream_got_chunk (SoupMessage *msg, SoupBuffer *chunk, gpointer stream);
-static void soup_http_input_stream_restarted (SoupMessage *msg, gpointer stream);
-static void soup_http_input_stream_finished (SoupMessage *msg, gpointer stream);
-
-static void
-soup_http_input_stream_finalize (GObject *object)
-{
- SoupHTTPInputStream *stream = SOUP_HTTP_INPUT_STREAM (object);
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
-
- g_object_unref (priv->session);
-
- g_signal_handlers_disconnect_by_func (priv->msg, G_CALLBACK (soup_http_input_stream_got_headers), stream);
- g_signal_handlers_disconnect_by_func (priv->msg, G_CALLBACK (soup_http_input_stream_content_sniffed), stream);
- g_signal_handlers_disconnect_by_func (priv->msg, G_CALLBACK (soup_http_input_stream_got_chunk), stream);
- g_signal_handlers_disconnect_by_func (priv->msg, G_CALLBACK (soup_http_input_stream_restarted), stream);
- g_signal_handlers_disconnect_by_func (priv->msg, G_CALLBACK (soup_http_input_stream_finished), stream);
- g_object_unref (priv->msg);
-
- g_queue_foreach (priv->leftover_queue, (GFunc) soup_buffer_free, NULL);
- g_queue_free (priv->leftover_queue);
-
- g_free (priv->sniffed_content_type);
-
- if (G_OBJECT_CLASS (soup_http_input_stream_parent_class)->finalize)
- (*G_OBJECT_CLASS (soup_http_input_stream_parent_class)->finalize)(object);
-}
-
-static void
-soup_http_input_stream_class_init (SoupHTTPInputStreamClass *klass)
-{
- GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
- GInputStreamClass *stream_class = G_INPUT_STREAM_CLASS (klass);
-
- g_type_class_add_private (klass, sizeof (SoupHTTPInputStreamPrivate));
-
- gobject_class->finalize = soup_http_input_stream_finalize;
-
- stream_class->read_fn = soup_http_input_stream_read;
- stream_class->close_fn = soup_http_input_stream_close;
- stream_class->read_async = soup_http_input_stream_read_async;
- stream_class->read_finish = soup_http_input_stream_read_finish;
- stream_class->close_async = soup_http_input_stream_close_async;
- stream_class->close_finish = soup_http_input_stream_close_finish;
-}
-
-static void
-soup_http_input_stream_init (SoupHTTPInputStream *stream)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
-
- priv->leftover_queue = g_queue_new ();
-}
-
-static void
-soup_http_input_stream_queue_message (SoupHTTPInputStream *stream)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
-
- priv->got_headers = priv->finished = FALSE;
-
- if (soup_session_get_feature_for_message (priv->session, SOUP_TYPE_CONTENT_SNIFFER, priv->msg)) {
- g_signal_connect (priv->msg, "content_sniffed",
- G_CALLBACK (soup_http_input_stream_content_sniffed), stream);
- } else {
- g_signal_connect (priv->msg, "got_headers",
- G_CALLBACK (soup_http_input_stream_got_headers), stream);
- }
- g_signal_connect (priv->msg, "got_chunk",
- G_CALLBACK (soup_http_input_stream_got_chunk), stream);
- g_signal_connect (priv->msg, "restarted",
- G_CALLBACK (soup_http_input_stream_restarted), stream);
- g_signal_connect (priv->msg, "finished",
- G_CALLBACK (soup_http_input_stream_finished), stream);
-
- /* Add an extra ref since soup_session_queue_message steals one */
- g_object_ref (priv->msg);
- soup_session_queue_message (priv->session, priv->msg, NULL, NULL);
-}
-
-/**
- * soup_http_input_stream_new:
- * @session: the #SoupSession to use
- * @msg: the #SoupMessage whose response will be streamed
- *
- * Prepares to send @msg over @session, and returns a #GInputStream
- * that can be used to read the response.
- *
- * @msg may not be sent until the first read call; if you need to look
- * at the status code or response headers before reading the body, you
- * can use soup_http_input_stream_send() or soup_http_input_stream_send_async()
- * to force the message to be sent and the response headers read.
- *
- * If @msg gets a non-2xx result, the first read (or send) will return
- * an error with type %SOUP_HTTP_INPUT_STREAM_HTTP_ERROR.
- *
- * Internally, #SoupHTTPInputStream is implemented using asynchronous I/O,
- * so if you are using the synchronous API (eg,
- * g_input_stream_read()), you should create a new #GMainContext and
- * set it as the %SOUP_SESSION_ASYNC_CONTEXT property on @session. (If
- * you don't, then synchronous #GInputStream calls will cause the main
- * loop to be run recursively.) The async #GInputStream API works fine
- * with %SOUP_SESSION_ASYNC_CONTEXT either set or unset.
- *
- * Returns: a new #GInputStream.
- **/
-GInputStream *
-soup_http_input_stream_new (SoupSession *session, SoupMessage *msg)
-{
- SoupHTTPInputStream *stream;
- SoupHTTPInputStreamPrivate *priv;
-
- g_return_val_if_fail (SOUP_IS_MESSAGE (msg), NULL);
-
- stream = g_object_new (SOUP_TYPE_HTTP_INPUT_STREAM, NULL);
- priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
-
- priv->session = g_object_ref (session);
- priv->async_context = soup_session_get_async_context (session);
- priv->msg = g_object_ref (msg);
-
- return (GInputStream *)stream;
-}
-
-static void
-soup_http_input_stream_got_headers (SoupMessage *msg, gpointer stream)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
-
- /* If the message is expected to be restarted then we read the
- * whole message first and hope it does get restarted, but
- * if it doesn't, then we stream the body belatedly.
- */
- if (msg->status_code == SOUP_STATUS_UNAUTHORIZED ||
- msg->status_code == SOUP_STATUS_PROXY_UNAUTHORIZED ||
- soup_session_would_redirect (priv->session, msg))
- return;
-
- priv->got_headers = TRUE;
- if (!priv->caller_buffer) {
- /* Not ready to read the body yet */
- soup_session_pause_message (priv->session, msg);
- }
-
- if (priv->got_headers_cb)
- priv->got_headers_cb (stream);
-}
-
-static void
-soup_http_input_stream_content_sniffed (SoupMessage *msg, const char *content_type,
- GHashTable *params, gpointer stream)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
- GString *sniffed_type;
-
- sniffed_type = g_string_new (content_type);
- if (params) {
- GHashTableIter iter;
- gpointer key, value;
-
- g_hash_table_iter_init (&iter, params);
- while (g_hash_table_iter_next (&iter, &key, &value)) {
- g_string_append (sniffed_type, "; ");
- soup_header_g_string_append_param (sniffed_type, key, value);
- }
- }
- g_free (priv->sniffed_content_type);
- priv->sniffed_content_type = g_string_free (sniffed_type, FALSE);
-
- soup_http_input_stream_got_headers (msg, stream);
-}
-
-static void
-soup_http_input_stream_got_chunk (SoupMessage *msg, SoupBuffer *chunk_buffer,
- gpointer stream)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
- const gchar *chunk = chunk_buffer->data;
- gsize chunk_size = chunk_buffer->length;
-
- /* Copy what we can into priv->caller_buffer */
- if (priv->caller_bufsize > priv->caller_nread && priv->leftover_queue->length == 0) {
- gsize nread = MIN (chunk_size, priv->caller_bufsize - priv->caller_nread);
-
- memcpy (priv->caller_buffer + priv->caller_nread, chunk, nread);
- priv->caller_nread += nread;
- priv->offset += nread;
- chunk += nread;
- chunk_size -= nread;
- }
-
- if (chunk_size > 0) {
- if (priv->leftover_queue->length > 0) {
- g_queue_push_tail (priv->leftover_queue, soup_buffer_copy (chunk_buffer));
- } else {
- g_queue_push_head (priv->leftover_queue,
- soup_buffer_new_subbuffer (chunk_buffer,
- chunk_buffer->length - chunk_size,
- chunk_size));
- }
- }
-
- if (priv->got_headers) {
- soup_session_pause_message (priv->session, msg);
- if (priv->got_chunk_cb)
- priv->got_chunk_cb (stream);
- }
-}
-
-static void
-soup_http_input_stream_restarted (SoupMessage *msg, gpointer stream)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
- GList *q;
-
- /* Throw away any pending read data */
- for (q = priv->leftover_queue->head; q; q = q->next)
- soup_buffer_free (q->data);
- g_queue_clear (priv->leftover_queue);
-}
-
-static void
-soup_http_input_stream_finished (SoupMessage *msg, gpointer stream)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
-
- priv->got_headers = TRUE;
- priv->finished = TRUE;
-
- if (priv->finished_cb)
- priv->finished_cb (stream);
-}
-
-static void
-soup_http_input_stream_cancelled (GCancellable *cancellable,
- gpointer user_data)
-{
- SoupHTTPInputStream *stream = user_data;
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
-
- g_signal_handler_disconnect (cancellable, priv->cancel_id);
- priv->cancel_id = 0;
-
- soup_session_pause_message (priv->session, priv->msg);
- if (priv->cancelled_cb)
- priv->cancelled_cb (G_INPUT_STREAM (stream));
-}
-
-static void
-soup_http_input_stream_prepare_for_io (GInputStream *stream,
- GCancellable *cancellable,
- guchar *buffer,
- gsize count)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
-
- priv->cancellable = cancellable;
- if (cancellable) {
- priv->cancel_id = g_signal_connect (cancellable, "cancelled",
- G_CALLBACK (soup_http_input_stream_cancelled),
- stream);
- }
-
- priv->caller_buffer = buffer;
- priv->caller_bufsize = count;
- priv->caller_nread = 0;
-
- if (priv->got_headers)
- soup_session_unpause_message (priv->session, priv->msg);
-}
-
-static void
-soup_http_input_stream_done_io (GInputStream *stream)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
-
- if (priv->cancel_id) {
- g_signal_handler_disconnect (priv->cancellable, priv->cancel_id);
- priv->cancel_id = 0;
- }
- priv->cancellable = NULL;
-
- priv->caller_buffer = NULL;
- priv->caller_bufsize = 0;
-}
-
-static gboolean
-set_error_if_http_failed (SoupMessage *msg, GError **error)
-{
- if (SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code)) {
- g_set_error_literal (error, SOUP_HTTP_ERROR,
- msg->status_code, msg->reason_phrase);
- return TRUE;
- }
- return FALSE;
-}
-
-static gsize
-read_from_leftover (SoupHTTPInputStreamPrivate *priv,
- gpointer buffer, gsize bufsize)
-{
- gsize nread;
- SoupBuffer *soup_buffer = (SoupBuffer *) g_queue_peek_head (priv->leftover_queue);
- gboolean fits_in_buffer = soup_buffer->length <= bufsize;
-
- nread = fits_in_buffer ? soup_buffer->length : bufsize;
- memcpy (buffer, soup_buffer->data, nread);
-
- g_queue_pop_head (priv->leftover_queue);
- if (!fits_in_buffer)
- g_queue_push_head (priv->leftover_queue,
- soup_buffer_new_subbuffer (soup_buffer, nread, soup_buffer->length - nread));
- soup_buffer_free (soup_buffer);
-
- priv->offset += nread;
- return nread;
-}
-
-/* This does the work of soup_http_input_stream_send(), assuming that the
- * GInputStream pending flag has already been set. It is also used by
- * soup_http_input_stream_send_async() in some circumstances.
- */
-static gboolean
-soup_http_input_stream_send_internal (GInputStream *stream,
- GCancellable *cancellable,
- GError **error)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
-
- soup_http_input_stream_prepare_for_io (stream, cancellable, NULL, 0);
- while (!priv->finished && !priv->got_headers &&
- !g_cancellable_is_cancelled (cancellable))
- g_main_context_iteration (priv->async_context, TRUE);
- soup_http_input_stream_done_io (stream);
-
- if (g_cancellable_set_error_if_cancelled (cancellable, error))
- return FALSE;
- else if (set_error_if_http_failed (priv->msg, error))
- return FALSE;
- return TRUE;
-}
-
-static void
-send_sync_finished (GInputStream *stream)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
-
- priv->got_headers_cb = NULL;
- priv->finished_cb = NULL;
-
- /* Wake up the main context iteration */
- soup_add_completion (priv->async_context, NULL, NULL);
-}
-
-/**
- * soup_http_input_stream_send:
- * @httpstream: a #SoupHTTPInputStream
- * @cancellable: optional #GCancellable object, %NULL to ignore.
- * @error: location to store the error occuring, or %NULL to ignore
- *
- * Synchronously sends the HTTP request associated with @stream, and
- * reads the response headers. Call this after soup_http_input_stream_new()
- * and before the first g_input_stream_read() if you want to check the
- * HTTP status code before you start reading.
- *
- * Return value: %TRUE if msg has a successful (2xx) status, %FALSE if
- * not.
- **/
-gboolean
-soup_http_input_stream_send (SoupHTTPInputStream *httpstream,
- GCancellable *cancellable,
- GError **error)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (httpstream);
- GInputStream *istream = (GInputStream *)httpstream;
- gboolean result;
-
- g_return_val_if_fail (SOUP_IS_HTTP_INPUT_STREAM (httpstream), FALSE);
-
- soup_http_input_stream_queue_message (httpstream);
-
- if (!g_input_stream_set_pending (istream, error))
- return FALSE;
-
- priv->got_headers_cb = send_sync_finished;
- priv->finished_cb = send_sync_finished;
-
- result = soup_http_input_stream_send_internal (istream, cancellable, error);
- g_input_stream_clear_pending (istream);
-
- return result;
-}
-
-static gssize
-soup_http_input_stream_read (GInputStream *stream,
- void *buffer,
- gsize count,
- GCancellable *cancellable,
- GError **error)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
-
- /* If there is data leftover from a previous read, return it. */
- if (priv->leftover_queue->length)
- return read_from_leftover (priv, buffer, count);
-
- if (priv->finished)
- return 0;
-
- /* No leftover data, accept one chunk from the network */
- soup_http_input_stream_prepare_for_io (stream, cancellable, buffer, count);
- while (!priv->finished && priv->caller_nread == 0 &&
- !g_cancellable_is_cancelled (cancellable))
- g_main_context_iteration (priv->async_context, TRUE);
- soup_http_input_stream_done_io (stream);
-
- if (priv->caller_nread > 0)
- return priv->caller_nread;
-
- if (g_cancellable_set_error_if_cancelled (cancellable, error))
- return -1;
- else if (set_error_if_http_failed (priv->msg, error))
- return -1;
- else
- return 0;
-}
-
-static gboolean
-soup_http_input_stream_close (GInputStream *stream,
- GCancellable *cancellable,
- GError **error)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
-
- if (!priv->finished) {
- soup_session_unpause_message (priv->session, priv->msg);
- soup_session_cancel_message (priv->session, priv->msg, SOUP_STATUS_CANCELLED);
- }
-
- return TRUE;
-}
-
-static void
-wrapper_callback (GObject *source_object, GAsyncResult *res,
- gpointer user_data)
-{
- GInputStream *stream = G_INPUT_STREAM (source_object);
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
-
- g_input_stream_clear_pending (stream);
- if (priv->outstanding_callback)
- (*priv->outstanding_callback) (source_object, res, user_data);
- priv->outstanding_callback = NULL;
- g_object_unref (stream);
-}
-
-static void
-send_async_finished (GInputStream *stream)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
- GSimpleAsyncResult *result;
- GError *error = NULL;
-
- if (!g_cancellable_set_error_if_cancelled (priv->cancellable, &error))
- set_error_if_http_failed (priv->msg, &error);
-
- priv->got_headers_cb = NULL;
- priv->finished_cb = NULL;
- soup_http_input_stream_done_io (stream);
-
- result = priv->result;
- priv->result = NULL;
-
- g_simple_async_result_set_op_res_gboolean (result, error == NULL);
- if (error)
- g_simple_async_result_take_error (result, error);
- g_simple_async_result_complete (result);
- g_object_unref (result);
-}
-
-static void
-soup_http_input_stream_send_async_internal (GInputStream *stream,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer user_data)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
-
- g_return_if_fail (priv->async_context == g_main_context_get_thread_default ());
-
- g_object_ref (stream);
- priv->outstanding_callback = callback;
-
- priv->got_headers_cb = send_async_finished;
- priv->finished_cb = send_async_finished;
-
- soup_http_input_stream_prepare_for_io (stream, cancellable, NULL, 0);
- priv->result = g_simple_async_result_new (G_OBJECT (stream),
- wrapper_callback, user_data,
- soup_http_input_stream_send_async);
-}
-
-/**
- * soup_http_input_stream_send_async:
- * @httpstream: a #SoupHTTPInputStream
- * @io_priority: the io priority of the request.
- * @cancellable: optional #GCancellable object, %NULL to ignore.
- * @callback: callback to call when the request is satisfied
- * @user_data: the data to pass to callback function
- *
- * Asynchronously sends the HTTP request associated with @stream, and
- * reads the response headers. Call this after soup_http_input_stream_new()
- * and before the first g_input_stream_read_async() if you want to
- * check the HTTP status code before you start reading.
- **/
-void
-soup_http_input_stream_send_async (SoupHTTPInputStream *httpstream,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer user_data)
-{
- GInputStream *istream = (GInputStream *)httpstream;
- GError *error = NULL;
-
- g_return_if_fail (SOUP_IS_HTTP_INPUT_STREAM (httpstream));
-
- soup_http_input_stream_queue_message (httpstream);
-
- if (!g_input_stream_set_pending (istream, &error)) {
- g_simple_async_report_take_gerror_in_idle (G_OBJECT (httpstream),
- callback,
- user_data,
- error);
- return;
- }
- soup_http_input_stream_send_async_internal (istream, io_priority, cancellable,
- callback, user_data);
-}
-
-/**
- * soup_http_input_stream_send_finish:
- * @httpstream: a #SoupHTTPInputStream
- * @result: a #GAsyncResult.
- * @error: a #GError location to store the error occuring, or %NULL to
- * ignore.
- *
- * Finishes a soup_http_input_stream_send_async() operation.
- *
- * Return value: %TRUE if the message was sent successfully and
- * received a successful status code, %FALSE if not.
- **/
-gboolean
-soup_http_input_stream_send_finish (SoupHTTPInputStream *httpstream,
- GAsyncResult *result,
- GError **error)
-{
- GSimpleAsyncResult *simple;
-
- g_return_val_if_fail (G_IS_SIMPLE_ASYNC_RESULT (result), FALSE);
- simple = G_SIMPLE_ASYNC_RESULT (result);
-
- g_return_val_if_fail (g_simple_async_result_get_source_tag (simple) == soup_http_input_stream_send_async, FALSE);
-
- if (g_simple_async_result_propagate_error (simple, error))
- return FALSE;
-
- return g_simple_async_result_get_op_res_gboolean (simple);
-}
-
-static void
-read_async_done (GInputStream *stream)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
- GSimpleAsyncResult *result;
- GError *error = NULL;
-
- result = priv->result;
- priv->result = NULL;
-
- if (g_cancellable_set_error_if_cancelled (priv->cancellable, &error) ||
- set_error_if_http_failed (priv->msg, &error))
- g_simple_async_result_take_error (result, error);
- else
- g_simple_async_result_set_op_res_gssize (result, priv->caller_nread);
-
- priv->got_chunk_cb = NULL;
- priv->finished_cb = NULL;
- priv->cancelled_cb = NULL;
- soup_http_input_stream_done_io (stream);
-
- g_simple_async_result_complete (result);
- g_object_unref (result);
-}
-
-static void
-soup_http_input_stream_read_async (GInputStream *stream,
- void *buffer,
- gsize count,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer user_data)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
- GSimpleAsyncResult *result;
-
- g_return_if_fail (priv->async_context == g_main_context_get_thread_default ());
-
- result = g_simple_async_result_new (G_OBJECT (stream),
- callback, user_data,
- soup_http_input_stream_read_async);
-
- if (priv->leftover_queue->length) {
- gsize nread = read_from_leftover (priv, buffer, count);
- g_simple_async_result_set_op_res_gssize (result, nread);
- g_simple_async_result_complete_in_idle (result);
- g_object_unref (result);
- return;
- }
-
- if (priv->finished) {
- g_simple_async_result_set_op_res_gssize (result, 0);
- g_simple_async_result_complete_in_idle (result);
- g_object_unref (result);
- return;
- }
-
- priv->result = result;
-
- priv->got_chunk_cb = read_async_done;
- priv->finished_cb = read_async_done;
- priv->cancelled_cb = read_async_done;
- soup_http_input_stream_prepare_for_io (stream, cancellable, buffer, count);
-}
-
-static gssize
-soup_http_input_stream_read_finish (GInputStream *stream,
- GAsyncResult *result,
- GError **error)
-{
- GSimpleAsyncResult *simple;
-
- g_return_val_if_fail (G_IS_SIMPLE_ASYNC_RESULT (result), -1);
- simple = G_SIMPLE_ASYNC_RESULT (result);
- g_return_val_if_fail (g_simple_async_result_get_source_tag (simple) == soup_http_input_stream_read_async, -1);
-
- return g_simple_async_result_get_op_res_gssize (simple);
-}
-
-static void
-soup_http_input_stream_close_async (GInputStream *stream,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer user_data)
-{
- GSimpleAsyncResult *result;
- gboolean success;
- GError *error = NULL;
-
- result = g_simple_async_result_new (G_OBJECT (stream),
- callback, user_data,
- soup_http_input_stream_close_async);
- success = soup_http_input_stream_close (stream, cancellable, &error);
- g_simple_async_result_set_op_res_gboolean (result, success);
- if (error)
- g_simple_async_result_take_error (result, error);
-
- g_simple_async_result_complete_in_idle (result);
- g_object_unref (result);
-}
-
-static gboolean
-soup_http_input_stream_close_finish (GInputStream *stream,
- GAsyncResult *result,
- GError **error)
-{
- /* Failures handled in generic close_finish code */
- return TRUE;
-}
-
-SoupMessage *
-soup_http_input_stream_get_message (SoupHTTPInputStream *httpstream)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (httpstream);
- return priv->msg ? g_object_ref (priv->msg) : NULL;
-}
-
-const char *
-soup_http_input_stream_get_content_type (SoupHTTPInputStream *httpstream)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (httpstream);
-
- if (priv->sniffed_content_type)
- return priv->sniffed_content_type;
- else
- return soup_message_headers_get_content_type (priv->msg->response_headers, NULL);
-
-}
diff --git a/libsoup/soup-http-input-stream.h b/libsoup/soup-http-input-stream.h
deleted file mode 100644
index b6c598cb..00000000
--- a/libsoup/soup-http-input-stream.h
+++ /dev/null
@@ -1,79 +0,0 @@
-/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
-/*
- * Copyright (C) 2006, 2007, 2009, 2010 Red Hat, Inc.
- * Copyright (C) 2010 Igalia, S.L.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General
- * Public License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
- * Boston, MA 02111-1307, USA.
- */
-
-#ifndef __SOUP_HTTP_INPUT_STREAM_H__
-#define __SOUP_HTTP_INPUT_STREAM_H__
-
-#include <gio/gio.h>
-#include <libsoup/soup-types.h>
-
-G_BEGIN_DECLS
-
-#define SOUP_TYPE_HTTP_INPUT_STREAM (soup_http_input_stream_get_type ())
-#define SOUP_HTTP_INPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), SOUP_TYPE_HTTP_INPUT_STREAM, SoupHTTPInputStream))
-#define SOUP_HTTP_INPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_CAST ((k), SOUP_TYPE_HTTP_INPUT_STREAM, SoupHTTPInputStreamClass))
-#define SOUP_IS_HTTP_INPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), SOUP_TYPE_HTTP_INPUT_STREAM))
-#define SOUP_IS_HTTP_INPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), SOUP_TYPE_HTTP_INPUT_STREAM))
-#define SOUP_HTTP_INPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), SOUP_TYPE_HTTP_INPUT_STREAM, SoupHTTPInputStreamClass))
-
-typedef struct SoupHTTPInputStream SoupHTTPInputStream;
-typedef struct SoupHTTPInputStreamClass SoupHTTPInputStreamClass;
-
-struct SoupHTTPInputStream {
- GInputStream parent;
-};
-
-struct SoupHTTPInputStreamClass {
- GInputStreamClass parent_class;
-
- /* Padding for future expansion */
- void (*_g_reserved1)(void);
- void (*_g_reserved2)(void);
- void (*_g_reserved3)(void);
- void (*_g_reserved4)(void);
- void (*_g_reserved5)(void);
-};
-
-GType soup_http_input_stream_get_type (void) G_GNUC_CONST;
-
-GInputStream *soup_http_input_stream_new (SoupSession *session,
- SoupMessage *msg);
-
-gboolean soup_http_input_stream_send (SoupHTTPInputStream *httpstream,
- GCancellable *cancellable,
- GError **error);
-
-void soup_http_input_stream_send_async (SoupHTTPInputStream *httpstream,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer user_data);
-gboolean soup_http_input_stream_send_finish (SoupHTTPInputStream *httpstream,
- GAsyncResult *result,
- GError **error);
-
-SoupMessage *soup_http_input_stream_get_message (SoupHTTPInputStream *httpstream);
-
-const char *soup_http_input_stream_get_content_type (SoupHTTPInputStream *httpstream);
-
-G_END_DECLS
-
-#endif /* __SOUP_HTTP_INPUT_STREAM_H__ */
diff --git a/libsoup/soup-message-io.c b/libsoup/soup-message-io.c
index e71a8adf..4e5c35f7 100644
--- a/libsoup/soup-message-io.c
+++ b/libsoup/soup-message-io.c
@@ -12,8 +12,11 @@
#include <stdlib.h>
#include <string.h>
+#include <glib/gi18n-lib.h>
+
#include "soup-body-input-stream.h"
#include "soup-body-output-stream.h"
+#include "soup-client-input-stream.h"
#include "soup-connection.h"
#include "soup-content-sniffer-stream.h"
#include "soup-converter-wrapper.h"
@@ -31,6 +34,7 @@ typedef enum {
typedef enum {
SOUP_MESSAGE_IO_STATE_NOT_STARTED,
+ SOUP_MESSAGE_IO_STATE_ANY = SOUP_MESSAGE_IO_STATE_NOT_STARTED,
SOUP_MESSAGE_IO_STATE_HEADERS,
SOUP_MESSAGE_IO_STATE_BLOCKING,
SOUP_MESSAGE_IO_STATE_BODY_START,
@@ -45,6 +49,9 @@ typedef enum {
(state != SOUP_MESSAGE_IO_STATE_NOT_STARTED && \
state != SOUP_MESSAGE_IO_STATE_BLOCKING && \
state != SOUP_MESSAGE_IO_STATE_DONE)
+#define SOUP_MESSAGE_IO_STATE_POLLABLE(state) \
+ (SOUP_MESSAGE_IO_STATE_ACTIVE (state) && \
+ state != SOUP_MESSAGE_IO_STATE_BODY_DONE)
typedef struct {
SoupMessageQueueItem *item;
@@ -136,11 +143,13 @@ soup_message_io_stop (SoupMessage *msg)
if (io->io_source) {
g_source_destroy (io->io_source);
+ g_source_unref (io->io_source);
io->io_source = NULL;
}
if (io->unpause_source) {
g_source_destroy (io->unpause_source);
+ g_source_unref (io->unpause_source);
io->unpause_source = NULL;
}
@@ -215,8 +224,12 @@ read_headers (SoupMessage *msg, GCancellable *cancellable, GError **error)
&got_lf,
cancellable, error);
io->read_header_buf->len = old_len + MAX (nread, 0);
- if (nread == 0)
- io_error (io->sock, msg, NULL);
+ if (nread == 0) {
+ soup_message_set_status (msg, SOUP_STATUS_MALFORMED);
+ g_set_error_literal (error, G_IO_ERROR,
+ G_IO_ERROR_PARTIAL_INPUT,
+ _("Connection terminated unexpectedly"));
+ }
if (nread <= 0)
return FALSE;
@@ -257,9 +270,10 @@ setup_body_istream (SoupMessage *msg)
GInputStream *filter;
GSList *d;
- io->body_istream = soup_body_input_stream_new (io->istream,
- io->read_encoding,
- io->read_length);
+ io->body_istream =
+ soup_body_input_stream_new (io->istream,
+ io->read_encoding,
+ io->read_length);
for (d = priv->decoders; d; d = d->next) {
decoder = d->data;
@@ -321,11 +335,6 @@ io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
gssize nwrote;
switch (io->write_state) {
- case SOUP_MESSAGE_IO_STATE_NOT_STARTED:
- case SOUP_MESSAGE_IO_STATE_BLOCKING:
- return FALSE;
-
-
case SOUP_MESSAGE_IO_STATE_HEADERS:
if (!io->write_buf->len) {
io->get_headers_cb (msg, io->write_buf,
@@ -416,6 +425,7 @@ io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
if (!io->write_chunk) {
io->write_chunk = soup_message_body_get_chunk (io->write_body, io->write_body_offset);
if (!io->write_chunk) {
+ g_return_val_if_fail (!io->item || !io->item->new_api, FALSE);
soup_message_io_pause (msg);
return FALSE;
}
@@ -486,7 +496,6 @@ io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
break;
- case SOUP_MESSAGE_IO_STATE_DONE:
default:
g_return_val_if_reached (FALSE);
}
@@ -511,11 +520,6 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
guint status;
switch (io->read_state) {
- case SOUP_MESSAGE_IO_STATE_NOT_STARTED:
- case SOUP_MESSAGE_IO_STATE_BLOCKING:
- return FALSE;
-
-
case SOUP_MESSAGE_IO_STATE_HEADERS:
if (!read_headers (msg, cancellable, error))
return FALSE;
@@ -628,6 +632,7 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
if (priv->chunk_allocator) {
buffer = priv->chunk_allocator (msg, io->read_length, priv->chunk_allocator_data);
if (!buffer) {
+ g_return_val_if_fail (!io->item || !io->item->new_api, FALSE);
soup_message_io_pause (msg);
return FALSE;
}
@@ -675,7 +680,6 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
break;
- case SOUP_MESSAGE_IO_STATE_DONE:
default:
g_return_val_if_reached (FALSE);
}
@@ -683,43 +687,160 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
return TRUE;
}
-static GSource *
+typedef struct {
+ GSource source;
+ SoupMessage *msg;
+} SoupMessageSource;
+
+static gboolean
+message_source_prepare (GSource *source,
+ gint *timeout)
+{
+ *timeout = -1;
+ return FALSE;
+}
+
+static gboolean
+message_source_check (GSource *source)
+{
+ return FALSE;
+}
+
+static gboolean
+message_source_dispatch (GSource *source,
+ GSourceFunc callback,
+ gpointer user_data)
+{
+ SoupMessageSourceFunc func = (SoupMessageSourceFunc)callback;
+ SoupMessageSource *message_source = (SoupMessageSource *)source;
+
+ return (*func) (message_source->msg, user_data);
+}
+
+static void
+message_source_finalize (GSource *source)
+{
+ SoupMessageSource *message_source = (SoupMessageSource *)source;
+
+ g_object_unref (message_source->msg);
+}
+
+static gboolean
+message_source_closure_callback (SoupMessage *msg,
+ gpointer data)
+{
+ GClosure *closure = data;
+
+ GValue param = G_VALUE_INIT;
+ GValue result_value = G_VALUE_INIT;
+ gboolean result;
+
+ g_value_init (&result_value, G_TYPE_BOOLEAN);
+
+ g_value_init (&param, SOUP_TYPE_MESSAGE);
+ g_value_set_object (&param, msg);
+
+ g_closure_invoke (closure, &result_value, 1, &param, NULL);
+
+ result = g_value_get_boolean (&result_value);
+ g_value_unset (&result_value);
+ g_value_unset (&param);
+
+ return result;
+}
+
+static GSourceFuncs message_source_funcs =
+{
+ message_source_prepare,
+ message_source_check,
+ message_source_dispatch,
+ message_source_finalize,
+ (GSourceFunc)message_source_closure_callback,
+ (GSourceDummyMarshal)g_cclosure_marshal_generic,
+};
+
+GSource *
soup_message_io_get_source (SoupMessage *msg, GCancellable *cancellable,
- GSourceFunc callback, gpointer user_data)
+ SoupMessageSourceFunc callback, gpointer user_data)
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io = priv->io_data;
- GSource *source;
-
- if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state)) {
- source = g_pollable_input_stream_create_source (
- G_POLLABLE_INPUT_STREAM (io->istream), cancellable);
- } else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state)) {
- source = g_pollable_output_stream_create_source (
- G_POLLABLE_OUTPUT_STREAM (io->ostream), cancellable);
+ GSource *base_source, *source;
+ SoupMessageSource *message_source;
+
+ if (io && SOUP_MESSAGE_IO_STATE_POLLABLE (io->read_state)) {
+ GPollableInputStream *istream;
+
+ if (io->body_istream)
+ istream = G_POLLABLE_INPUT_STREAM (io->body_istream);
+ else
+ istream = G_POLLABLE_INPUT_STREAM (io->istream);
+ base_source = g_pollable_input_stream_create_source (istream, cancellable);
+ } else if (io && SOUP_MESSAGE_IO_STATE_POLLABLE (io->write_state)) {
+ GPollableOutputStream *ostream;
+
+ if (io->body_ostream)
+ ostream = G_POLLABLE_OUTPUT_STREAM (io->body_ostream);
+ else
+ ostream = G_POLLABLE_OUTPUT_STREAM (io->ostream);
+ base_source = g_pollable_output_stream_create_source (ostream, cancellable);
} else
- g_return_val_if_reached (NULL);
-
- g_source_set_callback (source, callback, user_data, NULL);
+ base_source = g_timeout_source_new (0);
+
+ g_source_set_dummy_callback (base_source);
+ source = g_source_new (&message_source_funcs,
+ sizeof (SoupMessageSource));
+ g_source_set_name (source, "SoupMessageSource");
+ message_source = (SoupMessageSource *)source;
+ message_source->msg = g_object_ref (msg);
+
+ g_source_add_child_source (source, base_source);
+ g_source_unref (base_source);
+ g_source_set_callback (source, (GSourceFunc) callback, user_data, NULL);
return source;
}
-static gboolean io_run (GObject *stream, SoupMessage *msg);
-
-static void
-setup_io_source (SoupMessage *msg)
+static gboolean
+io_run_until (SoupMessage *msg,
+ SoupMessageIOState read_state, SoupMessageIOState write_state,
+ GCancellable *cancellable, GError **error)
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io = priv->io_data;
+ gboolean progress = TRUE, done;
+
+ if (g_cancellable_set_error_if_cancelled (cancellable, error))
+ return FALSE;
+ else if (!io) {
+ g_set_error_literal (error, G_IO_ERROR,
+ G_IO_ERROR_CANCELLED,
+ _("Operation was cancelled"));
+ return FALSE;
+ }
+
+ g_object_ref (msg);
- io->io_source = soup_message_io_get_source (msg, NULL,
- (GSourceFunc)io_run, msg);
- g_source_attach (io->io_source, io->async_context);
- g_source_unref (io->io_source);
+ while (progress && priv->io_data == io && !io->paused &&
+ (io->read_state < read_state || io->write_state < write_state)) {
+
+ if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
+ progress = io_read (msg, cancellable, error);
+ else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state))
+ progress = io_write (msg, cancellable, error);
+ else
+ progress = FALSE;
+ }
+
+ done = (priv->io_data == io &&
+ io->read_state >= read_state &&
+ io->write_state >= write_state);
+
+ g_object_unref (msg);
+ return done;
}
static gboolean
-io_run (GObject *stream, SoupMessage *msg)
+io_run (SoupMessage *msg, gpointer user_data)
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io = priv->io_data;
@@ -727,36 +848,99 @@ io_run (GObject *stream, SoupMessage *msg)
if (io->io_source) {
g_source_destroy (io->io_source);
+ g_source_unref (io->io_source);
io->io_source = NULL;
}
g_object_ref (msg);
- while (priv->io_data == io && !io->paused) {
- gboolean progress = FALSE;
+ if (io_run_until (msg,
+ SOUP_MESSAGE_IO_STATE_DONE,
+ SOUP_MESSAGE_IO_STATE_DONE,
+ io->cancellable, &error)) {
+ soup_message_io_finished (msg);
+ } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+ g_clear_error (&error);
+ io->io_source = soup_message_io_get_source (msg, NULL, io_run, msg);
+ g_source_attach (io->io_source, io->async_context);
+ } else if (error) {
+ io_error (io->sock, msg, error);
+ }
- if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
- progress = io_read (msg, io->cancellable, &error);
- else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state))
- progress = io_write (msg, io->cancellable, &error);
+ g_object_unref (msg);
+ return FALSE;
+}
- if (!progress)
- break;
- }
+gboolean
+soup_message_io_run_until_write (SoupMessage *msg,
+ GCancellable *cancellable, GError **error)
+{
+ return io_run_until (msg,
+ SOUP_MESSAGE_IO_STATE_ANY,
+ SOUP_MESSAGE_IO_STATE_BODY,
+ cancellable, error);
+}
- if (error) {
- if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
- g_clear_error (&error);
- setup_io_source (msg);
- } else
- io_error (io->sock, msg, error);
- } else if (priv->io_data == io &&
- io->read_state == SOUP_MESSAGE_IO_STATE_DONE &&
- io->write_state == SOUP_MESSAGE_IO_STATE_DONE)
- soup_message_io_finished (msg);
+gboolean
+soup_message_io_run_until_read (SoupMessage *msg,
+ GCancellable *cancellable, GError **error)
+{
+ return io_run_until (msg,
+ SOUP_MESSAGE_IO_STATE_BODY,
+ SOUP_MESSAGE_IO_STATE_ANY,
+ cancellable, error);
+}
+gboolean
+soup_message_io_run_until_finish (SoupMessage *msg,
+ GCancellable *cancellable,
+ GError **error)
+{
+ g_object_ref (msg);
+
+ if (!io_run_until (msg,
+ SOUP_MESSAGE_IO_STATE_DONE,
+ SOUP_MESSAGE_IO_STATE_DONE,
+ cancellable, error))
+ return FALSE;
+
+ soup_message_io_finished (msg);
g_object_unref (msg);
- return FALSE;
+ return TRUE;
+}
+
+static void
+client_stream_eof (SoupClientInputStream *stream, gpointer user_data)
+{
+ SoupMessage *msg = user_data;
+ SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
+ SoupMessageIOData *io = priv->io_data;
+
+ if (io && io->read_state == SOUP_MESSAGE_IO_STATE_BODY)
+ io->read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
+}
+
+GInputStream *
+soup_message_io_get_response_istream (SoupMessage *msg,
+ GError **error)
+{
+ SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
+ SoupMessageIOData *io = priv->io_data;
+ GInputStream *client_stream;
+
+ g_return_val_if_fail (io->mode == SOUP_MESSAGE_IO_CLIENT, NULL);
+
+ if (SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code)) {
+ g_set_error_literal (error, SOUP_HTTP_ERROR,
+ msg->status_code, msg->reason_phrase);
+ return NULL;
+ }
+
+ client_stream = soup_client_input_stream_new (io->body_istream, msg);
+ g_signal_connect (client_stream, "eof",
+ G_CALLBACK (client_stream_eof), msg);
+
+ return client_stream;
}
@@ -839,7 +1023,8 @@ soup_message_io_client (SoupMessageQueueItem *item,
io->write_body = item->msg->request_body;
io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
- io_run (NULL, item->msg);
+ if (!item->new_api)
+ io_run (item->msg, NULL);
}
void
@@ -860,7 +1045,7 @@ soup_message_io_server (SoupMessage *msg, SoupSocket *sock,
io->write_body = msg->response_body;
io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
- io_run (NULL, msg);
+ io_run (msg, NULL);
}
void
@@ -873,6 +1058,7 @@ soup_message_io_pause (SoupMessage *msg)
if (io->io_source) {
g_source_destroy (io->io_source);
+ g_source_unref (io->io_source);
io->io_source = NULL;
}
@@ -897,7 +1083,7 @@ io_unpause_internal (gpointer msg)
if (io->io_source)
return FALSE;
- io_run (NULL, msg);
+ io_run (msg, NULL);
return FALSE;
}
diff --git a/libsoup/soup-message-private.h b/libsoup/soup-message-private.h
index 56253546..0d348333 100644
--- a/libsoup/soup-message-private.h
+++ b/libsoup/soup-message-private.h
@@ -94,6 +94,25 @@ void soup_message_io_pause (SoupMessage *msg);
void soup_message_io_unpause (SoupMessage *msg);
gboolean soup_message_io_in_progress (SoupMessage *msg);
+gboolean soup_message_io_run_until_write (SoupMessage *msg,
+ GCancellable *cancellable,
+ GError **error);
+gboolean soup_message_io_run_until_read (SoupMessage *msg,
+ GCancellable *cancellable,
+ GError **error);
+gboolean soup_message_io_run_until_finish (SoupMessage *msg,
+ GCancellable *cancellable,
+ GError **error);
+
+typedef gboolean (*SoupMessageSourceFunc) (SoupMessage *, gpointer);
+GSource *soup_message_io_get_source (SoupMessage *msg,
+ GCancellable *cancellable,
+ SoupMessageSourceFunc callback,
+ gpointer user_data);
+
+GInputStream *soup_message_io_get_response_istream (SoupMessage *msg,
+ GError **error);
+
gboolean soup_message_disables_feature (SoupMessage *msg,
gpointer feature);
diff --git a/libsoup/soup-message-queue.c b/libsoup/soup-message-queue.c
index 7b1e5ddf..97150802 100644
--- a/libsoup/soup-message-queue.c
+++ b/libsoup/soup-message-queue.c
@@ -184,6 +184,8 @@ soup_message_queue_item_unref (SoupMessageQueueItem *item)
g_object_unref (item->proxy_addr);
if (item->proxy_uri)
soup_uri_free (item->proxy_uri);
+ if (item->result)
+ g_object_unref (item->result);
soup_message_queue_item_set_connection (item, NULL);
g_slice_free (SoupMessageQueueItem, item);
}
diff --git a/libsoup/soup-message-queue.h b/libsoup/soup-message-queue.h
index a1ae663d..a9242a1e 100644
--- a/libsoup/soup-message-queue.h
+++ b/libsoup/soup-message-queue.h
@@ -45,8 +45,10 @@ struct _SoupMessageQueueItem {
SoupAddress *proxy_addr;
SoupURI *proxy_uri;
SoupConnection *conn;
+ GSimpleAsyncResult *result;
guint paused : 1;
+ guint new_api : 1;
guint redirection_count : 31;
SoupMessageQueueItemState state;
diff --git a/libsoup/soup-request-http.c b/libsoup/soup-request-http.c
index 89547e19..264bda04 100644
--- a/libsoup/soup-request-http.c
+++ b/libsoup/soup-request-http.c
@@ -32,9 +32,9 @@
#include "soup-request-http.h"
#include "soup-cache.h"
#include "soup-cache-private.h"
-#include "soup-http-input-stream.h"
#include "soup-message.h"
#include "soup-session.h"
+#include "soup-session-private.h"
#include "soup-uri.h"
G_DEFINE_TYPE (SoupRequestHTTP, soup_request_http, SOUP_TYPE_REQUEST)
@@ -44,6 +44,11 @@ struct _SoupRequestHTTPPrivate {
char *content_type;
};
+static void content_sniffed (SoupMessage *msg,
+ const char *content_type,
+ GHashTable *params,
+ gpointer user_data);
+
static void
soup_request_http_init (SoupRequestHTTP *http)
{
@@ -61,6 +66,8 @@ soup_request_http_check_uri (SoupRequest *request,
return FALSE;
http->priv->msg = soup_message_new_from_uri (SOUP_METHOD_GET, uri);
+ g_signal_connect (http->priv->msg, "content-sniffed",
+ G_CALLBACK (content_sniffed), http);
return TRUE;
}
@@ -69,8 +76,12 @@ soup_request_http_finalize (GObject *object)
{
SoupRequestHTTP *http = SOUP_REQUEST_HTTP (object);
- if (http->priv->msg)
+ if (http->priv->msg) {
+ g_signal_handlers_disconnect_by_func (http->priv->msg,
+ G_CALLBACK (content_sniffed),
+ http);
g_object_unref (http->priv->msg);
+ }
g_free (http->priv->content_type);
@@ -82,17 +93,11 @@ soup_request_http_send (SoupRequest *request,
GCancellable *cancellable,
GError **error)
{
- GInputStream *httpstream;
SoupRequestHTTP *http = SOUP_REQUEST_HTTP (request);
- httpstream = soup_http_input_stream_new (soup_request_get_session (request), http->priv->msg);
- if (!soup_http_input_stream_send (SOUP_HTTP_INPUT_STREAM (httpstream),
- cancellable, error)) {
- g_object_unref (httpstream);
- return NULL;
- }
- http->priv->content_type = g_strdup (soup_http_input_stream_get_content_type (SOUP_HTTP_INPUT_STREAM (httpstream)));
- return httpstream;
+ return soup_session_send_request (soup_request_get_session (request),
+ http->priv->msg,
+ cancellable, error);
}
@@ -124,16 +129,15 @@ free_send_async_data (SendAsyncData *sadata)
static void
http_input_stream_ready_cb (GObject *source, GAsyncResult *result, gpointer user_data)
{
- SoupHTTPInputStream *httpstream = SOUP_HTTP_INPUT_STREAM (source);
SendAsyncData *sadata = user_data;
GError *error = NULL;
+ GInputStream *stream;
- if (soup_http_input_stream_send_finish (httpstream, result, &error)) {
- sadata->http->priv->content_type = g_strdup (soup_http_input_stream_get_content_type (httpstream));
- g_simple_async_result_set_op_res_gpointer (sadata->simple, httpstream, g_object_unref);
+ stream = soup_session_send_request_finish (SOUP_SESSION (source), result, &error);
+ if (stream) {
+ g_simple_async_result_set_op_res_gpointer (sadata->simple, stream, g_object_unref);
} else {
g_simple_async_result_take_error (sadata->simple, error);
- g_object_unref (httpstream);
}
g_simple_async_result_complete (sadata->simple);
free_send_async_data (sadata);
@@ -171,9 +175,8 @@ conditional_get_ready_cb (SoupSession *session, SoupMessage *msg, gpointer user_
/* The resource was modified, or else it mysteriously disappeared
* from our cache. Either way we need to reload it now.
*/
- stream = soup_http_input_stream_new (session, sadata->original);
- soup_http_input_stream_send_async (SOUP_HTTP_INPUT_STREAM (stream), G_PRIORITY_DEFAULT,
- sadata->cancellable, http_input_stream_ready_cb, sadata);
+ soup_session_send_request_async (session, msg, sadata->cancellable,
+ http_input_stream_ready_cb, sadata);
}
static gboolean
@@ -253,10 +256,8 @@ soup_request_http_send_async (SoupRequest *request,
}
}
- stream = soup_http_input_stream_new (session, http->priv->msg);
- soup_http_input_stream_send_async (SOUP_HTTP_INPUT_STREAM (stream),
- G_PRIORITY_DEFAULT, cancellable,
- http_input_stream_ready_cb, sadata);
+ soup_session_send_request_async (session, http->priv->msg, cancellable,
+ http_input_stream_ready_cb, sadata);
}
static GInputStream *
@@ -282,6 +283,30 @@ soup_request_http_get_content_length (SoupRequest *request)
return soup_message_headers_get_content_length (http->priv->msg->response_headers);
}
+static void
+content_sniffed (SoupMessage *msg,
+ const char *content_type,
+ GHashTable *params,
+ gpointer user_data)
+{
+ SoupRequestHTTP *http = user_data;
+ GString *sniffed_type;
+
+ sniffed_type = g_string_new (content_type);
+ if (params) {
+ GHashTableIter iter;
+ gpointer key, value;
+
+ g_hash_table_iter_init (&iter, params);
+ while (g_hash_table_iter_next (&iter, &key, &value)) {
+ g_string_append (sniffed_type, "; ");
+ soup_header_g_string_append_param (sniffed_type, key, value);
+ }
+ }
+ g_free (http->priv->content_type);
+ http->priv->content_type = g_string_free (sniffed_type, FALSE);
+}
+
static const char *
soup_request_http_get_content_type (SoupRequest *request)
{
diff --git a/libsoup/soup-session-async.c b/libsoup/soup-session-async.c
index a58439e3..90526dca 100644
--- a/libsoup/soup-session-async.c
+++ b/libsoup/soup-session-async.c
@@ -34,6 +34,10 @@
static void run_queue (SoupSessionAsync *sa);
static void do_idle_run_queue (SoupSession *session);
+static void send_request_running (SoupSession *session, SoupMessageQueueItem *item);
+static void send_request_restarted (SoupSession *session, SoupMessageQueueItem *item);
+static void send_request_finished (SoupSession *session, SoupMessageQueueItem *item);
+
static void queue_message (SoupSession *session, SoupMessage *req,
SoupSessionCallback callback, gpointer user_data);
static guint send_message (SoupSession *session, SoupMessage *req);
@@ -225,9 +229,10 @@ message_completed (SoupMessage *msg, gpointer user_data)
{
SoupMessageQueueItem *item = user_data;
+ do_idle_run_queue (item->session);
+
if (item->state != SOUP_MESSAGE_RESTARTING)
item->state = SOUP_MESSAGE_FINISHING;
- do_idle_run_queue (item->session);
}
static void
@@ -403,11 +408,15 @@ process_queue_item (SoupMessageQueueItem *item,
case SOUP_MESSAGE_READY:
item->state = SOUP_MESSAGE_RUNNING;
soup_session_send_queue_item (session, item, message_completed);
+ if (item->new_api)
+ send_request_running (session, item);
break;
case SOUP_MESSAGE_RESTARTING:
item->state = SOUP_MESSAGE_STARTING;
soup_message_restarted (item->msg);
+ if (item->new_api)
+ send_request_restarted (session, item);
break;
case SOUP_MESSAGE_FINISHING:
@@ -420,6 +429,8 @@ process_queue_item (SoupMessageQueueItem *item,
soup_session_unqueue_item (session, item);
if (item->callback)
item->callback (session, item->msg, item->callback_data);
+ else if (item->new_api)
+ send_request_finished (session, item);
g_object_unref (item->msg);
do_idle_run_queue (session);
g_object_unref (session);
@@ -611,3 +622,229 @@ kick (SoupSession *session)
{
do_idle_run_queue (session);
}
+
+
+static void
+send_request_return_result (SoupMessageQueueItem *item,
+ gpointer stream, GError *error)
+{
+ GSimpleAsyncResult *simple;
+
+ simple = item->result;
+ item->result = NULL;
+
+ if (error)
+ g_simple_async_result_take_error (simple, error);
+ else if (SOUP_STATUS_IS_TRANSPORT_ERROR (item->msg->status_code)) {
+ if (stream)
+ g_object_unref (stream);
+ g_simple_async_result_set_error (simple,
+ SOUP_HTTP_ERROR,
+ item->msg->status_code,
+ "%s",
+ item->msg->reason_phrase);
+ } else
+ g_simple_async_result_set_op_res_gpointer (simple, stream, g_object_unref);
+
+ g_simple_async_result_complete (simple);
+ g_object_unref (simple);
+}
+
+static void
+send_request_restarted (SoupSession *session, SoupMessageQueueItem *item)
+{
+ /* We won't be needing this, then. */
+ g_object_set_data (G_OBJECT (item->msg), "SoupSessionAsync:ostream", NULL);
+}
+
+static void
+send_request_finished (SoupSession *session, SoupMessageQueueItem *item)
+{
+ GMemoryOutputStream *mostream;
+ GInputStream *istream = NULL;
+
+ if (!item->result) {
+ /* Something else already took care of it. */
+ return;
+ }
+
+ mostream = g_object_get_data (G_OBJECT (item->msg), "SoupSessionAsync:ostream");
+ if (mostream && !SOUP_STATUS_IS_TRANSPORT_ERROR (item->msg->status_code)) {
+ gpointer data;
+ gssize size;
+
+ /* We thought it would be requeued, but it wasn't, so
+ * return the original body.
+ */
+ size = g_memory_output_stream_get_data_size (mostream);
+ data = size ? g_memory_output_stream_steal_data (mostream) : g_strdup ("");
+ istream = g_memory_input_stream_new_from_data (data, size, g_free);
+ }
+
+ send_request_return_result (item, istream, NULL);
+}
+
+static void
+send_async_spliced (GObject *source, GAsyncResult *result, gpointer user_data)
+{
+ SoupMessageQueueItem *item = user_data;
+ GInputStream *istream = g_object_get_data (source, "istream");
+
+ GError *error = NULL;
+
+ /* If the message was cancelled, it will be completed via other means */
+ if (g_cancellable_is_cancelled (item->cancellable) ||
+ !item->result) {
+ soup_message_queue_item_unref (item);
+ return;
+ }
+
+ if (g_output_stream_splice_finish (G_OUTPUT_STREAM (source),
+ result, &error) == -1) {
+ send_request_return_result (item, NULL, error);
+ return;
+ }
+
+ /* Otherwise either restarted or finished will eventually be called.
+ * It should be safe to call the sync close() method here since
+ * the message body has already been written.
+ */
+ g_input_stream_close (istream, NULL, NULL);
+ do_idle_run_queue (item->session);
+ soup_message_queue_item_unref (item);
+}
+
+static void
+send_async_maybe_complete (SoupMessageQueueItem *item,
+ GInputStream *stream)
+{
+ if (item->msg->status_code == SOUP_STATUS_UNAUTHORIZED ||
+ item->msg->status_code == SOUP_STATUS_PROXY_UNAUTHORIZED ||
+ soup_session_would_redirect (item->session, item->msg)) {
+ GOutputStream *ostream;
+
+ /* Message may be requeued, so gather the current message body... */
+ ostream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
+ g_object_set_data_full (G_OBJECT (item->msg), "SoupSessionAsync:ostream",
+ ostream, g_object_unref);
+
+ g_object_set_data_full (G_OBJECT (ostream), "istream",
+ stream, g_object_unref);
+
+ /* Give the splice op its own ref on item */
+ soup_message_queue_item_ref (item);
+ g_output_stream_splice_async (ostream, stream,
+ /* We can't use CLOSE_SOURCE because it
+ * might get closed in the wrong thread.
+ */
+ G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+ G_PRIORITY_DEFAULT,
+ item->cancellable,
+ send_async_spliced, item);
+ return;
+ }
+
+ send_request_return_result (item, stream, NULL);
+}
+
+static void try_run_until_read (SoupMessageQueueItem *item);
+
+static gboolean
+read_ready_cb (SoupMessage *msg, gpointer user_data)
+{
+ SoupMessageQueueItem *item = user_data;
+ GError *error = NULL;
+
+ if (g_cancellable_set_error_if_cancelled (item->cancellable, &error)) {
+ send_request_return_result (item, NULL, error);
+ return FALSE;
+ }
+
+ try_run_until_read (item);
+ return FALSE;
+}
+
+static void
+try_run_until_read (SoupMessageQueueItem *item)
+{
+ GError *error = NULL;
+ GInputStream *stream = NULL;
+ GSource *source;
+
+ if (soup_message_io_run_until_read (item->msg, item->cancellable, &error))
+ stream = soup_message_io_get_response_istream (item->msg, &error);
+ if (stream) {
+ send_async_maybe_complete (item, stream);
+ return;
+ }
+
+ if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+ send_request_return_result (item, NULL, error);
+ return;
+ }
+
+ g_clear_error (&error);
+ source = soup_message_io_get_source (item->msg, item->cancellable,
+ read_ready_cb, item);
+ g_source_attach (source, soup_session_get_async_context (item->session));
+ g_source_unref (source);
+}
+
+static void
+send_request_running (SoupSession *session, SoupMessageQueueItem *item)
+{
+ try_run_until_read (item);
+}
+
+void
+soup_session_send_request_async (SoupSession *session,
+ SoupMessage *msg,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ SoupMessageQueueItem *item;
+ gboolean use_thread_context;
+
+ g_return_if_fail (SOUP_IS_SESSION_ASYNC (session));
+
+ g_object_get (G_OBJECT (session),
+ SOUP_SESSION_USE_THREAD_CONTEXT, &use_thread_context,
+ NULL);
+ g_return_if_fail (use_thread_context);
+
+ /* Balance out the unref that queuing will eventually do */
+ g_object_ref (msg);
+
+ queue_message (session, msg, NULL, NULL);
+
+ item = soup_message_queue_lookup (soup_session_get_queue (session), msg);
+ g_return_if_fail (item != NULL);
+
+ item->new_api = TRUE;
+ item->result = g_simple_async_result_new (G_OBJECT (session),
+ callback, user_data,
+ soup_session_send_request_async);
+ g_simple_async_result_set_op_res_gpointer (item->result, item, (GDestroyNotify) soup_message_queue_item_unref);
+
+ if (cancellable) {
+ g_object_unref (item->cancellable);
+ item->cancellable = g_object_ref (cancellable);
+ }
+}
+
+GInputStream *
+soup_session_send_request_finish (SoupSession *session,
+ GAsyncResult *result,
+ GError **error)
+{
+ GSimpleAsyncResult *simple;
+
+ g_return_val_if_fail (SOUP_IS_SESSION_ASYNC (session), NULL);
+ g_return_val_if_fail (g_simple_async_result_is_valid (result, G_OBJECT (session), soup_session_send_request_async), NULL);
+
+ simple = G_SIMPLE_ASYNC_RESULT (result);
+ if (g_simple_async_result_propagate_error (simple, error))
+ return NULL;
+ return g_object_ref (g_simple_async_result_get_op_res_gpointer (simple));
+}
diff --git a/libsoup/soup-session-private.h b/libsoup/soup-session-private.h
index 7462c618..a72fb2b4 100644
--- a/libsoup/soup-session-private.h
+++ b/libsoup/soup-session-private.h
@@ -31,6 +31,20 @@ void soup_session_set_item_status (SoupSession *s
SoupMessageQueueItem *item,
guint status_code);
+GInputStream *soup_session_send_request (SoupSession *session,
+ SoupMessage *msg,
+ GCancellable *cancellable,
+ GError **error);
+
+void soup_session_send_request_async (SoupSession *session,
+ SoupMessage *msg,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data);
+GInputStream *soup_session_send_request_finish (SoupSession *session,
+ GAsyncResult *result,
+ GError **error);
+
G_END_DECLS
#endif /* SOUP_SESSION_PRIVATE_H */
diff --git a/libsoup/soup-session-sync.c b/libsoup/soup-session-sync.c
index 1a919c71..a09c5b4e 100644
--- a/libsoup/soup-session-sync.c
+++ b/libsoup/soup-session-sync.c
@@ -240,6 +240,19 @@ try_again:
item->state = SOUP_MESSAGE_READY;
}
+static void process_queue_item (SoupMessageQueueItem *item);
+
+static void
+new_api_message_completed (SoupMessage *msg, gpointer user_data)
+{
+ SoupMessageQueueItem *item = user_data;
+
+ if (item->state != SOUP_MESSAGE_RESTARTING) {
+ item->state = SOUP_MESSAGE_FINISHING;
+ process_queue_item (item);
+ }
+}
+
static void
process_queue_item (SoupMessageQueueItem *item)
{
@@ -249,7 +262,8 @@ process_queue_item (SoupMessageQueueItem *item)
SoupProxyURIResolver *proxy_resolver;
guint status;
- item->state = SOUP_MESSAGE_STARTING;
+ soup_message_queue_item_ref (item);
+
do {
if (item->paused) {
g_mutex_lock (&priv->lock);
@@ -303,11 +317,22 @@ process_queue_item (SoupMessageQueueItem *item)
case SOUP_MESSAGE_READY:
item->state = SOUP_MESSAGE_RUNNING;
+
+ if (item->new_api) {
+ soup_session_send_queue_item (item->session, item, new_api_message_completed);
+ goto out;
+ }
+
soup_session_send_queue_item (item->session, item, NULL);
if (item->state != SOUP_MESSAGE_RESTARTING)
item->state = SOUP_MESSAGE_FINISHING;
break;
+ case SOUP_MESSAGE_RUNNING:
+ g_warn_if_fail (item->new_api);
+ item->state = SOUP_MESSAGE_FINISHING;
+ break;
+
case SOUP_MESSAGE_RESTARTING:
item->state = SOUP_MESSAGE_STARTING;
soup_message_restarted (item->msg);
@@ -326,6 +351,9 @@ process_queue_item (SoupMessageQueueItem *item)
break;
}
} while (item->state != SOUP_MESSAGE_FINISHED);
+
+ out:
+ soup_message_queue_item_unref (item);
}
static gboolean
@@ -476,3 +504,90 @@ kick (SoupSession *session)
g_cond_broadcast (&priv->cond);
g_mutex_unlock (&priv->lock);
}
+
+
+GInputStream *
+soup_session_send_request (SoupSession *session,
+ SoupMessage *msg,
+ GCancellable *cancellable,
+ GError **error)
+{
+ SoupMessageQueueItem *item;
+ GInputStream *stream = NULL;
+ GOutputStream *ostream;
+ GMemoryOutputStream *mostream;
+ gssize size;
+
+ g_return_val_if_fail (SOUP_IS_SESSION_SYNC (session), NULL);
+
+ SOUP_SESSION_CLASS (soup_session_sync_parent_class)->queue_message (session, msg, NULL, NULL);
+
+ item = soup_message_queue_lookup (soup_session_get_queue (session), msg);
+ g_return_val_if_fail (item != NULL, NULL);
+
+ item->new_api = TRUE;
+
+ while (!stream) {
+ /* Get a connection, etc */
+ process_queue_item (item);
+ if (SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code))
+ break;
+
+ /* Send request, read headers */
+ if (!soup_message_io_run_until_read (msg, cancellable, error))
+ break;
+
+ stream = soup_message_io_get_response_istream (msg, error);
+ if (!stream)
+ break;
+
+ /* Break if the message doesn't look likely-to-be-requeued */
+ if (msg->status_code != SOUP_STATUS_UNAUTHORIZED &&
+ msg->status_code != SOUP_STATUS_PROXY_UNAUTHORIZED &&
+ !soup_session_would_redirect (session, msg))
+ break;
+
+ /* Gather the current message body... */
+ ostream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
+ if (g_output_stream_splice (ostream, stream,
+ G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
+ G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+ cancellable, error) == -1) {
+ g_object_unref (stream);
+ g_object_unref (ostream);
+ stream = NULL;
+ break;
+ }
+ g_object_unref (stream);
+ stream = NULL;
+
+ /* If the message was requeued, loop */
+ if (item->state == SOUP_MESSAGE_RESTARTING) {
+ g_object_unref (ostream);
+ continue;
+ }
+
+ /* Not requeued, so return the original body */
+ mostream = G_MEMORY_OUTPUT_STREAM (ostream);
+ size = g_memory_output_stream_get_data_size (mostream);
+ stream = g_memory_input_stream_new ();
+ if (size) {
+ g_memory_input_stream_add_data (G_MEMORY_INPUT_STREAM (stream),
+ g_memory_output_stream_steal_data (mostream),
+ size, g_free);
+ }
+ g_object_unref (ostream);
+ }
+
+ if (SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code)) {
+ if (stream) {
+ g_object_unref (stream);
+ stream = NULL;
+ }
+ g_set_error_literal (error, SOUP_HTTP_ERROR, msg->status_code,
+ msg->reason_phrase);
+ }
+
+ soup_message_queue_item_unref (item);
+ return stream;
+}
diff --git a/libsoup/soup-session.c b/libsoup/soup-session.c
index fa95f7e7..200a25a8 100644
--- a/libsoup/soup-session.c
+++ b/libsoup/soup-session.c
@@ -1739,10 +1739,7 @@ redirect_handler (SoupMessage *msg, gpointer user_data)
if (new_uri)
soup_uri_free (new_uri);
- if (invalid) {
- /* Really we should just leave the status as-is,
- * but that would be an API break.
- */
+ if (invalid && !item->new_api) {
soup_message_set_status_full (msg,
SOUP_STATUS_MALFORMED,
"Invalid Redirect URL");
@@ -2247,6 +2244,7 @@ soup_session_pause_message (SoupSession *session,
priv = SOUP_SESSION_GET_PRIVATE (session);
item = soup_message_queue_lookup (priv->queue, msg);
g_return_if_fail (item != NULL);
+ g_return_if_fail (!item->new_api);
item->paused = TRUE;
if (item->state == SOUP_MESSAGE_RUNNING)
@@ -2279,6 +2277,7 @@ soup_session_unpause_message (SoupSession *session,
priv = SOUP_SESSION_GET_PRIVATE (session);
item = soup_message_queue_lookup (priv->queue, msg);
g_return_if_fail (item != NULL);
+ g_return_if_fail (!item->new_api);
item->paused = FALSE;
if (item->state == SOUP_MESSAGE_RUNNING)
diff --git a/po/POTFILES.in b/po/POTFILES.in
index b35ee888..80db0551 100644
--- a/po/POTFILES.in
+++ b/po/POTFILES.in
@@ -1,4 +1,5 @@
libsoup/soup-body-input-stream.c
libsoup/soup-converter-wrapper.c
+libsoup/soup-message-io.c
libsoup/soup-request.c
libsoup/soup-requester.c