diff options
-rw-r--r-- | gst/rtpmanager/gstrtpssrcdemux.c | 36 | ||||
-rw-r--r-- | tests/check/elements/rtpssrcdemux.c | 109 |
2 files changed, 125 insertions, 20 deletions
diff --git a/gst/rtpmanager/gstrtpssrcdemux.c b/gst/rtpmanager/gstrtpssrcdemux.c index 7b8d0960d..132c52d10 100644 --- a/gst/rtpmanager/gstrtpssrcdemux.c +++ b/gst/rtpmanager/gstrtpssrcdemux.c @@ -83,6 +83,10 @@ GST_STATIC_PAD_TEMPLATE ("rtcp_src_%u", #define INTERNAL_STREAM_LOCK(obj) (g_rec_mutex_lock (&(obj)->padlock)) #define INTERNAL_STREAM_UNLOCK(obj) (g_rec_mutex_unlock (&(obj)->padlock)) +#define GST_PAD_FLAG_STICKIES_SENT (GST_PAD_FLAG_LAST << 0) +#define GST_PAD_STICKIES_SENT(pad) (GST_OBJECT_FLAG_IS_SET (pad, GST_PAD_FLAG_STICKIES_SENT)) +#define GST_PAD_SET_STICKIES_SENT(pad) (GST_OBJECT_FLAG_SET (pad, GST_PAD_FLAG_STICKIES_SENT)) + typedef enum { RTP_PAD, @@ -244,13 +248,11 @@ forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data) GstEvent *newevent; newevent = add_ssrc_and_ref (*event, data->ssrc); - gst_pad_push_event (data->pad, newevent); return TRUE; } -/* With internal stream lock held */ static void forward_initial_events (GstRtpSsrcDemux * demux, guint32 ssrc, GstPad * pad, PadType padtype) @@ -318,9 +320,6 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, dpads->rtp_pad = rtp_pad; dpads->rtcp_pad = rtcp_pad; - gst_pad_set_element_private (rtp_pad, dpads); - gst_pad_set_element_private (rtcp_pad, dpads); - GST_OBJECT_LOCK (demux); demux->srcpads = g_slist_prepend (demux->srcpads, dpads); GST_OBJECT_UNLOCK (demux); @@ -338,9 +337,6 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, gst_pad_use_fixed_caps (rtcp_pad); gst_pad_set_active (rtcp_pad, TRUE); - forward_initial_events (demux, ssrc, rtp_pad, RTP_PAD); - forward_initial_events (demux, ssrc, rtcp_pad, RTCP_PAD); - gst_element_add_pad (GST_ELEMENT_CAST (demux), rtp_pad); gst_element_add_pad (GST_ELEMENT_CAST (demux), rtcp_pad); @@ -606,6 +602,13 @@ forward_event (GstPad * pad, gpointer user_data) GSList *walk = NULL; GstEvent *newevent = NULL; + /* special case for EOS */ + if (GST_EVENT_TYPE (fdata->event) == GST_EVENT_EOS) + GST_PAD_SET_STICKIES_SENT (pad); + + if (GST_EVENT_IS_STICKY (fdata->event) && !GST_PAD_STICKIES_SENT (pad)) + return FALSE; + GST_OBJECT_LOCK (fdata->demux); for (walk = fdata->demux->srcpads; walk; walk = walk->next) { GstRtpSsrcDemuxPads *dpads = (GstRtpSsrcDemuxPads *) walk->data; @@ -668,6 +671,11 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf) if (srcpad == NULL) goto create_failed; + if (!GST_PAD_STICKIES_SENT (srcpad)) { + forward_initial_events (demux, ssrc, srcpad, RTP_PAD); + GST_PAD_SET_STICKIES_SENT (srcpad); + } + /* push to srcpad */ ret = gst_pad_push (srcpad, buf); @@ -758,6 +766,11 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstObject * parent, if (srcpad == NULL) goto create_failed; + if (!GST_PAD_STICKIES_SENT (srcpad)) { + forward_initial_events (demux, ssrc, srcpad, RTCP_PAD); + GST_PAD_SET_STICKIES_SENT (srcpad); + } + /* push to srcpad */ ret = gst_pad_push (srcpad, buf); @@ -944,17 +957,12 @@ gst_rtp_ssrc_demux_src_query (GstPad * pad, GstObject * parent, if ((res = gst_pad_peer_query (demux->rtp_sink, query))) { gboolean live; GstClockTime min_latency, max_latency; - GstRtpSsrcDemuxPads *dpads; - - dpads = gst_pad_get_element_private (pad); gst_query_parse_latency (query, &live, &min_latency, &max_latency); - GST_DEBUG_OBJECT (demux, "peer min latency %" GST_TIME_FORMAT, + GST_DEBUG_OBJECT (pad, "peer min latency %" GST_TIME_FORMAT, GST_TIME_ARGS (min_latency)); - GST_DEBUG_OBJECT (demux, "latency for SSRC %08x", dpads->ssrc); - gst_query_set_latency (query, live, min_latency, max_latency); } break; diff --git a/tests/check/elements/rtpssrcdemux.c b/tests/check/elements/rtpssrcdemux.c index 691fa4140..831d757b8 100644 --- a/tests/check/elements/rtpssrcdemux.c +++ b/tests/check/elements/rtpssrcdemux.c @@ -20,10 +20,22 @@ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, * Boston, MA 02110-1301, USA. */ +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + #include <gst/rtp/gstrtpbuffer.h> +#include <gst/rtp/gstrtcpbuffer.h> + #include <gst/check/gstcheck.h> #include <gst/check/gstharness.h> +#ifdef HAVE_VALGRIND +# include <valgrind/valgrind.h> +#else +# define RUNNING_ON_VALGRIND 0 +#endif + #define TEST_BUF_CLOCK_RATE 8000 #define TEST_BUF_PT 0 #define TEST_BUF_SSRC 0x01BADBAD @@ -68,6 +80,7 @@ create_buffer (guint seq_num, guint32 ssrc) return buf; } + typedef struct { GstHarness *rtp_sink; @@ -158,11 +171,7 @@ GST_START_TEST (test_event_forwarding) gst_harness_push_event (ctx.rtcp_sink, gst_event_new_eos ()); g_assert_cmpint (gst_harness_events_in_queue (ctx.rtp_src), ==, 0); - g_assert_cmpint (gst_harness_events_in_queue (ctx.rtcp_src), ==, 2); - - event = gst_harness_pull_event (ctx.rtcp_src); - g_assert_cmpint (event->type, ==, GST_EVENT_STREAM_START); - gst_event_unref (event); + g_assert_cmpint (gst_harness_events_in_queue (ctx.rtcp_src), ==, 1); event = gst_harness_pull_event (ctx.rtcp_src); g_assert_cmpint (event->type, ==, GST_EVENT_EOS); @@ -278,7 +287,7 @@ GST_START_TEST (test_rtpssrcdemux_max_streams) GST_END_TEST; static void -new_rtcp_ssrc_pad_found (GstElement * element, G_GNUC_UNUSED guint ssrc, +new_rtcp_ssrc_pad_found (GstElement * element, guint ssrc, G_GNUC_UNUSED GstPad * rtp_pad, GSList ** src_h) { GstHarness *h; @@ -355,7 +364,94 @@ GST_START_TEST (test_rtpssrcdemux_invalid_rtcp) GST_END_TEST; +static GstBuffer * +generate_rtcp_sr_buffer (guint ssrc) +{ + GstBuffer *buf; + GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT; + GstRTCPPacket packet; + + buf = gst_rtcp_buffer_new (1000); + fail_unless (gst_rtcp_buffer_map (buf, GST_MAP_READWRITE, &rtcp)); + fail_unless (gst_rtcp_buffer_add_packet (&rtcp, GST_RTCP_TYPE_SR, &packet)); + gst_rtcp_packet_sr_set_sender_info (&packet, ssrc, 0, 0, 1, 1); + gst_rtcp_buffer_unmap (&rtcp); + return buf; +} + +typedef struct +{ + GstHarness *rtp_h; + GstHarness *rtcp_h; +} SimulCtx; + +static void +_simul_ctx_new_ssrc_pad_cb (GstElement * element, guint ssrc, + GstPad * rtp_pad, SimulCtx * ctx) +{ + GstPad *rtcp_pad; + gchar *name; + + gst_harness_add_element_src_pad (ctx->rtp_h, rtp_pad); + + name = g_strdup_printf ("rtcp_src_%u", ssrc); + rtcp_pad = gst_element_get_static_pad (element, name); + gst_harness_add_element_src_pad (ctx->rtcp_h, rtcp_pad); + gst_object_unref (rtcp_pad); + g_free (name); +} + +static gpointer +_simul_ctx_push_rtp_buffers (gpointer user_data) +{ + SimulCtx *ctx = user_data; + + gst_harness_set_src_caps_str (ctx->rtp_h, "application/x-rtp"); + gst_harness_push (ctx->rtp_h, create_buffer (0, 1111)); + return NULL; +} + +static gpointer +_simul_ctx_push_rtcp_buffers (gpointer user_data) +{ + SimulCtx *ctx = user_data; + + g_usleep (10); + gst_harness_set_src_caps_str (ctx->rtcp_h, "application/x-rtcp"); + gst_harness_push (ctx->rtcp_h, generate_rtcp_sr_buffer (1111)); + return NULL; +} + +GST_START_TEST (test_rtp_and_rtcp_arrives_simultaneously) +{ + guint r; + guint repeats = 1000; + if (RUNNING_ON_VALGRIND) + repeats = 2; + + for (r = 0; r < repeats; r++) { + SimulCtx ctx; + GThread *t0, *t1; + + ctx.rtp_h = gst_harness_new_with_padnames ("rtpssrcdemux", "sink", NULL); + ctx.rtcp_h = + gst_harness_new_with_element (ctx.rtp_h->element, "rtcp_sink", NULL); + + g_signal_connect (ctx.rtp_h->element, + "new-ssrc-pad", (GCallback) _simul_ctx_new_ssrc_pad_cb, &ctx); + t0 = g_thread_new ("push rtp", _simul_ctx_push_rtp_buffers, &ctx); + t1 = g_thread_new ("push rtcp", _simul_ctx_push_rtcp_buffers, &ctx); + + g_thread_join (t0); + g_thread_join (t1); + + gst_harness_teardown (ctx.rtp_h); + gst_harness_teardown (ctx.rtcp_h); + } +} + +GST_END_TEST; static Suite * rtpssrcdemux_suite (void) @@ -370,6 +466,7 @@ rtpssrcdemux_suite (void) tcase_add_test (tc_chain, test_rtpssrcdemux_rtcp_app); tcase_add_test (tc_chain, test_rtpssrcdemux_invalid_rtp); tcase_add_test (tc_chain, test_rtpssrcdemux_invalid_rtcp); + tcase_add_test (tc_chain, test_rtp_and_rtcp_arrives_simultaneously); return s; } |