summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
authorSergei Golubchik <serg@mariadb.org>2022-05-10 11:53:59 +0200
committerSergei Golubchik <serg@mariadb.org>2022-05-10 14:01:23 +0200
commit3bc98a4ec4e7e7493ee93048dddfad87ceb3d8ff (patch)
treead189995a78fdc6068059e65b8455d3d4e61e719 /sql/slave.cc
parentf4d671bff2c4dcfe1bb2af6d40122576e4fcfa91 (diff)
parentfe3d07cab82b2215dc64f52ac93122072c33d021 (diff)
downloadmariadb-git-3bc98a4ec4e7e7493ee93048dddfad87ceb3d8ff.tar.gz
Merge branch '10.5' into 10.6
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc167
1 files changed, 147 insertions, 20 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index a6578be199a..54be2f07b3d 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -3307,9 +3307,9 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
protocol->store((uint32) mi->master_id);
// SQL_Delay
// Master_Ssl_Crl
- protocol->store_string_or_null(mi->ssl_ca, &my_charset_bin);
+ protocol->store_string_or_null(mi->ssl_crl, &my_charset_bin);
// Master_Ssl_Crlpath
- protocol->store_string_or_null(mi->ssl_capath, &my_charset_bin);
+ protocol->store_string_or_null(mi->ssl_crlpath, &my_charset_bin);
// Using_Gtid
protocol->store_string_or_null(mi->using_gtid_astext(mi->using_gtid),
&my_charset_bin);
@@ -3402,7 +3402,7 @@ bool show_all_master_info(THD* thd)
String gtid_pos;
Master_info **tmp;
List<Item> field_list;
- DBUG_ENTER("show_master_info");
+ DBUG_ENTER("show_all_master_info");
mysql_mutex_assert_owner(&LOCK_active_mi);
gtid_pos.length(0);
@@ -4938,6 +4938,7 @@ Stopping slave I/O thread due to out-of-memory error from master");
not cause the slave IO thread to stop, and the error messages are
already reported.
*/
+ DBUG_EXECUTE_IF("simulate_delay_semisync_slave_reply", my_sleep(800000););
(void)repl_semisync_slave.slave_reply(mi);
}
@@ -6704,17 +6705,75 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len)
mi->gtid_event_seen= true;
/*
- We have successfully queued to relay log everything before this GTID, so
+ Unless the previous group is malformed,
+ we have successfully queued to relay log everything before this GTID, so
in case of reconnect we can start from after any previous GTID.
- (Normally we would have updated gtid_current_pos earlier at the end of
- the previous event group, but better leave an extra check here for
- safety).
+ (We must have updated gtid_current_pos earlier at the end of
+ the previous event group. Unless ...)
*/
- if (mi->events_queued_since_last_gtid)
+ if (unlikely(mi->events_queued_since_last_gtid > 0))
{
- mi->gtid_current_pos.update(&mi->last_queued_gtid);
- mi->events_queued_since_last_gtid= 0;
+ /*
+ ...unless the last group has not been completed. An assert below
+ can be satisfied only with the strict mode that ensures
+ against "genuine" gtid duplicates.
+ */
+ rpl_gtid *gtid_in_slave_state=
+ mi->gtid_current_pos.find(mi->last_queued_gtid.domain_id);
+
+ // Slave gtid state must not have updated yet to the last received gtid.
+ DBUG_ASSERT((mi->using_gtid == Master_info::USE_GTID_NO ||
+ !opt_gtid_strict_mode) ||
+ (!gtid_in_slave_state ||
+ !(*gtid_in_slave_state == mi->last_queued_gtid)));
+
+ DBUG_EXECUTE_IF("slave_discard_xid_for_gtid_0_x_1000",
+ {
+ /* Inject an event group that is missing its XID commit event. */
+ if ((mi->last_queued_gtid.domain_id == 0 &&
+ mi->last_queued_gtid.seq_no == 1000) ||
+ (mi->last_queued_gtid.domain_id == 1 &&
+ mi->last_queued_gtid.seq_no == 32))
+ {
+ sql_print_warning(
+ "Unexpected break of being relay-logged GTID %u-%u-%llu "
+ "event group by the current GTID event %u-%u-%llu",
+ PARAM_GTID(mi->last_queued_gtid),PARAM_GTID(event_gtid));
+ DBUG_SET("-d,slave_discard_xid_for_gtid_0_x_1000");
+ goto dbug_gtid_accept;
+ }
+ });
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
+ sql_print_error("Unexpected break of being relay-logged GTID %u-%u-%llu "
+ "event group by the current GTID event %u-%u-%llu",
+ PARAM_GTID(mi->last_queued_gtid),PARAM_GTID(event_gtid));
+ goto err;
}
+ else if (unlikely(mi->gtid_reconnect_event_skip_count > 0))
+ {
+ if (mi->gtid_reconnect_event_skip_count ==
+ mi->events_queued_since_last_gtid)
+ {
+ DBUG_ASSERT(event_gtid == mi->last_queued_gtid);
+
+ goto default_action;
+ }
+
+ DBUG_ASSERT(0);
+ }
+ // else_likely{...
+#ifndef DBUG_OFF
+dbug_gtid_accept:
+ DBUG_EXECUTE_IF("slave_discard_gtid_0_x_1002",
+ {
+ if (mi->last_queued_gtid.server_id == 27697 &&
+ mi->last_queued_gtid.seq_no == 1002)
+ {
+ DBUG_SET("-d,slave_discard_gtid_0_x_1002");
+ goto skip_relay_logging;
+ }
+ });
+#endif
mi->last_queued_gtid= event_gtid;
mi->last_queued_gtid_standalone=
(gtid_flag & Gtid_log_event::FL_STANDALONE) != 0;
@@ -6724,7 +6783,7 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len)
++mi->events_queued_since_last_gtid;
inc_pos= event_len;
-
+ // ...} eof else_likely
}
break;
/*
@@ -6804,6 +6863,12 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len)
case XID_EVENT:
DBUG_EXECUTE_IF("slave_discard_xid_for_gtid_0_x_1000",
{
+ if (mi->last_queued_gtid.server_id == 27697 &&
+ mi->last_queued_gtid.seq_no == 1000)
+ {
+ DBUG_SET("-d,slave_discard_xid_for_gtid_0_x_1000");
+ goto skip_relay_logging;
+ }
/* Inject an event group that is missing its XID commit event. */
if (mi->last_queued_gtid.domain_id == 0 &&
mi->last_queued_gtid.seq_no == 1000)
@@ -6850,15 +6915,48 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len)
}
};);
- if (mi->using_gtid != Master_info::USE_GTID_NO && mi->gtid_event_seen)
+ if (mi->using_gtid != Master_info::USE_GTID_NO)
{
- if (unlikely(mi->gtid_reconnect_event_skip_count))
+ if (likely(mi->gtid_event_seen))
{
- --mi->gtid_reconnect_event_skip_count;
- gtid_skip_enqueue= true;
+ if (unlikely(mi->gtid_reconnect_event_skip_count))
+ {
+ if (!got_gtid_event &&
+ mi->gtid_reconnect_event_skip_count ==
+ mi->events_queued_since_last_gtid)
+ goto gtid_not_start; // the 1st re-sent must be gtid
+
+ --mi->gtid_reconnect_event_skip_count;
+ gtid_skip_enqueue= true;
+ }
+ else if (likely(mi->events_queued_since_last_gtid))
+ {
+ DBUG_ASSERT(!got_gtid_event);
+
+ ++mi->events_queued_since_last_gtid;
+ }
+ else if (Log_event::is_group_event((Log_event_type) (uchar)
+ buf[EVENT_TYPE_OFFSET]))
+ {
+ goto gtid_not_start; // no first gtid event in this group
+ }
+ }
+ else if (Log_event::is_group_event((Log_event_type) (uchar)
+ buf[EVENT_TYPE_OFFSET]))
+ {
+ gtid_not_start:
+
+ DBUG_ASSERT(!got_gtid_event);
+
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
+ sql_print_error("The current group of events starts with "
+ "a non-GTID %s event; "
+ "the last seen GTID is %u-%u-%llu",
+ Log_event::get_type_str((Log_event_type) (uchar)
+ buf[EVENT_TYPE_OFFSET]),
+ mi->last_queued_gtid);
+ goto err;
}
- else if (mi->events_queued_since_last_gtid)
- ++mi->events_queued_since_last_gtid;
}
if (!is_compress_event)
@@ -7069,11 +7167,13 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len)
The whole of the current event group is queued. So in case of
reconnect we can start from after the current GTID.
*/
- if (mi->gtid_reconnect_event_skip_count)
+ if (gtid_skip_enqueue)
{
bool first= true;
StringBuffer<1024> gtid_text;
+ DBUG_ASSERT(mi->events_queued_since_last_gtid > 1);
+
rpl_slave_state_tostring_helper(&gtid_text, &mi->last_queued_gtid,
&first);
sql_print_error("Slave IO thread received a terminal event from "
@@ -7083,12 +7183,39 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len)
gtid_text.ptr(),
mi->gtid_reconnect_event_skip_count,
mi->events_queued_since_last_gtid);
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
+ goto err;
}
mi->gtid_current_pos.update(&mi->last_queued_gtid);
mi->events_queued_since_last_gtid= 0;
- /* Reset the domain_id_filter flag. */
- mi->domain_id_filter.reset_filter();
+ if (unlikely(gtid_skip_enqueue))
+ {
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
+ sql_print_error("Recieved a group closing %s event "
+ "at %llu position in the group while there are "
+ "still %llu events to skip upon reconnecting; "
+ "the last seen GTID is %u-%u-%llu",
+ Log_event::get_type_str((Log_event_type) (uchar)
+ buf[EVENT_TYPE_OFFSET]),
+ (mi->events_queued_since_last_gtid -
+ mi->gtid_reconnect_event_skip_count),
+ mi->events_queued_since_last_gtid,
+ mi->last_queued_gtid);
+ goto err;
+ }
+ else
+ {
+ /*
+ The whole of the current event group is queued. So in case of
+ reconnect we can start from after the current GTID.
+ */
+ mi->gtid_current_pos.update(&mi->last_queued_gtid);
+ mi->events_queued_since_last_gtid= 0;
+
+ /* Reset the domain_id_filter flag. */
+ mi->domain_id_filter.reset_filter();
+ }
}
skip_relay_logging: