diff options
author | Brandon Nesterenko <brandon.nesterenko@mariadb.com> | 2021-08-11 11:29:37 -0600 |
---|---|---|
committer | Brandon Nesterenko <brandon.nesterenko@mariadb.com> | 2021-12-14 14:26:43 -0700 |
commit | 03db20383220b3548cdacd1bb7712fba108881d4 (patch) | |
tree | 498a93b7f445cea6985682d946da6640fc315f65 /sql | |
parent | 2d21917e7db2db0900671aac2e29f49e4ff2acd7 (diff) | |
download | mariadb-git-bb-10.8-MDEV-4989.tar.gz |
MDEV-4989: Support for GTID in mysqlbinlogbb-10.8-MDEV-4989
New Feature:
===========
This commit extends the mariadb-binlog capabilities to allow events
to be filtered by GTID ranges. More specifically, the following
capabilities are addressed:
1) GTIDs can be used to filter results on local binlog files
2) GTIDs can be used to filter results from remote servers
3) For a given GTID range, its start-position is exclusive and
its stop-position is inclusive. This allows users to receive
events strictly after those that they already have, and is
useful in cases such as: 1) events were received out of order
and should be re-sent, or 2) specifying the gtid state of a
slave to get events newer than their current state.
domain id are reset to their former values. If a seq_no is 0
for start-position, it means to include the entirety of the
domain. If a seq_no is 0 for stop-position, it means to
exclude all events from that domain.
4) Implemented --gtid-strict-mode that ensures the GTID event
stream in each domain is monotonically increasing
5) Added new level of verbosity in mysqlbinlog -vvv to print
additional diagnostic information about invalid GTID states
To facilitate these features, the --start-position and
--stop-position arguments have been extended to accept values
formatted as a list of GTID positions, e.g.
--start-position=0-1-0,1-2-55
A few additional notess:
1) this commit squashes together the commits:
f4319661120e-78a9d49907ba
2) Changed rpl.rpl_blackhole_row_annotate test because it has
out of order GTIDs in its binlog, so I added
--skip-gtid-strict-mode
3) After all binlog events have been written, the session server
id and domain id are reset to their values in the global state.
Reviewed By:
===========
Andrei Elkin: <andrei.elkin@mariadb.com>
Diffstat (limited to 'sql')
-rw-r--r-- | sql/log_event.h | 48 | ||||
-rw-r--r-- | sql/log_event_client.cc | 2 | ||||
-rw-r--r-- | sql/rpl_gtid.cc | 846 | ||||
-rw-r--r-- | sql/rpl_gtid.h | 448 |
4 files changed, 1342 insertions, 2 deletions
diff --git a/sql/log_event.h b/sql/log_event.h index 3adc7a26d93..db77be2d99b 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -920,6 +920,20 @@ typedef struct st_print_event_info IO_CACHE review_sql_cache; #endif FILE *file; + + + + /* + Used to include the events within a GTID start/stop boundary + */ + my_bool m_is_event_group_active; + + /* + Tracks whether or not output events must be explicitly activated in order + to be printed + */ + my_bool m_is_event_group_filtering_enabled; + st_print_event_info(); ~st_print_event_info() { @@ -942,6 +956,40 @@ typedef struct st_print_event_info copy_event_cache_to_file_and_reinit(&body_cache, file); fflush(file); } + + /* + Notify that all events part of the current group should be printed + */ + void activate_current_event_group() + { + m_is_event_group_active= TRUE; + } + void deactivate_current_event_group() + { + m_is_event_group_active= FALSE; + } + + /* + Used for displaying events part of an event group. + Returns TRUE when both event group filtering is enabled and the current + event group should be displayed, OR if event group filtering is + disabled. More specifically, if filtering is disabled, all events + should be shown. + Returns FALSE when event group filtering is enabled and the current event + group is filtered out. + */ + my_bool is_event_group_active() + { + return m_is_event_group_filtering_enabled ? m_is_event_group_active : TRUE; + } + + /* + Notify that events must be explicitly activated in order to be printed + */ + void enable_event_group_filtering() + { + m_is_event_group_filtering_enabled= TRUE; + } } PRINT_EVENT_INFO; #endif diff --git a/sql/log_event_client.cc b/sql/log_event_client.cc index 7e0bf7d8e4c..66f2753d96b 100644 --- a/sql/log_event_client.cc +++ b/sql/log_event_client.cc @@ -3791,6 +3791,8 @@ st_print_event_info::st_print_event_info() printed_fd_event=FALSE; file= 0; base64_output_mode=BASE64_OUTPUT_UNSPEC; + m_is_event_group_active= TRUE; + m_is_event_group_filtering_enabled= FALSE; open_cached_file(&head_cache, NULL, NULL, 0, flags); open_cached_file(&body_cache, NULL, NULL, 0, flags); open_cached_file(&tail_cache, NULL, NULL, 0, flags); diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc index 306ae878060..c1031133530 100644 --- a/sql/rpl_gtid.cc +++ b/sql/rpl_gtid.cc @@ -17,6 +17,7 @@ /* Definitions for MariaDB global transaction ID (GTID). */ +#ifndef MYSQL_CLIENT #include "mariadb.h" #include "sql_priv.h" #include "unireg.h" @@ -24,7 +25,6 @@ #include "sql_base.h" #include "sql_parse.h" #include "key.h" -#include "rpl_gtid.h" #include "rpl_rli.h" #include "slave.h" #include "log_event.h" @@ -1273,6 +1273,7 @@ rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid) return true; } +#endif /* Parse a GTID at the start of a string, and update the pointer to point @@ -1310,7 +1311,6 @@ gtid_parser_helper(const char **ptr, const char *end, rpl_gtid *out_gtid) return 0; } - rpl_gtid * gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len) { @@ -1349,6 +1349,7 @@ gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len) return list; } +#ifndef MYSQL_CLIENT /* Update the slave replication state with the GTID position obtained from @@ -2974,3 +2975,844 @@ gtid_waiting::remove_from_wait_queue(gtid_waiting::hash_element *he, queue_remove(&he->queue, elem->queue_idx); } + +#endif + +void free_domain_lookup_element(void *p) +{ + struct Binlog_gtid_state_validator::audit_elem *audit_elem= + (struct Binlog_gtid_state_validator::audit_elem *) p; + delete_dynamic(&audit_elem->late_gtids_previous); + delete_dynamic(&audit_elem->late_gtids_real); + my_free(audit_elem); +} + +Binlog_gtid_state_validator::Binlog_gtid_state_validator() +{ + my_hash_init(PSI_INSTRUMENT_ME, &m_audit_elem_domain_lookup, &my_charset_bin, 32, + offsetof(struct audit_elem, domain_id), sizeof(uint32), + NULL, free_domain_lookup_element, HASH_UNIQUE); +} + +Binlog_gtid_state_validator::~Binlog_gtid_state_validator() +{ + my_hash_free(&m_audit_elem_domain_lookup); +} + +void Binlog_gtid_state_validator::initialize_start_gtids(rpl_gtid *start_gtids, + size_t n_gtids) +{ + size_t i; + for(i= 0; i < n_gtids; i++) + { + rpl_gtid *domain_state_gtid= &start_gtids[i]; + + /* + If we are initializing from a GLLE, we can have repeat domain ids from + differing servers, so we want to ensure our start gtid matches the last + known position + */ + struct audit_elem *audit_elem= (struct audit_elem *) my_hash_search( + &m_audit_elem_domain_lookup, + (const uchar *) &(domain_state_gtid->domain_id), 0); + if (audit_elem) + { + /* + We have this domain already specified, so try to overwrite with the + more recent GTID + */ + if (domain_state_gtid->seq_no > audit_elem->start_gtid.seq_no) + audit_elem->start_gtid = *domain_state_gtid; + continue; + } + + /* Initialize a new domain */ + audit_elem= (struct audit_elem *) my_malloc( + PSI_NOT_INSTRUMENTED, sizeof(struct audit_elem), MYF(MY_WME)); + if (!audit_elem) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return; + } + + audit_elem->domain_id= start_gtids[i].domain_id; + audit_elem->start_gtid= start_gtids[i]; + audit_elem->last_gtid= {audit_elem->domain_id, 0, 0}; + + my_init_dynamic_array(PSI_INSTRUMENT_ME, &audit_elem->late_gtids_real, + sizeof(rpl_gtid), 8, 8, MYF(0)); + my_init_dynamic_array(PSI_INSTRUMENT_ME, &audit_elem->late_gtids_previous, + sizeof(rpl_gtid), 8, 8, MYF(0)); + + if (my_hash_insert(&m_audit_elem_domain_lookup, (uchar *) audit_elem)) + { + my_free(audit_elem); + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return; + } + } +} + +my_bool Binlog_gtid_state_validator::initialize_gtid_state(FILE *out, + rpl_gtid *gtids, + size_t n_gtids) +{ + size_t i; + my_bool err= FALSE; + + /* + We weren't initialized with starting positions explicitly, so assume the + starting positions of the current gtid state + */ + if (!m_audit_elem_domain_lookup.records) + initialize_start_gtids(gtids, n_gtids); + + for(i= 0; i < n_gtids; i++) + { + rpl_gtid *domain_state_gtid= >ids[i]; + + struct audit_elem *audit_elem= (struct audit_elem *) my_hash_search( + &m_audit_elem_domain_lookup, + (const uchar *) &(domain_state_gtid->domain_id), 0); + + if (!audit_elem) + { + Binlog_gtid_state_validator::error( + out, + "Starting GTID position list does not specify an initial value " + "for domain %u, whose events may be present in the requested binlog " + "file(s). The last known position for this domain was %u-%u-%llu.", + domain_state_gtid->domain_id, PARAM_GTID((*domain_state_gtid))); + err= TRUE; + continue; + } + + if (audit_elem->start_gtid.seq_no < domain_state_gtid->seq_no) + { + Binlog_gtid_state_validator::error( + out, + "Binary logs are missing data for domain %u. Expected data to " + "start from state %u-%u-%llu; however, the initial GTID state of " + "the logs was %u-%u-%llu.", + domain_state_gtid->domain_id, PARAM_GTID(audit_elem->start_gtid), + PARAM_GTID((*domain_state_gtid))); + err= TRUE; + continue; + } + + if (domain_state_gtid->seq_no > audit_elem->last_gtid.seq_no) + audit_elem->last_gtid= *domain_state_gtid; + } + return err; +} + +my_bool Binlog_gtid_state_validator::verify_stop_state(FILE *out, + rpl_gtid *stop_gtids, + size_t n_stop_gtids) +{ + size_t i; + for(i= 0; i < n_stop_gtids; i++) + { + rpl_gtid *stop_gtid= &stop_gtids[i]; + + struct audit_elem *audit_elem= (struct audit_elem *) my_hash_search( + &m_audit_elem_domain_lookup, + (const uchar *) &(stop_gtid->domain_id), 0); + + /* + It is okay if stop gtid doesn't exist in current state because it will be treated + as a new domain + */ + if (audit_elem && stop_gtid->seq_no <= audit_elem->start_gtid.seq_no) + { + Binlog_gtid_state_validator::error( + out, + "--stop-position GTID %u-%u-%llu does not exist in the " + "specified binlog files. The current GTID state of domain %u in the " + "specified binary logs is %u-%u-%llu", + PARAM_GTID((*stop_gtid)), stop_gtid->domain_id, + PARAM_GTID(audit_elem->start_gtid)); + return TRUE; + } + } + + /* No issues with any GTIDs */ + return FALSE; +} + +my_bool +Binlog_gtid_state_validator::verify_gtid_state(FILE *out, + rpl_gtid *domain_state_gtid) +{ + struct audit_elem *audit_elem= (struct audit_elem *) my_hash_search( + &m_audit_elem_domain_lookup, + (const uchar *) &(domain_state_gtid->domain_id), 0); + + if (!audit_elem) + { + Binlog_gtid_state_validator::warn( + out, + "Binary logs are missing data for domain %u. The current binary log " + "specified its " + "current state for this domain as %u-%u-%llu, but neither the " + "starting GTID position list nor any processed events have " + "mentioned " + "this domain.", + domain_state_gtid->domain_id, PARAM_GTID((*domain_state_gtid))); + return TRUE; + } + + if (audit_elem->last_gtid.seq_no < domain_state_gtid->seq_no) + { + Binlog_gtid_state_validator::warn( + out, + "Binary logs are missing data for domain %u. The current binary log " + "state is %u-%u-%llu, but the last seen event was %u-%u-%llu.", + domain_state_gtid->domain_id, PARAM_GTID((*domain_state_gtid)), + PARAM_GTID(audit_elem->last_gtid)); + return TRUE; + } + + return FALSE; +} + +my_bool Binlog_gtid_state_validator::record(rpl_gtid *gtid) +{ + struct audit_elem *audit_elem= (struct audit_elem *) my_hash_search( + &m_audit_elem_domain_lookup, (const uchar *) &(gtid->domain_id), 0); + + if (!audit_elem) + { + /* + We haven't seen any GTIDs in this domian yet. Perform initial set up for + this domain so we can monitor its events. + */ + audit_elem= (struct audit_elem *) my_malloc( + PSI_NOT_INSTRUMENTED, sizeof(struct audit_elem), MYF(MY_WME)); + if (!audit_elem) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return TRUE; + } + + audit_elem->domain_id= gtid->domain_id; + audit_elem->last_gtid= *gtid; + audit_elem->start_gtid= {gtid->domain_id, 0, 0}; + + my_init_dynamic_array(PSI_INSTRUMENT_ME, &audit_elem->late_gtids_real, + sizeof(rpl_gtid), 8, 8, MYF(0)); + my_init_dynamic_array(PSI_INSTRUMENT_ME, &audit_elem->late_gtids_previous, + sizeof(rpl_gtid), 8, 8, MYF(0)); + + if (my_hash_insert(&m_audit_elem_domain_lookup, (uchar *) audit_elem)) + { + my_free(audit_elem); + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return TRUE; + } + } + else + { + /* Out of order check */ + if (gtid->seq_no <= audit_elem->last_gtid.seq_no && + gtid->seq_no >= audit_elem->start_gtid.seq_no) + { + /* GTID is out of order */ + insert_dynamic(&audit_elem->late_gtids_real, (const void *) gtid); + insert_dynamic(&audit_elem->late_gtids_previous, + (const void *) &(audit_elem->last_gtid)); + + return TRUE; + } + else + { + /* GTID is valid */ + audit_elem->last_gtid= *gtid; + } + } + + return FALSE; +} + +/* + Data structure used to help pass data into report_audit_findings because + my_hash_iterate only passes one parameter +*/ +struct gtid_report_ctx +{ + FILE *out_file; + my_bool is_strict_mode; + my_bool contains_err; +}; + +static my_bool report_audit_findings(void *entry, void *report_ctx_arg) +{ + struct Binlog_gtid_state_validator::audit_elem *audit_el= + (struct Binlog_gtid_state_validator::audit_elem *) entry; + + struct gtid_report_ctx *report_ctx= + (struct gtid_report_ctx *) report_ctx_arg; + FILE *out= report_ctx->out_file; + my_bool is_strict_mode= report_ctx->is_strict_mode; + size_t i; + void (*report_f)(FILE*, const char*, ...); + + if (is_strict_mode) + report_f= Binlog_gtid_state_validator::error; + else + report_f= Binlog_gtid_state_validator::warn; + + if (audit_el) + { + if (audit_el->last_gtid.seq_no < audit_el->start_gtid.seq_no) + { + report_f(out, + "Binary logs never reached expected GTID state of %u-%u-%llu", + PARAM_GTID(audit_el->start_gtid)); + report_ctx->contains_err= TRUE; + } + + /* Report any out of order GTIDs */ + for(i= 0; i < audit_el->late_gtids_real.elements; i++) + { + rpl_gtid *real_gtid= + (rpl_gtid *) dynamic_array_ptr(&(audit_el->late_gtids_real), i); + rpl_gtid *last_gtid= (rpl_gtid *) dynamic_array_ptr( + &(audit_el->late_gtids_previous), i); + DBUG_ASSERT(real_gtid && last_gtid); + + report_f(out, + "Found out of order GTID. Got %u-%u-%llu after %u-%u-%llu", + PARAM_GTID((*real_gtid)), PARAM_GTID((*last_gtid))); + report_ctx->contains_err= TRUE; + } + } + + return FALSE; +} + +my_bool Binlog_gtid_state_validator::report(FILE *out, my_bool is_strict_mode) +{ + struct gtid_report_ctx report_ctx; + report_ctx.out_file= out; + report_ctx.is_strict_mode= is_strict_mode; + report_ctx.contains_err= FALSE; + my_hash_iterate(&m_audit_elem_domain_lookup, report_audit_findings, &report_ctx); + fflush(out); + return is_strict_mode ? report_ctx.contains_err : FALSE; +} + +Window_gtid_event_filter::Window_gtid_event_filter() + : m_has_start(FALSE), m_has_stop(FALSE), m_is_active(FALSE), + m_has_passed(FALSE) +{ + // m_start and m_stop do not need initial values if unused +} + +int Window_gtid_event_filter::set_start_gtid(rpl_gtid *start) +{ + if (m_has_start) + { + sql_print_error( + "Start position cannot have repeated domain " + "ids (found %u-%u-%llu when %u-%u-%llu was previously specified)", + PARAM_GTID((*start)), PARAM_GTID(m_start)); + return 1; + } + + m_has_start= TRUE; + m_start= *start; + return 0; +} + +int Window_gtid_event_filter::set_stop_gtid(rpl_gtid *stop) +{ + if (m_has_stop) + { + sql_print_error( + "Stop position cannot have repeated domain " + "ids (found %u-%u-%llu when %u-%u-%llu was previously specified)", + PARAM_GTID((*stop)), PARAM_GTID(m_stop)); + return 1; + } + + m_has_stop= TRUE; + m_stop= *stop; + return 0; +} + +my_bool Window_gtid_event_filter::is_range_invalid() +{ + if (m_has_start && m_has_stop && m_start.seq_no > m_stop.seq_no) + { + sql_print_error( + "Queried GTID range is invalid in strict mode. Stop position " + "%u-%u-%llu is not greater than or equal to start %u-%u-%llu.", + PARAM_GTID(m_stop), PARAM_GTID(m_start)); + return TRUE; + } + return FALSE; +} + +static inline my_bool is_gtid_at_or_after(rpl_gtid *boundary, + rpl_gtid *test_gtid) +{ + return test_gtid->domain_id == boundary->domain_id && + test_gtid->seq_no >= boundary->seq_no; +} + +static inline my_bool is_gtid_at_or_before(rpl_gtid *boundary, + rpl_gtid *test_gtid) +{ + return test_gtid->domain_id == boundary->domain_id && + test_gtid->seq_no <= boundary->seq_no; +} + +my_bool Window_gtid_event_filter::exclude(rpl_gtid *gtid) +{ + /* Assume result should be excluded to start */ + my_bool should_exclude= TRUE; + + DBUG_ASSERT((m_has_start || m_has_stop) && + (gtid->domain_id == m_start.domain_id || + gtid->domain_id == m_stop.domain_id)); + + if (!m_is_active && !m_has_passed) + { + /* + This filter has not yet been activated. Check if the gtid is within the + bounds of this window. + */ + + if (!m_has_start && is_gtid_at_or_before(&m_stop, gtid)) + { + /* + Start GTID was not provided, so we want to include everything from here + up to m_stop + */ + m_is_active= TRUE; + should_exclude= FALSE; + } + else if ((m_has_start && is_gtid_at_or_after(&m_start, gtid)) && + (!m_has_stop || is_gtid_at_or_before(&m_stop, gtid))) + { + m_is_active= TRUE; + + DBUG_PRINT("gtid-event-filter", + ("Window: Begin (%d-%d-%llu, %d-%d-%llu]", + PARAM_GTID(m_start), PARAM_GTID(m_stop))); + + /* + As the start of the range is exclusive, if this gtid is the start of + the range, exclude it + */ + if (gtid->seq_no == m_start.seq_no) + should_exclude= TRUE; + else + should_exclude= FALSE; + + if (m_has_stop && gtid->seq_no == m_stop.seq_no) + { + m_has_passed= TRUE; + DBUG_PRINT("gtid-event-filter", + ("Window: End (%d-%d-%llu, %d-%d-%llu]", + PARAM_GTID(m_start), PARAM_GTID(m_stop))); + } + } + } /* if (!m_is_active && !m_has_passed) */ + else if (m_is_active && !m_has_passed) + { + /* + This window is currently active so we want the event group to be included + in the results. Additionally check if we are at the end of the window. + If no end of the window is provided, go indefinitely + */ + should_exclude= FALSE; + + if (m_has_stop && is_gtid_at_or_after(&m_stop, gtid)) + { + DBUG_PRINT("gtid-event-filter", + ("Window: End (%d-%d-%llu, %d-%d-%llu]", + PARAM_GTID(m_start), PARAM_GTID(m_stop))); + m_is_active= FALSE; + m_has_passed= TRUE; + + if (!is_gtid_at_or_before(&m_stop, gtid)) + { + /* + The GTID is after the finite stop of the window, don't let it pass + through + */ + should_exclude= TRUE; + } + } + } + + return should_exclude; +} + +my_bool Window_gtid_event_filter::has_finished() +{ + return m_has_stop ? m_has_passed : FALSE; +} + +void free_gtid_filter_element(void *p) +{ + gtid_filter_element *gfe = (gtid_filter_element *) p; + if (gfe->filter) + delete gfe->filter; + my_free(gfe); +} + +Id_delegating_gtid_event_filter::Id_delegating_gtid_event_filter() + : m_num_stateful_filters(0), m_num_completed_filters(0) +{ + my_hash_init(PSI_INSTRUMENT_ME, &m_filters_by_id_hash, &my_charset_bin, 32, + offsetof(gtid_filter_element, identifier), + sizeof(gtid_filter_identifier), NULL, free_gtid_filter_element, + HASH_UNIQUE); + + m_default_filter= new Accept_all_gtid_filter(); +} + +Id_delegating_gtid_event_filter::~Id_delegating_gtid_event_filter() +{ + my_hash_free(&m_filters_by_id_hash); + delete m_default_filter; +} + +void Id_delegating_gtid_event_filter::set_default_filter( + Gtid_event_filter *filter) +{ + if (m_default_filter) + delete m_default_filter; + + m_default_filter= filter; +} + +gtid_filter_element * +Id_delegating_gtid_event_filter::find_or_create_filter_element_for_id( + gtid_filter_identifier filter_id) +{ + gtid_filter_element *fe= (gtid_filter_element *) my_hash_search( + &m_filters_by_id_hash, (const uchar *) &filter_id, 0); + + if (!fe) + { + gtid_filter_element *new_fe= (gtid_filter_element *) my_malloc( + PSI_NOT_INSTRUMENTED, sizeof(gtid_filter_element), MYF(MY_WME)); + new_fe->filter= NULL; + new_fe->identifier= filter_id; + if (my_hash_insert(&m_filters_by_id_hash, (uchar*) new_fe)) + { + my_free(new_fe); + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return NULL; + } + fe= new_fe; + } + + return fe; +} + +my_bool Id_delegating_gtid_event_filter::has_finished() +{ + /* + If all user-defined filters have deactivated, we are effectively + deactivated + */ + return m_num_stateful_filters && + m_num_completed_filters == m_num_stateful_filters; +} + +my_bool Id_delegating_gtid_event_filter::exclude(rpl_gtid *gtid) +{ + gtid_filter_identifier filter_id= get_id_from_gtid(gtid); + gtid_filter_element *filter_element= (gtid_filter_element *) my_hash_search( + &m_filters_by_id_hash, (const uchar *) &filter_id, 0); + Gtid_event_filter *filter= + (filter_element ? filter_element->filter : m_default_filter); + my_bool ret= TRUE; + + if(!filter_element || !filter->has_finished()) + { + ret= filter->exclude(gtid); + + /* + If this is an explicitly defined filter, e.g. Window-based filter, check + if it has completed, and update the counter accordingly if so. + */ + if (filter_element && filter->has_finished()) + m_num_completed_filters++; + } + + return ret; +} + +Window_gtid_event_filter * +Domain_gtid_event_filter::find_or_create_window_filter_for_id( + uint32 domain_id) +{ + gtid_filter_element *filter_element= + find_or_create_filter_element_for_id(domain_id); + Window_gtid_event_filter *wgef= NULL; + + if (filter_element->filter == NULL) + { + /* New filter */ + wgef= new Window_gtid_event_filter(); + filter_element->filter= wgef; + } + else if (filter_element->filter->get_filter_type() == WINDOW_GTID_FILTER_TYPE) + { + /* We have an existing window filter here */ + wgef= (Window_gtid_event_filter *) filter_element->filter; + } + else + { + /* + We have an existing filter but it is not of window type so propogate NULL + filter + */ + sql_print_error("cannot subset domain id %d by position, another rule " + "exists on that domain", + domain_id); + } + + return wgef; +} + +static my_bool check_filter_entry_validity(void *entry, void *are_filters_invalid_arg) +{ + gtid_filter_element *fe= (gtid_filter_element*) entry; + + if (fe) + { + Gtid_event_filter *gef= fe->filter; + if (gef->get_filter_type() == Gtid_event_filter::WINDOW_GTID_FILTER_TYPE) + { + Window_gtid_event_filter *wgef= (Window_gtid_event_filter *) gef; + if (wgef->is_range_invalid()) + { + *((int *) are_filters_invalid_arg)= 1; + return TRUE; + } + } + } + return FALSE; +} + +int Domain_gtid_event_filter::validate_window_filters() +{ + int are_filters_invalid= 0; + my_hash_iterate(&m_filters_by_id_hash, check_filter_entry_validity, &are_filters_invalid); + return are_filters_invalid; +} + +int Domain_gtid_event_filter::add_start_gtid(rpl_gtid *gtid) +{ + int err= 0; + Window_gtid_event_filter *filter_to_update= + find_or_create_window_filter_for_id(gtid->domain_id); + + if (filter_to_update == NULL) + { + err= 1; + } + else if (!(err= filter_to_update->set_start_gtid(gtid))) + { + gtid_filter_element *fe= (gtid_filter_element *) my_hash_search( + &m_filters_by_id_hash, (const uchar *) &(gtid->domain_id), 0); + insert_dynamic(&m_start_filters, (const void *) &fe); + } + + return err; +} + +int Domain_gtid_event_filter::add_stop_gtid(rpl_gtid *gtid) +{ + int err= 0; + Window_gtid_event_filter *filter_to_update= + find_or_create_window_filter_for_id(gtid->domain_id); + + if (filter_to_update == NULL) + { + err= 1; + } + else if (!(err= filter_to_update->set_stop_gtid(gtid))) + { + gtid_filter_element *fe= (gtid_filter_element *) my_hash_search( + &m_filters_by_id_hash, (const uchar *) &(gtid->domain_id), 0); + insert_dynamic(&m_stop_filters, (const void *) &fe); + + /* + A window with a stop position can be disabled, and is therefore stateful. + */ + m_num_stateful_filters++; + + /* + Default filtering behavior changes with GTID stop positions, where we + exclude all domains not present in the stop list + */ + if (m_default_filter->get_filter_type() == ACCEPT_ALL_GTID_FILTER_TYPE) + { + delete m_default_filter; + m_default_filter= new Reject_all_gtid_filter(); + } + } + + return err; +} + +rpl_gtid *Domain_gtid_event_filter::get_start_gtids() +{ + rpl_gtid *gtid_list; + uint32 i; + size_t n_start_gtids= get_num_start_gtids(); + + gtid_list= (rpl_gtid *) my_malloc( + PSI_INSTRUMENT_ME, n_start_gtids * sizeof(rpl_gtid), MYF(MY_WME)); + + for (i = 0; i < n_start_gtids; i++) + { + gtid_filter_element *fe= + *(gtid_filter_element **) dynamic_array_ptr(&m_start_filters, i); + DBUG_ASSERT(fe->filter && + fe->filter->get_filter_type() == WINDOW_GTID_FILTER_TYPE); + Window_gtid_event_filter *wgef= + (Window_gtid_event_filter *) fe->filter; + + rpl_gtid win_start_gtid= wgef->get_start_gtid(); + gtid_list[i]= win_start_gtid; + } + + return gtid_list; +} + +rpl_gtid *Domain_gtid_event_filter::get_stop_gtids() +{ + rpl_gtid *gtid_list; + uint32 i; + size_t n_stop_gtids= get_num_stop_gtids(); + + gtid_list= (rpl_gtid *) my_malloc( + PSI_INSTRUMENT_ME, n_stop_gtids * sizeof(rpl_gtid), MYF(MY_WME)); + + for (i = 0; i < n_stop_gtids; i++) + { + gtid_filter_element *fe= + *(gtid_filter_element **) dynamic_array_ptr(&m_stop_filters, i); + DBUG_ASSERT(fe->filter && + fe->filter->get_filter_type() == WINDOW_GTID_FILTER_TYPE); + Window_gtid_event_filter *wgef= + (Window_gtid_event_filter *) fe->filter; + + rpl_gtid win_stop_gtid= wgef->get_stop_gtid(); + gtid_list[i]= win_stop_gtid; + } + + return gtid_list; +} + +void Domain_gtid_event_filter::clear_start_gtids() +{ + uint32 i; + for (i = 0; i < get_num_start_gtids(); i++) + { + gtid_filter_element *fe= + *(gtid_filter_element **) dynamic_array_ptr(&m_start_filters, i); + DBUG_ASSERT(fe->filter && + fe->filter->get_filter_type() == WINDOW_GTID_FILTER_TYPE); + Window_gtid_event_filter *wgef= + (Window_gtid_event_filter *) fe->filter; + + if (wgef->has_stop()) + { + /* + Don't delete the whole filter if it already has a stop position attached + */ + wgef->clear_start_pos(); + } + else + { + /* + This domain only has a stop, so delete the whole filter + */ + my_hash_delete(&m_filters_by_id_hash, (uchar *) fe); + } + } + + reset_dynamic(&m_start_filters); +} + +void Domain_gtid_event_filter::clear_stop_gtids() +{ + uint32 i; + + for (i = 0; i < get_num_stop_gtids(); i++) + { + gtid_filter_element *fe= + *(gtid_filter_element **) dynamic_array_ptr(&m_stop_filters, i); + DBUG_ASSERT(fe->filter && + fe->filter->get_filter_type() == WINDOW_GTID_FILTER_TYPE); + Window_gtid_event_filter *wgef= + (Window_gtid_event_filter *) fe->filter; + + if (wgef->has_start()) + { + /* + Don't delete the whole filter if it already has a start position + attached + */ + wgef->clear_stop_pos(); + } + else + { + /* + This domain only has a start, so delete the whole filter + */ + my_hash_delete(&m_filters_by_id_hash, (uchar *) fe); + } + m_num_stateful_filters--; + } + + /* + Stop positions were cleared and we want to be inclusive again of other + domains again + */ + if (m_default_filter->get_filter_type() == REJECT_ALL_GTID_FILTER_TYPE) + { + delete m_default_filter; + m_default_filter= new Accept_all_gtid_filter(); + } + + reset_dynamic(&m_stop_filters); +} + +my_bool Domain_gtid_event_filter::exclude(rpl_gtid *gtid) +{ + my_bool include_domain= TRUE; + /* + If GTID stop positions are provided, we limit the domains which are output + to only be those specified with stop positions + */ + if (get_num_stop_gtids()) + { + gtid_filter_identifier filter_id= get_id_from_gtid(gtid); + gtid_filter_element *filter_element= + (gtid_filter_element *) my_hash_search(&m_filters_by_id_hash, + (const uchar *) &filter_id, 0); + if (filter_element) + { + Gtid_event_filter *filter= filter_element->filter; + if (filter->get_filter_type() == WINDOW_GTID_FILTER_TYPE) + { + Window_gtid_event_filter *wgef= (Window_gtid_event_filter *) filter; + include_domain= wgef->has_stop(); + } + } + } + + return include_domain ? Id_delegating_gtid_event_filter::exclude(gtid) + : TRUE; +} diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h index 11541c8000c..8b8c3bb9c92 100644 --- a/sql/rpl_gtid.h +++ b/sql/rpl_gtid.h @@ -28,6 +28,7 @@ extern const LEX_CSTRING rpl_gtid_slave_state_table_name; class String; #define GTID_MAX_STR_LENGTH (10+1+10+1+20) +#define PARAM_GTID(G) G.domain_id, G.server_id, G.seq_no struct rpl_gtid { @@ -36,6 +37,9 @@ struct rpl_gtid uint64 seq_no; }; +/* Data structure to help with quick lookup for filters. */ +typedef decltype(rpl_gtid::domain_id) gtid_filter_identifier; + inline bool operator==(const rpl_gtid& lhs, const rpl_gtid& rhs) { return @@ -380,5 +384,449 @@ extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid, extern int gtid_check_rpl_slave_state_table(TABLE *table); extern rpl_gtid *gtid_parse_string_to_list(const char *p, size_t len, uint32 *out_len); +extern rpl_gtid *gtid_unpack_string_to_list(const char *p, size_t len, + uint32 *out_len); + + + +/* + This class ensures that the GTID state of an event stream is consistent with + the set of provided binary log files. In particular, it has two concerns: + + 1) Ensuring that GTID events are monotonically increasing within each + domain + 2) Ensuring that the GTID state of the specified binary logs is consistent + both with the initial state that a user provides, and between + binary logs (if multiple are specified) +*/ +class Binlog_gtid_state_validator +{ +public: + + struct audit_elem + { + uint32 domain_id; + + + /* + Holds the largest GTID received, and is indexed by domain_id + */ + rpl_gtid last_gtid; + + /* + Holds the largest GTID received, and is indexed by domain_id + */ + rpl_gtid start_gtid; + + /* + List of the problematic GTIDs received which were out of order + */ + DYNAMIC_ARRAY late_gtids_real; + + /* + For each problematic GTID in late_gtids_real, this list contains the last + GTID of the domain at the time of receiving the out of order GTID. + */ + DYNAMIC_ARRAY late_gtids_previous; + }; + + Binlog_gtid_state_validator(); + ~Binlog_gtid_state_validator(); + + /* + Initialize where we should start monitoring for invalid GTID entries + in the event stream. Note that these start positions must occur at or after + a given binary logs GTID state (from Gtid_list_log_event) + */ + void initialize_start_gtids(rpl_gtid *start_gtids, size_t n_gtids); + + /* + Initialize our current state so we know where to expect GTIDs to start + increasing from. Error if the state exists after our expected start_gtid + positions, because we know we will be missing event data (possibly from + a purged log). + */ + my_bool initialize_gtid_state(FILE *out, rpl_gtid *gtids, size_t n_gtids); + + /* + Ensures that the expected stop GTID positions exist within the specified + binary logs. + */ + my_bool verify_stop_state(FILE *out, rpl_gtid *stop_gtids, size_t n_stop_gtids); + + /* + Ensure a GTID state (e.g., from a Gtid_list_log_event) is consistent with + the current state of our auditing. For example, if we see a GTID from a + Gtid_list_log_event that is ahead of our current state for that domain, we + have missed events (perhaps from a missing log). + */ + my_bool verify_gtid_state(FILE *out, rpl_gtid *gtid_state_cur); + + /* + Take note of a new GTID being processed. + + returns TRUE if the GTID is invalid, FALSE on success + */ + my_bool record(rpl_gtid *gtid); + + /* + Writes warnings/errors (if any) during GTID processing + + Returns TRUE if any findings were reported, FALSE otherwise + */ + my_bool report(FILE *out, my_bool is_strict_mode); + + static void report_details(FILE *out, const char *format, va_list args) + { + vfprintf(out, format, args); + fprintf(out, "\n"); + } + + static void warn(FILE *out, const char *format,...) + { + va_list args; + va_start(args, format); + fprintf(out, "WARNING: "); + report_details(out, format, args); + } + + static void error(FILE *out, const char *format,...) + { + va_list args; + va_start(args, format); + fprintf(out, "ERROR: "); + report_details(out, format, args); + } + +private: + + /* + Holds the records for each domain id we are monitoring. Elements are of type + `struct audit_elem` and indexed by domian_id. + */ + HASH m_audit_elem_domain_lookup; +}; + +/* + Interface to support different methods of filtering log events by GTID +*/ +class Gtid_event_filter +{ +public: + Gtid_event_filter() {}; + virtual ~Gtid_event_filter() {}; + + enum gtid_event_filter_type + { + DELEGATING_GTID_FILTER_TYPE = 1, + WINDOW_GTID_FILTER_TYPE = 2, + ACCEPT_ALL_GTID_FILTER_TYPE = 3, + REJECT_ALL_GTID_FILTER_TYPE = 4 + }; + + /* + Run the filter on an input gtid to test if the corresponding log events + should be excluded from a result + + Returns TRUE when the event group corresponding to the input GTID should be + excluded. + Returns FALSE when the event group should be included. + */ + virtual my_bool exclude(rpl_gtid *) = 0; + + /* + The gtid_event_filter_type that corresponds to the underlying filter + implementation + */ + virtual uint32 get_filter_type() = 0; + + /* + For filters that can maintain their own state, this tests if the filter + implementation has completed. + + Returns TRUE when completed, and FALSE when the filter has not finished. + */ + virtual my_bool has_finished() = 0; +}; + +/* + Filter implementation which will include any and all input GTIDs. This is + used to set default behavior for GTIDs that do not have explicit filters + set on their domain_id, e.g. when a Window_gtid_event_filter is used for + a specific domain, then all other domain_ids will be accepted using this + filter implementation. +*/ +class Accept_all_gtid_filter : public Gtid_event_filter +{ +public: + Accept_all_gtid_filter() {} + ~Accept_all_gtid_filter() {} + my_bool exclude(rpl_gtid *gtid) { return FALSE; } + uint32 get_filter_type() { return ACCEPT_ALL_GTID_FILTER_TYPE; } + my_bool has_finished() { return FALSE; } +}; + +/* + Filter implementation to exclude all tested GTIDs. +*/ +class Reject_all_gtid_filter : public Gtid_event_filter +{ +public: + Reject_all_gtid_filter() {} + ~Reject_all_gtid_filter() {} + my_bool exclude(rpl_gtid *gtid) { return TRUE; } + uint32 get_filter_type() { return REJECT_ALL_GTID_FILTER_TYPE; } + my_bool has_finished() { return FALSE; } +}; + +/* + A filter implementation that includes events that exist between two GTID + positions, m_start (exclusive) and m_stop (inclusive), within a domain. + + This filter is stateful, such that it expects GTIDs to be an increasing + stream, and internally, the window will activate and deactivate when the start + and stop positions of the event stream have passed through, respectively. +*/ +class Window_gtid_event_filter : public Gtid_event_filter +{ +public: + Window_gtid_event_filter(); + ~Window_gtid_event_filter() {} + + my_bool exclude(rpl_gtid*); + my_bool has_finished(); + + /* + Set the GTID that begins this window (exclusive) + + Returns 0 on ok, non-zero on error + */ + int set_start_gtid(rpl_gtid *start); + + /* + Set the GTID that ends this window (inclusive) + + Returns 0 on ok, non-zero on error + */ + int set_stop_gtid(rpl_gtid *stop); + + uint32 get_filter_type() { return WINDOW_GTID_FILTER_TYPE; } + + /* + Validates the underlying range is correct, and writes an error if not, i.e. + m_start >= m_stop. + + Returns FALSE on ok, TRUE if range is invalid + */ + my_bool is_range_invalid(); + + /* + Getter/setter methods + */ + my_bool has_start() { return m_has_start; } + my_bool has_stop() { return m_has_stop; } + rpl_gtid get_start_gtid() { return m_start; } + rpl_gtid get_stop_gtid() { return m_stop; } + + void clear_start_pos() + { + m_has_start= FALSE; + m_start= {0, 0, 0}; + } + + void clear_stop_pos() + { + m_has_stop= FALSE; + m_stop= {0, 0, 0}; + } + +protected: + + /* + When processing GTID streams, the order in which they are processed should + be sequential with no gaps between events. If a gap is found within a + window, warn the user. + */ + void verify_gtid_is_expected(rpl_gtid *gtid); + +private: + + enum warning_flags + { + WARN_GTID_SEQUENCE_NUMBER_OUT_OF_ORDER= 0x1 + }; + + /* + m_has_start : Indicates if a start to this window has been explicitly + provided. A window starts immediately if not provided. + */ + my_bool m_has_start; + + /* + m_has_stop : Indicates if a stop to this window has been explicitly + provided. A window continues indefinitely if not provided. + */ + my_bool m_has_stop; + + /* + m_is_active : Indicates whether or not the program is currently reading + events from within this window. When TRUE, events with + different server ids than those specified by m_start or + m_stop will be passed through. + */ + my_bool m_is_active; + + /* + m_has_passed : Indicates whether or not the program is currently reading + events from within this window. + */ + my_bool m_has_passed; + + /* m_start : marks the GTID that begins the window (exclusive). */ + rpl_gtid m_start; + + /* m_stop : marks the GTID that ends the range (inclusive). */ + rpl_gtid m_stop; +}; + +typedef struct _gtid_filter_element +{ + Gtid_event_filter *filter; + gtid_filter_identifier identifier; /* Used for HASH lookup */ +} gtid_filter_element; + +/* + Gtid_event_filter subclass which has no specific implementation, but rather + delegates the filtering to specific identifiable/mapped implementations. + + A default filter is used for GTIDs that are passed through which no explicit + filter can be identified. + + This class should be subclassed, where the get_id_from_gtid function + specifies how to extract the filter identifier from a GTID. +*/ +class Id_delegating_gtid_event_filter : public Gtid_event_filter +{ +public: + Id_delegating_gtid_event_filter(); + ~Id_delegating_gtid_event_filter(); + + my_bool exclude(rpl_gtid *gtid); + my_bool has_finished(); + void set_default_filter(Gtid_event_filter *default_filter); + + uint32 get_filter_type() { return DELEGATING_GTID_FILTER_TYPE; } + + virtual gtid_filter_identifier get_id_from_gtid(rpl_gtid *) = 0; + +protected: + + uint32 m_num_stateful_filters; + uint32 m_num_completed_filters; + Gtid_event_filter *m_default_filter; + + HASH m_filters_by_id_hash; + + gtid_filter_element *find_or_create_filter_element_for_id(gtid_filter_identifier); +}; + +/* + A subclass of Id_delegating_gtid_event_filter which identifies filters using the + domain id of a GTID. + + Additional helper functions include: + add_start_gtid(GTID) : adds a start GTID position to this filter, to be + identified by its domain id + add_stop_gtid(GTID) : adds a stop GTID position to this filter, to be + identified by its domain id + clear_start_gtids() : removes existing GTID start positions + clear_stop_gtids() : removes existing GTID stop positions + get_start_gtids() : gets all added GTID start positions + get_stop_gtids() : gets all added GTID stop positions + get_num_start_gtids() : gets the count of added GTID start positions + get_num_stop_gtids() : gets the count of added GTID stop positions +*/ +class Domain_gtid_event_filter : public Id_delegating_gtid_event_filter +{ +public: + Domain_gtid_event_filter() + { + my_init_dynamic_array(PSI_INSTRUMENT_ME, &m_start_filters, + sizeof(gtid_filter_element*), 8, 8, MYF(0)); + my_init_dynamic_array(PSI_INSTRUMENT_ME, &m_stop_filters, + sizeof(gtid_filter_element*), 8, 8, MYF(0)); + } + ~Domain_gtid_event_filter() + { + delete_dynamic(&m_start_filters); + delete_dynamic(&m_stop_filters); + } + + /* + Returns the domain id of from the input GTID + */ + gtid_filter_identifier get_id_from_gtid(rpl_gtid *gtid) + { + return gtid->domain_id; + } + + /* + Override Id_delegating_gtid_event_filter to extend with domain specific + filtering logic + */ + my_bool exclude(rpl_gtid*); + + /* + Validates that window filters with both a start and stop GTID satisfy + stop_gtid > start_gtid + + Returns 0 on ok, non-zero if any windows are invalid. + */ + int validate_window_filters(); + + /* + Helper function to start a GTID window filter at the given GTID + + Returns 0 on ok, non-zero on error + */ + int add_start_gtid(rpl_gtid *gtid); + + /* + Helper function to end a GTID window filter at the given GTID + + Returns 0 on ok, non-zero on error + */ + int add_stop_gtid(rpl_gtid *gtid); + + /* + If start or stop position is respecified, we remove all existing values + and start over with the new specification. + */ + void clear_start_gtids(); + void clear_stop_gtids(); + + /* + Return list of all GTIDs used as start position. + + Note that this list is allocated and it is up to the user to free it + */ + rpl_gtid *get_start_gtids(); + + /* + Return list of all GTIDs used as stop position. + + Note that this list is allocated and it is up to the user to free it + */ + rpl_gtid *get_stop_gtids(); + + size_t get_num_start_gtids() { return m_start_filters.elements; } + size_t get_num_stop_gtids() { return m_stop_filters.elements; } + +private: + DYNAMIC_ARRAY m_start_filters; + DYNAMIC_ARRAY m_stop_filters; + + Window_gtid_event_filter *find_or_create_window_filter_for_id(gtid_filter_identifier); +}; #endif /* RPL_GTID_H */ |