summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2012-03-01 12:41:49 +0100
committerunknown <knielsen@knielsen-hq.org>2012-03-01 12:41:49 +0100
commit22a504f89794f6517bc091b8d1f945b9aece7c18 (patch)
tree6c2bb074e63c752293c83176d894bfc3d57fb3b9 /sql
parent4e8bb265fef04c0e331dc78bdfdda6b41e918dfd (diff)
parent9313032283f1650d11fb36066f31d966e8492bdc (diff)
downloadmariadb-git-22a504f89794f6517bc091b8d1f945b9aece7c18.tar.gz
Merge MWL#234: @@skip_replication feature to MariaDB 5.5.
Diffstat (limited to 'sql')
-rw-r--r--sql/log_event.cc19
-rw-r--r--sql/log_event.h26
-rw-r--r--sql/mysqld.cc2
-rw-r--r--sql/mysqld.h1
-rw-r--r--sql/set_var.h1
-rw-r--r--sql/share/errmsg-utf8.txt5
-rw-r--r--sql/slave.cc64
-rw-r--r--sql/slave.h9
-rw-r--r--sql/sql_binlog.cc11
-rw-r--r--sql/sql_priv.h1
-rw-r--r--sql/sql_repl.cc154
-rw-r--r--sql/sys_vars.cc124
-rw-r--r--sql/sys_vars.h20
13 files changed, 349 insertions, 88 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index bd5f6306c47..cec0785a088 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -729,7 +729,7 @@ const char* Log_event::get_type_str()
#ifndef MYSQL_CLIENT
Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans)
- :log_pos(0), temp_buf(0), exec_time(0), flags(flags_arg),
+ :log_pos(0), temp_buf(0), exec_time(0),
crc(0), thd(thd_arg),
checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
{
@@ -741,6 +741,9 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans)
cache_type= Log_event::EVENT_TRANSACTIONAL_CACHE;
else
cache_type= Log_event::EVENT_STMT_CACHE;
+ flags= flags_arg |
+ (thd->variables.option_bits & OPTION_SKIP_REPLICATION ?
+ LOG_EVENT_SKIP_REPLICATION_F : 0);
}
/**
@@ -891,7 +894,9 @@ Log_event::do_shall_skip(Relay_log_info *rli)
rli->replicate_same_server_id,
rli->slave_skip_counter));
if ((server_id == ::server_id && !rli->replicate_same_server_id) ||
- (rli->slave_skip_counter == 1 && rli->is_in_group()))
+ (rli->slave_skip_counter == 1 && rli->is_in_group()) ||
+ (flags & LOG_EVENT_SKIP_REPLICATION_F &&
+ opt_replicate_events_marked_for_skip != RPL_SKIP_REPLICATE))
return EVENT_SKIP_IGNORE;
if (rli->slave_skip_counter > 0)
return EVENT_SKIP_COUNT;
@@ -3901,6 +3906,14 @@ Query_log_event::do_shall_skip(Relay_log_info *rli)
DBUG_PRINT("debug", ("query: %s; q_len: %d", query, q_len));
DBUG_ASSERT(query && q_len > 0);
+ /*
+ An event skipped due to @@skip_replication must not be counted towards the
+ number of events to be skipped due to @@sql_slave_skip_counter.
+ */
+ if (flags & LOG_EVENT_SKIP_REPLICATION_F &&
+ opt_replicate_events_marked_for_skip != RPL_SKIP_REPLICATE)
+ DBUG_RETURN(Log_event::EVENT_SKIP_IGNORE);
+
if (rli->slave_skip_counter > 0)
{
if (strcmp("BEGIN", query) == 0)
@@ -10806,7 +10819,7 @@ st_print_event_info::st_print_event_info()
auto_increment_increment(0),auto_increment_offset(0), charset_inited(0),
lc_time_names_number(~0),
charset_database_number(ILLEGAL_CHARSET_INFO_NUMBER),
- thread_id(0), thread_id_printed(false),
+ thread_id(0), thread_id_printed(false), skip_replication(0),
base64_output_mode(BASE64_OUTPUT_UNSPEC), printed_fd_event(FALSE)
{
/*
diff --git a/sql/log_event.h b/sql/log_event.h
index 2f8854dd488..22e28c7ae13 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -505,6 +505,19 @@ struct sql_ex_info
#define LOG_EVENT_RELAY_LOG_F 0x40
/**
+ @def LOG_EVENT_SKIP_REPLICATION_F
+
+ Flag set by application creating the event (with @@skip_replication); the
+ slave will skip replication of such events if
+ --replicate-events-marked-for-skip is not set to REPLICATE.
+
+ This is a MariaDB flag; we allocate it from the end of the available
+ values to reduce risk of conflict with new MySQL flags.
+*/
+#define LOG_EVENT_SKIP_REPLICATION_F 0x8000
+
+
+/**
@def OPTIONS_WRITTEN_TO_BIN_LOG
OPTIONS_WRITTEN_TO_BIN_LOG are the bits of thd->options which must
@@ -701,6 +714,11 @@ typedef struct st_print_event_info
uint charset_database_number;
uint thread_id;
bool thread_id_printed;
+ /*
+ Track when @@skip_replication changes so we need to output a SET
+ statement for it.
+ */
+ int skip_replication;
st_print_event_info();
@@ -993,8 +1011,8 @@ public:
/**
Some 16 flags. See the definitions above for LOG_EVENT_TIME_F,
- LOG_EVENT_FORCED_ROTATE_F, LOG_EVENT_THREAD_SPECIFIC_F, and
- LOG_EVENT_SUPPRESS_USE_F for notes.
+ LOG_EVENT_FORCED_ROTATE_F, LOG_EVENT_THREAD_SPECIFIC_F,
+ LOG_EVENT_SUPPRESS_USE_F, and LOG_EVENT_SKIP_REPLICATION_F for notes.
*/
uint16 flags;
@@ -4143,6 +4161,8 @@ public:
m_message.str= NULL; /* Just as a precaution */
m_message.length= 0;
set_direct_logging();
+ /* Replicate the incident irregardless of @@skip_replication. */
+ flags&= ~LOG_EVENT_SKIP_REPLICATION_F;
DBUG_VOID_RETURN;
}
@@ -4153,6 +4173,8 @@ public:
DBUG_PRINT("enter", ("m_incident: %d", m_incident));
m_message= msg;
set_direct_logging();
+ /* Replicate the incident irregardless of @@skip_replication. */
+ flags&= ~LOG_EVENT_SKIP_REPLICATION_F;
DBUG_VOID_RETURN;
}
#endif
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 0d76c94cff3..f4a6585896c 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -437,6 +437,8 @@ uint opt_large_page_size= 0;
MYSQL_PLUGIN_IMPORT uint opt_debug_sync_timeout= 0;
#endif /* defined(ENABLED_DEBUG_SYNC) */
my_bool opt_old_style_user_limits= 0, trust_function_creators= 0;
+ulong opt_replicate_events_marked_for_skip;
+
/*
True if there is at least one per-hour limit for some user, so we should
check them before each query (and possibly reset counters when hour is
diff --git a/sql/mysqld.h b/sql/mysqld.h
index 296b747b1ce..db7857d9cd5 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -109,6 +109,7 @@ extern my_bool opt_old_style_user_limits, trust_function_creators;
extern uint opt_crash_binlog_innodb;
extern char *shared_memory_base_name, *mysqld_unix_port;
extern my_bool opt_enable_shared_memory;
+extern ulong opt_replicate_events_marked_for_skip;
extern char *default_tz_name;
extern Time_zone *default_tz;
extern char *default_storage_engine;
diff --git a/sql/set_var.h b/sql/set_var.h
index d285787904c..c074f3f4399 100644
--- a/sql/set_var.h
+++ b/sql/set_var.h
@@ -171,6 +171,7 @@ protected:
#include "sql_plugin.h" /* SHOW_HA_ROWS, SHOW_MY_BOOL */
+
/****************************************************************************
Classes for parsing of the SET command
****************************************************************************/
diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt
index 872e80ab542..8e866c82123 100644
--- a/sql/share/errmsg-utf8.txt
+++ b/sql/share/errmsg-utf8.txt
@@ -6557,4 +6557,7 @@ ER_CONNECTION_KILLED 70100
eng "Connection was killed"
ER_INTERNAL_ERROR
eng "Internal error: '%-.192s'"
-
+ER_INSIDE_TRANSACTION_PREVENTS_SWITCH_SKIP_REPLICATION
+ eng "Cannot modify @@session.skip_replication inside a transaction"
+ER_STORED_FUNCTION_PREVENTS_SWITCH_SKIP_REPLICATION
+ eng "Cannot modify @@session.skip_replication inside a stored function or trigger"
diff --git a/sql/slave.cc b/sql/slave.cc
index 2b73cad1d7b..1e717a2e98c 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -1710,6 +1710,48 @@ when it try to get the value of TIME_ZONE global variable from master.";
past_checksum:
#endif
+ /*
+ Request the master to filter away events with the @@skip_replication flag
+ set, if we are running with
+ --replicate-events-marked-for-skip=FILTER_ON_MASTER.
+ */
+ if (opt_replicate_events_marked_for_skip == RPL_SKIP_FILTER_ON_MASTER)
+ {
+ if (mysql_real_query(mysql, STRING_WITH_LEN("SET skip_replication=1")))
+ {
+ err_code= mysql_errno(mysql);
+ if (is_network_error(err_code))
+ {
+ mi->report(ERROR_LEVEL, err_code,
+ "Setting master-side filtering of @@skip_replication failed "
+ "with error: %s", mysql_error(mysql));
+ goto network_err;
+ }
+ else if (err_code == ER_UNKNOWN_SYSTEM_VARIABLE)
+ {
+ /*
+ The master is older than the slave and does not support the
+ @@skip_replication feature.
+ This is not a problem, as such master will not generate events with
+ the @@skip_replication flag set in the first place. We will still
+ do slave-side filtering of such events though, to handle the (rare)
+ case of downgrading a master and receiving old events generated from
+ before the downgrade with the @@skip_replication flag set.
+ */
+ DBUG_PRINT("info", ("Old master does not support master-side filtering "
+ "of @@skip_replication events."));
+ }
+ else
+ {
+ /* Fatal error */
+ errmsg= "The slave I/O thread stops because a fatal error is "
+ "encountered when it tries to request filtering of events marked "
+ "with the @@skip_replication flag.";
+ sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql));
+ goto err;
+ }
+ }
+ }
err:
if (errmsg)
{
@@ -2498,6 +2540,9 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli)
ev->when= hrtime_to_my_time(hrtime);
ev->when_sec_part= hrtime_sec_part(hrtime);
}
+ thd->variables.option_bits=
+ (thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) |
+ (ev->flags & LOG_EVENT_SKIP_REPLICATION_F ? OPTION_SKIP_REPLICATION : 0);
ev->thd = thd; // because up to this point, ev->thd == 0
int reason= ev->shall_skip(rli);
@@ -4062,6 +4107,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
int error= 0;
String error_msg;
ulong inc_pos;
+ ulong event_pos;
Relay_log_info *rli= &mi->rli;
mysql_mutex_t *log_lock= rli->relay_log.get_log_lock();
ulong s_id;
@@ -4134,7 +4180,6 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
(uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
DBUG_RETURN(queue_old_event(mi,buf,event_len));
- LINT_INIT(inc_pos);
mysql_mutex_lock(&mi->data_lock);
switch ((uchar)buf[EVENT_TYPE_OFFSET]) {
@@ -4327,6 +4372,23 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
}
/*
+ If we filter events master-side (eg. @@skip_replication), we will see holes
+ in the event positions from the master. If we see such a hole, adjust
+ mi->master_log_pos accordingly so we maintain the correct position (for
+ reconnect, MASTER_POS_WAIT(), etc.)
+ */
+ if (inc_pos > 0 &&
+ event_len >= LOG_POS_OFFSET+4 &&
+ (event_pos= uint4korr(buf+LOG_POS_OFFSET)) > mi->master_log_pos + inc_pos)
+ {
+ inc_pos= event_pos - mi->master_log_pos;
+ DBUG_PRINT("info", ("Adjust master_log_pos %lu->%lu to account for "
+ "master-side filtering",
+ (unsigned long)(mi->master_log_pos + inc_pos),
+ event_pos));
+ }
+
+ /*
If this event is originating from this server, don't queue it.
We don't check this for 3.23 events because it's simpler like this; 3.23
will be filtered anyway by the SQL slave thread which also tests the
diff --git a/sql/slave.h b/sql/slave.h
index e519a9fc3fa..6b4bcffe109 100644
--- a/sql/slave.h
+++ b/sql/slave.h
@@ -152,6 +152,15 @@ extern ulonglong relay_log_space_limit;
*/
#define SLAVE_FORCE_ALL 4
+/*
+ Values for the option --replicate-events-marked-for-skip.
+ Must match the names in replicate_events_marked_for_skip_names in sys_vars.cc
+*/
+#define RPL_SKIP_REPLICATE 0
+#define RPL_SKIP_FILTER_ON_SLAVE 1
+#define RPL_SKIP_FILTER_ON_MASTER 2
+
+
int init_slave();
int init_recovery(Master_info* mi, const char** errmsg);
void init_slave_skip_errors(const char* arg);
diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc
index 664590c34ac..0ac92859365 100644
--- a/sql/sql_binlog.cc
+++ b/sql/sql_binlog.cc
@@ -44,6 +44,7 @@
void mysql_client_binlog_statement(THD* thd)
{
+ ulonglong save_skip_replication;
DBUG_ENTER("mysql_client_binlog_statement");
DBUG_PRINT("info",("binlog base64: '%*s'",
(int) (thd->lex->comment.length < 2048 ?
@@ -225,7 +226,17 @@ void mysql_client_binlog_statement(THD* thd)
reporting.
*/
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+ save_skip_replication= thd->variables.option_bits&OPTION_SKIP_REPLICATION;
+ thd->variables.option_bits=
+ (thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) |
+ (ev->flags & LOG_EVENT_SKIP_REPLICATION_F ?
+ OPTION_SKIP_REPLICATION : 0);
+
err= ev->apply_event(rli);
+
+ thd->variables.option_bits=
+ (thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) |
+ save_skip_replication;
#else
err= 0;
#endif
diff --git a/sql/sql_priv.h b/sql/sql_priv.h
index b9017f1e5ab..78e1fed83fc 100644
--- a/sql/sql_priv.h
+++ b/sql/sql_priv.h
@@ -151,6 +151,7 @@
Note! Reserved for use in MySQL Cluster
*/
#define OPTION_ALLOW_BATCH (ULL(1) << 36) // THD, intern (slave)
+#define OPTION_SKIP_REPLICATION (ULL(1) << 37) // THD, user
/* The rest of the file is included in the server only */
#ifndef MYSQL_CLIENT
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 9974c56f3d1..29ab8b1d05b 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -556,8 +556,60 @@ static int send_heartbeat_event(NET* net, String* packet,
/*
- TODO: Clean up loop to only have one call to send_file()
+ Helper function for mysql_binlog_send() to write an event down the slave
+ connection.
+
+ Returns NULL on success, error message string on error.
*/
+static const char *
+send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
+ Log_event_type event_type, char *log_file_name,
+ IO_CACHE *log)
+{
+ my_off_t pos;
+
+ /* Do not send annotate_rows events unless slave requested it. */
+ if (event_type == ANNOTATE_ROWS_EVENT &&
+ !(flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT))
+ return NULL;
+
+ /*
+ Skip events with the @@skip_replication flag set, if slave requested
+ skipping of such events.
+ */
+ if (thd->variables.option_bits & OPTION_SKIP_REPLICATION)
+ {
+ /*
+ The first byte of the packet is a '\0' to distinguish it from an error
+ packet. So the actual event starts at offset +1.
+ */
+ uint16 event_flags= uint2korr(&((*packet)[FLAGS_OFFSET+1]));
+ if (event_flags & LOG_EVENT_SKIP_REPLICATION_F)
+ return NULL;
+ }
+
+ thd_proc_info(thd, "Sending binlog event to slave");
+
+ pos= my_b_tell(log);
+ if (RUN_HOOK(binlog_transmit, before_send_event,
+ (thd, flags, packet, log_file_name, pos)))
+ return "run 'before_send_event' hook failed";
+
+ if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
+ return "Failed on my_net_write()";
+
+ DBUG_PRINT("info", ("log event code %d", (*packet)[LOG_EVENT_OFFSET+1] ));
+ if (event_type == LOAD_EVENT)
+ {
+ if (send_file(thd))
+ return "failed in send_file()";
+ }
+
+ if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
+ return "Failed to run hook 'after_send_event'";
+
+ return NULL; /* Success */
+}
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
ushort flags)
@@ -570,9 +622,9 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
IO_CACHE log;
File file = -1;
- String* packet = &thd->packet;
+ String* const packet = &thd->packet;
int error;
- const char *errmsg = "Unknown error";
+ const char *errmsg = "Unknown error", *tmp_msg;
const char *fmt= "%s; the last event was read from '%s' at %s, the last byte read was read from '%s' at %s.";
char llbuff1[22], llbuff2[22];
char error_text[MAX_SLAVE_ERRMSG]; // to be send to slave via my_message()
@@ -889,51 +941,21 @@ impossible position";
(*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
}
- if (event_type != ANNOTATE_ROWS_EVENT ||
- (flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT))
+ if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
+ log_file_name, &log)))
{
- pos = my_b_tell(&log);
- if (RUN_HOOK(binlog_transmit, before_send_event,
- (thd, flags, packet, log_file_name, pos)))
- {
- my_errno= ER_UNKNOWN_ERROR;
- errmsg= "run 'before_send_event' hook failed";
- goto err;
- }
-
- if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
- {
- errmsg = "Failed on my_net_write()";
- my_errno= ER_UNKNOWN_ERROR;
- goto err;
- }
+ errmsg= tmp_msg;
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
- DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
+ DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
+ {
+ if (event_type == XID_EVENT)
{
- if (event_type == XID_EVENT)
- {
- net_flush(net);
- }
- });
-
- DBUG_PRINT("info", ("log event code %d", event_type));
- if (event_type == LOAD_EVENT)
- {
- if (send_file(thd))
- {
- errmsg = "failed in send_file()";
- my_errno= ER_UNKNOWN_ERROR;
- goto err;
- }
- }
-
- if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
- {
- errmsg= "Failed to run hook 'after_send_event'";
- my_errno= ER_UNKNOWN_ERROR;
- goto err;
- }
- }
+ net_flush(net);
+ }
+ });
/* reset transmit packet for next loop */
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
@@ -1078,43 +1100,13 @@ impossible position";
goto err;
}
- if (read_packet &&
- (event_type != ANNOTATE_ROWS_EVENT ||
- (flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT)))
+ if (read_packet &&
+ (tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
+ log_file_name, &log)))
{
- thd_proc_info(thd, "Sending binlog event to slave");
- pos = my_b_tell(&log);
- if (RUN_HOOK(binlog_transmit, before_send_event,
- (thd, flags, packet, log_file_name, pos)))
- {
- my_errno= ER_UNKNOWN_ERROR;
- errmsg= "run 'before_send_event' hook failed";
- goto err;
- }
-
- if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
- {
- errmsg = "Failed on my_net_write()";
- my_errno= ER_UNKNOWN_ERROR;
- goto err;
- }
-
- if (event_type == LOAD_EVENT)
- {
- if (send_file(thd))
- {
- errmsg = "failed in send_file()";
- my_errno= ER_UNKNOWN_ERROR;
- goto err;
- }
- }
-
- if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
- {
- my_errno= ER_UNKNOWN_ERROR;
- errmsg= "Failed to run hook 'after_send_event'";
- goto err;
- }
+ errmsg= tmp_msg;
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
}
log.error=0;
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index 0e734476242..5a2904dbbb2 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -2015,6 +2015,67 @@ static Sys_var_mybool Sys_master_verify_checksum(
"SHOW BINLOG EVENTS",
GLOBAL_VAR(opt_master_verify_checksum), CMD_LINE(OPT_ARG),
DEFAULT(FALSE));
+
+/* These names must match RPL_SKIP_XXX #defines in slave.h. */
+static const char *replicate_events_marked_for_skip_names[]= {
+ "replicate", "filter_on_slave", "filter_on_master", 0
+};
+static bool
+replicate_events_marked_for_skip_check(sys_var *self, THD *thd,
+ set_var *var)
+{
+ int thread_mask;
+ DBUG_ENTER("sys_var_replicate_events_marked_for_skip_check");
+
+ /* Slave threads must be stopped to change the variable. */
+ mysql_mutex_lock(&LOCK_active_mi);
+ lock_slave_threads(active_mi);
+ init_thread_mask(&thread_mask, active_mi, 0 /*not inverse*/);
+ unlock_slave_threads(active_mi);
+ mysql_mutex_unlock(&LOCK_active_mi);
+
+ if (thread_mask) // We refuse if any slave thread is running
+ {
+ my_error(ER_SLAVE_MUST_STOP, MYF(0));
+ DBUG_RETURN(true);
+ }
+ DBUG_RETURN(false);
+}
+bool
+Sys_var_replicate_events_marked_for_skip::global_update(THD *thd, set_var *var)
+{
+ bool result;
+ int thread_mask;
+ DBUG_ENTER("Sys_var_replicate_events_marked_for_skip::global_update");
+
+ /* Slave threads must be stopped to change the variable. */
+ mysql_mutex_lock(&LOCK_active_mi);
+ lock_slave_threads(active_mi);
+ init_thread_mask(&thread_mask, active_mi, 0 /*not inverse*/);
+ if (thread_mask) // We refuse if any slave thread is running
+ {
+ my_error(ER_SLAVE_MUST_STOP, MYF(0));
+ result= true;
+ }
+ else
+ result= Sys_var_enum::global_update(thd, var);
+
+ unlock_slave_threads(active_mi);
+ mysql_mutex_unlock(&LOCK_active_mi);
+ DBUG_RETURN(result);
+}
+static Sys_var_replicate_events_marked_for_skip Replicate_events_marked_for_skip
+ ("replicate_events_marked_for_skip",
+ "Whether the slave should replicate events that were created with "
+ "@@skip_replication=1 on the master. Default REPLICATE (no events are "
+ "skipped). Other values are FILTER_ON_SLAVE (events will be sent by the "
+ "master but ignored by the slave) and FILTER_ON_MASTER (events marked with "
+ "@@skip_replication=1 will be filtered on the master and never be sent to "
+ "the slave).",
+ GLOBAL_VAR(opt_replicate_events_marked_for_skip), CMD_LINE(REQUIRED_ARG),
+ replicate_events_marked_for_skip_names, DEFAULT(RPL_SKIP_REPLICATE),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG,
+ ON_CHECK(replicate_events_marked_for_skip_check));
#endif
@@ -2645,6 +2706,69 @@ static Sys_var_ulong Sys_profiling_history_size(
VALID_RANGE(0, 100), DEFAULT(15), BLOCK_SIZE(1));
#endif
+/*
+ Some variables like @sql_log_bin and @binlog_format change how/if binlogging
+ is done. We must not change them inside a running transaction or statement,
+ otherwise the event group eventually written to the binlog may become
+ incomplete or otherwise garbled.
+
+ This function does the appropriate check.
+
+ It returns true if an error is caused by incorrect usage, false if ok.
+*/
+static bool
+error_if_in_trans_or_substatement(THD *thd, int in_substatement_error,
+ int in_transaction_error)
+{
+ if (thd->in_sub_stmt)
+ {
+ my_error(in_substatement_error, MYF(0));
+ return true;
+ }
+
+ if (thd->in_active_multi_stmt_transaction())
+ {
+ my_error(in_transaction_error, MYF(0));
+ return true;
+ }
+
+ return false;
+}
+
+/*
+ When this is set by a connection, binlogged events will be marked with a
+ corresponding flag. The slave can be configured to not replicate events
+ so marked.
+ In the binlog dump thread on the master, this variable is re-used for a
+ related purpose: The slave sets this flag when connecting to the master to
+ request that the master filter out (ie. not send) any events with the flag
+ set, thus saving network traffic on events that would be ignored by the
+ slave anyway.
+*/
+static bool check_skip_replication(sys_var *self, THD *thd, set_var *var)
+{
+ /*
+ We must not change @@skip_replication in the middle of a transaction or
+ statement, as that could result in only part of the transaction / statement
+ being replicated.
+ (This would be particularly serious if we were to replicate eg.
+ Rows_log_event without Table_map_log_event or transactional updates without
+ the COMMIT).
+ */
+ if (error_if_in_trans_or_substatement(thd,
+ ER_STORED_FUNCTION_PREVENTS_SWITCH_SKIP_REPLICATION,
+ ER_INSIDE_TRANSACTION_PREVENTS_SWITCH_SKIP_REPLICATION))
+ return 1;
+
+ return 0;
+}
+
+static Sys_var_bit Sys_skip_replication(
+ "skip_replication", "skip_replication",
+ SESSION_ONLY(option_bits), NO_CMD_LINE, OPTION_SKIP_REPLICATION,
+ DEFAULT(FALSE), NO_MUTEX_GUARD, NOT_IN_BINLOG,
+ ON_CHECK(check_skip_replication));
+
static Sys_var_harows Sys_select_limit(
"sql_select_limit",
"The maximum number of rows to return from SELECT statements",
diff --git a/sql/sys_vars.h b/sql/sys_vars.h
index 272506ff1b5..f2a2966e6a2 100644
--- a/sql/sys_vars.h
+++ b/sql/sys_vars.h
@@ -1800,6 +1800,26 @@ public:
}
};
+/*
+ Class for replicate_events_marked_for_skip.
+ We need a custom update function that ensures the slave is stopped when
+ the update is happening.
+*/
+class Sys_var_replicate_events_marked_for_skip: public Sys_var_enum
+{
+public:
+ Sys_var_replicate_events_marked_for_skip(const char *name_arg,
+ const char *comment, int flag_args, ptrdiff_t off, size_t size,
+ CMD_LINE getopt,
+ const char *values[], uint def_val, PolyLock *lock,
+ enum binlog_status_enum binlog_status_arg,
+ on_check_function on_check_func)
+ :Sys_var_enum(name_arg, comment, flag_args, off, size, getopt,
+ values, def_val, lock, binlog_status_arg, on_check_func)
+ {}
+ bool global_update(THD *thd, set_var *var);
+};
+
/****************************************************************************
Used templates
****************************************************************************/