summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorLixun Peng <lixun@mariadb.org>2017-02-11 02:11:30 +0800
committerLixun Peng <lixun@mariadb.org>2017-02-11 02:11:30 +0800
commit86bba77b88ebbe420b797c95e61b3f7d84410206 (patch)
tree34765f9b889ff0065cad0795d4a01056e99d9a59 /sql
parent1b4f694adfa257d41b5ddb141e5bcc23e6dfd9c8 (diff)
downloadmariadb-git-bb-10.2-mdev8112.tar.gz
[MDEV-8112] Port no slave left behind into 10.2bb-10.2-mdev8112
This patch implements master throttling based on slave lag, aka no slave left behind. The core feature works as follows: 1) The semi-sync-reply is amended to also report back SQL-thread position (aka exec position) 2) Transactions are not removed from the "active-transaction-list" in the semi-sync-master plugin until at least one slave has reported that it has executed this transaction. The slave lag can then be estimated by calculating how long the oldest transaction has been lingering in the active-transaction-list. 3) Client threads are forced to wait before commit until slave lag has decreased to an acceptable value. The following variables are introduced on master: 1. rpl_semi_sync_master_max_slave_lag (global) 2. rpl_semi_sync_master_slave_lag_wait_timeout (session) The following status variables are introduced on master: 1. rpl_semi_sync_master_slave_lag_wait_sessions 2. rpl_semi_sync_master_estimated_slave_lag 3. rpl_semi_sync_master_trx_slave_lag_wait_time 4. rpl_semi_sync_master_trx_slave_lag_wait_num 5. rpl_semi_sync_master_avg_trx_slave_lag_wait_time The following variables are introduced on slave: 1. rpl_semi_sync_slave_lag_enabled (global) In addition to this, 2 optimizations that decrease the overhead of semi-sync are introduced. 1) The idea is that when a transaction is about to be sent to a slave, the code checks whether it should be semi-synced; rather than semi-syncing each transaction (which is done currently), it skips semi-syncing a transaction if newer transactions have already been committed. But since this can mean that semi-syncing is delayed indefinitely, a cap is set using 2 new master variables: 1. rpl_semi_sync_master_max_unacked_event_bytes (global) 2. rpl_semi_sync_master_max_unacked_event_count (global) 2) rpl_semi_sync_master_group_commit, which makes the semi-sync plugin only semi-sync the last transaction in a group commit.
Diffstat (limited to 'sql')
-rw-r--r--sql/handler.cc21
-rw-r--r--sql/replication.h45
-rw-r--r--sql/rpl_handler.cc78
-rw-r--r--sql/rpl_handler.h2
-rw-r--r--sql/rpl_rli.h4
-rw-r--r--sql/sql_repl.cc14
6 files changed, 158 insertions, 6 deletions
diff --git a/sql/handler.cc b/sql/handler.cc
index f06c5d71a5e..bf32c000377 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -1361,12 +1361,30 @@ int ha_commit_trans(THD *thd, bool all)
/* rw_trans is TRUE when we in a transaction changing data */
bool rw_trans= is_real_trans &&
(rw_ha_count > !thd->is_current_stmt_binlog_disabled());
+ bool mdl_request_initialized= false;
MDL_request mdl_request;
DBUG_PRINT("info", ("is_real_trans: %d rw_trans: %d rw_ha_count: %d",
is_real_trans, rw_trans, rw_ha_count));
if (rw_trans)
{
+ /* check READ-ONLY just before before_commit hook to decrease likelihood
+ * of having threads hanging waiting for slave-lag only to be aborted
+ * due to read-only.
+ */
+ if (opt_readonly &&
+ !(thd->security_ctx->master_access & SUPER_ACL) &&
+ !thd->slave_thread)
+ {
+ my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--read-only");
+ goto err;
+ }
+
+ if (RUN_HOOK(transaction, before_commit, (thd)))
+ {
+ goto err;
+ }
+
/*
Acquire a metadata lock which will ensure that COMMIT is blocked
by an active FLUSH TABLES WITH READ LOCK (and vice versa:
@@ -1375,6 +1393,7 @@ int ha_commit_trans(THD *thd, bool all)
We allow the owner of FTWRL to COMMIT; we assume that it knows
what it does.
*/
+ mdl_request_initialized= true;
mdl_request.init(MDL_key::COMMIT, "", "", MDL_INTENTION_EXCLUSIVE,
MDL_EXPLICIT);
@@ -1483,7 +1502,7 @@ err:
ha_rollback_trans(thd, all);
end:
- if (rw_trans && mdl_request.ticket)
+ if (rw_trans && mdl_request_initialized && mdl_request.ticket)
{
/*
We do not always immediately release transactional locks
diff --git a/sql/replication.h b/sql/replication.h
index 4731c2246ef..309bdb4da61 100644
--- a/sql/replication.h
+++ b/sql/replication.h
@@ -110,6 +110,21 @@ typedef struct Trans_observer {
@retval 1 Failure
*/
int (*after_rollback)(Trans_param *param);
+
+ /**
+ This callback is called before a transaction is committed.
+ If the callback leaves *error != 0, the transaction will not be
+ committed and the error code will be returned to the client.
+
+ @note A plugin shall set *error != 0 and return 0 to signal
+ that the transaction should be aborted.
+ If the callback returns non-zero, the transaction will also be aborted
+ and an error will be printed to the error log.
+
+ @retval 0 Success
+ @retval non-zero Error
+ */
+ int (*before_commit)(Trans_param *param, int *error);
} Trans_observer;
/**
@@ -294,6 +309,8 @@ enum Binlog_relay_IO_flags {
};
+class Master_info;
+
/**
Replication binlog relay IO observer parameter
*/
@@ -309,8 +326,20 @@ typedef struct Binlog_relay_IO_param {
my_off_t master_log_pos;
MYSQL *mysql; /* the connection to master */
+
+ Master_info * mi; /* master info handle */
} Binlog_relay_IO_param;
+
+/* Get the master log file name and position for a Master_info
+ * and store them in filename_buf / filepos.
+ * Returns the length of the file name (excluding the trailing \0).
+ *
+ * note: filename_buf must be at least FN_REFLEN bytes
+ */
+size_t get_master_log_pos(const Master_info *mi,
+ char *filename_buf, my_off_t *filepos);
+
/**
Observes and extends the service of slave IO thread.
*/
@@ -561,7 +590,21 @@ int get_user_var_str(const char *name,
char *value, unsigned long len,
unsigned int precision, int *null_value);
-
+
+/**
+ Set or replace the value of a user variable with a long long integer
+
+ @param name user variable name
+ @param value the value
+ @param old_value pointer to where the old value will be stored (or NULL)
+
+ @retval 0 Success, no prior value found
+ @retval 1 Success, a prior value existed (stored in *old_value if non-NULL)
+ @retval -1 Failure
+*/
+int set_user_var_int(const char *name,
+ long long int value,
+ long long int *old_value);
#ifdef __cplusplus
}
diff --git a/sql/rpl_handler.cc b/sql/rpl_handler.cc
index 733af6c61c8..574fc4938a3 100644
--- a/sql/rpl_handler.cc
+++ b/sql/rpl_handler.cc
@@ -23,6 +23,7 @@
#include "rpl_filter.h"
#include <my_dir.h>
#include "rpl_handler.h"
+#include "sql_prepare.h"
Trans_delegate *transaction_delegate;
Binlog_storage_delegate *binlog_storage_delegate;
@@ -86,6 +87,42 @@ int get_user_var_str(const char *name, char *value,
return 0;
}
+/**
+  Set (or replace) an integer user variable on the current session.
+
+  The previous entry, if any, is looked up in thd->user_vars first so its
+  integer value can be handed back; the new value is then assigned by
+  running "SET @name=value" through an Ed_connection.
+
+  @param name       user variable name (without the leading '@')
+  @param value      value to assign
+  @param old_value  where to store the previous value, or NULL
+
+  @retval 0   success, the variable did not exist before
+  @retval 1   success, the variable existed before
+  @retval -1  the statement could not be built or executed
+*/
+int set_user_var_int(const char *name,
+                     long long int value,
+                     long long int *old_value)
+{
+  THD *thd= current_thd;
+  user_var_entry *prev=
+    (user_var_entry*) my_hash_search(&thd->user_vars,
+                                     (uchar*) name, strlen(name));
+
+  /* Capture the previous value before the SET below overwrites it. */
+  if (prev != NULL && old_value != NULL)
+  {
+    bool is_null;
+    *old_value= prev->val_int(&is_null);
+  }
+
+  Ed_connection conn(thd);
+
+  char stmt[256];
+  int len= snprintf(stmt, sizeof(stmt), "SET @%s=%lld", name, value);
+  /* negative: formatting error; too long: the statement was truncated */
+  if (len < 0 || (size_t) len >= sizeof(stmt))
+    return -1;
+
+  LEX_STRING query;
+  lex_string_set(&query, stmt);
+
+  if (conn.execute_direct(query))
+    return -1;
+
+  if (prev == NULL)
+    return 0;
+  return 1;
+}
+
int delegates_init()
{
static my_aligned_storage<sizeof(Trans_delegate), MY_ALIGNOF(long)> trans_mem;
@@ -240,6 +277,17 @@ int Trans_delegate::after_rollback(THD *thd, bool all)
return ret;
}
+/**
+  Run the before_commit callback of the registered transaction observers.
+
+  Observers receive a Trans_param whose flags, log_file and log_pos are
+  all zero, plus a pointer to an error code they may set to request that
+  the transaction be aborted.
+
+  @param thd  session passed through to FOREACH_OBSERVER
+
+  @retval 0         no observer objected
+  @retval non-zero  an observer set *error, or an observer call failed
+*/
+int Trans_delegate::before_commit(THD *thd)
+{
+  int ret= 0, error= 0;
+  Trans_param param;
+  param.flags= 0;
+  param.log_file= 0;
+  param.log_pos= 0;
+  FOREACH_OBSERVER(ret, before_commit, thd, (&param, &error));
+  /*
+    The Trans_observer::before_commit contract says a non-zero return from
+    the callback must abort the transaction just like *error != 0 does, so
+    ret must not be dropped. (ret is presumably set by FOREACH_OBSERVER
+    when an observer call fails - confirm against the macro.)
+  */
+  return error != 0 ? error : ret;
+}
+
int Binlog_storage_delegate::after_flush(THD *thd,
const char *log_file,
my_off_t log_pos,
@@ -364,17 +412,19 @@ int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
String *packet,
- const char *log_file,
+ const char *log_file_path,
my_off_t log_pos)
{
Binlog_transmit_param param;
param.flags= flags;
int ret= 0;
+ const char* log_file_name= log_file_path != NULL ?
+ log_file_path + dirname_length(log_file_path) : NULL;
FOREACH_OBSERVER(ret, before_send_event, false,
(&param, (uchar *)packet->c_ptr(),
packet->length(),
- log_file+dirname_length(log_file), log_pos));
+ log_file_name, log_pos));
return ret;
}
@@ -404,6 +454,7 @@ int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param,
Master_info *mi)
{
+ param->mi = mi;
param->mysql= mi->mysql;
param->user= mi->user;
param->host= mi->host;
@@ -530,6 +581,20 @@ int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void
{
return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p);
}
+
+/**
+  Get the master log position for a Master_info struct.
+
+  Copies mi->rli.group_master_log_name and mi->rli.group_master_log_pos
+  while holding mi->rli.data_lock.
+
+  @param      mi            master info to read from
+  @param[out] filename_buf  receives the log file name; must be at least
+                            FN_REFLEN bytes, always NUL-terminated on return
+  @param[out] filepos       receives the log position
+
+  @return length of the file name stored in filename_buf (excluding \0)
+*/
+size_t get_master_log_pos(const Master_info* mi,
+                          char *filename_buf, my_off_t *filepos)
+{
+  mysql_mutex_t *mutex= &mi->rli.data_lock;
+
+  mysql_mutex_lock(mutex);
+  *filepos= mi->rli.group_master_log_pos;
+  /*
+    strncpy() does not write a terminator when the source fills the whole
+    limit, so copy at most FN_REFLEN - 1 bytes and terminate explicitly.
+  */
+  strncpy(filename_buf, mi->rli.group_master_log_name, FN_REFLEN - 1);
+  filename_buf[FN_REFLEN - 1]= '\0';
+  mysql_mutex_unlock(mutex);
+  return strlen(filename_buf);
+}
+
#else
int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
{
@@ -550,4 +615,13 @@ int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void
{
return 0;
}
+
+/*
+  Variant used when HAVE_REPLICATION is not defined: mi is not consulted,
+  an empty file name and position 0 are stored, and length 0 is returned.
+*/
+size_t get_master_log_pos(const Master_info* mi,
+                          char *filename_buf, my_off_t *filepos)
+{
+  filename_buf[0]= '\0';
+  *filepos= 0;
+  return 0;
+}
+
#endif /* HAVE_REPLICATION */
diff --git a/sql/rpl_handler.h b/sql/rpl_handler.h
index afcfd9d55b1..5119ee4ee78 100644
--- a/sql/rpl_handler.h
+++ b/sql/rpl_handler.h
@@ -142,7 +142,7 @@ class Trans_delegate
:public Delegate {
public:
typedef Trans_observer Observer;
- int before_commit(THD *thd, bool all);
+ int before_commit(THD *thd);
int before_rollback(THD *thd, bool all);
int after_commit(THD *thd, bool all);
int after_rollback(THD *thd, bool all);
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 5c637702d04..3ca7c2412e6 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -155,7 +155,9 @@ public:
standard lock acquisition order to avoid deadlocks:
run_lock, data_lock, relay_log.LOCK_log, relay_log.LOCK_index
*/
- mysql_mutex_t data_lock, run_lock;
+ mutable mysql_mutex_t data_lock;
+ mysql_mutex_t run_lock;
+
/*
start_cond is broadcast when SQL thread is started
stop_cond - when stopped
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 2a22810b8c2..945343f8d3e 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -821,6 +821,13 @@ static int send_heartbeat_event(binlog_send_info *info,
packet->append(b, sizeof(b));
}
+ if (RUN_HOOK(binlog_transmit, before_send_event,
+ (info->thd, info->flags, packet, 0, 0)))
+ {
+ info->error= ER_UNKNOWN_ERROR;
+ DBUG_RETURN(-1);
+ }
+
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
net_flush(net))
{
@@ -828,6 +835,13 @@ static int send_heartbeat_event(binlog_send_info *info,
DBUG_RETURN(-1);
}
+ if (RUN_HOOK(binlog_transmit, after_send_event,
+ (info->thd, info->flags, packet)))
+ {
+ info->error= ER_UNKNOWN_ERROR;
+ DBUG_RETURN(-1);
+ }
+
DBUG_RETURN(0);
}