summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/log_event.cc10
-rw-r--r--sql/log_event.h4
-rw-r--r--sql/mysqld.cc24
-rw-r--r--sql/mysqld.h18
-rw-r--r--sql/rpl_filter.cc2
-rw-r--r--sql/rpl_filter.h7
-rw-r--r--sql/rpl_mi.h11
-rw-r--r--sql/rpl_parallel.cc15
-rw-r--r--sql/share/errmsg-utf8.txt4
-rw-r--r--sql/slave.cc21
-rw-r--r--sql/sql_priv.h2
-rw-r--r--sql/sys_vars.cc62
-rw-r--r--sql/sys_vars.h9
13 files changed, 86 insertions, 103 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 9bf9da8c9fa..4a7b1e221c9 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -6400,7 +6400,7 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg,
flags2|= FL_DDL;
else if (is_transactional)
flags2|= FL_TRANSACTIONAL;
- if (thd_arg->variables.option_bits & OPTION_RPL_ALLOW_PARALLEL)
+ if (!(thd_arg->variables.option_bits & OPTION_RPL_SKIP_PARALLEL))
flags2|= FL_ALLOW_PARALLEL;
/* Preserve any DDL or WAITED flag in the slave's binlog. */
if (thd_arg->rgi_slave)
@@ -6545,9 +6545,9 @@ Gtid_log_event::do_apply_event(rpl_group_info *rgi)
/* Execute this like a BEGIN query event. */
bits|= OPTION_GTID_BEGIN;
if (flags2 & FL_ALLOW_PARALLEL)
- bits|= (ulonglong)OPTION_RPL_ALLOW_PARALLEL;
+ bits&= ~(ulonglong)OPTION_RPL_SKIP_PARALLEL;
else
- bits&= ~(ulonglong)OPTION_RPL_ALLOW_PARALLEL;
+ bits|= (ulonglong)OPTION_RPL_SKIP_PARALLEL;
thd->variables.option_bits= bits;
DBUG_PRINT("info", ("Set OPTION_GTID_BEGIN"));
thd->set_query_and_id(gtid_begin_string, sizeof(gtid_begin_string)-1,
@@ -6638,8 +6638,8 @@ Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
print_event_info->allow_parallel != !!(flags2 & FL_ALLOW_PARALLEL))
{
my_b_printf(&cache,
- "/*!100101 SET @@session.replicate_allow_parallel=%u*/%s\n",
- !!(flags2 & FL_ALLOW_PARALLEL), print_event_info->delimiter);
+ "/*!100101 SET @@session.skip_parallel_replication=%u*/%s\n",
+ !(flags2 & FL_ALLOW_PARALLEL), print_event_info->delimiter);
print_event_info->allow_parallel= !!(flags2 & FL_ALLOW_PARALLEL);
print_event_info->allow_parallel_printed= true;
}
diff --git a/sql/log_event.h b/sql/log_event.h
index 1cc75e0bd9a..b57ef35aad2 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -3177,8 +3177,8 @@ public:
*/
static const uchar FL_TRANSACTIONAL= 4;
/*
- FL_ALLOW_PARALLEL reflects the value of @@SESSION.replicate_allow_parallel
- at the time of commit.
+ FL_ALLOW_PARALLEL reflects the (negation of the) value of
+ @@SESSION.skip_parallel_replication at the time of commit.
*/
static const uchar FL_ALLOW_PARALLEL= 8;
/*
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 423d07b02cd..80b61e39160 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -562,8 +562,7 @@ ulong stored_program_cache_size= 0;
ulong opt_slave_parallel_threads= 0;
ulong opt_slave_domain_parallel_threads= 0;
-ulonglong opt_slave_parallel_mode=
- SLAVE_PARALLEL_DOMAIN | SLAVE_PARALLEL_FOLLOW_MASTER_COMMIT;
+ulong opt_slave_parallel_mode= SLAVE_PARALLEL_CONSERVATIVE;
ulong opt_binlog_commit_wait_count= 0;
ulong opt_binlog_commit_wait_usec= 0;
ulong opt_slave_parallel_max_queued= 131072;
@@ -7326,16 +7325,16 @@ struct my_option my_long_options[]=
#ifdef HAVE_REPLICATION
{"slave-parallel-mode", OPT_SLAVE_PARALLEL_MODE,
"Controls what transactions are applied in parallel when using "
- "--slave-parallel-threads. Syntax: slave_parallel_mode=value[,value...], "
- "where \"value\" could be one or more of: \"domain\", to apply different "
- "replication domains in parallel; \"follow_master_commit\", to apply "
- "in parallel transactions that group-committed together on the master; "
- "\"transactional\", to optimistically try to apply all transactional "
- "DML in parallel; and \"waiting\" to extend \"transactional\" to "
- "even transactions that had to wait on the master.",
+ "--slave-parallel-threads. Possible values: \"optimistic\" tries to "
+ "apply most transactional DML in parallel, and handles any conflicts "
+ "with rollback and retry. \"conservative\" limits parallelism in an "
+ "effort to avoid any conflicts. \"aggressive\" tries to maximise the "
+ "parallelism, possibly at the cost of increased conflict rate. "
+ "\"minimal\" only parallelizes the commit steps of transactions. "
+ "\"none\" disables parallel apply completely.",
&opt_slave_parallel_mode, &opt_slave_parallel_mode,
- &slave_parallel_mode_typelib, GET_SET | GET_ASK_ADDR, REQUIRED_ARG,
- SLAVE_PARALLEL_DOMAIN | SLAVE_PARALLEL_FOLLOW_MASTER_COMMIT, 0, 0, 0, 0, 0},
+ &slave_parallel_mode_typelib, GET_ENUM | GET_ASK_ADDR, REQUIRED_ARG,
+ SLAVE_PARALLEL_CONSERVATIVE, 0, 0, 0, 0, 0},
#endif
#if defined(_WIN32) && !defined(EMBEDDED_LIBRARY)
{"slow-start-timeout", 0,
@@ -8847,7 +8846,8 @@ mysqld_get_one_option(int optid, const struct my_option *opt, char *argument)
case (int)OPT_SLAVE_PARALLEL_MODE:
{
/* Store latest mode for Master::Info */
- cur_rpl_filter->set_parallel_mode(opt_slave_parallel_mode);
+ cur_rpl_filter->set_parallel_mode
+ ((enum_slave_parallel_mode)opt_slave_parallel_mode);
break;
}
diff --git a/sql/mysqld.h b/sql/mysqld.h
index f89f2e99188..6a1ad65bd67 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -61,14 +61,16 @@ typedef Bitmap<((MAX_INDEXES+7)/8*8)> key_map; /* Used for finding keys */
#define OPT_GLOBAL SHOW_OPT_GLOBAL
/*
- Bit masks for the values in --slave-parallel-mode.
- Note that these values cannot be changed - they are stored in master.info,
- so need to be possible to read back in a different version of the server.
+ Values for --slave-parallel-mode
+ Must match order in slave_parallel_mode_typelib in sys_vars.cc.
*/
-#define SLAVE_PARALLEL_DOMAIN (1ULL << 0)
-#define SLAVE_PARALLEL_FOLLOW_MASTER_COMMIT (1ULL << 1)
-#define SLAVE_PARALLEL_TRX (1ULL << 2)
-#define SLAVE_PARALLEL_WAITING (1ULL << 3)
+enum enum_slave_parallel_mode {
+ SLAVE_PARALLEL_NONE,
+ SLAVE_PARALLEL_MINIMAL,
+ SLAVE_PARALLEL_CONSERVATIVE,
+ SLAVE_PARALLEL_OPTIMISTIC,
+ SLAVE_PARALLEL_AGGRESSIVE
+};
/* Function prototypes */
void kill_mysql(void);
@@ -201,7 +203,7 @@ extern ulong stored_program_cache_size;
extern ulong opt_slave_parallel_threads;
extern ulong opt_slave_domain_parallel_threads;
extern ulong opt_slave_parallel_max_queued;
-extern ulonglong opt_slave_parallel_mode;
+extern ulong opt_slave_parallel_mode;
extern ulong opt_binlog_commit_wait_count;
extern ulong opt_binlog_commit_wait_usec;
extern my_bool opt_gtid_ignore_duplicates;
diff --git a/sql/rpl_filter.cc b/sql/rpl_filter.cc
index e3ebd329cc4..f1c6d76f7ff 100644
--- a/sql/rpl_filter.cc
+++ b/sql/rpl_filter.cc
@@ -24,7 +24,7 @@
#define TABLE_RULE_ARR_SIZE 16
Rpl_filter::Rpl_filter() :
- parallel_mode(SLAVE_PARALLEL_DOMAIN | SLAVE_PARALLEL_FOLLOW_MASTER_COMMIT),
+ parallel_mode(SLAVE_PARALLEL_CONSERVATIVE),
table_rules_on(0),
do_table_inited(0), ignore_table_inited(0),
wild_do_table_inited(0), wild_ignore_table_inited(0)
diff --git a/sql/rpl_filter.h b/sql/rpl_filter.h
index f08971a9ac1..f24ece30a80 100644
--- a/sql/rpl_filter.h
+++ b/sql/rpl_filter.h
@@ -17,6 +17,7 @@
#define RPL_FILTER_H
#include "mysql.h"
+#include "mysqld.h"
#include "sql_list.h" /* I_List */
#include "hash.h" /* HASH */
@@ -76,12 +77,12 @@ public:
int set_do_db(const char* db_spec);
int set_ignore_db(const char* db_spec);
- void set_parallel_mode(ulonglong mode)
+ void set_parallel_mode(enum_slave_parallel_mode mode)
{
parallel_mode= mode;
}
/* Return given parallel mode or if one is not given, the default mode */
- int get_parallel_mode()
+ enum_slave_parallel_mode get_parallel_mode()
{
return parallel_mode;
}
@@ -137,7 +138,7 @@ private:
HASH ignore_table;
DYNAMIC_ARRAY wild_do_table;
DYNAMIC_ARRAY wild_ignore_table;
- ulonglong parallel_mode;
+ enum_slave_parallel_mode parallel_mode;
bool table_rules_on;
bool do_table_inited;
diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h
index 87ac0a5a46b..b74b9cb42b3 100644
--- a/sql/rpl_mi.h
+++ b/sql/rpl_mi.h
@@ -184,7 +184,8 @@ class Master_info : public Slave_reporting_capability
static const char *using_gtid_astext(enum enum_using_gtid arg);
bool using_parallel()
{
- return opt_slave_parallel_threads > 0 && parallel_mode != 0;
+ return opt_slave_parallel_threads > 0 &&
+ parallel_mode > SLAVE_PARALLEL_NONE;
}
/* the variables below are needed because we can change masters on the fly */
@@ -300,12 +301,8 @@ class Master_info : public Slave_reporting_capability
/* domain-id based filter */
Domain_id_filter domain_id_filter;
- /*
- The parallel replication modes, if any. A combination (binary OR) of any
- of SLAVE_PARALLEL_DOMAIN, SLAVE_PARALLEL_FOLLOW_MASTER_COMMIT,
- SLAVE_PARALLEL_TRX, and SLAVE_PARALLEL_WAITING.
- */
- ulonglong parallel_mode;
+ /* The parallel replication mode. */
+ enum_slave_parallel_mode parallel_mode;
};
int init_master_info(Master_info* mi, const char* master_info_fname,
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index e37a82720b5..609f50952c0 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -2013,7 +2013,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
{
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ||
- !(rli->mi->parallel_mode & SLAVE_PARALLEL_DOMAIN) ?
+ rli->mi->parallel_mode <= SLAVE_PARALLEL_MINIMAL ?
0 : gtid_ev->domain_id);
if (!(e= find(domain_id)))
{
@@ -2054,7 +2054,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
{
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
bool new_gco;
- ulonglong mode= rli->mi->parallel_mode;
+ enum_slave_parallel_mode mode= rli->mi->parallel_mode;
uchar gtid_flags= gtid_ev->flags2;
group_commit_orderer *gco;
uint8 force_switch_flag;
@@ -2093,7 +2093,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
{
uint8 flags= gco->flags;
- if (!(gtid_flags & Gtid_log_event::FL_GROUP_COMMIT_ID) ||
+ if (mode <= SLAVE_PARALLEL_MINIMAL ||
+ !(gtid_flags & Gtid_log_event::FL_GROUP_COMMIT_ID) ||
e->last_commit_id != gtid_ev->commit_id)
flags|= group_commit_orderer::MULTI_BATCH;
/* Make sure we do not attempt to run DDL in parallel speculatively. */
@@ -2108,7 +2109,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
*/
new_gco= false;
}
- else if ((mode & SLAVE_PARALLEL_TRX) &&
+ else if ((mode >= SLAVE_PARALLEL_OPTIMISTIC) &&
!(flags & group_commit_orderer::FORCE_SWITCH))
{
/*
@@ -2124,9 +2125,9 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
*/
new_gco= false;
if (!(gtid_flags & Gtid_log_event::FL_TRANSACTIONAL) ||
- !(gtid_flags & Gtid_log_event::FL_ALLOW_PARALLEL) ||
- ((gtid_flags & Gtid_log_event::FL_WAITED) &&
- !(mode & SLAVE_PARALLEL_WAITING)))
+ ( (!(gtid_flags & Gtid_log_event::FL_ALLOW_PARALLEL) ||
+ (gtid_flags & Gtid_log_event::FL_WAITED)) &&
+ (mode < SLAVE_PARALLEL_AGGRESSIVE)))
{
/*
This transaction should not be speculatively run in parallel with
diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt
index a5d3ba19b2f..381370e9d4e 100644
--- a/sql/share/errmsg-utf8.txt
+++ b/sql/share/errmsg-utf8.txt
@@ -7115,8 +7115,8 @@ ER_SUBQUERIES_NOT_SUPPORTED 42000
eng "%s does not support subqueries or stored functions."
ER_SET_STATEMENT_NOT_SUPPORTED 42000
eng "The system variable %.200s cannot be set in SET STATEMENT."
-ER_INVALID_SLAVE_PARALLEL_MODE
- eng "Invalid use of '%s' option for slave_parallel_mode"
+ER_UNUSED_17
+ eng "You should never see it"
ER_USER_CREATE_EXISTS
eng "Can't create user '%-.64s'@'%-.64s'; it already exists"
ER_USER_DROP_EXISTS
diff --git a/sql/slave.cc b/sql/slave.cc
index f193659b170..6d64534faf9 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -2585,7 +2585,7 @@ static bool send_show_master_info_header(THD *thd, bool full,
field_list.push_back(new Item_empty_string("Replicate_Ignore_Domain_Ids",
FN_REFLEN));
field_list.push_back(new Item_empty_string("Parallel_Mode",
- sizeof("domain,follow_master_commit,transactional,waiting")-1));
+ sizeof("conservative")-1));
if (full)
{
field_list.push_back(new Item_return_int("Retried_transactions",
@@ -2788,22 +2788,9 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
// Parallel_Mode
{
- /* Note how sizeof("domain") has room for "domain," due to traling 0. */
- char buf[sizeof("domain") + sizeof("follow_master_commit") +
- sizeof("transactional") + sizeof("waiting") + 1];
- char *p= buf;
- uint32 mode= mi->parallel_mode;
- if (mode & SLAVE_PARALLEL_DOMAIN)
- p= strmov(p, "domain,");
- if (mode & SLAVE_PARALLEL_FOLLOW_MASTER_COMMIT)
- p= strmov(p, "follow_master_commit,");
- if (mode & SLAVE_PARALLEL_TRX)
- p= strmov(p, "transactional,");
- if (mode & SLAVE_PARALLEL_WAITING)
- p= strmov(p, "waiting,");
- if (p != buf)
- --p; // Discard last ','
- protocol->store(buf, p-buf, &my_charset_bin);
+ const char *mode_name= get_type(&slave_parallel_mode_typelib,
+ mi->parallel_mode);
+ protocol->store(mode_name, strlen(mode_name), &my_charset_bin);
}
if (full)
diff --git a/sql/sql_priv.h b/sql/sql_priv.h
index 1389065873c..7ba027d2380 100644
--- a/sql/sql_priv.h
+++ b/sql/sql_priv.h
@@ -181,7 +181,7 @@
*/
#define OPTION_ALLOW_BATCH (1ULL << 36) // THD, intern (slave)
#define OPTION_SKIP_REPLICATION (1ULL << 37) // THD, user
-#define OPTION_RPL_ALLOW_PARALLEL (1ULL << 38)
+#define OPTION_RPL_SKIP_PARALLEL (1ULL << 38)
/* The rest of the file is included in the server only */
#ifndef MYSQL_CLIENT
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index 26d4d7ae04b..b535cec7b48 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -1874,18 +1874,12 @@ static Sys_var_ulong Sys_slave_parallel_max_queued(
bool
Sys_var_slave_parallel_mode::global_update(THD *thd, set_var *var)
{
- ulonglong new_value= var->save_result.ulonglong_value;
+ enum_slave_parallel_mode new_value=
+ (enum_slave_parallel_mode)var->save_result.ulonglong_value;
LEX_STRING *base_name= &var->base;
Master_info *mi;
bool res= false;
- if ((new_value & (SLAVE_PARALLEL_FOLLOW_MASTER_COMMIT|SLAVE_PARALLEL_TRX)) ==
- (SLAVE_PARALLEL_FOLLOW_MASTER_COMMIT|SLAVE_PARALLEL_TRX))
- {
- my_error(ER_INVALID_SLAVE_PARALLEL_MODE, MYF(0), "transactional");
- return true;
- }
-
if (!base_name->length)
base_name= &thd->variables.default_master_connection;
@@ -1928,7 +1922,8 @@ Sys_var_slave_parallel_mode::global_value_ptr(THD *thd,
const LEX_STRING *base_name)
{
Master_info *mi;
- ulonglong val= opt_slave_parallel_mode;
+ enum_slave_parallel_mode val=
+ (enum_slave_parallel_mode)opt_slave_parallel_mode;
if (!base_name->length)
base_name= &thd->variables.default_master_connection;
@@ -1947,13 +1942,14 @@ Sys_var_slave_parallel_mode::global_value_ptr(THD *thd,
mysql_mutex_lock(&LOCK_global_system_variables);
if (!mi)
return 0;
-
- return (uchar*)set_to_string(thd, 0, val, typelib.type_names);
+
+ return valptr(thd, val);
}
+/* The order here must match enum_slave_parallel_mode in mysqld.h. */
static const char *slave_parallel_mode_names[] = {
- "domain", "follow_master_commit", "transactional", "waiting", NULL
+ "none", "minimal", "conservative", "optimistic", "aggressive", NULL
};
export TYPELIB slave_parallel_mode_typelib = {
array_elements(slave_parallel_mode_names)-1,
@@ -1965,28 +1961,26 @@ export TYPELIB slave_parallel_mode_typelib = {
static Sys_var_slave_parallel_mode Sys_slave_parallel_mode(
"slave_parallel_mode",
"Controls what transactions are applied in parallel when using "
- "--slave-parallel-threads. Syntax: slave_parallel_mode=value[,value...], "
- "where \"value\" could be one or more of: \"domain\", to apply different "
- "replication domains in parallel; \"follow_master_commit\", to apply "
- "in parallel transactions that group-committed together on the master; "
- "\"transactional\", to optimistically try to apply all transactional "
- "DML in parallel; and \"waiting\" to extend \"transactional\" to "
- "even transactions that had to wait on the master.",
- GLOBAL_VAR(opt_slave_parallel_mode),
- NO_CMD_LINE, slave_parallel_mode_names,
- DEFAULT(SLAVE_PARALLEL_DOMAIN |
- SLAVE_PARALLEL_FOLLOW_MASTER_COMMIT));
-
-
-static Sys_var_bit Sys_replicate_allow_parallel(
- "replicate_allow_parallel",
- "If set when a transaction is written to the binlog, that transaction "
- "is allowed to replicate in parallel on a slave where "
- "slave_parallel_mode is set to \"transactional\". Can be cleared for "
- "transactions that are likely to cause a conflict if replicated in "
- "parallel, to avoid unnecessary rollback and retry.",
- SESSION_ONLY(option_bits), NO_CMD_LINE, OPTION_RPL_ALLOW_PARALLEL,
- DEFAULT(TRUE), NO_MUTEX_GUARD, NOT_IN_BINLOG);
+ "--slave-parallel-threads. Possible values: \"optimistic\" tries to "
+ "apply most transactional DML in parallel, and handles any conflicts "
+ "with rollback and retry. \"conservative\" limits parallelism in an "
+ "effort to avoid any conflicts. \"aggressive\" tries to maximise the "
+ "parallelism, possibly at the cost of increased conflict rate. "
+ "\"minimal\" only parallelizes the commit steps of transactions. "
+ "\"none\" disables parallel apply completely.",
+ GLOBAL_VAR(opt_slave_parallel_mode), NO_CMD_LINE,
+ slave_parallel_mode_names, DEFAULT(SLAVE_PARALLEL_CONSERVATIVE));
+
+
+static Sys_var_bit Sys_skip_parallel_replication(
+ "skip_parallel_replication",
+ "If set when a transaction is written to the binlog, parallel apply of "
+ "that transaction will be avoided on a slave where slave_parallel_mode "
+ "is not \"aggressive\". Can be used to avoid unnecessary rollback and "
+ "retry for transactions that are likely to cause a conflict if "
+ "replicated in parallel.",
+ SESSION_ONLY(option_bits), NO_CMD_LINE, OPTION_RPL_SKIP_PARALLEL,
+ DEFAULT(FALSE), NO_MUTEX_GUARD, NOT_IN_BINLOG);
static bool
diff --git a/sql/sys_vars.h b/sql/sys_vars.h
index a19102be6ec..61af931c189 100644
--- a/sql/sys_vars.h
+++ b/sql/sys_vars.h
@@ -2312,14 +2312,15 @@ public:
/**
Class for connection_name.slave_parallel_mode.
*/
-class Sys_var_slave_parallel_mode: public Sys_var_set
+class Sys_var_slave_parallel_mode: public Sys_var_enum
{
public:
Sys_var_slave_parallel_mode(const char *name_arg,
const char *comment, int flag_args, ptrdiff_t off, size_t size,
- CMD_LINE getopt, const char *values[], ulonglong def_val)
- : Sys_var_set(name_arg, comment, flag_args, off, size,
- getopt, values, def_val)
+ CMD_LINE getopt, const char *values[],
+ enum_slave_parallel_mode def_val)
+ : Sys_var_enum(name_arg, comment, flag_args, off, size,
+ getopt, values, def_val)
{
option.var_type|= GET_ASK_ADDR;
option.value= (uchar**)1; // crash me, please