summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mysql-test/extra/rpl_tests/delayed_slave_wait_on_query.inc39
-rw-r--r--mysql-test/include/check-testcase.test3
-rw-r--r--mysql-test/include/show_delayed_slave_state.inc28
-rw-r--r--mysql-test/include/sync_with_master.inc26
-rw-r--r--mysql-test/suite/rpl/r/rpl_delayed_slave.result172
-rw-r--r--mysql-test/suite/rpl/t/rpl_delayed_slave.test348
-rw-r--r--sql/lex.h1
-rw-r--r--sql/log.cc8
-rw-r--r--sql/log_event.cc7
-rw-r--r--sql/rpl_mi.cc2
-rw-r--r--sql/rpl_rli.cc178
-rw-r--r--sql/rpl_rli.h115
-rw-r--r--sql/slave.cc245
-rw-r--r--sql/slave.h18
-rw-r--r--sql/sql_binlog.cc148
-rw-r--r--sql/sql_lex.cc1
-rw-r--r--sql/sql_lex.h5
-rw-r--r--sql/sql_repl.cc4
-rw-r--r--sql/sql_yacc.yy13
19 files changed, 1150 insertions, 211 deletions
diff --git a/mysql-test/extra/rpl_tests/delayed_slave_wait_on_query.inc b/mysql-test/extra/rpl_tests/delayed_slave_wait_on_query.inc
new file mode 100644
index 00000000000..5d04d14edf3
--- /dev/null
+++ b/mysql-test/extra/rpl_tests/delayed_slave_wait_on_query.inc
@@ -0,0 +1,39 @@
+# ==== Purpose ====
+#
+# Auxiliary file used by rpl_delayed_slave.test. This assumes that an
+# 'INSERT INTO t1...' query has been executed on the master. It does
+# this:
+#
+# - After half the delay, check the status. It should be delaying and
+# the query should not have executed.
+#
+# - After one and a half delay, check the status. It should not be
+# delaying and the query should be executed.
+#
+# ==== Usage ====
+#
+# --source extra/rpl_tests/delayed_slave_wait_on_query.inc
+
+connection master;
+--echo [on slave]
+--let $slave_timeout= $time1
+
+--source include/sync_slave_io_with_master.inc
+--echo # sleep 1*T
+--sleep $time1
+
+--echo # Expect query not executed and status is 'Waiting until MASTER_DELAY...'
+SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
+--source include/show_delayed_slave_state.inc
+
+--echo # sleep 1*T
+--sleep $time1
+
+--echo # sync with master (with timeout 1*T)
+--source include/sync_with_master.inc
+
+--echo # Expect query executed and status is 'Has read all relay log...'
+SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
+--source include/show_delayed_slave_state.inc
+
+--source include/check_slave_is_running.inc
diff --git a/mysql-test/include/check-testcase.test b/mysql-test/include/check-testcase.test
index 083f44ce966..abac3861c66 100644
--- a/mysql-test/include/check-testcase.test
+++ b/mysql-test/include/check-testcase.test
@@ -67,6 +67,9 @@ if ($tmp)
--echo Replicate_Do_Domain_Ids
--echo Replicate_Ignore_Domain_Ids
--echo Parallel_Mode conservative
+ --echo SQL_Delay 0
+ --echo SQL_Remaining_Delay NULL
+ --echo Slave_SQL_Running_State
}
if (!$tmp) {
# Note: after WL#5177, fields 13-18 shall not be filtered-out.
diff --git a/mysql-test/include/show_delayed_slave_state.inc b/mysql-test/include/show_delayed_slave_state.inc
new file mode 100644
index 00000000000..8eb7232a206
--- /dev/null
+++ b/mysql-test/include/show_delayed_slave_state.inc
@@ -0,0 +1,28 @@
+# ==== Purpose ====
+#
+# Display the delay state of the SQL thread.
+#
+# ==== Usage ====
+#
+# --let $verbose_delayed_slave_state= [0|1]
+# --source extra/rpl_tests/show_delayed_slave_state.inc
+#
+# By default, the output is normalized so that it does not depend on
+# exact timing or exact binlog positions. If
+# $verbose_delayed_slave_state is set, then it outputs exact times and
+# binlog positions. This can be useful for debugging.
+
+--let $_delayed_slave_status= query_get_value(SHOW SLAVE STATUS, Slave_SQL_Running_State, 1)
+
+--let $_delayed_slave_remaining_delay= query_get_value(SHOW SLAVE STATUS, SQL_Remaining_Delay, 1)
+--let $_delayed_slave_qualitative_delay= `SELECT CASE WHEN "$_delayed_slave_remaining_delay" = "NULL" THEN "NULL" WHEN "$_delayed_slave_remaining_delay" = "0" THEN "0" ELSE "greater than zero" END`
+
+--let $_delayed_slave_io_pos= query_get_value(SHOW SLAVE STATUS, Read_Master_Log_Pos, 1)
+--let $_delayed_slave_sql_pos= query_get_value(SHOW SLAVE STATUS, Exec_Master_Log_Pos, 1)
+--let $_delayed_slave_qualitative_log_pos= `SELECT IF($_delayed_slave_io_pos > $_delayed_slave_sql_pos, "behind", "in sync with")`
+
+--echo Slave_SQL_Running_State='$_delayed_slave_status'; SQL_Remaining_Delay is $_delayed_slave_qualitative_delay; SQL thread is $_delayed_slave_qualitative_log_pos IO thread
+
+if ($verbose_delayed_slave_state) {
+ --echo SQL_Remaining_Delay='$_delayed_slave_remaining_delay'; Read_master_log_pos='$_delayed_slave_io_pos'; Exec_Master_Log_Pos='$_delayed_slave_sql_pos'
+}
diff --git a/mysql-test/include/sync_with_master.inc b/mysql-test/include/sync_with_master.inc
new file mode 100644
index 00000000000..dcb995a3ca9
--- /dev/null
+++ b/mysql-test/include/sync_with_master.inc
@@ -0,0 +1,26 @@
+# ==== Purpose ====
+#
+# This file does the same as the built-in command sync_with_master,
+# but can be configured to use a custom timeout. This has the benefit
+# that it accepts the same $slave_timeout and $master_connection
+# parameters as wait_for_slave_param.inc
+#
+#
+# ==== Usage ====
+#
+# --connection master
+# --source include/save_master_pos.inc
+# --connection slave
+# --source include/sync_with_master.inc
+#
+# Parameters to this macro are $slave_timeout and
+# $master_connection. See wait_for_slave_param.inc for
+# descriptions.
+
+--let $slave_param= Relay_Master_Log_File
+--let $slave_param_value= $_master_file
+--source include/wait_for_slave_param.inc
+
+--let $slave_param= Exec_Master_Log_Pos
+--let $slave_param_value= $_master_pos
+--source include/wait_for_slave_param.inc
diff --git a/mysql-test/suite/rpl/r/rpl_delayed_slave.result b/mysql-test/suite/rpl/r/rpl_delayed_slave.result
new file mode 100644
index 00000000000..75b263b61e0
--- /dev/null
+++ b/mysql-test/suite/rpl/r/rpl_delayed_slave.result
@@ -0,0 +1,172 @@
+include/master-slave.inc
+[connection master]
+call mtr.add_suppression("Unsafe statement written to the binary log using statement format");
+call mtr.add_suppression("Unsafe statement written to the binary log using statement format");
+[on master]
+CREATE TABLE t1 (a VARCHAR(100), b INT AUTO_INCREMENT PRIMARY KEY);
+==== Normal setup ====
+[on slave]
+include/stop_slave.inc
+# CHANGE MASTER TO MASTER_DELAY = 2*T
+# Checking that delay is what we set it to
+# Expect status to be ''
+SELECT STATE FROM INFORMATION_SCHEMA.PROCESSLIST ORDER BY ID DESC LIMIT 1;
+STATE
+
+include/start_slave.inc
+[on master]
+INSERT INTO t1(a) VALUES ('normal setup');
+[on slave]
+include/sync_slave_io_with_master.inc
+# sleep 1*T
+# Expect query not executed and status is 'Waiting until MASTER_DELAY...'
+SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
+a b
+Slave_SQL_Running_State='Waiting until MASTER_DELAY seconds after master executed event'; SQL_Remaining_Delay is greater than zero; SQL thread is behind IO thread
+# sleep 1*T
+# sync with master (with timeout 1*T)
+include/wait_for_slave_param.inc [Relay_Master_Log_File]
+include/wait_for_slave_param.inc [Exec_Master_Log_Pos]
+# Expect query executed and status is 'Has read all relay log...'
+SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
+a b
+normal setup 1
+Slave_SQL_Running_State='Slave has read all relay log; waiting for the slave I/O thread to update it'; SQL_Remaining_Delay is NULL; SQL thread is in sync with IO thread
+include/check_slave_is_running.inc
+==== Slave lags "naturally" after master ====
+[on master]
+# CREATE FUNCTION delay_on_slave(time_units INT) RETURNS INT BEGIN IF @@GLOBAL.server_id = 2 THEN RETURN SLEEP(time_units * T); ELSE RETURN 0; END IF; END
+INSERT INTO t1(a) SELECT delay_on_slave(3);
+Warnings:
+Note 1592 Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement is unsafe because it uses a system variable that may have a different value on the slave.
+Note 1592 Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement is unsafe because it uses a system function that may return a different value on the slave.
+INSERT INTO t1(a) VALUES ('slave is already lagging: this statement should execute immediately');
+INSERT INTO t1(a) SELECT delay_on_slave(2);
+Warnings:
+Note 1592 Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement is unsafe because it uses a system variable that may have a different value on the slave.
+Note 1592 Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement is unsafe because it uses a system function that may return a different value on the slave.
+[on slave]
+include/sync_slave_io_with_master.inc
+# sleep 1*T
+# Expect no query executed and status is 'Waiting until MASTER_DELAY...'
+SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
+a b
+normal setup 1
+Slave_SQL_Running_State='Waiting until MASTER_DELAY seconds after master executed event'; SQL_Remaining_Delay is greater than zero; SQL thread is behind IO thread
+# wait for first query to execute
+# sleep 1*T
+# Expect second query executed and status is executing third query (i.e., 'User sleep')
+SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
+a b
+slave is already lagging: this statement should execute immediately 3
+Slave_SQL_Running_State='User sleep'; SQL_Remaining_Delay is NULL; SQL thread is behind IO thread
+# sleep 2*T
+# Expect query executed and status is 'Has read all relay log...'
+SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
+a b
+0 4
+Slave_SQL_Running_State='Slave has read all relay log; waiting for the slave I/O thread to update it'; SQL_Remaining_Delay is NULL; SQL thread is in sync with IO thread
+==== Seconds_Behind_Master ====
+# Bring slave to sync.
+include/stop_slave.inc
+CHANGE MASTER TO MASTER_DELAY = 0;
+include/start_slave.inc
+INSERT INTO t1(a) VALUES ('Syncing slave');
+include/stop_slave.inc
+# CHANGE MASTER TO MASTER_DELAY = 2*T
+include/start_slave.inc
+INSERT INTO t1(a) VALUES (delay_on_slave(1));
+Warnings:
+Note 1592 Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement is unsafe because it uses a system variable that may have a different value on the slave.
+Note 1592 Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement is unsafe because it uses a system function that may return a different value on the slave.
+# sleep 1*T
+# sleep 1*T
+==== STOP SLAVE and START SLAVE ====
+include/stop_slave.inc
+# CHANGE MASTER TO MASTER_DELAY = 3*T
+include/start_slave.inc
+# Checking that delay is what we set it to
+[on master]
+INSERT INTO t1(a) VALUES ('stop slave and start slave');
+[on slave]
+# sleep 1*T
+SET @before_stop_slave= UNIX_TIMESTAMP();
+include/stop_slave.inc
+# STOP SLAVE finished in time.
+# Expect query not executed and status is ''
+SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
+a b
+0 6
+Slave_SQL_Running_State=''; SQL_Remaining_Delay is NULL; SQL thread is behind IO thread
+include/start_slave.inc
+# START SLAVE finished in time.
+[on slave]
+include/sync_slave_io_with_master.inc
+# sleep 1*T
+# Expect query not executed and status is 'Waiting until MASTER_DELAY...'
+SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
+a b
+0 6
+Slave_SQL_Running_State='Waiting until MASTER_DELAY seconds after master executed event'; SQL_Remaining_Delay is greater than zero; SQL thread is behind IO thread
+# sleep 1*T
+# sync with master (with timeout 1*T)
+include/wait_for_slave_param.inc [Relay_Master_Log_File]
+include/wait_for_slave_param.inc [Exec_Master_Log_Pos]
+# Expect query executed and status is 'Has read all relay log...'
+SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
+a b
+stop slave and start slave 7
+Slave_SQL_Running_State='Slave has read all relay log; waiting for the slave I/O thread to update it'; SQL_Remaining_Delay is NULL; SQL thread is in sync with IO thread
+include/check_slave_is_running.inc
+==== Change back to no delay ====
+[on slave]
+include/stop_slave.inc
+CHANGE MASTER TO MASTER_DELAY = 0;
+# Expect delay is 0.
+SQL_Delay='0'
+include/start_slave.inc
+[on master]
+INSERT INTO t1(a) VALUES ('change back to no delay');
+[on slave]
+include/sync_slave_io_with_master.inc
+# sleep 1*T
+# Expect query executed and status is 'Has read all relay log...'
+SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
+a b
+change back to no delay 8
+Slave_SQL_Running_State='Slave has read all relay log; waiting for the slave I/O thread to update it'; SQL_Remaining_Delay is NULL; SQL thread is in sync with IO thread
+==== Reset delay with RESET SLAVE ====
+include/stop_slave.inc
+CHANGE MASTER TO MASTER_DELAY = 71;
+include/start_slave.inc
+# Expect delay is 71
+SQL_Delay='71'
+include/stop_slave.inc
+RESET SLAVE;
+[on master]
+RESET MASTER;
+[on slave]
+include/start_slave.inc
+# Expect delay is 0
+SQL_Delay='0'
+==== Set a bad value for the delay ====
+include/stop_slave.inc
+# Expect error for setting negative delay
+CHANGE MASTER TO MASTER_DELAY = -1;
+ERROR 42000: You have an error in your SQL syntax; check the manual that corresponds to your MariaDB server version for the right syntax to use near '-1' at line 1
+# Expect that it's ok to set delay of 2^31-1
+CHANGE MASTER TO MASTER_DELAY = 2147483647;
+# Expect error for setting delay between 2^31 and 2^32-1
+CHANGE MASTER TO MASTER_DELAY = 2147483648;
+ERROR HY000: The requested value 2147483648 for the master delay exceeds the maximum 2147483647
+# Expect error for setting delay to nonsense
+CHANGE MASTER TO MASTER_DELAY = blah;
+ERROR 42000: You have an error in your SQL syntax; check the manual that corresponds to your MariaDB server version for the right syntax to use near 'blah' at line 1
+CHANGE MASTER TO MASTER_DELAY = 0;
+include/start_slave.inc
+==== Clean up ====
+[on master]
+DROP TABLE t1;
+DROP FUNCTION delay_on_slave;
+[on slave]
+include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/t/rpl_delayed_slave.test b/mysql-test/suite/rpl/t/rpl_delayed_slave.test
new file mode 100644
index 00000000000..2fa5c81f5a5
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_delayed_slave.test
@@ -0,0 +1,348 @@
+# ==== Purpose ====
+#
+# Test the time-delayed replication feature, i.e.,
+# CHANGE MASTER TO MASTER_DELAY=X:
+#
+# - Verify that slave has executed the events after but not before the
+# delay timeout.
+#
+# - Verify that delay is correct works when slave is already lagging
+# due to slow queries.
+#
+# - Verify that Seconds_Behind_Master is greater than or equal to the
+# delay if the slave still has unprocessed events in the relay log
+# and more time than the delay has elapsed since the last event was
+# executed on the master.
+#
+# - Verify that STOP SLAVE works instantly even during a delay, and
+# that it does not cause the waited-for event to be executed too
+# early on slave.
+#
+# - Verify that changing back to no delay works.
+#
+# - Verify that RESET SLAVE sets the delay to 0.
+#
+# - Verify that setting a bad value for the delay gives an error.
+#
+# ==== Implementation ====
+#
+# We run the slave with 10 seconds lag.
+#
+# In general, to test that a query has not been executed by the slave
+# before this time, we wait until the slave IO thread has received the
+# event, and then 5 seconds more, and check that the table has not
+# been updated. To test that a query has been executed after this
+# time, we wait 10 seconds more.
+#
+# To simulate that the slave lags due to slow queries, we invoke a
+# stored function that executes SLEEP if @@gloval.server_id==2. This
+# requires that we run with binlog_format=STATEMENT.
+#
+# ==== Related Bugs and Worklogs ====
+#
+# WL#344: Time-delayed replication
+# BUG#28760: Simulating a replication lag
+# [duplicate] BUG#22072: configurable delayed replication
+# [duplicate] BUG#21639: Add Replication Delay parameter
+#
+# ==== Issues with this Test Case ====
+#
+# The test is inherently timing-sensitive (i.e., contains races) and
+# is likely to fail sporadically on a loaded host.
+#
+# The test takes a long time; it sleeps for around 20*10 seconds.
+
+--source include/master-slave.inc
+# Needed so that sleeps get executed in the slave SQL thread.
+--source include/have_binlog_format_statement.inc
+
+
+call mtr.add_suppression("Unsafe statement written to the binary log using statement format");
+--connection slave
+call mtr.add_suppression("Unsafe statement written to the binary log using statement format");
+--connection master
+
+
+# We assume that any simple operation takes zero time, with an error
+# margin of $time1 seconds. Hence, if we run with a delay of $time2
+# seconds, we expect that:
+# - If we execute a query on master and wait $time1 seconds, then the
+# query has been copied to slave but not yet executed.
+# - If we execute a query on master and wait $time3 seconds, then the
+# query has been executed.
+--let $time1= 10
+if (`SELECT '$max_query_execution_time' > 0`) {
+ --let $time1= $max_query_execution_time
+}
+--let $time2= `SELECT 2 * $time1`
+--let $time3= `SELECT 3 * $time1`
+
+
+--echo [on master]
+CREATE TABLE t1 (a VARCHAR(100), b INT AUTO_INCREMENT PRIMARY KEY);
+
+
+--echo ==== Normal setup ====
+
+--echo [on slave]
+--sync_slave_with_master
+
+--source include/stop_slave.inc
+
+--echo # CHANGE MASTER TO MASTER_DELAY = 2*T
+--disable_query_log
+eval CHANGE MASTER TO MASTER_DELAY = $time2;
+--enable_query_log
+
+--echo # Checking that delay is what we set it to
+--let $delay= query_get_value(SHOW SLAVE STATUS, SQL_Delay, 1)
+if (`SELECT $delay != $time2`) {
+ --echo Delay is wrong! Expected $time2, got $delay
+ --source include/show_rpl_debug_info.inc
+ --die wrong delay
+}
+
+--echo # Expect status to be ''
+SELECT STATE FROM INFORMATION_SCHEMA.PROCESSLIST ORDER BY ID DESC LIMIT 1;
+
+--source include/start_slave.inc
+
+--echo [on master]
+--connection master
+INSERT INTO t1(a) VALUES ('normal setup');
+
+--source extra/rpl_tests/delayed_slave_wait_on_query.inc
+
+
+--echo ==== Slave lags "naturally" after master ====
+
+--echo [on master]
+--connection master
+
+--disable_query_log
+--echo # CREATE FUNCTION delay_on_slave(time_units INT) RETURNS INT BEGIN IF @@GLOBAL.server_id = 2 THEN RETURN SLEEP(time_units * T); ELSE RETURN 0; END IF; END
+--eval CREATE FUNCTION delay_on_slave(time_units INT) RETURNS INT BEGIN IF @@GLOBAL.server_id = 2 THEN RETURN SLEEP(time_units * $time1); ELSE RETURN 0; END IF; END
+--enable_query_log
+
+INSERT INTO t1(a) SELECT delay_on_slave(3);
+
+--save_master_pos
+INSERT INTO t1(a) VALUES ('slave is already lagging: this statement should execute immediately');
+INSERT INTO t1(a) SELECT delay_on_slave(2);
+
+--echo [on slave]
+--source include/sync_slave_io_with_master.inc
+--echo # sleep 1*T
+--sleep $time1
+
+--echo # Expect no query executed and status is 'Waiting until MASTER_DELAY...'
+SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
+--source include/show_delayed_slave_state.inc
+
+--echo # wait for first query to execute
+--sync_with_master
+
+--echo # sleep 1*T
+--sleep $time1
+
+--echo # Expect second query executed and status is executing third query (i.e., 'User sleep')
+SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
+--source include/show_delayed_slave_state.inc
+
+--echo # sleep 2*T
+--sleep $time2
+
+--echo # Expect query executed and status is 'Has read all relay log...'
+SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
+--source include/show_delayed_slave_state.inc
+
+
+--echo ==== Seconds_Behind_Master ====
+
+--echo # Bring slave to sync.
+--source include/stop_slave.inc
+eval CHANGE MASTER TO MASTER_DELAY = 0;
+--source include/start_slave.inc
+
+--connection master
+INSERT INTO t1(a) VALUES ('Syncing slave');
+--sync_slave_with_master
+
+--source include/stop_slave.inc
+--echo # CHANGE MASTER TO MASTER_DELAY = 2*T
+--disable_query_log
+eval CHANGE MASTER TO MASTER_DELAY = $time2;
+--enable_query_log
+--source include/start_slave.inc
+
+--connection master
+INSERT INTO t1(a) VALUES (delay_on_slave(1));
+--save_master_pos
+--connection slave
+
+--echo # sleep 1*T
+--sleep $time1
+
+if ($bug_53167_is_fixed) {
+
+--let $seconds_behind_master= query_get_value(SHOW SLAVE STATUS, Seconds_Behind_Master, 1)
+if (`SELECT $seconds_behind_master <= 0 OR $seconds_behind_master >= $time2`) {
+ --echo Seconds_Behind_Master was $seconds_behind_master. Expected that 0 < Seconds_Behind_Master < SQL_Delay = $time2
+ --source include/show_rpl_debug_info.inc
+ --die Seconds_Behind_Master was wrong
+}
+
+}
+
+--echo # sleep 1*T
+--sleep $time1
+
+--let $seconds_behind_master= query_get_value(SHOW SLAVE STATUS, Seconds_Behind_Master, 1)
+if (`SELECT $seconds_behind_master < $time2`) {
+ --echo Seconds_Behind_Master was $seconds_behind_master. Expected it to be >= SQL_Delay = $time2
+ --source include/show_rpl_debug_info.inc
+ --die Seconds_Behind_Master was < SQL_Delay
+}
+
+--sync_with_master
+
+
+--echo ==== STOP SLAVE and START SLAVE ====
+
+# Set up a longer delay.
+--source include/stop_slave.inc
+
+--echo # CHANGE MASTER TO MASTER_DELAY = 3*T
+--disable_query_log
+eval CHANGE MASTER TO MASTER_DELAY = $time3;
+--enable_query_log
+
+--source include/start_slave.inc
+
+--echo # Checking that delay is what we set it to
+--let $delay= query_get_value(SHOW SLAVE STATUS, SQL_Delay, 1)
+if (`SELECT $delay != $time3`) {
+ --echo Delay is wrong! Expected $time2, got $delay
+ --source include/show_rpl_debug_info.inc
+ --die wrong delay
+}
+
+--echo [on master]
+--connection master
+INSERT INTO t1(a) VALUES ('stop slave and start slave');
+
+--echo [on slave]
+--connection slave
+--echo # sleep 1*T
+--sleep $time1
+SET @before_stop_slave= UNIX_TIMESTAMP();
+--source include/stop_slave.inc
+if (`SELECT UNIX_TIMESTAMP() - @before_stop_slave >= $time1`)
+{
+ --source include/show_rpl_debug_info.inc
+ --die STOP SLAVE did not finish in time
+}
+--echo # STOP SLAVE finished in time.
+
+--echo # Expect query not executed and status is ''
+SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
+--source include/show_delayed_slave_state.inc
+
+--source include/start_slave.inc
+if (`SELECT UNIX_TIMESTAMP() - @before_stop_slave >= $time1`)
+{
+ --source include/show_rpl_debug_info.inc
+ --die START SLAVE did not finish in time
+}
+--echo # START SLAVE finished in time.
+
+--source extra/rpl_tests/delayed_slave_wait_on_query.inc
+
+
+--echo ==== Change back to no delay ====
+
+--echo [on slave]
+--connection slave
+--source include/stop_slave.inc
+eval CHANGE MASTER TO MASTER_DELAY = 0;
+
+--echo # Expect delay is 0.
+--let $delay= query_get_value(SHOW SLAVE STATUS, SQL_Delay, 1)
+--echo SQL_Delay='$delay'
+
+--source include/start_slave.inc
+
+--echo [on master]
+--connection master
+INSERT INTO t1(a) VALUES ('change back to no delay');
+
+--echo [on slave]
+--source include/sync_slave_io_with_master.inc
+--echo # sleep 1*T
+--sleep $time1
+
+--echo # Expect query executed and status is 'Has read all relay log...'
+SELECT * FROM t1 ORDER BY b DESC LIMIT 1;
+--source include/show_delayed_slave_state.inc
+
+
+--echo ==== Reset delay with RESET SLAVE ====
+
+--source include/stop_slave.inc
+CHANGE MASTER TO MASTER_DELAY = 71;
+--source include/start_slave.inc
+
+--echo # Expect delay is 71
+--let $delay= query_get_value(SHOW SLAVE STATUS, SQL_Delay, 1)
+--echo SQL_Delay='$delay'
+
+--source include/stop_slave.inc
+RESET SLAVE;
+--echo [on master]
+--connection master
+RESET MASTER;
+--echo [on slave]
+--connection slave
+--source include/start_slave.inc
+
+--echo # Expect delay is 0
+--let $delay= query_get_value(SHOW SLAVE STATUS, SQL_Delay, 1)
+--echo SQL_Delay='$delay'
+
+
+--echo ==== Set a bad value for the delay ====
+
+--source include/stop_slave.inc
+
+--echo # Expect error for setting negative delay
+--error ER_PARSE_ERROR
+CHANGE MASTER TO MASTER_DELAY = -1;
+
+--echo # Expect that it's ok to set delay of 2^31-1
+CHANGE MASTER TO MASTER_DELAY = 2147483647;
+--echo # Expect error for setting delay between 2^31 and 2^32-1
+--error ER_MASTER_DELAY_VALUE_OUT_OF_RANGE
+CHANGE MASTER TO MASTER_DELAY = 2147483648;
+
+--echo # Expect error for setting delay to nonsense
+--error ER_PARSE_ERROR
+CHANGE MASTER TO MASTER_DELAY = blah;
+
+# todo: CHANGE MASTER TO MASTER_DELAY = 999999999999999999999999999
+# should give error
+
+CHANGE MASTER TO MASTER_DELAY = 0;
+--source include/start_slave.inc
+
+
+--echo ==== Clean up ====
+
+--echo [on master]
+--connection master
+DROP TABLE t1;
+DROP FUNCTION delay_on_slave;
+
+--echo [on slave]
+--sync_slave_with_master
+
+--source include/rpl_end.inc
diff --git a/sql/lex.h b/sql/lex.h
index 85bd20a5f37..430e966eb40 100644
--- a/sql/lex.h
+++ b/sql/lex.h
@@ -339,6 +339,7 @@ static SYMBOL symbols[] = {
{ "LOW_PRIORITY", SYM(LOW_PRIORITY)},
{ "MASTER", SYM(MASTER_SYM)},
{ "MASTER_CONNECT_RETRY", SYM(MASTER_CONNECT_RETRY_SYM)},
+ { "MASTER_DELAY", SYM(MASTER_DELAY_SYM)},
{ "MASTER_GTID_POS", SYM(MASTER_GTID_POS_SYM)},
{ "MASTER_HOST", SYM(MASTER_HOST_SYM)},
{ "MASTER_LOG_FILE", SYM(MASTER_LOG_FILE_SYM)},
diff --git a/sql/log.cc b/sql/log.cc
index be24bcd718a..e3d42a4547e 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -4292,6 +4292,10 @@ void MYSQL_BIN_LOG::wait_for_last_checkpoint_event()
relay log.
IMPLEMENTATION
+
+ - You must hold rli->data_lock before calling this function, since
+ it writes group_relay_log_pos and similar fields of
+ Relay_log_info.
- Protects index file with LOCK_index
- Delete relevant relay log files
- Copy all file names after these ones to the front of the index file
@@ -4305,7 +4309,7 @@ void MYSQL_BIN_LOG::wait_for_last_checkpoint_event()
read by the SQL slave thread are deleted).
@note
- - This is only called from the slave-execute thread when it has read
+ - This is only called from the slave SQL thread when it has read
all commands from a relay log and want to switch to a new relay log.
- When this happens, we can be in an active transaction as
a transaction can span over two relay logs
@@ -4336,6 +4340,8 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included)
DBUG_ASSERT(rli->slave_running == MYSQL_SLAVE_RUN_NOT_CONNECT);
DBUG_ASSERT(!strcmp(rli->linfo.log_file_name,rli->event_relay_log_name));
+ mysql_mutex_assert_owner(&rli->data_lock);
+
mysql_mutex_lock(&LOCK_index);
ir= rli->inuse_relaylog_list;
diff --git a/sql/log_event.cc b/sql/log_event.cc
index afa58afc21d..bd8ae984c2d 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -966,6 +966,7 @@ int Log_event::do_update_pos(rpl_group_info *rgi)
Relay_log_info *rli= rgi->rli;
DBUG_ENTER("Log_event::do_update_pos");
+ DBUG_ASSERT(!rli->belongs_to_client());
/*
rli is null when (as far as I (Guilhem) know) the caller is
Load_log_event::do_apply_event *and* that one is called from
@@ -6400,6 +6401,9 @@ bool Rotate_log_event::write()
in a A -> B -> A setup.
The NOTES below is a wrong comment which will disappear when 4.1 is merged.
+ This must only be called from the Slave SQL thread, since it calls
+ flush_relay_log_info().
+
@retval
0 ok
*/
@@ -8222,6 +8226,9 @@ void Stop_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
were we must do this cleaning is in
Start_log_event_v3::do_apply_event(), not here. Because if we come
here, the master was sane.
+
+ This must only be called from the Slave SQL thread, since it calls
+ flush_relay_log_info().
*/
int Stop_log_event::do_update_pos(rpl_group_info *rgi)
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc
index 6048d26998b..867ebc7167a 100644
--- a/sql/rpl_mi.cc
+++ b/sql/rpl_mi.cc
@@ -18,7 +18,7 @@
#include "sql_priv.h"
#include <my_dir.h>
#include "rpl_mi.h"
-#include "slave.h" // SLAVE_MAX_HEARTBEAT_PERIOD
+#include "slave.h"
#include "strfunc.h"
#include "sql_repl.h"
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 31dd24a038d..a7d8f843688 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -28,6 +28,7 @@
#include "rpl_utility.h"
#include "transaction.h"
#include "sql_parse.h" // end_trans, ROLLBACK
+#include "slave.h"
#include <mysql/plugin.h>
#include <mysql/service_thd_wait.h>
@@ -41,30 +42,24 @@ rpl_slave_state *rpl_global_gtid_slave_state;
/* Object used for MASTER_GTID_WAIT(). */
gtid_waiting rpl_global_gtid_waiting;
-
-// Defined in slave.cc
-int init_intvar_from_file(int* var, IO_CACHE* f, int default_val);
-int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
- const char *default_val);
+const char *const Relay_log_info::state_delaying_string = "Waiting until MASTER_DELAY seconds after master executed event";
Relay_log_info::Relay_log_info(bool is_slave_recovery)
:Slave_reporting_capability("SQL"),
- no_storage(FALSE), replicate_same_server_id(::replicate_same_server_id),
+ replicate_same_server_id(::replicate_same_server_id),
info_fd(-1), cur_log_fd(-1), relay_log(&sync_relaylog_period),
sync_counter(0), is_relay_log_recovery(is_slave_recovery),
save_temporary_tables(0), mi(0),
inuse_relaylog_list(0), last_inuse_relaylog(0),
cur_log_old_open_count(0), group_relay_log_pos(0),
event_relay_log_pos(0),
-#if HAVE_valgrind
- is_fake(FALSE),
-#endif
group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
last_master_timestamp(0), sql_thread_caught_up(true), slave_skip_counter(0),
abort_pos_wait(0), slave_run_id(0), sql_driver_thd(),
gtid_skip_flag(GTID_SKIP_NOT), inited(0), abort_slave(0), stop_for_until(0),
slave_running(MYSQL_SLAVE_NOT_RUN), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0), executed_entries(0),
+ sql_delay(0), sql_delay_end(0),
m_flags(0)
{
DBUG_ENTER("Relay_log_info::Relay_log_info");
@@ -115,39 +110,55 @@ Relay_log_info::~Relay_log_info()
}
+/**
+ Wrapper around Relay_log_info::init(const char *).
+
+ @todo Remove this and replace all calls to it by calls to
+ Relay_log_info::init(const char *). /SVEN
+*/
int init_relay_log_info(Relay_log_info* rli,
const char* info_fname)
{
+ return rli->init(info_fname);
+}
+
+
+/**
+ Read the relay_log.info file.
+
+ @param info_fname The name of the file to read from.
+ @retval 0 success
+ @retval 1 failure
+*/
+int Relay_log_info::init(const char* info_fname)
+{
char fname[FN_REFLEN+128];
- int info_fd;
const char* msg = 0;
int error = 0;
DBUG_ENTER("init_relay_log_info");
- DBUG_ASSERT(!rli->no_storage); // Don't init if there is no storage
- if (rli->inited) // Set if this function called
+ if (inited) // Set if this function called
DBUG_RETURN(0);
fn_format(fname, info_fname, mysql_data_home, "", 4+32);
- mysql_mutex_lock(&rli->data_lock);
- info_fd = rli->info_fd;
- rli->cur_log_fd = -1;
- rli->slave_skip_counter=0;
- rli->abort_pos_wait=0;
- rli->log_space_limit= relay_log_space_limit;
- rli->log_space_total= 0;
+ mysql_mutex_lock(&data_lock);
+ cur_log_fd = -1;
+ slave_skip_counter=0;
+ abort_pos_wait=0;
+ log_space_limit= relay_log_space_limit;
+ log_space_total= 0;
char pattern[FN_REFLEN];
(void) my_realpath(pattern, slave_load_tmpdir, 0);
if (fn_format(pattern, PREFIX_SQL_LOAD, pattern, "",
MY_SAFE_PATH | MY_RETURN_REAL_PATH) == NullS)
{
- mysql_mutex_unlock(&rli->data_lock);
+ mysql_mutex_unlock(&data_lock);
sql_print_error("Unable to use slave's temporary directory %s",
slave_load_tmpdir);
DBUG_RETURN(1);
}
- unpack_filename(rli->slave_patternload_file, pattern);
- rli->slave_patternload_file_size= strlen(rli->slave_patternload_file);
+ unpack_filename(slave_patternload_file, pattern);
+ slave_patternload_file_size= strlen(slave_patternload_file);
/*
The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE.
@@ -161,7 +172,7 @@ int init_relay_log_info(Relay_log_info* rli,
if (opt_relay_logname &&
opt_relay_logname[strlen(opt_relay_logname) - 1] == FN_LIBCHAR)
{
- mysql_mutex_unlock(&rli->data_lock);
+ mysql_mutex_unlock(&data_lock);
sql_print_error("Path '%s' is a directory name, please specify \
a file name for --relay-log option", opt_relay_logname);
DBUG_RETURN(1);
@@ -173,7 +184,7 @@ a file name for --relay-log option", opt_relay_logname);
opt_relaylog_index_name[strlen(opt_relaylog_index_name) - 1]
== FN_LIBCHAR)
{
- mysql_mutex_unlock(&rli->data_lock);
+ mysql_mutex_unlock(&data_lock);
sql_print_error("Path '%s' is a directory name, please specify \
a file name for --relay-log-index option", opt_relaylog_index_name);
DBUG_RETURN(1);
@@ -182,7 +193,7 @@ a file name for --relay-log-index option", opt_relaylog_index_name);
char buf[FN_REFLEN];
const char *ln;
static bool name_warning_sent= 0;
- ln= rli->relay_log.generate_name(opt_relay_logname, "-relay-bin",
+ ln= relay_log.generate_name(opt_relay_logname, "-relay-bin",
1, buf);
/* We send the warning only at startup, not after every RESET SLAVE */
if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent &&
@@ -205,7 +216,6 @@ a file name for --relay-log-index option", opt_relaylog_index_name);
}
/* For multimaster, add connection name to relay log filenames */
- Master_info* mi= rli->mi;
char buf_relay_logname[FN_REFLEN], buf_relaylog_index_name_buff[FN_REFLEN];
char *buf_relaylog_index_name= opt_relaylog_index_name;
@@ -227,11 +237,11 @@ a file name for --relay-log-index option", opt_relaylog_index_name);
note, that if open() fails, we'll still have index file open
but a destructor will take care of that
*/
- if (rli->relay_log.open_index_file(buf_relaylog_index_name, ln, TRUE) ||
- rli->relay_log.open(ln, LOG_BIN, 0, 0, SEQ_READ_APPEND,
- mi->rli.max_relay_log_size, 1, TRUE))
+ if (relay_log.open_index_file(buf_relaylog_index_name, ln, TRUE) ||
+ relay_log.open(ln, LOG_BIN, 0, 0, SEQ_READ_APPEND,
+ max_relay_log_size, 1, TRUE))
{
- mysql_mutex_unlock(&rli->data_lock);
+ mysql_mutex_unlock(&data_lock);
sql_print_error("Failed when trying to open logs for '%s' in init_relay_log_info(). Error: %M", ln, my_errno);
DBUG_RETURN(1);
}
@@ -254,7 +264,7 @@ file '%s', errno %d)", fname, my_errno);
msg= current_thd->get_stmt_da()->message();
goto err;
}
- if (init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
+ if (init_io_cache(&info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
MYF(MY_WME)))
{
sql_print_error("Failed to create a cache on relay log info file '%s'",
@@ -264,20 +274,19 @@ file '%s', errno %d)", fname, my_errno);
}
/* Init relay log with first entry in the relay index file */
- if (init_relay_log_pos(rli,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
+ if (init_relay_log_pos(this,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
&msg, 0))
{
sql_print_error("Failed to open the relay log 'FIRST' (relay_log_pos 4)");
goto err;
}
- rli->group_master_log_name[0]= 0;
- rli->group_master_log_pos= 0;
- rli->info_fd= info_fd;
+ group_master_log_name[0]= 0;
+ group_master_log_pos= 0;
}
else // file exists
{
if (info_fd >= 0)
- reinit_io_cache(&rli->info_file, READ_CACHE, 0L,0,0);
+ reinit_io_cache(&info_file, READ_CACHE, 0L,0,0);
else
{
int error=0;
@@ -289,7 +298,7 @@ Failed to open the existing relay log info file '%s' (errno %d)",
fname, my_errno);
error= 1;
}
- else if (init_io_cache(&rli->info_file, info_fd,
+ else if (init_io_cache(&info_file, info_fd,
IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME)))
{
sql_print_error("Failed to create a cache on relay log info file '%s'",
@@ -300,24 +309,15 @@ Failed to open the existing relay log info file '%s' (errno %d)",
{
if (info_fd >= 0)
mysql_file_close(info_fd, MYF(0));
- rli->info_fd= -1;
- rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
- mysql_mutex_unlock(&rli->data_lock);
+ info_fd= -1;
+ relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
+ mysql_mutex_unlock(&data_lock);
DBUG_RETURN(1);
}
}
- rli->info_fd = info_fd;
int relay_log_pos, master_log_pos, lines;
char *first_non_digit;
- /*
- In MySQL 5.6, there is a MASTER_DELAY option to CHANGE MASTER. This is
- not yet merged into MariaDB (as of 10.0.13). However, we detect the
- presense of the new option in relay-log.info, as a placeholder for
- possible later merge of the feature, and to maintain file format
- compatibility with MySQL 5.6+.
- */
- int dummy_sql_delay;
/*
Starting from MySQL 5.6.x, relay-log.info has a new format.
@@ -342,25 +342,25 @@ Failed to open the existing relay log info file '%s' (errno %d)",
it is line count and not binlog name (new format) it will be
overwritten by the second row later.
*/
- if (init_strvar_from_file(rli->group_relay_log_name,
- sizeof(rli->group_relay_log_name),
- &rli->info_file, ""))
+ if (init_strvar_from_file(group_relay_log_name,
+ sizeof(group_relay_log_name),
+ &info_file, ""))
{
msg="Error reading slave log configuration";
goto err;
}
- lines= strtoul(rli->group_relay_log_name, &first_non_digit, 10);
+ lines= strtoul(group_relay_log_name, &first_non_digit, 10);
- if (rli->group_relay_log_name[0] != '\0' &&
+ if (group_relay_log_name[0] != '\0' &&
*first_non_digit == '\0' &&
lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY)
{
DBUG_PRINT("info", ("relay_log_info file is in new format."));
/* Seems to be new format => read relay log name from next line */
- if (init_strvar_from_file(rli->group_relay_log_name,
- sizeof(rli->group_relay_log_name),
- &rli->info_file, ""))
+ if (init_strvar_from_file(group_relay_log_name,
+ sizeof(group_relay_log_name),
+ &info_file, ""))
{
msg="Error reading slave log configuration";
goto err;
@@ -370,70 +370,70 @@ Failed to open the existing relay log info file '%s' (errno %d)",
DBUG_PRINT("info", ("relay_log_info file is in old format."));
if (init_intvar_from_file(&relay_log_pos,
- &rli->info_file, BIN_LOG_HEADER_SIZE) ||
- init_strvar_from_file(rli->group_master_log_name,
- sizeof(rli->group_master_log_name),
- &rli->info_file, "") ||
- init_intvar_from_file(&master_log_pos, &rli->info_file, 0) ||
+ &info_file, BIN_LOG_HEADER_SIZE) ||
+ init_strvar_from_file(group_master_log_name,
+ sizeof(group_master_log_name),
+ &info_file, "") ||
+ init_intvar_from_file(&master_log_pos, &info_file, 0) ||
(lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY &&
- init_intvar_from_file(&dummy_sql_delay, &rli->info_file, 0)))
+ init_intvar_from_file(&sql_delay, &info_file, 0)))
{
msg="Error reading slave log configuration";
goto err;
}
- strmake_buf(rli->event_relay_log_name,rli->group_relay_log_name);
- rli->group_relay_log_pos= rli->event_relay_log_pos= relay_log_pos;
- rli->group_master_log_pos= master_log_pos;
+ strmake_buf(event_relay_log_name,group_relay_log_name);
+ group_relay_log_pos= event_relay_log_pos= relay_log_pos;
+ group_master_log_pos= master_log_pos;
- if (rli->is_relay_log_recovery && init_recovery(rli->mi, &msg))
+ if (is_relay_log_recovery && init_recovery(mi, &msg))
goto err;
- rli->relay_log_state.load(rpl_global_gtid_slave_state);
- if (init_relay_log_pos(rli,
- rli->group_relay_log_name,
- rli->group_relay_log_pos,
+ relay_log_state.load(rpl_global_gtid_slave_state);
+ if (init_relay_log_pos(this,
+ group_relay_log_name,
+ group_relay_log_pos,
0 /* no data lock*/,
&msg, 0))
{
sql_print_error("Failed to open the relay log '%s' (relay_log_pos %llu)",
- rli->group_relay_log_name, rli->group_relay_log_pos);
+ group_relay_log_name, group_relay_log_pos);
goto err;
}
}
- DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%llu rli->event_relay_log_pos=%llu",
- my_b_tell(rli->cur_log), rli->event_relay_log_pos));
- DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
- DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
+ DBUG_PRINT("info", ("my_b_tell(cur_log)=%llu event_relay_log_pos=%llu",
+ my_b_tell(cur_log), event_relay_log_pos));
+ DBUG_ASSERT(event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
+ DBUG_ASSERT(my_b_tell(cur_log) == event_relay_log_pos);
/*
Now change the cache from READ to WRITE - must do this
before flush_relay_log_info
*/
- reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1);
- if ((error= flush_relay_log_info(rli)))
+ reinit_io_cache(&info_file, WRITE_CACHE,0L,0,1);
+ if ((error= flush_relay_log_info(this)))
{
msg= "Failed to flush relay log info file";
goto err;
}
- if (count_relay_log_space(rli))
+ if (count_relay_log_space(this))
{
msg="Error counting relay log space";
goto err;
}
- rli->inited= 1;
- mysql_mutex_unlock(&rli->data_lock);
+ inited= 1;
+ mysql_mutex_unlock(&data_lock);
DBUG_RETURN(error);
err:
sql_print_error("%s", msg);
- end_io_cache(&rli->info_file);
+ end_io_cache(&info_file);
if (info_fd >= 0)
mysql_file_close(info_fd, MYF(0));
- rli->info_fd= -1;
- rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
- mysql_mutex_unlock(&rli->data_lock);
+ info_fd= -1;
+ relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
+ mysql_mutex_unlock(&data_lock);
DBUG_RETURN(1);
}
@@ -750,6 +750,8 @@ err:
if (!rli->relay_log.description_event_for_exec->is_valid() && !*errmsg)
*errmsg= "Invalid Format_description log event; could be out of memory";
+ DBUG_PRINT("info", ("Returning %d from init_relay_log_pos", (*errmsg)?1:0));
+
DBUG_RETURN ((*errmsg) ? 1 : 0);
}
@@ -967,8 +969,11 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
{
DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos");
- if (!skip_lock)
+ if (skip_lock)
+ mysql_mutex_assert_owner(&data_lock);
+ else
mysql_mutex_lock(&data_lock);
+
rgi->inc_event_relay_log_pos();
DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu",
(long) log_pos, (long) group_master_log_pos));
@@ -1294,6 +1299,7 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos, THD *thd,
{
DBUG_ENTER("Relay_log_info::stmt_done");
+ DBUG_ASSERT(!belongs_to_client());
DBUG_ASSERT(rgi->rli == this);
/*
If in a transaction, and if the slave supports transactions, just
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 96c3e7c3fac..0c8f16a0d16 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -29,11 +29,6 @@ class Master_info;
class Rpl_filter;
-enum {
- LINES_IN_RELAY_LOG_INFO_WITH_DELAY= 5
-};
-
-
/****************************************************************************
Replication SQL Thread
@@ -78,11 +73,17 @@ public:
};
/*
- If flag set, then rli does not store its state in any info file.
- This is the case only when we execute BINLOG SQL commands inside
- a client, non-replication thread.
+ The SQL thread owns one Relay_log_info, and each client that has
+ executed a BINLOG statement owns one Relay_log_info. This function
+ returns zero for the Relay_log_info object that belongs to the SQL
+ thread and nonzero for Relay_log_info objects that belong to
+ clients.
*/
- bool no_storage;
+ inline bool belongs_to_client()
+ {
+ DBUG_ASSERT(sql_driver_thd);
+ return !sql_driver_thd->slave_thread;
+ }
/*
If true, events with the same server id should be replicated. This
@@ -194,6 +195,11 @@ public:
relay log and finishing (commiting) on another relay log. Case which can
happen when, for example, the relay log gets rotated because of
max_binlog_size.
+
+ Note: group_relay_log_name, group_relay_log_pos must only be
+ written from the thread owning the Relay_log_info (SQL thread if
+ !belongs_to_client(); client thread executing BINLOG statement if
+ belongs_to_client()).
*/
char group_relay_log_name[FN_REFLEN];
ulonglong group_relay_log_pos;
@@ -205,16 +211,17 @@ public:
*/
char future_event_master_log_name[FN_REFLEN];
-#ifdef HAVE_valgrind
- bool is_fake; /* Mark that this is a fake relay log info structure */
-#endif
-
/*
Original log name and position of the group we're currently executing
(whose coordinates are group_relay_log_name/pos in the relay log)
in the master's binlog. These concern the *group*, because in the master's
binlog the log_pos that comes with each event is the position of the
beginning of the group.
+
+ Note: group_master_log_name, group_master_log_pos must only be
+ written from the thread owning the Relay_log_info (SQL thread if
+ !belongs_to_client(); client thread executing BINLOG statement if
+ belongs_to_client()).
*/
char group_master_log_name[FN_REFLEN];
volatile my_off_t group_master_log_pos;
@@ -244,6 +251,15 @@ public:
bool sql_thread_caught_up;
void clear_until_condition();
+ /**
+ Reset the delay.
+ This is used by RESET SLAVE to clear the delay.
+ */
+ void clear_sql_delay()
+ {
+ sql_delay= 0;
+ }
+
/*
Needed for problems when slave stops and we want to restart it
@@ -474,8 +490,72 @@ public:
m_flags&= ~flag;
}
+ /**
+ Text used in THD::proc_info when the slave SQL thread is delaying.
+ */
+ static const char *const state_delaying_string;
+
+ bool flush();
+
+ /**
+ Reads the relay_log.info file.
+ */
+ int init(const char* info_filename);
+
+ /**
+ Indicate that a delay starts.
+
+ This does not actually sleep; it only sets the state of this
+ Relay_log_info object to delaying so that the correct state can be
+ reported by SHOW SLAVE STATUS and SHOW PROCESSLIST.
+
+ Requires rli->data_lock.
+
+ @param delay_end The time when the delay shall end.
+ */
+ void start_sql_delay(time_t delay_end)
+ {
+ mysql_mutex_assert_owner(&data_lock);
+ sql_delay_end= delay_end;
+ thd_proc_info(sql_driver_thd, state_delaying_string);
+ }
+
+ int32 get_sql_delay() { return sql_delay; }
+ void set_sql_delay(time_t _sql_delay) { sql_delay= _sql_delay; }
+ time_t get_sql_delay_end() { return sql_delay_end; }
+
private:
+
+ /**
+ Delay slave SQL thread by this amount, compared to master (in
+ seconds). This is set with CHANGE MASTER TO MASTER_DELAY=X.
+
+ Guarded by data_lock. Initialized by the client thread executing
+ START SLAVE. Written by client threads executing CHANGE MASTER TO
+ MASTER_DELAY=X. Read by SQL thread and by client threads
+ executing SHOW SLAVE STATUS. Note: must not be written while the
+ slave SQL thread is running, since the SQL thread reads it without
+ a lock when executing flush_relay_log_info().
+ */
+ int sql_delay;
+
+ /**
+ During a delay, specifies the point in time when the delay ends.
+
+ This is used for the SQL_Remaining_Delay column in SHOW SLAVE STATUS.
+
+ Guarded by data_lock. Written by the sql thread. Read by client
+ threads executing SHOW SLAVE STATUS.
+ */
+ time_t sql_delay_end;
+
+ /*
+ Before the MASTER_DELAY parameter was added (WL#344),
+ relay_log.info had 4 lines. Now it has 5 lines.
+ */
+ static const int LINES_IN_RELAY_LOG_INFO_WITH_DELAY= 5;
+
/*
Holds the state of the data in the relay log.
We need this to ensure that we are not in the middle of a
@@ -874,7 +954,14 @@ public:
};
-// Defined in rpl_rli.cc
+/**
+ Reads the relay_log.info file.
+
+ @todo This is a wrapper around Relay_log_info::init(). It's only
+ kept for historical reasons. It would be good if we removed this
+ function and replaced all calls to it by calls to
+ Relay_log_info::init(). /SVEN
+*/
int init_relay_log_info(Relay_log_info* rli, const char* info_fname);
diff --git a/sql/slave.cc b/sql/slave.cc
index a55d26b1aa0..d1d9abd4027 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -1634,8 +1634,10 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi)
(master_res= mysql_store_result(mysql)) &&
(master_row= mysql_fetch_row(master_res)))
{
+ mysql_mutex_lock(&mi->data_lock);
mi->clock_diff_with_master=
(long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10));
+ mysql_mutex_unlock(&mi->data_lock);
}
else if (check_io_slave_killed(mi, NULL))
goto slave_killed_err;
@@ -1647,7 +1649,9 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi)
}
else
{
+ mysql_mutex_lock(&mi->data_lock);
mi->clock_diff_with_master= 0; /* The "most sensible" value */
+ mysql_mutex_unlock(&mi->data_lock);
sql_print_warning("\"SELECT UNIX_TIMESTAMP()\" failed on master, "
"do not trust column Seconds_Behind_Master of SHOW "
"SLAVE STATUS. Error: %s (%d)",
@@ -2797,6 +2801,15 @@ void show_master_info_get_fields(THD *thd, List<Item> *field_list,
Item_empty_string(thd, "Parallel_Mode",
sizeof("conservative")-1),
mem_root);
+ field_list->push_back(new (mem_root)
+ Item_return_int(thd, "SQL_Delay", 10,
+ MYSQL_TYPE_LONG));
+ field_list->push_back(new (mem_root)
+ Item_return_int(thd, "SQL_Remaining_Delay", 8,
+ MYSQL_TYPE_LONG));
+ field_list->push_back(new (mem_root)
+ Item_empty_string(thd, "Slave_SQL_Running_State",
+ 20));
if (full)
{
field_list->push_back(new (mem_root)
@@ -2986,6 +2999,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
prot_store_ids(thd, &mi->ignore_server_ids);
// Master_Server_id
protocol->store((uint32) mi->master_id);
+ // SQL_Delay
// Master_Ssl_Crl
protocol->store(mi->ssl_ca, &my_charset_bin);
// Master_Ssl_Crlpath
@@ -3008,6 +3022,22 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
protocol->store(mode_name, strlen(mode_name), &my_charset_bin);
}
+ protocol->store((uint32) mi->rli.get_sql_delay());
+ // SQL_Remaining_Delay
+ // THD::proc_info is not protected by any lock, so we read it once
+ // to ensure that we use the same value throughout this function.
+ const char *slave_sql_running_state=
+ mi->rli.sql_driver_thd ? mi->rli.sql_driver_thd->proc_info : "";
+ if (slave_sql_running_state == Relay_log_info::state_delaying_string)
+ {
+ time_t t= my_time(0), sql_delay_end= mi->rli.get_sql_delay_end();
+ protocol->store((uint32)(t < sql_delay_end ? sql_delay_end - t : 0));
+ }
+ else
+ protocol->store_null();
+ // Slave_SQL_Running_State
+ protocol->store(slave_sql_running_state, &my_charset_bin);
+
if (full)
{
protocol->store((uint32) mi->rli.retried_trans);
@@ -3377,6 +3407,69 @@ has_temporary_error(THD *thd)
}
+/**
+ If this is a lagging slave (specified with CHANGE MASTER TO MASTER_DELAY = X), delays accordingly. Also unlocks rli->data_lock.
+
+ Design note: this is the place to unlock rli->data_lock here since
+ it should be held when reading delay info from rli, but it should
+ not be held while sleeping.
+
+ @param ev Event that is about to be executed.
+
+ @param thd The sql thread's THD object.
+
+ @param rli The sql thread's Relay_log_info structure.
+*/
+static void sql_delay_event(Log_event *ev, THD *thd, rpl_group_info *rgi)
+{
+ Relay_log_info* rli= rgi->rli;
+ long sql_delay= rli->get_sql_delay();
+
+ DBUG_ENTER("sql_delay_event");
+ mysql_mutex_assert_owner(&rli->data_lock);
+ DBUG_ASSERT(!rli->belongs_to_client());
+
+ int type= ev->get_type_code();
+ if (sql_delay && type != ROTATE_EVENT &&
+ type != FORMAT_DESCRIPTION_EVENT && type != START_EVENT_V3)
+ {
+ // The time when we should execute the event.
+ time_t sql_delay_end=
+ ev->when + rli->mi->clock_diff_with_master + sql_delay;
+ // The current time.
+ time_t now= my_time(0);
+ // The time we will have to sleep before executing the event.
+ unsigned long nap_time= 0;
+ if (sql_delay_end > now)
+ nap_time= sql_delay_end - now;
+
+ DBUG_PRINT("info", ("sql_delay= %lu "
+ "ev->when= %lu "
+ "rli->mi->clock_diff_with_master= %lu "
+ "now= %ld "
+ "sql_delay_end= %lu "
+ "nap_time= %ld",
+ sql_delay, (long)ev->when,
+ rli->mi->clock_diff_with_master,
+ (long)now, sql_delay_end, (long)nap_time));
+
+ if (sql_delay_end > now)
+ {
+ DBUG_PRINT("info", ("delaying replication event %lu secs",
+ nap_time));
+ rli->start_sql_delay(sql_delay_end);
+ mysql_mutex_unlock(&rli->data_lock);
+ slave_sleep(thd, nap_time, sql_slave_killed, rgi);
+ DBUG_VOID_RETURN;
+ }
+ }
+
+ mysql_mutex_unlock(&rli->data_lock);
+
+ DBUG_VOID_RETURN;
+}
+
+
/*
First half of apply_event_and_update_pos(), see below.
Setup some THD variables for applying the event.
@@ -3500,16 +3593,16 @@ apply_event_and_update_pos_apply(Log_event* ev, THD* thd, rpl_group_info *rgi,
if (exec_res == 0)
{
int error= ev->update_pos(rgi);
-#ifdef HAVE_valgrind
- if (!rli->is_fake)
-#endif
+ #ifndef DBUG_OFF
+ DBUG_PRINT("info", ("update_pos error = %d", error));
+ if (!rli->belongs_to_client())
{
- DBUG_PRINT("info", ("update_pos error = %d", error));
DBUG_PRINT("info", ("group %llu %s", rli->group_relay_log_pos,
rli->group_relay_log_name));
DBUG_PRINT("info", ("event %llu %s", rli->event_relay_log_pos,
rli->event_relay_log_name));
}
+#endif
/*
The update should not fail, so print an error message and
return an error code.
@@ -3544,21 +3637,39 @@ apply_event_and_update_pos_apply(Log_event* ev, THD* thd, rpl_group_info *rgi,
/**
Applies the given event and advances the relay log position.
- In essence, this function does:
+ This is needed by the sql thread to execute events from the binlog,
+ and by clients executing BINLOG statements. Conceptually, this
+ function does:
@code
ev->apply_event(rli);
ev->update_pos(rli);
@endcode
- But it also does some maintainance, such as skipping events if
- needed and reporting errors.
+ It also does the following maintainance:
- If the @c skip flag is set, then it is tested whether the event
- should be skipped, by looking at the slave_skip_counter and the
- server id. The skip flag should be set when calling this from a
- replication thread but not set when executing an explicit BINLOG
- statement.
+ - Initializes the thread's server_id and time; and the event's
+ thread.
+
+ - If !rli->belongs_to_client() (i.e., if it belongs to the slave
+ sql thread instead of being used for executing BINLOG
+ statements), it does the following things: (1) skips events if it
+ is needed according to the server id or slave_skip_counter; (2)
+ unlocks rli->data_lock; (3) sleeps if required by 'CHANGE MASTER
+ TO MASTER_DELAY=X'; (4) maintains the running state of the sql
+ thread (rli->thread_state).
+
+ - Reports errors as needed.
+
+ @param ev The event to apply.
+
+ @param thd The client thread that executes the event (i.e., the
+ slave sql thread if called from a replication slave, or the client
+ thread if called to execute a BINLOG statement).
+
+ @param rli The relay log info (i.e., the slave's rli if called from
+ a replication slave, or the client's thd->rli_fake if called to
+ execute a BINLOG statement).
@retval 0 OK.
@@ -3581,7 +3692,15 @@ apply_event_and_update_pos(Log_event* ev, THD* thd, rpl_group_info *rgi)
DBUG_ASSERT(rli->slave_skip_counter > 0);
rli->slave_skip_counter--;
}
- mysql_mutex_unlock(&rli->data_lock);
+
+ if (reason == Log_event::EVENT_SKIP_NOT)
+ {
+ // Sleeps if needed, and unlocks rli->data_lock.
+ sql_delay_event(ev, thd, rgi);
+ }
+ else
+ mysql_mutex_unlock(&rli->data_lock);
+
return apply_event_and_update_pos_apply(ev, thd, rgi, reason);
}
@@ -3603,6 +3722,10 @@ apply_event_and_update_pos_for_parallel(Log_event* ev, THD* thd,
driver thread, so 23 should never see EVENT_SKIP_COUNT here.
*/
DBUG_ASSERT(reason != Log_event::EVENT_SKIP_COUNT);
+ /*
+ Calling sql_delay_event() was handled in the SQL driver thread when
+ doing parallel replication.
+ */
return apply_event_and_update_pos_apply(ev, thd, rgi, reason);
}
@@ -3682,7 +3805,8 @@ inline void update_state_of_relay_log(Relay_log_info *rli, Log_event *ev)
/**
- Top-level function for executing the next event from the relay log.
+ Top-level function for executing the next event in the relay log.
+ This is called from the SQL thread.
This function reads the event from the relay log, executes it, and
advances the relay log position. It also handles errors, etc.
@@ -4229,8 +4353,10 @@ connected:
};);
#endif
- // TODO: the assignment below should be under mutex (5.0)
+ mysql_mutex_lock(&mi->run_lock);
mi->slave_running= MYSQL_SLAVE_RUN_CONNECT;
+ mysql_mutex_unlock(&mi->run_lock);
+
thd->slave_net = &mysql->net;
THD_STAGE_INFO(thd, stage_checking_master_version);
ret= get_master_version_and_clock(mysql, mi);
@@ -6582,67 +6708,80 @@ MYSQL *rpl_connect_master(MYSQL *mysql)
}
#endif
-/*
- Store the file and position where the execute-slave thread are in the
+/**
+ Store the file and position where the slave's SQL thread are in the
relay log.
- SYNOPSIS
- flush_relay_log_info()
- rli Relay log information
+ Notes:
- NOTES
- - As this is only called by the slave thread or on STOP SLAVE, with the
- log_lock grabbed and the slave thread stopped, we don't need to have
- a lock here.
- - If there is an active transaction, then we don't update the position
- in the relay log. This is to ensure that we re-execute statements
- if we die in the middle of an transaction that was rolled back.
- - As a transaction never spans binary logs, we don't have to handle the
- case where we do a relay-log-rotation in the middle of the transaction.
- If this would not be the case, we would have to ensure that we
- don't delete the relay log file where the transaction started when
- we switch to a new relay log file.
-
- TODO
- - Change the log file information to a binary format to avoid calling
- longlong2str.
+ - This function should be called either from the slave SQL thread,
+ or when the slave thread is not running. (It reads the
+ group_{relay|master}_log_{pos|name} and delay fields in the rli
+ object. These may only be modified by the slave SQL thread or by
+ a client thread when the slave SQL thread is not running.)
- RETURN VALUES
- 0 ok
- 1 write error
-*/
+ - If there is an active transaction, then we do not update the
+ position in the relay log. This is to ensure that we re-execute
+ statements if we die in the middle of an transaction that was
+ rolled back.
+
+ - As a transaction never spans binary logs, we don't have to handle
+ the case where we do a relay-log-rotation in the middle of the
+ transaction. If transactions could span several binlogs, we would
+ have to ensure that we do not delete the relay log file where the
+ transaction started before switching to a new relay log file.
+
+ - Error can happen if writing to file fails or if flushing the file
+ fails.
+
+ @param rli The object representing the Relay_log_info.
+ @todo Change the log file information to a binary format to avoid
+ calling longlong2str.
+
+ @todo Move the member function into rpl_rli.cc and get rid of the
+ global function. /SVEN
+
+ @return 0 on success, 1 on error.
+*/
bool flush_relay_log_info(Relay_log_info* rli)
{
- bool error=0;
- DBUG_ENTER("flush_relay_log_info");
+ return rli->flush();
+}
- if (unlikely(rli->no_storage))
- DBUG_RETURN(0);
+bool Relay_log_info::flush()
+{
+ bool error=0;
- IO_CACHE *file = &rli->info_file;
- char buff[FN_REFLEN*2+22*2+4], *pos;
+ DBUG_ENTER("Relay_log_info::flush()");
+ IO_CACHE *file = &info_file;
+ // 2*file name, 2*long long, 2*unsigned long, 6*'\n'
+ char buff[FN_REFLEN * 2 + 22 * 2 + 10 * 2 + 6], *pos;
my_b_seek(file, 0L);
- pos=strmov(buff, rli->group_relay_log_name);
+ pos= longlong10_to_str(LINES_IN_RELAY_LOG_INFO_WITH_DELAY, buff, 10);
+ *pos++='\n';
+ pos=strmov(pos, group_relay_log_name);
*pos++='\n';
- pos= longlong10_to_str(rli->group_relay_log_pos, pos, 10);
+ pos=longlong10_to_str(group_relay_log_pos, pos, 10);
*pos++='\n';
- pos=strmov(pos, rli->group_master_log_name);
+ pos=strmov(pos, group_master_log_name);
*pos++='\n';
- pos=longlong10_to_str(rli->group_master_log_pos, pos, 10);
+ pos=longlong10_to_str(group_master_log_pos, pos, 10);
*pos='\n';
+ pos= longlong10_to_str(sql_delay, pos, 10);
+ *pos= '\n';
if (my_b_write(file, (uchar*) buff, (size_t) (pos-buff)+1))
error=1;
if (flush_io_cache(file))
error=1;
if (sync_relayloginfo_period &&
!error &&
- ++(rli->sync_counter) >= sync_relayloginfo_period)
+ ++sync_counter >= sync_relayloginfo_period)
{
- if (my_sync(rli->info_fd, MYF(MY_WME)))
+ if (my_sync(info_fd, MYF(MY_WME)))
error=1;
- rli->sync_counter= 0;
+ sync_counter= 0;
}
/*
Flushing the relay log is done by the slave I/O thread
diff --git a/sql/slave.h b/sql/slave.h
index a78ae4cff6f..d28048af8ea 100644
--- a/sql/slave.h
+++ b/sql/slave.h
@@ -18,6 +18,14 @@
#define SLAVE_H
/**
+ MASTER_DELAY can be at most (1 << 31) - 1.
+*/
+#define MASTER_DELAY_MAX (0x7FFFFFFF)
+#if INT_MAX < 0x7FFFFFFF
+#error "don't support platforms where INT_MAX < 0x7FFFFFFF"
+#endif
+
+/**
@defgroup Replication Replication
@{
@@ -102,12 +110,14 @@ int init_dynarray_intvar_from_file(DYNAMIC_ARRAY* arr, IO_CACHE* f);
In Master_info: run_lock, data_lock
run_lock protects all information about the run state: slave_running, thd
- and the existence of the I/O thread to stop/start it, you need this mutex).
+ and the existence of the I/O thread (to stop/start it, you need this mutex).
data_lock protects some moving members of the struct: counters (log name,
position) and relay log (MYSQL_BIN_LOG object).
In Relay_log_info: run_lock, data_lock
see Master_info
+ However, note that run_lock does not protect
+ Relay_log_info.run_state; that is protected by data_lock.
Order of acquisition: if you want to have LOCK_active_mi and a run_lock, you
must acquire LOCK_active_mi first.
@@ -247,6 +257,12 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd,
int apply_event_and_update_pos_for_parallel(Log_event* ev, THD* thd,
struct rpl_group_info *rgi);
+int init_intvar_from_file(int* var, IO_CACHE* f, int default_val);
+int init_floatvar_from_file(float* var, IO_CACHE* f, float default_val);
+int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
+ const char *default_val);
+int init_dynarray_intvar_from_file(DYNAMIC_ARRAY* arr, IO_CACHE* f);
+
pthread_handler_t handle_slave_io(void *arg);
void slave_output_error_info(rpl_group_info *rgi, THD *thd);
pthread_handler_t handle_slave_sql(void *arg);
diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc
index f0465cdf5bf..a7beb42c315 100644
--- a/sql/sql_binlog.cc
+++ b/sql/sql_binlog.cc
@@ -17,18 +17,96 @@
#include <my_global.h>
#include "sql_priv.h"
#include "sql_binlog.h"
-#include "sql_parse.h" // check_global_access
-#include "sql_acl.h" // *_ACL
+#include "sql_parse.h"
+#include "sql_acl.h"
#include "rpl_rli.h"
#include "base64.h"
-#include "slave.h" // apply_event_and_update_pos
-#include "log_event.h" // Format_description_log_event,
- // EVENT_LEN_OFFSET,
- // EVENT_TYPE_OFFSET,
- // FORMAT_DESCRIPTION_LOG_EVENT,
- // START_EVENT_V3,
- // Log_event_type,
- // Log_event
+#include "slave.h"
+#include "log_event.h"
+
+
+/**
+ Check if the event type is allowed in a BINLOG statement.
+
+ @retval 0 if the event type is ok.
+ @retval 1 if the event type is not ok.
+*/
+static int check_event_type(int type, Relay_log_info *rli)
+{
+ Format_description_log_event *fd_event=
+ rli->relay_log.description_event_for_exec;
+
+ /*
+ Convert event type id of certain old versions (see comment in
+ Format_description_log_event::Format_description_log_event(char*,...)).
+ */
+ if (fd_event && fd_event->event_type_permutation)
+ {
+ IF_DBUG({
+ int new_type= fd_event->event_type_permutation[type];
+ DBUG_PRINT("info",
+ ("converting event type %d to %d (%s)",
+ type, new_type,
+ Log_event::get_type_str((Log_event_type)new_type)));
+ },
+ (void)0);
+ type= fd_event->event_type_permutation[type];
+ }
+
+ switch (type)
+ {
+ case START_EVENT_V3:
+ case FORMAT_DESCRIPTION_EVENT:
+ /*
+ We need a preliminary FD event in order to parse the FD event,
+ if we don't already have one.
+ */
+ if (!fd_event)
+ if (!(rli->relay_log.description_event_for_exec=
+ new Format_description_log_event(4)))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), 1);
+ return 1;
+ }
+
+ /* It is always allowed to execute FD events. */
+ return 0;
+
+ case TABLE_MAP_EVENT:
+ case WRITE_ROWS_EVENT:
+ case UPDATE_ROWS_EVENT:
+ case DELETE_ROWS_EVENT:
+ case PRE_GA_WRITE_ROWS_EVENT:
+ case PRE_GA_UPDATE_ROWS_EVENT:
+ case PRE_GA_DELETE_ROWS_EVENT:
+ /*
+ Row events are only allowed if a Format_description_event has
+ already been seen.
+ */
+ if (fd_event)
+ return 0;
+ else
+ {
+ my_error(ER_NO_FORMAT_DESCRIPTION_EVENT_BEFORE_BINLOG_STATEMENT,
+ MYF(0), Log_event::get_type_str((Log_event_type)type));
+ return 1;
+ }
+ break;
+
+ default:
+ /*
+ It is not meaningful to execute other events than row-events and
+ FD events. It would even be dangerous to execute Stop_log_event
+ and Rotate_log_event since they call flush_relay_log_info, which
+ is not allowed to call by other threads than the slave SQL
+ thread when the slave SQL thread is running.
+ */
+ my_error(ER_ONLY_FD_AND_RBR_EVENTS_ALLOWED_IN_BINLOG_STATEMENT,
+ MYF(0), Log_event::get_type_str((Log_event_type)type));
+ return 1;
+ }
+}
+
/**
Execute a BINLOG statement.
@@ -73,31 +151,13 @@ void mysql_client_binlog_statement(THD* thd)
Allocation
*/
- /*
- If we do not have a Format_description_event, we create a dummy
- one here. In this case, the first event we read must be a
- Format_description_event.
- */
- my_bool have_fd_event= TRUE;
int err;
Relay_log_info *rli;
rpl_group_info *rgi;
rli= thd->rli_fake;
- if (!rli)
- {
- rli= thd->rli_fake= new Relay_log_info(FALSE);
-#ifdef HAVE_valgrind
- rli->is_fake= TRUE;
-#endif
- have_fd_event= FALSE;
- }
- if (rli && !rli->relay_log.description_event_for_exec)
- {
- rli->relay_log.description_event_for_exec=
- new Format_description_log_event(4);
- have_fd_event= FALSE;
- }
+ if (!rli && (rli= thd->rli_fake= new Relay_log_info(FALSE)))
+ rli->sql_driver_thd= thd;
if (!(rgi= thd->rgi_fake))
rgi= thd->rgi_fake= new rpl_group_info(rli);
rgi->thd= thd;
@@ -109,16 +169,13 @@ void mysql_client_binlog_statement(THD* thd)
/*
Out of memory check
*/
- if (!(rli &&
- rli->relay_log.description_event_for_exec &&
- buf))
+ if (!(rli && buf))
{
my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1); /* needed 1 bytes */
goto end;
}
- rli->sql_driver_thd= thd;
- rli->no_storage= TRUE;
+ DBUG_ASSERT(rli->belongs_to_client());
for (char const *strptr= thd->lex->comment.str ;
strptr < thd->lex->comment.str + thd->lex->comment.length ; )
@@ -185,23 +242,8 @@ void mysql_client_binlog_statement(THD* thd)
DBUG_PRINT("info", ("event_len=%lu, bytes_decoded=%d",
event_len, bytes_decoded));
- /*
- If we have not seen any Format_description_event, then we must
- see one; it is the only statement that can be read in base64
- without a prior Format_description_event.
- */
- if (!have_fd_event)
- {
- int type = (uchar)bufptr[EVENT_TYPE_OFFSET];
- if (type == FORMAT_DESCRIPTION_EVENT || type == START_EVENT_V3)
- have_fd_event= TRUE;
- else
- {
- my_error(ER_NO_FORMAT_DESCRIPTION_EVENT_BEFORE_BINLOG_STATEMENT,
- MYF(0), Log_event::get_type_str((Log_event_type)type));
- goto end;
- }
- }
+ if (check_event_type(bufptr[EVENT_TYPE_OFFSET], rli))
+ goto end;
ev= Log_event::read_log_event(bufptr, event_len, &error,
rli->relay_log.description_event_for_exec,
@@ -212,7 +254,7 @@ void mysql_client_binlog_statement(THD* thd)
{
/*
This could actually be an out-of-memory, but it is more likely
- causes by a bad statement
+ caused by a bad statement
*/
my_error(ER_SYNTAX_ERROR, MYF(0));
goto end;
diff --git a/sql/sql_lex.cc b/sql/sql_lex.cc
index b62838f46f2..15cf9a39f90 100644
--- a/sql/sql_lex.cc
+++ b/sql/sql_lex.cc
@@ -4831,4 +4831,3 @@ void binlog_unsafe_map_init()
BINLOG_DIRECT_OFF & TRX_CACHE_NOT_EMPTY);
}
#endif
-
diff --git a/sql/sql_lex.h b/sql/sql_lex.h
index 6ec112cc038..626beaa9894 100644
--- a/sql/sql_lex.h
+++ b/sql/sql_lex.h
@@ -240,11 +240,12 @@ struct LEX_MASTER_INFO
ulong server_id;
uint port, connect_retry;
float heartbeat_period;
+ int sql_delay;
/*
Enum is used for making it possible to detect if the user
changed variable or if it should be left at old value
*/
- enum {LEX_MI_UNCHANGED, LEX_MI_DISABLE, LEX_MI_ENABLE}
+ enum {LEX_MI_UNCHANGED= 0, LEX_MI_DISABLE, LEX_MI_ENABLE}
ssl, ssl_verify_server_cert, heartbeat_opt, repl_ignore_server_ids_opt,
repl_do_domain_ids_opt, repl_ignore_domain_ids_opt;
enum {
@@ -260,6 +261,7 @@ struct LEX_MASTER_INFO
sizeof(ulong), 0, 16, MYF(0));
my_init_dynamic_array(&repl_ignore_domain_ids,
sizeof(ulong), 0, 16, MYF(0));
+ sql_delay= -1;
}
void reset(bool is_change_master)
{
@@ -280,6 +282,7 @@ struct LEX_MASTER_INFO
repl_ignore_domain_ids_opt= LEX_MI_UNCHANGED;
gtid_pos_str= null_lex_str;
use_gtid_opt= LEX_GTID_UNCHANGED;
+ sql_delay= -1;
}
};
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 572d08399d5..8bea280b820 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -3319,6 +3319,7 @@ int reset_slave(THD *thd, Master_info* mi)
mi->clear_error();
mi->rli.clear_error();
mi->rli.clear_until_condition();
+ mi->rli.clear_sql_delay();
mi->rli.slave_skip_counter= 0;
// close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
@@ -3630,6 +3631,9 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
+ if (lex_mi->sql_delay != -1)
+ mi->rli.set_sql_delay(lex_mi->sql_delay);
+
if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
mi->ssl_verify_server_cert=
(lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE);
diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy
index 9131b99d95e..3b9a06d5b38 100644
--- a/sql/sql_yacc.yy
+++ b/sql/sql_yacc.yy
@@ -1341,6 +1341,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize);
%token LOOP_SYM
%token LOW_PRIORITY
%token MASTER_CONNECT_RETRY_SYM
+%token MASTER_DELAY_SYM
%token MASTER_GTID_POS_SYM
%token MASTER_HOST_SYM
%token MASTER_LOG_FILE_SYM
@@ -2255,6 +2256,16 @@ master_def:
{
Lex->mi.connect_retry = $3;
}
+ | MASTER_DELAY_SYM '=' ulong_num
+ {
+ if ($3 > MASTER_DELAY_MAX)
+ {
+ my_error(ER_MASTER_DELAY_VALUE_OUT_OF_RANGE, MYF(0),
+ $3, MASTER_DELAY_MAX);
+ }
+ else
+ Lex->mi.sql_delay = $3;
+ }
| MASTER_SSL_SYM '=' ulong_num
{
Lex->mi.ssl= $3 ?
@@ -7660,6 +7671,7 @@ slave:
LEX *lex=Lex;
lex->sql_command = SQLCOM_SLAVE_ALL_START;
lex->type = 0;
+ /* If you change this code don't forget to update STOP SLAVE too */
}
{}
| STOP_SYM SLAVE optional_connection_name slave_thread_opts
@@ -14099,6 +14111,7 @@ keyword_sp:
| MASTER_PASSWORD_SYM {}
| MASTER_SERVER_ID_SYM {}
| MASTER_CONNECT_RETRY_SYM {}
+ | MASTER_DELAY_SYM {}
| MASTER_SSL_SYM {}
| MASTER_SSL_CA_SYM {}
| MASTER_SSL_CAPATH_SYM {}