diff options
author | Vladislav Vaintroub <wlad@mariadb.com> | 2022-02-18 18:10:34 +0100 |
---|---|---|
committer | Vladislav Vaintroub <wlad@mariadb.com> | 2022-02-21 16:05:00 +0100 |
commit | dcd3ce0bc5433af23fbd44b1e28721e447cdbc94 (patch) | |
tree | d7a7612c90ba0cf1d552437d696c5f37eb8c6f87 | |
parent | bbe99cd4e2d7c83a06dd93ea88af97f2d5796810 (diff) | |
download | mariadb-git-10.9-wlad.tar.gz |
For Nikita : illustrate of queuing APC, when connection is sleeping10.9-wlad
-rw-r--r-- | sql/scheduler.h | 10 | ||||
-rw-r--r-- | sql/sql_class.h | 1 | ||||
-rw-r--r-- | sql/sql_parse.cc | 10 | ||||
-rw-r--r-- | sql/threadpool.h | 3 | ||||
-rw-r--r-- | sql/threadpool_common.cc | 20 | ||||
-rw-r--r-- | sql/threadpool_generic.cc | 9 | ||||
-rw-r--r-- | sql/threadpool_win.cc | 13 |
7 files changed, 64 insertions, 2 deletions
diff --git a/sql/scheduler.h b/sql/scheduler.h index c2686aad21c..37754f8c21d 100644 --- a/sql/scheduler.h +++ b/sql/scheduler.h @@ -42,6 +42,16 @@ struct scheduler_functions void (*end)(void); /** resume previous unfinished command (threadpool only)*/ void (*thd_resume)(THD* thd); + + /** + requests asynchronous procedure call execution + Works only when THD is sleeping, i.e awaiting user requests. + + When THD is active, there is another mechanism for that. + @return 0, if APC is queued for execution, otherwise the connection + active (or maybe executing another APC) + */ + int (*thd_wake)(THD *thd); }; diff --git a/sql/sql_class.h b/sql/sql_class.h index ab04e17fdfd..e9c39b42b02 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -2572,6 +2572,7 @@ private: { DBUG_ASSERT(0); return Statement::is_conventional(); } public: + bool woken= false; MDL_context mdl_context; /* Used to execute base64 coded binlog events in MySQL server */ diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 18629f4bd22..d3ea7bf7057 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -1209,6 +1209,16 @@ dispatch_command_return do_command(THD *thd, bool blocking) #else DBUG_ASSERT(!thd->async_state.pending_ops()); #endif + if (thd->woken) + { + thd->async_state.m_state= thd_async_state::enum_async_state::NONE; + mysql_mutex_lock(&thd->LOCK_thd_kill); + thd->woken= false; + mysql_mutex_unlock(&thd->LOCK_thd_kill); + /* Does whatever one needs for when THD is woken*/ + /* MOST IMPORTANT!, return once you done */ + return DISPATCH_COMMAND_SUCCESS; + } if (thd->async_state.m_state == thd_async_state::enum_async_state::RESUMED) { diff --git a/sql/threadpool.h b/sql/threadpool.h index 7737d056b4a..fc060f9ee06 100644 --- a/sql/threadpool.h +++ b/sql/threadpool.h @@ -134,6 +134,7 @@ struct TP_pool virtual int get_thread_count() { return tp_stats.num_worker_threads; } virtual int get_idle_thread_count(){ return 0; } virtual void resume(TP_connection* c)=0; + virtual int wake(TP_connection *c)= 0; }; #ifdef _WIN32 @@ -148,6 +149,7 @@ struct TP_pool_win:TP_pool virtual int set_max_threads(uint); virtual int set_min_threads(uint); void resume(TP_connection *c); + int wake(TP_connection *c); }; #endif @@ -162,6 +164,7 @@ struct TP_pool_generic :TP_pool virtual int set_stall_limit(uint); virtual int get_idle_thread_count(); void resume(TP_connection* c); + int wake(TP_connection *c); }; #endif /* HAVE_POOL_OF_THREADS */ diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc index 97c6c317cc4..95faafe1e63 100644 --- a/sql/threadpool_common.cc +++ b/sql/threadpool_common.cc @@ -173,7 +173,6 @@ static TP_PRIORITY get_priority(TP_connection *c) return prio; } - void tp_callback(TP_connection *c) { DBUG_ASSERT(c); @@ -577,6 +576,22 @@ static void tp_resume(THD* thd) pool->resume(c); } +static int tp_wake(THD *thd) +{ + TP_connection *c= get_TP_connection(thd); + if (c->state != TP_STATE_IDLE) + return -1; + int ret= -1; + mysql_mutex_lock(&thd->LOCK_thd_kill); + if (!thd->woken) + { + ret= pool->wake(c); + thd->woken= ret == 0; + } + mysql_mutex_unlock(&thd->LOCK_thd_kill); + return ret; +} + static scheduler_functions tp_scheduler_functions= { 0, // max_threads @@ -588,7 +603,8 @@ static scheduler_functions tp_scheduler_functions= tp_wait_end, // thd_wait_end tp_post_kill_notification, // post kill notification tp_end, // end - tp_resume + tp_resume, + tp_wake }; void pool_of_threads_scheduler(struct scheduler_functions *func, diff --git a/sql/threadpool_generic.cc b/sql/threadpool_generic.cc index eb08441a4d5..2e4ed2c273b 100644 --- a/sql/threadpool_generic.cc +++ b/sql/threadpool_generic.cc @@ -1337,6 +1337,15 @@ void TP_pool_generic::resume(TP_connection* c) add(c); } +int TP_pool_generic::wake(TP_connection *c) +{ + TP_connection_generic *con= (TP_connection_generic *) c; + int ret= io_poll_disassociate_fd(con->thread_group->pollfd, con->fd); + if (ret) + return ret; + resume(con); + return 0; +} /** MySQL scheduler callback: wait begin */ diff --git a/sql/threadpool_win.cc b/sql/threadpool_win.cc index ed68e31c755..be3f598eb27 100644 --- a/sql/threadpool_win.cc +++ b/sql/threadpool_win.cc @@ -90,6 +90,7 @@ public: void set_io_timeout(int sec) override; void wait_begin(int type) override; void wait_end() override; + int wake(); ulonglong timeout=ULLONG_MAX; OVERLAPPED overlapped{}; @@ -131,6 +132,11 @@ void TP_pool_win::resume(TP_connection* c) SubmitThreadpoolWork(((TP_connection_win*)c)->work); } +int TP_pool_win::wake(TP_connection *c) +{ + return ((TP_connection_win *) c)->wake(); +} + #define CHECK_ALLOC_ERROR(op) \ do \ { \ @@ -177,6 +183,13 @@ int TP_connection_win::start_io() return 0; } +int TP_connection_win::wake() +{ + if (!CancelIoEx(sock.m_handle, &sock.m_overlapped)) + return GetLastError(); + return 0; +} + /* Recalculate wait timeout, maybe reset timer. */ |