summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/plugins/Makefile.am1
-rw-r--r--docs/plugins/gst-plugins-base-plugins-docs.sgml1
-rw-r--r--docs/plugins/gst-plugins-base-plugins-sections.txt14
-rw-r--r--docs/plugins/inspect/plugin-tcp.xml15
-rw-r--r--gst/tcp/Makefile.am6
-rw-r--r--gst/tcp/gstsocketsrc.c350
-rw-r--r--gst/tcp/gstsocketsrc.h63
-rw-r--r--gst/tcp/gsttcpplugin.c4
-rw-r--r--tests/check/pipelines/tcp.c69
-rw-r--r--win32/vs7/libgsttcp.vcproj3
-rw-r--r--win32/vs8/libgsttcp.vcproj4
11 files changed, 528 insertions, 2 deletions
diff --git a/docs/plugins/Makefile.am b/docs/plugins/Makefile.am
index f6e467d39..640948cf0 100644
--- a/docs/plugins/Makefile.am
+++ b/docs/plugins/Makefile.am
@@ -80,6 +80,7 @@ EXTRA_HFILES = \
$(top_srcdir)/gst/audiorate/gstaudiorate.h \
$(top_srcdir)/gst/audioresample/gstaudioresample.h \
$(top_srcdir)/gst/tcp/gstmultisocketsink.h \
+ $(top_srcdir)/gst/tcp/gstsocketsrc.h \
$(top_srcdir)/gst/tcp/gsttcpclientsrc.h \
$(top_srcdir)/gst/tcp/gsttcpclientsink.h \
$(top_srcdir)/gst/tcp/gsttcpserversrc.h \
diff --git a/docs/plugins/gst-plugins-base-plugins-docs.sgml b/docs/plugins/gst-plugins-base-plugins-docs.sgml
index 6ae5478bb..ed4870965 100644
--- a/docs/plugins/gst-plugins-base-plugins-docs.sgml
+++ b/docs/plugins/gst-plugins-base-plugins-docs.sgml
@@ -43,6 +43,7 @@
<xi:include href="xml/element-playsink.xml" />
<xi:include href="xml/element-streamsynchronizer.xml" />
<xi:include href="xml/element-subtitleoverlay.xml" />
+ <xi:include href="xml/element-socketsrc.xml" />
<xi:include href="xml/element-tcpclientsrc.xml" />
<xi:include href="xml/element-tcpclientsink.xml" />
<xi:include href="xml/element-tcpserversrc.xml" />
diff --git a/docs/plugins/gst-plugins-base-plugins-sections.txt b/docs/plugins/gst-plugins-base-plugins-sections.txt
index 858dd0bd7..77781672b 100644
--- a/docs/plugins/gst-plugins-base-plugins-sections.txt
+++ b/docs/plugins/gst-plugins-base-plugins-sections.txt
@@ -486,6 +486,20 @@ gst_play_sink_plugin_init
</SECTION>
<SECTION>
+<FILE>element-socketsrc</FILE>
+<TITLE>socketsrc</TITLE>
+GstSocketSrc
+<SUBSECTION Standard>
+GstSocketSrcClass
+GST_SOCKET_SRC
+GST_SOCKET_SINK_CLASS
+GST_TYPE_SOCKET_SRC
+gst_socket_src_get_type
+GST_IS_SOCKET_SRC_CLASS
+GST_IS_SOCKET_SRC
+</SECTION>
+
+<SECTION>
<FILE>element-streamsynchronizer</FILE>
<TITLE>streamsynchronizer</TITLE>
GstStreamSynchronizer
diff --git a/docs/plugins/inspect/plugin-tcp.xml b/docs/plugins/inspect/plugin-tcp.xml
index 6e15bed49..1a24a301e 100644
--- a/docs/plugins/inspect/plugin-tcp.xml
+++ b/docs/plugins/inspect/plugin-tcp.xml
@@ -40,6 +40,21 @@
</pads>
</element>
<element>
+ <name>socketsrc</name>
+ <longname>socket source</longname>
+ <class>Source/Network</class>
+ <description>Receive data from a socket</description>
+ <author>William Manley &lt;will@williammanley.net&gt;</author>
+ <pads>
+ <caps>
+ <name>src</name>
+ <direction>source</direction>
+ <presence>always</presence>
+ <details>ANY</details>
+ </caps>
+ </pads>
+ </element>
+ <element>
<name>tcpclientsink</name>
<longname>TCP client sink</longname>
<class>Sink/Network</class>
diff --git a/gst/tcp/Makefile.am b/gst/tcp/Makefile.am
index c6f50d9f9..bc91a427d 100644
--- a/gst/tcp/Makefile.am
+++ b/gst/tcp/Makefile.am
@@ -8,6 +8,7 @@ multifdsink_SOURCES =
endif
libgsttcp_la_SOURCES = \
+ gstsocketsrc.c \
gsttcpplugin.c \
gsttcpclientsrc.c gsttcpclientsink.c \
$(multifdsink_SOURCES) \
@@ -15,12 +16,13 @@ libgsttcp_la_SOURCES = \
gstmultisocketsink.c \
gsttcpserversrc.c gsttcpserversink.c
-libgsttcp_la_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(GST_BASE_CFLAGS) $(GST_CFLAGS) $(GIO_CFLAGS)
+libgsttcp_la_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(GST_BASE_CFLAGS) $(GST_NET_CFLAGS) $(GST_CFLAGS) $(GIO_CFLAGS)
libgsttcp_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS)
-libgsttcp_la_LIBADD = $(GST_BASE_LIBS) $(GST_LIBS) $(GIO_LIBS)
+libgsttcp_la_LIBADD = $(GST_BASE_LIBS) $(GST_NET_LIBS) $(GST_LIBS) $(GIO_LIBS)
libgsttcp_la_LIBTOOLFLAGS = $(GST_PLUGIN_LIBTOOLFLAGS)
noinst_HEADERS = \
+ gstsocketsrc.h \
gsttcp.h \
gsttcpclientsrc.h gsttcpclientsink.h \
gstmultifdsink.h \
diff --git a/gst/tcp/gstsocketsrc.c b/gst/tcp/gstsocketsrc.c
new file mode 100644
index 000000000..cb70254d5
--- /dev/null
+++ b/gst/tcp/gstsocketsrc.c
@@ -0,0 +1,350 @@
+/* GStreamer
+ * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
+ * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
+ * Copyright (C) <2011> Collabora Ltd.
+ * Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
+ * Copyright (C) <2014> William Manley <will@williammanley.net>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * SECTION:element-socketsrc
+ *
+ * Receive data from a socket.
+ *
+ * As compared to other elements:
+ *
+ * socketsrc can be considered a source counterpart to the #multisocketsink
+ * sink.
+ *
+ * socketsrc can also be considered a generalization of #tcpclientsrc and
+ * #tcpserversrc: it contains all the logic required to communicate over the
+ * socket but none of the logic for creating the sockets/establishing the
+ * connection in the first place, allowing the user to accomplish this
+ * externally in whatever manner they wish making it applicable to other types
+ * of sockets besides TCP.
+ *
+ * As compared to #fdsrc socketsrc is socket specific and deals with #GSocket
+ * objects rather than sockets via integer file-descriptors.
+ *
+ * @see_also: #multisocketsink
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gst/gst-i18n-plugin.h>
+#include "gstsocketsrc.h"
+#include "gsttcp.h"
+
+GST_DEBUG_CATEGORY_STATIC (socketsrc_debug);
+#define GST_CAT_DEFAULT socketsrc_debug
+
+#define MAX_READ_SIZE 4 * 1024
+
+
+static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
+ GST_PAD_SRC,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS_ANY);
+
+
+enum
+{
+ PROP_0,
+ PROP_SOCKET,
+};
+
+#define gst_socket_src_parent_class parent_class
+G_DEFINE_TYPE (GstSocketSrc, gst_socket_src, GST_TYPE_PUSH_SRC);
+
+
+static void gst_socket_src_finalize (GObject * gobject);
+
+static GstFlowReturn gst_socket_src_create (GstPushSrc * psrc,
+ GstBuffer ** outbuf);
+static gboolean gst_socket_src_unlock (GstBaseSrc * bsrc);
+static gboolean gst_socket_src_unlock_stop (GstBaseSrc * bsrc);
+
+static void gst_socket_src_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec);
+static void gst_socket_src_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec);
+
+static void
+gst_socket_src_class_init (GstSocketSrcClass * klass)
+{
+ GObjectClass *gobject_class;
+ GstElementClass *gstelement_class;
+ GstBaseSrcClass *gstbasesrc_class;
+ GstPushSrcClass *gstpush_src_class;
+
+ gobject_class = (GObjectClass *) klass;
+ gstelement_class = (GstElementClass *) klass;
+ gstbasesrc_class = (GstBaseSrcClass *) klass;
+ gstpush_src_class = (GstPushSrcClass *) klass;
+
+ gobject_class->set_property = gst_socket_src_set_property;
+ gobject_class->get_property = gst_socket_src_get_property;
+ gobject_class->finalize = gst_socket_src_finalize;
+
+ g_object_class_install_property (gobject_class, PROP_SOCKET,
+ g_param_spec_object ("socket", "Socket",
+ "The socket to receive packets from", G_TYPE_SOCKET,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ gst_element_class_add_pad_template (gstelement_class,
+ gst_static_pad_template_get (&srctemplate));
+
+ gst_element_class_set_static_metadata (gstelement_class,
+ "socket source", "Source/Network",
+ "Receive data from a socket",
+ "Thomas Vander Stichele <thomas at apestaart dot org>, "
+ "William Manley <will@williammanley.net>");
+
+ gstbasesrc_class->unlock = gst_socket_src_unlock;
+ gstbasesrc_class->unlock_stop = gst_socket_src_unlock_stop;
+
+ gstpush_src_class->create = gst_socket_src_create;
+
+ GST_DEBUG_CATEGORY_INIT (socketsrc_debug, "socketsrc", 0, "Socket Source");
+}
+
+static void
+gst_socket_src_init (GstSocketSrc * this)
+{
+ this->socket = NULL;
+ this->cancellable = g_cancellable_new ();
+}
+
+static void
+gst_socket_src_finalize (GObject * gobject)
+{
+ GstSocketSrc *this = GST_SOCKET_SRC (gobject);
+
+ if (this->cancellable)
+ g_object_unref (this->cancellable);
+ this->cancellable = NULL;
+ if (this->socket)
+ g_object_unref (this->socket);
+ this->socket = NULL;
+
+ G_OBJECT_CLASS (parent_class)->finalize (gobject);
+}
+
+static GstFlowReturn
+gst_socket_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
+{
+ GstSocketSrc *src;
+ GstFlowReturn ret = GST_FLOW_OK;
+ gssize rret;
+ GError *err = NULL;
+ GstMapInfo map;
+ gssize avail, read;
+ GSocket *socket;
+
+ src = GST_SOCKET_SRC (psrc);
+
+ GST_OBJECT_LOCK (src);
+
+ socket = src->socket;
+ if (socket == NULL) {
+ GST_OBJECT_UNLOCK (src);
+ goto no_socket;
+ }
+ g_object_ref (socket);
+
+ GST_OBJECT_UNLOCK (src);
+
+ GST_LOG_OBJECT (src, "asked for a buffer");
+
+ /* read the buffer header */
+ avail = g_socket_get_available_bytes (socket);
+ if (avail < 0) {
+ goto get_available_error;
+ } else if (avail == 0) {
+ GIOCondition condition;
+
+ if (!g_socket_condition_wait (socket,
+ G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, src->cancellable, &err))
+ goto select_error;
+
+ condition =
+ g_socket_condition_check (socket,
+ G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
+
+ if ((condition & G_IO_ERR)) {
+ GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+ ("Socket in error state"));
+ *outbuf = NULL;
+ ret = GST_FLOW_ERROR;
+ goto done;
+ } else if ((condition & G_IO_HUP)) {
+ GST_DEBUG_OBJECT (src, "Connection closed");
+ *outbuf = NULL;
+ ret = GST_FLOW_EOS;
+ goto done;
+ }
+ avail = g_socket_get_available_bytes (socket);
+ if (avail < 0)
+ goto get_available_error;
+ }
+
+ if (avail > 0) {
+ read = MIN (avail, MAX_READ_SIZE);
+ *outbuf = gst_buffer_new_and_alloc (read);
+ gst_buffer_map (*outbuf, &map, GST_MAP_READWRITE);
+ rret =
+ g_socket_receive (socket, (gchar *) map.data, read,
+ src->cancellable, &err);
+ } else {
+ /* Connection closed */
+ *outbuf = NULL;
+ read = 0;
+ rret = 0;
+ }
+
+ if (rret == 0) {
+ GST_DEBUG_OBJECT (src, "Connection closed");
+ ret = GST_FLOW_EOS;
+ if (*outbuf) {
+ gst_buffer_unmap (*outbuf, &map);
+ gst_buffer_unref (*outbuf);
+ }
+ *outbuf = NULL;
+ } else if (rret < 0) {
+ if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+ ret = GST_FLOW_FLUSHING;
+ GST_DEBUG_OBJECT (src, "Cancelled reading from socket");
+ } else {
+ ret = GST_FLOW_ERROR;
+ GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+ ("Failed to read from socket: %s", err->message));
+ }
+ gst_buffer_unmap (*outbuf, &map);
+ gst_buffer_unref (*outbuf);
+ *outbuf = NULL;
+ } else {
+ ret = GST_FLOW_OK;
+ gst_buffer_unmap (*outbuf, &map);
+ gst_buffer_resize (*outbuf, 0, rret);
+
+ GST_LOG_OBJECT (src,
+ "Returning buffer from _get of size %" G_GSIZE_FORMAT ", ts %"
+ GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
+ ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
+ gst_buffer_get_size (*outbuf),
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*outbuf)),
+ GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)),
+ GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf));
+ }
+ g_clear_error (&err);
+
+done:
+ g_object_unref (socket);
+ return ret;
+
+select_error:
+ {
+ if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+ GST_DEBUG_OBJECT (src, "Cancelled");
+ ret = GST_FLOW_FLUSHING;
+ } else {
+ GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+ ("Select failed: %s", err->message));
+ ret = GST_FLOW_ERROR;
+ }
+ g_clear_error (&err);
+ g_object_unref (socket);
+ return ret;
+ }
+get_available_error:
+ {
+ GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+ ("Failed to get available bytes from socket"));
+ g_object_unref (socket);
+ return GST_FLOW_ERROR;
+ }
+no_socket:
+ {
+ GST_ELEMENT_ERROR (src, RESOURCE, NOT_FOUND, (NULL),
+ ("Cannot receive: No socket set on socketsrc"));
+ return GST_FLOW_ERROR;
+ }
+}
+
+#define SWAP(a, b) do { GSocket* tmp = a; a = b; b = tmp; } while (0);
+
+static void
+gst_socket_src_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ GstSocketSrc *socketsrc = GST_SOCKET_SRC (object);
+
+ switch (prop_id) {
+ case PROP_SOCKET:{
+ GSocket *socket = G_SOCKET (g_value_dup_object (value));
+ GST_OBJECT_LOCK (socketsrc);
+ SWAP (socket, socketsrc->socket);
+ GST_OBJECT_UNLOCK (socketsrc);
+ g_clear_object (&socket);
+ break;
+ }
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_socket_src_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec)
+{
+ GstSocketSrc *socketsrc = GST_SOCKET_SRC (object);
+
+ switch (prop_id) {
+ case PROP_SOCKET:
+ g_value_set_object (value, socketsrc->socket);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static gboolean
+gst_socket_src_unlock (GstBaseSrc * bsrc)
+{
+ GstSocketSrc *src = GST_SOCKET_SRC (bsrc);
+
+ GST_DEBUG_OBJECT (src, "set to flushing");
+ g_cancellable_cancel (src->cancellable);
+
+ return TRUE;
+}
+
+static gboolean
+gst_socket_src_unlock_stop (GstBaseSrc * bsrc)
+{
+ GstSocketSrc *src = GST_SOCKET_SRC (bsrc);
+
+ GST_DEBUG_OBJECT (src, "unset flushing");
+ g_cancellable_reset (src->cancellable);
+
+ return TRUE;
+}
diff --git a/gst/tcp/gstsocketsrc.h b/gst/tcp/gstsocketsrc.h
new file mode 100644
index 000000000..ba08a7486
--- /dev/null
+++ b/gst/tcp/gstsocketsrc.h
@@ -0,0 +1,63 @@
+/* GStreamer
+ * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
+ * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
+ * Copyright (C) <2014> William Manley <will@williammanley.net>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+
+#ifndef __GST_SOCKET_SRC_H__
+#define __GST_SOCKET_SRC_H__
+
+#include <gst/gst.h>
+#include <gst/base/gstpushsrc.h>
+
+#include <gio/gio.h>
+
+G_BEGIN_DECLS
+
+#define GST_TYPE_SOCKET_SRC \
+ (gst_socket_src_get_type())
+#define GST_SOCKET_SRC(obj) \
+ (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_SOCKET_SRC,GstSocketSrc))
+#define GST_SOCKET_SRC_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_SOCKET_SRC,GstSocketSrcClass))
+#define GST_IS_SOCKET_SRC(obj) \
+ (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_SOCKET_SRC))
+#define GST_IS_SOCKET_SRC_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_SOCKET_SRC))
+
+typedef struct _GstSocketSrc GstSocketSrc;
+typedef struct _GstSocketSrcClass GstSocketSrcClass;
+
+struct _GstSocketSrc {
+ GstPushSrc element;
+
+ /*< private >*/
+ GSocket *socket;
+ GCancellable *cancellable;
+};
+
+struct _GstSocketSrcClass {
+ GstPushSrcClass parent_class;
+};
+
+GType gst_socket_src_get_type (void);
+
+G_END_DECLS
+
+#endif /* __GST_SOCKET_SRC_H__ */
diff --git a/gst/tcp/gsttcpplugin.c b/gst/tcp/gsttcpplugin.c
index ac9867e15..49db6d3ea 100644
--- a/gst/tcp/gsttcpplugin.c
+++ b/gst/tcp/gsttcpplugin.c
@@ -21,6 +21,7 @@
#include "config.h"
#endif
+#include "gstsocketsrc.h"
#include "gsttcpclientsrc.h"
#include "gsttcpclientsink.h"
#include "gsttcpserversrc.h"
@@ -33,6 +34,9 @@ GST_DEBUG_CATEGORY (tcp_debug);
static gboolean
plugin_init (GstPlugin * plugin)
{
+ if (!gst_element_register (plugin, "socketsrc", GST_RANK_NONE,
+ GST_TYPE_SOCKET_SRC))
+ return FALSE;
if (!gst_element_register (plugin, "tcpclientsink", GST_RANK_NONE,
GST_TYPE_TCP_CLIENT_SINK))
return FALSE;
diff --git a/tests/check/pipelines/tcp.c b/tests/check/pipelines/tcp.c
index 18c01db34..71cdba109 100644
--- a/tests/check/pipelines/tcp.c
+++ b/tests/check/pipelines/tcp.c
@@ -18,11 +18,18 @@
* Boston, MA 02110-1301, USA.
*/
+#include <unistd.h>
+#include <sys/socket.h>
+
#include <gio/gio.h>
#include <gst/check/gstcheck.h>
#include <gst/app/gstappsink.h>
#include <gst/app/gstappsrc.h>
+static gboolean
+g_socketpair (GSocketFamily family, GSocketType type, GSocketProtocol protocol,
+ GSocket * gsv[2], GError ** error);
+
typedef struct
{
GstElement *sink;
@@ -102,6 +109,66 @@ symmetry_test_assert_passthrough (SymmetryTest * st, GstBuffer * in)
gst_sample_unref (out);
}
+static gboolean
+g_socketpair (GSocketFamily family, GSocketType type, GSocketProtocol protocol,
+ GSocket * gsv[2], GError ** error)
+{
+ int ret;
+ int sv[2];
+
+ ret = socketpair (family, type, protocol, sv);
+ if (ret != 0) {
+ g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "socketpair failed: %s",
+ g_strerror (errno));
+ return FALSE;
+ }
+
+ gsv[0] = g_socket_new_from_fd (sv[0], error);
+ if (gsv[0] == NULL) {
+ close (sv[0]);
+ close (sv[1]);
+ return FALSE;
+ }
+ gsv[1] = g_socket_new_from_fd (sv[1], error);
+ if (gsv[1] == NULL) {
+ g_object_unref (gsv[0]);
+ gsv[0] = NULL;
+ close (sv[1]);
+ return FALSE;
+ }
+ return TRUE;
+}
+
+GST_START_TEST (test_that_socketsrc_and_multisocketsink_are_symmetrical)
+{
+ SymmetryTest st = { 0 };
+ GSocket *sockets[2] = { NULL, NULL };
+ GError *err = NULL;
+
+ st.sink = gst_check_setup_element ("multisocketsink");
+ st.src = gst_check_setup_element ("socketsrc");
+
+ fail_unless (g_socketpair (G_SOCKET_FAMILY_UNIX,
+ G_SOCKET_TYPE_STREAM | SOCK_CLOEXEC, G_SOCKET_PROTOCOL_DEFAULT,
+ sockets, &err));
+
+ g_object_set (st.src, "socket", sockets[0], NULL);
+ g_object_unref (sockets[0]);
+ sockets[0] = NULL;
+
+ symmetry_test_setup (&st, st.sink, st.src);
+
+ g_signal_emit_by_name (st.sink, "add", sockets[1], NULL);
+ g_object_unref (sockets[1]);
+ sockets[1] = NULL;
+
+ symmetry_test_assert_passthrough (&st,
+ gst_buffer_new_wrapped (g_strdup ("hello"), 5));
+ symmetry_test_teardown (&st);
+}
+
+GST_END_TEST;
+
GST_START_TEST (test_that_tcpclientsink_and_tcpserversrc_are_symmetrical)
{
@@ -143,6 +210,8 @@ socketintegrationtest_suite (void)
suite_add_tcase (s, tc_chain);
tcase_add_test (tc_chain,
+ test_that_socketsrc_and_multisocketsink_are_symmetrical);
+ tcase_add_test (tc_chain,
test_that_tcpclientsink_and_tcpserversrc_are_symmetrical);
tcase_add_test (tc_chain,
test_that_tcpserversink_and_tcpclientsrc_are_symmetrical);
diff --git a/win32/vs7/libgsttcp.vcproj b/win32/vs7/libgsttcp.vcproj
index 2c9d3c891..4ec0200e9 100644
--- a/win32/vs7/libgsttcp.vcproj
+++ b/win32/vs7/libgsttcp.vcproj
@@ -126,6 +126,9 @@
RelativePath="..\..\gst\tcp\gstmultifdsink.c">
</File>
<File
+ RelativePath="..\..\gst\tcp\gstsocketsrc.c">
+ </File>
+ <File
RelativePath="..\..\gst\tcp\gsttcp.c">
</File>
<File
diff --git a/win32/vs8/libgsttcp.vcproj b/win32/vs8/libgsttcp.vcproj
index 6600730ab..3f59b33b6 100644
--- a/win32/vs8/libgsttcp.vcproj
+++ b/win32/vs8/libgsttcp.vcproj
@@ -190,6 +190,10 @@
>
</File>
<File
+ RelativePath="..\..\gst\tcp\gstsocketsrc.c"
+ >
+ </File>
+ <File
RelativePath="..\..\gst\tcp\gsttcp.c"
>
</File>