author     Marko Mäkelä <marko.makela@mariadb.com>  2022-04-28 09:01:37 +0300
committer  Marko Mäkelä <marko.makela@mariadb.com>  2022-04-28 09:01:37 +0300
commit     cdc328a004ecfa16b6a965256b923f920cca7931 (patch)
tree       28b971bef5a4e75b2a2bf966dac57a36d3591c02
parent     b0dbb43cf9454f72f7deae1c04612f0d77bd6acf (diff)
parent     fbf8646335280150a6ecf5727effb1a719f26b22 (diff)
-rw-r--r--  storage/innobase/buf/buf0flu.cc          2
-rw-r--r--  storage/innobase/handler/ha_innodb.cc    3
-rw-r--r--  storage/innobase/include/log0log.h      36
-rw-r--r--  storage/innobase/log/log0log.cc        269
-rw-r--r--  storage/innobase/log/log0sync.cc       136
-rw-r--r--  storage/innobase/log/log0sync.h         19
-rw-r--r--  storage/innobase/mtr/mtr0mtr.cc          4
-rw-r--r--  storage/innobase/os/os0file.cc           8
-rw-r--r--  storage/innobase/srv/srv0srv.cc          9
-rw-r--r--  storage/innobase/srv/srv0start.cc        4
-rw-r--r--  storage/innobase/trx/trx0trx.cc          3
11 files changed, 323 insertions, 170 deletions
diff --git a/storage/innobase/buf/buf0flu.cc b/storage/innobase/buf/buf0flu.cc
index 0604234c5f6..d297695f6f4 100644
--- a/storage/innobase/buf/buf0flu.cc
+++ b/storage/innobase/buf/buf0flu.cc
@@ -1702,7 +1702,7 @@ ulint buf_flush_LRU(ulint max_n)
if (buf_pool.n_flush_LRU())
return 0;
- log_buffer_flush_to_disk();
+ log_buffer_flush_to_disk_async();
mysql_mutex_lock(&buf_pool.mutex);
if (buf_pool.n_flush_LRU_)
diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc
index 03e50966547..0cd7e81028d 100644
--- a/storage/innobase/handler/ha_innodb.cc
+++ b/storage/innobase/handler/ha_innodb.cc
@@ -1770,6 +1770,7 @@ MYSQL_THD innobase_create_background_thd(const char* name)
MYSQL_THD thd= create_background_thd();
thd_proc_info(thd, name);
THDVAR(thd, background_thread) = true;
+ ut_ad(!thd_get_thread_id(thd));
return thd;
}
@@ -1777,7 +1778,7 @@ extern "C" void thd_increment_pending_ops(MYSQL_THD);
THD *innodb_thd_increment_pending_ops(THD *thd)
{
- if (!thd || THDVAR(thd, background_thread))
+ if (!thd || !thd_get_thread_id(thd))
return nullptr;
thd_increment_pending_ops(thd);
return thd;
diff --git a/storage/innobase/include/log0log.h b/storage/innobase/include/log0log.h
index 04c3d7c371d..8f6fbaf30e2 100644
--- a/storage/innobase/include/log0log.h
+++ b/storage/innobase/include/log0log.h
@@ -70,15 +70,25 @@ void log_write_up_to(lsn_t lsn, bool durable,
const completion_callback *callback= nullptr);
/** Write to the log file up to the last log entry.
-@param durable whether to wait for a durable write to complete */
-void log_buffer_flush_to_disk(bool durable= true);
+@param durable whether to wait for a durable write to complete
+@param wait whether to wait for completion
+*/
+void log_buffer_flush_to_disk(bool durable= true, bool wait= true);
+
+/** Initiate log buffer write/flush */
+static inline void log_buffer_flush_to_disk_async()
+{
+ log_buffer_flush_to_disk(true, false);
+}
/** Prepare to invoke log_write_and_flush(), before acquiring log_sys.latch. */
ATTRIBUTE_COLD void log_write_and_flush_prepare();
-/** Durably write the log up to log_sys.get_lsn(). */
-ATTRIBUTE_COLD void log_write_and_flush();
+/** Durably write the log up to log_sys.get_lsn().
+@return lsn that log_write_up_to() must be invoked with
+@retval 0 if there is no need to invoke log_write_up_to() */
+ATTRIBUTE_COLD __attribute__((warn_unused_result)) lsn_t log_write_and_flush();
/** Make a checkpoint */
ATTRIBUTE_COLD void log_make_checkpoint();
@@ -136,7 +146,8 @@ public:
dberr_t close() noexcept;
dberr_t read(os_offset_t offset, span<byte> buf) noexcept;
- void write(os_offset_t offset, span<const byte> buf) noexcept;
+ void write(os_offset_t offset, span<const byte> buf,
+ tpool::aiocb *cb= nullptr) noexcept;
bool flush() const noexcept { return os_file_flush(m_file); }
#ifdef HAVE_PMEM
byte *mmap(bool read_only, const struct stat &st) noexcept;
@@ -485,8 +496,19 @@ public:
/** Write buf to ib_logfile0.
@tparam release_latch whether to invoke latch.wr_unlock()
- @return the current log sequence number */
- template<bool release_latch> inline lsn_t write_buf() noexcept;
+ @param durable whether to invoke a durable write
+ @param sync whether to invoke a synchronous write
+ @return new write target
+ @retval 0 if there is no need to call log_write_up_to() */
+ template<bool release_latch>
+ inline lsn_t write_buf(bool durable, bool sync) noexcept;
+
+ /** Complete write_buf().
+ @param lsn new value of write_lsn
+ @param durable whether the write was durable
+ @return new write target
+ @retval 0 if there is no need to call log_write_up_to() */
+ inline lsn_t complete_write_buf(lsn_t lsn, bool durable) noexcept;
/** Create the log. */
void create(lsn_t lsn) noexcept;
diff --git a/storage/innobase/log/log0log.cc b/storage/innobase/log/log0log.cc
index 9529db0af06..7de28c69321 100644
--- a/storage/innobase/log/log0log.cc
+++ b/storage/innobase/log/log0log.cc
@@ -74,6 +74,38 @@ log_t log_sys;
#define LOG_BUF_FLUSH_MARGIN ((4 * 4096) /* cf. log_t::append_prepare() */ \
+ (4U << srv_page_size_shift))
+/**
+ Group commit completion callback used for anything
+ that can run asynchronously.
+*/
+static const completion_callback async_io_callback{nullptr, nullptr};
+
+/**
+ Group commit completion callback that forces synchronous IO.
+*/
+static const completion_callback sync_io_callback{nullptr, nullptr};
+
+#ifndef DBUG_OFF
+/**
+ Crash after a disk flush, requested via a debug flag.
+ The flush can be executed by a background thread, where
+ DBUG_EXECUTE_IF() does not work, so the value is passed
+ via a global variable.
+*/
+static bool crash_after_flush;
+#endif
+
+static void report_aio_error(const char *text, tpool::aiocb *cb);
+
+/** AIO control block with auxiliary information, for asynchronous writes.
+Protected by write_lock. */
+struct Log_aiocb : tpool::aiocb
+{
+ lsn_t lsn;
+ bool durable;
+};
+static Log_aiocb log_aiocb;
+
void log_t::set_capacity()
{
#ifndef SUX_LOCK_GENERIC
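
The Log_aiocb struct above derives from the thread pool's AIO control block so that auxiliary state (the lsn and the durability flag) travels with the request into the completion callback. A minimal self-contained sketch of that pattern, with an illustrative stand-in for tpool::aiocb (none of these names are MariaDB APIs):

#include <cstdint>

struct aiocb_sketch             /* illustrative stand-in for tpool::aiocb */
{
  void (*m_callback)(void *)= nullptr;
  int m_err= 0;
};

struct log_aiocb_sketch : aiocb_sketch
{
  uint64_t lsn= 0;              /* auxiliary data riding along with the IO */
  bool durable= false;
};

static void on_complete(void *p)
{
  /* the completion thread downcasts to recover the auxiliary fields */
  auto *cb= static_cast<log_aiocb_sketch *>(p);
  (void) cb->lsn;
  (void) cb->durable;
}

int main()
{
  log_aiocb_sketch cb;
  cb.m_callback= on_complete;
  cb.lsn= 42;
  cb.durable= true;
  cb.m_callback(&cb);           /* what IO completion would invoke */
  return 0;
}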
@@ -155,12 +187,29 @@ dberr_t log_file_t::read(os_offset_t offset, span<byte> buf) noexcept
return os_file_read(IORequestRead, m_file, buf.data(), offset, buf.size());
}
-void log_file_t::write(os_offset_t offset, span<const byte> buf) noexcept
+void log_file_t::write(os_offset_t offset, span<const byte> buf,
+ tpool::aiocb *iocb) noexcept
{
ut_ad(is_opened());
- if (dberr_t err= os_file_write_func(IORequestWrite, "ib_logfile0", m_file,
+ if (iocb)
+ {
+ ut_ad(buf.size() < UINT_MAX);
+ iocb->m_fh= m_file;
+ iocb->m_opcode= tpool::aio_opcode::AIO_PWRITE;
+ iocb->m_offset= offset;
+ iocb->m_buffer= (void *) buf.data();
+ iocb->m_len= (unsigned) buf.size();
+ if (srv_thread_pool->submit_io(iocb))
+ {
+ iocb->m_err= IF_WIN(GetLastError(), errno);
+ report_aio_error("submitting asynchronous write to ib_logfile0", iocb);
+ }
+ }
+ else if (dberr_t err= os_file_write(IORequestWrite, "ib_logfile0", m_file,
buf.data(), offset, buf.size()))
+ {
ib::fatal() << "write(\"ib_logfile0\") returned " << err;
+ }
}
#ifdef HAVE_PMEM
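
log_file_t::write() now takes an optional control block: passing one selects the asynchronous path via the thread pool, while nullptr keeps the old synchronous behaviour. A compilable sketch of this API shape, with hypothetical stand-ins for os_file_write() and srv_thread_pool->submit_io():

#include <cstddef>

struct io_cb_sketch { const void *buf= nullptr; size_t len= 0; };

/* hypothetical backends standing in for the synchronous write call
   and the thread pool submission */
static void sync_write(const void *, size_t) {}
static void async_submit(io_cb_sketch *) {}

static void file_write(const void *buf, size_t len,
                       io_cb_sketch *cb= nullptr)
{
  if (cb)
  {
    cb->buf= buf;               /* fill the control block... */
    cb->len= len;
    async_submit(cb);           /* ...and hand it to the IO threads */
  }
  else
    sync_write(buf, len);       /* unchanged synchronous path */
}

int main()
{
  static const char msg[]= "redo";
  file_write(msg, sizeof msg);        /* synchronous, as before */
  static io_cb_sketch cb;             /* must outlive the async request */
  file_write(msg, sizeof msg, &cb);   /* asynchronous */
  return 0;
}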
@@ -512,10 +561,13 @@ void log_t::resize_abort() noexcept
/** Write an aligned buffer to ib_logfile0.
@param buf buffer to be written
@param len length of data to be written
-@param offset log file offset */
-static void log_write_buf(const byte *buf, size_t len, lsn_t offset)
+@param offset log file offset
+@param cb completion callback */
+static void log_write_buf(const byte *buf, size_t len, lsn_t offset,
+ tpool::aiocb *cb)
{
- ut_ad(write_lock.is_owner());
+ ut_ad(cb ? !write_lock.has_owner() : write_lock.is_owner());
+ ut_ad(write_lock.locked());
ut_ad(!recv_no_log_write);
ut_d(const size_t block_size_1= log_sys.get_block_size() - 1);
ut_ad(!(offset & block_size_1));
@@ -526,7 +578,7 @@ static void log_write_buf(const byte *buf, size_t len, lsn_t offset)
if (UNIV_LIKELY(offset + len <= log_sys.file_size))
{
write:
- log_sys.log.write(offset, {buf, len});
+ log_sys.log.write(offset, {buf, len}, cb);
return;
}
@@ -738,30 +790,110 @@ ATTRIBUTE_COLD void log_t::resize_write_buf(size_t length) noexcept
resize_flush_buf, offset, length) == DB_SUCCESS);
}
+
+static void report_aio_error(const char *text, tpool::aiocb *cb)
+{
+ ib::fatal() << "IO Error "
+ << cb->m_err IF_WIN(, << " " << strerror(cb->m_err)) << " "
+ << text << "," << cb->m_len << " bytes at offset "
+ << cb->m_offset;
+}
+
+/** Ensure that previous log writes are durable.
+@return new durable lsn target
+@retval 0 if the caller does not need to call log_write_up_to() again */
+static lsn_t log_flush()
+{
+ ut_ad(!log_sys.is_pmem());
+ lsn_t lsn= write_lock.value();
+ ut_a(log_sys.flush(lsn));
+#ifndef DBUG_OFF
+ if (crash_after_flush)
+ DBUG_SUICIDE();
+#endif
+ return flush_lock.release(lsn);
+}
+
+
+/** Complete write_buf().
+@param lsn new value of write_lsn
+@param durable whether the write was durable
+@return new write target
+@retval 0 if there is no need to call log_write_up_to() */
+inline lsn_t log_t::complete_write_buf(lsn_t lsn, bool durable) noexcept
+{
+ ut_ad(write_lock.is_owner());
+ ut_ad(durable == flush_lock.is_owner());
+
+ ut_a(lsn >= write_lsn);
+
+ write_lsn= lsn;
+ lsn_t pending_lsn= write_lock.release(lsn);
+ if (durable)
+ pending_lsn= std::max(pending_lsn, log_flush());
+ return pending_lsn;
+}
+
+static void aio_complete_write_buf(void *p)
+{
+ ut_ad(write_lock.locked());
+
+ Log_aiocb *cb= static_cast<Log_aiocb *>(p);
+ if (cb->m_err)
+ report_aio_error("in asynchronous write to ib_logfile0", cb);
+ const bool durable{cb->durable};
+#ifdef UNIV_DEBUG
+ if (durable)
+ {
+ ut_ad(flush_lock.locked());
+ flush_lock.set_owner();
+ }
+ write_lock.set_owner();
+#endif
+
+ if (lsn_t ret_lsn= log_sys.complete_write_buf(cb->lsn, durable))
+ {
+ /* Prevent stalls. Also, force the special synchronous callback
+ as an optimization: we avoid the thread pool machinery and
+ context switching, since we are already in a background
+ thread here. */
+ log_write_up_to(ret_lsn, durable, &sync_io_callback);
+ }
+}
+
+
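aio_complete_write_buf() runs in an IO thread, so logical ownership of write_lock (and possibly flush_lock) must migrate from the submitting thread to the completion thread; the debug-only set_owner()/reset_owner() calls above record that handoff. A standalone sketch of the idea (owned_lock_sketch is illustrative, not the MariaDB class):

#include <cassert>
#include <thread>

class owned_lock_sketch
{
  bool m_locked= false;
  std::thread::id m_owner;
public:
  void lock() { m_locked= true; m_owner= std::this_thread::get_id(); }
  bool is_owner() const
  { return m_locked && m_owner == std::this_thread::get_id(); }
  void reset_owner() { assert(is_owner()); m_owner= std::thread::id(); }
  void set_owner()
  {
    assert(m_locked && m_owner == std::thread::id());
    m_owner= std::this_thread::get_id();
  }
  void unlock() { assert(is_owner()); m_locked= false; }
};

int main()
{
  owned_lock_sketch lock;
  lock.lock();                  /* the submitter owns the lock */
  lock.reset_owner();           /* detach before handing the IO off */
  std::thread completion([&lock] {
    lock.set_owner();           /* completion thread claims ownership */
    lock.unlock();              /* and finishes the protocol */
  });
  completion.join();
  return 0;
}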
/** Write buf to ib_logfile0.
@tparam release_latch whether to invoke latch.wr_unlock()
-@return the current log sequence number */
-template<bool release_latch> inline lsn_t log_t::write_buf() noexcept
+@param durable whether to invoke a durable write
+@param sync whether to invoke a synchronous write
+@return new write target
+@retval 0 if there is no need to call log_write_up_to() */
+template<bool release_latch>
+inline lsn_t log_t::write_buf(bool durable, bool sync) noexcept
{
#ifndef SUX_LOCK_GENERIC
ut_ad(latch.is_write_locked());
#endif
ut_ad(!srv_read_only_mode);
ut_ad(!is_pmem());
+ ut_ad(write_lock.is_owner());
+ ut_ad(durable == flush_lock.is_owner());
const lsn_t lsn{get_lsn(std::memory_order_relaxed)};
-
+ DBUG_EXECUTE_IF("crash_after_log_write_upto", crash_after_flush= true;);
if (write_lsn >= lsn)
{
if (release_latch)
latch.wr_unlock();
- ut_ad(write_lsn == lsn);
+ ut_a(write_lsn == lsn);
+ return complete_write_buf(lsn, durable);
}
else
{
ut_ad(!recv_no_log_write);
- write_lock.set_pending(lsn);
- ut_ad(write_lsn >= get_flushed_lsn());
+ ut_a(write_lsn >= get_flushed_lsn());
const size_t block_size_1{get_block_size() - 1};
lsn_t offset{calc_lsn_offset(write_lsn) & ~lsn_t{block_size_1}};
@@ -812,20 +944,34 @@ template<bool release_latch> inline lsn_t log_t::write_buf() noexcept
"InnoDB log write: " LSN_PF, write_lsn);
}
- /* Do the write to the log file */
- log_write_buf(write_buf, length, offset);
if (UNIV_LIKELY_NULL(resize_buf))
resize_write_buf(length);
- write_lsn= lsn;
- }
- return lsn;
+ /* Do the write to the log file */
+ if (sync)
+ {
+ log_write_buf(write_buf, length, offset, nullptr);
+ return complete_write_buf(lsn, durable);
+ }
+
+ /* Asynchronous log IO.
+ Note: flush/write lock ownership is going to migrate to a
+ background thread. */
+ ut_d(write_lock.reset_owner());
+ ut_d(if (durable) flush_lock.reset_owner());
+
+ log_aiocb.m_callback= aio_complete_write_buf;
+ log_aiocb.durable= durable;
+ log_aiocb.lsn= lsn;
+ log_write_buf(write_buf, length, offset, &log_aiocb);
+ return 0;
+ }
}
bool log_t::flush(lsn_t lsn) noexcept
{
ut_ad(lsn >= get_flushed_lsn());
- flush_lock.set_pending(lsn);
+ ut_ad(flush_lock.is_owner());
const bool success{srv_file_flush_method == SRV_O_DSYNC || log.flush()};
if (UNIV_LIKELY(success))
{
@@ -835,22 +981,25 @@ bool log_t::flush(lsn_t lsn) noexcept
return success;
}
-/** Ensure that previous log writes are durable.
-@param lsn previously written LSN
-@return new durable lsn target
-@retval 0 if there are no pending callbacks on flush_lock
- or there is another group commit lead.
+/*
+ Decide whether to do synchronous IO.
+ Async IO might not make sense because of the higher latency or CPU
+ overhead of the thread pool, or because the file is cached and, say,
+ libaio cannot do AIO on cached files.
+
+ Async IO always makes sense when the waiter does not care about the
+ result (i.e., a callback with a NULL function).
+
+ NOTE: currently, async IO is mostly unused, because it turns out to be
+ worse in benchmarks; perhaps too many threads are involved in waking
+ and waiting.
+*/
-static lsn_t log_flush(lsn_t lsn)
+static bool use_sync_log_write(bool /* durable */,
+ const completion_callback *cb)
{
- ut_ad(!log_sys.is_pmem());
- ut_a(log_sys.flush(lsn));
- DBUG_EXECUTE_IF("crash_after_log_write_upto", DBUG_SUICIDE(););
- return flush_lock.release(lsn);
+ return !cb || cb->m_callback || cb == &sync_io_callback;
}
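
async_io_callback and sync_io_callback (defined near the top of this file) hold identical data; they are distinguished purely by address and act as sentinels, which is what the comparison against &sync_io_callback above relies on. A minimal sketch of the sentinel test (names are illustrative):

#include <cassert>

struct callback_sketch { void (*fn)(void *); void *arg; };

static const callback_sketch async_cb{nullptr, nullptr};
static const callback_sketch sync_cb{nullptr, nullptr};

static bool wants_sync(const callback_sketch *cb)
{
  /* no callback at all, a real callback function, or the sync
     sentinel: all of these select the synchronous write path */
  return !cb || cb->fn || cb == &sync_cb;
}

int main()
{
  assert(wants_sync(nullptr));
  assert(wants_sync(&sync_cb));
  assert(!wants_sync(&async_cb));
  return 0;
}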
-static const completion_callback dummy_callback{[](void *) {},nullptr};
-
/** Ensure that the log has been written to the log file up to a given
log entry (such as that of a transaction commit). Start a new write, or
wait and check if an already running write is covering the request.
@@ -867,7 +1016,7 @@ void log_write_up_to(lsn_t lsn, bool durable,
{
/* A non-final batch of recovery is active; no writes to the log
are allowed yet. */
- ut_a(!callback);
+ ut_a(!callback || !callback->m_callback);
return;
}
@@ -876,7 +1025,7 @@ void log_write_up_to(lsn_t lsn, bool durable,
#ifdef HAVE_PMEM
if (log_sys.is_pmem())
{
- ut_ad(!callback);
+ ut_ad(!callback || !callback->m_callback);
if (durable)
log_sys.persist(lsn);
return;
@@ -884,42 +1033,49 @@ void log_write_up_to(lsn_t lsn, bool durable,
#endif
repeat:
- if (durable)
- {
- if (flush_lock.acquire(lsn, callback) != group_commit_lock::ACQUIRED)
- return;
- flush_lock.set_pending(log_sys.get_lsn());
- }
+ if (durable &&
+ flush_lock.acquire(lsn, callback) != group_commit_lock::ACQUIRED)
+ return;
- lsn_t pending_write_lsn= 0, pending_flush_lsn= 0;
+ lsn_t pending_lsn= 0;
if (write_lock.acquire(lsn, durable ? nullptr : callback) ==
group_commit_lock::ACQUIRED)
{
+ const bool sync{use_sync_log_write(durable, callback)};
log_sys.latch.wr_lock(SRW_LOCK_CALL);
- pending_write_lsn= write_lock.release(log_sys.write_buf<true>());
+ pending_lsn= log_sys.write_buf<true>(durable, sync);
+ if (!pending_lsn)
+ return;
}
- if (durable)
+ if (durable && !pending_lsn)
{
- pending_flush_lsn= log_flush(write_lock.value());
+ /* We only get here if flush_lock was acquired but write_lock
+ expired, i.e. the lsn was already written but not yet flushed. */
+ pending_lsn= log_flush();
}
- if (pending_write_lsn || pending_flush_lsn)
+ if (pending_lsn)
{
- /* There is no new group commit lead; some async waiters could stall. */
- callback= &dummy_callback;
- lsn= std::max(pending_write_lsn, pending_flush_lsn);
+ /* There is no new group commit lead; some waiters could stall.
+ If the special sync_io_callback was used, we keep using it
+ as an optimization to reduce context switches.
+ */
+ if (callback != &sync_io_callback)
+ callback= &async_io_callback;
+ lsn= pending_lsn;
goto repeat;
}
}
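
The goto repeat loop above keeps the current thread in the group commit lead role for as long as release() reports queued requests with no new lead. Reduced to its skeleton, under the assumption that write_step() is a hypothetical stand-in for the write/flush step:

#include <cstdint>

/* hypothetical write step: performs the write up to lsn, then returns
   a higher pending target if requests queued behind the leader still
   need service, or 0 when nothing is pending */
static uint64_t write_step(uint64_t lsn)
{
  static const uint64_t queued= 300;  /* pretend one request queued at 300 */
  return lsn < queued ? queued : 0;
}

static void write_up_to(uint64_t lsn)
{
  while ((lsn= write_step(lsn)) != 0)
  { /* no new group commit lead took over; keep serving requests */ }
}

int main()
{
  write_up_to(100);  /* writes to 100, finds 300 pending, writes again */
  return 0;
}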
/** Write to the log file up to the last log entry.
@param durable whether to wait for a durable write to complete */
-void log_buffer_flush_to_disk(bool durable)
+void log_buffer_flush_to_disk(bool durable, bool wait)
{
ut_ad(!srv_read_only_mode);
- log_write_up_to(log_sys.get_lsn(std::memory_order_acquire), durable);
+ log_write_up_to(log_sys.get_lsn(std::memory_order_acquire), durable,
+ wait ? nullptr : &async_io_callback);
}
/** Prepare to invoke log_write_and_flush(), before acquiring log_sys.latch. */
@@ -934,20 +1090,19 @@ ATTRIBUTE_COLD void log_write_and_flush_prepare()
group_commit_lock::ACQUIRED);
}
-/** Durably write the log up to log_sys.get_lsn(). */
-ATTRIBUTE_COLD void log_write_and_flush()
+/** Durably write the log up to log_sys.get_lsn().
+@return lsn that log_write_up_to() must be invoked with
+@retval 0 if there is no need to invoke log_write_up_to() */
+ATTRIBUTE_COLD __attribute__((warn_unused_result)) lsn_t log_write_and_flush()
{
ut_ad(!srv_read_only_mode);
if (!log_sys.is_pmem())
- {
- const lsn_t lsn{log_sys.write_buf<false>()};
- write_lock.release(lsn);
- log_flush(lsn);
- }
+ return log_sys.write_buf<false>(true, true);
+
#ifdef HAVE_PMEM
- else
- log_sys.persist(log_sys.get_lsn());
+ log_sys.persist(log_sys.get_lsn());
#endif
+ return 0;
}
/********************************************************************
diff --git a/storage/innobase/log/log0sync.cc b/storage/innobase/log/log0sync.cc
index 6b14d1d3591..641d7e65c36 100644
--- a/storage/innobase/log/log0sync.cc
+++ b/storage/innobase/log/log0sync.cc
@@ -1,5 +1,5 @@
/*****************************************************************************
-Copyright (c) 2020 MariaDB Corporation.
+Copyright (c) 2020, 2022, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -72,12 +72,8 @@ Note that if write operation is very fast, a) or b) can be fine as alternative.
#include <thread>
#include <mutex>
#include <condition_variable>
-#include <my_cpu.h>
-#include <log0types.h>
#include "log0sync.h"
-#include <mysql/service_thd_wait.h>
-#include <sql_class.h>
/**
Helper class, used in the group commit lock.
@@ -166,7 +162,7 @@ struct group_commit_waiter_t
};
group_commit_lock::group_commit_lock() :
- m_mtx(), m_value(0), m_pending_value(0), m_lock(false), m_waiters_list()
+ m_mtx(), m_value(0), m_lock(false), m_waiters_list()
{
}
@@ -175,60 +171,32 @@ group_commit_lock::value_type group_commit_lock::value() const
return m_value.load(std::memory_order::memory_order_relaxed);
}
-group_commit_lock::value_type group_commit_lock::pending() const
-{
- return m_pending_value.load(std::memory_order::memory_order_relaxed);
-}
-
-void group_commit_lock::set_pending(group_commit_lock::value_type num)
-{
- ut_a(num >= value());
- m_pending_value.store(num, std::memory_order::memory_order_relaxed);
-}
-const unsigned int MAX_SPINS = 1; /** max spins in acquire */
thread_local group_commit_waiter_t thread_local_waiter;
static inline void do_completion_callback(const completion_callback* cb)
{
- if (cb)
+ if (cb && cb->m_callback)
cb->m_callback(cb->m_param);
}
-group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num, const completion_callback *callback)
+inline void store_callback(lsn_t num, const completion_callback *cb,
+ std::vector<std::pair<lsn_t, completion_callback>> &v)
{
- unsigned int spins = MAX_SPINS;
-
- for(;;)
- {
- if (num <= value())
- {
- /* No need to wait.*/
- do_completion_callback(callback);
- return lock_return_code::EXPIRED;
- }
-
- if(spins-- == 0)
- break;
- if (num > pending())
- {
- /* Longer wait expected (longer than currently running operation),
- don't spin.*/
- break;
- }
- ut_delay(1);
- }
+ if (!cb || !cb->m_callback)
+ return;
+ v.push_back({num, *cb});
+}
- thread_local_waiter.m_value = num;
- thread_local_waiter.m_group_commit_leader= false;
+group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num, const completion_callback *callback)
+{
+ bool group_commit_leader= false;
std::unique_lock<std::mutex> lk(m_mtx, std::defer_lock);
- while (num > value() || thread_local_waiter.m_group_commit_leader)
+ while (num > value() || group_commit_leader)
{
lk.lock();
-
/* Re-read current value after acquiring the lock*/
- if (num <= value() &&
- (!thread_local_waiter.m_group_commit_leader || m_lock))
+ if (num <= value() && (!group_commit_leader || m_lock))
{
lk.unlock();
do_completion_callback(callback);
@@ -239,40 +207,30 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num, c
{
/* Take the lock, become group commit leader.*/
m_lock = true;
-#ifndef DBUG_OFF
- m_owner_id = std::this_thread::get_id();
-#endif
- if (callback)
- m_pending_callbacks.push_back({num,*callback});
+ ut_d(set_owner());
+ store_callback(num, callback, m_pending_callbacks);
return lock_return_code::ACQUIRED;
}
- if (callback && (m_waiters_list || num <= pending()))
+ if (callback)
{
- /*
- If num > pending(), we have a good candidate for the next group
- commit lead, that will be taking over the lock after current owner
- releases it. We put current thread into waiter's list so it sleeps
- and can be signaled and marked as group commit lead during lock release.
-
- For this to work well, pending() must deliver a good approximation for N
- in the next call to group_commit_lock::release(N).
- */
- m_pending_callbacks.push_back({num, *callback});
+ store_callback(num, callback, m_pending_callbacks);
return lock_return_code::CALLBACK_QUEUED;
}
/* Add yourself to waiters list.*/
- thread_local_waiter.m_group_commit_leader= false;
- thread_local_waiter.m_next = m_waiters_list;
- m_waiters_list = &thread_local_waiter;
+ auto *waiter= &thread_local_waiter;
+ waiter->m_value= num;
+ waiter->m_group_commit_leader= false;
+ waiter->m_next= m_waiters_list;
+ m_waiters_list= waiter;
lk.unlock();
/* Sleep until woken in release().*/
thd_wait_begin(0,THD_WAIT_GROUP_COMMIT);
- thread_local_waiter.m_sema.wait();
+ waiter->m_sema.wait();
thd_wait_end(0);
-
+ group_commit_leader= waiter->m_group_commit_leader;
}
do_completion_callback(callback);
return lock_return_code::EXPIRED;
@@ -282,9 +240,8 @@ group_commit_lock::value_type group_commit_lock::release(value_type num)
{
completion_callback callbacks[1000];
size_t callback_count = 0;
- value_type ret = 0;
+ value_type ret= 0;
std::unique_lock<std::mutex> lk(m_mtx);
- m_lock = false;
/* Update current value. */
ut_a(num >= value());
@@ -307,6 +264,12 @@ group_commit_lock::value_type group_commit_lock::release(value_type num)
}
}
+ auto it=
+ std::remove_if(m_pending_callbacks.begin(), m_pending_callbacks.end(),
+ [num](const pending_cb &c) { return c.first <= num; });
+
+ m_pending_callbacks.erase(it, m_pending_callbacks.end());
+
for (prev= nullptr, cur= m_waiters_list; cur; cur= next)
{
next= cur->m_next;
@@ -335,12 +298,6 @@ group_commit_lock::value_type group_commit_lock::release(value_type num)
}
}
- auto it= std::remove_if(
- m_pending_callbacks.begin(), m_pending_callbacks.end(),
- [num](const pending_cb &c) { return c.first <= num; });
-
- m_pending_callbacks.erase(it, m_pending_callbacks.end());
-
if (m_pending_callbacks.size() || m_waiters_list)
{
/*
@@ -370,7 +327,8 @@ group_commit_lock::value_type group_commit_lock::release(value_type num)
ret= m_pending_callbacks[0].first;
}
}
-
+ ut_d(reset_owner();)
+ m_lock= false;
lk.unlock();
/*
@@ -396,9 +354,29 @@ group_commit_lock::value_type group_commit_lock::release(value_type num)
}
#ifndef DBUG_OFF
-bool group_commit_lock::is_owner()
+#include <tpool_structs.h>
+TPOOL_SUPPRESS_TSAN
+bool group_commit_lock::locked() const noexcept { return m_lock; }
+
+TPOOL_SUPPRESS_TSAN bool group_commit_lock::is_owner() const noexcept
{
- return m_lock && std::this_thread::get_id() == m_owner_id;
+ return locked() && std::this_thread::get_id() == m_owner_id;
+}
+
+void TPOOL_SUPPRESS_TSAN group_commit_lock::set_owner()
+{
+ DBUG_ASSERT(locked() && !has_owner());
+ m_owner_id= std::this_thread::get_id();
}
-#endif
+void TPOOL_SUPPRESS_TSAN group_commit_lock::reset_owner()
+{
+ DBUG_ASSERT(is_owner());
+ m_owner_id= std::thread::id();
+}
+
+bool TPOOL_SUPPRESS_TSAN group_commit_lock::has_owner() const noexcept
+{
+ return m_owner_id != std::thread::id();
+}
+#endif
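
release() now prunes the pending callback queue with the erase-remove idiom before scanning the waiter list. The idiom in isolation, where pending_cb_sketch stands in for the real pending_cb pair:

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

using pending_cb_sketch= std::pair<uint64_t, int>;  /* {lsn, payload} */

/* drop every queued callback whose lsn the released value covers */
static void drop_completed(std::vector<pending_cb_sketch> &v, uint64_t num)
{
  auto it= std::remove_if(v.begin(), v.end(),
                          [num](const pending_cb_sketch &c)
                          { return c.first <= num; });
  v.erase(it, v.end());
}

int main()
{
  std::vector<pending_cb_sketch> v{{100, 0}, {200, 0}, {300, 0}};
  drop_completed(v, 200);
  assert(v.size() == 1 && v[0].first == 300);
  return 0;
}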
diff --git a/storage/innobase/log/log0sync.h b/storage/innobase/log/log0sync.h
index 00686d39dac..7dc9e2aaf38 100644
--- a/storage/innobase/log/log0sync.h
+++ b/storage/innobase/log/log0sync.h
@@ -1,5 +1,5 @@
/*****************************************************************************
-Copyright (c) 2020 MariaDB Corporation.
+Copyright (c) 2020, 2022, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -56,14 +56,6 @@ Operations supported on this semaphore
- might return some lsn, meaning there are some pending
callbacks left, and there is no new group commit lead
(i.e. the caller must do something to flush those pending callbacks)
-
-3. value()
-- read current value
-
-4. pending_value()
-- read pending value
-
-5. set_pending_value()
*/
class group_commit_lock
{
@@ -73,7 +65,6 @@ class group_commit_lock
#endif
std::mutex m_mtx;
std::atomic<value_type> m_value;
- std::atomic<value_type> m_pending_value;
bool m_lock;
group_commit_waiter_t* m_waiters_list;
@@ -91,9 +82,11 @@ public:
lock_return_code acquire(value_type num, const completion_callback *cb);
value_type release(value_type num);
value_type value() const;
- value_type pending() const;
- void set_pending(value_type num);
#ifndef DBUG_OFF
- bool is_owner();
+ bool locked() const noexcept;
+ bool is_owner() const noexcept;
+ bool has_owner() const noexcept;
+ void set_owner();
+ void reset_owner();
#endif
};
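
To summarize the contract this header documents: acquire() either returns EXPIRED (the value is already covered), returns ACQUIRED (the caller becomes the group commit lead and must later release()), or queues a callback; release() publishes a new value and wakes waiters. A deliberately simplified, self-contained sketch of just the lead/waiter handshake, with no callback queue or lead handover (mini_group_lock is illustrative, not the MariaDB class):

#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>

class mini_group_lock
{
  std::mutex m;
  std::condition_variable cv;
  uint64_t value= 0;            /* highest completed lsn */
  bool locked= false;           /* a leader is currently writing */
public:
  /* returns true if the caller became leader and must call release() */
  bool acquire(uint64_t lsn)
  {
    std::unique_lock<std::mutex> lk(m);
    for (;;)
    {
      if (lsn <= value)
        return false;           /* EXPIRED: already covered */
      if (!locked)
      {
        locked= true;           /* ACQUIRED: become the group commit lead */
        return true;
      }
      cv.wait(lk);              /* sleep until the leader releases */
    }
  }
  void release(uint64_t lsn)
  {
    {
      std::lock_guard<std::mutex> lk(m);
      value= lsn;
      locked= false;
    }
    cv.notify_all();            /* each waiter rechecks its own lsn */
  }
};

int main()
{
  mini_group_lock lock;
  /* whichever thread becomes leader performs the "write" and releases;
     the other then sees its lsn covered and expires */
  std::thread t([&] { if (lock.acquire(10)) lock.release(10); });
  if (lock.acquire(10))
    lock.release(10);
  t.join();
  return 0;
}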
diff --git a/storage/innobase/mtr/mtr0mtr.cc b/storage/innobase/mtr/mtr0mtr.cc
index b68b820eb0c..e2b8740a5ac 100644
--- a/storage/innobase/mtr/mtr0mtr.cc
+++ b/storage/innobase/mtr/mtr0mtr.cc
@@ -614,7 +614,7 @@ void mtr_t::commit_shrink(fil_space_t &space)
const lsn_t start_lsn= do_write().first;
/* Durably write the reduced FSP_SIZE before truncating the data file. */
- log_write_and_flush();
+ lsn_t pending_lsn= log_write_and_flush();
#ifndef SUX_LOCK_GENERIC
ut_ad(log_sys.latch.is_write_locked());
#endif
@@ -660,6 +660,8 @@ void mtr_t::commit_shrink(fil_space_t &space)
m_memo.for_each_block_in_reverse(CIterate<ReleaseLatches>());
release_resources();
+ if (pending_lsn)
+ log_buffer_flush_to_disk_async();
}
/** Commit a mini-transaction that did not modify any pages,
diff --git a/storage/innobase/os/os0file.cc b/storage/innobase/os/os0file.cc
index a9c4776c147..3654982ac4b 100644
--- a/storage/innobase/os/os0file.cc
+++ b/storage/innobase/os/os0file.cc
@@ -2205,6 +2205,8 @@ os_file_create_func(
? FILE_FLAG_OVERLAPPED : 0;
if (type == OS_LOG_FILE) {
+ ut_a(read_only || srv_thread_pool);
+ attributes|= FILE_FLAG_OVERLAPPED;
if(srv_flush_log_at_trx_commit != 2 && !log_sys.is_opened())
attributes|= FILE_FLAG_NO_BUFFERING;
if (srv_file_flush_method == SRV_O_DSYNC)
@@ -3690,7 +3692,11 @@ int os_aio_init()
OS_AIO_N_PENDING_IOS_PER_THREAD);
int max_read_events= int(srv_n_read_io_threads *
OS_AIO_N_PENDING_IOS_PER_THREAD);
- int max_events= max_read_events + max_write_events;
+
+ /* Count one extra aiocb for InnoDB async redo writes, which
+ bypass the slots. */
+
+ int max_events= max_read_events + max_write_events + 1;
read_slots.reset(new io_slots(max_read_events, srv_n_read_io_threads));
write_slots.reset(new io_slots(max_write_events, srv_n_write_io_threads));
diff --git a/storage/innobase/srv/srv0srv.cc b/storage/innobase/srv/srv0srv.cc
index 7e1cd5145c2..7ce1ebf145c 100644
--- a/storage/innobase/srv/srv0srv.cc
+++ b/storage/innobase/srv/srv0srv.cc
@@ -1454,7 +1454,7 @@ static void srv_sync_log_buffer_in_background()
srv_main_thread_op_info = "flushing log";
if (difftime(current_time, srv_last_log_flush_time)
>= srv_flush_log_at_timeout) {
- log_buffer_flush_to_disk();
+ log_buffer_flush_to_disk_async();
srv_last_log_flush_time = current_time;
srv_log_writes_and_flush++;
}
@@ -1566,15 +1566,14 @@ void srv_master_callback(void*)
if (!purge_state.m_running)
srv_wake_purge_thread_if_not_active();
ulonglong counter_time= microsecond_interval_timer();
- srv_sync_log_buffer_in_background();
- MONITOR_INC_TIME_IN_MICRO_SECS(MONITOR_SRV_LOG_FLUSH_MICROSECOND,
- counter_time);
-
if (srv_check_activity(&old_activity_count))
srv_master_do_active_tasks(counter_time);
else
srv_master_do_idle_tasks(counter_time);
+ srv_sync_log_buffer_in_background();
+ MONITOR_INC_TIME_IN_MICRO_SECS(MONITOR_SRV_LOG_FLUSH_MICROSECOND,
+ counter_time);
srv_main_thread_op_info= "sleeping";
}
diff --git a/storage/innobase/srv/srv0start.cc b/storage/innobase/srv/srv0start.cc
index 40ae4f3a3e7..12baa8e8b39 100644
--- a/storage/innobase/srv/srv0start.cc
+++ b/storage/innobase/srv/srv0start.cc
@@ -203,7 +203,7 @@ static dberr_t create_log_file(bool create_new_db, lsn_t lsn)
os_file_t file{
os_file_create_func(logfile0.c_str(),
OS_FILE_CREATE | OS_FILE_ON_ERROR_NO_EXIT,
- OS_FILE_NORMAL, OS_LOG_FILE, false, &ret)
+ OS_FILE_AIO, OS_LOG_FILE, false, &ret)
};
if (!ret) {
@@ -524,7 +524,7 @@ srv_check_undo_redo_logs_exists()
fh = os_file_create_func(logfilename.c_str(),
OS_FILE_OPEN_RETRY | OS_FILE_ON_ERROR_NO_EXIT
| OS_FILE_ON_ERROR_SILENT,
- OS_FILE_NORMAL, OS_LOG_FILE,
+ OS_FILE_AIO, OS_LOG_FILE,
srv_read_only_mode, &ret);
if (ret) {
diff --git a/storage/innobase/trx/trx0trx.cc b/storage/innobase/trx/trx0trx.cc
index f8ddd98f289..fc37f59b937 100644
--- a/storage/innobase/trx/trx0trx.cc
+++ b/storage/innobase/trx/trx0trx.cc
@@ -1130,9 +1130,6 @@ static void trx_flush_log_if_needed_low(lsn_t lsn, const trx_t *trx)
if (!srv_flush_log_at_trx_commit)
return;
- if (log_sys.get_flushed_lsn(std::memory_order_relaxed) >= lsn)
- return;
-
completion_callback cb, *callback= nullptr;
if (trx->state != TRX_STATE_PREPARED && !log_sys.is_pmem() &&