diff options
author | Marko Mäkelä <marko.makela@mariadb.com> | 2020-11-26 07:36:53 +0200 |
---|---|---|
committer | Marko Mäkelä <marko.makela@mariadb.com> | 2020-11-26 07:36:53 +0200 |
commit | 8b8969929d741aac667dcd1780c2264785c8a114 (patch) | |
tree | d239edad28ac404ed780b3f1b81ca466daed551c | |
parent | 581aebe29f19aaf767ba5f0ac69f8f199eb80dff (diff) | |
parent | 657fcdf430f39a3103dff51a6a2b2bd3a090a498 (diff) | |
download | mariadb-git-bb-10.3-mdev21265.tar.gz |
Merge 10.5 into 10.6 bb-10.3-mdev21265 10.3-mdev21265
-rw-r--r-- | storage/innobase/btr/btr0cur.cc | 4 | ||||
-rw-r--r-- | storage/innobase/buf/buf0flu.cc | 33 | ||||
-rw-r--r-- | storage/innobase/buf/buf0lru.cc | 7 | ||||
-rw-r--r-- | storage/innobase/handler/ha_innodb.cc | 2 | ||||
-rw-r--r-- | storage/innobase/include/buf0buf.h | 21 | ||||
-rw-r--r-- | storage/innobase/include/os0file.h | 6 | ||||
-rw-r--r-- | storage/innobase/include/rw_lock.h | 4 | ||||
-rw-r--r-- | storage/innobase/include/srv0srv.h | 13 | ||||
-rw-r--r-- | storage/innobase/include/trx0undo.h | 3 | ||||
-rw-r--r-- | storage/innobase/log/log0log.cc | 1 | ||||
-rw-r--r-- | storage/innobase/page/page0cur.cc | 2 | ||||
-rw-r--r-- | storage/innobase/row/row0merge.cc | 4 | ||||
-rw-r--r-- | storage/innobase/srv/srv0srv.cc | 59 | ||||
-rw-r--r-- | storage/innobase/srv/srv0start.cc | 6 | ||||
-rw-r--r-- | storage/innobase/sync/sync0arr.cc | 16 | ||||
-rw-r--r-- | storage/innobase/trx/trx0rec.cc | 2 | ||||
-rw-r--r-- | tpool/aio_linux.cc | 183 | ||||
-rw-r--r-- | tpool/tpool_generic.cc | 22 |
18 files changed, 208 insertions, 180 deletions
diff --git a/storage/innobase/btr/btr0cur.cc b/storage/innobase/btr/btr0cur.cc index d04ad46ac3e..446d3fdafaa 100644 --- a/storage/innobase/btr/btr0cur.cc +++ b/storage/innobase/btr/btr0cur.cc @@ -4157,7 +4157,7 @@ void btr_cur_upd_rec_in_place(rec_t *rec, const dict_index_t *index, } ulint l = rec_get_1byte_offs_flag(rec) ? (n + 1) : (n + 1) * 2; - byte* b = &rec[-REC_N_OLD_EXTRA_BYTES - l]; + byte* b = rec - REC_N_OLD_EXTRA_BYTES - l; compile_time_assert(REC_1BYTE_SQL_NULL_MASK << 8 == REC_2BYTE_SQL_NULL_MASK); mtr->write<1>(*block, b, @@ -4180,7 +4180,7 @@ void btr_cur_upd_rec_in_place(rec_t *rec, const dict_index_t *index, ut_ad(len == rec_get_nth_field_size(rec, n)); ulint l = rec_get_1byte_offs_flag(rec) ? (n + 1) : (n + 1) * 2; - byte* b = &rec[-REC_N_OLD_EXTRA_BYTES - l]; + byte* b = rec - REC_N_OLD_EXTRA_BYTES - l; compile_time_assert(REC_1BYTE_SQL_NULL_MASK << 8 == REC_2BYTE_SQL_NULL_MASK); mtr->write<1>(*block, b, diff --git a/storage/innobase/buf/buf0flu.cc b/storage/innobase/buf/buf0flu.cc index 316c1822f0b..dc7ae3af70e 100644 --- a/storage/innobase/buf/buf0flu.cc +++ b/storage/innobase/buf/buf0flu.cc @@ -126,6 +126,20 @@ static void buf_flush_validate_skip() } #endif /* UNIV_DEBUG */ +/** Wake up the page cleaner if needed */ +inline void buf_pool_t::page_cleaner_wakeup() +{ + if (page_cleaner_idle() && + (srv_max_dirty_pages_pct_lwm == 0.0 || + srv_max_dirty_pages_pct_lwm <= + double(UT_LIST_GET_LEN(buf_pool.flush_list)) * 100.0 / + double(UT_LIST_GET_LEN(buf_pool.LRU) + UT_LIST_GET_LEN(buf_pool.free)))) + { + page_cleaner_is_idle= false; + mysql_cond_signal(&do_flush_list); + } +} + /** Insert a modified block into the flush list. 
@param[in,out] block modified block @param[in] lsn oldest modification */ @@ -145,6 +159,7 @@ void buf_flush_insert_into_flush_list(buf_block_t* block, lsn_t lsn) UT_LIST_ADD_FIRST(buf_pool.flush_list, &block->page); ut_d(buf_flush_validate_skip()); + buf_pool.page_cleaner_wakeup(); mysql_mutex_unlock(&buf_pool.flush_list_mutex); } @@ -2067,8 +2082,12 @@ furious_flush: else if (srv_shutdown_state > SRV_SHUTDOWN_INITIATED) break; - mysql_cond_timedwait(&buf_pool.do_flush_list, &buf_pool.flush_list_mutex, - &abstime); + if (buf_pool.page_cleaner_idle()) + mysql_cond_wait(&buf_pool.do_flush_list, &buf_pool.flush_list_mutex); + else + mysql_cond_timedwait(&buf_pool.do_flush_list, &buf_pool.flush_list_mutex, + &abstime); + set_timespec(abstime, 1); lsn_limit= buf_flush_sync_lsn; @@ -2091,6 +2110,8 @@ furious_flush: /* wake up buf_flush_wait_flushed() */ mysql_cond_broadcast(&buf_pool.done_flush_list); } +unemployed: + buf_pool.page_cleaner_set_idle(true); continue; } @@ -2101,13 +2122,14 @@ furious_flush: double(UT_LIST_GET_LEN(buf_pool.LRU) + UT_LIST_GET_LEN(buf_pool.free)); if (dirty_pct < srv_max_dirty_pages_pct_lwm && !lsn_limit) - continue; + goto unemployed; const lsn_t oldest_lsn= buf_pool.get_oldest_modification(0); if (UNIV_UNLIKELY(lsn_limit != 0) && oldest_lsn >= lsn_limit) buf_flush_sync_lsn= 0; + buf_pool.page_cleaner_set_idle(false); mysql_mutex_unlock(&buf_pool.flush_list_mutex); ulint n_flushed; @@ -2159,6 +2181,11 @@ do_checkpoint: goto do_checkpoint; } } + else + { + mysql_mutex_lock(&buf_pool.flush_list_mutex); + goto unemployed; + } #ifdef UNIV_DEBUG while (innodb_page_cleaner_disabled_debug && !buf_flush_sync_lsn && diff --git a/storage/innobase/buf/buf0lru.cc b/storage/innobase/buf/buf0lru.cc index f9ed938b20c..37a4ec9849c 100644 --- a/storage/innobase/buf/buf0lru.cc +++ b/storage/innobase/buf/buf0lru.cc @@ -76,13 +76,12 @@ uncompressed and compressed data), which must be clean. 
*/ /* @{ */ /** Number of intervals for which we keep the history of these stats. -Each interval is 1 second, defined by the rate at which -srv_error_monitor_thread() calls buf_LRU_stat_update(). */ -static const ulint BUF_LRU_STAT_N_INTERVAL = 50; +Updated at SRV_MONITOR_INTERVAL (the buf_LRU_stat_update() call rate). */ +static constexpr ulint BUF_LRU_STAT_N_INTERVAL= 4; /** Co-efficient with which we multiply I/O operations to equate them with page_zip_decompress() operations. */ -static const ulint BUF_LRU_IO_TO_UNZIP_FACTOR = 50; +static constexpr ulint BUF_LRU_IO_TO_UNZIP_FACTOR= 50; /** Sampled values buf_LRU_stat_cur. Not protected by any mutex. Updated by buf_LRU_stat_update(). */ diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc index b5166ba83e0..2c0526b2cd0 100644 --- a/storage/innobase/handler/ha_innodb.cc +++ b/storage/innobase/handler/ha_innodb.cc @@ -16948,6 +16948,7 @@ innodb_max_dirty_pages_pct_update( in_val); srv_max_dirty_pages_pct_lwm = in_val; + mysql_cond_signal(&buf_pool.do_flush_list); } srv_max_buf_pool_modified_pct = in_val; @@ -16981,6 +16982,7 @@ innodb_max_dirty_pages_pct_lwm_update( } srv_max_dirty_pages_pct_lwm = in_val; + mysql_cond_signal(&buf_pool.do_flush_list); } /*************************************************************//** diff --git a/storage/innobase/include/buf0buf.h b/storage/innobase/include/buf0buf.h index df52d6b572c..e255f6db056 100644 --- a/storage/innobase/include/buf0buf.h +++ b/storage/innobase/include/buf0buf.h @@ -1937,10 +1937,29 @@ public: FlushHp flush_hp; /** modified blocks (a subset of LRU) */ UT_LIST_BASE_NODE_T(buf_page_t) flush_list; - +private: + /** whether the page cleaner needs wakeup from indefinite sleep */ + bool page_cleaner_is_idle; +public: /** signalled to wake up the page_cleaner; protected by flush_list_mutex */ mysql_cond_t do_flush_list; + /** @return whether the page cleaner must sleep due to being idle */ + bool page_cleaner_idle() const + { 
+ mysql_mutex_assert_owner(&flush_list_mutex); + return page_cleaner_is_idle; + } + /** Wake up the page cleaner if needed */ + inline void page_cleaner_wakeup(); + + /** Register whether an explicit wakeup of the page cleaner is needed */ + void page_cleaner_set_idle(bool deep_sleep) + { + mysql_mutex_assert_owner(&flush_list_mutex); + page_cleaner_is_idle= deep_sleep; + } + // n_flush_LRU + n_flush_list is approximately COUNT(io_fix()==BUF_IO_WRITE) // in flush_list diff --git a/storage/innobase/include/os0file.h b/storage/innobase/include/os0file.h index 5e0039304b0..149da54b63c 100644 --- a/storage/innobase/include/os0file.h +++ b/storage/innobase/include/os0file.h @@ -237,8 +237,7 @@ private: @param off byte offset from the start (SEEK_SET) @param len size of the hole in bytes @return DB_SUCCESS or error code */ - dberr_t punch_hole(os_offset_t off, ulint len) const - MY_ATTRIBUTE((nonnull)); + dberr_t punch_hole(os_offset_t off, ulint len) const; public: /** Page to be written on write operation */ @@ -265,8 +264,7 @@ struct os_file_size_t { os_offset_t m_alloc_size; }; -/** Win NT does not allow more than 64 */ -static const ulint OS_AIO_N_PENDING_IOS_PER_THREAD = 256; +constexpr ulint OS_AIO_N_PENDING_IOS_PER_THREAD= 256; extern ulint os_n_file_reads; extern ulint os_n_file_writes; diff --git a/storage/innobase/include/rw_lock.h b/storage/innobase/include/rw_lock.h index 1388093dc25..c4fbd672207 100644 --- a/storage/innobase/include/rw_lock.h +++ b/storage/innobase/include/rw_lock.h @@ -30,9 +30,9 @@ protected: /** Available lock */ static constexpr uint32_t UNLOCKED= 0; /** Flag to indicate that write_lock() is being held */ - static constexpr uint32_t WRITER= 1 << 31; + static constexpr uint32_t WRITER= 1U << 31; /** Flag to indicate that write_lock_wait() is pending */ - static constexpr uint32_t WRITER_WAITING= 1 << 30; + static constexpr uint32_t WRITER_WAITING= 1U << 30; /** Flag to indicate that write_lock() or write_lock_wait() is pending */ static 
constexpr uint32_t WRITER_PENDING= WRITER | WRITER_WAITING; diff --git a/storage/innobase/include/srv0srv.h b/storage/innobase/include/srv0srv.h index 44712c5ae66..4fe90e8a0dc 100644 --- a/storage/innobase/include/srv0srv.h +++ b/storage/innobase/include/srv0srv.h @@ -702,13 +702,6 @@ Complete the shutdown tasks such as background DROP TABLE, and optionally change buffer merge (on innodb_fast_shutdown=0). */ void srv_shutdown(bool ibuf_merge); - -/************************************************************************* -A task which prints warnings about semaphore waits which have lasted -too long. These can be used to track bugs which cause hangs. -*/ -void srv_error_monitor_task(void*); - } /* extern "C" */ #ifdef UNIV_DEBUG @@ -900,12 +893,14 @@ struct srv_slot_t{ extern tpool::thread_pool *srv_thread_pool; extern std::unique_ptr<tpool::timer> srv_master_timer; -extern std::unique_ptr<tpool::timer> srv_error_monitor_timer; extern std::unique_ptr<tpool::timer> srv_monitor_timer; +/** The interval at which srv_monitor_task is invoked, in milliseconds */ +constexpr unsigned SRV_MONITOR_INTERVAL= 15000; /* 4 times per minute */ + static inline void srv_monitor_timer_schedule_now() { - srv_monitor_timer->set_time(0, 5000); + srv_monitor_timer->set_time(0, SRV_MONITOR_INTERVAL); } static inline void srv_start_periodic_timer(std::unique_ptr<tpool::timer>& t, void (*func)(void*), int period) diff --git a/storage/innobase/include/trx0undo.h b/storage/innobase/include/trx0undo.h index 9325f39a309..e032c53263a 100644 --- a/storage/innobase/include/trx0undo.h +++ b/storage/innobase/include/trx0undo.h @@ -160,8 +160,7 @@ trx_undo_get_first_rec(const fil_space_t &space, uint32_t page_no, NOTE: This corresponds to a redo log record and must not be changed! 
@see mtr_t::undo_create() @param[in,out] block undo log page */ -void trx_undo_page_init(const buf_block_t &block) - MY_ATTRIBUTE((nonnull)); +void trx_undo_page_init(const buf_block_t &block); /** Allocate an undo log page. @param[in,out] undo undo log diff --git a/storage/innobase/log/log0log.cc b/storage/innobase/log/log0log.cc index 4a063765569..bda34a21960 100644 --- a/storage/innobase/log/log0log.cc +++ b/storage/innobase/log/log0log.cc @@ -1009,7 +1009,6 @@ ATTRIBUTE_COLD void logs_empty_and_mark_files_at_shutdown() !srv_read_only_mode && srv_fast_shutdown < 2) { buf_dump_start(); } - srv_error_monitor_timer.reset(); srv_monitor_timer.reset(); lock_sys.timeout_timer.reset(); if (do_srv_shutdown) { diff --git a/storage/innobase/page/page0cur.cc b/storage/innobase/page/page0cur.cc index 5ca0d120a10..781884842ef 100644 --- a/storage/innobase/page/page0cur.cc +++ b/storage/innobase/page/page0cur.cc @@ -2683,7 +2683,7 @@ corrupted: data_len-= enc_hdr_l >> 3; data= &static_cast<const byte*>(data)[enc_hdr_l >> 3]; - memcpy(buf, &prev_rec[-REC_N_NEW_EXTRA_BYTES - hdr_c], hdr_c); + memcpy(buf, prev_rec - REC_N_NEW_EXTRA_BYTES - hdr_c, hdr_c); buf+= hdr_c; *buf++= static_cast<byte>((enc_hdr_l & 3) << 4); /* info_bits; n_owned=0 */ *buf++= static_cast<byte>(h >> 5); /* MSB of heap number */ diff --git a/storage/innobase/row/row0merge.cc b/storage/innobase/row/row0merge.cc index 558bf6d5a07..e306cb3429b 100644 --- a/storage/innobase/row/row0merge.cc +++ b/storage/innobase/row/row0merge.cc @@ -1820,8 +1820,8 @@ row_merge_read_clustered_index( based on that. 
*/ clust_index = dict_table_get_first_index(old_table); - const ulint old_trx_id_col = DATA_TRX_ID - DATA_N_SYS_COLS - + ulint(old_table->n_cols); + const ulint old_trx_id_col = ulint(old_table->n_cols) + - (DATA_N_SYS_COLS - DATA_TRX_ID); ut_ad(old_table->cols[old_trx_id_col].mtype == DATA_SYS); ut_ad(old_table->cols[old_trx_id_col].prtype == (DATA_TRX_ID | DATA_NOT_NULL)); diff --git a/storage/innobase/srv/srv0srv.cc b/storage/innobase/srv/srv0srv.cc index eb5d87b677e..c686ed7784a 100644 --- a/storage/innobase/srv/srv0srv.cc +++ b/storage/innobase/srv/srv0srv.cc @@ -186,7 +186,7 @@ tpool::thread_pool* srv_thread_pool; /** Maximum number of times allowed to conditionally acquire mutex before switching to blocking wait on the mutex */ -#define MAX_MUTEX_NOWAIT 20 +#define MAX_MUTEX_NOWAIT 2 /** Check whether the number of failed nonblocking mutex acquisition attempts exceeds maximum allowed value. If so, @@ -555,8 +555,7 @@ struct purge_coordinator_state static purge_coordinator_state purge_state; -/** threadpool timer for srv_error_monitor_task(). */ -std::unique_ptr<tpool::timer> srv_error_monitor_timer; +/** threadpool timer for srv_monitor_task() */ std::unique_ptr<tpool::timer> srv_monitor_timer; @@ -769,16 +768,11 @@ srv_boot(void) /******************************************************************//** Refreshes the values used to calculate per-second averages. 
*/ -static -void -srv_refresh_innodb_monitor_stats(void) -/*==================================*/ +static void srv_refresh_innodb_monitor_stats(time_t current_time) { mutex_enter(&srv_innodb_monitor_mutex); - time_t current_time = time(NULL); - - if (difftime(current_time, srv_last_monitor_time) <= 60) { + if (difftime(current_time, srv_last_monitor_time) < 60) { /* We referesh InnoDB Monitor values so that averages are printed from at most 60 last seconds */ mutex_exit(&srv_innodb_monitor_mutex); @@ -1309,26 +1303,18 @@ struct srv_monitor_state_t static srv_monitor_state_t monitor_state; /** A task which prints the info output by various InnoDB monitors.*/ -void srv_monitor_task(void*) +static void srv_monitor() { - double time_elapsed; - time_t current_time; - - ut_ad(!srv_read_only_mode); - - current_time = time(NULL); - - time_elapsed = difftime(current_time, monitor_state.last_monitor_time); + time_t current_time = time(NULL); - if (time_elapsed > 15) { + if (difftime(current_time, monitor_state.last_monitor_time) >= 15) { monitor_state.last_monitor_time = current_time; if (srv_print_innodb_monitor) { /* Reset mutex_skipped counter everytime srv_print_innodb_monitor changes. This is to ensure we will not be blocked by lock_sys.mutex - for short duration information printing, - such as requested by sync_array_print_long_waits() */ + for short duration information printing */ if (!monitor_state.last_srv_print_monitor) { monitor_state.mutex_skipped = 0; monitor_state.last_srv_print_monitor = true; @@ -1366,14 +1352,14 @@ void srv_monitor_task(void*) } } - srv_refresh_innodb_monitor_stats(); + srv_refresh_innodb_monitor_stats(current_time); } /*********************************************************************//** A task which prints warnings about semaphore waits which have lasted too long. These can be used to track bugs which cause hangs. 
*/ -void srv_error_monitor_task(void*) +void srv_monitor_task(void*) { /* number of successive fatal timeouts observed */ static ulint fatal_cnt; @@ -1408,20 +1394,17 @@ void srv_error_monitor_task(void*) if (sync_array_print_long_waits(&waiter, &sema) && sema == old_sema && os_thread_eq(waiter, old_waiter)) { #if defined(WITH_WSREP) && defined(WITH_INNODB_DISALLOW_WRITES) - if (os_event_is_set(srv_allow_writes_event)) { -#endif /* WITH_WSREP */ - fatal_cnt++; -#if defined(WITH_WSREP) && defined(WITH_INNODB_DISALLOW_WRITES) - } else { - fprintf(stderr, - "WSREP: avoiding InnoDB self crash due to long " - "semaphore wait of > %lu seconds\n" - "Server is processing SST donor operation, " - "fatal_cnt now: " ULINTPF, - srv_fatal_semaphore_wait_threshold, fatal_cnt); - } + if (!os_event_is_set(srv_allow_writes_event)) { + fprintf(stderr, + "WSREP: avoiding InnoDB self crash due to " + "long semaphore wait of > %lu seconds\n" + "Server is processing SST donor operation, " + "fatal_cnt now: " ULINTPF, + srv_fatal_semaphore_wait_threshold, fatal_cnt); + return; + } #endif /* WITH_WSREP */ - if (fatal_cnt > 10) { + if (fatal_cnt++) { ib::fatal() << "Semaphore wait has lasted > " << srv_fatal_semaphore_wait_threshold << " seconds. 
We intentionally crash the" @@ -1432,6 +1415,8 @@ void srv_error_monitor_task(void*) old_waiter = waiter; old_sema = sema; } + + srv_monitor(); } /******************************************************************//** diff --git a/storage/innobase/srv/srv0start.cc b/storage/innobase/srv/srv0start.cc index 6ffd59f97d8..143de203beb 100644 --- a/storage/innobase/srv/srv0start.cc +++ b/storage/innobase/srv/srv0start.cc @@ -339,7 +339,7 @@ static dberr_t create_log_file(bool create_new_db, lsn_t lsn, @param[in,out] logfile0 name of the first log file @return error code @retval DB_SUCCESS on successful operation */ -MY_ATTRIBUTE((warn_unused_result, nonnull)) +MY_ATTRIBUTE((warn_unused_result)) static dberr_t create_log_file_rename(lsn_t lsn, std::string &logfile0) { ut_ad(!srv_log_file_created); @@ -1813,8 +1813,8 @@ file_checked: DBUG_EXECUTE_IF("innodb_skip_monitors", goto skip_monitors;); /* Create the task which warns of long semaphore waits */ - srv_start_periodic_timer(srv_error_monitor_timer, srv_error_monitor_task, 1000); - srv_start_periodic_timer(srv_monitor_timer, srv_monitor_task, 5000); + srv_start_periodic_timer(srv_monitor_timer, srv_monitor_task, + SRV_MONITOR_INTERVAL); #ifndef DBUG_OFF skip_monitors: diff --git a/storage/innobase/sync/sync0arr.cc b/storage/innobase/sync/sync0arr.cc index 4b6f818000c..4ae3b02c071 100644 --- a/storage/innobase/sync/sync0arr.cc +++ b/storage/innobase/sync/sync0arr.cc @@ -893,6 +893,7 @@ sync_array_print_long_waits_low( #else # define SYNC_ARRAY_TIMEOUT 240 #endif + const time_t now = time(NULL); for (ulint i = 0; i < arr->n_cells; i++) { @@ -908,7 +909,7 @@ sync_array_print_long_waits_low( continue; } - double diff = difftime(time(NULL), cell->reservation_time); + double diff = difftime(now, cell->reservation_time); if (diff > SYNC_ARRAY_TIMEOUT) { ib::warn() << "A long semaphore wait:"; @@ -982,12 +983,6 @@ sync_array_print_long_waits( } if (noticed) { - fprintf(stderr, - "InnoDB: ###### Starts InnoDB Monitor" - " for 30 
secs to print diagnostic info:\n"); - - my_bool old_val = srv_print_innodb_monitor; - /* If some crucial semaphore is reserved, then also the InnoDB Monitor can hang, and we do not get diagnostics. Since in many cases an InnoDB hang is caused by a pwrite() or a pread() @@ -1000,14 +995,7 @@ sync_array_print_long_waits( MONITOR_VALUE(MONITOR_OS_PENDING_READS), MONITOR_VALUE(MONITOR_OS_PENDING_WRITES)); - srv_print_innodb_monitor = TRUE; - lock_wait_timeout_task(nullptr); - - srv_print_innodb_monitor = static_cast<my_bool>(old_val); - fprintf(stderr, - "InnoDB: ###### Diagnostic info printed" - " to the standard error stream\n"); } return(fatal); diff --git a/storage/innobase/trx/trx0rec.cc b/storage/innobase/trx/trx0rec.cc index 83942891357..e006258c447 100644 --- a/storage/innobase/trx/trx0rec.cc +++ b/storage/innobase/trx/trx0rec.cc @@ -2452,7 +2452,7 @@ trx_undo_prev_version_build( == rec_get_nth_field_size(rec, n)); ulint l = rec_get_1byte_offs_flag(*old_vers) ? (n + 1) : (n + 1) * 2; - (*old_vers)[-REC_N_OLD_EXTRA_BYTES - l] + *(*old_vers - REC_N_OLD_EXTRA_BYTES - l) &= byte(~REC_1BYTE_SQL_NULL_MASK); } } diff --git a/tpool/aio_linux.cc b/tpool/aio_linux.cc index 24bc04c75ba..91d1d08c3ff 100644 --- a/tpool/aio_linux.cc +++ b/tpool/aio_linux.cc @@ -1,4 +1,4 @@ -/* Copyright(C) 2019 MariaDB Corporation. +/* Copyright (C) 2019, 2020, MariaDB Corporation. This program is free software; you can redistribute itand /or modify it under the terms of the GNU General Public License as published by @@ -14,133 +14,153 @@ along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ #include "tpool_structs.h" - -#include <stdlib.h> -#include <signal.h> -#include <assert.h> #include "tpool.h" -#include <thread> + #ifdef LINUX_NATIVE_AIO -#include <libaio.h> +# include <thread> +# include <atomic> +# include <libaio.h> +# include <sys/syscall.h> + +/** + Invoke the io_getevents() system call. 
+ + @param ctx context from io_setup() + @param min_nr minimum number of completion events to wait for + @param nr maximum number of completion events to collect + @param ev the collected events + + In https://pagure.io/libaio/c/7cede5af5adf01ad26155061cc476aad0804d3fc + the io_getevents() implementation in libaio was "optimized" so that it + would elide the system call when there are no outstanding requests + and a timeout was specified. + + The libaio code for dereferencing ctx would occasionally trigger + SIGSEGV if io_destroy() was concurrently invoked from another thread. + Hence, we use the raw system call. +*/ +static int my_getevents(io_context_t ctx, long min_nr, long nr, io_event *ev) +{ + int saved_errno= errno; + int ret= syscall(__NR_io_getevents, reinterpret_cast<long>(ctx), + min_nr, nr, ev, 0); + if (ret < 0) + { + ret= -errno; + errno= saved_errno; + } + return ret; +} #endif + + /* Linux AIO implementation, based on native AIO. Needs libaio.h and -laio at the compile time. - submit_io() is used to submit async IO. + io_submit() is used to submit async IO. - There is a single thread, that collects the completion notification - with io_getevent(), and forwards io completion callback + A single thread will collect the completion notification + with io_getevents() and forward io completion callback to the worker threadpool. */ namespace tpool { #ifdef LINUX_NATIVE_AIO -class aio_linux : public aio +class aio_linux final : public aio { - thread_pool* m_pool; + thread_pool *m_pool; io_context_t m_io_ctx; - bool m_in_shutdown; std::thread m_getevent_thread; + static std::atomic<bool> shutdown_in_progress; - static void getevent_thread_routine(aio_linux* aio) + static void getevent_thread_routine(aio_linux *aio) { + /* We collect this many events at a time. os_aio_init() would + multiply OS_AIO_N_PENDING_THREADS by the number of read and write threads + and ultimately pass it to io_setup() via thread_pool::configure_aio(). 
*/ + constexpr unsigned MAX_EVENTS= 256; + io_event events[MAX_EVENTS]; for (;;) { - io_event event; - struct timespec ts{0, 500000000}; - int ret = io_getevents(aio->m_io_ctx, 1, 1, &event, &ts); - - if (aio->m_in_shutdown) - break; - - if (ret > 0) - { - aiocb* iocb = (aiocb*)event.obj; - long long res = event.res; - if (res < 0) + switch (int ret= my_getevents(aio->m_io_ctx, 1, MAX_EVENTS, events)) { + case -EINTR: + continue; + case -EINVAL: + if (shutdown_in_progress) + return; + /* fall through */ + default: + if (ret < 0) { - iocb->m_err = static_cast<int>(-res); - iocb->m_ret_len = 0; + fprintf(stderr, "io_getevents returned %d\n", ret); + abort(); + return; } - else + for (int i= 0; i < ret; i++) { - iocb->m_ret_len = ret; - iocb->m_err = 0; + const io_event &event= events[i]; + aiocb *iocb= static_cast<aiocb*>(event.obj); + if (static_cast<int>(event.res) < 0) + { + iocb->m_err= -event.res; + iocb->m_ret_len= 0; + } + else + { + iocb->m_ret_len= event.res; + iocb->m_err= 0; + } + iocb->m_internal_task.m_func= iocb->m_callback; + iocb->m_internal_task.m_arg= iocb; + iocb->m_internal_task.m_group= iocb->m_group; + aio->m_pool->submit_task(&iocb->m_internal_task); } - - iocb->m_internal_task.m_func = iocb->m_callback; - iocb->m_internal_task.m_arg = iocb; - iocb->m_internal_task.m_group = iocb->m_group; - aio->m_pool->submit_task(&iocb->m_internal_task); - continue; - } - switch (ret) - { - case -EAGAIN: - usleep(1000); - continue; - case -EINTR: - case 0: - continue; - default: - fprintf(stderr, "io_getevents returned %d\n", ret); - abort(); } } } public: - aio_linux(io_context_t ctx, thread_pool* pool) + aio_linux(io_context_t ctx, thread_pool *pool) : m_pool(pool), m_io_ctx(ctx), - m_in_shutdown(), m_getevent_thread(getevent_thread_routine, this) + m_getevent_thread(getevent_thread_routine, this) { } ~aio_linux() { - m_in_shutdown = true; - m_getevent_thread.join(); + shutdown_in_progress= true; io_destroy(m_io_ctx); + m_getevent_thread.join(); + 
shutdown_in_progress= false; } - // Inherited via aio - virtual int submit_io(aiocb* cb) override + int submit_io(aiocb *cb) override { - - if (cb->m_opcode == aio_opcode::AIO_PREAD) - io_prep_pread((iocb *)cb, cb->m_fh, cb->m_buffer, cb->m_len, - cb->m_offset); - else - io_prep_pwrite((iocb *)cb, cb->m_fh, cb->m_buffer, cb->m_len, - cb->m_offset); - - int ret; - ret = io_submit(m_io_ctx, 1, (iocb * *)& cb); + io_prep_pread(static_cast<iocb*>(cb), cb->m_fh, cb->m_buffer, cb->m_len, + cb->m_offset); + if (cb->m_opcode != aio_opcode::AIO_PREAD) + cb->aio_lio_opcode= IO_CMD_PWRITE; + iocb *icb= static_cast<iocb*>(cb); + int ret= io_submit(m_io_ctx, 1, &icb); if (ret == 1) return 0; - errno = -ret; + errno= -ret; return -1; } - // Inherited via aio - virtual int bind(native_file_handle& fd) override - { - return 0; - } - virtual int unbind(const native_file_handle& fd) override - { - return 0; - } + int bind(native_file_handle&) override { return 0; } + int unbind(const native_file_handle&) override { return 0; } }; -aio* create_linux_aio(thread_pool* pool, int max_io) +std::atomic<bool> aio_linux::shutdown_in_progress; + +aio *create_linux_aio(thread_pool *pool, int max_io) { io_context_t ctx; - memset(&ctx, 0, sizeof(ctx)); - int ret = io_setup(max_io, &ctx); - if (ret) + memset(&ctx, 0, sizeof ctx); + if (int ret= io_setup(max_io, &ctx)) { fprintf(stderr, "io_setup(%d) returned %d\n", max_io, ret); return nullptr; @@ -148,9 +168,6 @@ aio* create_linux_aio(thread_pool* pool, int max_io) return new aio_linux(ctx, pool); } #else -aio* create_linux_aio(thread_pool* pool, int max_aio) -{ - return nullptr; -} +aio *create_linux_aio(thread_pool*, int) { return nullptr; } #endif } diff --git a/tpool/tpool_generic.cc b/tpool/tpool_generic.cc index 80fa87fb8c1..98237063ec2 100644 --- a/tpool/tpool_generic.cc +++ b/tpool/tpool_generic.cc @@ -230,19 +230,19 @@ class thread_pool_generic : public thread_pool /** Maximimum number of threads in this pool. 
*/ unsigned int m_max_threads; - /* Maintainence related statistics (see maintainence()) */ + /* maintenance related statistics (see maintenance()) */ size_t m_last_thread_count; unsigned long long m_last_activity; - std::unique_ptr<timer> m_maintaince_timer_task; + std::unique_ptr<timer> m_maintenance_timer_task; void worker_main(worker_data *thread_data); void worker_end(worker_data* thread_data); /* Checks threadpool responsiveness, adjusts thread_counts */ - void maintainence(); - static void maintainence_func(void* arg) + void maintenance(); + static void maintenance_func(void* arg) { - ((thread_pool_generic *)arg)->maintainence(); + ((thread_pool_generic *)arg)->maintenance(); } bool add_thread(); bool wake(worker_wake_reason reason, task *t = nullptr); @@ -528,11 +528,11 @@ void thread_pool_generic::worker_main(worker_data *thread_var) Periodic job to fix thread count and concurrency, in case of long tasks, etc */ -void thread_pool_generic::maintainence() +void thread_pool_generic::maintenance() { /* If pool is busy (i.e the its mutex is currently locked), we can - skip the maintainence task, some times, to lower mutex contention + skip the maintenance task, some times, to lower mutex contention */ static int skip_counter; const int MAX_SKIPS = 10; @@ -691,7 +691,7 @@ thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) : m_max_threads(max_threads), m_last_thread_count(), m_last_activity(), - m_maintaince_timer_task() + m_maintenance_timer_task() { if (m_max_threads < m_concurrency) @@ -703,8 +703,8 @@ thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) : if (min_threads < max_threads) { - m_maintaince_timer_task.reset(new timer_generic(thread_pool_generic::maintainence_func, this, nullptr)); - m_maintaince_timer_task->set_time((int)m_timer_interval.count(), (int)m_timer_interval.count()); + m_maintenance_timer_task.reset(new timer_generic(thread_pool_generic::maintenance_func, this, nullptr)); + 
m_maintenance_timer_task->set_time((int)m_timer_interval.count(), (int)m_timer_interval.count()); } } @@ -792,7 +792,7 @@ thread_pool_generic::~thread_pool_generic() m_aio.reset(); /* Also stop the maintanence task early. */ - m_maintaince_timer_task.reset(); + m_maintenance_timer_task.reset(); std::unique_lock<std::mutex> lk(m_mtx); m_in_shutdown= true; |