diff options
Diffstat (limited to 'sql/threadpool_win.cc')
-rw-r--r-- | sql/threadpool_win.cc | 163 |
1 files changed, 48 insertions, 115 deletions
diff --git a/sql/threadpool_win.cc b/sql/threadpool_win.cc index 6003b06bc7b..515bf0e02bc 100644 --- a/sql/threadpool_win.cc +++ b/sql/threadpool_win.cc @@ -31,6 +31,8 @@ #include <threadpool.h> #include <windows.h> +#include "threadpool_winsockets.h" + /* Log a warning */ static void tp_log_warning(const char *msg, const char *fct) { @@ -43,8 +45,6 @@ static PTP_POOL pool; static TP_CALLBACK_ENVIRON callback_environ; static DWORD fls; -static bool skip_completion_port_on_success = false; - PTP_CALLBACK_ENVIRON get_threadpool_win_callback_environ() { return pool? &callback_environ: 0; @@ -83,22 +83,21 @@ struct TP_connection_win:public TP_connection public: TP_connection_win(CONNECT*); ~TP_connection_win(); - virtual int init(); - virtual int start_io(); - virtual void set_io_timeout(int sec); - virtual void wait_begin(int type); - virtual void wait_end(); - - ulonglong timeout; - enum_vio_type vio_type; - HANDLE handle; - OVERLAPPED overlapped; - PTP_CALLBACK_INSTANCE callback_instance; - PTP_IO io; - PTP_TIMER timer; - PTP_WORK work; - bool long_callback; - + int init() override; + void init_vio(st_vio *vio) override; + int start_io() override; + void set_io_timeout(int sec) override; + void wait_begin(int type) override; + void wait_end() override; + + ulonglong timeout=ULLONG_MAX; + OVERLAPPED overlapped{}; + PTP_CALLBACK_INSTANCE callback_instance{}; + PTP_IO io{}; + PTP_TIMER timer{}; + PTP_WORK work{}; + bool long_callback{}; + win_aiosocket sock; }; struct TP_connection *new_TP_connection(CONNECT *connect) @@ -125,120 +124,50 @@ void TP_pool_win::add(TP_connection *c) } } +#define CHECK_ALLOC_ERROR(op) \ + do \ + { \ + if (!(op)) \ + { \ + tp_log_warning("Allocation failed", #op); \ + } \ + } while (0) TP_connection_win::TP_connection_win(CONNECT *c) : - TP_connection(c), - timeout(ULONGLONG_MAX), - callback_instance(0), - io(0), - timer(0), - work(0) + TP_connection(c) { -} + /* Assign io completion callback */ + HANDLE h= c->vio_type == VIO_TYPE_NAMEDPIPE ? c->pipe + : (HANDLE)mysql_socket_getfd(c->sock); -#define CHECK_ALLOC_ERROR(op) if (!(op)) {tp_log_warning("Allocation failed", #op); DBUG_ASSERT(0); return -1; } + CHECK_ALLOC_ERROR(io=CreateThreadpoolIo(h, io_completion_callback, this, &callback_environ)); + CHECK_ALLOC_ERROR(timer= CreateThreadpoolTimer(timer_callback, this, &callback_environ)); + CHECK_ALLOC_ERROR(work= CreateThreadpoolWork(work_callback, this, &callback_environ)); +} int TP_connection_win::init() { - - memset(&overlapped, 0, sizeof(OVERLAPPED)); - switch ((vio_type = connect->vio_type)) - { - case VIO_TYPE_SSL: - case VIO_TYPE_TCPIP: - handle= (HANDLE) mysql_socket_getfd(connect->sock); - break; - case VIO_TYPE_NAMEDPIPE: - handle= connect->pipe; - break; - default: - abort(); - } - - - /* Performance tweaks (s. MSDN documentation)*/ - UCHAR flags= FILE_SKIP_SET_EVENT_ON_HANDLE; - if (skip_completion_port_on_success) - { - flags |= FILE_SKIP_COMPLETION_PORT_ON_SUCCESS; - } - (void)SetFileCompletionNotificationModes(handle, flags); - /* Assign io completion callback */ - CHECK_ALLOC_ERROR(io= CreateThreadpoolIo(handle, io_completion_callback, this, &callback_environ)); - CHECK_ALLOC_ERROR(timer= CreateThreadpoolTimer(timer_callback, this, &callback_environ)); - CHECK_ALLOC_ERROR(work= CreateThreadpoolWork(work_callback, this, &callback_environ)); - return 0; + return !io || !timer || !work ; } +void TP_connection_win::init_vio(st_vio* vio) +{ + sock.init(vio); +} /* Start asynchronous read */ int TP_connection_win::start_io() { - DWORD num_bytes = 0; - static char c; - WSABUF buf; - buf.buf= &c; - buf.len= 0; - DWORD flags=0; - DWORD last_error= 0; - - int retval; StartThreadpoolIo(io); - - if (vio_type == VIO_TYPE_TCPIP || vio_type == VIO_TYPE_SSL) + if (sock.begin_read()) { - /* Start async io (sockets). */ - if (WSARecv((SOCKET)handle , &buf, 1, &num_bytes, &flags, - &overlapped, NULL) == 0) - { - retval= last_error= 0; - } - else - { - retval= -1; - last_error= WSAGetLastError(); - } - } - else - { - /* Start async io (named pipe) */ - if (ReadFile(handle, &c, 0, &num_bytes,&overlapped)) - { - retval= last_error= 0; - } - else - { - retval= -1; - last_error= GetLastError(); - } - } - - if (retval == 0 || last_error == ERROR_MORE_DATA) - { - /* - IO successfully finished (synchronously). - If skip_completion_port_on_success is set, we need to handle it right - here, because completion callback would not be executed by the pool. - */ - if (skip_completion_port_on_success) - { - CancelThreadpoolIo(io); - io_completion_callback(callback_instance, this, &overlapped, last_error, - num_bytes, io); - } - return 0; - } - - if (last_error == ERROR_IO_PENDING) - { - return 0; + /* Some error occurred */ + CancelThreadpoolIo(io); + return -1; } - - /* Some error occurred */ - CancelThreadpoolIo(io); - return -1; + return 0; } /* @@ -305,7 +234,7 @@ void tp_win_callback_prolog() { /* Running in new worker thread*/ FlsSetValue(fls, (void *)1); - statistic_increment(thread_created, &LOCK_status); + thread_created++; tp_stats.num_worker_threads++; my_thread_init(); } @@ -350,6 +279,10 @@ static VOID CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance, PVOID context, PVOID overlapped, ULONG io_result, ULONG_PTR nbytes, PTP_IO io) { TP_connection_win *c= (TP_connection_win *)context; + + /* How many bytes were preread into read buffer */ + c->sock.end_read((ULONG)nbytes, io_result); + /* Execute high priority connections immediately. 'Yield' in case of low priority connections, i.e SubmitThreadpoolWork (with the same callback) |