diff options
-rw-r--r-- | configure.ac | 2 | ||||
-rw-r--r-- | gst/meson.build | 4 | ||||
-rw-r--r-- | gst/rist/Makefile.am | 23 | ||||
-rw-r--r-- | gst/rist/gstrist.h | 59 | ||||
-rw-r--r-- | gst/rist/gstristplugin.c | 50 | ||||
-rw-r--r-- | gst/rist/gstristrtxreceive.c | 302 | ||||
-rw-r--r-- | gst/rist/gstristrtxsend.c | 772 | ||||
-rw-r--r-- | gst/rist/gstristsink.c | 838 | ||||
-rw-r--r-- | gst/rist/gstristsrc.c | 1021 | ||||
-rw-r--r-- | gst/rist/meson.build | 17 | ||||
-rw-r--r-- | meson_options.txt | 1 |
11 files changed, 3087 insertions, 2 deletions
diff --git a/configure.ac b/configure.ac index 5114e36fc..cb72ee853 100644 --- a/configure.ac +++ b/configure.ac @@ -478,6 +478,7 @@ AG_GST_CHECK_PLUGIN(pnm) AG_GST_CHECK_PLUGIN(proxy) AG_GST_CHECK_PLUGIN(rawparse) AG_GST_CHECK_PLUGIN(removesilence) +AG_GST_CHECK_PLUGIN(rist) AG_GST_CHECK_PLUGIN(sdp) AG_GST_CHECK_PLUGIN(segmentclip) AG_GST_CHECK_PLUGIN(siren) @@ -2556,6 +2557,7 @@ gst/pnm/Makefile gst/proxy/Makefile gst/rawparse/Makefile gst/removesilence/Makefile +gst/rist/Makefile gst/sdp/Makefile gst/segmentclip/Makefile gst/siren/Makefile diff --git a/gst/meson.build b/gst/meson.build index 8c8349802..f6b306c1e 100644 --- a/gst/meson.build +++ b/gst/meson.build @@ -8,8 +8,8 @@ foreach plugin : ['accurip', 'adpcmdec', 'adpcmenc', 'aiff', 'asfmux', 'ivfparse', 'ivtc', 'jp2kdecimator', 'jpegformat', 'librfb', 'midi', 'mpegdemux', 'mpegpsmux', 'mpegtsdemux', 'mpegtsmux', 'mxf', 'netsim', 'onvif', 'pcapparse', 'pnm', 'proxy', - 'rawparse', 'removesilence', 'sdp', 'segmentclip', 'siren', - 'smooth', 'speed', 'subenc', 'timecode', + 'rawparse', 'removesilence', 'rist', 'sdp', 'segmentclip', + 'siren', 'smooth', 'speed', 'subenc', 'timecode', 'videofilters', 'videoframe_audiolevel', 'videoparsers', 'videosignal', 'vmnc', 'y4m', 'yadif'] if not get_option(plugin).disabled() diff --git a/gst/rist/Makefile.am b/gst/rist/Makefile.am new file mode 100644 index 000000000..d198f3f8e --- /dev/null +++ b/gst/rist/Makefile.am @@ -0,0 +1,23 @@ +plugin_LTLIBRARIES = libgstrist.la + +libgstrist_la_SOURCES = \ + gstristsrc.c \ + gstristsink.c \ + gstristrtxsend.c \ + gstristrtxreceive.c \ + gstristplugin.c + +noinst_HEADERS = \ + gstrist.h + +libgstrist_la_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) \ + $(GST_CFLAGS) \ + $(GIO_CFLAGS) +libgstrist_la_LIBADD = $(GST_PLUGINS_BASE_LIBS) \ + $(GST_BASE_LIBS) \ + -lgstrtp-@GST_API_VERSION@ \ + $(GST_NET_LIBS) \ + $(GST_LIBS) \ + $(GIO_LIBS) +libgstrist_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) +libgstrist_la_LIBTOOLFLAGS = $(GST_PLUGIN_LIBTOOLFLAGS) diff --git a/gst/rist/gstrist.h b/gst/rist/gstrist.h new file mode 100644 index 000000000..b4bcb6f04 --- /dev/null +++ b/gst/rist/gstrist.h @@ -0,0 +1,59 @@ +/* GStreamer RIST plugin + * Copyright (C) 2019 Net Insight AB + * Author: Nicolas Dufresne <nicolas.dufresne@collabora.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include <gst/gst.h> + +#ifndef __GST_RIST_H__ +#define __GST_RIST_H__ + +#define GST_TYPE_RIST_RTX_RECEIVE (gst_rist_rtx_receive_get_type()) +#define GST_RIST_RTX_RECEIVE(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RIST_RTX_RECEIVE, GstRistRtxReceive)) +typedef struct _GstRistRtxReceive GstRistRtxReceive; +typedef struct { + GstElementClass parent_class; +} GstRistRtxReceiveClass; +GType gst_rist_rtx_receive_get_type (void); + +#define GST_TYPE_RIST_RTX_SEND (gst_rist_rtx_send_get_type()) +#define GST_RIST_RTX_SEND(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RIST_RTX_SEND, GstRistRtxSend)) +typedef struct _GstRistRtxSend GstRistRtxSend; +typedef struct { + GstElementClass parent_class; +} GstRistRtxSendClass; +GType gst_rist_rtx_send_get_type (void); + +#define GST_TYPE_RIST_SRC (gst_rist_src_get_type()) +#define GST_RIST_SRC(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RIST_SRC,GstRistSrc)) +typedef struct _GstRistSrc GstRistSrc; +typedef struct { + GstBinClass parent; +} GstRistSrcClass; +GType gst_rist_src_get_type (void); + + +#define GST_TYPE_RIST_SINK (gst_rist_sink_get_type()) +#define GST_RIST_SINK(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RIST_SINK,GstRistSink)) +typedef struct _GstRistSink GstRistSink; +typedef struct { + GstBinClass parent; +} GstRistSinkClass; +GType gst_rist_sink_get_type (void); + +#endif diff --git a/gst/rist/gstristplugin.c b/gst/rist/gstristplugin.c new file mode 100644 index 000000000..cb6dcd361 --- /dev/null +++ b/gst/rist/gstristplugin.c @@ -0,0 +1,50 @@ +/* GStreamer RIST plugin + * Copyright (C) 2019 Net Insight AB + * Author: Nicolas Dufresne <nicolas.dufresne@collabora.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "gstrist.h" + +static gboolean +plugin_init (GstPlugin * plugin) +{ + if (!gst_element_register (plugin, "ristsrc", GST_RANK_PRIMARY, + GST_TYPE_RIST_SRC)) + return FALSE; + if (!gst_element_register (plugin, "ristsink", GST_RANK_PRIMARY, + GST_TYPE_RIST_SINK)) + return FALSE; + if (!gst_element_register (plugin, "ristrtxsend", GST_RANK_NONE, + GST_TYPE_RIST_RTX_SEND)) + return FALSE; + if (!gst_element_register (plugin, "ristrtxreceive", GST_RANK_NONE, + GST_TYPE_RIST_RTX_RECEIVE)) + return FALSE; + + return TRUE; +} + +GST_PLUGIN_DEFINE (GST_VERSION_MAJOR, + GST_VERSION_MINOR, + rist, + "Source and Sink for RIST TR-06-1 streaming specification", + plugin_init, VERSION, GST_LICENSE, GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN) diff --git a/gst/rist/gstristrtxreceive.c b/gst/rist/gstristrtxreceive.c new file mode 100644 index 000000000..fe8a6d5fa --- /dev/null +++ b/gst/rist/gstristrtxreceive.c @@ -0,0 +1,302 @@ +/* RTP Retransmission receiver element for GStreamer + * + * gstrtprtxreceive.c: + * + * Copyright (C) 2013-2019 Collabora Ltd. + * @author Julien Isorce <julien.isorce@collabora.co.uk> + * Nicolas Dufresne <nicolas.dufresne@collabora.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * SECTION:element-ristrtxreceive + * @title: ristrtxreceive + * @see_also: ristrtxsend + * + * This element translates RIST RTX packets into its original form with the + * %GST_RTP_BUFFER_FLAG_RETRANSMISSION flag set. This element is intented to + * be used by ristsrc element. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <gst/gst.h> +#include <gst/rtp/gstrtpbuffer.h> + +#include "gstrist.h" + +GST_DEBUG_CATEGORY_STATIC (gst_rist_rtx_receive_debug); +#define GST_CAT_DEFAULT gst_rist_rtx_receive_debug + +enum +{ + PROP_0, + PROP_NUM_RTX_REQUESTS, + PROP_NUM_RTX_PACKETS, + PROP_RIST +}; + +static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("application/x-rtp") + ); + +static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("application/x-rtp") + ); + +struct _GstRistRtxReceive +{ + GstElement element; + + /* pad */ + GstPad *sinkpad; + GstPad *srcpad; + + /* statistics */ + guint num_rtx_requests; + guint num_rtx_packets; + + GstClockTime last_time; +}; + +static gboolean gst_rist_rtx_receive_src_event (GstPad * pad, + GstObject * parent, GstEvent * event); +static GstFlowReturn gst_rist_rtx_receive_chain (GstPad * pad, + GstObject * parent, GstBuffer * buffer); +static GstStateChangeReturn gst_rist_rtx_receive_change_state (GstElement * + element, GstStateChange transition); +static void gst_rist_rtx_receive_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); + +G_DEFINE_TYPE_WITH_CODE (GstRistRtxReceive, gst_rist_rtx_receive, + GST_TYPE_ELEMENT, GST_DEBUG_CATEGORY_INIT (gst_rist_rtx_receive_debug, + "ristrtxreceive", 0, "RIST retransmission receiver")); + +static void +gst_rist_rtx_receive_class_init (GstRistRtxReceiveClass * klass) +{ + GObjectClass *gobject_class; + GstElementClass *gstelement_class; + + gobject_class = (GObjectClass *) klass; + gstelement_class = (GstElementClass *) klass; + + gobject_class->get_property = gst_rist_rtx_receive_get_property; + + g_object_class_install_property (gobject_class, PROP_NUM_RTX_REQUESTS, + g_param_spec_uint ("num-rtx-requests", "Num RTX Requests", + "Number of retransmission events received", 0, G_MAXUINT, + 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_NUM_RTX_PACKETS, + g_param_spec_uint ("num-rtx-packets", "Num RTX Packets", + " Number of retransmission packets received", 0, G_MAXUINT, + 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + + gst_element_class_add_static_pad_template (gstelement_class, &src_factory); + gst_element_class_add_static_pad_template (gstelement_class, &sink_factory); + + gst_element_class_set_static_metadata (gstelement_class, + "RIST Retransmission receiver", "Codec", + "Receive retransmitted RIST packets according to VSF TR-06-1", + "Nicolas Dufresne <nicolas.dufresne@collabora.com>"); + + gstelement_class->change_state = + GST_DEBUG_FUNCPTR (gst_rist_rtx_receive_change_state); +} + +static void +gst_rist_rtx_receive_reset (GstRistRtxReceive * rtx) +{ + GST_OBJECT_LOCK (rtx); + rtx->num_rtx_requests = 0; + rtx->num_rtx_packets = 0; + GST_OBJECT_UNLOCK (rtx); +} + +static void +gst_rist_rtx_receive_init (GstRistRtxReceive * rtx) +{ + GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtx); + + rtx->srcpad = + gst_pad_new_from_template (gst_element_class_get_pad_template (klass, + "src"), "src"); + GST_PAD_SET_PROXY_CAPS (rtx->srcpad); + GST_PAD_SET_PROXY_ALLOCATION (rtx->srcpad); + gst_pad_set_event_function (rtx->srcpad, + GST_DEBUG_FUNCPTR (gst_rist_rtx_receive_src_event)); + gst_element_add_pad (GST_ELEMENT (rtx), rtx->srcpad); + + rtx->sinkpad = + gst_pad_new_from_template (gst_element_class_get_pad_template (klass, + "sink"), "sink"); + GST_PAD_SET_PROXY_CAPS (rtx->sinkpad); + GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad); + gst_pad_set_chain_function (rtx->sinkpad, + GST_DEBUG_FUNCPTR (gst_rist_rtx_receive_chain)); + gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad); +} + +static gboolean +gst_rist_rtx_receive_src_event (GstPad * pad, GstObject * parent, + GstEvent * event) +{ + GstRistRtxReceive *rtx = GST_RIST_RTX_RECEIVE (parent); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_CUSTOM_UPSTREAM: + { + const GstStructure *s = gst_event_get_structure (event); + + /* This event usually comes from the downstream gstrtpjitterbuffer */ + if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) { +#ifndef GST_DISABLE_GST_DEBUG + guint seqnum = 0; + guint ssrc = 0; + + /* retrieve seqnum of the packet that need to be retransmitted */ + if (!gst_structure_get_uint (s, "seqnum", &seqnum)) + seqnum = -1; + + /* retrieve ssrc of the packet that need to be retransmitted + * it's useful when reconstructing the original packet from the rtx packet */ + if (!gst_structure_get_uint (s, "ssrc", &ssrc)) + ssrc = -1; + + GST_DEBUG_OBJECT (rtx, "got rtx request for seqnum: %u, ssrc: %X", + seqnum, ssrc); +#endif + + GST_OBJECT_LOCK (rtx); + /* increase number of seen requests for our statistics */ + ++rtx->num_rtx_requests; + GST_OBJECT_UNLOCK (rtx); + } + break; + } + default: + break; + } + + return gst_pad_event_default (pad, parent, event); +} + +static GstFlowReturn +gst_rist_rtx_receive_chain (GstPad * pad, GstObject * parent, + GstBuffer * buffer) +{ + GstRistRtxReceive *rtx = GST_RIST_RTX_RECEIVE (parent); + GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; + guint32 ssrc = 0; + guint16 seqnum = 0; + gboolean is_rtx; + + /* map current rtp packet to parse its header */ + if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)) + goto invalid_buffer; + + ssrc = gst_rtp_buffer_get_ssrc (&rtp); + seqnum = gst_rtp_buffer_get_seq (&rtp); + + /* check if we have a retransmission packet (this information comes from SDP) */ + GST_OBJECT_LOCK (rtx); + + /* RIST sets SSRC LSB to 1 to indicate an RTC packet */ + is_rtx = ssrc & 0x1; + rtx->last_time = GST_BUFFER_PTS (buffer); + + if (is_rtx) + /* increase our statistic */ + ++rtx->num_rtx_packets; + + GST_OBJECT_UNLOCK (rtx); + + /* create the retransmission packet */ + if (is_rtx) { + GST_DEBUG_OBJECT (rtx, + "Recovered packet from RIST RTX seqnum:%u ssrc: %u", + gst_rtp_buffer_get_seq (&rtp), gst_rtp_buffer_get_ssrc (&rtp)); + gst_rtp_buffer_set_ssrc (&rtp, ssrc & 0xFFFFFFFE); + GST_BUFFER_FLAG_SET (buffer, GST_RTP_BUFFER_FLAG_RETRANSMISSION); + } + + gst_rtp_buffer_unmap (&rtp); + + GST_TRACE_OBJECT (rtx, "pushing packet seqnum:%u from master stream " + "ssrc: %X", seqnum, ssrc); + return gst_pad_push (rtx->srcpad, buffer); + +invalid_buffer: + { + GST_ELEMENT_WARNING (rtx, STREAM, DECODE, (NULL), + ("Received invalid RTP payload, dropping")); + gst_buffer_unref (buffer); + return GST_FLOW_OK; + } +} + +static void +gst_rist_rtx_receive_get_property (GObject * object, + guint prop_id, GValue * value, GParamSpec * pspec) +{ + GstRistRtxReceive *rtx = GST_RIST_RTX_RECEIVE (object); + + switch (prop_id) { + case PROP_NUM_RTX_REQUESTS: + GST_OBJECT_LOCK (rtx); + g_value_set_uint (value, rtx->num_rtx_requests); + GST_OBJECT_UNLOCK (rtx); + break; + case PROP_NUM_RTX_PACKETS: + GST_OBJECT_LOCK (rtx); + g_value_set_uint (value, rtx->num_rtx_packets); + GST_OBJECT_UNLOCK (rtx); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static GstStateChangeReturn +gst_rist_rtx_receive_change_state (GstElement * element, + GstStateChange transition) +{ + GstRistRtxReceive *rtx = GST_RIST_RTX_RECEIVE (element); + GstStateChangeReturn ret; + + ret = + GST_ELEMENT_CLASS (gst_rist_rtx_receive_parent_class)->change_state + (element, transition); + + switch (transition) { + case GST_STATE_CHANGE_PAUSED_TO_READY: + gst_rist_rtx_receive_reset (rtx); + break; + default: + break; + } + + return ret; +} diff --git a/gst/rist/gstristrtxsend.c b/gst/rist/gstristrtxsend.c new file mode 100644 index 000000000..927fb1cf0 --- /dev/null +++ b/gst/rist/gstristrtxsend.c @@ -0,0 +1,772 @@ +/* RIST Retransmission sender element for GStreamer + * + * gsristprtxsend.c: + * + * Copyright (C) 2013-2019 Collabora Ltd. + * @author Julien Isorce <julien.isorce@collabora.co.uk> + * Nicoas Dufresne <nicolas.dufresne@collabora.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * SECTION:element-ristrtxsend + * @title: ristrtxsend + * @see_also: ristrtxreceive + * + * This elements replies to custom events 'GstRTPRetransmissionRequest' and + * when available sends in RIST form the lost packet. This element is intented + * to be used by ristsink element. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <gst/gst.h> +#include <gst/rtp/gstrtpbuffer.h> +#include <gst/base/gstdataqueue.h> + +#include "gstrist.h" + +GST_DEBUG_CATEGORY_STATIC (gst_rist_rtx_send_debug); +#define GST_CAT_DEFAULT gst_rist_rtx_send_debug + +#define DEFAULT_MAX_SIZE_TIME 0 +#define DEFAULT_MAX_SIZE_PACKETS 100 + +enum +{ + PROP_0, + PROP_MAX_SIZE_TIME, + PROP_MAX_SIZE_PACKETS, + PROP_NUM_RTX_REQUESTS, + PROP_NUM_RTX_PACKETS, +}; + +static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("application/x-rtp") + ); + +static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("application/x-rtp, " "clock-rate = (int) [1, MAX]") + ); + +struct _GstRistRtxSend +{ + GstElement element; + + /* pad */ + GstPad *sinkpad; + GstPad *srcpad; + + /* rtp packets that will be pushed out */ + GstDataQueue *queue; + + /* ssrc -> SSRCRtxData */ + GHashTable *ssrc_data; + /* rtx ssrc -> master ssrc */ + GHashTable *rtx_ssrcs; + + /* buffering control properties */ + guint max_size_time; + guint max_size_packets; + + /* statistics */ + guint num_rtx_requests; + guint num_rtx_packets; +}; + +static gboolean gst_rist_rtx_send_queue_check_full (GstDataQueue * queue, + guint visible, guint bytes, guint64 time, gpointer checkdata); + +static gboolean gst_rist_rtx_send_src_event (GstPad * pad, GstObject * parent, + GstEvent * event); +static gboolean gst_rist_rtx_send_sink_event (GstPad * pad, GstObject * parent, + GstEvent * event); +static GstFlowReturn gst_rist_rtx_send_chain (GstPad * pad, GstObject * parent, + GstBuffer * buffer); +static GstFlowReturn gst_rist_rtx_send_chain_list (GstPad * pad, + GstObject * parent, GstBufferList * list); + +static void gst_rist_rtx_send_src_loop (GstRistRtxSend * rtx); +static gboolean gst_rist_rtx_send_activate_mode (GstPad * pad, + GstObject * parent, GstPadMode mode, gboolean active); + +static GstStateChangeReturn gst_rist_rtx_send_change_state (GstElement * + element, GstStateChange transition); + +static void gst_rist_rtx_send_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); +static void gst_rist_rtx_send_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); +static void gst_rist_rtx_send_finalize (GObject * object); + +G_DEFINE_TYPE_WITH_CODE (GstRistRtxSend, gst_rist_rtx_send, GST_TYPE_ELEMENT, + GST_DEBUG_CATEGORY_INIT (gst_rist_rtx_send_debug, "ristrtxsend", 0, + "RIST retransmission sender")); + +typedef struct +{ + guint16 seqnum; + guint32 timestamp; + GstBuffer *buffer; +} BufferQueueItem; + +static void +buffer_queue_item_free (BufferQueueItem * item) +{ + gst_buffer_unref (item->buffer); + g_slice_free (BufferQueueItem, item); +} + +typedef struct +{ + guint32 rtx_ssrc; + guint16 seqnum_base, next_seqnum; + gint clock_rate; + + /* history of rtp packets */ + GSequence *queue; +} SSRCRtxData; + +static SSRCRtxData * +ssrc_rtx_data_new (guint32 rtx_ssrc) +{ + SSRCRtxData *data = g_slice_new0 (SSRCRtxData); + + data->rtx_ssrc = rtx_ssrc; + data->next_seqnum = data->seqnum_base = g_random_int_range (0, G_MAXUINT16); + data->queue = g_sequence_new ((GDestroyNotify) buffer_queue_item_free); + + return data; +} + +static void +ssrc_rtx_data_free (SSRCRtxData * data) +{ + g_sequence_free (data->queue); + g_slice_free (SSRCRtxData, data); +} + +static void +gst_rist_rtx_send_class_init (GstRistRtxSendClass * klass) +{ + GObjectClass *gobject_class; + GstElementClass *gstelement_class; + + gobject_class = (GObjectClass *) klass; + gstelement_class = (GstElementClass *) klass; + + gobject_class->get_property = gst_rist_rtx_send_get_property; + gobject_class->set_property = gst_rist_rtx_send_set_property; + gobject_class->finalize = gst_rist_rtx_send_finalize; + + g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME, + g_param_spec_uint ("max-size-time", "Max Size Time", + "Amount of ms to queue (0 = unlimited)", 0, G_MAXUINT, + DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_MAX_SIZE_PACKETS, + g_param_spec_uint ("max-size-packets", "Max Size Packets", + "Amount of packets to queue (0 = unlimited)", 0, G_MAXINT16, + DEFAULT_MAX_SIZE_PACKETS, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_NUM_RTX_REQUESTS, + g_param_spec_uint ("num-rtx-requests", "Num RTX Requests", + "Number of retransmission events received", 0, G_MAXUINT, + 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_NUM_RTX_PACKETS, + g_param_spec_uint ("num-rtx-packets", "Num RTX Packets", + " Number of retransmission packets sent", 0, G_MAXUINT, + 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + + gst_element_class_add_static_pad_template (gstelement_class, &src_factory); + gst_element_class_add_static_pad_template (gstelement_class, &sink_factory); + + gst_element_class_set_static_metadata (gstelement_class, + "RIST Retransmission Sender", "Codec", + "Retransmit RTP packets when needed, according to VSF TR-06-1", + "Nicolas Dufresne <nicolas.dufresne@collabora.com>"); + + gstelement_class->change_state = + GST_DEBUG_FUNCPTR (gst_rist_rtx_send_change_state); +} + +static void +gst_rist_rtx_send_reset (GstRistRtxSend * rtx) +{ + GST_OBJECT_LOCK (rtx); + gst_data_queue_flush (rtx->queue); + g_hash_table_remove_all (rtx->ssrc_data); + g_hash_table_remove_all (rtx->rtx_ssrcs); + rtx->num_rtx_requests = 0; + rtx->num_rtx_packets = 0; + GST_OBJECT_UNLOCK (rtx); +} + +static void +gst_rist_rtx_send_finalize (GObject * object) +{ + GstRistRtxSend *rtx = GST_RIST_RTX_SEND (object); + + g_hash_table_unref (rtx->ssrc_data); + g_hash_table_unref (rtx->rtx_ssrcs); + g_object_unref (rtx->queue); + + G_OBJECT_CLASS (gst_rist_rtx_send_parent_class)->finalize (object); +} + +static void +gst_rist_rtx_send_init (GstRistRtxSend * rtx) +{ + GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtx); + + rtx->srcpad = + gst_pad_new_from_template (gst_element_class_get_pad_template (klass, + "src"), "src"); + GST_PAD_SET_PROXY_CAPS (rtx->srcpad); + GST_PAD_SET_PROXY_ALLOCATION (rtx->srcpad); + gst_pad_set_event_function (rtx->srcpad, + GST_DEBUG_FUNCPTR (gst_rist_rtx_send_src_event)); + gst_pad_set_activatemode_function (rtx->srcpad, + GST_DEBUG_FUNCPTR (gst_rist_rtx_send_activate_mode)); + gst_element_add_pad (GST_ELEMENT (rtx), rtx->srcpad); + + rtx->sinkpad = + gst_pad_new_from_template (gst_element_class_get_pad_template (klass, + "sink"), "sink"); + GST_PAD_SET_PROXY_CAPS (rtx->sinkpad); + GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad); + gst_pad_set_event_function (rtx->sinkpad, + GST_DEBUG_FUNCPTR (gst_rist_rtx_send_sink_event)); + gst_pad_set_chain_function (rtx->sinkpad, + GST_DEBUG_FUNCPTR (gst_rist_rtx_send_chain)); + gst_pad_set_chain_list_function (rtx->sinkpad, + GST_DEBUG_FUNCPTR (gst_rist_rtx_send_chain_list)); + gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad); + + rtx->queue = gst_data_queue_new (gst_rist_rtx_send_queue_check_full, NULL, + NULL, rtx); + rtx->ssrc_data = g_hash_table_new_full (g_direct_hash, g_direct_equal, + NULL, (GDestroyNotify) ssrc_rtx_data_free); + rtx->rtx_ssrcs = g_hash_table_new (g_direct_hash, g_direct_equal); + + rtx->max_size_time = DEFAULT_MAX_SIZE_TIME; + rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS; +} + +static void +gst_rist_rtx_send_set_flushing (GstRistRtxSend * rtx, gboolean flush) +{ + GST_OBJECT_LOCK (rtx); + gst_data_queue_set_flushing (rtx->queue, flush); + gst_data_queue_flush (rtx->queue); + GST_OBJECT_UNLOCK (rtx); +} + +static gboolean +gst_rist_rtx_send_queue_check_full (GstDataQueue * queue, + guint visible, guint bytes, guint64 time, gpointer checkdata) +{ + return FALSE; +} + +static void +gst_rtp_rtx_data_queue_item_free (gpointer item) +{ + GstDataQueueItem *data = item; + if (data->object) + gst_mini_object_unref (data->object); + g_slice_free (GstDataQueueItem, data); +} + +static gboolean +gst_rist_rtx_send_push_out (GstRistRtxSend * rtx, gpointer object) +{ + GstDataQueueItem *data; + gboolean success; + + data = g_slice_new0 (GstDataQueueItem); + data->object = GST_MINI_OBJECT (object); + data->size = 1; + data->duration = 1; + data->visible = TRUE; + data->destroy = gst_rtp_rtx_data_queue_item_free; + + success = gst_data_queue_push (rtx->queue, data); + + if (!success) + data->destroy (data); + + return success; +} + +static SSRCRtxData * +gst_rist_rtx_send_get_ssrc_data (GstRistRtxSend * rtx, guint32 ssrc) +{ + SSRCRtxData *data; + guint32 rtx_ssrc = 0; + + data = g_hash_table_lookup (rtx->ssrc_data, GUINT_TO_POINTER (ssrc)); + if (!data) { + /* See 5.3.2 Retransmitted Packets, orignal packet have SSRC LSB set to + * 0, while RTX packet have LSB set to 1 */ + rtx_ssrc = ssrc + 1; + data = ssrc_rtx_data_new (rtx_ssrc); + g_hash_table_insert (rtx->ssrc_data, GUINT_TO_POINTER (ssrc), data); + g_hash_table_insert (rtx->rtx_ssrcs, GUINT_TO_POINTER (rtx_ssrc), + GUINT_TO_POINTER (ssrc)); + } + + return data; +} + +/* + * see RIST TR-06-1 5.3.2 Retransmitted Packets + * + * RIST simply resend the packet verbatim, with SSRC+1, the defaults SSRC always + * have the LSB set to 0, so we can differentiate the retransmission and the + * normal packet. + */ +static GstBuffer * +gst_rtp_rist_buffer_new (GstRistRtxSend * rtx, GstBuffer * buffer, guint32 ssrc) +{ + GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; + + buffer = gst_buffer_copy_deep (buffer); + gst_rtp_buffer_map (buffer, GST_MAP_WRITE, &rtp); + gst_rtp_buffer_set_ssrc (&rtp, ssrc + 1); + gst_rtp_buffer_unmap (&rtp); + + return buffer; +} + +static gint +buffer_queue_items_cmp (BufferQueueItem * a, BufferQueueItem * b, + gpointer user_data) +{ + /* gst_rtp_buffer_compare_seqnum returns the opposite of what we want, + * it returns negative when seqnum1 > seqnum2 and we want negative + * when b > a, i.e. a is smaller, so it comes first in the sequence */ + return gst_rtp_buffer_compare_seqnum (b->seqnum, a->seqnum); +} + +static gboolean +gst_rist_rtx_send_src_event (GstPad * pad, GstObject * parent, GstEvent * event) +{ + GstRistRtxSend *rtx = GST_RIST_RTX_SEND (parent); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_CUSTOM_UPSTREAM: + { + const GstStructure *s = gst_event_get_structure (event); + + /* This event usually comes from the downstream gstrtpsession */ + if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) { + guint seqnum = 0; + guint ssrc = 0; + GstBuffer *rtx_buf = NULL; + + /* retrieve seqnum of the packet that need to be retransmitted */ + if (!gst_structure_get_uint (s, "seqnum", &seqnum)) + seqnum = -1; + + /* retrieve ssrc of the packet that need to be retransmitted */ + if (!gst_structure_get_uint (s, "ssrc", &ssrc)) + ssrc = -1; + + GST_DEBUG_OBJECT (rtx, "got rtx request for seqnum: %u, ssrc: %X", + seqnum, ssrc); + + GST_OBJECT_LOCK (rtx); + /* check if request is for us */ + if (g_hash_table_contains (rtx->ssrc_data, GUINT_TO_POINTER (ssrc))) { + SSRCRtxData *data; + GSequenceIter *iter; + BufferQueueItem search_item; + + /* update statistics */ + ++rtx->num_rtx_requests; + + data = gst_rist_rtx_send_get_ssrc_data (rtx, ssrc); + + search_item.seqnum = seqnum; + iter = g_sequence_lookup (data->queue, &search_item, + (GCompareDataFunc) buffer_queue_items_cmp, NULL); + if (iter) { + BufferQueueItem *item = g_sequence_get (iter); + GST_LOG_OBJECT (rtx, "found %u", item->seqnum); + rtx_buf = gst_rtp_rist_buffer_new (rtx, item->buffer, ssrc); + } +#ifndef GST_DISABLE_DEBUG + else { + BufferQueueItem *item = NULL; + + iter = g_sequence_get_begin_iter (data->queue); + if (!g_sequence_iter_is_end (iter)) + item = g_sequence_get (iter); + + if (item && seqnum < item->seqnum) { + GST_DEBUG_OBJECT (rtx, "requested seqnum %u has already been " + "removed from the rtx queue; the first available is %u", + seqnum, item->seqnum); + } else { + GST_WARNING_OBJECT (rtx, "requested seqnum %u has not been " + "transmitted yet in the original stream; either the remote end " + "is not configured correctly, or the source is too slow", + seqnum); + } +#endif + } + } + GST_OBJECT_UNLOCK (rtx); + + if (rtx_buf) + gst_rist_rtx_send_push_out (rtx, rtx_buf); + + gst_event_unref (event); + return TRUE; + } + break; + } + default: + break; + } + + return gst_pad_event_default (pad, parent, event); +} + +static gboolean +gst_rist_rtx_send_sink_event (GstPad * pad, GstObject * parent, + GstEvent * event) +{ + GstRistRtxSend *rtx = GST_RIST_RTX_SEND (parent); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_FLUSH_START: + gst_pad_push_event (rtx->srcpad, event); + gst_rist_rtx_send_set_flushing (rtx, TRUE); + gst_pad_pause_task (rtx->srcpad); + return TRUE; + case GST_EVENT_FLUSH_STOP: + gst_pad_push_event (rtx->srcpad, event); + gst_rist_rtx_send_set_flushing (rtx, FALSE); + gst_pad_start_task (rtx->srcpad, + (GstTaskFunction) gst_rist_rtx_send_src_loop, rtx, NULL); + return TRUE; + case GST_EVENT_EOS: + GST_INFO_OBJECT (rtx, "Got EOS - enqueueing it"); + gst_rist_rtx_send_push_out (rtx, event); + return TRUE; + case GST_EVENT_CAPS: + { + GstCaps *caps; + GstStructure *s; + guint ssrc; + gint payload; + SSRCRtxData *data; + + gst_event_parse_caps (event, &caps); + + s = gst_caps_get_structure (caps, 0); + if (!gst_structure_get_uint (s, "ssrc", &ssrc)) + ssrc = -1; + if (!gst_structure_get_int (s, "payload", &payload)) + payload = -1; + + if (payload == -1) + GST_WARNING_OBJECT (rtx, "No payload in caps"); + + GST_OBJECT_LOCK (rtx); + data = gst_rist_rtx_send_get_ssrc_data (rtx, ssrc); + + GST_DEBUG_OBJECT (rtx, + "got caps for payload: %d->%d, ssrc: %u : %" GST_PTR_FORMAT, + payload, ssrc, data->rtx_ssrc, caps); + + gst_structure_get_int (s, "clock-rate", &data->clock_rate); + + /* The session might need to know the RTX ssrc */ + caps = gst_caps_copy (caps); + gst_caps_set_simple (caps, "rtx-ssrc", G_TYPE_UINT, data->rtx_ssrc, + "rtx-seqnum-offset", G_TYPE_UINT, data->seqnum_base, NULL); + + GST_DEBUG_OBJECT (rtx, "got clock-rate from caps: %d for ssrc: %u", + data->clock_rate, ssrc); + GST_OBJECT_UNLOCK (rtx); + + gst_event_unref (event); + event = gst_event_new_caps (caps); + gst_caps_unref (caps); + break; + } + default: + break; + } + + return gst_pad_event_default (pad, parent, event); +} + +/* like rtp_jitter_buffer_get_ts_diff() */ +static guint32 +gst_rist_rtx_send_get_ts_diff (SSRCRtxData * data) +{ + guint64 high_ts, low_ts; + BufferQueueItem *high_buf, *low_buf; + guint32 result; + + high_buf = + g_sequence_get (g_sequence_iter_prev (g_sequence_get_end_iter + (data->queue))); + low_buf = g_sequence_get (g_sequence_get_begin_iter (data->queue)); + + if (!high_buf || !low_buf || high_buf == low_buf) + return 0; + + high_ts = high_buf->timestamp; + low_ts = low_buf->timestamp; + + /* it needs to work if ts wraps */ + if (high_ts >= low_ts) { + result = (guint32) (high_ts - low_ts); + } else { + result = (guint32) (high_ts + G_MAXUINT32 + 1 - low_ts); + } + + /* return value in ms instead of clock ticks */ + return (guint32) gst_util_uint64_scale_int (result, 1000, data->clock_rate); +} + +/* Must be called with lock */ +static void +process_buffer (GstRistRtxSend * rtx, GstBuffer * buffer) +{ + GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; + BufferQueueItem *item; + SSRCRtxData *data; + guint16 seqnum; + guint32 ssrc, rtptime; + + /* read the information we want from the buffer */ + gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp); + seqnum = gst_rtp_buffer_get_seq (&rtp); + ssrc = gst_rtp_buffer_get_ssrc (&rtp); + rtptime = gst_rtp_buffer_get_timestamp (&rtp); + gst_rtp_buffer_unmap (&rtp); + + GST_TRACE_OBJECT (rtx, "Processing buffer seqnum: %u, ssrc: %X", seqnum, + ssrc); + + data = gst_rist_rtx_send_get_ssrc_data (rtx, ssrc); + + /* add current rtp buffer to queue history */ + item = g_slice_new0 (BufferQueueItem); + item->seqnum = seqnum; + item->timestamp = rtptime; + item->buffer = gst_buffer_ref (buffer); + g_sequence_append (data->queue, item); + + /* remove oldest packets from history if they are too many */ + if (rtx->max_size_packets) { + while (g_sequence_get_length (data->queue) > rtx->max_size_packets) + g_sequence_remove (g_sequence_get_begin_iter (data->queue)); + } + if (rtx->max_size_time) { + while (gst_rist_rtx_send_get_ts_diff (data) > rtx->max_size_time) + g_sequence_remove (g_sequence_get_begin_iter (data->queue)); + } +} + +static GstFlowReturn +gst_rist_rtx_send_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) +{ + GstRistRtxSend *rtx = GST_RIST_RTX_SEND (parent); + GstFlowReturn ret; + + GST_OBJECT_LOCK (rtx); + process_buffer (rtx, buffer); + GST_OBJECT_UNLOCK (rtx); + ret = gst_pad_push (rtx->srcpad, buffer); + + return ret; +} + +static gboolean +process_buffer_from_list (GstBuffer ** buffer, guint idx, gpointer user_data) +{ + process_buffer (user_data, *buffer); + return TRUE; +} + +static GstFlowReturn +gst_rist_rtx_send_chain_list (GstPad * pad, GstObject * parent, + GstBufferList * list) +{ + GstRistRtxSend *rtx = GST_RIST_RTX_SEND (parent); + GstFlowReturn ret; + + GST_OBJECT_LOCK (rtx); + gst_buffer_list_foreach (list, process_buffer_from_list, rtx); + GST_OBJECT_UNLOCK (rtx); + + ret = gst_pad_push_list (rtx->srcpad, list); + + return ret; +} + +static void +gst_rist_rtx_send_src_loop (GstRistRtxSend * rtx) +{ + GstDataQueueItem *data; + + if (gst_data_queue_pop (rtx->queue, &data)) { + GST_LOG_OBJECT (rtx, "pushing rtx buffer %p", data->object); + + if (G_LIKELY (GST_IS_BUFFER (data->object))) { + GST_OBJECT_LOCK (rtx); + /* Update statistics just before pushing. */ + rtx->num_rtx_packets++; + GST_OBJECT_UNLOCK (rtx); + + gst_pad_push (rtx->srcpad, GST_BUFFER (data->object)); + } else if (GST_IS_EVENT (data->object)) { + gst_pad_push_event (rtx->srcpad, GST_EVENT (data->object)); + + /* after EOS, we should not send any more buffers, + * even if there are more requests coming in */ + if (GST_EVENT_TYPE (data->object) == GST_EVENT_EOS) { + gst_rist_rtx_send_set_flushing (rtx, TRUE); + } + } else { + g_assert_not_reached (); + } + + data->object = NULL; /* we no longer own that object */ + data->destroy (data); + } else { + GST_LOG_OBJECT (rtx, "flushing"); + gst_pad_pause_task (rtx->srcpad); + } +} + +static gboolean +gst_rist_rtx_send_activate_mode (GstPad * pad, GstObject * parent, + GstPadMode mode, gboolean active) +{ + GstRistRtxSend *rtx = GST_RIST_RTX_SEND (parent); + gboolean ret = FALSE; + + switch (mode) { + case GST_PAD_MODE_PUSH: + if (active) { + gst_rist_rtx_send_set_flushing (rtx, FALSE); + ret = gst_pad_start_task (rtx->srcpad, + (GstTaskFunction) gst_rist_rtx_send_src_loop, rtx, NULL); + } else { + gst_rist_rtx_send_set_flushing (rtx, TRUE); + ret = gst_pad_stop_task (rtx->srcpad); + } + GST_INFO_OBJECT (rtx, "activate_mode: active %d, ret %d", active, ret); + break; + default: + break; + } + return ret; +} + +static void +gst_rist_rtx_send_get_property (GObject * object, + guint prop_id, GValue * value, GParamSpec * pspec) +{ + GstRistRtxSend *rtx = GST_RIST_RTX_SEND (object); + + switch (prop_id) { + case PROP_MAX_SIZE_TIME: + GST_OBJECT_LOCK (rtx); + g_value_set_uint (value, rtx->max_size_time); + GST_OBJECT_UNLOCK (rtx); + break; + case PROP_MAX_SIZE_PACKETS: + GST_OBJECT_LOCK (rtx); + g_value_set_uint (value, rtx->max_size_packets); + GST_OBJECT_UNLOCK (rtx); + break; + case PROP_NUM_RTX_REQUESTS: + GST_OBJECT_LOCK (rtx); + g_value_set_uint (value, rtx->num_rtx_requests); + GST_OBJECT_UNLOCK (rtx); + break; + case PROP_NUM_RTX_PACKETS: + GST_OBJECT_LOCK (rtx); + g_value_set_uint (value, rtx->num_rtx_packets); + GST_OBJECT_UNLOCK (rtx); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_rist_rtx_send_set_property (GObject * object, + guint prop_id, const GValue * value, GParamSpec * pspec) +{ + GstRistRtxSend *rtx = GST_RIST_RTX_SEND (object); + + switch (prop_id) { + case PROP_MAX_SIZE_TIME: + GST_OBJECT_LOCK (rtx); + rtx->max_size_time = g_value_get_uint (value); + GST_OBJECT_UNLOCK (rtx); + break; + case PROP_MAX_SIZE_PACKETS: + GST_OBJECT_LOCK (rtx); + rtx->max_size_packets = g_value_get_uint (value); + GST_OBJECT_UNLOCK (rtx); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static GstStateChangeReturn +gst_rist_rtx_send_change_state (GstElement * element, GstStateChange transition) +{ + GstStateChangeReturn ret; + GstRistRtxSend *rtx = GST_RIST_RTX_SEND (element); + + ret = + GST_ELEMENT_CLASS (gst_rist_rtx_send_parent_class)->change_state (element, + transition); + + switch (transition) { + case GST_STATE_CHANGE_PAUSED_TO_READY: + gst_rist_rtx_send_reset (rtx); + break; + default: + break; + } + + return ret; +} diff --git a/gst/rist/gstristsink.c b/gst/rist/gstristsink.c new file mode 100644 index 000000000..0bad78dac --- /dev/null +++ b/gst/rist/gstristsink.c @@ -0,0 +1,838 @@ +/* GStreamer RIST plugin + * Copyright (C) 2019 Net Insight AB + * Author: Nicolas Dufresne <nicolas.dufresne@collabora.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * SECTION:element-ristsink + * @title: ristsink + * @see_also: ristsrc + * + * This element implements RIST TR-06-1 Simple Profile transmitter. It + * currently supports any registered RTP payload types such as MPEG TS. The + * stream passed to this element must be RTP payloaded already. Even though + * RTP SSRC collision is rare in unidirectional streaming, this element expect + * the upstream elements to obey to collision events and change the SSRC in + * use. Collision will ocure when tranmitting and receiving over multicast on + * the same host. + * + * ## Example launch line + * |[ + * gst-launch-1.0 udpsrc ! tsparse set-timestamp=1 ! rtpmp2pay ! ristsink address=10.0.0.1 port=5004 + * ]| + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <gio/gio.h> +#include <gst/rtp/rtp.h> + +#include "gstrist.h" + +GST_DEBUG_CATEGORY_STATIC (gst_rist_sink_debug); +#define GST_CAT_DEFAULT gst_rist_sink_debug + +enum +{ + PROP_ADDRESS = 1, + PROP_PORT, + PROP_SENDER_BUFFER, + PROP_MIN_RTCP_INTERVAL, + PROP_MAX_RTCP_BANDWIDTH, + PROP_STATS_UPDATE_INTERVAL, + PROP_STATS, + PROP_CNAME, + PROP_MULTICAST_LOOPBACK, + PROP_MULTICAST_IFACE, + PROP_MULTICAST_TTL +}; + +static GstStaticPadTemplate sink_templ = GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("application/x-rtp")); + +struct _GstRistSink +{ + GstBin parent; + + /* Elements contained in the pipeline */ + GstElement *rtpbin; + GstElement *rtp_sink; + GstElement *rtcp_src; + GstElement *rtcp_sink; + GstElement *ssrc_filter; + GstPad *sinkpad; + + /* RTX Elements */ + GstElement *rtxbin; + GstElement *rtx_send; + + /* For stats */ + guint stats_interval; + guint32 rtp_ssrc; + guint32 rtcp_ssrc; + GstClockID stats_cid; + + /* This is set whenever there is a pipeline construction failure, and used + * to fail state changes later */ + gboolean construct_failed; + const gchar *missing_plugin; +}; + +G_DEFINE_TYPE_WITH_CODE (GstRistSink, gst_rist_sink, GST_TYPE_BIN, + GST_DEBUG_CATEGORY_INIT (gst_rist_sink_debug, "ristsink", 0, "RIST Sink")); + +static GstCaps * +gst_rist_sink_request_pt_map (GstRistSrc * sink, GstElement * session, guint pt) +{ + const GstRTPPayloadInfo *pt_info; + GstCaps *ret; + + pt_info = gst_rtp_payload_info_for_pt (pt); + if (!pt_info || !pt_info->clock_rate) + return NULL; + + ret = gst_caps_new_simple ("application/x-rtp", + "media", G_TYPE_STRING, pt_info->media, + "encoding_name", G_TYPE_STRING, pt_info->encoding_name, + "clock-rate", G_TYPE_INT, (gint) pt_info->clock_rate, NULL); + + /* FIXME add sprop-parameter-set if any */ + g_warn_if_fail (pt_info->encoding_parameters == NULL); + + return ret; +} + +static GstElement * +gst_rist_sink_request_aux_sender (GstRistSink * sink, guint session_id, + GstElement * rtpbin) +{ + if (session_id != 0) + return NULL; + + return gst_object_ref (sink->rtxbin); +} + +static void +on_app_rtcp (GObject * session, guint32 subtype, guint32 ssrc, + const gchar * name, GstBuffer * data, GstElement * rtpsession) +{ + if (g_str_equal (name, "RIST")) { + GstEvent *event; + GstPad *send_rtp_sink; + GstMapInfo map; + gint i; + + send_rtp_sink = gst_element_get_static_pad (rtpsession, "send_rtp_sink"); + if (send_rtp_sink) { + gst_buffer_map (data, &map, GST_MAP_READ); + + for (i = 0; i < map.size; i += sizeof (guint32)) { + guint32 dword = GST_READ_UINT32_BE (map.data + i); + guint16 seqnum = dword >> 16; + guint16 num = dword & 0x0000FFFF; + guint16 j; + + GST_DEBUG ("got RIST nack packet, #%u %u", seqnum, num); + + /* num is inclusive, i.e. it can be 0, which means exactly 1 seqnum */ + for (j = 0; j <= num; j++) { + event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, + gst_structure_new ("GstRTPRetransmissionRequest", + "seqnum", G_TYPE_UINT, (guint) seqnum + j, + "ssrc", G_TYPE_UINT, (guint) ssrc, NULL)); + gst_pad_push_event (send_rtp_sink, event); + } + } + + gst_buffer_unmap (data, &map); + gst_object_unref (send_rtp_sink); + } + } +} + +static void +gst_rist_sink_on_new_sender_ssrc (GstRistSink * sink, guint session_id, + guint ssrc, GstElement * rtpbin) +{ + GObject *gstsession = NULL; + GObject *session = NULL; + GObject *source = NULL; + + if (session_id != 0) + return; + + g_signal_emit_by_name (rtpbin, "get-session", session_id, &gstsession); + g_signal_emit_by_name (rtpbin, "get-internal-session", session_id, &session); + g_signal_emit_by_name (session, "get-source-by-ssrc", ssrc, &source); + + if (ssrc & 1) + g_object_set (source, "disable-rtcp", TRUE, NULL); + else + g_signal_connect (session, "on-app-rtcp", (GCallback) on_app_rtcp, + gstsession); + + g_object_unref (source); + g_object_unref (session); +} + +static void +gst_rist_sink_on_new_receiver_ssrc (GstRistSink * sink, guint session_id, + guint ssrc, GstElement * rtpbin) +{ + if (session_id != 0) + return; + + GST_INFO_OBJECT (sink, "Got RTCP remote SSRC %u", ssrc); + sink->rtcp_ssrc = ssrc; +} + +static GstPadProbeReturn +gst_rist_sink_fix_collision (GstPad * pad, GstPadProbeInfo * info, + gpointer user_data) +{ + GstEvent *event = info->data; + const GstStructure *cs; + GstStructure *s; + guint ssrc; + + /* We simply ignore collisions */ + if (GST_EVENT_TYPE (event) != GST_EVENT_CUSTOM_UPSTREAM) + return GST_PAD_PROBE_OK; + + cs = gst_event_get_structure (event); + if (!gst_structure_has_name (cs, "GstRTPCollision")) + return GST_PAD_PROBE_OK; + + gst_structure_get_uint (cs, "suggested-ssrc", &ssrc); + if ((ssrc & 1) == 0) + return GST_PAD_PROBE_OK; + + event = info->data = gst_event_make_writable (event); + /* we can drop the const qualifier as we ensured writability */ + s = (GstStructure *) gst_event_get_structure (event); + gst_structure_set (s, "suggested-ssrc", G_TYPE_UINT, ssrc - 1, NULL); + + return GST_PAD_PROBE_OK; +} + +static gboolean +gst_rist_sink_set_caps (GstRistSink * sink, GstCaps * caps) +{ + const GstStructure *s = gst_caps_get_structure (caps, 0); + + if (!gst_structure_get_uint (s, "ssrc", &sink->rtp_ssrc)) { + GST_ELEMENT_ERROR (sink, CORE, NEGOTIATION, ("No 'ssrc' field in caps."), + (NULL)); + return FALSE; + } + + if (sink->rtp_ssrc & 1) { + GST_ELEMENT_ERROR (sink, CORE, NEGOTIATION, + ("Invalid RIST SSRC, LSB must be zero."), (NULL)); + return FALSE; + } + + return TRUE; +} + +static gboolean +gst_rist_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) +{ + GstRistSink *sink = GST_RIST_SINK (parent); + GstCaps *caps; + gboolean ret = TRUE; + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_CAPS: + gst_event_parse_caps (event, &caps); + ret = gst_rist_sink_set_caps (sink, caps); + break; + default: + break; + } + + if (ret) + ret = gst_pad_event_default (pad, parent, event); + else + gst_event_unref (event); + + return ret; +} + +static void +gst_rist_sink_init (GstRistSink * sink) +{ + GstPad *ssrc_filter_sinkpad; + GstCaps *ssrc_caps; + GstPad *pad, *gpad; + GstStructure *sdes = NULL; + + /* Construct the RIST RTP sender pipeline. + * + * capsfilter*-> [send_rtp_sink_%u] -------- [send_rtp_src_%u] -> udpsink + * | rtpbin | + * udpsrc -> [recv_rtcp_sink_%u] -------- [send_rtcp_src_%u] -> * udpsink + * + * * To select RIST compatible SSRC + */ + sink->rtpbin = gst_element_factory_make ("rtpbin", "rist_send_rtbpin"); + if (!sink->rtpbin) { + sink->missing_plugin = "rtpmanager"; + goto missing_plugin; + } + + /* RIST specification says the SDES should only contain the CNAME */ + g_object_get (sink->rtpbin, "sdes", &sdes, NULL); + gst_structure_remove_field (sdes, "tool"); + + gst_bin_add (GST_BIN (sink), sink->rtpbin); + g_object_set (sink->rtpbin, "do-retransmission", TRUE, + "rtp-profile", 3 /* AVFP */ , + "sdes", sdes, NULL); + gst_structure_free (sdes); + + g_signal_connect_swapped (sink->rtpbin, "request-pt-map", + G_CALLBACK (gst_rist_sink_request_pt_map), sink); + g_signal_connect_swapped (sink->rtpbin, "request-aux-sender", + G_CALLBACK (gst_rist_sink_request_aux_sender), sink); + g_signal_connect_swapped (sink->rtpbin, "on-new-sender-ssrc", + G_CALLBACK (gst_rist_sink_on_new_sender_ssrc), sink); + g_signal_connect_swapped (sink->rtpbin, "on-new-ssrc", + G_CALLBACK (gst_rist_sink_on_new_receiver_ssrc), sink); + + sink->rtxbin = gst_bin_new ("rist_send_rtxbin"); + g_object_ref_sink (sink->rtxbin); + sink->rtx_send = gst_element_factory_make ("ristrtxsend", "rist_rtx_send"); + gst_bin_add (GST_BIN (sink->rtxbin), sink->rtx_send); + g_object_set (sink->rtx_send, "max-size-packets", 0, NULL); + + pad = gst_element_get_static_pad (sink->rtx_send, "sink"); + gpad = gst_ghost_pad_new ("sink_0", pad); + gst_object_unref (pad); + gst_element_add_pad (sink->rtxbin, gpad); + + pad = gst_element_get_static_pad (sink->rtx_send, "src"); + gpad = gst_ghost_pad_new ("src_0", pad); + gst_object_unref (pad); + gst_element_add_pad (sink->rtxbin, gpad); + + sink->rtp_sink = gst_element_factory_make ("udpsink", "rist_rtp_udpsink"); + sink->rtcp_src = gst_element_factory_make ("udpsrc", "rist_rtcp_udpsrc"); + sink->rtcp_sink = gst_element_factory_make ("udpsink", "rist_rtcp_udpsink"); + if (!sink->rtp_sink || !sink->rtcp_src || !sink->rtcp_sink) { + g_clear_object (&sink->rtp_sink); + g_clear_object (&sink->rtcp_src); + g_clear_object (&sink->rtcp_sink); + sink->missing_plugin = "udp"; + goto missing_plugin; + } + gst_bin_add_many (GST_BIN (sink), sink->rtp_sink, sink->rtcp_src, + sink->rtcp_sink, NULL); + gst_element_set_locked_state (sink->rtcp_src, TRUE); + gst_element_set_locked_state (sink->rtcp_sink, TRUE); + + sink->ssrc_filter = gst_element_factory_make ("capsfilter", + "rist_ssrc_filter"); + if (!sink->ssrc_filter) { + sink->missing_plugin = "coreelements"; + goto missing_plugin; + } + gst_bin_add (GST_BIN (sink), sink->ssrc_filter); + + sink->rtp_ssrc = g_random_int () & ~1; + ssrc_caps = gst_caps_new_simple ("application/x-rtp", + "ssrc", G_TYPE_UINT, sink->rtp_ssrc, NULL); + gst_caps_append_structure (ssrc_caps, + gst_structure_new_empty ("application/x-rtp")); + g_object_set (sink->ssrc_filter, "caps", ssrc_caps, NULL); + gst_caps_unref (ssrc_caps); + gst_element_link_pads (sink->ssrc_filter, "src", sink->rtpbin, + "send_rtp_sink_0"); + gst_element_link_pads (sink->rtpbin, "send_rtp_src_0", sink->rtp_sink, + "sink"); + gst_element_link_pads (sink->rtcp_src, "src", sink->rtpbin, + "recv_rtcp_sink_0"); + gst_element_link_pads (sink->rtpbin, "send_rtcp_src_0", sink->rtcp_sink, + "sink"); + + ssrc_filter_sinkpad = gst_element_get_static_pad (sink->ssrc_filter, "sink"); + sink->sinkpad = gst_ghost_pad_new_from_template ("sink", ssrc_filter_sinkpad, + gst_static_pad_template_get (&sink_templ)); + gst_pad_set_event_function (sink->sinkpad, gst_rist_sink_event); + gst_element_add_pad (GST_ELEMENT (sink), sink->sinkpad); + gst_object_unref (ssrc_filter_sinkpad); + + gst_pad_add_probe (sink->sinkpad, GST_PAD_PROBE_TYPE_EVENT_UPSTREAM, + gst_rist_sink_fix_collision, sink, NULL); + + return; + +missing_plugin: + { + GST_ERROR_OBJECT (sink, "'%s' plugin is missing.", sink->missing_plugin); + sink->construct_failed = TRUE; + /* Just make our element valid, so we fail cleanly */ + gst_element_add_pad (GST_ELEMENT (sink), + gst_pad_new_from_static_template (&sink_templ, "sink")); + } +} + +static GstStateChangeReturn +gst_rist_sink_start (GstRistSink * sink) +{ + GSocket *socket = NULL; + GInetAddress *iaddr = NULL; + gchar *remote_addr = NULL; + guint remote_port; + GError *error = NULL; + + if (sink->construct_failed) { + GST_ELEMENT_ERROR (sink, CORE, MISSING_PLUGIN, + ("Your GStreamer installation is missing plugin '%s'", + sink->missing_plugin), (NULL)); + return GST_STATE_CHANGE_FAILURE; + } + + g_object_get (sink->rtcp_sink, "host", &remote_addr, "port", &remote_port, + NULL); + + iaddr = g_inet_address_new_from_string (remote_addr); + if (!iaddr) { + GList *results; + GResolver *resolver = NULL; + + resolver = g_resolver_get_default (); + results = g_resolver_lookup_by_name (resolver, remote_addr, NULL, &error); + + if (!results) { + g_object_unref (resolver); + goto dns_resolve_failed; + } + + iaddr = G_INET_ADDRESS (g_object_ref (results->data)); + + g_free (remote_addr); + remote_addr = g_inet_address_to_string (iaddr); + + g_resolver_free_addresses (results); + g_object_unref (resolver); + } + + if (g_inet_address_get_is_multicast (iaddr)) { + g_object_set (sink->rtcp_src, "address", remote_addr, "port", remote_port, + NULL); + } else { + const gchar *any_addr; + + if (g_inet_address_get_family (iaddr) == G_SOCKET_FAMILY_IPV6) + any_addr = "::"; + else + any_addr = "0.0.0.0"; + + g_object_set (sink->rtcp_src, "address", any_addr, "port", 0, NULL); + } + g_object_unref (iaddr); + + gst_element_set_locked_state (sink->rtcp_src, FALSE); + gst_element_sync_state_with_parent (sink->rtcp_src); + + /* share the socket created by the sink */ + g_object_get (sink->rtcp_src, "used-socket", &socket, NULL); + g_object_set (sink->rtcp_sink, "socket", socket, "auto-multicast", FALSE, + "close-socket", FALSE, NULL); + g_object_unref (socket); + + gst_element_set_locked_state (sink->rtcp_sink, FALSE); + gst_element_sync_state_with_parent (sink->rtcp_sink); + + return GST_STATE_CHANGE_SUCCESS; + +dns_resolve_failed: + GST_ELEMENT_ERROR (sink, RESOURCE, NOT_FOUND, + ("Could not resolve hostname '%s'", remote_addr), + ("DNS resolver reported: %s", error->message)); + g_free (remote_addr); + g_error_free (error); + return GST_STATE_CHANGE_FAILURE; +} + +static GstStructure * +gst_rist_sink_create_stats (GstRistSink * sink) +{ + GObject *session = NULL, *source = NULL; + GstStructure *sstats = NULL, *ret; + guint64 pkt_sent = 0, rtx_sent = 0, rtt; + guint rb_rtt = 0; + + ret = gst_structure_new_empty ("rist/x-sender-stats"); + + g_signal_emit_by_name (sink->rtpbin, "get-internal-session", 0, &session); + if (!session) + return ret; + + g_signal_emit_by_name (session, "get-source-by-ssrc", sink->rtp_ssrc, + &source); + if (source) { + g_object_get (source, "stats", &sstats, NULL); + gst_structure_get_uint64 (sstats, "packets-sent", &pkt_sent); + gst_structure_free (sstats); + g_clear_object (&source); + } + + g_signal_emit_by_name (session, "get-source-by-ssrc", sink->rtcp_ssrc, + &source); + if (source) { + g_object_get (source, "stats", &sstats, NULL); + gst_structure_get_uint (sstats, "rb-round-trip", &rb_rtt); + gst_structure_free (sstats); + g_clear_object (&source); + } + g_object_unref (session); + + g_object_get (sink->rtx_send, "num-rtx-packets", &rtx_sent, NULL); + + /* rb_rtt is in Q16 in NTP time */ + rtt = gst_util_uint64_scale (rb_rtt, GST_SECOND, 65536); + + gst_structure_set (ret, "sent-original-packets", G_TYPE_UINT64, pkt_sent, + "sent-retransmitted-packets", G_TYPE_UINT64, rtx_sent, + "round-trip-time", G_TYPE_UINT64, rtt, NULL); + + return ret; +} + +static gboolean +gst_rist_sink_dump_stats (GstClock * clock, GstClockTime time, GstClockID id, + gpointer user_data) +{ + GstRistSink *sink = GST_RIST_SINK (user_data); + GstStructure *stats = gst_rist_sink_create_stats (sink); + + gst_println ("%s: %" GST_PTR_FORMAT, GST_OBJECT_NAME (sink), stats); + + gst_structure_free (stats); + return TRUE; +} + +static void +gst_rist_sink_enable_stats_interval (GstRistSink * sink) +{ + GstClock *clock; + GstClockTime start, interval; + + if (sink->stats_interval == 0) + return; + + interval = sink->stats_interval * GST_MSECOND; + clock = gst_system_clock_obtain (); + start = gst_clock_get_time (clock) + interval; + + sink->stats_cid = gst_clock_new_periodic_id (clock, start, interval); + gst_clock_id_wait_async (sink->stats_cid, gst_rist_sink_dump_stats, + gst_object_ref (sink), (GDestroyNotify) gst_object_unref); + + gst_object_unref (clock); +} + +static void +gst_rist_sink_disable_stats_interval (GstRistSink * sink) +{ + if (sink->stats_cid) { + gst_clock_id_unschedule (sink->stats_cid); + gst_clock_id_unref (sink->stats_cid); + sink->stats_cid = NULL; + } +} + +static GstStateChangeReturn +gst_rist_sink_change_state (GstElement * element, GstStateChange transition) +{ + GstRistSink *sink = GST_RIST_SINK (element); + GstStateChangeReturn ret; + + switch (transition) { + case GST_STATE_CHANGE_PAUSED_TO_READY: + gst_rist_sink_disable_stats_interval (sink); + break; + default: + break; + } + + ret = GST_ELEMENT_CLASS (gst_rist_sink_parent_class)->change_state (element, + transition); + + switch (transition) { + case GST_STATE_CHANGE_NULL_TO_READY: + ret = gst_rist_sink_start (sink); + break; + case GST_STATE_CHANGE_READY_TO_PAUSED: + gst_rist_sink_enable_stats_interval (sink); + break; + default: + break; + } + + return ret; +} + +static void +gst_rist_sink_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstRistSink *sink = GST_RIST_SINK (object); + GstElement *session = NULL; + GstClockTime interval; + GstStructure *sdes; + + if (sink->construct_failed) + return; + + switch (prop_id) { + case PROP_ADDRESS: + g_object_get_property (G_OBJECT (sink->rtp_sink), "host", value); + break; + + case PROP_PORT: + g_object_get_property (G_OBJECT (sink->rtp_sink), "port", value); + break; + + case PROP_SENDER_BUFFER: + g_object_get_property (G_OBJECT (sink->rtx_send), "max-size-time", value); + break; + + case PROP_MIN_RTCP_INTERVAL: + g_signal_emit_by_name (sink->rtpbin, "get-session", 0, &session); + g_object_get (session, "rtcp-min-interval", &interval, NULL); + g_value_set_uint (value, (guint) (interval / GST_MSECOND)); + g_object_unref (session); + break; + + case PROP_MAX_RTCP_BANDWIDTH: + g_signal_emit_by_name (sink->rtpbin, "get-session", 0, &session); + g_object_get_property (G_OBJECT (session), "rtcp-fraction", value); + g_object_unref (session); + break; + + case PROP_STATS_UPDATE_INTERVAL: + g_value_set_uint (value, sink->stats_interval); + break; + + case PROP_STATS: + g_value_take_boxed (value, gst_rist_sink_create_stats (sink)); + break; + + case PROP_CNAME: + g_object_get (sink->rtpbin, "sdes", &sdes, NULL); + g_value_set_string (value, gst_structure_get_string (sdes, "cname")); + gst_structure_free (sdes); + break; + + case PROP_MULTICAST_LOOPBACK: + g_object_get_property (G_OBJECT (sink->rtp_sink), "loop", value); + break; + + case PROP_MULTICAST_IFACE: + g_object_get_property (G_OBJECT (sink->rtp_sink), + "multicast-iface", value); + break; + + case PROP_MULTICAST_TTL: + g_object_get_property (G_OBJECT (sink->rtp_sink), "ttl-mc", value); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_rist_sink_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstRistSink *sink = GST_RIST_SINK (object); + GstElement *session = NULL; + GstStructure *sdes; + + if (sink->construct_failed) + return; + + switch (prop_id) { + case PROP_ADDRESS: + g_object_set_property (G_OBJECT (sink->rtp_sink), "host", value); + g_object_set_property (G_OBJECT (sink->rtcp_sink), "host", value); + break; + + case PROP_PORT:{ + guint port = g_value_get_uint (value); + + /* According to 5.1.1, RTCP receiver port most be event number and RTCP + * port should be the RTP port + 1 */ + + if (port & 0x1) { + g_warning ("Invalid RIST port %u, should be an even number.", port); + return; + } + + g_object_set (sink->rtp_sink, "port", port, NULL); + g_object_set (sink->rtcp_sink, "port", port + 1, NULL); + break; + } + + case PROP_SENDER_BUFFER: + g_object_set (sink->rtx_send, + "max-size-time", g_value_get_uint (value), NULL); + break; + + case PROP_MIN_RTCP_INTERVAL: + g_signal_emit_by_name (sink->rtpbin, "get-session", 0, &session); + g_object_set (session, "rtcp-min-interval", + g_value_get_uint (value) * GST_MSECOND, NULL); + g_object_unref (session); + break; + + case PROP_MAX_RTCP_BANDWIDTH: + g_signal_emit_by_name (sink->rtpbin, "get-session", 0, &session); + g_object_set (session, "rtcp-fraction", g_value_get_double (value), NULL); + g_object_unref (session); + break; + + case PROP_STATS_UPDATE_INTERVAL: + sink->stats_interval = g_value_get_uint (value); + break; + + case PROP_CNAME: + g_object_get (sink->rtpbin, "sdes", &sdes, NULL); + gst_structure_set_value (sdes, "cname", value); + g_object_set (sink->rtpbin, "sdes", sdes, NULL); + gst_structure_free (sdes); + break; + + case PROP_MULTICAST_LOOPBACK: + g_object_set_property (G_OBJECT (sink->rtp_sink), "loop", value); + g_object_set_property (G_OBJECT (sink->rtcp_sink), "loop", value); + break; + + case PROP_MULTICAST_IFACE: + g_object_set_property (G_OBJECT (sink->rtp_sink), + "multicast-iface", value); + g_object_set_property (G_OBJECT (sink->rtcp_sink), + "multicast-iface", value); + break; + + case PROP_MULTICAST_TTL: + g_object_set_property (G_OBJECT (sink->rtp_sink), "ttl-mc", value); + g_object_set_property (G_OBJECT (sink->rtcp_sink), "ttl-mc", value); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_rist_sink_finalize (GObject * object) +{ + GstRistSink *sink = GST_RIST_SINK (object); + g_clear_object (&sink->rtxbin); + + G_OBJECT_CLASS (gst_rist_sink_parent_class)->finalize (object); +} + +static void +gst_rist_sink_class_init (GstRistSinkClass * klass) +{ + GstElementClass *element_class = (GstElementClass *) klass; + GObjectClass *object_class = (GObjectClass *) klass; + + gst_element_class_set_metadata (element_class, + "RIST Sink", "Source/Network", + "Sink that implements RIST TR-06-1 streaming specification", + "Nicolas Dufresne <nicolas.dufresne@collabora.com"); + gst_element_class_add_static_pad_template (element_class, &sink_templ); + + element_class->change_state = gst_rist_sink_change_state; + + object_class->get_property = gst_rist_sink_get_property; + object_class->set_property = gst_rist_sink_set_property; + object_class->finalize = gst_rist_sink_finalize; + + g_object_class_install_property (object_class, PROP_ADDRESS, + g_param_spec_string ("address", "Address", + "Address to send packets to (can be IPv4 or IPv6).", "0.0.0.0", + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (object_class, PROP_PORT, + g_param_spec_uint ("port", "Port", "The port RTP packets will be sent, " + "RTCP port is derived from it, this port must be an even number.", + 2, 65534, 5004, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (object_class, PROP_SENDER_BUFFER, + g_param_spec_uint ("sender-buffer", "Sender Buffer", + "Size of the retransmission queue in ms", 0, G_MAXUINT, 1200, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (object_class, PROP_MIN_RTCP_INTERVAL, + g_param_spec_uint ("min-rtcp-interval", "Minimum RTCP Intercal", + "The minimum interval in ms between two regular successive RTCP " + "packets.", 0, 100, 100, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (object_class, PROP_MAX_RTCP_BANDWIDTH, + g_param_spec_double ("max-rtcp-bandwidth", "Maximum RTCP Bandwidth", + "The maximum bandwidth used for RTCP in fraction of RTP bandwdith", + 0.0, 0.05, 0.05, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (object_class, PROP_STATS_UPDATE_INTERVAL, + g_param_spec_uint ("stats-update-interval", "Statistics Update Interval", + "The interval between 'stats' update notification (0 disabled)", + 0, G_MAXUINT, 0, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (object_class, PROP_STATS, + g_param_spec_boxed ("stats", "Statistics", + "Statistic in a GstStructure named 'rist/x-sender-stats'", + GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (object_class, PROP_CNAME, + g_param_spec_string ("cname", "CName", + "Set the CNAME in the SDES block of the sender report.", NULL, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (object_class, PROP_MULTICAST_LOOPBACK, + g_param_spec_boolean ("multicast-loopback", "Multicast Loopback", + "When enabled, the packet will be received locally.", FALSE, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (object_class, PROP_MULTICAST_IFACE, + g_param_spec_string ("multicast-iface", "multicast-iface", + "The multicast interface to use to send packets.", NULL, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (object_class, PROP_MULTICAST_TTL, + g_param_spec_int ("multicast-ttl", "Multicast TTL", + "The multicast time-to-live parameter.", 0, 255, 1, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); +} diff --git a/gst/rist/gstristsrc.c b/gst/rist/gstristsrc.c new file mode 100644 index 000000000..9e8d403f6 --- /dev/null +++ b/gst/rist/gstristsrc.c @@ -0,0 +1,1021 @@ +/* GStreamer RIST plugin + * Copyright (C) 2019 Net Insight AB + * Author: Nicolas Dufresne <nicolas.dufresne@collabora.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * SECTION:element-ristsrc + * @title: ristsrc + * @see_also: ristsink + * + * This element implements RIST TR-06-1 Simple Profile receiver. The stream + * produced by this element will be RTP payloaded. This element also implement + * the URI scheme `rist://` allowing to render RIST streams in GStreamer based + * media players. The RIST uri handler also allow setting propertied through + * the URI query. + * + * ## Example launch line + * |[ + * gst-launch-1.0 ristsrc address=0.0.0.0 port=5004 ! rtpmp2depay ! udpsink + * gst-play-1.0 "rist://0.0.0.0:5004?receiver-buffer=700" + * ]| + */ + + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <gio/gio.h> +#include <gst/net/net.h> +#include <gst/rtp/rtp.h> + +/* for setsockopt() */ +#ifndef G_OS_WIN32 +#include <sys/types.h> +#include <sys/socket.h> +#endif + +#include "gstrist.h" + +GST_DEBUG_CATEGORY_STATIC (gst_rist_src_debug); +#define GST_CAT_DEFAULT gst_rist_src_debug + +enum +{ + PROP_ADDRESS = 1, + PROP_PORT, + PROP_RECEIVER_BUFFER, + PROP_REORDER_SECTION, + PROP_MAX_RTX_RETRIES, + PROP_MIN_RTCP_INTERVAL, + PROP_MAX_RTCP_BANDWIDTH, + PROP_STATS_UPDATE_INTERVAL, + PROP_STATS, + PROP_CNAME, + PROP_MULTICAST_LOOPBACK, + PROP_MULTICAST_IFACE, + PROP_MULTICAST_TTL +}; + +static GstStaticPadTemplate src_templ = GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("application/x-rtp")); + +struct _GstRistSrc +{ + GstBin parent; + + GstUri *uri; + + /* Elements contained in the pipeline, the rtp/rtcp_src are 'udpsrc' */ + GstElement *rtpbin; + GstElement *rtp_src; + GstElement *rtcp_src; + GstElement *rtcp_sink; + gulong rtcp_recv_probe; + gulong rtcp_send_probe; + GSocketAddress *rtcp_send_addr; + GstPad *srcpad; + gint multicast_ttl; + + /* RTX Elements */ + GstElement *rtxbin; + GstElement *rtx_receive; + + /* For property handling */ + guint reorder_section; + guint max_rtx_retries; + + /* For stats */ + guint stats_interval; + guint32 rtp_ssrc; + GstClockID stats_cid; + GstElement *jitterbuffer; + + /* This is set whenever there is a pipeline construction failure, and used + * to fail state changes later */ + gboolean construct_failed; + const gchar *missing_plugin; +}; + +static void gst_rist_src_uri_init (gpointer g_iface, gpointer iface_data); + +G_DEFINE_TYPE_WITH_CODE (GstRistSrc, gst_rist_src, GST_TYPE_BIN, + G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_rist_src_uri_init); + GST_DEBUG_CATEGORY_INIT (gst_rist_src_debug, "ristsrc", 0, "RIST Source")); + +static void +gst_rist_src_pad_added (GstRistSrc * src, GstPad * new_pad, GstElement * rtpbin) +{ + GST_TRACE_OBJECT (src, "New pad '%s'.", GST_PAD_NAME (new_pad)); + + if (g_str_has_prefix (GST_PAD_NAME (new_pad), "recv_rtp_src_0_")) { + GST_DEBUG_OBJECT (src, "Using new pad '%s' as ghost pad target.", + GST_PAD_NAME (new_pad)); + gst_ghost_pad_set_target (GST_GHOST_PAD (src->srcpad), new_pad); + } +} + +static GstCaps * +gst_rist_src_request_pt_map (GstRistSrc * src, GstElement * session, guint pt) +{ + const GstRTPPayloadInfo *pt_info; + GstCaps *ret; + + pt_info = gst_rtp_payload_info_for_pt (pt); + if (!pt_info || !pt_info->clock_rate) + return NULL; + + ret = gst_caps_new_simple ("application/x-rtp", + "media", G_TYPE_STRING, pt_info->media, + "encoding_name", G_TYPE_STRING, pt_info->encoding_name, + "clock-rate", G_TYPE_INT, (gint) pt_info->clock_rate, NULL); + + /* FIXME add sprop-parameter-set if any */ + g_warn_if_fail (pt_info->encoding_parameters == NULL); + + return ret; +} + +static GstElement * +gst_rist_src_request_aux_receiver (GstRistSrc * src, guint session_id, + GstElement * rtpbin) +{ + if (session_id != 0) + return NULL; + + return gst_object_ref (src->rtxbin); +} + +/* Overrides the nack creation. Right now we don't send mixed NACKS type, we + * simply send a set of range NACK if it takes less space, or allow adding + * more seqnum. */ +static guint +gst_rist_src_on_sending_nacks (GObject * session, guint sender_ssrc, + guint media_ssrc, GArray * nacks, GstBuffer * buffer, gpointer user_data) +{ + GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT; + GstRTCPPacket packet; + guint8 *app_data; + guint nacked_seqnums = 0; + guint range_size = 0; + guint n_rg_nacks = 0; + guint n_fb_nacks = 0; + guint16 seqnum; + guint i; + gint diff; + + /* We'll assume that range will be best, and find how many generic NACK + * would have been created. If this number ends up being smaller, we will + * just remove the APP packet and return 0, leaving it to RTPSession to + * create the generic NACK.*/ + + gst_rtcp_buffer_map (buffer, GST_MAP_READWRITE, &rtcp); + if (!gst_rtcp_buffer_add_packet (&rtcp, GST_RTCP_TYPE_APP, &packet)) + /* exit because the packet is full, will put next request in a + * further packet */ + goto done; + + gst_rtcp_packet_app_set_ssrc (&packet, media_ssrc); + gst_rtcp_packet_app_set_name (&packet, "RIST"); + + if (!gst_rtcp_packet_app_set_data_length (&packet, 1)) { + gst_rtcp_packet_remove (&packet); + GST_WARNING ("no range nacks fit in the packet"); + goto done; + } + + app_data = gst_rtcp_packet_app_get_data (&packet); + for (i = 0; i < nacks->len; i = nacked_seqnums) { + guint j; + seqnum = g_array_index (nacks, guint16, i); + + if (!gst_rtcp_packet_app_set_data_length (&packet, n_rg_nacks + 1)) + break; + + n_rg_nacks++; + nacked_seqnums++; + + for (j = i + 1; j < nacks->len; j++) { + guint16 next_seqnum = g_array_index (nacks, guint16, j); + diff = gst_rtp_buffer_compare_seqnum (seqnum, next_seqnum); + GST_TRACE ("[%u][%u] %u %u diff %i", i, j, seqnum, next_seqnum, diff); + if (diff > (j - i)) + break; + + nacked_seqnums++; + } + + range_size = j - i - 1; + GST_WRITE_UINT32_BE (app_data, seqnum << 16 | range_size); + app_data += 4; + } + + /* count how many FB NACK it would take to wrap nacked_seqnums */ + seqnum = g_array_index (nacks, guint16, 0); + n_fb_nacks = 1; + for (i = 1; i < nacked_seqnums; i++) { + guint16 next_seqnum = g_array_index (nacks, guint16, i); + diff = gst_rtp_buffer_compare_seqnum (seqnum, next_seqnum); + if (diff > 16) { + n_fb_nacks++; + seqnum = next_seqnum; + } + } + + if (n_fb_nacks <= n_rg_nacks) { + GST_DEBUG ("Not sending %u range nacks, as %u FB nacks will be smaller", + n_rg_nacks, n_fb_nacks); + gst_rtcp_packet_remove (&packet); + nacked_seqnums = 0; + goto done; + } + + GST_DEBUG ("Sent %u seqnums into %u Range NACKs", nacked_seqnums, n_rg_nacks); + +done: + gst_rtcp_buffer_unmap (&rtcp); + return nacked_seqnums; +} + +static void +gst_rist_src_on_new_ssrc (GstRistSrc * src, guint session_id, guint ssrc, + GstElement * rtpbin) +{ + GObject *session = NULL; + GObject *source = NULL; + + if (session_id != 0) + return; + + g_signal_emit_by_name (rtpbin, "get-internal-session", session_id, &session); + g_signal_emit_by_name (session, "get-source-by-ssrc", ssrc, &source); + + if (ssrc & 1) + g_object_set (source, "disable-rtcp", TRUE, "probation", 0, NULL); + else + g_signal_connect (session, "on-sending-nacks", + (GCallback) gst_rist_src_on_sending_nacks, NULL); + + g_object_unref (source); + g_object_unref (session); +} + +static void +gst_rist_src_new_jitterbuffer (GstRistSrc * src, GstElement * jitterbuffer, + guint session, guint ssrc, GstElement * rtpbin) +{ + GST_OBJECT_LOCK (src); + g_object_set (jitterbuffer, "rtx-delay", src->reorder_section, + "rtx-max-retries", src->max_rtx_retries, NULL); + + if ((ssrc & 1) == 0) { + GST_INFO_OBJECT (src, "Saving jitterbuffer for session %u ssrc %u", + session, ssrc); + g_clear_object (&src->jitterbuffer); + src->jitterbuffer = gst_object_ref (jitterbuffer); + src->rtp_ssrc = ssrc; + } + + GST_OBJECT_UNLOCK (src); +} + +static void +gst_rist_src_init (GstRistSrc * src) +{ + GstPad *pad, *gpad; + GstStructure *sdes = NULL; + + /* Construct the RIST RTP receiver pipeline. + * + * udpsrc -> [recv_rtp_sink_%u] -------- [recv_rtp_src_%u_%u_%u] + * | rtpbin | + * udpsrc -> [recv_rtcp_sink_%u] -------- [send_rtcp_src_%u] -> udpsink + * + * This pipeline is fixed for now, note that optionally an FEC stream could + * be added later. + */ + src->srcpad = gst_ghost_pad_new_no_target_from_template ("src", + gst_static_pad_template_get (&src_templ)); + gst_element_add_pad (GST_ELEMENT (src), src->srcpad); + + src->rtpbin = gst_element_factory_make ("rtpbin", "rist_recv_rtbpin"); + if (!src->rtpbin) { + src->missing_plugin = "rtpmanager"; + goto missing_plugin; + } + + /* RIST specification says the SDES should only contain the CNAME */ + g_object_get (src->rtpbin, "sdes", &sdes, NULL); + gst_structure_remove_field (sdes, "tool"); + + gst_bin_add (GST_BIN (src), src->rtpbin); + g_object_set (src->rtpbin, "do-retransmission", TRUE, + "rtp-profile", 3 /* AVPF */ , + "sdes", sdes, NULL); + + gst_structure_free (sdes); + + g_signal_connect_swapped (src->rtpbin, "request-pt-map", + G_CALLBACK (gst_rist_src_request_pt_map), src); + g_signal_connect_swapped (src->rtpbin, "request-aux-receiver", + G_CALLBACK (gst_rist_src_request_aux_receiver), src); + + src->rtxbin = gst_bin_new ("rist_recv_rtxbin"); + g_object_ref_sink (src->rtxbin); + src->rtx_receive = gst_element_factory_make ("ristrtxreceive", + "rist_rtx_receive"); + gst_bin_add (GST_BIN (src->rtxbin), src->rtx_receive); + + pad = gst_element_get_static_pad (src->rtx_receive, "sink"); + gpad = gst_ghost_pad_new ("sink_0", pad); + gst_object_unref (pad); + gst_element_add_pad (src->rtxbin, gpad); + + pad = gst_element_get_static_pad (src->rtx_receive, "src"); + gpad = gst_ghost_pad_new ("src_0", pad); + gst_object_unref (pad); + gst_element_add_pad (src->rtxbin, gpad); + + src->rtp_src = gst_element_factory_make ("udpsrc", "rist_rtp_udpsrc"); + src->rtcp_src = gst_element_factory_make ("udpsrc", "rist_rtcp_udpsrc"); + src->rtcp_sink = + gst_element_factory_make ("dynudpsink", "rist_rtcp_dynudpsink"); + if (!src->rtp_src || !src->rtcp_src || !src->rtcp_sink) { + g_clear_object (&src->rtp_src); + g_clear_object (&src->rtcp_src); + g_clear_object (&src->rtcp_sink); + src->missing_plugin = "udp"; + goto missing_plugin; + } + gst_bin_add_many (GST_BIN (src), src->rtp_src, src->rtcp_src, + src->rtcp_sink, NULL); + g_object_set (src->rtcp_sink, "sync", FALSE, "async", FALSE, NULL); + /* delay udpsink startup, we will give it the socket from the RTCP udpsrc, + * but socket can only be set in NULL state */ + gst_element_set_locked_state (src->rtcp_sink, TRUE); + + gst_element_link_pads (src->rtp_src, "src", src->rtpbin, "recv_rtp_sink_0"); + gst_element_link_pads (src->rtcp_src, "src", src->rtpbin, "recv_rtcp_sink_0"); + gst_element_link_pads (src->rtpbin, "send_rtcp_src_0", + src->rtcp_sink, "sink"); + + g_signal_connect_swapped (src->rtpbin, "pad-added", + G_CALLBACK (gst_rist_src_pad_added), src); + g_signal_connect_swapped (src->rtpbin, "on-new-ssrc", + G_CALLBACK (gst_rist_src_on_new_ssrc), src); + g_signal_connect_swapped (src->rtpbin, "new-jitterbuffer", + G_CALLBACK (gst_rist_src_new_jitterbuffer), src); + + return; + +missing_plugin: + { + GST_ERROR_OBJECT (src, "'%s' plugin is missing.", src->missing_plugin); + src->construct_failed = TRUE; + } +} + +static GstPadProbeReturn +gst_rist_src_on_recv_rtcp (GstPad * pad, GstPadProbeInfo * info, + gpointer user_data) +{ + GstRistSrc *src = GST_RIST_SRC (user_data); + GstBuffer *buffer; + GstNetAddressMeta *meta; + + if (info->type == GST_PAD_PROBE_TYPE_BUFFER_LIST) { + GstBufferList *buffer_list = info->data; + buffer = gst_buffer_list_get (buffer_list, 0); + } else { + buffer = info->data; + } + + meta = gst_buffer_get_net_address_meta (buffer); + + GST_OBJECT_LOCK (src); + g_clear_object (&src->rtcp_send_addr); + src->rtcp_send_addr = g_object_ref (meta->addr); + GST_OBJECT_UNLOCK (src); + + return GST_PAD_PROBE_OK; +} + +static inline void +gst_rist_src_attach_net_address_meta (GstRistSrc * src, GstBuffer * buffer) +{ + GST_OBJECT_LOCK (src); + if (src->rtcp_send_addr) + gst_buffer_add_net_address_meta (buffer, src->rtcp_send_addr); + GST_OBJECT_UNLOCK (src); +} + +static GstPadProbeReturn +gst_rist_src_on_send_rtcp (GstPad * pad, GstPadProbeInfo * info, + gpointer user_data) +{ + GstRistSrc *src = GST_RIST_SRC (user_data); + + if (info->type == GST_PAD_PROBE_TYPE_BUFFER_LIST) { + GstBufferList *buffer_list = info->data; + GstBuffer *buffer; + gint i; + + info->data = buffer_list = gst_buffer_list_make_writable (buffer_list); + for (i = 0; i < gst_buffer_list_length (buffer_list); i++) { + buffer = gst_buffer_list_get (buffer_list, i); + gst_rist_src_attach_net_address_meta (src, buffer); + } + } else { + GstBuffer *buffer = info->data; + info->data = buffer = gst_buffer_make_writable (buffer); + gst_rist_src_attach_net_address_meta (src, buffer); + } + + return GST_PAD_PROBE_OK; +} + +static GstStateChangeReturn +gst_rist_src_start (GstRistSrc * src) +{ + GstPad *pad; + GSocket *socket = NULL; + gchar *address; + guint rtcp_port; + GInetAddress *iaddr; + + if (src->construct_failed) { + GST_ELEMENT_ERROR (src, CORE, MISSING_PLUGIN, + ("Your GStreamer installation is missing plugin '%s'", + src->missing_plugin), (NULL)); + return GST_STATE_CHANGE_FAILURE; + } + + g_object_get (src->rtcp_src, "used-socket", &socket, + "address", &address, "port", &rtcp_port, NULL); + + iaddr = g_inet_address_new_from_string (address); + g_free (address); + + if (g_inet_address_get_is_multicast (iaddr)) { + /* mc-ttl is not supported by dynudpsink */ + g_socket_set_multicast_ttl (socket, src->multicast_ttl); + /* In multicast, send RTCP to the multicast group */ + src->rtcp_send_addr = g_inet_socket_address_new (iaddr, rtcp_port); + } else { + /* In unicast, send RTCP to the detected sender address */ + pad = gst_element_get_static_pad (src->rtcp_src, "src"); + src->rtcp_recv_probe = gst_pad_add_probe (pad, + GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST, + gst_rist_src_on_recv_rtcp, src, NULL); + gst_object_unref (pad); + } + g_object_unref (iaddr); + + pad = gst_element_get_static_pad (src->rtcp_sink, "sink"); + src->rtcp_send_probe = gst_pad_add_probe (pad, + GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST, + gst_rist_src_on_send_rtcp, src, NULL); + gst_object_unref (pad); + + /* share the socket created by the source */ + g_object_set (src->rtcp_sink, "socket", socket, "close-socket", FALSE, NULL); + g_object_unref (socket); + + gst_element_set_locked_state (src->rtcp_sink, FALSE); + gst_element_sync_state_with_parent (src->rtcp_sink); + + return GST_STATE_CHANGE_SUCCESS; +} + +static GstStructure * +gst_rist_src_create_stats (GstRistSrc * src) +{ + GObject *session = NULL, *source = NULL; + GstStructure *stats = NULL, *ret; + guint64 dropped = 0, received = 0, recovered = 0, lost = 0; + guint64 duplicates = 0, rtx_sent = 0, rtt = 0; + + ret = gst_structure_new_empty ("rist/x-receiver-stats"); + + g_signal_emit_by_name (src->rtpbin, "get-internal-session", 0, &session); + if (!session) + return ret; + + g_signal_emit_by_name (session, "get-source-by-ssrc", src->rtp_ssrc, &source); + if (source) { + gint packets_lost; + g_object_get (source, "stats", &stats, NULL); + gst_structure_get_int (stats, "packets-lost", &packets_lost); + gst_structure_free (stats); + g_clear_object (&source); + dropped = MAX (packets_lost, 0); + } + g_object_unref (session); + + if (src->jitterbuffer) { + g_object_get (src->jitterbuffer, "stats", &stats, NULL); + gst_structure_get (stats, "num-pushed", G_TYPE_UINT64, &received, + "num-lost", G_TYPE_UINT64, &lost, + "rtx-count", G_TYPE_UINT64, &rtx_sent, + "num-duplicates", G_TYPE_UINT64, &duplicates, + "rtx-success-count", G_TYPE_UINT64, &recovered, + "rtx-rtt", G_TYPE_UINT64, &rtt, NULL); + gst_structure_free (stats); + } + + gst_structure_set (ret, "dropped", G_TYPE_UINT64, dropped, + "received", G_TYPE_UINT64, received, + "recovered", G_TYPE_UINT64, recovered, + "permanently-lost", G_TYPE_UINT64, lost, + "duplicates", G_TYPE_UINT64, duplicates, + "retransmission-requests-sent", G_TYPE_UINT64, rtx_sent, + "rtx-roundtrip-time", G_TYPE_UINT64, rtt, NULL); + + return ret; +} + +static gboolean +gst_rist_src_dump_stats (GstClock * clock, GstClockTime time, GstClockID id, + gpointer user_data) +{ + GstRistSrc *src = GST_RIST_SRC (user_data); + GstStructure *stats = gst_rist_src_create_stats (src); + + gst_println ("%s: %" GST_PTR_FORMAT, GST_OBJECT_NAME (src), stats); + + gst_structure_free (stats); + return TRUE; +} + +static void +gst_rist_src_enable_stats_interval (GstRistSrc * src) +{ + GstClock *clock; + GstClockTime start, interval; + + if (src->stats_interval == 0) + return; + + interval = src->stats_interval * GST_MSECOND; + clock = gst_system_clock_obtain (); + start = gst_clock_get_time (clock) + interval; + + src->stats_cid = gst_clock_new_periodic_id (clock, start, interval); + gst_clock_id_wait_async (src->stats_cid, gst_rist_src_dump_stats, + gst_object_ref (src), (GDestroyNotify) gst_object_unref); + + gst_object_unref (clock); +} + +static void +gst_rist_src_disable_stats_interval (GstRistSrc * src) +{ + if (src->stats_cid) { + gst_clock_id_unschedule (src->stats_cid); + gst_clock_id_unref (src->stats_cid); + src->stats_cid = NULL; + } +} + +static void +gst_rist_src_stop (GstRistSrc * src) +{ + GstPad *pad; + + if (src->rtcp_recv_probe) { + pad = gst_element_get_static_pad (src->rtcp_src, "src"); + gst_pad_remove_probe (pad, src->rtcp_recv_probe); + src->rtcp_recv_probe = 0; + gst_object_unref (pad); + } + + pad = gst_element_get_static_pad (src->rtcp_sink, "sink"); + gst_pad_remove_probe (pad, src->rtcp_send_probe); + src->rtcp_send_probe = 0; + gst_object_unref (pad); +} + +static GstStateChangeReturn +gst_rist_src_change_state (GstElement * element, GstStateChange transition) +{ + GstRistSrc *src = GST_RIST_SRC (element); + GstStateChangeReturn ret; + + switch (transition) { + case GST_STATE_CHANGE_PAUSED_TO_READY: + gst_rist_src_disable_stats_interval (src); + break; + default: + break; + } + + ret = GST_ELEMENT_CLASS (gst_rist_src_parent_class)->change_state (element, + transition); + + switch (transition) { + case GST_STATE_CHANGE_NULL_TO_READY: + gst_rist_src_start (src); + break; + case GST_STATE_CHANGE_READY_TO_PAUSED: + gst_rist_src_enable_stats_interval (src); + break; + case GST_STATE_CHANGE_READY_TO_NULL: + gst_rist_src_stop (src); + break; + default: + break; + } + + return ret; +} + +static void +gst_rist_src_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstRistSrc *src = GST_RIST_SRC (object); + GstElement *session = NULL; + GstClockTime interval; + GstStructure *sdes; + + if (src->construct_failed) + return; + + switch (prop_id) { + case PROP_ADDRESS: + g_object_get_property (G_OBJECT (src->rtp_src), "address", value); + break; + + case PROP_PORT: + g_object_get_property (G_OBJECT (src->rtp_src), "port", value); + break; + + case PROP_RECEIVER_BUFFER: + g_object_get_property (G_OBJECT (src->rtpbin), "latency", value); + break; + + case PROP_REORDER_SECTION: + GST_OBJECT_LOCK (src); + g_value_set_uint (value, src->reorder_section); + GST_OBJECT_UNLOCK (src); + break; + + case PROP_MAX_RTX_RETRIES: + GST_OBJECT_LOCK (src); + g_value_set_uint (value, src->max_rtx_retries); + GST_OBJECT_UNLOCK (src); + break; + + case PROP_MIN_RTCP_INTERVAL: + g_signal_emit_by_name (src->rtpbin, "get-session", 0, &session); + g_object_get (session, "rtcp-min-interval", &interval, NULL); + g_value_set_uint (value, (guint) (interval / GST_MSECOND)); + g_object_unref (session); + break; + + case PROP_MAX_RTCP_BANDWIDTH: + g_signal_emit_by_name (src->rtpbin, "get-session", 0, &session); + g_object_get_property (G_OBJECT (session), "rtcp-fraction", value); + g_object_unref (session); + break; + + case PROP_STATS_UPDATE_INTERVAL: + g_value_set_uint (value, src->stats_interval); + break; + + case PROP_STATS: + g_value_take_boxed (value, gst_rist_src_create_stats (src)); + break; + + case PROP_CNAME: + g_object_get (src->rtpbin, "sdes", &sdes, NULL); + g_value_set_string (value, gst_structure_get_string (sdes, "cname")); + gst_structure_free (sdes); + break; + + case PROP_MULTICAST_LOOPBACK: + g_object_get_property (G_OBJECT (src->rtp_src), "loop", value); + break; + + case PROP_MULTICAST_IFACE: + g_object_get_property (G_OBJECT (src->rtp_src), "multicast-iface", value); + break; + + case PROP_MULTICAST_TTL: + g_value_set_int (value, src->multicast_ttl); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_rist_src_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstRistSrc *src = GST_RIST_SRC (object); + GstElement *session = NULL; + GstStructure *sdes; + + if (src->construct_failed) + return; + + switch (prop_id) { + case PROP_ADDRESS: + g_object_set_property (G_OBJECT (src->rtp_src), "address", value); + g_object_set_property (G_OBJECT (src->rtcp_src), "address", value); + break; + + case PROP_PORT:{ + guint port = g_value_get_uint (value); + + /* According to 5.1.1, RTCP receiver port most be event number and RTCP + * port should be the RTP port + 1 */ + + if (port & 0x1) { + g_warning ("Invalid RIST port %u, should be an even number.", port); + return; + } + + g_object_set (src->rtp_src, "port", port, NULL); + g_object_set (src->rtcp_src, "port", port + 1, NULL); + break; + } + + case PROP_RECEIVER_BUFFER: + g_object_set (src->rtpbin, "latency", g_value_get_uint (value), NULL); + break; + + case PROP_REORDER_SECTION: + GST_OBJECT_LOCK (src); + src->reorder_section = g_value_get_uint (value); + GST_OBJECT_UNLOCK (src); + break; + + case PROP_MAX_RTX_RETRIES: + GST_OBJECT_LOCK (src); + src->max_rtx_retries = g_value_get_uint (value); + GST_OBJECT_UNLOCK (src); + break; + + case PROP_MIN_RTCP_INTERVAL: + g_signal_emit_by_name (src->rtpbin, "get-session", 0, &session); + g_object_set (session, "rtcp-min-interval", + g_value_get_uint (value) * GST_MSECOND, NULL); + g_object_unref (session); + break; + + case PROP_MAX_RTCP_BANDWIDTH: + g_signal_emit_by_name (src->rtpbin, "get-session", 0, &session); + g_object_set (session, "rtcp-fraction", g_value_get_double (value), NULL); + g_object_unref (session); + break; + + case PROP_STATS_UPDATE_INTERVAL: + src->stats_interval = g_value_get_uint (value); + break; + + case PROP_CNAME: + g_object_get (src->rtpbin, "sdes", &sdes, NULL); + gst_structure_set_value (sdes, "cname", value); + g_object_set (src->rtpbin, "sdes", sdes, NULL); + gst_structure_free (sdes); + break; + + case PROP_MULTICAST_LOOPBACK: + g_object_set_property (G_OBJECT (src->rtp_src), "loop", value); + g_object_set_property (G_OBJECT (src->rtcp_src), "loop", value); + break; + + case PROP_MULTICAST_IFACE: + g_object_set_property (G_OBJECT (src->rtp_src), "multicast-iface", value); + g_object_set_property (G_OBJECT (src->rtcp_src), + "multicast-iface", value); + break; + + case PROP_MULTICAST_TTL: + src->multicast_ttl = g_value_get_int (value); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_rist_src_finalize (GObject * object) +{ + GstRistSrc *src = GST_RIST_SRC (object); + + if (src->jitterbuffer) + gst_object_unref (src->jitterbuffer); + + gst_object_unref (src->rtxbin); + + G_OBJECT_CLASS (gst_rist_src_parent_class)->finalize (object); +} + +static void +gst_rist_src_class_init (GstRistSrcClass * klass) +{ + GstElementClass *element_class = (GstElementClass *) klass; + GObjectClass *object_class = (GObjectClass *) klass; + + gst_element_class_set_metadata (element_class, + "RIST Source", "Source/Network", + "Source that implements RIST TR-06-1 streaming specification", + "Nicolas Dufresne <nicolas.dufresne@collabora.com"); + gst_element_class_add_static_pad_template (element_class, &src_templ); + + element_class->change_state = gst_rist_src_change_state; + + object_class->get_property = gst_rist_src_get_property; + object_class->set_property = gst_rist_src_set_property; + object_class->finalize = gst_rist_src_finalize; + + g_object_class_install_property (object_class, PROP_ADDRESS, + g_param_spec_string ("address", "Address", + "Address to receive packets from (can be IPv4 or IPv6).", "0.0.0.0", + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (object_class, PROP_PORT, + g_param_spec_uint ("port", "Port", "The port to listen for RTP packets, " + "RTCP port is derived from it, this port must be an even number.", + 2, 65534, 5004, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (object_class, PROP_RECEIVER_BUFFER, + g_param_spec_uint ("receiver-buffer", "Receiver Buffer", + "Buffering duration in ms", 0, G_MAXUINT, 1000, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (object_class, PROP_REORDER_SECTION, + g_param_spec_uint ("reorder-section", "Recorder Section", + "Time to wait before sending retransmission request in ms.", + 0, G_MAXUINT, 70, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (object_class, PROP_MAX_RTX_RETRIES, + g_param_spec_uint ("max-rtx-retries", "Maximum Retransmission Retries", + "The maximum number of retransmission requests for a lost packet.", + 0, G_MAXUINT, 7, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (object_class, PROP_MIN_RTCP_INTERVAL, + g_param_spec_uint ("min-rtcp-interval", "Minimum RTCP Intercal", + "The minimum interval in ms between two successive RTCP packets", + 0, 100, 100, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (object_class, PROP_MAX_RTCP_BANDWIDTH, + g_param_spec_double ("max-rtcp-bandwidth", "Maximum RTCP Bandwidth", + "The maximum bandwidth used for RTCP in fraction of RTP bandwdith", + 0.0, 0.05, 0.05, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (object_class, PROP_STATS_UPDATE_INTERVAL, + g_param_spec_uint ("stats-update-interval", "Statistics Update Interval", + "The interval between 'stats' update notification (0 disabled)", + 0, G_MAXUINT, 0, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (object_class, PROP_STATS, + g_param_spec_boxed ("stats", "Statistics", + "Statistic in a GstStructure named 'rist/x-receiver-stats'", + GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (object_class, PROP_CNAME, + g_param_spec_string ("cname", "CName", + "Set the CNAME in the SDES block of the receiver report.", NULL, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (object_class, PROP_MULTICAST_LOOPBACK, + g_param_spec_boolean ("multicast-loopback", "Multicast Loopback", + "When enabled, the packet will be received locally.", FALSE, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (object_class, PROP_MULTICAST_IFACE, + g_param_spec_string ("multicast-iface", "multicast-iface", + "The multicast interface to use to send packets.", NULL, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (object_class, PROP_MULTICAST_TTL, + g_param_spec_int ("multicast-ttl", "Multicast TTL", + "The multicast time-to-live parameter.", 0, 255, 1, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); +} + +static GstURIType +gst_rist_src_uri_get_type (GType type) +{ + return GST_URI_SRC; +} + +static const gchar *const * +gst_rist_src_uri_get_protocols (GType type) +{ + static const char *protocols[] = { "rist", NULL }; + return protocols; +} + +static gchar * +gst_rist_src_uri_get_uri (GstURIHandler * handler) +{ + GstRistSrc *src = GST_RIST_SRC (handler); + gchar *uri = NULL; + + GST_OBJECT_LOCK (src); + if (src->uri) + uri = gst_uri_to_string (src->uri); + GST_OBJECT_UNLOCK (src); + + return uri; +} + +static void +gst_rist_src_uri_query_foreach (const gchar * key, const gchar * value, + GObject * src) +{ + if (g_str_equal (key, "async-handling")) { + GST_WARNING_OBJECT (src, "Setting '%s' property from URI is not allowed.", + key); + return; + } + + GST_DEBUG_OBJECT (src, "Setting property '%s' to '%s'", key, value); + gst_util_set_object_arg (src, key, value); +} + +static gboolean +gst_rist_src_uri_set_uri (GstURIHandler * handler, const gchar * uri, + GError ** error) +{ + GstRistSrc *src = GST_RIST_SRC (handler); + GstUri *gsturi; + GHashTable *query_table; + + if (GST_STATE (src) >= GST_STATE_PAUSED) { + g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_STATE, + "Changing the URI on ristsrc when it is running is not supported"); + GST_ERROR_OBJECT (src, "%s", (*error)->message); + return FALSE; + } + + if (!(gsturi = gst_uri_from_string (uri))) { + g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI, + "Could not parse URI"); + GST_ERROR_OBJECT (src, "%s", (*error)->message); + gst_uri_unref (gsturi); + return FALSE; + } + + GST_OBJECT_LOCK (src); + if (src->uri) + gst_uri_unref (src->uri); + src->uri = gst_uri_ref (gsturi); + GST_OBJECT_UNLOCK (src); + + g_object_set (src, "address", gst_uri_get_host (gsturi), NULL); + if (gst_uri_get_port (gsturi)) + g_object_set (src, "port", gst_uri_get_port (gsturi), NULL); + + query_table = gst_uri_get_query_table (gsturi); + if (query_table) + g_hash_table_foreach (query_table, + (GHFunc) gst_rist_src_uri_query_foreach, src); + + gst_uri_unref (gsturi); + return TRUE; +} + +static void +gst_rist_src_uri_init (gpointer g_iface, gpointer iface_data) +{ + GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface; + + iface->get_type = gst_rist_src_uri_get_type; + iface->get_protocols = gst_rist_src_uri_get_protocols; + iface->get_uri = gst_rist_src_uri_get_uri; + iface->set_uri = gst_rist_src_uri_set_uri; +} diff --git a/gst/rist/meson.build b/gst/rist/meson.build new file mode 100644 index 000000000..9b8cf9897 --- /dev/null +++ b/gst/rist/meson.build @@ -0,0 +1,17 @@ +rist_sources = [ + 'gstristrtxsend.c', + 'gstristrtxreceive.c', + 'gstristsrc.c', + 'gstristsink.c', + 'gstristplugin.c', +] + +gstrist = library('gstrist', + rist_sources, + c_args : gst_plugins_bad_args, + include_directories : [configinc], + dependencies : [gstrtp_dep, gstnet_dep, gio_dep], + install : true, + install_dir : plugins_install_dir, +) +pkgconfig.generate(gstrist, install_dir : plugins_pkgconfig_install_dir) diff --git a/meson_options.txt b/meson_options.txt index 743951350..4d067f1e8 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -48,6 +48,7 @@ option('pnm', type : 'feature', value : 'auto') option('proxy', type : 'feature', value : 'auto') option('rawparse', type : 'feature', value : 'auto') option('removesilence', type : 'feature', value : 'auto') +option('rist', type : 'feature', value : 'auto') option('sdp', type : 'feature', value : 'auto') option('segmentclip', type : 'feature', value : 'auto') option('siren', type : 'feature', value : 'auto') |