summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client/mysqlbinlog.cc294
-rw-r--r--man/mysqlbinlog.119
-rw-r--r--mysql-test/suite/binlog/include/mysqlbinlog_gtid_window_test_cases.inc242
-rw-r--r--mysql-test/suite/binlog/t/binlog_mysqlbinlog_gtid_window-master.opt1
-rw-r--r--mysql-test/suite/binlog/t/binlog_mysqlbinlog_gtid_window.test124
-rw-r--r--sql/rpl_gtid.cc422
-rw-r--r--sql/rpl_gtid.h253
-rw-r--r--sql/sql_parse.cc36
-rw-r--r--sql/sql_repl.cc48
-rw-r--r--sql/sql_repl.h3
10 files changed, 1400 insertions, 42 deletions
diff --git a/client/mysqlbinlog.cc b/client/mysqlbinlog.cc
index 3d9b9712ec8..2988dcf22a9 100644
--- a/client/mysqlbinlog.cc
+++ b/client/mysqlbinlog.cc
@@ -143,10 +143,18 @@ static char *charset= 0;
static uint verbose= 0;
-static ulonglong start_position, stop_position;
+static char *start_pos_str, *stop_pos_str;
+static ulonglong start_position= BIN_LOG_HEADER_SIZE,
+ stop_position= (longlong)(~(my_off_t)0) ;
#define start_position_mot ((my_off_t)start_position)
#define stop_position_mot ((my_off_t)stop_position)
+static Domain_gtid_event_filter *domain_gtid_filter= NULL;
+static rpl_gtid *start_gtids, *stop_gtids;
+static my_bool is_event_group_active= FALSE;
+static uint32 n_start_gtid_ranges= 0;
+static uint32 n_stop_gtid_ranges= 0;
+
static char *start_datetime_str, *stop_datetime_str;
static my_time_t start_datetime= 0, stop_datetime= MY_TIME_T_MAX;
static ulonglong rec_count= 0;
@@ -981,6 +989,10 @@ static bool print_row_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
return result;
}
+static inline my_bool is_gtid_filtering_enabled()
+{
+ return domain_gtid_filter != NULL;
+}
/**
Print the given event, and either delete it or delegate the deletion
@@ -1019,11 +1031,29 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
#endif
/*
+ If the binlog output should be filtered using GTIDs, test the new event
+ group to see if its events should be written or ignored.
+ */
+ if (ev_type == GTID_EVENT && is_gtid_filtering_enabled())
+ {
+ Gtid_log_event *gle= (Gtid_log_event*) ev;
+ rpl_gtid gtid;
+ gtid.domain_id= gle->domain_id;
+ gtid.server_id= gle->server_id;
+ gtid.seq_no= gle->seq_no;
+ if (!domain_gtid_filter->exclude(&gtid))
+ is_event_group_active= TRUE;
+ else
+ is_event_group_active= FALSE;
+ }
+
+ /*
Format events are not concerned by --offset and such, we always need to
read them to be able to process the wanted events.
*/
if (((rec_count >= offset) &&
- (ev->when >= start_datetime)) ||
+ (ev->when >= start_datetime) &&
+ (!is_gtid_filtering_enabled() || is_event_group_active)) ||
(ev_type == FORMAT_DESCRIPTION_EVENT))
{
if (ev_type != FORMAT_DESCRIPTION_EVENT)
@@ -1500,6 +1530,17 @@ end:
}
}
+ /*
+ Xid_log_events or commit Query_log_events mark the end of a GTID event
+ group.
+ */
+ if ((ev_type == XID_EVENT ||
+ (ev_type == QUERY_EVENT && ((Query_log_event *) ev)->is_commit())) &&
+ is_event_group_active)
+ {
+ is_event_group_active= FALSE;
+ }
+
if (destroy_evt) /* destroy it later if not set (ignored table map) */
delete ev;
}
@@ -1658,15 +1699,15 @@ static struct my_option my_options[] =
&start_datetime_str, &start_datetime_str,
0, GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"start-position", 'j',
- "Start reading the binlog at position N. Applies to the first binlog "
- "passed on the command line.",
- &start_position, &start_position, 0, GET_ULL,
- REQUIRED_ARG, BIN_LOG_HEADER_SIZE, BIN_LOG_HEADER_SIZE,
- /*
- COM_BINLOG_DUMP accepts only 4 bytes for the position
- so remote log reading has lower limit.
- */
- (ulonglong)(0xffffffffffffffffULL), 0, 0, 0},
+ "Start reading the binlog at this position. Type can either be a positive "
+ "integer or a GTID list. When using a positive integer, the value only "
+ "applies to the first binlog passed on the command line. In GTID mode, "
+ "multiple GTIDs can be passed as a comma separated list, where each must "
+ "have a unique domain id. Each GTID is exclusive; only events after a "
+ "given sequence number will be printed to allow users to receive events "
+ "after their current state, e.g. on a slave.",
+ &start_pos_str, &start_pos_str, 0, GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0,
+ 0, 0, 0},
{"stop-datetime", OPT_STOP_DATETIME,
"Stop reading the binlog at first event having a datetime equal or "
"posterior to the argument; the argument must be a date and time "
@@ -1684,11 +1725,14 @@ static struct my_option my_options[] =
&opt_stop_never_slave_server_id, &opt_stop_never_slave_server_id, 0,
GET_ULONG, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"stop-position", OPT_STOP_POSITION,
- "Stop reading the binlog at position N. Applies to the last binlog "
- "passed on the command line.",
- &stop_position, &stop_position, 0, GET_ULL,
- REQUIRED_ARG, (longlong)(~(my_off_t)0), BIN_LOG_HEADER_SIZE,
- (ulonglong)(~(my_off_t)0), 0, 0, 0},
+ "Stop reading the binlog at this position. Type can either be a positive "
+ "integer or a GTID list. When using a positive integer, the value only "
+ "applies to the last binlog passed on the command line. In GTID mode, "
+ "multiple GTIDs can be passed as a comma separated list, where each must "
+ "have a unique domain id. Each GTID is inclusive; only events up to the "
+ "given sequence numbers are printed.",
+ &stop_pos_str, &stop_pos_str, 0, GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0,
+ 0, 0},
{"table", 'T', "List entries for just this table (local log only).",
&table, &table, 0, GET_STR_ALLOC, REQUIRED_ARG,
0, 0, 0, 0, 0, 0},
@@ -1824,8 +1868,14 @@ static void cleanup()
my_free(const_cast<char*>(dirname_for_local_load));
my_free(start_datetime_str);
my_free(stop_datetime_str);
+ my_free(start_pos_str);
+ my_free(stop_pos_str);
+ my_free(start_gtids);
+ my_free(stop_gtids);
free_root(&glob_root, MYF(0));
+ delete domain_gtid_filter;
+
delete binlog_filter;
delete glob_description_event;
if (mysql)
@@ -2075,6 +2125,103 @@ get_one_option(const struct my_option *opt, const char *argument, const char *fi
print_version();
opt_version= 1;
break;
+ case OPT_STOP_POSITION:
+ {
+ stop_gtids= gtid_parse_string_to_list(stop_pos_str, strlen(stop_pos_str),
+ &n_stop_gtid_ranges);
+ if (stop_gtids == NULL)
+ {
+ int err= 0;
+ char *end_ptr= NULL;
+ /*
+ No GTIDs specified in OPT_STOP_POSITION specification. Treat the value
+ as a singular index.
+ */
+ stop_position= my_strtoll10(stop_pos_str, &end_ptr, &err);
+
+ if (err || *end_ptr)
+ {
+ // Can't parse the position from the user
+ sql_print_error("Stop position argument value is invalid. Should be "
+ "either a positive integer or GTID.");
+ return 1;
+ }
+ }
+ else if (n_stop_gtid_ranges > 0)
+ {
+ uint32 gtid_idx;
+
+ if (domain_gtid_filter == NULL)
+ domain_gtid_filter= new Domain_gtid_event_filter();
+
+ for (gtid_idx = 0; gtid_idx < n_stop_gtid_ranges; gtid_idx++)
+ {
+ rpl_gtid *stop_gtid= &stop_gtids[gtid_idx];
+ if (domain_gtid_filter->add_stop_gtid(stop_gtid))
+ {
+ sql_print_error(
+ "Domain id is invalid for GTID stop position %u-%u-%llu",
+ stop_gtid->domain_id, stop_gtid->server_id,
+ stop_gtid->seq_no);
+ return 1;
+ }
+ }
+ }
+ else
+ {
+ DBUG_ASSERT(0);
+ }
+ break;
+ }
+ case 'j':
+ {
+ start_gtids= gtid_parse_string_to_list(
+ start_pos_str, strlen(start_pos_str), &n_start_gtid_ranges);
+
+ if (start_gtids == NULL)
+ {
+ int err= 0;
+ char *end_ptr= NULL;
+ /*
+ No GTIDs specified in OPT_START_POSITION specification. Treat the value
+ as a singular index.
+ */
+ start_position= my_strtoll10(start_pos_str, &end_ptr, &err);
+
+ if (err || *end_ptr)
+ {
+ // Can't parse the position from the user
+ sql_print_error("Start position argument value is invalid. Should be "
+ "either a positive integer or GTID.");
+ return 1;
+ }
+ }
+ else if (n_start_gtid_ranges > 0)
+ {
+ uint32 gtid_idx;
+
+ if (domain_gtid_filter == NULL)
+ domain_gtid_filter= new Domain_gtid_event_filter();
+
+ for (gtid_idx = 0; gtid_idx < n_start_gtid_ranges; gtid_idx++)
+ {
+ rpl_gtid *start_gtid= &start_gtids[gtid_idx];
+ if (domain_gtid_filter->add_start_gtid(start_gtid))
+ {
+ sql_print_error(
+ "Domain id is invalid for GTID start position %u-%u-%llu",
+ start_gtid->domain_id, start_gtid->server_id,
+ start_gtid->seq_no);
+ return 1;
+ }
+ }
+ }
+ else
+ {
+ DBUG_ASSERT(0);
+ }
+ break;
+ }
case '?':
usage();
opt_version= 1;
@@ -2237,11 +2384,16 @@ static Exit_status dump_log_entries(const char* logname)
@retval ERROR_STOP An error occurred - the program should terminate.
@retval OK_CONTINUE No error, the program should continue.
*/
-static Exit_status check_master_version()
+static Exit_status check_master_version(uint *major, uint *minor, uint *patch,
+ my_bool *version_extracted)
{
MYSQL_RES* res = 0;
MYSQL_ROW row;
uint version;
+ size_t version_iter;
+
+ // Assume no problems extracting the master version
+ *version_extracted= TRUE;
if (mysql_query(mysql, "SELECT VERSION()") ||
!(res = mysql_store_result(mysql)))
@@ -2257,12 +2409,52 @@ static Exit_status check_master_version()
goto err;
}
- if (!(version = atoi(row[0])))
+ if (!(*major = atoi(row[0])))
{
error("Could not find server version: "
"Master reported NULL for the version.");
goto err;
}
+
+ /* Try to save the minor version, if supplied with a storage location */
+ if (minor != NULL || patch != NULL)
+ {
+ for (version_iter= 0; version_iter < strlen((const char *) row);
+ version_iter++)
+ {
+ if (row[0][version_iter] == '.')
+ {
+ version_iter= version_iter + 1;
+ break;
+ }
+ }
+ if (minor != NULL && !(*minor= atoi(&row[0][version_iter])))
+ {
+ warning("Could not extract minor version from MariaDB master version %s", row[0]);
+ *version_extracted= FALSE;
+ }
+ }
+
+ /* Try to save the patch version, if supplied with a storage location */
+ if (patch != NULL)
+ {
+ for (; version_iter < strlen((const char *) row); version_iter++)
+ {
+ if (row[0][version_iter] == '.')
+ {
+ version_iter= version_iter + 1;
+ break;
+ }
+ }
+ if (!(*patch= atoi(&row[0][version_iter])))
+ {
+ warning("Could not extract patch version from MariaDB master version %s", row[0]);
+ *version_extracted= FALSE;
+ }
+ }
+
+ version= *major;
+
/*
Make a notice to the server that this client
is checksum-aware. It does not need the first fake Rotate
@@ -2606,21 +2798,63 @@ static Exit_status dump_remote_log_entries(PRINT_EVENT_INFO *print_event_info,
DBUG_RETURN(retval);
net= &mysql->net;
- if ((retval= check_master_version()) != OK_CONTINUE)
+ uint major_version, minor_version;
+ my_bool was_version_extracted= TRUE; // Assume true to start
+ if ((retval= check_master_version(&major_version, &minor_version, NULL,
+ &was_version_extracted)) != OK_CONTINUE)
DBUG_RETURN(retval);
/*
COM_BINLOG_DUMP accepts only 4 bytes for the position, so we are forced to
cast to uint32.
*/
- DBUG_ASSERT(start_position <= UINT_MAX32);
- int4store(buf, (uint32)start_position);
+ size_t buf_idx= 0;
+ if (is_gtid_filtering_enabled() && n_start_gtid_ranges > 0)
+ {
+ if (was_version_extracted &&
+ (major_version < 10 || (major_version == 10 && minor_version < 7)))
+ {
+ error("Master does not support GTID filtering. This feature was added "
+ "in MariaDB version 10.7.");
+ DBUG_RETURN(ERROR_STOP);
+ }
+ else if (!was_version_extracted)
+ {
+ warning("Could not extract complete MariaDB version from master. Trying "
+ "to use GTID filtering, but it is not guaranteed that the "
+ "master supports it (must be 10.7+).");
+ }
+
+ size_t i;
+ for (i = 0; i < n_start_gtid_ranges; i++)
+ {
+ if (i > 0)
+ {
+ buf[buf_idx]= ',';
+ buf_idx += 1;
+ }
+
+ int4store(buf + buf_idx, start_gtids[i].domain_id);
+ buf[buf_idx+4]= '-';
+ int4store(buf + buf_idx + 5, start_gtids[i].server_id);
+ buf[buf_idx+9]= '-';
+ int8store(buf + buf_idx + 10, start_gtids[i].seq_no);
+ buf_idx += 18;
+ }
+ }
+ else
+ {
+ DBUG_ASSERT(start_position <= UINT_MAX32);
+ int4store(buf, (uint32) start_position);
+ buf_idx= BIN_LOG_HEADER_SIZE;
+ }
if (!opt_skip_annotate_row_events)
binlog_flags|= BINLOG_SEND_ANNOTATE_ROWS_EVENT;
if (!opt_stop_never)
binlog_flags|= BINLOG_DUMP_NON_BLOCK;
- int2store(buf + BIN_LOG_HEADER_SIZE, binlog_flags);
+ int2store(buf + buf_idx, binlog_flags);
+ buf_idx += 2;
size_t tlen = strlen(logname);
if (tlen > sizeof(buf) - 10)
@@ -2637,9 +2871,10 @@ static Exit_status dump_remote_log_entries(PRINT_EVENT_INFO *print_event_info,
}
else
slave_id= 0;
- int4store(buf + 6, slave_id);
- memcpy(buf + 10, logname, logname_len);
- if (simple_command(mysql, COM_BINLOG_DUMP, buf, logname_len + 10, 1))
+ int4store(buf + buf_idx, slave_id);
+ buf_idx += 4;
+ memcpy(buf + buf_idx, logname, logname_len);
+ if (simple_command(mysql, COM_BINLOG_DUMP, buf, logname_len + buf_idx, 1))
{
error("Got fatal error sending the log dump command.");
DBUG_RETURN(ERROR_STOP);
@@ -3210,6 +3445,14 @@ int main(int argc, char** argv)
"/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;\n");
fprintf(result_file, "/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=0*/;\n");
+
+ if (is_gtid_filtering_enabled())
+ {
+ fprintf(result_file,
+ "/*!100001 SET @@SESSION.SERVER_ID=@@GLOBAL.SERVER_ID */;\n"
+ "/*!100001 SET @@SESSION.GTID_DOMAIN_ID=@@GLOBAL.GTID_DOMAIN_ID "
+ "*/;\n");
+ }
}
if (tmpdir.list)
@@ -3271,3 +3514,4 @@ struct encryption_service_st encryption_handler=
#include "sql_list.cc"
#include "rpl_filter.cc"
#include "compat56.cc"
+#include "rpl_gtid.cc" \ No newline at end of file
diff --git a/man/mysqlbinlog.1 b/man/mysqlbinlog.1
index 633300bb7c5..1c7f013af7b 100644
--- a/man/mysqlbinlog.1
+++ b/man/mysqlbinlog.1
@@ -992,8 +992,14 @@ This option is useful for point\-in\-time recovery\&.
\fB\-\-start\-position=\fR\fB\fIN\fR\fR,
\fB\-j \fR\fB\fIN\fR\fR
.sp
-Start reading the binary log at the first event having a position equal to or greater than
-\fIN\fR\&. This option applies to the first log file named on the command line\&.
+Start reading the binary log at \fIN\fR\&. Type can either be a positive
+integer or a GTID\& list\&. When using a positive integer, the value only
+applies to the first binlog passed on the command line, and the first event
+that has a position equal to or greater than \fIN\fR is printed\&. In GTID mode,
+multiple GTIDs can be passed as a comma separated list, where each must have a
+unique domain id\&. Each GTID is exclusive; only events after a given sequence
+number will be printed to allow users to receive events after their current
+state, e.g. on a slave\&.
.sp
This option is useful for point\-in\-time recovery\&.
.RE
@@ -1063,8 +1069,13 @@ The slave server_id used for \fB--read-from-remote-server --stop-never\fR\&.
.\" stop-position option: mysqlbinlog
\fB\-\-stop\-position=\fR\fB\fIN\fR\fR
.sp
-Stop reading the binary log at the first event having a position equal to or greater than
-\fIN\fR\&. This option applies to the last log file named on the command line\&.
+Stop reading the binary log at the first event having a position equal to or
+greater than \fIN\fR\&. Type can either be a positive integer or a GTID
+list\&. When using a positive integer, the value only applies to the last log
+file named on the command line\&. When in GTID mode, multiple GTIDs can be
+passed as a comma separated list, where each must have a unique domain id\&.
+Each GTID is inclusive; only events up to the given sequence numbers are
+printed.
.sp
This option is useful for point\-in\-time recovery\&.
.RE
diff --git a/mysql-test/suite/binlog/include/mysqlbinlog_gtid_window_test_cases.inc b/mysql-test/suite/binlog/include/mysqlbinlog_gtid_window_test_cases.inc
new file mode 100644
index 00000000000..29defc45091
--- /dev/null
+++ b/mysql-test/suite/binlog/include/mysqlbinlog_gtid_window_test_cases.inc
@@ -0,0 +1,242 @@
+#
+# This file runs test cases for providing GTIDs to --start-position and
+# --stop-position arguments in mariadb-binlog
+#
+# param $is_remote boolean (0 for false, 1 for true) to perform a local file
+# or remote host analysis
+#
+
+--let MYSQLD_DATADIR=`select @@datadir`
+--let OUT_FILE=$MYSQLTEST_VARDIR/tmp/binlog.out
+--let BINLOG_FILE=master-bin.000001
+--let data_inconsistent_err= "table data is inconsistent after replaying binlog using GTID start/stop positions";
+
+if ($is_remote == 0)
+{
+ --let BINLOG_FILE_PARAM=$MYSQLD_DATADIR/$BINLOG_FILE.orig
+}
+if ($is_remote == 1)
+{
+ --let BINLOG_FILE_PARAM=--read-from-remote-server $BINLOG_FILE
+}
+
+## Initialize test data
+#
+SET @@session.gtid_domain_id= 0;
+SET @@session.server_id= 1;
+CREATE TABLE t1 (a int);
+INSERT INTO t1 values (1), (2);
+--let test2_t1_good_checksum= `CHECKSUM TABLE t1`
+
+SET @@session.gtid_domain_id= 1;
+SET @@session.server_id= 2;
+CREATE TABLE t2 (a int);
+INSERT INTO t2 values (1);
+
+SET @@session.gtid_domain_id= 0;
+SET @@session.server_id= 1;
+INSERT INTO t1 values (3), (4);
+--let t1_final_checksum= `CHECKSUM TABLE t1`
+
+SET @@session.gtid_domain_id= 1;
+SET @@session.server_id= 2;
+INSERT INTO t2 values (2);
+--let test4_t2_good_checksum= `CHECKSUM TABLE t2`
+
+SET @@session.server_id= 3;
+INSERT INTO t2 values (3);
+--let test3_t2_good_checksum= `CHECKSUM TABLE t2`
+
+SET @@session.server_id= 2;
+INSERT INTO t2 values (4);
+--let t2_final_checksum= `CHECKSUM TABLE t2`
+
+FLUSH LOGS;
+if ($is_remote == 0)
+{
+ --copy_file $MYSQLD_DATADIR/master-bin.000001 $BINLOG_FILE_PARAM
+}
+
+DROP TABLE t1;
+DROP TABLE t2;
+
+
+--echo #
+--echo # Test Case 1:
+--echo # The end of the binlog file resets the server and domain id of the
+--echo # session
+
+# As gtid_domain_id and server_id should not change after reading binlog in GTID
+# mode, change variables to otherwise-unused values to ensure they remain
+--let $reset_gtid_domain_id = `select @@session.gtid_domain_id`
+--let $reset_server_id = `select @@session.server_id`
+SET @@session.gtid_domain_id= 10;
+SET @@session.server_id= 20;
+
+# Replay the binlog
+--echo # MYSQL_BINLOG BINLOG_FILE_PARAM --start-position=0-1-0 --stop-position=0-1-2 | MYSQL
+--exec $MYSQL_BINLOG $BINLOG_FILE_PARAM --start-position=0-1-0 --stop-position=0-1-2 | $MYSQL
+--let $test_gtid_domain_id = `select @@session.gtid_domain_id`
+
+# Ensure variables haven't changed
+--let $assert_text = session gtid_domain_id should not change when reading binlog in GTID mode
+--let $assert_cond = @@session.gtid_domain_id = 10
+--source include/assert.inc
+--let $assert_text = session server_id should not change when reading binlog in GTID mode
+--let $assert_cond = @@session.server_id = 20
+--source include/assert.inc
+
+# Reset back to previous state
+--eval SET @@session.gtid_domain_id= $reset_gtid_domain_id
+--eval SET @@session.server_id= $reset_server_id
+DROP TABLE t1;
+DROP TABLE t2;
+
+
+--echo #
+--echo # Test Case 2:
+--echo # Single GTID range specified
+--echo # MYSQL_BINLOG BINLOG_FILE_PARAM --start-position=0-1-0 --stop-position=0-1-2 | MYSQL
+--exec $MYSQL_BINLOG $BINLOG_FILE_PARAM --start-position=0-1-0 --stop-position=0-1-2 | $MYSQL
+
+if ($test2_t1_good_checksum != `CHECKSUM TABLE t1`)
+{
+ die $data_inconsistent_err;
+}
+if ($t2_final_checksum != `CHECKSUM TABLE t2`)
+{
+ die $data_inconsistent_err;
+}
+DROP TABLE t1;
+DROP TABLE t2;
+
+
+--echo #
+--echo # Test Case 3:
+--echo # Single GTID range with different server_ids
+--echo # MYSQL_BINLOG BINLOG_FILE_PARAM --start-position=1-2-0 --stop-position=1-3-4 | MYSQL
+--exec $MYSQL_BINLOG $BINLOG_FILE_PARAM --start-position=1-2-0 --stop-position=1-3-4 | $MYSQL
+
+if ($test3_t2_good_checksum != `CHECKSUM TABLE t2`)
+{
+ die $data_inconsistent_err;
+}
+if ($t1_final_checksum != `CHECKSUM TABLE t1`)
+{
+ die $data_inconsistent_err;
+}
+DROP TABLE t1;
+DROP TABLE t2;
+
+
+--echo #
+--echo # Test Case 4:
+--echo # Multiple GTID ranges specified
+--echo # MYSQL_BINLOG BINLOG_FILE_PARAM --start-position=0-1-0,1-2-0 --stop-position=0-1-3,1-2-3 | MYSQL
+--exec $MYSQL_BINLOG $BINLOG_FILE_PARAM --start-position=0-1-0,1-2-0 --stop-position=0-1-3,1-2-3 | $MYSQL
+
+# Reuse checksum spot from test 4
+if ($t1_final_checksum != `CHECKSUM TABLE t1`)
+{
+ die $data_inconsistent_err;
+}
+if ($test4_t2_good_checksum != `CHECKSUM TABLE t2`)
+{
+ die $data_inconsistent_err;
+}
+DROP TABLE t1;
+DROP TABLE t2;
+
+
+--echo #
+--echo # Test Case 5:
+--echo # Multiple GTID ranges specified where the domain ids are listed in
+--echo # different orders between start/stop position
+--echo # MYSQL_BINLOG BINLOG_FILE_PARAM --stop-position=0-1-3,1-2-3 --start-position=1-2-0,0-1-0 | MYSQL
+--exec $MYSQL_BINLOG $BINLOG_FILE_PARAM --stop-position=0-1-3,1-2-3 --start-position=1-2-0,0-1-0 | $MYSQL
+
+# Reuse checksum spot from test 4
+if ($t1_final_checksum != `CHECKSUM TABLE t1`)
+{
+ die $data_inconsistent_err;
+}
+if ($test4_t2_good_checksum != `CHECKSUM TABLE t2`)
+{
+ die $data_inconsistent_err;
+}
+DROP TABLE t1;
+DROP TABLE t2;
+
+
+--echo #
+--echo # Test Case 6:
+--echo # Only start position specified
+CREATE TABLE t1 (a int);
+INSERT INTO t1 values (3), (4);
+--let test6_t1_good_checksum= `CHECKSUM TABLE t1`
+DROP TABLE t1;
+CREATE TABLE t1 (a int);
+--echo # MYSQL_BINLOG BINLOG_FILE_PARAM --start-position=0-1-2 | MYSQL
+--exec $MYSQL_BINLOG $BINLOG_FILE_PARAM --start-position=0-1-2 | $MYSQL
+if ($test6_t1_good_checksum != `CHECKSUM TABLE t1`)
+{
+ die $data_inconsistent_err;
+}
+if ($t2_final_checksum != `CHECKSUM TABLE t2`)
+{
+ die $data_inconsistent_err;
+}
+DROP TABLE t1;
+DROP TABLE t2;
+
+
+--echo #
+--echo # Test Case 7:
+--echo # Only stop position specified
+--echo # MYSQL_BINLOG BINLOG_FILE_PARAM --stop-position=0-1-2 | MYSQL
+--exec $MYSQL_BINLOG $BINLOG_FILE_PARAM --stop-position=0-1-2 | $MYSQL
+
+# Reuse checksum spot from test 2
+if ($test2_t1_good_checksum != `CHECKSUM TABLE t1`)
+{
+ die $data_inconsistent_err;
+}
+if ($t2_final_checksum != `CHECKSUM TABLE t2`)
+{
+ die $data_inconsistent_err;
+}
+DROP TABLE t1;
+DROP TABLE t2;
+
+
+--echo #
+--echo # Test Case 8:
+--echo # Seq_no=0 in --start-position includes all events for a domain
+--echo # MYSQL_BINLOG BINLOG_FILE_PARAM --start-position=0-1-0,1-2-0 | MYSQL
+--exec $MYSQL_BINLOG $BINLOG_FILE_PARAM --start-position=0-1-0,1-2-0 | $MYSQL
+if ($t1_final_checksum != `CHECKSUM TABLE t1`)
+{
+ die "t1 data should be complete as binlog replay should include domain 0 entirely in results";
+}
+if ($t2_final_checksum != `CHECKSUM TABLE t2`)
+{
+ die "t2 data should be complete as binlog replay should include domain 1 entirely in results";
+}
+DROP TABLE t1;
+DROP TABLE t2;
+
+--echo #
+--echo # Test Case 9:
+--echo # Seq_no=0 in --stop-position excludes all events for a domain
+--echo # MYSQL_BINLOG BINLOG_FILE_PARAM --stop-position=0-1-0,1-2-0 | MYSQL
+--exec $MYSQL_BINLOG $BINLOG_FILE_PARAM --stop-position=0-1-0,1-2-0 | $MYSQL
+if (0 < `SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'test' AND table_name = 't1'`)
+{
+ die "t1 should not exist as binlog replay should exclude domain 0 from results";
+}
+if (0 < `SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'test' AND table_name = 't2'`)
+{
+ die "t2 should not exist as binlog replay should exclude domain 1 from results";
+}
+DROP TABLE t1;
+DROP TABLE t2; \ No newline at end of file
diff --git a/mysql-test/suite/binlog/t/binlog_mysqlbinlog_gtid_window-master.opt b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_gtid_window-master.opt
new file mode 100644
index 00000000000..d17999c07c1
--- /dev/null
+++ b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_gtid_window-master.opt
@@ -0,0 +1 @@
+--timezone=GMT-8 \ No newline at end of file
diff --git a/mysql-test/suite/binlog/t/binlog_mysqlbinlog_gtid_window.test b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_gtid_window.test
new file mode 100644
index 00000000000..0bc83927123
--- /dev/null
+++ b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_gtid_window.test
@@ -0,0 +1,124 @@
+#
+# Purpose:
+#
+# This test ensures that the mariadb-binlog CLI tool can filter log events
+# using GTID ranges. More specifically, this test ensures the following
+# capabilities:
+# 1) GTIDs can be used to filter results on local binlog files
+# 2) GTIDs can be used to filter results from remote servers
+# 3) For a given GTID range, its start-position is exclusive and its
+# stop-position is inclusive. This allows users to receive events strictly
+# after those that they already have, and is useful in cases such as: 1)
+# events were received out of order and should be re-sent, or 2)
+# specifying the gtid state of a slave to get events newer than their
+# current state.
+# 4) After the events have been written, the session server id and domain id
+# are reset to their former values
+#
+#
+# Methodology:
+#
+# This test validates the expected capabilities using the following test cases
+# on both a local binlog file and remote server for all binlog formats.
+# Test Case 1) The end of the binlog file resets the server and domain id of
+# the session
+# Test Case 2) Single GTID range specified
+# Test Case 3) Single GTID range with different server_ids
+# Test Case 4) Multiple GTID ranges specified
+# Test Case 5) Multiple GTID ranges specified where the domain ids are
+# listed in different orders between start/stop position
+# Test Case 6) Only start position specified
+# Test Case 7) Only stop position specified
+# Test Case 8) Seq_no=0 in --start-position includes all events for a domain
+# Test Case 9) Seq_no=0 in --stop-position excludes all events for a domain
+#
+# To validate for data consistency, each test case compares a checksum of
+# correct data against a variant created after replaying the binlog using
+# --(start|stop)-position. If the checksums are identical, the test passes.
+# If the checksums differ, data has been changed and the test fails.
+#
+# Additionally, this test validates the following error scenarios:
+# Error Case 1) User provides invalid positions
+# Error Case 2) User provides GTID ranges with repeated domain ids
+#
+#
+# References:
+# MDEV-4989: Support for GTID in mysqlbinlog
+#
+
+--source include/have_log_bin.inc
+
+--echo ###############################
+--echo # Test Setup
+--echo ###############################
+
+## Save old state
+#
+let orig_gtid_domain_id = `select @@session.gtid_domain_id`;
+let orig_server_id = `select @@session.server_id`;
+
+DROP TABLE IF EXISTS t1;
+DROP TABLE IF EXISTS t2;
+RESET MASTER;
+
+
+--echo ######################################
+--echo # Test Group 1
+--echo # Run test cases on local log file
+--echo ######################################
+--let is_remote= 0
+--source include/mysqlbinlog_gtid_window_test_cases.inc
+
+
+--echo ######################################
+--echo # Test Group 2
+--echo # Run test cases on remote host
+--echo ######################################
+--let is_remote= 1
+--source include/mysqlbinlog_gtid_window_test_cases.inc
+
+
+--echo ##############################
+--echo # Error Cases
+--echo ##############################
+
+--let $MYSQLD_DATADIR=`select @@datadir`
+--echo #
+--echo # Error Case 1:
+--echo # User provides invalid positions
+--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=z --base64-output=NEVER
+--error 1
+--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --start-position=z --base64-output=NEVER
+
+--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=1- --base64-output=NEVER
+--error 1
+--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --start-position=1- --base64-output=NEVER
+
+--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=1-2 --base64-output=NEVER
+--error 1
+--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --start-position=1-2 --base64-output=NEVER
+
+--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=1-2- --base64-output=NEVER
+--error 1
+--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --start-position=1-2- --base64-output=NEVER
+
+--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=-1 --base64-output=NEVER
+--error 1
+--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --start-position=-1 --base64-output=NEVER
+
+--echo #
+--echo # Error Case 2:
+--echo # User provides GTID ranges with repeated domain ids
+--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=0-1-1,0-1-8 --stop-position=0-1-4,0-1-12 --base64-output=NEVER
+--error 1
+--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --start-position=0-1-1,0-1-8 --stop-position=0-1-4,0-1-12 --base64-output=NEVER
+
+
+--echo ##############################
+--echo # Cleanup
+--echo ##############################
+DROP TABLE IF EXISTS t1;
+DROP TABLE IF EXISTS t2;
+--eval SET @@global.gtid_domain_id= $orig_gtid_domain_id
+--eval SET @@global.server_id= $orig_server_id
+
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index 306ae878060..629be5165bf 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -17,6 +17,7 @@
/* Definitions for MariaDB global transaction ID (GTID). */
+#ifndef MYSQL_CLIENT
#include "mariadb.h"
#include "sql_priv.h"
#include "unireg.h"
@@ -1273,6 +1274,7 @@ rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid)
return true;
}
+#endif
/*
Parse a GTID at the start of a string, and update the pointer to point
@@ -1310,9 +1312,32 @@ gtid_parser_helper(const char **ptr, const char *end, rpl_gtid *out_gtid)
return 0;
}
+/*
+ Unpack a GTID at the start of a string, and update the pointer to point
+ at the first character after the unpacked GTID.
+
+ Returns 0 on ok, non-zero on parse error.
+*/
+static int
+gtid_unpack_helper(const char **ptr, const char *end, rpl_gtid *out_gtid)
+{
+ const char *p= *ptr;
+
+ if (p[4] != '-' || p[9] != '-')
+ return 1;
+
+ out_gtid->domain_id= (uint32)uint4korr(p);
+ out_gtid->server_id= (uint32)uint4korr(&p[5]);
+ out_gtid->seq_no= (uint64)uint8korr(&p[10]);
+
+ *ptr= p + 18;
+ return 0;
+}
rpl_gtid *
-gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len)
+gtid_read_to_list(const char *str, size_t str_len, uint32 *out_len,
+ int reader_f(const char **ptr, const char *end,
+ rpl_gtid *out_gtid))
{
const char *p= const_cast<char *>(str);
const char *end= p + str_len;
@@ -1323,7 +1348,7 @@ gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len)
{
rpl_gtid gtid;
- if (len >= (((uint32)1 << 28)-1) || gtid_parser_helper(&p, end, &gtid))
+ if (len >= (((uint32)1 << 28)-1) || reader_f(&p, end, &gtid))
{
my_free(list);
return NULL;
@@ -1350,6 +1375,20 @@ gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len)
}
+rpl_gtid *
+gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len)
+{
+ return gtid_read_to_list(str, str_len, out_len, gtid_parser_helper);
+}
+
+rpl_gtid *
+gtid_unpack_string_to_list(const char *str, size_t str_len, uint32 *out_len)
+{
+ return gtid_read_to_list(str, str_len, out_len, gtid_unpack_helper);
+}
+
+#ifndef MYSQL_CLIENT
+
/*
Update the slave replication state with the GTID position obtained from
master when connecting with old-style (filename,offset) position.
@@ -2974,3 +3013,382 @@ gtid_waiting::remove_from_wait_queue(gtid_waiting::hash_element *he,
queue_remove(&he->queue, elem->queue_idx);
}
+
+#endif
+
+Window_gtid_event_filter::Window_gtid_event_filter() :
+ m_has_start(FALSE),
+ m_has_stop(FALSE),
+ m_is_active(FALSE),
+ m_has_passed(FALSE)
+ {
+ // m_start and m_stop do not need initial values if unused
+ }
+
+Window_gtid_event_filter::Window_gtid_event_filter(rpl_gtid *start, rpl_gtid *stop) :
+ m_is_active(FALSE),
+ m_has_passed(FALSE)
+{
+ DBUG_ASSERT(start->domain_id == stop->domain_id);
+
+ m_is_active= FALSE;
+ m_has_passed= FALSE;
+
+ m_has_start= TRUE;
+ m_start.domain_id= start->domain_id;
+ m_start.server_id= start->server_id;
+ m_start.seq_no= start->seq_no;
+
+ m_has_stop= TRUE;
+ m_stop.domain_id= stop->domain_id;
+ m_stop.server_id= stop->server_id;
+ m_stop.seq_no= stop->seq_no;
+}
+
+int Window_gtid_event_filter::set_start_gtid(rpl_gtid *start)
+{
+ int err= 0;
+ if (m_has_start)
+ {
+ err= 1;
+ goto err;
+ }
+
+ if (m_has_stop && m_stop.domain_id != start->domain_id)
+ {
+ err= 1;
+ goto err;
+ }
+
+ // Copy values
+ m_has_start= TRUE;
+ m_start.domain_id= start->domain_id;
+ m_start.server_id= start->server_id;
+ m_start.seq_no= start->seq_no;
+
+err:
+ return err;
+}
+
+int Window_gtid_event_filter::set_stop_gtid(rpl_gtid *stop)
+{
+ int err= 0;
+ if (m_has_stop)
+ {
+ err= 1;
+ goto err;
+ }
+
+ if (m_has_start && m_start.domain_id != stop->domain_id)
+ {
+ err= 1;
+ goto err;
+ }
+
+ // Copy values
+ m_has_stop= TRUE;
+ m_stop.domain_id= stop->domain_id;
+ m_stop.server_id= stop->server_id;
+ m_stop.seq_no= stop->seq_no;
+
+err:
+ return err;
+}
+
+static inline my_bool is_gtid_at_or_after(rpl_gtid *boundary,
+ rpl_gtid *test_gtid)
+{
+ return test_gtid->domain_id == boundary->domain_id &&
+ test_gtid->server_id == boundary->server_id &&
+ test_gtid->seq_no >= boundary->seq_no;
+}
+
+static inline my_bool is_gtid_before(rpl_gtid *boundary,
+ rpl_gtid *test_gtid)
+{
+ return test_gtid->domain_id == boundary->domain_id &&
+ test_gtid->server_id == boundary->server_id &&
+ test_gtid->seq_no <= boundary->seq_no;
+}
+
+my_bool Window_gtid_event_filter::exclude(rpl_gtid *gtid)
+{
+ /* Assume result should be excluded to start */
+ my_bool should_exclude= TRUE;
+
+ DBUG_ASSERT((m_has_start || m_has_stop) &&
+ (gtid->domain_id == m_start.domain_id ||
+ gtid->domain_id == m_stop.domain_id));
+
+ if (!m_is_active && !m_has_passed)
+ {
+ /*
+ This filter has not yet been activated. Check if the gtid is within the
+ bounds of this window.
+ */
+
+ if (!m_has_start)
+ {
+ /*
+ Start GTID was not provided, so we want to include everything up to m_stop
+ */
+ m_is_active= TRUE;
+ should_exclude= FALSE;
+ }
+ else if (is_gtid_at_or_after(&m_start, gtid))
+ {
+ m_is_active= TRUE;
+
+ DBUG_PRINT("gtid-event-filter",
+ ("Window: Begin (%d-%d-%d, %d-%d-%llu]", m_start.domain_id,
+ m_start.server_id, m_start.seq_no, m_stop.domain_id,
+ m_stop.server_id, m_stop.seq_no));
+
+ /*
+ As the start of the range is exclusive, if this gtid is the start of
+ the range, exclude it
+ */
+ if (gtid->seq_no == m_start.seq_no &&
+ gtid->server_id == m_start.server_id)
+ should_exclude= TRUE;
+ else
+ should_exclude= FALSE;
+ }
+ } /* if (!m_is_active && !m_has_passed) */
+ else if (m_is_active && !m_has_passed)
+ {
+ /*
+ This window is currently active so we want the event group to be included
+ in the results. Additionally check if we are at the end of the window.
+ If no end of the window is provided, go indefinitely
+ */
+ should_exclude= FALSE;
+
+ if (m_has_stop && is_gtid_at_or_after(&m_stop, gtid))
+ {
+ DBUG_PRINT("gtid-event-filter",
+ ("Window: End (%d-%d-%d, %d-%d-%llu]", m_start.domain_id,
+ m_start.server_id, m_start.seq_no, m_stop.domain_id,
+ m_stop.server_id, m_stop.seq_no));
+ m_is_active= FALSE;
+ m_has_passed= TRUE;
+
+ if (gtid->server_id == m_stop.server_id && gtid->seq_no > m_stop.seq_no)
+ {
+ /*
+ The GTID is after the finite stop of the window, don't let it pass
+ through
+ */
+ should_exclude= TRUE;
+ }
+ }
+ else if (m_has_start && is_gtid_before(&m_start, gtid))
+ {
+ /*
+ Out of order check, the window is active but this GTID takes place
+ before the window begins. keep the window active, but exclude it from
+ passing through.
+ */
+ should_exclude= TRUE;
+ }
+ }
+ else if (m_has_passed && m_has_stop && is_gtid_before(&m_stop, gtid))
+ {
+ /* Test if events are out of order */
+ if (!m_has_start || (m_has_start && is_gtid_at_or_after(&m_start, gtid)))
+ {
+ /*
+ The filter window has closed because it has seen a GTID higher than its
+ end boundary; however, this GTID is out of order and should be passed
+ through.
+ */
+ should_exclude= TRUE;
+ }
+ }
+
+ return should_exclude;
+}
+
+Id_delegating_gtid_event_filter::Id_delegating_gtid_event_filter()
+{
+ uint32 i;
+
+ m_filter_id_mask= 0xf;
+
+ m_filters_by_id= (gtid_filter_element **) my_malloc(
+ PSI_NOT_INSTRUMENTED,
+ (m_filter_id_mask + 1) * sizeof(gtid_filter_element *),
+ MYF(MY_WME)
+ );
+
+ if (m_filters_by_id == NULL)
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+
+ for (i = 0; i <= m_filter_id_mask; i++)
+ {
+ m_filters_by_id[i]= NULL;
+ }
+
+ m_default_filter= new Accept_all_gtid_filter();
+}
+
+/*
+ Deconstructor deletes:
+ 1) All Identifiable_gtid_event_filters added
+ 2) All gtid_filter_element allocations
+*/
+Id_delegating_gtid_event_filter::~Id_delegating_gtid_event_filter()
+{
+ uint32 i;
+ for (i = 0; i <= m_filter_id_mask; i++)
+ {
+ gtid_filter_element *filter_element= m_filters_by_id[i],
+ *filter_element_to_del= NULL;
+ while(filter_element)
+ {
+ filter_element_to_del= filter_element;
+ filter_element= filter_element->next;
+ delete filter_element_to_del->filter;
+ my_free(filter_element_to_del);
+ }
+ }
+ my_free(m_filters_by_id);
+
+ delete m_default_filter;
+}
+
+void Id_delegating_gtid_event_filter::set_default_filter(Gtid_event_filter *filter)
+{
+ if (m_default_filter)
+ delete m_default_filter;
+
+ m_default_filter= filter;
+}
+
+gtid_filter_element *
+Id_delegating_gtid_event_filter::try_find_filter_element_for_id(
+ gtid_filter_identifier filter_id)
+{
+ // Add this into the domain id list
+ uint32 map_idx= filter_id & m_filter_id_mask;
+ gtid_filter_element *filter_idx= m_filters_by_id[map_idx];
+
+ /* Find list index to add this filter */
+ while (filter_idx)
+ {
+ if (filter_idx->filter->get_filter_identifier() == filter_id)
+ break;
+ filter_idx= filter_idx->next;
+ }
+
+ return filter_idx;
+}
+
+gtid_filter_element *
+Id_delegating_gtid_event_filter::find_or_create_filter_element_for_id(
+ gtid_filter_identifier filter_id)
+{
+ // Add this into the domain id list
+ uint32 map_idx= filter_id & m_filter_id_mask;
+ gtid_filter_element *filter_idx= m_filters_by_id[map_idx],
+ *prev_idx= NULL;
+
+ /* Find list index to add this filter */
+ while (filter_idx)
+ {
+ prev_idx= filter_idx;
+ if (filter_idx->filter->get_filter_identifier() == filter_id)
+ {
+ break;
+ }
+ prev_idx= filter_idx;
+ filter_idx= filter_idx->next;
+ }
+
+ if (filter_idx == NULL)
+ {
+ // No other domain ids have filters that index here, create this one
+ filter_idx= (gtid_filter_element *) my_malloc(
+ PSI_NOT_INSTRUMENTED, sizeof(gtid_filter_element), MYF(MY_WME));
+ filter_idx->filter= NULL;
+ filter_idx->next= NULL;
+
+ if (prev_idx == NULL)
+ {
+ // This is the first filter in the bucket
+ m_filters_by_id[map_idx]= filter_idx;
+ }
+ else
+ {
+ // End of list, append filter list to tail
+ prev_idx->next= filter_idx;
+ }
+ }
+
+ return filter_idx;
+}
+
+my_bool Id_delegating_gtid_event_filter::exclude(rpl_gtid *gtid)
+{
+ gtid_filter_identifier filter_id= get_id_from_gtid(gtid);
+ gtid_filter_element *filter_element= try_find_filter_element_for_id(filter_id);
+ Gtid_event_filter *filter=
+ (filter_element ? filter_element->filter : m_default_filter);
+ return filter->exclude(gtid);
+}
+
+Window_gtid_event_filter *
+Domain_gtid_event_filter::find_or_create_window_filter_for_id(
+ uint32 domain_id)
+{
+ gtid_filter_element *filter_element=
+ find_or_create_filter_element_for_id(domain_id);
+ Window_gtid_event_filter *wgef= NULL;
+
+ if (filter_element->filter == NULL)
+ {
+ // New filter
+ wgef= new Window_gtid_event_filter();
+ filter_element->filter= new Identifiable_gtid_event_filter(domain_id, wgef);
+ }
+ else if (filter_element->filter->get_filter_type() == WINDOW_GTID_FILTER_TYPE)
+ {
+ // We have an existing window filter here
+ wgef= (Window_gtid_event_filter *)
+ filter_element->filter->get_identified_filter();
+ }
+ /*
+ Else: We have an existing filter but it is not of window type so propogate
+ NULL filter
+ */
+
+ return wgef;
+}
+
+int Domain_gtid_event_filter::add_start_gtid(rpl_gtid *gtid)
+{
+ int err= 0;
+ Window_gtid_event_filter *filter_to_update=
+ find_or_create_window_filter_for_id(gtid->domain_id);
+
+ if (filter_to_update == NULL)
+ err= 1;
+ else
+ err= filter_to_update->set_start_gtid(gtid);
+
+ return err;
+}
+
+int Domain_gtid_event_filter::add_stop_gtid(rpl_gtid *gtid)
+{
+ int err= 0;
+ Window_gtid_event_filter *filter_to_update=
+ find_or_create_window_filter_for_id(gtid->domain_id);
+
+ if (filter_to_update == NULL)
+ err= 1;
+ else
+ err= filter_to_update->set_stop_gtid(gtid);
+
+ return err;
+} \ No newline at end of file
diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h
index 11541c8000c..3e54b995a64 100644
--- a/sql/rpl_gtid.h
+++ b/sql/rpl_gtid.h
@@ -380,5 +380,258 @@ extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid,
extern int gtid_check_rpl_slave_state_table(TABLE *table);
extern rpl_gtid *gtid_parse_string_to_list(const char *p, size_t len,
uint32 *out_len);
+extern rpl_gtid *gtid_unpack_string_to_list(const char *p, size_t len,
+ uint32 *out_len);
+
+/*
+ Interface to support different methods of filtering log events by GTID
+*/
+class Gtid_event_filter
+{
+public:
+ Gtid_event_filter() {};
+ virtual ~Gtid_event_filter() {};
+
+ enum gtid_event_filter_type
+ {
+ DELEGATING_GTID_FILTER_TYPE = 1,
+ WINDOW_GTID_FILTER_TYPE = 2,
+ ACCEPT_ALL_GTID_FILTER_TYPE = 3
+ };
+
+ /*
+ Run the filter on an input gtid to test if the corresponding log events
+ should be excluded from a result
+
+ Returns TRUE when the event group corresponding to the input GTID should be
+ excluded.
+ Returns FALSE when the event group should be included.
+ */
+ virtual my_bool exclude(rpl_gtid *) = 0;
+
+ /*
+ The gtid_event_filter_type that corresponds to the underlying filter
+ implementation
+ */
+ virtual uint32 get_filter_type() = 0;
+};
+
+/*
+ Filter implementation which will include any and all input GTIDs. This is
+ used to set default behavior for GTIDs that do not have explicit filters
+ set on their domain_id, e.g. when a Window_gtid_event_filter is used for
+ a specific domain, then all other domain_ids will be accepted using this
+ filter implementation.
+*/
+class Accept_all_gtid_filter : public Gtid_event_filter
+{
+public:
+ Accept_all_gtid_filter() {}
+ ~Accept_all_gtid_filter() {}
+ my_bool exclude(rpl_gtid *gtid) { return FALSE; }
+ uint32 get_filter_type() { return ACCEPT_ALL_GTID_FILTER_TYPE; }
+};
+
+/*
+ A sub-class of Gtid_event_filter which allows for quick identification
+ of potentially applicable filters for arbitrary GTIDs.
+*/
+typedef uint32 gtid_filter_identifier;
+class Identifiable_gtid_event_filter : public Gtid_event_filter
+{
+
+public:
+ Identifiable_gtid_event_filter(gtid_filter_identifier filter_id,
+ Gtid_event_filter *filter)
+ : m_filter_id(filter_id), m_filter(filter){};
+
+ ~Identifiable_gtid_event_filter() {
+ delete m_filter;
+ };
+
+ Gtid_event_filter *get_identified_filter() { return m_filter; }
+ gtid_filter_identifier get_filter_identifier() { return m_filter_id; }
+
+ /*
+ Inherited functionality uses composition to call the pass-through filter
+ */
+ my_bool exclude(rpl_gtid *gtid) { return m_filter->exclude(gtid); }
+ uint32 get_filter_type() { return m_filter->get_filter_type(); }
+
+protected:
+ gtid_filter_identifier m_filter_id;
+ Gtid_event_filter *m_filter;
+};
+
+/*
+ A filter implementation that passes through events between two GTIDs, m_start
+ (exclusive) and m_stop (inclusive).
+
+ This filter is stateful, such that it expects GTIDs to be a sequential
+ stream, and internally, the window will activate/deactivate when the start
+ and stop positions of the event stream have passed through, respectively.
+
+ Window activation is used to permit events from the same domain id which fall
+ in-between m_start and m_stop, but are not from the same server id. For
+ example, consider the following event stream with GTIDs 0-1-1,0-2-1,0-1-2.
+ With m_start as 0-1-0 and m_stop as 0-1-2, we want 0-2-1 to be included in
+ this filter. Therefore, the window activates upon seeing 0-1-1, and allows
+ any GTIDs within this domain to pass through until 0-1-2 has been
+ encountered.
+*/
+class Window_gtid_event_filter : public Gtid_event_filter
+{
+public:
+ Window_gtid_event_filter();
+ Window_gtid_event_filter(rpl_gtid *start, rpl_gtid *stop);
+ ~Window_gtid_event_filter() {}
+
+ my_bool exclude(rpl_gtid*);
+
+ /*
+ Set the GTID that begins this window (exclusive)
+
+ Returns 0 on ok, non-zero on error
+ */
+ int set_start_gtid(rpl_gtid *start);
+
+ /*
+ Set the GTID that ends this window (inclusive)
+
+ Returns 0 on ok, non-zero on error
+ */
+ int set_stop_gtid(rpl_gtid *stop);
+
+ uint32 get_filter_type() { return WINDOW_GTID_FILTER_TYPE; }
+
+private:
+ /*
+ m_has_start : Indicates if a start to this window has been explicitly
+ provided. A window starts immediately if not provided.
+ */
+ my_bool m_has_start;
+
+ /*
+ m_has_stop : Indicates if a stop to this window has been explicitly
+ provided. A window continues indefinitely if not provided.
+ */
+ my_bool m_has_stop;
+
+ /*
+ m_is_active : Indicates whether or not the program is currently reading
+ events from within this window. When TRUE, events with
+ different server ids than those specified by m_start or
+ m_stop will be passed through.
+ */
+ my_bool m_is_active;
+
+ /*
+ m_has_passed : Indicates whether or not the program is currently reading
+ events from within this window.
+ */
+ my_bool m_has_passed;
+
+ /* m_start : marks the GTID that begins the window (exclusive). */
+ rpl_gtid m_start;
+
+ /* m_stop : marks the GTID that ends the range (inclusive). */
+ rpl_gtid m_stop;
+
+ /* last_gtid_seen: saves the last */
+ rpl_gtid last_gtid_seen;
+};
+
+/*
+ Data structure to help with quick lookup for filters. More specifically,
+ if two filters have identifiers that lead to the same hash, they will be
+ put into a linked list.
+*/
+typedef struct _gtid_filter_element
+{
+ Identifiable_gtid_event_filter *filter;
+ struct _gtid_filter_element *next;
+} gtid_filter_element;
+
+/*
+ Gtid_event_filter subclass which has no specific implementation, but rather
+ delegates the filtering to specific identifiable/mapped implementations.
+
+ A default filter is used for GTIDs that are passed through which no explicit
+ filter can be identified.
+
+ This class should be subclassed, where the get_id_from_gtid function
+ specifies how to extract the filter identifier from a GTID.
+*/
+class Id_delegating_gtid_event_filter : public Gtid_event_filter
+{
+public:
+ Id_delegating_gtid_event_filter();
+ ~Id_delegating_gtid_event_filter();
+
+ my_bool exclude(rpl_gtid *gtid);
+ void set_default_filter(Gtid_event_filter *default_filter);
+
+ uint32 get_filter_type() { return DELEGATING_GTID_FILTER_TYPE; }
+
+ virtual gtid_filter_identifier get_id_from_gtid(rpl_gtid *) = 0;
+
+
+protected:
+
+ uint32 m_filter_id_mask;
+ Gtid_event_filter *m_default_filter;
+
+ /*
+ To reduce time to find a gtid window, they are indexed by domain_id. More
+ specifically, domain_ids are arranged into m_filter_id_mask+1 buckets, and
+ each bucket is a linked list of gtid_filter_elements that share the same
+ index. The index itself is found by a bitwise and, i.e.
+ some_rpl_gtid.domain_id & m_filter_id_mask
+ */
+ gtid_filter_element **m_filters_by_id;
+
+ gtid_filter_element *try_find_filter_element_for_id(gtid_filter_identifier);
+ gtid_filter_element *find_or_create_filter_element_for_id(gtid_filter_identifier);
+};
+
+/*
+ A subclass of Id_delegating_gtid_event_filter which identifies filters using the
+ domain id of a GTID.
+
+ Additional helper functions include:
+ add_start_gtid(GTID) : adds a start GTID position to this filter, to be
+ identified by its domain id
+ add_stop_gtid(GTID) : adds a stop GTID position to this filter, to be
+ identified by its domain id
+*/
+class Domain_gtid_event_filter : public Id_delegating_gtid_event_filter
+{
+public:
+
+ /*
+ Returns the domain id of from the input GTID
+ */
+ gtid_filter_identifier get_id_from_gtid(rpl_gtid *gtid)
+ {
+ return gtid->domain_id;
+ }
+
+ /*
+ Helper function to start a GTID window filter at the given GTID
+
+ Returns 0 on ok, non-zero on error
+ */
+ int add_start_gtid(rpl_gtid *gtid);
+
+ /*
+ Helper function to end a GTID window filter at the given GTID
+
+ Returns 0 on ok, non-zero on error
+ */
+ int add_stop_gtid(rpl_gtid *gtid);
+
+private:
+ Window_gtid_event_filter *find_or_create_window_filter_for_id(gtid_filter_identifier);
+};
#endif /* RPL_GTID_H */
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index b9d3eec5a60..b1b450d7b40 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -2121,8 +2121,11 @@ dispatch_command_return dispatch_command(enum enum_server_command command, THD *
case COM_BINLOG_DUMP:
{
ulong pos;
+ uint32 n_start_gtids= 0;
+ rpl_gtid *start_gtids= NULL;
ushort flags;
uint32 slave_server_id;
+ uint32 unpack_idx= 0;
status_var_increment(thd->status_var.com_other);
@@ -2131,19 +2134,42 @@ dispatch_command_return dispatch_command(enum enum_server_command command, THD *
break;
/* TODO: The following has to be changed to an 8 byte integer */
- pos = uint4korr(packet);
- flags = uint2korr(packet + 4);
+ if (packet[4] == '-' && packet[9] == '-')
+ {
+ unpack_idx= 18;
+ while (packet[unpack_idx] == ',')
+ unpack_idx += 19; // 18 chars for gtid + 1 for comma
+ start_gtids= gtid_unpack_string_to_list(packet, unpack_idx, &n_start_gtids);
+
+ /*
+ Set pos to the start of the binlog file for scanning
+
+ TODO: When GTID indexing is complete (MDEV-4991), update pos by
+ looking it up in the index
+ */
+ pos= 4;
+ } /* if (packet[4] == '-' && packet[9] == '-') */
+ else
+ {
+ /* Single numeric log position case */
+ pos = uint4korr(packet);
+ unpack_idx += 4;
+ }
+ flags = uint2korr(packet + unpack_idx);
+ unpack_idx += 2;
thd->variables.server_id=0; /* avoid suicide */
- if ((slave_server_id= uint4korr(packet+6))) // mysqlbinlog.server_id==0
+ if ((slave_server_id= uint4korr(packet+unpack_idx))) // mysqlbinlog.server_id==0
kill_zombie_dump_threads(slave_server_id);
thd->variables.server_id = slave_server_id;
+ unpack_idx += 4;
- const char *name= packet + 10;
+ const char *name= packet + unpack_idx;
size_t nlen= strlen(name);
general_log_print(thd, command, "Log: '%s' Pos: %lu", name, pos);
if (nlen < FN_REFLEN)
- mysql_binlog_send(thd, thd->strmake(name, nlen), (my_off_t)pos, flags);
+ mysql_binlog_send(thd, thd->strmake(name, nlen), (my_off_t)pos, flags,
+ start_gtids, n_start_gtids);
thd->unregister_slave(); // todo: can be extraneous
/* fake COM_QUIT -- if we get here, the thread needs to terminate */
error = TRUE;
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 90fdce1b56f..722ca301c81 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -163,6 +163,8 @@ struct binlog_send_info {
bool should_stop;
size_t dirlen;
+ Gtid_event_filter *gtid_event_filter;
+
binlog_send_info(THD *thd_arg, String *packet_arg, ushort flags_arg,
char *lfn)
: thd(thd_arg), net(&thd_arg->net), packet(packet_arg),
@@ -185,6 +187,8 @@ struct binlog_send_info {
error_text[0] = 0;
bzero(&error_gtid, sizeof(error_gtid));
until_binlog_state.init();
+
+ gtid_event_filter= NULL;
}
};
@@ -1751,6 +1755,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
}
}
+
/* Skip GTID event groups until we reach slave position within a domain_id. */
if (event_type == GTID_EVENT && info->using_gtid_state)
{
@@ -1758,7 +1763,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
slave_connection_state::entry *gtid_entry;
rpl_gtid *gtid;
- if (gtid_state->count() > 0 || until_gtid_state)
+ if (gtid_state->count() > 0 || until_gtid_state || info->gtid_event_filter)
{
rpl_gtid event_gtid;
@@ -1899,6 +1904,17 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
}
}
}
+
+ /*
+ Should this result be excluded from the output?
+ */
+ if (info->gtid_event_filter &&
+ info->gtid_event_filter->exclude(&event_gtid))
+ {
+ info->gtid_skip_group=
+ (flags2 & Gtid_log_event::FL_STANDALONE ? GTID_SKIP_STANDALONE
+ : GTID_SKIP_TRANSACTION);
+ }
}
}
@@ -2110,7 +2126,9 @@ err:
static int init_binlog_sender(binlog_send_info *info,
LOG_INFO *linfo,
const char *log_ident,
- my_off_t *pos)
+ my_off_t *pos,
+ rpl_gtid *start_gtids,
+ size_t n_start_gtids)
{
THD *thd= info->thd;
int error;
@@ -2130,7 +2148,8 @@ static int init_binlog_sender(binlog_send_info *info,
info->current_checksum_alg= get_binlog_checksum_value_at_connect(thd);
info->mariadb_slave_capability= get_mariadb_slave_capability(thd);
- info->using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state);
+ info->using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state) ||
+ start_gtids != NULL;
DBUG_EXECUTE_IF("simulate_non_gtid_aware_master",
info->using_gtid_state= false;);
@@ -2247,6 +2266,21 @@ static int init_binlog_sender(binlog_send_info *info,
info->clear_initial_log_pos= true;
}
+ if (start_gtids != NULL)
+ {
+ Domain_gtid_event_filter *filter= new Domain_gtid_event_filter();
+ my_off_t i;
+ for(i = 0; i < n_start_gtids; i++)
+ {
+ if (filter->add_start_gtid(&start_gtids[i]))
+ {
+ info->errmsg= "Domain id is invalid for GTID start position";
+ info->error= ER_INCORRECT_GTID_STATE;
+ }
+ }
+ info->gtid_event_filter= filter;
+ }
+
return 0;
}
@@ -2840,7 +2874,8 @@ static int send_one_binlog_file(binlog_send_info *info,
}
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
- ushort flags)
+ ushort flags, rpl_gtid *start_gtids,
+ uint32 n_start_gtids)
{
LOG_INFO linfo;
@@ -2860,7 +2895,8 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
bzero((char*) &log,sizeof(log));
- if (init_binlog_sender(info, &linfo, log_ident, &pos))
+ if (init_binlog_sender(info, &linfo, log_ident, &pos, start_gtids,
+ n_start_gtids))
goto err;
has_transmit_started= true;
@@ -3022,6 +3058,8 @@ err:
thd->reset_current_linfo();
thd->variables.max_allowed_packet= old_max_allowed_packet;
delete info->fdev;
+ delete info->gtid_event_filter;
+ my_free(start_gtids);
if (likely(info->error == 0))
{
diff --git a/sql/sql_repl.h b/sql/sql_repl.h
index 95916e31abf..bfc35ea5456 100644
--- a/sql/sql_repl.h
+++ b/sql/sql_repl.h
@@ -57,7 +57,8 @@ struct LOAD_FILE_IO_CACHE : public IO_CACHE
int log_loaded_block(IO_CACHE* file, uchar *Buffer, size_t Count);
int init_replication_sys_vars();
-void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags);
+void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags,
+ rpl_gtid *start_gtids, uint32 n_start_gtids);
#ifdef HAVE_PSI_INTERFACE
extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state;