summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gst/rtpmanager/gstrtpssrcdemux.c36
-rw-r--r--tests/check/elements/rtpssrcdemux.c109
2 files changed, 125 insertions, 20 deletions
diff --git a/gst/rtpmanager/gstrtpssrcdemux.c b/gst/rtpmanager/gstrtpssrcdemux.c
index 7b8d0960d..132c52d10 100644
--- a/gst/rtpmanager/gstrtpssrcdemux.c
+++ b/gst/rtpmanager/gstrtpssrcdemux.c
@@ -83,6 +83,10 @@ GST_STATIC_PAD_TEMPLATE ("rtcp_src_%u",
#define INTERNAL_STREAM_LOCK(obj) (g_rec_mutex_lock (&(obj)->padlock))
#define INTERNAL_STREAM_UNLOCK(obj) (g_rec_mutex_unlock (&(obj)->padlock))
+#define GST_PAD_FLAG_STICKIES_SENT (GST_PAD_FLAG_LAST << 0)
+#define GST_PAD_STICKIES_SENT(pad) (GST_OBJECT_FLAG_IS_SET (pad, GST_PAD_FLAG_STICKIES_SENT))
+#define GST_PAD_SET_STICKIES_SENT(pad) (GST_OBJECT_FLAG_SET (pad, GST_PAD_FLAG_STICKIES_SENT))
+
typedef enum
{
RTP_PAD,
@@ -244,13 +248,11 @@ forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
GstEvent *newevent;
newevent = add_ssrc_and_ref (*event, data->ssrc);
-
gst_pad_push_event (data->pad, newevent);
return TRUE;
}
-/* With internal stream lock held */
static void
forward_initial_events (GstRtpSsrcDemux * demux, guint32 ssrc, GstPad * pad,
PadType padtype)
@@ -318,9 +320,6 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
dpads->rtp_pad = rtp_pad;
dpads->rtcp_pad = rtcp_pad;
- gst_pad_set_element_private (rtp_pad, dpads);
- gst_pad_set_element_private (rtcp_pad, dpads);
-
GST_OBJECT_LOCK (demux);
demux->srcpads = g_slist_prepend (demux->srcpads, dpads);
GST_OBJECT_UNLOCK (demux);
@@ -338,9 +337,6 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
gst_pad_use_fixed_caps (rtcp_pad);
gst_pad_set_active (rtcp_pad, TRUE);
- forward_initial_events (demux, ssrc, rtp_pad, RTP_PAD);
- forward_initial_events (demux, ssrc, rtcp_pad, RTCP_PAD);
-
gst_element_add_pad (GST_ELEMENT_CAST (demux), rtp_pad);
gst_element_add_pad (GST_ELEMENT_CAST (demux), rtcp_pad);
@@ -606,6 +602,13 @@ forward_event (GstPad * pad, gpointer user_data)
GSList *walk = NULL;
GstEvent *newevent = NULL;
+ /* special case for EOS */
+ if (GST_EVENT_TYPE (fdata->event) == GST_EVENT_EOS)
+ GST_PAD_SET_STICKIES_SENT (pad);
+
+ if (GST_EVENT_IS_STICKY (fdata->event) && !GST_PAD_STICKIES_SENT (pad))
+ return FALSE;
+
GST_OBJECT_LOCK (fdata->demux);
for (walk = fdata->demux->srcpads; walk; walk = walk->next) {
GstRtpSsrcDemuxPads *dpads = (GstRtpSsrcDemuxPads *) walk->data;
@@ -668,6 +671,11 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
if (srcpad == NULL)
goto create_failed;
+ if (!GST_PAD_STICKIES_SENT (srcpad)) {
+ forward_initial_events (demux, ssrc, srcpad, RTP_PAD);
+ GST_PAD_SET_STICKIES_SENT (srcpad);
+ }
+
/* push to srcpad */
ret = gst_pad_push (srcpad, buf);
@@ -758,6 +766,11 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstObject * parent,
if (srcpad == NULL)
goto create_failed;
+ if (!GST_PAD_STICKIES_SENT (srcpad)) {
+ forward_initial_events (demux, ssrc, srcpad, RTCP_PAD);
+ GST_PAD_SET_STICKIES_SENT (srcpad);
+ }
+
/* push to srcpad */
ret = gst_pad_push (srcpad, buf);
@@ -944,17 +957,12 @@ gst_rtp_ssrc_demux_src_query (GstPad * pad, GstObject * parent,
if ((res = gst_pad_peer_query (demux->rtp_sink, query))) {
gboolean live;
GstClockTime min_latency, max_latency;
- GstRtpSsrcDemuxPads *dpads;
-
- dpads = gst_pad_get_element_private (pad);
gst_query_parse_latency (query, &live, &min_latency, &max_latency);
- GST_DEBUG_OBJECT (demux, "peer min latency %" GST_TIME_FORMAT,
+ GST_DEBUG_OBJECT (pad, "peer min latency %" GST_TIME_FORMAT,
GST_TIME_ARGS (min_latency));
- GST_DEBUG_OBJECT (demux, "latency for SSRC %08x", dpads->ssrc);
-
gst_query_set_latency (query, live, min_latency, max_latency);
}
break;
diff --git a/tests/check/elements/rtpssrcdemux.c b/tests/check/elements/rtpssrcdemux.c
index 691fa4140..831d757b8 100644
--- a/tests/check/elements/rtpssrcdemux.c
+++ b/tests/check/elements/rtpssrcdemux.c
@@ -20,10 +20,22 @@
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
#include <gst/rtp/gstrtpbuffer.h>
+#include <gst/rtp/gstrtcpbuffer.h>
+
#include <gst/check/gstcheck.h>
#include <gst/check/gstharness.h>
+#ifdef HAVE_VALGRIND
+# include <valgrind/valgrind.h>
+#else
+# define RUNNING_ON_VALGRIND 0
+#endif
+
#define TEST_BUF_CLOCK_RATE 8000
#define TEST_BUF_PT 0
#define TEST_BUF_SSRC 0x01BADBAD
@@ -68,6 +80,7 @@ create_buffer (guint seq_num, guint32 ssrc)
return buf;
}
+
typedef struct
{
GstHarness *rtp_sink;
@@ -158,11 +171,7 @@ GST_START_TEST (test_event_forwarding)
gst_harness_push_event (ctx.rtcp_sink, gst_event_new_eos ());
g_assert_cmpint (gst_harness_events_in_queue (ctx.rtp_src), ==, 0);
- g_assert_cmpint (gst_harness_events_in_queue (ctx.rtcp_src), ==, 2);
-
- event = gst_harness_pull_event (ctx.rtcp_src);
- g_assert_cmpint (event->type, ==, GST_EVENT_STREAM_START);
- gst_event_unref (event);
+ g_assert_cmpint (gst_harness_events_in_queue (ctx.rtcp_src), ==, 1);
event = gst_harness_pull_event (ctx.rtcp_src);
g_assert_cmpint (event->type, ==, GST_EVENT_EOS);
@@ -278,7 +287,7 @@ GST_START_TEST (test_rtpssrcdemux_max_streams)
GST_END_TEST;
static void
-new_rtcp_ssrc_pad_found (GstElement * element, G_GNUC_UNUSED guint ssrc,
+new_rtcp_ssrc_pad_found (GstElement * element, guint ssrc,
G_GNUC_UNUSED GstPad * rtp_pad, GSList ** src_h)
{
GstHarness *h;
@@ -355,7 +364,94 @@ GST_START_TEST (test_rtpssrcdemux_invalid_rtcp)
GST_END_TEST;
+static GstBuffer *
+generate_rtcp_sr_buffer (guint ssrc)
+{
+ GstBuffer *buf;
+ GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT;
+ GstRTCPPacket packet;
+
+ buf = gst_rtcp_buffer_new (1000);
+ fail_unless (gst_rtcp_buffer_map (buf, GST_MAP_READWRITE, &rtcp));
+ fail_unless (gst_rtcp_buffer_add_packet (&rtcp, GST_RTCP_TYPE_SR, &packet));
+ gst_rtcp_packet_sr_set_sender_info (&packet, ssrc, 0, 0, 1, 1);
+ gst_rtcp_buffer_unmap (&rtcp);
+ return buf;
+}
+
+typedef struct
+{
+ GstHarness *rtp_h;
+ GstHarness *rtcp_h;
+} SimulCtx;
+
+static void
+_simul_ctx_new_ssrc_pad_cb (GstElement * element, guint ssrc,
+ GstPad * rtp_pad, SimulCtx * ctx)
+{
+ GstPad *rtcp_pad;
+ gchar *name;
+
+ gst_harness_add_element_src_pad (ctx->rtp_h, rtp_pad);
+
+ name = g_strdup_printf ("rtcp_src_%u", ssrc);
+ rtcp_pad = gst_element_get_static_pad (element, name);
+ gst_harness_add_element_src_pad (ctx->rtcp_h, rtcp_pad);
+ gst_object_unref (rtcp_pad);
+ g_free (name);
+}
+
+static gpointer
+_simul_ctx_push_rtp_buffers (gpointer user_data)
+{
+ SimulCtx *ctx = user_data;
+
+ gst_harness_set_src_caps_str (ctx->rtp_h, "application/x-rtp");
+ gst_harness_push (ctx->rtp_h, create_buffer (0, 1111));
+ return NULL;
+}
+
+static gpointer
+_simul_ctx_push_rtcp_buffers (gpointer user_data)
+{
+ SimulCtx *ctx = user_data;
+
+ g_usleep (10);
+ gst_harness_set_src_caps_str (ctx->rtcp_h, "application/x-rtcp");
+ gst_harness_push (ctx->rtcp_h, generate_rtcp_sr_buffer (1111));
+ return NULL;
+}
+
+GST_START_TEST (test_rtp_and_rtcp_arrives_simultaneously)
+{
+ guint r;
+ guint repeats = 1000;
+ if (RUNNING_ON_VALGRIND)
+ repeats = 2;
+
+ for (r = 0; r < repeats; r++) {
+ SimulCtx ctx;
+ GThread *t0, *t1;
+
+ ctx.rtp_h = gst_harness_new_with_padnames ("rtpssrcdemux", "sink", NULL);
+ ctx.rtcp_h =
+ gst_harness_new_with_element (ctx.rtp_h->element, "rtcp_sink", NULL);
+
+ g_signal_connect (ctx.rtp_h->element,
+ "new-ssrc-pad", (GCallback) _simul_ctx_new_ssrc_pad_cb, &ctx);
+ t0 = g_thread_new ("push rtp", _simul_ctx_push_rtp_buffers, &ctx);
+ t1 = g_thread_new ("push rtcp", _simul_ctx_push_rtcp_buffers, &ctx);
+
+ g_thread_join (t0);
+ g_thread_join (t1);
+
+ gst_harness_teardown (ctx.rtp_h);
+ gst_harness_teardown (ctx.rtcp_h);
+ }
+}
+
+GST_END_TEST;
static Suite *
rtpssrcdemux_suite (void)
@@ -370,6 +466,7 @@ rtpssrcdemux_suite (void)
tcase_add_test (tc_chain, test_rtpssrcdemux_rtcp_app);
tcase_add_test (tc_chain, test_rtpssrcdemux_invalid_rtp);
tcase_add_test (tc_chain, test_rtpssrcdemux_invalid_rtcp);
+ tcase_add_test (tc_chain, test_rtp_and_rtcp_arrives_simultaneously);
return s;
}