diff options
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r-- | sql/rpl_parallel.cc | 50 |
1 files changed, 31 insertions, 19 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 35901cb5263..65d5a06a76a 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -693,12 +693,14 @@ convert_kill_to_deadlock_error(rpl_group_info *rgi) static int is_group_ending(Log_event *ev, Log_event_type event_type) { - if (event_type == XID_EVENT) + if (event_type == XID_EVENT || event_type == XA_PREPARE_LOG_EVENT) return 1; if (event_type == QUERY_EVENT) // COMMIT/ROLLBACK are never compressed { Query_log_event *qev = (Query_log_event *)ev; - if (qev->is_commit()) + if (qev->is_commit() || + !strncmp(qev->query, STRING_WITH_LEN("XA COMMIT")) || + !strncmp(qev->query, STRING_WITH_LEN("XA ROLLBACK"))) return 1; if (qev->is_rollback()) return 2; @@ -814,7 +816,7 @@ do_retry: else { /* - A failure of a preceeding "parent" transaction may not be + A failure of a preceding "parent" transaction may not be seen by the current one through its own worker_error. Such induced error gets set by ourselves now. */ @@ -1578,7 +1580,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, to allocate, and will not be left with a half-functional thread pool. */ if (new_count && - !my_multi_malloc(MYF(MY_WME|MY_ZEROFILL), + !my_multi_malloc(PSI_INSTRUMENT_ME, MYF(MY_WME|MY_ZEROFILL), &new_list, new_count*sizeof(*new_list), &rpt_array, new_count*sizeof(*rpt_array), NULL)) @@ -1810,7 +1812,7 @@ rpl_parallel_thread::get_qev_common(Log_event *ev, ulonglong event_size) mysql_mutex_assert_owner(&LOCK_rpl_thread); if ((qev= qev_free_list)) qev_free_list= qev->next; - else if(!(qev= (queued_event *)my_malloc(sizeof(*qev), MYF(0)))) + else if(!(qev= (queued_event *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*qev), MYF(0)))) { my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*qev)); return NULL; @@ -1973,7 +1975,7 @@ rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev, mysql_mutex_assert_owner(&LOCK_rpl_thread); if ((gco= gco_free_list)) gco_free_list= gco->next_gco; - else if(!(gco= (group_commit_orderer *)my_malloc(sizeof(*gco), MYF(0)))) + else if(!(gco= (group_commit_orderer *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*gco), MYF(0)))) { my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*gco)); return NULL; @@ -2123,23 +2125,34 @@ rpl_parallel_thread_pool::release_thread(rpl_parallel_thread *rpt) and the LOCK_rpl_thread must be released with THD::EXIT_COND() instead of mysql_mutex_unlock. - If the flag `reuse' is set, the last worker thread will be returned again, + When `gtid_ev' is not NULL the last worker thread will be returned again, if it is still available. Otherwise a new worker thread is allocated. + + A worker for XA transaction is determined through xid hashing which + ensure for a XA-complete to be scheduled to the same-xid XA-prepare worker. */ rpl_parallel_thread * rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond, - PSI_stage_info *old_stage, bool reuse) + PSI_stage_info *old_stage, + Gtid_log_event *gtid_ev) { uint32 idx; Relay_log_info *rli= rgi->rli; rpl_parallel_thread *thr; idx= rpl_thread_idx; - if (!reuse) + if (gtid_ev) { - ++idx; - if (idx >= rpl_thread_max) - idx= 0; + if (gtid_ev->flags2 & + (Gtid_log_event::FL_COMPLETED_XA | Gtid_log_event::FL_PREPARED_XA)) + idx= my_hash_sort(&my_charset_bin, gtid_ev->xid.key(), + gtid_ev->xid.key_length()) % rpl_thread_max; + else + { + ++idx; + if (idx >= rpl_thread_max) + idx= 0; + } rpl_thread_idx= idx; } thr= rpl_threads[idx]; @@ -2234,7 +2247,7 @@ free_rpl_parallel_entry(void *element) rpl_parallel::rpl_parallel() : current(NULL), sql_thread_stopping(false) { - my_hash_init(&domain_hash, &my_charset_bin, 32, + my_hash_init(PSI_INSTRUMENT_ME, &domain_hash, &my_charset_bin, 32, offsetof(rpl_parallel_entry, domain_id), sizeof(uint32), NULL, free_rpl_parallel_entry, HASH_UNIQUE); } @@ -2268,7 +2281,7 @@ rpl_parallel::find(uint32 domain_id) if (count == 0 || count > opt_slave_parallel_threads) count= opt_slave_parallel_threads; rpl_parallel_thread **p; - if (!my_multi_malloc(MYF(MY_WME|MY_ZEROFILL), + if (!my_multi_malloc(PSI_INSTRUMENT_ME, MYF(MY_WME|MY_ZEROFILL), &e, sizeof(*e), &p, count*sizeof(*p), NULL)) @@ -2697,7 +2710,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, else { DBUG_ASSERT(rli->gtid_skip_flag == GTID_SKIP_TRANSACTION); - if (typ == XID_EVENT || + if (typ == XID_EVENT || typ == XA_PREPARE_LOG_EVENT || (typ == QUERY_EVENT && // COMMIT/ROLLBACK are never compressed (((Query_log_event *)ev)->is_commit() || ((Query_log_event *)ev)->is_rollback()))) @@ -2708,10 +2721,11 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, } } + Gtid_log_event *gtid_ev= NULL; if (typ == GTID_EVENT) { rpl_gtid gtid; - Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); + gtid_ev= static_cast<Gtid_log_event *>(ev); uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO || rli->mi->parallel_mode <= SLAVE_PARALLEL_MINIMAL ? 0 : gtid_ev->domain_id); @@ -2750,8 +2764,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, instead re-use a thread that we queued for previously. */ cur_thread= - e->choose_thread(serial_rgi, &did_enter_cond, &old_stage, - typ != GTID_EVENT); + e->choose_thread(serial_rgi, &did_enter_cond, &old_stage, gtid_ev); if (!cur_thread) { /* This means we were killed. The error is already signalled. */ @@ -2769,7 +2782,6 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, if (typ == GTID_EVENT) { - Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); bool new_gco; enum_slave_parallel_mode mode= rli->mi->parallel_mode; uchar gtid_flags= gtid_ev->flags2; |