summaryrefslogtreecommitdiff
path: root/sql/threadpool_win.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/threadpool_win.cc')
-rw-r--r--sql/threadpool_win.cc163
1 files changed, 48 insertions, 115 deletions
diff --git a/sql/threadpool_win.cc b/sql/threadpool_win.cc
index 6003b06bc7b..515bf0e02bc 100644
--- a/sql/threadpool_win.cc
+++ b/sql/threadpool_win.cc
@@ -31,6 +31,8 @@
#include <threadpool.h>
#include <windows.h>
+#include "threadpool_winsockets.h"
+
/* Log a warning */
static void tp_log_warning(const char *msg, const char *fct)
{
@@ -43,8 +45,6 @@ static PTP_POOL pool;
static TP_CALLBACK_ENVIRON callback_environ;
static DWORD fls;
-static bool skip_completion_port_on_success = false;
-
PTP_CALLBACK_ENVIRON get_threadpool_win_callback_environ()
{
return pool? &callback_environ: 0;
@@ -83,22 +83,21 @@ struct TP_connection_win:public TP_connection
public:
TP_connection_win(CONNECT*);
~TP_connection_win();
- virtual int init();
- virtual int start_io();
- virtual void set_io_timeout(int sec);
- virtual void wait_begin(int type);
- virtual void wait_end();
-
- ulonglong timeout;
- enum_vio_type vio_type;
- HANDLE handle;
- OVERLAPPED overlapped;
- PTP_CALLBACK_INSTANCE callback_instance;
- PTP_IO io;
- PTP_TIMER timer;
- PTP_WORK work;
- bool long_callback;
-
+ int init() override;
+ void init_vio(st_vio *vio) override;
+ int start_io() override;
+ void set_io_timeout(int sec) override;
+ void wait_begin(int type) override;
+ void wait_end() override;
+
+ ulonglong timeout=ULLONG_MAX;
+ OVERLAPPED overlapped{};
+ PTP_CALLBACK_INSTANCE callback_instance{};
+ PTP_IO io{};
+ PTP_TIMER timer{};
+ PTP_WORK work{};
+ bool long_callback{};
+ win_aiosocket sock;
};
struct TP_connection *new_TP_connection(CONNECT *connect)
@@ -125,120 +124,50 @@ void TP_pool_win::add(TP_connection *c)
}
}
+#define CHECK_ALLOC_ERROR(op) \
+ do \
+ { \
+ if (!(op)) \
+ { \
+ tp_log_warning("Allocation failed", #op); \
+ } \
+ } while (0)
TP_connection_win::TP_connection_win(CONNECT *c) :
- TP_connection(c),
- timeout(ULONGLONG_MAX),
- callback_instance(0),
- io(0),
- timer(0),
- work(0)
+ TP_connection(c)
{
-}
+ /* Assign io completion callback */
+ HANDLE h= c->vio_type == VIO_TYPE_NAMEDPIPE ? c->pipe
+ : (HANDLE)mysql_socket_getfd(c->sock);
-#define CHECK_ALLOC_ERROR(op) if (!(op)) {tp_log_warning("Allocation failed", #op); DBUG_ASSERT(0); return -1; }
+ CHECK_ALLOC_ERROR(io=CreateThreadpoolIo(h, io_completion_callback, this, &callback_environ));
+ CHECK_ALLOC_ERROR(timer= CreateThreadpoolTimer(timer_callback, this, &callback_environ));
+ CHECK_ALLOC_ERROR(work= CreateThreadpoolWork(work_callback, this, &callback_environ));
+}
int TP_connection_win::init()
{
-
- memset(&overlapped, 0, sizeof(OVERLAPPED));
- switch ((vio_type = connect->vio_type))
- {
- case VIO_TYPE_SSL:
- case VIO_TYPE_TCPIP:
- handle= (HANDLE) mysql_socket_getfd(connect->sock);
- break;
- case VIO_TYPE_NAMEDPIPE:
- handle= connect->pipe;
- break;
- default:
- abort();
- }
-
-
- /* Performance tweaks (s. MSDN documentation)*/
- UCHAR flags= FILE_SKIP_SET_EVENT_ON_HANDLE;
- if (skip_completion_port_on_success)
- {
- flags |= FILE_SKIP_COMPLETION_PORT_ON_SUCCESS;
- }
- (void)SetFileCompletionNotificationModes(handle, flags);
- /* Assign io completion callback */
- CHECK_ALLOC_ERROR(io= CreateThreadpoolIo(handle, io_completion_callback, this, &callback_environ));
- CHECK_ALLOC_ERROR(timer= CreateThreadpoolTimer(timer_callback, this, &callback_environ));
- CHECK_ALLOC_ERROR(work= CreateThreadpoolWork(work_callback, this, &callback_environ));
- return 0;
+ return !io || !timer || !work ;
}
+void TP_connection_win::init_vio(st_vio* vio)
+{
+ sock.init(vio);
+}
/*
Start asynchronous read
*/
int TP_connection_win::start_io()
{
- DWORD num_bytes = 0;
- static char c;
- WSABUF buf;
- buf.buf= &c;
- buf.len= 0;
- DWORD flags=0;
- DWORD last_error= 0;
-
- int retval;
StartThreadpoolIo(io);
-
- if (vio_type == VIO_TYPE_TCPIP || vio_type == VIO_TYPE_SSL)
+ if (sock.begin_read())
{
- /* Start async io (sockets). */
- if (WSARecv((SOCKET)handle , &buf, 1, &num_bytes, &flags,
- &overlapped, NULL) == 0)
- {
- retval= last_error= 0;
- }
- else
- {
- retval= -1;
- last_error= WSAGetLastError();
- }
- }
- else
- {
- /* Start async io (named pipe) */
- if (ReadFile(handle, &c, 0, &num_bytes,&overlapped))
- {
- retval= last_error= 0;
- }
- else
- {
- retval= -1;
- last_error= GetLastError();
- }
- }
-
- if (retval == 0 || last_error == ERROR_MORE_DATA)
- {
- /*
- IO successfully finished (synchronously).
- If skip_completion_port_on_success is set, we need to handle it right
- here, because completion callback would not be executed by the pool.
- */
- if (skip_completion_port_on_success)
- {
- CancelThreadpoolIo(io);
- io_completion_callback(callback_instance, this, &overlapped, last_error,
- num_bytes, io);
- }
- return 0;
- }
-
- if (last_error == ERROR_IO_PENDING)
- {
- return 0;
+ /* Some error occurred */
+ CancelThreadpoolIo(io);
+ return -1;
}
-
- /* Some error occurred */
- CancelThreadpoolIo(io);
- return -1;
+ return 0;
}
/*
@@ -305,7 +234,7 @@ void tp_win_callback_prolog()
{
/* Running in new worker thread*/
FlsSetValue(fls, (void *)1);
- statistic_increment(thread_created, &LOCK_status);
+ thread_created++;
tp_stats.num_worker_threads++;
my_thread_init();
}
@@ -350,6 +279,10 @@ static VOID CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance,
PVOID context, PVOID overlapped, ULONG io_result, ULONG_PTR nbytes, PTP_IO io)
{
TP_connection_win *c= (TP_connection_win *)context;
+
+ /* How many bytes were preread into read buffer */
+ c->sock.end_read((ULONG)nbytes, io_result);
+
/*
Execute high priority connections immediately.
'Yield' in case of low priority connections, i.e SubmitThreadpoolWork (with the same callback)