diff options
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r-- | sql/rpl_parallel.cc | 179 |
1 files changed, 169 insertions, 10 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 49ec08a9cea..a6a659cf950 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -1412,7 +1412,10 @@ handle_rpl_parallel_thread(void *arg) if (end_of_group) { in_event_group= false; - finish_event_group(rpt, event_gtid_sub_id, entry, rgi); + if (!rgi->get_finish_event_group_called()) + finish_event_group(rpt, event_gtid_sub_id, entry, rgi); + else + rgi->set_finish_event_group_called(false); rpt->loc_free_rgi(rgi); thd->rgi_slave= group_rgi= rgi= NULL; skip_event_group= false; @@ -1690,6 +1693,9 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, { mysql_mutex_lock(&pool->threads[i]->LOCK_rpl_thread); pool->threads[i]->delay_start= false; + pool->threads[i]->current_start_alter_id= 0; + pool->threads[i]->current_start_alter_domain_id= 0; + pool->threads[i]->reserved_start_alter_thread= false; mysql_cond_signal(&pool->threads[i]->COND_rpl_thread); while (!pool->threads[i]->running) mysql_cond_wait(&pool->threads[i]->COND_rpl_thread, @@ -1969,6 +1975,15 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev, rgi->retry_start_offset= rli->future_event_relay_log_pos-event_size; rgi->retry_event_count= 0; rgi->killed_for_retry= rpl_group_info::RETRY_KILL_NONE; + /* rgi is transaction specific so we need to move this value to rgi */ + rgi->reserved_start_alter_thread= reserved_start_alter_thread; + rgi->rpt= this; + /* + We can remove the reserved_start_alter_thread flag. + If we get more concurrent alter handle_split_alter will + automatically set this flag again. 
+ */ + reserved_start_alter_thread= false; return rgi; } @@ -2033,6 +2048,10 @@ rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco) loc_gco_list= gco; } +void rpl_group_info::finish_start_alter_event_group() +{ + finish_event_group(rpt, this->gtid_sub_id, this->parallel_entry, this); +} rpl_parallel_thread::rpl_parallel_thread() : channel_name_length(0), last_error_number(0), last_error_timestamp(0), @@ -2042,7 +2061,7 @@ rpl_parallel_thread::rpl_parallel_thread() rpl_parallel_thread_pool::rpl_parallel_thread_pool() - : threads(0), free_list(0), count(0), inited(false), busy(false), + : threads(0), free_list(0), count(0), inited(false),current_start_alters(0), busy(false), pfs_bkp{0, false, NULL} { } @@ -2175,6 +2194,129 @@ rpl_parallel_thread_pool::copy_pool_for_pfs(Relay_log_info *rli) } } +/* + START ALTER , COMMIT ALTER / ROLLBACK ALTER scheduling + + Steps:- + 1. (For Gtid_log_event SA). Get the worker thread which is either + e->rpl_threads[i] is NULL means worker from poll has not been assigned yet + e->rpl_threads[i]->current_owner != &e->rpl_threads[i] + Thread has been released, or about to //same as choose_thread logic + !e->rpl_threads[i]->current_start_alter_id is 0 , safe to schedule. + We dont want to schedule on worker which already have been scheduled SA + but CA/RA has not been scheduled yet. current_start_alter_id will indicate + this. If we dont do this we will get deadlock. + 2. (For Gtid_log_event SA) + call choose_thread_internal so that e->rpl_threads[idx] is not null + update the current_start_alter_id + 3. (For Gtid_log_event SA) + update local e->pending_start_alters(local) variable and + pool->current_start_alters(global) + We need 2 status variable (global and local) because we can have + slave_domain_parallel_threads != pool->threads. + 4. 
(For CA/RA Gtid_log_event) + Update e->pending_start_alters and pool->current_start_alters + while holding mutex lock on pool (if SA is not assigned to + reserved thread) + + + @returns + true Worker allocated (choose_thread_internal called) + false Worker not allocated (choose_thread_internal not called) +*/ +static bool handle_split_alter(rpl_parallel_entry *e, + Gtid_log_event *gtid_ev, uint32 *idx, + //choose_thread_internal specific + bool *did_enter_cond, rpl_group_info* rgi, + PSI_stage_info *old_stage) +{ + uint16 flags_extra= gtid_ev->flags_extra; + bool thread_allocated= false; + //Step 1 + if (flags_extra & Log_event::FL_START_ALTER_E1 || + //This will arrange finding threads for CA/RA as well + //as concurrent DDL + e->pending_start_alters) + { + /* + j is needed for round robin scheduling, we will start with rpl_thread_idx + go till rpl_thread_max and then start with 0 to rpl_thread_idx + */ + int j= e->rpl_thread_idx; + for(uint i= 0; i < e->rpl_thread_max; i++) + { + if (!e->rpl_threads[j] || e->rpl_threads[j]->current_owner + != &e->rpl_threads[j] || !e->rpl_threads[j]->current_start_alter_id) + { + //This condition will hit atleast one time no matter what happens + *idx= j; + DBUG_PRINT("info", ("Start alter id %d", j)); + goto idx_found; + } + j++; + j= j % e->rpl_thread_max; + } + //We did not find and idx + DBUG_ASSERT(0); + return false; +idx_found: + e->rpl_thread_idx= *idx; + e->choose_thread_internal(*idx, did_enter_cond, rgi, old_stage); + thread_allocated= true; + if (flags_extra & Log_event::FL_START_ALTER_E1) + { + mysql_mutex_assert_owner(&e->rpl_threads[*idx]->LOCK_rpl_thread); + e->rpl_threads[e->rpl_thread_idx]->current_start_alter_id= gtid_ev->seq_no; + e->rpl_threads[e->rpl_thread_idx]->current_start_alter_domain_id= + gtid_ev->domain_id; + /* + We are locking LOCK_rpl_thread_pool becuase we are going to update + current_start_alters + */ + mysql_mutex_lock(&global_rpl_thread_pool.LOCK_rpl_thread_pool); + if 
(e->pending_start_alters < e->rpl_thread_max - 1 && + global_rpl_thread_pool.current_start_alters + < global_rpl_thread_pool.count - 1) + { + e->pending_start_alters++; + global_rpl_thread_pool.current_start_alters++; + } + else + { + e->rpl_threads[*idx]->reserved_start_alter_thread= true; + e->rpl_threads[*idx]->current_start_alter_id= 0; + e->rpl_threads[*idx]->current_start_alter_domain_id= 0; + } + mysql_mutex_unlock(&global_rpl_thread_pool.LOCK_rpl_thread_pool); + } + } + if(flags_extra & (Log_event::FL_COMMIT_ALTER_E1 | + Log_event::FL_ROLLBACK_ALTER_E1 )) + { + //Free the corrosponding rpt current_start_alter_id + for(uint i= 0; i < e->rpl_thread_max; i++) + { + if(e->rpl_threads[i] && + e->rpl_threads[i]->current_start_alter_id == gtid_ev->sa_seq_no && + e->rpl_threads[i]->current_start_alter_domain_id == gtid_ev->domain_id) + { + mysql_mutex_lock(&global_rpl_thread_pool.LOCK_rpl_thread_pool); + e->rpl_threads[i]->current_start_alter_id= 0; + e->rpl_threads[i]->current_start_alter_domain_id= 0; + global_rpl_thread_pool.current_start_alters--; + e->pending_start_alters--; + DBUG_PRINT("info", ("Commit/Rollback alter id %d", i)); + mysql_mutex_unlock(&global_rpl_thread_pool.LOCK_rpl_thread_pool); + break; + } + } + } + + return thread_allocated; + +} + + /* Obtain a worker thread that we can queue an event to. 
@@ -2208,25 +2350,32 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond, Gtid_log_event *gtid_ev) { uint32 idx; - Relay_log_info *rli= rgi->rli; - rpl_parallel_thread *thr; idx= rpl_thread_idx; if (gtid_ev) { + if (++idx >= rpl_thread_max) + idx= 0; + //rpl_thread_idx will be updated handle_split_alter + if (handle_split_alter(this, gtid_ev, &idx, did_enter_cond, rgi, old_stage)) + return rpl_threads[idx]; if (gtid_ev->flags2 & (Gtid_log_event::FL_COMPLETED_XA | Gtid_log_event::FL_PREPARED_XA)) + { idx= my_hash_sort(&my_charset_bin, gtid_ev->xid.key(), gtid_ev->xid.key_length()) % rpl_thread_max; - else - { - ++idx; - if (idx >= rpl_thread_max) - idx= 0; } rpl_thread_idx= idx; } - thr= rpl_threads[idx]; + return choose_thread_internal(idx, did_enter_cond, rgi, old_stage); +} + +rpl_parallel_thread * rpl_parallel_entry::choose_thread_internal(uint idx, + bool *did_enter_cond, rpl_group_info *rgi, + PSI_stage_info *old_stage) +{ + rpl_parallel_thread* thr= rpl_threads[idx]; + Relay_log_info *rli= rgi->rli; if (thr) { *did_enter_cond= false; @@ -2366,6 +2515,7 @@ rpl_parallel::find(uint32 domain_id) e->domain_id= domain_id; e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX; e->pause_sub_id= (uint64)ULONGLONG_MAX; + e->pending_start_alters= 0; if (my_hash_insert(&domain_hash, (uchar *)e)) { my_free(e); @@ -2451,6 +2601,14 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli) { if ((rpt= e->rpl_threads[j])) { + /* + Dont wait for SA workers , But wait for CA/RA workers + If CA/RA is executed that means corresponding SA is also executed + And remaning SA will never recieve CA/RA so we have to manualy send it + */ + if (rpt->thd->rgi_slave && + (rpt->thd->rgi_slave->gtid_ev_flags_extra & Log_event::FL_START_ALTER_E1)) + continue; mysql_mutex_lock(&rpt->LOCK_rpl_thread); while (rpt->current_owner == &e->rpl_threads[j]) mysql_cond_wait(&rpt->COND_rpl_thread_stop, &rpt->LOCK_rpl_thread); @@ -2814,6 +2972,7 @@ 
rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, gtid.server_id= gtid_ev->server_id; gtid.seq_no= gtid_ev->seq_no; rli->update_relay_log_state(&gtid, 1); + serial_rgi->gtid_ev_flags_extra= gtid_ev->flags_extra; if (process_gtid_for_restart_pos(rli, &gtid)) { /* |