summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorAlessandro Decina <alessandro.d@gmail.com>2013-09-16 08:35:37 +0200
committerSebastian Dröge <sebastian@centricular.com>2013-11-11 16:50:42 +0100
commitf52b5ddcd2de711c1e095d6b4875566cffcc8244 (patch)
tree6c012b5946832f135de8741c335cdfe6605a3eaf /tests
parentd0636238519875fa25c5b544d6af54c96892d3fd (diff)
downloadgstreamer-f52b5ddcd2de711c1e095d6b4875566cffcc8244.tar.gz
tests: collectpads: add flushing seek tests
https://bugzilla.gnome.org/show_bug.cgi?id=708416
Diffstat (limited to 'tests')
-rw-r--r--tests/check/libs/collectpads.c237
1 files changed, 234 insertions, 3 deletions
diff --git a/tests/check/libs/collectpads.c b/tests/check/libs/collectpads.c
index 6de82d8cda..689353ea81 100644
--- a/tests/check/libs/collectpads.c
+++ b/tests/check/libs/collectpads.c
@@ -291,6 +291,7 @@ typedef struct
GstPad *pad;
GstBuffer *buffer;
GstEvent *event;
+ GstFlowReturn expected_result;
} TestData;
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
@@ -305,10 +306,13 @@ static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
static GstCollectPads *collect;
static gboolean collected;
-static GstPad *srcpad1, *srcpad2;
+static GstPad *agg_srcpad, *srcpad1, *srcpad2;
static GstPad *sinkpad1, *sinkpad2;
static TestData *data1, *data2;
static GstBuffer *outbuf1, *outbuf2;
+static GstElement *agg;
+gboolean fail_seek;
+gint flush_start_events, flush_stop_events;
static GMutex lock;
static GCond cond;
@@ -362,7 +366,7 @@ push_buffer (gpointer user_data)
gst_pad_push_event (test_data->pad, gst_event_new_segment (&segment));
flow = gst_pad_push (test_data->pad, test_data->buffer);
- fail_unless (flow == GST_FLOW_OK, "got flow %s instead of OK",
+ fail_unless (flow == test_data->expected_result, "got flow %s instead of OK",
gst_flow_get_name (flow));
return NULL;
@@ -779,16 +783,237 @@ GST_START_TEST (test_branched_pipeline)
GST_END_TEST;
+static GstPadProbeReturn
+downstream_probe_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
+{
+ if (info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
+ if (GST_EVENT_TYPE (GST_PAD_PROBE_INFO_EVENT (info)) ==
+ GST_EVENT_FLUSH_START)
+ g_atomic_int_inc (&flush_start_events);
+ else if (GST_EVENT_TYPE (GST_PAD_PROBE_INFO_EVENT (info)) ==
+ GST_EVENT_FLUSH_STOP)
+ g_atomic_int_inc (&flush_stop_events);
+ } else if (info->type & GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM) {
+ g_mutex_lock (&lock);
+ collected = TRUE;
+ g_cond_signal (&cond);
+ g_mutex_unlock (&lock);
+ }
+
+ return GST_PAD_PROBE_DROP;
+}
+
+static gboolean
+src_event (GstPad * pad, GstObject * parent, GstEvent * event)
+{
+ gboolean ret = TRUE;
+ if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK) {
+ if (g_atomic_int_compare_and_exchange (&fail_seek, TRUE, FALSE) == TRUE) {
+ ret = FALSE;
+ }
+ }
+
+ gst_event_unref (event);
+ return ret;
+}
+
+static gboolean
+agg_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
+{
+ return gst_collect_pads_src_event_default (GST_AGGREGATOR (parent)->collect,
+ pad, event);
+}
+
+static GstPad *
+setup_src_pad (GstElement * element,
+ GstStaticPadTemplate * tmpl, const char *name)
+{
+ GstPad *srcpad, *sinkpad;
+
+ srcpad = gst_pad_new_from_static_template (tmpl, "src");
+ sinkpad = gst_element_get_request_pad (element, name);
+ fail_unless (gst_pad_link (srcpad, sinkpad) == GST_PAD_LINK_OK,
+ "Could not link source and %s sink pads", GST_ELEMENT_NAME (element));
+ gst_pad_set_event_function (srcpad, src_event);
+ gst_pad_set_active (srcpad, TRUE);
+
+ return srcpad;
+}
+
+static void
+flush_setup (void)
+{
+ agg = gst_check_setup_element ("aggregator");
+ agg_srcpad = gst_element_get_static_pad (agg, "src");
+ srcpad1 = setup_src_pad (agg, &srctemplate, "sink_0");
+ srcpad2 = setup_src_pad (agg, &srctemplate, "sink_1");
+ gst_pad_add_probe (agg_srcpad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM |
+ GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM |
+ GST_PAD_PROBE_TYPE_EVENT_FLUSH, downstream_probe_cb, NULL, NULL);
+ gst_pad_set_event_function (agg_srcpad, agg_src_event);
+ data1 = g_new0 (TestData, 1);
+ data2 = g_new0 (TestData, 1);
+ g_atomic_int_set (&flush_start_events, 0);
+ g_atomic_int_set (&flush_stop_events, 0);
+ gst_element_set_state (agg, GST_STATE_PLAYING);
+}
+
+static void
+flush_teardown (void)
+{
+ gst_element_set_state (agg, GST_STATE_NULL);
+ gst_object_unref (agg);
+ gst_object_unref (agg_srcpad);
+ gst_object_unref (srcpad1);
+ gst_object_unref (srcpad2);
+ g_free (data1);
+ g_free (data2);
+}
+
+GST_START_TEST (test_flushing_seek_failure)
+{
+ GstBuffer *buf1, *buf2;
+ GThread *thread1, *thread2;
+ GstEvent *event;
+
+ /* Queue a buffer in agg:sink_1. Do a flushing seek and simulate one upstream
+ * element failing to handle the seek (see src_event()). Check that the
+ * flushing seek logic doesn't get triggered by checking that the buffer
+ * queued on agg:sink_1 doesn't get flushed.
+ */
+
+ /* queue a buffer in agg:sink_1 */
+ buf2 = gst_buffer_new_allocate (NULL, 1, NULL);
+ GST_BUFFER_TIMESTAMP (buf2) = GST_SECOND;
+ data2->pad = srcpad2;
+ data2->buffer = buf2;
+ thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL);
+ fail_unless_collected (FALSE);
+
+ /* do the seek */
+ event = gst_event_new_seek (1, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH,
+ GST_SEEK_TYPE_SET, 0, GST_SEEK_TYPE_SET, 10 * GST_SECOND);
+ g_atomic_int_set (&fail_seek, TRUE);
+ fail_if (gst_pad_send_event (agg_srcpad, event));
+
+ /* flush srcpad1 (pretending it's the upstream that didn't fail to seek) */
+ fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_start ()));
+ fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_stop (TRUE)));
+
+ /* check that the flush events reached agg:src */
+ fail_unless_equals_int (flush_start_events, 1);
+ fail_unless_equals_int (flush_stop_events, 1);
+
+ /* push a buffer on agg:sink_0. This should trigger a collect since agg:sink_1
+ * should not have been flushed at this point */
+ buf1 = gst_buffer_new_allocate (NULL, 1, NULL);
+ GST_BUFFER_TIMESTAMP (buf1) = 0;
+ data1->pad = srcpad1;
+ data1->buffer = buf1;
+ thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
+ fail_unless_collected (TRUE);
+ collected = FALSE;
+
+ /* at this point thread1 must have returned */
+ g_thread_join (thread1);
+
+ /* push eos on agg:sink_0 so the buffer queued in agg:sink_1 is collected and
+ * the pushing thread returns */
+ data1->pad = srcpad1;
+ data1->event = gst_event_new_eos ();
+ thread1 = g_thread_try_new ("gst-check", push_event, data1, NULL);
+ fail_unless_collected (TRUE);
+
+ g_thread_join (thread1);
+ g_thread_join (thread2);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_flushing_seek)
+{
+ GstBuffer *buf1, *buf2;
+ GThread *thread1, *thread2;
+ GstEvent *event;
+
+ /* Queue a buffer in agg:sink_1. Then do a flushing seek and check that the
+ * new flushing seek logic is triggered. On the first FLUSH_START call the
+ * buffers queued in collectpads should get flushed. Only one FLUSH_START and
+ * one FLUSH_STOP should be forwarded downstream.
+ */
+ buf2 = gst_buffer_new_allocate (NULL, 1, NULL);
+ GST_BUFFER_TIMESTAMP (buf2) = 0;
+ data2->pad = srcpad2;
+ data2->buffer = buf2;
+ /* expect this buffer to be flushed */
+ data2->expected_result = GST_FLOW_FLUSHING;
+ thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL);
+
+ /* now do a successful flushing seek */
+ event = gst_event_new_seek (1, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH,
+ GST_SEEK_TYPE_SET, 0, GST_SEEK_TYPE_SET, 10 * GST_SECOND);
+ g_atomic_int_set (&fail_seek, FALSE);
+ fail_unless (gst_pad_send_event (agg_srcpad, event));
+
+ /* flushing starts once one of the upstream elements sends the first
+ * FLUSH_START */
+ fail_unless_equals_int (flush_start_events, 0);
+ fail_unless_equals_int (flush_stop_events, 0);
+
+ /* flush ogg:sink_0. This flushs collectpads, calls ::flush() and sends
+ * FLUSH_START downstream */
+ fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_start ()));
+ fail_unless_equals_int (flush_start_events, 1);
+ fail_unless_equals_int (flush_stop_events, 0);
+ /* the first FLUSH_STOP is forwarded downstream */
+ fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_stop (TRUE)));
+ fail_unless_equals_int (flush_start_events, 1);
+ fail_unless_equals_int (flush_stop_events, 1);
+ /* at this point even the other pad agg:sink_1 should be flushing so thread2
+ * should have stopped */
+ g_thread_join (thread2);
+
+ /* push a buffer on agg:sink_0 to trigger one collect after flushing to verify
+ * that flushing completes once all the pads have been flushed */
+ buf1 = gst_buffer_new_allocate (NULL, 1, NULL);
+ GST_BUFFER_TIMESTAMP (buf1) = GST_SECOND;
+ data1->pad = srcpad1;
+ data1->buffer = buf1;
+ thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
+
+ /* flush agg:sink_1 as well. This completes the flushing seek so a FLUSH_STOP is
+ * sent downstream */
+ gst_pad_push_event (srcpad2, gst_event_new_flush_start ());
+ gst_pad_push_event (srcpad2, gst_event_new_flush_stop (TRUE));
+
+ /* still, only one FLUSH_START and one FLUSH_STOP are forwarded downstream */
+ fail_unless_equals_int (flush_start_events, 1);
+ fail_unless_equals_int (flush_stop_events, 1);
+
+ /* EOS agg:sink_1 so the buffer queued in agg:sink_0 is collected */
+ data2->pad = srcpad2;
+ data2->event = gst_event_new_eos ();
+ thread2 = g_thread_try_new ("gst-check", push_event, data2, NULL);
+ fail_unless_collected (TRUE);
+
+ /* these will return immediately as at this point the threads have been
+ * unlocked and are finished */
+ g_thread_join (thread1);
+ g_thread_join (thread2);
+}
+
+GST_END_TEST;
static Suite *
gst_collect_pads_suite (void)
{
Suite *suite;
- TCase *general, *buffers, *pipeline;
+ TCase *general, *buffers, *pipeline, *flush;
gst_agregator_plugin_register ();
suite = suite_create ("GstCollectPads");
+
general = tcase_create ("general");
suite_add_tcase (suite, general);
tcase_add_checked_fixture (general, setup, teardown);
@@ -807,6 +1032,12 @@ gst_collect_pads_suite (void)
tcase_add_test (pipeline, test_linear_pipeline);
tcase_add_test (pipeline, test_branched_pipeline);
+ flush = tcase_create ("flush");
+ suite_add_tcase (suite, flush);
+ tcase_add_checked_fixture (flush, flush_setup, flush_teardown);
+ tcase_add_test (flush, test_flushing_seek_failure);
+ tcase_add_test (flush, test_flushing_seek);
+
return suite;
}