summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorrcombs <rcombs@rcombs.me>2020-05-13 18:49:57 -0500
committerMarc Hoersken <info@marc-hoersken.de>2020-06-17 07:18:08 +0200
commit8bc25c590e530de87595d1bb3577f699eb1309b9 (patch)
tree908a0d45fcd00ba73688f3ecb1668ef3f9a13e7a
parent477a4e31d724a9e14f92092c3d2ec8e486b99403 (diff)
downloadcurl-8bc25c590e530de87595d1bb3577f699eb1309b9.tar.gz
multi: implement wait using winsock events
This avoids using a pair of TCP ports to provide wakeup functionality for every multi instance on Windows, where socketpair() is emulated using a TCP socket on loopback which could in turn lead to socket resource exhaustion. Reviewed-by: Gergely Nagy Reviewed-by: Marc Hörsken Closes #5397
-rw-r--r--lib/multi.c122
-rw-r--r--lib/multihandle.h4
2 files changed, 122 insertions, 4 deletions
diff --git a/lib/multi.c b/lib/multi.c
index d20b33dbc..121a44593 100644
--- a/lib/multi.c
+++ b/lib/multi.c
@@ -374,6 +374,11 @@ struct Curl_multi *Curl_multi_handle(int hashsize, /* socket hash */
multi->max_concurrent_streams = 100;
multi->ipv6_works = Curl_ipv6works(NULL);
+#ifdef USE_WINSOCK
+ multi->wsa_event = WSACreateEvent();
+ if(multi->wsa_event == WSA_INVALID_EVENT)
+ goto error;
+#else
#ifdef ENABLE_WAKEUP
if(Curl_socketpair(AF_UNIX, SOCK_STREAM, 0, multi->wakeup_pair) < 0) {
multi->wakeup_pair[0] = CURL_SOCKET_BAD;
@@ -387,6 +392,7 @@ struct Curl_multi *Curl_multi_handle(int hashsize, /* socket hash */
multi->wakeup_pair[1] = CURL_SOCKET_BAD;
}
#endif
+#endif
return multi;
@@ -1067,11 +1073,15 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
unsigned int i;
unsigned int nfds = 0;
unsigned int curlfds;
- bool ufds_malloc = FALSE;
long timeout_internal;
int retcode = 0;
+#ifndef USE_WINSOCK
struct pollfd a_few_on_stack[NUM_POLLS_ON_STACK];
struct pollfd *ufds = &a_few_on_stack[0];
+ bool ufds_malloc = FALSE;
+#else
+ DEBUGASSERT(multi->wsa_event != WSA_INVALID_EVENT);
+#endif
if(!GOOD_MULTI_HANDLE(multi))
return CURLM_BAD_HANDLE;
@@ -1117,11 +1127,16 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
nfds += extra_nfds; /* add the externally provided ones */
#ifdef ENABLE_WAKEUP
+#ifdef USE_WINSOCK
+ if(use_wakeup) {
+#else
if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
+#endif
++nfds;
}
#endif
+#ifndef USE_WINSOCK
if(nfds > NUM_POLLS_ON_STACK) {
/* 'nfds' is a 32 bit value and 'struct pollfd' is typically 8 bytes
big, so at 2^29 sockets this value might wrap. When a process gets
@@ -1132,7 +1147,9 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
return CURLM_OUT_OF_MEMORY;
ufds_malloc = TRUE;
}
+
nfds = 0;
+#endif
/* only do the second loop if we found descriptors in the first stage run
above */
@@ -1145,22 +1162,36 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
for(i = 0; i< MAX_SOCKSPEREASYHANDLE; i++) {
curl_socket_t s = CURL_SOCKET_BAD;
-
+#ifdef USE_WINSOCK
+ long mask = 0;
+#endif
if(bitmap & GETSOCK_READSOCK(i)) {
+#ifdef USE_WINSOCK
+ mask |= FD_READ;
+#else
ufds[nfds].fd = sockbunch[i];
ufds[nfds].events = POLLIN;
++nfds;
+#endif
s = sockbunch[i];
}
if(bitmap & GETSOCK_WRITESOCK(i)) {
+#ifdef USE_WINSOCK
+ mask |= FD_WRITE;
+#else
ufds[nfds].fd = sockbunch[i];
ufds[nfds].events = POLLOUT;
++nfds;
+#endif
s = sockbunch[i];
}
if(s == CURL_SOCKET_BAD) {
break;
}
+#ifdef USE_WINSOCK
+ if(WSAEventSelect(s, multi->wsa_event, mask) != 0)
+ return CURLM_INTERNAL_ERROR;
+#endif
}
data = data->next; /* check next handle */
@@ -1169,6 +1200,17 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
/* Add external file descriptions from poll-like struct curl_waitfd */
for(i = 0; i < extra_nfds; i++) {
+#ifdef USE_WINSOCK
+ long events = 0;
+ if(extra_fds[i].events & CURL_WAIT_POLLIN)
+ events |= FD_READ;
+ if(extra_fds[i].events & CURL_WAIT_POLLPRI)
+ events |= FD_OOB;
+ if(extra_fds[i].events & CURL_WAIT_POLLOUT)
+ events |= FD_WRITE;
+ if(WSAEventSelect(extra_fds[i].fd, multi->wsa_event, events) != 0)
+ return CURLM_INTERNAL_ERROR;
+#else
ufds[nfds].fd = extra_fds[i].fd;
ufds[nfds].events = 0;
if(extra_fds[i].events & CURL_WAIT_POLLIN)
@@ -1178,28 +1220,61 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
if(extra_fds[i].events & CURL_WAIT_POLLOUT)
ufds[nfds].events |= POLLOUT;
++nfds;
+#endif
}
#ifdef ENABLE_WAKEUP
+#ifndef USE_WINSOCK
if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
ufds[nfds].fd = multi->wakeup_pair[0];
ufds[nfds].events = POLLIN;
++nfds;
}
#endif
+#endif
if(nfds) {
int pollrc;
/* wait... */
+#ifdef USE_WINSOCK
+ DWORD waitrc = WSAWaitForMultipleEvents(1, &multi->wsa_event, FALSE,
+ timeout_ms, FALSE);
+ /* WSA_WAIT_EVENT_0 is 0, so waitrc >= WSA_WAIT_EVENT_0 warns */
+ if(waitrc == WSA_WAIT_EVENT_0)
+ pollrc = 1;
+ else
+ pollrc = -1;
+#else
pollrc = Curl_poll(ufds, nfds, timeout_ms);
+#endif
if(pollrc > 0) {
+#ifdef USE_WINSOCK
+ retcode = 0;
+#else
retcode = pollrc;
+#endif
/* copy revents results from the poll to the curl_multi_wait poll
struct, the bit values of the actual underlying poll() implementation
may not be the same as the ones in the public libcurl API! */
for(i = 0; i < extra_nfds; i++) {
unsigned short mask = 0;
+#ifdef USE_WINSOCK
+ WSANETWORKEVENTS events = {0};
+ if(WSAEnumNetworkEvents(extra_fds[i].fd, multi->wsa_event,
+ &events) == 0) {
+ if(events.lNetworkEvents & FD_READ)
+ mask |= CURL_WAIT_POLLIN;
+ if(events.lNetworkEvents & FD_WRITE)
+ mask |= CURL_WAIT_POLLOUT;
+ if(events.lNetworkEvents & FD_OOB)
+ mask |= CURL_WAIT_POLLPRI;
+
+ if(events.lNetworkEvents != 0)
+ retcode++;
+ }
+ WSAEventSelect(extra_fds[i].fd, multi->wsa_event, 0);
+#else
unsigned r = ufds[curlfds + i].revents;
if(r & POLLIN)
@@ -1208,10 +1283,39 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
mask |= CURL_WAIT_POLLOUT;
if(r & POLLPRI)
mask |= CURL_WAIT_POLLPRI;
+#endif
extra_fds[i].revents = mask;
}
+#ifdef USE_WINSOCK
+ /* Count up all our own sockets that had activity,
+ and remove them from the event. */
+ if(curlfds) {
+ data = multi->easyp;
+ while(data) {
+ bitmap = multi_getsock(data, sockbunch);
+
+ for(i = 0; i < MAX_SOCKSPEREASYHANDLE; i++) {
+ if(bitmap & (GETSOCK_READSOCK(i) | GETSOCK_WRITESOCK(i))) {
+ WSANETWORKEVENTS events = {0};
+ if(WSAEnumNetworkEvents(sockbunch[i], multi->wsa_event,
+ &events) == 0) {
+ if(events.lNetworkEvents != 0)
+ retcode++;
+ }
+ WSAEventSelect(sockbunch[i], multi->wsa_event, 0);
+ }
+ else
+ break;
+ }
+
+ data = data->next;
+ }
+ }
+
+ WSAResetEvent(multi->wsa_event);
+#else
#ifdef ENABLE_WAKEUP
if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
if(ufds[curlfds + extra_nfds].revents & POLLIN) {
@@ -1224,10 +1328,8 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
when there is no more data, breaking the loop. */
nread = sread(multi->wakeup_pair[0], buf, sizeof(buf));
if(nread <= 0) {
-#ifndef USE_WINSOCK
if(nread < 0 && EINTR == SOCKERRNO)
continue;
-#endif
break;
}
}
@@ -1236,11 +1338,14 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
}
}
#endif
+#endif
}
}
+#ifndef USE_WINSOCK
if(ufds_malloc)
free(ufds);
+#endif
if(ret)
*ret = retcode;
if(!extrawait || nfds)
@@ -1295,6 +1400,10 @@ CURLMcode curl_multi_wakeup(struct Curl_multi *multi)
return CURLM_BAD_HANDLE;
#ifdef ENABLE_WAKEUP
+#ifdef USE_WINSOCK
+ if(WSASetEvent(multi->wsa_event))
+ return CURLM_OK;
+#else
/* the wakeup_pair variable is only written during init and cleanup,
making it safe to access from another thread after the init part
and before cleanup */
@@ -1328,6 +1437,7 @@ CURLMcode curl_multi_wakeup(struct Curl_multi *multi)
}
}
#endif
+#endif
return CURLM_WAKEUP_FAILURE;
}
@@ -2485,10 +2595,14 @@ CURLMcode curl_multi_cleanup(struct Curl_multi *multi)
Curl_hash_destroy(&multi->hostcache);
Curl_psl_destroy(&multi->psl);
+#ifdef USE_WINSOCK
+ WSACloseEvent(multi->wsa_event);
+#else
#ifdef ENABLE_WAKEUP
sclose(multi->wakeup_pair[0]);
sclose(multi->wakeup_pair[1]);
#endif
+#endif
free(multi);
return CURLM_OK;
diff --git a/lib/multihandle.h b/lib/multihandle.h
index 91eca16c4..94bbad77a 100644
--- a/lib/multihandle.h
+++ b/lib/multihandle.h
@@ -138,10 +138,14 @@ struct Curl_multi {
previous callback */
unsigned int max_concurrent_streams;
+#ifdef USE_WINSOCK
+ WSAEVENT wsa_event; /* winsock event used for waits */
+#else
#ifdef ENABLE_WAKEUP
curl_socket_t wakeup_pair[2]; /* socketpair() used for wakeup
0 is used for read, 1 is used for write */
#endif
+#endif
/* multiplexing wanted */
bool multiplexing;
bool recheckstate; /* see Curl_multi_connchanged */