summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVladislav Vaintroub <wlad@mariadb.com>2016-07-16 00:49:45 +0000
committerVladislav Vaintroub <wlad@mariadb.com>2016-07-16 00:49:45 +0000
commita170bd337e84fd13fa7e64cbb38d6a8baa2a2244 (patch)
treebf78c6d79a7ea58482bb95a7c69a9e8d75aca028
parent627c5d9b57c461cda64f703c2e683c2a0b6ac53b (diff)
downloadmariadb-git-a170bd337e84fd13fa7e64cbb38d6a8baa2a2244.tar.gz
Unix threadpool refactoring (Actually makes Windows possible to use generic implementation
-rw-r--r--sql/CMakeLists.txt4
-rw-r--r--sql/mysqld.cc2
-rw-r--r--sql/sys_vars.cc12
-rw-r--r--sql/threadpool.h6
-rw-r--r--sql/threadpool_common.cc20
-rw-r--r--sql/threadpool_unix.cc409
-rw-r--r--sql/threadpool_win.cc12
7 files changed, 252 insertions, 213 deletions
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt
index 089d793b2b0..4e3c194598a 100644
--- a/sql/CMakeLists.txt
+++ b/sql/CMakeLists.txt
@@ -155,9 +155,9 @@ IF (CMAKE_SYSTEM_NAME MATCHES "Linux" OR
ADD_DEFINITIONS(-DHAVE_POOL_OF_THREADS)
IF(WIN32)
SET(SQL_SOURCE ${SQL_SOURCE} threadpool_win.cc)
- ELSE()
- SET(SQL_SOURCE ${SQL_SOURCE} threadpool_unix.cc)
ENDIF()
+ SET(SQL_SOURCE ${SQL_SOURCE} threadpool_unix.cc)
+
ENDIF()
MYSQL_ADD_PLUGIN(partition ha_partition.cc STORAGE_ENGINE DEFAULT STATIC_ONLY
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index fa8f143335d..bc2d673e3ca 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -4416,7 +4416,7 @@ static int init_common_variables()
#endif /* HAVE_SOLARIS_LARGE_PAGES */
-#if defined(HAVE_POOL_OF_THREADS) && !defined(_WIN32)
+#if defined(HAVE_POOL_OF_THREADS)
if (IS_SYSVAR_AUTOSIZE(&threadpool_size))
SYSVAR_AUTOSIZE(threadpool_size, my_getncpus());
#endif
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index db054a635af..f0635611414 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -3217,23 +3217,17 @@ static Sys_var_ulong Sys_thread_cache_size(
#ifdef HAVE_POOL_OF_THREADS
static bool fix_tp_max_threads(sys_var *, THD *, enum_var_type)
{
-#ifdef _WIN32
tp_set_max_threads(threadpool_max_threads);
-#endif
return false;
}
-#ifdef _WIN32
static bool fix_tp_min_threads(sys_var *, THD *, enum_var_type)
{
tp_set_min_threads(threadpool_min_threads);
return false;
}
-#endif
-
-#ifndef _WIN32
static bool check_threadpool_size(sys_var *self, THD *thd, set_var *var)
{
ulonglong v= var->save_result.ulonglong_value;
@@ -3258,9 +3252,7 @@ static bool fix_threadpool_stall_limit(sys_var*, THD*, enum_var_type)
tp_set_threadpool_stall_limit(threadpool_stall_limit);
return false;
}
-#endif
-#ifdef _WIN32
static Sys_var_uint Sys_threadpool_min_threads(
"thread_pool_min_threads",
"Minimum number of threads in the thread pool.",
@@ -3269,7 +3261,7 @@ static Sys_var_uint Sys_threadpool_min_threads(
NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
ON_UPDATE(fix_tp_min_threads)
);
-#else
+
static Sys_var_uint Sys_threadpool_idle_thread_timeout(
"thread_pool_idle_timeout",
"Timeout in seconds for an idle thread in the thread pool."
@@ -3304,7 +3296,7 @@ static Sys_var_uint Sys_threadpool_stall_limit(
NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
ON_UPDATE(fix_threadpool_stall_limit)
);
-#endif /* !WIN32 */
+
static Sys_var_uint Sys_threadpool_max_threads(
"thread_pool_max_threads",
"Maximum allowed number of worker threads in the thread pool",
diff --git a/sql/threadpool.h b/sql/threadpool.h
index 57a3e052a97..8583e793678 100644
--- a/sql/threadpool.h
+++ b/sql/threadpool.h
@@ -122,6 +122,7 @@ struct TP_pool
virtual int set_pool_size(uint){ return 0; }
virtual int set_idle_timeout(uint){ return 0; }
virtual int set_oversubscribe(uint){ return 0; }
+ virtual int set_stall_limit(uint){ return 0; }
virtual int get_thread_count() { return tp_stats.num_worker_threads; }
virtual int get_idle_thread_count(){ return 0; }
};
@@ -141,11 +142,10 @@ struct TP_pool_win:TP_pool
struct TP_pool_unix :TP_pool
{
TP_pool_unix();
+ ~TP_pool_unix();
virtual TP_connection *new_connection(CONNECT *c);
virtual void add(TP_connection *);
- virtual int set_max_threads(uint);
virtual int set_pool_size(uint);
- virtual int set_idle_timeout(uint);
- virtual int set_oversubscribe(uint);
+ virtual int set_stall_limit(uint);
virtual int get_idle_thread_count();
};
diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc
index 7e3492ae3ef..730ea2bd365 100644
--- a/sql/threadpool_common.cc
+++ b/sql/threadpool_common.cc
@@ -344,15 +344,17 @@ static bool tp_end_thread(THD *, bool)
return 0;
}
-TP_pool *pool;
+static TP_pool *pool;
static bool tp_init()
{
+
#ifdef _WIN32
pool = new (std::nothrow) TP_pool_win;
return 0;
#else
-#error No threadpool
+ pool= new (std::nothrow) TP_pool_unix;
+ return 0;
#endif
}
@@ -389,6 +391,19 @@ void tp_set_max_threads(uint val)
pool->set_max_threads(val);
}
+void tp_set_threadpool_size(uint val)
+{
+ if (pool)
+ pool->set_pool_size(val);
+}
+
+
+void tp_set_threadpool_stall_limit(uint val)
+{
+ if (pool)
+ pool->set_stall_limit(val);
+}
+
void tp_timeout_handler(TP_connection *c)
{
@@ -433,6 +448,7 @@ static void tp_end()
delete pool;
}
+
static scheduler_functions tp_scheduler_functions=
{
0, // max_threads
diff --git a/sql/threadpool_unix.cc b/sql/threadpool_unix.cc
index 4079091e217..005afde464b 100644
--- a/sql/threadpool_unix.cc
+++ b/sql/threadpool_unix.cc
@@ -22,6 +22,17 @@
#ifdef HAVE_POOL_OF_THREADS
+#ifdef _WIN32
+/* AIX may define this, too ?*/
+#define HAVE_IOCP
+#endif
+
+#ifdef HAVE_IOCP
+#define OPTIONAL_IO_POLL_READ_PARAM &overlapped
+#else
+#define OPTIONAL_IOCP_READ_PARAM 0
+#endif
+
#include <sql_connect.h>
#include <mysqld.h>
#include <debug_sync.h>
@@ -38,10 +49,23 @@ typedef struct kevent native_event;
#elif defined (__sun)
#include <port.h>
typedef port_event_t native_event;
+#elif defined (HAVE_IOCP)
+typedef OVERLAPPED_ENTRY native_event;
#else
#error threadpool is not available on this platform
#endif
+
+static void io_poll_close(int fd)
+{
+#ifdef _WIN32
+ CloseHandle((HANDLE)fd);
+#else
+ close(fd);
+#endif
+}
+
+
/** Maximum number of native events a listener can read in one go */
#define MAX_EVENTS 1024
@@ -108,26 +132,34 @@ typedef I_P_List<worker_thread_t, I_P_List_adapter<worker_thread_t,
>
worker_list_t;
-struct connection_t
+struct TP_connection_unix:public TP_connection
{
+ TP_connection_unix(CONNECT *c);
+ virtual int init(){ return 0; };
+ virtual void set_io_timeout(int sec);
+ virtual int start_io();
+ virtual void wait_begin(int type);
+ virtual void wait_end();
- THD *thd;
thread_group_t *thread_group;
- connection_t *next_in_queue;
- connection_t **prev_in_queue;
+ TP_connection_unix *next_in_queue;
+ TP_connection_unix **prev_in_queue;
ulonglong abs_wait_timeout;
- CONNECT* connect;
- bool logged_in;
bool bound_to_poll_descriptor;
bool waiting;
+#ifdef HAVE_IOCP
+ OVERLAPPED overlapped;
+#endif
};
-typedef I_P_List<connection_t,
- I_P_List_adapter<connection_t,
- &connection_t::next_in_queue,
- &connection_t::prev_in_queue>,
+typedef TP_connection_unix TP_connection_unix;
+
+typedef I_P_List<TP_connection_unix,
+ I_P_List_adapter<TP_connection_unix,
+ &TP_connection_unix::next_in_queue,
+ &TP_connection_unix::prev_in_queue>,
I_P_List_null_counter,
- I_P_List_fast_push_back<connection_t> >
+ I_P_List_fast_push_back<TP_connection_unix> >
connection_queue_t;
struct thread_group_t
@@ -175,15 +207,12 @@ struct pool_timer_t
static pool_timer_t pool_timer;
-static void queue_put(thread_group_t *thread_group, connection_t *connection);
+static void queue_put(thread_group_t *thread_group, TP_connection_unix *connection);
static int wake_thread(thread_group_t *thread_group);
-static void handle_event(connection_t *connection);
static int wake_or_create_thread(thread_group_t *thread_group);
static int create_worker(thread_group_t *thread_group);
static void *worker_main(void *param);
static void check_stall(thread_group_t *thread_group);
-static void connection_abort(connection_t *connection);
-static void set_wait_timeout(connection_t *connection);
static void set_next_timeout_check(ulonglong abstime);
static void print_pool_blocked_message(bool);
@@ -208,7 +237,7 @@ static void print_pool_blocked_message(bool);
Creates an io_poll descriptor
On Linux: epoll_create()
- - io_poll_associate_fd(int poll_fd, int fd, void *data)
+ - io_poll_associate_fd(int poll_fd, int fd, void *data, void *opt)
Associate file descriptor with io poll descriptor
On Linux : epoll_ctl(..EPOLL_CTL_ADD))
@@ -217,7 +246,7 @@ static void print_pool_blocked_message(bool);
On Linux: epoll_ctl(..EPOLL_CTL_DEL)
- - io_poll_start_read(int poll_fd,int fd, void *data)
+ - io_poll_start_read(int poll_fd,int fd, void *data, void *opt)
The same as io_poll_associate_fd(), but cannot be used before
io_poll_associate_fd() was called.
On Linux : epoll_ctl(..EPOLL_CTL_MOD)
@@ -245,7 +274,7 @@ static int io_poll_create()
}
-int io_poll_associate_fd(int pollfd, int fd, void *data)
+int io_poll_associate_fd(int pollfd, int fd, void *data, void*)
{
struct epoll_event ev;
ev.data.u64= 0; /* Keep valgrind happy */
@@ -256,7 +285,7 @@ int io_poll_associate_fd(int pollfd, int fd, void *data)
-int io_poll_start_read(int pollfd, int fd, void *data)
+int io_poll_start_read(int pollfd, int fd, void *data, void *)
{
struct epoll_event ev;
ev.data.u64= 0; /* Keep valgrind happy */
@@ -315,7 +344,7 @@ int io_poll_create()
return kqueue();
}
-int io_poll_start_read(int pollfd, int fd, void *data)
+int io_poll_start_read(int pollfd, int fd, void *data,void *)
{
struct kevent ke;
MY_EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ONESHOT,
@@ -324,12 +353,12 @@ int io_poll_start_read(int pollfd, int fd, void *data)
}
-int io_poll_associate_fd(int pollfd, int fd, void *data)
+int io_poll_associate_fd(int pollfd, int fd, void *data,void *)
{
struct kevent ke;
MY_EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ONESHOT,
0, 0, data);
- return io_poll_start_read(pollfd,fd, data);
+ return io_poll_start_read(pollfd,fd, data, 0);
}
@@ -371,14 +400,14 @@ static int io_poll_create()
return port_create();
}
-int io_poll_start_read(int pollfd, int fd, void *data)
+int io_poll_start_read(int pollfd, int fd, void *data, void *)
{
return port_associate(pollfd, PORT_SOURCE_FD, fd, POLLIN, data);
}
-static int io_poll_associate_fd(int pollfd, int fd, void *data)
+static int io_poll_associate_fd(int pollfd, int fd, void *data, void *)
{
- return io_poll_start_read(pollfd, fd, data);
+ return io_poll_start_read(pollfd, fd, data, 0);
}
int io_poll_disassociate_fd(int pollfd, int fd)
@@ -410,16 +439,77 @@ static void* native_event_get_userdata(native_event *event)
{
return event->portev_user;
}
+
+#elif defined(HAVE_IOCP)
+
+static int io_poll_create()
+{
+ HANDLE h= CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
+ return (int)h;
+}
+
+
+int io_poll_start_read(int pollfd, int fd, void *, void *opt)
+{
+ DWORD num_bytes = 0;
+ static char c;
+
+ WSABUF buf;
+ buf.buf= &c;
+ buf.len= 0;
+ DWORD flags=0;
+
+ if (WSARecv((SOCKET)fd, &buf, 1, &num_bytes, &flags, (OVERLAPPED *)opt, NULL) == 0)
+ return 0;
+
+ if (GetLastError() == ERROR_IO_PENDING)
+ return 0;
+
+ return 1;
+}
+
+
+static int io_poll_associate_fd(int pollfd, int fd, void *data, void *opt)
+{
+ HANDLE h= CreateIoCompletionPort((HANDLE)fd, (HANDLE)pollfd, (ULONG_PTR)data, 0);
+ if (!h)
+ return -1;
+ return io_poll_start_read(pollfd,fd, 0, opt);
+}
+
+
+int io_poll_disassociate_fd(int pollfd, int fd)
+{
+ /* Not possible to unbind/rebind file descriptor in IOCP. */
+ return 0;
+}
+
+
+int io_poll_wait(int pollfd, native_event *events, int maxevents, int timeout_ms)
+{
+ ULONG n;
+ BOOL ok = GetQueuedCompletionStatusEx((HANDLE)pollfd, events,
+ maxevents, &n, timeout_ms, FALSE);
+
+ return ok ? (int)n : -1;
+}
+
+
+static void* native_event_get_userdata(native_event *event)
+{
+ return (void *)event->lpCompletionKey;
+}
+
#endif
/* Dequeue element from a workqueue */
-static connection_t *queue_get(thread_group_t *thread_group)
+static TP_connection_unix *queue_get(thread_group_t *thread_group)
{
DBUG_ENTER("queue_get");
thread_group->queue_event_count++;
- connection_t *c= thread_group->queue.front();
+ TP_connection_unix *c= thread_group->queue.front();
if (c)
{
thread_group->queue.remove(c);
@@ -450,7 +540,7 @@ static void timeout_check(pool_timer_t *timer)
if (thd->net.reading_or_writing != 1)
continue;
- connection_t *connection= (connection_t *)thd->event_scheduler.data;
+ TP_connection_unix *connection= (TP_connection_unix *)thd->event_scheduler.data;
if (!connection)
{
/*
@@ -462,11 +552,7 @@ static void timeout_check(pool_timer_t *timer)
if(connection->abs_wait_timeout < timer->current_microtime)
{
- /* Wait timeout exceeded, kill connection. */
- mysql_mutex_lock(&thd->LOCK_thd_data);
- thd->killed = KILL_CONNECTION;
- post_kill_notification(thd);
- mysql_mutex_unlock(&thd->LOCK_thd_data);
+ tp_timeout_handler(connection);
}
else
{
@@ -636,11 +722,11 @@ static void stop_timer(pool_timer_t *timer)
@return a ready connection, or NULL on shutdown
*/
-static connection_t * listener(worker_thread_t *current_thread,
+static TP_connection_unix * listener(worker_thread_t *current_thread,
thread_group_t *thread_group)
{
DBUG_ENTER("listener");
- connection_t *retval= NULL;
+ TP_connection_unix *retval= NULL;
for(;;)
{
@@ -721,14 +807,14 @@ static connection_t * listener(worker_thread_t *current_thread,
*/
for(int i=(listener_picks_event)?1:0; i < cnt ; i++)
{
- connection_t *c= (connection_t *)native_event_get_userdata(&ev[i]);
+ TP_connection_unix *c= (TP_connection_unix *)native_event_get_userdata(&ev[i]);
thread_group->queue.push_back(c);
}
if (listener_picks_event)
{
/* Handle the first event. */
- retval= (connection_t *)native_event_get_userdata(&ev[0]);
+ retval= (TP_connection_unix *)native_event_get_userdata(&ev[0]);
mysql_mutex_unlock(&thread_group->mutex);
break;
}
@@ -924,9 +1010,10 @@ void thread_group_destroy(thread_group_t *thread_group)
mysql_mutex_destroy(&thread_group->mutex);
if (thread_group->pollfd != -1)
{
- close(thread_group->pollfd);
+ io_poll_close(thread_group->pollfd);
thread_group->pollfd= -1;
}
+#ifndef HAVE_IOCP
for(int i=0; i < 2; i++)
{
if(thread_group->shutdown_pipe[i] != -1)
@@ -935,6 +1022,8 @@ void thread_group_destroy(thread_group_t *thread_group)
thread_group->shutdown_pipe[i]= -1;
}
}
+#endif
+
if (my_atomic_add32(&shutdown_group_count, -1) == 1)
my_free(all_groups);
}
@@ -957,7 +1046,32 @@ static int wake_thread(thread_group_t *thread_group)
DBUG_RETURN(1); /* no thread in waiter list => missed wakeup */
}
+/*
+ Wake listener thread (during shutdown)
+ Self-pipe trick is used in most cases,except IOCP.
+*/
+static int wake_listener(thread_group_t *thread_group)
+{
+#ifndef HAVE_IOCP
+ if (pipe(thread_group->shutdown_pipe))
+ {
+ return -1;
+ }
+ /* Wake listener */
+ if (io_poll_associate_fd(thread_group->pollfd,
+ thread_group->shutdown_pipe[0], NULL))
+ {
+ return -1;
+ }
+ char c= 0;
+ if (write(thread_group->shutdown_pipe[1], &c, 1) < 0)
+ return -1;
+#else
+ PostQueuedCompletionStatus((HANDLE)thread_group->pollfd, 0, 0, 0);
+#endif
+ return 0;
+}
/**
Initiate shutdown for thread group.
@@ -981,20 +1095,7 @@ static void thread_group_close(thread_group_t *thread_group)
thread_group->shutdown= true;
thread_group->listener= NULL;
- if (pipe(thread_group->shutdown_pipe))
- {
- DBUG_VOID_RETURN;
- }
-
- /* Wake listener */
- if (io_poll_associate_fd(thread_group->pollfd,
- thread_group->shutdown_pipe[0], NULL))
- {
- DBUG_VOID_RETURN;
- }
- char c= 0;
- if (write(thread_group->shutdown_pipe[1], &c, 1) < 0)
- DBUG_VOID_RETURN;
+ wake_listener(thread_group);
/* Wake all workers. */
while(wake_thread(thread_group) == 0)
@@ -1015,7 +1116,7 @@ static void thread_group_close(thread_group_t *thread_group)
*/
-static void queue_put(thread_group_t *thread_group, connection_t *connection)
+static void queue_put(thread_group_t *thread_group, TP_connection_unix *connection)
{
DBUG_ENTER("queue_put");
@@ -1061,11 +1162,11 @@ static bool too_many_threads(thread_group_t *thread_group)
NULL is returned if timeout has expired,or on shutdown.
*/
-connection_t *get_event(worker_thread_t *current_thread,
+TP_connection_unix *get_event(worker_thread_t *current_thread,
thread_group_t *thread_group, struct timespec *abstime)
{
DBUG_ENTER("get_event");
- connection_t *connection = NULL;
+ TP_connection_unix *connection = NULL;
int err=0;
mysql_mutex_lock(&thread_group->mutex);
@@ -1111,7 +1212,7 @@ connection_t *get_event(worker_thread_t *current_thread,
if (io_poll_wait(thread_group->pollfd,&nev,1, 0) == 1)
{
thread_group->io_event_count++;
- connection = (connection_t *)native_event_get_userdata(&nev);
+ connection= (TP_connection_unix *)native_event_get_userdata(&nev);
break;
}
}
@@ -1206,45 +1307,38 @@ void wait_end(thread_group_t *thread_group)
Allocate/initialize a new connection structure.
*/
-connection_t *alloc_connection()
+TP_connection_unix::TP_connection_unix(CONNECT *c):
+ TP_connection(c),
+ waiting(false),
+ bound_to_poll_descriptor(false),
+ abs_wait_timeout(ULONGLONG_MAX),
+ thread_group(0),
+ next_in_queue(0),
+ prev_in_queue(0),
+#ifdef HAVE_IOCP
+ overlapped()
+#endif
{
- connection_t* connection;
- DBUG_ENTER("alloc_connection");
- DBUG_EXECUTE_IF("simulate_failed_connection_1", DBUG_RETURN(0); );
-
- if ((connection = (connection_t *)my_malloc(sizeof(connection_t),0)))
- {
- connection->waiting= false;
- connection->logged_in= false;
- connection->bound_to_poll_descriptor= false;
- connection->abs_wait_timeout= ULONGLONG_MAX;
- connection->thd= 0;
- }
- DBUG_RETURN(connection);
}
+TP_connection * TP_pool_unix::new_connection(CONNECT *c)
+{
+ return new (std::nothrow) TP_connection_unix(c);
+}
/**
Add a new connection to thread pool..
*/
-void tp_add_connection(CONNECT *connect)
+void TP_pool_unix::add(TP_connection *c)
{
- connection_t *connection;
DBUG_ENTER("tp_add_connection");
- connection= alloc_connection();
- if (!connection)
- {
- connect->close_and_delete();
- DBUG_VOID_RETURN;
- }
- connection->connect= connect;
-
+ TP_connection_unix *connection=(TP_connection_unix *)c;
/* Assign connection to a group. */
thread_group_t *group=
- &all_groups[connect->thread_id%group_count];
+ &all_groups[c->connect->thread_id%group_count];
connection->thread_group=group;
@@ -1261,44 +1355,18 @@ void tp_add_connection(CONNECT *connect)
}
-/**
- Terminate connection.
-*/
-
-static void connection_abort(connection_t *connection)
-{
- DBUG_ENTER("connection_abort");
- thread_group_t *group= connection->thread_group;
-
- if (connection->thd)
- {
- threadpool_remove_connection(connection->thd);
- }
-
- mysql_mutex_lock(&group->mutex);
- group->connection_count--;
- mysql_mutex_unlock(&group->mutex);
-
- my_free(connection);
- DBUG_VOID_RETURN;
-}
-
/**
MySQL scheduler callback: wait begin
*/
-void tp_wait_begin(THD *thd, int type)
+void TP_connection_unix::wait_begin(int type)
{
- DBUG_ENTER("tp_wait_begin");
- DBUG_ASSERT(thd);
- connection_t *connection = (connection_t *)thd->event_scheduler.data;
- if (connection)
- {
- DBUG_ASSERT(!connection->waiting);
- connection->waiting= true;
- wait_begin(connection->thread_group);
- }
+ DBUG_ENTER("wait_begin");
+
+ DBUG_ASSERT(!waiting);
+ waiting= true;
+ ::wait_begin(thread_group);
DBUG_VOID_RETURN;
}
@@ -1307,18 +1375,13 @@ void tp_wait_begin(THD *thd, int type)
MySQL scheduler callback: wait end
*/
-void tp_wait_end(THD *thd)
+void TP_connection_unix::wait_end()
{
- DBUG_ENTER("tp_wait_end");
+ DBUG_ENTER("wait_end");
DBUG_ASSERT(thd);
-
- connection_t *connection = (connection_t *)thd->event_scheduler.data;
- if (connection)
- {
- DBUG_ASSERT(connection->waiting);
- connection->waiting = false;
- wait_end(connection->thread_group);
- }
+ DBUG_ASSERT(waiting);
+ waiting = false;
+ ::wait_end(thread_group);
DBUG_VOID_RETURN;
}
@@ -1340,7 +1403,7 @@ static void set_next_timeout_check(ulonglong abstime)
Set wait timeout for connection.
*/
-static void set_wait_timeout(connection_t *c)
+void TP_connection_unix::set_io_timeout(int timeout_sec)
{
DBUG_ENTER("set_wait_timeout");
/*
@@ -1351,11 +1414,11 @@ static void set_wait_timeout(connection_t *c)
one tick interval.
*/
- c->abs_wait_timeout= pool_timer.current_microtime +
+ abs_wait_timeout= pool_timer.current_microtime +
1000LL*pool_timer.tick_interval +
- 1000000LL*c->thd->variables.net_wait_timeout;
+ 1000000LL*timeout_sec;
- set_next_timeout_check(c->abs_wait_timeout);
+ set_next_timeout_check(abs_wait_timeout);
DBUG_VOID_RETURN;
}
@@ -1367,7 +1430,7 @@ static void set_wait_timeout(connection_t *c)
after thread_pool_size setting.
*/
-static int change_group(connection_t *c,
+static int change_group(TP_connection_unix *c,
thread_group_t *old_group,
thread_group_t *new_group)
{
@@ -1398,10 +1461,11 @@ static int change_group(connection_t *c,
}
-static int start_io(connection_t *connection)
+int TP_connection_unix::start_io()
{
- int fd = mysql_socket_getfd(connection->thd->net.vio->mysql_socket);
+ int fd= mysql_socket_getfd(thd->net.vio->mysql_socket);
+#ifndef HAVE_IOCP
/*
Usually, connection will stay in the same group for the entire
connection's life. However, we do allow group_count to
@@ -1413,56 +1477,25 @@ static int start_io(connection_t *connection)
on thread_id and current group count, and migrate if necessary.
*/
thread_group_t *group =
- &all_groups[connection->thd->thread_id%group_count];
+ &all_groups[thd->thread_id%group_count];
- if (group != connection->thread_group)
+ if (group != thread_group)
{
- if (change_group(connection, connection->thread_group, group))
+ if (change_group(this, thread_group, group))
return -1;
}
-
+#endif
+
/*
Bind to poll descriptor if not yet done.
*/
- if (!connection->bound_to_poll_descriptor)
+ if (!bound_to_poll_descriptor)
{
- connection->bound_to_poll_descriptor= true;
- return io_poll_associate_fd(group->pollfd, fd, connection);
+ bound_to_poll_descriptor= true;
+ return io_poll_associate_fd(thread_group->pollfd, fd, this, OPTIONAL_IO_POLL_READ_PARAM);
}
- return io_poll_start_read(group->pollfd, fd, connection);
-}
-
-
-
-static void handle_event(connection_t *connection)
-{
-
- DBUG_ENTER("handle_event");
- int err;
-
- if (!connection->logged_in)
- {
- connection->thd = threadpool_add_connection(connection->connect, connection);
- err= (connection->thd == NULL);
- connection->logged_in= true;
- }
- else
- {
- err= threadpool_process_request(connection->thd);
- }
-
- if(err)
- goto end;
-
- set_wait_timeout(connection);
- err= start_io(connection);
-
-end:
- if (err)
- connection_abort(connection);
-
- DBUG_VOID_RETURN;
+ return io_poll_start_read(thread_group->pollfd, fd, this, OPTIONAL_IO_POLL_READ_PARAM);
}
@@ -1490,14 +1523,14 @@ static void *worker_main(void *param)
/* Run event loop */
for(;;)
{
- connection_t *connection;
+ TP_connection_unix *connection;
struct timespec ts;
set_timespec(ts,threadpool_idle_timeout);
connection = get_event(&this_thread, thread_group, &ts);
if (!connection)
break;
this_thread.event_count++;
- handle_event(connection);
+ tp_callback(connection);
}
/* Thread shutdown: cleanup per-worker-thread structure. */
@@ -1518,16 +1551,17 @@ static void *worker_main(void *param)
}
-bool tp_init()
+TP_pool_unix::TP_pool_unix()
{
- DBUG_ENTER("tp_init");
+ DBUG_ENTER("TP_pool_unix::TP_pool_unix");
threadpool_max_size= MY_MAX(threadpool_size, 128);
all_groups= (thread_group_t *)
my_malloc(sizeof(thread_group_t) * threadpool_max_size, MYF(MY_WME|MY_ZEROFILL));
if (!all_groups)
{
threadpool_max_size= 0;
- DBUG_RETURN(1);
+ sql_print_error("Allocation failed");
+ exit(1);
}
threadpool_started= true;
scheduler_init();
@@ -1536,12 +1570,12 @@ bool tp_init()
{
thread_group_init(&all_groups[i], get_connection_attrib());
}
- tp_set_threadpool_size(threadpool_size);
+ set_pool_size(threadpool_size);
if(group_count == 0)
{
/* Something went wrong */
sql_print_error("Can't set threadpool size to %d",threadpool_size);
- DBUG_RETURN(1);
+ exit(1);
}
PSI_register(mutex);
PSI_register(cond);
@@ -1549,10 +1583,10 @@ bool tp_init()
pool_timer.tick_interval= threadpool_stall_limit;
start_timer(&pool_timer);
- DBUG_RETURN(0);
+ DBUG_VOID_RETURN;
}
-void tp_end()
+TP_pool_unix::~TP_pool_unix()
{
DBUG_ENTER("tp_end");
@@ -1571,13 +1605,10 @@ void tp_end()
/** Ensure that poll descriptors are created when threadpool_size changes */
-
-void tp_set_threadpool_size(uint size)
+int TP_pool_unix::set_pool_size(uint size)
{
bool success= true;
- if (!threadpool_started)
- return;
-
+
for(uint i=0; i< size; i++)
{
thread_group_t *group= &all_groups[i];
@@ -1596,20 +1627,20 @@ void tp_set_threadpool_size(uint size)
if (!success)
{
group_count= i;
- return;
+ return -1;
}
}
group_count= size;
+ return 0;
}
-void tp_set_threadpool_stall_limit(uint limit)
+int TP_pool_unix::set_stall_limit(uint limit)
{
- if (!threadpool_started)
- return;
mysql_mutex_lock(&(pool_timer.mutex));
pool_timer.tick_interval= limit;
mysql_mutex_unlock(&(pool_timer.mutex));
mysql_cond_signal(&(pool_timer.cond));
+ return 0;
}
@@ -1620,7 +1651,7 @@ void tp_set_threadpool_stall_limit(uint limit)
Don't do any locking, it is not required for stats.
*/
-int tp_get_idle_thread_count()
+int TP_pool_unix::get_idle_thread_count()
{
int sum=0;
for (uint i= 0; i < threadpool_max_size && all_groups[i].pollfd >= 0; i++)
diff --git a/sql/threadpool_win.cc b/sql/threadpool_win.cc
index abc41681aa5..2c012a9e77b 100644
--- a/sql/threadpool_win.cc
+++ b/sql/threadpool_win.cc
@@ -13,11 +13,11 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
-#ifdef _WIN32_winT
-#undef _WIN32_winT
+#ifdef _WIN32_WINNT
+#undef _WIN32_WINNT
#endif
-#define _WIN32_winT 0x0601
+#define _WIN32_WINNT 0x0601
#include <my_global.h>
#include <violite.h>
@@ -64,9 +64,9 @@ static void tp_log_warning(const char *msg, const char *fct)
}
-PTP_POOL pool;
-TP_CALLBACK_ENVIRON callback_environ;
-DWORD fls;
+static PTP_POOL pool;
+static TP_CALLBACK_ENVIRON callback_environ;
+static DWORD fls;
static bool skip_completion_port_on_success = false;