diff options
author | Dan Winship <danw@gnome.org> | 2012-02-06 15:08:08 -0500 |
---|---|---|
committer | Dan Winship <danw@gnome.org> | 2012-04-17 12:33:12 -0400 |
commit | 82ec4dcaed8107d436f76c45ec30645715b6dbef (patch) | |
tree | a2b8a062d0e4cac98d65babedb0ff0704946e041 /gio/gconverterinputstream.c | |
parent | 111ba203c2440a5d7d8a14b043213b78d54752ae (diff) | |
download | glib-82ec4dcaed8107d436f76c45ec30645715b6dbef.tar.gz |
gio: implement GPollableInput/OutputStream in more stream types
Implement GPollableInputStream in GMemoryInputStream and
GConverterInputStream, and likewise implement GPollableOutputStream in
the corresponding output streams.
https://bugzilla.gnome.org/show_bug.cgi?id=673997
Diffstat (limited to 'gio/gconverterinputstream.c')
-rw-r--r-- | gio/gconverterinputstream.c | 130 |
1 files changed, 115 insertions, 15 deletions
diff --git a/gio/gconverterinputstream.c b/gio/gconverterinputstream.c index 2fbf94db6..1acf9a948 100644 --- a/gio/gconverterinputstream.c +++ b/gio/gconverterinputstream.c @@ -25,6 +25,7 @@ #include <string.h> #include "gconverterinputstream.h" +#include "gpollableinputstream.h" #include "gsimpleasyncresult.h" #include "gcancellable.h" #include "gioenumtypes.h" @@ -41,6 +42,8 @@ * Converter input stream implements #GInputStream and allows * conversion of data of various types during reading. * + * As of GLib 2.34, #GConverterInputStream implements + * #GPollableInputStream. **/ #define INITIAL_BUFFER_SIZE 4096 @@ -55,6 +58,7 @@ typedef struct { struct _GConverterInputStreamPrivate { gboolean at_input_end; gboolean finished; + gboolean need_input; GConverter *converter; Buffer input_buffer; Buffer converted_buffer; @@ -80,9 +84,24 @@ static gssize g_converter_input_stream_read (GInputStream *stream, GCancellable *cancellable, GError **error); -G_DEFINE_TYPE (GConverterInputStream, - g_converter_input_stream, - G_TYPE_FILTER_INPUT_STREAM) +static gboolean g_converter_input_stream_can_poll (GPollableInputStream *stream); +static gboolean g_converter_input_stream_is_readable (GPollableInputStream *stream); +static gssize g_converter_input_stream_read_nonblocking (GPollableInputStream *stream, + void *buffer, + gsize size, + GError **error); + +static GSource *g_converter_input_stream_create_source (GPollableInputStream *stream, + GCancellable *cancellable); + +static void g_converter_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface); + +G_DEFINE_TYPE_WITH_CODE (GConverterInputStream, + g_converter_input_stream, + G_TYPE_FILTER_INPUT_STREAM, + G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM, + g_converter_input_stream_pollable_iface_init); + ) static void g_converter_input_stream_class_init (GConverterInputStreamClass *klass) @@ -113,6 +132,15 @@ g_converter_input_stream_class_init (GConverterInputStreamClass *klass) } static void +g_converter_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface) +{ + iface->can_poll = g_converter_input_stream_can_poll; + iface->is_readable = g_converter_input_stream_is_readable; + iface->read_nonblocking = g_converter_input_stream_read_nonblocking; + iface->create_source = g_converter_input_stream_create_source; +} + +static void g_converter_input_stream_finalize (GObject *object) { GConverterInputStreamPrivate *priv; @@ -320,6 +348,7 @@ buffer_ensure_space (Buffer *buffer, static gssize fill_input_buffer (GConverterInputStream *stream, gsize at_least_size, + gboolean blocking, GCancellable *cancellable, GError **error) { @@ -332,25 +361,30 @@ fill_input_buffer (GConverterInputStream *stream, buffer_ensure_space (&priv->input_buffer, at_least_size); base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream; - nread = g_input_stream_read (base_stream, - priv->input_buffer.data + priv->input_buffer.end, - buffer_tailspace (&priv->input_buffer), - cancellable, - error); + nread = g_pollable_stream_read (base_stream, + priv->input_buffer.data + priv->input_buffer.end, + buffer_tailspace (&priv->input_buffer), + blocking, + cancellable, + error); if (nread > 0) - priv->input_buffer.end += nread; + { + priv->input_buffer.end += nread; + priv->need_input = FALSE; + } return nread; } static gssize -g_converter_input_stream_read (GInputStream *stream, - void *buffer, - gsize count, - GCancellable *cancellable, - GError **error) +read_internal (GInputStream *stream, + void *buffer, + gsize count, + gboolean blocking, + GCancellable *cancellable, + GError **error) { GConverterInputStream *cstream; GConverterInputStreamPrivate *priv; @@ -389,7 +423,7 @@ g_converter_input_stream_read (GInputStream *stream, total_bytes_read == 0 && !priv->at_input_end) { - nread = fill_input_buffer (cstream, count, cancellable, error); + nread = fill_input_buffer (cstream, count, blocking, cancellable, error); if (nread < 0) return -1; if (nread == 0) @@ -497,6 +531,7 @@ g_converter_input_stream_read (GInputStream *stream, my_error2 = NULL; nread = fill_input_buffer (cstream, buffer_data_size (&priv->input_buffer) + 4096, + blocking, cancellable, &my_error2); if (nread < 0) @@ -504,6 +539,7 @@ g_converter_input_stream_read (GInputStream *stream, /* Can't read any more data, return that error */ g_error_free (my_error); g_propagate_error (error, my_error2); + priv->need_input = TRUE; return -1; } else if (nread == 0) @@ -536,6 +572,70 @@ g_converter_input_stream_read (GInputStream *stream, g_assert_not_reached (); } +static gssize +g_converter_input_stream_read (GInputStream *stream, + void *buffer, + gsize count, + GCancellable *cancellable, + GError **error) +{ + return read_internal (stream, buffer, count, TRUE, cancellable, error); +} + +static gboolean +g_converter_input_stream_can_poll (GPollableInputStream *stream) +{ + GInputStream *base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream; + + return (G_IS_POLLABLE_INPUT_STREAM (base_stream) && + g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (base_stream))); +} + +static gboolean +g_converter_input_stream_is_readable (GPollableInputStream *stream) +{ + GInputStream *base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream; + GConverterInputStream *cstream = G_CONVERTER_INPUT_STREAM (stream); + + if (buffer_data_size (&cstream->priv->converted_buffer)) + return TRUE; + else if (buffer_data_size (&cstream->priv->input_buffer) && + !cstream->priv->need_input) + return TRUE; + else + return g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (base_stream)); +} + +static gssize +g_converter_input_stream_read_nonblocking (GPollableInputStream *stream, + void *buffer, + gsize count, + GError **error) +{ + return read_internal (G_INPUT_STREAM (stream), buffer, count, + FALSE, NULL, error); +} + +static GSource * +g_converter_input_stream_create_source (GPollableInputStream *stream, + GCancellable *cancellable) +{ + GInputStream *base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream; + GSource *base_source, *pollable_source; + + if (g_pollable_input_stream_is_readable (stream)) + base_source = g_timeout_source_new (0); + else + base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (base_stream), NULL); + + pollable_source = g_pollable_source_new_full (stream, base_source, + cancellable); + g_source_unref (base_source); + + return pollable_source; +} + + /** * g_converter_input_stream_get_converter: * @converter_stream: a #GConverterInputStream |