summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVladislav Vaintroub <wlad@mariadb.com>2020-07-26 22:16:55 +0200
committerVladislav Vaintroub <wlad@mariadb.com>2020-07-28 21:24:55 +0200
commit51cd130eeabe1fa5886ba33f68b0f9cb54dec518 (patch)
tree837a560066e478511f06ef6b2ad380d117188c6e
parent56990b18d914b8150c9f777d134724d2b3390360 (diff)
downloadmariadb-git-10.6-wlad.tar.gz
-rw-r--r--sql/net_serv.cc9
-rw-r--r--sql/scheduler.h1
-rw-r--r--sql/sql_class.cc38
-rw-r--r--sql/sql_class.h104
-rw-r--r--sql/sql_parse.cc81
-rw-r--r--sql/sql_parse.h13
-rw-r--r--sql/threadpool.h3
-rw-r--r--sql/threadpool_common.cc66
-rw-r--r--sql/threadpool_generic.cc5
-rw-r--r--sql/threadpool_win.cc6
-rw-r--r--storage/innobase/log/log0log.cc13
-rw-r--r--storage/innobase/log/log0sync.cc36
-rw-r--r--storage/innobase/log/log0sync.h10
-rw-r--r--storage/innobase/trx/trx0trx.cc123
14 files changed, 447 insertions, 61 deletions
diff --git a/sql/net_serv.cc b/sql/net_serv.cc
index 3e173a47f02..59f77beafeb 100644
--- a/sql/net_serv.cc
+++ b/sql/net_serv.cc
@@ -627,8 +627,13 @@ net_real_write(NET *net,const uchar *packet, size_t len)
my_bool net_blocking = vio_is_blocking(net->vio);
DBUG_ENTER("net_real_write");
-#if defined(MYSQL_SERVER) && defined(USE_QUERY_CACHE)
- query_cache_insert(net->thd, (char*) packet, len, net->pkt_nr);
+#if defined(MYSQL_SERVER)
+ THD *thd= (THD *)net->thd;
+#if defined(USE_QUERY_CACHE)
+ query_cache_insert(thd, (char*) packet, len, net->pkt_nr);
+#endif
+ if (likely(thd))
+ thd->async_state.wait_for_pending_ops();
#endif
if (unlikely(net->error == 2))
diff --git a/sql/scheduler.h b/sql/scheduler.h
index ebf8d6e9e64..b21b3bb362c 100644
--- a/sql/scheduler.h
+++ b/sql/scheduler.h
@@ -40,6 +40,7 @@ struct scheduler_functions
void (*thd_wait_end)(THD *thd);
void (*post_kill_notification)(THD *thd);
void (*end)(void);
+ void (*thd_resume)(THD* thd);
};
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 0b142a22f59..17d27621334 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -676,7 +676,8 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
m_stmt_da(&main_da),
tdc_hash_pins(0),
xid_hash_pins(0),
- m_tmp_tables_locked(false)
+ m_tmp_tables_locked(false),
+ async_state()
#ifdef HAVE_REPLICATION
,
current_linfo(0),
@@ -4904,6 +4905,41 @@ void reset_thd(MYSQL_THD thd)
free_root(thd->mem_root, MYF(MY_KEEP_PREALLOC));
}
+extern "C" MYSQL_THD thd_increment_pending_ops(void)
+{
+ THD *thd = current_thd;
+ if (!thd)
+ return NULL;
+ thd->async_state.inc_pending_ops();
+ return thd;
+}
+
+extern "C" int
+thd_pending_ops(MYSQL_THD thd)
+{
+ return thd->async_state.pending_ops();
+}
+
+extern "C" void thd_decrement_pending_ops(MYSQL_THD thd)
+{
+ DBUG_ASSERT(thd);
+ if (thd->async_state.dec_pending_ops() == 0)
+ {
+ switch(thd->async_state.m_state)
+ {
+ case thd_async_state::enum_async_state::SUSPEND:
+ DBUG_ASSERT(thd->scheduler->thd_resume);
+ thd->scheduler->thd_resume(thd);
+ break;
+ case thd_async_state::enum_async_state::NONE:
+ break;
+ default:
+ DBUG_ASSERT(0);
+ }
+ }
+}
+
+
unsigned long long thd_get_query_id(const MYSQL_THD thd)
{
return((unsigned long long)thd->query_id);
diff --git a/sql/sql_class.h b/sql/sql_class.h
index e13b896c820..ae451bf9c83 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -2250,6 +2250,109 @@ struct THD_count
~THD_count() { thread_count--; }
};
+struct thd_async_state
+{
+ enum class enum_async_state
+ {
+ NONE,
+ SUSPEND,
+ RESUME
+ };
+ enum_async_state m_state{ enum_async_state::NONE};
+ enum enum_server_command m_command {COM_SLEEP};
+ LEX_STRING m_packet{};
+
+ mysql_mutex_t m_mtx;
+ mysql_cond_t m_cond;
+ int m_pending_ops=0;
+
+ thd_async_state()
+ {
+ mysql_mutex_init(0, &m_mtx, 0);
+ mysql_cond_init(0, &m_cond, 0);
+ }
+ /*
+ Only used with threadpool, where one can "suspend" and "resume" a THD.
+ Suspend only means leaving do_command earlier, while saving some state
+ Resume is continuing suspended do_command(), where it finished last time.
+ */
+ bool try_suspend()
+ {
+ bool ret= true;
+ mysql_mutex_lock(&m_mtx);
+ DBUG_ASSERT(m_state == enum_async_state::NONE);
+ DBUG_ASSERT(m_pending_ops >= 0);
+ if (m_pending_ops == 0)
+ {
+ /* No pending operations, can't suspend, since nobody can resume */
+ ret = false;
+ }
+ m_state= enum_async_state::SUSPEND;
+ mysql_mutex_unlock(&m_mtx);
+ return ret;
+ }
+
+ ~thd_async_state()
+ {
+ wait_for_pending_ops();
+ mysql_mutex_destroy(&m_mtx);
+ mysql_cond_destroy(&m_cond);
+ }
+
+ /*
+ Increment pending asynchronous operations.
+ The client response may not be written if
+ this count > 0.
+ So, without threadpool query needs to wait for
+ the operations to finish.
+ With threadpool, THD can be suspended and resumed
+ when this counter goes to 0.
+ */
+ void inc_pending_ops()
+ {
+ mysql_mutex_lock(&m_mtx);
+ m_pending_ops++;
+ mysql_mutex_unlock(&m_mtx);
+ }
+
+ int dec_pending_ops()
+ {
+ mysql_mutex_lock(&m_mtx);
+ m_pending_ops--;
+ if (!m_pending_ops)
+ mysql_cond_signal(&m_cond);
+ mysql_mutex_unlock(&m_mtx);
+ return m_pending_ops;
+ }
+
+ int pending_ops()
+ {
+ return m_pending_ops;
+ }
+
+ /* Wait for pending operations to finish.*/
+ void wait_for_pending_ops()
+ {
+ /*
+ It is fine to dirty read m_pending_ops and compare it with 0.
+ It is only incremented by the current thread, and may be decremented
+ by another one, so dirty read can be off and show positive number
+ when it is really 0
+ */
+ if (!m_pending_ops)
+ return;
+ mysql_mutex_lock(&m_mtx);
+ DBUG_ASSERT(m_pending_ops >= 0);
+ while (m_pending_ops)
+ mysql_cond_wait(&m_cond, &m_mtx);
+ mysql_mutex_unlock(&m_mtx);
+ }
+};
+
+extern "C" MYSQL_THD thd_increment_pending_ops(void);
+extern "C" void thd_decrement_pending_ops(MYSQL_THD);
+extern "C" int thd_pending_ops(MYSQL_THD);
+
/**
@class THD
@@ -4935,6 +5038,7 @@ private:
}
public:
+ thd_async_state async_state;
#ifdef HAVE_REPLICATION
/*
If we do a purge of binary logs, log index info of the threads
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index 1d830f60da0..847ca3f5b40 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -1155,24 +1155,48 @@ static enum enum_server_command fetch_command(THD *thd, char *packet)
/**
Read one command from connection and execute it (query or simple command).
- This function is called in loop from thread function.
+ This function is to be used by different schedulers (one-thread-per-connection,
+ pool-of-threads)
For profiling to work, it must never be called recursively.
+ @param thd - client connection context
+
+ @param blocking - wait for command to finish.
+ if true, will wait for outstanding operations (e.g group commit) to finish,
+ before returning. otherwise, it might return DISPATCH_COMMAND_WOULDBLOCK,
+ in this case another do_command() needs to be executed to finish the current
+ command.
+
+ @retval
+ DISPATCH_COMMAND_SUCCESS(0) - success
@retval
- 0 success
+ DISPATCH_COMMAND_ERROR request of THD shutdown (see dispatch_command() description)
@retval
- 1 request of thread shutdown (see dispatch_command() description)
+ DISPATCH_COMMAND_WOULDBLOCK - need to wait for commit notification
*/
-bool do_command(THD *thd)
+dispatch_command_return do_command(THD *thd, bool blocking)
{
- bool return_value;
+ dispatch_command_return return_value;
char *packet= 0;
ulong packet_length;
NET *net= &thd->net;
enum enum_server_command command;
DBUG_ENTER("do_command");
+ DBUG_ASSERT(!thd->async_state.pending_ops());
+
+ if (thd->async_state.m_state == thd_async_state::enum_async_state::RESUME)
+ {
+ /*
+ Resuming previously suspended command.
+ Restore the state
+ */
+ command = thd->async_state.m_command;
+ packet = thd->async_state.m_packet.str;
+ packet_length = (ulong)thd->async_state.m_packet.length;
+ goto resume;
+ }
/*
indicator of uninitialized lex => normal flow of errors handling
@@ -1240,12 +1264,12 @@ bool do_command(THD *thd)
if (net->error != 3)
{
- return_value= TRUE; // We have to close it.
+ return_value= DISPATCH_COMMAND_ERROR; // We have to close it.
goto out;
}
net->error= 0;
- return_value= FALSE;
+ return_value= DISPATCH_COMMAND_SUCCESS;
goto out;
}
@@ -1302,7 +1326,7 @@ bool do_command(THD *thd)
MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da());
thd->m_statement_psi= NULL;
thd->m_digest= NULL;
- return_value= FALSE;
+ return_value= DISPATCH_COMMAND_SUCCESS;
wsrep_after_command_before_result(thd);
goto out;
@@ -1328,7 +1352,7 @@ bool do_command(THD *thd)
thd->m_statement_psi= NULL;
thd->m_digest= NULL;
- return_value= FALSE;
+ return_value= DISPATCH_COMMAND_SUCCESS;
wsrep_after_command_before_result(thd);
goto out;
}
@@ -1339,10 +1363,19 @@ bool do_command(THD *thd)
DBUG_ASSERT(packet_length);
DBUG_ASSERT(!thd->apc_target.is_enabled());
+
+resume:
return_value= dispatch_command(command, thd, packet+1,
- (uint) (packet_length-1));
- DBUG_ASSERT(!thd->apc_target.is_enabled());
+ (uint) (packet_length-1), blocking);
+ if (return_value == DISPATCH_COMMAND_WOULDBLOCK)
+ {
+ /* Store current state for suspend/resume in threadpool*/
+ thd->async_state.m_command= command;
+ thd->async_state.m_packet={packet,packet_length};
+ DBUG_RETURN(return_value);
+ }
+ DBUG_ASSERT(!thd->apc_target.is_enabled());
out:
thd->lex->restore_set_statement_var();
/* The statement instrumentation must be closed in all cases. */
@@ -1499,8 +1532,8 @@ public:
1 request of thread shutdown, i. e. if command is
COM_QUIT/COM_SHUTDOWN
*/
-bool dispatch_command(enum enum_server_command command, THD *thd,
- char* packet, uint packet_length)
+dispatch_command_return dispatch_command(enum enum_server_command command, THD *thd,
+ char* packet, uint packet_length, bool blocking)
{
NET *net= &thd->net;
bool error= 0;
@@ -1512,6 +1545,12 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
"<?>")));
bool drop_more_results= 0;
+ if (thd->async_state.m_state == thd_async_state::enum_async_state::RESUME)
+ {
+ thd->async_state.m_state = thd_async_state::enum_async_state::NONE;
+ goto resume;
+ }
+
/* keep it withing 1 byte */
compile_time_assert(COM_END == 255);
@@ -2242,6 +2281,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
general_log_print(thd, command, NullS);
my_eof(thd);
break;
+
case COM_SLEEP:
case COM_CONNECT: // Impossible here
case COM_TIME: // Impossible from client
@@ -2255,7 +2295,18 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
}
dispatch_end:
- do_end_of_statement= true;
+ /*
+ For the threadpool i.e if non-blocking call, if not all async operations
+ are finished, return without cleanup. The cleanup will be done on
+ later, when command execution is resumed.
+ */
+ if (!blocking && !error && thd->async_state.pending_ops())
+ {
+ DBUG_RETURN(DISPATCH_COMMAND_WOULDBLOCK);
+ }
+
+resume:
+
#ifdef WITH_WSREP
/*
Next test should really be WSREP(thd), but that causes a failure when doing
@@ -2367,7 +2418,7 @@ dispatch_end:
/* Check that some variables are reset properly */
DBUG_ASSERT(thd->abort_on_warning == 0);
thd->lex->restore_set_statement_var();
- DBUG_RETURN(error);
+ DBUG_RETURN(error?DISPATCH_COMMAND_ERROR: DISPATCH_COMMAND_SUCCESS);
}
static bool slow_filter_masked(THD *thd, ulonglong mask)
diff --git a/sql/sql_parse.h b/sql/sql_parse.h
index 36dc68c292c..c91a5231b53 100644
--- a/sql/sql_parse.h
+++ b/sql/sql_parse.h
@@ -100,9 +100,16 @@ bool multi_delete_set_locks_and_link_aux_tables(LEX *lex);
void create_table_set_open_action_and_adjust_tables(LEX *lex);
int bootstrap(MYSQL_FILE *file);
int mysql_execute_command(THD *thd);
-bool do_command(THD *thd);
-bool dispatch_command(enum enum_server_command command, THD *thd,
- char* packet, uint packet_length);
+enum dispatch_command_return
+{
+ DISPATCH_COMMAND_SUCCESS=0,
+ DISPATCH_COMMAND_ERROR= 1,
+ DISPATCH_COMMAND_WOULDBLOCK= 2
+};
+
+dispatch_command_return do_command(THD *thd, bool blocking = true);
+dispatch_command_return dispatch_command(enum enum_server_command command, THD *thd,
+ char* packet, uint packet_length, bool blocking = true);
void log_slow_statement(THD *thd);
bool append_file_to_dir(THD *thd, const char **filename_ptr,
const LEX_CSTRING *table_name);
diff --git a/sql/threadpool.h b/sql/threadpool.h
index 285b46e3b27..b1c26630812 100644
--- a/sql/threadpool.h
+++ b/sql/threadpool.h
@@ -132,6 +132,7 @@ struct TP_pool
virtual int set_stall_limit(uint){ return 0; }
virtual int get_thread_count() { return tp_stats.num_worker_threads; }
virtual int get_idle_thread_count(){ return 0; }
+ virtual void resume(TP_connection* c)=0;
};
#ifdef _WIN32
@@ -145,6 +146,7 @@ struct TP_pool_win:TP_pool
virtual void add(TP_connection *);
virtual int set_max_threads(uint);
virtual int set_min_threads(uint);
+ void resume(TP_connection *c);
};
#endif
@@ -158,6 +160,7 @@ struct TP_pool_generic :TP_pool
virtual int set_pool_size(uint);
virtual int set_stall_limit(uint);
virtual int get_idle_thread_count();
+ void resume(TP_connection* c);
};
#endif /* HAVE_POOL_OF_THREADS */
diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc
index c27f42b3d62..1f65a85c233 100644
--- a/sql/threadpool_common.cc
+++ b/sql/threadpool_common.cc
@@ -23,6 +23,8 @@
#include <sql_audit.h>
#include <debug_sync.h>
#include <threadpool.h>
+#include <sql_class.h>
+#include <sql_parse.h>
#ifdef WITH_WSREP
#include "wsrep_trans_observer.h"
@@ -51,7 +53,7 @@ TP_STATISTICS tp_stats;
static void threadpool_remove_connection(THD *thd);
-static int threadpool_process_request(THD *thd);
+static dispatch_command_return threadpool_process_request(THD *thd);
static THD* threadpool_add_connection(CONNECT *connect, TP_connection *c);
extern bool do_command(THD*);
@@ -195,10 +197,30 @@ void tp_callback(TP_connection *c)
}
c->connect= 0;
}
- else if (threadpool_process_request(thd))
+ else
{
- /* QUIT or an error occurred. */
- goto error;
+retry:
+ switch(threadpool_process_request(thd))
+ {
+ case DISPATCH_COMMAND_WOULDBLOCK:
+ if (!thd->async_state.try_suspend())
+ {
+ /*
+ All async operations finished meanwhile, thus nobody is will wake up
+ this THD (this is done at the end of, and therefore do "resume" manually.
+ */
+ thd->async_state.m_state = thd_async_state::enum_async_state::RESUME;
+ goto retry;
+ }
+ worker_context.restore();
+ return;
+ case DISPATCH_COMMAND_ERROR:
+ /* QUIT or an error occurred. */
+ goto error;
+ case DISPATCH_COMMAND_SUCCESS:
+ break;
+ }
+ thd->async_state.m_state= thd_async_state::enum_async_state::NONE;
}
/* Set priority */
@@ -321,10 +343,13 @@ static void handle_wait_timeout(THD *thd)
/**
Process a single client request or a single batch.
*/
-static int threadpool_process_request(THD *thd)
+static dispatch_command_return threadpool_process_request(THD *thd)
{
- int retval= 0;
+ dispatch_command_return retval= DISPATCH_COMMAND_SUCCESS;
+
thread_attach(thd);
+ if(thd->async_state.m_state == thd_async_state::enum_async_state::RESUME)
+ goto resume;
if (thd->killed >= KILL_CONNECTION)
{
@@ -332,7 +357,7 @@ static int threadpool_process_request(THD *thd)
killed flag was set by timeout handler
or KILL command. Return error.
*/
- retval= 1;
+ retval= DISPATCH_COMMAND_ERROR;
if(thd->killed == KILL_WAIT_TIMEOUT)
handle_wait_timeout(thd);
goto end;
@@ -356,12 +381,20 @@ static int threadpool_process_request(THD *thd)
if (mysql_audit_release_required(thd))
mysql_audit_release(thd);
- if ((retval= do_command(thd)) != 0)
- goto end;
+resume:
+ retval= do_command(thd, false);
+ switch(retval)
+ {
+ case DISPATCH_COMMAND_WOULDBLOCK:
+ case DISPATCH_COMMAND_ERROR:
+ goto end;
+ case DISPATCH_COMMAND_SUCCESS:
+ break;
+ }
if (!thd_is_connection_alive(thd))
{
- retval= 1;
+ retval=DISPATCH_COMMAND_ERROR;
goto end;
}
@@ -369,7 +402,7 @@ static int threadpool_process_request(THD *thd)
vio= thd->net.vio;
if (!vio->has_data(vio))
- {
+ {
/* More info on this debug sync is in sql_parse.cc*/
DEBUG_SYNC(thd, "before_do_command_net_read");
goto end;
@@ -505,6 +538,14 @@ static void tp_post_kill_notification(THD *thd)
post_kill_notification(thd);
}
+static void tp_resume(THD* thd)
+{
+ DBUG_ASSERT(thd->async_state.m_state == thd_async_state::enum_async_state::SUSPEND);
+ thd->async_state.m_state = thd_async_state::enum_async_state::RESUME;
+ TP_connection* c = get_TP_connection(thd);
+ pool->resume(c);
+}
+
static scheduler_functions tp_scheduler_functions=
{
0, // max_threads
@@ -515,7 +556,8 @@ static scheduler_functions tp_scheduler_functions=
tp_wait_begin, // thd_wait_begin
tp_wait_end, // thd_wait_end
tp_post_kill_notification, // post kill notification
- tp_end // end
+ tp_end, // end
+ tp_resume
};
void pool_of_threads_scheduler(struct scheduler_functions *func,
diff --git a/sql/threadpool_generic.cc b/sql/threadpool_generic.cc
index 2a5587fa04a..f135b920c17 100644
--- a/sql/threadpool_generic.cc
+++ b/sql/threadpool_generic.cc
@@ -1314,7 +1314,10 @@ void TP_pool_generic::add(TP_connection *c)
DBUG_VOID_RETURN;
}
-
+void TP_pool_generic::resume(TP_connection* c)
+{
+ add(c);
+}
/**
MySQL scheduler callback: wait begin
diff --git a/sql/threadpool_win.cc b/sql/threadpool_win.cc
index df8a6c216a3..c83fb84ba4a 100644
--- a/sql/threadpool_win.cc
+++ b/sql/threadpool_win.cc
@@ -125,6 +125,11 @@ void TP_pool_win::add(TP_connection *c)
}
}
+void TP_pool_win::resume(TP_connection* c)
+{
+ SubmitThreadpoolWork(((TP_connection_win*)c)->work);
+}
+
#define CHECK_ALLOC_ERROR(op) \
do \
{ \
@@ -438,3 +443,4 @@ TP_connection *TP_pool_win::new_connection(CONNECT *connect)
}
return c;
}
+
diff --git a/storage/innobase/log/log0log.cc b/storage/innobase/log/log0log.cc
index 84589e7950f..5f8bf3040a3 100644
--- a/storage/innobase/log/log0log.cc
+++ b/storage/innobase/log/log0log.cc
@@ -1082,6 +1082,17 @@ bool log_write_lock_own()
}
#endif
+
+/* Execute callback , once lsn is flushed or written to log. */
+void log_register_wait(lsn_t lsn, bool flush, void (*f)(void *), void *par)
+{
+ completion_callback c{f, par};
+ if (flush)
+ flush_lock.register_wait(lsn,c);
+ else
+ write_lock.register_wait(lsn,c);
+}
+
/** Ensure that the log has been written to the log file up to a given
log entry (such as that of a transaction commit). Start a new write, or
wait and check if an already running write is covering the request.
@@ -1392,7 +1403,7 @@ bool log_checkpoint()
log_mutex_exit();
- log_write_up_to(flush_lsn, true, true);
+ log_write_up_to(flush_lsn,true,true);
log_mutex_enter();
diff --git a/storage/innobase/log/log0sync.cc b/storage/innobase/log/log0sync.cc
index 7799e605576..6341380d544 100644
--- a/storage/innobase/log/log0sync.cc
+++ b/storage/innobase/log/log0sync.cc
@@ -77,6 +77,7 @@ Note that if write operation is very fast, a) or b) can be fine as alternative.
#include <log0types.h>
#include "log0sync.h"
#include <mysql/service_thd_wait.h>
+#include <sql_class.h>
/**
Helper class , used in group commit lock.
@@ -249,6 +250,9 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num)
void group_commit_lock::release(value_type num)
{
+ completion_callback callbacks[1024];
+ size_t callback_count = 0;
+
std::unique_lock<std::mutex> lk(m_mtx);
m_lock = false;
@@ -291,6 +295,23 @@ void group_commit_lock::release(value_type num)
prev= cur;
}
}
+ for (auto &c : m_pending_callbacks)
+ {
+ if (c.first <= num)
+ {
+ if (callback_count < array_elements(callbacks))
+ callbacks[callback_count++]= c.second;
+ else
+ c.second.m_callback(c.second.m_param);
+ }
+ }
+
+ auto it= std::remove_if(
+ m_pending_callbacks.begin(), m_pending_callbacks.end(),
+ [num](const std::pair<value_type,completion_callback> &c) { return c.first <= num; });
+
+ m_pending_callbacks.erase(it, m_pending_callbacks.end());
+
lk.unlock();
for (cur= wakeup_list; cur; cur= next)
@@ -298,6 +319,21 @@ void group_commit_lock::release(value_type num)
next= cur->m_next;
cur->m_sema.wake();
}
+
+ for (size_t i= 0; i < callback_count; i++)
+ callbacks[i].m_callback(callbacks[i].m_param);
+}
+
+void group_commit_lock::register_wait(lsn_t lsn, completion_callback &c)
+{
+ std::unique_lock<std::mutex> lk(m_mtx);
+ if (lsn <= value())
+ {
+ lk.unlock();
+ c.m_callback(c.m_param);
+ return;
+ }
+ m_pending_callbacks.push_back({ lsn,c });
}
#ifndef DBUG_OFF
diff --git a/storage/innobase/log/log0sync.h b/storage/innobase/log/log0sync.h
index 40afbf74ecd..c550f027b08 100644
--- a/storage/innobase/log/log0sync.h
+++ b/storage/innobase/log/log0sync.h
@@ -18,6 +18,7 @@ this program; if not, write to the Free Software Foundation, Inc.,
#include <atomic>
#include <thread>
#include <log0types.h>
+#include <vector>
struct group_commit_waiter_t;
@@ -52,6 +53,12 @@ Operations supported on this semaphore
5. set_pending_value()
*/
+struct completion_callback
+{
+ void (*m_callback)(void*);
+ void* m_param;
+};
+
class group_commit_lock
{
using value_type = lsn_t;
@@ -63,6 +70,8 @@ class group_commit_lock
std::atomic<value_type> m_pending_value;
bool m_lock;
group_commit_waiter_t* m_waiters_list;
+ std::vector<std::pair<value_type,completion_callback>> m_pending_callbacks;
+
public:
group_commit_lock();
enum lock_return_code
@@ -71,6 +80,7 @@ public:
EXPIRED
};
lock_return_code acquire(value_type num);
+ void register_wait(value_type num, completion_callback& callback);
void release(value_type num);
value_type value() const;
value_type pending() const;
diff --git a/storage/innobase/trx/trx0trx.cc b/storage/innobase/trx/trx0trx.cc
index 03e60d80d5b..4a86bb9261f 100644
--- a/storage/innobase/trx/trx0trx.cc
+++ b/storage/innobase/trx/trx0trx.cc
@@ -23,6 +23,7 @@ The transaction
Created 3/26/1996 Heikki Tuuri
*******************************************************/
+#define MYSQL_SERVER
#include "trx0trx.h"
@@ -1185,34 +1186,104 @@ trx_finalize_for_fts(
trx->fts_trx = NULL;
}
-/**********************************************************************//**
-If required, flushes the log to disk based on the value of
-innodb_flush_log_at_trx_commit. */
-static
-void
-trx_flush_log_if_needed_low(
-/*========================*/
- lsn_t lsn) /*!< in: lsn up to which logs are to be
- flushed. */
+extern "C" MYSQL_THD thd_increment_pending_ops();
+extern "C" void thd_decrement_pending_ops(MYSQL_THD);
+extern "C" int thd_pending_ops(MYSQL_THD );
+
+static Atomic_counter<int> n_pending_log_write;
+
+static bool need_to_flush_log()
{
- bool flush = srv_file_flush_method != SRV_NOSYNC;
+ if (srv_file_flush_method != SRV_NOSYNC)
+ return false;
- switch (srv_flush_log_at_trx_commit) {
- case 3:
- case 2:
- /* Write the log but do not flush it to disk */
- flush = false;
- /* fall through */
- case 1:
- /* Write the log and optionally flush it to disk */
- log_write_up_to(lsn, flush);
- return;
- case 0:
- /* Do nothing */
- return;
- }
+ switch (srv_flush_log_at_trx_commit)
+ {
+ case 3:
+ case 2:
+ /* Write the log but do not flush it to disk */
+ return false;
+ /* fall through */
+ case 1:
+ return true;
+ case 0:
+ return false;
+ }
+ ut_error;
+ return false;
+}
+static void background_log_write(void *)
+{
+ bool flush= need_to_flush_log();
- ut_error;
+ lsn_t old_lsn= 0;
+ do
+ {
+ log_mutex_enter();
+ auto lsn= log_sys.get_lsn();
+ log_mutex_exit();
+ n_pending_log_write= 0;
+ if (lsn == old_lsn)
+ break;
+ log_write_up_to(lsn, flush);
+ old_lsn= lsn;
+ } while (n_pending_log_write);
+}
+
+static tpool::task_group bck_write_group(1);
+static tpool::task background_log_write_task(background_log_write, 0,
+ &bck_write_group);
+
+static void initiate_background_log_write_if_needed()
+{
+ if (!n_pending_log_write++)
+ srv_thread_pool->submit_task(&background_log_write_task);
+}
+
+extern "C" int thd_pending_ops(MYSQL_THD thd);
+extern void log_register_wait(lsn_t lsn, bool flush, void (*f)(void *),
+ void *par);
+
+/*
+ If required, intiates flushes the log to disk based on the value of
+ innodb_flush_log_at_trx_commit.
+
+ It can increment pending THD operations, and current connection
+ might not write response to the client, until the pending operation
+ is finished.
+
+ @param lsn_t lsn
+ @params trx_state
+*/
+static void trx_flush_log_if_needed_low(lsn_t lsn, trx_state_t trx_state)
+{
+ if (!srv_flush_log_at_trx_commit)
+ return;
+
+ if (log_sys.get_flushed_lsn() > lsn)
+ return;
+ bool flush= need_to_flush_log();
+
+ if (trx_state == TRX_STATE_PREPARED)
+ {
+ /* XA, which is used with binlog as well.
+ Be conservative, use synchronous wait.*/
+ log_write_up_to(lsn, flush);
+ return;
+ }
+
+ initiate_background_log_write_if_needed();
+
+ MYSQL_THD thd= thd_increment_pending_ops();
+ if (!thd)
+ {
+ /* This might be dictionary recalculations.*/
+ log_write_up_to(lsn, flush);
+ return;
+ }
+
+ log_register_wait(lsn, flush, (void (*)(void *)) thd_decrement_pending_ops,
+ thd);
}
/**********************************************************************//**
@@ -1227,7 +1298,7 @@ trx_flush_log_if_needed(
trx_t* trx) /*!< in/out: transaction */
{
trx->op_info = "flushing log";
- trx_flush_log_if_needed_low(lsn);
+ trx_flush_log_if_needed_low(lsn,trx->state);
trx->op_info = "";
}