diff options
author | Vladislav Vaintroub <wlad@montyprogram.com> | 2011-12-08 19:17:49 +0100 |
---|---|---|
committer | Vladislav Vaintroub <wlad@montyprogram.com> | 2011-12-08 19:17:49 +0100 |
commit | e91bbca5fb080a8d988c156d78c7dc1b1daaad82 (patch) | |
tree | edeb15da451e956ae0d6874657c910ab0df111f8 | |
parent | 5e7b949e61f4330e27013c8ec81fa3d450e5dce6 (diff) | |
download | mariadb-git-e91bbca5fb080a8d988c156d78c7dc1b1daaad82.tar.gz |
Initial threadpool implementation for MariaDB 5.5
-rw-r--r-- | cmake/os/FreeBSD.cmake | 7 | ||||
-rw-r--r-- | include/thr_alarm.h | 6 | ||||
-rw-r--r-- | include/violite.h | 3 | ||||
-rwxr-xr-x | mysql-test/mysql-test-run.pl | 4 | ||||
-rw-r--r-- | mysql-test/suite/sys_vars/t/slow_launch_time_func.test | 2 | ||||
-rw-r--r-- | mysql-test/t/wait_timeout.test | 2 | ||||
-rw-r--r-- | mysys/thr_alarm.c | 88 | ||||
-rw-r--r-- | sql/CMakeLists.txt | 13 | ||||
-rw-r--r-- | sql/mysqld.cc | 11 | ||||
-rw-r--r-- | sql/net_serv.cc | 4 | ||||
-rw-r--r-- | sql/scheduler.cc | 21 | ||||
-rw-r--r-- | sql/scheduler.h | 52 | ||||
-rw-r--r-- | sql/sql_class.cc | 37 | ||||
-rw-r--r-- | sql/sql_class.h | 4 | ||||
-rw-r--r-- | sql/sql_connect.cc | 3 | ||||
-rw-r--r-- | sql/sql_parse.cc | 5 | ||||
-rw-r--r-- | sql/sys_vars.cc | 76 | ||||
-rw-r--r-- | sql/threadpool.h | 47 | ||||
-rw-r--r-- | sql/threadpool_common.cc | 246 | ||||
-rw-r--r-- | sql/threadpool_unix.cc | 1238 | ||||
-rw-r--r-- | sql/threadpool_win.cc | 756 | ||||
-rw-r--r-- | vio/vio.c | 25 | ||||
-rw-r--r-- | vio/vio_priv.h | 4 | ||||
-rw-r--r-- | vio/viosocket.c | 83 |
24 files changed, 2550 insertions, 187 deletions
diff --git a/cmake/os/FreeBSD.cmake b/cmake/os/FreeBSD.cmake index e09592942c1..9bc7d944da2 100644 --- a/cmake/os/FreeBSD.cmake +++ b/cmake/os/FreeBSD.cmake @@ -22,3 +22,10 @@ # The below was used for really old versions of FreeBSD, roughly: before 5.1.9 # ADD_DEFINITIONS(-DHAVE_BROKEN_REALPATH) + +# Use atomic builtins +IF(CMAKE_SIZEOF_VOID_P EQUAL 4 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "i386") + SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -march=i686") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -march=i686") + SET(CMAKE_REQUIRED_FLAGS "${CMAKE_REQUIRED_FLAGS} -march=i686") +ENDIF() diff --git a/include/thr_alarm.h b/include/thr_alarm.h index f4823b618f7..66e344d10fd 100644 --- a/include/thr_alarm.h +++ b/include/thr_alarm.h @@ -40,7 +40,11 @@ typedef struct st_alarm_info } ALARM_INFO; void thr_alarm_info(ALARM_INFO *info); +extern my_bool my_disable_thr_alarm; +#ifdef _WIN32 +#define DONT_USE_THR_ALARM +#endif #if defined(DONT_USE_THR_ALARM) #define USE_ALARM_THREAD @@ -88,7 +92,7 @@ typedef struct st_alarm { extern uint thr_client_alarm; extern pthread_t alarm_thread; -extern my_bool my_disable_thr_alarm; + #define thr_alarm_init(A) (*(A))=0 #define thr_alarm_in_use(A) (*(A)!= 0) diff --git a/include/violite.h b/include/violite.h index ba057028ed2..05b20245c5a 100644 --- a/include/violite.h +++ b/include/violite.h @@ -168,6 +168,7 @@ void vio_end(void); #define vio_should_retry(vio) (vio)->should_retry(vio) #define vio_was_interrupted(vio) (vio)->was_interrupted(vio) #define vio_close(vio) ((vio)->vioclose)(vio) +#define vio_shutdown(vio,how) ((vio)->shutdown)(vio,how) #define vio_peer_addr(vio, buf, prt, buflen) (vio)->peer_addr(vio, buf, prt, buflen) #define vio_timeout(vio, which, seconds) (vio)->timeout(vio, which, seconds) #define vio_poll_read(vio, timeout) (vio)->poll_read(vio, timeout) @@ -219,6 +220,7 @@ struct st_vio void (*timeout)(Vio*, unsigned int which, unsigned int timeout); my_bool (*poll_read)(Vio *vio, uint timeout); my_bool (*is_connected)(Vio*); + int (*shutdown)(Vio *, int); my_bool (*has_data) (Vio*); #ifdef HAVE_OPENSSL void *ssl_arg; @@ -235,6 +237,7 @@ struct st_vio char *shared_memory_pos; #endif /* HAVE_SMEM */ #ifdef _WIN32 + DWORD thread_id; /* Used to XP only in vio_shutdown */ OVERLAPPED pipe_overlapped; DWORD read_timeout_ms; DWORD write_timeout_ms; diff --git a/mysql-test/mysql-test-run.pl b/mysql-test/mysql-test-run.pl index 25427b81eff..e4cd9e6ab89 100755 --- a/mysql-test/mysql-test-run.pl +++ b/mysql-test/mysql-test-run.pl @@ -3461,7 +3461,9 @@ sub mysql_install_db { mtr_add_arg($args, "--loose-skip-ndbcluster"); mtr_add_arg($args, "--loose-skip-aria"); mtr_add_arg($args, "--disable-sync-frm"); - mtr_add_arg($args, "--tmpdir=%s", "$opt_vardir/tmp/"); + mtr_add_arg($args, "--tmpdir=."); + mtr_add_arg($args, "--max_allowed_packet=8M"); + mtr_add_arg($args, "--net_buffer_length=16K"); mtr_add_arg($args, "--core-file"); if ( $opt_debug ) diff --git a/mysql-test/suite/sys_vars/t/slow_launch_time_func.test b/mysql-test/suite/sys_vars/t/slow_launch_time_func.test index c9fc357b10f..a5b429f81cb 100644 --- a/mysql-test/suite/sys_vars/t/slow_launch_time_func.test +++ b/mysql-test/suite/sys_vars/t/slow_launch_time_func.test @@ -29,7 +29,7 @@ # # Setup # - +--source include/not_threadpool.inc --source include/not_embedded.inc --source include/not_threadpool.inc diff --git a/mysql-test/t/wait_timeout.test b/mysql-test/t/wait_timeout.test index eb86cf17ebb..68c0957347d 100644 --- a/mysql-test/t/wait_timeout.test +++ b/mysql-test/t/wait_timeout.test @@ -8,7 +8,7 @@ ############################################################################### # These tests cannot run with the embedded server -- source include/not_embedded.inc --- source include/one_thread_per_connection.inc +#-- source include/one_thread_per_connection.inc # Save the initial number of concurrent sessions --source include/count_sessions.inc diff --git a/mysys/thr_alarm.c b/mysys/thr_alarm.c index adce7407239..4b5144e90bf 100644 --- a/mysys/thr_alarm.c +++ b/mysys/thr_alarm.c @@ -597,93 +597,6 @@ static void *alarm_handler(void *arg __attribute__((unused))) return 0; /* Impossible */ } #endif /* USE_ALARM_THREAD */ - -/***************************************************************************** - thr_alarm for win95 -*****************************************************************************/ - -#else /* __WIN__ */ - -void thr_alarm_kill(my_thread_id thread_id) -{ - /* Can't do this yet */ -} - -sig_handler process_alarm(int sig __attribute__((unused))) -{ - /* Can't do this yet */ -} - - -my_bool thr_alarm(thr_alarm_t *alrm, uint sec, ALARM *alarm) -{ - (*alrm)= &alarm->alarmed; - if (alarm_aborted) - { - alarm->alarmed.crono=0; - return 1; - } - if (!(alarm->alarmed.crono=SetTimer((HWND) NULL,0, sec*1000, - (TIMERPROC) NULL))) - return 1; - return 0; -} - - -my_bool thr_got_alarm(thr_alarm_t *alrm_ptr) -{ - thr_alarm_t alrm= *alrm_ptr; - MSG msg; - if (alrm->crono) - { - PeekMessage(&msg,NULL,WM_TIMER,WM_TIMER,PM_REMOVE) ; - if (msg.message == WM_TIMER || alarm_aborted) - { - KillTimer(NULL, alrm->crono); - alrm->crono = 0; - } - } - return !alrm->crono || alarm_aborted; -} - - -void thr_end_alarm(thr_alarm_t *alrm_ptr) -{ - thr_alarm_t alrm= *alrm_ptr; - /* alrm may be zero if thr_alarm aborted with an error */ - if (alrm && alrm->crono) - - { - KillTimer(NULL, alrm->crono); - alrm->crono = 0; - } -} - -void end_thr_alarm(my_bool free_structures) -{ - DBUG_ENTER("end_thr_alarm"); - alarm_aborted=1; /* No more alarms */ - DBUG_VOID_RETURN; -} - -void init_thr_alarm(uint max_alarm) -{ - DBUG_ENTER("init_thr_alarm"); - alarm_aborted=0; /* Yes, Gimmie alarms */ - DBUG_VOID_RETURN; -} - -void thr_alarm_info(ALARM_INFO *info) -{ - bzero((char*) info, sizeof(*info)); -} - -void resize_thr_alarm(uint max_alarms) -{ -} - -#endif /* __WIN__ */ - #endif /**************************************************************************** @@ -954,4 +867,5 @@ int main(int argc __attribute__((unused)),char **argv __attribute__((unused))) } #endif /* !defined(DONT_USE_ALARM_THREAD) */ +#endif /* WIN */ #endif /* MAIN */ diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index ce6c96e55f2..1772c9e12b2 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -31,7 +31,7 @@ ${CMAKE_CURRENT_BINARY_DIR}/lex_hash.h SET_SOURCE_FILES_PROPERTIES(${GEN_SOURCES} PROPERTIES GENERATED 1) -ADD_DEFINITIONS(-DMYSQL_SERVER -DHAVE_EVENT_SCHEDULER) +ADD_DEFINITIONS(-DMYSQL_SERVER -DHAVE_EVENT_SCHEDULER -DHAVE_POOL_OF_THREADS) IF(SSL_DEFINES) ADD_DEFINITIONS(${SSL_DEFINES}) ENDIF() @@ -82,9 +82,16 @@ SET (SQL_SOURCE opt_index_cond_pushdown.cc opt_subselect.cc opt_table_elimination.cc sql_expression_cache.cc gcalc_slicescan.cc gcalc_tools.cc - + threadpool_common.cc ${GEN_SOURCES} - ${MYSYS_LIBWRAP_SOURCE}) + ${MYSYS_LIBWRAP_SOURCE} + ) + +IF(WIN32) + SET(SQL_SOURCE ${SQL_SOURCE} threadpool_win.cc) +ELSE() + SET(SQL_SOURCE ${SQL_SOURCE} threadpool_unix.cc) +ENDIF() MYSQL_ADD_PLUGIN(partition ha_partition.cc STORAGE_ENGINE DEFAULT STATIC_ONLY RECOMPILE_FOR_EMBEDDED) diff --git a/sql/mysqld.cc b/sql/mysqld.cc index c43f516fc2e..303e7d00221 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -73,6 +73,7 @@ #include <waiting_threads.h> #include "debug_sync.h" #include "sql_callback.h" +#include "threadpool.h" #ifdef WITH_PERFSCHEMA_STORAGE_ENGINE #include "../storage/perfschema/pfs_server.h" @@ -5236,6 +5237,8 @@ default_service_handling(char **argv, int mysqld_main(int argc, char **argv) { + my_progname= argv[0]; + /* When several instances are running on the same machine, we need to have an unique named hEventShudown through the @@ -7133,6 +7136,10 @@ SHOW_VAR status_vars[]= { {"Tc_log_page_size", (char*) &tc_log_page_size, SHOW_LONG}, {"Tc_log_page_waits", (char*) &tc_log_page_waits, SHOW_LONG}, #endif +#ifndef EMBEDDED_LIBRARY + {"Threadpool_idle_threads", (char *) &tp_stats.num_waiting_threads, SHOW_INT}, + {"Threadpool_threads", (char *) &tp_stats.num_worker_threads, SHOW_INT}, +#endif {"Threads_cached", (char*) &cached_thread_count, SHOW_LONG_NOFLUSH}, {"Threads_connected", (char*) &connection_count, SHOW_INT}, {"Threads_created", (char*) &thread_created, SHOW_LONG_NOFLUSH}, @@ -8018,7 +8025,9 @@ static int get_options(int *argc_ptr, char ***argv_ptr) else if (thread_handling == SCHEDULER_NO_THREADS) one_thread_scheduler(thread_scheduler); else - pool_of_threads_scheduler(thread_scheduler); /* purecov: tested */ + pool_of_threads_scheduler(thread_scheduler, &max_connections, + &connection_count); + one_thread_per_connection_scheduler(extra_thread_scheduler, &extra_max_connections, &extra_connection_count); diff --git a/sql/net_serv.cc b/sql/net_serv.cc index ccc67e64ea4..9011d570497 100644 --- a/sql/net_serv.cc +++ b/sql/net_serv.cc @@ -842,7 +842,7 @@ my_real_read(NET *net, size_t *complen) DBUG_PRINT("info",("vio_read returned %ld errno: %d", (long) length, vio_errno(net->vio))); -#if !defined(__WIN__) || defined(MYSQL_SERVER) +#if !defined(__WIN__) && defined(MYSQL_SERVER) /* We got an error that there was no data on the socket. We now set up an alarm to not 'read forever', change the socket to the blocking @@ -874,7 +874,7 @@ my_real_read(NET *net, size_t *complen) continue; } } -#endif /* (!defined(__WIN__) || defined(MYSQL_SERVER) */ +#endif /* (!defined(__WIN__) && defined(MYSQL_SERVER) */ if (thr_alarm_in_use(&alarmed) && !thr_got_alarm(&alarmed) && interrupted) { /* Probably in MIT threads */ diff --git a/sql/scheduler.cc b/sql/scheduler.cc index f04fdef39f9..c174d300d2e 100644 --- a/sql/scheduler.cc +++ b/sql/scheduler.cc @@ -79,7 +79,7 @@ static void scheduler_wait_sync_end(void) { one_thread_scheduler() or one_thread_per_connection_scheduler() in mysqld.cc, so this init function will always be called. */ -static void scheduler_init() { +void scheduler_init() { thr_set_lock_wait_callback(scheduler_wait_lock_begin, scheduler_wait_lock_end); thr_set_sync_wait_callback(scheduler_wait_sync_begin, @@ -124,25 +124,6 @@ void one_thread_scheduler(scheduler_functions *func) } -#ifdef HAVE_POOL_OF_THREADS - -/* - thd_scheduler keeps the link between THD and events. - It's embedded in the THD class. -*/ - -thd_scheduler::thd_scheduler() - : m_psi(NULL), logged_in(FALSE), io_event(NULL), thread_attached(FALSE) -{ -} - - -thd_scheduler::~thd_scheduler() -{ - my_free(io_event); -} - -#endif /* no pluggable schedulers in mariadb. diff --git a/sql/scheduler.h b/sql/scheduler.h index 03e1ad385c1..c91df8512ab 100644 --- a/sql/scheduler.h +++ b/sql/scheduler.h @@ -76,13 +76,11 @@ void one_thread_per_connection_scheduler(scheduler_functions *func, ulong *arg_max_connections, uint *arg_connection_count); void one_thread_scheduler(scheduler_functions *func); -#if defined(HAVE_LIBEVENT) && !defined(EMBEDDED_LIBRARY) -#define HAVE_POOL_OF_THREADS 1 - -struct event; - -class thd_scheduler +/* + To be used for pool-of-threads (implemeneted differently on various OSs) +*/ +struct thd_scheduler { public: /* @@ -96,29 +94,33 @@ public: differently. */ PSI_thread *m_psi; - - bool logged_in; - struct event* io_event; - LIST list; - bool thread_attached; /* Indicates if THD is attached to the OS thread */ - - thd_scheduler(); - ~thd_scheduler(); - bool init(THD* parent_thd); - bool thread_attach(); - void thread_detach(); + void *data; /* scheduler-specific data structure */ +#ifndef DBUG_OFF + bool set_explain; + char dbug_explain[512]; +#endif }; -void pool_of_threads_scheduler(scheduler_functions* func); -#else +void *thd_get_scheduler_data(THD *thd); +void thd_set_scheduler_data(THD *thd, void *data); +PSI_thread* thd_get_psi(THD *thd); +void thd_set_psi(THD *thd, PSI_thread *psi); -#define pool_of_threads_scheduler(A) \ - one_thread_per_connection_scheduler(A, &max_connections, \ - &connection_count) +/* Common thread pool routines, suitable for different implementations */ +extern void threadpool_remove_connection(THD *thd); +extern int threadpool_process_request(THD *thd); +extern int threadpool_add_connection(THD *thd); -class thd_scheduler -{}; -#endif +extern scheduler_functions *thread_scheduler; +#endif /* SCHEDULER_INCLUDED */ +#if !defined(EMBEDDED_LIBRARY) +#define HAVE_POOL_OF_THREADS 1 +void pool_of_threads_scheduler(scheduler_functions* func, + ulong *arg_max_connections, + uint *arg_connection_count); +#else +#define pool_of_threads_scheduler(A,B,C) \ + one_thread_per_connection_scheduler(A, B, C) #endif diff --git a/sql/sql_class.cc b/sql/sql_class.cc index cc58a131f00..63e1b36073f 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -1536,35 +1536,10 @@ void THD::awake(killed_state state_to_set) if (state_to_set >= KILL_CONNECTION || state_to_set == NOT_KILLED) { #ifdef SIGNAL_WITH_VIO_CLOSE - if (this != current_thd) + if (this != current_thd) { - /* - Before sending a signal, let's close the socket of the thread - that is being killed ("this", which is not the current thread). - This is to make sure it does not block if the signal is lost. - This needs to be done only on platforms where signals are not - a reliable interruption mechanism. - - Note that the downside of this mechanism is that we could close - the connection while "this" target thread is in the middle of - sending a result to the application, thus violating the client- - server protocol. - - On the other hand, without closing the socket we have a race - condition. If "this" target thread passes the check of - thd->killed, and then the current thread runs through - THD::awake(), sets the 'killed' flag and completes the - signaling, and then the target thread runs into read(), it will - block on the socket. As a result of the discussions around - Bug#37780, it has been decided that we accept the race - condition. A second KILL awakes the target from read(). - - If we are killing ourselves, we know that we are not blocked. - We also know that we will check thd->killed before we go for - reading the next statement. - */ - - close_active_vio(); + if(active_vio) + vio_shutdown(active_vio, SHUT_RDWR); } #endif @@ -1668,7 +1643,7 @@ void THD::disconnect() /* Disconnect even if a active vio is not associated. */ if (net.vio != vio) - vio_close(net.vio); + vio_close(net.vio); mysql_mutex_unlock(&LOCK_thd_data); } @@ -1740,6 +1715,10 @@ bool THD::store_globals() mysys_var->stack_ends_here= thread_stack + // for consistency, see libevent_thread_proc STACK_DIRECTION * (long)my_thread_stack_size; +#ifdef _WIN32 + if (net.vio) + net.vio->thread_id= real_id; /* Required to support IO cancelation on XP */ +#endif /* We have to call thr_lock_info_init() again here as THD may have been created in another thread diff --git a/sql/sql_class.h b/sql/sql_class.h index f271a6d5cd9..f87af8842d8 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -2339,6 +2339,10 @@ public: { mysql_mutex_lock(&LOCK_thd_data); active_vio = vio; +#ifdef _WIN32 + /* Required to support cancelation on XP */ + active_vio->thread_id = pthread_self(); +#endif mysql_mutex_unlock(&LOCK_thd_data); } inline void clear_active_vio() diff --git a/sql/sql_connect.cc b/sql/sql_connect.cc index 3c88b7a054d..570f869c2a6 100644 --- a/sql/sql_connect.cc +++ b/sql/sql_connect.cc @@ -890,6 +890,7 @@ static int check_connection(THD *thd) DBUG_PRINT("info", ("New connection received on %s", vio_description(net->vio))); + #ifdef SIGNAL_WITH_VIO_CLOSE thd->set_active_vio(net->vio); #endif @@ -1175,7 +1176,7 @@ void do_handle_one_connection(THD *thd_arg) /* We need to set this because of time_out_user_resource_limits */ thd->start_utime= thd->thr_create_utime; - if (MYSQL_CALLBACK_ELSE(thread_scheduler, init_new_connection_thread, (), 0)) + if (MYSQL_CALLBACK_ELSE(thd->scheduler, init_new_connection_thread, (), 0)) { close_connection(thd, ER_OUT_OF_RESOURCES); statistic_increment(aborted_connects,&LOCK_status); diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 719fcad5883..7ee6c9fb74f 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -678,6 +678,7 @@ void cleanup_items(Item *item) @retval 1 request of thread shutdown (see dispatch_command() description) */ +int skip_net_wait_timeout = 0; bool do_command(THD *thd) { @@ -700,7 +701,9 @@ bool do_command(THD *thd) the client, the connection is closed or "net_wait_timeout" number of seconds has passed. */ - my_net_set_read_timeout(net, thd->variables.net_wait_timeout); + if(!skip_net_wait_timeout) + my_net_set_read_timeout(net, thd->variables.net_wait_timeout); + /* XXX: this code is here only to clear possible errors of init_connect. diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index d01f1892d58..f547292e239 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -50,6 +50,7 @@ #ifdef WITH_PERFSCHEMA_STORAGE_ENGINE #include "../storage/perfschema/pfs_server.h" #endif /* WITH_PERFSCHEMA_STORAGE_ENGINE */ +#include "threadpool.h" /* The rule for this file: everything should be 'static'. When a sys_var @@ -1804,7 +1805,13 @@ static Sys_var_enum Sys_thread_handling( ", pool-of-threads" #endif , READ_ONLY GLOBAL_VAR(thread_handling), CMD_LINE(REQUIRED_ARG), - thread_handling_names, DEFAULT(0)); + thread_handling_names, +#ifdef HAVE_POOL_OF_THREADS + DEFAULT(2) +#else + DEFAULT(0) +#endif + ); #ifdef HAVE_QUERY_CACHE static bool fix_query_cache_size(sys_var *self, THD *thd, enum_var_type type) @@ -2173,15 +2180,68 @@ static Sys_var_ulong Sys_thread_cache_size( GLOBAL_VAR(thread_cache_size), CMD_LINE(REQUIRED_ARG), VALID_RANGE(0, 16384), DEFAULT(0), BLOCK_SIZE(1)); -#ifdef HAVE_POOL_OF_THREADS -static Sys_var_ulong Sys_thread_pool_size( - "thread_pool_size", - "How many threads we should create to handle query requests in " - "case of 'thread_handling=pool-of-threads'", - GLOBAL_VAR(thread_pool_size), CMD_LINE(REQUIRED_ARG), - VALID_RANGE(1, 16384), DEFAULT(20), BLOCK_SIZE(0)); +#ifndef HAVE_POOL_OF_THREADS +static bool fix_tp_max_threads(sys_var *, THD *, enum_var_type) +{ +#ifdef _WIN32 + tp_set_max_threads(threadpool_max_threads); +#endif + return false; +} + +#ifdef _WIN32 +static bool fix_tp_min_threads(sys_var *, THD *, enum_var_type) +{ + tp_set_min_threads(threadpool_min_threads); + return false; +} #endif +#ifdef _WIN32 +static Sys_var_uint Sys_threadpool_min_threads( + "thread_pool_min_threads", + "Minimuim number of threads in the thread pool.", + GLOBAL_VAR(threadpool_min_threads), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(1, 256), DEFAULT(1), BLOCK_SIZE(1), + NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), + ON_UPDATE(fix_tp_min_threads) + ); +#else +static Sys_var_uint Sys_threadpool_idle_thread_timeout( + "thread_pool_idle_timeout", + "Timeout in seconds for an idle thread in the thread pool." + "Worker thread will be shut down after timeout", + GLOBAL_VAR(threadpool_idle_timeout), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(1, UINT_MAX/100), DEFAULT(60000), BLOCK_SIZE(1) +); +static Sys_var_uint Sys_threadpool_size( + "thread_pool_size", + "Number of concurrently executing threads in the pool. " + "Leaving value default (0) sets it to the number of processors.", + GLOBAL_VAR(threadpool_size), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(0, 128), DEFAULT(0), BLOCK_SIZE(1) +); +static Sys_var_uint Sys_threadpool_stall_limit( + "thread_pool_stall_limit", + "Maximum query execution time before in milliseconds," + "before an executing non-yielding thread is considered stalled." + "If a worker thread is stalled, additional worker thread " + "may be created to handle remaining clients.", + GLOBAL_VAR(threadpool_stall_limit), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(60, UINT_MAX), DEFAULT(500), BLOCK_SIZE(1) +); +#endif /*! WIN32 */ +static Sys_var_uint Sys_threadpool_max_threads( + "thread_pool_max_threads", + "Maximum allowed number of worker threads in the thread pool", + GLOBAL_VAR(threadpool_max_threads), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(1, UINT_MAX), DEFAULT(3000), BLOCK_SIZE(1), + NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), + ON_UPDATE(fix_tp_max_threads) +); +#endif /* !HAVE_POOL_OF_THREADS */ + + /** Can't change the 'next' tx_isolation if we are already in a transaction. diff --git a/sql/threadpool.h b/sql/threadpool.h new file mode 100644 index 00000000000..0694981d76f --- /dev/null +++ b/sql/threadpool.h @@ -0,0 +1,47 @@ + +/* Threadpool parameters */ +#ifdef _WIN32 +extern uint threadpool_min_threads; /* Minimum threads in pool */ +#else +extern uint threadpool_idle_timeout; /* Shutdown idle worker threads after this timeout */ +extern uint threadpool_size; /* Number of parallel executing threads */ +extern uint threadpool_stall_limit; /* time interval in 10 ms units for stall checks*/ +#endif +extern uint threadpool_max_threads; /* Maximum threads in pool */ + +/* + Threadpool statistics +*/ +struct TP_STATISTICS +{ + /* Current number of worker thread. */ + volatile int num_worker_threads; + /* Current number of idle threads. */ + volatile int num_waiting_threads; + /* Number of login requests are queued but not yet processed. */ + volatile int pending_login_requests; + /* Number of threads that are starting. */ + volatile int pending_thread_starts; + /* Number of threads that are being shut down */ + volatile int pending_thread_shutdowns; + /* Time (in milliseconds) since pool is blocked (num_waiting_threads is 0) */ + ulonglong pool_block_duration; + /* Maximum duration of the pending login, im milliseconds. */ + ulonglong pending_login_duration; + /* Time since last thread was created */ + ulonglong time_since_last_thread_creation; + /* Number of requests processed since pool monitor run last time. */ + volatile int requests_dequeued; + volatile int requests_completed; +}; + +extern TP_STATISTICS tp_stats; + + +/* Functions to set threadpool parameters */ +extern void tp_set_min_threads(uint val); +extern void tp_set_max_threads(uint val); + +/* Activate threadpool scheduler */ +extern void tp_scheduler(void); + diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc new file mode 100644 index 00000000000..01546162377 --- /dev/null +++ b/sql/threadpool_common.cc @@ -0,0 +1,246 @@ +#include <my_global.h> +#include <violite.h> +#include <sql_priv.h> +#include <sql_class.h> +#include <my_pthread.h> +#include <scheduler.h> +#include <sql_connect.h> +#include <sql_audit.h> +#include <debug_sync.h> + + +extern bool login_connection(THD *thd); +extern bool do_command(THD *thd); +extern void prepare_new_connection_state(THD* thd); +extern void end_connection(THD *thd); +extern void thd_cleanup(THD *thd); +extern void delete_thd(THD *thd); + +/* Threadpool parameters */ +#ifdef _WIN32 +uint threadpool_min_threads; +#else +uint threadpool_idle_timeout; +uint threadpool_size; +uint threadpool_stall_limit; +#endif +uint threadpool_max_threads; + + +/* + Attach/associate the connection with the OS thread, for command processing. +*/ +static inline bool thread_attach(THD* thd, char *stack_start, PSI_thread **save_psi_thread) +{ + DBUG_ENTER("thread_attach"); + + if (PSI_server) + { + *save_psi_thread= PSI_server->get_thread(); + PSI_server->set_thread(thd->event_scheduler.m_psi); + } + else + *save_psi_thread= NULL; + + /* + We need to know the start of the stack so that we could check for + stack overruns. + */ + thd->thread_stack= stack_start; + + + /* Calls close_connection() on failure */ + if (setup_connection_thread_globals(thd)) + { + DBUG_RETURN(TRUE); + } + + /* clear errors from processing the previous THD */ + my_errno= 0; + thd->mysys_var->abort= 0; + +#ifndef DBUG_OFF + if (thd->event_scheduler.set_explain) + DBUG_SET(thd->event_scheduler.dbug_explain); +#endif + + DBUG_RETURN(FALSE); +} + +/* + Detach/disassociate the connection with the OS thread. +*/ +static inline void thread_detach(THD* thd, PSI_thread *restore_psi_thread) +{ + DBUG_ENTER("thread_detach"); + thd->mysys_var = NULL; +#ifndef DBUG_OFF + /* + If during the session @@session.dbug was assigned, the + dbug options/state has been pushed. Check if this is the + case, to be able to restore the state when we attach this + logical connection to a physical thread. + */ + if (_db_is_pushed_()) + { + thd->event_scheduler.set_explain= TRUE; + if (DBUG_EXPLAIN(thd->event_scheduler.dbug_explain, sizeof(thd->event_scheduler.dbug_explain))) + sql_print_error("thd_scheduler: DBUG_EXPLAIN buffer is too small"); + } + /* DBUG_POP() is a no-op in case there is no session state */ + DBUG_POP(); +#endif + if (PSI_server) + PSI_server->set_thread(restore_psi_thread); + pthread_setspecific(THR_THD, NULL); + DBUG_VOID_RETURN; +} + + + +int threadpool_add_connection(THD *thd) +{ + int retval=1; + PSI_thread *psi_thread; +#ifndef DBUG_OFF + thd->event_scheduler.set_explain = 0; +#endif + thread_attach(thd, (char *)&thd, &psi_thread); + ulonglong now= microsecond_interval_timer(); + thd->prior_thr_create_utime= now; + thd->start_utime= now; + thd->thr_create_utime= now; + + if (PSI_server) + { + thd->event_scheduler.m_psi = + PSI_server->new_thread(key_thread_one_connection, thd, thd->thread_id); + PSI_server->set_thread(thd->event_scheduler.m_psi); + } + + if (setup_connection_thread_globals(thd) == 0) + { + if (login_connection(thd) == 0) + { + prepare_new_connection_state(thd); + retval = thd_is_connection_alive(thd)?0:-1; + thd->net.reading_or_writing= 1; + } + } + + thread_detach(thd, psi_thread); + return retval; +} + +void threadpool_remove_connection(THD *thd) +{ + PSI_thread *save_psi_thread; + + thread_attach(thd, (char *)&thd, &save_psi_thread); + thd->killed= KILL_CONNECTION; + + thd->net.reading_or_writing= 0; + + end_connection(thd); + close_connection(thd, 0); + + mysql_mutex_lock(&thd->LOCK_thd_data); + thd->event_scheduler.data= NULL; + mysql_mutex_unlock(&thd->LOCK_thd_data); + + unlink_thd(thd); + mysql_mutex_unlock(&LOCK_thread_count); + mysql_cond_broadcast(&COND_thread_count); + DBUG_POP(); + if (PSI_server) + PSI_server->delete_current_thread(); + pthread_setspecific(THR_THD, NULL); +} + +int threadpool_process_request(THD *thd) +{ + int retval= 0; + PSI_thread *psi_thread; + thread_attach(thd, (char *)&thd, &psi_thread); + + if (thd->killed == KILL_CONNECTION) + { + /* + kill flag can be set have been killed by + timeout handler or by a KILL command + */ + thread_detach(thd, psi_thread); + return 1; + } + + for(;;) + { + Vio *vio; + thd->net.reading_or_writing= 0; + mysql_audit_release(thd); + + if ((retval= do_command(thd)) != 0) + break ; + + if (!thd_is_connection_alive(thd)) + { + retval= 1; + break; + } + + vio= thd->net.vio; + if (!vio->has_data(vio)) + { + /* + More info on this debug sync is in sql_parse.cc + */ + DEBUG_SYNC(thd, "before_do_command_net_read"); + break; + } + } + thread_detach(thd, psi_thread); + if (!retval) + thd->net.reading_or_writing= 1; + return retval; +} + + +/* + Scheduler struct, individual functions are implemented + in threadpool_unix.cc or threadpool_win.cc +*/ + +extern bool tp_init(); +extern void tp_add_connection(THD*); +extern void tp_wait_begin(THD *, int); +extern void tp_wait_end(THD*); +extern void tp_post_kill_notification(THD *thd); +extern void tp_end(void); + +static scheduler_functions tp_scheduler_functions= +{ + 0, // max_threads + NULL, + NULL, + tp_init, // init + NULL, // init_new_connection_thread + tp_add_connection, // add_connection + tp_wait_begin, // thd_wait_begin + tp_wait_end, // thd_wait_end + tp_post_kill_notification, // post_kill_notification + NULL, // end_thread + tp_end // end +}; + +extern void scheduler_init(); + +void pool_of_threads_scheduler(struct scheduler_functions *func, + ulong *arg_max_connections, + uint *arg_connection_count) +{ + *func = tp_scheduler_functions; + func->max_threads= *arg_max_connections + 1; + func->max_connections= arg_max_connections; + func->connection_count= arg_connection_count; + scheduler_init(); +} diff --git a/sql/threadpool_unix.cc b/sql/threadpool_unix.cc new file mode 100644 index 00000000000..9fa95f151a5 --- /dev/null +++ b/sql/threadpool_unix.cc @@ -0,0 +1,1238 @@ +#include <my_global.h> +#include <violite.h> +#include <sql_priv.h> +#include <sql_class.h> +#include <my_pthread.h> +#include <scheduler.h> +#include <sql_connect.h> +#include <mysqld.h> +#include <debug_sync.h> +#include <sys/queue.h> +#include <time.h> + +#include <threadpool.h> +#ifdef __linux__ +#include <sys/epoll.h> +typedef struct epoll_event native_event; +#endif +#if defined (__FreeBSD__) || defined (__APPLE__) +#include <sys/event.h> +typedef struct kevent native_event; +#endif +#if defined (__sun) +#include <port.h> +typedef port_event_t native_event; +#endif + + +static PSI_mutex_key key_group_mutex; +static PSI_mutex_key key_timer_mutex; +static PSI_mutex_info mutex_list[]= +{ + { &key_group_mutex, "group_mutex", 0}, + { &key_timer_mutex, "timer_mutex", PSI_FLAG_GLOBAL} +}; + +static PSI_cond_key key_worker_cond; +static PSI_cond_key key_timer_cond; +static PSI_cond_info cond_list[]= +{ + { &key_worker_cond, "worker_cond", 0}, + { &key_timer_cond, "timer_cond", PSI_FLAG_GLOBAL} +}; + +static PSI_thread_key key_worker_thread; +static PSI_thread_key key_timer_thread; +static PSI_thread_info thread_list[] = +{ + {&key_worker_thread, "worker_thread", 0}, + {&key_timer_thread, "timer_thread", PSI_FLAG_GLOBAL} +}; + + +TP_STATISTICS tp_stats; + + +struct thread_group_t; + +/* Per-thread structure for workers */ +struct worker_thread_t +{ + mysql_cond_t cond; + bool woken; + thread_group_t* thread_group; + ulonglong event_count; /* Stats: number of executed requests */ + SLIST_ENTRY(worker_thread_t) ptr; +}; + +/* + Data associated with an io event (also can be sent with with explicit + post_event()) +*/ +struct pool_event_t +{ + STAILQ_ENTRY (pool_event_t) next; + void *data; +}; +static pool_event_t POOL_SHUTDOWN_EVENT; + +struct thread_group_t +{ + + mysql_mutex_t mutex; + STAILQ_HEAD(queue_listhead, pool_event_t) queue; + SLIST_HEAD(wait_listhead, worker_thread_t) waiting_threads; + int pollfd; + int thread_count; + int active_thread_count; + int pending_thread_start_count; + int connection_count; + bool shutdown; + bool stalled; + int shutdown_pipe[2]; + worker_thread_t *listener; + pthread_attr_t *pthread_attr; + ulonglong last_thread_creation_time; + /* Stats for the deadlock detection timer routine.*/ + ulonglong io_event_count; + ulonglong queue_event_count; +} MY_ALIGNED(512); + +static thread_group_t all_groups[128]; + +/* Global timer for all groups */ +struct pool_timer_t +{ + mysql_mutex_t mutex; + mysql_cond_t cond; + int tick_interval; + volatile ulonglong current_microtime; + volatile ulonglong next_timeout_check; + bool shutdown; +}; + +static pool_timer_t pool_timer; + +struct connection_t +{ + pool_event_t event; + THD *thd; + thread_group_t *thread_group; + ulonglong abs_wait_timeout; + bool logged_in; + bool waiting; +}; + +/* Externals functions and variables we use */ +extern uint thread_created; +extern void scheduler_init(); +extern pthread_attr_t *get_connection_attrib(void); +extern int skip_net_wait_timeout; + + +static void post_event(thread_group_t *thread_group, pool_event_t* ev); +static int wake_thread(thread_group_t *thread_group); +static void handle_event(pool_event_t *ev); +static int wake_or_create_thread(thread_group_t *thread_group); +static int create_worker(thread_group_t *thread_group); +static void *worker_main(void *param); +static void check_stall(thread_group_t *thread_group); +static void connection_abort(connection_t *connection); +void tp_post_kill_notification(THD *thd); +static void set_wait_timeout(connection_t *connection); +static void set_next_timeout_check(ulonglong abstime); + + +/** + Asynchronous network IO. + + We use native edge-triggered network IO multiplexing facility. + This maps to different APIs on different Unixes. + + Supported are currently Linux with epoll, Solaris with event ports, + OSX and BSD with kevent. All those API's are used with one-shot flags + (the event is signalled once client has written something into the socket, + then socket is removed from the "poll-set" until the command is finished, + and we need to re-arm/re-register socket) + + No implementation for poll/select/AIO is currently provided. + + The API closely resembles all of the above mentioned platform APIs + and consists of following functions. + + - io_poll_create() + Creates an io_poll descriptor + On Linux: epoll_create() + + - io_poll_associate_fd(int poll_fd, int fd, void *data) + Associate file descriptor with io poll descriptor + On Linux : epoll_ctl(..EPOLL_CTL_ADD)) + + - io_poll_disassociate_fd(int pollfd, int fd) + Associate file descriptor with io poll descriptor + On Linux: epoll_ctl(..EPOLL_CTL_DEL) + + + - io_poll_start_read(int poll_fd,int fd, void *data) + The same as io_poll_associate_fd(), but cannot be used before + io_poll_associate_fd() was called. + On Linux : epoll_ctl(..EPOLL_CTL_MOD) + + - io_poll_wait (int pollfd, native_event *native_events, int maxevents, + int timeout_ms) + + wait until one or more descriptors added with io_poll_associate_fd() + or io_poll_start_read() becomes readable. Data associated with + descriptors can be retrieved from native_events array, using + native_event_get_userdata() function. + + On Linux: epoll_wait() +*/ + +#if defined (__linux__) +static int io_poll_create() +{ + return epoll_create(1); +} + + +int io_poll_associate_fd(int pollfd, int fd, void *data) +{ + struct epoll_event ev; + ev.data.ptr= data; + ev.events= EPOLLIN|EPOLLET|EPOLLERR|EPOLLRDHUP|EPOLLONESHOT; + return epoll_ctl(pollfd, EPOLL_CTL_ADD, fd, &ev); +} + + + +int io_poll_start_read(int pollfd, int fd, void *data) +{ + struct epoll_event ev; + ev.data.ptr= data; + ev.events= EPOLLIN|EPOLLET|EPOLLERR|EPOLLRDHUP|EPOLLONESHOT; + return epoll_ctl(pollfd, EPOLL_CTL_MOD, fd, &ev); +} + +void io_poll_disassociate_fd(int pollfd, int fd) +{ + struct epoll_event ev; + epoll_ctl(pollfd, EPOLL_CTL_DEL, fd, &ev); +} + + +int io_poll_wait(int pollfd, native_event *native_events, int maxevents, + int timeout_ms) +{ + int ret; + do + { + ret = epoll_wait(pollfd, native_events, maxevents, timeout_ms); + } + while(ret == -1 && errno == EINTR); + return ret; +} + +static void *native_event_get_userdata(native_event *event) +{ + return event->data.ptr; +} + +#elif defined (__FreeBSD__) || defined (__APPLE__) +int io_poll_create() +{ + return kqueue(); +} + +int io_poll_start_read(int pollfd, int fd, void *data) +{ + struct kevent ke; + EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ENABLE|EV_CLEAR, + 0, 0, data); + return kevent(pollfd, &ke, 1, 0, 0, 0); +} + + +int io_poll_associate_fd(int pollfd, int fd, void *data) +{ + return io_poll_start_read(poolfd,fd, data); +} + + +int io_poll_disassociate_fd(thread_group_t *thread_group, int fd) +{ + struct kevent ke; + EV_SET(&ke,fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); + return kevent(thread_group->pollfd, &ke, 1, 0, 0, 0); +} + + +int io_poll_wait(int pollfd, struct kevent *events, int maxevents, int timeout_ms) +{ + struct timespec ts; + int ret; + if (timeout_ms >= 0) + { + ts.tv_sec= timeout_ms/1000; + ts.tv_nsec= (timeout_ms%1000)*1000000; + } + do + { + ret= kevent(pollfd, 0, 0, events, maxevents, + (timeout_ms >= 0)?&ts:NULL); + } + while (ret == -1 && errno == EINTR); + if (ret > 0) + { + /* Disable monitoring for the events we that we dequeued */ + for (int i=0; i < ret; i++) + { + struct kevent *ke = &events[i]; + EV_SET(ke, ke->ident, EVFILT_READ, EV_ADD|EV_DISABLE, + 0, 0, ke->udata); + } + kevent(pollfd, events, ret, 0, 0, 0); + } + return ret; +} + +static void* native_event_get_userdata(native_event *event) +{ + return event->udata; +} +#elif defined (__sun) +static int io_poll_create() +{ + return port_create(); +} + +int io_poll_start_read(int pollfd, int fd, void *data) +{ + return port_associate(pollfd, PORT_SOURCE_FD, fd, POLLIN, data); +} + +static int io_poll_associate_fd(int pollfd, int fd, void *data) +{ + return io_poll_start_read(pollfd, fd, data); +} + +int io_poll_wait(int pollfd, native_event *events, int maxevents, int timeout_ms) +{ + struct timespec ts; + int ret; + uint_t nget= 1; + if (timeout_ms >= 0) + { + ts.tv_sec= timeout_ms/1000; + ts.tv_nsec= (timeout_ms%1000)*1000000; + } + do + { + ret= port_getn(pollfd, events, maxevents, &nget, + (timeout_ms >= 0)?&ts:NULL); + } + while (ret == -1 && errno == EINTR); + return nget; +} +static void* native_event_get_userdata(native_event *event) +{ + return event->portev_user; +} +#else +#error not ported yet to this OS +#endif + + + + +/* Dequeue element from a workqueue */ +static pool_event_t *queue_get(thread_group_t *thread_group) +{ + DBUG_ENTER("queue_get"); + pool_event_t *ev= NULL; + thread_group->queue_event_count++; + ev= STAILQ_FIRST(&thread_group->queue); + if (ev) + { + STAILQ_REMOVE_HEAD(&thread_group->queue,next); + } + DBUG_RETURN(ev); +} + +/* Check if workqueue is empty. */ +static bool queue_is_empty(thread_group_t* thread_group) +{ + DBUG_ENTER("queue_is_empty"); + bool empty= (STAILQ_FIRST(&thread_group->queue) == NULL); + DBUG_RETURN(empty); +} + +static void queue_put(thread_group_t *thread_group, pool_event_t *event) +{ + DBUG_ENTER("queue_put"); + STAILQ_INSERT_TAIL(&thread_group->queue, event, next); + DBUG_VOID_RETURN; +} + +static void increment_active_threads(thread_group_t *thread_group) +{ + my_atomic_add32(&tp_stats.num_waiting_threads,-1); + thread_group->active_thread_count++; +} + +static void decrement_active_threads(thread_group_t *thread_group) +{ + my_atomic_add32(&tp_stats.num_waiting_threads,1); + thread_group->active_thread_count--; +} + + +/* + Handle wait timeout : + Find connections that have been idle for too long and kill them. + Also, recalculate time when next timeout check should run. +*/ +static void timeout_check(pool_timer_t *timer) +{ + DBUG_ENTER("timeout_check"); + + mysql_mutex_lock(&LOCK_thread_count); + I_List_iterator<THD> it(threads); + + /* Reset next timeout check, it will be recalculated in the loop below */ + my_atomic_fas64((volatile int64*)&timer->next_timeout_check, ULONGLONG_MAX); + + THD *thd; + while ((thd=it++)) + { + if (thd->net.reading_or_writing != 1) + continue; + + connection_t *connection= (connection_t *)thd->scheduler.data; + if (!connection) + continue; + + if(connection->abs_wait_timeout < timer->current_microtime) + { + /* Wait timeout exceeded, kill connection. */ + mysql_mutex_lock(&thd->LOCK_thd_data); + thd->killed = THD::KILL_CONNECTION; + tp_post_kill_notification(thd); + mysql_mutex_unlock(&thd->LOCK_thd_data); + } + else + { + set_next_timeout_check(connection->abs_wait_timeout); + } + } + mysql_mutex_unlock(&LOCK_thread_count); + DBUG_VOID_RETURN; +} + + +/* + Timer thread. + + Periodically, check if one of the thread groups is stalled. Stalls happen if + events are not being dequeued from the queue, or from the network, Primary + reason for stall can be a lengthy executing non-blocking request. It could + also happen that thread is waiting but wait_begin/wait_end is forgotten by + storage engine. Timer thread will create a new thread in group in case of + a stall. + + Besides checking for stalls, timer thread is also responsible for terminating + clients that have been idle for longer than wait_timeout seconds. +*/ +static void* timer_thread(void *param) +{ + uint i; + + pool_timer_t* timer=(pool_timer_t *)param; + timer->next_timeout_check= ULONGLONG_MAX; + timer->current_microtime= my_micro_time(); + + my_thread_init(); + DBUG_ENTER("timer_thread"); + + for(;;) + { + struct timespec ts; + set_timespec_nsec(ts,timer->tick_interval*1000000); + mysql_mutex_lock(&timer->mutex); + int err = mysql_cond_timedwait(&timer->cond, &timer->mutex, &ts); + if (timer->shutdown) + break; + if (err == ETIMEDOUT) + { + timer->current_microtime= my_micro_time(); + + /* Check stallls in thread groups */ + for(i=0; i< threadpool_size;i++) + { + if(all_groups[i].connection_count) + check_stall(&all_groups[i]); + } + + /* Check if any client exceeded wait_timeout */ + if (timer->next_timeout_check <= timer->current_microtime) + timeout_check(timer); + } + mysql_mutex_unlock(&timer->mutex); + } + DBUG_POP(); + my_thread_end(); + return NULL; +} + + + +void check_stall(thread_group_t *thread_group) +{ + if (mysql_mutex_trylock(&thread_group->mutex) != 0) + { + /* Something happens. Don't disturb */ + return; + } + + /* + Check if listener is present. If not, check whether any IO + events were dequeued since last time. If not, this means + listener is either in tight loop or thd_wait_begin() + was forgotten. Create a new worker(it will make itself listener). + */ + if (!thread_group->listener && !thread_group->io_event_count) + { + wake_or_create_thread(thread_group); + mysql_mutex_unlock(&thread_group->mutex); + return; + } + + /* Reset io event count */ + thread_group->io_event_count= 0; + + /* + Check whether requests from the workqueue are being dequeued. + */ + if (!queue_is_empty(thread_group) && !thread_group->queue_event_count) + { + thread_group->stalled= true; + wake_or_create_thread(thread_group); + } + + /* Reset queue event count */ + thread_group->queue_event_count= 0; + + mysql_mutex_unlock(&thread_group->mutex); +} + + + +static void start_timer(pool_timer_t* timer) +{ + pthread_t thread_id; + DBUG_ENTER("start_timer"); + mysql_mutex_init(key_timer_mutex,&timer->mutex, NULL); + mysql_cond_init(key_timer_cond, &timer->cond, NULL); + timer->shutdown = false; + mysql_thread_create(key_timer_thread,&thread_id, NULL, timer_thread, timer); + DBUG_VOID_RETURN; +} + +static void stop_timer(pool_timer_t *timer) +{ + DBUG_ENTER("stop_timer"); + mysql_mutex_lock(&timer->mutex); + timer->shutdown = true; + mysql_cond_signal(&timer->cond); + mysql_mutex_unlock(&timer->mutex); + DBUG_VOID_RETURN; +} + +#define MAX_EVENTS 1024 + +/* + Poll for socket events and distribute them to worker threads. + In many case current thread will handle single event itself. +*/ +static pool_event_t * listener(worker_thread_t *current_thread, + thread_group_t *thread_group) +{ + DBUG_ENTER("listener"); + + for(;;) + { + native_event ev[MAX_EVENTS]; + int cnt; + + if (thread_group->shutdown) + { + DBUG_RETURN(&POOL_SHUTDOWN_EVENT); + } + do + { + cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, -1); + } + while(cnt <= 0 && errno == EINTR); + + if (cnt <=0) + { + DBUG_ASSERT(thread_group->shutdown); + DBUG_RETURN(&POOL_SHUTDOWN_EVENT); + } + + /* + Put events to queue, maybe wakeup workers. + If queue is currently empty, listener will return + so the current thread handles query itself, this avoids + wakeups and context switches. But if queue is not empty + this smells like a flood of queries, and the listener + stays. + */ + mysql_mutex_lock(&thread_group->mutex); + + if (thread_group->shutdown) + { + mysql_mutex_unlock(&thread_group->mutex); + DBUG_RETURN(&POOL_SHUTDOWN_EVENT); + } + + thread_group->io_event_count += cnt; + bool pick_event= queue_is_empty(thread_group); + + for(int i=(pick_event)?1:0; i < cnt ; i++) + { + pool_event_t *e= (pool_event_t *)native_event_get_userdata(&ev[i]); + queue_put(thread_group, e); + } + + /* Wake at most one worker thread */ + if(thread_group->active_thread_count==0 && + /*!queue_is_empty(thread_group)*/ !pick_event) + { + if(wake_thread(thread_group)) + { + if(thread_group->thread_count == 1) + create_worker(thread_group); + } + } + mysql_mutex_unlock(&thread_group->mutex); + + if (pick_event) + DBUG_RETURN((pool_event_t *)(native_event_get_userdata(&ev[0]))); + } +} + + + + +/* Creates a new worker thread. thread_mutex must be held when calling this function */ + +static int create_worker(thread_group_t *thread_group) +{ + pthread_t thread_id; + int err; + DBUG_ENTER("create_worker"); + if (tp_stats.num_worker_threads >= (int)threadpool_max_threads) + { + DBUG_PRINT("info", + ("Cannot create new thread (maximum allowed threads reached)")); + DBUG_RETURN(-1); + } + + err= pthread_create(&thread_id, thread_group->pthread_attr, worker_main, thread_group); + if (!err) + { + thread_group->pending_thread_start_count++; + thread_group->last_thread_creation_time=my_micro_time(); + } + DBUG_RETURN(err); +} + + +/* + Wakes a worker thread, or creates a new one. + + Worker creation is throttled, so we avoid too many threads + to be created during the short time. +*/ +static int wake_or_create_thread(thread_group_t *thread_group) +{ + ulonglong now; + ulonglong time_since_last_thread_created; + + DBUG_ENTER("wake_or_create_thread"); + + if (wake_thread(thread_group) == 0) + DBUG_RETURN(0); + + if (thread_group->pending_thread_start_count > 0) + DBUG_RETURN(-1); + + if (thread_group->thread_count < 4) + { + DBUG_RETURN(create_worker(thread_group)); + } + + now = my_micro_time(); + time_since_last_thread_created = + (now - thread_group->last_thread_creation_time)/1000; + + if (thread_group->active_thread_count == 0) + { + /* + We're better off creating a new thread here with no delay, as + others threads (at least 4) are all blocking and there was no sleeping + thread to wakeup. It smells like deadlock or very slowly executing + requests, e.g sleeps or user locks. + */ + DBUG_RETURN(create_worker(thread_group)); + } + + /* Throttle thread creation. */ + if ((thread_group->thread_count < 8 && time_since_last_thread_created > 50) + || (thread_group->thread_count < 16 && time_since_last_thread_created > 100) + || (time_since_last_thread_created > 200)) + { + DBUG_RETURN(create_worker(thread_group)); + } + + DBUG_RETURN(-1); +} + + + +/* Initialize thread group */ +int thread_group_init(thread_group_t *thread_group, pthread_attr_t* thread_attr) +{ + DBUG_ENTER("thread_group_init"); + + memset(thread_group, 0, sizeof(thread_group_t)); + thread_group->pthread_attr = thread_attr; + mysql_mutex_init(key_group_mutex, &thread_group->mutex, NULL); + STAILQ_INIT(&thread_group->queue); + SLIST_INIT(&thread_group->waiting_threads); + + thread_group->pending_thread_start_count= 0; + thread_group->pollfd= io_poll_create(); + thread_group->stalled= false; + if (thread_group->pollfd < 0) + { + DBUG_RETURN(-1); + } + if (pipe(thread_group->shutdown_pipe)) + { + DBUG_RETURN(-1); + } + if (io_poll_associate_fd(thread_group->pollfd, + thread_group->shutdown_pipe[0], &POOL_SHUTDOWN_EVENT)) + { + DBUG_RETURN(-1); + } + DBUG_RETURN(0); +} + + +/* + Wake single sleeping thread in pool. Optionally, tell this thread + to listen to socket io notification. + */ +static int wake_thread(thread_group_t *thread_group) +{ + DBUG_ENTER("wake_thread"); + worker_thread_t *thread = SLIST_FIRST(&thread_group->waiting_threads); + if(thread) + { + thread->woken= true; + SLIST_REMOVE_HEAD(&thread_group->waiting_threads, ptr); + if (mysql_cond_signal(&thread->cond)) + abort(); + DBUG_RETURN(0); + } + DBUG_RETURN(-1); /* no thread- missed wakeup*/ +} + + +/* + Shutdown thread group. +*/ +static void thread_group_close(thread_group_t *thread_group) +{ + DBUG_ENTER("thread_group_close"); + + char c= 0; + + mysql_mutex_lock(&thread_group->mutex); + thread_group->shutdown= true; + thread_group->listener= NULL; + + /* Wake listener. */ + if (write(thread_group->shutdown_pipe[1], &c, 1) < 0) + DBUG_VOID_RETURN; + + /* Wake all workers. */ + while(wake_thread(thread_group) == 0) {}; + mysql_mutex_unlock(&thread_group->mutex); + +#if 0 + /* Wait until workers terminate */ + while(thread_group->thread_count) + usleep(1000); +#endif + + DBUG_VOID_RETURN; +} + + +/* + Post a task to the workqueue, maybe wake a worker so + it picks the task. +*/ +static void post_event(thread_group_t *thread_group, pool_event_t* ev) +{ + DBUG_ENTER("post_event"); + + mysql_mutex_lock(&thread_group->mutex); + STAILQ_INSERT_TAIL(&thread_group->queue, ev, next); + if (thread_group->active_thread_count == 0) + { + wake_or_create_thread(thread_group); + } + mysql_mutex_unlock(&thread_group->mutex); + DBUG_VOID_RETURN; +} + + +/* + Check if pool is already overcommited. + This is used to prevent too many threads executing at the same time, + if the workload is not CPU bound. +*/ +static bool too_many_threads(thread_group_t *thread_group) +{ + return (thread_group->active_thread_count > 4 && !thread_group->stalled); +} + + + +/* + Dequeue a work item. + + If it is not immediately available, thread will sleep until + work is available (it also can become IO listener for a while). +*/ +int get_event(worker_thread_t *current_thread, thread_group_t *thread_group, + pool_event_t **ev, struct timespec *ts) +{ + DBUG_ENTER("get_event"); + + pool_event_t *first_event = NULL; + int err=0; + + mysql_mutex_lock(&thread_group->mutex); + + decrement_active_threads(thread_group); + DBUG_ASSERT(thread_group->active_thread_count >= 0); + + do + { + if (thread_group->shutdown) + break; + + /* Check if queue is not empty */ + if (!too_many_threads(thread_group)) + { + first_event= queue_get(thread_group); + if(first_event) + break; + } + + /* If there is currently no listener in the group, become one. */ + if(!thread_group->listener) + { + thread_group->listener= current_thread; + mysql_mutex_unlock(&thread_group->mutex); + + first_event= listener(current_thread, thread_group); + + mysql_mutex_lock(&thread_group->mutex); + /* There is no listener anymore, it just returned. */ + thread_group->listener= NULL; + break; + } + + /* + Last thing we try before going to sleep is to + pick a single event via epoll, without waiting (timeout 0) + */ + if (!too_many_threads(thread_group)) + { + native_event nev; + if (io_poll_wait(thread_group->pollfd,&nev,1, 0) == 1) + { + thread_group->io_event_count++; + first_event = (pool_event_t *)native_event_get_userdata(&nev); + break; + } + } + + + /* And now, finally sleep */ + current_thread->woken = false; /* wake() sets this to true */ + + /* + Add current thread to the head of the waiting list and wait. + It is important to add thread to the head rather than tail + as it ensures LIFO wakeup order (hot caches, working inactivity timeout) + */ + SLIST_INSERT_HEAD(&thread_group->waiting_threads, current_thread, ptr); + + if(ts) + err = mysql_cond_timedwait(¤t_thread->cond, &thread_group->mutex, ts); + else + err = mysql_cond_wait(¤t_thread->cond, &thread_group->mutex); + + if (!current_thread->woken) + { + /* + Thread was not signalled by wake(), it might be a spurious wakeup or + a timeout. Anyhow, we need to remove ourselves from the list now. + If thread was explicitly woken, than caller removed us from the list. + */ + SLIST_REMOVE(&thread_group->waiting_threads, current_thread, worker_thread_t, ptr); + } + + if(err) + break; + + } + while(true); + + thread_group->stalled= false; + increment_active_threads(thread_group); + mysql_mutex_unlock(&thread_group->mutex); + + + if (first_event) + *ev = first_event; + else + *ev = &POOL_SHUTDOWN_EVENT; + + DBUG_RETURN(err); +} + + + +/* + Tells the pool that thread starts waiting on IO, lock, condition, + sleep() or similar. + + Will wake another worker, and if there is no listener will + promote a listener, +*/ +void wait_begin(thread_group_t *thread_group) +{ + DBUG_ENTER("wait_begin"); + mysql_mutex_lock(&thread_group->mutex); + decrement_active_threads(thread_group); + DBUG_ASSERT(thread_group->active_thread_count >=0); + DBUG_ASSERT(thread_group->connection_count > 0); + + if((thread_group->active_thread_count == 0) && + (!queue_is_empty(thread_group) || !thread_group->listener)) + { + wake_or_create_thread(thread_group); + } + + mysql_mutex_unlock(&thread_group->mutex); + DBUG_VOID_RETURN; +} + +/* + Tells the pool current thread finished waiting. +*/ +void wait_end(thread_group_t *thread_group) +{ + DBUG_ENTER("wait_end"); + mysql_mutex_lock(&thread_group->mutex); + increment_active_threads(thread_group); + mysql_mutex_unlock(&thread_group->mutex); + DBUG_VOID_RETURN; +} + + +/* Scheduler */ +connection_t *alloc_connection(THD *thd) +{ + DBUG_ENTER("alloc_connection"); + + connection_t* connection = (connection_t *)my_malloc(sizeof(connection_t),0); + if (connection) + { + connection->thd = thd; + connection->waiting= false; + connection->logged_in= false; + connection->abs_wait_timeout= ULONGLONG_MAX; + } + DBUG_RETURN(connection); +} + + + +/* + Add a new connection to thread pool.. +*/ +void tp_add_connection(THD *thd) +{ + DBUG_ENTER("tp_add_connection"); + + threads.append(thd); + mysql_mutex_unlock(&LOCK_thread_count); + connection_t *c= alloc_connection(thd); + if(c) + { + c->thread_group= &all_groups[c->thd->thread_id%threadpool_size]; + mysql_mutex_lock(&c->thread_group->mutex); + c->thread_group->connection_count++; + mysql_mutex_unlock(&c->thread_group->mutex); + c->thd->scheduler.data = c; + post_event(c->thread_group,&c->event); + } + + DBUG_VOID_RETURN; +} + + +static void connection_abort(connection_t *c) +{ + DBUG_ENTER("connection_abort"); + mysql_mutex_lock(&c->thread_group->mutex); + c->thread_group->connection_count--; + mysql_mutex_unlock(&c->thread_group->mutex); + + threadpool_remove_connection(c->thd); + my_free(c); + DBUG_VOID_RETURN; +} + +void tp_post_kill_notification(THD *thd) +{ + DBUG_ENTER("tp_post_kill_notification"); + if (current_thd == thd || thd->system_thread) + DBUG_VOID_RETURN; + + if (thd->net.vio) + vio_shutdown(thd->net.vio, SHUT_RD); + DBUG_VOID_RETURN; +} + +void tp_wait_begin(THD *thd, int type) +{ + DBUG_ENTER("tp_wait_begin"); + + if (!thd) + DBUG_VOID_RETURN; + + connection_t *connection = (connection_t *)thd->scheduler.data; + if(connection) + { + DBUG_ASSERT(!connection->waiting); + connection->waiting= true; + wait_begin(connection->thread_group); + } + DBUG_VOID_RETURN; +} + + +void tp_wait_end(THD *thd) +{ + DBUG_ENTER("tp_wait_end"); + if (!thd) + DBUG_VOID_RETURN; + + connection_t *connection = (connection_t *)thd->scheduler.data; + if(connection) + { + DBUG_ASSERT(connection->waiting); + connection->waiting = false; + wait_end(connection->thread_group); + } + DBUG_VOID_RETURN; +} + + +static void set_next_timeout_check(ulonglong abstime) +{ + DBUG_ENTER("set_next_timeout_check"); + while(abstime < pool_timer.next_timeout_check) + { + longlong old= (longlong)pool_timer.next_timeout_check; + my_atomic_cas64((volatile int64*)&pool_timer.next_timeout_check, + &old, abstime); + } + DBUG_VOID_RETURN; +} + +static void set_wait_timeout(connection_t *c) +{ + DBUG_ENTER("set_wait_timeout"); + /* + Calculate wait deadline for this connection. + Instead of using my_micro_time() which has a syscall + overhead, use pool_timer.current_microtime and take + into account that its value could be off by at most + one tick interval. + */ + + c->abs_wait_timeout= pool_timer.current_microtime + + 1000LL*pool_timer.tick_interval + + 1000000LL*c->thd->variables.net_wait_timeout; + + set_next_timeout_check(c->abs_wait_timeout); + DBUG_VOID_RETURN; +} + +static void handle_event(pool_event_t *ev) +{ + + DBUG_ENTER("handle_event"); + + /* Normal case, handle query on connection */ + connection_t *c = (connection_t*)(void *)ev; + bool do_login = (!c->logged_in); + int ret; + + if (do_login) + { + ret= threadpool_add_connection(c->thd); + c->logged_in= true; + } + else + { + ret= threadpool_process_request(c->thd); + } + + if(!ret) + { + set_wait_timeout(c); + int fd = c->thd->net.vio->sd; + if (do_login) + { + ret= io_poll_associate_fd(c->thread_group->pollfd, fd, c); + } + else + ret= io_poll_start_read(c->thread_group->pollfd, fd, c); + } + + if (ret) + { + connection_abort(c); + } + + DBUG_VOID_RETURN; +} + + +static void *worker_main(void *param) +{ + + worker_thread_t this_thread; + + thread_created++; + pthread_detach_this_thread(); + my_thread_init(); + DBUG_ENTER("worker_main"); + + thread_group_t *thread_group = (thread_group_t *)param; + + /* Init per-thread structure */ + mysql_cond_init(key_worker_cond, &this_thread.cond, NULL); + this_thread.thread_group= thread_group; + this_thread.event_count=0; + + mysql_mutex_lock(&thread_group->mutex); + tp_stats.num_worker_threads++; + thread_group->thread_count++; + thread_group->active_thread_count++; + thread_group->pending_thread_start_count--; + mysql_mutex_unlock(&thread_group->mutex); + + /* Run event loop */ + for(;;) + { + struct pool_event_t *ev; + struct timespec ts; + set_timespec(ts,threadpool_idle_timeout); + if (get_event(&this_thread, thread_group, &ev, &ts) + || ev == &POOL_SHUTDOWN_EVENT) + { + break; + } + this_thread.event_count++; + handle_event(ev); + } + + /* Thread shutdown: cleanup per-worker-thread structure. */ + mysql_cond_destroy(&this_thread.cond); + + mysql_mutex_lock(&thread_group->mutex); + thread_group->active_thread_count--; + thread_group->thread_count--; + tp_stats.num_worker_threads--; + mysql_mutex_unlock(&thread_group->mutex); + + /* If it is the last thread in pool and pool is terminating, destroy pool.*/ + if (thread_group->shutdown && (thread_group->thread_count == 0)) + { + /* last thread existing, cleanup the pool structure */ + mysql_mutex_destroy(&thread_group->mutex); + } + DBUG_POP(); + my_thread_end(); + return NULL; +} + + +static bool started=false; + +bool tp_init() +{ + DBUG_ENTER("tp_init"); + started = true; + scheduler_init(); + skip_net_wait_timeout= 1; + if (threadpool_size == 0) + { + threadpool_size= my_getncpus(); + } + + for(uint i=0; i < threadpool_size; i++) + { + thread_group_init(&all_groups[i], get_connection_attrib()); + } + + #define PSI_register(X) \ + if(PSI_server) PSI_server->register_ ## X("threadpool", X ## _list, array_elements(X ## _list)) + + PSI_register(mutex); + PSI_register(cond); + PSI_register(thread); + + pool_timer.tick_interval= threadpool_stall_limit; + start_timer(&pool_timer); + DBUG_RETURN(0); +} + +void tp_end() +{ + DBUG_ENTER("tp_end"); + + if (!started) + DBUG_VOID_RETURN; + + stop_timer(&pool_timer); + for(uint i=0; i< threadpool_size; i++) + { + thread_group_close(&all_groups[i]); + } + DBUG_VOID_RETURN; +} diff --git a/sql/threadpool_win.cc b/sql/threadpool_win.cc new file mode 100644 index 00000000000..4e46e393c24 --- /dev/null +++ b/sql/threadpool_win.cc @@ -0,0 +1,756 @@ +#ifdef _WIN32_WINNT +#undef _WIN32_WINNT +#endif + +#define _WIN32_WINNT 0x0601 + +#include <my_global.h> +#include <violite.h> +#include <sql_priv.h> +#include <sql_class.h> +#include <my_pthread.h> +#include <scheduler.h> +#include <sql_connect.h> +#include <mysqld.h> +#include <debug_sync.h> +#include <threadpool.h> +#include <windows.h> + + + +TP_STATISTICS tp_stats; + +#define WEAK_SYMBOL(return_type, function, ...) \ + typedef return_type (WINAPI *pFN_##function)(__VA_ARGS__); \ + static pFN_##function my_##function = (pFN_##function) \ + (GetProcAddress(GetModuleHandle("kernel32"),#function)) + +WEAK_SYMBOL(VOID, CancelThreadpoolIo, PTP_IO); +#define CancelThreadpoolIo my_CancelThreadpoolIo + +WEAK_SYMBOL(VOID, CloseThreadpool, PTP_POOL); +#define CloseThreadpool my_CloseThreadpool + +WEAK_SYMBOL(VOID, CloseThreadpoolIo, PTP_IO); +#define CloseThreadpoolIo my_CloseThreadpoolIo + +WEAK_SYMBOL(VOID, CloseThreadpoolTimer,PTP_TIMER); +#define CloseThreadpoolTimer my_CloseThreadpoolTimer + +WEAK_SYMBOL(VOID, CloseThreadpoolWait,PTP_WAIT); +#define CloseThreadpoolWait my_CloseThreadpoolWait + +WEAK_SYMBOL(PTP_POOL, CreateThreadpool,PVOID); +#define CreateThreadpool my_CreateThreadpool + +WEAK_SYMBOL(PTP_IO, CreateThreadpoolIo, HANDLE, PTP_WIN32_IO_CALLBACK, PVOID , + PTP_CALLBACK_ENVIRON); +#define CreateThreadpoolIo my_CreateThreadpoolIo + +WEAK_SYMBOL(PTP_TIMER, CreateThreadpoolTimer, PTP_TIMER_CALLBACK , + PVOID pv, PTP_CALLBACK_ENVIRON pcbe); +#define CreateThreadpoolTimer my_CreateThreadpoolTimer + +WEAK_SYMBOL(PTP_WAIT, CreateThreadpoolWait, PTP_WAIT_CALLBACK, PVOID, + PTP_CALLBACK_ENVIRON); +#define CreateThreadpoolWait my_CreateThreadpoolWait + +WEAK_SYMBOL(VOID, DisassociateCurrentThreadFromCallback, PTP_CALLBACK_INSTANCE); +#define DisassociateCurrentThreadFromCallback my_DisassociateCurrentThreadFromCallback + +WEAK_SYMBOL(DWORD, FlsAlloc, PFLS_CALLBACK_FUNCTION); +#define FlsAlloc my_FlsAlloc + +WEAK_SYMBOL(PVOID, FlsGetValue, DWORD); +#define FlsGetValue my_FlsGetValue + +WEAK_SYMBOL(BOOL, FlsSetValue, DWORD, PVOID); +#define FlsSetValue my_FlsSetValue + +WEAK_SYMBOL(VOID, SetThreadpoolThreadMaximum, PTP_POOL, DWORD); +#define SetThreadpoolThreadMaximum my_SetThreadpoolThreadMaximum + +WEAK_SYMBOL(BOOL, SetThreadpoolThreadMinimum, PTP_POOL, DWORD); +#define SetThreadpoolThreadMinimum my_SetThreadpoolThreadMinimum + +WEAK_SYMBOL(VOID, SetThreadpoolTimer, PTP_TIMER, PFILETIME,DWORD,DWORD); +#define SetThreadpoolTimer my_SetThreadpoolTimer + +WEAK_SYMBOL(VOID, SetThreadpoolWait, PTP_WAIT,HANDLE,PFILETIME); +#define SetThreadpoolWait my_SetThreadpoolWait + +WEAK_SYMBOL(VOID, StartThreadpoolIo, PTP_IO); +#define StartThreadpoolIo my_StartThreadpoolIo + +WEAK_SYMBOL(VOID, WaitForThreadpoolIoCallbacks,PTP_IO, BOOL); +#define WaitForThreadpoolIoCallbacks my_WaitForThreadpoolIoCallbacks + +WEAK_SYMBOL(VOID, WaitForThreadpoolTimerCallbacks, PTP_TIMER, BOOL); +#define WaitForThreadpoolTimerCallbacks my_WaitForThreadpoolTimerCallbacks + +WEAK_SYMBOL(VOID, WaitForThreadpoolWaitCallbacks, PTP_WAIT, BOOL); +#define WaitForThreadpoolWaitCallbacks my_WaitForThreadpoolWaitCallbacks + +WEAK_SYMBOL(BOOL, SetFileCompletionNotificationModes, HANDLE, UCHAR); +#define SetFileCompletionNotificationModes my_SetFileCompletionNotificationModes + +WEAK_SYMBOL(BOOL, TrySubmitThreadpoolCallback, PTP_SIMPLE_CALLBACK pfns, + PVOID pv,PTP_CALLBACK_ENVIRON pcbe); +#define TrySubmitThreadpoolCallback my_TrySubmitThreadpoolCallback + +WEAK_SYMBOL(PTP_WORK, CreateThreadpoolWork, PTP_WORK_CALLBACK pfnwk, PVOID pv, + PTP_CALLBACK_ENVIRON pcbe); +#define CreateThreadpoolWork my_CreateThreadpoolWork + +WEAK_SYMBOL(VOID, SubmitThreadpoolWork,PTP_WORK pwk); +#define SubmitThreadpoolWork my_SubmitThreadpoolWork + +WEAK_SYMBOL(VOID, CloseThreadpoolWork, PTP_WORK pwk); +#define CloseThreadpoolWork my_CloseThreadpoolWork + +#if _MSC_VER >= 1600 +/* Stack size manipulation available only on Win7+ /declarations in VS10 */ +WEAK_SYMBOL(BOOL, SetThreadpoolStackInformation, PTP_POOL, + PTP_POOL_STACK_INFORMATION); +#define SetThreadpoolStackInformation my_SetThreadpoolStackInformation +#endif + +#if _MSC_VER < 1600 +#define SetThreadpoolCallbackPriority(env,prio) +typedef enum _TP_CALLBACK_PRIORITY { + TP_CALLBACK_PRIORITY_HIGH, + TP_CALLBACK_PRIORITY_NORMAL, + TP_CALLBACK_PRIORITY_LOW, + TP_CALLBACK_PRIORITY_INVALID +} TP_CALLBACK_PRIORITY; +#endif + + +/* Log a warning */ +static void tp_log_warning(const char *msg, const char *fct) +{ + sql_print_warning("Threadpool: %s. %s failed (last error %d)",msg, fct, + GetLastError()); +} + + +PTP_POOL pool; +DWORD fls; +extern int skip_net_wait_timeout; + +static bool skip_completion_port_on_success = false; + +/* + Threadpool callbacks. + + io_completion_callback - handle client request + timer_callback - handle wait timeout (kill connection) + shm_read_callback, shm_close_callback - shared memory stuff + login_callback - user login (submitted as threadpool work) + +*/ + +static void CALLBACK timer_callback(PTP_CALLBACK_INSTANCE instance, + PVOID context, PTP_TIMER timer); + +static void CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance, + PVOID context, PVOID overlapped, ULONG io_result, ULONG_PTR nbytes, PTP_IO io); + +static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance, + PVOID Context, PTP_WAIT wait,TP_WAIT_RESULT wait_result); + +static void CALLBACK shm_close_callback(PTP_CALLBACK_INSTANCE instance, + PVOID Context, PTP_WAIT wait,TP_WAIT_RESULT wait_result); + +#define CONNECTION_SIGNATURE 0xAFFEAFFE + +static void check_thread_init(); + +/* Get current time as Windows time */ +static ulonglong now() +{ + ulonglong current_time; + GetSystemTimeAsFileTime((PFILETIME)¤t_time); + return current_time; +} + +/* + Connection structure, encapsulates THD + structures for asynchronous + IO and pool. +*/ + +struct connection_t +{ + THD *thd; + bool logged_in; + HANDLE handle; + OVERLAPPED overlapped; + + /* absolute time for wait timeout (as Windows time) */ + volatile ulonglong timeout; + + PTP_CLEANUP_GROUP cleanup_group; + TP_CALLBACK_ENVIRON callback_environ; + + PTP_IO io; + PTP_TIMER timer; + PTP_WAIT shm_read; +}; + +void init_connection(connection_t *connection) +{ + connection->logged_in = false; + connection->handle= 0; + connection->io= 0; + connection->shm_read= 0; + connection->timer= 0; + connection->logged_in = false; + connection->timeout= ULONGLONG_MAX; + memset(&connection->overlapped, 0, sizeof(OVERLAPPED)); + InitializeThreadpoolEnvironment(&connection->callback_environ); + SetThreadpoolCallbackPool(&connection->callback_environ, pool); + connection->thd = 0; +} + +int init_io(connection_t *connection, THD *thd) +{ + connection->thd= thd; + Vio *vio = thd->net.vio; + switch(vio->type) + { + case VIO_TYPE_SSL: + case VIO_TYPE_TCPIP: + connection->handle= (HANDLE)vio->sd; + break; + case VIO_TYPE_NAMEDPIPE: + connection->handle= (HANDLE)vio->hPipe; + break; + case VIO_TYPE_SHARED_MEMORY: + connection->shm_read= CreateThreadpoolWait(shm_read_callback, connection, + &connection->callback_environ); + if (!connection->shm_read) + { + tp_log_warning("Allocation failed", "CreateThreadpoolWait"); + return -1; + } + break; + default: + abort(); + } + + if (connection->handle) + { + /* Performance tweaks (s. MSDN documentation)*/ + UCHAR flags = FILE_SKIP_SET_EVENT_ON_HANDLE; + if (skip_completion_port_on_success) + { + flags |= FILE_SKIP_COMPLETION_PORT_ON_SUCCESS; + } + (void)SetFileCompletionNotificationModes(connection->handle, flags); + + /* Assign io completion callback */ + connection->io = CreateThreadpoolIo(connection->handle, + io_completion_callback, connection, &connection->callback_environ); + if(!connection->io) + { + tp_log_warning("Allocation failed", "CreateThreadpoolWait"); + return -1; + } + } + connection->timer = CreateThreadpoolTimer(timer_callback, connection, + &connection->callback_environ); + if (!connection->timer) + { + tp_log_warning("Allocation failed", "CreateThreadpoolWait"); + return -1; + } + + return 0; +} + + +/* + Start asynchronous read +*/ +int start_io(connection_t *connection, PTP_CALLBACK_INSTANCE instance) +{ + /* Start async read */ + DWORD num_bytes = 0; + static char c; + WSABUF buf; + buf.buf= &c; + buf.len= 0; + DWORD flags=0; + DWORD last_error= 0; + + int retval; + Vio *vio= connection->thd->net.vio; + + if (vio->type == VIO_TYPE_SHARED_MEMORY) + { + SetThreadpoolWait(connection->shm_read, vio->event_server_wrote, NULL); + return 0; + } + if (vio->type == VIO_CLOSED) + { + return -1; + } + + DBUG_ASSERT(vio->type == VIO_TYPE_TCPIP || + vio->type == VIO_TYPE_SSL || + vio->type == VIO_TYPE_NAMEDPIPE); + + OVERLAPPED *overlapped= &connection->overlapped; + PTP_IO io= connection->io; + StartThreadpoolIo(io); + + if (vio->type == VIO_TYPE_TCPIP || vio->type == VIO_TYPE_SSL) + { + /* Start async io (sockets). */ + if (WSARecv(vio->sd , &buf, 1, &num_bytes, &flags, + overlapped, NULL) == 0) + { + retval= last_error= 0; + } + else + { + retval= -1; + last_error= WSAGetLastError(); + } + } + else + { + /* Start async io (named pipe) */ + if (ReadFile(vio->hPipe, &c, 0, &num_bytes ,overlapped)) + { + retval= last_error= 0; + } + else + { + retval= -1; + last_error= GetLastError(); + } + } + + if (retval == 0 || last_error == ERROR_MORE_DATA) + { + /* + IO successfully finished (synchronously). + If skip_completion_port_on_success is set, we need to handle it right + here, because completion callback would not be executed by the pool. + */ + if(skip_completion_port_on_success) + { + CancelThreadpoolIo(io); + io_completion_callback(instance, connection, overlapped, last_error, + num_bytes, io); + } + return 0; + } + + if(last_error == ERROR_IO_PENDING) + { + return 0; + } + + /* Some error occured */ + CancelThreadpoolIo(io); + return -1; +} + +int login(connection_t *connection, PTP_CALLBACK_INSTANCE instance) +{ + if (threadpool_add_connection(connection->thd) == 0 + && init_io(connection, connection->thd) == 0 + && start_io(connection, instance) == 0) + { + return 0; + } + return -1; +} + +/* + Recalculate wait timeout, maybe reset timer. +*/ +void set_wait_timeout(connection_t *connection, ulonglong old_timeout) +{ + ulonglong new_timeout = now() + + 10000000LL*connection->thd->variables.net_wait_timeout; + + if (new_timeout < old_timeout) + { + SetThreadpoolTimer(connection->timer, (PFILETIME) &new_timeout, 0, 1000); + } + connection->timeout = new_timeout; +} + +/* + Terminates (idle) connection by closing the socket. + This will activate io_completion_callback() in a different thread +*/ +void post_kill_notification(connection_t *connection) +{ + check_thread_init(); + THD *thd=connection->thd; + mysql_mutex_lock(&thd->LOCK_thd_data); + thd->killed = KILL_CONNECTION; + vio_shutdown(thd->net.vio, SHUT_RDWR); + thd->mysys_var= NULL; + mysql_mutex_unlock(&thd->LOCK_thd_data); +} + + +/* Connection destructor */ +void destroy_connection(connection_t *connection) +{ + if (connection->thd) + { + threadpool_remove_connection(connection->thd); + } + + if (connection->io) + { + WaitForThreadpoolIoCallbacks(connection->io, TRUE); + CloseThreadpoolIo(connection->io); + } + + if(connection->shm_read) + { + WaitForThreadpoolWaitCallbacks(connection->shm_read, TRUE); + CloseThreadpoolWait(connection->shm_read); + } + + if(connection->timer) + { + SetThreadpoolTimer(connection->timer, 0, 0, 0); + WaitForThreadpoolTimerCallbacks(connection->timer, TRUE); + CloseThreadpoolTimer(connection->timer); + } + + DestroyThreadpoolEnvironment(&connection->callback_environ); +} + + + +/* + This function should be called first whenever a callback is invoked in the + threadpool, does my_thread_init() if not yet done +*/ +extern ulong thread_created; +static void check_thread_init() +{ + if (FlsGetValue(fls) == NULL) + { + FlsSetValue(fls, (void *)1); + my_thread_init(); + thread_created++; + InterlockedIncrement((volatile long *)&tp_stats.num_worker_threads); + } +} + + +/* + Take care of proper cleanup when threadpool threads exit. + We do not control how threads are created, thus it is our responsibility to + check that my_thread_init() is called on thread initialization and + my_thread_end() on thread destruction. On Windows, FlsAlloc() provides the + thread destruction callbacks. +*/ +static VOID WINAPI thread_destructor(void *data) +{ + if(data) + { + if (InterlockedDecrement((volatile long *)&tp_stats.num_worker_threads) >= 0) + { + /* + The above check for number of thread >= 0 is due to shutdown code ( + see tp_end()) where we forcefully set num_worker_threads to 0, even + if not all threads have shut down yet to the point they would ran Fls + destructors, even after CloseThreadpool(). See also comment in tp_end(). + */ + mysql_mutex_lock(&LOCK_thread_count); + my_thread_end(); + mysql_mutex_unlock(&LOCK_thread_count); + } + } +} + + +/* Scheduler callback : init */ +bool tp_init(void) +{ + fls= FlsAlloc(thread_destructor); + pool= CreateThreadpool(NULL); + if(!pool) + { + sql_print_error("Can't create threadpool. " + "CreateThreadpool() failed with %d. Likely cause is memory pressure", + GetLastError()); + exit(1); + } + + if (threadpool_max_threads) + { + SetThreadpoolThreadMaximum(pool,threadpool_max_threads); + } + + if (threadpool_min_threads) + { + if (!SetThreadpoolThreadMinimum(pool, threadpool_min_threads)) + { + tp_log_warning( "Can't set threadpool minimum threads", + "SetThreadpoolThreadMinimum"); + } + } + + /* + Control stack size (OS must be Win7 or later, plus corresponding SDK) + */ +#if _MSC_VER >=1600 + if (SetThreadpoolStackInformation) + { + TP_POOL_STACK_INFORMATION stackinfo; + stackinfo.StackCommit = 0; + stackinfo.StackReserve = my_thread_stack_size; + if (!SetThreadpoolStackInformation(pool, &stackinfo)) + { + tp_log_warning("Can't set threadpool stack size", + "SetThreadpoolStackInformation"); + } + } +#endif + + skip_net_wait_timeout = 1; + return 0; +} + + +/* + Scheduler callback : Destroy the scheduler. +*/ + +extern "C" uint THR_thread_count; +extern "C" mysql_mutex_t THR_LOCK_threads; +extern "C" mysql_cond_t THR_COND_threads; + +void tp_end(void) +{ + if(pool) + { + SetThreadpoolThreadMaximum(pool, 0); + CloseThreadpool(pool); + + /* + Tell my_global_thread_end() we're complete. + + This would not be necessary if CloseThreadpool() would synchronously + release all threads and wait until they disappear and call all their FLS + destrructors . However, threads in the pool are released asynchronously + and might spend some time in the CRT shutdown code. Thus zero + num_worker_threads, to avoid thread destructor's my_thread_end()s after + this point. + */ + LONG remaining_threads= + InterlockedExchange( (volatile long *)&tp_stats.num_worker_threads, 0); + + if (remaining_threads) + { + mysql_mutex_lock(&THR_LOCK_threads); + THR_thread_count -= remaining_threads; + mysql_cond_signal(&THR_COND_threads); + mysql_mutex_unlock(&THR_LOCK_threads); + } + } + skip_net_wait_timeout= 0; +} + +/* + Notify pool about connection being killed. +*/ +void tp_post_kill_notification(THD *thd) +{ + if (current_thd == thd) + return; /* There is nothing to do.*/ + + if (thd->system_thread) + return; /* Will crash if we attempt to kill system thread. */ + + Vio *vio= thd->net.vio; + + vio_shutdown(vio, SD_BOTH); + +} + +/* + Handle read completion/notification. +*/ +static VOID CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance, + PVOID context, PVOID overlapped, ULONG io_result, ULONG_PTR nbytes, PTP_IO io) +{ + if(instance) + { + check_thread_init(); + } + + connection_t *connection = (connection_t*)context; + THD *thd= connection->thd; + ulonglong old_timeout = connection->timeout; + connection->timeout = ULONGLONG_MAX; + + if (threadpool_process_request(connection->thd)) + goto error; + + set_wait_timeout(connection, old_timeout); + if(start_io(connection, instance)) + goto error; + + return; + +error: + /* Some error has occured. */ + if (instance) + DisassociateCurrentThreadFromCallback(instance); + + destroy_connection(connection); + my_free(connection); +} + + +/* Simple callback for login */ +static void CALLBACK login_callback(PTP_CALLBACK_INSTANCE instance, + PVOID context, PTP_WORK work) +{ + if(instance) + { + check_thread_init(); + } + + connection_t *connection =(connection_t *)context; + if (login(connection, instance) != 0) + { + destroy_connection(connection); + my_free(connection); + } +} + +/* + Timer callback. + Invoked when connection times out (wait_timeout) +*/ +static VOID CALLBACK timer_callback(PTP_CALLBACK_INSTANCE instance, + PVOID parameter, PTP_TIMER timer) +{ + check_thread_init(); + + connection_t *con= (connection_t*)parameter; + ulonglong timeout= con->timeout; + + if (timeout <= now()) + { + con->thd->killed = KILL_CONNECTION; + if(con->thd->net.vio) + vio_shutdown(con->thd->net.vio, SD_BOTH); + } + else if(timeout != ULONGLONG_MAX) + { + /* + Reset timer. + There is a tiny possibility of a race condition, since the value of timeout + could have changed to smaller value in the thread doing io callback. + + Given the relative unimportance of the wait timeout, we accept race + condition. + */ + SetThreadpoolTimer(timer, (PFILETIME)&timeout, 0, 1000); + } +} + + +/* + Shared memory read callback. + Invoked when read event is set on connection. +*/ +static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance, + PVOID context, PTP_WAIT wait,TP_WAIT_RESULT wait_result) +{ + connection_t *con= (connection_t *)context; + /* Disarm wait. */ + SetThreadpoolWait(wait, NULL, NULL); + + /* + This is an autoreset event, and one wakeup is eaten already by threadpool, + and the current state is "not set". Thus we need to reset the event again, + or vio_read will hang. + */ + HANDLE h = con->thd->net.vio->event_server_wrote; + SetEvent(h); + io_completion_callback(instance, context, NULL, 0, 0 , 0); +} + + +/* + Notify the thread pool about a new connection. + NOTE: LOCK_thread_count is locked on entry. This function must unlock it. +*/ +void tp_add_connection(THD *thd) +{ + bool success = false; + connection_t *con = (connection_t *)my_malloc(sizeof(connection_t), 0); + + if (con) + threads.append(thd); + mysql_mutex_unlock(&LOCK_thread_count); + + if(!con) + { + tp_log_warning("Allocation failed", "tp_add_connection"); + return; + } + + init_connection(con); + con->thd= thd; + /* Try to login asynchronously, using threads in the pool */ + PTP_WORK wrk = CreateThreadpoolWork(login_callback,con, &con->callback_environ); + if (wrk) + { + SubmitThreadpoolWork(wrk); + CloseThreadpoolWork(wrk); + } + else + { + /* Likely memory pressure */ + login_callback(NULL, con, NULL); /* deletes connection if something goes wrong */ + } +} + + + +/* + Sets the number of idle threads the thread pool maintains in anticipation of new + requests. +*/ +void tp_set_min_threads(uint val) +{ + if (pool) + SetThreadpoolThreadMinimum(pool, val); +} + +void tp_set_max_threads(uint val) +{ + if (pool) + SetThreadpoolThreadMaximum(pool, val); +} + +void tp_wait_begin(THD *thd, int type) +{ + if (thd && thd->event_scheduler.data) + { + /* TODO: call CallbackMayRunLong() */ + } +} + +void tp_wait_end(THD *thd) +{ + /* Do we need to do anything ? */ +} + diff --git a/vio/vio.c b/vio/vio.c index b8bc7bdae08..aa0d2012afa 100644 --- a/vio/vio.c +++ b/vio/vio.c @@ -49,6 +49,25 @@ static my_bool has_no_data(Vio *vio __attribute__((unused))) return FALSE; } +#ifdef _WIN32 +my_bool vio_shared_memory_has_data(Vio *vio) +{ + return (vio->shared_memory_remain > 0); +} + +int vio_shared_memory_shutdown(Vio *vio, int how) +{ + SetEvent(vio->event_conn_closed); + SetEvent(vio->event_server_wrote); + return 0; +} + +int vio_pipe_shutdown(Vio *vio, int how) +{ + return vio_socket_shutdown(vio, how); /* cancels io */ +} +#endif + /* * Helper to fill most of the Vio* with defaults. */ @@ -89,6 +108,7 @@ static void vio_init(Vio* vio, enum enum_vio_type type, vio->poll_read =no_poll_read; vio->is_connected =vio_is_connected_pipe; vio->has_data =has_no_data; + vio->shutdown =vio_pipe_shutdown; vio->timeout=vio_win32_timeout; /* Set default timeout */ @@ -116,7 +136,8 @@ static void vio_init(Vio* vio, enum enum_vio_type type, vio->poll_read =no_poll_read; vio->is_connected =vio_is_connected_shared_memory; - vio->has_data =has_no_data; + vio->has_data =vio_shared_memory_has_data; + vio->shutdown =vio_shared_memory_shutdown; /* Currently, shared memory is on Windows only, hence the below is ok*/ vio->timeout= vio_win32_timeout; @@ -145,6 +166,7 @@ static void vio_init(Vio* vio, enum enum_vio_type type, vio->poll_read =vio_poll_read; vio->is_connected =vio_is_connected; vio->has_data =vio_ssl_has_data; + vio->shutdown =vio_socket_shutdown; DBUG_VOID_RETURN; } #endif /* HAVE_OPENSSL */ @@ -163,6 +185,7 @@ static void vio_init(Vio* vio, enum enum_vio_type type, vio->timeout =vio_timeout; vio->poll_read =vio_poll_read; vio->is_connected =vio_is_connected; + vio->shutdown =vio_socket_shutdown; vio->has_data= (flags & VIO_BUFFERED_READ) ? vio_buff_has_data : has_no_data; DBUG_VOID_RETURN; diff --git a/vio/vio_priv.h b/vio/vio_priv.h index 702ba4de38a..3f62c508375 100644 --- a/vio/vio_priv.h +++ b/vio/vio_priv.h @@ -39,6 +39,7 @@ size_t vio_read_pipe(Vio *vio, uchar * buf, size_t size); size_t vio_write_pipe(Vio *vio, const uchar * buf, size_t size); my_bool vio_is_connected_pipe(Vio *vio); int vio_close_pipe(Vio * vio); +int vio_shutdown_pipe(Vio *vio,int how); #endif #ifdef HAVE_SMEM @@ -46,8 +47,11 @@ size_t vio_read_shared_memory(Vio *vio, uchar * buf, size_t size); size_t vio_write_shared_memory(Vio *vio, const uchar * buf, size_t size); my_bool vio_is_connected_shared_memory(Vio *vio); int vio_close_shared_memory(Vio * vio); +my_bool vio_shared_memory_has_data(Vio *vio); +int vio_shutdown_shared_memory(Vio *vio, int how); #endif +int vio_socket_shutdown(Vio *vio, int how); void vio_timeout(Vio *vio,uint which, uint timeout); my_bool vio_buff_has_data(Vio *vio); diff --git a/vio/viosocket.c b/vio/viosocket.c index 4772847abd8..1f129cb8e55 100644 --- a/vio/viosocket.c +++ b/vio/viosocket.c @@ -131,6 +131,60 @@ size_t vio_write(Vio * vio, const uchar* buf, size_t size) DBUG_RETURN(r); } +#ifdef _WIN32 +static void CALLBACK cancel_io_apc(ULONG_PTR data) +{ + CancelIo((HANDLE)data); +} + +/* + Cancel IO on Windows. + + On XP, issue CancelIo as asynchronous procedure call to the thread that started + IO. On Vista+, simpler cancelation is done with CancelIoEx. +*/ + +static int cancel_io(HANDLE handle, DWORD thread_id) +{ + static BOOL (WINAPI *fp_CancelIoEx) (HANDLE, OVERLAPPED *); + static volatile int first_time= 1; + int rc; + HANDLE thread_handle; + + if (first_time) + { + /* Try to load CancelIoEx using GetProcAddress */ + InterlockedCompareExchangePointer((volatile void *)&fp_CancelIoEx, + GetProcAddress(GetModuleHandle("kernel32"), "CancelIoEx"), NULL); + first_time =0; + } + + if (fp_CancelIoEx) + { + return fp_CancelIoEx(handle, NULL)? 0 :-1; + } + + thread_handle= OpenThread(THREAD_SET_CONTEXT, FALSE, thread_id); + if (thread_handle) + { + rc= QueueUserAPC(cancel_io_apc, thread_handle, (ULONG_PTR)handle); + CloseHandle(thread_handle); + } + return rc; + +} +#endif + +int vio_socket_shutdown(Vio *vio, int how) +{ +#ifdef _WIN32 + return cancel_io((HANDLE)vio->sd, vio->thread_id); +#else + return shutdown(vio->sd, how); +#endif +} + + int vio_blocking(Vio * vio __attribute__((unused)), my_bool set_blocking_mode, my_bool *old_mode) { @@ -726,6 +780,22 @@ void vio_timeout(Vio *vio, uint which, uint timeout) #ifdef __WIN__ +/* + Disable posting IO completion event to the port. + In some cases (synchronous timed IO) we want to skip IOCP notifications. +*/ +static void disable_iocp_notification(OVERLAPPED *overlapped) +{ + HANDLE *handle = &(overlapped->hEvent); + *handle = ((HANDLE)((ULONG_PTR) *handle|1)); +} + +/* Enable posting IO completion event to the port */ +static void enable_iocp_notification(OVERLAPPED *overlapped) +{ + HANDLE *handle = &(overlapped->hEvent); + *handle = (HANDLE)((ULONG_PTR) *handle & ~1); +} /* Finish pending IO on pipe. Honor wait timeout @@ -737,7 +807,7 @@ static size_t pipe_complete_io(Vio* vio, char* buf, size_t size, DWORD timeout_m DBUG_ENTER("pipe_complete_io"); - ret= WaitForSingleObject(vio->pipe_overlapped.hEvent, timeout_ms); + ret= WaitForSingleObjectEx(vio->pipe_overlapped.hEvent, timeout_ms, TRUE); /* WaitForSingleObjects will normally return WAIT_OBJECT_O (success, IO completed) or WAIT_TIMEOUT. @@ -767,7 +837,8 @@ size_t vio_read_pipe(Vio * vio, uchar *buf, size_t size) DBUG_ENTER("vio_read_pipe"); DBUG_PRINT("enter", ("sd: %d buf: 0x%lx size: %u", vio->sd, (long) buf, (uint) size)); - + + disable_iocp_notification(&vio->pipe_overlapped); if (ReadFile(vio->hPipe, buf, (DWORD)size, &bytes_read, &(vio->pipe_overlapped))) { @@ -777,13 +848,14 @@ size_t vio_read_pipe(Vio * vio, uchar *buf, size_t size) { if (GetLastError() != ERROR_IO_PENDING) { + enable_iocp_notification(&vio->pipe_overlapped); DBUG_PRINT("error",("ReadFile() returned last error %d", GetLastError())); DBUG_RETURN((size_t)-1); } retval= pipe_complete_io(vio, buf, size,vio->read_timeout_ms); } - + enable_iocp_notification(&vio->pipe_overlapped); DBUG_PRINT("exit", ("%lld", (longlong)retval)); DBUG_RETURN(retval); } @@ -796,7 +868,7 @@ size_t vio_write_pipe(Vio * vio, const uchar* buf, size_t size) DBUG_ENTER("vio_write_pipe"); DBUG_PRINT("enter", ("sd: %d buf: 0x%lx size: %u", vio->sd, (long) buf, (uint) size)); - + disable_iocp_notification(&vio->pipe_overlapped); if (WriteFile(vio->hPipe, buf, (DWORD)size, &bytes_written, &(vio->pipe_overlapped))) { @@ -804,6 +876,7 @@ size_t vio_write_pipe(Vio * vio, const uchar* buf, size_t size) } else { + enable_iocp_notification(&vio->pipe_overlapped); if (GetLastError() != ERROR_IO_PENDING) { DBUG_PRINT("vio_error",("WriteFile() returned last error %d", @@ -812,7 +885,7 @@ size_t vio_write_pipe(Vio * vio, const uchar* buf, size_t size) } retval= pipe_complete_io(vio, (char *)buf, size, vio->write_timeout_ms); } - + enable_iocp_notification(&vio->pipe_overlapped); DBUG_PRINT("exit", ("%lld", (longlong)retval)); DBUG_RETURN(retval); } |