summaryrefslogtreecommitdiff
path: root/sql/threadpool_common.cc
diff options
context:
space:
mode:
authorVladislav Vaintroub <wlad@mariadb.com>2016-09-21 14:28:42 +0000
committerVladislav Vaintroub <wlad@mariadb.com>2016-09-22 17:01:28 +0000
commitf7a7c0c2fec3dcca331bb529f8314273360c72ae (patch)
tree2e04f4036bd7def676d85690e67e393ec0c41a8e /sql/threadpool_common.cc
parentf32a5115584c9b33a2163df57830ad335cd2b3ab (diff)
downloadmariadb-git-f7a7c0c2fec3dcca331bb529f8314273360c72ae.tar.gz
MDEV-10297 Add priorization to threadpool
Also MDEV-10385 Threadpool refactoring
Diffstat (limited to 'sql/threadpool_common.cc')
-rw-r--r--sql/threadpool_common.cc214
1 files changed, 197 insertions, 17 deletions
diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc
index d6c343dc04e..2308f4277d6 100644
--- a/sql/threadpool_common.cc
+++ b/sql/threadpool_common.cc
@@ -34,14 +34,25 @@ uint threadpool_max_size;
uint threadpool_stall_limit;
uint threadpool_max_threads;
uint threadpool_oversubscribe;
+uint threadpool_mode;
+uint threadpool_prio_kickup_timer;
/* Stats */
TP_STATISTICS tp_stats;
+static void threadpool_remove_connection(THD *thd);
+static int threadpool_process_request(THD *thd);
+static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data);
+
extern "C" pthread_key(struct st_my_thread_var*, THR_KEY_mysys);
extern bool do_command(THD*);
+static inline TP_connection *get_TP_connection(THD *thd)
+{
+ return (TP_connection *)thd->event_scheduler.data;
+}
+
/*
Worker threads contexts, and THD contexts.
=========================================
@@ -105,15 +116,81 @@ static void thread_attach(THD* thd)
#endif
}
+/*
+ Determine connection priority , using current
+ transaction state and 'threadpool_priority' variable value.
+*/
+static TP_PRIORITY get_priority(TP_connection *c)
+{
+ DBUG_ASSERT(c->thd == current_thd);
+ TP_PRIORITY prio= (TP_PRIORITY)c->thd->variables.threadpool_priority;
+ if (prio == TP_PRIORITY_AUTO)
+ {
+ return c->thd->transaction.is_active() ? TP_PRIORITY_HIGH : TP_PRIORITY_LOW;
+ }
+ return prio;
+}
-THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
+
+void tp_callback(TP_connection *c)
{
- THD *thd= NULL;
- int error=1;
+ DBUG_ASSERT(c);
Worker_thread_context worker_context;
worker_context.save();
+ THD *thd= c->thd;
+
+ c->state = TP_STATE_RUNNING;
+
+ if (!thd)
+ {
+ /* No THD, need to login first. */
+ DBUG_ASSERT(c->connect);
+ thd= c->thd= threadpool_add_connection(c->connect, c);
+ if (!thd)
+ {
+ /* Bail out on connect error.*/
+ goto error;
+ }
+ c->connect= 0;
+ }
+ else if (threadpool_process_request(thd))
+ {
+ /* QUIT or an error occured. */
+ goto error;
+ }
+
+ /* Set priority */
+ c->priority= get_priority(c);
+
+ /* Read next command from client. */
+ c->set_io_timeout(thd->variables.net_wait_timeout);
+ c->state= TP_STATE_IDLE;
+ if (c->start_io())
+ goto error;
+
+ worker_context.restore();
+ return;
+
+error:
+ c->thd= 0;
+ delete c;
+
+ if (thd)
+ {
+ threadpool_remove_connection(thd);
+ }
+ worker_context.restore();
+}
+
+
+static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
+{
+ THD *thd= NULL;
+ int error=1;
+
+
/*
Create a new connection context: mysys_thread_var and PSI thread
Store them in THD.
@@ -137,7 +214,6 @@ THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
#endif
my_thread_end();
}
- worker_context.restore();
return NULL;
}
delete connect;
@@ -184,17 +260,14 @@ THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
threadpool_remove_connection(thd);
thd= NULL;
}
- worker_context.restore();
return thd;
}
-void threadpool_remove_connection(THD *thd)
+static void threadpool_remove_connection(THD *thd)
{
- Worker_thread_context worker_context;
- worker_context.save();
thread_attach(thd);
-
+ thd->event_scheduler.data= 0;
thd->net.reading_or_writing = 0;
end_connection(thd);
close_connection(thd, 0);
@@ -206,19 +279,14 @@ void threadpool_remove_connection(THD *thd)
mysys thread_var and PSI thread.
*/
my_thread_end();
-
- worker_context.restore();
}
/**
Process a single client request or a single batch.
*/
-int threadpool_process_request(THD *thd)
+static int threadpool_process_request(THD *thd)
{
int retval= 0;
- Worker_thread_context worker_context;
- worker_context.save();
-
thread_attach(thd);
if (thd->killed >= KILL_CONNECTION)
@@ -268,7 +336,6 @@ int threadpool_process_request(THD *thd)
}
end:
- worker_context.restore();
return retval;
}
@@ -286,6 +353,119 @@ static bool tp_end_thread(THD *, bool)
return 0;
}
+static TP_pool *pool;
+
+static bool tp_init()
+{
+
+#ifdef _WIN32
+ if (threadpool_mode == TP_MODE_WINDOWS)
+ pool= new (std::nothrow) TP_pool_win;
+ else
+ pool= new (std::nothrow) TP_pool_generic;
+#else
+ pool= new (std::nothrow) TP_pool_generic;
+#endif
+ if (!pool)
+ return true;
+ if (pool->init())
+ {
+ delete pool;
+ pool= 0;
+ return true;
+ }
+ return false;
+}
+
+static void tp_add_connection(CONNECT *connect)
+{
+ TP_connection *c= pool->new_connection(connect);
+ DBUG_EXECUTE_IF("simulate_failed_connection_1", delete c ; c= 0;);
+ if (c)
+ pool->add(c);
+ else
+ connect->close_and_delete();
+}
+
+int tp_get_idle_thread_count()
+{
+ return pool? pool->get_idle_thread_count(): 0;
+}
+
+int tp_get_thread_count()
+{
+ return pool ? pool->get_thread_count() : 0;
+}
+
+void tp_set_min_threads(uint val)
+{
+ if (pool)
+ pool->set_min_threads(val);
+}
+
+
+void tp_set_max_threads(uint val)
+{
+ if (pool)
+ pool->set_max_threads(val);
+}
+
+void tp_set_threadpool_size(uint val)
+{
+ if (pool)
+ pool->set_pool_size(val);
+}
+
+
+void tp_set_threadpool_stall_limit(uint val)
+{
+ if (pool)
+ pool->set_stall_limit(val);
+}
+
+
+void tp_timeout_handler(TP_connection *c)
+{
+ if (c->state != TP_STATE_IDLE)
+ return;
+ THD *thd=c->thd;
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ thd->killed= KILL_CONNECTION;
+ c->priority= TP_PRIORITY_HIGH;
+ post_kill_notification(thd);
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+}
+
+
+static void tp_wait_begin(THD *thd, int type)
+{
+ TP_connection *c = get_TP_connection(thd);
+ if (c)
+ c->wait_begin(type);
+}
+
+
+static void tp_wait_end(THD *thd)
+{
+ TP_connection *c = get_TP_connection(thd);
+ if (c)
+ c->wait_end();
+}
+
+
+static void tp_end()
+{
+ delete pool;
+}
+
+static void tp_post_kill_notification(THD *thd)
+{
+ TP_connection *c= get_TP_connection(thd);
+ if (c)
+ c->priority= TP_PRIORITY_HIGH;
+ post_kill_notification(thd);
+}
+
static scheduler_functions tp_scheduler_functions=
{
0, // max_threads
@@ -296,7 +476,7 @@ static scheduler_functions tp_scheduler_functions=
tp_add_connection, // add_connection
tp_wait_begin, // thd_wait_begin
tp_wait_end, // thd_wait_end
- post_kill_notification, // post_kill_notification
+ tp_post_kill_notification, // post kill notification
tp_end_thread, // Dummy function
tp_end // end
};