summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/threadpool.h32
-rw-r--r--sql/threadpool_common.cc79
-rw-r--r--sql/threadpool_unix.cc663
3 files changed, 454 insertions, 320 deletions
diff --git a/sql/threadpool.h b/sql/threadpool.h
index 966dcbc18e0..8c991aab2cb 100644
--- a/sql/threadpool.h
+++ b/sql/threadpool.h
@@ -9,30 +9,28 @@ extern uint threadpool_stall_limit; /* time interval in 10 ms units for stall c
extern uint threadpool_max_threads; /* Maximum threads in pool */
extern uint threadpool_oversubscribe; /* Maximum active threads in group */
+
+/*
+ Functions used by scheduler.
+ OS-specific implementations are in
+ threadpool_unix.cc or threadpool_win.cc
+*/
+extern bool tp_init();
+extern void tp_add_connection(THD*);
+extern void tp_wait_begin(THD *, int);
+extern void tp_wait_end(THD*);
+extern void tp_post_kill_notification(THD *thd);
+extern void tp_end(void);
+
/*
Threadpool statistics
*/
struct TP_STATISTICS
{
/* Current number of worker thread. */
- volatile int num_worker_threads;
+ volatile int32 num_worker_threads;
/* Current number of idle threads. */
- volatile int num_waiting_threads;
- /* Number of login requests are queued but not yet processed. */
- volatile int pending_login_requests;
- /* Number of threads that are starting. */
- volatile int pending_thread_starts;
- /* Number of threads that are being shut down */
- volatile int pending_thread_shutdowns;
- /* Time (in milliseconds) since pool is blocked (num_waiting_threads is 0) */
- ulonglong pool_block_duration;
- /* Maximum duration of the pending login, im milliseconds. */
- ulonglong pending_login_duration;
- /* Time since last thread was created */
- ulonglong time_since_last_thread_creation;
- /* Number of requests processed since pool monitor run last time. */
- volatile int requests_dequeued;
- volatile int requests_completed;
+ volatile int32 num_waiting_threads;
};
extern TP_STATISTICS tp_stats;
diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc
index 2ed9e6f2ba6..5152f62efe9 100644
--- a/sql/threadpool_common.cc
+++ b/sql/threadpool_common.cc
@@ -7,15 +7,9 @@
#include <sql_connect.h>
#include <sql_audit.h>
#include <debug_sync.h>
+#include <threadpool.h>
-extern bool login_connection(THD *thd);
-extern bool do_command(THD *thd);
-extern void prepare_new_connection_state(THD* thd);
-extern void end_connection(THD *thd);
-extern void thd_cleanup(THD *thd);
-extern void delete_thd(THD *thd);
-
/* Threadpool parameters */
uint threadpool_min_threads;
@@ -27,14 +21,15 @@ uint threadpool_oversubscribe;
extern "C" pthread_key(struct st_my_thread_var*, THR_KEY_mysys);
+extern bool do_command(THD*);
/*
Worker threads contexts, and THD contexts.
- =====================================
+ =========================================
Both worker threads and connections have their sets of thread local variables
- At the moment it is mysys_var (which has e.g dbug my_error and similar
- goodies inside), and PSI per-client structure.
+ At the moment it is mysys_var (this has specific data for dbug, my_error and
+ similar goodies), and PSI per-client structure.
Whenever query is executed following needs to be done:
@@ -77,7 +72,7 @@ struct Worker_thread_context
/*
Attach/associate the connection with the OS thread,
*/
-static inline bool thread_attach(THD* thd)
+static bool thread_attach(THD* thd)
{
pthread_setspecific(THR_KEY_mysys,thd->mysys_var);
thd->thread_stack=(char*)&thd;
@@ -95,11 +90,10 @@ int threadpool_add_connection(THD *thd)
worker_context.save();
/*
- Create a new connection context: mysys_thread_var and PSI thread
- Store them in thd->mysys_var and thd->scheduler.m_psi.
+ Create a new connection context: mysys_thread_var and PSI thread
+ Store them in THD.
*/
- /* Use my_thread_init() to create new mysys_thread_var. */
pthread_setspecific(THR_KEY_mysys, 0);
my_thread_init();
thd->mysys_var= (st_my_thread_var *)pthread_getspecific(THR_KEY_mysys);
@@ -125,21 +119,29 @@ int threadpool_add_connection(THD *thd)
thd->start_utime= now;
thd->thr_create_utime= now;
- if (setup_connection_thread_globals(thd) == 0)
+ if (!setup_connection_thread_globals(thd))
{
- if (login_connection(thd) == 0)
+ if (!login_connection(thd))
{
- prepare_new_connection_state(thd);
- retval = thd_is_connection_alive(thd)?0:-1;
- thd->net.reading_or_writing= 1;
+ prepare_new_connection_state(thd);
+
+ /*
+ Check if THD is ok, as prepare_new_connection_state()
+ can fail, for example if init command failed.
+ */
+ if (thd_is_connection_alive(thd))
+ {
+ retval= 0;
+ thd->net.reading_or_writing= 1;
+ thd->skip_wait_timeout= true;
+ }
}
}
- thd->skip_wait_timeout= true;
-
worker_context.restore();
return retval;
}
+
void threadpool_remove_connection(THD *thd)
{
@@ -147,9 +149,7 @@ void threadpool_remove_connection(THD *thd)
worker_context.save();
thread_attach(thd);
-
thd->killed= KILL_CONNECTION;
-
thd->net.reading_or_writing= 0;
end_connection(thd);
@@ -163,11 +163,13 @@ void threadpool_remove_connection(THD *thd)
mysql_mutex_unlock(&LOCK_thread_count);
mysql_cond_broadcast(&COND_thread_count);
- /* Free resources (thread_var and PSI connection specific struct)*/
+ /*
+ Free resources associated with this connection:
+ mysys thread_var and PSI thread.
+ */
my_thread_end();
worker_context.restore();
-
}
int threadpool_process_request(THD *thd)
@@ -181,8 +183,8 @@ int threadpool_process_request(THD *thd)
if (thd->killed >= KILL_CONNECTION)
{
/*
- kill flag can be set have been killed by
- timeout handler or by a KILL command
+ killed flag was set by timeout handler
+ or KILL command. Return error.
*/
worker_context.restore();
return 1;
@@ -206,33 +208,18 @@ int threadpool_process_request(THD *thd)
vio= thd->net.vio;
if (!vio->has_data(vio))
{
- /*
- More info on this debug sync is in sql_parse.cc
- */
+ /* More info on this debug sync is in sql_parse.cc*/
DEBUG_SYNC(thd, "before_do_command_net_read");
+ thd->net.reading_or_writing= 1;
break;
}
- }
- if (!retval)
- thd->net.reading_or_writing= 1;
+ }
worker_context.restore();
return retval;
}
-/*
- Scheduler struct, individual functions are implemented
- in threadpool_unix.cc or threadpool_win.cc
-*/
-
-extern bool tp_init();
-extern void tp_add_connection(THD*);
-extern void tp_wait_begin(THD *, int);
-extern void tp_wait_end(THD*);
-extern void tp_post_kill_notification(THD *thd);
-extern void tp_end(void);
-
static scheduler_functions tp_scheduler_functions=
{
0, // max_threads
@@ -255,7 +242,7 @@ void pool_of_threads_scheduler(struct scheduler_functions *func,
uint *arg_connection_count)
{
*func = tp_scheduler_functions;
- func->max_threads= *arg_max_connections + 1;
+ func->max_threads= threadpool_max_threads;
func->max_connections= arg_max_connections;
func->connection_count= arg_connection_count;
scheduler_init();
diff --git a/sql/threadpool_unix.cc b/sql/threadpool_unix.cc
index ec9f5a91d40..b6eeb5bcffd 100644
--- a/sql/threadpool_unix.cc
+++ b/sql/threadpool_unix.cc
@@ -7,9 +7,8 @@
#include <sql_connect.h>
#include <mysqld.h>
#include <debug_sync.h>
-#include <sys/queue.h>
#include <time.h>
-
+#include <sql_plist.h>
#include <threadpool.h>
#ifdef __linux__
#include <sys/epoll.h>
@@ -25,6 +24,13 @@ typedef port_event_t native_event;
#endif
+/*
+ Define PSI Keys for performance schema.
+ We have a mutex per group, worker threads, condition per worker thread,
+ and timer thread with its own mutex and condition.
+*/
+
+
static PSI_mutex_key key_group_mutex;
static PSI_mutex_key key_timer_mutex;
static PSI_mutex_info mutex_list[]=
@@ -49,53 +55,73 @@ static PSI_thread_info thread_list[] =
{&key_timer_thread, "timer_thread", PSI_FLAG_GLOBAL}
};
+/* Macro to simplify performance schema registration */
+#define PSI_register(X) \
+ if(PSI_server) PSI_server->register_ ## X("threadpool", X ## _list, array_elements(X ## _list))
+
TP_STATISTICS tp_stats;
-
struct thread_group_t;
/* Per-thread structure for workers */
struct worker_thread_t
{
+ ulonglong event_count; /* number of request handled by this thread */
+ thread_group_t* thread_group;
+ worker_thread_t *next_in_list;
+ worker_thread_t **prev_in_list;
+
mysql_cond_t cond;
- bool woken;
- thread_group_t* thread_group;
- ulonglong event_count; /* Stats: number of executed requests */
- SLIST_ENTRY(worker_thread_t) ptr;
+ bool woken;
};
-/*
- Data associated with an io event (also can be sent with with explicit
- post_event())
-*/
-struct pool_event_t
+typedef I_P_List<worker_thread_t, I_P_List_adapter<worker_thread_t,
+ &worker_thread_t::next_in_list,
+ &worker_thread_t::prev_in_list>
+ >
+worker_list_t;
+
+struct connection_t
{
- STAILQ_ENTRY (pool_event_t) next;
- void *data;
+
+ THD *thd;
+ thread_group_t *thread_group;
+ connection_t *next_in_queue;
+ connection_t **prev_in_queue;
+ ulonglong abs_wait_timeout;
+ bool logged_in;
+ bool waiting;
};
-static pool_event_t POOL_SHUTDOWN_EVENT;
+
+typedef I_P_List<connection_t,
+ I_P_List_adapter<connection_t,
+ &connection_t::next_in_queue,
+ &connection_t::prev_in_queue>,
+ I_P_List_null_counter,
+ I_P_List_fast_push_back<connection_t> >
+connection_queue_t;
struct thread_group_t
{
-
mysql_mutex_t mutex;
- STAILQ_HEAD(queue_listhead, pool_event_t) queue;
- SLIST_HEAD(wait_listhead, worker_thread_t) waiting_threads;
+ connection_queue_t queue;
+ worker_list_t waiting_threads;
+ worker_thread_t *listener;
+ pthread_attr_t *pthread_attr;
int pollfd;
int thread_count;
int active_thread_count;
int pending_thread_start_count;
- int connection_count;
+ int connection_count;
+ /* Stats for the deadlock detection timer routine.*/
+ int io_event_count;
+ int queue_event_count;
+ ulonglong last_thread_creation_time;
+ int shutdown_pipe[2];
bool shutdown;
bool stalled;
- int shutdown_pipe[2];
- worker_thread_t *listener;
- pthread_attr_t *pthread_attr;
- ulonglong last_thread_creation_time;
- /* Stats for the deadlock detection timer routine.*/
- ulonglong io_event_count;
- ulonglong queue_event_count;
+
} MY_ALIGNED(512);
static thread_group_t all_groups[MAX_THREAD_GROUPS];
@@ -106,33 +132,21 @@ struct pool_timer_t
{
mysql_mutex_t mutex;
mysql_cond_t cond;
- int tick_interval;
- volatile ulonglong current_microtime;
- volatile ulonglong next_timeout_check;
+ volatile uint64 current_microtime;
+ volatile uint64 next_timeout_check;
+ int tick_interval;
bool shutdown;
};
static pool_timer_t pool_timer;
-struct connection_t
-{
- pool_event_t event;
- THD *thd;
- thread_group_t *thread_group;
- ulonglong abs_wait_timeout;
- bool logged_in;
- bool waiting;
-};
-
/* Externals functions and variables we use */
extern void scheduler_init();
extern pthread_attr_t *get_connection_attrib(void);
-extern int skip_net_wait_timeout;
-
-static void post_event(thread_group_t *thread_group, pool_event_t* ev);
+static void queue_put(thread_group_t *thread_group, connection_t *connection);
static int wake_thread(thread_group_t *thread_group);
-static void handle_event(pool_event_t *ev);
+static void handle_event(connection_t *connection);
static int wake_or_create_thread(thread_group_t *thread_group);
static int create_worker(thread_group_t *thread_group);
static void *worker_main(void *param);
@@ -357,33 +371,18 @@ static void* native_event_get_userdata(native_event *event)
/* Dequeue element from a workqueue */
-static pool_event_t *queue_get(thread_group_t *thread_group)
+static connection_t *queue_get(thread_group_t *thread_group)
{
DBUG_ENTER("queue_get");
- pool_event_t *ev= NULL;
thread_group->queue_event_count++;
- ev= STAILQ_FIRST(&thread_group->queue);
- if (ev)
+ connection_t *c= thread_group->queue.front();
+ if (c)
{
- STAILQ_REMOVE_HEAD(&thread_group->queue,next);
+ thread_group->queue.remove(c);
}
- DBUG_RETURN(ev);
+ DBUG_RETURN(c);
}
-/* Check if workqueue is empty. */
-static bool queue_is_empty(thread_group_t* thread_group)
-{
- DBUG_ENTER("queue_is_empty");
- bool empty= (STAILQ_FIRST(&thread_group->queue) == NULL);
- DBUG_RETURN(empty);
-}
-
-static void queue_put(thread_group_t *thread_group, pool_event_t *event)
-{
- DBUG_ENTER("queue_put");
- STAILQ_INSERT_TAIL(&thread_group->queue, event, next);
- DBUG_VOID_RETURN;
-}
static void increment_active_threads(thread_group_t *thread_group)
{
@@ -418,10 +417,16 @@ static void timeout_check(pool_timer_t *timer)
{
if (thd->net.reading_or_writing != 1)
continue;
-
+
connection_t *connection= (connection_t *)thd->event_scheduler.data;
if (!connection)
- continue;
+ {
+ /*
+ Connection does not have scheduler data. This happens for example
+ if THD belongs to another scheduler, that is listening to extra_port.
+ */
+ continue;
+ }
if(connection->abs_wait_timeout < timer->current_microtime)
{
@@ -454,6 +459,7 @@ static void timeout_check(pool_timer_t *timer)
Besides checking for stalls, timer thread is also responsible for terminating
clients that have been idle for longer than wait_timeout seconds.
*/
+
static void* timer_thread(void *param)
{
uint i;
@@ -468,9 +474,10 @@ static void* timer_thread(void *param)
for(;;)
{
struct timespec ts;
+ int err;
set_timespec_nsec(ts,timer->tick_interval*1000000);
mysql_mutex_lock(&timer->mutex);
- int err = mysql_cond_timedwait(&timer->cond, &timer->mutex, &ts);
+ err= mysql_cond_timedwait(&timer->cond, &timer->mutex, &ts);
if (timer->shutdown)
{
mysql_mutex_unlock(&timer->mutex);
@@ -495,7 +502,6 @@ static void* timer_thread(void *param)
}
mysql_mutex_destroy(&timer->mutex);
- DBUG_POP();
my_thread_end();
return NULL;
}
@@ -529,7 +535,7 @@ void check_stall(thread_group_t *thread_group)
/*
Check whether requests from the workqueue are being dequeued.
*/
- if (!queue_is_empty(thread_group) && !thread_group->queue_event_count)
+ if (!thread_group->queue.is_empty() && !thread_group->queue_event_count)
{
thread_group->stalled= true;
wake_or_create_thread(thread_group);
@@ -566,76 +572,141 @@ static void stop_timer(pool_timer_t *timer)
#define MAX_EVENTS 1024
-/*
- Poll for socket events and distribute them to worker threads.
+/**
+ Poll for socket events and distribute them to worker threads
In many case current thread will handle single event itself.
+
+ @return a ready connection, or NULL on shutdown
*/
-static pool_event_t * listener(worker_thread_t *current_thread,
+static connection_t * listener(worker_thread_t *current_thread,
thread_group_t *thread_group)
{
DBUG_ENTER("listener");
+ connection_t *retval= NULL;
+
+ decrement_active_threads(thread_group);
for(;;)
{
native_event ev[MAX_EVENTS];
int cnt;
if (thread_group->shutdown)
- {
- DBUG_RETURN(&POOL_SHUTDOWN_EVENT);
- }
- do
- {
- cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, -1);
- }
- while(cnt <= 0 && errno == EINTR);
-
+ break;
+
+ cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, -1);
+
if (cnt <=0)
{
DBUG_ASSERT(thread_group->shutdown);
- DBUG_RETURN(&POOL_SHUTDOWN_EVENT);
+ break;
}
- /*
- Put events to queue, maybe wakeup workers.
- If queue is currently empty, listener will return
- so the current thread handles query itself, this avoids
- wakeups and context switches. But if queue is not empty
- this smells like a flood of queries, and the listener
- stays.
- */
mysql_mutex_lock(&thread_group->mutex);
if (thread_group->shutdown)
{
mysql_mutex_unlock(&thread_group->mutex);
- DBUG_RETURN(&POOL_SHUTDOWN_EVENT);
+ break;
}
- thread_group->io_event_count += cnt;
- bool pick_event= queue_is_empty(thread_group);
-
- for(int i=(pick_event)?1:0; i < cnt ; i++)
+ thread_group->io_event_count += cnt;
+
+ /*
+ We got some network events and need to make decisions: whether the
+ listener should handle events itself, and whether or not to wake any
+ worker threads so they can handle events.
+
+ Q1 : Should listener handle an event itself, or put all events into
+ queue and let workers handle the events?
+
+ Solution :
+ Generally, a listener that handles events itself is preferable. We do not
+ want the listener thread to change its state from waiting to running too
+ often. Since the listener has just woken from poll, it had better use its
+ time slice and do some work. Besides, not handling events means they go to
+ the queue, and often another worker must be woken up to handle the
+ event. This is not good, as we want to avoid wakeups.
+
+ The downside of a listener that also handles queries is that we can
+ potentially leave the thread group for a long time without picking up new
+ network events. It is not a major problem, because this stall will be
+ detected sooner or later by the timer thread. Still, relying on the timer
+ is not always good, because it may "tick" too slowly (large timer_interval).
+
+ We use the following strategy to solve this problem: if the queue was not
+ empty, we suspect a flood of network events and the listener stays.
+ Otherwise, it handles a query.
+
+
+ Q2: If queue is not empty, how many workers to wake?
+
+ Solution:
+ We generally try to keep one thread per group active (threads handling
+ queries are considered active, unless they are stuck inside some "wait").
+ Thus, we will wake only one worker, and only if there are no active
+ threads currently, and the listener is not going to handle a query. When we
+ don't wake, we hope that currently active threads will finish fast and
+ handle the queue. If this does not happen, the timer thread will detect the
+ stall and wake a worker.
+
+ NOTE: Currently nothing is done to detect or prevent long queuing times.
+ A solution (for the future) would be to give up "one active thread per group"
+ principle, if events stay in the queue for too long, and wake more workers.
+
+ */
+
+ bool listener_picks_event= thread_group->queue.is_empty();
+
+ /*
+ If listener_picks_event is set, listener thread will handle first event,
+ and put the rest into the queue. If listener_picks_event is not set, all
+ events go to the queue.
+ */
+ for(int i=(listener_picks_event)?1:0; i < cnt ; i++)
{
- pool_event_t *e= (pool_event_t *)native_event_get_userdata(&ev[i]);
- queue_put(thread_group, e);
+ connection_t *c= (connection_t *)native_event_get_userdata(&ev[i]);
+ thread_group->queue.push_back(c);
}
- /* Wake at most one worker thread */
- if(thread_group->active_thread_count==0 &&
- /*!queue_is_empty(thread_group)*/ !pick_event)
+
+ if(thread_group->active_thread_count==0 && !listener_picks_event)
{
+ /* Wake one worker thread */
if(wake_thread(thread_group))
{
- if(thread_group->thread_count == 1)
+ /*
+ Wake failed, the group has no idle threads.
+ Now check if the group has at least one worker.
+ */
+ if(thread_group->thread_count == 1 &&
+ thread_group->pending_thread_start_count == 0)
+ {
+ /*
+ Currently there is no worker thread in the group, as indicated by
+ thread_count == 1 (means listener is the only one thread in the
+ group).
+
+ The queue is not empty, and listener is not going to handle
+ events. In order to drain the queue, we create a worker here.
+ Alternatively, we could just rely on timer to detect stall, but
+ this would be an inefficient, pointless delay.
+ */
create_worker(thread_group);
+ }
}
}
mysql_mutex_unlock(&thread_group->mutex);
- if (pick_event)
- DBUG_RETURN((pool_event_t *)(native_event_get_userdata(&ev[0])));
+ if (listener_picks_event)
+ {
+ retval= (connection_t *)native_event_get_userdata(&ev[0]);
+ break;
+ }
}
+
+ increment_active_threads(thread_group);
+ DBUG_RETURN(retval);
}
@@ -674,7 +745,36 @@ static int create_worker(thread_group_t *thread_group)
}
-/*
+/**
+ Calculate microseconds throttling delay for thread creation.
+
+ The value depends on how many threads are already in the group:
+ small number of threads means no delay, the more threads the larger
+ the delay.
+
+ The actual values were not calculated using any scientific methods.
+ They just look right, and behave well in practice.
+
+ TODO: Should throttling depend on thread_pool_stall_limit?
+*/
+static ulonglong microsecond_throttling_interval(thread_group_t *thread_group)
+{
+ int count= thread_group->thread_count;
+
+ if (count < 4)
+ return 0;
+
+ if (count < 8)
+ return 50*1000;
+
+ if(count < 16)
+ return 100*1000;
+
+ return 200*1000;
+}
+
+
+/**
Wakes a worker thread, or creates a new one.
Worker creation is throttled, so we avoid too many threads
@@ -682,9 +782,6 @@ static int create_worker(thread_group_t *thread_group)
*/
static int wake_or_create_thread(thread_group_t *thread_group)
{
- ulonglong now;
- ulonglong time_since_last_thread_created;
-
DBUG_ENTER("wake_or_create_thread");
if (wake_thread(thread_group) == 0)
@@ -696,30 +793,25 @@ static int wake_or_create_thread(thread_group_t *thread_group)
if (thread_group->thread_count > thread_group->connection_count)
DBUG_RETURN(-1);
- if (thread_group->thread_count < 4)
- {
- DBUG_RETURN(create_worker(thread_group));
- }
-
- now = microsecond_interval_timer();
- time_since_last_thread_created =
- (now - thread_group->last_thread_creation_time)/1000;
-
+
if (thread_group->active_thread_count == 0)
{
/*
- We're better off creating a new thread here with no delay, as
- others threads (at least 4) are all blocking and there was no sleeping
- thread to wakeup. It smells like deadlock or very slowly executing
- requests, e.g sleeps or user locks.
+ We're better off creating a new thread here with no delay:
+ either there are no workers at all, or they are all blocking
+ and there was no sleeping thread to wake up. It smells like a deadlock
+ or very slowly executing requests, e.g. sleeps or user locks.
*/
DBUG_RETURN(create_worker(thread_group));
}
+ ulonglong now = microsecond_interval_timer();
+ ulonglong time_since_last_thread_created =
+ (now - thread_group->last_thread_creation_time);
+
/* Throttle thread creation. */
- if ((thread_group->thread_count < 8 && time_since_last_thread_created > 50)
- || (thread_group->thread_count < 16 && time_since_last_thread_created > 100)
- || (time_since_last_thread_created > 200))
+ if (time_since_last_thread_created >
+ microsecond_throttling_interval(thread_group))
{
DBUG_RETURN(create_worker(thread_group));
}
@@ -729,62 +821,75 @@ static int wake_or_create_thread(thread_group_t *thread_group)
-/* Initialize thread group */
int thread_group_init(thread_group_t *thread_group, pthread_attr_t* thread_attr)
{
DBUG_ENTER("thread_group_init");
-
- memset(thread_group, 0, sizeof(thread_group_t));
thread_group->pthread_attr = thread_attr;
mysql_mutex_init(key_group_mutex, &thread_group->mutex, NULL);
- STAILQ_INIT(&thread_group->queue);
- SLIST_INIT(&thread_group->waiting_threads);
-
- thread_group->pending_thread_start_count= 0;
- thread_group->stalled= false;
-
- thread_group->pollfd= -1;
+ thread_group->pollfd=-1;
+ thread_group->shutdown_pipe[0]= -1;
+ thread_group->shutdown_pipe[1]= -1;
DBUG_RETURN(0);
}
-/*
- Wake single sleeping thread in pool. Optionally, tell this thread
- to listen to socket io notification.
+void thread_group_destroy(thread_group_t *thread_group)
+{
+ mysql_mutex_destroy(&thread_group->mutex);
+ if (thread_group->pollfd != -1)
+ {
+ close(thread_group->pollfd);
+ thread_group->pollfd= -1;
+ }
+ for(int i=0; i < 2; i++)
+ {
+ if(thread_group->shutdown_pipe[i] != -1)
+ {
+ close(thread_group->shutdown_pipe[i]);
+ thread_group->shutdown_pipe[i]= -1;
+ }
+ }
+}
+
+/**
+ Wake sleeping thread from waiting list
*/
static int wake_thread(thread_group_t *thread_group)
{
DBUG_ENTER("wake_thread");
- worker_thread_t *thread = SLIST_FIRST(&thread_group->waiting_threads);
+ worker_thread_t *thread = thread_group->waiting_threads.front();
if(thread)
{
thread->woken= true;
- SLIST_REMOVE_HEAD(&thread_group->waiting_threads, ptr);
+ thread_group->waiting_threads.remove(thread);
if (mysql_cond_signal(&thread->cond))
- abort();
+ abort();
DBUG_RETURN(0);
}
DBUG_RETURN(-1); /* no thread- missed wakeup*/
}
-/*
- Shutdown thread group.
+/*
+ Initiate shutdown for thread group.
+
+ The shutdown is asynchronous; we only care to wake all threads
+ in here, so they can finish. We do not wait here until threads
+ terminate.
+
+ Final cleanup of the group (thread_group_destroy) will be done by
+ the last exiting thread.
*/
static void thread_group_close(thread_group_t *thread_group)
{
DBUG_ENTER("thread_group_close");
-
- char c= 0;
mysql_mutex_lock(&thread_group->mutex);
if (thread_group->thread_count == 0 &&
thread_group->pending_thread_start_count == 0)
{
- if (thread_group->pollfd >= 0)
- close(thread_group->pollfd);
mysql_mutex_unlock(&thread_group->mutex);
- mysql_mutex_destroy(&thread_group->mutex);
+ thread_group_destroy(thread_group);
DBUG_VOID_RETURN;
}
@@ -795,40 +900,41 @@ static void thread_group_close(thread_group_t *thread_group)
{
DBUG_VOID_RETURN;
}
+
+ /* Wake listener */
if (io_poll_associate_fd(thread_group->pollfd,
- thread_group->shutdown_pipe[0], &POOL_SHUTDOWN_EVENT))
+ thread_group->shutdown_pipe[0], NULL))
{
DBUG_VOID_RETURN;
}
-
- /* Wake listener. */
+ char c= 0;
if (write(thread_group->shutdown_pipe[1], &c, 1) < 0)
DBUG_VOID_RETURN;
/* Wake all workers. */
- while(wake_thread(thread_group) == 0) {};
- mysql_mutex_unlock(&thread_group->mutex);
+ while(wake_thread(thread_group) == 0)
+ {
+ }
-#if 0
- /* Wait until workers terminate */
- while(thread_group->thread_count)
- usleep(1000);
-#endif
+ mysql_mutex_unlock(&thread_group->mutex);
DBUG_VOID_RETURN;
}
/*
- Post a task to the workqueue, maybe wake a worker so
- it picks the task.
+ Add work to the queue. Maybe wake a worker if they all sleep.
+
+ Currently, this function is only used when new connections need to
+ perform login (this is done in worker threads).
+
*/
-static void post_event(thread_group_t *thread_group, pool_event_t* ev)
+static void queue_put(thread_group_t *thread_group, connection_t *connection)
{
- DBUG_ENTER("post_event");
+ DBUG_ENTER("queue_put");
mysql_mutex_lock(&thread_group->mutex);
- STAILQ_INSERT_TAIL(&thread_group->queue, ev, next);
+ thread_group->queue.push_back(connection);
if (thread_group->active_thread_count == 0)
{
wake_or_create_thread(thread_group);
@@ -850,23 +956,33 @@ static bool too_many_threads(thread_group_t *thread_group)
-/*
- Dequeue a work item.
+/**
+ Retrieve a connection with pending event.
+
+ Pending event in our case means that there is either a pending login request
+ (if connection is not yet logged in), or there are unread bytes on the socket.
- If it is not immediately available, thread will sleep until
- work is available (it also can become IO listener for a while).
+ If there are no pending events currently, thread will wait. If the timeout
+ specified in the abstime parameter passes, the function returns NULL.
+
+ @param current_thread - current worker thread
+ @param thread_group - current thread group
+ @param abstime - absolute wait timeout
+
+ @return
+ connection with pending event. NULL is returned if timeout has expired, or on shutdown.
*/
-int get_event(worker_thread_t *current_thread, thread_group_t *thread_group,
- pool_event_t **ev, struct timespec *ts)
+connection_t *get_event(worker_thread_t *current_thread,
+ thread_group_t *thread_group, struct timespec *abstime)
{
DBUG_ENTER("get_event");
- pool_event_t *first_event = NULL;
+ connection_t *connection = NULL;
int err=0;
mysql_mutex_lock(&thread_group->mutex);
- decrement_active_threads(thread_group);
+
DBUG_ASSERT(thread_group->active_thread_count >= 0);
do
@@ -877,8 +993,8 @@ int get_event(worker_thread_t *current_thread, thread_group_t *thread_group,
/* Check if queue is not empty */
if (!too_many_threads(thread_group))
{
- first_event= queue_get(thread_group);
- if(first_event)
+ connection = queue_get(thread_group);
+ if(connection)
break;
}
@@ -888,7 +1004,7 @@ int get_event(worker_thread_t *current_thread, thread_group_t *thread_group,
thread_group->listener= current_thread;
mysql_mutex_unlock(&thread_group->mutex);
- first_event= listener(current_thread, thread_group);
+ connection = listener(current_thread, thread_group);
mysql_mutex_lock(&thread_group->mutex);
/* There is no listener anymore, it just returned. */
@@ -906,12 +1022,11 @@ int get_event(worker_thread_t *current_thread, thread_group_t *thread_group,
if (io_poll_wait(thread_group->pollfd,&nev,1, 0) == 1)
{
thread_group->io_event_count++;
- first_event = (pool_event_t *)native_event_get_userdata(&nev);
+ connection = (connection_t *)native_event_get_userdata(&nev);
break;
}
}
-
/* And now, finally sleep */
current_thread->woken = false; /* wake() sets this to true */
@@ -920,13 +1035,15 @@ int get_event(worker_thread_t *current_thread, thread_group_t *thread_group,
It is important to add thread to the head rather than tail
as it ensures LIFO wakeup order (hot caches, working inactivity timeout)
*/
- SLIST_INSERT_HEAD(&thread_group->waiting_threads, current_thread, ptr);
-
- if(ts)
- err = mysql_cond_timedwait(&current_thread->cond, &thread_group->mutex, ts);
+ thread_group->waiting_threads.push_front(current_thread);
+
+ decrement_active_threads(thread_group);
+ if(abstime)
+ err = mysql_cond_timedwait(&current_thread->cond, &thread_group->mutex, abstime);
else
err = mysql_cond_wait(&current_thread->cond, &thread_group->mutex);
-
+ increment_active_threads(thread_group);
+
if (!current_thread->woken)
{
/*
@@ -934,7 +1051,7 @@ int get_event(worker_thread_t *current_thread, thread_group_t *thread_group,
a timeout. Anyhow, we need to remove ourselves from the list now.
If thread was explicitly woken, than caller removed us from the list.
*/
- SLIST_REMOVE(&thread_group->waiting_threads, current_thread, worker_thread_t, ptr);
+ thread_group->waiting_threads.remove(current_thread);
}
if(err)
@@ -944,26 +1061,16 @@ int get_event(worker_thread_t *current_thread, thread_group_t *thread_group,
while(true);
thread_group->stalled= false;
- increment_active_threads(thread_group);
mysql_mutex_unlock(&thread_group->mutex);
-
-
- if (first_event)
- *ev = first_event;
- else
- *ev = &POOL_SHUTDOWN_EVENT;
- DBUG_RETURN(err);
+ DBUG_RETURN(connection);
}
-/*
- Tells the pool that thread starts waiting on IO, lock, condition,
+/**
+ Tells the pool that worker starts waiting on IO, lock, condition,
sleep() or similar.
-
- Will wake another worker, and if there is no listener will
- promote a listener,
*/
void wait_begin(thread_group_t *thread_group)
{
@@ -974,8 +1081,12 @@ void wait_begin(thread_group_t *thread_group)
DBUG_ASSERT(thread_group->connection_count > 0);
if((thread_group->active_thread_count == 0) &&
- (!queue_is_empty(thread_group) || !thread_group->listener))
+ (thread_group->queue.is_empty() || !thread_group->listener))
{
+ /*
+ Group might stall while this thread waits, thus wake
+ or create a worker to prevent stall.
+ */
wake_or_create_thread(thread_group);
}
@@ -983,9 +1094,10 @@ void wait_begin(thread_group_t *thread_group)
DBUG_VOID_RETURN;
}
-/*
- Tells the pool current thread finished waiting.
+/**
+ Tells the pool that the current thread has finished waiting.
*/
+
void wait_end(thread_group_t *thread_group)
{
DBUG_ENTER("wait_end");
@@ -996,7 +1108,10 @@ void wait_end(thread_group_t *thread_group)
}
-/* Scheduler */
+/**
+ Allocate/initialize a new connection structure.
+*/
+
connection_t *alloc_connection(THD *thd)
{
DBUG_ENTER("alloc_connection");
@@ -1014,42 +1129,67 @@ connection_t *alloc_connection(THD *thd)
-/*
+/**
Add a new connection to thread pool..
*/
+
void tp_add_connection(THD *thd)
{
DBUG_ENTER("tp_add_connection");
threads.append(thd);
mysql_mutex_unlock(&LOCK_thread_count);
- connection_t *c= alloc_connection(thd);
- if(c)
+ connection_t *connection= alloc_connection(thd);
+ if(connection)
{
- c->thread_group= &all_groups[c->thd->thread_id%group_count];
- mysql_mutex_lock(&c->thread_group->mutex);
- c->thread_group->connection_count++;
- mysql_mutex_unlock(&c->thread_group->mutex);
- c->thd->event_scheduler.data = c;
- post_event(c->thread_group,&c->event);
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ thd->event_scheduler.data= connection;
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+
+ /* Assign connection to a group. */
+ thread_group_t *group=
+ &all_groups[connection->thd->thread_id%group_count];
+
+ connection->thread_group=group;
+
+ mysql_mutex_lock(&group->mutex);
+ group->connection_count++;
+ mysql_mutex_unlock(&group->mutex);
+
+ /*
+ Add connection to the work queue. Actual logon
+ will be done by a worker thread.
+ */
+ queue_put(group, connection);
}
DBUG_VOID_RETURN;
}
-static void connection_abort(connection_t *c)
+/**
+ Terminate connection.
+*/
+
+static void connection_abort(connection_t *connection)
{
DBUG_ENTER("connection_abort");
- mysql_mutex_lock(&c->thread_group->mutex);
- c->thread_group->connection_count--;
- mysql_mutex_unlock(&c->thread_group->mutex);
+ thread_group_t *group= connection->thread_group;
+
+ mysql_mutex_lock(&group->mutex);
+ group->connection_count--;
+ mysql_mutex_unlock(&group->mutex);
- threadpool_remove_connection(c->thd);
- my_free(c);
+ threadpool_remove_connection(connection->thd);
+ my_free(connection);
DBUG_VOID_RETURN;
}
+
+/**
+ MySQL scheduler callback : kill connection
+*/
+
void tp_post_kill_notification(THD *thd)
{
DBUG_ENTER("tp_post_kill_notification");
@@ -1061,6 +1201,10 @@ void tp_post_kill_notification(THD *thd)
DBUG_VOID_RETURN;
}
+/**
+ MySQL scheduler callback: wait begin
+*/
+
void tp_wait_begin(THD *thd, int type)
{
DBUG_ENTER("tp_wait_begin");
@@ -1079,6 +1223,10 @@ void tp_wait_begin(THD *thd, int type)
}
+/**
+ MySQL scheduler callback: wait end
+*/
+
void tp_wait_end(THD *thd)
{
DBUG_ENTER("tp_wait_end");
@@ -1095,7 +1243,7 @@ void tp_wait_end(THD *thd)
DBUG_VOID_RETURN;
}
-
+
static void set_next_timeout_check(ulonglong abstime)
{
DBUG_ENTER("set_next_timeout_check");
@@ -1108,6 +1256,11 @@ static void set_next_timeout_check(ulonglong abstime)
DBUG_VOID_RETURN;
}
+
+/**
+ Set wait timeout for connection.
+*/
+
static void set_wait_timeout(connection_t *c)
{
DBUG_ENTER("set_wait_timeout");
@@ -1129,10 +1282,10 @@ static void set_wait_timeout(connection_t *c)
-/*
- Handle a (rare) special case,where connection needs to
- migrate to a different group because group_count has changed
- as a result of thread_pool_size setting.
+/**
+ Handle a (rare) special case, where a connection needs to
+ migrate to a different group because group_count has changed
+ after a change to the thread_pool_size setting.
*/
static int change_group(connection_t *c,
thread_group_t *old_group,
@@ -1162,9 +1315,9 @@ static int change_group(connection_t *c,
}
-static int start_io(connection_t *c)
+static int start_io(connection_t *connection)
{
- int fd = c->thd->net.vio->sd;
+ int fd = connection->thd->net.vio->sd;
/*
Usually, connection will stay in the same group for the entire
@@ -1176,78 +1329,79 @@ static int start_io(connection_t *c)
So we recalculate in which group the connection should be, based
on thread_id and current group count, and migrate if necessary.
*/
- thread_group_t *g = &all_groups[c->thd->thread_id%group_count];
+ thread_group_t *group =
+ &all_groups[connection->thd->thread_id%group_count];
- if (g != c->thread_group)
+ if (group != connection->thread_group)
{
- if (!change_group(c, c->thread_group, g))
+ if (!change_group(connection, connection->thread_group, group))
{
- c->logged_in= true;
- return io_poll_associate_fd(c->thread_group->pollfd, fd, c);
+ connection->logged_in= true;
+ return io_poll_associate_fd(group->pollfd, fd, connection);
}
else
return -1;
}
-
/*
- Handle case where connection is not yet logged in, i.e
- not associated with poll fd.
+ In case binding to a poll descriptor was not yet done
+ (start_io called for the first time), do it now.
*/
- if(!c->logged_in)
+ if(!connection->logged_in)
{
- c->logged_in= true;
- return io_poll_associate_fd(c->thread_group->pollfd, fd, c);
+ connection->logged_in= true;
+ return io_poll_associate_fd(group->pollfd, fd, connection);
}
- return io_poll_start_read(c->thread_group->pollfd, fd, c);
+ return io_poll_start_read(group->pollfd, fd, connection);
}
-static void handle_event(pool_event_t *ev)
+static void handle_event(connection_t *connection)
{
DBUG_ENTER("handle_event");
-
- /* Normal case, handle query on connection */
- connection_t *c = (connection_t*)(void *)ev;
int ret;
- if (!c->logged_in)
+ if (!connection->logged_in)
{
- ret= threadpool_add_connection(c->thd);
+ ret= threadpool_add_connection(connection->thd);
}
else
{
- ret= threadpool_process_request(c->thd);
+ ret= threadpool_process_request(connection->thd);
}
if(!ret)
{
- set_wait_timeout(c);
- ret= start_io(c);
+ set_wait_timeout(connection);
+ ret= start_io(connection);
}
if (ret)
{
- connection_abort(c);
+ connection_abort(connection);
}
+
DBUG_VOID_RETURN;
}
+/**
+ Worker thread's main
+*/
static void *worker_main(void *param)
{
worker_thread_t this_thread;
-
- thread_created++;
pthread_detach_this_thread();
my_thread_init();
+
DBUG_ENTER("worker_main");
+ thread_created++;
thread_group_t *thread_group = (thread_group_t *)param;
/* Init per-thread structure */
@@ -1265,16 +1419,16 @@ static void *worker_main(void *param)
/* Run event loop */
for(;;)
{
- struct pool_event_t *ev;
+ connection_t *connection;
struct timespec ts;
set_timespec(ts,threadpool_idle_timeout);
- if (get_event(&this_thread, thread_group, &ev, &ts)
- || ev == &POOL_SHUTDOWN_EVENT)
+ connection = get_event(&this_thread, thread_group, &ts);
+ if (!connection)
{
break;
}
this_thread.event_count++;
- handle_event(ev);
+ handle_event(connection);
}
/* Thread shutdown: cleanup per-worker-thread structure. */
@@ -1286,20 +1440,18 @@ static void *worker_main(void *param)
mysql_mutex_unlock(&thread_group->mutex);
my_atomic_add32(&tp_stats.num_worker_threads, -1);
- /* If it is the last thread in pool and pool is terminating, destroy pool.*/
- if (thread_group->shutdown && (thread_group->thread_count == 0))
+ /* If it is the last thread in the group and the pool is terminating, destroy the group. */
+ if (thread_group->shutdown && thread_group->thread_count == 0
+ && thread_group->pending_thread_start_count == 0)
{
- /* last thread existing, cleanup the pool structure */
- mysql_mutex_destroy(&thread_group->mutex);
+ thread_group_destroy(thread_group);
}
- DBUG_POP();
my_thread_end();
return NULL;
}
-static bool started=false;
-
+static bool started=false;
bool tp_init()
{
DBUG_ENTER("tp_init");
@@ -1311,10 +1463,7 @@ bool tp_init()
thread_group_init(&all_groups[i], get_connection_attrib());
}
tp_set_threadpool_size(threadpool_size);
-
- #define PSI_register(X) \
- if(PSI_server) PSI_server->register_ ## X("threadpool", X ## _list, array_elements(X ## _list))
-
+
PSI_register(mutex);
PSI_register(cond);
PSI_register(thread);