diff options
Diffstat (limited to 'plugin/semisync/semisync_master_plugin.cc')
-rw-r--r-- | plugin/semisync/semisync_master_plugin.cc | 121 |
1 files changed, 114 insertions, 7 deletions
diff --git a/plugin/semisync/semisync_master_plugin.cc b/plugin/semisync/semisync_master_plugin.cc index 309910312c4..58bddf019bd 100644 --- a/plugin/semisync/semisync_master_plugin.cc +++ b/plugin/semisync/semisync_master_plugin.cc @@ -21,6 +21,11 @@ static ReplSemiSyncMaster repl_semisync; +// forward declaration +static inline ulong get_slave_lag_wait_timeout(THD* thd); + +static char rpl_semi_sync_master_group_commit = 0; + C_MODE_START int repl_semi_report_binlog_update(Binlog_storage_param *param, @@ -31,6 +36,13 @@ int repl_semi_report_binlog_update(Binlog_storage_param *param, if (repl_semisync.getMasterEnabled()) { + if (rpl_semi_sync_master_group_commit && + ((flags & BINLOG_GROUP_COMMIT_TRAILER) == 0)) + { + /** there are transactions more coming... */ + return 0; + } + /* Let us store the binlog file name and the position, so that we know how long to wait for the binlog to the replicated to @@ -43,8 +55,11 @@ int repl_semi_report_binlog_update(Binlog_storage_param *param, return error; } -int repl_semi_request_commit(Trans_param *param) +int repl_semi_before_commit(Trans_param *param, int *error) { + *error = repl_semisync.wait_slave_lag( + get_slave_lag_wait_timeout(current_thd)); + return 0; } @@ -53,6 +68,14 @@ int repl_semi_report_binlog_sync(Binlog_storage_param *param, my_off_t log_pos, uint32 flags) { int error= 0; + + if (rpl_semi_sync_master_group_commit && + ((flags & BINLOG_GROUP_COMMIT_TRAILER) == 0)) + { + /** there are transactions more coming... */ + return 0; + } + if (rpl_semi_sync_master_wait_point == SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC) { @@ -100,7 +123,7 @@ int repl_semi_binlog_dump_start(Binlog_transmit_param *param, Let's assume this semi-sync slave has already received all binlog events before the filename and position it requests. */ - repl_semisync.reportReplyBinlog(param->server_id, log_file, log_pos); + repl_semisync.reportReplyBinlog(param->server_id, log_file, log_pos, NULL); } sql_print_information("Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)", semi_sync_slave ? "semi-sync" : "asynchronous", @@ -242,15 +265,72 @@ static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_master_trace_level, &fix_rpl_semi_sync_master_trace_level, // update 32, 0, ~0UL, 1); +static MYSQL_SYSVAR_ULONG(max_unacked_event_count, + rpl_semi_sync_master_max_unacked_event_count, + PLUGIN_VAR_OPCMDARG, + "Maximum unacked replication events", + NULL, // check + NULL, // update + rpl_semi_sync_master_max_unacked_event_count, 0, ~0UL, 1); + +static MYSQL_SYSVAR_ULONG(max_unacked_event_bytes, + rpl_semi_sync_master_max_unacked_event_bytes, + PLUGIN_VAR_OPCMDARG, + "Maximum unacked replication bytes", + NULL, // check + NULL, // update + rpl_semi_sync_master_max_unacked_event_bytes, 0, ~0UL, 1); + +static MYSQL_SYSVAR_ULONG(max_slave_lag, rpl_semi_sync_master_max_slave_lag, + PLUGIN_VAR_OPCMDARG, + "Maximum allowed lag of fastest semi-sync slave (in seconds), " + "checked before commit.", + NULL, // check + NULL, // update + rpl_semi_sync_master_max_slave_lag, 0, ~0UL, 1); + +static MYSQL_THDVAR_ULONG(slave_lag_wait_timeout, + PLUGIN_VAR_RQCMDARG, + "Timeout in seconds a rw-transaction may wait for max slave lag before " + "being rolled back.", + NULL, NULL, 50, 1, 1024 * 1024 * 1024, 0); + +static MYSQL_SYSVAR_ULONG( + slave_lag_heartbeat_frequency_us, + rpl_semi_sync_master_slave_lag_heartbeat_frequency_us, + PLUGIN_VAR_RQCMDARG, + "Heartbeat frequency when slave-lag is enabled (in microseconds).", + NULL, // check + NULL, // update + 500000, /* 500 ms */ + 1, ~0UL, 1); + +static MYSQL_SYSVAR_BOOL(group_commit, rpl_semi_sync_master_group_commit, + PLUGIN_VAR_OPCMDARG, + "Group commit for semi sync", + NULL, // check + NULL, + 0); + static SYS_VAR* semi_sync_master_system_vars[]= { MYSQL_SYSVAR(enabled), MYSQL_SYSVAR(wait_point), MYSQL_SYSVAR(timeout), MYSQL_SYSVAR(wait_no_slave), MYSQL_SYSVAR(trace_level), + MYSQL_SYSVAR(max_unacked_event_count), + MYSQL_SYSVAR(max_unacked_event_bytes), + MYSQL_SYSVAR(max_slave_lag), + MYSQL_SYSVAR(slave_lag_wait_timeout), + MYSQL_SYSVAR(slave_lag_heartbeat_frequency_us), + MYSQL_SYSVAR(group_commit), NULL, }; +static inline ulong get_slave_lag_wait_timeout(THD* thd) +{ + return THDVAR(thd, slave_lag_wait_timeout); +} static void fix_rpl_semi_sync_master_timeout(MYSQL_THD thd, SYS_VAR *var, @@ -297,6 +377,7 @@ Trans_observer trans_observer = { repl_semi_report_commit, // after_commit repl_semi_report_rollback, // after_rollback + repl_semi_before_commit, // before commit }; Binlog_storage_observer storage_observer = { @@ -339,7 +420,11 @@ DEF_SHOW_FUNC(net_wait_time, SHOW_LONGLONG) DEF_SHOW_FUNC(net_wait_num, SHOW_LONGLONG) DEF_SHOW_FUNC(avg_net_wait_time, SHOW_LONG) DEF_SHOW_FUNC(avg_trx_wait_time, SHOW_LONG) - +DEF_SHOW_FUNC(slave_lag_wait_sessions, SHOW_LONG) +DEF_SHOW_FUNC(estimated_slave_lag, SHOW_LONGLONG) +DEF_SHOW_FUNC(trx_slave_lag_wait_time, SHOW_LONGLONG) +DEF_SHOW_FUNC(trx_slave_lag_wait_num, SHOW_LONGLONG) +DEF_SHOW_FUNC(avg_trx_slave_lag_wait_time, SHOW_LONG) /* plugin status variables */ static SHOW_VAR semi_sync_master_status_vars[]= { @@ -385,32 +470,55 @@ static SHOW_VAR semi_sync_master_status_vars[]= { {"Rpl_semi_sync_master_net_avg_wait_time", (char*) &SHOW_FNAME(avg_net_wait_time), SHOW_SIMPLE_FUNC}, + {"Rpl_semi_sync_master_slave_lag_wait_sessions", + (char*) &SHOW_FNAME(slave_lag_wait_sessions), + SHOW_SIMPLE_FUNC}, + {"Rpl_semi_sync_master_estimated_slave_lag", + (char*) &SHOW_FNAME(estimated_slave_lag), + SHOW_SIMPLE_FUNC}, + {"Rpl_semi_sync_master_tx_slave_lag_wait_time", + (char*) &SHOW_FNAME(trx_slave_lag_wait_time), + SHOW_SIMPLE_FUNC}, + {"Rpl_semi_sync_master_tx_slave_lag_waits", + (char*) &SHOW_FNAME(trx_slave_lag_wait_num), + SHOW_SIMPLE_FUNC}, + {"Rpl_semi_sync_master_tx_avg_slave_lag_wait_time", + (char*) &SHOW_FNAME(avg_trx_slave_lag_wait_time), + SHOW_SIMPLE_FUNC}, {NULL, NULL, SHOW_LONG}, }; #ifdef HAVE_PSI_INTERFACE PSI_mutex_key key_ss_mutex_LOCK_binlog_; +PSI_mutex_key key_ss_mutex_LOCK_slave_lag_; static PSI_mutex_info all_semisync_mutexes[]= { - { &key_ss_mutex_LOCK_binlog_, "LOCK_binlog_", 0} + { &key_ss_mutex_LOCK_binlog_, "LOCK_binlog_", 0 }, + { &key_ss_mutex_LOCK_slave_lag_, "LOCK_slave_lag_", 0 } }; PSI_cond_key key_ss_cond_COND_binlog_send_; +PSI_cond_key key_ss_cond_COND_slave_lag_; static PSI_cond_info all_semisync_conds[]= { - { &key_ss_cond_COND_binlog_send_, "COND_binlog_send_", 0} + { &key_ss_cond_COND_binlog_send_, "COND_binlog_send_", 0 }, + { &key_ss_cond_COND_slave_lag_, "COND_slave_lag_", 0 } }; #endif /* HAVE_PSI_INTERFACE */ PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave= { 0, "Waiting for semi-sync ACK from slave", 0}; +PSI_stage_info stage_waiting_for_semi_sync_slave_lag= +{ 0, "Waiting for semi-sync slave lag", 0}; + #ifdef HAVE_PSI_INTERFACE PSI_stage_info *all_semisync_stages[]= { - & stage_waiting_for_semi_sync_ack_from_slave + & stage_waiting_for_semi_sync_ack_from_slave, + & stage_waiting_for_semi_sync_slave_lag }; static void init_semisync_psi_keys(void) @@ -492,4 +600,3 @@ maria_declare_plugin(semisync_master) MariaDB_PLUGIN_MATURITY_STABLE } maria_declare_plugin_end; - |