summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Winship <danw@gnome.org>2010-09-18 13:05:25 -0400
committerDan Winship <danw@gnome.org>2010-11-26 15:08:08 -0500
commitc20c2c0abd3bdb1b30b85a586ee6095ed75a7bc2 (patch)
tree77b0d54c202be5c4a068d374323f98750c9dca9a
parent6181c7de36771d4d3bb55785912a934e078b16df (diff)
downloadglib-c20c2c0abd3bdb1b30b85a586ee6095ed75a7bc2.tar.gz
Add pollable input/output streams
When interfacing with APIs that expect unix-style async I/O, it is useful to be able to tell in advance whether a read/write is going to block. This adds new interfaces GPollableInputStream and GPollableOutputStream that can be implemented by a GInputStream or GOutputStream to add _is_readable/_is_writable, _create_source, and _read_nonblocking/_write_nonblocking methods. Also, implement for GUnixInput/OutputStream and GSocketInput/OutputStream https://bugzilla.gnome.org/show_bug.cgi?id=634241
-rw-r--r--docs/reference/gio/gio-docs.xml2
-rw-r--r--docs/reference/gio/gio-sections.txt41
-rw-r--r--docs/reference/gio/gio.types3
-rw-r--r--gio/Makefile.am4
-rw-r--r--gio/gio.h2
-rw-r--r--gio/gio.symbols21
-rw-r--r--gio/giotypes.h18
-rw-r--r--gio/gpollableinputstream.c304
-rw-r--r--gio/gpollableinputstream.h101
-rw-r--r--gio/gpollableoutputstream.c201
-rw-r--r--gio/gpollableoutputstream.h98
-rw-r--r--gio/gsocketconnection.c3
-rw-r--r--gio/gsocketinputstream.c59
-rw-r--r--gio/gsocketoutputstream.c60
-rw-r--r--gio/gunixinputstream.c51
-rw-r--r--gio/gunixoutputstream.c50
-rw-r--r--gio/tests/.gitignore1
-rw-r--r--gio/tests/Makefile.am4
-rw-r--r--gio/tests/pollable.c240
19 files changed, 1251 insertions, 12 deletions
diff --git a/docs/reference/gio/gio-docs.xml b/docs/reference/gio/gio-docs.xml
index 85c8fabfc..f628e45b4 100644
--- a/docs/reference/gio/gio-docs.xml
+++ b/docs/reference/gio/gio-docs.xml
@@ -68,6 +68,8 @@
<xi:include href="xml/gunixoutputstream.xml"/>
<xi:include href="xml/gconverterinputstream.xml"/>
<xi:include href="xml/gconverteroutputstream.xml"/>
+ <xi:include href="xml/gpollableinputstream.xml"/>
+ <xi:include href="xml/gpollableoutputstream.xml"/>
</chapter>
<chapter id="types">
<title>File types and applications</title>
diff --git a/docs/reference/gio/gio-sections.txt b/docs/reference/gio/gio-sections.txt
index 25532adcc..68d6b28a8 100644
--- a/docs/reference/gio/gio-sections.txt
+++ b/docs/reference/gio/gio-sections.txt
@@ -2934,3 +2934,44 @@ G_IS_PERIODIC
<SUBSECTION Private>
g_periodic_get_type
</SECTION>
+
+<SECTION>
+<FILE>gpollableinputstream</FILE>
+<TITLE>GPollableInputStream</TITLE>
+GPollableInputStream
+GPollableInputStreamInterface
+<SUBSECTION>
+g_pollable_input_stream_can_poll
+g_pollable_input_stream_is_readable
+g_pollable_input_stream_create_source
+g_pollable_input_stream_read_nonblocking
+<SUBSECTION>
+GPollableSourceFunc
+g_pollable_source_new
+<SUBSECTION Standard>
+G_POLLABLE_INPUT_STREAM
+G_POLLABLE_INPUT_STREAM_GET_INTERFACE
+G_IS_POLLABLE_INPUT_STREAM
+G_TYPE_POLLABLE_INPUT_STREAM
+<SUBSECTION Private>
+g_pollable_input_stream_get_type
+</SECTION>
+
+<SECTION>
+<FILE>gpollableoutputstream</FILE>
+<TITLE>GPollableOutputStream</TITLE>
+GPollableOutputStream
+GPollableOutputStreamInterface
+<SUBSECTION>
+g_pollable_output_stream_can_poll
+g_pollable_output_stream_is_writable
+g_pollable_output_stream_create_source
+g_pollable_output_stream_write_nonblocking
+<SUBSECTION Standard>
+G_POLLABLE_OUTPUT_STREAM
+G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE
+G_IS_POLLABLE_OUTPUT_STREAM
+G_TYPE_POLLABLE_OUTPUT_STREAM
+<SUBSECTION Private>
+g_pollable_output_stream_get_type
+</SECTION>
diff --git a/docs/reference/gio/gio.types b/docs/reference/gio/gio.types
index ec01b40c9..b0472199a 100644
--- a/docs/reference/gio/gio.types
+++ b/docs/reference/gio/gio.types
@@ -76,6 +76,9 @@ g_output_stream_splice_flags_get_type
g_password_save_get_type
g_periodic_get_type
g_permission_get_type
+g_pollable_input_stream_get_type
+g_pollable_io_stream_get_type
+g_pollable_output_stream_get_type
g_proxy_address_enumerator_get_type
g_proxy_address_get_type
g_proxy_get_type
diff --git a/gio/Makefile.am b/gio/Makefile.am
index 2aedfc102..5da34b2c6 100644
--- a/gio/Makefile.am
+++ b/gio/Makefile.am
@@ -349,6 +349,8 @@ libgio_2_0_la_SOURCES = \
goutputstream.c \
gperiodic.c \
gpermission.c \
+ gpollableinputstream.c \
+ gpollableoutputstream.c \
gpollfilemonitor.c \
gpollfilemonitor.h \
gproxyresolver.c \
@@ -505,6 +507,8 @@ gio_headers = \
goutputstream.h \
gperiodic.h \
gpermission.h \
+ gpollableinputstream.h \
+ gpollableoutputstream.h \
gproxyaddress.h \
gproxy.h \
gproxyaddressenumerator.h \
diff --git a/gio/gio.h b/gio/gio.h
index c18384ff0..0babc7c10 100644
--- a/gio/gio.h
+++ b/gio/gio.h
@@ -95,6 +95,8 @@
#include <gio/goutputstream.h>
#include <gio/gperiodic.h>
#include <gio/gpermission.h>
+#include <gio/gpollableinputstream.h>
+#include <gio/gpollableoutputstream.h>
#include <gio/gproxy.h>
#include <gio/gproxyaddress.h>
#include <gio/gproxyaddressenumerator.h>
diff --git a/gio/gio.symbols b/gio/gio.symbols
index b417ebae2..867cf6385 100644
--- a/gio/gio.symbols
+++ b/gio/gio.symbols
@@ -1973,3 +1973,24 @@ g_periodic_remove
g_periodic_unblock
#endif
#endif
+
+#if IN_HEADER(__G_POLLABLE_INPUT_STREAM_H__)
+#if IN_FILE(__G_POLLABLE_INPUT_STREAM_C__)
+g_pollable_input_stream_get_type G_GNUC_CONST
+g_pollable_input_stream_can_poll
+g_pollable_input_stream_create_source
+g_pollable_input_stream_is_readable
+g_pollable_input_stream_read_nonblocking
+g_pollable_source_new
+#endif
+#endif
+
+#if IN_HEADER(__G_POLLABLE_OUTPUT_STREAM_H__)
+#if IN_FILE(__G_POLLABLE_OUTPUT_STREAM_C__)
+g_pollable_output_stream_get_type G_GNUC_CONST
+g_pollable_output_stream_can_poll
+g_pollable_output_stream_create_source
+g_pollable_output_stream_is_writable
+g_pollable_output_stream_write_nonblocking
+#endif
+#endif
diff --git a/gio/giotypes.h b/gio/giotypes.h
index 829d28534..808e84d41 100644
--- a/gio/giotypes.h
+++ b/gio/giotypes.h
@@ -124,6 +124,8 @@ typedef struct _GNetworkAddress GNetworkAddress;
typedef struct _GNetworkService GNetworkService;
typedef struct _GOutputStream GOutputStream;
typedef struct _GIOStream GIOStream;
+typedef struct _GPollableInputStream GPollableInputStream; /* Dummy typedef */
+typedef struct _GPollableOutputStream GPollableOutputStream; /* Dummy typedef */
typedef struct _GResolver GResolver;
typedef struct _GSeekable GSeekable;
typedef struct _GSimpleAsyncResult GSimpleAsyncResult;
@@ -391,6 +393,22 @@ typedef struct _GDBusNodeInfo GDBusNodeInfo;
typedef gboolean (*GCancellableSourceFunc) (GCancellable *cancellable,
gpointer user_data);
+/**
+ * GPollableSourceFunc:
+ * @pollable_stream: the #GPollableInputStream or #GPollableOutputStream
+ * @user_data: data passed in by the user.
+ *
+ * This is the function type of the callback used for the #GSource
+ * returned by g_pollable_input_stream_create_source() and
+ * g_pollable_output_stream_create_source().
+ *
+ * Returns: it should return %FALSE if the source should be removed.
+ *
+ * Since: 2.28
+ */
+typedef gboolean (*GPollableSourceFunc) (GObject *pollable_stream,
+ gpointer user_data);
+
G_END_DECLS
#endif /* __GIO_TYPES_H__ */
diff --git a/gio/gpollableinputstream.c b/gio/gpollableinputstream.c
new file mode 100644
index 000000000..19946c666
--- /dev/null
+++ b/gio/gpollableinputstream.c
@@ -0,0 +1,304 @@
+/* GIO - GLib Input, Output and Streaming Library
+ *
+ * Copyright (C) 2010 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General
+ * Public License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#include "config.h"
+
+#include <errno.h>
+
+#include "gpollableinputstream.h"
+#include "gasynchelper.h"
+#include "gio-marshal.h"
+#include "glibintl.h"
+
+/**
+ * SECTION:gpollableinputstream
+ * @short_description: Interface for pollable input streams
+ * @include: gio/gio.h
+ * @see_also: #GInputStream, #GPollableOutputStream, #GFileDescriptorBased
+ *
+ * #GPollableInputStream is implemented by #GInputStream<!-- -->s that
+ * can be polled for readiness to read. This can be used when
+ * interfacing with a non-gio API that expects
+ * unix-file-descriptor-style asynchronous I/O rather than gio-style.
+ *
+ * Since: 2.28
+ */
+
+G_DEFINE_INTERFACE (GPollableInputStream, g_pollable_input_stream, G_TYPE_INPUT_STREAM)
+
+static gboolean g_pollable_input_stream_default_can_poll (GPollableInputStream *stream);
+static gssize g_pollable_input_stream_default_read_nonblocking (GPollableInputStream *stream,
+ void *buffer,
+ gsize size,
+ GError **error);
+
+static void
+g_pollable_input_stream_default_init (GPollableInputStreamInterface *iface)
+{
+ iface->can_poll = g_pollable_input_stream_default_can_poll;
+ iface->read_nonblocking = g_pollable_input_stream_default_read_nonblocking;
+}
+
+static gboolean
+g_pollable_input_stream_default_can_poll (GPollableInputStream *stream)
+{
+ return TRUE;
+}
+
+/**
+ * g_pollable_input_stream_can_poll:
+ * @stream: a #GPollableInputStream.
+ *
+ * Checks if @stream is actually pollable. Some classes may implement
+ * #GPollableInputStream but have only certain instances of that class
+ * be pollable. If this method returns %FALSE, then the behavior of
+ * other #GPollableInputStream methods is undefined.
+ *
+ * For any given stream, the value returned by this method is constant;
+ * a stream cannot switch from pollable to non-pollable or vice versa.
+ *
+ * Returns: %TRUE if @stream is pollable, %FALSE if not.
+ *
+ * Since: 2.28
+ */
+gboolean
+g_pollable_input_stream_can_poll (GPollableInputStream *stream)
+{
+ g_return_val_if_fail (G_IS_POLLABLE_INPUT_STREAM (stream), FALSE);
+
+ return G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->can_poll (stream);
+}
+
+/**
+ * g_pollable_input_stream_is_readable:
+ * @stream: a #GPollableInputStream.
+ *
+ * Checks if @stream can be read.
+ *
+ * Note that some stream types may not be able to implement this 100%
+ * reliably, and it is possible that a call to g_input_stream_read()
+ * after this returns %TRUE would still block. To guarantee
+ * non-blocking behavior, you should always use
+ * g_pollable_input_stream_read_nonblocking(), which will return a
+ * %G_IO_ERROR_WOULD_BLOCK error rather than blocking.
+ *
+ * Returns: %TRUE if @stream is readable, %FALSE if not. If an error
+ * has occurred on @stream, this will result in
+ * g_pollable_input_stream_is_readable() returning %TRUE, and the
+ * next attempt to read will return the error.
+ *
+ * Since: 2.28
+ */
+gboolean
+g_pollable_input_stream_is_readable (GPollableInputStream *stream)
+{
+ g_return_val_if_fail (G_IS_POLLABLE_INPUT_STREAM (stream), FALSE);
+
+ return G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->is_readable (stream);
+}
+
+/**
+ * g_pollable_input_stream_create_source:
+ * @stream: a #GPollableInputStream.
+ * @cancellable: a #GCancellable, or %NULL
+ *
+ * Creates a #GSource that triggers when @stream can be read, or
+ * @cancellable is triggered or an error occurs. The callback on the
+ * source is of the #GPollableSourceFunc type.
+ *
+ * As with g_pollable_input_stream_is_readable(), it is possible that
+ * the stream may not actually be readable even after the source
+ * triggers, so you should use
+ * g_pollable_input_stream_read_nonblocking() rather than
+ * g_input_stream_read() from the callback.
+ *
+ * Returns: a new #GSource
+ *
+ * Since: 2.28
+ */
+GSource *
+g_pollable_input_stream_create_source (GPollableInputStream *stream,
+ GCancellable *cancellable)
+{
+ g_return_val_if_fail (G_IS_POLLABLE_INPUT_STREAM (stream), NULL);
+
+ return G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
+ create_source (stream, cancellable);
+}
+
+static gssize
+g_pollable_input_stream_default_read_nonblocking (GPollableInputStream *stream,
+ void *buffer,
+ gsize size,
+ GError **error)
+{
+ if (!g_pollable_input_stream_is_readable (stream))
+ {
+ g_set_error (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
+ g_strerror (EAGAIN));
+ return -1;
+ }
+
+ return g_input_stream_read (G_INPUT_STREAM (stream), buffer, size,
+ NULL, error);
+}
+
+/**
+ * g_pollable_input_stream_read_nonblocking:
+ * @stream: a #GPollableInputStream
+ * @buffer: a buffer to read data into (which should be at least @size
+ * bytes long).
+ * @size: the number of bytes you want to read
+ * @cancellable: a #GCancellable, or %NULL
+ * @error: #GError for error reporting, or %NULL to ignore.
+ *
+ * Attempts to read up to @size bytes from @stream into @buffer, as
+ * with g_input_stream_read(). If @stream is not currently readable,
+ * this will immediately return %G_IO_ERROR_WOULD_BLOCK, and you can
+ * use g_pollable_input_stream_create_source() to create a #GSource
+ * that will be triggered when @stream is readable.
+ *
+ * Note that since this method never blocks, you cannot actually
+ * use @cancellable to cancel it. However, it will return an error
+ * if @cancellable has already been cancelled when you call, which
+ * may happen if you call this method after a source triggers due
+ * to having been cancelled.
+ *
+ * Return value: the number of bytes read, or -1 on error (including
+ * %G_IO_ERROR_WOULD_BLOCK).
+ */
+gssize
+g_pollable_input_stream_read_nonblocking (GPollableInputStream *stream,
+ void *buffer,
+ gsize size,
+ GCancellable *cancellable,
+ GError **error)
+{
+ g_return_val_if_fail (G_IS_POLLABLE_INPUT_STREAM (stream), -1);
+
+ if (g_cancellable_set_error_if_cancelled (cancellable, error))
+ return -1;
+
+ return G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
+ read_nonblocking (stream, buffer, size, error);
+}
+
+/* GPollableSource */
+
+typedef struct {
+ GSource source;
+
+ GObject *stream;
+} GPollableSource;
+
+static gboolean
+pollable_source_prepare (GSource *source,
+ gint *timeout)
+{
+ *timeout = -1;
+ return FALSE;
+}
+
+static gboolean
+pollable_source_check (GSource *source)
+{
+ return FALSE;
+}
+
+static gboolean
+pollable_source_dispatch (GSource *source,
+ GSourceFunc callback,
+ gpointer user_data)
+{
+ GPollableSourceFunc func = (GPollableSourceFunc)callback;
+ GPollableSource *pollable_source = (GPollableSource *)source;
+
+ return (*func) (pollable_source->stream, user_data);
+}
+
+static void
+pollable_source_finalize (GSource *source)
+{
+ GPollableSource *pollable_source = (GPollableSource *)source;
+
+ g_object_unref (pollable_source->stream);
+}
+
+static gboolean
+pollable_source_closure_callback (GObject *stream,
+ gpointer data)
+{
+ GClosure *closure = data;
+
+ GValue param = { 0, };
+ GValue result_value = { 0, };
+ gboolean result;
+
+ g_value_init (&result_value, G_TYPE_BOOLEAN);
+
+ g_value_init (&param, G_TYPE_OBJECT);
+ g_value_set_object (&param, stream);
+
+ g_closure_invoke (closure, &result_value, 1, &param, NULL);
+
+ result = g_value_get_boolean (&result_value);
+ g_value_unset (&result_value);
+ g_value_unset (&param);
+
+ return result;
+}
+
+static GSourceFuncs pollable_source_funcs =
+{
+ pollable_source_prepare,
+ pollable_source_check,
+ pollable_source_dispatch,
+ pollable_source_finalize,
+ (GSourceFunc)pollable_source_closure_callback,
+ (GSourceDummyMarshal)_gio_marshal_BOOLEAN__VOID,
+};
+
+/**
+ * g_pollable_source_new:
+ * @pollable_stream: the stream associated with the new source
+ *
+ * Utility method for #GPollableInputStream and #GPollableOutputStream
+ * implementations. Creates a new #GSource that expects a callback of
+ * type #GPollableSourceFunc. The new source does not actually do
+ * anything on its own; use g_source_add_child_source() to add other
+ * sources to it to cause it to trigger.
+ *
+ * Return value: the new #GSource.
+ *
+ * Since: 2.28
+ */
+GSource *
+g_pollable_source_new (GObject *pollable_stream)
+{
+ GSource *source;
+ GPollableSource *pollable_source;
+
+ source = g_source_new (&pollable_source_funcs, sizeof (GPollableSource));
+ g_source_set_name (source, "GPollableSource");
+ pollable_source = (GPollableSource *)source;
+ pollable_source->stream = g_object_ref (pollable_stream);
+
+ return source;
+}
diff --git a/gio/gpollableinputstream.h b/gio/gpollableinputstream.h
new file mode 100644
index 000000000..5def93ba8
--- /dev/null
+++ b/gio/gpollableinputstream.h
@@ -0,0 +1,101 @@
+/* GIO - GLib Input, Output and Streaming Library
+ *
+ * Copyright (C) 2010 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General
+ * Public License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __G_POLLABLE_INPUT_STREAM_H__
+#define __G_POLLABLE_INPUT_STREAM_H__
+
+#include <gio/gio.h>
+
+G_BEGIN_DECLS
+
+#define G_TYPE_POLLABLE_INPUT_STREAM (g_pollable_input_stream_get_type ())
+#define G_POLLABLE_INPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), G_TYPE_POLLABLE_INPUT_STREAM, GPollableInputStream))
+#define G_IS_POLLABLE_INPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), G_TYPE_POLLABLE_INPUT_STREAM))
+#define G_POLLABLE_INPUT_STREAM_GET_INTERFACE(obj) (G_TYPE_INSTANCE_GET_INTERFACE ((obj), G_TYPE_POLLABLE_INPUT_STREAM, GPollableInputStreamInterface))
+
+/**
+ * GPollableInputStream:
+ *
+ * An interface for a #GInputStream that can be polled for readability.
+ *
+ * Since: 2.28
+ */
+typedef struct _GPollableInputStreamInterface GPollableInputStreamInterface;
+
+/**
+ * GPollableInputStreamInterface:
+ * @g_iface: The parent interface.
+ * @can_poll: Checks if the #GPollableInputStream instance is actually pollable
+ * @is_readable: Checks if the stream is readable
+ * @create_source: Creates a #GSource to poll the stream
+ * @read_nonblocking: Does a non-blocking read or returns
+ * %G_IO_ERROR_WOULD_BLOCK
+ *
+ * The interface for pollable input streams.
+ *
+ * The default implementation of @can_poll always returns %TRUE.
+ *
+ * The default implementation of @read_nonblocking calls
+ * g_pollable_input_stream_is_readable(), and then calls
+ * g_input_stream_read() if it returns %TRUE. This means you only need
+ * to override it if it is possible that your @is_readable
+ * implementation may return %TRUE when the stream is not actually
+ * readable.
+ *
+ * Since: 2.28
+ */
+struct _GPollableInputStreamInterface
+{
+ GTypeInterface g_iface;
+
+ /* Virtual Table */
+ gboolean (*can_poll) (GPollableInputStream *stream);
+
+ gboolean (*is_readable) (GPollableInputStream *stream);
+ GSource * (*create_source) (GPollableInputStream *stream,
+ GCancellable *cancellable);
+ gssize (*read_nonblocking) (GPollableInputStream *stream,
+ void *buffer,
+ gsize size,
+ GError **error);
+};
+
+GType g_pollable_input_stream_get_type (void) G_GNUC_CONST;
+
+gboolean g_pollable_input_stream_can_poll (GPollableInputStream *stream);
+
+gboolean g_pollable_input_stream_is_readable (GPollableInputStream *stream);
+GSource *g_pollable_input_stream_create_source (GPollableInputStream *stream,
+ GCancellable *cancellable);
+
+gssize g_pollable_input_stream_read_nonblocking (GPollableInputStream *stream,
+ void *buffer,
+ gsize size,
+ GCancellable *cancellable,
+ GError **error);
+
+/* Helper method for stream implementations */
+GSource *g_pollable_source_new (GObject *stream);
+
+G_END_DECLS
+
+
+#endif /* __G_POLLABLE_INPUT_STREAM_H__ */
+
diff --git a/gio/gpollableoutputstream.c b/gio/gpollableoutputstream.c
new file mode 100644
index 000000000..cb9138aad
--- /dev/null
+++ b/gio/gpollableoutputstream.c
@@ -0,0 +1,201 @@
+/* GIO - GLib Input, Output and Streaming Library
+ *
+ * Copyright (C) 2010 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General
+ * Public License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#include "config.h"
+
+#include <errno.h>
+
+#include "gpollableoutputstream.h"
+#include "gasynchelper.h"
+#include "gfiledescriptorbased.h"
+#include "gio-marshal.h"
+#include "glibintl.h"
+
+/**
+ * SECTION:gpollableoutputstream
+ * @short_description: Interface for pollable output streams
+ * @include: gio/gio.h
+ * @see_also: #GOutputStream, #GFileDescriptorBased, #GPollableInputStream
+ *
+ * #GPollableOutputStream is implemented by #GOutputStream<!-- -->s that
+ * can be polled for readiness to write. This can be used when
+ * interfacing with a non-gio API that expects
+ * unix-file-descriptor-style asynchronous I/O rather than gio-style.
+ *
+ * Since: 2.28
+ */
+
+G_DEFINE_INTERFACE (GPollableOutputStream, g_pollable_output_stream, G_TYPE_OUTPUT_STREAM)
+
+static gboolean g_pollable_output_stream_default_can_poll (GPollableOutputStream *stream);
+static gssize g_pollable_output_stream_default_write_nonblocking (GPollableOutputStream *stream,
+ const void *buffer,
+ gsize size,
+ GError **error);
+
+static void
+g_pollable_output_stream_default_init (GPollableOutputStreamInterface *iface)
+{
+ iface->can_poll = g_pollable_output_stream_default_can_poll;
+ iface->write_nonblocking = g_pollable_output_stream_default_write_nonblocking;
+}
+
+static gboolean
+g_pollable_output_stream_default_can_poll (GPollableOutputStream *stream)
+{
+ return TRUE;
+}
+
+/**
+ * g_pollable_output_stream_can_poll:
+ * @stream: a #GPollableOutputStream.
+ *
+ * Checks if @stream is actually pollable. Some classes may implement
+ * #GPollableOutputStream but have only certain instances of that
+ * class be pollable. If this method returns %FALSE, then the behavior
+ * of other #GPollableOutputStream methods is undefined.
+ *
+ * For any given stream, the value returned by this method is constant;
+ * a stream cannot switch from pollable to non-pollable or vice versa.
+ *
+ * Returns: %TRUE if @stream is pollable, %FALSE if not.
+ *
+ * Since: 2.28
+ */
+gboolean
+g_pollable_output_stream_can_poll (GPollableOutputStream *stream)
+{
+ g_return_val_if_fail (G_IS_POLLABLE_OUTPUT_STREAM (stream), FALSE);
+
+ return G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->can_poll (stream);
+}
+
+/**
+ * g_pollable_output_stream_is_writable:
+ * @stream: a #GPollableOutputStream.
+ *
+ * Checks if @stream can be written.
+ *
+ * Note that some stream types may not be able to implement this 100%
+ * reliably, and it is possible that a call to g_output_stream_write()
+ * after this returns %TRUE would still block. To guarantee
+ * non-blocking behavior, you should always use
+ * g_pollable_output_stream_write_nonblocking(), which will return a
+ * %G_IO_ERROR_WOULD_BLOCK error rather than blocking.
+ *
+ * Returns: %TRUE if @stream is writable, %FALSE if not. If an error
+ * has occurred on @stream, this will result in
+ * g_pollable_output_stream_is_writable() returning %TRUE, and the
+ * next attempt to write will return the error.
+ *
+ * Since: 2.28
+ */
+gboolean
+g_pollable_output_stream_is_writable (GPollableOutputStream *stream)
+{
+ g_return_val_if_fail (G_IS_POLLABLE_OUTPUT_STREAM (stream), FALSE);
+
+ return G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->is_writable (stream);
+}
+
+/**
+ * g_pollable_output_stream_create_source:
+ * @stream: a #GPollableOutputStream.
+ * @cancellable: a #GCancellable, or %NULL
+ *
+ * Creates a #GSource that triggers when @stream can be written, or
+ * @cancellable is triggered or an error occurs. The callback on the
+ * source is of the #GPollableSourceFunc type.
+ *
+ * As with g_pollable_output_stream_is_writable(), it is possible that
+ * the stream may not actually be writable even after the source
+ * triggers, so you should use
+ * g_pollable_output_stream_write_nonblocking() rather than
+ * g_output_stream_write() from the callback.
+ *
+ * Returns: a new #GSource
+ *
+ * Since: 2.28
+ */
+GSource *
+g_pollable_output_stream_create_source (GPollableOutputStream *stream,
+ GCancellable *cancellable)
+{
+ g_return_val_if_fail (G_IS_POLLABLE_OUTPUT_STREAM (stream), NULL);
+
+ return G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
+ create_source (stream, cancellable);
+}
+
+static gssize
+g_pollable_output_stream_default_write_nonblocking (GPollableOutputStream *stream,
+ const void *buffer,
+ gsize size,
+ GError **error)
+{
+ if (!g_pollable_output_stream_is_writable (stream))
+ {
+ g_set_error (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
+ g_strerror (EAGAIN));
+ return -1;
+ }
+
+ return g_output_stream_write (G_OUTPUT_STREAM (stream), buffer, size,
+ NULL, error);
+}
+
+/**
+ * g_pollable_output_stream_write_nonblocking:
+ * @stream: a #GPollableOutputStream
+ * @buffer: a buffer to write data from
+ * @size: the number of bytes you want to write
+ * @cancellable: a #GCancellable, or %NULL
+ * @error: #GError for error reporting, or %NULL to ignore.
+ *
+ * Attempts to write up to @size bytes from @buffer to @stream, as
+ * with g_output_stream_write(). If @stream is not currently writable,
+ * this will immediately return %G_IO_ERROR_WOULD_BLOCK, and you can
+ * use g_pollable_output_stream_create_source() to create a #GSource
+ * that will be triggered when @stream is writable.
+ *
+ * Note that since this method never blocks, you cannot actually
+ * use @cancellable to cancel it. However, it will return an error
+ * if @cancellable has already been cancelled when you call, which
+ * may happen if you call this method after a source triggers due
+ * to having been cancelled.
+ *
+ * Return value: the number of bytes written, or -1 on error (including
+ * %G_IO_ERROR_WOULD_BLOCK).
+ */
+gssize
+g_pollable_output_stream_write_nonblocking (GPollableOutputStream *stream,
+ const void *buffer,
+ gsize size,
+ GCancellable *cancellable,
+ GError **error)
+{
+ g_return_val_if_fail (G_IS_POLLABLE_OUTPUT_STREAM (stream), -1);
+
+ if (g_cancellable_set_error_if_cancelled (cancellable, error))
+ return -1;
+
+ return G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
+ write_nonblocking (stream, buffer, size, error);
+}
diff --git a/gio/gpollableoutputstream.h b/gio/gpollableoutputstream.h
new file mode 100644
index 000000000..abef0ede6
--- /dev/null
+++ b/gio/gpollableoutputstream.h
@@ -0,0 +1,98 @@
+/* GIO - GLib Input, Output and Streaming Library
+ *
+ * Copyright (C) 2010 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General
+ * Public License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __G_POLLABLE_OUTPUT_STREAM_H__
+#define __G_POLLABLE_OUTPUT_STREAM_H__
+
+#include <gio/gio.h>
+
+G_BEGIN_DECLS
+
+#define G_TYPE_POLLABLE_OUTPUT_STREAM (g_pollable_output_stream_get_type ())
+#define G_POLLABLE_OUTPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), G_TYPE_POLLABLE_OUTPUT_STREAM, GPollableOutputStream))
+#define G_IS_POLLABLE_OUTPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), G_TYPE_POLLABLE_OUTPUT_STREAM))
+#define G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE(obj) (G_TYPE_INSTANCE_GET_INTERFACE ((obj), G_TYPE_POLLABLE_OUTPUT_STREAM, GPollableOutputStreamInterface))
+
+/**
+ * GPollableOutputStream:
+ *
+ * An interface for a #GOutputStream that can be polled for readability.
+ *
+ * Since: 2.28
+ */
+typedef struct _GPollableOutputStreamInterface GPollableOutputStreamInterface;
+
+/**
+ * GPollableOutputStreamInterface:
+ * @g_iface: The parent interface.
+ * @can_poll: Checks if the #GPollableOutputStream instance is actually pollable
+ * @is_writable: Checks if the stream is writable
+ * @create_source: Creates a #GSource to poll the stream
+ * @write_nonblocking: Does a non-blocking write or returns
+ * %G_IO_ERROR_WOULD_BLOCK
+ *
+ * The interface for pollable output streams.
+ *
+ * The default implementation of @can_poll always returns %TRUE.
+ *
+ * The default implementation of @write_nonblocking calls
+ * g_pollable_output_stream_is_writable(), and then calls
+ * g_output_stream_write() if it returns %TRUE. This means you only
+ * need to override it if it is possible that your @is_writable
+ * implementation may return %TRUE when the stream is not actually
+ * writable.
+ *
+ * Since: 2.28
+ */
+struct _GPollableOutputStreamInterface
+{
+ GTypeInterface g_iface;
+
+ /* Virtual Table */
+ gboolean (*can_poll) (GPollableOutputStream *stream);
+
+ gboolean (*is_writable) (GPollableOutputStream *stream);
+ GSource * (*create_source) (GPollableOutputStream *stream,
+ GCancellable *cancellable);
+ gssize (*write_nonblocking) (GPollableOutputStream *stream,
+ const void *buffer,
+ gsize size,
+ GError **error);
+};
+
+GType g_pollable_output_stream_get_type (void) G_GNUC_CONST;
+
+gboolean g_pollable_output_stream_can_poll (GPollableOutputStream *stream);
+
+gboolean g_pollable_output_stream_is_writable (GPollableOutputStream *stream);
+GSource *g_pollable_output_stream_create_source (GPollableOutputStream *stream,
+ GCancellable *cancellable);
+
+gssize g_pollable_output_stream_write_nonblocking (GPollableOutputStream *stream,
+ const void *buffer,
+ gsize size,
+ GCancellable *cancellable,
+ GError **error);
+
+G_END_DECLS
+
+
+#endif /* __G_POLLABLE_OUTPUT_STREAM_H__ */
+
diff --git a/gio/gsocketconnection.c b/gio/gsocketconnection.c
index 495f81c36..852805af3 100644
--- a/gio/gsocketconnection.c
+++ b/gio/gsocketconnection.c
@@ -60,8 +60,7 @@
* Since: 2.22
*/
-G_DEFINE_TYPE (GSocketConnection,
- g_socket_connection, G_TYPE_IO_STREAM);
+G_DEFINE_TYPE (GSocketConnection, g_socket_connection, G_TYPE_IO_STREAM);
enum
{
diff --git a/gio/gsocketinputstream.c b/gio/gsocketinputstream.c
index 4a27d9034..fb9da1bf1 100644
--- a/gio/gsocketinputstream.c
+++ b/gio/gsocketinputstream.c
@@ -27,13 +27,18 @@
#include "gsocketinputstream.h"
#include "glibintl.h"
-#include <gio/gsimpleasyncresult.h>
-#include <gio/gcancellable.h>
-#include <gio/gioerror.h>
+#include "gsimpleasyncresult.h"
+#include "gcancellable.h"
+#include "gpollableinputstream.h"
+#include "gioerror.h"
+static void g_socket_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
+
#define g_socket_input_stream_get_type _g_socket_input_stream_get_type
-G_DEFINE_TYPE (GSocketInputStream, g_socket_input_stream, G_TYPE_INPUT_STREAM);
+G_DEFINE_TYPE_WITH_CODE (GSocketInputStream, g_socket_input_stream, G_TYPE_INPUT_STREAM,
+ G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM, g_socket_input_stream_pollable_iface_init)
+ )
enum
{
@@ -205,6 +210,44 @@ g_socket_input_stream_read_finish (GInputStream *stream,
return count;
}
+static gboolean
+g_socket_input_stream_pollable_is_readable (GPollableInputStream *pollable)
+{
+ GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (pollable);
+
+ return g_socket_condition_check (input_stream->priv->socket, G_IO_IN);
+}
+
+static GSource *
+g_socket_input_stream_pollable_create_source (GPollableInputStream *pollable,
+ GCancellable *cancellable)
+{
+ GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (pollable);
+ GSource *socket_source, *pollable_source;
+
+ pollable_source = g_pollable_source_new (G_OBJECT (input_stream));
+ socket_source = g_socket_create_source (input_stream->priv->socket,
+ G_IO_IN, cancellable);
+ g_source_set_dummy_callback (socket_source);
+ g_source_add_child_source (pollable_source, socket_source);
+ g_source_unref (socket_source);
+
+ return pollable_source;
+}
+
+static gssize
+g_socket_input_stream_pollable_read_nonblocking (GPollableInputStream *pollable,
+ void *buffer,
+ gsize size,
+ GError **error)
+{
+ GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (pollable);
+
+ return g_socket_receive_with_blocking (input_stream->priv->socket,
+ buffer, size, FALSE,
+ NULL, error);
+}
+
static void
g_socket_input_stream_class_init (GSocketInputStreamClass *klass)
{
@@ -230,6 +273,14 @@ g_socket_input_stream_class_init (GSocketInputStreamClass *klass)
}
static void
+g_socket_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
+{
+ iface->is_readable = g_socket_input_stream_pollable_is_readable;
+ iface->create_source = g_socket_input_stream_pollable_create_source;
+ iface->read_nonblocking = g_socket_input_stream_pollable_read_nonblocking;
+}
+
+static void
g_socket_input_stream_init (GSocketInputStream *stream)
{
stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream, G_TYPE_SOCKET_INPUT_STREAM, GSocketInputStreamPrivate);
diff --git a/gio/gsocketoutputstream.c b/gio/gsocketoutputstream.c
index 4febf8803..0ef336016 100644
--- a/gio/gsocketoutputstream.c
+++ b/gio/gsocketoutputstream.c
@@ -28,14 +28,20 @@
#include "gsocketoutputstream.h"
#include "gsocket.h"
-#include <gio/gsimpleasyncresult.h>
-#include <gio/gcancellable.h>
-#include <gio/gioerror.h>
+#include "gsimpleasyncresult.h"
+#include "gcancellable.h"
+#include "gpollableinputstream.h"
+#include "gpollableoutputstream.h"
+#include "gioerror.h"
#include "glibintl.h"
+static void g_socket_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface);
+
#define g_socket_output_stream_get_type _g_socket_output_stream_get_type
-G_DEFINE_TYPE (GSocketOutputStream, g_socket_output_stream, G_TYPE_OUTPUT_STREAM);
+G_DEFINE_TYPE_WITH_CODE (GSocketOutputStream, g_socket_output_stream, G_TYPE_OUTPUT_STREAM,
+ G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM, g_socket_output_stream_pollable_iface_init)
+ )
enum
{
@@ -207,6 +213,44 @@ g_socket_output_stream_write_finish (GOutputStream *stream,
return count;
}
+static gboolean
+g_socket_output_stream_pollable_is_writable (GPollableOutputStream *pollable)
+{
+ GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (pollable);
+
+ return g_socket_condition_check (output_stream->priv->socket, G_IO_OUT);
+}
+
+static GSource *
+g_socket_output_stream_pollable_create_source (GPollableOutputStream *pollable,
+ GCancellable *cancellable)
+{
+ GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (pollable);
+ GSource *socket_source, *pollable_source;
+
+ pollable_source = g_pollable_source_new (G_OBJECT (output_stream));
+ socket_source = g_socket_create_source (output_stream->priv->socket,
+ G_IO_OUT, cancellable);
+ g_source_set_dummy_callback (socket_source);
+ g_source_add_child_source (pollable_source, socket_source);
+ g_source_unref (socket_source);
+
+ return pollable_source;
+}
+
+static gssize
+g_socket_output_stream_pollable_write_nonblocking (GPollableOutputStream *pollable,
+ const void *buffer,
+ gsize size,
+ GError **error)
+{
+ GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (pollable);
+
+ return g_socket_send_with_blocking (output_stream->priv->socket,
+ buffer, size, FALSE,
+ NULL, error);
+}
+
static void
g_socket_output_stream_class_init (GSocketOutputStreamClass *klass)
{
@@ -232,6 +276,14 @@ g_socket_output_stream_class_init (GSocketOutputStreamClass *klass)
}
static void
+g_socket_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface)
+{
+ iface->is_writable = g_socket_output_stream_pollable_is_writable;
+ iface->create_source = g_socket_output_stream_pollable_create_source;
+ iface->write_nonblocking = g_socket_output_stream_pollable_write_nonblocking;
+}
+
+static void
g_socket_output_stream_init (GSocketOutputStream *stream)
{
stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream, G_TYPE_SOCKET_OUTPUT_STREAM, GSocketOutputStreamPrivate);
diff --git a/gio/gunixinputstream.c b/gio/gunixinputstream.c
index e1ee34ac1..43a924729 100644
--- a/gio/gunixinputstream.c
+++ b/gio/gunixinputstream.c
@@ -60,7 +60,12 @@ enum {
PROP_CLOSE_FD
};
-G_DEFINE_TYPE (GUnixInputStream, g_unix_input_stream, G_TYPE_INPUT_STREAM);
+static void g_unix_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
+
+G_DEFINE_TYPE_WITH_CODE (GUnixInputStream, g_unix_input_stream, G_TYPE_INPUT_STREAM,
+ G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
+ g_unix_input_stream_pollable_iface_init)
+ );
struct _GUnixInputStreamPrivate {
int fd;
@@ -111,6 +116,9 @@ static gboolean g_unix_input_stream_close_finish (GInputStream *stream,
GAsyncResult *result,
GError **error);
+static gboolean g_unix_input_stream_pollable_is_readable (GPollableInputStream *stream);
+static GSource *g_unix_input_stream_pollable_create_source (GPollableInputStream *stream,
+ GCancellable *cancellable);
static void
g_unix_input_stream_finalize (GObject *object)
@@ -175,6 +183,13 @@ g_unix_input_stream_class_init (GUnixInputStreamClass *klass)
}
static void
+g_unix_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
+{
+ iface->is_readable = g_unix_input_stream_pollable_is_readable;
+ iface->create_source = g_unix_input_stream_pollable_create_source;
+}
+
+static void
g_unix_input_stream_set_property (GObject *object,
guint prop_id,
const GValue *value,
@@ -637,3 +652,37 @@ g_unix_input_stream_close_finish (GInputStream *stream,
/* Failures handled in generic close_finish code */
return TRUE;
}
+
+static gboolean
+g_unix_input_stream_pollable_is_readable (GPollableInputStream *stream)
+{
+ GUnixInputStream *unix_stream = G_UNIX_INPUT_STREAM (stream);
+ GPollFD poll_fd;
+ gint result;
+
+ poll_fd.fd = unix_stream->priv->fd;
+ poll_fd.events = G_IO_IN;
+
+ do
+ result = g_poll (&poll_fd, 1, 0);
+ while (result == -1 && errno == EINTR);
+
+ return poll_fd.revents != 0;
+}
+
+static GSource *
+g_unix_input_stream_pollable_create_source (GPollableInputStream *stream,
+ GCancellable *cancellable)
+{
+ GUnixInputStream *unix_stream = G_UNIX_INPUT_STREAM (stream);
+ GSource *inner_source, *pollable_source;
+
+ pollable_source = g_pollable_source_new (G_OBJECT (stream));
+
+ inner_source = _g_fd_source_new (unix_stream->priv->fd, G_IO_IN, cancellable);
+ g_source_set_dummy_callback (inner_source);
+ g_source_add_child_source (pollable_source, inner_source);
+ g_source_unref (inner_source);
+
+ return pollable_source;
+}
diff --git a/gio/gunixoutputstream.c b/gio/gunixoutputstream.c
index a0acc318d..81021be8e 100644
--- a/gio/gunixoutputstream.c
+++ b/gio/gunixoutputstream.c
@@ -60,8 +60,12 @@ enum {
PROP_CLOSE_FD
};
-G_DEFINE_TYPE (GUnixOutputStream, g_unix_output_stream, G_TYPE_OUTPUT_STREAM);
+static void g_unix_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface);
+G_DEFINE_TYPE_WITH_CODE (GUnixOutputStream, g_unix_output_stream, G_TYPE_OUTPUT_STREAM,
+ G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM,
+ g_unix_output_stream_pollable_iface_init)
+ );
struct _GUnixOutputStreamPrivate {
int fd;
@@ -103,6 +107,9 @@ static gboolean g_unix_output_stream_close_finish (GOutputStream *stream,
GAsyncResult *result,
GError **error);
+static gboolean g_unix_output_stream_pollable_is_writable (GPollableOutputStream *stream);
+static GSource *g_unix_output_stream_pollable_create_source (GPollableOutputStream *stream,
+ GCancellable *cancellable);
static void
g_unix_output_stream_finalize (GObject *object)
@@ -161,6 +168,13 @@ g_unix_output_stream_class_init (GUnixOutputStreamClass *klass)
}
static void
+g_unix_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface)
+{
+ iface->is_writable = g_unix_output_stream_pollable_is_writable;
+ iface->create_source = g_unix_output_stream_pollable_create_source;
+}
+
+static void
g_unix_output_stream_set_property (GObject *object,
guint prop_id,
const GValue *value,
@@ -593,3 +607,37 @@ g_unix_output_stream_close_finish (GOutputStream *stream,
/* Failures handled in generic close_finish code */
return TRUE;
}
+
+static gboolean
+g_unix_output_stream_pollable_is_writable (GPollableOutputStream *stream)
+{
+ GUnixOutputStream *unix_stream = G_UNIX_OUTPUT_STREAM (stream);
+ GPollFD poll_fd;
+ gint result;
+
+ poll_fd.fd = unix_stream->priv->fd;
+ poll_fd.events = G_IO_OUT;
+
+ do
+ result = g_poll (&poll_fd, 1, 0);
+ while (result == -1 && errno == EINTR);
+
+ return poll_fd.revents != 0;
+}
+
+static GSource *
+g_unix_output_stream_pollable_create_source (GPollableOutputStream *stream,
+ GCancellable *cancellable)
+{
+ GUnixOutputStream *unix_stream = G_UNIX_OUTPUT_STREAM (stream);
+ GSource *inner_source, *pollable_source;
+
+ pollable_source = g_pollable_source_new (G_OBJECT (stream));
+
+ inner_source = _g_fd_source_new (unix_stream->priv->fd, G_IO_OUT, cancellable);
+ g_source_set_dummy_callback (inner_source);
+ g_source_add_child_source (pollable_source, inner_source);
+ g_source_unref (inner_source);
+
+ return pollable_source;
+}
diff --git a/gio/tests/.gitignore b/gio/tests/.gitignore
index c987b6471..0663fefcd 100644
--- a/gio/tests/.gitignore
+++ b/gio/tests/.gitignore
@@ -60,6 +60,7 @@ memory-input-stream
memory-output-stream
network-address
org.gtk.test.enums.xml
+pollable
proxy
readwrite
resolver
diff --git a/gio/tests/Makefile.am b/gio/tests/Makefile.am
index 5e65c08e2..ad2273a42 100644
--- a/gio/tests/Makefile.am
+++ b/gio/tests/Makefile.am
@@ -43,6 +43,7 @@ TEST_PROGS += \
network-address \
gdbus-message \
socket \
+ pollable \
$(NULL)
if OS_UNIX
@@ -204,6 +205,9 @@ network_address_LDADD = $(progs_ldadd)
socket_SOURCE = socket.c
socket_LDADD = $(progs_ldadd)
+pollable_SOURCE = pollable.c
+pollable_LDADD = $(progs_ldadd)
+
contexts_SOURCES = contexts.c
contexts_LDADD = $(progs_ldadd) \
$(top_builddir)/gthread/libgthread-2.0.la
diff --git a/gio/tests/pollable.c b/gio/tests/pollable.c
new file mode 100644
index 000000000..8669e2b1a
--- /dev/null
+++ b/gio/tests/pollable.c
@@ -0,0 +1,240 @@
+/* GLib testing framework examples and tests
+ *
+ * Copyright (C) 2010 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General
+ * Public License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#include <gio/gio.h>
+
+#ifdef G_OS_UNIX
+#include <gio/gunixinputstream.h>
+#include <gio/gunixoutputstream.h>
+#endif
+
+GMainLoop *loop;
+GPollableInputStream *in;
+GOutputStream *out;
+
+static gboolean
+poll_source_callback (GPollableInputStream *in,
+ gpointer user_data)
+{
+ GError *error = NULL;
+ char buf[2];
+ gssize nread;
+ gboolean *success = user_data;
+
+ nread = g_pollable_input_stream_read_nonblocking (in, buf, 2, NULL, &error);
+ g_assert_no_error (error);
+ g_assert_cmpint (nread, ==, 2);
+ g_assert_cmpstr (buf, ==, "x");
+
+ *success = TRUE;
+ return FALSE;
+}
+
+static gboolean
+check_source_readability_callback (gpointer user_data)
+{
+ gboolean expected = GPOINTER_TO_INT (user_data);
+ gboolean readable;
+
+ readable = g_pollable_input_stream_is_readable (in);
+ g_assert_cmpint (readable, ==, expected);
+ return FALSE;
+}
+
+static gboolean
+write_callback (gpointer user_data)
+{
+ char *buf = "x";
+ gssize nwrote;
+ GError *error = NULL;
+
+ nwrote = g_output_stream_write (out, buf, 2, NULL, &error);
+ g_assert_no_error (error);
+ g_assert_cmpint (nwrote, ==, 2);
+
+ check_source_readability_callback (GINT_TO_POINTER (TRUE));
+
+ return FALSE;
+}
+
+static gboolean
+check_source_and_quit_callback (gpointer user_data)
+{
+ check_source_readability_callback (user_data);
+ g_main_loop_quit (loop);
+ return FALSE;
+}
+
+static void
+test_streams (void)
+{
+ gboolean readable;
+ GError *error = NULL;
+ char buf[1];
+ gssize nread;
+ GSource *poll_source;
+ gboolean success = FALSE;
+
+ readable = g_pollable_input_stream_is_readable (in);
+ g_assert (!readable);
+
+ nread = g_pollable_input_stream_read_nonblocking (in, buf, 1, NULL, &error);
+ g_assert_cmpint (nread, ==, -1);
+ g_assert_error (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
+ g_clear_error (&error);
+
+ /* Create 4 sources, in decreasing order of priority:
+ * 1. poll source on @in
+ * 2. idle source that checks if @in is readable once
+ * (it won't be) and then removes itself
+ * 3. idle source that writes a byte to @out, checks that
+ * @in is now readable, and removes itself
+ * 4. idle source that checks if @in is readable once
+ * (it won't be, since the poll source will fire before
+ * this one does) and then quits the loop.
+ *
+ * If the poll source triggers before it should, then it will get a
+ * %G_IO_ERROR_WOULD_BLOCK, and if check() fails in either
+ * direction, we will catch it at some point.
+ */
+
+ poll_source = g_pollable_input_stream_create_source (in, NULL);
+ g_source_set_priority (poll_source, 1);
+ g_source_set_callback (poll_source, (GSourceFunc) poll_source_callback, &success, NULL);
+ g_source_attach (poll_source, NULL);
+ g_source_unref (poll_source);
+
+ g_idle_add_full (2, check_source_readability_callback, GINT_TO_POINTER (FALSE), NULL);
+ g_idle_add_full (3, write_callback, NULL, NULL);
+ g_idle_add_full (4, check_source_and_quit_callback, GINT_TO_POINTER (FALSE), NULL);
+
+ loop = g_main_loop_new (NULL, FALSE);
+ g_main_loop_run (loop);
+ g_main_loop_unref (loop);
+
+ g_assert_cmpint (success, ==, TRUE);
+}
+
+#ifdef G_OS_UNIX
+static void
+test_pollable_unix (void)
+{
+ int pipefds[2], status;
+
+ status = pipe (pipefds);
+ g_assert_cmpint (status, ==, 0);
+
+ in = G_POLLABLE_INPUT_STREAM (g_unix_input_stream_new (pipefds[0], TRUE));
+ out = g_unix_output_stream_new (pipefds[1], TRUE);
+
+ test_streams ();
+
+ g_object_unref (in);
+ g_object_unref (out);
+}
+#endif
+
+static void
+client_connected (GObject *source,
+ GAsyncResult *result,
+ gpointer user_data)
+{
+ GSocketClient *client = G_SOCKET_CLIENT (source);
+ GSocketConnection **conn = user_data;
+ GError *error = NULL;
+
+ *conn = g_socket_client_connect_finish (client, result, &error);
+ g_assert_no_error (error);
+}
+
+static void
+server_connected (GObject *source,
+ GAsyncResult *result,
+ gpointer user_data)
+{
+ GSocketListener *listener = G_SOCKET_LISTENER (source);
+ GSocketConnection **conn = user_data;
+ GError *error = NULL;
+
+ *conn = g_socket_listener_accept_finish (listener, result, NULL, &error);
+ g_assert_no_error (error);
+}
+
+static void
+test_pollable_socket (void)
+{
+ GInetAddress *iaddr;
+ GSocketAddress *saddr, *effective_address;
+ GSocketListener *listener;
+ GSocketClient *client;
+ GError *error = NULL;
+ GSocketConnection *client_conn = NULL, *server_conn = NULL;
+
+ iaddr = g_inet_address_new_loopback (G_SOCKET_FAMILY_IPV4);
+ saddr = g_inet_socket_address_new (iaddr, 0);
+ g_object_unref (iaddr);
+
+ listener = g_socket_listener_new ();
+ g_socket_listener_add_address (listener, saddr,
+ G_SOCKET_TYPE_STREAM,
+ G_SOCKET_PROTOCOL_TCP,
+ NULL,
+ &effective_address,
+ &error);
+ g_assert_no_error (error);
+ g_object_unref (saddr);
+
+ client = g_socket_client_new ();
+
+ g_socket_client_connect_async (client,
+ G_SOCKET_CONNECTABLE (effective_address),
+ NULL, client_connected, &client_conn);
+ g_socket_listener_accept_async (listener, NULL,
+ server_connected, &server_conn);
+
+ while (!client_conn || !server_conn)
+ g_main_context_iteration (NULL, TRUE);
+
+ in = G_POLLABLE_INPUT_STREAM (g_io_stream_get_input_stream (G_IO_STREAM (client_conn)));
+ out = g_io_stream_get_output_stream (G_IO_STREAM (server_conn));
+
+ test_streams ();
+
+ g_object_unref (client_conn);
+ g_object_unref (server_conn);
+ g_object_unref (client);
+ g_object_unref (listener);
+}
+
+int
+main (int argc,
+ char *argv[])
+{
+ g_type_init ();
+ g_test_init (&argc, &argv, NULL);
+
+#ifdef G_OS_UNIX
+ g_test_add_func ("/pollable/unix", test_pollable_unix);
+#endif
+ g_test_add_func ("/pollable/socket", test_pollable_socket);
+
+ return g_test_run();
+}
+