summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/log_event.cc83
-rw-r--r--sql/log_event.h1
-rw-r--r--sql/share/errmsg-utf8.txt2
-rw-r--r--sql/slave.cc17
4 files changed, 75 insertions, 28 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 91d569d284d..b1d054bde6a 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -3724,6 +3724,35 @@ bool test_if_equal_repl_errors(int expected_error, int actual_error)
}
+void
+update_slave_gtid_state_hash(uint64 sub_id, rpl_gtid *gtid)
+{
+ int err;
+ /*
+ Add the gtid to the HASH in the replication slave state.
+
+ We must do this only _after_ commit, so that for parallel replication,
+ there will not be an attempt to delete the corresponding table row before
+ it is even committed.
+ */
+ rpl_global_gtid_slave_state.lock();
+ err= rpl_global_gtid_slave_state.update(gtid->domain_id, gtid->server_id,
+ sub_id, gtid->seq_no);
+ rpl_global_gtid_slave_state.unlock();
+ if (err)
+ {
+ sql_print_warning("Slave: Out of memory during slave state maintenance. "
+ "Some no longer necessary rows in table "
+ "mysql.rpl_slave_state may be left undeleted.");
+ /*
+ Such failure is not fatal. We will fail to delete the row for this
+ GTID, but it will do no harm and will be removed automatically on next
+ server restart.
+ */
+ }
+}
+
+
/**
@todo
Compare the values of "affected rows" around here. Something
@@ -3747,6 +3776,8 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli,
LEX_STRING new_db;
int expected_error,actual_error= 0;
HA_CREATE_INFO db_options;
+ uint64 sub_id= 0;
+ rpl_gtid gtid;
DBUG_ENTER("Query_log_event::do_apply_event");
/*
@@ -3810,6 +3841,26 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli,
*/
const_cast<Relay_log_info*>(rli)->inc_event_relay_log_pos();
const_cast<Relay_log_info*>(rli)->clear_flag(Relay_log_info::IN_STMT);
+
+ /*
+ Record any GTID in the same transaction, so slave state is
+ transactionally consistent.
+ */
+ if ((sub_id= rli->gtid_sub_id))
+ {
+ /* Clear the GTID from the RLI so we don't accidentally reuse it. */
+ const_cast<Relay_log_info*>(rli)->gtid_sub_id= 0;
+
+ gtid= rli->current_gtid;
+ error= rpl_global_gtid_slave_state.record_gtid(thd, &gtid, sub_id, true);
+ if (error)
+ {
+ my_error(ER_CANNOT_UPDATE_GTID_STATE, MYF(0));
+ trans_rollback(thd);
+ sub_id= 0;
+ goto compare_errors;
+ }
+ }
}
else
{
@@ -4119,6 +4170,9 @@ Default database: '%s'. Query: '%s'",
}
end:
+ if (sub_id && !thd->is_slave_error)
+ update_slave_gtid_state_hash(sub_id, &gtid);
+
/*
Probably we have set thd->query, thd->db, thd->catalog to point to places
in the data_buf of this event. Now the event is going to be deleted
@@ -7751,34 +7805,7 @@ int Xid_log_event::do_apply_event(Relay_log_info const *rli)
thd->mdl_context.release_transactional_locks();
if (sub_id)
- {
- /*
- Add the gtid to the HASH in the replication slave state.
-
- We must do this only here _after_ commit, so that for parallel
- replication, there will not be an attempt to delete the corresponding
- table row before it is even committed.
-
- Even if commit fails, we still add the entry - in case the table
- mysql.rpl_slave_state is non-transactional and the row is not removed
- by rollback.
- */
- rpl_global_gtid_slave_state.lock();
- err= rpl_global_gtid_slave_state.update(gtid.domain_id, gtid.server_id,
- sub_id, gtid.seq_no);
- rpl_global_gtid_slave_state.unlock();
- if (err)
- {
- sql_print_warning("Slave: Out of memory during slave state maintenance. "
- "Some no longer necessary rows in table "
- "mysql.rpl_slave_state may be left undeleted.");
- /*
- Such failure is not fatal. We will fail to delete the row for this
- GTID, but it will do no harm and will be removed automatically on next
- server restart.
- */
- }
- }
+ update_slave_gtid_state_hash(sub_id, &gtid);
/*
Increment the global status commit count variable
diff --git a/sql/log_event.h b/sql/log_event.h
index 385ff1563c5..2292150704c 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -4721,6 +4721,7 @@ extern TYPELIB binlog_checksum_typelib;
them once the fate of the Query is determined for execution.
*/
bool slave_execute_deferred_events(THD *thd);
+void update_slave_gtid_state_hash(uint64 sub_id, rpl_gtid *gtid);
#endif
/**
diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt
index 9161569f73c..39b9bb3383d 100644
--- a/sql/share/errmsg-utf8.txt
+++ b/sql/share/errmsg-utf8.txt
@@ -6604,3 +6604,5 @@ ER_FAILED_GTID_STATE_INIT
eng "Failed initializing replication GTID state"
ER_INCORRECT_GTID_STATE
eng "Could not parse GTID list for MASTER_GTID_POS"
+ER_CANNOT_UPDATE_GTID_STATE
+ eng "Could not update replication slave gtid state"
diff --git a/sql/slave.cc b/sql/slave.cc
index 33455986008..3c98ecf3639 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -5151,8 +5151,25 @@ MYSQL *rpl_connect_master(MYSQL *mysql)
bool flush_relay_log_info(Relay_log_info* rli)
{
bool error=0;
+ uint64 sub_id;
+ rpl_gtid gtid;
DBUG_ENTER("flush_relay_log_info");
+ /*
+ Update the GTID position, if we have it and did not already update
+ it in a GTID transaction.
+ */
+ if ((sub_id= rli->gtid_sub_id))
+ {
+ rli->gtid_sub_id= 0;
+ gtid= rli->current_gtid;
+ if (rpl_global_gtid_slave_state.record_gtid(rli->sql_thd,
+ &gtid, sub_id, false))
+ error= 1;
+ else
+ update_slave_gtid_state_hash(sub_id, &gtid);
+ }
+
if (unlikely(rli->no_storage))
DBUG_RETURN(0);