From 5651a36d1ae46db61a31771a8d4d6dcf2a510856 Mon Sep 17 00:00:00 2001 From: Stefan Eissing Date: Mon, 30 Jan 2023 16:03:00 +0100 Subject: cf-socket: improvements in socket I/O handling - Curl_write_plain/Curl_read_plain have been eliminated. The last code using them now uses Curl_conn_send/recv so that requests use the conn->send/recv callbacks, which default to cfilter use. - Curl_recv_plain/Curl_send_plain have been internalized in cf-socket.c. - USE_RECV_BEFORE_SEND_WORKAROUND (active on Windows) has been moved into cf-socket.c. The pre_recv buffer is held at the socket filter context. `postponed_data` structures have been removed from `connectdata`. - the hang in HTTP/2 request handling was a result of read buffering on all sends, which the multi handling is not prepared for. The following happens: - multi performs on an HTTP/2 easy handle - h2 reads and processes data - this leads to a send of h2 data - which receives and buffers before the send - h2 returns - multi selects on the socket, but no data arrives (it's in the buffer already). The workaround now receives data in a loop as long as there is something in the buffer. The real fix would be for multi to change, so that `data_pending` is evaluated before deciding to wait on the socket. io_buffer, optional, in cf-socket.c, http/2 sets state.drain if lower filters have pending data. This io_buffer is only available/used when -DUSE_RECV_BEFORE_SEND_WORKAROUND is active, e.g. on Windows configurations. It also maintains the original checks on the protocol handler being HTTP and conn->send/recv not being replaced. The HTTP/2 (nghttp2) cfilter now sets data->state.drain when it finds out that the "lower" filter chain still has pending data at the end of its IO operation. This prevents the processing from becoming stalled.
Closes #10280 --- lib/cf-socket.c | 192 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 188 insertions(+), 4 deletions(-) (limited to 'lib/cf-socket.c') diff --git a/lib/cf-socket.c b/lib/cf-socket.c index 98565aed4..d8b07d309 100644 --- a/lib/cf-socket.c +++ b/lib/cf-socket.c @@ -742,11 +742,29 @@ CURLcode Curl_socket_connect_result(struct Curl_easy *data, } } +#ifdef USE_RECV_BEFORE_SEND_WORKAROUND +struct io_buffer { + char *bufr; + size_t allc; /* size of the current allocation */ + size_t head; /* bufr index for next read */ + size_t tail; /* bufr index for next write */ +}; + +static void io_buffer_reset(struct io_buffer *iob) +{ + if(iob->bufr) + free(iob->bufr); + memset(iob, 0, sizeof(*iob)); +} +#endif /* USE_RECV_BEFORE_SEND_WORKAROUND */ struct cf_socket_ctx { int transport; struct Curl_sockaddr_ex addr; /* address to connect to */ curl_socket_t sock; /* current attempt socket */ +#ifdef USE_RECV_BEFORE_SEND_WORKAROUND + struct io_buffer recv_buffer; +#endif char r_ip[MAX_IPADR_LEN]; /* remote IP as string */ int r_port; /* remote port number */ char l_ip[MAX_IPADR_LEN]; /* local IP as string */ @@ -783,6 +801,9 @@ static void cf_socket_close(struct Curl_cfilter *cf, struct Curl_easy *data) DEBUGF(LOG_CF(data, cf, "cf_socket_close(%d) local", (int)ctx->sock)); sclose(ctx->sock); } +#ifdef USE_RECV_BEFORE_SEND_WORKAROUND + io_buffer_reset(&ctx->recv_buffer); +#endif ctx->sock = CURL_SOCKET_BAD; ctx->active = FALSE; } @@ -1112,12 +1133,88 @@ static int cf_socket_get_select_socks(struct Curl_cfilter *cf, return rc; } +#ifdef USE_RECV_BEFORE_SEND_WORKAROUND + +static CURLcode pre_receive_plain(struct Curl_cfilter *cf, + struct Curl_easy *data) +{ + struct cf_socket_ctx *ctx = cf->ctx; + struct io_buffer * const iob = &ctx->recv_buffer; + + /* WinSock will destroy unread received data if send() is + failed. + To avoid lossage of received data, recv() must be + performed before every send() if any incoming data is + available.
However, skip this, if buffer is already full. */ + if((cf->conn->handler->protocol&PROTO_FAMILY_HTTP) != 0 && + cf->conn->recv[cf->sockindex] == Curl_conn_recv && + (!iob->bufr || (iob->allc > iob->tail))) { + const int readymask = Curl_socket_check(ctx->sock, CURL_SOCKET_BAD, + CURL_SOCKET_BAD, 0); + if(readymask != -1 && (readymask & CURL_CSELECT_IN) != 0) { + size_t bytestorecv = iob->allc - iob->tail; + ssize_t nread; + /* Have some incoming data */ + if(!iob->bufr) { + /* Use buffer double default size for intermediate buffer */ + iob->allc = 2 * data->set.buffer_size; + iob->bufr = malloc(iob->allc); + if(!iob->bufr) + return CURLE_OUT_OF_MEMORY; + iob->tail = 0; + iob->head = 0; + bytestorecv = iob->allc; + } + + nread = sread(ctx->sock, iob->bufr + iob->tail, bytestorecv); + if(nread > 0) + iob->tail += (size_t)nread; + } + } + return CURLE_OK; +} + +static ssize_t get_pre_recved(struct Curl_cfilter *cf, char *buf, size_t len) +{ + struct cf_socket_ctx *ctx = cf->ctx; + struct io_buffer * const iob = &ctx->recv_buffer; + size_t copysize; + if(!iob->bufr) + return 0; + + DEBUGASSERT(iob->allc > 0); + DEBUGASSERT(iob->tail <= iob->allc); + DEBUGASSERT(iob->head <= iob->tail); + /* Check and process data that already received and stored in internal + intermediate buffer */ + if(iob->tail > iob->head) { + copysize = CURLMIN(len, iob->tail - iob->head); + memcpy(buf, iob->bufr + iob->head, copysize); + iob->head += copysize; + } + else + copysize = 0; /* buffer was allocated, but nothing was received */ + + /* Free intermediate buffer if it has no unprocessed data */ + if(iob->head == iob->tail) + io_buffer_reset(iob); + + return (ssize_t)copysize; +} +#endif /* USE_RECV_BEFORE_SEND_WORKAROUND */ + static bool cf_socket_data_pending(struct Curl_cfilter *cf, const struct Curl_easy *data) { struct cf_socket_ctx *ctx = cf->ctx; int readable; +#ifdef USE_RECV_BEFORE_SEND_WORKAROUND + if(ctx->recv_buffer.bufr && ctx->recv_buffer.allc && + ctx->recv_buffer.tail >
ctx->recv_buffer.head) + return TRUE; +#endif + (void)data; readable = SOCKET_READABLE(ctx->sock, 0); return (readable > 0 && (readable & CURL_CSELECT_IN)); @@ -1126,10 +1223,59 @@ static bool cf_socket_data_pending(struct Curl_cfilter *cf, static ssize_t cf_socket_send(struct Curl_cfilter *cf, struct Curl_easy *data, const void *buf, size_t len, CURLcode *err) { + struct cf_socket_ctx *ctx = cf->ctx; ssize_t nwritten; - DEBUGASSERT(data->conn == cf->conn); - nwritten = Curl_send_plain(data, cf->sockindex, buf, len, err); + *err = CURLE_OK; +#ifdef USE_RECV_BEFORE_SEND_WORKAROUND + /* WinSock will destroy unread received data if send() is + failed. + To avoid lossage of received data, recv() must be + performed before every send() if any incoming data is + available. */ + if(pre_receive_plain(cf, data)) { + *err = CURLE_OUT_OF_MEMORY; + return -1; + } +#endif + +#if defined(MSG_FASTOPEN) && !defined(TCP_FASTOPEN_CONNECT) /* Linux */ + if(cf->conn->bits.tcp_fastopen) { + nwritten = sendto(ctx->sock, buf, len, MSG_FASTOPEN, + &cf->conn->remote_addr->sa_addr, + cf->conn->remote_addr->addrlen); + cf->conn->bits.tcp_fastopen = FALSE; + } + else +#endif + nwritten = swrite(ctx->sock, buf, len); + + if(-1 == nwritten) { + int sockerr = SOCKERRNO; + + if( +#ifdef WSAEWOULDBLOCK + /* This is how Windows does it */ + (WSAEWOULDBLOCK == sockerr) +#else + /* errno may be EWOULDBLOCK or on some systems EAGAIN when it returned + due to its inability to send off data without blocking.
We therefore + treat both error codes the same here */ + (EWOULDBLOCK == sockerr) || (EAGAIN == sockerr) || (EINTR == sockerr) || + (EINPROGRESS == sockerr) +#endif + ) { + /* this is just a case of EWOULDBLOCK */ + *err = CURLE_AGAIN; + } + else { + char buffer[STRERROR_LEN]; + failf(data, "Send failure: %s", + Curl_strerror(sockerr, buffer, sizeof(buffer))); + data->state.os_errno = sockerr; + *err = CURLE_SEND_ERROR; + } + } DEBUGF(LOG_CF(data, cf, "send(len=%zu) -> %d, err=%d", len, (int)nwritten, *err)); return nwritten; @@ -1138,10 +1284,48 @@ static ssize_t cf_socket_send(struct Curl_cfilter *cf, struct Curl_easy *data, static ssize_t cf_socket_recv(struct Curl_cfilter *cf, struct Curl_easy *data, char *buf, size_t len, CURLcode *err) { + struct cf_socket_ctx *ctx = cf->ctx; ssize_t nread; - DEBUGASSERT(data->conn == cf->conn); - nread = Curl_recv_plain(data, cf->sockindex, buf, len, err); + *err = CURLE_OK; + +#ifdef USE_RECV_BEFORE_SEND_WORKAROUND + /* Check and return data that already received and stored in internal + intermediate buffer */ + nread = get_pre_recved(cf, buf, len); + if(nread > 0) { + *err = CURLE_OK; + return nread; + } +#endif + + nread = sread(ctx->sock, buf, len); + + if(-1 == nread) { + int sockerr = SOCKERRNO; + + if( +#ifdef WSAEWOULDBLOCK + /* This is how Windows does it */ + (WSAEWOULDBLOCK == sockerr) +#else + /* errno may be EWOULDBLOCK or on some systems EAGAIN when it returned + due to its inability to receive data without blocking.
We therefore + treat both error codes the same here */ + (EWOULDBLOCK == sockerr) || (EAGAIN == sockerr) || (EINTR == sockerr) +#endif + ) { + /* this is just a case of EWOULDBLOCK */ + *err = CURLE_AGAIN; + } + else { + char buffer[STRERROR_LEN]; + failf(data, "Recv failure: %s", + Curl_strerror(sockerr, buffer, sizeof(buffer))); + data->state.os_errno = sockerr; + *err = CURLE_RECV_ERROR; + } + } DEBUGF(LOG_CF(data, cf, "recv(len=%zu) -> %d, err=%d", len, (int)nread, *err)); return nread; -- cgit v1.2.1