diff options
author | Aleksey Midenkov <midenok@gmail.com> | 2023-01-16 00:01:35 +0300 |
---|---|---|
committer | Aleksey Midenkov <midenok@gmail.com> | 2023-03-26 20:20:42 +0300 |
commit | 06a7eb2dc31a273c3408ae983c3a2a7523dc353a (patch) | |
tree | fb3311b1d3c6727d564da35dd37b295793c8d841 | |
parent | 8f934a8a573dedf15b40d9cdb1d033e24792c70c (diff) | |
download | mariadb-git-bb-11.0-midenok-MDEV-30421.tar.gz |
MDEV-30421 Allow administrators to enable or disable parallel replication on a per-table basis [bb-11.0-midenok-MDEV-30421]
A per-domain dedicated thread for processing ordered transactions. The
thread is reserved from the total number of domain threads (controlled
by slave_parallel_threads and slave_domain_parallel_threads). Whether
an event goes to the ordered thread depends on the FL_ALLOW_PARALLEL flag
as well as several other conditions. FL_ALLOW_PARALLEL is passed from
the master and is set for the event depending on the master's configuration
directives. To use the dedicated ordered thread on the slave server, one
must enable it explicitly with the configuration directive:
set global slave_ordered_thread= 1;
Originally it was controlled by the skip_parallel_replication session
variable, which can be changed per statement. This patch adds several
more directives to control it at the per-schema and per-table levels:
parallel_do_db
parallel_do_table
parallel_ignore_db
parallel_ignore_table
parallel_wild_do_table
parallel_wild_ignore_table
Each directive is a comma-separated list of fully-qualified table
names. Spaces after a comma are ignored (but not before).
"Table" directives take precedence over "db" directives. "Do"
directives take precedence over "ignore" directives. "Wild" directives
are checked if "do" and "ignore" directives did not match.
If none of the above directives is present, everything is considered
parallel. If any of the above directives is present and the table does not
match anything in the lists, it is considered ordered.
Examples:
set @@global.parallel_do_db= "db_parallel";
set @@global.parallel_ignore_db= "db_serial";
set global parallel_do_table= "db_serial.t3, db_serial.t1";
set global parallel_wild_ignore_table= "db_parallel.non_parallel_%";
The normal behaviour of an ordered transaction is to wait, before it
starts, for any prior transactions to commit: they get into different commit
groups. But since all the ordered transactions (within one domain) go
to a single thread, we may avoid that restriction with this directive
on the slave:
set global slave_ordered_dont_wait= 1;
When set, events without an explicit FL_WAITED flag that go to the ordered
thread nonetheless accept optimistic speculation. I.e. they get into the
same commit group as parallel events: an ordered event is executed in
parallel with parallel events.
27 files changed, 852 insertions, 129 deletions
diff --git a/include/mysql_com.h b/include/mysql_com.h index b0e96caddf7..0eb0a15fbed 100644 --- a/include/mysql_com.h +++ b/include/mysql_com.h @@ -98,6 +98,7 @@ enum enum_server_command COM_RESET_CONNECTION, /* don't forget to update const char *command_name[] in sql_parse.cc */ COM_MDB_GAP_BEG, + COM_SLAVE_ORDERED=240, COM_MDB_GAP_END=249, COM_STMT_BULK_EXECUTE=250, COM_SLAVE_WORKER=251, diff --git a/mysql-test/main/debug_sync.result b/mysql-test/main/debug_sync.result index 4c1711a6d6b..2cf54d4ea81 100644 --- a/mysql-test/main/debug_sync.result +++ b/mysql-test/main/debug_sync.result @@ -161,7 +161,7 @@ Variable_name Value debug_sync ON - current signals: 'something,from_function,from_myvar' SET DEBUG_SYNC= 'now WAIT_FOR nothing TIMEOUT 0'; Warnings: -Warning #### debug sync point wait timed out +Warning #### Debug sync WAIT_FOR 'nothing' timed out SET DEBUG_SYNC= 'now SIGNAL nothing'; SHOW VARIABLES LIKE 'DEBUG_SYNC'; Variable_name Value diff --git a/mysql-test/main/mysqld--help.result b/mysql-test/main/mysqld--help.result index 458a2ceaad0..8c8825a0be9 100644 --- a/mysql-test/main/mysqld--help.result +++ b/mysql-test/main/mysqld--help.result @@ -1291,6 +1291,12 @@ The following specify which files/extra groups are read (specified before remain --slave-net-timeout=# Number of seconds to wait for more data from any master/slave connection before aborting the read + --slave-ordered-dont-wait + Slave ordered events don't wait previous groups unless + they have explicit WAIT flag + --slave-ordered-thread + Per-domain dedicated thread on slave for processing + ordered events --slave-parallel-max-queued=# Limit on how much memory SQL threads should use per parallel replication thread when reading ahead in the @@ -1887,6 +1893,8 @@ slave-exec-mode STRICT slave-max-allowed-packet 1073741824 slave-max-statement-time 0 slave-net-timeout 60 +slave-ordered-dont-wait FALSE +slave-ordered-thread FALSE slave-parallel-max-queued 131072 slave-parallel-mode conservative 
slave-parallel-threads 0 diff --git a/mysql-test/suite/binlog/r/binlog_checkpoint.result b/mysql-test/suite/binlog/r/binlog_checkpoint.result index a00b1c0c1a3..1162322f485 100644 --- a/mysql-test/suite/binlog/r/binlog_checkpoint.result +++ b/mysql-test/suite/binlog/r/binlog_checkpoint.result @@ -30,7 +30,7 @@ connect con2,localhost,root,,; This will timeout, as RESET MASTER is blocked SET DEBUG_SYNC= "now WAIT_FOR reset_master_done TIMEOUT 1"; Warnings: -Warning 1639 debug sync point wait timed out +Warning 1639 Debug sync WAIT_FOR 'reset_master_done' timed out SET DEBUG_SYNC= "now SIGNAL con1_go"; connection con1; connection default; @@ -145,6 +145,8 @@ SET debug_sync="now WAIT_FOR injected_binlog_background_thread"; SET GLOBAL debug_dbug=@old_dbug; SET debug_sync="now SIGNAL reset_master_cont"; connection con3; +Warnings: +Warning 1639 Debug sync WAIT_FOR 'reset_master_cont' timed out connection default; SET debug_sync = 'reset'; *** MDEV-24660: MYSQL_BIN_LOG::cleanup(): Assertion `b->xid_count == 0' failed in MYSQL_BIN_LOG::cleanup diff --git a/mysql-test/suite/parts/r/partition_debug_sync_innodb.result b/mysql-test/suite/parts/r/partition_debug_sync_innodb.result index 3305802a288..a3107bcf537 100644 --- a/mysql-test/suite/parts/r/partition_debug_sync_innodb.result +++ b/mysql-test/suite/parts/r/partition_debug_sync_innodb.result @@ -74,7 +74,7 @@ ALTER TABLE t1 REORGANIZE PARTITION p0 INTO (PARTITION p0 VALUES LESS THAN (10), PARTITION p10 VALUES LESS THAN MAXVALUE); Warnings: -Warning 1639 debug sync point wait timed out +Warning 1639 Debug sync WAIT_FOR 'alter' timed out disconnect con1; connection default; TABLE_SCHEMA TABLE_NAME PARTITION_NAME PARTITION_ORDINAL_POSITION PARTITION_DESCRIPTION TABLE_ROWS diff --git a/mysql-test/suite/rpl/r/parallel_filter.result b/mysql-test/suite/rpl/r/parallel_filter.result new file mode 100644 index 00000000000..85b19ff796e --- /dev/null +++ b/mysql-test/suite/rpl/r/parallel_filter.result @@ -0,0 +1,184 @@ 
+include/rpl_init.inc [topology=1->2] +connection server_2; +set @old_parallel_threads= @@global.slave_parallel_threads; +set @old_slave_parallel_mode= @@global.slave_parallel_mode; +set @old_dbug= @@global.debug_dbug; +set @old_slave_ordered_thread= @@global.slave_ordered_thread; +set @old_slave_ordered_dont_wait= @@global.slave_ordered_dont_wait; +create procedure processlist() +select Command, State, Info +from information_schema.processlist +where Command like 'Slave_%' and +state like 'debug sync point%' and +Info is not null +order by db, Info; +include/stop_slave.inc +set global slave_parallel_threads= 5; +set global slave_parallel_mode= 'optimistic'; +set global slave_ordered_thread= 1; +set global slave_ordered_dont_wait= 1; +include/start_slave.inc +connection server_1; +set @_parallel_do_db= @@global.parallel_do_db; +set @_parallel_ignore_db= @@global.parallel_ignore_db; +set @_parallel_do_table= @@global.parallel_do_table; +set @_parallel_ignore_table= @@global.parallel_ignore_table; +set @@global.parallel_do_db= "db_parallel"; +set @@global.parallel_ignore_db= "db_serial"; +set @@global.parallel_do_table= "db_serial.t3"; +select @@global.parallel_do_db; +@@global.parallel_do_db +db_parallel +select @@global.replicate_do_db; +@@global.replicate_do_db + +create database db_parallel; +create database db_serial; +create database db_not_mentioned; +create table db_parallel.t1 (a int) engine innodb; +create table db_serial.t2 (b int) engine innodb; +create table db_serial.t3 (c int) engine innodb; +create table db_not_mentioned.t4 (c int) engine innodb; +connect m1,localhost,root,,test,$SERVER_MYPORT_1,; +connect m2,localhost,root,,test,$SERVER_MYPORT_1,; +connect m3,localhost,root,,test,$SERVER_MYPORT_1,; +connect m4,localhost,root,,test,$SERVER_MYPORT_1,; +connection server_2; +set global debug_dbug= '+d,ha_write_row_start'; +connection m1; +insert into db_parallel.t1 values (1); +connection server_2; +set debug_sync= 'now wait_for 
ha_write_row_entering_t1'; +connection m2; +insert into db_serial.t2 values (2); +connection server_2; +set debug_sync= 'now wait_for ha_write_row_entering_t2'; +connection m3; +insert into db_serial.t3 values (3); +connection m4; +insert into db_not_mentioned.t4 values (4); +connection server_2; +set debug_sync= 'now wait_for ha_write_row_entering_t3'; +call processlist; +Command State Info +Slave_worker debug sync point: now insert into db_parallel.t1 values (1) +Slave_ordered debug sync point: now insert into db_serial.t2 values (2) +Slave_worker debug sync point: now insert into db_serial.t3 values (3) +set debug_sync= 'now signal ha_write_row_continue_t1'; +set debug_sync= 'now wait_for ha_write_row_continued_t1'; +set debug_sync= 'now signal ha_write_row_continue_t2'; +set debug_sync= 'now wait_for ha_write_row_continued_t2'; +set debug_sync= 'now signal ha_write_row_continue_t3'; +set debug_sync= 'now wait_for ha_write_row_continued_t3'; +set debug_sync= 'now wait_for ha_write_row_entering_t4'; +connection m1; +insert into db_parallel.t1 values (5); +connection server_2; +set debug_sync= 'now wait_for ha_write_row_entering_t1'; +connection m2; +insert into db_serial.t3 values (6); +connection server_2; +set debug_sync= 'now wait_for ha_write_row_entering_t3'; +call processlist; +Command State Info +Slave_ordered debug sync point: now insert into db_not_mentioned.t4 values (4) +Slave_worker debug sync point: now insert into db_parallel.t1 values (5) +Slave_worker debug sync point: now insert into db_serial.t3 values (6) +set debug_sync= 'now signal ha_write_row_continue_t1'; +set debug_sync= 'now wait_for ha_write_row_continued_t1'; +set debug_sync= 'now signal ha_write_row_continue_t3'; +set debug_sync= 'now wait_for ha_write_row_continued_t3'; +set debug_sync= 'now signal ha_write_row_continue_t4'; +set debug_sync= 'now wait_for ha_write_row_continued_t4'; +connection m1; +connection m2; +connection m3; +connection m4; +# db_parallel.t1 is now ordered 
+connection m1; +set @@global.parallel_ignore_table= "db_parallel.t1"; +insert into db_parallel.t1 values (7); +connection server_2; +set debug_sync= 'now wait_for ha_write_row_entering_t1'; +call processlist; +Command State Info +Slave_ordered debug sync point: now insert into db_parallel.t1 values (7) +set debug_sync= 'now signal ha_write_row_continue_t1'; +set debug_sync= 'now wait_for ha_write_row_continued_t1'; +connection m1; +# db_parallel.t1 is parallel again +connection m1; +set @@global.parallel_do_table= "db_serial.t3, db_parallel.t1"; +insert into db_parallel.t1 values (8); +connection server_2; +set debug_sync= 'now wait_for ha_write_row_entering_t1'; +call processlist; +Command State Info +Slave_worker debug sync point: now insert into db_parallel.t1 values (8) +set debug_sync= 'now signal ha_write_row_continue_t1'; +set debug_sync= 'now wait_for ha_write_row_continued_t1'; +connection m1; +# Transaction contains 2 parallel DML +connection server_2; +set debug_sync= 'now wait_for ha_write_row_entering_t1'; +connection m1; +start transaction; +insert into db_parallel.t1 values (9); +insert into db_serial.t3 values (10); +commit; +connection server_2; +call processlist; +Command State Info +Slave_worker debug sync point: now insert into db_parallel.t1 values (9) +set debug_sync= 'now signal ha_write_row_continue_t1'; +set debug_sync= 'now wait_for ha_write_row_entering_t3'; +call processlist; +Command State Info +Slave_worker debug sync point: now insert into db_serial.t3 values (10) +set debug_sync= 'now signal ha_write_row_continue_t3'; +set debug_sync= 'now wait_for ha_write_row_continued_t3'; +connection m1; +# Transaction contains 1 parallel DML and 1 ordered DML +connection server_2; +set debug_sync= 'now wait_for ha_write_row_entering_t1'; +connection m1; +start transaction; +insert into db_parallel.t1 values (11); +insert into db_serial.t2 values (12); +commit; +connection server_2; +call processlist; +Command State Info +Slave_ordered debug 
sync point: now insert into db_parallel.t1 values (11) +set debug_sync= 'now signal ha_write_row_continue_t1'; +set debug_sync= 'now wait_for ha_write_row_entering_t2'; +call processlist; +Command State Info +Slave_ordered debug sync point: now insert into db_serial.t2 values (12) +set debug_sync= 'now signal ha_write_row_continue_t2'; +set debug_sync= 'now wait_for ha_write_row_continued_t2'; +connection m1; +disconnect m1; +disconnect m2; +disconnect m3; +disconnect m4; +connection server_2; +drop procedure processlist; +include/stop_slave.inc +set global slave_parallel_threads= @old_parallel_threads; +set global slave_parallel_mode= @old_slave_parallel_mode; +set global slave_ordered_thread= @old_slave_ordered_thread; +set global slave_ordered_dont_wait= @old_slave_ordered_dont_wait; +include/start_slave.inc +set @@global.debug_dbug= @old_dbug; +set debug_sync= reset; +connection server_1; +drop database db_parallel; +drop database db_serial; +drop database db_not_mentioned; +use test; +set @@global.parallel_do_db= @_parallel_do_db; +set @@global.parallel_ignore_db= @_parallel_ignore_db; +set @@global.parallel_do_table= @_parallel_do_table; +set @@global.parallel_ignore_table= @_parallel_ignore_table; +include/rpl_end.inc diff --git a/mysql-test/suite/rpl/t/parallel_filter.test b/mysql-test/suite/rpl/t/parallel_filter.test new file mode 100644 index 00000000000..41c18429ef5 --- /dev/null +++ b/mysql-test/suite/rpl/t/parallel_filter.test @@ -0,0 +1,223 @@ +--source include/have_debug.inc +--source include/have_innodb.inc +--source include/have_binlog_format_row_or_statement.inc +--let $rpl_topology=1->2 +--source include/rpl_init.inc + +--connection server_2 +set @old_parallel_threads= @@global.slave_parallel_threads; +set @old_slave_parallel_mode= @@global.slave_parallel_mode; +set @old_dbug= @@global.debug_dbug; +set @old_slave_ordered_thread= @@global.slave_ordered_thread; +set @old_slave_ordered_dont_wait= @@global.slave_ordered_dont_wait; + +create procedure 
processlist() + select Command, State, Info + from information_schema.processlist + where Command like 'Slave_%' and + state like 'debug sync point%' and + Info is not null + order by db, Info; + +--source include/stop_slave.inc +set global slave_parallel_threads= 5; +set global slave_parallel_mode= 'optimistic'; +set global slave_ordered_thread= 1; +set global slave_ordered_dont_wait= 1; +--source include/start_slave.inc + +--connection server_1 +# parallel_do_db +# parallel_do_table +# parallel_ignore_db +# parallel_ignore_table +# parallel_wild_do_table +# parallel_wild_ignore_table + +set @_parallel_do_db= @@global.parallel_do_db; +set @_parallel_ignore_db= @@global.parallel_ignore_db; +set @_parallel_do_table= @@global.parallel_do_table; +set @_parallel_ignore_table= @@global.parallel_ignore_table; + +set @@global.parallel_do_db= "db_parallel"; +# Note: in this case db_serial and db_not_mentioned is the same as we have +# parallel_do_db not empty. +set @@global.parallel_ignore_db= "db_serial"; +set @@global.parallel_do_table= "db_serial.t3"; + +select @@global.parallel_do_db; +select @@global.replicate_do_db; + +create database db_parallel; +create database db_serial; +create database db_not_mentioned; + +create table db_parallel.t1 (a int) engine innodb; +create table db_serial.t2 (b int) engine innodb; +create table db_serial.t3 (c int) engine innodb; +create table db_not_mentioned.t4 (c int) engine innodb; +--save_master_pos + +--connect (m1,localhost,root,,test,$SERVER_MYPORT_1,) +--connect (m2,localhost,root,,test,$SERVER_MYPORT_1,) +--connect (m3,localhost,root,,test,$SERVER_MYPORT_1,) +--connect (m4,localhost,root,,test,$SERVER_MYPORT_1,) + +--connection server_2 +--sync_with_master +set global debug_dbug= '+d,ha_write_row_start'; + +--connection m1 +send insert into db_parallel.t1 values (1); + +--connection server_2 +set debug_sync= 'now wait_for ha_write_row_entering_t1'; + +--connection m2 +send insert into db_serial.t2 values (2); + +--connection 
server_2 +set debug_sync= 'now wait_for ha_write_row_entering_t2'; + +--connection m3 +send insert into db_serial.t3 values (3); + +--connection m4 +send insert into db_not_mentioned.t4 values (4); + +--connection server_2 +set debug_sync= 'now wait_for ha_write_row_entering_t3'; +call processlist; + +set debug_sync= 'now signal ha_write_row_continue_t1'; +set debug_sync= 'now wait_for ha_write_row_continued_t1'; +set debug_sync= 'now signal ha_write_row_continue_t2'; +set debug_sync= 'now wait_for ha_write_row_continued_t2'; +set debug_sync= 'now signal ha_write_row_continue_t3'; +set debug_sync= 'now wait_for ha_write_row_continued_t3'; + +set debug_sync= 'now wait_for ha_write_row_entering_t4'; + +--connection m1 +reap; +send insert into db_parallel.t1 values (5); + +--connection server_2 +set debug_sync= 'now wait_for ha_write_row_entering_t1'; + +--connection m2 +reap; +send insert into db_serial.t3 values (6); + +--connection server_2 +set debug_sync= 'now wait_for ha_write_row_entering_t3'; + +call processlist; + +set debug_sync= 'now signal ha_write_row_continue_t1'; +set debug_sync= 'now wait_for ha_write_row_continued_t1'; +set debug_sync= 'now signal ha_write_row_continue_t3'; +set debug_sync= 'now wait_for ha_write_row_continued_t3'; +set debug_sync= 'now signal ha_write_row_continue_t4'; +set debug_sync= 'now wait_for ha_write_row_continued_t4'; + +--connection m1 +reap; +--connection m2 +reap; +--connection m3 +reap; +--connection m4 +reap; + +--echo # db_parallel.t1 is now ordered +--connection m1 +set @@global.parallel_ignore_table= "db_parallel.t1"; +send insert into db_parallel.t1 values (7); +--connection server_2 +set debug_sync= 'now wait_for ha_write_row_entering_t1'; +call processlist; +set debug_sync= 'now signal ha_write_row_continue_t1'; +set debug_sync= 'now wait_for ha_write_row_continued_t1'; +--connection m1 +reap; + +--echo # db_parallel.t1 is parallel again +--connection m1 +# spaces after comma are ignored (but not before!) 
+set @@global.parallel_do_table= "db_serial.t3, db_parallel.t1"; +send insert into db_parallel.t1 values (8); +--connection server_2 +set debug_sync= 'now wait_for ha_write_row_entering_t1'; +call processlist; +set debug_sync= 'now signal ha_write_row_continue_t1'; +set debug_sync= 'now wait_for ha_write_row_continued_t1'; +--connection m1 +reap; + +--echo # Transaction contains 2 parallel DML +--connection server_2 +send set debug_sync= 'now wait_for ha_write_row_entering_t1'; +--connection m1 +start transaction; +insert into db_parallel.t1 values (9); +insert into db_serial.t3 values (10); +send commit; +--connection server_2 +reap; +call processlist; +set debug_sync= 'now signal ha_write_row_continue_t1'; +set debug_sync= 'now wait_for ha_write_row_entering_t3'; +call processlist; +set debug_sync= 'now signal ha_write_row_continue_t3'; +set debug_sync= 'now wait_for ha_write_row_continued_t3'; +--connection m1 +reap; + +--echo # Transaction contains 1 parallel DML and 1 ordered DML +--connection server_2 +send set debug_sync= 'now wait_for ha_write_row_entering_t1'; +--connection m1 +start transaction; +insert into db_parallel.t1 values (11); +insert into db_serial.t2 values (12); +send commit; +--connection server_2 +reap; +call processlist; +set debug_sync= 'now signal ha_write_row_continue_t1'; +set debug_sync= 'now wait_for ha_write_row_entering_t2'; +call processlist; +set debug_sync= 'now signal ha_write_row_continue_t2'; +set debug_sync= 'now wait_for ha_write_row_continued_t2'; +--connection m1 +reap; + +disconnect m1; +disconnect m2; +disconnect m3; +disconnect m4; + +--connection server_2 +drop procedure processlist; +--source include/stop_slave.inc +set global slave_parallel_threads= @old_parallel_threads; +set global slave_parallel_mode= @old_slave_parallel_mode; +set global slave_ordered_thread= @old_slave_ordered_thread; +set global slave_ordered_dont_wait= @old_slave_ordered_dont_wait; +--source include/start_slave.inc +set @@global.debug_dbug= 
@old_dbug; +set debug_sync= reset; + +--connection server_1 +drop database db_parallel; +drop database db_serial; +drop database db_not_mentioned; +use test; + +set @@global.parallel_do_db= @_parallel_do_db; +set @@global.parallel_ignore_db= @_parallel_ignore_db; +set @@global.parallel_do_table= @_parallel_do_table; +set @@global.parallel_ignore_table= @_parallel_ignore_table; + +--source include/rpl_end.inc diff --git a/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result b/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result index 075c1ba959a..4e443ebae13 100644 --- a/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result +++ b/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result @@ -2692,6 +2692,66 @@ NUMERIC_BLOCK_SIZE NULL ENUM_VALUE_LIST NULL READ_ONLY NO COMMAND_LINE_ARGUMENT REQUIRED +VARIABLE_NAME PARALLEL_DO_DB +VARIABLE_SCOPE GLOBAL +VARIABLE_TYPE VARCHAR +VARIABLE_COMMENT Tell the master to restrict parallel replication to databases whose names appear in the comma-separated list. Other databases will be in serial replication, no matter parallel_ignore_db is used or not. +NUMERIC_MIN_VALUE NULL +NUMERIC_MAX_VALUE NULL +NUMERIC_BLOCK_SIZE NULL +ENUM_VALUE_LIST NULL +READ_ONLY NO +COMMAND_LINE_ARGUMENT NULL +VARIABLE_NAME PARALLEL_DO_TABLE +VARIABLE_SCOPE GLOBAL +VARIABLE_TYPE VARCHAR +VARIABLE_COMMENT Tells the master to restrict parallel replication to tables in the comma-separated list. +NUMERIC_MIN_VALUE NULL +NUMERIC_MAX_VALUE NULL +NUMERIC_BLOCK_SIZE NULL +ENUM_VALUE_LIST NULL +READ_ONLY NO +COMMAND_LINE_ARGUMENT NULL +VARIABLE_NAME PARALLEL_IGNORE_DB +VARIABLE_SCOPE GLOBAL +VARIABLE_TYPE VARCHAR +VARIABLE_COMMENT Tell the master to restrict parallel replication to databases whose names do not appear in the comma-separated list. If parallel_do_db is used this directive is ignored. 
+NUMERIC_MIN_VALUE NULL +NUMERIC_MAX_VALUE NULL +NUMERIC_BLOCK_SIZE NULL +ENUM_VALUE_LIST NULL +READ_ONLY NO +COMMAND_LINE_ARGUMENT NULL +VARIABLE_NAME PARALLEL_IGNORE_TABLE +VARIABLE_SCOPE GLOBAL +VARIABLE_TYPE VARCHAR +VARIABLE_COMMENT Tell the master to restrict parallel replication to tables whose names do not appear in the comma-separated list. +NUMERIC_MIN_VALUE NULL +NUMERIC_MAX_VALUE NULL +NUMERIC_BLOCK_SIZE NULL +ENUM_VALUE_LIST NULL +READ_ONLY NO +COMMAND_LINE_ARGUMENT NULL +VARIABLE_NAME PARALLEL_WILD_DO_TABLE +VARIABLE_SCOPE GLOBAL +VARIABLE_TYPE VARCHAR +VARIABLE_COMMENT Tells the master to restrict parallel replication to statements where all the updated tables match the specified database and table name patterns. +NUMERIC_MIN_VALUE NULL +NUMERIC_MAX_VALUE NULL +NUMERIC_BLOCK_SIZE NULL +ENUM_VALUE_LIST NULL +READ_ONLY NO +COMMAND_LINE_ARGUMENT NULL +VARIABLE_NAME PARALLEL_WILD_IGNORE_TABLE +VARIABLE_SCOPE GLOBAL +VARIABLE_TYPE VARCHAR +VARIABLE_COMMENT Tells the master to restrict parallel replication to statements where none of the updated tables match the specified database and table name patterns. 
+NUMERIC_MIN_VALUE NULL +NUMERIC_MAX_VALUE NULL +NUMERIC_BLOCK_SIZE NULL +ENUM_VALUE_LIST NULL +READ_ONLY NO +COMMAND_LINE_ARGUMENT NULL VARIABLE_NAME PERFORMANCE_SCHEMA VARIABLE_SCOPE GLOBAL VARIABLE_TYPE BOOLEAN @@ -3932,6 +3992,26 @@ NUMERIC_BLOCK_SIZE 1 ENUM_VALUE_LIST NULL READ_ONLY NO COMMAND_LINE_ARGUMENT REQUIRED +VARIABLE_NAME SLAVE_ORDERED_DONT_WAIT +VARIABLE_SCOPE GLOBAL +VARIABLE_TYPE BOOLEAN +VARIABLE_COMMENT Slave ordered events don't wait previous groups unless they have explicit WAIT flag +NUMERIC_MIN_VALUE NULL +NUMERIC_MAX_VALUE NULL +NUMERIC_BLOCK_SIZE NULL +ENUM_VALUE_LIST OFF,ON +READ_ONLY NO +COMMAND_LINE_ARGUMENT OPTIONAL +VARIABLE_NAME SLAVE_ORDERED_THREAD +VARIABLE_SCOPE GLOBAL +VARIABLE_TYPE BOOLEAN +VARIABLE_COMMENT Per-domain dedicated thread on slave for processing ordered events +NUMERIC_MIN_VALUE NULL +NUMERIC_MAX_VALUE NULL +NUMERIC_BLOCK_SIZE NULL +ENUM_VALUE_LIST OFF,ON +READ_ONLY NO +COMMAND_LINE_ARGUMENT OPTIONAL VARIABLE_NAME SLAVE_PARALLEL_MAX_QUEUED VARIABLE_SCOPE GLOBAL VARIABLE_TYPE BIGINT UNSIGNED diff --git a/mysql-test/suite/sysschema/r/pr_ps_setup_show_disabled.result b/mysql-test/suite/sysschema/r/pr_ps_setup_show_disabled.result index cf8ca76e214..84e53eb0354 100644 --- a/mysql-test/suite/sysschema/r/pr_ps_setup_show_disabled.result +++ b/mysql-test/suite/sysschema/r/pr_ps_setup_show_disabled.result @@ -96,6 +96,7 @@ statement/com/Reset stmt YES statement/com/Set option YES statement/com/Shutdown YES statement/com/Slave_IO YES +statement/com/Slave_ordered YES statement/com/Slave_SQL YES statement/com/Slave_worker YES statement/com/Sleep YES @@ -191,6 +192,7 @@ statement/com/Reset stmt YES statement/com/Set option YES statement/com/Shutdown YES statement/com/Slave_IO YES +statement/com/Slave_ordered YES statement/com/Slave_SQL YES statement/com/Slave_worker YES statement/com/Sleep YES diff --git a/mysql-test/suite/sysschema/r/pr_ps_setup_show_disabled_instruments.result 
b/mysql-test/suite/sysschema/r/pr_ps_setup_show_disabled_instruments.result index b0aa5f6d357..980094c939f 100644 --- a/mysql-test/suite/sysschema/r/pr_ps_setup_show_disabled_instruments.result +++ b/mysql-test/suite/sysschema/r/pr_ps_setup_show_disabled_instruments.result @@ -42,6 +42,7 @@ statement/com/Reset stmt YES statement/com/Set option YES statement/com/Shutdown YES statement/com/Slave_IO YES +statement/com/Slave_ordered YES statement/com/Slave_SQL YES statement/com/Slave_worker YES statement/com/Sleep YES diff --git a/mysql-test/suite/sysschema/r/pr_ps_setup_show_enabled.result b/mysql-test/suite/sysschema/r/pr_ps_setup_show_enabled.result index 261ccb191ed..ecb91faa4dd 100644 --- a/mysql-test/suite/sysschema/r/pr_ps_setup_show_enabled.result +++ b/mysql-test/suite/sysschema/r/pr_ps_setup_show_enabled.result @@ -171,6 +171,7 @@ statement/com/Reset stmt YES statement/com/Set option YES statement/com/Shutdown YES statement/com/Slave_IO YES +statement/com/Slave_ordered YES statement/com/Slave_SQL YES statement/com/Slave_worker YES statement/com/Sleep YES @@ -348,6 +349,7 @@ statement/com/Reset stmt YES statement/com/Set option YES statement/com/Shutdown YES statement/com/Slave_IO YES +statement/com/Slave_ordered YES statement/com/Slave_SQL YES statement/com/Slave_worker YES statement/com/Sleep YES diff --git a/mysql-test/suite/sysschema/r/pr_ps_setup_show_enabled_instruments.result b/mysql-test/suite/sysschema/r/pr_ps_setup_show_enabled_instruments.result index 36399f0d0cc..0d517192dd4 100644 --- a/mysql-test/suite/sysschema/r/pr_ps_setup_show_enabled_instruments.result +++ b/mysql-test/suite/sysschema/r/pr_ps_setup_show_enabled_instruments.result @@ -113,6 +113,7 @@ statement/com/Reset stmt YES statement/com/Set option YES statement/com/Shutdown YES statement/com/Slave_IO YES +statement/com/Slave_ordered YES statement/com/Slave_SQL YES statement/com/Slave_worker YES statement/com/Sleep YES diff --git a/sql/debug_sync.cc b/sql/debug_sync.cc index 
eac111d32d7..d39fb6c6058 100644 --- a/sql/debug_sync.cc +++ b/sql/debug_sync.cc @@ -1658,9 +1658,10 @@ static void debug_sync_execute(THD *thd, st_debug_sync_action *action) { // We should not make the statement fail, even if in strict mode. Abort_on_warning_instant_set aws(thd, false); - push_warning(thd, Sql_condition::WARN_LEVEL_WARN, + push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, ER_DEBUG_SYNC_TIMEOUT, - ER_THD(thd, ER_DEBUG_SYNC_TIMEOUT)); + ER_THD(thd, ER_DEBUG_SYNC_TIMEOUT), + action->wait_for.c_ptr()); DBUG_EXECUTE_IF("debug_sync_abort_on_timeout", DBUG_ASSERT(0);); break; } diff --git a/sql/handler.cc b/sql/handler.cc index 595e76c708b..ec4dc8ba273 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -7758,7 +7758,34 @@ int handler::ha_write_row(const uchar *buf) DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE || m_lock_type == F_WRLCK); DBUG_ENTER("handler::ha_write_row"); +#ifdef ENABLED_DEBUG_SYNC + if (!lex_string_eq(&MYSQL_SCHEMA_NAME, &table_share->db)) + { + DBUG_EXECUTE_IF("ha_write_row_start", + { + String act1( + C_STRING_WITH_LEN("now " + "signal ha_write_row_entering"), system_charset_info); + act1.append('_'); + act1.append(table->alias); + String act2( + C_STRING_WITH_LEN("now " + "wait_for ha_write_row_continue"), system_charset_info); + act2.append('_'); + act2.append(table->alias); + String act3( + C_STRING_WITH_LEN("now " + "signal ha_write_row_continued"), system_charset_info); + act3.append('_'); + act3.append(table->alias); + DBUG_ASSERT(debug_sync_service); + DBUG_ASSERT(!debug_sync_set_action(ha_thd(), act1.ptr(), act1.length())); + DBUG_ASSERT(!debug_sync_set_action(ha_thd(), act2.ptr(), act2.length())); + DBUG_ASSERT(!debug_sync_set_action(ha_thd(), act3.ptr(), act3.length())); + };); + } DEBUG_SYNC_C("ha_write_row_start"); +#endif if ((error= ha_check_overlaps(NULL, buf))) DBUG_RETURN(error); diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc index 3910d910da1..03f088d73ea 100644 --- 
a/sql/log_event_server.cc +++ b/sql/log_event_server.cc @@ -2109,6 +2109,11 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi, */ thd->variables.sql_log_slow= !MY_TEST(global_system_variables.log_slow_disabled_statements & LOG_SLOW_DISABLE_SLAVE); } + DBUG_PRINT("rpl", + ("GTID %u-%u-%llu " + "thread_id: %llu query: %s", + rgi->current_gtid.domain_id, rgi->current_gtid.server_id, + rgi->current_gtid.seq_no, thd->thread_id, thd->query())); mysql_parse(thd, thd->query(), thd->query_length(), &parser_state); /* Finalize server status flags after executing a statement. */ thd->update_server_status(); @@ -2883,7 +2888,7 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg, flags2|= FL_DDL; else if (is_transactional && !is_tmp_table) flags2|= FL_TRANSACTIONAL; - if (!(thd_arg->variables.option_bits & OPTION_RPL_SKIP_PARALLEL)) + if (!thd_arg->rpl_ordered && !(thd_arg->variables.option_bits & OPTION_RPL_SKIP_PARALLEL)) flags2|= FL_ALLOW_PARALLEL; /* Preserve any DDL or WAITED flag in the slave's binlog. 
*/ if (thd_arg->rgi_slave) diff --git a/sql/mysqld.cc b/sql/mysqld.cc index ee368def9be..1ec4a2072af 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -355,6 +355,8 @@ bool opt_bin_log, opt_bin_log_used=0, opt_ignore_builtin_innodb= 0; bool opt_bin_log_compress; uint opt_bin_log_compress_min_len; my_bool opt_log, debug_assert_if_crashed_table= 0, opt_help= 0; +my_bool opt_slave_ordered_thread= 0; +my_bool opt_slave_ordered_dont_wait= 0; my_bool debug_assert_on_not_freed_memory= 0; my_bool disable_log_notes, opt_support_flashback= 0; static my_bool opt_abort; @@ -632,6 +634,7 @@ THD_list server_threads; Rpl_filter* cur_rpl_filter; Rpl_filter* global_rpl_filter; Rpl_filter* binlog_filter; +Rpl_filter* parallel_filter; struct system_variables global_system_variables; /** @@ -2027,6 +2030,7 @@ static void clean_up(bool print_message) wsrep_thr_deinit(); my_uuid_end(); delete type_handler_data; + delete parallel_filter; delete binlog_filter; delete global_rpl_filter; end_ssl(); @@ -3947,7 +3951,8 @@ static int init_common_variables() global_rpl_filter= new Rpl_filter; binlog_filter= new Rpl_filter; - if (!global_rpl_filter || !binlog_filter) + parallel_filter= new Rpl_filter; + if (!global_rpl_filter || !binlog_filter || !parallel_filter) { sql_perror("Could not allocate replication and binlog filters"); exit(1); @@ -5952,6 +5957,7 @@ int mysqld_main(int argc, char **argv) /* Copy default global rpl_filter to global_rpl_filter */ copy_filter_setting(global_rpl_filter, get_or_create_rpl_filter("", 0)); + copy_filter_setting(parallel_filter, get_or_create_rpl_filter("", 0)); /* init_slave() must be called after the thread keys are created. 
diff --git a/sql/mysqld.h b/sql/mysqld.h index 54cafdcde15..3cceb2c0afd 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -833,6 +833,12 @@ enum options_mysqld OPT_REPLICATE_REWRITE_DB, OPT_REPLICATE_WILD_DO_TABLE, OPT_REPLICATE_WILD_IGNORE_TABLE, + OPT_PARALLEL_DO_DB, + OPT_PARALLEL_DO_TABLE, + OPT_PARALLEL_IGNORE_DB, + OPT_PARALLEL_IGNORE_TABLE, + OPT_PARALLEL_WILD_DO_TABLE, + OPT_PARALLEL_WILD_IGNORE_TABLE, OPT_SAFE, OPT_SERVER_ID, OPT_SILENT, @@ -989,6 +995,8 @@ extern ulong opt_binlog_dbug_fsync_sleep; extern uint volatile global_disable_checkpoint; extern my_bool opt_help; +extern my_bool opt_slave_ordered_thread; +extern my_bool opt_slave_ordered_dont_wait; extern int mysqld_main(int argc, char **argv); diff --git a/sql/rpl_filter.cc b/sql/rpl_filter.cc index fb2e07f62ab..11e9e913c47 100644 --- a/sql/rpl_filter.cc +++ b/sql/rpl_filter.cc @@ -87,17 +87,14 @@ Rpl_filter::~Rpl_filter() (I could not find an equivalent in the regex library MySQL uses). RETURN VALUES - 0 should not be logged/replicated - 1 should be logged/replicated + 0 should not be logged/replicated (all tables not matched) + 1 should be logged/replicated (any table matched) */ bool Rpl_filter::tables_ok(const char* db, TABLE_LIST* tables) { bool some_tables_updating= 0; - char hash_key[SAFE_NAME_LEN*2+2]; - char *end; - uint len; DBUG_ENTER("Rpl_filter::tables_ok"); for (; tables; tables= tables->next_global) @@ -106,31 +103,17 @@ Rpl_filter::tables_ok(const char* db, TABLE_LIST* tables) continue; some_tables_updating= 1; - if (!do_table_inited && - !ignore_table_inited && - !wild_do_table_inited && - !wild_ignore_table_inited) - continue; - - end= strmov(hash_key, tables->db.str ? 
tables->db.str : db); - *end++= '.'; - len= (uint) (strmov(end, tables->table_name.str) - hash_key); - if (do_table_inited) // if there are any do's - { - if (my_hash_search(&do_table, (uchar*) hash_key, len)) - DBUG_RETURN(1); - } - if (ignore_table_inited) // if there are any ignores - { - if (my_hash_search(&ignore_table, (uchar*) hash_key, len)) - DBUG_RETURN(0); - } - if (wild_do_table_inited && - find_wild(&wild_do_table, hash_key, len)) - DBUG_RETURN(1); - if (wild_ignore_table_inited && - find_wild(&wild_ignore_table, hash_key, len)) - DBUG_RETURN(0); + /* Bits 0-1 are set in case of lists match */ + /* Bit 2 (4) is set in case of no lists inited */ + /* Bit 3 (8) is set in case of no lists match */ + int res= table_ok(db, tables); + /* This table matched against some list, return result */ + if (!(res & NOT_IN_ANY_LIST)) + DBUG_RETURN(res & ALLOWED); + /* No lists set, no need to check more */ + if (res & NO_LISTS_SET) + break; + DBUG_ASSERT(res & NOT_MATCHED); } /* @@ -143,6 +126,42 @@ Rpl_filter::tables_ok(const char* db, TABLE_LIST* tables) !do_table_inited && !wild_do_table_inited); } + +int Rpl_filter::table_ok(const char* db, TABLE_LIST* tables) +{ + char hash_key[SAFE_NAME_LEN*2+2]; + char *end; + uint len; + + if (!do_table_inited && + !ignore_table_inited && + !wild_do_table_inited && + !wild_ignore_table_inited) + return (NO_LISTS_SET | ALLOWED); + + end= strmov(hash_key, tables->db.str ? 
tables->db.str : db); + *end++= '.'; + len= (uint) (strmov(end, tables->table_name.str) - hash_key); + if (do_table_inited) // if there are any do's + { + if (my_hash_search(&do_table, (uchar*) hash_key, len)) + return ALLOWED; + } + if (ignore_table_inited) // if there are any ignores + { + if (my_hash_search(&ignore_table, (uchar*) hash_key, len)) + return IGNORED; + } + if (wild_do_table_inited && + find_wild(&wild_do_table, hash_key, len)) + return (WILDCARD | ALLOWED); + if (wild_ignore_table_inited && + find_wild(&wild_ignore_table, hash_key, len)) + return (WILDCARD | IGNORED); + + return NOT_MATCHED; +} + #endif /* diff --git a/sql/rpl_filter.h b/sql/rpl_filter.h index ee3b9d516b7..25806f7dabc 100644 --- a/sql/rpl_filter.h +++ b/sql/rpl_filter.h @@ -42,6 +42,13 @@ typedef struct st_table_rule_ent class Rpl_filter { public: + static constexpr int NOT_IN_ANY_LIST= 0xfc; + static constexpr int NO_LISTS_SET= 4; + static constexpr int IGNORED= 0; + static constexpr int ALLOWED= 1; + static constexpr int WILDCARD= 2; + static constexpr int NOT_MATCHED= 8; + Rpl_filter(); ~Rpl_filter(); Rpl_filter(Rpl_filter const&); @@ -51,6 +58,7 @@ public: #ifndef MYSQL_CLIENT bool tables_ok(const char* db, TABLE_LIST *tables); + int table_ok(const char* db, TABLE_LIST *tables); #endif bool db_ok(const char* db); bool db_ok_with_wild_table(const char *db); @@ -157,5 +165,6 @@ private: extern Rpl_filter *global_rpl_filter; extern Rpl_filter *binlog_filter; +extern Rpl_filter *parallel_filter; #endif // RPL_FILTER_H diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index d3891b9508b..2348f99b03a 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -1143,7 +1143,7 @@ handle_rpl_parallel_thread(void *arg) rpt->running= true; mysql_cond_signal(&rpt->COND_rpl_thread); - thd->set_command(COM_SLAVE_WORKER); + thd->set_command(rpt->command); #ifdef WITH_WSREP wsrep_open(thd); if (wsrep_before_command(thd)) @@ -1184,6 +1184,8 @@ handle_rpl_parallel_thread(void *arg) 
rpt->add_to_worker_idle_time_and_reset(); more_events: + thd->set_command(rpt->command); + for (qev= events; qev; qev= next_qev) { Log_event_type event_type; @@ -2274,10 +2276,10 @@ static bool handle_split_alter(rpl_parallel_entry *e, { /* j is needed for round robin scheduling, we will start with rpl_thread_idx - go till rpl_thread_max and then start with 0 to rpl_thread_idx + go till parallel_threads() and then start with 0 to rpl_thread_idx */ int j= e->rpl_thread_idx; - for(uint i= 0; i < e->rpl_thread_max; i++) + for(uint i= 0; i < e->parallel_threads(); i++) { if (!e->rpl_threads[j] || e->rpl_threads[j]->current_owner != &e->rpl_threads[j] || !e->rpl_threads[j]->current_start_alter_id) @@ -2288,7 +2290,7 @@ static bool handle_split_alter(rpl_parallel_entry *e, goto idx_found; } j++; - j= j % e->rpl_thread_max; + j= j % e->parallel_threads(); } //We did not find and idx DBUG_ASSERT(0); @@ -2308,7 +2310,7 @@ idx_found: current_start_alters */ mysql_mutex_lock(&global_rpl_thread_pool.LOCK_rpl_thread_pool); - if (e->pending_start_alters < e->rpl_thread_max - 1 && + if (e->pending_start_alters < e->parallel_threads() - 1 && global_rpl_thread_pool.current_start_alters < global_rpl_thread_pool.count - 1) { @@ -2328,7 +2330,7 @@ idx_found: Gtid_log_event::FL_ROLLBACK_ALTER_E1 )) { //Free the corrosponding rpt current_start_alter_id - for(uint i= 0; i < e->rpl_thread_max; i++) + for(uint i= 0; i < e->parallel_threads(); i++) { if(e->rpl_threads[i] && e->rpl_threads[i]->current_start_alter_id == gtid_ev->sa_seq_no && @@ -2385,22 +2387,34 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond, { uint32 idx; - idx= rpl_thread_idx; if (gtid_ev) { - if (++idx >= rpl_thread_max) - idx= 0; - //rpl_thread_idx will be updated handle_split_alter - if (handle_split_alter(this, gtid_ev, &idx, did_enter_cond, rgi, old_stage)) - return rpl_threads[idx]; - if (gtid_ev->flags2 & - (Gtid_log_event::FL_COMPLETED_XA | Gtid_log_event::FL_PREPARED_XA)) - { - idx= 
my_hash_sort(&my_charset_bin, gtid_ev->xid.key(), - gtid_ev->xid.key_length()) % rpl_thread_max; + if (ordered_thread) + { + idx= rpl_thread_max - 1; + was_ordered= true; + } + else + { + was_ordered= false; + idx= rpl_thread_idx; + ++idx; + if (idx >= parallel_threads()) + idx= 0; + //rpl_thread_idx will be updated handle_split_alter + if (handle_split_alter(this, gtid_ev, &idx, did_enter_cond, rgi, old_stage)) + return rpl_threads[idx]; + if (gtid_ev->flags2 & + (Gtid_log_event::FL_COMPLETED_XA | Gtid_log_event::FL_PREPARED_XA)) + { + idx= my_hash_sort(&my_charset_bin, gtid_ev->xid.key(), + gtid_ev->xid.key_length()) % parallel_threads(); + } + rpl_thread_idx= idx; } - rpl_thread_idx= idx; } + else + idx= last_idx(); return choose_thread_internal(idx, did_enter_cond, rgi, old_stage); } @@ -2483,6 +2497,7 @@ rpl_parallel_thread * rpl_parallel_entry::choose_thread_internal(uint idx, rpl_threads[idx]= thr= global_rpl_thread_pool.get_thread(&rpl_threads[idx], this); + thr->command= was_ordered ? COM_SLAVE_ORDERED : COM_SLAVE_WORKER; return thr; } @@ -2571,6 +2586,7 @@ rpl_parallel::find(uint32 domain_id, Relay_log_info *rli) e->force_abort= false; } + e->ordered_thread= false; return e; } @@ -2770,7 +2786,7 @@ rpl_parallel_entry::queue_master_restart(rpl_group_info *rgi, Thus there is no need for the full complexity of choose_thread(). We only need to check if we have a current worker thread, and queue for it if so. 
*/ - idx= rpl_thread_idx; + idx= last_idx(); thr= rpl_threads[idx]; if (!thr) return 0; @@ -2911,6 +2927,14 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, bool is_group_event; bool did_enter_cond= false; PSI_stage_info old_stage; + rpl_group_info::enum_speculation speculation= rpl_group_info::SPECULATE_NO; + enum_slave_parallel_mode mode; + uchar gtid_flags; + group_commit_orderer *gco; + bool new_gco= true; + uint8 force_switch_flag= 0; + + DBUG_EXECUTE_IF("slave_crash_if_parallel_apply", DBUG_SUICIDE();); /* Handle master log name change, seen in Rotate_log_event. */ @@ -3072,70 +3096,9 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, delete_or_keep_event_post_apply(serial_rgi, typ, ev); return 0; } - } - else - e= current; - - /* - Find a worker thread to queue the event for. - Prefer a new thread, so we maximise parallelism (at least for the group - commit). But do not exceed a limit of --slave-domain-parallel-threads; - instead re-use a thread that we queued for previously. - */ - cur_thread= - e->choose_thread(serial_rgi, &did_enter_cond, &old_stage, gtid_ev); - if (!cur_thread) - { - /* This means we were killed. The error is already signalled. */ - delete ev; - return 1; - } - - if (!(qev= cur_thread->get_qev(ev, event_size, rli))) - { - abandon_worker_thread(rli->sql_driver_thd, cur_thread, - &did_enter_cond, &old_stage); - delete ev; - return 1; - } - - if (typ == GTID_EVENT) - { - bool new_gco; - enum_slave_parallel_mode mode= rli->mi->parallel_mode; - uchar gtid_flags= gtid_ev->flags2; - group_commit_orderer *gco; - uint8 force_switch_flag; - enum rpl_group_info::enum_speculation speculation; - - if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e, event_size))) - { - cur_thread->free_qev(qev); - abandon_worker_thread(rli->sql_driver_thd, cur_thread, - &did_enter_cond, &old_stage); - delete ev; - return 1; - } - - /* - We queue the event group in a new worker thread, to run in parallel - with previous groups. 
- - To preserve commit order within the replication domain, we set up - rgi->wait_commit_sub_id to make the new group commit only after the - previous group has committed. - - Event groups that group-committed together on the master can be run - in parallel with each other without restrictions. But one batch of - group-commits may not start before all groups in the previous batch - have initiated their commit phase; we set up rgi->gco to ensure that. - */ - rgi->wait_commit_sub_id= e->current_sub_id; - rgi->wait_commit_group_info= e->current_group_info; + mode= rli->mi->parallel_mode; + gtid_flags= gtid_ev->flags2; - speculation= rpl_group_info::SPECULATE_NO; - new_gco= true; - force_switch_flag= 0; gco= e->current_gco; if (likely(gco)) { @@ -3188,18 +3151,78 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, new group_commit_orderer, since we still want following transactions to run in parallel with transactions prior to this one. */ - speculation= rpl_group_info::SPECULATE_WAIT; + if (opt_slave_ordered_dont_wait && !(gtid_flags & Gtid_log_event::FL_WAITED)) + speculation= rpl_group_info::SPECULATE_OPTIMISTIC; + else + speculation= rpl_group_info::SPECULATE_WAIT; + if (opt_slave_ordered_thread) + e->ordered_thread= true; } else speculation= rpl_group_info::SPECULATE_OPTIMISTIC; } gco->flags= flags; - } + } /* if (gco) */ else { if (gtid_flags & Gtid_log_event::FL_DDL) force_switch_flag= group_commit_orderer::FORCE_SWITCH; + } /* else (!gco) */ + } /* if (typ == GTID_EVENT) */ + else + e= current; + + /* + Find a worker thread to queue the event for. + Prefer a new thread, so we maximise parallelism (at least for the group + commit). But do not exceed a limit of --slave-domain-parallel-threads; + instead re-use a thread that we queued for previously. + */ + cur_thread= + e->choose_thread(serial_rgi, &did_enter_cond, &old_stage, gtid_ev); + if (!cur_thread) + { + /* This means we were killed. The error is already signalled. 
*/ + delete ev; + return 1; + } + + DBUG_ASSERT(e->rpl_threads[e->last_idx()] == cur_thread); + + if (!(qev= cur_thread->get_qev(ev, event_size, rli))) + { + abandon_worker_thread(rli->sql_driver_thd, cur_thread, + &did_enter_cond, &old_stage); + delete ev; + return 1; + } + + if (gtid_ev) + { + if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e, event_size))) + { + cur_thread->free_qev(qev); + abandon_worker_thread(rli->sql_driver_thd, cur_thread, + &did_enter_cond, &old_stage); + delete ev; + return 1; } + + /* + We queue the event group in a new worker thread, to run in parallel + with previous groups. + + To preserve commit order within the replication domain, we set up + rgi->wait_commit_sub_id to make the new group commit only after the + previous group has committed. + + Event groups that group-committed together on the master can be run + in parallel with each other without restrictions. But one batch of + group-commits may not start before all groups in the previous batch + have initiated their commit phase; we set up rgi->gco to ensure that. + */ + rgi->wait_commit_sub_id= e->current_sub_id; + rgi->wait_commit_group_info= e->current_group_info; rgi->speculation= speculation; if (gtid_flags & Gtid_log_event::FL_GROUP_COMMIT_ID) @@ -3288,6 +3311,29 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, qev->rgi= e->current_group_info; } + if (typ == GTID_EVENT) + { + DBUG_PRINT("rpl", + ("pos: %llu " + "GTID %u-%u-%llu " + "cid=%llu " + "idx: %u " + "spcl: %u fsf: %u ng: %u " + "groups_q: %llu", + ev->log_pos, + gtid_ev->domain_id, gtid_ev->server_id, gtid_ev->seq_no, + gtid_ev->commit_id, + e->last_idx(), + speculation, force_switch_flag, new_gco, + e->count_queued_event_groups)); + } + else + { + DBUG_PRINT("rpl", + ("pos: %llu type: %u idx: %u", + ev->log_pos, typ, e->last_idx())); + } + /* Queue the event for processing. 
*/ diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index 66c7fc9f316..ab125ebbccd 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -117,6 +117,7 @@ struct rpl_parallel_thread { mysql_cond_t COND_rpl_thread_stop; struct rpl_parallel_thread *next; /* For free list. */ struct rpl_parallel_thread_pool *pool; + enum enum_server_command command; THD *thd; /* Who owns the thread, if any (it's a pointer into the @@ -377,6 +378,19 @@ struct rpl_parallel_entry { rpl_parallel_thread **rpl_threads; uint32 rpl_thread_max; uint32 rpl_thread_idx; + bool ordered_thread; + bool was_ordered; + + uint32 last_idx() const + { + return was_ordered ? rpl_thread_max - 1 : rpl_thread_idx; + } + + uint32 parallel_threads() const + { + return opt_slave_ordered_thread ? rpl_thread_max - 1 : rpl_thread_max; + } + /* The sub_id of the last transaction to commit within this domain_id. Must be accessed under LOCK_parallel_entry protection. diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt index 34a5dad6d3f..8309e2fe739 100644 --- a/sql/share/errmsg-utf8.txt +++ b/sql/share/errmsg-utf8.txt @@ -7974,11 +7974,8 @@ WARN_NON_ASCII_SEPARATOR_NOT_IMPLEMENTED geo "არა-ASCII გამყოფი არგუმენტები სრულად მხარდაჭერილი არაა" spa "Los argumentos con separador No ASCII no están completamente soportados" ER_DEBUG_SYNC_TIMEOUT - chi "调试同步点等待超时" - eng "debug sync point wait timed out" - ger "Debug Sync Point Wartezeit überschritten" - geo "გამართვის სინქრონიზაციის წერტილის მოლოდინის დრო ამოიწურა" - spa "agotado tiempo de espera de punto de sincronización de depuración" + eng "Debug sync WAIT_FOR '%-.64s' timed out" + ger "Debug sync WAIT_FOR '%-.64s' wartezeit überschritten" ER_DEBUG_SYNC_HIT_LIMIT chi "调试同步点限制达到" eng "debug sync point hit limit reached" diff --git a/sql/sql_class.cc b/sql/sql_class.cc index c6a929d6fea..3a27b78a462 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -902,6 +902,7 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) prepare_derived_at_open= 
FALSE; create_tmp_table_for_derived= FALSE; save_prep_leaf_list= FALSE; + rpl_ordered= false; org_charset= 0; /* Restore THR_THD */ set_current_thd(old_THR_THD); diff --git a/sql/sql_class.h b/sql/sql_class.h index 79f347f40c5..1c2acc80057 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -2907,6 +2907,7 @@ public: bool create_tmp_table_for_derived; bool save_prep_leaf_list; + bool rpl_ordered; /* container for handler's private per-connection data */ Ha_data ha_data[MAX_HA]; diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 7a171174a72..e09d0129481 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -371,7 +371,7 @@ const LEX_CSTRING command_name[257]={ { 0, 0 }, //237 { 0, 0 }, //238 { 0, 0 }, //239 - { 0, 0 }, //240 + { STRING_WITH_LEN("Slave_ordered") }, //240 { 0, 0 }, //241 { 0, 0 }, //242 { 0, 0 }, //243 @@ -6057,6 +6057,27 @@ finish: } else { + /* + In multi-statement transaction if one statement sets rpl_ordered, it stays + for the whole transaction. + */ + if (all_tables && !thd->rpl_ordered && thd->variables.option_bits & OPTION_BIN_LOG) + { + TABLE_LIST *table; + /* Transaction can be parallel as long as all tables allow parallel */ + bool can_parallel= true; + for (table= all_tables; can_parallel && table; table= table->next_global) + { + if (!table->updating) + continue; + int res= parallel_filter->table_ok(thd->db.str, table); + if (!(res & Rpl_filter::NOT_IN_ANY_LIST)) + can_parallel= (res & Rpl_filter::ALLOWED); + else /* Table did not match any lists, check database lists */ + can_parallel= parallel_filter->db_ok(table->db.str); + } + thd->rpl_ordered= !can_parallel; + } /* If commit fails, we should be able to reset the OK status. 
*/ THD_STAGE_INFO(thd, stage_commit); thd->get_stmt_da()->set_overwrite_status(true); @@ -7590,6 +7611,8 @@ void THD::reset_for_next_command(bool do_clear_error) binlog_unsafe_warning_flags= 0; save_prep_leaf_list= false; + if (!in_multi_stmt_transaction_mode()) + rpl_ordered= false; #if defined(WITH_WSREP) && !defined(DBUG_OFF) if (mysql_bin_log.is_open()) diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 28443c8f4f7..7a4d7332f4a 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -5477,7 +5477,7 @@ bool Sys_var_rpl_filter::global_update(THD *thd, set_var *var) bool Sys_var_rpl_filter::set_filter_value(const char *value, Master_info *mi) { bool status= true; - Rpl_filter* rpl_filter= mi->rpl_filter; + Rpl_filter* rpl_filter= (opt_id < OPT_PARALLEL_DO_DB) ? mi->rpl_filter : parallel_filter; /* Proctect against other threads */ mysql_mutex_lock(&LOCK_active_mi); @@ -5486,21 +5486,27 @@ bool Sys_var_rpl_filter::set_filter_value(const char *value, Master_info *mi) status= rpl_filter->set_rewrite_db(value); break; case OPT_REPLICATE_DO_DB: + case OPT_PARALLEL_DO_DB: status= rpl_filter->set_do_db(value); break; case OPT_REPLICATE_DO_TABLE: + case OPT_PARALLEL_DO_TABLE: status= rpl_filter->set_do_table(value); break; case OPT_REPLICATE_IGNORE_DB: + case OPT_PARALLEL_IGNORE_DB: status= rpl_filter->set_ignore_db(value); break; case OPT_REPLICATE_IGNORE_TABLE: + case OPT_PARALLEL_IGNORE_TABLE: status= rpl_filter->set_ignore_table(value); break; case OPT_REPLICATE_WILD_DO_TABLE: + case OPT_PARALLEL_WILD_DO_TABLE: status= rpl_filter->set_wild_do_table(value); break; case OPT_REPLICATE_WILD_IGNORE_TABLE: + case OPT_PARALLEL_WILD_IGNORE_TABLE: status= rpl_filter->set_wild_ignore_table(value); break; } @@ -5529,7 +5535,8 @@ Sys_var_rpl_filter::global_value_ptr(THD *thd, return 0; } - rpl_filter= mi->rpl_filter; + rpl_filter= (opt_id < OPT_PARALLEL_DO_DB) ? 
mi->rpl_filter : parallel_filter; + tmp.length(0); mysql_mutex_lock(&LOCK_active_mi); switch (opt_id) { @@ -5537,21 +5544,27 @@ Sys_var_rpl_filter::global_value_ptr(THD *thd, rpl_filter->get_rewrite_db(&tmp); break; case OPT_REPLICATE_DO_DB: + case OPT_PARALLEL_DO_DB: rpl_filter->get_do_db(&tmp); break; case OPT_REPLICATE_DO_TABLE: + case OPT_PARALLEL_DO_TABLE: rpl_filter->get_do_table(&tmp); break; case OPT_REPLICATE_IGNORE_DB: + case OPT_PARALLEL_IGNORE_DB: rpl_filter->get_ignore_db(&tmp); break; case OPT_REPLICATE_IGNORE_TABLE: + case OPT_PARALLEL_IGNORE_TABLE: rpl_filter->get_ignore_table(&tmp); break; case OPT_REPLICATE_WILD_DO_TABLE: + case OPT_PARALLEL_WILD_DO_TABLE: rpl_filter->get_wild_do_table(&tmp); break; case OPT_REPLICATE_WILD_IGNORE_TABLE: + case OPT_PARALLEL_WILD_IGNORE_TABLE: rpl_filter->get_wild_ignore_table(&tmp); break; } @@ -5618,6 +5631,52 @@ static Sys_var_rpl_filter Sys_replicate_wild_ignore_table( "match the given wildcard pattern.", PRIV_SET_SYSTEM_GLOBAL_VAR_REPLICATE_WILD_IGNORE_TABLE); +static Sys_var_rpl_filter Sys_parallel_do_db( + "parallel_do_db", OPT_PARALLEL_DO_DB, + "Tell the master to restrict parallel replication to databases " + "whose names appear in the comma-separated list. Other databases will be " + "in serial replication, no matter parallel_ignore_db is used or not."); + +static Sys_var_rpl_filter Sys_parallel_do_table( + "parallel_do_table", OPT_PARALLEL_DO_TABLE, + "Tells the master to restrict parallel replication to tables in the " + "comma-separated list."); + +static Sys_var_rpl_filter Sys_parallel_ignore_db( + "parallel_ignore_db", OPT_PARALLEL_IGNORE_DB, + "Tell the master to restrict parallel replication to databases " + "whose names do not appear in the comma-separated list. 
" + "If parallel_do_db is used this directive is ignored."); + +static Sys_var_rpl_filter Sys_parallel_ignore_table( + "parallel_ignore_table", OPT_PARALLEL_IGNORE_TABLE, + "Tell the master to restrict parallel replication to tables " + "whose names do not appear in the comma-separated list."); + +static Sys_var_rpl_filter Sys_parallel_wild_do_table( + "parallel_wild_do_table", OPT_PARALLEL_WILD_DO_TABLE, + "Tells the master to restrict parallel replication to statements " + "where all the updated tables match the specified database " + "and table name patterns."); + +static Sys_var_rpl_filter Sys_parallel_wild_ignore_table( + "parallel_wild_ignore_table", OPT_PARALLEL_WILD_IGNORE_TABLE, + "Tells the master to restrict parallel replication to statements " + "where none of the updated tables match the specified database " + "and table name patterns."); + +static Sys_var_mybool Sys_slave_ordered_thread( + "slave_ordered_thread", + "Per-domain dedicated thread on slave for processing ordered events", + GLOBAL_VAR(opt_slave_ordered_thread), + CMD_LINE(OPT_ARG), DEFAULT(FALSE)); + +static Sys_var_mybool Sys_slave_ordered_dont_wait( + "slave_ordered_dont_wait", + "Slave ordered events don't wait previous groups unless they have explicit WAIT flag", + GLOBAL_VAR(opt_slave_ordered_dont_wait), + CMD_LINE(OPT_ARG), DEFAULT(FALSE)); + static Sys_var_charptr_fscs Sys_slave_load_tmpdir( "slave_load_tmpdir", "The location where the slave should put " "its temporary files when replicating a LOAD DATA INFILE command", diff --git a/sql/sys_vars.inl b/sql/sys_vars.inl index 2c5acdcdc6b..0904ede1669 100644 --- a/sql/sys_vars.inl +++ b/sql/sys_vars.inl @@ -764,6 +764,9 @@ public: option.var_type|= GET_STR | GET_ASK_ADDR; } + Sys_var_rpl_filter(const char *name, int getopt_id, const char *comment) + : Sys_var_rpl_filter(name, getopt_id, comment, REPL_SLAVE_ADMIN_ACL) {} + bool do_check(THD *thd, set_var *var) override { return Sys_var_charptr::do_string_check(thd, var, charset(thd)); |