summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/scheduler.cc9
-rw-r--r--sql/scheduler.h3
-rw-r--r--sql/sql_class.cc19
-rw-r--r--sql/threadpool.h7
-rw-r--r--sql/threadpool_common.cc63
-rw-r--r--sql/threadpool_win.cc156
6 files changed, 236 insertions, 21 deletions
diff --git a/sql/scheduler.cc b/sql/scheduler.cc
index 7380b134f13..526aa135805 100644
--- a/sql/scheduler.cc
+++ b/sql/scheduler.cc
@@ -109,7 +109,14 @@ void post_kill_notification(THD *thd)
*/
#ifndef EMBEDDED_LIBRARY
+static void* my_scheduler_yield()
+{
+ return NULL;
+}
+static void my_scheduler_resume(void *)
+{
+}
void one_thread_per_connection_scheduler(scheduler_functions *func,
ulong *arg_max_connections,
Atomic_counter<uint> *arg_connection_count)
@@ -120,6 +127,7 @@ void one_thread_per_connection_scheduler(scheduler_functions *func,
func->connection_count= arg_connection_count;
func->add_connection= create_thread_to_handle_connection;
func->post_kill_notification= post_kill_notification;
+
}
#else
void handle_connection_in_main_thread(CONNECT *connect)
@@ -139,3 +147,4 @@ void one_thread_scheduler(scheduler_functions *func)
func->connection_count= &connection_count;
func->add_connection= handle_connection_in_main_thread;
}
+
diff --git a/sql/scheduler.h b/sql/scheduler.h
index 676262f6454..8c6f408e329 100644
--- a/sql/scheduler.h
+++ b/sql/scheduler.h
@@ -40,6 +40,9 @@ struct scheduler_functions
void (*thd_wait_end)(THD *thd);
void (*post_kill_notification)(THD *thd);
void (*end)(void);
+ void* (*get_context)();
+ void (*yield)(void);
+ void (*resume)(void *);
};
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 4577f1007be..53b11c345b6 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -4917,6 +4917,25 @@ void reset_thd(MYSQL_THD thd)
free_root(thd->mem_root, MYF(MY_KEEP_PREALLOC));
}
+int my_scheduler_get_context(void **ctx, void (**resume_fct)(void*))
+{
+ THD *thd = _current_thd();
+ if (thd && thd->scheduler && thd->scheduler->yield)
+ {
+ auto s = thd->scheduler;
+ *ctx = s->get_context();
+ *resume_fct =s->resume;
+ return ctx?0:1;
+ }
+ return 1;
+}
+
+void my_scheduler_yield()
+{
+ THD* thd = _current_thd();
+ thd->scheduler->yield();
+}
+
unsigned long long thd_get_query_id(const MYSQL_THD thd)
{
return((unsigned long long)thd->query_id);
diff --git a/sql/threadpool.h b/sql/threadpool.h
index fe77100a82a..cf7b74d2de9 100644
--- a/sql/threadpool.h
+++ b/sql/threadpool.h
@@ -135,6 +135,10 @@ struct TP_pool
virtual int set_stall_limit(uint){ return 0; }
virtual int get_thread_count() { return tp_stats.num_worker_threads; }
virtual int get_idle_thread_count(){ return 0; }
+ virtual void* get_context() {return nullptr;}
+ virtual void yield() {}
+ virtual void resume(void *) {}
+
};
#ifdef _WIN32
@@ -147,6 +151,9 @@ struct TP_pool_win:TP_pool
virtual void add(TP_connection *);
virtual int set_max_threads(uint);
virtual int set_min_threads(uint);
+ virtual void* get_context();
+ virtual void yield();
+ virtual void resume(void *);
};
#endif
diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc
index 0588562ae61..076fd34bf3b 100644
--- a/sql/threadpool_common.cc
+++ b/sql/threadpool_common.cc
@@ -169,15 +169,14 @@ static TP_PRIORITY get_priority(TP_connection *c)
return prio;
}
-
-void tp_callback(TP_connection *c)
+int tp_callback_internal(TP_connection* c)
{
DBUG_ASSERT(c);
Worker_thread_context worker_context;
worker_context.save();
- THD *thd= c->thd;
+ THD* thd = c->thd;
c->state = TP_STATE_RUNNING;
@@ -185,13 +184,14 @@ void tp_callback(TP_connection *c)
{
/* No THD, need to login first. */
DBUG_ASSERT(c->connect);
- thd= c->thd= threadpool_add_connection(c->connect, c);
+ my_thread_init();
+ thd = c->thd = threadpool_add_connection(c->connect, c);
if (!thd)
{
/* Bail out on connect error.*/
goto error;
}
- c->connect= 0;
+ c->connect = 0;
}
else if (threadpool_process_request(thd))
{
@@ -200,28 +200,43 @@ void tp_callback(TP_connection *c)
}
/* Set priority */
- c->priority= get_priority(c);
+ c->priority = get_priority(c);
/* Read next command from client. */
c->set_io_timeout(thd->get_net_wait_timeout());
- c->state= TP_STATE_IDLE;
- if (c->start_io())
- goto error;
+ c->state = TP_STATE_IDLE;
worker_context.restore();
- return;
+ return 0;
error:
- c->thd= 0;
- delete c;
+ worker_context.restore();
+ return -1;
+}
+void tp_destroy_connection(TP_connection *c)
+{
+ Worker_thread_context worker_context;
+ worker_context.save();
+
+ THD *thd = c->thd;
+ c->thd = 0;
+ delete c;
if (thd)
- {
threadpool_remove_connection(thd);
- }
worker_context.restore();
}
+void tp_callback(TP_connection *c)
+{
+ if (tp_callback_internal(c)
+ || c->start_io())
+ {
+ tp_destroy_connection(c);
+ }
+}
+
+
static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
{
@@ -509,6 +524,21 @@ static void tp_post_kill_notification(THD *thd)
post_kill_notification(thd);
}
+static void tp_yield()
+{
+ pool->yield();
+}
+
+static void* tp_get_context()
+{
+ return pool->get_context();
+}
+
+static void tp_resume(void* context)
+{
+ pool->resume(context);
+}
+
static scheduler_functions tp_scheduler_functions=
{
0, // max_threads
@@ -519,7 +549,10 @@ static scheduler_functions tp_scheduler_functions=
tp_wait_begin, // thd_wait_begin
tp_wait_end, // thd_wait_end
tp_post_kill_notification, // post kill notification
- tp_end // end
+ tp_end, // end
+ tp_get_context,
+ tp_yield,
+ tp_resume
};
void pool_of_threads_scheduler(struct scheduler_functions *func,
diff --git a/sql/threadpool_win.cc b/sql/threadpool_win.cc
index c9968d48c06..f25402a4f51 100644
--- a/sql/threadpool_win.cc
+++ b/sql/threadpool_win.cc
@@ -41,6 +41,7 @@ static void tp_log_warning(const char *msg, const char *fct)
static PTP_POOL pool;
static TP_CALLBACK_ENVIRON callback_environ;
+static TP_CALLBACK_ENVIRON callback_environ_highprio;
static DWORD fls;
static bool skip_completion_port_on_success = false;
@@ -50,6 +51,25 @@ PTP_CALLBACK_ENVIRON get_threadpool_win_callback_environ()
return pool? &callback_environ: 0;
}
+enum class child_return
+{
+ SUCCESS,
+ ERROR,
+ YIELD,
+ YIELD_CONFIRM,
+ PARENT_ERROR
+};
+
+struct tp_fiber_data
+{
+ void* parent_fiber;
+ void* child_fiber;
+ void* child_data;
+ child_return child_ret;
+ tp_fiber_data():parent_fiber(),child_fiber(),child_data(), child_ret(){}
+};
+
+
/*
Threadpool callbacks.
@@ -97,8 +117,13 @@ public:
PTP_IO io;
PTP_TIMER timer;
PTP_WORK work;
+ PTP_WORK work_highprio;
bool long_callback;
-
+ void *fiber;
+ void *parent_fiber;
+ volatile child_return fiber_ret;
+ std::atomic<int> executing;
+ DWORD executing_thread_id;
};
struct TP_connection *new_TP_connection(CONNECT *connect)
@@ -112,27 +137,136 @@ struct TP_connection *new_TP_connection(CONNECT *connect)
return c;
}
+extern int tp_callback_internal(TP_connection* c);
+extern void tp_destroy_connection(TP_connection* c);
+
+static void tp_fiber(void *param)
+{
+ auto c = (TP_connection_win*)param;
+ void *parent_fiber;
+ for(;;)
+ {
+ if(c->fiber_ret == child_return::PARENT_ERROR)
+ {
+ parent_fiber = c->parent_fiber;
+ tp_destroy_connection(c);
+ SwitchToFiber(parent_fiber);
+ }
+ else
+ {
+ c->fiber_ret = child_return::YIELD;
+ if (tp_callback_internal(c))
+ c->fiber_ret = child_return::ERROR;
+ else
+ c->fiber_ret = child_return::SUCCESS;
+ SwitchToFiber(c->parent_fiber);
+ }
+ }
+}
+
+static void tp_callback_fiber(TP_connection_win* c)
+{
+ c->parent_fiber = GetCurrentFiber();
+ if (!c->fiber)
+ c->fiber = CreateFiber(my_thread_stack_size, tp_fiber, c);
+ if (!c->fiber)
+ {
+ tp_destroy_connection(c);
+ return;
+ }
+ void* child_fiber = c->fiber;
+ auto ret = c->fiber_ret;
+ switch (ret)
+ {
+ case child_return::ERROR:
+ case child_return::PARENT_ERROR:
+ abort();
+ break;
+ case child_return::YIELD:
+ SubmitThreadpoolWork(c->work);
+ return;
+ case child_return::YIELD_CONFIRM:
+ break;
+ case child_return::SUCCESS:
+ break;
+ }
+
+ if (c->executing++)
+ abort();
+ c->executing_thread_id = GetCurrentThreadId();
+ SwitchToFiber(c->fiber);
+ c->executing --;
+
+ switch (c->fiber_ret)
+ {
+ case child_return::SUCCESS:
+ if (c->start_io())
+ {
+ c->fiber_ret = child_return::PARENT_ERROR;
+
+ SwitchToFiber(child_fiber);
+ DeleteFiber(child_fiber);
+ }
+ break;
+ case child_return::ERROR:
+ c->fiber_ret = child_return::PARENT_ERROR;
+ SwitchToFiber(child_fiber);
+ DeleteFiber(child_fiber);
+ break;
+ case child_return::YIELD:
+ c->fiber_ret = child_return::YIELD_CONFIRM;
+ break;
+ case child_return::PARENT_ERROR:
+ case child_return::YIELD_CONFIRM:
+ abort();
+ }
+}
+
void TP_pool_win::add(TP_connection *c)
{
if(FlsGetValue(fls))
{
/* Inside threadpool(), execute callback directly. */
- tp_callback(c);
+ tp_callback_fiber((TP_connection_win*)c);
}
else
{
- SubmitThreadpoolWork(((TP_connection_win *)c)->work);
+ TP_connection_win *c_w = ((TP_connection_win*)c);
+ SubmitThreadpoolWork(c_w->work);
}
}
+void TP_pool_win::yield()
+{
+ TP_connection_win *c = (TP_connection_win *) GetFiberData();
+ c->fiber_ret = child_return::YIELD;
+ SwitchToFiber(c->parent_fiber);
+}
+
+void* TP_pool_win::get_context()
+{
+ return GetFiberData();
+}
+
+void TP_pool_win::resume(void* context)
+{
+ auto c = (TP_connection_win*) context;
+ if (c->fiber_ret != child_return::YIELD && c->fiber_ret != child_return::YIELD_CONFIRM)
+ abort();
+ SubmitThreadpoolWork(c->work);
+}
TP_connection_win::TP_connection_win(CONNECT *c) :
TP_connection(c),
- timeout(ULONGLONG_MAX),
+ timeout(ULONGLONG_MAX),
callback_instance(0),
io(0),
timer(0),
- work(0)
+ work(0),
+ parent_fiber(),
+ fiber(),
+ fiber_ret(),
+ executing(0)
{
}
@@ -167,6 +301,7 @@ int TP_connection_win::init()
CHECK_ALLOC_ERROR(io= CreateThreadpoolIo(handle, io_completion_callback, this, &callback_environ));
CHECK_ALLOC_ERROR(timer= CreateThreadpoolTimer(timer_callback, this, &callback_environ));
CHECK_ALLOC_ERROR(work= CreateThreadpoolWork(work_callback, this, &callback_environ));
+ CHECK_ALLOC_ERROR(work_highprio = CreateThreadpoolWork(work_callback, this, &callback_environ_highprio));
return 0;
}
@@ -266,6 +401,9 @@ TP_connection_win::~TP_connection_win()
if (work)
CloseThreadpoolWork(work);
+ if (work_highprio)
+ CloseThreadpoolWork(work_highprio);
+
if (timer)
{
SetThreadpoolTimer(timer, 0, 0, 0);
@@ -304,6 +442,7 @@ void tp_win_callback_prolog()
if (FlsGetValue(fls) == NULL)
{
/* Running in new worker thread*/
+ ConvertThreadToFiber(NULL);
FlsSetValue(fls, (void *)1);
statistic_increment(thread_created, &LOCK_status);
InterlockedIncrement((volatile long *)&tp_stats.num_worker_threads);
@@ -329,6 +468,7 @@ static VOID WINAPI thread_destructor(void *data)
{
if(data)
{
+ ConvertFiberToThread();
InterlockedDecrement((volatile long *)&tp_stats.num_worker_threads);
my_thread_end();
}
@@ -339,7 +479,7 @@ static VOID WINAPI thread_destructor(void *data)
static inline void tp_callback(PTP_CALLBACK_INSTANCE instance, PVOID context)
{
pre_callback(context, instance);
- tp_callback((TP_connection *)context);
+ tp_callback_fiber((TP_connection_win *)context);
}
@@ -412,6 +552,9 @@ int TP_pool_win::init()
InitializeThreadpoolEnvironment(&callback_environ);
SetThreadpoolCallbackPool(&callback_environ, pool);
+ InitializeThreadpoolEnvironment(&callback_environ_highprio);
+ SetThreadpoolCallbackPriority(&callback_environ_highprio,TP_CALLBACK_PRIORITY_HIGH);
+ SetThreadpoolCallbackPool(&callback_environ_highprio, pool);
if (threadpool_max_threads)
{
SetThreadpoolThreadMaximum(pool, threadpool_max_threads);
@@ -446,6 +589,7 @@ TP_pool_win::~TP_pool_win()
if (!pool)
return;
DestroyThreadpoolEnvironment(&callback_environ);
+ DestroyThreadpoolEnvironment(&callback_environ_highprio);
SetThreadpoolThreadMaximum(pool, 0);
CloseThreadpool(pool);
if (!tp_stats.num_worker_threads)