summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVladislav Vaintroub <wlad@mariadb.com>2022-02-18 18:10:34 +0100
committerVladislav Vaintroub <wlad@mariadb.com>2022-02-21 16:05:00 +0100
commitdcd3ce0bc5433af23fbd44b1e28721e447cdbc94 (patch)
treed7a7612c90ba0cf1d552437d696c5f37eb8c6f87
parentbbe99cd4e2d7c83a06dd93ea88af97f2d5796810 (diff)
downloadmariadb-git-10.9-wlad.tar.gz
For Nikita : illustrate of queuing APC, when connection is sleeping10.9-wlad
-rw-r--r--sql/scheduler.h10
-rw-r--r--sql/sql_class.h1
-rw-r--r--sql/sql_parse.cc10
-rw-r--r--sql/threadpool.h3
-rw-r--r--sql/threadpool_common.cc20
-rw-r--r--sql/threadpool_generic.cc9
-rw-r--r--sql/threadpool_win.cc13
7 files changed, 64 insertions, 2 deletions
diff --git a/sql/scheduler.h b/sql/scheduler.h
index c2686aad21c..37754f8c21d 100644
--- a/sql/scheduler.h
+++ b/sql/scheduler.h
@@ -42,6 +42,16 @@ struct scheduler_functions
void (*end)(void);
/** resume previous unfinished command (threadpool only)*/
void (*thd_resume)(THD* thd);
+
+ /**
+ requests asynchronous procedure call execution
+ Works only when THD is sleeping, i.e awaiting user requests.
+
+ When THD is active, there is another mechanism for that.
+ @return 0, if APC is queued for execution, otherwise the connection
+ active (or maybe executing another APC)
+ */
+ int (*thd_wake)(THD *thd);
};
diff --git a/sql/sql_class.h b/sql/sql_class.h
index ab04e17fdfd..e9c39b42b02 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -2572,6 +2572,7 @@ private:
{ DBUG_ASSERT(0); return Statement::is_conventional(); }
public:
+ bool woken= false;
MDL_context mdl_context;
/* Used to execute base64 coded binlog events in MySQL server */
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index 18629f4bd22..d3ea7bf7057 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -1209,6 +1209,16 @@ dispatch_command_return do_command(THD *thd, bool blocking)
#else
DBUG_ASSERT(!thd->async_state.pending_ops());
#endif
+ if (thd->woken)
+ {
+ thd->async_state.m_state= thd_async_state::enum_async_state::NONE;
+ mysql_mutex_lock(&thd->LOCK_thd_kill);
+ thd->woken= false;
+ mysql_mutex_unlock(&thd->LOCK_thd_kill);
+ /* Does whatever one needs for when THD is woken*/
+ /* MOST IMPORTANT!, return once you done */
+ return DISPATCH_COMMAND_SUCCESS;
+ }
if (thd->async_state.m_state == thd_async_state::enum_async_state::RESUMED)
{
diff --git a/sql/threadpool.h b/sql/threadpool.h
index 7737d056b4a..fc060f9ee06 100644
--- a/sql/threadpool.h
+++ b/sql/threadpool.h
@@ -134,6 +134,7 @@ struct TP_pool
virtual int get_thread_count() { return tp_stats.num_worker_threads; }
virtual int get_idle_thread_count(){ return 0; }
virtual void resume(TP_connection* c)=0;
+ virtual int wake(TP_connection *c)= 0;
};
#ifdef _WIN32
@@ -148,6 +149,7 @@ struct TP_pool_win:TP_pool
virtual int set_max_threads(uint);
virtual int set_min_threads(uint);
void resume(TP_connection *c);
+ int wake(TP_connection *c);
};
#endif
@@ -162,6 +164,7 @@ struct TP_pool_generic :TP_pool
virtual int set_stall_limit(uint);
virtual int get_idle_thread_count();
void resume(TP_connection* c);
+ int wake(TP_connection *c);
};
#endif /* HAVE_POOL_OF_THREADS */
diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc
index 97c6c317cc4..95faafe1e63 100644
--- a/sql/threadpool_common.cc
+++ b/sql/threadpool_common.cc
@@ -173,7 +173,6 @@ static TP_PRIORITY get_priority(TP_connection *c)
return prio;
}
-
void tp_callback(TP_connection *c)
{
DBUG_ASSERT(c);
@@ -577,6 +576,22 @@ static void tp_resume(THD* thd)
pool->resume(c);
}
+static int tp_wake(THD *thd)
+{
+ TP_connection *c= get_TP_connection(thd);
+ if (c->state != TP_STATE_IDLE)
+ return -1;
+ int ret= -1;
+ mysql_mutex_lock(&thd->LOCK_thd_kill);
+ if (!thd->woken)
+ {
+ ret= pool->wake(c);
+ thd->woken= ret == 0;
+ }
+ mysql_mutex_unlock(&thd->LOCK_thd_kill);
+ return ret;
+}
+
static scheduler_functions tp_scheduler_functions=
{
0, // max_threads
@@ -588,7 +603,8 @@ static scheduler_functions tp_scheduler_functions=
tp_wait_end, // thd_wait_end
tp_post_kill_notification, // post kill notification
tp_end, // end
- tp_resume
+ tp_resume,
+ tp_wake
};
void pool_of_threads_scheduler(struct scheduler_functions *func,
diff --git a/sql/threadpool_generic.cc b/sql/threadpool_generic.cc
index eb08441a4d5..2e4ed2c273b 100644
--- a/sql/threadpool_generic.cc
+++ b/sql/threadpool_generic.cc
@@ -1337,6 +1337,15 @@ void TP_pool_generic::resume(TP_connection* c)
add(c);
}
+int TP_pool_generic::wake(TP_connection *c)
+{
+ TP_connection_generic *con= (TP_connection_generic *) c;
+ int ret= io_poll_disassociate_fd(con->thread_group->pollfd, con->fd);
+ if (ret)
+ return ret;
+ resume(con);
+ return 0;
+}
/**
MySQL scheduler callback: wait begin
*/
diff --git a/sql/threadpool_win.cc b/sql/threadpool_win.cc
index ed68e31c755..be3f598eb27 100644
--- a/sql/threadpool_win.cc
+++ b/sql/threadpool_win.cc
@@ -90,6 +90,7 @@ public:
void set_io_timeout(int sec) override;
void wait_begin(int type) override;
void wait_end() override;
+ int wake();
ulonglong timeout=ULLONG_MAX;
OVERLAPPED overlapped{};
@@ -131,6 +132,11 @@ void TP_pool_win::resume(TP_connection* c)
SubmitThreadpoolWork(((TP_connection_win*)c)->work);
}
+int TP_pool_win::wake(TP_connection *c)
+{
+ return ((TP_connection_win *) c)->wake();
+}
+
#define CHECK_ALLOC_ERROR(op) \
do \
{ \
@@ -177,6 +183,13 @@ int TP_connection_win::start_io()
return 0;
}
+int TP_connection_win::wake()
+{
+ if (!CancelIoEx(sock.m_handle, &sock.m_overlapped))
+ return GetLastError();
+ return 0;
+}
+
/*
Recalculate wait timeout, maybe reset timer.
*/