diff options
author | Carlos Garcia Campos <cgarcia@igalia.com> | 2021-04-22 13:08:36 +0200 |
---|---|---|
committer | Carlos Garcia Campos <cgarcia@igalia.com> | 2021-04-27 10:47:21 +0200 |
commit | b0a8ba8e29dc9bb337e9aa54a20d278d68cedb97 (patch) | |
tree | dde88dc751ad1eb55e656f77b13b7db67d90f0cc | |
parent | c0503e1a5b444ca1a50cbf1c18c665e02475d209 (diff) | |
download | libsoup-carlosgc/io-split.tar.gz |
Rename SoupClientMessageIOData as SoupClientMessageIOHTTP1carlosgc/io-split
-rw-r--r-- | docs/reference/meson.build | 1 | ||||
-rw-r--r-- | libsoup/meson.build | 2 | ||||
-rw-r--r-- | libsoup/soup-client-message-io-http1.c | 1117 | ||||
-rw-r--r-- | libsoup/soup-client-message-io-http1.h | 10 | ||||
-rw-r--r-- | libsoup/soup-connection.c | 3 | ||||
-rw-r--r-- | libsoup/soup-message-io.c | 1117 | ||||
-rw-r--r-- | libsoup/soup-message-private.h | 1 |
7 files changed, 1131 insertions, 1120 deletions
diff --git a/docs/reference/meson.build b/docs/reference/meson.build index 38792607..3aea8764 100644 --- a/docs/reference/meson.build +++ b/docs/reference/meson.build @@ -40,6 +40,7 @@ ignore_headers = [ 'soup-message-metrics-private.h', 'soup-client-message-io.h', 'soup-message-io-completion.h', + 'soup-client-message-io-http1.h', ] mkdb_args = [ diff --git a/libsoup/meson.build b/libsoup/meson.build index ceafe5a1..1448f820 100644 --- a/libsoup/meson.build +++ b/libsoup/meson.build @@ -52,6 +52,7 @@ soup_sources = [ 'soup-body-output-stream.c', 'soup-client-input-stream.c', 'soup-client-message-io.c', + 'soup-client-message-io-http1.c', 'soup-connection.c', 'soup-date-utils.c', 'soup-filter-input-stream.c', @@ -63,7 +64,6 @@ soup_sources = [ 'soup-logger-input-stream.c', 'soup-message.c', 'soup-message-headers.c', - 'soup-message-io.c', 'soup-message-io-data.c', 'soup-message-metrics.c', 'soup-message-queue-item.c', diff --git a/libsoup/soup-client-message-io-http1.c b/libsoup/soup-client-message-io-http1.c new file mode 100644 index 00000000..28c0ed19 --- /dev/null +++ b/libsoup/soup-client-message-io-http1.c @@ -0,0 +1,1117 @@ +/* -*- Mode: C; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 8 -*- */ +/* + * soup-message-io.c: HTTP message I/O + * + * Copyright (C) 2000-2003, Ximian, Inc. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <glib/gi18n-lib.h> + +#ifdef HAVE_SYSPROF +#include <sysprof-capture.h> +#endif + +#include "soup-client-message-io-http1.h" +#include "soup.h" +#include "soup-body-input-stream.h" +#include "soup-body-output-stream.h" +#include "soup-client-input-stream.h" +#include "soup-connection.h" +#include "soup-content-processor.h" +#include "content-sniffer/soup-content-sniffer-stream.h" +#include "soup-filter-input-stream.h" +#include "soup-logger-private.h" +#include "soup-message-private.h" +#include "soup-message-metrics-private.h" +#include "soup-message-queue-item.h" +#include "soup-misc.h" +#include "soup-uri-utils-private.h" + +typedef struct { + SoupClientMessageIO iface; + SoupMessageIOData base; + + SoupMessageQueueItem *item; + + SoupMessageMetrics *metrics; + +#ifdef HAVE_SYSPROF + gint64 begin_time_nsec; +#endif +} SoupClientMessageIOHTTP1; + +#define RESPONSE_BLOCK_SIZE 8192 +#define HEADER_SIZE_LIMIT (64 * 1024) + +static void +soup_client_message_io_http1_destroy (SoupClientMessageIO *iface) +{ + SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface; + + soup_message_io_data_cleanup (&io->base); + soup_message_queue_item_unref (io->item); + + g_slice_free (SoupClientMessageIOHTTP1, io); +} + +static int +soup_client_message_io_http1_get_priority (SoupClientMessageIOHTTP1 *io) +{ + if (!io->item->task) + return G_PRIORITY_DEFAULT; + + return g_task_get_priority (io->item->task); +} + +static void +soup_client_message_io_http1_finished (SoupClientMessageIO *iface) +{ + SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface; + SoupMessageIOCompletionFn completion_cb; + gpointer completion_data; + SoupMessageIOCompletion completion; + SoupMessage *msg; + + completion_cb = io->base.completion_cb; + completion_data = io->base.completion_data; + + if ((io->base.read_state >= SOUP_MESSAGE_IO_STATE_FINISHING && + io->base.write_state >= SOUP_MESSAGE_IO_STATE_FINISHING)) + completion = SOUP_MESSAGE_IO_COMPLETE; + else + completion = SOUP_MESSAGE_IO_INTERRUPTED; + + msg = g_object_ref (io->item->msg); + soup_connection_message_io_finished (soup_message_get_connection (msg), msg); + if (completion_cb) + completion_cb (G_OBJECT (msg), completion, completion_data); + g_object_unref (msg); +} + +static void +soup_client_message_io_http1_stolen (SoupClientMessageIO *iface) +{ + SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface; + SoupMessageIOCompletionFn completion_cb; + gpointer completion_data; + SoupMessage *msg; + + completion_cb = io->base.completion_cb; + completion_data = io->base.completion_data; + + msg = g_object_ref (io->item->msg); + soup_connection_message_io_finished (soup_message_get_connection (msg), msg); + if (completion_cb) + completion_cb (G_OBJECT (msg), SOUP_MESSAGE_IO_STOLEN, completion_data); + g_object_unref (msg); +} + +static gint +processing_stage_cmp (gconstpointer a, + gconstpointer b) +{ + SoupProcessingStage stage_a = soup_content_processor_get_processing_stage (SOUP_CONTENT_PROCESSOR ((gpointer)a)); + SoupProcessingStage stage_b = soup_content_processor_get_processing_stage (SOUP_CONTENT_PROCESSOR ((gpointer)b)); + + if (stage_a > stage_b) + return 1; + if (stage_a == stage_b) + return 0; + return -1; +} + +GInputStream * +soup_message_setup_body_istream (GInputStream *body_stream, + SoupMessage *msg, + SoupSession *session, + SoupProcessingStage start_at_stage) +{ + GInputStream *istream; + GSList *p, *processors; + + istream = g_object_ref (body_stream); + + processors = soup_session_get_features (session, SOUP_TYPE_CONTENT_PROCESSOR); + processors = g_slist_sort (processors, processing_stage_cmp); + + for (p = processors; p; p = p->next) { + GInputStream *wrapper; + SoupContentProcessor *processor; + + processor = SOUP_CONTENT_PROCESSOR (p->data); + if (soup_message_disables_feature (msg, p->data) || + soup_content_processor_get_processing_stage (processor) < start_at_stage) + continue; + + wrapper = soup_content_processor_wrap_input (processor, istream, msg, NULL); + if (wrapper) { + g_object_unref (istream); + istream = wrapper; + } + } + + g_slist_free (processors); + + return istream; +} + +static void +request_body_stream_wrote_data_cb (SoupMessage *msg, + const void *buffer, + guint count, + gboolean is_metadata) +{ + SoupClientMessageIOHTTP1 *client_io = (SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg); + + if (client_io->metrics) { + client_io->metrics->request_body_bytes_sent += count; + if (!is_metadata) + client_io->metrics->request_body_size += count; + } + + if (!is_metadata) + soup_message_wrote_body_data (msg, count); +} + +static void +request_body_stream_wrote_cb (GOutputStream *ostream, + GAsyncResult *result, + SoupMessage *msg) +{ + SoupClientMessageIOHTTP1 *io; + gssize nwrote; + GCancellable *async_wait; + GError *error = NULL; + + nwrote = g_output_stream_splice_finish (ostream, result, &error); + + io = (SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg); + if (!io || !io->base.async_wait || io->base.body_ostream != ostream) { + g_clear_error (&error); + g_object_unref (msg); + return; + } + + if (nwrote != -1) + io->base.write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH; + + if (error) + g_propagate_error (&io->base.async_error, error); + async_wait = io->base.async_wait; + io->base.async_wait = NULL; + g_cancellable_cancel (async_wait); + g_object_unref (async_wait); + + g_object_unref (msg); +} + +static void +closed_async (GObject *source, + GAsyncResult *result, + gpointer user_data) +{ + GOutputStream *body_ostream = G_OUTPUT_STREAM (source); + SoupMessage *msg = user_data; + SoupClientMessageIOHTTP1 *io; + GCancellable *async_wait; + + io = (SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg); + if (!io || !io->base.async_wait || io->base.body_ostream != body_ostream) { + g_object_unref (msg); + return; + } + + g_output_stream_close_finish (body_ostream, result, &io->base.async_error); + g_clear_object (&io->base.body_ostream); + + async_wait = io->base.async_wait; + io->base.async_wait = NULL; + g_cancellable_cancel (async_wait); + g_object_unref (async_wait); + + g_object_unref (msg); +} + +/* + * There are two request/response formats: the basic request/response, + * possibly with one or more unsolicited informational responses (such + * as the WebDAV "102 Processing" response): + * + * Client Server + * W:HEADERS / R:NOT_STARTED -> R:HEADERS / W:NOT_STARTED + * W:BODY / R:NOT_STARTED -> R:BODY / W:NOT_STARTED + * [W:DONE / R:HEADERS (1xx) <- R:DONE / W:HEADERS (1xx) ...] + * W:DONE / R:HEADERS <- R:DONE / W:HEADERS + * W:DONE / R:BODY <- R:DONE / W:BODY + * W:DONE / R:DONE R:DONE / W:DONE + * + * and the "Expect: 100-continue" request/response, with the client + * blocking halfway through its request, and then either continuing or + * aborting, depending on the server response: + * + * Client Server + * W:HEADERS / R:NOT_STARTED -> R:HEADERS / W:NOT_STARTED + * W:BLOCKING / R:HEADERS <- R:BLOCKING / W:HEADERS + * [W:BODY / R:BLOCKING -> R:BODY / W:BLOCKING] + * [W:DONE / R:HEADERS <- R:DONE / W:HEADERS] + * W:DONE / R:BODY <- R:DONE / W:BODY + * W:DONE / R:DONE R:DONE / W:DONE + */ + +static void +write_headers (SoupMessage *msg, + GString *header, + SoupEncoding *encoding) +{ + GUri *uri = soup_message_get_uri (msg); + char *uri_string; + SoupMessageHeadersIter iter; + const char *name, *value; + + if (soup_message_get_method (msg) == SOUP_METHOD_CONNECT) { + char *uri_host = soup_uri_get_host_for_headers (uri); + + /* CONNECT URI is hostname:port for tunnel destination */ + uri_string = g_strdup_printf ("%s:%d", uri_host, g_uri_get_port (uri)); + g_free (uri_host); + } else { + gboolean proxy = soup_connection_is_via_proxy (soup_message_get_connection (msg)); + + /* Proxy expects full URI to destination. Otherwise + * just the path. + */ + if (proxy) + uri_string = g_uri_to_string (uri); + else if (soup_message_get_is_options_ping (msg)) + uri_string = g_strdup ("*"); + else + uri_string = soup_uri_get_path_and_query (uri); + + if (proxy && g_uri_get_fragment (uri)) { + /* Strip fragment */ + char *fragment = strchr (uri_string, '#'); + if (fragment) + *fragment = '\0'; + } + } + + g_string_append_printf (header, "%s %s HTTP/1.%d\r\n", + soup_message_get_method (msg), uri_string, + (soup_message_get_http_version (msg) == SOUP_HTTP_1_0) ? 0 : 1); + g_free (uri_string); + + *encoding = soup_message_headers_get_encoding (soup_message_get_request_headers (msg)); + + soup_message_headers_iter_init (&iter, soup_message_get_request_headers (msg)); + while (soup_message_headers_iter_next (&iter, &name, &value)) + g_string_append_printf (header, "%s: %s\r\n", name, value); + g_string_append (header, "\r\n"); +} + +/* Attempts to push forward the writing side of @msg's I/O. Returns + * %TRUE if it manages to make some progress, and it is likely that + * further progress can be made. Returns %FALSE if it has reached a + * stopping point of some sort (need input from the application, + * socket not writable, write is complete, etc). + */ +static gboolean +io_write (SoupClientMessageIOHTTP1 *client_io, + gboolean blocking, + GCancellable *cancellable, + GError **error) +{ + SoupMessageIOData *io = &client_io->base; + SoupMessage *msg = client_io->item->msg; + SoupSessionFeature *logger; + gssize nwrote; + + if (io->async_error) { + g_propagate_error (error, io->async_error); + io->async_error = NULL; + return FALSE; + } else if (io->async_wait) { + g_set_error_literal (error, G_IO_ERROR, + G_IO_ERROR_WOULD_BLOCK, + _("Operation would block")); + return FALSE; + } + + switch (io->write_state) { + case SOUP_MESSAGE_IO_STATE_HEADERS: + if (!io->write_buf->len) + write_headers (msg, io->write_buf, &io->write_encoding); + + while (io->written < io->write_buf->len) { + nwrote = g_pollable_stream_write (io->ostream, + io->write_buf->str + io->written, + io->write_buf->len - io->written, + blocking, + cancellable, error); + if (nwrote == -1) + return FALSE; + io->written += nwrote; + if (client_io->metrics) + client_io->metrics->request_header_bytes_sent += nwrote; + } + + io->written = 0; + g_string_truncate (io->write_buf, 0); + + if (io->write_encoding == SOUP_ENCODING_CONTENT_LENGTH) + io->write_length = soup_message_headers_get_content_length (soup_message_get_request_headers (msg)); + + if (soup_message_headers_get_expectations (soup_message_get_request_headers (msg)) & SOUP_EXPECTATION_CONTINUE) { + /* Need to wait for the Continue response */ + io->write_state = SOUP_MESSAGE_IO_STATE_BLOCKING; + io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS; + } else + io->write_state = SOUP_MESSAGE_IO_STATE_BODY_START; + + soup_message_wrote_headers (msg); + break; + + case SOUP_MESSAGE_IO_STATE_BODY_START: + io->body_ostream = soup_body_output_stream_new (io->ostream, + io->write_encoding, + io->write_length); + io->write_state = SOUP_MESSAGE_IO_STATE_BODY; + logger = soup_session_get_feature_for_message (client_io->item->session, + SOUP_TYPE_LOGGER, msg); + if (logger) { + soup_logger_request_body_setup (SOUP_LOGGER (logger), msg, + SOUP_BODY_OUTPUT_STREAM (io->body_ostream)); + } + break; + + case SOUP_MESSAGE_IO_STATE_BODY: + if (!io->write_length && + io->write_encoding != SOUP_ENCODING_EOF && + io->write_encoding != SOUP_ENCODING_CHUNKED) { + io->write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH; + break; + } + + if (soup_message_get_request_body_stream (msg)) { + g_signal_connect_object (io->body_ostream, + "wrote-data", + G_CALLBACK (request_body_stream_wrote_data_cb), + msg, G_CONNECT_SWAPPED); + if (blocking) { + nwrote = g_output_stream_splice (io->body_ostream, + soup_message_get_request_body_stream (msg), + G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE, + cancellable, + error); + if (nwrote == -1) + return FALSE; + io->write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH; + break; + } else { + io->async_wait = g_cancellable_new (); + g_output_stream_splice_async (io->body_ostream, + soup_message_get_request_body_stream (msg), + G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE, + soup_client_message_io_http1_get_priority (client_io), + cancellable, + (GAsyncReadyCallback)request_body_stream_wrote_cb, + g_object_ref (msg)); + return FALSE; + } + } else + io->write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH; + break; + + case SOUP_MESSAGE_IO_STATE_BODY_FLUSH: + if (io->body_ostream) { + if (blocking || io->write_encoding != SOUP_ENCODING_CHUNKED) { + if (!g_output_stream_close (io->body_ostream, cancellable, error)) + return FALSE; + g_clear_object (&io->body_ostream); + } else { + io->async_wait = g_cancellable_new (); + g_output_stream_close_async (io->body_ostream, + soup_client_message_io_http1_get_priority (client_io), + cancellable, + closed_async, g_object_ref (msg)); + } + } + + io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE; + break; + + case SOUP_MESSAGE_IO_STATE_BODY_DONE: + io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING; + soup_message_wrote_body (msg); + break; + + case SOUP_MESSAGE_IO_STATE_FINISHING: + io->write_state = SOUP_MESSAGE_IO_STATE_DONE; + io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS; + break; + + default: + g_return_val_if_reached (FALSE); + } + + return TRUE; +} + +static gboolean +parse_headers (SoupMessage *msg, + char *headers, + guint headers_len, + SoupEncoding *encoding, + GError **error) +{ + SoupHTTPVersion version; + char *reason_phrase; + SoupStatus status; + + soup_message_set_reason_phrase (msg, NULL); + + if (!soup_headers_parse_response (headers, headers_len, + soup_message_get_response_headers (msg), + &version, + &status, + &reason_phrase)) { + g_set_error_literal (error, SOUP_SESSION_ERROR, + SOUP_SESSION_ERROR_PARSING, + _("Could not parse HTTP response")); + return FALSE; + } + + soup_message_set_status (msg, status, reason_phrase); + g_free (reason_phrase); + + if (version < soup_message_get_http_version (msg)) + soup_message_set_http_version (msg, version); + + if ((soup_message_get_method (msg) == SOUP_METHOD_HEAD || + soup_message_get_status (msg) == SOUP_STATUS_NO_CONTENT || + soup_message_get_status (msg) == SOUP_STATUS_NOT_MODIFIED || + SOUP_STATUS_IS_INFORMATIONAL (soup_message_get_status (msg))) || + (soup_message_get_method (msg) == SOUP_METHOD_CONNECT && + SOUP_STATUS_IS_SUCCESSFUL (soup_message_get_status (msg)))) + *encoding = SOUP_ENCODING_NONE; + else + *encoding = soup_message_headers_get_encoding (soup_message_get_response_headers (msg)); + + if (*encoding == SOUP_ENCODING_UNRECOGNIZED) { + g_set_error_literal (error, SOUP_SESSION_ERROR, + SOUP_SESSION_ERROR_ENCODING, + _("Unrecognized HTTP response encoding")); + return FALSE; + } + + return TRUE; +} + +static void +response_network_stream_read_data_cb (SoupMessage *msg, + guint count) +{ + SoupClientMessageIOHTTP1 *client_io = (SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg); + + if (client_io->base.read_state < SOUP_MESSAGE_IO_STATE_BODY_START) + client_io->metrics->response_header_bytes_received += count; + else + client_io->metrics->response_body_bytes_received += count; +} + +/* Attempts to push forward the reading side of @msg's I/O. Returns + * %TRUE if it manages to make some progress, and it is likely that + * further progress can be made. Returns %FALSE if it has reached a + * stopping point of some sort (need input from the application, + * socket not readable, read is complete, etc). + */ +static gboolean +io_read (SoupClientMessageIOHTTP1 *client_io, + gboolean blocking, + GCancellable *cancellable, + GError **error) +{ + SoupMessageIOData *io = &client_io->base; + SoupMessage *msg = client_io->item->msg; + gboolean succeeded; + gboolean is_first_read; + gushort extra_bytes; + + switch (io->read_state) { + case SOUP_MESSAGE_IO_STATE_HEADERS: + is_first_read = io->read_header_buf->len == 0 && + soup_message_get_status (msg) == SOUP_STATUS_NONE; + + if (!soup_message_io_data_read_headers (io, blocking, cancellable, &extra_bytes, error)) + return FALSE; + + if (client_io->metrics) { + /* Adjust the header and body bytes received, since we might + * have read part of the body already that is queued by the stream. + */ + if (client_io->metrics->response_header_bytes_received > io->read_header_buf->len + extra_bytes) { + client_io->metrics->response_body_bytes_received = + client_io->metrics->response_header_bytes_received - io->read_header_buf->len - extra_bytes; + client_io->metrics->response_header_bytes_received -= client_io->metrics->response_body_bytes_received; + } + } + + if (is_first_read) + soup_message_set_metrics_timestamp (msg, SOUP_MESSAGE_METRICS_RESPONSE_START); + + succeeded = parse_headers (msg, + (char *)io->read_header_buf->data, + io->read_header_buf->len, + &io->read_encoding, + error); + g_byte_array_set_size (io->read_header_buf, 0); + + if (!succeeded) { + /* Either we couldn't parse the headers, or they + * indicated something that would mean we wouldn't + * be able to parse the body. (Eg, unknown + * Transfer-Encoding.). Skip the rest of the + * reading, and make sure the connection gets + * closed when we're done. + */ + soup_message_headers_append (soup_message_get_request_headers (msg), + "Connection", "close"); + soup_message_set_metrics_timestamp (msg, SOUP_MESSAGE_METRICS_RESPONSE_END); + io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING; + break; + } + + if (SOUP_STATUS_IS_INFORMATIONAL (soup_message_get_status (msg))) { + if (soup_message_get_status (msg) == SOUP_STATUS_CONTINUE && + io->write_state == SOUP_MESSAGE_IO_STATE_BLOCKING) { + /* Pause the reader, unpause the writer */ + io->read_state = + SOUP_MESSAGE_IO_STATE_BLOCKING; + io->write_state = + SOUP_MESSAGE_IO_STATE_BODY_START; + } else { + /* Just stay in HEADERS */ + io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS; + } + + /* Informational responses have no bodies, so + * bail out here rather than parsing encoding, etc + */ + soup_message_got_informational (msg); + + /* If this was "101 Switching Protocols", then + * the session may have stolen the connection... + */ + if (client_io != (SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg)) + return FALSE; + + soup_message_cleanup_response (msg); + break; + } else { + io->read_state = SOUP_MESSAGE_IO_STATE_BODY_START; + + /* If the client was waiting for a Continue + * but got something else, then it's done + * writing. + */ + if (io->write_state == SOUP_MESSAGE_IO_STATE_BLOCKING) + io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING; + } + + if (io->read_encoding == SOUP_ENCODING_CONTENT_LENGTH) { + io->read_length = soup_message_headers_get_content_length (soup_message_get_response_headers (msg)); + + if (!soup_message_is_keepalive (msg)) { + /* Some servers suck and send + * incorrect Content-Length values, so + * allow EOF termination in this case + * (iff the message is too short) too. + */ + io->read_encoding = SOUP_ENCODING_EOF; + } + } else + io->read_length = -1; + + soup_message_got_headers (msg); + break; + + case SOUP_MESSAGE_IO_STATE_BODY_START: + if (!io->body_istream) { + GInputStream *body_istream = soup_body_input_stream_new (G_INPUT_STREAM (io->istream), + io->read_encoding, + io->read_length); + + io->body_istream = soup_message_setup_body_istream (body_istream, msg, + client_io->item->session, + SOUP_STAGE_MESSAGE_BODY); + g_object_unref (body_istream); + } + + if (soup_message_get_content_sniffer (msg)) { + SoupContentSnifferStream *sniffer_stream = SOUP_CONTENT_SNIFFER_STREAM (io->body_istream); + const char *content_type; + GHashTable *params; + + if (!soup_content_sniffer_stream_is_ready (sniffer_stream, blocking, + cancellable, error)) + return FALSE; + + content_type = soup_content_sniffer_stream_sniff (sniffer_stream, ¶ms); + soup_message_content_sniffed (msg, content_type, params); + } + + io->read_state = SOUP_MESSAGE_IO_STATE_BODY; + break; + + case SOUP_MESSAGE_IO_STATE_BODY: { + guchar buf[RESPONSE_BLOCK_SIZE]; + gssize nread; + + nread = g_pollable_stream_read (io->body_istream, + buf, + RESPONSE_BLOCK_SIZE, + blocking, + cancellable, error); + if (nread == -1) + return FALSE; + + if (nread == 0) + io->read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE; + + if (client_io->metrics) + client_io->metrics->response_body_size += nread; + + break; + } + + case SOUP_MESSAGE_IO_STATE_BODY_DONE: + io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING; + soup_message_set_metrics_timestamp (msg, SOUP_MESSAGE_METRICS_RESPONSE_END); + soup_message_got_body (msg); + break; + + case SOUP_MESSAGE_IO_STATE_FINISHING: + io->read_state = SOUP_MESSAGE_IO_STATE_DONE; + break; + + default: + g_return_val_if_reached (FALSE); + } + + return TRUE; +} + +static gboolean +request_is_restartable (SoupMessage *msg, GError *error) +{ + SoupClientMessageIOHTTP1 *client_io = (SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg); + SoupMessageIOData *io; + + if (!client_io) + return FALSE; + + io = &client_io->base; + + return (io->read_state <= SOUP_MESSAGE_IO_STATE_HEADERS && + io->read_header_buf->len == 0 && + soup_connection_get_ever_used (soup_message_get_connection (client_io->item->msg)) && + !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT) && + !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) && + !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) && + error->domain != G_TLS_ERROR && + SOUP_METHOD_IS_IDEMPOTENT (soup_message_get_method (msg))); +} + +static gboolean +io_run_until (SoupClientMessageIOHTTP1 *client_io, + gboolean blocking, + SoupMessageIOState read_state, + SoupMessageIOState write_state, + GCancellable *cancellable, + GError **error) +{ + SoupMessageIOData *io = &client_io->base; + SoupMessage *msg = client_io->item->msg; + gboolean progress = TRUE, done; + GError *my_error = NULL; + + if (g_cancellable_set_error_if_cancelled (cancellable, error)) + return FALSE; + else if (!io) { + g_set_error_literal (error, G_IO_ERROR, + G_IO_ERROR_CANCELLED, + _("Operation was cancelled")); + return FALSE; + } + + g_object_ref (msg); + + while (progress && (SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg) == client_io && + !io->paused && !io->async_wait && + (io->read_state < read_state || io->write_state < write_state)) { + + if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state)) + progress = io_read (client_io, blocking, cancellable, &my_error); + else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state)) + progress = io_write (client_io, blocking, cancellable, &my_error); + else + progress = FALSE; + } + + if (my_error) { + g_propagate_error (error, my_error); + g_object_unref (msg); + return FALSE; + } else if ((SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg) != client_io) { + g_set_error_literal (error, G_IO_ERROR, + G_IO_ERROR_CANCELLED, + _("Operation was cancelled")); + g_object_unref (msg); + return FALSE; + } else if (!io->async_wait && + g_cancellable_set_error_if_cancelled (cancellable, error)) { + g_object_unref (msg); + return FALSE; + } + + done = (io->read_state >= read_state && + io->write_state >= write_state); + + if (!blocking && !done) { + g_set_error_literal (error, G_IO_ERROR, + G_IO_ERROR_WOULD_BLOCK, + _("Operation would block")); + g_object_unref (msg); + return FALSE; + } + +#ifdef HAVE_SYSPROF + /* Allow profiling of network requests. */ + if (io->read_state == SOUP_MESSAGE_IO_STATE_DONE && + io->write_state == SOUP_MESSAGE_IO_STATE_DONE) { + GUri *uri = soup_message_get_uri (msg); + char *uri_str = g_uri_to_string_partial (uri, G_URI_HIDE_PASSWORD); + const gchar *last_modified = soup_message_headers_get_one (soup_message_get_response_headers (msg), "Last-Modified"); + const gchar *etag = soup_message_headers_get_one (soup_message_get_response_headers (msg), "ETag"); + const gchar *if_modified_since = soup_message_headers_get_one (soup_message_get_request_headers (msg), "If-Modified-Since"); + const gchar *if_none_match = soup_message_headers_get_one (soup_message_get_request_headers (msg), "If-None-Match"); + + /* FIXME: Expand and generalise sysprof support: + * https://gitlab.gnome.org/GNOME/sysprof/-/issues/43 */ + sysprof_collector_mark_printf (client_io->begin_time_nsec, + SYSPROF_CAPTURE_CURRENT_TIME - client_io->begin_time_nsec, + "libsoup", "message", + "%s request/response to %s: " + "read %" G_GOFFSET_FORMAT "B, " + "wrote %" G_GOFFSET_FORMAT "B, " + "If-Modified-Since: %s, " + "If-None-Match: %s, " + "Last-Modified: %s, " + "ETag: %s", + soup_message_get_tls_peer_certificate (msg) ? "HTTPS" : "HTTP", + uri_str, io->read_length, io->write_length, + (if_modified_since != NULL) ? if_modified_since : "(unset)", + (if_none_match != NULL) ? if_none_match : "(unset)", + (last_modified != NULL) ? last_modified : "(unset)", + (etag != NULL) ? etag : "(unset)"); + g_free (uri_str); + } +#endif /* HAVE_SYSPROF */ + + g_object_unref (msg); + return done; +} + +static void +soup_message_io_finish (SoupMessage *msg, + GError *error) +{ + if (request_is_restartable (msg, error)) { + SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg); + + /* Connection got closed, but we can safely try again. */ + io->item->state = SOUP_MESSAGE_RESTARTING; + } else if (error) { + soup_message_set_metrics_timestamp (msg, SOUP_MESSAGE_METRICS_RESPONSE_END); + } + + soup_message_io_finished (msg); +} + +static void soup_client_message_io_http1_run (SoupClientMessageIO *iface, gboolean blocking); + +static gboolean +io_run_ready (SoupMessage *msg, gpointer user_data) +{ + soup_client_message_io_http1_run (soup_message_get_io_data (msg), FALSE); + return FALSE; +} + +static void +soup_client_message_io_http1_run (SoupClientMessageIO *iface, + gboolean blocking) +{ + SoupClientMessageIOHTTP1 *client_io = (SoupClientMessageIOHTTP1 *)iface; + SoupMessageIOData *io = &client_io->base; + SoupMessage *msg = client_io->item->msg; + GError *error = NULL; + + if (io->io_source) { + g_source_destroy (io->io_source); + g_source_unref (io->io_source); + io->io_source = NULL; + } + + g_object_ref (msg); + + if (io_run_until (client_io, blocking, + SOUP_MESSAGE_IO_STATE_DONE, + SOUP_MESSAGE_IO_STATE_DONE, + client_io->item->cancellable, &error)) { + soup_message_io_finished (msg); + } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { + g_clear_error (&error); + io->io_source = soup_message_io_data_get_source (io, G_OBJECT (msg), + client_io->item->cancellable, + (SoupMessageIOSourceFunc)io_run_ready, + NULL); + g_source_set_priority (io->io_source, + soup_client_message_io_http1_get_priority (client_io)); + g_source_attach (io->io_source, g_main_context_get_thread_default ()); + } else { + if ((SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg) == client_io) + soup_message_io_finish (msg, error); + g_error_free (error); + + } + + g_object_unref (msg); +} + +static gboolean +soup_client_message_io_http1_run_until_read (SoupClientMessageIO *iface, + GCancellable *cancellable, + GError **error) +{ + SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface; + SoupMessage *msg = io->item->msg; + + if (io_run_until (io, TRUE, + SOUP_MESSAGE_IO_STATE_BODY, + SOUP_MESSAGE_IO_STATE_ANY, + cancellable, error)) + return TRUE; + + if ((SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg) == io) + soup_message_io_finish (msg, *error); + + return FALSE; +} + +static void io_run_until_read_async (SoupClientMessageIOHTTP1 *io, GTask *task); + +static gboolean +io_run_until_read_ready (SoupMessage *msg, + gpointer user_data) +{ + GTask *task = user_data; + + io_run_until_read_async ((SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg), task); + return FALSE; +} + +static void +io_run_until_read_async (SoupClientMessageIOHTTP1 *client_io, + GTask *task) +{ + SoupMessageIOData *io = &client_io->base; + SoupMessage *msg = client_io->item->msg; + GError *error = NULL; + + if (io->io_source) { + g_source_destroy (io->io_source); + g_source_unref (io->io_source); + io->io_source = NULL; + } + + if (io_run_until (client_io, FALSE, + SOUP_MESSAGE_IO_STATE_BODY, + SOUP_MESSAGE_IO_STATE_ANY, + g_task_get_cancellable (task), + &error)) { + g_task_return_boolean (task, TRUE); + g_object_unref (task); + return; + } + + if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { + g_error_free (error); + io->io_source = soup_message_io_data_get_source (io, G_OBJECT (msg), g_task_get_cancellable (task), + (SoupMessageIOSourceFunc)io_run_until_read_ready, + task); + g_source_set_priority (io->io_source, g_task_get_priority (task)); + g_source_attach (io->io_source, g_main_context_get_thread_default ()); + return; + } + + if ((SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg) == client_io) + soup_message_io_finish (msg, error); + + g_task_return_error (task, error); + g_object_unref (task); +} + +static void +soup_client_message_io_http1_run_until_read_async (SoupClientMessageIO *iface, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface; + SoupMessage *msg = io->item->msg; + GTask *task; + + task = g_task_new (msg, cancellable, callback, user_data); + g_task_set_priority (task, io_priority); + io_run_until_read_async (io, task); +} + +static gboolean +soup_client_message_io_http1_run_until_finish (SoupClientMessageIO *iface, + gboolean blocking, + GCancellable *cancellable, + GError **error) +{ + SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface; + SoupMessage *msg = io->item->msg; + gboolean success; + + g_object_ref (msg); + + if (io) { + if (io->base.read_state < SOUP_MESSAGE_IO_STATE_BODY_DONE) + io->base.read_state = SOUP_MESSAGE_IO_STATE_FINISHING; + } + + success = io_run_until (io, blocking, + SOUP_MESSAGE_IO_STATE_DONE, + SOUP_MESSAGE_IO_STATE_DONE, + cancellable, error); + + g_object_unref (msg); + return success; +} + +static void +client_stream_eof (SoupClientInputStream *stream, + SoupClientMessageIOHTTP1 *io) +{ + if (io && io->base.read_state == SOUP_MESSAGE_IO_STATE_BODY) + io->base.read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE; +} + +static GInputStream * +soup_client_message_io_http1_get_response_stream (SoupClientMessageIO *iface, + GError **error) +{ + SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface; + SoupMessage *msg = io->item->msg; + GInputStream *client_stream; + + client_stream = soup_client_input_stream_new (io->base.body_istream, msg); + g_signal_connect (client_stream, "eof", + G_CALLBACK (client_stream_eof), io); + + return client_stream; +} + +static void +soup_client_message_io_http1_send_item (SoupClientMessageIO *iface, + SoupMessageQueueItem *item, + SoupMessageIOCompletionFn completion_cb, + gpointer user_data) +{ + SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface; + + io->item = soup_message_queue_item_ref (item); + io->base.completion_cb = completion_cb; + io->base.completion_data = user_data; + + io->metrics = soup_message_get_metrics (io->item->msg); + if (io->metrics) { + g_signal_connect_object (io->base.istream, "read-data", + G_CALLBACK (response_network_stream_read_data_cb), + io->item->msg, G_CONNECT_SWAPPED); + } + +#ifdef HAVE_SYSPROF + io->begin_time_nsec = SYSPROF_CAPTURE_CURRENT_TIME; +#endif +} + +static void +soup_client_message_io_http1_pause (SoupClientMessageIO *iface) +{ + SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface; + + g_return_if_fail (io->base.read_state < SOUP_MESSAGE_IO_STATE_BODY); + + soup_message_io_data_pause (&io->base); +} + +static void +soup_client_message_io_http1_unpause (SoupClientMessageIO *iface) +{ + SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface; + + g_return_if_fail (io->base.read_state < SOUP_MESSAGE_IO_STATE_BODY); + io->base.paused = FALSE; +} + +static gboolean +soup_client_message_io_http1_is_paused (SoupClientMessageIO *iface) +{ + SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface; + + return io->base.paused; +} + +static const SoupClientMessageIOFuncs io_funcs = { + soup_client_message_io_http1_destroy, + soup_client_message_io_http1_finished, + soup_client_message_io_http1_stolen, + soup_client_message_io_http1_send_item, + soup_client_message_io_http1_get_response_stream, + soup_client_message_io_http1_pause, + soup_client_message_io_http1_unpause, + soup_client_message_io_http1_is_paused, + soup_client_message_io_http1_run, + soup_client_message_io_http1_run_until_read, + soup_client_message_io_http1_run_until_read_async, + soup_client_message_io_http1_run_until_finish +}; + +SoupClientMessageIO * +soup_client_message_io_http1_new (GIOStream *stream) +{ + SoupClientMessageIOHTTP1 *io; + + io = g_slice_new0 (SoupClientMessageIOHTTP1); + io->base.iostream = g_object_ref (stream); + io->base.istream = SOUP_FILTER_INPUT_STREAM (g_io_stream_get_input_stream (io->base.iostream)); + io->base.ostream = g_io_stream_get_output_stream (io->base.iostream); + + io->base.read_header_buf = g_byte_array_new (); + io->base.write_buf = g_string_new (NULL); + + io->base.read_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED; + io->base.write_state = SOUP_MESSAGE_IO_STATE_HEADERS; + + io->iface.funcs = &io_funcs; + + return (SoupClientMessageIO *)io; +} diff --git a/libsoup/soup-client-message-io-http1.h b/libsoup/soup-client-message-io-http1.h new file mode 100644 index 00000000..e749360d --- /dev/null +++ b/libsoup/soup-client-message-io-http1.h @@ -0,0 +1,10 @@ +/* -*- Mode: C; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 8 -*- */ +/* + * Copyright (C) 2021 Igalia S.L. + */ + +#pragma once + +#include "soup-client-message-io.h" + +SoupClientMessageIO *soup_client_message_io_http1_new (GIOStream *stream); diff --git a/libsoup/soup-connection.c b/libsoup/soup-connection.c index 8cd77925..d74a6398 100644 --- a/libsoup/soup-connection.c +++ b/libsoup/soup-connection.c @@ -13,6 +13,7 @@ #include "soup.h" #include "soup-io-stream.h" #include "soup-message-queue-item.h" +#include "soup-client-message-io-http1.h" #include "soup-socket-properties.h" #include "soup-private-enum-types.h" #include <gio/gnetworking.h> @@ -1047,7 +1048,7 @@ soup_connection_setup_message_io (SoupConnection *conn, priv->reusable = FALSE; g_assert (priv->io_data == NULL); - priv->io_data = soup_client_message_io_data_new (priv->iostream); + priv->io_data = soup_client_message_io_http1_new (priv->iostream); return priv->io_data; } diff --git a/libsoup/soup-message-io.c b/libsoup/soup-message-io.c deleted file mode 100644 index eba8fe8f..00000000 --- a/libsoup/soup-message-io.c +++ /dev/null @@ -1,1117 +0,0 @@ -/* -*- Mode: C; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 8 -*- */ -/* - * soup-message-io.c: HTTP message I/O - * - * Copyright (C) 2000-2003, Ximian, Inc. - */ - -#ifdef HAVE_CONFIG_H -#include "config.h" -#endif - -#include <glib/gi18n-lib.h> - -#ifdef HAVE_SYSPROF -#include <sysprof-capture.h> -#endif - -#include "soup.h" -#include "soup-body-input-stream.h" -#include "soup-body-output-stream.h" -#include "soup-client-input-stream.h" -#include "soup-connection.h" -#include "soup-content-processor.h" -#include "content-sniffer/soup-content-sniffer-stream.h" -#include "soup-filter-input-stream.h" -#include "soup-logger-private.h" -#include "soup-message-private.h" -#include "soup-message-metrics-private.h" -#include "soup-message-queue-item.h" -#include "soup-client-message-io.h" -#include "soup-misc.h" -#include "soup-uri-utils-private.h" - -typedef struct { - SoupClientMessageIO iface; - SoupMessageIOData base; - - SoupMessageQueueItem *item; - - SoupMessageMetrics *metrics; - -#ifdef HAVE_SYSPROF - gint64 begin_time_nsec; -#endif -} SoupClientMessageIOData; - -#define RESPONSE_BLOCK_SIZE 8192 -#define HEADER_SIZE_LIMIT (64 * 1024) - -static void -soup_client_message_io_data_destroy (SoupClientMessageIO *iface) -{ - SoupClientMessageIOData *io = (SoupClientMessageIOData *)iface; - - soup_message_io_data_cleanup (&io->base); - soup_message_queue_item_unref (io->item); - - g_slice_free (SoupClientMessageIOData, io); -} - -static int -soup_client_message_io_data_get_priority (SoupClientMessageIOData *io) -{ - if (!io->item->task) - return G_PRIORITY_DEFAULT; - - return g_task_get_priority (io->item->task); -} - -static void -soup_client_message_io_data_finished (SoupClientMessageIO *iface) -{ - SoupClientMessageIOData *io = (SoupClientMessageIOData *)iface; - SoupMessageIOCompletionFn completion_cb; - gpointer completion_data; - SoupMessageIOCompletion completion; - SoupMessage *msg; - - completion_cb = io->base.completion_cb; - completion_data = io->base.completion_data; - - if ((io->base.read_state >= SOUP_MESSAGE_IO_STATE_FINISHING && - io->base.write_state >= SOUP_MESSAGE_IO_STATE_FINISHING)) - completion = SOUP_MESSAGE_IO_COMPLETE; - else - completion = SOUP_MESSAGE_IO_INTERRUPTED; - - msg = g_object_ref (io->item->msg); - soup_connection_message_io_finished (soup_message_get_connection (msg), msg); - if (completion_cb) - completion_cb (G_OBJECT (msg), completion, completion_data); - g_object_unref (msg); -} - -static void -soup_client_message_io_data_stolen (SoupClientMessageIO *iface) -{ - SoupClientMessageIOData *io = (SoupClientMessageIOData *)iface; - SoupMessageIOCompletionFn completion_cb; - gpointer completion_data; - SoupMessage *msg; - - completion_cb = io->base.completion_cb; - completion_data = io->base.completion_data; - - msg = g_object_ref (io->item->msg); - soup_connection_message_io_finished (soup_message_get_connection (msg), msg); - if (completion_cb) - completion_cb (G_OBJECT (msg), SOUP_MESSAGE_IO_STOLEN, completion_data); - g_object_unref (msg); -} - -static gint -processing_stage_cmp (gconstpointer a, - gconstpointer b) -{ - SoupProcessingStage stage_a = soup_content_processor_get_processing_stage (SOUP_CONTENT_PROCESSOR ((gpointer)a)); - SoupProcessingStage stage_b = soup_content_processor_get_processing_stage (SOUP_CONTENT_PROCESSOR ((gpointer)b)); - - if (stage_a > stage_b) - return 1; - if (stage_a == stage_b) - return 0; - return -1; -} - -GInputStream * -soup_message_setup_body_istream (GInputStream *body_stream, - SoupMessage *msg, - SoupSession *session, - SoupProcessingStage start_at_stage) -{ - GInputStream *istream; - GSList *p, *processors; - - istream = g_object_ref (body_stream); - - processors = soup_session_get_features (session, SOUP_TYPE_CONTENT_PROCESSOR); - processors = g_slist_sort (processors, processing_stage_cmp); - - for (p = processors; p; p = p->next) { - GInputStream *wrapper; - SoupContentProcessor *processor; - - processor = SOUP_CONTENT_PROCESSOR (p->data); - if (soup_message_disables_feature (msg, p->data) || - soup_content_processor_get_processing_stage (processor) < start_at_stage) - continue; - - wrapper = soup_content_processor_wrap_input (processor, istream, msg, NULL); - if (wrapper) { - g_object_unref (istream); - istream = wrapper; - } - } - - g_slist_free (processors); - - return istream; -} - -static void -request_body_stream_wrote_data_cb (SoupMessage *msg, - const void *buffer, - guint count, - gboolean is_metadata) -{ - SoupClientMessageIOData *client_io = (SoupClientMessageIOData *)soup_message_get_io_data (msg); - - if (client_io->metrics) { - client_io->metrics->request_body_bytes_sent += count; - if (!is_metadata) - client_io->metrics->request_body_size += count; - } - - if (!is_metadata) - soup_message_wrote_body_data (msg, count); -} - -static void -request_body_stream_wrote_cb (GOutputStream *ostream, - GAsyncResult *result, - SoupMessage *msg) -{ - SoupClientMessageIOData *io; - gssize nwrote; - GCancellable *async_wait; - GError *error = NULL; - - nwrote = g_output_stream_splice_finish (ostream, result, &error); - - io = (SoupClientMessageIOData *)soup_message_get_io_data (msg); - if (!io || !io->base.async_wait || io->base.body_ostream != ostream) { - g_clear_error (&error); - g_object_unref (msg); - return; - } - - if (nwrote != -1) - io->base.write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH; - - if (error) - g_propagate_error (&io->base.async_error, error); - async_wait = io->base.async_wait; - io->base.async_wait = NULL; - g_cancellable_cancel (async_wait); - g_object_unref (async_wait); - - g_object_unref (msg); -} - -static void -closed_async (GObject *source, - GAsyncResult *result, - gpointer user_data) -{ - GOutputStream *body_ostream = G_OUTPUT_STREAM (source); - SoupMessage *msg = user_data; - SoupClientMessageIOData *io; - GCancellable *async_wait; - - io = (SoupClientMessageIOData *)soup_message_get_io_data (msg); - if (!io || !io->base.async_wait || io->base.body_ostream != body_ostream) { - g_object_unref (msg); - return; - } - - g_output_stream_close_finish (body_ostream, result, &io->base.async_error); - g_clear_object (&io->base.body_ostream); - - async_wait = io->base.async_wait; - io->base.async_wait = NULL; - g_cancellable_cancel (async_wait); - g_object_unref (async_wait); - - g_object_unref (msg); -} - -/* - * There are two request/response formats: the basic request/response, - * possibly with one or more unsolicited informational responses (such - * as the WebDAV "102 Processing" response): - * - * Client Server - * W:HEADERS / R:NOT_STARTED -> R:HEADERS / W:NOT_STARTED - * W:BODY / R:NOT_STARTED -> R:BODY / W:NOT_STARTED - * [W:DONE / R:HEADERS (1xx) <- R:DONE / W:HEADERS (1xx) ...] - * W:DONE / R:HEADERS <- R:DONE / W:HEADERS - * W:DONE / R:BODY <- R:DONE / W:BODY - * W:DONE / R:DONE R:DONE / W:DONE - * - * and the "Expect: 100-continue" request/response, with the client - * blocking halfway through its request, and then either continuing or - * aborting, depending on the server response: - * - * Client Server - * W:HEADERS / R:NOT_STARTED -> R:HEADERS / W:NOT_STARTED - * W:BLOCKING / R:HEADERS <- R:BLOCKING / W:HEADERS - * [W:BODY / R:BLOCKING -> R:BODY / W:BLOCKING] - * [W:DONE / R:HEADERS <- R:DONE / W:HEADERS] - * W:DONE / R:BODY <- R:DONE / W:BODY - * W:DONE / R:DONE R:DONE / W:DONE - */ - -static void -write_headers (SoupMessage *msg, - GString *header, - SoupEncoding *encoding) -{ - GUri *uri = soup_message_get_uri (msg); - char *uri_string; - SoupMessageHeadersIter iter; - const char *name, *value; - - if (soup_message_get_method (msg) == SOUP_METHOD_CONNECT) { - char *uri_host = soup_uri_get_host_for_headers (uri); - - /* CONNECT URI is hostname:port for tunnel destination */ - uri_string = g_strdup_printf ("%s:%d", uri_host, g_uri_get_port (uri)); - g_free (uri_host); - } else { - gboolean proxy = soup_connection_is_via_proxy (soup_message_get_connection (msg)); - - /* Proxy expects full URI to destination. Otherwise - * just the path. - */ - if (proxy) - uri_string = g_uri_to_string (uri); - else if (soup_message_get_is_options_ping (msg)) - uri_string = g_strdup ("*"); - else - uri_string = soup_uri_get_path_and_query (uri); - - if (proxy && g_uri_get_fragment (uri)) { - /* Strip fragment */ - char *fragment = strchr (uri_string, '#'); - if (fragment) - *fragment = '\0'; - } - } - - g_string_append_printf (header, "%s %s HTTP/1.%d\r\n", - soup_message_get_method (msg), uri_string, - (soup_message_get_http_version (msg) == SOUP_HTTP_1_0) ? 0 : 1); - g_free (uri_string); - - *encoding = soup_message_headers_get_encoding (soup_message_get_request_headers (msg)); - - soup_message_headers_iter_init (&iter, soup_message_get_request_headers (msg)); - while (soup_message_headers_iter_next (&iter, &name, &value)) - g_string_append_printf (header, "%s: %s\r\n", name, value); - g_string_append (header, "\r\n"); -} - -/* Attempts to push forward the writing side of @msg's I/O. Returns - * %TRUE if it manages to make some progress, and it is likely that - * further progress can be made. Returns %FALSE if it has reached a - * stopping point of some sort (need input from the application, - * socket not writable, write is complete, etc). - */ -static gboolean -io_write (SoupClientMessageIOData *client_io, - gboolean blocking, - GCancellable *cancellable, - GError **error) -{ - SoupMessageIOData *io = &client_io->base; - SoupMessage *msg = client_io->item->msg; - SoupSessionFeature *logger; - gssize nwrote; - - if (io->async_error) { - g_propagate_error (error, io->async_error); - io->async_error = NULL; - return FALSE; - } else if (io->async_wait) { - g_set_error_literal (error, G_IO_ERROR, - G_IO_ERROR_WOULD_BLOCK, - _("Operation would block")); - return FALSE; - } - - switch (io->write_state) { - case SOUP_MESSAGE_IO_STATE_HEADERS: - if (!io->write_buf->len) - write_headers (msg, io->write_buf, &io->write_encoding); - - while (io->written < io->write_buf->len) { - nwrote = g_pollable_stream_write (io->ostream, - io->write_buf->str + io->written, - io->write_buf->len - io->written, - blocking, - cancellable, error); - if (nwrote == -1) - return FALSE; - io->written += nwrote; - if (client_io->metrics) - client_io->metrics->request_header_bytes_sent += nwrote; - } - - io->written = 0; - g_string_truncate (io->write_buf, 0); - - if (io->write_encoding == SOUP_ENCODING_CONTENT_LENGTH) - io->write_length = soup_message_headers_get_content_length (soup_message_get_request_headers (msg)); - - if (soup_message_headers_get_expectations (soup_message_get_request_headers (msg)) & SOUP_EXPECTATION_CONTINUE) { - /* Need to wait for the Continue response */ - io->write_state = SOUP_MESSAGE_IO_STATE_BLOCKING; - io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS; - } else - io->write_state = SOUP_MESSAGE_IO_STATE_BODY_START; - - soup_message_wrote_headers (msg); - break; - - case SOUP_MESSAGE_IO_STATE_BODY_START: - io->body_ostream = soup_body_output_stream_new (io->ostream, - io->write_encoding, - io->write_length); - io->write_state = SOUP_MESSAGE_IO_STATE_BODY; - logger = soup_session_get_feature_for_message (client_io->item->session, - SOUP_TYPE_LOGGER, msg); - if (logger) { - soup_logger_request_body_setup (SOUP_LOGGER (logger), msg, - SOUP_BODY_OUTPUT_STREAM (io->body_ostream)); - } - break; - - case SOUP_MESSAGE_IO_STATE_BODY: - if (!io->write_length && - io->write_encoding != SOUP_ENCODING_EOF && - io->write_encoding != SOUP_ENCODING_CHUNKED) { - io->write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH; - break; - } - - if (soup_message_get_request_body_stream (msg)) { - g_signal_connect_object (io->body_ostream, - "wrote-data", - G_CALLBACK (request_body_stream_wrote_data_cb), - msg, G_CONNECT_SWAPPED); - if (blocking) { - nwrote = g_output_stream_splice (io->body_ostream, - soup_message_get_request_body_stream (msg), - G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE, - cancellable, - error); - if (nwrote == -1) - return FALSE; - io->write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH; - break; - } else { - io->async_wait = g_cancellable_new (); - g_output_stream_splice_async (io->body_ostream, - soup_message_get_request_body_stream (msg), - G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE, - soup_client_message_io_data_get_priority (client_io), - cancellable, - (GAsyncReadyCallback)request_body_stream_wrote_cb, - g_object_ref (msg)); - return FALSE; - } - } else - io->write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH; - break; - - case SOUP_MESSAGE_IO_STATE_BODY_FLUSH: - if (io->body_ostream) { - if (blocking || io->write_encoding != SOUP_ENCODING_CHUNKED) { - if (!g_output_stream_close (io->body_ostream, cancellable, error)) - return FALSE; - g_clear_object (&io->body_ostream); - } else { - io->async_wait = g_cancellable_new (); - g_output_stream_close_async (io->body_ostream, - soup_client_message_io_data_get_priority (client_io), - cancellable, - closed_async, g_object_ref (msg)); - } - } - - io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE; - break; - - case SOUP_MESSAGE_IO_STATE_BODY_DONE: - io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING; - soup_message_wrote_body (msg); - break; - - case SOUP_MESSAGE_IO_STATE_FINISHING: - io->write_state = SOUP_MESSAGE_IO_STATE_DONE; - io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS; - break; - - default: - g_return_val_if_reached (FALSE); - } - - return TRUE; -} - -static gboolean -parse_headers (SoupMessage *msg, - char *headers, - guint headers_len, - SoupEncoding *encoding, - GError **error) -{ - SoupHTTPVersion version; - char *reason_phrase; - SoupStatus status; - - soup_message_set_reason_phrase (msg, NULL); - - if (!soup_headers_parse_response (headers, headers_len, - soup_message_get_response_headers (msg), - &version, - &status, - &reason_phrase)) { - g_set_error_literal (error, SOUP_SESSION_ERROR, - SOUP_SESSION_ERROR_PARSING, - _("Could not parse HTTP response")); - return FALSE; - } - - soup_message_set_status (msg, status, reason_phrase); - g_free (reason_phrase); - - if (version < soup_message_get_http_version (msg)) - soup_message_set_http_version (msg, version); - - if ((soup_message_get_method (msg) == SOUP_METHOD_HEAD || - soup_message_get_status (msg) == SOUP_STATUS_NO_CONTENT || - soup_message_get_status (msg) == SOUP_STATUS_NOT_MODIFIED || - SOUP_STATUS_IS_INFORMATIONAL (soup_message_get_status (msg))) || - (soup_message_get_method (msg) == SOUP_METHOD_CONNECT && - SOUP_STATUS_IS_SUCCESSFUL (soup_message_get_status (msg)))) - *encoding = SOUP_ENCODING_NONE; - else - *encoding = soup_message_headers_get_encoding (soup_message_get_response_headers (msg)); - - if (*encoding == SOUP_ENCODING_UNRECOGNIZED) { - g_set_error_literal (error, SOUP_SESSION_ERROR, - SOUP_SESSION_ERROR_ENCODING, - _("Unrecognized HTTP response encoding")); - return FALSE; - } - - return TRUE; -} - -static void -response_network_stream_read_data_cb (SoupMessage *msg, - guint count) -{ - SoupClientMessageIOData *client_io = (SoupClientMessageIOData *)soup_message_get_io_data (msg); - - if (client_io->base.read_state < SOUP_MESSAGE_IO_STATE_BODY_START) - client_io->metrics->response_header_bytes_received += count; - else - client_io->metrics->response_body_bytes_received += count; -} - -/* Attempts to push forward the reading side of @msg's I/O. Returns - * %TRUE if it manages to make some progress, and it is likely that - * further progress can be made. Returns %FALSE if it has reached a - * stopping point of some sort (need input from the application, - * socket not readable, read is complete, etc). - */ -static gboolean -io_read (SoupClientMessageIOData *client_io, - gboolean blocking, - GCancellable *cancellable, - GError **error) -{ - SoupMessageIOData *io = &client_io->base; - SoupMessage *msg = client_io->item->msg; - gboolean succeeded; - gboolean is_first_read; - gushort extra_bytes; - - switch (io->read_state) { - case SOUP_MESSAGE_IO_STATE_HEADERS: - is_first_read = io->read_header_buf->len == 0 && - soup_message_get_status (msg) == SOUP_STATUS_NONE; - - if (!soup_message_io_data_read_headers (io, blocking, cancellable, &extra_bytes, error)) - return FALSE; - - if (client_io->metrics) { - /* Adjust the header and body bytes received, since we might - * have read part of the body already that is queued by the stream. - */ - if (client_io->metrics->response_header_bytes_received > io->read_header_buf->len + extra_bytes) { - client_io->metrics->response_body_bytes_received = - client_io->metrics->response_header_bytes_received - io->read_header_buf->len - extra_bytes; - client_io->metrics->response_header_bytes_received -= client_io->metrics->response_body_bytes_received; - } - } - - if (is_first_read) - soup_message_set_metrics_timestamp (msg, SOUP_MESSAGE_METRICS_RESPONSE_START); - - succeeded = parse_headers (msg, - (char *)io->read_header_buf->data, - io->read_header_buf->len, - &io->read_encoding, - error); - g_byte_array_set_size (io->read_header_buf, 0); - - if (!succeeded) { - /* Either we couldn't parse the headers, or they - * indicated something that would mean we wouldn't - * be able to parse the body. (Eg, unknown - * Transfer-Encoding.). Skip the rest of the - * reading, and make sure the connection gets - * closed when we're done. - */ - soup_message_headers_append (soup_message_get_request_headers (msg), - "Connection", "close"); - soup_message_set_metrics_timestamp (msg, SOUP_MESSAGE_METRICS_RESPONSE_END); - io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING; - break; - } - - if (SOUP_STATUS_IS_INFORMATIONAL (soup_message_get_status (msg))) { - if (soup_message_get_status (msg) == SOUP_STATUS_CONTINUE && - io->write_state == SOUP_MESSAGE_IO_STATE_BLOCKING) { - /* Pause the reader, unpause the writer */ - io->read_state = - SOUP_MESSAGE_IO_STATE_BLOCKING; - io->write_state = - SOUP_MESSAGE_IO_STATE_BODY_START; - } else { - /* Just stay in HEADERS */ - io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS; - } - - /* Informational responses have no bodies, so - * bail out here rather than parsing encoding, etc - */ - soup_message_got_informational (msg); - - /* If this was "101 Switching Protocols", then - * the session may have stolen the connection... - */ - if (client_io != (SoupClientMessageIOData *)soup_message_get_io_data (msg)) - return FALSE; - - soup_message_cleanup_response (msg); - break; - } else { - io->read_state = SOUP_MESSAGE_IO_STATE_BODY_START; - - /* If the client was waiting for a Continue - * but got something else, then it's done - * writing. - */ - if (io->write_state == SOUP_MESSAGE_IO_STATE_BLOCKING) - io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING; - } - - if (io->read_encoding == SOUP_ENCODING_CONTENT_LENGTH) { - io->read_length = soup_message_headers_get_content_length (soup_message_get_response_headers (msg)); - - if (!soup_message_is_keepalive (msg)) { - /* Some servers suck and send - * incorrect Content-Length values, so - * allow EOF termination in this case - * (iff the message is too short) too. - */ - io->read_encoding = SOUP_ENCODING_EOF; - } - } else - io->read_length = -1; - - soup_message_got_headers (msg); - break; - - case SOUP_MESSAGE_IO_STATE_BODY_START: - if (!io->body_istream) { - GInputStream *body_istream = soup_body_input_stream_new (G_INPUT_STREAM (io->istream), - io->read_encoding, - io->read_length); - - io->body_istream = soup_message_setup_body_istream (body_istream, msg, - client_io->item->session, - SOUP_STAGE_MESSAGE_BODY); - g_object_unref (body_istream); - } - - if (soup_message_get_content_sniffer (msg)) { - SoupContentSnifferStream *sniffer_stream = SOUP_CONTENT_SNIFFER_STREAM (io->body_istream); - const char *content_type; - GHashTable *params; - - if (!soup_content_sniffer_stream_is_ready (sniffer_stream, blocking, - cancellable, error)) - return FALSE; - - content_type = soup_content_sniffer_stream_sniff (sniffer_stream, ¶ms); - soup_message_content_sniffed (msg, content_type, params); - } - - io->read_state = SOUP_MESSAGE_IO_STATE_BODY; - break; - - case SOUP_MESSAGE_IO_STATE_BODY: { - guchar buf[RESPONSE_BLOCK_SIZE]; - gssize nread; - - nread = g_pollable_stream_read (io->body_istream, - buf, - RESPONSE_BLOCK_SIZE, - blocking, - cancellable, error); - if (nread == -1) - return FALSE; - - if (nread == 0) - io->read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE; - - if (client_io->metrics) - client_io->metrics->response_body_size += nread; - - break; - } - - case SOUP_MESSAGE_IO_STATE_BODY_DONE: - io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING; - soup_message_set_metrics_timestamp (msg, SOUP_MESSAGE_METRICS_RESPONSE_END); - soup_message_got_body (msg); - break; - - case SOUP_MESSAGE_IO_STATE_FINISHING: - io->read_state = SOUP_MESSAGE_IO_STATE_DONE; - break; - - default: - g_return_val_if_reached (FALSE); - } - - return TRUE; -} - -static gboolean -request_is_restartable (SoupMessage *msg, GError *error) -{ - SoupClientMessageIOData *client_io = (SoupClientMessageIOData *)soup_message_get_io_data (msg); - SoupMessageIOData *io; - - if (!client_io) - return FALSE; - - io = &client_io->base; - - return (io->read_state <= SOUP_MESSAGE_IO_STATE_HEADERS && - io->read_header_buf->len == 0 && - soup_connection_get_ever_used (soup_message_get_connection (client_io->item->msg)) && - !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT) && - !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) && - !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) && - error->domain != G_TLS_ERROR && - SOUP_METHOD_IS_IDEMPOTENT (soup_message_get_method (msg))); -} - -static gboolean -io_run_until (SoupClientMessageIOData *client_io, - gboolean blocking, - SoupMessageIOState read_state, - SoupMessageIOState write_state, - GCancellable *cancellable, - GError **error) -{ - SoupMessageIOData *io = &client_io->base; - SoupMessage *msg = client_io->item->msg; - gboolean progress = TRUE, done; - GError *my_error = NULL; - - if (g_cancellable_set_error_if_cancelled (cancellable, error)) - return FALSE; - else if (!io) { - g_set_error_literal (error, G_IO_ERROR, - G_IO_ERROR_CANCELLED, - _("Operation was cancelled")); - return FALSE; - } - - g_object_ref (msg); - - while (progress && (SoupClientMessageIOData *)soup_message_get_io_data (msg) == client_io && - !io->paused && !io->async_wait && - (io->read_state < read_state || io->write_state < write_state)) { - - if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state)) - progress = io_read (client_io, blocking, cancellable, &my_error); - else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state)) - progress = io_write (client_io, blocking, cancellable, &my_error); - else - progress = FALSE; - } - - if (my_error) { - g_propagate_error (error, my_error); - g_object_unref (msg); - return FALSE; - } else if ((SoupClientMessageIOData *)soup_message_get_io_data (msg) != client_io) { - g_set_error_literal (error, G_IO_ERROR, - G_IO_ERROR_CANCELLED, - _("Operation was cancelled")); - g_object_unref (msg); - return FALSE; - } else if (!io->async_wait && - g_cancellable_set_error_if_cancelled (cancellable, error)) { - g_object_unref (msg); - return FALSE; - } - - done = (io->read_state >= read_state && - io->write_state >= write_state); - - if (!blocking && !done) { - g_set_error_literal (error, G_IO_ERROR, - G_IO_ERROR_WOULD_BLOCK, - _("Operation would block")); - g_object_unref (msg); - return FALSE; - } - -#ifdef HAVE_SYSPROF - /* Allow profiling of network requests. */ - if (io->read_state == SOUP_MESSAGE_IO_STATE_DONE && - io->write_state == SOUP_MESSAGE_IO_STATE_DONE) { - GUri *uri = soup_message_get_uri (msg); - char *uri_str = g_uri_to_string_partial (uri, G_URI_HIDE_PASSWORD); - const gchar *last_modified = soup_message_headers_get_one (soup_message_get_response_headers (msg), "Last-Modified"); - const gchar *etag = soup_message_headers_get_one (soup_message_get_response_headers (msg), "ETag"); - const gchar *if_modified_since = soup_message_headers_get_one (soup_message_get_request_headers (msg), "If-Modified-Since"); - const gchar *if_none_match = soup_message_headers_get_one (soup_message_get_request_headers (msg), "If-None-Match"); - - /* FIXME: Expand and generalise sysprof support: - * https://gitlab.gnome.org/GNOME/sysprof/-/issues/43 */ - sysprof_collector_mark_printf (client_io->begin_time_nsec, - SYSPROF_CAPTURE_CURRENT_TIME - client_io->begin_time_nsec, - "libsoup", "message", - "%s request/response to %s: " - "read %" G_GOFFSET_FORMAT "B, " - "wrote %" G_GOFFSET_FORMAT "B, " - "If-Modified-Since: %s, " - "If-None-Match: %s, " - "Last-Modified: %s, " - "ETag: %s", - soup_message_get_tls_peer_certificate (msg) ? "HTTPS" : "HTTP", - uri_str, io->read_length, io->write_length, - (if_modified_since != NULL) ? if_modified_since : "(unset)", - (if_none_match != NULL) ? if_none_match : "(unset)", - (last_modified != NULL) ? last_modified : "(unset)", - (etag != NULL) ? etag : "(unset)"); - g_free (uri_str); - } -#endif /* HAVE_SYSPROF */ - - g_object_unref (msg); - return done; -} - -static void -soup_message_io_finish (SoupMessage *msg, - GError *error) -{ - if (request_is_restartable (msg, error)) { - SoupClientMessageIOData *io = (SoupClientMessageIOData *)soup_message_get_io_data (msg); - - /* Connection got closed, but we can safely try again. */ - io->item->state = SOUP_MESSAGE_RESTARTING; - } else if (error) { - soup_message_set_metrics_timestamp (msg, SOUP_MESSAGE_METRICS_RESPONSE_END); - } - - soup_message_io_finished (msg); -} - -static void soup_client_message_io_data_run (SoupClientMessageIO *iface, gboolean blocking); - -static gboolean -io_run_ready (SoupMessage *msg, gpointer user_data) -{ - soup_client_message_io_data_run (soup_message_get_io_data (msg), FALSE); - return FALSE; -} - -static void -soup_client_message_io_data_run (SoupClientMessageIO *iface, - gboolean blocking) -{ - SoupClientMessageIOData *client_io = (SoupClientMessageIOData *)iface; - SoupMessageIOData *io = &client_io->base; - SoupMessage *msg = client_io->item->msg; - GError *error = NULL; - - if (io->io_source) { - g_source_destroy (io->io_source); - g_source_unref (io->io_source); - io->io_source = NULL; - } - - g_object_ref (msg); - - if (io_run_until (client_io, blocking, - SOUP_MESSAGE_IO_STATE_DONE, - SOUP_MESSAGE_IO_STATE_DONE, - client_io->item->cancellable, &error)) { - soup_message_io_finished (msg); - } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { - g_clear_error (&error); - io->io_source = soup_message_io_data_get_source (io, G_OBJECT (msg), - client_io->item->cancellable, - (SoupMessageIOSourceFunc)io_run_ready, - NULL); - g_source_set_priority (io->io_source, - soup_client_message_io_data_get_priority (client_io)); - g_source_attach (io->io_source, g_main_context_get_thread_default ()); - } else { - if ((SoupClientMessageIOData *)soup_message_get_io_data (msg) == client_io) - soup_message_io_finish (msg, error); - g_error_free (error); - - } - - g_object_unref (msg); -} - -static gboolean -soup_client_message_io_data_run_until_read (SoupClientMessageIO *iface, - GCancellable *cancellable, - GError **error) -{ - SoupClientMessageIOData *io = (SoupClientMessageIOData *)iface; - SoupMessage *msg = io->item->msg; - - if (io_run_until (io, TRUE, - SOUP_MESSAGE_IO_STATE_BODY, - SOUP_MESSAGE_IO_STATE_ANY, - cancellable, error)) - return TRUE; - - if ((SoupClientMessageIOData *)soup_message_get_io_data (msg) == io) - soup_message_io_finish (msg, *error); - - return FALSE; -} - -static void io_run_until_read_async (SoupClientMessageIOData *io, GTask *task); - -static gboolean -io_run_until_read_ready (SoupMessage *msg, - gpointer user_data) -{ - GTask *task = user_data; - - io_run_until_read_async ((SoupClientMessageIOData *)soup_message_get_io_data (msg), task); - return FALSE; -} - -static void -io_run_until_read_async (SoupClientMessageIOData *client_io, - GTask *task) -{ - SoupMessageIOData *io = &client_io->base; - SoupMessage *msg = client_io->item->msg; - GError *error = NULL; - - if (io->io_source) { - g_source_destroy (io->io_source); - g_source_unref (io->io_source); - io->io_source = NULL; - } - - if (io_run_until (client_io, FALSE, - SOUP_MESSAGE_IO_STATE_BODY, - SOUP_MESSAGE_IO_STATE_ANY, - g_task_get_cancellable (task), - &error)) { - g_task_return_boolean (task, TRUE); - g_object_unref (task); - return; - } - - if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { - g_error_free (error); - io->io_source = soup_message_io_data_get_source (io, G_OBJECT (msg), g_task_get_cancellable (task), - (SoupMessageIOSourceFunc)io_run_until_read_ready, - task); - g_source_set_priority (io->io_source, g_task_get_priority (task)); - g_source_attach (io->io_source, g_main_context_get_thread_default ()); - return; - } - - if ((SoupClientMessageIOData *)soup_message_get_io_data (msg) == client_io) - soup_message_io_finish (msg, error); - - g_task_return_error (task, error); - g_object_unref (task); -} - -static void -soup_client_message_io_data_run_until_read_async (SoupClientMessageIO *iface, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer user_data) -{ - SoupClientMessageIOData *io = (SoupClientMessageIOData *)iface; - SoupMessage *msg = io->item->msg; - GTask *task; - - task = g_task_new (msg, cancellable, callback, user_data); - g_task_set_priority (task, io_priority); - io_run_until_read_async (io, task); -} - -static gboolean -soup_client_message_io_data_run_until_finish (SoupClientMessageIO *iface, - gboolean blocking, - GCancellable *cancellable, - GError **error) -{ - SoupClientMessageIOData *io = (SoupClientMessageIOData *)iface; - SoupMessage *msg = io->item->msg; - gboolean success; - - g_object_ref (msg); - - if (io) { - if (io->base.read_state < SOUP_MESSAGE_IO_STATE_BODY_DONE) - io->base.read_state = SOUP_MESSAGE_IO_STATE_FINISHING; - } - - success = io_run_until (io, blocking, - SOUP_MESSAGE_IO_STATE_DONE, - SOUP_MESSAGE_IO_STATE_DONE, - cancellable, error); - - g_object_unref (msg); - return success; -} - -static void -client_stream_eof (SoupClientInputStream *stream, - SoupClientMessageIOData *io) -{ - if (io && io->base.read_state == SOUP_MESSAGE_IO_STATE_BODY) - io->base.read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE; -} - -static GInputStream * -soup_client_message_io_data_get_response_stream (SoupClientMessageIO *iface, - GError **error) -{ - SoupClientMessageIOData *io = (SoupClientMessageIOData *)iface; - SoupMessage *msg = io->item->msg; - GInputStream *client_stream; - - client_stream = soup_client_input_stream_new (io->base.body_istream, msg); - g_signal_connect (client_stream, "eof", - G_CALLBACK (client_stream_eof), io); - - return client_stream; -} - -static void -soup_client_message_io_data_send_item (SoupClientMessageIO *iface, - SoupMessageQueueItem *item, - SoupMessageIOCompletionFn completion_cb, - gpointer user_data) -{ - SoupClientMessageIOData *io = (SoupClientMessageIOData *)iface; - - io->item = soup_message_queue_item_ref (item); - io->base.completion_cb = completion_cb; - io->base.completion_data = user_data; - - io->metrics = soup_message_get_metrics (io->item->msg); - if (io->metrics) { - g_signal_connect_object (io->base.istream, "read-data", - G_CALLBACK (response_network_stream_read_data_cb), - io->item->msg, G_CONNECT_SWAPPED); - } - -#ifdef HAVE_SYSPROF - io->begin_time_nsec = SYSPROF_CAPTURE_CURRENT_TIME; -#endif -} - -static void -soup_client_message_io_data_pause (SoupClientMessageIO *iface) -{ - SoupClientMessageIOData *io = (SoupClientMessageIOData *)iface; - - g_return_if_fail (io->base.read_state < SOUP_MESSAGE_IO_STATE_BODY); - - soup_message_io_data_pause (&io->base); -} - -static void -soup_client_message_io_data_unpause (SoupClientMessageIO *iface) -{ - SoupClientMessageIOData *io = (SoupClientMessageIOData *)iface; - - g_return_if_fail (io->base.read_state < SOUP_MESSAGE_IO_STATE_BODY); - io->base.paused = FALSE; -} - -static gboolean -soup_client_message_io_data_is_paused (SoupClientMessageIO *iface) -{ - SoupClientMessageIOData *io = (SoupClientMessageIOData *)iface; - - return io->base.paused; -} - -static const SoupClientMessageIOFuncs io_funcs = { - soup_client_message_io_data_destroy, - soup_client_message_io_data_finished, - soup_client_message_io_data_stolen, - soup_client_message_io_data_send_item, - soup_client_message_io_data_get_response_stream, - soup_client_message_io_data_pause, - soup_client_message_io_data_unpause, - soup_client_message_io_data_is_paused, - soup_client_message_io_data_run, - soup_client_message_io_data_run_until_read, - soup_client_message_io_data_run_until_read_async, - soup_client_message_io_data_run_until_finish -}; - -SoupClientMessageIO * -soup_client_message_io_data_new (GIOStream *stream) -{ - SoupClientMessageIOData *io; - - io = g_slice_new0 (SoupClientMessageIOData); - io->base.iostream = g_object_ref (stream); - io->base.istream = SOUP_FILTER_INPUT_STREAM (g_io_stream_get_input_stream (io->base.iostream)); - io->base.ostream = g_io_stream_get_output_stream (io->base.iostream); - - io->base.read_header_buf = g_byte_array_new (); - io->base.write_buf = g_string_new (NULL); - - io->base.read_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED; - io->base.write_state = SOUP_MESSAGE_IO_STATE_HEADERS; - - io->iface.funcs = &io_funcs; - - return (SoupClientMessageIO *)io; -} diff --git a/libsoup/soup-message-private.h b/libsoup/soup-message-private.h index d8cfa0bb..4efba24e 100644 --- a/libsoup/soup-message-private.h +++ b/libsoup/soup-message-private.h @@ -45,7 +45,6 @@ SoupAuth *soup_message_get_proxy_auth (SoupMessage *msg); GUri *soup_message_get_uri_for_auth (SoupMessage *msg); /* I/O */ -SoupClientMessageIO *soup_client_message_io_data_new (GIOStream *stream); void soup_message_io_run (SoupMessage *msg, gboolean blocking); void soup_message_io_finished (SoupMessage *msg); |