diff options
-rw-r--r-- | mysql-test/std_data/mariadb-5.5-binlog.000001 | bin | 0 -> 1037 bytes | |||
-rw-r--r-- | mysql-test/suite/rpl/r/rpl_old_master.result | 27 | ||||
-rw-r--r-- | mysql-test/suite/rpl/t/rpl_old_master.test | 49 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 82 | ||||
-rw-r--r-- | sql/rpl_parallel.h | 3 | ||||
-rw-r--r-- | sql/slave.cc | 18 |
6 files changed, 134 insertions, 45 deletions
diff --git a/mysql-test/std_data/mariadb-5.5-binlog.000001 b/mysql-test/std_data/mariadb-5.5-binlog.000001 Binary files differnew file mode 100644 index 00000000000..9b6f6dce0fb --- /dev/null +++ b/mysql-test/std_data/mariadb-5.5-binlog.000001 diff --git a/mysql-test/suite/rpl/r/rpl_old_master.result b/mysql-test/suite/rpl/r/rpl_old_master.result new file mode 100644 index 00000000000..df5bbe34256 --- /dev/null +++ b/mysql-test/suite/rpl/r/rpl_old_master.result @@ -0,0 +1,27 @@ +include/master-slave.inc +[connection master] +include/stop_slave.inc +include/rpl_stop_server.inc [server_number=1] +include/rpl_start_server.inc [server_number=1] +SET @old_parallel= @@GLOBAL.slave_parallel_threads; +SET GLOBAL slave_parallel_threads=10; +CHANGE MASTER TO master_host='127.0.0.1', master_port=SERVER_MYPORT_1, master_user='root', master_log_file='master-bin.000001', master_log_pos=4; +include/start_slave.inc +CREATE TABLE t2 (a INT PRIMARY KEY) ENGINE=InnoDB; +INSERT INTO t2 VALUES (1); +SELECT * FROM t1 ORDER BY a; +a b +1 1 +2 2 +3 4 +4 8 +5 16 +SELECT * FROM t2; +a +1 +include/stop_slave.inc +SET GLOBAL slave_parallel_threads=@old_parallel; +DROP TABLE t1; +include/start_slave.inc +DROP TABLE t2; +include/rpl_end.inc diff --git a/mysql-test/suite/rpl/t/rpl_old_master.test b/mysql-test/suite/rpl/t/rpl_old_master.test new file mode 100644 index 00000000000..8f61d6979cd --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_old_master.test @@ -0,0 +1,49 @@ +# Test replicating off old master. +# We simulate old master by copying in pre-generated binlog files from earlier +# server versions. + +--source include/have_innodb.inc +--source include/master-slave.inc + +--connection slave +--source include/stop_slave.inc + +--connection master +--let $datadir= `SELECT @@datadir` + +--let $rpl_server_number= 1 +--source include/rpl_stop_server.inc + +--remove_file $datadir/master-bin.000001 +--copy_file $MYSQL_TEST_DIR/std_data/mariadb-5.5-binlog.000001 $datadir/master-bin.000001 + +--let $rpl_server_number= 1 +--source include/rpl_start_server.inc + +--source include/wait_until_connected_again.inc + +--connection slave +SET @old_parallel= @@GLOBAL.slave_parallel_threads; +SET GLOBAL slave_parallel_threads=10; +--replace_result $SERVER_MYPORT_1 SERVER_MYPORT_1 +eval CHANGE MASTER TO master_host='127.0.0.1', master_port=$SERVER_MYPORT_1, master_user='root', master_log_file='master-bin.000001', master_log_pos=4; +--source include/start_slave.inc + +--connection master +CREATE TABLE t2 (a INT PRIMARY KEY) ENGINE=InnoDB; +INSERT INTO t2 VALUES (1); +--save_master_pos + +--connection slave +--sync_with_master +SELECT * FROM t1 ORDER BY a; +SELECT * FROM t2; + +--source include/stop_slave.inc +SET GLOBAL slave_parallel_threads=@old_parallel; +DROP TABLE t1; +--source include/start_slave.inc + +--connection master +DROP TABLE t2; +--source include/rpl_end.inc diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index cceb6013d6c..12fcef7c866 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -1259,11 +1259,12 @@ abandon_worker_thread(THD *thd, rpl_parallel_thread *cur_thread, do_event() is executed by the sql_driver_thd thread. It's main purpose is to find a thread that can execute the query. - @retval false ok, event was accepted - @retval true error + @retval 0 ok, event was accepted + @retval 1 error + @retval -1 event should be executed serially, in the sql driver thread */ -bool +int rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ulonglong event_size) { @@ -1277,6 +1278,32 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, bool did_enter_cond= false; const char *old_msg= NULL; + /* Handle master log name change, seen in Rotate_log_event. */ + typ= ev->get_type_code(); + if (unlikely(typ == ROTATE_EVENT)) + { + Rotate_log_event *rev= static_cast<Rotate_log_event *>(ev); + if ((rev->server_id != global_system_variables.server_id || + rli->replicate_same_server_id) && + !rev->is_relay_log_event() && + !rli->is_in_group()) + { + memcpy(rli->future_event_master_log_name, + rev->new_log_ident, rev->ident_len+1); + } + } + + /* + Execute queries non-parallel if slave_skip_counter is set, as it's is + easier to skip queries in single threaded mode. + */ + if (rli->slave_skip_counter) + return -1; + + /* Execute pre-10.0 event, which have no GTID, in single-threaded mode. */ + if (unlikely(!current) && typ != GTID_EVENT) + return -1; + /* ToDo: what to do with this lock?!? */ mysql_mutex_unlock(&rli->data_lock); @@ -1288,21 +1315,20 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, been partially queued, but after that we will just ignore any further events the SQL driver thread may try to queue, and eventually it will stop. */ - if (((typ= ev->get_type_code()) == GTID_EVENT || - !(is_group_event= Log_event::is_group_event(typ))) && - rli->abort_slave) + is_group_event= Log_event::is_group_event(typ); + if ((typ == GTID_EVENT || !is_group_event) && rli->abort_slave) sql_thread_stopping= true; if (sql_thread_stopping) { delete ev; /* - Return false ("no error"); normal stop is not an error, and otherwise the - error has already been recorded. + Return "no error"; normal stop is not an error, and otherwise the error + has already been recorded. */ - return false; + return 0; } - if (typ == GTID_EVENT || unlikely(!current)) + if (typ == GTID_EVENT) { uint32 domain_id; if (likely(typ == GTID_EVENT)) @@ -1317,7 +1343,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, { my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME)); delete ev; - return true; + return 1; } current= e; } @@ -1336,7 +1362,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, { /* This means we were killed. The error is already signalled. */ delete ev; - return true; + return 1; } if (!(qev= cur_thread->get_qev(ev, event_size, rli))) @@ -1344,7 +1370,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, abandon_worker_thread(rli->sql_driver_thd, cur_thread, &did_enter_cond, old_msg); delete ev; - return true; + return 1; } if (typ == GTID_EVENT) @@ -1357,7 +1383,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, abandon_worker_thread(rli->sql_driver_thd, cur_thread, &did_enter_cond, old_msg); delete ev; - return true; + return 1; } /* @@ -1395,7 +1421,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, abandon_worker_thread(rli->sql_driver_thd, cur_thread, &did_enter_cond, old_msg); delete ev; - return true; + return 1; } e->current_gco= rgi->gco= gco; } @@ -1409,7 +1435,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, e->current_sub_id= rgi->gtid_sub_id; ++e->count_queued_event_groups; } - else if (!is_group_event || !e) + else if (!is_group_event) { my_off_t log_pos; int err; @@ -1418,38 +1444,22 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, Events like ROTATE and FORMAT_DESCRIPTION. Do not run in worker thread. Same for events not preceeded by GTID (we should not see those normally, but they might be from an old master). - - The variable `e' is NULL for the case where the master did not - have GTID, like a MariaDB 5.5 or MySQL master. */ qev->rgi= serial_rgi; - /* Handle master log name change, seen in Rotate_log_event. */ - if (typ == ROTATE_EVENT) - { - Rotate_log_event *rev= static_cast<Rotate_log_event *>(qev->ev); - if ((rev->server_id != global_system_variables.server_id || - rli->replicate_same_server_id) && - !rev->is_relay_log_event() && - !rli->is_in_group()) - { - memcpy(rli->future_event_master_log_name, - rev->new_log_ident, rev->ident_len+1); - } - } tmp= serial_rgi->is_parallel_exec; serial_rgi->is_parallel_exec= true; err= rpt_handle_event(qev, NULL); serial_rgi->is_parallel_exec= tmp; - log_pos= qev->ev->log_pos; - delete_or_keep_event_post_apply(serial_rgi, typ, qev->ev); + log_pos= ev->log_pos; + delete_or_keep_event_post_apply(serial_rgi, typ, ev); if (err) { cur_thread->free_qev(qev); abandon_worker_thread(rli->sql_driver_thd, cur_thread, &did_enter_cond, old_msg); - return true; + return 1; } /* Queue an empty event, so that the position will be updated in a @@ -1480,5 +1490,5 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, &did_enter_cond, old_msg); mysql_cond_signal(&cur_thread->COND_rpl_thread); - return false; + return 0; } diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index 31a6a035dd8..128ef698240 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -225,8 +225,7 @@ struct rpl_parallel { void wait_for_done(THD *thd, Relay_log_info *rli); void stop_during_until(); bool workers_idle(); - bool do_event(rpl_group_info *serial_rgi, Log_event *ev, - ulonglong event_size); + int do_event(rpl_group_info *serial_rgi, Log_event *ev, ulonglong event_size); }; diff --git a/sql/slave.cc b/sql/slave.cc index 29515fb3821..3a6dcf92a8a 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -3449,13 +3449,17 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, update_state_of_relay_log(rli, ev); - /* - Execute queries in parallel, except if slave_skip_counter is set, - as it's is easier to skip queries in single threaded mode. - */ - - if (opt_slave_parallel_threads > 0 && rli->slave_skip_counter == 0) - DBUG_RETURN(rli->parallel.do_event(serial_rgi, ev, event_size)); + if (opt_slave_parallel_threads > 0) + { + int res= rli->parallel.do_event(serial_rgi, ev, event_size); + if (res >= 0) + DBUG_RETURN(res); + /* + Else we proceed to execute the event non-parallel. + This is the case for pre-10.0 events without GTID, and for handling + slave_skip_counter. + */ + } /* For GTID, allocate a new sub_id for the given domain_id. |