summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGöran Jönsson <goranjn@axis.com>2019-01-16 12:59:11 +0100
committerSebastian Dröge <slomo@coaxion.net>2019-02-02 10:42:33 +0000
commitafb27f91cfec706d2a4dbf8a6c787504731035a3 (patch)
tree47c02a49b078b1dd7b1185fa734c62ee1fb5f83f
parent4be7424de5f6828427231d3f30f2c745d61e0be7 (diff)
downloadgstreamer-afb27f91cfec706d2a4dbf8a6c787504731035a3.tar.gz
rtsp-server: remove recursive behavior
Introduce a threadpool to send rtp and rtcp to avoid recursive behavior.
-rw-r--r--gst/rtsp-server/rtsp-client.c1
-rw-r--r--gst/rtsp-server/rtsp-stream.c115
-rw-r--r--gst/rtsp-server/rtsp-stream.h3
3 files changed, 43 insertions, 76 deletions
diff --git a/gst/rtsp-server/rtsp-client.c b/gst/rtsp-server/rtsp-client.c
index e48440e72f..3a839ec8c4 100644
--- a/gst/rtsp-server/rtsp-client.c
+++ b/gst/rtsp-server/rtsp-client.c
@@ -2594,7 +2594,6 @@ handle_setup_request (GstRTSPClient * client, GstRTSPContext * ctx)
g_object_ref (trans);
add_data_seq (client, ct->interleaved.min);
add_data_seq (client, ct->interleaved.max);
- gst_rtsp_stream_set_watch_context (stream, priv->watch_context);
}
/* create and serialize the server transport */
diff --git a/gst/rtsp-server/rtsp-stream.c b/gst/rtsp-server/rtsp-stream.c
index 11612578b4..b8759fc77a 100644
--- a/gst/rtsp-server/rtsp-stream.c
+++ b/gst/rtsp-server/rtsp-stream.c
@@ -181,7 +181,7 @@ struct _GstRTSPStreamPrivate
GHashTable *ptmap;
GstRTSPPublishClockMode publish_clock_mode;
- GMainContext *watch_context;
+ GThreadPool *send_pool;
};
#define DEFAULT_CONTROL NULL
@@ -298,7 +298,7 @@ gst_rtsp_stream_init (GstRTSPStream * stream)
NULL, (GDestroyNotify) gst_caps_unref);
priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
(GDestroyNotify) gst_caps_unref);
- priv->watch_context = NULL;
+ priv->send_pool = NULL;
}
typedef struct _UdpClientAddrInfo UdpClientAddrInfo;
@@ -334,6 +334,8 @@ gst_rtsp_stream_finalize (GObject * obj)
/* we really need to be unjoined now */
g_return_if_fail (priv->joined_bin == NULL);
+ if (priv->send_pool)
+ g_thread_pool_free (priv->send_pool, TRUE, TRUE);
if (priv->mcast_addr_v4)
gst_rtsp_address_free (priv->mcast_addr_v4);
if (priv->mcast_addr_v6)
@@ -378,9 +380,6 @@ gst_rtsp_stream_finalize (GObject * obj)
g_hash_table_unref (priv->keys);
g_hash_table_destroy (priv->ptmap);
- if (priv->watch_context)
- g_main_context_unref (priv->watch_context);
-
G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
}
@@ -2569,6 +2568,34 @@ send_tcp_message (GstRTSPStream * stream, gint idx)
g_mutex_lock (&priv->lock);
}
+static void
+send_thread_main (gpointer data, gpointer user_data)
+{
+ GstRTSPStream *stream = user_data;
+ GstRTSPStreamPrivate *priv = stream->priv;
+ gint idx;
+ gint i;
+
+ g_mutex_lock (&priv->lock);
+ do {
+ idx = -1;
+ /* iterate from 1 and down, so we prioritize RTCP over RTP */
+ for (i = 1; i >= 0; i--) {
+ if (priv->have_buffer[i]) {
+ /* send message */
+ idx = i;
+ break;
+ }
+ }
+
+ if (idx != -1 && priv->n_outstanding == 0)
+ send_tcp_message (stream, idx);
+ } while (idx != -1 && priv->n_outstanding == 0);
+
+ GST_DEBUG_OBJECT (stream, "send thread done");
+ g_mutex_unlock (&priv->lock);
+}
+
static GstFlowReturn
handle_new_sample (GstAppSink * sink, gpointer user_data)
{
@@ -2579,6 +2606,12 @@ handle_new_sample (GstAppSink * sink, gpointer user_data)
g_mutex_lock (&priv->lock);
+ if (priv->send_pool == NULL) {
+ GST_DEBUG_OBJECT (stream, "create thread pool");
+ priv->send_pool =
+ g_thread_pool_new (send_thread_main, user_data, 1, TRUE, NULL);
+ }
+
for (i = 0; i < 2; i++)
if (GST_ELEMENT_CAST (sink) == priv->appsink[i]) {
priv->have_buffer[i] = TRUE;
@@ -4360,37 +4393,12 @@ mcast_error:
}
}
-static gboolean
-cb_send_tcp_message (GstRTSPStream * stream)
-{
- GstRTSPStreamPrivate *priv = stream->priv;
- gint idx = -1;
- gint i;
-
- g_mutex_lock (&priv->lock);
-
- /* iterate from 1 and down, so we prioritize RTCP over RTP */
- for (i = 1; i >= 0; i--) {
- if (priv->have_buffer[i]) {
- /* send message */
- idx = i;
- break;
- }
- }
-
- if (idx != -1)
- send_tcp_message (stream, idx);
- g_mutex_unlock (&priv->lock);
- return G_SOURCE_REMOVE;
-}
-
static void
on_message_sent (gpointer user_data)
{
GstRTSPStream *stream = user_data;
GstRTSPStreamPrivate *priv = stream->priv;
gint idx = -1;
- GSource *idle_src;
GST_DEBUG_OBJECT (stream, "message send complete");
@@ -4416,24 +4424,12 @@ on_message_sent (gpointer user_data)
}
if (idx != -1) {
- /* When appsink running this callback we want to send as much as we can
- * But when idle callback or watch callback is running we will first
- * queue an idle probe. This so we prevent a loop to occur were callback
- * is sending more data that then call the callback that sends more data
- * and so on. If the loop occur then it will starve out handling off
- * other events that are handled by watch's context. */
- if (priv->watch_context && g_main_context_is_owner (priv->watch_context)) {
- /* underlaying layer is running this callback */
- idle_src = g_idle_source_new ();
- g_source_set_callback (idle_src, (GSourceFunc) cb_send_tcp_message,
- g_object_ref (stream), g_object_unref);
- g_source_attach (idle_src, priv->watch_context);
- g_source_unref (idle_src);
- } else {
- /* appsink is running this callback */
- send_tcp_message (stream, idx);
- }
+ gint dummy;
+
+ GST_DEBUG_OBJECT (stream, "start thread");
+ g_thread_pool_push (priv->send_pool, &dummy, NULL);
}
+
g_mutex_unlock (&priv->lock);
return;
@@ -5802,28 +5798,3 @@ gst_rtsp_stream_get_ulpfec_percentage (GstRTSPStream * stream)
return res;
}
-
-/**
- * gst_rtsp_stream_set_watch_context:
- * @stream: a #GstRTSPStream
- * @context: a #GMainContext
- *
- * Sets stream private watch_context.
- *
- */
-void
-gst_rtsp_stream_set_watch_context (GstRTSPStream * stream,
- GMainContext * context)
-{
- GstRTSPStreamPrivate *priv;
- priv = stream->priv;
-
- g_mutex_lock (&priv->lock);
- if (priv->watch_context != NULL) {
- g_main_context_unref (priv->watch_context);
- priv->watch_context = NULL;
- }
- if (context)
- priv->watch_context = g_main_context_ref (context);
- g_mutex_unlock (&priv->lock);
-}
diff --git a/gst/rtsp-server/rtsp-stream.h b/gst/rtsp-server/rtsp-stream.h
index 53ad57c942..7910bb05d7 100644
--- a/gst/rtsp-server/rtsp-stream.h
+++ b/gst/rtsp-server/rtsp-stream.h
@@ -354,9 +354,6 @@ void gst_rtsp_stream_set_ulpfec_percentage (GstRTSPStream *stream,
GST_RTSP_SERVER_API
guint gst_rtsp_stream_get_ulpfec_percentage (GstRTSPStream *stream);
-GST_RTSP_SERVER_API
-void gst_rtsp_stream_set_watch_context (GstRTSPStream * stream, GMainContext * context);
-
/**
* GstRTSPStreamTransportFilterFunc:
* @stream: a #GstRTSPStream object