summaryrefslogtreecommitdiff
path: root/sql/rpl_rli.cc
diff options
context:
space:
mode:
author Kristian Nielsen <knielsen@knielsen-hq.org> 2014-07-11 12:06:47 +0200
committer Kristian Nielsen <knielsen@knielsen-hq.org> 2014-07-11 12:06:47 +0200
commit 501c56ef1e6058340a3dc94c8ddfdf7bf5c2678c (patch)
tree aeff27d359a58a960114230e8c0b6053799f68c8 /sql/rpl_rli.cc
parent fd0abecaf4e46bcfe4a07de9058b07779a43d44c (diff)
parent e81ecc9c72d240a1b6d9d6619f4654d412da4090 (diff)
download mariadb-git-501c56ef1e6058340a3dc94c8ddfdf7bf5c2678c.tar.gz
MDEV-5262, MDEV-5914, MDEV-5941, MDEV-6020: Deadlocks during parallel replication causing replication to fail.
Merge the patches into MariaDB 10.0 main. With this patch, parallel replication will now automatically retry a transaction that fails due to deadlock or other temporary error, same as single-threaded replication. We catch deadlocks with InnoDB transactions due to enforced commit order. If T1 must commit before T2 in parallel replication and T1 ends up waiting for T2 inside InnoDB, we kill T2 and retry it later to resolve the deadlock automatically.
Diffstat (limited to 'sql/rpl_rli.cc')
-rw-r--r-- sql/rpl_rli.cc 79
1 files changed, 75 insertions, 4 deletions
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 595ef49d72f..0b133555cea 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -52,6 +52,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
info_fd(-1), cur_log_fd(-1), relay_log(&sync_relaylog_period),
sync_counter(0), is_relay_log_recovery(is_slave_recovery),
save_temporary_tables(0), mi(0),
+ inuse_relaylog_list(0), last_inuse_relaylog(0),
cur_log_old_open_count(0), group_relay_log_pos(0),
event_relay_log_pos(0),
#if HAVE_valgrind
@@ -91,6 +92,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
mysql_cond_init(key_relay_log_info_start_cond, &start_cond, NULL);
mysql_cond_init(key_relay_log_info_stop_cond, &stop_cond, NULL);
mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL);
+ my_atomic_rwlock_init(&inuse_relaylog_atomic_lock);
relay_log.init_pthread_objects();
DBUG_VOID_RETURN;
}
@@ -98,8 +100,17 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
Relay_log_info::~Relay_log_info()
{
+ inuse_relaylog *cur;
DBUG_ENTER("Relay_log_info::~Relay_log_info");
+ cur= inuse_relaylog_list;
+ while (cur)
+ {
+ DBUG_ASSERT(cur->queued_count == cur->dequeued_count);
+ inuse_relaylog *next= cur->next;
+ my_free(cur);
+ cur= next;
+ }
mysql_mutex_destroy(&run_lock);
mysql_mutex_destroy(&data_lock);
mysql_mutex_destroy(&log_space_lock);
@@ -107,6 +118,7 @@ Relay_log_info::~Relay_log_info()
mysql_cond_destroy(&start_cond);
mysql_cond_destroy(&stop_cond);
mysql_cond_destroy(&log_space_cond);
+ my_atomic_rwlock_destroy(&inuse_relaylog_atomic_lock);
relay_log.cleanup();
DBUG_VOID_RETURN;
}
@@ -1338,6 +1350,32 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
DBUG_VOID_RETURN;
}
+/*
+  Allocate a new inuse_relaylog entry for the relay log called NAME and
+  append it to the singly-linked list headed by inuse_relaylog_list.
+
+  The entry is obtained from my_malloc() with MY_ZEROFILL requested, and
+  NAME is copied into its name buffer with strmake_buf(). Entries on the
+  list are released with my_free() in ~Relay_log_info().
+
+  Returns 0 on success. Returns 1 if the allocation fails, after raising
+  ER_OUTOFMEMORY with my_error().
+*/
+int
+Relay_log_info::alloc_inuse_relaylog(const char *name)
+{
+ inuse_relaylog *ir;
+
+ if (!(ir= (inuse_relaylog *)my_malloc(sizeof(*ir), MYF(MY_WME|MY_ZEROFILL))))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir));
+ return 1;
+ }
+ strmake_buf(ir->name, name);
+ /*
+   Link the new entry at the tail. An empty list simply gets it as head;
+   otherwise the previous tail is flagged as completed before the new
+   entry is chained after it.
+ */
+ if (!inuse_relaylog_list)
+ inuse_relaylog_list= ir;
+ else
+ {
+ last_inuse_relaylog->completed= true;
+ last_inuse_relaylog->next= ir;
+ }
+ last_inuse_relaylog= ir;
+
+ return 0;
+}
+
+
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
int
rpl_load_gtid_slave_state(THD *thd)
@@ -1524,6 +1562,9 @@ rpl_group_info::reinit(Relay_log_info *rli)
tables_to_lock_count= 0;
trans_retries= 0;
last_event_start_time= 0;
+ gtid_sub_id= 0;
+ commit_id= 0;
+ gtid_pending= false;
worker_error= 0;
row_stmt_start_timestamp= 0;
long_find_row_note_printed= false;
@@ -1533,7 +1574,7 @@ rpl_group_info::reinit(Relay_log_info *rli)
}
rpl_group_info::rpl_group_info(Relay_log_info *rli)
- : thd(0), gtid_sub_id(0), wait_commit_sub_id(0),
+ : thd(0), wait_commit_sub_id(0),
wait_commit_group_info(0), parallel_entry(0),
deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false)
{
@@ -1567,6 +1608,8 @@ event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev)
rgi->current_gtid.domain_id= gev->domain_id;
rgi->current_gtid.server_id= gev->server_id;
rgi->current_gtid.seq_no= gev->seq_no;
+ rgi->commit_id= gev->commit_id;
+ rgi->gtid_pending= true;
return 0;
}
@@ -1622,7 +1665,7 @@ delete_or_keep_event_post_apply(rpl_group_info *rgi,
void rpl_group_info::cleanup_context(THD *thd, bool error)
{
- DBUG_ENTER("Relay_log_info::cleanup_context");
+ DBUG_ENTER("rpl_group_info::cleanup_context");
DBUG_PRINT("enter", ("error: %d", (int) error));
DBUG_ASSERT(this->thd == thd);
@@ -1688,7 +1731,7 @@ void rpl_group_info::cleanup_context(THD *thd, bool error)
void rpl_group_info::clear_tables_to_lock()
{
- DBUG_ENTER("Relay_log_info::clear_tables_to_lock()");
+ DBUG_ENTER("rpl_group_info::clear_tables_to_lock()");
#ifndef DBUG_OFF
/**
When replicating in RBR and MyISAM Merge tables are involved
@@ -1735,7 +1778,7 @@ void rpl_group_info::clear_tables_to_lock()
void rpl_group_info::slave_close_thread_tables(THD *thd)
{
- DBUG_ENTER("Relay_log_info::slave_close_thread_tables(THD *thd)");
+ DBUG_ENTER("rpl_group_info::slave_close_thread_tables(THD *thd)");
thd->get_stmt_da()->set_overwrite_status(true);
thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
thd->get_stmt_da()->set_overwrite_status(false);
@@ -1824,6 +1867,34 @@ rpl_group_info::gtid_info()
}
+/*
+  Undo the effect of a prior mark_start_commit().
+
+  This is only used for retrying a transaction in parallel replication, after
+  we have encountered a deadlock or other temporary error.
+
+  When we get such a deadlock, it means that the current group of transactions
+  did not yet all start committing (else they would not have deadlocked). So
+  we will not yet have woken up anything in the next group, our rgi->gco is
+  still live, and we can simply decrement the counter (to be incremented again
+  later, when the retry succeeds and reaches the commit step).
+
+  The call does nothing when did_mark_start_commit is not set. Otherwise it
+  decrements count_committing_event_groups of this->parallel_entry while
+  holding LOCK_parallel_entry, and then clears did_mark_start_commit.
+*/
+void
+rpl_group_info::unmark_start_commit()
+{
+ rpl_parallel_entry *e;
+ /*
+   Nothing to undo unless the flag is set (presumably by
+   mark_start_commit() -- confirm against its definition).
+ */
+ if (!did_mark_start_commit)
+ return;
+ /*
+   Take the entry's mutex around the decrement, then clear the flag so
+   that a repeated call returns early without decrementing again.
+ */
+ e= this->parallel_entry;
+ mysql_mutex_lock(&e->LOCK_parallel_entry);
+ --e->count_committing_event_groups;
+ mysql_mutex_unlock(&e->LOCK_parallel_entry);
+ did_mark_start_commit= false;
+}
+
+
rpl_sql_thread_info::rpl_sql_thread_info(Rpl_filter *filter)
: rpl_filter(filter)
{