diff options
author | Marko Mäkelä <marko.makela@mariadb.com> | 2022-04-14 12:59:59 +0300 |
---|---|---|
committer | Marko Mäkelä <marko.makela@mariadb.com> | 2022-04-14 12:59:59 +0300 |
commit | e5f7657edad2fac7f111be1769cd6c178eb279c4 (patch) | |
tree | 095f660439b892f944bb5fe15564bfcd2ce8fd6d | |
parent | 2aed566d2267a824158025c09830bc6353ec88a9 (diff) | |
download | mariadb-git-bb-10.6-MDEV-21423.tar.gz |
MDEV-21423: Replace trx_sys.rw_trx_hash with a locked hash table (bb-10.6-MDEV-21423)
The embedded page_hash_latch in buf_pool.page_hash has served us well.
Let us attempt the same approach for trx_sys.rw_trx_hash,
instead of using the lock-free hash table that is unable to shrink.
Unlike with the lock-free hash table, any iteration of the entire
hash table, as in rw_trx_hash.for_each() or rw_trx_hash.for_each_until(),
may modify a large number of cache lines, because hash array latches have to be acquired and released along the way.
Outside debug checks, trx_sys.rw_trx_hash is being iterated in
ReadView::open(), trx_sys_t::clone_oldest_view(), and
trx_sys_t::get_min_trx_id() (invoked on secondary index lock checks).
To reduce the impact of locking on processors
that support transactional memory, we introduce the predicate
rw_trx_hash_t::empty() so that unnecessary acquisition and releasing
of hash array latches can be avoided when an entire slice of the
hash array is empty.
FIXME: This is reducing throughput in many cases. How to improve this
further?
TODO: Find a reasonable value for the constexpr size_t n_cells.
-rw-r--r-- | storage/innobase/btr/btr0cur.cc | 3 | ||||
-rw-r--r-- | storage/innobase/dict/dict0load.cc | 26 | ||||
-rw-r--r-- | storage/innobase/dict/dict0stats.cc | 6 | ||||
-rw-r--r-- | storage/innobase/handler/ha_innodb.cc | 17 | ||||
-rw-r--r-- | storage/innobase/include/buf0buf.h | 2 | ||||
-rw-r--r-- | storage/innobase/include/read0types.h | 2 | ||||
-rw-r--r-- | storage/innobase/include/row0vers.h | 3 | ||||
-rw-r--r-- | storage/innobase/include/trx0sys.h | 611 | ||||
-rw-r--r-- | storage/innobase/include/trx0trx.h | 112 | ||||
-rw-r--r-- | storage/innobase/lock/lock0lock.cc | 173 | ||||
-rw-r--r-- | storage/innobase/read/read0read.cc | 10 | ||||
-rw-r--r-- | storage/innobase/row/row0import.cc | 3 | ||||
-rw-r--r-- | storage/innobase/row/row0row.cc | 3 | ||||
-rw-r--r-- | storage/innobase/row/row0sel.cc | 6 | ||||
-rw-r--r-- | storage/innobase/row/row0vers.cc | 13 | ||||
-rw-r--r-- | storage/innobase/trx/trx0purge.cc | 8 | ||||
-rw-r--r-- | storage/innobase/trx/trx0roll.cc | 100 | ||||
-rw-r--r-- | storage/innobase/trx/trx0sys.cc | 105 | ||||
-rw-r--r-- | storage/innobase/trx/trx0trx.cc | 221 |
19 files changed, 530 insertions, 894 deletions
diff --git a/storage/innobase/btr/btr0cur.cc b/storage/innobase/btr/btr0cur.cc index 05f3128d819..ace12856e71 100644 --- a/storage/innobase/btr/btr0cur.cc +++ b/storage/innobase/btr/btr0cur.cc @@ -598,8 +598,7 @@ inconsistent: trx_sys.is_registered(DB_TRX_ID). */ if (rec_offs_n_fields(offsets) > ulint(index->n_fields) + !!index->table->instant - && !trx_sys.is_registered(current_trx(), - row_get_rec_trx_id(rec, index, + && !trx_sys.is_registered(row_get_rec_trx_id(rec, index, offsets))) { goto inconsistent; } diff --git a/storage/innobase/dict/dict0load.cc b/storage/innobase/dict/dict0load.cc index 49ebc63d24c..a084f77c8b4 100644 --- a/storage/innobase/dict/dict0load.cc +++ b/storage/innobase/dict/dict0load.cc @@ -667,7 +667,7 @@ dict_sys_tables_rec_read( rec, DICT_FLD__SYS_TABLES__DB_TRX_ID, &len); ut_ad(len == 6 || len == UNIV_SQL_NULL); trx_id_t id = len == 6 ? trx_read_trx_id(field) : 0; - if (id && trx_sys.find(nullptr, id, false)) { + if (id && trx_sys.find(id, false)) { const auto savepoint = mtr->get_savepoint(); heap = mem_heap_create(1024); dict_index_t* index = UT_LIST_GET_FIRST( @@ -676,7 +676,7 @@ dict_sys_tables_rec_read( rec, index, nullptr, true, ULINT_UNDEFINED, &heap); const rec_t* old_vers; row_vers_build_for_semi_consistent_read( - nullptr, rec, mtr, index, &offsets, &heap, + rec, mtr, index, &offsets, &heap, heap, &old_vers, nullptr); mtr->rollback_to_savepoint(savepoint); rec = old_vers; @@ -1075,7 +1075,7 @@ err_len: const trx_id_t trx_id = trx_read_trx_id(field); - if (trx_id && mtr && trx_sys.find(nullptr, trx_id, false)) { + if (trx_id && mtr && trx_sys.find(trx_id, false)) { const auto savepoint = mtr->get_savepoint(); dict_index_t* index = UT_LIST_GET_FIRST( dict_sys.sys_columns->indexes); @@ -1083,7 +1083,7 @@ err_len: rec, index, nullptr, true, ULINT_UNDEFINED, &heap); const rec_t* old_vers; row_vers_build_for_semi_consistent_read( - nullptr, rec, mtr, index, &offsets, &heap, + rec, mtr, index, &offsets, &heap, heap, &old_vers, 
nullptr); mtr->rollback_to_savepoint(savepoint); rec = old_vers; @@ -1278,7 +1278,7 @@ err_len: const trx_id_t trx_id = trx_read_trx_id(field); - if (trx_id && column && trx_sys.find(nullptr, trx_id, false)) { + if (trx_id && column && trx_sys.find(trx_id, false)) { if (!rec_get_deleted_flag(rec, 0)) { return dict_load_virtual_none; } @@ -1621,7 +1621,7 @@ err_len: if (!trx_id) { ut_ad(!rec_get_deleted_flag(rec, 0)); - } else if (mtr && trx_sys.find(nullptr, trx_id, false)) { + } else if (mtr && trx_sys.find(trx_id, false)) { const auto savepoint = mtr->get_savepoint(); dict_index_t* sys_field = UT_LIST_GET_FIRST( dict_sys.sys_fields->indexes); @@ -1629,7 +1629,7 @@ err_len: rec, sys_field, nullptr, true, ULINT_UNDEFINED, &heap); const rec_t* old_vers; row_vers_build_for_semi_consistent_read( - nullptr, rec, mtr, sys_field, &offsets, &heap, + rec, mtr, sys_field, &offsets, &heap, heap, &old_vers, nullptr); mtr->rollback_to_savepoint(savepoint); rec = old_vers; @@ -1833,7 +1833,7 @@ err_len: if (!trx_id) { ut_ad(!rec_get_deleted_flag(rec, 0)); } else if (!mtr) { - } else if (trx_sys.find(nullptr, trx_id, false)) { + } else if (trx_sys.find(trx_id, false)) { const auto savepoint = mtr->get_savepoint(); dict_index_t* sys_index = UT_LIST_GET_FIRST( dict_sys.sys_indexes->indexes); @@ -1841,7 +1841,7 @@ err_len: rec, sys_index, nullptr, true, ULINT_UNDEFINED, &heap); const rec_t* old_vers; row_vers_build_for_semi_consistent_read( - nullptr, rec, mtr, sys_index, &offsets, &heap, + rec, mtr, sys_index, &offsets, &heap, heap, &old_vers, nullptr); mtr->rollback_to_savepoint(savepoint); rec = old_vers; @@ -2707,14 +2707,14 @@ retry: const trx_id_t id = trx_read_trx_id(field); if (!id) { - } else if (id != trx_id && trx_sys.find(nullptr, id, false)) { + } else if (id != trx_id && trx_sys.find(id, false)) { const auto savepoint = mtr.get_savepoint(); rec_offs* offsets = rec_get_offsets( rec, sys_index, nullptr, true, ULINT_UNDEFINED, &heap); const rec_t* old_vers; 
row_vers_build_for_semi_consistent_read( - nullptr, rec, &mtr, sys_index, &offsets, &heap, + rec, &mtr, sys_index, &offsets, &heap, heap, &old_vers, nullptr); mtr.rollback_to_savepoint(savepoint); rec = old_vers; @@ -2883,13 +2883,13 @@ dict_load_foreign( const trx_id_t tid = trx_read_trx_id(field); - if (tid && tid != trx_id && trx_sys.find(nullptr, tid, false)) { + if (tid && tid != trx_id && trx_sys.find(tid, false)) { const auto savepoint = mtr.get_savepoint(); rec_offs* offsets = rec_get_offsets( rec, sys_index, nullptr, true, ULINT_UNDEFINED, &heap); const rec_t* old_vers; row_vers_build_for_semi_consistent_read( - nullptr, rec, &mtr, sys_index, &offsets, &heap, + rec, &mtr, sys_index, &offsets, &heap, heap, &old_vers, nullptr); mtr.rollback_to_savepoint(savepoint); rec = old_vers; diff --git a/storage/innobase/dict/dict0stats.cc b/storage/innobase/dict/dict0stats.cc index 570903d4327..349efb3a758 100644 --- a/storage/innobase/dict/dict0stats.cc +++ b/storage/innobase/dict/dict0stats.cc @@ -1,7 +1,7 @@ /***************************************************************************** Copyright (c) 2009, 2019, Oracle and/or its affiliates. All Rights Reserved. -Copyright (c) 2015, 2021, MariaDB Corporation. +Copyright (c) 2015, 2022, MariaDB Corporation. 
This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -1432,7 +1432,7 @@ invalid: } const auto bulk_trx_id = index->table->bulk_trx_id; - if (bulk_trx_id && trx_sys.find(nullptr, bulk_trx_id, false)) { + if (bulk_trx_id && trx_sys.find(bulk_trx_id, false)) { goto invalid; } @@ -2548,7 +2548,7 @@ empty_index: } const auto bulk_trx_id = index->table->bulk_trx_id; - if (bulk_trx_id && trx_sys.find(nullptr, bulk_trx_id, false)) { + if (bulk_trx_id && trx_sys.find(bulk_trx_id, false)) { result.index_size = 1; result.n_leaf_pages = 1; goto empty_index; diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc index 3f11061258c..e23cf047c7c 100644 --- a/storage/innobase/handler/ha_innodb.cc +++ b/storage/innobase/handler/ha_innodb.cc @@ -2946,23 +2946,6 @@ check_trx_exists( } } -/** - Gets current trx. - - This function may be called during InnoDB initialisation, when - innodb_hton_ptr->slot is not yet set to meaningful value. -*/ - -trx_t *current_trx() -{ - THD *thd=current_thd; - if (likely(thd != 0) && innodb_hton_ptr->slot != HA_SLOT_UNDEF) { - return thd_to_trx(thd); - } else { - return(NULL); - } -} - /*********************************************************************//** Note that a transaction has been registered with MySQL. 
@return true if transaction is registered with MySQL 2PC coordinator */ diff --git a/storage/innobase/include/buf0buf.h b/storage/innobase/include/buf0buf.h index a1b4ea53c3e..a9a74574f63 100644 --- a/storage/innobase/include/buf0buf.h +++ b/storage/innobase/include/buf0buf.h @@ -1637,7 +1637,7 @@ public: ((CPU_LEVEL1_DCACHE_LINESIZE / 64) - 1) * (64 / sizeof(void*)); /** number of payload elements in array[] */ - Atomic_relaxed<ulint> n_cells; + ulint n_cells; /** the hash table, with pad(n_cells) elements, aligned to L1 cache size */ hash_chain *array; diff --git a/storage/innobase/include/read0types.h b/storage/innobase/include/read0types.h index bc02fc065f5..bd20beb3b6c 100644 --- a/storage/innobase/include/read0types.h +++ b/storage/innobase/include/read0types.h @@ -118,7 +118,7 @@ loop: @param[in,out] trx transaction */ - inline void snapshot(trx_t *trx); + inline void snapshot(); /** diff --git a/storage/innobase/include/row0vers.h b/storage/innobase/include/row0vers.h index e05b18a8ccc..d93b91cef00 100644 --- a/storage/innobase/include/row0vers.h +++ b/storage/innobase/include/row0vers.h @@ -1,7 +1,7 @@ /***************************************************************************** Copyright (c) 1997, 2016, Oracle and/or its affiliates. All Rights Reserved. -Copyright (c) 2017, 2019, MariaDB Corporation. +Copyright (c) 2017, 2022, MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -117,7 +117,6 @@ which should be seen by a semi-consistent read. 
*/ void row_vers_build_for_semi_consistent_read( /*====================================*/ - trx_t* caller_trx,/*!<in/out: trx of current thread */ const rec_t* rec, /*!< in: record in a clustered index; the caller must have a latch on the page; this latch locks the top of the stack of versions diff --git a/storage/innobase/include/trx0sys.h b/storage/innobase/include/trx0sys.h index 1f33a9db091..c949d84c61d 100644 --- a/storage/innobase/include/trx0sys.h +++ b/storage/innobase/include/trx0sys.h @@ -121,15 +121,12 @@ trx_sys_update_mysql_binlog_offset( int64_t offset, /*!< in: position in that log file */ buf_block_t* sys_header, /*!< in,out: trx sys header */ mtr_t* mtr); /*!< in,out: mini-transaction */ -/** Display the MySQL binlog offset info if it is present in the trx -system header. */ -void -trx_sys_print_mysql_binlog_offset(); +/** Display the binlog offset info if it is present in the undo pages. */ +void trx_sys_print_mysql_binlog_offset(); /** Create the rollback segments. @return whether the creation succeeded */ -bool -trx_sys_create_rsegs(); +bool trx_sys_create_rsegs(); /** The automatically created system rollback segment has this id */ #define TRX_SYS_SYSTEM_RSEG_ID 0 @@ -330,173 +327,182 @@ constexpr uint32_t TRX_SYS_DOUBLEWRITE_MAGIC_N= 536853855; constexpr uint32_t TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED_N= 1783657386; /* @} */ -trx_t* current_trx(); - -struct rw_trx_hash_element_t +class rw_trx_hash_t { - rw_trx_hash_element_t() + /** number of payload elements in array[] */ + static constexpr size_t n_cells= 64; + /** Hash cell chain */ + struct hash_chain { - memset(reinterpret_cast<void*>(this), 0, sizeof *this); - mutex.init(); - } - - - ~rw_trx_hash_element_t() { mutex.destroy(); } - - - trx_id_t id; /* lf_hash_init() relies on this to be first in the struct */ - - /** - Transaction serialization number. - - Assigned shortly before the transaction is moved to COMMITTED_IN_MEMORY - state. Initially set to TRX_ID_MAX. 
- */ - Atomic_counter<trx_id_t> no; - trx_t *trx; - srw_mutex mutex; -}; + /** pointer to the first transaction */ + trx_t *first; + /** Append a transaction. */ + void append(trx_t *trx) + { + ut_d(validate_element(trx)); + ut_ad(!trx->rw_trx_hash); + trx_t **prev= &first; + while (*prev) + prev= &(*prev)->rw_trx_hash; + *prev= trx; + } -/** - Wrapper around LF_HASH to store set of in memory read-write transactions. -*/ + /** Remove a transaction. */ + void remove(trx_t *trx) + { + ut_d(validate_element(trx)); + trx_t **prev= &first; + while (*prev != trx) + prev= &(*prev)->rw_trx_hash; + *prev= trx->rw_trx_hash; + trx->rw_trx_hash= nullptr; + } + }; -class rw_trx_hash_t -{ - LF_HASH hash; + /** Number of array[] elements per page_hash_latch. + Must be one less than a power of 2. */ + static constexpr size_t ELEMENTS_PER_LATCH= 64 / sizeof(void*) - 1; + static constexpr size_t EMPTY_SLOTS_PER_LATCH= + ((CPU_LEVEL1_DCACHE_LINESIZE / 64) - 1) * (64 / sizeof(void*)); + /** @return raw array index converted to padded index */ + static constexpr ulint pad(ulint h) + { + return 1 + h + (h / ELEMENTS_PER_LATCH) * (1 + EMPTY_SLOTS_PER_LATCH); + } - template <typename T> - using walk_action= my_bool(rw_trx_hash_element_t *element, T *action); + static constexpr size_t n_cells_padded= 1 + n_cells + + (n_cells / ELEMENTS_PER_LATCH) * (1 + EMPTY_SLOTS_PER_LATCH); + static constexpr size_t n_cells_padded_aligned= + (n_cells_padded + CPU_LEVEL1_DCACHE_LINESIZE - 1) & + ~(CPU_LEVEL1_DCACHE_LINESIZE - 1); + /** the hash table */ + alignas(CPU_LEVEL1_DCACHE_LINESIZE) + hash_chain array[n_cells_padded_aligned]; - /** - Constructor callback for lock-free allocator. + ulint calc_hash(trx_id_t id) const { return calc_hash(ulint(id), n_cells); } - Object is just allocated and is not yet accessible via rw_trx_hash by - concurrent threads. Object can be reused multiple times before it is freed. - Every time object is being reused initializer() callback is called. 
- */ + /** @return the hash value before any ELEMENTS_PER_LATCH padding */ + static ulint hash(ulint fold, ulint n) { return ut_hash_ulint(fold, n); } - static void rw_trx_hash_constructor(uchar *arg) + /** @return the index of an array element */ + static ulint calc_hash(ulint fold, ulint n_cells) { - new(arg + LF_HASH_OVERHEAD) rw_trx_hash_element_t(); + return pad(hash(fold, n_cells)); } - - /** - Destructor callback for lock-free allocator. - - Object is about to be freed and is not accessible via rw_trx_hash by - concurrent threads. - */ - - static void rw_trx_hash_destructor(uchar *arg) + /** @return the latch covering a hash table chain */ + static page_hash_latch &lock_get(hash_chain &chain) { - reinterpret_cast<rw_trx_hash_element_t*> - (arg + LF_HASH_OVERHEAD)->~rw_trx_hash_element_t(); + static_assert(!((ELEMENTS_PER_LATCH + 1) & ELEMENTS_PER_LATCH), + "must be one less than a power of 2"); + const size_t addr= reinterpret_cast<size_t>(&chain); + ut_ad(addr & (ELEMENTS_PER_LATCH * sizeof chain)); + return *reinterpret_cast<page_hash_latch*> + (addr & ~(ELEMENTS_PER_LATCH * sizeof chain)); } + /** Get a hash table slot. */ + hash_chain &cell_get(trx_id_t id) { return array[calc_hash(id)]; } - /** - Destructor callback for lock-free allocator. +#ifdef NO_ELISION + static constexpr bool empty(const page_hash_latch*) { return false; } +#else + /** @return whether the cache line is empty */ + TRANSACTIONAL_TARGET bool empty(const page_hash_latch *lk) const; +#endif - This destructor is used at shutdown. It frees remaining transaction - objects. +public: + inline void destroy(); - XA PREPARED transactions may remain if they haven't been committed or - rolled back. ACTIVE transactions may remain if startup was interrupted or - server is running in read-only mode or for certain srv_force_recovery - levels. 
- */ + bool empty() { return !for_each_until([](const trx_t&){return true;}); } - static void rw_trx_hash_shutdown_destructor(uchar *arg) + size_t size() { - rw_trx_hash_element_t *element= - reinterpret_cast<rw_trx_hash_element_t*>(arg + LF_HASH_OVERHEAD); - if (trx_t *trx= element->trx) - { - ut_ad(trx_state_eq(trx, TRX_STATE_PREPARED) || - trx_state_eq(trx, TRX_STATE_PREPARED_RECOVERED) || - (trx_state_eq(trx, TRX_STATE_ACTIVE) && - (!srv_was_started || - srv_read_only_mode || - srv_force_recovery >= SRV_FORCE_NO_TRX_UNDO))); - trx_free_at_shutdown(trx); - } - element->~rw_trx_hash_element_t(); + size_t count= 0; + for_each([&count](const trx_t&){count++;}); + return count; } + page_hash_latch &lock_get(trx_id_t id) { return lock_get(cell_get(id)); } - /** - Initializer callback for lock-free hash. - - Object is not yet accessible via rw_trx_hash by concurrent threads, but is - about to become such. Object id can be changed only by this callback and - remains the same until all pins to this object are released. - - Object trx can be changed to 0 by erase() under object mutex protection, - which indicates it is about to be removed from lock-free hash and become - not accessible by concurrent threads. - */ - - static void rw_trx_hash_initializer(LF_HASH *, - rw_trx_hash_element_t *element, - trx_t *trx) + void insert(trx_t *trx) { - ut_ad(element->trx == 0); - element->trx= trx; - element->id= trx->id; - element->no= TRX_ID_MAX; - trx->rw_trx_hash_element= element; + hash_chain &chain= cell_get(trx->id); + auto &lk= lock_get(chain); + lk.lock(); + chain.append(trx); + lk.unlock(); } + void erase(trx_t *trx) + { + hash_chain &chain= cell_get(trx->id); + auto &lk= lock_get(chain); + lk.lock(); + chain.remove(trx); + lk.unlock(); + } - /** - Gets LF_HASH pins. - - Pins are used to protect object from being destroyed or reused. They are - normally stored in trx object for quick access. If caller doesn't have trx - available, we try to get it using currnet_trx(). 
If caller doesn't have trx - at all, temporary pins are allocated. - */ - - LF_PINS *get_pins(trx_t *trx) + template <typename Callable> void for_each(Callable &&callback) { - if (!trx->rw_trx_hash_pins) + const ulint n= pad(n_cells); + ulint i= 0; + do { - trx->rw_trx_hash_pins= lf_hash_get_pins(&hash); - ut_a(trx->rw_trx_hash_pins); + const ulint k= i + (ELEMENTS_PER_LATCH + 1); + auto lk= reinterpret_cast<page_hash_latch*>(&array[i]); + if (!empty(lk)) + { + lk->lock_shared(); + while (++i < k) + { + for (trx_t* trx= array[i].first; trx; trx= trx->rw_trx_hash) + { + ut_d(validate_element(trx)); + callback(*trx); + } + } + lk->unlock_shared(); + } + i= k; } - return trx->rw_trx_hash_pins; + while (i < n); } - - template <typename T> struct eliminate_duplicates_arg - { - trx_ids_t ids; - walk_action<T> *action; - T *argument; - eliminate_duplicates_arg(size_t size, walk_action<T> *act, T *arg): - action(act), argument(arg) { ids.reserve(size); } - }; - - - template <typename T> - static my_bool eliminate_duplicates(rw_trx_hash_element_t *element, - eliminate_duplicates_arg<T> *arg) + template <typename Callable> bool for_each_until(Callable &&callback) { - for (trx_ids_t::iterator it= arg->ids.begin(); it != arg->ids.end(); it++) + const ulint n= pad(n_cells); + ulint i= 0; + do { - if (*it == element->id) - return 0; + const ulint k= i + (ELEMENTS_PER_LATCH + 1); + auto lk= reinterpret_cast<page_hash_latch*>(&array[i]); + if (!empty(lk)) + { + lk->lock_shared(); + while (++i < k) + { + for (trx_t* trx= array[i].first; trx; trx= trx->rw_trx_hash) + { + ut_d(validate_element(trx)); + if (callback(*trx)) + { + lk->unlock_shared(); + return true; + } + } + } + lk->unlock_shared(); + } + i= k; } - arg->ids.push_back(element->id); - return arg->action(element, arg->argument); + while (i < n); + return false; } - #ifdef UNIV_DEBUG static void validate_element(trx_t *trx) { @@ -510,70 +516,10 @@ class rw_trx_hash_t trx_state_eq(trx, TRX_STATE_PREPARED)); 
ut_d(trx->mutex_unlock()); } - - - template <typename T> struct debug_iterator_arg - { - walk_action<T> *action; - T *argument; - }; - - - template <typename T> - static my_bool debug_iterator(rw_trx_hash_element_t *element, - debug_iterator_arg<T> *arg) - { - element->mutex.wr_lock(); - if (element->trx) - validate_element(element->trx); - element->mutex.wr_unlock(); - return arg->action(element, arg->argument); - } #endif - -public: - void init() - { - lf_hash_init(&hash, sizeof(rw_trx_hash_element_t), LF_HASH_UNIQUE, 0, - sizeof(trx_id_t), 0, &my_charset_bin); - hash.alloc.constructor= rw_trx_hash_constructor; - hash.alloc.destructor= rw_trx_hash_destructor; - hash.initializer= - reinterpret_cast<lf_hash_initializer>(rw_trx_hash_initializer); - } - - - void destroy() - { - hash.alloc.destructor= rw_trx_hash_shutdown_destructor; - lf_hash_destroy(&hash); - } - - /** - Releases LF_HASH pins. - - Must be called by thread that owns trx_t object when the latter is being - "detached" from thread (e.g. released to the pool by trx_t::free()). Can be - called earlier if thread is expected not to use rw_trx_hash. - - Since pins are not allowed to be transferred to another thread, - initialisation thread calls this for recovered transactions. - */ - - void put_pins(trx_t *trx) - { - if (trx->rw_trx_hash_pins) - { - lf_hash_put_pins(trx->rw_trx_hash_pins); - trx->rw_trx_hash_pins= 0; - } - } - - - /** - Finds trx object in lock-free hash with given id. + Find a transaction. Only ACTIVE or PREPARED trx objects may participate in hash. Nevertheless the transaction may get committed before this method returns. @@ -585,27 +531,11 @@ public: holding lock_sys.latch. Caller is responsible for calling trx->release_reference() when it is done playing with trx. - Ideally this method should get caller rw_trx_hash_pins along with trx - object as a parameter, similar to insert() and erase(). 
However most - callers lose trx early in their call chains and it is not that easy to pass - them through. - - So we take more expensive approach: get trx through current_thd()->ha_data. - Some threads don't have trx attached to THD, and at least server - initialisation thread, fts_optimize_thread, srv_master_thread, - dict_stats_thread, srv_monitor_thread, btr_defragment_thread don't even - have THD at all. For such cases we allocate pins only for duration of - search and free them immediately. - - This has negative performance impact and should be fixed eventually (by - passing caller_trx as a parameter). Still stream of DML is more or less Ok. - - @return - @retval 0 not found - @retval pointer to trx + @return transaction corresponding to trx_id + @retval nullptr if not found */ - trx_t *find(trx_t *caller_trx, trx_id_t trx_id, bool do_ref_count) + trx_t *find(trx_id_t trx_id, bool do_ref_count) { /* In MariaDB 10.3, purge will reset DB_TRX_ID to 0 @@ -617,21 +547,15 @@ public: The caller should already have handled trx_id==0 specially. */ ut_ad(trx_id); - ut_ad(!caller_trx || caller_trx->id != trx_id || !do_ref_count); - - trx_t *trx= 0; - LF_PINS *pins= caller_trx ? 
get_pins(caller_trx) : lf_hash_get_pins(&hash); - ut_a(pins); - rw_trx_hash_element_t *element= reinterpret_cast<rw_trx_hash_element_t*> - (lf_hash_search(&hash, pins, reinterpret_cast<const void*>(&trx_id), - sizeof(trx_id_t))); - if (element) + trx_t *trx= nullptr; + hash_chain &chain= cell_get(trx_id); + auto &lk= lock_get(chain); + lk.lock_shared(); + for (trx= chain.first; trx; trx= trx->rw_trx_hash) { - element->mutex.wr_lock(); - lf_hash_search_unpin(pins); - if ((trx= element->trx)) { - DBUG_ASSERT(trx_id == trx->id); + if (trx->id == trx_id) + { ut_d(validate_element(trx)); if (do_ref_count) { @@ -648,138 +572,12 @@ public: else trx->reference(); } + break; } - element->mutex.wr_unlock(); } - if (!caller_trx) - lf_hash_put_pins(pins); + lk.unlock_shared(); return trx; } - - - /** - Inserts trx to lock-free hash. - - Object becomes accessible via rw_trx_hash. - */ - - void insert(trx_t *trx) - { - ut_d(validate_element(trx)); - int res= lf_hash_insert(&hash, get_pins(trx), - reinterpret_cast<void*>(trx)); - ut_a(res == 0); - } - - - /** - Removes trx from lock-free hash. - - Object becomes not accessible via rw_trx_hash. But it still can be pinned - by concurrent find(), which is supposed to release it immediately after - it sees object trx is 0. - */ - - void erase(trx_t *trx) - { - ut_d(validate_element(trx)); - trx->rw_trx_hash_element->mutex.wr_lock(); - trx->rw_trx_hash_element->trx= nullptr; - trx->rw_trx_hash_element->mutex.wr_unlock(); - int res= lf_hash_delete(&hash, get_pins(trx), - reinterpret_cast<const void*>(&trx->id), - sizeof(trx_id_t)); - ut_a(res == 0); - } - - - /** - Returns the number of elements in the hash. - - The number is exact only if hash is protected against concurrent - modifications (e.g. single threaded startup or hash is protected - by some mutex). Otherwise the number may be used as a hint only, - because it may change even before this method returns. 
- */ - - uint32_t size() { return uint32_t(lf_hash_size(&hash)); } - - - /** - Iterates the hash. - - @param caller_trx used to get/set pins - @param action called for every element in hash - @param argument opque argument passed to action - - May return the same element multiple times if hash is under contention. - If caller doesn't like to see the same transaction multiple times, it has - to call iterate_no_dups() instead. - - May return element with committed transaction. If caller doesn't like to - see committed transactions, it has to skip those under element mutex: - - element->mutex.wr_lock(); - if (trx_t trx= element->trx) - { - // trx is protected against commit in this branch - } - element->mutex.wr_unlock(); - - May miss concurrently inserted transactions. - - @return - @retval 0 iteration completed successfully - @retval 1 iteration was interrupted (action returned 1) - */ - - template <typename T> - int iterate(trx_t *caller_trx, walk_action<T> *action, T *argument= nullptr) - { - LF_PINS *pins= caller_trx ? get_pins(caller_trx) : lf_hash_get_pins(&hash); - ut_a(pins); -#ifdef UNIV_DEBUG - debug_iterator_arg<T> debug_arg= { action, argument }; - action= reinterpret_cast<decltype(action)>(debug_iterator<T>); - argument= reinterpret_cast<T*>(&debug_arg); -#endif - int res= lf_hash_iterate(&hash, pins, - reinterpret_cast<my_hash_walk_action>(action), - const_cast<void*>(static_cast<const void*> - (argument))); - if (!caller_trx) - lf_hash_put_pins(pins); - return res; - } - - - template <typename T> - int iterate(walk_action<T> *action, T *argument= nullptr) - { - return iterate(current_trx(), action, argument); - } - - - /** - Iterates the hash and eliminates duplicate elements. 
- - @sa iterate() - */ - - template <typename T> - int iterate_no_dups(trx_t *caller_trx, walk_action<T> *action, - T *argument= nullptr) - { - eliminate_duplicates_arg<T> arg(size() + 32, action, argument); - return iterate(caller_trx, eliminate_duplicates<T>, &arg); - } - - - template <typename T> - int iterate_no_dups(walk_action<T> *action, T *argument= nullptr) - { - return iterate_no_dups(current_trx(), action, argument); - } }; class thread_safe_trx_ilist_t @@ -889,16 +687,6 @@ public: /** - Constructor. - - Some members may require late initialisation, thus we just mark object as - uninitialised. Real initialisation happens in create(). - */ - - trx_sys_t(): m_initialised(false) {} - - - /** @return TRX_RSEG_HISTORY length (number of committed transactions to purge) */ uint32_t history_size(); @@ -923,7 +711,6 @@ public: */ TPOOL_SUPPRESS_TSAN bool history_exists(); - /** Returns the minimum trx id in rw trx list. @@ -937,11 +724,15 @@ public: trx_id_t get_min_trx_id() { trx_id_t id= get_max_trx_id(); - rw_trx_hash.iterate(get_min_trx_id_callback, &id); + rw_trx_hash.for_each([&id](const trx_t &trx) + { + /* We don't care about read-only transactions here. */ + if (trx.id < id && trx.rsegs.m_redo.rseg) + id= trx.id; + }); return id; } - /** Determines the maximum transaction id. @@ -990,11 +781,7 @@ public: @param trx transaction */ - void assign_new_trx_no(trx_t *trx) - { - trx->rw_trx_hash_element->no= get_new_trx_id_no_refresh(); - refresh_rw_trx_hash_version(); - } + inline void assign_new_trx_no(trx_t &trx); /** @@ -1009,31 +796,30 @@ public: We rely on get_rw_trx_hash_version() to issue ACQUIRE memory barrier so that loading of m_rw_trx_hash_version happens before accessing rw_trx_hash. - To optimise snapshot creation rw_trx_hash.iterate() is being used instead - of rw_trx_hash.iterate_no_dups(). It means that some transaction - identifiers may appear multiple times in ids. 
- - @param[in,out] caller_trx used to get access to rw_trx_hash_pins @param[out] ids array to store registered transaction identifiers @param[out] max_trx_id variable to store m_max_trx_id value @param[out] mix_trx_no variable to store min(no) value */ - void snapshot_ids(trx_t *caller_trx, trx_ids_t *ids, trx_id_t *max_trx_id, - trx_id_t *min_trx_no) + void snapshot_ids(trx_ids_t *ids, trx_id_t *max_trx_id, trx_id_t *min_trx_no) { - snapshot_ids_arg arg(ids); - - while ((arg.m_id= get_rw_trx_hash_version()) != get_max_trx_id()) + trx_id_t max_id; + while ((max_id= get_rw_trx_hash_version()) != get_max_trx_id()) ut_delay(1); - arg.m_no= arg.m_id; - + trx_id_t min_no= max_id; ids->clear(); ids->reserve(rw_trx_hash.size() + 32); - rw_trx_hash.iterate(caller_trx, copy_one_id, &arg); - - *max_trx_id= arg.m_id; - *min_trx_no= arg.m_no; + rw_trx_hash.for_each([max_id,&min_no,&ids](const trx_t &trx) + { + if (trx.id < max_id) + { + ids->push_back(trx.id); + if (trx.no < min_no) + min_no= trx.no; + } + }); + *max_trx_id= max_id; + *min_trx_no= min_no; } @@ -1115,15 +901,12 @@ public: } - bool is_registered(trx_t *caller_trx, trx_id_t id) - { - return id && find(caller_trx, id, false); - } + bool is_registered(trx_id_t id) { return id && find(id, false); } - trx_t *find(trx_t *caller_trx, trx_id_t id, bool do_ref_count= true) + trx_t *find(trx_id_t id, bool do_ref_count= true) { - return rw_trx_hash.find(caller_trx, id, do_ref_count); + return rw_trx_hash.find(id, do_ref_count); } @@ -1173,44 +956,6 @@ public: } private: - static my_bool get_min_trx_id_callback(rw_trx_hash_element_t *element, - trx_id_t *id) - { - if (element->id < *id) - { - element->mutex.wr_lock(); - /* We don't care about read-only transactions here. 
*/ - if (element->trx && element->trx->rsegs.m_redo.rseg) - *id= element->id; - element->mutex.wr_unlock(); - } - return 0; - } - - - struct snapshot_ids_arg - { - snapshot_ids_arg(trx_ids_t *ids): m_ids(ids) {} - trx_ids_t *m_ids; - trx_id_t m_id; - trx_id_t m_no; - }; - - - static my_bool copy_one_id(rw_trx_hash_element_t *element, - snapshot_ids_arg *arg) - { - if (element->id < arg->m_id) - { - trx_id_t no= element->no; - arg->m_ids->push_back(element->id); - if (no < arg->m_no) - arg->m_no= no; - } - return 0; - } - - /** Getter for m_rw_trx_hash_version, must issue ACQUIRE memory barrier. */ trx_id_t get_rw_trx_hash_version() { diff --git a/storage/innobase/include/trx0trx.h b/storage/innobase/include/trx0trx.h index 2fd04353a2b..b5fb521fe50 100644 --- a/storage/innobase/include/trx0trx.h +++ b/storage/innobase/include/trx0trx.h @@ -41,7 +41,7 @@ Created 3/26/1996 Heikki Tuuri // Forward declaration struct mtr_t; -struct rw_trx_hash_element_t; +class rw_trx_hash_t; /******************************************************************//** Set detailed error message for the transaction. */ @@ -585,55 +585,7 @@ private: alignas(CPU_LEVEL1_DCACHE_LINESIZE) Atomic_counter<int32_t> n_ref; - - public: - /** Transaction identifier (0 if no locks were acquired). - Set by trx_sys_t::register_rw() or trx_resurrect() before - the transaction is added to trx_sys.rw_trx_hash. - Cleared in commit_in_memory() after commit_state(), - trx_sys_t::deregister_rw(), release_locks(). 
*/ - trx_id_t id; - -private: - /** mutex protecting state and some of lock - (some are protected by lock_sys.latch) */ - srw_spin_mutex mutex; -#ifdef UNIV_DEBUG - /** The owner of mutex (0 if none); protected by mutex */ - std::atomic<os_thread_id_t> mutex_owner{0}; -#endif /* UNIV_DEBUG */ -public: - void mutex_init() { mutex.init(); } - void mutex_destroy() { mutex.destroy(); } - - /** Acquire the mutex */ - void mutex_lock() - { - ut_ad(!mutex_is_owner()); - mutex.wr_lock(); - ut_ad(!mutex_owner.exchange(os_thread_get_curr_id(), - std::memory_order_relaxed)); - } - /** Release the mutex */ - void mutex_unlock() - { - ut_ad(mutex_owner.exchange(0, std::memory_order_relaxed) - == os_thread_get_curr_id()); - mutex.wr_unlock(); - } -#ifndef SUX_LOCK_GENERIC - bool mutex_is_locked() const noexcept { return mutex.is_locked(); } -#endif -#ifdef UNIV_DEBUG - /** @return whether the current thread holds the mutex */ - bool mutex_is_owner() const - { - return mutex_owner.load(std::memory_order_relaxed) == - os_thread_get_curr_id(); - } -#endif /* UNIV_DEBUG */ - /** State of the trx from the point of view of concurrency control and the valid state transitions. @@ -697,6 +649,62 @@ public: Transitions to COMMITTED are protected by trx_t::mutex. */ Atomic_relaxed<trx_state_t> state; + /** Transaction identifier (0 if no locks were acquired). + Set by trx_sys_t::register_rw() or trx_resurrect() before + the transaction is added to trx_sys.rw_trx_hash. + Cleared in commit_in_memory() after commit_state(), + trx_sys_t::deregister_rw(), release_locks(). */ + trx_id_t id; + /** + Transaction commit number for read view creation. + + Assigned shortly before trx is moved to TRX_STATE_COMMITTED_IN_MEMORY. + Initially set to TRX_ID_MAX. 
+ */ + trx_id_t no; +private: + friend class rw_trx_hash_t; + /** Next element in trx_sys.rw_trx_hash bucket chain */ + trx_t *rw_trx_hash; + + /** mutex protecting state and some of lock + (some are protected by lock_sys.latch) */ + srw_spin_mutex mutex; +#ifdef UNIV_DEBUG + /** The owner of mutex (0 if none); protected by mutex */ + std::atomic<os_thread_id_t> mutex_owner{0}; +#endif /* UNIV_DEBUG */ +public: + void mutex_init() { mutex.init(); } + void mutex_destroy() { mutex.destroy(); } + + /** Acquire the mutex */ + void mutex_lock() + { + ut_ad(!mutex_is_owner()); + mutex.wr_lock(); + ut_ad(!mutex_owner.exchange(os_thread_get_curr_id(), + std::memory_order_relaxed)); + } + /** Release the mutex */ + void mutex_unlock() + { + ut_ad(mutex_owner.exchange(0, std::memory_order_relaxed) + == os_thread_get_curr_id()); + mutex.wr_unlock(); + } +#ifndef SUX_LOCK_GENERIC + bool mutex_is_locked() const noexcept { return mutex.is_locked(); } +#endif +#ifdef UNIV_DEBUG + /** @return whether the current thread holds the mutex */ + bool mutex_is_owner() const + { + return mutex_owner.load(std::memory_order_relaxed) == + os_thread_get_curr_id(); + } +#endif /* UNIV_DEBUG */ + /** The locks of the transaction. Protected by lock_sys.latch (insertions also by trx_t::mutex). */ alignas(CPU_LEVEL1_DCACHE_LINESIZE) trx_lock_t lock; @@ -884,8 +892,6 @@ public: /*------------------------------*/ char* detailed_error; /*!< detailed error message for last error, or empty. 
*/ - rw_trx_hash_element_t *rw_trx_hash_element; - LF_PINS *rw_trx_hash_pins; ulint magic_n; /** @return whether any persistent undo log has been generated */ @@ -1010,11 +1016,11 @@ public: void assert_freed() const { - ut_ad(state == TRX_STATE_NOT_STARTED); + ut_ad(!is_referenced()); ut_ad(!id); + ut_ad(!rw_trx_hash); ut_ad(!mutex_is_owner()); ut_ad(!has_logged()); - ut_ad(!is_referenced()); ut_ad(!is_wsrep()); ut_ad(!lock.was_chosen_as_deadlock_victim); ut_ad(mod_tables.empty()); diff --git a/storage/innobase/lock/lock0lock.cc b/storage/innobase/lock/lock0lock.cc index 05fdf9b3efe..c0c322eb21e 100644 --- a/storage/innobase/lock/lock0lock.cc +++ b/storage/innobase/lock/lock0lock.cc @@ -4603,8 +4603,8 @@ func_exit: : 0; if (trx_t *impl_trx = impl_trx_id - ? trx_sys.find(current_trx(), impl_trx_id, false) - : 0) { + ? trx_sys.find(impl_trx_id, false) + : nullptr) { /* impl_trx could have been committed before we acquire its mutex, but not thereafter. */ @@ -4664,7 +4664,7 @@ func_exit: } for (lock = lock_sys_t::get_first(cell, id, heap_no); - lock != NULL; + lock; lock = lock_rec_get_next_const(heap_no, lock)) { ut_ad(!lock->trx->read_only || !lock->trx->is_autocommit_non_locking()); @@ -4865,24 +4865,6 @@ static void lock_rec_block_validate(const page_id_t page_id) } } -static my_bool lock_validate_table_locks(rw_trx_hash_element_t *element, void*) -{ - lock_sys.assert_locked(); - element->mutex.wr_lock(); - if (element->trx) - { - check_trx_state(element->trx); - for (const lock_t *lock= UT_LIST_GET_FIRST(element->trx->lock.trx_locks); - lock != NULL; - lock= UT_LIST_GET_NEXT(trx_locks, lock)) - if (lock->is_table()) - lock_table_queue_validate(lock->un_member.tab_lock.table); - } - element->mutex.wr_unlock(); - return 0; -} - - /** Validate the transactional locks. 
*/ static void lock_validate() { @@ -4890,7 +4872,14 @@ static void lock_validate() { LockMutexGuard g{SRW_LOCK_CALL}; /* Validate table locks */ - trx_sys.rw_trx_hash.iterate(lock_validate_table_locks); + trx_sys.rw_trx_hash.for_each([](const trx_t &trx) + { + check_trx_state(&trx); + for (const lock_t *lock= UT_LIST_GET_FIRST(trx.lock.trx_locks); + lock; lock= UT_LIST_GET_NEXT(trx_locks, lock)) + if (lock->is_table()) + lock_table_queue_validate(lock->un_member.tab_lock.table); + }); for (ulint i= 0; i < lock_sys.rec_hash.n_cells; i++) { @@ -5062,40 +5051,6 @@ lock_rec_convert_impl_to_expl_for_trx( #ifdef UNIV_DEBUG -struct lock_rec_other_trx_holds_expl_arg -{ - const ulint heap_no; - const hash_cell_t &cell; - const page_id_t id; - const trx_t &impl_trx; -}; - - -static my_bool lock_rec_other_trx_holds_expl_callback( - rw_trx_hash_element_t *element, - lock_rec_other_trx_holds_expl_arg *arg) -{ - element->mutex.wr_lock(); - if (element->trx) - { - element->trx->mutex_lock(); - ut_ad(element->trx->state != TRX_STATE_NOT_STARTED); - lock_t *expl_lock= element->trx->state == TRX_STATE_COMMITTED_IN_MEMORY - ? nullptr - : lock_rec_has_expl(LOCK_S | LOCK_REC_NOT_GAP, - arg->cell, arg->id, arg->heap_no, element->trx); - /* - An explicit lock is held by trx other than the trx holding the implicit - lock. - */ - ut_ad(!expl_lock || expl_lock->trx == &arg->impl_trx); - element->trx->mutex_unlock(); - } - element->mutex.wr_unlock(); - return 0; -} - - /** Checks if some transaction, other than given trx_id, has an explicit lock on the given rec. @@ -5104,29 +5059,37 @@ static my_bool lock_rec_other_trx_holds_expl_callback( subsequent locking read should not convert it to explicit. See also MDEV-11215. 
- @param caller_trx trx of current thread - @param[in] trx trx holding implicit lock on rec - @param[in] rec user record - @param[in] id page identifier + @param impl_trx trx holding implicit lock on rec + @param rec user record + @param id page identifier */ -static void lock_rec_other_trx_holds_expl(trx_t *caller_trx, trx_t *trx, +static void lock_rec_other_trx_holds_expl(trx_t *impl_trx, const rec_t *rec, const page_id_t id) { - if (trx) + if (impl_trx) { ut_ad(!page_rec_is_metadata(rec)); LockGuard g{lock_sys.rec_hash, id}; - ut_ad(trx->is_referenced()); - const trx_state_t state{trx->state}; + ut_ad(impl_trx->is_referenced()); + const trx_state_t state{impl_trx->state}; ut_ad(state != TRX_STATE_NOT_STARTED); if (state == TRX_STATE_COMMITTED_IN_MEMORY) /* The transaction was committed before we acquired LockGuard. */ return; - lock_rec_other_trx_holds_expl_arg arg= - { page_rec_get_heap_no(rec), g.cell(), id, *trx }; - trx_sys.rw_trx_hash.iterate(caller_trx, - lock_rec_other_trx_holds_expl_callback, &arg); + const ulint heap_no{page_rec_get_heap_no(rec)}; + + trx_sys.rw_trx_hash.for_each([&](trx_t &trx) + { + trx.mutex_lock(); + ut_ad(trx.state != TRX_STATE_NOT_STARTED); + if (lock_t *expl_lock= trx.state == TRX_STATE_COMMITTED_IN_MEMORY + ? 
nullptr + : lock_rec_has_expl(LOCK_S | LOCK_REC_NOT_GAP, + g.cell(), id, heap_no, &trx)) + ut_ad(expl_lock->trx == impl_trx); + trx.mutex_unlock(); + }); } } #endif /* UNIV_DEBUG */ @@ -5179,7 +5142,7 @@ lock_rec_convert_impl_to_expl( return true; } - trx = trx_sys.find(caller_trx, trx_id); + trx = trx_sys.find(trx_id); } else { ut_ad(!dict_index_is_online_ddl(index)); @@ -5190,7 +5153,7 @@ lock_rec_convert_impl_to_expl( return true; } - ut_d(lock_rec_other_trx_holds_expl(caller_trx, trx, rec, id)); + ut_d(lock_rec_other_trx_holds_expl(trx, rec, id)); } if (trx) { @@ -5823,47 +5786,6 @@ dberr_t lock_trx_handle_wait(trx_t *trx) return err; } -#ifdef UNIV_DEBUG -/** - Do an exhaustive check for any locks (table or rec) against the table. - - @param[in] table check if there are any locks held on records in this table - or on the table itself -*/ - -static my_bool lock_table_locks_lookup(rw_trx_hash_element_t *element, - const dict_table_t *table) -{ - lock_sys.assert_locked(); - element->mutex.wr_lock(); - if (element->trx) - { - element->trx->mutex_lock(); - check_trx_state(element->trx); - if (element->trx->state != TRX_STATE_COMMITTED_IN_MEMORY) - { - for (const lock_t *lock= UT_LIST_GET_FIRST(element->trx->lock.trx_locks); - lock != NULL; - lock= UT_LIST_GET_NEXT(trx_locks, lock)) - { - ut_ad(lock->trx == element->trx); - if (!lock->is_table()) - { - ut_ad(lock->index->online_status != ONLINE_INDEX_CREATION || - lock->index->is_primary()); - ut_ad(lock->index->table != table); - } - else - ut_ad(lock->un_member.tab_lock.table != table); - } - } - element->trx->mutex_unlock(); - } - element->mutex.wr_unlock(); - return 0; -} -#endif /* UNIV_DEBUG */ - /** Check if there are any locks on a table. @return true if table has either table or record locks. 
*/ TRANSACTIONAL_TARGET @@ -5887,16 +5809,35 @@ bool lock_table_has_locks(dict_table_t *table) len= UT_LIST_GET_LEN(table->locks); table->lock_mutex_unlock(); } - if (len) - return true; #ifdef UNIV_DEBUG + if (!len) { LockMutexGuard g{SRW_LOCK_CALL}; - trx_sys.rw_trx_hash.iterate(lock_table_locks_lookup, - const_cast<const dict_table_t*>(table)); + trx_sys.rw_trx_hash.for_each([table](trx_t &trx) + { + trx.mutex_lock(); + check_trx_state(&trx); + if (trx.state != TRX_STATE_COMMITTED_IN_MEMORY) + { + for (const lock_t *lock= UT_LIST_GET_FIRST(trx.lock.trx_locks); + lock; lock= UT_LIST_GET_NEXT(trx_locks, lock)) + { + ut_ad(lock->trx == &trx); + if (!lock->is_table()) + { + ut_ad(lock->index->online_status != ONLINE_INDEX_CREATION || + lock->index->is_primary()); + ut_ad(lock->index->table != table); + } + else + ut_ad(lock->un_member.tab_lock.table != table); + } + } + trx.mutex_unlock(); + }); } #endif /* UNIV_DEBUG */ - return false; + return len; } /*******************************************************************//** diff --git a/storage/innobase/read/read0read.cc b/storage/innobase/read/read0read.cc index 05d12fa7f21..af7726bc6b8 100644 --- a/storage/innobase/read/read0read.cc +++ b/storage/innobase/read/read0read.cc @@ -167,12 +167,10 @@ For details see: row_vers_old_has_index_entry() and row_purge_poss_sec() /** Creates a snapshot where exactly the transactions serialized before this point in time are seen in the view. - - @param[in,out] trx transaction */ -inline void ReadViewBase::snapshot(trx_t *trx) +inline void ReadViewBase::snapshot() { - trx_sys.snapshot_ids(trx, &m_ids, &m_low_limit_id, &m_low_limit_no); + trx_sys.snapshot_ids(&m_ids, &m_low_limit_id, &m_low_limit_no); std::sort(m_ids.begin(), m_ids.end()); m_up_limit_id= m_ids.empty() ? 
m_low_limit_id : m_ids.front(); ut_ad(m_up_limit_id <= m_low_limit_id); @@ -227,7 +225,7 @@ void ReadView::open(trx_t *trx) else { m_mutex.wr_lock(); - snapshot(trx); + snapshot(); m_open.store(true, std::memory_order_relaxed); m_mutex.wr_unlock(); } @@ -244,7 +242,7 @@ void ReadView::open(trx_t *trx) */ void trx_sys_t::clone_oldest_view(ReadViewBase *view) const { - view->snapshot(nullptr); + view->snapshot(); /* Find oldest view. */ trx_list.for_each([view](const trx_t &trx) { trx.read_view.append_to(view); diff --git a/storage/innobase/row/row0import.cc b/storage/innobase/row/row0import.cc index 1d53ede4e9c..0045c2094ed 100644 --- a/storage/innobase/row/row0import.cc +++ b/storage/innobase/row/row0import.cc @@ -3392,8 +3392,7 @@ static dberr_t handle_instant_metadata(dict_table_t *table, trx_sys.is_registered(DB_TRX_ID). */ if (rec_offs_n_fields(offsets) > ulint(index->n_fields) + !!index->table->instant && - !trx_sys.is_registered(current_trx(), - row_get_rec_trx_id(rec, index, offsets))) + !trx_sys.is_registered(row_get_rec_trx_id(rec, index, offsets))) goto inconsistent; for (unsigned i= index->n_core_fields; i < index->n_fields; i++) diff --git a/storage/innobase/row/row0row.cc b/storage/innobase/row/row0row.cc index 4cd1c3a4d26..61b84139ffb 100644 --- a/storage/innobase/row/row0row.cc +++ b/storage/innobase/row/row0row.cc @@ -456,8 +456,7 @@ row_build_low( times, and the cursor restore can happen multiple times for single insert or update statement. */ ut_a(!rec_offs_any_null_extern(rec, offsets) - || trx_sys.is_registered(current_trx(), - row_get_rec_trx_id(rec, index, + || trx_sys.is_registered(row_get_rec_trx_id(rec, index, offsets))); #endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */ diff --git a/storage/innobase/row/row0sel.cc b/storage/innobase/row/row0sel.cc index e01637d22cc..0fd9032c732 100644 --- a/storage/innobase/row/row0sel.cc +++ b/storage/innobase/row/row0sel.cc @@ -2,7 +2,7 @@ Copyright (c) 1997, 2017, Oracle and/or its affiliates. 
All Rights Reserved. Copyright (c) 2008, Google Inc. -Copyright (c) 2015, 2021, MariaDB Corporation. +Copyright (c) 2015, 2022, MariaDB Corporation. Portions of this file contain modifications contributed and copyrighted by Google, Inc. Those modifications are gratefully acknowledged and are described @@ -876,7 +876,7 @@ row_sel_build_committed_vers_for_mysql( rec_offs_size(*offsets)); } - row_vers_build_for_semi_consistent_read(prebuilt->trx, + row_vers_build_for_semi_consistent_read( rec, mtr, clust_index, offsets, offset_heap, prebuilt->old_vers_heap, old_vers, vrow); } @@ -5121,7 +5121,7 @@ wrong_offs: /* In delete-marked records, DB_TRX_ID must always refer to an existing undo log record. */ ut_ad(trx_id); - if (!trx_sys.is_registered(trx, trx_id)) { + if (!trx_sys.is_registered(trx_id)) { /* The clustered index record was delete-marked in a committed transaction. Ignore the record. */ diff --git a/storage/innobase/row/row0vers.cc b/storage/innobase/row/row0vers.cc index 695c6dba472..5e52fffd525 100644 --- a/storage/innobase/row/row0vers.cc +++ b/storage/innobase/row/row0vers.cc @@ -1,7 +1,7 @@ /***************************************************************************** Copyright (c) 1997, 2017, Oracle and/or its affiliates. All Rights Reserved. -Copyright (c) 2017, 2021, MariaDB Corporation. +Copyright (c) 2017, 2022, MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -134,8 +134,8 @@ row_vers_impl_x_locked_low( trx = caller_trx; trx->reference(); } else { - trx = trx_sys.find(caller_trx, trx_id); - if (trx == 0) { + trx = trx_sys.find(trx_id); + if (!trx) { /* The transaction that modified or inserted clust_rec is no longer active, or it is corrupt: no implicit lock on rec */ @@ -424,13 +424,13 @@ row_vers_impl_x_locked( a rollback we always undo the modifications to secondary index records before the clustered index record. 
*/ - trx = 0; + trx = nullptr; } else { trx = row_vers_impl_x_locked_low( caller_trx, clust_rec, clust_index, rec, index, offsets, &mtr); - ut_ad(trx == 0 || trx->is_referenced()); + ut_ad(!trx || trx->is_referenced()); } mtr_commit(&mtr); @@ -1217,7 +1217,6 @@ which should be seen by a semi-consistent read. */ void row_vers_build_for_semi_consistent_read( /*====================================*/ - trx_t* caller_trx,/*!<in/out: trx of current thread */ const rec_t* rec, /*!< in: record in a clustered index; the caller must have a latch on the page; this latch locks the top of the stack of versions @@ -1262,7 +1261,7 @@ row_vers_build_for_semi_consistent_read( rec_trx_id = version_trx_id; } - if (!trx_sys.is_registered(caller_trx, version_trx_id)) { + if (!trx_sys.is_registered(version_trx_id)) { committed_version_trx: /* We found a version that belongs to a committed transaction: return it. */ diff --git a/storage/innobase/trx/trx0purge.cc b/storage/innobase/trx/trx0purge.cc index d45f23ea1ea..cc88a4a03ad 100644 --- a/storage/innobase/trx/trx0purge.cc +++ b/storage/innobase/trx/trx0purge.cc @@ -227,7 +227,7 @@ void trx_purge_add_undo_to_history(const trx_t* trx, trx_undo_t*& undo, mtr_t* mtr) { DBUG_PRINT("trx", ("commit(" TRX_ID_FMT "," TRX_ID_FMT ")", - trx->id, trx_id_t{trx->rw_trx_hash_element->no})); + trx->id, trx->no)); ut_ad(undo == trx->rsegs.m_redo.undo); trx_rseg_t* rseg = trx->rsegs.m_redo.rseg; ut_ad(undo->rseg == rseg); @@ -317,15 +317,13 @@ trx_purge_add_undo_to_history(const trx_t* trx, trx_undo_t*& undo, mtr_t* mtr) + TRX_UNDO_HISTORY_NODE), mtr); mtr->write<8,mtr_t::MAYBE_NOP>(*undo_page, - undo_header + TRX_UNDO_TRX_NO, - trx->rw_trx_hash_element->no); + undo_header + TRX_UNDO_TRX_NO, trx->no); mtr->write<2,mtr_t::MAYBE_NOP>(*undo_page, undo_header + TRX_UNDO_NEEDS_PURGE, 1U); if (rseg->last_page_no == FIL_NULL) { rseg->last_page_no = undo->hdr_page_no; - rseg->set_last_commit(undo->hdr_offset, - trx->rw_trx_hash_element->no); + 
rseg->set_last_commit(undo->hdr_offset, trx->no); rseg->set_needs_purge(); } diff --git a/storage/innobase/trx/trx0roll.cc b/storage/innobase/trx/trx0roll.cc index ddad699ead4..ee8d558d180 100644 --- a/storage/innobase/trx/trx0roll.cc +++ b/storage/innobase/trx/trx0roll.cc @@ -43,6 +43,7 @@ Created 3/26/1996 Heikki Tuuri #include "trx0sys.h" #include "trx0trx.h" #include "trx0undo.h" +#include "log.h" #ifdef UNIV_PFS_THREAD mysql_pfs_key_t trx_rollback_clean_thread_key; @@ -620,76 +621,44 @@ trx_rollback_active( } -struct trx_roll_count_callback_arg -{ - uint32_t n_trx; - uint64_t n_rows; - trx_roll_count_callback_arg(): n_trx(0), n_rows(0) {} -}; - - -static my_bool trx_roll_count_callback(rw_trx_hash_element_t *element, - trx_roll_count_callback_arg *arg) -{ - element->mutex.wr_lock(); - if (trx_t *trx= element->trx) - { - if (trx->is_recovered && trx_state_eq(trx, TRX_STATE_ACTIVE)) - { - arg->n_trx++; - arg->n_rows+= trx->undo_no; - } - } - element->mutex.wr_unlock(); - return 0; -} - /** Report progress when rolling back a row of a recovered transaction. */ void trx_roll_report_progress() { - time_t now = time(NULL); - mysql_mutex_lock(&recv_sys.mutex); - bool report = recv_sys.report(now); - mysql_mutex_unlock(&recv_sys.mutex); - - if (report) { - trx_roll_count_callback_arg arg; - - /* Get number of recovered active transactions and number of - rows they modified. Numbers must be accurate, because only this - thread is allowed to touch recovered transactions. 
*/ - trx_sys.rw_trx_hash.iterate_no_dups( - trx_roll_count_callback, &arg); - - if (arg.n_rows > 0) { - service_manager_extend_timeout( - INNODB_EXTEND_TIMEOUT_INTERVAL, - "To roll back: " UINT32PF " transactions, " - UINT64PF " rows", arg.n_trx, arg.n_rows); - } + time_t now= time(nullptr); + mysql_mutex_lock(&recv_sys.mutex); + const bool report{recv_sys.report(now)}; + mysql_mutex_unlock(&recv_sys.mutex); - ib::info() << "To roll back: " << arg.n_trx - << " transactions, " << arg.n_rows << " rows"; + if (report) + { + size_t n_trx= 0; + uint64_t n_rows= 0; - } -} + /* Get number of recovered active transactions and number of + rows they modified. Numbers must be accurate, because only this + thread is allowed to touch recovered transactions. */ + trx_sys.rw_trx_hash.for_each([&n_trx, &n_rows](const trx_t &trx) + { + if (trx.is_recovered && trx_state_eq(&trx, TRX_STATE_ACTIVE)) + { + n_trx++; + n_rows+= trx.undo_no; + } + }); + if (n_rows) + { + service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL, + "To roll back: %zu transactions, " + UINT64PF " rows", n_trx, n_rows); + } -static my_bool trx_rollback_recovered_callback(rw_trx_hash_element_t *element, - std::vector<trx_t*> *trx_list) -{ - element->mutex.wr_lock(); - if (trx_t *trx= element->trx) - { - trx->mutex_lock(); - if (trx_state_eq(trx, TRX_STATE_ACTIVE) && trx->is_recovered) - trx_list->push_back(trx); - trx->mutex_unlock(); + sql_print_information("InnoDB: To roll back: %zu transactions, " + UINT64PF " rows", n_trx, n_rows); } - element->mutex.wr_unlock(); - return 0; } + /** Rollback any incomplete transactions which were encountered in crash recovery. @@ -717,8 +686,13 @@ void trx_rollback_recovered(bool all) other thread is allowed to modify or remove these transactions from rw_trx_hash. 
*/ - trx_sys.rw_trx_hash.iterate_no_dups(trx_rollback_recovered_callback, - &trx_list); + trx_sys.rw_trx_hash.for_each([&trx_list](trx_t &trx) + { + trx.mutex_lock(); + if (trx_state_eq(&trx, TRX_STATE_ACTIVE) && trx.is_recovered) + trx_list.push_back(&trx); + trx.mutex_unlock(); + }); while (!trx_list.empty()) { @@ -780,7 +754,7 @@ void trx_rollback_all_recovered(void*) { ut_ad(!srv_read_only_mode); - if (trx_sys.rw_trx_hash.size()) { + if (!trx_sys.rw_trx_hash.empty()) { ib::info() << "Starting in background the rollback of" " recovered transactions"; trx_rollback_recovered(true); diff --git a/storage/innobase/trx/trx0sys.cc b/storage/innobase/trx/trx0sys.cc index c4640ab4716..995835f3fb1 100644 --- a/storage/innobase/trx/trx0sys.cc +++ b/storage/innobase/trx/trx0sys.cc @@ -1,7 +1,7 @@ /***************************************************************************** Copyright (c) 1996, 2017, Oracle and/or its affiliates. All Rights Reserved. -Copyright (c) 2017, 2021, MariaDB Corporation. +Copyright (c) 2017, 2022, MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -40,6 +40,7 @@ Created 3/26/1996 Heikki Tuuri #include "log0log.h" #include "log0recv.h" #include "os0file.h" +#include "log.h" /** The transaction system */ trx_sys_t trx_sys; @@ -83,19 +84,13 @@ ReadViewBase::check_trx_id_sanity( uint trx_rseg_n_slots_debug = 0; #endif -/** Display the MySQL binlog offset info if it is present in the trx -system header. */ -void -trx_sys_print_mysql_binlog_offset() +/** Display the binlog offset info if it is present in the undo pages. 
*/ +void trx_sys_print_mysql_binlog_offset() { - if (!*trx_sys.recovered_binlog_filename) { - return; - } - - ib::info() << "Last binlog file '" - << trx_sys.recovered_binlog_filename - << "', position " - << trx_sys.recovered_binlog_offset; + if (*trx_sys.recovered_binlog_filename) + sql_print_information("InnoDB: Last binlog file '%s', position " UINT64PF, + trx_sys.recovered_binlog_filename, + trx_sys.recovered_binlog_offset); } /** Find an available rollback segment. @@ -197,13 +192,55 @@ trx_sysf_create( ut_a(rblock->page.id() == page_id_t(0, FSP_FIRST_RSEG_PAGE_NO)); } +inline void rw_trx_hash_t::destroy() +{ + for (ulint i= 0; i < n_cells_padded; i++) + { + if (trx_t *trx= array[i].first) + { + ut_ad(i % (ELEMENTS_PER_LATCH + 1)); + do + { + ut_d(validate_element(trx)); + ut_ad(trx_state_eq(trx, TRX_STATE_PREPARED) || + trx_state_eq(trx, TRX_STATE_PREPARED_RECOVERED) || + (trx_state_eq(trx, TRX_STATE_ACTIVE) && + (!srv_was_started || + srv_read_only_mode || + srv_force_recovery >= SRV_FORCE_NO_TRX_UNDO))); + trx_t *next= trx->rw_trx_hash; + trx_free_at_shutdown(trx); + trx= next; + } + while (trx); + array[i].first= nullptr; + } + } +} + +#ifndef NO_ELISION +TRANSACTIONAL_TARGET bool rw_trx_hash_t::empty(const page_hash_latch *lk) const +{ + if (!xbegin()) + return false; + + const hash_chain *b= reinterpret_cast<const hash_chain*>(lk); + const hash_chain *const end= b + (ELEMENTS_PER_LATCH + 1); + + while (!b->first && b < end) + b++; + + xend(); + return b >= end; +} +#endif + void trx_sys_t::create() { ut_ad(this == &trx_sys); ut_ad(!is_initialised()); m_initialised= true; trx_list.create(); - rw_trx_hash.init(); } uint32_t trx_sys_t::history_size() @@ -373,33 +410,31 @@ bool trx_sys_create_rsegs() } /** Close the transaction system on shutdown */ -void -trx_sys_t::close() +void trx_sys_t::close() { - ut_ad(srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS); - if (!is_initialised()) { - return; - } + ut_ad(srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS); + 
if (!is_initialised()) + return; - if (size_t size = view_count()) { - ib::error() << "All read views were not closed before" - " shutdown: " << size << " read views open"; - } + if (size_t size = view_count()) + { + sql_print_error("InnoDB: %zu read views were not closed before shutdown", + size); + ut_ad("read view leak" == 0); + } - rw_trx_hash.destroy(); + rw_trx_hash.destroy(); - /* There can't be any active transactions. */ + /* There can't be any active transactions. */ - for (ulint i = 0; i < array_elements(temp_rsegs); ++i) { - temp_rsegs[i].destroy(); - } - for (ulint i = 0; i < array_elements(rseg_array); ++i) { - rseg_array[i].destroy(); - } + for (size_t i= 0; i < array_elements(temp_rsegs); ++i) + temp_rsegs[i].destroy(); + for (size_t i= 0; i < array_elements(rseg_array); ++i) + rseg_array[i].destroy(); - ut_a(trx_list.empty()); - trx_list.close(); - m_initialised = false; + ut_a(trx_list.empty()); + trx_list.close(); + m_initialised= false; } /** @return total number of active (non-prepared) transactions */ diff --git a/storage/innobase/trx/trx0trx.cc b/storage/innobase/trx/trx0trx.cc index 3c3c4db2b3e..2131fc4628a 100644 --- a/storage/innobase/trx/trx0trx.cc +++ b/storage/innobase/trx/trx0trx.cc @@ -47,6 +47,7 @@ Created 3/26/1996 Heikki Tuuri #include "trx0xa.h" #include "ut0pool.h" #include "ut0vec.h" +#include "log.h" #include <set> #include <new> @@ -99,6 +100,8 @@ trx_init( { trx->state = TRX_STATE_NOT_STARTED; + trx->no = TRX_ID_MAX; + trx->is_recovered = false; trx->op_info = ""; @@ -162,16 +165,13 @@ struct TrxFactory { static void init(trx_t* trx) { /* Explicitly call the constructor of the already - allocated object. trx_t objects are allocated by - ut_zalloc_nokey() in Pool::Pool() which would not call - the constructors of the trx_t members. */ + allocated and zero-initialized memory. 
*/ new(&trx->mod_tables) trx_mod_tables_t(); new(&trx->lock.table_locks) lock_list(); new(&trx->read_view) ReadView(); - trx->rw_trx_hash_pins = 0; trx_init(trx); trx->dict_operation_lock_mode = false; @@ -293,7 +293,7 @@ typedef PoolManager<trx_pool_t, TrxPoolManagerLock > trx_pools_t; static trx_pools_t* trx_pools; /** Size of on trx_t pool in bytes. */ -static const ulint MAX_TRX_BLOCK_SIZE = 1024 * 1024 * 4; +static constexpr size_t MAX_TRX_BLOCK_SIZE = 1024 * 1024 * 4; /** Create the trx_t pool */ void @@ -338,7 +338,6 @@ trx_t *trx_create() /* We just got trx from pool, it should be non locking */ ut_ad(!trx->will_lock); - ut_ad(!trx->rw_trx_hash_pins); DBUG_LOG("trx", "Create: " << trx); @@ -382,8 +381,9 @@ void trx_t::free() dict_operation= false; trx_sys.deregister_trx(this); + ut_ad(no == TRX_ID_MAX); + ut_ad(state == TRX_STATE_NOT_STARTED); assert_freed(); - trx_sys.rw_trx_hash.put_pins(this); mysql_thd= nullptr; @@ -397,9 +397,11 @@ void trx_t::free() } MEM_NOACCESS(&n_ref, sizeof n_ref); - /* do not poison mutex */ - MEM_NOACCESS(&id, sizeof id); MEM_NOACCESS(&state, sizeof state); + MEM_NOACCESS(&id, sizeof id); + MEM_NOACCESS(&no, sizeof no); + MEM_NOACCESS(&rw_trx_hash, sizeof rw_trx_hash); + /* do not poison mutex */ MEM_NOACCESS(&is_recovered, sizeof is_recovered); #ifdef WITH_WSREP MEM_NOACCESS(&wsrep, sizeof wsrep); @@ -541,7 +543,6 @@ void trx_disconnect_prepared(trx_t *trx) trx->mysql_thd= NULL; /* todo/fixme: suggest to do it at innodb prepare */ trx->will_lock= false; - trx_sys.rw_trx_hash.put_pins(trx); } /****************************************************************//** @@ -650,8 +651,8 @@ static void trx_resurrect(trx_undo_t *undo, trx_rseg_t *rseg, Prepared transactions are left in the prepared state waiting for a commit or abort decision from MySQL */ - ib::info() << "Transaction " << undo->trx_id - << " was in the XA prepared state."; + sql_print_information("InnoDB: Transaction " TRX_ID_FMT + " was in the XA prepared state.", 
undo->trx_id); state= TRX_STATE_PREPARED; break; @@ -681,7 +682,6 @@ static void trx_resurrect(trx_undo_t *undo, trx_rseg_t *rseg, trx->dict_operation= undo->dict_operation; trx_sys.rw_trx_hash.insert(trx); - trx_sys.rw_trx_hash.put_pins(trx); trx_resurrect_table_locks(trx, undo); if (trx_state_eq(trx, TRX_STATE_ACTIVE)) *rows_to_undo+= trx->undo_no; @@ -707,7 +707,8 @@ dberr_t trx_lists_init_at_db_start() purge_sys.create(); if (dberr_t err = trx_rseg_array_init()) { - ib::info() << "Retry with innodb_force_recovery=5"; + sql_print_information("InnoDB: Retry with " + "innodb_force_recovery=5"); return err; } @@ -730,7 +731,7 @@ dberr_t trx_lists_init_at_db_start() for (undo = UT_LIST_GET_FIRST(rseg.undo_list); undo != NULL; undo = UT_LIST_GET_NEXT(undo_list, undo)) { - trx_t *trx = trx_sys.find(0, undo->trx_id, false); + trx_t *trx = trx_sys.find(undo->trx_id, false); if (!trx) { trx_resurrect(undo, &rseg, start_time, start_time_micro, &rows_to_undo); @@ -967,6 +968,15 @@ trx_start_low( ut_a(trx->error_state == DB_SUCCESS); } +inline void trx_sys_t::assign_new_trx_no(trx_t &trx) +{ + auto &lk= rw_trx_hash.lock_get(trx.id); + lk.lock(); + trx.no= get_new_trx_id_no_refresh(); + lk.unlock(); + refresh_rw_trx_hash_version(); +} + /** Set the serialisation number for a persistent committed transaction. @param[in,out] trx committed transaction with persistent changes */ static @@ -980,15 +990,14 @@ trx_serialise(trx_t* trx) mysql_mutex_lock(&purge_sys.pq_mutex); } - trx_sys.assign_new_trx_no(trx); + trx_sys.assign_new_trx_no(*trx); /* If the rollback segment is not empty then the new trx_t::no can't be less than any trx_t::no already in the rollback segment. User threads only produce events when a rollback segment is empty. 
*/ if (rseg->last_page_no == FIL_NULL) { - purge_sys.purge_queue.push(TrxUndoRsegs(trx->rw_trx_hash_element->no, - *rseg)); + purge_sys.purge_queue.push(TrxUndoRsegs(trx->no, *rseg)); mysql_mutex_unlock(&purge_sys.pq_mutex); } } @@ -1381,9 +1390,7 @@ void trx_t::commit_cleanup() dict_operation= false; DBUG_LOG("trx", "Commit in memory: " << this); - state= TRX_STATE_NOT_STARTED; mod_tables.clear(); - assert_freed(); trx_init(this); mutex.wr_unlock(); @@ -1902,64 +1909,6 @@ void trx_prepare_for_mysql(trx_t* trx) } -struct trx_recover_for_mysql_callback_arg -{ - XID *xid_list; - uint len; - uint count; -}; - - -static my_bool trx_recover_for_mysql_callback(rw_trx_hash_element_t *element, - trx_recover_for_mysql_callback_arg *arg) -{ - DBUG_ASSERT(arg->len > 0); - element->mutex.wr_lock(); - if (trx_t *trx= element->trx) - { - /* - The state of a read-write transaction can only change from ACTIVE to - PREPARED while we are holding the element->mutex. But since it is - executed at startup no state change should occur. - */ - if (trx_state_eq(trx, TRX_STATE_PREPARED)) - { - ut_ad(trx->is_recovered); - ut_ad(trx->id); - if (arg->count == 0) - ib::info() << "Starting recovery for XA transactions..."; - XID& xid= arg->xid_list[arg->count]; - if (arg->count++ < arg->len) - { - trx->state= TRX_STATE_PREPARED_RECOVERED; - ib::info() << "Transaction " << trx->id - << " in prepared state after recovery"; - ib::info() << "Transaction contains changes to " << trx->undo_no - << " rows"; - xid= trx->xid; - } - } - } - element->mutex.wr_unlock(); - /* Do not terminate upon reaching arg->len; count all transactions */ - return false; -} - - -static my_bool trx_recover_reset_callback(rw_trx_hash_element_t *element, - void*) -{ - element->mutex.wr_lock(); - if (trx_t *trx= element->trx) - { - if (trx_state_eq(trx, TRX_STATE_PREPARED_RECOVERED)) - trx->state= TRX_STATE_PREPARED; - } - element->mutex.wr_unlock(); - return false; -} - - /** Find prepared transaction objects for recovery. 
@@ -1971,80 +1920,92 @@ static my_bool trx_recover_reset_callback(rw_trx_hash_element_t *element, int trx_recover_for_mysql(XID *xid_list, uint len) { - trx_recover_for_mysql_callback_arg arg= { xid_list, len, 0 }; - ut_ad(xid_list); ut_ad(len); + uint count= 0; + /* Fill xid_list with PREPARED transactions. */ - trx_sys.rw_trx_hash.iterate_no_dups(trx_recover_for_mysql_callback, &arg); - if (arg.count) + trx_sys.rw_trx_hash.for_each([&](trx_t &trx) { - ib::info() << arg.count - << " transactions in prepared state after recovery"; + /* + The state of a read-write transaction can only change from ACTIVE to + PREPARED while we are holding the element->mutex. But since it is + executed at startup no state change should occur. + */ + if (trx_state_eq(&trx, TRX_STATE_PREPARED)) + { + ut_ad(trx.is_recovered); + ut_ad(trx.id); + if (count++ == 0) + sql_print_information("InnoDB: Starting recovery for XA" + " transactions..."); + if (count <= len) + { + trx.state= TRX_STATE_PREPARED_RECOVERED; + sql_print_information("InnoDB: Transaction " TRX_ID_FMT + " in prepared state after recovery", + trx.id); + sql_print_information("InnoDB: Transaction contains changes to " + IB_ID_FMT " rows", trx.undo_no); + xid_list[count - 1]= trx.xid; + } + } + }); + + if (count) + { + sql_print_information("InnoDB: %u transactions in prepared state" + " after recovery", count); /* After returning the full list, reset the state, because init_server_components() wants to recover the collection of transactions twice, by first calling tc_log->open() and then ha_recover() directly. 
*/ - if (arg.count <= len) - trx_sys.rw_trx_hash.iterate(trx_recover_reset_callback); + if (count <= len) + trx_sys.rw_trx_hash.for_each([](trx_t &trx) + { + if (trx.state == TRX_STATE_PREPARED_RECOVERED) + trx.state= TRX_STATE_PREPARED; + }); } - return int(std::min(arg.count, len)); + return int(std::min(count, len)); } -struct trx_get_trx_by_xid_callback_arg -{ - const XID *xid; - trx_t *trx; -}; - - -static my_bool trx_get_trx_by_xid_callback(rw_trx_hash_element_t *element, - trx_get_trx_by_xid_callback_arg *arg) -{ - my_bool found= 0; - element->mutex.wr_lock(); - if (trx_t *trx= element->trx) - { - trx->mutex_lock(); - if (trx->is_recovered && - (trx_state_eq(trx, TRX_STATE_PREPARED) || - trx_state_eq(trx, TRX_STATE_PREPARED_RECOVERED)) && - arg->xid->eq(&trx->xid)) - { -#ifdef WITH_WSREP - /* The commit of a prepared recovered Galera - transaction needs a valid trx->xid for - invoking trx_sys_update_wsrep_checkpoint(). */ - if (!wsrep_is_wsrep_xid(&trx->xid)) -#endif /* WITH_WSREP */ - /* Invalidate the XID, so that subsequent calls will not find it. */ - trx->xid.null(); - arg->trx= trx; - found= 1; - } - trx->mutex_unlock(); - } - element->mutex.wr_unlock(); - return found; -} - /** Look up an X/Open distributed transaction in XA PREPARE state. 
@param[in] xid X/Open XA transaction identifier @return transaction on match (the trx_t::xid will be invalidated); note that the trx may have been committed before the caller acquires trx_t::mutex @retval NULL if no match */ -trx_t* trx_get_trx_by_xid(const XID* xid) +trx_t *trx_get_trx_by_xid(const XID *xid) { - trx_get_trx_by_xid_callback_arg arg= { xid, 0 }; - + trx_t *trx_by_xid= nullptr; if (xid) - trx_sys.rw_trx_hash.iterate(trx_get_trx_by_xid_callback, &arg); - return arg.trx; -} + trx_sys.rw_trx_hash.for_each_until([xid, &trx_by_xid](trx_t &trx) + { + trx.mutex_lock(); + if (trx.is_recovered && + (trx_state_eq(&trx, TRX_STATE_PREPARED) || + trx_state_eq(&trx, TRX_STATE_PREPARED_RECOVERED)) && + xid->eq(&trx.xid)) + { +#ifdef WITH_WSREP + /* The commit of a prepared recovered Galera + transaction needs a valid trx.xid for + invoking trx_sys_update_wsrep_checkpoint(). */ + if (!wsrep_is_wsrep_xid(&trx.xid)) +#endif /* WITH_WSREP */ + /* Invalidate the XID, so that subsequent calls will not find it. */ + trx.xid.null(); + trx_by_xid= &trx; + } + trx.mutex_unlock(); + return trx_by_xid; + }); + return trx_by_xid; +} /*************************************************************//** Starts the transaction if it is not yet started. */ |