author     Vladislav Vaintroub <wlad@montyprogram.com>  2011-12-08 19:17:49 +0100
committer  Vladislav Vaintroub <wlad@montyprogram.com>  2011-12-08 19:17:49 +0100
commit     e91bbca5fb080a8d988c156d78c7dc1b1daaad82 (patch)
tree       edeb15da451e956ae0d6874657c910ab0df111f8
parent     5e7b949e61f4330e27013c8ec81fa3d450e5dce6 (diff)
download   mariadb-git-e91bbca5fb080a8d988c156d78c7dc1b1daaad82.tar.gz
Initial threadpool implementation for MariaDB 5.5
Diffstat:
-rw-r--r--  cmake/os/FreeBSD.cmake                                   |    7
-rw-r--r--  include/thr_alarm.h                                      |    6
-rw-r--r--  include/violite.h                                        |    3
-rwxr-xr-x  mysql-test/mysql-test-run.pl                             |    4
-rw-r--r--  mysql-test/suite/sys_vars/t/slow_launch_time_func.test   |    2
-rw-r--r--  mysql-test/t/wait_timeout.test                           |    2
-rw-r--r--  mysys/thr_alarm.c                                        |   88
-rw-r--r--  sql/CMakeLists.txt                                       |   13
-rw-r--r--  sql/mysqld.cc                                            |   11
-rw-r--r--  sql/net_serv.cc                                          |    4
-rw-r--r--  sql/scheduler.cc                                         |   21
-rw-r--r--  sql/scheduler.h                                          |   52
-rw-r--r--  sql/sql_class.cc                                         |   37
-rw-r--r--  sql/sql_class.h                                          |    4
-rw-r--r--  sql/sql_connect.cc                                       |    3
-rw-r--r--  sql/sql_parse.cc                                         |    5
-rw-r--r--  sql/sys_vars.cc                                          |   76
-rw-r--r--  sql/threadpool.h                                         |   47
-rw-r--r--  sql/threadpool_common.cc                                 |  246
-rw-r--r--  sql/threadpool_unix.cc                                   | 1238
-rw-r--r--  sql/threadpool_win.cc                                    |  756
-rw-r--r--  vio/vio.c                                                |   25
-rw-r--r--  vio/vio_priv.h                                           |    4
-rw-r--r--  vio/viosocket.c                                          |   83

24 files changed, 2550 insertions(+), 187 deletions(-)
diff --git a/cmake/os/FreeBSD.cmake b/cmake/os/FreeBSD.cmake
index e09592942c1..9bc7d944da2 100644
--- a/cmake/os/FreeBSD.cmake
+++ b/cmake/os/FreeBSD.cmake
@@ -22,3 +22,10 @@
# The below was used for really old versions of FreeBSD, roughly: before 5.1.9
# ADD_DEFINITIONS(-DHAVE_BROKEN_REALPATH)
+
+# Use atomic builtins
+IF(CMAKE_SIZEOF_VOID_P EQUAL 4 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "i386")
+ SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -march=i686")
+ SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -march=i686")
+ SET(CMAKE_REQUIRED_FLAGS "${CMAKE_REQUIRED_FLAGS} -march=i686")
+ENDIF()
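The -march=i686 bump matters because GCC only inlines its __sync_* atomic builtins when the target CPU has cmpxchg; on plain i386 the compiler emits calls to undefined helper functions and the configure-time atomics check fails to link. A minimal sketch of such a probe (my own illustration, not part of the patch):

    /* probe.cc -- hypothetical stand-in for the configure-time atomics check */
    #include <stdint.h>

    int main()
    {
      int32_t v= 0;
      /* Inlined as lock cmpxchg with -march=i686; an undefined external
         call (and hence a link error) with plain -march=i386. */
      return __sync_val_compare_and_swap(&v, 0, 1);
    }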
diff --git a/include/thr_alarm.h b/include/thr_alarm.h
index f4823b618f7..66e344d10fd 100644
--- a/include/thr_alarm.h
+++ b/include/thr_alarm.h
@@ -40,7 +40,11 @@ typedef struct st_alarm_info
} ALARM_INFO;
void thr_alarm_info(ALARM_INFO *info);
+extern my_bool my_disable_thr_alarm;
+#ifdef _WIN32
+#define DONT_USE_THR_ALARM
+#endif
#if defined(DONT_USE_THR_ALARM)
#define USE_ALARM_THREAD
@@ -88,7 +92,7 @@ typedef struct st_alarm {
extern uint thr_client_alarm;
extern pthread_t alarm_thread;
-extern my_bool my_disable_thr_alarm;
+
#define thr_alarm_init(A) (*(A))=0
#define thr_alarm_in_use(A) (*(A)!= 0)
diff --git a/include/violite.h b/include/violite.h
index ba057028ed2..05b20245c5a 100644
--- a/include/violite.h
+++ b/include/violite.h
@@ -168,6 +168,7 @@ void vio_end(void);
#define vio_should_retry(vio) (vio)->should_retry(vio)
#define vio_was_interrupted(vio) (vio)->was_interrupted(vio)
#define vio_close(vio) ((vio)->vioclose)(vio)
+#define vio_shutdown(vio,how) ((vio)->shutdown)(vio,how)
#define vio_peer_addr(vio, buf, prt, buflen) (vio)->peer_addr(vio, buf, prt, buflen)
#define vio_timeout(vio, which, seconds) (vio)->timeout(vio, which, seconds)
#define vio_poll_read(vio, timeout) (vio)->poll_read(vio, timeout)
@@ -219,6 +220,7 @@ struct st_vio
void (*timeout)(Vio*, unsigned int which, unsigned int timeout);
my_bool (*poll_read)(Vio *vio, uint timeout);
my_bool (*is_connected)(Vio*);
+ int (*shutdown)(Vio *, int);
my_bool (*has_data) (Vio*);
#ifdef HAVE_OPENSSL
void *ssl_arg;
@@ -235,6 +237,7 @@ struct st_vio
char *shared_memory_pos;
#endif /* HAVE_SMEM */
#ifdef _WIN32
+ DWORD thread_id; /* Used on XP only, in vio_shutdown */
OVERLAPPED pipe_overlapped;
DWORD read_timeout_ms;
DWORD write_timeout_ms;
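The new shutdown method exists so that another thread can force a blocked read on the connection's socket to return, without destroying the Vio (closing the descriptor outright could race with a concurrent reader). A hedged sketch of the intended call pattern, mirroring the THD::awake() change later in this patch:

    /* Sketch only: wake a connection thread that is blocked in vio_read().
       shutdown(2) semantics make the blocked read return an error while
       the descriptor itself stays valid until vio_close(). */
    void interrupt_blocked_io(Vio *active_vio)
    {
      if (active_vio)
        vio_shutdown(active_vio, SHUT_RDWR);  /* method added above */
    }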
diff --git a/mysql-test/mysql-test-run.pl b/mysql-test/mysql-test-run.pl
index 25427b81eff..e4cd9e6ab89 100755
--- a/mysql-test/mysql-test-run.pl
+++ b/mysql-test/mysql-test-run.pl
@@ -3461,7 +3461,9 @@ sub mysql_install_db {
mtr_add_arg($args, "--loose-skip-ndbcluster");
mtr_add_arg($args, "--loose-skip-aria");
mtr_add_arg($args, "--disable-sync-frm");
- mtr_add_arg($args, "--tmpdir=%s", "$opt_vardir/tmp/");
+ mtr_add_arg($args, "--tmpdir=.");
+ mtr_add_arg($args, "--max_allowed_packet=8M");
+ mtr_add_arg($args, "--net_buffer_length=16K");
mtr_add_arg($args, "--core-file");
if ( $opt_debug )
diff --git a/mysql-test/suite/sys_vars/t/slow_launch_time_func.test b/mysql-test/suite/sys_vars/t/slow_launch_time_func.test
index c9fc357b10f..a5b429f81cb 100644
--- a/mysql-test/suite/sys_vars/t/slow_launch_time_func.test
+++ b/mysql-test/suite/sys_vars/t/slow_launch_time_func.test
@@ -29,7 +29,7 @@
#
# Setup
#
-
+--source include/not_threadpool.inc
--source include/not_embedded.inc
--source include/not_threadpool.inc
diff --git a/mysql-test/t/wait_timeout.test b/mysql-test/t/wait_timeout.test
index eb86cf17ebb..68c0957347d 100644
--- a/mysql-test/t/wait_timeout.test
+++ b/mysql-test/t/wait_timeout.test
@@ -8,7 +8,7 @@
###############################################################################
# These tests cannot run with the embedded server
-- source include/not_embedded.inc
--- source include/one_thread_per_connection.inc
+#-- source include/one_thread_per_connection.inc
# Save the initial number of concurrent sessions
--source include/count_sessions.inc
diff --git a/mysys/thr_alarm.c b/mysys/thr_alarm.c
index adce7407239..4b5144e90bf 100644
--- a/mysys/thr_alarm.c
+++ b/mysys/thr_alarm.c
@@ -597,93 +597,6 @@ static void *alarm_handler(void *arg __attribute__((unused)))
return 0; /* Impossible */
}
#endif /* USE_ALARM_THREAD */
-
-/*****************************************************************************
- thr_alarm for win95
-*****************************************************************************/
-
-#else /* __WIN__ */
-
-void thr_alarm_kill(my_thread_id thread_id)
-{
- /* Can't do this yet */
-}
-
-sig_handler process_alarm(int sig __attribute__((unused)))
-{
- /* Can't do this yet */
-}
-
-
-my_bool thr_alarm(thr_alarm_t *alrm, uint sec, ALARM *alarm)
-{
- (*alrm)= &alarm->alarmed;
- if (alarm_aborted)
- {
- alarm->alarmed.crono=0;
- return 1;
- }
- if (!(alarm->alarmed.crono=SetTimer((HWND) NULL,0, sec*1000,
- (TIMERPROC) NULL)))
- return 1;
- return 0;
-}
-
-
-my_bool thr_got_alarm(thr_alarm_t *alrm_ptr)
-{
- thr_alarm_t alrm= *alrm_ptr;
- MSG msg;
- if (alrm->crono)
- {
- PeekMessage(&msg,NULL,WM_TIMER,WM_TIMER,PM_REMOVE) ;
- if (msg.message == WM_TIMER || alarm_aborted)
- {
- KillTimer(NULL, alrm->crono);
- alrm->crono = 0;
- }
- }
- return !alrm->crono || alarm_aborted;
-}
-
-
-void thr_end_alarm(thr_alarm_t *alrm_ptr)
-{
- thr_alarm_t alrm= *alrm_ptr;
- /* alrm may be zero if thr_alarm aborted with an error */
- if (alrm && alrm->crono)
-
- {
- KillTimer(NULL, alrm->crono);
- alrm->crono = 0;
- }
-}
-
-void end_thr_alarm(my_bool free_structures)
-{
- DBUG_ENTER("end_thr_alarm");
- alarm_aborted=1; /* No more alarms */
- DBUG_VOID_RETURN;
-}
-
-void init_thr_alarm(uint max_alarm)
-{
- DBUG_ENTER("init_thr_alarm");
- alarm_aborted=0; /* Yes, Gimmie alarms */
- DBUG_VOID_RETURN;
-}
-
-void thr_alarm_info(ALARM_INFO *info)
-{
- bzero((char*) info, sizeof(*info));
-}
-
-void resize_thr_alarm(uint max_alarms)
-{
-}
-
-#endif /* __WIN__ */
-
#endif
/****************************************************************************
@@ -954,4 +867,5 @@ int main(int argc __attribute__((unused)),char **argv __attribute__((unused)))
}
#endif /* !defined(DONT_USE_ALARM_THREAD) */
+#endif /* WIN */
#endif /* MAIN */
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt
index ce6c96e55f2..1772c9e12b2 100644
--- a/sql/CMakeLists.txt
+++ b/sql/CMakeLists.txt
@@ -31,7 +31,7 @@ ${CMAKE_CURRENT_BINARY_DIR}/lex_hash.h
SET_SOURCE_FILES_PROPERTIES(${GEN_SOURCES} PROPERTIES GENERATED 1)
-ADD_DEFINITIONS(-DMYSQL_SERVER -DHAVE_EVENT_SCHEDULER)
+ADD_DEFINITIONS(-DMYSQL_SERVER -DHAVE_EVENT_SCHEDULER -DHAVE_POOL_OF_THREADS)
IF(SSL_DEFINES)
ADD_DEFINITIONS(${SSL_DEFINES})
ENDIF()
@@ -82,9 +82,16 @@ SET (SQL_SOURCE
opt_index_cond_pushdown.cc opt_subselect.cc
opt_table_elimination.cc sql_expression_cache.cc
gcalc_slicescan.cc gcalc_tools.cc
-
+ threadpool_common.cc
${GEN_SOURCES}
- ${MYSYS_LIBWRAP_SOURCE})
+ ${MYSYS_LIBWRAP_SOURCE}
+ )
+
+IF(WIN32)
+ SET(SQL_SOURCE ${SQL_SOURCE} threadpool_win.cc)
+ELSE()
+ SET(SQL_SOURCE ${SQL_SOURCE} threadpool_unix.cc)
+ENDIF()
MYSQL_ADD_PLUGIN(partition ha_partition.cc STORAGE_ENGINE DEFAULT STATIC_ONLY
RECOMPILE_FOR_EMBEDDED)
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index c43f516fc2e..303e7d00221 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -73,6 +73,7 @@
#include <waiting_threads.h>
#include "debug_sync.h"
#include "sql_callback.h"
+#include "threadpool.h"
#ifdef WITH_PERFSCHEMA_STORAGE_ENGINE
#include "../storage/perfschema/pfs_server.h"
@@ -5236,6 +5237,8 @@ default_service_handling(char **argv,
int mysqld_main(int argc, char **argv)
{
+ my_progname= argv[0];
+
/*
When several instances are running on the same machine, we
need to have an unique named hEventShudown through the
@@ -7133,6 +7136,10 @@ SHOW_VAR status_vars[]= {
{"Tc_log_page_size", (char*) &tc_log_page_size, SHOW_LONG},
{"Tc_log_page_waits", (char*) &tc_log_page_waits, SHOW_LONG},
#endif
+#ifndef EMBEDDED_LIBRARY
+ {"Threadpool_idle_threads", (char *) &tp_stats.num_waiting_threads, SHOW_INT},
+ {"Threadpool_threads", (char *) &tp_stats.num_worker_threads, SHOW_INT},
+#endif
{"Threads_cached", (char*) &cached_thread_count, SHOW_LONG_NOFLUSH},
{"Threads_connected", (char*) &connection_count, SHOW_INT},
{"Threads_created", (char*) &thread_created, SHOW_LONG_NOFLUSH},
@@ -8018,7 +8025,9 @@ static int get_options(int *argc_ptr, char ***argv_ptr)
else if (thread_handling == SCHEDULER_NO_THREADS)
one_thread_scheduler(thread_scheduler);
else
- pool_of_threads_scheduler(thread_scheduler); /* purecov: tested */
+ pool_of_threads_scheduler(thread_scheduler, &max_connections,
+ &connection_count);
+
one_thread_per_connection_scheduler(extra_thread_scheduler,
&extra_max_connections,
&extra_connection_count);
diff --git a/sql/net_serv.cc b/sql/net_serv.cc
index ccc67e64ea4..9011d570497 100644
--- a/sql/net_serv.cc
+++ b/sql/net_serv.cc
@@ -842,7 +842,7 @@ my_real_read(NET *net, size_t *complen)
DBUG_PRINT("info",("vio_read returned %ld errno: %d",
(long) length, vio_errno(net->vio)));
-#if !defined(__WIN__) || defined(MYSQL_SERVER)
+#if !defined(__WIN__) && defined(MYSQL_SERVER)
/*
We got an error that there was no data on the socket. We now set up
an alarm to not 'read forever', change the socket to the blocking
@@ -874,7 +874,7 @@ my_real_read(NET *net, size_t *complen)
continue;
}
}
-#endif /* (!defined(__WIN__) || defined(MYSQL_SERVER) */
+#endif /* (!defined(__WIN__) && defined(MYSQL_SERVER) */
if (thr_alarm_in_use(&alarmed) && !thr_got_alarm(&alarmed) &&
interrupted)
{ /* Probably in MIT threads */
diff --git a/sql/scheduler.cc b/sql/scheduler.cc
index f04fdef39f9..c174d300d2e 100644
--- a/sql/scheduler.cc
+++ b/sql/scheduler.cc
@@ -79,7 +79,7 @@ static void scheduler_wait_sync_end(void) {
one_thread_scheduler() or one_thread_per_connection_scheduler() in
mysqld.cc, so this init function will always be called.
*/
-static void scheduler_init() {
+void scheduler_init() {
thr_set_lock_wait_callback(scheduler_wait_lock_begin,
scheduler_wait_lock_end);
thr_set_sync_wait_callback(scheduler_wait_sync_begin,
@@ -124,25 +124,6 @@ void one_thread_scheduler(scheduler_functions *func)
}
-#ifdef HAVE_POOL_OF_THREADS
-
-/*
- thd_scheduler keeps the link between THD and events.
- It's embedded in the THD class.
-*/
-
-thd_scheduler::thd_scheduler()
- : m_psi(NULL), logged_in(FALSE), io_event(NULL), thread_attached(FALSE)
-{
-}
-
-
-thd_scheduler::~thd_scheduler()
-{
- my_free(io_event);
-}
-
-#endif
/*
no pluggable schedulers in mariadb.
diff --git a/sql/scheduler.h b/sql/scheduler.h
index 03e1ad385c1..c91df8512ab 100644
--- a/sql/scheduler.h
+++ b/sql/scheduler.h
@@ -76,13 +76,11 @@ void one_thread_per_connection_scheduler(scheduler_functions *func,
ulong *arg_max_connections, uint *arg_connection_count);
void one_thread_scheduler(scheduler_functions *func);
-#if defined(HAVE_LIBEVENT) && !defined(EMBEDDED_LIBRARY)
-#define HAVE_POOL_OF_THREADS 1
-
-struct event;
-
-class thd_scheduler
+/*
+ To be used for pool-of-threads (implemented differently on various OSs)
+*/
+struct thd_scheduler
{
public:
/*
@@ -96,29 +94,33 @@ public:
differently.
*/
PSI_thread *m_psi;
-
- bool logged_in;
- struct event* io_event;
- LIST list;
- bool thread_attached; /* Indicates if THD is attached to the OS thread */
-
- thd_scheduler();
- ~thd_scheduler();
- bool init(THD* parent_thd);
- bool thread_attach();
- void thread_detach();
+ void *data; /* scheduler-specific data structure */
+#ifndef DBUG_OFF
+ bool set_explain;
+ char dbug_explain[512];
+#endif
};
-void pool_of_threads_scheduler(scheduler_functions* func);
-#else
+void *thd_get_scheduler_data(THD *thd);
+void thd_set_scheduler_data(THD *thd, void *data);
+PSI_thread* thd_get_psi(THD *thd);
+void thd_set_psi(THD *thd, PSI_thread *psi);
-#define pool_of_threads_scheduler(A) \
- one_thread_per_connection_scheduler(A, &max_connections, \
- &connection_count)
+/* Common thread pool routines, suitable for different implementations */
+extern void threadpool_remove_connection(THD *thd);
+extern int threadpool_process_request(THD *thd);
+extern int threadpool_add_connection(THD *thd);
-class thd_scheduler
-{};
-#endif
+extern scheduler_functions *thread_scheduler;
+#endif /* SCHEDULER_INCLUDED */
+#if !defined(EMBEDDED_LIBRARY)
+#define HAVE_POOL_OF_THREADS 1
+void pool_of_threads_scheduler(scheduler_functions* func,
+ ulong *arg_max_connections,
+ uint *arg_connection_count);
+#else
+#define pool_of_threads_scheduler(A,B,C) \
+ one_thread_per_connection_scheduler(A, B, C)
#endif
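Instead of the old libevent-specific fields, thd_scheduler now carries an opaque `void *data` slot that each implementation fills with its own per-connection state; the new accessors keep THD internals out of the pool code. A minimal usage sketch (connection_state_t is a hypothetical structure, not from the patch):

    struct connection_state_t { int pending_events; };  /* hypothetical */

    void scheduler_attach(THD *thd)
    {
      connection_state_t *state= new connection_state_t();
      state->pending_events= 0;
      thd_set_scheduler_data(thd, state);    /* stash per-connection state */
    }

    void scheduler_detach(THD *thd)
    {
      delete static_cast<connection_state_t*>(thd_get_scheduler_data(thd));
      thd_set_scheduler_data(thd, NULL);
    }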
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index cc58a131f00..63e1b36073f 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -1536,35 +1536,10 @@ void THD::awake(killed_state state_to_set)
if (state_to_set >= KILL_CONNECTION || state_to_set == NOT_KILLED)
{
#ifdef SIGNAL_WITH_VIO_CLOSE
- if (this != current_thd)
+ if (this != current_thd)
{
- /*
- Before sending a signal, let's close the socket of the thread
- that is being killed ("this", which is not the current thread).
- This is to make sure it does not block if the signal is lost.
- This needs to be done only on platforms where signals are not
- a reliable interruption mechanism.
-
- Note that the downside of this mechanism is that we could close
- the connection while "this" target thread is in the middle of
- sending a result to the application, thus violating the client-
- server protocol.
-
- On the other hand, without closing the socket we have a race
- condition. If "this" target thread passes the check of
- thd->killed, and then the current thread runs through
- THD::awake(), sets the 'killed' flag and completes the
- signaling, and then the target thread runs into read(), it will
- block on the socket. As a result of the discussions around
- Bug#37780, it has been decided that we accept the race
- condition. A second KILL awakes the target from read().
-
- If we are killing ourselves, we know that we are not blocked.
- We also know that we will check thd->killed before we go for
- reading the next statement.
- */
-
- close_active_vio();
+ if(active_vio)
+ vio_shutdown(active_vio, SHUT_RDWR);
}
#endif
@@ -1668,7 +1643,7 @@ void THD::disconnect()
/* Disconnect even if a active vio is not associated. */
if (net.vio != vio)
- vio_close(net.vio);
+ vio_close(net.vio);
mysql_mutex_unlock(&LOCK_thd_data);
}
@@ -1740,6 +1715,10 @@ bool THD::store_globals()
mysys_var->stack_ends_here= thread_stack + // for consistency, see libevent_thread_proc
STACK_DIRECTION * (long)my_thread_stack_size;
+#ifdef _WIN32
+ if (net.vio)
+ net.vio->thread_id= real_id; /* Required to support IO cancelation on XP */
+#endif
/*
We have to call thr_lock_info_init() again here as THD may have been
created in another thread
diff --git a/sql/sql_class.h b/sql/sql_class.h
index f271a6d5cd9..f87af8842d8 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -2339,6 +2339,10 @@ public:
{
mysql_mutex_lock(&LOCK_thd_data);
active_vio = vio;
+#ifdef _WIN32
+ /* Required to support cancelation on XP */
+ active_vio->thread_id = pthread_self();
+#endif
mysql_mutex_unlock(&LOCK_thd_data);
}
inline void clear_active_vio()
diff --git a/sql/sql_connect.cc b/sql/sql_connect.cc
index 3c88b7a054d..570f869c2a6 100644
--- a/sql/sql_connect.cc
+++ b/sql/sql_connect.cc
@@ -890,6 +890,7 @@ static int check_connection(THD *thd)
DBUG_PRINT("info",
("New connection received on %s", vio_description(net->vio)));
+
#ifdef SIGNAL_WITH_VIO_CLOSE
thd->set_active_vio(net->vio);
#endif
@@ -1175,7 +1176,7 @@ void do_handle_one_connection(THD *thd_arg)
/* We need to set this because of time_out_user_resource_limits */
thd->start_utime= thd->thr_create_utime;
- if (MYSQL_CALLBACK_ELSE(thread_scheduler, init_new_connection_thread, (), 0))
+ if (MYSQL_CALLBACK_ELSE(thd->scheduler, init_new_connection_thread, (), 0))
{
close_connection(thd, ER_OUT_OF_RESOURCES);
statistic_increment(aborted_connects,&LOCK_status);
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index 719fcad5883..7ee6c9fb74f 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -678,6 +678,7 @@ void cleanup_items(Item *item)
@retval
1 request of thread shutdown (see dispatch_command() description)
*/
+int skip_net_wait_timeout = 0;
bool do_command(THD *thd)
{
@@ -700,7 +701,9 @@ bool do_command(THD *thd)
the client, the connection is closed or "net_wait_timeout"
number of seconds has passed.
*/
- my_net_set_read_timeout(net, thd->variables.net_wait_timeout);
+ if(!skip_net_wait_timeout)
+ my_net_set_read_timeout(net, thd->variables.net_wait_timeout);
+
/*
XXX: this code is here only to clear possible errors of init_connect.
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index d01f1892d58..f547292e239 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -50,6 +50,7 @@
#ifdef WITH_PERFSCHEMA_STORAGE_ENGINE
#include "../storage/perfschema/pfs_server.h"
#endif /* WITH_PERFSCHEMA_STORAGE_ENGINE */
+#include "threadpool.h"
/*
The rule for this file: everything should be 'static'. When a sys_var
@@ -1804,7 +1805,13 @@ static Sys_var_enum Sys_thread_handling(
", pool-of-threads"
#endif
, READ_ONLY GLOBAL_VAR(thread_handling), CMD_LINE(REQUIRED_ARG),
- thread_handling_names, DEFAULT(0));
+ thread_handling_names,
+#ifdef HAVE_POOL_OF_THREADS
+ DEFAULT(2)
+#else
+ DEFAULT(0)
+#endif
+ );
#ifdef HAVE_QUERY_CACHE
static bool fix_query_cache_size(sys_var *self, THD *thd, enum_var_type type)
@@ -2173,15 +2180,68 @@ static Sys_var_ulong Sys_thread_cache_size(
GLOBAL_VAR(thread_cache_size), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(0, 16384), DEFAULT(0), BLOCK_SIZE(1));
-#ifdef HAVE_POOL_OF_THREADS
-static Sys_var_ulong Sys_thread_pool_size(
- "thread_pool_size",
- "How many threads we should create to handle query requests in "
- "case of 'thread_handling=pool-of-threads'",
- GLOBAL_VAR(thread_pool_size), CMD_LINE(REQUIRED_ARG),
- VALID_RANGE(1, 16384), DEFAULT(20), BLOCK_SIZE(0));
+#ifndef HAVE_POOL_OF_THREADS
+static bool fix_tp_max_threads(sys_var *, THD *, enum_var_type)
+{
+#ifdef _WIN32
+ tp_set_max_threads(threadpool_max_threads);
+#endif
+ return false;
+}
+
+#ifdef _WIN32
+static bool fix_tp_min_threads(sys_var *, THD *, enum_var_type)
+{
+ tp_set_min_threads(threadpool_min_threads);
+ return false;
+}
#endif
+#ifdef _WIN32
+static Sys_var_uint Sys_threadpool_min_threads(
+ "thread_pool_min_threads",
+ "Minimuim number of threads in the thread pool.",
+ GLOBAL_VAR(threadpool_min_threads), CMD_LINE(REQUIRED_ARG),
+ VALID_RANGE(1, 256), DEFAULT(1), BLOCK_SIZE(1),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
+ ON_UPDATE(fix_tp_min_threads)
+ );
+#else
+static Sys_var_uint Sys_threadpool_idle_thread_timeout(
+ "thread_pool_idle_timeout",
+ "Timeout in seconds for an idle thread in the thread pool."
+ "Worker thread will be shut down after timeout",
+ GLOBAL_VAR(threadpool_idle_timeout), CMD_LINE(REQUIRED_ARG),
+ VALID_RANGE(1, UINT_MAX/100), DEFAULT(60000), BLOCK_SIZE(1)
+);
+static Sys_var_uint Sys_threadpool_size(
+ "thread_pool_size",
+ "Number of concurrently executing threads in the pool. "
+ "Leaving value default (0) sets it to the number of processors.",
+ GLOBAL_VAR(threadpool_size), CMD_LINE(REQUIRED_ARG),
+ VALID_RANGE(0, 128), DEFAULT(0), BLOCK_SIZE(1)
+);
+static Sys_var_uint Sys_threadpool_stall_limit(
+ "thread_pool_stall_limit",
+ "Maximum query execution time before in milliseconds,"
+ "before an executing non-yielding thread is considered stalled."
+ "If a worker thread is stalled, additional worker thread "
+ "may be created to handle remaining clients.",
+ GLOBAL_VAR(threadpool_stall_limit), CMD_LINE(REQUIRED_ARG),
+ VALID_RANGE(60, UINT_MAX), DEFAULT(500), BLOCK_SIZE(1)
+);
+#endif /*! WIN32 */
+static Sys_var_uint Sys_threadpool_max_threads(
+ "thread_pool_max_threads",
+ "Maximum allowed number of worker threads in the thread pool",
+ GLOBAL_VAR(threadpool_max_threads), CMD_LINE(REQUIRED_ARG),
+ VALID_RANGE(1, UINT_MAX), DEFAULT(3000), BLOCK_SIZE(1),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
+ ON_UPDATE(fix_tp_max_threads)
+);
+#endif /* !HAVE_POOL_OF_THREADS */
+
+
/**
Can't change the 'next' tx_isolation if we are already in a
transaction.
diff --git a/sql/threadpool.h b/sql/threadpool.h
new file mode 100644
index 00000000000..0694981d76f
--- /dev/null
+++ b/sql/threadpool.h
@@ -0,0 +1,47 @@
+
+/* Threadpool parameters */
+#ifdef _WIN32
+extern uint threadpool_min_threads; /* Minimum threads in pool */
+#else
+extern uint threadpool_idle_timeout; /* Shutdown idle worker threads after this timeout */
+extern uint threadpool_size; /* Number of parallel executing threads */
+extern uint threadpool_stall_limit; /* time interval in milliseconds for stall checks */
+#endif
+extern uint threadpool_max_threads; /* Maximum threads in pool */
+
+/*
+ Threadpool statistics
+*/
+struct TP_STATISTICS
+{
+ /* Current number of worker threads. */
+ volatile int num_worker_threads;
+ /* Current number of idle threads. */
+ volatile int num_waiting_threads;
+ /* Number of login requests that are queued but not yet processed. */
+ volatile int pending_login_requests;
+ /* Number of threads that are starting. */
+ volatile int pending_thread_starts;
+ /* Number of threads that are being shut down */
+ volatile int pending_thread_shutdowns;
+ /* Time (in milliseconds) that the pool has been blocked (num_waiting_threads is 0) */
+ ulonglong pool_block_duration;
+ /* Maximum duration of a pending login, in milliseconds. */
+ ulonglong pending_login_duration;
+ /* Time since last thread was created */
+ ulonglong time_since_last_thread_creation;
+ /* Number of requests processed since pool monitor run last time. */
+ volatile int requests_dequeued;
+ volatile int requests_completed;
+};
+
+extern TP_STATISTICS tp_stats;
+
+
+/* Functions to set threadpool parameters */
+extern void tp_set_min_threads(uint val);
+extern void tp_set_max_threads(uint val);
+
+/* Activate threadpool scheduler */
+extern void tp_scheduler(void);
+
diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc
new file mode 100644
index 00000000000..01546162377
--- /dev/null
+++ b/sql/threadpool_common.cc
@@ -0,0 +1,246 @@
+#include <my_global.h>
+#include <violite.h>
+#include <sql_priv.h>
+#include <sql_class.h>
+#include <my_pthread.h>
+#include <scheduler.h>
+#include <sql_connect.h>
+#include <sql_audit.h>
+#include <debug_sync.h>
+
+
+extern bool login_connection(THD *thd);
+extern bool do_command(THD *thd);
+extern void prepare_new_connection_state(THD* thd);
+extern void end_connection(THD *thd);
+extern void thd_cleanup(THD *thd);
+extern void delete_thd(THD *thd);
+
+/* Threadpool parameters */
+#ifdef _WIN32
+uint threadpool_min_threads;
+#else
+uint threadpool_idle_timeout;
+uint threadpool_size;
+uint threadpool_stall_limit;
+#endif
+uint threadpool_max_threads;
+
+
+/*
+ Attach/associate the connection with the OS thread, for command processing.
+*/
+static inline bool thread_attach(THD* thd, char *stack_start, PSI_thread **save_psi_thread)
+{
+ DBUG_ENTER("thread_attach");
+
+ if (PSI_server)
+ {
+ *save_psi_thread= PSI_server->get_thread();
+ PSI_server->set_thread(thd->event_scheduler.m_psi);
+ }
+ else
+ *save_psi_thread= NULL;
+
+ /*
+ We need to know the start of the stack so that we could check for
+ stack overruns.
+ */
+ thd->thread_stack= stack_start;
+
+
+ /* Calls close_connection() on failure */
+ if (setup_connection_thread_globals(thd))
+ {
+ DBUG_RETURN(TRUE);
+ }
+
+ /* clear errors from processing the previous THD */
+ my_errno= 0;
+ thd->mysys_var->abort= 0;
+
+#ifndef DBUG_OFF
+ if (thd->event_scheduler.set_explain)
+ DBUG_SET(thd->event_scheduler.dbug_explain);
+#endif
+
+ DBUG_RETURN(FALSE);
+}
+
+/*
+ Detach/disassociate the connection with the OS thread.
+*/
+static inline void thread_detach(THD* thd, PSI_thread *restore_psi_thread)
+{
+ DBUG_ENTER("thread_detach");
+ thd->mysys_var = NULL;
+#ifndef DBUG_OFF
+ /*
+ If during the session @@session.dbug was assigned, the
+ dbug options/state has been pushed. Check if this is the
+ case, to be able to restore the state when we attach this
+ logical connection to a physical thread.
+ */
+ if (_db_is_pushed_())
+ {
+ thd->event_scheduler.set_explain= TRUE;
+ if (DBUG_EXPLAIN(thd->event_scheduler.dbug_explain, sizeof(thd->event_scheduler.dbug_explain)))
+ sql_print_error("thd_scheduler: DBUG_EXPLAIN buffer is too small");
+ }
+ /* DBUG_POP() is a no-op in case there is no session state */
+ DBUG_POP();
+#endif
+ if (PSI_server)
+ PSI_server->set_thread(restore_psi_thread);
+ pthread_setspecific(THR_THD, NULL);
+ DBUG_VOID_RETURN;
+}
+
+
+
+int threadpool_add_connection(THD *thd)
+{
+ int retval=1;
+ PSI_thread *psi_thread;
+#ifndef DBUG_OFF
+ thd->event_scheduler.set_explain = 0;
+#endif
+ thread_attach(thd, (char *)&thd, &psi_thread);
+ ulonglong now= microsecond_interval_timer();
+ thd->prior_thr_create_utime= now;
+ thd->start_utime= now;
+ thd->thr_create_utime= now;
+
+ if (PSI_server)
+ {
+ thd->event_scheduler.m_psi =
+ PSI_server->new_thread(key_thread_one_connection, thd, thd->thread_id);
+ PSI_server->set_thread(thd->event_scheduler.m_psi);
+ }
+
+ if (setup_connection_thread_globals(thd) == 0)
+ {
+ if (login_connection(thd) == 0)
+ {
+ prepare_new_connection_state(thd);
+ retval = thd_is_connection_alive(thd)?0:-1;
+ thd->net.reading_or_writing= 1;
+ }
+ }
+
+ thread_detach(thd, psi_thread);
+ return retval;
+}
+
+void threadpool_remove_connection(THD *thd)
+{
+ PSI_thread *save_psi_thread;
+
+ thread_attach(thd, (char *)&thd, &save_psi_thread);
+ thd->killed= KILL_CONNECTION;
+
+ thd->net.reading_or_writing= 0;
+
+ end_connection(thd);
+ close_connection(thd, 0);
+
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ thd->event_scheduler.data= NULL;
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+
+ unlink_thd(thd);
+ mysql_mutex_unlock(&LOCK_thread_count);
+ mysql_cond_broadcast(&COND_thread_count);
+ DBUG_POP();
+ if (PSI_server)
+ PSI_server->delete_current_thread();
+ pthread_setspecific(THR_THD, NULL);
+}
+
+int threadpool_process_request(THD *thd)
+{
+ int retval= 0;
+ PSI_thread *psi_thread;
+ thread_attach(thd, (char *)&thd, &psi_thread);
+
+ if (thd->killed == KILL_CONNECTION)
+ {
+ /*
+ kill flag may have been set by the timeout handler
+ or by a KILL command
+ */
+ thread_detach(thd, psi_thread);
+ return 1;
+ }
+
+ for(;;)
+ {
+ Vio *vio;
+ thd->net.reading_or_writing= 0;
+ mysql_audit_release(thd);
+
+ if ((retval= do_command(thd)) != 0)
+ break ;
+
+ if (!thd_is_connection_alive(thd))
+ {
+ retval= 1;
+ break;
+ }
+
+ vio= thd->net.vio;
+ if (!vio->has_data(vio))
+ {
+ /*
+ More info on this debug sync is in sql_parse.cc
+ */
+ DEBUG_SYNC(thd, "before_do_command_net_read");
+ break;
+ }
+ }
+ thread_detach(thd, psi_thread);
+ if (!retval)
+ thd->net.reading_or_writing= 1;
+ return retval;
+}
+
+
+/*
+ Scheduler struct, individual functions are implemented
+ in threadpool_unix.cc or threadpool_win.cc
+*/
+
+extern bool tp_init();
+extern void tp_add_connection(THD*);
+extern void tp_wait_begin(THD *, int);
+extern void tp_wait_end(THD*);
+extern void tp_post_kill_notification(THD *thd);
+extern void tp_end(void);
+
+static scheduler_functions tp_scheduler_functions=
+{
+ 0, // max_threads
+ NULL,
+ NULL,
+ tp_init, // init
+ NULL, // init_new_connection_thread
+ tp_add_connection, // add_connection
+ tp_wait_begin, // thd_wait_begin
+ tp_wait_end, // thd_wait_end
+ tp_post_kill_notification, // post_kill_notification
+ NULL, // end_thread
+ tp_end // end
+};
+
+extern void scheduler_init();
+
+void pool_of_threads_scheduler(struct scheduler_functions *func,
+ ulong *arg_max_connections,
+ uint *arg_connection_count)
+{
+ *func = tp_scheduler_functions;
+ func->max_threads= *arg_max_connections + 1;
+ func->max_connections= arg_max_connections;
+ func->connection_count= arg_connection_count;
+ scheduler_init();
+}
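The attach/detach pair above is the heart of the design: a THD is bound to whichever OS thread happens to service its next event, and unbound again before the worker returns to the pool. Conceptually, one worker iteration looks like the following sketch (a simplified restatement of threadpool_process_request(), error paths omitted; not verbatim code):

    void service_one_event(THD *thd)
    {
      PSI_thread *saved_psi;
      char stack_anchor;
      thread_attach(thd, &stack_anchor, &saved_psi); /* bind THD to this worker */
      while (do_command(thd) == 0 &&
             thd->net.vio->has_data(thd->net.vio))   /* drain buffered requests */
        ;
      thread_detach(thd, saved_psi);                 /* unbind; the caller re-arms
                                                        the socket in the poll set */
    }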
diff --git a/sql/threadpool_unix.cc b/sql/threadpool_unix.cc
new file mode 100644
index 00000000000..9fa95f151a5
--- /dev/null
+++ b/sql/threadpool_unix.cc
@@ -0,0 +1,1238 @@
+#include <my_global.h>
+#include <violite.h>
+#include <sql_priv.h>
+#include <sql_class.h>
+#include <my_pthread.h>
+#include <scheduler.h>
+#include <sql_connect.h>
+#include <mysqld.h>
+#include <debug_sync.h>
+#include <sys/queue.h>
+#include <time.h>
+
+#include <threadpool.h>
+#ifdef __linux__
+#include <sys/epoll.h>
+typedef struct epoll_event native_event;
+#endif
+#if defined (__FreeBSD__) || defined (__APPLE__)
+#include <sys/event.h>
+typedef struct kevent native_event;
+#endif
+#if defined (__sun)
+#include <port.h>
+typedef port_event_t native_event;
+#endif
+
+
+static PSI_mutex_key key_group_mutex;
+static PSI_mutex_key key_timer_mutex;
+static PSI_mutex_info mutex_list[]=
+{
+ { &key_group_mutex, "group_mutex", 0},
+ { &key_timer_mutex, "timer_mutex", PSI_FLAG_GLOBAL}
+};
+
+static PSI_cond_key key_worker_cond;
+static PSI_cond_key key_timer_cond;
+static PSI_cond_info cond_list[]=
+{
+ { &key_worker_cond, "worker_cond", 0},
+ { &key_timer_cond, "timer_cond", PSI_FLAG_GLOBAL}
+};
+
+static PSI_thread_key key_worker_thread;
+static PSI_thread_key key_timer_thread;
+static PSI_thread_info thread_list[] =
+{
+ {&key_worker_thread, "worker_thread", 0},
+ {&key_timer_thread, "timer_thread", PSI_FLAG_GLOBAL}
+};
+
+
+TP_STATISTICS tp_stats;
+
+
+struct thread_group_t;
+
+/* Per-thread structure for workers */
+struct worker_thread_t
+{
+ mysql_cond_t cond;
+ bool woken;
+ thread_group_t* thread_group;
+ ulonglong event_count; /* Stats: number of executed requests */
+ SLIST_ENTRY(worker_thread_t) ptr;
+};
+
+/*
+ Data associated with an io event (can also be sent with an explicit
+ post_event())
+*/
+struct pool_event_t
+{
+ STAILQ_ENTRY (pool_event_t) next;
+ void *data;
+};
+static pool_event_t POOL_SHUTDOWN_EVENT;
+
+struct thread_group_t
+{
+
+ mysql_mutex_t mutex;
+ STAILQ_HEAD(queue_listhead, pool_event_t) queue;
+ SLIST_HEAD(wait_listhead, worker_thread_t) waiting_threads;
+ int pollfd;
+ int thread_count;
+ int active_thread_count;
+ int pending_thread_start_count;
+ int connection_count;
+ bool shutdown;
+ bool stalled;
+ int shutdown_pipe[2];
+ worker_thread_t *listener;
+ pthread_attr_t *pthread_attr;
+ ulonglong last_thread_creation_time;
+ /* Stats for the deadlock detection timer routine.*/
+ ulonglong io_event_count;
+ ulonglong queue_event_count;
+} MY_ALIGNED(512);
+
+static thread_group_t all_groups[128];
+
+/* Global timer for all groups */
+struct pool_timer_t
+{
+ mysql_mutex_t mutex;
+ mysql_cond_t cond;
+ int tick_interval;
+ volatile ulonglong current_microtime;
+ volatile ulonglong next_timeout_check;
+ bool shutdown;
+};
+
+static pool_timer_t pool_timer;
+
+struct connection_t
+{
+ pool_event_t event;
+ THD *thd;
+ thread_group_t *thread_group;
+ ulonglong abs_wait_timeout;
+ bool logged_in;
+ bool waiting;
+};
+
+/* Externals functions and variables we use */
+extern uint thread_created;
+extern void scheduler_init();
+extern pthread_attr_t *get_connection_attrib(void);
+extern int skip_net_wait_timeout;
+
+
+static void post_event(thread_group_t *thread_group, pool_event_t* ev);
+static int wake_thread(thread_group_t *thread_group);
+static void handle_event(pool_event_t *ev);
+static int wake_or_create_thread(thread_group_t *thread_group);
+static int create_worker(thread_group_t *thread_group);
+static void *worker_main(void *param);
+static void check_stall(thread_group_t *thread_group);
+static void connection_abort(connection_t *connection);
+void tp_post_kill_notification(THD *thd);
+static void set_wait_timeout(connection_t *connection);
+static void set_next_timeout_check(ulonglong abstime);
+
+
+/**
+ Asynchronous network IO.
+
+ We use native edge-triggered network IO multiplexing facility.
+ This maps to different APIs on different Unixes.
+
+ Supported are currently Linux with epoll, Solaris with event ports,
+ OSX and BSD with kevent. All those APIs are used with one-shot flags
+ (the event is signalled once the client has written something into the socket;
+ then the socket is removed from the "poll-set" until the command is finished,
+ and we need to re-arm/re-register the socket).
+
+ No implementation for poll/select/AIO is currently provided.
+
+ The API closely resembles all of the above-mentioned platform APIs
+ and consists of the following functions.
+
+ - io_poll_create()
+ Creates an io_poll descriptor
+ On Linux: epoll_create()
+
+ - io_poll_associate_fd(int poll_fd, int fd, void *data)
+ Associate file descriptor with io poll descriptor
+ On Linux : epoll_ctl(..EPOLL_CTL_ADD))
+
+ - io_poll_disassociate_fd(int pollfd, int fd)
+ Disassociate file descriptor from io poll descriptor
+ On Linux: epoll_ctl(..EPOLL_CTL_DEL)
+
+
+ - io_poll_start_read(int poll_fd,int fd, void *data)
+ The same as io_poll_associate_fd(), but cannot be used before
+ io_poll_associate_fd() was called.
+ On Linux : epoll_ctl(..EPOLL_CTL_MOD)
+
+ - io_poll_wait (int pollfd, native_event *native_events, int maxevents,
+ int timeout_ms)
+
+ wait until one or more descriptors added with io_poll_associate_fd()
+ or io_poll_start_read() becomes readable. Data associated with
+ descriptors can be retrieved from native_events array, using
+ native_event_get_userdata() function.
+
+ On Linux: epoll_wait()
+*/
+
+#if defined (__linux__)
+static int io_poll_create()
+{
+ return epoll_create(1);
+}
+
+
+int io_poll_associate_fd(int pollfd, int fd, void *data)
+{
+ struct epoll_event ev;
+ ev.data.ptr= data;
+ ev.events= EPOLLIN|EPOLLET|EPOLLERR|EPOLLRDHUP|EPOLLONESHOT;
+ return epoll_ctl(pollfd, EPOLL_CTL_ADD, fd, &ev);
+}
+
+
+
+int io_poll_start_read(int pollfd, int fd, void *data)
+{
+ struct epoll_event ev;
+ ev.data.ptr= data;
+ ev.events= EPOLLIN|EPOLLET|EPOLLERR|EPOLLRDHUP|EPOLLONESHOT;
+ return epoll_ctl(pollfd, EPOLL_CTL_MOD, fd, &ev);
+}
+
+void io_poll_disassociate_fd(int pollfd, int fd)
+{
+ struct epoll_event ev;
+ epoll_ctl(pollfd, EPOLL_CTL_DEL, fd, &ev);
+}
+
+
+int io_poll_wait(int pollfd, native_event *native_events, int maxevents,
+ int timeout_ms)
+{
+ int ret;
+ do
+ {
+ ret = epoll_wait(pollfd, native_events, maxevents, timeout_ms);
+ }
+ while(ret == -1 && errno == EINTR);
+ return ret;
+}
+
+static void *native_event_get_userdata(native_event *event)
+{
+ return event->data.ptr;
+}
+
+#elif defined (__FreeBSD__) || defined (__APPLE__)
+int io_poll_create()
+{
+ return kqueue();
+}
+
+int io_poll_start_read(int pollfd, int fd, void *data)
+{
+ struct kevent ke;
+ EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ENABLE|EV_CLEAR,
+ 0, 0, data);
+ return kevent(pollfd, &ke, 1, 0, 0, 0);
+}
+
+
+int io_poll_associate_fd(int pollfd, int fd, void *data)
+{
+ return io_poll_start_read(pollfd, fd, data);
+}
+
+
+int io_poll_disassociate_fd(thread_group_t *thread_group, int fd)
+{
+ struct kevent ke;
+ EV_SET(&ke,fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+ return kevent(thread_group->pollfd, &ke, 1, 0, 0, 0);
+}
+
+
+int io_poll_wait(int pollfd, struct kevent *events, int maxevents, int timeout_ms)
+{
+ struct timespec ts;
+ int ret;
+ if (timeout_ms >= 0)
+ {
+ ts.tv_sec= timeout_ms/1000;
+ ts.tv_nsec= (timeout_ms%1000)*1000000;
+ }
+ do
+ {
+ ret= kevent(pollfd, 0, 0, events, maxevents,
+ (timeout_ms >= 0)?&ts:NULL);
+ }
+ while (ret == -1 && errno == EINTR);
+ if (ret > 0)
+ {
+ /* Disable monitoring for the events that we dequeued */
+ for (int i=0; i < ret; i++)
+ {
+ struct kevent *ke = &events[i];
+ EV_SET(ke, ke->ident, EVFILT_READ, EV_ADD|EV_DISABLE,
+ 0, 0, ke->udata);
+ }
+ kevent(pollfd, events, ret, 0, 0, 0);
+ }
+ return ret;
+}
+
+static void* native_event_get_userdata(native_event *event)
+{
+ return event->udata;
+}
+#elif defined (__sun)
+static int io_poll_create()
+{
+ return port_create();
+}
+
+int io_poll_start_read(int pollfd, int fd, void *data)
+{
+ return port_associate(pollfd, PORT_SOURCE_FD, fd, POLLIN, data);
+}
+
+static int io_poll_associate_fd(int pollfd, int fd, void *data)
+{
+ return io_poll_start_read(pollfd, fd, data);
+}
+
+int io_poll_wait(int pollfd, native_event *events, int maxevents, int timeout_ms)
+{
+ struct timespec ts;
+ int ret;
+ uint_t nget= 1;
+ if (timeout_ms >= 0)
+ {
+ ts.tv_sec= timeout_ms/1000;
+ ts.tv_nsec= (timeout_ms%1000)*1000000;
+ }
+ do
+ {
+ ret= port_getn(pollfd, events, maxevents, &nget,
+ (timeout_ms >= 0)?&ts:NULL);
+ }
+ while (ret == -1 && errno == EINTR);
+ return nget;
+}
+static void* native_event_get_userdata(native_event *event)
+{
+ return event->portev_user;
+}
+#else
+#error not ported yet to this OS
+#endif
+
+
+
+
+/* Dequeue element from a workqueue */
+static pool_event_t *queue_get(thread_group_t *thread_group)
+{
+ DBUG_ENTER("queue_get");
+ pool_event_t *ev= NULL;
+ thread_group->queue_event_count++;
+ ev= STAILQ_FIRST(&thread_group->queue);
+ if (ev)
+ {
+ STAILQ_REMOVE_HEAD(&thread_group->queue,next);
+ }
+ DBUG_RETURN(ev);
+}
+
+/* Check if workqueue is empty. */
+static bool queue_is_empty(thread_group_t* thread_group)
+{
+ DBUG_ENTER("queue_is_empty");
+ bool empty= (STAILQ_FIRST(&thread_group->queue) == NULL);
+ DBUG_RETURN(empty);
+}
+
+static void queue_put(thread_group_t *thread_group, pool_event_t *event)
+{
+ DBUG_ENTER("queue_put");
+ STAILQ_INSERT_TAIL(&thread_group->queue, event, next);
+ DBUG_VOID_RETURN;
+}
+
+static void increment_active_threads(thread_group_t *thread_group)
+{
+ my_atomic_add32(&tp_stats.num_waiting_threads,-1);
+ thread_group->active_thread_count++;
+}
+
+static void decrement_active_threads(thread_group_t *thread_group)
+{
+ my_atomic_add32(&tp_stats.num_waiting_threads,1);
+ thread_group->active_thread_count--;
+}
+
+
+/*
+ Handle wait timeouts:
+ Find connections that have been idle for too long and kill them.
+ Also, recalculate the time when the next timeout check should run.
+*/
+static void timeout_check(pool_timer_t *timer)
+{
+ DBUG_ENTER("timeout_check");
+
+ mysql_mutex_lock(&LOCK_thread_count);
+ I_List_iterator<THD> it(threads);
+
+ /* Reset next timeout check, it will be recalculated in the loop below */
+ my_atomic_fas64((volatile int64*)&timer->next_timeout_check, ULONGLONG_MAX);
+
+ THD *thd;
+ while ((thd=it++))
+ {
+ if (thd->net.reading_or_writing != 1)
+ continue;
+
+ connection_t *connection= (connection_t *)thd->scheduler.data;
+ if (!connection)
+ continue;
+
+ if(connection->abs_wait_timeout < timer->current_microtime)
+ {
+ /* Wait timeout exceeded, kill connection. */
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ thd->killed = KILL_CONNECTION;
+ tp_post_kill_notification(thd);
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ }
+ else
+ {
+ set_next_timeout_check(connection->abs_wait_timeout);
+ }
+ }
+ mysql_mutex_unlock(&LOCK_thread_count);
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Timer thread.
+
+ Periodically, check if one of the thread groups is stalled. Stalls happen if
+ events are not being dequeued from the queue, or from the network. The primary
+ reason for a stall can be a lengthy-executing non-blocking request. It could
+ also happen that a thread is waiting but wait_begin/wait_end was forgotten by
+ the storage engine. The timer thread will create a new thread in the group in
+ case of a stall.
+
+ Besides checking for stalls, the timer thread is also responsible for terminating
+ clients that have been idle for longer than wait_timeout seconds.
+*/
+static void* timer_thread(void *param)
+{
+ uint i;
+
+ pool_timer_t* timer=(pool_timer_t *)param;
+ timer->next_timeout_check= ULONGLONG_MAX;
+ timer->current_microtime= my_micro_time();
+
+ my_thread_init();
+ DBUG_ENTER("timer_thread");
+
+ for(;;)
+ {
+ struct timespec ts;
+ set_timespec_nsec(ts,timer->tick_interval*1000000);
+ mysql_mutex_lock(&timer->mutex);
+ int err = mysql_cond_timedwait(&timer->cond, &timer->mutex, &ts);
+ if (timer->shutdown)
+ break;
+ if (err == ETIMEDOUT)
+ {
+ timer->current_microtime= my_micro_time();
+
+ /* Check for stalls in thread groups */
+ for(i=0; i< threadpool_size;i++)
+ {
+ if(all_groups[i].connection_count)
+ check_stall(&all_groups[i]);
+ }
+
+ /* Check if any client exceeded wait_timeout */
+ if (timer->next_timeout_check <= timer->current_microtime)
+ timeout_check(timer);
+ }
+ mysql_mutex_unlock(&timer->mutex);
+ }
+ DBUG_POP();
+ my_thread_end();
+ return NULL;
+}
+
+
+
+void check_stall(thread_group_t *thread_group)
+{
+ if (mysql_mutex_trylock(&thread_group->mutex) != 0)
+ {
+ /* Something is happening. Don't disturb. */
+ return;
+ }
+
+ /*
+ Check if a listener is present. If not, check whether any IO
+ events were dequeued since last time. If not, this means the
+ listener is either in a tight loop or thd_wait_begin()
+ was forgotten. Create a new worker (it will make itself the listener).
+ */
+ if (!thread_group->listener && !thread_group->io_event_count)
+ {
+ wake_or_create_thread(thread_group);
+ mysql_mutex_unlock(&thread_group->mutex);
+ return;
+ }
+
+ /* Reset io event count */
+ thread_group->io_event_count= 0;
+
+ /*
+ Check whether requests from the workqueue are being dequeued.
+ */
+ if (!queue_is_empty(thread_group) && !thread_group->queue_event_count)
+ {
+ thread_group->stalled= true;
+ wake_or_create_thread(thread_group);
+ }
+
+ /* Reset queue event count */
+ thread_group->queue_event_count= 0;
+
+ mysql_mutex_unlock(&thread_group->mutex);
+}
+
+
+
+static void start_timer(pool_timer_t* timer)
+{
+ pthread_t thread_id;
+ DBUG_ENTER("start_timer");
+ mysql_mutex_init(key_timer_mutex,&timer->mutex, NULL);
+ mysql_cond_init(key_timer_cond, &timer->cond, NULL);
+ timer->shutdown = false;
+ mysql_thread_create(key_timer_thread,&thread_id, NULL, timer_thread, timer);
+ DBUG_VOID_RETURN;
+}
+
+static void stop_timer(pool_timer_t *timer)
+{
+ DBUG_ENTER("stop_timer");
+ mysql_mutex_lock(&timer->mutex);
+ timer->shutdown = true;
+ mysql_cond_signal(&timer->cond);
+ mysql_mutex_unlock(&timer->mutex);
+ DBUG_VOID_RETURN;
+}
+
+#define MAX_EVENTS 1024
+
+/*
+ Poll for socket events and distribute them to worker threads.
+ In many cases the current thread will handle a single event itself.
+*/
+static pool_event_t * listener(worker_thread_t *current_thread,
+ thread_group_t *thread_group)
+{
+ DBUG_ENTER("listener");
+
+ for(;;)
+ {
+ native_event ev[MAX_EVENTS];
+ int cnt;
+
+ if (thread_group->shutdown)
+ {
+ DBUG_RETURN(&POOL_SHUTDOWN_EVENT);
+ }
+ do
+ {
+ cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, -1);
+ }
+ while(cnt <= 0 && errno == EINTR);
+
+ if (cnt <=0)
+ {
+ DBUG_ASSERT(thread_group->shutdown);
+ DBUG_RETURN(&POOL_SHUTDOWN_EVENT);
+ }
+
+ /*
+ Put events into the queue, and maybe wake up workers.
+ If the queue is currently empty, the listener will return
+ so that the current thread handles the query itself; this avoids
+ wakeups and context switches. But if the queue is not empty,
+ it smells like a flood of queries, and the listener
+ stays.
+ */
+ mysql_mutex_lock(&thread_group->mutex);
+
+ if (thread_group->shutdown)
+ {
+ mysql_mutex_unlock(&thread_group->mutex);
+ DBUG_RETURN(&POOL_SHUTDOWN_EVENT);
+ }
+
+ thread_group->io_event_count += cnt;
+ bool pick_event= queue_is_empty(thread_group);
+
+ for(int i=(pick_event)?1:0; i < cnt ; i++)
+ {
+ pool_event_t *e= (pool_event_t *)native_event_get_userdata(&ev[i]);
+ queue_put(thread_group, e);
+ }
+
+ /* Wake at most one worker thread */
+ if(thread_group->active_thread_count==0 &&
+ /*!queue_is_empty(thread_group)*/ !pick_event)
+ {
+ if(wake_thread(thread_group))
+ {
+ if(thread_group->thread_count == 1)
+ create_worker(thread_group);
+ }
+ }
+ mysql_mutex_unlock(&thread_group->mutex);
+
+ if (pick_event)
+ DBUG_RETURN((pool_event_t *)(native_event_get_userdata(&ev[0])));
+ }
+}
+
+
+
+
+/* Creates a new worker thread. thread_group->mutex must be held when calling this function */
+
+static int create_worker(thread_group_t *thread_group)
+{
+ pthread_t thread_id;
+ int err;
+ DBUG_ENTER("create_worker");
+ if (tp_stats.num_worker_threads >= (int)threadpool_max_threads)
+ {
+ DBUG_PRINT("info",
+ ("Cannot create new thread (maximum allowed threads reached)"));
+ DBUG_RETURN(-1);
+ }
+
+ err= pthread_create(&thread_id, thread_group->pthread_attr, worker_main, thread_group);
+ if (!err)
+ {
+ thread_group->pending_thread_start_count++;
+ thread_group->last_thread_creation_time=my_micro_time();
+ }
+ DBUG_RETURN(err);
+}
+
+
+/*
+ Wakes a worker thread, or creates a new one.
+
+ Worker creation is throttled, so that we avoid creating too many threads
+ in a short period of time.
+*/
+static int wake_or_create_thread(thread_group_t *thread_group)
+{
+ ulonglong now;
+ ulonglong time_since_last_thread_created;
+
+ DBUG_ENTER("wake_or_create_thread");
+
+ if (wake_thread(thread_group) == 0)
+ DBUG_RETURN(0);
+
+ if (thread_group->pending_thread_start_count > 0)
+ DBUG_RETURN(-1);
+
+ if (thread_group->thread_count < 4)
+ {
+ DBUG_RETURN(create_worker(thread_group));
+ }
+
+ now = my_micro_time();
+ time_since_last_thread_created =
+ (now - thread_group->last_thread_creation_time)/1000;
+
+ if (thread_group->active_thread_count == 0)
+ {
+ /*
+ We're better off creating a new thread here with no delay, as
+ other threads (at least 4) are all blocking and there was no sleeping
+ thread to wake up. It smells like a deadlock or very slowly executing
+ requests, e.g. sleeps or user locks.
+ */
+ DBUG_RETURN(create_worker(thread_group));
+ }
+
+ /* Throttle thread creation. */
+ if ((thread_group->thread_count < 8 && time_since_last_thread_created > 50)
+ || (thread_group->thread_count < 16 && time_since_last_thread_created > 100)
+ || (time_since_last_thread_created > 200))
+ {
+ DBUG_RETURN(create_worker(thread_group));
+ }
+
+ DBUG_RETURN(-1);
+}
+
+
+
+/* Initialize thread group */
+int thread_group_init(thread_group_t *thread_group, pthread_attr_t* thread_attr)
+{
+ DBUG_ENTER("thread_group_init");
+
+ memset(thread_group, 0, sizeof(thread_group_t));
+ thread_group->pthread_attr = thread_attr;
+ mysql_mutex_init(key_group_mutex, &thread_group->mutex, NULL);
+ STAILQ_INIT(&thread_group->queue);
+ SLIST_INIT(&thread_group->waiting_threads);
+
+ thread_group->pending_thread_start_count= 0;
+ thread_group->pollfd= io_poll_create();
+ thread_group->stalled= false;
+ if (thread_group->pollfd < 0)
+ {
+ DBUG_RETURN(-1);
+ }
+ if (pipe(thread_group->shutdown_pipe))
+ {
+ DBUG_RETURN(-1);
+ }
+ if (io_poll_associate_fd(thread_group->pollfd,
+ thread_group->shutdown_pipe[0], &POOL_SHUTDOWN_EVENT))
+ {
+ DBUG_RETURN(-1);
+ }
+ DBUG_RETURN(0);
+}
+
+
+/*
+ Wake a single sleeping thread in the pool. Optionally, tell this thread
+ to listen for socket io notifications.
+ */
+static int wake_thread(thread_group_t *thread_group)
+{
+ DBUG_ENTER("wake_thread");
+ worker_thread_t *thread = SLIST_FIRST(&thread_group->waiting_threads);
+ if(thread)
+ {
+ thread->woken= true;
+ SLIST_REMOVE_HEAD(&thread_group->waiting_threads, ptr);
+ if (mysql_cond_signal(&thread->cond))
+ abort();
+ DBUG_RETURN(0);
+ }
+ DBUG_RETURN(-1); /* no thread- missed wakeup*/
+}
+
+
+/*
+ Shutdown thread group.
+*/
+static void thread_group_close(thread_group_t *thread_group)
+{
+ DBUG_ENTER("thread_group_close");
+
+ char c= 0;
+
+ mysql_mutex_lock(&thread_group->mutex);
+ thread_group->shutdown= true;
+ thread_group->listener= NULL;
+
+ /* Wake listener. */
+ if (write(thread_group->shutdown_pipe[1], &c, 1) < 0)
+ DBUG_VOID_RETURN;
+
+ /* Wake all workers. */
+ while(wake_thread(thread_group) == 0) {};
+ mysql_mutex_unlock(&thread_group->mutex);
+
+#if 0
+ /* Wait until workers terminate */
+ while(thread_group->thread_count)
+ usleep(1000);
+#endif
+
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Post a task to the workqueue, maybe wake a worker so
+ it picks the task.
+*/
+static void post_event(thread_group_t *thread_group, pool_event_t* ev)
+{
+ DBUG_ENTER("post_event");
+
+ mysql_mutex_lock(&thread_group->mutex);
+ STAILQ_INSERT_TAIL(&thread_group->queue, ev, next);
+ if (thread_group->active_thread_count == 0)
+ {
+ wake_or_create_thread(thread_group);
+ }
+ mysql_mutex_unlock(&thread_group->mutex);
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Check if the pool is already overcommitted.
+ This is used to prevent too many threads from executing at the same time,
+ if the workload is not CPU bound.
+*/
+static bool too_many_threads(thread_group_t *thread_group)
+{
+ return (thread_group->active_thread_count > 4 && !thread_group->stalled);
+}
+
+
+
+/*
+ Dequeue a work item.
+
+ If one is not immediately available, the thread will sleep until
+ work is available (it can also become the IO listener for a while).
+*/
+int get_event(worker_thread_t *current_thread, thread_group_t *thread_group,
+ pool_event_t **ev, struct timespec *ts)
+{
+ DBUG_ENTER("get_event");
+
+ pool_event_t *first_event = NULL;
+ int err=0;
+
+ mysql_mutex_lock(&thread_group->mutex);
+
+ decrement_active_threads(thread_group);
+ DBUG_ASSERT(thread_group->active_thread_count >= 0);
+
+ do
+ {
+ if (thread_group->shutdown)
+ break;
+
+ /* Check if queue is not empty */
+ if (!too_many_threads(thread_group))
+ {
+ first_event= queue_get(thread_group);
+ if(first_event)
+ break;
+ }
+
+ /* If there is currently no listener in the group, become one. */
+ if(!thread_group->listener)
+ {
+ thread_group->listener= current_thread;
+ mysql_mutex_unlock(&thread_group->mutex);
+
+ first_event= listener(current_thread, thread_group);
+
+ mysql_mutex_lock(&thread_group->mutex);
+ /* There is no listener anymore, it just returned. */
+ thread_group->listener= NULL;
+ break;
+ }
+
+ /*
+ Last thing we try before going to sleep is to
+ pick a single event via epoll, without waiting (timeout 0)
+ */
+ if (!too_many_threads(thread_group))
+ {
+ native_event nev;
+ if (io_poll_wait(thread_group->pollfd,&nev,1, 0) == 1)
+ {
+ thread_group->io_event_count++;
+ first_event = (pool_event_t *)native_event_get_userdata(&nev);
+ break;
+ }
+ }
+
+
+ /* And now, finally sleep */
+ current_thread->woken = false; /* wake() sets this to true */
+
+ /*
+ Add current thread to the head of the waiting list and wait.
+ It is important to add thread to the head rather than tail
+ as it ensures LIFO wakeup order (hot caches, working inactivity timeout)
+ */
+ SLIST_INSERT_HEAD(&thread_group->waiting_threads, current_thread, ptr);
+
+ if(ts)
+ err = mysql_cond_timedwait(&current_thread->cond, &thread_group->mutex, ts);
+ else
+ err = mysql_cond_wait(&current_thread->cond, &thread_group->mutex);
+
+ if (!current_thread->woken)
+ {
+ /*
+ Thread was not signalled by wake(), so it might be a spurious wakeup or
+ a timeout. Anyhow, we need to remove ourselves from the list now.
+ If the thread was explicitly woken, then the caller removed us from the list.
+ */
+ SLIST_REMOVE(&thread_group->waiting_threads, current_thread, worker_thread_t, ptr);
+ }
+
+ if(err)
+ break;
+
+ }
+ while(true);
+
+ thread_group->stalled= false;
+ increment_active_threads(thread_group);
+ mysql_mutex_unlock(&thread_group->mutex);
+
+
+ if (first_event)
+ *ev = first_event;
+ else
+ *ev = &POOL_SHUTDOWN_EVENT;
+
+ DBUG_RETURN(err);
+}
+
+
+
+/*
+ Tells the pool that a thread starts waiting on IO, a lock, a condition,
+ sleep() or similar.
+
+ Will wake another worker, and if there is no listener, will
+ promote a new listener.
+*/
+void wait_begin(thread_group_t *thread_group)
+{
+ DBUG_ENTER("wait_begin");
+ mysql_mutex_lock(&thread_group->mutex);
+ decrement_active_threads(thread_group);
+ DBUG_ASSERT(thread_group->active_thread_count >=0);
+ DBUG_ASSERT(thread_group->connection_count > 0);
+
+ if((thread_group->active_thread_count == 0) &&
+ (!queue_is_empty(thread_group) || !thread_group->listener))
+ {
+ wake_or_create_thread(thread_group);
+ }
+
+ mysql_mutex_unlock(&thread_group->mutex);
+ DBUG_VOID_RETURN;
+}
+
+/*
+ Tells the pool that the current thread has finished waiting.
+*/
+void wait_end(thread_group_t *thread_group)
+{
+ DBUG_ENTER("wait_end");
+ mysql_mutex_lock(&thread_group->mutex);
+ increment_active_threads(thread_group);
+ mysql_mutex_unlock(&thread_group->mutex);
+ DBUG_VOID_RETURN;
+}
+
+
+/* Scheduler */
+connection_t *alloc_connection(THD *thd)
+{
+ DBUG_ENTER("alloc_connection");
+
+ connection_t* connection = (connection_t *)my_malloc(sizeof(connection_t),0);
+ if (connection)
+ {
+ connection->thd = thd;
+ connection->waiting= false;
+ connection->logged_in= false;
+ connection->abs_wait_timeout= ULONGLONG_MAX;
+ }
+ DBUG_RETURN(connection);
+}
+
+
+
+/*
+ Add a new connection to the thread pool.
+*/
+void tp_add_connection(THD *thd)
+{
+ DBUG_ENTER("tp_add_connection");
+
+ threads.append(thd);
+ mysql_mutex_unlock(&LOCK_thread_count);
+ connection_t *c= alloc_connection(thd);
+ if(c)
+ {
+ c->thread_group= &all_groups[c->thd->thread_id%threadpool_size];
+ mysql_mutex_lock(&c->thread_group->mutex);
+ c->thread_group->connection_count++;
+ mysql_mutex_unlock(&c->thread_group->mutex);
+ c->thd->scheduler.data = c;
+ post_event(c->thread_group,&c->event);
+ }
+
+ DBUG_VOID_RETURN;
+}
+
+
+static void connection_abort(connection_t *c)
+{
+ DBUG_ENTER("connection_abort");
+ mysql_mutex_lock(&c->thread_group->mutex);
+ c->thread_group->connection_count--;
+ mysql_mutex_unlock(&c->thread_group->mutex);
+
+ threadpool_remove_connection(c->thd);
+ my_free(c);
+ DBUG_VOID_RETURN;
+}
+
+void tp_post_kill_notification(THD *thd)
+{
+ DBUG_ENTER("tp_post_kill_notification");
+ if (current_thd == thd || thd->system_thread)
+ DBUG_VOID_RETURN;
+
+ if (thd->net.vio)
+ vio_shutdown(thd->net.vio, SHUT_RD);
+ DBUG_VOID_RETURN;
+}
+
+void tp_wait_begin(THD *thd, int type)
+{
+ DBUG_ENTER("tp_wait_begin");
+
+ if (!thd)
+ DBUG_VOID_RETURN;
+
+ connection_t *connection = (connection_t *)thd->scheduler.data;
+ if(connection)
+ {
+ DBUG_ASSERT(!connection->waiting);
+ connection->waiting= true;
+ wait_begin(connection->thread_group);
+ }
+ DBUG_VOID_RETURN;
+}
+
+
+void tp_wait_end(THD *thd)
+{
+ DBUG_ENTER("tp_wait_end");
+ if (!thd)
+ DBUG_VOID_RETURN;
+
+ connection_t *connection = (connection_t *)thd->scheduler.data;
+ if(connection)
+ {
+ DBUG_ASSERT(connection->waiting);
+ connection->waiting = false;
+ wait_end(connection->thread_group);
+ }
+ DBUG_VOID_RETURN;
+}
+
+
+static void set_next_timeout_check(ulonglong abstime)
+{
+ DBUG_ENTER("set_next_timeout_check");
+ while(abstime < pool_timer.next_timeout_check)
+ {
+ longlong old= (longlong)pool_timer.next_timeout_check;
+ my_atomic_cas64((volatile int64*)&pool_timer.next_timeout_check,
+ &old, abstime);
+ }
+ DBUG_VOID_RETURN;
+}
+
+static void set_wait_timeout(connection_t *c)
+{
+ DBUG_ENTER("set_wait_timeout");
+ /*
+ Calculate wait deadline for this connection.
+ Instead of using my_micro_time() which has a syscall
+ overhead, use pool_timer.current_microtime and take
+ into account that its value could be off by at most
+ one tick interval.
+ */
+
+ c->abs_wait_timeout= pool_timer.current_microtime +
+ 1000LL*pool_timer.tick_interval +
+ 1000000LL*c->thd->variables.net_wait_timeout;
+
+ set_next_timeout_check(c->abs_wait_timeout);
+ DBUG_VOID_RETURN;
+}
+
+static void handle_event(pool_event_t *ev)
+{
+
+ DBUG_ENTER("handle_event");
+
+ /* Normal case, handle query on connection */
+ connection_t *c = (connection_t*)(void *)ev;
+ bool do_login = (!c->logged_in);
+ int ret;
+
+ if (do_login)
+ {
+ ret= threadpool_add_connection(c->thd);
+ c->logged_in= true;
+ }
+ else
+ {
+ ret= threadpool_process_request(c->thd);
+ }
+
+ if(!ret)
+ {
+ set_wait_timeout(c);
+ int fd = c->thd->net.vio->sd;
+ if (do_login)
+ {
+ ret= io_poll_associate_fd(c->thread_group->pollfd, fd, c);
+ }
+ else
+ ret= io_poll_start_read(c->thread_group->pollfd, fd, c);
+ }
+
+ if (ret)
+ {
+ connection_abort(c);
+ }
+
+ DBUG_VOID_RETURN;
+}
+
+
+static void *worker_main(void *param)
+{
+
+ worker_thread_t this_thread;
+
+ thread_created++;
+ pthread_detach_this_thread();
+ my_thread_init();
+ DBUG_ENTER("worker_main");
+
+ thread_group_t *thread_group = (thread_group_t *)param;
+
+ /* Init per-thread structure */
+ mysql_cond_init(key_worker_cond, &this_thread.cond, NULL);
+ this_thread.thread_group= thread_group;
+ this_thread.event_count=0;
+
+ mysql_mutex_lock(&thread_group->mutex);
+ tp_stats.num_worker_threads++;
+ thread_group->thread_count++;
+ thread_group->active_thread_count++;
+ thread_group->pending_thread_start_count--;
+ mysql_mutex_unlock(&thread_group->mutex);
+
+ /* Run event loop */
+ for(;;)
+ {
+ struct pool_event_t *ev;
+ struct timespec ts;
+ set_timespec(ts,threadpool_idle_timeout);
+ if (get_event(&this_thread, thread_group, &ev, &ts)
+ || ev == &POOL_SHUTDOWN_EVENT)
+ {
+ break;
+ }
+ this_thread.event_count++;
+ handle_event(ev);
+ }
+
+ /* Thread shutdown: cleanup per-worker-thread structure. */
+ mysql_cond_destroy(&this_thread.cond);
+
+ mysql_mutex_lock(&thread_group->mutex);
+ thread_group->active_thread_count--;
+ thread_group->thread_count--;
+ tp_stats.num_worker_threads--;
+ mysql_mutex_unlock(&thread_group->mutex);
+
+ /* If it is the last thread in pool and pool is terminating, destroy pool.*/
+ if (thread_group->shutdown && (thread_group->thread_count == 0))
+ {
+    /* last thread exiting, cleanup the pool structure */
+ mysql_mutex_destroy(&thread_group->mutex);
+ }
+ DBUG_POP();
+ my_thread_end();
+ return NULL;
+}
+
+
+static bool started=false;
+
+bool tp_init()
+{
+ DBUG_ENTER("tp_init");
+ started = true;
+ scheduler_init();
+ skip_net_wait_timeout= 1;
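+  /* threadpool_size == 0 means auto-size: one thread group per CPU. */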
+ if (threadpool_size == 0)
+ {
+ threadpool_size= my_getncpus();
+ }
+
+ for(uint i=0; i < threadpool_size; i++)
+ {
+ thread_group_init(&all_groups[i], get_connection_attrib());
+ }
+
+ #define PSI_register(X) \
+ if(PSI_server) PSI_server->register_ ## X("threadpool", X ## _list, array_elements(X ## _list))
+
+ PSI_register(mutex);
+ PSI_register(cond);
+ PSI_register(thread);
+
+ pool_timer.tick_interval= threadpool_stall_limit;
+ start_timer(&pool_timer);
+ DBUG_RETURN(0);
+}
+
+void tp_end()
+{
+ DBUG_ENTER("tp_end");
+
+ if (!started)
+ DBUG_VOID_RETURN;
+
+ stop_timer(&pool_timer);
+ for(uint i=0; i< threadpool_size; i++)
+ {
+ thread_group_close(&all_groups[i]);
+ }
+ DBUG_VOID_RETURN;
+}
diff --git a/sql/threadpool_win.cc b/sql/threadpool_win.cc
new file mode 100644
index 00000000000..4e46e393c24
--- /dev/null
+++ b/sql/threadpool_win.cc
@@ -0,0 +1,756 @@
+#ifdef _WIN32_WINNT
+#undef _WIN32_WINNT
+#endif
+
+#define _WIN32_WINNT 0x0601
+
+#include <my_global.h>
+#include <violite.h>
+#include <sql_priv.h>
+#include <sql_class.h>
+#include <my_pthread.h>
+#include <scheduler.h>
+#include <sql_connect.h>
+#include <mysqld.h>
+#include <debug_sync.h>
+#include <threadpool.h>
+#include <windows.h>
+
+
+
+TP_STATISTICS tp_stats;
+
+#define WEAK_SYMBOL(return_type, function, ...) \
+ typedef return_type (WINAPI *pFN_##function)(__VA_ARGS__); \
+ static pFN_##function my_##function = (pFN_##function) \
+ (GetProcAddress(GetModuleHandle("kernel32"),#function))
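+
+/*
+  For illustration, WEAK_SYMBOL(VOID, CancelThreadpoolIo, PTP_IO) expands
+  roughly to:
+
+    typedef VOID (WINAPI *pFN_CancelThreadpoolIo)(PTP_IO);
+    static pFN_CancelThreadpoolIo my_CancelThreadpoolIo=
+      (pFN_CancelThreadpoolIo)
+      (GetProcAddress(GetModuleHandle("kernel32"),"CancelThreadpoolIo"));
+
+  so each Vista+ threadpool entry point is looked up at load time, and is
+  simply NULL on older Windows versions where it does not exist. The
+  #define below then redirects all uses to the function pointer.
+*/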
+
+WEAK_SYMBOL(VOID, CancelThreadpoolIo, PTP_IO);
+#define CancelThreadpoolIo my_CancelThreadpoolIo
+
+WEAK_SYMBOL(VOID, CloseThreadpool, PTP_POOL);
+#define CloseThreadpool my_CloseThreadpool
+
+WEAK_SYMBOL(VOID, CloseThreadpoolIo, PTP_IO);
+#define CloseThreadpoolIo my_CloseThreadpoolIo
+
+WEAK_SYMBOL(VOID, CloseThreadpoolTimer,PTP_TIMER);
+#define CloseThreadpoolTimer my_CloseThreadpoolTimer
+
+WEAK_SYMBOL(VOID, CloseThreadpoolWait,PTP_WAIT);
+#define CloseThreadpoolWait my_CloseThreadpoolWait
+
+WEAK_SYMBOL(PTP_POOL, CreateThreadpool,PVOID);
+#define CreateThreadpool my_CreateThreadpool
+
+WEAK_SYMBOL(PTP_IO, CreateThreadpoolIo, HANDLE, PTP_WIN32_IO_CALLBACK, PVOID ,
+ PTP_CALLBACK_ENVIRON);
+#define CreateThreadpoolIo my_CreateThreadpoolIo
+
+WEAK_SYMBOL(PTP_TIMER, CreateThreadpoolTimer, PTP_TIMER_CALLBACK ,
+ PVOID pv, PTP_CALLBACK_ENVIRON pcbe);
+#define CreateThreadpoolTimer my_CreateThreadpoolTimer
+
+WEAK_SYMBOL(PTP_WAIT, CreateThreadpoolWait, PTP_WAIT_CALLBACK, PVOID,
+ PTP_CALLBACK_ENVIRON);
+#define CreateThreadpoolWait my_CreateThreadpoolWait
+
+WEAK_SYMBOL(VOID, DisassociateCurrentThreadFromCallback, PTP_CALLBACK_INSTANCE);
+#define DisassociateCurrentThreadFromCallback my_DisassociateCurrentThreadFromCallback
+
+WEAK_SYMBOL(DWORD, FlsAlloc, PFLS_CALLBACK_FUNCTION);
+#define FlsAlloc my_FlsAlloc
+
+WEAK_SYMBOL(PVOID, FlsGetValue, DWORD);
+#define FlsGetValue my_FlsGetValue
+
+WEAK_SYMBOL(BOOL, FlsSetValue, DWORD, PVOID);
+#define FlsSetValue my_FlsSetValue
+
+WEAK_SYMBOL(VOID, SetThreadpoolThreadMaximum, PTP_POOL, DWORD);
+#define SetThreadpoolThreadMaximum my_SetThreadpoolThreadMaximum
+
+WEAK_SYMBOL(BOOL, SetThreadpoolThreadMinimum, PTP_POOL, DWORD);
+#define SetThreadpoolThreadMinimum my_SetThreadpoolThreadMinimum
+
+WEAK_SYMBOL(VOID, SetThreadpoolTimer, PTP_TIMER, PFILETIME,DWORD,DWORD);
+#define SetThreadpoolTimer my_SetThreadpoolTimer
+
+WEAK_SYMBOL(VOID, SetThreadpoolWait, PTP_WAIT,HANDLE,PFILETIME);
+#define SetThreadpoolWait my_SetThreadpoolWait
+
+WEAK_SYMBOL(VOID, StartThreadpoolIo, PTP_IO);
+#define StartThreadpoolIo my_StartThreadpoolIo
+
+WEAK_SYMBOL(VOID, WaitForThreadpoolIoCallbacks,PTP_IO, BOOL);
+#define WaitForThreadpoolIoCallbacks my_WaitForThreadpoolIoCallbacks
+
+WEAK_SYMBOL(VOID, WaitForThreadpoolTimerCallbacks, PTP_TIMER, BOOL);
+#define WaitForThreadpoolTimerCallbacks my_WaitForThreadpoolTimerCallbacks
+
+WEAK_SYMBOL(VOID, WaitForThreadpoolWaitCallbacks, PTP_WAIT, BOOL);
+#define WaitForThreadpoolWaitCallbacks my_WaitForThreadpoolWaitCallbacks
+
+WEAK_SYMBOL(BOOL, SetFileCompletionNotificationModes, HANDLE, UCHAR);
+#define SetFileCompletionNotificationModes my_SetFileCompletionNotificationModes
+
+WEAK_SYMBOL(BOOL, TrySubmitThreadpoolCallback, PTP_SIMPLE_CALLBACK pfns,
+ PVOID pv,PTP_CALLBACK_ENVIRON pcbe);
+#define TrySubmitThreadpoolCallback my_TrySubmitThreadpoolCallback
+
+WEAK_SYMBOL(PTP_WORK, CreateThreadpoolWork, PTP_WORK_CALLBACK pfnwk, PVOID pv,
+ PTP_CALLBACK_ENVIRON pcbe);
+#define CreateThreadpoolWork my_CreateThreadpoolWork
+
+WEAK_SYMBOL(VOID, SubmitThreadpoolWork,PTP_WORK pwk);
+#define SubmitThreadpoolWork my_SubmitThreadpoolWork
+
+WEAK_SYMBOL(VOID, CloseThreadpoolWork, PTP_WORK pwk);
+#define CloseThreadpoolWork my_CloseThreadpoolWork
+
+#if _MSC_VER >= 1600
+/* Stack size manipulation is available only on Win7+; declarations are in the VS2010 SDK */
+WEAK_SYMBOL(BOOL, SetThreadpoolStackInformation, PTP_POOL,
+ PTP_POOL_STACK_INFORMATION);
+#define SetThreadpoolStackInformation my_SetThreadpoolStackInformation
+#endif
+
+#if _MSC_VER < 1600
+#define SetThreadpoolCallbackPriority(env,prio)
+typedef enum _TP_CALLBACK_PRIORITY {
+ TP_CALLBACK_PRIORITY_HIGH,
+ TP_CALLBACK_PRIORITY_NORMAL,
+ TP_CALLBACK_PRIORITY_LOW,
+ TP_CALLBACK_PRIORITY_INVALID
+} TP_CALLBACK_PRIORITY;
+#endif
+
+
+/* Log a warning */
+static void tp_log_warning(const char *msg, const char *fct)
+{
+ sql_print_warning("Threadpool: %s. %s failed (last error %d)",msg, fct,
+ GetLastError());
+}
+
+
+PTP_POOL pool;
+DWORD fls;
+extern int skip_net_wait_timeout;
+
+static bool skip_completion_port_on_success = false;
+
+/*
+ Threadpool callbacks.
+
+ io_completion_callback - handle client request
+ timer_callback - handle wait timeout (kill connection)
+ shm_read_callback, shm_close_callback - shared memory stuff
+ login_callback - user login (submitted as threadpool work)
+
+*/
+
+static void CALLBACK timer_callback(PTP_CALLBACK_INSTANCE instance,
+ PVOID context, PTP_TIMER timer);
+
+static void CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance,
+ PVOID context, PVOID overlapped, ULONG io_result, ULONG_PTR nbytes, PTP_IO io);
+
+static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance,
+ PVOID Context, PTP_WAIT wait,TP_WAIT_RESULT wait_result);
+
+static void CALLBACK shm_close_callback(PTP_CALLBACK_INSTANCE instance,
+ PVOID Context, PTP_WAIT wait,TP_WAIT_RESULT wait_result);
+
+#define CONNECTION_SIGNATURE 0xAFFEAFFE
+
+static void check_thread_init();
+
+/* Get current time as Windows time (100ns units since January 1, 1601) */
+static ulonglong now()
+{
+ ulonglong current_time;
+ GetSystemTimeAsFileTime((PFILETIME)&current_time);
+ return current_time;
+}
+
+/*
+ Connection structure, encapsulates THD + structures for asynchronous
+ IO and pool.
+*/
+
+struct connection_t
+{
+ THD *thd;
+ bool logged_in;
+ HANDLE handle;
+ OVERLAPPED overlapped;
+
+ /* absolute time for wait timeout (as Windows time) */
+ volatile ulonglong timeout;
+
+ PTP_CLEANUP_GROUP cleanup_group;
+ TP_CALLBACK_ENVIRON callback_environ;
+
+ PTP_IO io;
+ PTP_TIMER timer;
+ PTP_WAIT shm_read;
+};
+
+void init_connection(connection_t *connection)
+{
+ connection->logged_in = false;
+ connection->handle= 0;
+ connection->io= 0;
+ connection->shm_read= 0;
+ connection->timer= 0;
+ connection->timeout= ULONGLONG_MAX;
+ memset(&connection->overlapped, 0, sizeof(OVERLAPPED));
+ InitializeThreadpoolEnvironment(&connection->callback_environ);
+ SetThreadpoolCallbackPool(&connection->callback_environ, pool);
+ connection->thd = 0;
+}
+
+int init_io(connection_t *connection, THD *thd)
+{
+ connection->thd= thd;
+ Vio *vio = thd->net.vio;
+ switch(vio->type)
+ {
+ case VIO_TYPE_SSL:
+ case VIO_TYPE_TCPIP:
+ connection->handle= (HANDLE)vio->sd;
+ break;
+ case VIO_TYPE_NAMEDPIPE:
+ connection->handle= (HANDLE)vio->hPipe;
+ break;
+ case VIO_TYPE_SHARED_MEMORY:
+ connection->shm_read= CreateThreadpoolWait(shm_read_callback, connection,
+ &connection->callback_environ);
+ if (!connection->shm_read)
+ {
+ tp_log_warning("Allocation failed", "CreateThreadpoolWait");
+ return -1;
+ }
+ break;
+ default:
+ abort();
+ }
+
+ if (connection->handle)
+ {
+    /* Performance tweaks (see MSDN documentation) */
+ UCHAR flags = FILE_SKIP_SET_EVENT_ON_HANDLE;
+ if (skip_completion_port_on_success)
+ {
+ flags |= FILE_SKIP_COMPLETION_PORT_ON_SUCCESS;
+ }
+ (void)SetFileCompletionNotificationModes(connection->handle, flags);
+
+ /* Assign io completion callback */
+ connection->io = CreateThreadpoolIo(connection->handle,
+ io_completion_callback, connection, &connection->callback_environ);
+ if(!connection->io)
+ {
+      tp_log_warning("Allocation failed", "CreateThreadpoolIo");
+ return -1;
+ }
+ }
+ connection->timer = CreateThreadpoolTimer(timer_callback, connection,
+ &connection->callback_environ);
+ if (!connection->timer)
+ {
+    tp_log_warning("Allocation failed", "CreateThreadpoolTimer");
+ return -1;
+ }
+
+ return 0;
+}
+
+
+/*
+ Start asynchronous read
+*/
+int start_io(connection_t *connection, PTP_CALLBACK_INSTANCE instance)
+{
+ /* Start async read */
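+  /*
+    Note: a zero-length read is used on purpose. It completes when data
+    arrives, without consuming any of it; the data itself is read later
+    by the normal vio/net code in the worker callback.
+  */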
+ DWORD num_bytes = 0;
+ static char c;
+ WSABUF buf;
+ buf.buf= &c;
+ buf.len= 0;
+ DWORD flags=0;
+ DWORD last_error= 0;
+
+ int retval;
+ Vio *vio= connection->thd->net.vio;
+
+ if (vio->type == VIO_TYPE_SHARED_MEMORY)
+ {
+ SetThreadpoolWait(connection->shm_read, vio->event_server_wrote, NULL);
+ return 0;
+ }
+ if (vio->type == VIO_CLOSED)
+ {
+ return -1;
+ }
+
+ DBUG_ASSERT(vio->type == VIO_TYPE_TCPIP ||
+ vio->type == VIO_TYPE_SSL ||
+ vio->type == VIO_TYPE_NAMEDPIPE);
+
+ OVERLAPPED *overlapped= &connection->overlapped;
+ PTP_IO io= connection->io;
+ StartThreadpoolIo(io);
+
+ if (vio->type == VIO_TYPE_TCPIP || vio->type == VIO_TYPE_SSL)
+ {
+ /* Start async io (sockets). */
+ if (WSARecv(vio->sd , &buf, 1, &num_bytes, &flags,
+ overlapped, NULL) == 0)
+ {
+ retval= last_error= 0;
+ }
+ else
+ {
+ retval= -1;
+ last_error= WSAGetLastError();
+ }
+ }
+ else
+ {
+ /* Start async io (named pipe) */
+ if (ReadFile(vio->hPipe, &c, 0, &num_bytes ,overlapped))
+ {
+ retval= last_error= 0;
+ }
+ else
+ {
+ retval= -1;
+ last_error= GetLastError();
+ }
+ }
+
+ if (retval == 0 || last_error == ERROR_MORE_DATA)
+ {
+ /*
+ IO successfully finished (synchronously).
+ If skip_completion_port_on_success is set, we need to handle it right
+ here, because completion callback would not be executed by the pool.
+ */
+ if(skip_completion_port_on_success)
+ {
+ CancelThreadpoolIo(io);
+ io_completion_callback(instance, connection, overlapped, last_error,
+ num_bytes, io);
+ }
+ return 0;
+ }
+
+ if(last_error == ERROR_IO_PENDING)
+ {
+ return 0;
+ }
+
+  /* Some error occurred */
+ CancelThreadpoolIo(io);
+ return -1;
+}
+
+int login(connection_t *connection, PTP_CALLBACK_INSTANCE instance)
+{
+ if (threadpool_add_connection(connection->thd) == 0
+ && init_io(connection, connection->thd) == 0
+ && start_io(connection, instance) == 0)
+ {
+ return 0;
+ }
+ return -1;
+}
+
+/*
+ Recalculate wait timeout, maybe reset timer.
+*/
+void set_wait_timeout(connection_t *connection, ulonglong old_timeout)
+{
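+  /* Windows time is in 100ns units, hence the 10000000LL per second. */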
+ ulonglong new_timeout = now() +
+ 10000000LL*connection->thd->variables.net_wait_timeout;
+
+ if (new_timeout < old_timeout)
+ {
+ SetThreadpoolTimer(connection->timer, (PFILETIME) &new_timeout, 0, 1000);
+ }
+ connection->timeout = new_timeout;
+}
+
+/*
+ Terminates (idle) connection by closing the socket.
+ This will activate io_completion_callback() in a different thread
+*/
+void post_kill_notification(connection_t *connection)
+{
+ check_thread_init();
+ THD *thd=connection->thd;
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ thd->killed = KILL_CONNECTION;
+ vio_shutdown(thd->net.vio, SHUT_RDWR);
+ thd->mysys_var= NULL;
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+}
+
+
+/* Connection destructor */
+void destroy_connection(connection_t *connection)
+{
+ if (connection->thd)
+ {
+ threadpool_remove_connection(connection->thd);
+ }
+
+ if (connection->io)
+ {
+ WaitForThreadpoolIoCallbacks(connection->io, TRUE);
+ CloseThreadpoolIo(connection->io);
+ }
+
+ if(connection->shm_read)
+ {
+ WaitForThreadpoolWaitCallbacks(connection->shm_read, TRUE);
+ CloseThreadpoolWait(connection->shm_read);
+ }
+
+ if(connection->timer)
+ {
+ SetThreadpoolTimer(connection->timer, 0, 0, 0);
+ WaitForThreadpoolTimerCallbacks(connection->timer, TRUE);
+ CloseThreadpoolTimer(connection->timer);
+ }
+
+ DestroyThreadpoolEnvironment(&connection->callback_environ);
+}
+
+
+
+/*
+ This function should be called first whenever a callback is invoked in the
+ threadpool, does my_thread_init() if not yet done
+*/
+extern ulong thread_created;
+static void check_thread_init()
+{
+ if (FlsGetValue(fls) == NULL)
+ {
+ FlsSetValue(fls, (void *)1);
+ my_thread_init();
+ thread_created++;
+ InterlockedIncrement((volatile long *)&tp_stats.num_worker_threads);
+ }
+}
+
+
+/*
+ Take care of proper cleanup when threadpool threads exit.
+ We do not control how threads are created, thus it is our responsibility to
+ check that my_thread_init() is called on thread initialization and
+ my_thread_end() on thread destruction. On Windows, FlsAlloc() provides the
+ thread destruction callbacks.
+*/
+static VOID WINAPI thread_destructor(void *data)
+{
+ if(data)
+ {
+ if (InterlockedDecrement((volatile long *)&tp_stats.num_worker_threads) >= 0)
+ {
+ /*
+        The check for thread count >= 0 above is due to the shutdown code
+        (see tp_end()), where we forcefully set num_worker_threads to 0 even
+        if not all threads have shut down to the point where they would run
+        their Fls destructors, even after CloseThreadpool(). See also the
+        comment in tp_end().
+ */
+ mysql_mutex_lock(&LOCK_thread_count);
+ my_thread_end();
+ mysql_mutex_unlock(&LOCK_thread_count);
+ }
+ }
+}
+
+
+/* Scheduler callback : init */
+bool tp_init(void)
+{
+ fls= FlsAlloc(thread_destructor);
+ pool= CreateThreadpool(NULL);
+ if(!pool)
+ {
+ sql_print_error("Can't create threadpool. "
+ "CreateThreadpool() failed with %d. Likely cause is memory pressure",
+ GetLastError());
+ exit(1);
+ }
+
+ if (threadpool_max_threads)
+ {
+ SetThreadpoolThreadMaximum(pool,threadpool_max_threads);
+ }
+
+ if (threadpool_min_threads)
+ {
+ if (!SetThreadpoolThreadMinimum(pool, threadpool_min_threads))
+ {
+ tp_log_warning( "Can't set threadpool minimum threads",
+ "SetThreadpoolThreadMinimum");
+ }
+ }
+
+ /*
+ Control stack size (OS must be Win7 or later, plus corresponding SDK)
+ */
+#if _MSC_VER >=1600
+ if (SetThreadpoolStackInformation)
+ {
+ TP_POOL_STACK_INFORMATION stackinfo;
+ stackinfo.StackCommit = 0;
+ stackinfo.StackReserve = my_thread_stack_size;
+ if (!SetThreadpoolStackInformation(pool, &stackinfo))
+ {
+ tp_log_warning("Can't set threadpool stack size",
+ "SetThreadpoolStackInformation");
+ }
+ }
+#endif
+
+ skip_net_wait_timeout = 1;
+ return 0;
+}
+
+
+/*
+ Scheduler callback : Destroy the scheduler.
+*/
+
+extern "C" uint THR_thread_count;
+extern "C" mysql_mutex_t THR_LOCK_threads;
+extern "C" mysql_cond_t THR_COND_threads;
+
+void tp_end(void)
+{
+ if(pool)
+ {
+ SetThreadpoolThreadMaximum(pool, 0);
+ CloseThreadpool(pool);
+
+ /*
+ Tell my_global_thread_end() we're complete.
+
+      This would not be necessary if CloseThreadpool() synchronously
+      released all threads, waited until they disappeared and ran all their
+      FLS destructors. However, threads in the pool are released
+      asynchronously and might spend some time in the CRT shutdown code.
+      Thus we zero num_worker_threads here, to avoid the thread destructor's
+      my_thread_end() calls after this point.
+ */
+ LONG remaining_threads=
+ InterlockedExchange( (volatile long *)&tp_stats.num_worker_threads, 0);
+
+ if (remaining_threads)
+ {
+ mysql_mutex_lock(&THR_LOCK_threads);
+ THR_thread_count -= remaining_threads;
+ mysql_cond_signal(&THR_COND_threads);
+ mysql_mutex_unlock(&THR_LOCK_threads);
+ }
+ }
+ skip_net_wait_timeout= 0;
+}
+
+/*
+ Notify pool about connection being killed.
+*/
+void tp_post_kill_notification(THD *thd)
+{
+ if (current_thd == thd)
+ return; /* There is nothing to do.*/
+
+ if (thd->system_thread)
+ return; /* Will crash if we attempt to kill system thread. */
+
+ Vio *vio= thd->net.vio;
+
+ vio_shutdown(vio, SD_BOTH);
+
+}
+
+/*
+ Handle read completion/notification.
+*/
+static VOID CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance,
+ PVOID context, PVOID overlapped, ULONG io_result, ULONG_PTR nbytes, PTP_IO io)
+{
+ if(instance)
+ {
+ check_thread_init();
+ }
+
+ connection_t *connection = (connection_t*)context;
+ THD *thd= connection->thd;
+ ulonglong old_timeout = connection->timeout;
+ connection->timeout = ULONGLONG_MAX;
+
+ if (threadpool_process_request(connection->thd))
+ goto error;
+
+ set_wait_timeout(connection, old_timeout);
+ if(start_io(connection, instance))
+ goto error;
+
+ return;
+
+error:
+  /* Some error has occurred. */
+ if (instance)
+ DisassociateCurrentThreadFromCallback(instance);
+
+ destroy_connection(connection);
+ my_free(connection);
+}
+
+
+/* Simple callback for login */
+static void CALLBACK login_callback(PTP_CALLBACK_INSTANCE instance,
+ PVOID context, PTP_WORK work)
+{
+ if(instance)
+ {
+ check_thread_init();
+ }
+
+ connection_t *connection =(connection_t *)context;
+ if (login(connection, instance) != 0)
+ {
+ destroy_connection(connection);
+ my_free(connection);
+ }
+}
+
+/*
+ Timer callback.
+ Invoked when connection times out (wait_timeout)
+*/
+static VOID CALLBACK timer_callback(PTP_CALLBACK_INSTANCE instance,
+ PVOID parameter, PTP_TIMER timer)
+{
+ check_thread_init();
+
+ connection_t *con= (connection_t*)parameter;
+ ulonglong timeout= con->timeout;
+
+ if (timeout <= now())
+ {
+ con->thd->killed = KILL_CONNECTION;
+ if(con->thd->net.vio)
+ vio_shutdown(con->thd->net.vio, SD_BOTH);
+ }
+ else if(timeout != ULONGLONG_MAX)
+ {
+ /*
+ Reset timer.
+ There is a tiny possibility of a race condition, since the value of timeout
+ could have changed to smaller value in the thread doing io callback.
+
+ Given the relative unimportance of the wait timeout, we accept race
+ condition.
+ */
+ SetThreadpoolTimer(timer, (PFILETIME)&timeout, 0, 1000);
+ }
+}
+
+
+/*
+ Shared memory read callback.
+ Invoked when read event is set on connection.
+*/
+static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance,
+ PVOID context, PTP_WAIT wait,TP_WAIT_RESULT wait_result)
+{
+ connection_t *con= (connection_t *)context;
+ /* Disarm wait. */
+ SetThreadpoolWait(wait, NULL, NULL);
+
+ /*
+ This is an autoreset event, and one wakeup is eaten already by threadpool,
+ and the current state is "not set". Thus we need to reset the event again,
+ or vio_read will hang.
+ */
+ HANDLE h = con->thd->net.vio->event_server_wrote;
+ SetEvent(h);
+  io_completion_callback(instance, context, NULL, 0, 0, 0);
+}
+
+
+/*
+ Notify the thread pool about a new connection.
+ NOTE: LOCK_thread_count is locked on entry. This function must unlock it.
+*/
+void tp_add_connection(THD *thd)
+{
+ connection_t *con = (connection_t *)my_malloc(sizeof(connection_t), 0);
+
+ if (con)
+ threads.append(thd);
+ mysql_mutex_unlock(&LOCK_thread_count);
+
+ if(!con)
+ {
+ tp_log_warning("Allocation failed", "tp_add_connection");
+ return;
+ }
+
+ init_connection(con);
+ con->thd= thd;
+ /* Try to login asynchronously, using threads in the pool */
+ PTP_WORK wrk = CreateThreadpoolWork(login_callback,con, &con->callback_environ);
+ if (wrk)
+ {
+ SubmitThreadpoolWork(wrk);
+ CloseThreadpoolWork(wrk);
+ }
+ else
+ {
+ /* Likely memory pressure */
+ login_callback(NULL, con, NULL); /* deletes connection if something goes wrong */
+ }
+}
+
+
+
+/*
+ Sets the number of idle threads the thread pool maintains in anticipation of new
+ requests.
+*/
+void tp_set_min_threads(uint val)
+{
+ if (pool)
+ SetThreadpoolThreadMinimum(pool, val);
+}
+
+void tp_set_max_threads(uint val)
+{
+ if (pool)
+ SetThreadpoolThreadMaximum(pool, val);
+}
+
+void tp_wait_begin(THD *thd, int type)
+{
+  if (thd && thd->scheduler.data)
+ {
+ /* TODO: call CallbackMayRunLong() */
+ }
+}
+
+void tp_wait_end(THD *thd)
+{
+ /* Do we need to do anything ? */
+}
+
diff --git a/vio/vio.c b/vio/vio.c
index b8bc7bdae08..aa0d2012afa 100644
--- a/vio/vio.c
+++ b/vio/vio.c
@@ -49,6 +49,25 @@ static my_bool has_no_data(Vio *vio __attribute__((unused)))
return FALSE;
}
+#ifdef _WIN32
+my_bool vio_shared_memory_has_data(Vio *vio)
+{
+ return (vio->shared_memory_remain > 0);
+}
+
+int vio_shared_memory_shutdown(Vio *vio, int how)
+{
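+  /* Signal both events, so that any thread blocked in a shared memory
+     read or wait wakes up and notices the close. */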
+ SetEvent(vio->event_conn_closed);
+ SetEvent(vio->event_server_wrote);
+ return 0;
+}
+
+int vio_pipe_shutdown(Vio *vio, int how)
+{
+ return vio_socket_shutdown(vio, how); /* cancels io */
+}
+#endif
+
/*
* Helper to fill most of the Vio* with defaults.
*/
@@ -89,6 +108,7 @@ static void vio_init(Vio* vio, enum enum_vio_type type,
vio->poll_read =no_poll_read;
vio->is_connected =vio_is_connected_pipe;
vio->has_data =has_no_data;
+ vio->shutdown =vio_pipe_shutdown;
vio->timeout=vio_win32_timeout;
/* Set default timeout */
@@ -116,7 +136,8 @@ static void vio_init(Vio* vio, enum enum_vio_type type,
vio->poll_read =no_poll_read;
vio->is_connected =vio_is_connected_shared_memory;
- vio->has_data =has_no_data;
+ vio->has_data =vio_shared_memory_has_data;
+ vio->shutdown =vio_shared_memory_shutdown;
/* Currently, shared memory is on Windows only, hence the below is ok*/
vio->timeout= vio_win32_timeout;
@@ -145,6 +166,7 @@ static void vio_init(Vio* vio, enum enum_vio_type type,
vio->poll_read =vio_poll_read;
vio->is_connected =vio_is_connected;
vio->has_data =vio_ssl_has_data;
+ vio->shutdown =vio_socket_shutdown;
DBUG_VOID_RETURN;
}
#endif /* HAVE_OPENSSL */
@@ -163,6 +185,7 @@ static void vio_init(Vio* vio, enum enum_vio_type type,
vio->timeout =vio_timeout;
vio->poll_read =vio_poll_read;
vio->is_connected =vio_is_connected;
+ vio->shutdown =vio_socket_shutdown;
vio->has_data= (flags & VIO_BUFFERED_READ) ?
vio_buff_has_data : has_no_data;
DBUG_VOID_RETURN;
diff --git a/vio/vio_priv.h b/vio/vio_priv.h
index 702ba4de38a..3f62c508375 100644
--- a/vio/vio_priv.h
+++ b/vio/vio_priv.h
@@ -39,6 +39,7 @@ size_t vio_read_pipe(Vio *vio, uchar * buf, size_t size);
size_t vio_write_pipe(Vio *vio, const uchar * buf, size_t size);
my_bool vio_is_connected_pipe(Vio *vio);
int vio_close_pipe(Vio * vio);
+int vio_pipe_shutdown(Vio *vio, int how);
#endif
#ifdef HAVE_SMEM
@@ -46,8 +47,11 @@ size_t vio_read_shared_memory(Vio *vio, uchar * buf, size_t size);
size_t vio_write_shared_memory(Vio *vio, const uchar * buf, size_t size);
my_bool vio_is_connected_shared_memory(Vio *vio);
int vio_close_shared_memory(Vio * vio);
+my_bool vio_shared_memory_has_data(Vio *vio);
+int vio_shared_memory_shutdown(Vio *vio, int how);
#endif
+int vio_socket_shutdown(Vio *vio, int how);
void vio_timeout(Vio *vio,uint which, uint timeout);
my_bool vio_buff_has_data(Vio *vio);
diff --git a/vio/viosocket.c b/vio/viosocket.c
index 4772847abd8..1f129cb8e55 100644
--- a/vio/viosocket.c
+++ b/vio/viosocket.c
@@ -131,6 +131,60 @@ size_t vio_write(Vio * vio, const uchar* buf, size_t size)
DBUG_RETURN(r);
}
+#ifdef _WIN32
+static void CALLBACK cancel_io_apc(ULONG_PTR data)
+{
+ CancelIo((HANDLE)data);
+}
+
+/*
+ Cancel IO on Windows.
+
+  On XP, issue CancelIo() as an asynchronous procedure call to the thread
+  that started the IO, since CancelIo() only cancels IO issued by the
+  calling thread. On Vista and later, simpler cancellation is done with
+  CancelIoEx().
+*/
+
+static int cancel_io(HANDLE handle, DWORD thread_id)
+{
+ static BOOL (WINAPI *fp_CancelIoEx) (HANDLE, OVERLAPPED *);
+ static volatile int first_time= 1;
+  int rc= -1;  /* Fail unless cancellation is successfully issued */
+ HANDLE thread_handle;
+
+ if (first_time)
+ {
+ /* Try to load CancelIoEx using GetProcAddress */
+ InterlockedCompareExchangePointer((volatile void *)&fp_CancelIoEx,
+ GetProcAddress(GetModuleHandle("kernel32"), "CancelIoEx"), NULL);
+    first_time= 0;
+ }
+
+ if (fp_CancelIoEx)
+ {
+    return fp_CancelIoEx(handle, NULL) ? 0 : -1;
+ }
+
+ thread_handle= OpenThread(THREAD_SET_CONTEXT, FALSE, thread_id);
+ if (thread_handle)
+ {
+    rc= QueueUserAPC(cancel_io_apc, thread_handle, (ULONG_PTR)handle) ? 0 : -1;
+ CloseHandle(thread_handle);
+ }
+  return rc;
+}
+#endif
+
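+/*
+  Shutdown a socket Vio. On Windows the threadpool relies on cancelling
+  pending overlapped IO rather than shutdown(); on other platforms a plain
+  shutdown() wakes up any thread blocked on the socket.
+*/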
+int vio_socket_shutdown(Vio *vio, int how)
+{
+#ifdef _WIN32
+ return cancel_io((HANDLE)vio->sd, vio->thread_id);
+#else
+ return shutdown(vio->sd, how);
+#endif
+}
+
+
int vio_blocking(Vio * vio __attribute__((unused)), my_bool set_blocking_mode,
my_bool *old_mode)
{
@@ -726,6 +780,22 @@ void vio_timeout(Vio *vio, uint which, uint timeout)
#ifdef __WIN__
+/*
+  Disable posting IO completion event to the port.
+  In some cases (synchronous timed IO) we want to skip IOCP notifications.
+  Setting the low-order bit of the OVERLAPPED hEvent is the documented way
+  to tell the kernel not to queue a completion packet to the port (see the
+  GetQueuedCompletionStatus documentation).
+*/
+static void disable_iocp_notification(OVERLAPPED *overlapped)
+{
+ HANDLE *handle = &(overlapped->hEvent);
+ *handle = ((HANDLE)((ULONG_PTR) *handle|1));
+}
+
+/* Enable posting IO completion event to the port */
+static void enable_iocp_notification(OVERLAPPED *overlapped)
+{
+ HANDLE *handle = &(overlapped->hEvent);
+ *handle = (HANDLE)((ULONG_PTR) *handle & ~1);
+}
/*
Finish pending IO on pipe. Honor wait timeout
@@ -737,7 +807,7 @@ static size_t pipe_complete_io(Vio* vio, char* buf, size_t size, DWORD timeout_m
DBUG_ENTER("pipe_complete_io");
- ret= WaitForSingleObject(vio->pipe_overlapped.hEvent, timeout_ms);
+ ret= WaitForSingleObjectEx(vio->pipe_overlapped.hEvent, timeout_ms, TRUE);
/*
WaitForSingleObjects will normally return WAIT_OBJECT_O (success, IO completed)
or WAIT_TIMEOUT.
@@ -767,7 +837,8 @@ size_t vio_read_pipe(Vio * vio, uchar *buf, size_t size)
DBUG_ENTER("vio_read_pipe");
DBUG_PRINT("enter", ("sd: %d buf: 0x%lx size: %u", vio->sd, (long) buf,
(uint) size));
-
+
+ disable_iocp_notification(&vio->pipe_overlapped);
if (ReadFile(vio->hPipe, buf, (DWORD)size, &bytes_read,
&(vio->pipe_overlapped)))
{
@@ -777,13 +848,14 @@ size_t vio_read_pipe(Vio * vio, uchar *buf, size_t size)
{
if (GetLastError() != ERROR_IO_PENDING)
{
+ enable_iocp_notification(&vio->pipe_overlapped);
DBUG_PRINT("error",("ReadFile() returned last error %d",
GetLastError()));
DBUG_RETURN((size_t)-1);
}
retval= pipe_complete_io(vio, buf, size,vio->read_timeout_ms);
}
-
+ enable_iocp_notification(&vio->pipe_overlapped);
DBUG_PRINT("exit", ("%lld", (longlong)retval));
DBUG_RETURN(retval);
}
@@ -796,7 +868,7 @@ size_t vio_write_pipe(Vio * vio, const uchar* buf, size_t size)
DBUG_ENTER("vio_write_pipe");
DBUG_PRINT("enter", ("sd: %d buf: 0x%lx size: %u", vio->sd, (long) buf,
(uint) size));
-
+ disable_iocp_notification(&vio->pipe_overlapped);
if (WriteFile(vio->hPipe, buf, (DWORD)size, &bytes_written,
&(vio->pipe_overlapped)))
{
@@ -804,6 +876,7 @@ size_t vio_write_pipe(Vio * vio, const uchar* buf, size_t size)
}
else
{
+ enable_iocp_notification(&vio->pipe_overlapped);
if (GetLastError() != ERROR_IO_PENDING)
{
DBUG_PRINT("vio_error",("WriteFile() returned last error %d",
@@ -812,7 +885,7 @@ size_t vio_write_pipe(Vio * vio, const uchar* buf, size_t size)
}
retval= pipe_complete_io(vio, (char *)buf, size, vio->write_timeout_ms);
}
-
+ enable_iocp_notification(&vio->pipe_overlapped);
DBUG_PRINT("exit", ("%lld", (longlong)retval));
DBUG_RETURN(retval);
}