diff options
-rw-r--r-- | mysql-test/include/rpl_init.inc | 1 | ||||
-rw-r--r-- | mysql-test/suite/rpl/r/rpl_gtid_master_promote.result | 367 | ||||
-rw-r--r-- | mysql-test/suite/rpl/r/rpl_gtid_until.result | 225 | ||||
-rw-r--r-- | mysql-test/suite/rpl/t/rpl_gtid_master_promote.cnf | 35 | ||||
-rw-r--r-- | mysql-test/suite/rpl/t/rpl_gtid_master_promote.test | 267 | ||||
-rw-r--r-- | mysql-test/suite/rpl/t/rpl_gtid_until.test | 234 | ||||
-rw-r--r-- | sql/lex.h | 3 | ||||
-rw-r--r-- | sql/log.cc | 11 | ||||
-rw-r--r-- | sql/log_event.cc | 88 | ||||
-rw-r--r-- | sql/log_event.h | 19 | ||||
-rw-r--r-- | sql/rpl_gtid.cc | 25 | ||||
-rw-r--r-- | sql/rpl_gtid.h | 2 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 3 | ||||
-rw-r--r-- | sql/rpl_rli.h | 8 | ||||
-rw-r--r-- | sql/share/errmsg-utf8.txt | 2 | ||||
-rw-r--r-- | sql/slave.cc | 82 | ||||
-rw-r--r-- | sql/sql_lex.h | 4 | ||||
-rw-r--r-- | sql/sql_repl.cc | 585 | ||||
-rw-r--r-- | sql/sql_yacc.yy | 10 |
19 files changed, 1824 insertions, 147 deletions
diff --git a/mysql-test/include/rpl_init.inc b/mysql-test/include/rpl_init.inc index e608a1223a5..939a05d8011 100644 --- a/mysql-test/include/rpl_init.inc +++ b/mysql-test/include/rpl_init.inc @@ -178,6 +178,7 @@ while ($_rpl_server) { RESET MASTER; RESET SLAVE; + SET GLOBAL gtid_pos= ""; } eval SET auto_increment_increment= $rpl_server_count; eval SET auto_increment_offset= $_rpl_server; diff --git a/mysql-test/suite/rpl/r/rpl_gtid_master_promote.result b/mysql-test/suite/rpl/r/rpl_gtid_master_promote.result new file mode 100644 index 00000000000..1f600b2a536 --- /dev/null +++ b/mysql-test/suite/rpl/r/rpl_gtid_master_promote.result @@ -0,0 +1,367 @@ +include/rpl_init.inc [topology=1->2, 1->3, 1->4, 1->5] +ALTER TABLE mysql.rpl_slave_state ENGINE=InnoDB; +CREATE TABLE t4 (a INT, b INT, PRIMARY KEY (a,b)) Engine=InnoDB; +CREATE FUNCTION extract_gtid(d VARCHAR(100), s VARCHAR(100)) +RETURNS VARCHAR(100) DETERMINISTIC +BEGIN +SET s= CONCAT(",", s, ","); +SET s= SUBSTR(s FROM LOCATE(CONCAT(",", d, "-"), s) + 1); +SET s= SUBSTR(s FROM 1 FOR LOCATE(",", s) - 1); +RETURN s; +END| +include/stop_slave.inc +CHANGE MASTER TO master_use_gtid=1; +include/stop_slave.inc +CHANGE MASTER TO master_use_gtid=1; +include/stop_slave.inc +CHANGE MASTER TO master_use_gtid=1; +include/stop_slave.inc +CHANGE MASTER TO master_use_gtid=1; +SET gtid_domain_id= 1; +CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB; +INSERT INTO t1 VALUES (1); +INSERT INTO t1 VALUES (2); +INSERT INTO t4 VALUES (1, 1); +INSERT INTO t1 VALUES (3); +INSERT INTO t1 VALUES (4); +INSERT INTO t4 VALUES (1, 3); +SET gtid_domain_id= 2; +CREATE TABLE t2 (a INT PRIMARY KEY) ENGINE=InnoDB; +INSERT INTO t2 VALUES (1); +INSERT INTO t2 VALUES (2); +INSERT INTO t4 VALUES (2, 1); +INSERT INTO t2 VALUES (3); +INSERT INTO t2 VALUES (4); +INSERT INTO t4 VALUES (2, 3); +SET gtid_domain_id= 3; +CREATE TABLE t3 (a INT PRIMARY KEY) ENGINE=InnoDB; +INSERT INTO t3 VALUES (1); +INSERT INTO t3 VALUES (2); +INSERT INTO t4 VALUES (3, 1); +INSERT INTO t3 VALUES (3); +INSERT INTO t3 VALUES (4); +INSERT INTO t4 VALUES (3, 3); +START SLAVE UNTIL master_gtid_pos= "1-1-7,2-1-14,3-1-21"; +START SLAVE UNTIL master_gtid_pos= "1-1-4,2-1-14,3-1-24"; +START SLAVE UNTIL master_gtid_pos= "2-1-11,3-1-21,1-1-10"; +START SLAVE UNTIL master_gtid_pos= "3-1-18,1-1-7,2-1-17"; +include/wait_for_slave_to_stop.inc +SELECT * FROM t1 ORDER BY a; +a +1 +2 +SELECT * FROM t2 ORDER BY a; +a +1 +2 +SELECT * FROM t3 ORDER BY a; +a +1 +2 +SELECT * FROM t4 ORDER BY a,b; +a b +1 1 +2 1 +3 1 +include/wait_for_slave_to_stop.inc +SELECT * FROM t1 ORDER BY a; +a +SELECT * FROM t2 ORDER BY a; +a +1 +2 +SELECT * FROM t3 ORDER BY a; +a +1 +2 +3 +4 +SELECT * FROM t4 ORDER BY a,b; +a b +2 1 +3 1 +3 3 +include/wait_for_slave_to_stop.inc +SELECT * FROM t1 ORDER BY a; +a +1 +2 +3 +4 +SELECT * FROM t2 ORDER BY a; +a +SELECT * FROM t3 ORDER BY a; +a +1 +2 +SELECT * FROM t4 ORDER BY a,b; +a b +1 1 +1 3 +3 1 +include/wait_for_slave_to_stop.inc +SELECT * FROM t1 ORDER BY a; +a +1 +2 +SELECT * FROM t2 ORDER BY a; +a +1 +2 +3 +4 +SELECT * FROM t3 ORDER BY a; +a +SELECT * FROM t4 ORDER BY a,b; +a b +1 1 +2 1 +2 3 +*** Now replicate all extra changes from 3,4,5 to 2, in preparation for making 2 the new master. *** +CHANGE MASTER TO master_host = '127.0.0.1', master_port = SERVER_MYPORT_3; +START SLAVE UNTIL master_gtid_pos = "1-1-4,0-1-3,3-1-24,2-1-14"; +include/wait_for_slave_to_stop.inc +SELECT * FROM t1 ORDER BY a; +a +1 +2 +SELECT * FROM t2 ORDER BY a; +a +1 +2 +SELECT * FROM t3 ORDER BY a; +a +1 +2 +3 +4 +SELECT * FROM t4 ORDER BY a,b; +a b +1 1 +2 1 +3 1 +3 3 +CHANGE MASTER TO master_host = '127.0.0.1', master_port = SERVER_MYPORT_4; +START SLAVE UNTIL master_gtid_pos = "1-1-10,0-1-3,3-1-21,2-1-11"; +include/wait_for_slave_to_stop.inc +SELECT * FROM t1 ORDER BY a; +a +1 +2 +3 +4 +SELECT * FROM t2 ORDER BY a; +a +1 +2 +SELECT * FROM t3 ORDER BY a; +a +1 +2 +3 +4 +SELECT * FROM t4 ORDER BY a,b; +a b +1 1 +1 3 +2 1 +3 1 +3 3 +CHANGE MASTER TO master_host = '127.0.0.1', master_port = SERVER_MYPORT_5; +START SLAVE UNTIL master_gtid_pos = "1-1-7,0-1-3,3-1-18,2-1-17"; +include/wait_for_slave_to_stop.inc +SELECT * FROM t1 ORDER BY a; +a +1 +2 +3 +4 +SELECT * FROM t2 ORDER BY a; +a +1 +2 +3 +4 +SELECT * FROM t3 ORDER BY a; +a +1 +2 +3 +4 +SELECT * FROM t4 ORDER BY a,b; +a b +1 1 +1 3 +2 1 +2 3 +3 1 +3 3 +*** Now make 2 master and point 3,4,5 to the new master 2 +SET gtid_domain_id= 1; +INSERT INTO t1 values (5); +INSERT INTO t4 values (1,5); +SET gtid_domain_id= 2; +INSERT INTO t2 values (5); +INSERT INTO t4 values (2,5); +SET gtid_domain_id= 3; +INSERT INTO t3 values (5); +INSERT INTO t4 values (3,5); +CHANGE MASTER TO master_host = '127.0.0.1', master_port = SERVER_MYPORT_2; +include/start_slave.inc +CHANGE MASTER TO master_host = '127.0.0.1', master_port = SERVER_MYPORT_2; +include/start_slave.inc +CHANGE MASTER TO master_host = '127.0.0.1', master_port = SERVER_MYPORT_2; +include/start_slave.inc +SELECT * FROM t1 ORDER BY a; +a +1 +2 +3 +4 +5 +SELECT * FROM t2 ORDER BY a; +a +1 +2 +3 +4 +5 +SELECT * FROM t3 ORDER BY a; +a +1 +2 +3 +4 +5 +SELECT * FROM t4 ORDER BY a,b; +a b +1 1 +1 3 +1 5 +2 1 +2 3 +2 5 +3 1 +3 3 +3 5 +SELECT * FROM t1 ORDER BY a; +a +1 +2 +3 +4 +5 +SELECT * FROM t2 ORDER BY a; +a +1 +2 +3 +4 +5 +SELECT * FROM t3 ORDER BY a; +a +1 +2 +3 +4 +5 +SELECT * FROM t4 ORDER BY a,b; +a b +1 1 +1 3 +1 5 +2 1 +2 3 +2 5 +3 1 +3 3 +3 5 +SELECT * FROM t1 ORDER BY a; +a +1 +2 +3 +4 +5 +SELECT * FROM t2 ORDER BY a; +a +1 +2 +3 +4 +5 +SELECT * FROM t3 ORDER BY a; +a +1 +2 +3 +4 +5 +SELECT * FROM t4 ORDER BY a,b; +a b +1 1 +1 3 +1 5 +2 1 +2 3 +2 5 +3 1 +3 3 +3 5 +*** Now let the old master join up as slave. *** +CHANGE MASTER TO master_host = '127.0.0.1', master_port = SERVER_MYPORT_2, +master_user = "root", master_use_gtid = 1; +include/start_slave.inc +SELECT * FROM t1 ORDER BY a; +a +1 +2 +3 +4 +5 +SELECT * FROM t2 ORDER BY a; +a +1 +2 +3 +4 +5 +SELECT * FROM t3 ORDER BY a; +a +1 +2 +3 +4 +5 +SELECT * FROM t4 ORDER BY a,b; +a b +1 1 +1 3 +1 5 +2 1 +2 3 +2 5 +3 1 +3 3 +3 5 +*** Finally move things back and clean up. *** +include/stop_slave.inc +RESET SLAVE ALL; +CHANGE MASTER TO master_host = '127.0.0.1', master_port = SERVER_MYPORT_1; +include/start_slave.inc +include/stop_slave.inc +CHANGE MASTER TO master_host = '127.0.0.1', master_port = SERVER_MYPORT_1; +include/start_slave.inc +include/stop_slave.inc +CHANGE MASTER TO master_host = '127.0.0.1', master_port = SERVER_MYPORT_1; +include/start_slave.inc +include/stop_slave.inc +CHANGE MASTER TO master_host = '127.0.0.1', master_port = SERVER_MYPORT_1; +include/start_slave.inc +SET gtid_domain_id = 0; +DROP TABLE t1; +DROP TABLE t2; +DROP TABLE t3; +DROP TABLE t4; +DROP FUNCTION extract_gtid; +include/rpl_end.inc diff --git a/mysql-test/suite/rpl/r/rpl_gtid_until.result b/mysql-test/suite/rpl/r/rpl_gtid_until.result new file mode 100644 index 00000000000..382da1b3844 --- /dev/null +++ b/mysql-test/suite/rpl/r/rpl_gtid_until.result @@ -0,0 +1,225 @@ +include/rpl_init.inc [topology=1->2] +ALTER TABLE mysql.rpl_slave_state ENGINE=InnoDB; +CREATE FUNCTION extract_gtid(d VARCHAR(100), s VARCHAR(100)) +RETURNS VARCHAR(100) DETERMINISTIC +BEGIN +SET s= CONCAT(",", s, ","); +SET s= SUBSTR(s FROM LOCATE(CONCAT(",", d, "-"), s) + 1); +SET s= SUBSTR(s FROM 1 FOR LOCATE(",", s) - 1); +RETURN s; +END| +START SLAVE UNTIL master_gtid_pos = ""; +ERROR HY000: Slave is already running +include/stop_slave_io.inc +START SLAVE UNTIL master_gtid_pos = ""; +ERROR HY000: Slave is already running +START SLAVE IO_THREAD; +include/wait_for_slave_io_to_start.inc +include/stop_slave_sql.inc +START SLAVE UNTIL master_gtid_pos = ""; +ERROR HY000: Slave is already running +include/stop_slave_io.inc +START SLAVE UNTIL master_gtid_pos = ""; +ERROR HY000: START SLAVE UNTIL master_gtid_pos requires that slave is using GTID +CHANGE MASTER TO master_use_gtid=1; +CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB; +INSERT INTO t1 VALUES(1); +INSERT INTO t1 VALUES(2); +START SLAVE UNTIL master_gtid_pos = "0-1-100,1-1-100,2-2-200,1-3-100,4-4-400"; +ERROR HY000: GTID 1-3-100 and 1-1-100 conflict (duplicate domain id 1) +START SLAVE UNTIL master_log_file = "master-bin.000001", master_log_pos = 4, master_gtid_pos = ""; +ERROR 42000: You have an error in your SQL syntax; check the manual that corresponds to your MariaDB server version for the right syntax to use near 'master_gtid_pos = ""' at line 1 +START SLAVE IO_THREAD UNTIL master_gtid_pos = ""; +ERROR HY000: Incorrect parameter or combination of parameters for START SLAVE UNTIL +START SLAVE SQL_THREAD UNTIL master_gtid_pos = ""; +ERROR HY000: Incorrect parameter or combination of parameters for START SLAVE UNTIL +START SLAVE UNTIL master_gtid_pos = '0-1-4'; +include/wait_for_slave_to_stop.inc +SELECT * FROM t1; +a +1 +include/start_slave.inc +SELECT * FROM t1 ORDER BY a; +a +1 +2 +include/stop_slave.inc +START SLAVE UNTIL master_gtid_pos = "1-10-100,2-20-200"; +include/wait_for_slave_to_start.inc +Using_Gtid = '1' +Until_Condition = 'Gtid' +include/stop_slave.inc +include/start_slave.inc +*** Test UNTIL condition in an earlier binlog than the start GTID. *** +include/stop_slave.inc +SET gtid_domain_id = 1; +INSERT INTO t1 VALUES (3); +SET gtid_domain_id = 2; +CREATE TABLE t2 (a INT); +INSERT INTO t2 VALUES (3); +FLUSH LOGS; +SET gtid_domain_id = 1; +INSERT INTO t1 VALUES (4); +SET gtid_domain_id = 2; +INSERT INTO t2 VALUES (4); +FLUSH LOGS; +SET gtid_domain_id = 1; +INSERT INTO t1 VALUES (5); +SET gtid_domain_id = 2; +INSERT INTO t2 VALUES (5); +FLUSH LOGS; +SET gtid_domain_id = 1; +INSERT INTO t1 VALUES (6); +SET gtid_domain_id = 2; +INSERT INTO t2 VALUES (6); +SET gtid_domain_id = 0; +show binary logs; +Log_name File_size +master-bin.000001 # +master-bin.000002 # +master-bin.000003 # +master-bin.000004 # +START SLAVE UNTIL master_gtid_pos='1-1-11,2-1-12'; +include/wait_for_slave_to_stop.inc +SELECT * FROM t1 ORDER BY a; +a +1 +2 +3 +4 +5 +SELECT * FROM t2 ORDER BY a; +a +3 +4 +5 +START SLAVE UNTIL master_gtid_pos='1-1-13,2-1-8'; +include/wait_for_slave_to_stop.inc +SELECT * FROM t1 ORDER BY a; +a +1 +2 +3 +4 +5 +6 +SELECT * FROM t2 ORDER BY a; +a +3 +4 +5 +START SLAVE UNTIL master_gtid_pos='1-1-11'; +include/wait_for_slave_to_stop.inc +SELECT * FROM t1 ORDER BY a; +a +1 +2 +3 +4 +5 +6 +SELECT * FROM t2 ORDER BY a; +a +3 +4 +5 +include/start_slave.inc +SELECT * FROM t1 ORDER BY a; +a +1 +2 +3 +4 +5 +6 +SELECT * FROM t2 ORDER BY a; +a +3 +4 +5 +6 +*** Test when the UNTIL position is right at the end of the binlog file prior to the starting position *** +include/stop_slave.inc +FLUSH LOGS; +SET gtid_domain_id = 1; +INSERT INTO t1 VALUES (7); +SET gtid_domain_id = 0; +START SLAVE UNTIL master_gtid_pos='1-1-13'; +include/wait_for_slave_to_stop.inc +SELECT * FROM t1 ORDER BY a; +a +1 +2 +3 +4 +5 +6 +include/start_slave.inc +SELECT * FROM t1 ORDER BY a; +a +1 +2 +3 +4 +5 +6 +7 +*** Test when UNTIL condition is after a stand-alone event (not a transaction). *** +include/stop_slave.inc +CREATE TABLE t3 (a INT); +DROP TABLE t3; +START SLAVE UNTIL master_gtid_pos='1-1-15,0-1-16,2-1-14'; +include/wait_for_slave_to_stop.inc +SHOW CREATE TABLE t3; +Table Create Table +t3 CREATE TABLE `t3` ( + `a` int(11) DEFAULT NULL +) ENGINE=MyISAM DEFAULT CHARSET=latin1 +include/start_slave.inc +*** Test UNTIL condition that has not yet been logged. *** +include/stop_slave.inc +RESET SLAVE ALL; +RESET MASTER; +SET GLOBAL gtid_pos=''; +RESET MASTER; +INSERT INTO t1 VALUES (10); +INSERT INTO t1 VALUES (11); +INSERT INTO t1 VALUES (12); +DELETE FROM t1 WHERE a >= 10; +RESET MASTER; +INSERT INTO t1 VALUES (10); +CHANGE MASTER TO master_host = '127.0.0.1', master_port = SERVER_MYPORT_1, +master_user = "root", master_use_gtid = 1; +START SLAVE UNTIL master_gtid_pos = '0-1-2'; +include/wait_for_slave_to_start.inc +INSERT INTO t1 VALUES (11); +INSERT INTO t1 VALUES (12); +include/wait_for_slave_to_stop.inc +SELECT * FROM t1 ORDER BY a; +a +1 +2 +3 +4 +5 +6 +7 +10 +11 +include/start_slave.inc +SELECT * FROM t1 ORDER BY a; +a +1 +2 +3 +4 +5 +6 +7 +10 +11 +12 +DROP TABLE t1; +DROP TABLE t2; +DROP FUNCTION extract_gtid; +include/rpl_end.inc diff --git a/mysql-test/suite/rpl/t/rpl_gtid_master_promote.cnf b/mysql-test/suite/rpl/t/rpl_gtid_master_promote.cnf new file mode 100644 index 00000000000..4eafa897501 --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_gtid_master_promote.cnf @@ -0,0 +1,35 @@ +!include ../my.cnf + +[mysqld.1] +log-slave-updates +loose-innodb + +[mysqld.2] +log-slave-updates +skip-slave-start +loose-innodb + +[mysqld.3] +log-slave-updates +skip-slave-start +loose-innodb + +[mysqld.4] +log-slave-updates +skip-slave-start +loose-innodb + +[mysqld.5] +log-slave-updates +skip-slave-start +loose-innodb + +[ENV] +SERVER_MYPORT_3= @mysqld.3.port +SERVER_MYSOCK_3= @mysqld.3.socket + +SERVER_MYPORT_4= @mysqld.4.port +SERVER_MYSOCK_4= @mysqld.4.socket + +SERVER_MYPORT_5= @mysqld.5.port +SERVER_MYSOCK_5= @mysqld.5.socket diff --git a/mysql-test/suite/rpl/t/rpl_gtid_master_promote.test b/mysql-test/suite/rpl/t/rpl_gtid_master_promote.test new file mode 100644 index 00000000000..68935011c20 --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_gtid_master_promote.test @@ -0,0 +1,267 @@ +--source include/have_innodb.inc +--let $rpl_topology=1->2, 1->3, 1->4, 1->5 +--source include/rpl_init.inc + +# Set up a topology with one master and 4 slaves. +# +# Replicate some events leaving the four slaves at different points +# in different domains. +# +# Then promote one slave as new master, bringing it ahead of all others +# using START SLAVE UNTIL master_gtid_pos. + +--connection server_1 +ALTER TABLE mysql.rpl_slave_state ENGINE=InnoDB; +CREATE TABLE t4 (a INT, b INT, PRIMARY KEY (a,b)) Engine=InnoDB; + +# Function to extract one GTID from a list. +delimiter |; +CREATE FUNCTION extract_gtid(d VARCHAR(100), s VARCHAR(100)) + RETURNS VARCHAR(100) DETERMINISTIC +BEGIN + SET s= CONCAT(",", s, ","); + SET s= SUBSTR(s FROM LOCATE(CONCAT(",", d, "-"), s) + 1); + SET s= SUBSTR(s FROM 1 FOR LOCATE(",", s) - 1); + RETURN s; +END| +delimiter ;| + +--save_master_pos + +--connection server_2 +--sync_with_master +--source include/stop_slave.inc +CHANGE MASTER TO master_use_gtid=1; + +--connection server_3 +--sync_with_master +--source include/stop_slave.inc +CHANGE MASTER TO master_use_gtid=1; + +--connection server_4 +--sync_with_master +--source include/stop_slave.inc +CHANGE MASTER TO master_use_gtid=1; + +--connection server_5 +--sync_with_master +--source include/stop_slave.inc +CHANGE MASTER TO master_use_gtid=1; + + +# Create three separate replication streams on master server_1. +# +# Then use START SLAVE UNTIL to get the different streams interleaved +# differently spread over multiple binlogs on the different slaves, to +# test that new master promotion is able to deal with this. + +--connection server_1 + +SET gtid_domain_id= 1; +CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB; +--let $d1_begin= `SELECT extract_gtid("1", @@GLOBAL.gtid_pos)` +INSERT INTO t1 VALUES (1); +INSERT INTO t1 VALUES (2); +INSERT INTO t4 VALUES (1, 1); +--let $d1_mid= `SELECT extract_gtid("1", @@GLOBAL.gtid_pos)` +INSERT INTO t1 VALUES (3); +INSERT INTO t1 VALUES (4); +INSERT INTO t4 VALUES (1, 3); +--let $d1_end= `SELECT extract_gtid("1", @@GLOBAL.gtid_pos)` + +SET gtid_domain_id= 2; +CREATE TABLE t2 (a INT PRIMARY KEY) ENGINE=InnoDB; +--let $d2_begin= `SELECT extract_gtid("2", @@GLOBAL.gtid_pos)` +INSERT INTO t2 VALUES (1); +INSERT INTO t2 VALUES (2); +INSERT INTO t4 VALUES (2, 1); +--let $d2_mid= `SELECT extract_gtid("2", @@GLOBAL.gtid_pos)` +INSERT INTO t2 VALUES (3); +INSERT INTO t2 VALUES (4); +INSERT INTO t4 VALUES (2, 3); +--let $d2_end= `SELECT extract_gtid("2", @@GLOBAL.gtid_pos)` + +SET gtid_domain_id= 3; +CREATE TABLE t3 (a INT PRIMARY KEY) ENGINE=InnoDB; +--let $d3_begin= `SELECT extract_gtid("3", @@GLOBAL.gtid_pos)` +INSERT INTO t3 VALUES (1); +INSERT INTO t3 VALUES (2); +INSERT INTO t4 VALUES (3, 1); +--let $d3_mid= `SELECT extract_gtid("3", @@GLOBAL.gtid_pos)` +INSERT INTO t3 VALUES (3); +INSERT INTO t3 VALUES (4); +INSERT INTO t4 VALUES (3, 3); +--let $d3_end= `SELECT extract_gtid("3", @@GLOBAL.gtid_pos)` + + +# Slave server_2 (that will be promoted to master) is in the middle +# of each stream. +--connection server_2 +eval START SLAVE UNTIL master_gtid_pos= "$d1_mid,$d2_mid,$d3_mid"; + +# The remaining slaves sit at different points each in different domains. +--connection server_3 +eval START SLAVE UNTIL master_gtid_pos= "$d1_begin,$d2_mid,$d3_end"; +--connection server_4 +eval START SLAVE UNTIL master_gtid_pos= "$d2_begin,$d3_mid,$d1_end"; +--connection server_5 +eval START SLAVE UNTIL master_gtid_pos= "$d3_begin,$d1_mid,$d2_end"; +--connection server_2 +--source include/wait_for_slave_to_stop.inc +SELECT * FROM t1 ORDER BY a; +SELECT * FROM t2 ORDER BY a; +SELECT * FROM t3 ORDER BY a; +SELECT * FROM t4 ORDER BY a,b; +--connection server_3 +--source include/wait_for_slave_to_stop.inc +SELECT * FROM t1 ORDER BY a; +SELECT * FROM t2 ORDER BY a; +SELECT * FROM t3 ORDER BY a; +SELECT * FROM t4 ORDER BY a,b; +--connection server_4 +--source include/wait_for_slave_to_stop.inc +SELECT * FROM t1 ORDER BY a; +SELECT * FROM t2 ORDER BY a; +SELECT * FROM t3 ORDER BY a; +SELECT * FROM t4 ORDER BY a,b; +--connection server_5 +--source include/wait_for_slave_to_stop.inc +SELECT * FROM t1 ORDER BY a; +SELECT * FROM t2 ORDER BY a; +SELECT * FROM t3 ORDER BY a; +SELECT * FROM t4 ORDER BY a,b; + + +--echo *** Now replicate all extra changes from 3,4,5 to 2, in preparation for making 2 the new master. *** + +--connection server_3 +--let $server3_pos= `SELECT @@GLOBAL.gtid_pos` +--connection server_2 +--replace_result $SERVER_MYPORT_3 SERVER_MYPORT_3 +eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $SERVER_MYPORT_3; +eval START SLAVE UNTIL master_gtid_pos = "$server3_pos"; +--source include/wait_for_slave_to_stop.inc +SELECT * FROM t1 ORDER BY a; +SELECT * FROM t2 ORDER BY a; +SELECT * FROM t3 ORDER BY a; +SELECT * FROM t4 ORDER BY a,b; + +--connection server_4 +--let $server4_pos= `SELECT @@GLOBAL.gtid_pos` +--connection server_2 +--replace_result $SERVER_MYPORT_4 SERVER_MYPORT_4 +eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $SERVER_MYPORT_4; +eval START SLAVE UNTIL master_gtid_pos = "$server4_pos"; +--source include/wait_for_slave_to_stop.inc +SELECT * FROM t1 ORDER BY a; +SELECT * FROM t2 ORDER BY a; +SELECT * FROM t3 ORDER BY a; +SELECT * FROM t4 ORDER BY a,b; + +--connection server_5 +--let $server5_pos= `SELECT @@GLOBAL.gtid_pos` +--connection server_2 +--replace_result $SERVER_MYPORT_5 SERVER_MYPORT_5 +eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $SERVER_MYPORT_5; +eval START SLAVE UNTIL master_gtid_pos = "$server5_pos"; +--source include/wait_for_slave_to_stop.inc +SELECT * FROM t1 ORDER BY a; +SELECT * FROM t2 ORDER BY a; +SELECT * FROM t3 ORDER BY a; +SELECT * FROM t4 ORDER BY a,b; + +--echo *** Now make 2 master and point 3,4,5 to the new master 2 +--connection server_2 +SET gtid_domain_id= 1; +INSERT INTO t1 values (5); +INSERT INTO t4 values (1,5); +SET gtid_domain_id= 2; +INSERT INTO t2 values (5); +INSERT INTO t4 values (2,5); +SET gtid_domain_id= 3; +INSERT INTO t3 values (5); +INSERT INTO t4 values (3,5); + +--connection server_3 +--replace_result $SERVER_MYPORT_2 SERVER_MYPORT_2 +eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $SERVER_MYPORT_2; +--source include/start_slave.inc +--connection server_4 +--replace_result $SERVER_MYPORT_2 SERVER_MYPORT_2 +eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $SERVER_MYPORT_2; +--source include/start_slave.inc +--connection server_5 +--replace_result $SERVER_MYPORT_2 SERVER_MYPORT_2 +eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $SERVER_MYPORT_2; +--source include/start_slave.inc + +--connection server_2 +--save_master_pos + +--connection server_3 +--sync_with_master +SELECT * FROM t1 ORDER BY a; +SELECT * FROM t2 ORDER BY a; +SELECT * FROM t3 ORDER BY a; +SELECT * FROM t4 ORDER BY a,b; +--connection server_5 +--sync_with_master +SELECT * FROM t1 ORDER BY a; +SELECT * FROM t2 ORDER BY a; +SELECT * FROM t3 ORDER BY a; +SELECT * FROM t4 ORDER BY a,b; +--connection server_5 +--sync_with_master +SELECT * FROM t1 ORDER BY a; +SELECT * FROM t2 ORDER BY a; +SELECT * FROM t3 ORDER BY a; +SELECT * FROM t4 ORDER BY a,b; + + +--echo *** Now let the old master join up as slave. *** +--connection server_1 +--replace_result $SERVER_MYPORT_2 SERVER_MYPORT_2 +eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $SERVER_MYPORT_2, + master_user = "root", master_use_gtid = 1; +--source include/start_slave.inc +--sync_with_master +SELECT * FROM t1 ORDER BY a; +SELECT * FROM t2 ORDER BY a; +SELECT * FROM t3 ORDER BY a; +SELECT * FROM t4 ORDER BY a,b; + + +--echo *** Finally move things back and clean up. *** +--connection server_1 +--source include/stop_slave.inc +RESET SLAVE ALL; + +--connection server_2 +--replace_result $SERVER_MYPORT_1 SERVER_MYPORT_1 +eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $SERVER_MYPORT_1; +--source include/start_slave.inc +--connection server_3 +--source include/stop_slave.inc +--replace_result $SERVER_MYPORT_1 SERVER_MYPORT_1 +eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $SERVER_MYPORT_1; +--source include/start_slave.inc +--connection server_4 +--source include/stop_slave.inc +--replace_result $SERVER_MYPORT_1 SERVER_MYPORT_1 +eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $SERVER_MYPORT_1; +--source include/start_slave.inc +--connection server_5 +--source include/stop_slave.inc +--replace_result $SERVER_MYPORT_1 SERVER_MYPORT_1 +eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $SERVER_MYPORT_1; +--source include/start_slave.inc + +--connection server_1 +SET gtid_domain_id = 0; +DROP TABLE t1; +DROP TABLE t2; +DROP TABLE t3; +DROP TABLE t4; +DROP FUNCTION extract_gtid; + +--source include/rpl_end.inc diff --git a/mysql-test/suite/rpl/t/rpl_gtid_until.test b/mysql-test/suite/rpl/t/rpl_gtid_until.test new file mode 100644 index 00000000000..3b6e238f225 --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_gtid_until.test @@ -0,0 +1,234 @@ +--source include/have_innodb.inc +--let $rpl_topology=1->2 +--source include/rpl_init.inc + +--connection server_1 +ALTER TABLE mysql.rpl_slave_state ENGINE=InnoDB; +# Function to extract one GTID from a list. +delimiter |; +CREATE FUNCTION extract_gtid(d VARCHAR(100), s VARCHAR(100)) + RETURNS VARCHAR(100) DETERMINISTIC +BEGIN + SET s= CONCAT(",", s, ","); + SET s= SUBSTR(s FROM LOCATE(CONCAT(",", d, "-"), s) + 1); + SET s= SUBSTR(s FROM 1 FOR LOCATE(",", s) - 1); + RETURN s; +END| +delimiter ;| +--save_master_pos + +--connection server_2 +--sync_with_master + +# Both replication threads must be stopped for UNTIL master_gtid_pos. +--error ER_SLAVE_WAS_RUNNING +START SLAVE UNTIL master_gtid_pos = ""; +--source include/stop_slave_io.inc +--error ER_SLAVE_WAS_RUNNING +START SLAVE UNTIL master_gtid_pos = ""; +START SLAVE IO_THREAD; +--source include/wait_for_slave_io_to_start.inc +--source include/stop_slave_sql.inc +--error ER_SLAVE_WAS_RUNNING +START SLAVE UNTIL master_gtid_pos = ""; +--source include/stop_slave_io.inc +# UNTIL master_gtid_pos only valid if GTID is used. + +--error ER_UNTIL_REQUIRES_USING_GTID +START SLAVE UNTIL master_gtid_pos = ""; + +CHANGE MASTER TO master_use_gtid=1; + +--connection server_1 +CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB; +INSERT INTO t1 VALUES(1); +--let $gtid_pos=`SELECT @@GLOBAL.gtid_pos` +INSERT INTO t1 VALUES(2); + +--connection server_2 + +# Test various incorrect syntax for UNTIL master_gtid_pos. +--error ER_DUPLICATE_GTID_DOMAIN +START SLAVE UNTIL master_gtid_pos = "0-1-100,1-1-100,2-2-200,1-3-100,4-4-400"; +--error ER_PARSE_ERROR +START SLAVE UNTIL master_log_file = "master-bin.000001", master_log_pos = 4, master_gtid_pos = ""; +--error ER_BAD_SLAVE_UNTIL_COND +START SLAVE IO_THREAD UNTIL master_gtid_pos = ""; +--error ER_BAD_SLAVE_UNTIL_COND +START SLAVE SQL_THREAD UNTIL master_gtid_pos = ""; + +eval START SLAVE UNTIL master_gtid_pos = '$gtid_pos'; + +--source include/wait_for_slave_to_stop.inc +SELECT * FROM t1; + +--source include/start_slave.inc + +--connection server_1 +--save_master_pos + +--connection server_2 +--sync_with_master +SELECT * FROM t1 ORDER BY a; + +# Test showing the UNTIL condition in SHOW SLAVE STATUS. +--source include/stop_slave.inc +START SLAVE UNTIL master_gtid_pos = "1-10-100,2-20-200"; +--source include/wait_for_slave_to_start.inc +--let $status_items= Using_Gtid,Until_Condition +--source include/show_slave_status.inc + +# Clear the UNTIL condition. +--source include/stop_slave.inc +--source include/start_slave.inc + + +--echo *** Test UNTIL condition in an earlier binlog than the start GTID. *** +--connection server_2 +--source include/stop_slave.inc + +--connection server_1 +SET gtid_domain_id = 1; +INSERT INTO t1 VALUES (3); +SET gtid_domain_id = 2; +CREATE TABLE t2 (a INT); +INSERT INTO t2 VALUES (3); +--let $d1_point1= `SELECT extract_gtid("1", @@GLOBAL.gtid_pos)` +--let $d2_point1= `SELECT extract_gtid("2", @@GLOBAL.gtid_pos)` +FLUSH LOGS; +SET gtid_domain_id = 1; +INSERT INTO t1 VALUES (4); +SET gtid_domain_id = 2; +INSERT INTO t2 VALUES (4); +FLUSH LOGS; +SET gtid_domain_id = 1; +INSERT INTO t1 VALUES (5); +--let $d1_point2= `SELECT extract_gtid("1", @@GLOBAL.gtid_pos)` +--let $d2_point2= `SELECT extract_gtid("2", @@GLOBAL.gtid_pos)` +SET gtid_domain_id = 2; +INSERT INTO t2 VALUES (5); +FLUSH LOGS; +SET gtid_domain_id = 1; +INSERT INTO t1 VALUES (6); +--let $d1_point3= `SELECT extract_gtid("1", @@GLOBAL.gtid_pos)` +--let $d2_point3= `SELECT extract_gtid("2", @@GLOBAL.gtid_pos)` +SET gtid_domain_id = 2; +INSERT INTO t2 VALUES (6); +SET gtid_domain_id = 0; +--source include/show_binary_logs.inc +--save_master_pos + +--connection server_2 +# Let the slave reach an middle point in domain 1 and a late point in domain 2. +eval START SLAVE UNTIL master_gtid_pos='$d1_point2,$d2_point3'; +--source include/wait_for_slave_to_stop.inc +SELECT * FROM t1 ORDER BY a; +SELECT * FROM t2 ORDER BY a; +# Now test starting at a middle point in the binlogs when the stop position in +# one domain (domain 2) is early. +eval START SLAVE UNTIL master_gtid_pos='$d1_point3,$d2_point1'; +--source include/wait_for_slave_to_stop.inc +SELECT * FROM t1 ORDER BY a; +SELECT * FROM t2 ORDER BY a; +# Test that one UNTIL domain empty means stop that domain immediately. +eval START SLAVE UNTIL master_gtid_pos='$d1_point2'; +--source include/wait_for_slave_to_stop.inc +SELECT * FROM t1 ORDER BY a; +SELECT * FROM t2 ORDER BY a; + +--source include/start_slave.inc +--sync_with_master +SELECT * FROM t1 ORDER BY a; +SELECT * FROM t2 ORDER BY a; + +--echo *** Test when the UNTIL position is right at the end of the binlog file prior to the starting position *** + +--connection server_2 +--source include/stop_slave.inc + +--connection server_1 +FLUSH LOGS; +SET gtid_domain_id = 1; +INSERT INTO t1 VALUES (7); +SET gtid_domain_id = 0; +--save_master_pos + +--connection server_2 +eval START SLAVE UNTIL master_gtid_pos='$d1_point3'; +--source include/wait_for_slave_to_stop.inc +# This should not show row 7, as we requested stop just before it. +SELECT * FROM t1 ORDER BY a; +--source include/start_slave.inc +--sync_with_master +SELECT * FROM t1 ORDER BY a; + + +--echo *** Test when UNTIL condition is after a stand-alone event (not a transaction). *** + +--connection server_2 +--source include/stop_slave.inc + +--connection server_1 +CREATE TABLE t3 (a INT); +--let $until_condition=`SELECT @@GLOBAL.gtid_pos` +DROP TABLE t3; +--save_master_pos + +--connection server_2 +eval START SLAVE UNTIL master_gtid_pos='$until_condition'; +--source include/wait_for_slave_to_stop.inc +SHOW CREATE TABLE t3; +--source include/start_slave.inc +--sync_with_master + +--echo *** Test UNTIL condition that has not yet been logged. *** + +--connection server_2 +--source include/stop_slave.inc +RESET SLAVE ALL; +RESET MASTER; +SET GLOBAL gtid_pos=''; + +--connection server_1 +# Do it once to compute the right GTID, then throw it away and do it again +# for the actual test. +RESET MASTER; +INSERT INTO t1 VALUES (10); +INSERT INTO t1 VALUES (11); +--let $until_condition=`SELECT @@GLOBAL.gtid_pos` +INSERT INTO t1 VALUES (12); +DELETE FROM t1 WHERE a >= 10; + +RESET MASTER; +INSERT INTO t1 VALUES (10); + +--connection server_2 +--replace_result $SERVER_MYPORT_1 SERVER_MYPORT_1 +eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $SERVER_MYPORT_1, + master_user = "root", master_use_gtid = 1; +eval START SLAVE UNTIL master_gtid_pos = '$until_condition'; +--source include/wait_for_slave_to_start.inc + +--connection server_1 +INSERT INTO t1 VALUES (11); +INSERT INTO t1 VALUES (12); +--save_master_pos + +--connection server_2 +# This then should wait until it gets the row (11) and then stop, not +# yet having the row (12). +--source include/wait_for_slave_to_stop.inc +SELECT * FROM t1 ORDER BY a; +--source include/start_slave.inc +--sync_with_master +# And now the row (12) should be there. +SELECT * FROM t1 ORDER BY a; + + +# Clean up. +--connection server_1 +DROP TABLE t1; +DROP TABLE t2; +DROP FUNCTION extract_gtid; + +--source include/rpl_end.inc diff --git a/sql/lex.h b/sql/lex.h index edf833021b0..756e7e80f7e 100644 --- a/sql/lex.h +++ b/sql/lex.h @@ -330,7 +330,7 @@ static SYMBOL symbols[] = { { "LOW_PRIORITY", SYM(LOW_PRIORITY)}, { "MASTER", SYM(MASTER_SYM)}, { "MASTER_CONNECT_RETRY", SYM(MASTER_CONNECT_RETRY_SYM)}, - { "MASTER_USE_GTID", SYM(MASTER_USE_GTID_SYM)}, + { "MASTER_GTID_POS", SYM(MASTER_GTID_POS_SYM)}, { "MASTER_HOST", SYM(MASTER_HOST_SYM)}, { "MASTER_LOG_FILE", SYM(MASTER_LOG_FILE_SYM)}, { "MASTER_LOG_POS", SYM(MASTER_LOG_POS_SYM)}, @@ -345,6 +345,7 @@ static SYMBOL symbols[] = { { "MASTER_SSL_KEY", SYM(MASTER_SSL_KEY_SYM)}, { "MASTER_SSL_VERIFY_SERVER_CERT", SYM(MASTER_SSL_VERIFY_SERVER_CERT_SYM)}, { "MASTER_USER", SYM(MASTER_USER_SYM)}, + { "MASTER_USE_GTID", SYM(MASTER_USE_GTID_SYM)}, { "MASTER_HEARTBEAT_PERIOD", SYM(MASTER_HEARTBEAT_PERIOD_SYM)}, { "MATCH", SYM(MATCH)}, { "MAX_CONNECTIONS_PER_HOUR", SYM(MAX_CONNECTIONS_PER_HOUR)}, diff --git a/sql/log.cc b/sql/log.cc index c0cd4be8a45..3266934beda 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -3280,7 +3280,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name, there had been an entry (domain_id, server_id, 0). */ - Gtid_list_log_event gl_ev(&rpl_global_gtid_binlog_state); + Gtid_list_log_event gl_ev(&rpl_global_gtid_binlog_state, 0); if (gl_ev.write(&log_file)) goto err; @@ -8718,16 +8718,11 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, case GTID_LIST_EVENT: if (first_round) { - uint32 i; Gtid_list_log_event *glev= (Gtid_list_log_event *)ev; /* Initialise the binlog state from the Gtid_list event. */ - rpl_global_gtid_binlog_state.reset(); - for (i= 0; i < glev->count; ++i) - { - if (rpl_global_gtid_binlog_state.update(&(glev->list[i]))) - goto err2; - } + if (rpl_global_gtid_binlog_state.load(glev->list, glev->count)) + goto err2; } break; diff --git a/sql/log_event.cc b/sql/log_event.cc index aa86fa6ff62..9fbcbb68145 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -6320,6 +6320,7 @@ Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len, : Log_event(buf, description_event), count(0), list(0) { uint32 i; + uint32 val; uint8 header_size= description_event->common_header_len; uint8 post_header_len= description_event->post_header_len[GTID_LIST_EVENT-1]; if (event_len < header_size + post_header_len || @@ -6327,7 +6328,9 @@ Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len, return; buf+= header_size; - count= uint4korr(buf) & ((1<<28)-1); + val= uint4korr(buf); + count= val & ((1<<28)-1); + gl_flags= val & ((uint32)0xf << 28); buf+= 4; if (event_len - (header_size + post_header_len) < count*element_size || (!(list= (rpl_gtid *)my_malloc(count*sizeof(*list) + (count == 0), @@ -6348,8 +6351,9 @@ Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len, #ifdef MYSQL_SERVER -Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set) - : count(gtid_set->count()), list(0) +Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set, + uint32 gl_flags_) + : count(gtid_set->count()), gl_flags(gl_flags_), list(0) { cache_type= EVENT_NO_CACHE; /* Failure to allocate memory will be caught by is_valid() returning false. */ @@ -6359,32 +6363,70 @@ Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set) gtid_set->get_gtid_list(list, count); } + +#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) bool -Gtid_list_log_event::write(IO_CACHE *file) +Gtid_list_log_event::to_packet(String *packet) { uint32 i; - uchar buf[element_size]; + uchar *p; + uint32 needed_length; DBUG_ASSERT(count < 1<<28); - if (write_header(file, get_data_size())) - return 1; - int4store(buf, count & ((1<<28)-1)); - if (wrapper_my_b_safe_write(file, buf, GTID_LIST_HEADER_LEN)) - return 1; + needed_length= packet->length() + 4 + count*element_size; + if (packet->reserve(needed_length)) + return true; + p= (uchar *)packet->ptr() + packet->length();; + packet->length(needed_length); + int4store(p, (count & ((1<<28)-1)) | gl_flags); + p += 4; for (i= 0; i < count; ++i) { - int4store(buf, list[i].domain_id); - int4store(buf+4, list[i].server_id); - int8store(buf+8, list[i].seq_no); - if (wrapper_my_b_safe_write(file, buf, element_size)) - return 1; + int4store(p, list[i].domain_id); + int4store(p+4, list[i].server_id); + int8store(p+8, list[i].seq_no); + p += 16; } - return write_footer(file); + + return false; +} + + +bool +Gtid_list_log_event::write(IO_CACHE *file) +{ + char buf[128]; + String packet(buf, sizeof(buf), system_charset_info); + + packet.length(0); + if (to_packet(&packet)) + return true; + return + write_header(file, get_data_size()) || + wrapper_my_b_safe_write(file, (uchar *)packet.ptr(), packet.length()) || + write_footer(file); +} + + +int +Gtid_list_log_event::do_apply_event(Relay_log_info const *rli) +{ + int ret= Log_event::do_apply_event(rli); + if (rli->until_condition == Relay_log_info::UNTIL_GTID && + (gl_flags & FLAG_UNTIL_REACHED)) + { + char str_buf[128]; + String str(str_buf, sizeof(str_buf), system_charset_info); + const_cast<Relay_log_info*>(rli)->until_gtid_pos.to_string(&str); + sql_print_information("Slave SQL thread stops because it reached its" + " UNTIL master_gtid_pos %s", str.c_ptr_safe()); + const_cast<Relay_log_info*>(rli)->abort_slave= true; + } + return ret; } -#ifdef HAVE_REPLICATION void Gtid_list_log_event::pack_info(THD *thd, Protocol *protocol) { @@ -6439,12 +6481,24 @@ Gtid_list_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info) */ bool Gtid_list_log_event::peek(const char *event_start, uint32 event_len, + uint8 checksum_alg, rpl_gtid **out_gtid_list, uint32 *out_list_len) { const char *p; uint32 count_field, count; rpl_gtid *gtid_list; + if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32) + { + if (event_len > BINLOG_CHECKSUM_LEN) + event_len-= BINLOG_CHECKSUM_LEN; + else + event_len= 0; + } + else + DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF || + checksum_alg == BINLOG_CHECKSUM_ALG_OFF); + if (event_len < LOG_EVENT_HEADER_LEN + GTID_LIST_HEADER_LEN) return true; p= event_start + LOG_EVENT_HEADER_LEN; diff --git a/sql/log_event.h b/sql/log_event.h index 5026b280b27..21d60cac928 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -3116,7 +3116,7 @@ public: <td>count</td> <td>4 byte unsigned integer</td> <td>The lower 28 bits are the number of GTIDs. The upper 4 bits are - reserved for flags bits for future expansion</td> + flags bits.</td> </tr> </table> @@ -3149,18 +3149,28 @@ public: </table> The three elements in the body repeat COUNT times to form the GTID list. + + At the time of writing, only one flag bit is in use. + + Bit 28 of `count' is used for flag FLAG_UNTIL_REACHED, which is sent in a + Gtid_list event from the master to the slave to indicate that the START + SLAVE UNTIL master_gtid_pos=xxx condition has been reached. (This flag is + only sent in "fake" events generated on the fly, it is not written into + the binlog). */ class Gtid_list_log_event: public Log_event { public: uint32 count; + uint32 gl_flags; struct rpl_gtid *list; static const uint element_size= 4+4+8; + static const uint32 FLAG_UNTIL_REACHED= (1<<28); #ifdef MYSQL_SERVER - Gtid_list_log_event(rpl_binlog_state *gtid_set); + Gtid_list_log_event(rpl_binlog_state *gtid_set, uint32 gl_flags); #ifdef HAVE_REPLICATION void pack_info(THD *thd, Protocol *protocol); #endif @@ -3173,10 +3183,13 @@ public: Log_event_type get_type_code() { return GTID_LIST_EVENT; } int get_data_size() { return GTID_LIST_HEADER_LEN + count*element_size; } bool is_valid() const { return list != NULL; } -#ifdef MYSQL_SERVER +#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) + bool to_packet(String *packet); bool write(IO_CACHE *file); + virtual int do_apply_event(Relay_log_info const *rli); #endif static bool peek(const char *event_start, uint32 event_len, + uint8 checksum_alg, rpl_gtid **out_gtid_list, uint32 *out_list_len); }; diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc index bda060115ed..a4bdeb9932b 100644 --- a/sql/rpl_gtid.cc +++ b/sql/rpl_gtid.cc @@ -711,6 +711,22 @@ void rpl_binlog_state::free() } } + +bool +rpl_binlog_state::load(struct rpl_gtid *list, uint32 count) +{ + uint32 i; + + reset(); + for (i= 0; i < count; ++i) + { + if (update(&(list[i]))) + return true; + } + return false; +} + + rpl_binlog_state::~rpl_binlog_state() { free(); @@ -1117,10 +1133,17 @@ slave_connection_state::remove(const rpl_gtid *in_gtid) int slave_connection_state::to_string(String *out_str) { + out_str->length(0); + return append_to_string(out_str); +} + + +int +slave_connection_state::append_to_string(String *out_str) +{ uint32 i; bool first; - out_str->length(0); first= true; for (i= 0; i < hash.records; ++i) { diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h index 046533fd760..bd6663c0659 100644 --- a/sql/rpl_gtid.h +++ b/sql/rpl_gtid.h @@ -142,6 +142,7 @@ struct rpl_binlog_state void reset(); void free(); + bool load(struct rpl_gtid *list, uint32 count); int update(const struct rpl_gtid *gtid); uint64 seq_no_from_state(); int write_to_iocache(IO_CACHE *dest); @@ -172,6 +173,7 @@ struct slave_connection_state void remove(const rpl_gtid *gtid); ulong count() const { return hash.records; } int to_string(String *out_str); + int append_to_string(String *out_str); }; extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid, diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index ec2ca048976..d27a80313ac 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -1096,7 +1096,8 @@ bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev) ulonglong log_pos; DBUG_ENTER("Relay_log_info::is_until_satisfied"); - DBUG_ASSERT(until_condition != UNTIL_NONE); + DBUG_ASSERT(until_condition == UNTIL_MASTER_POS || + until_condition == UNTIL_RELAY_POS); if (until_condition == UNTIL_MASTER_POS) { diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 7aff6720aac..6dd757343fd 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -263,7 +263,9 @@ public: thread is running). */ - enum {UNTIL_NONE= 0, UNTIL_MASTER_POS, UNTIL_RELAY_POS} until_condition; + enum { + UNTIL_NONE= 0, UNTIL_MASTER_POS, UNTIL_RELAY_POS, UNTIL_GTID + } until_condition; char until_log_name[FN_REFLEN]; ulonglong until_log_pos; /* extension extracted from log_name and converted to int */ @@ -277,6 +279,8 @@ public: UNTIL_LOG_NAMES_CMP_UNKNOWN= -2, UNTIL_LOG_NAMES_CMP_LESS= -1, UNTIL_LOG_NAMES_CMP_EQUAL= 0, UNTIL_LOG_NAMES_CMP_GREATER= 1 } until_log_names_cmp_result; + /* Condition for UNTIL master_gtid_pos. */ + slave_connection_state until_gtid_pos; char cached_charset[6]; /* @@ -354,6 +358,8 @@ public: bool is_until_satisfied(THD *thd, Log_event *ev); inline ulonglong until_pos() { + DBUG_ASSERT(until_condition == UNTIL_MASTER_POS || + until_condition == UNTIL_RELAY_POS); return ((until_condition == UNTIL_MASTER_POS) ? group_master_log_pos : group_relay_log_pos); } diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt index 5dbc7637552..d7d5710b048 100644 --- a/sql/share/errmsg-utf8.txt +++ b/sql/share/errmsg-utf8.txt @@ -6543,3 +6543,5 @@ ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG eng "Requested GTID_POS %u-%u-%llu conflicts with the binary log which contains a more recent GTID %u-%u-%llu. To use the requested GTID_POS, the old binlog must be removed with RESET MASTER to avoid out-of-order binlog" ER_MASTER_GTID_POS_MISSING_DOMAIN eng "Requested GTID_POS contains no value for replication domain %u. This conflicts with the binary log which contains GTID %u-%u-%llu. To use the requested GTID_POS, the old binlog must be removed with RESET MASTER to avoid out-of-order binlog" +ER_UNTIL_REQUIRES_USING_GTID + eng "START SLAVE UNTIL master_gtid_pos requires that slave is using GTID" diff --git a/sql/slave.cc b/sql/slave.cc index f0a9b27707c..1b3154e20ab 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -1879,6 +1879,43 @@ after_set_capability: goto err; } } + + if (mi->rli.until_condition == Relay_log_info::UNTIL_GTID) + { + connect_state.length(0); + connect_state.append(STRING_WITH_LEN("SET @slave_until_gtid='"), + system_charset_info); + if (mi->rli.until_gtid_pos.append_to_string(&connect_state)) + { + err_code= ER_OUTOFMEMORY; + errmsg= "The slave I/O thread stops because a fatal out-of-memory " + "error is encountered when it tries to compute @slave_until_gtid."; + sprintf(err_buff, "%s Error: Out of memory", errmsg); + goto err; + } + connect_state.append(STRING_WITH_LEN("'"), system_charset_info); + + rc= mysql_real_query(mysql, connect_state.ptr(), connect_state.length()); + if (rc) + { + err_code= mysql_errno(mysql); + if (is_network_error(err_code)) + { + mi->report(ERROR_LEVEL, err_code, + "Setting @slave_until_gtid failed with error: %s", + mysql_error(mysql)); + goto network_err; + } + else + { + /* Fatal error */ + errmsg= "The slave I/O thread stops because a fatal error is " + "encountered when it tries to set @slave_until_gtid."; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + goto err; + } + } + } } if (!mi->using_gtid) { @@ -2363,7 +2400,8 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, protocol->store( mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None": ( mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master": - "Relay"), &my_charset_bin); + ( mi->rli.until_condition==Relay_log_info::UNTIL_RELAY_POS? "Relay": + "Gtid")), &my_charset_bin); protocol->store(mi->rli.until_log_name, &my_charset_bin); protocol->store((ulonglong) mi->rli.until_log_pos); @@ -3057,7 +3095,8 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli) This tests if the position of the beginning of the current event hits the UNTIL barrier. */ - if (rli->until_condition != Relay_log_info::UNTIL_NONE && + if ((rli->until_condition == Relay_log_info::UNTIL_MASTER_POS || + rli->until_condition == Relay_log_info::UNTIL_RELAY_POS) && rli->is_until_satisfied(thd, ev)) { char buf[22]; @@ -3954,7 +3993,8 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME, saved_master_log_pos= rli->group_master_log_pos; saved_skip= rli->slave_skip_counter; } - if (rli->until_condition != Relay_log_info::UNTIL_NONE && + if ((rli->until_condition == Relay_log_info::UNTIL_MASTER_POS || + rli->until_condition == Relay_log_info::UNTIL_RELAY_POS) && rli->is_until_satisfied(thd, NULL)) { char buf[22]; @@ -4793,7 +4833,43 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) } break; + case GTID_LIST_EVENT: + { + const char *errmsg; + Gtid_list_log_event *glev; + Log_event *tmp; + + if (mi->rli.until_condition != Relay_log_info::UNTIL_GTID) + goto default_action; + if (!(tmp= Log_event::read_log_event(buf, event_len, &errmsg, + mi->rli.relay_log.description_event_for_queue, + opt_slave_sql_verify_checksum))) + { + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + goto err; + } + glev= static_cast<Gtid_list_log_event *>(tmp); + if (glev->gl_flags & Gtid_list_log_event::FLAG_UNTIL_REACHED) + { + char str_buf[128]; + String str(str_buf, sizeof(str_buf), system_charset_info); + mi->rli.until_gtid_pos.to_string(&str); + sql_print_information("Slave IO thread stops because it reached its" + " UNTIL master_gtid_pos %s", str.c_ptr_safe()); + mi->abort_slave= true; + } + delete glev; + + /* + Do not update position for fake Gtid_list event (which has a zero + end_log_pos). + */ + inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0; + } + break; + default: + default_action: inc_pos= event_len; break; } diff --git a/sql/sql_lex.h b/sql/sql_lex.h index 8bfcddefdfd..4c1d917aeae 100644 --- a/sql/sql_lex.h +++ b/sql/sql_lex.h @@ -289,6 +289,8 @@ struct LEX_MASTER_INFO char *ssl_key, *ssl_cert, *ssl_ca, *ssl_capath, *ssl_cipher; char *relay_log_name; LEX_STRING connection_name; + /* Value in START SLAVE UNTIL master_gtid_pos=xxx */ + LEX_STRING gtid_pos_str; ulonglong pos; ulong relay_log_pos; ulong server_id; @@ -317,6 +319,8 @@ struct LEX_MASTER_INFO heartbeat_period= 0; ssl= ssl_verify_server_cert= heartbeat_opt= repl_ignore_server_ids_opt= use_gtid_opt= LEX_MI_UNCHANGED; + gtid_pos_str.length= 0; + gtid_pos_str.str= NULL; } }; diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 32e653cebff..aa21d191166 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -30,6 +30,14 @@ #include "rpl_handler.h" #include "debug_sync.h" + +enum enum_gtid_until_state { + GTID_UNTIL_NOT_DONE, + GTID_UNTIL_STOP_AFTER_STANDALONE, + GTID_UNTIL_STOP_AFTER_TRANSACTION +}; + + int max_binlog_dump_events = 0; // unlimited my_bool opt_sporadic_binlog_dump_fail = 0; #ifndef DBUG_OFF @@ -38,6 +46,74 @@ static int binlog_dump_count = 0; extern TYPELIB binlog_checksum_typelib; + +static int +fake_event_header(String* packet, Log_event_type event_type, ulong extra_len, + my_bool *do_checksum, ha_checksum *crc, const char** errmsg, + uint8 checksum_alg_arg) +{ + char header[LOG_EVENT_HEADER_LEN]; + ulong event_len; + + *do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF && + checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF; + + /* + 'when' (the timestamp) is set to 0 so that slave could distinguish between + real and fake Rotate events (if necessary) + */ + memset(header, 0, 4); + header[EVENT_TYPE_OFFSET] = (uchar)event_type; + event_len= LOG_EVENT_HEADER_LEN + extra_len + + (*do_checksum ? BINLOG_CHECKSUM_LEN : 0); + int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id); + int4store(header + EVENT_LEN_OFFSET, event_len); + int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F); + // TODO: check what problems this may cause and fix them + int4store(header + LOG_POS_OFFSET, 0); + if (packet->append(header, sizeof(header))) + { + *errmsg= "Failed due to out-of-memory writing event"; + return -1; + } + if (*do_checksum) + { + *crc= my_checksum(0L, NULL, 0); + *crc= my_checksum(*crc, (uchar*)header, sizeof(header)); + } + return 0; +} + + +static int +fake_event_footer(String *packet, my_bool do_checksum, ha_checksum crc, const char **errmsg) +{ + if (do_checksum) + { + char b[BINLOG_CHECKSUM_LEN]; + int4store(b, crc); + if (packet->append(b, sizeof(b))) + { + *errmsg= "Failed due to out-of-memory writing event checksum"; + return -1; + } + } + return 0; +} + + +static int +fake_event_write(NET *net, String *packet, const char **errmsg) +{ + if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) + { + *errmsg = "failed on my_net_write()"; + return -1; + } + return 0; +} + + /* fake_rotate_event() builds a fake (=which does not exist physically in any binlog) Rotate event, which contains the name of the binlog we are going to @@ -61,59 +137,71 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name, uint8 checksum_alg_arg) { DBUG_ENTER("fake_rotate_event"); - char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN+100]; - - /* - this Rotate is to be sent with checksum if and only if - slave's get_master_version_and_clock time handshake value - of master's @@global.binlog_checksum was TRUE - */ - - my_bool do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF && - checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF; - - /* - 'when' (the timestamp) is set to 0 so that slave could distinguish between - real and fake Rotate events (if necessary) - */ - memset(header, 0, 4); - header[EVENT_TYPE_OFFSET] = ROTATE_EVENT; - + char buf[ROTATE_HEADER_LEN+100]; + my_bool do_checksum; + int err; char* p = log_file_name+dirname_length(log_file_name); uint ident_len = (uint) strlen(p); - ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN + - (do_checksum ? BINLOG_CHECKSUM_LEN : 0); - int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id); - int4store(header + EVENT_LEN_OFFSET, event_len); - int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F); + ha_checksum crc; - // TODO: check what problems this may cause and fix them - int4store(header + LOG_POS_OFFSET, 0); + if ((err= fake_event_header(packet, ROTATE_EVENT, + ident_len + ROTATE_HEADER_LEN, &do_checksum, &crc, + errmsg, checksum_alg_arg))) + DBUG_RETURN(err); - packet->append(header, sizeof(header)); int8store(buf+R_POS_OFFSET,position); packet->append(buf, ROTATE_HEADER_LEN); packet->append(p, ident_len); if (do_checksum) { - char b[BINLOG_CHECKSUM_LEN]; - ha_checksum crc= my_checksum(0L, NULL, 0); - crc= my_checksum(crc, (uchar*)header, sizeof(header)); crc= my_checksum(crc, (uchar*)buf, ROTATE_HEADER_LEN); crc= my_checksum(crc, (uchar*)p, ident_len); - int4store(b, crc); - packet->append(b, sizeof(b)); } - if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) + if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) || + (err= fake_event_write(net, packet, errmsg))) + DBUG_RETURN(err); + + DBUG_RETURN(0); +} + + +static int fake_gtid_list_event(NET* net, String* packet, + Gtid_list_log_event *glev, const char** errmsg, + uint8 checksum_alg_arg) +{ + my_bool do_checksum; + int err; + ha_checksum crc; + char buf[128]; + String str(buf, sizeof(buf), system_charset_info); + + str.length(0); + if (glev->to_packet(&str)) { - *errmsg = "failed on my_net_write()"; - DBUG_RETURN(-1); + *errmsg= "Failed due to out-of-memory writing Gtid_list event"; + return -1; } - DBUG_RETURN(0); + if ((err= fake_event_header(packet, GTID_LIST_EVENT, + str.length(), &do_checksum, &crc, + errmsg, checksum_alg_arg))) + return err; + + packet->append(str); + if (do_checksum) + { + crc= my_checksum(crc, (uchar*)str.ptr(), str.length()); + } + + if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) || + (err= fake_event_write(net, packet, errmsg))) + return err; + + return 0; } + /* Reset thread transmit packet buffer for event sending @@ -527,6 +615,27 @@ get_slave_connect_state(THD *thd, String *out_str) /* + Get the value of the @slave_until_gtid user variable into the supplied + String (this is the GTID position specified for START SLAVE UNTIL + master_gtid_pos='xxx'). + + Returns false if error (ie. slave did not set the variable and is not doing + START SLAVE UNTIL mater_gtid_pos='xxx'), true if success. +*/ +static bool +get_slave_until_gtid(THD *thd, String *out_str) +{ + bool null_value; + + const LEX_STRING name= { C_STRING_WITH_LEN("slave_until_gtid") }; + user_var_entry *entry= + (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str, + name.length); + return entry && entry->val_str(&null_value, out_str, 0) && !null_value; +} + + +/* Function prepares and sends repliation heartbeat event. @param net net object of THD @@ -773,10 +882,10 @@ contains_all_slave_gtid(slave_connection_state *st, Gtid_list_log_event *glev) static int check_slave_start_position(THD *thd, slave_connection_state *st, - const char **errormsg, rpl_gtid *error_gtid) + const char **errormsg, rpl_gtid *error_gtid, + slave_connection_state *until_gtid_state) { uint32 i; - bool found; int err; rpl_gtid **delete_list= NULL; uint32 delete_idx= 0; @@ -791,9 +900,9 @@ check_slave_start_position(THD *thd, slave_connection_state *st, rpl_gtid master_replication_gtid; rpl_gtid start_gtid; - if ((found= mysql_bin_log.find_in_binlog_state(slave_gtid->domain_id, - slave_gtid->server_id, - &master_gtid)) && + if (mysql_bin_log.find_in_binlog_state(slave_gtid->domain_id, + slave_gtid->server_id, + &master_gtid) && master_gtid.seq_no >= slave_gtid->seq_no) continue; @@ -814,6 +923,7 @@ check_slave_start_position(THD *thd, slave_connection_state *st, slave_gtid->seq_no != master_replication_gtid.seq_no) { rpl_gtid domain_gtid; + rpl_gtid *until_gtid; if (!mysql_bin_log.lookup_domain_in_binlog_state(slave_gtid->domain_id, &domain_gtid)) @@ -832,6 +942,27 @@ check_slave_start_position(THD *thd, slave_connection_state *st, ++missing_domains; continue; } + + if (until_gtid_state && + ( !(until_gtid= until_gtid_state->find(slave_gtid->domain_id)) || + (mysql_bin_log.find_in_binlog_state(until_gtid->domain_id, + until_gtid->server_id, + &master_gtid) && + master_gtid.seq_no >= until_gtid->seq_no))) + { + /* + The slave requested to start from a position that is not (yet) in + our binlog, but it also specified an UNTIL condition that _is_ in + our binlog (or a missing UNTIL, which means stop at the very + beginning). So the stop position is before the start position, and + we just delete the entry from the UNTIL hash to mark that this + domain has already reached the UNTIL condition. + */ + if(until_gtid) + until_gtid_state->remove(until_gtid); + continue; + } + *errormsg= "Requested slave GTID state not found in binlog"; *error_gtid= *slave_gtid; err= ER_GTID_POSITION_NOT_FOUND_IN_BINLOG; @@ -951,7 +1082,8 @@ end: the requested GTID that was already purged. */ static const char * -gtid_find_binlog_file(slave_connection_state *state, char *out_name) +gtid_find_binlog_file(slave_connection_state *state, char *out_name, + slave_connection_state *until_gtid_state) { MEM_ROOT memroot; binlog_file_entry *list; @@ -1003,42 +1135,60 @@ gtid_find_binlog_file(slave_connection_state *state, char *out_name) if (!glev || contains_all_slave_gtid(state, glev)) { - uint32 i; - strmake(out_name, buf, FN_REFLEN); - /* - As a special case, we allow to start from binlog file N if the - requested GTID is the last event (in the corresponding domain) in - binlog file (N-1), but then we need to remove that GTID from the slave - state, rather than skipping events waiting for it to turn up. - */ - for (i= 0; i < glev->count; ++i) + if (glev) { - const rpl_gtid *gtid= state->find(glev->list[i].domain_id); - if (!gtid) - { - /* - contains_all_slave_gtid() returns false if there is any domain in - Gtid_list_event which is not in the requested slave position. + uint32 i; - We may delete a domain from the slave state inside this loop, but - we only do this when it is the very last GTID logged for that - domain in earlier binlogs, and then we can not encounter it in any - further GTIDs in the Gtid_list. - */ - DBUG_ASSERT(0); - continue; - } - if (gtid->server_id == glev->list[i].server_id && - gtid->seq_no == glev->list[i].seq_no) + /* + As a special case, we allow to start from binlog file N if the + requested GTID is the last event (in the corresponding domain) in + binlog file (N-1), but then we need to remove that GTID from the slave + state, rather than skipping events waiting for it to turn up. + + If slave is doing START SLAVE UNTIL, check for any UNTIL conditions + that are already included in a previous binlog file. Delete any such + from the UNTIL hash, to mark that such domains have already reached + their UNTIL condition. + */ + for (i= 0; i < glev->count; ++i) { - /* - The slave requested to start from the very beginning of this - domain in this binlog file. So delete the entry from the state, - we do not need to skip anything. - */ - state->remove(gtid); + const rpl_gtid *gtid= state->find(glev->list[i].domain_id); + if (!gtid) + { + /* + Contains_all_slave_gtid() returns false if there is any domain in + Gtid_list_event which is not in the requested slave position. + + We may delete a domain from the slave state inside this loop, but + we only do this when it is the very last GTID logged for that + domain in earlier binlogs, and then we can not encounter it in any + further GTIDs in the Gtid_list. + */ + DBUG_ASSERT(0); + } else if (gtid->server_id == glev->list[i].server_id && + gtid->seq_no == glev->list[i].seq_no) + { + /* + The slave requested to start from the very beginning of this + domain in this binlog file. So delete the entry from the state, + we do not need to skip anything. + */ + state->remove(gtid); + } + + if (until_gtid_state && + (gtid= until_gtid_state->find(glev->list[i].domain_id)) && + gtid->server_id == glev->list[i].server_id && + gtid->seq_no <= glev->list[i].seq_no) + { + /* + We've already reached the stop position in UNTIL for this domain, + since it is before the start position. + */ + until_gtid_state->remove(gtid); + } } } @@ -1163,6 +1313,7 @@ gtid_state_from_pos(const char *name, uint32 offset, goto end; } status= Gtid_list_log_event::peek(packet.ptr(), packet.length(), + current_checksum_alg, >id_list, &list_len); if (status) { @@ -1256,6 +1407,49 @@ gtid_state_from_binlog_pos(const char *in_name, uint32 pos, String *out_str) } +static bool +is_until_reached(THD *thd, NET *net, String *packet, ulong *ev_offset, + enum_gtid_until_state gtid_until_group, + Log_event_type event_type, uint8 current_checksum_alg, + ushort flags, const char **errmsg, + rpl_binlog_state *until_binlog_state) +{ + switch (gtid_until_group) + { + case GTID_UNTIL_NOT_DONE: + return false; + case GTID_UNTIL_STOP_AFTER_STANDALONE: + if (Log_event::is_part_of_group(event_type)) + return false; + break; + case GTID_UNTIL_STOP_AFTER_TRANSACTION: + if (event_type != XID_EVENT && + (event_type != QUERY_EVENT || + !Query_log_event::peek_is_commit_rollback(packet->ptr()+*ev_offset, + packet->length()-*ev_offset, + current_checksum_alg))) + return false; + break; + } + + /* + The last event group has been sent, now the START SLAVE UNTIL condition + has been reached. + + Send a last fake Gtid_list_log_event with a flag set to mark that we + stop due to UNTIL condition. + */ + if (reset_transmit_packet(thd, flags, ev_offset, errmsg)) + return true; + Gtid_list_log_event glev(until_binlog_state, + Gtid_list_log_event::FLAG_UNTIL_REACHED); + if (fake_gtid_list_event(net, packet, &glev, errmsg, current_checksum_alg)) + return true; + *errmsg= NULL; + return true; +} + + /* Helper function for mysql_binlog_send() to write an event down the slave connection. @@ -1268,37 +1462,113 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, IO_CACHE *log, int mariadb_slave_capability, ulong ev_offset, uint8 current_checksum_alg, bool using_gtid_state, slave_connection_state *gtid_state, - enum_gtid_skip_type *gtid_skip_group) + enum_gtid_skip_type *gtid_skip_group, + slave_connection_state *until_gtid_state, + enum_gtid_until_state *gtid_until_group, + rpl_binlog_state *until_binlog_state) { my_off_t pos; size_t len= packet->length(); + if (event_type == GTID_LIST_EVENT && using_gtid_state && + (gtid_state->count() > 0 || until_gtid_state)) + { + rpl_gtid *gtid_list; + uint32 list_len; + bool err; + + if (ev_offset > len || + Gtid_list_log_event::peek(packet->ptr()+ev_offset, len - ev_offset, + current_checksum_alg, + >id_list, &list_len)) + return "Failed to read Gtid_list_log_event: corrupt binlog"; + err= until_binlog_state->load(gtid_list, list_len); + my_free(gtid_list); + if (err) + return "Failed in internal GTID book-keeping: Out of memory"; + } + /* Skip GTID event groups until we reach slave position within a domain_id. */ - if (event_type == GTID_EVENT && using_gtid_state && gtid_state->count() > 0) + if (event_type == GTID_EVENT && using_gtid_state) { - uint32 server_id, domain_id; - uint64 seq_no; uchar flags2; rpl_gtid *gtid; - if (ev_offset > len || - Gtid_log_event::peek(packet->ptr()+ev_offset, len - ev_offset, - current_checksum_alg, - &domain_id, &server_id, &seq_no, &flags2)) - return "Failed to read Gtid_log_event: corrupt binlog"; - gtid= gtid_state->find(domain_id); - if (gtid != NULL) + if (gtid_state->count() > 0 || until_gtid_state) { - /* Skip this event group if we have not yet reached slave start pos. */ - if (server_id != gtid->server_id || seq_no <= gtid->seq_no) - *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ? - GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION); - /* - Delete this entry if we have reached slave start position (so we will - not skip subsequent events and won't have to look them up and check). - */ - if (server_id == gtid->server_id && seq_no >= gtid->seq_no) - gtid_state->remove(gtid); + rpl_gtid event_gtid; + + if (ev_offset > len || + Gtid_log_event::peek(packet->ptr()+ev_offset, len - ev_offset, + current_checksum_alg, + &event_gtid.domain_id, &event_gtid.server_id, + &event_gtid.seq_no, &flags2)) + return "Failed to read Gtid_log_event: corrupt binlog"; + + if (until_binlog_state->update(&event_gtid)) + return "Failed in internal GTID book-keeping: Out of memory"; + + if (gtid_state->count() > 0) + { + gtid= gtid_state->find(event_gtid.domain_id); + if (gtid != NULL) + { + /* Skip this event group if we have not yet reached slave start pos. */ + if (event_gtid.server_id != gtid->server_id || + event_gtid.seq_no <= gtid->seq_no) + *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ? + GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION); + /* + Delete this entry if we have reached slave start position (so we will + not skip subsequent events and won't have to look them up and check). + */ + if (event_gtid.server_id == gtid->server_id && + event_gtid.seq_no >= gtid->seq_no) + gtid_state->remove(gtid); + } + } + + if (until_gtid_state) + { + gtid= until_gtid_state->find(event_gtid.domain_id); + if (gtid == NULL) + { + /* + This domain already reached the START SLAVE UNTIL stop condition, + so skip this event group. + */ + *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ? + GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION); + } + else if (event_gtid.server_id == gtid->server_id && + event_gtid.seq_no >= gtid->seq_no) + { + /* + We have reached the stop condition. + Delete this domain_id from the hash, so we will skip all further + events in this domain and eventually stop when all domains are + done. + */ + uint64 until_seq_no= gtid->seq_no; + until_gtid_state->remove(gtid); + if (until_gtid_state->count() == 0) + *gtid_until_group= (flags2 & Gtid_log_event::FL_STANDALONE ? + GTID_UNTIL_STOP_AFTER_STANDALONE : + GTID_UNTIL_STOP_AFTER_TRANSACTION); + if (event_gtid.seq_no > until_seq_no) + { + /* + The GTID in START SLAVE UNTIL condition is missing in our binlog. + This should normally not happen (user error), but since we can be + sure that we are now beyond the position that the UNTIL condition + should be in, we can just stop now. And we also need to skip this + event group (as it is beyond the UNTIL condition). + */ + *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ? + GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION); + } + } + } } } @@ -1446,6 +1716,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, return NULL; /* Success */ } + void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags) { @@ -1465,12 +1736,17 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, mysql_mutex_t *log_lock; mysql_cond_t *log_cond; int mariadb_slave_capability; - char str_buf[256]; + char str_buf[128]; String connect_gtid_state(str_buf, sizeof(str_buf), system_charset_info); bool using_gtid_state; - slave_connection_state gtid_state, return_gtid_state; + char str_buf2[128]; + String slave_until_gtid_str(str_buf2, sizeof(str_buf2), system_charset_info); + slave_connection_state gtid_state, until_gtid_state_obj; + slave_connection_state *until_gtid_state= NULL; rpl_gtid error_gtid; enum_gtid_skip_type gtid_skip_group= GTID_SKIP_NOT; + enum_gtid_until_state gtid_until_group= GTID_UNTIL_NOT_DONE; + rpl_binlog_state until_binlog_state; uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF; int old_max_allowed_packet= thd->variables.max_allowed_packet; @@ -1502,6 +1778,10 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, connect_gtid_state.length(0); using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state); DBUG_EXECUTE_IF("simulate_non_gtid_aware_master", using_gtid_state= false;); + if (using_gtid_state && + get_slave_until_gtid(thd, &slave_until_gtid_str)) + until_gtid_state= &until_gtid_state_obj; + /* We want to corrupt the first event, in Log_event::read_log_event(). But we do not want the corruption to happen early, eg. when client does @@ -1557,13 +1837,23 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, my_errno= ER_UNKNOWN_ERROR; goto err; } + if (until_gtid_state && + until_gtid_state->load(slave_until_gtid_str.c_ptr_quick(), + slave_until_gtid_str.length())) + { + errmsg= "Out of memory or malformed slave request when obtaining UNTIL " + "position sent from slave"; + my_errno= ER_UNKNOWN_ERROR; + goto err; + } if ((error= check_slave_start_position(thd, >id_state, &errmsg, - &error_gtid))) + &error_gtid, until_gtid_state))) { my_errno= error; goto err; } - if ((errmsg= gtid_find_binlog_file(>id_state, search_file_name))) + if ((errmsg= gtid_find_binlog_file(>id_state, search_file_name, + until_gtid_state))) { my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; goto err; @@ -1753,6 +2043,15 @@ impossible position"; /* The Format_description_log_event event will be found naturally. */ } + /* + Handle the case of START SLAVE UNTIL with an UNTIL condition already + fulfilled at the start position. + + We will send one event, the format_description, and then stop. + */ + if (until_gtid_state && until_gtid_state->count() == 0) + gtid_until_group= GTID_UNTIL_STOP_AFTER_STANDALONE; + /* seek to the requested position, to start the requested dump */ my_b_seek(&log, pos); // Seek will done on next read @@ -1833,12 +2132,26 @@ impossible position"; log_file_name, &log, mariadb_slave_capability, ev_offset, current_checksum_alg, using_gtid_state, - >id_state, >id_skip_group))) + >id_state, >id_skip_group, + until_gtid_state, >id_until_group, + &until_binlog_state))) { errmsg= tmp_msg; my_errno= ER_UNKNOWN_ERROR; goto err; } + if (until_gtid_state && + is_until_reached(thd, net, packet, &ev_offset, gtid_until_group, + event_type, current_checksum_alg, flags, &errmsg, + &until_binlog_state)) + { + if (errmsg) + { + my_errno= ER_UNKNOWN_ERROR; + goto err; + } + goto end; + } DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid", { @@ -1992,18 +2305,34 @@ impossible position"; goto err; } - if (read_packet && - (tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type, - log_file_name, &log, - mariadb_slave_capability, ev_offset, - current_checksum_alg, - using_gtid_state, >id_state, - >id_skip_group))) + if (read_packet) { - errmsg= tmp_msg; - my_errno= ER_UNKNOWN_ERROR; - goto err; - } + if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type, + log_file_name, &log, + mariadb_slave_capability, ev_offset, + current_checksum_alg, + using_gtid_state, >id_state, + >id_skip_group, until_gtid_state, + >id_until_group, &until_binlog_state))) + { + errmsg= tmp_msg; + my_errno= ER_UNKNOWN_ERROR; + goto err; + } + if ( + until_gtid_state && + is_until_reached(thd, net, packet, &ev_offset, gtid_until_group, + event_type, current_checksum_alg, flags, &errmsg, + &until_binlog_state)) + { + if (errmsg) + { + my_errno= ER_UNKNOWN_ERROR; + goto err; + } + goto end; + } + } log.error=0; } @@ -2167,6 +2496,26 @@ int start_slave(THD* thd , Master_info* mi, bool net_report) lock_slave_threads(mi); // this allows us to cleanly read slave_running // Get a mask of _stopped_ threads init_thread_mask(&thread_mask,mi,1 /* inverse */); + + if (thd->lex->mi.gtid_pos_str.str) + { + if (thread_mask != (SLAVE_IO|SLAVE_SQL)) + { + slave_errno= ER_SLAVE_WAS_RUNNING; + goto err; + } + if (thd->lex->slave_thd_opt) + { + slave_errno= ER_BAD_SLAVE_UNTIL_COND; + goto err; + } + if (!mi->using_gtid) + { + slave_errno= ER_UNTIL_REQUIRES_USING_GTID; + goto err; + } + } + /* Below we will start all stopped threads. But if the user wants to start only one thread, do as if the other thread was running (as we @@ -2213,10 +2562,22 @@ int start_slave(THD* thd , Master_info* mi, bool net_report) strmake(mi->rli.until_log_name, thd->lex->mi.relay_log_name, sizeof(mi->rli.until_log_name)-1); } + else if (thd->lex->mi.gtid_pos_str.str) + { + if (mi->rli.until_gtid_pos.load(thd->lex->mi.gtid_pos_str.str, + thd->lex->mi.gtid_pos_str.length)) + { + slave_errno= ER_INCORRECT_GTID_STATE; + mysql_mutex_unlock(&mi->rli.data_lock); + goto err; + } + mi->rli.until_condition= Relay_log_info::UNTIL_GTID; + } else mi->rli.clear_until_condition(); - if (mi->rli.until_condition != Relay_log_info::UNTIL_NONE) + if (mi->rli.until_condition == Relay_log_info::UNTIL_MASTER_POS || + mi->rli.until_condition == Relay_log_info::UNTIL_RELAY_POS) { /* Preparing members for effective until condition checking */ const char *p= fn_ext(mi->rli.until_log_name); @@ -2239,7 +2600,10 @@ int start_slave(THD* thd , Master_info* mi, bool net_report) /* mark the cached result of the UNTIL comparison as "undefined" */ mi->rli.until_log_names_cmp_result= Relay_log_info::UNTIL_LOG_NAMES_CMP_UNKNOWN; + } + if (mi->rli.until_condition != Relay_log_info::UNTIL_NONE) + { /* Issuing warning then started without --skip-slave-start */ if (!opt_skip_slave_start) push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, @@ -2271,6 +2635,7 @@ int start_slave(THD* thd , Master_info* mi, bool net_report) ER(ER_SLAVE_WAS_RUNNING)); } +err: unlock_slave_threads(mi); if (slave_errno) diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy index 94fdc46647f..e995b410a85 100644 --- a/sql/sql_yacc.yy +++ b/sql/sql_yacc.yy @@ -1095,7 +1095,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize); %token LOW_PRIORITY %token LT /* OPERATOR */ %token MASTER_CONNECT_RETRY_SYM -%token MASTER_USE_GTID_SYM +%token MASTER_GTID_POS_SYM %token MASTER_HOST_SYM %token MASTER_LOG_FILE_SYM %token MASTER_LOG_POS_SYM @@ -1111,6 +1111,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize); %token MASTER_SSL_VERIFY_SERVER_CERT_SYM %token MASTER_SYM %token MASTER_USER_SYM +%token MASTER_USE_GTID_SYM %token MASTER_HEARTBEAT_PERIOD_SYM %token MATCH /* SQL-2003-R */ %token MAX_CONNECTIONS_PER_HOUR @@ -7215,6 +7216,10 @@ slave_until: MYSQL_YYABORT; } } + | UNTIL_SYM MASTER_GTID_POS_SYM EQ TEXT_STRING_sys + { + Lex->mi.gtid_pos_str = $4; + } ; slave_until_opts: @@ -13326,12 +13331,13 @@ keyword_sp: | MAX_ROWS {} | MASTER_SYM {} | MASTER_HEARTBEAT_PERIOD_SYM {} - | MASTER_USE_GTID_SYM {} + | MASTER_GTID_POS_SYM {} | MASTER_HOST_SYM {} | MASTER_PORT_SYM {} | MASTER_LOG_FILE_SYM {} | MASTER_LOG_POS_SYM {} | MASTER_USER_SYM {} + | MASTER_USE_GTID_SYM {} | MASTER_PASSWORD_SYM {} | MASTER_SERVER_ID_SYM {} | MASTER_CONNECT_RETRY_SYM {} |