summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSujatha <sujatha.sivakumar@mariadb.com>2020-12-01 17:06:53 +0530
committerSujatha <sujatha.sivakumar@mariadb.com>2020-12-01 17:06:53 +0530
commit534008164128c38a98b58ead71d635e91992d0a8 (patch)
tree1163b1de622e1b4d3fc80fd5a0edc449fb589e08
parent6504d3d229b086d20e5e4798b1ae10bbbd70d3c7 (diff)
downloadmariadb-git-bb-10.1-sujatha.tar.gz
MDEV-20122: Draftbb-10.1-sujatha
-rw-r--r--sql/log.cc8
-rw-r--r--sql/log.h4
-rw-r--r--sql/rpl_gtid.cc78
-rw-r--r--sql/rpl_gtid.h13
-rw-r--r--sql/slave.cc3
-rw-r--r--sql/sql_repl.cc17
-rw-r--r--sql/sql_repl.h3
7 files changed, 120 insertions, 6 deletions
diff --git a/sql/log.cc b/sql/log.cc
index f2fe0d852d1..3a35e89c124 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -6101,6 +6101,14 @@ MYSQL_BIN_LOG::is_empty_state()
return (rpl_global_gtid_binlog_state.count() == 0);
}
+int MYSQL_BIN_LOG::set_binlog_state_used_by_master(rpl_gtid *gtid_list,
+ uint32 num_gtids,
+ char *conn_name)
+{
+ return rpl_global_gtid_binlog_state.set_binlog_state_used_by_master(gtid_list,
+ num_gtids,
+ conn_name);
+}
bool
MYSQL_BIN_LOG::find_in_binlog_state(uint32 domain_id, uint32 server_id_arg,
diff --git a/sql/log.h b/sql/log.h
index 277e5c6f69c..7cbb0b89fba 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -833,6 +833,10 @@ public:
uint64 commit_id);
int read_state_from_file();
int write_state_to_file();
+ int set_binlog_state_used_by_master(rpl_gtid *gtid_list,
+ uint32 num_gtids,
+ char *conn_name);
+ void clear_binlog_state_used_by_master();
int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size);
bool append_state_pos(String *str);
bool append_state(String *str);
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index 752b4172b6e..7f99f6f7d5a 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -27,7 +27,17 @@
#include "rpl_gtid.h"
#include "rpl_rli.h"
#include "log_event.h"
+#include "slave.h"
+#ifdef HAVE_REPLICATION
+void free_io_element(void *arg)
+{
+ struct rpl_binlog_state::io_element *e= (struct rpl_binlog_state::io_element *)arg;
+ my_free(e->conn_name);
+ my_hash_free(&e->gtid_list_hash);
+ my_free(e);
+}
+#endif
const LEX_STRING rpl_gtid_slave_state_table_name=
{ C_STRING_WITH_LEN("gtid_slave_pos") };
@@ -1149,6 +1159,11 @@ rpl_binlog_state::rpl_binlog_state()
{
my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),
sizeof(uint32), NULL, my_free, HASH_UNIQUE);
+#ifdef HAVE_REPLICATION
+ my_hash_init(&io_thd_hash, &my_charset_bin, MAX_REPLICATION_THREAD,
+ offsetof(io_element, conn_name),
+ sizeof(char*), NULL, free_io_element, HASH_UNIQUE);
+#endif
my_init_dynamic_array(&gtid_sort_array, sizeof(rpl_gtid), 8, 8, MYF(0));
mysql_mutex_init(key_LOCK_binlog_state, &LOCK_binlog_state,
MY_MUTEX_INIT_SLOW);
@@ -1183,6 +1198,7 @@ void rpl_binlog_state::free()
initialized= 0;
reset_nolock();
my_hash_free(&hash);
+ my_hash_free(&io_thd_hash);
delete_dynamic(&gtid_sort_array);
mysql_mutex_destroy(&LOCK_binlog_state);
}
@@ -1229,7 +1245,46 @@ rpl_binlog_state::load(rpl_slave_state *slave_pos)
mysql_mutex_unlock(&LOCK_binlog_state);
return res;
}
+#ifdef HAVE_REPLICATION
+int rpl_binlog_state::set_binlog_state_used_by_master(rpl_gtid *gtid_list,
+ uint32 num_gtids,
+ char *conn_name)
+{
+ uint32 i;
+ int res= 1;
+ io_element *io_elem=NULL;
+
+ mysql_mutex_lock(&LOCK_binlog_state);
+ io_elem= (io_element *)my_malloc(sizeof(io_element), MYF(MY_WME));
+ io_elem->conn_name= my_strdup(conn_name, MYF(MY_WME));
+ my_hash_init(&io_elem->gtid_list_hash, &my_charset_bin, 32, offsetof(rpl_gtid, domain_id),
+ sizeof(uint32), NULL, NULL, HASH_UNIQUE);
+ for (i= 0; i < num_gtids; ++i)
+ {
+ rpl_gtid *gtid= (rpl_gtid *)my_malloc(sizeof(rpl_gtid), MYF(MY_WME));
+ memcpy(gtid, &gtid_list[i], sizeof(rpl_gtid));
+ if (my_hash_insert(&io_elem->gtid_list_hash, (uchar *)(gtid)))
+ goto err;
+ }
+ if (my_hash_insert(&io_thd_hash, (uchar *)(io_elem)))
+ goto err;
+ mysql_mutex_unlock(&LOCK_binlog_state);
+ res= 0;
+err:
+ return res;
+}
+
+uint32 rpl_binlog_state::is_binlog_state_used_by_master()
+{
+ return io_thd_hash.records;
+}
+void rpl_binlog_state::clear_binlog_state_used_by_master(io_element *io_elem)
+{
+ mysql_mutex_assert_owner(&LOCK_binlog_state);
+ my_hash_delete(&io_thd_hash, (uchar *)io_elem);
+}
+#endif // HAVE_REPLICATION
rpl_binlog_state::~rpl_binlog_state()
{
@@ -1293,7 +1348,12 @@ rpl_binlog_state::update_with_next_gtid(uint32 domain_id, uint32 server_id,
rpl_gtid *gtid)
{
element *elem;
+ uint32 i=0;
+#ifdef HAVE_REPLICATION
+ uint32 io_thd_count= io_thd_hash.records;
+#endif
int res= 0;
+ rpl_gtid *io_gtid;
gtid->domain_id= domain_id;
gtid->server_id= server_id;
@@ -1315,6 +1375,24 @@ rpl_binlog_state::update_with_next_gtid(uint32 domain_id, uint32 server_id,
my_error(ER_OUT_OF_RESOURCES, MYF(0));
res= 1;
end:
+#ifdef HAVE_REPLICATION
+ if (is_binlog_state_used_by_master())
+ {
+ io_thd_count= io_thd_hash.records;
+ for (i= 0; i < io_thd_count; ++i)
+ {
+ io_element *e= (io_element *)my_hash_element(&io_thd_hash, i);
+ rpl_gtid *gtid= (rpl_gtid *)my_hash_element(&e->gtid_list_hash, i);
+ if ((io_gtid= (rpl_gtid*)my_hash_search(&e->gtid_list_hash, (const uchar *)(&domain_id), 0)))
+ {
+ sql_print_warning("Connection: %s is using gtid_cur_pos for replication and "
+ "modifying domain_id:%u domain_id:%u will cause replication to break.",
+ e->conn_name, domain_id, gtid->domain_id);
+ clear_binlog_state_used_by_master(e);
+ }
+ }
+ }
+#endif
mysql_mutex_unlock(&LOCK_binlog_state);
return res;
}
diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h
index 972f1f6584b..13c7bab12a9 100644
--- a/sql/rpl_gtid.h
+++ b/sql/rpl_gtid.h
@@ -235,6 +235,7 @@ struct rpl_binlog_state
/* Mutex protecting access to the state. */
mysql_mutex_t LOCK_binlog_state;
my_bool initialized;
+ my_bool binlog_state_used_by_master;
/* Auxiliary buffer to sort gtid list. */
DYNAMIC_ARRAY gtid_sort_array;
@@ -265,6 +266,18 @@ struct rpl_binlog_state
rpl_gtid *find(uint32 domain_id, uint32 server_id);
rpl_gtid *find_most_recent(uint32 domain_id);
const char* drop_domain(DYNAMIC_ARRAY *ids, Gtid_list_log_event *glev, char*);
+#ifdef HAVE_REPLICATION
+ struct io_element {
+ char* conn_name;
+ HASH gtid_list_hash; /* Containing all rpl_gtids list in gtid_current_pos */
+ };
+ HASH io_thd_hash;
+ int set_binlog_state_used_by_master(rpl_gtid *gtid_list,
+ uint32 num_gtids,
+ char *conn_name);
+ void clear_binlog_state_used_by_master(io_element *io_elem);
+ uint32 is_binlog_state_used_by_master();
+#endif
};
diff --git a/sql/slave.cc b/sql/slave.cc
index 06f2b0d955a..32122a48fe7 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -1041,7 +1041,8 @@ int start_slave_threads(THD *thd,
sizeof(mi->rli.group_master_log_name)-1);
error= rpl_load_gtid_state(&mi->gtid_current_pos, mi->using_gtid ==
- Master_info::USE_GTID_CURRENT_POS);
+ Master_info::USE_GTID_CURRENT_POS,
+ mi->connection_name.str);
mi->events_queued_since_last_gtid= 0;
mi->gtid_reconnect_event_skip_count= 0;
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 2d644ffc5ef..68f89bad70c 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -4411,15 +4411,24 @@ rpl_append_gtid_state(String *dest, bool use_binlog)
enabled) is merged into the current GTID state (master_use_gtid=current_pos).
*/
int
-rpl_load_gtid_state(slave_connection_state *state, bool use_binlog)
+rpl_load_gtid_state(slave_connection_state *state, bool use_binlog,
+ char *conn_name)
{
int err;
rpl_gtid *gtid_list= NULL;
uint32 num_gtids= 0;
- if (use_binlog && opt_bin_log &&
- (err= mysql_bin_log.get_most_recent_gtid_list(&gtid_list, &num_gtids)))
- return err;
+ if (use_binlog && opt_bin_log)
+ {
+ if ((err= mysql_bin_log.get_most_recent_gtid_list(&gtid_list, &num_gtids)))
+ return err;
+ if ((err= mysql_bin_log.set_binlog_state_used_by_master(gtid_list, num_gtids,
+ conn_name)))
+ {
+ my_free(gtid_list);
+ return err;
+ }
+ }
err= state->load(rpl_global_gtid_slave_state, gtid_list, num_gtids);
my_free(gtid_list);
diff --git a/sql/sql_repl.h b/sql/sql_repl.h
index 6a50cc66a3a..4061de518eb 100644
--- a/sql/sql_repl.h
+++ b/sql/sql_repl.h
@@ -79,7 +79,8 @@ void rpl_init_gtid_waiting();
void rpl_deinit_gtid_waiting();
int gtid_state_from_binlog_pos(const char *name, uint32 pos, String *out_str);
int rpl_append_gtid_state(String *dest, bool use_binlog);
-int rpl_load_gtid_state(slave_connection_state *state, bool use_binlog);
+int rpl_load_gtid_state(slave_connection_state *state, bool use_binlog,
+ char *conn_name);
bool rpl_gtid_pos_check(THD *thd, char *str, size_t len);
bool rpl_gtid_pos_update(THD *thd, char *str, size_t len);
#else