summaryrefslogtreecommitdiff
path: root/gst-libs/gst/adaptivedemux/gstadaptivedemux.c
diff options
context:
space:
mode:
authorJan Schmidt <jan@centricular.com>2017-01-07 12:12:05 +0900
committerJan Schmidt <jan@centricular.com>2017-02-07 23:53:30 +1100
commitb2113f69c622c37c32d4336025f80a5c1d190897 (patch)
treefb6a37ca8f3dee0d26a2572a3fbeb97c9bfc0ebb /gst-libs/gst/adaptivedemux/gstadaptivedemux.c
parent9b778f7264a28e42fcbbafa7948ed3415b8edb44 (diff)
downloadgstreamer-plugins-bad-b2113f69c622c37c32d4336025f80a5c1d190897.tar.gz
adaptivedemux: Preroll streams before exposing them
To ensure that pads have caps when they are exposed, do the exposing when all pending streams have prerolled an output buffer, and only then EOS and remove any old pads. Improves the switching sequence by making caps available as soon as a pad appears. With fixes from Seungha Yang <sh.yang@lge.com> https://bugzilla.gnome.org/show_bug.cgi?id=758257
Diffstat (limited to 'gst-libs/gst/adaptivedemux/gstadaptivedemux.c')
-rw-r--r--gst-libs/gst/adaptivedemux/gstadaptivedemux.c208
1 files changed, 177 insertions, 31 deletions
diff --git a/gst-libs/gst/adaptivedemux/gstadaptivedemux.c b/gst-libs/gst/adaptivedemux/gstadaptivedemux.c
index ae61a5fea..ce648cd14 100644
--- a/gst-libs/gst/adaptivedemux/gstadaptivedemux.c
+++ b/gst-libs/gst/adaptivedemux/gstadaptivedemux.c
@@ -198,6 +198,11 @@ struct _GstAdaptiveDemuxPrivate
GCond manifest_cond;
GMutex manifest_update_lock;
+ /* Lock and condition for prerolling streams before exposing */
+ GMutex preroll_lock;
+ GCond preroll_cond;
+ gint preroll_pending;
+
GMutex api_lock;
/* Protects demux and stream segment information
@@ -242,8 +247,9 @@ static void gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux);
static void gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream *
stream);
static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux);
-static gboolean gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux,
+static gboolean gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
gboolean first_and_live);
+static gboolean gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux);
static gboolean gst_adaptive_demux_is_live (GstAdaptiveDemux * demux);
static GstFlowReturn gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream, gboolean forward, GstSeekFlags flags,
@@ -270,7 +276,8 @@ static GstFlowReturn
gst_adaptive_demux_stream_push_event (GstAdaptiveDemuxStream * stream,
GstEvent * event);
-static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux);
+static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux,
+ gboolean start_preroll_streams);
static void gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux);
static GstFlowReturn gst_adaptive_demux_combine_flows (GstAdaptiveDemux *
demux);
@@ -483,6 +490,9 @@ gst_adaptive_demux_init (GstAdaptiveDemux * demux,
g_mutex_init (&demux->priv->api_lock);
g_mutex_init (&demux->priv->segment_lock);
+ g_cond_init (&demux->priv->preroll_cond);
+ g_mutex_init (&demux->priv->preroll_lock);
+
pad_template =
gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
g_return_if_fail (pad_template != NULL);
@@ -525,6 +535,9 @@ gst_adaptive_demux_finalize (GObject * object)
demux->realtime_clock = NULL;
}
+ g_cond_clear (&demux->priv->preroll_cond);
+ g_mutex_clear (&demux->priv->preroll_lock);
+
G_OBJECT_CLASS (parent_class)->finalize (object);
}
@@ -684,9 +697,9 @@ gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
}
if (demux->next_streams) {
- gst_adaptive_demux_expose_streams (demux,
+ gst_adaptive_demux_prepare_streams (demux,
gst_adaptive_demux_is_live (demux));
- gst_adaptive_demux_start_tasks (demux);
+ gst_adaptive_demux_start_tasks (demux, TRUE);
if (gst_adaptive_demux_is_live (demux)) {
g_mutex_lock (&demux->priv->updates_timed_lock);
demux->priv->stop_updates_task = FALSE;
@@ -775,6 +788,16 @@ gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
gst_event_unref (eos);
g_list_free (demux->streams);
demux->streams = NULL;
+ if (demux->prepared_streams) {
+ g_list_free_full (demux->prepared_streams,
+ (GDestroyNotify) gst_adaptive_demux_stream_free);
+ demux->prepared_streams = NULL;
+ }
+ if (demux->next_streams) {
+ g_list_free_full (demux->next_streams,
+ (GDestroyNotify) gst_adaptive_demux_stream_free);
+ demux->next_streams = NULL;
+ }
if (old_streams) {
g_list_free_full (old_streams,
@@ -872,10 +895,9 @@ gst_adaptive_demux_set_stream_struct_size (GstAdaptiveDemux * demux,
/* must be called with manifest_lock taken */
static gboolean
-gst_adaptive_demux_expose_stream (GstAdaptiveDemux * demux,
+gst_adaptive_demux_prepare_stream (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream)
{
- gboolean ret;
GstPad *pad = stream->pad;
gchar *name = gst_pad_get_name (pad);
GstEvent *event;
@@ -907,16 +929,33 @@ gst_adaptive_demux_expose_stream (GstAdaptiveDemux * demux,
g_free (stream_id);
g_free (name);
- GST_DEBUG_OBJECT (demux, "Adding srcpad %s:%s with caps %" GST_PTR_FORMAT,
- GST_DEBUG_PAD_NAME (pad), stream->pending_caps);
+ GST_DEBUG_OBJECT (demux, "Preparing srcpad %s:%s", GST_DEBUG_PAD_NAME (pad));
+
+ stream->discont = TRUE;
+
+ return TRUE;
+}
+
+static gboolean
+gst_adaptive_demux_expose_stream (GstAdaptiveDemux * demux,
+ GstAdaptiveDemuxStream * stream)
+{
+ gboolean ret;
+ GstPad *pad = stream->pad;
+ GstCaps *caps;
if (stream->pending_caps) {
gst_pad_set_caps (pad, stream->pending_caps);
- gst_caps_unref (stream->pending_caps);
+ caps = stream->pending_caps;
stream->pending_caps = NULL;
+ } else {
+ caps = gst_pad_get_current_caps (pad);
}
- stream->discont = TRUE;
+ GST_DEBUG_OBJECT (demux, "Exposing srcpad %s:%s with caps %" GST_PTR_FORMAT,
+ GST_DEBUG_PAD_NAME (pad), caps);
+ if (caps)
+ gst_caps_unref (caps);
gst_object_ref (pad);
@@ -959,17 +998,21 @@ gst_adaptive_demux_get_period_start_time (GstAdaptiveDemux * demux)
/* must be called with manifest_lock taken */
static gboolean
-gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux,
+gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
gboolean first_and_live)
{
GList *iter;
- GList *old_streams;
GstClockTime period_start, min_pts = GST_CLOCK_TIME_NONE;
g_return_val_if_fail (demux->next_streams != NULL, FALSE);
+ if (demux->prepared_streams != NULL) {
+ /* Old streams that were never exposed, due to a seek or so */
+ GST_FIXME_OBJECT (demux,
+ "Preparing new streams without cleaning up old ones!");
+ return FALSE;
+ }
- old_streams = demux->streams;
- demux->streams = demux->next_streams;
+ demux->prepared_streams = demux->next_streams;
demux->next_streams = NULL;
if (!demux->running) {
@@ -977,10 +1020,12 @@ gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux,
return TRUE;
}
- for (iter = demux->streams; iter; iter = g_list_next (iter)) {
+ for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
GstAdaptiveDemuxStream *stream = iter->data;
- if (!gst_adaptive_demux_expose_stream (demux,
+ stream->do_block = TRUE;
+
+ if (!gst_adaptive_demux_prepare_stream (demux,
GST_ADAPTIVE_DEMUX_STREAM_CAST (stream))) {
/* TODO act on error */
}
@@ -1011,7 +1056,7 @@ gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux,
GST_SEEK_TYPE_NONE, -1, NULL);
}
- for (iter = demux->streams; iter; iter = g_list_next (iter)) {
+ for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
GstAdaptiveDemuxStream *stream = iter->data;
GstClockTime offset;
@@ -1091,7 +1136,35 @@ gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux,
stream->pending_segment = gst_event_new_segment (&stream->segment);
gst_event_set_seqnum (stream->pending_segment, demux->priv->segment_seqnum);
+
+ GST_DEBUG ("Prepared segment %" GST_SEGMENT_FORMAT " for stream %p",
+ &stream->segment, stream);
+ }
+
+ return TRUE;
+}
+
+static gboolean
+gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux)
+{
+ GList *iter;
+ GList *old_streams;
+
+ g_return_val_if_fail (demux->prepared_streams != NULL, FALSE);
+
+ old_streams = demux->streams;
+ demux->streams = demux->prepared_streams;
+ demux->prepared_streams = NULL;
+
+ for (iter = demux->streams; iter; iter = g_list_next (iter)) {
+ GstAdaptiveDemuxStream *stream = iter->data;
+
+ if (!gst_adaptive_demux_expose_stream (demux,
+ GST_ADAPTIVE_DEMUX_STREAM_CAST (stream))) {
+ /* TODO act on error */
+ }
}
+ demux->priv->preroll_pending = 0;
GST_MANIFEST_UNLOCK (demux);
gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
@@ -1142,6 +1215,14 @@ gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux,
g_list_concat (demux->priv->old_streams, old_streams);
}
+ /* Unblock after removing oldstreams */
+ for (iter = demux->streams; iter; iter = g_list_next (iter)) {
+ GstAdaptiveDemuxStream *stream = iter->data;
+ stream->do_block = FALSE;
+ }
+
+ GST_DEBUG_OBJECT (demux, "All streams are exposed");
+
return TRUE;
}
@@ -1166,6 +1247,11 @@ gst_adaptive_demux_stream_new (GstAdaptiveDemux * demux, GstPad * pad)
g_malloc0 (sizeof (guint64) * NUM_LOOKBACK_FRAGMENTS);
gst_pad_set_element_private (pad, stream);
+ g_mutex_lock (&demux->priv->preroll_lock);
+ stream->do_block = TRUE;
+ demux->priv->preroll_pending++;
+ g_mutex_unlock (&demux->priv->preroll_lock);
+
gst_pad_set_query_function (pad,
GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_query));
gst_pad_set_event_function (pad,
@@ -1509,7 +1595,10 @@ gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux, GstPad * pad,
}
if (demux->next_streams) {
- gst_adaptive_demux_expose_streams (demux, FALSE);
+ /* If the seek generated new streams, get them
+ * to preroll */
+ gst_adaptive_demux_prepare_streams (demux, FALSE);
+ gst_adaptive_demux_start_tasks (demux, TRUE);
} else {
GList *iter;
GstClockTime period_start =
@@ -1540,10 +1629,11 @@ gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux, GstPad * pad,
}
GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
+
+ /* Restart the demux */
+ gst_adaptive_demux_start_tasks (demux, FALSE);
}
- /* Restart the demux */
- gst_adaptive_demux_start_tasks (demux);
if (gst_adaptive_demux_is_live (demux)) {
g_mutex_lock (&demux->priv->updates_timed_lock);
demux->priv->stop_updates_task = FALSE;
@@ -1730,7 +1820,8 @@ gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
/* must be called with manifest_lock taken */
static void
-gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux)
+gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux,
+ gboolean start_preroll_streams)
{
GList *iter;
@@ -1740,12 +1831,17 @@ gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux)
}
GST_INFO_OBJECT (demux, "Starting streams' tasks");
- for (iter = demux->streams; iter; iter = g_list_next (iter)) {
+
+ iter = start_preroll_streams ? demux->prepared_streams : demux->streams;
+
+ for (; iter; iter = g_list_next (iter)) {
GstAdaptiveDemuxStream *stream = iter->data;
- g_mutex_lock (&stream->fragment_download_lock);
- stream->cancelled = FALSE;
- g_mutex_unlock (&stream->fragment_download_lock);
+ if (!start_preroll_streams) {
+ g_mutex_lock (&stream->fragment_download_lock);
+ stream->cancelled = FALSE;
+ g_mutex_unlock (&stream->fragment_download_lock);
+ }
stream->last_ret = GST_FLOW_OK;
gst_task_start (stream->download_task);
@@ -1770,8 +1866,6 @@ gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux)
g_cond_signal (&demux->priv->updates_timed_cond);
g_mutex_unlock (&demux->priv->updates_timed_lock);
- gst_uri_downloader_cancel (demux->downloader);
-
GST_LOG_OBJECT (demux, "Stopping tasks");
for (iter = demux->streams; iter; iter = g_list_next (iter)) {
@@ -1784,6 +1878,12 @@ gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux)
g_mutex_unlock (&stream->fragment_download_lock);
}
+ g_mutex_lock (&demux->priv->preroll_lock);
+ g_cond_broadcast (&demux->priv->preroll_cond);
+ g_mutex_unlock (&demux->priv->preroll_lock);
+
+ gst_uri_downloader_cancel (demux->downloader);
+
g_mutex_lock (&demux->priv->manifest_update_lock);
g_cond_broadcast (&demux->priv->manifest_cond);
g_mutex_unlock (&demux->priv->manifest_update_lock);
@@ -1973,6 +2073,21 @@ gst_adaptive_demux_combine_flows (GstAdaptiveDemux * demux)
return GST_FLOW_OK;
}
+/* Called with preroll_lock */
+static void
+gst_adaptive_demux_handle_preroll (GstAdaptiveDemux * demux,
+ GstAdaptiveDemuxStream * stream)
+{
+ demux->priv->preroll_pending--;
+ if (demux->priv->preroll_pending == 0) {
+ /* That was the last one, time to release all streams
+ * and expose them */
+ GST_DEBUG_OBJECT (demux, "All streams prerolled. exposing");
+ gst_adaptive_demux_expose_streams (demux);
+ g_cond_broadcast (&demux->priv->preroll_cond);
+ }
+}
+
/* must be called with manifest_lock taken.
* Temporarily releases manifest_lock
*/
@@ -2043,6 +2158,35 @@ gst_adaptive_demux_stream_push_buffer (GstAdaptiveDemuxStream * stream,
gst_caps_unref (stream->pending_caps);
stream->pending_caps = NULL;
}
+
+ if (stream->do_block) {
+
+ g_mutex_lock (&demux->priv->preroll_lock);
+
+ /* If we are preroll state, set caps in here */
+ if (pending_caps) {
+ gst_pad_push_event (stream->pad, pending_caps);
+ pending_caps = NULL;
+ }
+
+ gst_adaptive_demux_handle_preroll (demux, stream);
+ GST_MANIFEST_UNLOCK (demux);
+
+ while (stream->do_block && !stream->cancelled) {
+ GST_LOG_OBJECT (demux, "Stream %p sleeping for preroll", stream);
+ g_cond_wait (&demux->priv->preroll_cond, &demux->priv->preroll_lock);
+ }
+ if (stream->cancelled) {
+ GST_LOG_OBJECT (demux, "stream %p cancelled", stream);
+ gst_buffer_unref (buffer);
+ g_mutex_unlock (&demux->priv->preroll_lock);
+ return GST_FLOW_FLUSHING;
+ }
+
+ g_mutex_unlock (&demux->priv->preroll_lock);
+ GST_MANIFEST_LOCK (demux);
+ }
+
if (G_UNLIKELY (stream->pending_segment)) {
GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
pending_segment = stream->pending_segment;
@@ -2098,6 +2242,8 @@ gst_adaptive_demux_stream_push_buffer (GstAdaptiveDemuxStream * stream,
pending_events = g_list_delete_link (pending_events, pending_events);
}
+ /* Wait for preroll if blocking */
+
ret = gst_pad_push (stream->pad, buffer);
GST_MANIFEST_LOCK (demux);
@@ -3786,8 +3932,8 @@ gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
if (can_expose) {
GST_DEBUG_OBJECT (demux, "Subclass wants new pads "
"to do bitrate switching");
- gst_adaptive_demux_expose_streams (demux, FALSE);
- gst_adaptive_demux_start_tasks (demux);
+ gst_adaptive_demux_prepare_streams (demux, FALSE);
+ gst_adaptive_demux_start_tasks (demux, TRUE);
} else {
GST_LOG_OBJECT (demux, "Not switching yet - ongoing downloads");
}
@@ -3946,8 +4092,8 @@ gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux)
GST_DEBUG_OBJECT (demux, "Advancing to next period");
klass->advance_period (demux);
- gst_adaptive_demux_expose_streams (demux, FALSE);
- gst_adaptive_demux_start_tasks (demux);
+ gst_adaptive_demux_prepare_streams (demux, FALSE);
+ gst_adaptive_demux_start_tasks (demux, TRUE);
}
/**