summaryrefslogtreecommitdiff
path: root/src/media-engines/gstreamer/rygel-gst-sink.vala
diff options
context:
space:
mode:
Diffstat (limited to 'src/media-engines/gstreamer/rygel-gst-sink.vala')
-rw-r--r--src/media-engines/gstreamer/rygel-gst-sink.vala144
1 files changed, 144 insertions, 0 deletions
diff --git a/src/media-engines/gstreamer/rygel-gst-sink.vala b/src/media-engines/gstreamer/rygel-gst-sink.vala
new file mode 100644
index 00000000..690977e4
--- /dev/null
+++ b/src/media-engines/gstreamer/rygel-gst-sink.vala
@@ -0,0 +1,144 @@
+/*
+ * Copyright (C) 2011 Nokia Corporation.
+ * Copyright (C) 2012 Intel Corporation.
+ *
+ * Author: Zeeshan Ali (Khattak) <zeeshanak@gnome.org>
+ * <zeeshan.ali@nokia.com>
+ * Jens Georg <jensg@openismus.com>
+ *
+ * This file is part of Rygel.
+ *
+ * Rygel is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Rygel is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+using Gst;
+
+internal class Rygel.GstSink : BaseSink {
+ public const string NAME = "http-gst-sink";
+ public const string PAD_NAME = "sink";
+ // High and low threshold for number of buffered chunks
+ private const uint MAX_BUFFERED_CHUNKS = 32;
+ private const uint MIN_BUFFERED_CHUNKS = 4;
+
+ public Cancellable cancellable;
+
+ private int priority;
+
+ private int64 chunks_buffered;
+ private int64 bytes_sent;
+ private int64 max_bytes;
+
+ private Mutex buffer_mutex = Mutex ();
+ private Cond buffer_condition = Cond ();
+ private unowned DataSource source;
+ private HTTPSeek offsets;
+
+ private bool frozen;
+
+ static construct {
+ var caps = new Caps.any ();
+ var template = new PadTemplate (PAD_NAME,
+ PadDirection.SINK,
+ PadPresence.ALWAYS,
+ caps);
+ add_pad_template (template);
+ }
+
+ public GstSink (DataSource source, HTTPSeek? offsets) {
+ this.chunks_buffered = 0;
+ this.bytes_sent = 0;
+ this.max_bytes = int64.MAX;
+ this.source = source;
+ this.offsets = offsets;
+
+ this.cancellable = new Cancellable ();
+
+ this.sync = false;
+ this.name = NAME;
+ this.frozen = false;
+
+ if (this.offsets != null) {
+ if (this.offsets.seek_type == HTTPSeekType.BYTE) {
+ this.max_bytes = this.offsets.length;
+ }
+ }
+
+ this.cancellable.cancelled.connect (this.on_cancelled);
+ }
+
+ public void freeze () {
+ if (this.frozen) {
+ return;
+ }
+
+ this.buffer_mutex.lock ();
+ this.frozen = true;
+ this.buffer_mutex.unlock ();
+ }
+
+ public void thaw () {
+ if (!this.frozen) {
+ return;
+ }
+
+ this.buffer_mutex.lock ();
+ this.frozen = false;
+ this.buffer_condition.broadcast ();
+ this.buffer_mutex.unlock ();
+ }
+
+ public override FlowReturn render (Buffer buffer) {
+ this.buffer_mutex.lock ();
+ while (!this.cancellable.is_cancelled () &&
+ this.frozen) {
+ // Client is either not reading (Paused) or not fast enough
+ this.buffer_condition.wait (this.buffer_mutex);
+ }
+ this.buffer_mutex.unlock ();
+
+ if (this.cancellable.is_cancelled ()) {
+ return FlowReturn.OK;
+ }
+
+ Idle.add_full (this.priority, () => {
+ return this.push_data (buffer);
+ });
+
+ return FlowReturn.OK;
+ }
+
+ // Runs in application thread
+ public bool push_data (Buffer buffer) {
+ var left = this.max_bytes - this.bytes_sent;
+
+ if (this.cancellable.is_cancelled () || left <= 0) {
+ return false;
+ }
+
+ var to_send = int64.min (buffer.size, left);
+
+ this.source.data_available (buffer.data[0:to_send]);
+ this.chunks_buffered++;
+ this.bytes_sent += to_send;
+
+ return false;
+ }
+
+ private void on_cancelled () {
+ this.buffer_mutex.lock ();
+ this.buffer_condition.broadcast ();
+ this.buffer_mutex.unlock ();
+ }
+}