summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/log_event.h48
-rw-r--r--sql/log_event_client.cc2
-rw-r--r--sql/rpl_gtid.cc846
-rw-r--r--sql/rpl_gtid.h448
4 files changed, 1342 insertions, 2 deletions
diff --git a/sql/log_event.h b/sql/log_event.h
index 3adc7a26d93..db77be2d99b 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -920,6 +920,20 @@ typedef struct st_print_event_info
IO_CACHE review_sql_cache;
#endif
FILE *file;
+
+
+
+ /*
+ Used to include the events within a GTID start/stop boundary
+ */
+ my_bool m_is_event_group_active;
+
+ /*
+ Tracks whether or not output events must be explicitly activated in order
+ to be printed
+ */
+ my_bool m_is_event_group_filtering_enabled;
+
st_print_event_info();
~st_print_event_info() {
@@ -942,6 +956,40 @@ typedef struct st_print_event_info
copy_event_cache_to_file_and_reinit(&body_cache, file);
fflush(file);
}
+
+ /*
+ Notify that all events part of the current group should be printed
+ */
+ void activate_current_event_group()
+ {
+ m_is_event_group_active= TRUE;
+ }
+ void deactivate_current_event_group()
+ {
+ m_is_event_group_active= FALSE;
+ }
+
+ /*
+ Used for displaying events part of an event group.
+ Returns TRUE when both event group filtering is enabled and the current
+ event group should be displayed, OR if event group filtering is
+ disabled. More specifically, if filtering is disabled, all events
+ should be shown.
+ Returns FALSE when event group filtering is enabled and the current event
+ group is filtered out.
+ */
+ my_bool is_event_group_active()
+ {
+ return m_is_event_group_filtering_enabled ? m_is_event_group_active : TRUE;
+ }
+
+ /*
+ Notify that events must be explicitly activated in order to be printed
+ */
+ void enable_event_group_filtering()
+ {
+ m_is_event_group_filtering_enabled= TRUE;
+ }
} PRINT_EVENT_INFO;
#endif
diff --git a/sql/log_event_client.cc b/sql/log_event_client.cc
index 7e0bf7d8e4c..66f2753d96b 100644
--- a/sql/log_event_client.cc
+++ b/sql/log_event_client.cc
@@ -3791,6 +3791,8 @@ st_print_event_info::st_print_event_info()
printed_fd_event=FALSE;
file= 0;
base64_output_mode=BASE64_OUTPUT_UNSPEC;
+ m_is_event_group_active= TRUE;
+ m_is_event_group_filtering_enabled= FALSE;
open_cached_file(&head_cache, NULL, NULL, 0, flags);
open_cached_file(&body_cache, NULL, NULL, 0, flags);
open_cached_file(&tail_cache, NULL, NULL, 0, flags);
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index 306ae878060..c1031133530 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -17,6 +17,7 @@
/* Definitions for MariaDB global transaction ID (GTID). */
+#ifndef MYSQL_CLIENT
#include "mariadb.h"
#include "sql_priv.h"
#include "unireg.h"
@@ -24,7 +25,6 @@
#include "sql_base.h"
#include "sql_parse.h"
#include "key.h"
-#include "rpl_gtid.h"
#include "rpl_rli.h"
#include "slave.h"
#include "log_event.h"
@@ -1273,6 +1273,7 @@ rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid)
return true;
}
+#endif
/*
Parse a GTID at the start of a string, and update the pointer to point
@@ -1310,7 +1311,6 @@ gtid_parser_helper(const char **ptr, const char *end, rpl_gtid *out_gtid)
return 0;
}
-
rpl_gtid *
gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len)
{
@@ -1349,6 +1349,7 @@ gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len)
return list;
}
+#ifndef MYSQL_CLIENT
/*
Update the slave replication state with the GTID position obtained from
@@ -2974,3 +2975,844 @@ gtid_waiting::remove_from_wait_queue(gtid_waiting::hash_element *he,
queue_remove(&he->queue, elem->queue_idx);
}
+
+#endif
+
+void free_domain_lookup_element(void *p)
+{
+ struct Binlog_gtid_state_validator::audit_elem *audit_elem=
+ (struct Binlog_gtid_state_validator::audit_elem *) p;
+ delete_dynamic(&audit_elem->late_gtids_previous);
+ delete_dynamic(&audit_elem->late_gtids_real);
+ my_free(audit_elem);
+}
+
+Binlog_gtid_state_validator::Binlog_gtid_state_validator()
+{
+ my_hash_init(PSI_INSTRUMENT_ME, &m_audit_elem_domain_lookup, &my_charset_bin, 32,
+ offsetof(struct audit_elem, domain_id), sizeof(uint32),
+ NULL, free_domain_lookup_element, HASH_UNIQUE);
+}
+
+Binlog_gtid_state_validator::~Binlog_gtid_state_validator()
+{
+ my_hash_free(&m_audit_elem_domain_lookup);
+}
+
+void Binlog_gtid_state_validator::initialize_start_gtids(rpl_gtid *start_gtids,
+ size_t n_gtids)
+{
+ size_t i;
+ for(i= 0; i < n_gtids; i++)
+ {
+ rpl_gtid *domain_state_gtid= &start_gtids[i];
+
+ /*
+ If we are initializing from a GLLE, we can have repeat domain ids from
+ differing servers, so we want to ensure our start gtid matches the last
+ known position
+ */
+ struct audit_elem *audit_elem= (struct audit_elem *) my_hash_search(
+ &m_audit_elem_domain_lookup,
+ (const uchar *) &(domain_state_gtid->domain_id), 0);
+ if (audit_elem)
+ {
+ /*
+ We have this domain already specified, so try to overwrite with the
+ more recent GTID
+ */
+ if (domain_state_gtid->seq_no > audit_elem->start_gtid.seq_no)
+ audit_elem->start_gtid = *domain_state_gtid;
+ continue;
+ }
+
+ /* Initialize a new domain */
+ audit_elem= (struct audit_elem *) my_malloc(
+ PSI_NOT_INSTRUMENTED, sizeof(struct audit_elem), MYF(MY_WME));
+ if (!audit_elem)
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return;
+ }
+
+ audit_elem->domain_id= start_gtids[i].domain_id;
+ audit_elem->start_gtid= start_gtids[i];
+ audit_elem->last_gtid= {audit_elem->domain_id, 0, 0};
+
+ my_init_dynamic_array(PSI_INSTRUMENT_ME, &audit_elem->late_gtids_real,
+ sizeof(rpl_gtid), 8, 8, MYF(0));
+ my_init_dynamic_array(PSI_INSTRUMENT_ME, &audit_elem->late_gtids_previous,
+ sizeof(rpl_gtid), 8, 8, MYF(0));
+
+ if (my_hash_insert(&m_audit_elem_domain_lookup, (uchar *) audit_elem))
+ {
+ my_free(audit_elem);
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return;
+ }
+ }
+}
+
+my_bool Binlog_gtid_state_validator::initialize_gtid_state(FILE *out,
+ rpl_gtid *gtids,
+ size_t n_gtids)
+{
+ size_t i;
+ my_bool err= FALSE;
+
+ /*
+ We weren't initialized with starting positions explicitly, so assume the
+ starting positions of the current gtid state
+ */
+ if (!m_audit_elem_domain_lookup.records)
+ initialize_start_gtids(gtids, n_gtids);
+
+ for(i= 0; i < n_gtids; i++)
+ {
+ rpl_gtid *domain_state_gtid= &gtids[i];
+
+ struct audit_elem *audit_elem= (struct audit_elem *) my_hash_search(
+ &m_audit_elem_domain_lookup,
+ (const uchar *) &(domain_state_gtid->domain_id), 0);
+
+ if (!audit_elem)
+ {
+ Binlog_gtid_state_validator::error(
+ out,
+ "Starting GTID position list does not specify an initial value "
+ "for domain %u, whose events may be present in the requested binlog "
+ "file(s). The last known position for this domain was %u-%u-%llu.",
+ domain_state_gtid->domain_id, PARAM_GTID((*domain_state_gtid)));
+ err= TRUE;
+ continue;
+ }
+
+ if (audit_elem->start_gtid.seq_no < domain_state_gtid->seq_no)
+ {
+ Binlog_gtid_state_validator::error(
+ out,
+ "Binary logs are missing data for domain %u. Expected data to "
+ "start from state %u-%u-%llu; however, the initial GTID state of "
+ "the logs was %u-%u-%llu.",
+ domain_state_gtid->domain_id, PARAM_GTID(audit_elem->start_gtid),
+ PARAM_GTID((*domain_state_gtid)));
+ err= TRUE;
+ continue;
+ }
+
+ if (domain_state_gtid->seq_no > audit_elem->last_gtid.seq_no)
+ audit_elem->last_gtid= *domain_state_gtid;
+ }
+ return err;
+}
+
+my_bool Binlog_gtid_state_validator::verify_stop_state(FILE *out,
+ rpl_gtid *stop_gtids,
+ size_t n_stop_gtids)
+{
+ size_t i;
+ for(i= 0; i < n_stop_gtids; i++)
+ {
+ rpl_gtid *stop_gtid= &stop_gtids[i];
+
+ struct audit_elem *audit_elem= (struct audit_elem *) my_hash_search(
+ &m_audit_elem_domain_lookup,
+ (const uchar *) &(stop_gtid->domain_id), 0);
+
+ /*
+ It is okay if stop gtid doesn't exist in current state because it will be treated
+ as a new domain
+ */
+ if (audit_elem && stop_gtid->seq_no <= audit_elem->start_gtid.seq_no)
+ {
+ Binlog_gtid_state_validator::error(
+ out,
+ "--stop-position GTID %u-%u-%llu does not exist in the "
+ "specified binlog files. The current GTID state of domain %u in the "
+ "specified binary logs is %u-%u-%llu",
+ PARAM_GTID((*stop_gtid)), stop_gtid->domain_id,
+ PARAM_GTID(audit_elem->start_gtid));
+ return TRUE;
+ }
+ }
+
+ /* No issues with any GTIDs */
+ return FALSE;
+}
+
+my_bool
+Binlog_gtid_state_validator::verify_gtid_state(FILE *out,
+ rpl_gtid *domain_state_gtid)
+{
+ struct audit_elem *audit_elem= (struct audit_elem *) my_hash_search(
+ &m_audit_elem_domain_lookup,
+ (const uchar *) &(domain_state_gtid->domain_id), 0);
+
+ if (!audit_elem)
+ {
+ Binlog_gtid_state_validator::warn(
+ out,
+ "Binary logs are missing data for domain %u. The current binary log "
+ "specified its "
+ "current state for this domain as %u-%u-%llu, but neither the "
+ "starting GTID position list nor any processed events have "
+ "mentioned "
+ "this domain.",
+ domain_state_gtid->domain_id, PARAM_GTID((*domain_state_gtid)));
+ return TRUE;
+ }
+
+ if (audit_elem->last_gtid.seq_no < domain_state_gtid->seq_no)
+ {
+ Binlog_gtid_state_validator::warn(
+ out,
+ "Binary logs are missing data for domain %u. The current binary log "
+ "state is %u-%u-%llu, but the last seen event was %u-%u-%llu.",
+ domain_state_gtid->domain_id, PARAM_GTID((*domain_state_gtid)),
+ PARAM_GTID(audit_elem->last_gtid));
+ return TRUE;
+ }
+
+ return FALSE;
+}
+
+my_bool Binlog_gtid_state_validator::record(rpl_gtid *gtid)
+{
+ struct audit_elem *audit_elem= (struct audit_elem *) my_hash_search(
+ &m_audit_elem_domain_lookup, (const uchar *) &(gtid->domain_id), 0);
+
+ if (!audit_elem)
+ {
+ /*
+ We haven't seen any GTIDs in this domian yet. Perform initial set up for
+ this domain so we can monitor its events.
+ */
+ audit_elem= (struct audit_elem *) my_malloc(
+ PSI_NOT_INSTRUMENTED, sizeof(struct audit_elem), MYF(MY_WME));
+ if (!audit_elem)
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return TRUE;
+ }
+
+ audit_elem->domain_id= gtid->domain_id;
+ audit_elem->last_gtid= *gtid;
+ audit_elem->start_gtid= {gtid->domain_id, 0, 0};
+
+ my_init_dynamic_array(PSI_INSTRUMENT_ME, &audit_elem->late_gtids_real,
+ sizeof(rpl_gtid), 8, 8, MYF(0));
+ my_init_dynamic_array(PSI_INSTRUMENT_ME, &audit_elem->late_gtids_previous,
+ sizeof(rpl_gtid), 8, 8, MYF(0));
+
+ if (my_hash_insert(&m_audit_elem_domain_lookup, (uchar *) audit_elem))
+ {
+ my_free(audit_elem);
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return TRUE;
+ }
+ }
+ else
+ {
+ /* Out of order check */
+ if (gtid->seq_no <= audit_elem->last_gtid.seq_no &&
+ gtid->seq_no >= audit_elem->start_gtid.seq_no)
+ {
+ /* GTID is out of order */
+ insert_dynamic(&audit_elem->late_gtids_real, (const void *) gtid);
+ insert_dynamic(&audit_elem->late_gtids_previous,
+ (const void *) &(audit_elem->last_gtid));
+
+ return TRUE;
+ }
+ else
+ {
+ /* GTID is valid */
+ audit_elem->last_gtid= *gtid;
+ }
+ }
+
+ return FALSE;
+}
+
+/*
+ Data structure used to help pass data into report_audit_findings because
+ my_hash_iterate only passes one parameter
+*/
+struct gtid_report_ctx
+{
+ FILE *out_file;
+ my_bool is_strict_mode;
+ my_bool contains_err;
+};
+
+static my_bool report_audit_findings(void *entry, void *report_ctx_arg)
+{
+ struct Binlog_gtid_state_validator::audit_elem *audit_el=
+ (struct Binlog_gtid_state_validator::audit_elem *) entry;
+
+ struct gtid_report_ctx *report_ctx=
+ (struct gtid_report_ctx *) report_ctx_arg;
+ FILE *out= report_ctx->out_file;
+ my_bool is_strict_mode= report_ctx->is_strict_mode;
+ size_t i;
+ void (*report_f)(FILE*, const char*, ...);
+
+ if (is_strict_mode)
+ report_f= Binlog_gtid_state_validator::error;
+ else
+ report_f= Binlog_gtid_state_validator::warn;
+
+ if (audit_el)
+ {
+ if (audit_el->last_gtid.seq_no < audit_el->start_gtid.seq_no)
+ {
+ report_f(out,
+ "Binary logs never reached expected GTID state of %u-%u-%llu",
+ PARAM_GTID(audit_el->start_gtid));
+ report_ctx->contains_err= TRUE;
+ }
+
+ /* Report any out of order GTIDs */
+ for(i= 0; i < audit_el->late_gtids_real.elements; i++)
+ {
+ rpl_gtid *real_gtid=
+ (rpl_gtid *) dynamic_array_ptr(&(audit_el->late_gtids_real), i);
+ rpl_gtid *last_gtid= (rpl_gtid *) dynamic_array_ptr(
+ &(audit_el->late_gtids_previous), i);
+ DBUG_ASSERT(real_gtid && last_gtid);
+
+ report_f(out,
+ "Found out of order GTID. Got %u-%u-%llu after %u-%u-%llu",
+ PARAM_GTID((*real_gtid)), PARAM_GTID((*last_gtid)));
+ report_ctx->contains_err= TRUE;
+ }
+ }
+
+ return FALSE;
+}
+
+my_bool Binlog_gtid_state_validator::report(FILE *out, my_bool is_strict_mode)
+{
+ struct gtid_report_ctx report_ctx;
+ report_ctx.out_file= out;
+ report_ctx.is_strict_mode= is_strict_mode;
+ report_ctx.contains_err= FALSE;
+ my_hash_iterate(&m_audit_elem_domain_lookup, report_audit_findings, &report_ctx);
+ fflush(out);
+ return is_strict_mode ? report_ctx.contains_err : FALSE;
+}
+
+Window_gtid_event_filter::Window_gtid_event_filter()
+ : m_has_start(FALSE), m_has_stop(FALSE), m_is_active(FALSE),
+ m_has_passed(FALSE)
+{
+ // m_start and m_stop do not need initial values if unused
+}
+
+int Window_gtid_event_filter::set_start_gtid(rpl_gtid *start)
+{
+ if (m_has_start)
+ {
+ sql_print_error(
+ "Start position cannot have repeated domain "
+ "ids (found %u-%u-%llu when %u-%u-%llu was previously specified)",
+ PARAM_GTID((*start)), PARAM_GTID(m_start));
+ return 1;
+ }
+
+ m_has_start= TRUE;
+ m_start= *start;
+ return 0;
+}
+
+int Window_gtid_event_filter::set_stop_gtid(rpl_gtid *stop)
+{
+ if (m_has_stop)
+ {
+ sql_print_error(
+ "Stop position cannot have repeated domain "
+ "ids (found %u-%u-%llu when %u-%u-%llu was previously specified)",
+ PARAM_GTID((*stop)), PARAM_GTID(m_stop));
+ return 1;
+ }
+
+ m_has_stop= TRUE;
+ m_stop= *stop;
+ return 0;
+}
+
+my_bool Window_gtid_event_filter::is_range_invalid()
+{
+ if (m_has_start && m_has_stop && m_start.seq_no > m_stop.seq_no)
+ {
+ sql_print_error(
+ "Queried GTID range is invalid in strict mode. Stop position "
+ "%u-%u-%llu is not greater than or equal to start %u-%u-%llu.",
+ PARAM_GTID(m_stop), PARAM_GTID(m_start));
+ return TRUE;
+ }
+ return FALSE;
+}
+
+static inline my_bool is_gtid_at_or_after(rpl_gtid *boundary,
+ rpl_gtid *test_gtid)
+{
+ return test_gtid->domain_id == boundary->domain_id &&
+ test_gtid->seq_no >= boundary->seq_no;
+}
+
+static inline my_bool is_gtid_at_or_before(rpl_gtid *boundary,
+ rpl_gtid *test_gtid)
+{
+ return test_gtid->domain_id == boundary->domain_id &&
+ test_gtid->seq_no <= boundary->seq_no;
+}
+
+my_bool Window_gtid_event_filter::exclude(rpl_gtid *gtid)
+{
+ /* Assume result should be excluded to start */
+ my_bool should_exclude= TRUE;
+
+ DBUG_ASSERT((m_has_start || m_has_stop) &&
+ (gtid->domain_id == m_start.domain_id ||
+ gtid->domain_id == m_stop.domain_id));
+
+ if (!m_is_active && !m_has_passed)
+ {
+ /*
+ This filter has not yet been activated. Check if the gtid is within the
+ bounds of this window.
+ */
+
+ if (!m_has_start && is_gtid_at_or_before(&m_stop, gtid))
+ {
+ /*
+ Start GTID was not provided, so we want to include everything from here
+ up to m_stop
+ */
+ m_is_active= TRUE;
+ should_exclude= FALSE;
+ }
+ else if ((m_has_start && is_gtid_at_or_after(&m_start, gtid)) &&
+ (!m_has_stop || is_gtid_at_or_before(&m_stop, gtid)))
+ {
+ m_is_active= TRUE;
+
+ DBUG_PRINT("gtid-event-filter",
+ ("Window: Begin (%d-%d-%llu, %d-%d-%llu]",
+ PARAM_GTID(m_start), PARAM_GTID(m_stop)));
+
+ /*
+ As the start of the range is exclusive, if this gtid is the start of
+ the range, exclude it
+ */
+ if (gtid->seq_no == m_start.seq_no)
+ should_exclude= TRUE;
+ else
+ should_exclude= FALSE;
+
+ if (m_has_stop && gtid->seq_no == m_stop.seq_no)
+ {
+ m_has_passed= TRUE;
+ DBUG_PRINT("gtid-event-filter",
+ ("Window: End (%d-%d-%llu, %d-%d-%llu]",
+ PARAM_GTID(m_start), PARAM_GTID(m_stop)));
+ }
+ }
+ } /* if (!m_is_active && !m_has_passed) */
+ else if (m_is_active && !m_has_passed)
+ {
+ /*
+ This window is currently active so we want the event group to be included
+ in the results. Additionally check if we are at the end of the window.
+ If no end of the window is provided, go indefinitely
+ */
+ should_exclude= FALSE;
+
+ if (m_has_stop && is_gtid_at_or_after(&m_stop, gtid))
+ {
+ DBUG_PRINT("gtid-event-filter",
+ ("Window: End (%d-%d-%llu, %d-%d-%llu]",
+ PARAM_GTID(m_start), PARAM_GTID(m_stop)));
+ m_is_active= FALSE;
+ m_has_passed= TRUE;
+
+ if (!is_gtid_at_or_before(&m_stop, gtid))
+ {
+ /*
+ The GTID is after the finite stop of the window, don't let it pass
+ through
+ */
+ should_exclude= TRUE;
+ }
+ }
+ }
+
+ return should_exclude;
+}
+
+my_bool Window_gtid_event_filter::has_finished()
+{
+ return m_has_stop ? m_has_passed : FALSE;
+}
+
+void free_gtid_filter_element(void *p)
+{
+ gtid_filter_element *gfe = (gtid_filter_element *) p;
+ if (gfe->filter)
+ delete gfe->filter;
+ my_free(gfe);
+}
+
+Id_delegating_gtid_event_filter::Id_delegating_gtid_event_filter()
+ : m_num_stateful_filters(0), m_num_completed_filters(0)
+{
+ my_hash_init(PSI_INSTRUMENT_ME, &m_filters_by_id_hash, &my_charset_bin, 32,
+ offsetof(gtid_filter_element, identifier),
+ sizeof(gtid_filter_identifier), NULL, free_gtid_filter_element,
+ HASH_UNIQUE);
+
+ m_default_filter= new Accept_all_gtid_filter();
+}
+
+Id_delegating_gtid_event_filter::~Id_delegating_gtid_event_filter()
+{
+ my_hash_free(&m_filters_by_id_hash);
+ delete m_default_filter;
+}
+
+void Id_delegating_gtid_event_filter::set_default_filter(
+ Gtid_event_filter *filter)
+{
+ if (m_default_filter)
+ delete m_default_filter;
+
+ m_default_filter= filter;
+}
+
+gtid_filter_element *
+Id_delegating_gtid_event_filter::find_or_create_filter_element_for_id(
+ gtid_filter_identifier filter_id)
+{
+ gtid_filter_element *fe= (gtid_filter_element *) my_hash_search(
+ &m_filters_by_id_hash, (const uchar *) &filter_id, 0);
+
+ if (!fe)
+ {
+ gtid_filter_element *new_fe= (gtid_filter_element *) my_malloc(
+ PSI_NOT_INSTRUMENTED, sizeof(gtid_filter_element), MYF(MY_WME));
+ new_fe->filter= NULL;
+ new_fe->identifier= filter_id;
+ if (my_hash_insert(&m_filters_by_id_hash, (uchar*) new_fe))
+ {
+ my_free(new_fe);
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return NULL;
+ }
+ fe= new_fe;
+ }
+
+ return fe;
+}
+
+my_bool Id_delegating_gtid_event_filter::has_finished()
+{
+ /*
+ If all user-defined filters have deactivated, we are effectively
+ deactivated
+ */
+ return m_num_stateful_filters &&
+ m_num_completed_filters == m_num_stateful_filters;
+}
+
+my_bool Id_delegating_gtid_event_filter::exclude(rpl_gtid *gtid)
+{
+ gtid_filter_identifier filter_id= get_id_from_gtid(gtid);
+ gtid_filter_element *filter_element= (gtid_filter_element *) my_hash_search(
+ &m_filters_by_id_hash, (const uchar *) &filter_id, 0);
+ Gtid_event_filter *filter=
+ (filter_element ? filter_element->filter : m_default_filter);
+ my_bool ret= TRUE;
+
+ if(!filter_element || !filter->has_finished())
+ {
+ ret= filter->exclude(gtid);
+
+ /*
+ If this is an explicitly defined filter, e.g. Window-based filter, check
+ if it has completed, and update the counter accordingly if so.
+ */
+ if (filter_element && filter->has_finished())
+ m_num_completed_filters++;
+ }
+
+ return ret;
+}
+
+Window_gtid_event_filter *
+Domain_gtid_event_filter::find_or_create_window_filter_for_id(
+ uint32 domain_id)
+{
+ gtid_filter_element *filter_element=
+ find_or_create_filter_element_for_id(domain_id);
+ Window_gtid_event_filter *wgef= NULL;
+
+ if (filter_element->filter == NULL)
+ {
+ /* New filter */
+ wgef= new Window_gtid_event_filter();
+ filter_element->filter= wgef;
+ }
+ else if (filter_element->filter->get_filter_type() == WINDOW_GTID_FILTER_TYPE)
+ {
+ /* We have an existing window filter here */
+ wgef= (Window_gtid_event_filter *) filter_element->filter;
+ }
+ else
+ {
+ /*
+ We have an existing filter but it is not of window type so propogate NULL
+ filter
+ */
+ sql_print_error("cannot subset domain id %d by position, another rule "
+ "exists on that domain",
+ domain_id);
+ }
+
+ return wgef;
+}
+
+static my_bool check_filter_entry_validity(void *entry, void *are_filters_invalid_arg)
+{
+ gtid_filter_element *fe= (gtid_filter_element*) entry;
+
+ if (fe)
+ {
+ Gtid_event_filter *gef= fe->filter;
+ if (gef->get_filter_type() == Gtid_event_filter::WINDOW_GTID_FILTER_TYPE)
+ {
+ Window_gtid_event_filter *wgef= (Window_gtid_event_filter *) gef;
+ if (wgef->is_range_invalid())
+ {
+ *((int *) are_filters_invalid_arg)= 1;
+ return TRUE;
+ }
+ }
+ }
+ return FALSE;
+}
+
+int Domain_gtid_event_filter::validate_window_filters()
+{
+ int are_filters_invalid= 0;
+ my_hash_iterate(&m_filters_by_id_hash, check_filter_entry_validity, &are_filters_invalid);
+ return are_filters_invalid;
+}
+
+int Domain_gtid_event_filter::add_start_gtid(rpl_gtid *gtid)
+{
+ int err= 0;
+ Window_gtid_event_filter *filter_to_update=
+ find_or_create_window_filter_for_id(gtid->domain_id);
+
+ if (filter_to_update == NULL)
+ {
+ err= 1;
+ }
+ else if (!(err= filter_to_update->set_start_gtid(gtid)))
+ {
+ gtid_filter_element *fe= (gtid_filter_element *) my_hash_search(
+ &m_filters_by_id_hash, (const uchar *) &(gtid->domain_id), 0);
+ insert_dynamic(&m_start_filters, (const void *) &fe);
+ }
+
+ return err;
+}
+
+int Domain_gtid_event_filter::add_stop_gtid(rpl_gtid *gtid)
+{
+ int err= 0;
+ Window_gtid_event_filter *filter_to_update=
+ find_or_create_window_filter_for_id(gtid->domain_id);
+
+ if (filter_to_update == NULL)
+ {
+ err= 1;
+ }
+ else if (!(err= filter_to_update->set_stop_gtid(gtid)))
+ {
+ gtid_filter_element *fe= (gtid_filter_element *) my_hash_search(
+ &m_filters_by_id_hash, (const uchar *) &(gtid->domain_id), 0);
+ insert_dynamic(&m_stop_filters, (const void *) &fe);
+
+ /*
+ A window with a stop position can be disabled, and is therefore stateful.
+ */
+ m_num_stateful_filters++;
+
+ /*
+ Default filtering behavior changes with GTID stop positions, where we
+ exclude all domains not present in the stop list
+ */
+ if (m_default_filter->get_filter_type() == ACCEPT_ALL_GTID_FILTER_TYPE)
+ {
+ delete m_default_filter;
+ m_default_filter= new Reject_all_gtid_filter();
+ }
+ }
+
+ return err;
+}
+
+rpl_gtid *Domain_gtid_event_filter::get_start_gtids()
+{
+ rpl_gtid *gtid_list;
+ uint32 i;
+ size_t n_start_gtids= get_num_start_gtids();
+
+ gtid_list= (rpl_gtid *) my_malloc(
+ PSI_INSTRUMENT_ME, n_start_gtids * sizeof(rpl_gtid), MYF(MY_WME));
+
+ for (i = 0; i < n_start_gtids; i++)
+ {
+ gtid_filter_element *fe=
+ *(gtid_filter_element **) dynamic_array_ptr(&m_start_filters, i);
+ DBUG_ASSERT(fe->filter &&
+ fe->filter->get_filter_type() == WINDOW_GTID_FILTER_TYPE);
+ Window_gtid_event_filter *wgef=
+ (Window_gtid_event_filter *) fe->filter;
+
+ rpl_gtid win_start_gtid= wgef->get_start_gtid();
+ gtid_list[i]= win_start_gtid;
+ }
+
+ return gtid_list;
+}
+
+rpl_gtid *Domain_gtid_event_filter::get_stop_gtids()
+{
+ rpl_gtid *gtid_list;
+ uint32 i;
+ size_t n_stop_gtids= get_num_stop_gtids();
+
+ gtid_list= (rpl_gtid *) my_malloc(
+ PSI_INSTRUMENT_ME, n_stop_gtids * sizeof(rpl_gtid), MYF(MY_WME));
+
+ for (i = 0; i < n_stop_gtids; i++)
+ {
+ gtid_filter_element *fe=
+ *(gtid_filter_element **) dynamic_array_ptr(&m_stop_filters, i);
+ DBUG_ASSERT(fe->filter &&
+ fe->filter->get_filter_type() == WINDOW_GTID_FILTER_TYPE);
+ Window_gtid_event_filter *wgef=
+ (Window_gtid_event_filter *) fe->filter;
+
+ rpl_gtid win_stop_gtid= wgef->get_stop_gtid();
+ gtid_list[i]= win_stop_gtid;
+ }
+
+ return gtid_list;
+}
+
+void Domain_gtid_event_filter::clear_start_gtids()
+{
+ uint32 i;
+ for (i = 0; i < get_num_start_gtids(); i++)
+ {
+ gtid_filter_element *fe=
+ *(gtid_filter_element **) dynamic_array_ptr(&m_start_filters, i);
+ DBUG_ASSERT(fe->filter &&
+ fe->filter->get_filter_type() == WINDOW_GTID_FILTER_TYPE);
+ Window_gtid_event_filter *wgef=
+ (Window_gtid_event_filter *) fe->filter;
+
+ if (wgef->has_stop())
+ {
+ /*
+ Don't delete the whole filter if it already has a stop position attached
+ */
+ wgef->clear_start_pos();
+ }
+ else
+ {
+ /*
+ This domain only has a stop, so delete the whole filter
+ */
+ my_hash_delete(&m_filters_by_id_hash, (uchar *) fe);
+ }
+ }
+
+ reset_dynamic(&m_start_filters);
+}
+
+void Domain_gtid_event_filter::clear_stop_gtids()
+{
+ uint32 i;
+
+ for (i = 0; i < get_num_stop_gtids(); i++)
+ {
+ gtid_filter_element *fe=
+ *(gtid_filter_element **) dynamic_array_ptr(&m_stop_filters, i);
+ DBUG_ASSERT(fe->filter &&
+ fe->filter->get_filter_type() == WINDOW_GTID_FILTER_TYPE);
+ Window_gtid_event_filter *wgef=
+ (Window_gtid_event_filter *) fe->filter;
+
+ if (wgef->has_start())
+ {
+ /*
+ Don't delete the whole filter if it already has a start position
+ attached
+ */
+ wgef->clear_stop_pos();
+ }
+ else
+ {
+ /*
+ This domain only has a start, so delete the whole filter
+ */
+ my_hash_delete(&m_filters_by_id_hash, (uchar *) fe);
+ }
+ m_num_stateful_filters--;
+ }
+
+ /*
+ Stop positions were cleared and we want to be inclusive again of other
+ domains again
+ */
+ if (m_default_filter->get_filter_type() == REJECT_ALL_GTID_FILTER_TYPE)
+ {
+ delete m_default_filter;
+ m_default_filter= new Accept_all_gtid_filter();
+ }
+
+ reset_dynamic(&m_stop_filters);
+}
+
+my_bool Domain_gtid_event_filter::exclude(rpl_gtid *gtid)
+{
+ my_bool include_domain= TRUE;
+ /*
+ If GTID stop positions are provided, we limit the domains which are output
+ to only be those specified with stop positions
+ */
+ if (get_num_stop_gtids())
+ {
+ gtid_filter_identifier filter_id= get_id_from_gtid(gtid);
+ gtid_filter_element *filter_element=
+ (gtid_filter_element *) my_hash_search(&m_filters_by_id_hash,
+ (const uchar *) &filter_id, 0);
+ if (filter_element)
+ {
+ Gtid_event_filter *filter= filter_element->filter;
+ if (filter->get_filter_type() == WINDOW_GTID_FILTER_TYPE)
+ {
+ Window_gtid_event_filter *wgef= (Window_gtid_event_filter *) filter;
+ include_domain= wgef->has_stop();
+ }
+ }
+ }
+
+ return include_domain ? Id_delegating_gtid_event_filter::exclude(gtid)
+ : TRUE;
+}
diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h
index 11541c8000c..8b8c3bb9c92 100644
--- a/sql/rpl_gtid.h
+++ b/sql/rpl_gtid.h
@@ -28,6 +28,7 @@ extern const LEX_CSTRING rpl_gtid_slave_state_table_name;
class String;
#define GTID_MAX_STR_LENGTH (10+1+10+1+20)
+#define PARAM_GTID(G) G.domain_id, G.server_id, G.seq_no
struct rpl_gtid
{
@@ -36,6 +37,9 @@ struct rpl_gtid
uint64 seq_no;
};
+/* Data structure to help with quick lookup for filters. */
+typedef decltype(rpl_gtid::domain_id) gtid_filter_identifier;
+
inline bool operator==(const rpl_gtid& lhs, const rpl_gtid& rhs)
{
return
@@ -380,5 +384,449 @@ extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid,
extern int gtid_check_rpl_slave_state_table(TABLE *table);
extern rpl_gtid *gtid_parse_string_to_list(const char *p, size_t len,
uint32 *out_len);
+extern rpl_gtid *gtid_unpack_string_to_list(const char *p, size_t len,
+ uint32 *out_len);
+
+
+
+/*
+ This class ensures that the GTID state of an event stream is consistent with
+ the set of provided binary log files. In particular, it has two concerns:
+
+ 1) Ensuring that GTID events are monotonically increasing within each
+ domain
+ 2) Ensuring that the GTID state of the specified binary logs is consistent
+ both with the initial state that a user provides, and between
+ binary logs (if multiple are specified)
+*/
+class Binlog_gtid_state_validator
+{
+public:
+
+ struct audit_elem
+ {
+ uint32 domain_id;
+
+
+ /*
+ Holds the largest GTID received, and is indexed by domain_id
+ */
+ rpl_gtid last_gtid;
+
+ /*
+ Holds the largest GTID received, and is indexed by domain_id
+ */
+ rpl_gtid start_gtid;
+
+ /*
+ List of the problematic GTIDs received which were out of order
+ */
+ DYNAMIC_ARRAY late_gtids_real;
+
+ /*
+ For each problematic GTID in late_gtids_real, this list contains the last
+ GTID of the domain at the time of receiving the out of order GTID.
+ */
+ DYNAMIC_ARRAY late_gtids_previous;
+ };
+
+ Binlog_gtid_state_validator();
+ ~Binlog_gtid_state_validator();
+
+ /*
+ Initialize where we should start monitoring for invalid GTID entries
+ in the event stream. Note that these start positions must occur at or after
+ a given binary logs GTID state (from Gtid_list_log_event)
+ */
+ void initialize_start_gtids(rpl_gtid *start_gtids, size_t n_gtids);
+
+ /*
+ Initialize our current state so we know where to expect GTIDs to start
+ increasing from. Error if the state exists after our expected start_gtid
+ positions, because we know we will be missing event data (possibly from
+ a purged log).
+ */
+ my_bool initialize_gtid_state(FILE *out, rpl_gtid *gtids, size_t n_gtids);
+
+ /*
+ Ensures that the expected stop GTID positions exist within the specified
+ binary logs.
+ */
+ my_bool verify_stop_state(FILE *out, rpl_gtid *stop_gtids, size_t n_stop_gtids);
+
+ /*
+ Ensure a GTID state (e.g., from a Gtid_list_log_event) is consistent with
+ the current state of our auditing. For example, if we see a GTID from a
+ Gtid_list_log_event that is ahead of our current state for that domain, we
+ have missed events (perhaps from a missing log).
+ */
+ my_bool verify_gtid_state(FILE *out, rpl_gtid *gtid_state_cur);
+
+ /*
+ Take note of a new GTID being processed.
+
+ returns TRUE if the GTID is invalid, FALSE on success
+ */
+ my_bool record(rpl_gtid *gtid);
+
+ /*
+ Writes warnings/errors (if any) during GTID processing
+
+ Returns TRUE if any findings were reported, FALSE otherwise
+ */
+ my_bool report(FILE *out, my_bool is_strict_mode);
+
+ static void report_details(FILE *out, const char *format, va_list args)
+ {
+ vfprintf(out, format, args);
+ fprintf(out, "\n");
+ }
+
+ static void warn(FILE *out, const char *format,...)
+ {
+ va_list args;
+ va_start(args, format);
+ fprintf(out, "WARNING: ");
+ report_details(out, format, args);
+ }
+
+ static void error(FILE *out, const char *format,...)
+ {
+ va_list args;
+ va_start(args, format);
+ fprintf(out, "ERROR: ");
+ report_details(out, format, args);
+ }
+
+private:
+
+ /*
+ Holds the records for each domain id we are monitoring. Elements are of type
+ `struct audit_elem` and indexed by domian_id.
+ */
+ HASH m_audit_elem_domain_lookup;
+};
+
+/*
+ Interface to support different methods of filtering log events by GTID
+*/
+class Gtid_event_filter
+{
+public:
+ Gtid_event_filter() {};
+ virtual ~Gtid_event_filter() {};
+
+ enum gtid_event_filter_type
+ {
+ DELEGATING_GTID_FILTER_TYPE = 1,
+ WINDOW_GTID_FILTER_TYPE = 2,
+ ACCEPT_ALL_GTID_FILTER_TYPE = 3,
+ REJECT_ALL_GTID_FILTER_TYPE = 4
+ };
+
+ /*
+ Run the filter on an input gtid to test if the corresponding log events
+ should be excluded from a result
+
+ Returns TRUE when the event group corresponding to the input GTID should be
+ excluded.
+ Returns FALSE when the event group should be included.
+ */
+ virtual my_bool exclude(rpl_gtid *) = 0;
+
+ /*
+ The gtid_event_filter_type that corresponds to the underlying filter
+ implementation
+ */
+ virtual uint32 get_filter_type() = 0;
+
+ /*
+ For filters that can maintain their own state, this tests if the filter
+ implementation has completed.
+
+ Returns TRUE when completed, and FALSE when the filter has not finished.
+ */
+ virtual my_bool has_finished() = 0;
+};
+
+/*
+ Filter implementation which will include any and all input GTIDs. This is
+ used to set default behavior for GTIDs that do not have explicit filters
+ set on their domain_id, e.g. when a Window_gtid_event_filter is used for
+ a specific domain, then all other domain_ids will be accepted using this
+ filter implementation.
+*/
+class Accept_all_gtid_filter : public Gtid_event_filter
+{
+public:
+ Accept_all_gtid_filter() {}
+ ~Accept_all_gtid_filter() {}
+ my_bool exclude(rpl_gtid *gtid) { return FALSE; }
+ uint32 get_filter_type() { return ACCEPT_ALL_GTID_FILTER_TYPE; }
+ my_bool has_finished() { return FALSE; }
+};
+
+/*
+ Filter implementation to exclude all tested GTIDs.
+*/
+class Reject_all_gtid_filter : public Gtid_event_filter
+{
+public:
+ Reject_all_gtid_filter() {}
+ ~Reject_all_gtid_filter() {}
+ my_bool exclude(rpl_gtid *gtid) { return TRUE; }
+ uint32 get_filter_type() { return REJECT_ALL_GTID_FILTER_TYPE; }
+ my_bool has_finished() { return FALSE; }
+};
+
+/*
+ A filter implementation that includes events that exist between two GTID
+ positions, m_start (exclusive) and m_stop (inclusive), within a domain.
+
+ This filter is stateful, such that it expects GTIDs to be an increasing
+ stream, and internally, the window will activate and deactivate when the start
+ and stop positions of the event stream have passed through, respectively.
+*/
+class Window_gtid_event_filter : public Gtid_event_filter
+{
+public:
+ Window_gtid_event_filter();
+ ~Window_gtid_event_filter() {}
+
+ my_bool exclude(rpl_gtid*);
+ my_bool has_finished();
+
+ /*
+ Set the GTID that begins this window (exclusive)
+
+ Returns 0 on ok, non-zero on error
+ */
+ int set_start_gtid(rpl_gtid *start);
+
+ /*
+ Set the GTID that ends this window (inclusive)
+
+ Returns 0 on ok, non-zero on error
+ */
+ int set_stop_gtid(rpl_gtid *stop);
+
+ uint32 get_filter_type() { return WINDOW_GTID_FILTER_TYPE; }
+
+ /*
+ Validates the underlying range is correct, and writes an error if not, i.e.
+ m_start >= m_stop.
+
+ Returns FALSE on ok, TRUE if range is invalid
+ */
+ my_bool is_range_invalid();
+
+ /*
+ Getter/setter methods
+ */
+ my_bool has_start() { return m_has_start; }
+ my_bool has_stop() { return m_has_stop; }
+ rpl_gtid get_start_gtid() { return m_start; }
+ rpl_gtid get_stop_gtid() { return m_stop; }
+
+ void clear_start_pos()
+ {
+ m_has_start= FALSE;
+ m_start= {0, 0, 0};
+ }
+
+ void clear_stop_pos()
+ {
+ m_has_stop= FALSE;
+ m_stop= {0, 0, 0};
+ }
+
+protected:
+
+ /*
+ When processing GTID streams, the order in which they are processed should
+ be sequential with no gaps between events. If a gap is found within a
+ window, warn the user.
+ */
+ void verify_gtid_is_expected(rpl_gtid *gtid);
+
+private:
+
+ enum warning_flags
+ {
+ WARN_GTID_SEQUENCE_NUMBER_OUT_OF_ORDER= 0x1
+ };
+
+ /*
+ m_has_start : Indicates if a start to this window has been explicitly
+ provided. A window starts immediately if not provided.
+ */
+ my_bool m_has_start;
+
+ /*
+ m_has_stop : Indicates if a stop to this window has been explicitly
+ provided. A window continues indefinitely if not provided.
+ */
+ my_bool m_has_stop;
+
+ /*
+ m_is_active : Indicates whether or not the program is currently reading
+ events from within this window. When TRUE, events with
+ different server ids than those specified by m_start or
+ m_stop will be passed through.
+ */
+ my_bool m_is_active;
+
+ /*
+ m_has_passed : Indicates whether or not the program is currently reading
+ events from within this window.
+ */
+ my_bool m_has_passed;
+
+ /* m_start : marks the GTID that begins the window (exclusive). */
+ rpl_gtid m_start;
+
+ /* m_stop : marks the GTID that ends the range (inclusive). */
+ rpl_gtid m_stop;
+};
+
+typedef struct _gtid_filter_element
+{
+ Gtid_event_filter *filter;
+ gtid_filter_identifier identifier; /* Used for HASH lookup */
+} gtid_filter_element;
+
+/*
+ Gtid_event_filter subclass which has no specific implementation, but rather
+ delegates the filtering to specific identifiable/mapped implementations.
+
+ A default filter is used for GTIDs that are passed through which no explicit
+ filter can be identified.
+
+ This class should be subclassed, where the get_id_from_gtid function
+ specifies how to extract the filter identifier from a GTID.
+*/
+class Id_delegating_gtid_event_filter : public Gtid_event_filter
+{
+public:
+ Id_delegating_gtid_event_filter();
+ ~Id_delegating_gtid_event_filter();
+
+ my_bool exclude(rpl_gtid *gtid);
+ my_bool has_finished();
+ void set_default_filter(Gtid_event_filter *default_filter);
+
+ uint32 get_filter_type() { return DELEGATING_GTID_FILTER_TYPE; }
+
+ virtual gtid_filter_identifier get_id_from_gtid(rpl_gtid *) = 0;
+
+protected:
+
+ uint32 m_num_stateful_filters;
+ uint32 m_num_completed_filters;
+ Gtid_event_filter *m_default_filter;
+
+ HASH m_filters_by_id_hash;
+
+ gtid_filter_element *find_or_create_filter_element_for_id(gtid_filter_identifier);
+};
+
+/*
+ A subclass of Id_delegating_gtid_event_filter which identifies filters using the
+ domain id of a GTID.
+
+ Additional helper functions include:
+ add_start_gtid(GTID) : adds a start GTID position to this filter, to be
+ identified by its domain id
+ add_stop_gtid(GTID) : adds a stop GTID position to this filter, to be
+ identified by its domain id
+ clear_start_gtids() : removes existing GTID start positions
+ clear_stop_gtids() : removes existing GTID stop positions
+ get_start_gtids() : gets all added GTID start positions
+ get_stop_gtids() : gets all added GTID stop positions
+ get_num_start_gtids() : gets the count of added GTID start positions
+ get_num_stop_gtids() : gets the count of added GTID stop positions
+*/
+class Domain_gtid_event_filter : public Id_delegating_gtid_event_filter
+{
+public:
+ Domain_gtid_event_filter()
+ {
+ my_init_dynamic_array(PSI_INSTRUMENT_ME, &m_start_filters,
+ sizeof(gtid_filter_element*), 8, 8, MYF(0));
+ my_init_dynamic_array(PSI_INSTRUMENT_ME, &m_stop_filters,
+ sizeof(gtid_filter_element*), 8, 8, MYF(0));
+ }
+ ~Domain_gtid_event_filter()
+ {
+ delete_dynamic(&m_start_filters);
+ delete_dynamic(&m_stop_filters);
+ }
+
+ /*
+ Returns the domain id of from the input GTID
+ */
+ gtid_filter_identifier get_id_from_gtid(rpl_gtid *gtid)
+ {
+ return gtid->domain_id;
+ }
+
+ /*
+ Override Id_delegating_gtid_event_filter to extend with domain specific
+ filtering logic
+ */
+ my_bool exclude(rpl_gtid*);
+
+ /*
+ Validates that window filters with both a start and stop GTID satisfy
+ stop_gtid > start_gtid
+
+ Returns 0 on ok, non-zero if any windows are invalid.
+ */
+ int validate_window_filters();
+
+ /*
+ Helper function to start a GTID window filter at the given GTID
+
+ Returns 0 on ok, non-zero on error
+ */
+ int add_start_gtid(rpl_gtid *gtid);
+
+ /*
+ Helper function to end a GTID window filter at the given GTID
+
+ Returns 0 on ok, non-zero on error
+ */
+ int add_stop_gtid(rpl_gtid *gtid);
+
+ /*
+ If start or stop position is respecified, we remove all existing values
+ and start over with the new specification.
+ */
+ void clear_start_gtids();
+ void clear_stop_gtids();
+
+ /*
+ Return list of all GTIDs used as start position.
+
+ Note that this list is allocated and it is up to the user to free it
+ */
+ rpl_gtid *get_start_gtids();
+
+ /*
+ Return list of all GTIDs used as stop position.
+
+ Note that this list is allocated and it is up to the user to free it
+ */
+ rpl_gtid *get_stop_gtids();
+
+ size_t get_num_start_gtids() { return m_start_filters.elements; }
+ size_t get_num_stop_gtids() { return m_stop_filters.elements; }
+
+private:
+ DYNAMIC_ARRAY m_start_filters;
+ DYNAMIC_ARRAY m_stop_filters;
+
+ Window_gtid_event_filter *find_or_create_window_filter_for_id(gtid_filter_identifier);
+};
#endif /* RPL_GTID_H */