diff options
author | Dan Winship <danw@gnome.org> | 2014-01-19 10:48:56 -0500 |
---|---|---|
committer | Dan Winship <danw@gnome.org> | 2014-06-08 15:15:33 -0400 |
commit | 6ab3b9e745d7ecdc5a56ae22c068962cd5d943c6 (patch) | |
tree | cd9f6a1da35d80ac7002c9f1c6a75192b4d8d57c | |
parent | 62cb41219558c38c29d4c652e738c7790ef45092 (diff) | |
download | libsoup-wip/server-steal.tar.gz |
soup-server: add soup_client_context_steal_connection()wip/server-steal
Add a method to allow a SoupServer handler to steal the connection
from the server, and use this in simple-proxy to implement CONNECT.
Incorporates a patch from Lionel Landwerlin.
-rw-r--r-- | docs/reference/libsoup-2.4-sections.txt | 1 | ||||
-rw-r--r-- | examples/simple-proxy.c | 213 | ||||
-rw-r--r-- | libsoup/libsoup-2.4.sym | 1 | ||||
-rw-r--r-- | libsoup/soup-server.c | 52 | ||||
-rw-r--r-- | libsoup/soup-server.h | 2 | ||||
-rw-r--r-- | tests/server-test.c | 336 |
6 files changed, 601 insertions, 4 deletions
diff --git a/docs/reference/libsoup-2.4-sections.txt b/docs/reference/libsoup-2.4-sections.txt index 11f650fb..805a3918 100644 --- a/docs/reference/libsoup-2.4-sections.txt +++ b/docs/reference/libsoup-2.4-sections.txt @@ -238,6 +238,7 @@ soup_client_context_get_host soup_client_context_get_auth_domain soup_client_context_get_auth_user soup_client_context_get_gsocket +soup_client_context_steal_connection <SUBSECTION> soup_server_add_auth_domain soup_server_remove_auth_domain diff --git a/examples/simple-proxy.c b/examples/simple-proxy.c index 08bd847c..7657355e 100644 --- a/examples/simple-proxy.c +++ b/examples/simple-proxy.c @@ -15,6 +15,217 @@ static SoupSession *session; static SoupServer *server; +typedef struct { + GIOStream *iostream; + GInputStream *istream; + GOutputStream *ostream; + + gssize nread, nwrote; + guchar *buffer; +} TunnelEnd; + +typedef struct { + SoupServer *self; + SoupMessage *msg; + SoupClientContext *context; + GCancellable *cancellable; + + TunnelEnd client, server; +} Tunnel; + +#define BUFSIZE 8192 + +static void tunnel_read_cb (GObject *object, + GAsyncResult *result, + gpointer user_data); + +static void +tunnel_close (Tunnel *tunnel) +{ + if (tunnel->cancellable) { + g_cancellable_cancel (tunnel->cancellable); + g_object_unref (tunnel->cancellable); + } + + if (tunnel->client.iostream) { + g_io_stream_close (tunnel->client.iostream, NULL, NULL); + g_object_unref (tunnel->client.iostream); + } + if (tunnel->server.iostream) { + g_io_stream_close (tunnel->server.iostream, NULL, NULL); + g_object_unref (tunnel->server.iostream); + } + + g_free (tunnel->client.buffer); + g_free (tunnel->server.buffer); + + g_object_unref (tunnel->self); + g_object_unref (tunnel->msg); + + g_free (tunnel); +} + +static void +tunnel_wrote_cb (GObject *object, + GAsyncResult *result, + gpointer user_data) +{ + Tunnel *tunnel = user_data; + TunnelEnd *write_end, *read_end; + GError *error = NULL; + gssize nwrote; + + nwrote = g_output_stream_write_finish (G_OUTPUT_STREAM (object), result, &error); + if (nwrote <= 0) { + if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + g_error_free (error); + return; + } else if (error) { + g_print ("Tunnel write failed: %s\n", error->message); + g_error_free (error); + } + tunnel_close (tunnel); + return; + } + + if (object == (GObject *)tunnel->client.ostream) { + write_end = &tunnel->client; + read_end = &tunnel->server; + } else { + write_end = &tunnel->server; + read_end = &tunnel->client; + } + + write_end->nwrote += nwrote; + if (write_end->nwrote < read_end->nread) { + g_output_stream_write_async (write_end->ostream, + read_end->buffer + write_end->nwrote, + read_end->nread - write_end->nwrote, + G_PRIORITY_DEFAULT, tunnel->cancellable, + tunnel_wrote_cb, tunnel); + } else { + g_input_stream_read_async (read_end->istream, + read_end->buffer, BUFSIZE, + G_PRIORITY_DEFAULT, tunnel->cancellable, + tunnel_read_cb, tunnel); + } +} + +static void +tunnel_read_cb (GObject *object, + GAsyncResult *result, + gpointer user_data) +{ + Tunnel *tunnel = user_data; + TunnelEnd *read_end, *write_end; + GError *error = NULL; + gssize nread; + + nread = g_input_stream_read_finish (G_INPUT_STREAM (object), result, &error); + if (nread <= 0) { + if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + g_error_free (error); + return; + } else if (error) { + g_print ("Tunnel read failed: %s\n", error->message); + g_error_free (error); + } + tunnel_close (tunnel); + return; + } + + if (object == (GObject *)tunnel->client.istream) { + read_end = &tunnel->client; + write_end = &tunnel->server; + } else { + read_end = &tunnel->server; + write_end = &tunnel->client; + } + + read_end->nread = nread; + write_end->nwrote = 0; + g_output_stream_write_async (write_end->ostream, + read_end->buffer, read_end->nread, + G_PRIORITY_DEFAULT, tunnel->cancellable, + tunnel_wrote_cb, tunnel); +} + +static void +start_tunnel (SoupMessage *msg, gpointer user_data) +{ + Tunnel *tunnel = user_data; + + tunnel->client.iostream = soup_client_context_steal_connection (tunnel->context); + tunnel->client.istream = g_io_stream_get_input_stream (tunnel->client.iostream); + tunnel->client.ostream = g_io_stream_get_output_stream (tunnel->client.iostream); + + tunnel->client.buffer = g_malloc (BUFSIZE); + tunnel->server.buffer = g_malloc (BUFSIZE); + + tunnel->cancellable = g_cancellable_new (); + + g_input_stream_read_async (tunnel->client.istream, + tunnel->client.buffer, BUFSIZE, + G_PRIORITY_DEFAULT, tunnel->cancellable, + tunnel_read_cb, tunnel); + g_input_stream_read_async (tunnel->server.istream, + tunnel->server.buffer, BUFSIZE, + G_PRIORITY_DEFAULT, tunnel->cancellable, + tunnel_read_cb, tunnel); +} + + +static void +tunnel_connected_cb (GObject *object, + GAsyncResult *result, + gpointer user_data) +{ + Tunnel *tunnel = user_data; + GError *error = NULL; + + tunnel->server.iostream = (GIOStream *) + g_socket_client_connect_to_host_finish (G_SOCKET_CLIENT (object), result, &error); + if (!tunnel->server.iostream) { + soup_message_set_status (tunnel->msg, SOUP_STATUS_BAD_GATEWAY); + soup_message_set_response (tunnel->msg, "text/plain", + SOUP_MEMORY_COPY, + error->message, strlen (error->message)); + g_error_free (error); + soup_server_unpause_message (tunnel->self, tunnel->msg); + tunnel_close (tunnel); + return; + } + + tunnel->server.istream = g_io_stream_get_input_stream (tunnel->server.iostream); + tunnel->server.ostream = g_io_stream_get_output_stream (tunnel->server.iostream); + + soup_message_set_status (tunnel->msg, SOUP_STATUS_OK); + soup_server_unpause_message (tunnel->self, tunnel->msg); + g_signal_connect (tunnel->msg, "finished", + G_CALLBACK (start_tunnel), tunnel); +} + +static void +try_tunnel (SoupServer *server, SoupMessage *msg, SoupClientContext *context) +{ + Tunnel *tunnel; + SoupURI *dest_uri; + GSocketClient *sclient; + + soup_server_pause_message (server, msg); + + tunnel = g_new0 (Tunnel, 1); + tunnel->self = g_object_ref (server); + tunnel->msg = g_object_ref (msg); + tunnel->context = context; + + dest_uri = soup_message_get_uri (msg); + sclient = g_socket_client_new (); + g_socket_client_connect_to_host_async (sclient, dest_uri->host, dest_uri->port, + NULL, tunnel_connected_cb, tunnel); + g_object_unref (sclient); +} + static void copy_header (const char *name, const char *value, gpointer dest_headers) { @@ -78,7 +289,7 @@ server_callback (SoupServer *server, SoupMessage *msg, soup_message_get_http_version (msg)); if (msg->method == SOUP_METHOD_CONNECT) { - soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED); + try_tunnel (server, msg, context); return; } diff --git a/libsoup/libsoup-2.4.sym b/libsoup/libsoup-2.4.sym index d1b83889..908c26b8 100644 --- a/libsoup/libsoup-2.4.sym +++ b/libsoup/libsoup-2.4.sym @@ -93,6 +93,7 @@ soup_client_context_get_local_address soup_client_context_get_remote_address soup_client_context_get_socket soup_client_context_get_type +soup_client_context_steal_connection soup_connection_state_get_type soup_content_decoder_get_type soup_content_sniffer_get_buffer_size diff --git a/libsoup/soup-server.c b/libsoup/soup-server.c index 13d6e5e5..dee1a791 100644 --- a/libsoup/soup-server.c +++ b/libsoup/soup-server.c @@ -99,6 +99,7 @@ struct SoupClientContext { GSocketAddress *local_addr; int ref_count; + gboolean stole_connection; }; typedef struct { @@ -1136,8 +1137,12 @@ request_finished (SoupMessage *msg, gboolean io_complete, gpointer user_data) 0, msg, client); soup_client_context_cleanup (client); - if (io_complete && soup_socket_is_connected (sock) && - soup_message_is_keepalive (msg)) { + + if (client->stole_connection) { + soup_client_context_unref (client); + } else if (io_complete && + soup_socket_is_connected (sock) && + soup_message_is_keepalive (msg)) { /* Start a new request */ start_request (server, client); } else { @@ -2176,6 +2181,48 @@ soup_client_context_get_auth_user (SoupClientContext *client) } /** + * soup_client_context_steal_connection: + * @client: a #SoupClientContext + * + * "Steals" the HTTP connection associated with @client from its + * #SoupServer. Note that this happens immediately, regardless of the + * current state of the connection; if the response to the current + * #SoupMessage has not yet finished being sent, then it will be + * discarded; you can steal the connection from a #SoupMessage or + * #SoupServer signal handler if you need to wait for part or all of + * the response to be sent. + * + * Return value: (transfer full): the #GIOStream connected to @client. No + * guarantees are made about what kind of #GIOStream this is. + * + * Since: 2.48 + **/ +GIOStream * +soup_client_context_steal_connection (SoupClientContext *client) +{ + GIOStream *stream; + + g_return_val_if_fail (client != NULL, NULL); + g_return_val_if_fail (client->stole_connection == FALSE, NULL); + + soup_client_context_ref (client); + + client->stole_connection = TRUE; + g_object_set (G_OBJECT (client->sock), + SOUP_SOCKET_CLOSE_ON_DISPOSE, FALSE, + NULL); + stream = g_object_ref (soup_socket_get_iostream (client->sock)); + + if (soup_message_io_in_progress (client->msg)) + soup_message_io_finished (client->msg); + socket_disconnected (client->sock, client); + + soup_client_context_unref (client); + + return stream; +} + +/** * SoupServerCallback: * @server: the #SoupServer * @msg: the message being processed @@ -2420,4 +2467,3 @@ soup_server_unpause_message (SoupServer *server, soup_message_io_unpause (msg); } - diff --git a/libsoup/soup-server.h b/libsoup/soup-server.h index 79f6002c..0ea3adf0 100644 --- a/libsoup/soup-server.h +++ b/libsoup/soup-server.h @@ -145,6 +145,8 @@ const char *soup_client_context_get_host (SoupClientContext *clien SoupAuthDomain *soup_client_context_get_auth_domain (SoupClientContext *client); const char *soup_client_context_get_auth_user (SoupClientContext *client); +SOUP_AVAILABLE_IN_2_48 +GIOStream *soup_client_context_steal_connection (SoupClientContext *client); /* Legacy API */ diff --git a/tests/server-test.c b/tests/server-test.c index 25b02c2f..9b1eec1f 100644 --- a/tests/server-test.c +++ b/tests/server-test.c @@ -590,6 +590,340 @@ do_fd_import_test (void) g_object_unref (gsock); } +typedef struct { + GIOStream *iostream; + GInputStream *istream; + GOutputStream *ostream; + + gssize nread, nwrote; + guchar *buffer; +} TunnelEnd; + +typedef struct { + SoupServer *self; + SoupMessage *msg; + SoupClientContext *context; + GCancellable *cancellable; + + TunnelEnd client, server; +} Tunnel; + +#define BUFSIZE 8192 + +static void tunnel_read_cb (GObject *object, + GAsyncResult *result, + gpointer user_data); + +static void +tunnel_close (Tunnel *tunnel) +{ + if (tunnel->cancellable) { + g_cancellable_cancel (tunnel->cancellable); + g_object_unref (tunnel->cancellable); + } + + if (tunnel->client.iostream) { + g_io_stream_close (tunnel->client.iostream, NULL, NULL); + g_object_unref (tunnel->client.iostream); + } + if (tunnel->server.iostream) { + g_io_stream_close (tunnel->server.iostream, NULL, NULL); + g_object_unref (tunnel->server.iostream); + } + + g_free (tunnel->client.buffer); + g_free (tunnel->server.buffer); + + g_object_unref (tunnel->self); + g_object_unref (tunnel->msg); + + g_free (tunnel); +} + +static void +tunnel_wrote_cb (GObject *object, + GAsyncResult *result, + gpointer user_data) +{ + Tunnel *tunnel = user_data; + TunnelEnd *write_end, *read_end; + GError *error = NULL; + gssize nwrote; + + nwrote = g_output_stream_write_finish (G_OUTPUT_STREAM (object), result, &error); + if (nwrote <= 0) { + if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + g_error_free (error); + return; + } else if (error) { + g_print ("Tunnel write failed: %s\n", error->message); + g_error_free (error); + } + tunnel_close (tunnel); + return; + } + + if (object == (GObject *)tunnel->client.ostream) { + write_end = &tunnel->client; + read_end = &tunnel->server; + } else { + write_end = &tunnel->server; + read_end = &tunnel->client; + } + + write_end->nwrote += nwrote; + if (write_end->nwrote < read_end->nread) { + g_output_stream_write_async (write_end->ostream, + read_end->buffer + write_end->nwrote, + read_end->nread - write_end->nwrote, + G_PRIORITY_DEFAULT, tunnel->cancellable, + tunnel_wrote_cb, tunnel); + } else { + g_input_stream_read_async (read_end->istream, + read_end->buffer, BUFSIZE, + G_PRIORITY_DEFAULT, tunnel->cancellable, + tunnel_read_cb, tunnel); + } +} + +static void +tunnel_read_cb (GObject *object, + GAsyncResult *result, + gpointer user_data) +{ + Tunnel *tunnel = user_data; + TunnelEnd *read_end, *write_end; + GError *error = NULL; + gssize nread; + + nread = g_input_stream_read_finish (G_INPUT_STREAM (object), result, &error); + if (nread <= 0) { + if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + g_error_free (error); + return; + } else if (error) { + g_print ("Tunnel read failed: %s\n", error->message); + g_error_free (error); + } + tunnel_close (tunnel); + return; + } + + if (object == (GObject *)tunnel->client.istream) { + read_end = &tunnel->client; + write_end = &tunnel->server; + } else { + read_end = &tunnel->server; + write_end = &tunnel->client; + } + + read_end->nread = nread; + write_end->nwrote = 0; + g_output_stream_write_async (write_end->ostream, + read_end->buffer, read_end->nread, + G_PRIORITY_DEFAULT, tunnel->cancellable, + tunnel_wrote_cb, tunnel); +} + +static void +start_tunnel (SoupMessage *msg, gpointer user_data) +{ + Tunnel *tunnel = user_data; + + tunnel->client.iostream = soup_client_context_steal_connection (tunnel->context); + tunnel->client.istream = g_io_stream_get_input_stream (tunnel->client.iostream); + tunnel->client.ostream = g_io_stream_get_output_stream (tunnel->client.iostream); + + tunnel->client.buffer = g_malloc (BUFSIZE); + tunnel->server.buffer = g_malloc (BUFSIZE); + + tunnel->cancellable = g_cancellable_new (); + + g_input_stream_read_async (tunnel->client.istream, + tunnel->client.buffer, BUFSIZE, + G_PRIORITY_DEFAULT, tunnel->cancellable, + tunnel_read_cb, tunnel); + g_input_stream_read_async (tunnel->server.istream, + tunnel->server.buffer, BUFSIZE, + G_PRIORITY_DEFAULT, tunnel->cancellable, + tunnel_read_cb, tunnel); +} + + +static void +tunnel_connected_cb (GObject *object, + GAsyncResult *result, + gpointer user_data) +{ + Tunnel *tunnel = user_data; + GError *error = NULL; + + tunnel->server.iostream = (GIOStream *) + g_socket_client_connect_to_host_finish (G_SOCKET_CLIENT (object), result, &error); + if (!tunnel->server.iostream) { + soup_message_set_status (tunnel->msg, SOUP_STATUS_BAD_GATEWAY); + soup_message_set_response (tunnel->msg, "text/plain", + SOUP_MEMORY_COPY, + error->message, strlen (error->message)); + g_error_free (error); + soup_server_unpause_message (tunnel->self, tunnel->msg); + tunnel_close (tunnel); + return; + } + + tunnel->server.istream = g_io_stream_get_input_stream (tunnel->server.iostream); + tunnel->server.ostream = g_io_stream_get_output_stream (tunnel->server.iostream); + + soup_message_set_status (tunnel->msg, SOUP_STATUS_OK); + soup_server_unpause_message (tunnel->self, tunnel->msg); + g_signal_connect (tunnel->msg, "finished", + G_CALLBACK (start_tunnel), tunnel); +} + +static void +proxy_server_callback (SoupServer *server, SoupMessage *msg, + const char *path, GHashTable *query, + SoupClientContext *context, gpointer data) +{ + GSocketClient *sclient; + SoupURI *dest_uri; + Tunnel *tunnel; + + if (msg->method != SOUP_METHOD_CONNECT) { + soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED); + return; + } + + soup_server_pause_message (server, msg); + + tunnel = g_new0 (Tunnel, 1); + tunnel->self = g_object_ref (server); + tunnel->msg = g_object_ref (msg); + tunnel->context = context; + + dest_uri = soup_message_get_uri (msg); + sclient = g_socket_client_new (); + g_socket_client_connect_to_host_async (sclient, dest_uri->host, dest_uri->port, + NULL, tunnel_connected_cb, tunnel); + g_object_unref (sclient); +} + +static void +do_steal_connect_test (void) +{ + SoupServer *proxy; + SoupURI *proxy_uri; + SoupSession *session; + SoupMessage *msg; + const char *handled_by; + + proxy = soup_test_server_new (SOUP_TEST_SERVER_IN_THREAD); + proxy_uri = soup_test_server_get_uri (proxy, SOUP_URI_SCHEME_HTTP, "127.0.0.1"); + soup_server_add_handler (proxy, NULL, proxy_server_callback, NULL, NULL); + + session = soup_test_session_new (SOUP_TYPE_SESSION, + SOUP_SESSION_PROXY_URI, proxy_uri, + NULL); + msg = soup_message_new_from_uri ("GET", ssl_base_uri); + soup_session_send_message (session, msg); + + soup_test_assert_message_status (msg, SOUP_STATUS_OK); + handled_by = soup_message_headers_get_one (msg->response_headers, "X-Handled-By"); + g_assert_cmpstr (handled_by, ==, "server_callback"); + + g_object_unref (msg); + soup_test_session_abort_unref (session); + soup_test_server_quit_unref (proxy); + soup_uri_free (proxy_uri); +} + +#define UPGRADE_RESPONSE "HTTP/1.1 306 huh?\r\nX-Handled-By: non_http\r\n\r\n" + +static void +steal_after_upgrade (SoupMessage *msg, gpointer user_data) +{ + SoupClientContext *context = user_data; + GIOStream *stream; + GOutputStream *ostream; + GError *error = NULL; + + /* This should not ever be seen. */ + soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); + + stream = soup_client_context_steal_connection (context); + ostream = g_io_stream_get_output_stream (stream); + + /* SoupSession can't currently deal with a non-HTTP response, so + * we can't actually switch protocols. + */ + g_output_stream_write_all (ostream, UPGRADE_RESPONSE, strlen (UPGRADE_RESPONSE), + NULL, NULL, &error); + g_assert_no_error (error); + + g_io_stream_close (stream, NULL, &error); + g_assert_no_error (error); + g_object_unref (stream); +} + +static void +upgrade_server_callback (SoupServer *server, SoupMessage *msg, + const char *path, GHashTable *query, + SoupClientContext *context, gpointer data) +{ + if (msg->method != SOUP_METHOD_GET) { + soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED); + return; + } + + soup_message_set_status (msg, SOUP_STATUS_SWITCHING_PROTOCOLS); + soup_message_headers_append (msg->response_headers, + "X-Handled-By", "upgrade_server_callback"); + + g_signal_connect (msg, "wrote-informational", + G_CALLBACK (steal_after_upgrade), context); +} + +static void +got_informational (SoupMessage *msg, gpointer user_data) +{ + const char *handled_by; + + soup_test_assert_message_status (msg, SOUP_STATUS_SWITCHING_PROTOCOLS); + handled_by = soup_message_headers_get_one (msg->response_headers, "X-Handled-By"); + g_assert_cmpstr (handled_by, ==, "upgrade_server_callback"); +} + +static void +do_steal_upgrade_test (void) +{ + SoupServer *upserver; + SoupURI *uri; + SoupSession *session; + SoupMessage *msg; + const char *handled_by; + + upserver = soup_test_server_new (SOUP_TEST_SERVER_IN_THREAD); + uri = soup_test_server_get_uri (upserver, SOUP_URI_SCHEME_HTTP, "127.0.0.1"); + soup_server_add_handler (upserver, NULL, upgrade_server_callback, NULL, NULL); + + session = soup_test_session_new (SOUP_TYPE_SESSION, NULL); + msg = soup_message_new_from_uri ("GET", uri); + + g_signal_connect (msg, "got-informational", + G_CALLBACK (got_informational), NULL); + + soup_session_send_message (session, msg); + + soup_test_assert_message_status (msg, SOUP_STATUS_NOT_APPEARING_IN_THIS_PROTOCOL); + handled_by = soup_message_headers_get_one (msg->response_headers, "X-Handled-By"); + g_assert_cmpstr (handled_by, ==, "non_http"); + + g_object_unref (msg); + soup_test_session_abort_unref (session); + soup_test_server_quit_unref (upserver); + soup_uri_free (uri); +} + int main (int argc, char **argv) { @@ -623,6 +957,8 @@ main (int argc, char **argv) g_test_add_func ("/server/multi/family", do_multi_family_test); g_test_add_func ("/server/import/gsocket", do_gsocket_import_test); g_test_add_func ("/server/import/fd", do_fd_import_test); + g_test_add_func ("/server/steal/CONNECT", do_steal_connect_test); + g_test_add_func ("/server/steal/Upgrade", do_steal_upgrade_test); ret = g_test_run (); |