diff options
-rw-r--r-- | sql/threadpool.h | 32 | ||||
-rw-r--r-- | sql/threadpool_common.cc | 79 | ||||
-rw-r--r-- | sql/threadpool_unix.cc | 663 |
3 files changed, 454 insertions, 320 deletions
diff --git a/sql/threadpool.h b/sql/threadpool.h index 966dcbc18e0..8c991aab2cb 100644 --- a/sql/threadpool.h +++ b/sql/threadpool.h @@ -9,30 +9,28 @@ extern uint threadpool_stall_limit; /* time interval in 10 ms units for stall c extern uint threadpool_max_threads; /* Maximum threads in pool */ extern uint threadpool_oversubscribe; /* Maximum active threads in group */ + +/* + Functions used by scheduler. + OS-specific implementations are in + threadpool_unix.cc or threadpool_win.cc +*/ +extern bool tp_init(); +extern void tp_add_connection(THD*); +extern void tp_wait_begin(THD *, int); +extern void tp_wait_end(THD*); +extern void tp_post_kill_notification(THD *thd); +extern void tp_end(void); + /* Threadpool statistics */ struct TP_STATISTICS { /* Current number of worker thread. */ - volatile int num_worker_threads; + volatile int32 num_worker_threads; /* Current number of idle threads. */ - volatile int num_waiting_threads; - /* Number of login requests are queued but not yet processed. */ - volatile int pending_login_requests; - /* Number of threads that are starting. */ - volatile int pending_thread_starts; - /* Number of threads that are being shut down */ - volatile int pending_thread_shutdowns; - /* Time (in milliseconds) since pool is blocked (num_waiting_threads is 0) */ - ulonglong pool_block_duration; - /* Maximum duration of the pending login, im milliseconds. */ - ulonglong pending_login_duration; - /* Time since last thread was created */ - ulonglong time_since_last_thread_creation; - /* Number of requests processed since pool monitor run last time. */ - volatile int requests_dequeued; - volatile int requests_completed; + volatile int32 num_waiting_threads; }; extern TP_STATISTICS tp_stats; diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc index 2ed9e6f2ba6..5152f62efe9 100644 --- a/sql/threadpool_common.cc +++ b/sql/threadpool_common.cc @@ -7,15 +7,9 @@ #include <sql_connect.h> #include <sql_audit.h> #include <debug_sync.h> +#include <threadpool.h> -extern bool login_connection(THD *thd); -extern bool do_command(THD *thd); -extern void prepare_new_connection_state(THD* thd); -extern void end_connection(THD *thd); -extern void thd_cleanup(THD *thd); -extern void delete_thd(THD *thd); - /* Threadpool parameters */ uint threadpool_min_threads; @@ -27,14 +21,15 @@ uint threadpool_oversubscribe; extern "C" pthread_key(struct st_my_thread_var*, THR_KEY_mysys); +extern bool do_command(THD*); /* Worker threads contexts, and THD contexts. - ===================================== + ========================================= Both worker threads and connections have their sets of thread local variables - At the moment it is mysys_var (which has e.g dbug my_error and similar - goodies inside), and PSI per-client structure. + At the moment it is mysys_var (this has specific data for dbug, my_error and + similar goodies), and PSI per-client structure. Whenever query is executed following needs to be done: @@ -77,7 +72,7 @@ struct Worker_thread_context /* Attach/associate the connection with the OS thread, */ -static inline bool thread_attach(THD* thd) +static bool thread_attach(THD* thd) { pthread_setspecific(THR_KEY_mysys,thd->mysys_var); thd->thread_stack=(char*)&thd; @@ -95,11 +90,10 @@ int threadpool_add_connection(THD *thd) worker_context.save(); /* - Create a new connection context: mysys_thread_var and PSI thread - Store them in thd->mysys_var and thd->scheduler.m_psi. + Create a new connection context: mysys_thread_var and PSI thread + Store them in THD. */ - /* Use my_thread_init() to create new mysys_thread_var. */ pthread_setspecific(THR_KEY_mysys, 0); my_thread_init(); thd->mysys_var= (st_my_thread_var *)pthread_getspecific(THR_KEY_mysys); @@ -125,21 +119,29 @@ int threadpool_add_connection(THD *thd) thd->start_utime= now; thd->thr_create_utime= now; - if (setup_connection_thread_globals(thd) == 0) + if (!setup_connection_thread_globals(thd)) { - if (login_connection(thd) == 0) + if (!login_connection(thd)) { - prepare_new_connection_state(thd); - retval = thd_is_connection_alive(thd)?0:-1; - thd->net.reading_or_writing= 1; + prepare_new_connection_state(thd); + + /* + Check if THD is ok, as prepare_new_connection_state() + can fail, for example if init command failed. + */ + if (thd_is_connection_alive(thd)) + { + retval= 0; + thd->net.reading_or_writing= 1; + thd->skip_wait_timeout= true; + } } } - thd->skip_wait_timeout= true; - worker_context.restore(); return retval; } + void threadpool_remove_connection(THD *thd) { @@ -147,9 +149,7 @@ void threadpool_remove_connection(THD *thd) worker_context.save(); thread_attach(thd); - thd->killed= KILL_CONNECTION; - thd->net.reading_or_writing= 0; end_connection(thd); @@ -163,11 +163,13 @@ void threadpool_remove_connection(THD *thd) mysql_mutex_unlock(&LOCK_thread_count); mysql_cond_broadcast(&COND_thread_count); - /* Free resources (thread_var and PSI connection specific struct)*/ + /* + Free resources associated with this connection: + mysys thread_var and PSI thread. + */ my_thread_end(); worker_context.restore(); - } int threadpool_process_request(THD *thd) @@ -181,8 +183,8 @@ int threadpool_process_request(THD *thd) if (thd->killed >= KILL_CONNECTION) { /* - kill flag can be set have been killed by - timeout handler or by a KILL command + killed flag was set by timeout handler + or KILL command. Return error. */ worker_context.restore(); return 1; @@ -206,33 +208,18 @@ int threadpool_process_request(THD *thd) vio= thd->net.vio; if (!vio->has_data(vio)) { - /* - More info on this debug sync is in sql_parse.cc - */ + /* More info on this debug sync is in sql_parse.cc*/ DEBUG_SYNC(thd, "before_do_command_net_read"); + thd->net.reading_or_writing= 1; break; } - } - if (!retval) - thd->net.reading_or_writing= 1; + } worker_context.restore(); return retval; } -/* - Scheduler struct, individual functions are implemented - in threadpool_unix.cc or threadpool_win.cc -*/ - -extern bool tp_init(); -extern void tp_add_connection(THD*); -extern void tp_wait_begin(THD *, int); -extern void tp_wait_end(THD*); -extern void tp_post_kill_notification(THD *thd); -extern void tp_end(void); - static scheduler_functions tp_scheduler_functions= { 0, // max_threads @@ -255,7 +242,7 @@ void pool_of_threads_scheduler(struct scheduler_functions *func, uint *arg_connection_count) { *func = tp_scheduler_functions; - func->max_threads= *arg_max_connections + 1; + func->max_threads= threadpool_max_threads; func->max_connections= arg_max_connections; func->connection_count= arg_connection_count; scheduler_init(); diff --git a/sql/threadpool_unix.cc b/sql/threadpool_unix.cc index ec9f5a91d40..b6eeb5bcffd 100644 --- a/sql/threadpool_unix.cc +++ b/sql/threadpool_unix.cc @@ -7,9 +7,8 @@ #include <sql_connect.h> #include <mysqld.h> #include <debug_sync.h> -#include <sys/queue.h> #include <time.h> - +#include <sql_plist.h> #include <threadpool.h> #ifdef __linux__ #include <sys/epoll.h> @@ -25,6 +24,13 @@ typedef port_event_t native_event; #endif +/* + Define PSI Keys for performance schema. + We have a mutex per group, worker threads, condition per worker thread, + and timer thread with its own mutex and condition. +*/ + + static PSI_mutex_key key_group_mutex; static PSI_mutex_key key_timer_mutex; static PSI_mutex_info mutex_list[]= @@ -49,53 +55,73 @@ static PSI_thread_info thread_list[] = {&key_timer_thread, "timer_thread", PSI_FLAG_GLOBAL} }; +/* Macro to simplify performance schema registration */ +#define PSI_register(X) \ + if(PSI_server) PSI_server->register_ ## X("threadpool", X ## _list, array_elements(X ## _list)) + TP_STATISTICS tp_stats; - struct thread_group_t; /* Per-thread structure for workers */ struct worker_thread_t { + ulonglong event_count; /* number of request handled by this thread */ + thread_group_t* thread_group; + worker_thread_t *next_in_list; + worker_thread_t **prev_in_list; + mysql_cond_t cond; - bool woken; - thread_group_t* thread_group; - ulonglong event_count; /* Stats: number of executed requests */ - SLIST_ENTRY(worker_thread_t) ptr; + bool woken; }; -/* - Data associated with an io event (also can be sent with with explicit - post_event()) -*/ -struct pool_event_t +typedef I_P_List<worker_thread_t, I_P_List_adapter<worker_thread_t, + &worker_thread_t::next_in_list, + &worker_thread_t::prev_in_list> + > +worker_list_t; + +struct connection_t { - STAILQ_ENTRY (pool_event_t) next; - void *data; + + THD *thd; + thread_group_t *thread_group; + connection_t *next_in_queue; + connection_t **prev_in_queue; + ulonglong abs_wait_timeout; + bool logged_in; + bool waiting; }; -static pool_event_t POOL_SHUTDOWN_EVENT; + +typedef I_P_List<connection_t, + I_P_List_adapter<connection_t, + &connection_t::next_in_queue, + &connection_t::prev_in_queue>, + I_P_List_null_counter, + I_P_List_fast_push_back<connection_t> > +connection_queue_t; struct thread_group_t { - mysql_mutex_t mutex; - STAILQ_HEAD(queue_listhead, pool_event_t) queue; - SLIST_HEAD(wait_listhead, worker_thread_t) waiting_threads; + connection_queue_t queue; + worker_list_t waiting_threads; + worker_thread_t *listener; + pthread_attr_t *pthread_attr; int pollfd; int thread_count; int active_thread_count; int pending_thread_start_count; - int connection_count; + int connection_count; + /* Stats for the deadlock detection timer routine.*/ + int io_event_count; + int queue_event_count; + ulonglong last_thread_creation_time; + int shutdown_pipe[2]; bool shutdown; bool stalled; - int shutdown_pipe[2]; - worker_thread_t *listener; - pthread_attr_t *pthread_attr; - ulonglong last_thread_creation_time; - /* Stats for the deadlock detection timer routine.*/ - ulonglong io_event_count; - ulonglong queue_event_count; + } MY_ALIGNED(512); static thread_group_t all_groups[MAX_THREAD_GROUPS]; @@ -106,33 +132,21 @@ struct pool_timer_t { mysql_mutex_t mutex; mysql_cond_t cond; - int tick_interval; - volatile ulonglong current_microtime; - volatile ulonglong next_timeout_check; + volatile uint64 current_microtime; + volatile uint64 next_timeout_check; + int tick_interval; bool shutdown; }; static pool_timer_t pool_timer; -struct connection_t -{ - pool_event_t event; - THD *thd; - thread_group_t *thread_group; - ulonglong abs_wait_timeout; - bool logged_in; - bool waiting; -}; - /* Externals functions and variables we use */ extern void scheduler_init(); extern pthread_attr_t *get_connection_attrib(void); -extern int skip_net_wait_timeout; - -static void post_event(thread_group_t *thread_group, pool_event_t* ev); +static void queue_put(thread_group_t *thread_group, connection_t *connection); static int wake_thread(thread_group_t *thread_group); -static void handle_event(pool_event_t *ev); +static void handle_event(connection_t *connection); static int wake_or_create_thread(thread_group_t *thread_group); static int create_worker(thread_group_t *thread_group); static void *worker_main(void *param); @@ -357,33 +371,18 @@ static void* native_event_get_userdata(native_event *event) /* Dequeue element from a workqueue */ -static pool_event_t *queue_get(thread_group_t *thread_group) +static connection_t *queue_get(thread_group_t *thread_group) { DBUG_ENTER("queue_get"); - pool_event_t *ev= NULL; thread_group->queue_event_count++; - ev= STAILQ_FIRST(&thread_group->queue); - if (ev) + connection_t *c= thread_group->queue.front(); + if (c) { - STAILQ_REMOVE_HEAD(&thread_group->queue,next); + thread_group->queue.remove(c); } - DBUG_RETURN(ev); + DBUG_RETURN(c); } -/* Check if workqueue is empty. */ -static bool queue_is_empty(thread_group_t* thread_group) -{ - DBUG_ENTER("queue_is_empty"); - bool empty= (STAILQ_FIRST(&thread_group->queue) == NULL); - DBUG_RETURN(empty); -} - -static void queue_put(thread_group_t *thread_group, pool_event_t *event) -{ - DBUG_ENTER("queue_put"); - STAILQ_INSERT_TAIL(&thread_group->queue, event, next); - DBUG_VOID_RETURN; -} static void increment_active_threads(thread_group_t *thread_group) { @@ -418,10 +417,16 @@ static void timeout_check(pool_timer_t *timer) { if (thd->net.reading_or_writing != 1) continue; - + connection_t *connection= (connection_t *)thd->event_scheduler.data; if (!connection) - continue; + { + /* + Connection does not have scheduler data. This happens for example + if THD belongs to another scheduler, that is listening to extra_port. + */ + continue; + } if(connection->abs_wait_timeout < timer->current_microtime) { @@ -454,6 +459,7 @@ static void timeout_check(pool_timer_t *timer) Besides checking for stalls, timer thread is also responsible for terminating clients that have been idle for longer than wait_timeout seconds. */ + static void* timer_thread(void *param) { uint i; @@ -468,9 +474,10 @@ static void* timer_thread(void *param) for(;;) { struct timespec ts; + int err; set_timespec_nsec(ts,timer->tick_interval*1000000); mysql_mutex_lock(&timer->mutex); - int err = mysql_cond_timedwait(&timer->cond, &timer->mutex, &ts); + err= mysql_cond_timedwait(&timer->cond, &timer->mutex, &ts); if (timer->shutdown) { mysql_mutex_unlock(&timer->mutex); @@ -495,7 +502,6 @@ static void* timer_thread(void *param) } mysql_mutex_destroy(&timer->mutex); - DBUG_POP(); my_thread_end(); return NULL; } @@ -529,7 +535,7 @@ void check_stall(thread_group_t *thread_group) /* Check whether requests from the workqueue are being dequeued. */ - if (!queue_is_empty(thread_group) && !thread_group->queue_event_count) + if (!thread_group->queue.is_empty() && !thread_group->queue_event_count) { thread_group->stalled= true; wake_or_create_thread(thread_group); @@ -566,76 +572,141 @@ static void stop_timer(pool_timer_t *timer) #define MAX_EVENTS 1024 -/* - Poll for socket events and distribute them to worker threads. +/** + Poll for socket events and distribute them to worker threads In many case current thread will handle single event itself. + + @return a ready connection, or NULL on shutdown */ -static pool_event_t * listener(worker_thread_t *current_thread, +static connection_t * listener(worker_thread_t *current_thread, thread_group_t *thread_group) { DBUG_ENTER("listener"); + connection_t *retval= NULL; + + decrement_active_threads(thread_group); for(;;) { native_event ev[MAX_EVENTS]; int cnt; if (thread_group->shutdown) - { - DBUG_RETURN(&POOL_SHUTDOWN_EVENT); - } - do - { - cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, -1); - } - while(cnt <= 0 && errno == EINTR); - + break; + + cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, -1); + if (cnt <=0) { DBUG_ASSERT(thread_group->shutdown); - DBUG_RETURN(&POOL_SHUTDOWN_EVENT); + break; } - /* - Put events to queue, maybe wakeup workers. - If queue is currently empty, listener will return - so the current thread handles query itself, this avoids - wakeups and context switches. But if queue is not empty - this smells like a flood of queries, and the listener - stays. - */ mysql_mutex_lock(&thread_group->mutex); if (thread_group->shutdown) { mysql_mutex_unlock(&thread_group->mutex); - DBUG_RETURN(&POOL_SHUTDOWN_EVENT); + break; } - thread_group->io_event_count += cnt; - bool pick_event= queue_is_empty(thread_group); - - for(int i=(pick_event)?1:0; i < cnt ; i++) + thread_group->io_event_count += cnt; + + /* + We got some network events and need to make decisions : whether + listener hould handle events and whether or not any wake worker + threads so they can handle events. + + Q1 : Should listener handle an event itself, or put all events into + queue and let workers handle the events? + + Solution : + Generally, listener that handles events itself is preferable. We do not + want listener thread to change its state from waiting to running too + often, Since listener has just woken from poll, it better uses its time + slice and does some work. Besides, not handling events means they go to + the queue, and often to wake another worker must wake up to handle the + event. This is not good, as we want to avoid wakeups. + + The downside of listener that also handles queries is that we can + potentially leave thread group for long time not picking the new + network events. It is not a major problem, because this stall will be + detected sooner or later by the timer thread. Still, relying on timer + is not always good, because it may "tick" too slow (large timer_interval) + + We use following strategy to solve this problem - if queue was not empty + we suspect flood of network events and listener stays, Otherwise, it + handles a query. + + + Q2: If queue is not empty, how many workers to wake? + + Solution: + We generally try to keep one thread per group active (threads handling + queries are considered active, unless they stuck in inside some "wait") + Thus, we will wake only one worker, and only if there is not active + threads currently,and listener is not going to handle a query. When we + don't wake, we hope that currently active threads will finish fast and + handle the queue. If this does not happen, timer thread will detect stall + and wake a worker. + + NOTE: Currently nothing is done to detect or prevent long queuing times. + A solution (for the future) would be to give up "one active thread per group" + principle, if events stay in the queue for too long, and wake more workers. + + */ + + bool listener_picks_event= thread_group->queue.is_empty(); + + /* + If listener_picks_event is set, listener thread will handle first event, + and put the rest into the queue. If listener_pick_event is not set, all + events go to the queue. + */ + for(int i=(listener_picks_event)?1:0; i < cnt ; i++) { - pool_event_t *e= (pool_event_t *)native_event_get_userdata(&ev[i]); - queue_put(thread_group, e); + connection_t *c= (connection_t *)native_event_get_userdata(&ev[i]); + thread_group->queue.push_back(c); } - /* Wake at most one worker thread */ - if(thread_group->active_thread_count==0 && - /*!queue_is_empty(thread_group)*/ !pick_event) + + if(thread_group->active_thread_count==0 && !listener_picks_event) { + /* Wake one worker thread */ if(wake_thread(thread_group)) { - if(thread_group->thread_count == 1) + /* + Wake failed, groups has no idle threads. + Now check if the group has at least one worker. + */ + if(thread_group->thread_count == 1 && + thread_group->pending_thread_start_count == 0) + { + /* + Currently there is no worker thread in the group, as indicated by + thread_count == 1 (means listener is the only one thread in the + group). + + Rhe queue is not empty, and listener is not going to handle + events. In order to drain the queue, we create a worker here. + Alternatively, we could just rely on timer to detect stall, but + this would be an inefficient, pointless delay. + */ create_worker(thread_group); + } } } mysql_mutex_unlock(&thread_group->mutex); - if (pick_event) - DBUG_RETURN((pool_event_t *)(native_event_get_userdata(&ev[0]))); + if (listener_picks_event) + { + retval= (connection_t *)native_event_get_userdata(&ev[0]); + break; + } } + + increment_active_threads(thread_group); + DBUG_RETURN(retval); } @@ -674,7 +745,36 @@ static int create_worker(thread_group_t *thread_group) } -/* +/** + Calculate microseconds throttling delay for thread creation. + + The value depends on how many threads are already in the group: + small number of threads means no delay, the more threads the larger + the delay. + + The actual values were not calculated using any scientific methods. + They just look right, and behave well in practice. + + TODO: Should throttling depend on thread_pool_stall_limit? +*/ +static ulonglong microsecond_throttling_interval(thread_group_t *thread_group) +{ + int count= thread_group->thread_count; + + if (count < 4) + return 0; + + if (count < 8) + return 50*1000; + + if(count < 16) + return 100*1000; + + return 200*1000; +} + + +/** Wakes a worker thread, or creates a new one. Worker creation is throttled, so we avoid too many threads @@ -682,9 +782,6 @@ static int create_worker(thread_group_t *thread_group) */ static int wake_or_create_thread(thread_group_t *thread_group) { - ulonglong now; - ulonglong time_since_last_thread_created; - DBUG_ENTER("wake_or_create_thread"); if (wake_thread(thread_group) == 0) @@ -696,30 +793,25 @@ static int wake_or_create_thread(thread_group_t *thread_group) if (thread_group->thread_count > thread_group->connection_count) DBUG_RETURN(-1); - if (thread_group->thread_count < 4) - { - DBUG_RETURN(create_worker(thread_group)); - } - - now = microsecond_interval_timer(); - time_since_last_thread_created = - (now - thread_group->last_thread_creation_time)/1000; - + if (thread_group->active_thread_count == 0) { /* - We're better off creating a new thread here with no delay, as - others threads (at least 4) are all blocking and there was no sleeping - thread to wakeup. It smells like deadlock or very slowly executing - requests, e.g sleeps or user locks. + We're better off creating a new thread here with no delay, + either there is no workers at all, or they all are all blocking + and there was no sleeping thread to wakeup. It smells like deadlock + or very slowly executing requests, e.g sleeps or user locks. */ DBUG_RETURN(create_worker(thread_group)); } + ulonglong now = microsecond_interval_timer(); + ulonglong time_since_last_thread_created = + (now - thread_group->last_thread_creation_time); + /* Throttle thread creation. */ - if ((thread_group->thread_count < 8 && time_since_last_thread_created > 50) - || (thread_group->thread_count < 16 && time_since_last_thread_created > 100) - || (time_since_last_thread_created > 200)) + if (time_since_last_thread_created > + microsecond_throttling_interval(thread_group)) { DBUG_RETURN(create_worker(thread_group)); } @@ -729,62 +821,75 @@ static int wake_or_create_thread(thread_group_t *thread_group) -/* Initialize thread group */ int thread_group_init(thread_group_t *thread_group, pthread_attr_t* thread_attr) { DBUG_ENTER("thread_group_init"); - - memset(thread_group, 0, sizeof(thread_group_t)); thread_group->pthread_attr = thread_attr; mysql_mutex_init(key_group_mutex, &thread_group->mutex, NULL); - STAILQ_INIT(&thread_group->queue); - SLIST_INIT(&thread_group->waiting_threads); - - thread_group->pending_thread_start_count= 0; - thread_group->stalled= false; - - thread_group->pollfd= -1; + thread_group->pollfd=-1; + thread_group->shutdown_pipe[0]= -1; + thread_group->shutdown_pipe[1]= -1; DBUG_RETURN(0); } -/* - Wake single sleeping thread in pool. Optionally, tell this thread - to listen to socket io notification. +void thread_group_destroy(thread_group_t *thread_group) +{ + mysql_mutex_destroy(&thread_group->mutex); + if (thread_group->pollfd != -1) + { + close(thread_group->pollfd); + thread_group->pollfd= -1; + } + for(int i=0; i < 2; i++) + { + if(thread_group->shutdown_pipe[i] != -1) + { + close(thread_group->shutdown_pipe[i]); + thread_group->shutdown_pipe[i]= -1; + } + } +} + +/** + Wake sleeping thread from waiting list */ static int wake_thread(thread_group_t *thread_group) { DBUG_ENTER("wake_thread"); - worker_thread_t *thread = SLIST_FIRST(&thread_group->waiting_threads); + worker_thread_t *thread = thread_group->waiting_threads.front(); if(thread) { thread->woken= true; - SLIST_REMOVE_HEAD(&thread_group->waiting_threads, ptr); + thread_group->waiting_threads.remove(thread); if (mysql_cond_signal(&thread->cond)) - abort(); + abort(); DBUG_RETURN(0); } DBUG_RETURN(-1); /* no thread- missed wakeup*/ } -/* - Shutdown thread group. +/* + Initiate shutdown for thread group. + + The shutdown is asynchronous, we only care to wake all threads + in here, so they can finish. We do not wait here until threads + terminate, + + Final cleanup of the group (thread_group_destroy) will be done by + the last exiting threads. */ static void thread_group_close(thread_group_t *thread_group) { DBUG_ENTER("thread_group_close"); - - char c= 0; mysql_mutex_lock(&thread_group->mutex); if (thread_group->thread_count == 0 && thread_group->pending_thread_start_count == 0) { - if (thread_group->pollfd >= 0) - close(thread_group->pollfd); mysql_mutex_unlock(&thread_group->mutex); - mysql_mutex_destroy(&thread_group->mutex); + thread_group_destroy(thread_group); DBUG_VOID_RETURN; } @@ -795,40 +900,41 @@ static void thread_group_close(thread_group_t *thread_group) { DBUG_VOID_RETURN; } + + /* Wake listener */ if (io_poll_associate_fd(thread_group->pollfd, - thread_group->shutdown_pipe[0], &POOL_SHUTDOWN_EVENT)) + thread_group->shutdown_pipe[0], NULL)) { DBUG_VOID_RETURN; } - - /* Wake listener. */ + char c= 0; if (write(thread_group->shutdown_pipe[1], &c, 1) < 0) DBUG_VOID_RETURN; /* Wake all workers. */ - while(wake_thread(thread_group) == 0) {}; - mysql_mutex_unlock(&thread_group->mutex); + while(wake_thread(thread_group) == 0) + { + } -#if 0 - /* Wait until workers terminate */ - while(thread_group->thread_count) - usleep(1000); -#endif + mysql_mutex_unlock(&thread_group->mutex); DBUG_VOID_RETURN; } /* - Post a task to the workqueue, maybe wake a worker so - it picks the task. + Add work to the queue. Maybe wake a worker if they all sleep. + + Currently, this function is only used when new connections need to + perform login (this is done in worker threads). + */ -static void post_event(thread_group_t *thread_group, pool_event_t* ev) +static void queue_put(thread_group_t *thread_group, connection_t *connection) { - DBUG_ENTER("post_event"); + DBUG_ENTER("queue_put"); mysql_mutex_lock(&thread_group->mutex); - STAILQ_INSERT_TAIL(&thread_group->queue, ev, next); + thread_group->queue.push_back(connection); if (thread_group->active_thread_count == 0) { wake_or_create_thread(thread_group); @@ -850,23 +956,33 @@ static bool too_many_threads(thread_group_t *thread_group) -/* - Dequeue a work item. +/** + Retrieve a connection with pending event. + + Pending event in our case means that there is either a pending login request + (if connection is not yet logged in), or there are unread bytes on the socket. - If it is not immediately available, thread will sleep until - work is available (it also can become IO listener for a while). + If there are no pending events currently, thread will wait. If timeout specified + int abstime parameter passes, the function returns NULL. + + @param current_thread - current worker thread + @param thread_group - current thread group + @param abstime - absolute wait timeout + + @return + connection with pending event. NULL is returned if timeout has expired,or on shutdown. */ -int get_event(worker_thread_t *current_thread, thread_group_t *thread_group, - pool_event_t **ev, struct timespec *ts) +connection_t *get_event(worker_thread_t *current_thread, + thread_group_t *thread_group, struct timespec *abstime) { DBUG_ENTER("get_event"); - pool_event_t *first_event = NULL; + connection_t *connection = NULL; int err=0; mysql_mutex_lock(&thread_group->mutex); - decrement_active_threads(thread_group); + DBUG_ASSERT(thread_group->active_thread_count >= 0); do @@ -877,8 +993,8 @@ int get_event(worker_thread_t *current_thread, thread_group_t *thread_group, /* Check if queue is not empty */ if (!too_many_threads(thread_group)) { - first_event= queue_get(thread_group); - if(first_event) + connection = queue_get(thread_group); + if(connection) break; } @@ -888,7 +1004,7 @@ int get_event(worker_thread_t *current_thread, thread_group_t *thread_group, thread_group->listener= current_thread; mysql_mutex_unlock(&thread_group->mutex); - first_event= listener(current_thread, thread_group); + connection = listener(current_thread, thread_group); mysql_mutex_lock(&thread_group->mutex); /* There is no listener anymore, it just returned. */ @@ -906,12 +1022,11 @@ int get_event(worker_thread_t *current_thread, thread_group_t *thread_group, if (io_poll_wait(thread_group->pollfd,&nev,1, 0) == 1) { thread_group->io_event_count++; - first_event = (pool_event_t *)native_event_get_userdata(&nev); + connection = (connection_t *)native_event_get_userdata(&nev); break; } } - /* And now, finally sleep */ current_thread->woken = false; /* wake() sets this to true */ @@ -920,13 +1035,15 @@ int get_event(worker_thread_t *current_thread, thread_group_t *thread_group, It is important to add thread to the head rather than tail as it ensures LIFO wakeup order (hot caches, working inactivity timeout) */ - SLIST_INSERT_HEAD(&thread_group->waiting_threads, current_thread, ptr); - - if(ts) - err = mysql_cond_timedwait(¤t_thread->cond, &thread_group->mutex, ts); + thread_group->waiting_threads.push_front(current_thread); + + decrement_active_threads(thread_group); + if(abstime) + err = mysql_cond_timedwait(¤t_thread->cond, &thread_group->mutex, abstime); else err = mysql_cond_wait(¤t_thread->cond, &thread_group->mutex); - + increment_active_threads(thread_group); + if (!current_thread->woken) { /* @@ -934,7 +1051,7 @@ int get_event(worker_thread_t *current_thread, thread_group_t *thread_group, a timeout. Anyhow, we need to remove ourselves from the list now. If thread was explicitly woken, than caller removed us from the list. */ - SLIST_REMOVE(&thread_group->waiting_threads, current_thread, worker_thread_t, ptr); + thread_group->waiting_threads.remove(current_thread); } if(err) @@ -944,26 +1061,16 @@ int get_event(worker_thread_t *current_thread, thread_group_t *thread_group, while(true); thread_group->stalled= false; - increment_active_threads(thread_group); mysql_mutex_unlock(&thread_group->mutex); - - - if (first_event) - *ev = first_event; - else - *ev = &POOL_SHUTDOWN_EVENT; - DBUG_RETURN(err); + DBUG_RETURN(connection); } -/* - Tells the pool that thread starts waiting on IO, lock, condition, +/** + Tells the pool that worker starts waiting on IO, lock, condition, sleep() or similar. - - Will wake another worker, and if there is no listener will - promote a listener, */ void wait_begin(thread_group_t *thread_group) { @@ -974,8 +1081,12 @@ void wait_begin(thread_group_t *thread_group) DBUG_ASSERT(thread_group->connection_count > 0); if((thread_group->active_thread_count == 0) && - (!queue_is_empty(thread_group) || !thread_group->listener)) + (thread_group->queue.is_empty() || !thread_group->listener)) { + /* + Group might stall while this thread waits, thus wake + or create a worker to prevent stall. + */ wake_or_create_thread(thread_group); } @@ -983,9 +1094,10 @@ void wait_begin(thread_group_t *thread_group) DBUG_VOID_RETURN; } -/* - Tells the pool current thread finished waiting. +/** + Tells the pool has finished waiting. */ + void wait_end(thread_group_t *thread_group) { DBUG_ENTER("wait_end"); @@ -996,7 +1108,10 @@ void wait_end(thread_group_t *thread_group) } -/* Scheduler */ +/** + Allocate/initialize a new connection structure. +*/ + connection_t *alloc_connection(THD *thd) { DBUG_ENTER("alloc_connection"); @@ -1014,42 +1129,67 @@ connection_t *alloc_connection(THD *thd) -/* +/** Add a new connection to thread pool.. */ + void tp_add_connection(THD *thd) { DBUG_ENTER("tp_add_connection"); threads.append(thd); mysql_mutex_unlock(&LOCK_thread_count); - connection_t *c= alloc_connection(thd); - if(c) + connection_t *connection= alloc_connection(thd); + if(connection) { - c->thread_group= &all_groups[c->thd->thread_id%group_count]; - mysql_mutex_lock(&c->thread_group->mutex); - c->thread_group->connection_count++; - mysql_mutex_unlock(&c->thread_group->mutex); - c->thd->event_scheduler.data = c; - post_event(c->thread_group,&c->event); + mysql_mutex_lock(&thd->LOCK_thd_data); + thd->event_scheduler.data= connection; + mysql_mutex_unlock(&thd->LOCK_thd_data); + + /* Assign connection to a group. */ + thread_group_t *group= + &all_groups[connection->thd->thread_id%group_count]; + + connection->thread_group=group; + + mysql_mutex_lock(&group->mutex); + group->connection_count++; + mysql_mutex_unlock(&group->mutex); + + /* + Add connection to the work queue.Actual logon + will be done by a worker thread. + */ + queue_put(group, connection); } DBUG_VOID_RETURN; } -static void connection_abort(connection_t *c) +/** + Terminate connection. +*/ + +static void connection_abort(connection_t *connection) { DBUG_ENTER("connection_abort"); - mysql_mutex_lock(&c->thread_group->mutex); - c->thread_group->connection_count--; - mysql_mutex_unlock(&c->thread_group->mutex); + thread_group_t *group= connection->thread_group; + + mysql_mutex_lock(&group->mutex); + group->connection_count--; + mysql_mutex_unlock(&group->mutex); - threadpool_remove_connection(c->thd); - my_free(c); + threadpool_remove_connection(connection->thd); + my_free(connection); DBUG_VOID_RETURN; } + +/** + MySQL scheduler callback : kill connection +*/ + void tp_post_kill_notification(THD *thd) { DBUG_ENTER("tp_post_kill_notification"); @@ -1061,6 +1201,10 @@ void tp_post_kill_notification(THD *thd) DBUG_VOID_RETURN; } +/** + MySQL scheduler callback: wait begin +*/ + void tp_wait_begin(THD *thd, int type) { DBUG_ENTER("tp_wait_begin"); @@ -1079,6 +1223,10 @@ void tp_wait_begin(THD *thd, int type) } +/** + MySQL scheduler callback: wait end +*/ + void tp_wait_end(THD *thd) { DBUG_ENTER("tp_wait_end"); @@ -1095,7 +1243,7 @@ void tp_wait_end(THD *thd) DBUG_VOID_RETURN; } - + static void set_next_timeout_check(ulonglong abstime) { DBUG_ENTER("set_next_timeout_check"); @@ -1108,6 +1256,11 @@ static void set_next_timeout_check(ulonglong abstime) DBUG_VOID_RETURN; } + +/** + Set wait timeout for connection. +*/ + static void set_wait_timeout(connection_t *c) { DBUG_ENTER("set_wait_timeout"); @@ -1129,10 +1282,10 @@ static void set_wait_timeout(connection_t *c) -/* - Handle a (rare) special case,where connection needs to - migrate to a different group because group_count has changed - as a result of thread_pool_size setting. +/** + Handle a (rare) special case,where connection needs to + migrate to a different group because group_count has changed + after thread_pool_size setting. */ static int change_group(connection_t *c, thread_group_t *old_group, @@ -1162,9 +1315,9 @@ static int change_group(connection_t *c, } -static int start_io(connection_t *c) +static int start_io(connection_t *connection) { - int fd = c->thd->net.vio->sd; + int fd = connection->thd->net.vio->sd; /* Usually, connection will stay in the same group for the entire @@ -1176,78 +1329,79 @@ static int start_io(connection_t *c) So we recalculate in which group the connection should be, based on thread_id and current group count, and migrate if necessary. */ - thread_group_t *g = &all_groups[c->thd->thread_id%group_count]; + thread_group_t *group = + &all_groups[connection->thd->thread_id%group_count]; - if (g != c->thread_group) + if (group != connection->thread_group) { - if (!change_group(c, c->thread_group, g)) + if (!change_group(connection, connection->thread_group, group)) { - c->logged_in= true; - return io_poll_associate_fd(c->thread_group->pollfd, fd, c); + connection->logged_in= true; + return io_poll_associate_fd(group->pollfd, fd, connection); } else return -1; } - /* - Handle case where connection is not yet logged in, i.e - not associated with poll fd. + In case binding to a poll descriptor was not yet done, + (start_io called first time), do it now. */ - if(!c->logged_in) + if(!connection->logged_in) { - c->logged_in= true; - return io_poll_associate_fd(c->thread_group->pollfd, fd, c); + connection->logged_in= true; + return io_poll_associate_fd(group->pollfd, fd, connection); } - return io_poll_start_read(c->thread_group->pollfd, fd, c); + return io_poll_start_read(group->pollfd, fd, connection); } -static void handle_event(pool_event_t *ev) +static void handle_event(connection_t *connection) { DBUG_ENTER("handle_event"); - - /* Normal case, handle query on connection */ - connection_t *c = (connection_t*)(void *)ev; int ret; - if (!c->logged_in) + if (!connection->logged_in) { - ret= threadpool_add_connection(c->thd); + ret= threadpool_add_connection(connection->thd); } else { - ret= threadpool_process_request(c->thd); + ret= threadpool_process_request(connection->thd); } if(!ret) { - set_wait_timeout(c); - ret= start_io(c); + set_wait_timeout(connection); + ret= start_io(connection); } if (ret) { - connection_abort(c); + connection_abort(connection); } + DBUG_VOID_RETURN; } +/** + Worker thread's main +*/ static void *worker_main(void *param) { worker_thread_t this_thread; - - thread_created++; pthread_detach_this_thread(); my_thread_init(); + DBUG_ENTER("worker_main"); + thread_created++; thread_group_t *thread_group = (thread_group_t *)param; /* Init per-thread structure */ @@ -1265,16 +1419,16 @@ static void *worker_main(void *param) /* Run event loop */ for(;;) { - struct pool_event_t *ev; + connection_t *connection; struct timespec ts; set_timespec(ts,threadpool_idle_timeout); - if (get_event(&this_thread, thread_group, &ev, &ts) - || ev == &POOL_SHUTDOWN_EVENT) + connection = get_event(&this_thread, thread_group, &ts); + if (!connection) { break; } this_thread.event_count++; - handle_event(ev); + handle_event(connection); } /* Thread shutdown: cleanup per-worker-thread structure. */ @@ -1286,20 +1440,18 @@ static void *worker_main(void *param) mysql_mutex_unlock(&thread_group->mutex); my_atomic_add32(&tp_stats.num_worker_threads, -1); - /* If it is the last thread in pool and pool is terminating, destroy pool.*/ - if (thread_group->shutdown && (thread_group->thread_count == 0)) + /* If it is the last thread in group and pool is terminating, destroy group.*/ + if (thread_group->shutdown && thread_group->thread_count == 0 + && thread_group->pending_thread_start_count == 0) { - /* last thread existing, cleanup the pool structure */ - mysql_mutex_destroy(&thread_group->mutex); + thread_group_destroy(thread_group); } - DBUG_POP(); my_thread_end(); return NULL; } -static bool started=false; - +static bool started=false; bool tp_init() { DBUG_ENTER("tp_init"); @@ -1311,10 +1463,7 @@ bool tp_init() thread_group_init(&all_groups[i], get_connection_attrib()); } tp_set_threadpool_size(threadpool_size); - - #define PSI_register(X) \ - if(PSI_server) PSI_server->register_ ## X("threadpool", X ## _list, array_elements(X ## _list)) - + PSI_register(mutex); PSI_register(cond); PSI_register(thread); |