summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Stenberg <daniel@haxx.se>2019-08-22 14:08:18 +0200
committerDaniel Stenberg <daniel@haxx.se>2019-08-23 22:33:29 +0200
commit0a5d28fa2ec872de55c8d3f3b62675f17ca9cd45 (patch)
tree46be1169dec129df20c418b3a98b92ff7ffb4ad9
parent32d64b2e875f0d74cd433dff8bda9f8a98dcd44e (diff)
downloadcurl-0a5d28fa2ec872de55c8d3f3b62675f17ca9cd45.tar.gz
ngtcp2: accept upload via callback
Closes #4256
-rw-r--r--lib/http.h8
-rw-r--r--lib/quic.h5
-rw-r--r--lib/transfer.c2
-rw-r--r--lib/vquic/ngtcp2.c139
-rw-r--r--lib/vquic/quiche.c18
5 files changed, 153 insertions, 19 deletions
diff --git a/lib/http.h b/lib/http.h
index 945aceb56..6232bbc3a 100644
--- a/lib/http.h
+++ b/lib/http.h
@@ -126,6 +126,10 @@ CURLcode Curl_http_auth_act(struct connectdata *conn);
#endif /* CURL_DISABLE_HTTP */
+#ifdef USE_NGHTTP3
+struct h3out; /* see ngtcp2 */
+#endif
+
/****************************************************************************
* HTTP unique setup
***************************************************************************/
@@ -196,6 +200,10 @@ struct HTTP {
int64_t stream3_id; /* stream we are interested in */
bool firstbody; /* FALSE until body arrives */
bool h3req; /* FALSE until request is issued */
+ bool upload_done;
+#endif
+#ifdef USE_NGHTTP3
+ struct h3out *h3out; /* per-stream buffers for upload */
#endif
};
diff --git a/lib/quic.h b/lib/quic.h
index d73ba0c36..6c132a324 100644
--- a/lib/quic.h
+++ b/lib/quic.h
@@ -44,7 +44,10 @@ CURLcode Curl_quic_is_connected(struct connectdata *conn,
curl_socket_t sockfd,
bool *connected);
int Curl_quic_ver(char *p, size_t len);
+CURLcode Curl_quic_done_sending(struct connectdata *conn);
-#endif
+#else /* ENABLE_QUIC */
+#define Curl_quic_done_sending(x)
+#endif /* !ENABLE_QUIC */
#endif /* HEADER_CURL_QUIC_H */
diff --git a/lib/transfer.c b/lib/transfer.c
index ab662fbc0..7e57fbe03 100644
--- a/lib/transfer.c
+++ b/lib/transfer.c
@@ -942,7 +942,9 @@ CURLcode Curl_done_sending(struct connectdata *conn,
{
k->keepon &= ~KEEP_SEND; /* we're done writing */
+ /* These functions should be moved into the handler struct! */
Curl_http2_done_sending(conn);
+ Curl_quic_done_sending(conn);
if(conn->bits.rewindaftersend) {
CURLcode result = Curl_readrewind(conn);
diff --git a/lib/vquic/ngtcp2.c b/lib/vquic/ngtcp2.c
index f9de76960..008a75cfd 100644
--- a/lib/vquic/ngtcp2.c
+++ b/lib/vquic/ngtcp2.c
@@ -50,6 +50,20 @@
#define H3BUGF(x) do { } WHILE_FALSE
#endif
+/*
+ * This holds outgoing HTTP/3 stream data that is used by nghttp3 until acked.
+ * It is used as a circular buffer. Add new bytes at the end until it reaches
+ * the far end, then start over at index 0 again.
+ */
+
+#define H3_SEND_SIZE (20*1024)
+struct h3out {
+ uint8_t buf[H3_SEND_SIZE];
+ size_t used; /* number of bytes used in the buffer */
+ size_t windex; /* index in the buffer where to start writing the next
+ data block */
+};
+
#define QUIC_MAX_STREAMS (256*1024)
#define QUIC_MAX_DATA (1*1024*1024)
#define QUIC_IDLE_TIMEOUT 60000 /* milliseconds */
@@ -63,6 +77,9 @@ static CURLcode ng_process_ingress(struct connectdata *conn,
struct quicsocket *qs);
static CURLcode ng_flush_egress(struct connectdata *conn, int sockfd,
struct quicsocket *qs);
+static int cb_h3_acked_stream_data(nghttp3_conn *conn, int64_t stream_id,
+ size_t datalen, void *user_data,
+ void *stream_user_data);
static ngtcp2_tstamp timestamp(void)
{
@@ -1194,7 +1211,7 @@ static unsigned int ng_conncheck(struct connectdata *conn,
return CONNRESULT_NONE;
}
-static const struct Curl_handler Curl_handler_h3_quiche = {
+static const struct Curl_handler Curl_handler_http3 = {
"HTTPS", /* scheme */
ZERO_NULL, /* setup_connection */
Curl_http, /* do_it */
@@ -1370,7 +1387,7 @@ static int cb_h3_send_stop_sending(nghttp3_conn *conn, int64_t stream_id,
}
static nghttp3_conn_callbacks ngh3_callbacks = {
- NULL, /* acked_stream_data */
+ cb_h3_acked_stream_data, /* acked_stream_data */
cb_h3_stream_close,
cb_h3_recv_data,
cb_h3_deferred_consume,
@@ -1386,6 +1403,7 @@ static nghttp3_conn_callbacks ngh3_callbacks = {
NULL, /* http_cancel_push */
cb_h3_send_stop_sending,
NULL, /* push_stream */
+ NULL, /* end_stream */
};
static int init_ngh3_conn(struct quicsocket *qs)
@@ -1451,6 +1469,7 @@ static int init_ngh3_conn(struct quicsocket *qs)
static Curl_recv ngh3_stream_recv;
static Curl_send ngh3_stream_send;
+/* incoming data frames on the h3 stream */
static ssize_t ngh3_stream_recv(struct connectdata *conn,
int sockindex,
char *buf,
@@ -1497,18 +1516,38 @@ static ssize_t ngh3_stream_recv(struct connectdata *conn,
return -1;
}
+/* this amount of data has now been acked on this stream */
+static int cb_h3_acked_stream_data(nghttp3_conn *conn, int64_t stream_id,
+ size_t datalen, void *user_data,
+ void *stream_user_data)
+{
+ struct Curl_easy *data = stream_user_data;
+ struct HTTP *stream = data->req.protop;
+ (void)conn;
+ (void)stream_id;
+ (void)user_data;
+
+ if(!data->set.postfields) {
+ stream->h3out->used -= datalen;
+ fprintf(stderr, "cb_h3_acked_stream_data, %zd bytes, %zd left unacked\n",
+ datalen, stream->h3out->used);
+ DEBUGASSERT(stream->h3out->used < H3_SEND_SIZE);
+ }
+ return 0;
+}
+
static int cb_h3_readfunction(nghttp3_conn *conn, int64_t stream_id,
const uint8_t **pdata,
size_t *pdatalen, uint32_t *pflags,
void *user_data, void *stream_user_data)
{
struct Curl_easy *data = stream_user_data;
+ size_t nread;
+ struct HTTP *stream = data->req.protop;
(void)conn;
(void)stream_id;
(void)user_data;
- fprintf(stderr, "called cb_h3_readfunction\n");
-
if(data->set.postfields) {
*pdata = data->set.postfields;
*pdatalen = data->state.infilesize;
@@ -1516,6 +1555,48 @@ static int cb_h3_readfunction(nghttp3_conn *conn, int64_t stream_id,
return 0;
}
+ nread = CURLMIN(stream->upload_len, H3_SEND_SIZE - stream->h3out->used);
+ if(nread > 0) {
+ /* nghttp3 wants us to hold on to the data until it tells us it is okay to
+ delete it. Append the data at the end of the h3out buffer. Since we can
+ only return consecutive data, copy the amount that fits and the next
+ part comes in next invoke. */
+ struct h3out *out = stream->h3out;
+ if(nread + out->windex > H3_SEND_SIZE)
+ nread = H3_SEND_SIZE - out->windex;
+
+ memcpy(&out->buf[out->windex], stream->upload_mem, nread);
+ out->windex += nread;
+ out->used += nread;
+
+ /* that's the chunk we return to nghttp3 */
+ *pdata = &out->buf[out->windex];
+ *pdatalen = nread;
+
+ if(out->windex == H3_SEND_SIZE)
+ out->windex = 0; /* wrap */
+ stream->upload_mem += nread;
+ stream->upload_len -= nread;
+ if(data->state.infilesize != -1) {
+ stream->upload_left -= nread;
+ if(!stream->upload_left)
+ *pflags = NGHTTP3_DATA_FLAG_EOF;
+ }
+ fprintf(stderr, "cb_h3_readfunction %zd bytes%s (at %zd unacked)\n",
+ nread, *pflags == NGHTTP3_DATA_FLAG_EOF?" EOF":"",
+ out->used);
+ }
+ if(stream->upload_done && !stream->upload_len &&
+ (stream->upload_left <= 0)) {
+ fprintf(stderr, "!!!!!!!!! cb_h3_readfunction sets EOF\n");
+ *pdata = NULL;
+ *pdatalen = 0;
+ *pflags = NGHTTP3_DATA_FLAG_EOF;
+ }
+ else if(!nread) {
+ *pdatalen = 0;
+ return NGHTTP3_ERR_WOULDBLOCK;
+ }
return 0;
}
@@ -1538,6 +1619,7 @@ static CURLcode http_request(struct connectdata *conn, const void *mem,
nghttp3_nv *nva = NULL;
int64_t stream3_id;
int rc;
+ struct h3out *h3out = NULL;
rc = ngtcp2_conn_open_bidi_stream(qs->qconn, &stream3_id, NULL);
if(rc) {
@@ -1722,6 +1804,13 @@ static CURLcode http_request(struct connectdata *conn, const void *mem,
data_reader.read_data = cb_h3_readfunction;
+ h3out = calloc(sizeof(struct h3out), 1);
+ if(!h3out) {
+ result = CURLE_OUT_OF_MEMORY;
+ goto fail;
+ }
+ stream->h3out = h3out;
+
rc = nghttp3_conn_submit_request(qs->h3conn, stream->stream3_id,
nva, nheader, &data_reader,
conn->data);
@@ -1746,15 +1835,6 @@ static CURLcode http_request(struct connectdata *conn, const void *mem,
Curl_safefree(nva);
- if(!stream->upload_left) {
- /* done with this stream, FIN it */
- rc = nghttp3_conn_end_stream(qs->h3conn, stream->stream3_id);
- if(rc) {
- result = CURLE_SEND_ERROR;
- goto fail;
- }
- }
-
infof(data, "Using HTTP/3 Stream ID: %x (easy handle %p)\n",
stream3_id, (void *)data);
@@ -1784,8 +1864,17 @@ static ssize_t ngh3_stream_send(struct connectdata *conn,
sent = len;
}
else {
- (void)qs;
- /* TODO */
+ fprintf(stderr, "ngh3_stream_send() wants to send %zd bytes\n", len);
+ if(!stream->upload_len) {
+ stream->upload_mem = mem;
+ stream->upload_len = len;
+ (void)nghttp3_conn_resume_stream(qs->h3conn, stream->stream3_id);
+ sent = len;
+ }
+ else {
+ *curlcode = CURLE_AGAIN;
+ return -1;
+ }
}
if(ng_flush_egress(conn, sockfd, qs)) {
@@ -1801,7 +1890,7 @@ static void ng_has_connected(struct connectdata *conn, int tempindex)
{
conn->recv[FIRSTSOCKET] = ngh3_stream_recv;
conn->send[FIRSTSOCKET] = ngh3_stream_send;
- conn->handler = &Curl_handler_h3_quiche;
+ conn->handler = &Curl_handler_http3;
conn->bits.multiplex = TRUE; /* at least potentially multiplexed */
conn->httpversion = 30;
conn->bundle->multiuse = BUNDLE_MULTIPLEX;
@@ -2022,4 +2111,22 @@ static CURLcode ng_flush_egress(struct connectdata *conn, int sockfd,
return CURLE_OK;
}
+
+/*
+ * Called from transfer.c:done_sending when we stop HTTP/3 uploading.
+ */
+CURLcode Curl_quic_done_sending(struct connectdata *conn)
+{
+ if(conn->handler == &Curl_handler_http3) {
+ /* only for HTTP/3 transfers */
+ struct HTTP *stream = conn->data->req.protop;
+ struct quicsocket *qs = conn->quic;
+ fprintf(stderr, "!!! Curl_quic_done_sending stream %zu\n",
+ stream->stream3_id);
+ stream->upload_done = TRUE;
+ (void)nghttp3_conn_resume_stream(qs->h3conn, stream->stream3_id);
+ }
+
+ return CURLE_OK;
+}
#endif
diff --git a/lib/vquic/quiche.c b/lib/vquic/quiche.c
index b84cc4779..43723a5f9 100644
--- a/lib/vquic/quiche.c
+++ b/lib/vquic/quiche.c
@@ -116,7 +116,7 @@ static CURLcode quiche_do(struct connectdata *conn, bool *done)
return Curl_http(conn, done);
}
-static const struct Curl_handler Curl_handler_h3_quiche = {
+static const struct Curl_handler Curl_handler_http3 = {
"HTTPS", /* scheme */
ZERO_NULL, /* setup_connection */
quiche_do, /* do_it */
@@ -232,7 +232,7 @@ static CURLcode quiche_has_connected(struct connectdata *conn,
conn->recv[sockindex] = h3_stream_recv;
conn->send[sockindex] = h3_stream_send;
- conn->handler = &Curl_handler_h3_quiche;
+ conn->handler = &Curl_handler_http3;
conn->bits.multiplex = TRUE; /* at least potentially multiplexed */
conn->httpversion = 30;
conn->bundle->multiuse = BUNDLE_MULTIPLEX;
@@ -750,5 +750,19 @@ fail:
return result;
}
+/*
+ * Called from transfer.c:done_sending when we stop HTTP/3 uploading.
+ */
+CURLcode Curl_quic_done_sending(struct connectdata *conn)
+{
+ if(conn->handler == &Curl_handler_http3) {
+ /* only for HTTP/3 transfers */
+ struct HTTP *stream = conn->data->req.protop;
+ fprintf(stderr, "!!! Curl_quic_done_sending\n");
+ stream->upload_done = TRUE;
+ }
+
+ return CURLE_OK;
+}
#endif