diff options
author | Sujatha <sujatha.sivakumar@mariadb.com> | 2020-12-01 17:06:53 +0530 |
---|---|---|
committer | Sujatha <sujatha.sivakumar@mariadb.com> | 2020-12-01 17:06:53 +0530 |
commit | 534008164128c38a98b58ead71d635e91992d0a8 (patch) | |
tree | 1163b1de622e1b4d3fc80fd5a0edc449fb589e08 | |
parent | 6504d3d229b086d20e5e4798b1ae10bbbd70d3c7 (diff) | |
download | mariadb-git-bb-10.1-sujatha.tar.gz |
MDEV-20122: Draftbb-10.1-sujatha
-rw-r--r-- | sql/log.cc | 8 | ||||
-rw-r--r-- | sql/log.h | 4 | ||||
-rw-r--r-- | sql/rpl_gtid.cc | 78 | ||||
-rw-r--r-- | sql/rpl_gtid.h | 13 | ||||
-rw-r--r-- | sql/slave.cc | 3 | ||||
-rw-r--r-- | sql/sql_repl.cc | 17 | ||||
-rw-r--r-- | sql/sql_repl.h | 3 |
7 files changed, 120 insertions, 6 deletions
diff --git a/sql/log.cc b/sql/log.cc index f2fe0d852d1..3a35e89c124 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -6101,6 +6101,14 @@ MYSQL_BIN_LOG::is_empty_state() return (rpl_global_gtid_binlog_state.count() == 0); } +int MYSQL_BIN_LOG::set_binlog_state_used_by_master(rpl_gtid *gtid_list, + uint32 num_gtids, + char *conn_name) +{ + return rpl_global_gtid_binlog_state.set_binlog_state_used_by_master(gtid_list, + num_gtids, + conn_name); +} bool MYSQL_BIN_LOG::find_in_binlog_state(uint32 domain_id, uint32 server_id_arg, diff --git a/sql/log.h b/sql/log.h index 277e5c6f69c..7cbb0b89fba 100644 --- a/sql/log.h +++ b/sql/log.h @@ -833,6 +833,10 @@ public: uint64 commit_id); int read_state_from_file(); int write_state_to_file(); + int set_binlog_state_used_by_master(rpl_gtid *gtid_list, + uint32 num_gtids, + char *conn_name); + void clear_binlog_state_used_by_master(); int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size); bool append_state_pos(String *str); bool append_state(String *str); diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc index 752b4172b6e..7f99f6f7d5a 100644 --- a/sql/rpl_gtid.cc +++ b/sql/rpl_gtid.cc @@ -27,7 +27,17 @@ #include "rpl_gtid.h" #include "rpl_rli.h" #include "log_event.h" +#include "slave.h" +#ifdef HAVE_REPLICATION +void free_io_element(void *arg) +{ + struct rpl_binlog_state::io_element *e= (struct rpl_binlog_state::io_element *)arg; + my_free(e->conn_name); + my_hash_free(&e->gtid_list_hash); + my_free(e); +} +#endif const LEX_STRING rpl_gtid_slave_state_table_name= { C_STRING_WITH_LEN("gtid_slave_pos") }; @@ -1149,6 +1159,11 @@ rpl_binlog_state::rpl_binlog_state() { my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id), sizeof(uint32), NULL, my_free, HASH_UNIQUE); +#ifdef HAVE_REPLICATION + my_hash_init(&io_thd_hash, &my_charset_bin, MAX_REPLICATION_THREAD, + offsetof(io_element, conn_name), + sizeof(char*), NULL, free_io_element, HASH_UNIQUE); +#endif my_init_dynamic_array(>id_sort_array, sizeof(rpl_gtid), 8, 8, MYF(0)); mysql_mutex_init(key_LOCK_binlog_state, &LOCK_binlog_state, MY_MUTEX_INIT_SLOW); @@ -1183,6 +1198,7 @@ void rpl_binlog_state::free() initialized= 0; reset_nolock(); my_hash_free(&hash); + my_hash_free(&io_thd_hash); delete_dynamic(>id_sort_array); mysql_mutex_destroy(&LOCK_binlog_state); } @@ -1229,7 +1245,46 @@ rpl_binlog_state::load(rpl_slave_state *slave_pos) mysql_mutex_unlock(&LOCK_binlog_state); return res; } +#ifdef HAVE_REPLICATION +int rpl_binlog_state::set_binlog_state_used_by_master(rpl_gtid *gtid_list, + uint32 num_gtids, + char *conn_name) +{ + uint32 i; + int res= 1; + io_element *io_elem=NULL; + + mysql_mutex_lock(&LOCK_binlog_state); + io_elem= (io_element *)my_malloc(sizeof(io_element), MYF(MY_WME)); + io_elem->conn_name= my_strdup(conn_name, MYF(MY_WME)); + my_hash_init(&io_elem->gtid_list_hash, &my_charset_bin, 32, offsetof(rpl_gtid, domain_id), + sizeof(uint32), NULL, NULL, HASH_UNIQUE); + for (i= 0; i < num_gtids; ++i) + { + rpl_gtid *gtid= (rpl_gtid *)my_malloc(sizeof(rpl_gtid), MYF(MY_WME)); + memcpy(gtid, >id_list[i], sizeof(rpl_gtid)); + if (my_hash_insert(&io_elem->gtid_list_hash, (uchar *)(gtid))) + goto err; + } + if (my_hash_insert(&io_thd_hash, (uchar *)(io_elem))) + goto err; + mysql_mutex_unlock(&LOCK_binlog_state); + res= 0; +err: + return res; +} + +uint32 rpl_binlog_state::is_binlog_state_used_by_master() +{ + return io_thd_hash.records; +} +void rpl_binlog_state::clear_binlog_state_used_by_master(io_element *io_elem) +{ + mysql_mutex_assert_owner(&LOCK_binlog_state); + my_hash_delete(&io_thd_hash, (uchar *)io_elem); +} +#endif // HAVE_REPLICATION rpl_binlog_state::~rpl_binlog_state() { @@ -1293,7 +1348,12 @@ rpl_binlog_state::update_with_next_gtid(uint32 domain_id, uint32 server_id, rpl_gtid *gtid) { element *elem; + uint32 i=0; +#ifdef HAVE_REPLICATION + uint32 io_thd_count= io_thd_hash.records; +#endif int res= 0; + rpl_gtid *io_gtid; gtid->domain_id= domain_id; gtid->server_id= server_id; @@ -1315,6 +1375,24 @@ rpl_binlog_state::update_with_next_gtid(uint32 domain_id, uint32 server_id, my_error(ER_OUT_OF_RESOURCES, MYF(0)); res= 1; end: +#ifdef HAVE_REPLICATION + if (is_binlog_state_used_by_master()) + { + io_thd_count= io_thd_hash.records; + for (i= 0; i < io_thd_count; ++i) + { + io_element *e= (io_element *)my_hash_element(&io_thd_hash, i); + rpl_gtid *gtid= (rpl_gtid *)my_hash_element(&e->gtid_list_hash, i); + if ((io_gtid= (rpl_gtid*)my_hash_search(&e->gtid_list_hash, (const uchar *)(&domain_id), 0))) + { + sql_print_warning("Connection: %s is using gtid_cur_pos for replication and " + "modifying domain_id:%u domain_id:%u will cause replication to break.", + e->conn_name, domain_id, gtid->domain_id); + clear_binlog_state_used_by_master(e); + } + } + } +#endif mysql_mutex_unlock(&LOCK_binlog_state); return res; } diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h index 972f1f6584b..13c7bab12a9 100644 --- a/sql/rpl_gtid.h +++ b/sql/rpl_gtid.h @@ -235,6 +235,7 @@ struct rpl_binlog_state /* Mutex protecting access to the state. */ mysql_mutex_t LOCK_binlog_state; my_bool initialized; + my_bool binlog_state_used_by_master; /* Auxiliary buffer to sort gtid list. */ DYNAMIC_ARRAY gtid_sort_array; @@ -265,6 +266,18 @@ struct rpl_binlog_state rpl_gtid *find(uint32 domain_id, uint32 server_id); rpl_gtid *find_most_recent(uint32 domain_id); const char* drop_domain(DYNAMIC_ARRAY *ids, Gtid_list_log_event *glev, char*); +#ifdef HAVE_REPLICATION + struct io_element { + char* conn_name; + HASH gtid_list_hash; /* Containing all rpl_gtids list in gtid_current_pos */ + }; + HASH io_thd_hash; + int set_binlog_state_used_by_master(rpl_gtid *gtid_list, + uint32 num_gtids, + char *conn_name); + void clear_binlog_state_used_by_master(io_element *io_elem); + uint32 is_binlog_state_used_by_master(); +#endif }; diff --git a/sql/slave.cc b/sql/slave.cc index 06f2b0d955a..32122a48fe7 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -1041,7 +1041,8 @@ int start_slave_threads(THD *thd, sizeof(mi->rli.group_master_log_name)-1); error= rpl_load_gtid_state(&mi->gtid_current_pos, mi->using_gtid == - Master_info::USE_GTID_CURRENT_POS); + Master_info::USE_GTID_CURRENT_POS, + mi->connection_name.str); mi->events_queued_since_last_gtid= 0; mi->gtid_reconnect_event_skip_count= 0; diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 2d644ffc5ef..68f89bad70c 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -4411,15 +4411,24 @@ rpl_append_gtid_state(String *dest, bool use_binlog) enabled) is merged into the current GTID state (master_use_gtid=current_pos). */ int -rpl_load_gtid_state(slave_connection_state *state, bool use_binlog) +rpl_load_gtid_state(slave_connection_state *state, bool use_binlog, + char *conn_name) { int err; rpl_gtid *gtid_list= NULL; uint32 num_gtids= 0; - if (use_binlog && opt_bin_log && - (err= mysql_bin_log.get_most_recent_gtid_list(>id_list, &num_gtids))) - return err; + if (use_binlog && opt_bin_log) + { + if ((err= mysql_bin_log.get_most_recent_gtid_list(>id_list, &num_gtids))) + return err; + if ((err= mysql_bin_log.set_binlog_state_used_by_master(gtid_list, num_gtids, + conn_name))) + { + my_free(gtid_list); + return err; + } + } err= state->load(rpl_global_gtid_slave_state, gtid_list, num_gtids); my_free(gtid_list); diff --git a/sql/sql_repl.h b/sql/sql_repl.h index 6a50cc66a3a..4061de518eb 100644 --- a/sql/sql_repl.h +++ b/sql/sql_repl.h @@ -79,7 +79,8 @@ void rpl_init_gtid_waiting(); void rpl_deinit_gtid_waiting(); int gtid_state_from_binlog_pos(const char *name, uint32 pos, String *out_str); int rpl_append_gtid_state(String *dest, bool use_binlog); -int rpl_load_gtid_state(slave_connection_state *state, bool use_binlog); +int rpl_load_gtid_state(slave_connection_state *state, bool use_binlog, + char *conn_name); bool rpl_gtid_pos_check(THD *thd, char *str, size_t len); bool rpl_gtid_pos_update(THD *thd, char *str, size_t len); #else |