diff options
Diffstat (limited to 'sql')
-rw-r--r-- | sql/log_event.cc | 10 | ||||
-rw-r--r-- | sql/log_event.h | 4 | ||||
-rw-r--r-- | sql/mysqld.cc | 24 | ||||
-rw-r--r-- | sql/mysqld.h | 18 | ||||
-rw-r--r-- | sql/rpl_filter.cc | 2 | ||||
-rw-r--r-- | sql/rpl_filter.h | 7 | ||||
-rw-r--r-- | sql/rpl_mi.h | 11 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 15 | ||||
-rw-r--r-- | sql/share/errmsg-utf8.txt | 4 | ||||
-rw-r--r-- | sql/slave.cc | 21 | ||||
-rw-r--r-- | sql/sql_priv.h | 2 | ||||
-rw-r--r-- | sql/sys_vars.cc | 62 | ||||
-rw-r--r-- | sql/sys_vars.h | 9 |
13 files changed, 86 insertions, 103 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index 9bf9da8c9fa..4a7b1e221c9 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -6400,7 +6400,7 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg, flags2|= FL_DDL; else if (is_transactional) flags2|= FL_TRANSACTIONAL; - if (thd_arg->variables.option_bits & OPTION_RPL_ALLOW_PARALLEL) + if (!(thd_arg->variables.option_bits & OPTION_RPL_SKIP_PARALLEL)) flags2|= FL_ALLOW_PARALLEL; /* Preserve any DDL or WAITED flag in the slave's binlog. */ if (thd_arg->rgi_slave) @@ -6545,9 +6545,9 @@ Gtid_log_event::do_apply_event(rpl_group_info *rgi) /* Execute this like a BEGIN query event. */ bits|= OPTION_GTID_BEGIN; if (flags2 & FL_ALLOW_PARALLEL) - bits|= (ulonglong)OPTION_RPL_ALLOW_PARALLEL; + bits&= ~(ulonglong)OPTION_RPL_SKIP_PARALLEL; else - bits&= ~(ulonglong)OPTION_RPL_ALLOW_PARALLEL; + bits|= (ulonglong)OPTION_RPL_SKIP_PARALLEL; thd->variables.option_bits= bits; DBUG_PRINT("info", ("Set OPTION_GTID_BEGIN")); thd->set_query_and_id(gtid_begin_string, sizeof(gtid_begin_string)-1, @@ -6638,8 +6638,8 @@ Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info) print_event_info->allow_parallel != !!(flags2 & FL_ALLOW_PARALLEL)) { my_b_printf(&cache, - "/*!100101 SET @@session.replicate_allow_parallel=%u*/%s\n", - !!(flags2 & FL_ALLOW_PARALLEL), print_event_info->delimiter); + "/*!100101 SET @@session.skip_parallel_replication=%u*/%s\n", + !(flags2 & FL_ALLOW_PARALLEL), print_event_info->delimiter); print_event_info->allow_parallel= !!(flags2 & FL_ALLOW_PARALLEL); print_event_info->allow_parallel_printed= true; } diff --git a/sql/log_event.h b/sql/log_event.h index 1cc75e0bd9a..b57ef35aad2 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -3177,8 +3177,8 @@ public: */ static const uchar FL_TRANSACTIONAL= 4; /* - FL_ALLOW_PARALLEL reflects the value of @@SESSION.replicate_allow_parallel - at the time of commit. + FL_ALLOW_PARALLEL reflects the (negation of the) value of + @@SESSION.skip_parallel_replication at the time of commit. */ static const uchar FL_ALLOW_PARALLEL= 8; /* diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 423d07b02cd..80b61e39160 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -562,8 +562,7 @@ ulong stored_program_cache_size= 0; ulong opt_slave_parallel_threads= 0; ulong opt_slave_domain_parallel_threads= 0; -ulonglong opt_slave_parallel_mode= - SLAVE_PARALLEL_DOMAIN | SLAVE_PARALLEL_FOLLOW_MASTER_COMMIT; +ulong opt_slave_parallel_mode= SLAVE_PARALLEL_CONSERVATIVE; ulong opt_binlog_commit_wait_count= 0; ulong opt_binlog_commit_wait_usec= 0; ulong opt_slave_parallel_max_queued= 131072; @@ -7326,16 +7325,16 @@ struct my_option my_long_options[]= #ifdef HAVE_REPLICATION {"slave-parallel-mode", OPT_SLAVE_PARALLEL_MODE, "Controls what transactions are applied in parallel when using " - "--slave-parallel-threads. Syntax: slave_parallel_mode=value[,value...], " - "where \"value\" could be one or more of: \"domain\", to apply different " - "replication domains in parallel; \"follow_master_commit\", to apply " - "in parallel transactions that group-committed together on the master; " - "\"transactional\", to optimistically try to apply all transactional " - "DML in parallel; and \"waiting\" to extend \"transactional\" to " - "even transactions that had to wait on the master.", + "--slave-parallel-threads. Possible values: \"optimistic\" tries to " + "apply most transactional DML in parallel, and handles any conflicts " + "with rollback and retry. \"conservative\" limits parallelism in an " + "effort to avoid any conflicts. \"aggressive\" tries to maximise the " + "parallelism, possibly at the cost of increased conflict rate. " + "\"minimal\" only parallelizes the commit steps of transactions. " + "\"none\" disables parallel apply completely.", &opt_slave_parallel_mode, &opt_slave_parallel_mode, - &slave_parallel_mode_typelib, GET_SET | GET_ASK_ADDR, REQUIRED_ARG, - SLAVE_PARALLEL_DOMAIN | SLAVE_PARALLEL_FOLLOW_MASTER_COMMIT, 0, 0, 0, 0, 0}, + &slave_parallel_mode_typelib, GET_ENUM | GET_ASK_ADDR, REQUIRED_ARG, + SLAVE_PARALLEL_CONSERVATIVE, 0, 0, 0, 0, 0}, #endif #if defined(_WIN32) && !defined(EMBEDDED_LIBRARY) {"slow-start-timeout", 0, @@ -8847,7 +8846,8 @@ mysqld_get_one_option(int optid, const struct my_option *opt, char *argument) case (int)OPT_SLAVE_PARALLEL_MODE: { /* Store latest mode for Master::Info */ - cur_rpl_filter->set_parallel_mode(opt_slave_parallel_mode); + cur_rpl_filter->set_parallel_mode + ((enum_slave_parallel_mode)opt_slave_parallel_mode); break; } diff --git a/sql/mysqld.h b/sql/mysqld.h index f89f2e99188..6a1ad65bd67 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -61,14 +61,16 @@ typedef Bitmap<((MAX_INDEXES+7)/8*8)> key_map; /* Used for finding keys */ #define OPT_GLOBAL SHOW_OPT_GLOBAL /* - Bit masks for the values in --slave-parallel-mode. - Note that these values cannot be changed - they are stored in master.info, - so need to be possible to read back in a different version of the server. + Values for --slave-parallel-mode + Must match order in slave_parallel_mode_typelib in sys_vars.cc. */ -#define SLAVE_PARALLEL_DOMAIN (1ULL << 0) -#define SLAVE_PARALLEL_FOLLOW_MASTER_COMMIT (1ULL << 1) -#define SLAVE_PARALLEL_TRX (1ULL << 2) -#define SLAVE_PARALLEL_WAITING (1ULL << 3) +enum enum_slave_parallel_mode { + SLAVE_PARALLEL_NONE, + SLAVE_PARALLEL_MINIMAL, + SLAVE_PARALLEL_CONSERVATIVE, + SLAVE_PARALLEL_OPTIMISTIC, + SLAVE_PARALLEL_AGGRESSIVE +}; /* Function prototypes */ void kill_mysql(void); @@ -201,7 +203,7 @@ extern ulong stored_program_cache_size; extern ulong opt_slave_parallel_threads; extern ulong opt_slave_domain_parallel_threads; extern ulong opt_slave_parallel_max_queued; -extern ulonglong opt_slave_parallel_mode; +extern ulong opt_slave_parallel_mode; extern ulong opt_binlog_commit_wait_count; extern ulong opt_binlog_commit_wait_usec; extern my_bool opt_gtid_ignore_duplicates; diff --git a/sql/rpl_filter.cc b/sql/rpl_filter.cc index e3ebd329cc4..f1c6d76f7ff 100644 --- a/sql/rpl_filter.cc +++ b/sql/rpl_filter.cc @@ -24,7 +24,7 @@ #define TABLE_RULE_ARR_SIZE 16 Rpl_filter::Rpl_filter() : - parallel_mode(SLAVE_PARALLEL_DOMAIN | SLAVE_PARALLEL_FOLLOW_MASTER_COMMIT), + parallel_mode(SLAVE_PARALLEL_CONSERVATIVE), table_rules_on(0), do_table_inited(0), ignore_table_inited(0), wild_do_table_inited(0), wild_ignore_table_inited(0) diff --git a/sql/rpl_filter.h b/sql/rpl_filter.h index f08971a9ac1..f24ece30a80 100644 --- a/sql/rpl_filter.h +++ b/sql/rpl_filter.h @@ -17,6 +17,7 @@ #define RPL_FILTER_H #include "mysql.h" +#include "mysqld.h" #include "sql_list.h" /* I_List */ #include "hash.h" /* HASH */ @@ -76,12 +77,12 @@ public: int set_do_db(const char* db_spec); int set_ignore_db(const char* db_spec); - void set_parallel_mode(ulonglong mode) + void set_parallel_mode(enum_slave_parallel_mode mode) { parallel_mode= mode; } /* Return given parallel mode or if one is not given, the default mode */ - int get_parallel_mode() + enum_slave_parallel_mode get_parallel_mode() { return parallel_mode; } @@ -137,7 +138,7 @@ private: HASH ignore_table; DYNAMIC_ARRAY wild_do_table; DYNAMIC_ARRAY wild_ignore_table; - ulonglong parallel_mode; + enum_slave_parallel_mode parallel_mode; bool table_rules_on; bool do_table_inited; diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h index 87ac0a5a46b..b74b9cb42b3 100644 --- a/sql/rpl_mi.h +++ b/sql/rpl_mi.h @@ -184,7 +184,8 @@ class Master_info : public Slave_reporting_capability static const char *using_gtid_astext(enum enum_using_gtid arg); bool using_parallel() { - return opt_slave_parallel_threads > 0 && parallel_mode != 0; + return opt_slave_parallel_threads > 0 && + parallel_mode > SLAVE_PARALLEL_NONE; } /* the variables below are needed because we can change masters on the fly */ @@ -300,12 +301,8 @@ class Master_info : public Slave_reporting_capability /* domain-id based filter */ Domain_id_filter domain_id_filter; - /* - The parallel replication modes, if any. A combination (binary OR) of any - of SLAVE_PARALLEL_DOMAIN, SLAVE_PARALLEL_FOLLOW_MASTER_COMMIT, - SLAVE_PARALLEL_TRX, and SLAVE_PARALLEL_WAITING. - */ - ulonglong parallel_mode; + /* The parallel replication mode. */ + enum_slave_parallel_mode parallel_mode; }; int init_master_info(Master_info* mi, const char* master_info_fname, diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index e37a82720b5..609f50952c0 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -2013,7 +2013,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, { Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO || - !(rli->mi->parallel_mode & SLAVE_PARALLEL_DOMAIN) ? + rli->mi->parallel_mode <= SLAVE_PARALLEL_MINIMAL ? 0 : gtid_ev->domain_id); if (!(e= find(domain_id))) { @@ -2054,7 +2054,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, { Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); bool new_gco; - ulonglong mode= rli->mi->parallel_mode; + enum_slave_parallel_mode mode= rli->mi->parallel_mode; uchar gtid_flags= gtid_ev->flags2; group_commit_orderer *gco; uint8 force_switch_flag; @@ -2093,7 +2093,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, { uint8 flags= gco->flags; - if (!(gtid_flags & Gtid_log_event::FL_GROUP_COMMIT_ID) || + if (mode <= SLAVE_PARALLEL_MINIMAL || + !(gtid_flags & Gtid_log_event::FL_GROUP_COMMIT_ID) || e->last_commit_id != gtid_ev->commit_id) flags|= group_commit_orderer::MULTI_BATCH; /* Make sure we do not attempt to run DDL in parallel speculatively. */ @@ -2108,7 +2109,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, */ new_gco= false; } - else if ((mode & SLAVE_PARALLEL_TRX) && + else if ((mode >= SLAVE_PARALLEL_OPTIMISTIC) && !(flags & group_commit_orderer::FORCE_SWITCH)) { /* @@ -2124,9 +2125,9 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, */ new_gco= false; if (!(gtid_flags & Gtid_log_event::FL_TRANSACTIONAL) || - !(gtid_flags & Gtid_log_event::FL_ALLOW_PARALLEL) || - ((gtid_flags & Gtid_log_event::FL_WAITED) && - !(mode & SLAVE_PARALLEL_WAITING))) + ( (!(gtid_flags & Gtid_log_event::FL_ALLOW_PARALLEL) || + (gtid_flags & Gtid_log_event::FL_WAITED)) && + (mode < SLAVE_PARALLEL_AGGRESSIVE))) { /* This transaction should not be speculatively run in parallel with diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt index a5d3ba19b2f..381370e9d4e 100644 --- a/sql/share/errmsg-utf8.txt +++ b/sql/share/errmsg-utf8.txt @@ -7115,8 +7115,8 @@ ER_SUBQUERIES_NOT_SUPPORTED 42000 eng "%s does not support subqueries or stored functions." ER_SET_STATEMENT_NOT_SUPPORTED 42000 eng "The system variable %.200s cannot be set in SET STATEMENT." -ER_INVALID_SLAVE_PARALLEL_MODE - eng "Invalid use of '%s' option for slave_parallel_mode" +ER_UNUSED_17 + eng "You should never see it" ER_USER_CREATE_EXISTS eng "Can't create user '%-.64s'@'%-.64s'; it already exists" ER_USER_DROP_EXISTS diff --git a/sql/slave.cc b/sql/slave.cc index f193659b170..6d64534faf9 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -2585,7 +2585,7 @@ static bool send_show_master_info_header(THD *thd, bool full, field_list.push_back(new Item_empty_string("Replicate_Ignore_Domain_Ids", FN_REFLEN)); field_list.push_back(new Item_empty_string("Parallel_Mode", - sizeof("domain,follow_master_commit,transactional,waiting")-1)); + sizeof("conservative")-1)); if (full) { field_list.push_back(new Item_return_int("Retried_transactions", @@ -2788,22 +2788,9 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, // Parallel_Mode { - /* Note how sizeof("domain") has room for "domain," due to traling 0. */ - char buf[sizeof("domain") + sizeof("follow_master_commit") + - sizeof("transactional") + sizeof("waiting") + 1]; - char *p= buf; - uint32 mode= mi->parallel_mode; - if (mode & SLAVE_PARALLEL_DOMAIN) - p= strmov(p, "domain,"); - if (mode & SLAVE_PARALLEL_FOLLOW_MASTER_COMMIT) - p= strmov(p, "follow_master_commit,"); - if (mode & SLAVE_PARALLEL_TRX) - p= strmov(p, "transactional,"); - if (mode & SLAVE_PARALLEL_WAITING) - p= strmov(p, "waiting,"); - if (p != buf) - --p; // Discard last ',' - protocol->store(buf, p-buf, &my_charset_bin); + const char *mode_name= get_type(&slave_parallel_mode_typelib, + mi->parallel_mode); + protocol->store(mode_name, strlen(mode_name), &my_charset_bin); } if (full) diff --git a/sql/sql_priv.h b/sql/sql_priv.h index 1389065873c..7ba027d2380 100644 --- a/sql/sql_priv.h +++ b/sql/sql_priv.h @@ -181,7 +181,7 @@ */ #define OPTION_ALLOW_BATCH (1ULL << 36) // THD, intern (slave) #define OPTION_SKIP_REPLICATION (1ULL << 37) // THD, user -#define OPTION_RPL_ALLOW_PARALLEL (1ULL << 38) +#define OPTION_RPL_SKIP_PARALLEL (1ULL << 38) /* The rest of the file is included in the server only */ #ifndef MYSQL_CLIENT diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 26d4d7ae04b..b535cec7b48 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -1874,18 +1874,12 @@ static Sys_var_ulong Sys_slave_parallel_max_queued( bool Sys_var_slave_parallel_mode::global_update(THD *thd, set_var *var) { - ulonglong new_value= var->save_result.ulonglong_value; + enum_slave_parallel_mode new_value= + (enum_slave_parallel_mode)var->save_result.ulonglong_value; LEX_STRING *base_name= &var->base; Master_info *mi; bool res= false; - if ((new_value & (SLAVE_PARALLEL_FOLLOW_MASTER_COMMIT|SLAVE_PARALLEL_TRX)) == - (SLAVE_PARALLEL_FOLLOW_MASTER_COMMIT|SLAVE_PARALLEL_TRX)) - { - my_error(ER_INVALID_SLAVE_PARALLEL_MODE, MYF(0), "transactional"); - return true; - } - if (!base_name->length) base_name= &thd->variables.default_master_connection; @@ -1928,7 +1922,8 @@ Sys_var_slave_parallel_mode::global_value_ptr(THD *thd, const LEX_STRING *base_name) { Master_info *mi; - ulonglong val= opt_slave_parallel_mode; + enum_slave_parallel_mode val= + (enum_slave_parallel_mode)opt_slave_parallel_mode; if (!base_name->length) base_name= &thd->variables.default_master_connection; @@ -1947,13 +1942,14 @@ Sys_var_slave_parallel_mode::global_value_ptr(THD *thd, mysql_mutex_lock(&LOCK_global_system_variables); if (!mi) return 0; - - return (uchar*)set_to_string(thd, 0, val, typelib.type_names); + + return valptr(thd, val); } +/* The order here must match enum_slave_parallel_mode in mysqld.h. */ static const char *slave_parallel_mode_names[] = { - "domain", "follow_master_commit", "transactional", "waiting", NULL + "none", "minimal", "conservative", "optimistic", "aggressive", NULL }; export TYPELIB slave_parallel_mode_typelib = { array_elements(slave_parallel_mode_names)-1, @@ -1965,28 +1961,26 @@ export TYPELIB slave_parallel_mode_typelib = { static Sys_var_slave_parallel_mode Sys_slave_parallel_mode( "slave_parallel_mode", "Controls what transactions are applied in parallel when using " - "--slave-parallel-threads. Syntax: slave_parallel_mode=value[,value...], " - "where \"value\" could be one or more of: \"domain\", to apply different " - "replication domains in parallel; \"follow_master_commit\", to apply " - "in parallel transactions that group-committed together on the master; " - "\"transactional\", to optimistically try to apply all transactional " - "DML in parallel; and \"waiting\" to extend \"transactional\" to " - "even transactions that had to wait on the master.", - GLOBAL_VAR(opt_slave_parallel_mode), - NO_CMD_LINE, slave_parallel_mode_names, - DEFAULT(SLAVE_PARALLEL_DOMAIN | - SLAVE_PARALLEL_FOLLOW_MASTER_COMMIT)); - - -static Sys_var_bit Sys_replicate_allow_parallel( - "replicate_allow_parallel", - "If set when a transaction is written to the binlog, that transaction " - "is allowed to replicate in parallel on a slave where " - "slave_parallel_mode is set to \"transactional\". Can be cleared for " - "transactions that are likely to cause a conflict if replicated in " - "parallel, to avoid unnecessary rollback and retry.", - SESSION_ONLY(option_bits), NO_CMD_LINE, OPTION_RPL_ALLOW_PARALLEL, - DEFAULT(TRUE), NO_MUTEX_GUARD, NOT_IN_BINLOG); + "--slave-parallel-threads. Possible values: \"optimistic\" tries to " + "apply most transactional DML in parallel, and handles any conflicts " + "with rollback and retry. \"conservative\" limits parallelism in an " + "effort to avoid any conflicts. \"aggressive\" tries to maximise the " + "parallelism, possibly at the cost of increased conflict rate. " + "\"minimal\" only parallelizes the commit steps of transactions. " + "\"none\" disables parallel apply completely.", + GLOBAL_VAR(opt_slave_parallel_mode), NO_CMD_LINE, + slave_parallel_mode_names, DEFAULT(SLAVE_PARALLEL_CONSERVATIVE)); + + +static Sys_var_bit Sys_skip_parallel_replication( + "skip_parallel_replication", + "If set when a transaction is written to the binlog, parallel apply of " + "that transaction will be avoided on a slave where slave_parallel_mode " + "is not \"aggressive\". Can be used to avoid unnecessary rollback and " + "retry for transactions that are likely to cause a conflict if " + "replicated in parallel.", + SESSION_ONLY(option_bits), NO_CMD_LINE, OPTION_RPL_SKIP_PARALLEL, + DEFAULT(FALSE), NO_MUTEX_GUARD, NOT_IN_BINLOG); static bool diff --git a/sql/sys_vars.h b/sql/sys_vars.h index a19102be6ec..61af931c189 100644 --- a/sql/sys_vars.h +++ b/sql/sys_vars.h @@ -2312,14 +2312,15 @@ public: /** Class for connection_name.slave_parallel_mode. */ -class Sys_var_slave_parallel_mode: public Sys_var_set +class Sys_var_slave_parallel_mode: public Sys_var_enum { public: Sys_var_slave_parallel_mode(const char *name_arg, const char *comment, int flag_args, ptrdiff_t off, size_t size, - CMD_LINE getopt, const char *values[], ulonglong def_val) - : Sys_var_set(name_arg, comment, flag_args, off, size, - getopt, values, def_val) + CMD_LINE getopt, const char *values[], + enum_slave_parallel_mode def_val) + : Sys_var_enum(name_arg, comment, flag_args, off, size, + getopt, values, def_val) { option.var_type|= GET_ASK_ADDR; option.value= (uchar**)1; // crash me, please |