summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mysql-test/include/kill_binlog_dump_threads.inc62
-rw-r--r--mysql-test/suite/rpl/r/rpl_gtid_reconnect.result169
-rw-r--r--mysql-test/suite/rpl/t/rpl_gtid_reconnect.test201
-rw-r--r--sql/rpl_gtid.cc91
-rw-r--r--sql/rpl_gtid.h4
-rw-r--r--sql/rpl_mi.cc24
-rw-r--r--sql/rpl_mi.h37
-rw-r--r--sql/share/errmsg-utf8.txt2
-rw-r--r--sql/slave.cc290
-rw-r--r--sql/sql_parse.cc21
-rw-r--r--sql/sql_repl.cc131
-rw-r--r--sql/sql_repl.h3
12 files changed, 936 insertions, 99 deletions
diff --git a/mysql-test/include/kill_binlog_dump_threads.inc b/mysql-test/include/kill_binlog_dump_threads.inc
new file mode 100644
index 00000000000..38e3e2a7c98
--- /dev/null
+++ b/mysql-test/include/kill_binlog_dump_threads.inc
@@ -0,0 +1,62 @@
+# ==== Purpose ====
+#
+# Terminate all binlog dump threads on a master.
+#
+# This is sometimes useful, as normally such dump threads can hang
+# around for some time before they notice that the slave has disconnected.
+#
+# Note that if there are active slave connections, they might try to
+# reconnect as their dump threads are killed, which may not lead to the
+# desired results.
+#
+#
+# ==== Usage ====
+#
+# [--let $kill_timeout= NUMBER]
+# --source include/stop_slavekill_binlog_dump_threads.inc
+#
+# Parameters:
+# $kill_timeout
+# Maximum number of seconds to wait for dump threads to disappear.
+
+
+--let $include_filename= kill_binlog_dump_threads.inc
+--source include/begin_include_file.inc
+
+--disable_query_log
+
+let $wait_counter= 300;
+if ($kill_timeout)
+{
+ let $wait_counter= `SELECT $kill_timeout * 10`;
+}
+
+let $success= 0;
+while ($wait_counter)
+{
+ dec $wait_counter;
+ let $_tid= `SELECT id FROM information_schema.processlist WHERE command = 'Binlog Dump' LIMIT 1`;
+ if ($_tid)
+ {
+ eval KILL QUERY $_tid;
+ }
+ if (!$_tid)
+ {
+ let $wait_counter= 0;
+ let $success= 1;
+ }
+ if (!$success)
+ {
+ real_sleep 0.1;
+ }
+}
+if (!$success)
+{
+ SHOW FULL PROCESSLIST;
+ --die Timeout while waiting for binlog dump threads to disappear.
+}
+
+--enable_query_log
+
+--let $include_filename= kill_binlog_dump_threads.inc
+--source include/end_include_file.inc
diff --git a/mysql-test/suite/rpl/r/rpl_gtid_reconnect.result b/mysql-test/suite/rpl/r/rpl_gtid_reconnect.result
new file mode 100644
index 00000000000..e9f64628e12
--- /dev/null
+++ b/mysql-test/suite/rpl/r/rpl_gtid_reconnect.result
@@ -0,0 +1,169 @@
+include/rpl_init.inc [topology=1->2]
+include/stop_slave.inc
+CHANGE MASTER TO master_use_gtid= current_pos;
+include/start_slave.inc
+CREATE TABLE t1 (a INT);
+FLUSH LOGS;
+SET gtid_domain_id=10;
+INSERT INTO t1 VALUES (1);
+INSERT INTO t1 VALUES (2);
+SET gtid_seq_no=100;
+INSERT INTO t1 VALUES (3);
+INSERT INTO t1 VALUES (4);
+INSERT INTO t1 VALUES (5);
+include/stop_slave.inc
+SELECT * FROM t1 ORDER BY a;
+a
+1
+2
+3
+4
+5
+include/kill_binlog_dump_threads.inc
+INSERT INTO t1 VALUES (10);
+SET @old_dbug= @@GLOBAL.debug_dbug;
+SET GLOBAL debug_dbug="+d,gtid_force_reconnect_at_10_1_100";
+include/start_slave.inc
+SELECT * FROM t1 ORDER BY a;
+a
+1
+2
+3
+4
+5
+10
+include/stop_slave.inc
+TRUNCATE t1;
+RESET MASTER;
+SET GLOBAL gtid_slave_pos= "";
+SET GLOBAL debug_dbug= @old_debug;
+TRUNCATE t1;
+RESET MASTER;
+SET gtid_domain_id=10;
+SET gtid_seq_no=50;
+INSERT INTO t1 VALUES (1);
+SET gtid_domain_id=11;
+INSERT INTO t1 VALUES (11);
+SET gtid_domain_id=10;
+SET gtid_seq_no=100;
+INSERT INTO t1 VALUES (2);
+SET gtid_domain_id=11;
+INSERT INTO t1 VALUES (12);
+SET gtid_domain_id=10;
+INSERT INTO t1 VALUES (3);
+SET gtid_domain_id=11;
+SET gtid_seq_no=200;
+INSERT INTO t1 VALUES (13);
+START SLAVE UNTIL master_gtid_pos="10-1-50,11-1-200";
+include/wait_for_slave_to_stop.inc
+SELECT * FROM t1 ORDER BY a;
+a
+1
+11
+12
+13
+include/kill_binlog_dump_threads.inc
+INSERT INTO t1 VALUES (20);
+SET GLOBAL debug_dbug="+d,gtid_force_reconnect_at_10_1_100";
+include/start_slave.inc
+SELECT * FROM t1 ORDER BY a;
+a
+1
+2
+3
+11
+12
+13
+20
+include/stop_slave.inc
+TRUNCATE t1;
+RESET MASTER;
+SET GLOBAL gtid_slave_pos= "";
+SET GLOBAL debug_dbug= @old_debug;
+TRUNCATE t1;
+RESET MASTER;
+include/kill_binlog_dump_threads.inc
+SET gtid_domain_id= 9;
+SET gtid_seq_no= 50;
+INSERT INTO t1 VALUES (1);
+SET gtid_domain_id= 10;
+INSERT INTO t1 VALUES (11);
+SET gtid_domain_id= 9;
+INSERT INTO t1 VALUES (2);
+SET gtid_domain_id= 10;
+SET gtid_seq_no= 100;
+INSERT INTO t1 VALUES (12);
+SET gtid_domain_id= 9;
+INSERT INTO t1 VALUES (3);
+SET gtid_domain_id= 10;
+SET gtid_seq_no= 200;
+INSERT INTO t1 VALUES (13);
+SET gtid_domain_id= 10;
+SET GLOBAL debug_dbug="+d,gtid_force_reconnect_at_10_1_100";
+START SLAVE UNTIL master_gtid_pos="9-1-50,10-1-200";
+include/wait_for_slave_to_stop.inc
+SELECT * FROM t1 ORDER BY a;
+a
+1
+11
+12
+13
+SET GLOBAL debug_dbug= @old_debug;
+INSERT INTO t1 VALUES (20);
+include/start_slave.inc
+SELECT * FROM t1 ORDER BY a;
+a
+1
+2
+3
+11
+12
+13
+20
+*** Test when slave IO thread needs to reconnect in the middle of an event group. ***
+include/stop_slave.inc
+TRUNCATE t1;
+RESET MASTER;
+SET GLOBAL gtid_slave_pos= "";
+SET GLOBAL debug_dbug= @old_debug;
+TRUNCATE t1;
+RESET MASTER;
+include/kill_binlog_dump_threads.inc
+SET GLOBAL debug_dbug="+d,binlog_force_reconnect_after_22_events";
+CREATE TABLE t2 (a INT PRIMARY KEY) ENGINE=InnoDB;
+INSERT INTO t2 VALUES (1);
+BEGIN;
+INSERT INTO t2 VALUES (10);
+INSERT INTO t2 VALUES (11);
+COMMIT;
+BEGIN;
+INSERT INTO t2 VALUES (20);
+INSERT INTO t2 VALUES (21);
+INSERT INTO t2 VALUES (22);
+INSERT INTO t2 VALUES (23);
+INSERT INTO t2 VALUES (24);
+INSERT INTO t2 VALUES (25);
+INSERT INTO t2 VALUES (26);
+INSERT INTO t2 VALUES (27);
+INSERT INTO t2 VALUES (28);
+INSERT INTO t2 VALUES (29);
+COMMIT;
+include/start_slave.inc
+SELECT * FROM t2 ORDER BY a;
+a
+1
+10
+11
+20
+21
+22
+23
+24
+25
+26
+27
+28
+29
+SET GLOBAL debug_dbug= @old_debug;
+DROP TABLE t1, t2;
+include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/t/rpl_gtid_reconnect.test b/mysql-test/suite/rpl/t/rpl_gtid_reconnect.test
new file mode 100644
index 00000000000..153a04d8918
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_gtid_reconnect.test
@@ -0,0 +1,201 @@
+--let $rpl_topology=1->2
+--source include/rpl_init.inc
+--source include/have_innodb.inc
+--source include/have_debug.inc
+
+
+--connection server_2
+--source include/stop_slave.inc
+CHANGE MASTER TO master_use_gtid= current_pos;
+--source include/start_slave.inc
+
+--connection server_1
+CREATE TABLE t1 (a INT);
+FLUSH LOGS;
+--save_master_pos
+
+--connection server_2
+--sync_with_master
+
+
+# Prepare a string of events and have the slave replicate all of it.
+--connection server_1
+SET gtid_domain_id=10;
+INSERT INTO t1 VALUES (1);
+INSERT INTO t1 VALUES (2);
+SET gtid_seq_no=100;
+INSERT INTO t1 VALUES (3);
+INSERT INTO t1 VALUES (4);
+INSERT INTO t1 VALUES (5);
+--save_master_pos
+
+--connection server_2
+--sync_with_master
+--source include/stop_slave.inc
+SELECT * FROM t1 ORDER BY a;
+
+# Now start the slave again, but force a reconnect. There was a bug that this
+# reconnect would cause duplicate events.
+
+--connection server_1
+# Make sure to get rid of any old binlog dump thread so it does not
+# interfere with our DBUG error injection.
+--source include/kill_binlog_dump_threads.inc
+INSERT INTO t1 VALUES (10);
+SET @old_dbug= @@GLOBAL.debug_dbug;
+SET GLOBAL debug_dbug="+d,gtid_force_reconnect_at_10_1_100";
+--save_master_pos
+
+--connection server_2
+--source include/start_slave.inc
+--sync_with_master
+SELECT * FROM t1 ORDER BY a;
+
+--source include/stop_slave.inc
+TRUNCATE t1;
+RESET MASTER;
+SET GLOBAL gtid_slave_pos= "";
+
+--connection server_1
+SET GLOBAL debug_dbug= @old_debug;
+TRUNCATE t1;
+RESET MASTER;
+
+# A1 B1 A2 B2 A3 B3, slave reached A1 and B3 and stopped. Slave starts,
+# reconnects at A2. There was a bug that B2 would be duplicated.
+
+SET gtid_domain_id=10;
+SET gtid_seq_no=50;
+INSERT INTO t1 VALUES (1);
+SET gtid_domain_id=11;
+INSERT INTO t1 VALUES (11);
+SET gtid_domain_id=10;
+SET gtid_seq_no=100;
+INSERT INTO t1 VALUES (2);
+SET gtid_domain_id=11;
+INSERT INTO t1 VALUES (12);
+SET gtid_domain_id=10;
+INSERT INTO t1 VALUES (3);
+SET gtid_domain_id=11;
+SET gtid_seq_no=200;
+INSERT INTO t1 VALUES (13);
+
+--connection server_2
+START SLAVE UNTIL master_gtid_pos="10-1-50,11-1-200";
+--source include/wait_for_slave_to_stop.inc
+SELECT * FROM t1 ORDER BY a;
+
+--connection server_1
+--source include/kill_binlog_dump_threads.inc
+INSERT INTO t1 VALUES (20);
+SET GLOBAL debug_dbug="+d,gtid_force_reconnect_at_10_1_100";
+--save_master_pos
+
+--connection server_2
+--source include/start_slave.inc
+--sync_with_master
+SELECT * FROM t1 ORDER BY a;
+
+--source include/stop_slave.inc
+TRUNCATE t1;
+RESET MASTER;
+SET GLOBAL gtid_slave_pos= "";
+
+--connection server_1
+SET GLOBAL debug_dbug= @old_debug;
+TRUNCATE t1;
+RESET MASTER;
+
+# A1 B1 A2 B2 A3 B3. START SLAVE UNTIL A1,B3, gets reconnect at B2.
+# There was a bug that the UNTIL would be ignored, and A2 would be lost.
+
+--source include/kill_binlog_dump_threads.inc
+SET gtid_domain_id= 9;
+SET gtid_seq_no= 50;
+INSERT INTO t1 VALUES (1);
+SET gtid_domain_id= 10;
+INSERT INTO t1 VALUES (11);
+SET gtid_domain_id= 9;
+INSERT INTO t1 VALUES (2);
+SET gtid_domain_id= 10;
+SET gtid_seq_no= 100;
+INSERT INTO t1 VALUES (12);
+SET gtid_domain_id= 9;
+INSERT INTO t1 VALUES (3);
+SET gtid_domain_id= 10;
+SET gtid_seq_no= 200;
+INSERT INTO t1 VALUES (13);
+SET gtid_domain_id= 10;
+
+SET GLOBAL debug_dbug="+d,gtid_force_reconnect_at_10_1_100";
+
+--connection server_2
+START SLAVE UNTIL master_gtid_pos="9-1-50,10-1-200";
+--source include/wait_for_slave_to_stop.inc
+SELECT * FROM t1 ORDER BY a;
+
+--connection server_1
+SET GLOBAL debug_dbug= @old_debug;
+INSERT INTO t1 VALUES (20);
+--save_master_pos
+
+--connection server_2
+--source include/start_slave.inc
+SELECT * FROM t1 ORDER BY a;
+
+
+--echo *** Test when slave IO thread needs to reconnect in the middle of an event group. ***
+--connection server_2
+--source include/stop_slave.inc
+
+TRUNCATE t1;
+RESET MASTER;
+SET GLOBAL gtid_slave_pos= "";
+
+--connection server_1
+SET GLOBAL debug_dbug= @old_debug;
+TRUNCATE t1;
+RESET MASTER;
+
+--source include/kill_binlog_dump_threads.inc
+SET GLOBAL debug_dbug="+d,binlog_force_reconnect_after_22_events";
+
+# 4 events for FD, fake rotate, gtid list, binlog checkpoint.
+# 2 events for GTID, create table
+CREATE TABLE t2 (a INT PRIMARY KEY) ENGINE=InnoDB;
+# 3 events for BEGIN/query/COMMIT
+INSERT INTO t2 VALUES (1);
+# 4 events for BEGIN/query/query/COMMIT
+BEGIN;
+INSERT INTO t2 VALUES (10);
+INSERT INTO t2 VALUES (11);
+COMMIT;
+# So this event group starts after 4+2+4+3=13 events. Or 16 in row-based.
+BEGIN;
+INSERT INTO t2 VALUES (20);
+INSERT INTO t2 VALUES (21);
+INSERT INTO t2 VALUES (22);
+INSERT INTO t2 VALUES (23);
+INSERT INTO t2 VALUES (24);
+INSERT INTO t2 VALUES (25);
+INSERT INTO t2 VALUES (26);
+INSERT INTO t2 VALUES (27);
+INSERT INTO t2 VALUES (28);
+INSERT INTO t2 VALUES (29);
+COMMIT;
+--save_master_pos
+
+--connection server_2
+--source include/start_slave.inc
+--sync_with_master
+SELECT * FROM t2 ORDER BY a;
+
+--connection server_1
+SET GLOBAL debug_dbug= @old_debug;
+
+
+# Clean up.
+--connection server_1
+DROP TABLE t1, t2;
+
+--source include/rpl_end.inc
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index adaf9aa4e31..d5e9380296e 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -482,26 +482,10 @@ rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid, bool *first)
}
-/*
- Prepare the current slave state as a string, suitable for sending to the
- master to request to receive binlog events starting from that GTID state.
-
- The state consists of the most recently applied GTID for each domain_id,
- ie. the one with the highest sub_id within each domain_id.
-
- Optinally, extra_gtids is a list of GTIDs from the binlog. This is used when
- a server was previously a master and now needs to connect to a new master as
- a slave. For each domain_id, if the GTID in the binlog was logged with our
- own server_id _and_ has a higher seq_no than what is in the slave state,
- then this should be used as the position to start replicating at. This
- allows to promote a slave as new master, and connect the old master as a
- slave with MASTER_GTID_POS=AUTO.
-*/
-
int
-rpl_slave_state::tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra)
+rpl_slave_state::iterate(int (*cb)(rpl_gtid *, void *), void *data,
+ rpl_gtid *extra_gtids, uint32 num_extra)
{
- bool first= true;
uint32 i;
HASH gtid_hash;
uchar *rec;
@@ -555,7 +539,7 @@ rpl_slave_state::tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra)
}
}
- if (rpl_slave_state_tostring_helper(dest, &best_gtid, &first))
+ if ((res= (*cb)(&best_gtid, data)))
{
unlock();
goto err;
@@ -568,7 +552,7 @@ rpl_slave_state::tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra)
for (i= 0; i < gtid_hash.records; ++i)
{
gtid= (rpl_gtid *)my_hash_element(&gtid_hash, i);
- if (rpl_slave_state_tostring_helper(dest, gtid, &first))
+ if ((res= (*cb)(gtid, data)))
goto err;
}
@@ -581,6 +565,44 @@ err:
}
+struct rpl_slave_state_tostring_data {
+ String *dest;
+ bool first;
+};
+static int
+rpl_slave_state_tostring_cb(rpl_gtid *gtid, void *data)
+{
+ rpl_slave_state_tostring_data *p= (rpl_slave_state_tostring_data *)data;
+ return rpl_slave_state_tostring_helper(p->dest, gtid, &p->first);
+}
+
+
+/*
+ Prepare the current slave state as a string, suitable for sending to the
+ master to request to receive binlog events starting from that GTID state.
+
+ The state consists of the most recently applied GTID for each domain_id,
+ ie. the one with the highest sub_id within each domain_id.
+
+ Optinally, extra_gtids is a list of GTIDs from the binlog. This is used when
+ a server was previously a master and now needs to connect to a new master as
+ a slave. For each domain_id, if the GTID in the binlog was logged with our
+ own server_id _and_ has a higher seq_no than what is in the slave state,
+ then this should be used as the position to start replicating at. This
+ allows to promote a slave as new master, and connect the old master as a
+ slave with MASTER_GTID_POS=AUTO.
+*/
+int
+rpl_slave_state::tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra)
+{
+ struct rpl_slave_state_tostring_data data;
+ data.first= true;
+ data.dest= dest;
+
+ return iterate(rpl_slave_state_tostring_cb, &data, extra_gtids, num_extra);
+}
+
+
/*
Lookup a domain_id in the current replication slave state.
@@ -626,9 +648,6 @@ rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid)
Parse a GTID at the start of a string, and update the pointer to point
at the first character after the parsed GTID.
- GTID can be in short form with domain_id=0 implied, SERVERID-SEQNO.
- Or long form, DOMAINID-SERVERID-SEQNO.
-
Returns 0 on ok, non-zero on parse error.
*/
static int
@@ -1217,7 +1236,7 @@ slave_connection_state::load(char *slave_request, size_t len)
rpl_gtid *gtid;
const rpl_gtid *gtid2;
- my_hash_reset(&hash);
+ reset();
p= slave_request;
end= slave_request + len;
if (p == end)
@@ -1270,7 +1289,7 @@ slave_connection_state::load(const rpl_gtid *gtid_list, uint32 count)
{
uint32 i;
- my_hash_reset(&hash);
+ reset();
for (i= 0; i < count; ++i)
if (update(&gtid_list[i]))
return 1;
@@ -1278,6 +1297,28 @@ slave_connection_state::load(const rpl_gtid *gtid_list, uint32 count)
}
+static int
+slave_connection_state_load_cb(rpl_gtid *gtid, void *data)
+{
+ slave_connection_state *state= (slave_connection_state *)data;
+ return state->update(gtid);
+}
+
+
+/*
+ Same as rpl_slave_state::tostring(), but populates a slave_connection_state
+ instead.
+*/
+int
+slave_connection_state::load(rpl_slave_state *state,
+ rpl_gtid *extra_gtids, uint32 num_extra)
+{
+ reset();
+ return state->iterate(slave_connection_state_load_cb, this,
+ extra_gtids, num_extra);
+}
+
+
rpl_gtid *
slave_connection_state::find(uint32 domain_id)
{
diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h
index 40d51568357..4d5302020bf 100644
--- a/sql/rpl_gtid.h
+++ b/sql/rpl_gtid.h
@@ -93,6 +93,8 @@ struct rpl_slave_state
int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
bool in_transaction, bool in_statement);
uint64 next_subid(uint32 domain_id);
+ int iterate(int (*cb)(rpl_gtid *, void *), void *data,
+ rpl_gtid *extra_gtids, uint32 num_extra);
int tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra);
bool domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid);
int load(THD *thd, char *state_from_master, size_t len, bool reset,
@@ -178,8 +180,10 @@ struct slave_connection_state
slave_connection_state();
~slave_connection_state();
+ void reset() { my_hash_reset(&hash); }
int load(char *slave_request, size_t len);
int load(const rpl_gtid *gtid_list, uint32 count);
+ int load(rpl_slave_state *state, rpl_gtid *extra_gtids, uint32 num_extra);
rpl_gtid *find(uint32 domain_id);
int update(const rpl_gtid *in_gtid);
void remove(const rpl_gtid *gtid);
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc
index 4ffe4f37cac..4a69eb4a6ee 100644
--- a/sql/rpl_mi.cc
+++ b/sql/rpl_mi.cc
@@ -38,7 +38,8 @@ Master_info::Master_info(LEX_STRING *connection_name_arg,
connect_retry(DEFAULT_CONNECT_RETRY), inited(0), abort_slave(0),
slave_running(0), slave_run_id(0), sync_counter(0),
heartbeat_period(0), received_heartbeats(0), master_id(0),
- using_gtid(USE_GTID_NO)
+ using_gtid(USE_GTID_NO), events_queued_since_last_gtid(0),
+ gtid_reconnect_event_skip_count(0), gtid_event_seen(false)
{
host[0] = 0; user[0] = 0; password[0] = 0;
ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0;
@@ -147,6 +148,23 @@ void Master_info::clear_in_memory_info(bool all)
}
}
+
+const char *
+Master_info::using_gtid_astext(enum enum_using_gtid arg)
+{
+ switch (arg)
+ {
+ case USE_GTID_NO:
+ return "No";
+ case USE_GTID_SLAVE_POS:
+ return "Slave_Pos";
+ default:
+ DBUG_ASSERT(arg == USE_GTID_CURRENT_POS);
+ return "Current_Pos";
+ }
+}
+
+
void init_master_log_pos(Master_info* mi)
{
DBUG_ENTER("init_master_log_pos");
@@ -154,6 +172,10 @@ void init_master_log_pos(Master_info* mi)
mi->master_log_name[0] = 0;
mi->master_log_pos = BIN_LOG_HEADER_SIZE; // skip magic number
mi->using_gtid= Master_info::USE_GTID_NO;
+ mi->gtid_current_pos.reset();
+ mi->events_queued_since_last_gtid= 0;
+ mi->gtid_reconnect_event_skip_count= 0;
+ mi->gtid_event_seen= false;
/* Intentionally init ssl_verify_server_cert to 0, no option available */
mi->ssl_verify_server_cert= 0;
diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h
index 916bd0dae02..38daed0e260 100644
--- a/sql/rpl_mi.h
+++ b/sql/rpl_mi.h
@@ -61,6 +61,10 @@ typedef struct st_mysql MYSQL;
class Master_info : public Slave_reporting_capability
{
public:
+ enum enum_using_gtid {
+ USE_GTID_NO= 0, USE_GTID_CURRENT_POS= 1, USE_GTID_SLAVE_POS= 2
+ };
+
Master_info(LEX_STRING *connection_name, bool is_slave_recovery);
~Master_info();
bool shall_ignore_server_id(ulong s_id);
@@ -70,6 +74,7 @@ class Master_info : public Slave_reporting_capability
/* If malloc() in initialization failed */
return connection_name.str == 0;
}
+ static const char *using_gtid_astext(enum enum_using_gtid arg);
/* the variables below are needed because we can change masters on the fly */
char master_log_name[FN_REFLEN+6]; /* Room for multi-*/
@@ -135,9 +140,35 @@ class Master_info : public Slave_reporting_capability
Note that you can not change the numeric values of these, they are used
in master.info.
*/
- enum {
- USE_GTID_NO= 0, USE_GTID_CURRENT_POS= 1, USE_GTID_SLAVE_POS= 2
- } using_gtid;
+ enum enum_using_gtid using_gtid;
+
+ /*
+ This GTID position records how far we have fetched into the relay logs.
+ This is used to continue fetching when the IO thread reconnects to the
+ master.
+
+ (Full slave stop/start does not use it, as it resets the relay logs).
+ */
+ slave_connection_state gtid_current_pos;
+ /*
+ If events_queued_since_last_gtid is non-zero, it is the number of events
+ queued so far in the relaylog of a GTID-prefixed event group.
+ It is zero when no partial event group has been queued at the moment.
+ */
+ uint64 events_queued_since_last_gtid;
+ /*
+ The GTID of the partially-queued event group, when
+ events_queued_since_last_gtid is non-zero.
+ */
+ rpl_gtid last_queued_gtid;
+ /*
+ When slave IO thread needs to reconnect, gtid_reconnect_event_skip_count
+ counts number of events to skip from the first GTID-prefixed event group,
+ to avoid duplicating events in the relay log.
+ */
+ uint64 gtid_reconnect_event_skip_count;
+ /* gtid_event_seen is false until we receive first GTID event from master. */
+ bool gtid_event_seen;
};
int init_master_info(Master_info* mi, const char* master_info_fname,
const char* slave_info_fname,
diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt
index a6a331eeb18..9acdda3b78b 100644
--- a/sql/share/errmsg-utf8.txt
+++ b/sql/share/errmsg-utf8.txt
@@ -6549,3 +6549,5 @@ ER_GTID_STRICT_OUT_OF_ORDER
eng "An attempt was made to binlog GTID %u-%u-%llu which would create an out-of-order sequence number with existing GTID %u-%u-%llu, and gtid strict mode is enabled."
ER_GTID_START_FROM_BINLOG_HOLE
eng "The binlog on the master is missing the GTID %u-%u-%llu requested by the slave (even though both a prior and a subsequent sequence number does exist), and GTID strict mode is enabled"
+ER_SLAVE_UNEXPECTED_MASTER_SWITCH
+ eng "Unexpected GTID received from master after reconnect. This normally indicates that the master server was replaced without restarting the slave threads. %s"
diff --git a/sql/slave.cc b/sql/slave.cc
index 6b876c5e863..edd7a06d959 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -114,7 +114,7 @@ static const char *reconnect_messages[SLAVE_RECON_ACT_MAX][SLAVE_RECON_MSG_MAX]=
registration on master",
"Reconnecting after a failed registration on master",
"failed registering on master, reconnecting to try again, \
-log '%s' at position %s",
+log '%s' at position %s%s",
"COM_REGISTER_SLAVE",
"Slave I/O thread killed during or after reconnect"
},
@@ -122,7 +122,7 @@ log '%s' at position %s",
"Waiting to reconnect after a failed binlog dump request",
"Slave I/O thread killed while retrying master dump",
"Reconnecting after a failed binlog dump request",
- "failed dump request, reconnecting to try again, log '%s' at position %s",
+ "failed dump request, reconnecting to try again, log '%s' at position %s%s",
"COM_BINLOG_DUMP",
"Slave I/O thread killed during or after reconnect"
},
@@ -131,7 +131,7 @@ log '%s' at position %s",
"Slave I/O thread killed while waiting to reconnect after a failed read",
"Reconnecting after a failed master event read",
"Slave I/O thread: Failed reading log event, reconnecting to retry, \
-log '%s' at position %s",
+log '%s' at position %s%s",
"",
"Slave I/O thread killed during or after a reconnect done to recover from \
failed read"
@@ -879,9 +879,13 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
purge_relay_logs(&mi->rli, NULL, 0, &errmsg);
mi->master_log_name[0]= 0;
mi->master_log_pos= 0;
+ error= rpl_load_gtid_state(&mi->gtid_current_pos, mi->using_gtid ==
+ Master_info::USE_GTID_CURRENT_POS);
+ mi->events_queued_since_last_gtid= 0;
+ mi->gtid_reconnect_event_skip_count= 0;
}
- if (thread_mask & SLAVE_IO)
+ if (!error && (thread_mask & SLAVE_IO))
error= start_slave_thread(
#ifdef HAVE_PSI_INTERFACE
key_thread_slave_io,
@@ -1873,18 +1877,9 @@ past_checksum:
after_set_capability:
#endif
- /*
- Request dump start from slave replication GTID state.
-
- Only request GTID position the first time we connect after CHANGE MASTER
- or after starting both IO or SQL thread.
-
- Otherwise, if the IO thread was ahead of the SQL thread before the
- restart or reconnect, we might end up re-fetching and hence re-applying
- the same event(s) again.
- */
- if (mi->using_gtid != Master_info::USE_GTID_NO && !mi->master_log_name[0])
+ if (mi->using_gtid != Master_info::USE_GTID_NO)
{
+ /* Request dump to start from slave replication GTID state. */
int rc;
char str_buf[256];
String query_str(str_buf, sizeof(str_buf), system_charset_info);
@@ -1913,9 +1908,7 @@ after_set_capability:
query_str.append(STRING_WITH_LEN("SET @slave_connect_state='"),
system_charset_info);
- if (rpl_append_gtid_state(&query_str,
- mi->using_gtid ==
- Master_info::USE_GTID_CURRENT_POS))
+ if (mi->gtid_current_pos.append_to_string(&query_str))
{
err_code= ER_OUTOFMEMORY;
errmsg= "The slave I/O thread stops because a fatal out-of-memory "
@@ -2016,7 +2009,7 @@ after_set_capability:
}
}
}
- if (mi->using_gtid == Master_info::USE_GTID_NO)
+ else
{
/*
If we are not using GTID to connect this time, then instead request
@@ -2588,10 +2581,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
}
// Master_Server_id
protocol->store((uint32) mi->master_id);
- protocol->store((mi->using_gtid==Master_info::USE_GTID_NO ? "No" :
- (mi->using_gtid==Master_info::USE_GTID_SLAVE_POS ?
- "Slave_Pos" : "Current_Pos")),
- &my_charset_bin);
+ protocol->store(mi->using_gtid_astext(mi->using_gtid), &my_charset_bin);
if (full)
{
protocol->store((uint32) mi->rli.retried_trans);
@@ -3424,8 +3414,22 @@ static int try_to_reconnect(THD *thd, MYSQL *mysql, Master_info *mi,
if (!suppress_warnings)
{
char buf[256], llbuff[22];
+ String tmp;
+ if (mi->using_gtid != Master_info::USE_GTID_NO)
+ {
+ tmp.append(STRING_WITH_LEN("; GTID position '"));
+ mi->gtid_current_pos.append_to_string(&tmp);
+ if (mi->events_queued_since_last_gtid == 0)
+ tmp.append(STRING_WITH_LEN("'"));
+ else
+ {
+ tmp.append(STRING_WITH_LEN("', GTID event skip "));
+ tmp.append_ulonglong((ulonglong)mi->events_queued_since_last_gtid);
+ }
+ }
my_snprintf(buf, sizeof(buf), messages[SLAVE_RECON_MSG_FAILED],
- IO_RPL_LOG_NAME, llstr(mi->master_log_pos, llbuff));
+ IO_RPL_LOG_NAME, llstr(mi->master_log_pos, llbuff),
+ tmp.c_ptr_safe());
/*
Raise a warining during registering on master/requesting dump.
Log a message reading event.
@@ -3545,11 +3549,21 @@ pthread_handler_t handle_slave_io(void *arg)
// we can get killed during safe_connect
if (!safe_connect(thd, mysql, mi))
{
- sql_print_information("Slave I/O thread: connected to master '%s@%s:%d',"
- "replication started in log '%s' at position %s",
- mi->user, mi->host, mi->port,
- IO_RPL_LOG_NAME,
- llstr(mi->master_log_pos,llbuff));
+ if (mi->using_gtid == Master_info::USE_GTID_NO)
+ sql_print_information("Slave I/O thread: connected to master '%s@%s:%d',"
+ "replication started in log '%s' at position %s",
+ mi->user, mi->host, mi->port,
+ IO_RPL_LOG_NAME,
+ llstr(mi->master_log_pos,llbuff));
+ else
+ {
+ String tmp;
+ mi->gtid_current_pos.to_string(&tmp);
+ sql_print_information("Slave I/O thread: connected to master '%s@%s:%d',"
+ "replication starts at GTID position '%s'",
+ mi->user, mi->host, mi->port, tmp.c_ptr_safe());
+ }
+
/*
Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O
thread, since a replication event can become this much larger than
@@ -3566,6 +3580,25 @@ pthread_handler_t handle_slave_io(void *arg)
connected:
+ if (mi->using_gtid != Master_info::USE_GTID_NO)
+ {
+ /*
+ When the IO thread (re)connects to the master using GTID, it will
+ connect at the start of an event group. But the IO thread may have
+ previously logged part of the following event group to the relay
+ log.
+
+ When the IO and SQL thread are started together, we erase any previous
+ relay logs, but this is not possible/desirable while the SQL thread is
+ running. To avoid duplicating partial event groups in the relay logs in
+ this case, we remember the count of events in any partially logged event
+ group before the reconnect, and then here at connect we set up a counter
+ to skip the already-logged part of the group.
+ */
+ mi->gtid_reconnect_event_skip_count= mi->events_queued_since_last_gtid;
+ mi->gtid_event_seen= false;
+ }
+
#ifdef ENABLED_DEBUG_SYNC
DBUG_EXECUTE_IF("dbug.before_get_running_status_yes",
{
@@ -3791,8 +3824,19 @@ log space");
// error = 0;
err:
// print the current replication position
- sql_print_information("Slave I/O thread exiting, read up to log '%s', position %s",
- IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
+ if (mi->using_gtid == Master_info::USE_GTID_NO)
+ sql_print_information("Slave I/O thread exiting, read up to log '%s', "
+ "position %s",
+ IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
+ else
+ {
+ String tmp;
+ mi->gtid_current_pos.to_string(&tmp);
+ sql_print_information("Slave I/O thread exiting, read up to log '%s', "
+ "position %s; GTID position %s",
+ IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff),
+ tmp.c_ptr_safe());
+ }
RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi));
thd->reset_query();
thd->reset_db(NULL, 0);
@@ -4059,10 +4103,20 @@ pthread_handler_t handle_slave_sql(void *arg)
rli->group_master_log_name,
llstr(rli->group_master_log_pos,llbuff)));
if (global_system_variables.log_warnings)
+ {
+ String tmp;
+ if (mi->using_gtid != Master_info::USE_GTID_NO)
+ {
+ tmp.append(STRING_WITH_LEN("; GTID position '"));
+ rpl_append_gtid_state(&tmp,
+ mi->using_gtid==Master_info::USE_GTID_CURRENT_POS);
+ tmp.append(STRING_WITH_LEN("'"));
+ }
sql_print_information("Slave SQL thread initialized, starting replication in \
-log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
+log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
llstr(rli->group_master_log_pos,llbuff),rli->group_relay_log_name,
- llstr(rli->group_relay_log_pos,llbuff1));
+ llstr(rli->group_relay_log_pos,llbuff1), tmp.c_ptr_safe());
+ }
if (check_temp_dir(rli->slave_patternload_file))
{
@@ -4196,16 +4250,35 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
sql_print_warning("Slave: %s Error_code: %d", err->get_message_text(), err->get_sql_errno());
}
if (udf_error)
+ {
+ String tmp;
+ if (mi->using_gtid != Master_info::USE_GTID_NO)
+ {
+ tmp.append(STRING_WITH_LEN("; GTID position '"));
+ rpl_append_gtid_state(&tmp, false);
+ tmp.append(STRING_WITH_LEN("'"));
+ }
sql_print_error("Error loading user-defined library, slave SQL "
"thread aborted. Install the missing library, and restart the "
"slave SQL thread with \"SLAVE START\". We stopped at log '%s' "
- "position %s", RPL_LOG_NAME, llstr(rli->group_master_log_pos,
- llbuff));
+ "position %s%s", RPL_LOG_NAME, llstr(rli->group_master_log_pos,
+ llbuff), tmp.c_ptr_safe());
+ }
else
+ {
+ String tmp;
+ if (mi->using_gtid != Master_info::USE_GTID_NO)
+ {
+ tmp.append(STRING_WITH_LEN("; GTID position '"));
+ rpl_append_gtid_state(&tmp, false);
+ tmp.append(STRING_WITH_LEN("'"));
+ }
sql_print_error("\
Error running query, slave SQL thread aborted. Fix the problem, and restart \
the slave SQL thread with \"SLAVE START\". We stopped at log \
-'%s' position %s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff));
+'%s' position %s%s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff),
+ tmp.c_ptr_safe());
+ }
}
goto err;
}
@@ -4213,9 +4286,20 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
}
/* Thread stopped. Print the current replication position to the log */
- sql_print_information("Slave SQL thread exiting, replication stopped in log "
- "'%s' at position %s",
- RPL_LOG_NAME, llstr(rli->group_master_log_pos,llbuff));
+ {
+ String tmp;
+ if (mi->using_gtid != Master_info::USE_GTID_NO)
+ {
+ tmp.append(STRING_WITH_LEN("; GTID position '"));
+ rpl_append_gtid_state(&tmp, false);
+ tmp.append(STRING_WITH_LEN("'"));
+ }
+ sql_print_information("Slave SQL thread exiting, replication stopped in "
+ "log '%s' at position %s%s",
+ RPL_LOG_NAME,
+ llstr(rli->group_master_log_pos,llbuff),
+ tmp.c_ptr_safe());
+ }
err:
@@ -4690,6 +4774,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
mysql_mutex_t *log_lock= rli->relay_log.get_log_lock();
ulong s_id;
bool unlock_data_lock= TRUE;
+ bool gtid_skip_enqueue= false;
+
/*
FD_q must have been prepared for the first R_a event
inside get_master_version_and_clock()
@@ -4877,6 +4963,19 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
mi->rli.relay_log.relay_log_checksum_alg= tmp->checksum_alg;
/*
+ Do not queue any format description event that we receive after a
+ reconnect where we are skipping over a partial event group received
+ before the reconnect.
+
+ (If we queued such an event, and it was the first format_description
+ event after master restart, the slave SQL thread would think that
+ the partial event group before it in the relay log was from a
+ previous master crash and should be rolled back.
+ */
+ if (unlikely(mi->gtid_reconnect_event_skip_count))
+ gtid_skip_enqueue= true;
+
+ /*
Though this does some conversion to the slave's format, this will
preserve the master's binlog format version, and number of event types.
*/
@@ -4971,18 +5070,113 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
" UNTIL master_gtid_pos %s", str.c_ptr_safe());
mi->abort_slave= true;
}
+ event_pos= glev->log_pos;
delete glev;
/*
- Do not update position for fake Gtid_list event (which has a zero
- end_log_pos).
+ We use fake Gtid_list events to update the old-style position (among
+ other things).
+
+ Early code created fake Gtid_list events with zero log_pos, those should
+ not modify old-style position.
*/
- inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
+ if (event_pos == 0 || event_pos <= mi->master_log_pos)
+ inc_pos= 0;
+ else
+ inc_pos= event_pos - mi->master_log_pos;
+ }
+ break;
+
+ case GTID_EVENT:
+ {
+ uchar dummy_flag;
+
+ if (mi->using_gtid == Master_info::USE_GTID_NO)
+ goto default_action;
+ if (unlikely(!mi->gtid_event_seen))
+ {
+ mi->gtid_event_seen= true;
+ if (mi->gtid_reconnect_event_skip_count)
+ {
+ rpl_gtid gtid;
+
+ /*
+ If we are reconnecting, and we need to skip a partial event group
+ already queued to the relay log before the reconnect, then we check
+ that we actually get the same event group (same GTID) as before, so
+ we do not end up with half of one group and half another.
+
+ The only way we should be able to receive a different GTID than what
+ we expect is if the binlog on the master (or more likely the whole
+ master server) was replaced with a different one, one the same IP
+ address, _and_ the new master happens to have domains in a different
+ order so we get the GTID from a different domain first. Still, it is
+ best to protect against this case.
+ */
+ if (Gtid_log_event::peek(buf, event_len, checksum_alg,
+ &gtid.domain_id, &gtid.server_id,
+ &gtid.seq_no, &dummy_flag))
+ {
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
+ goto err;
+ }
+ if (gtid.domain_id != mi->last_queued_gtid.domain_id ||
+ gtid.server_id != mi->last_queued_gtid.server_id ||
+ gtid.seq_no != mi->last_queued_gtid.seq_no)
+ {
+ bool first;
+ error= ER_SLAVE_UNEXPECTED_MASTER_SWITCH;
+ error_msg.append(STRING_WITH_LEN("Expected: "));
+ first= true;
+ rpl_slave_state_tostring_helper(&error_msg, &mi->last_queued_gtid,
+ &first);
+ error_msg.append(STRING_WITH_LEN(", received: "));
+ first= true;
+ rpl_slave_state_tostring_helper(&error_msg, &gtid, &first);
+ goto err;
+ }
+ }
+ }
+
+ if (unlikely(mi->gtid_reconnect_event_skip_count))
+ {
+ goto default_action;
+ }
+
+ /*
+ We have successfully queued to relay log everything before this GTID, so
+ in case of reconnect we can start from after any previous GTID.
+ */
+ if (mi->events_queued_since_last_gtid)
+ {
+ mi->gtid_current_pos.update(&mi->last_queued_gtid);
+ mi->events_queued_since_last_gtid= 0;
+ }
+ if (Gtid_log_event::peek(buf, event_len, checksum_alg,
+ &mi->last_queued_gtid.domain_id,
+ &mi->last_queued_gtid.server_id,
+ &mi->last_queued_gtid.seq_no, &dummy_flag))
+ {
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
+ goto err;
+ }
+ ++mi->events_queued_since_last_gtid;
}
break;
default:
default_action:
+ if (mi->using_gtid != Master_info::USE_GTID_NO && mi->gtid_event_seen)
+ {
+ if (unlikely(mi->gtid_reconnect_event_skip_count))
+ {
+ --mi->gtid_reconnect_event_skip_count;
+ gtid_skip_enqueue= true;
+ }
+ else if (mi->events_queued_since_last_gtid)
+ ++mi->events_queued_since_last_gtid;
+ }
+
inc_pos= event_len;
break;
}
@@ -5067,8 +5261,16 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
}
else
{
- /* write the event to the relay log */
- if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
+ /*
+ Write the event to the relay log, unless we reconnected in the middle
+ of an event group and now need to skip the initial part of the group that
+ we already wrote before reconnecting.
+ */
+ if (unlikely(gtid_skip_enqueue))
+ {
+ mi->master_log_pos+= inc_pos;
+ }
+ else if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
{
mi->master_log_pos+= inc_pos;
DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index c6372099600..4625a61f22c 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -2776,13 +2776,34 @@ end_with_restore_list:
{
LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
Master_info *mi;
+ int load_error;
+
+ load_error= rpl_load_gtid_slave_state(thd);
+
mysql_mutex_lock(&LOCK_active_mi);
if ((mi= (master_info_index->
get_master_info(&lex_mi->connection_name,
MYSQL_ERROR::WARN_LEVEL_ERROR))))
+ {
+ if (load_error)
+ {
+ /*
+ We cannot start a slave using GTID if we cannot load the GTID position
+ from the mysql.gtid_slave_pos table. But we can allow non-GTID
+ replication (useful eg. during upgrade).
+ */
+ if (mi->using_gtid != Master_info::USE_GTID_NO)
+ {
+ mysql_mutex_unlock(&LOCK_active_mi);
+ break;
+ }
+ else
+ thd->clear_error();
+ }
if (!start_slave(thd, mi, 1 /* net report*/))
my_ok(thd);
+ }
mysql_mutex_unlock(&LOCK_active_mi);
break;
}
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index fd9ead71472..d8aebbde8dc 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -50,7 +50,7 @@ extern TYPELIB binlog_checksum_typelib;
static int
fake_event_header(String* packet, Log_event_type event_type, ulong extra_len,
my_bool *do_checksum, ha_checksum *crc, const char** errmsg,
- uint8 checksum_alg_arg)
+ uint8 checksum_alg_arg, uint32 end_pos)
{
char header[LOG_EVENT_HEADER_LEN];
ulong event_len;
@@ -70,7 +70,7 @@ fake_event_header(String* packet, Log_event_type event_type, ulong extra_len,
int4store(header + EVENT_LEN_OFFSET, event_len);
int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F);
// TODO: check what problems this may cause and fix them
- int4store(header + LOG_POS_OFFSET, 0);
+ int4store(header + LOG_POS_OFFSET, end_pos);
if (packet->append(header, sizeof(header)))
{
*errmsg= "Failed due to out-of-memory writing event";
@@ -146,7 +146,7 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
if ((err= fake_event_header(packet, ROTATE_EVENT,
ident_len + ROTATE_HEADER_LEN, &do_checksum, &crc,
- errmsg, checksum_alg_arg)))
+ errmsg, checksum_alg_arg, 0)))
DBUG_RETURN(err);
int8store(buf+R_POS_OFFSET,position);
@@ -169,7 +169,7 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
static int fake_gtid_list_event(NET* net, String* packet,
Gtid_list_log_event *glev, const char** errmsg,
- uint8 checksum_alg_arg)
+ uint8 checksum_alg_arg, uint32 current_pos)
{
my_bool do_checksum;
int err;
@@ -185,7 +185,7 @@ static int fake_gtid_list_event(NET* net, String* packet,
}
if ((err= fake_event_header(packet, GTID_LIST_EVENT,
str.length(), &do_checksum, &crc,
- errmsg, checksum_alg_arg)))
+ errmsg, checksum_alg_arg, current_pos)))
return err;
packet->append(str);
@@ -1406,7 +1406,7 @@ is_until_reached(THD *thd, NET *net, String *packet, ulong *ev_offset,
enum_gtid_until_state gtid_until_group,
Log_event_type event_type, uint8 current_checksum_alg,
ushort flags, const char **errmsg,
- rpl_binlog_state *until_binlog_state)
+ rpl_binlog_state *until_binlog_state, uint32 current_pos)
{
switch (gtid_until_group)
{
@@ -1437,7 +1437,8 @@ is_until_reached(THD *thd, NET *net, String *packet, ulong *ev_offset,
return true;
Gtid_list_log_event glev(until_binlog_state,
Gtid_list_log_event::FLAG_UNTIL_REACHED);
- if (fake_gtid_list_event(net, packet, &glev, errmsg, current_checksum_alg))
+ if (fake_gtid_list_event(net, packet, &glev, errmsg, current_checksum_alg,
+ current_pos))
return true;
*errmsg= NULL;
return true;
@@ -1508,6 +1509,19 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
return "Failed to read Gtid_log_event: corrupt binlog";
}
+ DBUG_EXECUTE_IF("gtid_force_reconnect_at_10_1_100",
+ {
+ rpl_gtid *dbug_gtid;
+ if ((dbug_gtid= until_binlog_state->find(10,1)) &&
+ dbug_gtid->seq_no == 100)
+ {
+ DBUG_SET("-d,gtid_force_reconnect_at_10_1_100");
+ DBUG_SET_INITIAL("-d,gtid_force_reconnect_at_10_1_100");
+ my_errno= ER_UNKNOWN_ERROR;
+ return "DBUG-injected forced reconnect";
+ }
+ });
+
if (until_binlog_state->update(&event_gtid, false))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
@@ -1527,19 +1541,31 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
if (event_gtid.server_id == gtid->server_id &&
event_gtid.seq_no >= gtid->seq_no)
{
- /*
- In strict mode, it is an error if the slave requests to start in
- a "hole" in the master's binlog: a GTID that does not exist, even
- though both the prior and subsequent seq_no exists for same
- domain_id and server_id.
- */
- if (slave_gtid_strict_mode && event_gtid.seq_no > gtid->seq_no)
+ if (event_gtid.seq_no > gtid->seq_no)
+ {
+ /*
+ In strict mode, it is an error if the slave requests to start
+ in a "hole" in the master's binlog: a GTID that does not
+ exist, even though both the prior and subsequent seq_no exists
+ for same domain_id and server_id.
+ */
+ if (slave_gtid_strict_mode)
+ {
+ my_errno= ER_GTID_START_FROM_BINLOG_HOLE;
+ *error_gtid= *gtid;
+ return "The binlog on the master is missing the GTID requested "
+ "by the slave (even though both a prior and a subsequent "
+ "sequence number does exist), and GTID strict mode is enabled.";
+ }
+ }
+ else
{
- my_errno= ER_GTID_START_FROM_BINLOG_HOLE;
- *error_gtid= *gtid;
- return "The binlog on the master is missing the GTID requested "
- "by the slave (even though both a prior and a subsequent "
- "sequence number does exist), and GTID strict mode is enabled.";
+ /*
+ Send a fake Gtid_list event to the slave.
+ This allows the slave to update its current binlog position
+ so MASTER_POS_WAIT() and MASTER_GTID_WAIT() can work.
+ */
+// send_fake_gtid_list_event(until_binlog_state);
}
/*
Delete this entry if we have reached slave start position (so we
@@ -1797,6 +1823,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
int old_max_allowed_packet= thd->variables.max_allowed_packet;
#ifndef DBUG_OFF
int left_events = max_binlog_dump_events;
+ uint dbug_reconnect_counter= 0;
#endif
DBUG_ENTER("mysql_binlog_send");
DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos));
@@ -1830,6 +1857,13 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
until_gtid_state= &until_gtid_state_obj;
}
+ DBUG_EXECUTE_IF("binlog_force_reconnect_after_22_events",
+ {
+ DBUG_SET("-d,binlog_force_reconnect_after_22_events");
+ DBUG_SET_INITIAL("-d,binlog_force_reconnect_after_22_events");
+ dbug_reconnect_counter= 22;
+ });
+
/*
We want to corrupt the first event, in Log_event::read_log_event().
But we do not want the corruption to happen early, eg. when client does
@@ -2176,6 +2210,19 @@ impossible position";
(*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
}
+#ifndef DBUG_OFF
+ if (dbug_reconnect_counter > 0)
+ {
+ --dbug_reconnect_counter;
+ if (dbug_reconnect_counter == 0)
+ {
+ errmsg= "DBUG-injected forced reconnect";
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
+ }
+#endif
+
if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
log_file_name, &log,
mariadb_slave_capability, ev_offset,
@@ -2191,7 +2238,7 @@ impossible position";
if (until_gtid_state &&
is_until_reached(thd, net, packet, &ev_offset, gtid_until_group,
event_type, current_checksum_alg, flags, &errmsg,
- &until_binlog_state))
+ &until_binlog_state, my_b_tell(&log)))
{
if (errmsg)
{
@@ -2373,7 +2420,7 @@ impossible position";
until_gtid_state &&
is_until_reached(thd, net, packet, &ev_offset, gtid_until_group,
event_type, current_checksum_alg, flags, &errmsg,
- &until_binlog_state))
+ &until_binlog_state, my_b_tell(&log)))
{
if (errmsg)
{
@@ -2970,6 +3017,7 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
char saved_host[HOSTNAME_LENGTH + 1];
uint saved_port;
char saved_log_name[FN_REFLEN];
+ Master_info::enum_using_gtid saved_using_gtid;
char master_info_file_tmp[FN_REFLEN];
char relay_log_info_file_tmp[FN_REFLEN];
my_off_t saved_log_pos;
@@ -3059,6 +3107,7 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
saved_port= mi->port;
strmake(saved_log_name, mi->master_log_name, FN_REFLEN - 1);
saved_log_pos= mi->master_log_pos;
+ saved_using_gtid= mi->using_gtid;
/*
If the user specified host or port without binlog or position,
@@ -3291,6 +3340,11 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
"master_log_pos='%ld'.", saved_host, saved_port, saved_log_name,
(ulong) saved_log_pos, mi->host, mi->port, mi->master_log_name,
(ulong) mi->master_log_pos);
+ if (saved_using_gtid != Master_info::USE_GTID_NO ||
+ mi->using_gtid != Master_info::USE_GTID_NO)
+ sql_print_information("Previous Using_Gtid=%s. New Using_Gtid=%s",
+ mi->using_gtid_astext(saved_using_gtid),
+ mi->using_gtid_astext(mi->using_gtid));
/*
If we don't write new coordinates to disk now, then old will remain in
@@ -3753,11 +3807,11 @@ rpl_deinit_gtid_slave_state()
/*
- Format the current GTID state as a string, for use when connecting to a
- master server with GTID, or for returning the value of @@global.gtid_state.
+ Format the current GTID state as a string, for returning the value of
+ @@global.gtid_slave_pos.
If the flag use_binlog is true, then the contents of the binary log (if
- enabled) is merged into the current GTID state.
+ enabled) is merged into the current GTID state (@@global.gtid_current_pos).
*/
int
rpl_append_gtid_state(String *dest, bool use_binlog)
@@ -3770,10 +3824,35 @@ rpl_append_gtid_state(String *dest, bool use_binlog)
(err= mysql_bin_log.get_most_recent_gtid_list(&gtid_list, &num_gtids)))
return err;
- rpl_global_gtid_slave_state.tostring(dest, gtid_list, num_gtids);
+ err= rpl_global_gtid_slave_state.tostring(dest, gtid_list, num_gtids);
my_free(gtid_list);
- return 0;
+ return err;
+}
+
+
+/*
+ Load the current GITD position into a slave_connection_state, for use when
+ connecting to a master server with GTID.
+
+ If the flag use_binlog is true, then the contents of the binary log (if
+ enabled) is merged into the current GTID state (master_use_gtid=current_pos).
+*/
+int
+rpl_load_gtid_state(slave_connection_state *state, bool use_binlog)
+{
+ int err;
+ rpl_gtid *gtid_list= NULL;
+ uint32 num_gtids= 0;
+
+ if (use_binlog && opt_bin_log &&
+ (err= mysql_bin_log.get_most_recent_gtid_list(&gtid_list, &num_gtids)))
+ return err;
+
+ err= state->load(&rpl_global_gtid_slave_state, gtid_list, num_gtids);
+ my_free(gtid_list);
+
+ return err;
}
diff --git a/sql/sql_repl.h b/sql/sql_repl.h
index 820ffed0928..a242fa4aeef 100644
--- a/sql/sql_repl.h
+++ b/sql/sql_repl.h
@@ -32,6 +32,8 @@ typedef struct st_slave_info
THD* thd;
} SLAVE_INFO;
+class slave_connection_state;
+
extern my_bool opt_show_slave_auth_info;
extern char *master_host, *master_info_file;
extern bool server_id_supplied;
@@ -70,6 +72,7 @@ void rpl_init_gtid_slave_state();
void rpl_deinit_gtid_slave_state();
int gtid_state_from_binlog_pos(const char *name, uint32 pos, String *out_str);
int rpl_append_gtid_state(String *dest, bool use_binlog);
+int rpl_load_gtid_state(slave_connection_state *state, bool use_binlog);
bool rpl_gtid_pos_check(THD *thd, char *str, size_t len);
bool rpl_gtid_pos_update(THD *thd, char *str, size_t len);