summaryrefslogtreecommitdiff
path: root/sql/rpl_parallel.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r--sql/rpl_parallel.cc179
1 files changed, 169 insertions, 10 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 49ec08a9cea..a6a659cf950 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -1412,7 +1412,10 @@ handle_rpl_parallel_thread(void *arg)
if (end_of_group)
{
in_event_group= false;
- finish_event_group(rpt, event_gtid_sub_id, entry, rgi);
+ if (!rgi->get_finish_event_group_called())
+ finish_event_group(rpt, event_gtid_sub_id, entry, rgi);
+ else
+ rgi->set_finish_event_group_called(false);
rpt->loc_free_rgi(rgi);
thd->rgi_slave= group_rgi= rgi= NULL;
skip_event_group= false;
@@ -1690,6 +1693,9 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
{
mysql_mutex_lock(&pool->threads[i]->LOCK_rpl_thread);
pool->threads[i]->delay_start= false;
+ pool->threads[i]->current_start_alter_id= 0;
+ pool->threads[i]->current_start_alter_domain_id= 0;
+ pool->threads[i]->reserved_start_alter_thread= false;
mysql_cond_signal(&pool->threads[i]->COND_rpl_thread);
while (!pool->threads[i]->running)
mysql_cond_wait(&pool->threads[i]->COND_rpl_thread,
@@ -1969,6 +1975,15 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
rgi->retry_start_offset= rli->future_event_relay_log_pos-event_size;
rgi->retry_event_count= 0;
rgi->killed_for_retry= rpl_group_info::RETRY_KILL_NONE;
+ /* rgi is transaction specific so we need to move this value to rgi */
+ rgi->reserved_start_alter_thread= reserved_start_alter_thread;
+ rgi->rpt= this;
+ /*
+ We can remove the reserved_start_alter_thread flag.
+ If we get more concurrent alter handle_split_alter will
+ automatically set this flag again.
+ */
+ reserved_start_alter_thread= false;
return rgi;
}
@@ -2033,6 +2048,10 @@ rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco)
loc_gco_list= gco;
}
+void rpl_group_info::finish_start_alter_event_group()
+{
+ finish_event_group(rpt, this->gtid_sub_id, this->parallel_entry, this);
+}
rpl_parallel_thread::rpl_parallel_thread()
: channel_name_length(0), last_error_number(0), last_error_timestamp(0),
@@ -2042,7 +2061,7 @@ rpl_parallel_thread::rpl_parallel_thread()
rpl_parallel_thread_pool::rpl_parallel_thread_pool()
- : threads(0), free_list(0), count(0), inited(false), busy(false),
+ : threads(0), free_list(0), count(0), inited(false),current_start_alters(0), busy(false),
pfs_bkp{0, false, NULL}
{
}
@@ -2175,6 +2194,129 @@ rpl_parallel_thread_pool::copy_pool_for_pfs(Relay_log_info *rli)
}
}
+/*
+ START ALTER , COMMIT ALTER / ROLLBACK ALTER scheduling
+
+ Steps:-
+ 1. (For Gtid_log_event SA). Get the worker thread which is either
+ e->rpl_threads[i] is NULL means worker from poll has not been assigned yet
+ e->rpl_threads[i]->current_owner != &e->rpl_threads[i]
+ Thread has been released, or about to //same as choose_thread logic
+ !e->rpl_threads[i]->current_start_alter_id is 0 , safe to schedule.
+ We dont want to schedule on worker which already have been scheduled SA
+ but CA/RA has not been scheduled yet. current_start_alter_id will indicate
+ this. If we dont do this we will get deadlock.
+ 2. (For Gtid_log_event SA)
+ call choose_thread_internal so that e->rpl_threads[idx] is not null
+ update the current_start_alter_id
+ 3. (For Gtid_log_event SA)
+ update local e->pending_start_alters(local) variable and
+ pool->current_start_alters(global)
+ We need 2 status variable (global and local) because we can have
+ slave_domain_parallel_threads != pool->threads.
+ 4. (For CA/RA Gtid_log_event)
+ Update e->pending_start_alters and pool->current_start_alters
+ while holding mutex lock on pool (if SA is not assigned to
+ reserved thread)
+
+
+ @returns
+ true Worker allocated (choose_thread_internal called)
+ false Worker not allocated (choose_thread_internal not called)
+*/
+static bool handle_split_alter(rpl_parallel_entry *e,
+ Gtid_log_event *gtid_ev, uint32 *idx,
+ //choose_thread_internal specific
+ bool *did_enter_cond, rpl_group_info* rgi,
+ PSI_stage_info *old_stage)
+{
+ uint16 flags_extra= gtid_ev->flags_extra;
+ bool thread_allocated= false;
+ //Step 1
+ if (flags_extra & Log_event::FL_START_ALTER_E1 ||
+ //This will arrange finding threads for CA/RA as well
+ //as concurrent DDL
+ e->pending_start_alters)
+ {
+ /*
+ j is needed for round robin scheduling, we will start with rpl_thread_idx
+ go till rpl_thread_max and then start with 0 to rpl_thread_idx
+ */
+ int j= e->rpl_thread_idx;
+ for(uint i= 0; i < e->rpl_thread_max; i++)
+ {
+ if (!e->rpl_threads[j] || e->rpl_threads[j]->current_owner
+ != &e->rpl_threads[j] || !e->rpl_threads[j]->current_start_alter_id)
+ {
+ //This condition will hit atleast one time no matter what happens
+ *idx= j;
+ DBUG_PRINT("info", ("Start alter id %d", j));
+ goto idx_found;
+ }
+ j++;
+ j= j % e->rpl_thread_max;
+ }
+ //We did not find and idx
+ DBUG_ASSERT(0);
+ return false;
+idx_found:
+ e->rpl_thread_idx= *idx;
+ e->choose_thread_internal(*idx, did_enter_cond, rgi, old_stage);
+ thread_allocated= true;
+ if (flags_extra & Log_event::FL_START_ALTER_E1)
+ {
+ mysql_mutex_assert_owner(&e->rpl_threads[*idx]->LOCK_rpl_thread);
+ e->rpl_threads[e->rpl_thread_idx]->current_start_alter_id= gtid_ev->seq_no;
+ e->rpl_threads[e->rpl_thread_idx]->current_start_alter_domain_id=
+ gtid_ev->domain_id;
+ /*
+ We are locking LOCK_rpl_thread_pool becuase we are going to update
+ current_start_alters
+ */
+ mysql_mutex_lock(&global_rpl_thread_pool.LOCK_rpl_thread_pool);
+ if (e->pending_start_alters < e->rpl_thread_max - 1 &&
+ global_rpl_thread_pool.current_start_alters
+ < global_rpl_thread_pool.count - 1)
+ {
+ e->pending_start_alters++;
+ global_rpl_thread_pool.current_start_alters++;
+ }
+ else
+ {
+ e->rpl_threads[*idx]->reserved_start_alter_thread= true;
+ e->rpl_threads[*idx]->current_start_alter_id= 0;
+ e->rpl_threads[*idx]->current_start_alter_domain_id= 0;
+ }
+ mysql_mutex_unlock(&global_rpl_thread_pool.LOCK_rpl_thread_pool);
+ }
+ }
+ if(flags_extra & (Log_event::FL_COMMIT_ALTER_E1 |
+ Log_event::FL_ROLLBACK_ALTER_E1 ))
+ {
+ //Free the corrosponding rpt current_start_alter_id
+ for(uint i= 0; i < e->rpl_thread_max; i++)
+ {
+ if(e->rpl_threads[i] &&
+ e->rpl_threads[i]->current_start_alter_id == gtid_ev->sa_seq_no &&
+ e->rpl_threads[i]->current_start_alter_domain_id == gtid_ev->domain_id)
+ {
+ mysql_mutex_lock(&global_rpl_thread_pool.LOCK_rpl_thread_pool);
+ e->rpl_threads[i]->current_start_alter_id= 0;
+ e->rpl_threads[i]->current_start_alter_domain_id= 0;
+ global_rpl_thread_pool.current_start_alters--;
+ e->pending_start_alters--;
+ DBUG_PRINT("info", ("Commit/Rollback alter id %d", i));
+ mysql_mutex_unlock(&global_rpl_thread_pool.LOCK_rpl_thread_pool);
+ break;
+ }
+ }
+ }
+
+ return thread_allocated;
+
+}
+
+
/*
Obtain a worker thread that we can queue an event to.
@@ -2208,25 +2350,32 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
Gtid_log_event *gtid_ev)
{
uint32 idx;
- Relay_log_info *rli= rgi->rli;
- rpl_parallel_thread *thr;
idx= rpl_thread_idx;
if (gtid_ev)
{
+ if (++idx >= rpl_thread_max)
+ idx= 0;
+ //rpl_thread_idx will be updated handle_split_alter
+ if (handle_split_alter(this, gtid_ev, &idx, did_enter_cond, rgi, old_stage))
+ return rpl_threads[idx];
if (gtid_ev->flags2 &
(Gtid_log_event::FL_COMPLETED_XA | Gtid_log_event::FL_PREPARED_XA))
+ {
idx= my_hash_sort(&my_charset_bin, gtid_ev->xid.key(),
gtid_ev->xid.key_length()) % rpl_thread_max;
- else
- {
- ++idx;
- if (idx >= rpl_thread_max)
- idx= 0;
}
rpl_thread_idx= idx;
}
- thr= rpl_threads[idx];
+ return choose_thread_internal(idx, did_enter_cond, rgi, old_stage);
+}
+
+rpl_parallel_thread * rpl_parallel_entry::choose_thread_internal(uint idx,
+ bool *did_enter_cond, rpl_group_info *rgi,
+ PSI_stage_info *old_stage)
+{
+ rpl_parallel_thread* thr= rpl_threads[idx];
+ Relay_log_info *rli= rgi->rli;
if (thr)
{
*did_enter_cond= false;
@@ -2366,6 +2515,7 @@ rpl_parallel::find(uint32 domain_id)
e->domain_id= domain_id;
e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX;
e->pause_sub_id= (uint64)ULONGLONG_MAX;
+ e->pending_start_alters= 0;
if (my_hash_insert(&domain_hash, (uchar *)e))
{
my_free(e);
@@ -2451,6 +2601,14 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli)
{
if ((rpt= e->rpl_threads[j]))
{
+ /*
+ Dont wait for SA workers , But wait for CA/RA workers
+ If CA/RA is executed that means corresponding SA is also executed
+ And remaning SA will never recieve CA/RA so we have to manualy send it
+ */
+ if (rpt->thd->rgi_slave &&
+ (rpt->thd->rgi_slave->gtid_ev_flags_extra & Log_event::FL_START_ALTER_E1))
+ continue;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
while (rpt->current_owner == &e->rpl_threads[j])
mysql_cond_wait(&rpt->COND_rpl_thread_stop, &rpt->LOCK_rpl_thread);
@@ -2814,6 +2972,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
gtid.server_id= gtid_ev->server_id;
gtid.seq_no= gtid_ev->seq_no;
rli->update_relay_log_state(&gtid, 1);
+ serial_rgi->gtid_ev_flags_extra= gtid_ev->flags_extra;
if (process_gtid_for_restart_pos(rli, &gtid))
{
/*