summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorThijs Vermeir <thijsvermeir@gmail.com>2008-06-22 19:19:35 +0000
committerThijs Vermeir <thijsvermeir@gmail.com>2008-06-22 19:19:35 +0000
commit15b694fc7a029ee18354af9458b4c43a52bd8fc8 (patch)
tree340c5a90260e429aeaa4a90166e45986361d5212 /plugins
parentd7d1eecb97bfe6a24c5a4472ca304b1bd1cd030e (diff)
downloadgstreamer-15b694fc7a029ee18354af9458b4c43a52bd8fc8.tar.gz
plugins/elements/gstmultiqueue.c: Add functionality to extra-size-buffers property.
Original commit message from CVS: * plugins/elements/gstmultiqueue.c: Add functionality to extra-size-buffers property.
Diffstat (limited to 'plugins')
-rw-r--r--plugins/elements/gstmultiqueue.c115
1 files changed, 52 insertions, 63 deletions
diff --git a/plugins/elements/gstmultiqueue.c b/plugins/elements/gstmultiqueue.c
index 291f136bd9..abe8c18382 100644
--- a/plugins/elements/gstmultiqueue.c
+++ b/plugins/elements/gstmultiqueue.c
@@ -2,6 +2,7 @@
* Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
* Copyright (C) 2007 Jan Schmidt <jan@fluendo.com>
* Copyright (C) 2007 Wim Taymans <wim@fluendo.com>
+ * Copyright (C) 2008 Thijs Vermeir <thijsvermeir@gmail.com>
*
* gstmultiqueue.c:
*
@@ -218,11 +219,11 @@ enum
};
#define GST_MULTI_QUEUE_MUTEX_LOCK(q) G_STMT_START { \
- g_mutex_lock (q->qlock); \
+ g_mutex_lock (q->qlock); \
} G_STMT_END
#define GST_MULTI_QUEUE_MUTEX_UNLOCK(q) G_STMT_START { \
- g_mutex_unlock (q->qlock); \
+ g_mutex_unlock (q->qlock); \
} G_STMT_END
static void gst_multi_queue_finalize (GObject * object);
@@ -252,7 +253,8 @@ gst_multi_queue_base_init (gpointer g_class)
gst_element_class_set_details_simple (gstelement_class,
"MultiQueue",
- "Generic", "Multiple data queue", "Edward Hervey <edward@fluendo.com>");
+ "Generic", "Multiple data queue", "Edward Hervey <edward@fluendo.com>\n"
+ "Thijs Vermeir <thijsvermeir@gmail.com>");
gst_element_class_add_pad_template (gstelement_class,
gst_static_pad_template_get (&sinktemplate));
gst_element_class_add_pad_template (gstelement_class,
@@ -379,13 +381,14 @@ gst_multi_queue_finalize (GObject * object)
G_OBJECT_CLASS (parent_class)->finalize (object);
}
-#define SET_CHILD_PROPERTY(mq,format) G_STMT_START { \
- GList * tmp = mq->queues; \
- while (tmp) { \
- GstSingleQueue *q = (GstSingleQueue*)tmp->data; \
+#define SET_CHILD_PROPERTY(mq,format) G_STMT_START { \
+ GList * tmp = mq->queues; \
+ while (tmp) { \
+ GstSingleQueue *q = (GstSingleQueue*)tmp->data; \
q->max_size.format = mq->max_size.format; \
- tmp = g_list_next(tmp); \
- }; \
+ q->extra_size.format = mq->extra_size.format; \
+ tmp = g_list_next(tmp); \
+ }; \
} G_STMT_END
static void
@@ -394,38 +397,37 @@ gst_multi_queue_set_property (GObject * object, guint prop_id,
{
GstMultiQueue *mq = GST_MULTI_QUEUE (object);
+ GST_MULTI_QUEUE_MUTEX_LOCK (mq);
switch (prop_id) {
case ARG_MAX_SIZE_BYTES:
- GST_MULTI_QUEUE_MUTEX_LOCK (mq);
mq->max_size.bytes = g_value_get_uint (value);
SET_CHILD_PROPERTY (mq, bytes);
- GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
break;
case ARG_MAX_SIZE_BUFFERS:
- GST_MULTI_QUEUE_MUTEX_LOCK (mq);
mq->max_size.visible = g_value_get_uint (value);
SET_CHILD_PROPERTY (mq, visible);
- GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
break;
case ARG_MAX_SIZE_TIME:
- GST_MULTI_QUEUE_MUTEX_LOCK (mq);
mq->max_size.time = g_value_get_uint64 (value);
SET_CHILD_PROPERTY (mq, time);
- GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
break;
case ARG_EXTRA_SIZE_BYTES:
mq->extra_size.bytes = g_value_get_uint (value);
+ SET_CHILD_PROPERTY (mq, bytes);
break;
case ARG_EXTRA_SIZE_BUFFERS:
mq->extra_size.visible = g_value_get_uint (value);
+ SET_CHILD_PROPERTY (mq, visible);
break;
case ARG_EXTRA_SIZE_TIME:
mq->extra_size.time = g_value_get_uint64 (value);
+ SET_CHILD_PROPERTY (mq, time);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
}
static void
@@ -501,7 +503,6 @@ no_parent:
}
}
-
/*
* GstElement methods
*/
@@ -600,6 +601,7 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
sq->srcresult = GST_FLOW_OK;
sq->cur_time = 0;
sq->max_size.visible = mq->max_size.visible;
+ sq->extra_size.visible = mq->extra_size.visible;
sq->is_eos = FALSE;
sq->inextra = FALSE;
sq->nextid = 0;
@@ -1279,61 +1281,48 @@ single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
GstMultiQueue *mq = sq->mqueue;
GList *tmp;
GstDataQueueSize size;
- gboolean filled = FALSE;
+ gboolean filled = TRUE;
gst_data_queue_get_level (sq->queue, &size);
GST_LOG_OBJECT (mq, "Single Queue %d is full", sq->id);
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
- for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
- GstSingleQueue *ssq = (GstSingleQueue *) tmp->data;
- GstDataQueueSize ssize;
- GST_LOG_OBJECT (mq, "Checking Queue %d", ssq->id);
+ /* if we have reached max visible we can maybe bump this
+ * if another queue is empty, skip this if we can't grow anymore
+ */
+ if (IS_FILLED (visible, size.visible) && sq->extra_size.visible > 0) {
+ for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
+ GstSingleQueue *ssq = (GstSingleQueue *) tmp->data;
+
+ if (ssq == sq)
+ continue;
- if (gst_data_queue_is_empty (ssq->queue)) {
- GST_LOG_OBJECT (mq, "Queue %d is empty", ssq->id);
- if (IS_FILLED (visible, size.visible)) {
- sq->max_size.visible = size.visible + 1;
+ if (gst_data_queue_is_empty (ssq->queue)) {
+ sq->max_size.visible++;
+ sq->extra_size.visible--;
GST_DEBUG_OBJECT (mq,
- "Another queue is empty, bumping single queue %d max visible to %d",
- sq->id, sq->max_size.visible);
+ "queue %d is empty, bumping single queue %d max visible to %d",
+ ssq->id, sq->id, sq->max_size.visible);
+ break;
}
- GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
- goto beach;
- }
- /* check if we reached the hard time/bytes limits */
- gst_data_queue_get_level (ssq->queue, &ssize);
-
- GST_DEBUG_OBJECT (mq,
- "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
- G_GUINT64_FORMAT, ssq->id, ssize.visible, sq->max_size.visible,
- ssize.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time);
-
- /* if this queue is filled completely we must signal overrun */
- if (IS_FILLED (bytes, ssize.bytes) || IS_FILLED (time, sq->cur_time)) {
- GST_LOG_OBJECT (mq, "Queue %d is filled", ssq->id);
- filled = TRUE;
}
+ /* check if the queue is still full */
+ filled = gst_data_queue_is_full (sq->queue);
}
- /* no queues were empty */
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
- /* Overrun is always forwarded, since this is blocking the upstream element */
if (filled) {
GST_DEBUG_OBJECT (mq, "A queue is filled, signalling overrun");
g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_OVERRUN], 0);
}
-
-beach:
- return;
}
static void
single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
{
- gboolean empty = TRUE;
+ gboolean all_empty = TRUE;
GstMultiQueue *mq = sq->mqueue;
GList *tmp;
@@ -1342,26 +1331,26 @@ single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
- GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
+ GstSingleQueue *ssq = (GstSingleQueue *) tmp->data;
- if (gst_data_queue_is_full (sq->queue)) {
- GstDataQueueSize size;
+ if (sq == ssq)
+ continue;
- gst_data_queue_get_level (sq->queue, &size);
- if (IS_FILLED (visible, size.visible)) {
- sq->max_size.visible = size.visible + 1;
- GST_DEBUG_OBJECT (mq,
- "queue %d is filled, bumping its max visible to %d", sq->id,
- sq->max_size.visible);
- gst_data_queue_limits_changed (sq->queue);
- }
+ /* prevent data starvation */
+ if (gst_data_queue_is_full (ssq->queue)) {
+ single_queue_overrun_cb (dq, ssq);
+ all_empty = FALSE;
+ break;
+ }
+
+ if (!gst_data_queue_is_empty (ssq->queue)) {
+ all_empty = FALSE;
+ break;
}
- if (!gst_data_queue_is_empty (sq->queue))
- empty = FALSE;
}
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
- if (empty) {
+ if (all_empty) {
GST_DEBUG_OBJECT (mq, "All queues are empty, signalling it");
g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_UNDERRUN], 0);
}
@@ -1442,7 +1431,7 @@ gst_single_queue_new (GstMultiQueue * mqueue)
sq->oldid = 0;
sq->turn = g_cond_new ();
- /* attach to underrun/overrun signals to handle non-starvation */
+ /* attach to underrun/overrun signals to handle non-starvation */
g_signal_connect (G_OBJECT (sq->queue), "full",
G_CALLBACK (single_queue_overrun_cb), sq);
g_signal_connect (G_OBJECT (sq->queue), "empty",