summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--configure.ac2
-rw-r--r--gst/meson.build4
-rw-r--r--gst/rist/Makefile.am23
-rw-r--r--gst/rist/gstrist.h59
-rw-r--r--gst/rist/gstristplugin.c50
-rw-r--r--gst/rist/gstristrtxreceive.c302
-rw-r--r--gst/rist/gstristrtxsend.c772
-rw-r--r--gst/rist/gstristsink.c838
-rw-r--r--gst/rist/gstristsrc.c1021
-rw-r--r--gst/rist/meson.build17
-rw-r--r--meson_options.txt1
11 files changed, 3087 insertions, 2 deletions
diff --git a/configure.ac b/configure.ac
index 5114e36fc..cb72ee853 100644
--- a/configure.ac
+++ b/configure.ac
@@ -478,6 +478,7 @@ AG_GST_CHECK_PLUGIN(pnm)
AG_GST_CHECK_PLUGIN(proxy)
AG_GST_CHECK_PLUGIN(rawparse)
AG_GST_CHECK_PLUGIN(removesilence)
+AG_GST_CHECK_PLUGIN(rist)
AG_GST_CHECK_PLUGIN(sdp)
AG_GST_CHECK_PLUGIN(segmentclip)
AG_GST_CHECK_PLUGIN(siren)
@@ -2556,6 +2557,7 @@ gst/pnm/Makefile
gst/proxy/Makefile
gst/rawparse/Makefile
gst/removesilence/Makefile
+gst/rist/Makefile
gst/sdp/Makefile
gst/segmentclip/Makefile
gst/siren/Makefile
diff --git a/gst/meson.build b/gst/meson.build
index 8c8349802..f6b306c1e 100644
--- a/gst/meson.build
+++ b/gst/meson.build
@@ -8,8 +8,8 @@ foreach plugin : ['accurip', 'adpcmdec', 'adpcmenc', 'aiff', 'asfmux',
'ivfparse', 'ivtc', 'jp2kdecimator', 'jpegformat', 'librfb',
'midi', 'mpegdemux', 'mpegpsmux', 'mpegtsdemux', 'mpegtsmux',
'mxf', 'netsim', 'onvif', 'pcapparse', 'pnm', 'proxy',
- 'rawparse', 'removesilence', 'sdp', 'segmentclip', 'siren',
- 'smooth', 'speed', 'subenc', 'timecode',
+ 'rawparse', 'removesilence', 'rist', 'sdp', 'segmentclip',
+ 'siren', 'smooth', 'speed', 'subenc', 'timecode',
'videofilters', 'videoframe_audiolevel', 'videoparsers',
'videosignal', 'vmnc', 'y4m', 'yadif']
if not get_option(plugin).disabled()
diff --git a/gst/rist/Makefile.am b/gst/rist/Makefile.am
new file mode 100644
index 000000000..d198f3f8e
--- /dev/null
+++ b/gst/rist/Makefile.am
@@ -0,0 +1,23 @@
+plugin_LTLIBRARIES = libgstrist.la
+
+libgstrist_la_SOURCES = \
+ gstristsrc.c \
+ gstristsink.c \
+ gstristrtxsend.c \
+ gstristrtxreceive.c \
+ gstristplugin.c
+
+noinst_HEADERS = \
+ gstrist.h
+
+libgstrist_la_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) \
+ $(GST_CFLAGS) \
+ $(GIO_CFLAGS)
+libgstrist_la_LIBADD = $(GST_PLUGINS_BASE_LIBS) \
+ $(GST_BASE_LIBS) \
+ -lgstrtp-@GST_API_VERSION@ \
+ $(GST_NET_LIBS) \
+ $(GST_LIBS) \
+ $(GIO_LIBS)
+libgstrist_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS)
+libgstrist_la_LIBTOOLFLAGS = $(GST_PLUGIN_LIBTOOLFLAGS)
diff --git a/gst/rist/gstrist.h b/gst/rist/gstrist.h
new file mode 100644
index 000000000..b4bcb6f04
--- /dev/null
+++ b/gst/rist/gstrist.h
@@ -0,0 +1,59 @@
+/* GStreamer RIST plugin
+ * Copyright (C) 2019 Net Insight AB
+ * Author: Nicolas Dufresne <nicolas.dufresne@collabora.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#include <gst/gst.h>
+
+#ifndef __GST_RIST_H__
+#define __GST_RIST_H__
+
+#define GST_TYPE_RIST_RTX_RECEIVE (gst_rist_rtx_receive_get_type())
+#define GST_RIST_RTX_RECEIVE(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RIST_RTX_RECEIVE, GstRistRtxReceive))
+typedef struct _GstRistRtxReceive GstRistRtxReceive;
+typedef struct {
+ GstElementClass parent_class;
+} GstRistRtxReceiveClass;
+GType gst_rist_rtx_receive_get_type (void);
+
+#define GST_TYPE_RIST_RTX_SEND (gst_rist_rtx_send_get_type())
+#define GST_RIST_RTX_SEND(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RIST_RTX_SEND, GstRistRtxSend))
+typedef struct _GstRistRtxSend GstRistRtxSend;
+typedef struct {
+ GstElementClass parent_class;
+} GstRistRtxSendClass;
+GType gst_rist_rtx_send_get_type (void);
+
+#define GST_TYPE_RIST_SRC (gst_rist_src_get_type())
+#define GST_RIST_SRC(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RIST_SRC,GstRistSrc))
+typedef struct _GstRistSrc GstRistSrc;
+typedef struct {
+ GstBinClass parent;
+} GstRistSrcClass;
+GType gst_rist_src_get_type (void);
+
+
+#define GST_TYPE_RIST_SINK (gst_rist_sink_get_type())
+#define GST_RIST_SINK(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RIST_SINK,GstRistSink))
+typedef struct _GstRistSink GstRistSink;
+typedef struct {
+ GstBinClass parent;
+} GstRistSinkClass;
+GType gst_rist_sink_get_type (void);
+
+#endif
diff --git a/gst/rist/gstristplugin.c b/gst/rist/gstristplugin.c
new file mode 100644
index 000000000..cb6dcd361
--- /dev/null
+++ b/gst/rist/gstristplugin.c
@@ -0,0 +1,50 @@
+/* GStreamer RIST plugin
+ * Copyright (C) 2019 Net Insight AB
+ * Author: Nicolas Dufresne <nicolas.dufresne@collabora.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "gstrist.h"
+
+static gboolean
+plugin_init (GstPlugin * plugin)
+{
+ if (!gst_element_register (plugin, "ristsrc", GST_RANK_PRIMARY,
+ GST_TYPE_RIST_SRC))
+ return FALSE;
+ if (!gst_element_register (plugin, "ristsink", GST_RANK_PRIMARY,
+ GST_TYPE_RIST_SINK))
+ return FALSE;
+ if (!gst_element_register (plugin, "ristrtxsend", GST_RANK_NONE,
+ GST_TYPE_RIST_RTX_SEND))
+ return FALSE;
+ if (!gst_element_register (plugin, "ristrtxreceive", GST_RANK_NONE,
+ GST_TYPE_RIST_RTX_RECEIVE))
+ return FALSE;
+
+ return TRUE;
+}
+
+GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
+ GST_VERSION_MINOR,
+ rist,
+ "Source and Sink for RIST TR-06-1 streaming specification",
+ plugin_init, VERSION, GST_LICENSE, GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN)
diff --git a/gst/rist/gstristrtxreceive.c b/gst/rist/gstristrtxreceive.c
new file mode 100644
index 000000000..fe8a6d5fa
--- /dev/null
+++ b/gst/rist/gstristrtxreceive.c
@@ -0,0 +1,302 @@
+/* RTP Retransmission receiver element for GStreamer
+ *
+ * gstrtprtxreceive.c:
+ *
+ * Copyright (C) 2013-2019 Collabora Ltd.
+ * @author Julien Isorce <julien.isorce@collabora.co.uk>
+ * Nicolas Dufresne <nicolas.dufresne@collabora.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * SECTION:element-ristrtxreceive
+ * @title: ristrtxreceive
+ * @see_also: ristrtxsend
+ *
+ * This element translates RIST RTX packets into its original form with the
+ * %GST_RTP_BUFFER_FLAG_RETRANSMISSION flag set. This element is intented to
+ * be used by ristsrc element.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gst/gst.h>
+#include <gst/rtp/gstrtpbuffer.h>
+
+#include "gstrist.h"
+
+GST_DEBUG_CATEGORY_STATIC (gst_rist_rtx_receive_debug);
+#define GST_CAT_DEFAULT gst_rist_rtx_receive_debug
+
+enum
+{
+ PROP_0,
+ PROP_NUM_RTX_REQUESTS,
+ PROP_NUM_RTX_PACKETS,
+ PROP_RIST
+};
+
+static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
+ GST_PAD_SRC,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS ("application/x-rtp")
+ );
+
+static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink",
+ GST_PAD_SINK,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS ("application/x-rtp")
+ );
+
+struct _GstRistRtxReceive
+{
+ GstElement element;
+
+ /* pad */
+ GstPad *sinkpad;
+ GstPad *srcpad;
+
+ /* statistics */
+ guint num_rtx_requests;
+ guint num_rtx_packets;
+
+ GstClockTime last_time;
+};
+
+static gboolean gst_rist_rtx_receive_src_event (GstPad * pad,
+ GstObject * parent, GstEvent * event);
+static GstFlowReturn gst_rist_rtx_receive_chain (GstPad * pad,
+ GstObject * parent, GstBuffer * buffer);
+static GstStateChangeReturn gst_rist_rtx_receive_change_state (GstElement *
+ element, GstStateChange transition);
+static void gst_rist_rtx_receive_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec);
+
+G_DEFINE_TYPE_WITH_CODE (GstRistRtxReceive, gst_rist_rtx_receive,
+ GST_TYPE_ELEMENT, GST_DEBUG_CATEGORY_INIT (gst_rist_rtx_receive_debug,
+ "ristrtxreceive", 0, "RIST retransmission receiver"));
+
+static void
+gst_rist_rtx_receive_class_init (GstRistRtxReceiveClass * klass)
+{
+ GObjectClass *gobject_class;
+ GstElementClass *gstelement_class;
+
+ gobject_class = (GObjectClass *) klass;
+ gstelement_class = (GstElementClass *) klass;
+
+ gobject_class->get_property = gst_rist_rtx_receive_get_property;
+
+ g_object_class_install_property (gobject_class, PROP_NUM_RTX_REQUESTS,
+ g_param_spec_uint ("num-rtx-requests", "Num RTX Requests",
+ "Number of retransmission events received", 0, G_MAXUINT,
+ 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class, PROP_NUM_RTX_PACKETS,
+ g_param_spec_uint ("num-rtx-packets", "Num RTX Packets",
+ " Number of retransmission packets received", 0, G_MAXUINT,
+ 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
+ gst_element_class_add_static_pad_template (gstelement_class, &src_factory);
+ gst_element_class_add_static_pad_template (gstelement_class, &sink_factory);
+
+ gst_element_class_set_static_metadata (gstelement_class,
+ "RIST Retransmission receiver", "Codec",
+ "Receive retransmitted RIST packets according to VSF TR-06-1",
+ "Nicolas Dufresne <nicolas.dufresne@collabora.com>");
+
+ gstelement_class->change_state =
+ GST_DEBUG_FUNCPTR (gst_rist_rtx_receive_change_state);
+}
+
+static void
+gst_rist_rtx_receive_reset (GstRistRtxReceive * rtx)
+{
+ GST_OBJECT_LOCK (rtx);
+ rtx->num_rtx_requests = 0;
+ rtx->num_rtx_packets = 0;
+ GST_OBJECT_UNLOCK (rtx);
+}
+
+static void
+gst_rist_rtx_receive_init (GstRistRtxReceive * rtx)
+{
+ GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtx);
+
+ rtx->srcpad =
+ gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
+ "src"), "src");
+ GST_PAD_SET_PROXY_CAPS (rtx->srcpad);
+ GST_PAD_SET_PROXY_ALLOCATION (rtx->srcpad);
+ gst_pad_set_event_function (rtx->srcpad,
+ GST_DEBUG_FUNCPTR (gst_rist_rtx_receive_src_event));
+ gst_element_add_pad (GST_ELEMENT (rtx), rtx->srcpad);
+
+ rtx->sinkpad =
+ gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
+ "sink"), "sink");
+ GST_PAD_SET_PROXY_CAPS (rtx->sinkpad);
+ GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad);
+ gst_pad_set_chain_function (rtx->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_rist_rtx_receive_chain));
+ gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
+}
+
+static gboolean
+gst_rist_rtx_receive_src_event (GstPad * pad, GstObject * parent,
+ GstEvent * event)
+{
+ GstRistRtxReceive *rtx = GST_RIST_RTX_RECEIVE (parent);
+
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_CUSTOM_UPSTREAM:
+ {
+ const GstStructure *s = gst_event_get_structure (event);
+
+ /* This event usually comes from the downstream gstrtpjitterbuffer */
+ if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
+#ifndef GST_DISABLE_GST_DEBUG
+ guint seqnum = 0;
+ guint ssrc = 0;
+
+ /* retrieve seqnum of the packet that need to be retransmitted */
+ if (!gst_structure_get_uint (s, "seqnum", &seqnum))
+ seqnum = -1;
+
+ /* retrieve ssrc of the packet that need to be retransmitted
+ * it's useful when reconstructing the original packet from the rtx packet */
+ if (!gst_structure_get_uint (s, "ssrc", &ssrc))
+ ssrc = -1;
+
+ GST_DEBUG_OBJECT (rtx, "got rtx request for seqnum: %u, ssrc: %X",
+ seqnum, ssrc);
+#endif
+
+ GST_OBJECT_LOCK (rtx);
+ /* increase number of seen requests for our statistics */
+ ++rtx->num_rtx_requests;
+ GST_OBJECT_UNLOCK (rtx);
+ }
+ break;
+ }
+ default:
+ break;
+ }
+
+ return gst_pad_event_default (pad, parent, event);
+}
+
+static GstFlowReturn
+gst_rist_rtx_receive_chain (GstPad * pad, GstObject * parent,
+ GstBuffer * buffer)
+{
+ GstRistRtxReceive *rtx = GST_RIST_RTX_RECEIVE (parent);
+ GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
+ guint32 ssrc = 0;
+ guint16 seqnum = 0;
+ gboolean is_rtx;
+
+ /* map current rtp packet to parse its header */
+ if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp))
+ goto invalid_buffer;
+
+ ssrc = gst_rtp_buffer_get_ssrc (&rtp);
+ seqnum = gst_rtp_buffer_get_seq (&rtp);
+
+ /* check if we have a retransmission packet (this information comes from SDP) */
+ GST_OBJECT_LOCK (rtx);
+
+ /* RIST sets SSRC LSB to 1 to indicate an RTC packet */
+ is_rtx = ssrc & 0x1;
+ rtx->last_time = GST_BUFFER_PTS (buffer);
+
+ if (is_rtx)
+ /* increase our statistic */
+ ++rtx->num_rtx_packets;
+
+ GST_OBJECT_UNLOCK (rtx);
+
+ /* create the retransmission packet */
+ if (is_rtx) {
+ GST_DEBUG_OBJECT (rtx,
+ "Recovered packet from RIST RTX seqnum:%u ssrc: %u",
+ gst_rtp_buffer_get_seq (&rtp), gst_rtp_buffer_get_ssrc (&rtp));
+ gst_rtp_buffer_set_ssrc (&rtp, ssrc & 0xFFFFFFFE);
+ GST_BUFFER_FLAG_SET (buffer, GST_RTP_BUFFER_FLAG_RETRANSMISSION);
+ }
+
+ gst_rtp_buffer_unmap (&rtp);
+
+ GST_TRACE_OBJECT (rtx, "pushing packet seqnum:%u from master stream "
+ "ssrc: %X", seqnum, ssrc);
+ return gst_pad_push (rtx->srcpad, buffer);
+
+invalid_buffer:
+ {
+ GST_ELEMENT_WARNING (rtx, STREAM, DECODE, (NULL),
+ ("Received invalid RTP payload, dropping"));
+ gst_buffer_unref (buffer);
+ return GST_FLOW_OK;
+ }
+}
+
+static void
+gst_rist_rtx_receive_get_property (GObject * object,
+ guint prop_id, GValue * value, GParamSpec * pspec)
+{
+ GstRistRtxReceive *rtx = GST_RIST_RTX_RECEIVE (object);
+
+ switch (prop_id) {
+ case PROP_NUM_RTX_REQUESTS:
+ GST_OBJECT_LOCK (rtx);
+ g_value_set_uint (value, rtx->num_rtx_requests);
+ GST_OBJECT_UNLOCK (rtx);
+ break;
+ case PROP_NUM_RTX_PACKETS:
+ GST_OBJECT_LOCK (rtx);
+ g_value_set_uint (value, rtx->num_rtx_packets);
+ GST_OBJECT_UNLOCK (rtx);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static GstStateChangeReturn
+gst_rist_rtx_receive_change_state (GstElement * element,
+ GstStateChange transition)
+{
+ GstRistRtxReceive *rtx = GST_RIST_RTX_RECEIVE (element);
+ GstStateChangeReturn ret;
+
+ ret =
+ GST_ELEMENT_CLASS (gst_rist_rtx_receive_parent_class)->change_state
+ (element, transition);
+
+ switch (transition) {
+ case GST_STATE_CHANGE_PAUSED_TO_READY:
+ gst_rist_rtx_receive_reset (rtx);
+ break;
+ default:
+ break;
+ }
+
+ return ret;
+}
diff --git a/gst/rist/gstristrtxsend.c b/gst/rist/gstristrtxsend.c
new file mode 100644
index 000000000..927fb1cf0
--- /dev/null
+++ b/gst/rist/gstristrtxsend.c
@@ -0,0 +1,772 @@
+/* RIST Retransmission sender element for GStreamer
+ *
+ * gsristprtxsend.c:
+ *
+ * Copyright (C) 2013-2019 Collabora Ltd.
+ * @author Julien Isorce <julien.isorce@collabora.co.uk>
+ * Nicoas Dufresne <nicolas.dufresne@collabora.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * SECTION:element-ristrtxsend
+ * @title: ristrtxsend
+ * @see_also: ristrtxreceive
+ *
+ * This elements replies to custom events 'GstRTPRetransmissionRequest' and
+ * when available sends in RIST form the lost packet. This element is intented
+ * to be used by ristsink element.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gst/gst.h>
+#include <gst/rtp/gstrtpbuffer.h>
+#include <gst/base/gstdataqueue.h>
+
+#include "gstrist.h"
+
+GST_DEBUG_CATEGORY_STATIC (gst_rist_rtx_send_debug);
+#define GST_CAT_DEFAULT gst_rist_rtx_send_debug
+
+#define DEFAULT_MAX_SIZE_TIME 0
+#define DEFAULT_MAX_SIZE_PACKETS 100
+
+enum
+{
+ PROP_0,
+ PROP_MAX_SIZE_TIME,
+ PROP_MAX_SIZE_PACKETS,
+ PROP_NUM_RTX_REQUESTS,
+ PROP_NUM_RTX_PACKETS,
+};
+
+static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
+ GST_PAD_SRC,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS ("application/x-rtp")
+ );
+
+static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink",
+ GST_PAD_SINK,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS ("application/x-rtp, " "clock-rate = (int) [1, MAX]")
+ );
+
+struct _GstRistRtxSend
+{
+ GstElement element;
+
+ /* pad */
+ GstPad *sinkpad;
+ GstPad *srcpad;
+
+ /* rtp packets that will be pushed out */
+ GstDataQueue *queue;
+
+ /* ssrc -> SSRCRtxData */
+ GHashTable *ssrc_data;
+ /* rtx ssrc -> master ssrc */
+ GHashTable *rtx_ssrcs;
+
+ /* buffering control properties */
+ guint max_size_time;
+ guint max_size_packets;
+
+ /* statistics */
+ guint num_rtx_requests;
+ guint num_rtx_packets;
+};
+
+static gboolean gst_rist_rtx_send_queue_check_full (GstDataQueue * queue,
+ guint visible, guint bytes, guint64 time, gpointer checkdata);
+
+static gboolean gst_rist_rtx_send_src_event (GstPad * pad, GstObject * parent,
+ GstEvent * event);
+static gboolean gst_rist_rtx_send_sink_event (GstPad * pad, GstObject * parent,
+ GstEvent * event);
+static GstFlowReturn gst_rist_rtx_send_chain (GstPad * pad, GstObject * parent,
+ GstBuffer * buffer);
+static GstFlowReturn gst_rist_rtx_send_chain_list (GstPad * pad,
+ GstObject * parent, GstBufferList * list);
+
+static void gst_rist_rtx_send_src_loop (GstRistRtxSend * rtx);
+static gboolean gst_rist_rtx_send_activate_mode (GstPad * pad,
+ GstObject * parent, GstPadMode mode, gboolean active);
+
+static GstStateChangeReturn gst_rist_rtx_send_change_state (GstElement *
+ element, GstStateChange transition);
+
+static void gst_rist_rtx_send_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec);
+static void gst_rist_rtx_send_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec);
+static void gst_rist_rtx_send_finalize (GObject * object);
+
+G_DEFINE_TYPE_WITH_CODE (GstRistRtxSend, gst_rist_rtx_send, GST_TYPE_ELEMENT,
+ GST_DEBUG_CATEGORY_INIT (gst_rist_rtx_send_debug, "ristrtxsend", 0,
+ "RIST retransmission sender"));
+
+typedef struct
+{
+ guint16 seqnum;
+ guint32 timestamp;
+ GstBuffer *buffer;
+} BufferQueueItem;
+
+static void
+buffer_queue_item_free (BufferQueueItem * item)
+{
+ gst_buffer_unref (item->buffer);
+ g_slice_free (BufferQueueItem, item);
+}
+
+typedef struct
+{
+ guint32 rtx_ssrc;
+ guint16 seqnum_base, next_seqnum;
+ gint clock_rate;
+
+ /* history of rtp packets */
+ GSequence *queue;
+} SSRCRtxData;
+
+static SSRCRtxData *
+ssrc_rtx_data_new (guint32 rtx_ssrc)
+{
+ SSRCRtxData *data = g_slice_new0 (SSRCRtxData);
+
+ data->rtx_ssrc = rtx_ssrc;
+ data->next_seqnum = data->seqnum_base = g_random_int_range (0, G_MAXUINT16);
+ data->queue = g_sequence_new ((GDestroyNotify) buffer_queue_item_free);
+
+ return data;
+}
+
+static void
+ssrc_rtx_data_free (SSRCRtxData * data)
+{
+ g_sequence_free (data->queue);
+ g_slice_free (SSRCRtxData, data);
+}
+
+static void
+gst_rist_rtx_send_class_init (GstRistRtxSendClass * klass)
+{
+ GObjectClass *gobject_class;
+ GstElementClass *gstelement_class;
+
+ gobject_class = (GObjectClass *) klass;
+ gstelement_class = (GstElementClass *) klass;
+
+ gobject_class->get_property = gst_rist_rtx_send_get_property;
+ gobject_class->set_property = gst_rist_rtx_send_set_property;
+ gobject_class->finalize = gst_rist_rtx_send_finalize;
+
+ g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
+ g_param_spec_uint ("max-size-time", "Max Size Time",
+ "Amount of ms to queue (0 = unlimited)", 0, G_MAXUINT,
+ DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class, PROP_MAX_SIZE_PACKETS,
+ g_param_spec_uint ("max-size-packets", "Max Size Packets",
+ "Amount of packets to queue (0 = unlimited)", 0, G_MAXINT16,
+ DEFAULT_MAX_SIZE_PACKETS,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class, PROP_NUM_RTX_REQUESTS,
+ g_param_spec_uint ("num-rtx-requests", "Num RTX Requests",
+ "Number of retransmission events received", 0, G_MAXUINT,
+ 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class, PROP_NUM_RTX_PACKETS,
+ g_param_spec_uint ("num-rtx-packets", "Num RTX Packets",
+ " Number of retransmission packets sent", 0, G_MAXUINT,
+ 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
+ gst_element_class_add_static_pad_template (gstelement_class, &src_factory);
+ gst_element_class_add_static_pad_template (gstelement_class, &sink_factory);
+
+ gst_element_class_set_static_metadata (gstelement_class,
+ "RIST Retransmission Sender", "Codec",
+ "Retransmit RTP packets when needed, according to VSF TR-06-1",
+ "Nicolas Dufresne <nicolas.dufresne@collabora.com>");
+
+ gstelement_class->change_state =
+ GST_DEBUG_FUNCPTR (gst_rist_rtx_send_change_state);
+}
+
+static void
+gst_rist_rtx_send_reset (GstRistRtxSend * rtx)
+{
+ GST_OBJECT_LOCK (rtx);
+ gst_data_queue_flush (rtx->queue);
+ g_hash_table_remove_all (rtx->ssrc_data);
+ g_hash_table_remove_all (rtx->rtx_ssrcs);
+ rtx->num_rtx_requests = 0;
+ rtx->num_rtx_packets = 0;
+ GST_OBJECT_UNLOCK (rtx);
+}
+
+static void
+gst_rist_rtx_send_finalize (GObject * object)
+{
+ GstRistRtxSend *rtx = GST_RIST_RTX_SEND (object);
+
+ g_hash_table_unref (rtx->ssrc_data);
+ g_hash_table_unref (rtx->rtx_ssrcs);
+ g_object_unref (rtx->queue);
+
+ G_OBJECT_CLASS (gst_rist_rtx_send_parent_class)->finalize (object);
+}
+
+static void
+gst_rist_rtx_send_init (GstRistRtxSend * rtx)
+{
+ GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtx);
+
+ rtx->srcpad =
+ gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
+ "src"), "src");
+ GST_PAD_SET_PROXY_CAPS (rtx->srcpad);
+ GST_PAD_SET_PROXY_ALLOCATION (rtx->srcpad);
+ gst_pad_set_event_function (rtx->srcpad,
+ GST_DEBUG_FUNCPTR (gst_rist_rtx_send_src_event));
+ gst_pad_set_activatemode_function (rtx->srcpad,
+ GST_DEBUG_FUNCPTR (gst_rist_rtx_send_activate_mode));
+ gst_element_add_pad (GST_ELEMENT (rtx), rtx->srcpad);
+
+ rtx->sinkpad =
+ gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
+ "sink"), "sink");
+ GST_PAD_SET_PROXY_CAPS (rtx->sinkpad);
+ GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad);
+ gst_pad_set_event_function (rtx->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_rist_rtx_send_sink_event));
+ gst_pad_set_chain_function (rtx->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_rist_rtx_send_chain));
+ gst_pad_set_chain_list_function (rtx->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_rist_rtx_send_chain_list));
+ gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
+
+ rtx->queue = gst_data_queue_new (gst_rist_rtx_send_queue_check_full, NULL,
+ NULL, rtx);
+ rtx->ssrc_data = g_hash_table_new_full (g_direct_hash, g_direct_equal,
+ NULL, (GDestroyNotify) ssrc_rtx_data_free);
+ rtx->rtx_ssrcs = g_hash_table_new (g_direct_hash, g_direct_equal);
+
+ rtx->max_size_time = DEFAULT_MAX_SIZE_TIME;
+ rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS;
+}
+
+static void
+gst_rist_rtx_send_set_flushing (GstRistRtxSend * rtx, gboolean flush)
+{
+ GST_OBJECT_LOCK (rtx);
+ gst_data_queue_set_flushing (rtx->queue, flush);
+ gst_data_queue_flush (rtx->queue);
+ GST_OBJECT_UNLOCK (rtx);
+}
+
+static gboolean
+gst_rist_rtx_send_queue_check_full (GstDataQueue * queue,
+ guint visible, guint bytes, guint64 time, gpointer checkdata)
+{
+ return FALSE;
+}
+
+static void
+gst_rtp_rtx_data_queue_item_free (gpointer item)
+{
+ GstDataQueueItem *data = item;
+ if (data->object)
+ gst_mini_object_unref (data->object);
+ g_slice_free (GstDataQueueItem, data);
+}
+
+static gboolean
+gst_rist_rtx_send_push_out (GstRistRtxSend * rtx, gpointer object)
+{
+ GstDataQueueItem *data;
+ gboolean success;
+
+ data = g_slice_new0 (GstDataQueueItem);
+ data->object = GST_MINI_OBJECT (object);
+ data->size = 1;
+ data->duration = 1;
+ data->visible = TRUE;
+ data->destroy = gst_rtp_rtx_data_queue_item_free;
+
+ success = gst_data_queue_push (rtx->queue, data);
+
+ if (!success)
+ data->destroy (data);
+
+ return success;
+}
+
+static SSRCRtxData *
+gst_rist_rtx_send_get_ssrc_data (GstRistRtxSend * rtx, guint32 ssrc)
+{
+ SSRCRtxData *data;
+ guint32 rtx_ssrc = 0;
+
+ data = g_hash_table_lookup (rtx->ssrc_data, GUINT_TO_POINTER (ssrc));
+ if (!data) {
+ /* See 5.3.2 Retransmitted Packets, orignal packet have SSRC LSB set to
+ * 0, while RTX packet have LSB set to 1 */
+ rtx_ssrc = ssrc + 1;
+ data = ssrc_rtx_data_new (rtx_ssrc);
+ g_hash_table_insert (rtx->ssrc_data, GUINT_TO_POINTER (ssrc), data);
+ g_hash_table_insert (rtx->rtx_ssrcs, GUINT_TO_POINTER (rtx_ssrc),
+ GUINT_TO_POINTER (ssrc));
+ }
+
+ return data;
+}
+
+/*
+ * see RIST TR-06-1 5.3.2 Retransmitted Packets
+ *
+ * RIST simply resend the packet verbatim, with SSRC+1, the defaults SSRC always
+ * have the LSB set to 0, so we can differentiate the retransmission and the
+ * normal packet.
+ */
+static GstBuffer *
+gst_rtp_rist_buffer_new (GstRistRtxSend * rtx, GstBuffer * buffer, guint32 ssrc)
+{
+ GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
+
+ buffer = gst_buffer_copy_deep (buffer);
+ gst_rtp_buffer_map (buffer, GST_MAP_WRITE, &rtp);
+ gst_rtp_buffer_set_ssrc (&rtp, ssrc + 1);
+ gst_rtp_buffer_unmap (&rtp);
+
+ return buffer;
+}
+
+static gint
+buffer_queue_items_cmp (BufferQueueItem * a, BufferQueueItem * b,
+ gpointer user_data)
+{
+ /* gst_rtp_buffer_compare_seqnum returns the opposite of what we want,
+ * it returns negative when seqnum1 > seqnum2 and we want negative
+ * when b > a, i.e. a is smaller, so it comes first in the sequence */
+ return gst_rtp_buffer_compare_seqnum (b->seqnum, a->seqnum);
+}
+
+static gboolean
+gst_rist_rtx_send_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
+{
+ GstRistRtxSend *rtx = GST_RIST_RTX_SEND (parent);
+
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_CUSTOM_UPSTREAM:
+ {
+ const GstStructure *s = gst_event_get_structure (event);
+
+ /* This event usually comes from the downstream gstrtpsession */
+ if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
+ guint seqnum = 0;
+ guint ssrc = 0;
+ GstBuffer *rtx_buf = NULL;
+
+ /* retrieve seqnum of the packet that need to be retransmitted */
+ if (!gst_structure_get_uint (s, "seqnum", &seqnum))
+ seqnum = -1;
+
+ /* retrieve ssrc of the packet that need to be retransmitted */
+ if (!gst_structure_get_uint (s, "ssrc", &ssrc))
+ ssrc = -1;
+
+ GST_DEBUG_OBJECT (rtx, "got rtx request for seqnum: %u, ssrc: %X",
+ seqnum, ssrc);
+
+ GST_OBJECT_LOCK (rtx);
+ /* check if request is for us */
+ if (g_hash_table_contains (rtx->ssrc_data, GUINT_TO_POINTER (ssrc))) {
+ SSRCRtxData *data;
+ GSequenceIter *iter;
+ BufferQueueItem search_item;
+
+ /* update statistics */
+ ++rtx->num_rtx_requests;
+
+ data = gst_rist_rtx_send_get_ssrc_data (rtx, ssrc);
+
+ search_item.seqnum = seqnum;
+ iter = g_sequence_lookup (data->queue, &search_item,
+ (GCompareDataFunc) buffer_queue_items_cmp, NULL);
+ if (iter) {
+ BufferQueueItem *item = g_sequence_get (iter);
+ GST_LOG_OBJECT (rtx, "found %u", item->seqnum);
+ rtx_buf = gst_rtp_rist_buffer_new (rtx, item->buffer, ssrc);
+ }
+#ifndef GST_DISABLE_DEBUG
+ else {
+ BufferQueueItem *item = NULL;
+
+ iter = g_sequence_get_begin_iter (data->queue);
+ if (!g_sequence_iter_is_end (iter))
+ item = g_sequence_get (iter);
+
+ if (item && seqnum < item->seqnum) {
+ GST_DEBUG_OBJECT (rtx, "requested seqnum %u has already been "
+ "removed from the rtx queue; the first available is %u",
+ seqnum, item->seqnum);
+ } else {
+ GST_WARNING_OBJECT (rtx, "requested seqnum %u has not been "
+ "transmitted yet in the original stream; either the remote end "
+ "is not configured correctly, or the source is too slow",
+ seqnum);
+ }
+#endif
+ }
+ }
+ GST_OBJECT_UNLOCK (rtx);
+
+ if (rtx_buf)
+ gst_rist_rtx_send_push_out (rtx, rtx_buf);
+
+ gst_event_unref (event);
+ return TRUE;
+ }
+ break;
+ }
+ default:
+ break;
+ }
+
+ return gst_pad_event_default (pad, parent, event);
+}
+
+static gboolean
+gst_rist_rtx_send_sink_event (GstPad * pad, GstObject * parent,
+ GstEvent * event)
+{
+ GstRistRtxSend *rtx = GST_RIST_RTX_SEND (parent);
+
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_FLUSH_START:
+ gst_pad_push_event (rtx->srcpad, event);
+ gst_rist_rtx_send_set_flushing (rtx, TRUE);
+ gst_pad_pause_task (rtx->srcpad);
+ return TRUE;
+ case GST_EVENT_FLUSH_STOP:
+ gst_pad_push_event (rtx->srcpad, event);
+ gst_rist_rtx_send_set_flushing (rtx, FALSE);
+ gst_pad_start_task (rtx->srcpad,
+ (GstTaskFunction) gst_rist_rtx_send_src_loop, rtx, NULL);
+ return TRUE;
+ case GST_EVENT_EOS:
+ GST_INFO_OBJECT (rtx, "Got EOS - enqueueing it");
+ gst_rist_rtx_send_push_out (rtx, event);
+ return TRUE;
+ case GST_EVENT_CAPS:
+ {
+ GstCaps *caps;
+ GstStructure *s;
+ guint ssrc;
+ gint payload;
+ SSRCRtxData *data;
+
+ gst_event_parse_caps (event, &caps);
+
+ s = gst_caps_get_structure (caps, 0);
+ if (!gst_structure_get_uint (s, "ssrc", &ssrc))
+ ssrc = -1;
+ if (!gst_structure_get_int (s, "payload", &payload))
+ payload = -1;
+
+ if (payload == -1)
+ GST_WARNING_OBJECT (rtx, "No payload in caps");
+
+ GST_OBJECT_LOCK (rtx);
+ data = gst_rist_rtx_send_get_ssrc_data (rtx, ssrc);
+
+ GST_DEBUG_OBJECT (rtx,
+ "got caps for payload: %d->%d, ssrc: %u : %" GST_PTR_FORMAT,
+ payload, ssrc, data->rtx_ssrc, caps);
+
+ gst_structure_get_int (s, "clock-rate", &data->clock_rate);
+
+ /* The session might need to know the RTX ssrc */
+ caps = gst_caps_copy (caps);
+ gst_caps_set_simple (caps, "rtx-ssrc", G_TYPE_UINT, data->rtx_ssrc,
+ "rtx-seqnum-offset", G_TYPE_UINT, data->seqnum_base, NULL);
+
+ GST_DEBUG_OBJECT (rtx, "got clock-rate from caps: %d for ssrc: %u",
+ data->clock_rate, ssrc);
+ GST_OBJECT_UNLOCK (rtx);
+
+ gst_event_unref (event);
+ event = gst_event_new_caps (caps);
+ gst_caps_unref (caps);
+ break;
+ }
+ default:
+ break;
+ }
+
+ return gst_pad_event_default (pad, parent, event);
+}
+
+/* like rtp_jitter_buffer_get_ts_diff() */
+static guint32
+gst_rist_rtx_send_get_ts_diff (SSRCRtxData * data)
+{
+ guint64 high_ts, low_ts;
+ BufferQueueItem *high_buf, *low_buf;
+ guint32 result;
+
+ high_buf =
+ g_sequence_get (g_sequence_iter_prev (g_sequence_get_end_iter
+ (data->queue)));
+ low_buf = g_sequence_get (g_sequence_get_begin_iter (data->queue));
+
+ if (!high_buf || !low_buf || high_buf == low_buf)
+ return 0;
+
+ high_ts = high_buf->timestamp;
+ low_ts = low_buf->timestamp;
+
+ /* it needs to work if ts wraps */
+ if (high_ts >= low_ts) {
+ result = (guint32) (high_ts - low_ts);
+ } else {
+ result = (guint32) (high_ts + G_MAXUINT32 + 1 - low_ts);
+ }
+
+ /* return value in ms instead of clock ticks */
+ return (guint32) gst_util_uint64_scale_int (result, 1000, data->clock_rate);
+}
+
+/* Must be called with lock */
+static void
+process_buffer (GstRistRtxSend * rtx, GstBuffer * buffer)
+{
+ GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
+ BufferQueueItem *item;
+ SSRCRtxData *data;
+ guint16 seqnum;
+ guint32 ssrc, rtptime;
+
+ /* read the information we want from the buffer */
+ gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
+ seqnum = gst_rtp_buffer_get_seq (&rtp);
+ ssrc = gst_rtp_buffer_get_ssrc (&rtp);
+ rtptime = gst_rtp_buffer_get_timestamp (&rtp);
+ gst_rtp_buffer_unmap (&rtp);
+
+ GST_TRACE_OBJECT (rtx, "Processing buffer seqnum: %u, ssrc: %X", seqnum,
+ ssrc);
+
+ data = gst_rist_rtx_send_get_ssrc_data (rtx, ssrc);
+
+ /* add current rtp buffer to queue history */
+ item = g_slice_new0 (BufferQueueItem);
+ item->seqnum = seqnum;
+ item->timestamp = rtptime;
+ item->buffer = gst_buffer_ref (buffer);
+ g_sequence_append (data->queue, item);
+
+ /* remove oldest packets from history if they are too many */
+ if (rtx->max_size_packets) {
+ while (g_sequence_get_length (data->queue) > rtx->max_size_packets)
+ g_sequence_remove (g_sequence_get_begin_iter (data->queue));
+ }
+ if (rtx->max_size_time) {
+ while (gst_rist_rtx_send_get_ts_diff (data) > rtx->max_size_time)
+ g_sequence_remove (g_sequence_get_begin_iter (data->queue));
+ }
+}
+
+static GstFlowReturn
+gst_rist_rtx_send_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
+{
+ GstRistRtxSend *rtx = GST_RIST_RTX_SEND (parent);
+ GstFlowReturn ret;
+
+ GST_OBJECT_LOCK (rtx);
+ process_buffer (rtx, buffer);
+ GST_OBJECT_UNLOCK (rtx);
+ ret = gst_pad_push (rtx->srcpad, buffer);
+
+ return ret;
+}
+
+static gboolean
+process_buffer_from_list (GstBuffer ** buffer, guint idx, gpointer user_data)
+{
+ process_buffer (user_data, *buffer);
+ return TRUE;
+}
+
+static GstFlowReturn
+gst_rist_rtx_send_chain_list (GstPad * pad, GstObject * parent,
+ GstBufferList * list)
+{
+ GstRistRtxSend *rtx = GST_RIST_RTX_SEND (parent);
+ GstFlowReturn ret;
+
+ GST_OBJECT_LOCK (rtx);
+ gst_buffer_list_foreach (list, process_buffer_from_list, rtx);
+ GST_OBJECT_UNLOCK (rtx);
+
+ ret = gst_pad_push_list (rtx->srcpad, list);
+
+ return ret;
+}
+
+static void
+gst_rist_rtx_send_src_loop (GstRistRtxSend * rtx)
+{
+ GstDataQueueItem *data;
+
+ if (gst_data_queue_pop (rtx->queue, &data)) {
+ GST_LOG_OBJECT (rtx, "pushing rtx buffer %p", data->object);
+
+ if (G_LIKELY (GST_IS_BUFFER (data->object))) {
+ GST_OBJECT_LOCK (rtx);
+ /* Update statistics just before pushing. */
+ rtx->num_rtx_packets++;
+ GST_OBJECT_UNLOCK (rtx);
+
+ gst_pad_push (rtx->srcpad, GST_BUFFER (data->object));
+ } else if (GST_IS_EVENT (data->object)) {
+ gst_pad_push_event (rtx->srcpad, GST_EVENT (data->object));
+
+ /* after EOS, we should not send any more buffers,
+ * even if there are more requests coming in */
+ if (GST_EVENT_TYPE (data->object) == GST_EVENT_EOS) {
+ gst_rist_rtx_send_set_flushing (rtx, TRUE);
+ }
+ } else {
+ g_assert_not_reached ();
+ }
+
+ data->object = NULL; /* we no longer own that object */
+ data->destroy (data);
+ } else {
+ GST_LOG_OBJECT (rtx, "flushing");
+ gst_pad_pause_task (rtx->srcpad);
+ }
+}
+
+static gboolean
+gst_rist_rtx_send_activate_mode (GstPad * pad, GstObject * parent,
+ GstPadMode mode, gboolean active)
+{
+ GstRistRtxSend *rtx = GST_RIST_RTX_SEND (parent);
+ gboolean ret = FALSE;
+
+ switch (mode) {
+ case GST_PAD_MODE_PUSH:
+ if (active) {
+ gst_rist_rtx_send_set_flushing (rtx, FALSE);
+ ret = gst_pad_start_task (rtx->srcpad,
+ (GstTaskFunction) gst_rist_rtx_send_src_loop, rtx, NULL);
+ } else {
+ gst_rist_rtx_send_set_flushing (rtx, TRUE);
+ ret = gst_pad_stop_task (rtx->srcpad);
+ }
+ GST_INFO_OBJECT (rtx, "activate_mode: active %d, ret %d", active, ret);
+ break;
+ default:
+ break;
+ }
+ return ret;
+}
+
+static void
+gst_rist_rtx_send_get_property (GObject * object,
+ guint prop_id, GValue * value, GParamSpec * pspec)
+{
+ GstRistRtxSend *rtx = GST_RIST_RTX_SEND (object);
+
+ switch (prop_id) {
+ case PROP_MAX_SIZE_TIME:
+ GST_OBJECT_LOCK (rtx);
+ g_value_set_uint (value, rtx->max_size_time);
+ GST_OBJECT_UNLOCK (rtx);
+ break;
+ case PROP_MAX_SIZE_PACKETS:
+ GST_OBJECT_LOCK (rtx);
+ g_value_set_uint (value, rtx->max_size_packets);
+ GST_OBJECT_UNLOCK (rtx);
+ break;
+ case PROP_NUM_RTX_REQUESTS:
+ GST_OBJECT_LOCK (rtx);
+ g_value_set_uint (value, rtx->num_rtx_requests);
+ GST_OBJECT_UNLOCK (rtx);
+ break;
+ case PROP_NUM_RTX_PACKETS:
+ GST_OBJECT_LOCK (rtx);
+ g_value_set_uint (value, rtx->num_rtx_packets);
+ GST_OBJECT_UNLOCK (rtx);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_rist_rtx_send_set_property (GObject * object,
+ guint prop_id, const GValue * value, GParamSpec * pspec)
+{
+ GstRistRtxSend *rtx = GST_RIST_RTX_SEND (object);
+
+ switch (prop_id) {
+ case PROP_MAX_SIZE_TIME:
+ GST_OBJECT_LOCK (rtx);
+ rtx->max_size_time = g_value_get_uint (value);
+ GST_OBJECT_UNLOCK (rtx);
+ break;
+ case PROP_MAX_SIZE_PACKETS:
+ GST_OBJECT_LOCK (rtx);
+ rtx->max_size_packets = g_value_get_uint (value);
+ GST_OBJECT_UNLOCK (rtx);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static GstStateChangeReturn
+gst_rist_rtx_send_change_state (GstElement * element, GstStateChange transition)
+{
+ GstStateChangeReturn ret;
+ GstRistRtxSend *rtx = GST_RIST_RTX_SEND (element);
+
+ ret =
+ GST_ELEMENT_CLASS (gst_rist_rtx_send_parent_class)->change_state (element,
+ transition);
+
+ switch (transition) {
+ case GST_STATE_CHANGE_PAUSED_TO_READY:
+ gst_rist_rtx_send_reset (rtx);
+ break;
+ default:
+ break;
+ }
+
+ return ret;
+}
diff --git a/gst/rist/gstristsink.c b/gst/rist/gstristsink.c
new file mode 100644
index 000000000..0bad78dac
--- /dev/null
+++ b/gst/rist/gstristsink.c
@@ -0,0 +1,838 @@
+/* GStreamer RIST plugin
+ * Copyright (C) 2019 Net Insight AB
+ * Author: Nicolas Dufresne <nicolas.dufresne@collabora.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * SECTION:element-ristsink
+ * @title: ristsink
+ * @see_also: ristsrc
+ *
+ * This element implements RIST TR-06-1 Simple Profile transmitter. It
+ * currently supports any registered RTP payload types such as MPEG TS. The
+ * stream passed to this element must be RTP payloaded already. Even though
+ * RTP SSRC collision is rare in unidirectional streaming, this element expect
+ * the upstream elements to obey to collision events and change the SSRC in
+ * use. Collision will ocure when tranmitting and receiving over multicast on
+ * the same host.
+ *
+ * ## Example launch line
+ * |[
+ * gst-launch-1.0 udpsrc ! tsparse set-timestamp=1 ! rtpmp2pay ! ristsink address=10.0.0.1 port=5004
+ * ]|
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gio/gio.h>
+#include <gst/rtp/rtp.h>
+
+#include "gstrist.h"
+
+GST_DEBUG_CATEGORY_STATIC (gst_rist_sink_debug);
+#define GST_CAT_DEFAULT gst_rist_sink_debug
+
+enum
+{
+ PROP_ADDRESS = 1,
+ PROP_PORT,
+ PROP_SENDER_BUFFER,
+ PROP_MIN_RTCP_INTERVAL,
+ PROP_MAX_RTCP_BANDWIDTH,
+ PROP_STATS_UPDATE_INTERVAL,
+ PROP_STATS,
+ PROP_CNAME,
+ PROP_MULTICAST_LOOPBACK,
+ PROP_MULTICAST_IFACE,
+ PROP_MULTICAST_TTL
+};
+
+static GstStaticPadTemplate sink_templ = GST_STATIC_PAD_TEMPLATE ("sink",
+ GST_PAD_SINK,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS ("application/x-rtp"));
+
+struct _GstRistSink
+{
+ GstBin parent;
+
+ /* Elements contained in the pipeline */
+ GstElement *rtpbin;
+ GstElement *rtp_sink;
+ GstElement *rtcp_src;
+ GstElement *rtcp_sink;
+ GstElement *ssrc_filter;
+ GstPad *sinkpad;
+
+ /* RTX Elements */
+ GstElement *rtxbin;
+ GstElement *rtx_send;
+
+ /* For stats */
+ guint stats_interval;
+ guint32 rtp_ssrc;
+ guint32 rtcp_ssrc;
+ GstClockID stats_cid;
+
+ /* This is set whenever there is a pipeline construction failure, and used
+ * to fail state changes later */
+ gboolean construct_failed;
+ const gchar *missing_plugin;
+};
+
+G_DEFINE_TYPE_WITH_CODE (GstRistSink, gst_rist_sink, GST_TYPE_BIN,
+ GST_DEBUG_CATEGORY_INIT (gst_rist_sink_debug, "ristsink", 0, "RIST Sink"));
+
+static GstCaps *
+gst_rist_sink_request_pt_map (GstRistSrc * sink, GstElement * session, guint pt)
+{
+ const GstRTPPayloadInfo *pt_info;
+ GstCaps *ret;
+
+ pt_info = gst_rtp_payload_info_for_pt (pt);
+ if (!pt_info || !pt_info->clock_rate)
+ return NULL;
+
+ ret = gst_caps_new_simple ("application/x-rtp",
+ "media", G_TYPE_STRING, pt_info->media,
+ "encoding_name", G_TYPE_STRING, pt_info->encoding_name,
+ "clock-rate", G_TYPE_INT, (gint) pt_info->clock_rate, NULL);
+
+ /* FIXME add sprop-parameter-set if any */
+ g_warn_if_fail (pt_info->encoding_parameters == NULL);
+
+ return ret;
+}
+
+static GstElement *
+gst_rist_sink_request_aux_sender (GstRistSink * sink, guint session_id,
+ GstElement * rtpbin)
+{
+ if (session_id != 0)
+ return NULL;
+
+ return gst_object_ref (sink->rtxbin);
+}
+
+static void
+on_app_rtcp (GObject * session, guint32 subtype, guint32 ssrc,
+ const gchar * name, GstBuffer * data, GstElement * rtpsession)
+{
+ if (g_str_equal (name, "RIST")) {
+ GstEvent *event;
+ GstPad *send_rtp_sink;
+ GstMapInfo map;
+ gint i;
+
+ send_rtp_sink = gst_element_get_static_pad (rtpsession, "send_rtp_sink");
+ if (send_rtp_sink) {
+ gst_buffer_map (data, &map, GST_MAP_READ);
+
+ for (i = 0; i < map.size; i += sizeof (guint32)) {
+ guint32 dword = GST_READ_UINT32_BE (map.data + i);
+ guint16 seqnum = dword >> 16;
+ guint16 num = dword & 0x0000FFFF;
+ guint16 j;
+
+ GST_DEBUG ("got RIST nack packet, #%u %u", seqnum, num);
+
+ /* num is inclusive, i.e. it can be 0, which means exactly 1 seqnum */
+ for (j = 0; j <= num; j++) {
+ event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
+ gst_structure_new ("GstRTPRetransmissionRequest",
+ "seqnum", G_TYPE_UINT, (guint) seqnum + j,
+ "ssrc", G_TYPE_UINT, (guint) ssrc, NULL));
+ gst_pad_push_event (send_rtp_sink, event);
+ }
+ }
+
+ gst_buffer_unmap (data, &map);
+ gst_object_unref (send_rtp_sink);
+ }
+ }
+}
+
+static void
+gst_rist_sink_on_new_sender_ssrc (GstRistSink * sink, guint session_id,
+ guint ssrc, GstElement * rtpbin)
+{
+ GObject *gstsession = NULL;
+ GObject *session = NULL;
+ GObject *source = NULL;
+
+ if (session_id != 0)
+ return;
+
+ g_signal_emit_by_name (rtpbin, "get-session", session_id, &gstsession);
+ g_signal_emit_by_name (rtpbin, "get-internal-session", session_id, &session);
+ g_signal_emit_by_name (session, "get-source-by-ssrc", ssrc, &source);
+
+ if (ssrc & 1)
+ g_object_set (source, "disable-rtcp", TRUE, NULL);
+ else
+ g_signal_connect (session, "on-app-rtcp", (GCallback) on_app_rtcp,
+ gstsession);
+
+ g_object_unref (source);
+ g_object_unref (session);
+}
+
+static void
+gst_rist_sink_on_new_receiver_ssrc (GstRistSink * sink, guint session_id,
+ guint ssrc, GstElement * rtpbin)
+{
+ if (session_id != 0)
+ return;
+
+ GST_INFO_OBJECT (sink, "Got RTCP remote SSRC %u", ssrc);
+ sink->rtcp_ssrc = ssrc;
+}
+
+static GstPadProbeReturn
+gst_rist_sink_fix_collision (GstPad * pad, GstPadProbeInfo * info,
+ gpointer user_data)
+{
+ GstEvent *event = info->data;
+ const GstStructure *cs;
+ GstStructure *s;
+ guint ssrc;
+
+ /* We simply ignore collisions */
+ if (GST_EVENT_TYPE (event) != GST_EVENT_CUSTOM_UPSTREAM)
+ return GST_PAD_PROBE_OK;
+
+ cs = gst_event_get_structure (event);
+ if (!gst_structure_has_name (cs, "GstRTPCollision"))
+ return GST_PAD_PROBE_OK;
+
+ gst_structure_get_uint (cs, "suggested-ssrc", &ssrc);
+ if ((ssrc & 1) == 0)
+ return GST_PAD_PROBE_OK;
+
+ event = info->data = gst_event_make_writable (event);
+ /* we can drop the const qualifier as we ensured writability */
+ s = (GstStructure *) gst_event_get_structure (event);
+ gst_structure_set (s, "suggested-ssrc", G_TYPE_UINT, ssrc - 1, NULL);
+
+ return GST_PAD_PROBE_OK;
+}
+
+static gboolean
+gst_rist_sink_set_caps (GstRistSink * sink, GstCaps * caps)
+{
+ const GstStructure *s = gst_caps_get_structure (caps, 0);
+
+ if (!gst_structure_get_uint (s, "ssrc", &sink->rtp_ssrc)) {
+ GST_ELEMENT_ERROR (sink, CORE, NEGOTIATION, ("No 'ssrc' field in caps."),
+ (NULL));
+ return FALSE;
+ }
+
+ if (sink->rtp_ssrc & 1) {
+ GST_ELEMENT_ERROR (sink, CORE, NEGOTIATION,
+ ("Invalid RIST SSRC, LSB must be zero."), (NULL));
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+static gboolean
+gst_rist_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
+{
+ GstRistSink *sink = GST_RIST_SINK (parent);
+ GstCaps *caps;
+ gboolean ret = TRUE;
+
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_CAPS:
+ gst_event_parse_caps (event, &caps);
+ ret = gst_rist_sink_set_caps (sink, caps);
+ break;
+ default:
+ break;
+ }
+
+ if (ret)
+ ret = gst_pad_event_default (pad, parent, event);
+ else
+ gst_event_unref (event);
+
+ return ret;
+}
+
+static void
+gst_rist_sink_init (GstRistSink * sink)
+{
+ GstPad *ssrc_filter_sinkpad;
+ GstCaps *ssrc_caps;
+ GstPad *pad, *gpad;
+ GstStructure *sdes = NULL;
+
+ /* Construct the RIST RTP sender pipeline.
+ *
+ * capsfilter*-> [send_rtp_sink_%u] -------- [send_rtp_src_%u] -> udpsink
+ * | rtpbin |
+ * udpsrc -> [recv_rtcp_sink_%u] -------- [send_rtcp_src_%u] -> * udpsink
+ *
+ * * To select RIST compatible SSRC
+ */
+ sink->rtpbin = gst_element_factory_make ("rtpbin", "rist_send_rtbpin");
+ if (!sink->rtpbin) {
+ sink->missing_plugin = "rtpmanager";
+ goto missing_plugin;
+ }
+
+ /* RIST specification says the SDES should only contain the CNAME */
+ g_object_get (sink->rtpbin, "sdes", &sdes, NULL);
+ gst_structure_remove_field (sdes, "tool");
+
+ gst_bin_add (GST_BIN (sink), sink->rtpbin);
+ g_object_set (sink->rtpbin, "do-retransmission", TRUE,
+ "rtp-profile", 3 /* AVFP */ ,
+ "sdes", sdes, NULL);
+ gst_structure_free (sdes);
+
+ g_signal_connect_swapped (sink->rtpbin, "request-pt-map",
+ G_CALLBACK (gst_rist_sink_request_pt_map), sink);
+ g_signal_connect_swapped (sink->rtpbin, "request-aux-sender",
+ G_CALLBACK (gst_rist_sink_request_aux_sender), sink);
+ g_signal_connect_swapped (sink->rtpbin, "on-new-sender-ssrc",
+ G_CALLBACK (gst_rist_sink_on_new_sender_ssrc), sink);
+ g_signal_connect_swapped (sink->rtpbin, "on-new-ssrc",
+ G_CALLBACK (gst_rist_sink_on_new_receiver_ssrc), sink);
+
+ sink->rtxbin = gst_bin_new ("rist_send_rtxbin");
+ g_object_ref_sink (sink->rtxbin);
+ sink->rtx_send = gst_element_factory_make ("ristrtxsend", "rist_rtx_send");
+ gst_bin_add (GST_BIN (sink->rtxbin), sink->rtx_send);
+ g_object_set (sink->rtx_send, "max-size-packets", 0, NULL);
+
+ pad = gst_element_get_static_pad (sink->rtx_send, "sink");
+ gpad = gst_ghost_pad_new ("sink_0", pad);
+ gst_object_unref (pad);
+ gst_element_add_pad (sink->rtxbin, gpad);
+
+ pad = gst_element_get_static_pad (sink->rtx_send, "src");
+ gpad = gst_ghost_pad_new ("src_0", pad);
+ gst_object_unref (pad);
+ gst_element_add_pad (sink->rtxbin, gpad);
+
+ sink->rtp_sink = gst_element_factory_make ("udpsink", "rist_rtp_udpsink");
+ sink->rtcp_src = gst_element_factory_make ("udpsrc", "rist_rtcp_udpsrc");
+ sink->rtcp_sink = gst_element_factory_make ("udpsink", "rist_rtcp_udpsink");
+ if (!sink->rtp_sink || !sink->rtcp_src || !sink->rtcp_sink) {
+ g_clear_object (&sink->rtp_sink);
+ g_clear_object (&sink->rtcp_src);
+ g_clear_object (&sink->rtcp_sink);
+ sink->missing_plugin = "udp";
+ goto missing_plugin;
+ }
+ gst_bin_add_many (GST_BIN (sink), sink->rtp_sink, sink->rtcp_src,
+ sink->rtcp_sink, NULL);
+ gst_element_set_locked_state (sink->rtcp_src, TRUE);
+ gst_element_set_locked_state (sink->rtcp_sink, TRUE);
+
+ sink->ssrc_filter = gst_element_factory_make ("capsfilter",
+ "rist_ssrc_filter");
+ if (!sink->ssrc_filter) {
+ sink->missing_plugin = "coreelements";
+ goto missing_plugin;
+ }
+ gst_bin_add (GST_BIN (sink), sink->ssrc_filter);
+
+ sink->rtp_ssrc = g_random_int () & ~1;
+ ssrc_caps = gst_caps_new_simple ("application/x-rtp",
+ "ssrc", G_TYPE_UINT, sink->rtp_ssrc, NULL);
+ gst_caps_append_structure (ssrc_caps,
+ gst_structure_new_empty ("application/x-rtp"));
+ g_object_set (sink->ssrc_filter, "caps", ssrc_caps, NULL);
+ gst_caps_unref (ssrc_caps);
+ gst_element_link_pads (sink->ssrc_filter, "src", sink->rtpbin,
+ "send_rtp_sink_0");
+ gst_element_link_pads (sink->rtpbin, "send_rtp_src_0", sink->rtp_sink,
+ "sink");
+ gst_element_link_pads (sink->rtcp_src, "src", sink->rtpbin,
+ "recv_rtcp_sink_0");
+ gst_element_link_pads (sink->rtpbin, "send_rtcp_src_0", sink->rtcp_sink,
+ "sink");
+
+ ssrc_filter_sinkpad = gst_element_get_static_pad (sink->ssrc_filter, "sink");
+ sink->sinkpad = gst_ghost_pad_new_from_template ("sink", ssrc_filter_sinkpad,
+ gst_static_pad_template_get (&sink_templ));
+ gst_pad_set_event_function (sink->sinkpad, gst_rist_sink_event);
+ gst_element_add_pad (GST_ELEMENT (sink), sink->sinkpad);
+ gst_object_unref (ssrc_filter_sinkpad);
+
+ gst_pad_add_probe (sink->sinkpad, GST_PAD_PROBE_TYPE_EVENT_UPSTREAM,
+ gst_rist_sink_fix_collision, sink, NULL);
+
+ return;
+
+missing_plugin:
+ {
+ GST_ERROR_OBJECT (sink, "'%s' plugin is missing.", sink->missing_plugin);
+ sink->construct_failed = TRUE;
+ /* Just make our element valid, so we fail cleanly */
+ gst_element_add_pad (GST_ELEMENT (sink),
+ gst_pad_new_from_static_template (&sink_templ, "sink"));
+ }
+}
+
+static GstStateChangeReturn
+gst_rist_sink_start (GstRistSink * sink)
+{
+ GSocket *socket = NULL;
+ GInetAddress *iaddr = NULL;
+ gchar *remote_addr = NULL;
+ guint remote_port;
+ GError *error = NULL;
+
+ if (sink->construct_failed) {
+ GST_ELEMENT_ERROR (sink, CORE, MISSING_PLUGIN,
+ ("Your GStreamer installation is missing plugin '%s'",
+ sink->missing_plugin), (NULL));
+ return GST_STATE_CHANGE_FAILURE;
+ }
+
+ g_object_get (sink->rtcp_sink, "host", &remote_addr, "port", &remote_port,
+ NULL);
+
+ iaddr = g_inet_address_new_from_string (remote_addr);
+ if (!iaddr) {
+ GList *results;
+ GResolver *resolver = NULL;
+
+ resolver = g_resolver_get_default ();
+ results = g_resolver_lookup_by_name (resolver, remote_addr, NULL, &error);
+
+ if (!results) {
+ g_object_unref (resolver);
+ goto dns_resolve_failed;
+ }
+
+ iaddr = G_INET_ADDRESS (g_object_ref (results->data));
+
+ g_free (remote_addr);
+ remote_addr = g_inet_address_to_string (iaddr);
+
+ g_resolver_free_addresses (results);
+ g_object_unref (resolver);
+ }
+
+ if (g_inet_address_get_is_multicast (iaddr)) {
+ g_object_set (sink->rtcp_src, "address", remote_addr, "port", remote_port,
+ NULL);
+ } else {
+ const gchar *any_addr;
+
+ if (g_inet_address_get_family (iaddr) == G_SOCKET_FAMILY_IPV6)
+ any_addr = "::";
+ else
+ any_addr = "0.0.0.0";
+
+ g_object_set (sink->rtcp_src, "address", any_addr, "port", 0, NULL);
+ }
+ g_object_unref (iaddr);
+
+ gst_element_set_locked_state (sink->rtcp_src, FALSE);
+ gst_element_sync_state_with_parent (sink->rtcp_src);
+
+ /* share the socket created by the sink */
+ g_object_get (sink->rtcp_src, "used-socket", &socket, NULL);
+ g_object_set (sink->rtcp_sink, "socket", socket, "auto-multicast", FALSE,
+ "close-socket", FALSE, NULL);
+ g_object_unref (socket);
+
+ gst_element_set_locked_state (sink->rtcp_sink, FALSE);
+ gst_element_sync_state_with_parent (sink->rtcp_sink);
+
+ return GST_STATE_CHANGE_SUCCESS;
+
+dns_resolve_failed:
+ GST_ELEMENT_ERROR (sink, RESOURCE, NOT_FOUND,
+ ("Could not resolve hostname '%s'", remote_addr),
+ ("DNS resolver reported: %s", error->message));
+ g_free (remote_addr);
+ g_error_free (error);
+ return GST_STATE_CHANGE_FAILURE;
+}
+
+static GstStructure *
+gst_rist_sink_create_stats (GstRistSink * sink)
+{
+ GObject *session = NULL, *source = NULL;
+ GstStructure *sstats = NULL, *ret;
+ guint64 pkt_sent = 0, rtx_sent = 0, rtt;
+ guint rb_rtt = 0;
+
+ ret = gst_structure_new_empty ("rist/x-sender-stats");
+
+ g_signal_emit_by_name (sink->rtpbin, "get-internal-session", 0, &session);
+ if (!session)
+ return ret;
+
+ g_signal_emit_by_name (session, "get-source-by-ssrc", sink->rtp_ssrc,
+ &source);
+ if (source) {
+ g_object_get (source, "stats", &sstats, NULL);
+ gst_structure_get_uint64 (sstats, "packets-sent", &pkt_sent);
+ gst_structure_free (sstats);
+ g_clear_object (&source);
+ }
+
+ g_signal_emit_by_name (session, "get-source-by-ssrc", sink->rtcp_ssrc,
+ &source);
+ if (source) {
+ g_object_get (source, "stats", &sstats, NULL);
+ gst_structure_get_uint (sstats, "rb-round-trip", &rb_rtt);
+ gst_structure_free (sstats);
+ g_clear_object (&source);
+ }
+ g_object_unref (session);
+
+ g_object_get (sink->rtx_send, "num-rtx-packets", &rtx_sent, NULL);
+
+ /* rb_rtt is in Q16 in NTP time */
+ rtt = gst_util_uint64_scale (rb_rtt, GST_SECOND, 65536);
+
+ gst_structure_set (ret, "sent-original-packets", G_TYPE_UINT64, pkt_sent,
+ "sent-retransmitted-packets", G_TYPE_UINT64, rtx_sent,
+ "round-trip-time", G_TYPE_UINT64, rtt, NULL);
+
+ return ret;
+}
+
+static gboolean
+gst_rist_sink_dump_stats (GstClock * clock, GstClockTime time, GstClockID id,
+ gpointer user_data)
+{
+ GstRistSink *sink = GST_RIST_SINK (user_data);
+ GstStructure *stats = gst_rist_sink_create_stats (sink);
+
+ gst_println ("%s: %" GST_PTR_FORMAT, GST_OBJECT_NAME (sink), stats);
+
+ gst_structure_free (stats);
+ return TRUE;
+}
+
+static void
+gst_rist_sink_enable_stats_interval (GstRistSink * sink)
+{
+ GstClock *clock;
+ GstClockTime start, interval;
+
+ if (sink->stats_interval == 0)
+ return;
+
+ interval = sink->stats_interval * GST_MSECOND;
+ clock = gst_system_clock_obtain ();
+ start = gst_clock_get_time (clock) + interval;
+
+ sink->stats_cid = gst_clock_new_periodic_id (clock, start, interval);
+ gst_clock_id_wait_async (sink->stats_cid, gst_rist_sink_dump_stats,
+ gst_object_ref (sink), (GDestroyNotify) gst_object_unref);
+
+ gst_object_unref (clock);
+}
+
+static void
+gst_rist_sink_disable_stats_interval (GstRistSink * sink)
+{
+ if (sink->stats_cid) {
+ gst_clock_id_unschedule (sink->stats_cid);
+ gst_clock_id_unref (sink->stats_cid);
+ sink->stats_cid = NULL;
+ }
+}
+
+static GstStateChangeReturn
+gst_rist_sink_change_state (GstElement * element, GstStateChange transition)
+{
+ GstRistSink *sink = GST_RIST_SINK (element);
+ GstStateChangeReturn ret;
+
+ switch (transition) {
+ case GST_STATE_CHANGE_PAUSED_TO_READY:
+ gst_rist_sink_disable_stats_interval (sink);
+ break;
+ default:
+ break;
+ }
+
+ ret = GST_ELEMENT_CLASS (gst_rist_sink_parent_class)->change_state (element,
+ transition);
+
+ switch (transition) {
+ case GST_STATE_CHANGE_NULL_TO_READY:
+ ret = gst_rist_sink_start (sink);
+ break;
+ case GST_STATE_CHANGE_READY_TO_PAUSED:
+ gst_rist_sink_enable_stats_interval (sink);
+ break;
+ default:
+ break;
+ }
+
+ return ret;
+}
+
+static void
+gst_rist_sink_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec)
+{
+ GstRistSink *sink = GST_RIST_SINK (object);
+ GstElement *session = NULL;
+ GstClockTime interval;
+ GstStructure *sdes;
+
+ if (sink->construct_failed)
+ return;
+
+ switch (prop_id) {
+ case PROP_ADDRESS:
+ g_object_get_property (G_OBJECT (sink->rtp_sink), "host", value);
+ break;
+
+ case PROP_PORT:
+ g_object_get_property (G_OBJECT (sink->rtp_sink), "port", value);
+ break;
+
+ case PROP_SENDER_BUFFER:
+ g_object_get_property (G_OBJECT (sink->rtx_send), "max-size-time", value);
+ break;
+
+ case PROP_MIN_RTCP_INTERVAL:
+ g_signal_emit_by_name (sink->rtpbin, "get-session", 0, &session);
+ g_object_get (session, "rtcp-min-interval", &interval, NULL);
+ g_value_set_uint (value, (guint) (interval / GST_MSECOND));
+ g_object_unref (session);
+ break;
+
+ case PROP_MAX_RTCP_BANDWIDTH:
+ g_signal_emit_by_name (sink->rtpbin, "get-session", 0, &session);
+ g_object_get_property (G_OBJECT (session), "rtcp-fraction", value);
+ g_object_unref (session);
+ break;
+
+ case PROP_STATS_UPDATE_INTERVAL:
+ g_value_set_uint (value, sink->stats_interval);
+ break;
+
+ case PROP_STATS:
+ g_value_take_boxed (value, gst_rist_sink_create_stats (sink));
+ break;
+
+ case PROP_CNAME:
+ g_object_get (sink->rtpbin, "sdes", &sdes, NULL);
+ g_value_set_string (value, gst_structure_get_string (sdes, "cname"));
+ gst_structure_free (sdes);
+ break;
+
+ case PROP_MULTICAST_LOOPBACK:
+ g_object_get_property (G_OBJECT (sink->rtp_sink), "loop", value);
+ break;
+
+ case PROP_MULTICAST_IFACE:
+ g_object_get_property (G_OBJECT (sink->rtp_sink),
+ "multicast-iface", value);
+ break;
+
+ case PROP_MULTICAST_TTL:
+ g_object_get_property (G_OBJECT (sink->rtp_sink), "ttl-mc", value);
+ break;
+
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_rist_sink_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ GstRistSink *sink = GST_RIST_SINK (object);
+ GstElement *session = NULL;
+ GstStructure *sdes;
+
+ if (sink->construct_failed)
+ return;
+
+ switch (prop_id) {
+ case PROP_ADDRESS:
+ g_object_set_property (G_OBJECT (sink->rtp_sink), "host", value);
+ g_object_set_property (G_OBJECT (sink->rtcp_sink), "host", value);
+ break;
+
+ case PROP_PORT:{
+ guint port = g_value_get_uint (value);
+
+ /* According to 5.1.1, RTCP receiver port most be event number and RTCP
+ * port should be the RTP port + 1 */
+
+ if (port & 0x1) {
+ g_warning ("Invalid RIST port %u, should be an even number.", port);
+ return;
+ }
+
+ g_object_set (sink->rtp_sink, "port", port, NULL);
+ g_object_set (sink->rtcp_sink, "port", port + 1, NULL);
+ break;
+ }
+
+ case PROP_SENDER_BUFFER:
+ g_object_set (sink->rtx_send,
+ "max-size-time", g_value_get_uint (value), NULL);
+ break;
+
+ case PROP_MIN_RTCP_INTERVAL:
+ g_signal_emit_by_name (sink->rtpbin, "get-session", 0, &session);
+ g_object_set (session, "rtcp-min-interval",
+ g_value_get_uint (value) * GST_MSECOND, NULL);
+ g_object_unref (session);
+ break;
+
+ case PROP_MAX_RTCP_BANDWIDTH:
+ g_signal_emit_by_name (sink->rtpbin, "get-session", 0, &session);
+ g_object_set (session, "rtcp-fraction", g_value_get_double (value), NULL);
+ g_object_unref (session);
+ break;
+
+ case PROP_STATS_UPDATE_INTERVAL:
+ sink->stats_interval = g_value_get_uint (value);
+ break;
+
+ case PROP_CNAME:
+ g_object_get (sink->rtpbin, "sdes", &sdes, NULL);
+ gst_structure_set_value (sdes, "cname", value);
+ g_object_set (sink->rtpbin, "sdes", sdes, NULL);
+ gst_structure_free (sdes);
+ break;
+
+ case PROP_MULTICAST_LOOPBACK:
+ g_object_set_property (G_OBJECT (sink->rtp_sink), "loop", value);
+ g_object_set_property (G_OBJECT (sink->rtcp_sink), "loop", value);
+ break;
+
+ case PROP_MULTICAST_IFACE:
+ g_object_set_property (G_OBJECT (sink->rtp_sink),
+ "multicast-iface", value);
+ g_object_set_property (G_OBJECT (sink->rtcp_sink),
+ "multicast-iface", value);
+ break;
+
+ case PROP_MULTICAST_TTL:
+ g_object_set_property (G_OBJECT (sink->rtp_sink), "ttl-mc", value);
+ g_object_set_property (G_OBJECT (sink->rtcp_sink), "ttl-mc", value);
+ break;
+
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_rist_sink_finalize (GObject * object)
+{
+ GstRistSink *sink = GST_RIST_SINK (object);
+ g_clear_object (&sink->rtxbin);
+
+ G_OBJECT_CLASS (gst_rist_sink_parent_class)->finalize (object);
+}
+
+static void
+gst_rist_sink_class_init (GstRistSinkClass * klass)
+{
+ GstElementClass *element_class = (GstElementClass *) klass;
+ GObjectClass *object_class = (GObjectClass *) klass;
+
+ gst_element_class_set_metadata (element_class,
+ "RIST Sink", "Source/Network",
+ "Sink that implements RIST TR-06-1 streaming specification",
+ "Nicolas Dufresne <nicolas.dufresne@collabora.com");
+ gst_element_class_add_static_pad_template (element_class, &sink_templ);
+
+ element_class->change_state = gst_rist_sink_change_state;
+
+ object_class->get_property = gst_rist_sink_get_property;
+ object_class->set_property = gst_rist_sink_set_property;
+ object_class->finalize = gst_rist_sink_finalize;
+
+ g_object_class_install_property (object_class, PROP_ADDRESS,
+ g_param_spec_string ("address", "Address",
+ "Address to send packets to (can be IPv4 or IPv6).", "0.0.0.0",
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (object_class, PROP_PORT,
+ g_param_spec_uint ("port", "Port", "The port RTP packets will be sent, "
+ "RTCP port is derived from it, this port must be an even number.",
+ 2, 65534, 5004,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
+
+ g_object_class_install_property (object_class, PROP_SENDER_BUFFER,
+ g_param_spec_uint ("sender-buffer", "Sender Buffer",
+ "Size of the retransmission queue in ms", 0, G_MAXUINT, 1200,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
+
+ g_object_class_install_property (object_class, PROP_MIN_RTCP_INTERVAL,
+ g_param_spec_uint ("min-rtcp-interval", "Minimum RTCP Intercal",
+ "The minimum interval in ms between two regular successive RTCP "
+ "packets.", 0, 100, 100,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
+
+ g_object_class_install_property (object_class, PROP_MAX_RTCP_BANDWIDTH,
+ g_param_spec_double ("max-rtcp-bandwidth", "Maximum RTCP Bandwidth",
+ "The maximum bandwidth used for RTCP in fraction of RTP bandwdith",
+ 0.0, 0.05, 0.05,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
+
+ g_object_class_install_property (object_class, PROP_STATS_UPDATE_INTERVAL,
+ g_param_spec_uint ("stats-update-interval", "Statistics Update Interval",
+ "The interval between 'stats' update notification (0 disabled)",
+ 0, G_MAXUINT, 0,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
+
+ g_object_class_install_property (object_class, PROP_STATS,
+ g_param_spec_boxed ("stats", "Statistics",
+ "Statistic in a GstStructure named 'rist/x-sender-stats'",
+ GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (object_class, PROP_CNAME,
+ g_param_spec_string ("cname", "CName",
+ "Set the CNAME in the SDES block of the sender report.", NULL,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (object_class, PROP_MULTICAST_LOOPBACK,
+ g_param_spec_boolean ("multicast-loopback", "Multicast Loopback",
+ "When enabled, the packet will be received locally.", FALSE,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
+
+ g_object_class_install_property (object_class, PROP_MULTICAST_IFACE,
+ g_param_spec_string ("multicast-iface", "multicast-iface",
+ "The multicast interface to use to send packets.", NULL,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (object_class, PROP_MULTICAST_TTL,
+ g_param_spec_int ("multicast-ttl", "Multicast TTL",
+ "The multicast time-to-live parameter.", 0, 255, 1,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
+}
diff --git a/gst/rist/gstristsrc.c b/gst/rist/gstristsrc.c
new file mode 100644
index 000000000..9e8d403f6
--- /dev/null
+++ b/gst/rist/gstristsrc.c
@@ -0,0 +1,1021 @@
+/* GStreamer RIST plugin
+ * Copyright (C) 2019 Net Insight AB
+ * Author: Nicolas Dufresne <nicolas.dufresne@collabora.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * SECTION:element-ristsrc
+ * @title: ristsrc
+ * @see_also: ristsink
+ *
+ * This element implements RIST TR-06-1 Simple Profile receiver. The stream
+ * produced by this element will be RTP payloaded. This element also implement
+ * the URI scheme `rist://` allowing to render RIST streams in GStreamer based
+ * media players. The RIST uri handler also allow setting propertied through
+ * the URI query.
+ *
+ * ## Example launch line
+ * |[
+ * gst-launch-1.0 ristsrc address=0.0.0.0 port=5004 ! rtpmp2depay ! udpsink
+ * gst-play-1.0 "rist://0.0.0.0:5004?receiver-buffer=700"
+ * ]|
+ */
+
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gio/gio.h>
+#include <gst/net/net.h>
+#include <gst/rtp/rtp.h>
+
+/* for setsockopt() */
+#ifndef G_OS_WIN32
+#include <sys/types.h>
+#include <sys/socket.h>
+#endif
+
+#include "gstrist.h"
+
+GST_DEBUG_CATEGORY_STATIC (gst_rist_src_debug);
+#define GST_CAT_DEFAULT gst_rist_src_debug
+
+enum
+{
+ PROP_ADDRESS = 1,
+ PROP_PORT,
+ PROP_RECEIVER_BUFFER,
+ PROP_REORDER_SECTION,
+ PROP_MAX_RTX_RETRIES,
+ PROP_MIN_RTCP_INTERVAL,
+ PROP_MAX_RTCP_BANDWIDTH,
+ PROP_STATS_UPDATE_INTERVAL,
+ PROP_STATS,
+ PROP_CNAME,
+ PROP_MULTICAST_LOOPBACK,
+ PROP_MULTICAST_IFACE,
+ PROP_MULTICAST_TTL
+};
+
+static GstStaticPadTemplate src_templ = GST_STATIC_PAD_TEMPLATE ("src",
+ GST_PAD_SRC,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS ("application/x-rtp"));
+
+struct _GstRistSrc
+{
+ GstBin parent;
+
+ GstUri *uri;
+
+ /* Elements contained in the pipeline, the rtp/rtcp_src are 'udpsrc' */
+ GstElement *rtpbin;
+ GstElement *rtp_src;
+ GstElement *rtcp_src;
+ GstElement *rtcp_sink;
+ gulong rtcp_recv_probe;
+ gulong rtcp_send_probe;
+ GSocketAddress *rtcp_send_addr;
+ GstPad *srcpad;
+ gint multicast_ttl;
+
+ /* RTX Elements */
+ GstElement *rtxbin;
+ GstElement *rtx_receive;
+
+ /* For property handling */
+ guint reorder_section;
+ guint max_rtx_retries;
+
+ /* For stats */
+ guint stats_interval;
+ guint32 rtp_ssrc;
+ GstClockID stats_cid;
+ GstElement *jitterbuffer;
+
+ /* This is set whenever there is a pipeline construction failure, and used
+ * to fail state changes later */
+ gboolean construct_failed;
+ const gchar *missing_plugin;
+};
+
+static void gst_rist_src_uri_init (gpointer g_iface, gpointer iface_data);
+
+G_DEFINE_TYPE_WITH_CODE (GstRistSrc, gst_rist_src, GST_TYPE_BIN,
+ G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_rist_src_uri_init);
+ GST_DEBUG_CATEGORY_INIT (gst_rist_src_debug, "ristsrc", 0, "RIST Source"));
+
+static void
+gst_rist_src_pad_added (GstRistSrc * src, GstPad * new_pad, GstElement * rtpbin)
+{
+ GST_TRACE_OBJECT (src, "New pad '%s'.", GST_PAD_NAME (new_pad));
+
+ if (g_str_has_prefix (GST_PAD_NAME (new_pad), "recv_rtp_src_0_")) {
+ GST_DEBUG_OBJECT (src, "Using new pad '%s' as ghost pad target.",
+ GST_PAD_NAME (new_pad));
+ gst_ghost_pad_set_target (GST_GHOST_PAD (src->srcpad), new_pad);
+ }
+}
+
+static GstCaps *
+gst_rist_src_request_pt_map (GstRistSrc * src, GstElement * session, guint pt)
+{
+ const GstRTPPayloadInfo *pt_info;
+ GstCaps *ret;
+
+ pt_info = gst_rtp_payload_info_for_pt (pt);
+ if (!pt_info || !pt_info->clock_rate)
+ return NULL;
+
+ ret = gst_caps_new_simple ("application/x-rtp",
+ "media", G_TYPE_STRING, pt_info->media,
+ "encoding_name", G_TYPE_STRING, pt_info->encoding_name,
+ "clock-rate", G_TYPE_INT, (gint) pt_info->clock_rate, NULL);
+
+ /* FIXME add sprop-parameter-set if any */
+ g_warn_if_fail (pt_info->encoding_parameters == NULL);
+
+ return ret;
+}
+
+static GstElement *
+gst_rist_src_request_aux_receiver (GstRistSrc * src, guint session_id,
+ GstElement * rtpbin)
+{
+ if (session_id != 0)
+ return NULL;
+
+ return gst_object_ref (src->rtxbin);
+}
+
+/* Overrides the nack creation. Right now we don't send mixed NACKS type, we
+ * simply send a set of range NACK if it takes less space, or allow adding
+ * more seqnum. */
+static guint
+gst_rist_src_on_sending_nacks (GObject * session, guint sender_ssrc,
+ guint media_ssrc, GArray * nacks, GstBuffer * buffer, gpointer user_data)
+{
+ GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT;
+ GstRTCPPacket packet;
+ guint8 *app_data;
+ guint nacked_seqnums = 0;
+ guint range_size = 0;
+ guint n_rg_nacks = 0;
+ guint n_fb_nacks = 0;
+ guint16 seqnum;
+ guint i;
+ gint diff;
+
+ /* We'll assume that range will be best, and find how many generic NACK
+ * would have been created. If this number ends up being smaller, we will
+ * just remove the APP packet and return 0, leaving it to RTPSession to
+ * create the generic NACK.*/
+
+ gst_rtcp_buffer_map (buffer, GST_MAP_READWRITE, &rtcp);
+ if (!gst_rtcp_buffer_add_packet (&rtcp, GST_RTCP_TYPE_APP, &packet))
+ /* exit because the packet is full, will put next request in a
+ * further packet */
+ goto done;
+
+ gst_rtcp_packet_app_set_ssrc (&packet, media_ssrc);
+ gst_rtcp_packet_app_set_name (&packet, "RIST");
+
+ if (!gst_rtcp_packet_app_set_data_length (&packet, 1)) {
+ gst_rtcp_packet_remove (&packet);
+ GST_WARNING ("no range nacks fit in the packet");
+ goto done;
+ }
+
+ app_data = gst_rtcp_packet_app_get_data (&packet);
+ for (i = 0; i < nacks->len; i = nacked_seqnums) {
+ guint j;
+ seqnum = g_array_index (nacks, guint16, i);
+
+ if (!gst_rtcp_packet_app_set_data_length (&packet, n_rg_nacks + 1))
+ break;
+
+ n_rg_nacks++;
+ nacked_seqnums++;
+
+ for (j = i + 1; j < nacks->len; j++) {
+ guint16 next_seqnum = g_array_index (nacks, guint16, j);
+ diff = gst_rtp_buffer_compare_seqnum (seqnum, next_seqnum);
+ GST_TRACE ("[%u][%u] %u %u diff %i", i, j, seqnum, next_seqnum, diff);
+ if (diff > (j - i))
+ break;
+
+ nacked_seqnums++;
+ }
+
+ range_size = j - i - 1;
+ GST_WRITE_UINT32_BE (app_data, seqnum << 16 | range_size);
+ app_data += 4;
+ }
+
+ /* count how many FB NACK it would take to wrap nacked_seqnums */
+ seqnum = g_array_index (nacks, guint16, 0);
+ n_fb_nacks = 1;
+ for (i = 1; i < nacked_seqnums; i++) {
+ guint16 next_seqnum = g_array_index (nacks, guint16, i);
+ diff = gst_rtp_buffer_compare_seqnum (seqnum, next_seqnum);
+ if (diff > 16) {
+ n_fb_nacks++;
+ seqnum = next_seqnum;
+ }
+ }
+
+ if (n_fb_nacks <= n_rg_nacks) {
+ GST_DEBUG ("Not sending %u range nacks, as %u FB nacks will be smaller",
+ n_rg_nacks, n_fb_nacks);
+ gst_rtcp_packet_remove (&packet);
+ nacked_seqnums = 0;
+ goto done;
+ }
+
+ GST_DEBUG ("Sent %u seqnums into %u Range NACKs", nacked_seqnums, n_rg_nacks);
+
+done:
+ gst_rtcp_buffer_unmap (&rtcp);
+ return nacked_seqnums;
+}
+
+static void
+gst_rist_src_on_new_ssrc (GstRistSrc * src, guint session_id, guint ssrc,
+ GstElement * rtpbin)
+{
+ GObject *session = NULL;
+ GObject *source = NULL;
+
+ if (session_id != 0)
+ return;
+
+ g_signal_emit_by_name (rtpbin, "get-internal-session", session_id, &session);
+ g_signal_emit_by_name (session, "get-source-by-ssrc", ssrc, &source);
+
+ if (ssrc & 1)
+ g_object_set (source, "disable-rtcp", TRUE, "probation", 0, NULL);
+ else
+ g_signal_connect (session, "on-sending-nacks",
+ (GCallback) gst_rist_src_on_sending_nacks, NULL);
+
+ g_object_unref (source);
+ g_object_unref (session);
+}
+
+static void
+gst_rist_src_new_jitterbuffer (GstRistSrc * src, GstElement * jitterbuffer,
+ guint session, guint ssrc, GstElement * rtpbin)
+{
+ GST_OBJECT_LOCK (src);
+ g_object_set (jitterbuffer, "rtx-delay", src->reorder_section,
+ "rtx-max-retries", src->max_rtx_retries, NULL);
+
+ if ((ssrc & 1) == 0) {
+ GST_INFO_OBJECT (src, "Saving jitterbuffer for session %u ssrc %u",
+ session, ssrc);
+ g_clear_object (&src->jitterbuffer);
+ src->jitterbuffer = gst_object_ref (jitterbuffer);
+ src->rtp_ssrc = ssrc;
+ }
+
+ GST_OBJECT_UNLOCK (src);
+}
+
+static void
+gst_rist_src_init (GstRistSrc * src)
+{
+ GstPad *pad, *gpad;
+ GstStructure *sdes = NULL;
+
+ /* Construct the RIST RTP receiver pipeline.
+ *
+ * udpsrc -> [recv_rtp_sink_%u] -------- [recv_rtp_src_%u_%u_%u]
+ * | rtpbin |
+ * udpsrc -> [recv_rtcp_sink_%u] -------- [send_rtcp_src_%u] -> udpsink
+ *
+ * This pipeline is fixed for now, note that optionally an FEC stream could
+ * be added later.
+ */
+ src->srcpad = gst_ghost_pad_new_no_target_from_template ("src",
+ gst_static_pad_template_get (&src_templ));
+ gst_element_add_pad (GST_ELEMENT (src), src->srcpad);
+
+ src->rtpbin = gst_element_factory_make ("rtpbin", "rist_recv_rtbpin");
+ if (!src->rtpbin) {
+ src->missing_plugin = "rtpmanager";
+ goto missing_plugin;
+ }
+
+ /* RIST specification says the SDES should only contain the CNAME */
+ g_object_get (src->rtpbin, "sdes", &sdes, NULL);
+ gst_structure_remove_field (sdes, "tool");
+
+ gst_bin_add (GST_BIN (src), src->rtpbin);
+ g_object_set (src->rtpbin, "do-retransmission", TRUE,
+ "rtp-profile", 3 /* AVPF */ ,
+ "sdes", sdes, NULL);
+
+ gst_structure_free (sdes);
+
+ g_signal_connect_swapped (src->rtpbin, "request-pt-map",
+ G_CALLBACK (gst_rist_src_request_pt_map), src);
+ g_signal_connect_swapped (src->rtpbin, "request-aux-receiver",
+ G_CALLBACK (gst_rist_src_request_aux_receiver), src);
+
+ src->rtxbin = gst_bin_new ("rist_recv_rtxbin");
+ g_object_ref_sink (src->rtxbin);
+ src->rtx_receive = gst_element_factory_make ("ristrtxreceive",
+ "rist_rtx_receive");
+ gst_bin_add (GST_BIN (src->rtxbin), src->rtx_receive);
+
+ pad = gst_element_get_static_pad (src->rtx_receive, "sink");
+ gpad = gst_ghost_pad_new ("sink_0", pad);
+ gst_object_unref (pad);
+ gst_element_add_pad (src->rtxbin, gpad);
+
+ pad = gst_element_get_static_pad (src->rtx_receive, "src");
+ gpad = gst_ghost_pad_new ("src_0", pad);
+ gst_object_unref (pad);
+ gst_element_add_pad (src->rtxbin, gpad);
+
+ src->rtp_src = gst_element_factory_make ("udpsrc", "rist_rtp_udpsrc");
+ src->rtcp_src = gst_element_factory_make ("udpsrc", "rist_rtcp_udpsrc");
+ src->rtcp_sink =
+ gst_element_factory_make ("dynudpsink", "rist_rtcp_dynudpsink");
+ if (!src->rtp_src || !src->rtcp_src || !src->rtcp_sink) {
+ g_clear_object (&src->rtp_src);
+ g_clear_object (&src->rtcp_src);
+ g_clear_object (&src->rtcp_sink);
+ src->missing_plugin = "udp";
+ goto missing_plugin;
+ }
+ gst_bin_add_many (GST_BIN (src), src->rtp_src, src->rtcp_src,
+ src->rtcp_sink, NULL);
+ g_object_set (src->rtcp_sink, "sync", FALSE, "async", FALSE, NULL);
+ /* delay udpsink startup, we will give it the socket from the RTCP udpsrc,
+ * but socket can only be set in NULL state */
+ gst_element_set_locked_state (src->rtcp_sink, TRUE);
+
+ gst_element_link_pads (src->rtp_src, "src", src->rtpbin, "recv_rtp_sink_0");
+ gst_element_link_pads (src->rtcp_src, "src", src->rtpbin, "recv_rtcp_sink_0");
+ gst_element_link_pads (src->rtpbin, "send_rtcp_src_0",
+ src->rtcp_sink, "sink");
+
+ g_signal_connect_swapped (src->rtpbin, "pad-added",
+ G_CALLBACK (gst_rist_src_pad_added), src);
+ g_signal_connect_swapped (src->rtpbin, "on-new-ssrc",
+ G_CALLBACK (gst_rist_src_on_new_ssrc), src);
+ g_signal_connect_swapped (src->rtpbin, "new-jitterbuffer",
+ G_CALLBACK (gst_rist_src_new_jitterbuffer), src);
+
+ return;
+
+missing_plugin:
+ {
+ GST_ERROR_OBJECT (src, "'%s' plugin is missing.", src->missing_plugin);
+ src->construct_failed = TRUE;
+ }
+}
+
+static GstPadProbeReturn
+gst_rist_src_on_recv_rtcp (GstPad * pad, GstPadProbeInfo * info,
+ gpointer user_data)
+{
+ GstRistSrc *src = GST_RIST_SRC (user_data);
+ GstBuffer *buffer;
+ GstNetAddressMeta *meta;
+
+ if (info->type == GST_PAD_PROBE_TYPE_BUFFER_LIST) {
+ GstBufferList *buffer_list = info->data;
+ buffer = gst_buffer_list_get (buffer_list, 0);
+ } else {
+ buffer = info->data;
+ }
+
+ meta = gst_buffer_get_net_address_meta (buffer);
+
+ GST_OBJECT_LOCK (src);
+ g_clear_object (&src->rtcp_send_addr);
+ src->rtcp_send_addr = g_object_ref (meta->addr);
+ GST_OBJECT_UNLOCK (src);
+
+ return GST_PAD_PROBE_OK;
+}
+
+static inline void
+gst_rist_src_attach_net_address_meta (GstRistSrc * src, GstBuffer * buffer)
+{
+ GST_OBJECT_LOCK (src);
+ if (src->rtcp_send_addr)
+ gst_buffer_add_net_address_meta (buffer, src->rtcp_send_addr);
+ GST_OBJECT_UNLOCK (src);
+}
+
+static GstPadProbeReturn
+gst_rist_src_on_send_rtcp (GstPad * pad, GstPadProbeInfo * info,
+ gpointer user_data)
+{
+ GstRistSrc *src = GST_RIST_SRC (user_data);
+
+ if (info->type == GST_PAD_PROBE_TYPE_BUFFER_LIST) {
+ GstBufferList *buffer_list = info->data;
+ GstBuffer *buffer;
+ gint i;
+
+ info->data = buffer_list = gst_buffer_list_make_writable (buffer_list);
+ for (i = 0; i < gst_buffer_list_length (buffer_list); i++) {
+ buffer = gst_buffer_list_get (buffer_list, i);
+ gst_rist_src_attach_net_address_meta (src, buffer);
+ }
+ } else {
+ GstBuffer *buffer = info->data;
+ info->data = buffer = gst_buffer_make_writable (buffer);
+ gst_rist_src_attach_net_address_meta (src, buffer);
+ }
+
+ return GST_PAD_PROBE_OK;
+}
+
+static GstStateChangeReturn
+gst_rist_src_start (GstRistSrc * src)
+{
+ GstPad *pad;
+ GSocket *socket = NULL;
+ gchar *address;
+ guint rtcp_port;
+ GInetAddress *iaddr;
+
+ if (src->construct_failed) {
+ GST_ELEMENT_ERROR (src, CORE, MISSING_PLUGIN,
+ ("Your GStreamer installation is missing plugin '%s'",
+ src->missing_plugin), (NULL));
+ return GST_STATE_CHANGE_FAILURE;
+ }
+
+ g_object_get (src->rtcp_src, "used-socket", &socket,
+ "address", &address, "port", &rtcp_port, NULL);
+
+ iaddr = g_inet_address_new_from_string (address);
+ g_free (address);
+
+ if (g_inet_address_get_is_multicast (iaddr)) {
+ /* mc-ttl is not supported by dynudpsink */
+ g_socket_set_multicast_ttl (socket, src->multicast_ttl);
+ /* In multicast, send RTCP to the multicast group */
+ src->rtcp_send_addr = g_inet_socket_address_new (iaddr, rtcp_port);
+ } else {
+ /* In unicast, send RTCP to the detected sender address */
+ pad = gst_element_get_static_pad (src->rtcp_src, "src");
+ src->rtcp_recv_probe = gst_pad_add_probe (pad,
+ GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST,
+ gst_rist_src_on_recv_rtcp, src, NULL);
+ gst_object_unref (pad);
+ }
+ g_object_unref (iaddr);
+
+ pad = gst_element_get_static_pad (src->rtcp_sink, "sink");
+ src->rtcp_send_probe = gst_pad_add_probe (pad,
+ GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST,
+ gst_rist_src_on_send_rtcp, src, NULL);
+ gst_object_unref (pad);
+
+ /* share the socket created by the source */
+ g_object_set (src->rtcp_sink, "socket", socket, "close-socket", FALSE, NULL);
+ g_object_unref (socket);
+
+ gst_element_set_locked_state (src->rtcp_sink, FALSE);
+ gst_element_sync_state_with_parent (src->rtcp_sink);
+
+ return GST_STATE_CHANGE_SUCCESS;
+}
+
+static GstStructure *
+gst_rist_src_create_stats (GstRistSrc * src)
+{
+ GObject *session = NULL, *source = NULL;
+ GstStructure *stats = NULL, *ret;
+ guint64 dropped = 0, received = 0, recovered = 0, lost = 0;
+ guint64 duplicates = 0, rtx_sent = 0, rtt = 0;
+
+ ret = gst_structure_new_empty ("rist/x-receiver-stats");
+
+ g_signal_emit_by_name (src->rtpbin, "get-internal-session", 0, &session);
+ if (!session)
+ return ret;
+
+ g_signal_emit_by_name (session, "get-source-by-ssrc", src->rtp_ssrc, &source);
+ if (source) {
+ gint packets_lost;
+ g_object_get (source, "stats", &stats, NULL);
+ gst_structure_get_int (stats, "packets-lost", &packets_lost);
+ gst_structure_free (stats);
+ g_clear_object (&source);
+ dropped = MAX (packets_lost, 0);
+ }
+ g_object_unref (session);
+
+ if (src->jitterbuffer) {
+ g_object_get (src->jitterbuffer, "stats", &stats, NULL);
+ gst_structure_get (stats, "num-pushed", G_TYPE_UINT64, &received,
+ "num-lost", G_TYPE_UINT64, &lost,
+ "rtx-count", G_TYPE_UINT64, &rtx_sent,
+ "num-duplicates", G_TYPE_UINT64, &duplicates,
+ "rtx-success-count", G_TYPE_UINT64, &recovered,
+ "rtx-rtt", G_TYPE_UINT64, &rtt, NULL);
+ gst_structure_free (stats);
+ }
+
+ gst_structure_set (ret, "dropped", G_TYPE_UINT64, dropped,
+ "received", G_TYPE_UINT64, received,
+ "recovered", G_TYPE_UINT64, recovered,
+ "permanently-lost", G_TYPE_UINT64, lost,
+ "duplicates", G_TYPE_UINT64, duplicates,
+ "retransmission-requests-sent", G_TYPE_UINT64, rtx_sent,
+ "rtx-roundtrip-time", G_TYPE_UINT64, rtt, NULL);
+
+ return ret;
+}
+
+static gboolean
+gst_rist_src_dump_stats (GstClock * clock, GstClockTime time, GstClockID id,
+ gpointer user_data)
+{
+ GstRistSrc *src = GST_RIST_SRC (user_data);
+ GstStructure *stats = gst_rist_src_create_stats (src);
+
+ gst_println ("%s: %" GST_PTR_FORMAT, GST_OBJECT_NAME (src), stats);
+
+ gst_structure_free (stats);
+ return TRUE;
+}
+
+static void
+gst_rist_src_enable_stats_interval (GstRistSrc * src)
+{
+ GstClock *clock;
+ GstClockTime start, interval;
+
+ if (src->stats_interval == 0)
+ return;
+
+ interval = src->stats_interval * GST_MSECOND;
+ clock = gst_system_clock_obtain ();
+ start = gst_clock_get_time (clock) + interval;
+
+ src->stats_cid = gst_clock_new_periodic_id (clock, start, interval);
+ gst_clock_id_wait_async (src->stats_cid, gst_rist_src_dump_stats,
+ gst_object_ref (src), (GDestroyNotify) gst_object_unref);
+
+ gst_object_unref (clock);
+}
+
+static void
+gst_rist_src_disable_stats_interval (GstRistSrc * src)
+{
+ if (src->stats_cid) {
+ gst_clock_id_unschedule (src->stats_cid);
+ gst_clock_id_unref (src->stats_cid);
+ src->stats_cid = NULL;
+ }
+}
+
+static void
+gst_rist_src_stop (GstRistSrc * src)
+{
+ GstPad *pad;
+
+ if (src->rtcp_recv_probe) {
+ pad = gst_element_get_static_pad (src->rtcp_src, "src");
+ gst_pad_remove_probe (pad, src->rtcp_recv_probe);
+ src->rtcp_recv_probe = 0;
+ gst_object_unref (pad);
+ }
+
+ pad = gst_element_get_static_pad (src->rtcp_sink, "sink");
+ gst_pad_remove_probe (pad, src->rtcp_send_probe);
+ src->rtcp_send_probe = 0;
+ gst_object_unref (pad);
+}
+
+static GstStateChangeReturn
+gst_rist_src_change_state (GstElement * element, GstStateChange transition)
+{
+ GstRistSrc *src = GST_RIST_SRC (element);
+ GstStateChangeReturn ret;
+
+ switch (transition) {
+ case GST_STATE_CHANGE_PAUSED_TO_READY:
+ gst_rist_src_disable_stats_interval (src);
+ break;
+ default:
+ break;
+ }
+
+ ret = GST_ELEMENT_CLASS (gst_rist_src_parent_class)->change_state (element,
+ transition);
+
+ switch (transition) {
+ case GST_STATE_CHANGE_NULL_TO_READY:
+ gst_rist_src_start (src);
+ break;
+ case GST_STATE_CHANGE_READY_TO_PAUSED:
+ gst_rist_src_enable_stats_interval (src);
+ break;
+ case GST_STATE_CHANGE_READY_TO_NULL:
+ gst_rist_src_stop (src);
+ break;
+ default:
+ break;
+ }
+
+ return ret;
+}
+
+static void
+gst_rist_src_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec)
+{
+ GstRistSrc *src = GST_RIST_SRC (object);
+ GstElement *session = NULL;
+ GstClockTime interval;
+ GstStructure *sdes;
+
+ if (src->construct_failed)
+ return;
+
+ switch (prop_id) {
+ case PROP_ADDRESS:
+ g_object_get_property (G_OBJECT (src->rtp_src), "address", value);
+ break;
+
+ case PROP_PORT:
+ g_object_get_property (G_OBJECT (src->rtp_src), "port", value);
+ break;
+
+ case PROP_RECEIVER_BUFFER:
+ g_object_get_property (G_OBJECT (src->rtpbin), "latency", value);
+ break;
+
+ case PROP_REORDER_SECTION:
+ GST_OBJECT_LOCK (src);
+ g_value_set_uint (value, src->reorder_section);
+ GST_OBJECT_UNLOCK (src);
+ break;
+
+ case PROP_MAX_RTX_RETRIES:
+ GST_OBJECT_LOCK (src);
+ g_value_set_uint (value, src->max_rtx_retries);
+ GST_OBJECT_UNLOCK (src);
+ break;
+
+ case PROP_MIN_RTCP_INTERVAL:
+ g_signal_emit_by_name (src->rtpbin, "get-session", 0, &session);
+ g_object_get (session, "rtcp-min-interval", &interval, NULL);
+ g_value_set_uint (value, (guint) (interval / GST_MSECOND));
+ g_object_unref (session);
+ break;
+
+ case PROP_MAX_RTCP_BANDWIDTH:
+ g_signal_emit_by_name (src->rtpbin, "get-session", 0, &session);
+ g_object_get_property (G_OBJECT (session), "rtcp-fraction", value);
+ g_object_unref (session);
+ break;
+
+ case PROP_STATS_UPDATE_INTERVAL:
+ g_value_set_uint (value, src->stats_interval);
+ break;
+
+ case PROP_STATS:
+ g_value_take_boxed (value, gst_rist_src_create_stats (src));
+ break;
+
+ case PROP_CNAME:
+ g_object_get (src->rtpbin, "sdes", &sdes, NULL);
+ g_value_set_string (value, gst_structure_get_string (sdes, "cname"));
+ gst_structure_free (sdes);
+ break;
+
+ case PROP_MULTICAST_LOOPBACK:
+ g_object_get_property (G_OBJECT (src->rtp_src), "loop", value);
+ break;
+
+ case PROP_MULTICAST_IFACE:
+ g_object_get_property (G_OBJECT (src->rtp_src), "multicast-iface", value);
+ break;
+
+ case PROP_MULTICAST_TTL:
+ g_value_set_int (value, src->multicast_ttl);
+ break;
+
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_rist_src_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ GstRistSrc *src = GST_RIST_SRC (object);
+ GstElement *session = NULL;
+ GstStructure *sdes;
+
+ if (src->construct_failed)
+ return;
+
+ switch (prop_id) {
+ case PROP_ADDRESS:
+ g_object_set_property (G_OBJECT (src->rtp_src), "address", value);
+ g_object_set_property (G_OBJECT (src->rtcp_src), "address", value);
+ break;
+
+ case PROP_PORT:{
+ guint port = g_value_get_uint (value);
+
+ /* According to 5.1.1, RTCP receiver port most be event number and RTCP
+ * port should be the RTP port + 1 */
+
+ if (port & 0x1) {
+ g_warning ("Invalid RIST port %u, should be an even number.", port);
+ return;
+ }
+
+ g_object_set (src->rtp_src, "port", port, NULL);
+ g_object_set (src->rtcp_src, "port", port + 1, NULL);
+ break;
+ }
+
+ case PROP_RECEIVER_BUFFER:
+ g_object_set (src->rtpbin, "latency", g_value_get_uint (value), NULL);
+ break;
+
+ case PROP_REORDER_SECTION:
+ GST_OBJECT_LOCK (src);
+ src->reorder_section = g_value_get_uint (value);
+ GST_OBJECT_UNLOCK (src);
+ break;
+
+ case PROP_MAX_RTX_RETRIES:
+ GST_OBJECT_LOCK (src);
+ src->max_rtx_retries = g_value_get_uint (value);
+ GST_OBJECT_UNLOCK (src);
+ break;
+
+ case PROP_MIN_RTCP_INTERVAL:
+ g_signal_emit_by_name (src->rtpbin, "get-session", 0, &session);
+ g_object_set (session, "rtcp-min-interval",
+ g_value_get_uint (value) * GST_MSECOND, NULL);
+ g_object_unref (session);
+ break;
+
+ case PROP_MAX_RTCP_BANDWIDTH:
+ g_signal_emit_by_name (src->rtpbin, "get-session", 0, &session);
+ g_object_set (session, "rtcp-fraction", g_value_get_double (value), NULL);
+ g_object_unref (session);
+ break;
+
+ case PROP_STATS_UPDATE_INTERVAL:
+ src->stats_interval = g_value_get_uint (value);
+ break;
+
+ case PROP_CNAME:
+ g_object_get (src->rtpbin, "sdes", &sdes, NULL);
+ gst_structure_set_value (sdes, "cname", value);
+ g_object_set (src->rtpbin, "sdes", sdes, NULL);
+ gst_structure_free (sdes);
+ break;
+
+ case PROP_MULTICAST_LOOPBACK:
+ g_object_set_property (G_OBJECT (src->rtp_src), "loop", value);
+ g_object_set_property (G_OBJECT (src->rtcp_src), "loop", value);
+ break;
+
+ case PROP_MULTICAST_IFACE:
+ g_object_set_property (G_OBJECT (src->rtp_src), "multicast-iface", value);
+ g_object_set_property (G_OBJECT (src->rtcp_src),
+ "multicast-iface", value);
+ break;
+
+ case PROP_MULTICAST_TTL:
+ src->multicast_ttl = g_value_get_int (value);
+ break;
+
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_rist_src_finalize (GObject * object)
+{
+ GstRistSrc *src = GST_RIST_SRC (object);
+
+ if (src->jitterbuffer)
+ gst_object_unref (src->jitterbuffer);
+
+ gst_object_unref (src->rtxbin);
+
+ G_OBJECT_CLASS (gst_rist_src_parent_class)->finalize (object);
+}
+
+static void
+gst_rist_src_class_init (GstRistSrcClass * klass)
+{
+ GstElementClass *element_class = (GstElementClass *) klass;
+ GObjectClass *object_class = (GObjectClass *) klass;
+
+ gst_element_class_set_metadata (element_class,
+ "RIST Source", "Source/Network",
+ "Source that implements RIST TR-06-1 streaming specification",
+ "Nicolas Dufresne <nicolas.dufresne@collabora.com");
+ gst_element_class_add_static_pad_template (element_class, &src_templ);
+
+ element_class->change_state = gst_rist_src_change_state;
+
+ object_class->get_property = gst_rist_src_get_property;
+ object_class->set_property = gst_rist_src_set_property;
+ object_class->finalize = gst_rist_src_finalize;
+
+ g_object_class_install_property (object_class, PROP_ADDRESS,
+ g_param_spec_string ("address", "Address",
+ "Address to receive packets from (can be IPv4 or IPv6).", "0.0.0.0",
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (object_class, PROP_PORT,
+ g_param_spec_uint ("port", "Port", "The port to listen for RTP packets, "
+ "RTCP port is derived from it, this port must be an even number.",
+ 2, 65534, 5004,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
+
+ g_object_class_install_property (object_class, PROP_RECEIVER_BUFFER,
+ g_param_spec_uint ("receiver-buffer", "Receiver Buffer",
+ "Buffering duration in ms", 0, G_MAXUINT, 1000,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
+
+ g_object_class_install_property (object_class, PROP_REORDER_SECTION,
+ g_param_spec_uint ("reorder-section", "Recorder Section",
+ "Time to wait before sending retransmission request in ms.",
+ 0, G_MAXUINT, 70,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
+
+ g_object_class_install_property (object_class, PROP_MAX_RTX_RETRIES,
+ g_param_spec_uint ("max-rtx-retries", "Maximum Retransmission Retries",
+ "The maximum number of retransmission requests for a lost packet.",
+ 0, G_MAXUINT, 7,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
+
+ g_object_class_install_property (object_class, PROP_MIN_RTCP_INTERVAL,
+ g_param_spec_uint ("min-rtcp-interval", "Minimum RTCP Intercal",
+ "The minimum interval in ms between two successive RTCP packets",
+ 0, 100, 100,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
+
+ g_object_class_install_property (object_class, PROP_MAX_RTCP_BANDWIDTH,
+ g_param_spec_double ("max-rtcp-bandwidth", "Maximum RTCP Bandwidth",
+ "The maximum bandwidth used for RTCP in fraction of RTP bandwdith",
+ 0.0, 0.05, 0.05,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
+
+ g_object_class_install_property (object_class, PROP_STATS_UPDATE_INTERVAL,
+ g_param_spec_uint ("stats-update-interval", "Statistics Update Interval",
+ "The interval between 'stats' update notification (0 disabled)",
+ 0, G_MAXUINT, 0,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
+
+ g_object_class_install_property (object_class, PROP_STATS,
+ g_param_spec_boxed ("stats", "Statistics",
+ "Statistic in a GstStructure named 'rist/x-receiver-stats'",
+ GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (object_class, PROP_CNAME,
+ g_param_spec_string ("cname", "CName",
+ "Set the CNAME in the SDES block of the receiver report.", NULL,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (object_class, PROP_MULTICAST_LOOPBACK,
+ g_param_spec_boolean ("multicast-loopback", "Multicast Loopback",
+ "When enabled, the packet will be received locally.", FALSE,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
+
+ g_object_class_install_property (object_class, PROP_MULTICAST_IFACE,
+ g_param_spec_string ("multicast-iface", "multicast-iface",
+ "The multicast interface to use to send packets.", NULL,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (object_class, PROP_MULTICAST_TTL,
+ g_param_spec_int ("multicast-ttl", "Multicast TTL",
+ "The multicast time-to-live parameter.", 0, 255, 1,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
+}
+
+static GstURIType
+gst_rist_src_uri_get_type (GType type)
+{
+ return GST_URI_SRC;
+}
+
+static const gchar *const *
+gst_rist_src_uri_get_protocols (GType type)
+{
+ static const char *protocols[] = { "rist", NULL };
+ return protocols;
+}
+
+static gchar *
+gst_rist_src_uri_get_uri (GstURIHandler * handler)
+{
+ GstRistSrc *src = GST_RIST_SRC (handler);
+ gchar *uri = NULL;
+
+ GST_OBJECT_LOCK (src);
+ if (src->uri)
+ uri = gst_uri_to_string (src->uri);
+ GST_OBJECT_UNLOCK (src);
+
+ return uri;
+}
+
+static void
+gst_rist_src_uri_query_foreach (const gchar * key, const gchar * value,
+ GObject * src)
+{
+ if (g_str_equal (key, "async-handling")) {
+ GST_WARNING_OBJECT (src, "Setting '%s' property from URI is not allowed.",
+ key);
+ return;
+ }
+
+ GST_DEBUG_OBJECT (src, "Setting property '%s' to '%s'", key, value);
+ gst_util_set_object_arg (src, key, value);
+}
+
+static gboolean
+gst_rist_src_uri_set_uri (GstURIHandler * handler, const gchar * uri,
+ GError ** error)
+{
+ GstRistSrc *src = GST_RIST_SRC (handler);
+ GstUri *gsturi;
+ GHashTable *query_table;
+
+ if (GST_STATE (src) >= GST_STATE_PAUSED) {
+ g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_STATE,
+ "Changing the URI on ristsrc when it is running is not supported");
+ GST_ERROR_OBJECT (src, "%s", (*error)->message);
+ return FALSE;
+ }
+
+ if (!(gsturi = gst_uri_from_string (uri))) {
+ g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
+ "Could not parse URI");
+ GST_ERROR_OBJECT (src, "%s", (*error)->message);
+ gst_uri_unref (gsturi);
+ return FALSE;
+ }
+
+ GST_OBJECT_LOCK (src);
+ if (src->uri)
+ gst_uri_unref (src->uri);
+ src->uri = gst_uri_ref (gsturi);
+ GST_OBJECT_UNLOCK (src);
+
+ g_object_set (src, "address", gst_uri_get_host (gsturi), NULL);
+ if (gst_uri_get_port (gsturi))
+ g_object_set (src, "port", gst_uri_get_port (gsturi), NULL);
+
+ query_table = gst_uri_get_query_table (gsturi);
+ if (query_table)
+ g_hash_table_foreach (query_table,
+ (GHFunc) gst_rist_src_uri_query_foreach, src);
+
+ gst_uri_unref (gsturi);
+ return TRUE;
+}
+
+static void
+gst_rist_src_uri_init (gpointer g_iface, gpointer iface_data)
+{
+ GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
+
+ iface->get_type = gst_rist_src_uri_get_type;
+ iface->get_protocols = gst_rist_src_uri_get_protocols;
+ iface->get_uri = gst_rist_src_uri_get_uri;
+ iface->set_uri = gst_rist_src_uri_set_uri;
+}
diff --git a/gst/rist/meson.build b/gst/rist/meson.build
new file mode 100644
index 000000000..9b8cf9897
--- /dev/null
+++ b/gst/rist/meson.build
@@ -0,0 +1,17 @@
+rist_sources = [
+ 'gstristrtxsend.c',
+ 'gstristrtxreceive.c',
+ 'gstristsrc.c',
+ 'gstristsink.c',
+ 'gstristplugin.c',
+]
+
+gstrist = library('gstrist',
+ rist_sources,
+ c_args : gst_plugins_bad_args,
+ include_directories : [configinc],
+ dependencies : [gstrtp_dep, gstnet_dep, gio_dep],
+ install : true,
+ install_dir : plugins_install_dir,
+)
+pkgconfig.generate(gstrist, install_dir : plugins_pkgconfig_install_dir)
diff --git a/meson_options.txt b/meson_options.txt
index 743951350..4d067f1e8 100644
--- a/meson_options.txt
+++ b/meson_options.txt
@@ -48,6 +48,7 @@ option('pnm', type : 'feature', value : 'auto')
option('proxy', type : 'feature', value : 'auto')
option('rawparse', type : 'feature', value : 'auto')
option('removesilence', type : 'feature', value : 'auto')
+option('rist', type : 'feature', value : 'auto')
option('sdp', type : 'feature', value : 'auto')
option('segmentclip', type : 'feature', value : 'auto')
option('siren', type : 'feature', value : 'auto')