summaryrefslogtreecommitdiff
path: root/sql/rpl_gtid.cc
diff options
context:
space:
mode:
authorBrandon Nesterenko <brandon.nesterenko@mariadb.com>2021-08-11 11:29:37 -0600
committerBrandon Nesterenko <brandon.nesterenko@mariadb.com>2021-08-23 13:57:38 -0600
commitc622af26fca37fc7864e9526a60dbe059644d120 (patch)
treed9ed6be82f0da06e8b2ac68abea5988180289cd0 /sql/rpl_gtid.cc
parent64f7dffcc7e0e69c31d9a36c2090a26300e57c4c (diff)
downloadmariadb-git-10.7-MDEV-4989.tar.gz
MDEV-4989: Support for GTID in mysqlbinlog10.7-MDEV-4989
New Feature: ============ This commit extends the mariadb-binlog capabilities to allow events to be filtered by GTID ranges. More specifically, the following capabilities are addressed: 1) GTIDs can be used to filter results on local binlog files 2) GTIDs can be used to filter results from remote servers 3) For a given GTID range, its start-position is exclusive and its stop-position is inclusive 4) After the events have been written, the session server id and domain id are reset to their former values 5) Output filtered by GTID ranges can be piped to the MariaDB client To facilitate these features, the --start-position and --stop-position arguments have been extended to additionally accept values formatted as a list of GTID positions, e.g. `--start-position=0-1-0,1-2-55` Reviewed By: ============ <TODO>
Diffstat (limited to 'sql/rpl_gtid.cc')
-rw-r--r--sql/rpl_gtid.cc402
1 files changed, 400 insertions, 2 deletions
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index 8b10703fdc2..52e14714a9b 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -17,6 +17,7 @@
/* Definitions for MariaDB global transaction ID (GTID). */
+#ifndef MYSQL_CLIENT
#include "mariadb.h"
#include "sql_priv.h"
#include "unireg.h"
@@ -1268,6 +1269,7 @@ rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid)
return true;
}
+#endif
/*
Parse a GTID at the start of a string, and update the pointer to point
@@ -1305,9 +1307,32 @@ gtid_parser_helper(const char **ptr, const char *end, rpl_gtid *out_gtid)
return 0;
}
+/*
+ Unpack a GTID at the start of a string, and update the pointer to point
+ at the first character after the unpacked GTID.
+
+ Returns 0 on ok, non-zero on parse error.
+*/
+static int
+gtid_unpack_helper(const char **ptr, const char *end, rpl_gtid *out_gtid)
+{
+ const char *p= *ptr;
+
+ if (p[4] != '-' || p[9] != '-')
+ return 1;
+
+ out_gtid->domain_id= (uint32)uint4korr(p);
+ out_gtid->server_id= (uint32)uint4korr(&p[5]);
+ out_gtid->seq_no= (uint64)uint8korr(&p[10]);
+
+ *ptr= p + 18;
+ return 0;
+}
rpl_gtid *
-gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len)
+gtid_read_to_list(const char *str, size_t str_len, uint32 *out_len,
+ int reader_f(const char **ptr, const char *end,
+ rpl_gtid *out_gtid))
{
const char *p= const_cast<char *>(str);
const char *end= p + str_len;
@@ -1318,7 +1343,7 @@ gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len)
{
rpl_gtid gtid;
- if (len >= (((uint32)1 << 28)-1) || gtid_parser_helper(&p, end, &gtid))
+ if (len >= (((uint32)1 << 28)-1) || reader_f(&p, end, &gtid))
{
my_free(list);
return NULL;
@@ -1345,6 +1370,20 @@ gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len)
}
+rpl_gtid *
+gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len)
+{
+ return gtid_read_to_list(str, str_len, out_len, gtid_parser_helper);
+}
+
+rpl_gtid *
+gtid_unpack_string_to_list(const char *str, size_t str_len, uint32 *out_len)
+{
+ return gtid_read_to_list(str, str_len, out_len, gtid_unpack_helper);
+}
+
+#ifndef MYSQL_CLIENT
+
/*
Update the slave replication state with the GTID position obtained from
master when connecting with old-style (filename,offset) position.
@@ -2952,3 +2991,362 @@ gtid_waiting::remove_from_wait_queue(gtid_waiting::hash_element *he,
queue_remove(&he->queue, elem->queue_idx);
}
+
+#endif
+
+Window_gtid_event_filter::Window_gtid_event_filter() :
+ m_has_start(FALSE),
+ m_has_stop(FALSE),
+ m_is_active(FALSE),
+ m_has_passed(FALSE)
+ {
+ // m_start and m_stop do not need initial values if unused
+ }
+
+Window_gtid_event_filter::Window_gtid_event_filter(rpl_gtid *start, rpl_gtid *stop) :
+ m_is_active(FALSE),
+ m_has_passed(FALSE)
+{
+ DBUG_ASSERT(start->domain_id == stop->domain_id);
+
+ m_is_active= FALSE;
+ m_has_passed= FALSE;
+
+ m_has_start= TRUE;
+ m_start.domain_id= start->domain_id;
+ m_start.server_id= start->server_id;
+ m_start.seq_no= start->seq_no;
+
+ m_has_stop= TRUE;
+ m_stop.domain_id= stop->domain_id;
+ m_stop.server_id= stop->server_id;
+ m_stop.seq_no= stop->seq_no;
+}
+
+my_bool Window_gtid_event_filter::set_start_gtid(rpl_gtid *start)
+{
+ if (m_has_start)
+ return FALSE;
+
+ // Copy values
+ m_has_start= TRUE;
+ m_start.domain_id= start->domain_id;
+ m_start.server_id= start->server_id;
+ m_start.seq_no= start->seq_no;
+
+ return TRUE;
+}
+
+my_bool Window_gtid_event_filter::set_stop_gtid(rpl_gtid *stop)
+{
+ if (m_has_stop)
+ return FALSE;
+
+ // Copy values
+ m_has_stop= TRUE;
+ m_stop.domain_id= stop->domain_id;
+ m_stop.server_id= stop->server_id;
+ m_stop.seq_no= stop->seq_no;
+
+ return TRUE;
+}
+
+gtid_filter_identifier Window_gtid_event_filter::get_filter_identifier()
+{
+ DBUG_ASSERT(m_has_start || m_has_stop);
+ if (m_has_start)
+ return m_start.domain_id;
+ else
+ return m_stop.domain_id;
+}
+
+static inline my_bool is_gtid_at_or_after(rpl_gtid *boundary,
+ rpl_gtid *test_gtid)
+{
+ return test_gtid->domain_id == boundary->domain_id &&
+ test_gtid->server_id == boundary->server_id &&
+ test_gtid->seq_no >= boundary->seq_no;
+}
+
+static inline my_bool is_gtid_before(rpl_gtid *boundary,
+ rpl_gtid *test_gtid)
+{
+ return test_gtid->domain_id == boundary->domain_id &&
+ test_gtid->server_id == boundary->server_id &&
+ test_gtid->seq_no <= boundary->seq_no;
+}
+
+my_bool Window_gtid_event_filter::exclude(rpl_gtid *gtid)
+{
+ /* Assume result should be excluded to start */
+ my_bool should_exclude= TRUE;
+
+ DBUG_ASSERT((m_has_start || m_has_stop) &&
+ (gtid->domain_id == m_start.domain_id ||
+ gtid->domain_id == m_stop.domain_id));
+
+ if (!m_is_active && !m_has_passed)
+ {
+ /*
+ This filter has not yet been activated. Check if the gtid is within the
+ bounds of this window.
+ */
+
+ if (!m_has_start)
+ {
+ /*
+ Start GTID was not provided, so we want to include everything up to m_stop
+ */
+ m_is_active= TRUE;
+ should_exclude= FALSE;
+ }
+ else if (is_gtid_at_or_after(&m_start, gtid))
+ {
+ m_is_active= TRUE;
+
+ DBUG_PRINT("gtid-event-filter",
+ ("Window: Begin (%d-%d-%d, %d-%d-%llu]", m_start.domain_id,
+ m_start.server_id, m_start.seq_no, m_stop.domain_id,
+ m_stop.server_id, m_stop.seq_no));
+
+ /*
+ As the start of the range is exclusive, if this gtid is the start of
+ the range, exclude it
+ */
+ if (gtid->seq_no == m_start.seq_no &&
+ gtid->server_id == m_start.server_id)
+ should_exclude= TRUE;
+ else
+ should_exclude= FALSE;
+ }
+ } /* if (!m_is_active && !m_has_passed) */
+ else if (m_is_active && !m_has_passed)
+ {
+ /*
+ This window is currently active so we want the event group to be included
+ in the results. Additionally check if we are at the end of the window.
+ If no end of the window is provided, go indefinitely
+ */
+ should_exclude= FALSE;
+
+ if (m_has_stop && is_gtid_at_or_after(&m_stop, gtid))
+ {
+ DBUG_PRINT("gtid-event-filter",
+ ("Window: End (%d-%d-%d, %d-%d-%llu]", m_start.domain_id,
+ m_start.server_id, m_start.seq_no, m_stop.domain_id,
+ m_stop.server_id, m_stop.seq_no));
+ m_is_active= FALSE;
+ m_has_passed= TRUE;
+
+ if (gtid->server_id == m_stop.server_id && gtid->seq_no > m_stop.seq_no)
+ {
+ /*
+ The GTID is after the finite stop of the window, don't let it pass
+ through
+ */
+ should_exclude= TRUE;
+ }
+ }
+ else if (m_has_start && is_gtid_before(&m_start, gtid))
+ {
+ /*
+ Out of order check, the window is active but this GTID takes place
+ before the window begins. keep the window active, but exclude it from
+ passing through.
+ */
+ should_exclude= TRUE;
+ }
+ }
+ else if (m_has_passed && m_has_stop && is_gtid_before(&m_stop, gtid))
+ {
+ /* Test if events are out of order */
+ if (!m_has_start || (m_has_start && is_gtid_at_or_after(&m_start, gtid)))
+ {
+ /*
+ The filter window has closed because it has seen a GTID higher than its
+ end boundary; however, this GTID is out of order and should be passed
+ through.
+ */
+ should_exclude= TRUE;
+ }
+ }
+
+ return should_exclude;
+}
+
+Delegating_gtid_event_filter::Delegating_gtid_event_filter()
+{
+ uint32 i;
+
+ m_filter_id_mask= 0xf;
+
+ m_filters_by_id= (gtid_filter_element **) my_malloc(
+ PSI_NOT_INSTRUMENTED,
+ (m_filter_id_mask + 1) * sizeof(gtid_filter_element *),
+ MYF(MY_WME)
+ );
+
+ DBUG_ASSERT(m_filters_by_id != NULL);
+
+ for (i = 0; i <= m_filter_id_mask; i++)
+ {
+ m_filters_by_id[i]= NULL;
+ }
+
+ m_default_filter= new Reject_all_gtid_filter();
+}
+
+/*
+ Deconstructor deletes:
+ 1) All Identifiable_gtid_event_filters added
+ 2) All gtid_filter_element allocations
+*/
+Delegating_gtid_event_filter::~Delegating_gtid_event_filter()
+{
+ uint32 i;
+ for (i = 0; i <= m_filter_id_mask; i++)
+ {
+ gtid_filter_element *filter_element= m_filters_by_id[i],
+ *filter_element_to_del= NULL;
+ while(filter_element)
+ {
+ filter_element_to_del= filter_element;
+ filter_element= filter_element->next;
+ delete filter_element_to_del->filter;
+ my_free(filter_element_to_del);
+ }
+ }
+ my_free(m_filters_by_id);
+
+ delete m_default_filter;
+}
+
+void Delegating_gtid_event_filter::set_default_filter(Gtid_event_filter *filter)
+{
+ if (m_default_filter)
+ delete m_default_filter;
+
+ m_default_filter= filter;
+}
+
+gtid_filter_element *
+Delegating_gtid_event_filter::try_find_filter_element_for_id(
+ gtid_filter_identifier filter_id)
+{
+ // Add this into the domain id list
+ uint32 map_idx= filter_id & m_filter_id_mask;
+ gtid_filter_element *filter_idx= m_filters_by_id[map_idx];
+
+ /* Find list index to add this filter */
+ while (filter_idx)
+ {
+ if (filter_idx->identifier == filter_id)
+ break;
+ filter_idx= filter_idx->next;
+ }
+
+ return filter_idx;
+}
+
+gtid_filter_element *
+Delegating_gtid_event_filter::find_or_create_filter_element_for_id(
+ gtid_filter_identifier filter_id)
+{
+ // Add this into the domain id list
+ uint32 map_idx= filter_id & m_filter_id_mask;
+ gtid_filter_element *filter_idx= m_filters_by_id[map_idx],
+ *prev_idx= NULL;
+
+ /* Find list index to add this filter */
+ while (filter_idx)
+ {
+ prev_idx= filter_idx;
+ if (filter_idx->identifier == filter_id)
+ {
+ break;
+ }
+ prev_idx= filter_idx;
+ filter_idx= filter_idx->next;
+ }
+
+ if (filter_idx == NULL)
+ {
+ // No other domain ids have filters that index here, create this one
+ filter_idx= (gtid_filter_element *) my_malloc(
+ PSI_NOT_INSTRUMENTED, sizeof(gtid_filter_element), MYF(MY_WME));
+ filter_idx->identifier= filter_id;
+ filter_idx->next= NULL;
+ filter_idx->filter= NULL;
+
+ if (prev_idx == NULL)
+ {
+ // This is the first filter in the bucket
+ m_filters_by_id[map_idx]= filter_idx;
+ }
+ else
+ {
+ // End of list, append filter list to tail
+ prev_idx->next= filter_idx;
+ }
+ }
+
+ return filter_idx;
+}
+
+my_bool Delegating_gtid_event_filter::exclude(rpl_gtid *gtid)
+{
+ Gtid_event_filter *filter;
+ gtid_filter_identifier filter_id= get_id_from_gtid(gtid);
+ gtid_filter_element *filter_element= try_find_filter_element_for_id(filter_id);
+ if (filter_element)
+ {
+ filter= filter_element->filter;
+ }
+ else
+ {
+ filter= m_default_filter;
+ }
+
+ return filter->exclude(gtid);
+}
+
+Window_gtid_event_filter *
+Domain_gtid_event_filter::find_or_create_window_filter_for_id(
+ uint32 domain_id)
+{
+ gtid_filter_element *filter_element=
+ find_or_create_filter_element_for_id(domain_id);
+ Window_gtid_event_filter *wgef= NULL;
+
+ if (filter_element->filter == NULL)
+ {
+ // New filter
+ wgef= new Window_gtid_event_filter();
+ filter_element->filter= wgef;
+ }
+ else if (filter_element->filter->get_filter_type() == WINDOW_GTID_FILTER_TYPE)
+ {
+ // We have an existing window filter here
+ wgef= (Window_gtid_event_filter *) filter_element->filter;
+ }
+ /*
+ Else: We have an existing filter but it is not of window type so propogate
+ NULL filter
+ */
+
+ return wgef;
+}
+
+my_bool Domain_gtid_event_filter::add_start_gtid(rpl_gtid *gtid)
+{
+ Window_gtid_event_filter *filter_to_update=
+ find_or_create_window_filter_for_id(gtid->domain_id);
+ return filter_to_update->set_start_gtid(gtid);
+}
+
+my_bool Domain_gtid_event_filter::add_stop_gtid(rpl_gtid *gtid)
+{
+ Window_gtid_event_filter *filter_to_update=
+ find_or_create_window_filter_for_id(gtid->domain_id);
+ return filter_to_update->set_stop_gtid(gtid);
+} \ No newline at end of file