summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBrandon Nesterenko <brandon.nesterenko@mariadb.com>2021-08-11 11:29:37 -0600
committerBrandon Nesterenko <brandon.nesterenko@mariadb.com>2021-11-01 11:01:34 -0600
commitf4319661120e34c84048bbba347bf21feb14a10d (patch)
tree33f00704c5dcc9f6fe640e646dc7dfe82fdf7d21
parentd6e3cd6f233730e168624441d4025de5ce83affd (diff)
downloadmariadb-git-f4319661120e34c84048bbba347bf21feb14a10d.tar.gz
MDEV-4989: Support for GTID in mysqlbinlog
New Feature: =========== This commit extends the mariadb-binlog capabilities to allow events to be filtered by GTID ranges. More specifically, the following capabilities are addressed: 1) GTIDs can be used to filter results on local binlog files 2) GTIDs can be used to filter results from remote servers 3) For a given GTID range, its start-position is exclusive and its stop-position is inclusive. This allows users to receive events strictly after those that they already have, and is useful in cases such as: 1) events were received out of order and should be re-sent, or 2) specifying the gtid state of a slave to get events newer than their current state. 4) After the events have been written, the session server id and domain id are reset to their former values To facilitate these features, the --start-position and --stop-position arguments have been extended to accept values formatted as a list of GTID positions, e.g. --start-position=0-1-0,1-2-55 Reviewed By: =========== Andrei Elkin: <andrei.elkin@mariadb.com>
-rw-r--r--client/mysqlbinlog.cc294
-rw-r--r--man/mysqlbinlog.119
-rw-r--r--mysql-test/suite/binlog/include/mysqlbinlog_gtid_window_test_cases.inc242
-rw-r--r--mysql-test/suite/binlog/t/binlog_mysqlbinlog_gtid_window-master.opt1
-rw-r--r--mysql-test/suite/binlog/t/binlog_mysqlbinlog_gtid_window.test124
-rw-r--r--sql/rpl_gtid.cc422
-rw-r--r--sql/rpl_gtid.h253
-rw-r--r--sql/sql_parse.cc36
-rw-r--r--sql/sql_repl.cc48
-rw-r--r--sql/sql_repl.h3
10 files changed, 1400 insertions, 42 deletions
diff --git a/client/mysqlbinlog.cc b/client/mysqlbinlog.cc
index 3d9b9712ec8..2988dcf22a9 100644
--- a/client/mysqlbinlog.cc
+++ b/client/mysqlbinlog.cc
@@ -143,10 +143,18 @@ static char *charset= 0;
static uint verbose= 0;
-static ulonglong start_position, stop_position;
+static char *start_pos_str, *stop_pos_str;
+static ulonglong start_position= BIN_LOG_HEADER_SIZE,
+ stop_position= (longlong)(~(my_off_t)0) ;
#define start_position_mot ((my_off_t)start_position)
#define stop_position_mot ((my_off_t)stop_position)
+static Domain_gtid_event_filter *domain_gtid_filter= NULL;
+static rpl_gtid *start_gtids, *stop_gtids;
+static my_bool is_event_group_active= FALSE;
+static uint32 n_start_gtid_ranges= 0;
+static uint32 n_stop_gtid_ranges= 0;
+
static char *start_datetime_str, *stop_datetime_str;
static my_time_t start_datetime= 0, stop_datetime= MY_TIME_T_MAX;
static ulonglong rec_count= 0;
@@ -981,6 +989,10 @@ static bool print_row_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
return result;
}
+static inline my_bool is_gtid_filtering_enabled()
+{
+ return domain_gtid_filter != NULL;
+}
/**
Print the given event, and either delete it or delegate the deletion
@@ -1019,11 +1031,29 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
#endif
/*
+ If the binlog output should be filtered using GTIDs, test the new event
+ group to see if its events should be written or ignored.
+ */
+ if (ev_type == GTID_EVENT && is_gtid_filtering_enabled())
+ {
+ Gtid_log_event *gle= (Gtid_log_event*) ev;
+ rpl_gtid gtid;
+ gtid.domain_id= gle->domain_id;
+ gtid.server_id= gle->server_id;
+ gtid.seq_no= gle->seq_no;
+ if (!domain_gtid_filter->exclude(&gtid))
+ is_event_group_active= TRUE;
+ else
+ is_event_group_active= FALSE;
+ }
+
+ /*
Format events are not concerned by --offset and such, we always need to
read them to be able to process the wanted events.
*/
if (((rec_count >= offset) &&
- (ev->when >= start_datetime)) ||
+ (ev->when >= start_datetime) &&
+ (!is_gtid_filtering_enabled() || is_event_group_active)) ||
(ev_type == FORMAT_DESCRIPTION_EVENT))
{
if (ev_type != FORMAT_DESCRIPTION_EVENT)
@@ -1500,6 +1530,17 @@ end:
}
}
+ /*
+ Xid_log_events or commit Query_log_events mark the end of a GTID event
+ group.
+ */
+ if ((ev_type == XID_EVENT ||
+ (ev_type == QUERY_EVENT && ((Query_log_event *) ev)->is_commit())) &&
+ is_event_group_active)
+ {
+ is_event_group_active= FALSE;
+ }
+
if (destroy_evt) /* destroy it later if not set (ignored table map) */
delete ev;
}
@@ -1658,15 +1699,15 @@ static struct my_option my_options[] =
&start_datetime_str, &start_datetime_str,
0, GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"start-position", 'j',
- "Start reading the binlog at position N. Applies to the first binlog "
- "passed on the command line.",
- &start_position, &start_position, 0, GET_ULL,
- REQUIRED_ARG, BIN_LOG_HEADER_SIZE, BIN_LOG_HEADER_SIZE,
- /*
- COM_BINLOG_DUMP accepts only 4 bytes for the position
- so remote log reading has lower limit.
- */
- (ulonglong)(0xffffffffffffffffULL), 0, 0, 0},
+ "Start reading the binlog at this position. Type can either be a positive "
+ "integer or a GTID list. When using a positive integer, the value only "
+ "applies to the first binlog passed on the command line. In GTID mode, "
+ "multiple GTIDs can be passed as a comma separated list, where each must "
+ "have a unique domain id. Each GTID is exclusive; only events after a "
+ "given sequence number will be printed to allow users to receive events "
+ "after their current state, e.g. on a slave.",
+ &start_pos_str, &start_pos_str, 0, GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0,
+ 0, 0, 0},
{"stop-datetime", OPT_STOP_DATETIME,
"Stop reading the binlog at first event having a datetime equal or "
"posterior to the argument; the argument must be a date and time "
@@ -1684,11 +1725,14 @@ static struct my_option my_options[] =
&opt_stop_never_slave_server_id, &opt_stop_never_slave_server_id, 0,
GET_ULONG, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"stop-position", OPT_STOP_POSITION,
- "Stop reading the binlog at position N. Applies to the last binlog "
- "passed on the command line.",
- &stop_position, &stop_position, 0, GET_ULL,
- REQUIRED_ARG, (longlong)(~(my_off_t)0), BIN_LOG_HEADER_SIZE,
- (ulonglong)(~(my_off_t)0), 0, 0, 0},
+ "Stop reading the binlog at this position. Type can either be a positive "
+ "integer or a GTID list. When using a positive integer, the value only "
+ "applies to the last binlog passed on the command line. In GTID mode, "
+ "multiple GTIDs can be passed as a comma separated list, where each must "
+ "have a unique domain id. Each GTID is inclusive; only events up to the "
+ "given sequence numbers are printed.",
+ &stop_pos_str, &stop_pos_str, 0, GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0,
+ 0, 0},
{"table", 'T', "List entries for just this table (local log only).",
&table, &table, 0, GET_STR_ALLOC, REQUIRED_ARG,
0, 0, 0, 0, 0, 0},
@@ -1824,8 +1868,14 @@ static void cleanup()
my_free(const_cast<char*>(dirname_for_local_load));
my_free(start_datetime_str);
my_free(stop_datetime_str);
+ my_free(start_pos_str);
+ my_free(stop_pos_str);
+ my_free(start_gtids);
+ my_free(stop_gtids);
free_root(&glob_root, MYF(0));
+ delete domain_gtid_filter;
+
delete binlog_filter;
delete glob_description_event;
if (mysql)
@@ -2075,6 +2125,103 @@ get_one_option(const struct my_option *opt, const char *argument, const char *fi
print_version();
opt_version= 1;
break;
+ case OPT_STOP_POSITION:
+ {
+ stop_gtids= gtid_parse_string_to_list(stop_pos_str, strlen(stop_pos_str),
+ &n_stop_gtid_ranges);
+ if (stop_gtids == NULL)
+ {
+ int err= 0;
+ char *end_ptr= NULL;
+ /*
+ No GTIDs specified in OPT_STOP_POSITION specification. Treat the value
+ as a singular index.
+ */
+ stop_position= my_strtoll10(stop_pos_str, &end_ptr, &err);
+
+ if (err || *end_ptr)
+ {
+ // Can't parse the position from the user
+ sql_print_error("Stop position argument value is invalid. Should be "
+ "either a positive integer or GTID.");
+ return 1;
+ }
+ }
+ else if (n_stop_gtid_ranges > 0)
+ {
+ uint32 gtid_idx;
+
+ if (domain_gtid_filter == NULL)
+ domain_gtid_filter= new Domain_gtid_event_filter();
+
+ for (gtid_idx = 0; gtid_idx < n_stop_gtid_ranges; gtid_idx++)
+ {
+ rpl_gtid *stop_gtid= &stop_gtids[gtid_idx];
+ if (domain_gtid_filter->add_stop_gtid(stop_gtid))
+ {
+ sql_print_error(
+ "Domain id is invalid for GTID stop position %u-%u-%llu",
+ stop_gtid->domain_id, stop_gtid->server_id,
+ stop_gtid->seq_no);
+ return 1;
+ }
+ }
+ }
+ else
+ {
+ DBUG_ASSERT(0);
+ }
+ break;
+ }
+ case 'j':
+ {
+ start_gtids= gtid_parse_string_to_list(
+ start_pos_str, strlen(start_pos_str), &n_start_gtid_ranges);
+
+ if (start_gtids == NULL)
+ {
+ int err= 0;
+ char *end_ptr= NULL;
+ /*
+ No GTIDs specified in OPT_START_POSITION specification. Treat the value
+ as a singular index.
+ */
+ start_position= my_strtoll10(start_pos_str, &end_ptr, &err);
+
+ if (err || *end_ptr)
+ {
+ // Can't parse the position from the user
+ sql_print_error("Start position argument value is invalid. Should be "
+ "either a positive integer or GTID.");
+ return 1;
+ }
+ }
+ else if (n_start_gtid_ranges > 0)
+ {
+ uint32 gtid_idx;
+
+ if (domain_gtid_filter == NULL)
+ domain_gtid_filter= new Domain_gtid_event_filter();
+
+ for (gtid_idx = 0; gtid_idx < n_start_gtid_ranges; gtid_idx++)
+ {
+ rpl_gtid *start_gtid= &start_gtids[gtid_idx];
+ if (domain_gtid_filter->add_start_gtid(start_gtid))
+ {
+ sql_print_error(
+ "Domain id is invalid for GTID start position %u-%u-%llu",
+ start_gtid->domain_id, start_gtid->server_id,
+ start_gtid->seq_no);
+ return 1;
+ }
+ }
+ }
+ else
+ {
+ DBUG_ASSERT(0);
+ }
+ break;
+ }
case '?':
usage();
opt_version= 1;
@@ -2237,11 +2384,16 @@ static Exit_status dump_log_entries(const char* logname)
@retval ERROR_STOP An error occurred - the program should terminate.
@retval OK_CONTINUE No error, the program should continue.
*/
-static Exit_status check_master_version()
+static Exit_status check_master_version(uint *major, uint *minor, uint *patch,
+ my_bool *version_extracted)
{
MYSQL_RES* res = 0;
MYSQL_ROW row;
uint version;
+ size_t version_iter;
+
+ // Assume no problems extracting the master version
+ *version_extracted= TRUE;
if (mysql_query(mysql, "SELECT VERSION()") ||
!(res = mysql_store_result(mysql)))
@@ -2257,12 +2409,52 @@ static Exit_status check_master_version()
goto err;
}
- if (!(version = atoi(row[0])))
+ if (!(*major = atoi(row[0])))
{
error("Could not find server version: "
"Master reported NULL for the version.");
goto err;
}
+
+ /* Try to save the minor version, if supplied with a storage location */
+ if (minor != NULL || patch != NULL)
+ {
+ for (version_iter= 0; version_iter < strlen((const char *) row);
+ version_iter++)
+ {
+ if (row[0][version_iter] == '.')
+ {
+ version_iter= version_iter + 1;
+ break;
+ }
+ }
+ if (minor != NULL && !(*minor= atoi(&row[0][version_iter])))
+ {
+ warning("Could not extract minor version from MariaDB master version %s", row[0]);
+ *version_extracted= FALSE;
+ }
+ }
+
+ /* Try to save the patch version, if supplied with a storage location */
+ if (patch != NULL)
+ {
+ for (; version_iter < strlen((const char *) row); version_iter++)
+ {
+ if (row[0][version_iter] == '.')
+ {
+ version_iter= version_iter + 1;
+ break;
+ }
+ }
+ if (!(*patch= atoi(&row[0][version_iter])))
+ {
+ warning("Could not extract patch version from MariaDB master version %s", row[0]);
+ *version_extracted= FALSE;
+ }
+ }
+
+ version= *major;
+
/*
Make a notice to the server that this client
is checksum-aware. It does not need the first fake Rotate
@@ -2606,21 +2798,63 @@ static Exit_status dump_remote_log_entries(PRINT_EVENT_INFO *print_event_info,
DBUG_RETURN(retval);
net= &mysql->net;
- if ((retval= check_master_version()) != OK_CONTINUE)
+ uint major_version, minor_version;
+ my_bool was_version_extracted= TRUE; // Assume true to start
+ if ((retval= check_master_version(&major_version, &minor_version, NULL,
+ &was_version_extracted)) != OK_CONTINUE)
DBUG_RETURN(retval);
/*
COM_BINLOG_DUMP accepts only 4 bytes for the position, so we are forced to
cast to uint32.
*/
- DBUG_ASSERT(start_position <= UINT_MAX32);
- int4store(buf, (uint32)start_position);
+ size_t buf_idx= 0;
+ if (is_gtid_filtering_enabled() && n_start_gtid_ranges > 0)
+ {
+ if (was_version_extracted &&
+ (major_version < 10 || (major_version == 10 && minor_version < 7)))
+ {
+ error("Master does not support GTID filtering. This feature was added "
+ "in MariaDB version 10.7.");
+ DBUG_RETURN(ERROR_STOP);
+ }
+ else if (!was_version_extracted)
+ {
+ warning("Could not extract complete MariaDB version from master. Trying "
+ "to use GTID filtering, but it is not guaranteed that the "
+ "master supports it (must be 10.7+).");
+ }
+
+ size_t i;
+ for (i = 0; i < n_start_gtid_ranges; i++)
+ {
+ if (i > 0)
+ {
+ buf[buf_idx]= ',';
+ buf_idx += 1;
+ }
+
+ int4store(buf + buf_idx, start_gtids[i].domain_id);
+ buf[buf_idx+4]= '-';
+ int4store(buf + buf_idx + 5, start_gtids[i].server_id);
+ buf[buf_idx+9]= '-';
+ int8store(buf + buf_idx + 10, start_gtids[i].seq_no);
+ buf_idx += 18;
+ }
+ }
+ else
+ {
+ DBUG_ASSERT(start_position <= UINT_MAX32);
+ int4store(buf, (uint32) start_position);
+ buf_idx= BIN_LOG_HEADER_SIZE;
+ }
if (!opt_skip_annotate_row_events)
binlog_flags|= BINLOG_SEND_ANNOTATE_ROWS_EVENT;
if (!opt_stop_never)
binlog_flags|= BINLOG_DUMP_NON_BLOCK;
- int2store(buf + BIN_LOG_HEADER_SIZE, binlog_flags);
+ int2store(buf + buf_idx, binlog_flags);
+ buf_idx += 2;
size_t tlen = strlen(logname);
if (tlen > sizeof(buf) - 10)
@@ -2637,9 +2871,10 @@ static Exit_status dump_remote_log_entries(PRINT_EVENT_INFO *print_event_info,
}
else
slave_id= 0;
- int4store(buf + 6, slave_id);
- memcpy(buf + 10, logname, logname_len);
- if (simple_command(mysql, COM_BINLOG_DUMP, buf, logname_len + 10, 1))
+ int4store(buf + buf_idx, slave_id);
+ buf_idx += 4;
+ memcpy(buf + buf_idx, logname, logname_len);
+ if (simple_command(mysql, COM_BINLOG_DUMP, buf, logname_len + buf_idx, 1))
{
error("Got fatal error sending the log dump command.");
DBUG_RETURN(ERROR_STOP);
@@ -3210,6 +3445,14 @@ int main(int argc, char** argv)
"/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;\n");
fprintf(result_file, "/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=0*/;\n");
+
+ if (is_gtid_filtering_enabled())
+ {
+ fprintf(result_file,
+ "/*!100001 SET @@SESSION.SERVER_ID=@@GLOBAL.SERVER_ID */;\n"
+ "/*!100001 SET @@SESSION.GTID_DOMAIN_ID=@@GLOBAL.GTID_DOMAIN_ID "
+ "*/;\n");
+ }
}
if (tmpdir.list)
@@ -3271,3 +3514,4 @@ struct encryption_service_st encryption_handler=
#include "sql_list.cc"
#include "rpl_filter.cc"
#include "compat56.cc"
+#include "rpl_gtid.cc" \ No newline at end of file
diff --git a/man/mysqlbinlog.1 b/man/mysqlbinlog.1
index 633300bb7c5..1c7f013af7b 100644
--- a/man/mysqlbinlog.1
+++ b/man/mysqlbinlog.1
@@ -992,8 +992,14 @@ This option is useful for point\-in\-time recovery\&.
\fB\-\-start\-position=\fR\fB\fIN\fR\fR,
\fB\-j \fR\fB\fIN\fR\fR
.sp
-Start reading the binary log at the first event having a position equal to or greater than
-\fIN\fR\&. This option applies to the first log file named on the command line\&.
+Start reading the binary log at \fIN\fR\&. Type can either be a positive
+integer or a GTID\& list\&. When using a positive integer, the value only
+applies to the first binlog passed on the command line, and the first event
+that has a position equal to or greater than \fIN\fR is printed\&. In GTID mode,
+multiple GTIDs can be passed as a comma separated list, where each must have a
+unique domain id\&. Each GTID is exclusive; only events after a given sequence
+number will be printed to allow users to receive events after their current
+state, e.g. on a slave\&.
.sp
This option is useful for point\-in\-time recovery\&.
.RE
@@ -1063,8 +1069,13 @@ The slave server_id used for \fB--read-from-remote-server --stop-never\fR\&.
.\" stop-position option: mysqlbinlog
\fB\-\-stop\-position=\fR\fB\fIN\fR\fR
.sp
-Stop reading the binary log at the first event having a position equal to or greater than
-\fIN\fR\&. This option applies to the last log file named on the command line\&.
+Stop reading the binary log at the first event having a position equal to or
+greater than \fIN\fR\&. Type can either be a positive integer or a GTID
+list\&. When using a positive integer, the value only applies to the last log
+file named on the command line\&. When in GTID mode, multiple GTIDs can be
+passed as a comma separated list, where each must have a unique domain id\&.
+Each GTID is inclusive; only events up to the given sequence numbers are
+printed.
.sp
This option is useful for point\-in\-time recovery\&.
.RE
diff --git a/mysql-test/suite/binlog/include/mysqlbinlog_gtid_window_test_cases.inc b/mysql-test/suite/binlog/include/mysqlbinlog_gtid_window_test_cases.inc
new file mode 100644
index 00000000000..29defc45091
--- /dev/null
+++ b/mysql-test/suite/binlog/include/mysqlbinlog_gtid_window_test_cases.inc
@@ -0,0 +1,242 @@
+#
+# This file runs test cases for providing GTIDs to --start-position and
+# --stop-position arguments in mariadb-binlog
+#
+# param $is_remote boolean (0 for false, 1 for true) to perform a local file
+# or remote host analysis
+#
+
+--let MYSQLD_DATADIR=`select @@datadir`
+--let OUT_FILE=$MYSQLTEST_VARDIR/tmp/binlog.out
+--let BINLOG_FILE=master-bin.000001
+--let data_inconsistent_err= "table data is inconsistent after replaying binlog using GTID start/stop positions";
+
+if ($is_remote == 0)
+{
+ --let BINLOG_FILE_PARAM=$MYSQLD_DATADIR/$BINLOG_FILE.orig
+}
+if ($is_remote == 1)
+{
+ --let BINLOG_FILE_PARAM=--read-from-remote-server $BINLOG_FILE
+}
+
+## Initialize test data
+#
+SET @@session.gtid_domain_id= 0;
+SET @@session.server_id= 1;
+CREATE TABLE t1 (a int);
+INSERT INTO t1 values (1), (2);
+--let test2_t1_good_checksum= `CHECKSUM TABLE t1`
+
+SET @@session.gtid_domain_id= 1;
+SET @@session.server_id= 2;
+CREATE TABLE t2 (a int);
+INSERT INTO t2 values (1);
+
+SET @@session.gtid_domain_id= 0;
+SET @@session.server_id= 1;
+INSERT INTO t1 values (3), (4);
+--let t1_final_checksum= `CHECKSUM TABLE t1`
+
+SET @@session.gtid_domain_id= 1;
+SET @@session.server_id= 2;
+INSERT INTO t2 values (2);
+--let test4_t2_good_checksum= `CHECKSUM TABLE t2`
+
+SET @@session.server_id= 3;
+INSERT INTO t2 values (3);
+--let test3_t2_good_checksum= `CHECKSUM TABLE t2`
+
+SET @@session.server_id= 2;
+INSERT INTO t2 values (4);
+--let t2_final_checksum= `CHECKSUM TABLE t2`
+
+FLUSH LOGS;
+if ($is_remote == 0)
+{
+ --copy_file $MYSQLD_DATADIR/master-bin.000001 $BINLOG_FILE_PARAM
+}
+
+DROP TABLE t1;
+DROP TABLE t2;
+
+
+--echo #
+--echo # Test Case 1:
+--echo # The end of the binlog file resets the server and domain id of the
+--echo # session
+
+# As gtid_domain_id and server_id should not change after reading binlog in GTID
+# mode, change variables to otherwise-unused values to ensure they remain
+--let $reset_gtid_domain_id = `select @@session.gtid_domain_id`
+--let $reset_server_id = `select @@session.server_id`
+SET @@session.gtid_domain_id= 10;
+SET @@session.server_id= 20;
+
+# Replay the binlog
+--echo # MYSQL_BINLOG BINLOG_FILE_PARAM --start-position=0-1-0 --stop-position=0-1-2 | MYSQL
+--exec $MYSQL_BINLOG $BINLOG_FILE_PARAM --start-position=0-1-0 --stop-position=0-1-2 | $MYSQL
+--let $test_gtid_domain_id = `select @@session.gtid_domain_id`
+
+# Ensure variables haven't changed
+--let $assert_text = session gtid_domain_id should not change when reading binlog in GTID mode
+--let $assert_cond = @@session.gtid_domain_id = 10
+--source include/assert.inc
+--let $assert_text = session server_id should not change when reading binlog in GTID mode
+--let $assert_cond = @@session.server_id = 20
+--source include/assert.inc
+
+# Reset back to previous state
+--eval SET @@session.gtid_domain_id= $reset_gtid_domain_id
+--eval SET @@session.server_id= $reset_server_id
+DROP TABLE t1;
+DROP TABLE t2;
+
+
+--echo #
+--echo # Test Case 2:
+--echo # Single GTID range specified
+--echo # MYSQL_BINLOG BINLOG_FILE_PARAM --start-position=0-1-0 --stop-position=0-1-2 | MYSQL
+--exec $MYSQL_BINLOG $BINLOG_FILE_PARAM --start-position=0-1-0 --stop-position=0-1-2 | $MYSQL
+
+if ($test2_t1_good_checksum != `CHECKSUM TABLE t1`)
+{
+ die $data_inconsistent_err;
+}
+if ($t2_final_checksum != `CHECKSUM TABLE t2`)
+{
+ die $data_inconsistent_err;
+}
+DROP TABLE t1;
+DROP TABLE t2;
+
+
+--echo #
+--echo # Test Case 3:
+--echo # Single GTID range with different server_ids
+--echo # MYSQL_BINLOG BINLOG_FILE_PARAM --start-position=1-2-0 --stop-position=1-3-4 | MYSQL
+--exec $MYSQL_BINLOG $BINLOG_FILE_PARAM --start-position=1-2-0 --stop-position=1-3-4 | $MYSQL
+
+if ($test3_t2_good_checksum != `CHECKSUM TABLE t2`)
+{
+ die $data_inconsistent_err;
+}
+if ($t1_final_checksum != `CHECKSUM TABLE t1`)
+{
+ die $data_inconsistent_err;
+}
+DROP TABLE t1;
+DROP TABLE t2;
+
+
+--echo #
+--echo # Test Case 4:
+--echo # Multiple GTID ranges specified
+--echo # MYSQL_BINLOG BINLOG_FILE_PARAM --start-position=0-1-0,1-2-0 --stop-position=0-1-3,1-2-3 | MYSQL
+--exec $MYSQL_BINLOG $BINLOG_FILE_PARAM --start-position=0-1-0,1-2-0 --stop-position=0-1-3,1-2-3 | $MYSQL
+
+# Reuse checksum spot from test 4
+if ($t1_final_checksum != `CHECKSUM TABLE t1`)
+{
+ die $data_inconsistent_err;
+}
+if ($test4_t2_good_checksum != `CHECKSUM TABLE t2`)
+{
+ die $data_inconsistent_err;
+}
+DROP TABLE t1;
+DROP TABLE t2;
+
+
+--echo #
+--echo # Test Case 5:
+--echo # Multiple GTID ranges specified where the domain ids are listed in
+--echo # different orders between start/stop position
+--echo # MYSQL_BINLOG BINLOG_FILE_PARAM --stop-position=0-1-3,1-2-3 --start-position=1-2-0,0-1-0 | MYSQL
+--exec $MYSQL_BINLOG $BINLOG_FILE_PARAM --stop-position=0-1-3,1-2-3 --start-position=1-2-0,0-1-0 | $MYSQL
+
+# Reuse checksum spot from test 4
+if ($t1_final_checksum != `CHECKSUM TABLE t1`)
+{
+ die $data_inconsistent_err;
+}
+if ($test4_t2_good_checksum != `CHECKSUM TABLE t2`)
+{
+ die $data_inconsistent_err;
+}
+DROP TABLE t1;
+DROP TABLE t2;
+
+
+--echo #
+--echo # Test Case 6:
+--echo # Only start position specified
+CREATE TABLE t1 (a int);
+INSERT INTO t1 values (3), (4);
+--let test6_t1_good_checksum= `CHECKSUM TABLE t1`
+DROP TABLE t1;
+CREATE TABLE t1 (a int);
+--echo # MYSQL_BINLOG BINLOG_FILE_PARAM --start-position=0-1-2 | MYSQL
+--exec $MYSQL_BINLOG $BINLOG_FILE_PARAM --start-position=0-1-2 | $MYSQL
+if ($test6_t1_good_checksum != `CHECKSUM TABLE t1`)
+{
+ die $data_inconsistent_err;
+}
+if ($t2_final_checksum != `CHECKSUM TABLE t2`)
+{
+ die $data_inconsistent_err;
+}
+DROP TABLE t1;
+DROP TABLE t2;
+
+
+--echo #
+--echo # Test Case 7:
+--echo # Only stop position specified
+--echo # MYSQL_BINLOG BINLOG_FILE_PARAM --stop-position=0-1-2 | MYSQL
+--exec $MYSQL_BINLOG $BINLOG_FILE_PARAM --stop-position=0-1-2 | $MYSQL
+
+# Reuse checksum spot from test 2
+if ($test2_t1_good_checksum != `CHECKSUM TABLE t1`)
+{
+ die $data_inconsistent_err;
+}
+if ($t2_final_checksum != `CHECKSUM TABLE t2`)
+{
+ die $data_inconsistent_err;
+}
+DROP TABLE t1;
+DROP TABLE t2;
+
+
+--echo #
+--echo # Test Case 8:
+--echo # Seq_no=0 in --start-position includes all events for a domain
+--echo # MYSQL_BINLOG BINLOG_FILE_PARAM --start-position=0-1-0,1-2-0 | MYSQL
+--exec $MYSQL_BINLOG $BINLOG_FILE_PARAM --start-position=0-1-0,1-2-0 | $MYSQL
+if ($t1_final_checksum != `CHECKSUM TABLE t1`)
+{
+ die "t1 data should be complete as binlog replay should include domain 0 entirely in results";
+}
+if ($t2_final_checksum != `CHECKSUM TABLE t2`)
+{
+ die "t2 data should be complete as binlog replay should include domain 1 entirely in results";
+}
+DROP TABLE t1;
+DROP TABLE t2;
+
+--echo #
+--echo # Test Case 9:
+--echo # Seq_no=0 in --stop-position excludes all events for a domain
+--echo # MYSQL_BINLOG BINLOG_FILE_PARAM --stop-position=0-1-0,1-2-0 | MYSQL
+--exec $MYSQL_BINLOG $BINLOG_FILE_PARAM --stop-position=0-1-0,1-2-0 | $MYSQL
+if (0 < `SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'test' AND table_name = 't1'`)
+{
+ die "t1 should not exist as binlog replay should exclude domain 0 from results";
+}
+if (0 < `SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'test' AND table_name = 't2'`)
+{
+ die "t2 should not exist as binlog replay should exclude domain 1 from results";
+}
+DROP TABLE t1;
+DROP TABLE t2; \ No newline at end of file
diff --git a/mysql-test/suite/binlog/t/binlog_mysqlbinlog_gtid_window-master.opt b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_gtid_window-master.opt
new file mode 100644
index 00000000000..d17999c07c1
--- /dev/null
+++ b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_gtid_window-master.opt
@@ -0,0 +1 @@
+--timezone=GMT-8 \ No newline at end of file
diff --git a/mysql-test/suite/binlog/t/binlog_mysqlbinlog_gtid_window.test b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_gtid_window.test
new file mode 100644
index 00000000000..0bc83927123
--- /dev/null
+++ b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_gtid_window.test
@@ -0,0 +1,124 @@
+#
+# Purpose:
+#
+# This test ensures that the mariadb-binlog CLI tool can filter log events
+# using GTID ranges. More specifically, this test ensures the following
+# capabilities:
+# 1) GTIDs can be used to filter results on local binlog files
+# 2) GTIDs can be used to filter results from remote servers
+# 3) For a given GTID range, its start-position is exclusive and its
+# stop-position is inclusive. This allows users to receive events strictly
+# after those that they already have, and is useful in cases such as: 1)
+# events were received out of order and should be re-sent, or 2)
+# specifying the gtid state of a slave to get events newer than their
+# current state.
+# 4) After the events have been written, the session server id and domain id
+# are reset to their former values
+#
+#
+# Methodology:
+#
+# This test validates the expected capabilities using the following test cases
+# on both a local binlog file and remote server for all binlog formats.
+# Test Case 1) The end of the binlog file resets the server and domain id of
+# the session
+# Test Case 2) Single GTID range specified
+# Test Case 3) Single GTID range with different server_ids
+# Test Case 4) Multiple GTID ranges specified
+# Test Case 5) Multiple GTID ranges specified where the domain ids are
+# listed in different orders between start/stop position
+# Test Case 6) Only start position specified
+# Test Case 7) Only stop position specified
+# Test Case 8) Seq_no=0 in --start-position includes all events for a domain
+# Test Case 9) Seq_no=0 in --stop-position excludes all events for a domain
+#
+# To validate for data consistency, each test case compares a checksum of
+# correct data against a variant created after replaying the binlog using
+# --(start|stop)-position. If the checksums are identical, the test passes.
+# If the checksums differ, data has been changed and the test fails.
+#
+# Additionally, this test validates the following error scenarios:
+# Error Case 1) User provides invalid positions
+# Error Case 2) User provides GTID ranges with repeated domain ids
+#
+#
+# References:
+# MDEV-4989: Support for GTID in mysqlbinlog
+#
+
+--source include/have_log_bin.inc
+
+--echo ###############################
+--echo # Test Setup
+--echo ###############################
+
+## Save old state
+#
+let orig_gtid_domain_id = `select @@session.gtid_domain_id`;
+let orig_server_id = `select @@session.server_id`;
+
+DROP TABLE IF EXISTS t1;
+DROP TABLE IF EXISTS t2;
+RESET MASTER;
+
+
+--echo ######################################
+--echo # Test Group 1
+--echo # Run test cases on local log file
+--echo ######################################
+--let is_remote= 0
+--source include/mysqlbinlog_gtid_window_test_cases.inc
+
+
+--echo ######################################
+--echo # Test Group 2
+--echo # Run test cases on remote host
+--echo ######################################
+--let is_remote= 1
+--source include/mysqlbinlog_gtid_window_test_cases.inc
+
+
+--echo ##############################
+--echo # Error Cases
+--echo ##############################
+
+--let $MYSQLD_DATADIR=`select @@datadir`
+--echo #
+--echo # Error Case 1:
+--echo # User provides invalid positions
+--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=z --base64-output=NEVER
+--error 1
+--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --start-position=z --base64-output=NEVER
+
+--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=1- --base64-output=NEVER
+--error 1
+--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --start-position=1- --base64-output=NEVER
+
+--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=1-2 --base64-output=NEVER
+--error 1
+--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --start-position=1-2 --base64-output=NEVER
+
+--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=1-2- --base64-output=NEVER
+--error 1
+--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --start-position=1-2- --base64-output=NEVER
+
+--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=-1 --base64-output=NEVER
+--error 1
+--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --start-position=-1 --base64-output=NEVER
+
+--echo #
+--echo # Error Case 2:
+--echo # User provides GTID ranges with repeated domain ids
+--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=0-1-1,0-1-8 --stop-position=0-1-4,0-1-12 --base64-output=NEVER
+--error 1
+--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --start-position=0-1-1,0-1-8 --stop-position=0-1-4,0-1-12 --base64-output=NEVER
+
+
+--echo ##############################
+--echo # Cleanup
+--echo ##############################
+DROP TABLE IF EXISTS t1;
+DROP TABLE IF EXISTS t2;
+--eval SET @@global.gtid_domain_id= $orig_gtid_domain_id
+--eval SET @@global.server_id= $orig_server_id
+
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index 306ae878060..629be5165bf 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -17,6 +17,7 @@
/* Definitions for MariaDB global transaction ID (GTID). */
+#ifndef MYSQL_CLIENT
#include "mariadb.h"
#include "sql_priv.h"
#include "unireg.h"
@@ -1273,6 +1274,7 @@ rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid)
return true;
}
+#endif
/*
Parse a GTID at the start of a string, and update the pointer to point
@@ -1310,9 +1312,32 @@ gtid_parser_helper(const char **ptr, const char *end, rpl_gtid *out_gtid)
return 0;
}
+/*
+ Unpack a GTID at the start of a string, and update the pointer to point
+ at the first character after the unpacked GTID.
+
+ Returns 0 on ok, non-zero on parse error.
+*/
+static int
+gtid_unpack_helper(const char **ptr, const char *end, rpl_gtid *out_gtid)
+{
+ const char *p= *ptr;
+
+ if (p[4] != '-' || p[9] != '-')
+ return 1;
+
+ out_gtid->domain_id= (uint32)uint4korr(p);
+ out_gtid->server_id= (uint32)uint4korr(&p[5]);
+ out_gtid->seq_no= (uint64)uint8korr(&p[10]);
+
+ *ptr= p + 18;
+ return 0;
+}
rpl_gtid *
-gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len)
+gtid_read_to_list(const char *str, size_t str_len, uint32 *out_len,
+ int reader_f(const char **ptr, const char *end,
+ rpl_gtid *out_gtid))
{
const char *p= const_cast<char *>(str);
const char *end= p + str_len;
@@ -1323,7 +1348,7 @@ gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len)
{
rpl_gtid gtid;
- if (len >= (((uint32)1 << 28)-1) || gtid_parser_helper(&p, end, &gtid))
+ if (len >= (((uint32)1 << 28)-1) || reader_f(&p, end, &gtid))
{
my_free(list);
return NULL;
@@ -1350,6 +1375,20 @@ gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len)
}
+rpl_gtid *
+gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len)
+{
+ return gtid_read_to_list(str, str_len, out_len, gtid_parser_helper);
+}
+
+rpl_gtid *
+gtid_unpack_string_to_list(const char *str, size_t str_len, uint32 *out_len)
+{
+ return gtid_read_to_list(str, str_len, out_len, gtid_unpack_helper);
+}
+
+#ifndef MYSQL_CLIENT
+
/*
Update the slave replication state with the GTID position obtained from
master when connecting with old-style (filename,offset) position.
@@ -2974,3 +3013,382 @@ gtid_waiting::remove_from_wait_queue(gtid_waiting::hash_element *he,
queue_remove(&he->queue, elem->queue_idx);
}
+
+#endif
+
+Window_gtid_event_filter::Window_gtid_event_filter() :
+ m_has_start(FALSE),
+ m_has_stop(FALSE),
+ m_is_active(FALSE),
+ m_has_passed(FALSE)
+ {
+ // m_start and m_stop do not need initial values if unused
+ }
+
+Window_gtid_event_filter::Window_gtid_event_filter(rpl_gtid *start, rpl_gtid *stop) :
+ m_is_active(FALSE),
+ m_has_passed(FALSE)
+{
+ DBUG_ASSERT(start->domain_id == stop->domain_id);
+
+ m_is_active= FALSE;
+ m_has_passed= FALSE;
+
+ m_has_start= TRUE;
+ m_start.domain_id= start->domain_id;
+ m_start.server_id= start->server_id;
+ m_start.seq_no= start->seq_no;
+
+ m_has_stop= TRUE;
+ m_stop.domain_id= stop->domain_id;
+ m_stop.server_id= stop->server_id;
+ m_stop.seq_no= stop->seq_no;
+}
+
+int Window_gtid_event_filter::set_start_gtid(rpl_gtid *start)
+{
+ int err= 0;
+ if (m_has_start)
+ {
+ err= 1;
+ goto err;
+ }
+
+ if (m_has_stop && m_stop.domain_id != start->domain_id)
+ {
+ err= 1;
+ goto err;
+ }
+
+ // Copy values
+ m_has_start= TRUE;
+ m_start.domain_id= start->domain_id;
+ m_start.server_id= start->server_id;
+ m_start.seq_no= start->seq_no;
+
+err:
+ return err;
+}
+
+int Window_gtid_event_filter::set_stop_gtid(rpl_gtid *stop)
+{
+ int err= 0;
+ if (m_has_stop)
+ {
+ err= 1;
+ goto err;
+ }
+
+ if (m_has_start && m_start.domain_id != stop->domain_id)
+ {
+ err= 1;
+ goto err;
+ }
+
+ // Copy values
+ m_has_stop= TRUE;
+ m_stop.domain_id= stop->domain_id;
+ m_stop.server_id= stop->server_id;
+ m_stop.seq_no= stop->seq_no;
+
+err:
+ return err;
+}
+
+static inline my_bool is_gtid_at_or_after(rpl_gtid *boundary,
+ rpl_gtid *test_gtid)
+{
+ return test_gtid->domain_id == boundary->domain_id &&
+ test_gtid->server_id == boundary->server_id &&
+ test_gtid->seq_no >= boundary->seq_no;
+}
+
+static inline my_bool is_gtid_before(rpl_gtid *boundary,
+ rpl_gtid *test_gtid)
+{
+ return test_gtid->domain_id == boundary->domain_id &&
+ test_gtid->server_id == boundary->server_id &&
+ test_gtid->seq_no <= boundary->seq_no;
+}
+
+my_bool Window_gtid_event_filter::exclude(rpl_gtid *gtid)
+{
+ /* Assume result should be excluded to start */
+ my_bool should_exclude= TRUE;
+
+ DBUG_ASSERT((m_has_start || m_has_stop) &&
+ (gtid->domain_id == m_start.domain_id ||
+ gtid->domain_id == m_stop.domain_id));
+
+ if (!m_is_active && !m_has_passed)
+ {
+ /*
+ This filter has not yet been activated. Check if the gtid is within the
+ bounds of this window.
+ */
+
+ if (!m_has_start)
+ {
+ /*
+ Start GTID was not provided, so we want to include everything up to m_stop
+ */
+ m_is_active= TRUE;
+ should_exclude= FALSE;
+ }
+ else if (is_gtid_at_or_after(&m_start, gtid))
+ {
+ m_is_active= TRUE;
+
+ DBUG_PRINT("gtid-event-filter",
+ ("Window: Begin (%d-%d-%d, %d-%d-%llu]", m_start.domain_id,
+ m_start.server_id, m_start.seq_no, m_stop.domain_id,
+ m_stop.server_id, m_stop.seq_no));
+
+ /*
+ As the start of the range is exclusive, if this gtid is the start of
+ the range, exclude it
+ */
+ if (gtid->seq_no == m_start.seq_no &&
+ gtid->server_id == m_start.server_id)
+ should_exclude= TRUE;
+ else
+ should_exclude= FALSE;
+ }
+ } /* if (!m_is_active && !m_has_passed) */
+ else if (m_is_active && !m_has_passed)
+ {
+ /*
+ This window is currently active so we want the event group to be included
+ in the results. Additionally check if we are at the end of the window.
+ If no end of the window is provided, go indefinitely
+ */
+ should_exclude= FALSE;
+
+ if (m_has_stop && is_gtid_at_or_after(&m_stop, gtid))
+ {
+ DBUG_PRINT("gtid-event-filter",
+ ("Window: End (%d-%d-%d, %d-%d-%llu]", m_start.domain_id,
+ m_start.server_id, m_start.seq_no, m_stop.domain_id,
+ m_stop.server_id, m_stop.seq_no));
+ m_is_active= FALSE;
+ m_has_passed= TRUE;
+
+ if (gtid->server_id == m_stop.server_id && gtid->seq_no > m_stop.seq_no)
+ {
+ /*
+ The GTID is after the finite stop of the window, don't let it pass
+ through
+ */
+ should_exclude= TRUE;
+ }
+ }
+ else if (m_has_start && is_gtid_before(&m_start, gtid))
+ {
+ /*
+ Out of order check, the window is active but this GTID takes place
+ before the window begins. keep the window active, but exclude it from
+ passing through.
+ */
+ should_exclude= TRUE;
+ }
+ }
+ else if (m_has_passed && m_has_stop && is_gtid_before(&m_stop, gtid))
+ {
+ /* Test if events are out of order */
+ if (!m_has_start || (m_has_start && is_gtid_at_or_after(&m_start, gtid)))
+ {
+ /*
+ The filter window has closed because it has seen a GTID higher than its
+ end boundary; however, this GTID is out of order and should be passed
+ through.
+ */
+ should_exclude= TRUE;
+ }
+ }
+
+ return should_exclude;
+}
+
+Id_delegating_gtid_event_filter::Id_delegating_gtid_event_filter()
+{
+ uint32 i;
+
+ m_filter_id_mask= 0xf;
+
+ m_filters_by_id= (gtid_filter_element **) my_malloc(
+ PSI_NOT_INSTRUMENTED,
+ (m_filter_id_mask + 1) * sizeof(gtid_filter_element *),
+ MYF(MY_WME)
+ );
+
+ if (m_filters_by_id == NULL)
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+
+ for (i = 0; i <= m_filter_id_mask; i++)
+ {
+ m_filters_by_id[i]= NULL;
+ }
+
+ m_default_filter= new Accept_all_gtid_filter();
+}
+
+/*
+ Deconstructor deletes:
+ 1) All Identifiable_gtid_event_filters added
+ 2) All gtid_filter_element allocations
+*/
+Id_delegating_gtid_event_filter::~Id_delegating_gtid_event_filter()
+{
+ uint32 i;
+ for (i = 0; i <= m_filter_id_mask; i++)
+ {
+ gtid_filter_element *filter_element= m_filters_by_id[i],
+ *filter_element_to_del= NULL;
+ while(filter_element)
+ {
+ filter_element_to_del= filter_element;
+ filter_element= filter_element->next;
+ delete filter_element_to_del->filter;
+ my_free(filter_element_to_del);
+ }
+ }
+ my_free(m_filters_by_id);
+
+ delete m_default_filter;
+}
+
+void Id_delegating_gtid_event_filter::set_default_filter(Gtid_event_filter *filter)
+{
+ if (m_default_filter)
+ delete m_default_filter;
+
+ m_default_filter= filter;
+}
+
+gtid_filter_element *
+Id_delegating_gtid_event_filter::try_find_filter_element_for_id(
+ gtid_filter_identifier filter_id)
+{
+ // Add this into the domain id list
+ uint32 map_idx= filter_id & m_filter_id_mask;
+ gtid_filter_element *filter_idx= m_filters_by_id[map_idx];
+
+ /* Find list index to add this filter */
+ while (filter_idx)
+ {
+ if (filter_idx->filter->get_filter_identifier() == filter_id)
+ break;
+ filter_idx= filter_idx->next;
+ }
+
+ return filter_idx;
+}
+
+gtid_filter_element *
+Id_delegating_gtid_event_filter::find_or_create_filter_element_for_id(
+ gtid_filter_identifier filter_id)
+{
+ // Add this into the domain id list
+ uint32 map_idx= filter_id & m_filter_id_mask;
+ gtid_filter_element *filter_idx= m_filters_by_id[map_idx],
+ *prev_idx= NULL;
+
+ /* Find list index to add this filter */
+ while (filter_idx)
+ {
+ prev_idx= filter_idx;
+ if (filter_idx->filter->get_filter_identifier() == filter_id)
+ {
+ break;
+ }
+ prev_idx= filter_idx;
+ filter_idx= filter_idx->next;
+ }
+
+ if (filter_idx == NULL)
+ {
+ // No other domain ids have filters that index here, create this one
+ filter_idx= (gtid_filter_element *) my_malloc(
+ PSI_NOT_INSTRUMENTED, sizeof(gtid_filter_element), MYF(MY_WME));
+ filter_idx->filter= NULL;
+ filter_idx->next= NULL;
+
+ if (prev_idx == NULL)
+ {
+ // This is the first filter in the bucket
+ m_filters_by_id[map_idx]= filter_idx;
+ }
+ else
+ {
+ // End of list, append filter list to tail
+ prev_idx->next= filter_idx;
+ }
+ }
+
+ return filter_idx;
+}
+
+my_bool Id_delegating_gtid_event_filter::exclude(rpl_gtid *gtid)
+{
+ gtid_filter_identifier filter_id= get_id_from_gtid(gtid);
+ gtid_filter_element *filter_element= try_find_filter_element_for_id(filter_id);
+ Gtid_event_filter *filter=
+ (filter_element ? filter_element->filter : m_default_filter);
+ return filter->exclude(gtid);
+}
+
+Window_gtid_event_filter *
+Domain_gtid_event_filter::find_or_create_window_filter_for_id(
+ uint32 domain_id)
+{
+ gtid_filter_element *filter_element=
+ find_or_create_filter_element_for_id(domain_id);
+ Window_gtid_event_filter *wgef= NULL;
+
+ if (filter_element->filter == NULL)
+ {
+ // New filter
+ wgef= new Window_gtid_event_filter();
+ filter_element->filter= new Identifiable_gtid_event_filter(domain_id, wgef);
+ }
+ else if (filter_element->filter->get_filter_type() == WINDOW_GTID_FILTER_TYPE)
+ {
+ // We have an existing window filter here
+ wgef= (Window_gtid_event_filter *)
+ filter_element->filter->get_identified_filter();
+ }
+ /*
+ Else: We have an existing filter but it is not of window type so propogate
+ NULL filter
+ */
+
+ return wgef;
+}
+
+int Domain_gtid_event_filter::add_start_gtid(rpl_gtid *gtid)
+{
+ int err= 0;
+ Window_gtid_event_filter *filter_to_update=
+ find_or_create_window_filter_for_id(gtid->domain_id);
+
+ if (filter_to_update == NULL)
+ err= 1;
+ else
+ err= filter_to_update->set_start_gtid(gtid);
+
+ return err;
+}
+
+int Domain_gtid_event_filter::add_stop_gtid(rpl_gtid *gtid)
+{
+ int err= 0;
+ Window_gtid_event_filter *filter_to_update=
+ find_or_create_window_filter_for_id(gtid->domain_id);
+
+ if (filter_to_update == NULL)
+ err= 1;
+ else
+ err= filter_to_update->set_stop_gtid(gtid);
+
+ return err;
+} \ No newline at end of file
diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h
index 11541c8000c..3e54b995a64 100644
--- a/sql/rpl_gtid.h
+++ b/sql/rpl_gtid.h
@@ -380,5 +380,258 @@ extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid,
extern int gtid_check_rpl_slave_state_table(TABLE *table);
extern rpl_gtid *gtid_parse_string_to_list(const char *p, size_t len,
uint32 *out_len);
+extern rpl_gtid *gtid_unpack_string_to_list(const char *p, size_t len,
+ uint32 *out_len);
+
+/*
+ Interface to support different methods of filtering log events by GTID
+*/
+class Gtid_event_filter
+{
+public:
+ Gtid_event_filter() {};
+ virtual ~Gtid_event_filter() {};
+
+ enum gtid_event_filter_type
+ {
+ DELEGATING_GTID_FILTER_TYPE = 1,
+ WINDOW_GTID_FILTER_TYPE = 2,
+ ACCEPT_ALL_GTID_FILTER_TYPE = 3
+ };
+
+ /*
+ Run the filter on an input gtid to test if the corresponding log events
+ should be excluded from a result
+
+ Returns TRUE when the event group corresponding to the input GTID should be
+ excluded.
+ Returns FALSE when the event group should be included.
+ */
+ virtual my_bool exclude(rpl_gtid *) = 0;
+
+ /*
+ The gtid_event_filter_type that corresponds to the underlying filter
+ implementation
+ */
+ virtual uint32 get_filter_type() = 0;
+};
+
+/*
+ Filter implementation which will include any and all input GTIDs. This is
+ used to set default behavior for GTIDs that do not have explicit filters
+ set on their domain_id, e.g. when a Window_gtid_event_filter is used for
+ a specific domain, then all other domain_ids will be accepted using this
+ filter implementation.
+*/
+class Accept_all_gtid_filter : public Gtid_event_filter
+{
+public:
+ Accept_all_gtid_filter() {}
+ ~Accept_all_gtid_filter() {}
+ my_bool exclude(rpl_gtid *gtid) { return FALSE; }
+ uint32 get_filter_type() { return ACCEPT_ALL_GTID_FILTER_TYPE; }
+};
+
+/*
+ A sub-class of Gtid_event_filter which allows for quick identification
+ of potentially applicable filters for arbitrary GTIDs.
+*/
+typedef uint32 gtid_filter_identifier;
+class Identifiable_gtid_event_filter : public Gtid_event_filter
+{
+
+public:
+ Identifiable_gtid_event_filter(gtid_filter_identifier filter_id,
+ Gtid_event_filter *filter)
+ : m_filter_id(filter_id), m_filter(filter){};
+
+ ~Identifiable_gtid_event_filter() {
+ delete m_filter;
+ };
+
+ Gtid_event_filter *get_identified_filter() { return m_filter; }
+ gtid_filter_identifier get_filter_identifier() { return m_filter_id; }
+
+ /*
+ Inherited functionality uses composition to call the pass-through filter
+ */
+ my_bool exclude(rpl_gtid *gtid) { return m_filter->exclude(gtid); }
+ uint32 get_filter_type() { return m_filter->get_filter_type(); }
+
+protected:
+ gtid_filter_identifier m_filter_id;
+ Gtid_event_filter *m_filter;
+};
+
+/*
+ A filter implementation that passes through events between two GTIDs, m_start
+ (exclusive) and m_stop (inclusive).
+
+ This filter is stateful, such that it expects GTIDs to be a sequential
+ stream, and internally, the window will activate/deactivate when the start
+ and stop positions of the event stream have passed through, respectively.
+
+ Window activation is used to permit events from the same domain id which fall
+ in-between m_start and m_stop, but are not from the same server id. For
+ example, consider the following event stream with GTIDs 0-1-1,0-2-1,0-1-2.
+ With m_start as 0-1-0 and m_stop as 0-1-2, we want 0-2-1 to be included in
+ this filter. Therefore, the window activates upon seeing 0-1-1, and allows
+ any GTIDs within this domain to pass through until 0-1-2 has been
+ encountered.
+*/
+class Window_gtid_event_filter : public Gtid_event_filter
+{
+public:
+ Window_gtid_event_filter();
+ Window_gtid_event_filter(rpl_gtid *start, rpl_gtid *stop);
+ ~Window_gtid_event_filter() {}
+
+ my_bool exclude(rpl_gtid*);
+
+ /*
+ Set the GTID that begins this window (exclusive)
+
+ Returns 0 on ok, non-zero on error
+ */
+ int set_start_gtid(rpl_gtid *start);
+
+ /*
+ Set the GTID that ends this window (inclusive)
+
+ Returns 0 on ok, non-zero on error
+ */
+ int set_stop_gtid(rpl_gtid *stop);
+
+ uint32 get_filter_type() { return WINDOW_GTID_FILTER_TYPE; }
+
+private:
+ /*
+ m_has_start : Indicates if a start to this window has been explicitly
+ provided. A window starts immediately if not provided.
+ */
+ my_bool m_has_start;
+
+ /*
+ m_has_stop : Indicates if a stop to this window has been explicitly
+ provided. A window continues indefinitely if not provided.
+ */
+ my_bool m_has_stop;
+
+ /*
+ m_is_active : Indicates whether or not the program is currently reading
+ events from within this window. When TRUE, events with
+ different server ids than those specified by m_start or
+ m_stop will be passed through.
+ */
+ my_bool m_is_active;
+
+ /*
+ m_has_passed : Indicates whether or not the program is currently reading
+ events from within this window.
+ */
+ my_bool m_has_passed;
+
+ /* m_start : marks the GTID that begins the window (exclusive). */
+ rpl_gtid m_start;
+
+ /* m_stop : marks the GTID that ends the range (inclusive). */
+ rpl_gtid m_stop;
+
+ /* last_gtid_seen: saves the last */
+ rpl_gtid last_gtid_seen;
+};
+
+/*
+ Data structure to help with quick lookup for filters. More specifically,
+ if two filters have identifiers that lead to the same hash, they will be
+ put into a linked list.
+*/
+typedef struct _gtid_filter_element
+{
+ Identifiable_gtid_event_filter *filter;
+ struct _gtid_filter_element *next;
+} gtid_filter_element;
+
+/*
+ Gtid_event_filter subclass which has no specific implementation, but rather
+ delegates the filtering to specific identifiable/mapped implementations.
+
+ A default filter is used for GTIDs that are passed through which no explicit
+ filter can be identified.
+
+ This class should be subclassed, where the get_id_from_gtid function
+ specifies how to extract the filter identifier from a GTID.
+*/
+class Id_delegating_gtid_event_filter : public Gtid_event_filter
+{
+public:
+ Id_delegating_gtid_event_filter();
+ ~Id_delegating_gtid_event_filter();
+
+ my_bool exclude(rpl_gtid *gtid);
+ void set_default_filter(Gtid_event_filter *default_filter);
+
+ uint32 get_filter_type() { return DELEGATING_GTID_FILTER_TYPE; }
+
+ virtual gtid_filter_identifier get_id_from_gtid(rpl_gtid *) = 0;
+
+
+protected:
+
+ uint32 m_filter_id_mask;
+ Gtid_event_filter *m_default_filter;
+
+ /*
+ To reduce time to find a gtid window, they are indexed by domain_id. More
+ specifically, domain_ids are arranged into m_filter_id_mask+1 buckets, and
+ each bucket is a linked list of gtid_filter_elements that share the same
+ index. The index itself is found by a bitwise and, i.e.
+ some_rpl_gtid.domain_id & m_filter_id_mask
+ */
+ gtid_filter_element **m_filters_by_id;
+
+ gtid_filter_element *try_find_filter_element_for_id(gtid_filter_identifier);
+ gtid_filter_element *find_or_create_filter_element_for_id(gtid_filter_identifier);
+};
+
+/*
+ A subclass of Id_delegating_gtid_event_filter which identifies filters using the
+ domain id of a GTID.
+
+ Additional helper functions include:
+ add_start_gtid(GTID) : adds a start GTID position to this filter, to be
+ identified by its domain id
+ add_stop_gtid(GTID) : adds a stop GTID position to this filter, to be
+ identified by its domain id
+*/
+class Domain_gtid_event_filter : public Id_delegating_gtid_event_filter
+{
+public:
+
+ /*
+ Returns the domain id of from the input GTID
+ */
+ gtid_filter_identifier get_id_from_gtid(rpl_gtid *gtid)
+ {
+ return gtid->domain_id;
+ }
+
+ /*
+ Helper function to start a GTID window filter at the given GTID
+
+ Returns 0 on ok, non-zero on error
+ */
+ int add_start_gtid(rpl_gtid *gtid);
+
+ /*
+ Helper function to end a GTID window filter at the given GTID
+
+ Returns 0 on ok, non-zero on error
+ */
+ int add_stop_gtid(rpl_gtid *gtid);
+
+private:
+ Window_gtid_event_filter *find_or_create_window_filter_for_id(gtid_filter_identifier);
+};
#endif /* RPL_GTID_H */
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index b9d3eec5a60..b1b450d7b40 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -2121,8 +2121,11 @@ dispatch_command_return dispatch_command(enum enum_server_command command, THD *
case COM_BINLOG_DUMP:
{
ulong pos;
+ uint32 n_start_gtids= 0;
+ rpl_gtid *start_gtids= NULL;
ushort flags;
uint32 slave_server_id;
+ uint32 unpack_idx= 0;
status_var_increment(thd->status_var.com_other);
@@ -2131,19 +2134,42 @@ dispatch_command_return dispatch_command(enum enum_server_command command, THD *
break;
/* TODO: The following has to be changed to an 8 byte integer */
- pos = uint4korr(packet);
- flags = uint2korr(packet + 4);
+ if (packet[4] == '-' && packet[9] == '-')
+ {
+ unpack_idx= 18;
+ while (packet[unpack_idx] == ',')
+ unpack_idx += 19; // 18 chars for gtid + 1 for comma
+ start_gtids= gtid_unpack_string_to_list(packet, unpack_idx, &n_start_gtids);
+
+ /*
+ Set pos to the start of the binlog file for scanning
+
+ TODO: When GTID indexing is complete (MDEV-4991), update pos by
+ looking it up in the index
+ */
+ pos= 4;
+ } /* if (packet[4] == '-' && packet[9] == '-') */
+ else
+ {
+ /* Single numeric log position case */
+ pos = uint4korr(packet);
+ unpack_idx += 4;
+ }
+ flags = uint2korr(packet + unpack_idx);
+ unpack_idx += 2;
thd->variables.server_id=0; /* avoid suicide */
- if ((slave_server_id= uint4korr(packet+6))) // mysqlbinlog.server_id==0
+ if ((slave_server_id= uint4korr(packet+unpack_idx))) // mysqlbinlog.server_id==0
kill_zombie_dump_threads(slave_server_id);
thd->variables.server_id = slave_server_id;
+ unpack_idx += 4;
- const char *name= packet + 10;
+ const char *name= packet + unpack_idx;
size_t nlen= strlen(name);
general_log_print(thd, command, "Log: '%s' Pos: %lu", name, pos);
if (nlen < FN_REFLEN)
- mysql_binlog_send(thd, thd->strmake(name, nlen), (my_off_t)pos, flags);
+ mysql_binlog_send(thd, thd->strmake(name, nlen), (my_off_t)pos, flags,
+ start_gtids, n_start_gtids);
thd->unregister_slave(); // todo: can be extraneous
/* fake COM_QUIT -- if we get here, the thread needs to terminate */
error = TRUE;
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 90fdce1b56f..722ca301c81 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -163,6 +163,8 @@ struct binlog_send_info {
bool should_stop;
size_t dirlen;
+ Gtid_event_filter *gtid_event_filter;
+
binlog_send_info(THD *thd_arg, String *packet_arg, ushort flags_arg,
char *lfn)
: thd(thd_arg), net(&thd_arg->net), packet(packet_arg),
@@ -185,6 +187,8 @@ struct binlog_send_info {
error_text[0] = 0;
bzero(&error_gtid, sizeof(error_gtid));
until_binlog_state.init();
+
+ gtid_event_filter= NULL;
}
};
@@ -1751,6 +1755,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
}
}
+
/* Skip GTID event groups until we reach slave position within a domain_id. */
if (event_type == GTID_EVENT && info->using_gtid_state)
{
@@ -1758,7 +1763,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
slave_connection_state::entry *gtid_entry;
rpl_gtid *gtid;
- if (gtid_state->count() > 0 || until_gtid_state)
+ if (gtid_state->count() > 0 || until_gtid_state || info->gtid_event_filter)
{
rpl_gtid event_gtid;
@@ -1899,6 +1904,17 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
}
}
}
+
+ /*
+ Should this result be excluded from the output?
+ */
+ if (info->gtid_event_filter &&
+ info->gtid_event_filter->exclude(&event_gtid))
+ {
+ info->gtid_skip_group=
+ (flags2 & Gtid_log_event::FL_STANDALONE ? GTID_SKIP_STANDALONE
+ : GTID_SKIP_TRANSACTION);
+ }
}
}
@@ -2110,7 +2126,9 @@ err:
static int init_binlog_sender(binlog_send_info *info,
LOG_INFO *linfo,
const char *log_ident,
- my_off_t *pos)
+ my_off_t *pos,
+ rpl_gtid *start_gtids,
+ size_t n_start_gtids)
{
THD *thd= info->thd;
int error;
@@ -2130,7 +2148,8 @@ static int init_binlog_sender(binlog_send_info *info,
info->current_checksum_alg= get_binlog_checksum_value_at_connect(thd);
info->mariadb_slave_capability= get_mariadb_slave_capability(thd);
- info->using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state);
+ info->using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state) ||
+ start_gtids != NULL;
DBUG_EXECUTE_IF("simulate_non_gtid_aware_master",
info->using_gtid_state= false;);
@@ -2247,6 +2266,21 @@ static int init_binlog_sender(binlog_send_info *info,
info->clear_initial_log_pos= true;
}
+ if (start_gtids != NULL)
+ {
+ Domain_gtid_event_filter *filter= new Domain_gtid_event_filter();
+ my_off_t i;
+ for(i = 0; i < n_start_gtids; i++)
+ {
+ if (filter->add_start_gtid(&start_gtids[i]))
+ {
+ info->errmsg= "Domain id is invalid for GTID start position";
+ info->error= ER_INCORRECT_GTID_STATE;
+ }
+ }
+ info->gtid_event_filter= filter;
+ }
+
return 0;
}
@@ -2840,7 +2874,8 @@ static int send_one_binlog_file(binlog_send_info *info,
}
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
- ushort flags)
+ ushort flags, rpl_gtid *start_gtids,
+ uint32 n_start_gtids)
{
LOG_INFO linfo;
@@ -2860,7 +2895,8 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
bzero((char*) &log,sizeof(log));
- if (init_binlog_sender(info, &linfo, log_ident, &pos))
+ if (init_binlog_sender(info, &linfo, log_ident, &pos, start_gtids,
+ n_start_gtids))
goto err;
has_transmit_started= true;
@@ -3022,6 +3058,8 @@ err:
thd->reset_current_linfo();
thd->variables.max_allowed_packet= old_max_allowed_packet;
delete info->fdev;
+ delete info->gtid_event_filter;
+ my_free(start_gtids);
if (likely(info->error == 0))
{
diff --git a/sql/sql_repl.h b/sql/sql_repl.h
index 95916e31abf..bfc35ea5456 100644
--- a/sql/sql_repl.h
+++ b/sql/sql_repl.h
@@ -57,7 +57,8 @@ struct LOAD_FILE_IO_CACHE : public IO_CACHE
int log_loaded_block(IO_CACHE* file, uchar *Buffer, size_t Count);
int init_replication_sys_vars();
-void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags);
+void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags,
+ rpl_gtid *start_gtids, uint32 n_start_gtids);
#ifdef HAVE_PSI_INTERFACE
extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state;