Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r--    sql/rpl_parallel.cc    50
1 file changed, 31 insertions(+), 19 deletions(-)
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 35901cb5263..65d5a06a76a 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -693,12 +693,14 @@ convert_kill_to_deadlock_error(rpl_group_info *rgi)
static int
is_group_ending(Log_event *ev, Log_event_type event_type)
{
- if (event_type == XID_EVENT)
+ if (event_type == XID_EVENT || event_type == XA_PREPARE_LOG_EVENT)
return 1;
if (event_type == QUERY_EVENT) // COMMIT/ROLLBACK are never compressed
{
Query_log_event *qev = (Query_log_event *)ev;
- if (qev->is_commit())
+ if (qev->is_commit() ||
+ !strncmp(qev->query, STRING_WITH_LEN("XA COMMIT")) ||
+ !strncmp(qev->query, STRING_WITH_LEN("XA ROLLBACK")))
return 1;
if (qev->is_rollback())
return 2;
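The hunk above extends the group-ending test to XA: an XA_PREPARE_LOG_EVENT now ends a group outright, and for QUERY_EVENT the query text is prefix-matched against "XA COMMIT" / "XA ROLLBACK". As a rough standalone sketch (not server code; is_xa_group_end() is a hypothetical helper, and the local STRING_WITH_LEN mirrors the server macro), the strncmp() calls resolve like this:

#include <cstring>

/* Local stand-in for the server's STRING_WITH_LEN macro: expands to the
   literal and its length, so strncmp() compares only the query prefix. */
#define STRING_WITH_LEN(X) (X), (sizeof(X) - 1)

/* Hypothetical helper, illustration only: returns 1 when the query text
   marks the end of an event group the way is_group_ending() treats
   XA COMMIT / XA ROLLBACK, 0 otherwise. */
static int is_xa_group_end(const char *query)
{
  return !strncmp(query, STRING_WITH_LEN("XA COMMIT")) ||
         !strncmp(query, STRING_WITH_LEN("XA ROLLBACK"));
}

The prefix match deliberately accepts the xid text that follows the keyword in the binlogged statement.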
@@ -814,7 +816,7 @@ do_retry:
else
{
/*
- A failure of a preceeding "parent" transaction may not be
+ A failure of a preceding "parent" transaction may not be
seen by the current one through its own worker_error.
Such induced error gets set by ourselves now.
*/
@@ -1578,7 +1580,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
to allocate, and will not be left with a half-functional thread pool.
*/
if (new_count &&
- !my_multi_malloc(MYF(MY_WME|MY_ZEROFILL),
+ !my_multi_malloc(PSI_INSTRUMENT_ME, MYF(MY_WME|MY_ZEROFILL),
&new_list, new_count*sizeof(*new_list),
&rpt_array, new_count*sizeof(*rpt_array),
NULL))
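The my_multi_malloc(), my_malloc() and my_hash_init() calls in this and the following hunks gain a leading performance-schema memory key; PSI_INSTRUMENT_ME is the placeholder key for call sites without a dedicated instrument. As a rough standalone analogy only (not the server API; tagged_malloc() and the key enum are made up for illustration), per-key accounting of allocations looks like this:

#include <atomic>
#include <cstddef>
#include <cstdlib>

/* Illustrative only: a "memory key" identifies the allocation site so
   usage can be aggregated per site, which is what the PSI key argument
   enables in performance_schema. */
enum alloc_key { KEY_UNINSTRUMENTED= 0, KEY_RPL_PARALLEL= 1, KEY_COUNT= 2 };

static std::atomic<size_t> bytes_by_key[KEY_COUNT];

static void *tagged_malloc(enum alloc_key key, size_t size)
{
  void *p= std::malloc(size);
  if (p)
    bytes_by_key[key]+= size;     /* attribute the allocation to its key */
  return p;
}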
@@ -1810,7 +1812,7 @@ rpl_parallel_thread::get_qev_common(Log_event *ev, ulonglong event_size)
mysql_mutex_assert_owner(&LOCK_rpl_thread);
if ((qev= qev_free_list))
qev_free_list= qev->next;
- else if(!(qev= (queued_event *)my_malloc(sizeof(*qev), MYF(0))))
+ else if(!(qev= (queued_event *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*qev), MYF(0))))
{
my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*qev));
return NULL;
@@ -1973,7 +1975,7 @@ rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev,
mysql_mutex_assert_owner(&LOCK_rpl_thread);
if ((gco= gco_free_list))
gco_free_list= gco->next_gco;
- else if(!(gco= (group_commit_orderer *)my_malloc(sizeof(*gco), MYF(0))))
+ else if(!(gco= (group_commit_orderer *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*gco), MYF(0))))
{
my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*gco));
return NULL;
@@ -2123,23 +2125,34 @@ rpl_parallel_thread_pool::release_thread(rpl_parallel_thread *rpt)
and the LOCK_rpl_thread must be released with THD::EXIT_COND() instead
of mysql_mutex_unlock.
- If the flag `reuse' is set, the last worker thread will be returned again,
+  When `gtid_ev' is not NULL, the last worker thread will be returned again,
if it is still available. Otherwise a new worker thread is allocated.
+
+  A worker for an XA transaction is determined through xid hashing, which
+  ensures the XA-complete is scheduled to the same-xid XA-prepare worker.
*/
rpl_parallel_thread *
rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
- PSI_stage_info *old_stage, bool reuse)
+ PSI_stage_info *old_stage,
+ Gtid_log_event *gtid_ev)
{
uint32 idx;
Relay_log_info *rli= rgi->rli;
rpl_parallel_thread *thr;
idx= rpl_thread_idx;
- if (!reuse)
+ if (gtid_ev)
{
- ++idx;
- if (idx >= rpl_thread_max)
- idx= 0;
+ if (gtid_ev->flags2 &
+ (Gtid_log_event::FL_COMPLETED_XA | Gtid_log_event::FL_PREPARED_XA))
+ idx= my_hash_sort(&my_charset_bin, gtid_ev->xid.key(),
+ gtid_ev->xid.key_length()) % rpl_thread_max;
+ else
+ {
+ ++idx;
+ if (idx >= rpl_thread_max)
+ idx= 0;
+ }
rpl_thread_idx= idx;
}
thr= rpl_threads[idx];
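The comment and code above tie XA scheduling to the xid: both the XA-prepare and the later XA-complete hash the same xid bytes, so they land on the same worker slot. A minimal standalone sketch of that idea, assuming a stable worker count (pick_xa_worker() and the FNV-1a hash are stand-ins for the server's my_hash_sort(&my_charset_bin, ...) over xid.key()):

#include <cstddef>
#include <cstdint>

/* Simple FNV-1a over the serialized xid; any stable hash works here. */
static uint32_t fnv1a(const unsigned char *key, size_t len)
{
  uint32_t h= 2166136261u;
  for (size_t i= 0; i < len; i++)
  {
    h^= key[i];
    h*= 16777619u;
  }
  return h;
}

/* Same xid bytes => same index, as long as n_workers (rpl_thread_max in
   the server) is unchanged between XA PREPARE and XA COMMIT/ROLLBACK. */
static uint32_t pick_xa_worker(const unsigned char *xid_key, size_t xid_len,
                               uint32_t n_workers)
{
  return fnv1a(xid_key, xid_len) % n_workers;
}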
@@ -2234,7 +2247,7 @@ free_rpl_parallel_entry(void *element)
rpl_parallel::rpl_parallel() :
current(NULL), sql_thread_stopping(false)
{
- my_hash_init(&domain_hash, &my_charset_bin, 32,
+ my_hash_init(PSI_INSTRUMENT_ME, &domain_hash, &my_charset_bin, 32,
offsetof(rpl_parallel_entry, domain_id), sizeof(uint32),
NULL, free_rpl_parallel_entry, HASH_UNIQUE);
}
@@ -2268,7 +2281,7 @@ rpl_parallel::find(uint32 domain_id)
if (count == 0 || count > opt_slave_parallel_threads)
count= opt_slave_parallel_threads;
rpl_parallel_thread **p;
- if (!my_multi_malloc(MYF(MY_WME|MY_ZEROFILL),
+ if (!my_multi_malloc(PSI_INSTRUMENT_ME, MYF(MY_WME|MY_ZEROFILL),
&e, sizeof(*e),
&p, count*sizeof(*p),
NULL))
@@ -2697,7 +2710,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
else
{
DBUG_ASSERT(rli->gtid_skip_flag == GTID_SKIP_TRANSACTION);
- if (typ == XID_EVENT ||
+ if (typ == XID_EVENT || typ == XA_PREPARE_LOG_EVENT ||
(typ == QUERY_EVENT && // COMMIT/ROLLBACK are never compressed
(((Query_log_event *)ev)->is_commit() ||
((Query_log_event *)ev)->is_rollback())))
@@ -2708,10 +2721,11 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
}
}
+ Gtid_log_event *gtid_ev= NULL;
if (typ == GTID_EVENT)
{
rpl_gtid gtid;
- Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
+ gtid_ev= static_cast<Gtid_log_event *>(ev);
uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ||
rli->mi->parallel_mode <= SLAVE_PARALLEL_MINIMAL ?
0 : gtid_ev->domain_id);
@@ -2750,8 +2764,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
instead re-use a thread that we queued for previously.
*/
cur_thread=
- e->choose_thread(serial_rgi, &did_enter_cond, &old_stage,
- typ != GTID_EVENT);
+ e->choose_thread(serial_rgi, &did_enter_cond, &old_stage, gtid_ev);
if (!cur_thread)
{
/* This means we were killed. The error is already signalled. */
@@ -2769,7 +2782,6 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
if (typ == GTID_EVENT)
{
- Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
bool new_gco;
enum_slave_parallel_mode mode= rli->mi->parallel_mode;
uchar gtid_flags= gtid_ev->flags2;