summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--libsoup/Makefile.am2
-rw-r--r--libsoup/soup-cache-input-stream.c333
-rw-r--r--libsoup/soup-cache-input-stream.h52
-rw-r--r--po/POTFILES.in1
4 files changed, 388 insertions, 0 deletions
diff --git a/libsoup/Makefile.am b/libsoup/Makefile.am
index 87629ed1..59553f04 100644
--- a/libsoup/Makefile.am
+++ b/libsoup/Makefile.am
@@ -114,6 +114,8 @@ libsoup_2_4_la_SOURCES = \
soup-body-output-stream.h \
soup-body-output-stream.c \
soup-cache.c \
+ soup-cache-input-stream.h \
+ soup-cache-input-stream.c \
soup-cache-private.h \
soup-client-input-stream.h \
soup-client-input-stream.c \
diff --git a/libsoup/soup-cache-input-stream.c b/libsoup/soup-cache-input-stream.c
new file mode 100644
index 00000000..a44652a8
--- /dev/null
+++ b/libsoup/soup-cache-input-stream.c
@@ -0,0 +1,333 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * Copyright (C) 2012 Igalia, S.L.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <glib/gi18n-lib.h>
+#include "soup-cache-input-stream.h"
+#include "soup-message-body.h"
+
+static void soup_cache_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);
+
+G_DEFINE_TYPE_WITH_CODE (SoupCacheInputStream, soup_cache_input_stream, SOUP_TYPE_FILTER_INPUT_STREAM,
+ G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
+ soup_cache_input_stream_pollable_init))
+
+/* properties */
+enum {
+ PROP_0,
+
+ PROP_OUTPUT_STREAM,
+
+ LAST_PROP
+};
+
+struct _SoupCacheInputStreamPrivate
+{
+ GOutputStream *output_stream;
+ gsize bytes_written;
+
+ gboolean read_finished;
+ SoupBuffer *current_writing_buffer;
+ GQueue *buffer_queue;
+
+ GTask *task;
+};
+
+static void soup_cache_input_stream_write_next_buffer (SoupCacheInputStream *istream);
+
+static inline void
+notify_and_clear (SoupCacheInputStream *istream, GError *error)
+{
+ SoupCacheInputStreamPrivate *priv = istream->priv;
+
+ if (error)
+ g_task_return_error (priv->task, error);
+ else
+ g_task_return_int (priv->task, priv->bytes_written);
+
+ g_clear_object (&priv->output_stream);
+ g_clear_object (&priv->task);
+}
+
+gsize
+soup_cache_input_stream_cache_finish (SoupCacheInputStream *istream,
+ GAsyncResult *result,
+ GError **error)
+{
+ return g_task_propagate_int (G_TASK (result), error);
+}
+
+static inline void
+try_write_next_buffer (SoupCacheInputStream *istream)
+{
+ SoupCacheInputStreamPrivate *priv = istream->priv;
+
+ if (priv->current_writing_buffer == NULL && priv->buffer_queue->length)
+ soup_cache_input_stream_write_next_buffer (istream);
+ else if (priv->read_finished)
+ notify_and_clear (istream, NULL);
+ else if (g_input_stream_is_closed (G_INPUT_STREAM (istream))) {
+ GError *error = NULL;
+ g_set_error_literal (&error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+ _("Network stream unexpectedly closed"));
+ notify_and_clear (istream, error);
+ }
+}
+
+static void
+file_replaced_cb (GObject *source,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (user_data);
+ SoupCacheInputStreamPrivate *priv = istream->priv;
+ GError *error = NULL;
+
+ priv->output_stream = (GOutputStream *) g_file_replace_finish (G_FILE (source), res, &error);
+
+ if (error)
+ g_task_return_error (priv->task, error);
+ else
+ try_write_next_buffer (istream);
+}
+
+void
+soup_cache_input_stream_cache (SoupCacheInputStream *istream,
+ GFile *file,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ SoupCacheInputStreamPrivate *priv = istream->priv;
+
+ priv->task = g_task_new (istream, cancellable, callback, user_data);
+
+ g_file_replace_async (file, NULL, FALSE,
+ G_FILE_CREATE_PRIVATE | G_FILE_CREATE_REPLACE_DESTINATION,
+ G_PRIORITY_LOW, cancellable, file_replaced_cb, istream);
+}
+
+static void
+soup_cache_input_stream_init (SoupCacheInputStream *self)
+{
+ SoupCacheInputStreamPrivate *priv =
+ G_TYPE_INSTANCE_GET_PRIVATE (self, SOUP_TYPE_CACHE_INPUT_STREAM,
+ SoupCacheInputStreamPrivate);
+
+ priv->buffer_queue = g_queue_new ();
+ self->priv = priv;
+}
+
+static void
+soup_cache_input_stream_get_property (GObject *object,
+ guint property_id, GValue *value, GParamSpec *pspec)
+{
+ SoupCacheInputStream *self = SOUP_CACHE_INPUT_STREAM (object);
+ SoupCacheInputStreamPrivate *priv = self->priv;
+
+ switch (property_id) {
+ case PROP_OUTPUT_STREAM:
+ g_value_set_object (value, priv->output_stream);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+static void
+soup_cache_input_stream_set_property (GObject *object,
+ guint property_id, const GValue *value, GParamSpec *pspec)
+{
+ SoupCacheInputStream *self = SOUP_CACHE_INPUT_STREAM (object);
+ SoupCacheInputStreamPrivate *priv = self->priv;
+
+ switch (property_id) {
+ case PROP_OUTPUT_STREAM:
+ priv->output_stream = g_value_dup_object (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+static void
+soup_cache_input_stream_dispose (GObject *object)
+{
+ SoupCacheInputStreamPrivate *priv = SOUP_CACHE_INPUT_STREAM (object)->priv;
+
+ g_clear_object (&priv->output_stream);
+ g_clear_object (&priv->task);
+
+ G_OBJECT_CLASS (soup_cache_input_stream_parent_class)->dispose (object);
+}
+
+static void
+soup_cache_input_stream_finalize (GObject *object)
+{
+ SoupCacheInputStream *self = (SoupCacheInputStream *)object;
+ SoupCacheInputStreamPrivate *priv = self->priv;
+
+ g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);
+ g_queue_free_full (priv->buffer_queue, (GDestroyNotify) soup_buffer_free);
+
+ G_OBJECT_CLASS (soup_cache_input_stream_parent_class)->finalize (object);
+}
+
+static void
+write_ready_cb (GObject *source, GAsyncResult *result, SoupCacheInputStream *istream)
+{
+ GOutputStream *ostream = G_OUTPUT_STREAM (source);
+ SoupCacheInputStreamPrivate *priv = istream->priv;
+ gssize write_size;
+ gsize pending;
+ GError *error = NULL;
+
+ write_size = g_output_stream_write_finish (ostream, result, &error);
+ if (error) {
+ notify_and_clear (istream, error);
+ g_object_unref (istream);
+ return;
+ }
+
+ /* Check that we have written everything */
+ pending = priv->current_writing_buffer->length - write_size;
+ if (pending) {
+ SoupBuffer *subbuffer = soup_buffer_new_subbuffer (priv->current_writing_buffer,
+ write_size, pending);
+ g_queue_push_head (priv->buffer_queue, subbuffer);
+ }
+
+ priv->bytes_written += write_size;
+ g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);
+
+ try_write_next_buffer (istream);
+ g_object_unref (istream);
+}
+
+static void
+soup_cache_input_stream_write_next_buffer (SoupCacheInputStream *istream)
+{
+ SoupCacheInputStreamPrivate *priv = istream->priv;
+ SoupBuffer *buffer = g_queue_pop_head (priv->buffer_queue);
+ int priority;
+
+ g_assert (priv->output_stream && !g_output_stream_is_closed (priv->output_stream));
+ g_assert (priv->task);
+
+ g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);
+ priv->current_writing_buffer = buffer;
+
+ if (priv->buffer_queue->length > 10)
+ priority = G_PRIORITY_DEFAULT;
+ else
+ priority = G_PRIORITY_LOW;
+
+ g_output_stream_write_async (priv->output_stream, buffer->data, buffer->length,
+ priority, g_task_get_cancellable (priv->task),
+ (GAsyncReadyCallback) write_ready_cb,
+ g_object_ref (istream));
+}
+
+static gssize
+read_internal (GInputStream *stream,
+ void *buffer,
+ gsize count,
+ gboolean blocking,
+ GCancellable *cancellable,
+ GError **error)
+{
+ SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (stream);
+ SoupCacheInputStreamPrivate *priv = istream->priv;
+ GInputStream *base_stream;
+ gssize nread;
+
+ base_stream = g_filter_input_stream_get_base_stream (G_FILTER_INPUT_STREAM (stream));
+ nread = g_pollable_stream_read (base_stream, buffer, count, blocking,
+ cancellable, error);
+
+ if (G_UNLIKELY (nread == -1 || priv->read_finished || !priv->task))
+ return nread;
+
+ if (nread == 0) {
+ priv->read_finished = TRUE;
+
+ if (priv->current_writing_buffer == NULL && priv->output_stream)
+ notify_and_clear (istream, NULL);
+ } else {
+ SoupBuffer *soup_buffer = soup_buffer_new (SOUP_MEMORY_COPY, buffer, nread);
+ g_queue_push_tail (priv->buffer_queue, soup_buffer);
+
+ if (priv->current_writing_buffer == NULL && priv->output_stream)
+ soup_cache_input_stream_write_next_buffer (istream);
+ }
+
+ return nread;
+}
+
+static gssize
+soup_cache_input_stream_read_fn (GInputStream *stream,
+ void *buffer,
+ gsize count,
+ GCancellable *cancellable,
+ GError **error)
+{
+ return read_internal (stream, buffer, count, TRUE,
+ cancellable, error);
+}
+
+static gssize
+soup_cache_input_stream_read_nonblocking (GPollableInputStream *stream,
+ void *buffer,
+ gsize count,
+ GError **error)
+{
+ return read_internal (G_INPUT_STREAM (stream), buffer, count, FALSE,
+ NULL, error);
+}
+
+static void
+soup_cache_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface,
+ gpointer interface_data)
+{
+ pollable_interface->read_nonblocking = soup_cache_input_stream_read_nonblocking;
+}
+
+static void
+soup_cache_input_stream_class_init (SoupCacheInputStreamClass *klass)
+{
+ GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+ GInputStreamClass *istream_class = G_INPUT_STREAM_CLASS (klass);
+
+ g_type_class_add_private (klass, sizeof (SoupCacheInputStreamPrivate));
+
+ gobject_class->get_property = soup_cache_input_stream_get_property;
+ gobject_class->set_property = soup_cache_input_stream_set_property;
+ gobject_class->dispose = soup_cache_input_stream_dispose;
+ gobject_class->finalize = soup_cache_input_stream_finalize;
+
+ istream_class->read_fn = soup_cache_input_stream_read_fn;
+
+ g_object_class_install_property (gobject_class, PROP_OUTPUT_STREAM,
+ g_param_spec_object ("output-stream", "Output stream",
+ "the output stream where to write.",
+ G_TYPE_OUTPUT_STREAM,
+ G_PARAM_READWRITE |
+ G_PARAM_CONSTRUCT_ONLY |
+ G_PARAM_STATIC_STRINGS));
+}
+
+GInputStream *
+soup_cache_input_stream_new (GInputStream *base_stream)
+{
+ return g_object_new (SOUP_TYPE_CACHE_INPUT_STREAM,
+ "base-stream", base_stream,
+ "close-base-stream", FALSE,
+ NULL);
+}
diff --git a/libsoup/soup-cache-input-stream.h b/libsoup/soup-cache-input-stream.h
new file mode 100644
index 00000000..c999d102
--- /dev/null
+++ b/libsoup/soup-cache-input-stream.h
@@ -0,0 +1,52 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * soup-cache-input-stream.h - Header for SoupCacheInputStream
+ */
+
+#ifndef __SOUP_CACHE_INPUT_STREAM_H__
+#define __SOUP_CACHE_INPUT_STREAM_H__
+
+#include "soup-filter-input-stream.h"
+
+G_BEGIN_DECLS
+
+#define SOUP_TYPE_CACHE_INPUT_STREAM (soup_cache_input_stream_get_type())
+#define SOUP_CACHE_INPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), SOUP_TYPE_CACHE_INPUT_STREAM, SoupCacheInputStream))
+#define SOUP_CACHE_INPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), SOUP_TYPE_CACHE_INPUT_STREAM, SoupCacheInputStreamClass))
+#define SOUP_IS_CACHE_INPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), SOUP_TYPE_CACHE_INPUT_STREAM))
+#define SOUP_IS_CACHE_INPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), SOUP_TYPE_CACHE_INPUT_STREAM))
+#define SOUP_CACHE_INPUT_STREAM_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), SOUP_TYPE_CACHE_INPUT_STREAM, SoupCacheInputStreamClass))
+
+typedef struct _SoupCacheInputStream SoupCacheInputStream;
+typedef struct _SoupCacheInputStreamClass SoupCacheInputStreamClass;
+typedef struct _SoupCacheInputStreamPrivate SoupCacheInputStreamPrivate;
+
+struct _SoupCacheInputStreamClass
+{
+ SoupFilterInputStreamClass parent_class;
+};
+
+struct _SoupCacheInputStream
+{
+ SoupFilterInputStream parent;
+
+ SoupCacheInputStreamPrivate *priv;
+};
+
+GType soup_cache_input_stream_get_type (void) G_GNUC_CONST;
+
+GInputStream *soup_cache_input_stream_new (GInputStream *base_stream);
+
+void soup_cache_input_stream_cache (SoupCacheInputStream *istream,
+ GFile *file,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data);
+
+gsize soup_cache_input_stream_cache_finish (SoupCacheInputStream *istream,
+ GAsyncResult *result,
+ GError **error);
+
+G_END_DECLS
+
+#endif /* __SOUP_CACHE_INPUT_STREAM_H__ */
diff --git a/po/POTFILES.in b/po/POTFILES.in
index fff1f0ea..21c70d42 100644
--- a/po/POTFILES.in
+++ b/po/POTFILES.in
@@ -1,4 +1,5 @@
libsoup/soup-body-input-stream.c
+libsoup/soup-cache-input-stream.c
libsoup/soup-converter-wrapper.c
libsoup/soup-message-client-io.c
libsoup/soup-message-io.c