diff options
author | Dan Winship <danw@gnome.org> | 2013-12-10 20:30:05 +0100 |
---|---|---|
committer | Dan Winship <danw@gnome.org> | 2014-11-02 10:24:52 -0500 |
commit | 11e3d519524bb7ff22020e34c70984288b79d1af (patch) | |
tree | f558aae9f481cdfb031d621289f959e4b516533b | |
parent | 678a3c40d0bb00c0832814d812cf7cbaac250912 (diff) | |
download | libsoup-wip/http2.tar.gz |
wipwip/http2
-rw-r--r-- | libsoup/soup-message-queue.c | 42 | ||||
-rw-r--r-- | libsoup/soup-message-queue.h | 15 | ||||
-rw-r--r-- | libsoup/soup-session-host.c | 81 | ||||
-rw-r--r-- | libsoup/soup-session-host.h | 14 | ||||
-rw-r--r-- | libsoup/soup-session.c | 130 |
5 files changed, 204 insertions, 78 deletions
diff --git a/libsoup/soup-message-queue.c b/libsoup/soup-message-queue.c index 575f51f5..961cd728 100644 --- a/libsoup/soup-message-queue.c +++ b/libsoup/soup-message-queue.c @@ -34,12 +34,11 @@ struct _SoupMessageQueue { }; SoupMessageQueue * -soup_message_queue_new (SoupSession *session) +soup_message_queue_new (void) { SoupMessageQueue *queue; queue = g_slice_new0 (SoupMessageQueue); - queue->session = session; g_mutex_init (&queue->mutex); return queue; } @@ -62,29 +61,30 @@ queue_message_restarted (SoupMessage *msg, gpointer user_data) } /** - * soup_message_queue_append: - * @queue: a #SoupMessageQueue + * soup_message_queue_item_new: + * @session: the #SoupSession that will own the item * @msg: a #SoupMessage * @callback: the callback for @msg * @user_data: the data to pass to @callback * - * Creates a new #SoupMessageQueueItem and appends it to @queue. + * Creates a new #SoupMessageQueueItem for @msg. * * Return value: the new item, which you must unref with * soup_message_queue_unref_item() when you are done with. **/ SoupMessageQueueItem * -soup_message_queue_append (SoupMessageQueue *queue, SoupMessage *msg, - SoupSessionCallback callback, gpointer user_data) +soup_message_queue_item_new (SoupSession *session, + SoupMessage *msg, + SoupSessionCallback callback, + gpointer user_data) { SoupMessageQueueItem *item; item = g_slice_new0 (SoupMessageQueueItem); - item->session = g_object_ref (queue->session); + item->session = g_object_ref (session); item->async_context = soup_session_get_async_context (item->session); if (item->async_context) g_main_context_ref (item->async_context); - item->queue = queue; item->msg = g_object_ref (msg); item->callback = callback; item->callback_data = user_data; @@ -94,11 +94,26 @@ soup_message_queue_append (SoupMessageQueue *queue, SoupMessage *msg, g_signal_connect (msg, "restarted", G_CALLBACK (queue_message_restarted), item); - /* Note: the initial ref_count of 1 represents the caller's - * ref; the queue's own ref is indicated by the absence of the - * "removed" flag. - */ item->ref_count = 1; + item->removed = TRUE; + + return item; +} + +/** + * soup_message_queue_append: + * @queue: a #SoupMessageQueue + * @item: a #SoupMessageQueueItem + * + * Appends @item to @queue. + **/ +void +soup_message_queue_append (SoupMessageQueue *queue, + SoupMessageQueueItem *item) +{ + g_return_if_fail (item->removed == TRUE); + + item->removed = FALSE; g_mutex_lock (&queue->mutex); if (queue->head) { @@ -127,7 +142,6 @@ soup_message_queue_append (SoupMessageQueue *queue, SoupMessage *msg, queue->head = queue->tail = item; g_mutex_unlock (&queue->mutex); - return item; } /** diff --git a/libsoup/soup-message-queue.h b/libsoup/soup-message-queue.h index d2dfda43..be5689ac 100644 --- a/libsoup/soup-message-queue.h +++ b/libsoup/soup-message-queue.h @@ -10,6 +10,7 @@ #include "soup-connection.h" #include "soup-message.h" #include "soup-session.h" +#include "soup-session-host.h" G_BEGIN_DECLS @@ -40,6 +41,7 @@ struct _SoupMessageQueueItem { GCancellable *cancellable; GError *error; + SoupSessionHost *host; SoupConnection *conn; GTask *task; GSource *io_source; @@ -60,12 +62,18 @@ struct _SoupMessageQueueItem { SoupMessageQueueItem *related; }; -SoupMessageQueue *soup_message_queue_new (SoupSession *session); -SoupMessageQueueItem *soup_message_queue_append (SoupMessageQueue *queue, +SoupMessageQueue *soup_message_queue_new (void); + +SoupMessageQueueItem *soup_message_queue_item_new (SoupSession *session, SoupMessage *msg, SoupSessionCallback callback, gpointer user_data); +void soup_message_queue_append (SoupMessageQueue *queue, + SoupMessageQueueItem *item); +void soup_message_queue_remove (SoupMessageQueue *queue, + SoupMessageQueueItem *item); + SoupMessageQueueItem *soup_message_queue_lookup (SoupMessageQueue *queue, SoupMessage *msg); @@ -73,9 +81,6 @@ SoupMessageQueueItem *soup_message_queue_first (SoupMessageQueue *queue SoupMessageQueueItem *soup_message_queue_next (SoupMessageQueue *queue, SoupMessageQueueItem *item); -void soup_message_queue_remove (SoupMessageQueue *queue, - SoupMessageQueueItem *item); - void soup_message_queue_destroy (SoupMessageQueue *queue); void soup_message_queue_item_ref (SoupMessageQueueItem *item); diff --git a/libsoup/soup-session-host.c b/libsoup/soup-session-host.c index af64fcff..ce1bc8c9 100644 --- a/libsoup/soup-session-host.c +++ b/libsoup/soup-session-host.c @@ -12,6 +12,7 @@ #include "soup-session-host.h" #include "soup.h" #include "soup-connection.h" +#include "soup-message-queue.h" #include "soup-misc-private.h" #include "soup-session-private.h" #include "soup-socket-private.h" @@ -24,6 +25,8 @@ typedef struct { SoupURI *uri; SoupAddress *addr; + SoupMessageQueue *queue; + GSList *connections; /* CONTAINS: SoupConnection */ guint num_conns; guint max_conns; @@ -52,6 +55,8 @@ soup_session_host_init (SoupSessionHost *host) SoupSessionHostPrivate *priv = SOUP_SESSION_HOST_GET_PRIVATE (host); g_mutex_init (&priv->mutex); + + priv->queue = soup_message_queue_new (); } static void @@ -69,6 +74,8 @@ soup_session_host_finalize (GObject *object) soup_uri_free (priv->uri); g_object_unref (priv->addr); + soup_message_queue_destroy (priv->queue); + g_mutex_clear (&priv->mutex); G_OBJECT_CLASS (soup_session_host_parent_class)->finalize (object); @@ -133,17 +140,79 @@ soup_session_host_get_address (SoupSessionHost *host) } void -soup_session_host_add_message (SoupSessionHost *host, - SoupMessage *msg) +soup_session_host_run_queue (SoupSessionHost *host, + gboolean *should_cleanup) { - SOUP_SESSION_HOST_GET_PRIVATE (host)->num_messages++; + SoupSessionHostPrivate *priv = SOUP_SESSION_HOST_GET_PRIVATE (host); + SoupMessageQueueItem *item; + SoupMessage *msg; + + g_object_ref (host); + soup_session_host_cleanup_connections (host, FALSE); + + for (item = soup_message_queue_first (priv->queue); + item; + item = soup_message_queue_next (priv->queue, item)) { + msg = item->msg; + + /* CONNECT messages are handled specially */ + if (msg->method == SOUP_METHOD_CONNECT) + continue; + + if (!item->async || + item->async_context != soup_session_get_async_context (priv->session)) + continue; + + soup_session_process_queue_item (priv->session, item, should_cleanup, TRUE); + } + + g_object_unref (host); +} + +GSList * +soup_session_host_get_queue_items (SoupSessionHost *host) +{ + SoupSessionHostPrivate *priv = SOUP_SESSION_HOST_GET_PRIVATE (host); + SoupMessageQueueItem *item; + GSList *items = NULL; + + for (item = soup_message_queue_first (priv->queue); + item; + item = soup_message_queue_next (priv->queue, item)) { + soup_message_queue_item_ref (item); + items = g_slist_prepend (items, item); + } + + return items; } void -soup_session_host_remove_message (SoupSessionHost *host, - SoupMessage *msg) +soup_session_host_add_queue_item (SoupSessionHost *host, + SoupMessageQueueItem *item) +{ + SoupSessionHostPrivate *priv = SOUP_SESSION_HOST_GET_PRIVATE (host); + + priv->num_messages++; + soup_message_queue_append (priv->queue, item); +} + +SoupMessageQueueItem * +soup_session_host_lookup_queue_item (SoupSessionHost *host, + SoupMessage *msg) { - SOUP_SESSION_HOST_GET_PRIVATE (host)->num_messages--; + SoupSessionHostPrivate *priv = SOUP_SESSION_HOST_GET_PRIVATE (host); + + return soup_message_queue_lookup (priv->queue, msg); +} + +void +soup_session_host_remove_item (SoupSessionHost *host, + SoupMessageQueueItem *item) +{ + SoupSessionHostPrivate *priv = SOUP_SESSION_HOST_GET_PRIVATE (host); + + soup_message_queue_remove (priv->queue, item); + priv->num_messages--; } static gboolean diff --git a/libsoup/soup-session-host.h b/libsoup/soup-session-host.h index 4e5694d8..31a6fa68 100644 --- a/libsoup/soup-session-host.h +++ b/libsoup/soup-session-host.h @@ -35,10 +35,16 @@ SoupSessionHost *soup_session_host_new (SoupSession *se SoupURI *soup_session_host_get_uri (SoupSessionHost *host); SoupAddress *soup_session_host_get_address (SoupSessionHost *host); -void soup_session_host_add_message (SoupSessionHost *host, - SoupMessage *msg); -void soup_session_host_remove_message (SoupSessionHost *host, - SoupMessage *msg); +void soup_session_host_add_queue_item (SoupSessionHost *host, + SoupMessageQueueItem *item); +SoupMessageQueueItem *soup_session_host_lookup_queue_item (SoupSessionHost *host, + SoupMessage *msg); +void soup_session_host_run_queue (SoupSessionHost *host, + gboolean *should_cleanup); +GSList *soup_session_host_get_queue_items (SoupSessionHost *host); + +void soup_session_host_remove_item (SoupSessionHost *host, + SoupMessageQueueItem *item); SoupConnection *soup_session_host_get_connection (SoupSessionHost *host, gboolean need_new_connection, diff --git a/libsoup/soup-session.c b/libsoup/soup-session.c index 58e6c4e0..a70593ba 100644 --- a/libsoup/soup-session.c +++ b/libsoup/soup-session.c @@ -107,8 +107,6 @@ typedef struct { SoupSocketProperties *socket_props; - SoupMessageQueue *queue; - char *user_agent; char *accept_language; gboolean accept_language_auto; @@ -217,8 +215,6 @@ soup_session_init (SoupSession *session) priv->session = session; - priv->queue = soup_message_queue_new (session); - g_mutex_init (&priv->conn_lock); g_cond_init (&priv->conn_cond); priv->http_hosts = g_hash_table_new_full (soup_host_uri_hash, @@ -328,8 +324,6 @@ soup_session_finalize (GObject *object) SoupSession *session = SOUP_SESSION (object); SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session); - soup_message_queue_destroy (priv->queue); - g_mutex_clear (&priv->conn_lock); g_cond_clear (&priv->conn_cond); g_hash_table_destroy (priv->http_hosts); @@ -1186,12 +1180,17 @@ re_emit_connection_event (SoupConnection *conn, static void soup_session_set_item_connection (SoupSession *session, SoupMessageQueueItem *item, + SoupSessionHost *host, SoupConnection *conn) { if (item->conn) { g_signal_handlers_disconnect_by_func (item->conn, re_emit_connection_event, item); g_object_unref (item->conn); } + if (item->host && item->host != host) { + soup_session_host_remove_item (item->host, item); + item->host = NULL; + } item->conn = conn; soup_message_set_connection (item->msg, conn); @@ -1201,6 +1200,11 @@ soup_session_set_item_connection (SoupSession *session, g_signal_connect (item->conn, "event", G_CALLBACK (re_emit_connection_event), item); } + + if (!item->host) { + item->host = host; + soup_session_host_add_queue_item (host, item); + } } static void @@ -1213,7 +1217,7 @@ message_restarted (SoupMessage *msg, gpointer user_data) SOUP_STATUS_IS_REDIRECTION (msg->status_code))) { if (soup_connection_get_state (item->conn) == SOUP_CONNECTION_IN_USE) soup_connection_set_state (item->conn, SOUP_CONNECTION_IDLE); - soup_session_set_item_connection (item->session, item, NULL); + soup_session_set_item_connection (item->session, item, NULL, NULL); } soup_message_cleanup_response (msg); @@ -1230,13 +1234,13 @@ soup_session_append_queue_item (SoupSession *session, SoupMessage *msg, soup_message_cleanup_response (msg); - item = soup_message_queue_append (priv->queue, msg, callback, user_data); + item = soup_message_queue_item_new (session, msg, callback, user_data); item->async = async; item->new_api = new_api; g_mutex_lock (&priv->conn_lock); host = get_host_for_message (session, item->msg); - soup_session_host_add_message (host, msg); + soup_session_host_add_queue_item (host, item); g_mutex_unlock (&priv->conn_lock); if (!(soup_message_get_flags (msg) & SOUP_MESSAGE_NO_REDIRECT)) { @@ -1406,8 +1410,18 @@ soup_session_lookup_queue_item (SoupSession *session, SoupMessage *msg) { SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session); + GSList *hosts, *h; + SoupMessageQueueItem *item = NULL; + + g_mutex_lock (&priv->conn_lock); + hosts = get_all_hosts (session); + g_mutex_unlock (&priv->conn_lock); + + for (h = hosts; h && !item; h = h->next) + item = soup_session_host_lookup_queue_item (h->data, msg); - return soup_message_queue_lookup (priv->queue, msg); + g_slist_free_full (hosts, g_object_unref); + return item; } static void @@ -1421,7 +1435,7 @@ soup_session_unqueue_item (SoupSession *session, if (item->msg->method != SOUP_METHOD_CONNECT || !SOUP_STATUS_IS_SUCCESSFUL (item->msg->status_code)) soup_connection_set_state (item->conn, SOUP_CONNECTION_IDLE); - soup_session_set_item_connection (session, item, NULL); + soup_session_set_item_connection (session, item, NULL, NULL); } if (item->state != SOUP_MESSAGE_FINISHED) { @@ -1429,11 +1443,9 @@ soup_session_unqueue_item (SoupSession *session, return; } - soup_message_queue_remove (priv->queue, item); - g_mutex_lock (&priv->conn_lock); host = get_host_for_message (session, item->msg); - soup_session_host_remove_message (host, item->msg); + soup_session_host_remove_item (host, item); g_cond_broadcast (&priv->conn_cond); g_mutex_unlock (&priv->conn_lock); @@ -1574,7 +1586,7 @@ tunnel_complete (SoupMessageQueueItem *tunnel_item, status = status_from_connect_error (item, error); if (!SOUP_STATUS_IS_SUCCESSFUL (status)) { soup_connection_disconnect (item->conn); - soup_session_set_item_connection (session, item, NULL); + soup_session_set_item_connection (session, item, NULL, NULL); if (!item->new_api || item->msg->status_code == 0) soup_session_set_item_status (session, item, status, error); } @@ -1659,7 +1671,7 @@ tunnel_connect (SoupMessageQueueItem *item) g_object_unref (msg); tunnel_item->related = item; soup_message_queue_item_ref (item); - soup_session_set_item_connection (session, tunnel_item, item->conn); + soup_session_set_item_connection (session, tunnel_item, item->host, item->conn); tunnel_item->state = SOUP_MESSAGE_RUNNING; g_signal_emit (session, signals[TUNNELING], 0, tunnel_item->conn); @@ -1687,7 +1699,7 @@ connect_complete (SoupMessageQueueItem *item, SoupConnection *conn, GError *erro if (item->state == SOUP_MESSAGE_CONNECTING) { if (!item->new_api || item->msg->status_code == 0) soup_session_set_item_status (session, item, status, error); - soup_session_set_item_connection (session, item, NULL); + soup_session_set_item_connection (session, item, NULL, NULL); item->state = SOUP_MESSAGE_READY; } } @@ -1800,7 +1812,7 @@ get_connection (SoupMessageQueueItem *item, gboolean *should_cleanup) return FALSE; } - soup_session_set_item_connection (session, item, conn); + soup_session_set_item_connection (session, item, host, conn); if (soup_connection_get_state (item->conn) != SOUP_CONNECTION_NEW) { item->state = SOUP_MESSAGE_READY; @@ -1917,29 +1929,19 @@ static void async_run_queue (SoupSession *session) { SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session); - SoupMessageQueueItem *item; - SoupMessage *msg; gboolean try_cleanup = TRUE, should_cleanup = FALSE; + GSList *hosts, *h; g_object_ref (session); soup_session_cleanup_connections (session, FALSE); - try_again: - for (item = soup_message_queue_first (priv->queue); - item; - item = soup_message_queue_next (priv->queue, item)) { - msg = item->msg; - - /* CONNECT messages are handled specially */ - if (msg->method == SOUP_METHOD_CONNECT) - continue; - - if (!item->async || - item->async_context != soup_session_get_async_context (session)) - continue; + g_mutex_lock (&priv->conn_lock); + hosts = get_all_hosts (session); + g_mutex_unlock (&priv->conn_lock); - soup_session_process_queue_item (session, item, &should_cleanup, TRUE); - } + try_again: + for (h = hosts; h; h = hosts->next) + soup_session_host_run_queue (h->data, &should_cleanup); if (try_cleanup && should_cleanup) { /* There is at least one message in the queue that @@ -1952,6 +1954,7 @@ async_run_queue (SoupSession *session) } } + g_slist_free_full (hosts, g_object_unref); g_object_unref (session); } @@ -2158,19 +2161,42 @@ soup_session_pause_message (SoupSession *session, soup_message_queue_item_unref (item); } +static GSList * +get_all_queue_items (SoupSession *session) +{ + GSList *hosts, *h, *items, *host_items; + + hosts = get_all_hosts (session); + + items = NULL; + for (h = hosts; h; h = h->next) { + host_items = soup_session_host_get_queue_items (h->data); + items = g_slist_concat (host_items, items); + } + + g_slist_free_full (hosts, g_object_unref); + + return items; +} + static void soup_session_real_kick_queue (SoupSession *session) { SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session); + GSList *items, *i; SoupMessageQueueItem *item; gboolean have_sync_items = FALSE; if (priv->disposed) return; - for (item = soup_message_queue_first (priv->queue); - item; - item = soup_message_queue_next (priv->queue, item)) { + g_mutex_lock (&priv->conn_lock); + items = get_all_queue_items (session); + g_mutex_unlock (&priv->conn_lock); + + for (i = items; i; i = i->next) { + item = i->data; + if (item->async) { GSource *source; @@ -2189,6 +2215,7 @@ soup_session_real_kick_queue (SoupSession *session) } else have_sync_items = TRUE; } + g_slist_free_full (items, (GDestroyNotify) soup_message_queue_item_unref); if (have_sync_items) { g_mutex_lock (&priv->conn_lock); @@ -2312,26 +2339,29 @@ static void soup_session_real_flush_queue (SoupSession *session) { SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session); + GSList *items, *i; SoupMessageQueueItem *item; GHashTable *current = NULL; gboolean done = FALSE; + g_mutex_lock (&priv->conn_lock); + items = get_all_queue_items (session); + g_mutex_unlock (&priv->conn_lock); + if (SOUP_IS_SESSION_SYNC (session)) { /* Record the current contents of the queue */ current = g_hash_table_new (NULL, NULL); - for (item = soup_message_queue_first (priv->queue); - item; - item = soup_message_queue_next (priv->queue, item)) - g_hash_table_insert (current, item, item); + for (i = items; i; i = i->next) + g_hash_table_insert (current, i->data, i->data); } /* Cancel everything */ - for (item = soup_message_queue_first (priv->queue); - item; - item = soup_message_queue_next (priv->queue, item)) { + for (i = items; i; i = i->next) { + item = i->data; soup_session_cancel_message (session, item->msg, SOUP_STATUS_CANCELLED); } + g_slist_free_full (items, (GDestroyNotify) soup_message_queue_item_unref); if (SOUP_IS_SESSION_SYNC (session)) { /* Wait until all of the items in @current have been @@ -2345,12 +2375,14 @@ soup_session_real_flush_queue (SoupSession *session) g_mutex_lock (&priv->conn_lock); do { done = TRUE; - for (item = soup_message_queue_first (priv->queue); - item; - item = soup_message_queue_next (priv->queue, item)) { - if (g_hash_table_lookup (current, item)) + items = get_all_queue_items (session); + for (i = items; i; i = i->next) { + if (g_hash_table_lookup (current, i->data)) { done = FALSE; + break; + } } + g_slist_free_full (items, (GDestroyNotify) soup_message_queue_item_unref); if (!done) g_cond_wait (&priv->conn_cond, &priv->conn_lock); |