diff options
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r-- | sql/rpl_parallel.cc | 274 |
1 files changed, 256 insertions, 18 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 78d81d973e4..1e07ca582da 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -150,6 +150,9 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id, wait_for_commit *wfc= &rgi->commit_orderer; int err; + if (rgi->get_finish_event_group_called()) + return; + thd->get_stmt_da()->set_overwrite_status(true); /* Remove any left-over registration to wait for a prior commit to @@ -279,6 +282,8 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id, */ thd->get_stmt_da()->reset_diagnostics_area(); wfc->wakeup_subsequent_commits(rgi->worker_error); + rgi->did_mark_start_commit= false; + rgi->set_finish_event_group_called(true); } @@ -583,6 +588,7 @@ rpl_pause_for_ftwrl(THD *thd) uint32 i; rpl_parallel_thread_pool *pool= &global_rpl_thread_pool; int err; + Dynamic_array<Master_info*> mi_arr(4, 4); // array of replication source mi:s DBUG_ENTER("rpl_pause_for_ftwrl"); /* @@ -634,9 +640,36 @@ rpl_pause_for_ftwrl(THD *thd) mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry); }; --e->need_sub_id_signal; + thd->EXIT_COND(&old_stage); if (err) break; + /* + Notify any source any domain waiting-for-master Start-Alter to give way. 
+ */ + Master_info *mi= e->rli->mi; + bool found= false; + for (uint i= 0; i < mi_arr.elements() && !found; i++) + found= mi_arr.at(i) == mi; + if (!found) + { + mi_arr.append(mi); + start_alter_info *info=NULL; + mysql_mutex_lock(&mi->start_alter_list_lock); + List_iterator<start_alter_info> info_iterator(mi->start_alter_list); + while ((info= info_iterator++)) + { + mysql_mutex_lock(&mi->start_alter_lock); + + DBUG_ASSERT(info->state == start_alter_state::REGISTERED); + + info->state= start_alter_state::ROLLBACK_ALTER; + info->direct_commit_alter= true; + mysql_cond_broadcast(&info->start_alter_cond); + mysql_mutex_unlock(&mi->start_alter_lock); + } + mysql_mutex_unlock(&mi->start_alter_list_lock); + } } if (err) @@ -828,11 +861,9 @@ do_retry: { mysql_mutex_lock(&entry->LOCK_parallel_entry); register_wait_for_prior_event_group_commit(rgi, entry); - if (!(entry->stop_on_error_sub_id == (uint64) ULONGLONG_MAX || -#ifndef DBUG_OFF - (DBUG_EVALUATE_IF("simulate_mdev_12746", 1, 0)) || -#endif - rgi->gtid_sub_id < entry->stop_on_error_sub_id)) + if (entry->stop_on_error_sub_id != (uint64) ULONGLONG_MAX && + !DBUG_IF("simulate_mdev_12746") && + rgi->gtid_sub_id >= entry->stop_on_error_sub_id) { /* A failure of a preceding "parent" transaction may not be @@ -1712,6 +1743,9 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, { mysql_mutex_lock(&pool->threads[i]->LOCK_rpl_thread); pool->threads[i]->delay_start= false; + pool->threads[i]->current_start_alter_id= 0; + pool->threads[i]->current_start_alter_domain_id= 0; + pool->threads[i]->reserved_start_alter_thread= false; mysql_cond_signal(&pool->threads[i]->COND_rpl_thread); while (!pool->threads[i]->running) mysql_cond_wait(&pool->threads[i]->COND_rpl_thread, @@ -1992,7 +2026,19 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev, rgi->retry_start_offset= rli->future_event_relay_log_pos-event_size; rgi->retry_event_count= 0; rgi->killed_for_retry= rpl_group_info::RETRY_KILL_NONE; 
+ /* rgi is transaction specific so we need to move this value to rgi */ + rgi->reserved_start_alter_thread= reserved_start_alter_thread; + rgi->rpt= this; + rgi->direct_commit_alter= false; + rgi->finish_event_group_called= false; + DBUG_ASSERT(!rgi->sa_info); + /* + We can remove the reserved_start_alter_thread flag. + If we get more concurrent alter handle_split_alter will + automatically set this flag again. + */ + reserved_start_alter_thread= false; return rgi; } @@ -2063,6 +2109,10 @@ rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco) loc_gco_list= gco; } +void rpl_group_info::finish_start_alter_event_group() +{ + finish_event_group(rpt, this->gtid_sub_id, this->parallel_entry, this); +} rpl_parallel_thread::rpl_parallel_thread() : channel_name_length(0), last_error_number(0), last_error_timestamp(0), @@ -2072,7 +2122,7 @@ rpl_parallel_thread::rpl_parallel_thread() rpl_parallel_thread_pool::rpl_parallel_thread_pool() - : threads(0), free_list(0), count(0), inited(false), busy(false), + : threads(0), free_list(0), count(0), inited(false),current_start_alters(0), busy(false), pfs_bkp{0, false, false, NULL} { } @@ -2206,6 +2256,129 @@ rpl_parallel_thread_pool::copy_pool_for_pfs(Relay_log_info *rli) } } +/* + START ALTER , COMMIT ALTER / ROLLBACK ALTER scheduling + + Steps:- + 1. (For Gtid_log_event SA). Get the worker thread which is either + e->rpl_threads[i] is NULL means worker from pool has not been assigned yet + e->rpl_threads[i]->current_owner != &e->rpl_threads[i] + Thread has been released, or about to //same as choose_thread logic + !e->rpl_threads[i]->current_start_alter_id is 0 , safe to schedule. + We don't want to schedule on worker which already have been scheduled SA + but CA/RA has not been scheduled yet. current_start_alter_id will indicate + this. If we don't do this we will get deadlock. + 2. (For Gtid_log_event SA) + call choose_thread_internal so that e->rpl_threads[idx] is not null + update the current_start_alter_id + 3. 
(For Gtid_log_event SA) + update local e->pending_start_alters(local) variable and + pool->current_start_alters(global) + We need 2 status variables (global and local) because we can have + slave_domain_parallel_threads != pool->threads. + 4. (For CA/RA Gtid_log_event) + Update e->pending_start_alters and pool->current_start_alters + while holding mutex lock on pool (if SA is not assigned to + reserved thread) + + + @returns + true Worker allocated (choose_thread_internal called) + false Worker not allocated (choose_thread_internal not called) +*/ +static bool handle_split_alter(rpl_parallel_entry *e, + Gtid_log_event *gtid_ev, uint32 *idx, + //choose_thread_internal specific + bool *did_enter_cond, rpl_group_info* rgi, + PSI_stage_info *old_stage) +{ + uint16 flags_extra= gtid_ev->flags_extra; + bool thread_allocated= false; + //Step 1 + if (flags_extra & Gtid_log_event::FL_START_ALTER_E1 || + //This will arrange finding threads for CA/RA as well + //as concurrent DDL + e->pending_start_alters) + { + /* + j is needed for round robin scheduling, we will start with rpl_thread_idx + go till rpl_thread_max and then start with 0 to rpl_thread_idx + */ + int j= e->rpl_thread_idx; + for(uint i= 0; i < e->rpl_thread_max; i++) + { + if (!e->rpl_threads[j] || e->rpl_threads[j]->current_owner + != &e->rpl_threads[j] || !e->rpl_threads[j]->current_start_alter_id) + { + //This condition will hit at least one time no matter what happens + *idx= j; + DBUG_PRINT("info", ("Start alter id %d", j)); + goto idx_found; + } + j++; + j= j % e->rpl_thread_max; + } + //We did not find an idx + DBUG_ASSERT(0); + return false; +idx_found: + e->rpl_thread_idx= *idx; + e->choose_thread_internal(*idx, did_enter_cond, rgi, old_stage); + thread_allocated= true; + if (flags_extra & Gtid_log_event::FL_START_ALTER_E1) + { + mysql_mutex_assert_owner(&e->rpl_threads[*idx]->LOCK_rpl_thread); + e->rpl_threads[e->rpl_thread_idx]->current_start_alter_id= gtid_ev->seq_no; 
e->rpl_threads[e->rpl_thread_idx]->current_start_alter_domain_id= + gtid_ev->domain_id; + /* + We are locking LOCK_rpl_thread_pool because we are going to update + current_start_alters + */ + mysql_mutex_lock(&global_rpl_thread_pool.LOCK_rpl_thread_pool); + if (e->pending_start_alters < e->rpl_thread_max - 1 && + global_rpl_thread_pool.current_start_alters + < global_rpl_thread_pool.count - 1) + { + e->pending_start_alters++; + global_rpl_thread_pool.current_start_alters++; + } + else + { + e->rpl_threads[*idx]->reserved_start_alter_thread= true; + e->rpl_threads[*idx]->current_start_alter_id= 0; + e->rpl_threads[*idx]->current_start_alter_domain_id= 0; + } + mysql_mutex_unlock(&global_rpl_thread_pool.LOCK_rpl_thread_pool); + } + } + if(flags_extra & (Gtid_log_event::FL_COMMIT_ALTER_E1 | + Gtid_log_event::FL_ROLLBACK_ALTER_E1 )) + { + //Free the corresponding rpt current_start_alter_id + for(uint i= 0; i < e->rpl_thread_max; i++) + { + if(e->rpl_threads[i] && + e->rpl_threads[i]->current_start_alter_id == gtid_ev->sa_seq_no && + e->rpl_threads[i]->current_start_alter_domain_id == gtid_ev->domain_id) + { + mysql_mutex_lock(&global_rpl_thread_pool.LOCK_rpl_thread_pool); + e->rpl_threads[i]->current_start_alter_id= 0; + e->rpl_threads[i]->current_start_alter_domain_id= 0; + global_rpl_thread_pool.current_start_alters--; + e->pending_start_alters--; + DBUG_PRINT("info", ("Commit/Rollback alter id %d", i)); + mysql_mutex_unlock(&global_rpl_thread_pool.LOCK_rpl_thread_pool); + break; + } + } + } + + return thread_allocated; + +} + + /* Obtain a worker thread that we can queue an event to. 
@@ -2239,25 +2412,32 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond, Gtid_log_event *gtid_ev) { uint32 idx; - Relay_log_info *rli= rgi->rli; - rpl_parallel_thread *thr; idx= rpl_thread_idx; if (gtid_ev) { + if (++idx >= rpl_thread_max) + idx= 0; + //rpl_thread_idx will be updated handle_split_alter + if (handle_split_alter(this, gtid_ev, &idx, did_enter_cond, rgi, old_stage)) + return rpl_threads[idx]; if (gtid_ev->flags2 & (Gtid_log_event::FL_COMPLETED_XA | Gtid_log_event::FL_PREPARED_XA)) + { idx= my_hash_sort(&my_charset_bin, gtid_ev->xid.key(), gtid_ev->xid.key_length()) % rpl_thread_max; - else - { - ++idx; - if (idx >= rpl_thread_max) - idx= 0; } rpl_thread_idx= idx; } - thr= rpl_threads[idx]; + return choose_thread_internal(idx, did_enter_cond, rgi, old_stage); +} + +rpl_parallel_thread * rpl_parallel_entry::choose_thread_internal(uint idx, + bool *did_enter_cond, rpl_group_info *rgi, + PSI_stage_info *old_stage) +{ + rpl_parallel_thread* thr= rpl_threads[idx]; + Relay_log_info *rli= rgi->rli; if (thr) { *did_enter_cond= false; @@ -2375,12 +2555,13 @@ rpl_parallel::~rpl_parallel() rpl_parallel_entry * -rpl_parallel::find(uint32 domain_id) +rpl_parallel::find(uint32 domain_id, Relay_log_info *rli) { struct rpl_parallel_entry *e; if (!(e= (rpl_parallel_entry *)my_hash_search(&domain_hash, - (const uchar *)&domain_id, 0))) + (const uchar *)&domain_id, + sizeof(domain_id)))) { /* Allocate a new, empty one. 
*/ ulong count= opt_slave_domain_parallel_threads; @@ -2400,6 +2581,8 @@ rpl_parallel::find(uint32 domain_id) e->domain_id= domain_id; e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX; e->pause_sub_id= (uint64)ULONGLONG_MAX; + e->pending_start_alters= 0; + e->rli= rli; mysql_mutex_init(key_LOCK_parallel_entry, &e->LOCK_parallel_entry, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_parallel_entry, &e->COND_parallel_entry, NULL); @@ -2412,7 +2595,11 @@ rpl_parallel::find(uint32 domain_id) } } else + { + DBUG_ASSERT(rli == e->rli); + e->force_abort= false; + } return e; } @@ -2429,7 +2616,7 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli) struct rpl_parallel_entry *e; rpl_parallel_thread *rpt; uint32 i, j; - + Master_info *mi= rli->mi; /* First signal all workers that they must force quit; no more events will be queued to complete any partial event groups executed. @@ -2482,6 +2669,45 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli) #endif global_rpl_thread_pool.copy_pool_for_pfs(rli); + /* + Shutdown SA alter threads through marking their execution states + to force their early post-SA execution exit. Upon that the affected SA threads + change their state to COMPLETED, notify any waiting CA|RA and this thread. 
+ */ + start_alter_info *info=NULL; + mysql_mutex_lock(&mi->start_alter_list_lock); + List_iterator<start_alter_info> info_iterator(mi->start_alter_list); + mi->is_shutdown= true; // a sign to stop in concurrently coming in new SA:s + while ((info= info_iterator++)) + { + mysql_mutex_lock(&mi->start_alter_lock); + if (info->state == start_alter_state::COMPLETED) + { + mysql_mutex_unlock(&mi->start_alter_lock); + continue; + } + info->state= start_alter_state::ROLLBACK_ALTER; + // Any possible CA that is (will be) waiting will complete this ALTER instance + info->direct_commit_alter= true; + mysql_cond_broadcast(&info->start_alter_cond); // notify SA:s + mysql_mutex_unlock(&mi->start_alter_lock); + + // await SA in the COMPLETED state + mysql_mutex_lock(&mi->start_alter_lock); + while(info->state == start_alter_state::ROLLBACK_ALTER) + mysql_cond_wait(&info->start_alter_cond, &mi->start_alter_lock); + + DBUG_ASSERT(info->state == start_alter_state::COMPLETED); + + mysql_mutex_unlock(&mi->start_alter_lock); + } + mysql_mutex_unlock(&mi->start_alter_list_lock); + + DBUG_EXECUTE_IF("rpl_slave_stop_CA_before_binlog", + { + debug_sync_set_action(thd, STRING_WITH_LEN("now signal proceed_CA_1")); + }); + for (i= 0; i < domain_hash.records; ++i) { e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); @@ -2496,6 +2722,17 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli) } } } + // Now that all threads are docked, remained alter states are safe to destroy + mysql_mutex_lock(&mi->start_alter_list_lock); + info_iterator.rewind(); + while ((info= info_iterator++)) + { + info_iterator.remove(); + mysql_cond_destroy(&info->start_alter_cond); + my_free(info); + } + mi->is_shutdown= false; + mysql_mutex_unlock(&mi->start_alter_list_lock); } @@ -2840,7 +3077,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO || rli->mi->parallel_mode <= SLAVE_PARALLEL_MINIMAL ? 
0 : gtid_ev->domain_id); - if (!(e= find(domain_id))) + if (!(e= find(domain_id, rli))) { my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME)); delete ev; @@ -2852,6 +3089,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, gtid.server_id= gtid_ev->server_id; gtid.seq_no= gtid_ev->seq_no; rli->update_relay_log_state(&gtid, 1); + serial_rgi->gtid_ev_flags_extra= gtid_ev->flags_extra; if (process_gtid_for_restart_pos(rli, &gtid)) { /* |