diff options
-rw-r--r-- | sql/log_event.cc | 83 | ||||
-rw-r--r-- | sql/log_event.h | 1 | ||||
-rw-r--r-- | sql/share/errmsg-utf8.txt | 2 | ||||
-rw-r--r-- | sql/slave.cc | 17 |
4 files changed, 75 insertions, 28 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index 91d569d284d..b1d054bde6a 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -3724,6 +3724,35 @@ bool test_if_equal_repl_errors(int expected_error, int actual_error) } +void +update_slave_gtid_state_hash(uint64 sub_id, rpl_gtid *gtid) +{ + int err; + /* + Add the gtid to the HASH in the replication slave state. + + We must do this only _after_ commit, so that for parallel replication, + there will not be an attempt to delete the corresponding table row before + it is even committed. + */ + rpl_global_gtid_slave_state.lock(); + err= rpl_global_gtid_slave_state.update(gtid->domain_id, gtid->server_id, + sub_id, gtid->seq_no); + rpl_global_gtid_slave_state.unlock(); + if (err) + { + sql_print_warning("Slave: Out of memory during slave state maintenance. " + "Some no longer necessary rows in table " + "mysql.rpl_slave_state may be left undeleted."); + /* + Such failure is not fatal. We will fail to delete the row for this + GTID, but it will do no harm and will be removed automatically on next + server restart. + */ + } +} + + /** @todo Compare the values of "affected rows" around here. Something @@ -3747,6 +3776,8 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli, LEX_STRING new_db; int expected_error,actual_error= 0; HA_CREATE_INFO db_options; + uint64 sub_id= 0; + rpl_gtid gtid; DBUG_ENTER("Query_log_event::do_apply_event"); /* @@ -3810,6 +3841,26 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli, */ const_cast<Relay_log_info*>(rli)->inc_event_relay_log_pos(); const_cast<Relay_log_info*>(rli)->clear_flag(Relay_log_info::IN_STMT); + + /* + Record any GTID in the same transaction, so slave state is + transactionally consistent. + */ + if ((sub_id= rli->gtid_sub_id)) + { + /* Clear the GTID from the RLI so we don't accidentally reuse it. */ + const_cast<Relay_log_info*>(rli)->gtid_sub_id= 0; + + gtid= rli->current_gtid; + error= rpl_global_gtid_slave_state.record_gtid(thd, >id, sub_id, true); + if (error) + { + my_error(ER_CANNOT_UPDATE_GTID_STATE, MYF(0)); + trans_rollback(thd); + sub_id= 0; + goto compare_errors; + } + } } else { @@ -4119,6 +4170,9 @@ Default database: '%s'. Query: '%s'", } end: + if (sub_id && !thd->is_slave_error) + update_slave_gtid_state_hash(sub_id, >id); + /* Probably we have set thd->query, thd->db, thd->catalog to point to places in the data_buf of this event. Now the event is going to be deleted @@ -7751,34 +7805,7 @@ int Xid_log_event::do_apply_event(Relay_log_info const *rli) thd->mdl_context.release_transactional_locks(); if (sub_id) - { - /* - Add the gtid to the HASH in the replication slave state. - - We must do this only here _after_ commit, so that for parallel - replication, there will not be an attempt to delete the corresponding - table row before it is even committed. - - Even if commit fails, we still add the entry - in case the table - mysql.rpl_slave_state is non-transactional and the row is not removed - by rollback. - */ - rpl_global_gtid_slave_state.lock(); - err= rpl_global_gtid_slave_state.update(gtid.domain_id, gtid.server_id, - sub_id, gtid.seq_no); - rpl_global_gtid_slave_state.unlock(); - if (err) - { - sql_print_warning("Slave: Out of memory during slave state maintenance. " - "Some no longer necessary rows in table " - "mysql.rpl_slave_state may be left undeleted."); - /* - Such failure is not fatal. We will fail to delete the row for this - GTID, but it will do no harm and will be removed automatically on next - server restart. - */ - } - } + update_slave_gtid_state_hash(sub_id, >id); /* Increment the global status commit count variable diff --git a/sql/log_event.h b/sql/log_event.h index 385ff1563c5..2292150704c 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -4721,6 +4721,7 @@ extern TYPELIB binlog_checksum_typelib; them once the fate of the Query is determined for execution. */ bool slave_execute_deferred_events(THD *thd); +void update_slave_gtid_state_hash(uint64 sub_id, rpl_gtid *gtid); #endif /** diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt index 9161569f73c..39b9bb3383d 100644 --- a/sql/share/errmsg-utf8.txt +++ b/sql/share/errmsg-utf8.txt @@ -6604,3 +6604,5 @@ ER_FAILED_GTID_STATE_INIT eng "Failed initializing replication GTID state" ER_INCORRECT_GTID_STATE eng "Could not parse GTID list for MASTER_GTID_POS" +ER_CANNOT_UPDATE_GTID_STATE + eng "Could not update replication slave gtid state" diff --git a/sql/slave.cc b/sql/slave.cc index 33455986008..3c98ecf3639 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -5151,8 +5151,25 @@ MYSQL *rpl_connect_master(MYSQL *mysql) bool flush_relay_log_info(Relay_log_info* rli) { bool error=0; + uint64 sub_id; + rpl_gtid gtid; DBUG_ENTER("flush_relay_log_info"); + /* + Update the GTID position, if we have it and did not already update + it in a GTID transaction. + */ + if ((sub_id= rli->gtid_sub_id)) + { + rli->gtid_sub_id= 0; + gtid= rli->current_gtid; + if (rpl_global_gtid_slave_state.record_gtid(rli->sql_thd, + >id, sub_id, false)) + error= 1; + else + update_slave_gtid_state_hash(sub_id, >id); + } + if (unlikely(rli->no_storage)) DBUG_RETURN(0); |