diff options
author | unknown <knielsen@knielsen-hq.org> | 2014-03-12 00:14:49 +0100 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2014-03-12 00:14:49 +0100 |
commit | 8b9b7ec395df111f886224a565b63a1a312e5679 (patch) | |
tree | a44e2719f727964a854ed9c45e26e7314f6b62a4 /sql | |
parent | 2c2478b82260f5110ea2c5bed3c6c7bcd3558453 (diff) | |
download | mariadb-git-8b9b7ec395df111f886224a565b63a1a312e5679.tar.gz |
MDEV-5804: If same GTID is received on multiple master connections in multi-source replication, the event is double-executed causing corruption or replication failure
Some fixes, mainly to make it also work in non-parallel replication mode
(--slave-parallel-threads=0).
Patch should be fairly complete now.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/log_event.cc | 4 | ||||
-rw-r--r-- | sql/mysqld.cc | 4 | ||||
-rw-r--r-- | sql/mysqld.h | 1 | ||||
-rw-r--r-- | sql/rpl_gtid.cc | 107 | ||||
-rw-r--r-- | sql/rpl_gtid.h | 9 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 18 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 8 | ||||
-rw-r--r-- | sql/rpl_rli.h | 14 | ||||
-rw-r--r-- | sql/slave.cc | 50 | ||||
-rw-r--r-- | sql/sys_vars.cc | 6 |
10 files changed, 177 insertions, 44 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index 98524d73433..c769c5b9209 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -4440,7 +4440,7 @@ Default database: '%s'. Query: '%s'", end: if (sub_id && !thd->is_slave_error) - rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid, rli); + rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid, rgi); /* Probably we have set thd->query, thd->db, thd->catalog to point to places @@ -7327,7 +7327,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi) thd->mdl_context.release_transactional_locks(); if (!res && sub_id) - rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid, rli); + rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid, rgi); /* Increment the global status commit count variable diff --git a/sql/mysqld.cc b/sql/mysqld.cc index d292cc86cfb..774f9531893 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -9447,6 +9447,7 @@ PSI_stage_info stage_waiting_for_prior_transaction_to_commit= { 0, "Waiting for PSI_stage_info stage_waiting_for_room_in_worker_thread= { 0, "Waiting for room in worker thread event queue", 0}; PSI_stage_info stage_master_gtid_wait_primary= { 0, "Waiting in MASTER_GTID_WAIT() (primary waiter)", 0}; PSI_stage_info stage_master_gtid_wait= { 0, "Waiting in MASTER_GTID_WAIT()", 0}; +PSI_stage_info stage_gtid_wait_other_connection= { 0, "Waiting for other master connection to process GTID received on multiple master connections", 0}; #ifdef HAVE_PSI_INTERFACE @@ -9565,7 +9566,8 @@ PSI_stage_info *all_server_stages[]= & stage_waiting_to_finalize_termination, & stage_waiting_to_get_readlock, & stage_master_gtid_wait_primary, - & stage_master_gtid_wait + & stage_master_gtid_wait, + & stage_gtid_wait_other_connection }; PSI_socket_key key_socket_tcpip, key_socket_unix, key_socket_client_connection; diff --git a/sql/mysqld.h b/sql/mysqld.h index 28b9c061945..fa79dd2d11d 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -441,6 +441,7 @@ extern PSI_stage_info 
stage_waiting_for_prior_transaction_to_commit; extern PSI_stage_info stage_waiting_for_room_in_worker_thread; extern PSI_stage_info stage_master_gtid_wait_primary; extern PSI_stage_info stage_master_gtid_wait; +extern PSI_stage_info stage_gtid_wait_other_connection; #ifdef HAVE_PSI_STATEMENT_INTERFACE /** diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc index b66651ae5fd..17c3b15c902 100644 --- a/sql/rpl_gtid.cc +++ b/sql/rpl_gtid.cc @@ -34,7 +34,7 @@ const LEX_STRING rpl_gtid_slave_state_table_name= void rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid, - const Relay_log_info *rli) + rpl_group_info *rgi) { int err; /* @@ -45,7 +45,7 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid, it is even committed. */ mysql_mutex_lock(&LOCK_slave_state); - err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, rli); + err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, rgi); mysql_mutex_unlock(&LOCK_slave_state); if (err) { @@ -75,9 +75,13 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi) if ((sub_id= rgi->gtid_sub_id)) { rgi->gtid_sub_id= 0; - if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false)) - DBUG_RETURN(1); - update_state_hash(sub_id, &rgi->current_gtid, rgi->rli); + if (rgi->gtid_ignore_duplicate_state!=rpl_group_info::GTID_DUPLICATE_IGNORE) + { + if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false)) + DBUG_RETURN(1); + update_state_hash(sub_id, &rgi->current_gtid, rgi); + } + rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL; } DBUG_RETURN(0); } @@ -110,16 +114,21 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi) -1 Error (out of memory to allocate a new element for the domain). 
*/ int -rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli) +rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, rpl_group_info *rgi) { uint32 domain_id= gtid->domain_id; uint32 seq_no= gtid->seq_no; rpl_slave_state::element *elem; int res; + bool did_enter_cond; + PSI_stage_info old_stage; + THD *thd; + Relay_log_info *rli= rgi->rli; mysql_mutex_lock(&LOCK_slave_state); if (!(elem= get_element(domain_id))) { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); res= -1; goto err; } @@ -129,13 +138,14 @@ rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli) each lock release and re-take. */ - /* ToDo: Make this wait killable. */ + did_enter_cond= false; for (;;) { if (elem->highest_seq_no >= seq_no) { /* This sequence number is already applied, ignore it. */ res= 0; + rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_IGNORE; break; } if (!elem->owner_rli) @@ -143,6 +153,7 @@ rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli) /* The domain became free, grab it and apply the event. */ elem->owner_rli= rli; elem->owner_count= 1; + rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_OWNER; res= 1; break; } @@ -150,23 +161,78 @@ rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli) { /* Already own this domain, increment reference count and apply event. */ ++elem->owner_count; + rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_OWNER; res= 1; break; } + thd= rgi->thd; + if (thd->check_killed()) + { + thd->send_kill_message(); + res= -1; + break; + } /* Someone else is currently processing this GTID (or an earlier one). Wait for them to complete (or fail), and then check again. 
*/ + if (!did_enter_cond) + { + thd->ENTER_COND(&elem->COND_gtid_ignore_duplicates, &LOCK_slave_state, + &stage_gtid_wait_other_connection, &old_stage); + did_enter_cond= true; + } mysql_cond_wait(&elem->COND_gtid_ignore_duplicates, &LOCK_slave_state); } err: - mysql_mutex_unlock(&LOCK_slave_state); + if (did_enter_cond) + thd->EXIT_COND(&old_stage); + else + mysql_mutex_unlock(&LOCK_slave_state); return res; } +void +rpl_slave_state::release_domain_owner(rpl_group_info *rgi) +{ + element *elem= NULL; + + mysql_mutex_lock(&LOCK_slave_state); + if (!(elem= get_element(rgi->current_gtid.domain_id))) + { + /* + We cannot really deal with error here, as we are already called in an + error handling case (transaction failure and rollback). + + However, get_element() only fails if the element did not exist already + and could not be allocated due to out-of-memory - and if it did not + exist, then we would not get here in the first place. + */ + mysql_mutex_unlock(&LOCK_slave_state); + return; + } + + if (rgi->gtid_ignore_duplicate_state == rpl_group_info::GTID_DUPLICATE_OWNER) + { + uint32 count= elem->owner_count; + DBUG_ASSERT(count > 0); + DBUG_ASSERT(elem->owner_rli == rgi->rli); + --count; + elem->owner_count= count; + if (count == 0) + { + elem->owner_rli= NULL; + mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates); + } + } + rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL; + mysql_mutex_unlock(&LOCK_slave_state); +} + + static void rpl_slave_state_free_element(void *arg) { @@ -233,7 +299,7 @@ rpl_slave_state::deinit() int rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, - uint64 seq_no, const Relay_log_info *rli) + uint64 seq_no, rpl_group_info *rgi) { element *elem= NULL; list_element *list_elem= NULL; @@ -256,18 +322,23 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, mysql_cond_broadcast(&elem->COND_wait_gtid); } - if (opt_gtid_ignore_duplicates && rli) + if (rgi) { - uint32 count= 
elem->owner_count; - DBUG_ASSERT(count > 0); - DBUG_ASSERT(elem->owner_rli == rli); - --count; - elem->owner_count= count; - if (count == 0) + if (rgi->gtid_ignore_duplicate_state==rpl_group_info::GTID_DUPLICATE_OWNER) { - elem->owner_rli= NULL; - mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates); + Relay_log_info *rli= rgi->rli; + uint32 count= elem->owner_count; + DBUG_ASSERT(count > 0); + DBUG_ASSERT(elem->owner_rli == rli); + --count; + elem->owner_count= count; + if (count == 0) + { + elem->owner_rli= NULL; + mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates); + } } + rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL; } if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME)))) diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h index aef1ca9e403..3e9e2fce25f 100644 --- a/sql/rpl_gtid.h +++ b/sql/rpl_gtid.h @@ -92,6 +92,7 @@ struct gtid_waiting { class Relay_log_info; +struct rpl_group_info; /* Replication slave state. @@ -171,7 +172,7 @@ struct rpl_slave_state void truncate_hash(); ulong count() const { return hash.records; } int update(uint32 domain_id, uint32 server_id, uint64 sub_id, - uint64 seq_no, const Relay_log_info *rli); + uint64 seq_no, rpl_group_info *rgi); int truncate_state_table(THD *thd); int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, bool in_transaction, bool in_statement); @@ -187,10 +188,10 @@ struct rpl_slave_state element *get_element(uint32 domain_id); int put_back_list(uint32 domain_id, list_element *list); - void update_state_hash(uint64 sub_id, rpl_gtid *gtid, - const Relay_log_info *rli); + void update_state_hash(uint64 sub_id, rpl_gtid *gtid, rpl_group_info *rgi); int record_and_update_gtid(THD *thd, struct rpl_group_info *rgi); - int check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli); + int check_duplicate_gtid(rpl_gtid *gtid, rpl_group_info *rgi); + void release_domain_owner(rpl_group_info *rgi); }; diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc 
index d1e0ca518f1..3218acf2525 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -425,10 +425,22 @@ handle_rpl_parallel_thread(void *arg) { int res= rpl_global_gtid_slave_state.check_duplicate_gtid(&rgi->current_gtid, - rgi->rli); - /* ToDo: Handle res==-1 error. */ - if (!res) + rgi); + if (res < 0) + { + /* Error. */ + slave_output_error_info(rgi->rli, thd); + signal_error_to_sql_driver_thread(thd, rgi); + } + else if (!res) + { + /* GTID already applied by another master connection, skip. */ skip_event_group= true; + } + else + { + /* We have to apply the event. */ + } } } diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 020f984ad50..c9b4153d28e 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -1493,6 +1493,7 @@ rpl_group_info::reinit(Relay_log_info *rli) row_stmt_start_timestamp= 0; long_find_row_note_printed= false; did_mark_start_commit= false; + gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL; commit_orderer.reinit(); } @@ -1632,6 +1633,13 @@ void rpl_group_info::cleanup_context(THD *thd, bool error) thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS; /* + Ensure we always release the domain for others to process, when using + --gtid-ignore-duplicates. + */ + if (gtid_ignore_duplicate_state != GTID_DUPLICATE_NULL) + rpl_global_gtid_slave_state.release_domain_owner(this); + + /* Reset state related to long_find_row notes in the error log: - timestamp - flag that decides whether the slave prints or not diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 6db4ce5d61b..100ce25fe9c 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -575,6 +575,20 @@ struct rpl_group_info counting one event group twice. */ bool did_mark_start_commit; + enum { + GTID_DUPLICATE_NULL=0, + GTID_DUPLICATE_IGNORE=1, + GTID_DUPLICATE_OWNER=2 + }; + /* + When --gtid-ignore-duplicates, this is set to one of the above three + values: + GTID_DUPLICATE_NULL - Not using --gtid-ignore-duplicates. + GTID_DUPLICATE_IGNORE - This gtid already applied, skip the event group. 
+ GTID_DUPLICATE_OWNER - We are the current owner of the domain, and must + apply the event group and then release the domain. + */ + uint8 gtid_ignore_duplicate_state; /* Runtime state for printing a note when slave is taking diff --git a/sql/slave.cc b/sql/slave.cc index cf741ccedc0..79a2c4ccd25 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -3508,18 +3508,46 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, */ } - /* - For GTID, allocate a new sub_id for the given domain_id. - The sub_id must be allocated in increasing order of binlog order. - */ - if (typ == GTID_EVENT && - event_group_new_gtid(serial_rgi, static_cast<Gtid_log_event *>(ev))) + if (typ == GTID_EVENT) { - sql_print_error("Error reading relay log event: %s", - "slave SQL thread aborted because of out-of-memory error"); - mysql_mutex_unlock(&rli->data_lock); - delete ev; - DBUG_RETURN(1); + Gtid_log_event *gev= static_cast<Gtid_log_event *>(ev); + + /* + For GTID, allocate a new sub_id for the given domain_id. + The sub_id must be allocated in increasing order of binlog order. 
+ */ + if (event_group_new_gtid(serial_rgi, gev)) + { + sql_print_error("Error reading relay log event: %s", "slave SQL thread " + "aborted because of out-of-memory error"); + mysql_mutex_unlock(&rli->data_lock); + delete ev; + DBUG_RETURN(1); + } + + if (opt_gtid_ignore_duplicates) + { + serial_rgi->current_gtid.domain_id= gev->domain_id; + serial_rgi->current_gtid.server_id= gev->server_id; + serial_rgi->current_gtid.seq_no= gev->seq_no; + int res= rpl_global_gtid_slave_state.check_duplicate_gtid + (&serial_rgi->current_gtid, serial_rgi); + if (res < 0) + { + sql_print_error("Error processing GTID event: %s", "slave SQL " + "thread aborted because of out-of-memory error"); + mysql_mutex_unlock(&rli->data_lock); + delete ev; + DBUG_RETURN(1); + } + /* + If we need to skip this event group (because the GTID was already + applied), then do it using the code for slave_skip_counter, which + is able to handle skipping until the end of the event group. + */ + if (!res) + rli->slave_skip_counter= 1; + } } serial_rgi->future_event_relay_log_pos= rli->future_event_relay_log_pos; diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 8aa202b381b..c9f1055d17a 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -1839,18 +1839,14 @@ static bool fix_gtid_ignore_duplicates(sys_var *self, THD *thd, enum_var_type type) { bool running; - bool err= false; mysql_mutex_unlock(&LOCK_global_system_variables); mysql_mutex_lock(&LOCK_active_mi); running= master_info_index->give_error_if_slave_running(); mysql_mutex_unlock(&LOCK_active_mi); - if (running) - err= true; mysql_mutex_lock(&LOCK_global_system_variables); - /* ToDo: Isn't there a race here? I need to change the variable only under the LOCK_active_mi, and only if running is false. */ - return err; + return running ? true : false; } |