diff options
Diffstat (limited to 'sql/rpl_gtid.cc')
-rw-r--r-- | sql/rpl_gtid.cc | 402 |
1 files changed, 400 insertions, 2 deletions
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc index 8b10703fdc2..52e14714a9b 100644 --- a/sql/rpl_gtid.cc +++ b/sql/rpl_gtid.cc @@ -17,6 +17,7 @@ /* Definitions for MariaDB global transaction ID (GTID). */ +#ifndef MYSQL_CLIENT #include "mariadb.h" #include "sql_priv.h" #include "unireg.h" @@ -1268,6 +1269,7 @@ rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid) return true; } +#endif /* Parse a GTID at the start of a string, and update the pointer to point @@ -1305,9 +1307,32 @@ gtid_parser_helper(const char **ptr, const char *end, rpl_gtid *out_gtid) return 0; } +/* + Unpack a GTID at the start of a string, and update the pointer to point + at the first character after the unpacked GTID. + + Returns 0 on ok, non-zero on parse error. +*/ +static int +gtid_unpack_helper(const char **ptr, const char *end, rpl_gtid *out_gtid) +{ + const char *p= *ptr; + + if (p[4] != '-' || p[9] != '-') + return 1; + + out_gtid->domain_id= (uint32)uint4korr(p); + out_gtid->server_id= (uint32)uint4korr(&p[5]); + out_gtid->seq_no= (uint64)uint8korr(&p[10]); + + *ptr= p + 18; + return 0; +} rpl_gtid * -gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len) +gtid_read_to_list(const char *str, size_t str_len, uint32 *out_len, + int reader_f(const char **ptr, const char *end, + rpl_gtid *out_gtid)) { const char *p= const_cast<char *>(str); const char *end= p + str_len; @@ -1318,7 +1343,7 @@ gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len) { rpl_gtid gtid; - if (len >= (((uint32)1 << 28)-1) || gtid_parser_helper(&p, end, >id)) + if (len >= (((uint32)1 << 28)-1) || reader_f(&p, end, >id)) { my_free(list); return NULL; @@ -1345,6 +1370,20 @@ gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len) } +rpl_gtid * +gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len) +{ + return gtid_read_to_list(str, str_len, out_len, gtid_parser_helper); +} + +rpl_gtid * +gtid_unpack_string_to_list(const char *str, size_t str_len, uint32 *out_len) +{ + return gtid_read_to_list(str, str_len, out_len, gtid_unpack_helper); +} + +#ifndef MYSQL_CLIENT + /* Update the slave replication state with the GTID position obtained from master when connecting with old-style (filename,offset) position. @@ -2952,3 +2991,362 @@ gtid_waiting::remove_from_wait_queue(gtid_waiting::hash_element *he, queue_remove(&he->queue, elem->queue_idx); } + +#endif + +Window_gtid_event_filter::Window_gtid_event_filter() : + m_has_start(FALSE), + m_has_stop(FALSE), + m_is_active(FALSE), + m_has_passed(FALSE) + { + // m_start and m_stop do not need initial values if unused + } + +Window_gtid_event_filter::Window_gtid_event_filter(rpl_gtid *start, rpl_gtid *stop) : + m_is_active(FALSE), + m_has_passed(FALSE) +{ + DBUG_ASSERT(start->domain_id == stop->domain_id); + + m_is_active= FALSE; + m_has_passed= FALSE; + + m_has_start= TRUE; + m_start.domain_id= start->domain_id; + m_start.server_id= start->server_id; + m_start.seq_no= start->seq_no; + + m_has_stop= TRUE; + m_stop.domain_id= stop->domain_id; + m_stop.server_id= stop->server_id; + m_stop.seq_no= stop->seq_no; +} + +my_bool Window_gtid_event_filter::set_start_gtid(rpl_gtid *start) +{ + if (m_has_start) + return FALSE; + + // Copy values + m_has_start= TRUE; + m_start.domain_id= start->domain_id; + m_start.server_id= start->server_id; + m_start.seq_no= start->seq_no; + + return TRUE; +} + +my_bool Window_gtid_event_filter::set_stop_gtid(rpl_gtid *stop) +{ + if (m_has_stop) + return FALSE; + + // Copy values + m_has_stop= TRUE; + m_stop.domain_id= stop->domain_id; + m_stop.server_id= stop->server_id; + m_stop.seq_no= stop->seq_no; + + return TRUE; +} + +gtid_filter_identifier Window_gtid_event_filter::get_filter_identifier() +{ + DBUG_ASSERT(m_has_start || m_has_stop); + if (m_has_start) + return m_start.domain_id; + else + return m_stop.domain_id; +} + +static inline my_bool is_gtid_at_or_after(rpl_gtid *boundary, + rpl_gtid *test_gtid) +{ + return test_gtid->domain_id == boundary->domain_id && + test_gtid->server_id == boundary->server_id && + test_gtid->seq_no >= boundary->seq_no; +} + +static inline my_bool is_gtid_before(rpl_gtid *boundary, + rpl_gtid *test_gtid) +{ + return test_gtid->domain_id == boundary->domain_id && + test_gtid->server_id == boundary->server_id && + test_gtid->seq_no <= boundary->seq_no; +} + +my_bool Window_gtid_event_filter::exclude(rpl_gtid *gtid) +{ + /* Assume result should be excluded to start */ + my_bool should_exclude= TRUE; + + DBUG_ASSERT((m_has_start || m_has_stop) && + (gtid->domain_id == m_start.domain_id || + gtid->domain_id == m_stop.domain_id)); + + if (!m_is_active && !m_has_passed) + { + /* + This filter has not yet been activated. Check if the gtid is within the + bounds of this window. + */ + + if (!m_has_start) + { + /* + Start GTID was not provided, so we want to include everything up to m_stop + */ + m_is_active= TRUE; + should_exclude= FALSE; + } + else if (is_gtid_at_or_after(&m_start, gtid)) + { + m_is_active= TRUE; + + DBUG_PRINT("gtid-event-filter", + ("Window: Begin (%d-%d-%d, %d-%d-%llu]", m_start.domain_id, + m_start.server_id, m_start.seq_no, m_stop.domain_id, + m_stop.server_id, m_stop.seq_no)); + + /* + As the start of the range is exclusive, if this gtid is the start of + the range, exclude it + */ + if (gtid->seq_no == m_start.seq_no && + gtid->server_id == m_start.server_id) + should_exclude= TRUE; + else + should_exclude= FALSE; + } + } /* if (!m_is_active && !m_has_passed) */ + else if (m_is_active && !m_has_passed) + { + /* + This window is currently active so we want the event group to be included + in the results. Additionally check if we are at the end of the window. + If no end of the window is provided, go indefinitely + */ + should_exclude= FALSE; + + if (m_has_stop && is_gtid_at_or_after(&m_stop, gtid)) + { + DBUG_PRINT("gtid-event-filter", + ("Window: End (%d-%d-%d, %d-%d-%llu]", m_start.domain_id, + m_start.server_id, m_start.seq_no, m_stop.domain_id, + m_stop.server_id, m_stop.seq_no)); + m_is_active= FALSE; + m_has_passed= TRUE; + + if (gtid->server_id == m_stop.server_id && gtid->seq_no > m_stop.seq_no) + { + /* + The GTID is after the finite stop of the window, don't let it pass + through + */ + should_exclude= TRUE; + } + } + else if (m_has_start && is_gtid_before(&m_start, gtid)) + { + /* + Out of order check, the window is active but this GTID takes place + before the window begins. keep the window active, but exclude it from + passing through. + */ + should_exclude= TRUE; + } + } + else if (m_has_passed && m_has_stop && is_gtid_before(&m_stop, gtid)) + { + /* Test if events are out of order */ + if (!m_has_start || (m_has_start && is_gtid_at_or_after(&m_start, gtid))) + { + /* + The filter window has closed because it has seen a GTID higher than its + end boundary; however, this GTID is out of order and should be passed + through. + */ + should_exclude= TRUE; + } + } + + return should_exclude; +} + +Delegating_gtid_event_filter::Delegating_gtid_event_filter() +{ + uint32 i; + + m_filter_id_mask= 0xf; + + m_filters_by_id= (gtid_filter_element **) my_malloc( + PSI_NOT_INSTRUMENTED, + (m_filter_id_mask + 1) * sizeof(gtid_filter_element *), + MYF(MY_WME) + ); + + DBUG_ASSERT(m_filters_by_id != NULL); + + for (i = 0; i <= m_filter_id_mask; i++) + { + m_filters_by_id[i]= NULL; + } + + m_default_filter= new Reject_all_gtid_filter(); +} + +/* + Deconstructor deletes: + 1) All Identifiable_gtid_event_filters added + 2) All gtid_filter_element allocations +*/ +Delegating_gtid_event_filter::~Delegating_gtid_event_filter() +{ + uint32 i; + for (i = 0; i <= m_filter_id_mask; i++) + { + gtid_filter_element *filter_element= m_filters_by_id[i], + *filter_element_to_del= NULL; + while(filter_element) + { + filter_element_to_del= filter_element; + filter_element= filter_element->next; + delete filter_element_to_del->filter; + my_free(filter_element_to_del); + } + } + my_free(m_filters_by_id); + + delete m_default_filter; +} + +void Delegating_gtid_event_filter::set_default_filter(Gtid_event_filter *filter) +{ + if (m_default_filter) + delete m_default_filter; + + m_default_filter= filter; +} + +gtid_filter_element * +Delegating_gtid_event_filter::try_find_filter_element_for_id( + gtid_filter_identifier filter_id) +{ + // Add this into the domain id list + uint32 map_idx= filter_id & m_filter_id_mask; + gtid_filter_element *filter_idx= m_filters_by_id[map_idx]; + + /* Find list index to add this filter */ + while (filter_idx) + { + if (filter_idx->identifier == filter_id) + break; + filter_idx= filter_idx->next; + } + + return filter_idx; +} + +gtid_filter_element * +Delegating_gtid_event_filter::find_or_create_filter_element_for_id( + gtid_filter_identifier filter_id) +{ + // Add this into the domain id list + uint32 map_idx= filter_id & m_filter_id_mask; + gtid_filter_element *filter_idx= m_filters_by_id[map_idx], + *prev_idx= NULL; + + /* Find list index to add this filter */ + while (filter_idx) + { + prev_idx= filter_idx; + if (filter_idx->identifier == filter_id) + { + break; + } + prev_idx= filter_idx; + filter_idx= filter_idx->next; + } + + if (filter_idx == NULL) + { + // No other domain ids have filters that index here, create this one + filter_idx= (gtid_filter_element *) my_malloc( + PSI_NOT_INSTRUMENTED, sizeof(gtid_filter_element), MYF(MY_WME)); + filter_idx->identifier= filter_id; + filter_idx->next= NULL; + filter_idx->filter= NULL; + + if (prev_idx == NULL) + { + // This is the first filter in the bucket + m_filters_by_id[map_idx]= filter_idx; + } + else + { + // End of list, append filter list to tail + prev_idx->next= filter_idx; + } + } + + return filter_idx; +} + +my_bool Delegating_gtid_event_filter::exclude(rpl_gtid *gtid) +{ + Gtid_event_filter *filter; + gtid_filter_identifier filter_id= get_id_from_gtid(gtid); + gtid_filter_element *filter_element= try_find_filter_element_for_id(filter_id); + if (filter_element) + { + filter= filter_element->filter; + } + else + { + filter= m_default_filter; + } + + return filter->exclude(gtid); +} + +Window_gtid_event_filter * +Domain_gtid_event_filter::find_or_create_window_filter_for_id( + uint32 domain_id) +{ + gtid_filter_element *filter_element= + find_or_create_filter_element_for_id(domain_id); + Window_gtid_event_filter *wgef= NULL; + + if (filter_element->filter == NULL) + { + // New filter + wgef= new Window_gtid_event_filter(); + filter_element->filter= wgef; + } + else if (filter_element->filter->get_filter_type() == WINDOW_GTID_FILTER_TYPE) + { + // We have an existing window filter here + wgef= (Window_gtid_event_filter *) filter_element->filter; + } + /* + Else: We have an existing filter but it is not of window type so propogate + NULL filter + */ + + return wgef; +} + +my_bool Domain_gtid_event_filter::add_start_gtid(rpl_gtid *gtid) +{ + Window_gtid_event_filter *filter_to_update= + find_or_create_window_filter_for_id(gtid->domain_id); + return filter_to_update->set_start_gtid(gtid); +} + +my_bool Domain_gtid_event_filter::add_stop_gtid(rpl_gtid *gtid) +{ + Window_gtid_event_filter *filter_to_update= + find_or_create_window_filter_for_id(gtid->domain_id); + return filter_to_update->set_stop_gtid(gtid); +}
\ No newline at end of file |