summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mysql-test/suite/rpl/r/rpl_parallel.result73
-rw-r--r--mysql-test/suite/rpl/t/rpl_parallel.test121
-rw-r--r--sql/handler.cc8
-rw-r--r--sql/log.cc134
-rw-r--r--sql/log_event.cc2
-rw-r--r--sql/rpl_parallel.cc26
-rw-r--r--sql/slave.cc11
-rw-r--r--sql/sql_class.cc45
-rw-r--r--sql/sql_class.h19
9 files changed, 366 insertions, 73 deletions
diff --git a/mysql-test/suite/rpl/r/rpl_parallel.result b/mysql-test/suite/rpl/r/rpl_parallel.result
index 03b102a1af9..d256a609b53 100644
--- a/mysql-test/suite/rpl/r/rpl_parallel.result
+++ b/mysql-test/suite/rpl/r/rpl_parallel.result
@@ -7,6 +7,7 @@ SET GLOBAL slave_parallel_threads=10;
CHANGE MASTER TO master_use_gtid=slave_pos;
include/start_slave.inc
*** Test long-running query in domain 1 can run in parallel with short queries in domain 0 ***
+ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=MyISAM;
CREATE TABLE t2 (a int PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1);
@@ -259,6 +260,78 @@ SET GLOBAL binlog_format=@old_format;
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
include/start_slave.inc
+*** Test killing slave threads at various wait points ***
+*** 1. Test killing transaction waiting in commit for previous transaction to commit ***
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1';
+SET binlog_format=statement;
+INSERT INTO t3 VALUES (31, foo(31,
+'commit_before_prepare_ordered WAIT_FOR t2_waiting',
+'commit_after_prepare_ordered SIGNAL t1_ready WAIT_FOR t1_cont'));
+SET debug_sync='now WAIT_FOR master_queued1';
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2';
+SET binlog_format=statement;
+BEGIN;
+INSERT INTO t3 VALUES (32, foo(32,
+'ha_write_row_end SIGNAL t2_query WAIT_FOR t2_cont',
+''));
+INSERT INTO t3 VALUES (33, foo(33,
+'group_commit_waiting_for_prior SIGNAL t2_waiting',
+'group_commit_waiting_for_prior_killed SIGNAL t2_killed'));
+COMMIT;
+SET debug_sync='now WAIT_FOR master_queued2';
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued3';
+SET binlog_format=statement;
+INSERT INTO t3 VALUES (34, foo(34,
+'',
+''));
+SET debug_sync='now WAIT_FOR master_queued3';
+SET debug_sync='now SIGNAL master_cont1';
+SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
+a b
+31 31
+32 32
+33 33
+34 34
+SET sql_log_bin=0;
+CALL mtr.add_suppression("Query execution was interrupted");
+CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
+SET sql_log_bin=1;
+SET debug_sync='now WAIT_FOR t2_query';
+SET debug_sync='now SIGNAL t2_cont';
+SET debug_sync='now WAIT_FOR t1_ready';
+KILL THD_ID;
+SET debug_sync='now WAIT_FOR t2_killed';
+SET debug_sync='now SIGNAL t1_cont';
+include/wait_for_slave_sql_error.inc [errno=1317,1963]
+STOP SLAVE IO_THREAD;
+SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
+a b
+31 31
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
+SET sql_log_bin=0;
+DROP FUNCTION foo;
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+RETURNS INT DETERMINISTIC
+BEGIN
+RETURN x;
+END
+||
+SET sql_log_bin=1;
+INSERT INTO t3 VALUES (39,0);
+include/start_slave.inc
+SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
+a b
+31 31
+32 32
+33 33
+34 34
+39 0
+include/stop_slave.inc
+SET GLOBAL binlog_format=@old_format;
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
+include/start_slave.inc
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc
diff --git a/mysql-test/suite/rpl/t/rpl_parallel.test b/mysql-test/suite/rpl/t/rpl_parallel.test
index 5709cab19c0..8a6eddbc2e5 100644
--- a/mysql-test/suite/rpl/t/rpl_parallel.test
+++ b/mysql-test/suite/rpl/t/rpl_parallel.test
@@ -19,6 +19,7 @@ CHANGE MASTER TO master_use_gtid=slave_pos;
--echo *** Test long-running query in domain 1 can run in parallel with short queries in domain 0 ***
--connection server_1
+ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=MyISAM;
CREATE TABLE t2 (a int PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1);
@@ -334,6 +335,126 @@ SELECT * FROM t3 WHERE a >= 20 ORDER BY a;
--connection server_2
+# Respawn all worker threads to clear any left-over debug_sync or other stuff.
+--source include/stop_slave.inc
+SET GLOBAL binlog_format=@old_format;
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
+--source include/start_slave.inc
+
+
+--echo *** Test killing slave threads at various wait points ***
+--echo *** 1. Test killing transaction waiting in commit for previous transaction to commit ***
+
+# Set up three transactions on the master that will be group-committed
+# together so they can be replicated in parallel on the slave.
+--connection con_temp3
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1';
+SET binlog_format=statement;
+send INSERT INTO t3 VALUES (31, foo(31,
+ 'commit_before_prepare_ordered WAIT_FOR t2_waiting',
+ 'commit_after_prepare_ordered SIGNAL t1_ready WAIT_FOR t1_cont'));
+
+--connection server_1
+SET debug_sync='now WAIT_FOR master_queued1';
+
+--connection con_temp4
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2';
+SET binlog_format=statement;
+BEGIN;
+# This insert is just so we can get T2 to wait while a query is running that we
+# can see in SHOW PROCESSLIST so we can get its thread_id to kill later.
+INSERT INTO t3 VALUES (32, foo(32,
+ 'ha_write_row_end SIGNAL t2_query WAIT_FOR t2_cont',
+ ''));
+# This insert sets up debug_sync points so that T2 will tell when it is at its
+# wait point where we want to kill it - and when it has been killed.
+INSERT INTO t3 VALUES (33, foo(33,
+ 'group_commit_waiting_for_prior SIGNAL t2_waiting',
+ 'group_commit_waiting_for_prior_killed SIGNAL t2_killed'));
+send COMMIT;
+
+--connection server_1
+SET debug_sync='now WAIT_FOR master_queued2';
+
+--connection con_temp5
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued3';
+SET binlog_format=statement;
+send INSERT INTO t3 VALUES (34, foo(34,
+ '',
+ ''));
+
+--connection server_1
+SET debug_sync='now WAIT_FOR master_queued3';
+SET debug_sync='now SIGNAL master_cont1';
+
+--connection con_temp3
+REAP;
+--connection con_temp4
+REAP;
+--connection con_temp5
+REAP;
+
+--connection server_1
+SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
+
+--connection server_2
+SET sql_log_bin=0;
+CALL mtr.add_suppression("Query execution was interrupted");
+CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
+SET sql_log_bin=1;
+# Wait until T2 is inside executing its insert of 32, then find it in SHOW
+# PROCESSLIST to know its thread id for KILL later.
+SET debug_sync='now WAIT_FOR t2_query';
+--let $thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE INFO LIKE '%foo(32%' AND INFO NOT LIKE '%LIKE%'`
+SET debug_sync='now SIGNAL t2_cont';
+
+# Wait until T2 has entered its wait for T1 to commit, and T1 has
+# progressed into its commit phase.
+SET debug_sync='now WAIT_FOR t1_ready';
+
+# Now kill the transaction T2.
+--replace_result $thd_id THD_ID
+eval KILL $thd_id;
+
+# Wait until T2 has reacted on the kill.
+SET debug_sync='now WAIT_FOR t2_killed';
+
+# Now we can allow T1 to proceed.
+SET debug_sync='now SIGNAL t1_cont';
+
+--let $slave_sql_errno= 1317,1963
+--source include/wait_for_slave_sql_error.inc
+STOP SLAVE IO_THREAD;
+SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
+
+# Now we have to disable the debug_sync statements, so they do not trigger
+# when the events are retried.
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
+SET sql_log_bin=0;
+DROP FUNCTION foo;
+--delimiter ||
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+ RETURNS INT DETERMINISTIC
+ BEGIN
+ RETURN x;
+ END
+||
+--delimiter ;
+SET sql_log_bin=1;
+
+--connection server_1
+INSERT INTO t3 VALUES (39,0);
+--save_master_pos
+
+--connection server_2
+--source include/start_slave.inc
+--sync_with_master
+SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
+
+
+--connection server_2
--source include/stop_slave.inc
SET GLOBAL binlog_format=@old_format;
SET GLOBAL slave_parallel_threads=0;
diff --git a/sql/handler.cc b/sql/handler.cc
index 1518f2baaa0..88431d4db32 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -1300,7 +1300,10 @@ int ha_commit_trans(THD *thd, bool all)
{
/* Free resources and perform other cleanup even for 'empty' transactions. */
if (is_real_trans)
+ {
thd->transaction.cleanup();
+ thd->wakeup_subsequent_commits(error);
+ }
DBUG_RETURN(0);
}
@@ -1334,6 +1337,7 @@ int ha_commit_trans(THD *thd, bool all)
thd->variables.lock_wait_timeout))
{
ha_rollback_trans(thd, all);
+ thd->wakeup_subsequent_commits(1);
DBUG_RETURN(1);
}
@@ -1421,6 +1425,7 @@ done:
err:
error= 1; /* Transaction was rolled back */
ha_rollback_trans(thd, all);
+ thd->wakeup_subsequent_commits(error);
end:
if (rw_trans && mdl_request.ticket)
@@ -1591,10 +1596,7 @@ int ha_rollback_trans(THD *thd, bool all)
/* Always cleanup. Even if nht==0. There may be savepoints. */
if (is_real_trans)
- {
- thd->wakeup_subsequent_commits(error);
thd->transaction.cleanup();
- }
if (all)
thd->transaction_rollback_request= FALSE;
diff --git a/sql/log.cc b/sql/log.cc
index 9cddb5a8e75..fbb73acf5d1 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -6622,7 +6622,7 @@ int
MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
{
group_commit_entry *entry, *orig_queue;
- wait_for_commit *list, *cur, *last;
+ wait_for_commit *cur, *last;
wait_for_commit *wfc;
DBUG_ENTER("MYSQL_BIN_LOG::queue_for_group_commit");
@@ -6663,17 +6663,39 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
wfc->opaque_pointer= NULL;
DBUG_PRINT("info", ("After waiting for prior commit, queued_by_other=%d",
orig_entry->queued_by_other));
- orig_entry->thd->exit_cond(old_msg);
if (wfc->waiting_for_commit)
{
- /* Interrupted by kill. */
- wfc->wakeup_error= orig_entry->thd->killed_errno();
- if (wfc->wakeup_error)
- wfc->wakeup_error= ER_QUERY_INTERRUPTED;
- my_message(wfc->wakeup_error, ER(wfc->wakeup_error), MYF(0));
- DBUG_RETURN(-1);
+ /* Wait terminated due to kill. */
+ wait_for_commit *loc_waitee= wfc->waitee;
+ mysql_mutex_lock(&loc_waitee->LOCK_wait_commit);
+ if (loc_waitee->wakeup_subsequent_commits_running ||
+ orig_entry->queued_by_other)
+ {
+ /* Our waitee is already waking us up, so ignore the kill. */
+ mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
+ do
+ {
+ mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit);
+ } while (wfc->waiting_for_commit);
+ }
+ else
+ {
+ /* We were killed, so remove us from the list of waitee. */
+ wfc->remove_from_list(&loc_waitee->subsequent_commits_list);
+ mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
+
+ orig_entry->thd->exit_cond(old_msg);
+ /* Interrupted by kill. */
+ DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior_killed");
+ wfc->wakeup_error= orig_entry->thd->killed_errno();
+ if (wfc->wakeup_error)
+ wfc->wakeup_error= ER_QUERY_INTERRUPTED;
+ my_message(wfc->wakeup_error, ER(wfc->wakeup_error), MYF(0));
+ DBUG_RETURN(-1);
+ }
}
+ orig_entry->thd->exit_cond(old_msg);
}
else
mysql_mutex_unlock(&wfc->LOCK_wait_commit);
@@ -6729,9 +6751,8 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
used by the caller or any other function.
*/
- list= wfc;
- cur= list;
- last= list;
+ cur= wfc;
+ last= wfc;
entry= orig_entry;
for (;;)
{
@@ -6757,11 +6778,11 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
*/
if (cur->subsequent_commits_list)
{
- bool have_lock;
wait_for_commit *waiter;
+ wait_for_commit *wakeup_list= NULL;
+ wait_for_commit **wakeup_next_ptr= &wakeup_list;
mysql_mutex_lock(&cur->LOCK_wait_commit);
- have_lock= true;
/*
Grab the list, now safely under lock, and process it if still
non-empty.
@@ -6802,18 +6823,68 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
For this, we need to set the "wakeup running" flag and release
the waitee lock to avoid a deadlock, see comments on
THD::wakeup_subsequent_commits2() for details.
+
+ So we need to put these on a list and delay the wakeup until we
+ have released the lock.
+ */
+ *wakeup_next_ptr= waiter;
+ wakeup_next_ptr= &waiter->next_subsequent_commit;
+ }
+ waiter= next;
+ }
+ if (wakeup_list)
+ {
+ /* Now release our lock and do the wakeups that were delayed above. */
+ cur->wakeup_subsequent_commits_running= true;
+ mysql_mutex_unlock(&cur->LOCK_wait_commit);
+ for (;;)
+ {
+ wait_for_commit *next;
+
+ /*
+ ToDo: We wakeup the waiter here, so that it can have the chance to
+ reach its own commit state and queue up for this same group commit,
+ if it is still pending.
+
+ One problem with this is that if the waiter does not reach its own
+ commit state before this group commit starts, and then the group
+ commit fails (binlog write failure), we do not get to propagate
+ the error to the waiter.
+
+ A solution for this could be to delay the wakeup until commit is
+ successful. But then we need to set a flag in the waitee that it is
+ already queued for group commit, so that the waiter can check this
+ flag and queue itself if it _does_ reach the commit state in time.
+
+ (But error handling in case of binlog write failure is currently
+ broken in other ways, as well).
*/
- if (have_lock)
+ if (&wakeup_list->next_subsequent_commit == wakeup_next_ptr)
{
- have_lock= false;
- cur->wakeup_subsequent_commits_running= true;
- mysql_mutex_unlock(&cur->LOCK_wait_commit);
+ /* The last one in the list. */
+ wakeup_list->wakeup(0);
+ break;
}
- waiter->wakeup(0);
+ /*
+ Important: don't access wakeup_list->next after the wakeup() call,
+ it may be invalidated by the other thread.
+ */
+ next= wakeup_list->next_subsequent_commit;
+ wakeup_list->wakeup(0);
+ wakeup_list= next;
}
- waiter= next;
+ /*
+ We need a full memory barrier between walking the list and clearing
+ the flag wakeup_subsequent_commits_running. This barrier is needed
+ to ensure that no other thread will start to modify the list
+ pointers before we are done traversing the list.
+
+ But wait_for_commit::wakeup(), which was called above, does a full
+ memory barrier already (it locks a mutex).
+ */
+ cur->wakeup_subsequent_commits_running= false;
}
- if (have_lock)
+ else
mysql_mutex_unlock(&cur->LOCK_wait_commit);
}
if (cur == last)
@@ -6827,29 +6898,6 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
DBUG_ASSERT(entry != NULL);
}
- /*
- Now we need to clear the wakeup_subsequent_commits_running flags.
-
- We need a full memory barrier between walking the list above, and clearing
- the flag wakeup_subsequent_commits_running below. This barrier is needed
- to ensure that no other thread will start to modify the list pointers
- before we are done traversing the list.
-
- But wait_for_commit::wakeup(), which was called above for any other thread
- that might modify the list in parallel, does a full memory barrier already
- (it locks a mutex).
- */
- if (list)
- {
- for (;;)
- {
- list->wakeup_subsequent_commits_running= false;
- if (list == last)
- break;
- list= list->next_subsequent_commit;
- }
- }
-
if (opt_binlog_commit_wait_count > 0)
mysql_cond_signal(&COND_prepare_ordered);
mysql_mutex_unlock(&LOCK_prepare_ordered);
diff --git a/sql/log_event.cc b/sql/log_event.cc
index d6c7db1e215..d205a08e708 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -7158,7 +7158,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
res= trans_commit(thd); /* Automatically rolls back on error. */
thd->mdl_context.release_transactional_locks();
- if (sub_id)
+ if (!res && sub_id)
rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid);
/*
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index ff2ad84e037..91aa36abc52 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -24,7 +24,7 @@ static int
rpt_handle_event(rpl_parallel_thread::queued_event *qev,
struct rpl_parallel_thread *rpt)
{
- int err __attribute__((unused));
+ int err;
rpl_group_info *rgi= qev->rgi;
Relay_log_info *rli= rgi->rli;
THD *thd= rgi->thd;
@@ -172,6 +172,18 @@ finish_event_group(THD *thd, int err, uint64 sub_id,
}
+static void
+signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi)
+{
+ rgi->is_error= true;
+ rgi->cleanup_context(thd, true);
+ rgi->rli->abort_slave= true;
+ mysql_mutex_lock(rgi->rli->relay_log.get_log_lock());
+ mysql_mutex_unlock(rgi->rli->relay_log.get_log_lock());
+ rgi->rli->relay_log.signal_update();
+}
+
+
pthread_handler_t
handle_rpl_parallel_thread(void *arg)
{
@@ -304,10 +316,8 @@ handle_rpl_parallel_thread(void *arg)
{
/* The thread got a kill signal. */
thd->send_kill_message();
- rgi->is_error= true;
slave_output_error_info(rgi->rli, thd);
- rgi->cleanup_context(thd, true);
- rgi->rli->abort_slave= true;
+ signal_error_to_sql_driver_thread(thd, rgi);
}
rgi->wait_start_sub_id= 0; /* No need to check again. */
}
@@ -363,10 +373,8 @@ handle_rpl_parallel_thread(void *arg)
if (err)
{
- rgi->is_error= true;
slave_output_error_info(rgi->rli, thd);
- rgi->cleanup_context(thd, true);
- rgi->rli->abort_slave= true;
+ signal_error_to_sql_driver_thread(thd, rgi);
}
if (end_of_group)
{
@@ -405,11 +413,9 @@ handle_rpl_parallel_thread(void *arg)
half-processed event group.
*/
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
- group_rgi->is_error= true;
finish_event_group(thd, 1, group_rgi->gtid_sub_id,
group_rgi->parallel_entry, &group_rgi->commit_orderer);
- group_rgi->cleanup_context(thd, true);
- group_rgi->rli->abort_slave= true;
+ signal_error_to_sql_driver_thread(thd, group_rgi);
in_event_group= false;
delete group_rgi;
group_rgi= NULL;
diff --git a/sql/slave.cc b/sql/slave.cc
index 4be4a96d142..bddb69d84bb 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -6242,6 +6242,17 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size)
}
/*
+ We have to check sql_slave_killed() here an extra time.
+ Otherwise we may miss a wakeup, since last check was done
+ without holding LOCK_log.
+ */
+ if (sql_slave_killed(rgi))
+ {
+ mysql_mutex_unlock(log_lock);
+ break;
+ }
+
+ /*
If the I/O thread is blocked, unblock it. Ok to broadcast
after unlock, because the mutex is only destroyed in
~Relay_log_info(), i.e. when rli is destroyed, and rli will
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 1898bae1499..11c03191a49 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -5786,25 +5786,49 @@ int
wait_for_commit::wait_for_prior_commit2(THD *thd)
{
const char *old_msg;
+ wait_for_commit *loc_waitee;
mysql_mutex_lock(&LOCK_wait_commit);
old_msg= thd->enter_cond(&COND_wait_commit, &LOCK_wait_commit,
"Waiting for prior transaction to commit");
while (waiting_for_commit && !thd->check_killed())
mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
- thd->exit_cond(old_msg);
- waitee= NULL;
if (!waiting_for_commit)
{
if (wakeup_error)
my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
- return wakeup_error;
+ goto end;
+ }
+ /*
+ Wait was interrupted by kill. We need to unregister our wait and give the
+ error. But if a wakeup is already in progress, then we must ignore the
+ kill and not give error, otherwise we get inconsistency between waitee and
+ waiter as to whether we succeed or fail (eg. we may roll back but waitee
+ might attempt to commit both us and any subsequent commits waiting for us).
+ */
+ loc_waitee= this->waitee;
+ mysql_mutex_lock(&loc_waitee->LOCK_wait_commit);
+ if (loc_waitee->wakeup_subsequent_commits_running)
+ {
+ /* We are being woken up; ignore the kill and just wait. */
+ mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
+ do
+ {
+ mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
+ } while (waiting_for_commit);
+ goto end;
}
- /* Wait was interrupted by kill, so give the error. */
+ remove_from_list(&loc_waitee->subsequent_commits_list);
+ mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
+
wakeup_error= thd->killed_errno();
if (!wakeup_error)
wakeup_error= ER_QUERY_INTERRUPTED;
my_message(wakeup_error, ER(wakeup_error), MYF(0));
+
+end:
+ thd->exit_cond(old_msg);
+ waitee= NULL;
return wakeup_error;
}
@@ -5891,7 +5915,6 @@ wait_for_commit::unregister_wait_for_prior_commit2()
if (waiting_for_commit)
{
wait_for_commit *loc_waitee= this->waitee;
- wait_for_commit **next_ptr_ptr, *cur;
mysql_mutex_lock(&loc_waitee->LOCK_wait_commit);
if (loc_waitee->wakeup_subsequent_commits_running)
{
@@ -5909,17 +5932,7 @@ wait_for_commit::unregister_wait_for_prior_commit2()
else
{
/* Remove ourselves from the list in the waitee. */
- next_ptr_ptr= &loc_waitee->subsequent_commits_list;
- while ((cur= *next_ptr_ptr) != NULL)
- {
- if (cur == this)
- {
- *next_ptr_ptr= this->next_subsequent_commit;
- break;
- }
- next_ptr_ptr= &cur->next_subsequent_commit;
- }
- waiting_for_commit= false;
+ remove_from_list(&loc_waitee->subsequent_commits_list);
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
}
}
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 083cf0b8c04..a3fc3a7866f 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -1660,6 +1660,25 @@ struct wait_for_commit
if (waiting_for_commit)
unregister_wait_for_prior_commit2();
}
+ /*
+ Remove a waiter from the list in the waitee. Used to unregister a wait.
+ The caller must be holding the locks of both waiter and waitee.
+ */
+ void remove_from_list(wait_for_commit **next_ptr_ptr)
+ {
+ wait_for_commit *cur;
+
+ while ((cur= *next_ptr_ptr) != NULL)
+ {
+ if (cur == this)
+ {
+ *next_ptr_ptr= this->next_subsequent_commit;
+ break;
+ }
+ next_ptr_ptr= &cur->next_subsequent_commit;
+ }
+ waiting_for_commit= false;
+ }
void wakeup(int wakeup_error);