diff options
-rw-r--r-- | client/mysqlbinlog.cc | 294 | ||||
-rw-r--r-- | man/mysqlbinlog.1 | 19 | ||||
-rw-r--r-- | mysql-test/suite/binlog/include/mysqlbinlog_gtid_window_test_cases.inc | 242 | ||||
-rw-r--r-- | mysql-test/suite/binlog/t/binlog_mysqlbinlog_gtid_window-master.opt | 1 | ||||
-rw-r--r-- | mysql-test/suite/binlog/t/binlog_mysqlbinlog_gtid_window.test | 124 | ||||
-rw-r--r-- | sql/rpl_gtid.cc | 422 | ||||
-rw-r--r-- | sql/rpl_gtid.h | 253 | ||||
-rw-r--r-- | sql/sql_parse.cc | 36 | ||||
-rw-r--r-- | sql/sql_repl.cc | 48 | ||||
-rw-r--r-- | sql/sql_repl.h | 3 |
10 files changed, 1400 insertions, 42 deletions
diff --git a/client/mysqlbinlog.cc b/client/mysqlbinlog.cc index 3d9b9712ec8..2988dcf22a9 100644 --- a/client/mysqlbinlog.cc +++ b/client/mysqlbinlog.cc @@ -143,10 +143,18 @@ static char *charset= 0; static uint verbose= 0; -static ulonglong start_position, stop_position; +static char *start_pos_str, *stop_pos_str; +static ulonglong start_position= BIN_LOG_HEADER_SIZE, + stop_position= (longlong)(~(my_off_t)0) ; #define start_position_mot ((my_off_t)start_position) #define stop_position_mot ((my_off_t)stop_position) +static Domain_gtid_event_filter *domain_gtid_filter= NULL; +static rpl_gtid *start_gtids, *stop_gtids; +static my_bool is_event_group_active= FALSE; +static uint32 n_start_gtid_ranges= 0; +static uint32 n_stop_gtid_ranges= 0; + static char *start_datetime_str, *stop_datetime_str; static my_time_t start_datetime= 0, stop_datetime= MY_TIME_T_MAX; static ulonglong rec_count= 0; @@ -981,6 +989,10 @@ static bool print_row_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, return result; } +static inline my_bool is_gtid_filtering_enabled() +{ + return domain_gtid_filter != NULL; +} /** Print the given event, and either delete it or delegate the deletion @@ -1019,11 +1031,29 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, #endif /* + If the binlog output should be filtered using GTIDs, test the new event + group to see if its events should be written or ignored. + */ + if (ev_type == GTID_EVENT && is_gtid_filtering_enabled()) + { + Gtid_log_event *gle= (Gtid_log_event*) ev; + rpl_gtid gtid; + gtid.domain_id= gle->domain_id; + gtid.server_id= gle->server_id; + gtid.seq_no= gle->seq_no; + if (!domain_gtid_filter->exclude(>id)) + is_event_group_active= TRUE; + else + is_event_group_active= FALSE; + } + + /* Format events are not concerned by --offset and such, we always need to read them to be able to process the wanted events. */ if (((rec_count >= offset) && - (ev->when >= start_datetime)) || + (ev->when >= start_datetime) && + (!is_gtid_filtering_enabled() || is_event_group_active)) || (ev_type == FORMAT_DESCRIPTION_EVENT)) { if (ev_type != FORMAT_DESCRIPTION_EVENT) @@ -1500,6 +1530,17 @@ end: } } + /* + Xid_log_events or commit Query_log_events mark the end of a GTID event + group. + */ + if ((ev_type == XID_EVENT || + (ev_type == QUERY_EVENT && ((Query_log_event *) ev)->is_commit())) && + is_event_group_active) + { + is_event_group_active= FALSE; + } + if (destroy_evt) /* destroy it later if not set (ignored table map) */ delete ev; } @@ -1658,15 +1699,15 @@ static struct my_option my_options[] = &start_datetime_str, &start_datetime_str, 0, GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, {"start-position", 'j', - "Start reading the binlog at position N. Applies to the first binlog " - "passed on the command line.", - &start_position, &start_position, 0, GET_ULL, - REQUIRED_ARG, BIN_LOG_HEADER_SIZE, BIN_LOG_HEADER_SIZE, - /* - COM_BINLOG_DUMP accepts only 4 bytes for the position - so remote log reading has lower limit. - */ - (ulonglong)(0xffffffffffffffffULL), 0, 0, 0}, + "Start reading the binlog at this position. Type can either be a positive " + "integer or a GTID list. When using a positive integer, the value only " + "applies to the first binlog passed on the command line. In GTID mode, " + "multiple GTIDs can be passed as a comma separated list, where each must " + "have a unique domain id. Each GTID is exclusive; only events after a " + "given sequence number will be printed to allow users to receive events " + "after their current state, e.g. on a slave.", + &start_pos_str, &start_pos_str, 0, GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, + 0, 0, 0}, {"stop-datetime", OPT_STOP_DATETIME, "Stop reading the binlog at first event having a datetime equal or " "posterior to the argument; the argument must be a date and time " @@ -1684,11 +1725,14 @@ static struct my_option my_options[] = &opt_stop_never_slave_server_id, &opt_stop_never_slave_server_id, 0, GET_ULONG, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, {"stop-position", OPT_STOP_POSITION, - "Stop reading the binlog at position N. Applies to the last binlog " - "passed on the command line.", - &stop_position, &stop_position, 0, GET_ULL, - REQUIRED_ARG, (longlong)(~(my_off_t)0), BIN_LOG_HEADER_SIZE, - (ulonglong)(~(my_off_t)0), 0, 0, 0}, + "Stop reading the binlog at this position. Type can either be a positive " + "integer or a GTID list. When using a positive integer, the value only " + "applies to the last binlog passed on the command line. In GTID mode, " + "multiple GTIDs can be passed as a comma separated list, where each must " + "have a unique domain id. Each GTID is inclusive; only events up to the " + "given sequence numbers are printed.", + &stop_pos_str, &stop_pos_str, 0, GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, + 0, 0}, {"table", 'T', "List entries for just this table (local log only).", &table, &table, 0, GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, @@ -1824,8 +1868,14 @@ static void cleanup() my_free(const_cast<char*>(dirname_for_local_load)); my_free(start_datetime_str); my_free(stop_datetime_str); + my_free(start_pos_str); + my_free(stop_pos_str); + my_free(start_gtids); + my_free(stop_gtids); free_root(&glob_root, MYF(0)); + delete domain_gtid_filter; + delete binlog_filter; delete glob_description_event; if (mysql) @@ -2075,6 +2125,103 @@ get_one_option(const struct my_option *opt, const char *argument, const char *fi print_version(); opt_version= 1; break; + case OPT_STOP_POSITION: + { + stop_gtids= gtid_parse_string_to_list(stop_pos_str, strlen(stop_pos_str), + &n_stop_gtid_ranges); + if (stop_gtids == NULL) + { + int err= 0; + char *end_ptr= NULL; + /* + No GTIDs specified in OPT_STOP_POSITION specification. Treat the value + as a singular index. + */ + stop_position= my_strtoll10(stop_pos_str, &end_ptr, &err); + + if (err || *end_ptr) + { + // Can't parse the position from the user + sql_print_error("Stop position argument value is invalid. Should be " + "either a positive integer or GTID."); + return 1; + } + } + else if (n_stop_gtid_ranges > 0) + { + uint32 gtid_idx; + + if (domain_gtid_filter == NULL) + domain_gtid_filter= new Domain_gtid_event_filter(); + + for (gtid_idx = 0; gtid_idx < n_stop_gtid_ranges; gtid_idx++) + { + rpl_gtid *stop_gtid= &stop_gtids[gtid_idx]; + if (domain_gtid_filter->add_stop_gtid(stop_gtid)) + { + sql_print_error( + "Domain id is invalid for GTID stop position %u-%u-%llu", + stop_gtid->domain_id, stop_gtid->server_id, + stop_gtid->seq_no); + return 1; + } + } + } + else + { + DBUG_ASSERT(0); + } + break; + } + case 'j': + { + start_gtids= gtid_parse_string_to_list( + start_pos_str, strlen(start_pos_str), &n_start_gtid_ranges); + + if (start_gtids == NULL) + { + int err= 0; + char *end_ptr= NULL; + /* + No GTIDs specified in OPT_START_POSITION specification. Treat the value + as a singular index. + */ + start_position= my_strtoll10(start_pos_str, &end_ptr, &err); + + if (err || *end_ptr) + { + // Can't parse the position from the user + sql_print_error("Start position argument value is invalid. Should be " + "either a positive integer or GTID."); + return 1; + } + } + else if (n_start_gtid_ranges > 0) + { + uint32 gtid_idx; + + if (domain_gtid_filter == NULL) + domain_gtid_filter= new Domain_gtid_event_filter(); + + for (gtid_idx = 0; gtid_idx < n_start_gtid_ranges; gtid_idx++) + { + rpl_gtid *start_gtid= &start_gtids[gtid_idx]; + if (domain_gtid_filter->add_start_gtid(start_gtid)) + { + sql_print_error( + "Domain id is invalid for GTID start position %u-%u-%llu", + start_gtid->domain_id, start_gtid->server_id, + start_gtid->seq_no); + return 1; + } + } + } + else + { + DBUG_ASSERT(0); + } + break; + } case '?': usage(); opt_version= 1; @@ -2237,11 +2384,16 @@ static Exit_status dump_log_entries(const char* logname) @retval ERROR_STOP An error occurred - the program should terminate. @retval OK_CONTINUE No error, the program should continue. */ -static Exit_status check_master_version() +static Exit_status check_master_version(uint *major, uint *minor, uint *patch, + my_bool *version_extracted) { MYSQL_RES* res = 0; MYSQL_ROW row; uint version; + size_t version_iter; + + // Assume no problems extracting the master version + *version_extracted= TRUE; if (mysql_query(mysql, "SELECT VERSION()") || !(res = mysql_store_result(mysql))) @@ -2257,12 +2409,52 @@ static Exit_status check_master_version() goto err; } - if (!(version = atoi(row[0]))) + if (!(*major = atoi(row[0]))) { error("Could not find server version: " "Master reported NULL for the version."); goto err; } + + /* Try to save the minor version, if supplied with a storage location */ + if (minor != NULL || patch != NULL) + { + for (version_iter= 0; version_iter < strlen((const char *) row); + version_iter++) + { + if (row[0][version_iter] == '.') + { + version_iter= version_iter + 1; + break; + } + } + if (minor != NULL && !(*minor= atoi(&row[0][version_iter]))) + { + warning("Could not extract minor version from MariaDB master version %s", row[0]); + *version_extracted= FALSE; + } + } + + /* Try to save the patch version, if supplied with a storage location */ + if (patch != NULL) + { + for (; version_iter < strlen((const char *) row); version_iter++) + { + if (row[0][version_iter] == '.') + { + version_iter= version_iter + 1; + break; + } + } + if (!(*patch= atoi(&row[0][version_iter]))) + { + warning("Could not extract patch version from MariaDB master version %s", row[0]); + *version_extracted= FALSE; + } + } + + version= *major; + /* Make a notice to the server that this client is checksum-aware. It does not need the first fake Rotate @@ -2606,21 +2798,63 @@ static Exit_status dump_remote_log_entries(PRINT_EVENT_INFO *print_event_info, DBUG_RETURN(retval); net= &mysql->net; - if ((retval= check_master_version()) != OK_CONTINUE) + uint major_version, minor_version; + my_bool was_version_extracted= TRUE; // Assume true to start + if ((retval= check_master_version(&major_version, &minor_version, NULL, + &was_version_extracted)) != OK_CONTINUE) DBUG_RETURN(retval); /* COM_BINLOG_DUMP accepts only 4 bytes for the position, so we are forced to cast to uint32. */ - DBUG_ASSERT(start_position <= UINT_MAX32); - int4store(buf, (uint32)start_position); + size_t buf_idx= 0; + if (is_gtid_filtering_enabled() && n_start_gtid_ranges > 0) + { + if (was_version_extracted && + (major_version < 10 || (major_version == 10 && minor_version < 7))) + { + error("Master does not support GTID filtering. This feature was added " + "in MariaDB version 10.7."); + DBUG_RETURN(ERROR_STOP); + } + else if (!was_version_extracted) + { + warning("Could not extract complete MariaDB version from master. Trying " + "to use GTID filtering, but it is not guaranteed that the " + "master supports it (must be 10.7+)."); + } + + size_t i; + for (i = 0; i < n_start_gtid_ranges; i++) + { + if (i > 0) + { + buf[buf_idx]= ','; + buf_idx += 1; + } + + int4store(buf + buf_idx, start_gtids[i].domain_id); + buf[buf_idx+4]= '-'; + int4store(buf + buf_idx + 5, start_gtids[i].server_id); + buf[buf_idx+9]= '-'; + int8store(buf + buf_idx + 10, start_gtids[i].seq_no); + buf_idx += 18; + } + } + else + { + DBUG_ASSERT(start_position <= UINT_MAX32); + int4store(buf, (uint32) start_position); + buf_idx= BIN_LOG_HEADER_SIZE; + } if (!opt_skip_annotate_row_events) binlog_flags|= BINLOG_SEND_ANNOTATE_ROWS_EVENT; if (!opt_stop_never) binlog_flags|= BINLOG_DUMP_NON_BLOCK; - int2store(buf + BIN_LOG_HEADER_SIZE, binlog_flags); + int2store(buf + buf_idx, binlog_flags); + buf_idx += 2; size_t tlen = strlen(logname); if (tlen > sizeof(buf) - 10) @@ -2637,9 +2871,10 @@ static Exit_status dump_remote_log_entries(PRINT_EVENT_INFO *print_event_info, } else slave_id= 0; - int4store(buf + 6, slave_id); - memcpy(buf + 10, logname, logname_len); - if (simple_command(mysql, COM_BINLOG_DUMP, buf, logname_len + 10, 1)) + int4store(buf + buf_idx, slave_id); + buf_idx += 4; + memcpy(buf + buf_idx, logname, logname_len); + if (simple_command(mysql, COM_BINLOG_DUMP, buf, logname_len + buf_idx, 1)) { error("Got fatal error sending the log dump command."); DBUG_RETURN(ERROR_STOP); @@ -3210,6 +3445,14 @@ int main(int argc, char** argv) "/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;\n"); fprintf(result_file, "/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=0*/;\n"); + + if (is_gtid_filtering_enabled()) + { + fprintf(result_file, + "/*!100001 SET @@SESSION.SERVER_ID=@@GLOBAL.SERVER_ID */;\n" + "/*!100001 SET @@SESSION.GTID_DOMAIN_ID=@@GLOBAL.GTID_DOMAIN_ID " + "*/;\n"); + } } if (tmpdir.list) @@ -3271,3 +3514,4 @@ struct encryption_service_st encryption_handler= #include "sql_list.cc" #include "rpl_filter.cc" #include "compat56.cc" +#include "rpl_gtid.cc"
\ No newline at end of file diff --git a/man/mysqlbinlog.1 b/man/mysqlbinlog.1 index 633300bb7c5..1c7f013af7b 100644 --- a/man/mysqlbinlog.1 +++ b/man/mysqlbinlog.1 @@ -992,8 +992,14 @@ This option is useful for point\-in\-time recovery\&. \fB\-\-start\-position=\fR\fB\fIN\fR\fR, \fB\-j \fR\fB\fIN\fR\fR .sp -Start reading the binary log at the first event having a position equal to or greater than -\fIN\fR\&. This option applies to the first log file named on the command line\&. +Start reading the binary log at \fIN\fR\&. Type can either be a positive +integer or a GTID\& list\&. When using a positive integer, the value only +applies to the first binlog passed on the command line, and the first event +that has a position equal to or greater than \fIN\fR is printed\&. In GTID mode, +multiple GTIDs can be passed as a comma separated list, where each must have a +unique domain id\&. Each GTID is exclusive; only events after a given sequence +number will be printed to allow users to receive events after their current +state, e.g. on a slave\&. .sp This option is useful for point\-in\-time recovery\&. .RE @@ -1063,8 +1069,13 @@ The slave server_id used for \fB--read-from-remote-server --stop-never\fR\&. .\" stop-position option: mysqlbinlog \fB\-\-stop\-position=\fR\fB\fIN\fR\fR .sp -Stop reading the binary log at the first event having a position equal to or greater than -\fIN\fR\&. This option applies to the last log file named on the command line\&. +Stop reading the binary log at the first event having a position equal to or +greater than \fIN\fR\&. Type can either be a positive integer or a GTID +list\&. When using a positive integer, the value only applies to the last log +file named on the command line\&. When in GTID mode, multiple GTIDs can be +passed as a comma separated list, where each must have a unique domain id\&. +Each GTID is inclusive; only events up to the given sequence numbers are +printed. .sp This option is useful for point\-in\-time recovery\&. .RE diff --git a/mysql-test/suite/binlog/include/mysqlbinlog_gtid_window_test_cases.inc b/mysql-test/suite/binlog/include/mysqlbinlog_gtid_window_test_cases.inc new file mode 100644 index 00000000000..29defc45091 --- /dev/null +++ b/mysql-test/suite/binlog/include/mysqlbinlog_gtid_window_test_cases.inc @@ -0,0 +1,242 @@ +# +# This file runs test cases for providing GTIDs to --start-position and +# --stop-position arguments in mariadb-binlog +# +# param $is_remote boolean (0 for false, 1 for true) to perform a local file +# or remote host analysis +# + +--let MYSQLD_DATADIR=`select @@datadir` +--let OUT_FILE=$MYSQLTEST_VARDIR/tmp/binlog.out +--let BINLOG_FILE=master-bin.000001 +--let data_inconsistent_err= "table data is inconsistent after replaying binlog using GTID start/stop positions"; + +if ($is_remote == 0) +{ + --let BINLOG_FILE_PARAM=$MYSQLD_DATADIR/$BINLOG_FILE.orig +} +if ($is_remote == 1) +{ + --let BINLOG_FILE_PARAM=--read-from-remote-server $BINLOG_FILE +} + +## Initialize test data +# +SET @@session.gtid_domain_id= 0; +SET @@session.server_id= 1; +CREATE TABLE t1 (a int); +INSERT INTO t1 values (1), (2); +--let test2_t1_good_checksum= `CHECKSUM TABLE t1` + +SET @@session.gtid_domain_id= 1; +SET @@session.server_id= 2; +CREATE TABLE t2 (a int); +INSERT INTO t2 values (1); + +SET @@session.gtid_domain_id= 0; +SET @@session.server_id= 1; +INSERT INTO t1 values (3), (4); +--let t1_final_checksum= `CHECKSUM TABLE t1` + +SET @@session.gtid_domain_id= 1; +SET @@session.server_id= 2; +INSERT INTO t2 values (2); +--let test4_t2_good_checksum= `CHECKSUM TABLE t2` + +SET @@session.server_id= 3; +INSERT INTO t2 values (3); +--let test3_t2_good_checksum= `CHECKSUM TABLE t2` + +SET @@session.server_id= 2; +INSERT INTO t2 values (4); +--let t2_final_checksum= `CHECKSUM TABLE t2` + +FLUSH LOGS; +if ($is_remote == 0) +{ + --copy_file $MYSQLD_DATADIR/master-bin.000001 $BINLOG_FILE_PARAM +} + +DROP TABLE t1; +DROP TABLE t2; + + +--echo # +--echo # Test Case 1: +--echo # The end of the binlog file resets the server and domain id of the +--echo # session + +# As gtid_domain_id and server_id should not change after reading binlog in GTID +# mode, change variables to otherwise-unused values to ensure they remain +--let $reset_gtid_domain_id = `select @@session.gtid_domain_id` +--let $reset_server_id = `select @@session.server_id` +SET @@session.gtid_domain_id= 10; +SET @@session.server_id= 20; + +# Replay the binlog +--echo # MYSQL_BINLOG BINLOG_FILE_PARAM --start-position=0-1-0 --stop-position=0-1-2 | MYSQL +--exec $MYSQL_BINLOG $BINLOG_FILE_PARAM --start-position=0-1-0 --stop-position=0-1-2 | $MYSQL +--let $test_gtid_domain_id = `select @@session.gtid_domain_id` + +# Ensure variables haven't changed +--let $assert_text = session gtid_domain_id should not change when reading binlog in GTID mode +--let $assert_cond = @@session.gtid_domain_id = 10 +--source include/assert.inc +--let $assert_text = session server_id should not change when reading binlog in GTID mode +--let $assert_cond = @@session.server_id = 20 +--source include/assert.inc + +# Reset back to previous state +--eval SET @@session.gtid_domain_id= $reset_gtid_domain_id +--eval SET @@session.server_id= $reset_server_id +DROP TABLE t1; +DROP TABLE t2; + + +--echo # +--echo # Test Case 2: +--echo # Single GTID range specified +--echo # MYSQL_BINLOG BINLOG_FILE_PARAM --start-position=0-1-0 --stop-position=0-1-2 | MYSQL +--exec $MYSQL_BINLOG $BINLOG_FILE_PARAM --start-position=0-1-0 --stop-position=0-1-2 | $MYSQL + +if ($test2_t1_good_checksum != `CHECKSUM TABLE t1`) +{ + die $data_inconsistent_err; +} +if ($t2_final_checksum != `CHECKSUM TABLE t2`) +{ + die $data_inconsistent_err; +} +DROP TABLE t1; +DROP TABLE t2; + + +--echo # +--echo # Test Case 3: +--echo # Single GTID range with different server_ids +--echo # MYSQL_BINLOG BINLOG_FILE_PARAM --start-position=1-2-0 --stop-position=1-3-4 | MYSQL +--exec $MYSQL_BINLOG $BINLOG_FILE_PARAM --start-position=1-2-0 --stop-position=1-3-4 | $MYSQL + +if ($test3_t2_good_checksum != `CHECKSUM TABLE t2`) +{ + die $data_inconsistent_err; +} +if ($t1_final_checksum != `CHECKSUM TABLE t1`) +{ + die $data_inconsistent_err; +} +DROP TABLE t1; +DROP TABLE t2; + + +--echo # +--echo # Test Case 4: +--echo # Multiple GTID ranges specified +--echo # MYSQL_BINLOG BINLOG_FILE_PARAM --start-position=0-1-0,1-2-0 --stop-position=0-1-3,1-2-3 | MYSQL +--exec $MYSQL_BINLOG $BINLOG_FILE_PARAM --start-position=0-1-0,1-2-0 --stop-position=0-1-3,1-2-3 | $MYSQL + +# Reuse checksum spot from test 4 +if ($t1_final_checksum != `CHECKSUM TABLE t1`) +{ + die $data_inconsistent_err; +} +if ($test4_t2_good_checksum != `CHECKSUM TABLE t2`) +{ + die $data_inconsistent_err; +} +DROP TABLE t1; +DROP TABLE t2; + + +--echo # +--echo # Test Case 5: +--echo # Multiple GTID ranges specified where the domain ids are listed in +--echo # different orders between start/stop position +--echo # MYSQL_BINLOG BINLOG_FILE_PARAM --stop-position=0-1-3,1-2-3 --start-position=1-2-0,0-1-0 | MYSQL +--exec $MYSQL_BINLOG $BINLOG_FILE_PARAM --stop-position=0-1-3,1-2-3 --start-position=1-2-0,0-1-0 | $MYSQL + +# Reuse checksum spot from test 4 +if ($t1_final_checksum != `CHECKSUM TABLE t1`) +{ + die $data_inconsistent_err; +} +if ($test4_t2_good_checksum != `CHECKSUM TABLE t2`) +{ + die $data_inconsistent_err; +} +DROP TABLE t1; +DROP TABLE t2; + + +--echo # +--echo # Test Case 6: +--echo # Only start position specified +CREATE TABLE t1 (a int); +INSERT INTO t1 values (3), (4); +--let test6_t1_good_checksum= `CHECKSUM TABLE t1` +DROP TABLE t1; +CREATE TABLE t1 (a int); +--echo # MYSQL_BINLOG BINLOG_FILE_PARAM --start-position=0-1-2 | MYSQL +--exec $MYSQL_BINLOG $BINLOG_FILE_PARAM --start-position=0-1-2 | $MYSQL +if ($test6_t1_good_checksum != `CHECKSUM TABLE t1`) +{ + die $data_inconsistent_err; +} +if ($t2_final_checksum != `CHECKSUM TABLE t2`) +{ + die $data_inconsistent_err; +} +DROP TABLE t1; +DROP TABLE t2; + + +--echo # +--echo # Test Case 7: +--echo # Only stop position specified +--echo # MYSQL_BINLOG BINLOG_FILE_PARAM --stop-position=0-1-2 | MYSQL +--exec $MYSQL_BINLOG $BINLOG_FILE_PARAM --stop-position=0-1-2 | $MYSQL + +# Reuse checksum spot from test 2 +if ($test2_t1_good_checksum != `CHECKSUM TABLE t1`) +{ + die $data_inconsistent_err; +} +if ($t2_final_checksum != `CHECKSUM TABLE t2`) +{ + die $data_inconsistent_err; +} +DROP TABLE t1; +DROP TABLE t2; + + +--echo # +--echo # Test Case 8: +--echo # Seq_no=0 in --start-position includes all events for a domain +--echo # MYSQL_BINLOG BINLOG_FILE_PARAM --start-position=0-1-0,1-2-0 | MYSQL +--exec $MYSQL_BINLOG $BINLOG_FILE_PARAM --start-position=0-1-0,1-2-0 | $MYSQL +if ($t1_final_checksum != `CHECKSUM TABLE t1`) +{ + die "t1 data should be complete as binlog replay should include domain 0 entirely in results"; +} +if ($t2_final_checksum != `CHECKSUM TABLE t2`) +{ + die "t2 data should be complete as binlog replay should include domain 1 entirely in results"; +} +DROP TABLE t1; +DROP TABLE t2; + +--echo # +--echo # Test Case 9: +--echo # Seq_no=0 in --stop-position excludes all events for a domain +--echo # MYSQL_BINLOG BINLOG_FILE_PARAM --stop-position=0-1-0,1-2-0 | MYSQL +--exec $MYSQL_BINLOG $BINLOG_FILE_PARAM --stop-position=0-1-0,1-2-0 | $MYSQL +if (0 < `SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'test' AND table_name = 't1'`) +{ + die "t1 should not exist as binlog replay should exclude domain 0 from results"; +} +if (0 < `SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'test' AND table_name = 't2'`) +{ + die "t2 should not exist as binlog replay should exclude domain 1 from results"; +} +DROP TABLE t1; +DROP TABLE t2;
\ No newline at end of file diff --git a/mysql-test/suite/binlog/t/binlog_mysqlbinlog_gtid_window-master.opt b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_gtid_window-master.opt new file mode 100644 index 00000000000..d17999c07c1 --- /dev/null +++ b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_gtid_window-master.opt @@ -0,0 +1 @@ +--timezone=GMT-8
\ No newline at end of file diff --git a/mysql-test/suite/binlog/t/binlog_mysqlbinlog_gtid_window.test b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_gtid_window.test new file mode 100644 index 00000000000..0bc83927123 --- /dev/null +++ b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_gtid_window.test @@ -0,0 +1,124 @@ +# +# Purpose: +# +# This test ensures that the mariadb-binlog CLI tool can filter log events +# using GTID ranges. More specifically, this test ensures the following +# capabilities: +# 1) GTIDs can be used to filter results on local binlog files +# 2) GTIDs can be used to filter results from remote servers +# 3) For a given GTID range, its start-position is exclusive and its +# stop-position is inclusive. This allows users to receive events strictly +# after those that they already have, and is useful in cases such as: 1) +# events were received out of order and should be re-sent, or 2) +# specifying the gtid state of a slave to get events newer than their +# current state. +# 4) After the events have been written, the session server id and domain id +# are reset to their former values +# +# +# Methodology: +# +# This test validates the expected capabilities using the following test cases +# on both a local binlog file and remote server for all binlog formats. +# Test Case 1) The end of the binlog file resets the server and domain id of +# the session +# Test Case 2) Single GTID range specified +# Test Case 3) Single GTID range with different server_ids +# Test Case 4) Multiple GTID ranges specified +# Test Case 5) Multiple GTID ranges specified where the domain ids are +# listed in different orders between start/stop position +# Test Case 6) Only start position specified +# Test Case 7) Only stop position specified +# Test Case 8) Seq_no=0 in --start-position includes all events for a domain +# Test Case 9) Seq_no=0 in --stop-position excludes all events for a domain +# +# To validate for data consistency, each test case compares a checksum of +# correct data against a variant created after replaying the binlog using +# --(start|stop)-position. If the checksums are identical, the test passes. +# If the checksums differ, data has been changed and the test fails. +# +# Additionally, this test validates the following error scenarios: +# Error Case 1) User provides invalid positions +# Error Case 2) User provides GTID ranges with repeated domain ids +# +# +# References: +# MDEV-4989: Support for GTID in mysqlbinlog +# + +--source include/have_log_bin.inc + +--echo ############################### +--echo # Test Setup +--echo ############################### + +## Save old state +# +let orig_gtid_domain_id = `select @@session.gtid_domain_id`; +let orig_server_id = `select @@session.server_id`; + +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +RESET MASTER; + + +--echo ###################################### +--echo # Test Group 1 +--echo # Run test cases on local log file +--echo ###################################### +--let is_remote= 0 +--source include/mysqlbinlog_gtid_window_test_cases.inc + + +--echo ###################################### +--echo # Test Group 2 +--echo # Run test cases on remote host +--echo ###################################### +--let is_remote= 1 +--source include/mysqlbinlog_gtid_window_test_cases.inc + + +--echo ############################## +--echo # Error Cases +--echo ############################## + +--let $MYSQLD_DATADIR=`select @@datadir` +--echo # +--echo # Error Case 1: +--echo # User provides invalid positions +--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=z --base64-output=NEVER +--error 1 +--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --start-position=z --base64-output=NEVER + +--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=1- --base64-output=NEVER +--error 1 +--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --start-position=1- --base64-output=NEVER + +--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=1-2 --base64-output=NEVER +--error 1 +--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --start-position=1-2 --base64-output=NEVER + +--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=1-2- --base64-output=NEVER +--error 1 +--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --start-position=1-2- --base64-output=NEVER + +--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=-1 --base64-output=NEVER +--error 1 +--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --start-position=-1 --base64-output=NEVER + +--echo # +--echo # Error Case 2: +--echo # User provides GTID ranges with repeated domain ids +--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=0-1-1,0-1-8 --stop-position=0-1-4,0-1-12 --base64-output=NEVER +--error 1 +--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --start-position=0-1-1,0-1-8 --stop-position=0-1-4,0-1-12 --base64-output=NEVER + + +--echo ############################## +--echo # Cleanup +--echo ############################## +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +--eval SET @@global.gtid_domain_id= $orig_gtid_domain_id +--eval SET @@global.server_id= $orig_server_id + diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc index 306ae878060..629be5165bf 100644 --- a/sql/rpl_gtid.cc +++ b/sql/rpl_gtid.cc @@ -17,6 +17,7 @@ /* Definitions for MariaDB global transaction ID (GTID). */ +#ifndef MYSQL_CLIENT #include "mariadb.h" #include "sql_priv.h" #include "unireg.h" @@ -1273,6 +1274,7 @@ rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid) return true; } +#endif /* Parse a GTID at the start of a string, and update the pointer to point @@ -1310,9 +1312,32 @@ gtid_parser_helper(const char **ptr, const char *end, rpl_gtid *out_gtid) return 0; } +/* + Unpack a GTID at the start of a string, and update the pointer to point + at the first character after the unpacked GTID. + + Returns 0 on ok, non-zero on parse error. +*/ +static int +gtid_unpack_helper(const char **ptr, const char *end, rpl_gtid *out_gtid) +{ + const char *p= *ptr; + + if (p[4] != '-' || p[9] != '-') + return 1; + + out_gtid->domain_id= (uint32)uint4korr(p); + out_gtid->server_id= (uint32)uint4korr(&p[5]); + out_gtid->seq_no= (uint64)uint8korr(&p[10]); + + *ptr= p + 18; + return 0; +} rpl_gtid * -gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len) +gtid_read_to_list(const char *str, size_t str_len, uint32 *out_len, + int reader_f(const char **ptr, const char *end, + rpl_gtid *out_gtid)) { const char *p= const_cast<char *>(str); const char *end= p + str_len; @@ -1323,7 +1348,7 @@ gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len) { rpl_gtid gtid; - if (len >= (((uint32)1 << 28)-1) || gtid_parser_helper(&p, end, >id)) + if (len >= (((uint32)1 << 28)-1) || reader_f(&p, end, >id)) { my_free(list); return NULL; @@ -1350,6 +1375,20 @@ gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len) } +rpl_gtid * +gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len) +{ + return gtid_read_to_list(str, str_len, out_len, gtid_parser_helper); +} + +rpl_gtid * +gtid_unpack_string_to_list(const char *str, size_t str_len, uint32 *out_len) +{ + return gtid_read_to_list(str, str_len, out_len, gtid_unpack_helper); +} + +#ifndef MYSQL_CLIENT + /* Update the slave replication state with the GTID position obtained from master when connecting with old-style (filename,offset) position. @@ -2974,3 +3013,382 @@ gtid_waiting::remove_from_wait_queue(gtid_waiting::hash_element *he, queue_remove(&he->queue, elem->queue_idx); } + +#endif + +Window_gtid_event_filter::Window_gtid_event_filter() : + m_has_start(FALSE), + m_has_stop(FALSE), + m_is_active(FALSE), + m_has_passed(FALSE) + { + // m_start and m_stop do not need initial values if unused + } + +Window_gtid_event_filter::Window_gtid_event_filter(rpl_gtid *start, rpl_gtid *stop) : + m_is_active(FALSE), + m_has_passed(FALSE) +{ + DBUG_ASSERT(start->domain_id == stop->domain_id); + + m_is_active= FALSE; + m_has_passed= FALSE; + + m_has_start= TRUE; + m_start.domain_id= start->domain_id; + m_start.server_id= start->server_id; + m_start.seq_no= start->seq_no; + + m_has_stop= TRUE; + m_stop.domain_id= stop->domain_id; + m_stop.server_id= stop->server_id; + m_stop.seq_no= stop->seq_no; +} + +int Window_gtid_event_filter::set_start_gtid(rpl_gtid *start) +{ + int err= 0; + if (m_has_start) + { + err= 1; + goto err; + } + + if (m_has_stop && m_stop.domain_id != start->domain_id) + { + err= 1; + goto err; + } + + // Copy values + m_has_start= TRUE; + m_start.domain_id= start->domain_id; + m_start.server_id= start->server_id; + m_start.seq_no= start->seq_no; + +err: + return err; +} + +int Window_gtid_event_filter::set_stop_gtid(rpl_gtid *stop) +{ + int err= 0; + if (m_has_stop) + { + err= 1; + goto err; + } + + if (m_has_start && m_start.domain_id != stop->domain_id) + { + err= 1; + goto err; + } + + // Copy values + m_has_stop= TRUE; + m_stop.domain_id= stop->domain_id; + m_stop.server_id= stop->server_id; + m_stop.seq_no= stop->seq_no; + +err: + return err; +} + +static inline my_bool is_gtid_at_or_after(rpl_gtid *boundary, + rpl_gtid *test_gtid) +{ + return test_gtid->domain_id == boundary->domain_id && + test_gtid->server_id == boundary->server_id && + test_gtid->seq_no >= boundary->seq_no; +} + +static inline my_bool is_gtid_before(rpl_gtid *boundary, + rpl_gtid *test_gtid) +{ + return test_gtid->domain_id == boundary->domain_id && + test_gtid->server_id == boundary->server_id && + test_gtid->seq_no <= boundary->seq_no; +} + +my_bool Window_gtid_event_filter::exclude(rpl_gtid *gtid) +{ + /* Assume result should be excluded to start */ + my_bool should_exclude= TRUE; + + DBUG_ASSERT((m_has_start || m_has_stop) && + (gtid->domain_id == m_start.domain_id || + gtid->domain_id == m_stop.domain_id)); + + if (!m_is_active && !m_has_passed) + { + /* + This filter has not yet been activated. Check if the gtid is within the + bounds of this window. + */ + + if (!m_has_start) + { + /* + Start GTID was not provided, so we want to include everything up to m_stop + */ + m_is_active= TRUE; + should_exclude= FALSE; + } + else if (is_gtid_at_or_after(&m_start, gtid)) + { + m_is_active= TRUE; + + DBUG_PRINT("gtid-event-filter", + ("Window: Begin (%d-%d-%d, %d-%d-%llu]", m_start.domain_id, + m_start.server_id, m_start.seq_no, m_stop.domain_id, + m_stop.server_id, m_stop.seq_no)); + + /* + As the start of the range is exclusive, if this gtid is the start of + the range, exclude it + */ + if (gtid->seq_no == m_start.seq_no && + gtid->server_id == m_start.server_id) + should_exclude= TRUE; + else + should_exclude= FALSE; + } + } /* if (!m_is_active && !m_has_passed) */ + else if (m_is_active && !m_has_passed) + { + /* + This window is currently active so we want the event group to be included + in the results. Additionally check if we are at the end of the window. + If no end of the window is provided, go indefinitely + */ + should_exclude= FALSE; + + if (m_has_stop && is_gtid_at_or_after(&m_stop, gtid)) + { + DBUG_PRINT("gtid-event-filter", + ("Window: End (%d-%d-%d, %d-%d-%llu]", m_start.domain_id, + m_start.server_id, m_start.seq_no, m_stop.domain_id, + m_stop.server_id, m_stop.seq_no)); + m_is_active= FALSE; + m_has_passed= TRUE; + + if (gtid->server_id == m_stop.server_id && gtid->seq_no > m_stop.seq_no) + { + /* + The GTID is after the finite stop of the window, don't let it pass + through + */ + should_exclude= TRUE; + } + } + else if (m_has_start && is_gtid_before(&m_start, gtid)) + { + /* + Out of order check, the window is active but this GTID takes place + before the window begins. keep the window active, but exclude it from + passing through. + */ + should_exclude= TRUE; + } + } + else if (m_has_passed && m_has_stop && is_gtid_before(&m_stop, gtid)) + { + /* Test if events are out of order */ + if (!m_has_start || (m_has_start && is_gtid_at_or_after(&m_start, gtid))) + { + /* + The filter window has closed because it has seen a GTID higher than its + end boundary; however, this GTID is out of order and should be passed + through. + */ + should_exclude= TRUE; + } + } + + return should_exclude; +} + +Id_delegating_gtid_event_filter::Id_delegating_gtid_event_filter() +{ + uint32 i; + + m_filter_id_mask= 0xf; + + m_filters_by_id= (gtid_filter_element **) my_malloc( + PSI_NOT_INSTRUMENTED, + (m_filter_id_mask + 1) * sizeof(gtid_filter_element *), + MYF(MY_WME) + ); + + if (m_filters_by_id == NULL) + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + + for (i = 0; i <= m_filter_id_mask; i++) + { + m_filters_by_id[i]= NULL; + } + + m_default_filter= new Accept_all_gtid_filter(); +} + +/* + Deconstructor deletes: + 1) All Identifiable_gtid_event_filters added + 2) All gtid_filter_element allocations +*/ +Id_delegating_gtid_event_filter::~Id_delegating_gtid_event_filter() +{ + uint32 i; + for (i = 0; i <= m_filter_id_mask; i++) + { + gtid_filter_element *filter_element= m_filters_by_id[i], + *filter_element_to_del= NULL; + while(filter_element) + { + filter_element_to_del= filter_element; + filter_element= filter_element->next; + delete filter_element_to_del->filter; + my_free(filter_element_to_del); + } + } + my_free(m_filters_by_id); + + delete m_default_filter; +} + +void Id_delegating_gtid_event_filter::set_default_filter(Gtid_event_filter *filter) +{ + if (m_default_filter) + delete m_default_filter; + + m_default_filter= filter; +} + +gtid_filter_element * +Id_delegating_gtid_event_filter::try_find_filter_element_for_id( + gtid_filter_identifier filter_id) +{ + // Add this into the domain id list + uint32 map_idx= filter_id & m_filter_id_mask; + gtid_filter_element *filter_idx= m_filters_by_id[map_idx]; + + /* Find list index to add this filter */ + while (filter_idx) + { + if (filter_idx->filter->get_filter_identifier() == filter_id) + break; + filter_idx= filter_idx->next; + } + + return filter_idx; +} + +gtid_filter_element * +Id_delegating_gtid_event_filter::find_or_create_filter_element_for_id( + gtid_filter_identifier filter_id) +{ + // Add this into the domain id list + uint32 map_idx= filter_id & m_filter_id_mask; + gtid_filter_element *filter_idx= m_filters_by_id[map_idx], + *prev_idx= NULL; + + /* Find list index to add this filter */ + while (filter_idx) + { + prev_idx= filter_idx; + if (filter_idx->filter->get_filter_identifier() == filter_id) + { + break; + } + prev_idx= filter_idx; + filter_idx= filter_idx->next; + } + + if (filter_idx == NULL) + { + // No other domain ids have filters that index here, create this one + filter_idx= (gtid_filter_element *) my_malloc( + PSI_NOT_INSTRUMENTED, sizeof(gtid_filter_element), MYF(MY_WME)); + filter_idx->filter= NULL; + filter_idx->next= NULL; + + if (prev_idx == NULL) + { + // This is the first filter in the bucket + m_filters_by_id[map_idx]= filter_idx; + } + else + { + // End of list, append filter list to tail + prev_idx->next= filter_idx; + } + } + + return filter_idx; +} + +my_bool Id_delegating_gtid_event_filter::exclude(rpl_gtid *gtid) +{ + gtid_filter_identifier filter_id= get_id_from_gtid(gtid); + gtid_filter_element *filter_element= try_find_filter_element_for_id(filter_id); + Gtid_event_filter *filter= + (filter_element ? filter_element->filter : m_default_filter); + return filter->exclude(gtid); +} + +Window_gtid_event_filter * +Domain_gtid_event_filter::find_or_create_window_filter_for_id( + uint32 domain_id) +{ + gtid_filter_element *filter_element= + find_or_create_filter_element_for_id(domain_id); + Window_gtid_event_filter *wgef= NULL; + + if (filter_element->filter == NULL) + { + // New filter + wgef= new Window_gtid_event_filter(); + filter_element->filter= new Identifiable_gtid_event_filter(domain_id, wgef); + } + else if (filter_element->filter->get_filter_type() == WINDOW_GTID_FILTER_TYPE) + { + // We have an existing window filter here + wgef= (Window_gtid_event_filter *) + filter_element->filter->get_identified_filter(); + } + /* + Else: We have an existing filter but it is not of window type so propogate + NULL filter + */ + + return wgef; +} + +int Domain_gtid_event_filter::add_start_gtid(rpl_gtid *gtid) +{ + int err= 0; + Window_gtid_event_filter *filter_to_update= + find_or_create_window_filter_for_id(gtid->domain_id); + + if (filter_to_update == NULL) + err= 1; + else + err= filter_to_update->set_start_gtid(gtid); + + return err; +} + +int Domain_gtid_event_filter::add_stop_gtid(rpl_gtid *gtid) +{ + int err= 0; + Window_gtid_event_filter *filter_to_update= + find_or_create_window_filter_for_id(gtid->domain_id); + + if (filter_to_update == NULL) + err= 1; + else + err= filter_to_update->set_stop_gtid(gtid); + + return err; +}
\ No newline at end of file diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h index 11541c8000c..3e54b995a64 100644 --- a/sql/rpl_gtid.h +++ b/sql/rpl_gtid.h @@ -380,5 +380,258 @@ extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid, extern int gtid_check_rpl_slave_state_table(TABLE *table); extern rpl_gtid *gtid_parse_string_to_list(const char *p, size_t len, uint32 *out_len); +extern rpl_gtid *gtid_unpack_string_to_list(const char *p, size_t len, + uint32 *out_len); + +/* + Interface to support different methods of filtering log events by GTID +*/ +class Gtid_event_filter +{ +public: + Gtid_event_filter() {}; + virtual ~Gtid_event_filter() {}; + + enum gtid_event_filter_type + { + DELEGATING_GTID_FILTER_TYPE = 1, + WINDOW_GTID_FILTER_TYPE = 2, + ACCEPT_ALL_GTID_FILTER_TYPE = 3 + }; + + /* + Run the filter on an input gtid to test if the corresponding log events + should be excluded from a result + + Returns TRUE when the event group corresponding to the input GTID should be + excluded. + Returns FALSE when the event group should be included. + */ + virtual my_bool exclude(rpl_gtid *) = 0; + + /* + The gtid_event_filter_type that corresponds to the underlying filter + implementation + */ + virtual uint32 get_filter_type() = 0; +}; + +/* + Filter implementation which will include any and all input GTIDs. This is + used to set default behavior for GTIDs that do not have explicit filters + set on their domain_id, e.g. when a Window_gtid_event_filter is used for + a specific domain, then all other domain_ids will be accepted using this + filter implementation. +*/ +class Accept_all_gtid_filter : public Gtid_event_filter +{ +public: + Accept_all_gtid_filter() {} + ~Accept_all_gtid_filter() {} + my_bool exclude(rpl_gtid *gtid) { return FALSE; } + uint32 get_filter_type() { return ACCEPT_ALL_GTID_FILTER_TYPE; } +}; + +/* + A sub-class of Gtid_event_filter which allows for quick identification + of potentially applicable filters for arbitrary GTIDs. +*/ +typedef uint32 gtid_filter_identifier; +class Identifiable_gtid_event_filter : public Gtid_event_filter +{ + +public: + Identifiable_gtid_event_filter(gtid_filter_identifier filter_id, + Gtid_event_filter *filter) + : m_filter_id(filter_id), m_filter(filter){}; + + ~Identifiable_gtid_event_filter() { + delete m_filter; + }; + + Gtid_event_filter *get_identified_filter() { return m_filter; } + gtid_filter_identifier get_filter_identifier() { return m_filter_id; } + + /* + Inherited functionality uses composition to call the pass-through filter + */ + my_bool exclude(rpl_gtid *gtid) { return m_filter->exclude(gtid); } + uint32 get_filter_type() { return m_filter->get_filter_type(); } + +protected: + gtid_filter_identifier m_filter_id; + Gtid_event_filter *m_filter; +}; + +/* + A filter implementation that passes through events between two GTIDs, m_start + (exclusive) and m_stop (inclusive). + + This filter is stateful, such that it expects GTIDs to be a sequential + stream, and internally, the window will activate/deactivate when the start + and stop positions of the event stream have passed through, respectively. + + Window activation is used to permit events from the same domain id which fall + in-between m_start and m_stop, but are not from the same server id. For + example, consider the following event stream with GTIDs 0-1-1,0-2-1,0-1-2. + With m_start as 0-1-0 and m_stop as 0-1-2, we want 0-2-1 to be included in + this filter. Therefore, the window activates upon seeing 0-1-1, and allows + any GTIDs within this domain to pass through until 0-1-2 has been + encountered. +*/ +class Window_gtid_event_filter : public Gtid_event_filter +{ +public: + Window_gtid_event_filter(); + Window_gtid_event_filter(rpl_gtid *start, rpl_gtid *stop); + ~Window_gtid_event_filter() {} + + my_bool exclude(rpl_gtid*); + + /* + Set the GTID that begins this window (exclusive) + + Returns 0 on ok, non-zero on error + */ + int set_start_gtid(rpl_gtid *start); + + /* + Set the GTID that ends this window (inclusive) + + Returns 0 on ok, non-zero on error + */ + int set_stop_gtid(rpl_gtid *stop); + + uint32 get_filter_type() { return WINDOW_GTID_FILTER_TYPE; } + +private: + /* + m_has_start : Indicates if a start to this window has been explicitly + provided. A window starts immediately if not provided. + */ + my_bool m_has_start; + + /* + m_has_stop : Indicates if a stop to this window has been explicitly + provided. A window continues indefinitely if not provided. + */ + my_bool m_has_stop; + + /* + m_is_active : Indicates whether or not the program is currently reading + events from within this window. When TRUE, events with + different server ids than those specified by m_start or + m_stop will be passed through. + */ + my_bool m_is_active; + + /* + m_has_passed : Indicates whether or not the program is currently reading + events from within this window. + */ + my_bool m_has_passed; + + /* m_start : marks the GTID that begins the window (exclusive). */ + rpl_gtid m_start; + + /* m_stop : marks the GTID that ends the range (inclusive). */ + rpl_gtid m_stop; + + /* last_gtid_seen: saves the last */ + rpl_gtid last_gtid_seen; +}; + +/* + Data structure to help with quick lookup for filters. More specifically, + if two filters have identifiers that lead to the same hash, they will be + put into a linked list. +*/ +typedef struct _gtid_filter_element +{ + Identifiable_gtid_event_filter *filter; + struct _gtid_filter_element *next; +} gtid_filter_element; + +/* + Gtid_event_filter subclass which has no specific implementation, but rather + delegates the filtering to specific identifiable/mapped implementations. + + A default filter is used for GTIDs that are passed through which no explicit + filter can be identified. + + This class should be subclassed, where the get_id_from_gtid function + specifies how to extract the filter identifier from a GTID. +*/ +class Id_delegating_gtid_event_filter : public Gtid_event_filter +{ +public: + Id_delegating_gtid_event_filter(); + ~Id_delegating_gtid_event_filter(); + + my_bool exclude(rpl_gtid *gtid); + void set_default_filter(Gtid_event_filter *default_filter); + + uint32 get_filter_type() { return DELEGATING_GTID_FILTER_TYPE; } + + virtual gtid_filter_identifier get_id_from_gtid(rpl_gtid *) = 0; + + +protected: + + uint32 m_filter_id_mask; + Gtid_event_filter *m_default_filter; + + /* + To reduce time to find a gtid window, they are indexed by domain_id. More + specifically, domain_ids are arranged into m_filter_id_mask+1 buckets, and + each bucket is a linked list of gtid_filter_elements that share the same + index. The index itself is found by a bitwise and, i.e. + some_rpl_gtid.domain_id & m_filter_id_mask + */ + gtid_filter_element **m_filters_by_id; + + gtid_filter_element *try_find_filter_element_for_id(gtid_filter_identifier); + gtid_filter_element *find_or_create_filter_element_for_id(gtid_filter_identifier); +}; + +/* + A subclass of Id_delegating_gtid_event_filter which identifies filters using the + domain id of a GTID. + + Additional helper functions include: + add_start_gtid(GTID) : adds a start GTID position to this filter, to be + identified by its domain id + add_stop_gtid(GTID) : adds a stop GTID position to this filter, to be + identified by its domain id +*/ +class Domain_gtid_event_filter : public Id_delegating_gtid_event_filter +{ +public: + + /* + Returns the domain id of from the input GTID + */ + gtid_filter_identifier get_id_from_gtid(rpl_gtid *gtid) + { + return gtid->domain_id; + } + + /* + Helper function to start a GTID window filter at the given GTID + + Returns 0 on ok, non-zero on error + */ + int add_start_gtid(rpl_gtid *gtid); + + /* + Helper function to end a GTID window filter at the given GTID + + Returns 0 on ok, non-zero on error + */ + int add_stop_gtid(rpl_gtid *gtid); + +private: + Window_gtid_event_filter *find_or_create_window_filter_for_id(gtid_filter_identifier); +}; #endif /* RPL_GTID_H */ diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index b9d3eec5a60..b1b450d7b40 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -2121,8 +2121,11 @@ dispatch_command_return dispatch_command(enum enum_server_command command, THD * case COM_BINLOG_DUMP: { ulong pos; + uint32 n_start_gtids= 0; + rpl_gtid *start_gtids= NULL; ushort flags; uint32 slave_server_id; + uint32 unpack_idx= 0; status_var_increment(thd->status_var.com_other); @@ -2131,19 +2134,42 @@ dispatch_command_return dispatch_command(enum enum_server_command command, THD * break; /* TODO: The following has to be changed to an 8 byte integer */ - pos = uint4korr(packet); - flags = uint2korr(packet + 4); + if (packet[4] == '-' && packet[9] == '-') + { + unpack_idx= 18; + while (packet[unpack_idx] == ',') + unpack_idx += 19; // 18 chars for gtid + 1 for comma + start_gtids= gtid_unpack_string_to_list(packet, unpack_idx, &n_start_gtids); + + /* + Set pos to the start of the binlog file for scanning + + TODO: When GTID indexing is complete (MDEV-4991), update pos by + looking it up in the index + */ + pos= 4; + } /* if (packet[4] == '-' && packet[9] == '-') */ + else + { + /* Single numeric log position case */ + pos = uint4korr(packet); + unpack_idx += 4; + } + flags = uint2korr(packet + unpack_idx); + unpack_idx += 2; thd->variables.server_id=0; /* avoid suicide */ - if ((slave_server_id= uint4korr(packet+6))) // mysqlbinlog.server_id==0 + if ((slave_server_id= uint4korr(packet+unpack_idx))) // mysqlbinlog.server_id==0 kill_zombie_dump_threads(slave_server_id); thd->variables.server_id = slave_server_id; + unpack_idx += 4; - const char *name= packet + 10; + const char *name= packet + unpack_idx; size_t nlen= strlen(name); general_log_print(thd, command, "Log: '%s' Pos: %lu", name, pos); if (nlen < FN_REFLEN) - mysql_binlog_send(thd, thd->strmake(name, nlen), (my_off_t)pos, flags); + mysql_binlog_send(thd, thd->strmake(name, nlen), (my_off_t)pos, flags, + start_gtids, n_start_gtids); thd->unregister_slave(); // todo: can be extraneous /* fake COM_QUIT -- if we get here, the thread needs to terminate */ error = TRUE; diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 90fdce1b56f..722ca301c81 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -163,6 +163,8 @@ struct binlog_send_info { bool should_stop; size_t dirlen; + Gtid_event_filter *gtid_event_filter; + binlog_send_info(THD *thd_arg, String *packet_arg, ushort flags_arg, char *lfn) : thd(thd_arg), net(&thd_arg->net), packet(packet_arg), @@ -185,6 +187,8 @@ struct binlog_send_info { error_text[0] = 0; bzero(&error_gtid, sizeof(error_gtid)); until_binlog_state.init(); + + gtid_event_filter= NULL; } }; @@ -1751,6 +1755,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, } } + /* Skip GTID event groups until we reach slave position within a domain_id. */ if (event_type == GTID_EVENT && info->using_gtid_state) { @@ -1758,7 +1763,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, slave_connection_state::entry *gtid_entry; rpl_gtid *gtid; - if (gtid_state->count() > 0 || until_gtid_state) + if (gtid_state->count() > 0 || until_gtid_state || info->gtid_event_filter) { rpl_gtid event_gtid; @@ -1899,6 +1904,17 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, } } } + + /* + Should this result be excluded from the output? + */ + if (info->gtid_event_filter && + info->gtid_event_filter->exclude(&event_gtid)) + { + info->gtid_skip_group= + (flags2 & Gtid_log_event::FL_STANDALONE ? GTID_SKIP_STANDALONE + : GTID_SKIP_TRANSACTION); + } } } @@ -2110,7 +2126,9 @@ err: static int init_binlog_sender(binlog_send_info *info, LOG_INFO *linfo, const char *log_ident, - my_off_t *pos) + my_off_t *pos, + rpl_gtid *start_gtids, + size_t n_start_gtids) { THD *thd= info->thd; int error; @@ -2130,7 +2148,8 @@ static int init_binlog_sender(binlog_send_info *info, info->current_checksum_alg= get_binlog_checksum_value_at_connect(thd); info->mariadb_slave_capability= get_mariadb_slave_capability(thd); - info->using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state); + info->using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state) || + start_gtids != NULL; DBUG_EXECUTE_IF("simulate_non_gtid_aware_master", info->using_gtid_state= false;); @@ -2247,6 +2266,21 @@ static int init_binlog_sender(binlog_send_info *info, info->clear_initial_log_pos= true; } + if (start_gtids != NULL) + { + Domain_gtid_event_filter *filter= new Domain_gtid_event_filter(); + my_off_t i; + for(i = 0; i < n_start_gtids; i++) + { + if (filter->add_start_gtid(&start_gtids[i])) + { + info->errmsg= "Domain id is invalid for GTID start position"; + info->error= ER_INCORRECT_GTID_STATE; + } + } + info->gtid_event_filter= filter; + } + return 0; } @@ -2840,7 +2874,8 @@ static int send_one_binlog_file(binlog_send_info *info, } void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, - ushort flags) + ushort flags, rpl_gtid *start_gtids, + uint32 n_start_gtids) { LOG_INFO linfo; @@ -2860,7 +2895,8 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, bzero((char*) &log,sizeof(log)); - if (init_binlog_sender(info, &linfo, log_ident, &pos)) + if (init_binlog_sender(info, &linfo, log_ident, &pos, start_gtids, + n_start_gtids)) goto err; has_transmit_started= true; @@ -3022,6 +3058,8 @@ err: thd->reset_current_linfo(); thd->variables.max_allowed_packet= old_max_allowed_packet; delete info->fdev; + delete info->gtid_event_filter; + my_free(start_gtids); if (likely(info->error == 0)) { diff --git a/sql/sql_repl.h b/sql/sql_repl.h index 95916e31abf..bfc35ea5456 100644 --- a/sql/sql_repl.h +++ b/sql/sql_repl.h @@ -57,7 +57,8 @@ struct LOAD_FILE_IO_CACHE : public IO_CACHE int log_loaded_block(IO_CACHE* file, uchar *Buffer, size_t Count); int init_replication_sys_vars(); -void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags); +void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags, + rpl_gtid *start_gtids, uint32 n_start_gtids); #ifdef HAVE_PSI_INTERFACE extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state; |