From ea08187036c64b47550d3e4707dfe07daae4c9ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Wed, 2 May 2018 18:11:58 +0300 Subject: appsrc/sink: Fix optimization for only signalling waiters if someone is actually waiting It is possible that both application and the stream are waiting currently, if for example the following happens: 1) app is waiting because no buffer in appsink 2) appsink providing a buffer and waking up app 3) appsink getting another buffer and waiting because it's full now 4) app thread getting back control Previously step 4 would overwrite that the appsink is currently waiting, so it would never be signalled again. https://bugzilla.gnome.org/show_bug.cgi?id=795551 --- gst-libs/gst/app/gstappsink.c | 36 ++++++++++++++++++------------------ gst-libs/gst/app/gstappsrc.c | 18 +++++++++--------- 2 files changed, 27 insertions(+), 27 deletions(-) (limited to 'gst-libs') diff --git a/gst-libs/gst/app/gstappsink.c b/gst-libs/gst/app/gstappsink.c index 87ae643e2..6e65528d0 100644 --- a/gst-libs/gst/app/gstappsink.c +++ b/gst-libs/gst/app/gstappsink.c @@ -75,9 +75,9 @@ typedef enum { - NOONE_WAITING, - STREAM_WAITING, /* streaming thread is waiting for application thread */ - APP_WAITING, /* application thread is waiting for streaming thread */ + NOONE_WAITING = 0, + STREAM_WAITING = 1 << 0, /* streaming thread is waiting for application thread */ + APP_WAITING = 1 << 1, /* application thread is waiting for streaming thread */ } GstAppSinkWaitStatus; struct _GstAppSinkPrivate @@ -719,9 +719,9 @@ gst_app_sink_event (GstBaseSink * sink, GstEvent * event) * consumed, which is a bit confusing for the application */ while (priv->num_buffers > 0 && !priv->flushing && priv->wait_on_eos) { - priv->wait_status = STREAM_WAITING; + priv->wait_status |= STREAM_WAITING; g_cond_wait (&priv->cond, &priv->mutex); - priv->wait_status = NOONE_WAITING; + priv->wait_status &= ~STREAM_WAITING; } if (priv->flushing) emit = FALSE; @@ -769,7 +769,7 @@ gst_app_sink_preroll (GstBaseSink * psink, GstBuffer * buffer) GST_DEBUG_OBJECT (appsink, "setting preroll buffer %p", buffer); gst_buffer_replace (&priv->preroll_buffer, buffer); - if (priv->wait_status == APP_WAITING) + if ((priv->wait_status & APP_WAITING)) g_cond_signal (&priv->cond); emit = priv->emit_signals; @@ -885,9 +885,9 @@ restart: } /* wait for a buffer to be removed or flush */ - priv->wait_status = STREAM_WAITING; + priv->wait_status |= STREAM_WAITING; g_cond_wait (&priv->cond, &priv->mutex); - priv->wait_status = NOONE_WAITING; + priv->wait_status &= ~STREAM_WAITING; if (priv->flushing) goto flushing; @@ -897,7 +897,7 @@ restart: gst_queue_array_push_tail (priv->queue, gst_mini_object_ref (data)); priv->num_buffers++; - if (priv->wait_status == APP_WAITING) + if ((priv->wait_status & APP_WAITING)) g_cond_signal (&priv->cond); emit = priv->emit_signals; @@ -995,9 +995,9 @@ gst_app_sink_query (GstBaseSink * bsink, GstQuery * query) g_mutex_lock (&priv->mutex); GST_DEBUG_OBJECT (appsink, "waiting buffers to be consumed"); while (priv->num_buffers > 0 || priv->preroll_buffer) { - priv->wait_status = STREAM_WAITING; + priv->wait_status |= STREAM_WAITING; g_cond_wait (&priv->cond, &priv->mutex); - priv->wait_status = NOONE_WAITING; + priv->wait_status &= ~STREAM_WAITING; } g_mutex_unlock (&priv->mutex); ret = GST_BASE_SINK_CLASS (parent_class)->query (bsink, query); @@ -1511,14 +1511,14 @@ gst_app_sink_try_pull_preroll (GstAppSink * appsink, GstClockTime timeout) /* nothing to return, wait */ GST_DEBUG_OBJECT (appsink, "waiting for the preroll buffer"); - priv->wait_status = APP_WAITING; + priv->wait_status |= APP_WAITING; if (timeout_valid) { if (!g_cond_wait_until (&priv->cond, &priv->mutex, end_time)) goto expired; } else { g_cond_wait (&priv->cond, &priv->mutex); } - priv->wait_status = NOONE_WAITING; + priv->wait_status &= ~APP_WAITING; } sample = gst_sample_new (priv->preroll_buffer, priv->preroll_caps, @@ -1533,7 +1533,7 @@ gst_app_sink_try_pull_preroll (GstAppSink * appsink, GstClockTime timeout) expired: { GST_DEBUG_OBJECT (appsink, "timeout expired, return NULL"); - priv->wait_status = NOONE_WAITING; + priv->wait_status &= ~APP_WAITING; g_mutex_unlock (&priv->mutex); return NULL; } @@ -1609,14 +1609,14 @@ gst_app_sink_try_pull_sample (GstAppSink * appsink, GstClockTime timeout) /* nothing to return, wait */ GST_DEBUG_OBJECT (appsink, "waiting for a buffer"); - priv->wait_status = APP_WAITING; + priv->wait_status |= APP_WAITING; if (timeout_valid) { if (!g_cond_wait_until (&priv->cond, &priv->mutex, end_time)) goto expired; } else { g_cond_wait (&priv->cond, &priv->mutex); } - priv->wait_status = NOONE_WAITING; + priv->wait_status &= ~APP_WAITING; } obj = dequeue_buffer (appsink); @@ -1631,7 +1631,7 @@ gst_app_sink_try_pull_sample (GstAppSink * appsink, GstClockTime timeout) } gst_mini_object_unref (obj); - if (priv->wait_status == STREAM_WAITING) + if ((priv->wait_status & STREAM_WAITING)) g_cond_signal (&priv->cond); g_mutex_unlock (&priv->mutex); @@ -1642,7 +1642,7 @@ gst_app_sink_try_pull_sample (GstAppSink * appsink, GstClockTime timeout) expired: { GST_DEBUG_OBJECT (appsink, "timeout expired, return NULL"); - priv->wait_status = NOONE_WAITING; + priv->wait_status &= ~APP_WAITING; g_mutex_unlock (&priv->mutex); return NULL; } diff --git a/gst-libs/gst/app/gstappsrc.c b/gst-libs/gst/app/gstappsrc.c index 94ed6f7a9..cf601034e 100644 --- a/gst-libs/gst/app/gstappsrc.c +++ b/gst-libs/gst/app/gstappsrc.c @@ -103,9 +103,9 @@ typedef enum { - NOONE_WAITING, - STREAM_WAITING, /* streaming thread is waiting for application thread */ - APP_WAITING, /* application thread is waiting for streaming thread */ + NOONE_WAITING = 0, + STREAM_WAITING = 1 << 0, /* streaming thread is waiting for application thread */ + APP_WAITING = 1 << 1, /* application thread is waiting for streaming thread */ } GstAppSrcWaitStatus; struct _GstAppSrcPrivate @@ -1242,7 +1242,7 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size, priv->offset += buf_size; /* signal that we removed an item */ - if (priv->wait_status == APP_WAITING) + if ((priv->wait_status & APP_WAITING)) g_cond_broadcast (&priv->cond); /* see if we go lower than the empty-percent */ @@ -1277,9 +1277,9 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size, goto eos; /* nothing to return, wait a while for new data or flushing. */ - priv->wait_status = STREAM_WAITING; + priv->wait_status |= STREAM_WAITING; g_cond_wait (&priv->cond, &priv->mutex); - priv->wait_status = NOONE_WAITING; + priv->wait_status &= ~STREAM_WAITING; } g_mutex_unlock (&priv->mutex); return ret; @@ -1840,9 +1840,9 @@ gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer, GST_DEBUG_OBJECT (appsrc, "waiting for free space"); /* we are filled, wait until a buffer gets popped or when we * flush. */ - priv->wait_status = APP_WAITING; + priv->wait_status |= APP_WAITING; g_cond_wait (&priv->cond, &priv->mutex); - priv->wait_status = NOONE_WAITING; + priv->wait_status &= ~APP_WAITING; } else { /* no need to wait for free space, we just pump more data into the * queue hoping that the caller reacts to the enough-data signal and @@ -1867,7 +1867,7 @@ gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer, priv->queued_bytes += gst_buffer_get_size (buffer); } - if (priv->wait_status == STREAM_WAITING) + if ((priv->wait_status & STREAM_WAITING)) g_cond_broadcast (&priv->cond); g_mutex_unlock (&priv->mutex); -- cgit v1.2.1