diff options
author | Dan Winship <danw@gnome.org> | 2014-12-09 12:19:40 +0100 |
---|---|---|
committer | Dan Winship <danw@gnome.org> | 2014-12-10 17:33:57 +0100 |
commit | 35a384ebc1ad816941bccc8d4b38cf6ddba5d400 (patch) | |
tree | 0d48e29491b7cda1cde69cfea89afe50f1b55df7 | |
parent | d12416cdc816d579a3eb3b6a57199848b7c0ec48 (diff) | |
download | libsoup-wip/http2-b.tar.gz |
wipwip/http2-b
-rw-r--r-- | libsoup/soup-http2-channel.c | 724 | ||||
-rw-r--r-- | libsoup/soup-http2-channel.h | 35 | ||||
-rw-r--r-- | libsoup/soup-http2-connection.c | 248 | ||||
-rw-r--r-- | libsoup/soup-http2-input-stream.c | 392 | ||||
-rw-r--r-- | libsoup/soup-http2-input-stream.h | 47 | ||||
-rw-r--r-- | libsoup/soup-http2-output-stream.c | 433 | ||||
-rw-r--r-- | libsoup/soup-http2-output-stream.h | 45 |
7 files changed, 1924 insertions, 0 deletions
diff --git a/libsoup/soup-http2-channel.c b/libsoup/soup-http2-channel.c new file mode 100644 index 00000000..05f30c4e --- /dev/null +++ b/libsoup/soup-http2-channel.c @@ -0,0 +1,724 @@ +/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */ +/* + * soup-http2-channel.c + * + * Copyright 2014 Red Hat, Inc. + */ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif + +#include <string.h> + +#include <glib/gi18n-lib.h> + +#include "soup-http2-channel.h" +#include "soup.h" +#include "soup-body-input-stream.h" +#include "soup-body-output-stream.h" +#include "soup-filter-input-stream.h" +#include "soup-socket-private.h" + +G_DEFINE_TYPE (SoupHTTP2Channel, soup_http2_channel, SOUP_TYPE_HTTP_CHANNEL) + +typedef struct { + SoupSocket *server_sock; + + SoupFilterInputStream *istream; + GPollableInputStream *poll_istream; + GOutputStream *ostream; + GPollableOutputStream *poll_ostream; + + GString *input_headers, *output_headers; + gboolean headers_read; + gsize headers_nwritten; + + SoupEncoding input_encoding, output_encoding; + goffset input_length, output_length; + +} SoupHTTP2ChannelPrivate; +#define SOUP_HTTP2_CHANNEL_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), SOUP_TYPE_HTTP2_CHANNEL, SoupHTTP2ChannelPrivate)) + +static void +soup_http2_channel_init (SoupHTTP2Channel *channel) +{ + SoupHTTP2ChannelPrivate *priv = SOUP_HTTP2_CHANNEL_GET_PRIVATE (channel); + + priv->input_headers = g_string_new (NULL); + priv->output_headers = g_string_new (NULL); +} + +#define READ_BUFFER_SIZE 8192 + +static gboolean +read_from_channel (SoupHTTPChannel *channel, + gboolean blocking, + GCancellable *cancellable, + GError **error) +{ + SoupHTTP2ChannelPrivate *priv = SOUP_HTTP2_CHANNEL_GET_PRIVATE (channel); + gssize nread, old_len; + gboolean got_lf; + char buf[READ_BUFFER_SIZE]; + + if (priv->headers_read) { + /* restart */ + // FIXME + priv->headers_read = FALSE; + } + + nread = g_pollable_stream_read (priv->istream, buf, sizeof (buf), + blocking, cancellable, error); + if (nread < 0) + return FALSE; + else if (nread == 0) { + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PARTIAL_INPUT, + _("Connection terminated unexpectedly")); + return FALSE; + } + + while (total < nread) + readlen = nghttp2_session_mem_recv (priv->session, buf, nread); +} + +static gboolean +read_headers (SoupHTTPChannel *channel, + gboolean blocking, + GCancellable *cancellable, + GError **error) +{ + SoupHTTP2ChannelPrivate *priv = SOUP_HTTP2_CHANNEL_GET_PRIVATE (channel); + gssize nread, old_len; + gboolean got_lf; + + if (priv->headers_read) { + /* restart */ + g_string_truncate (priv->input_headers, 0); + priv->headers_read = FALSE; + } + + while (1) { + old_len = priv->input_headers->len; + g_string_set_size (priv->input_headers, old_len + READ_BUFFER_SIZE); + nread = soup_filter_input_stream_read_line (priv->istream, + priv->input_headers->str + old_len, + READ_BUFFER_SIZE, + blocking, + &got_lf, + cancellable, error); + priv->input_headers->len = old_len + MAX (nread, 0); + + if (nread < 0) + return FALSE; + else if (nread == 0) { + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PARTIAL_INPUT, + _("Connection terminated unexpectedly")); + return FALSE; + } + + if (got_lf) { + if (nread == 1 && old_len >= 2 && + !strncmp (priv->input_headers->str + + priv->input_headers->len - 2, + "\n\n", 2)) + break; + else if (nread == 2 && old_len >= 3 && + !strncmp (priv->input_headers->str + + priv->input_headers->len - 3, + "\n\r\n", 3)) + break; + } + } + + /* We need to "rewind" priv->input_headers back one line. + * That SHOULD be two characters (CR LF), but if the + * web server was stupid, it might only be one. + */ + if (priv->input_headers->len < 3 || + priv->input_headers->str[priv->input_headers->len - 2] == '\n') + priv->input_headers->len--; + else + priv->input_headers->len -= 2; + priv->input_headers->str[priv->input_headers->len] = '\0'; + + priv->headers_read = TRUE; + return TRUE; +} + +static gboolean +soup_http2_channel_read_request_headers (SoupHTTPChannel *channel, + gboolean blocking, + GCancellable *cancellable, + GError **error) +{ + SoupHTTP2ChannelPrivate *priv = SOUP_HTTP2_CHANNEL_GET_PRIVATE (channel); + SoupMessage *msg = soup_http_channel_get_message (channel); + guint status; + char *req_method, *req_path, *uri_string; + const char *req_host; + SoupHTTPVersion version; + SoupURI *uri; + + if (!read_headers (channel, blocking, cancellable, error)) + return FALSE; + + status = soup_headers_parse_request (priv->input_headers->str, + priv->input_headers->len, + msg->request_headers, + &req_method, &req_path, &version); + + if (status != SOUP_STATUS_OK) { + failed: + g_set_error_literal (error, SOUP_HTTP_ERROR, status, + _("Could not parse HTTP request")); + return FALSE; + } + + g_object_set (msg, + SOUP_MESSAGE_METHOD, req_method, + SOUP_MESSAGE_HTTP_VERSION, version, + NULL); + g_free (req_method); + + /* Handle request body encoding */ + priv->input_encoding = soup_message_headers_get_encoding (msg->request_headers); + if (priv->input_encoding == SOUP_ENCODING_UNRECOGNIZED) { + g_free (req_path); + if (soup_message_headers_get_list (msg->request_headers, "Transfer-Encoding")) + status = SOUP_STATUS_NOT_IMPLEMENTED; + else + status = SOUP_STATUS_BAD_REQUEST; + goto failed; + } + if (priv->input_encoding == SOUP_ENCODING_CONTENT_LENGTH) + priv->input_length = soup_message_headers_get_content_length (msg->request_headers); + else + priv->input_length = -1; + + /* Generate correct context for request */ + req_host = soup_message_headers_get_one (msg->request_headers, "Host"); + if (req_host && strchr (req_host, '/')) { + g_free (req_path); + status = SOUP_STATUS_BAD_REQUEST; + goto failed; + } + + if (!strcmp (req_path, "*") && req_host) { + /* Eg, "OPTIONS * HTTP/1.1" */ + uri_string = g_strdup_printf ("%s://%s", + soup_socket_is_ssl (priv->server_sock) ? "https" : "http", + req_host); + uri = soup_uri_new (uri_string); + if (uri) + soup_uri_set_path (uri, "*"); + g_free (uri_string); + } else if (*req_path != '/') { + /* Must be an absolute URI */ + uri = soup_uri_new (req_path); + } else if (req_host) { + uri_string = g_strdup_printf ("%s://%s%s", + soup_socket_is_ssl (priv->server_sock) ? "https" : "http", + req_host, req_path); + uri = soup_uri_new (uri_string); + g_free (uri_string); + } else if (version == SOUP_HTTP_1_0) { + /* No Host header, no AbsoluteUri */ + SoupAddress *addr = soup_socket_get_local_address (priv->server_sock); + + uri = soup_uri_new (NULL); + soup_uri_set_scheme (uri, soup_socket_is_ssl (priv->server_sock) ? "https" : "http"); + soup_uri_set_host (uri, soup_address_get_physical (addr)); + soup_uri_set_port (uri, soup_address_get_port (addr)); + soup_uri_set_path (uri, req_path); + } else + uri = NULL; + + g_free (req_path); + + if (!uri || !uri->host) { + if (uri) + soup_uri_free (uri); + status = SOUP_STATUS_BAD_REQUEST; + goto failed; + } + + g_object_set (msg, + SOUP_MESSAGE_URI, uri, + NULL); + soup_uri_free (uri); + + return TRUE; +} + +static gboolean +soup_http2_channel_read_response_headers (SoupHTTPChannel *channel, + gboolean blocking, + GCancellable *cancellable, + GError **error) +{ + SoupHTTP2ChannelPrivate *priv = SOUP_HTTP2_CHANNEL_GET_PRIVATE (channel); + SoupMessage *msg = soup_http_channel_get_message (channel); + SoupHTTPVersion version; + guint status_code; + char *reason_phrase; + gboolean ok; + + if (!read_headers (channel, blocking, cancellable, error)) + return FALSE; + + ok = soup_headers_parse_response (priv->input_headers->str, + priv->input_headers->len, + msg->response_headers, + &version, &status_code, &reason_phrase); + + if (!ok) { + g_set_error_literal (error, SOUP_HTTP_ERROR, + SOUP_STATUS_MALFORMED, + _("Could not parse HTTP response")); + return FALSE; + } + + g_object_set (msg, + SOUP_MESSAGE_STATUS_CODE, status_code, + SOUP_MESSAGE_REASON_PHRASE, reason_phrase, + SOUP_MESSAGE_HTTP_VERSION, MIN (version, soup_message_get_http_version (msg)), + NULL); + + if (msg->method == SOUP_METHOD_HEAD || + status_code == SOUP_STATUS_NO_CONTENT || + status_code == SOUP_STATUS_NOT_MODIFIED || + SOUP_STATUS_IS_INFORMATIONAL (status_code) || + (msg->method == SOUP_METHOD_CONNECT && + SOUP_STATUS_IS_SUCCESSFUL (status_code))) + priv->input_encoding = SOUP_ENCODING_NONE; + else { + priv->input_encoding = soup_message_headers_get_encoding (msg->response_headers); + + if (priv->input_encoding == SOUP_ENCODING_UNRECOGNIZED) { + g_set_error_literal (error, SOUP_HTTP_ERROR, + SOUP_STATUS_NOT_IMPLEMENTED, + _("Unrecognized HTTP encoding")); + return FALSE; + } + } + + if (priv->input_encoding == SOUP_ENCODING_CONTENT_LENGTH) { + const char *conn; + + priv->input_length = soup_message_headers_get_content_length (msg->response_headers); + + /* Some servers suck and send incorrect Content-Length + * values, so if the message isn't keepalive anyway, allow + * EOF termination. + */ + conn = soup_message_headers_get_one (msg->response_headers, "Connection"); + if (version == SOUP_HTTP_1_0 && + (!conn || !soup_header_contains (conn, "Keep-Alive"))) + priv->input_encoding = SOUP_ENCODING_EOF; + else if (version == SOUP_HTTP_1_1 && conn && + soup_header_contains (conn, "close")) + priv->input_encoding = SOUP_ENCODING_EOF; + } else + priv->input_length = -1; + + return TRUE; +} + +static gboolean +write_headers (SoupHTTPChannel *channel, + gboolean blocking, + GCancellable *cancellable, + GError **error) +{ + SoupHTTP2ChannelPrivate *priv = SOUP_HTTP2_CHANNEL_GET_PRIVATE (channel); + gssize nwrote; + + while (priv->headers_nwritten < priv->output_headers->len) { + nwrote = g_pollable_stream_write (priv->ostream, + priv->output_headers->str + priv->headers_nwritten, + priv->output_headers->len - priv->headers_nwritten, + blocking, cancellable, error); + if (nwrote == -1) + return FALSE; + priv->headers_nwritten += nwrote; + } + return TRUE; +} + +static void +finish_build_headers (SoupHTTPChannel *channel, + SoupMessageHeaders *headers) +{ + SoupHTTP2ChannelPrivate *priv = SOUP_HTTP2_CHANNEL_GET_PRIVATE (channel); + SoupMessageHeadersIter iter; + const char *name, *value; + + if (priv->output_encoding == SOUP_ENCODING_CONTENT_LENGTH) + priv->output_length = soup_message_headers_get_content_length (headers); + + soup_message_headers_iter_init (&iter, headers); + while (soup_message_headers_iter_next (&iter, &name, &value)) + g_string_append_printf (priv->output_headers, "%s: %s\r\n", name, value); + g_string_append (priv->output_headers, "\r\n"); +} + +static void +build_request_headers (SoupHTTPChannel *channel) +{ + SoupHTTP2ChannelPrivate *priv = SOUP_HTTP2_CHANNEL_GET_PRIVATE (channel); + SoupMessage *msg = soup_http_channel_get_message (channel); + SoupURI *request_uri = soup_message_get_uri (msg); + char *uri_host; + char *uri_string; + + g_string_truncate (priv->output_headers, 0); + priv->headers_nwritten = 0; + + if (strchr (request_uri->host, ':')) + uri_host = g_strdup_printf ("[%.*s]", (int) strcspn (request_uri->host, "%"), request_uri->host); + else if (g_hostname_is_non_ascii (request_uri->host)) + uri_host = g_hostname_to_ascii (request_uri->host); + else + uri_host = request_uri->host; + + if (msg->method == SOUP_METHOD_CONNECT) { + /* CONNECT URI is hostname:port for tunnel destination */ + uri_string = g_strdup_printf ("%s:%d", uri_host, request_uri->port); + } else { + /* Proxy expects full URI to destination. Otherwise + * just the path. + */ + if (soup_connection_is_via_proxy (soup_message_get_connection (msg))) { + uri_string = soup_uri_to_string (request_uri, FALSE); + if (request_uri->fragment) { + /* Strip fragment */ + char *fragment = strchr (uri_string, '#'); + if (fragment) + *fragment = '\0'; + } + } else + uri_string = soup_uri_to_string (request_uri, TRUE); + } + + g_string_append_printf (priv->output_headers, "%s %s HTTP/1.%d\r\n", + msg->method, uri_string, + (soup_message_get_http_version (msg) == SOUP_HTTP_1_0) ? 0 : 1); + + if (!soup_message_headers_get_one (msg->request_headers, "Host")) { + if (soup_uri_uses_default_port (request_uri)) { + g_string_append_printf (priv->output_headers, "Host: %s\r\n", + uri_host); + } else { + g_string_append_printf (priv->output_headers, "Host: %s:%d\r\n", + uri_host, request_uri->port); + } + } + g_free (uri_string); + if (uri_host != request_uri->host) + g_free (uri_host); + + priv->output_encoding = soup_message_headers_get_encoding (msg->request_headers); + finish_build_headers (channel, msg->request_headers); +} + +static void +build_response_headers (SoupHTTPChannel *channel) +{ + SoupHTTP2ChannelPrivate *priv = SOUP_HTTP2_CHANNEL_GET_PRIVATE (channel); + SoupMessage *msg = soup_http_channel_get_message (channel); + SoupEncoding claimed_encoding; + + g_string_truncate (priv->output_headers, 0); + priv->headers_nwritten = 0; + + g_string_append_printf (priv->output_headers, "HTTP/1.%c %d %s\r\n", + (soup_message_get_http_version (msg) == SOUP_HTTP_1_0) ? '0' : '1', + msg->status_code, msg->reason_phrase); + + claimed_encoding = soup_message_headers_get_encoding (msg->response_headers); + if ((msg->method == SOUP_METHOD_HEAD || + msg->status_code == SOUP_STATUS_NO_CONTENT || + msg->status_code == SOUP_STATUS_NOT_MODIFIED || + SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) || + (msg->method == SOUP_METHOD_CONNECT && + SOUP_STATUS_IS_SUCCESSFUL (msg->status_code))) + priv->output_encoding = SOUP_ENCODING_NONE; + else + priv->output_encoding = claimed_encoding; + + finish_build_headers (channel, msg->response_headers); +} + +static gboolean +soup_http2_channel_write_request_headers (SoupHTTPChannel *channel, + gboolean blocking, + GCancellable *cancellable, + GError **error) +{ + SoupHTTP2ChannelPrivate *priv = SOUP_HTTP2_CHANNEL_GET_PRIVATE (channel); + + if (priv->headers_nwritten == priv->output_headers->len) + build_request_headers (channel); + return write_headers (channel, blocking, cancellable, error); +} + +static gboolean +soup_http2_channel_write_response_headers (SoupHTTPChannel *channel, + gboolean blocking, + GCancellable *cancellable, + GError **error) +{ + SoupHTTP2ChannelPrivate *priv = SOUP_HTTP2_CHANNEL_GET_PRIVATE (channel); + + if (priv->headers_nwritten == priv->output_headers->len) + build_response_headers (channel); + return write_headers (channel, blocking, cancellable, error); +} + +static void +istream_close (SoupHTTP2InputStream *h2i, gpointer channel) +{ + SoupHTTP2ChannelPrivate *priv = SOUP_HTTP2_CHANNEL_GET_PRIVATE (channel); + + soup_http2_connection_close_input (priv->connection, priv->stream_id); +} + +static GInputStream * +soup_http2_channel_get_body_input_stream (SoupHTTPChannel *channel) +{ + SoupHTTP2ChannelPrivate *priv = SOUP_HTTP2_CHANNEL_GET_PRIVATE (channel); + + g_return_val_if_fail (priv->headers_read, NULL); + + if (!priv->istream) { + priv->istream = soup_http2_input_stream_new (channel, priv->stream_id); + g_signal_connect (priv->istream, "close", + G_CALLBACK (istream_close), channel); + } + + return g_object_ref (priv->istream); +} + +static void +ostream_close (SoupHTTP2OutputStream *h2o, gpointer channel) +{ + SoupHTTP2ChannelPrivate *priv = SOUP_HTTP2_CHANNEL_GET_PRIVATE (channel); + + soup_http2_connection_close_output (priv->connection, priv->stream_id); +} + +static void +ostream_write (SoupHTTP2OutputStream *h2o, + gconstpointer buffer, gulong length, + gpointer channel) +{ + SoupHTTP2ChannelPrivate *priv = SOUP_HTTP2_CHANNEL_GET_PRIVATE (channel); + + soup_http2_connection_write_body (priv->connection, priv->stream_id, + buffer, length); +} + +static GOutputStream * +soup_http2_channel_get_body_output_stream (SoupHTTPChannel *channel) +{ + SoupHTTP2ChannelPrivate *priv = SOUP_HTTP2_CHANNEL_GET_PRIVATE (channel); + + g_return_val_if_fail (priv->headers_nwritten > 0, NULL); + + if (!priv->ostream) { + priv->ostream = soup_http2_output_stream_new (channel, priv->stream_id); + g_signal_connect (priv->ostream, "close", + G_CALLBACK (ostream_close), channel); + g_signal_connect (priv->ostream, "write", + G_CALLBACK (ostream_write), channel); + } + + return g_object_ref (priv->ostream); +} + +static GSource * +soup_http2_channel_create_source (SoupHTTPChannel *channel, + GIOCondition cond, + GCancellable *cancellable) +{ + SoupHTTP2ChannelPrivate *priv = SOUP_HTTP2_CHANNEL_GET_PRIVATE (channel); + + if (cond == G_IO_IN) + return g_pollable_input_stream_create_source (priv->poll_istream, cancellable); + else if (cond == G_IO_OUT) + return g_pollable_output_stream_create_source (priv->poll_ostream, cancellable); + else + g_assert_not_reached (); +} + +static void +soup_http2_channel_dispose (GObject *object) +{ + SoupHTTP2ChannelPrivate *priv = SOUP_HTTP2_CHANNEL_GET_PRIVATE (object); + + g_clear_object (&priv->server_sock); + g_clear_object (&priv->istream); + g_clear_object (&priv->poll_istream); + g_clear_object (&priv->ostream); + g_clear_object (&priv->poll_ostream); + + G_OBJECT_CLASS (soup_http2_channel_parent_class)->dispose (object); +} + +static void +soup_http2_channel_finalize (GObject *object) +{ + SoupHTTP2ChannelPrivate *priv = SOUP_HTTP2_CHANNEL_GET_PRIVATE (object); + + g_string_free (priv->input_headers, TRUE); + g_string_free (priv->output_headers, TRUE); + + G_OBJECT_CLASS (soup_http2_channel_parent_class)->finalize (object); +} + +static void +soup_http2_channel_class_init (SoupHTTP2ChannelClass *http2_channel_class) +{ + GObjectClass *object_class = G_OBJECT_CLASS (http2_channel_class); + SoupHTTPChannelClass *channel_class = SOUP_HTTP_CHANNEL_CLASS (http2_channel_class); + + g_type_class_add_private (http2_channel_class, sizeof (SoupHTTP2ChannelPrivate)); + + object_class->dispose = soup_http2_channel_dispose; + object_class->finalize = soup_http2_channel_finalize; + + channel_class->read_request_headers = soup_http2_channel_read_request_headers; + channel_class->read_response_headers = soup_http2_channel_read_response_headers; + channel_class->write_request_headers = soup_http2_channel_write_request_headers; + channel_class->write_response_headers = soup_http2_channel_write_response_headers; + channel_class->get_body_input_stream = soup_http2_channel_get_body_input_stream; + channel_class->get_body_output_stream = soup_http2_channel_get_body_output_stream; + channel_class->create_source = soup_http2_channel_create_source; +} + +SoupHTTPChannel * +soup_http2_channel_new_client (SoupMessage *msg) +{ + SoupHTTPChannel *channel; + SoupHTTP2ChannelPrivate *priv; + GIOStream *iostream; + + channel = g_object_new (SOUP_TYPE_HTTP2_CHANNEL, + SOUP_HTTP_CHANNEL_MESSAGE, msg, + SOUP_HTTP_CHANNEL_MODE, SOUP_HTTP_CHANNEL_CLIENT, + NULL); + priv = SOUP_HTTP2_CHANNEL_GET_PRIVATE (channel); + + iostream = soup_socket_get_iostream (soup_connection_get_socket (soup_message_get_connection (msg))); + priv->istream = g_object_ref (g_io_stream_get_input_stream (iostream)); + priv->poll_istream = g_object_ref (priv->istream); + priv->ostream = g_object_ref (g_io_stream_get_output_stream (iostream)); + priv->poll_ostream = g_object_ref (priv->ostream); + + return channel; +} + +SoupHTTPChannel * +soup_http2_channel_new_server (SoupMessage *msg, + SoupSocket *sock) +{ + SoupHTTPChannel *channel; + SoupHTTP2ChannelPrivate *priv; + GIOStream *iostream; + + channel = g_object_new (SOUP_TYPE_HTTP2_CHANNEL, + SOUP_HTTP_CHANNEL_MESSAGE, msg, + SOUP_HTTP_CHANNEL_MODE, SOUP_HTTP_CHANNEL_SERVER, + NULL); + priv = SOUP_HTTP2_CHANNEL_GET_PRIVATE (channel); + + priv->server_sock = g_object_ref (sock); + + iostream = soup_socket_get_iostream (sock); + priv->istream = g_object_ref (g_io_stream_get_input_stream (iostream)); + priv->poll_istream = g_object_ref (priv->istream); + priv->ostream = g_object_ref (g_io_stream_get_output_stream (iostream)); + priv->poll_ostream = g_object_ref (priv->ostream); + + return channel; +} + +void +soup_http2_channel_push_header (SoupHTTP2Channel *channel, + const char *name_raw, gsize name_len, + const char *value_raw, gsize value_len) +{ + SoupHTTP2ChannelPrivate *priv = SOUP_HTTP2_CHANNEL_GET_PRIVATE (object); + char *name, *value; + + value = g_strndup (value_raw, valuelen); + + if (*name_raw == ':') { + name_raw++; + name_len--; + + if (FIXME_client_side) { + if (!strncmp (name_raw, "status", name_len)) { + priv->status = atoi (value); + g_free (value); + } else + FIXME invalid; + } else { + if (!strncmp (name_raw, "method", name_len)) + priv->method = value; + else if (!strncmp (name_raw, "scheme", name_len)) + priv->scheme = value; + else if (!strncmp (name_raw, "authority", name_len)) + priv->authority = value; + else if (!strncmp (name_raw, "path", name_len)) + priv->path = value; + else + FIXME invalid; + } + return; + } + + name = g_strndup (name_raw, namelen); + + soup_message_headers_append (priv->input_headers, name, value); + g_free (name); + g_free (value); +} + +void +soup_http2_channel_get_headers_complete (SoupHTTP2Channel *channel) +{ + SoupHTTP2ChannelPrivate *priv = SOUP_HTTP2_CHANNEL_GET_PRIVATE (object); + + return priv->headers_complete; +} + +void +soup_http2_channel_set_headers_complete (SoupHTTP2Channel *channel) +{ + SoupHTTP2ChannelPrivate *priv = SOUP_HTTP2_CHANNEL_GET_PRIVATE (object); + + priv->headers_complete = TRUE; + g_async_queue_push (priv->headers_queue, GINT_TO_POINTER (1)); +} + +void +soup_http2_channel_push_data (SoupHTTP2Channel *channel, + const guchar *data, + gsize len) +{ + SoupHTTP2ChannelPrivate *priv = SOUP_HTTP2_CHANNEL_GET_PRIVATE (object); + + soup_http2_input_stream_push_data (priv->istream, data, len); +} + +void +soup_http2_channel_closed (SoupHTTP2Channel *channel, + guint32 error_code) +{ + SoupHTTP2ChannelPrivate *priv = SOUP_HTTP2_CHANNEL_GET_PRIVATE (object); + + /* FIXME: error_code */ + soup_http2_input_stream_push_eof (priv->istream); +} diff --git a/libsoup/soup-http2-channel.h b/libsoup/soup-http2-channel.h new file mode 100644 index 00000000..e56d9a58 --- /dev/null +++ b/libsoup/soup-http2-channel.h @@ -0,0 +1,35 @@ +/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */ +/* + * Copyright 2014 Red Hat, Inc. + */ + +#ifndef SOUP_HTTP2_CHANNEL_H +#define SOUP_HTTP2_CHANNEL_H 1 + +#include "soup-http-channel.h" +#include "soup-connection.h" + +#define SOUP_TYPE_HTTP2_CHANNEL (soup_http2_channel_get_type ()) +#define SOUP_HTTP2_CHANNEL(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), SOUP_TYPE_HTTP2_CHANNEL, SoupHTTP2Channel)) +#define SOUP_HTTP2_CHANNEL_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), SOUP_TYPE_HTTP2_CHANNEL, SoupHTTP2ChannelClass)) +#define SOUP_IS_HTTP2_CHANNEL(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), SOUP_TYPE_HTTP2_CHANNEL)) +#define SOUP_IS_HTTP2_CHANNEL_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((obj), SOUP_TYPE_HTTP2_CHANNEL)) +#define SOUP_HTTP2_CHANNEL_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), SOUP_TYPE_HTTP2_CHANNEL, SoupHTTP2ChannelClass)) + +typedef struct { + SoupHTTPChannel parent; + +} SoupHTTP2Channel; + +typedef struct { + SoupHTTPChannelClass parent_class; + +} SoupHTTP2ChannelClass; + +GType soup_http2_channel_get_type (void); + +SoupHTTPChannel *soup_http2_channel_new_client (SoupMessage *msg); +SoupHTTPChannel *soup_http2_channel_new_server (SoupMessage *msg, + SoupSocket *socket); + +#endif /* SOUP_HTTP2_CHANNEL_H */ diff --git a/libsoup/soup-http2-connection.c b/libsoup/soup-http2-connection.c new file mode 100644 index 00000000..8935b62c --- /dev/null +++ b/libsoup/soup-http2-connection.c @@ -0,0 +1,248 @@ +/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */ +/* + * soup-http2-connection.c + * + * Copyright 2014 Red Hat, Inc. + */ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif + +#include <nghttp2/nghttp2.h> + +#include "soup-http2-connection.h" + +G_DEFINE_TYPE (SoupHTTP2Connection, soup_http2_connection, G_TYPE_OBJECT) + +typedef struct { + nghttp2_session *session; + + GInputStream *istream; + GPollableInputStream *poll_istream; + GOutputStream *ostream; + GPollableOutputStream *poll_ostream; + +} SoupHTTP2ConnectionPrivate; +#define SOUP_HTTP2_CONNECTION_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), SOUP_TYPE_HTTP2_CONNECTION, SoupHTTP2ConnectionPrivate)) + +static ssize_t +ngh2_send_cb (nghttp2_session *session, + const uint8_t *data, size_t length, + int flags, void *conn) +{ + SoupHTTP2ConnectionPrivate *priv = SOUP_HTTP2_CONNECTION_GET_PRIVATE (conn); + gssize nwrote; + GError *error = NULL; + + nwrote = g_pollable_output_stream_write_nonblocking (priv->ostream, + buf, length, + NULL, &error); + if (nwrote == -1) { + if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { + g_error_free (error); + return NGHTTP2_ERR_WOULDBLOCK; + } + g_error_free (error); + return NGHTTP2_ERR_CALLBACK_FAILURE; + } else + return nread; +} + +static ssize_t +ngh_recv_cb (nghttp2_session *session, + uint8_t *buf, size_t length, + int flags, void *conn) +{ + SoupHTTP2ConnectionPrivate *priv = SOUP_HTTP2_CONNECTION_GET_PRIVATE (conn); + gssize nread; + GError *error = NULL; + + nread = g_pollable_input_stream_read_nonblocking (priv->istream, + buf, length, + NULL, &error); + if (nread == -1) { + if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { + g_error_free (error); + return NGHTTP2_ERR_WOULDBLOCK; + } + g_error_free (error); + return NGHTTP2_ERR_CALLBACK_FAILURE; + } else if (nread == 0) + return NGHTTP2_ERR_EOF; + else + return nread; +} + +static int +ngh2_data_chunk_recv_cb (nghttp2_session *session, uint8_t flags, + int32_t stream_id, const uint8_t *data, size_t len, + void *conn) +{ + SoupHTTP2ConnectionPrivate *priv = SOUP_HTTP2_CONNECTION_GET_PRIVATE (conn); + SoupHTTP2Channel *channel; + + channel = g_hash_table_lookup (priv->channels, GINT_TO_POINTER (stream_id)); + if (!channel) + return -1; + + soup_http2_channel_push_data (channel, data, len); + return 0; +} + +static int +ngh2_stream_close_cb (nghttp2_session *session, + int32_t stream_id, uint32_t error_code, + void *conn) +{ + SoupHTTP2ConnectionPrivate *priv = SOUP_HTTP2_CONNECTION_GET_PRIVATE (conn); + SoupHTTP2Channel *channel; + + channel = g_hash_table_lookup (priv->channels, GINT_TO_POINTER (stream_id)); + if (channel) { + soup_http2_channel_input_closed (channel, error_code); + if (soup_http2_channel_get_output_closed (channel)) + g_hash_table_remove (GINT_TO_POINTER (stream_id)); + } + return 0; +} + +static int +ngh2_begin_headers_cb (nghttp2_session *session, + const nghttp2_frame *frame, + void *conn) +{ + SoupHTTP2ConnectionPrivate *priv = SOUP_HTTP2_CONNECTION_GET_PRIVATE (conn); + SoupHTTP2Channel *channel; + + channel = g_hash_table_lookup (priv->channels, GINT_TO_POINTER (stream_id)); + if (channel) + return -1; + + if (frame->hd.type != NGHTTP2_HEADERS) + return -1; + + channel = soup_http2_channel_new (conn, frame->hd.stream_id); + g_hash_table_insert (priv->channels, GINT_TO_POINTER (frame->hd.stream_id), channel); + return 0 +} + +static int +ngh2_on_header_cb (nghttp2_session *session, const nghttp2_frame *frame, + const uint8_t *name, size_t namelen, + const uint8_t *value, size_t valuelen, + uint8_t flags, void *conn) +{ + SoupHTTP2ConnectionPrivate *priv = SOUP_HTTP2_CONNECTION_GET_PRIVATE (conn); + SoupHTTP2Channel *channel; + + channel = g_hash_table_lookup (priv->channels, GINT_TO_POINTER (stream_id)); + if (!channel) + return -1; + + if (soup_http2_channel_get_headers_complete (channel)) + return 0; + + if (!nghttp2_check_header_value (value, valuelen)) + return -1; + if (namelen == 0 || + (name[0] == ':' && !nghttp2_check_header_name (name + 1, namelen - 1)) || + (name[0] != ':' && !nghttp2_check_header_name (name, namelen))) + return -1; + + soup_http2_channel_push_header (channel, + name, namelen, + value, valuelen); + return 0; +} + +static int +ngh2_frame_recv_cb (nghttp2_session *session, + const nghttp2_frame *frame, + void *conn) +{ + SoupHTTP2ConnectionPrivate *priv = SOUP_HTTP2_CONNECTION_GET_PRIVATE (conn); + SoupHTTP2Channel *channel; + + if (frame->hd.type != NGHTTP2_HEADERS) + return 0; + + channel = g_hash_table_lookup (priv->channels, GINT_TO_POINTER (stream_id)); + if (!channel) + return 0; + + soup_http2_channel_set_headers_complete (channel); + return 0; +} + +static void + soup_http2_connection_init (SoupHTTP2Connection *connection) +{ + SoupHTTP2ConnectionPrivate *priv = SOUP_HTTP2_CONNECTION_GET_PRIVATE (connection); + nghttp2_session_callbacks *callbacks; + + priv->channels = g_hash_table_new_full (NULL, NULL, NULL, g_object_unref); + + nghttp2_session_callbacks_new (&callbacks); + nghttp2_session_callbacks_set_send_callback (callbacks, ngh2_send_cb); + nghttp2_session_callbacks_set_recv_callback (callbacks, ngh2_recv_cb); + nghttp2_session_callbacks_set_on_data_chunk_recv_callback (callbacks, ngh2_data_chunk_recv_cb); + nghttp2_session_callbacks_set_on_stream_close_callback (callbacks, ngh2_stream_close_cb); + nghttp2_session_callbacks_set_on_begin_headers_callback(callbacks, ngh2_begin_headers_cb); + nghttp2_session_callbacks_set_on_header_callback (callbacks, ngh2_header_cb); + nghttp2_session_callbacks_set_on_frame_recv_callback (callbacks, ngh2_frame_recv_cb); + + nghttp2_session_client_new (&priv->session, callbacks, connection); + nghttp2_session_callbacks_del (callbacks); +} + +static void +soup_http2_connection_run (SoupHTTP2Connection *conn) +{ + SoupHTTP2ConnectionPrivate *priv = SOUP_HTTP2_CONNECTION_GET_PRIVATE (connection); + GSource *in, *out; + GMainContext *ctx; + GMainLoop *loop; + int status; + + ctx = g_main_context_new (); + g_main_context_push_thread_default (ctx); + loop = g_main_loop_new (ctx, FALSE); + + in = g_pollable_input_stream_create_source (priv->istream, NULL); + g_source_set_callback (in, (GSourceFunc) quit_callback, loop); + g_source_attach (in, ctx); + + out = g_pollable_output_stream_create_source (priv->ostream, NULL); + g_source_set_callback (out, (GSourceFunc) quit_callback, loop); + g_source_attach (out, ctx); + + while (TRUE) { + if (g_pollable_output_stream_is_writable (priv->ostream)) { + status = nghttp2_session_send (priv->session); + if (status != 0) + break; + } + if (g_pollable_input_stream_is_readable (priv->ostream)) { + status = nghttp2_session_recv (priv->session); + if (status != 0) + break; + } + + if (!nghttp2_session_want_read (priv->session) && + !nghttp2_session_want_write (priv->session)) + break; + + g_main_loop_run (loop); + } + + g_source_destroy (in); + g_source_unref (in); + g_source_destroy (out); + g_source_unref (out); + + g_main_loop_unref (loop); + g_main_context_pop_thread_default (ctx); + g_main_context_unref (ctx); +} + diff --git a/libsoup/soup-http2-input-stream.c b/libsoup/soup-http2-input-stream.c new file mode 100644 index 00000000..e479a206 --- /dev/null +++ b/libsoup/soup-http2-input-stream.c @@ -0,0 +1,392 @@ +/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */ +/* + * soup-http2-input-stream.c + * + * Copyright 2014 Red Hat, Inc. + */ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif + +#include "soup-http2-input-stream.h" +#include "soup.h" +#include "soup-message-private.h" + +static void soup_http2_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data); + +G_DEFINE_TYPE_WITH_CODE (SoupHTTP2InputStream, soup_http2_input_stream, SOUP_TYPE_FILTER_INPUT_STREAM, + G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM, + soup_http2_input_stream_pollable_init)) + +enum { + CLOSE, + LAST_SIGNAL +}; + +static guint signals[LAST_SIGNAL] = { 0 }; + +enum { + PROP_0, + + PROP_CHANNEL, + PROP_STREAM_ID +}; + +struct _SoupHTTP2InputStreamPrivate { + SoupHTTP2Channel *chan; + guint32 stream_id; + + GAsyncQueue *queue; + GBytes *current; + + GMutex *mutex; + GError *error; + gboolean eof; +}; +#define SOUP_HTTP2_INPUT_STREAM_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), SOUP_TYPE_HTTP2_INPUT_STREAM, SoupHTTP2InputStreamPrivate)) + +static void +soup_http2_input_stream_init (SoupHTTP2InputStream *stream) +{ + SoupHTTP2InputStreamPrivate *priv = SOUP_HTTP2_INPUT_STREAM_GET_PRIVATE (stream); + + g_mutex_init (&priv->mutex); + priv->queue = g_async_queue_new_full ((GDestroyNotify) g_byte_array_unref); +} + +static void +soup_http2_input_stream_finalize (GObject *object) +{ + SoupHTTP2InputStreamPrivate *priv = SOUP_HTTP2_INPUT_STREAM_GET_PRIVATE (object); + + g_mutex_clear (&priv->mutex); + g_clear_object (&priv->chan); + g_clear_pointer (&priv->current, g_bytes_unref); + g_clear_pointer (&priv->queue, g_async_queue_unref); + g_clear_error (&priv->error); + + G_OBJECT_CLASS (soup_http2_input_stream_parent_class)->finalize (object); +} + +static void +soup_http2_input_stream_set_property (GObject *object, guint prop_id, + const GValue *value, GParamSpec *pspec) +{ + SoupHTTP2InputStreamPrivate *priv = SOUP_HTTP2_INPUT_STREAM_GET_PRIVATE (object); + + switch (prop_id) { + case PROP_CHANNEL: + priv->chan = g_value_dup_object (value); + break; + case PROP_STREAM_ID: + priv->stream_id = g_value_get_uint (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +soup_http2_input_stream_get_property (GObject *object, guint prop_id, + GValue *value, GParamSpec *pspec) +{ + SoupHTTP2InputStreamPrivate *priv = SOUP_HTTP2_INPUT_STREAM_GET_PRIVATE (object); + + switch (prop_id) { + case PROP_CHANNEL: + g_value_set_object (value, priv->chan); + break; + case PROP_STREAM_ID: + g_value_set_uint (value, priv->stream_id); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +void +soup_http2_input_stream_push_data (SoupHTTP2InputStream *h2i, + const guchar *data, + gsize len) +{ + SoupHTTP2InputStreamPrivate *priv = SOUP_HTTP2_INPUT_STREAM_GET_PRIVATE (h2i); + + g_async_queue_push (priv->queue, g_bytes_new (data, len)); +} + +void +soup_http2_input_stream_push_eof (SoupHTTP2InputStream *h2i) +{ + SoupHTTP2InputStreamPrivate *priv = SOUP_HTTP2_INPUT_STREAM_GET_PRIVATE (h2i); + + g_mutex_lock (&priv->mutex); + priv->eof = TRUE; + g_mutex_unlock (&priv->mutex); + g_async_queue_push (priv->queue, g_bytes_new (NULL, 0)); +} + +void +soup_http2_input_stream_push_error (SoupHTTP2InputStream *h2i, + GError *error) +{ + SoupHTTP2InputStreamPrivate *priv = SOUP_HTTP2_INPUT_STREAM_GET_PRIVATE (h2i); + + g_mutex_lock (&priv->mutex); + if (!priv->error) { + priv->error = g_error_copy (error); + g_async_queue_push (priv->queue, g_bytes_new (NULL, 0)); + } + g_mutex_unlock (&priv->mutex); +} + +static gssize +read_one_gbytes (SoupHTTP2InputStreamPrivate *priv, + GBytes *bytes, + guchar *buffer, + gsize count) +{ + gconstpointer data; + gsize length; + + g_assert (priv->current == NULL); + + data = g_bytes_get_data (bytes, &length); + if (length > count) { + priv->current = g_bytes_new_from_bytes (bytes, count, length - count); + length = count; + } + + memcpy (buffer, data, length); + return length; +} + +static gssize +read_internal (SoupHTTP2InputStreamPrivate *priv, + guchar *buffer, + gsize count, + gboolean blocking, + GError **error) +{ + GBytes *bytes; + gssize nread, total = 0; + gpointer (*queue_pop) (GAsyncQueue *) = + blocking ? g_async_queue_pop : g_async_queue_try_pop; + + if (priv->current) { + bytes = priv->current; + priv->current = NULL; + + nread = read_one_gbytes (priv, bytes, buffer + total, count - total); + g_bytes_unref (bytes); + + total += nread; + } + + while (total < count && (bytes = queue_pop (priv->queue))) { + nread = read_one_gbytes (priv, bytes, buffer + total, count - total); + g_bytes_unref (bytes); + + if (nread == 0) + break; + + total += nread; + } + + if (total == 0) { + g_mutex_lock (&priv->mutex); + if (priv->error) { + g_propagate_error (error, priv->error); + priv->error = NULL; + total = -1; + } else if (!blocking && !priv->eof) { + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, + _("Operation would block")); + total = -1; + } + g_mutex_unlock (&priv->mutex); + } + + return total; +} + +static void +read_cancelled (GCancellable *cancellable, + gpointer user_data) +{ + SoupHTTP2InputStream *h2i = SOUP_HTTP2_INPUT_STREAM (user_data); + SoupHTTP2InputStreamPrivate *priv = SOUP_HTTP2_INPUT_STREAM_GET_PRIVATE (h2i); + GError *error = NULL; + + g_cancellable_set_error_if_cancelled (cancellable, &error); + soup_http2_input_stream_take_error (h2i, error); +} + +static gssize +soup_http2_input_stream_read_fn (GInputStream *stream, + void *buffer, + gsize count, + GCancellable *cancellable, + GError **error) +{ + SoupHTTP2InputStreamPrivate *priv = SOUP_HTTP2_INPUT_STREAM_GET_PRIVATE (stream); + guint cancelled_id = 0; + gssize nread; + + if (cancellable) { + if (g_cancellable_set_error_if_cancelled (cancellable, error)) + return -1; + + cancelled_id = g_signal_connect (cancellable, "cancelled", + G_CALLBACK (read_cancelled), stream); + } + + nread = read_internal (priv, buffer, count, TRUE, error); + + if (cancellable) + g_signal_handler_disconnect (cancellable, cancelled_id); + + return nread; +} + +static gssize +soup_http2_input_stream_read_nonblocking (GPollableInputStream *stream, + void *buffer, + gsize count, + GError **error) +{ + SoupHTTP2InputStreamPrivate *priv = SOUP_HTTP2_INPUT_STREAM_GET_PRIVATE (object); + + return read_internal (priv, buffer, count, FALSE, error); +} + +static gboolean +soup_http2_input_stream_is_readable (GPollableInputStream *stream) +{ + SoupHTTP2InputStreamPrivate *priv = SOUP_HTTP2_INPUT_STREAM_GET_PRIVATE (object); + + return (priv->current != NULL || + g_async_queue_length (priv->queue) > 0); +} + +static GSource * +soup_http2_input_stream_create_source (GPollableInputStream *stream, + GCancellable *cancellable) +{ + SoupHTTP2InputStreamPrivate *priv = SOUP_HTTP2_INPUT_STREAM_GET_PRIVATE (object); + GSource *base_source, *source; + + base_source = g_async_queue_create_source (priv->queue); + source = g_pollable_source_new_full (stream, base_source, cancellable); + g_source_unref (base_source); + + return source; +} + +static gboolean +soup_http2_input_stream_close_fn (GInputStream *stream, + GCancellable *cancellable, + GError **error) +{ + g_signal_emit (stream, signals[CLOSE], 0); + return TRUE; +} + +static void +soup_http2_input_stream_close_async (GInputStream *stream, + gint priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + GTask *task; + + g_signal_emit (stream, signals[CLOSE], 0); + + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_priority (task, priority); + g_task_return_boolean (task, TRUE); + g_object_unref (task); +} + +static gboolean +soup_http2_input_stream_close_finish (GInputStream *stream, + GAsyncResult *result, + GError **error) +{ + return g_task_propagate_boolean (G_TASK (result), error); +} + +static void +soup_http2_input_stream_class_init (SoupHTTP2InputStreamClass *stream_class) +{ + GObjectClass *object_class = G_OBJECT_CLASS (stream_class); + GInputStreamClass *input_stream_class = G_INPUT_STREAM_CLASS (stream_class); + + g_type_class_add_private (stream_class, sizeof (SoupHTTP2InputStreamPrivate)); + + object_class->finalize = soup_http2_input_stream_finalize; + object_class->set_property = soup_http2_input_stream_set_property; + object_class->get_property = soup_http2_input_stream_get_property; + + input_stream_class->read_fn = soup_http2_input_stream_read_fn; + input_stream_class->close_fn = soup_http2_input_stream_close_fn; + input_stream_class->close_async = soup_http2_input_stream_close_async; + input_stream_class->close_finish = soup_http2_input_stream_close_finish; + + signals[CLOSE] = + g_signal_new ("close", + G_OBJECT_CLASS_TYPE (object_class), + G_SIGNAL_RUN_LAST, + 0, + NULL, NULL, + NULL, + G_TYPE_NONE, 0); + + g_object_class_install_property ( + object_class, PROP_CHANNEL, + g_param_spec_object ("channel", + "Channel", + "SoupHTTP2Channel", + SOUP_TYPE_HTTP2_CHANNEL, + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY | + G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property ( + object_class, PROP_STREAM_ID, + g_param_spec_uint ("stream-id", + "Stream ID", + "HTTP/2 stream ID", + 0, G_MAXUINT32, 0, + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY | + G_PARAM_STATIC_STRINGS)); +} + +static void +soup_http2_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, + gpointer interface_data) +{ + pollable_interface->is_readable = soup_http2_input_stream_is_readable; + pollable_interface->create_source = soup_http2_input_stream_create_source; + pollable_interface->read_nonblocking = soup_http2_input_stream_read_nonblocking; +} + +GInputStream * +soup_http2_input_stream_new (SoupHTTP2Channel *chan, + guint32 stream_id) +{ + return g_object_new (SOUP_TYPE_HTTP2_INPUT_STREAM, + "channel", chan, + "stream-id", stream_id, + NULL); +} + +guint32 +soup_http2_input_stream_get_stream_id (SoupHTTP2InputStream *h2i) +{ + return SOUP_HTTP2_INPUT_STREAM_GET_PRIVATE (h2i)->stream_id; +} diff --git a/libsoup/soup-http2-input-stream.h b/libsoup/soup-http2-input-stream.h new file mode 100644 index 00000000..f7c92ab2 --- /dev/null +++ b/libsoup/soup-http2-input-stream.h @@ -0,0 +1,47 @@ +/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */ +/* + * Copyright 2014 Red Hat, Inc. + */ + +#ifndef SOUP_HTTP2_INPUT_STREAM_H +#define SOUP_HTTP2_INPUT_STREAM_H 1 + +#include "soup-types.h" +#include "soup-http2-channel.h" + +G_BEGIN_DECLS + +#define SOUP_TYPE_HTTP2_INPUT_STREAM (soup_http2_input_stream_get_type ()) +#define SOUP_HTTP2_INPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), SOUP_TYPE_HTTP2_INPUT_STREAM, SoupHTTP2InputStream)) +#define SOUP_HTTP2_INPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), SOUP_TYPE_HTTP2_INPUT_STREAM, SoupHTTP2InputStreamClass)) +#define SOUP_IS_HTTP2_INPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), SOUP_TYPE_HTTP2_INPUT_STREAM)) +#define SOUP_IS_HTTP2_INPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((obj), SOUP_TYPE_HTTP2_INPUT_STREAM)) +#define SOUP_HTTP2_INPUT_STREAM_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), SOUP_TYPE_HTTP2_INPUT_STREAM, SoupHTTP2InputStreamClass)) + +typedef struct { + GInputStream parent; + +} SoupHTTP2InputStream; + +typedef struct { + GInputStreamClass parent_class; + +} SoupHTTP2InputStreamClass; + +GType soup_http2_input_stream_get_type (void); + +GInputStream *soup_http2_input_stream_new (SoupHTTP2Channel *chan, + guint32 stream_id); + +guint32 soup_http2_input_stream_get_stream_id (SoupHTTP2InputStream *h2i) + +void soup_http2_input_stream_push_data (SoupHTTP2InputStream *h2i, + const guchar *data, + gsize len); +void soup_http2_input_stream_push_eof (SoupHTTP2InputStream *h2i); +void soup_http2_input_stream_push_error (SoupHTTP2InputStream *h2i, + GError *error); + +G_END_DECLS + +#endif /* SOUP_HTTP2_INPUT_STREAM_H */ diff --git a/libsoup/soup-http2-output-stream.c b/libsoup/soup-http2-output-stream.c new file mode 100644 index 00000000..7319e23f --- /dev/null +++ b/libsoup/soup-http2-output-stream.c @@ -0,0 +1,433 @@ +/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */ +/* + * soup-http2-output-stream.c + * + * Copyright 2014 Red Hat, Inc. + */ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif + +#include "soup-http2-output-stream.h" +#include "soup.h" +#include "soup-message-private.h" + +static void soup_http2_output_stream_pollable_init (GPollableOutputStreamInterface *pollable_interface, gpointer interface_data); + +G_DEFINE_TYPE_WITH_CODE (SoupHTTP2OutputStream, soup_http2_output_stream, SOUP_TYPE_FILTER_OUTPUT_STREAM, + G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM, + soup_http2_output_stream_pollable_init)) + +enum { + CLOSE, + WRITE, + LAST_SIGNAL +}; + +static guint signals[LAST_SIGNAL] = { 0 }; + +enum { + PROP_0, + + PROP_CHANNEL, + PROP_STREAM_ID +}; + +struct _SoupHTTP2OutputStreamPrivate { + SoupHTTP2Channel *chan; + + GMutex mutex; + GCond cond; + gboolean writable; + GError *error; + GSList *sources; +}; +#define SOUP_HTTP2_OUTPUT_STREAM_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), SOUP_TYPE_HTTP2_OUTPUT_STREAM, SoupHTTP2OutputStreamPrivate)) + +static void +soup_http2_output_stream_init (SoupHTTP2OutputStream *stream) +{ + SoupHTTP2OutputStreamPrivate *priv = SOUP_HTTP2_OUTPUT_STREAM_GET_PRIVATE (object); + + g_mutex_init (&priv->mutex); + g_cond_init (&priv->cond); +} + +static void +soup_http2_output_stream_finalize (GObject *object) +{ + SoupHTTP2OutputStreamPrivate *priv = SOUP_HTTP2_OUTPUT_STREAM_GET_PRIVATE (object); + + g_clear_object (&priv->chan); + g_mutex_clear (&priv->mutex); + g_cond_clear (&priv->cond); + g_clear_error (&priv->error); + g_assert (priv->sources == NULL); + + G_OBJECT_CLASS (soup_http2_output_stream_parent_class)->finalize (object); +} + +static void +soup_http2_output_stream_set_property (GObject *object, guint prop_id, + const GValue *value, GParamSpec *pspec) +{ + SoupHTTP2OutputStreamPrivate *priv = SOUP_HTTP2_OUTPUT_STREAM_GET_PRIVATE (object); + + switch (prop_id) { + case PROP_CHANNEL: + priv->chan = g_value_dup_object (value); + break; + case PROP_STREAM_ID: + priv->stream_id = g_value_get_uint (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +soup_http2_output_stream_get_property (GObject *object, guint prop_id, + GValue *value, GParamSpec *pspec) +{ + SoupHTTP2OutputStreamPrivate *priv = SOUP_HTTP2_OUTPUT_STREAM_GET_PRIVATE (object); + + switch (prop_id) { + case PROP_CHANNEL: + g_value_set_object (value, priv->chan); + break; + case PROP_STREAM_ID: + g_value_set_uint (value, priv->stream_id); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +wakeup_unlocked (SoupHTTP2OutputStreamPrivate *priv) +{ + GSList *iter; + + g_cond_signal (&priv->cond); + for (iter = priv->sources; iter; iter = iter->next) + g_source_set_ready_time (iter->data, 0); +} + +void +soup_http2_output_stream_set_writable (SoupHTTP2OutputStream *h2o, + gboolean writable) +{ + SoupHTTP2OutputStreamPrivate *priv = SOUP_HTTP2_OUTPUT_STREAM_GET_PRIVATE (h2o); + + g_mutex_lock (&priv->mutex); + priv->writable = writable; + if (priv->writable) + wakeup_unlocked (priv); + g_mutex_unlock (&priv->mutex); +} + +void +soup_http2_output_stream_push_error (SoupHTTP2OutputStream *h2o, + GError *error) +{ + SoupHTTP2OutputStreamPrivate *priv = SOUP_HTTP2_OUTPUT_STREAM_GET_PRIVATE (h2o); + + g_mutex_lock (&priv->mutex); + if (!priv->error) + priv->error = error; + else + g_error_free (error); + wakeup_unlocked (priv); + g_mutex_lock (&priv->mutex); +} + +static gssize +write_internal (SoupHTTP2OutputStream *h2o, + guchar *buffer, + gsize count, + gboolean blocking, + GCancellable *cancellable, + GError **error) +{ + SoupHTTP2OutputStreamPrivate *priv = SOUP_HTTP2_OUTPUT_STREAM_GET_PRIVATE (h2o); + GBytes *bytes; + + g_mutex_lock (&priv->mutex); + + if (priv->error) { + g_propagate_error (error, priv->error); + priv->error = NULL; + g_mutex_unlock (&priv->mutex); + return -1; + } + + if (!priv->writable) { + if (!blocking) { + g_mutex_unlock (&priv->mutex); + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, + _("Operation would block")); + return -1; + } + + while (!priv->writable && !priv->error) + g_cond_wait (&priv->cond, &priv->mutex); + } + + g_signal_emit (h2o, signals[WRITE], 0, buffer, (gulong) count); + + g_mutex_unlock (&priv->mutex); + + return count; +} + +static void +write_cancelled (GCancellable *cancellable, + gpointer user_data) +{ + SoupHTTP2OutputStream *h2o = SOUP_HTTP2_OUTPUT_STREAM (user_data); + SoupHTTP2OutputStreamPrivate *priv = SOUP_HTTP2_OUTPUT_STREAM_GET_PRIVATE (h2o); + + g_mutex_lock (&priv->mutex); + if (!priv->write_error) { + g_cancellable_set_error_if_cancelled (cancellable, &priv->write_error); + wakeup_unlocked (priv); + } + g_mutex_unlock (&priv->mutex); +} + +static gssize +soup_http2_output_stream_write_fn (GOutputStream *stream, + void *buffer, + gsize count, + GCancellable *cancellable, + GError **error) +{ + SoupHTTP2OutputStream *h2o = SOUP_HTTP2_OUTPUT_STREAM (stream); + SoupHTTP2OutputStreamPrivate *priv = SOUP_HTTP2_OUTPUT_STREAM_GET_PRIVATE (stream); + guint cancelled_id = 0; + gssize nwrote; + + if (cancellable) { + if (g_cancellable_set_error_if_cancelled (cancellable, error)) + return -1; + + cancelled_id = g_signal_connect (cancellable, "cancelled", + G_CALLBACK (write_cancelled), stream); + } + + nwrote = write_internal (h2o, buffer, count, TRUE, cancellable, error); + + if (cancellable) + g_signal_handler_disconnect (cancellable, cancelled_id); + + return nwrote; +} + +static gssize +soup_http2_output_stream_write_nonblocking (GPollableOutputStream *stream, + void *buffer, + gsize count, + GError **error) +{ + SoupHTTP2OutputStream *h2o = SOUP_HTTP2_OUTPUT_STREAM (stream); + SoupHTTP2OutputStreamPrivate *priv = SOUP_HTTP2_OUTPUT_STREAM_GET_PRIVATE (stream); + + return write_internal (h2o, buffer, count, FALSE, NULL, error); +} + +static gboolean +soup_http2_output_stream_is_writable (GPollableOutputStream *stream) +{ + SoupHTTP2OutputStreamPrivate *priv = SOUP_HTTP2_OUTPUT_STREAM_GET_PRIVATE (object); + + return priv->writable; +} + +/* We need to create our own GSource type, so that it can remove + * itself from priv->sources when it's destroyed. + */ +typedef struct { + GSource source; + SoupHTTP2OutputStream *h2o; +} SoupHTTP2OutputStreamSource; + +static gboolean +http2_output_stream_source_dispatch (GSource *source, + GSourceFunc callback, + gpointer user_data) +{ + return callback (user_data); +} + +static void +http2_output_stream_source_finalize (GSource *source) +{ + SoupHTTP2OutputStreamSource *hosource = (SoupHTTP2OutputStreamSource *)source; + SoupHTTP2OutputStreamPrivate *priv = SOUP_HTTP2_OUTPUT_STREAM_GET_PRIVATE (hosource->h2o); + + g_mutex_lock (&priv->mutex); + priv->sources = g_slist_remove (priv->sources, source); + g_mutex_unlock (&priv->mutex); + g_object_unref (hosource->h2o); +} + +static gboolean +http2_output_stream_source_closure_callback (gpointer data) +{ + GClosure *closure = data; + GValue result_value = G_VALUE_INIT; + + g_value_init (&result_value, G_TYPE_BOOLEAN); + g_closure_invoke (closure, &result_value, 0, NULL, NULL); + return g_value_get_boolean (&result_value); +} + +static GSourceFuncs http2_output_stream_source_funcs = { + NULL, + NULL, + http2_output_stream_source_dispatch, + http2_output_stream_source_finalize, + (GSourceFunc) http2_output_stream_source_closure_callback, + (GSourceDummyMarshal) g_cclosure_marshal_generic +}; + +GSource * +soup_http2_output_stream_create_source (GPollableOutputStream *stream, + GCancellable *cancellable) +{ + SoupHTTP2OutputStreamPrivate *priv = SOUP_HTTP2_OUTPUT_STREAM_GET_PRIVATE (object); + GSource *base_source, *source; + + base_source = g_source_new (&http2_output_stream_source_funcs, + sizeof (SoupHTTP2OutputStreamSource)); + g_source_set_name (base_source, "SoupHTTP2OutputStreamSource"); + ((SoupHTTP2OutputStreamSource *)base_source)->h2o = g_object_ref (stream); + + g_mutex_lock (&priv->mutex); + if (priv->writable) + g_source_set_ready_time (base_source, 0); + priv->sources = g_slist_prepend (priv->sources, base_source); + g_mutex_unlock (&priv->mutex); + + source = g_pollable_source_new (stream, base_source, cancellable); + g_source_unref (base_source); + + return source; +} + +static gboolean +soup_http2_output_stream_close_fn (GOutputStream *stream, + GCancellable *cancellable, + GError **error) +{ + g_signal_emit (stream, signals[CLOSE], 0); + return TRUE; +} + +static void +soup_http2_output_stream_close_async (GOutputStream *stream, + gint priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + GTask *task; + + g_signal_emit (stream, signals[CLOSE], 0); + + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_priority (task, priority); + g_task_return_boolean (task, TRUE); + g_object_unref (task); +} + +static gboolean +soup_http2_output_stream_close_finish (GOutputStream *stream, + GAsyncResult *result, + GError **error) +{ + return g_task_propagate_boolean (G_TASK (result), error); +} + +static void +soup_http2_output_stream_class_init (SoupHTTP2OutputStreamClass *stream_class) +{ + GObjectClass *object_class = G_OBJECT_CLASS (stream_class); + GOutputStreamClass *output_stream_class = G_OUTPUT_STREAM_CLASS (stream_class); + + g_type_class_add_private (stream_class, sizeof (SoupHTTP2OutputStreamPrivate)); + + object_class->finalize = soup_http2_output_stream_finalize; + object_class->set_property = soup_http2_output_stream_set_property; + object_class->get_property = soup_http2_output_stream_get_property; + + output_stream_class->write_fn = soup_http2_output_stream_write_fn; + output_stream_class->close_fn = soup_http2_output_stream_close_fn; + output_stream_class->close_async = soup_http2_output_stream_close_async; + output_stream_class->close_finish = soup_http2_output_stream_close_finish; + + signals[CLOSE] = + g_signal_new ("close", + G_OBJECT_CLASS_TYPE (object_class), + G_SIGNAL_RUN_LAST, + 0, + NULL, NULL, + NULL, + G_TYPE_NONE, 0); + signals[WRITE] = + g_signal_new ("write", + G_OBJECT_CLASS_TYPE (object_class), + G_SIGNAL_RUN_LAST, + 0, + NULL, NULL, + NULL, + G_TYPE_NONE, 2, G_TYPE_POINTER, G_TYPE_ULONG); + + g_object_class_install_property ( + object_class, PROP_CHANNEL, + g_param_spec_object ("channel", + "Channel", + "SoupHTTP2Channel", + SOUP_TYPE_HTTP2_CHANNEL, + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY | + G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property ( + object_class, PROP_STREAM_ID, + g_param_spec_uint ("stream-id", + "Stream ID", + "HTTP/2 stream ID", + 0, G_MAXUINT32, 0, + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY | + G_PARAM_STATIC_STRINGS)); +} + +static void +soup_http2_output_stream_pollable_init (GPollableOutputStreamInterface *pollable_interface, + gpointer interface_data) +{ + pollable_interface->is_writable = soup_http2_output_stream_is_writable; + pollable_interface->create_source = soup_http2_output_stream_create_source; + pollable_interface->write_nonblocking = soup_http2_output_stream_write_nonblocking; +} + +GOutputStream * +soup_http2_output_stream_new (SoupHTTP2Connection *connection, + guint32 stream_id) +{ + return g_object_new (SOUP_TYPE_HTTP2_OUTPUT_STREAM, + "connection", connection, + "stream-id", stream_id, + NULL); +} + +guint32 +soup_http2_output_stream_get_stream_id (SoupHTTP2OutputStream *h2o) +{ + return SOUP_HTTP2_OUTPUT_STREAM_GET_PRIVATE (h2o)->stream_id; +} diff --git a/libsoup/soup-http2-output-stream.h b/libsoup/soup-http2-output-stream.h new file mode 100644 index 00000000..f07f0def --- /dev/null +++ b/libsoup/soup-http2-output-stream.h @@ -0,0 +1,45 @@ +/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */ +/* + * Copyright 2014 Red Hat, Inc. + */ + +#ifndef SOUP_HTTP2_OUTPUT_STREAM_H +#define SOUP_HTTP2_OUTPUT_STREAM_H 1 + +#include "soup-types.h" +#include "soup-http2-channel.h" + +G_BEGIN_DECLS + +#define SOUP_TYPE_HTTP2_OUTPUT_STREAM (soup_http2_output_stream_get_type ()) +#define SOUP_HTTP2_OUTPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), SOUP_TYPE_HTTP2_OUTPUT_STREAM, SoupHTTP2OutputStream)) +#define SOUP_HTTP2_OUTPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), SOUP_TYPE_HTTP2_OUTPUT_STREAM, SoupHTTP2OutputStreamClass)) +#define SOUP_IS_HTTP2_OUTPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), SOUP_TYPE_HTTP2_OUTPUT_STREAM)) +#define SOUP_IS_HTTP2_OUTPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((obj), SOUP_TYPE_HTTP2_OUTPUT_STREAM)) +#define SOUP_HTTP2_OUTPUT_STREAM_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), SOUP_TYPE_HTTP2_OUTPUT_STREAM, SoupHTTP2OutputStreamClass)) + +typedef struct { + GOutputStream parent; + +} SoupHTTP2OutputStream; + +typedef struct { + GOutputStreamClass parent_class; + +} SoupHTTP2OutputStreamClass; + +GType soup_http2_output_stream_get_type (void); + +GOutputStream *soup_http2_output_stream_new (SoupHTTP2Channel *chan, + guint32 stream_id); + +guint32 soup_http2_output_stream_get_stream_id (SoupHTTP2OutputStream *h2o) + +void soup_http2_output_stream_set_writable (SoupHTTP2OutputStream *h2o, + gboolean writable); +void soup_http2_output_stream_push_error (SoupHTTP2OutputStream *h2o, + GError *error); + +G_END_DECLS + +#endif /* SOUP_HTTP2_OUTPUT_STREAM_H */ |