summaryrefslogtreecommitdiff
path: root/sql/threadpool_generic.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/threadpool_generic.cc')
-rw-r--r--sql/threadpool_generic.cc72
1 files changed, 29 insertions, 43 deletions
diff --git a/sql/threadpool_generic.cc b/sql/threadpool_generic.cc
index d9286602ace..2a5587fa04a 100644
--- a/sql/threadpool_generic.cc
+++ b/sql/threadpool_generic.cc
@@ -29,8 +29,8 @@
#include <sql_plist.h>
#include <threadpool.h>
#include <algorithm>
-
-#ifdef HAVE_IOCP
+#ifdef _WIN32
+#include "threadpool_winsockets.h"
#define OPTIONAL_IO_POLL_READ_PARAM this
#else
#define OPTIONAL_IO_POLL_READ_PARAM 0
@@ -347,7 +347,7 @@ static void* native_event_get_userdata(native_event *event)
return event->portev_user;
}
-#elif defined(HAVE_IOCP)
+#elif defined(_WIN32)
static TP_file_handle io_poll_create()
@@ -358,29 +358,8 @@ static TP_file_handle io_poll_create()
int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *, void *opt)
{
- static char c;
- TP_connection_generic *con= (TP_connection_generic *)opt;
- OVERLAPPED *overlapped= &con->overlapped;
- if (con->vio_type == VIO_TYPE_NAMEDPIPE)
- {
- if (ReadFile(fd, &c, 0, NULL, overlapped))
- return 0;
- }
- else
- {
- WSABUF buf;
- buf.buf= &c;
- buf.len= 0;
- DWORD flags=0;
-
- if (WSARecv((SOCKET)fd, &buf, 1,NULL, &flags,overlapped, NULL) == 0)
- return 0;
- }
-
- if (GetLastError() == ERROR_IO_PENDING)
- return 0;
-
- return 1;
+ auto c= (TP_connection_generic *) opt;
+ return (int) c->win_sock.begin_read();
}
@@ -429,20 +408,33 @@ int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
}
-int io_poll_wait(TP_file_handle pollfd, native_event *events, int maxevents, int timeout_ms)
+static void *native_event_get_userdata(native_event *event)
{
- ULONG n;
- BOOL ok = GetQueuedCompletionStatusEx(pollfd, events,
- maxevents, &n, timeout_ms, FALSE);
-
- return ok ? (int)n : -1;
+ return (void *) event->lpCompletionKey;
}
-
-static void* native_event_get_userdata(native_event *event)
+int io_poll_wait(TP_file_handle pollfd, native_event *events, int maxevents,
+ int timeout_ms)
{
- return (void *)event->lpCompletionKey;
+ ULONG n;
+ if (!GetQueuedCompletionStatusEx(pollfd, events, maxevents, &n, timeout_ms, FALSE))
+ return -1;
+
+ /* Update win_sock with number of bytes read.*/
+ for (ULONG i= 0; i < n; i++)
+ {
+ auto ev= &events[i];
+ auto c= (TP_connection_generic *) native_event_get_userdata(ev);
+ /* null userdata zero means shutdown (see PostQueuedCompletionStatus() usage*/
+ if (c)
+ {
+ c->win_sock.end_read(ev->dwNumberOfBytesTransferred, 0);
+ }
+ }
+
+ return (int) n;
}
+
#endif
@@ -995,7 +987,7 @@ void thread_group_destroy(thread_group_t *thread_group)
io_poll_close(thread_group->pollfd);
thread_group->pollfd= INVALID_HANDLE_VALUE;
}
-#ifndef HAVE_IOCP
+#ifndef _WIN32
for(int i=0; i < 2; i++)
{
if(thread_group->shutdown_pipe[i] != -1)
@@ -1039,7 +1031,7 @@ static int wake_thread(thread_group_t *thread_group,bool due_to_stall)
*/
static int wake_listener(thread_group_t *thread_group)
{
-#ifndef HAVE_IOCP
+#ifndef _WIN32
if (pipe(thread_group->shutdown_pipe))
{
return -1;
@@ -1385,12 +1377,6 @@ TP_connection_generic::TP_connection_generic(CONNECT *c):
bound_to_poll_descriptor(false),
waiting(false),
fix_group(false)
-#ifdef HAVE_IOCP
-, overlapped()
-#endif
-#ifdef _WIN32
-, vio_type(c->vio_type)
-#endif
{
DBUG_ASSERT(c->vio_type != VIO_CLOSED);