diff options
author | unknown <knielsen@knielsen-hq.org> | 2013-10-31 14:11:41 +0100 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2013-10-31 14:11:41 +0100 |
commit | 39df665a3332bd9bfb2529419f534a49cfac388c (patch) | |
tree | 0b431c7e00a4ef8344a5fbde2cdac4935ec204b1 | |
parent | 9c8da4ed762a4ad092e23cc07c34212320341ac1 (diff) | |
download | mariadb-git-39df665a3332bd9bfb2529419f534a49cfac388c.tar.gz |
MDEV-5206: Incorrect slave old-style position in MDEV-4506, parallel replication.
In parallel replication, there are two kinds of events which are
executed in different ways.
Normal events that are part of event groups/transactions are executed
asynchronously by being queued for a worker thread.
Other events, such as format description and rotate events, are executed
directly in the driver SQL thread.
If the direct execution of the other events were to update the old-style
position, then the position gets updated too far ahead, before the normal
events that have been queued for a worker thread have been executed. So
this patch adds some special cases to prevent such position updates ahead
of time, and instead queues dummy events for the worker threads, so that
they will at an appropriate time do the position updates instead.
(Also fix a race in a test case that happened to trigger while running
tests for this patch).
-rw-r--r-- | mysql-test/suite/rpl/t/rpl_parallel.test | 3 | ||||
-rw-r--r-- | sql/log_event.cc | 16 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 105 | ||||
-rw-r--r-- | sql/rpl_parallel.h | 1 |
4 files changed, 114 insertions, 11 deletions
diff --git a/mysql-test/suite/rpl/t/rpl_parallel.test b/mysql-test/suite/rpl/t/rpl_parallel.test index 89834b790d6..5709cab19c0 100644 --- a/mysql-test/suite/rpl/t/rpl_parallel.test +++ b/mysql-test/suite/rpl/t/rpl_parallel.test @@ -92,6 +92,7 @@ INSERT INTO t2 VALUES (foo(10, --connection server_2 FLUSH LOGS; +--source include/wait_for_binlog_checkpoint.inc SET sql_log_bin=0; --delimiter || CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500)) @@ -148,6 +149,7 @@ SELECT * FROM t2 WHERE a >= 10 ORDER BY a; --let $binlog_file= slave-bin.000002 --source include/show_binlog_events.inc FLUSH LOGS; +--source include/wait_for_binlog_checkpoint.inc # Restart all the slave parallel worker threads, to clear all debug_sync actions. --connection server_2 @@ -161,6 +163,7 @@ SET debug_sync='RESET'; --echo *** Test that group-committed transactions on the master can replicate in parallel on the slave. *** --connection server_1 FLUSH LOGS; +--source include/wait_for_binlog_checkpoint.inc CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB; # Create some sentinel rows so that the rows inserted in parallel fall into # separate gaps and do not cause gap lock conflicts. diff --git a/sql/log_event.cc b/sql/log_event.cc index e7c0506a50a..7ce6c203248 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -966,11 +966,17 @@ int Log_event::do_update_pos(rpl_group_info *rgi) if (debug_not_change_ts_if_art_event == 1 && is_artificial_event()) debug_not_change_ts_if_art_event= 0; ); - rli->stmt_done(log_pos, - (is_artificial_event() && - IF_DBUG(debug_not_change_ts_if_art_event > 0, 1) ? - 0 : when), - thd, rgi); + /* + In parallel execution, delay position update for the events that are + not part of event groups (format description, rotate, and such) until + the actual event execution reaches that point. 
+ */ + if (!rgi->is_parallel_exec || is_group_event(get_type_code())) + rli->stmt_done(log_pos, + (is_artificial_event() && + IF_DBUG(debug_not_change_ts_if_art_event > 0, 1) ? + 0 : when), + thd, rgi); DBUG_EXECUTE_IF("let_first_flush_log_change_timestamp", if (debug_not_change_ts_if_art_event == 0) debug_not_change_ts_if_art_event= 2; ); diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index d62bec6e605..8328dd24128 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -56,6 +56,48 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, } +static void +handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev) +{ + int cmp; + Relay_log_info *rli; + /* + Events that are not part of an event group, such as Format Description, + Stop, GTID List and such, are executed directly in the driver SQL thread, + to keep the relay log state up-to-date. But the associated position update + is done here, in sync with other normal events as they are queued to + worker threads. 
+ */ + if ((thd->variables.option_bits & OPTION_BEGIN) && + opt_using_transactions) + return; + rli= qev->rgi->rli; + mysql_mutex_lock(&rli->data_lock); + cmp= strcmp(rli->group_relay_log_name, qev->event_relay_log_name); + if (cmp < 0) + { + rli->group_relay_log_pos= qev->future_event_relay_log_pos; + strmake_buf(rli->group_relay_log_name, qev->event_relay_log_name); + rli->notify_group_relay_log_name_update(); + } else if (cmp == 0 && + rli->group_relay_log_pos < qev->future_event_relay_log_pos) + rli->group_relay_log_pos= qev->future_event_relay_log_pos; + + cmp= strcmp(rli->group_master_log_name, qev->future_event_master_log_name); + if (cmp < 0) + { + strcpy(rli->group_master_log_name, qev->future_event_master_log_name); + rli->notify_group_master_log_name_update(); + rli->group_master_log_pos= qev->future_event_master_log_pos; + } + else if (cmp == 0 + && rli->group_master_log_pos < qev->future_event_master_log_pos) + rli->group_master_log_pos= qev->future_event_master_log_pos; + mysql_mutex_unlock(&rli->data_lock); + mysql_cond_broadcast(&rli->data_cond); +} + + static bool sql_worker_killed(THD *thd, rpl_group_info *rgi, bool in_event_group) { @@ -142,16 +184,24 @@ handle_rpl_parallel_thread(void *arg) while (events) { struct rpl_parallel_thread::queued_event *next= events->next; - Log_event_type event_type= events->ev->get_type_code(); + Log_event_type event_type; rpl_group_info *rgi= events->rgi; rpl_parallel_entry *entry= rgi->parallel_entry; uint64 wait_for_sub_id; uint64 wait_start_sub_id; bool end_of_group; + if (!events->ev) + { + handle_queued_pos_update(thd, events); + my_free(events); + events= next; + continue; + } + err= 0; /* Handle a new event group, which will be initiated by a GTID event. 
*/ - if (event_type == GTID_EVENT) + if ((event_type= events->ev->get_type_code()) == GTID_EVENT) { in_event_group= true; /* @@ -794,13 +844,15 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, e->last_commit_id= 0; } - e->current_group_info= rgi; + qev->rgi= e->current_group_info= rgi; e->current_sub_id= rgi->gtid_sub_id; current= rgi->parallel_entry= e; } else if (!is_group_event || !current) { + my_off_t log_pos; int err; + bool tmp; /* Events like ROTATE and FORMAT_DESCRIPTION. Do not run in worker thread. Same for events not preceeded by GTID (we should not see those normally, @@ -824,11 +876,52 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, } } + tmp= serial_rgi->is_parallel_exec; + serial_rgi->is_parallel_exec= true; err= rpt_handle_event(qev, NULL); + serial_rgi->is_parallel_exec= tmp; + log_pos= qev->ev->log_pos; delete_or_keep_event_post_apply(serial_rgi, typ, qev->ev); - my_free(qev); - return (err != 0); + if (err) + { + my_free(qev); + return true; + } + qev->ev= NULL; + qev->future_event_master_log_pos= log_pos; + if (!current) + { + handle_queued_pos_update(rli->sql_driver_thd, qev); + my_free(qev); + return false; + } + /* + Queue an empty event, so that the position will be updated in a + reasonable way relative to other events: + + - If the currently executing events are queued serially for a single + thread, the position will only be updated when everything before has + completed. + + - If we are executing multiple independent events in parallel, then at + least the position will not be updated until one of them has reached + the current point. + */ + cur_thread= current->rpl_thread; + if (cur_thread) + { + mysql_mutex_lock(&cur_thread->LOCK_rpl_thread); + if (cur_thread->current_entry != current) + { + /* Not ours anymore, we need to grab a new one. 
*/ + mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); + cur_thread= NULL; + } + } + if (!cur_thread) + cur_thread= current->rpl_thread= + global_rpl_thread_pool.get_thread(current); } else { @@ -848,8 +941,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, cur_thread= current->rpl_thread= global_rpl_thread_pool.get_thread(current); } + qev->rgi= current->current_group_info; } - qev->rgi= current->current_group_info; /* Queue the event for processing. diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index fe9c6708e97..0b9619e5e83 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -27,6 +27,7 @@ struct rpl_parallel_thread { char event_relay_log_name[FN_REFLEN]; char future_event_master_log_name[FN_REFLEN]; ulonglong event_relay_log_pos; + my_off_t future_event_master_log_pos; size_t event_size; } *event_queue, *last_in_queue; uint64 queued_size; |