diff options
author | Nikita Malyavin <nikitamalyavin@gmail.com> | 2019-07-25 22:17:04 +1000 |
---|---|---|
committer | Nikita Malyavin <nikitamalyavin@gmail.com> | 2019-09-27 18:13:16 +1000 |
commit | 905a98c0eb812f1a3aad83368244d549baf80cbc (patch) | |
tree | 21eed70b0d50c9493c1a25c7bbfe3686821de37d | |
parent | b6bb64e54a3e34a20cda34e25b6ec62a097955ef (diff) | |
download | mariadb-git-905a98c0eb812f1a3aad83368244d549baf80cbc.tar.gz |
add innodb_debug_sync var to support DEBUG_SYNC from purge threads
-rw-r--r-- | storage/innobase/handler/ha_innodb.cc | 42 | ||||
-rw-r--r-- | storage/innobase/include/que0que.h | 3 | ||||
-rw-r--r-- | storage/innobase/include/srv0srv.h | 18 | ||||
-rw-r--r-- | storage/innobase/include/trx0purge.h | 4 | ||||
-rw-r--r-- | storage/innobase/que/que0que.cc | 3 | ||||
-rw-r--r-- | storage/innobase/row/row0purge.cc | 20 | ||||
-rw-r--r-- | storage/innobase/srv/srv0srv.cc | 70 | ||||
-rw-r--r-- | storage/innobase/trx/trx0purge.cc | 5 |
8 files changed, 153 insertions, 12 deletions
diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc index 06a9b74546b..f8345a06b74 100644 --- a/storage/innobase/handler/ha_innodb.cc +++ b/storage/innobase/handler/ha_innodb.cc @@ -18573,6 +18573,34 @@ innodb_log_checksums_update(THD* thd, st_mysql_sys_var*, void* var_ptr, thd, *static_cast<const my_bool*>(save)); } +#ifdef UNIV_DEBUG +static +void +innobase_debug_sync_callback(srv_slot_t *slot, const void *value) +{ + ut_ad(srv_n_purge_threads == 1); + const char *value_str = *static_cast<const char* const*>(value); + size_t len = strlen(value_str) + 1; + + + // One allocatoin for list node object and value. + void *buf = ut_malloc_nokey(sizeof(srv_slot_t::debug_sync_t) + len); + srv_slot_t::debug_sync_t *sync = new(buf) srv_slot_t::debug_sync_t(); + strcpy(sync->str, value_str); + + rw_lock_x_lock(&slot->debug_sync_lock); + UT_LIST_ADD_LAST(slot->debug_sync, sync); + rw_lock_x_unlock(&slot->debug_sync_lock); +} +static +void +innobase_debug_sync_set(THD *thd, st_mysql_sys_var*, void *, const void *value) +{ + srv_for_each_thread(SRV_WORKER, innobase_debug_sync_callback, value); + srv_for_each_thread(SRV_PURGE, innobase_debug_sync_callback, value); +} +#endif + static SHOW_VAR innodb_status_variables_export[]= { {"Innodb", (char*) &show_innodb_vars, SHOW_FUNC}, {NullS, NullS, SHOW_LONG} @@ -19025,7 +19053,8 @@ static MYSQL_SYSVAR_ULONG(purge_threads, srv_n_purge_threads, NULL, NULL, 4, /* Default setting */ 1, /* Minimum value */ - 32, 0); /* Maximum value */ + srv_max_purge_threads,/* Maximum value */ + 0); static MYSQL_SYSVAR_ULONG(sync_array_size, srv_sync_array_size, PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_READONLY, @@ -20092,6 +20121,16 @@ static MYSQL_SYSVAR_BOOL(debug_force_scrubbing, 0, "Perform extra scrubbing to increase test exposure", NULL, NULL, FALSE); + +char *innobase_debug_sync; +static MYSQL_SYSVAR_STR(debug_sync, innobase_debug_sync, + PLUGIN_VAR_NOCMDARG, + "debug_sync for innodb purge threads. " + "Use it t oset up sync points for all purge threads " + "at once. The commands will be applied sequentially at " + "the beginning of purging the next node ", + NULL, + innobase_debug_sync_set, NULL); #endif /* UNIV_DEBUG */ static MYSQL_SYSVAR_BOOL(encrypt_temporary_tables, innodb_encrypt_temporary_tables, @@ -20303,6 +20342,7 @@ static struct st_mysql_sys_var* innobase_system_variables[]= { MYSQL_SYSVAR(background_scrub_data_check_interval), #ifdef UNIV_DEBUG MYSQL_SYSVAR(debug_force_scrubbing), + MYSQL_SYSVAR(debug_sync), #endif MYSQL_SYSVAR(buf_dump_status_frequency), MYSQL_SYSVAR(background_thread), diff --git a/storage/innobase/include/que0que.h b/storage/innobase/include/que0que.h index b7d2aae2c1b..b56a6d09d2d 100644 --- a/storage/innobase/include/que0que.h +++ b/storage/innobase/include/que0que.h @@ -378,6 +378,9 @@ struct que_thr_t{ related delete/updates */ row_prebuilt_t* prebuilt; /*!< prebuilt structure processed by the query thread */ + + ut_d(srv_slot_t *thread_slot;) /*!< a slot from srv_sys.sys_threads + * if any */ }; #define QUE_THR_MAGIC_N 8476583 diff --git a/storage/innobase/include/srv0srv.h b/storage/innobase/include/srv0srv.h index db2218633b3..c0c1efbb6d2 100644 --- a/storage/innobase/include/srv0srv.h +++ b/storage/innobase/include/srv0srv.h @@ -592,6 +592,8 @@ extern ulong srv_fatal_semaphore_wait_threshold; /** Buffer pool dump status frequence in percentages */ extern ulong srv_buf_dump_status_frequency; +constexpr uint srv_max_purge_threads = 32; + # ifdef UNIV_PFS_THREAD /* Keys to register InnoDB threads with performance schema */ extern mysql_pfs_key_t buf_dump_thread_key; @@ -1105,8 +1107,24 @@ struct srv_slot_t{ to do */ que_thr_t* thr; /*!< suspended query thread (only used for user threads) */ +#ifdef UNIV_DEBUG + struct debug_sync_t { + UT_LIST_NODE_T(debug_sync_t) + debug_sync_list; + char str[0]; + }; + UT_LIST_BASE_NODE_T(debug_sync_t) + debug_sync; + rw_lock_t debug_sync_lock; +#endif }; +typedef void srv_slot_callback_t(srv_slot_t*, const void*); + +void srv_for_each_thread(srv_thread_type type, + srv_slot_callback_t callback, + const void *arg); + #ifdef WITH_WSREP UNIV_INTERN void diff --git a/storage/innobase/include/trx0purge.h b/storage/innobase/include/trx0purge.h index 71e0b78a105..9a9c685c972 100644 --- a/storage/innobase/include/trx0purge.h +++ b/storage/innobase/include/trx0purge.h @@ -61,7 +61,9 @@ trx_purge( /*======*/ ulint n_purge_threads, /*!< in: number of purge tasks to submit to task queue. */ - bool truncate); /*!< in: truncate history if true */ + bool truncate, /*!< in: truncate history if true */ + srv_slot_t *slot); /*!< in/out: purge coordinator + thread slot */ /** Rollback segements from a given transaction with trx-no scheduled for purge. */ diff --git a/storage/innobase/que/que0que.cc b/storage/innobase/que/que0que.cc index 878427e536e..f8d56a7a250 100644 --- a/storage/innobase/que/que0que.cc +++ b/storage/innobase/que/que0que.cc @@ -1079,6 +1079,9 @@ que_run_threads_low( ut_a(thr_get_trx(thr)->error_state == DB_SUCCESS); ut_ad(!trx_mutex_own(thr_get_trx(thr))); + /* slot can be received from purge thread for debug_sync setup */ + ut_d(srv_slot_t *slot = thr->thread_slot); + /* cumul_resource counts how much resources the OS thread (NOT the query thread) has spent in this function */ diff --git a/storage/innobase/row/row0purge.cc b/storage/innobase/row/row0purge.cc index 89e336e1724..e61327d32f8 100644 --- a/storage/innobase/row/row0purge.cc +++ b/storage/innobase/row/row0purge.cc @@ -46,6 +46,7 @@ Created 3/14/1997 Heikki Tuuri #include "handler.h" #include "ha_innodb.h" #include "fil0fil.h" +#include "debug_sync.h" /************************************************************************* IMPORTANT NOTE: Any operation that generates redo MUST check that there @@ -1306,6 +1307,25 @@ row_purge_step( node->start(); +#ifdef UNIV_DEBUG + srv_slot_t *slot = thr->thread_slot; + ut_ad(slot); + + rw_lock_x_lock(&slot->debug_sync_lock); + while (UT_LIST_GET_LEN(slot->debug_sync)) { + srv_slot_t::debug_sync_t *sync = + UT_LIST_GET_FIRST(slot->debug_sync); + bool result = debug_sync_set_action(current_thd, + sync->str, + strlen(sync->str)); + ut_a(!result); + + UT_LIST_REMOVE(slot->debug_sync, sync); + ut_free(sync); + } + rw_lock_x_unlock(&slot->debug_sync_lock); +#endif + if (!(node->undo_recs == NULL || ib_vector_is_empty(node->undo_recs))) { trx_purge_rec_t*purge_rec; diff --git a/storage/innobase/srv/srv0srv.cc b/storage/innobase/srv/srv0srv.cc index ce2240d3256..87bd8755ab2 100644 --- a/storage/innobase/srv/srv0srv.cc +++ b/storage/innobase/srv/srv0srv.cc @@ -594,7 +594,8 @@ struct srv_sys_t{ ulint n_sys_threads; /*!< size of the sys_threads array */ - srv_slot_t sys_threads[32 + 1]; /*!< server thread table; + srv_slot_t + sys_threads[srv_max_purge_threads + 1]; /*!< server thread table; os_event_set() and os_event_reset() on sys_threads[]->event are @@ -642,11 +643,15 @@ and/or load it during startup. */ char srv_buffer_pool_dump_at_shutdown = TRUE; char srv_buffer_pool_load_at_startup = TRUE; +/** Slot index in the srv_sys.sys_threads array for the master thread. */ +static const ulint SRV_MASTER_SLOT = 0; + /** Slot index in the srv_sys.sys_threads array for the purge thread. */ static const ulint SRV_PURGE_SLOT = 1; -/** Slot index in the srv_sys.sys_threads array for the master thread. */ -static const ulint SRV_MASTER_SLOT = 0; +/** Slot index in the srv_sys.sys_threads array from which purge workers start. + */ +static const ulint SRV_WORKER_SLOTS_START = 2; #ifdef HAVE_PSI_STAGE_INTERFACE /** Performance schema stage event for monitoring ALTER TABLE progress @@ -794,7 +799,7 @@ srv_reserve_slot( case SRV_WORKER: /* Find an empty slot, skip the master and purge slots. */ - for (slot = &srv_sys.sys_threads[2]; + for (slot = &srv_sys.sys_threads[SRV_WORKER_SLOTS_START]; slot->in_use; ++slot) { @@ -2458,8 +2463,9 @@ static bool srv_purge_should_exit() /*********************************************************************//** Fetch and execute a task from the work queue. +@param [in,out] slot purge worker thread slot @return true if a task was executed */ -static bool srv_task_execute() +static bool srv_task_execute(srv_slot_t *slot) { ut_ad(!srv_read_only_mode); ut_ad(srv_force_recovery < SRV_FORCE_NO_BACKGROUND); @@ -2470,6 +2476,7 @@ static bool srv_task_execute() ut_a(que_node_get_type(thr->child) == QUE_NODE_PURGE); UT_LIST_REMOVE(srv_sys.tasks, thr); mutex_exit(&srv_sys.tasks_mutex); + ut_d(thr->thread_slot = slot;) que_run_threads(thr); my_atomic_addlint(&purge_sys.n_completed, 1); return true; @@ -2506,6 +2513,13 @@ DECLARE_THREAD(srv_worker_thread)( slot = srv_reserve_slot(SRV_WORKER); +#ifdef UNIV_DEBUG + UT_LIST_INIT(slot->debug_sync, + &srv_slot_t::debug_sync_t::debug_sync_list); + rw_lock_create(PFS_NOT_INSTRUMENTED, &slot->debug_sync_lock, + SYNC_NO_ORDER_CHECK); +#endif + ut_a(srv_n_purge_threads > 1); ut_a(ulong(my_atomic_loadlint(&srv_sys.n_threads_active[SRV_WORKER])) < srv_n_purge_threads); @@ -2518,7 +2532,7 @@ DECLARE_THREAD(srv_worker_thread)( srv_suspend_thread(slot); srv_resume_thread(slot); - if (srv_task_execute()) { + if (srv_task_execute(slot)) { /* If there are tasks in the queue, wakeup the purge coordinator thread. */ @@ -2547,10 +2561,11 @@ DECLARE_THREAD(srv_worker_thread)( /** Do the actual purge operation. @param[in,out] n_total_purged total number of purged pages +@param[in,out] slot purge coordinator thread slot @return length of history list before the last purge batch. */ static ulint -srv_do_purge(ulint* n_total_purged) +srv_do_purge(ulint* n_total_purged, srv_slot_t *slot) { ulint n_pages_purged; @@ -2616,7 +2631,8 @@ srv_do_purge(ulint* n_total_purged) n_pages_purged = trx_purge( n_use_threads, - (++count % rseg_truncate_frequency) == 0); + (++count % rseg_truncate_frequency) == 0, + slot); *n_total_purged += n_pages_purged; } while (n_pages_purged > 0 && !purge_sys.paused() @@ -2718,6 +2734,12 @@ DECLARE_THREAD(srv_purge_coordinator_thread)( slot = srv_reserve_slot(SRV_PURGE); +#ifdef UNIV_DEBUG + UT_LIST_INIT(slot->debug_sync, + &srv_slot_t::debug_sync_t::debug_sync_list); + rw_lock_create(PFS_NOT_INSTRUMENTED, &slot->debug_sync_lock, + SYNC_NO_ORDER_CHECK); +#endif ulint rseg_history_len = trx_sys.history_size(); do { @@ -2739,7 +2761,7 @@ DECLARE_THREAD(srv_purge_coordinator_thread)( n_total_purged = 0; - rseg_history_len = srv_do_purge(&n_total_purged); + rseg_history_len = srv_do_purge(&n_total_purged, slot); } while (!srv_purge_should_exit()); /* The task queue should always be empty, independent of fast @@ -2897,3 +2919,33 @@ srv_was_tablespace_truncated(const fil_space_t* space) return (!is_system_tablespace(space->id) && truncate_t::was_tablespace_truncated(space->id)); } + +static uint get_first_slot(srv_thread_type type) +{ + switch (type) { + case SRV_MASTER: + return SRV_MASTER_SLOT; + case SRV_PURGE: + return SRV_PURGE_SLOT; + case SRV_WORKER: + /* Find an empty slot, skip the master and purge slots. */ + return SRV_WORKER_SLOTS_START; + default: + ut_error; + } +} + +void srv_for_each_thread(srv_thread_type type, + srv_slot_callback_t callback, + const void *arg) +{ + int slot_idx= get_first_slot(type); + while(slot_idx < srv_sys.n_sys_threads + && srv_sys.sys_threads[slot_idx].in_use + && srv_sys.sys_threads[slot_idx].type == type) + { + srv_slot_t *slot= &srv_sys.sys_threads[slot_idx]; + callback(slot, arg); + slot_idx++; + } +} diff --git a/storage/innobase/trx/trx0purge.cc b/storage/innobase/trx/trx0purge.cc index 475babb6195..cd7572462dc 100644 --- a/storage/innobase/trx/trx0purge.cc +++ b/storage/innobase/trx/trx0purge.cc @@ -1561,7 +1561,9 @@ trx_purge( /*======*/ ulint n_purge_threads, /*!< in: number of purge tasks to submit to the work queue */ - bool truncate) /*!< in: truncate history if true */ + bool truncate, /*!< in: truncate history if true */ + srv_slot_t *slot) /*!< in/out: purge coordinator + thread slot */ { que_thr_t* thr = NULL; ulint n_pages_handled; @@ -1597,6 +1599,7 @@ trx_purge( thr = que_fork_scheduler_round_robin(purge_sys.query, thr); + thr->thread_slot = slot; que_run_threads(thr); my_atomic_addlint(&purge_sys.n_completed, 1); |