Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r--  sql/rpl_parallel.cc  375
1 file changed, 281 insertions, 94 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 46c3e4aaaf4..305e8053032 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -2,6 +2,7 @@
 #include "rpl_parallel.h"
 #include "slave.h"
 #include "rpl_mi.h"
+#include "sql_parse.h"
 #include "debug_sync.h"
 
 /*
@@ -113,6 +114,7 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
   wait_for_commit *wfc= &rgi->commit_orderer;
   int err;
 
+  thd->get_stmt_da()->set_overwrite_status(true);
   /*
     Remove any left-over registration to wait for a prior commit to
     complete. Normally, such wait would already have been removed at
@@ -129,14 +131,14 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
     for us to complete and rely on this also ensuring that any other event in
     the group has completed.
 
-    But in the error case, we have to abort anyway, and it seems best
-    to just complete as quickly as possible with unregister. Anyone
-    waiting for us will in any case receive the error back from their
-    wait_for_prior_commit() call.
+    And in the error case, correct GCO lifetime relies on the fact that once
+    the last event group in the GCO has executed wait_for_prior_commit(),
+    all earlier event groups have also committed; this way no more
+    mark_start_commit() calls can be made and it is safe to de-allocate
+    the GCO.
   */
-  if (rgi->worker_error)
-    wfc->unregister_wait_for_prior_commit();
-  else if ((err= wfc->wait_for_prior_commit(thd)))
+  err= wfc->wait_for_prior_commit(thd);
+  if (unlikely(err) && !rgi->worker_error)
     signal_error_to_sql_driver_thread(thd, rgi, err);
   thd->wait_for_commit_ptr= NULL;
 
@@ -166,12 +168,30 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
     done and also no longer need waiting for.
   */
   entry->last_committed_sub_id= sub_id;
+  if (entry->need_sub_id_signal)
+    mysql_cond_broadcast(&entry->COND_parallel_entry);
 
   /* Now free any GCOs in which all transactions have committed. */
   group_commit_orderer *tmp_gco= rgi->gco;
   while (tmp_gco &&
-         (!tmp_gco->next_gco || tmp_gco->last_sub_id > sub_id))
+         (!tmp_gco->next_gco || tmp_gco->last_sub_id > sub_id ||
+          tmp_gco->next_gco->wait_count > entry->count_committing_event_groups))
+  {
+    /*
+      We must not free a GCO before the wait_count of the following GCO has
+      been reached and wakeup has been sent. Otherwise we will lose the
+      wakeup and hang (there were several such bugs in the past).
+
+      The intention is that this is ensured already since we only free when
+      the last event group in the GCO has committed
+      (tmp_gco->last_sub_id <= sub_id). However, if we have a bug, we have
+      extra check on next_gco->wait_count to hopefully avoid hanging; we
+      have here an assertion in debug builds that this check does not in
+      fact trigger.
+    */
+    DBUG_ASSERT(!tmp_gco->next_gco || tmp_gco->last_sub_id > sub_id);
     tmp_gco= tmp_gco->prev_gco;
+  }
   while (tmp_gco)
   {
     group_commit_orderer *prev_gco= tmp_gco->prev_gco;
@@ -193,6 +213,10 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
 
   thd->clear_error();
   thd->reset_killed();
+  /*
+    Would do thd->get_stmt_da()->set_overwrite_status(false) here, but
+    reset_diagnostics_area() already does that.
+  */
   thd->get_stmt_da()->reset_diagnostics_area();
   wfc->wakeup_subsequent_commits(rgi->worker_error);
 }
@@ -305,7 +329,7 @@ retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
   IO_CACHE rlog;
   LOG_INFO linfo;
   File fd= (File)-1;
-  const char *errmsg= NULL;
+  const char *errmsg;
   inuse_relaylog *ir= rgi->relay_log;
   uint64 event_count;
   uint64 events_to_execute= rgi->retry_event_count;
@@ -321,6 +345,7 @@ retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
 do_retry:
   event_count= 0;
   err= 0;
+  errmsg= NULL;
 
   /*
     If we already started committing before getting the deadlock (or other
@@ -356,7 +381,16 @@ do_retry:
   */
   if(thd->wait_for_commit_ptr)
     thd->wait_for_commit_ptr->unregister_wait_for_prior_commit();
+  DBUG_EXECUTE_IF("inject_mdev8031", {
+      /* Simulate that we get deadlock killed at this exact point. */
+      rgi->killed_for_retry= true;
+      mysql_mutex_lock(&thd->LOCK_thd_data);
+      thd->killed= KILL_CONNECTION;
+      mysql_mutex_unlock(&thd->LOCK_thd_data);
+    });
   rgi->cleanup_context(thd, 1);
+  thd->reset_killed();
+  thd->clear_error();
 
   /*
     If we retry due to a deadlock kill that occured during the commit step, we
@@ -372,9 +406,46 @@ do_retry:
   statistic_increment(slave_retried_transactions, LOCK_status);
   mysql_mutex_unlock(&rli->data_lock);
 
-  mysql_mutex_lock(&entry->LOCK_parallel_entry);
-  register_wait_for_prior_event_group_commit(rgi, entry);
-  mysql_mutex_unlock(&entry->LOCK_parallel_entry);
+  for (;;)
+  {
+    mysql_mutex_lock(&entry->LOCK_parallel_entry);
+    register_wait_for_prior_event_group_commit(rgi, entry);
+    mysql_mutex_unlock(&entry->LOCK_parallel_entry);
+
+    /*
+      Let us wait for all prior transactions to complete before trying again.
+      This way, we avoid repeatedly conflicting with and getting deadlock
+      killed by the same earlier transaction.
+    */
+    if (!(err= thd->wait_for_prior_commit()))
+      break;
+
+    convert_kill_to_deadlock_error(rgi);
+    if (!has_temporary_error(thd))
+      goto err;
+    /*
+      If we get a temporary error such as a deadlock kill, we can safely
+      ignore it, as we already rolled back.
+
+      But we still want to retry the wait for the prior transaction to
+      complete its commit.
+    */
+    thd->clear_error();
+    thd->reset_killed();
+    if(thd->wait_for_commit_ptr)
+      thd->wait_for_commit_ptr->unregister_wait_for_prior_commit();
+    DBUG_EXECUTE_IF("inject_mdev8031", {
+        /* Inject a small sleep to give prior transaction a chance to commit. */
+        my_sleep(100000);
+      });
+  }
+
+  /*
+    Let us clear any lingering deadlock kill one more time, here after
+    wait_for_prior_commit() has completed. This should rule out any
+    possibility of an old deadlock kill lingering on beyond this point.
+  */
+  thd->reset_killed();
 
   strmake_buf(log_name, ir->name);
   if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
@@ -391,6 +462,14 @@ do_retry:
     err= 1;
     goto err;
   }
+  DBUG_EXECUTE_IF("inject_mdev8031", {
+      /* Simulate pending KILL caught in read_relay_log_description_event(). */
+      if (thd->check_killed())
+      {
+        thd->send_kill_message();
+        err= 1;
+        goto err;
+      }
+    });
   my_b_seek(&rlog, cur_offset);
 
   do
@@ -413,7 +492,7 @@ do_retry:
       {
         errmsg= "slave SQL thread aborted because of I/O error";
         err= 1;
-        goto err;
+        goto check_retry;
       }
       if (rlog.error > 0)
       {
@@ -442,10 +521,25 @@ do_retry:
       }
       strmake_buf(log_name ,linfo.log_file_name);
 
+      DBUG_EXECUTE_IF("inject_retry_event_group_open_binlog_kill", {
+          if (retries < 2)
+          {
+            /* Simulate that we get deadlock killed during open_binlog(). */
+            mysql_reset_thd_for_next_command(thd);
+            rgi->killed_for_retry= true;
+            mysql_mutex_lock(&thd->LOCK_thd_data);
+            thd->killed= KILL_CONNECTION;
+            mysql_mutex_unlock(&thd->LOCK_thd_data);
+            thd->send_kill_message();
+            fd= (File)-1;
+            err= 1;
+            goto check_retry;
+          }
+        });
       if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
       {
         err= 1;
-        goto err;
+        goto check_retry;
       }
       /* Loop to try again on the new log file. */
     }
@@ -488,26 +582,31 @@ do_retry:
                     if (retries == 0) err= dbug_simulate_tmp_error(rgi, thd););
     DBUG_EXECUTE_IF("rpl_parallel_simulate_infinite_temp_err_gtid_0_x_100",
                     err= dbug_simulate_tmp_error(rgi, thd););
-    if (err)
+    if (!err)
+      continue;
+
+check_retry:
+    convert_kill_to_deadlock_error(rgi);
+    if (has_temporary_error(thd))
     {
-      convert_kill_to_deadlock_error(rgi);
-      if (has_temporary_error(thd))
+      ++retries;
+      if (retries < slave_trans_retries)
       {
-        ++retries;
-        if (retries < slave_trans_retries)
+        if (fd >= 0)
         {
           end_io_cache(&rlog);
           mysql_file_close(fd, MYF(MY_WME));
           fd= (File)-1;
-          goto do_retry;
         }
-        sql_print_error("Slave worker thread retried transaction %lu time(s) "
-                        "in vain, giving up. Consider raising the value of "
-                        "the slave_transaction_retries variable.",
-                        slave_trans_retries);
+        goto do_retry;
       }
-      goto err;
+      sql_print_error("Slave worker thread retried transaction %lu time(s) "
+                      "in vain, giving up. Consider raising the value of "
+                      "the slave_transaction_retries variable.",
+                      slave_trans_retries);
     }
+    goto err;
+
   } while (event_count < events_to_execute);
 
 err:
@@ -640,7 +739,7 @@ handle_rpl_parallel_thread(void *arg)
       }
 
       DBUG_ASSERT(qev->typ==rpl_parallel_thread::queued_event::QUEUED_EVENT);
-      thd->rgi_slave= group_rgi= rgi;
+      thd->rgi_slave= rgi;
       gco= rgi->gco;
       /* Handle a new event group, which will be initiated by a GTID event. */
       if ((event_type= qev->ev->get_type_code()) == GTID_EVENT)
@@ -657,6 +756,21 @@ handle_rpl_parallel_thread(void *arg)
           }
         });
 
+        if(unlikely(thd->wait_for_commit_ptr) && group_rgi != NULL)
+        {
+          /*
+            This indicates that we get a new GTID event in the middle of
+            a not completed event group. This is corrupt binlog (the master
+            will never write such binlog), so it does not happen unless
+            someone tries to inject wrong crafted binlog, but let us still
+            try to handle it somewhat nicely.
+          */
+          group_rgi->cleanup_context(thd, true);
+          finish_event_group(rpt, group_rgi->gtid_sub_id,
+                             group_rgi->parallel_entry, group_rgi);
+          rpt->loc_free_rgi(group_rgi);
+        }
+
         in_event_group= true;
         /*
           If the standalone flag is set, then this event group consists of a
@@ -736,25 +850,11 @@ handle_rpl_parallel_thread(void *arg)
 
         if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
           skip_event_group= true;
-        else
-          register_wait_for_prior_event_group_commit(rgi, entry);
+        register_wait_for_prior_event_group_commit(rgi, entry);
         unlock_or_exit_cond(thd, &entry->LOCK_parallel_entry,
                             &did_enter_cond, &old_stage);
 
-        if(thd->wait_for_commit_ptr)
-        {
-          /*
-            This indicates that we get a new GTID event in the middle of
-            a not completed event group. This is corrupt binlog (the master
-            will never write such binlog), so it does not happen unless
-            someone tries to inject wrong crafted binlog, but let us still
-            try to handle it somewhat nicely.
-          */
-          rgi->cleanup_context(thd, true);
-          thd->wait_for_commit_ptr->unregister_wait_for_prior_commit();
-          thd->wait_for_commit_ptr->wakeup_subsequent_commits(rgi->worker_error);
-        }
         thd->wait_for_commit_ptr= &rgi->commit_orderer;
 
         if (opt_gtid_ignore_duplicates)
@@ -780,6 +880,7 @@ handle_rpl_parallel_thread(void *arg)
         }
       }
 
+      group_rgi= rgi;
       group_ending= is_group_ending(qev->ev, event_type);
       if (group_ending && likely(!rgi->worker_error))
       {
@@ -824,7 +925,9 @@ handle_rpl_parallel_thread(void *arg)
       else
       {
         delete qev->ev;
+        thd->get_stmt_da()->set_overwrite_status(true);
        err= thd->wait_for_prior_commit();
+        thd->get_stmt_da()->set_overwrite_status(false);
       }
 
       end_of_group=
@@ -941,9 +1044,9 @@ dealloc_gco(group_commit_orderer *gco)
 }
 
 
-int
+static int
 rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
-                                 uint32 new_count, bool skip_check)
+                                 uint32 new_count)
 {
   uint32 i;
   rpl_parallel_thread **new_list= NULL;
@@ -988,24 +1091,6 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
     new_free_list= new_list[i];
   }
 
-  if (!skip_check)
-  {
-    mysql_mutex_lock(&LOCK_active_mi);
-    if (master_info_index->give_error_if_slave_running())
-    {
-      mysql_mutex_unlock(&LOCK_active_mi);
-      goto err;
-    }
-    if (pool->changing)
-    {
-      mysql_mutex_unlock(&LOCK_active_mi);
-      my_error(ER_CHANGE_SLAVE_PARALLEL_THREADS_ACTIVE, MYF(0));
-      goto err;
-    }
-    pool->changing= true;
-    mysql_mutex_unlock(&LOCK_active_mi);
-  }
-
   /*
     Grab each old thread in turn, and signal it to stop.
 
@@ -1065,13 +1150,6 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
     mysql_mutex_unlock(&pool->threads[i]->LOCK_rpl_thread);
   }
 
-  if (!skip_check)
-  {
-    mysql_mutex_lock(&LOCK_active_mi);
-    pool->changing= false;
-    mysql_mutex_unlock(&LOCK_active_mi);
-  }
-
   mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
   mysql_cond_broadcast(&pool->COND_rpl_thread_pool);
   mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
@@ -1098,16 +1176,26 @@ err:
     }
     my_free(new_list);
   }
-  if (!skip_check)
-  {
-    mysql_mutex_lock(&LOCK_active_mi);
-    pool->changing= false;
-    mysql_mutex_unlock(&LOCK_active_mi);
-  }
   return 1;
 }
 
 
+int
+rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool)
+{
+  if (!pool->count)
+    return rpl_parallel_change_thread_count(pool, opt_slave_parallel_threads);
+  return 0;
+}
+
+
+int
+rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool)
+{
+  return rpl_parallel_change_thread_count(pool, 0);
+}
+
+
 void
 rpl_parallel_thread::batch_free()
 {
@@ -1351,7 +1439,7 @@ rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco)
 
 
 rpl_parallel_thread_pool::rpl_parallel_thread_pool()
-  : count(0), threads(0), free_list(0), changing(false), inited(false)
+  : count(0), threads(0), free_list(0), inited(false)
 {
 }
 
@@ -1366,10 +1454,14 @@ rpl_parallel_thread_pool::init(uint32 size)
   mysql_mutex_init(key_LOCK_rpl_thread_pool, &LOCK_rpl_thread_pool,
                    MY_MUTEX_INIT_SLOW);
   mysql_cond_init(key_COND_rpl_thread_pool, &COND_rpl_thread_pool, NULL);
-  changing= false;
   inited= true;
 
-  return rpl_parallel_change_thread_count(this, size, true);
+  /*
+    The pool is initially empty. Threads will be spawned when a slave SQL
+    thread is started.
+  */
+
+  return 0;
 }
 
 
@@ -1378,7 +1470,7 @@ rpl_parallel_thread_pool::destroy()
 {
   if (!inited)
     return;
-  rpl_parallel_change_thread_count(this, 0, true);
+  rpl_parallel_change_thread_count(this, 0);
   mysql_mutex_destroy(&LOCK_rpl_thread_pool);
   mysql_cond_destroy(&COND_rpl_thread_pool);
   inited= false;
@@ -1804,28 +1896,66 @@ rpl_parallel::wait_for_workers_idle(THD *thd)
   max_i= domain_hash.records;
   for (i= 0; i < max_i; ++i)
   {
-    bool active;
-    wait_for_commit my_orderer;
+    PSI_stage_info old_stage;
     struct rpl_parallel_entry *e;
+    int err= 0;
 
     e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
     mysql_mutex_lock(&e->LOCK_parallel_entry);
-    if ((active= (e->current_sub_id > e->last_committed_sub_id)))
+    e->need_sub_id_signal= true;
+    thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry,
+                    &stage_waiting_for_workers_idle, &old_stage);
+    while (e->current_sub_id > e->last_committed_sub_id)
     {
-      wait_for_commit *waitee= &e->current_group_info->commit_orderer;
-      my_orderer.register_wait_for_prior_commit(waitee);
-      thd->wait_for_commit_ptr= &my_orderer;
+      if (thd->check_killed())
+      {
+        thd->send_kill_message();
+        err= 1;
+        break;
+      }
+      mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
     }
-    mysql_mutex_unlock(&e->LOCK_parallel_entry);
-    if (active)
+    e->need_sub_id_signal= false;
+    thd->EXIT_COND(&old_stage);
+    if (err)
+      return err;
+  }
+  return 0;
+}
+
+
+/*
+  Handle seeing a GTID during slave restart in GTID mode. If we stopped with
+  different replication domains having reached different positions in the relay
+  log, we need to skip event groups in domains that are further progressed.
+
+  Updates the state with the seen GTID, and returns true if this GTID should
+  be skipped, false otherwise.
+*/
+bool
+process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid)
+{
+  slave_connection_state::entry *gtid_entry;
+  slave_connection_state *state= &rli->restart_gtid_pos;
+
+  if (likely(state->count() == 0) ||
+      !(gtid_entry= state->find_entry(gtid->domain_id)))
+    return false;
+  if (gtid->server_id == gtid_entry->gtid.server_id)
+  {
+    uint64 seq_no= gtid_entry->gtid.seq_no;
+    if (gtid->seq_no >= seq_no)
     {
-      int err= my_orderer.wait_for_prior_commit(thd);
-      thd->wait_for_commit_ptr= NULL;
-      if (err)
-        return err;
+      /*
+        This domain has reached its start position. So remove it, so that
+        further events will be processed normally.
+      */
+      state->remove(&gtid_entry->gtid);
     }
+    return gtid->seq_no <= seq_no;
   }
-  return 0;
+  else
+    return true;
 }
@@ -1890,13 +2020,15 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
     return -1;
 
   /* Execute pre-10.0 event, which have no GTID, in single-threaded mode. */
-  if (unlikely(!current) && typ != GTID_EVENT)
+  is_group_event= Log_event::is_group_event(typ);
+  if (unlikely(!current) && typ != GTID_EVENT &&
+      !(unlikely(rli->gtid_skip_flag != GTID_SKIP_NOT) && is_group_event))
     return -1;
 
   /* ToDo: what to do with this lock?!? */
   mysql_mutex_unlock(&rli->data_lock);
 
-  if (typ == FORMAT_DESCRIPTION_EVENT)
+  if (unlikely(typ == FORMAT_DESCRIPTION_EVENT))
   {
     Format_description_log_event *fdev=
       static_cast<Format_description_log_event *>(ev);
@@ -1922,6 +2054,19 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
       }
     }
   }
+  else if (unlikely(typ == GTID_LIST_EVENT))
+  {
+    Gtid_list_log_event *glev= static_cast<Gtid_list_log_event *>(ev);
+    rpl_gtid *list= glev->list;
+    uint32 count= glev->count;
+    rli->update_relay_log_state(list, count);
+    while (count)
+    {
+      process_gtid_for_restart_pos(rli, list);
+      ++list;
+      --count;
+    }
+  }
 
   /*
     Stop queueing additional event groups once the SQL thread is requested to
@@ -1931,7 +2076,6 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
     been partially queued, but after that we will just ignore any further
     events the SQL driver thread may try to queue, and eventually it will
     stop.
   */
-  is_group_event= Log_event::is_group_event(typ);
   if ((typ == GTID_EVENT || !is_group_event) && rli->abort_slave)
     sql_thread_stopping= true;
   if (sql_thread_stopping)
@@ -1944,8 +2088,34 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
     return 0;
   }
 
+  if (unlikely(rli->gtid_skip_flag != GTID_SKIP_NOT) && is_group_event)
+  {
+    if (typ == GTID_EVENT)
+      rli->gtid_skip_flag= GTID_SKIP_NOT;
+    else
+    {
+      if (rli->gtid_skip_flag == GTID_SKIP_STANDALONE)
+      {
+        if (!Log_event::is_part_of_group(typ))
+          rli->gtid_skip_flag= GTID_SKIP_NOT;
+      }
+      else
+      {
+        DBUG_ASSERT(rli->gtid_skip_flag == GTID_SKIP_TRANSACTION);
+        if (typ == XID_EVENT ||
+            (typ == QUERY_EVENT &&
+             (((Query_log_event *)ev)->is_commit() ||
+              ((Query_log_event *)ev)->is_rollback())))
+          rli->gtid_skip_flag= GTID_SKIP_NOT;
+      }
+      delete_or_keep_event_post_apply(serial_rgi, typ, ev);
+      return 0;
+    }
+  }
+
   if (typ == GTID_EVENT)
   {
+    rpl_gtid gtid;
     Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
     uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ?
                        0 : gtid_ev->domain_id);
@@ -1956,6 +2126,23 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
       return 1;
     }
     current= e;
+
+    gtid.domain_id= gtid_ev->domain_id;
+    gtid.server_id= gtid_ev->server_id;
+    gtid.seq_no= gtid_ev->seq_no;
+    rli->update_relay_log_state(&gtid, 1);
+    if (process_gtid_for_restart_pos(rli, &gtid))
+    {
+      /*
+        This domain has progressed further into the relay log before the last
+        SQL thread restart. So we need to skip this event group to not doubly
+        apply it.
+      */
+      rli->gtid_skip_flag= ((gtid_ev->flags2 & Gtid_log_event::FL_STANDALONE) ?
+                            GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
+      delete_or_keep_event_post_apply(serial_rgi, typ, ev);
+      return 0;
+    }
   }
   else
     e= current;
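Note on the gtid_skip_flag logic added to rpl_parallel::do_event() above: after a slave restart in GTID mode, each replication domain may have stopped at a different relay log position, so event groups already applied in a further-progressed domain must be skipped until that domain reaches its restart position. The following standalone sketch illustrates the skip state machine; the enum values mirror the patch, but the Event struct and its predicates are simplified stand-ins for the real Log_event API, not server code.

  #include <cassert>

  enum gtid_skip_state { GTID_SKIP_NOT, GTID_SKIP_STANDALONE, GTID_SKIP_TRANSACTION };

  struct Event {
    bool is_gtid;           /* starts a new event group */
    bool is_part_of_group;  /* e.g. an Intvar event attached to the next event */
    bool is_commit_or_xid;  /* XID event, or COMMIT/ROLLBACK query event */
  };

  /* Returns true if ev must be skipped (it was already applied before restart). */
  bool skip_event(gtid_skip_state &flag, const Event &ev)
  {
    if (flag == GTID_SKIP_NOT)
      return false;                 /* not in skip mode; process normally */
    if (ev.is_gtid)
    {
      flag= GTID_SKIP_NOT;          /* a new group begins; caller re-checks it */
      return false;
    }
    if (flag == GTID_SKIP_STANDALONE)
    {
      if (!ev.is_part_of_group)
        flag= GTID_SKIP_NOT;        /* the single standalone event was consumed */
    }
    else
    {
      assert(flag == GTID_SKIP_TRANSACTION);
      if (ev.is_commit_or_xid)
        flag= GTID_SKIP_NOT;        /* the transaction's terminating event was consumed */
    }
    return true;                    /* this event belongs to the skipped group */
  }

As in the patch, a GTID event always terminates skip mode and is then examined afresh; process_gtid_for_restart_pos() may immediately re-enter skip mode for the new group.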
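Note on the reworked rpl_parallel::wait_for_workers_idle(): instead of registering a temporary wait_for_commit orderer per domain, the waiter now sets need_sub_id_signal and blocks on COND_parallel_entry, while finish_event_group() broadcasts after advancing last_committed_sub_id. Below is a minimal sketch of the same pattern using plain pthreads instead of the server's mysql_mutex/ENTER_COND wrappers; the field names follow the patch, everything else is simplified.

  #include <pthread.h>
  #include <stdint.h>

  struct parallel_entry {
    pthread_mutex_t mtx;
    pthread_cond_t  cond;
    bool     need_sub_id_signal;    /* ask workers to broadcast on commit */
    uint64_t current_sub_id;        /* last event group queued */
    uint64_t last_committed_sub_id; /* last event group committed */
  };

  /* Waiter side: block until every queued event group has committed. */
  void wait_until_idle(parallel_entry *e)
  {
    pthread_mutex_lock(&e->mtx);
    e->need_sub_id_signal= true;
    while (e->current_sub_id > e->last_committed_sub_id)
      pthread_cond_wait(&e->cond, &e->mtx);
    e->need_sub_id_signal= false;
    pthread_mutex_unlock(&e->mtx);
  }

  /* Worker side, as in finish_event_group(): after advancing
     last_committed_sub_id, wake any waiter that asked for a signal. */
  void note_commit(parallel_entry *e, uint64_t sub_id)
  {
    pthread_mutex_lock(&e->mtx);
    e->last_committed_sub_id= sub_id;
    if (e->need_sub_id_signal)
      pthread_cond_broadcast(&e->cond);
    pthread_mutex_unlock(&e->mtx);
  }

Checking need_sub_id_signal keeps the broadcast off the common commit path: workers only pay for the condition-variable wakeup while someone is actually waiting for idle.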