diff options
author | Stefan Eissing <stefan@eissing.org> | 2023-04-21 12:04:46 +0200 |
---|---|---|
committer | Daniel Stenberg <daniel@haxx.se> | 2023-04-25 17:49:28 +0200 |
commit | cab2d56ea52b3724495b3dbc5bfef3343d4ecadc (patch) | |
tree | 7aea2eecec9df647106ddb351b6e62d8d6c907f0 /lib | |
parent | a97e4eb95f86adb6043a3388250f34841440981e (diff) | |
download | curl-cab2d56ea52b3724495b3dbc5bfef3343d4ecadc.tar.gz |
h2/h3: replace `state.drain` counter with `state.dselect_bits`
- `drain` was used by http/2 and http/3 implementations to indicate
that the transfer requires send/recv independant from its socket
poll state. Intended as a counter, it was used as bool flag only.
- a similar mechanism exists on `connectdata->cselect_bits` where
specific protocols can indicate something similar, only for the
whole connection.
- `cselect_bits` are cleard in transfer.c on use and, importantly,
also set when the transfer loop expended its `maxloops` tries.
`drain` was not cleared by transfer and the http2/3 implementations
had to take care of that.
- `dselect_bits` is cleared *and* set by the transfer loop. http2/3
does no longer clear it, only set when new events happen.
This change unifies the handling of socket poll overrides, extending
`cselect_bits` by a easy handle specific value and a common treatment in
transfers.
Closes #11005
Diffstat (limited to 'lib')
-rw-r--r-- | lib/http2.c | 121 | ||||
-rw-r--r-- | lib/transfer.c | 60 | ||||
-rw-r--r-- | lib/urldata.h | 5 | ||||
-rw-r--r-- | lib/vquic/curl_msh3.c | 46 | ||||
-rw-r--r-- | lib/vquic/curl_ngtcp2.c | 35 | ||||
-rw-r--r-- | lib/vquic/curl_quiche.c | 50 | ||||
-rw-r--r-- | lib/vquic/vquic.c | 1 |
7 files changed, 150 insertions, 168 deletions
diff --git a/lib/http2.c b/lib/http2.c index 4940918f8..9da3cae17 100644 --- a/lib/http2.c +++ b/lib/http2.c @@ -154,34 +154,6 @@ static void cf_h2_ctx_free(struct cf_h2_ctx *ctx) } } -/* - * This specific transfer on this connection has been "drained". - */ -static void drained_transfer(struct Curl_cfilter *cf, - struct Curl_easy *data) -{ - if(data->state.drain) { - struct cf_h2_ctx *ctx = cf->ctx; - DEBUGASSERT(ctx->drain_total > 0); - ctx->drain_total--; - data->state.drain = 0; - } -} - -/* - * Mark this transfer to get "drained". - */ -static void drain_this(struct Curl_cfilter *cf, - struct Curl_easy *data) -{ - if(!data->state.drain) { - struct cf_h2_ctx *ctx = cf->ctx; - data->state.drain = 1; - ctx->drain_total++; - DEBUGASSERT(ctx->drain_total > 0); - } -} - /** * All about the H3 internals of a stream */ @@ -213,6 +185,25 @@ struct stream_ctx { #define H2_STREAM_ID(d) (H2_STREAM_CTX(d)? \ H2_STREAM_CTX(d)->id : -2) +/* + * Mark this transfer to get "drained". + */ +static void drain_stream(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct stream_ctx *stream) +{ + int bits; + + (void)cf; + bits = CURL_CSELECT_IN; + if(stream->upload_left) + bits |= CURL_CSELECT_OUT; + if(data->state.dselect_bits != bits) { + data->state.dselect_bits = bits; + Curl_expire(data, 0, EXPIRE_RUN_NOW); + } +} + static CURLcode http2_data_setup(struct Curl_cfilter *cf, struct Curl_easy *data, struct stream_ctx **pstream) @@ -276,8 +267,6 @@ static void http2_data_done(struct Curl_cfilter *cf, (void)nghttp2_session_send(ctx->h2); } - drained_transfer(cf, data); - /* -1 means unassigned and 0 means cleared */ if(nghttp2_session_get_stream_user_data(ctx->h2, stream->id)) { int rv = nghttp2_session_set_stream_user_data(ctx->h2, @@ -515,8 +504,6 @@ static int h2_process_pending_input(struct Curl_cfilter *cf, while(Curl_bufq_peek(&ctx->inbufq, &buf, &blen)) { rv = nghttp2_session_mem_recv(ctx->h2, (const uint8_t *)buf, blen); - DEBUGF(LOG_CF(data, cf, - "fed %zu bytes from nw to nghttp2 -> %zd", blen, rv)); if(rv < 0) { failf(data, "process_pending_input: nghttp2_session_mem_recv() returned " @@ -526,7 +513,6 @@ static int h2_process_pending_input(struct Curl_cfilter *cf, } Curl_bufq_skip(&ctx->inbufq, (size_t)rv); if(Curl_bufq_is_empty(&ctx->inbufq)) { - DEBUGF(LOG_CF(data, cf, "all data in connection buffer processed")); break; } else { @@ -975,8 +961,7 @@ static CURLcode on_stream_frame(struct Curl_cfilter *cf, } } if(frame->hd.flags & NGHTTP2_FLAG_END_STREAM) { - drain_this(cf, data); - Curl_expire(data, 0, EXPIRE_RUN_NOW); + drain_stream(cf, data, stream); } break; case NGHTTP2_HEADERS: @@ -1005,10 +990,7 @@ static CURLcode on_stream_frame(struct Curl_cfilter *cf, DEBUGF(LOG_CF(data, cf, "[h2sid=%d] %zu header bytes", stream_id, Curl_bufq_len(&stream->recvbuf))); - if(CF_DATA_CURRENT(cf) != data) { - drain_this(cf, data); - Curl_expire(data, 0, EXPIRE_RUN_NOW); - } + drain_stream(cf, data, stream); break; case NGHTTP2_PUSH_PROMISE: DEBUGF(LOG_CF(data, cf, "[h2sid=%d] recv PUSH_PROMISE", stream_id)); @@ -1031,16 +1013,14 @@ static CURLcode on_stream_frame(struct Curl_cfilter *cf, DEBUGF(LOG_CF(data, cf, "[h2sid=%d] recv RST", stream_id)); stream->closed = TRUE; stream->reset = TRUE; - drain_this(cf, data); - Curl_expire(data, 0, EXPIRE_RUN_NOW); + drain_stream(cf, data, stream); break; case NGHTTP2_WINDOW_UPDATE: DEBUGF(LOG_CF(data, cf, "[h2sid=%d] recv WINDOW_UPDATE", stream_id)); if((data->req.keepon & KEEP_SEND_HOLD) && (data->req.keepon & KEEP_SEND)) { data->req.keepon &= ~KEEP_SEND_HOLD; - drain_this(cf, data); - Curl_expire(data, 0, EXPIRE_RUN_NOW); + drain_stream(cf, data, stream); DEBUGF(LOG_CF(data, cf, "[h2sid=%d] un-holding after win update", stream_id)); } @@ -1156,10 +1136,7 @@ static int on_data_chunk_recv(nghttp2_session *session, uint8_t flags, } /* if we receive data for another handle, wake that up */ - if(CF_DATA_CURRENT(cf) != data_s) { - drain_this(cf, data_s); - Curl_expire(data_s, 0, EXPIRE_RUN_NOW); - } + drain_stream(cf, data_s, stream); DEBUGASSERT((size_t)nwritten == len); DEBUGF(LOG_CF(data_s, cf, "[h2sid=%d] %zd/%zu DATA recvd, " @@ -1196,10 +1173,7 @@ static int on_stream_close(nghttp2_session *session, int32_t stream_id, if(stream->error) stream->reset = TRUE; - if(CF_DATA_CURRENT(cf) != data_s) { - drain_this(cf, data_s); - Curl_expire(data_s, 0, EXPIRE_RUN_NOW); - } + drain_stream(cf, data_s, stream); /* remove `data_s` from the nghttp2 stream */ rv = nghttp2_session_set_stream_user_data(session, stream_id, 0); @@ -1529,7 +1503,7 @@ static CURLcode http2_data_done_send(struct Curl_cfilter *cf, /* resume sending here to trigger the callback to get called again so that it can signal EOF to nghttp2 */ (void)nghttp2_session_resume_data(ctx->h2, stream->id); - drain_this(cf, data); + drain_stream(cf, data, stream); } out: @@ -1543,14 +1517,17 @@ static ssize_t http2_handle_stream_close(struct Curl_cfilter *cf, struct stream_ctx *stream = H2_STREAM_CTX(data); ssize_t rv = 0; - drained_transfer(cf, data); - if(stream->error == NGHTTP2_REFUSED_STREAM) { DEBUGF(LOG_CF(data, cf, "[h2sid=%d] REFUSED_STREAM, try again on a new " "connection", stream->id)); connclose(cf->conn, "REFUSED_STREAM"); /* don't use this anymore */ data->state.refused_stream = TRUE; - *err = CURLE_RECV_ERROR; /* trigger Curl_retry_request() later */ + *err = CURLE_SEND_ERROR; /* trigger Curl_retry_request() later */ + return -1; + } + else if(stream->reset) { + failf(data, "HTTP/2 stream %u was reset", stream->id); + *err = stream->bodystarted? CURLE_PARTIAL_FILE : CURLE_RECV_ERROR; return -1; } else if(stream->error != NGHTTP2_NO_ERROR) { @@ -1560,11 +1537,6 @@ static ssize_t http2_handle_stream_close(struct Curl_cfilter *cf, *err = CURLE_HTTP2_STREAM; return -1; } - else if(stream->reset) { - failf(data, "HTTP/2 stream %u was reset", stream->id); - *err = stream->bodystarted? CURLE_PARTIAL_FILE : CURLE_RECV_ERROR; - return -1; - } if(!stream->bodystarted) { failf(data, "HTTP/2 stream %u was closed cleanly, but before getting " @@ -1691,7 +1663,6 @@ static ssize_t stream_recv(struct Curl_cfilter *cf, struct Curl_easy *data, ssize_t nread = -1; *err = CURLE_AGAIN; - drained_transfer(cf, data); if(!Curl_bufq_is_empty(&stream->recvbuf)) { nread = Curl_bufq_read(&stream->recvbuf, (unsigned char *)buf, len, err); @@ -1755,8 +1726,8 @@ static CURLcode h2_progress_ingress(struct Curl_cfilter *cf, } nread = Curl_bufq_slurp(&ctx->inbufq, nw_in_reader, cf, &result); - DEBUGF(LOG_CF(data, cf, "read %zd bytes nw data -> %zd, %d", - Curl_bufq_len(&ctx->inbufq), nread, result)); + /* DEBUGF(LOG_CF(data, cf, "read %zd bytes nw data -> %zd, %d", + Curl_bufq_len(&ctx->inbufq), nread, result)); */ if(nread < 0) { if(result != CURLE_AGAIN) { failf(data, "Failed receiving HTTP2 data"); @@ -1832,7 +1803,7 @@ static ssize_t cf_h2_recv(struct Curl_cfilter *cf, struct Curl_easy *data, if(stream->closed) { DEBUGF(LOG_CF(data, cf, "[h2sid=%d] closed stream, set drain", stream->id)); - drain_this(cf, data); + drain_stream(cf, data, stream); } } @@ -2040,9 +2011,14 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data, } if(should_close_session(ctx)) { - DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session")); - *err = CURLE_HTTP2; - nwritten = -1; + if(stream->closed) { + nwritten = http2_handle_stream_close(cf, data, err); + } + else { + DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session")); + *err = CURLE_HTTP2; + nwritten = -1; + } goto out; } @@ -2085,9 +2061,14 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data, } if(should_close_session(ctx)) { - DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session")); - *err = CURLE_HTTP2; - nwritten = -1; + if(stream->closed) { + nwritten = http2_handle_stream_close(cf, data, err); + } + else { + DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session")); + *err = CURLE_HTTP2; + nwritten = -1; + } goto out; } } diff --git a/lib/transfer.c b/lib/transfer.c index cb69f3365..947070956 100644 --- a/lib/transfer.c +++ b/lib/transfer.c @@ -753,7 +753,7 @@ static CURLcode readwrite_data(struct Curl_easy *data, if(maxloops <= 0) { /* we mark it as read-again-please */ - conn->cselect_bits = CURL_CSELECT_IN; + data->state.dselect_bits = CURL_CSELECT_IN; *comeback = TRUE; } @@ -1065,40 +1065,36 @@ CURLcode Curl_readwrite(struct connectdata *conn, CURLcode result; struct curltime now; int didwhat = 0; + int select_bits; - curl_socket_t fd_read; - curl_socket_t fd_write; - int select_res = conn->cselect_bits; - conn->cselect_bits = 0; - - /* only use the proper socket if the *_HOLD bit is not set simultaneously as - then we are in rate limiting state in that transfer direction */ - - if((k->keepon & KEEP_RECVBITS) == KEEP_RECV) - fd_read = conn->sockfd; - else - fd_read = CURL_SOCKET_BAD; - - if((k->keepon & KEEP_SENDBITS) == KEEP_SEND) - fd_write = conn->writesockfd; - else - fd_write = CURL_SOCKET_BAD; + if(data->state.dselect_bits) { + select_bits = data->state.dselect_bits; + data->state.dselect_bits = 0; + } + else if(conn->cselect_bits) { + select_bits = conn->cselect_bits; + conn->cselect_bits = 0; + } + else { + curl_socket_t fd_read; + curl_socket_t fd_write; + /* only use the proper socket if the *_HOLD bit is not set simultaneously + as then we are in rate limiting state in that transfer direction */ + if((k->keepon & KEEP_RECVBITS) == KEEP_RECV) + fd_read = conn->sockfd; + else + fd_read = CURL_SOCKET_BAD; -#if defined(USE_HTTP2) || defined(USE_HTTP3) - if(data->state.drain) { - select_res |= CURL_CSELECT_IN; - DEBUGF(infof(data, "Curl_readwrite: forcibly told to drain data")); if((k->keepon & KEEP_SENDBITS) == KEEP_SEND) - select_res |= CURL_CSELECT_OUT; - } -#endif + fd_write = conn->writesockfd; + else + fd_write = CURL_SOCKET_BAD; - if(!select_res) /* Call for select()/poll() only, if read/write/error - status is not known. */ - select_res = Curl_socket_check(fd_read, CURL_SOCKET_BAD, fd_write, 0); + select_bits = Curl_socket_check(fd_read, CURL_SOCKET_BAD, fd_write, 0); + } - if(select_res == CURL_CSELECT_ERR) { + if(select_bits == CURL_CSELECT_ERR) { failf(data, "select/poll returned error"); result = CURLE_SEND_ERROR; goto out; @@ -1106,7 +1102,7 @@ CURLcode Curl_readwrite(struct connectdata *conn, #ifdef USE_HYPER if(conn->datastream) { - result = conn->datastream(data, conn, &didwhat, done, select_res); + result = conn->datastream(data, conn, &didwhat, done, select_bits); if(result || *done) goto out; } @@ -1115,14 +1111,14 @@ CURLcode Curl_readwrite(struct connectdata *conn, /* We go ahead and do a read if we have a readable socket or if the stream was rewound (in which case we have data in a buffer) */ - if((k->keepon & KEEP_RECV) && (select_res & CURL_CSELECT_IN)) { + if((k->keepon & KEEP_RECV) && (select_bits & CURL_CSELECT_IN)) { result = readwrite_data(data, conn, k, &didwhat, done, comeback); if(result || *done) goto out; } /* If we still have writing to do, we check if we have a writable socket. */ - if((k->keepon & KEEP_SEND) && (select_res & CURL_CSELECT_OUT)) { + if((k->keepon & KEEP_SEND) && (select_bits & CURL_CSELECT_OUT)) { /* write */ result = readwrite_upload(data, conn, &didwhat); diff --git a/lib/urldata.h b/lib/urldata.h index a8580bdb6..777bc36f9 100644 --- a/lib/urldata.h +++ b/lib/urldata.h @@ -1319,6 +1319,8 @@ struct UrlState { char *scratch; /* huge buffer[set.buffer_size*2] for upload CRLF replacing */ long followlocation; /* redirect counter */ int requests; /* request counter: redirects + authentication retakes */ + int dselect_bits; /* != 0 -> bitmask of socket events for this transfer + * overriding anything the socket may report */ #ifdef HAVE_SIGNAL /* storage for the previous bag^H^H^HSIGPIPE signal handler :-) */ void (*prev_signal)(int sig); @@ -1374,9 +1376,6 @@ struct UrlState { curl_off_t infilesize; /* size of file to upload, -1 means unknown. Copied from set.filesize at start of operation */ #if defined(USE_HTTP2) || defined(USE_HTTP3) - size_t drain; /* Increased when this stream has data to read, even if its - socket is not necessarily is readable. Decreased when - checked. */ struct Curl_data_priority priority; /* shallow copy of data->set */ #endif diff --git a/lib/vquic/curl_msh3.c b/lib/vquic/curl_msh3.c index 1e1a15a8f..34ea0bcf6 100644 --- a/lib/vquic/curl_msh3.c +++ b/lib/vquic/curl_msh3.c @@ -189,12 +189,33 @@ static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data) } } -static void notify_drain(struct Curl_cfilter *cf, +static void drain_stream_from_other_thread(struct Curl_easy *data, + struct stream_ctx *stream) +{ + int bits; + + /* risky */ + bits = CURL_CSELECT_IN; + if(stream && !stream->upload_done) + bits |= CURL_CSELECT_OUT; + if(data->state.dselect_bits != bits) { + data->state.dselect_bits = bits; + /* cannot expire from other thread */ + } +} + +static void drain_stream(struct Curl_cfilter *cf, struct Curl_easy *data) { + struct stream_ctx *stream = H3_STREAM_CTX(data); + int bits; + (void)cf; - if(!data->state.drain) { - data->state.drain = 1; + bits = CURL_CSELECT_IN; + if(stream && !stream->upload_done) + bits |= CURL_CSELECT_OUT; + if(data->state.dselect_bits != bits) { + data->state.dselect_bits = bits; Curl_expire(data, 0, EXPIRE_RUN_NOW); } } @@ -350,7 +371,7 @@ static void MSH3_CALL msh3_header_received(MSH3_REQUEST *Request, } } - data->state.drain = 1; + drain_stream_from_other_thread(data, stream); msh3_lock_release(&stream->recv_lock); } @@ -469,7 +490,6 @@ static ssize_t recv_closed_stream(struct Curl_cfilter *cf, nread = 0; out: - data->state.drain = 0; return nread; } @@ -508,7 +528,6 @@ static ssize_t cf_msh3_recv(struct Curl_cfilter *cf, struct Curl_easy *data, if(stream->recv_error) { failf(data, "request aborted"); - data->state.drain = 0; *err = stream->recv_error; goto out; } @@ -522,10 +541,8 @@ static ssize_t cf_msh3_recv(struct Curl_cfilter *cf, struct Curl_easy *data, len, nread, *err)); if(nread < 0) goto out; - if(!Curl_bufq_is_empty(&stream->recvbuf) || - stream->closed) { - notify_drain(cf, data); - } + if(stream->closed) + drain_stream(cf, data); } else if(stream->closed) { nread = recv_closed_stream(cf, data, err); @@ -669,15 +686,14 @@ static int cf_msh3_get_select_socks(struct Curl_cfilter *cf, if(stream->recv_error) { bitmap |= GETSOCK_READSOCK(0); - notify_drain(cf, data); + drain_stream(cf, data); } else if(stream->req) { bitmap |= GETSOCK_READSOCK(0); - notify_drain(cf, data); + drain_stream(cf, data); } } - DEBUGF(LOG_CF(data, cf, "select_sock %u -> %d", - (uint32_t)data->state.drain, bitmap)); + DEBUGF(LOG_CF(data, cf, "select_sock -> %d", bitmap)); CF_DATA_RESTORE(cf, save); return bitmap; } @@ -698,6 +714,8 @@ static bool cf_msh3_data_pending(struct Curl_cfilter *cf, Curl_bufq_len(&stream->recvbuf))); pending = !Curl_bufq_is_empty(&stream->recvbuf); msh3_lock_release(&stream->recv_lock); + if(pending) + drain_stream(cf, (struct Curl_easy *)data); } CF_DATA_RESTORE(cf, save); diff --git a/lib/vquic/curl_ngtcp2.c b/lib/vquic/curl_ngtcp2.c index 9c0c223b4..2f4b4cdb4 100644 --- a/lib/vquic/curl_ngtcp2.c +++ b/lib/vquic/curl_ngtcp2.c @@ -709,11 +709,6 @@ static void report_consumed_data(struct Curl_cfilter *cf, consumed); ngtcp2_conn_extend_max_offset(ctx->qconn, consumed); } - if(!stream->closed && data->state.drain && - Curl_bufq_is_empty(&stream->recvbuf)) { - /* nothing buffered any more */ - data->state.drain = 0; - } } static int cb_recv_stream_data(ngtcp2_conn *tconn, uint32_t flags, @@ -995,12 +990,18 @@ static int cf_ngtcp2_get_select_socks(struct Curl_cfilter *cf, return rv; } -static void notify_drain(struct Curl_cfilter *cf, +static void drain_stream(struct Curl_cfilter *cf, struct Curl_easy *data) { + struct stream_ctx *stream = H3_STREAM_CTX(data); + int bits; + (void)cf; - if(!data->state.drain) { - data->state.drain = 1; + bits = CURL_CSELECT_IN; + if(stream && !stream->upload_done) + bits |= CURL_CSELECT_OUT; + if(data->state.dselect_bits != bits) { + data->state.dselect_bits = bits; Curl_expire(data, 0, EXPIRE_RUN_NOW); } } @@ -1028,7 +1029,7 @@ static int cb_h3_stream_close(nghttp3_conn *conn, int64_t stream_id, if(app_error_code == NGHTTP3_H3_INTERNAL_ERROR) { stream->reset = TRUE; } - notify_drain(cf, data); + drain_stream(cf, data); return 0; } @@ -1082,9 +1083,7 @@ static int cb_h3_recv_data(nghttp3_conn *conn, int64_t stream3_id, (void)stream3_id; result = write_resp_raw(cf, data, buf, buflen, TRUE); - if(CF_DATA_CURRENT(cf) != data) { - notify_drain(cf, data); - } + drain_stream(cf, data); return result? -1 : 0; } @@ -1129,9 +1128,7 @@ static int cb_h3_end_headers(nghttp3_conn *conn, int64_t stream_id, if(stream->status_code / 100 != 1) { stream->resp_hds_complete = TRUE; } - if(CF_DATA_CURRENT(cf) != data) { - notify_drain(cf, data); - } + drain_stream(cf, data); return 0; } @@ -1358,7 +1355,6 @@ static ssize_t recv_closed_stream(struct Curl_cfilter *cf, nread = 0; out: - data->state.drain = 0; return nread; } @@ -1413,16 +1409,13 @@ static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data, } if(nread > 0) { - if(1 || !Curl_bufq_is_empty(&stream->recvbuf)) { - notify_drain(cf, data); - } + drain_stream(cf, data); } else { if(stream->closed) { nread = recv_closed_stream(cf, data, err); goto out; } - data->state.drain = FALSE; *err = CURLE_AGAIN; nread = -1; } @@ -1468,7 +1461,7 @@ static int cb_h3_acked_req_body(nghttp3_conn *conn, int64_t stream_id, if((data->req.keepon & KEEP_SEND_HOLD) && (data->req.keepon & KEEP_SEND)) { data->req.keepon &= ~KEEP_SEND_HOLD; - notify_drain(cf, data); + drain_stream(cf, data); DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] unpausing acks", stream_id)); } diff --git a/lib/vquic/curl_quiche.c b/lib/vquic/curl_quiche.c index 1a6838b01..afd446b2e 100644 --- a/lib/vquic/curl_quiche.c +++ b/lib/vquic/curl_quiche.c @@ -299,11 +299,18 @@ static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data) } } -static void notify_drain(struct Curl_cfilter *cf, struct Curl_easy *data) +static void drain_stream(struct Curl_cfilter *cf, + struct Curl_easy *data) { + struct stream_ctx *stream = H3_STREAM_CTX(data); + int bits; + (void)cf; - if(!data->state.drain) { - data->state.drain = 1; + bits = CURL_CSELECT_IN; + if(stream && !stream->upload_done) + bits |= CURL_CSELECT_OUT; + if(data->state.dselect_bits != bits) { + data->state.dselect_bits = bits; Curl_expire(data, 0, EXPIRE_RUN_NOW); } } @@ -579,9 +586,7 @@ static CURLcode cf_poll_events(struct Curl_cfilter *cf, } else { result = h3_process_event(cf, sdata, stream3_id, ev); - if(sdata != data) { - notify_drain(cf, sdata); - } + drain_stream(cf, sdata); if(result) { DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"] error processing event %s " "for [h3sid=%"PRId64"] -> %d", @@ -848,15 +853,20 @@ static ssize_t cf_quiche_recv(struct Curl_cfilter *cf, struct Curl_easy *data, } if(nread > 0) { - data->state.drain = (!Curl_bufq_is_empty(&stream->recvbuf) || - stream->closed); + if(stream->closed) + drain_stream(cf, data); } else { - data->state.drain = FALSE; if(stream->closed) { nread = recv_closed_stream(cf, data, err); goto out; } + else if(quiche_conn_is_draining(ctx->qconn)) { + failf(data, "QUIC connection is draining"); + *err = CURLE_HTTP3; + nread = -1; + goto out; + } *err = CURLE_AGAIN; nread = -1; } @@ -1065,24 +1075,9 @@ static bool stream_is_writeable(struct Curl_cfilter *cf, { struct cf_quiche_ctx *ctx = cf->ctx; struct stream_ctx *stream = H3_STREAM_CTX(data); - quiche_stream_iter *qiter; - bool is_writable = FALSE; - if(!stream) - return FALSE; - /* surely, there must be a better way */ - qiter = quiche_conn_writable(ctx->qconn); - if(qiter) { - uint64_t stream_id; - while(quiche_stream_iter_next(qiter, &stream_id)) { - if(stream_id == (uint64_t)stream->id) { - is_writable = TRUE; - break; - } - } - quiche_stream_iter_free(qiter); - } - return is_writable; + return stream && + quiche_conn_stream_writable(ctx->qconn, (uint64_t)stream->id, 1); } static int cf_quiche_get_select_socks(struct Curl_cfilter *cf, @@ -1152,7 +1147,8 @@ static CURLcode cf_quiche_data_event(struct Curl_cfilter *cf, } case CF_CTRL_DATA_IDLE: result = cf_flush_egress(cf, data); - DEBUGF(LOG_CF(data, cf, "data idle, flush egress -> %d", result)); + if(result) + DEBUGF(LOG_CF(data, cf, "data idle, flush egress -> %d", result)); break; default: break; diff --git a/lib/vquic/vquic.c b/lib/vquic/vquic.c index 87dd1a75d..a51ebfa7d 100644 --- a/lib/vquic/vquic.c +++ b/lib/vquic/vquic.c @@ -398,7 +398,6 @@ static CURLcode recvmsg_packets(struct Curl_cfilter *cf, ; if(nread == -1) { if(SOCKERRNO == EAGAIN || SOCKERRNO == EWOULDBLOCK) { - DEBUGF(LOG_CF(data, cf, "ingress, recvmsg -> EAGAIN")); goto out; } if(!cf->connected && SOCKERRNO == ECONNREFUSED) { |