diff options
Diffstat (limited to 'libs/gst/base/gstbasesrc.c')
-rw-r--r-- | libs/gst/base/gstbasesrc.c | 598 |
1 files changed, 340 insertions, 258 deletions
diff --git a/libs/gst/base/gstbasesrc.c b/libs/gst/base/gstbasesrc.c index e2efcaa7bd..c40ae940dc 100644 --- a/libs/gst/base/gstbasesrc.c +++ b/libs/gst/base/gstbasesrc.c @@ -47,9 +47,6 @@ * </listitem> * </itemizedlist> * - * Since 0.10.9, any #GstBaseSrc can enable pull based scheduling at any time - * by overriding #GstBaseSrcClass.check_get_range() so that it returns %TRUE. - * * If all the conditions are met for operating in pull mode, #GstBaseSrc is * automatically seekable in push mode as well. The following conditions must * be met to make the element seekable in push mode when the format is not @@ -150,9 +147,6 @@ * an EOS message to be posted on the pipeline's bus. Once this EOS message is * received, it may safely shut down the entire pipeline. * - * The old behaviour for controlled shutdown introduced since GStreamer 0.10.3 - * is still available but deprecated as it is dangerous and less flexible. - * * Last reviewed on 2007-12-19 (0.10.16) * </para> * </refsect2> @@ -212,15 +206,11 @@ enum struct _GstBaseSrcPrivate { - gboolean last_sent_eos; /* last thing we did was send an EOS (we set this - * to avoid the sending of two EOS in some cases) */ gboolean discont; gboolean flushing; - /* two segments to be sent in the streaming thread with STREAM_LOCK */ - GstEvent *close_segment; - GstEvent *start_segment; - gboolean newsegment_pending; + /* if segment should be sent */ + gboolean segment_pending; /* if EOS is pending (atomic) */ gint pending_eos; @@ -248,11 +238,15 @@ struct _GstBaseSrcPrivate gboolean qos_enabled; gdouble proportion; GstClockTime earliest_time; + + GstBufferPool *pool; + const GstAllocator *allocator; + guint prefix; + guint alignment; }; static GstElementClass *parent_class = NULL; -static void gst_base_src_base_init (gpointer g_class); static void gst_base_src_class_init (GstBaseSrcClass * klass); static void gst_base_src_init (GstBaseSrc * src, gpointer g_class); static void gst_base_src_finalize (GObject * object); @@ -267,7 +261,7 @@ gst_base_src_get_type (void) GType _type; static const GTypeInfo base_src_info = { sizeof (GstBaseSrcClass), - (GBaseInitFunc) gst_base_src_base_init, + NULL, NULL, (GClassInitFunc) gst_base_src_class_init, NULL, @@ -284,10 +278,10 @@ gst_base_src_get_type (void) return base_src_type; } -static GstCaps *gst_base_src_getcaps (GstPad * pad); -static gboolean gst_base_src_setcaps (GstPad * pad, GstCaps * caps); +static GstCaps *gst_base_src_getcaps (GstPad * pad, GstCaps * filter); static void gst_base_src_fixate (GstPad * pad, GstCaps * caps); +static gboolean gst_base_src_is_random_access (GstBaseSrc * src); static gboolean gst_base_src_activate_push (GstPad * pad, gboolean active); static gboolean gst_base_src_activate_pull (GstPad * pad, gboolean active); static void gst_base_src_set_property (GObject * object, guint prop_id, @@ -301,12 +295,16 @@ static const GstQueryType *gst_base_src_get_query_types (GstElement * element); static gboolean gst_base_src_query (GstPad * pad, GstQuery * query); +static gboolean gst_base_src_activate_pool (GstBaseSrc * basesrc, + gboolean active); static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc); static gboolean gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment); static gboolean gst_base_src_default_query (GstBaseSrc * src, GstQuery * query); static gboolean gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event, GstSegment * segment); +static GstFlowReturn gst_base_src_default_create (GstBaseSrc * basesrc, + guint64 offset, guint size, GstBuffer ** buf); static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc, gboolean flushing, gboolean live_play, gboolean unlock, gboolean * playing); @@ -317,23 +315,16 @@ static GstStateChangeReturn gst_base_src_change_state (GstElement * element, GstStateChange transition); static void gst_base_src_loop (GstPad * pad); -static gboolean gst_base_src_pad_check_get_range (GstPad * pad); -static gboolean gst_base_src_default_check_get_range (GstBaseSrc * bsrc); static GstFlowReturn gst_base_src_pad_get_range (GstPad * pad, guint64 offset, guint length, GstBuffer ** buf); static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length, GstBuffer ** buf); static gboolean gst_base_src_seekable (GstBaseSrc * src); +static gboolean gst_base_src_negotiate (GstBaseSrc * basesrc); static gboolean gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length); static void -gst_base_src_base_init (gpointer g_class) -{ - GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element"); -} - -static void gst_base_src_class_init (GstBaseSrcClass * klass) { GObjectClass *gobject_class; @@ -342,6 +333,8 @@ gst_base_src_class_init (GstBaseSrcClass * klass) gobject_class = G_OBJECT_CLASS (klass); gstelement_class = GST_ELEMENT_CLASS (klass); + GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element"); + g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate)); parent_class = g_type_class_peek_parent (klass); @@ -350,10 +343,9 @@ gst_base_src_class_init (GstBaseSrcClass * klass) gobject_class->set_property = gst_base_src_set_property; gobject_class->get_property = gst_base_src_get_property; -/* FIXME 0.11: blocksize property should be int, not ulong (min is >max here) */ g_object_class_install_property (gobject_class, PROP_BLOCKSIZE, - g_param_spec_ulong ("blocksize", "Block size", - "Size in bytes to read per buffer (-1 = default)", 0, G_MAXULONG, + g_param_spec_uint ("blocksize", "Block size", + "Size in bytes to read per buffer (-1 = default)", 0, G_MAXUINT, DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS, g_param_spec_int ("num-buffers", "num-buffers", @@ -379,10 +371,9 @@ gst_base_src_class_init (GstBaseSrcClass * klass) klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event); klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek); klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query); - klass->check_get_range = - GST_DEBUG_FUNCPTR (gst_base_src_default_check_get_range); klass->prepare_seek_segment = GST_DEBUG_FUNCPTR (gst_base_src_default_prepare_seek_segment); + klass->create = GST_DEBUG_FUNCPTR (gst_base_src_default_create); /* Registering debug symbols for function pointers */ GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_push); @@ -390,9 +381,7 @@ gst_base_src_class_init (GstBaseSrcClass * klass) GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_event_handler); GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_query); GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_pad_get_range); - GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_pad_check_get_range); GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_getcaps); - GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_setcaps); GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_fixate); } @@ -411,7 +400,6 @@ gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class) basesrc->num_buffers_left = -1; basesrc->can_activate_push = TRUE; - basesrc->pad_mode = GST_ACTIVATE_NONE; pad_template = gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "src"); @@ -425,10 +413,8 @@ gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class) gst_pad_set_activatepull_function (pad, gst_base_src_activate_pull); gst_pad_set_event_function (pad, gst_base_src_event_handler); gst_pad_set_query_function (pad, gst_base_src_query); - gst_pad_set_checkgetrange_function (pad, gst_base_src_pad_check_get_range); gst_pad_set_getrange_function (pad, gst_base_src_pad_get_range); gst_pad_set_getcaps_function (pad, gst_base_src_getcaps); - gst_pad_set_setcaps_function (pad, gst_base_src_setcaps); gst_pad_set_fixatecaps_function (pad, gst_base_src_fixate); /* hold pointer to pad */ @@ -440,7 +426,7 @@ gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class) basesrc->clock_id = NULL; /* we operate in BYTES by default */ gst_base_src_set_format (basesrc, GST_FORMAT_BYTES); - basesrc->data.ABI.typefind = DEFAULT_TYPEFIND; + basesrc->typefind = DEFAULT_TYPEFIND; basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP; g_atomic_int_set (&basesrc->priv->have_events, FALSE); @@ -461,7 +447,7 @@ gst_base_src_finalize (GObject * object) g_mutex_free (basesrc->live_lock); g_cond_free (basesrc->live_cond); - event_p = &basesrc->data.ABI.pending_seek; + event_p = &basesrc->pending_seek; gst_event_replace (event_p, NULL); if (basesrc->priv->pending_events) { @@ -667,9 +653,8 @@ gst_base_src_query_latency (GstBaseSrc * src, gboolean * live, * * Since: 0.10.22 */ -/* FIXME 0.11: blocksize property should be int, not ulong */ void -gst_base_src_set_blocksize (GstBaseSrc * src, gulong blocksize) +gst_base_src_set_blocksize (GstBaseSrc * src, guint blocksize) { g_return_if_fail (GST_IS_BASE_SRC (src)); @@ -688,11 +673,10 @@ gst_base_src_set_blocksize (GstBaseSrc * src, gulong blocksize) * * Since: 0.10.22 */ -/* FIXME 0.11: blocksize property should be int, not ulong */ -gulong +guint gst_base_src_get_blocksize (GstBaseSrc * src) { - gulong res; + gint res; g_return_val_if_fail (GST_IS_BASE_SRC (src), 0); @@ -779,51 +763,33 @@ gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop, GST_TIME_ARGS (stop), GST_TIME_ARGS (position)); GST_OBJECT_LOCK (src); - if (src->data.ABI.running && !src->priv->newsegment_pending) { - if (src->priv->close_segment) - gst_event_unref (src->priv->close_segment); - src->priv->close_segment = - gst_event_new_new_segment_full (TRUE, - src->segment.rate, src->segment.applied_rate, src->segment.format, - src->segment.start, src->segment.last_stop, src->segment.time); - } - - gst_segment_set_newsegment_full (&src->segment, FALSE, src->segment.rate, - src->segment.applied_rate, src->segment.format, start, stop, position); - - if (src->priv->start_segment) - gst_event_unref (src->priv->start_segment); - if (src->segment.rate >= 0.0) { - /* forward, we send data from last_stop to stop */ - src->priv->start_segment = - gst_event_new_new_segment_full (FALSE, - src->segment.rate, src->segment.applied_rate, src->segment.format, - src->segment.last_stop, stop, src->segment.time); - } else { - /* reverse, we send data from last_stop to start */ - src->priv->start_segment = - gst_event_new_new_segment_full (FALSE, - src->segment.rate, src->segment.applied_rate, src->segment.format, - src->segment.start, src->segment.last_stop, src->segment.time); - } + + src->segment.base = gst_segment_to_running_time (&src->segment, + src->segment.format, src->segment.position); + src->segment.start = start; + src->segment.stop = stop; + src->segment.position = position; + + /* forward, we send data from position to stop */ + src->priv->segment_pending = TRUE; GST_OBJECT_UNLOCK (src); src->priv->discont = TRUE; - src->data.ABI.running = TRUE; + src->running = TRUE; return res; } static gboolean -gst_base_src_setcaps (GstPad * pad, GstCaps * caps) +gst_base_src_setcaps (GstBaseSrc * bsrc, GstCaps * caps) { GstBaseSrcClass *bclass; - GstBaseSrc *bsrc; gboolean res = TRUE; - bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad)); bclass = GST_BASE_SRC_GET_CLASS (bsrc); + gst_pad_push_event (bsrc->srcpad, gst_event_new_caps (caps)); + if (bclass->set_caps) res = bclass->set_caps (bsrc, caps); @@ -831,7 +797,7 @@ gst_base_src_setcaps (GstPad * pad, GstCaps * caps) } static GstCaps * -gst_base_src_getcaps (GstPad * pad) +gst_base_src_getcaps (GstPad * pad, GstCaps * filter) { GstBaseSrcClass *bclass; GstBaseSrc *bsrc; @@ -840,7 +806,7 @@ gst_base_src_getcaps (GstPad * pad) bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad)); bclass = GST_BASE_SRC_GET_CLASS (bsrc); if (bclass->get_caps) - caps = bclass->get_caps (bsrc); + caps = bclass->get_caps (bsrc, filter); if (caps == NULL) { GstPadTemplate *pad_template; @@ -848,7 +814,16 @@ gst_base_src_getcaps (GstPad * pad) pad_template = gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src"); if (pad_template != NULL) { - caps = gst_caps_ref (gst_pad_template_get_caps (pad_template)); + caps = gst_pad_template_get_caps (pad_template); + + if (filter) { + GstCaps *intersection; + + intersection = + gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST); + gst_caps_unref (caps); + caps = intersection; + } } } return caps; @@ -892,7 +867,7 @@ gst_base_src_default_query (GstBaseSrc * src, GstQuery * query) gint64 duration; GST_OBJECT_LOCK (src); - position = src->segment.last_stop; + position = src->segment.position; duration = src->segment.duration; GST_OBJECT_UNLOCK (src); @@ -917,7 +892,7 @@ gst_base_src_default_query (GstBaseSrc * src, GstQuery * query) GST_OBJECT_LOCK (src); position = gst_segment_to_stream_time (&src->segment, src->segment.format, - src->segment.last_stop); + src->segment.position); seg_format = src->segment.format; GST_OBJECT_UNLOCK (src); @@ -1122,6 +1097,20 @@ gst_base_src_default_query (GstBaseSrc * src, GstQuery * query) gst_query_set_buffering_range (query, format, start, stop, estimated); break; } + case GST_QUERY_SCHEDULING: + { + gboolean random_access; + + random_access = gst_base_src_is_random_access (src); + + /* we can operate in getrange mode if the native format is bytes + * and we are seekable, this condition is set in the random_access + * flag and is set in the _start() method. */ + gst_query_set_scheduling (query, random_access, TRUE, FALSE, 1, -1, 1); + + res = TRUE; + break; + } default: res = FALSE; break; @@ -1213,7 +1202,7 @@ gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event, dest_format = segment->format; if (seek_format == dest_format) { - gst_segment_set_seek (segment, rate, seek_format, flags, + gst_segment_do_seek (segment, rate, seek_format, flags, cur_type, cur, stop_type, stop, &update); return TRUE; } @@ -1235,7 +1224,7 @@ gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event, } /* And finally, configure our output segment in the desired format */ - gst_segment_set_seek (segment, rate, dest_format, flags, cur_type, cur, + gst_segment_do_seek (segment, rate, dest_format, flags, cur_type, cur, stop_type, stop, &update); if (!res) @@ -1265,6 +1254,77 @@ gst_base_src_prepare_seek_segment (GstBaseSrc * src, GstEvent * event, return result; } +static GstFlowReturn +gst_base_src_alloc_buffer (GstBaseSrc * src, guint64 offset, + guint size, GstBuffer ** buffer) +{ + GstFlowReturn ret; + GstBaseSrcPrivate *priv = src->priv; + + if (priv->pool) { + ret = gst_buffer_pool_acquire_buffer (priv->pool, buffer, NULL); + } else { + *buffer = gst_buffer_new_allocate (priv->allocator, size, priv->alignment); + if (G_UNLIKELY (*buffer == NULL)) + goto alloc_failed; + + ret = GST_FLOW_OK; + } + return ret; + + /* ERRORS */ +alloc_failed: + { + GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", size); + return GST_FLOW_ERROR; + } +} + +static GstFlowReturn +gst_base_src_default_create (GstBaseSrc * src, guint64 offset, + guint size, GstBuffer ** buffer) +{ + GstBaseSrcClass *bclass; + GstFlowReturn ret; + + bclass = GST_BASE_SRC_GET_CLASS (src); + + if (G_UNLIKELY (!bclass->fill)) + goto no_function; + + ret = gst_base_src_alloc_buffer (src, offset, size, buffer); + if (G_UNLIKELY (ret != GST_FLOW_OK)) + goto alloc_failed; + + if (G_LIKELY (size > 0)) { + /* only call fill when there is a size */ + ret = bclass->fill (src, offset, size, *buffer); + if (G_UNLIKELY (ret != GST_FLOW_OK)) + goto not_ok; + } + + return GST_FLOW_OK; + + /* ERRORS */ +no_function: + { + GST_DEBUG_OBJECT (src, "no fill function"); + return GST_FLOW_NOT_SUPPORTED; + } +alloc_failed: + { + GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", size); + return ret; + } +not_ok: + { + GST_DEBUG_OBJECT (src, "fill returned %d (%s)", ret, + gst_flow_get_name (ret)); + gst_buffer_unref (*buffer); + return ret; + } +} + /* this code implements the seeking. It is a good example * handling all cases. * @@ -1416,7 +1476,7 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock) } else { /* The seek format matches our processing format, no need to ask the * the subclass to configure the segment. */ - gst_segment_set_seek (&seeksegment, rate, seek_format, flags, + gst_segment_do_seek (&seeksegment, rate, seek_format, flags, cur_type, cur, stop_type, stop, &update); } } @@ -1427,33 +1487,19 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock) if (res) { GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT, - seeksegment.start, seeksegment.stop, seeksegment.last_stop); + seeksegment.start, seeksegment.stop, seeksegment.position); - /* do the seek, segment.last_stop contains the new position. */ + /* do the seek, segment.position contains the new position. */ res = gst_base_src_do_seek (src, &seeksegment); } /* and prepare to continue streaming */ if (flush) { - tevent = gst_event_new_flush_stop (); + tevent = gst_event_new_flush_stop (TRUE); gst_event_set_seqnum (tevent, seqnum); /* send flush stop, peer will accept data and events again. We * are not yet providing data as we still have the STREAM_LOCK. */ gst_pad_push_event (src->srcpad, tevent); - } else if (res && src->data.ABI.running) { - /* we are running the current segment and doing a non-flushing seek, - * close the segment first based on the last_stop. */ - GST_DEBUG_OBJECT (src, "closing running segment %" G_GINT64_FORMAT - " to %" G_GINT64_FORMAT, src->segment.start, src->segment.last_stop); - - /* queue the segment for sending in the stream thread */ - if (src->priv->close_segment) - gst_event_unref (src->priv->close_segment); - src->priv->close_segment = - gst_event_new_new_segment_full (TRUE, - src->segment.rate, src->segment.applied_rate, src->segment.format, - src->segment.start, src->segment.last_stop, src->segment.time); - gst_event_set_seqnum (src->priv->close_segment, seqnum); } /* The subclass must have converted the segment to the processing format @@ -1475,7 +1521,7 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock) GstMessage *message; message = gst_message_new_segment_start (GST_OBJECT (src), - seeksegment.format, seeksegment.last_stop); + seeksegment.format, seeksegment.position); gst_message_set_seqnum (message, seqnum); gst_element_post_message (GST_ELEMENT (src), message); @@ -1486,32 +1532,11 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock) if ((stop = seeksegment.stop) == -1) stop = seeksegment.duration; - GST_DEBUG_OBJECT (src, "Sending newsegment from %" G_GINT64_FORMAT - " to %" G_GINT64_FORMAT, seeksegment.start, stop); - - /* now replace the old segment so that we send it in the stream thread the - * next time it is scheduled. */ - if (src->priv->start_segment) - gst_event_unref (src->priv->start_segment); - if (seeksegment.rate >= 0.0) { - /* forward, we send data from last_stop to stop */ - src->priv->start_segment = - gst_event_new_new_segment_full (FALSE, - seeksegment.rate, seeksegment.applied_rate, seeksegment.format, - seeksegment.last_stop, stop, seeksegment.time); - } else { - /* reverse, we send data from last_stop to start */ - src->priv->start_segment = - gst_event_new_new_segment_full (FALSE, - seeksegment.rate, seeksegment.applied_rate, seeksegment.format, - seeksegment.start, seeksegment.last_stop, seeksegment.time); - } - gst_event_set_seqnum (src->priv->start_segment, seqnum); - src->priv->newsegment_pending = TRUE; + src->priv->segment_pending = TRUE; } src->priv->discont = TRUE; - src->data.ABI.running = TRUE; + src->running = TRUE; /* and restart the task in case it got paused explicitly or by * the FLUSH_START event we pushed out. */ tres = gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop, @@ -1593,10 +1618,12 @@ gst_base_src_send_event (GstElement * element, GstEvent * event) g_atomic_int_set (&src->priv->pending_eos, TRUE); GST_DEBUG_OBJECT (src, "EOS marked, calling unlock"); + /* unlock the _create function so that we can check the pending_eos flag * and we can do EOS. This will eventually release the LIVE_LOCK again so * that we can grab it and stop the unlock again. We don't take the stream * lock so that this operation is guaranteed to never block. */ + gst_base_src_activate_pool (src, FALSE); if (bclass->unlock) bclass->unlock (src); @@ -1608,13 +1635,14 @@ gst_base_src_send_event (GstElement * element, GstEvent * event) * lock is enough because that protects the create function. */ if (bclass->unlock_stop) bclass->unlock_stop (src); + gst_base_src_activate_pool (src, TRUE); GST_LIVE_UNLOCK (src); result = TRUE; break; } - case GST_EVENT_NEWSEGMENT: - /* sending random NEWSEGMENT downstream can break sync. */ + case GST_EVENT_SEGMENT: + /* sending random SEGMENT downstream can break sync. */ break; case GST_EVENT_TAG: case GST_EVENT_CUSTOM_DOWNSTREAM: @@ -1658,7 +1686,7 @@ gst_base_src_send_event (GstElement * element, GstEvent * event) * get activated */ GST_OBJECT_LOCK (src); GST_DEBUG_OBJECT (src, "queueing seek"); - event_p = &src->data.ABI.pending_seek; + event_p = &src->pending_seek; gst_event_replace ((GstEvent **) event_p, event); GST_OBJECT_UNLOCK (src); /* assume the seek will work */ @@ -1761,7 +1789,7 @@ gst_base_src_default_event (GstBaseSrc * src, GstEvent * event) GstClockTimeDiff diff; GstClockTime timestamp; - gst_event_parse_qos (event, &proportion, &diff, ×tamp); + gst_event_parse_qos (event, NULL, &proportion, &diff, ×tamp); gst_base_src_update_qos (src, proportion, diff, timestamp); result = TRUE; break; @@ -1824,13 +1852,13 @@ gst_base_src_set_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_BLOCKSIZE: - gst_base_src_set_blocksize (src, g_value_get_ulong (value)); + gst_base_src_set_blocksize (src, g_value_get_uint (value)); break; case PROP_NUM_BUFFERS: src->num_buffers = g_value_get_int (value); break; case PROP_TYPEFIND: - src->data.ABI.typefind = g_value_get_boolean (value); + src->typefind = g_value_get_boolean (value); break; case PROP_DO_TIMESTAMP: gst_base_src_set_do_timestamp (src, g_value_get_boolean (value)); @@ -1851,13 +1879,13 @@ gst_base_src_get_property (GObject * object, guint prop_id, GValue * value, switch (prop_id) { case PROP_BLOCKSIZE: - g_value_set_ulong (value, gst_base_src_get_blocksize (src)); + g_value_set_uint (value, gst_base_src_get_blocksize (src)); break; case PROP_NUM_BUFFERS: g_value_set_int (value, src->num_buffers); break; case PROP_TYPEFIND: - g_value_set_boolean (value, src->data.ABI.typefind); + g_value_set_boolean (value, src->typefind); break; case PROP_DO_TIMESTAMP: g_value_set_boolean (value, gst_base_src_get_do_timestamp (src)); @@ -2062,7 +2090,7 @@ gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length) format = src->segment.format; stop = src->segment.stop; /* get total file size */ - size = (guint64) src->segment.duration; + size = src->segment.duration; /* only operate if we are working with bytes */ if (format != GST_FORMAT_BYTES) @@ -2111,10 +2139,10 @@ gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length) } } - /* keep track of current position and update duration. + /* keep track of current duration. * segment is in bytes, we checked that above. */ GST_OBJECT_LOCK (src); - gst_segment_set_duration (&src->segment, GST_FORMAT_BYTES, size); + src->segment.duration = size; GST_OBJECT_UNLOCK (src); return TRUE; @@ -2197,16 +2225,10 @@ again: /* no timestamp set and we are at offset 0, we can timestamp with 0 */ if (offset == 0 && src->segment.time == 0 && GST_BUFFER_TIMESTAMP (*buf) == -1 && !src->is_live) { - *buf = gst_buffer_make_metadata_writable (*buf); + *buf = gst_buffer_make_writable (*buf); GST_BUFFER_TIMESTAMP (*buf) = 0; } - /* set pad caps on the buffer if the buffer had no caps */ - if (GST_BUFFER_CAPS (*buf) == NULL) { - *buf = gst_buffer_make_metadata_writable (*buf); - gst_buffer_set_caps (*buf, GST_PAD_CAPS (src->srcpad)); - } - /* now sync before pushing the buffer */ status = gst_base_src_do_sync (src, *buf); @@ -2276,7 +2298,7 @@ not_started: no_function: { GST_DEBUG_OBJECT (src, "no create function"); - return GST_FLOW_ERROR; + return GST_FLOW_NOT_SUPPORTED; } unexpected_length: { @@ -2335,60 +2357,16 @@ flushing: } static gboolean -gst_base_src_default_check_get_range (GstBaseSrc * src) +gst_base_src_is_random_access (GstBaseSrc * src) { - gboolean res; - + /* we need to start the basesrc to check random access */ if (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)) { GST_LOG_OBJECT (src, "doing start/stop to check get_range support"); if (G_LIKELY (gst_base_src_start (src))) gst_base_src_stop (src); } - /* we can operate in getrange mode if the native format is bytes - * and we are seekable, this condition is set in the random_access - * flag and is set in the _start() method. */ - res = src->random_access; - - return res; -} - -static gboolean -gst_base_src_check_get_range (GstBaseSrc * src) -{ - GstBaseSrcClass *bclass; - gboolean res; - - bclass = GST_BASE_SRC_GET_CLASS (src); - - if (bclass->check_get_range == NULL) - goto no_function; - - res = bclass->check_get_range (src); - GST_LOG_OBJECT (src, "%s() returned %d", - GST_DEBUG_FUNCPTR_NAME (bclass->check_get_range), (gint) res); - - return res; - - /* ERRORS */ -no_function: - { - GST_WARNING_OBJECT (src, "no check_get_range function set"); - return FALSE; - } -} - -static gboolean -gst_base_src_pad_check_get_range (GstPad * pad) -{ - GstBaseSrc *src; - gboolean res; - - src = GST_BASE_SRC (GST_OBJECT_PARENT (pad)); - - res = gst_base_src_check_get_range (src); - - return res; + return src->random_access; } static void @@ -2399,25 +2377,29 @@ gst_base_src_loop (GstPad * pad) GstFlowReturn ret; gint64 position; gboolean eos; - gulong blocksize; + guint blocksize; GList *pending_events = NULL, *tmp; eos = FALSE; src = GST_BASE_SRC (GST_OBJECT_PARENT (pad)); + /* check if we need to renegotiate */ + if (gst_pad_check_reconfigure (pad)) { + if (!gst_base_src_negotiate (src)) + GST_DEBUG_OBJECT (src, "Failed to renegotiate"); + } + GST_LIVE_LOCK (src); if (G_UNLIKELY (src->priv->flushing)) goto flushing; - src->priv->last_sent_eos = FALSE; - blocksize = src->blocksize; /* if we operate in bytes, we can calculate an offset */ if (src->segment.format == GST_FORMAT_BYTES) { - position = src->segment.last_stop; + position = src->segment.position; /* for negative rates, start with subtracting the blocksize */ if (src->segment.rate < 0.0) { /* we cannot go below segment.start */ @@ -2432,7 +2414,7 @@ gst_base_src_loop (GstPad * pad) } else position = -1; - GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %lu", + GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %u", GST_TIME_ARGS (position), blocksize); ret = gst_base_src_get_range (src, position, blocksize, &buf); @@ -2447,15 +2429,10 @@ gst_base_src_loop (GstPad * pad) goto null_buffer; /* push events to close/start our segment before we push the buffer. */ - if (G_UNLIKELY (src->priv->close_segment)) { - gst_pad_push_event (pad, src->priv->close_segment); - src->priv->close_segment = NULL; - } - if (G_UNLIKELY (src->priv->start_segment)) { - gst_pad_push_event (pad, src->priv->start_segment); - src->priv->start_segment = NULL; + if (G_UNLIKELY (src->priv->segment_pending)) { + gst_pad_push_event (pad, gst_event_new_segment (&src->segment)); + src->priv->segment_pending = FALSE; } - src->priv->newsegment_pending = FALSE; if (g_atomic_int_get (&src->priv->have_events)) { GST_OBJECT_LOCK (src); @@ -2479,7 +2456,7 @@ gst_base_src_loop (GstPad * pad) switch (src->segment.format) { case GST_FORMAT_BYTES: { - guint bufsize = GST_BUFFER_SIZE (buf); + guint bufsize = gst_buffer_get_size (buf); /* we subtracted above for negative rates */ if (src->segment.rate >= 0.0) @@ -2496,7 +2473,7 @@ gst_base_src_loop (GstPad * pad) if (GST_CLOCK_TIME_IS_VALID (start)) position = start; else - position = src->segment.last_stop; + position = src->segment.position; if (GST_CLOCK_TIME_IS_VALID (duration)) { if (src->segment.rate >= 0.0) @@ -2538,12 +2515,12 @@ gst_base_src_loop (GstPad * pad) src->priv->discont = TRUE; } GST_OBJECT_LOCK (src); - gst_segment_set_last_stop (&src->segment, src->segment.format, position); + src->segment.position = position; GST_OBJECT_UNLOCK (src); } if (G_UNLIKELY (src->priv->discont)) { - buf = gst_buffer_make_metadata_writable (buf); + buf = gst_buffer_make_writable (buf); GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT); src->priv->discont = FALSE; } @@ -2579,30 +2556,29 @@ pause: GstEvent *event; GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason); - src->data.ABI.running = FALSE; + src->running = FALSE; gst_pad_pause_task (pad); if (ret == GST_FLOW_UNEXPECTED) { gboolean flag_segment; GstFormat format; - gint64 last_stop; + gint64 position; /* perform EOS logic */ flag_segment = (src->segment.flags & GST_SEEK_FLAG_SEGMENT) != 0; format = src->segment.format; - last_stop = src->segment.last_stop; + position = src->segment.position; if (flag_segment) { GstMessage *message; message = gst_message_new_segment_done (GST_OBJECT_CAST (src), - format, last_stop); + format, position); gst_message_set_seqnum (message, src->priv->seqnum); gst_element_post_message (GST_ELEMENT_CAST (src), message); } else { event = gst_event_new_eos (); gst_event_set_seqnum (event, src->priv->seqnum); gst_pad_push_event (pad, event); - src->priv->last_sent_eos = TRUE; } } else if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_UNEXPECTED) { event = gst_event_new_eos (); @@ -2617,7 +2593,6 @@ pause: (_("Internal data flow error.")), ("streaming task paused, reason %s (%d)", reason, ret)); gst_pad_push_event (pad, event); - src->priv->last_sent_eos = TRUE; } goto done; } @@ -2630,6 +2605,119 @@ null_buffer: } } +static gboolean +gst_base_src_set_allocation (GstBaseSrc * basesrc, GstBufferPool * pool, + const GstAllocator * allocator, guint prefix, guint alignment) +{ + GstBufferPool *oldpool; + GstBaseSrcPrivate *priv = basesrc->priv; + + if (pool) { + if (!gst_buffer_pool_set_active (pool, TRUE)) + goto activate_failed; + } + + GST_OBJECT_LOCK (basesrc); + oldpool = priv->pool; + priv->pool = pool; + + priv->allocator = allocator; + + priv->prefix = prefix; + priv->alignment = alignment; + GST_OBJECT_UNLOCK (basesrc); + + if (oldpool) { + gst_buffer_pool_set_active (oldpool, FALSE); + gst_object_unref (oldpool); + } + return TRUE; + + /* ERRORS */ +activate_failed: + { + GST_ERROR_OBJECT (basesrc, "failed to activate bufferpool."); + return FALSE; + } +} + +static gboolean +gst_base_src_activate_pool (GstBaseSrc * basesrc, gboolean active) +{ + GstBaseSrcPrivate *priv = basesrc->priv; + GstBufferPool *pool; + gboolean res = TRUE; + + GST_OBJECT_LOCK (basesrc); + if ((pool = priv->pool)) + pool = gst_object_ref (pool); + GST_OBJECT_UNLOCK (basesrc); + + if (pool) { + res = gst_buffer_pool_set_active (pool, active); + gst_object_unref (pool); + } + return res; +} + +static gboolean +gst_base_src_prepare_allocation (GstBaseSrc * basesrc, GstCaps * caps) +{ + GstBaseSrcClass *bclass; + gboolean result = TRUE; + GstQuery *query; + GstBufferPool *pool = NULL; + const GstAllocator *allocator = NULL; + guint size, min, max, prefix, alignment; + + bclass = GST_BASE_SRC_GET_CLASS (basesrc); + + /* make query and let peer pad answer, we don't really care if it worked or + * not, if it failed, the allocation query would contain defaults and the + * subclass would then set better values if needed */ + query = gst_query_new_allocation (caps, TRUE); + if (!gst_pad_peer_query (basesrc->srcpad, query)) { + /* not a problem, just debug a little */ + GST_DEBUG_OBJECT (basesrc, "peer ALLOCATION query failed"); + } + + if (G_LIKELY (bclass->setup_allocation)) + result = bclass->setup_allocation (basesrc, query); + + GST_DEBUG_OBJECT (basesrc, "ALLOCATION params: %" GST_PTR_FORMAT, query); + gst_query_parse_allocation_params (query, &size, &min, &max, &prefix, + &alignment, &pool); + + if (size == 0) { + const gchar *mem = NULL; + + /* no size, we have variable size buffers */ + if (gst_query_get_n_allocation_memories (query) > 0) { + mem = gst_query_parse_nth_allocation_memory (query, 0); + } + allocator = gst_allocator_find (mem); + } else if (pool == NULL) { + /* fixed size, we can use a bufferpool */ + GstStructure *config; + + /* we did not get a pool, make one ourselves then */ + pool = gst_buffer_pool_new (); + + config = gst_buffer_pool_get_config (pool); + gst_buffer_pool_config_set (config, caps, size, min, max, prefix, + alignment); + gst_buffer_pool_set_config (pool, config); + } + + gst_query_unref (query); + + result = + gst_base_src_set_allocation (basesrc, pool, allocator, prefix, alignment); + + return result; + +} + /* default negotiation code. * * Take intersection between src and sink pads, take first @@ -2644,7 +2732,7 @@ gst_base_src_default_negotiate (GstBaseSrc * basesrc) gboolean result = FALSE; /* first see what is possible on our source pad */ - thiscaps = gst_pad_get_caps_reffed (GST_BASE_SRC_PAD (basesrc)); + thiscaps = gst_pad_get_caps (GST_BASE_SRC_PAD (basesrc), NULL); GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps); /* nothing or anything is allowed, we're done */ if (thiscaps == NULL || gst_caps_is_any (thiscaps)) @@ -2654,36 +2742,35 @@ gst_base_src_default_negotiate (GstBaseSrc * basesrc) goto no_caps; /* get the peer caps */ - peercaps = gst_pad_peer_get_caps_reffed (GST_BASE_SRC_PAD (basesrc)); + peercaps = gst_pad_peer_get_caps (GST_BASE_SRC_PAD (basesrc), thiscaps); GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps); if (peercaps) { - /* get intersection */ - caps = - gst_caps_intersect_full (peercaps, thiscaps, GST_CAPS_INTERSECT_FIRST); - GST_DEBUG_OBJECT (basesrc, "intersect: %" GST_PTR_FORMAT, caps); - gst_caps_unref (peercaps); + /* The result is already a subset of our caps */ + caps = peercaps; + gst_caps_unref (thiscaps); } else { /* no peer, work with our own caps then */ - caps = gst_caps_copy (thiscaps); + caps = thiscaps; } - gst_caps_unref (thiscaps); - if (caps) { + if (caps && !gst_caps_is_empty (caps)) { + caps = gst_caps_make_writable (caps); + /* take first (and best, since they are sorted) possibility */ gst_caps_truncate (caps); /* now fixate */ - if (!gst_caps_is_empty (caps)) { + GST_DEBUG_OBJECT (basesrc, "have caps: %" GST_PTR_FORMAT, caps); + if (gst_caps_is_any (caps)) { + /* hmm, still anything, so element can do anything and + * nego is not needed */ + result = TRUE; + } else { gst_pad_fixate_caps (GST_BASE_SRC_PAD (basesrc), caps); GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps); - - if (gst_caps_is_any (caps)) { - /* hmm, still anything, so element can do anything and - * nego is not needed */ - result = TRUE; - } else if (gst_caps_is_fixed (caps)) { + if (gst_caps_is_fixed (caps)) { /* yay, fixed caps, use those then, it's possible that the subclass does * not accept this caps after all and we have to fail. */ - result = gst_pad_set_caps (GST_BASE_SRC_PAD (basesrc), caps); + result = gst_base_src_setcaps (basesrc, caps); } } gst_caps_unref (caps); @@ -2718,9 +2805,16 @@ gst_base_src_negotiate (GstBaseSrc * basesrc) bclass = GST_BASE_SRC_GET_CLASS (basesrc); - if (bclass->negotiate) + if (G_LIKELY (bclass->negotiate)) result = bclass->negotiate (basesrc); + if (G_LIKELY (result)) { + GstCaps *caps; + + caps = gst_pad_get_current_caps (basesrc->srcpad); + + result = gst_base_src_prepare_allocation (basesrc, caps); + } return result; } @@ -2744,8 +2838,8 @@ gst_base_src_start (GstBaseSrc * basesrc) gst_segment_init (&basesrc->segment, basesrc->segment.format); GST_OBJECT_UNLOCK (basesrc); - basesrc->data.ABI.running = FALSE; - basesrc->priv->newsegment_pending = FALSE; + basesrc->running = FALSE; + basesrc->priv->segment_pending = FALSE; bclass = GST_BASE_SRC_GET_CLASS (basesrc); if (bclass->start) @@ -2773,7 +2867,7 @@ gst_base_src_start (GstBaseSrc * basesrc) /* only update the size when operating in bytes, subclass is supposed * to set duration in the start method for other formats */ GST_OBJECT_LOCK (basesrc); - gst_segment_set_duration (&basesrc->segment, GST_FORMAT_BYTES, size); + basesrc->segment.duration = size; GST_OBJECT_UNLOCK (basesrc); } else { size = -1; @@ -2793,13 +2887,13 @@ gst_base_src_start (GstBaseSrc * basesrc) GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access); /* run typefind if we are random_access and the typefinding is enabled. */ - if (basesrc->random_access && basesrc->data.ABI.typefind && size != -1) { + if (basesrc->random_access && basesrc->typefind && size != -1) { GstCaps *caps; if (!(caps = gst_type_find_helper (basesrc->srcpad, size))) goto typefind_failed; - result = gst_pad_set_caps (basesrc->srcpad, caps); + result = gst_base_src_setcaps (basesrc, caps); gst_caps_unref (caps); } else { /* use class or default negotiate function */ @@ -2850,6 +2944,8 @@ gst_base_src_stop (GstBaseSrc * basesrc) if (bclass->stop) result = bclass->stop (basesrc); + gst_base_src_set_allocation (basesrc, NULL, NULL, 0, 0); + if (result) GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED); @@ -2867,6 +2963,7 @@ gst_base_src_set_flushing (GstBaseSrc * basesrc, bclass = GST_BASE_SRC_GET_CLASS (basesrc); if (flushing && unlock) { + gst_base_src_activate_pool (basesrc, FALSE); /* unlock any subclasses, we need to do this before grabbing the * LIVE_LOCK since we hold this lock before going into ::create. We pass an * unlock to the params because of backwards compat (see seek handler)*/ @@ -2898,6 +2995,8 @@ gst_base_src_set_flushing (GstBaseSrc * basesrc, /* signal the live source that it can start playing */ basesrc->live_running = live_play; + gst_base_src_activate_pool (basesrc, TRUE); + /* When unlocking drop all delayed events */ if (unlock) { GST_OBJECT_LOCK (basesrc); @@ -2929,6 +3028,7 @@ gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play) /* unlock subclasses locked in ::create, we only do this when we stop playing. */ if (!live_play) { GST_DEBUG_OBJECT (basesrc, "unlock"); + gst_base_src_activate_pool (basesrc, FALSE); if (bclass->unlock) bclass->unlock (basesrc); } @@ -2952,6 +3052,7 @@ gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play) /* clear our unlock request when going to PLAYING */ GST_DEBUG_OBJECT (basesrc, "unlock stop"); + gst_base_src_activate_pool (basesrc, TRUE); if (bclass->unlock_stop) bclass->unlock_stop (basesrc); @@ -2991,14 +3092,13 @@ gst_base_src_activate_push (GstPad * pad, gboolean active) if (G_UNLIKELY (!gst_base_src_start (basesrc))) goto error_start; - basesrc->priv->last_sent_eos = FALSE; basesrc->priv->discont = TRUE; gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL); /* do initial seek, which will start the task */ GST_OBJECT_LOCK (basesrc); - event = basesrc->data.ABI.pending_seek; - basesrc->data.ABI.pending_seek = NULL; + event = basesrc->pending_seek; + basesrc->pending_seek = NULL; GST_OBJECT_UNLOCK (basesrc); /* no need to unlock anything, the task is certainly @@ -3066,7 +3166,7 @@ gst_base_src_activate_pull (GstPad * pad, gboolean active) goto error_start; /* if not random_access, we cannot operate in pull mode for now */ - if (G_UNLIKELY (!gst_base_src_check_get_range (basesrc))) + if (G_UNLIKELY (!gst_base_src_is_random_access (basesrc))) goto no_get_range; /* stop flushing now but for live sources, still block in the LIVE lock when @@ -3077,9 +3177,6 @@ gst_base_src_activate_pull (GstPad * pad, gboolean active) /* flush all, there is no task to stop */ gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL); - /* don't send EOS when going from PAUSED => READY when in pull mode */ - basesrc->priv->last_sent_eos = TRUE; - if (G_UNLIKELY (!gst_base_src_stop (basesrc))) goto error_stop; } @@ -3146,27 +3243,12 @@ gst_base_src_change_state (GstElement * element, GstStateChange transition) break; case GST_STATE_CHANGE_PAUSED_TO_READY: { - GstEvent **event_p, *event; + GstEvent **event_p; /* we don't need to unblock anything here, the pad deactivation code * already did this */ - - /* FIXME, deprecate this behaviour, it is very dangerous. - * the prefered way of sending EOS downstream is by sending - * the EOS event to the element */ - if (!basesrc->priv->last_sent_eos) { - GST_DEBUG_OBJECT (basesrc, "Sending EOS event"); - event = gst_event_new_eos (); - gst_event_set_seqnum (event, basesrc->priv->seqnum); - gst_pad_push_event (basesrc->srcpad, event); - basesrc->priv->last_sent_eos = TRUE; - } g_atomic_int_set (&basesrc->priv->pending_eos, FALSE); - event_p = &basesrc->data.ABI.pending_seek; - gst_event_replace (event_p, NULL); - event_p = &basesrc->priv->close_segment; - gst_event_replace (event_p, NULL); - event_p = &basesrc->priv->start_segment; + event_p = &basesrc->pending_seek; gst_event_replace (event_p, NULL); break; } |