From a44277fa98677977de73ea1c34bcdbac5e24df83 Mon Sep 17 00:00:00 2001 From: Daniel Stenberg Date: Thu, 22 Aug 2019 14:08:18 +0200 Subject: ngtcp2: accept upload via callback [WIP] --- lib/http.h | 8 +++ lib/quic.h | 5 +- lib/transfer.c | 2 + lib/vquic/ngtcp2.c | 139 +++++++++++++++++++++++++++++++++++++++++++++++------ lib/vquic/quiche.c | 18 ++++++- 5 files changed, 153 insertions(+), 19 deletions(-) diff --git a/lib/http.h b/lib/http.h index 945aceb56..6232bbc3a 100644 --- a/lib/http.h +++ b/lib/http.h @@ -126,6 +126,10 @@ CURLcode Curl_http_auth_act(struct connectdata *conn); #endif /* CURL_DISABLE_HTTP */ +#ifdef USE_NGHTTP3 +struct h3out; /* see ngtcp2 */ +#endif + /**************************************************************************** * HTTP unique setup ***************************************************************************/ @@ -196,6 +200,10 @@ struct HTTP { int64_t stream3_id; /* stream we are interested in */ bool firstbody; /* FALSE until body arrives */ bool h3req; /* FALSE until request is issued */ + bool upload_done; +#endif +#ifdef USE_NGHTTP3 + struct h3out *h3out; /* per-stream buffers for upload */ #endif }; diff --git a/lib/quic.h b/lib/quic.h index d73ba0c36..6c132a324 100644 --- a/lib/quic.h +++ b/lib/quic.h @@ -44,7 +44,10 @@ CURLcode Curl_quic_is_connected(struct connectdata *conn, curl_socket_t sockfd, bool *connected); int Curl_quic_ver(char *p, size_t len); +CURLcode Curl_quic_done_sending(struct connectdata *conn); -#endif +#else /* ENABLE_QUIC */ +#define Curl_quic_done_sending(x) +#endif /* !ENABLE_QUIC */ #endif /* HEADER_CURL_QUIC_H */ diff --git a/lib/transfer.c b/lib/transfer.c index ab662fbc0..7e57fbe03 100644 --- a/lib/transfer.c +++ b/lib/transfer.c @@ -942,7 +942,9 @@ CURLcode Curl_done_sending(struct connectdata *conn, { k->keepon &= ~KEEP_SEND; /* we're done writing */ + /* These functions should be moved into the handler struct! */ Curl_http2_done_sending(conn); + Curl_quic_done_sending(conn); if(conn->bits.rewindaftersend) { CURLcode result = Curl_readrewind(conn); diff --git a/lib/vquic/ngtcp2.c b/lib/vquic/ngtcp2.c index f9de76960..008a75cfd 100644 --- a/lib/vquic/ngtcp2.c +++ b/lib/vquic/ngtcp2.c @@ -50,6 +50,20 @@ #define H3BUGF(x) do { } WHILE_FALSE #endif +/* + * This holds outgoing HTTP/3 stream data that is used by nghttp3 until acked. + * It is used as a circular buffer. Add new bytes at the end until it reaches + * the far end, then start over at index 0 again. + */ + +#define H3_SEND_SIZE (20*1024) +struct h3out { + uint8_t buf[H3_SEND_SIZE]; + size_t used; /* number of bytes used in the buffer */ + size_t windex; /* index in the buffer where to start writing the next + data block */ +}; + #define QUIC_MAX_STREAMS (256*1024) #define QUIC_MAX_DATA (1*1024*1024) #define QUIC_IDLE_TIMEOUT 60000 /* milliseconds */ @@ -63,6 +77,9 @@ static CURLcode ng_process_ingress(struct connectdata *conn, struct quicsocket *qs); static CURLcode ng_flush_egress(struct connectdata *conn, int sockfd, struct quicsocket *qs); +static int cb_h3_acked_stream_data(nghttp3_conn *conn, int64_t stream_id, + size_t datalen, void *user_data, + void *stream_user_data); static ngtcp2_tstamp timestamp(void) { @@ -1194,7 +1211,7 @@ static unsigned int ng_conncheck(struct connectdata *conn, return CONNRESULT_NONE; } -static const struct Curl_handler Curl_handler_h3_quiche = { +static const struct Curl_handler Curl_handler_http3 = { "HTTPS", /* scheme */ ZERO_NULL, /* setup_connection */ Curl_http, /* do_it */ @@ -1370,7 +1387,7 @@ static int cb_h3_send_stop_sending(nghttp3_conn *conn, int64_t stream_id, } static nghttp3_conn_callbacks ngh3_callbacks = { - NULL, /* acked_stream_data */ + cb_h3_acked_stream_data, /* acked_stream_data */ cb_h3_stream_close, cb_h3_recv_data, cb_h3_deferred_consume, @@ -1386,6 +1403,7 @@ static nghttp3_conn_callbacks ngh3_callbacks = { NULL, /* http_cancel_push */ cb_h3_send_stop_sending, NULL, /* push_stream */ + NULL, /* end_stream */ }; static int init_ngh3_conn(struct quicsocket *qs) @@ -1451,6 +1469,7 @@ static int init_ngh3_conn(struct quicsocket *qs) static Curl_recv ngh3_stream_recv; static Curl_send ngh3_stream_send; +/* incoming data frames on the h3 stream */ static ssize_t ngh3_stream_recv(struct connectdata *conn, int sockindex, char *buf, @@ -1497,18 +1516,38 @@ static ssize_t ngh3_stream_recv(struct connectdata *conn, return -1; } +/* this amount of data has now been acked on this stream */ +static int cb_h3_acked_stream_data(nghttp3_conn *conn, int64_t stream_id, + size_t datalen, void *user_data, + void *stream_user_data) +{ + struct Curl_easy *data = stream_user_data; + struct HTTP *stream = data->req.protop; + (void)conn; + (void)stream_id; + (void)user_data; + + if(!data->set.postfields) { + stream->h3out->used -= datalen; + fprintf(stderr, "cb_h3_acked_stream_data, %zd bytes, %zd left unacked\n", + datalen, stream->h3out->used); + DEBUGASSERT(stream->h3out->used < H3_SEND_SIZE); + } + return 0; +} + static int cb_h3_readfunction(nghttp3_conn *conn, int64_t stream_id, const uint8_t **pdata, size_t *pdatalen, uint32_t *pflags, void *user_data, void *stream_user_data) { struct Curl_easy *data = stream_user_data; + size_t nread; + struct HTTP *stream = data->req.protop; (void)conn; (void)stream_id; (void)user_data; - fprintf(stderr, "called cb_h3_readfunction\n"); - if(data->set.postfields) { *pdata = data->set.postfields; *pdatalen = data->state.infilesize; @@ -1516,6 +1555,48 @@ static int cb_h3_readfunction(nghttp3_conn *conn, int64_t stream_id, return 0; } + nread = CURLMIN(stream->upload_len, H3_SEND_SIZE - stream->h3out->used); + if(nread > 0) { + /* nghttp3 wants us to hold on to the data until it tells us it is okay to + delete it. Append the data at the end of the h3out buffer. Since we can + only return consecutive data, copy the amount that fits and the next + part comes in next invoke. */ + struct h3out *out = stream->h3out; + if(nread + out->windex > H3_SEND_SIZE) + nread = H3_SEND_SIZE - out->windex; + + memcpy(&out->buf[out->windex], stream->upload_mem, nread); + out->windex += nread; + out->used += nread; + + /* that's the chunk we return to nghttp3 */ + *pdata = &out->buf[out->windex]; + *pdatalen = nread; + + if(out->windex == H3_SEND_SIZE) + out->windex = 0; /* wrap */ + stream->upload_mem += nread; + stream->upload_len -= nread; + if(data->state.infilesize != -1) { + stream->upload_left -= nread; + if(!stream->upload_left) + *pflags = NGHTTP3_DATA_FLAG_EOF; + } + fprintf(stderr, "cb_h3_readfunction %zd bytes%s (at %zd unacked)\n", + nread, *pflags == NGHTTP3_DATA_FLAG_EOF?" EOF":"", + out->used); + } + if(stream->upload_done && !stream->upload_len && + (stream->upload_left <= 0)) { + fprintf(stderr, "!!!!!!!!! cb_h3_readfunction sets EOF\n"); + *pdata = NULL; + *pdatalen = 0; + *pflags = NGHTTP3_DATA_FLAG_EOF; + } + else if(!nread) { + *pdatalen = 0; + return NGHTTP3_ERR_WOULDBLOCK; + } return 0; } @@ -1538,6 +1619,7 @@ static CURLcode http_request(struct connectdata *conn, const void *mem, nghttp3_nv *nva = NULL; int64_t stream3_id; int rc; + struct h3out *h3out = NULL; rc = ngtcp2_conn_open_bidi_stream(qs->qconn, &stream3_id, NULL); if(rc) { @@ -1722,6 +1804,13 @@ static CURLcode http_request(struct connectdata *conn, const void *mem, data_reader.read_data = cb_h3_readfunction; + h3out = calloc(sizeof(struct h3out), 1); + if(!h3out) { + result = CURLE_OUT_OF_MEMORY; + goto fail; + } + stream->h3out = h3out; + rc = nghttp3_conn_submit_request(qs->h3conn, stream->stream3_id, nva, nheader, &data_reader, conn->data); @@ -1746,15 +1835,6 @@ static CURLcode http_request(struct connectdata *conn, const void *mem, Curl_safefree(nva); - if(!stream->upload_left) { - /* done with this stream, FIN it */ - rc = nghttp3_conn_end_stream(qs->h3conn, stream->stream3_id); - if(rc) { - result = CURLE_SEND_ERROR; - goto fail; - } - } - infof(data, "Using HTTP/3 Stream ID: %x (easy handle %p)\n", stream3_id, (void *)data); @@ -1784,8 +1864,17 @@ static ssize_t ngh3_stream_send(struct connectdata *conn, sent = len; } else { - (void)qs; - /* TODO */ + fprintf(stderr, "ngh3_stream_send() wants to send %zd bytes\n", len); + if(!stream->upload_len) { + stream->upload_mem = mem; + stream->upload_len = len; + (void)nghttp3_conn_resume_stream(qs->h3conn, stream->stream3_id); + sent = len; + } + else { + *curlcode = CURLE_AGAIN; + return -1; + } } if(ng_flush_egress(conn, sockfd, qs)) { @@ -1801,7 +1890,7 @@ static void ng_has_connected(struct connectdata *conn, int tempindex) { conn->recv[FIRSTSOCKET] = ngh3_stream_recv; conn->send[FIRSTSOCKET] = ngh3_stream_send; - conn->handler = &Curl_handler_h3_quiche; + conn->handler = &Curl_handler_http3; conn->bits.multiplex = TRUE; /* at least potentially multiplexed */ conn->httpversion = 30; conn->bundle->multiuse = BUNDLE_MULTIPLEX; @@ -2022,4 +2111,22 @@ static CURLcode ng_flush_egress(struct connectdata *conn, int sockfd, return CURLE_OK; } + +/* + * Called from transfer.c:done_sending when we stop HTTP/3 uploading. + */ +CURLcode Curl_quic_done_sending(struct connectdata *conn) +{ + if(conn->handler == &Curl_handler_http3) { + /* only for HTTP/3 transfers */ + struct HTTP *stream = conn->data->req.protop; + struct quicsocket *qs = conn->quic; + fprintf(stderr, "!!! Curl_quic_done_sending stream %zu\n", + stream->stream3_id); + stream->upload_done = TRUE; + (void)nghttp3_conn_resume_stream(qs->h3conn, stream->stream3_id); + } + + return CURLE_OK; +} #endif diff --git a/lib/vquic/quiche.c b/lib/vquic/quiche.c index b84cc4779..43723a5f9 100644 --- a/lib/vquic/quiche.c +++ b/lib/vquic/quiche.c @@ -116,7 +116,7 @@ static CURLcode quiche_do(struct connectdata *conn, bool *done) return Curl_http(conn, done); } -static const struct Curl_handler Curl_handler_h3_quiche = { +static const struct Curl_handler Curl_handler_http3 = { "HTTPS", /* scheme */ ZERO_NULL, /* setup_connection */ quiche_do, /* do_it */ @@ -232,7 +232,7 @@ static CURLcode quiche_has_connected(struct connectdata *conn, conn->recv[sockindex] = h3_stream_recv; conn->send[sockindex] = h3_stream_send; - conn->handler = &Curl_handler_h3_quiche; + conn->handler = &Curl_handler_http3; conn->bits.multiplex = TRUE; /* at least potentially multiplexed */ conn->httpversion = 30; conn->bundle->multiuse = BUNDLE_MULTIPLEX; @@ -750,5 +750,19 @@ fail: return result; } +/* + * Called from transfer.c:done_sending when we stop HTTP/3 uploading. + */ +CURLcode Curl_quic_done_sending(struct connectdata *conn) +{ + if(conn->handler == &Curl_handler_http3) { + /* only for HTTP/3 transfers */ + struct HTTP *stream = conn->data->req.protop; + fprintf(stderr, "!!! Curl_quic_done_sending\n"); + stream->upload_done = TRUE; + } + + return CURLE_OK; +} #endif -- cgit v1.2.1