diff options
47 files changed, 1107 insertions, 5809 deletions
diff --git a/extra/mariabackup/xtrabackup.cc b/extra/mariabackup/xtrabackup.cc index 1c71deb4594..6353b8f063c 100644 --- a/extra/mariabackup/xtrabackup.cc +++ b/extra/mariabackup/xtrabackup.cc @@ -102,6 +102,7 @@ Street, Fifth Floor, Boston, MA 02110-1335 USA #include <srv0srv.h> #include <crc_glue.h> #include <log.h> +#include <thr_timer.h> int sys_var_init(); @@ -4052,7 +4053,7 @@ fail: especially in 64-bit computers */ } - + srv_thread_pool_init(); sync_check_init(); ut_d(sync_check_enable()); /* Reset the system variables in the recovery module. */ @@ -6133,9 +6134,12 @@ int main(int argc, char **argv) DBUG_SET(dbug_option); } #endif + /* Main functions for library */ + init_thr_timer(5); int status = main_low(server_defaults); + end_thr_timer(); backup_cleanup(); if (innobackupex_mode) { diff --git a/mysql-test/suite/gcol/r/innodb_virtual_debug_purge.result b/mysql-test/suite/gcol/r/innodb_virtual_debug_purge.result index d99565b2f4c..1bbc577ed93 100644 --- a/mysql-test/suite/gcol/r/innodb_virtual_debug_purge.result +++ b/mysql-test/suite/gcol/r/innodb_virtual_debug_purge.result @@ -233,48 +233,3 @@ set global debug_dbug= @saved_dbug; drop table t1; set debug_sync=reset; SET GLOBAL innodb_purge_rseg_truncate_frequency = @saved_frequency; -# -# MDEV-18546 ASAN heap-use-after-free -# in innobase_get_computed_value / row_purge -# -CREATE TABLE t1 ( -pk INT AUTO_INCREMENT, -b BIT(15), -v BIT(15) AS (b) VIRTUAL, -PRIMARY KEY(pk), -UNIQUE(v) -) ENGINE=InnoDB; -INSERT IGNORE INTO t1 (b) VALUES -(NULL),(b'011'),(b'000110100'), -(b'01101101010'),(b'01111001001011'),(NULL); -SET GLOBAL innodb_debug_sync = "ib_clust_v_col_before_row_allocated " - "SIGNAL before_row_allocated " - "WAIT_FOR flush_unlock"; -SET GLOBAL innodb_debug_sync = "ib_open_after_dict_open " - "SIGNAL purge_open " - "WAIT_FOR select_open"; -set @saved_dbug= @@global.debug_dbug; -set global debug_dbug= "+d,ib_purge_virtual_index_callback"; -connect purge_waiter,localhost,root; -SET debug_sync= "now 
WAIT_FOR before_row_allocated"; -connection default; -REPLACE INTO t1 (pk, b) SELECT pk, b FROM t1; -connection purge_waiter; -connection default; -disconnect purge_waiter; -FLUSH TABLES; -SET GLOBAL innodb_debug_sync = reset; -SET debug_sync= "now SIGNAL flush_unlock WAIT_FOR purge_open"; -SET GLOBAL innodb_debug_sync = reset; -SET debug_sync= "ib_open_after_dict_open SIGNAL select_open"; -SELECT * FROM t1; -pk b v -1 NULL NULL -2 -3 -4 j j -5 K K -6 NULL NULL -DROP TABLE t1; -SET debug_sync= reset; -set global debug_dbug= @saved_dbug; diff --git a/mysql-test/suite/gcol/t/innodb_virtual_debug_purge.test b/mysql-test/suite/gcol/t/innodb_virtual_debug_purge.test index 276407007da..04ab8a88488 100644 --- a/mysql-test/suite/gcol/t/innodb_virtual_debug_purge.test +++ b/mysql-test/suite/gcol/t/innodb_virtual_debug_purge.test @@ -310,67 +310,3 @@ drop table t1; --source include/wait_until_count_sessions.inc set debug_sync=reset; SET GLOBAL innodb_purge_rseg_truncate_frequency = @saved_frequency; - ---echo # ---echo # MDEV-18546 ASAN heap-use-after-free ---echo # in innobase_get_computed_value / row_purge ---echo # - -CREATE TABLE t1 ( - pk INT AUTO_INCREMENT, - b BIT(15), - v BIT(15) AS (b) VIRTUAL, - PRIMARY KEY(pk), - UNIQUE(v) -) ENGINE=InnoDB; -INSERT IGNORE INTO t1 (b) VALUES - (NULL),(b'011'),(b'000110100'), - (b'01101101010'),(b'01111001001011'),(NULL); - -SET GLOBAL innodb_debug_sync = "ib_clust_v_col_before_row_allocated " - "SIGNAL before_row_allocated " - "WAIT_FOR flush_unlock"; -SET GLOBAL innodb_debug_sync = "ib_open_after_dict_open " - "SIGNAL purge_open " - "WAIT_FOR select_open"; - -# In 10.2 trx_undo_roll_ptr_is_insert(t_roll_ptr) condition never pass in purge, -# so this condition is forced to pass in row_vers_old_has_index_entry -set @saved_dbug= @@global.debug_dbug; -set global debug_dbug= "+d,ib_purge_virtual_index_callback"; - -# The purge starts from REPLACE command. To avoid possible race, separate -# connection is used. 
---connect(purge_waiter,localhost,root) ---send -SET debug_sync= "now WAIT_FOR before_row_allocated"; - ---connection default -REPLACE INTO t1 (pk, b) SELECT pk, b FROM t1; - ---connection purge_waiter -# Now we will definitely catch ib_clust_v_col_before_row_allocated ---reap ---connection default ---disconnect purge_waiter - -# purge hangs on the sync point. table is purged, ref_count is set to 0 -FLUSH TABLES; - -# Avoid hang on repeating purge. -# Reset Will be applied after first record is purged -SET GLOBAL innodb_debug_sync = reset; - -SET debug_sync= "now SIGNAL flush_unlock WAIT_FOR purge_open"; - -# Avoid hang on repeating purge -SET GLOBAL innodb_debug_sync = reset; - -# select unblocks purge thread -SET debug_sync= "ib_open_after_dict_open SIGNAL select_open"; -SELECT * FROM t1; - -# Cleanup -DROP TABLE t1; -SET debug_sync= reset; -set global debug_dbug= @saved_dbug;
\ No newline at end of file diff --git a/mysql-test/suite/innodb/r/purge_thread_shutdown.result b/mysql-test/suite/innodb/r/purge_thread_shutdown.result index a87cba89917..85ac77e6d49 100644 --- a/mysql-test/suite/innodb/r/purge_thread_shutdown.result +++ b/mysql-test/suite/innodb/r/purge_thread_shutdown.result @@ -6,11 +6,6 @@ select user,state from information_schema.processlist order by 2; user state root root Filling schema table -system user InnoDB purge coordinator -system user InnoDB purge worker -system user InnoDB purge worker -system user InnoDB purge worker -system user InnoDB shutdown handler set global debug_dbug='+d,only_kill_system_threads'; set global innodb_fast_shutdown=0; shutdown; @@ -19,11 +14,6 @@ disconnect con1; select user,state from information_schema.processlist order by 2; user state root Filling schema table -system user InnoDB purge coordinator -system user InnoDB purge worker -system user InnoDB purge worker -system user InnoDB purge worker -system user InnoDB slow shutdown wait set global innodb_fast_shutdown=1; select user,state from information_schema.processlist order by 2; user state diff --git a/mysql-test/suite/perfschema/r/threads_innodb.result b/mysql-test/suite/perfschema/r/threads_innodb.result index 2229d972038..d79420f6fb5 100644 --- a/mysql-test/suite/perfschema/r/threads_innodb.result +++ b/mysql-test/suite/perfschema/r/threads_innodb.result @@ -5,14 +5,5 @@ FROM performance_schema.threads WHERE name LIKE 'thread/innodb/%' GROUP BY name; name type processlist_user processlist_host processlist_db processlist_command processlist_time processlist_state processlist_info parent_thread_id role instrumented -thread/innodb/io_ibuf_thread BACKGROUND NULL NULL NULL NULL NULL NULL NULL NULL NULL YES -thread/innodb/io_log_thread BACKGROUND NULL NULL NULL NULL NULL NULL NULL NULL NULL YES -thread/innodb/io_read_thread BACKGROUND NULL NULL NULL NULL NULL NULL NULL NULL NULL YES -thread/innodb/io_write_thread BACKGROUND NULL NULL NULL 
NULL NULL NULL NULL NULL NULL YES thread/innodb/page_cleaner_thread BACKGROUND NULL NULL NULL NULL NULL NULL NULL NULL NULL YES -thread/innodb/srv_error_monitor_thread BACKGROUND NULL NULL NULL NULL NULL NULL NULL NULL NULL YES -thread/innodb/srv_lock_timeout_thread BACKGROUND NULL NULL NULL NULL NULL NULL NULL NULL NULL YES -thread/innodb/srv_master_thread BACKGROUND NULL NULL NULL NULL NULL NULL NULL NULL NULL YES -thread/innodb/srv_monitor_thread BACKGROUND NULL NULL NULL NULL NULL NULL NULL NULL NULL YES -thread/innodb/srv_purge_thread BACKGROUND NULL NULL NULL NULL NULL NULL NULL NULL NULL YES -thread/innodb/thd_destructor_thread BACKGROUND NULL NULL NULL NULL NULL NULL NULL 1 NULL YES +thread/innodb/thread_pool_thread BACKGROUND NULL NULL NULL NULL NULL NULL NULL NULL NULL YES diff --git a/mysql-test/suite/sys_vars/r/sysvars_innodb.result b/mysql-test/suite/sys_vars/r/sysvars_innodb.result index f4c072431c6..274e25c169c 100644 --- a/mysql-test/suite/sys_vars/r/sysvars_innodb.result +++ b/mysql-test/suite/sys_vars/r/sysvars_innodb.result @@ -525,18 +525,6 @@ NUMERIC_BLOCK_SIZE NULL ENUM_VALUE_LIST OFF,ON READ_ONLY NO COMMAND_LINE_ARGUMENT REQUIRED -VARIABLE_NAME INNODB_DEBUG_SYNC -SESSION_VALUE NULL -DEFAULT_VALUE -VARIABLE_SCOPE GLOBAL -VARIABLE_TYPE VARCHAR -VARIABLE_COMMENT debug_sync for innodb purge threads. Use it to set up sync points for all purge threads at once. The commands will be applied sequentially at the beginning of purging the next undo record. 
-NUMERIC_MIN_VALUE NULL -NUMERIC_MAX_VALUE NULL -NUMERIC_BLOCK_SIZE NULL -ENUM_VALUE_LIST NULL -READ_ONLY NO -COMMAND_LINE_ARGUMENT NONE VARIABLE_NAME INNODB_DEFAULT_ENCRYPTION_KEY_ID SESSION_VALUE 1 DEFAULT_VALUE 1 diff --git a/storage/innobase/CMakeLists.txt b/storage/innobase/CMakeLists.txt index cbd280af223..92c66a6513a 100644 --- a/storage/innobase/CMakeLists.txt +++ b/storage/innobase/CMakeLists.txt @@ -19,6 +19,7 @@ INCLUDE(innodb.cmake) +INCLUDE_DIRECTORIES(${PROJECT_SOURCE_DIR}/tpool) SET(INNOBASE_SOURCES btr/btr0btr.cc @@ -154,7 +155,8 @@ MYSQL_ADD_PLUGIN(innobase ${INNOBASE_SOURCES} STORAGE_ENGINE ${CRC32_LIBRARY} ${NUMA_LIBRARY} ${LIBSYSTEMD} - ${LINKER_SCRIPT}) + ${LINKER_SCRIPT} + tpool) IF(NOT TARGET innobase) RETURN() diff --git a/storage/innobase/btr/btr0cur.cc b/storage/innobase/btr/btr0cur.cc index 20c5d0f7f3c..0d256f7f2cb 100644 --- a/storage/innobase/btr/btr0cur.cc +++ b/storage/innobase/btr/btr0cur.cc @@ -3267,10 +3267,6 @@ btr_cur_prefetch_siblings( page_id_t(block->page.id.space(), right_page_no), block->zip_size(), false); } - if (left_page_no != FIL_NULL - || right_page_no != FIL_NULL) { - os_aio_simulated_wake_handler_threads(); - } } /*************************************************************//** diff --git a/storage/innobase/btr/btr0defragment.cc b/storage/innobase/btr/btr0defragment.cc index fc34a75d11a..300d66456e6 100644 --- a/storage/innobase/btr/btr0defragment.cc +++ b/storage/innobase/btr/btr0defragment.cc @@ -71,6 +71,21 @@ The difference between btr_defragment_count and btr_defragment_failures shows the amount of effort wasted. 
*/ Atomic_counter<ulint> btr_defragment_count; +bool btr_defragment_active; + +struct defragment_chunk_state_t +{ + btr_defragment_item_t* m_item; +}; + +static defragment_chunk_state_t defragment_chunk_state; +static void btr_defragment_chunk(void*); + +static tpool::timer* btr_defragment_timer; +static tpool::task_group task_group(1); +static tpool::task btr_defragment_task(btr_defragment_chunk, 0, &task_group); +static void btr_defragment_start(); + /******************************************************************//** Constructor for btr_defragment_item_t. */ btr_defragment_item_t::btr_defragment_item_t( @@ -94,6 +109,11 @@ btr_defragment_item_t::~btr_defragment_item_t() { } } +static void submit_defragment_task(void*arg=0) +{ + srv_thread_pool->submit_task(&btr_defragment_task); +} + /******************************************************************//** Initialize defragmentation. */ void @@ -101,6 +121,9 @@ btr_defragment_init() { srv_defragment_interval = 1000000000ULL / srv_defragment_frequency; mutex_create(LATCH_ID_BTR_DEFRAGMENT_MUTEX, &btr_defragment_mutex); + defragment_chunk_state.m_item = 0; + btr_defragment_timer = srv_thread_pool->create_timer(submit_defragment_task); + btr_defragment_active = true; } /******************************************************************//** @@ -108,6 +131,11 @@ Shutdown defragmentation. Release all resources. 
*/ void btr_defragment_shutdown() { + if (!btr_defragment_timer) + return; + delete btr_defragment_timer; + btr_defragment_timer = 0; + task_group.cancel_pending(&btr_defragment_task); mutex_enter(&btr_defragment_mutex); std::list< btr_defragment_item_t* >::iterator iter = btr_defragment_wq.begin(); while(iter != btr_defragment_wq.end()) { @@ -117,6 +145,7 @@ btr_defragment_shutdown() } mutex_exit(&btr_defragment_mutex); mutex_free(&btr_defragment_mutex); + btr_defragment_active = false; } @@ -197,6 +226,10 @@ btr_defragment_add_index( btr_defragment_item_t* item = new btr_defragment_item_t(pcur, event); mutex_enter(&btr_defragment_mutex); btr_defragment_wq.push_back(item); + if(btr_defragment_wq.size() == 1){ + /* Kick off defragmentation work */ + btr_defragment_start(); + } mutex_exit(&btr_defragment_mutex); return event; } @@ -674,14 +707,29 @@ btr_defragment_n_pages( return current_block; } -/** Whether btr_defragment_thread is active */ -bool btr_defragment_thread_active; -/** Merge consecutive b-tree pages into fewer pages to defragment indexes */ -extern "C" UNIV_INTERN -os_thread_ret_t -DECLARE_THREAD(btr_defragment_thread)(void*) + +void btr_defragment_start() { + if (!srv_defragment) + return; + ut_ad(!btr_defragment_wq.empty()); + submit_defragment_task(); +} + + +/** +Callback used by defragment timer + +Throttling "sleep", is implemented via rescheduling the +threadpool timer, which, when fired, will resume the work again, +where it is left. + +The state (current item) is stored in function parameter. +*/ +static void btr_defragment_chunk(void*) { + defragment_chunk_state_t* state = &defragment_chunk_state; + btr_pcur_t* pcur; btr_cur_t* cursor; dict_index_t* index; @@ -690,37 +738,24 @@ DECLARE_THREAD(btr_defragment_thread)(void*) buf_block_t* last_block; while (srv_shutdown_state == SRV_SHUTDOWN_NONE) { - ut_ad(btr_defragment_thread_active); - - /* If defragmentation is disabled, sleep before - checking whether it's enabled. 
*/ - if (!srv_defragment) { - os_thread_sleep(BTR_DEFRAGMENT_SLEEP_IN_USECS); - continue; - } - /* The following call won't remove the item from work queue. - We only get a pointer to it to work on. This will make sure - when user issue a kill command, all indices are in the work - queue to be searched. This also means that the user thread - cannot directly remove the item from queue (since we might be - using it). So user thread only marks index as removed. */ - btr_defragment_item_t* item = btr_defragment_get_item(); - /* If work queue is empty, sleep and check later. */ - if (!item) { - os_thread_sleep(BTR_DEFRAGMENT_SLEEP_IN_USECS); - continue; + if (!state->m_item) { + state->m_item = btr_defragment_get_item(); } /* If an index is marked as removed, we remove it from the work queue. No other thread could be using this item at this point so it's safe to remove now. */ - if (item->removed) { - btr_defragment_remove_item(item); - continue; + while (state->m_item && state->m_item->removed) { + btr_defragment_remove_item(state->m_item); + state->m_item = btr_defragment_get_item(); + } + if (!state->m_item) { + /* Queue empty */ + return; } - pcur = item->pcur; + pcur = state->m_item->pcur; ulonglong now = my_interval_timer(); - ulonglong elapsed = now - item->last_processed; + ulonglong elapsed = now - state->m_item->last_processed; if (elapsed < srv_defragment_interval) { /* If we see an index again before the interval @@ -729,12 +764,12 @@ DECLARE_THREAD(btr_defragment_thread)(void*) defragmentation of all indices queue up on a single thread, it's likely other indices that follow this one don't need to sleep again. 
*/ - os_thread_sleep(static_cast<ulint> - ((srv_defragment_interval - elapsed) - / 1000)); + int sleep_ms = (int)((srv_defragment_interval - elapsed) / 1000 / 1000); + if (sleep_ms) { + btr_defragment_timer->set_time(sleep_ms, 0); + return; + } } - - now = my_interval_timer(); mtr_start(&mtr); cursor = btr_pcur_get_btr_cur(pcur); index = btr_cur_get_index(cursor); @@ -763,7 +798,7 @@ DECLARE_THREAD(btr_defragment_thread)(void*) btr_pcur_store_position(pcur, &mtr); mtr_commit(&mtr); /* Update the last_processed time of this index. */ - item->last_processed = now; + state->m_item->last_processed = now; } else { dberr_t err = DB_SUCCESS; mtr_commit(&mtr); @@ -786,11 +821,8 @@ DECLARE_THREAD(btr_defragment_thread)(void*) } } - btr_defragment_remove_item(item); + btr_defragment_remove_item(state->m_item); + state->m_item = NULL; } } - - btr_defragment_thread_active = false; - os_thread_exit(); - OS_THREAD_DUMMY_RETURN; } diff --git a/storage/innobase/buf/buf0buf.cc b/storage/innobase/buf/buf0buf.cc index 41d242a9360..89f75b8de55 100644 --- a/storage/innobase/buf/buf0buf.cc +++ b/storage/innobase/buf/buf0buf.cc @@ -2021,7 +2021,7 @@ buf_pool_init_instance( /* Initialize the temporal memory array and slots */ new(&buf_pool->io_buf) buf_pool_t::io_buf_t( (srv_n_read_io_threads + srv_n_write_io_threads) - * (8 * OS_AIO_N_PENDING_IOS_PER_THREAD)); + * OS_AIO_N_PENDING_IOS_PER_THREAD); buf_pool_mutex_exit(buf_pool); @@ -3186,48 +3186,39 @@ calc_buf_pool_size: return; } -/** This is the thread for resizing buffer pool. It waits for an event and -when waked up either performs a resizing and sleeps again. -@return this function does not return, calls os_thread_exit() -*/ -extern "C" -os_thread_ret_t -DECLARE_THREAD(buf_resize_thread)(void*) +/* Thread pool task invoked by innodb_buffer_pool_size changes. 
*/ +static void buf_resize_callback(void *) { - my_thread_init(); - - while (srv_shutdown_state == SRV_SHUTDOWN_NONE) { - os_event_wait(srv_buf_resize_event); - os_event_reset(srv_buf_resize_event); - - if (srv_shutdown_state != SRV_SHUTDOWN_NONE) { - break; - } - - buf_pool_mutex_enter_all(); - if (srv_buf_pool_old_size == srv_buf_pool_size) { - buf_pool_mutex_exit_all(); - std::ostringstream sout; - sout << "Size did not change (old size = new size = " - << srv_buf_pool_size << ". Nothing to do."; - buf_resize_status(sout.str().c_str()); - - /* nothing to do */ - continue; - } + ut_a(srv_shutdown_state == SRV_SHUTDOWN_NONE); + buf_pool_mutex_enter_all(); + if (srv_buf_pool_old_size == srv_buf_pool_size) { buf_pool_mutex_exit_all(); - - buf_pool_resize(); + std::ostringstream sout; + sout << "Size did not change (old size = new size = " + << srv_buf_pool_size << ". Nothing to do."; + buf_resize_status(sout.str().c_str()); + return; } + buf_pool_mutex_exit_all(); + buf_pool_resize(); +} - srv_buf_resize_thread_active = false; +/* Ensure that task does not run in parallel, by setting max_concurrency to 1 for the thread group */ +static tpool::task_group single_threaded_group(1); +static tpool::waitable_task buf_resize_task(buf_resize_callback, + nullptr, &single_threaded_group); - my_thread_end(); - os_thread_exit(); +void buf_resize_start() +{ + srv_thread_pool->submit_task(&buf_resize_task); +} - OS_THREAD_DUMMY_RETURN; +void buf_resize_shutdown() +{ + buf_resize_task.wait(); } + #ifdef BTR_CUR_HASH_ADAPT /** Clear the adaptive hash index on all pages in the buffer pool. 
*/ void diff --git a/storage/innobase/buf/buf0dblwr.cc b/storage/innobase/buf/buf0dblwr.cc index 791b5614fc2..a248bb28f6d 100644 --- a/storage/innobase/buf/buf0dblwr.cc +++ b/storage/innobase/buf/buf0dblwr.cc @@ -99,10 +99,6 @@ void buf_dblwr_sync_datafiles() /*======================*/ { - /* Wake possible simulated aio thread to actually post the - writes to the operating system */ - os_aio_simulated_wake_handler_threads(); - /* Wait that all async writes to tablespaces have been posted to the OS */ os_aio_wait_until_no_pending_writes(); @@ -914,11 +910,6 @@ buf_dblwr_write_block_to_datafile( ut_a(buf_page_in_file(bpage)); ulint type = IORequest::WRITE; - - if (sync) { - type |= IORequest::DO_NOT_WAKE; - } - IORequest request(type, const_cast<buf_page_t*>(bpage)); /* We request frame here to get correct buffer in case of @@ -950,9 +941,8 @@ buf_dblwr_write_block_to_datafile( } /********************************************************************//** -Flushes possible buffered writes from the doublewrite memory buffer to disk, -and also wakes up the aio thread if simulated aio is used. It is very -important to call this function after a batch of writes has been posted, +Flushes possible buffered writes from the doublewrite memory buffer to disk. +It is very important to call this function after a batch of writes has been posted, and also when we may have to wait for a page latch! Otherwise a deadlock of threads can occur. */ void @@ -982,13 +972,6 @@ try_again: if (buf_dblwr->first_free == 0) { mutex_exit(&buf_dblwr->mutex); - - /* Wake possible simulated aio thread as there could be - system temporary tablespace pages active for flushing. - Note: system temporary tablespace pages are not scheduled - for doublewrite. 
*/ - os_aio_simulated_wake_handler_threads(); - return; } @@ -1090,12 +1073,6 @@ flush: buf_dblwr_write_block_to_datafile( buf_dblwr->buf_block_arr[i], false); } - - /* Wake possible simulated aio thread to actually post the - writes to the operating system. We don't flush the files - at this point. We leave it to the IO helper thread to flush - datafiles when the whole batch has been processed. */ - os_aio_simulated_wake_handler_threads(); } /********************************************************************//** diff --git a/storage/innobase/buf/buf0dump.cc b/storage/innobase/buf/buf0dump.cc index 25e92158623..81909a839d2 100644 --- a/storage/innobase/buf/buf0dump.cc +++ b/storage/innobase/buf/buf0dump.cc @@ -45,6 +45,8 @@ Created April 08, 2011 Vasil Dimov #include "mysql/service_wsrep.h" /* wsrep_recovery */ #include <my_service_manager.h> +static void buf_do_load_dump(); + enum status_severity { STATUS_INFO, STATUS_ERR @@ -80,7 +82,7 @@ buf_dump_start() /*============*/ { buf_dump_should_start = true; - os_event_set(srv_buf_dump_event); + buf_do_load_dump(); } /*****************************************************************//** @@ -93,7 +95,7 @@ buf_load_start() /*============*/ { buf_load_should_start = true; - os_event_set(srv_buf_dump_event); + buf_do_load_dump(); } /*****************************************************************//** @@ -720,9 +722,6 @@ buf_load() page_id_t(this_space_id, BUF_DUMP_PAGE(dump[i])), zip_size, true); - if (i % 64 == 63) { - os_aio_simulated_wake_handler_threads(); - } if (buf_load_abort_flag) { if (space != NULL) { @@ -802,22 +801,13 @@ buf_load_abort() } /*****************************************************************//** -This is the main thread for buffer pool dump/load. It waits for an -event and when waked up either performs a dump or load and sleeps -again. 
-@return this function does not return, it calls os_thread_exit() */ -extern "C" -os_thread_ret_t -DECLARE_THREAD(buf_dump_thread)(void*) +This is the main task for buffer pool dump/load. when scheduled +either performs a dump or load, depending on server state, state of the variables etc- */ +static void buf_dump_load_func(void *) { - my_thread_init(); ut_ad(!srv_read_only_mode); - /* JAN: TODO: MySQL 5.7 PSI -#ifdef UNIV_PFS_THREAD - pfs_register_thread(buf_dump_thread_key); - #endif */ /* UNIV_PFS_THREAD */ - - if (srv_buffer_pool_load_at_startup) { + static bool first_time = true; + if (first_time && srv_buffer_pool_load_at_startup) { #ifdef WITH_WSREP if (!get_wsrep_recovery()) { @@ -827,27 +817,24 @@ DECLARE_THREAD(buf_dump_thread)(void*) } #endif /* WITH_WSREP */ } + first_time = false; while (!SHUTTING_DOWN()) { - - os_event_wait(srv_buf_dump_event); - if (buf_dump_should_start) { buf_dump_should_start = false; buf_dump(TRUE /* quit on shutdown */); } - if (buf_load_should_start) { buf_load_should_start = false; buf_load(); } - if (buf_dump_should_start || buf_load_should_start) { - continue; + if (!buf_dump_should_start && !buf_load_should_start) { + return; } - os_event_reset(srv_buf_dump_event); } + /* In shutdown */ if (srv_buffer_pool_dump_at_shutdown && srv_fast_shutdown != 2) { if (export_vars.innodb_buffer_pool_load_incomplete) { buf_dump_status(STATUS_INFO, @@ -860,13 +847,33 @@ DECLARE_THREAD(buf_dump_thread)(void*) buf_dump(FALSE/* do complete dump at shutdown */); } } +} - srv_buf_dump_thread_active = false; - my_thread_end(); - /* We count the number of threads in os_thread_exit(). A created - thread should always use that to exit and not use return() to exit. 
*/ - os_thread_exit(); +/* Execute tak with max.concurrency */ +tpool::task_group tpool_group(1); +static tpool::waitable_task buf_dump_load_task(buf_dump_load_func, &tpool_group); +static bool load_dump_enabled; - OS_THREAD_DUMMY_RETURN; +/** Start async buffer pool load, if srv_buffer_pool_load_at_startup was set.*/ +void buf_load_at_startup() +{ + load_dump_enabled = true; + if (srv_buffer_pool_load_at_startup) { + buf_do_load_dump(); + } +} + +static void buf_do_load_dump() +{ + if (!load_dump_enabled || buf_dump_load_task.is_running()) + return; + srv_thread_pool->submit_task(&buf_dump_load_task); +} + +/** Wait for currently running load/dumps to finish*/ +void buf_load_dump_end() +{ + ut_ad(SHUTTING_DOWN()); + buf_dump_load_task.wait(); } diff --git a/storage/innobase/buf/buf0flu.cc b/storage/innobase/buf/buf0flu.cc index 5a589936e5f..d7d658e25e1 100644 --- a/storage/innobase/buf/buf0flu.cc +++ b/storage/innobase/buf/buf0flu.cc @@ -89,6 +89,11 @@ mysql_pfs_key_t page_cleaner_thread_key; /** Event to synchronise with the flushing. */ os_event_t buf_flush_event; +static void pc_flush_slot_func(void *); +static tpool::task_group page_cleaner_task_group(1); +static tpool::waitable_task pc_flush_slot_task( + pc_flush_slot_func, 0, &page_cleaner_task_group); + /** State for page cleaner array slot */ enum page_cleaner_state_t { /** Not requested any yet. @@ -146,8 +151,6 @@ struct page_cleaner_t { ib_mutex_t mutex; /*!< mutex to protect whole of page_cleaner_t struct and page_cleaner_slot_t slots. */ - os_event_t is_requested; /*!< event to activate worker - threads. */ os_event_t is_finished; /*!< event to signal that all slots were finished. 
*/ os_event_t is_started; /*!< event to signal that @@ -186,6 +189,9 @@ struct page_cleaner_t { #endif /* UNIV_DEBUG */ }; +static void pc_submit_task(); +static void pc_wait_all_tasks(); + static page_cleaner_t page_cleaner; #ifdef UNIV_DEBUG @@ -946,8 +952,8 @@ buf_flush_init_for_writing( } /********************************************************************//** -Does an asynchronous write of a buffer page. NOTE: in simulated aio and -also when the doublewrite buffer is used, we must call +Does an asynchronous write of a buffer page. NOTE: when the +doublewrite buffer is used, we must call buf_dblwr_flush_buffered_writes after we have posted a batch of writes! */ static @@ -1046,7 +1052,7 @@ buf_flush_write_block_low( && space->use_doublewrite(); if (!use_doublewrite) { - ulint type = IORequest::WRITE | IORequest::DO_NOT_WAKE; + ulint type = IORequest::WRITE; IORequest request(type, bpage); @@ -1100,9 +1106,7 @@ buf_flush_write_block_low( /********************************************************************//** Writes a flushable page asynchronously from the buffer pool to a file. -NOTE: in simulated aio we must call -os_aio_simulated_wake_handler_threads after we have posted a batch of -writes! NOTE: buf_pool->mutex and buf_page_get_mutex(bpage) must be +NOTE: buf_pool->mutex and buf_page_get_mutex(bpage) must be held upon entering this function, and they will be released by this function if it returns true. 
@return TRUE if the page was flushed */ @@ -1931,8 +1935,6 @@ buf_flush_end( if (!srv_read_only_mode) { buf_dblwr_flush_buffered_writes(); - } else { - os_aio_simulated_wake_handler_threads(); } } @@ -2659,7 +2661,6 @@ buf_flush_page_cleaner_init(void) mutex_create(LATCH_ID_PAGE_CLEANER, &page_cleaner.mutex); - page_cleaner.is_requested = os_event_create("pc_is_requested"); page_cleaner.is_finished = os_event_create("pc_is_finished"); page_cleaner.is_started = os_event_create("pc_is_started"); page_cleaner.n_slots = static_cast<ulint>(srv_buf_pool_instances); @@ -2722,8 +2723,10 @@ pc_request( page_cleaner.n_slots_flushing = 0; page_cleaner.n_slots_finished = 0; - os_event_set(page_cleaner.is_requested); - + /* Submit slots-1 tasks, coordinator also does the work itself */ + for (ulint i = pc_flush_slot_task.get_ref_count(); i < page_cleaner.n_slots - 1; i++) { + pc_submit_task(); + } mutex_exit(&page_cleaner.mutex); } @@ -2741,9 +2744,7 @@ pc_flush_slot(void) mutex_enter(&page_cleaner.mutex); - if (!page_cleaner.n_slots_requested) { - os_event_reset(page_cleaner.is_requested); - } else { + if (page_cleaner.n_slots_requested) { page_cleaner_slot_t* slot = NULL; ulint i; @@ -2771,10 +2772,6 @@ pc_flush_slot(void) goto finish_mutex; } - if (page_cleaner.n_slots_requested == 0) { - os_event_reset(page_cleaner.is_requested); - } - mutex_exit(&page_cleaner.mutex); lru_tm = ut_time_ms(); @@ -2956,18 +2953,6 @@ void buf_flush_page_cleaner_disabled_debug_update(THD*, } innodb_page_cleaner_disabled_debug = false; - - /* Enable page cleaner threads. */ - while (srv_shutdown_state == SRV_SHUTDOWN_NONE) { - mutex_enter(&page_cleaner.mutex); - const ulint n = page_cleaner.n_disabled_debug; - mutex_exit(&page_cleaner.mutex); - /* Check if all threads have been enabled, to avoid - problem when we decide to re-disable them soon. 
*/ - if (n == 0) { - break; - } - } return; } @@ -2976,34 +2961,7 @@ void buf_flush_page_cleaner_disabled_debug_update(THD*, } innodb_page_cleaner_disabled_debug = true; - - while (srv_shutdown_state == SRV_SHUTDOWN_NONE) { - /* Workers are possibly sleeping on is_requested. - - We have to wake them, otherwise they could possibly - have never noticed, that they should be disabled, - and we would wait for them here forever. - - That's why we have sleep-loop instead of simply - waiting on some disabled_debug_event. */ - os_event_set(page_cleaner.is_requested); - - mutex_enter(&page_cleaner.mutex); - - ut_ad(page_cleaner.n_disabled_debug - <= srv_n_page_cleaners); - - if (page_cleaner.n_disabled_debug - == srv_n_page_cleaners) { - - mutex_exit(&page_cleaner.mutex); - break; - } - - mutex_exit(&page_cleaner.mutex); - - os_thread_sleep(100000); - } + pc_wait_all_tasks(); } #endif /* UNIV_DEBUG */ @@ -3316,7 +3274,7 @@ DECLARE_THREAD(buf_flush_page_cleaner_coordinator)(void*) /* At this point all threads including the master and the purge thread must have been suspended. */ - ut_a(srv_get_active_thread_type() == SRV_NONE); + ut_a(!srv_any_background_activity()); ut_a(srv_shutdown_state == SRV_SHUTDOWN_FLUSH_PHASE); /* We can now make a final sweep on flushing the buffer pool @@ -3348,7 +3306,7 @@ DECLARE_THREAD(buf_flush_page_cleaner_coordinator)(void*) } while (!success || n_flushed > 0); /* Some sanity checks */ - ut_a(srv_get_active_thread_type() == SRV_NONE); + ut_a(!srv_any_background_activity()); ut_a(srv_shutdown_state == SRV_SHUTDOWN_FLUSH_PHASE); for (ulint i = 0; i < srv_buf_pool_instances; i++) { @@ -3359,21 +3317,12 @@ DECLARE_THREAD(buf_flush_page_cleaner_coordinator)(void*) /* We have lived our life. Time to die. */ thread_exit: - /* All worker threads are waiting for the event here, - and no more access to page_cleaner structure by them. - Wakes worker threads up just to make them exit. 
*/ + /* Wait for worker tasks to finish */ page_cleaner.is_running = false; - - /* waiting for all worker threads exit */ - while (page_cleaner.n_workers) { - os_event_set(page_cleaner.is_requested); - os_thread_sleep(10000); - } - + pc_wait_all_tasks(); mutex_destroy(&page_cleaner.mutex); os_event_destroy(page_cleaner.is_finished); - os_event_destroy(page_cleaner.is_requested); os_event_destroy(page_cleaner.is_started); buf_page_cleaner_is_active = false; @@ -3386,113 +3335,39 @@ thread_exit: OS_THREAD_DUMMY_RETURN; } +static void pc_flush_slot_func(void*) +{ + while (pc_flush_slot() > 0) {}; +} + + /** Adjust thread count for page cleaner workers. @param[in] new_cnt Number of threads to be used */ void buf_flush_set_page_cleaner_thread_cnt(ulong new_cnt) { mutex_enter(&page_cleaner.mutex); - + page_cleaner_task_group.set_max_tasks((uint)new_cnt); srv_n_page_cleaners = new_cnt; - if (new_cnt > page_cleaner.n_workers) { - /* User has increased the number of page - cleaner threads. */ - ulint add = new_cnt - page_cleaner.n_workers; - for (ulint i = 0; i < add; i++) { - os_thread_id_t cleaner_thread_id; - os_thread_create(buf_flush_page_cleaner_worker, NULL, &cleaner_thread_id); - } - } mutex_exit(&page_cleaner.mutex); - /* Wait until defined number of workers has started. */ - while (page_cleaner.is_running && - page_cleaner.n_workers != (srv_n_page_cleaners - 1)) { - os_event_set(page_cleaner.is_requested); - os_event_reset(page_cleaner.is_started); - os_event_wait_time(page_cleaner.is_started, 1000000); - } + } -/******************************************************************//** -Worker thread of page_cleaner. 
-@return a dummy parameter */ -extern "C" -os_thread_ret_t -DECLARE_THREAD(buf_flush_page_cleaner_worker)( -/*==========================================*/ - void* arg MY_ATTRIBUTE((unused))) - /*!< in: a dummy parameter required by - os_thread_create */ + +void pc_submit_task() { - my_thread_init(); -#ifndef DBUG_OFF - os_thread_id_t cleaner_thread_id = os_thread_get_curr_id(); +#ifdef UNIV_DEBUG + if (innodb_page_cleaner_disabled_debug) + return; #endif + srv_thread_pool->submit_task(&pc_flush_slot_task); +} - mutex_enter(&page_cleaner.mutex); - ulint thread_no = page_cleaner.n_workers++; - - DBUG_LOG("ib_buf", "Thread " << cleaner_thread_id - << " started; n_workers=" << page_cleaner.n_workers); - - /* Signal that we have started */ - os_event_set(page_cleaner.is_started); - mutex_exit(&page_cleaner.mutex); - -#ifdef UNIV_LINUX - /* linux might be able to set different setting for each thread - worth to try to set high priority for page cleaner threads */ - if (buf_flush_page_cleaner_set_priority( - buf_flush_page_cleaner_priority)) { - - ib::info() << "page_cleaner worker priority: " - << buf_flush_page_cleaner_priority; - } -#endif /* UNIV_LINUX */ - - while (true) { - os_event_wait(page_cleaner.is_requested); - - ut_d(buf_flush_page_cleaner_disabled_loop()); - - if (!page_cleaner.is_running) { - break; - } - - ut_ad(srv_n_page_cleaners >= 1); - - /* If number of page cleaner threads is decreased - exit those that are not anymore needed. 
*/ - if (srv_shutdown_state == SRV_SHUTDOWN_NONE && - thread_no >= (srv_n_page_cleaners - 1)) { - DBUG_LOG("ib_buf", "Exiting " - << thread_no - << " page cleaner worker thread_id " - << os_thread_pf(cleaner_thread_id) - << " total threads " << srv_n_page_cleaners << "."); - break; - } - - pc_flush_slot(); - } - - mutex_enter(&page_cleaner.mutex); - page_cleaner.n_workers--; - - DBUG_LOG("ib_buf", "Thread " << cleaner_thread_id - << " exiting; n_workers=" << page_cleaner.n_workers); - - /* Signal that we have stopped */ - os_event_set(page_cleaner.is_started); - mutex_exit(&page_cleaner.mutex); - - my_thread_end(); - - os_thread_exit(); - - OS_THREAD_DUMMY_RETURN; +void pc_wait_all_tasks() +{ + pc_flush_slot_task.wait(); } /*******************************************************************//** diff --git a/storage/innobase/buf/buf0lru.cc b/storage/innobase/buf/buf0lru.cc index 79fafa68c4c..1cbe4ac86ea 100644 --- a/storage/innobase/buf/buf0lru.cc +++ b/storage/innobase/buf/buf0lru.cc @@ -529,9 +529,6 @@ buf_flush_or_remove_page( buf_pool, bpage, BUF_FLUSH_SINGLE_PAGE, false); if (processed) { - /* Wake possible simulated aio thread to actually - post the writes to the operating system */ - os_aio_simulated_wake_handler_threads(); buf_pool_mutex_enter(buf_pool); } else { mutex_exit(block_mutex); @@ -1038,7 +1035,7 @@ buf_LRU_check_size_of_non_data_objects( buf_lru_switched_on_innodb_mon = true; srv_print_innodb_monitor = TRUE; - os_event_set(srv_monitor_event); + srv_monitor_timer_schedule_now(); } } else if (buf_lru_switched_on_innodb_mon) { diff --git a/storage/innobase/buf/buf0rea.cc b/storage/innobase/buf/buf0rea.cc index f147bb807ce..989c7284e21 100644 --- a/storage/innobase/buf/buf0rea.cc +++ b/storage/innobase/buf/buf0rea.cc @@ -344,7 +344,7 @@ read_ahead: if (!ibuf_bitmap_page(cur_page_id, zip_size)) { count += buf_read_page_low( &err, false, - IORequest::DO_NOT_WAKE, + 0, ibuf_mode, cur_page_id, zip_size, false); @@ -364,11 +364,6 @@ read_ahead: } } - /* In 
simulated aio we wake the aio handler threads only after - queuing all aio requests, in native aio the following call does - nothing: */ - - os_aio_simulated_wake_handler_threads(); if (count) { DBUG_PRINT("ib_buf", ("random read-ahead %u pages, %u:%u", @@ -440,7 +435,7 @@ buf_read_page_background(const page_id_t page_id, ulint zip_size, bool sync) count = buf_read_page_low( &err, sync, - IORequest::DO_NOT_WAKE | IORequest::IGNORE_MISSING, + IORequest::IGNORE_MISSING, BUF_READ_ANY_PAGE, page_id, zip_size, false); @@ -712,7 +707,7 @@ buf_read_ahead_linear(const page_id_t page_id, ulint zip_size, bool ibuf) if (!ibuf_bitmap_page(cur_page_id, zip_size)) { count += buf_read_page_low( &err, false, - IORequest::DO_NOT_WAKE, + 0, ibuf_mode, cur_page_id, zip_size, false); switch (err) { @@ -732,12 +727,6 @@ buf_read_ahead_linear(const page_id_t page_id, ulint zip_size, bool ibuf) } } - /* In simulated aio we wake the aio handler threads only after - queuing all aio requests, in native aio the following call does - nothing: */ - - os_aio_simulated_wake_handler_threads(); - if (count) { DBUG_PRINT("ib_buf", ("linear read-ahead " ULINTPF " pages, " "%u:%u", @@ -788,7 +777,6 @@ buf_read_recv_pages( buf_pool = buf_pool_get(cur_page_id); while (buf_pool->n_pend_reads >= recv_n_pool_free_frames / 2) { - os_aio_simulated_wake_handler_threads(); os_thread_sleep(10000); count++; @@ -814,7 +802,7 @@ buf_read_recv_pages( } else { buf_read_page_low( &err, false, - IORequest::DO_NOT_WAKE, + 0, BUF_READ_ANY_PAGE, cur_page_id, zip_size, true); } @@ -825,8 +813,6 @@ buf_read_recv_pages( } } - os_aio_simulated_wake_handler_threads(); - DBUG_PRINT("ib_buf", ("recovery read-ahead (%u pages)", unsigned(n_stored))); } diff --git a/storage/innobase/dict/dict0crea.cc b/storage/innobase/dict/dict0crea.cc index ceb4b8dd483..5fceb8d81b8 100644 --- a/storage/innobase/dict/dict0crea.cc +++ b/storage/innobase/dict/dict0crea.cc @@ -1355,7 +1355,7 @@ dict_check_if_system_table_exists( dict_table_t* 
sys_table; dberr_t error = DB_SUCCESS; - ut_a(srv_get_active_thread_type() == SRV_NONE); + ut_a(!srv_any_background_activity()); mutex_enter(&dict_sys.mutex); @@ -1395,7 +1395,7 @@ dict_create_or_check_foreign_constraint_tables(void) dberr_t sys_foreign_err; dberr_t sys_foreign_cols_err; - ut_a(srv_get_active_thread_type() == SRV_NONE); + ut_a(!srv_any_background_activity()); /* Note: The master thread has not been started at this point. */ @@ -1537,7 +1537,7 @@ dict_create_or_check_sys_virtual() my_bool srv_file_per_table_backup; dberr_t err; - ut_a(srv_get_active_thread_type() == SRV_NONE); + ut_a(!srv_any_background_activity()); /* Note: The master thread has not been started at this point. */ err = dict_check_if_system_table_exists( @@ -2064,7 +2064,7 @@ dict_create_or_check_sys_tablespace(void) dberr_t sys_tablespaces_err; dberr_t sys_datafiles_err; - ut_a(srv_get_active_thread_type() == SRV_NONE); + ut_a(!srv_any_background_activity()); /* Note: The master thread has not been started at this point. */ diff --git a/storage/innobase/dict/dict0defrag_bg.cc b/storage/innobase/dict/dict0defrag_bg.cc index 7e61e298ac6..0d9cb185b81 100644 --- a/storage/innobase/dict/dict0defrag_bg.cc +++ b/storage/innobase/dict/dict0defrag_bg.cc @@ -44,7 +44,6 @@ typedef defrag_pool_t::iterator defrag_pool_iterator_t; by background defragmentation. */ defrag_pool_t defrag_pool; -extern bool dict_stats_start_shutdown; /*****************************************************************//** Initialize the defrag pool, called once during thread initialization. 
*/ @@ -134,10 +133,11 @@ dict_stats_defrag_pool_add( item.table_id = index->table->id; item.index_id = index->id; defrag_pool.push_back(item); - + if (defrag_pool.size() == 1) { + /* Kick off dict stats optimizer work */ + dict_stats_schedule_now(); + } mutex_exit(&defrag_pool_mutex); - - os_event_set(dict_stats_event); } /*****************************************************************//** @@ -224,7 +224,7 @@ void dict_defrag_process_entries_from_defrag_pool() /*==========================================*/ { - while (defrag_pool.size() && !dict_stats_start_shutdown) { + while (defrag_pool.size()) { dict_stats_process_entry_from_defrag_pool(); } } diff --git a/storage/innobase/dict/dict0stats_bg.cc b/storage/innobase/dict/dict0stats_bg.cc index 2985b6faf35..b61c52a2f0f 100644 --- a/storage/innobase/dict/dict0stats_bg.cc +++ b/storage/innobase/dict/dict0stats_bg.cc @@ -42,24 +42,11 @@ Created Apr 25, 2012 Vasil Dimov /** Minimum time interval between stats recalc for a given table */ #define MIN_RECALC_INTERVAL 10 /* seconds */ - -/** Event to wake up dict_stats_thread on dict_stats_recalc_pool_add() -or shutdown. Not protected by any mutex. */ -os_event_t dict_stats_event; - -/** Variable to initiate shutdown the dict stats thread. Note we don't -use 'srv_shutdown_state' because we want to shutdown dict stats thread -before purge thread. */ -bool dict_stats_start_shutdown; - -/** Event to wait for shutdown of the dict stats thread */ -os_event_t dict_stats_shutdown_event; +static void dict_stats_schedule(int ms); #ifdef UNIV_DEBUG /** Used by SET GLOBAL innodb_dict_stats_disabled_debug = 1; */ my_bool innodb_dict_stats_disabled_debug; - -static os_event_t dict_stats_disabled_event; #endif /* UNIV_DEBUG */ /** This mutex protects the "recalc_pool" variable. 
*/ @@ -118,7 +105,9 @@ static void dict_stats_recalc_pool_add( /*=======================*/ - const dict_table_t* table) /*!< in: table to add */ + const dict_table_t* table, /*!< in: table to add */ + bool schedule_dict_stats_task = true /*!< in: schedule dict stats task */ +) { ut_ad(!srv_read_only_mode); @@ -136,10 +125,11 @@ dict_stats_recalc_pool_add( } recalc_pool.push_back(table->id); - + if (recalc_pool.size() == 1 && schedule_dict_stats_task) { + dict_stats_schedule_now(); + } mutex_exit(&recalc_pool_mutex); - os_event_set(dict_stats_event); } #ifdef WITH_WSREP @@ -295,13 +285,10 @@ dict_stats_wait_bg_to_stop_using_table( Initialize global variables needed for the operation of dict_stats_thread() Must be called before dict_stats_thread() is started. */ void -dict_stats_thread_init() +dict_stats_init() { ut_a(!srv_read_only_mode); - dict_stats_event = os_event_create(0); - dict_stats_shutdown_event = os_event_create(0); - ut_d(dict_stats_disabled_event = os_event_create(0)); /* The recalc_pool_mutex is acquired from: 1) the background stats gathering thread before any other latch @@ -324,37 +311,34 @@ dict_stats_thread_init() } /*****************************************************************//** -Free resources allocated by dict_stats_thread_init(), must be called -after dict_stats_thread() has exited. */ +Free resources allocated by dict_stats_init(), must be called +after dict_stats task has exited. 
*/ void -dict_stats_thread_deinit() +dict_stats_deinit() /*======================*/ { - ut_a(!srv_read_only_mode); - ut_ad(!srv_dict_stats_thread_active); - if (!stats_initialised) { return; } + ut_a(!srv_read_only_mode); stats_initialised = false; dict_stats_recalc_pool_deinit(); dict_defrag_pool_deinit(); mutex_free(&recalc_pool_mutex); - - ut_d(os_event_destroy(dict_stats_disabled_event)); - os_event_destroy(dict_stats_event); - os_event_destroy(dict_stats_shutdown_event); - dict_stats_start_shutdown = false; } /*****************************************************************//** Get the first table that has been added for auto recalc and eventually -update its stats. */ +update its stats. +@return : true if pool was non-empty and first entry does +not needs delay, false otherwise. + +*/ static -void +bool dict_stats_process_entry_from_recalc_pool() /*=======================================*/ { @@ -362,10 +346,11 @@ dict_stats_process_entry_from_recalc_pool() ut_ad(!srv_read_only_mode); +next_table_id: /* pop the first table from the auto recalc pool */ if (!dict_stats_recalc_pool_get(&table_id)) { /* no tables for auto recalc */ - return; + return false; } dict_table_t* table; @@ -378,7 +363,7 @@ dict_stats_process_entry_from_recalc_pool() /* table does not exist, must have been DROPped after its id was enqueued */ mutex_exit(&dict_sys.mutex); - return; + goto next_table_id; } ut_ad(!table->is_temporary()); @@ -386,7 +371,7 @@ dict_stats_process_entry_from_recalc_pool() if (!fil_table_accessible(table)) { dict_table_close(table, TRUE, FALSE); mutex_exit(&dict_sys.mutex); - return; + goto next_table_id; } table->stats_bg_flag |= BG_STAT_IN_PROGRESS; @@ -399,7 +384,7 @@ dict_stats_process_entry_from_recalc_pool() find out that this is a problem, then the check below could eventually be replaced with something else, though a time interval is the natural approach. 
*/ - + int ret; if (difftime(time(NULL), table->stats_last_recalc) < MIN_RECALC_INTERVAL) { @@ -407,11 +392,13 @@ dict_stats_process_entry_from_recalc_pool() too frequent stats updates we put back the table on the auto recalc list and do nothing. */ - dict_stats_recalc_pool_add(table); - + dict_stats_recalc_pool_add(table, false); + dict_stats_schedule(MIN_RECALC_INTERVAL*1000); + ret = false; } else { dict_stats_update(table, DICT_STATS_RECALC_PERSISTENT); + ret = true; } mutex_enter(&dict_sys.mutex); @@ -421,6 +408,7 @@ dict_stats_process_entry_from_recalc_pool() dict_table_close(table, TRUE, FALSE); mutex_exit(&dict_sys.mutex); + return ret; } #ifdef UNIV_DEBUG @@ -430,89 +418,51 @@ dict_stats_process_entry_from_recalc_pool() void dict_stats_disabled_debug_update(THD*, st_mysql_sys_var*, void*, const void* save) { - /* This method is protected by mutex, as every SET GLOBAL .. */ - ut_ad(dict_stats_disabled_event != NULL); - const bool disable = *static_cast<const my_bool*>(save); - - const int64_t sig_count = os_event_reset(dict_stats_disabled_event); - - innodb_dict_stats_disabled_debug = disable; - - if (disable) { - os_event_set(dict_stats_event); - os_event_wait_low(dict_stats_disabled_event, sig_count); - } + if (disable) + dict_stats_shutdown(); + else + dict_stats_start(); } #endif /* UNIV_DEBUG */ +static tpool::timer* dict_stats_timer; +std::mutex dict_stats_mutex; -/*****************************************************************//** -This is the thread for background stats gathering. It pops tables, from -the auto recalc list and proceeds them, eventually recalculating their -statistics. 
-@return this function does not return, it calls os_thread_exit() */ -extern "C" -os_thread_ret_t -DECLARE_THREAD(dict_stats_thread)(void*) +static void dict_stats_func(void*) { - my_thread_init(); - ut_a(!srv_read_only_mode); - -#ifdef UNIV_PFS_THREAD - /* JAN: TODO: MySQL 5.7 PSI - pfs_register_thread(dict_stats_thread_key); - */ -#endif /* UNIV_PFS_THREAD */ - - while (!dict_stats_start_shutdown) { - - /* Wake up periodically even if not signaled. This is - because we may lose an event - if the below call to - dict_stats_process_entry_from_recalc_pool() puts the entry back - in the list, the os_event_set() will be lost by the subsequent - os_event_reset(). */ - os_event_wait_time( - dict_stats_event, MIN_RECALC_INTERVAL * 1000000); - -#ifdef UNIV_DEBUG - while (innodb_dict_stats_disabled_debug) { - os_event_set(dict_stats_disabled_event); - if (dict_stats_start_shutdown) { - break; - } - os_event_wait_time( - dict_stats_event, 100000); - } -#endif /* UNIV_DEBUG */ - - if (dict_stats_start_shutdown) { - break; - } + while (dict_stats_process_entry_from_recalc_pool()) {} + dict_defrag_process_entries_from_defrag_pool(); +} - dict_stats_process_entry_from_recalc_pool(); - dict_defrag_process_entries_from_defrag_pool(); - os_event_reset(dict_stats_event); +void dict_stats_start() +{ + std::lock_guard<std::mutex> lk(dict_stats_mutex); + if (dict_stats_timer) { + return; } + dict_stats_timer = srv_thread_pool->create_timer(dict_stats_func); +} - srv_dict_stats_thread_active = false; - - os_event_set(dict_stats_shutdown_event); - my_thread_end(); - /* We count the number of threads in os_thread_exit(). A created - thread should always use that to exit instead of return(). 
*/ - os_thread_exit(); +static void dict_stats_schedule(int ms) +{ + std::lock_guard<std::mutex> lk(dict_stats_mutex); + if(dict_stats_timer) { + dict_stats_timer->set_time(ms,0); + } +} - OS_THREAD_DUMMY_RETURN; +void dict_stats_schedule_now() +{ + dict_stats_schedule(0); } /** Shut down the dict_stats_thread. */ -void -dict_stats_shutdown() +void dict_stats_shutdown() { - dict_stats_start_shutdown = true; - os_event_set(dict_stats_event); - os_event_wait(dict_stats_shutdown_event); + std::lock_guard<std::mutex> lk(dict_stats_mutex); + delete dict_stats_timer; + dict_stats_timer = 0; } diff --git a/storage/innobase/fil/fil0fil.cc b/storage/innobase/fil/fil0fil.cc index 9c497b59b9e..c59eb0a8efa 100644 --- a/storage/innobase/fil/fil0fil.cc +++ b/storage/innobase/fil/fil0fil.cc @@ -954,7 +954,6 @@ fil_mutex_enter_and_prepare_for_io( break; } else { mutex_exit(&fil_system.mutex); - os_aio_simulated_wake_handler_threads(); os_thread_sleep(20000); /* Flush tablespaces so that we can close modified files in the LRU list */ @@ -4148,13 +4147,7 @@ fil_io( } else if (req_type.is_read() && !recv_no_ibuf_operations && ibuf_page(page_id, zip_size, NULL)) { - mode = OS_AIO_IBUF; - - /* Reduce probability of deadlock bugs in connection with ibuf: - do not let the ibuf i/o handler sleep */ - - req_type.clear_do_not_wake(); } else { mode = OS_AIO_NORMAL; } @@ -4200,8 +4193,6 @@ fil_io( return(DB_TABLESPACE_DELETED); } - ut_ad(mode != OS_AIO_IBUF || fil_type_is_data(space->purpose)); - ulint cur_page_no = page_id.page_no(); fil_node_t* node = UT_LIST_GET_FIRST(space->chain); @@ -4337,34 +4328,27 @@ fil_io( return(err); } -/**********************************************************************//** -Waits for an aio operation to complete. This function is used to write the -handler for completed requests. The aio array of pending requests is divided -into segments (see os0file.cc for more info). The thread specifies which -segment it wants to wait for. 
*/ +#include <tpool.h> +/**********************************************************************/ + +/* Callback for AIO completion */ void -fil_aio_wait( -/*=========*/ - ulint segment) /*!< in: the number of the segment in the aio - array to wait for */ +fil_aio_callback(const tpool::aiocb *cb) { - fil_node_t* node; - IORequest type; - void* message; + os_aio_userdata_t *data=(os_aio_userdata_t *)cb->m_userdata; + fil_node_t* node= data->node; + IORequest type = data->type; + void* message = data->message; ut_ad(fil_validate_skip()); - dberr_t err = os_aio_handler(segment, &node, &message, &type); - - ut_a(err == DB_SUCCESS); + ut_a(cb->m_err == DB_SUCCESS); if (node == NULL) { ut_ad(srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS); return; } - srv_set_io_thread_op_info(segment, "complete io for fil node"); - mutex_enter(&fil_system.mutex); fil_node_complete_io(node, type); @@ -4382,7 +4366,6 @@ fil_aio_wait( deadlocks in the i/o system. We keep tablespace 0 data files always open, and use a special i/o thread to serve insert buffer requests. 
*/ - srv_set_io_thread_op_info(segment, "complete io for buf page"); /* async single page writes from the dblwr buffer don't have access to the page */ @@ -4396,7 +4379,7 @@ fil_aio_wait( bpage->init_on_flush = false; dblwr = false; } - err = buf_page_io_complete(bpage, dblwr); + dberr_t err = buf_page_io_complete(bpage, dblwr); if (err == DB_SUCCESS) { return; } diff --git a/storage/innobase/fsp/fsp0file.cc b/storage/innobase/fsp/fsp0file.cc index 4deacd1c3b6..c21a75687e6 100644 --- a/storage/innobase/fsp/fsp0file.cc +++ b/storage/innobase/fsp/fsp0file.cc @@ -157,7 +157,7 @@ void Datafile::init_file_info() { #ifdef _WIN32 - GetFileInformationByHandle(m_handle, &m_file_info); + GetFileInformationByHandle((os_file_t)m_handle, &m_file_info); #else fstat(m_handle, &m_file_info); #endif /* WIN32 */ diff --git a/storage/innobase/fts/fts0opt.cc b/storage/innobase/fts/fts0opt.cc index d21107b4728..6f3049f39aa 100644 --- a/storage/innobase/fts/fts0opt.cc +++ b/storage/innobase/fts/fts0opt.cc @@ -39,6 +39,12 @@ Completed 2011/7/10 Sunny and Jimmy Yang /** The FTS optimize thread's work queue. */ ib_wqueue_t* fts_optimize_wq; +static void fts_optimize_callback(void *); +static void timer_callback(void*); +static tpool::timer* timer; +static tpool::task_group task_group(1); +static tpool::task task(fts_optimize_callback,0, &task_group); + /** The FTS vector to store fts_slot_t */ static ib_vector_t* fts_slots; @@ -2535,6 +2541,23 @@ fts_optimize_create_msg( return(msg); } +/** Add message to wqueue, signal thread pool*/ +void add_msg(fts_msg_t *msg, bool wq_locked = false) +{ + ut_a(fts_optimize_wq); + ib_wqueue_add(fts_optimize_wq,msg, msg->heap,wq_locked); + srv_thread_pool->submit_task(&task); +} + +/** +Called by "idle" timer. Submits optimize task, which +will only recalculate is_sync_needed, in case the queue is empty. +*/ +static void timer_callback(void *) +{ + srv_thread_pool->submit_task(&task); +} + /** Add the table to add to the OPTIMIZER's list. 
@param[in] table table to add */ void fts_optimize_add_table(dict_table_t* table) @@ -2558,7 +2581,7 @@ void fts_optimize_add_table(dict_table_t* table) mutex_enter(&fts_optimize_wq->mutex); - ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true); + add_msg(msg,true); table->fts->in_queue = true; @@ -2608,7 +2631,7 @@ fts_optimize_remove_table( remove->event = event; msg->ptr = remove; - ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true); + add_msg(msg, true); mutex_exit(&fts_optimize_wq->mutex); @@ -2643,7 +2666,7 @@ fts_optimize_request_sync_table( mutex_enter(&fts_optimize_wq->mutex); - ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true); + add_msg(msg, true); table->fts->in_queue = true; @@ -2786,24 +2809,22 @@ static void fts_optimize_sync_table(dict_table_t* table) Optimize all FTS tables. @return Dummy return */ static -os_thread_ret_t -DECLARE_THREAD(fts_optimize_thread)( +void fts_optimize_callback( /*================*/ void* arg) /*!< in: work queue*/ { - ulint current = 0; - ibool done = FALSE; - ulint n_tables = 0; - ulint n_optimize = 0; - ib_wqueue_t* wq = (ib_wqueue_t*) arg; + static ulint current = 0; + static ibool done = FALSE; + static ulint n_tables = ib_vector_size(fts_slots); + static ulint n_optimize = 0; + ib_wqueue_t* wq = fts_optimize_wq; ut_ad(!srv_read_only_mode); - my_thread_init(); - - ut_ad(fts_slots); - /* Assign number of tables added in fts_slots_t to n_tables */ - n_tables = ib_vector_size(fts_slots); + if (!fts_optimize_wq) { + /* Possibly timer initiated callback, can come after FTS_MSG_STOP.*/ + return; + } while (!done && srv_shutdown_state == SRV_SHUTDOWN_NONE) { @@ -2831,17 +2852,15 @@ DECLARE_THREAD(fts_optimize_thread)( } else if (n_optimize == 0 || !ib_wqueue_is_empty(wq)) { fts_msg_t* msg; - - msg = static_cast<fts_msg_t*>( - ib_wqueue_timedwait(wq, FTS_QUEUE_WAIT_IN_USECS)); - + msg = static_cast<fts_msg_t*>(ib_wqueue_nowait(wq)); /* Timeout ? 
*/ if (msg == NULL) { if (fts_is_sync_needed()) { fts_need_sync = true; } - - continue; + if (n_tables) + timer->set_time(5000, 0); + return; } switch (msg->type) { @@ -2908,13 +2927,6 @@ DECLARE_THREAD(fts_optimize_thread)( ib::info() << "FTS optimize thread exiting."; os_event_set(fts_opt_shutdown_event); - my_thread_end(); - - /* We count the number of threads in os_thread_exit(). A created - thread should always use that to exit and not use return() to exit. */ - os_thread_exit(); - - OS_THREAD_DUMMY_RETURN; } /**********************************************************************//** @@ -2933,7 +2945,8 @@ fts_optimize_init(void) /* Create FTS optimize work queue */ fts_optimize_wq = ib_wqueue_create(); - ut_a(fts_optimize_wq != NULL); + ut_a(fts_optimize_wq != NULL); + timer = srv_thread_pool->create_timer(timer_callback); /* Create FTS vector to store fts_slot_t */ heap = mem_heap_create(sizeof(dict_table_t*) * 64); @@ -2962,8 +2975,6 @@ fts_optimize_init(void) fts_opt_shutdown_event = os_event_create(0); last_check_sync_time = time(NULL); - - os_thread_create(fts_optimize_thread, fts_optimize_wq, NULL); } /** Shutdown fts optimize thread. */ @@ -2987,15 +2998,18 @@ fts_optimize_shutdown() /* We tell the OPTIMIZE thread to switch to state done, we can't delete the work queue here because the add thread needs deregister the FTS tables. 
*/ + delete timer; + timer = NULL; + task_group.cancel_pending(&task); msg = fts_optimize_create_msg(FTS_MSG_STOP, NULL); - ib_wqueue_add(fts_optimize_wq, msg, msg->heap); + add_msg(msg, false); os_event_wait(fts_opt_shutdown_event); os_event_destroy(fts_opt_shutdown_event); - ib_wqueue_free(fts_optimize_wq); fts_optimize_wq = NULL; + } diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc index da317ca23e5..925c66d2b16 100644 --- a/storage/innobase/handler/ha_innodb.cc +++ b/storage/innobase/handler/ha_innodb.cc @@ -55,6 +55,8 @@ this program; if not, write to the Free Software Foundation, Inc., #include <mysql/service_thd_alloc.h> #include <mysql/service_thd_wait.h> #include "field.h" +#include "srv0srv.h" + // MYSQL_PLUGIN_IMPORT extern my_bool lower_case_file_system; // MYSQL_PLUGIN_IMPORT extern char mysql_unpacked_real_data_home[]; @@ -69,6 +71,7 @@ this program; if not, write to the Free Software Foundation, Inc., #include "btr0sea.h" #include "buf0dblwr.h" #include "buf0dump.h" +#include "buf0buf.h" #include "buf0flu.h" #include "buf0lru.h" #include "dict0boot.h" @@ -121,8 +124,8 @@ void thd_clear_error(MYSQL_THD thd); TABLE *find_fk_open_table(THD *thd, const char *db, size_t db_len, const char *table, size_t table_len); -MYSQL_THD create_thd(); -void destroy_thd(MYSQL_THD thd); +MYSQL_THD create_background_thd(); +void destroy_background_thd(MYSQL_THD thd); void reset_thd(MYSQL_THD thd); TABLE *open_purge_table(THD *thd, const char *db, size_t dblen, const char *tb, size_t tblen); @@ -250,62 +253,7 @@ is_partition( return strstr(file_name, table_name_t::part_suffix); } -/** Signal to shut down InnoDB (NULL if shutdown was signaled, or if -running in innodb_read_only mode, srv_read_only_mode) */ -std::atomic <st_my_thread_var *> srv_running; -/** Service thread that waits for the server shutdown and stops purge threads. -Purge workers have THDs that are needed to calculate virtual columns. 
-This THDs must be destroyed rather early in the server shutdown sequence. -This service thread creates a THD and idly waits for it to get a signal to -die. Then it notifies all purge workers to shutdown. -*/ -static pthread_t thd_destructor_thread; - -pthread_handler_t -thd_destructor_proxy(void *) -{ - mysql_mutex_t thd_destructor_mutex; - mysql_cond_t thd_destructor_cond; - - my_thread_init(); - mysql_mutex_init(PSI_NOT_INSTRUMENTED, &thd_destructor_mutex, 0); - mysql_cond_init(PSI_NOT_INSTRUMENTED, &thd_destructor_cond, 0); - - st_my_thread_var *myvar= _my_thread_var(); - myvar->current_mutex = &thd_destructor_mutex; - myvar->current_cond = &thd_destructor_cond; - - THD *thd= create_thd(); - thd_proc_info(thd, "InnoDB shutdown handler"); - - mysql_mutex_lock(&thd_destructor_mutex); - srv_running.store(myvar, std::memory_order_relaxed); - /* wait until the server wakes the THD to abort and die */ - while (!myvar->abort) - mysql_cond_wait(&thd_destructor_cond, &thd_destructor_mutex); - mysql_mutex_unlock(&thd_destructor_mutex); - srv_running.store(NULL, std::memory_order_relaxed); - - while (srv_fast_shutdown == 0 && - (trx_sys.any_active_transactions() || - (uint)thread_count > srv_n_purge_threads + 1)) { - thd_proc_info(thd, "InnoDB slow shutdown wait"); - os_thread_sleep(1000); - } - - /* Some background threads might generate undo pages that will - need to be purged, so they have to be shut down before purge - threads if slow shutdown is requested. 
*/ - srv_shutdown_bg_undo_sources(); - srv_purge_shutdown(); - - destroy_thd(thd); - mysql_cond_destroy(&thd_destructor_cond); - mysql_mutex_destroy(&thd_destructor_mutex); - my_thread_end(); - return 0; -} /** Return the InnoDB ROW_FORMAT enum value @param[in] row_format row_format from "innodb_default_row_format" @@ -547,7 +495,6 @@ performance schema */ static mysql_pfs_key_t commit_cond_mutex_key; static mysql_pfs_key_t commit_cond_key; static mysql_pfs_key_t pending_checkpoint_mutex_key; -static mysql_pfs_key_t thd_destructor_thread_key; static PSI_mutex_info all_pthread_mutexes[] = { PSI_KEY(commit_cond_mutex), @@ -651,23 +598,10 @@ static PSI_rwlock_info all_innodb_rwlocks[] = { performance schema instrumented if "UNIV_PFS_THREAD" is defined */ static PSI_thread_info all_innodb_threads[] = { - PSI_KEY(buf_dump_thread), - PSI_KEY(dict_stats_thread), - PSI_KEY(io_handler_thread), - PSI_KEY(io_ibuf_thread), - PSI_KEY(io_log_thread), - PSI_KEY(io_read_thread), - PSI_KEY(io_write_thread), PSI_KEY(page_cleaner_thread), PSI_KEY(recv_writer_thread), - PSI_KEY(srv_error_monitor_thread), - PSI_KEY(srv_lock_timeout_thread), - PSI_KEY(srv_master_thread), - PSI_KEY(srv_monitor_thread), - PSI_KEY(srv_purge_thread), - PSI_KEY(srv_worker_thread), PSI_KEY(trx_rollback_clean_thread), - PSI_KEY(thd_destructor_thread), + PSI_KEY(thread_pool_thread) }; # endif /* UNIV_PFS_THREAD */ @@ -1586,7 +1520,7 @@ MYSQL_THD innobase_create_background_thd(const char* name) /*============================*/ { - MYSQL_THD thd= create_thd(); + MYSQL_THD thd= create_background_thd(); thd_proc_info(thd, name); THDVAR(thd, background_thread) = true; return thd; @@ -1604,7 +1538,7 @@ innobase_destroy_background_thd( if innodb is in the PLUGIN_IS_DYING state */ innobase_close_connection(innodb_hton_ptr, thd); thd_set_ha_data(thd, innodb_hton_ptr, NULL); - destroy_thd(thd); + destroy_background_thd(thd); } /** Close opened tables, free memory, delete items for a MYSQL_THD. 
@@ -4012,6 +3946,7 @@ static int innodb_init(void* p) innobase_hton->drop_database = innobase_drop_database; innobase_hton->panic = innobase_end; + innobase_hton->pre_shutdown = innodb_preshutdown; innobase_hton->start_consistent_snapshot = innobase_start_trx_and_assign_read_view; @@ -4113,12 +4048,6 @@ static int innodb_init(void* p) if (err != DB_SUCCESS) { innodb_shutdown(); DBUG_RETURN(innodb_init_abort()); - } else if (!srv_read_only_mode) { - mysql_thread_create(thd_destructor_thread_key, - &thd_destructor_thread, - NULL, thd_destructor_proxy, NULL); - while (!srv_running.load(std::memory_order_relaxed)) - os_thread_sleep(20); } srv_was_started = true; @@ -4197,17 +4126,6 @@ innobase_end(handlerton*, ha_panic_function) } } - if (auto r = srv_running.load(std::memory_order_relaxed)) { - ut_ad(!srv_read_only_mode); - if (!abort_loop) { - // may be UNINSTALL PLUGIN statement - mysql_mutex_lock(r->current_mutex); - r->abort = 1; - mysql_cond_broadcast(r->current_cond); - mysql_mutex_unlock(r->current_mutex); - } - pthread_join(thd_destructor_thread, NULL); - } innodb_shutdown(); innobase_space_shutdown(); @@ -17154,7 +17072,7 @@ fast_shutdown_validate( uint new_val = *reinterpret_cast<uint*>(save); if (srv_fast_shutdown && !new_val - && !srv_running.load(std::memory_order_relaxed)) { + && !srv_read_only_mode && abort_loop) { return(1); } @@ -17203,6 +17121,8 @@ innodb_stopword_table_validate( return(ret); } +extern void buf_resize_start(); + /** Update the system variable innodb_buffer_pool_size using the "saved" value. This function is registered as a callback with MySQL. 
@param[in] save immediate result from check function */ @@ -17216,7 +17136,7 @@ innodb_buffer_pool_size_update(THD*,st_mysql_sys_var*,void*, const void* save) sizeof(export_vars.innodb_buffer_pool_resize_status), "Requested to resize buffer pool."); - os_event_set(srv_buf_resize_event); + buf_resize_start(); ib::info() << export_vars.innodb_buffer_pool_resize_status << " (new size: " << in_val << " bytes)"; @@ -18384,8 +18304,8 @@ innodb_status_output_update(THD*,st_mysql_sys_var*,void*var,const void*save) { *static_cast<my_bool*>(var) = *static_cast<const my_bool*>(save); mysql_mutex_unlock(&LOCK_global_system_variables); - /* Wakeup server monitor thread. */ - os_event_set(srv_monitor_event); + /* Wakeup server monitor. */ + srv_monitor_timer_schedule_now(); mysql_mutex_lock(&LOCK_global_system_variables); } @@ -18455,33 +18375,6 @@ innodb_undo_logs_warn(THD* thd, st_mysql_sys_var*, void*, const void*) innodb_undo_logs_deprecated); } -#ifdef UNIV_DEBUG -static -void -innobase_debug_sync_callback(srv_slot_t *slot, const void *value) -{ - const char *value_str = *static_cast<const char* const*>(value); - size_t len = strlen(value_str) + 1; - - - // One allocatoin for list node object and value. 
- void *buf = ut_malloc_nokey(sizeof(srv_slot_t::debug_sync_t) + len); - srv_slot_t::debug_sync_t *sync = new(buf) srv_slot_t::debug_sync_t(); - strcpy(reinterpret_cast<char*>(&sync[1]), value_str); - - rw_lock_x_lock(&slot->debug_sync_lock); - UT_LIST_ADD_LAST(slot->debug_sync, sync); - rw_lock_x_unlock(&slot->debug_sync_lock); -} -static -void -innobase_debug_sync_set(THD *thd, st_mysql_sys_var*, void *, const void *value) -{ - srv_for_each_thread(SRV_WORKER, innobase_debug_sync_callback, value); - srv_for_each_thread(SRV_PURGE, innobase_debug_sync_callback, value); -} -#endif - static SHOW_VAR innodb_status_variables_export[]= { {"Innodb", (char*) &show_innodb_vars, SHOW_FUNC}, {NullS, NullS, SHOW_LONG} @@ -19812,16 +19705,6 @@ static MYSQL_SYSVAR_BOOL(debug_force_scrubbing, 0, "Perform extra scrubbing to increase test exposure", NULL, NULL, FALSE); - -char *innobase_debug_sync; -static MYSQL_SYSVAR_STR(debug_sync, innobase_debug_sync, - PLUGIN_VAR_NOCMDARG, - "debug_sync for innodb purge threads. " - "Use it to set up sync points for all purge threads " - "at once. 
The commands will be applied sequentially at " - "the beginning of purging the next undo record.", - NULL, - innobase_debug_sync_set, NULL); #endif /* UNIV_DEBUG */ static MYSQL_SYSVAR_BOOL(encrypt_temporary_tables, innodb_encrypt_temporary_tables, @@ -20029,7 +19912,6 @@ static struct st_mysql_sys_var* innobase_system_variables[]= { MYSQL_SYSVAR(background_scrub_data_check_interval), #ifdef UNIV_DEBUG MYSQL_SYSVAR(debug_force_scrubbing), - MYSQL_SYSVAR(debug_sync), #endif MYSQL_SYSVAR(buf_dump_status_frequency), MYSQL_SYSVAR(background_thread), diff --git a/storage/innobase/include/btr0defragment.h b/storage/innobase/include/btr0defragment.h index 22f29eae3a6..ed543477bcc 100644 --- a/storage/innobase/include/btr0defragment.h +++ b/storage/innobase/include/btr0defragment.h @@ -86,12 +86,7 @@ void btr_defragment_save_defrag_stats_if_needed( dict_index_t* index); /*!< in: index */ -/** Merge consecutive b-tree pages into fewer pages to defragment indexes */ -extern "C" UNIV_INTERN -os_thread_ret_t -DECLARE_THREAD(btr_defragment_thread)(void*); - -/** Whether btr_defragment_thread is active */ -extern bool btr_defragment_thread_active; - +/* Stop defragmentation.*/ +void btr_defragment_end(); +extern bool btr_defragment_active; #endif diff --git a/storage/innobase/include/buf0buf.h b/storage/innobase/include/buf0buf.h index 332b6d93aeb..f6af7f94ce8 100644 --- a/storage/innobase/include/buf0buf.h +++ b/storage/innobase/include/buf0buf.h @@ -261,14 +261,6 @@ buf_frame_will_withdrawn( buf_pool_t* buf_pool, const byte* ptr); -/** This is the thread for resizing buffer pool. It waits for an event and -when waked up either performs a resizing and sleeps again. -@return this function does not return, calls os_thread_exit() -*/ -extern "C" -os_thread_ret_t -DECLARE_THREAD(buf_resize_thread)(void*); - #ifdef BTR_CUR_HASH_ADAPT /** Clear the adaptive hash index on all pages in the buffer pool. 
*/
 void
diff --git a/storage/innobase/include/buf0dblwr.h b/storage/innobase/include/buf0dblwr.h
index c34c1077d97..68662ac3b89 100644
--- a/storage/innobase/include/buf0dblwr.h
+++ b/storage/innobase/include/buf0dblwr.h
@@ -100,11 +100,10 @@ void
 buf_dblwr_sync_datafiles();
 
 /********************************************************************//**
-Flushes possible buffered writes from the doublewrite memory buffer to disk,
-and also wakes up the aio thread if simulated aio is used. It is very
-important to call this function after a batch of writes has been posted,
-and also when we may have to wait for a page latch! Otherwise a deadlock
-of threads can occur. */
+Flushes possible buffered writes from the doublewrite memory buffer to disk.
+It is very important to call this function after a batch of writes
+has been posted, and also when we may have to wait for a page latch!
+Otherwise a deadlock of threads can occur. */
 void
 buf_dblwr_flush_buffered_writes();
 
diff --git a/storage/innobase/include/buf0dump.h b/storage/innobase/include/buf0dump.h
index 8a7ef95ef9c..b8d790d3b13 100644
--- a/storage/innobase/include/buf0dump.h
+++ b/storage/innobase/include/buf0dump.h
@@ -29,7 +29,7 @@ Created April 08, 2011 Vasil Dimov
 #include "univ.i"
 
 /*****************************************************************//**
-Wakes up the buffer pool dump/load thread and instructs it to start
+Starts the buffer pool dump/load task and instructs it to start
 a dump. This function is called by MySQL code via buffer_pool_dump_now()
 and it should return immediately because the whole MySQL is frozen during
 its execution. */
@@ -38,7 +38,7 @@ buf_dump_start();
 /*============*/
 
 /*****************************************************************//**
-Wakes up the buffer pool dump/load thread and instructs it to start
+Starts the buffer pool dump/load task (if not started) and instructs it to start
 a load. 
This function is called by MySQL code via buffer_pool_load_now() and it should return immediately because the whole MySQL is frozen during its execution. */ @@ -54,16 +54,10 @@ void buf_load_abort(); /*============*/ -/*****************************************************************//** -This is the main thread for buffer pool dump/load. It waits for an -event and when waked up either performs a dump or load and sleeps -again. -@return this function does not return, it calls os_thread_exit() */ -extern "C" -os_thread_ret_t -DECLARE_THREAD(buf_dump_thread)( -/*============================*/ - void* arg); /*!< in: a dummy parameter - required by os_thread_create */ +/** Start async buffer pool load, if srv_buffer_pool_load_at_startup was set.*/ +void buf_load_at_startup(); + +/** Wait for currently running load/dumps to finish*/ +void buf_load_dump_end(); #endif /* buf0dump_h */ diff --git a/storage/innobase/include/buf0flu.h b/storage/innobase/include/buf0flu.h index 3753cb05651..ea26dbe9106 100644 --- a/storage/innobase/include/buf0flu.h +++ b/storage/innobase/include/buf0flu.h @@ -269,9 +269,7 @@ buf_flush_free_flush_rbt(void); /********************************************************************//** Writes a flushable page asynchronously from the buffer pool to a file. -NOTE: in simulated aio we must call -os_aio_simulated_wake_handler_threads after we have posted a batch of -writes! NOTE: buf_pool->mutex and buf_page_get_mutex(bpage) must be +NOTE: buf_pool->mutex and buf_page_get_mutex(bpage) must be held upon entering this function, and they will be released by this function. 
@return TRUE if page was flushed */ diff --git a/storage/innobase/include/dict0stats_bg.h b/storage/innobase/include/dict0stats_bg.h index 526139643d1..14dc38851aa 100644 --- a/storage/innobase/include/dict0stats_bg.h +++ b/storage/innobase/include/dict0stats_bg.h @@ -31,9 +31,6 @@ Created Apr 26, 2012 Vasil Dimov #include "os0event.h" #include "os0thread.h" -/** Event to wake up dict_stats_thread on dict_stats_recalc_pool_add() -or shutdown. Not protected by any mutex. */ -extern os_event_t dict_stats_event; #ifdef HAVE_PSI_INTERFACE extern mysql_pfs_key_t dict_stats_recalc_pool_mutex_key; @@ -99,16 +96,16 @@ dict_stats_wait_bg_to_stop_using_table( unlocking/locking the data dict */ /*****************************************************************//** Initialize global variables needed for the operation of dict_stats_thread(). -Must be called before dict_stats_thread() is started. */ +Must be called before dict_stats task is started. */ void -dict_stats_thread_init(); +dict_stats_init(); /*====================*/ /*****************************************************************//** Free resources allocated by dict_stats_thread_init(), must be called -after dict_stats_thread() has exited. */ +after dict_stats task has exited. */ void -dict_stats_thread_deinit(); +dict_stats_deinit(); /*======================*/ #ifdef UNIV_DEBUG @@ -119,20 +116,17 @@ void dict_stats_disabled_debug_update(THD*, st_mysql_sys_var*, void*, const void* save); #endif /* UNIV_DEBUG */ -/*****************************************************************//** -This is the thread for background stats gathering. It pops tables, from -the auto recalc list and proceeds them, eventually recalculating their -statistics. 
-@return this function does not return, it calls os_thread_exit() */ -extern "C" -os_thread_ret_t -DECLARE_THREAD(dict_stats_thread)( -/*==============================*/ - void* arg); /*!< in: a dummy parameter - required by os_thread_create */ - -/** Shut down the dict_stats_thread. */ + +/** Start the dict stats timer */ +void +dict_stats_start(); + +/** Shut down the dict_stats timer. */ void dict_stats_shutdown(); +/** reschedule dict stats timer to run now. */ +void +dict_stats_schedule_now(); + #endif /* dict0stats_bg_h */ diff --git a/storage/innobase/include/lock0lock.h b/storage/innobase/include/lock0lock.h index 6300706a2b9..67964decf1e 100644 --- a/storage/innobase/include/lock0lock.h +++ b/storage/innobase/include/lock0lock.h @@ -647,14 +647,8 @@ lock_table_has_locks( table itself */ /*********************************************************************//** -A thread which wakes up threads whose lock wait may have lasted too long. -@return a dummy parameter */ -extern "C" -os_thread_ret_t -DECLARE_THREAD(lock_wait_timeout_thread)( -/*=====================================*/ - void* arg); /*!< in: a dummy parameter required by - os_thread_create */ +A task which wakes up threads whose lock wait may have lasted too long. */ +void lock_wait_timeout_task(void*); /********************************************************************//** Releases a user OS thread waiting for a lock to be released, if the @@ -791,14 +785,8 @@ public: ulint n_lock_max_wait_time; /*!< Max wait time */ - os_event_t timeout_event; /*!< An event waited for by - lock_wait_timeout_thread. - Not protected by a mutex, - but the waits are timed. - Signaled on shutdown only. 
*/ - - bool timeout_thread_active; /*!< True if the timeout thread - is running */ + std::unique_ptr<tpool::timer> timeout_timer; /*!< Thread pool timer task */ + bool timeout_timer_active; /** diff --git a/storage/innobase/include/os0file.h b/storage/innobase/include/os0file.h index 324c6f7447d..29bed4f3e4f 100644 --- a/storage/innobase/include/os0file.h +++ b/storage/innobase/include/os0file.h @@ -38,6 +38,7 @@ Created 10/21/1995 Heikki Tuuri #include "fsp0types.h" #include "os0api.h" +#include "tpool.h" #ifndef _WIN32 #include <dirent.h> @@ -66,7 +67,7 @@ the OS actually supports it: Win 95 does not, NT does. */ # define UNIV_NON_BUFFERED_IO /** File handle */ -typedef HANDLE os_file_t; +typedef native_file_handle os_file_t; #else /* _WIN32 */ @@ -102,6 +103,14 @@ struct pfs_os_file_t /** Assignment operator. @param[in] file file handle to be assigned */ void operator=(os_file_t file) { m_file = file; } + bool operator==(os_file_t file) const { return m_file == file; } + bool operator!=(os_file_t file) const { return !(*this == file); } +#ifndef DBUG_OFF + friend std::ostream& operator<<(std::ostream& os, pfs_os_file_t f){ + os << os_file_t(f); + return os; + } +#endif }; /** The next value should be smaller or equal to the smallest sector size used @@ -206,14 +215,6 @@ public: /** Disable partial read warnings */ DISABLE_PARTIAL_IO_WARNINGS = 32, - /** Do not to wake i/o-handler threads, but the caller will do - the waking explicitly later, in this way the caller can post - several requests in a batch; NOTE that the batch must not be - so big that it exhausts the slots in AIO arrays! NOTE that - a simulated batch may introduce hidden chances of deadlocks, - because I/Os are not actually handled until all - have been posted: use with great caution! 
*/ - DO_NOT_WAKE = 64, /** Ignore failed reads of non-existent pages */ IGNORE_MISSING = 128, @@ -296,13 +297,6 @@ public: return((m_type & LOG) == LOG); } - /** @return true if the simulated AIO thread should be woken up */ - bool is_wake() const - MY_ATTRIBUTE((warn_unused_result)) - { - return((m_type & DO_NOT_WAKE) == 0); - } - /** Clear the punch hole flag */ void clear_punch_hole() { @@ -352,12 +346,6 @@ public: } } - /** Clear the do not wake flag */ - void clear_do_not_wake() - { - m_type &= ~DO_NOT_WAKE; - } - /** Set the pointer to file node for IO @param[in] node File node */ inline void set_fil_node(fil_node_t* node); @@ -438,7 +426,7 @@ struct os_file_size_t { }; /** Win NT does not allow more than 64 */ -static const ulint OS_AIO_N_PENDING_IOS_PER_THREAD = 32; +static const ulint OS_AIO_N_PENDING_IOS_PER_THREAD = 256; /** Modes for aio operations @{ */ /** Normal asynchronous i/o not for ibuf pages or ibuf bitmap pages */ @@ -450,12 +438,10 @@ static const ulint OS_AIO_IBUF = 22; /** Asynchronous i/o for the log */ static const ulint OS_AIO_LOG = 23; -/** Asynchronous i/o where the calling thread will itself wait for -the i/o to complete, doing also the job of the i/o-handler thread; +/**Calling thread will wait for the i/o to complete, +and perform IO completion routine itself; can be used for any pages, ibuf or non-ibuf. This is used to save -CPU time, as we can do with fewer thread switches. Plain synchronous -I/O is not as good, because it must serialize the file seek and read -or write, causing a bottleneck for parallelism. */ +CPU time, as we can do with fewer thread switches. */ static const ulint OS_AIO_SYNC = 24; /* @} */ @@ -1396,6 +1382,12 @@ Frees the asynchronous io system. */ void os_aio_free(); +struct os_aio_userdata_t +{ + fil_node_t* node; + IORequest type; + void* message; +}; /** NOTE! Use the corresponding macro os_aio(), not directly this function! Requests an asynchronous i/o operation. 
@@ -1428,47 +1420,12 @@ os_aio_func( fil_node_t* m1, void* m2); -/** Wakes up all async i/o threads so that they know to exit themselves in -shutdown. */ -void -os_aio_wake_all_threads_at_shutdown(); /** Waits until there are no pending writes in os_aio_write_array. There can be other, synchronous, pending writes. */ void os_aio_wait_until_no_pending_writes(); -/** Wakes up simulated aio i/o-handler threads if they have something to do. */ -void -os_aio_simulated_wake_handler_threads(); - -/** This is the generic AIO handler interface function. -Waits for an aio operation to complete. This function is used to wait the -for completed requests. The AIO array of pending requests is divided -into segments. The thread specifies which segment or slot it wants to wait -for. NOTE: this function will also take care of freeing the aio slot, -therefore no other thread is allowed to do the freeing! -@param[in] segment the number of the segment in the aio arrays to - wait for; segment 0 is the ibuf I/O thread, - segment 1 the log I/O thread, then follow the - non-ibuf read threads, and as the last are the - non-ibuf write threads; if this is - ULINT_UNDEFINED, then it means that sync AIO - is used, and this parameter is ignored -@param[out] m1 the messages passed with the AIO request; - note that also in the case where the AIO - operation failed, these output parameters - are valid and can be used to restart the - operation, for example -@param[out] m2 callback message -@param[out] type OS_FILE_WRITE or ..._READ -@return DB_SUCCESS or error code */ -dberr_t -os_aio_handler( - ulint segment, - fil_node_t** m1, - void** m2, - IORequest* type); /** Prints info of the aio arrays. @param[in/out] file file where to print */ @@ -1484,14 +1441,6 @@ no pending io operations. 
*/ bool os_aio_all_slots_free(); -#ifdef UNIV_DEBUG - -/** Prints all pending IO -@param[in] file file where to print */ -void -os_aio_print_pending_io(FILE* file); - -#endif /* UNIV_DEBUG */ /** This function returns information about the specified file @param[in] path pathname of the file diff --git a/storage/innobase/include/que0que.h b/storage/innobase/include/que0que.h index 4b52e55a047..01572e2a026 100644 --- a/storage/innobase/include/que0que.h +++ b/storage/innobase/include/que0que.h @@ -378,9 +378,6 @@ struct que_thr_t{ related delete/updates */ row_prebuilt_t* prebuilt; /*!< prebuilt structure processed by the query thread */ - - /** a slot of srv_sys.sys_threads, for DEBUG_SYNC in purge thread */ - ut_d(srv_slot_t* thread_slot;) }; #define QUE_THR_MAGIC_N 8476583 diff --git a/storage/innobase/include/row0ftsort.h b/storage/innobase/include/row0ftsort.h index beb2f8c2bfb..a00d4bbba6a 100644 --- a/storage/innobase/include/row0ftsort.h +++ b/storage/innobase/include/row0ftsort.h @@ -32,6 +32,7 @@ Created 10/13/2010 Jimmy Yang #include "fts0priv.h" #include "row0merge.h" #include "btr0bulk.h" +#include "srv0srv.h" /** This structure defineds information the scan thread will fetch and put to the linked list for parallel tokenization/sort threads @@ -64,7 +65,6 @@ struct fts_psort_common_t { trx_t* trx; /*!< transaction */ fts_psort_t* all_info; /*!< all parallel sort info */ os_event_t sort_event; /*!< sort event */ - os_event_t merge_event; /*!< merge event */ ibool opt_doc_id_size;/*!< whether to use 4 bytes instead of 8 bytes integer to store Doc ID during sort, if @@ -86,11 +86,11 @@ struct fts_psort_t { /*!< buffer to crypt data */ row_merge_block_t* crypt_alloc[FTS_NUM_AUX_INDEX]; /*!< buffer to allocated */ - ulint child_status; /*!< child thread status */ - ulint state; /*!< parent thread state */ + ulint child_status; /*!< child task status */ + ulint state; /*!< parent state */ fts_doc_list_t fts_doc_list; /*!< doc list to process */ 
fts_psort_common_t* psort_common; /*!< ptr to all psort info */ - os_thread_t thread_hdl; /*!< thread handler */ + tpool::waitable_task* task; /*!< threadpool task */ dberr_t error; /*!< db error during psort */ ulint memory_used; /*!< memory used by fts_doc_list */ ib_mutex_t mutex; /*!< mutex for fts_doc_list */ diff --git a/storage/innobase/include/srv0srv.h b/storage/innobase/include/srv0srv.h index 673c5c1091e..53c280287f2 100644 --- a/storage/innobase/include/srv0srv.h +++ b/storage/innobase/include/srv0srv.h @@ -51,6 +51,8 @@ Created 10/10/1995 Heikki Tuuri #include "mysql/psi/mysql_stage.h" #include "mysql/psi/psi.h" +#include <tpool.h> +#include <memory> /** Global counters used inside InnoDB. */ struct srv_stats_t @@ -204,21 +206,6 @@ extern const char* srv_main_thread_op_info; /** Prefix used by MySQL to indicate pre-5.1 table name encoding */ extern const char srv_mysql50_table_name_prefix[10]; -/** Event to signal srv_monitor_thread. Not protected by a mutex. -Set after setting srv_print_innodb_monitor. */ -extern os_event_t srv_monitor_event; - -/** Event to signal the shutdown of srv_error_monitor_thread. -Not protected by a mutex. */ -extern os_event_t srv_error_event; - -/** Event for waking up buf_dump_thread. Not protected by a mutex. -Set on shutdown or by buf_dump_start() or buf_load_start(). */ -extern os_event_t srv_buf_dump_event; - -/** The buffer pool resize thread waits on this event. */ -extern os_event_t srv_buf_resize_event; - /** The buffer pool dump/load file name */ #define SRV_BUF_DUMP_FILENAME_DEFAULT "ib_buffer_pool" extern char* srv_buf_dump_filename; @@ -274,7 +261,7 @@ extern unsigned long long srv_online_max_size; /* If this flag is TRUE, then we will use the native aio of the OS (provided we compiled Innobase with it in), otherwise we will -use simulated aio we build below with threads. +use simulated aio. 
Currently we support native aio on windows and linux */ extern my_bool srv_use_native_aio; extern my_bool srv_numa_interleave; @@ -505,16 +492,7 @@ extern my_bool srv_print_innodb_lock_monitor; extern ibool srv_print_verbose_log; extern bool srv_monitor_active; -extern bool srv_error_monitor_active; -/* TRUE during the lifetime of the buffer pool dump/load thread */ -extern bool srv_buf_dump_thread_active; - -/* true during the lifetime of the buffer pool resize thread */ -extern bool srv_buf_resize_thread_active; - -/* TRUE during the lifetime of the stats thread */ -extern bool srv_dict_stats_thread_active; /* TRUE if enable log scrubbing */ extern my_bool srv_scrub_log; @@ -593,23 +571,10 @@ extern ulong srv_buf_dump_status_frequency; #define srv_max_purge_threads 32 # ifdef UNIV_PFS_THREAD -/* Keys to register InnoDB threads with performance schema */ -extern mysql_pfs_key_t buf_dump_thread_key; -extern mysql_pfs_key_t dict_stats_thread_key; -extern mysql_pfs_key_t io_handler_thread_key; -extern mysql_pfs_key_t io_ibuf_thread_key; -extern mysql_pfs_key_t io_log_thread_key; -extern mysql_pfs_key_t io_read_thread_key; -extern mysql_pfs_key_t io_write_thread_key; extern mysql_pfs_key_t page_cleaner_thread_key; extern mysql_pfs_key_t recv_writer_thread_key; -extern mysql_pfs_key_t srv_error_monitor_thread_key; -extern mysql_pfs_key_t srv_lock_timeout_thread_key; -extern mysql_pfs_key_t srv_master_thread_key; -extern mysql_pfs_key_t srv_monitor_thread_key; -extern mysql_pfs_key_t srv_purge_thread_key; -extern mysql_pfs_key_t srv_worker_thread_key; extern mysql_pfs_key_t trx_rollback_clean_thread_key; +extern mysql_pfs_key_t thread_pool_thread_key; /* This macro register the current thread and its key with performance schema */ @@ -732,17 +697,6 @@ enum srv_stats_method_name_enum { typedef enum srv_stats_method_name_enum srv_stats_method_name_t; -/** Types of threads existing in the system. 
*/ -enum srv_thread_type { - SRV_NONE, /*!< None */ - SRV_WORKER, /*!< threads serving parallelized - queries and queries released from - lock wait */ - SRV_PURGE, /*!< Purge coordinator thread */ - SRV_MASTER /*!< the master thread, (whose type - number must be biggest) */ -}; - /*********************************************************************//** Boots Innobase server. */ void @@ -766,10 +720,10 @@ Resets the info describing an i/o thread current state. */ void srv_reset_io_thread_op_info(); -/** Wake up the purge threads if there is work to do. */ +/** Wake up the purge if there is work to do. */ void srv_wake_purge_thread_if_not_active(); -/** Wake up the InnoDB master thread if it was suspended (not sleeping). */ +/** Wake up the InnoDB master thread */ void srv_active_wake_master_thread_low(); @@ -779,7 +733,7 @@ srv_active_wake_master_thread_low(); srv_active_wake_master_thread_low(); \ } \ } while (0) -/** Wake up the master thread if it is suspended or being suspended. */ +/** Wake up the master */ void srv_wake_master_thread(); @@ -832,61 +786,37 @@ srv_que_task_enqueue_low( que_thr_t* thr); /*!< in: query thread */ /**********************************************************************//** -Check whether any background thread is active. If so, return the thread -type. -@return SRV_NONE if all are are suspended or have exited, thread -type if any are still active. */ -enum srv_thread_type -srv_get_active_thread_type(void); +Check whether purge or master is active. +@return false if all are are suspended or have exited, true +if any are still active. */ +bool srv_any_background_activity(); + /*============================*/ extern "C" { -/*********************************************************************//** -A thread which prints the info output by various InnoDB monitors. 
-@return a dummy parameter */ -os_thread_ret_t -DECLARE_THREAD(srv_monitor_thread)( -/*===============================*/ - void* arg); /*!< in: a dummy parameter required by - os_thread_create */ -/*********************************************************************//** -The master thread controlling the server. -@return a dummy parameter */ -os_thread_ret_t -DECLARE_THREAD(srv_master_thread)( -/*==============================*/ - void* arg); /*!< in: a dummy parameter required by - os_thread_create */ +/** Periodic task which prints the info output by various InnoDB monitors.*/ +void srv_monitor_task(void*); + + +/** The periodic master task controlling the server. */ +void srv_master_callback(void *); + + +/** +Perform shutdown tasks such as background drop, +and optionally ibuf merge. +*/ +void srv_shutdown(bool ibuf_merge); + /************************************************************************* -A thread which prints warnings about semaphore waits which have lasted +A task which prints warnings about semaphore waits which have lasted too long. These can be used to track bugs which cause hangs. -@return a dummy parameter */ -os_thread_ret_t -DECLARE_THREAD(srv_error_monitor_thread)( -/*=====================================*/ - void* arg); /*!< in: a dummy parameter required by - os_thread_create */ +*/ +void srv_error_monitor_task(void*); -/*********************************************************************//** -Purge coordinator thread that schedules the purge tasks. -@return a dummy parameter */ -os_thread_ret_t -DECLARE_THREAD(srv_purge_coordinator_thread)( -/*=========================================*/ - void* arg MY_ATTRIBUTE((unused))); /*!< in: a dummy parameter - required by os_thread_create */ - -/*********************************************************************//** -Worker thread that reads tasks from the work queue and executes them. 
-@return a dummy parameter */ -os_thread_ret_t -DECLARE_THREAD(srv_worker_thread)( -/*==============================*/ - void* arg MY_ATTRIBUTE((unused))); /*!< in: a dummy parameter - required by os_thread_create */ } /* extern "C" */ /**********************************************************************//** @@ -896,12 +826,6 @@ ulint srv_get_task_queue_length(void); /*===========================*/ -/** Ensure that a given number of threads of the type given are running -(or are already terminated). -@param[in] type thread type -@param[in] n number of threads that have to run */ -void -srv_release_threads(enum srv_thread_type type, ulint n); /** Wakeup the purge threads. */ void @@ -910,6 +834,12 @@ srv_purge_wakeup(); /** Shut down the purge threads. */ void srv_purge_shutdown(); +/** Init purge tasks*/ +void srv_init_purge_tasks(uint n_max); + +/** Shut down purge tasks*/ +void srv_shutdown_purge_tasks(); + #ifdef UNIV_DEBUG /** Disables master thread. It's used by: SET GLOBAL innodb_master_thread_disabled_debug = 1 (0). @@ -1074,8 +1004,6 @@ struct export_var_t{ /** Thread slot in the thread table. */ struct srv_slot_t{ - srv_thread_type type; /*!< thread type: user, - utility etc. 
*/ ibool in_use; /*!< TRUE if this slot is in use */ ibool suspended; /*!< TRUE if the thread is @@ -1099,22 +1027,29 @@ struct srv_slot_t{ to do */ que_thr_t* thr; /*!< suspended query thread (only used for user threads) */ -#ifdef UNIV_DEBUG - struct debug_sync_t { - UT_LIST_NODE_T(debug_sync_t) debug_sync_list; - }; - UT_LIST_BASE_NODE_T(debug_sync_t) debug_sync; - rw_lock_t debug_sync_lock; -#endif }; -#ifdef UNIV_DEBUG -typedef void srv_slot_callback_t(srv_slot_t*, const void*); +extern tpool::thread_pool *srv_thread_pool; +extern std::unique_ptr<tpool::timer> srv_master_timer; +extern std::unique_ptr<tpool::timer> srv_error_monitor_timer; +extern std::unique_ptr<tpool::timer> srv_monitor_timer; -void srv_for_each_thread(srv_thread_type type, - srv_slot_callback_t callback, - const void *arg); -#endif +#define SRV_MONITOR_TIMER_PERIOD 5000 +static inline void srv_monitor_timer_schedule_now() +{ + srv_monitor_timer->set_time(0, SRV_MONITOR_TIMER_PERIOD); +} +static inline void srv_start_periodic_timer( + std::unique_ptr<tpool::timer>& timer, + void (*func)(void*), + int period) +{ + timer.reset(srv_thread_pool->create_timer(func)); + timer->set_time(0, period); +} + +void srv_thread_pool_init(); +void srv_thread_pool_end(); #ifdef WITH_WSREP UNIV_INTERN diff --git a/storage/innobase/include/srv0start.h b/storage/innobase/include/srv0start.h index 580c48bd02c..79add0b72c3 100644 --- a/storage/innobase/include/srv0start.h +++ b/storage/innobase/include/srv0start.h @@ -48,6 +48,12 @@ srv_undo_tablespaces_init(bool create_new_db); @return DB_SUCCESS or error code */ dberr_t srv_start(bool create_new_db); +/** + Shutdown purge to make sure that there is no possibility that we call any + plugin code (e.g audit) inside virtual column computation. +*/ +void innodb_preshutdown(); + /** Shut down InnoDB. 
*/ void innodb_shutdown(); diff --git a/storage/innobase/include/trx0purge.h b/storage/innobase/include/trx0purge.h index 7c3343ce7d2..9f3438b7f7d 100644 --- a/storage/innobase/include/trx0purge.h +++ b/storage/innobase/include/trx0purge.h @@ -62,10 +62,6 @@ trx_purge( ulint n_purge_threads, /*!< in: number of purge tasks to submit to task queue. */ bool truncate /*!< in: truncate history if true */ -#ifdef UNIV_DEBUG - , srv_slot_t *slot /*!< in/out: purge coordinator - thread slot */ -#endif ); /** Rollback segements from a given transaction with trx-no @@ -144,14 +140,11 @@ private: class purge_sys_t { public: - /** signal state changes; os_event_reset() and os_event_set() - are protected by rw_lock_x_lock(latch) */ - MY_ALIGNED(CACHE_LINE_SIZE) - os_event_t event; /** latch protecting view, m_enabled */ MY_ALIGNED(CACHE_LINE_SIZE) rw_lock_t latch; private: + bool m_initialized; /** whether purge is enabled; protected by latch and std::atomic */ std::atomic<bool> m_enabled; /** number of pending stop() calls without resume() */ @@ -162,9 +155,6 @@ public: MY_ALIGNED(CACHE_LINE_SIZE) ReadView view; /*!< The purge will not remove undo logs which are >= this view (purge view) */ - /** Number of not completed tasks. Accessed by srv_purge_coordinator - and srv_worker_thread by std::atomic. */ - std::atomic<ulint> n_tasks; /** Iterator to the undo log records of committed transactions */ struct iterator @@ -234,7 +224,7 @@ public: uninitialised. Real initialisation happens in create(). 
*/ - purge_sys_t() : event(NULL), m_enabled(false), n_tasks(0) {} + purge_sys_t():m_initialized(false),m_enabled(false) {} /** Create the instance */ diff --git a/storage/innobase/lock/lock0lock.cc b/storage/innobase/lock/lock0lock.cc index d01fd73892e..77fee83aef2 100644 --- a/storage/innobase/lock/lock0lock.cc +++ b/storage/innobase/lock/lock0lock.cc @@ -461,7 +461,6 @@ void lock_sys_t::create(ulint n_cells) mutex_create(LATCH_ID_LOCK_SYS_WAIT, &wait_mutex); - timeout_event = os_event_create(0); rec_hash = hash_create(n_cells); prdt_hash = hash_create(n_cells); @@ -471,6 +470,7 @@ void lock_sys_t::create(ulint n_cells) lock_latest_err_file = os_file_create_tmpfile(); ut_a(lock_latest_err_file); } + timeout_timer_active = false; } /** Calculates the fold value of a lock: used in migrating the hash table. @@ -560,8 +560,6 @@ void lock_sys_t::close() hash_table_free(prdt_hash); hash_table_free(prdt_page_hash); - os_event_destroy(timeout_event); - mutex_destroy(&mutex); mutex_destroy(&wait_mutex); @@ -6290,14 +6288,6 @@ lock_trx_lock_list_init( UT_LIST_INIT(*lock_list, &lock_t::trx_locks); } -/*******************************************************************//** -Set the lock system timeout event. */ -void -lock_set_timeout_event() -/*====================*/ -{ - os_event_set(lock_sys.timeout_event); -} #ifdef UNIV_DEBUG /*******************************************************************//** diff --git a/storage/innobase/lock/lock0wait.cc b/storage/innobase/lock/lock0wait.cc index 94104172577..c98cb299534 100644 --- a/storage/innobase/lock/lock0wait.cc +++ b/storage/innobase/lock/lock0wait.cc @@ -36,6 +36,7 @@ Created 25/5/2010 Sunny Bains #include "row0mysql.h" #include "srv0start.h" #include "lock0priv.h" +#include "srv0srv.h" /*********************************************************************//** Print the contents of the lock_sys_t::waiting_threads array. 
*/ @@ -51,10 +52,9 @@ lock_wait_table_print(void) for (ulint i = 0; i < srv_max_n_threads; i++, ++slot) { fprintf(stderr, - "Slot %lu: thread type %lu," + "Slot %lu:" " in use %lu, susp %lu, timeout %lu, time %lu\n", (ulong) i, - (ulong) slot->type, (ulong) slot->in_use, (ulong) slot->suspended, slot->wait_timeout, @@ -164,7 +164,10 @@ lock_wait_table_reserve_slot( ut_ad(lock_sys.last_slot <= lock_sys.waiting_threads + srv_max_n_threads); - + if (!lock_sys.timeout_timer_active) { + lock_sys.timeout_timer_active = true; + lock_sys.timeout_timer->set_time(1000, 0); + } return(slot); } } @@ -211,7 +214,7 @@ wsrep_is_BF_lock_timeout( srv_print_innodb_monitor = TRUE; srv_print_innodb_lock_monitor = TRUE; - os_event_set(srv_monitor_event); + srv_monitor_timer_schedule_now(); return true; } return false; @@ -235,6 +238,7 @@ lock_wait_suspend_thread( ibool was_declared_inside_innodb; ulong lock_wait_timeout; + ut_a(lock_sys.timeout_timer.get()); trx = thr_get_trx(thr); if (trx->mysql_thd != 0) { @@ -497,67 +501,33 @@ lock_wait_check_and_cancel( } } -/*********************************************************************//** -A thread which wakes up threads whose lock wait may have lasted too long. 
-@return a dummy parameter */ -extern "C" -os_thread_ret_t -DECLARE_THREAD(lock_wait_timeout_thread)(void*) +/** Task that is periodically runs in the thread pool*/ +void lock_wait_timeout_task(void*) { - int64_t sig_count = 0; - os_event_t event = lock_sys.timeout_event; - - ut_ad(!srv_read_only_mode); - -#ifdef UNIV_PFS_THREAD - pfs_register_thread(srv_lock_timeout_thread_key); -#endif /* UNIV_PFS_THREAD */ - - do { - srv_slot_t* slot; - - /* When someone is waiting for a lock, we wake up every second - and check if a timeout has passed for a lock wait */ - - os_event_wait_time_low(event, 1000000, sig_count); - sig_count = os_event_reset(event); - - if (srv_shutdown_state >= SRV_SHUTDOWN_CLEANUP) { - break; - } - - lock_wait_mutex_enter(); - - /* Check all slots for user threads that are waiting - on locks, and if they have exceeded the time limit. */ - - for (slot = lock_sys.waiting_threads; - slot < lock_sys.last_slot; - ++slot) { - - /* We are doing a read without the lock mutex - and/or the trx mutex. This is OK because a slot - can't be freed or reserved without the lock wait - mutex. */ + lock_wait_mutex_enter(); - if (slot->in_use) { - lock_wait_check_and_cancel(slot); - } + /* Check all slots for user threads that are waiting + on locks, and if they have exceeded the time limit. */ + bool any_slot_in_use = false; + for (srv_slot_t* slot = lock_sys.waiting_threads; + slot < lock_sys.last_slot; + ++slot) { + + /* We are doing a read without the lock mutex + and/or the trx mutex. This is OK because a slot + can't be freed or reserved without the lock wait + mutex. */ + + if (slot->in_use) { + any_slot_in_use = true; + lock_wait_check_and_cancel(slot); } - - sig_count = os_event_reset(event); - - lock_wait_mutex_exit(); - - } while (srv_shutdown_state < SRV_SHUTDOWN_CLEANUP); - - lock_sys.timeout_thread_active = false; - - /* We count the number of threads in os_thread_exit(). A created - thread should always use that to exit and not use return() to exit. 
*/ - - os_thread_exit(); - - OS_THREAD_DUMMY_RETURN; + } + if (any_slot_in_use) { + lock_sys.timeout_timer->set_time(1000, 0); + } else { + lock_sys.timeout_timer_active = false; + } + lock_wait_mutex_exit(); } diff --git a/storage/innobase/log/log0log.cc b/storage/innobase/log/log0log.cc index 1fee4d70411..609e278e835 100644 --- a/storage/innobase/log/log0log.cc +++ b/storage/innobase/log/log0log.cc @@ -53,6 +53,7 @@ Created 12/9/1995 Heikki Tuuri #include "trx0roll.h" #include "srv0mon.h" #include "sync0sync.h" +#include "buf0dump.h" /* General philosophy of InnoDB redo-logs: @@ -1495,6 +1496,7 @@ log_check_margins(void) } while (check); } +extern void buf_resize_shutdown(); /****************************************************************//** Makes a checkpoint at the latest lsn and writes it to first page of each data file in the database, so that we know that the file spaces contain @@ -1511,26 +1513,37 @@ logs_empty_and_mark_files_at_shutdown(void) /* Wait until the master thread and all other operations are idle: our algorithm only works if the server is idle at shutdown */ + bool do_srv_shutdown = false; + if (srv_master_timer) { + do_srv_shutdown = srv_fast_shutdown < 2; + srv_master_timer.reset(); + } + + /* Wait for the end of the buffer resize task.*/ + buf_resize_shutdown(); + dict_stats_shutdown(); + btr_defragment_shutdown(); srv_shutdown_state = SRV_SHUTDOWN_CLEANUP; + + if (srv_buffer_pool_dump_at_shutdown && + !srv_read_only_mode && srv_fast_shutdown < 2) { + buf_dump_start(); + } + srv_error_monitor_timer.reset(); + srv_monitor_timer.reset(); + lock_sys.timeout_timer.reset(); + if (do_srv_shutdown) { + srv_shutdown(srv_fast_shutdown == 0); + } + + loop: ut_ad(lock_sys.is_initialised() || !srv_was_started); ut_ad(log_sys.is_initialised() || !srv_was_started); ut_ad(fil_system.is_initialised() || !srv_was_started); - os_event_set(srv_buf_resize_event); if (!srv_read_only_mode) { - os_event_set(srv_error_event); - os_event_set(srv_monitor_event); - 
os_event_set(srv_buf_dump_event); - if (lock_sys.timeout_thread_active) { - os_event_set(lock_sys.timeout_event); - } - if (dict_stats_event) { - os_event_set(dict_stats_event); - } else { - ut_ad(!srv_dict_stats_thread_active); - } if (recv_sys.flush_start) { /* This is in case recv_writer_thread was never started, or buf_flush_page_cleaner_coordinator @@ -1570,23 +1583,7 @@ loop: /* We need these threads to stop early in shutdown. */ const char* thread_name; - if (srv_error_monitor_active) { - thread_name = "srv_error_monitor_thread"; - } else if (srv_monitor_active) { - thread_name = "srv_monitor_thread"; - } else if (srv_buf_resize_thread_active) { - thread_name = "buf_resize_thread"; - goto wait_suspend_loop; - } else if (srv_dict_stats_thread_active) { - thread_name = "dict_stats_thread"; - } else if (lock_sys.timeout_thread_active) { - thread_name = "lock_wait_timeout_thread"; - } else if (srv_buf_dump_thread_active) { - thread_name = "buf_dump_thread"; - goto wait_suspend_loop; - } else if (btr_defragment_thread_active) { - thread_name = "btr_defragment_thread"; - } else if (srv_fast_shutdown != 2 && trx_rollback_is_active) { + if (srv_fast_shutdown != 2 && trx_rollback_is_active) { thread_name = "rollback of recovered transactions"; } else { thread_name = NULL; @@ -1608,26 +1605,17 @@ wait_suspend_loop: /* Check that the background threads are suspended */ - switch (srv_get_active_thread_type()) { - case SRV_NONE: - if (!srv_n_fil_crypt_threads_started) { - srv_shutdown_state = SRV_SHUTDOWN_FLUSH_PHASE; - break; - } + ut_a(!srv_any_background_activity()); + if (srv_n_fil_crypt_threads_started) { os_event_set(fil_crypt_threads_event); thread_name = "fil_crypt_thread"; goto wait_suspend_loop; - case SRV_PURGE: - case SRV_WORKER: - ut_ad(!"purge was not shut down"); - srv_purge_wakeup(); - thread_name = "purge thread"; - goto wait_suspend_loop; - case SRV_MASTER: - thread_name = "master thread"; - goto wait_suspend_loop; } + buf_load_dump_end(); + + 
srv_shutdown_state = SRV_SHUTDOWN_FLUSH_PHASE; + /* At this point only page_cleaner should be active. We wait here to let it complete the flushing of the buffer pools before proceeding further. */ @@ -1740,7 +1728,7 @@ wait_suspend_loop: srv_shutdown_state = SRV_SHUTDOWN_LAST_PHASE; /* Make some checks that the server really is quiet */ - ut_a(srv_get_active_thread_type() == SRV_NONE); + ut_a(!srv_any_background_activity()); service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL, "Free innodb buffer pool"); @@ -1768,7 +1756,7 @@ wait_suspend_loop: fil_close_all_files(); /* Make some checks that the server really is quiet */ - ut_a(srv_get_active_thread_type() == SRV_NONE); + ut_a(!srv_any_background_activity()); ut_a(lsn == log_sys.lsn || srv_force_recovery == SRV_FORCE_NO_LOG_REDO); diff --git a/storage/innobase/os/os0file.cc b/storage/innobase/os/os0file.cc index f9028122f8d..361e34549c7 100644 --- a/storage/innobase/os/os0file.cc +++ b/storage/innobase/os/os0file.cc @@ -53,6 +53,7 @@ Created 10/21/1995 Heikki Tuuri #include "os0thread.h" #include <vector> +#include <tpool_structs.h> #ifdef LINUX_NATIVE_AIO #include <libaio.h> @@ -78,601 +79,80 @@ Created 10/21/1995 Heikki Tuuri #endif -/** Insert buffer segment id */ -static const ulint IO_IBUF_SEGMENT = 0; - -/** Log segment id */ -static const ulint IO_LOG_SEGMENT = 1; - -/** Number of retries for partial I/O's */ -static const ulint NUM_RETRIES_ON_PARTIAL_IO = 10; - -/* This specifies the file permissions InnoDB uses when it creates files in -Unix; the value of os_innodb_umask is initialized in ha_innodb.cc to -my_umask */ - -#ifndef _WIN32 -/** Umask for creating files */ -static ulint os_innodb_umask = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP; -#else -/** Umask for creating files */ -static ulint os_innodb_umask = 0; -static HANDLE data_completion_port; -static HANDLE log_completion_port; - -static DWORD fls_sync_io = FLS_OUT_OF_INDEXES; -#define IOCP_SHUTDOWN_KEY (ULONG_PTR)-1 -#endif /* _WIN32 */ - 
-/** In simulated aio, merge at most this many consecutive i/os */ -static const ulint OS_AIO_MERGE_N_CONSECUTIVE = 64; - -/** Flag indicating if the page_cleaner is in active state. */ -extern bool buf_page_cleaner_is_active; - -#ifdef WITH_INNODB_DISALLOW_WRITES -#define WAIT_ALLOW_WRITES() os_event_wait(srv_allow_writes_event) -#else -#define WAIT_ALLOW_WRITES() do { } while (0) -#endif /* WITH_INNODB_DISALLOW_WRITES */ - -/********************************************************************** - -InnoDB AIO Implementation: -========================= - -We support native AIO for Windows and Linux. For rest of the platforms -we simulate AIO by special IO-threads servicing the IO-requests. - -Simulated AIO: -============== - -On platforms where we 'simulate' AIO, the following is a rough explanation -of the high level design. -There are four io-threads (for ibuf, log, read, write). -All synchronous IO requests are serviced by the calling thread using -os_file_write/os_file_read. The Asynchronous requests are queued up -in an array (there are four such arrays) by the calling thread. -Later these requests are picked up by the IO-thread and are serviced -synchronously. - -Windows native AIO: -================== - -If srv_use_native_aio is not set then Windows follow the same -code as simulated AIO. If the flag is set then native AIO interface -is used. On windows, one of the limitation is that if a file is opened -for AIO no synchronous IO can be done on it. Therefore we have an -extra fifth array to queue up synchronous IO requests. -There are innodb_file_io_threads helper threads. These threads work -on the four arrays mentioned above in Simulated AIO. No thread is -required for the sync array. -If a synchronous IO request is made, it is first queued in the sync -array. Then the calling thread itself waits on the request, thus -making the call synchronous. -If an AIO request is made the calling thread not only queues it in the -array but also submits the requests. 
The helper thread then collects -the completed IO request and calls completion routine on it. - -Linux native AIO: -================= - -If we have libaio installed on the system and innodb_use_native_aio -is set to true we follow the code path of native AIO, otherwise we -do simulated AIO. -There are innodb_file_io_threads helper threads. These threads work -on the four arrays mentioned above in Simulated AIO. -If a synchronous IO request is made, it is handled by calling -os_file_write/os_file_read. -If an AIO request is made the calling thread not only queues it in the -array but also submits the requests. The helper thread then collects -the completed IO request and calls completion routine on it. - -**********************************************************************/ - - -#ifdef UNIV_PFS_IO -/* Keys to register InnoDB I/O with performance schema */ -mysql_pfs_key_t innodb_data_file_key; -mysql_pfs_key_t innodb_log_file_key; -mysql_pfs_key_t innodb_temp_file_key; -#endif /* UNIV_PFS_IO */ - -class AIO; - -/** The asynchronous I/O context */ -struct Slot { - -#ifdef WIN_ASYNC_IO - /** Windows control block for the aio request - must be at the very start of Slot, so we can - cast Slot* to OVERLAPPED* - */ - OVERLAPPED control; -#endif - - /** index of the slot in the aio array */ - uint16_t pos; - - /** true if this slot is reserved */ - bool is_reserved; - - /** time when reserved */ - time_t reservation_time; - - /** buffer used in i/o */ - byte* buf; - - /** Buffer pointer used for actual IO. 
We advance this - when partial IO is required and not buf */ - byte* ptr; - - /** OS_FILE_READ or OS_FILE_WRITE */ - IORequest type; - - /** file offset in bytes */ - os_offset_t offset; - - /** file where to read or write */ - pfs_os_file_t file; - - /** file name or path */ - const char* name; - - /** used only in simulated aio: true if the physical i/o - already made and only the slot message needs to be passed - to the caller of os_aio_simulated_handle */ - bool io_already_done; - - /*!< file block size */ - ulint file_block_size; - - /** The file node for which the IO is requested. */ - fil_node_t* m1; - - /** the requester of an aio operation and which can be used - to identify which pending aio operation was completed */ - void* m2; - - /** AIO completion status */ - dberr_t err; - -#ifdef WIN_ASYNC_IO - - /** bytes written/read */ - DWORD n_bytes; - - /** length of the block to read or write */ - DWORD len; - - /** aio array containing this slot */ - AIO *array; -#elif defined(LINUX_NATIVE_AIO) - /** Linux control block for aio */ - struct iocb control; - - /** AIO return code */ - int ret; - - /** bytes written/read. */ - ssize_t n_bytes; - - /** length of the block to read or write */ - ulint len; -#else - /** length of the block to read or write */ - ulint len; - - /** bytes written/read. 
*/ - ulint n_bytes; -#endif /* WIN_ASYNC_IO */ - - /** Length of the block before it was compressed */ - uint32 original_len; - -}; - -/** The asynchronous i/o array structure */ -class AIO { +/* Per-IO operation environment*/ +class io_slots +{ +private: + tpool::cache<tpool::aiocb> m_cache; + tpool::task_group m_group; public: - /** Constructor - @param[in] id Latch ID - @param[in] n_slots Number of slots to configure - @param[in] segments Number of segments to configure */ - AIO(latch_id_t id, ulint n_slots, ulint segments); - - /** Destructor */ - ~AIO(); - - /** Initialize the instance - @return DB_SUCCESS or error code */ - dberr_t init(); - - /** Requests for a slot in the aio array. If no slot is available, waits - until not_full-event becomes signaled. - - @param[in] type IO context - @param[in,out] m1 message to be passed along with the AIO - operation - @param[in,out] m2 message to be passed along with the AIO - operation - @param[in] file file handle - @param[in] name name of the file or path as a null-terminated - string - @param[in,out] buf buffer where to read or from which to write - @param[in] offset file offset, where to read from or start writing - @param[in] len length of the block to read or write - @return pointer to slot */ - Slot* reserve_slot( - const IORequest& type, - fil_node_t* m1, - void* m2, - pfs_os_file_t file, - const char* name, - void* buf, - os_offset_t offset, - ulint len) - MY_ATTRIBUTE((warn_unused_result)); - - /** @return number of reserved slots */ - ulint pending_io_count() const; - - /** Returns a pointer to the nth slot in the aio array. 
- @param[in] index Index of the slot in the array - @return pointer to slot */ - const Slot* at(ulint i) const - MY_ATTRIBUTE((warn_unused_result)) + io_slots(int max_submitted_io, int max_callback_concurrency) : + m_cache(max_submitted_io), + m_group(max_callback_concurrency) { - ut_a(i < m_slots.size()); - - return(&m_slots[i]); } - - /** Non const version */ - Slot* at(ulint i) - MY_ATTRIBUTE((warn_unused_result)) + /* Get cached AIO control block */ + tpool::aiocb* acquire() { - ut_a(i < m_slots.size()); - - return(&m_slots[i]); + return m_cache.get(); } - - /** Frees a slot in the AIO array, assumes caller owns the mutex. - @param[in,out] slot Slot to release */ - void release(Slot* slot); - - /** Frees a slot in the AIO array, assumes caller doesn't own the mutex. - @param[in,out] slot Slot to release */ - void release_with_mutex(Slot* slot); - - /** Prints info about the aio array. - @param[in,out] file Where to print */ - void print(FILE* file); - - /** @return the number of slots per segment */ - ulint slots_per_segment() const - MY_ATTRIBUTE((warn_unused_result)) - { - return(m_slots.size() / m_n_segments); - } - - /** @return accessor for n_segments */ - ulint get_n_segments() const - MY_ATTRIBUTE((warn_unused_result)) + /* Release AIO control block back to cache */ + void release(tpool::aiocb* aiocb) { - return(m_n_segments); + m_cache.put(aiocb); } -#ifdef UNIV_DEBUG - /** @return true if the thread owns the mutex */ - bool is_mutex_owned() const - MY_ATTRIBUTE((warn_unused_result)) + bool contains(tpool::aiocb* aiocb) { - return(mutex_own(&m_mutex)); + return m_cache.contains(aiocb); } -#endif /* UNIV_DEBUG */ - /** Acquire the mutex */ - void acquire() const + /* Wait for completions of all AIO operations */ + void wait() { - mutex_enter(&m_mutex); + m_cache.wait(); } - /** Release the mutex */ - void release() const - { - mutex_exit(&m_mutex); - } - - /** Write out the state to the file/stream - @param[in, out] file File to write to */ - void 
to_file(FILE* file) const; - -#ifdef LINUX_NATIVE_AIO - /** Dispatch an AIO request to the kernel. - @param[in,out] slot an already reserved slot - @return true on success. */ - bool linux_dispatch(Slot* slot) - MY_ATTRIBUTE((warn_unused_result)); - - /** Accessor for an AIO event - @param[in] index Index into the array - @return the event at the index */ - io_event* io_events(ulint index) - MY_ATTRIBUTE((warn_unused_result)) + tpool::task_group* get_task_group() { - ut_a(index < m_events.size()); - - return(&m_events[index]); + return &m_group; } - /** Accessor for the AIO context - @param[in] segment Segment for which to get the context - @return the AIO context for the segment */ - io_context* io_ctx(ulint segment) - MY_ATTRIBUTE((warn_unused_result)) + ~io_slots() { - ut_ad(segment < get_n_segments()); - - return(m_aio_ctx[segment]); + wait(); } +}; - /** Creates an io_context for native linux AIO. - @param[in] max_events number of events - @param[out] io_ctx io_ctx to initialize. - @return true on success. */ - static bool linux_create_io_ctx(unsigned max_events, io_context_t* io_ctx) - MY_ATTRIBUTE((warn_unused_result)); +io_slots* read_slots; +io_slots* write_slots; +io_slots* ibuf_slots; - /** Checks if the system supports native linux aio. On some kernel - versions where native aio is supported it won't work on tmpfs. In such - cases we can't use native aio as it is not possible to mix simulated - and native aio. - @return true if supported, false otherwise. 
*/ - static bool is_linux_native_aio_supported() - MY_ATTRIBUTE((warn_unused_result)); -#endif /* LINUX_NATIVE_AIO */ +/** Number of retries for partial I/O's */ +static const ulint NUM_RETRIES_ON_PARTIAL_IO = 10; -#ifdef WIN_ASYNC_IO - HANDLE m_completion_port; - /** Wake up all AIO threads in Windows native aio */ - static void wake_at_shutdown() { - AIO *all_arrays[] = {s_reads, s_writes, s_log, s_ibuf }; - for (size_t i = 0; i < array_elements(all_arrays); i++) { - AIO *a = all_arrays[i]; - if (a) { - PostQueuedCompletionStatus(a->m_completion_port, 0, - IOCP_SHUTDOWN_KEY, 0); - } - } - } -#endif /* WIN_ASYNC_IO */ +/* This specifies the file permissions InnoDB uses when it creates files in +Unix; the value of os_innodb_umask is initialized in ha_innodb.cc to +my_umask */ -#ifdef _WIN32 - /** This function can be called if one wants to post a batch of reads - and prefers an I/O - handler thread to handle them all at once later.You - must call os_aio_simulated_wake_handler_threads later to ensure the - threads are not left sleeping! */ - static void simulated_put_read_threads_to_sleep(); +#ifndef _WIN32 +/** Umask for creating files */ +static ulint os_innodb_umask = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP; +#else +/** Umask for creating files */ +static ulint os_innodb_umask = 0; #endif /* _WIN32 */ - /** Create an instance using new(std::nothrow) - @param[in] id Latch ID - @param[in] n_slots The number of AIO request slots - @param[in] segments The number of segments - @return a new AIO instance */ - static AIO* create( - latch_id_t id, - ulint n_slots, - ulint segments) - MY_ATTRIBUTE((warn_unused_result)); - - /** Initializes the asynchronous io system. Creates one array each - for ibuf and log I/O. Also creates one array each for read and write - where each array is divided logically into n_readers and n_writers - respectively. The caller must create an i/o handler thread for each - segment in these arrays. This function also creates the sync array. 
- No I/O handler thread needs to be created for that - @param[in] n_per_seg maximum number of pending aio - operations allowed per segment - @param[in] n_readers number of reader threads - @param[in] n_writers number of writer threads - @param[in] n_slots_sync number of slots in the sync aio array - @return true if AIO sub-system was started successfully */ - static bool start( - ulint n_per_seg, - ulint n_readers, - ulint n_writers, - ulint n_slots_sync) - MY_ATTRIBUTE((warn_unused_result)); - - /** Free the AIO arrays */ - static void shutdown(); - - /** Print all the AIO segments - @param[in,out] file Where to print */ - static void print_all(FILE* file); - - /** Calculates local segment number and aio array from global - segment number. - @param[out] array AIO wait array - @param[in] segment global segment number - @return local segment number within the aio array */ - static ulint get_array_and_local_segment( - AIO** array, - ulint segment) - MY_ATTRIBUTE((warn_unused_result)); - - /** Select the IO slot array - @param[in,out] type Type of IO, READ or WRITE - @param[in] read_only true if running in read-only mode - @param[in] mode IO mode - @return slot array or NULL if invalid mode specified */ - static AIO* select_slot_array( - IORequest& type, - bool read_only, - ulint mode) - MY_ATTRIBUTE((warn_unused_result)); - - /** Calculates segment number for a slot. - @param[in] array AIO wait array - @param[in] slot slot in this array - @return segment number (which is the number used by, for example, - I/O handler threads) */ - static ulint get_segment_no_from_slot( - const AIO* array, - const Slot* slot) - MY_ATTRIBUTE((warn_unused_result)); - - /** Wakes up a simulated AIO I/O-handler thread if it has something - to do. 
- @param[in] global_segment the number of the segment in the - AIO arrays */ - static void wake_simulated_handler_thread(ulint global_segment); - - /** Check if it is a read request - @param[in] aio The AIO instance to check - @return true if the AIO instance is for reading. */ - static bool is_read(const AIO* aio) - MY_ATTRIBUTE((warn_unused_result)) - { - return(s_reads == aio); - } - - /** Wait on an event until no pending writes */ - static void wait_until_no_pending_writes() - { - os_event_wait(AIO::s_writes->m_is_empty); - } - - /** Print to file - @param[in] file File to write to */ - static void print_to_file(FILE* file); - - /** Check for pending IO. Gets the count and also validates the - data structures. - @return count of pending IO requests */ - static ulint total_pending_io_count(); - -private: - /** Initialise the slots - @return DB_SUCCESS or error code */ - dberr_t init_slots() - MY_ATTRIBUTE((warn_unused_result)); - - /** Wakes up a simulated AIO I/O-handler thread if it has something - to do for a local segment in the AIO array. - @param[in] global_segment the number of the segment in the - AIO arrays - @param[in] segment the local segment in the AIO array */ - void wake_simulated_handler_thread(ulint global_segment, ulint segment); - - /** Prints pending IO requests per segment of an aio array. - We probably don't need per segment statistics but they can help us - during development phase to see if the IO requests are being - distributed as expected. 
- @param[in,out] file file where to print - @param[in] segments pending IO array */ - void print_segment_info( - FILE* file, - const ulint* segments); - -#ifdef LINUX_NATIVE_AIO - /** Initialise the Linux native AIO data structures - @return DB_SUCCESS or error code */ - dberr_t init_linux_native_aio() - MY_ATTRIBUTE((warn_unused_result)); -#endif /* LINUX_NATIVE_AIO */ - -private: - typedef std::vector<Slot> Slots; - - /** the mutex protecting the aio array */ - mutable SysMutex m_mutex; - - /** Pointer to the slots in the array. - Number of elements must be divisible by n_threads. */ - Slots m_slots; - - /** Number of segments in the aio array of pending aio requests. - A thread can wait separately for any one of the segments. */ - ulint m_n_segments; - - /** The event which is set to the signaled state when - there is space in the aio outside the ibuf segment; - os_event_set() and os_event_reset() are protected by AIO::m_mutex */ - os_event_t m_not_full; - - /** The event which is set to the signaled state when - there are no pending i/os in this array; - os_event_set() and os_event_reset() are protected by AIO::m_mutex */ - os_event_t m_is_empty; - - /** Number of reserved slots in the AIO array outside - the ibuf segment */ - ulint m_n_reserved; - - -#if defined(LINUX_NATIVE_AIO) - typedef std::vector<io_event> IOEvents; - - /** completion queue for IO. There is one such queue per - segment. Each thread will work on one ctx exclusively. */ - io_context_t* m_aio_ctx; - - /** The array to collect completed IOs. There is one such - event for each possible pending IO. The size of the array - is equal to m_slots.size(). */ - IOEvents m_events; -#endif /* LINUX_NATIV_AIO */ - - /** The aio arrays for non-ibuf i/o and ibuf i/o, as well as - sync AIO. These are NULL when the module has not yet been - initialized. 
*/ - - /** Insert buffer */ - static AIO* s_ibuf; - /** Redo log */ - static AIO* s_log; - - /** Reads */ - static AIO* s_reads; - - /** Writes */ - static AIO* s_writes; - - /** Synchronous I/O */ - static AIO* s_sync; -}; - -/** Static declarations */ -AIO* AIO::s_reads; -AIO* AIO::s_writes; -AIO* AIO::s_ibuf; -AIO* AIO::s_log; -AIO* AIO::s_sync; - -#if defined(LINUX_NATIVE_AIO) -/** timeout for each io_getevents() call = 500ms. */ -static const ulint OS_AIO_REAP_TIMEOUT = 500000000UL; - -/** time to sleep, in microseconds if io_setup() returns EAGAIN. */ -static const ulint OS_AIO_IO_SETUP_RETRY_SLEEP = 500000UL; - -/** number of attempts before giving up on io_setup(). */ -static const int OS_AIO_IO_SETUP_RETRY_ATTEMPTS = 5; -#endif /* LINUX_NATIVE_AIO */ - -/** Array of events used in simulated AIO */ -static os_event_t* os_aio_segment_wait_events; +/** Flag indicating if the page_cleaner is in active state. */ +extern bool buf_page_cleaner_is_active; -/** Number of asynchronous I/O segments. Set by os_aio_init(). */ -static ulint os_aio_n_segments = ULINT_UNDEFINED; +#ifdef WITH_INNODB_DISALLOW_WRITES +#define WAIT_ALLOW_WRITES() os_event_wait(srv_allow_writes_event) +#else +#define WAIT_ALLOW_WRITES() do { } while (0) +#endif /* WITH_INNODB_DISALLOW_WRITES */ -/** If the following is true, read i/o handler threads try to -wait until a batch of new read requests have been posted */ -static bool os_aio_recommend_sleep_for_read_threads; ulint os_n_file_reads; static ulint os_bytes_read_since_printout; @@ -688,11 +168,12 @@ bool os_has_said_disk_full; /** Default Zip compression level */ extern uint page_zip_level; -/** Validates the consistency of the aio system. -@return true if ok */ -static -bool -os_aio_validate(); +#ifdef UNIV_PFS_IO +/* Keys to register InnoDB I/O with performance schema */ +mysql_pfs_key_t innodb_data_file_key; +mysql_pfs_key_t innodb_log_file_key; +mysql_pfs_key_t innodb_temp_file_key; +#endif /** Handle errors for file operations. 
@param[in] name name of a file or NULL @@ -756,30 +237,8 @@ static void os_file_handle_rename_error(const char* name, const char* new_name) } } -/** Does simulated AIO. This function should be called by an i/o-handler -thread. - -@param[in] segment The number of the segment in the aio arrays to wait - for; segment 0 is the ibuf i/o thread, segment 1 the - log i/o thread, then follow the non-ibuf read threads, - and as the last are the non-ibuf write threads -@param[out] m1 the messages passed with the AIO request; note that - also in the case where the AIO operation failed, these - output parameters are valid and can be used to restart - the operation, for example -@param[out] m2 Callback argument -@param[in] type IO context -@return DB_SUCCESS or error code */ -static -dberr_t -os_aio_simulated_handler( - ulint global_segment, - fil_node_t** m1, - void** m2, - IORequest* type); #ifdef _WIN32 -static HANDLE win_get_syncio_event(); /** Wrapper around Windows DeviceIoControl() function. @@ -803,7 +262,7 @@ os_win32_device_io_control( ) { OVERLAPPED overlapped = { 0 }; - overlapped.hEvent = win_get_syncio_event(); + overlapped.hEvent = tpool::win_get_syncio_event(); BOOL result = DeviceIoControl(handle, code, inbuf, inbuf_size, outbuf, outbuf_size, NULL, &overlapped); @@ -817,47 +276,7 @@ os_win32_device_io_control( #endif -#ifdef WIN_ASYNC_IO -/** This function is only used in Windows asynchronous i/o. -Waits for an aio operation to complete. This function is used to wait the -for completed requests. The aio array of pending requests is divided -into segments. The thread specifies which segment or slot it wants to wait -for. NOTE: this function will also take care of freeing the aio slot, -therefore no other thread is allowed to do the freeing! 
-@param[in] segment The number of the segment in the aio arrays to -wait for; segment 0 is the ibuf I/O thread, -segment 1 the log I/O thread, then follow the -non-ibuf read threads, and as the last are the -non-ibuf write threads; if this is -ULINT_UNDEFINED, then it means that sync AIO -is used, and this parameter is ignored -@param[in] pos this parameter is used only in sync AIO: -wait for the aio slot at this position -@param[out] m1 the messages passed with the AIO request; note -that also in the case where the AIO operation -failed, these output parameters are valid and -can be used to restart the operation, -for example -@param[out] m2 callback message -@param[out] type OS_FILE_WRITE or ..._READ -@return DB_SUCCESS or error code */ -static -dberr_t -os_aio_windows_handler( - ulint segment, - ulint pos, - fil_node_t** m1, - void** m2, - IORequest* type); -#endif /* WIN_ASYNC_IO */ -/** Generic AIO Handler methods. Currently handles IO post processing. */ -class AIOHandler { -public: - /** Do any post processing after a read/write - @return DB_SUCCESS or error code. */ - static dberr_t post_io_processing(Slot* slot); -}; /** Helper class for doing synchronous file IO. Currently, the objective is to hide the OS specific code, so that the higher level functions aren't @@ -890,10 +309,7 @@ public: @return the number of bytes read/written or negative value on error */ ssize_t execute(const IORequest& request); - /** Do the read/write - @param[in,out] slot The IO slot, it has the IO context - @return the number of bytes read/written or negative value on error */ - static ssize_t execute(Slot* slot); + /** Move the read/write offset up to where the partial IO succeeded. @param[in] n_bytes The number of bytes to advance */ @@ -922,67 +338,6 @@ private: os_offset_t m_offset; }; -/** Do any post processing after a read/write -@return DB_SUCCESS or error code. 
*/ -dberr_t -AIOHandler::post_io_processing(Slot* slot) -{ - ut_ad(slot->is_reserved); - - /* Total bytes read so far */ - ulint n_bytes = ulint(slot->ptr - slot->buf) + slot->n_bytes; - - return(n_bytes == slot->original_len ? DB_SUCCESS : DB_FAIL); -} - -/** Count the number of free slots -@return number of reserved slots */ -ulint -AIO::pending_io_count() const -{ - acquire(); - -#ifdef UNIV_DEBUG - ut_a(m_n_segments > 0); - ut_a(!m_slots.empty()); - - ulint count = 0; - - for (ulint i = 0; i < m_slots.size(); ++i) { - - const Slot& slot = m_slots[i]; - - if (slot.is_reserved) { - ++count; - ut_a(slot.len > 0); - } - } - - ut_a(m_n_reserved == count); -#endif /* UNIV_DEBUG */ - - ulint reserved = m_n_reserved; - - release(); - - return(reserved); -} - -#ifdef UNIV_DEBUG -/** Validates the consistency the aio system some of the time. -@return true if ok or the check was skipped */ -static -bool -os_aio_validate_skip() -{ -/** Try os_aio_validate() every this many times */ -# define OS_AIO_VALIDATE_SKIP 13 - - static Atomic_counter<uint32_t> os_aio_validate_count; - return (os_aio_validate_count++ % OS_AIO_VALIDATE_SKIP) || os_aio_validate(); -} -#endif /* UNIV_DEBUG */ - #undef USE_FILE_LOCK #ifndef _WIN32 /* On Windows, mandatory locking is used */ @@ -1026,101 +381,6 @@ os_file_lock( } #endif /* USE_FILE_LOCK */ -/** Calculates local segment number and aio array from global segment number. -@param[out] array aio wait array -@param[in] segment global segment number -@return local segment number within the aio array */ -ulint -AIO::get_array_and_local_segment( - AIO** array, - ulint segment) -{ - ulint local_segment; - ulint n_extra_segs = (srv_read_only_mode) ? 0 : 2; - - ut_a(segment < os_aio_n_segments); - - if (!srv_read_only_mode && segment < n_extra_segs) { - - /* We don't support ibuf/log IO during read only mode. 
*/ - - if (segment == IO_IBUF_SEGMENT) { - - *array = s_ibuf; - - } else if (segment == IO_LOG_SEGMENT) { - - *array = s_log; - - } else { - *array = NULL; - } - - local_segment = 0; - - } else if (segment < s_reads->m_n_segments + n_extra_segs) { - - *array = s_reads; - local_segment = segment - n_extra_segs; - - } else { - *array = s_writes; - - local_segment = segment - - (s_reads->m_n_segments + n_extra_segs); - } - - return(local_segment); -} - -/** Frees a slot in the aio array. Assumes caller owns the mutex. -@param[in,out] slot Slot to release */ -void -AIO::release(Slot* slot) -{ - ut_ad(is_mutex_owned()); - - ut_ad(slot->is_reserved); - - slot->is_reserved = false; - - --m_n_reserved; - - if (m_n_reserved == m_slots.size() - 1) { - os_event_set(m_not_full); - } - - if (m_n_reserved == 0) { - os_event_set(m_is_empty); - } - -#if defined(LINUX_NATIVE_AIO) - - if (srv_use_native_aio) { - memset(&slot->control, 0x0, sizeof(slot->control)); - slot->ret = 0; - slot->n_bytes = 0; - } else { - /* These fields should not be used if we are not - using native AIO. */ - ut_ad(slot->n_bytes == 0); - ut_ad(slot->ret == 0); - } - -#endif /* WIN_ASYNC_IO */ -} - -/** Frees a slot in the AIO array. Assumes caller doesn't own the mutex. -@param[in,out] slot Slot to release */ -void -AIO::release_with_mutex(Slot* slot) -{ - acquire(); - - release(slot); - - release(); -} /** Create a temporary file. This function is like tmpfile(3), but the temporary file is created in the in the mysql server configuration @@ -1464,7 +724,7 @@ os_file_create_subdirs_if_needed( return(success ? 
DB_SUCCESS : DB_ERROR); } -#ifndef _WIN32 + /** Do the read/write @param[in] request The IO context and type @@ -1475,14 +735,24 @@ SyncFileIO::execute(const IORequest& request) ssize_t n_bytes; if (request.is_read()) { +#ifdef _WIN32 + n_bytes = tpool::pread(m_fh, m_buf, m_n, m_offset); +#else n_bytes = pread(m_fh, m_buf, m_n, m_offset); +#endif } else { ut_ad(request.is_write()); +#ifdef _WIN32 + n_bytes = tpool::pwrite(m_fh, m_buf, m_n, m_offset); +#else n_bytes = pwrite(m_fh, m_buf, m_n, m_offset); +#endif } return(n_bytes); } + +#ifndef _WIN32 /** Free storage space associated with a section of the file. @param[in] fh Open file handle @param[in] off Starting offset (SEEK_SET) @@ -1526,732 +796,7 @@ os_file_punch_hole_posix( return(DB_IO_NO_PUNCH_HOLE); } -#if defined(LINUX_NATIVE_AIO) - -/** Linux native AIO handler */ -class LinuxAIOHandler { -public: - /** - @param[in] global_segment The global segment*/ - LinuxAIOHandler(ulint global_segment) - : - m_global_segment(global_segment) - { - /* Should never be doing Sync IO here. */ - ut_a(m_global_segment != ULINT_UNDEFINED); - - /* Find the array and the local segment. */ - - m_segment = AIO::get_array_and_local_segment( - &m_array, m_global_segment); - m_n_slots = m_array->slots_per_segment(); - } - - /** Destructor */ - ~LinuxAIOHandler() - { - // No op - } - - /** - Process a Linux AIO request - @param[out] m1 the messages passed with the - @param[out] m2 AIO request; note that in case the - AIO operation failed, these output - parameters are valid and can be used to - restart the operation. 
- @param[out] request IO context - @return DB_SUCCESS or error code */ - dberr_t poll(fil_node_t** m1, void** m2, IORequest* request); - -private: - /** Resubmit an IO request that was only partially successful - @param[in,out] slot Request to resubmit - @return DB_SUCCESS or DB_FAIL if the IO resubmit request failed */ - dberr_t resubmit(Slot* slot); - - /** Check if the AIO succeeded - @param[in,out] slot The slot to check - @return DB_SUCCESS, DB_FAIL if the operation should be retried or - DB_IO_ERROR on all other errors */ - dberr_t check_state(Slot* slot); - - /** @return true if a shutdown was detected */ - bool is_shutdown() const - { - return(srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS - && !buf_page_cleaner_is_active); - } - - /** If no slot was found then the m_array->m_mutex will be released. - @param[out] n_pending The number of pending IOs - @return NULL or a slot that has completed IO */ - Slot* find_completed_slot(ulint* n_pending); - - /** This is called from within the IO-thread. If there are no completed - IO requests in the slot array, the thread calls this function to - collect more requests from the Linux kernel. - The IO-thread waits on io_getevents(), which is a blocking call, with - a timeout value. Unless the system is very heavy loaded, keeping the - IO-thread very busy, the io-thread will spend most of its time waiting - in this function. - The IO-thread also exits in this function. It checks server status at - each wakeup and that is why we use timed wait in io_getevents(). 
*/ - void collect(); - -private: - /** Slot array */ - AIO* m_array; - - /** Number of slots inthe local segment */ - ulint m_n_slots; - - /** The local segment to check */ - ulint m_segment; - - /** The global segment */ - ulint m_global_segment; -}; - -/** Resubmit an IO request that was only partially successful -@param[in,out] slot Request to resubmit -@return DB_SUCCESS or DB_FAIL if the IO resubmit request failed */ -dberr_t -LinuxAIOHandler::resubmit(Slot* slot) -{ -#ifdef UNIV_DEBUG - /* Bytes already read/written out */ - ulint n_bytes = slot->ptr - slot->buf; - - ut_ad(m_array->is_mutex_owned()); - - ut_ad(n_bytes < slot->original_len); - ut_ad(static_cast<ulint>(slot->n_bytes) < slot->original_len - n_bytes); - /* Partial read or write scenario */ - ut_ad(slot->len >= static_cast<ulint>(slot->n_bytes)); -#endif /* UNIV_DEBUG */ - - slot->len -= slot->n_bytes; - slot->ptr += slot->n_bytes; - slot->offset += slot->n_bytes; - - /* Resetting the bytes read/written */ - slot->n_bytes = 0; - slot->io_already_done = false; - - compile_time_assert(sizeof(off_t) >= sizeof(os_offset_t)); - - struct iocb* iocb = &slot->control; - - if (slot->type.is_read()) { - - io_prep_pread( - iocb, - slot->file, - slot->ptr, - slot->len, - slot->offset); - } else { - - ut_a(slot->type.is_write()); - - io_prep_pwrite( - iocb, - slot->file, - slot->ptr, - slot->len, - slot->offset); - } - - iocb->data = slot; - - /* Resubmit an I/O request */ - int ret = io_submit(m_array->io_ctx(m_segment), 1, &iocb); - srv_stats.buffered_aio_submitted.inc(); - - if (ret < -1) { - errno = -ret; - } - - return(ret < 0 ? 
DB_IO_PARTIAL_FAILED : DB_SUCCESS); -} - -/** Check if the AIO succeeded -@param[in,out] slot The slot to check -@return DB_SUCCESS, DB_FAIL if the operation should be retried or - DB_IO_ERROR on all other errors */ -dberr_t -LinuxAIOHandler::check_state(Slot* slot) -{ - ut_ad(m_array->is_mutex_owned()); - - /* Note that it may be that there is more then one completed - IO requests. We process them one at a time. We may have a case - here to improve the performance slightly by dealing with all - requests in one sweep. */ - - srv_set_io_thread_op_info( - m_global_segment, "processing completed aio requests"); - - ut_ad(slot->io_already_done); - - dberr_t err = DB_SUCCESS; - - if (slot->ret == 0) { - - err = AIOHandler::post_io_processing(slot); - - } else { - errno = -slot->ret; - - /* os_file_handle_error does tell us if we should retry - this IO. As it stands now, we don't do this retry when - reaping requests from a different context than - the dispatcher. This non-retry logic is the same for - Windows and Linux native AIO. - We should probably look into this to transparently - re-submit the IO. */ - os_file_handle_error(slot->name, "Linux aio"); - - err = DB_IO_ERROR; - } - - return(err); -} - -/** If no slot was found then the m_array->m_mutex will be released. -@param[out] n_pending The number of pending IOs -@return NULL or a slot that has completed IO */ -Slot* -LinuxAIOHandler::find_completed_slot(ulint* n_pending) -{ - ulint offset = m_n_slots * m_segment; - - *n_pending = 0; - - m_array->acquire(); - - Slot* slot = m_array->at(offset); - - for (ulint i = 0; i < m_n_slots; ++i, ++slot) { - - if (slot->is_reserved) { - - ++*n_pending; - - if (slot->io_already_done) { - - /* Something for us to work on. - Note: We don't release the mutex. */ - return(slot); - } - } - } - - m_array->release(); - - return(NULL); -} - -/** This function is only used in Linux native asynchronous i/o. This is -called from within the io-thread. 
If there are no completed IO requests -in the slot array, the thread calls this function to collect more -requests from the kernel. -The io-thread waits on io_getevents(), which is a blocking call, with -a timeout value. Unless the system is very heavy loaded, keeping the -io-thread very busy, the io-thread will spend most of its time waiting -in this function. -The io-thread also exits in this function. It checks server status at -each wakeup and that is why we use timed wait in io_getevents(). */ -void -LinuxAIOHandler::collect() -{ - ut_ad(m_n_slots > 0); - ut_ad(m_array != NULL); - ut_ad(m_segment < m_array->get_n_segments()); - - /* Which io_context we are going to use. */ - io_context* io_ctx = m_array->io_ctx(m_segment); - - /* Starting point of the m_segment we will be working on. */ - ulint start_pos = m_segment * m_n_slots; - - /* End point. */ - ulint end_pos = start_pos + m_n_slots; - - for (;;) { - struct io_event* events; - - /* Which part of event array we are going to work on. */ - events = m_array->io_events(m_segment * m_n_slots); - - /* Initialize the events. */ - memset(events, 0, sizeof(*events) * m_n_slots); - - /* The timeout value is arbitrary. We probably need - to experiment with it a little. */ - struct timespec timeout; - - timeout.tv_sec = 0; - timeout.tv_nsec = OS_AIO_REAP_TIMEOUT; - - int ret; - - ret = io_getevents(io_ctx, 1, m_n_slots, events, &timeout); - - for (int i = 0; i < ret; ++i) { - - struct iocb* iocb; - - iocb = reinterpret_cast<struct iocb*>(events[i].obj); - ut_a(iocb != NULL); - - Slot* slot = reinterpret_cast<Slot*>(iocb->data); - - /* Some sanity checks. */ - ut_a(slot != NULL); - ut_a(slot->is_reserved); - - /* We are not scribbling previous segment. */ - ut_a(slot->pos >= start_pos); - - /* We have not overstepped to next segment. */ - ut_a(slot->pos < end_pos); - - /* Deallocate unused blocks from file system. 
- This is newer done to page 0 or to log files.*/ - if (slot->offset > 0 - && !slot->type.is_log() - && slot->type.is_write() - && slot->type.punch_hole()) { - - slot->err = slot->type.punch_hole( - slot->file, - slot->offset, slot->len); - } else { - slot->err = DB_SUCCESS; - } - - /* Mark this request as completed. The error handling - will be done in the calling function. */ - m_array->acquire(); - - /* events[i].res2 should always be ZERO */ - ut_ad(events[i].res2 == 0); - slot->io_already_done = true; - - /*Even though events[i].res is an unsigned number - in libaio, it is used to return a negative value - (negated errno value) to indicate error and a positive - value to indicate number of bytes read or written. */ - - if (events[i].res > slot->len) { - /* failure */ - slot->n_bytes = 0; - slot->ret = events[i].res; - } else { - /* success */ - slot->n_bytes = events[i].res; - slot->ret = 0; - } - m_array->release(); - } - - if (srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS - || !buf_page_cleaner_is_active - || ret > 0) { - - break; - } - - /* This error handling is for any error in collecting the - IO requests. The errors, if any, for any particular IO - request are simply passed on to the calling routine. */ - - switch (ret) { - case -EAGAIN: - /* Not enough resources! Try again. */ - - case -EINTR: - /* Interrupted! The behaviour in case of an interrupt. - If we have some completed IOs available then the - return code will be the number of IOs. We get EINTR - only if there are no completed IOs and we have been - interrupted. */ - - case 0: - /* No pending request! Go back and check again. */ - - continue; - } - - /* All other errors should cause a trap for now. 
*/ - ib::fatal() - << "Unexpected ret_code[" << ret - << "] from io_getevents()!"; - - break; - } -} - -/** Process a Linux AIO request -@param[out] m1 the messages passed with the -@param[out] m2 AIO request; note that in case the - AIO operation failed, these output - parameters are valid and can be used to - restart the operation. -@param[out] request IO context -@return DB_SUCCESS or error code */ -dberr_t -LinuxAIOHandler::poll(fil_node_t** m1, void** m2, IORequest* request) -{ - dberr_t err = DB_SUCCESS; - Slot* slot; - - /* Loop until we have found a completed request. */ - for (;;) { - - ulint n_pending; - - slot = find_completed_slot(&n_pending); - - if (slot != NULL) { - - ut_ad(m_array->is_mutex_owned()); - - err = check_state(slot); - - /* DB_FAIL is not a hard error, we should retry */ - if (err != DB_FAIL) { - break; - } - - /* Partial IO, resubmit request for - remaining bytes to read/write */ - err = resubmit(slot); - - if (err != DB_SUCCESS) { - break; - } - - m_array->release(); - - } else if (is_shutdown() && n_pending == 0) { - - /* There is no completed request. If there is - no pending request at all, and the system is - being shut down, exit. */ - - *m1 = NULL; - *m2 = NULL; - - return(DB_SUCCESS); - - } else { - - /* Wait for some request. Note that we return - from wait if we have found a request. */ - - srv_set_io_thread_op_info( - m_global_segment, - "waiting for completed aio requests"); - - collect(); - } - } - - if (err == DB_IO_PARTIAL_FAILED) { - /* Aborting in case of submit failure */ - ib::fatal() - << "Native Linux AIO interface. " - "io_submit() call failed when " - "resubmitting a partial I/O " - "request on the file " << slot->name - << "."; - } - - *m1 = slot->m1; - *m2 = slot->m2; - - *request = slot->type; - - m_array->release(slot); - - m_array->release(); - - return(err); -} - -/** This function is only used in Linux native asynchronous i/o. -Waits for an aio operation to complete. 
This function is used to wait for -the completed requests. The aio array of pending requests is divided -into segments. The thread specifies which segment or slot it wants to wait -for. NOTE: this function will also take care of freeing the aio slot, -therefore no other thread is allowed to do the freeing! - -@param[in] global_seg segment number in the aio array - to wait for; segment 0 is the ibuf - i/o thread, segment 1 is log i/o thread, - then follow the non-ibuf read threads, - and the last are the non-ibuf write - threads. -@param[out] m1 the messages passed with the -@param[out] m2 AIO request; note that in case the - AIO operation failed, these output - parameters are valid and can be used to - restart the operation. -@param[out]xi request IO context -@return DB_SUCCESS if the IO was successful */ -static -dberr_t -os_aio_linux_handler( - ulint global_segment, - fil_node_t** m1, - void** m2, - IORequest* request) -{ - return LinuxAIOHandler(global_segment).poll(m1, m2, request); -} - -/** Dispatch an AIO request to the kernel. -@param[in,out] slot an already reserved slot -@return true on success. */ -bool -AIO::linux_dispatch(Slot* slot) -{ - ut_a(slot->is_reserved); - ut_ad(slot->type.validate()); - - /* Find out what we are going to work with. - The iocb struct is directly in the slot. - The io_context is one per segment. */ - - ulint io_ctx_index; - struct iocb* iocb = &slot->control; - - io_ctx_index = (slot->pos * m_n_segments) / m_slots.size(); - - int ret = io_submit(m_aio_ctx[io_ctx_index], 1, &iocb); - srv_stats.buffered_aio_submitted.inc(); - - /* io_submit() returns number of successfully queued requests - or -errno. */ - - if (ret != 1) { - errno = -ret; - } - - return(ret == 1); -} - -/** Creates an io_context for native linux AIO. -@param[in] max_events number of events -@param[out] io_ctx io_ctx to initialize. -@return true on success. 
*/ -bool -AIO::linux_create_io_ctx( - unsigned max_events, - io_context_t* io_ctx) -{ - ssize_t n_retries = 0; - - for (;;) { - - memset(io_ctx, 0x0, sizeof(*io_ctx)); - - /* Initialize the io_ctx. Tell it how many pending - IO requests this context will handle. */ - - int ret = io_setup(max_events, io_ctx); - - if (ret == 0) { - /* Success. Return now. */ - return(true); - } - - /* If we hit EAGAIN we'll make a few attempts before failing. */ - - switch (ret) { - case -EAGAIN: - if (n_retries == 0) { - /* First time around. */ - ib::warn() - << "io_setup() failed with EAGAIN." - " Will make " - << OS_AIO_IO_SETUP_RETRY_ATTEMPTS - << " attempts before giving up."; - } - - if (n_retries < OS_AIO_IO_SETUP_RETRY_ATTEMPTS) { - - ++n_retries; - - ib::warn() - << "io_setup() attempt " - << n_retries << "."; - - os_thread_sleep(OS_AIO_IO_SETUP_RETRY_SLEEP); - - continue; - } - - /* Have tried enough. Better call it a day. */ - ib::error() - << "io_setup() failed with EAGAIN after " - << OS_AIO_IO_SETUP_RETRY_ATTEMPTS - << " attempts."; - break; - - case -ENOSYS: - ib::error() - << "Linux Native AIO interface" - " is not supported on this platform. Please" - " check your OS documentation and install" - " appropriate binary of InnoDB."; - - break; - - default: - ib::error() - << "Linux Native AIO setup" - << " returned following error[" - << ret << "]"; - break; - } - - ib::info() - << "You can disable Linux Native AIO by" - " setting innodb_use_native_aio = 0 in my.cnf"; - - break; - } - - return(false); -} - -/** Checks if the system supports native linux aio. On some kernel -versions where native aio is supported it won't work on tmpfs. In such -cases we can't use native aio as it is not possible to mix simulated -and native aio. -@return: true if supported, false otherwise. */ -bool -AIO::is_linux_native_aio_supported() -{ - File fd; - io_context_t io_ctx; - char name[1000]; - - if (!linux_create_io_ctx(1, &io_ctx)) { - - /* The platform does not support native aio. 
*/ - - return(false); - - } else if (!srv_read_only_mode) { - - /* Now check if tmpdir supports native aio ops. */ - fd = mysql_tmpfile("ib"); - - if (fd < 0) { - ib::warn() - << "Unable to create temp file to check" - " native AIO support."; - - return(false); - } - } else { - - os_normalize_path(srv_log_group_home_dir); - - ulint dirnamelen = strlen(srv_log_group_home_dir); - - ut_a(dirnamelen < (sizeof name) - 10 - sizeof "ib_logfile"); - - memcpy(name, srv_log_group_home_dir, dirnamelen); - - /* Add a path separator if needed. */ - if (dirnamelen && name[dirnamelen - 1] != OS_PATH_SEPARATOR) { - - name[dirnamelen++] = OS_PATH_SEPARATOR; - } - - strcpy(name + dirnamelen, "ib_logfile0"); - - fd = my_open(name, O_RDONLY | O_CLOEXEC, MYF(0)); - - if (fd == -1) { - - ib::warn() - << "Unable to open" - << " \"" << name << "\" to check native" - << " AIO read support."; - - return(false); - } - } - - struct io_event io_event; - - memset(&io_event, 0x0, sizeof(io_event)); - - byte* buf = static_cast<byte*>(ut_malloc_nokey(srv_page_size * 2)); - byte* ptr = static_cast<byte*>(ut_align(buf, srv_page_size)); - - struct iocb iocb; - - /* Suppress valgrind warning. */ - memset(buf, 0x00, srv_page_size * 2); - memset(&iocb, 0x0, sizeof(iocb)); - - struct iocb* p_iocb = &iocb; - - if (!srv_read_only_mode) { - - io_prep_pwrite(p_iocb, fd, ptr, srv_page_size, 0); - - } else { - ut_a(srv_page_size >= 512); - io_prep_pread(p_iocb, fd, ptr, 512, 0); - } - - int err = io_submit(io_ctx, 1, &p_iocb); - srv_stats.buffered_aio_submitted.inc(); - - if (err >= 1) { - /* Now collect the submitted IO request. */ - err = io_getevents(io_ctx, 1, 1, &io_event, NULL); - } - - ut_free(buf); - my_close(fd, MYF(MY_WME)); - - switch (err) { - case 1: - return(true); - - case -EINVAL: - case -ENOSYS: - ib::error() - << "Linux Native AIO not supported. You can either" - " move " - << (srv_read_only_mode ? 
name : "tmpdir") - << " to a file system that supports native" - " AIO or you can set innodb_use_native_aio to" - " FALSE to avoid this message."; - - /* fall through. */ - default: - ib::error() - << "Linux Native AIO check on " - << (srv_read_only_mode ? name : "tmpdir") - << "returned error[" << -err << "]"; - } - - return(false); -} - -#endif /* LINUX_NATIVE_AIO */ /** Retrieves the last error number if an error occurs in a file io function. The number should be retrieved before any other OS calls (because they may @@ -3283,127 +1828,6 @@ os_file_set_eof( #include <WinIoCtl.h> -/* -Windows : Handling synchronous IO on files opened asynchronously. - -If file is opened for asynchronous IO (FILE_FLAG_OVERLAPPED) and also bound to -a completion port, then every IO on this file would normally be enqueued to the -completion port. Sometimes however we would like to do a synchronous IO. This is -possible if we initialitze have overlapped.hEvent with a valid event and set its -lowest order bit to 1 (see MSDN ReadFile and WriteFile description for more info) - -We'll create this special event once for each thread and store in thread local -storage. 
-*/ - - -static void __stdcall win_free_syncio_event(void *data) { - if (data) { - CloseHandle((HANDLE)data); - } -} - - -/* -Retrieve per-thread event for doing synchronous io on asyncronously opened files -*/ -static HANDLE win_get_syncio_event() -{ - HANDLE h; - - h = (HANDLE)FlsGetValue(fls_sync_io); - if (h) { - return h; - } - h = CreateEventA(NULL, FALSE, FALSE, NULL); - ut_a(h); - /* Set low-order bit to keeps I/O completion from being queued */ - h = (HANDLE)((uintptr_t)h | 1); - FlsSetValue(fls_sync_io, h); - return h; -} - - -/** Do the read/write -@param[in] request The IO context and type -@return the number of bytes read/written or negative value on error */ -ssize_t -SyncFileIO::execute(const IORequest& request) -{ - OVERLAPPED seek; - - memset(&seek, 0x0, sizeof(seek)); - - seek.hEvent = win_get_syncio_event(); - seek.Offset = (DWORD) m_offset & 0xFFFFFFFF; - seek.OffsetHigh = (DWORD) (m_offset >> 32); - - BOOL ret; - DWORD n_bytes; - - if (request.is_read()) { - ret = ReadFile(m_fh, m_buf, - static_cast<DWORD>(m_n), NULL, &seek); - - } else { - ut_ad(request.is_write()); - ret = WriteFile(m_fh, m_buf, - static_cast<DWORD>(m_n), NULL, &seek); - } - if (ret || (GetLastError() == ERROR_IO_PENDING)) { - /* Wait for async io to complete */ - ret = GetOverlappedResult(m_fh, &seek, &n_bytes, TRUE); - } - - return(ret ? 
static_cast<ssize_t>(n_bytes) : -1); -} - -/** Do the read/write -@param[in,out] slot The IO slot, it has the IO context -@return the number of bytes read/written or negative value on error */ -ssize_t -SyncFileIO::execute(Slot* slot) -{ - BOOL ret; - slot->control.hEvent = win_get_syncio_event(); - if (slot->type.is_read()) { - - ret = ReadFile( - slot->file, slot->ptr, slot->len, - NULL, &slot->control); - - } else { - ut_ad(slot->type.is_write()); - - ret = WriteFile( - slot->file, slot->ptr, slot->len, - NULL, &slot->control); - - } - if (ret || (GetLastError() == ERROR_IO_PENDING)) { - /* Wait for async io to complete */ - ret = GetOverlappedResult(slot->file, &slot->control, &slot->n_bytes, TRUE); - } - - return(ret ? static_cast<ssize_t>(slot->n_bytes) : -1); -} - -/* Startup/shutdown */ - -struct WinIoInit -{ - WinIoInit() { - fls_sync_io = FlsAlloc(win_free_syncio_event); - ut_a(fls_sync_io != FLS_OUT_OF_INDEXES); - } - - ~WinIoInit() { - FlsFree(fls_sync_io); - } -}; - -/* Ensures proper initialization and shutdown */ -static WinIoInit win_io_init; /** Free storage space associated with a section of the file. @@ -4167,18 +2591,9 @@ os_file_create_func( } } - if (*success && srv_use_native_aio && (attributes & FILE_FLAG_OVERLAPPED)) { - /* Bind the file handle to completion port. Completion port - might not be created yet, in some stages of backup, but - must always be there for the server.*/ - HANDLE port = (type == OS_LOG_FILE) ? 
- log_completion_port : data_completion_port; - ut_a(port || srv_operation != SRV_OPERATION_NORMAL); - if (port) { - ut_a(CreateIoCompletionPort(file, port, 0, 0)); - } + if (*success && (attributes & FILE_FLAG_OVERLAPPED) && srv_thread_pool) { + srv_thread_pool->bind(file); } - return(file); } @@ -4438,15 +2853,18 @@ bool os_file_close_func( os_file_t file) { - ut_a(file); + ut_a(file != 0); - if (CloseHandle(file)) { - return(true); - } - os_file_handle_error(NULL, "close"); + if (!CloseHandle(file)) { + os_file_handle_error(NULL, "close"); + return false; + } - return(false); + if(srv_thread_pool) + srv_thread_pool->unbind(file); + + return(true); } /** Gets a file size. @@ -4943,17 +3361,19 @@ os_file_read_page( if (ulint(n_bytes) == n || (err != DB_SUCCESS && !exit_on_err)) { return err; } - - ib::error() << "Tried to read " << n << " bytes at offset " - << offset << ", but was only able to read " << n_bytes; + int os_err = IF_WIN((int)GetLastError(), errno); if (!os_file_handle_error_cond_exit( NULL, "read", exit_on_err, false)) { ib::fatal() - << "Cannot read from file. OS error number " - << errno << "."; + << "Tried to read " << n << " bytes at offset " + << offset << ", but was only able to read " << n_bytes + << ".Cannot read from file. OS error number " + << os_err << "."; + } else { + ib::error() << "Tried to read " << n << " bytes at offset " + << offset << ", but was only able to read " << n_bytes; } - if (err == DB_SUCCESS) { err = DB_IO_ERROR; } @@ -5478,979 +3898,201 @@ os_file_get_status( return(ret); } -/** -Waits for an AIO operation to complete. This function is used to wait the -for completed requests. The aio array of pending requests is divided -into segments. The thread specifies which segment or slot it wants to wait -for. NOTE: this function will also take care of freeing the aio slot, -therefore no other thread is allowed to do the freeing! 
-@param[in] segment The number of the segment in the aio arrays to - wait for; segment 0 is the ibuf I/O thread, - segment 1 the log I/O thread, then follow the - non-ibuf read threads, and as the last are the - non-ibuf write threads; if this is - ULINT_UNDEFINED, then it means that sync AIO - is used, and this parameter is ignored -@param[out] m1 the messages passed with the AIO request; note - that also in the case where the AIO operation - failed, these output parameters are valid and - can be used to restart the operation, - for example -@param[out] m2 callback message -@param[out] type OS_FILE_WRITE or ..._READ -@return DB_SUCCESS or error code */ -dberr_t -os_aio_handler( - ulint segment, - fil_node_t** m1, - void** m2, - IORequest* request) -{ - dberr_t err; - - if (srv_use_native_aio) { - srv_set_io_thread_op_info(segment, "native aio handle"); - -#ifdef WIN_ASYNC_IO - - err = os_aio_windows_handler(segment, 0, m1, m2, request); - -#elif defined(LINUX_NATIVE_AIO) - - err = os_aio_linux_handler(segment, m1, m2, request); - -#else - ut_error; - - err = DB_ERROR; /* Eliminate compiler warning */ - -#endif /* WIN_ASYNC_IO */ - - } else { - srv_set_io_thread_op_info(segment, "simulated aio handle"); - - err = os_aio_simulated_handler(segment, m1, m2, request); - } - - return(err); -} - -#ifdef WIN_ASYNC_IO -static HANDLE new_completion_port() -{ - HANDLE h = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0); - ut_a(h); - return h; -} -#endif - -/** Constructor -@param[in] id The latch ID -@param[in] n Number of AIO slots -@param[in] segments Number of segments */ -AIO::AIO( - latch_id_t id, - ulint n, - ulint segments) - : -#ifdef WIN_ASYNC_IO - m_completion_port(new_completion_port()), -#endif - m_slots(n), - m_n_segments(segments), - m_n_reserved() -# ifdef LINUX_NATIVE_AIO - ,m_aio_ctx(), - m_events(m_slots.size()) -# endif /* LINUX_NATIVE_AIO */ -{ - ut_a(n > 0); - ut_a(m_n_segments > 0); - - mutex_create(id, &m_mutex); - - m_not_full = 
os_event_create("aio_not_full"); - m_is_empty = os_event_create("aio_is_empty"); - - memset((void*)&m_slots[0], 0x0, sizeof(m_slots[0]) * m_slots.size()); -#ifdef LINUX_NATIVE_AIO - memset(&m_events[0], 0x0, sizeof(m_events[0]) * m_events.size()); -#endif /* LINUX_NATIVE_AIO */ - - os_event_set(m_is_empty); -} - -/** Initialise the slots */ -dberr_t -AIO::init_slots() -{ - for (ulint i = 0; i < m_slots.size(); ++i) { - Slot& slot = m_slots[i]; - - slot.pos = static_cast<uint16_t>(i); - - slot.is_reserved = false; -#ifdef WIN_ASYNC_IO - - slot.array = this; - -#elif defined(LINUX_NATIVE_AIO) - - slot.ret = 0; - - slot.n_bytes = 0; - - memset(&slot.control, 0x0, sizeof(slot.control)); - -#endif /* WIN_ASYNC_IO */ - } - - return(DB_SUCCESS); -} +extern void fil_aio_callback(const tpool::aiocb *cb); -#ifdef LINUX_NATIVE_AIO -/** Initialise the Linux Native AIO interface */ -dberr_t -AIO::init_linux_native_aio() +static void io_callback(tpool::aiocb* cb) { - /* Initialize the io_context array. One io_context - per segment in the array. */ - - ut_a(m_aio_ctx == NULL); - - m_aio_ctx = static_cast<io_context**>( - ut_zalloc_nokey(m_n_segments * sizeof(*m_aio_ctx))); - - if (m_aio_ctx == NULL) { - return(DB_OUT_OF_MEMORY); - } + fil_aio_callback(cb); - io_context** ctx = m_aio_ctx; - ulint max_events = slots_per_segment(); - - for (ulint i = 0; i < m_n_segments; ++i, ++ctx) { - - if (!linux_create_io_ctx(max_events, ctx)) { - /* If something bad happened during aio setup - we disable linux native aio. - The disadvantage will be a small memory leak - at shutdown but that's ok compared to a crash - or a not working server. - This frequently happens when running the test suite - with many threads on a system with low fs.aio-max-nr! - */ - - ib::warn() - << "Warning: Linux Native AIO disabled " - << "because _linux_create_io_ctx() " - << "failed. 
To get rid of this warning you can " - << "try increasing system " - << "fs.aio-max-nr to 1048576 or larger or " - << "setting innodb_use_native_aio = 0 in my.cnf"; - ut_free(m_aio_ctx); - m_aio_ctx = 0; - srv_use_native_aio = FALSE; - return(DB_SUCCESS); + /* Return cb back to cache*/ + if (cb->m_opcode == tpool::aio_opcode::AIO_PREAD) { + if (read_slots->contains(cb)) { + read_slots->release(cb); + } else { + ut_ad(ibuf_slots->contains(cb)); + ibuf_slots->release(cb); } + } else { + ut_ad(write_slots->contains(cb)); + write_slots->release(cb); } - - return(DB_SUCCESS); } -#endif /* LINUX_NATIVE_AIO */ - -/** Initialise the array */ -dberr_t -AIO::init() -{ - ut_a(!m_slots.empty()); - - if (srv_use_native_aio) { #ifdef LINUX_NATIVE_AIO - dberr_t err = init_linux_native_aio(); - - if (err != DB_SUCCESS) { - return(err); - } - -#endif /* LINUX_NATIVE_AIO */ - } - - return(init_slots()); -} - -/** Creates an aio wait array. Note that we return NULL in case of failure. -We don't care about freeing memory here because we assume that a -failure will result in server refusing to start up. 
-@param[in] id Latch ID -@param[in] n maximum number of pending AIO operations - allowed; n must be divisible by m_n_segments -@param[in] n_segments number of segments in the AIO array -@return own: AIO array, NULL on failure */ -AIO* -AIO::create( - latch_id_t id, - ulint n, - ulint n_segments) -{ - if ((n % n_segments)) { - - ib::error() - << "Maximum number of AIO operations must be " - << "divisible by number of segments"; - - return(NULL); - } - - AIO* array = UT_NEW_NOKEY(AIO(id, n, n_segments)); - - if (array != NULL && array->init() != DB_SUCCESS) { - - UT_DELETE(array); - - array = NULL; - } - - return(array); -} - -/** AIO destructor */ -AIO::~AIO() -{ - mutex_destroy(&m_mutex); - - os_event_destroy(m_not_full); - os_event_destroy(m_is_empty); - -#if defined(LINUX_NATIVE_AIO) - if (srv_use_native_aio) { - m_events.clear(); - ut_free(m_aio_ctx); - } -#endif /* LINUX_NATIVE_AIO */ -#if defined(WIN_ASYNC_IO) - CloseHandle(m_completion_port); -#endif - - m_slots.clear(); -} +/** Checks if the system supports native linux aio. On some kernel +versions where native aio is supported it won't work on tmpfs. In such +cases we can't use native aio. -/** Initializes the asynchronous io system. Creates one array each for ibuf -and log i/o. Also creates one array each for read and write where each -array is divided logically into n_readers and n_writers -respectively. The caller must create an i/o handler thread for each -segment in these arrays. This function also creates the sync array. 
-No i/o handler thread needs to be created for that -@param[in] n_per_seg maximum number of pending aio - operations allowed per segment -@param[in] n_readers number of reader threads -@param[in] n_writers number of writer threads -@param[in] n_slots_sync number of slots in the sync aio array -@return true if the AIO sub-system was started successfully */ -bool -AIO::start( - ulint n_per_seg, - ulint n_readers, - ulint n_writers, - ulint n_slots_sync) +@return: true if supported, false otherwise. */ +#include <libaio.h> +static bool is_linux_native_aio_supported() { -#if defined(LINUX_NATIVE_AIO) - /* Check if native aio is supported on this system and tmpfs */ - if (srv_use_native_aio && !is_linux_native_aio_supported()) { - - ib::warn() << "Linux Native AIO disabled."; - - srv_use_native_aio = FALSE; - } -#endif /* LINUX_NATIVE_AIO */ + File fd; + io_context_t io_ctx; + char name[1000]; - srv_reset_io_thread_op_info(); + memset(&io_ctx, 0, sizeof(io_ctx)); + if (io_setup(1, &io_ctx)) { - s_reads = create( - LATCH_ID_OS_AIO_READ_MUTEX, n_readers * n_per_seg, n_readers); + /* The platform does not support native aio. */ - if (s_reads == NULL) { return(false); - } - - ulint start = srv_read_only_mode ? 0 : 2; - ulint n_segs = n_readers + start; - /* 0 is the ibuf segment and 1 is the redo log segment. */ - for (ulint i = start; i < n_segs; ++i) { - ut_a(i < SRV_MAX_N_IO_THREADS); - srv_io_thread_function[i] = "read thread"; } + else if (!srv_read_only_mode) { - ulint n_segments = n_readers; - - if (!srv_read_only_mode) { - - s_ibuf = create(LATCH_ID_OS_AIO_IBUF_MUTEX, n_per_seg, 1); - - if (s_ibuf == NULL) { - return(false); - } - - ++n_segments; - - srv_io_thread_function[0] = "insert buffer thread"; + /* Now check if tmpdir supports native aio ops. 
*/ + fd = mysql_tmpfile("ib"); - s_log = create(LATCH_ID_OS_AIO_LOG_MUTEX, n_per_seg, 1); + if (fd < 0) { + ib::warn() + << "Unable to create temp file to check" + " native AIO support."; - if (s_log == NULL) { return(false); } - - ++n_segments; - - srv_io_thread_function[1] = "log thread"; - - } else { - s_ibuf = s_log = NULL; - } - - s_writes = create( - LATCH_ID_OS_AIO_WRITE_MUTEX, n_writers * n_per_seg, n_writers); - - if (s_writes == NULL) { - return(false); } + else { -#ifdef WIN_ASYNC_IO - data_completion_port = s_writes->m_completion_port; - log_completion_port = - s_log ? s_log->m_completion_port : data_completion_port; -#endif - - n_segments += n_writers; - - for (ulint i = start + n_readers; i < n_segments; ++i) { - ut_a(i < SRV_MAX_N_IO_THREADS); - srv_io_thread_function[i] = "write thread"; - } - - ut_ad(n_segments >= static_cast<ulint>(srv_read_only_mode ? 2 : 4)); - - s_sync = create(LATCH_ID_OS_AIO_SYNC_MUTEX, n_slots_sync, 1); - - if (s_sync == NULL) { - - return(false); - } - - os_aio_n_segments = n_segments; - - os_aio_validate(); - - os_last_printout = time(NULL); - - if (srv_use_native_aio) { - return(true); - } - - os_aio_segment_wait_events = static_cast<os_event_t*>( - ut_zalloc_nokey( - n_segments * sizeof *os_aio_segment_wait_events)); - - if (os_aio_segment_wait_events == NULL) { - - return(false); - } - - for (ulint i = 0; i < n_segments; ++i) { - os_aio_segment_wait_events[i] = os_event_create(0); - } - - return(true); -} - -/** Free the AIO arrays */ -void -AIO::shutdown() -{ - UT_DELETE(s_ibuf); - s_ibuf = NULL; - - UT_DELETE(s_log); - s_log = NULL; - - UT_DELETE(s_writes); - s_writes = NULL; - - UT_DELETE(s_sync); - s_sync = NULL; - - UT_DELETE(s_reads); - s_reads = NULL; -} + os_normalize_path(srv_log_group_home_dir); -/** Initializes the asynchronous io system. Creates one array each for ibuf -and log i/o. 
Also creates one array each for read and write where each -array is divided logically into n_readers and n_writers -respectively. The caller must create an i/o handler thread for each -segment in these arrays. This function also creates the sync array. -No i/o handler thread needs to be created for that -@param[in] n_readers number of reader threads -@param[in] n_writers number of writer threads -@param[in] n_slots_sync number of slots in the sync aio array */ -bool -os_aio_init( - ulint n_readers, - ulint n_writers, - ulint n_slots_sync) -{ - /* Maximum number of pending aio operations allowed per segment */ - ulint limit = 8 * OS_AIO_N_PENDING_IOS_PER_THREAD; + ulint dirnamelen = strlen(srv_log_group_home_dir); - return(AIO::start(limit, n_readers, n_writers, n_slots_sync)); -} + ut_a(dirnamelen < (sizeof name) - 10 - sizeof "ib_logfile"); -/** Frees the asynchronous io system. */ -void -os_aio_free() -{ - AIO::shutdown(); + memcpy(name, srv_log_group_home_dir, dirnamelen); - ut_ad(!os_aio_segment_wait_events || !srv_use_native_aio); - ut_ad(srv_use_native_aio || os_aio_segment_wait_events - || !srv_was_started); + /* Add a path separator if needed. */ + if (dirnamelen && name[dirnamelen - 1] != OS_PATH_SEPARATOR) { - if (!srv_use_native_aio && os_aio_segment_wait_events) { - for (ulint i = 0; i < os_aio_n_segments; i++) { - os_event_destroy(os_aio_segment_wait_events[i]); + name[dirnamelen++] = OS_PATH_SEPARATOR; } - ut_free(os_aio_segment_wait_events); - os_aio_segment_wait_events = 0; - } - os_aio_n_segments = 0; -} - -/** Wakes up all async i/o threads so that they know to exit themselves in -shutdown. */ -void -os_aio_wake_all_threads_at_shutdown() -{ -#ifdef WIN_ASYNC_IO - AIO::wake_at_shutdown(); -#elif defined(LINUX_NATIVE_AIO) - /* When using native AIO interface the io helper threads - wait on io_getevents with a timeout value of 500ms. At - each wake up these threads check the server status. - No need to do anything to wake them up. 
*/ -#endif /* !WIN_ASYNC_AIO */ - - if (srv_use_native_aio) { - return; - } - - /* This loop wakes up all simulated ai/o threads */ - - for (ulint i = 0; i < os_aio_n_segments; ++i) { - - os_event_set(os_aio_segment_wait_events[i]); - } -} - -/** Waits until there are no pending writes in AIO::s_writes. There can -be other, synchronous, pending writes. */ -void -os_aio_wait_until_no_pending_writes() -{ - AIO::wait_until_no_pending_writes(); -} - -/** Calculates segment number for a slot. -@param[in] array AIO wait array -@param[in] slot slot in this array -@return segment number (which is the number used by, for example, - I/O-handler threads) */ -ulint -AIO::get_segment_no_from_slot( - const AIO* array, - const Slot* slot) -{ - ulint segment; - ulint seg_len; - - if (array == s_ibuf) { - ut_ad(!srv_read_only_mode); - - segment = IO_IBUF_SEGMENT; - - } else if (array == s_log) { - ut_ad(!srv_read_only_mode); - - segment = IO_LOG_SEGMENT; - - } else if (array == s_reads) { - seg_len = s_reads->slots_per_segment(); - - segment = (srv_read_only_mode ? 0 : 2) + slot->pos / seg_len; - } else { - ut_a(array == s_writes); - - seg_len = s_writes->slots_per_segment(); - - segment = s_reads->m_n_segments - + (srv_read_only_mode ? 0 : 2) + slot->pos / seg_len; - } - - return(segment); -} - -/** Requests for a slot in the aio array. If no slot is available, waits until -not_full-event becomes signaled. 
- -@param[in] type IO context -@param[in,out] m1 message to be passed along with the AIO - operation -@param[in,out] m2 message to be passed along with the AIO - operation -@param[in] file file handle -@param[in] name name of the file or path as a NUL-terminated - string -@param[in,out] buf buffer where to read or from which to write -@param[in] offset file offset, where to read from or start writing -@param[in] len length of the block to read or write -@return pointer to slot */ -Slot* -AIO::reserve_slot( - const IORequest& type, - fil_node_t* m1, - void* m2, - pfs_os_file_t file, - const char* name, - void* buf, - os_offset_t offset, - ulint len) -{ -#ifdef WIN_ASYNC_IO - ut_a((len & 0xFFFFFFFFUL) == len); -#endif /* WIN_ASYNC_IO */ - - /* No need of a mutex. Only reading constant fields */ - ulint slots_per_seg; - - ut_ad(type.validate()); - - slots_per_seg = slots_per_segment(); - - /* We attempt to keep adjacent blocks in the same local - segment. This can help in merging IO requests when we are - doing simulated AIO */ - ulint local_seg; - - local_seg = (offset >> (srv_page_size_shift + 6)) % m_n_segments; - - for (;;) { - - acquire(); + strcpy(name + dirnamelen, "ib_logfile0"); - if (m_n_reserved != m_slots.size()) { - break; - } + fd = my_open(name, O_RDONLY | O_CLOEXEC, MYF(0)); - release(); + if (fd == -1) { - if (!srv_use_native_aio) { - /* If the handler threads are suspended, - wake them so that we get more slots */ + ib::warn() + << "Unable to open" + << " \"" << name << "\" to check native" + << " AIO read support."; - os_aio_simulated_wake_handler_threads(); + return(false); } - - os_event_wait(m_not_full); } - ulint counter = 0; - Slot* slot = NULL; + struct io_event io_event; - /* We start our search for an available slot from our preferred - local segment and do a full scan of the array. We are - guaranteed to find a slot in full scan. 
*/ - for (ulint i = local_seg * slots_per_seg; - counter < m_slots.size(); - ++i, ++counter) { + memset(&io_event, 0x0, sizeof(io_event)); - i %= m_slots.size(); + byte* buf = static_cast<byte*>(ut_malloc_nokey(srv_page_size * 2)); + byte* ptr = static_cast<byte*>(ut_align(buf, srv_page_size)); - slot = at(i); + struct iocb iocb; - if (slot->is_reserved == false) { - break; - } - } + /* Suppress valgrind warning. */ + memset(buf, 0x00, srv_page_size * 2); + memset(&iocb, 0x0, sizeof(iocb)); - /* We MUST always be able to get hold of a reserved slot. */ - ut_a(counter < m_slots.size()); + struct iocb* p_iocb = &iocb; - ut_a(slot->is_reserved == false); + if (!srv_read_only_mode) { - ++m_n_reserved; + io_prep_pwrite(p_iocb, fd, ptr, srv_page_size, 0); - if (m_n_reserved == 1) { - os_event_reset(m_is_empty); } - - if (m_n_reserved == m_slots.size()) { - os_event_reset(m_not_full); + else { + ut_a(srv_page_size >= 512); + io_prep_pread(p_iocb, fd, ptr, 512, 0); } - slot->is_reserved = true; - slot->reservation_time = time(NULL); - slot->m1 = m1; - slot->m2 = m2; - slot->file = file; - slot->name = name; -#ifdef _WIN32 - slot->len = static_cast<DWORD>(len); -#else - slot->len = len; -#endif /* _WIN32 */ - slot->type = type; - slot->buf = static_cast<byte*>(buf); - slot->ptr = slot->buf; - slot->offset = offset; - slot->err = DB_SUCCESS; - slot->original_len = static_cast<uint32>(len); - slot->io_already_done = false; - slot->buf = static_cast<byte*>(buf); - -#ifdef WIN_ASYNC_IO - { - OVERLAPPED* control; + int err = io_submit(io_ctx, 1, &p_iocb); + srv_stats.buffered_aio_submitted.inc(); - control = &slot->control; - control->Offset = (DWORD) offset & 0xFFFFFFFF; - control->OffsetHigh = (DWORD) (offset >> 32); + if (err >= 1) { + /* Now collect the submitted IO request. */ + err = io_getevents(io_ctx, 1, 1, &io_event, NULL); } -#elif defined(LINUX_NATIVE_AIO) - - /* If we are not using native AIO skip this part. 
*/ - if (srv_use_native_aio) { - - off_t aio_offset; - - /* Check if we are dealing with 64 bit arch. - If not then make sure that offset fits in 32 bits. */ - aio_offset = (off_t) offset; - - ut_a(sizeof(aio_offset) >= sizeof(offset) - || ((os_offset_t) aio_offset) == offset); - - struct iocb* iocb = &slot->control; - if (type.is_read()) { - - io_prep_pread( - iocb, file, slot->ptr, slot->len, aio_offset); - } else { - ut_ad(type.is_write()); + ut_free(buf); + my_close(fd, MYF(MY_WME)); - io_prep_pwrite( - iocb, file, slot->ptr, slot->len, aio_offset); - } + switch (err) { + case 1: + return(true); - iocb->data = slot; + case -EINVAL: + case -ENOSYS: + ib::error() + << "Linux Native AIO not supported. You can either" + " move " + << (srv_read_only_mode ? name : "tmpdir") + << " to a file system that supports native" + " AIO or you can set innodb_use_native_aio to" + " FALSE to avoid this message."; - slot->n_bytes = 0; - slot->ret = 0; + /* fall through. */ + default: + ib::error() + << "Linux Native AIO check on " + << (srv_read_only_mode ? name : "tmpdir") + << "returned error[" << -err << "]"; } -#endif /* LINUX_NATIVE_AIO */ - - release(); - return(slot); + return(false); } +#endif -/** Wakes up a simulated aio i/o-handler thread if it has something to do. -@param[in] global_segment The number of the segment in the AIO arrays */ -void -AIO::wake_simulated_handler_thread(ulint global_segment) -{ - ut_ad(!srv_use_native_aio); - - AIO* array; - ulint segment = get_array_and_local_segment(&array, global_segment); - array->wake_simulated_handler_thread(global_segment, segment); -} -/** Wakes up a simulated AIO I/O-handler thread if it has something to do -for a local segment in the AIO array. 
-@param[in] global_segment The number of the segment in the AIO arrays -@param[in] segment The local segment in the AIO array */ -void -AIO::wake_simulated_handler_thread(ulint global_segment, ulint segment) +bool os_aio_init(ulint n_reader_threads, ulint n_writer_threads, ulint) { - ut_ad(!srv_use_native_aio); - - ulint n = slots_per_segment(); - ulint offset = segment * n; + int max_write_events = (int)n_writer_threads * OS_AIO_N_PENDING_IOS_PER_THREAD; + int max_read_events = (int)n_reader_threads * OS_AIO_N_PENDING_IOS_PER_THREAD; + int max_ibuf_events = 1 * OS_AIO_N_PENDING_IOS_PER_THREAD; + int max_events = max_read_events + max_write_events + max_ibuf_events; + int ret; - /* Look through n slots after the segment * n'th slot */ - - acquire(); - - const Slot* slot = at(offset); - - for (ulint i = 0; i < n; ++i, ++slot) { - - if (slot->is_reserved) { - - /* Found an i/o request */ - - release(); - - os_event_t event; - - event = os_aio_segment_wait_events[global_segment]; - - os_event_set(event); - - return; - } +#if LINUX_NATIVE_AIO + if (srv_use_native_aio && !is_linux_native_aio_supported()) + srv_use_native_aio = false; +#endif + ret = srv_thread_pool->configure_aio(srv_use_native_aio, max_events); + if(ret) { + ut_a(srv_use_native_aio); + srv_use_native_aio = false; +#ifdef LINUX_NATIVE_AIO + ib::info() << "Linux native AIO disabled"; +#endif + ret = srv_thread_pool->configure_aio(srv_use_native_aio, max_events); + DBUG_ASSERT(!ret); } - - release(); + read_slots = new io_slots(max_read_events, (uint)n_reader_threads); + write_slots = new io_slots(max_write_events, (uint)n_writer_threads); + ibuf_slots = new io_slots(max_ibuf_events, 1); + return true; } -/** Wakes up simulated aio i/o-handler threads if they have something to do. 
*/ -void -os_aio_simulated_wake_handler_threads() +void os_aio_free(void) { - if (srv_use_native_aio) { - /* We do not use simulated aio: do nothing */ - - return; - } - - os_aio_recommend_sleep_for_read_threads = false; - - for (ulint i = 0; i < os_aio_n_segments; i++) { - AIO::wake_simulated_handler_thread(i); - } + srv_thread_pool->disable_aio(); + delete read_slots; + delete write_slots; + delete ibuf_slots; } -/** Select the IO slot array -@param[in,out] type Type of IO, READ or WRITE -@param[in] read_only true if running in read-only mode -@param[in] mode IO mode -@return slot array or NULL if invalid mode specified */ -AIO* -AIO::select_slot_array(IORequest& type, bool read_only, ulint mode) +/** Waits until there are no pending writes. There can +be other, synchronous, pending writes. */ +void +os_aio_wait_until_no_pending_writes() { - AIO* array; - - ut_ad(type.validate()); - - switch (mode) { - case OS_AIO_NORMAL: - - array = type.is_read() ? AIO::s_reads : AIO::s_writes; - break; - - case OS_AIO_IBUF: - ut_ad(type.is_read()); - - /* Reduce probability of deadlock bugs in connection with ibuf: - do not let the ibuf i/o handler sleep */ - - type.clear_do_not_wake(); - - array = read_only ? AIO::s_reads : AIO::s_ibuf; - break; - - case OS_AIO_LOG: - - array = read_only ? AIO::s_reads : AIO::s_log; - break; - - case OS_AIO_SYNC: - - array = AIO::s_sync; -#if defined(LINUX_NATIVE_AIO) - /* In Linux native AIO we don't use sync IO array. */ - ut_a(!srv_use_native_aio); -#endif /* LINUX_NATIVE_AIO */ - break; - - default: - ut_error; - array = NULL; /* Eliminate compiler warning */ - } - - return(array); + write_slots->wait(); } -#ifdef WIN_ASYNC_IO -/** This function is only used in Windows asynchronous i/o. -Waits for an aio operation to complete. This function is used to wait the -for completed requests. The aio array of pending requests is divided -into segments. The thread specifies which segment or slot it wants to wait -for. 
NOTE: this function will also take care of freeing the aio slot, -therefore no other thread is allowed to do the freeing! -@param[in] segment The number of the segment in the aio arrays to - wait for; segment 0 is the ibuf I/O thread, - segment 1 the log I/O thread, then follow the - non-ibuf read threads, and as the last are the - non-ibuf write threads; if this is - ULINT_UNDEFINED, then it means that sync AIO - is used, and this parameter is ignored -@param[in] pos this parameter is used only in sync AIO: - wait for the aio slot at this position -@param[out] m1 the messages passed with the AIO request; note - that also in the case where the AIO operation - failed, these output parameters are valid and - can be used to restart the operation, - for example -@param[out] m2 callback message -@param[out] type OS_FILE_WRITE or ..._READ -@return DB_SUCCESS or error code */ - - - -static -dberr_t -os_aio_windows_handler( - ulint segment, - ulint pos, - fil_node_t** m1, - void** m2, - IORequest* type) -{ - Slot* slot= 0; - dberr_t err; - - BOOL ret; - ULONG_PTR key; - - ut_a(segment != ULINT_UNDEFINED); - - /* NOTE! We only access constant fields in os_aio_array. 
Therefore - we do not have to acquire the protecting mutex yet */ - - ut_ad(os_aio_validate_skip()); - AIO *my_array; - AIO::get_array_and_local_segment(&my_array, segment); - - HANDLE port = my_array->m_completion_port; - ut_ad(port); - for (;;) { - DWORD len; - ret = GetQueuedCompletionStatus(port, &len, &key, - (OVERLAPPED **)&slot, INFINITE); - - /* If shutdown key was received, repost the shutdown message and exit */ - if (ret && key == IOCP_SHUTDOWN_KEY) { - PostQueuedCompletionStatus(port, 0, key, NULL); - *m1 = NULL; - *m2 = NULL; - return (DB_SUCCESS); - } - - ut_a(slot); - - if (!ret) { - /* IO failed */ - break; - } - - slot->n_bytes= len; - ut_a(slot->array); - HANDLE slot_port = slot->array->m_completion_port; - if (slot_port != port) { - /* there are no redirections between data and log */ - ut_ad(port == data_completion_port); - ut_ad(slot_port != log_completion_port); - - /* - Redirect completions to the dedicated completion port - and threads. - - "Write array" threads receive write,read and ibuf - notifications, read and ibuf completions are redirected. - - Forwarding IO completion this way costs a context switch, - and this seems tolerable since asynchronous reads are by - far less frequent. - */ - ut_a(PostQueuedCompletionStatus(slot_port, - len, key, &slot->control)); - } - else { - break; - } - } - - ut_a(slot->is_reserved); - - *m1 = slot->m1; - *m2 = slot->m2; - - *type = slot->type; - - bool retry = false; - - if (ret && slot->n_bytes == slot->len) { - - err = DB_SUCCESS; - - } else if (os_file_handle_error(slot->name, "Windows aio")) { - - retry = true; - - } else { - - err = DB_IO_ERROR; - } - - - if (retry) { - /* Retry failed read/write operation synchronously. */ - -#ifdef UNIV_PFS_IO - /* This read/write does not go through os_file_read - and os_file_write APIs, need to register with - performance schema explicitly here. 
*/ - PSI_file_locker_state state; - struct PSI_file_locker* locker = NULL; - - register_pfs_file_io_begin( - &state, locker, slot->file, slot->len, - slot->type.is_write() - ? PSI_FILE_WRITE : PSI_FILE_READ, __FILE__, __LINE__); -#endif /* UNIV_PFS_IO */ - - ut_a((slot->len & 0xFFFFFFFFUL) == slot->len); - - ssize_t n_bytes = SyncFileIO::execute(slot); - -#ifdef UNIV_PFS_IO - register_pfs_file_io_end(locker, slot->len); -#endif /* UNIV_PFS_IO */ - err = (n_bytes == slot->len) ? DB_SUCCESS : DB_IO_ERROR; - } - - if (err == DB_SUCCESS) { - err = AIOHandler::post_io_processing(slot); - } - - slot->array->release_with_mutex(slot); - - if (srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS - && !buf_page_cleaner_is_active - && os_aio_all_slots_free()) { - /* Last IO, wakeup other io threads */ - AIO::wake_at_shutdown(); - } - return(err); -} -#endif /* WIN_ASYNC_IO */ /** NOTE! Use the corresponding macro os_aio(), not directly this function! @@ -6485,14 +4127,10 @@ os_aio_func( fil_node_t* m1, void* m2) { -#ifdef WIN_ASYNC_IO - BOOL ret = TRUE; -#endif /* WIN_ASYNC_IO */ ut_ad(n > 0); ut_ad((n % OS_FILE_LOG_BLOCK_SIZE) == 0); ut_ad((offset % OS_FILE_LOG_BLOCK_SIZE) == 0); - ut_ad(os_aio_validate_skip()); #ifdef WIN_ASYNC_IO ut_ad((n & 0xFFFFFFFFUL) == n); @@ -6511,777 +4149,43 @@ os_aio_func( return(os_file_write_func(type, name, file, buf, offset, n)); } -try_again: - - AIO* array; - - array = AIO::select_slot_array(type, read_only, mode); - - Slot* slot; - - slot = array->reserve_slot(type, m1, m2, file, name, buf, offset, n); - if (type.is_read()) { - - - if (srv_use_native_aio) { - ++os_n_file_reads; - - os_bytes_read_since_printout += n; -#ifdef WIN_ASYNC_IO - ret = ReadFile( - file, slot->ptr, slot->len, - NULL, &slot->control); -#elif defined(LINUX_NATIVE_AIO) - if (!array->linux_dispatch(slot)) { - goto err_exit; - } -#endif /* WIN_ASYNC_IO */ - } else if (type.is_wake()) { - AIO::wake_simulated_handler_thread( - AIO::get_segment_no_from_slot(array, slot)); - } } 
else if (type.is_write()) { - - if (srv_use_native_aio) { ++os_n_file_writes; - -#ifdef WIN_ASYNC_IO - ret = WriteFile( - file, slot->ptr, slot->len, - NULL, &slot->control); -#elif defined(LINUX_NATIVE_AIO) - if (!array->linux_dispatch(slot)) { - goto err_exit; - } -#endif /* WIN_ASYNC_IO */ - - } else if (type.is_wake()) { - AIO::wake_simulated_handler_thread( - AIO::get_segment_no_from_slot(array, slot)); - } } else { ut_error; } -#ifdef WIN_ASYNC_IO - if (ret || (GetLastError() == ERROR_IO_PENDING)) { - /* aio completed or was queued successfully! */ - return(DB_SUCCESS); + compile_time_assert(sizeof(os_aio_userdata_t) <= tpool::MAX_AIO_USERDATA_LEN); + os_aio_userdata_t userdata{m1,type,m2}; + io_slots* slots; + if (type.is_read()) { + slots = mode == OS_AIO_IBUF?ibuf_slots: read_slots; + } else { + slots = write_slots; } + tpool::aiocb* cb = slots->acquire(); - goto err_exit; + cb->m_buffer = buf; + cb->m_callback = (tpool::callback_func)io_callback; + cb->m_group = slots->get_task_group(); + cb->m_fh = file.m_file; + cb->m_len = (int)n; + cb->m_offset = offset; + cb->m_opcode = type.is_read() ? tpool::aio_opcode::AIO_PREAD : tpool::aio_opcode::AIO_PWRITE; + memcpy(cb->m_userdata, &userdata, sizeof(userdata)); -#endif /* WIN_ASYNC_IO */ - - /* AIO request was queued successfully! */ - return(DB_SUCCESS); + if (!srv_thread_pool->submit_io(cb)) + return DB_SUCCESS; -#if defined LINUX_NATIVE_AIO || defined WIN_ASYNC_IO -err_exit: -#endif /* LINUX_NATIVE_AIO || WIN_ASYNC_IO */ + slots->release(cb); - array->release_with_mutex(slot); - - if (os_file_handle_error( - name, type.is_read() ? "aio read" : "aio write")) { - - goto try_again; - } + os_file_handle_error(name, type.is_read() ? 
"aio read" : "aio write"); return(DB_IO_ERROR); } -/** Simulated AIO handler for reaping IO requests */ -class SimulatedAIOHandler { - -public: - - /** Constructor - @param[in,out] array The AIO array - @param[in] segment Local segment in the array */ - SimulatedAIOHandler(AIO* array, ulint segment) - : - m_oldest(), - m_n_elems(), - m_lowest_offset(IB_UINT64_MAX), - m_array(array), - m_n_slots(), - m_segment(segment), - m_ptr(), - m_buf() - { - ut_ad(m_segment < 100); - - m_slots.resize(OS_AIO_MERGE_N_CONSECUTIVE); - } - - /** Destructor */ - ~SimulatedAIOHandler() - { - if (m_ptr != NULL) { - ut_free(m_ptr); - } - } - - /** Reset the state of the handler - @param[in] n_slots Number of pending AIO operations supported */ - void init(ulint n_slots) - { - m_oldest = 0; - m_n_elems = 0; - m_n_slots = n_slots; - m_lowest_offset = IB_UINT64_MAX; - - if (m_ptr != NULL) { - ut_free(m_ptr); - m_ptr = m_buf = NULL; - } - - m_slots[0] = NULL; - } - - /** Check if there is a slot for which the i/o has already been done - @param[out] n_reserved Number of reserved slots - @return the first completed slot that is found. */ - Slot* check_completed(ulint* n_reserved) - { - ulint offset = m_segment * m_n_slots; - - *n_reserved = 0; - - Slot* slot; - - slot = m_array->at(offset); - - for (ulint i = 0; i < m_n_slots; ++i, ++slot) { - - if (slot->is_reserved) { - - if (slot->io_already_done) { - - ut_a(slot->is_reserved); - - return(slot); - } - - ++*n_reserved; - } - } - - return(NULL); - } - - /** If there are at least 2 seconds old requests, then pick the - oldest one to prevent starvation. If several requests have the - same age, then pick the one at the lowest offset. - @return true if request was selected */ - bool select() - { - if (!select_oldest()) { - - return(select_lowest_offset()); - } - - return(true); - } - - /** Check if there are several consecutive blocks - to read or write. Merge them if found. 
*/ - void merge() - { - /* if m_n_elems != 0, then we have assigned - something valid to consecutive_ios[0] */ - ut_ad(m_n_elems != 0); - ut_ad(first_slot() != NULL); - - Slot* slot = first_slot(); - - while (!merge_adjacent(slot)) { - /* No op */ - } - } - - /** We have now collected n_consecutive I/O requests - in the array; allocate a single buffer which can hold - all data, and perform the I/O - @return the length of the buffer */ - ulint allocate_buffer() - MY_ATTRIBUTE((warn_unused_result)) - { - ulint len; - Slot* slot = first_slot(); - - ut_ad(m_ptr == NULL); - - if (slot->type.is_read() && m_n_elems > 1) { - - len = 0; - - for (ulint i = 0; i < m_n_elems; ++i) { - len += m_slots[i]->len; - } - - m_ptr = static_cast<byte*>( - ut_malloc_nokey(len + srv_page_size)); - - m_buf = static_cast<byte*>( - ut_align(m_ptr, srv_page_size)); - - } else { - len = first_slot()->len; - m_buf = first_slot()->buf; - } - - return(len); - } - - /** We have to compress the individual pages and punch - holes in them on a page by page basis when writing to - tables that can be compresed at the IO level. 
- @param[in] len Value returned by allocate_buffer */ - void copy_to_buffer(ulint len) - { - Slot* slot = first_slot(); - - if (len > slot->len && slot->type.is_write()) { - - byte* ptr = m_buf; - - ut_ad(ptr != slot->buf); - - /* Copy the buffers to the combined buffer */ - for (ulint i = 0; i < m_n_elems; ++i) { - - slot = m_slots[i]; - - memmove(ptr, slot->buf, slot->len); - - ptr += slot->len; - } - } - } - - /** Do the I/O with ordinary, synchronous i/o functions: - @param[in] len Length of buffer for IO */ - void io() - { - if (first_slot()->type.is_write()) { - - for (ulint i = 0; i < m_n_elems; ++i) { - write(m_slots[i]); - } - - } else { - - for (ulint i = 0; i < m_n_elems; ++i) { - read(m_slots[i]); - } - } - } - - /** Mark the i/os done in slots */ - void done() - { - for (ulint i = 0; i < m_n_elems; ++i) { - m_slots[i]->io_already_done = true; - } - } - - /** @return the first slot in the consecutive array */ - Slot* first_slot() - MY_ATTRIBUTE((warn_unused_result)) - { - ut_a(m_n_elems > 0); - - return(m_slots[0]); - } - - /** Wait for I/O requests - @param[in] global_segment The global segment - @param[in,out] event Wait on event if no active requests - @return the number of slots */ - ulint check_pending( - ulint global_segment, - os_event_t event) - MY_ATTRIBUTE((warn_unused_result)); -private: - - /** Do the file read - @param[in,out] slot Slot that has the IO context */ - void read(Slot* slot) - { - dberr_t err = os_file_read( - slot->type, - slot->file, - slot->ptr, - slot->offset, - slot->len); - - ut_a(err == DB_SUCCESS); - } - - /** Do the file read - @param[in,out] slot Slot that has the IO context */ - void write(Slot* slot) - { - dberr_t err = os_file_write( - slot->type, - slot->name, - slot->file, - slot->ptr, - slot->offset, - slot->len); - - ut_a(err == DB_SUCCESS); - } - - /** @return true if the slots are adjacent and can be merged */ - bool adjacent(const Slot* s1, const Slot* s2) const - { - return(s1 != s2 - && s1->file == s2->file 
- && s2->offset == s1->offset + s1->len - && s1->type == s2->type); - } - - /** @return true if merge limit reached or no adjacent slots found. */ - bool merge_adjacent(Slot*& current) - { - Slot* slot; - ulint offset = m_segment * m_n_slots; - - slot = m_array->at(offset); - - for (ulint i = 0; i < m_n_slots; ++i, ++slot) { - - if (slot->is_reserved && adjacent(current, slot)) { - - current = slot; - - /* Found a consecutive i/o request */ - - m_slots[m_n_elems] = slot; - - ++m_n_elems; - - return(m_n_elems >= m_slots.capacity()); - } - } - - return(true); - } - - /** There were no old requests. Look for an I/O request at the lowest - offset in the array (we ignore the high 32 bits of the offset in these - heuristics) */ - bool select_lowest_offset() - { - ut_ad(m_n_elems == 0); - - ulint offset = m_segment * m_n_slots; - - m_lowest_offset = IB_UINT64_MAX; - - for (ulint i = 0; i < m_n_slots; ++i) { - Slot* slot; - - slot = m_array->at(i + offset); - - if (slot->is_reserved - && slot->offset < m_lowest_offset) { - - /* Found an i/o request */ - m_slots[0] = slot; - - m_n_elems = 1; - - m_lowest_offset = slot->offset; - } - } - - return(m_n_elems > 0); - } - - /** Select the slot if it is older than the current oldest slot. 
- @param[in] slot The slot to check */ - void select_if_older(Slot* slot) - { - ulint age; - - age = (ulint) difftime(time(NULL), slot->reservation_time); - - if ((age >= 2 && age > m_oldest) - || (age >= 2 - && age == m_oldest - && slot->offset < m_lowest_offset)) { - - /* Found an i/o request */ - m_slots[0] = slot; - - m_n_elems = 1; - - m_oldest = age; - - m_lowest_offset = slot->offset; - } - } - - /** Select th oldest slot in the array - @return true if oldest slot found */ - bool select_oldest() - { - ut_ad(m_n_elems == 0); - - Slot* slot; - ulint offset = m_n_slots * m_segment; - - slot = m_array->at(offset); - - for (ulint i = 0; i < m_n_slots; ++i, ++slot) { - - if (slot->is_reserved) { - select_if_older(slot); - } - } - - return(m_n_elems > 0); - } - - typedef std::vector<Slot*> slots_t; - -private: - ulint m_oldest; - ulint m_n_elems; - os_offset_t m_lowest_offset; - - AIO* m_array; - ulint m_n_slots; - ulint m_segment; - - slots_t m_slots; - - byte* m_ptr; - byte* m_buf; -}; - -/** Wait for I/O requests -@return the number of slots */ -ulint -SimulatedAIOHandler::check_pending( - ulint global_segment, - os_event_t event) -{ - /* NOTE! We only access constant fields in os_aio_array. - Therefore we do not have to acquire the protecting mutex yet */ - - ut_ad(os_aio_validate_skip()); - - ut_ad(m_segment < m_array->get_n_segments()); - - /* Look through n slots after the segment * n'th slot */ - - if (AIO::is_read(m_array) - && os_aio_recommend_sleep_for_read_threads) { - - /* Give other threads chance to add several - I/Os to the array at once. */ - - srv_set_io_thread_op_info( - global_segment, "waiting for i/o request"); - - os_event_wait(event); - - return(0); - } - - return(m_array->slots_per_segment()); -} - -/** Does simulated AIO. This function should be called by an i/o-handler -thread. 
- -@param[in] segment The number of the segment in the aio arrays to wait - for; segment 0 is the ibuf i/o thread, segment 1 the - log i/o thread, then follow the non-ibuf read threads, - and as the last are the non-ibuf write threads -@param[out] m1 the messages passed with the AIO request; note that - also in the case where the AIO operation failed, these - output parameters are valid and can be used to restart - the operation, for example -@param[out] m2 Callback argument -@param[in] type IO context -@return DB_SUCCESS or error code */ -static -dberr_t -os_aio_simulated_handler( - ulint global_segment, - fil_node_t** m1, - void** m2, - IORequest* type) -{ - Slot* slot; - AIO* array; - ulint segment; - os_event_t event = os_aio_segment_wait_events[global_segment]; - - segment = AIO::get_array_and_local_segment(&array, global_segment); - - SimulatedAIOHandler handler(array, segment); - - for (;;) { - - srv_set_io_thread_op_info( - global_segment, "looking for i/o requests (a)"); - - ulint n_slots = handler.check_pending(global_segment, event); - - if (n_slots == 0) { - continue; - } - - handler.init(n_slots); - - srv_set_io_thread_op_info( - global_segment, "looking for i/o requests (b)"); - - array->acquire(); - - ulint n_reserved; - - slot = handler.check_completed(&n_reserved); - - if (slot != NULL) { - - break; - - } else if (n_reserved == 0 - && !buf_page_cleaner_is_active - && srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS) { - - /* There is no completed request. If there - are no pending request at all, and the system - is being shut down, exit. */ - - array->release(); - - *m1 = NULL; - - *m2 = NULL; - - return(DB_SUCCESS); - - } else if (handler.select()) { - - break; - } - - /* No I/O requested at the moment */ - - srv_set_io_thread_op_info( - global_segment, "resetting wait event"); - - /* We wait here until tbere are more IO requests - for this segment. 
*/ - - os_event_reset(event); - - array->release(); - - srv_set_io_thread_op_info( - global_segment, "waiting for i/o request"); - - os_event_wait(event); - } - - /** Found a slot that has already completed its IO */ - - if (slot == NULL) { - /* Merge adjacent requests */ - handler.merge(); - - /* Check if there are several consecutive blocks - to read or write */ - - srv_set_io_thread_op_info( - global_segment, "consecutive i/o requests"); - - // Note: We don't support write combining for simulated AIO. - //ulint total_len = handler.allocate_buffer(); - - /* We release the array mutex for the time of the I/O: NOTE that - this assumes that there is just one i/o-handler thread serving - a single segment of slots! */ - - array->release(); - - // Note: We don't support write combining for simulated AIO. - //handler.copy_to_buffer(total_len); - - srv_set_io_thread_op_info(global_segment, "doing file i/o"); - - handler.io(); - - srv_set_io_thread_op_info(global_segment, "file i/o done"); - - array->acquire(); - - handler.done(); - - /* We return the messages for the first slot now, and if there - were several slots, the messages will be returned with - subsequent calls of this function */ - - slot = handler.first_slot(); - } - - ut_ad(slot->is_reserved); - - *m1 = slot->m1; - *m2 = slot->m2; - - *type = slot->type; - - array->release(slot); - - array->release(); - - return(DB_SUCCESS); -} - -/** Get the total number of pending IOs -@return the total number of pending IOs */ -ulint -AIO::total_pending_io_count() -{ - ulint count = s_reads->pending_io_count(); - - if (s_writes != NULL) { - count += s_writes->pending_io_count(); - } - - if (s_ibuf != NULL) { - count += s_ibuf->pending_io_count(); - } - - if (s_log != NULL) { - count += s_log->pending_io_count(); - } - - if (s_sync != NULL) { - count += s_sync->pending_io_count(); - } - - return(count); -} - -/** Validates the consistency the aio system. 
-@return true if ok */ -static -bool -os_aio_validate() -{ - /* The methods countds and validates, we ignore the count. */ - AIO::total_pending_io_count(); - - return(true); -} - -/** Prints pending IO requests per segment of an aio array. -We probably don't need per segment statistics but they can help us -during development phase to see if the IO requests are being -distributed as expected. -@param[in,out] file File where to print -@param[in] segments Pending IO array */ -void -AIO::print_segment_info( - FILE* file, - const ulint* segments) -{ - ut_ad(m_n_segments > 0); - - if (m_n_segments > 1) { - - fprintf(file, " ["); - - for (ulint i = 0; i < m_n_segments; ++i, ++segments) { - - if (i != 0) { - fprintf(file, ", "); - } - - fprintf(file, ULINTPF, *segments); - } - - fprintf(file, "] "); - } -} - -/** Prints info about the aio array. -@param[in,out] file Where to print */ -void -AIO::print(FILE* file) -{ - ulint count = 0; - ulint n_res_seg[SRV_MAX_N_IO_THREADS]; - - mutex_enter(&m_mutex); - - ut_a(!m_slots.empty()); - ut_a(m_n_segments > 0); - - memset(n_res_seg, 0x0, sizeof(n_res_seg)); - - for (ulint i = 0; i < m_slots.size(); ++i) { - Slot& slot = m_slots[i]; - ulint segment = (i * m_n_segments) / m_slots.size(); - - if (slot.is_reserved) { - - ++count; - - ++n_res_seg[segment]; - - ut_a(slot.len > 0); - } - } - - ut_a(m_n_reserved == count); - - print_segment_info(file, n_res_seg); - - mutex_exit(&m_mutex); -} - -/** Print all the AIO segments -@param[in,out] file Where to print */ -void -AIO::print_all(FILE* file) -{ - s_reads->print(file); - - if (s_writes != NULL) { - fputs(", aio writes:", file); - s_writes->print(file); - } - - if (s_ibuf != NULL) { - fputs(",\n ibuf aio reads:", file); - s_ibuf->print(file); - } - - if (s_log != NULL) { - fputs(", log i/o's:", file); - s_log->print(file); - } - - if (s_sync != NULL) { - fputs(", sync i/o's:", file); - s_sync->print(file); - } -} - /** Prints info of the aio arrays. 
@param[in,out] file file where to print */ void @@ -7297,19 +4201,11 @@ os_aio_print(FILE* file) srv_io_thread_op_info[i], srv_io_thread_function[i]); -#ifndef _WIN32 - if (!srv_use_native_aio - && os_event_is_set(os_aio_segment_wait_events[i])) { - fprintf(file, " ev set"); - } -#endif /* _WIN32 */ - fprintf(file, "\n"); } fputs("Pending normal aio reads:", file); - AIO::print_all(file); putc('\n', file); current_time = time(NULL); @@ -7381,82 +4277,6 @@ os_aio_refresh_stats() os_last_printout = time(NULL); } -/** Checks that all slots in the system have been freed, that is, there are -no pending io operations. -@return true if all free */ -bool -os_aio_all_slots_free() -{ - return(AIO::total_pending_io_count() == 0); -} - -#ifdef UNIV_DEBUG -/** Prints all pending IO for the array -@param[in] file file where to print -@param[in] array array to process */ -void -AIO::to_file(FILE* file) const -{ - acquire(); - - fprintf(file, " " ULINTPF "\n", m_n_reserved); - - for (ulint i = 0; i < m_slots.size(); ++i) { - - const Slot& slot = m_slots[i]; - - if (slot.is_reserved) { - - fprintf(file, - "%s IO for %s (offset=" UINT64PF - ", size=%lu)\n", - slot.type.is_read() ? 
"read" : "write", - slot.name, slot.offset, (unsigned long)(slot.len)); - } - } - - release(); -} - -/** Print pending IOs for all arrays */ -void -AIO::print_to_file(FILE* file) -{ - fprintf(file, "Pending normal aio reads:"); - - s_reads->to_file(file); - - if (s_writes != NULL) { - fprintf(file, "Pending normal aio writes:"); - s_writes->to_file(file); - } - - if (s_ibuf != NULL) { - fprintf(file, "Pending ibuf aio reads:"); - s_ibuf->to_file(file); - } - - if (s_log != NULL) { - fprintf(file, "Pending log i/o's:"); - s_log->to_file(file); - } - - if (s_sync != NULL) { - fprintf(file, "Pending sync i/o's:"); - s_sync->to_file(file); - } -} - -/** Prints all pending IO -@param[in] file File where to print */ -void -os_aio_print_pending_io( - FILE* file) -{ - AIO::print_to_file(file); -} - -#endif /* UNIV_DEBUG */ /** Set the file create umask diff --git a/storage/innobase/row/row0ftsort.cc b/storage/innobase/row/row0ftsort.cc index 45a5fee59ec..ca3de3e2af0 100644 --- a/storage/innobase/row/row0ftsort.cc +++ b/storage/innobase/row/row0ftsort.cc @@ -216,7 +216,6 @@ row_fts_psort_info_init( common_info->trx = trx; common_info->all_info = psort_info; common_info->sort_event = os_event_create(0); - common_info->merge_event = os_event_create(0); common_info->opt_doc_id_size = opt_doc_id_size; if (log_tmp_is_encrypted()) { @@ -350,7 +349,6 @@ row_fts_psort_info_destroy( } os_event_destroy(merge_info[0].psort_common->sort_event); - os_event_destroy(merge_info[0].psort_common->merge_event); ut_free(merge_info[0].psort_common->dup); ut_free(merge_info[0].psort_common); ut_free(psort_info); @@ -754,10 +752,9 @@ row_merge_fts_get_next_doc_item( /*********************************************************************//** Function performs parallel tokenization of the incoming doc strings. It also performs the initial in memory sort of the parsed records. 
-@return OS_THREAD_DUMMY_RETURN */ +*/ static -os_thread_ret_t -DECLARE_THREAD(fts_parallel_tokenization)( +void fts_parallel_tokenization( /*======================*/ void* arg) /*!< in: psort_info for the thread */ { @@ -1065,10 +1062,6 @@ func_exit: psort_info->child_status = FTS_CHILD_COMPLETE; os_event_set(psort_info->psort_common->sort_event); psort_info->child_status = FTS_CHILD_EXITING; - - os_thread_exit(); - - OS_THREAD_DUMMY_RETURN; } /*********************************************************************//** @@ -1079,23 +1072,20 @@ row_fts_start_psort( fts_psort_t* psort_info) /*!< parallel sort structure */ { ulint i = 0; - os_thread_id_t thd_id; for (i = 0; i < fts_sort_pll_degree; i++) { psort_info[i].psort_id = i; - psort_info[i].thread_hdl = - os_thread_create(fts_parallel_tokenization, - (void*) &psort_info[i], - &thd_id); + psort_info[i].task = + new tpool::waitable_task(fts_parallel_tokenization,&psort_info[i]); + srv_thread_pool->submit_task(psort_info[i].task); } } /*********************************************************************//** -Function performs the merge and insertion of the sorted records. -@return OS_THREAD_DUMMY_RETURN */ +Function performs the merge and insertion of the sorted records. 
*/ static -os_thread_ret_t -DECLARE_THREAD(fts_parallel_merge)( +void +fts_parallel_merge( /*===============*/ void* arg) /*!< in: parallel merge info */ { @@ -1109,14 +1099,6 @@ DECLARE_THREAD(fts_parallel_merge)( row_fts_merge_insert(psort_info->psort_common->dup->index, psort_info->psort_common->new_table, psort_info->psort_common->all_info, id); - - psort_info->child_status = FTS_CHILD_COMPLETE; - os_event_set(psort_info->psort_common->merge_event); - psort_info->child_status = FTS_CHILD_EXITING; - - os_thread_exit(false); - - OS_THREAD_DUMMY_RETURN; } /*********************************************************************//** @@ -1128,15 +1110,15 @@ row_fts_start_parallel_merge( { ulint i = 0; - /* Kick off merge/insert threads */ + /* Kick off merge/insert tasks */ for (i = 0; i < FTS_NUM_AUX_INDEX; i++) { merge_info[i].psort_id = i; merge_info[i].child_status = 0; - merge_info[i].thread_hdl = os_thread_create( + merge_info[i].task = new tpool::waitable_task( fts_parallel_merge, - (void*) &merge_info[i], - &merge_info[i].thread_hdl); + (void*) &merge_info[i]); + srv_thread_pool->submit_task(merge_info[i].task); } } diff --git a/storage/innobase/row/row0merge.cc b/storage/innobase/row/row0merge.cc index dad2e060678..3718d34f7c7 100644 --- a/storage/innobase/row/row0merge.cc +++ b/storage/innobase/row/row0merge.cc @@ -2771,10 +2771,6 @@ all_done: DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Scan Table\n"); #endif if (fts_pll_sort) { - bool all_exit = false; - ulint trial_count = 0; - const ulint max_trial_count = 10000; - wait_again: /* Check if error occurs in child thread */ for (ulint j = 0; j < fts_sort_pll_degree; j++) { @@ -2807,27 +2803,9 @@ wait_again: } } - /* Now all children should complete, wait a bit until - they all finish setting the event, before we free everything. 
- This has a 10 second timeout */ - do { - all_exit = true; - - for (ulint j = 0; j < fts_sort_pll_degree; j++) { - if (psort_info[j].child_status - != FTS_CHILD_EXITING) { - all_exit = false; - os_thread_sleep(1000); - break; - } - } - trial_count++; - } while (!all_exit && trial_count < max_trial_count); - - if (!all_exit) { - ib::fatal() << "Not all child sort threads exited" - " when creating FTS index '" - << fts_sort_idx->name << "'"; + for (ulint j = 0; j < fts_sort_pll_degree; j++) { + psort_info[j].task->wait(); + delete psort_info[j].task; } } @@ -4109,7 +4087,7 @@ row_merge_file_create_low( File f = create_temp_file(filename, path, "ib", O_BINARY | O_SEQUENTIAL, MYF(MY_WME | MY_TEMPORARY)); - pfs_os_file_t fd = IF_WIN(my_get_osfhandle(f), f); + pfs_os_file_t fd = IF_WIN((os_file_t)my_get_osfhandle(f), f); #ifdef UNIV_PFS_IO register_pfs_file_open_end(locker, fd, @@ -4155,7 +4133,7 @@ row_merge_file_destroy_low( const pfs_os_file_t& fd) /*!< in: merge file descriptor */ { if (fd != OS_FILE_CLOSED) { - int res = mysql_file_close(IF_WIN(my_win_handle2File(fd), fd), + int res = mysql_file_close(IF_WIN(my_win_handle2File((os_file_t)fd), fd), MYF(MY_WME)); ut_a(res != -1); } @@ -4589,7 +4567,6 @@ row_merge_build_indexes( dict_index_t* fts_sort_idx = NULL; fts_psort_t* psort_info = NULL; fts_psort_t* merge_info = NULL; - int64_t sig_count = 0; bool fts_psort_initiated = false; double total_static_cost = 0; @@ -4756,65 +4733,14 @@ row_merge_build_indexes( } if (indexes[i]->type & DICT_FTS) { - os_event_t fts_parallel_merge_event; sort_idx = fts_sort_idx; - fts_parallel_merge_event - = merge_info[0].psort_common->merge_event; - if (FTS_PLL_MERGE) { - ulint trial_count = 0; - bool all_exit = false; - - os_event_reset(fts_parallel_merge_event); row_fts_start_parallel_merge(merge_info); -wait_again: - os_event_wait_time_low( - fts_parallel_merge_event, 1000000, - sig_count); - for (j = 0; j < FTS_NUM_AUX_INDEX; j++) { - if (merge_info[j].child_status - != 
FTS_CHILD_COMPLETE - && merge_info[j].child_status - != FTS_CHILD_EXITING) { - sig_count = os_event_reset( - fts_parallel_merge_event); - - goto wait_again; - } - } - - /* Now all children should complete, wait - a bit until they all finish using event */ - while (!all_exit && trial_count < 10000) { - all_exit = true; - - for (j = 0; j < FTS_NUM_AUX_INDEX; - j++) { - if (merge_info[j].child_status - != FTS_CHILD_EXITING) { - all_exit = false; - os_thread_sleep(1000); - break; - } - } - trial_count++; - } - - if (!all_exit) { - ib::error() << "Not all child merge" - " threads exited when creating" - " FTS index '" - << indexes[i]->name << "'"; - } else { - for (j = 0; j < FTS_NUM_AUX_INDEX; - j++) { - - os_thread_join(merge_info[j] - .thread_hdl); - } + merge_info[j].task->wait(); + delete merge_info[j].task; } } else { /* This cannot report duplicates; an diff --git a/storage/innobase/row/row0mysql.cc b/storage/innobase/row/row0mysql.cc index da11fa2f948..c6a61ff4bf7 100644 --- a/storage/innobase/row/row0mysql.cc +++ b/storage/innobase/row/row0mysql.cc @@ -3435,7 +3435,7 @@ row_drop_table_for_mysql( dict_stats_recalc_pool_del(table); dict_stats_defrag_pool_del(table, NULL); - if (btr_defragment_thread_active) { + if (btr_defragment_active) { /* During fts_drop_orphaned_tables() in recv_recovery_rollback_active() the btr_defragment_mutex has not yet been diff --git a/storage/innobase/row/row0purge.cc b/storage/innobase/row/row0purge.cc index 4dee8de5aad..41731ed17a0 100644 --- a/storage/innobase/row/row0purge.cc +++ b/storage/innobase/row/row0purge.cc @@ -1311,26 +1311,6 @@ row_purge_step( node->start(); -#ifdef UNIV_DEBUG - srv_slot_t *slot = thr->thread_slot; - ut_ad(slot); - - rw_lock_x_lock(&slot->debug_sync_lock); - while (UT_LIST_GET_LEN(slot->debug_sync)) { - srv_slot_t::debug_sync_t *sync = - UT_LIST_GET_FIRST(slot->debug_sync); - const char* sync_str = reinterpret_cast<char*>(&sync[1]); - bool result = debug_sync_set_action(current_thd, - sync_str, - 
strlen(sync_str)); - ut_a(!result); - - UT_LIST_REMOVE(slot->debug_sync, sync); - ut_free(sync); - } - rw_lock_x_unlock(&slot->debug_sync_lock); -#endif - if (!(node->undo_recs == NULL || ib_vector_is_empty(node->undo_recs))) { trx_purge_rec_t*purge_rec; diff --git a/storage/innobase/srv/srv0srv.cc b/storage/innobase/srv/srv0srv.cc index 52b4f6ef921..c4e20c973a0 100644 --- a/storage/innobase/srv/srv0srv.cc +++ b/storage/innobase/srv/srv0srv.cc @@ -74,6 +74,7 @@ Created 10/8/1995 Heikki Tuuri #include "fil0pagecompress.h" #include "btr0scrub.h" + #include <my_service_manager.h> /* The following is the maximum allowed duration of a lock wait. */ @@ -83,12 +84,6 @@ UNIV_INTERN ulong srv_fatal_semaphore_wait_threshold = DEFAULT_SRV_FATAL_SEMAPH in microseconds, in order to reduce the lagging of the purge thread. */ ulint srv_dml_needed_delay; -bool srv_monitor_active; -bool srv_error_monitor_active; -bool srv_buf_dump_thread_active; -bool srv_dict_stats_thread_active; -bool srv_buf_resize_thread_active; - my_bool srv_scrub_log; const char* srv_main_thread_op_info = ""; @@ -195,6 +190,9 @@ my_bool srv_adaptive_flushing; /** innodb_flush_sync; whether to ignore io_capacity at log checkpoints */ my_bool srv_flush_sync; +/** common thread pool*/ +tpool::thread_pool* srv_thread_pool; + /** Maximum number of times allowed to conditionally acquire mutex before switching to blocking wait on the mutex */ #define MAX_MUTEX_NOWAIT 20 @@ -448,7 +446,7 @@ FILE* srv_misc_tmpfile; static ulint srv_main_thread_process_no; static ulint srv_main_thread_id; -/* The following counts are used by the srv_master_thread. */ +/* The following counts are used by the srv_master_callback. */ /** Iterations of the loop bounded by 'srv_active' label. */ ulint srv_main_active_loops; @@ -574,23 +572,6 @@ struct srv_sys_t{ ib_mutex_t mutex; /*!< variable protecting the fields below. 
*/ - ulint n_sys_threads; /*!< size of the sys_threads - array */ - - srv_slot_t - sys_threads[srv_max_purge_threads + 1]; /*!< server thread table; - os_event_set() and - os_event_reset() on - sys_threads[]->event are - covered by srv_sys_t::mutex */ - - Atomic_counter<ulint> - n_threads_active[SRV_MASTER + 1]; - /*!< number of threads active - in a thread class; protected - by both std::atomic and - mutex */ - srv_stats_t::ulint_ctr_1_t activity_count; /*!< For tracking server activity */ @@ -598,26 +579,35 @@ struct srv_sys_t{ static srv_sys_t srv_sys; +/* + Structure shared by timer and coordinator_callback. + No protection necessary since timer and task never run + in parallel (being in the same task group of size 1). +*/ +struct purge_coordinator_state +{ + /* Snapshot of the last history length before the purge call.*/ + uint32 m_history_length; + Atomic_counter<int> m_running; + purge_coordinator_state() : + m_history_length(), m_running(0) + {} +}; + +static purge_coordinator_state purge_state; +extern tpool::waitable_task purge_coordinator_task; + /** @return whether the purge coordinator thread is active */ bool purge_sys_t::running() { - return srv_sys.n_threads_active[SRV_PURGE]; + return purge_coordinator_task.is_running(); } -/** Event to signal srv_monitor_thread. Not protected by a mutex. -Set after setting srv_print_innodb_monitor. */ -os_event_t srv_monitor_event; - -/** Event to signal the shutdown of srv_error_monitor_thread. -Not protected by a mutex. */ -os_event_t srv_error_event; -/** Event for waking up buf_dump_thread. Not protected by a mutex. -Set on shutdown or by buf_dump_start() or buf_load_start(). */ -os_event_t srv_buf_dump_event; +/** threadpool timer for srv_error_monitor_task(). 
*/ +std::unique_ptr<tpool::timer> srv_error_monitor_timer; +std::unique_ptr<tpool::timer> srv_monitor_timer; -/** Event to signal the buffer pool resize thread */ -os_event_t srv_buf_resize_event; /** The buffer pool dump/load file name */ char* srv_buf_dump_filename; @@ -720,319 +710,62 @@ srv_reset_io_thread_op_info() } } -#ifdef UNIV_DEBUG -/*********************************************************************//** -Validates the type of a thread table slot. -@return TRUE if ok */ -static -ibool -srv_thread_type_validate( -/*=====================*/ - srv_thread_type type) /*!< in: thread type */ -{ - switch (type) { - case SRV_NONE: - break; - case SRV_WORKER: - case SRV_PURGE: - case SRV_MASTER: - return(TRUE); - } - ut_error; - return(FALSE); -} -#endif /* UNIV_DEBUG */ -/*********************************************************************//** -Gets the type of a thread table slot. -@return thread type */ -static -srv_thread_type -srv_slot_get_type( -/*==============*/ - const srv_slot_t* slot) /*!< in: thread slot */ -{ - srv_thread_type type = slot->type; - ut_ad(srv_thread_type_validate(type)); - return(type); -} -/*********************************************************************//** -Reserves a slot in the thread table for the current thread. -@return reserved slot */ -static -srv_slot_t* -srv_reserve_slot( -/*=============*/ - srv_thread_type type) /*!< in: type of the thread */ +static void thread_pool_thread_init() { - srv_slot_t* slot = 0; - - srv_sys_mutex_enter(); - - ut_ad(srv_thread_type_validate(type)); - - switch (type) { - case SRV_MASTER: - slot = &srv_sys.sys_threads[SRV_MASTER_SLOT]; - break; - - case SRV_PURGE: - slot = &srv_sys.sys_threads[SRV_PURGE_SLOT]; - break; - - case SRV_WORKER: - /* Find an empty slot, skip the master and purge slots. 
*/ - for (slot = &srv_sys.sys_threads[SRV_WORKER_SLOTS_START]; - slot->in_use; - ++slot) { - - ut_a(slot < &srv_sys.sys_threads[ - srv_sys.n_sys_threads]); - } - break; - - case SRV_NONE: - ut_error; - } - - ut_a(!slot->in_use); - - slot->in_use = TRUE; - slot->suspended = FALSE; - slot->type = type; - - ut_ad(srv_slot_get_type(slot) == type); - - srv_sys.n_threads_active[type]++; - - srv_sys_mutex_exit(); - - return(slot); + my_thread_init(); + pfs_register_thread(thread_pool_thread_key); } - -/*********************************************************************//** -Suspends the calling thread to wait for the event in its thread slot. -@return the current signal count of the event. */ -static -int64_t -srv_suspend_thread_low( -/*===================*/ - srv_slot_t* slot) /*!< in/out: thread slot */ +static void thread_pool_thread_end() { - ut_ad(!srv_read_only_mode); - ut_ad(mutex_own(&srv_sys.mutex)); - - ut_ad(slot->in_use); - - srv_thread_type type = srv_slot_get_type(slot); - - switch (type) { - case SRV_NONE: - ut_error; - - case SRV_MASTER: - /* We have only one master thread and it - should be the first entry always. */ - ut_a(srv_sys.n_threads_active[type] == 1); - break; - - case SRV_PURGE: - /* We have only one purge coordinator thread - and it should be the second entry always. */ - ut_a(srv_sys.n_threads_active[type] == 1); - break; - - case SRV_WORKER: - ut_a(srv_n_purge_threads > 1); - break; - } - - ut_a(!slot->suspended); - slot->suspended = TRUE; - - if (srv_sys.n_threads_active[type]-- == 0) { - ut_error; - } - - return(os_event_reset(slot->event)); + pfs_delete_thread(); + my_thread_end(); } -/*********************************************************************//** -Suspends the calling thread to wait for the event in its thread slot. -@return the current signal count of the event. 
*/ -static -int64_t -srv_suspend_thread( -/*===============*/ - srv_slot_t* slot) /*!< in/out: thread slot */ -{ - srv_sys_mutex_enter(); - - int64_t sig_count = srv_suspend_thread_low(slot); - - srv_sys_mutex_exit(); - - return(sig_count); -} -/** Resume the calling thread. -@param[in,out] slot thread slot -@param[in] sig_count signal count (if wait) -@param[in] wait whether to wait for the event -@param[in] timeout_usec timeout in microseconds (0=infinite) -@return whether the wait timed out */ -static -bool -srv_resume_thread(srv_slot_t* slot, int64_t sig_count = 0, bool wait = true, - ulint timeout_usec = 0) +void srv_thread_pool_init() { - bool timeout; + DBUG_ASSERT(!srv_thread_pool); - ut_ad(!srv_read_only_mode); - ut_ad(slot->in_use); - ut_ad(slot->suspended); - - if (!wait) { - timeout = false; - } else if (timeout_usec) { - timeout = OS_SYNC_TIME_EXCEEDED == os_event_wait_time_low( - slot->event, timeout_usec, sig_count); - } else { - timeout = false; - os_event_wait_low(slot->event, sig_count); - } - - srv_sys_mutex_enter(); - ut_ad(slot->in_use); - ut_ad(slot->suspended); - - slot->suspended = FALSE; - srv_sys.n_threads_active[slot->type]++; - srv_sys_mutex_exit(); - return(timeout); +#if defined (_WIN32) + srv_thread_pool = tpool::create_thread_pool_win(); +#else + srv_thread_pool = tpool::create_thread_pool_generic(); +#endif + srv_thread_pool->set_thread_callbacks(thread_pool_thread_init, thread_pool_thread_end); } -/** Ensure that a given number of threads of the type given are running -(or are already terminated). 
-@param[in] type thread type -@param[in] n number of threads that have to run */ -void -srv_release_threads(enum srv_thread_type type, ulint n) -{ - ulint running; - - ut_ad(srv_thread_type_validate(type)); - ut_ad(n > 0); - - do { - running = 0; - - srv_sys_mutex_enter(); - for (ulint i = 0; i < srv_sys.n_sys_threads; i++) { - srv_slot_t* slot = &srv_sys.sys_threads[i]; - - if (!slot->in_use || srv_slot_get_type(slot) != type) { - continue; - } else if (!slot->suspended) { - if (++running >= n) { - break; - } - continue; - } - - switch (type) { - case SRV_NONE: - ut_error; - - case SRV_MASTER: - /* We have only one master thread and it - should be the first entry always. */ - ut_a(n == 1); - ut_a(i == SRV_MASTER_SLOT); - ut_a(srv_sys.n_threads_active[type] == 0); - break; - - case SRV_PURGE: - /* We have only one purge coordinator thread - and it should be the second entry always. */ - ut_a(n == 1); - ut_a(i == SRV_PURGE_SLOT); - ut_a(srv_n_purge_threads > 0); - ut_a(srv_sys.n_threads_active[type] == 0); - break; - - case SRV_WORKER: - ut_a(srv_n_purge_threads > 1); - ut_a(srv_sys.n_threads_active[type] - < srv_n_purge_threads - 1); - break; - } - - os_event_set(slot->event); - } - - srv_sys_mutex_exit(); - } while (running && running < n); -} - -/*********************************************************************//** -Release a thread's slot. */ -static -void -srv_free_slot( -/*==========*/ - srv_slot_t* slot) /*!< in/out: thread slot */ +void srv_thread_pool_end() { - srv_sys_mutex_enter(); - - /* Mark the thread as inactive. */ - srv_suspend_thread_low(slot); - /* Free the slot for reuse. */ - ut_ad(slot->in_use); - slot->in_use = FALSE; - - srv_sys_mutex_exit(); + ut_a(!srv_master_timer); + delete srv_thread_pool; + srv_thread_pool = nullptr; } +static bool need_srv_free; + /** Initialize the server. */ static void srv_init() { mutex_create(LATCH_ID_SRV_INNODB_MONITOR, &srv_innodb_monitor_mutex); - - srv_sys.n_sys_threads = srv_read_only_mode - ? 
0 - : srv_n_purge_threads + 1/* purge coordinator */; + srv_thread_pool_init(); if (!srv_read_only_mode) { mutex_create(LATCH_ID_SRV_SYS, &srv_sys.mutex); mutex_create(LATCH_ID_SRV_SYS_TASKS, &srv_sys.tasks_mutex); - for (ulint i = 0; i < srv_sys.n_sys_threads; ++i) { - srv_slot_t* slot = &srv_sys.sys_threads[i]; - - slot->event = os_event_create(0); - - ut_a(slot->event); - } - - srv_error_event = os_event_create(0); - - srv_monitor_event = os_event_create(0); - - srv_buf_dump_event = os_event_create(0); buf_flush_event = os_event_create("buf_flush_event"); UT_LIST_INIT(srv_sys.tasks, &que_thr_t::queue); } - srv_buf_resize_event = os_event_create(0); - + need_srv_free = true; ut_d(srv_master_thread_disabled_event = os_event_create(0)); /* page_zip_stat_per_index_mutex is acquired from: @@ -1070,7 +803,7 @@ void srv_free(void) /*==========*/ { - if (!srv_buf_resize_event) { + if (!need_srv_free) { return; } @@ -1080,24 +813,15 @@ srv_free(void) if (!srv_read_only_mode) { mutex_free(&srv_sys.mutex); mutex_free(&srv_sys.tasks_mutex); - - for (ulint i = 0; i < srv_sys.n_sys_threads; ++i) { - os_event_destroy(srv_sys.sys_threads[i].event); - } - - os_event_destroy(srv_error_event); - os_event_destroy(srv_monitor_event); - os_event_destroy(srv_buf_dump_event); os_event_destroy(buf_flush_event); } - os_event_destroy(srv_buf_resize_event); - ut_d(os_event_destroy(srv_master_thread_disabled_event)); dict_ind_free(); trx_i_s_cache_free(trx_i_s_cache); + srv_thread_pool_end(); } /*********************************************************************//** @@ -1681,50 +1405,36 @@ srv_export_innodb_status(void) log_mutex_exit(); } -/*********************************************************************//** -A thread which prints the info output by various InnoDB monitors. 
-@return a dummy parameter */ -extern "C" -os_thread_ret_t -DECLARE_THREAD(srv_monitor_thread)(void*) +struct srv_monitor_state_t +{ + time_t last_monitor_time; + ulint mutex_skipped; + bool last_srv_print_monitor; + srv_monitor_state_t() + { + srv_last_monitor_time = time(NULL); + last_monitor_time = srv_last_monitor_time; + mutex_skipped = 0; + last_srv_print_monitor = false; + } +}; + +static srv_monitor_state_t monitor_state; + +/** A task which prints the info output by various InnoDB monitors.*/ +void srv_monitor_task(void*) { - int64_t sig_count; double time_elapsed; time_t current_time; - time_t last_monitor_time; - ulint mutex_skipped; - ibool last_srv_print_monitor; ut_ad(!srv_read_only_mode); -#ifdef UNIV_DEBUG_THREAD_CREATION - ib::info() << "Lock timeout thread starts, id " - << os_thread_pf(os_thread_get_curr_id()); -#endif /* UNIV_DEBUG_THREAD_CREATION */ - -#ifdef UNIV_PFS_THREAD - pfs_register_thread(srv_monitor_thread_key); -#endif /* UNIV_PFS_THREAD */ - - current_time = time(NULL); - srv_last_monitor_time = current_time; - last_monitor_time = current_time; - mutex_skipped = 0; - last_srv_print_monitor = srv_print_innodb_monitor; -loop: - /* Wake up every 5 seconds to see if we need to print - monitor information or if signalled at shutdown. 
*/ - - sig_count = os_event_reset(srv_monitor_event); - - os_event_wait_time_low(srv_monitor_event, 5000000, sig_count); - current_time = time(NULL); - time_elapsed = difftime(current_time, last_monitor_time); + time_elapsed = difftime(current_time, monitor_state.last_monitor_time); if (time_elapsed > 15) { - last_monitor_time = current_time; + monitor_state.last_monitor_time = current_time; if (srv_print_innodb_monitor) { /* Reset mutex_skipped counter everytime @@ -1732,21 +1442,21 @@ loop: ensure we will not be blocked by lock_sys.mutex for short duration information printing, such as requested by sync_array_print_long_waits() */ - if (!last_srv_print_monitor) { - mutex_skipped = 0; - last_srv_print_monitor = TRUE; + if (!monitor_state.last_srv_print_monitor) { + monitor_state.mutex_skipped = 0; + monitor_state.last_srv_print_monitor = true; } if (!srv_printf_innodb_monitor(stderr, - MUTEX_NOWAIT(mutex_skipped), + MUTEX_NOWAIT(monitor_state.mutex_skipped), NULL, NULL)) { - mutex_skipped++; + monitor_state.mutex_skipped++; } else { /* Reset the counter */ - mutex_skipped = 0; + monitor_state.mutex_skipped = 0; } } else { - last_srv_print_monitor = FALSE; + monitor_state.last_monitor_time = 0; } @@ -1757,11 +1467,11 @@ loop: mutex_enter(&srv_monitor_file_mutex); rewind(srv_monitor_file); if (!srv_printf_innodb_monitor(srv_monitor_file, - MUTEX_NOWAIT(mutex_skipped), + MUTEX_NOWAIT(monitor_state.mutex_skipped), NULL, NULL)) { - mutex_skipped++; + monitor_state.mutex_skipped++; } else { - mutex_skipped = 0; + monitor_state.mutex_skipped = 0; } os_file_set_eof(srv_monitor_file); @@ -1770,63 +1480,27 @@ loop: } srv_refresh_innodb_monitor_stats(); - - if (srv_shutdown_state != SRV_SHUTDOWN_NONE) { - goto exit_func; - } - - if (srv_print_innodb_monitor - || srv_print_innodb_lock_monitor) { - goto loop; - } - - goto loop; - -exit_func: - srv_monitor_active = false; - - /* We count the number of threads in os_thread_exit(). 
A created - thread should always use that to exit and not use return() to exit. */ - - os_thread_exit(); - - OS_THREAD_DUMMY_RETURN; } /*********************************************************************//** -A thread which prints warnings about semaphore waits which have lasted +A task which prints warnings about semaphore waits which have lasted too long. These can be used to track bugs which cause hangs. -@return a dummy parameter */ -extern "C" -os_thread_ret_t -DECLARE_THREAD(srv_error_monitor_thread)(void*) +*/ +void srv_error_monitor_task(void*) { /* number of successive fatal timeouts observed */ - ulint fatal_cnt = 0; - lsn_t old_lsn; + static ulint fatal_cnt; + static lsn_t old_lsn = srv_start_lsn; lsn_t new_lsn; - int64_t sig_count; /* longest waiting thread for a semaphore */ - os_thread_id_t waiter = os_thread_get_curr_id(); - os_thread_id_t old_waiter = waiter; + os_thread_id_t waiter; + static os_thread_id_t old_waiter = os_thread_get_curr_id(); /* the semaphore that is being waited for */ const void* sema = NULL; - const void* old_sema = NULL; + static const void* old_sema = NULL; ut_ad(!srv_read_only_mode); - old_lsn = srv_start_lsn; - -#ifdef UNIV_DEBUG_THREAD_CREATION - ib::info() << "Error monitor thread starts, id " - << os_thread_pf(os_thread_get_curr_id()); -#endif /* UNIV_DEBUG_THREAD_CREATION */ - -#ifdef UNIV_PFS_THREAD - pfs_register_thread(srv_error_monitor_thread_key); -#endif /* UNIV_PFS_THREAD */ - -loop: /* Try to track a strange bug reported by Harald Fuchs and others, where the lsn seems to decrease at times */ @@ -1873,29 +1547,6 @@ loop: old_waiter = waiter; old_sema = sema; } - - /* Flush stderr so that a database user gets the output - to possible MySQL error file */ - - fflush(stderr); - - sig_count = os_event_reset(srv_error_event); - - os_event_wait_time_low(srv_error_event, 1000000, sig_count); - - if (srv_shutdown_state == SRV_SHUTDOWN_NONE) { - - goto loop; - } - - srv_error_monitor_active = false; - - /* We count the 
number of threads in os_thread_exit(). A created - thread should always use that to exit and not use return() to exit. */ - - os_thread_exit(); - - OS_THREAD_DUMMY_RETURN; } /******************************************************************//** @@ -1907,37 +1558,17 @@ srv_inc_activity_count(void) srv_sys.activity_count.inc(); } -/**********************************************************************//** -Check whether any background thread is active. If so return the thread -type. -@return SRV_NONE if all are suspended or have exited, thread -type if any are still active. */ -srv_thread_type -srv_get_active_thread_type(void) -/*============================*/ +/** +Check whether purge or master are still active. +@return true if something is active, false if not. +*/ +bool srv_any_background_activity() { - srv_thread_type ret = SRV_NONE; - - if (srv_read_only_mode) { - return(SRV_NONE); - } - - srv_sys_mutex_enter(); - - for (ulint i = SRV_WORKER; i <= SRV_MASTER; ++i) { - if (srv_sys.n_threads_active[i] != 0) { - ret = static_cast<srv_thread_type>(i); - break; - } - } - - srv_sys_mutex_exit(); - - if (ret == SRV_NONE && purge_sys.enabled()) { - ret = SRV_PURGE; + if (purge_sys.enabled() || srv_master_timer.get()) { + ut_ad(!srv_read_only_mode); + return true; } - - return(ret); + return false; } /** Wake up the InnoDB master thread if it was suspended (not sleeping). */ @@ -1948,24 +1579,20 @@ srv_active_wake_master_thread_low() ut_ad(!mutex_own(&srv_sys.mutex)); srv_inc_activity_count(); +} - if (srv_sys.n_threads_active[SRV_MASTER] == 0) { - srv_slot_t* slot; - - srv_sys_mutex_enter(); - slot = &srv_sys.sys_threads[SRV_MASTER_SLOT]; +void purge_worker_callback(void*); +void purge_coordinator_callback(void*); +void purge_coordinator_timer_callback(void*); - /* Only if the master thread has been started. 
*/ +tpool::task_group purge_task_group; +tpool::waitable_task purge_worker_task(purge_worker_callback, nullptr, &purge_task_group); - if (slot->in_use) { - ut_a(srv_slot_get_type(slot) == SRV_MASTER); - os_event_set(slot->event); - } +tpool::task_group purge_coordinator_task_group(1); +tpool::waitable_task purge_coordinator_task(purge_coordinator_callback, nullptr, &purge_coordinator_task_group); - srv_sys_mutex_exit(); - } -} +tpool::timer* purge_coordinator_timer; /** Wake up the purge threads if there is work to do. */ void @@ -1975,10 +1602,10 @@ srv_wake_purge_thread_if_not_active() ut_ad(!mutex_own(&srv_sys.mutex)); if (purge_sys.enabled() && !purge_sys.paused() - && !srv_sys.n_threads_active[SRV_PURGE] && trx_sys.rseg_history_len) { - - srv_release_threads(SRV_PURGE, 1); + if(++purge_state.m_running == 1) { + srv_thread_pool->submit_task(&purge_coordinator_task); + } } } @@ -1987,7 +1614,6 @@ void srv_wake_master_thread() { srv_inc_activity_count(); - srv_release_threads(SRV_MASTER, 1); } /*******************************************************************//** @@ -2304,7 +1930,6 @@ srv_master_do_idle_tasks(void) /** Perform shutdown tasks. @param[in] ibuf_merge whether to complete the change buffer merge */ -static void srv_shutdown(bool ibuf_merge) { @@ -2341,95 +1966,24 @@ srv_shutdown(bool ibuf_merge) } /*********************************************************************//** -Puts master thread to sleep. At this point we are using polling to -service various activities. Master thread sleeps for one second before -checking the state of the server again */ -static -void -srv_master_sleep(void) -/*==================*/ -{ - srv_main_thread_op_info = "sleeping"; - os_thread_sleep(1000000); - srv_main_thread_op_info = ""; -} - -/*********************************************************************//** -The master thread controlling the server. +The periodic master controlling the server. 
@return a dummy parameter */ -extern "C" -os_thread_ret_t -DECLARE_THREAD(srv_master_thread)( -/*==============================*/ - void* arg MY_ATTRIBUTE((unused))) - /*!< in: a dummy parameter required by - os_thread_create */ -{ - my_thread_init(); - DBUG_ENTER("srv_master_thread"); - - srv_slot_t* slot; - ulint old_activity_count = srv_get_activity_count(); - - ut_ad(!srv_read_only_mode); - -#ifdef UNIV_DEBUG_THREAD_CREATION - ib::info() << "Master thread starts, id " - << os_thread_pf(os_thread_get_curr_id()); -#endif /* UNIV_DEBUG_THREAD_CREATION */ - -#ifdef UNIV_PFS_THREAD - pfs_register_thread(srv_master_thread_key); -#endif /* UNIV_PFS_THREAD */ - srv_main_thread_process_no = os_proc_get_number(); - srv_main_thread_id = os_thread_pf(os_thread_get_curr_id()); - - slot = srv_reserve_slot(SRV_MASTER); - ut_a(slot == srv_sys.sys_threads); - -loop: - while (srv_shutdown_state == SRV_SHUTDOWN_NONE) { - - srv_master_sleep(); - - MONITOR_INC(MONITOR_MASTER_THREAD_SLEEP); +void srv_master_callback(void*) +{ + static ulint old_activity_count; - if (srv_check_activity(old_activity_count)) { - old_activity_count = srv_get_activity_count(); - srv_master_do_active_tasks(); - } else { - srv_master_do_idle_tasks(); - } - } + ut_a(srv_shutdown_state == SRV_SHUTDOWN_NONE); - switch (srv_shutdown_state) { - case SRV_SHUTDOWN_NONE: - break; - case SRV_SHUTDOWN_FLUSH_PHASE: - case SRV_SHUTDOWN_LAST_PHASE: - ut_ad(0); - /* fall through */ - case SRV_SHUTDOWN_EXIT_THREADS: - /* srv_init_abort() must have been invoked */ - case SRV_SHUTDOWN_CLEANUP: - if (srv_shutdown_state == SRV_SHUTDOWN_CLEANUP - && srv_fast_shutdown < 2) { - srv_shutdown(srv_fast_shutdown == 0); - } - srv_suspend_thread(slot); - my_thread_end(); - os_thread_exit(); + srv_main_thread_op_info = ""; + MONITOR_INC(MONITOR_MASTER_THREAD_SLEEP); + if (srv_check_activity(old_activity_count)) { + old_activity_count = srv_get_activity_count(); + srv_master_do_active_tasks(); + } else { + srv_master_do_idle_tasks(); } 
- - srv_main_thread_op_info = "suspending"; - - srv_suspend_thread(slot); - - srv_main_thread_op_info = "waiting for server activity"; - - srv_resume_thread(slot); - goto loop; + srv_main_thread_op_info = "sleeping"; } /** @return whether purge should exit due to shutdown */ @@ -2468,7 +2022,7 @@ static bool srv_purge_should_exit() Fetch and execute a task from the work queue. @param [in,out] slot purge worker thread slot @return true if a task was executed */ -static bool srv_task_execute(ut_d(srv_slot_t *slot)) +static bool srv_task_execute() { ut_ad(!srv_read_only_mode); ut_ad(srv_force_recovery < SRV_FORCE_NO_BACKGROUND); @@ -2479,9 +2033,7 @@ static bool srv_task_execute(ut_d(srv_slot_t *slot)) ut_a(que_node_get_type(thr->child) == QUE_NODE_PURGE); UT_LIST_REMOVE(srv_sys.tasks, thr); mutex_exit(&srv_sys.tasks_mutex); - ut_d(thr->thread_slot = slot); que_run_threads(thr); - purge_sys.n_tasks.fetch_sub(1, std::memory_order_release); return true; } @@ -2490,86 +2042,12 @@ static bool srv_task_execute(ut_d(srv_slot_t *slot)) return false; } -/*********************************************************************//** -Worker thread that reads tasks from the work queue and executes them. 
-@return a dummy parameter */ -extern "C" -os_thread_ret_t -DECLARE_THREAD(srv_worker_thread)( -/*==============================*/ - void* arg MY_ATTRIBUTE((unused))) /*!< in: a dummy parameter - required by os_thread_create */ -{ - my_thread_init(); - - srv_slot_t* slot; - - ut_ad(!srv_read_only_mode); - ut_a(srv_force_recovery < SRV_FORCE_NO_BACKGROUND); - my_thread_init(); - THD* thd = innobase_create_background_thd("InnoDB purge worker"); - -#ifdef UNIV_DEBUG_THREAD_CREATION - ib::info() << "Worker thread starting, id " - << os_thread_pf(os_thread_get_curr_id()); -#endif /* UNIV_DEBUG_THREAD_CREATION */ - slot = srv_reserve_slot(SRV_WORKER); - -#ifdef UNIV_DEBUG - UT_LIST_INIT(slot->debug_sync, - &srv_slot_t::debug_sync_t::debug_sync_list); - rw_lock_create(PFS_NOT_INSTRUMENTED, &slot->debug_sync_lock, - SYNC_NO_ORDER_CHECK); -#endif - - ut_a(srv_n_purge_threads > 1); - ut_a(ulong(srv_sys.n_threads_active[SRV_WORKER]) - < srv_n_purge_threads); - - /* We need to ensure that the worker threads exit after the - purge coordinator thread. Otherwise the purge coordinator can - end up waiting forever in trx_purge_wait_for_workers_to_complete() */ - - do { - srv_suspend_thread(slot); - srv_resume_thread(slot); - - if (srv_task_execute(ut_d(slot))) { - - /* If there are tasks in the queue, wakeup - the purge coordinator thread. */ - - srv_wake_purge_thread_if_not_active(); - } - } while (purge_sys.enabled()); - - srv_free_slot(slot); - - ut_ad(!purge_sys.enabled()); - -#ifdef UNIV_DEBUG_THREAD_CREATION - ib::info() << "Purge worker thread exiting, id " - << os_thread_pf(os_thread_get_curr_id()); -#endif /* UNIV_DEBUG_THREAD_CREATION */ - - innobase_destroy_background_thd(thd); - my_thread_end(); - /* We count the number of threads in os_thread_exit(). A created - thread should always use that to exit and not use return() to exit. */ - os_thread_exit(); - - OS_THREAD_DUMMY_RETURN; /* Not reached, avoid compiler warning */ -} /** Do the actual purge operation. 
@param[in,out] n_total_purged total number of purged pages @return length of history list before the last purge batch. */ -static uint32_t srv_do_purge(ulint* n_total_purged -#ifdef UNIV_DEBUG - , srv_slot_t* slot /*!< purge coordinator */ -#endif - ) +static uint32_t srv_do_purge(ulint* n_total_purged) { ulint n_pages_purged; @@ -2629,11 +2107,7 @@ static uint32_t srv_do_purge(ulint* n_total_purged n_pages_purged = trx_purge( n_use_threads, !(++count % srv_purge_rseg_truncate_frequency) - || purge_sys.truncate.current -#ifdef UNIV_DEBUG - , slot -#endif - ); + || purge_sys.truncate.current); *n_total_purged += n_pages_purged; } while (n_pages_purged > 0 && !purge_sys.paused() @@ -2641,174 +2115,150 @@ static uint32_t srv_do_purge(ulint* n_total_purged return(rseg_history_len); } -#ifndef UNIV_DEBUG -# define srv_do_purge(n_total_purged, slot) srv_do_purge(n_total_purged) -#endif -/*********************************************************************//** -Suspend the purge coordinator thread. */ -static -void -srv_purge_coordinator_suspend( -/*==========================*/ - srv_slot_t* slot, /*!< in/out: Purge coordinator - thread slot */ - uint32_t rseg_history_len) /*!< in: history list length - before last purge */ + +std::queue<THD*> purge_thds; +std::mutex purge_thd_mutex; + +void purge_create_background_thds(int n) { - ut_ad(!srv_read_only_mode); - ut_a(slot->type == SRV_PURGE); + THD* thd = current_thd; + std::unique_lock<std::mutex> lk(purge_thd_mutex); + for (int i = 0; i < n; i++) { + purge_thds.push(innobase_create_background_thd("InnoDB purge worker")); + } + set_current_thd(thd); +} - bool stop = false; +extern void* thd_attach_thd(THD*); +extern void thd_detach_thd(void *); - /** Maximum wait time on the purge event, in micro-seconds. 
*/ - static const ulint SRV_PURGE_MAX_TIMEOUT = 10000; +THD* acquire_thd(void **ctx) +{ + std::unique_lock<std::mutex> lk(purge_thd_mutex); + ut_a(!purge_thds.empty()); + THD* thd = purge_thds.front(); + purge_thds.pop(); + lk.unlock(); + + /* Set current thd, and thd->mysys_var as well, + it might be used by something in the server.*/ + *ctx = thd_attach_thd(thd); + return thd; +} - int64_t sig_count = srv_suspend_thread(slot); +void release_thd(THD *thd, void *ctx) +{ + thd_detach_thd(ctx); + std::unique_lock<std::mutex> lk(purge_thd_mutex); + purge_thds.push(thd); + lk.unlock(); + set_current_thd(0); +} - do { - /* We don't wait right away on the the non-timed wait because - we want to signal the thread that wants to suspend purge. */ - const bool wait = stop - || rseg_history_len <= trx_sys.rseg_history_len; - const bool timeout = srv_resume_thread( - slot, sig_count, wait, - stop ? 0 : SRV_PURGE_MAX_TIMEOUT); - - sig_count = srv_suspend_thread(slot); - - rw_lock_x_lock(&purge_sys.latch); - - stop = srv_shutdown_state == SRV_SHUTDOWN_NONE - && purge_sys.paused(); - - if (!stop) { - if (timeout - && rseg_history_len < 5000 - && rseg_history_len == trx_sys.rseg_history_len) { - /* No new records were added since the - wait started. Simply wait for new - records. The magic number 5000 is an - approximation for the case where we - have cached UNDO log records which - prevent truncate of the UNDO - segments. */ - stop = true; - } - } else { - /* Signal that we are suspended. */ - os_event_set(purge_sys.event); - } - rw_lock_x_unlock(&purge_sys.latch); - } while (stop && srv_undo_sources); - srv_resume_thread(slot, 0, false); +/* + Called by timer when purge coordinator decides + to delay processing of purge records. 
+*/ +void purge_coordinator_timer_callback(void *) +{ + if (!purge_sys.enabled() || purge_sys.paused() || + purge_state.m_running || !trx_sys.rseg_history_len) { + return; + } + + if (purge_state.m_history_length < 5000 && + purge_state.m_history_length == trx_sys.rseg_history_len) { + + /* No new records were added since wait started. + Simply wait for new records.The magic number 5000 is an + approximation for the case where we have cached UNDO + log records which prevent truncate of the UNDO segments.*/ + + return; + } + + srv_wake_purge_thread_if_not_active(); } -/*********************************************************************//** -Purge coordinator thread that schedules the purge tasks. -@return a dummy parameter */ -extern "C" -os_thread_ret_t -DECLARE_THREAD(srv_purge_coordinator_thread)( -/*=========================================*/ - void* arg MY_ATTRIBUTE((unused))) /*!< in: a dummy parameter - required by os_thread_create */ +void purge_worker_callback(void*) { - my_thread_init(); - THD* thd = innobase_create_background_thd("InnoDB purge coordinator"); - srv_slot_t* slot; - ulint n_total_purged = ULINT_UNDEFINED; - + ut_ad(!current_thd); ut_ad(!srv_read_only_mode); - ut_a(srv_n_purge_threads >= 1); - ut_a(srv_force_recovery < SRV_FORCE_NO_BACKGROUND); + ut_ad(srv_force_recovery < SRV_FORCE_NO_BACKGROUND); + void* ctx; + THD* thd = acquire_thd(&ctx); + while (srv_task_execute()){} + release_thd(thd,ctx); +} - purge_sys.coordinator_startup(); -#ifdef UNIV_PFS_THREAD - pfs_register_thread(srv_purge_thread_key); -#endif /* UNIV_PFS_THREAD */ +void purge_coordinator_callback_low() +{ + ulint n_total_purged = ULINT_UNDEFINED; + purge_state.m_history_length = 0; -#ifdef UNIV_DEBUG_THREAD_CREATION - ib::info() << "Purge coordinator thread created, id " - << os_thread_pf(os_thread_get_curr_id()); -#endif /* UNIV_DEBUG_THREAD_CREATION */ + if (!purge_sys.enabled() || purge_sys.paused()) { + return; + } + do { + n_total_purged = 0; - slot = 
srv_reserve_slot(SRV_PURGE); + int sigcount = purge_state.m_running; -#ifdef UNIV_DEBUG - UT_LIST_INIT(slot->debug_sync, - &srv_slot_t::debug_sync_t::debug_sync_list); - rw_lock_create(PFS_NOT_INSTRUMENTED, &slot->debug_sync_lock, - SYNC_NO_ORDER_CHECK); -#endif - uint32_t rseg_history_len = trx_sys.rseg_history_len; + purge_state.m_history_length = srv_do_purge(&n_total_purged); - do { - /* If there are no records to purge or the last - purge didn't purge any records then wait for activity. */ + /* Check if purge was woken by srv_wake_purge_thread_if_not_active() */ - if (srv_shutdown_state == SRV_SHUTDOWN_NONE - && srv_undo_sources - && (n_total_purged == 0 || purge_sys.paused())) { + bool woken_during_purge = purge_state.m_running > sigcount; - srv_purge_coordinator_suspend(slot, rseg_history_len); - } + /*If last purge batch processed less that 1 page and there is still work to do, + delay the next batch by 10ms. Unless someone added work and woke us up. */ + if (n_total_purged == 0){ - ut_ad(!slot->suspended); + if(trx_sys.rseg_history_len == 0) { + return; + } - if (srv_purge_should_exit()) { - break; + if (!woken_during_purge) { + /* Delay next purge round*/ + purge_coordinator_timer->set_time(10, 0); + return; + } } + } while((purge_sys.enabled() && !purge_sys.paused()) || !srv_purge_should_exit()); +} - n_total_purged = 0; - - rseg_history_len = srv_do_purge(&n_total_purged, slot); - } while (!srv_purge_should_exit()); - - /* The task queue should always be empty, independent of fast - shutdown state. */ - ut_a(srv_get_task_queue_length() == 0); - - srv_free_slot(slot); - - /* Note that we are shutting down. */ - rw_lock_x_lock(&purge_sys.latch); - purge_sys.coordinator_shutdown(); - /* Ensure that the wait in purge_sys_t::stop() will terminate. 
*/ - os_event_set(purge_sys.event); - - rw_lock_x_unlock(&purge_sys.latch); - -#ifdef UNIV_DEBUG_THREAD_CREATION - ib::info() << "Purge coordinator exiting, id " - << os_thread_pf(os_thread_get_curr_id()); -#endif /* UNIV_DEBUG_THREAD_CREATION */ - - /* Ensure that all the worker threads quit. */ - if (ulint n_workers = srv_n_purge_threads - 1) { - const srv_slot_t* slot; - const srv_slot_t* const end = &srv_sys.sys_threads[ - srv_sys.n_sys_threads]; - - do { - srv_release_threads(SRV_WORKER, n_workers); - srv_sys_mutex_enter(); - for (slot = &srv_sys.sys_threads[2]; - !slot++->in_use && slot < end; ); - srv_sys_mutex_exit(); - } while (slot < end); - } +void purge_coordinator_callback(void*) +{ + void* ctx; + THD* thd = acquire_thd(&ctx); + purge_coordinator_callback_low(); + release_thd(thd,ctx); + purge_state.m_running = 0; +} - innobase_destroy_background_thd(thd); - my_thread_end(); - /* We count the number of threads in os_thread_exit(). A created - thread should always use that to exit and not use return() to exit. 
*/ - os_thread_exit(); +void srv_init_purge_tasks(uint n_tasks) +{ + purge_task_group.set_max_tasks(n_tasks-1); + purge_create_background_thds(n_tasks); + purge_coordinator_timer = + srv_thread_pool->create_timer(purge_coordinator_timer_callback, + nullptr); +} - OS_THREAD_DUMMY_RETURN; /* Not reached, avoid compiler warning */ +void srv_shutdown_purge_tasks() +{ + purge_coordinator_task.wait(); + delete purge_coordinator_timer; + purge_coordinator_timer = nullptr; + purge_worker_task.wait(); + while (!purge_thds.empty()) { + innobase_destroy_background_thd(purge_thds.front()); + purge_thds.pop(); + } } /**********************************************************************//** @@ -2825,8 +2275,6 @@ srv_que_task_enqueue_low( UT_LIST_ADD_LAST(srv_sys.tasks, thr); mutex_exit(&srv_sys.tasks_mutex); - - srv_release_threads(SRV_WORKER, 1); } /**********************************************************************//** @@ -2849,7 +2297,7 @@ srv_get_task_queue_length(void) return(n_tasks); } -/** Wake up the purge threads. */ +/** Wake up the purge coordinator. */ void srv_purge_wakeup() { @@ -2859,55 +2307,21 @@ srv_purge_wakeup() if (srv_force_recovery >= SRV_FORCE_NO_BACKGROUND) { return; } - - do { - srv_release_threads(SRV_PURGE, 1); - - if (srv_n_purge_threads > 1) { - ulint n_workers = srv_n_purge_threads - 1; - - srv_release_threads(SRV_WORKER, n_workers); - } - } while (!srv_running.load(std::memory_order_relaxed) - && (srv_sys.n_threads_active[SRV_WORKER] - || srv_sys.n_threads_active[SRV_PURGE])); + ut_a(purge_sys.enabled() && !purge_sys.paused()); + purge_state.m_running = 0; + srv_wake_purge_thread_if_not_active(); } /** Shut down the purge threads. 
*/ void srv_purge_shutdown() { - do { - ut_ad(!srv_undo_sources); - srv_purge_wakeup(); - } while (srv_sys.sys_threads[SRV_PURGE_SLOT].in_use); -} - -#ifdef UNIV_DEBUG -static ulint get_first_slot(srv_thread_type type) -{ - switch (type) { - case SRV_MASTER: - return SRV_MASTER_SLOT; - case SRV_PURGE: - return SRV_PURGE_SLOT; - case SRV_WORKER: - /* Find an empty slot, skip the master and purge slots. */ - return SRV_WORKER_SLOTS_START; - default: - ut_error; - } -} - -void srv_for_each_thread(srv_thread_type type, - srv_slot_callback_t callback, - const void *arg) -{ - for (ulint slot_idx= get_first_slot(type); - slot_idx < srv_sys.n_sys_threads - && srv_sys.sys_threads[slot_idx].in_use - && srv_sys.sys_threads[slot_idx].type == type; - slot_idx++) { - callback(&srv_sys.sys_threads[slot_idx], arg); + if (purge_sys.enabled()) { + while(!srv_purge_should_exit()) { + ut_a(!purge_sys.paused()); + srv_wake_purge_thread_if_not_active(); + os_thread_sleep(100); + } + purge_sys.coordinator_shutdown(); + srv_shutdown_purge_tasks(); } -} -#endif +}
\ No newline at end of file diff --git a/storage/innobase/srv/srv0start.cc b/storage/innobase/srv/srv0start.cc index 311574a4086..20d19520611 100644 --- a/storage/innobase/srv/srv0start.cc +++ b/storage/innobase/srv/srv0start.cc @@ -145,19 +145,18 @@ the initialisation step. */ enum srv_start_state_t { /** No thread started */ SRV_START_STATE_NONE = 0, /*!< No thread started */ - /** lock_wait_timeout_thread started */ - SRV_START_STATE_LOCK_SYS = 1, /*!< Started lock-timeout - thread. */ + /** lock_wait_timeout timer task started */ + SRV_START_STATE_LOCK_SYS = 1, /** buf_flush_page_cleaner_coordinator, buf_flush_page_cleaner_worker started */ SRV_START_STATE_IO = 2, - /** srv_error_monitor_thread, srv_monitor_thread started */ + /** srv_error_monitor_thread, srv_print_monitor_task started */ SRV_START_STATE_MONITOR = 4, /** srv_master_thread started */ SRV_START_STATE_MASTER = 8, /** srv_purge_coordinator_thread, srv_worker_thread started */ SRV_START_STATE_PURGE = 16, - /** fil_crypt_thread, btr_defragment_thread started + /** fil_crypt_thread, (all background threads that can generate redo log but not undo log */ SRV_START_STATE_REDO = 32 }; @@ -172,40 +171,16 @@ enum srv_shutdown_t srv_shutdown_state = SRV_SHUTDOWN_NONE; /** Files comprising the system tablespace */ pfs_os_file_t files[1000]; -/** io_handler_thread parameters for thread identification */ -static ulint n[SRV_MAX_N_IO_THREADS + 6]; -/** io_handler_thread identifiers, 32 is the maximum number of purge threads */ -/** 6 is the ? 
*/ -#define START_OLD_THREAD_CNT (SRV_MAX_N_IO_THREADS + 6 + 32) -static os_thread_id_t thread_ids[SRV_MAX_N_IO_THREADS + 6 + 32]; - -/** Thead handles */ -static os_thread_t thread_handles[SRV_MAX_N_IO_THREADS + 6 + 32]; -static os_thread_t buf_dump_thread_handle; -static os_thread_t dict_stats_thread_handle; -/** Status variables, is thread started ?*/ -static bool thread_started[SRV_MAX_N_IO_THREADS + 6 + 32] = {false}; /** Name of srv_monitor_file */ static char* srv_monitor_file_name; +std::unique_ptr<tpool::timer> srv_master_timer; /** */ #define SRV_MAX_N_PENDING_SYNC_IOS 100 #ifdef UNIV_PFS_THREAD /* Keys to register InnoDB threads with performance schema */ -mysql_pfs_key_t buf_dump_thread_key; -mysql_pfs_key_t dict_stats_thread_key; -mysql_pfs_key_t io_handler_thread_key; -mysql_pfs_key_t io_ibuf_thread_key; -mysql_pfs_key_t io_log_thread_key; -mysql_pfs_key_t io_read_thread_key; -mysql_pfs_key_t io_write_thread_key; -mysql_pfs_key_t srv_error_monitor_thread_key; -mysql_pfs_key_t srv_lock_timeout_thread_key; -mysql_pfs_key_t srv_master_thread_key; -mysql_pfs_key_t srv_monitor_thread_key; -mysql_pfs_key_t srv_purge_thread_key; -mysql_pfs_key_t srv_worker_thread_key; +mysql_pfs_key_t thread_pool_thread_key; #endif /* UNIV_PFS_THREAD */ #ifdef HAVE_PSI_STAGE_INTERFACE @@ -275,64 +250,6 @@ srv_file_check_mode( return(true); } -/********************************************************************//** -I/o-handler thread function. -@return OS_THREAD_DUMMY_RETURN */ -extern "C" -os_thread_ret_t -DECLARE_THREAD(io_handler_thread)( -/*==============================*/ - void* arg) /*!< in: pointer to the number of the segment in - the aio array */ -{ - ulint segment; - - segment = *((ulint*) arg); - -#ifdef UNIV_DEBUG_THREAD_CREATION - ib::info() << "Io handler thread " << segment << " starts, id " - << os_thread_pf(os_thread_get_curr_id()); -#endif - - /* For read only mode, we don't need ibuf and log I/O thread. 
- Please see srv_start() */ - ulint start = (srv_read_only_mode) ? 0 : 2; - - if (segment < start) { - if (segment == 0) { - pfs_register_thread(io_ibuf_thread_key); - } else { - ut_ad(segment == 1); - pfs_register_thread(io_log_thread_key); - } - } else if (segment >= start - && segment < (start + srv_n_read_io_threads)) { - pfs_register_thread(io_read_thread_key); - - } else if (segment >= (start + srv_n_read_io_threads) - && segment < (start + srv_n_read_io_threads - + srv_n_write_io_threads)) { - pfs_register_thread(io_write_thread_key); - - } else { - pfs_register_thread(io_handler_thread_key); - } - - while (srv_shutdown_state != SRV_SHUTDOWN_EXIT_THREADS - || buf_page_cleaner_is_active - || !os_aio_all_slots_free()) { - fil_aio_wait(segment); - } - - /* We count the number of threads in os_thread_exit(). A created - thread should always use that to exit and not use return() to exit. - The thread actually never comes here because it is exited in an - os_event_wait(). */ - - os_thread_exit(); - - OS_THREAD_DUMMY_RETURN; -} /*********************************************************************//** Creates a log file. @@ -1066,6 +983,13 @@ srv_shutdown_all_bg_threads() ut_ad(!srv_undo_sources); srv_shutdown_state = SRV_SHUTDOWN_EXIT_THREADS; + lock_sys.timeout_timer.reset(); + srv_master_timer.reset(); + + if (purge_sys.enabled()) { + srv_purge_shutdown(); + } + /* All threads end up waiting for certain events. Put those events to the signaled state. Then the threads will exit themselves after os_event_wait(). */ @@ -1073,26 +997,10 @@ srv_shutdown_all_bg_threads() /* NOTE: IF YOU CREATE THREADS IN INNODB, YOU MUST EXIT THEM HERE OR EARLIER */ - if (srv_start_state_is_set(SRV_START_STATE_LOCK_SYS)) { - /* a. Let the lock timeout thread exit */ - os_event_set(lock_sys.timeout_event); - } if (!srv_read_only_mode) { /* b. srv error monitor thread exits automatically, no need to do anything here */ - - if (srv_start_state_is_set(SRV_START_STATE_MASTER)) { - /* c. 
We wake the master thread so that - it exits */ - srv_wake_master_thread(); - } - - if (srv_start_state_is_set(SRV_START_STATE_PURGE)) { - /* d. Wakeup purge threads. */ - srv_purge_wakeup(); - } - if (srv_n_fil_crypt_threads_started) { os_event_set(fil_crypt_threads_event); } @@ -1120,25 +1028,11 @@ srv_shutdown_all_bg_threads() return; } - switch (srv_operation) { - case SRV_OPERATION_BACKUP: - case SRV_OPERATION_RESTORE_DELTA: - break; - case SRV_OPERATION_NORMAL: - case SRV_OPERATION_RESTORE: - case SRV_OPERATION_RESTORE_EXPORT: - if (!buf_page_cleaner_is_active - && os_aio_all_slots_free()) { - os_aio_wake_all_threads_at_shutdown(); - } - } - os_thread_sleep(100000); } ib::warn() << os_thread_count << " threads created by InnoDB" " had not exited at shutdown!"; - ut_d(os_aio_print_pending_io(stderr)); ut_ad(0); } @@ -1379,10 +1273,7 @@ dberr_t srv_start(bool create_new_db) srv_max_n_threads = 1 /* io_ibuf_thread */ + 1 /* io_log_thread */ - + 1 /* lock_wait_timeout_thread */ - + 1 /* srv_error_monitor_thread */ - + 1 /* srv_monitor_thread */ - + 1 /* srv_master_thread */ + + 1 /* srv_print_monitor_task */ + 1 /* srv_purge_coordinator_thread */ + 1 /* buf_dump_thread */ + 1 /* dict_stats_thread */ @@ -1535,15 +1426,6 @@ dberr_t srv_start(bool create_new_db) recv_sys.create(); lock_sys.create(srv_lock_table_size); - /* Create i/o-handler threads: */ - - for (ulint t = 0; t < srv_n_file_io_threads; ++t) { - - n[t] = t; - - thread_handles[t] = os_thread_create(io_handler_thread, n + t, thread_ids + t); - thread_started[t] = true; - } if (!srv_read_only_mode) { buf_flush_page_cleaner_init(); @@ -1803,7 +1685,7 @@ files_checked: /* Initialize objects used by dict stats gathering thread, which can also be used by recovery if it tries to drop some table */ if (!srv_read_only_mode) { - dict_stats_thread_init(); + dict_stats_init(); } trx_sys.create(); @@ -2239,28 +2121,16 @@ files_checked: srv_startup_is_before_trx_rollback_phase = false; if (!srv_read_only_mode) { - 
/* Create the thread which watches the timeouts + /* timer task which watches the timeouts for lock waits */ - thread_handles[2 + SRV_MAX_N_IO_THREADS] = os_thread_create( - lock_wait_timeout_thread, - NULL, thread_ids + 2 + SRV_MAX_N_IO_THREADS); - thread_started[2 + SRV_MAX_N_IO_THREADS] = true; - lock_sys.timeout_thread_active = true; + lock_sys.timeout_timer.reset(srv_thread_pool->create_timer( + lock_wait_timeout_task)); DBUG_EXECUTE_IF("innodb_skip_monitors", goto skip_monitors;); - /* Create the thread which warns of long semaphore waits */ - srv_error_monitor_active = true; - thread_handles[3 + SRV_MAX_N_IO_THREADS] = os_thread_create( - srv_error_monitor_thread, - NULL, thread_ids + 3 + SRV_MAX_N_IO_THREADS); - thread_started[3 + SRV_MAX_N_IO_THREADS] = true; - - /* Create the thread which prints InnoDB monitor info */ - srv_monitor_active = true; - thread_handles[4 + SRV_MAX_N_IO_THREADS] = os_thread_create( - srv_monitor_thread, - NULL, thread_ids + 4 + SRV_MAX_N_IO_THREADS); - thread_started[4 + SRV_MAX_N_IO_THREADS] = true; + /* Create the task which warns of long semaphore waits */ + srv_start_periodic_timer(srv_error_monitor_timer, srv_error_monitor_task, 1000); + srv_start_periodic_timer(srv_monitor_timer, srv_monitor_task, 5000); + srv_start_state |= SRV_START_STATE_LOCK_SYS | SRV_START_STATE_MONITOR; @@ -2272,11 +2142,8 @@ skip_monitors: if (srv_force_recovery < SRV_FORCE_NO_BACKGROUND) { srv_undo_sources = true; - /* Create the dict stats gathering thread */ - srv_dict_stats_thread_active = true; - dict_stats_thread_handle = os_thread_create( - dict_stats_thread, NULL, NULL); - + /* Create the dict stats gathering task */ + dict_stats_start(); /* Create the thread that will optimize the FULLTEXT search index subsystem. 
*/ fts_optimize_init(); @@ -2316,42 +2183,16 @@ skip_monitors: trx_temp_rseg_create(); if (srv_force_recovery < SRV_FORCE_NO_BACKGROUND) { - thread_handles[1 + SRV_MAX_N_IO_THREADS] - = os_thread_create(srv_master_thread, NULL, - (1 + SRV_MAX_N_IO_THREADS) - + thread_ids); - thread_started[1 + SRV_MAX_N_IO_THREADS] = true; - srv_start_state_set(SRV_START_STATE_MASTER); + srv_start_periodic_timer(srv_master_timer, srv_master_callback, 1000); } } if (!srv_read_only_mode && srv_operation == SRV_OPERATION_NORMAL && srv_force_recovery < SRV_FORCE_NO_BACKGROUND) { - thread_handles[5 + SRV_MAX_N_IO_THREADS] = os_thread_create( - srv_purge_coordinator_thread, - NULL, thread_ids + 5 + SRV_MAX_N_IO_THREADS); - - thread_started[5 + SRV_MAX_N_IO_THREADS] = true; - - ut_a(UT_ARR_SIZE(thread_ids) - > 5 + srv_n_purge_threads + SRV_MAX_N_IO_THREADS); - - /* We've already created the purge coordinator thread above. */ - for (i = 1; i < srv_n_purge_threads; ++i) { - thread_handles[5 + i + SRV_MAX_N_IO_THREADS] = os_thread_create( - srv_worker_thread, NULL, - thread_ids + 5 + i + SRV_MAX_N_IO_THREADS); - thread_started[5 + i + SRV_MAX_N_IO_THREADS] = true; - } - - while (srv_shutdown_state == SRV_SHUTDOWN_NONE - && srv_force_recovery < SRV_FORCE_NO_BACKGROUND - && !purge_sys.enabled()) { - ib::info() << "Waiting for purge to start"; - os_thread_sleep(50000); - } - + srv_init_purge_tasks(srv_n_purge_threads); + purge_sys.coordinator_startup(); + srv_wake_purge_thread_if_not_active(); srv_start_state_set(SRV_START_STATE_PURGE); } @@ -2396,10 +2237,8 @@ skip_monitors: if (!get_wsrep_recovery()) { #endif /* WITH_WSREP */ - /* Create the buffer pool dump/load thread */ - srv_buf_dump_thread_active = true; - buf_dump_thread_handle= - os_thread_create(buf_dump_thread, NULL, NULL); + /* Start buffer pool dump/load task */ + buf_load_at_startup(); #ifdef WITH_WSREP } else { @@ -2420,16 +2259,10 @@ skip_monitors: /* Initialize online defragmentation. 
*/ btr_defragment_init(); - btr_defragment_thread_active = true; - os_thread_create(btr_defragment_thread, NULL, NULL); srv_start_state |= SRV_START_STATE_REDO; } - /* Create the buffer pool resize thread */ - srv_buf_resize_thread_active = true; - os_thread_create(buf_resize_thread, NULL, NULL); - return(DB_SUCCESS); } @@ -2448,12 +2281,38 @@ void srv_shutdown_bg_undo_sources() } } +/** +Perform pre-shutdown task. + +Since purge tasks vall into server (some MDL acqusition, +and compute virtual functions), let them shut down right +after use connections go down while the rest of the server +infrasture is still intact. +*/ +void innodb_preshutdown() +{ + static bool first_time = true; + if (!first_time) + return; + first_time = false; + + if (!srv_read_only_mode) { + if (!srv_fast_shutdown && srv_operation == SRV_OPERATION_NORMAL) { + while (trx_sys.any_active_transactions()) { + os_thread_sleep(1000); + } + } + srv_shutdown_bg_undo_sources(); + srv_purge_shutdown(); + } +} + + /** Shut down InnoDB. 
*/ void innodb_shutdown() { - ut_ad(!srv_running.load(std::memory_order_relaxed)); + innodb_preshutdown(); ut_ad(!srv_undo_sources); - switch (srv_operation) { case SRV_OPERATION_BACKUP: case SRV_OPERATION_RESTORE: @@ -2489,7 +2348,6 @@ void innodb_shutdown() srv_misc_tmpfile = 0; } - ut_ad(dict_stats_event || !srv_was_started || srv_read_only_mode); ut_ad(dict_sys.is_initialised() || !srv_was_started); ut_ad(trx_sys.is_initialised() || !srv_was_started); ut_ad(buf_dblwr || !srv_was_started || srv_read_only_mode @@ -2501,9 +2359,7 @@ void innodb_shutdown() #endif /* BTR_CUR_HASH_ADAPT */ ut_ad(ibuf.index || !srv_was_started); - if (dict_stats_event) { - dict_stats_thread_deinit(); - } + dict_stats_deinit(); if (srv_start_state_is_set(SRV_START_STATE_REDO)) { ut_ad(!srv_read_only_mode); @@ -2565,7 +2421,7 @@ void innodb_shutdown() << srv_shutdown_lsn << "; transaction id " << trx_sys.get_max_trx_id(); } - + srv_thread_pool_end(); srv_start_state = SRV_START_STATE_NONE; srv_was_started = false; srv_start_has_been_called = false; @@ -2605,3 +2461,5 @@ srv_get_meta_data_filename( ut_free(path); } + + diff --git a/storage/innobase/sync/sync0arr.cc b/storage/innobase/sync/sync0arr.cc index b9578289504..b3528e61a0d 100644 --- a/storage/innobase/sync/sync0arr.cc +++ b/storage/innobase/sync/sync0arr.cc @@ -1096,9 +1096,7 @@ sync_array_print_long_waits( srv_print_innodb_monitor = TRUE; - lock_set_timeout_event(); - - os_thread_sleep(30000000); + lock_wait_timeout_task(nullptr); srv_print_innodb_monitor = static_cast<my_bool>(old_val); fprintf(stderr, diff --git a/storage/innobase/trx/trx0purge.cc b/storage/innobase/trx/trx0purge.cc index 356999b17b2..0cef749fd14 100644 --- a/storage/innobase/trx/trx0purge.cc +++ b/storage/innobase/trx/trx0purge.cc @@ -161,9 +161,6 @@ void purge_sys_t::create() { ut_ad(this == &purge_sys); ut_ad(!enabled()); - ut_ad(!event); - event= os_event_create(0); - ut_ad(event); m_paused= 0; query= purge_graph_build(); next_stored= false; @@ -176,16 
+173,18 @@ void purge_sys_t::create() mutex_create(LATCH_ID_PURGE_SYS_PQ, &pq_mutex); truncate.current= NULL; truncate.last= NULL; + m_initialized = true; + } /** Close the purge subsystem on shutdown. */ void purge_sys_t::close() { - ut_ad(this == &purge_sys); - if (!event) return; + if (!m_initialized) + return; + ut_ad(this == &purge_sys); ut_ad(!enabled()); - ut_ad(n_tasks.load(std::memory_order_relaxed) == 0); trx_t* trx = query->trx; que_graph_free(query); ut_ad(!trx->id); @@ -194,7 +193,7 @@ void purge_sys_t::close() trx_free(trx); rw_lock_free(&latch); mutex_free(&pq_mutex); - os_event_destroy(event); + m_initialized = false; } /*================ UNDO LOG HISTORY LIST =============================*/ @@ -1250,21 +1249,14 @@ trx_purge_dml_delay(void) return(delay); } +extern tpool::waitable_task purge_worker_task; + /** Wait for pending purge jobs to complete. */ static void trx_purge_wait_for_workers_to_complete() { - /* Ensure that the work queue empties out. */ - while (purge_sys.n_tasks.load(std::memory_order_acquire)) { - - if (srv_get_task_queue_length() > 0) { - srv_release_threads(SRV_WORKER, 1); - } - - os_thread_yield(); - } - + purge_worker_task.wait(); /* There should be no outstanding tasks as long as the worker threads are active. */ ut_a(srv_get_task_queue_length() == 0); @@ -1279,10 +1271,6 @@ trx_purge( ulint n_purge_threads, /*!< in: number of purge tasks to submit to the work queue */ bool truncate /*!< in: truncate history if true */ -#ifdef UNIV_DEBUG - , srv_slot_t *slot /*!< in/out: purge coordinator - thread slot */ -#endif ) { que_thr_t* thr = NULL; @@ -1292,9 +1280,6 @@ trx_purge( srv_dml_needed_delay = trx_purge_dml_delay(); - /* All submitted tasks should be completed. */ - ut_ad(purge_sys.n_tasks.load(std::memory_order_relaxed) == 0); - rw_lock_x_lock(&purge_sys.latch); trx_sys.clone_oldest_view(); rw_lock_x_unlock(&purge_sys.latch); @@ -1307,24 +1292,21 @@ trx_purge( /* Fetch the UNDO recs that need to be purged. 
*/ n_pages_handled = trx_purge_attach_undo_recs(n_purge_threads); - purge_sys.n_tasks.store(n_purge_threads - 1, std::memory_order_relaxed); /* Submit tasks to workers queue if using multi-threaded purge. */ - for (ulint i = n_purge_threads; --i; ) { + for (ulint i = 0; i < n_purge_threads-1; i++) { thr = que_fork_scheduler_round_robin(purge_sys.query, thr); ut_a(thr); srv_que_task_enqueue_low(thr); + srv_thread_pool->submit_task(&purge_worker_task); } thr = que_fork_scheduler_round_robin(purge_sys.query, thr); - ut_d(thr->thread_slot = slot); que_run_threads(thr); trx_purge_wait_for_workers_to_complete(); - ut_ad(purge_sys.n_tasks.load(std::memory_order_relaxed) == 0); - if (truncate) { trx_purge_truncate_history(); } @@ -1335,6 +1317,8 @@ trx_purge( return(n_pages_handled); } +extern tpool::waitable_task purge_coordinator_task; + /** Stop purge during FLUSH TABLES FOR EXPORT */ void purge_sys_t::stop() { @@ -1352,14 +1336,8 @@ void purge_sys_t::stop() if (m_paused++ == 0) { - /* We need to wakeup the purge thread in case it is suspended, so - that it can acknowledge the state change. */ - const int64_t sig_count = os_event_reset(event); rw_lock_x_unlock(&latch); ib::info() << "Stopping purge"; - srv_purge_wakeup(); - /* Wait for purge coordinator to signal that it is suspended. */ - os_event_wait_low(event, sig_count); MONITOR_ATOMIC_INC(MONITOR_PURGE_STOP_COUNT); return; } @@ -1369,8 +1347,7 @@ void purge_sys_t::stop() if (running()) { ib::info() << "Waiting for purge to stop"; - while (running()) - os_thread_sleep(10000); + purge_coordinator_task.wait(); } } @@ -1384,6 +1361,7 @@ void purge_sys_t::resume() return; } + rw_lock_x_lock(&latch); int32_t paused= m_paused--; ut_a(paused); @@ -1393,4 +1371,5 @@ void purge_sys_t::resume() srv_purge_wakeup(); MONITOR_ATOMIC_INC(MONITOR_PURGE_RESUME_COUNT); } + rw_lock_x_unlock(&latch); } |