summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBrandon Nesterenko <brandon.nesterenko@mariadb.com>2021-08-11 11:29:37 -0600
committerBrandon Nesterenko <brandon.nesterenko@mariadb.com>2021-08-23 13:57:38 -0600
commitc622af26fca37fc7864e9526a60dbe059644d120 (patch)
treed9ed6be82f0da06e8b2ac68abea5988180289cd0
parent64f7dffcc7e0e69c31d9a36c2090a26300e57c4c (diff)
downloadmariadb-git-10.7-MDEV-4989.tar.gz
MDEV-4989: Support for GTID in mysqlbinlog10.7-MDEV-4989
New Feature: ============ This commit extends the mariadb-binlog capabilities to allow events to be filtered by GTID ranges. More specifically, the following capabilities are addressed: 1) GTIDs can be used to filter results on local binlog files 2) GTIDs can be used to filter results from remote servers 3) For a given GTID range, its start-position is exclusive and its stop-position is inclusive 4) After the events have been written, the session server id and domain id are reset to their former values 5) Output filtered by GTID ranges can be piped to the MariaDB client To facilitate these features, the --start-position and --stop-position arguments have been extended to additionally accept values formatted as a list of GTID positions, e.g. `--start-position=0-1-0,1-2-55` Reviewed By: ============ <TODO>
-rw-r--r--client/mysqlbinlog.cc331
-rw-r--r--mysql-test/suite/binlog/r/binlog_mysqlbinlog_gtid_window.result226
-rw-r--r--mysql-test/suite/binlog/t/binlog_mysqlbinlog_gtid_window-master.opt1
-rw-r--r--mysql-test/suite/binlog/t/binlog_mysqlbinlog_gtid_window.test242
-rw-r--r--sql/rpl_gtid.cc402
-rw-r--r--sql/rpl_gtid.h224
-rw-r--r--sql/sql_parse.cc36
-rw-r--r--sql/sql_repl.cc44
-rw-r--r--sql/sql_repl.h3
9 files changed, 1471 insertions, 38 deletions
diff --git a/client/mysqlbinlog.cc b/client/mysqlbinlog.cc
index d65828ea71c..dcb362e6b3a 100644
--- a/client/mysqlbinlog.cc
+++ b/client/mysqlbinlog.cc
@@ -143,10 +143,18 @@ static char *charset= 0;
static uint verbose= 0;
-static ulonglong start_position, stop_position;
+static char *start_pos_str, *stop_pos_str;
+static ulonglong start_position= BIN_LOG_HEADER_SIZE,
+ stop_position= (longlong)(~(my_off_t)0) ;
#define start_position_mot ((my_off_t)start_position)
#define stop_position_mot ((my_off_t)stop_position)
+static Delegating_gtid_event_filter *gtid_event_filter= NULL;
+static rpl_gtid *start_gtids, *stop_gtids;
+static my_bool is_event_gtid_active= FALSE;
+static uint32 n_start_gtid_ranges= 0;
+static uint32 n_stop_gtid_ranges= 0;
+
static char *start_datetime_str, *stop_datetime_str;
static my_time_t start_datetime= 0, stop_datetime= MY_TIME_T_MAX;
static ulonglong rec_count= 0;
@@ -981,6 +989,40 @@ static bool print_row_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
return result;
}
+static inline my_bool is_gtid_filtering_enabled()
+{
+ return gtid_event_filter != NULL;
+}
+
+/*
+ Where the binlog is processed sequentially, the variable is_event_gtid_active
+ keeps track of the state of active event groups. When a new Gtid_log_event is
+ read, if it should be output, this function will return true. Otherwise, this
+ function will return false.
+*/
+static inline my_bool is_event_group_active()
+{
+ return is_event_gtid_active;
+}
+
+/*
+ When a Gtid_log_event marks a GTID that should be output, this function is
+ invoked to change the program state to start writing binlog events until
+ the event group has ended.
+*/
+static inline void activate_current_event_group()
+{
+ is_event_gtid_active= TRUE;
+}
+
+/*
+ When an active event group has written its last event, this function is
+ invoked to change the program state to stop writing events.
+*/
+static inline void deactivate_current_event_group()
+{
+ is_event_gtid_active= FALSE;
+}
/**
Print the given event, and either delete it or delegate the deletion
@@ -1019,11 +1061,34 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
#endif
/*
+ If the binlog output should be filtered using GTIDs, test the new event
+ group to see if its events should be written or ignored.
+ */
+ if (ev_type == GTID_EVENT && is_gtid_filtering_enabled())
+ {
+ Gtid_log_event *gle= (Gtid_log_event*) ev;
+ rpl_gtid gtid;
+ gtid.domain_id= gle->domain_id;
+ gtid.server_id= gle->server_id;
+ gtid.seq_no= gle->seq_no;
+ if (!gtid_event_filter->exclude(&gtid))
+ {
+ activate_current_event_group();
+ }
+ else
+ {
+ deactivate_current_event_group();
+ }
+
+ }
+
+ /*
Format events are not concerned by --offset and such, we always need to
read them to be able to process the wanted events.
*/
if (((rec_count >= offset) &&
- (ev->when >= start_datetime)) ||
+ (ev->when >= start_datetime) &&
+ (!is_gtid_filtering_enabled() || is_event_group_active())) ||
(ev_type == FORMAT_DESCRIPTION_EVENT))
{
if (ev_type != FORMAT_DESCRIPTION_EVENT)
@@ -1500,6 +1565,13 @@ end:
}
}
+ /* Xid_log_events or Query_log_events mark the end of a GTID event group. */
+ if ((ev_type == XID_EVENT || ev_type == QUERY_EVENT) &&
+ is_event_group_active())
+ {
+ deactivate_current_event_group();
+ }
+
if (destroy_evt) /* destroy it later if not set (ignored table map) */
delete ev;
}
@@ -1658,15 +1730,13 @@ static struct my_option my_options[] =
&start_datetime_str, &start_datetime_str,
0, GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"start-position", 'j',
- "Start reading the binlog at position N. Applies to the first binlog "
- "passed on the command line.",
- &start_position, &start_position, 0, GET_ULL,
- REQUIRED_ARG, BIN_LOG_HEADER_SIZE, BIN_LOG_HEADER_SIZE,
- /*
- COM_BINLOG_DUMP accepts only 4 bytes for the position
- so remote log reading has lower limit.
- */
- (ulonglong)(0xffffffffffffffffULL), 0, 0, 0},
+ "Start reading the binlog at this position. Type can either be a positive "
+ "integer or a GTID. When using a positive integer, the value only "
+ "applies to the first binlog passed on the command line. In GTID mode, "
+ "multiple GTIDs can be passed as a comma separated list, where each must "
+ "have a unique domain id.",
+ &start_pos_str, &start_pos_str, 0, GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0,
+ 0, 0, 0},
{"stop-datetime", OPT_STOP_DATETIME,
"Stop reading the binlog at first event having a datetime equal or "
"posterior to the argument; the argument must be a date and time "
@@ -1684,11 +1754,13 @@ static struct my_option my_options[] =
&opt_stop_never_slave_server_id, &opt_stop_never_slave_server_id, 0,
GET_ULONG, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"stop-position", OPT_STOP_POSITION,
- "Stop reading the binlog at position N. Applies to the last binlog "
- "passed on the command line.",
- &stop_position, &stop_position, 0, GET_ULL,
- REQUIRED_ARG, (longlong)(~(my_off_t)0), BIN_LOG_HEADER_SIZE,
- (ulonglong)(~(my_off_t)0), 0, 0, 0},
+ "Stop reading the binlog at this position. Type can either be a positive "
+ "integer or a GTID. When using a positive integer, the value only "
+ "applies to the last binlog passed on the command line. In GTID mode, "
+ "multiple GTIDs can be passed as a comma separated list, where each must "
+ "have a unique domain id.",
+ &stop_pos_str, &stop_pos_str, 0, GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0,
+ 0, 0},
{"table", 'T', "List entries for just this table (local log only).",
&table, &table, 0, GET_STR_ALLOC, REQUIRED_ARG,
0, 0, 0, 0, 0, 0},
@@ -1824,8 +1896,14 @@ static void cleanup()
my_free(const_cast<char*>(dirname_for_local_load));
my_free(start_datetime_str);
my_free(stop_datetime_str);
+ my_free(start_pos_str);
+ my_free(stop_pos_str);
+ my_free(start_gtids);
+ my_free(stop_gtids);
free_root(&glob_root, MYF(0));
+ delete gtid_event_filter;
+
delete binlog_filter;
delete glob_description_event;
if (mysql)
@@ -2075,6 +2153,123 @@ get_one_option(const struct my_option *opt, const char *argument, const char *fi
print_version();
opt_version= 1;
break;
+ case OPT_STOP_POSITION:
+ {
+ stop_gtids= gtid_parse_string_to_list(stop_pos_str, strlen(stop_pos_str),
+ &n_stop_gtid_ranges);
+ if (stop_gtids == NULL)
+ {
+ int err= 0;
+ char *end_ptr= NULL;
+ /*
+ No GTIDs specified in OPT_STOP_POSITION specification. Treat the value
+ as a singular index.
+ */
+ stop_position= my_strtoll10(stop_pos_str, &end_ptr, &err);
+
+ if (err || *end_ptr)
+ {
+ // Can't parse the position from the user
+ sql_print_error("Stop position argument value is invalid. Should be "
+ "either a position or GTID.");
+ return 1;
+ }
+ }
+ else if (n_stop_gtid_ranges > 0)
+ {
+ uint32 gtid_idx;
+ Domain_gtid_event_filter *domain_filter;
+
+ if (gtid_event_filter == NULL)
+ {
+ domain_filter= new Domain_gtid_event_filter();
+ gtid_event_filter= domain_filter;
+ }
+ else
+ {
+ domain_filter= (Domain_gtid_event_filter *) gtid_event_filter;
+ }
+
+ for (gtid_idx = 0; gtid_idx < n_stop_gtid_ranges; gtid_idx++)
+ {
+ rpl_gtid *stop_gtid= &stop_gtids[gtid_idx];
+ if (!domain_filter->add_stop_gtid(stop_gtid))
+ {
+ sql_print_error("Cannot add stop position; domain id %u "
+ "already has a stop position",
+ stop_gtid->domain_id);
+ return 1;
+ }
+ }
+ }
+ else
+ {
+ // Can't parse the position from the user
+ sql_print_error("Stop position argument value is invalid. Should be "
+ "either a position or GTID.");
+ return 1;
+ }
+ break;
+ }
+ case 'j':
+ {
+ start_gtids= gtid_parse_string_to_list(
+ start_pos_str, strlen(start_pos_str), &n_start_gtid_ranges);
+
+ if (start_gtids == NULL)
+ {
+ int err= 0;
+ char *end_ptr= NULL;
+ /*
+ No GTIDs specified in OPT_START_POSITION specification. Treat the value
+ as a singular index.
+ */
+ start_position= my_strtoll10(start_pos_str, &end_ptr, &err);
+
+ if (err || *end_ptr)
+ {
+ // Can't parse the position from the user
+ sql_print_error("Start position argument value is invalid. Should be "
+ "either a position or GTID.");
+ return 1;
+ }
+ }
+ else if (n_start_gtid_ranges > 0)
+ {
+ uint32 gtid_idx;
+ Domain_gtid_event_filter *domain_filter;
+
+ if (gtid_event_filter == NULL)
+ {
+ domain_filter= new Domain_gtid_event_filter();
+ gtid_event_filter= domain_filter;
+ }
+ else
+ {
+ domain_filter= (Domain_gtid_event_filter *) gtid_event_filter;
+ }
+
+ for (gtid_idx = 0; gtid_idx < n_start_gtid_ranges; gtid_idx++)
+ {
+ rpl_gtid *start_gtid= &start_gtids[gtid_idx];
+ if (!domain_filter->add_start_gtid(start_gtid))
+ {
+ sql_print_error("Cannot add start position; domain id %u "
+ "already has a start position",
+ start_gtid->domain_id);
+ return 1;
+ }
+ }
+ }
+ else
+ {
+ // Can't parse the position from the user
+ sql_print_error("Start position argument value is invalid. Should be "
+ "either a position or GTID.");
+ return 1;
+ }
+ break;
+ }
case '?':
usage();
opt_version= 1;
@@ -2237,11 +2432,12 @@ static Exit_status dump_log_entries(const char* logname)
@retval ERROR_STOP An error occurred - the program should terminate.
@retval OK_CONTINUE No error, the program should continue.
*/
-static Exit_status check_master_version()
+static Exit_status check_master_version(uint *major, uint *minor, uint *patch)
{
MYSQL_RES* res = 0;
MYSQL_ROW row;
uint version;
+ size_t version_iter;
if (mysql_query(mysql, "SELECT VERSION()") ||
!(res = mysql_store_result(mysql)))
@@ -2257,12 +2453,52 @@ static Exit_status check_master_version()
goto err;
}
- if (!(version = atoi(row[0])))
+ if (!(*major = atoi(row[0])))
{
error("Could not find server version: "
"Master reported NULL for the version.");
goto err;
}
+
+ /* Try to save the minor version, if supplied with a storage location */
+ if (minor != NULL)
+ {
+ for (version_iter= 0; version_iter < strlen((const char *) row);
+ version_iter++)
+ {
+ if (row[0][version_iter] == '.')
+ {
+ version_iter= version_iter + 1;
+ break;
+ }
+ }
+ if (!(*minor= atoi(&row[0][version_iter])))
+ {
+ error("Could not find server minor version: "
+ "Master reported NULL for the minor version.");
+ }
+ }
+
+ /* Try to save the patch version, if supplied with a storage location */
+ if (patch != NULL)
+ {
+ for (; version_iter < strlen((const char *) row); version_iter++)
+ {
+ if (row[0][version_iter] == '.')
+ {
+ version_iter= version_iter + 1;
+ break;
+ }
+ }
+ if (!(*patch= atoi(&row[0][version_iter])))
+ {
+ error("Could not find server patch version: "
+ "Master reported NULL for the patch version.");
+ }
+ }
+
+ version= *major;
+
/*
Make a notice to the server that this client
is checksum-aware. It does not need the first fake Rotate
@@ -2606,21 +2842,56 @@ static Exit_status dump_remote_log_entries(PRINT_EVENT_INFO *print_event_info,
DBUG_RETURN(retval);
net= &mysql->net;
- if ((retval= check_master_version()) != OK_CONTINUE)
+ uint major_version, minor_version;
+ if ((retval= check_master_version(&major_version, &minor_version, NULL)) !=
+ OK_CONTINUE)
DBUG_RETURN(retval);
/*
COM_BINLOG_DUMP accepts only 4 bytes for the position, so we are forced to
cast to uint32.
*/
- DBUG_ASSERT(start_position <= UINT_MAX32);
- int4store(buf, (uint32)start_position);
+ size_t buf_idx= 0;
+ if (is_gtid_filtering_enabled())
+ {
+ if (major_version < 10 || (major_version == 10 && minor_version < 7))
+ {
+ error("Master does not support GTID filtering. This feature was added "
+ "in MariaDB version 10.7");
+ DBUG_RETURN(ERROR_STOP);
+ }
+
+ size_t i;
+ for (i = 0; i < n_start_gtid_ranges; i++)
+ {
+ if (i > 0)
+ {
+ buf[buf_idx]= ',';
+ buf_idx += 1;
+ }
+
+
+ int4store(buf + buf_idx, start_gtids[i].domain_id);
+ buf[buf_idx+4]= '-';
+ int4store(buf + buf_idx + 5, start_gtids[i].server_id);
+ buf[buf_idx+9]= '-';
+ int8store(buf + buf_idx + 10, start_gtids[i].seq_no);
+ buf_idx += 18;
+ }
+ }
+ else
+ {
+ DBUG_ASSERT(start_position <= UINT_MAX32);
+ int4store(buf, (uint32) start_position);
+ buf_idx= BIN_LOG_HEADER_SIZE;
+ }
if (!opt_skip_annotate_row_events)
binlog_flags|= BINLOG_SEND_ANNOTATE_ROWS_EVENT;
if (!opt_stop_never)
binlog_flags|= BINLOG_DUMP_NON_BLOCK;
- int2store(buf + BIN_LOG_HEADER_SIZE, binlog_flags);
+ int2store(buf + buf_idx, binlog_flags);
+ buf_idx += 2;
size_t tlen = strlen(logname);
if (tlen > sizeof(buf) - 10)
@@ -2637,9 +2908,10 @@ static Exit_status dump_remote_log_entries(PRINT_EVENT_INFO *print_event_info,
}
else
slave_id= 0;
- int4store(buf + 6, slave_id);
- memcpy(buf + 10, logname, logname_len);
- if (simple_command(mysql, COM_BINLOG_DUMP, buf, logname_len + 10, 1))
+ int4store(buf + buf_idx, slave_id);
+ buf_idx += 4;
+ memcpy(buf + buf_idx, logname, logname_len);
+ if (simple_command(mysql, COM_BINLOG_DUMP, buf, logname_len + buf_idx, 1))
{
error("Got fatal error sending the log dump command.");
DBUG_RETURN(ERROR_STOP);
@@ -3210,6 +3482,14 @@ int main(int argc, char** argv)
"/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;\n");
fprintf(result_file, "/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=0*/;\n");
+
+ if (is_gtid_filtering_enabled())
+ {
+ fprintf(result_file,
+ "/*!100001 SET @@SESSION.SERVER_ID=@@GLOBAL.SERVER_ID */;\n"
+ "/*!100001 SET @@SESSION.GTID_DOMAIN_ID=@@GLOBAL.GTID_DOMAIN_ID "
+ "*/;\n");
+ }
}
if (tmpdir.list)
@@ -3271,3 +3551,4 @@ struct encryption_service_st encryption_handler=
#include "sql_list.cc"
#include "rpl_filter.cc"
#include "compat56.cc"
+#include "rpl_gtid.cc" \ No newline at end of file
diff --git a/mysql-test/suite/binlog/r/binlog_mysqlbinlog_gtid_window.result b/mysql-test/suite/binlog/r/binlog_mysqlbinlog_gtid_window.result
new file mode 100644
index 00000000000..d3fd17b7a18
--- /dev/null
+++ b/mysql-test/suite/binlog/r/binlog_mysqlbinlog_gtid_window.result
@@ -0,0 +1,226 @@
+###############################
+# Test Setup
+###############################
+SET timestamp=1000000000;
+RESET MASTER;
+SET @@session.gtid_domain_id= 0;
+SET @@session.server_id= 1;
+CREATE TABLE t1 (a int);
+CREATE TABLE t2 (a int);
+INSERT INTO t1 values (1), (2);
+SET @@session.gtid_domain_id= 1;
+SET @@session.server_id= 2;
+INSERT INTO t2 values (1);
+INSERT INTO t2 values (2);
+SET @@session.gtid_domain_id= 0;
+SET @@session.server_id= 1;
+INSERT INTO t1 values (3), (4);
+SET @@session.gtid_domain_id= 1;
+SET @@session.server_id= 2;
+INSERT INTO t2 values (3);
+INSERT INTO t2 values (4);
+INSERT INTO t2 values (5);
+SET @@session.server_id= 3;
+INSERT INTO t2 values (6);
+INSERT INTO t2 values (7);
+SET @@session.server_id= 2;
+INSERT INTO t2 values (8);
+FLUSH LOGS;
+###############################
+# Test Cases
+###############################
+#
+# Test Case 1:
+# The end of the binlog file resets the server and domain id of the
+# session
+# MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=0-1-1 --stop-position=0-1-4 --base64-output=NEVER > OUT_FILE
+SET @@SESSION.SERVER_ID=@@GLOBAL.SERVER_ID
+SET @@SESSION.GTID_DOMAIN_ID=@@GLOBAL.GTID_DOMAIN_ID
+#
+# Test Case 2:
+# Local file, single GTID range specified
+# MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=0-1-1 --stop-position=0-1-4 --base64-output=NEVER > OUT_FILE
+end_log_pos # CRC32 XXX Start: binlog v 4, server v #.##.## created 010909 9:46:40 at startup
+end_log_pos # CRC32 XXX GTID 0-1-2 ddl
+end_log_pos # CRC32 XXX Query thread_id=# exec_time=# error_code=0 xid=#
+end_log_pos # CRC32 XXX GTID 0-1-3
+end_log_pos # CRC32 XXX Annotate_rows:
+end_log_pos # CRC32 XXX Table_map: `test`.`t1` mapped to number #
+end_log_pos # CRC32 XXX Write_rows: table id # flags: STMT_END_F
+end_log_pos # CRC32 XXX Query thread_id=# exec_time=# error_code=0 xid=#
+end_log_pos # CRC32 XXX GTID 0-1-4
+end_log_pos # CRC32 XXX Annotate_rows:
+end_log_pos # CRC32 XXX Table_map: `test`.`t1` mapped to number #
+end_log_pos # CRC32 XXX Write_rows: table id # flags: STMT_END_F
+end_log_pos # CRC32 XXX Query thread_id=# exec_time=# error_code=0 xid=#
+#
+# Test Case 3:
+# Local file, single GTID range with different server_ids
+# MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=1-2-2 --stop-position=1-3-6 --base64-output=NEVER > OUT_FILE
+end_log_pos # CRC32 XXX Start: binlog v 4, server v #.##.## created 010909 9:46:40 at startup
+end_log_pos # CRC32 XXX GTID 1-2-3
+end_log_pos # CRC32 XXX Annotate_rows:
+end_log_pos # CRC32 XXX Table_map: `test`.`t2` mapped to number #
+end_log_pos # CRC32 XXX Write_rows: table id # flags: STMT_END_F
+end_log_pos # CRC32 XXX Query thread_id=# exec_time=# error_code=0 xid=#
+end_log_pos # CRC32 XXX GTID 1-2-4
+end_log_pos # CRC32 XXX Annotate_rows:
+end_log_pos # CRC32 XXX Table_map: `test`.`t2` mapped to number #
+end_log_pos # CRC32 XXX Write_rows: table id # flags: STMT_END_F
+end_log_pos # CRC32 XXX Query thread_id=# exec_time=# error_code=0 xid=#
+end_log_pos # CRC32 XXX GTID 1-2-5
+end_log_pos # CRC32 XXX Annotate_rows:
+end_log_pos # CRC32 XXX Table_map: `test`.`t2` mapped to number #
+end_log_pos # CRC32 XXX Write_rows: table id # flags: STMT_END_F
+end_log_pos # CRC32 XXX Query thread_id=# exec_time=# error_code=0 xid=#
+end_log_pos # CRC32 XXX GTID 1-3-6
+end_log_pos # CRC32 XXX Annotate_rows:
+end_log_pos # CRC32 XXX Table_map: `test`.`t2` mapped to number #
+end_log_pos # CRC32 XXX Write_rows: table id # flags: STMT_END_F
+end_log_pos # CRC32 XXX Query thread_id=# exec_time=# error_code=0 xid=#
+#
+# Test Case 4:
+# Local file, multiple GTID ranges specified
+# MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=0-1-1,1-2-0 --stop-position=0-1-4,1-2-3 --base64-output=NEVER > OUT_FILE
+end_log_pos # CRC32 XXX Start: binlog v 4, server v #.##.## created 010909 9:46:40 at startup
+end_log_pos # CRC32 XXX GTID 0-1-2 ddl
+end_log_pos # CRC32 XXX Query thread_id=# exec_time=# error_code=0 xid=#
+end_log_pos # CRC32 XXX GTID 0-1-3
+end_log_pos # CRC32 XXX Annotate_rows:
+end_log_pos # CRC32 XXX Table_map: `test`.`t1` mapped to number #
+end_log_pos # CRC32 XXX Write_rows: table id # flags: STMT_END_F
+end_log_pos # CRC32 XXX Query thread_id=# exec_time=# error_code=0 xid=#
+end_log_pos # CRC32 XXX GTID 1-2-1
+end_log_pos # CRC32 XXX Annotate_rows:
+end_log_pos # CRC32 XXX Table_map: `test`.`t2` mapped to number #
+end_log_pos # CRC32 XXX Write_rows: table id # flags: STMT_END_F
+end_log_pos # CRC32 XXX Query thread_id=# exec_time=# error_code=0 xid=#
+end_log_pos # CRC32 XXX GTID 1-2-2
+end_log_pos # CRC32 XXX Annotate_rows:
+end_log_pos # CRC32 XXX Table_map: `test`.`t2` mapped to number #
+end_log_pos # CRC32 XXX Write_rows: table id # flags: STMT_END_F
+end_log_pos # CRC32 XXX Query thread_id=# exec_time=# error_code=0 xid=#
+end_log_pos # CRC32 XXX GTID 0-1-4
+end_log_pos # CRC32 XXX Annotate_rows:
+end_log_pos # CRC32 XXX Table_map: `test`.`t1` mapped to number #
+end_log_pos # CRC32 XXX Write_rows: table id # flags: STMT_END_F
+end_log_pos # CRC32 XXX Query thread_id=# exec_time=# error_code=0 xid=#
+end_log_pos # CRC32 XXX GTID 1-2-3
+end_log_pos # CRC32 XXX Annotate_rows:
+end_log_pos # CRC32 XXX Table_map: `test`.`t2` mapped to number #
+end_log_pos # CRC32 XXX Write_rows: table id # flags: STMT_END_F
+end_log_pos # CRC32 XXX Query thread_id=# exec_time=# error_code=0 xid=#
+#
+# Test Case 5:
+# Local file, multiple GTID ranges specified where the domain ids are
+# listed in different orders between start/stop position
+# MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --stop-position=0-1-4,1-2-3 --start-position=1-2-0,0-1-1 --base64-output=NEVER > MYSQLTEST_VARDIR/tmp/binlog2.out
+# --diff_files OUT_FILE MYSQLTEST_VARDIR/tmp/binlog2.out
+#
+# Test Case 6:
+# Local file, only start position specified
+# MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=0-1-2 --base64-output=NEVER > OUT_FILE
+end_log_pos # CRC32 XXX Start: binlog v 4, server v #.##.## created 010909 9:46:40 at startup
+end_log_pos # CRC32 XXX GTID 0-1-3
+end_log_pos # CRC32 XXX Annotate_rows:
+end_log_pos # CRC32 XXX Table_map: `test`.`t1` mapped to number #
+end_log_pos # CRC32 XXX Write_rows: table id # flags: STMT_END_F
+end_log_pos # CRC32 XXX Query thread_id=# exec_time=# error_code=0 xid=#
+end_log_pos # CRC32 XXX GTID 0-1-4
+end_log_pos # CRC32 XXX Annotate_rows:
+end_log_pos # CRC32 XXX Table_map: `test`.`t1` mapped to number #
+end_log_pos # CRC32 XXX Write_rows: table id # flags: STMT_END_F
+end_log_pos # CRC32 XXX Query thread_id=# exec_time=# error_code=0 xid=#
+#
+# Test Case 7:
+# Local file, only stop position specified
+# MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --stop-position=0-1-2 --base64-output=NEVER > OUT_FILE
+end_log_pos # CRC32 XXX Start: binlog v 4, server v #.##.## created 010909 9:46:40 at startup
+end_log_pos # CRC32 XXX GTID 0-1-1 ddl
+end_log_pos # CRC32 XXX Query thread_id=# exec_time=# error_code=0 xid=#
+end_log_pos # CRC32 XXX GTID 0-1-2 ddl
+end_log_pos # CRC32 XXX Query thread_id=# exec_time=# error_code=0 xid=#
+#
+# Test Case 8:
+# Remote host, single GTID range specified
+# MYSQL_BINLOG master-bin.000001 --read-from-remote-server --start-position=0-1-1 --stop-position=0-1-4 --base64-output=NEVER > OUT_FILE
+end_log_pos # CRC32 XXX Start: binlog v 4, server v #.##.## created 010909 9:46:40 at startup
+end_log_pos # CRC32 XXX GTID 0-1-2 ddl
+end_log_pos # CRC32 XXX Query thread_id=# exec_time=# error_code=0 xid=#
+end_log_pos # CRC32 XXX GTID 0-1-3
+end_log_pos # CRC32 XXX Annotate_rows:
+end_log_pos # CRC32 XXX Table_map: `test`.`t1` mapped to number #
+end_log_pos # CRC32 XXX Write_rows: table id # flags: STMT_END_F
+end_log_pos # CRC32 XXX Query thread_id=# exec_time=# error_code=0 xid=#
+end_log_pos # CRC32 XXX GTID 0-1-4
+end_log_pos # CRC32 XXX Annotate_rows:
+end_log_pos # CRC32 XXX Table_map: `test`.`t1` mapped to number #
+end_log_pos # CRC32 XXX Write_rows: table id # flags: STMT_END_F
+end_log_pos # CRC32 XXX Query thread_id=# exec_time=# error_code=0 xid=#
+SET @@SESSION.SERVER_ID=@@GLOBAL.SERVER_ID
+SET @@SESSION.GTID_DOMAIN_ID=@@GLOBAL.GTID_DOMAIN_ID
+#
+# Test Case 9:
+# The output filtered by GTID ranges can be piped back into the
+# MariaDB client
+# MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --stop-position=0-1-4,1-2-8 > MYSQLTEST_VARDIR/tmp/ir.out
+DROP TABLE t1;
+DROP TABLE t2;
+RESET MASTER;
+# MYSQL < MYSQLTEST_VARDIR/tmp/ir.out
+FLUSH LOGS;
+show tables;
+Tables_in_test
+t1
+t2
+SELECT * FROM t1;
+a
+1
+2
+3
+4
+SELECT * FROM t2;
+a
+1
+2
+3
+4
+5
+6
+7
+8
+# MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --base64-output=NEVER > OUT_FILE
+GTID 0-1-1
+GTID 0-1-2
+GTID 0-1-3
+GTID 1-2-1
+GTID 1-2-2
+GTID 0-1-4
+GTID 1-2-3
+GTID 1-2-4
+GTID 1-2-5
+GTID 1-3-6
+GTID 1-3-7
+GTID 1-2-8
+##############################
+# Error Cases
+##############################
+#
+# Error Case 1:
+# User provides invalid positions
+# MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=z --base64-output=NEVER > OUT_FILE
+# MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=1- --base64-output=NEVER > OUT_FILE
+# MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=1-2 --base64-output=NEVER > OUT_FILE
+# MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=1-2- --base64-output=NEVER > OUT_FILE
+# MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=-1 --base64-output=NEVER > OUT_FILE
+#
+# Error Case 2:
+# User provides GTID ranges with repeated domain ids
+# MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=0-1-1,0-1-8 --stop-position=0-1-4,0-1-12 --base64-output=NEVER > OUT_FILE
+##############################
+# Cleanup
+##############################
+DROP TABLE t1;
+DROP TABLE t2;
+SET @@global.gtid_domain_id= 0;
+SET @@global.server_id= 1;
diff --git a/mysql-test/suite/binlog/t/binlog_mysqlbinlog_gtid_window-master.opt b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_gtid_window-master.opt
new file mode 100644
index 00000000000..d17999c07c1
--- /dev/null
+++ b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_gtid_window-master.opt
@@ -0,0 +1 @@
+--timezone=GMT-8 \ No newline at end of file
diff --git a/mysql-test/suite/binlog/t/binlog_mysqlbinlog_gtid_window.test b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_gtid_window.test
new file mode 100644
index 00000000000..9611cb381da
--- /dev/null
+++ b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_gtid_window.test
@@ -0,0 +1,242 @@
+#
+# Purpose:
+# This test ensures that the mariadb-binlog CLI tool can filter log events
+# using GTID ranges. More specifically, this test ensures the following
+# capabilities:
+# 1) GTIDs can be used to filter results on local binlog files
+# 2) GTIDs can be used to filter results from remote servers
+# 3) For a given GTID range, its start-position is exclusive and its
+# stop-position is inclusive
+# 4) After the events have been written, the session server id and domain id
+# are reset to their former values
+# 5) Output filtered by GTID ranges can be piped to the MariaDB client
+#
+# Methodology:
+# This test validates the expected capabilities using the following test
+# cases:
+# Test Case 1) The end of the binlog file resets the server and domain id of
+# the session
+# Test Case 2) Local file, single GTID range specified
+# Test Case 3) Local file, single GTID range with different server_ids
+# Test Case 4) Local file, multiple GTID ranges specified
+# Test Case 5) Local file, multiple GTID ranges specified where the domain
+# ids are listed in different orders between start/stop position
+# Test Case 6) Local file, only start position specified
+# Test Case 7) Local file, only stop position specified
+# Test Case 8) Remote host, single GTID range specified
+# Test Case 9) The output filtered by GTID ranges can be piped back into the
+# MariaDB client
+#
+# Additionally, this test validates the following error scenarios:
+# Error Case 1) User provides invalid positions
+# Error Case 2: User provides GTID ranges with repeated domain ids
+#
+# References:
+# MDEV-4989: Support for GTID in mysqlbinlog
+#
+
+--source include/have_log_bin.inc
+--source include/have_binlog_format_row.inc
+
+--echo ###############################
+--echo # Test Setup
+--echo ###############################
+
+## Fix timestamp to avoid varying results.
+#
+SET timestamp=1000000000;
+RESET MASTER;
+
+## Save old state
+#
+let $ORIG_GTID_DOMAIN_ID = `select @@session.gtid_domain_id`;
+let $ORIG_SERVER_ID = `select @@session.server_id`;
+
+## Configure test variables
+#
+--let $MYSQLD_DATADIR=`select @@datadir`
+--let OUT_FILE=$MYSQLTEST_VARDIR/tmp/binlog.out
+--let SEARCH_OUTPUT=matches
+--let SEARCH_FILE=$OUT_FILE
+
+## Initialize test data
+#
+SET @@session.gtid_domain_id= 0;
+SET @@session.server_id= 1;
+
+CREATE TABLE t1 (a int);
+CREATE TABLE t2 (a int);
+
+INSERT INTO t1 values (1), (2);
+
+SET @@session.gtid_domain_id= 1;
+SET @@session.server_id= 2;
+INSERT INTO t2 values (1);
+INSERT INTO t2 values (2);
+
+SET @@session.gtid_domain_id= 0;
+SET @@session.server_id= 1;
+INSERT INTO t1 values (3), (4);
+
+SET @@session.gtid_domain_id= 1;
+SET @@session.server_id= 2;
+INSERT INTO t2 values (3);
+INSERT INTO t2 values (4);
+INSERT INTO t2 values (5);
+
+SET @@session.server_id= 3;
+INSERT INTO t2 values (6);
+INSERT INTO t2 values (7);
+
+SET @@session.server_id= 2;
+INSERT INTO t2 values (8);
+
+FLUSH LOGS;
+
+
+--echo ###############################
+--echo # Test Cases
+--echo ###############################
+
+--echo #
+--echo # Test Case 1:
+--echo # The end of the binlog file resets the server and domain id of the
+--echo # session
+--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=0-1-1 --stop-position=0-1-4 --base64-output=NEVER > OUT_FILE
+--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --start-position=0-1-1 --stop-position=0-1-4 --base64-output=NEVER > $OUT_FILE
+--let SEARCH_PATTERN=SET @@SESSION\.[\w]*_ID[\h]*=[\h]*@@GLOBAL\.[\w]+_ID
+--source include/search_pattern_in_file.inc
+
+--echo #
+--echo # Test Case 2:
+--echo # Local file, single GTID range specified
+--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=0-1-1 --stop-position=0-1-4 --base64-output=NEVER > OUT_FILE
+--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --start-position=0-1-1 --stop-position=0-1-4 --base64-output=NEVER > $OUT_FILE
+--let SEARCH_PATTERN=end_log_pos[^\n]+
+--replace_result $MYSQLTEST_VARDIR MYSQLTEST_VARDIR
+--replace_regex /SQL_LOAD_MB-[0-9]-[0-9]/SQL_LOAD_MB-#-#/ /exec_time=[0-9]*/exec_time=#/ /end_log_pos [0-9]*/end_log_pos #/ /# at [0-9]*/# at #/ /Xid = [0-9]*/Xid = #/ /xid=[0-9]*/xid=#/ /thread_id=[0-9]*/thread_id=#/ /table id [0-9]*/table id #/ /mapped to number [0-9]*/mapped to number #/ /server v [^ ]*/server v #.##.##/ /CRC32 0x[0-9a-f]*/CRC32 XXX/ /collation_server=[0-9]+/collation_server=X/ /character_set_client=[0-9]+/character_set_client=X/ /collation_connection=[0-9]+/collation_connection=X/
+--source include/search_pattern_in_file.inc
+
+--echo #
+--echo # Test Case 3:
+--echo # Local file, single GTID range with different server_ids
+--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=1-2-2 --stop-position=1-3-6 --base64-output=NEVER > OUT_FILE
+--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --start-position=1-2-2 --stop-position=1-3-6 --base64-output=NEVER > $OUT_FILE
+--let SEARCH_PATTERN=end_log_pos[^\n]+
+--replace_result $MYSQLTEST_VARDIR MYSQLTEST_VARDIR
+--replace_regex /SQL_LOAD_MB-[0-9]-[0-9]/SQL_LOAD_MB-#-#/ /exec_time=[0-9]*/exec_time=#/ /end_log_pos [0-9]*/end_log_pos #/ /# at [0-9]*/# at #/ /Xid = [0-9]*/Xid = #/ /xid=[0-9]*/xid=#/ /thread_id=[0-9]*/thread_id=#/ /table id [0-9]*/table id #/ /mapped to number [0-9]*/mapped to number #/ /server v [^ ]*/server v #.##.##/ /CRC32 0x[0-9a-f]*/CRC32 XXX/ /collation_server=[0-9]+/collation_server=X/ /character_set_client=[0-9]+/character_set_client=X/ /collation_connection=[0-9]+/collation_connection=X/
+--source include/search_pattern_in_file.inc
+
+--echo #
+--echo # Test Case 4:
+--echo # Local file, multiple GTID ranges specified
+--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=0-1-1,1-2-0 --stop-position=0-1-4,1-2-3 --base64-output=NEVER > OUT_FILE
+--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --start-position=0-1-1,1-2-0 --stop-position=0-1-4,1-2-3 --base64-output=NEVER > $OUT_FILE
+--let SEARCH_PATTERN=end_log_pos[^\n]+
+--replace_result $MYSQLTEST_VARDIR MYSQLTEST_VARDIR
+--replace_regex /SQL_LOAD_MB-[0-9]-[0-9]/SQL_LOAD_MB-#-#/ /exec_time=[0-9]*/exec_time=#/ /end_log_pos [0-9]*/end_log_pos #/ /# at [0-9]*/# at #/ /Xid = [0-9]*/Xid = #/ /xid=[0-9]*/xid=#/ /thread_id=[0-9]*/thread_id=#/ /table id [0-9]*/table id #/ /mapped to number [0-9]*/mapped to number #/ /server v [^ ]*/server v #.##.##/ /CRC32 0x[0-9a-f]*/CRC32 XXX/ /collation_server=[0-9]+/collation_server=X/ /character_set_client=[0-9]+/character_set_client=X/ /collation_connection=[0-9]+/collation_connection=X/
+--source include/search_pattern_in_file.inc
+
+--echo #
+--echo # Test Case 5:
+--echo # Local file, multiple GTID ranges specified where the domain ids are
+--echo # listed in different orders between start/stop position
+--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --stop-position=0-1-4,1-2-3 --start-position=1-2-0,0-1-1 --base64-output=NEVER > MYSQLTEST_VARDIR/tmp/binlog2.out
+--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --stop-position=0-1-4,1-2-3 --start-position=1-2-0,0-1-1 --base64-output=NEVER > $MYSQLTEST_VARDIR/tmp/binlog2.out
+--let SEARCH_PATTERN=end_log_pos[^\n]+
+--echo # --diff_files OUT_FILE MYSQLTEST_VARDIR/tmp/binlog2.out
+--diff_files $OUT_FILE $MYSQLTEST_VARDIR/tmp/binlog2.out
+
+--echo #
+--echo # Test Case 6:
+--echo # Local file, only start position specified
+--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=0-1-2 --base64-output=NEVER > OUT_FILE
+--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --start-position=0-1-2 --base64-output=NEVER > $OUT_FILE
+--let SEARCH_PATTERN=end_log_pos[^\n]+
+--replace_result $MYSQLTEST_VARDIR MYSQLTEST_VARDIR
+--replace_regex /SQL_LOAD_MB-[0-9]-[0-9]/SQL_LOAD_MB-#-#/ /exec_time=[0-9]*/exec_time=#/ /end_log_pos [0-9]*/end_log_pos #/ /# at [0-9]*/# at #/ /Xid = [0-9]*/Xid = #/ /xid=[0-9]*/xid=#/ /thread_id=[0-9]*/thread_id=#/ /table id [0-9]*/table id #/ /mapped to number [0-9]*/mapped to number #/ /server v [^ ]*/server v #.##.##/ /CRC32 0x[0-9a-f]*/CRC32 XXX/ /collation_server=[0-9]+/collation_server=X/ /character_set_client=[0-9]+/character_set_client=X/ /collation_connection=[0-9]+/collation_connection=X/
+--source include/search_pattern_in_file.inc
+
+--echo #
+--echo # Test Case 7:
+--echo # Local file, only stop position specified
+--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --stop-position=0-1-2 --base64-output=NEVER > OUT_FILE
+--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --stop-position=0-1-2 --base64-output=NEVER > $OUT_FILE
+--let SEARCH_PATTERN=end_log_pos[^\n]+
+--replace_result $MYSQLTEST_VARDIR MYSQLTEST_VARDIR
+--replace_regex /SQL_LOAD_MB-[0-9]-[0-9]/SQL_LOAD_MB-#-#/ /exec_time=[0-9]*/exec_time=#/ /end_log_pos [0-9]*/end_log_pos #/ /# at [0-9]*/# at #/ /Xid = [0-9]*/Xid = #/ /xid=[0-9]*/xid=#/ /thread_id=[0-9]*/thread_id=#/ /table id [0-9]*/table id #/ /mapped to number [0-9]*/mapped to number #/ /server v [^ ]*/server v #.##.##/ /CRC32 0x[0-9a-f]*/CRC32 XXX/ /collation_server=[0-9]+/collation_server=X/ /character_set_client=[0-9]+/character_set_client=X/ /collation_connection=[0-9]+/collation_connection=X/
+--source include/search_pattern_in_file.inc
+
+--echo #
+--echo # Test Case 8:
+--echo # Remote host, single GTID range specified
+--echo # MYSQL_BINLOG master-bin.000001 --read-from-remote-server --start-position=0-1-1 --stop-position=0-1-4 --base64-output=NEVER > OUT_FILE
+--exec $MYSQL_BINLOG master-bin.000001 --read-from-remote-server --start-position=0-1-1 --stop-position=0-1-4 --base64-output=NEVER > $OUT_FILE
+--let SEARCH_PATTERN=SET @@SESSION\.[\w]*_ID[\h]*=[\h]*@@GLOBAL\.[\w]+_ID|end_log_pos[^\n]+
+--replace_result $MYSQLTEST_VARDIR MYSQLTEST_VARDIR
+--replace_regex /SQL_LOAD_MB-[0-9]-[0-9]/SQL_LOAD_MB-#-#/ /exec_time=[0-9]*/exec_time=#/ /end_log_pos [0-9]*/end_log_pos #/ /# at [0-9]*/# at #/ /Xid = [0-9]*/Xid = #/ /xid=[0-9]*/xid=#/ /thread_id=[0-9]*/thread_id=#/ /table id [0-9]*/table id #/ /mapped to number [0-9]*/mapped to number #/ /server v [^ ]*/server v #.##.##/ /CRC32 0x[0-9a-f]*/CRC32 XXX/ /collation_server=[0-9]+/collation_server=X/ /character_set_client=[0-9]+/character_set_client=X/ /collation_connection=[0-9]+/collation_connection=X/
+--source include/search_pattern_in_file.inc
+
+--echo #
+--echo # Test Case 9:
+--echo # The output filtered by GTID ranges can be piped back into the
+--echo # MariaDB client
+--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --stop-position=0-1-4,1-2-8 > MYSQLTEST_VARDIR/tmp/ir.out
+--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --stop-position=0-1-4,1-2-8 > $MYSQLTEST_VARDIR/tmp/ir.out
+DROP TABLE t1;
+DROP TABLE t2;
+RESET MASTER;
+--echo # MYSQL < MYSQLTEST_VARDIR/tmp/ir.out
+--exec $MYSQL < $MYSQLTEST_VARDIR/tmp/ir.out
+FLUSH LOGS;
+show tables;
+SELECT * FROM t1;
+SELECT * FROM t2;
+--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --base64-output=NEVER > OUT_FILE
+--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --base64-output=NEVER > $OUT_FILE
+--let SEARCH_PATTERN=GTID \d+-\d+-\d+
+--source include/search_pattern_in_file.inc
+
+
+--echo ##############################
+--echo # Error Cases
+--echo ##############################
+
+--echo #
+--echo # Error Case 1:
+--echo # User provides invalid positions
+--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=z --base64-output=NEVER > OUT_FILE
+--error 1
+--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --start-position=z --base64-output=NEVER > $OUT_FILE
+
+--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=1- --base64-output=NEVER > OUT_FILE
+--error 1
+--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --start-position=1- --base64-output=NEVER > $OUT_FILE
+
+--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=1-2 --base64-output=NEVER > OUT_FILE
+--error 1
+--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --start-position=1-2 --base64-output=NEVER > $OUT_FILE
+
+--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=1-2- --base64-output=NEVER > OUT_FILE
+--error 1
+--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --start-position=1-2- --base64-output=NEVER > $OUT_FILE
+
+--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=-1 --base64-output=NEVER > OUT_FILE
+--error 1
+--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --start-position=-1 --base64-output=NEVER > $OUT_FILE
+
+--echo #
+--echo # Error Case 2:
+--echo # User provides GTID ranges with repeated domain ids
+--echo # MYSQL_BINLOG MYSQLD_DATADIR/master-bin.000001 --start-position=0-1-1,0-1-8 --stop-position=0-1-4,0-1-12 --base64-output=NEVER > OUT_FILE
+--error 1
+--exec $MYSQL_BINLOG $MYSQLD_DATADIR/master-bin.000001 --start-position=0-1-1,0-1-8 --stop-position=0-1-4,0-1-12 --base64-output=NEVER > $OUT_FILE
+
+--echo ##############################
+--echo # Cleanup
+--echo ##############################
+DROP TABLE t1;
+DROP TABLE t2;
+--eval SET @@global.gtid_domain_id= $ORIG_GTID_DOMAIN_ID
+--eval SET @@global.server_id= $ORIG_SERVER_ID
+
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index 8b10703fdc2..52e14714a9b 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -17,6 +17,7 @@
/* Definitions for MariaDB global transaction ID (GTID). */
+#ifndef MYSQL_CLIENT
#include "mariadb.h"
#include "sql_priv.h"
#include "unireg.h"
@@ -1268,6 +1269,7 @@ rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid)
return true;
}
+#endif
/*
Parse a GTID at the start of a string, and update the pointer to point
@@ -1305,9 +1307,32 @@ gtid_parser_helper(const char **ptr, const char *end, rpl_gtid *out_gtid)
return 0;
}
+/*
+ Unpack a GTID at the start of a string, and update the pointer to point
+ at the first character after the unpacked GTID.
+
+ Returns 0 on ok, non-zero on parse error.
+*/
+static int
+gtid_unpack_helper(const char **ptr, const char *end, rpl_gtid *out_gtid)
+{
+ const char *p= *ptr;
+
+ if (p[4] != '-' || p[9] != '-')
+ return 1;
+
+ out_gtid->domain_id= (uint32)uint4korr(p);
+ out_gtid->server_id= (uint32)uint4korr(&p[5]);
+ out_gtid->seq_no= (uint64)uint8korr(&p[10]);
+
+ *ptr= p + 18;
+ return 0;
+}
rpl_gtid *
-gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len)
+gtid_read_to_list(const char *str, size_t str_len, uint32 *out_len,
+ int reader_f(const char **ptr, const char *end,
+ rpl_gtid *out_gtid))
{
const char *p= const_cast<char *>(str);
const char *end= p + str_len;
@@ -1318,7 +1343,7 @@ gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len)
{
rpl_gtid gtid;
- if (len >= (((uint32)1 << 28)-1) || gtid_parser_helper(&p, end, &gtid))
+ if (len >= (((uint32)1 << 28)-1) || reader_f(&p, end, &gtid))
{
my_free(list);
return NULL;
@@ -1345,6 +1370,20 @@ gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len)
}
+rpl_gtid *
+gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len)
+{
+ return gtid_read_to_list(str, str_len, out_len, gtid_parser_helper);
+}
+
+rpl_gtid *
+gtid_unpack_string_to_list(const char *str, size_t str_len, uint32 *out_len)
+{
+ return gtid_read_to_list(str, str_len, out_len, gtid_unpack_helper);
+}
+
+#ifndef MYSQL_CLIENT
+
/*
Update the slave replication state with the GTID position obtained from
master when connecting with old-style (filename,offset) position.
@@ -2952,3 +2991,362 @@ gtid_waiting::remove_from_wait_queue(gtid_waiting::hash_element *he,
queue_remove(&he->queue, elem->queue_idx);
}
+
+#endif
+
+Window_gtid_event_filter::Window_gtid_event_filter() :
+ m_has_start(FALSE),
+ m_has_stop(FALSE),
+ m_is_active(FALSE),
+ m_has_passed(FALSE)
+ {
+ // m_start and m_stop do not need initial values if unused
+ }
+
+Window_gtid_event_filter::Window_gtid_event_filter(rpl_gtid *start, rpl_gtid *stop) :
+ m_is_active(FALSE),
+ m_has_passed(FALSE)
+{
+ DBUG_ASSERT(start->domain_id == stop->domain_id);
+
+ m_is_active= FALSE;
+ m_has_passed= FALSE;
+
+ m_has_start= TRUE;
+ m_start.domain_id= start->domain_id;
+ m_start.server_id= start->server_id;
+ m_start.seq_no= start->seq_no;
+
+ m_has_stop= TRUE;
+ m_stop.domain_id= stop->domain_id;
+ m_stop.server_id= stop->server_id;
+ m_stop.seq_no= stop->seq_no;
+}
+
+my_bool Window_gtid_event_filter::set_start_gtid(rpl_gtid *start)
+{
+ if (m_has_start)
+ return FALSE;
+
+ // Copy values
+ m_has_start= TRUE;
+ m_start.domain_id= start->domain_id;
+ m_start.server_id= start->server_id;
+ m_start.seq_no= start->seq_no;
+
+ return TRUE;
+}
+
+my_bool Window_gtid_event_filter::set_stop_gtid(rpl_gtid *stop)
+{
+ if (m_has_stop)
+ return FALSE;
+
+ // Copy values
+ m_has_stop= TRUE;
+ m_stop.domain_id= stop->domain_id;
+ m_stop.server_id= stop->server_id;
+ m_stop.seq_no= stop->seq_no;
+
+ return TRUE;
+}
+
+gtid_filter_identifier Window_gtid_event_filter::get_filter_identifier()
+{
+ DBUG_ASSERT(m_has_start || m_has_stop);
+ if (m_has_start)
+ return m_start.domain_id;
+ else
+ return m_stop.domain_id;
+}
+
+static inline my_bool is_gtid_at_or_after(rpl_gtid *boundary,
+ rpl_gtid *test_gtid)
+{
+ return test_gtid->domain_id == boundary->domain_id &&
+ test_gtid->server_id == boundary->server_id &&
+ test_gtid->seq_no >= boundary->seq_no;
+}
+
+static inline my_bool is_gtid_before(rpl_gtid *boundary,
+ rpl_gtid *test_gtid)
+{
+ return test_gtid->domain_id == boundary->domain_id &&
+ test_gtid->server_id == boundary->server_id &&
+ test_gtid->seq_no <= boundary->seq_no;
+}
+
+my_bool Window_gtid_event_filter::exclude(rpl_gtid *gtid)
+{
+ /* Assume result should be excluded to start */
+ my_bool should_exclude= TRUE;
+
+ DBUG_ASSERT((m_has_start || m_has_stop) &&
+ (gtid->domain_id == m_start.domain_id ||
+ gtid->domain_id == m_stop.domain_id));
+
+ if (!m_is_active && !m_has_passed)
+ {
+ /*
+ This filter has not yet been activated. Check if the gtid is within the
+ bounds of this window.
+ */
+
+ if (!m_has_start)
+ {
+ /*
+ Start GTID was not provided, so we want to include everything up to m_stop
+ */
+ m_is_active= TRUE;
+ should_exclude= FALSE;
+ }
+ else if (is_gtid_at_or_after(&m_start, gtid))
+ {
+ m_is_active= TRUE;
+
+ DBUG_PRINT("gtid-event-filter",
+ ("Window: Begin (%d-%d-%d, %d-%d-%llu]", m_start.domain_id,
+ m_start.server_id, m_start.seq_no, m_stop.domain_id,
+ m_stop.server_id, m_stop.seq_no));
+
+ /*
+ As the start of the range is exclusive, if this gtid is the start of
+ the range, exclude it
+ */
+ if (gtid->seq_no == m_start.seq_no &&
+ gtid->server_id == m_start.server_id)
+ should_exclude= TRUE;
+ else
+ should_exclude= FALSE;
+ }
+ } /* if (!m_is_active && !m_has_passed) */
+ else if (m_is_active && !m_has_passed)
+ {
+ /*
+ This window is currently active so we want the event group to be included
+ in the results. Additionally check if we are at the end of the window.
+ If no end of the window is provided, go indefinitely
+ */
+ should_exclude= FALSE;
+
+ if (m_has_stop && is_gtid_at_or_after(&m_stop, gtid))
+ {
+ DBUG_PRINT("gtid-event-filter",
+ ("Window: End (%d-%d-%d, %d-%d-%llu]", m_start.domain_id,
+ m_start.server_id, m_start.seq_no, m_stop.domain_id,
+ m_stop.server_id, m_stop.seq_no));
+ m_is_active= FALSE;
+ m_has_passed= TRUE;
+
+ if (gtid->server_id == m_stop.server_id && gtid->seq_no > m_stop.seq_no)
+ {
+ /*
+ The GTID is after the finite stop of the window, don't let it pass
+ through
+ */
+ should_exclude= TRUE;
+ }
+ }
+ else if (m_has_start && is_gtid_before(&m_start, gtid))
+ {
+ /*
+ Out of order check, the window is active but this GTID takes place
+ before the window begins. keep the window active, but exclude it from
+ passing through.
+ */
+ should_exclude= TRUE;
+ }
+ }
+ else if (m_has_passed && m_has_stop && is_gtid_before(&m_stop, gtid))
+ {
+ /* Test if events are out of order */
+ if (!m_has_start || (m_has_start && is_gtid_at_or_after(&m_start, gtid)))
+ {
+ /*
+ The filter window has closed because it has seen a GTID higher than its
+ end boundary; however, this GTID is out of order and should be passed
+ through.
+ */
+ should_exclude= TRUE;
+ }
+ }
+
+ return should_exclude;
+}
+
+Delegating_gtid_event_filter::Delegating_gtid_event_filter()
+{
+ uint32 i;
+
+ m_filter_id_mask= 0xf;
+
+ m_filters_by_id= (gtid_filter_element **) my_malloc(
+ PSI_NOT_INSTRUMENTED,
+ (m_filter_id_mask + 1) * sizeof(gtid_filter_element *),
+ MYF(MY_WME)
+ );
+
+ DBUG_ASSERT(m_filters_by_id != NULL);
+
+ for (i = 0; i <= m_filter_id_mask; i++)
+ {
+ m_filters_by_id[i]= NULL;
+ }
+
+ m_default_filter= new Reject_all_gtid_filter();
+}
+
+/*
+ Deconstructor deletes:
+ 1) All Identifiable_gtid_event_filters added
+ 2) All gtid_filter_element allocations
+*/
+Delegating_gtid_event_filter::~Delegating_gtid_event_filter()
+{
+ uint32 i;
+ for (i = 0; i <= m_filter_id_mask; i++)
+ {
+ gtid_filter_element *filter_element= m_filters_by_id[i],
+ *filter_element_to_del= NULL;
+ while(filter_element)
+ {
+ filter_element_to_del= filter_element;
+ filter_element= filter_element->next;
+ delete filter_element_to_del->filter;
+ my_free(filter_element_to_del);
+ }
+ }
+ my_free(m_filters_by_id);
+
+ delete m_default_filter;
+}
+
+void Delegating_gtid_event_filter::set_default_filter(Gtid_event_filter *filter)
+{
+ if (m_default_filter)
+ delete m_default_filter;
+
+ m_default_filter= filter;
+}
+
+gtid_filter_element *
+Delegating_gtid_event_filter::try_find_filter_element_for_id(
+ gtid_filter_identifier filter_id)
+{
+ // Add this into the domain id list
+ uint32 map_idx= filter_id & m_filter_id_mask;
+ gtid_filter_element *filter_idx= m_filters_by_id[map_idx];
+
+ /* Find list index to add this filter */
+ while (filter_idx)
+ {
+ if (filter_idx->identifier == filter_id)
+ break;
+ filter_idx= filter_idx->next;
+ }
+
+ return filter_idx;
+}
+
+gtid_filter_element *
+Delegating_gtid_event_filter::find_or_create_filter_element_for_id(
+ gtid_filter_identifier filter_id)
+{
+ // Add this into the domain id list
+ uint32 map_idx= filter_id & m_filter_id_mask;
+ gtid_filter_element *filter_idx= m_filters_by_id[map_idx],
+ *prev_idx= NULL;
+
+ /* Find list index to add this filter */
+ while (filter_idx)
+ {
+ prev_idx= filter_idx;
+ if (filter_idx->identifier == filter_id)
+ {
+ break;
+ }
+ prev_idx= filter_idx;
+ filter_idx= filter_idx->next;
+ }
+
+ if (filter_idx == NULL)
+ {
+ // No other domain ids have filters that index here, create this one
+ filter_idx= (gtid_filter_element *) my_malloc(
+ PSI_NOT_INSTRUMENTED, sizeof(gtid_filter_element), MYF(MY_WME));
+ filter_idx->identifier= filter_id;
+ filter_idx->next= NULL;
+ filter_idx->filter= NULL;
+
+ if (prev_idx == NULL)
+ {
+ // This is the first filter in the bucket
+ m_filters_by_id[map_idx]= filter_idx;
+ }
+ else
+ {
+ // End of list, append filter list to tail
+ prev_idx->next= filter_idx;
+ }
+ }
+
+ return filter_idx;
+}
+
+my_bool Delegating_gtid_event_filter::exclude(rpl_gtid *gtid)
+{
+ Gtid_event_filter *filter;
+ gtid_filter_identifier filter_id= get_id_from_gtid(gtid);
+ gtid_filter_element *filter_element= try_find_filter_element_for_id(filter_id);
+ if (filter_element)
+ {
+ filter= filter_element->filter;
+ }
+ else
+ {
+ filter= m_default_filter;
+ }
+
+ return filter->exclude(gtid);
+}
+
+Window_gtid_event_filter *
+Domain_gtid_event_filter::find_or_create_window_filter_for_id(
+ uint32 domain_id)
+{
+ gtid_filter_element *filter_element=
+ find_or_create_filter_element_for_id(domain_id);
+ Window_gtid_event_filter *wgef= NULL;
+
+ if (filter_element->filter == NULL)
+ {
+ // New filter
+ wgef= new Window_gtid_event_filter();
+ filter_element->filter= wgef;
+ }
+ else if (filter_element->filter->get_filter_type() == WINDOW_GTID_FILTER_TYPE)
+ {
+ // We have an existing window filter here
+ wgef= (Window_gtid_event_filter *) filter_element->filter;
+ }
+ /*
+ Else: We have an existing filter but it is not of window type so propogate
+ NULL filter
+ */
+
+ return wgef;
+}
+
+my_bool Domain_gtid_event_filter::add_start_gtid(rpl_gtid *gtid)
+{
+ Window_gtid_event_filter *filter_to_update=
+ find_or_create_window_filter_for_id(gtid->domain_id);
+ return filter_to_update->set_start_gtid(gtid);
+}
+
+my_bool Domain_gtid_event_filter::add_stop_gtid(rpl_gtid *gtid)
+{
+ Window_gtid_event_filter *filter_to_update=
+ find_or_create_window_filter_for_id(gtid->domain_id);
+ return filter_to_update->set_stop_gtid(gtid);
+} \ No newline at end of file
diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h
index 11541c8000c..9f5058196b8 100644
--- a/sql/rpl_gtid.h
+++ b/sql/rpl_gtid.h
@@ -380,5 +380,229 @@ extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid,
extern int gtid_check_rpl_slave_state_table(TABLE *table);
extern rpl_gtid *gtid_parse_string_to_list(const char *p, size_t len,
uint32 *out_len);
+extern rpl_gtid *gtid_unpack_string_to_list(const char *p, size_t len,
+ uint32 *out_len);
+
+/*
+ Interface to support different methods of filtering log events by GTID
+*/
+class Gtid_event_filter
+{
+public:
+ Gtid_event_filter() {};
+ virtual ~Gtid_event_filter() {};
+
+ enum gtid_event_filter_type
+ {
+ DELEGATING_GTID_FILTER_TYPE = 1,
+ WINDOW_GTID_FILTER_TYPE = 2,
+ REJECT_ALL_GTID_FILTER_TYPE = 3
+ };
+
+ /*
+ Run the filter on an input gtid to test if the corresponding log events
+ should be excluded from a result
+
+ Returns TRUE when the event group corresponding to the input GTID should be
+ excluded.
+ Returns FALSE when the event group should be included.
+ */
+ virtual my_bool exclude(rpl_gtid *) = 0;
+
+ /*
+ The gtid_event_filter_type that corresponds to the underlying filter
+ implementation
+ */
+ virtual uint32 get_filter_type() = 0;
+};
+
+/*
+ Filter implementation which will exclude any and all input GTIDs. This is
+ used to set default behavior for GTIDs that do not have explicit filters
+ set on their domain_id, e.g. when a Window_gtid_event_filter is used for
+ a specific domain, then all other domain_ids will be rejected using this
+ filter implementation.
+*/
+class Reject_all_gtid_filter : public Gtid_event_filter
+{
+public:
+ Reject_all_gtid_filter() {}
+ ~Reject_all_gtid_filter() {}
+ my_bool exclude(rpl_gtid *gtid) { return TRUE; }
+ uint32 get_filter_type() { return REJECT_ALL_GTID_FILTER_TYPE; }
+};
+
+/*
+ A virtual sub-class of Gtid_event_filter which allows for quick identification
+ of potentially applicable filters for arbitrary GTIDs.
+*/
+typedef uint32 gtid_filter_identifier;
+class Identifiable_gtid_event_filter : public Gtid_event_filter
+{
+
+public:
+ Identifiable_gtid_event_filter() {};
+ virtual ~Identifiable_gtid_event_filter() {};
+
+ enum gtid_filter_lookup_flags
+ {
+ BY_DOMAIN_ID= 0x1
+ };
+
+ virtual my_bool exclude(rpl_gtid *) = 0;
+ virtual uint32 get_filter_type() = 0;
+ virtual int get_lookup_flags() = 0;
+ virtual gtid_filter_identifier get_filter_identifier() = 0;
+};
+
+/*
+ A filter implementation that passes through events between two GTIDs, m_start
+ (exclusive) and m_stop (inclusive).
+
+ This filter is stateful, such that it expects GTIDs to be a sequential
+ stream, and internally, the window will activate/deactivate when the start
+ and stop positions of the event stream have passed through, respectively.
+
+ Window activation is used to pass through events from arbitrary servers that
+ were not mentioned within m_start or m_stop, yet still fall within the
+ boundary.
+*/
+class Window_gtid_event_filter : public Identifiable_gtid_event_filter
+{
+public:
+ Window_gtid_event_filter();
+ Window_gtid_event_filter(rpl_gtid *start, rpl_gtid *stop);
+ ~Window_gtid_event_filter() {}
+
+ my_bool exclude(rpl_gtid*);
+ gtid_filter_identifier get_filter_identifier();
+
+ my_bool set_start_gtid(rpl_gtid *start);
+ my_bool set_stop_gtid(rpl_gtid *stop);
+
+ /*
+ Windows are indexed by the domain_id of a GTID
+ */
+ int get_lookup_flags()
+ {
+ return BY_DOMAIN_ID;
+ }
+
+ uint32 get_filter_type() { return WINDOW_GTID_FILTER_TYPE; }
+
+private:
+ /*
+ m_has_start : Indicates if a start to this window has been explicitly
+ provided. A window starts immediately if not provided.
+ */
+ my_bool m_has_start;
+
+ /*
+ m_has_stop : Indicates if a stop to this window has been explicitly
+ provided. A window continues indefinitely if not provided.
+ */
+ my_bool m_has_stop;
+
+ /*
+ m_is_active : Indicates whether or not the program is currently reading
+ events from within this window. When TRUE, events with
+ different server ids than those specified by m_start or
+ m_stop will be passed through.
+ */
+ my_bool m_is_active;
+
+ /*
+ m_has_passed : Indicates whether or not the program is currently reading
+ events from within this window.
+ */
+ my_bool m_has_passed;
+
+ /* m_start : marks the GTID that begins the window (exclusive). */
+ rpl_gtid m_start;
+
+ /* m_stop : marks the GTID that ends the range (inclusive). */
+ rpl_gtid m_stop;
+
+ /* last_gtid_seen: saves the last */
+ rpl_gtid last_gtid_seen;
+};
+
+/*
+ Data structure to help with quick lookup for filters. More specifically,
+ if two filters have identifiers that lead to the same hash, they will be
+ put into a linked list.
+*/
+typedef struct _gtid_filter_element
+{
+ gtid_filter_identifier identifier;
+ Identifiable_gtid_event_filter *filter;
+ struct _gtid_filter_element *next;
+} gtid_filter_element;
+
+/*
+ Gtid_event_filter subclass which has no specific implementation, but rather
+ delegates the filtering to specific identifiable/mapped implementations.
+
+ A default filter is used for GTIDs that are passed through which no explicit
+ filter can be identified.
+
+ This class should be subclassed, where the get_id_from_gtid function
+ specifies how to extract the filter identifier from a GTID.
+*/
+class Delegating_gtid_event_filter : public Gtid_event_filter
+{
+public:
+ Delegating_gtid_event_filter();
+ ~Delegating_gtid_event_filter();
+
+ my_bool exclude(rpl_gtid *gtid);
+ void set_default_filter(Gtid_event_filter *default_filter);
+
+ uint32 get_filter_type() { return DELEGATING_GTID_FILTER_TYPE; }
+
+ virtual gtid_filter_identifier get_id_from_gtid(rpl_gtid *) = 0;
+
+protected:
+
+ uint32 m_filter_id_mask;
+ Gtid_event_filter *m_default_filter;
+
+ /*
+ To reduce time to find a gtid window, they are indexed by domain_id. More
+ specifically, domain_ids are arranged into m_filter_id_mask+1 buckets, and
+ each bucket is a linked list of gtid_filter_elements that share the same
+ index. The index itself is found by a bitwise and, i.e.
+ some_rpl_gtid.domain_id & m_filter_id_mask
+ */
+ gtid_filter_element **m_filters_by_id;
+
+ gtid_filter_element *try_find_filter_element_for_id(gtid_filter_identifier);
+ gtid_filter_element *find_or_create_filter_element_for_id(gtid_filter_identifier);
+};
+
+/*
+ A subclass of Delegating_gtid_event_filter which identifies filters using the
+ domain id of a GTID.
+
+ Additional helper functions include:
+ add_start_gtid(GTID) : adds a start GTID position to this filter, to be
+ identified by its domain id
+ add_stop_gtid(GTID) : adds a stop GTID position to this filter, to be
+ identified by its domain id
+*/
+class Domain_gtid_event_filter : public Delegating_gtid_event_filter
+{
+public:
+ gtid_filter_identifier get_id_from_gtid(rpl_gtid *gtid)
+ {
+ return gtid->domain_id;
+ }
+
+ my_bool add_start_gtid(rpl_gtid *gtid);
+ my_bool add_stop_gtid(rpl_gtid *gtid);
+
+private:
+ Window_gtid_event_filter *find_or_create_window_filter_for_id(gtid_filter_identifier);
+};
#endif /* RPL_GTID_H */
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index e46e46f803c..9f23f537cb1 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -2123,8 +2123,11 @@ dispatch_command_return dispatch_command(enum enum_server_command command, THD *
case COM_BINLOG_DUMP:
{
ulong pos;
+ uint32 n_start_gtids;
+ rpl_gtid *start_gtids= NULL;
ushort flags;
uint32 slave_server_id;
+ uint32 unpack_idx= 0;
status_var_increment(thd->status_var.com_other);
@@ -2133,19 +2136,42 @@ dispatch_command_return dispatch_command(enum enum_server_command command, THD *
break;
/* TODO: The following has to be changed to an 8 byte integer */
- pos = uint4korr(packet);
- flags = uint2korr(packet + 4);
+ if (packet[4] == '-' && packet[9] == '-')
+ {
+ unpack_idx= 18;
+ while (packet[unpack_idx] == ',')
+ unpack_idx += 19; // 18 chars for gtid + 1 for comma
+ start_gtids= gtid_unpack_string_to_list(packet, unpack_idx, &n_start_gtids);
+
+ /*
+ Set pos to the start of the binlog file for scanning
+
+ TODO: When GTID indexing is complete (MDEV-4991), update pos by
+ looking it up in the index
+ */
+ pos= 4;
+ } /* if (packet[4] == '-' && packet[9] == '-') */
+ else
+ {
+ /* Single numeric log position case */
+ pos = uint4korr(packet);
+ unpack_idx += 4;
+ }
+ flags = uint2korr(packet + unpack_idx);
+ unpack_idx += 2;
thd->variables.server_id=0; /* avoid suicide */
- if ((slave_server_id= uint4korr(packet+6))) // mysqlbinlog.server_id==0
+ if ((slave_server_id= uint4korr(packet+unpack_idx))) // mysqlbinlog.server_id==0
kill_zombie_dump_threads(slave_server_id);
thd->variables.server_id = slave_server_id;
+ unpack_idx += 4;
- const char *name= packet + 10;
+ const char *name= packet + unpack_idx;
size_t nlen= strlen(name);
general_log_print(thd, command, "Log: '%s' Pos: %lu", name, pos);
if (nlen < FN_REFLEN)
- mysql_binlog_send(thd, thd->strmake(name, nlen), (my_off_t)pos, flags);
+ mysql_binlog_send(thd, thd->strmake(name, nlen), (my_off_t)pos, flags,
+ start_gtids, n_start_gtids);
thd->unregister_slave(); // todo: can be extraneous
/* fake COM_QUIT -- if we get here, the thread needs to terminate */
error = TRUE;
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 7bcff12a735..00fc65cf82f 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -163,6 +163,8 @@ struct binlog_send_info {
bool should_stop;
size_t dirlen;
+ Gtid_event_filter *gtid_event_filter;
+
binlog_send_info(THD *thd_arg, String *packet_arg, ushort flags_arg,
char *lfn)
: thd(thd_arg), net(&thd_arg->net), packet(packet_arg),
@@ -185,6 +187,8 @@ struct binlog_send_info {
error_text[0] = 0;
bzero(&error_gtid, sizeof(error_gtid));
until_binlog_state.init();
+
+ gtid_event_filter= NULL;
}
};
@@ -1751,6 +1755,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
}
}
+
/* Skip GTID event groups until we reach slave position within a domain_id. */
if (event_type == GTID_EVENT && info->using_gtid_state)
{
@@ -1758,7 +1763,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
slave_connection_state::entry *gtid_entry;
rpl_gtid *gtid;
- if (gtid_state->count() > 0 || until_gtid_state)
+ if (gtid_state->count() > 0 || until_gtid_state || info->gtid_event_filter)
{
rpl_gtid event_gtid;
@@ -1899,6 +1904,17 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
}
}
}
+
+ /*
+ Should this result be excluded from the output?
+ */
+ if (info->gtid_event_filter &&
+ info->gtid_event_filter->exclude(&event_gtid))
+ {
+ info->gtid_skip_group=
+ (flags2 & Gtid_log_event::FL_STANDALONE ? GTID_SKIP_STANDALONE
+ : GTID_SKIP_TRANSACTION);
+ }
}
}
@@ -2110,7 +2126,9 @@ err:
static int init_binlog_sender(binlog_send_info *info,
LOG_INFO *linfo,
const char *log_ident,
- my_off_t *pos)
+ my_off_t *pos,
+ rpl_gtid *start_gtids,
+ size_t n_start_gtids)
{
THD *thd= info->thd;
int error;
@@ -2130,7 +2148,8 @@ static int init_binlog_sender(binlog_send_info *info,
info->current_checksum_alg= get_binlog_checksum_value_at_connect(thd);
info->mariadb_slave_capability= get_mariadb_slave_capability(thd);
- info->using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state);
+ info->using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state) ||
+ start_gtids != NULL;
DBUG_EXECUTE_IF("simulate_non_gtid_aware_master",
info->using_gtid_state= false;);
@@ -2247,6 +2266,17 @@ static int init_binlog_sender(binlog_send_info *info,
info->clear_initial_log_pos= true;
}
+ if (start_gtids != NULL)
+ {
+ Domain_gtid_event_filter *filter= new Domain_gtid_event_filter();
+ my_off_t i;
+ for(i = 0; i < n_start_gtids; i++)
+ {
+ filter->add_start_gtid(&start_gtids[i]);
+ }
+ info->gtid_event_filter= filter;
+ }
+
return 0;
}
@@ -2840,7 +2870,8 @@ static int send_one_binlog_file(binlog_send_info *info,
}
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
- ushort flags)
+ ushort flags, rpl_gtid *start_gtids,
+ uint32 n_start_gtids)
{
LOG_INFO linfo;
@@ -2860,7 +2891,8 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
bzero((char*) &log,sizeof(log));
- if (init_binlog_sender(info, &linfo, log_ident, &pos))
+ if (init_binlog_sender(info, &linfo, log_ident, &pos, start_gtids,
+ n_start_gtids))
goto err;
has_transmit_started= true;
@@ -3022,6 +3054,8 @@ err:
thd->reset_current_linfo();
thd->variables.max_allowed_packet= old_max_allowed_packet;
delete info->fdev;
+ delete info->gtid_event_filter;
+ my_free(start_gtids);
if (likely(info->error == 0))
{
diff --git a/sql/sql_repl.h b/sql/sql_repl.h
index 95916e31abf..bfc35ea5456 100644
--- a/sql/sql_repl.h
+++ b/sql/sql_repl.h
@@ -57,7 +57,8 @@ struct LOAD_FILE_IO_CACHE : public IO_CACHE
int log_loaded_block(IO_CACHE* file, uchar *Buffer, size_t Count);
int init_replication_sys_vars();
-void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags);
+void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags,
+ rpl_gtid *start_gtids, uint32 n_start_gtids);
#ifdef HAVE_PSI_INTERFACE
extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state;