summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Eissing <stefan@eissing.org>2023-04-21 12:04:46 +0200
committerDaniel Stenberg <daniel@haxx.se>2023-04-25 17:49:28 +0200
commitcab2d56ea52b3724495b3dbc5bfef3343d4ecadc (patch)
tree7aea2eecec9df647106ddb351b6e62d8d6c907f0
parenta97e4eb95f86adb6043a3388250f34841440981e (diff)
downloadcurl-cab2d56ea52b3724495b3dbc5bfef3343d4ecadc.tar.gz
h2/h3: replace `state.drain` counter with `state.dselect_bits`
- `drain` was used by http/2 and http/3 implementations to indicate that the transfer requires send/recv independant from its socket poll state. Intended as a counter, it was used as bool flag only. - a similar mechanism exists on `connectdata->cselect_bits` where specific protocols can indicate something similar, only for the whole connection. - `cselect_bits` are cleard in transfer.c on use and, importantly, also set when the transfer loop expended its `maxloops` tries. `drain` was not cleared by transfer and the http2/3 implementations had to take care of that. - `dselect_bits` is cleared *and* set by the transfer loop. http2/3 does no longer clear it, only set when new events happen. This change unifies the handling of socket poll overrides, extending `cselect_bits` by a easy handle specific value and a common treatment in transfers. Closes #11005
-rw-r--r--lib/http2.c121
-rw-r--r--lib/transfer.c60
-rw-r--r--lib/urldata.h5
-rw-r--r--lib/vquic/curl_msh3.c46
-rw-r--r--lib/vquic/curl_ngtcp2.c35
-rw-r--r--lib/vquic/curl_quiche.c50
-rw-r--r--lib/vquic/vquic.c1
-rw-r--r--tests/http/test_05_errors.py2
8 files changed, 151 insertions, 169 deletions
diff --git a/lib/http2.c b/lib/http2.c
index 4940918f8..9da3cae17 100644
--- a/lib/http2.c
+++ b/lib/http2.c
@@ -154,34 +154,6 @@ static void cf_h2_ctx_free(struct cf_h2_ctx *ctx)
}
}
-/*
- * This specific transfer on this connection has been "drained".
- */
-static void drained_transfer(struct Curl_cfilter *cf,
- struct Curl_easy *data)
-{
- if(data->state.drain) {
- struct cf_h2_ctx *ctx = cf->ctx;
- DEBUGASSERT(ctx->drain_total > 0);
- ctx->drain_total--;
- data->state.drain = 0;
- }
-}
-
-/*
- * Mark this transfer to get "drained".
- */
-static void drain_this(struct Curl_cfilter *cf,
- struct Curl_easy *data)
-{
- if(!data->state.drain) {
- struct cf_h2_ctx *ctx = cf->ctx;
- data->state.drain = 1;
- ctx->drain_total++;
- DEBUGASSERT(ctx->drain_total > 0);
- }
-}
-
/**
* All about the H3 internals of a stream
*/
@@ -213,6 +185,25 @@ struct stream_ctx {
#define H2_STREAM_ID(d) (H2_STREAM_CTX(d)? \
H2_STREAM_CTX(d)->id : -2)
+/*
+ * Mark this transfer to get "drained".
+ */
+static void drain_stream(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ struct stream_ctx *stream)
+{
+ int bits;
+
+ (void)cf;
+ bits = CURL_CSELECT_IN;
+ if(stream->upload_left)
+ bits |= CURL_CSELECT_OUT;
+ if(data->state.dselect_bits != bits) {
+ data->state.dselect_bits = bits;
+ Curl_expire(data, 0, EXPIRE_RUN_NOW);
+ }
+}
+
static CURLcode http2_data_setup(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct stream_ctx **pstream)
@@ -276,8 +267,6 @@ static void http2_data_done(struct Curl_cfilter *cf,
(void)nghttp2_session_send(ctx->h2);
}
- drained_transfer(cf, data);
-
/* -1 means unassigned and 0 means cleared */
if(nghttp2_session_get_stream_user_data(ctx->h2, stream->id)) {
int rv = nghttp2_session_set_stream_user_data(ctx->h2,
@@ -515,8 +504,6 @@ static int h2_process_pending_input(struct Curl_cfilter *cf,
while(Curl_bufq_peek(&ctx->inbufq, &buf, &blen)) {
rv = nghttp2_session_mem_recv(ctx->h2, (const uint8_t *)buf, blen);
- DEBUGF(LOG_CF(data, cf,
- "fed %zu bytes from nw to nghttp2 -> %zd", blen, rv));
if(rv < 0) {
failf(data,
"process_pending_input: nghttp2_session_mem_recv() returned "
@@ -526,7 +513,6 @@ static int h2_process_pending_input(struct Curl_cfilter *cf,
}
Curl_bufq_skip(&ctx->inbufq, (size_t)rv);
if(Curl_bufq_is_empty(&ctx->inbufq)) {
- DEBUGF(LOG_CF(data, cf, "all data in connection buffer processed"));
break;
}
else {
@@ -975,8 +961,7 @@ static CURLcode on_stream_frame(struct Curl_cfilter *cf,
}
}
if(frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
- drain_this(cf, data);
- Curl_expire(data, 0, EXPIRE_RUN_NOW);
+ drain_stream(cf, data, stream);
}
break;
case NGHTTP2_HEADERS:
@@ -1005,10 +990,7 @@ static CURLcode on_stream_frame(struct Curl_cfilter *cf,
DEBUGF(LOG_CF(data, cf, "[h2sid=%d] %zu header bytes",
stream_id, Curl_bufq_len(&stream->recvbuf)));
- if(CF_DATA_CURRENT(cf) != data) {
- drain_this(cf, data);
- Curl_expire(data, 0, EXPIRE_RUN_NOW);
- }
+ drain_stream(cf, data, stream);
break;
case NGHTTP2_PUSH_PROMISE:
DEBUGF(LOG_CF(data, cf, "[h2sid=%d] recv PUSH_PROMISE", stream_id));
@@ -1031,16 +1013,14 @@ static CURLcode on_stream_frame(struct Curl_cfilter *cf,
DEBUGF(LOG_CF(data, cf, "[h2sid=%d] recv RST", stream_id));
stream->closed = TRUE;
stream->reset = TRUE;
- drain_this(cf, data);
- Curl_expire(data, 0, EXPIRE_RUN_NOW);
+ drain_stream(cf, data, stream);
break;
case NGHTTP2_WINDOW_UPDATE:
DEBUGF(LOG_CF(data, cf, "[h2sid=%d] recv WINDOW_UPDATE", stream_id));
if((data->req.keepon & KEEP_SEND_HOLD) &&
(data->req.keepon & KEEP_SEND)) {
data->req.keepon &= ~KEEP_SEND_HOLD;
- drain_this(cf, data);
- Curl_expire(data, 0, EXPIRE_RUN_NOW);
+ drain_stream(cf, data, stream);
DEBUGF(LOG_CF(data, cf, "[h2sid=%d] un-holding after win update",
stream_id));
}
@@ -1156,10 +1136,7 @@ static int on_data_chunk_recv(nghttp2_session *session, uint8_t flags,
}
/* if we receive data for another handle, wake that up */
- if(CF_DATA_CURRENT(cf) != data_s) {
- drain_this(cf, data_s);
- Curl_expire(data_s, 0, EXPIRE_RUN_NOW);
- }
+ drain_stream(cf, data_s, stream);
DEBUGASSERT((size_t)nwritten == len);
DEBUGF(LOG_CF(data_s, cf, "[h2sid=%d] %zd/%zu DATA recvd, "
@@ -1196,10 +1173,7 @@ static int on_stream_close(nghttp2_session *session, int32_t stream_id,
if(stream->error)
stream->reset = TRUE;
- if(CF_DATA_CURRENT(cf) != data_s) {
- drain_this(cf, data_s);
- Curl_expire(data_s, 0, EXPIRE_RUN_NOW);
- }
+ drain_stream(cf, data_s, stream);
/* remove `data_s` from the nghttp2 stream */
rv = nghttp2_session_set_stream_user_data(session, stream_id, 0);
@@ -1529,7 +1503,7 @@ static CURLcode http2_data_done_send(struct Curl_cfilter *cf,
/* resume sending here to trigger the callback to get called again so
that it can signal EOF to nghttp2 */
(void)nghttp2_session_resume_data(ctx->h2, stream->id);
- drain_this(cf, data);
+ drain_stream(cf, data, stream);
}
out:
@@ -1543,14 +1517,17 @@ static ssize_t http2_handle_stream_close(struct Curl_cfilter *cf,
struct stream_ctx *stream = H2_STREAM_CTX(data);
ssize_t rv = 0;
- drained_transfer(cf, data);
-
if(stream->error == NGHTTP2_REFUSED_STREAM) {
DEBUGF(LOG_CF(data, cf, "[h2sid=%d] REFUSED_STREAM, try again on a new "
"connection", stream->id));
connclose(cf->conn, "REFUSED_STREAM"); /* don't use this anymore */
data->state.refused_stream = TRUE;
- *err = CURLE_RECV_ERROR; /* trigger Curl_retry_request() later */
+ *err = CURLE_SEND_ERROR; /* trigger Curl_retry_request() later */
+ return -1;
+ }
+ else if(stream->reset) {
+ failf(data, "HTTP/2 stream %u was reset", stream->id);
+ *err = stream->bodystarted? CURLE_PARTIAL_FILE : CURLE_RECV_ERROR;
return -1;
}
else if(stream->error != NGHTTP2_NO_ERROR) {
@@ -1560,11 +1537,6 @@ static ssize_t http2_handle_stream_close(struct Curl_cfilter *cf,
*err = CURLE_HTTP2_STREAM;
return -1;
}
- else if(stream->reset) {
- failf(data, "HTTP/2 stream %u was reset", stream->id);
- *err = stream->bodystarted? CURLE_PARTIAL_FILE : CURLE_RECV_ERROR;
- return -1;
- }
if(!stream->bodystarted) {
failf(data, "HTTP/2 stream %u was closed cleanly, but before getting "
@@ -1691,7 +1663,6 @@ static ssize_t stream_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
ssize_t nread = -1;
*err = CURLE_AGAIN;
- drained_transfer(cf, data);
if(!Curl_bufq_is_empty(&stream->recvbuf)) {
nread = Curl_bufq_read(&stream->recvbuf,
(unsigned char *)buf, len, err);
@@ -1755,8 +1726,8 @@ static CURLcode h2_progress_ingress(struct Curl_cfilter *cf,
}
nread = Curl_bufq_slurp(&ctx->inbufq, nw_in_reader, cf, &result);
- DEBUGF(LOG_CF(data, cf, "read %zd bytes nw data -> %zd, %d",
- Curl_bufq_len(&ctx->inbufq), nread, result));
+ /* DEBUGF(LOG_CF(data, cf, "read %zd bytes nw data -> %zd, %d",
+ Curl_bufq_len(&ctx->inbufq), nread, result)); */
if(nread < 0) {
if(result != CURLE_AGAIN) {
failf(data, "Failed receiving HTTP2 data");
@@ -1832,7 +1803,7 @@ static ssize_t cf_h2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
if(stream->closed) {
DEBUGF(LOG_CF(data, cf, "[h2sid=%d] closed stream, set drain",
stream->id));
- drain_this(cf, data);
+ drain_stream(cf, data, stream);
}
}
@@ -2040,9 +2011,14 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
}
if(should_close_session(ctx)) {
- DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session"));
- *err = CURLE_HTTP2;
- nwritten = -1;
+ if(stream->closed) {
+ nwritten = http2_handle_stream_close(cf, data, err);
+ }
+ else {
+ DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session"));
+ *err = CURLE_HTTP2;
+ nwritten = -1;
+ }
goto out;
}
@@ -2085,9 +2061,14 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
}
if(should_close_session(ctx)) {
- DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session"));
- *err = CURLE_HTTP2;
- nwritten = -1;
+ if(stream->closed) {
+ nwritten = http2_handle_stream_close(cf, data, err);
+ }
+ else {
+ DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session"));
+ *err = CURLE_HTTP2;
+ nwritten = -1;
+ }
goto out;
}
}
diff --git a/lib/transfer.c b/lib/transfer.c
index cb69f3365..947070956 100644
--- a/lib/transfer.c
+++ b/lib/transfer.c
@@ -753,7 +753,7 @@ static CURLcode readwrite_data(struct Curl_easy *data,
if(maxloops <= 0) {
/* we mark it as read-again-please */
- conn->cselect_bits = CURL_CSELECT_IN;
+ data->state.dselect_bits = CURL_CSELECT_IN;
*comeback = TRUE;
}
@@ -1065,40 +1065,36 @@ CURLcode Curl_readwrite(struct connectdata *conn,
CURLcode result;
struct curltime now;
int didwhat = 0;
+ int select_bits;
- curl_socket_t fd_read;
- curl_socket_t fd_write;
- int select_res = conn->cselect_bits;
- conn->cselect_bits = 0;
-
- /* only use the proper socket if the *_HOLD bit is not set simultaneously as
- then we are in rate limiting state in that transfer direction */
-
- if((k->keepon & KEEP_RECVBITS) == KEEP_RECV)
- fd_read = conn->sockfd;
- else
- fd_read = CURL_SOCKET_BAD;
-
- if((k->keepon & KEEP_SENDBITS) == KEEP_SEND)
- fd_write = conn->writesockfd;
- else
- fd_write = CURL_SOCKET_BAD;
+ if(data->state.dselect_bits) {
+ select_bits = data->state.dselect_bits;
+ data->state.dselect_bits = 0;
+ }
+ else if(conn->cselect_bits) {
+ select_bits = conn->cselect_bits;
+ conn->cselect_bits = 0;
+ }
+ else {
+ curl_socket_t fd_read;
+ curl_socket_t fd_write;
+ /* only use the proper socket if the *_HOLD bit is not set simultaneously
+ as then we are in rate limiting state in that transfer direction */
+ if((k->keepon & KEEP_RECVBITS) == KEEP_RECV)
+ fd_read = conn->sockfd;
+ else
+ fd_read = CURL_SOCKET_BAD;
-#if defined(USE_HTTP2) || defined(USE_HTTP3)
- if(data->state.drain) {
- select_res |= CURL_CSELECT_IN;
- DEBUGF(infof(data, "Curl_readwrite: forcibly told to drain data"));
if((k->keepon & KEEP_SENDBITS) == KEEP_SEND)
- select_res |= CURL_CSELECT_OUT;
- }
-#endif
+ fd_write = conn->writesockfd;
+ else
+ fd_write = CURL_SOCKET_BAD;
- if(!select_res) /* Call for select()/poll() only, if read/write/error
- status is not known. */
- select_res = Curl_socket_check(fd_read, CURL_SOCKET_BAD, fd_write, 0);
+ select_bits = Curl_socket_check(fd_read, CURL_SOCKET_BAD, fd_write, 0);
+ }
- if(select_res == CURL_CSELECT_ERR) {
+ if(select_bits == CURL_CSELECT_ERR) {
failf(data, "select/poll returned error");
result = CURLE_SEND_ERROR;
goto out;
@@ -1106,7 +1102,7 @@ CURLcode Curl_readwrite(struct connectdata *conn,
#ifdef USE_HYPER
if(conn->datastream) {
- result = conn->datastream(data, conn, &didwhat, done, select_res);
+ result = conn->datastream(data, conn, &didwhat, done, select_bits);
if(result || *done)
goto out;
}
@@ -1115,14 +1111,14 @@ CURLcode Curl_readwrite(struct connectdata *conn,
/* We go ahead and do a read if we have a readable socket or if
the stream was rewound (in which case we have data in a
buffer) */
- if((k->keepon & KEEP_RECV) && (select_res & CURL_CSELECT_IN)) {
+ if((k->keepon & KEEP_RECV) && (select_bits & CURL_CSELECT_IN)) {
result = readwrite_data(data, conn, k, &didwhat, done, comeback);
if(result || *done)
goto out;
}
/* If we still have writing to do, we check if we have a writable socket. */
- if((k->keepon & KEEP_SEND) && (select_res & CURL_CSELECT_OUT)) {
+ if((k->keepon & KEEP_SEND) && (select_bits & CURL_CSELECT_OUT)) {
/* write */
result = readwrite_upload(data, conn, &didwhat);
diff --git a/lib/urldata.h b/lib/urldata.h
index a8580bdb6..777bc36f9 100644
--- a/lib/urldata.h
+++ b/lib/urldata.h
@@ -1319,6 +1319,8 @@ struct UrlState {
char *scratch; /* huge buffer[set.buffer_size*2] for upload CRLF replacing */
long followlocation; /* redirect counter */
int requests; /* request counter: redirects + authentication retakes */
+ int dselect_bits; /* != 0 -> bitmask of socket events for this transfer
+ * overriding anything the socket may report */
#ifdef HAVE_SIGNAL
/* storage for the previous bag^H^H^HSIGPIPE signal handler :-) */
void (*prev_signal)(int sig);
@@ -1374,9 +1376,6 @@ struct UrlState {
curl_off_t infilesize; /* size of file to upload, -1 means unknown.
Copied from set.filesize at start of operation */
#if defined(USE_HTTP2) || defined(USE_HTTP3)
- size_t drain; /* Increased when this stream has data to read, even if its
- socket is not necessarily is readable. Decreased when
- checked. */
struct Curl_data_priority priority; /* shallow copy of data->set */
#endif
diff --git a/lib/vquic/curl_msh3.c b/lib/vquic/curl_msh3.c
index 1e1a15a8f..34ea0bcf6 100644
--- a/lib/vquic/curl_msh3.c
+++ b/lib/vquic/curl_msh3.c
@@ -189,12 +189,33 @@ static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
}
}
-static void notify_drain(struct Curl_cfilter *cf,
+static void drain_stream_from_other_thread(struct Curl_easy *data,
+ struct stream_ctx *stream)
+{
+ int bits;
+
+ /* risky */
+ bits = CURL_CSELECT_IN;
+ if(stream && !stream->upload_done)
+ bits |= CURL_CSELECT_OUT;
+ if(data->state.dselect_bits != bits) {
+ data->state.dselect_bits = bits;
+ /* cannot expire from other thread */
+ }
+}
+
+static void drain_stream(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
+ struct stream_ctx *stream = H3_STREAM_CTX(data);
+ int bits;
+
(void)cf;
- if(!data->state.drain) {
- data->state.drain = 1;
+ bits = CURL_CSELECT_IN;
+ if(stream && !stream->upload_done)
+ bits |= CURL_CSELECT_OUT;
+ if(data->state.dselect_bits != bits) {
+ data->state.dselect_bits = bits;
Curl_expire(data, 0, EXPIRE_RUN_NOW);
}
}
@@ -350,7 +371,7 @@ static void MSH3_CALL msh3_header_received(MSH3_REQUEST *Request,
}
}
- data->state.drain = 1;
+ drain_stream_from_other_thread(data, stream);
msh3_lock_release(&stream->recv_lock);
}
@@ -469,7 +490,6 @@ static ssize_t recv_closed_stream(struct Curl_cfilter *cf,
nread = 0;
out:
- data->state.drain = 0;
return nread;
}
@@ -508,7 +528,6 @@ static ssize_t cf_msh3_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
if(stream->recv_error) {
failf(data, "request aborted");
- data->state.drain = 0;
*err = stream->recv_error;
goto out;
}
@@ -522,10 +541,8 @@ static ssize_t cf_msh3_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
len, nread, *err));
if(nread < 0)
goto out;
- if(!Curl_bufq_is_empty(&stream->recvbuf) ||
- stream->closed) {
- notify_drain(cf, data);
- }
+ if(stream->closed)
+ drain_stream(cf, data);
}
else if(stream->closed) {
nread = recv_closed_stream(cf, data, err);
@@ -669,15 +686,14 @@ static int cf_msh3_get_select_socks(struct Curl_cfilter *cf,
if(stream->recv_error) {
bitmap |= GETSOCK_READSOCK(0);
- notify_drain(cf, data);
+ drain_stream(cf, data);
}
else if(stream->req) {
bitmap |= GETSOCK_READSOCK(0);
- notify_drain(cf, data);
+ drain_stream(cf, data);
}
}
- DEBUGF(LOG_CF(data, cf, "select_sock %u -> %d",
- (uint32_t)data->state.drain, bitmap));
+ DEBUGF(LOG_CF(data, cf, "select_sock -> %d", bitmap));
CF_DATA_RESTORE(cf, save);
return bitmap;
}
@@ -698,6 +714,8 @@ static bool cf_msh3_data_pending(struct Curl_cfilter *cf,
Curl_bufq_len(&stream->recvbuf)));
pending = !Curl_bufq_is_empty(&stream->recvbuf);
msh3_lock_release(&stream->recv_lock);
+ if(pending)
+ drain_stream(cf, (struct Curl_easy *)data);
}
CF_DATA_RESTORE(cf, save);
diff --git a/lib/vquic/curl_ngtcp2.c b/lib/vquic/curl_ngtcp2.c
index 9c0c223b4..2f4b4cdb4 100644
--- a/lib/vquic/curl_ngtcp2.c
+++ b/lib/vquic/curl_ngtcp2.c
@@ -709,11 +709,6 @@ static void report_consumed_data(struct Curl_cfilter *cf,
consumed);
ngtcp2_conn_extend_max_offset(ctx->qconn, consumed);
}
- if(!stream->closed && data->state.drain &&
- Curl_bufq_is_empty(&stream->recvbuf)) {
- /* nothing buffered any more */
- data->state.drain = 0;
- }
}
static int cb_recv_stream_data(ngtcp2_conn *tconn, uint32_t flags,
@@ -995,12 +990,18 @@ static int cf_ngtcp2_get_select_socks(struct Curl_cfilter *cf,
return rv;
}
-static void notify_drain(struct Curl_cfilter *cf,
+static void drain_stream(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
+ struct stream_ctx *stream = H3_STREAM_CTX(data);
+ int bits;
+
(void)cf;
- if(!data->state.drain) {
- data->state.drain = 1;
+ bits = CURL_CSELECT_IN;
+ if(stream && !stream->upload_done)
+ bits |= CURL_CSELECT_OUT;
+ if(data->state.dselect_bits != bits) {
+ data->state.dselect_bits = bits;
Curl_expire(data, 0, EXPIRE_RUN_NOW);
}
}
@@ -1028,7 +1029,7 @@ static int cb_h3_stream_close(nghttp3_conn *conn, int64_t stream_id,
if(app_error_code == NGHTTP3_H3_INTERNAL_ERROR) {
stream->reset = TRUE;
}
- notify_drain(cf, data);
+ drain_stream(cf, data);
return 0;
}
@@ -1082,9 +1083,7 @@ static int cb_h3_recv_data(nghttp3_conn *conn, int64_t stream3_id,
(void)stream3_id;
result = write_resp_raw(cf, data, buf, buflen, TRUE);
- if(CF_DATA_CURRENT(cf) != data) {
- notify_drain(cf, data);
- }
+ drain_stream(cf, data);
return result? -1 : 0;
}
@@ -1129,9 +1128,7 @@ static int cb_h3_end_headers(nghttp3_conn *conn, int64_t stream_id,
if(stream->status_code / 100 != 1) {
stream->resp_hds_complete = TRUE;
}
- if(CF_DATA_CURRENT(cf) != data) {
- notify_drain(cf, data);
- }
+ drain_stream(cf, data);
return 0;
}
@@ -1358,7 +1355,6 @@ static ssize_t recv_closed_stream(struct Curl_cfilter *cf,
nread = 0;
out:
- data->state.drain = 0;
return nread;
}
@@ -1413,16 +1409,13 @@ static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
}
if(nread > 0) {
- if(1 || !Curl_bufq_is_empty(&stream->recvbuf)) {
- notify_drain(cf, data);
- }
+ drain_stream(cf, data);
}
else {
if(stream->closed) {
nread = recv_closed_stream(cf, data, err);
goto out;
}
- data->state.drain = FALSE;
*err = CURLE_AGAIN;
nread = -1;
}
@@ -1468,7 +1461,7 @@ static int cb_h3_acked_req_body(nghttp3_conn *conn, int64_t stream_id,
if((data->req.keepon & KEEP_SEND_HOLD) &&
(data->req.keepon & KEEP_SEND)) {
data->req.keepon &= ~KEEP_SEND_HOLD;
- notify_drain(cf, data);
+ drain_stream(cf, data);
DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] unpausing acks",
stream_id));
}
diff --git a/lib/vquic/curl_quiche.c b/lib/vquic/curl_quiche.c
index 1a6838b01..afd446b2e 100644
--- a/lib/vquic/curl_quiche.c
+++ b/lib/vquic/curl_quiche.c
@@ -299,11 +299,18 @@ static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
}
}
-static void notify_drain(struct Curl_cfilter *cf, struct Curl_easy *data)
+static void drain_stream(struct Curl_cfilter *cf,
+ struct Curl_easy *data)
{
+ struct stream_ctx *stream = H3_STREAM_CTX(data);
+ int bits;
+
(void)cf;
- if(!data->state.drain) {
- data->state.drain = 1;
+ bits = CURL_CSELECT_IN;
+ if(stream && !stream->upload_done)
+ bits |= CURL_CSELECT_OUT;
+ if(data->state.dselect_bits != bits) {
+ data->state.dselect_bits = bits;
Curl_expire(data, 0, EXPIRE_RUN_NOW);
}
}
@@ -579,9 +586,7 @@ static CURLcode cf_poll_events(struct Curl_cfilter *cf,
}
else {
result = h3_process_event(cf, sdata, stream3_id, ev);
- if(sdata != data) {
- notify_drain(cf, sdata);
- }
+ drain_stream(cf, sdata);
if(result) {
DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"] error processing event %s "
"for [h3sid=%"PRId64"] -> %d",
@@ -848,15 +853,20 @@ static ssize_t cf_quiche_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
}
if(nread > 0) {
- data->state.drain = (!Curl_bufq_is_empty(&stream->recvbuf) ||
- stream->closed);
+ if(stream->closed)
+ drain_stream(cf, data);
}
else {
- data->state.drain = FALSE;
if(stream->closed) {
nread = recv_closed_stream(cf, data, err);
goto out;
}
+ else if(quiche_conn_is_draining(ctx->qconn)) {
+ failf(data, "QUIC connection is draining");
+ *err = CURLE_HTTP3;
+ nread = -1;
+ goto out;
+ }
*err = CURLE_AGAIN;
nread = -1;
}
@@ -1065,24 +1075,9 @@ static bool stream_is_writeable(struct Curl_cfilter *cf,
{
struct cf_quiche_ctx *ctx = cf->ctx;
struct stream_ctx *stream = H3_STREAM_CTX(data);
- quiche_stream_iter *qiter;
- bool is_writable = FALSE;
- if(!stream)
- return FALSE;
- /* surely, there must be a better way */
- qiter = quiche_conn_writable(ctx->qconn);
- if(qiter) {
- uint64_t stream_id;
- while(quiche_stream_iter_next(qiter, &stream_id)) {
- if(stream_id == (uint64_t)stream->id) {
- is_writable = TRUE;
- break;
- }
- }
- quiche_stream_iter_free(qiter);
- }
- return is_writable;
+ return stream &&
+ quiche_conn_stream_writable(ctx->qconn, (uint64_t)stream->id, 1);
}
static int cf_quiche_get_select_socks(struct Curl_cfilter *cf,
@@ -1152,7 +1147,8 @@ static CURLcode cf_quiche_data_event(struct Curl_cfilter *cf,
}
case CF_CTRL_DATA_IDLE:
result = cf_flush_egress(cf, data);
- DEBUGF(LOG_CF(data, cf, "data idle, flush egress -> %d", result));
+ if(result)
+ DEBUGF(LOG_CF(data, cf, "data idle, flush egress -> %d", result));
break;
default:
break;
diff --git a/lib/vquic/vquic.c b/lib/vquic/vquic.c
index 87dd1a75d..a51ebfa7d 100644
--- a/lib/vquic/vquic.c
+++ b/lib/vquic/vquic.c
@@ -398,7 +398,6 @@ static CURLcode recvmsg_packets(struct Curl_cfilter *cf,
;
if(nread == -1) {
if(SOCKERRNO == EAGAIN || SOCKERRNO == EWOULDBLOCK) {
- DEBUGF(LOG_CF(data, cf, "ingress, recvmsg -> EAGAIN"));
goto out;
}
if(!cf->connected && SOCKERRNO == ECONNREFUSED) {
diff --git a/tests/http/test_05_errors.py b/tests/http/test_05_errors.py
index 587ba33c4..219faf3cc 100644
--- a/tests/http/test_05_errors.py
+++ b/tests/http/test_05_errors.py
@@ -89,6 +89,6 @@ class TestErrors:
assert len(r.stats) == count, f'did not get all stats: {r}'
invalid_stats = []
for idx, s in enumerate(r.stats):
- if 'exitcode' not in s or s['exitcode'] not in [18, 56, 92, 95]:
+ if 'exitcode' not in s or s['exitcode'] not in [18, 55, 56, 92, 95]:
invalid_stats.append(f'request {idx} exit with {s["exitcode"]}\n{s}')
assert len(invalid_stats) == 0, f'failed: {invalid_stats}'