summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--storage/innobase/include/read0types.h39
-rw-r--r--storage/innobase/include/trx0sys.h59
-rw-r--r--storage/innobase/include/trx0types.h2
-rw-r--r--storage/innobase/read/read0read.cc14
-rw-r--r--storage/innobase/trx/trx0trx.cc5
5 files changed, 89 insertions, 30 deletions
diff --git a/storage/innobase/include/read0types.h b/storage/innobase/include/read0types.h
index f647826e6ee..3a06190b61d 100644
--- a/storage/innobase/include/read0types.h
+++ b/storage/innobase/include/read0types.h
@@ -89,15 +89,46 @@ public:
/**
Copy state from another view.
+ This method is used to find min(m_low_limit_no), min(m_low_limit_id) and
+ all transaction ids below min(m_low_limit_id). These values effectively
+ form oldest view.
+
@param other view to copy from
*/
void copy(const ReadView &other)
{
ut_ad(&other != this);
- m_ids= other.m_ids;
- m_up_limit_id= other.m_up_limit_id;
- m_low_limit_no= other.m_low_limit_no;
- m_low_limit_id= other.m_low_limit_id;
+ if (m_low_limit_no > other.m_low_limit_no)
+ m_low_limit_no= other.m_low_limit_no;
+ if (m_low_limit_id > other.m_low_limit_id)
+ m_low_limit_id= other.m_low_limit_id;
+
+ trx_ids_t::iterator dst= m_ids.begin();
+ for (trx_ids_t::const_iterator src= other.m_ids.begin();
+ src != other.m_ids.end(); src++)
+ {
+ if (*src >= m_low_limit_id)
+ break;
+loop:
+ if (dst == m_ids.end())
+ {
+ m_ids.push_back(*src);
+ dst= m_ids.end();
+ continue;
+ }
+ if (*dst < *src)
+ {
+ dst++;
+ goto loop;
+ }
+ else if (*dst > *src)
+ dst= m_ids.insert(dst, *src) + 1;
+ }
+ m_ids.erase(std::lower_bound(dst, m_ids.end(), m_low_limit_id),
+ m_ids.end());
+
+ m_up_limit_id= m_ids.empty() ? m_low_limit_id : m_ids.front();
+ ut_ad(m_up_limit_id <= m_low_limit_id);
}
diff --git a/storage/innobase/include/trx0sys.h b/storage/innobase/include/trx0sys.h
index c8013ba4e71..78462a4f178 100644
--- a/storage/innobase/include/trx0sys.h
+++ b/storage/innobase/include/trx0sys.h
@@ -797,17 +797,23 @@ public:
/** The transaction system central memory data structure. */
-struct trx_sys_t {
-private:
+class trx_sys_t
+{
/**
The smallest number not yet assigned as a transaction id or transaction
number. Accessed and updated with atomic operations.
*/
-
MY_ALIGNED(CACHE_LINE_SIZE) trx_id_t m_max_trx_id;
- /** Solves race condition between register_rw() and snapshot_ids(). */
+ /**
+ Solves race conditions between register_rw() and snapshot_ids() as well as
+ race condition between assign_new_trx_no() and snapshot_ids().
+
+ @sa register_rw()
+ @sa assign_new_trx_no()
+ @sa snapshot_ids()
+ */
MY_ALIGNED(CACHE_LINE_SIZE) trx_id_t m_rw_trx_hash_version;
@@ -895,7 +901,7 @@ public:
next call to trx_sys.get_new_trx_id()
*/
- trx_id_t get_max_trx_id(void)
+ trx_id_t get_max_trx_id()
{
return static_cast<trx_id_t>
(my_atomic_load64_explicit(reinterpret_cast<int64*>(&m_max_trx_id),
@@ -917,13 +923,45 @@ public:
/**
+ Allocates and assigns new transaction serialisation number.
+
+ There's a gap between m_max_trx_id increment and transaction serialisation
+ number becoming visible through rw_trx_hash. While we're in this gap
+ concurrent thread may come and do MVCC snapshot without seeing allocated
+ but not yet assigned serialisation number. Then at some point purge thread
+ may clone this view. As a result it won't see newly allocated serialisation
+ number and may remove "unnecessary" history data of this transaction from
+ rollback segments.
+
+ m_rw_trx_hash_version is intended to solve this problem. MVCC snapshot has
+ to wait until m_max_trx_id == m_rw_trx_hash_version, which effectively
+ means that all transaction serialisation numbers up to m_max_trx_id are
+ available through rw_trx_hash.
+
+ We rely on refresh_rw_trx_hash_version() to issue RELEASE memory barrier so
+ that m_rw_trx_hash_version increment happens after
+ trx->rw_trx_hash_element->no becomes visible through rw_trx_hash.
+
+ @param trx transaction
+ */
+ void assign_new_trx_no(trx_t *trx)
+ {
+ trx->no= get_new_trx_id_no_refresh();
+ my_atomic_store64_explicit(reinterpret_cast<int64*>
+ (&trx->rw_trx_hash_element->no),
+ trx->no, MY_MEMORY_ORDER_RELAXED);
+ refresh_rw_trx_hash_version();
+ }
+
+
+ /**
Takes MVCC snapshot.
To reduce malloc probability we reserve rw_trx_hash.size() + 32 elements
in ids.
For details about get_rw_trx_hash_version() != get_max_trx_id() spin
- @sa register_rw().
+ @sa register_rw() and @sa assign_new_trx_no().
We rely on get_rw_trx_hash_version() to issue ACQUIRE memory barrier so
that loading of m_rw_trx_hash_version happens before accessing rw_trx_hash.
@@ -941,6 +979,7 @@ public:
void snapshot_ids(trx_t *caller_trx, trx_ids_t *ids, trx_id_t *max_trx_id,
trx_id_t *min_trx_no)
{
+ ut_ad(!mutex_own(&mutex));
snapshot_ids_arg arg(ids);
while ((arg.m_id= get_rw_trx_hash_version()) != get_max_trx_id())
@@ -952,7 +991,6 @@ public:
rw_trx_hash.iterate(caller_trx,
reinterpret_cast<my_hash_walk_action>(copy_one_id),
&arg);
- std::sort(ids->begin(), ids->end());
*max_trx_id= arg.m_id;
*min_trx_no= arg.m_no;
@@ -1146,11 +1184,12 @@ private:
/**
Allocates new transaction id without refreshing rw_trx_hash version.
- This method is extracted for exclusive use by register_rw() where
- transaction must be inserted into rw_trx_hash between new transaction id
- allocation and rw_trx_hash version refresh.
+ This method is extracted for exclusive use by register_rw() and
+ assign_new_trx_no() where new id must be allocated atomically with
+ payload of these methods from MVCC snapshot point of view.
@sa get_new_trx_id()
+ @sa assign_new_trx_no()
@return new transaction id
*/
diff --git a/storage/innobase/include/trx0types.h b/storage/innobase/include/trx0types.h
index b893c05781a..29139172c92 100644
--- a/storage/innobase/include/trx0types.h
+++ b/storage/innobase/include/trx0types.h
@@ -112,8 +112,6 @@ enum trx_dict_op_t {
struct trx_t;
/** The locks and state of an active transaction */
struct trx_lock_t;
-/** Transaction system */
-struct trx_sys_t;
/** Signal */
struct trx_sig_t;
/** Rollback segment */
diff --git a/storage/innobase/read/read0read.cc b/storage/innobase/read/read0read.cc
index 607ea027e96..08cdbeef3b1 100644
--- a/storage/innobase/read/read0read.cc
+++ b/storage/innobase/read/read0read.cc
@@ -182,8 +182,8 @@ will mark their views as closed but not actually free their views.
*/
void ReadView::snapshot(trx_t *trx)
{
- ut_ad(!mutex_own(&trx_sys.mutex));
trx_sys.snapshot_ids(trx, &m_ids, &m_low_limit_id, &m_low_limit_no);
+ std::sort(m_ids.begin(), m_ids.end());
m_up_limit_id= m_ids.empty() ? m_low_limit_id : m_ids.front();
ut_ad(m_up_limit_id <= m_low_limit_id);
}
@@ -219,7 +219,7 @@ void ReadView::open(trx_t *trx)
protection. But we're cutting corners to achieve great scalability.
There're at least two types of concurrent threads interested in this
- value: purge coordinator thread (see MVCC::clone_oldest_view()) and
+ value: purge coordinator thread (see trx_sys_t::clone_oldest_view()) and
InnoDB monitor thread (see lock_trx_print_wait_and_mvcc_state()).
What bad things can happen because we allow this race?
@@ -319,10 +319,7 @@ void ReadView::close()
*/
void trx_sys_t::clone_oldest_view()
{
- const ReadView *oldest_view= &purge_sys->view;
-
purge_sys->view.snapshot(0);
-
mutex_enter(&mutex);
/* Find oldest view. */
for (const ReadView *v= UT_LIST_GET_FIRST(m_views); v;
@@ -333,11 +330,8 @@ void trx_sys_t::clone_oldest_view()
while ((state= v->get_state()) == READ_VIEW_STATE_SNAPSHOT)
ut_delay(1);
- if (state == READ_VIEW_STATE_OPEN &&
- v->low_limit_no() < oldest_view->low_limit_no())
- oldest_view= v;
+ if (state == READ_VIEW_STATE_OPEN)
+ purge_sys->view.copy(*v);
}
- if (oldest_view != &purge_sys->view)
- purge_sys->view.copy(*oldest_view);
mutex_exit(&mutex);
}
diff --git a/storage/innobase/trx/trx0trx.cc b/storage/innobase/trx/trx0trx.cc
index b6e2b0067e8..fe50a471e74 100644
--- a/storage/innobase/trx/trx0trx.cc
+++ b/storage/innobase/trx/trx0trx.cc
@@ -1222,10 +1222,7 @@ trx_serialise(trx_t* trx)
mutex_enter(&purge_sys->pq_mutex);
}
- trx->no = trx_sys.get_new_trx_id();
- my_atomic_store64_explicit(reinterpret_cast<int64*>
- (&trx->rw_trx_hash_element->no),
- trx->no, MY_MEMORY_ORDER_RELAXED);
+ trx_sys.assign_new_trx_no(trx);
/* If the rollback segment is not empty then the
new trx_t::no can't be less than any trx_t::no