diff options
author | Vladislav Vaintroub <wlad@mariadb.com> | 2016-09-21 14:28:42 +0000 |
---|---|---|
committer | Vladislav Vaintroub <wlad@mariadb.com> | 2016-09-22 17:01:28 +0000 |
commit | f7a7c0c2fec3dcca331bb529f8314273360c72ae (patch) | |
tree | 2e04f4036bd7def676d85690e67e393ec0c41a8e /sql/threadpool_common.cc | |
parent | f32a5115584c9b33a2163df57830ad335cd2b3ab (diff) | |
download | mariadb-git-f7a7c0c2fec3dcca331bb529f8314273360c72ae.tar.gz |
MDEV-10297 Add priorization to threadpool
Also MDEV-10385 Threadpool refactoring
Diffstat (limited to 'sql/threadpool_common.cc')
-rw-r--r-- | sql/threadpool_common.cc | 214 |
1 files changed, 197 insertions, 17 deletions
diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc index d6c343dc04e..2308f4277d6 100644 --- a/sql/threadpool_common.cc +++ b/sql/threadpool_common.cc @@ -34,14 +34,25 @@ uint threadpool_max_size; uint threadpool_stall_limit; uint threadpool_max_threads; uint threadpool_oversubscribe; +uint threadpool_mode; +uint threadpool_prio_kickup_timer; /* Stats */ TP_STATISTICS tp_stats; +static void threadpool_remove_connection(THD *thd); +static int threadpool_process_request(THD *thd); +static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data); + extern "C" pthread_key(struct st_my_thread_var*, THR_KEY_mysys); extern bool do_command(THD*); +static inline TP_connection *get_TP_connection(THD *thd) +{ + return (TP_connection *)thd->event_scheduler.data; +} + /* Worker threads contexts, and THD contexts. ========================================= @@ -105,15 +116,81 @@ static void thread_attach(THD* thd) #endif } +/* + Determine connection priority , using current + transaction state and 'threadpool_priority' variable value. +*/ +static TP_PRIORITY get_priority(TP_connection *c) +{ + DBUG_ASSERT(c->thd == current_thd); + TP_PRIORITY prio= (TP_PRIORITY)c->thd->variables.threadpool_priority; + if (prio == TP_PRIORITY_AUTO) + { + return c->thd->transaction.is_active() ? TP_PRIORITY_HIGH : TP_PRIORITY_LOW; + } + return prio; +} -THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data) + +void tp_callback(TP_connection *c) { - THD *thd= NULL; - int error=1; + DBUG_ASSERT(c); Worker_thread_context worker_context; worker_context.save(); + THD *thd= c->thd; + + c->state = TP_STATE_RUNNING; + + if (!thd) + { + /* No THD, need to login first. */ + DBUG_ASSERT(c->connect); + thd= c->thd= threadpool_add_connection(c->connect, c); + if (!thd) + { + /* Bail out on connect error.*/ + goto error; + } + c->connect= 0; + } + else if (threadpool_process_request(thd)) + { + /* QUIT or an error occured. */ + goto error; + } + + /* Set priority */ + c->priority= get_priority(c); + + /* Read next command from client. */ + c->set_io_timeout(thd->variables.net_wait_timeout); + c->state= TP_STATE_IDLE; + if (c->start_io()) + goto error; + + worker_context.restore(); + return; + +error: + c->thd= 0; + delete c; + + if (thd) + { + threadpool_remove_connection(thd); + } + worker_context.restore(); +} + + +static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data) +{ + THD *thd= NULL; + int error=1; + + /* Create a new connection context: mysys_thread_var and PSI thread Store them in THD. @@ -137,7 +214,6 @@ THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data) #endif my_thread_end(); } - worker_context.restore(); return NULL; } delete connect; @@ -184,17 +260,14 @@ THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data) threadpool_remove_connection(thd); thd= NULL; } - worker_context.restore(); return thd; } -void threadpool_remove_connection(THD *thd) +static void threadpool_remove_connection(THD *thd) { - Worker_thread_context worker_context; - worker_context.save(); thread_attach(thd); - + thd->event_scheduler.data= 0; thd->net.reading_or_writing = 0; end_connection(thd); close_connection(thd, 0); @@ -206,19 +279,14 @@ void threadpool_remove_connection(THD *thd) mysys thread_var and PSI thread. */ my_thread_end(); - - worker_context.restore(); } /** Process a single client request or a single batch. */ -int threadpool_process_request(THD *thd) +static int threadpool_process_request(THD *thd) { int retval= 0; - Worker_thread_context worker_context; - worker_context.save(); - thread_attach(thd); if (thd->killed >= KILL_CONNECTION) @@ -268,7 +336,6 @@ int threadpool_process_request(THD *thd) } end: - worker_context.restore(); return retval; } @@ -286,6 +353,119 @@ static bool tp_end_thread(THD *, bool) return 0; } +static TP_pool *pool; + +static bool tp_init() +{ + +#ifdef _WIN32 + if (threadpool_mode == TP_MODE_WINDOWS) + pool= new (std::nothrow) TP_pool_win; + else + pool= new (std::nothrow) TP_pool_generic; +#else + pool= new (std::nothrow) TP_pool_generic; +#endif + if (!pool) + return true; + if (pool->init()) + { + delete pool; + pool= 0; + return true; + } + return false; +} + +static void tp_add_connection(CONNECT *connect) +{ + TP_connection *c= pool->new_connection(connect); + DBUG_EXECUTE_IF("simulate_failed_connection_1", delete c ; c= 0;); + if (c) + pool->add(c); + else + connect->close_and_delete(); +} + +int tp_get_idle_thread_count() +{ + return pool? pool->get_idle_thread_count(): 0; +} + +int tp_get_thread_count() +{ + return pool ? pool->get_thread_count() : 0; +} + +void tp_set_min_threads(uint val) +{ + if (pool) + pool->set_min_threads(val); +} + + +void tp_set_max_threads(uint val) +{ + if (pool) + pool->set_max_threads(val); +} + +void tp_set_threadpool_size(uint val) +{ + if (pool) + pool->set_pool_size(val); +} + + +void tp_set_threadpool_stall_limit(uint val) +{ + if (pool) + pool->set_stall_limit(val); +} + + +void tp_timeout_handler(TP_connection *c) +{ + if (c->state != TP_STATE_IDLE) + return; + THD *thd=c->thd; + mysql_mutex_lock(&thd->LOCK_thd_data); + thd->killed= KILL_CONNECTION; + c->priority= TP_PRIORITY_HIGH; + post_kill_notification(thd); + mysql_mutex_unlock(&thd->LOCK_thd_data); +} + + +static void tp_wait_begin(THD *thd, int type) +{ + TP_connection *c = get_TP_connection(thd); + if (c) + c->wait_begin(type); +} + + +static void tp_wait_end(THD *thd) +{ + TP_connection *c = get_TP_connection(thd); + if (c) + c->wait_end(); +} + + +static void tp_end() +{ + delete pool; +} + +static void tp_post_kill_notification(THD *thd) +{ + TP_connection *c= get_TP_connection(thd); + if (c) + c->priority= TP_PRIORITY_HIGH; + post_kill_notification(thd); +} + static scheduler_functions tp_scheduler_functions= { 0, // max_threads @@ -296,7 +476,7 @@ static scheduler_functions tp_scheduler_functions= tp_add_connection, // add_connection tp_wait_begin, // thd_wait_begin tp_wait_end, // thd_wait_end - post_kill_notification, // post_kill_notification + tp_post_kill_notification, // post kill notification tp_end_thread, // Dummy function tp_end // end }; |