summaryrefslogtreecommitdiff
path: root/sql/rpl_parallel.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r--sql/rpl_parallel.cc274
1 files changed, 256 insertions, 18 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 78d81d973e4..1e07ca582da 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -150,6 +150,9 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
wait_for_commit *wfc= &rgi->commit_orderer;
int err;
+ if (rgi->get_finish_event_group_called())
+ return;
+
thd->get_stmt_da()->set_overwrite_status(true);
/*
Remove any left-over registration to wait for a prior commit to
@@ -279,6 +282,8 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
*/
thd->get_stmt_da()->reset_diagnostics_area();
wfc->wakeup_subsequent_commits(rgi->worker_error);
+ rgi->did_mark_start_commit= false;
+ rgi->set_finish_event_group_called(true);
}
@@ -583,6 +588,7 @@ rpl_pause_for_ftwrl(THD *thd)
uint32 i;
rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
int err;
+ Dynamic_array<Master_info*> mi_arr(4, 4); // array of replication source mi:s
DBUG_ENTER("rpl_pause_for_ftwrl");
/*
@@ -634,9 +640,36 @@ rpl_pause_for_ftwrl(THD *thd)
mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
};
--e->need_sub_id_signal;
+
thd->EXIT_COND(&old_stage);
if (err)
break;
+ /*
+ Notify any source any domain waiting-for-master Start-Alter to give way.
+ */
+ Master_info *mi= e->rli->mi;
+ bool found= false;
+ for (uint i= 0; i < mi_arr.elements() && !found; i++)
+ found= mi_arr.at(i) == mi;
+ if (!found)
+ {
+ mi_arr.append(mi);
+ start_alter_info *info=NULL;
+ mysql_mutex_lock(&mi->start_alter_list_lock);
+ List_iterator<start_alter_info> info_iterator(mi->start_alter_list);
+ while ((info= info_iterator++))
+ {
+ mysql_mutex_lock(&mi->start_alter_lock);
+
+ DBUG_ASSERT(info->state == start_alter_state::REGISTERED);
+
+ info->state= start_alter_state::ROLLBACK_ALTER;
+ info->direct_commit_alter= true;
+ mysql_cond_broadcast(&info->start_alter_cond);
+ mysql_mutex_unlock(&mi->start_alter_lock);
+ }
+ mysql_mutex_unlock(&mi->start_alter_list_lock);
+ }
}
if (err)
@@ -828,11 +861,9 @@ do_retry:
{
mysql_mutex_lock(&entry->LOCK_parallel_entry);
register_wait_for_prior_event_group_commit(rgi, entry);
- if (!(entry->stop_on_error_sub_id == (uint64) ULONGLONG_MAX ||
-#ifndef DBUG_OFF
- (DBUG_EVALUATE_IF("simulate_mdev_12746", 1, 0)) ||
-#endif
- rgi->gtid_sub_id < entry->stop_on_error_sub_id))
+ if (entry->stop_on_error_sub_id != (uint64) ULONGLONG_MAX &&
+ !DBUG_IF("simulate_mdev_12746") &&
+ rgi->gtid_sub_id >= entry->stop_on_error_sub_id)
{
/*
A failure of a preceding "parent" transaction may not be
@@ -1712,6 +1743,9 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
{
mysql_mutex_lock(&pool->threads[i]->LOCK_rpl_thread);
pool->threads[i]->delay_start= false;
+ pool->threads[i]->current_start_alter_id= 0;
+ pool->threads[i]->current_start_alter_domain_id= 0;
+ pool->threads[i]->reserved_start_alter_thread= false;
mysql_cond_signal(&pool->threads[i]->COND_rpl_thread);
while (!pool->threads[i]->running)
mysql_cond_wait(&pool->threads[i]->COND_rpl_thread,
@@ -1992,7 +2026,19 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
rgi->retry_start_offset= rli->future_event_relay_log_pos-event_size;
rgi->retry_event_count= 0;
rgi->killed_for_retry= rpl_group_info::RETRY_KILL_NONE;
+ /* rgi is transaction specific so we need to move this value to rgi */
+ rgi->reserved_start_alter_thread= reserved_start_alter_thread;
+ rgi->rpt= this;
+ rgi->direct_commit_alter= false;
+ rgi->finish_event_group_called= false;
+ DBUG_ASSERT(!rgi->sa_info);
+ /*
+ We can remove the reserved_start_alter_thread flag.
+ If we get more concurrent alter handle_split_alter will
+ automatically set this flag again.
+ */
+ reserved_start_alter_thread= false;
return rgi;
}
@@ -2063,6 +2109,10 @@ rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco)
loc_gco_list= gco;
}
+void rpl_group_info::finish_start_alter_event_group()
+{
+ finish_event_group(rpt, this->gtid_sub_id, this->parallel_entry, this);
+}
rpl_parallel_thread::rpl_parallel_thread()
: channel_name_length(0), last_error_number(0), last_error_timestamp(0),
@@ -2072,7 +2122,7 @@ rpl_parallel_thread::rpl_parallel_thread()
rpl_parallel_thread_pool::rpl_parallel_thread_pool()
- : threads(0), free_list(0), count(0), inited(false), busy(false),
+ : threads(0), free_list(0), count(0), inited(false),current_start_alters(0), busy(false),
pfs_bkp{0, false, false, NULL}
{
}
@@ -2206,6 +2256,129 @@ rpl_parallel_thread_pool::copy_pool_for_pfs(Relay_log_info *rli)
}
}
+/*
+ START ALTER , COMMIT ALTER / ROLLBACK ALTER scheduling
+
+ Steps:-
+ 1. (For Gtid_log_event SA). Get the worker thread which is either
+ e->rpl_threads[i] is NULL means worker from poll has not been assigned yet
+ e->rpl_threads[i]->current_owner != &e->rpl_threads[i]
+ Thread has been released, or about to //same as choose_thread logic
+ !e->rpl_threads[i]->current_start_alter_id is 0 , safe to schedule.
+ We dont want to schedule on worker which already have been scheduled SA
+ but CA/RA has not been scheduled yet. current_start_alter_id will indicate
+ this. If we dont do this we will get deadlock.
+ 2. (For Gtid_log_event SA)
+ call choose_thread_internal so that e->rpl_threads[idx] is not null
+ update the current_start_alter_id
+ 3. (For Gtid_log_event SA)
+ update local e->pending_start_alters(local) variable and
+ pool->current_start_alters(global)
+ We need 2 status variable (global and local) because we can have
+ slave_domain_parallel_threads != pool->threads.
+ 4. (For CA/RA Gtid_log_event)
+ Update e->pending_start_alters and pool->current_start_alters
+ while holding mutex lock on pool (if SA is not assigned to
+ reserved thread)
+
+
+ @returns
+ true Worker allocated (choose_thread_internal called)
+ false Worker not allocated (choose_thread_internal not called)
+*/
+static bool handle_split_alter(rpl_parallel_entry *e,
+ Gtid_log_event *gtid_ev, uint32 *idx,
+ //choose_thread_internal specific
+ bool *did_enter_cond, rpl_group_info* rgi,
+ PSI_stage_info *old_stage)
+{
+ uint16 flags_extra= gtid_ev->flags_extra;
+ bool thread_allocated= false;
+ //Step 1
+ if (flags_extra & Gtid_log_event::FL_START_ALTER_E1 ||
+ //This will arrange finding threads for CA/RA as well
+ //as concurrent DDL
+ e->pending_start_alters)
+ {
+ /*
+ j is needed for round robin scheduling, we will start with rpl_thread_idx
+ go till rpl_thread_max and then start with 0 to rpl_thread_idx
+ */
+ int j= e->rpl_thread_idx;
+ for(uint i= 0; i < e->rpl_thread_max; i++)
+ {
+ if (!e->rpl_threads[j] || e->rpl_threads[j]->current_owner
+ != &e->rpl_threads[j] || !e->rpl_threads[j]->current_start_alter_id)
+ {
+ //This condition will hit atleast one time no matter what happens
+ *idx= j;
+ DBUG_PRINT("info", ("Start alter id %d", j));
+ goto idx_found;
+ }
+ j++;
+ j= j % e->rpl_thread_max;
+ }
+ //We did not find and idx
+ DBUG_ASSERT(0);
+ return false;
+idx_found:
+ e->rpl_thread_idx= *idx;
+ e->choose_thread_internal(*idx, did_enter_cond, rgi, old_stage);
+ thread_allocated= true;
+ if (flags_extra & Gtid_log_event::FL_START_ALTER_E1)
+ {
+ mysql_mutex_assert_owner(&e->rpl_threads[*idx]->LOCK_rpl_thread);
+ e->rpl_threads[e->rpl_thread_idx]->current_start_alter_id= gtid_ev->seq_no;
+ e->rpl_threads[e->rpl_thread_idx]->current_start_alter_domain_id=
+ gtid_ev->domain_id;
+ /*
+ We are locking LOCK_rpl_thread_pool becuase we are going to update
+ current_start_alters
+ */
+ mysql_mutex_lock(&global_rpl_thread_pool.LOCK_rpl_thread_pool);
+ if (e->pending_start_alters < e->rpl_thread_max - 1 &&
+ global_rpl_thread_pool.current_start_alters
+ < global_rpl_thread_pool.count - 1)
+ {
+ e->pending_start_alters++;
+ global_rpl_thread_pool.current_start_alters++;
+ }
+ else
+ {
+ e->rpl_threads[*idx]->reserved_start_alter_thread= true;
+ e->rpl_threads[*idx]->current_start_alter_id= 0;
+ e->rpl_threads[*idx]->current_start_alter_domain_id= 0;
+ }
+ mysql_mutex_unlock(&global_rpl_thread_pool.LOCK_rpl_thread_pool);
+ }
+ }
+ if(flags_extra & (Gtid_log_event::FL_COMMIT_ALTER_E1 |
+ Gtid_log_event::FL_ROLLBACK_ALTER_E1 ))
+ {
+ //Free the corrosponding rpt current_start_alter_id
+ for(uint i= 0; i < e->rpl_thread_max; i++)
+ {
+ if(e->rpl_threads[i] &&
+ e->rpl_threads[i]->current_start_alter_id == gtid_ev->sa_seq_no &&
+ e->rpl_threads[i]->current_start_alter_domain_id == gtid_ev->domain_id)
+ {
+ mysql_mutex_lock(&global_rpl_thread_pool.LOCK_rpl_thread_pool);
+ e->rpl_threads[i]->current_start_alter_id= 0;
+ e->rpl_threads[i]->current_start_alter_domain_id= 0;
+ global_rpl_thread_pool.current_start_alters--;
+ e->pending_start_alters--;
+ DBUG_PRINT("info", ("Commit/Rollback alter id %d", i));
+ mysql_mutex_unlock(&global_rpl_thread_pool.LOCK_rpl_thread_pool);
+ break;
+ }
+ }
+ }
+
+ return thread_allocated;
+
+}
+
+
/*
Obtain a worker thread that we can queue an event to.
@@ -2239,25 +2412,32 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
Gtid_log_event *gtid_ev)
{
uint32 idx;
- Relay_log_info *rli= rgi->rli;
- rpl_parallel_thread *thr;
idx= rpl_thread_idx;
if (gtid_ev)
{
+ if (++idx >= rpl_thread_max)
+ idx= 0;
+ //rpl_thread_idx will be updated handle_split_alter
+ if (handle_split_alter(this, gtid_ev, &idx, did_enter_cond, rgi, old_stage))
+ return rpl_threads[idx];
if (gtid_ev->flags2 &
(Gtid_log_event::FL_COMPLETED_XA | Gtid_log_event::FL_PREPARED_XA))
+ {
idx= my_hash_sort(&my_charset_bin, gtid_ev->xid.key(),
gtid_ev->xid.key_length()) % rpl_thread_max;
- else
- {
- ++idx;
- if (idx >= rpl_thread_max)
- idx= 0;
}
rpl_thread_idx= idx;
}
- thr= rpl_threads[idx];
+ return choose_thread_internal(idx, did_enter_cond, rgi, old_stage);
+}
+
+rpl_parallel_thread * rpl_parallel_entry::choose_thread_internal(uint idx,
+ bool *did_enter_cond, rpl_group_info *rgi,
+ PSI_stage_info *old_stage)
+{
+ rpl_parallel_thread* thr= rpl_threads[idx];
+ Relay_log_info *rli= rgi->rli;
if (thr)
{
*did_enter_cond= false;
@@ -2375,12 +2555,13 @@ rpl_parallel::~rpl_parallel()
rpl_parallel_entry *
-rpl_parallel::find(uint32 domain_id)
+rpl_parallel::find(uint32 domain_id, Relay_log_info *rli)
{
struct rpl_parallel_entry *e;
if (!(e= (rpl_parallel_entry *)my_hash_search(&domain_hash,
- (const uchar *)&domain_id, 0)))
+ (const uchar *)&domain_id,
+ sizeof(domain_id))))
{
/* Allocate a new, empty one. */
ulong count= opt_slave_domain_parallel_threads;
@@ -2400,6 +2581,8 @@ rpl_parallel::find(uint32 domain_id)
e->domain_id= domain_id;
e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX;
e->pause_sub_id= (uint64)ULONGLONG_MAX;
+ e->pending_start_alters= 0;
+ e->rli= rli;
mysql_mutex_init(key_LOCK_parallel_entry, &e->LOCK_parallel_entry,
MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_parallel_entry, &e->COND_parallel_entry, NULL);
@@ -2412,7 +2595,11 @@ rpl_parallel::find(uint32 domain_id)
}
}
else
+ {
+ DBUG_ASSERT(rli == e->rli);
+
e->force_abort= false;
+ }
return e;
}
@@ -2429,7 +2616,7 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli)
struct rpl_parallel_entry *e;
rpl_parallel_thread *rpt;
uint32 i, j;
-
+ Master_info *mi= rli->mi;
/*
First signal all workers that they must force quit; no more events will
be queued to complete any partial event groups executed.
@@ -2482,6 +2669,45 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli)
#endif
global_rpl_thread_pool.copy_pool_for_pfs(rli);
+ /*
+ Shutdown SA alter threads through marking their execution states
+ to force their early post-SA execution exit. Upon that the affected SA threads
+ change their state to COMPLETED, notify any waiting CA|RA and this thread.
+ */
+ start_alter_info *info=NULL;
+ mysql_mutex_lock(&mi->start_alter_list_lock);
+ List_iterator<start_alter_info> info_iterator(mi->start_alter_list);
+ mi->is_shutdown= true; // a sign to stop in concurrently coming in new SA:s
+ while ((info= info_iterator++))
+ {
+ mysql_mutex_lock(&mi->start_alter_lock);
+ if (info->state == start_alter_state::COMPLETED)
+ {
+ mysql_mutex_unlock(&mi->start_alter_lock);
+ continue;
+ }
+ info->state= start_alter_state::ROLLBACK_ALTER;
+ // Any possible CA that is (will be) waiting will complete this ALTER instance
+ info->direct_commit_alter= true;
+ mysql_cond_broadcast(&info->start_alter_cond); // notify SA:s
+ mysql_mutex_unlock(&mi->start_alter_lock);
+
+ // await SA in the COMPLETED state
+ mysql_mutex_lock(&mi->start_alter_lock);
+ while(info->state == start_alter_state::ROLLBACK_ALTER)
+ mysql_cond_wait(&info->start_alter_cond, &mi->start_alter_lock);
+
+ DBUG_ASSERT(info->state == start_alter_state::COMPLETED);
+
+ mysql_mutex_unlock(&mi->start_alter_lock);
+ }
+ mysql_mutex_unlock(&mi->start_alter_list_lock);
+
+ DBUG_EXECUTE_IF("rpl_slave_stop_CA_before_binlog",
+ {
+ debug_sync_set_action(thd, STRING_WITH_LEN("now signal proceed_CA_1"));
+ });
+
for (i= 0; i < domain_hash.records; ++i)
{
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
@@ -2496,6 +2722,17 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli)
}
}
}
+ // Now that all threads are docked, remained alter states are safe to destroy
+ mysql_mutex_lock(&mi->start_alter_list_lock);
+ info_iterator.rewind();
+ while ((info= info_iterator++))
+ {
+ info_iterator.remove();
+ mysql_cond_destroy(&info->start_alter_cond);
+ my_free(info);
+ }
+ mi->is_shutdown= false;
+ mysql_mutex_unlock(&mi->start_alter_list_lock);
}
@@ -2840,7 +3077,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ||
rli->mi->parallel_mode <= SLAVE_PARALLEL_MINIMAL ?
0 : gtid_ev->domain_id);
- if (!(e= find(domain_id)))
+ if (!(e= find(domain_id, rli)))
{
my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
delete ev;
@@ -2852,6 +3089,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
gtid.server_id= gtid_ev->server_id;
gtid.seq_no= gtid_ev->seq_no;
rli->update_relay_log_state(&gtid, 1);
+ serial_rgi->gtid_ev_flags_extra= gtid_ev->flags_extra;
if (process_gtid_for_restart_pos(rli, &gtid))
{
/*