diff options
author | Kristian Nielsen <knielsen@knielsen-hq.org> | 2014-07-11 12:06:47 +0200 |
---|---|---|
committer | Kristian Nielsen <knielsen@knielsen-hq.org> | 2014-07-11 12:06:47 +0200 |
commit | 501c56ef1e6058340a3dc94c8ddfdf7bf5c2678c (patch) | |
tree | aeff27d359a58a960114230e8c0b6053799f68c8 /sql/rpl_rli.cc | |
parent | fd0abecaf4e46bcfe4a07de9058b07779a43d44c (diff) | |
parent | e81ecc9c72d240a1b6d9d6619f4654d412da4090 (diff) | |
download | mariadb-git-501c56ef1e6058340a3dc94c8ddfdf7bf5c2678c.tar.gz |
MDEV-5262, MDEV-5914, MDEV-5941, MDEV-6020: Deadlocks during parallel replication causing replication to fail.
Merge the patches into MariaDB 10.0 main.
With this patch, parallel replication automatically retries a
transaction that fails due to a deadlock or other temporary error, the same
as single-threaded replication does.
We detect deadlocks between InnoDB transactions that are caused by the
enforced commit order. If T1 must commit before T2 in parallel replication and
T1 ends up waiting for T2 inside InnoDB, we kill T2 and retry it later, which
resolves the deadlock automatically.
Diffstat (limited to 'sql/rpl_rli.cc')
-rw-r--r-- | sql/rpl_rli.cc | 79 |
1 files changed, 75 insertions, 4 deletions
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 595ef49d72f..0b133555cea 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -52,6 +52,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) info_fd(-1), cur_log_fd(-1), relay_log(&sync_relaylog_period), sync_counter(0), is_relay_log_recovery(is_slave_recovery), save_temporary_tables(0), mi(0), + inuse_relaylog_list(0), last_inuse_relaylog(0), cur_log_old_open_count(0), group_relay_log_pos(0), event_relay_log_pos(0), #if HAVE_valgrind @@ -91,6 +92,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) mysql_cond_init(key_relay_log_info_start_cond, &start_cond, NULL); mysql_cond_init(key_relay_log_info_stop_cond, &stop_cond, NULL); mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL); + my_atomic_rwlock_init(&inuse_relaylog_atomic_lock); relay_log.init_pthread_objects(); DBUG_VOID_RETURN; } @@ -98,8 +100,17 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) Relay_log_info::~Relay_log_info() { + inuse_relaylog *cur; DBUG_ENTER("Relay_log_info::~Relay_log_info"); + cur= inuse_relaylog_list; + while (cur) + { + DBUG_ASSERT(cur->queued_count == cur->dequeued_count); + inuse_relaylog *next= cur->next; + my_free(cur); + cur= next; + } mysql_mutex_destroy(&run_lock); mysql_mutex_destroy(&data_lock); mysql_mutex_destroy(&log_space_lock); @@ -107,6 +118,7 @@ Relay_log_info::~Relay_log_info() mysql_cond_destroy(&start_cond); mysql_cond_destroy(&stop_cond); mysql_cond_destroy(&log_space_cond); + my_atomic_rwlock_destroy(&inuse_relaylog_atomic_lock); relay_log.cleanup(); DBUG_VOID_RETURN; } @@ -1338,6 +1350,32 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos, DBUG_VOID_RETURN; } + +int +Relay_log_info::alloc_inuse_relaylog(const char *name) +{ + inuse_relaylog *ir; + + if (!(ir= (inuse_relaylog *)my_malloc(sizeof(*ir), MYF(MY_WME|MY_ZEROFILL)))) + { + my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir)); + return 1; + } + strmake_buf(ir->name, name); + + if 
(!inuse_relaylog_list) + inuse_relaylog_list= ir; + else + { + last_inuse_relaylog->completed= true; + last_inuse_relaylog->next= ir; + } + last_inuse_relaylog= ir; + + return 0; +} + + #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) int rpl_load_gtid_slave_state(THD *thd) @@ -1524,6 +1562,9 @@ rpl_group_info::reinit(Relay_log_info *rli) tables_to_lock_count= 0; trans_retries= 0; last_event_start_time= 0; + gtid_sub_id= 0; + commit_id= 0; + gtid_pending= false; worker_error= 0; row_stmt_start_timestamp= 0; long_find_row_note_printed= false; @@ -1533,7 +1574,7 @@ rpl_group_info::reinit(Relay_log_info *rli) } rpl_group_info::rpl_group_info(Relay_log_info *rli) - : thd(0), gtid_sub_id(0), wait_commit_sub_id(0), + : thd(0), wait_commit_sub_id(0), wait_commit_group_info(0), parallel_entry(0), deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false) { @@ -1567,6 +1608,8 @@ event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev) rgi->current_gtid.domain_id= gev->domain_id; rgi->current_gtid.server_id= gev->server_id; rgi->current_gtid.seq_no= gev->seq_no; + rgi->commit_id= gev->commit_id; + rgi->gtid_pending= true; return 0; } @@ -1622,7 +1665,7 @@ delete_or_keep_event_post_apply(rpl_group_info *rgi, void rpl_group_info::cleanup_context(THD *thd, bool error) { - DBUG_ENTER("Relay_log_info::cleanup_context"); + DBUG_ENTER("rpl_group_info::cleanup_context"); DBUG_PRINT("enter", ("error: %d", (int) error)); DBUG_ASSERT(this->thd == thd); @@ -1688,7 +1731,7 @@ void rpl_group_info::cleanup_context(THD *thd, bool error) void rpl_group_info::clear_tables_to_lock() { - DBUG_ENTER("Relay_log_info::clear_tables_to_lock()"); + DBUG_ENTER("rpl_group_info::clear_tables_to_lock()"); #ifndef DBUG_OFF /** When replicating in RBR and MyISAM Merge tables are involved @@ -1735,7 +1778,7 @@ void rpl_group_info::clear_tables_to_lock() void rpl_group_info::slave_close_thread_tables(THD *thd) { - DBUG_ENTER("Relay_log_info::slave_close_thread_tables(THD *thd)"); + 
DBUG_ENTER("rpl_group_info::slave_close_thread_tables(THD *thd)"); thd->get_stmt_da()->set_overwrite_status(true); thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd); thd->get_stmt_da()->set_overwrite_status(false); @@ -1824,6 +1867,34 @@ rpl_group_info::gtid_info() } +/* + Undo the effect of a prior mark_start_commit(). + + This is only used for retrying a transaction in parallel replication, after + we have encountered a deadlock or other temporary error. + + When we get such a deadlock, it means that the current group of transactions + did not yet all start committing (else they would not have deadlocked). So + we will not yet have woken up anything in the next group, our rgi->gco is + still live, and we can simply decrement the counter (to be incremented again + later, when the retry succeeds and reaches the commit step). +*/ +void +rpl_group_info::unmark_start_commit() +{ + rpl_parallel_entry *e; + + if (!did_mark_start_commit) + return; + + e= this->parallel_entry; + mysql_mutex_lock(&e->LOCK_parallel_entry); + --e->count_committing_event_groups; + mysql_mutex_unlock(&e->LOCK_parallel_entry); + did_mark_start_commit= false; +} + + rpl_sql_thread_info::rpl_sql_thread_info(Rpl_filter *filter) : rpl_filter(filter) { |