summaryrefslogtreecommitdiff
path: root/plugin/semisync/semisync_master_plugin.cc
diff options
context:
space:
mode:
Diffstat (limited to 'plugin/semisync/semisync_master_plugin.cc')
-rw-r--r--plugin/semisync/semisync_master_plugin.cc121
1 files changed, 114 insertions, 7 deletions
diff --git a/plugin/semisync/semisync_master_plugin.cc b/plugin/semisync/semisync_master_plugin.cc
index 309910312c4..58bddf019bd 100644
--- a/plugin/semisync/semisync_master_plugin.cc
+++ b/plugin/semisync/semisync_master_plugin.cc
@@ -21,6 +21,11 @@
static ReplSemiSyncMaster repl_semisync;
+// forward declaration
+static inline ulong get_slave_lag_wait_timeout(THD* thd);
+
+static char rpl_semi_sync_master_group_commit = 0;
+
C_MODE_START
int repl_semi_report_binlog_update(Binlog_storage_param *param,
@@ -31,6 +36,13 @@ int repl_semi_report_binlog_update(Binlog_storage_param *param,
if (repl_semisync.getMasterEnabled())
{
+ if (rpl_semi_sync_master_group_commit &&
+ ((flags & BINLOG_GROUP_COMMIT_TRAILER) == 0))
+ {
+ /** there are transactions more coming... */
+ return 0;
+ }
+
/*
Let us store the binlog file name and the position, so that
we know how long to wait for the binlog to the replicated to
@@ -43,8 +55,11 @@ int repl_semi_report_binlog_update(Binlog_storage_param *param,
return error;
}
-int repl_semi_request_commit(Trans_param *param)
+int repl_semi_before_commit(Trans_param *param, int *error)
{
+ *error = repl_semisync.wait_slave_lag(
+ get_slave_lag_wait_timeout(current_thd));
+
return 0;
}
@@ -53,6 +68,14 @@ int repl_semi_report_binlog_sync(Binlog_storage_param *param,
my_off_t log_pos, uint32 flags)
{
int error= 0;
+
+ if (rpl_semi_sync_master_group_commit &&
+ ((flags & BINLOG_GROUP_COMMIT_TRAILER) == 0))
+ {
+ /** there are transactions more coming... */
+ return 0;
+ }
+
if (rpl_semi_sync_master_wait_point ==
SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC)
{
@@ -100,7 +123,7 @@ int repl_semi_binlog_dump_start(Binlog_transmit_param *param,
Let's assume this semi-sync slave has already received all
binlog events before the filename and position it requests.
*/
- repl_semisync.reportReplyBinlog(param->server_id, log_file, log_pos);
+ repl_semisync.reportReplyBinlog(param->server_id, log_file, log_pos, NULL);
}
sql_print_information("Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)",
semi_sync_slave ? "semi-sync" : "asynchronous",
@@ -242,15 +265,72 @@ static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_master_trace_level,
&fix_rpl_semi_sync_master_trace_level, // update
32, 0, ~0UL, 1);
+static MYSQL_SYSVAR_ULONG(max_unacked_event_count,
+ rpl_semi_sync_master_max_unacked_event_count,
+ PLUGIN_VAR_OPCMDARG,
+ "Maximum unacked replication events",
+ NULL, // check
+ NULL, // update
+ rpl_semi_sync_master_max_unacked_event_count, 0, ~0UL, 1);
+
+static MYSQL_SYSVAR_ULONG(max_unacked_event_bytes,
+ rpl_semi_sync_master_max_unacked_event_bytes,
+ PLUGIN_VAR_OPCMDARG,
+ "Maximum unacked replication bytes",
+ NULL, // check
+ NULL, // update
+ rpl_semi_sync_master_max_unacked_event_bytes, 0, ~0UL, 1);
+
+static MYSQL_SYSVAR_ULONG(max_slave_lag, rpl_semi_sync_master_max_slave_lag,
+ PLUGIN_VAR_OPCMDARG,
+ "Maximum allowed lag of fastest semi-sync slave (in seconds), "
+ "checked before commit.",
+ NULL, // check
+ NULL, // update
+ rpl_semi_sync_master_max_slave_lag, 0, ~0UL, 1);
+
+static MYSQL_THDVAR_ULONG(slave_lag_wait_timeout,
+ PLUGIN_VAR_RQCMDARG,
+ "Timeout in seconds a rw-transaction may wait for max slave lag before "
+ "being rolled back.",
+ NULL, NULL, 50, 1, 1024 * 1024 * 1024, 0);
+
+static MYSQL_SYSVAR_ULONG(
+ slave_lag_heartbeat_frequency_us,
+ rpl_semi_sync_master_slave_lag_heartbeat_frequency_us,
+ PLUGIN_VAR_RQCMDARG,
+ "Heartbeat frequency when slave-lag is enabled (in microseconds).",
+ NULL, // check
+ NULL, // update
+ 500000, /* 500 ms */
+ 1, ~0UL, 1);
+
+static MYSQL_SYSVAR_BOOL(group_commit, rpl_semi_sync_master_group_commit,
+ PLUGIN_VAR_OPCMDARG,
+ "Group commit for semi sync",
+ NULL, // check
+ NULL,
+ 0);
+
static SYS_VAR* semi_sync_master_system_vars[]= {
MYSQL_SYSVAR(enabled),
MYSQL_SYSVAR(wait_point),
MYSQL_SYSVAR(timeout),
MYSQL_SYSVAR(wait_no_slave),
MYSQL_SYSVAR(trace_level),
+ MYSQL_SYSVAR(max_unacked_event_count),
+ MYSQL_SYSVAR(max_unacked_event_bytes),
+ MYSQL_SYSVAR(max_slave_lag),
+ MYSQL_SYSVAR(slave_lag_wait_timeout),
+ MYSQL_SYSVAR(slave_lag_heartbeat_frequency_us),
+ MYSQL_SYSVAR(group_commit),
NULL,
};
+static inline ulong get_slave_lag_wait_timeout(THD* thd)
+{
+ return THDVAR(thd, slave_lag_wait_timeout);
+}
static void fix_rpl_semi_sync_master_timeout(MYSQL_THD thd,
SYS_VAR *var,
@@ -297,6 +377,7 @@ Trans_observer trans_observer = {
repl_semi_report_commit, // after_commit
repl_semi_report_rollback, // after_rollback
+ repl_semi_before_commit, // before commit
};
Binlog_storage_observer storage_observer = {
@@ -339,7 +420,11 @@ DEF_SHOW_FUNC(net_wait_time, SHOW_LONGLONG)
DEF_SHOW_FUNC(net_wait_num, SHOW_LONGLONG)
DEF_SHOW_FUNC(avg_net_wait_time, SHOW_LONG)
DEF_SHOW_FUNC(avg_trx_wait_time, SHOW_LONG)
-
+DEF_SHOW_FUNC(slave_lag_wait_sessions, SHOW_LONG)
+DEF_SHOW_FUNC(estimated_slave_lag, SHOW_LONGLONG)
+DEF_SHOW_FUNC(trx_slave_lag_wait_time, SHOW_LONGLONG)
+DEF_SHOW_FUNC(trx_slave_lag_wait_num, SHOW_LONGLONG)
+DEF_SHOW_FUNC(avg_trx_slave_lag_wait_time, SHOW_LONG)
/* plugin status variables */
static SHOW_VAR semi_sync_master_status_vars[]= {
@@ -385,32 +470,55 @@ static SHOW_VAR semi_sync_master_status_vars[]= {
{"Rpl_semi_sync_master_net_avg_wait_time",
(char*) &SHOW_FNAME(avg_net_wait_time),
SHOW_SIMPLE_FUNC},
+ {"Rpl_semi_sync_master_slave_lag_wait_sessions",
+ (char*) &SHOW_FNAME(slave_lag_wait_sessions),
+ SHOW_SIMPLE_FUNC},
+ {"Rpl_semi_sync_master_estimated_slave_lag",
+ (char*) &SHOW_FNAME(estimated_slave_lag),
+ SHOW_SIMPLE_FUNC},
+ {"Rpl_semi_sync_master_tx_slave_lag_wait_time",
+ (char*) &SHOW_FNAME(trx_slave_lag_wait_time),
+ SHOW_SIMPLE_FUNC},
+ {"Rpl_semi_sync_master_tx_slave_lag_waits",
+ (char*) &SHOW_FNAME(trx_slave_lag_wait_num),
+ SHOW_SIMPLE_FUNC},
+ {"Rpl_semi_sync_master_tx_avg_slave_lag_wait_time",
+ (char*) &SHOW_FNAME(avg_trx_slave_lag_wait_time),
+ SHOW_SIMPLE_FUNC},
{NULL, NULL, SHOW_LONG},
};
#ifdef HAVE_PSI_INTERFACE
PSI_mutex_key key_ss_mutex_LOCK_binlog_;
+PSI_mutex_key key_ss_mutex_LOCK_slave_lag_;
static PSI_mutex_info all_semisync_mutexes[]=
{
- { &key_ss_mutex_LOCK_binlog_, "LOCK_binlog_", 0}
+ { &key_ss_mutex_LOCK_binlog_, "LOCK_binlog_", 0 },
+ { &key_ss_mutex_LOCK_slave_lag_, "LOCK_slave_lag_", 0 }
};
PSI_cond_key key_ss_cond_COND_binlog_send_;
+PSI_cond_key key_ss_cond_COND_slave_lag_;
static PSI_cond_info all_semisync_conds[]=
{
- { &key_ss_cond_COND_binlog_send_, "COND_binlog_send_", 0}
+ { &key_ss_cond_COND_binlog_send_, "COND_binlog_send_", 0 },
+ { &key_ss_cond_COND_slave_lag_, "COND_slave_lag_", 0 }
};
#endif /* HAVE_PSI_INTERFACE */
PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave=
{ 0, "Waiting for semi-sync ACK from slave", 0};
+PSI_stage_info stage_waiting_for_semi_sync_slave_lag=
+{ 0, "Waiting for semi-sync slave lag", 0};
+
#ifdef HAVE_PSI_INTERFACE
PSI_stage_info *all_semisync_stages[]=
{
- & stage_waiting_for_semi_sync_ack_from_slave
+ & stage_waiting_for_semi_sync_ack_from_slave,
+ & stage_waiting_for_semi_sync_slave_lag
};
static void init_semisync_psi_keys(void)
@@ -492,4 +600,3 @@ maria_declare_plugin(semisync_master)
MariaDB_PLUGIN_MATURITY_STABLE
}
maria_declare_plugin_end;
-