summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Winship <danw@gnome.org>2013-12-10 20:30:05 +0100
committerDan Winship <danw@gnome.org>2014-11-02 10:24:52 -0500
commit11e3d519524bb7ff22020e34c70984288b79d1af (patch)
treef558aae9f481cdfb031d621289f959e4b516533b
parent678a3c40d0bb00c0832814d812cf7cbaac250912 (diff)
downloadlibsoup-wip/http2.tar.gz
-rw-r--r--libsoup/soup-message-queue.c42
-rw-r--r--libsoup/soup-message-queue.h15
-rw-r--r--libsoup/soup-session-host.c81
-rw-r--r--libsoup/soup-session-host.h14
-rw-r--r--libsoup/soup-session.c130
5 files changed, 204 insertions, 78 deletions
diff --git a/libsoup/soup-message-queue.c b/libsoup/soup-message-queue.c
index 575f51f5..961cd728 100644
--- a/libsoup/soup-message-queue.c
+++ b/libsoup/soup-message-queue.c
@@ -34,12 +34,11 @@ struct _SoupMessageQueue {
};
SoupMessageQueue *
-soup_message_queue_new (SoupSession *session)
+soup_message_queue_new (void)
{
SoupMessageQueue *queue;
queue = g_slice_new0 (SoupMessageQueue);
- queue->session = session;
g_mutex_init (&queue->mutex);
return queue;
}
@@ -62,29 +61,30 @@ queue_message_restarted (SoupMessage *msg, gpointer user_data)
}
/**
- * soup_message_queue_append:
- * @queue: a #SoupMessageQueue
+ * soup_message_queue_item_new:
+ * @session: the #SoupSession that will own the item
* @msg: a #SoupMessage
* @callback: the callback for @msg
* @user_data: the data to pass to @callback
*
- * Creates a new #SoupMessageQueueItem and appends it to @queue.
+ * Creates a new #SoupMessageQueueItem for @msg.
*
* Return value: the new item, which you must unref with
* soup_message_queue_unref_item() when you are done with.
**/
SoupMessageQueueItem *
-soup_message_queue_append (SoupMessageQueue *queue, SoupMessage *msg,
- SoupSessionCallback callback, gpointer user_data)
+soup_message_queue_item_new (SoupSession *session,
+ SoupMessage *msg,
+ SoupSessionCallback callback,
+ gpointer user_data)
{
SoupMessageQueueItem *item;
item = g_slice_new0 (SoupMessageQueueItem);
- item->session = g_object_ref (queue->session);
+ item->session = g_object_ref (session);
item->async_context = soup_session_get_async_context (item->session);
if (item->async_context)
g_main_context_ref (item->async_context);
- item->queue = queue;
item->msg = g_object_ref (msg);
item->callback = callback;
item->callback_data = user_data;
@@ -94,11 +94,26 @@ soup_message_queue_append (SoupMessageQueue *queue, SoupMessage *msg,
g_signal_connect (msg, "restarted",
G_CALLBACK (queue_message_restarted), item);
- /* Note: the initial ref_count of 1 represents the caller's
- * ref; the queue's own ref is indicated by the absence of the
- * "removed" flag.
- */
item->ref_count = 1;
+ item->removed = TRUE;
+
+ return item;
+}
+
+/**
+ * soup_message_queue_append:
+ * @queue: a #SoupMessageQueue
+ * @item: a #SoupMessageQueueItem
+ *
+ * Appends @item to @queue.
+ **/
+void
+soup_message_queue_append (SoupMessageQueue *queue,
+ SoupMessageQueueItem *item)
+{
+ g_return_if_fail (item->removed == TRUE);
+
+ item->removed = FALSE;
g_mutex_lock (&queue->mutex);
if (queue->head) {
@@ -127,7 +142,6 @@ soup_message_queue_append (SoupMessageQueue *queue, SoupMessage *msg,
queue->head = queue->tail = item;
g_mutex_unlock (&queue->mutex);
- return item;
}
/**
diff --git a/libsoup/soup-message-queue.h b/libsoup/soup-message-queue.h
index d2dfda43..be5689ac 100644
--- a/libsoup/soup-message-queue.h
+++ b/libsoup/soup-message-queue.h
@@ -10,6 +10,7 @@
#include "soup-connection.h"
#include "soup-message.h"
#include "soup-session.h"
+#include "soup-session-host.h"
G_BEGIN_DECLS
@@ -40,6 +41,7 @@ struct _SoupMessageQueueItem {
GCancellable *cancellable;
GError *error;
+ SoupSessionHost *host;
SoupConnection *conn;
GTask *task;
GSource *io_source;
@@ -60,12 +62,18 @@ struct _SoupMessageQueueItem {
SoupMessageQueueItem *related;
};
-SoupMessageQueue *soup_message_queue_new (SoupSession *session);
-SoupMessageQueueItem *soup_message_queue_append (SoupMessageQueue *queue,
+SoupMessageQueue *soup_message_queue_new (void);
+
+SoupMessageQueueItem *soup_message_queue_item_new (SoupSession *session,
SoupMessage *msg,
SoupSessionCallback callback,
gpointer user_data);
+void soup_message_queue_append (SoupMessageQueue *queue,
+ SoupMessageQueueItem *item);
+void soup_message_queue_remove (SoupMessageQueue *queue,
+ SoupMessageQueueItem *item);
+
SoupMessageQueueItem *soup_message_queue_lookup (SoupMessageQueue *queue,
SoupMessage *msg);
@@ -73,9 +81,6 @@ SoupMessageQueueItem *soup_message_queue_first (SoupMessageQueue *queue
SoupMessageQueueItem *soup_message_queue_next (SoupMessageQueue *queue,
SoupMessageQueueItem *item);
-void soup_message_queue_remove (SoupMessageQueue *queue,
- SoupMessageQueueItem *item);
-
void soup_message_queue_destroy (SoupMessageQueue *queue);
void soup_message_queue_item_ref (SoupMessageQueueItem *item);
diff --git a/libsoup/soup-session-host.c b/libsoup/soup-session-host.c
index af64fcff..ce1bc8c9 100644
--- a/libsoup/soup-session-host.c
+++ b/libsoup/soup-session-host.c
@@ -12,6 +12,7 @@
#include "soup-session-host.h"
#include "soup.h"
#include "soup-connection.h"
+#include "soup-message-queue.h"
#include "soup-misc-private.h"
#include "soup-session-private.h"
#include "soup-socket-private.h"
@@ -24,6 +25,8 @@ typedef struct {
SoupURI *uri;
SoupAddress *addr;
+ SoupMessageQueue *queue;
+
GSList *connections; /* CONTAINS: SoupConnection */
guint num_conns;
guint max_conns;
@@ -52,6 +55,8 @@ soup_session_host_init (SoupSessionHost *host)
SoupSessionHostPrivate *priv = SOUP_SESSION_HOST_GET_PRIVATE (host);
g_mutex_init (&priv->mutex);
+
+ priv->queue = soup_message_queue_new ();
}
static void
@@ -69,6 +74,8 @@ soup_session_host_finalize (GObject *object)
soup_uri_free (priv->uri);
g_object_unref (priv->addr);
+ soup_message_queue_destroy (priv->queue);
+
g_mutex_clear (&priv->mutex);
G_OBJECT_CLASS (soup_session_host_parent_class)->finalize (object);
@@ -133,17 +140,79 @@ soup_session_host_get_address (SoupSessionHost *host)
}
void
-soup_session_host_add_message (SoupSessionHost *host,
- SoupMessage *msg)
+soup_session_host_run_queue (SoupSessionHost *host,
+ gboolean *should_cleanup)
{
- SOUP_SESSION_HOST_GET_PRIVATE (host)->num_messages++;
+ SoupSessionHostPrivate *priv = SOUP_SESSION_HOST_GET_PRIVATE (host);
+ SoupMessageQueueItem *item;
+ SoupMessage *msg;
+
+ g_object_ref (host);
+ soup_session_host_cleanup_connections (host, FALSE);
+
+ for (item = soup_message_queue_first (priv->queue);
+ item;
+ item = soup_message_queue_next (priv->queue, item)) {
+ msg = item->msg;
+
+ /* CONNECT messages are handled specially */
+ if (msg->method == SOUP_METHOD_CONNECT)
+ continue;
+
+ if (!item->async ||
+ item->async_context != soup_session_get_async_context (priv->session))
+ continue;
+
+ soup_session_process_queue_item (priv->session, item, should_cleanup, TRUE);
+ }
+
+ g_object_unref (host);
+}
+
+GSList *
+soup_session_host_get_queue_items (SoupSessionHost *host)
+{
+ SoupSessionHostPrivate *priv = SOUP_SESSION_HOST_GET_PRIVATE (host);
+ SoupMessageQueueItem *item;
+ GSList *items = NULL;
+
+ for (item = soup_message_queue_first (priv->queue);
+ item;
+ item = soup_message_queue_next (priv->queue, item)) {
+ soup_message_queue_item_ref (item);
+ items = g_slist_prepend (items, item);
+ }
+
+ return items;
}
void
-soup_session_host_remove_message (SoupSessionHost *host,
- SoupMessage *msg)
+soup_session_host_add_queue_item (SoupSessionHost *host,
+ SoupMessageQueueItem *item)
+{
+ SoupSessionHostPrivate *priv = SOUP_SESSION_HOST_GET_PRIVATE (host);
+
+ priv->num_messages++;
+ soup_message_queue_append (priv->queue, item);
+}
+
+SoupMessageQueueItem *
+soup_session_host_lookup_queue_item (SoupSessionHost *host,
+ SoupMessage *msg)
{
- SOUP_SESSION_HOST_GET_PRIVATE (host)->num_messages--;
+ SoupSessionHostPrivate *priv = SOUP_SESSION_HOST_GET_PRIVATE (host);
+
+ return soup_message_queue_lookup (priv->queue, msg);
+}
+
+void
+soup_session_host_remove_item (SoupSessionHost *host,
+ SoupMessageQueueItem *item)
+{
+ SoupSessionHostPrivate *priv = SOUP_SESSION_HOST_GET_PRIVATE (host);
+
+ soup_message_queue_remove (priv->queue, item);
+ priv->num_messages--;
}
static gboolean
diff --git a/libsoup/soup-session-host.h b/libsoup/soup-session-host.h
index 4e5694d8..31a6fa68 100644
--- a/libsoup/soup-session-host.h
+++ b/libsoup/soup-session-host.h
@@ -35,10 +35,16 @@ SoupSessionHost *soup_session_host_new (SoupSession *se
SoupURI *soup_session_host_get_uri (SoupSessionHost *host);
SoupAddress *soup_session_host_get_address (SoupSessionHost *host);
-void soup_session_host_add_message (SoupSessionHost *host,
- SoupMessage *msg);
-void soup_session_host_remove_message (SoupSessionHost *host,
- SoupMessage *msg);
+void soup_session_host_add_queue_item (SoupSessionHost *host,
+ SoupMessageQueueItem *item);
+SoupMessageQueueItem *soup_session_host_lookup_queue_item (SoupSessionHost *host,
+ SoupMessage *msg);
+void soup_session_host_run_queue (SoupSessionHost *host,
+ gboolean *should_cleanup);
+GSList *soup_session_host_get_queue_items (SoupSessionHost *host);
+
+void soup_session_host_remove_item (SoupSessionHost *host,
+ SoupMessageQueueItem *item);
SoupConnection *soup_session_host_get_connection (SoupSessionHost *host,
gboolean need_new_connection,
diff --git a/libsoup/soup-session.c b/libsoup/soup-session.c
index 58e6c4e0..a70593ba 100644
--- a/libsoup/soup-session.c
+++ b/libsoup/soup-session.c
@@ -107,8 +107,6 @@ typedef struct {
SoupSocketProperties *socket_props;
- SoupMessageQueue *queue;
-
char *user_agent;
char *accept_language;
gboolean accept_language_auto;
@@ -217,8 +215,6 @@ soup_session_init (SoupSession *session)
priv->session = session;
- priv->queue = soup_message_queue_new (session);
-
g_mutex_init (&priv->conn_lock);
g_cond_init (&priv->conn_cond);
priv->http_hosts = g_hash_table_new_full (soup_host_uri_hash,
@@ -328,8 +324,6 @@ soup_session_finalize (GObject *object)
SoupSession *session = SOUP_SESSION (object);
SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
- soup_message_queue_destroy (priv->queue);
-
g_mutex_clear (&priv->conn_lock);
g_cond_clear (&priv->conn_cond);
g_hash_table_destroy (priv->http_hosts);
@@ -1186,12 +1180,17 @@ re_emit_connection_event (SoupConnection *conn,
static void
soup_session_set_item_connection (SoupSession *session,
SoupMessageQueueItem *item,
+ SoupSessionHost *host,
SoupConnection *conn)
{
if (item->conn) {
g_signal_handlers_disconnect_by_func (item->conn, re_emit_connection_event, item);
g_object_unref (item->conn);
}
+ if (item->host && item->host != host) {
+ soup_session_host_remove_item (item->host, item);
+ item->host = NULL;
+ }
item->conn = conn;
soup_message_set_connection (item->msg, conn);
@@ -1201,6 +1200,11 @@ soup_session_set_item_connection (SoupSession *session,
g_signal_connect (item->conn, "event",
G_CALLBACK (re_emit_connection_event), item);
}
+
+ if (!item->host) {
+ item->host = host;
+ soup_session_host_add_queue_item (host, item);
+ }
}
static void
@@ -1213,7 +1217,7 @@ message_restarted (SoupMessage *msg, gpointer user_data)
SOUP_STATUS_IS_REDIRECTION (msg->status_code))) {
if (soup_connection_get_state (item->conn) == SOUP_CONNECTION_IN_USE)
soup_connection_set_state (item->conn, SOUP_CONNECTION_IDLE);
- soup_session_set_item_connection (item->session, item, NULL);
+ soup_session_set_item_connection (item->session, item, NULL, NULL);
}
soup_message_cleanup_response (msg);
@@ -1230,13 +1234,13 @@ soup_session_append_queue_item (SoupSession *session, SoupMessage *msg,
soup_message_cleanup_response (msg);
- item = soup_message_queue_append (priv->queue, msg, callback, user_data);
+ item = soup_message_queue_item_new (session, msg, callback, user_data);
item->async = async;
item->new_api = new_api;
g_mutex_lock (&priv->conn_lock);
host = get_host_for_message (session, item->msg);
- soup_session_host_add_message (host, msg);
+ soup_session_host_add_queue_item (host, item);
g_mutex_unlock (&priv->conn_lock);
if (!(soup_message_get_flags (msg) & SOUP_MESSAGE_NO_REDIRECT)) {
@@ -1406,8 +1410,18 @@ soup_session_lookup_queue_item (SoupSession *session,
SoupMessage *msg)
{
SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
+ GSList *hosts, *h;
+ SoupMessageQueueItem *item = NULL;
+
+ g_mutex_lock (&priv->conn_lock);
+ hosts = get_all_hosts (session);
+ g_mutex_unlock (&priv->conn_lock);
+
+ for (h = hosts; h && !item; h = h->next)
+ item = soup_session_host_lookup_queue_item (h->data, msg);
- return soup_message_queue_lookup (priv->queue, msg);
+ g_slist_free_full (hosts, g_object_unref);
+ return item;
}
static void
@@ -1421,7 +1435,7 @@ soup_session_unqueue_item (SoupSession *session,
if (item->msg->method != SOUP_METHOD_CONNECT ||
!SOUP_STATUS_IS_SUCCESSFUL (item->msg->status_code))
soup_connection_set_state (item->conn, SOUP_CONNECTION_IDLE);
- soup_session_set_item_connection (session, item, NULL);
+ soup_session_set_item_connection (session, item, NULL, NULL);
}
if (item->state != SOUP_MESSAGE_FINISHED) {
@@ -1429,11 +1443,9 @@ soup_session_unqueue_item (SoupSession *session,
return;
}
- soup_message_queue_remove (priv->queue, item);
-
g_mutex_lock (&priv->conn_lock);
host = get_host_for_message (session, item->msg);
- soup_session_host_remove_message (host, item->msg);
+ soup_session_host_remove_item (host, item);
g_cond_broadcast (&priv->conn_cond);
g_mutex_unlock (&priv->conn_lock);
@@ -1574,7 +1586,7 @@ tunnel_complete (SoupMessageQueueItem *tunnel_item,
status = status_from_connect_error (item, error);
if (!SOUP_STATUS_IS_SUCCESSFUL (status)) {
soup_connection_disconnect (item->conn);
- soup_session_set_item_connection (session, item, NULL);
+ soup_session_set_item_connection (session, item, NULL, NULL);
if (!item->new_api || item->msg->status_code == 0)
soup_session_set_item_status (session, item, status, error);
}
@@ -1659,7 +1671,7 @@ tunnel_connect (SoupMessageQueueItem *item)
g_object_unref (msg);
tunnel_item->related = item;
soup_message_queue_item_ref (item);
- soup_session_set_item_connection (session, tunnel_item, item->conn);
+ soup_session_set_item_connection (session, tunnel_item, item->host, item->conn);
tunnel_item->state = SOUP_MESSAGE_RUNNING;
g_signal_emit (session, signals[TUNNELING], 0, tunnel_item->conn);
@@ -1687,7 +1699,7 @@ connect_complete (SoupMessageQueueItem *item, SoupConnection *conn, GError *erro
if (item->state == SOUP_MESSAGE_CONNECTING) {
if (!item->new_api || item->msg->status_code == 0)
soup_session_set_item_status (session, item, status, error);
- soup_session_set_item_connection (session, item, NULL);
+ soup_session_set_item_connection (session, item, NULL, NULL);
item->state = SOUP_MESSAGE_READY;
}
}
@@ -1800,7 +1812,7 @@ get_connection (SoupMessageQueueItem *item, gboolean *should_cleanup)
return FALSE;
}
- soup_session_set_item_connection (session, item, conn);
+ soup_session_set_item_connection (session, item, host, conn);
if (soup_connection_get_state (item->conn) != SOUP_CONNECTION_NEW) {
item->state = SOUP_MESSAGE_READY;
@@ -1917,29 +1929,19 @@ static void
async_run_queue (SoupSession *session)
{
SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
- SoupMessageQueueItem *item;
- SoupMessage *msg;
gboolean try_cleanup = TRUE, should_cleanup = FALSE;
+ GSList *hosts, *h;
g_object_ref (session);
soup_session_cleanup_connections (session, FALSE);
- try_again:
- for (item = soup_message_queue_first (priv->queue);
- item;
- item = soup_message_queue_next (priv->queue, item)) {
- msg = item->msg;
-
- /* CONNECT messages are handled specially */
- if (msg->method == SOUP_METHOD_CONNECT)
- continue;
-
- if (!item->async ||
- item->async_context != soup_session_get_async_context (session))
- continue;
+ g_mutex_lock (&priv->conn_lock);
+ hosts = get_all_hosts (session);
+ g_mutex_unlock (&priv->conn_lock);
- soup_session_process_queue_item (session, item, &should_cleanup, TRUE);
- }
+ try_again:
+ for (h = hosts; h; h = hosts->next)
+ soup_session_host_run_queue (h->data, &should_cleanup);
if (try_cleanup && should_cleanup) {
/* There is at least one message in the queue that
@@ -1952,6 +1954,7 @@ async_run_queue (SoupSession *session)
}
}
+ g_slist_free_full (hosts, g_object_unref);
g_object_unref (session);
}
@@ -2158,19 +2161,42 @@ soup_session_pause_message (SoupSession *session,
soup_message_queue_item_unref (item);
}
+static GSList *
+get_all_queue_items (SoupSession *session)
+{
+ GSList *hosts, *h, *items, *host_items;
+
+ hosts = get_all_hosts (session);
+
+ items = NULL;
+ for (h = hosts; h; h = h->next) {
+ host_items = soup_session_host_get_queue_items (h->data);
+ items = g_slist_concat (host_items, items);
+ }
+
+ g_slist_free_full (hosts, g_object_unref);
+
+ return items;
+}
+
static void
soup_session_real_kick_queue (SoupSession *session)
{
SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
+ GSList *items, *i;
SoupMessageQueueItem *item;
gboolean have_sync_items = FALSE;
if (priv->disposed)
return;
- for (item = soup_message_queue_first (priv->queue);
- item;
- item = soup_message_queue_next (priv->queue, item)) {
+ g_mutex_lock (&priv->conn_lock);
+ items = get_all_queue_items (session);
+ g_mutex_unlock (&priv->conn_lock);
+
+ for (i = items; i; i = i->next) {
+ item = i->data;
+
if (item->async) {
GSource *source;
@@ -2189,6 +2215,7 @@ soup_session_real_kick_queue (SoupSession *session)
} else
have_sync_items = TRUE;
}
+ g_slist_free_full (items, (GDestroyNotify) soup_message_queue_item_unref);
if (have_sync_items) {
g_mutex_lock (&priv->conn_lock);
@@ -2312,26 +2339,29 @@ static void
soup_session_real_flush_queue (SoupSession *session)
{
SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
+ GSList *items, *i;
SoupMessageQueueItem *item;
GHashTable *current = NULL;
gboolean done = FALSE;
+ g_mutex_lock (&priv->conn_lock);
+ items = get_all_queue_items (session);
+ g_mutex_unlock (&priv->conn_lock);
+
if (SOUP_IS_SESSION_SYNC (session)) {
/* Record the current contents of the queue */
current = g_hash_table_new (NULL, NULL);
- for (item = soup_message_queue_first (priv->queue);
- item;
- item = soup_message_queue_next (priv->queue, item))
- g_hash_table_insert (current, item, item);
+ for (i = items; i; i = i->next)
+ g_hash_table_insert (current, i->data, i->data);
}
/* Cancel everything */
- for (item = soup_message_queue_first (priv->queue);
- item;
- item = soup_message_queue_next (priv->queue, item)) {
+ for (i = items; i; i = i->next) {
+ item = i->data;
soup_session_cancel_message (session, item->msg,
SOUP_STATUS_CANCELLED);
}
+ g_slist_free_full (items, (GDestroyNotify) soup_message_queue_item_unref);
if (SOUP_IS_SESSION_SYNC (session)) {
/* Wait until all of the items in @current have been
@@ -2345,12 +2375,14 @@ soup_session_real_flush_queue (SoupSession *session)
g_mutex_lock (&priv->conn_lock);
do {
done = TRUE;
- for (item = soup_message_queue_first (priv->queue);
- item;
- item = soup_message_queue_next (priv->queue, item)) {
- if (g_hash_table_lookup (current, item))
+ items = get_all_queue_items (session);
+ for (i = items; i; i = i->next) {
+ if (g_hash_table_lookup (current, i->data)) {
done = FALSE;
+ break;
+ }
}
+ g_slist_free_full (items, (GDestroyNotify) soup_message_queue_item_unref);
if (!done)
g_cond_wait (&priv->conn_cond, &priv->conn_lock);