summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Eissing <stefan@eissing.org>2023-03-30 12:13:49 +0200
committerDaniel Stenberg <daniel@haxx.se>2023-03-30 23:11:26 +0200
commit744dcf22fac6cf12a9112df106b61864982afef9 (patch)
tree7fa14c9fc43196508e9c08327184601496118c9b
parent4ced75b7cee64cdf797eaa24a76d29324029ed15 (diff)
downloadcurl-744dcf22fac6cf12a9112df106b61864982afef9.tar.gz
http2: flow control and buffer improvements
- use bufq for send/receive of network data - usd bufq for send/receive of stream data - use HTTP/2 flow control with no-auto updates to control the amount of data we are buffering for a stream HTTP/2 stream window set to 128K after local tests, defined code constant for now - elminiating PAUSEing nghttp2 processing when receiving data since a stream can now take in all DATA nghttp2 forwards Improved scorecard and adjuste http2 stream window sizes - scorecard improved output formatting and options default - scorecard now also benchmarks small requests / second Closes #10771
-rw-r--r--lib/bufq.c35
-rw-r--r--lib/bufq.h4
-rw-r--r--lib/http.c8
-rw-r--r--lib/http.h17
-rw-r--r--lib/http2.c1097
-rw-r--r--tests/data/test18002
-rw-r--r--tests/http/scorecard.py254
-rw-r--r--tests/http/test_07_upload.py19
-rw-r--r--tests/http/testenv/caddy.py4
-rw-r--r--tests/http/testenv/env.py2
10 files changed, 706 insertions, 736 deletions
diff --git a/lib/bufq.c b/lib/bufq.c
index e0d772661..f0ab6bb75 100644
--- a/lib/bufq.c
+++ b/lib/bufq.c
@@ -287,12 +287,6 @@ bool Curl_bufq_is_full(const struct bufq *q)
return chunk_is_full(q->tail);
}
-static size_t data_pass_size(struct bufq *q)
-{
- (void)q;
- return 4*1024;
-}
-
static struct buf_chunk *get_spare(struct bufq *q)
{
struct buf_chunk *chunk = NULL;
@@ -426,9 +420,12 @@ ssize_t Curl_bufq_read(struct bufq *q, unsigned char *buf, size_t len,
return nread;
}
-bool Curl_bufq_peek(const struct bufq *q,
+bool Curl_bufq_peek(struct bufq *q,
const unsigned char **pbuf, size_t *plen)
{
+ if(q->head && chunk_is_empty(q->head)) {
+ prune_head(q);
+ }
if(q->head && !chunk_is_empty(q->head)) {
chunk_peek(q->head, pbuf, plen);
return TRUE;
@@ -438,7 +435,7 @@ bool Curl_bufq_peek(const struct bufq *q,
return FALSE;
}
-bool Curl_bufq_peek_at(const struct bufq *q, size_t offset,
+bool Curl_bufq_peek_at(struct bufq *q, size_t offset,
const unsigned char **pbuf, size_t *plen)
{
struct buf_chunk *c = q->head;
@@ -502,13 +499,11 @@ ssize_t Curl_bufq_write_pass(struct bufq *q,
CURLcode *err)
{
ssize_t nwritten = 0, n;
- bool prefer_direct = (len >= data_pass_size(q));
*err = CURLE_OK;
while(len) {
- if(Curl_bufq_is_full(q) || (!Curl_bufq_is_empty(q) && prefer_direct)) {
- /* try to make room in case we are full
- * or empty the buffer when adding "large" data */
+ if(Curl_bufq_is_full(q)) {
+ /* try to make room in case we are full */
n = Curl_bufq_pass(q, writer, writer_ctx, err);
if(n < 0) {
if(*err != CURLE_AGAIN) {
@@ -519,22 +514,6 @@ ssize_t Curl_bufq_write_pass(struct bufq *q,
}
}
- if(Curl_bufq_is_empty(q) && prefer_direct) {
- /* empty and `data` is "large", try passing directly */
- n = writer(writer_ctx, buf, len, err);
- if(n < 0) {
- if(*err != CURLE_AGAIN) {
- /* real error, fail */
- return -1;
- }
- /* passing would block */
- n = 0;
- }
- buf += (size_t)n;
- len -= (size_t)n;
- nwritten += (size_t)n;
- }
-
if(len) {
/* Add whatever is remaining now to bufq */
n = Curl_bufq_write(q, buf, len, err);
diff --git a/lib/bufq.h b/lib/bufq.h
index a4ca21ece..09af226a9 100644
--- a/lib/bufq.h
+++ b/lib/bufq.h
@@ -201,10 +201,10 @@ ssize_t Curl_bufq_read(struct bufq *q, unsigned char *buf, size_t len,
* Repeated calls return the same information until the buffer queue
* is modified, see `Curl_bufq_skip()``
*/
-bool Curl_bufq_peek(const struct bufq *q,
+bool Curl_bufq_peek(struct bufq *q,
const unsigned char **pbuf, size_t *plen);
-bool Curl_bufq_peek_at(const struct bufq *q, size_t offset,
+bool Curl_bufq_peek_at(struct bufq *q, size_t offset,
const unsigned char **pbuf, size_t *plen);
/**
diff --git a/lib/http.c b/lib/http.c
index bcaa79487..cc2f5f057 100644
--- a/lib/http.c
+++ b/lib/http.c
@@ -4556,7 +4556,8 @@ CURLcode Curl_http_req_make(struct http_req **preq,
if(!req->path)
goto out;
}
- Curl_dynhds_init(&req->headers, 128, DYN_H2_HEADERS);
+ Curl_dynhds_init(&req->headers, 0, DYN_H2_HEADERS);
+ Curl_dynhds_init(&req->trailers, 0, DYN_H2_TRAILERS);
result = CURLE_OK;
out:
@@ -4573,6 +4574,7 @@ void Curl_http_req_free(struct http_req *req)
free(req->authority);
free(req->path);
Curl_dynhds_free(&req->headers);
+ Curl_dynhds_free(&req->trailers);
free(req);
}
}
@@ -4594,7 +4596,8 @@ CURLcode Curl_http_resp_make(struct http_resp **presp,
if(!resp->description)
goto out;
}
- Curl_dynhds_init(&resp->headers, 128, DYN_H2_HEADERS);
+ Curl_dynhds_init(&resp->headers, 0, DYN_H2_HEADERS);
+ Curl_dynhds_init(&resp->trailers, 0, DYN_H2_TRAILERS);
result = CURLE_OK;
out:
@@ -4609,6 +4612,7 @@ void Curl_http_resp_free(struct http_resp *resp)
if(resp) {
free(resp->description);
Curl_dynhds_free(&resp->headers);
+ Curl_dynhds_free(&resp->trailers);
if(resp->prev)
Curl_http_resp_free(resp->prev);
free(resp);
diff --git a/lib/http.h b/lib/http.h
index 5f4fcb904..b9a2e6149 100644
--- a/lib/http.h
+++ b/lib/http.h
@@ -29,6 +29,7 @@
#include <pthread.h>
#endif
+#include "bufq.h"
#include "dynhds.h"
#include "ws.h"
@@ -227,14 +228,12 @@ struct HTTP {
#ifdef USE_NGHTTP2
/*********** for HTTP/2 we store stream-local data here *************/
int32_t stream_id; /* stream we are interested in */
-
- /* We store non-final and final response headers here, per-stream */
- struct dynbuf header_recvbuf;
- size_t nread_header_recvbuf; /* number of bytes in header_recvbuf fed into
- upper layer */
- struct dynbuf trailer_recvbuf;
- const uint8_t *pausedata; /* pointer to data received in on_data_chunk */
- size_t pauselen; /* the number of bytes left in data */
+ struct bufq h2_sendbuf; /* request body data buffere for sending */
+ size_t h2_send_hds_len; /* amount of bytes in first cf_send() that
+ are header bytes. Or 0 if not known. */
+ struct bufq h2_recvbuf;
+ size_t h2_recv_hds_len; /* how many bytes in recvbuf are headers */
+ struct dynhds resp_trailers;
bool close_handled; /* TRUE if stream closure is handled by libcurl */
char **push_headers; /* allocated array */
@@ -346,6 +345,7 @@ struct http_req {
char *authority;
char *path;
struct dynhds headers;
+ struct dynhds trailers;
};
/**
@@ -366,6 +366,7 @@ struct http_resp {
int status;
char *description;
struct dynhds headers;
+ struct dynhds trailers;
struct http_resp *prev;
};
diff --git a/lib/http2.c b/lib/http2.c
index b0ce87d98..f43462d70 100644
--- a/lib/http2.c
+++ b/lib/http2.c
@@ -27,6 +27,7 @@
#ifdef USE_NGHTTP2
#include <nghttp2/nghttp2.h>
#include "urldata.h"
+#include "bufq.h"
#include "http2.h"
#include "http.h"
#include "sendf.h"
@@ -48,8 +49,6 @@
#include "curl_memory.h"
#include "memdebug.h"
-#define H2_BUFSIZE 32768
-
#if (NGHTTP2_VERSION_NUM < 0x010c00)
#error too old nghttp2 version, upgrade!
#endif
@@ -62,8 +61,23 @@
#define NGHTTP2_HAS_SET_LOCAL_WINDOW_SIZE 1
#endif
-#define HTTP2_HUGE_WINDOW_SIZE (32 * 1024 * 1024) /* 32 MB */
+/* buffer dimensioning:
+ * use 16K as chunk size, as that fits H2 DATA frames well */
+#define H2_CHUNK_SIZE (16 * 1024)
+/* this is how much we want "in flight" for a stream */
+#define H2_STREAM_WINDOW_SIZE (512 * 1024)
+/* on receving from TLS, we prep for holding a full stream window */
+#define H2_NW_RECV_CHUNKS (H2_STREAM_WINDOW_SIZE / H2_CHUNK_SIZE)
+/* on send into TLS, we just want to accumulate small frames */
+#define H2_NW_SEND_CHUNKS 1
+/* stream recv/send chunks are a result of window / chunk sizes */
+#define H2_STREAM_RECV_CHUNKS (H2_STREAM_WINDOW_SIZE / H2_CHUNK_SIZE)
+#define H2_STREAM_SEND_CHUNKS (H2_STREAM_WINDOW_SIZE / H2_CHUNK_SIZE)
+/* spare chunks we keep for a full window */
+#define H2_STREAM_POOL_SPARES (H2_STREAM_WINDOW_SIZE / H2_CHUNK_SIZE)
+
+#define HTTP2_HUGE_WINDOW_SIZE (16 * H2_STREAM_WINDOW_SIZE)
#define H2_SETTINGS_IV_LEN 3
#define H2_BINSETTINGS_LEN 80
@@ -75,7 +89,7 @@ static int populate_settings(nghttp2_settings_entry *iv,
iv[0].value = Curl_multi_max_concurrent_streams(data->multi);
iv[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
- iv[1].value = HTTP2_HUGE_WINDOW_SIZE;
+ iv[1].value = H2_STREAM_WINDOW_SIZE;
iv[2].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH;
iv[2].value = data->multi->push_cb != NULL;
@@ -101,22 +115,14 @@ struct cf_h2_ctx {
/* The easy handle used in the current filter call, cleared at return */
struct cf_call_data call_data;
- char *inbuf; /* buffer to receive data from underlying socket */
- size_t inbuflen; /* number of bytes filled in inbuf */
- size_t nread_inbuf; /* number of bytes read from in inbuf */
-
- struct dynbuf outbuf;
+ struct bufq inbufq; /* network input */
+ struct bufq outbufq; /* network output */
+ struct bufc_pool stream_bufcp; /* spares for stream buffers */
- /* We need separate buffer for transmission and reception because we
- may call nghttp2_session_send() after the
- nghttp2_session_mem_recv() but mem buffer is still not full. In
- this case, we wrongly sends the content of mem buffer if we share
- them for both cases. */
- int32_t pause_stream_id; /* stream ID which paused
- nghttp2_session_mem_recv */
size_t drain_total; /* sum of all stream's UrlState.drain */
int32_t goaway_error;
int32_t last_stream_id;
+ BIT(conn_closed);
BIT(goaway);
BIT(enable_push);
};
@@ -133,8 +139,9 @@ static void cf_h2_ctx_clear(struct cf_h2_ctx *ctx)
if(ctx->h2) {
nghttp2_session_del(ctx->h2);
}
- free(ctx->inbuf);
- Curl_dyn_free(&ctx->outbuf);
+ Curl_bufq_free(&ctx->inbufq);
+ Curl_bufq_free(&ctx->outbufq);
+ Curl_bufcp_free(&ctx->stream_bufcp);
memset(ctx, 0, sizeof(*ctx));
ctx->call_data = save;
}
@@ -151,22 +158,42 @@ static int h2_client_new(struct Curl_cfilter *cf,
nghttp2_session_callbacks *cbs)
{
struct cf_h2_ctx *ctx = cf->ctx;
-
-#if NGHTTP2_VERSION_NUM < 0x013200
- /* before 1.50.0 */
- return nghttp2_session_client_new(&ctx->h2, cbs, cf);
-#else
nghttp2_option *o;
+
int rc = nghttp2_option_new(&o);
if(rc)
return rc;
+ /* We handle window updates ourself to enfore buffer limits */
+ nghttp2_option_set_no_auto_window_update(o, 1);
+#if NGHTTP2_VERSION_NUM >= 0x013200
+ /* with 1.50.0 */
/* turn off RFC 9113 leading and trailing white spaces validation against
HTTP field value. */
nghttp2_option_set_no_rfc9113_leading_and_trailing_ws_validation(o, 1);
+#endif
rc = nghttp2_session_client_new2(&ctx->h2, cbs, cf, o);
nghttp2_option_del(o);
return rc;
-#endif
+}
+
+static ssize_t nw_in_reader(void *reader_ctx,
+ unsigned char *buf, size_t buflen,
+ CURLcode *err)
+{
+ struct Curl_cfilter *cf = reader_ctx;
+ struct Curl_easy *data = CF_DATA_CURRENT(cf);
+
+ return Curl_conn_cf_recv(cf->next, data, (char *)buf, buflen, err);
+}
+
+static ssize_t nw_out_writer(void *writer_ctx,
+ const unsigned char *buf, size_t buflen,
+ CURLcode *err)
+{
+ struct Curl_cfilter *cf = writer_ctx;
+ struct Curl_easy *data = CF_DATA_CURRENT(cf);
+
+ return Curl_conn_cf_send(cf->next, data, (const char *)buf, buflen, err);
}
static ssize_t send_callback(nghttp2_session *h2,
@@ -204,6 +231,7 @@ static void multi_connchanged(struct Curl_multi *multi)
static CURLcode http2_data_setup(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
+ struct cf_h2_ctx *ctx = cf->ctx;
struct HTTP *stream = data->req.p.http;
(void)cf;
@@ -212,22 +240,19 @@ static CURLcode http2_data_setup(struct Curl_cfilter *cf,
stream->stream_id = -1;
- Curl_dyn_init(&stream->header_recvbuf, DYN_H2_HEADERS);
- Curl_dyn_init(&stream->trailer_recvbuf, DYN_H2_TRAILERS);
-
+ Curl_bufq_initp(&stream->h2_sendbuf, &ctx->stream_bufcp,
+ H2_STREAM_SEND_CHUNKS, BUFQ_OPT_NONE);
+ Curl_bufq_initp(&stream->h2_recvbuf, &ctx->stream_bufcp,
+ H2_STREAM_RECV_CHUNKS, BUFQ_OPT_SOFT_LIMIT);
+ Curl_dynhds_init(&stream->resp_trailers, 0, DYN_H2_TRAILERS);
+ stream->h2_send_hds_len = 0;
+ stream->h2_recv_hds_len = 0;
stream->bodystarted = FALSE;
stream->status_code = -1;
- stream->pausedata = NULL;
- stream->pauselen = 0;
stream->closed = FALSE;
stream->close_handled = FALSE;
- stream->memlen = 0;
stream->error = NGHTTP2_NO_ERROR;
stream->upload_left = 0;
- stream->upload_mem = NULL;
- stream->upload_len = 0;
- stream->mem = data->state.buffer;
- stream->len = data->set.buffer_size;
return CURLE_OK;
}
@@ -246,11 +271,10 @@ static CURLcode cf_h2_ctx_init(struct Curl_cfilter *cf,
nghttp2_session_callbacks *cbs = NULL;
DEBUGASSERT(!ctx->h2);
- ctx->inbuf = malloc(H2_BUFSIZE);
- if(!ctx->inbuf)
- goto out;
- /* we want to aggregate small frames, SETTINGS, PRIO, UPDATES */
- Curl_dyn_init(&ctx->outbuf, 4*1024);
+ Curl_bufcp_init(&ctx->stream_bufcp, H2_CHUNK_SIZE, H2_STREAM_POOL_SPARES);
+ Curl_bufq_initp(&ctx->inbufq, &ctx->stream_bufcp, H2_NW_RECV_CHUNKS, 0);
+ Curl_bufq_initp(&ctx->outbufq, &ctx->stream_bufcp, H2_NW_SEND_CHUNKS, 0);
+ ctx->last_stream_id = 2147483647;
rc = nghttp2_session_callbacks_new(&cbs);
if(rc) {
@@ -345,19 +369,14 @@ out:
return result;
}
-static CURLcode h2_session_send(struct Curl_cfilter *cf,
- struct Curl_easy *data);
-static int h2_process_pending_input(struct Curl_cfilter *cf,
- struct Curl_easy *data,
- CURLcode *err);
-
/*
* http2_stream_free() free HTTP2 stream related data
*/
static void http2_stream_free(struct HTTP *stream)
{
if(stream) {
- Curl_dyn_free(&stream->header_recvbuf);
+ Curl_bufq_free(&stream->h2_sendbuf);
+ Curl_bufq_free(&stream->h2_recvbuf);
for(; stream->push_headers_used > 0; --stream->push_headers_used) {
free(stream->push_headers[stream->push_headers_used - 1]);
}
@@ -376,6 +395,54 @@ static int should_close_session(struct cf_h2_ctx *ctx)
}
/*
+ * Processes pending input left in network input buffer.
+ * This function returns 0 if it succeeds, or -1 and error code will
+ * be assigned to *err.
+ */
+static int h2_process_pending_input(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ CURLcode *err)
+{
+ struct cf_h2_ctx *ctx = cf->ctx;
+ const unsigned char *buf;
+ size_t blen;
+ ssize_t rv;
+
+ while(Curl_bufq_peek(&ctx->inbufq, &buf, &blen)) {
+
+ rv = nghttp2_session_mem_recv(ctx->h2, (const uint8_t *)buf, blen);
+ DEBUGF(LOG_CF(data, cf,
+ "fed %zu bytes from nw to nghttp2 -> %zd", blen, rv));
+ if(rv < 0) {
+ failf(data,
+ "process_pending_input: nghttp2_session_mem_recv() returned "
+ "%zd:%s", rv, nghttp2_strerror((int)rv));
+ *err = CURLE_RECV_ERROR;
+ return -1;
+ }
+ Curl_bufq_skip(&ctx->inbufq, (size_t)rv);
+ if(Curl_bufq_is_empty(&ctx->inbufq)) {
+ DEBUGF(LOG_CF(data, cf, "all data in connection buffer processed"));
+ break;
+ }
+ else {
+ DEBUGF(LOG_CF(data, cf, "process_pending_input: %zu bytes left "
+ "in connection buffer", Curl_bufq_len(&ctx->inbufq)));
+ }
+ }
+
+ if(nghttp2_session_check_request_allowed(ctx->h2) == 0) {
+ /* No more requests are allowed in the current session, so
+ the connection may not be reused. This is set when a
+ GOAWAY frame has been received or when the limit of stream
+ identifiers has been reached. */
+ connclose(cf->conn, "http/2: No new requests allowed");
+ }
+
+ return 0;
+}
+
+/*
* The server may send us data at any point (e.g. PING frames). Therefore,
* we cannot assume that an HTTP/2 socket is dead just because it is readable.
*
@@ -401,13 +468,10 @@ static bool http2_connisalive(struct Curl_cfilter *cf, struct Curl_easy *data,
*input_pending = FALSE;
Curl_attach_connection(data, cf->conn);
- nread = Curl_conn_cf_recv(cf->next, data,
- ctx->inbuf, H2_BUFSIZE, &result);
+ nread = Curl_bufq_slurp(&ctx->inbufq, nw_in_reader, cf, &result);
if(nread != -1) {
- DEBUGF(LOG_CF(data, cf, "%d bytes stray data read before trying "
- "h2 connection", (int)nread));
- ctx->nread_inbuf = 0;
- ctx->inbuflen = nread;
+ DEBUGF(LOG_CF(data, cf, "%zd bytes stray data read before trying "
+ "h2 connection", nread));
if(h2_process_pending_input(cf, data, &result) < 0)
/* immediate error, considered dead */
alive = FALSE;
@@ -456,30 +520,23 @@ void Curl_http2_ver(char *p, size_t len)
(void)msnprintf(p, len, "nghttp2/%s", h2->version_str);
}
-static CURLcode flush_output(struct Curl_cfilter *cf,
+static CURLcode nw_out_flush(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct cf_h2_ctx *ctx = cf->ctx;
- size_t buflen = Curl_dyn_len(&ctx->outbuf);
- ssize_t written;
+ ssize_t nwritten;
CURLcode result;
- if(!buflen)
+ (void)data;
+ if(Curl_bufq_is_empty(&ctx->outbufq))
return CURLE_OK;
- DEBUGF(LOG_CF(data, cf, "h2 conn flush %zu bytes", buflen));
- written = Curl_conn_cf_send(cf->next, data, Curl_dyn_ptr(&ctx->outbuf),
- buflen, &result);
- if(written < 0) {
+ DEBUGF(LOG_CF(data, cf, "h2 conn flush %zu bytes",
+ Curl_bufq_len(&ctx->outbufq)));
+ nwritten = Curl_bufq_pass(&ctx->outbufq, nw_out_writer, cf, &result);
+ if(nwritten < 0 && result != CURLE_AGAIN) {
return result;
}
- if((size_t)written < buflen) {
- Curl_dyn_tail(&ctx->outbuf, buflen - (size_t)written);
- return CURLE_AGAIN;
- }
- else {
- Curl_dyn_reset(&ctx->outbuf);
- }
return CURLE_OK;
}
@@ -495,49 +552,27 @@ static ssize_t send_callback(nghttp2_session *h2,
struct Curl_cfilter *cf = userp;
struct cf_h2_ctx *ctx = cf->ctx;
struct Curl_easy *data = CF_DATA_CURRENT(cf);
- ssize_t written;
+ ssize_t nwritten;
CURLcode result = CURLE_OK;
- size_t buflen = Curl_dyn_len(&ctx->outbuf);
(void)h2;
(void)flags;
DEBUGASSERT(data);
- if(blen < 1024 && (buflen + blen + 1 < ctx->outbuf.toobig)) {
- result = Curl_dyn_addn(&ctx->outbuf, buf, blen);
- if(result) {
- failf(data, "Failed to add data to output buffer");
- return NGHTTP2_ERR_CALLBACK_FAILURE;
- }
- return blen;
- }
- if(buflen) {
- /* not adding, flush buffer */
- result = flush_output(cf, data);
- if(result) {
- if(result == CURLE_AGAIN) {
- return NGHTTP2_ERR_WOULDBLOCK;
- }
- failf(data, "Failed sending HTTP2 data");
- return NGHTTP2_ERR_CALLBACK_FAILURE;
+ nwritten = Curl_bufq_write_pass(&ctx->outbufq, buf, blen,
+ nw_out_writer, cf, &result);
+ if(nwritten < 0) {
+ if(result == CURLE_AGAIN) {
+ return NGHTTP2_ERR_WOULDBLOCK;
}
- }
-
- DEBUGF(LOG_CF(data, cf, "h2 conn send %zu bytes", blen));
- written = Curl_conn_cf_send(cf->next, data, buf, blen, &result);
- if(result == CURLE_AGAIN) {
- return NGHTTP2_ERR_WOULDBLOCK;
- }
-
- if(written == -1) {
failf(data, "Failed sending HTTP2 data");
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
- if(!written)
+ if(!nwritten)
return NGHTTP2_ERR_WOULDBLOCK;
- return written;
+ return nwritten;
}
@@ -779,17 +814,21 @@ static int push_promise(struct Curl_cfilter *cf,
}
rv = nghttp2_session_set_stream_user_data(ctx->h2,
- frame->promised_stream_id,
+ newstream->stream_id,
newhandle);
if(rv) {
infof(data, "failed to set user_data for stream %u",
- frame->promised_stream_id);
+ newstream->stream_id);
DEBUGASSERT(0);
rv = CURL_PUSH_DENY;
goto fail;
}
- Curl_dyn_init(&newstream->header_recvbuf, DYN_H2_HEADERS);
- Curl_dyn_init(&newstream->trailer_recvbuf, DYN_H2_TRAILERS);
+ Curl_bufq_initp(&newstream->h2_sendbuf, &ctx->stream_bufcp,
+ H2_STREAM_SEND_CHUNKS, BUFQ_OPT_NONE);
+ Curl_bufq_initp(&newstream->h2_recvbuf, &ctx->stream_bufcp,
+ H2_STREAM_RECV_CHUNKS, BUFQ_OPT_SOFT_LIMIT);
+ newstream->h2_send_hds_len = 0;
+ Curl_dynhds_init(&newstream->resp_trailers, 0, DYN_H2_TRAILERS);
}
else {
DEBUGF(LOG_CF(data, cf, "Got PUSH_PROMISE, ignore it"));
@@ -799,6 +838,25 @@ static int push_promise(struct Curl_cfilter *cf,
return rv;
}
+static CURLcode recvbuf_write_hds(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ const char *buf, size_t blen)
+{
+ struct HTTP *stream = data->req.p.http;
+ ssize_t nwritten;
+ CURLcode result;
+
+ (void)cf;
+ nwritten = Curl_bufq_write(&stream->h2_recvbuf,
+ (const unsigned char *)buf, blen, &result);
+ if(nwritten < 0)
+ return result;
+ stream->h2_recv_hds_len += (size_t)nwritten;
+ /* TODO: make sure recvbuf is more flexible with overflow */
+ DEBUGASSERT((size_t)nwritten == blen);
+ return CURLE_OK;
+}
+
static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame,
void *userp)
{
@@ -808,7 +866,6 @@ static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame,
struct HTTP *stream = NULL;
struct Curl_easy *data = CF_DATA_CURRENT(cf);
int rv;
- size_t left, ncopy;
int32_t stream_id = frame->hd.stream_id;
CURLcode result;
@@ -841,6 +898,8 @@ static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame,
ctx->goaway_error = frame->goaway.error_code;
ctx->last_stream_id = frame->goaway.last_stream_id;
if(data) {
+ DEBUGF(LOG_CF(data, cf, "recv GOAWAY, error=%d, last_stream=%u",
+ ctx->goaway_error, ctx->last_stream_id));
infof(data, "recveived GOAWAY, error=%d, last_stream=%u",
ctx->goaway_error, ctx->last_stream_id);
multi_connchanged(data->multi);
@@ -882,7 +941,7 @@ static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame,
if(frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
/* Stream has ended. If there is pending data, ensure that read
will occur to consume it. */
- if(!data->state.drain && stream->memlen) {
+ if(!data->state.drain && !Curl_bufq_is_empty(&stream->h2_recvbuf)) {
drain_this(cf, data_s);
Curl_expire(data, 0, EXPIRE_RUN_NOW);
}
@@ -908,29 +967,16 @@ static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame,
stream->status_code = -1;
}
- result = Curl_dyn_addn(&stream->header_recvbuf, STRCONST("\r\n"));
+ result = recvbuf_write_hds(cf, data_s, STRCONST("\r\n"));
if(result)
return NGHTTP2_ERR_CALLBACK_FAILURE;
- left = Curl_dyn_len(&stream->header_recvbuf) -
- stream->nread_header_recvbuf;
- ncopy = CURLMIN(stream->len, left);
-
- memcpy(&stream->mem[stream->memlen],
- Curl_dyn_ptr(&stream->header_recvbuf) +
- stream->nread_header_recvbuf,
- ncopy);
- stream->nread_header_recvbuf += ncopy;
-
- DEBUGASSERT(stream->mem);
- DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] %zu header bytes, at %p",
- stream_id, ncopy, (void *)stream->mem));
-
- stream->len -= ncopy;
- stream->memlen += ncopy;
-
- drain_this(cf, data_s);
- Curl_expire(data_s, 0, EXPIRE_RUN_NOW);
+ DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] %zu header bytes",
+ stream_id, Curl_bufq_len(&stream->h2_recvbuf)));
+ if(CF_DATA_CURRENT(cf) != data_s) {
+ drain_this(cf, data_s);
+ Curl_expire(data_s, 0, EXPIRE_RUN_NOW);
+ }
break;
case NGHTTP2_PUSH_PROMISE:
DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] recv PUSH_PROMISE", stream_id));
@@ -980,10 +1026,10 @@ static int on_data_chunk_recv(nghttp2_session *session, uint8_t flags,
const uint8_t *mem, size_t len, void *userp)
{
struct Curl_cfilter *cf = userp;
- struct cf_h2_ctx *ctx = cf->ctx;
struct HTTP *stream;
struct Curl_easy *data_s;
- size_t nread;
+ ssize_t nwritten;
+ CURLcode result;
(void)flags;
DEBUGASSERT(stream_id); /* should never be a zero stream ID here */
@@ -997,6 +1043,8 @@ static int on_data_chunk_recv(nghttp2_session *session, uint8_t flags,
in the pipeline. Silently ignore. */
DEBUGF(LOG_CF(CF_DATA_CURRENT(cf), cf, "[h2sid=%u] Data for unknown",
stream_id));
+ /* consumed explicitly as no one will read it */
+ nghttp2_session_consume(session, stream_id, len);
return 0;
}
@@ -1004,11 +1052,13 @@ static int on_data_chunk_recv(nghttp2_session *session, uint8_t flags,
if(!stream)
return NGHTTP2_ERR_CALLBACK_FAILURE;
- nread = CURLMIN(stream->len, len);
- memcpy(&stream->mem[stream->memlen], mem, nread);
+ nwritten = Curl_bufq_write(&stream->h2_recvbuf, mem, len, &result);
+ if(nwritten < 0) {
+ if(result != CURLE_AGAIN)
+ return NGHTTP2_ERR_CALLBACK_FAILURE;
- stream->len -= nread;
- stream->memlen += nread;
+ nwritten = 0;
+ }
/* if we receive data for another handle, wake that up */
if(CF_DATA_CURRENT(cf) != data_s) {
@@ -1016,20 +1066,10 @@ static int on_data_chunk_recv(nghttp2_session *session, uint8_t flags,
Curl_expire(data_s, 0, EXPIRE_RUN_NOW);
}
- DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] %zu DATA recvd, "
- "(buffer now holds %zu, %zu still free in %p)",
- stream_id, nread,
- stream->memlen, stream->len, (void *)stream->mem));
-
- if(nread < len) {
- stream->pausedata = mem + nread;
- stream->pauselen = len - nread;
- DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] %zu not recvd -> NGHTTP2_ERR_PAUSE",
- stream_id, len - nread));
- ctx->pause_stream_id = stream_id;
- drain_this(cf, data_s);
- return NGHTTP2_ERR_PAUSE;
- }
+ DEBUGASSERT((size_t)nwritten == len);
+ DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] %zd/%zu DATA recvd, "
+ "(buffer now holds %zu)",
+ stream_id, nwritten, len, Curl_bufq_len(&stream->h2_recvbuf)));
return 0;
}
@@ -1038,7 +1078,6 @@ static int on_stream_close(nghttp2_session *session, int32_t stream_id,
uint32_t error_code, void *userp)
{
struct Curl_cfilter *cf = userp;
- struct cf_h2_ctx *ctx = cf->ctx;
struct Curl_easy *data_s;
struct HTTP *stream;
int rv;
@@ -1074,11 +1113,6 @@ static int on_stream_close(nghttp2_session *session, int32_t stream_id,
stream_id);
DEBUGASSERT(0);
}
- if(stream_id == ctx->pause_stream_id) {
- DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] closed the pause stream",
- stream_id));
- ctx->pause_stream_id = 0;
- }
DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] closed now", stream_id));
return 0;
}
@@ -1110,33 +1144,6 @@ static int on_begin_headers(nghttp2_session *session,
return 0;
}
-/* Decode HTTP status code. Returns -1 if no valid status code was
- decoded. */
-static int decode_status_code(const uint8_t *value, size_t len)
-{
- int i;
- int res;
-
- if(len != 3) {
- return -1;
- }
-
- res = 0;
-
- for(i = 0; i < 3; ++i) {
- char c = value[i];
-
- if(c < '0' || c > '9') {
- return -1;
- }
-
- res *= 10;
- res += c - '0';
- }
-
- return res;
-}
-
/* frame->hd.type is either NGHTTP2_HEADERS or NGHTTP2_PUSH_PROMISE */
static int on_header(nghttp2_session *session, const nghttp2_frame *frame,
const uint8_t *name, size_t namelen,
@@ -1234,9 +1241,9 @@ static int on_header(nghttp2_session *session, const nghttp2_frame *frame,
stream->stream_id,
(int)namelen, name,
(int)valuelen, value));
- result = Curl_dyn_addf(&stream->trailer_recvbuf,
- "%.*s: %.*s\r\n", (int)namelen, name,
- (int)valuelen, value);
+ result = Curl_dynhds_add(&stream->resp_trailers,
+ (const char *)name, namelen,
+ (const char *)value, valuelen);
if(result)
return NGHTTP2_ERR_CALLBACK_FAILURE;
@@ -1245,25 +1252,25 @@ static int on_header(nghttp2_session *session, const nghttp2_frame *frame,
if(namelen == sizeof(H2H3_PSEUDO_STATUS) - 1 &&
memcmp(H2H3_PSEUDO_STATUS, name, namelen) == 0) {
- /* nghttp2 guarantees :status is received first and only once, and
- value is 3 digits status code, and decode_status_code always
- succeeds. */
+ /* nghttp2 guarantees :status is received first and only once. */
char buffer[32];
- stream->status_code = decode_status_code(value, valuelen);
- DEBUGASSERT(stream->status_code != -1);
+ result = Curl_http_decode_status(&stream->status_code,
+ (const char *)value, valuelen);
+ if(result)
+ return NGHTTP2_ERR_CALLBACK_FAILURE;
msnprintf(buffer, sizeof(buffer), H2H3_PSEUDO_STATUS ":%u\r",
stream->status_code);
result = Curl_headers_push(data_s, buffer, CURLH_PSEUDO);
if(result)
return NGHTTP2_ERR_CALLBACK_FAILURE;
- result = Curl_dyn_addn(&stream->header_recvbuf, STRCONST("HTTP/2 "));
+ result = recvbuf_write_hds(cf, data_s, STRCONST("HTTP/2 "));
if(result)
return NGHTTP2_ERR_CALLBACK_FAILURE;
- result = Curl_dyn_addn(&stream->header_recvbuf, value, valuelen);
+ result = recvbuf_write_hds(cf, data_s, (const char *)value, valuelen);
if(result)
return NGHTTP2_ERR_CALLBACK_FAILURE;
/* the space character after the status code is mandatory */
- result = Curl_dyn_addn(&stream->header_recvbuf, STRCONST(" \r\n"));
+ result = recvbuf_write_hds(cf, data_s, STRCONST(" \r\n"));
if(result)
return NGHTTP2_ERR_CALLBACK_FAILURE;
/* if we receive data for another handle, wake that up */
@@ -1278,16 +1285,16 @@ static int on_header(nghttp2_session *session, const nghttp2_frame *frame,
/* nghttp2 guarantees that namelen > 0, and :status was already
received, and this is not pseudo-header field . */
/* convert to an HTTP1-style header */
- result = Curl_dyn_addn(&stream->header_recvbuf, name, namelen);
+ result = recvbuf_write_hds(cf, data_s, (const char *)name, namelen);
if(result)
return NGHTTP2_ERR_CALLBACK_FAILURE;
- result = Curl_dyn_addn(&stream->header_recvbuf, STRCONST(": "));
+ result = recvbuf_write_hds(cf, data_s, STRCONST(": "));
if(result)
return NGHTTP2_ERR_CALLBACK_FAILURE;
- result = Curl_dyn_addn(&stream->header_recvbuf, value, valuelen);
+ result = recvbuf_write_hds(cf, data_s, (const char *)value, valuelen);
if(result)
return NGHTTP2_ERR_CALLBACK_FAILURE;
- result = Curl_dyn_addn(&stream->header_recvbuf, STRCONST("\r\n"));
+ result = recvbuf_write_hds(cf, data_s, STRCONST("\r\n"));
if(result)
return NGHTTP2_ERR_CALLBACK_FAILURE;
/* if we receive data for another handle, wake that up */
@@ -1302,17 +1309,18 @@ static int on_header(nghttp2_session *session, const nghttp2_frame *frame,
return 0; /* 0 is successful */
}
-static ssize_t data_source_read_callback(nghttp2_session *session,
- int32_t stream_id,
- uint8_t *buf, size_t length,
- uint32_t *data_flags,
- nghttp2_data_source *source,
- void *userp)
+static ssize_t req_body_read_callback(nghttp2_session *session,
+ int32_t stream_id,
+ uint8_t *buf, size_t length,
+ uint32_t *data_flags,
+ nghttp2_data_source *source,
+ void *userp)
{
struct Curl_cfilter *cf = userp;
struct Curl_easy *data_s;
struct HTTP *stream = NULL;
- size_t nread;
+ CURLcode result;
+ ssize_t nread;
(void)source;
(void)cf;
@@ -1332,23 +1340,25 @@ static ssize_t data_source_read_callback(nghttp2_session *session,
else
return NGHTTP2_ERR_INVALID_ARGUMENT;
- nread = CURLMIN(stream->upload_len, length);
- if(nread > 0) {
- memcpy(buf, stream->upload_mem, nread);
- stream->upload_mem += nread;
- stream->upload_len -= nread;
- if(data_s->state.infilesize != -1)
- stream->upload_left -= nread;
+ nread = Curl_bufq_read(&stream->h2_sendbuf, buf, length, &result);
+ if(nread < 0) {
+ if(result != CURLE_AGAIN)
+ return NGHTTP2_ERR_CALLBACK_FAILURE;
+ nread = 0;
}
+ if(nread > 0 && data_s->state.infilesize != -1)
+ stream->upload_left -= nread;
+
+ DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] req_body_read(len=%zu) left=%zd"
+ " -> %zd, %d",
+ stream_id, length, stream->upload_left, nread, result));
+
if(stream->upload_left == 0)
*data_flags = NGHTTP2_DATA_FLAG_EOF;
else if(nread == 0)
return NGHTTP2_ERR_DEFERRED;
- DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] data_source_read_callback: "
- "returns %zu bytes", stream_id, nread));
-
return nread;
}
@@ -1374,8 +1384,9 @@ static void http2_data_done(struct Curl_cfilter *cf,
/* there might be allocated resources done before this got the 'h2' pointer
setup */
- Curl_dyn_free(&stream->header_recvbuf);
- Curl_dyn_free(&stream->trailer_recvbuf);
+ Curl_bufq_free(&stream->h2_sendbuf);
+ Curl_bufq_free(&stream->h2_recvbuf);
+ Curl_dynhds_free(&stream->resp_trailers);
if(stream->push_headers) {
/* if they weren't used and then freed before */
for(; stream->push_headers_used > 0; --stream->push_headers_used) {
@@ -1388,24 +1399,17 @@ static void http2_data_done(struct Curl_cfilter *cf,
if(!ctx || !ctx->h2)
return;
- /* do this before the reset handling, as that might clear ->stream_id */
- if(stream->stream_id && stream->stream_id == ctx->pause_stream_id) {
- DEBUGF(LOG_CF(data, cf, "[h2sid=%u] DONE, the pause stream",
- stream->stream_id));
- ctx->pause_stream_id = 0;
- }
-
(void)premature;
if(!stream->closed && stream->stream_id) {
/* RST_STREAM */
- DEBUGF(LOG_CF(data, cf, "[h2sid=%u] RST", stream->stream_id));
+ DEBUGF(LOG_CF(data, cf, "[h2sid=%u] premature DATA_DONE, RST stream",
+ stream->stream_id));
if(!nghttp2_submit_rst_stream(ctx->h2, NGHTTP2_FLAG_NONE,
stream->stream_id, NGHTTP2_STREAM_CLOSED))
(void)nghttp2_session_send(ctx->h2);
}
- if(data->state.drain)
- drained_transfer(cf, data);
+ drained_transfer(cf, data);
/* -1 means unassigned and 0 means cleared */
if(nghttp2_session_get_stream_user_data(ctx->h2, stream->stream_id)) {
@@ -1458,78 +1462,6 @@ CURLcode Curl_http2_request_upgrade(struct dynbuf *req,
return result;
}
-/*
- * h2_process_pending_input() processes pending input left in
- * httpc->inbuf. Then, call h2_session_send() to send pending data.
- * This function returns 0 if it succeeds, or -1 and error code will
- * be assigned to *err.
- */
-static int h2_process_pending_input(struct Curl_cfilter *cf,
- struct Curl_easy *data,
- CURLcode *err)
-{
- struct cf_h2_ctx *ctx = cf->ctx;
- ssize_t nread;
- ssize_t rv;
-
- nread = ctx->inbuflen - ctx->nread_inbuf;
- if(nread) {
- char *inbuf = ctx->inbuf + ctx->nread_inbuf;
-
- rv = nghttp2_session_mem_recv(ctx->h2, (const uint8_t *)inbuf, nread);
- if(rv < 0) {
- failf(data,
- "h2_process_pending_input: nghttp2_session_mem_recv() returned "
- "%zd:%s", rv, nghttp2_strerror((int)rv));
- *err = CURLE_RECV_ERROR;
- return -1;
- }
-
- if(nread == rv) {
- DEBUGF(LOG_CF(data, cf, "all data in connection buffer processed"));
- ctx->inbuflen = 0;
- ctx->nread_inbuf = 0;
- }
- else {
- ctx->nread_inbuf += rv;
- DEBUGF(LOG_CF(data, cf, "h2_process_pending_input: %zu bytes left "
- "in connection buffer",
- ctx->inbuflen - ctx->nread_inbuf));
- }
- }
-
- rv = h2_session_send(cf, data);
- if(rv) {
- *err = CURLE_SEND_ERROR;
- return -1;
- }
-
- if(nghttp2_session_check_request_allowed(ctx->h2) == 0) {
- /* No more requests are allowed in the current session, so
- the connection may not be reused. This is set when a
- GOAWAY frame has been received or when the limit of stream
- identifiers has been reached. */
- connclose(cf->conn, "http/2: No new requests allowed");
- }
-
- if(should_close_session(ctx)) {
- struct HTTP *stream = data->req.p.http;
- DEBUGF(LOG_CF(data, cf,
- "h2_process_pending_input: nothing to do in this session"));
- if(stream->reset)
- *err = CURLE_PARTIAL_FILE;
- else if(stream->error)
- *err = CURLE_HTTP2;
- else {
- /* not an error per se, but should still close the connection */
- connclose(cf->conn, "GOAWAY received");
- *err = CURLE_OK;
- }
- return -1;
- }
- return 0;
-}
-
static CURLcode http2_data_done_send(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
@@ -1540,32 +1472,16 @@ static CURLcode http2_data_done_send(struct Curl_cfilter *cf,
if(!ctx || !ctx->h2)
goto out;
+ DEBUGF(LOG_CF(data, cf, "[h2sid=%u] data done", stream->stream_id));
if(stream->upload_left) {
/* If the stream still thinks there's data left to upload. */
- stream->upload_left = 0; /* DONE! */
+ if(stream->upload_left == -1)
+ stream->upload_left = 0; /* DONE! */
/* resume sending here to trigger the callback to get called again so
that it can signal EOF to nghttp2 */
(void)nghttp2_session_resume_data(ctx->h2, stream->stream_id);
- (void)h2_process_pending_input(cf, data, &result);
- }
-
- /* If nghttp2 still has pending frames unsent */
- if(nghttp2_session_want_write(ctx->h2)) {
- struct SingleRequest *k = &data->req;
- int rv;
-
- DEBUGF(LOG_CF(data, cf, "HTTP/2 still wants to send data"));
-
- /* and attempt to send the pending frames */
- rv = h2_session_send(cf, data);
- if(rv)
- result = CURLE_SEND_ERROR;
-
- if(nghttp2_session_want_write(ctx->h2)) {
- /* re-set KEEP_SEND to make sure we are called again */
- k->keepon |= KEEP_SEND;
- }
+ drain_this(cf, data);
}
out:
@@ -1576,20 +1492,10 @@ static ssize_t http2_handle_stream_close(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct HTTP *stream, CURLcode *err)
{
- struct cf_h2_ctx *ctx = cf->ctx;
-
- if(ctx->pause_stream_id == stream->stream_id) {
- ctx->pause_stream_id = 0;
- }
+ ssize_t rv = 0;
drained_transfer(cf, data);
- if(ctx->pause_stream_id == 0) {
- if(h2_process_pending_input(cf, data, err) != 0) {
- return -1;
- }
- }
-
if(stream->error == NGHTTP2_REFUSED_STREAM) {
DEBUGF(LOG_CF(data, cf, "[h2sid=%u] REFUSED_STREAM, try again on a new "
"connection", stream->stream_id));
@@ -1619,34 +1525,42 @@ static ssize_t http2_handle_stream_close(struct Curl_cfilter *cf,
return -1;
}
- if(Curl_dyn_len(&stream->trailer_recvbuf)) {
- char *trailp = Curl_dyn_ptr(&stream->trailer_recvbuf);
- char *lf;
+ if(Curl_dynhds_count(&stream->resp_trailers)) {
+ struct dynhds_entry *e;
+ struct dynbuf dbuf;
+ size_t i;
- do {
- size_t len = 0;
- CURLcode result;
- /* each trailer line ends with a newline */
- lf = strchr(trailp, '\n');
- if(!lf)
+ *err = CURLE_OK;
+ Curl_dyn_init(&dbuf, DYN_TRAILERS);
+ for(i = 0; i < Curl_dynhds_count(&stream->resp_trailers); ++i) {
+ e = Curl_dynhds_getn(&stream->resp_trailers, i);
+ if(!e)
break;
- len = lf + 1 - trailp;
-
- Curl_debug(data, CURLINFO_HEADER_IN, trailp, len);
- /* pass the trailers one by one to the callback */
- result = Curl_client_write(data, CLIENTWRITE_HEADER, trailp, len);
- if(result) {
- *err = result;
- return -1;
- }
- trailp = ++lf;
- } while(lf);
+ Curl_dyn_reset(&dbuf);
+ *err = Curl_dyn_addf(&dbuf, "%.*s: %.*s\x0d\x0a",
+ (int)e->namelen, e->name,
+ (int)e->valuelen, e->value);
+ if(*err)
+ break;
+ Curl_debug(data, CURLINFO_HEADER_IN, Curl_dyn_ptr(&dbuf),
+ Curl_dyn_len(&dbuf));
+ *err = Curl_client_write(data, CLIENTWRITE_HEADER|CLIENTWRITE_TRAILER,
+ Curl_dyn_ptr(&dbuf), Curl_dyn_len(&dbuf));
+ if(*err)
+ break;
+ }
+ Curl_dyn_free(&dbuf);
+ if(*err)
+ goto out;
}
stream->close_handled = TRUE;
+ *err = CURLE_OK;
+ rv = 0;
- DEBUGF(LOG_CF(data, cf, "[h2sid=%u] closed cleanly", stream->stream_id));
- return 0;
+out:
+ DEBUGF(LOG_CF(data, cf, "handle_stream_close -> %zd, %d", rv, *err));
+ return rv;
}
static int sweight_wanted(const struct Curl_easy *data)
@@ -1683,12 +1597,13 @@ static void h2_pri_spec(struct Curl_easy *data,
}
/*
- * h2_session_send() checks if there's been an update in the priority /
+ * Check if there's been an update in the priority /
* dependency settings and if so it submits a PRIORITY frame with the updated
* info.
+ * Flush any out data pending in the network buffer.
*/
-static CURLcode h2_session_send(struct Curl_cfilter *cf,
- struct Curl_easy *data)
+static CURLcode h2_progress_egress(struct Curl_cfilter *cf,
+ struct Curl_easy *data)
{
struct cf_h2_ctx *ctx = cf->ctx;
struct HTTP *stream = data->req.p.http;
@@ -1717,259 +1632,169 @@ out:
nghttp2_strerror(rv), rv));
return CURLE_SEND_ERROR;
}
- return flush_output(cf, data);
+ return nw_out_flush(cf, data);
}
-static ssize_t cf_h2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
- char *buf, size_t len, CURLcode *err)
+static ssize_t stream_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
+ char *buf, size_t len, CURLcode *err)
{
struct cf_h2_ctx *ctx = cf->ctx;
struct HTTP *stream = data->req.p.http;
ssize_t nread = -1;
- struct cf_call_data save;
- bool conn_is_closed = FALSE;
-
- CF_DATA_SAVE(save, cf, data);
- /* If the h2 session has told us to GOAWAY with an error AND
- * indicated the highest stream id it has processes AND
- * the stream we are trying to read has a higher id, this
- * means we will most likely not receive any more for it.
- * Treat this as if the server explicitly had RST the stream */
- if((ctx->goaway && ctx->goaway_error &&
- ctx->last_stream_id > 0 &&
- ctx->last_stream_id < stream->stream_id)) {
- stream->reset = TRUE;
+ *err = CURLE_AGAIN;
+ if(!Curl_bufq_is_empty(&stream->h2_recvbuf)) {
+ nread = Curl_bufq_read(&stream->h2_recvbuf,
+ (unsigned char *)buf, len, err);
+ DEBUGF(LOG_CF(data, cf, "recvbuf read(len=%zu) -> %zd, %d",
+ len, nread, *err));
+ if(nread < 0)
+ goto out;
+ DEBUGASSERT(nread > 0);
}
- /* If a stream is RST, it does not matter what state the h2 session
- * is in, our answer to receiving data is always the same. */
- if(stream->reset) {
- *err = stream->bodystarted? CURLE_PARTIAL_FILE : CURLE_RECV_ERROR;
+ if(nread < 0) {
+ if(stream->closed) {
+ nread = http2_handle_stream_close(cf, data, stream, err);
+ }
+ else if(stream->reset ||
+ (ctx->conn_closed && Curl_bufq_is_empty(&ctx->inbufq)) ||
+ (ctx->goaway && ctx->last_stream_id < stream->stream_id)) {
+ *err = stream->bodystarted? CURLE_PARTIAL_FILE : CURLE_RECV_ERROR;
+ nread = -1;
+ }
+ }
+ else if(nread == 0) {
+ *err = CURLE_AGAIN;
nread = -1;
- goto out;
}
- if(should_close_session(ctx)) {
- DEBUGF(LOG_CF(data, cf, "http2_recv: nothing to do in this session"));
- if(cf->conn->bits.close) {
- /* already marked for closure, return OK and we're done */
- drained_transfer(cf, data);
- *err = CURLE_OK;
- nread = 0;
- goto out;
- }
- *err = CURLE_HTTP2;
- nread = -1;
- goto out;
+out:
+ DEBUGF(LOG_CF(data, cf, "stream_recv(len=%zu) -> %zd, %d",
+ len, nread, *err));
+ return nread;
+}
+
+static CURLcode h2_progress_ingress(struct Curl_cfilter *cf,
+ struct Curl_easy *data)
+{
+ struct cf_h2_ctx *ctx = cf->ctx;
+ struct HTTP *stream = data->req.p.http;
+ CURLcode result = CURLE_OK;
+ ssize_t nread;
+ bool keep_reading = TRUE;
+
+ /* Process network input buffer fist */
+ if(!Curl_bufq_is_empty(&ctx->inbufq)) {
+ DEBUGF(LOG_CF(data, cf, "Process %zd bytes in connection buffer",
+ Curl_bufq_len(&ctx->inbufq)));
+ if(h2_process_pending_input(cf, data, &result) < 0)
+ return result;
}
- /* Nullify here because we call nghttp2_session_send() and they
- might refer to the old buffer. */
- stream->upload_mem = NULL;
- stream->upload_len = 0;
+ /* Receive data from the "lower" filters, e.g. network until
+ * it is time to stop or we have enough data for this stream */
+ while(keep_reading &&
+ !ctx->conn_closed && /* not closed the connection */
+ !stream->closed && /* nor the stream */
+ Curl_bufq_is_empty(&ctx->inbufq) && /* and we consumed our input */
+ !Curl_bufq_is_full(&stream->h2_recvbuf) && /* enough? */
+ Curl_bufq_len(&stream->h2_recvbuf) < data->set.buffer_size) {
+
+ nread = Curl_bufq_slurp(&ctx->inbufq, nw_in_reader, cf, &result);
+ DEBUGF(LOG_CF(data, cf, "read %zd bytes nw data -> %zd, %d",
+ Curl_bufq_len(&ctx->inbufq), nread, result));
+ if(nread < 0) {
+ if(result != CURLE_AGAIN) {
+ failf(data, "Failed receiving HTTP2 data");
+ return result;
+ }
+ break;
+ }
+ else if(nread == 0) {
+ ctx->conn_closed = TRUE;
+ break;
+ }
- /*
- * At this point 'stream' is just in the Curl_easy the connection
- * identifies as its owner at this time.
- */
+ keep_reading = Curl_bufq_is_full(&ctx->inbufq);
+ if(h2_process_pending_input(cf, data, &result))
+ return result;
+ }
- if(stream->bodystarted &&
- stream->nread_header_recvbuf < Curl_dyn_len(&stream->header_recvbuf)) {
- /* If there is header data pending for this stream to return, do that */
- size_t left =
- Curl_dyn_len(&stream->header_recvbuf) - stream->nread_header_recvbuf;
- size_t ncopy = CURLMIN(len, left);
- memcpy(buf, Curl_dyn_ptr(&stream->header_recvbuf) +
- stream->nread_header_recvbuf, ncopy);
- stream->nread_header_recvbuf += ncopy;
-
- DEBUGF(LOG_CF(data, cf, "recv: Got %d bytes from header_recvbuf",
- (int)ncopy));
- nread = ncopy;
- goto out;
+ if(ctx->conn_closed && Curl_bufq_is_empty(&ctx->inbufq)) {
+ connclose(cf->conn, "GOAWAY received");
}
+ return CURLE_OK;
+}
+
+static ssize_t cf_h2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
+ char *buf, size_t len, CURLcode *err)
+{
+ struct cf_h2_ctx *ctx = cf->ctx;
+ struct HTTP *stream = data->req.p.http;
+ ssize_t nread = -1;
+ CURLcode result;
+ struct cf_call_data save;
+
+ CF_DATA_SAVE(save, cf, data);
+
DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_recv: win %u/%u",
stream->stream_id,
nghttp2_session_get_local_window_size(ctx->h2),
nghttp2_session_get_stream_local_window_size(ctx->h2,
stream->stream_id)
));
+ nread = stream_recv(cf, data, buf, len, err);
+ if(nread < 0 && *err != CURLE_AGAIN)
+ goto out;
- if(stream->memlen) {
- DEBUGF(LOG_CF(data, cf, "[h2sid=%u] recv: DRAIN %zu bytes (%p => %p)",
- stream->stream_id, stream->memlen,
- (void *)stream->mem, (void *)buf));
- if(buf != stream->mem) {
- /* if we didn't get the same buffer this time, we must move the data to
- the beginning */
- memmove(buf, stream->mem, stream->memlen);
- stream->len = len - stream->memlen;
- stream->mem = buf;
- }
-
- if(ctx->pause_stream_id == stream->stream_id && !stream->pausedata) {
- /* We have paused nghttp2, but we have no pause data (see
- on_data_chunk_recv). */
- ctx->pause_stream_id = 0;
- if(h2_process_pending_input(cf, data, err) != 0) {
- nread = -1;
- goto out;
- }
- }
- }
- else if(stream->pausedata) {
- DEBUGASSERT(ctx->pause_stream_id == stream->stream_id);
- nread = CURLMIN(len, stream->pauselen);
- memcpy(buf, stream->pausedata, nread);
-
- stream->pausedata += nread;
- stream->pauselen -= nread;
- drain_this(cf, data);
-
- if(stream->pauselen == 0) {
- DEBUGF(LOG_CF(data, cf, "[h2sid=%u] Unpaused", stream->stream_id));
- DEBUGASSERT(ctx->pause_stream_id == stream->stream_id);
- ctx->pause_stream_id = 0;
+ if(nread < 0) {
+ *err = h2_progress_ingress(cf, data);
+ if(*err)
+ goto out;
- stream->pausedata = NULL;
- stream->pauselen = 0;
- }
- DEBUGF(LOG_CF(data, cf, "[h2sid=%u] recv: returns unpaused %zd bytes",
- stream->stream_id, nread));
- goto out;
- }
- else if(ctx->pause_stream_id) {
- /* If a stream paused nghttp2_session_mem_recv previously, and has
- not processed all data, it still refers to the buffer in
- nghttp2_session. If we call nghttp2_session_mem_recv(), we may
- overwrite that buffer. To avoid that situation, just return
- here with CURLE_AGAIN. This could be busy loop since data in
- socket is not read. But it seems that usually streams are
- notified with its drain property, and socket is read again
- quickly. */
- if(stream->closed) {
- /* closed overrides paused */
+ nread = stream_recv(cf, data, buf, len, err);
+ if(Curl_bufq_is_empty(&stream->h2_recvbuf)) {
drained_transfer(cf, data);
- nread = 0;
- goto out;
}
- DEBUGF(LOG_CF(data, cf, "[h2sid=%u] is paused, pause h2sid: %u",
- stream->stream_id, ctx->pause_stream_id));
- *err = CURLE_AGAIN;
- nread = -1;
- goto out;
}
- else {
- /* We have nothing buffered for `data` and no other stream paused
- * the processing of incoming data, we can therefore read new data
- * from the network.
- * If DATA is coming for this stream, we want to store it ad the
- * `buf` passed in right away - saving us a copy.
- */
- stream->mem = buf;
- stream->len = len;
- stream->memlen = 0;
-
- if(ctx->inbuflen > 0) {
- DEBUGF(LOG_CF(data, cf, "[h2sid=%u] %zd bytes in inbuf",
- stream->stream_id, ctx->inbuflen - ctx->nread_inbuf));
- if(h2_process_pending_input(cf, data, err))
- return -1;
- }
- while(stream->memlen == 0 && /* have no data for this stream */
- !stream->closed && /* and it is not closed/reset */
- !ctx->pause_stream_id && /* we are not paused either */
- ctx->inbuflen == 0 && /* and out input buffer is empty */
- !conn_is_closed) { /* and connection is not closed */
- /* Receive data from the "lower" filters */
- nread = Curl_conn_cf_recv(cf->next, data, ctx->inbuf, H2_BUFSIZE, err);
- if(nread < 0) {
- DEBUGASSERT(*err);
- if(*err == CURLE_AGAIN) {
- break;
- }
- failf(data, "Failed receiving HTTP2 data");
- conn_is_closed = TRUE;
- }
- else if(nread == 0) {
- DEBUGF(LOG_CF(data, cf, "[h2sid=%u] underlying connection is closed",
- stream->stream_id));
- conn_is_closed = TRUE;
+ if(nread > 0) {
+ size_t data_consumed = (size_t)nread;
+ /* Now that we transferred this to the upper layer, we report
+ * the actual amount of DATA consumed to the H2 session, so
+ * that it adjusts stream flow control */
+ if(stream->h2_recv_hds_len >= data_consumed) {
+ stream->h2_recv_hds_len -= data_consumed; /* no DATA */
+ }
+ else {
+ if(stream->h2_recv_hds_len) {
+ data_consumed -= stream->h2_recv_hds_len;
+ stream->h2_recv_hds_len = 0;
}
- else {
- DEBUGF(LOG_CF(data, cf, "[h2sid=%u] read %zd from connection",
- stream->stream_id, nread));
- ctx->inbuflen = nread;
- DEBUGASSERT(ctx->nread_inbuf == 0);
- if(h2_process_pending_input(cf, data, err))
- return -1;
+ if(data_consumed) {
+ DEBUGF(LOG_CF(data, cf, "[h2sid=%u] increase window by %zu",
+ stream->stream_id, data_consumed));
+ nghttp2_session_consume(ctx->h2, stream->stream_id, data_consumed);
}
}
- }
-
- if(stream->memlen) {
- ssize_t retlen = stream->memlen;
-
- /* TODO: all this buffer handling is very brittle */
- stream->len += stream->memlen;
- stream->memlen = 0;
-
- if(ctx->pause_stream_id == stream->stream_id) {
- /* data for this stream is returned now, but this stream caused a pause
- already so we need it called again asap */
- DEBUGF(LOG_CF(data, cf, "[h2sid=%u] Data returned for PAUSED stream",
- stream->stream_id));
- drain_this(cf, data);
- Curl_expire(data, 0, EXPIRE_RUN_NOW);
- }
- else if(stream->closed) {
- if(stream->reset || stream->error) {
- nread = http2_handle_stream_close(cf, data, stream, err);
- goto out;
- }
- /* this stream is closed, trigger a another read ASAP to detect that */
- DEBUGF(LOG_CF(data, cf, "[h2sid=%u] is closed now, run again",
+ if(stream->closed) {
+ DEBUGF(LOG_CF(data, cf, "[h2sid=%u] closed stream, set drain",
stream->stream_id));
drain_this(cf, data);
- Curl_expire(data, 0, EXPIRE_RUN_NOW);
}
- else {
- drained_transfer(cf, data);
- }
-
- *err = CURLE_OK;
- nread = retlen;
- goto out;
}
- if(conn_is_closed && !stream->closed) {
- /* underlying connection is closed and we have nothing for the stream.
- * Treat this as a RST */
- stream->closed = stream->reset = TRUE;
- failf(data, "HTTP/2 stream %u was not closed cleanly before"
- " end of the underlying connection",
- stream->stream_id);
- }
-
- if(stream->closed) {
- nread = http2_handle_stream_close(cf, data, stream, err);
- goto out;
- }
-
- if(!data->state.drain && Curl_conn_cf_data_pending(cf->next, data)) {
- DEBUGF(LOG_CF(data, cf, "[h2sid=%u] pending data, set drain",
- stream->stream_id));
- drain_this(cf, data);
- }
- *err = CURLE_AGAIN;
- nread = -1;
out:
- DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_recv -> %zd, %d",
- stream->stream_id, nread, *err));
+ result = h2_progress_egress(cf, data);
+ if(result) {
+ *err = result;
+ nread = -1;
+ }
+ DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_recv(len=%zu) -> %zd %d",
+ stream->stream_id, len, nread, *err));
CF_DATA_RESTORE(cf, save);
return nread;
}
@@ -1996,9 +1821,15 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
ssize_t nwritten;
CF_DATA_SAVE(save, cf, data);
- DEBUGF(LOG_CF(data, cf, "cf_send(len=%zu) start", len));
if(stream->stream_id != -1) {
+ DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_send: win %u/%u",
+ stream->stream_id,
+ nghttp2_session_get_remote_window_size(ctx->h2),
+ nghttp2_session_get_stream_remote_window_size(
+ ctx->h2, stream->stream_id)
+ ));
+
if(stream->close_handled) {
infof(data, "stream %u closed", stream->stream_id);
*err = CURLE_HTTP2_STREAM;
@@ -2011,25 +1842,36 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
}
/* If stream_id != -1, we have dispatched request HEADERS, and now
are going to send or sending request body in DATA frame */
- stream->upload_mem = buf;
- stream->upload_len = len;
- rv = nghttp2_session_resume_data(ctx->h2, stream->stream_id);
- if(nghttp2_is_fatal(rv)) {
- *err = CURLE_SEND_ERROR;
+ nwritten = Curl_bufq_write(&stream->h2_sendbuf, buf, len, err);
+ if(nwritten < 0) {
+ if(*err != CURLE_AGAIN)
+ goto out;
+ nwritten = 0;
+ }
+
+ if(!Curl_bufq_is_empty(&stream->h2_sendbuf)) {
+ rv = nghttp2_session_resume_data(ctx->h2, stream->stream_id);
+ if(nghttp2_is_fatal(rv)) {
+ *err = CURLE_SEND_ERROR;
+ nwritten = -1;
+ goto out;
+ }
+ }
+
+ result = h2_progress_ingress(cf, data);
+ if(result) {
+ *err = result;
nwritten = -1;
goto out;
}
- result = h2_session_send(cf, data);
+
+ result = h2_progress_egress(cf, data);
if(result) {
*err = result;
nwritten = -1;
goto out;
}
- nwritten = (ssize_t)len - (ssize_t)stream->upload_len;
- stream->upload_mem = NULL;
- stream->upload_len = 0;
-
if(should_close_session(ctx)) {
DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session"));
*err = CURLE_HTTP2;
@@ -2037,14 +1879,6 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
goto out;
}
- if(stream->upload_left) {
- /* we are sure that we have more data to send here. Calling the
- following API will make nghttp2_session_want_write() return
- nonzero if remote window allows it, which then libcurl checks
- socket is writable or not. See http2_perform_getsock(). */
- nghttp2_session_resume_data(ctx->h2, stream->stream_id);
- }
-
if(!nwritten) {
size_t rwin = nghttp2_session_get_stream_remote_window_size(ctx->h2,
stream->stream_id);
@@ -2060,18 +1894,23 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
"window is exhausted", stream->stream_id));
}
}
- DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_send returns %zd ",
- stream->stream_id, nwritten));
-
/* handled writing BODY for open stream. */
goto out;
}
+
+ DEBUGF(LOG_CF(data, cf, "cf_send, submit %s", data->state.url));
+ if(!stream->h2_send_hds_len) {
+ /* first invocation carries the HTTP/1.1 formatted request headers.
+ * we remember that in case we EAGAIN this call, because the next
+ * invocation may have added request body data into the buffer. */
+ stream->h2_send_hds_len = len;
+ }
+
/* Stream has not been opened yet. `buf` is expected to contain
- * request headers. */
- /* TODO: this assumes that the `buf` and `len` we are called with
- * is *all* HEADERs and no body. We have no way to determine here
- * if that is indeed the case. */
- result = Curl_pseudo_headers(data, buf, len, NULL, &hreq);
+ * `stream->h2_send_hds_len` bytes of request headers. */
+ DEBUGASSERT(stream->h2_send_hds_len <= len);
+ result = Curl_pseudo_headers(data, buf, stream->h2_send_hds_len,
+ NULL, &hreq);
if(result) {
*err = result;
nwritten = -1;
@@ -2114,7 +1953,7 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
/* data sending without specifying the data amount up front */
stream->upload_left = -1; /* unknown, but not zero */
- data_prd.read_callback = data_source_read_callback;
+ data_prd.read_callback = req_body_read_callback;
data_prd.source.ptr = NULL;
stream_id = nghttp2_submit_request(ctx->h2, &pri_spec, nva, nheader,
&data_prd, data);
@@ -2134,14 +1973,21 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
goto out;
}
+ DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_send(len=%zu) submit %s",
+ stream_id, len, data->state.url));
infof(data, "Using Stream ID: %u (easy handle %p)",
stream_id, (void *)data);
stream->stream_id = stream_id;
- /* See TODO above. We assume that the whole buf was consumed by
- * generating the request headers. */
- nwritten = len;
+ nwritten = stream->h2_send_hds_len;
+
+ result = h2_progress_ingress(cf, data);
+ if(result) {
+ *err = result;
+ nwritten = -1;
+ goto out;
+ }
- result = h2_session_send(cf, data);
+ result = h2_progress_egress(cf, data);
if(result) {
*err = result;
nwritten = -1;
@@ -2155,16 +2001,9 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
goto out;
}
- /* If whole HEADERS frame was sent off to the underlying socket, the nghttp2
- library calls data_source_read_callback. But only it found that no data
- available, so it deferred the DATA transmission. Which means that
- nghttp2_session_want_write() returns 0 on http2_perform_getsock(), which
- results that no writable socket check is performed. To workaround this,
- we issue nghttp2_session_resume_data() here to bring back DATA
- transmission from deferred state. */
- nghttp2_session_resume_data(ctx->h2, stream->stream_id);
-
out:
+ DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_send -> %zd, %d",
+ stream->stream_id, nwritten, *err));
CF_DATA_RESTORE(cf, save);
return nwritten;
}
@@ -2182,7 +2021,7 @@ static int cf_h2_get_select_socks(struct Curl_cfilter *cf,
CF_DATA_SAVE(save, cf, data);
sock[0] = Curl_conn_cf_get_socket(cf, data);
- if(!(k->keepon & KEEP_RECV_PAUSE))
+ if(!(k->keepon & (KEEP_RECV_PAUSE|KEEP_RECV_HOLD)))
/* Unless paused - in an HTTP/2 connection we can basically always get a
frame so we should always be ready for one */
bitmap |= GETSOCK_READSOCK(0);
@@ -2230,10 +2069,13 @@ static CURLcode cf_h2_connect(struct Curl_cfilter *cf,
goto out;
}
- if(-1 == h2_process_pending_input(cf, data, &result)) {
- result = CURLE_HTTP2;
+ result = h2_progress_ingress(cf, data);
+ if(result)
+ goto out;
+
+ result = h2_progress_egress(cf, data);
+ if(result)
goto out;
- }
*done = TRUE;
cf->connected = TRUE;
@@ -2278,7 +2120,7 @@ static CURLcode http2_data_pause(struct Curl_cfilter *cf,
#ifdef NGHTTP2_HAS_SET_LOCAL_WINDOW_SIZE
if(ctx && ctx->h2) {
struct HTTP *stream = data->req.p.http;
- uint32_t window = !pause * HTTP2_HUGE_WINDOW_SIZE;
+ uint32_t window = !pause * H2_STREAM_WINDOW_SIZE;
CURLcode result;
int rv = nghttp2_session_set_local_window_size(ctx->h2,
@@ -2292,7 +2134,7 @@ static CURLcode http2_data_pause(struct Curl_cfilter *cf,
}
/* make sure the window update gets sent */
- result = h2_session_send(cf, data);
+ result = h2_progress_egress(cf, data);
if(result)
return result;
@@ -2329,10 +2171,9 @@ static CURLcode cf_h2_cntrl(struct Curl_cfilter *cf,
result = http2_data_setup(cf, data);
break;
}
- case CF_CTRL_DATA_PAUSE: {
+ case CF_CTRL_DATA_PAUSE:
result = http2_data_pause(cf, data, (arg1 != 0));
break;
- }
case CF_CTRL_DATA_DONE_SEND: {
result = http2_data_done_send(cf, data);
break;
@@ -2352,7 +2193,10 @@ static bool cf_h2_data_pending(struct Curl_cfilter *cf,
const struct Curl_easy *data)
{
struct cf_h2_ctx *ctx = cf->ctx;
- if(ctx && ctx->inbuflen > 0 && ctx->nread_inbuf > ctx->inbuflen)
+ struct HTTP *stream = data->req.p.http;
+
+ if(ctx && (!Curl_bufq_is_empty(&ctx->inbufq)
+ || (stream && !Curl_bufq_is_empty(&stream->h2_recvbuf))))
return TRUE;
return cf->next? cf->next->cft->has_data_pending(cf->next, data) : FALSE;
}
@@ -2606,23 +2450,26 @@ CURLcode Curl_http2_upgrade(struct Curl_easy *data,
if(result)
return result;
- if(nread) {
- /* we are going to copy mem to httpc->inbuf. This is required since
- mem is part of buffer pointed by stream->mem, and callbacks
- called by nghttp2_session_mem_recv() will write stream specific
- data into stream->mem, overwriting data already there. */
- if(H2_BUFSIZE < nread) {
- failf(data, "connection buffer size is too small to store data "
- "following HTTP Upgrade response header: buflen=%d, datalen=%zu",
- H2_BUFSIZE, nread);
+ if(nread > 0) {
+ /* Remaining data from the protocol switch reply is already using
+ * the switched protocol, ie. HTTP/2. We add that to the network
+ * inbufq. */
+ ssize_t copied;
+
+ copied = Curl_bufq_write(&ctx->inbufq,
+ (const unsigned char *)mem, nread, &result);
+ if(copied < 0) {
+ failf(data, "error on copying HTTP Upgrade response: %d", result);
+ return CURLE_RECV_ERROR;
+ }
+ if((size_t)copied < nread) {
+ failf(data, "connection buffer size could not take all data "
+ "from HTTP Upgrade response header: copied=%zd, datalen=%zu",
+ copied, nread);
return CURLE_HTTP2;
}
-
- infof(data, "Copying HTTP/2 data in stream buffer to connection buffer"
+ infof(data, "Copied HTTP/2 data in stream buffer to connection buffer"
" after upgrade: len=%zu", nread);
- DEBUGASSERT(ctx->nread_inbuf == 0);
- memcpy(ctx->inbuf, mem, nread);
- ctx->inbuflen = nread;
}
conn->httpversion = 20; /* we know we're on HTTP/2 now */
diff --git a/tests/data/test1800 b/tests/data/test1800
index d7cc73af4..6b14bfb37 100644
--- a/tests/data/test1800
+++ b/tests/data/test1800
@@ -49,7 +49,7 @@ User-Agent: curl/%VERSION
Accept: */*
Connection: Upgrade, HTTP2-Settings
Upgrade: %H2CVER
-HTTP2-Settings: AAMAAABkAAQCAAAAAAIAAAAA
+HTTP2-Settings: AAMAAABkAAQACAAAAAIAAAAA
</protocol>
</verify>
diff --git a/tests/http/scorecard.py b/tests/http/scorecard.py
index 271bf31fd..294008d01 100644
--- a/tests/http/scorecard.py
+++ b/tests/http/scorecard.py
@@ -59,17 +59,16 @@ class ScoreCard:
def handshakes(self, proto: str) -> Dict[str, Any]:
props = {}
- sample_size = 10
- self.info(f'handshaking ')
+ sample_size = 5
+ self.info(f'TLS Handshake\n')
for authority in [
f'{self.env.authority_for(self.env.domain1, proto)}'
]:
- self.info('localhost')
+ self.info(' localhost...')
c_samples = []
hs_samples = []
errors = []
for i in range(sample_size):
- self.info('.')
curl = CurlClient(env=self.env, silent=True)
url = f'https://{authority}/'
r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True)
@@ -79,20 +78,25 @@ class ScoreCard:
else:
errors.append(f'exit={r.exit_code}')
props['localhost'] = {
- 'connect': mean(c_samples),
- 'handshake': mean(hs_samples),
- 'errors': errors
+ 'ipv4-connect': mean(c_samples),
+ 'ipv4-handshake': mean(hs_samples),
+ 'ipv4-errors': errors,
+ 'ipv6-connect': 0,
+ 'ipv6-handshake': 0,
+ 'ipv6-errors': [],
}
+ self.info('ok.\n')
for authority in [
- 'curl.se', 'google.com', 'cloudflare.com', 'nghttp2.org',
+ 'curl.se', 'nghttp2.org',
]:
+ self.info(f' {authority}...')
+ props[authority] = {}
for ipv in ['ipv4', 'ipv6']:
- self.info(f'{authority}-{ipv}')
+ self.info(f'{ipv}...')
c_samples = []
hs_samples = []
errors = []
for i in range(sample_size):
- self.info('.')
curl = CurlClient(env=self.env, silent=True)
args = [
'--http3-only' if proto == 'h3' else '--http2',
@@ -104,12 +108,10 @@ class ScoreCard:
hs_samples.append(r.stats[0]['time_appconnect'])
else:
errors.append(f'exit={r.exit_code}')
- props[f'{authority}-{ipv}'] = {
- 'connect': mean(c_samples) if len(c_samples) else -1,
- 'handshake': mean(hs_samples) if len(hs_samples) else -1,
- 'errors': errors
- }
- self.info('\n')
+ props[authority][f'{ipv}-connect'] = mean(c_samples) if len(c_samples) else -1
+ props[authority][f'{ipv}-handshake'] = mean(hs_samples) if len(hs_samples) else -1
+ props[authority][f'{ipv}-errors'] = errors
+ self.info('ok.\n')
return props
def _make_docs_file(self, docs_dir: str, fname: str, fsize: int):
@@ -138,16 +140,17 @@ class ScoreCard:
count = 1
samples = []
errors = []
- self.info(f'{sample_size}x single')
+ self.info(f'single...')
for i in range(sample_size):
curl = CurlClient(env=self.env, silent=True)
- r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True)
+ r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True,
+ with_headers=False)
err = self._check_downloads(r, count)
if err:
errors.append(err)
else:
- samples.append(r.stats[0]['speed_download'])
- self.info(f'.')
+ total_size = sum([s['size_download'] for s in r.stats])
+ samples.append(total_size / r.duration.total_seconds())
return {
'count': count,
'samples': sample_size,
@@ -160,17 +163,17 @@ class ScoreCard:
samples = []
errors = []
url = f'{url}?[0-{count - 1}]'
- self.info(f'{sample_size}x{count} serial')
+ self.info(f'serial...')
for i in range(sample_size):
curl = CurlClient(env=self.env, silent=True)
- r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True)
- self.info(f'.')
+ r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True,
+ with_headers=False)
err = self._check_downloads(r, count)
if err:
errors.append(err)
else:
- for s in r.stats:
- samples.append(s['speed_download'])
+ total_size = sum([s['size_download'] for s in r.stats])
+ samples.append(total_size / r.duration.total_seconds())
return {
'count': count,
'samples': sample_size,
@@ -183,19 +186,18 @@ class ScoreCard:
samples = []
errors = []
url = f'{url}?[0-{count - 1}]'
- self.info(f'{sample_size}x{count} parallel')
+ self.info(f'parallel...')
for i in range(sample_size):
curl = CurlClient(env=self.env, silent=True)
- start = datetime.now()
r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True,
- extra_args=['--parallel'])
+ with_headers=False,
+ extra_args=['--parallel', '--parallel-max', str(count)])
err = self._check_downloads(r, count)
if err:
errors.append(err)
else:
- duration = datetime.now() - start
total_size = sum([s['size_download'] for s in r.stats])
- samples.append(total_size / duration.total_seconds())
+ samples.append(total_size / r.duration.total_seconds())
return {
'count': count,
'samples': sample_size,
@@ -210,7 +212,7 @@ class ScoreCard:
'serial': self.transfer_serial(url=url, proto=proto, count=count),
'parallel': self.transfer_parallel(url=url, proto=proto, count=count),
}
- self.info(f'\n')
+ self.info(f'ok.\n')
return props
def downloads(self, proto: str, test_httpd: bool = True,
@@ -234,9 +236,9 @@ class ScoreCard:
url100 = f'https://{self.env.domain1}:{port}/score100.data'
scores[via] = {
'description': descr,
- '1MB-local': self.download_url(url=url1, proto=proto, count=50),
- '10MB-local': self.download_url(url=url10, proto=proto, count=50),
- '100MB-local': self.download_url(url=url100, proto=proto, count=50),
+ '1MB': self.download_url(url=url1, proto=proto, count=50),
+ '10MB': self.download_url(url=url10, proto=proto, count=50),
+ '100MB': self.download_url(url=url100, proto=proto, count=50),
}
if test_caddy and self.caddy:
port = self.caddy.port
@@ -251,15 +253,85 @@ class ScoreCard:
url100 = f'https://{self.env.domain1}:{port}/score100.data'
scores[via] = {
'description': descr,
- '1MB-local': self.download_url(url=url1, proto=proto, count=50),
- '10MB-local': self.download_url(url=url10, proto=proto, count=50),
- '100MB-local': self.download_url(url=url100, proto=proto, count=50),
+ '1MB': self.download_url(url=url1, proto=proto, count=50),
+ '10MB': self.download_url(url=url10, proto=proto, count=50),
+ '100MB': self.download_url(url=url100, proto=proto, count=50),
+ }
+ return scores
+
+ def do_requests(self, url: str, proto: str, count: int, max_parallel: int = 1):
+ sample_size = 1
+ samples = []
+ errors = []
+ url = f'{url}?[0-{count - 1}]'
+ extra_args = ['--parallel', '--parallel-max', str(max_parallel)] if max_parallel > 1 else []
+ self.info(f'{max_parallel}...')
+ for i in range(sample_size):
+ curl = CurlClient(env=self.env)
+ r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True,
+ with_headers=False,
+ extra_args=extra_args)
+ err = self._check_downloads(r, count)
+ if err:
+ errors.append(err)
+ else:
+ for s in r.stats:
+ samples.append(count / r.duration.total_seconds())
+ return {
+ 'count': count,
+ 'samples': sample_size,
+ 'speed': mean(samples) if len(samples) else -1,
+ 'errors': errors
+ }
+
+ def requests_url(self, url: str, proto: str, count: int):
+ self.info(f' {url}: ')
+ props = {
+ 'serial': self.do_requests(url=url, proto=proto, count=count),
+ 'par-6': self.do_requests(url=url, proto=proto, count=count, max_parallel=6),
+ 'par-25': self.do_requests(url=url, proto=proto, count=count, max_parallel=25),
+ 'par-50': self.do_requests(url=url, proto=proto, count=count, max_parallel=50),
+ 'par-100': self.do_requests(url=url, proto=proto, count=count, max_parallel=100),
+ }
+ self.info(f'ok.\n')
+ return props
+
+ def requests(self, proto: str, test_httpd: bool = True,
+ test_caddy: bool = True) -> Dict[str, Any]:
+ scores = {}
+ if test_httpd:
+ if proto == 'h3':
+ port = self.env.h3_port
+ via = 'nghttpx'
+ descr = f'port {port}, proxying httpd'
+ else:
+ port = self.env.https_port
+ via = 'httpd'
+ descr = f'port {port}'
+ self.info(f'{via} requests\n')
+ self._make_docs_file(docs_dir=self.httpd.docs_dir, fname='reqs10.data', fsize=10*1024)
+ url1 = f'https://{self.env.domain1}:{port}/reqs10.data'
+ scores[via] = {
+ 'description': descr,
+ '10KB': self.requests_url(url=url1, proto=proto, count=10000),
+ }
+ if test_caddy and self.caddy:
+ port = self.caddy.port
+ via = 'caddy'
+ descr = f'port {port}'
+ self.info('caddy requests\n')
+ self._make_docs_file(docs_dir=self.caddy.docs_dir, fname='req10.data', fsize=10 * 1024)
+ url1 = f'https://{self.env.domain1}:{port}/req10.data'
+ scores[via] = {
+ 'description': descr,
+ '10KB': self.requests_url(url=url1, proto=proto, count=5000),
}
return scores
def score_proto(self, proto: str,
handshakes: bool = True,
downloads: bool = True,
+ requests: bool = True,
test_httpd: bool = True,
test_caddy: bool = True):
self.info(f"scoring {proto}\n")
@@ -280,6 +352,10 @@ class ScoreCard:
if self.env.curl_uses_lib(lib):
p['implementation'] = lib
break
+ elif proto == 'h1' or proto == 'http/1.1':
+ proto = 'http/1.1'
+ p['name'] = proto
+ p['implementation'] = 'hyper' if self.env.curl_uses_lib('hyper') else 'native'
else:
raise ScoreCardException(f"unknown protocol: {proto}")
@@ -298,6 +374,10 @@ class ScoreCard:
score['downloads'] = self.downloads(proto=proto,
test_httpd=test_httpd,
test_caddy=test_caddy)
+ if requests:
+ score['requests'] = self.requests(proto=proto,
+ test_httpd=test_httpd,
+ test_caddy=test_caddy)
self.info("\n")
return score
@@ -310,44 +390,86 @@ class ScoreCard:
def fmt_mbs(self, val):
return f'{val/(1024*1024):0.000f} MB/s' if val >= 0 else '--'
+ def fmt_reqs(self, val):
+ return f'{val:0.000f} r/s' if val >= 0 else '--'
+
def print_score(self, score):
print(f'{score["protocol"]["name"].upper()} in curl {score["curl"]} ({score["os"]}) via '
f'{score["protocol"]["implementation"]}/{score["protocol"]["version"]} ')
if 'handshakes' in score:
- print('Handshakes')
- print(f' {"Host":<25} {"Connect":>12} {"Handshake":>12} {"Errors":<20}')
+ print(f'{"Handshakes":<24} {"ipv4":25} {"ipv6":28}')
+ print(f' {"Host":<17} {"Connect":>12} {"Handshake":>12} '
+ f'{"Connect":>12} {"Handshake":>12} {"Errors":<20}')
for key, val in score["handshakes"].items():
- print(f' {key:<25} {self.fmt_ms(val["connect"]):>12} '''
- f'{self.fmt_ms(val["handshake"]):>12} {"/".join(val["errors"]):<20}')
+ print(f' {key:<17} {self.fmt_ms(val["ipv4-connect"]):>12} '
+ f'{self.fmt_ms(val["ipv4-handshake"]):>12} '
+ f'{self.fmt_ms(val["ipv6-connect"]):>12} '
+ f'{self.fmt_ms(val["ipv6-handshake"]):>12} {"/".join(val["ipv4-errors"] + val["ipv6-errors"]):<20}'
+ )
if 'downloads' in score:
print('Downloads')
+ print(f' {"Server":<8} {"Size":>8} '
+ f'{"Single":>12} {"Serial":>12} {"Parallel":>12} {"Errors":<20}')
+ skeys = {}
for dkey, dval in score["downloads"].items():
- print(f' {dkey}: {dval["description"]}')
+ for k in dval.keys():
+ skeys[k] = True
+ for skey in skeys:
+ for dkey, dval in score["downloads"].items():
+ if skey in dval:
+ sval = dval[skey]
+ if isinstance(sval, str):
+ continue
+ errors = []
+ for key, val in sval.items():
+ if 'errors' in val:
+ errors.extend(val['errors'])
+ print(f' {dkey:<8} {skey:>8} '
+ f'{self.fmt_mbs(sval["single"]["speed"]):>12} '
+ f'{self.fmt_mbs(sval["serial"]["speed"]):>12} '
+ f'{self.fmt_mbs(sval["parallel"]["speed"]):>12} '
+ f' {"/".join(errors):<20}')
+ if 'requests' in score:
+ print('Requests, max in parallel')
+ print(f' {"Server":<8} {"Size":>8} '
+ f'{"1 ":>12} {"6 ":>12} {"25 ":>12} '
+ f'{"50 ":>12} {"100 ":>12} {"Errors":<20}')
+ for dkey, dval in score["requests"].items():
for skey, sval in dval.items():
if isinstance(sval, str):
continue
- print(f' {skey:<13} {"Samples":>10} {"Count":>10} {"Speed":>17} {"Errors":<20}')
+ errors = []
for key, val in sval.items():
- print(f' {key:<11} {val["samples"]:>10} '''
- f'{val["count"]:>10} {self.fmt_mbs(val["speed"]):>17} '
- f'{"/".join(val["errors"]):<20}')
+ if 'errors' in val:
+ errors.extend(val['errors'])
+ print(f' {dkey:<8} {skey:>8} '
+ f'{self.fmt_reqs(sval["serial"]["speed"]):>12} '
+ f'{self.fmt_reqs(sval["par-6"]["speed"]):>12} '
+ f'{self.fmt_reqs(sval["par-25"]["speed"]):>12} '
+ f'{self.fmt_reqs(sval["par-50"]["speed"]):>12} '
+ f'{self.fmt_reqs(sval["par-100"]["speed"]):>12} '
+ f' {"/".join(errors):<20}')
def main(self):
parser = argparse.ArgumentParser(prog='scorecard', description="""
Run a range of tests to give a scorecard for a HTTP protocol
'h3' or 'h2' implementation in curl.
""")
- parser.add_argument("-v", "--verbose", action='count', default=0,
+ parser.add_argument("-v", "--verbose", action='count', default=1,
help="log more output on stderr")
- parser.add_argument("-t", "--text", action='store_true', default=False,
- help="print text instead of json")
+ parser.add_argument("-j", "--json", action='store_true', default=False,
+ help="print json instead of text")
+ parser.add_argument("-H", "--handshakes", action='store_true', default=False,
+ help="evaluate handshakes only")
parser.add_argument("-d", "--downloads", action='store_true', default=False,
help="evaluate downloads only")
+ parser.add_argument("-r", "--requests", action='store_true', default=False,
+ help="evaluate requests only")
parser.add_argument("--httpd", action='store_true', default=False,
help="evaluate httpd server only")
parser.add_argument("--caddy", action='store_true', default=False,
help="evaluate caddy server only")
- parser.add_argument("protocols", nargs='*', help="Name(s) of protocol to score")
+ parser.add_argument("protocol", default='h2', nargs='?', help="Name of protocol to score")
args = parser.parse_args()
self.verbose = args.verbose
@@ -357,13 +479,21 @@ class ScoreCard:
console.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
logging.getLogger('').addHandler(console)
- protocols = args.protocols if len(args.protocols) else ['h2', 'h3']
+ protocol = args.protocol
handshakes = True
downloads = True
- test_httpd = True
+ requests = True
+ test_httpd = protocol != 'h3'
test_caddy = True
+ if args.handshakes:
+ downloads = False
+ requests = False
if args.downloads:
handshakes = False
+ requests = False
+ if args.requests:
+ handshakes = False
+ downloads = False
if args.caddy:
test_caddy = True
test_httpd = False
@@ -383,7 +513,7 @@ class ScoreCard:
assert self.httpd.exists(), f'httpd not found: {self.env.httpd}'
self.httpd.clear_logs()
assert self.httpd.start()
- if 'h3' in protocols:
+ if 'h3' == protocol:
self.nghttpx = Nghttpx(env=self.env)
self.nghttpx.clear_logs()
assert self.nghttpx.start()
@@ -392,15 +522,15 @@ class ScoreCard:
self.caddy.clear_logs()
assert self.caddy.start()
- for p in protocols:
- score = self.score_proto(proto=p, handshakes=handshakes,
- downloads=downloads,
- test_caddy=test_caddy,
- test_httpd=test_httpd)
- if args.text:
- self.print_score(score)
- else:
- print(json.JSONEncoder(indent=2).encode(score))
+ score = self.score_proto(proto=protocol, handshakes=handshakes,
+ downloads=downloads,
+ requests=requests,
+ test_caddy=test_caddy,
+ test_httpd=test_httpd)
+ if args.json:
+ print(json.JSONEncoder(indent=2).encode(score))
+ else:
+ self.print_score(score)
except ScoreCardException as ex:
sys.stderr.write(f"ERROR: {str(ex)}\n")
diff --git a/tests/http/test_07_upload.py b/tests/http/test_07_upload.py
index c2c7e5197..40f178a7c 100644
--- a/tests/http/test_07_upload.py
+++ b/tests/http/test_07_upload.py
@@ -24,6 +24,8 @@
#
###########################################################################
#
+import difflib
+import filecmp
import logging
import os
import pytest
@@ -178,11 +180,7 @@ class TestUpload:
extra_args=['--parallel'])
r.check_exit_code(0)
r.check_stats(count=count, exp_status=200)
- indata = open(fdata).readlines()
- r.check_stats(count=count, exp_status=200)
- for i in range(count):
- respdata = open(curl.response_file(i)).readlines()
- assert respdata == indata
+ self.check_download(count, fdata, curl)
# PUT 100k
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
@@ -222,3 +220,14 @@ class TestUpload:
respdata = open(curl.response_file(i)).readlines()
assert respdata == exp_data
+ def check_download(self, count, srcfile, curl):
+ for i in range(count):
+ dfile = curl.download_file(i)
+ assert os.path.exists(dfile)
+ if not filecmp.cmp(srcfile, dfile, shallow=False):
+ diff = "".join(difflib.unified_diff(a=open(srcfile).readlines(),
+ b=open(dfile).readlines(),
+ fromfile=srcfile,
+ tofile=dfile,
+ n=1))
+ assert False, f'download {dfile} differs:\n{diff}'
diff --git a/tests/http/testenv/caddy.py b/tests/http/testenv/caddy.py
index d789446f9..ea1343a95 100644
--- a/tests/http/testenv/caddy.py
+++ b/tests/http/testenv/caddy.py
@@ -126,10 +126,8 @@ class Caddy:
r = curl.http_get(url=check_url)
if r.exit_code == 0:
return True
- log.error(f'curl: {r}')
- log.debug(f'waiting for caddy to become responsive: {r}')
time.sleep(.1)
- log.error(f"Server still not responding after {timeout}")
+ log.error(f"Caddy still not responding after {timeout}")
return False
def _rmf(self, path):
diff --git a/tests/http/testenv/env.py b/tests/http/testenv/env.py
index caf9249b1..9d1a4255f 100644
--- a/tests/http/testenv/env.py
+++ b/tests/http/testenv/env.py
@@ -63,6 +63,8 @@ class EnvConfig:
self.config = DEF_CONFIG
# check cur and its features
self.curl = CURL
+ if 'CURL' in os.environ:
+ self.curl = os.environ['CURL']
self.curl_props = {
'version': None,
'os': None,