path: root/sql/rpl_parallel.cc
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r--  sql/rpl_parallel.cc  375
1 file changed, 281 insertions(+), 94 deletions(-)
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 46c3e4aaaf4..305e8053032 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -2,6 +2,7 @@
#include "rpl_parallel.h"
#include "slave.h"
#include "rpl_mi.h"
+#include "sql_parse.h"
#include "debug_sync.h"
/*
@@ -113,6 +114,7 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
wait_for_commit *wfc= &rgi->commit_orderer;
int err;
+ thd->get_stmt_da()->set_overwrite_status(true);
/*
Remove any left-over registration to wait for a prior commit to
complete. Normally, such wait would already have been removed at
@@ -129,14 +131,14 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
for us to complete and rely on this also ensuring that any other
event in the group has completed.
- But in the error case, we have to abort anyway, and it seems best
- to just complete as quickly as possible with unregister. Anyone
- waiting for us will in any case receive the error back from their
- wait_for_prior_commit() call.
+ And in the error case, correct GCO lifetime relies on the fact that once
+ the last event group in the GCO has executed wait_for_prior_commit(),
+ all earlier event groups have also committed; this way no more
+ mark_start_commit() calls can be made and it is safe to de-allocate
+ the GCO.
*/
- if (rgi->worker_error)
- wfc->unregister_wait_for_prior_commit();
- else if ((err= wfc->wait_for_prior_commit(thd)))
+ err= wfc->wait_for_prior_commit(thd);
+ if (unlikely(err) && !rgi->worker_error)
signal_error_to_sql_driver_thread(thd, rgi, err);
thd->wait_for_commit_ptr= NULL;
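
The rewritten error path preserves a key ordering invariant: every worker, whether it failed or not, goes through wait_for_prior_commit() before finishing its event group, so the last event group in a GCO cannot finish while an earlier one is still executing. A minimal standalone model of one link in such a wait chain (toy types and names, not the server's wait_for_commit API):

    #include <condition_variable>
    #include <mutex>

    // Toy stand-in for one link in the wait_for_commit chain.
    struct toy_commit_orderer {
      std::mutex m;
      std::condition_variable cv;
      bool prior_done= false;
      int prior_error= 0;

      // Like wait_for_prior_commit(): block until the prior event group
      // has committed or failed.  The prior group's error is returned,
      // which is why even a failing worker can safely wait here instead
      // of unregistering.
      int wait_for_prior_commit() {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, [this] { return prior_done; });
        return prior_error;
      }

      // Like wakeup_subsequent_commits(): called by the prior group when
      // it commits or gives up, passing its error status along.
      void wakeup_subsequent_commits(int err) {
        { std::lock_guard<std::mutex> lk(m); prior_done= true; prior_error= err; }
        cv.notify_all();
      }
    };
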
@@ -166,12 +168,30 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
done and also no longer need waiting for.
*/
entry->last_committed_sub_id= sub_id;
+ if (entry->need_sub_id_signal)
+ mysql_cond_broadcast(&entry->COND_parallel_entry);
/* Now free any GCOs in which all transactions have committed. */
group_commit_orderer *tmp_gco= rgi->gco;
while (tmp_gco &&
- (!tmp_gco->next_gco || tmp_gco->last_sub_id > sub_id))
+ (!tmp_gco->next_gco || tmp_gco->last_sub_id > sub_id ||
+ tmp_gco->next_gco->wait_count > entry->count_committing_event_groups))
+ {
+ /*
+ We must not free a GCO before the wait_count of the following GCO has
+ been reached and wakeup has been sent. Otherwise we will lose the
+ wakeup and hang (there were several such bugs in the past).
+
+ The intention is that this is ensured already since we only free when
+ the last event group in the GCO has committed
+ (tmp_gco->last_sub_id <= sub_id). However, in case of a bug, the extra
+ check on next_gco->wait_count will hopefully avoid hanging; an assertion
+ in debug builds verifies that this check does not in fact trigger.
+ */
+ DBUG_ASSERT(!tmp_gco->next_gco || tmp_gco->last_sub_id > sub_id);
tmp_gco= tmp_gco->prev_gco;
+ }
while (tmp_gco)
{
group_commit_orderer *prev_gco= tmp_gco->prev_gco;
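
The loop condition above encodes exactly when a GCO is still needed. Restated as a standalone predicate with simplified types (illustrative only, not the server's structures):

    // A GCO may be freed only once (a) the last event group in it has
    // committed and (b) the next GCO's wait_count has been reached, so
    // that the wakeup of the next GCO cannot be lost.
    struct toy_gco {
      toy_gco *prev_gco, *next_gco;
      unsigned long long last_sub_id;  // last event group in this GCO
      unsigned long long wait_count;   // commits the next GCO waits for
    };

    static bool gco_still_needed(const toy_gco *gco,
                                 unsigned long long committed_sub_id,
                                 unsigned long long committing_groups)
    {
      return !gco->next_gco ||
             gco->last_sub_id > committed_sub_id ||
             gco->next_gco->wait_count > committing_groups;
    }
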
@@ -193,6 +213,10 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
thd->clear_error();
thd->reset_killed();
+ /*
+ Would do thd->get_stmt_da()->set_overwrite_status(false) here, but
+ reset_diagnostics_area() already does that.
+ */
thd->get_stmt_da()->reset_diagnostics_area();
wfc->wakeup_subsequent_commits(rgi->worker_error);
}
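
The set_overwrite_status() pairing this patch adds (true before waiting for the prior commit, false afterwards) exists so that an error arriving from wait_for_prior_commit() is allowed to overwrite a status already recorded in the Diagnostics_area. The same pattern as a scope guard, purely as a hypothetical illustration (the patch itself toggles the flag manually):

    // Hypothetical RAII helper; DA stands in for the Diagnostics_area type.
    template <class DA>
    struct overwrite_status_guard {
      DA &da;
      explicit overwrite_status_guard(DA &d) : da(d)
      { da.set_overwrite_status(true); }    // allow overwriting the status
      ~overwrite_status_guard()
      { da.set_overwrite_status(false); }   // restore normal behavior
    };
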
@@ -305,7 +329,7 @@ retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
IO_CACHE rlog;
LOG_INFO linfo;
File fd= (File)-1;
- const char *errmsg= NULL;
+ const char *errmsg;
inuse_relaylog *ir= rgi->relay_log;
uint64 event_count;
uint64 events_to_execute= rgi->retry_event_count;
@@ -321,6 +345,7 @@ retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
do_retry:
event_count= 0;
err= 0;
+ errmsg= NULL;
/*
If we already started committing before getting the deadlock (or other
@@ -356,7 +381,16 @@ do_retry:
*/
if(thd->wait_for_commit_ptr)
thd->wait_for_commit_ptr->unregister_wait_for_prior_commit();
+ DBUG_EXECUTE_IF("inject_mdev8031", {
+ /* Simulate that we get deadlock killed at this exact point. */
+ rgi->killed_for_retry= true;
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ thd->killed= KILL_CONNECTION;
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ });
rgi->cleanup_context(thd, 1);
+ thd->reset_killed();
+ thd->clear_error();
/*
If we retry due to a deadlock kill that occurred during the commit step, we
@@ -372,9 +406,46 @@ do_retry:
statistic_increment(slave_retried_transactions, LOCK_status);
mysql_mutex_unlock(&rli->data_lock);
- mysql_mutex_lock(&entry->LOCK_parallel_entry);
- register_wait_for_prior_event_group_commit(rgi, entry);
- mysql_mutex_unlock(&entry->LOCK_parallel_entry);
+ for (;;)
+ {
+ mysql_mutex_lock(&entry->LOCK_parallel_entry);
+ register_wait_for_prior_event_group_commit(rgi, entry);
+ mysql_mutex_unlock(&entry->LOCK_parallel_entry);
+
+ /*
+ Let us wait for all prior transactions to complete before trying again.
+ This way, we avoid repeatedly conflicting with and getting deadlock
+ killed by the same earlier transaction.
+ */
+ if (!(err= thd->wait_for_prior_commit()))
+ break;
+
+ convert_kill_to_deadlock_error(rgi);
+ if (!has_temporary_error(thd))
+ goto err;
+ /*
+ If we get a temporary error such as a deadlock kill, we can safely
+ ignore it, as we already rolled back.
+
+ But we still want to retry the wait for the prior transaction to
+ complete its commit.
+ */
+ thd->clear_error();
+ thd->reset_killed();
+ if(thd->wait_for_commit_ptr)
+ thd->wait_for_commit_ptr->unregister_wait_for_prior_commit();
+ DBUG_EXECUTE_IF("inject_mdev8031", {
+ /* Inject a small sleep to give prior transaction a chance to commit. */
+ my_sleep(100000);
+ });
+ }
+
+ /*
+ Let us clear any lingering deadlock kill one more time, now that
+ wait_for_prior_commit() has completed. This rules out any possibility
+ of an old deadlock kill surviving beyond this point.
+ */
+ thd->reset_killed();
strmake_buf(log_name, ir->name);
if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
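
The new for (;;) loop retries the wait itself: a deadlock kill received while waiting is benign, because the transaction was already rolled back, so it is cleared and the wait re-registered; only a permanent error aborts. The control flow as a standalone sketch, with stub helpers standing in for the THD machinery (all names hypothetical):

    static int  wait_for_prior()        { return 0; }    // stub: 0 = prior committed
    static bool is_temporary_error(int) { return true; } // stub: e.g. deadlock kill
    static void clear_pending_kill()    {}               // stub: thd->reset_killed()

    static int wait_for_prior_with_retry()
    {
      for (;;)
      {
        int err= wait_for_prior();
        if (!err)
          return 0;                  // all prior groups committed; retry the
                                     // transaction without fear of the same
                                     // earlier transaction killing us again
        if (!is_temporary_error(err))
          return err;                // permanent error: give up
        // Temporary error (deadlock kill) during the wait: we already
        // rolled back, so clear it and go back to waiting.
        clear_pending_kill();
      }
    }
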
@@ -391,6 +462,14 @@ do_retry:
err= 1;
goto err;
}
+ DBUG_EXECUTE_IF("inject_mdev8031", {
+ /* Simulate pending KILL caught in read_relay_log_description_event(). */
+ if (thd->check_killed()) {
+ thd->send_kill_message();
+ err= 1;
+ goto err;
+ }
+ });
my_b_seek(&rlog, cur_offset);
do
@@ -413,7 +492,7 @@ do_retry:
{
errmsg= "slave SQL thread aborted because of I/O error";
err= 1;
- goto err;
+ goto check_retry;
}
if (rlog.error > 0)
{
@@ -442,10 +521,25 @@ do_retry:
}
strmake_buf(log_name ,linfo.log_file_name);
+ DBUG_EXECUTE_IF("inject_retry_event_group_open_binlog_kill", {
+ if (retries < 2)
+ {
+ /* Simulate that we get deadlock killed during open_binlog(). */
+ mysql_reset_thd_for_next_command(thd);
+ rgi->killed_for_retry= true;
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ thd->killed= KILL_CONNECTION;
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ thd->send_kill_message();
+ fd= (File)-1;
+ err= 1;
+ goto check_retry;
+ }
+ });
if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
{
err= 1;
- goto err;
+ goto check_retry;
}
/* Loop to try again on the new log file. */
}
@@ -488,26 +582,31 @@ do_retry:
if (retries == 0) err= dbug_simulate_tmp_error(rgi, thd););
DBUG_EXECUTE_IF("rpl_parallel_simulate_infinite_temp_err_gtid_0_x_100",
err= dbug_simulate_tmp_error(rgi, thd););
- if (err)
+ if (!err)
+ continue;
+
+check_retry:
+ convert_kill_to_deadlock_error(rgi);
+ if (has_temporary_error(thd))
{
- convert_kill_to_deadlock_error(rgi);
- if (has_temporary_error(thd))
+ ++retries;
+ if (retries < slave_trans_retries)
{
- ++retries;
- if (retries < slave_trans_retries)
+ if (fd >= 0)
{
end_io_cache(&rlog);
mysql_file_close(fd, MYF(MY_WME));
fd= (File)-1;
- goto do_retry;
}
- sql_print_error("Slave worker thread retried transaction %lu time(s) "
- "in vain, giving up. Consider raising the value of "
- "the slave_transaction_retries variable.",
- slave_trans_retries);
+ goto do_retry;
}
- goto err;
+ sql_print_error("Slave worker thread retried transaction %lu time(s) "
+ "in vain, giving up. Consider raising the value of "
+ "the slave_transaction_retries variable.",
+ slave_trans_retries);
}
+ goto err;
+
} while (event_count < events_to_execute);
err:
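
After the restructuring, the error path reads as straight-line logic: classify the error, retry while under the limit (closing the relay log first if one is open), otherwise log the giving-up message and fail. In outline, with toy stubs for the parts the server supplies:

    static bool has_temporary_error_toy() { return true; }  // stub classifier
    static const unsigned long toy_trans_retries= 10;       // slave_trans_retries

    // Returns true if the transaction should be retried once more.
    static bool should_retry(unsigned long *retries)
    {
      if (!has_temporary_error_toy())
        return false;                   // permanent error: fail immediately
      if (++*retries < toy_trans_retries)
        return true;                    // close the relay log, goto do_retry
      // Retried slave_trans_retries times in vain: log and give up.
      return false;
    }
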
@@ -640,7 +739,7 @@ handle_rpl_parallel_thread(void *arg)
}
DBUG_ASSERT(qev->typ==rpl_parallel_thread::queued_event::QUEUED_EVENT);
- thd->rgi_slave= group_rgi= rgi;
+ thd->rgi_slave= rgi;
gco= rgi->gco;
/* Handle a new event group, which will be initiated by a GTID event. */
if ((event_type= qev->ev->get_type_code()) == GTID_EVENT)
@@ -657,6 +756,21 @@ handle_rpl_parallel_thread(void *arg)
}
});
+ if(unlikely(thd->wait_for_commit_ptr) && group_rgi != NULL)
+ {
+ /*
+ This indicates that we got a new GTID event in the middle of an event
+ group that was not completed. That means a corrupt binlog (the master
+ never writes such a binlog), so it does not happen unless someone
+ injects a wrongly crafted binlog; still, let us try to handle it
+ somewhat gracefully.
+ */
+ group_rgi->cleanup_context(thd, true);
+ finish_event_group(rpt, group_rgi->gtid_sub_id,
+ group_rgi->parallel_entry, group_rgi);
+ rpt->loc_free_rgi(group_rgi);
+ }
+
in_event_group= true;
/*
If the standalone flag is set, then this event group consists of a
@@ -736,25 +850,11 @@ handle_rpl_parallel_thread(void *arg)
if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
skip_event_group= true;
- else
- register_wait_for_prior_event_group_commit(rgi, entry);
+ register_wait_for_prior_event_group_commit(rgi, entry);
unlock_or_exit_cond(thd, &entry->LOCK_parallel_entry,
&did_enter_cond, &old_stage);
- if(thd->wait_for_commit_ptr)
- {
- /*
- This indicates that we get a new GTID event in the middle of
- a not completed event group. This is corrupt binlog (the master
- will never write such binlog), so it does not happen unless
- someone tries to inject wrong crafted binlog, but let us still
- try to handle it somewhat nicely.
- */
- rgi->cleanup_context(thd, true);
- thd->wait_for_commit_ptr->unregister_wait_for_prior_commit();
- thd->wait_for_commit_ptr->wakeup_subsequent_commits(rgi->worker_error);
- }
thd->wait_for_commit_ptr= &rgi->commit_orderer;
if (opt_gtid_ignore_duplicates)
@@ -780,6 +880,7 @@ handle_rpl_parallel_thread(void *arg)
}
}
+ group_rgi= rgi;
group_ending= is_group_ending(qev->ev, event_type);
if (group_ending && likely(!rgi->worker_error))
{
@@ -824,7 +925,9 @@ handle_rpl_parallel_thread(void *arg)
else
{
delete qev->ev;
+ thd->get_stmt_da()->set_overwrite_status(true);
err= thd->wait_for_prior_commit();
+ thd->get_stmt_da()->set_overwrite_status(false);
}
end_of_group=
@@ -941,9 +1044,9 @@ dealloc_gco(group_commit_orderer *gco)
}
-int
+static int
rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
- uint32 new_count, bool skip_check)
+ uint32 new_count)
{
uint32 i;
rpl_parallel_thread **new_list= NULL;
@@ -988,24 +1091,6 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
new_free_list= new_list[i];
}
- if (!skip_check)
- {
- mysql_mutex_lock(&LOCK_active_mi);
- if (master_info_index->give_error_if_slave_running())
- {
- mysql_mutex_unlock(&LOCK_active_mi);
- goto err;
- }
- if (pool->changing)
- {
- mysql_mutex_unlock(&LOCK_active_mi);
- my_error(ER_CHANGE_SLAVE_PARALLEL_THREADS_ACTIVE, MYF(0));
- goto err;
- }
- pool->changing= true;
- mysql_mutex_unlock(&LOCK_active_mi);
- }
-
/*
Grab each old thread in turn, and signal it to stop.
@@ -1065,13 +1150,6 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
mysql_mutex_unlock(&pool->threads[i]->LOCK_rpl_thread);
}
- if (!skip_check)
- {
- mysql_mutex_lock(&LOCK_active_mi);
- pool->changing= false;
- mysql_mutex_unlock(&LOCK_active_mi);
- }
-
mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
mysql_cond_broadcast(&pool->COND_rpl_thread_pool);
mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
@@ -1098,16 +1176,26 @@ err:
}
my_free(new_list);
}
- if (!skip_check)
- {
- mysql_mutex_lock(&LOCK_active_mi);
- pool->changing= false;
- mysql_mutex_unlock(&LOCK_active_mi);
- }
return 1;
}
+int
+rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool)
+{
+ if (!pool->count)
+ return rpl_parallel_change_thread_count(pool, opt_slave_parallel_threads);
+ return 0;
+}
+
+
+int
+rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool)
+{
+ return rpl_parallel_change_thread_count(pool, 0);
+}
+
+
void
rpl_parallel_thread::batch_free()
{
@@ -1351,7 +1439,7 @@ rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco)
rpl_parallel_thread_pool::rpl_parallel_thread_pool()
- : count(0), threads(0), free_list(0), changing(false), inited(false)
+ : count(0), threads(0), free_list(0), inited(false)
{
}
@@ -1366,10 +1454,14 @@ rpl_parallel_thread_pool::init(uint32 size)
mysql_mutex_init(key_LOCK_rpl_thread_pool, &LOCK_rpl_thread_pool,
MY_MUTEX_INIT_SLOW);
mysql_cond_init(key_COND_rpl_thread_pool, &COND_rpl_thread_pool, NULL);
- changing= false;
inited= true;
- return rpl_parallel_change_thread_count(this, size, true);
+ /*
+ The pool is initially empty. Threads will be spawned when a slave SQL
+ thread is started.
+ */
+
+ return 0;
}
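
With the skip_check variant gone, pool sizing is driven entirely by the two new entry points: rpl_parallel_activate_pool() spawns the configured number of worker threads the first time a slave SQL thread needs the pool, and rpl_parallel_inactivate_pool() shrinks it back to zero. The lazy-activation idea in miniature (toy pool type, illustrative only):

    struct toy_pool { unsigned count= 0; };

    // Stand-in for rpl_parallel_change_thread_count(): spawn/join threads.
    static int toy_change_thread_count(toy_pool *p, unsigned n)
    { p->count= n; return 0; }

    // Activate lazily: only spawn threads if the pool is still empty.
    static int toy_activate(toy_pool *p, unsigned configured_threads)
    { return p->count ? 0 : toy_change_thread_count(p, configured_threads); }

    // Inactivate: shrink the pool back to zero threads.
    static int toy_inactivate(toy_pool *p)
    { return toy_change_thread_count(p, 0); }
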
@@ -1378,7 +1470,7 @@ rpl_parallel_thread_pool::destroy()
{
if (!inited)
return;
- rpl_parallel_change_thread_count(this, 0, true);
+ rpl_parallel_change_thread_count(this, 0);
mysql_mutex_destroy(&LOCK_rpl_thread_pool);
mysql_cond_destroy(&COND_rpl_thread_pool);
inited= false;
@@ -1804,28 +1896,66 @@ rpl_parallel::wait_for_workers_idle(THD *thd)
max_i= domain_hash.records;
for (i= 0; i < max_i; ++i)
{
- bool active;
- wait_for_commit my_orderer;
+ PSI_stage_info old_stage;
struct rpl_parallel_entry *e;
+ int err= 0;
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
mysql_mutex_lock(&e->LOCK_parallel_entry);
- if ((active= (e->current_sub_id > e->last_committed_sub_id)))
+ e->need_sub_id_signal= true;
+ thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry,
+ &stage_waiting_for_workers_idle, &old_stage);
+ while (e->current_sub_id > e->last_committed_sub_id)
{
- wait_for_commit *waitee= &e->current_group_info->commit_orderer;
- my_orderer.register_wait_for_prior_commit(waitee);
- thd->wait_for_commit_ptr= &my_orderer;
+ if (thd->check_killed())
+ {
+ thd->send_kill_message();
+ err= 1;
+ break;
+ }
+ mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
}
- mysql_mutex_unlock(&e->LOCK_parallel_entry);
- if (active)
+ e->need_sub_id_signal= false;
+ thd->EXIT_COND(&old_stage);
+ if (err)
+ return err;
+ }
+ return 0;
+}
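
The rewritten wait_for_workers_idle() replaces the per-domain wait_for_commit registration with a plain condition wait: the waiter raises need_sub_id_signal, sleeps on COND_parallel_entry until the committed position catches up, and workers broadcast only while the flag is set (the broadcast added in finish_event_group() above). The flag-guarded broadcast pattern, standalone (toy types, std primitives instead of mysql_cond_*):

    #include <condition_variable>
    #include <mutex>

    struct toy_entry {
      std::mutex lock;                       // LOCK_parallel_entry
      std::condition_variable cond;          // COND_parallel_entry
      bool need_sub_id_signal= false;        // is a waiter present?
      unsigned long long current_sub_id= 0;
      unsigned long long last_committed_sub_id= 0;
    };

    // Waiter side: request signals, wait until all queued groups committed.
    static void toy_wait_idle(toy_entry &e)
    {
      std::unique_lock<std::mutex> lk(e.lock);
      e.need_sub_id_signal= true;
      e.cond.wait(lk, [&e] { return e.current_sub_id <= e.last_committed_sub_id; });
      e.need_sub_id_signal= false;
    }

    // Worker side: broadcast only when someone is actually waiting.
    static void toy_commit_done(toy_entry &e, unsigned long long sub_id)
    {
      std::lock_guard<std::mutex> lk(e.lock);
      e.last_committed_sub_id= sub_id;
      if (e.need_sub_id_signal)
        e.cond.notify_all();
    }
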
+
+
+/*
+ Handle seeing a GTID during slave restart in GTID mode. If we stopped with
+ different replication domains having reached different positions in the relay
+ log, we need to skip event groups in domains that are further progressed.
+
+ Updates the state with the seen GTID, and returns true if this GTID should
+ be skipped, false otherwise.
+*/
+bool
+process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid)
+{
+ slave_connection_state::entry *gtid_entry;
+ slave_connection_state *state= &rli->restart_gtid_pos;
+
+ if (likely(state->count() == 0) ||
+ !(gtid_entry= state->find_entry(gtid->domain_id)))
+ return false;
+ if (gtid->server_id == gtid_entry->gtid.server_id)
+ {
+ uint64 seq_no= gtid_entry->gtid.seq_no;
+ if (gtid->seq_no >= seq_no)
{
- int err= my_orderer.wait_for_prior_commit(thd);
- thd->wait_for_commit_ptr= NULL;
- if (err)
- return err;
+ /*
+ This domain has reached its start position. So remove it, so that
+ further events will be processed normally.
+ */
+ state->remove(&gtid_entry->gtid);
}
+ return gtid->seq_no <= seq_no;
}
- return 0;
+ else
+ return true;
}
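
Per domain, the decision implemented above boils down to: events from a server_id other than the recorded restart GTID are skipped outright; for the matching server_id, everything up to and including the recorded seq_no is skipped, and reaching that seq_no removes the domain from the restart state so later events flow normally. The same logic as a standalone predicate (simplified parameters instead of the slave_connection_state lookup):

    // Returns true when the incoming event group must be skipped.
    // remove_domain is set when the domain reached its start position.
    static bool toy_should_skip(bool same_server_id,
                                unsigned long long gtid_seq,
                                unsigned long long restart_seq,
                                bool *remove_domain)
    {
      *remove_domain= false;
      if (!same_server_id)
        return true;                    // different server: still catching up
      if (gtid_seq >= restart_seq)
        *remove_domain= true;           // start position reached
      return gtid_seq <= restart_seq;   // skip up to and including it
    }
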
@@ -1890,13 +2020,15 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
return -1;
/* Execute pre-10.0 events, which have no GTID, in single-threaded mode. */
- if (unlikely(!current) && typ != GTID_EVENT)
+ is_group_event= Log_event::is_group_event(typ);
+ if (unlikely(!current) && typ != GTID_EVENT &&
+ !(unlikely(rli->gtid_skip_flag != GTID_SKIP_NOT) && is_group_event))
return -1;
/* ToDo: what to do with this lock?!? */
mysql_mutex_unlock(&rli->data_lock);
- if (typ == FORMAT_DESCRIPTION_EVENT)
+ if (unlikely(typ == FORMAT_DESCRIPTION_EVENT))
{
Format_description_log_event *fdev=
static_cast<Format_description_log_event *>(ev);
@@ -1922,6 +2054,19 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
}
}
}
+ else if (unlikely(typ == GTID_LIST_EVENT))
+ {
+ Gtid_list_log_event *glev= static_cast<Gtid_list_log_event *>(ev);
+ rpl_gtid *list= glev->list;
+ uint32 count= glev->count;
+ rli->update_relay_log_state(list, count);
+ while (count)
+ {
+ process_gtid_for_restart_pos(rli, list);
+ ++list;
+ --count;
+ }
+ }
/*
Stop queueing additional event groups once the SQL thread is requested to
@@ -1931,7 +2076,6 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
been partially queued, but after that we will just ignore any further
events the SQL driver thread may try to queue, and eventually it will stop.
*/
- is_group_event= Log_event::is_group_event(typ);
if ((typ == GTID_EVENT || !is_group_event) && rli->abort_slave)
sql_thread_stopping= true;
if (sql_thread_stopping)
@@ -1944,8 +2088,34 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
return 0;
}
+ if (unlikely(rli->gtid_skip_flag != GTID_SKIP_NOT) && is_group_event)
+ {
+ if (typ == GTID_EVENT)
+ rli->gtid_skip_flag= GTID_SKIP_NOT;
+ else
+ {
+ if (rli->gtid_skip_flag == GTID_SKIP_STANDALONE)
+ {
+ if (!Log_event::is_part_of_group(typ))
+ rli->gtid_skip_flag= GTID_SKIP_NOT;
+ }
+ else
+ {
+ DBUG_ASSERT(rli->gtid_skip_flag == GTID_SKIP_TRANSACTION);
+ if (typ == XID_EVENT ||
+ (typ == QUERY_EVENT &&
+ (((Query_log_event *)ev)->is_commit() ||
+ ((Query_log_event *)ev)->is_rollback())))
+ rli->gtid_skip_flag= GTID_SKIP_NOT;
+ }
+ delete_or_keep_event_post_apply(serial_rgi, typ, ev);
+ return 0;
+ }
+ }
+
if (typ == GTID_EVENT)
{
+ rpl_gtid gtid;
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ?
0 : gtid_ev->domain_id);
@@ -1956,6 +2126,23 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
return 1;
}
current= e;
+
+ gtid.domain_id= gtid_ev->domain_id;
+ gtid.server_id= gtid_ev->server_id;
+ gtid.seq_no= gtid_ev->seq_no;
+ rli->update_relay_log_state(&gtid, 1);
+ if (process_gtid_for_restart_pos(rli, &gtid))
+ {
+ /*
+ This domain had progressed further into the relay log before the last
+ SQL thread restart, so we need to skip this event group to avoid
+ applying it twice.
+ */
+ rli->gtid_skip_flag= ((gtid_ev->flags2 & Gtid_log_event::FL_STANDALONE) ?
+ GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
+ delete_or_keep_event_post_apply(serial_rgi, typ, ev);
+ return 0;
+ }
}
else
e= current;
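
Taken together, gtid_skip_flag implements a small three-state machine: GTID_SKIP_NOT applies events normally; GTID_SKIP_STANDALONE discards events until one that is not part of a group has been consumed; GTID_SKIP_TRANSACTION discards events until the XID or COMMIT/ROLLBACK query that ends the transaction; and a new GTID event always resets the flag. One step of that machine as a standalone transition function (event predicates passed in as booleans for illustration):

    enum toy_gtid_skip { SKIP_NOT, SKIP_STANDALONE, SKIP_TRANSACTION };

    static toy_gtid_skip toy_skip_step(toy_gtid_skip s,
                                       bool is_gtid,          // GTID_EVENT
                                       bool part_of_group,    // is_part_of_group()
                                       bool ends_transaction) // XID / COMMIT / ROLLBACK
    {
      if (is_gtid)
        return SKIP_NOT;                  // next group starts: stop skipping
      if (s == SKIP_STANDALONE && !part_of_group)
        return SKIP_NOT;                  // the standalone event was consumed
      if (s == SKIP_TRANSACTION && ends_transaction)
        return SKIP_NOT;                  // transaction end reached
      return s;                           // keep skipping
    }
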