diff options
author | Vladislav Vaintroub <wlad@mariadb.com> | 2020-07-26 22:16:55 +0200 |
---|---|---|
committer | Vladislav Vaintroub <wlad@mariadb.com> | 2020-07-28 21:24:55 +0200 |
commit | 51cd130eeabe1fa5886ba33f68b0f9cb54dec518 (patch) | |
tree | 837a560066e478511f06ef6b2ad380d117188c6e | |
parent | 56990b18d914b8150c9f777d134724d2b3390360 (diff) | |
download | mariadb-git-10.6-wlad.tar.gz |
WIP10.6-wlad
-rw-r--r-- | sql/net_serv.cc | 9 | ||||
-rw-r--r-- | sql/scheduler.h | 1 | ||||
-rw-r--r-- | sql/sql_class.cc | 38 | ||||
-rw-r--r-- | sql/sql_class.h | 104 | ||||
-rw-r--r-- | sql/sql_parse.cc | 81 | ||||
-rw-r--r-- | sql/sql_parse.h | 13 | ||||
-rw-r--r-- | sql/threadpool.h | 3 | ||||
-rw-r--r-- | sql/threadpool_common.cc | 66 | ||||
-rw-r--r-- | sql/threadpool_generic.cc | 5 | ||||
-rw-r--r-- | sql/threadpool_win.cc | 6 | ||||
-rw-r--r-- | storage/innobase/log/log0log.cc | 13 | ||||
-rw-r--r-- | storage/innobase/log/log0sync.cc | 36 | ||||
-rw-r--r-- | storage/innobase/log/log0sync.h | 10 | ||||
-rw-r--r-- | storage/innobase/trx/trx0trx.cc | 123 |
14 files changed, 447 insertions, 61 deletions
diff --git a/sql/net_serv.cc b/sql/net_serv.cc index 3e173a47f02..59f77beafeb 100644 --- a/sql/net_serv.cc +++ b/sql/net_serv.cc @@ -627,8 +627,13 @@ net_real_write(NET *net,const uchar *packet, size_t len) my_bool net_blocking = vio_is_blocking(net->vio); DBUG_ENTER("net_real_write"); -#if defined(MYSQL_SERVER) && defined(USE_QUERY_CACHE) - query_cache_insert(net->thd, (char*) packet, len, net->pkt_nr); +#if defined(MYSQL_SERVER) + THD *thd= (THD *)net->thd; +#if defined(USE_QUERY_CACHE) + query_cache_insert(thd, (char*) packet, len, net->pkt_nr); +#endif + if (likely(thd)) + thd->async_state.wait_for_pending_ops(); #endif if (unlikely(net->error == 2)) diff --git a/sql/scheduler.h b/sql/scheduler.h index ebf8d6e9e64..b21b3bb362c 100644 --- a/sql/scheduler.h +++ b/sql/scheduler.h @@ -40,6 +40,7 @@ struct scheduler_functions void (*thd_wait_end)(THD *thd); void (*post_kill_notification)(THD *thd); void (*end)(void); + void (*thd_resume)(THD* thd); }; diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 0b142a22f59..17d27621334 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -676,7 +676,8 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) m_stmt_da(&main_da), tdc_hash_pins(0), xid_hash_pins(0), - m_tmp_tables_locked(false) + m_tmp_tables_locked(false), + async_state() #ifdef HAVE_REPLICATION , current_linfo(0), @@ -4904,6 +4905,41 @@ void reset_thd(MYSQL_THD thd) free_root(thd->mem_root, MYF(MY_KEEP_PREALLOC)); } +extern "C" MYSQL_THD thd_increment_pending_ops(void) +{ + THD *thd = current_thd; + if (!thd) + return NULL; + thd->async_state.inc_pending_ops(); + return thd; +} + +extern "C" int +thd_pending_ops(MYSQL_THD thd) +{ + return thd->async_state.pending_ops(); +} + +extern "C" void thd_decrement_pending_ops(MYSQL_THD thd) +{ + DBUG_ASSERT(thd); + if (thd->async_state.dec_pending_ops() == 0) + { + switch(thd->async_state.m_state) + { + case thd_async_state::enum_async_state::SUSPEND: + DBUG_ASSERT(thd->scheduler->thd_resume); + thd->scheduler->thd_resume(thd); + break; + case thd_async_state::enum_async_state::NONE: + break; + default: + DBUG_ASSERT(0); + } + } +} + + unsigned long long thd_get_query_id(const MYSQL_THD thd) { return((unsigned long long)thd->query_id); diff --git a/sql/sql_class.h b/sql/sql_class.h index e13b896c820..ae451bf9c83 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -2250,6 +2250,109 @@ struct THD_count ~THD_count() { thread_count--; } }; +struct thd_async_state +{ + enum class enum_async_state + { + NONE, + SUSPEND, + RESUME + }; + enum_async_state m_state{ enum_async_state::NONE}; + enum enum_server_command m_command {COM_SLEEP}; + LEX_STRING m_packet{}; + + mysql_mutex_t m_mtx; + mysql_cond_t m_cond; + int m_pending_ops=0; + + thd_async_state() + { + mysql_mutex_init(0, &m_mtx, 0); + mysql_cond_init(0, &m_cond, 0); + } + /* + Only used with threadpool, where one can "suspend" and "resume" a THD. + Suspend only means leaving do_command earlier, while saving some state + Resume is continuing suspended do_command(), where it finished last time. + */ + bool try_suspend() + { + bool ret= true; + mysql_mutex_lock(&m_mtx); + DBUG_ASSERT(m_state == enum_async_state::NONE); + DBUG_ASSERT(m_pending_ops >= 0); + if (m_pending_ops == 0) + { + /* No pending operations, can't suspend, since nobody can resume */ + ret = false; + } + m_state= enum_async_state::SUSPEND; + mysql_mutex_unlock(&m_mtx); + return ret; + } + + ~thd_async_state() + { + wait_for_pending_ops(); + mysql_mutex_destroy(&m_mtx); + mysql_cond_destroy(&m_cond); + } + + /* + Increment pending asynchronous operations. + The client response may not be written if + this count > 0. + So, without threadpool query needs to wait for + the operations to finish. + With threadpool, THD can be suspended and resumed + when this counter goes to 0. + */ + void inc_pending_ops() + { + mysql_mutex_lock(&m_mtx); + m_pending_ops++; + mysql_mutex_unlock(&m_mtx); + } + + int dec_pending_ops() + { + mysql_mutex_lock(&m_mtx); + m_pending_ops--; + if (!m_pending_ops) + mysql_cond_signal(&m_cond); + mysql_mutex_unlock(&m_mtx); + return m_pending_ops; + } + + int pending_ops() + { + return m_pending_ops; + } + + /* Wait for pending operations to finish.*/ + void wait_for_pending_ops() + { + /* + It is fine to dirty read m_pending_ops and compare it with 0. + It is only incremented by the current thread, and may be decremented + by another one, so dirty read can be off and show positive number + when it is really 0 + */ + if (!m_pending_ops) + return; + mysql_mutex_lock(&m_mtx); + DBUG_ASSERT(m_pending_ops >= 0); + while (m_pending_ops) + mysql_cond_wait(&m_cond, &m_mtx); + mysql_mutex_unlock(&m_mtx); + } +}; + +extern "C" MYSQL_THD thd_increment_pending_ops(void); +extern "C" void thd_decrement_pending_ops(MYSQL_THD); +extern "C" int thd_pending_ops(MYSQL_THD); + /** @class THD @@ -4935,6 +5038,7 @@ private: } public: + thd_async_state async_state; #ifdef HAVE_REPLICATION /* If we do a purge of binary logs, log index info of the threads diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 1d830f60da0..847ca3f5b40 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -1155,24 +1155,48 @@ static enum enum_server_command fetch_command(THD *thd, char *packet) /** Read one command from connection and execute it (query or simple command). - This function is called in loop from thread function. + This function is to be used by different schedulers (one-thread-per-connection, + pool-of-threads) For profiling to work, it must never be called recursively. + @param thd - client connection context + + @param blocking - wait for command to finish. + if true, will wait for outstanding operations (e.g group commit) to finish, + before returning. otherwise, it might return DISPATCH_COMMAND_WOULDBLOCK, + in this case another do_command() needs to be executed to finish the current + command. + + @retval + DISPATCH_COMMAND_SUCCESS(0) - success @retval - 0 success + DISPATCH_COMMAND_ERROR request of THD shutdown (see dispatch_command() description) @retval - 1 request of thread shutdown (see dispatch_command() description) + DISPATCH_COMMAND_WOULDBLOCK - need to wait for commit notification */ -bool do_command(THD *thd) +dispatch_command_return do_command(THD *thd, bool blocking) { - bool return_value; + dispatch_command_return return_value; char *packet= 0; ulong packet_length; NET *net= &thd->net; enum enum_server_command command; DBUG_ENTER("do_command"); + DBUG_ASSERT(!thd->async_state.pending_ops()); + + if (thd->async_state.m_state == thd_async_state::enum_async_state::RESUME) + { + /* + Resuming previously suspended command. + Restore the state + */ + command = thd->async_state.m_command; + packet = thd->async_state.m_packet.str; + packet_length = (ulong)thd->async_state.m_packet.length; + goto resume; + } /* indicator of uninitialized lex => normal flow of errors handling @@ -1240,12 +1264,12 @@ bool do_command(THD *thd) if (net->error != 3) { - return_value= TRUE; // We have to close it. + return_value= DISPATCH_COMMAND_ERROR; // We have to close it. goto out; } net->error= 0; - return_value= FALSE; + return_value= DISPATCH_COMMAND_SUCCESS; goto out; } @@ -1302,7 +1326,7 @@ bool do_command(THD *thd) MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da()); thd->m_statement_psi= NULL; thd->m_digest= NULL; - return_value= FALSE; + return_value= DISPATCH_COMMAND_SUCCESS; wsrep_after_command_before_result(thd); goto out; @@ -1328,7 +1352,7 @@ bool do_command(THD *thd) thd->m_statement_psi= NULL; thd->m_digest= NULL; - return_value= FALSE; + return_value= DISPATCH_COMMAND_SUCCESS; wsrep_after_command_before_result(thd); goto out; } @@ -1339,10 +1363,19 @@ bool do_command(THD *thd) DBUG_ASSERT(packet_length); DBUG_ASSERT(!thd->apc_target.is_enabled()); + +resume: return_value= dispatch_command(command, thd, packet+1, - (uint) (packet_length-1)); - DBUG_ASSERT(!thd->apc_target.is_enabled()); + (uint) (packet_length-1), blocking); + if (return_value == DISPATCH_COMMAND_WOULDBLOCK) + { + /* Store current state for suspend/resume in threadpool*/ + thd->async_state.m_command= command; + thd->async_state.m_packet={packet,packet_length}; + DBUG_RETURN(return_value); + } + DBUG_ASSERT(!thd->apc_target.is_enabled()); out: thd->lex->restore_set_statement_var(); /* The statement instrumentation must be closed in all cases. */ @@ -1499,8 +1532,8 @@ public: 1 request of thread shutdown, i. e. if command is COM_QUIT/COM_SHUTDOWN */ -bool dispatch_command(enum enum_server_command command, THD *thd, - char* packet, uint packet_length) +dispatch_command_return dispatch_command(enum enum_server_command command, THD *thd, + char* packet, uint packet_length, bool blocking) { NET *net= &thd->net; bool error= 0; @@ -1512,6 +1545,12 @@ bool dispatch_command(enum enum_server_command command, THD *thd, "<?>"))); bool drop_more_results= 0; + if (thd->async_state.m_state == thd_async_state::enum_async_state::RESUME) + { + thd->async_state.m_state = thd_async_state::enum_async_state::NONE; + goto resume; + } + /* keep it withing 1 byte */ compile_time_assert(COM_END == 255); @@ -2242,6 +2281,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd, general_log_print(thd, command, NullS); my_eof(thd); break; + case COM_SLEEP: case COM_CONNECT: // Impossible here case COM_TIME: // Impossible from client @@ -2255,7 +2295,18 @@ bool dispatch_command(enum enum_server_command command, THD *thd, } dispatch_end: - do_end_of_statement= true; + /* + For the threadpool i.e if non-blocking call, if not all async operations + are finished, return without cleanup. The cleanup will be done on + later, when command execution is resumed. + */ + if (!blocking && !error && thd->async_state.pending_ops()) + { + DBUG_RETURN(DISPATCH_COMMAND_WOULDBLOCK); + } + +resume: + #ifdef WITH_WSREP /* Next test should really be WSREP(thd), but that causes a failure when doing @@ -2367,7 +2418,7 @@ dispatch_end: /* Check that some variables are reset properly */ DBUG_ASSERT(thd->abort_on_warning == 0); thd->lex->restore_set_statement_var(); - DBUG_RETURN(error); + DBUG_RETURN(error?DISPATCH_COMMAND_ERROR: DISPATCH_COMMAND_SUCCESS); } static bool slow_filter_masked(THD *thd, ulonglong mask) diff --git a/sql/sql_parse.h b/sql/sql_parse.h index 36dc68c292c..c91a5231b53 100644 --- a/sql/sql_parse.h +++ b/sql/sql_parse.h @@ -100,9 +100,16 @@ bool multi_delete_set_locks_and_link_aux_tables(LEX *lex); void create_table_set_open_action_and_adjust_tables(LEX *lex); int bootstrap(MYSQL_FILE *file); int mysql_execute_command(THD *thd); -bool do_command(THD *thd); -bool dispatch_command(enum enum_server_command command, THD *thd, - char* packet, uint packet_length); +enum dispatch_command_return +{ + DISPATCH_COMMAND_SUCCESS=0, + DISPATCH_COMMAND_ERROR= 1, + DISPATCH_COMMAND_WOULDBLOCK= 2 +}; + +dispatch_command_return do_command(THD *thd, bool blocking = true); +dispatch_command_return dispatch_command(enum enum_server_command command, THD *thd, + char* packet, uint packet_length, bool blocking = true); void log_slow_statement(THD *thd); bool append_file_to_dir(THD *thd, const char **filename_ptr, const LEX_CSTRING *table_name); diff --git a/sql/threadpool.h b/sql/threadpool.h index 285b46e3b27..b1c26630812 100644 --- a/sql/threadpool.h +++ b/sql/threadpool.h @@ -132,6 +132,7 @@ struct TP_pool virtual int set_stall_limit(uint){ return 0; } virtual int get_thread_count() { return tp_stats.num_worker_threads; } virtual int get_idle_thread_count(){ return 0; } + virtual void resume(TP_connection* c)=0; }; #ifdef _WIN32 @@ -145,6 +146,7 @@ struct TP_pool_win:TP_pool virtual void add(TP_connection *); virtual int set_max_threads(uint); virtual int set_min_threads(uint); + void resume(TP_connection *c); }; #endif @@ -158,6 +160,7 @@ struct TP_pool_generic :TP_pool virtual int set_pool_size(uint); virtual int set_stall_limit(uint); virtual int get_idle_thread_count(); + void resume(TP_connection* c); }; #endif /* HAVE_POOL_OF_THREADS */ diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc index c27f42b3d62..1f65a85c233 100644 --- a/sql/threadpool_common.cc +++ b/sql/threadpool_common.cc @@ -23,6 +23,8 @@ #include <sql_audit.h> #include <debug_sync.h> #include <threadpool.h> +#include <sql_class.h> +#include <sql_parse.h> #ifdef WITH_WSREP #include "wsrep_trans_observer.h" @@ -51,7 +53,7 @@ TP_STATISTICS tp_stats; static void threadpool_remove_connection(THD *thd); -static int threadpool_process_request(THD *thd); +static dispatch_command_return threadpool_process_request(THD *thd); static THD* threadpool_add_connection(CONNECT *connect, TP_connection *c); extern bool do_command(THD*); @@ -195,10 +197,30 @@ void tp_callback(TP_connection *c) } c->connect= 0; } - else if (threadpool_process_request(thd)) + else { - /* QUIT or an error occurred. */ - goto error; +retry: + switch(threadpool_process_request(thd)) + { + case DISPATCH_COMMAND_WOULDBLOCK: + if (!thd->async_state.try_suspend()) + { + /* + All async operations finished meanwhile, thus nobody is will wake up + this THD (this is done at the end of, and therefore do "resume" manually. + */ + thd->async_state.m_state = thd_async_state::enum_async_state::RESUME; + goto retry; + } + worker_context.restore(); + return; + case DISPATCH_COMMAND_ERROR: + /* QUIT or an error occurred. */ + goto error; + case DISPATCH_COMMAND_SUCCESS: + break; + } + thd->async_state.m_state= thd_async_state::enum_async_state::NONE; } /* Set priority */ @@ -321,10 +343,13 @@ static void handle_wait_timeout(THD *thd) /** Process a single client request or a single batch. */ -static int threadpool_process_request(THD *thd) +static dispatch_command_return threadpool_process_request(THD *thd) { - int retval= 0; + dispatch_command_return retval= DISPATCH_COMMAND_SUCCESS; + thread_attach(thd); + if(thd->async_state.m_state == thd_async_state::enum_async_state::RESUME) + goto resume; if (thd->killed >= KILL_CONNECTION) { @@ -332,7 +357,7 @@ static int threadpool_process_request(THD *thd) killed flag was set by timeout handler or KILL command. Return error. */ - retval= 1; + retval= DISPATCH_COMMAND_ERROR; if(thd->killed == KILL_WAIT_TIMEOUT) handle_wait_timeout(thd); goto end; @@ -356,12 +381,20 @@ static int threadpool_process_request(THD *thd) if (mysql_audit_release_required(thd)) mysql_audit_release(thd); - if ((retval= do_command(thd)) != 0) - goto end; +resume: + retval= do_command(thd, false); + switch(retval) + { + case DISPATCH_COMMAND_WOULDBLOCK: + case DISPATCH_COMMAND_ERROR: + goto end; + case DISPATCH_COMMAND_SUCCESS: + break; + } if (!thd_is_connection_alive(thd)) { - retval= 1; + retval=DISPATCH_COMMAND_ERROR; goto end; } @@ -369,7 +402,7 @@ static int threadpool_process_request(THD *thd) vio= thd->net.vio; if (!vio->has_data(vio)) - { + { /* More info on this debug sync is in sql_parse.cc*/ DEBUG_SYNC(thd, "before_do_command_net_read"); goto end; @@ -505,6 +538,14 @@ static void tp_post_kill_notification(THD *thd) post_kill_notification(thd); } +static void tp_resume(THD* thd) +{ + DBUG_ASSERT(thd->async_state.m_state == thd_async_state::enum_async_state::SUSPEND); + thd->async_state.m_state = thd_async_state::enum_async_state::RESUME; + TP_connection* c = get_TP_connection(thd); + pool->resume(c); +} + static scheduler_functions tp_scheduler_functions= { 0, // max_threads @@ -515,7 +556,8 @@ static scheduler_functions tp_scheduler_functions= tp_wait_begin, // thd_wait_begin tp_wait_end, // thd_wait_end tp_post_kill_notification, // post kill notification - tp_end // end + tp_end, // end + tp_resume }; void pool_of_threads_scheduler(struct scheduler_functions *func, diff --git a/sql/threadpool_generic.cc b/sql/threadpool_generic.cc index 2a5587fa04a..f135b920c17 100644 --- a/sql/threadpool_generic.cc +++ b/sql/threadpool_generic.cc @@ -1314,7 +1314,10 @@ void TP_pool_generic::add(TP_connection *c) DBUG_VOID_RETURN; } - +void TP_pool_generic::resume(TP_connection* c) +{ + add(c); +} /** MySQL scheduler callback: wait begin diff --git a/sql/threadpool_win.cc b/sql/threadpool_win.cc index df8a6c216a3..c83fb84ba4a 100644 --- a/sql/threadpool_win.cc +++ b/sql/threadpool_win.cc @@ -125,6 +125,11 @@ void TP_pool_win::add(TP_connection *c) } } +void TP_pool_win::resume(TP_connection* c) +{ + SubmitThreadpoolWork(((TP_connection_win*)c)->work); +} + #define CHECK_ALLOC_ERROR(op) \ do \ { \ @@ -438,3 +443,4 @@ TP_connection *TP_pool_win::new_connection(CONNECT *connect) } return c; } + diff --git a/storage/innobase/log/log0log.cc b/storage/innobase/log/log0log.cc index 84589e7950f..5f8bf3040a3 100644 --- a/storage/innobase/log/log0log.cc +++ b/storage/innobase/log/log0log.cc @@ -1082,6 +1082,17 @@ bool log_write_lock_own() } #endif + +/* Execute callback , once lsn is flushed or written to log. */ +void log_register_wait(lsn_t lsn, bool flush, void (*f)(void *), void *par) +{ + completion_callback c{f, par}; + if (flush) + flush_lock.register_wait(lsn,c); + else + write_lock.register_wait(lsn,c); +} + /** Ensure that the log has been written to the log file up to a given log entry (such as that of a transaction commit). Start a new write, or wait and check if an already running write is covering the request. @@ -1392,7 +1403,7 @@ bool log_checkpoint() log_mutex_exit(); - log_write_up_to(flush_lsn, true, true); + log_write_up_to(flush_lsn,true,true); log_mutex_enter(); diff --git a/storage/innobase/log/log0sync.cc b/storage/innobase/log/log0sync.cc index 7799e605576..6341380d544 100644 --- a/storage/innobase/log/log0sync.cc +++ b/storage/innobase/log/log0sync.cc @@ -77,6 +77,7 @@ Note that if write operation is very fast, a) or b) can be fine as alternative. #include <log0types.h> #include "log0sync.h" #include <mysql/service_thd_wait.h> +#include <sql_class.h> /** Helper class , used in group commit lock. @@ -249,6 +250,9 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num) void group_commit_lock::release(value_type num) { + completion_callback callbacks[1024]; + size_t callback_count = 0; + std::unique_lock<std::mutex> lk(m_mtx); m_lock = false; @@ -291,6 +295,23 @@ void group_commit_lock::release(value_type num) prev= cur; } } + for (auto &c : m_pending_callbacks) + { + if (c.first <= num) + { + if (callback_count < array_elements(callbacks)) + callbacks[callback_count++]= c.second; + else + c.second.m_callback(c.second.m_param); + } + } + + auto it= std::remove_if( + m_pending_callbacks.begin(), m_pending_callbacks.end(), + [num](const std::pair<value_type,completion_callback> &c) { return c.first <= num; }); + + m_pending_callbacks.erase(it, m_pending_callbacks.end()); + lk.unlock(); for (cur= wakeup_list; cur; cur= next) @@ -298,6 +319,21 @@ void group_commit_lock::release(value_type num) next= cur->m_next; cur->m_sema.wake(); } + + for (size_t i= 0; i < callback_count; i++) + callbacks[i].m_callback(callbacks[i].m_param); +} + +void group_commit_lock::register_wait(lsn_t lsn, completion_callback &c) +{ + std::unique_lock<std::mutex> lk(m_mtx); + if (lsn <= value()) + { + lk.unlock(); + c.m_callback(c.m_param); + return; + } + m_pending_callbacks.push_back({ lsn,c }); } #ifndef DBUG_OFF diff --git a/storage/innobase/log/log0sync.h b/storage/innobase/log/log0sync.h index 40afbf74ecd..c550f027b08 100644 --- a/storage/innobase/log/log0sync.h +++ b/storage/innobase/log/log0sync.h @@ -18,6 +18,7 @@ this program; if not, write to the Free Software Foundation, Inc., #include <atomic> #include <thread> #include <log0types.h> +#include <vector> struct group_commit_waiter_t; @@ -52,6 +53,12 @@ Operations supported on this semaphore 5. set_pending_value() */ +struct completion_callback +{ + void (*m_callback)(void*); + void* m_param; +}; + class group_commit_lock { using value_type = lsn_t; @@ -63,6 +70,8 @@ class group_commit_lock std::atomic<value_type> m_pending_value; bool m_lock; group_commit_waiter_t* m_waiters_list; + std::vector<std::pair<value_type,completion_callback>> m_pending_callbacks; + public: group_commit_lock(); enum lock_return_code @@ -71,6 +80,7 @@ public: EXPIRED }; lock_return_code acquire(value_type num); + void register_wait(value_type num, completion_callback& callback); void release(value_type num); value_type value() const; value_type pending() const; diff --git a/storage/innobase/trx/trx0trx.cc b/storage/innobase/trx/trx0trx.cc index 03e60d80d5b..4a86bb9261f 100644 --- a/storage/innobase/trx/trx0trx.cc +++ b/storage/innobase/trx/trx0trx.cc @@ -23,6 +23,7 @@ The transaction Created 3/26/1996 Heikki Tuuri *******************************************************/ +#define MYSQL_SERVER #include "trx0trx.h" @@ -1185,34 +1186,104 @@ trx_finalize_for_fts( trx->fts_trx = NULL; } -/**********************************************************************//** -If required, flushes the log to disk based on the value of -innodb_flush_log_at_trx_commit. */ -static -void -trx_flush_log_if_needed_low( -/*========================*/ - lsn_t lsn) /*!< in: lsn up to which logs are to be - flushed. */ +extern "C" MYSQL_THD thd_increment_pending_ops(); +extern "C" void thd_decrement_pending_ops(MYSQL_THD); +extern "C" int thd_pending_ops(MYSQL_THD ); + +static Atomic_counter<int> n_pending_log_write; + +static bool need_to_flush_log() { - bool flush = srv_file_flush_method != SRV_NOSYNC; + if (srv_file_flush_method != SRV_NOSYNC) + return false; - switch (srv_flush_log_at_trx_commit) { - case 3: - case 2: - /* Write the log but do not flush it to disk */ - flush = false; - /* fall through */ - case 1: - /* Write the log and optionally flush it to disk */ - log_write_up_to(lsn, flush); - return; - case 0: - /* Do nothing */ - return; - } + switch (srv_flush_log_at_trx_commit) + { + case 3: + case 2: + /* Write the log but do not flush it to disk */ + return false; + /* fall through */ + case 1: + return true; + case 0: + return false; + } + ut_error; + return false; +} +static void background_log_write(void *) +{ + bool flush= need_to_flush_log(); - ut_error; + lsn_t old_lsn= 0; + do + { + log_mutex_enter(); + auto lsn= log_sys.get_lsn(); + log_mutex_exit(); + n_pending_log_write= 0; + if (lsn == old_lsn) + break; + log_write_up_to(lsn, flush); + old_lsn= lsn; + } while (n_pending_log_write); +} + +static tpool::task_group bck_write_group(1); +static tpool::task background_log_write_task(background_log_write, 0, + &bck_write_group); + +static void initiate_background_log_write_if_needed() +{ + if (!n_pending_log_write++) + srv_thread_pool->submit_task(&background_log_write_task); +} + +extern "C" int thd_pending_ops(MYSQL_THD thd); +extern void log_register_wait(lsn_t lsn, bool flush, void (*f)(void *), + void *par); + +/* + If required, intiates flushes the log to disk based on the value of + innodb_flush_log_at_trx_commit. + + It can increment pending THD operations, and current connection + might not write response to the client, until the pending operation + is finished. + + @param lsn_t lsn + @params trx_state +*/ +static void trx_flush_log_if_needed_low(lsn_t lsn, trx_state_t trx_state) +{ + if (!srv_flush_log_at_trx_commit) + return; + + if (log_sys.get_flushed_lsn() > lsn) + return; + bool flush= need_to_flush_log(); + + if (trx_state == TRX_STATE_PREPARED) + { + /* XA, which is used with binlog as well. + Be conservative, use synchronous wait.*/ + log_write_up_to(lsn, flush); + return; + } + + initiate_background_log_write_if_needed(); + + MYSQL_THD thd= thd_increment_pending_ops(); + if (!thd) + { + /* This might be dictionary recalculations.*/ + log_write_up_to(lsn, flush); + return; + } + + log_register_wait(lsn, flush, (void (*)(void *)) thd_decrement_pending_ops, + thd); } /**********************************************************************//** @@ -1227,7 +1298,7 @@ trx_flush_log_if_needed( trx_t* trx) /*!< in/out: transaction */ { trx->op_info = "flushing log"; - trx_flush_log_if_needed_low(lsn); + trx_flush_log_if_needed_low(lsn,trx->state); trx->op_info = ""; } |