summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2014-03-12 00:14:49 +0100
committerunknown <knielsen@knielsen-hq.org>2014-03-12 00:14:49 +0100
commit8b9b7ec395df111f886224a565b63a1a312e5679 (patch)
treea44e2719f727964a854ed9c45e26e7314f6b62a4 /sql
parent2c2478b82260f5110ea2c5bed3c6c7bcd3558453 (diff)
downloadmariadb-git-8b9b7ec395df111f886224a565b63a1a312e5679.tar.gz
MDEV-5804: If same GTID is received on multiple master connections in multi-source replication, the event is double-executed causing corruption or replication failure
Some fixes, mainly to make it work in non-parallel replication mode also (--slave-parallel-threads=0). Patch should be fairly complete now.
Diffstat (limited to 'sql')
-rw-r--r--sql/log_event.cc4
-rw-r--r--sql/mysqld.cc4
-rw-r--r--sql/mysqld.h1
-rw-r--r--sql/rpl_gtid.cc107
-rw-r--r--sql/rpl_gtid.h9
-rw-r--r--sql/rpl_parallel.cc18
-rw-r--r--sql/rpl_rli.cc8
-rw-r--r--sql/rpl_rli.h14
-rw-r--r--sql/slave.cc50
-rw-r--r--sql/sys_vars.cc6
10 files changed, 177 insertions, 44 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 98524d73433..c769c5b9209 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -4440,7 +4440,7 @@ Default database: '%s'. Query: '%s'",
end:
if (sub_id && !thd->is_slave_error)
- rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid, rli);
+ rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid, rgi);
/*
Probably we have set thd->query, thd->db, thd->catalog to point to places
@@ -7327,7 +7327,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
thd->mdl_context.release_transactional_locks();
if (!res && sub_id)
- rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid, rli);
+ rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid, rgi);
/*
Increment the global status commit count variable
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index d292cc86cfb..774f9531893 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -9447,6 +9447,7 @@ PSI_stage_info stage_waiting_for_prior_transaction_to_commit= { 0, "Waiting for
PSI_stage_info stage_waiting_for_room_in_worker_thread= { 0, "Waiting for room in worker thread event queue", 0};
PSI_stage_info stage_master_gtid_wait_primary= { 0, "Waiting in MASTER_GTID_WAIT() (primary waiter)", 0};
PSI_stage_info stage_master_gtid_wait= { 0, "Waiting in MASTER_GTID_WAIT()", 0};
+PSI_stage_info stage_gtid_wait_other_connection= { 0, "Waiting for other master connection to process GTID received on multiple master connections", 0};
#ifdef HAVE_PSI_INTERFACE
@@ -9565,7 +9566,8 @@ PSI_stage_info *all_server_stages[]=
& stage_waiting_to_finalize_termination,
& stage_waiting_to_get_readlock,
& stage_master_gtid_wait_primary,
- & stage_master_gtid_wait
+ & stage_master_gtid_wait,
+ & stage_gtid_wait_other_connection
};
PSI_socket_key key_socket_tcpip, key_socket_unix, key_socket_client_connection;
diff --git a/sql/mysqld.h b/sql/mysqld.h
index 28b9c061945..fa79dd2d11d 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -441,6 +441,7 @@ extern PSI_stage_info stage_waiting_for_prior_transaction_to_commit;
extern PSI_stage_info stage_waiting_for_room_in_worker_thread;
extern PSI_stage_info stage_master_gtid_wait_primary;
extern PSI_stage_info stage_master_gtid_wait;
+extern PSI_stage_info stage_gtid_wait_other_connection;
#ifdef HAVE_PSI_STATEMENT_INTERFACE
/**
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index b66651ae5fd..17c3b15c902 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -34,7 +34,7 @@ const LEX_STRING rpl_gtid_slave_state_table_name=
void
rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid,
- const Relay_log_info *rli)
+ rpl_group_info *rgi)
{
int err;
/*
@@ -45,7 +45,7 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid,
it is even committed.
*/
mysql_mutex_lock(&LOCK_slave_state);
- err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, rli);
+ err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, rgi);
mysql_mutex_unlock(&LOCK_slave_state);
if (err)
{
@@ -75,9 +75,13 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
if ((sub_id= rgi->gtid_sub_id))
{
rgi->gtid_sub_id= 0;
- if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false))
- DBUG_RETURN(1);
- update_state_hash(sub_id, &rgi->current_gtid, rgi->rli);
+ if (rgi->gtid_ignore_duplicate_state!=rpl_group_info::GTID_DUPLICATE_IGNORE)
+ {
+ if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false))
+ DBUG_RETURN(1);
+ update_state_hash(sub_id, &rgi->current_gtid, rgi);
+ }
+ rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
}
DBUG_RETURN(0);
}
@@ -110,16 +114,21 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
-1 Error (out of memory to allocate a new element for the domain).
*/
int
-rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli)
+rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, rpl_group_info *rgi)
{
uint32 domain_id= gtid->domain_id;
uint32 seq_no= gtid->seq_no;
rpl_slave_state::element *elem;
int res;
+ bool did_enter_cond;
+ PSI_stage_info old_stage;
+ THD *thd;
+ Relay_log_info *rli= rgi->rli;
mysql_mutex_lock(&LOCK_slave_state);
if (!(elem= get_element(domain_id)))
{
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
res= -1;
goto err;
}
@@ -129,13 +138,14 @@ rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli)
each lock release and re-take.
*/
- /* ToDo: Make this wait killable. */
+ did_enter_cond= false;
for (;;)
{
if (elem->highest_seq_no >= seq_no)
{
/* This sequence number is already applied, ignore it. */
res= 0;
+ rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_IGNORE;
break;
}
if (!elem->owner_rli)
@@ -143,6 +153,7 @@ rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli)
/* The domain became free, grab it and apply the event. */
elem->owner_rli= rli;
elem->owner_count= 1;
+ rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_OWNER;
res= 1;
break;
}
@@ -150,23 +161,78 @@ rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli)
{
/* Already own this domain, increment reference count and apply event. */
++elem->owner_count;
+ rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_OWNER;
res= 1;
break;
}
+ thd= rgi->thd;
+ if (thd->check_killed())
+ {
+ thd->send_kill_message();
+ res= -1;
+ break;
+ }
/*
Someone else is currently processing this GTID (or an earlier one).
Wait for them to complete (or fail), and then check again.
*/
+ if (!did_enter_cond)
+ {
+ thd->ENTER_COND(&elem->COND_gtid_ignore_duplicates, &LOCK_slave_state,
+ &stage_gtid_wait_other_connection, &old_stage);
+ did_enter_cond= true;
+ }
mysql_cond_wait(&elem->COND_gtid_ignore_duplicates,
&LOCK_slave_state);
}
err:
- mysql_mutex_unlock(&LOCK_slave_state);
+ if (did_enter_cond)
+ thd->EXIT_COND(&old_stage);
+ else
+ mysql_mutex_unlock(&LOCK_slave_state);
return res;
}
+void
+rpl_slave_state::release_domain_owner(rpl_group_info *rgi)
+{
+ element *elem= NULL;
+
+ mysql_mutex_lock(&LOCK_slave_state);
+ if (!(elem= get_element(rgi->current_gtid.domain_id)))
+ {
+ /*
+ We cannot really deal with error here, as we are already called in an
+ error handling case (transaction failure and rollback).
+
+ However, get_element() only fails if the element did not exist already
+ and could not be allocated due to out-of-memory - and if it did not
+ exist, then we would not get here in the first place.
+ */
+ mysql_mutex_unlock(&LOCK_slave_state);
+ return;
+ }
+
+ if (rgi->gtid_ignore_duplicate_state == rpl_group_info::GTID_DUPLICATE_OWNER)
+ {
+ uint32 count= elem->owner_count;
+ DBUG_ASSERT(count > 0);
+ DBUG_ASSERT(elem->owner_rli == rgi->rli);
+ --count;
+ elem->owner_count= count;
+ if (count == 0)
+ {
+ elem->owner_rli= NULL;
+ mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates);
+ }
+ }
+ rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
+ mysql_mutex_unlock(&LOCK_slave_state);
+}
+
+
static void
rpl_slave_state_free_element(void *arg)
{
@@ -233,7 +299,7 @@ rpl_slave_state::deinit()
int
rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
- uint64 seq_no, const Relay_log_info *rli)
+ uint64 seq_no, rpl_group_info *rgi)
{
element *elem= NULL;
list_element *list_elem= NULL;
@@ -256,18 +322,23 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
mysql_cond_broadcast(&elem->COND_wait_gtid);
}
- if (opt_gtid_ignore_duplicates && rli)
+ if (rgi)
{
- uint32 count= elem->owner_count;
- DBUG_ASSERT(count > 0);
- DBUG_ASSERT(elem->owner_rli == rli);
- --count;
- elem->owner_count= count;
- if (count == 0)
+ if (rgi->gtid_ignore_duplicate_state==rpl_group_info::GTID_DUPLICATE_OWNER)
{
- elem->owner_rli= NULL;
- mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates);
+ Relay_log_info *rli= rgi->rli;
+ uint32 count= elem->owner_count;
+ DBUG_ASSERT(count > 0);
+ DBUG_ASSERT(elem->owner_rli == rli);
+ --count;
+ elem->owner_count= count;
+ if (count == 0)
+ {
+ elem->owner_rli= NULL;
+ mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates);
+ }
}
+ rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
}
if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME))))
diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h
index aef1ca9e403..3e9e2fce25f 100644
--- a/sql/rpl_gtid.h
+++ b/sql/rpl_gtid.h
@@ -92,6 +92,7 @@ struct gtid_waiting {
class Relay_log_info;
+struct rpl_group_info;
/*
Replication slave state.
@@ -171,7 +172,7 @@ struct rpl_slave_state
void truncate_hash();
ulong count() const { return hash.records; }
int update(uint32 domain_id, uint32 server_id, uint64 sub_id,
- uint64 seq_no, const Relay_log_info *rli);
+ uint64 seq_no, rpl_group_info *rgi);
int truncate_state_table(THD *thd);
int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
bool in_transaction, bool in_statement);
@@ -187,10 +188,10 @@ struct rpl_slave_state
element *get_element(uint32 domain_id);
int put_back_list(uint32 domain_id, list_element *list);
- void update_state_hash(uint64 sub_id, rpl_gtid *gtid,
- const Relay_log_info *rli);
+ void update_state_hash(uint64 sub_id, rpl_gtid *gtid, rpl_group_info *rgi);
int record_and_update_gtid(THD *thd, struct rpl_group_info *rgi);
- int check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli);
+ int check_duplicate_gtid(rpl_gtid *gtid, rpl_group_info *rgi);
+ void release_domain_owner(rpl_group_info *rgi);
};
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index d1e0ca518f1..3218acf2525 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -425,10 +425,22 @@ handle_rpl_parallel_thread(void *arg)
{
int res=
rpl_global_gtid_slave_state.check_duplicate_gtid(&rgi->current_gtid,
- rgi->rli);
- /* ToDo: Handle res==-1 error. */
- if (!res)
+ rgi);
+ if (res < 0)
+ {
+ /* Error. */
+ slave_output_error_info(rgi->rli, thd);
+ signal_error_to_sql_driver_thread(thd, rgi);
+ }
+ else if (!res)
+ {
+ /* GTID already applied by another master connection, skip. */
skip_event_group= true;
+ }
+ else
+ {
+ /* We have to apply the event. */
+ }
}
}
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 020f984ad50..c9b4153d28e 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -1493,6 +1493,7 @@ rpl_group_info::reinit(Relay_log_info *rli)
row_stmt_start_timestamp= 0;
long_find_row_note_printed= false;
did_mark_start_commit= false;
+ gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL;
commit_orderer.reinit();
}
@@ -1632,6 +1633,13 @@ void rpl_group_info::cleanup_context(THD *thd, bool error)
thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
/*
+ Ensure we always release the domain for others to process, when using
+ --gtid-ignore-duplicates.
+ */
+ if (gtid_ignore_duplicate_state != GTID_DUPLICATE_NULL)
+ rpl_global_gtid_slave_state.release_domain_owner(this);
+
+ /*
Reset state related to long_find_row notes in the error log:
- timestamp
- flag that decides whether the slave prints or not
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 6db4ce5d61b..100ce25fe9c 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -575,6 +575,20 @@ struct rpl_group_info
counting one event group twice.
*/
bool did_mark_start_commit;
+ enum {
+ GTID_DUPLICATE_NULL=0,
+ GTID_DUPLICATE_IGNORE=1,
+ GTID_DUPLICATE_OWNER=2
+ };
+ /*
+ When --gtid-ignore-duplicates, this is set to one of the above three
+ values:
+ GTID_DUPLICATE_NULL - Not using --gtid-ignore-duplicates.
+ GTID_DUPLICATE_IGNORE - This gtid already applied, skip the event group.
+ GTID_DUPLICATE_OWNER - We are the current owner of the domain, and must
+ apply the event group and then release the domain.
+ */
+ uint8 gtid_ignore_duplicate_state;
/*
Runtime state for printing a note when slave is taking
diff --git a/sql/slave.cc b/sql/slave.cc
index cf741ccedc0..79a2c4ccd25 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -3508,18 +3508,46 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
*/
}
- /*
- For GTID, allocate a new sub_id for the given domain_id.
- The sub_id must be allocated in increasing order of binlog order.
- */
- if (typ == GTID_EVENT &&
- event_group_new_gtid(serial_rgi, static_cast<Gtid_log_event *>(ev)))
+ if (typ == GTID_EVENT)
{
- sql_print_error("Error reading relay log event: %s",
- "slave SQL thread aborted because of out-of-memory error");
- mysql_mutex_unlock(&rli->data_lock);
- delete ev;
- DBUG_RETURN(1);
+ Gtid_log_event *gev= static_cast<Gtid_log_event *>(ev);
+
+ /*
+ For GTID, allocate a new sub_id for the given domain_id.
+ The sub_id must be allocated in increasing order of binlog order.
+ */
+ if (event_group_new_gtid(serial_rgi, gev))
+ {
+ sql_print_error("Error reading relay log event: %s", "slave SQL thread "
+ "aborted because of out-of-memory error");
+ mysql_mutex_unlock(&rli->data_lock);
+ delete ev;
+ DBUG_RETURN(1);
+ }
+
+ if (opt_gtid_ignore_duplicates)
+ {
+ serial_rgi->current_gtid.domain_id= gev->domain_id;
+ serial_rgi->current_gtid.server_id= gev->server_id;
+ serial_rgi->current_gtid.seq_no= gev->seq_no;
+ int res= rpl_global_gtid_slave_state.check_duplicate_gtid
+ (&serial_rgi->current_gtid, serial_rgi);
+ if (res < 0)
+ {
+ sql_print_error("Error processing GTID event: %s", "slave SQL "
+ "thread aborted because of out-of-memory error");
+ mysql_mutex_unlock(&rli->data_lock);
+ delete ev;
+ DBUG_RETURN(1);
+ }
+ /*
+ If we need to skip this event group (because the GTID was already
+ applied), then do it using the code for slave_skip_counter, which
+ is able to handle skipping until the end of the event group.
+ */
+ if (!res)
+ rli->slave_skip_counter= 1;
+ }
}
serial_rgi->future_event_relay_log_pos= rli->future_event_relay_log_pos;
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index 8aa202b381b..c9f1055d17a 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -1839,18 +1839,14 @@ static bool
fix_gtid_ignore_duplicates(sys_var *self, THD *thd, enum_var_type type)
{
bool running;
- bool err= false;
mysql_mutex_unlock(&LOCK_global_system_variables);
mysql_mutex_lock(&LOCK_active_mi);
running= master_info_index->give_error_if_slave_running();
mysql_mutex_unlock(&LOCK_active_mi);
- if (running)
- err= true;
mysql_mutex_lock(&LOCK_global_system_variables);
- /* ToDo: Isn't there a race here? I need to change the variable only under the LOCK_active_mi, and only if running is false. */
- return err;
+ return running ? true : false;
}