diff options
author | Aleksey Midenkov <midenok@gmail.com> | 2017-12-21 11:16:42 +0300 |
---|---|---|
committer | Aleksey Midenkov <midenok@gmail.com> | 2017-12-21 11:16:42 +0300 |
commit | 5c0a19c873382c9fec696f827e6766f61c6682af (patch) | |
tree | c39d60a5557669b779ad72aac6c10abef3ed0eba /plugin | |
parent | 5c760d952b8ae8a8722b206da3de0ebbad4978e5 (diff) | |
parent | 9ec2479778269fb33194c088216119d4f1dca58d (diff) | |
download | mariadb-git-5c0a19c873382c9fec696f827e6766f61c6682af.tar.gz |
System Versioning 1.0 pre7
Merge branch '10.3' into trunk
Diffstat (limited to 'plugin')
-rw-r--r-- | plugin/semisync/CMakeLists.txt | 28 | ||||
-rw-r--r-- | plugin/semisync/semisync.cc | 32 | ||||
-rw-r--r-- | plugin/semisync/semisync.h | 93 | ||||
-rw-r--r-- | plugin/semisync/semisync_master.cc | 1218 | ||||
-rw-r--r-- | plugin/semisync/semisync_master.h | 632 | ||||
-rw-r--r-- | plugin/semisync/semisync_master_plugin.cc | 496 | ||||
-rw-r--r-- | plugin/semisync/semisync_slave.cc | 140 | ||||
-rw-r--r-- | plugin/semisync/semisync_slave.h | 97 | ||||
-rw-r--r-- | plugin/semisync/semisync_slave_plugin.cc | 234 |
9 files changed, 0 insertions, 2970 deletions
diff --git a/plugin/semisync/CMakeLists.txt b/plugin/semisync/CMakeLists.txt deleted file mode 100644 index 88998fb3093..00000000000 --- a/plugin/semisync/CMakeLists.txt +++ /dev/null @@ -1,28 +0,0 @@ -# Copyright (c) 2006, 2010, Oracle and/or its affiliates. All rights reserved. -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; version 2 of the License. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA - -SET(SEMISYNC_MASTER_SOURCES - semisync.cc semisync_master.cc semisync_master_plugin.cc - semisync.h semisync_master.h) - -MYSQL_ADD_PLUGIN(semisync_master ${SEMISYNC_MASTER_SOURCES} - RECOMPILE_FOR_EMBEDDED) - -SET(SEMISYNC_SLAVE_SOURCES semisync.cc semisync_slave.cc - semisync_slave_plugin.cc semisync.h semisync_slave.h ) - -MYSQL_ADD_PLUGIN(semisync_slave ${SEMISYNC_SLAVE_SOURCES} - RECOMPILE_FOR_EMBEDDED) - diff --git a/plugin/semisync/semisync.cc b/plugin/semisync/semisync.cc deleted file mode 100644 index df37f03ec2f..00000000000 --- a/plugin/semisync/semisync.cc +++ /dev/null @@ -1,32 +0,0 @@ -/* Copyright (C) 2007 Google Inc. - Copyright (C) 2008 MySQL AB - Use is subject to license terms - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; version 2 of the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ - - -#include <my_global.h> -#include "semisync.h" - -const unsigned char ReplSemiSyncBase::kPacketMagicNum = 0xef; -const unsigned char ReplSemiSyncBase::kPacketFlagSync = 0x01; - - -const unsigned long Trace::kTraceGeneral = 0x0001; -const unsigned long Trace::kTraceDetail = 0x0010; -const unsigned long Trace::kTraceNetWait = 0x0020; -const unsigned long Trace::kTraceFunction = 0x0040; - -const unsigned char ReplSemiSyncBase::kSyncHeader[2] = - {ReplSemiSyncBase::kPacketMagicNum, 0}; diff --git a/plugin/semisync/semisync.h b/plugin/semisync/semisync.h deleted file mode 100644 index 28577296817..00000000000 --- a/plugin/semisync/semisync.h +++ /dev/null @@ -1,93 +0,0 @@ -/* Copyright (C) 2007 Google Inc. - Copyright (C) 2008 MySQL AB - Use is subject to license terms - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; version 2 of the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ - - -#ifndef SEMISYNC_H -#define SEMISYNC_H - -#define MYSQL_SERVER -#define HAVE_REPLICATION -#include <my_pthread.h> -#include <sql_priv.h> -#include <sql_class.h> -#include "unireg.h" -#include <replication.h> -#include "log.h" /* sql_print_information */ - -typedef struct st_mysql_show_var SHOW_VAR; -typedef struct st_mysql_sys_var SYS_VAR; - - -/** - This class is used to trace function calls and other process - information -*/ -class Trace { -public: - static const unsigned long kTraceFunction; - static const unsigned long kTraceGeneral; - static const unsigned long kTraceDetail; - static const unsigned long kTraceNetWait; - - unsigned long trace_level_; /* the level for tracing */ - - inline void function_enter(const char *func_name) - { - if (trace_level_ & kTraceFunction) - sql_print_information("---> %s enter", func_name); - } - inline int function_exit(const char *func_name, int exit_code) - { - if (trace_level_ & kTraceFunction) - sql_print_information("<--- %s exit (%d)", func_name, exit_code); - return exit_code; - } - - Trace() - :trace_level_(0L) - {} - Trace(unsigned long trace_level) - :trace_level_(trace_level) - {} -}; - -/** - Base class for semi-sync master and slave classes -*/ -class ReplSemiSyncBase - :public Trace { -public: - static const unsigned char kSyncHeader[2]; /* three byte packet header */ - - /* Constants in network packet header. */ - static const unsigned char kPacketMagicNum; - static const unsigned char kPacketFlagSync; -}; - -/* The layout of a semisync slave reply packet: - 1 byte for the magic num - 8 bytes for the binlog positon - n bytes for the binlog filename, terminated with a '\0' -*/ -#define REPLY_MAGIC_NUM_LEN 1 -#define REPLY_BINLOG_POS_LEN 8 -#define REPLY_BINLOG_NAME_LEN (FN_REFLEN + 1) -#define REPLY_MAGIC_NUM_OFFSET 0 -#define REPLY_BINLOG_POS_OFFSET (REPLY_MAGIC_NUM_OFFSET + REPLY_MAGIC_NUM_LEN) -#define REPLY_BINLOG_NAME_OFFSET (REPLY_BINLOG_POS_OFFSET + REPLY_BINLOG_POS_LEN) - -#endif /* SEMISYNC_H */ diff --git a/plugin/semisync/semisync_master.cc b/plugin/semisync/semisync_master.cc deleted file mode 100644 index 975b2e13253..00000000000 --- a/plugin/semisync/semisync_master.cc +++ /dev/null @@ -1,1218 +0,0 @@ -/* Copyright (C) 2007 Google Inc. - Copyright (c) 2008, 2013, Oracle and/or its affiliates. - Copyright (c) 2011, 2016, MariaDB - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; version 2 of the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ - - -#include <my_global.h> -#include "semisync_master.h" - -#define TIME_THOUSAND 1000 -#define TIME_MILLION 1000000 -#define TIME_BILLION 1000000000 - -/* This indicates whether semi-synchronous replication is enabled. */ -char rpl_semi_sync_master_enabled; -unsigned long rpl_semi_sync_master_wait_point = - SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT; -unsigned long rpl_semi_sync_master_timeout; -unsigned long rpl_semi_sync_master_trace_level; -char rpl_semi_sync_master_status = 0; -unsigned long rpl_semi_sync_master_yes_transactions = 0; -unsigned long rpl_semi_sync_master_no_transactions = 0; -unsigned long rpl_semi_sync_master_off_times = 0; -unsigned long rpl_semi_sync_master_timefunc_fails = 0; -unsigned long rpl_semi_sync_master_wait_timeouts = 0; -unsigned long rpl_semi_sync_master_wait_sessions = 0; -unsigned long rpl_semi_sync_master_wait_pos_backtraverse = 0; -unsigned long rpl_semi_sync_master_avg_trx_wait_time = 0; -unsigned long long rpl_semi_sync_master_trx_wait_num = 0; -unsigned long rpl_semi_sync_master_avg_net_wait_time = 0; -unsigned long long rpl_semi_sync_master_net_wait_num = 0; -unsigned long rpl_semi_sync_master_clients = 0; -unsigned long long rpl_semi_sync_master_net_wait_time = 0; -unsigned long long rpl_semi_sync_master_trx_wait_time = 0; -char rpl_semi_sync_master_wait_no_slave = 1; - - -static int getWaitTime(const struct timespec& start_ts); - -static unsigned long long timespec_to_usec(const struct timespec *ts) -{ - return (unsigned long long) ts->tv_sec * TIME_MILLION + ts->tv_nsec / TIME_THOUSAND; -} - -/******************************************************************************* - * - * <ActiveTranx> class : manage all active transaction nodes - * - ******************************************************************************/ - -ActiveTranx::ActiveTranx(mysql_mutex_t *lock, - unsigned long trace_level) - : Trace(trace_level), allocator_(max_connections), - num_entries_(max_connections << 1), /* Transaction hash table size - * is set to double the size - * of max_connections */ - lock_(lock) -{ - /* No transactions are in the list initially. */ - trx_front_ = NULL; - trx_rear_ = NULL; - - /* Create the hash table to find a transaction's ending event. */ - trx_htb_ = new TranxNode *[num_entries_]; - for (int idx = 0; idx < num_entries_; ++idx) - trx_htb_[idx] = NULL; - - sql_print_information("Semi-sync replication initialized for transactions."); -} - -ActiveTranx::~ActiveTranx() -{ - delete [] trx_htb_; - trx_htb_ = NULL; - num_entries_ = 0; -} - -unsigned int ActiveTranx::calc_hash(const unsigned char *key, - unsigned int length) -{ - unsigned int nr = 1, nr2 = 4; - - /* The hash implementation comes from calc_hashnr() in mysys/hash.c. */ - while (length--) - { - nr ^= (((nr & 63)+nr2)*((unsigned int) (unsigned char) *key++))+ (nr << 8); - nr2 += 3; - } - return((unsigned int) nr); -} - -unsigned int ActiveTranx::get_hash_value(const char *log_file_name, - my_off_t log_file_pos) -{ - unsigned int hash1 = calc_hash((const unsigned char *)log_file_name, - strlen(log_file_name)); - unsigned int hash2 = calc_hash((const unsigned char *)(&log_file_pos), - sizeof(log_file_pos)); - - return (hash1 + hash2) % num_entries_; -} - -int ActiveTranx::compare(const char *log_file_name1, my_off_t log_file_pos1, - const char *log_file_name2, my_off_t log_file_pos2) -{ - int cmp = strcmp(log_file_name1, log_file_name2); - - if (cmp != 0) - return cmp; - - if (log_file_pos1 > log_file_pos2) - return 1; - else if (log_file_pos1 < log_file_pos2) - return -1; - return 0; -} - -int ActiveTranx::insert_tranx_node(const char *log_file_name, - my_off_t log_file_pos) -{ - const char *kWho = "ActiveTranx:insert_tranx_node"; - TranxNode *ins_node; - int result = 0; - unsigned int hash_val; - - function_enter(kWho); - - ins_node = allocator_.allocate_node(); - if (!ins_node) - { - sql_print_error("%s: transaction node allocation failed for: (%s, %lu)", - kWho, log_file_name, (unsigned long)log_file_pos); - result = -1; - goto l_end; - } - - /* insert the binlog position in the active transaction list. */ - strncpy(ins_node->log_name_, log_file_name, FN_REFLEN-1); - ins_node->log_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */ - ins_node->log_pos_ = log_file_pos; - - if (!trx_front_) - { - /* The list is empty. */ - trx_front_ = trx_rear_ = ins_node; - } - else - { - int cmp = compare(ins_node, trx_rear_); - if (cmp > 0) - { - /* Compare with the tail first. If the transaction happens later in - * binlog, then make it the new tail. - */ - trx_rear_->next_ = ins_node; - trx_rear_ = ins_node; - } - else - { - /* Otherwise, it is an error because the transaction should hold the - * mysql_bin_log.LOCK_log when appending events. - */ - sql_print_error("%s: binlog write out-of-order, tail (%s, %lu), " - "new node (%s, %lu)", kWho, - trx_rear_->log_name_, (unsigned long)trx_rear_->log_pos_, - ins_node->log_name_, (unsigned long)ins_node->log_pos_); - result = -1; - goto l_end; - } - } - - hash_val = get_hash_value(ins_node->log_name_, ins_node->log_pos_); - ins_node->hash_next_ = trx_htb_[hash_val]; - trx_htb_[hash_val] = ins_node; - - if (trace_level_ & kTraceDetail) - sql_print_information("%s: insert (%s, %lu) in entry(%u)", kWho, - ins_node->log_name_, (unsigned long)ins_node->log_pos_, - hash_val); - - l_end: - return function_exit(kWho, result); -} - -bool ActiveTranx::is_tranx_end_pos(const char *log_file_name, - my_off_t log_file_pos) -{ - const char *kWho = "ActiveTranx::is_tranx_end_pos"; - function_enter(kWho); - - unsigned int hash_val = get_hash_value(log_file_name, log_file_pos); - TranxNode *entry = trx_htb_[hash_val]; - - while (entry != NULL) - { - if (compare(entry, log_file_name, log_file_pos) == 0) - break; - - entry = entry->hash_next_; - } - - if (trace_level_ & kTraceDetail) - sql_print_information("%s: probe (%s, %lu) in entry(%u)", kWho, - log_file_name, (unsigned long)log_file_pos, hash_val); - - function_exit(kWho, (entry != NULL)); - return (entry != NULL); -} - -int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name, - my_off_t log_file_pos) -{ - const char *kWho = "ActiveTranx::::clear_active_tranx_nodes"; - TranxNode *new_front; - - function_enter(kWho); - - if (log_file_name != NULL) - { - new_front = trx_front_; - - while (new_front) - { - if (compare(new_front, log_file_name, log_file_pos) > 0) - break; - new_front = new_front->next_; - } - } - else - { - /* If log_file_name is NULL, clear everything. */ - new_front = NULL; - } - - if (new_front == NULL) - { - /* No active transaction nodes after the call. */ - - /* Clear the hash table. */ - memset(trx_htb_, 0, num_entries_ * sizeof(TranxNode *)); - allocator_.free_all_nodes(); - - /* Clear the active transaction list. */ - if (trx_front_ != NULL) - { - trx_front_ = NULL; - trx_rear_ = NULL; - } - - if (trace_level_ & kTraceDetail) - sql_print_information("%s: cleared all nodes", kWho); - } - else if (new_front != trx_front_) - { - TranxNode *curr_node, *next_node; - - /* Delete all transaction nodes before the confirmation point. */ - int n_frees = 0; - curr_node = trx_front_; - while (curr_node != new_front) - { - next_node = curr_node->next_; - n_frees++; - - /* Remove the node from the hash table. */ - unsigned int hash_val = get_hash_value(curr_node->log_name_, curr_node->log_pos_); - TranxNode **hash_ptr = &(trx_htb_[hash_val]); - while ((*hash_ptr) != NULL) - { - if ((*hash_ptr) == curr_node) - { - (*hash_ptr) = curr_node->hash_next_; - break; - } - hash_ptr = &((*hash_ptr)->hash_next_); - } - - curr_node = next_node; - } - - trx_front_ = new_front; - allocator_.free_nodes_before(trx_front_); - - if (trace_level_ & kTraceDetail) - sql_print_information("%s: cleared %d nodes back until pos (%s, %lu)", - kWho, n_frees, - trx_front_->log_name_, (unsigned long)trx_front_->log_pos_); - } - - return function_exit(kWho, 0); -} - - -/******************************************************************************* - * - * <ReplSemiSyncMaster> class: the basic code layer for sync-replication master. - * <ReplSemiSyncSlave> class: the basic code layer for sync-replication slave. - * - * The most important functions during semi-syn replication listed: - * - * Master: - * . reportReplyBinlog(): called by the binlog dump thread when it receives - * the slave's status information. - * . updateSyncHeader(): based on transaction waiting information, decide - * whether to request the slave to reply. - * . writeTranxInBinlog(): called by the transaction thread when it finishes - * writing all transaction events in binlog. - * . commitTrx(): transaction thread wait for the slave reply. - * - * Slave: - * . slaveReadSyncHeader(): read the semi-sync header from the master, get the - * sync status and get the payload for events. - * . slaveReply(): reply to the master about the replication progress. - * - ******************************************************************************/ - -ReplSemiSyncMaster::ReplSemiSyncMaster() - : active_tranxs_(NULL), - init_done_(false), - reply_file_name_inited_(false), - reply_file_pos_(0L), - wait_file_name_inited_(false), - wait_file_pos_(0), - master_enabled_(false), - wait_timeout_(0L), - state_(0) -{ - strcpy(reply_file_name_, ""); - strcpy(wait_file_name_, ""); -} - -int ReplSemiSyncMaster::initObject() -{ - int result; - const char *kWho = "ReplSemiSyncMaster::initObject"; - - if (init_done_) - { - fprintf(stderr, "%s called twice\n", kWho); - return 1; - } - init_done_ = true; - - /* References to the parameter works after set_options(). */ - setWaitTimeout(rpl_semi_sync_master_timeout); - setTraceLevel(rpl_semi_sync_master_trace_level); - - /* Mutex initialization can only be done after MY_INIT(). */ - mysql_mutex_init(key_ss_mutex_LOCK_binlog_, - &LOCK_binlog_, MY_MUTEX_INIT_FAST); - mysql_cond_init(key_ss_cond_COND_binlog_send_, - &COND_binlog_send_, NULL); - - if (rpl_semi_sync_master_enabled) - result = enableMaster(); - else - result = disableMaster(); - - return result; -} - -int ReplSemiSyncMaster::enableMaster() -{ - int result = 0; - - /* Must have the lock when we do enable of disable. */ - lock(); - - if (!getMasterEnabled()) - { - active_tranxs_ = new ActiveTranx(&LOCK_binlog_, trace_level_); - if (active_tranxs_ != NULL) - { - commit_file_name_inited_ = false; - reply_file_name_inited_ = false; - wait_file_name_inited_ = false; - - set_master_enabled(true); - state_ = true; - sql_print_information("Semi-sync replication enabled on the master."); - } - else - { - sql_print_error("Cannot allocate memory to enable semi-sync on the master."); - result = -1; - } - } - - unlock(); - - return result; -} - -int ReplSemiSyncMaster::disableMaster() -{ - /* Must have the lock when we do enable of disable. */ - lock(); - - if (getMasterEnabled()) - { - /* Switch off the semi-sync first so that waiting transaction will be - * waken up. - */ - switch_off(); - - assert(active_tranxs_ != NULL); - delete active_tranxs_; - active_tranxs_ = NULL; - - reply_file_name_inited_ = false; - wait_file_name_inited_ = false; - commit_file_name_inited_ = false; - - set_master_enabled(false); - sql_print_information("Semi-sync replication disabled on the master."); - } - - unlock(); - - return 0; -} - -void ReplSemiSyncMaster::cleanup() -{ - if (init_done_) - { - mysql_mutex_destroy(&LOCK_binlog_); - mysql_cond_destroy(&COND_binlog_send_); - init_done_= 0; - } - - delete active_tranxs_; -} - -void ReplSemiSyncMaster::lock() -{ - mysql_mutex_lock(&LOCK_binlog_); -} - -void ReplSemiSyncMaster::unlock() -{ - mysql_mutex_unlock(&LOCK_binlog_); -} - -void ReplSemiSyncMaster::cond_broadcast() -{ - mysql_cond_broadcast(&COND_binlog_send_); -} - -int ReplSemiSyncMaster::cond_timewait(struct timespec *wait_time) -{ - const char *kWho = "ReplSemiSyncMaster::cond_timewait()"; - int wait_res; - - function_enter(kWho); - wait_res= mysql_cond_timedwait(&COND_binlog_send_, - &LOCK_binlog_, wait_time); - return function_exit(kWho, wait_res); -} - -void ReplSemiSyncMaster::add_slave() -{ - lock(); - rpl_semi_sync_master_clients++; - unlock(); -} - -void ReplSemiSyncMaster::remove_slave() -{ - lock(); - rpl_semi_sync_master_clients--; - - /* Only switch off if semi-sync is enabled and is on */ - if (getMasterEnabled() && is_on()) - { - /* If user has chosen not to wait if no semi-sync slave available - and the last semi-sync slave exits, turn off semi-sync on master - immediately. - */ - if (!rpl_semi_sync_master_wait_no_slave && - rpl_semi_sync_master_clients == 0) - switch_off(); - } - unlock(); -} - -bool ReplSemiSyncMaster::is_semi_sync_slave() -{ - int null_value; - long long val= 0; - get_user_var_int("rpl_semi_sync_slave", &val, &null_value); - return val; -} - -int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id, - const char *log_file_name, - my_off_t log_file_pos) -{ - const char *kWho = "ReplSemiSyncMaster::reportReplyBinlog"; - int cmp; - bool can_release_threads = false; - bool need_copy_send_pos = true; - - if (!(getMasterEnabled())) - return 0; - - function_enter(kWho); - - lock(); - - /* This is the real check inside the mutex. */ - if (!getMasterEnabled()) - goto l_end; - - if (!is_on()) - /* We check to see whether we can switch semi-sync ON. */ - try_switch_on(server_id, log_file_name, log_file_pos); - - /* The position should increase monotonically, if there is only one - * thread sending the binlog to the slave. - * In reality, to improve the transaction availability, we allow multiple - * sync replication slaves. So, if any one of them get the transaction, - * the transaction session in the primary can move forward. - */ - if (reply_file_name_inited_) - { - cmp = ActiveTranx::compare(log_file_name, log_file_pos, - reply_file_name_, reply_file_pos_); - - /* If the requested position is behind the sending binlog position, - * would not adjust sending binlog position. - * We based on the assumption that there are multiple semi-sync slave, - * and at least one of them shou/ld be up to date. - * If all semi-sync slaves are behind, at least initially, the primary - * can find the situation after the waiting timeout. After that, some - * slaves should catch up quickly. - */ - if (cmp < 0) - { - /* If the position is behind, do not copy it. */ - need_copy_send_pos = false; - } - } - - if (need_copy_send_pos) - { - strmake_buf(reply_file_name_, log_file_name); - reply_file_pos_ = log_file_pos; - reply_file_name_inited_ = true; - - /* Remove all active transaction nodes before this point. */ - assert(active_tranxs_ != NULL); - active_tranxs_->clear_active_tranx_nodes(log_file_name, log_file_pos); - - if (trace_level_ & kTraceDetail) - sql_print_information("%s: Got reply at (%s, %lu)", kWho, - log_file_name, (unsigned long)log_file_pos); - } - - if (rpl_semi_sync_master_wait_sessions > 0) - { - /* Let us check if some of the waiting threads doing a trx - * commit can now proceed. - */ - cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_, - wait_file_name_, wait_file_pos_); - if (cmp >= 0) - { - /* Yes, at least one waiting thread can now proceed: - * let us release all waiting threads with a broadcast - */ - can_release_threads = true; - wait_file_name_inited_ = false; - } - } - - l_end: - unlock(); - - if (can_release_threads) - { - if (trace_level_ & kTraceDetail) - sql_print_information("%s: signal all waiting threads.", kWho); - - cond_broadcast(); - } - - return function_exit(kWho, 0); -} - -int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name, - my_off_t trx_wait_binlog_pos) -{ - const char *kWho = "ReplSemiSyncMaster::commitTrx"; - - function_enter(kWho); - - if (getMasterEnabled() && trx_wait_binlog_name) - { - struct timespec start_ts; - struct timespec abstime; - int wait_result; - PSI_stage_info old_stage; - - set_timespec(start_ts, 0); - - DEBUG_SYNC(current_thd, "rpl_semisync_master_commit_trx_before_lock"); - /* Acquire the mutex. */ - lock(); - - /* This must be called after acquired the lock */ - THD_ENTER_COND(NULL, &COND_binlog_send_, &LOCK_binlog_, - & stage_waiting_for_semi_sync_ack_from_slave, - & old_stage); - - /* This is the real check inside the mutex. */ - if (!getMasterEnabled() || !is_on()) - goto l_end; - - if (trace_level_ & kTraceDetail) - { - sql_print_information("%s: wait pos (%s, %lu), repl(%d)\n", kWho, - trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos, - (int)is_on()); - } - - while (is_on() && !thd_killed(current_thd)) - { - if (reply_file_name_inited_) - { - int cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_, - trx_wait_binlog_name, trx_wait_binlog_pos); - if (cmp >= 0) - { - /* We have already sent the relevant binlog to the slave: no need to - * wait here. - */ - if (trace_level_ & kTraceDetail) - sql_print_information("%s: Binlog reply is ahead (%s, %lu),", - kWho, reply_file_name_, (unsigned long)reply_file_pos_); - break; - } - } - - /* Let us update the info about the minimum binlog position of waiting - * threads. - */ - if (wait_file_name_inited_) - { - int cmp = ActiveTranx::compare(trx_wait_binlog_name, trx_wait_binlog_pos, - wait_file_name_, wait_file_pos_); - if (cmp <= 0) - { - /* This thd has a lower position, let's update the minimum info. */ - strmake_buf(wait_file_name_, trx_wait_binlog_name); - wait_file_pos_ = trx_wait_binlog_pos; - - rpl_semi_sync_master_wait_pos_backtraverse++; - if (trace_level_ & kTraceDetail) - sql_print_information("%s: move back wait position (%s, %lu),", - kWho, wait_file_name_, (unsigned long)wait_file_pos_); - } - } - else - { - strmake_buf(wait_file_name_, trx_wait_binlog_name); - wait_file_pos_ = trx_wait_binlog_pos; - wait_file_name_inited_ = true; - - if (trace_level_ & kTraceDetail) - sql_print_information("%s: init wait position (%s, %lu),", - kWho, wait_file_name_, (unsigned long)wait_file_pos_); - } - - /* Calcuate the waiting period. */ - long diff_secs = (long) (wait_timeout_ / TIME_THOUSAND); - long diff_nsecs = (long) ((wait_timeout_ % TIME_THOUSAND) * TIME_MILLION); - long nsecs = start_ts.tv_nsec + diff_nsecs; - abstime.tv_sec = start_ts.tv_sec + diff_secs + nsecs/TIME_BILLION; - abstime.tv_nsec = nsecs % TIME_BILLION; - - /* In semi-synchronous replication, we wait until the binlog-dump - * thread has received the reply on the relevant binlog segment from the - * replication slave. - * - * Let us suspend this thread to wait on the condition; - * when replication has progressed far enough, we will release - * these waiting threads. - */ - rpl_semi_sync_master_wait_sessions++; - - if (trace_level_ & kTraceDetail) - sql_print_information("%s: wait %lu ms for binlog sent (%s, %lu)", - kWho, wait_timeout_, - wait_file_name_, (unsigned long)wait_file_pos_); - - wait_result = cond_timewait(&abstime); - rpl_semi_sync_master_wait_sessions--; - - if (wait_result != 0) - { - /* This is a real wait timeout. */ - sql_print_warning("Timeout waiting for reply of binlog (file: %s, pos: %lu), " - "semi-sync up to file %s, position %lu.", - trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos, - reply_file_name_, (unsigned long)reply_file_pos_); - rpl_semi_sync_master_wait_timeouts++; - - /* switch semi-sync off */ - switch_off(); - } - else - { - int wait_time; - - wait_time = getWaitTime(start_ts); - if (wait_time < 0) - { - if (trace_level_ & kTraceGeneral) - { - sql_print_error("Replication semi-sync getWaitTime fail at " - "wait position (%s, %lu)", - trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos); - } - rpl_semi_sync_master_timefunc_fails++; - } - else - { - rpl_semi_sync_master_trx_wait_num++; - rpl_semi_sync_master_trx_wait_time += wait_time; - } - } - } - - /* - At this point, the binlog file and position of this transaction - must have been removed from ActiveTranx. - active_tranxs_ may be NULL if someone disabled semi sync during - cond_timewait() - */ - assert(thd_killed(current_thd) || !active_tranxs_ || - !active_tranxs_->is_tranx_end_pos(trx_wait_binlog_name, - trx_wait_binlog_pos)); - - l_end: - /* Update the status counter. */ - if (is_on()) - rpl_semi_sync_master_yes_transactions++; - else - rpl_semi_sync_master_no_transactions++; - - /* The lock held will be released by thd_exit_cond, so no need to - call unlock() here */ - THD_EXIT_COND(NULL, & old_stage); - } - - return function_exit(kWho, 0); -} - -/* Indicate that semi-sync replication is OFF now. - * - * What should we do when it is disabled? The problem is that we want - * the semi-sync replication enabled again when the slave catches up - * later. But, it is not that easy to detect that the slave has caught - * up. This is caused by the fact that MySQL's replication protocol is - * asynchronous, meaning that if the master does not use the semi-sync - * protocol, the slave would not send anything to the master. - * Still, if the master is sending (N+1)-th event, we assume that it is - * an indicator that the slave has received N-th event and earlier ones. - * - * If semi-sync is disabled, all transactions still update the wait - * position with the last position in binlog. But no transactions will - * wait for confirmations and the active transaction list would not be - * maintained. In binlog dump thread, updateSyncHeader() checks whether - * the current sending event catches up with last wait position. If it - * does match, semi-sync will be switched on again. - */ -int ReplSemiSyncMaster::switch_off() -{ - const char *kWho = "ReplSemiSyncMaster::switch_off"; - int result; - - function_enter(kWho); - state_ = false; - - /* Clear the active transaction list. */ - assert(active_tranxs_ != NULL); - result = active_tranxs_->clear_active_tranx_nodes(NULL, 0); - - rpl_semi_sync_master_off_times++; - wait_file_name_inited_ = false; - reply_file_name_inited_ = false; - sql_print_information("Semi-sync replication switched OFF."); - cond_broadcast(); /* wake up all waiting threads */ - - return function_exit(kWho, result); -} - -int ReplSemiSyncMaster::try_switch_on(int server_id, - const char *log_file_name, - my_off_t log_file_pos) -{ - const char *kWho = "ReplSemiSyncMaster::try_switch_on"; - bool semi_sync_on = false; - - function_enter(kWho); - - /* If the current sending event's position is larger than or equal to the - * 'largest' commit transaction binlog position, the slave is already - * catching up now and we can switch semi-sync on here. - * If commit_file_name_inited_ indicates there are no recent transactions, - * we can enable semi-sync immediately. - */ - if (commit_file_name_inited_) - { - int cmp = ActiveTranx::compare(log_file_name, log_file_pos, - commit_file_name_, commit_file_pos_); - semi_sync_on = (cmp >= 0); - } - else - { - semi_sync_on = true; - } - - if (semi_sync_on) - { - /* Switch semi-sync replication on. */ - state_ = true; - - sql_print_information("Semi-sync replication switched ON with slave (server_id: %d) " - "at (%s, %lu)", - server_id, log_file_name, - (unsigned long)log_file_pos); - } - - return function_exit(kWho, 0); -} - -int ReplSemiSyncMaster::reserveSyncHeader(unsigned char *header, - unsigned long size) -{ - const char *kWho = "ReplSemiSyncMaster::reserveSyncHeader"; - function_enter(kWho); - - int hlen=0; - if (!is_semi_sync_slave()) - { - hlen= 0; - } - else - { - /* No enough space for the extra header, disable semi-sync master */ - if (sizeof(kSyncHeader) > size) - { - sql_print_warning("No enough space in the packet " - "for semi-sync extra header, " - "semi-sync replication disabled"); - disableMaster(); - return 0; - } - - /* Set the magic number and the sync status. By default, no sync - * is required. - */ - memcpy(header, kSyncHeader, sizeof(kSyncHeader)); - hlen= sizeof(kSyncHeader); - } - return function_exit(kWho, hlen); -} - -int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet, - const char *log_file_name, - my_off_t log_file_pos, - uint32 server_id) -{ - const char *kWho = "ReplSemiSyncMaster::updateSyncHeader"; - int cmp = 0; - bool sync = false; - - /* If the semi-sync master is not enabled, or the slave is not a semi-sync - * target, do not request replies from the slave. - */ - if (!getMasterEnabled() || !is_semi_sync_slave()) - return 0; - - function_enter(kWho); - - lock(); - - /* This is the real check inside the mutex. */ - if (!getMasterEnabled()) - goto l_end; // sync= false at this point in time - - if (is_on()) - { - /* semi-sync is ON */ - /* sync= false; No sync unless a transaction is involved. */ - - if (reply_file_name_inited_) - { - cmp = ActiveTranx::compare(log_file_name, log_file_pos, - reply_file_name_, reply_file_pos_); - if (cmp <= 0) - { - /* If we have already got the reply for the event, then we do - * not need to sync the transaction again. - */ - goto l_end; - } - } - - if (wait_file_name_inited_) - { - cmp = ActiveTranx::compare(log_file_name, log_file_pos, - wait_file_name_, wait_file_pos_); - } - else - { - cmp = 1; - } - - /* If we are already waiting for some transaction replies which - * are later in binlog, do not wait for this one event. - */ - if (cmp >= 0) - { - /* - * We only wait if the event is a transaction's ending event. - */ - assert(active_tranxs_ != NULL); - sync = active_tranxs_->is_tranx_end_pos(log_file_name, - log_file_pos); - } - } - else - { - if (commit_file_name_inited_) - { - int cmp = ActiveTranx::compare(log_file_name, log_file_pos, - commit_file_name_, commit_file_pos_); - sync = (cmp >= 0); - } - else - { - sync = true; - } - } - - if (trace_level_ & kTraceDetail) - sql_print_information("%s: server(%d), (%s, %lu) sync(%d), repl(%d)", - kWho, server_id, log_file_name, - (unsigned long)log_file_pos, sync, (int)is_on()); - - l_end: - unlock(); - - /* We do not need to clear sync flag because we set it to 0 when we - * reserve the packet header. - */ - if (sync) - { - (packet)[2] = kPacketFlagSync; - } - - return function_exit(kWho, 0); -} - -int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name, - my_off_t log_file_pos) -{ - const char *kWho = "ReplSemiSyncMaster::writeTranxInBinlog"; - int result = 0; - - function_enter(kWho); - - lock(); - - /* This is the real check inside the mutex. */ - if (!getMasterEnabled()) - goto l_end; - - /* Update the 'largest' transaction commit position seen so far even - * though semi-sync is switched off. - * It is much better that we update commit_file_* here, instead of - * inside commitTrx(). This is mostly because updateSyncHeader() - * will watch for commit_file_* to decide whether to switch semi-sync - * on. The detailed reason is explained in function updateSyncHeader(). - */ - if (commit_file_name_inited_) - { - int cmp = ActiveTranx::compare(log_file_name, log_file_pos, - commit_file_name_, commit_file_pos_); - if (cmp > 0) - { - /* This is a larger position, let's update the maximum info. */ - strncpy(commit_file_name_, log_file_name, FN_REFLEN-1); - commit_file_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */ - commit_file_pos_ = log_file_pos; - } - } - else - { - strncpy(commit_file_name_, log_file_name, FN_REFLEN-1); - commit_file_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */ - commit_file_pos_ = log_file_pos; - commit_file_name_inited_ = true; - } - - if (is_on()) - { - assert(active_tranxs_ != NULL); - if(active_tranxs_->insert_tranx_node(log_file_name, log_file_pos)) - { - /* - if insert tranx_node failed, print a warning message - and turn off semi-sync - */ - sql_print_warning("Semi-sync failed to insert tranx_node for binlog file: %s, position: %lu", - log_file_name, (ulong)log_file_pos); - switch_off(); - } - } - - l_end: - unlock(); - - return function_exit(kWho, result); -} - -int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id, - const char *event_buf) -{ - const char *kWho = "ReplSemiSyncMaster::readSlaveReply"; - const unsigned char *packet; - char log_file_name[FN_REFLEN]; - my_off_t log_file_pos; - ulong log_file_len = 0; - ulong packet_len; - int result = -1; - struct timespec start_ts; - ulong trc_level = trace_level_; - LINT_INIT_STRUCT(start_ts); - - function_enter(kWho); - - assert((unsigned char)event_buf[1] == kPacketMagicNum); - if ((unsigned char)event_buf[2] != kPacketFlagSync) - { - /* current event does not require reply */ - result = 0; - goto l_end; - } - - if (trc_level & kTraceNetWait) - set_timespec(start_ts, 0); - - /* We flush to make sure that the current event is sent to the network, - * instead of being buffered in the TCP/IP stack. - */ - if (net_flush(net)) - { - sql_print_error("Semi-sync master failed on net_flush() " - "before waiting for slave reply"); - goto l_end; - } - - net_clear(net, 0); - if (trc_level & kTraceDetail) - sql_print_information("%s: Wait for replica's reply", kWho); - - /* Wait for the network here. Though binlog dump thread can indefinitely wait - * here, transactions would not wait indefintely. - * Transactions wait on binlog replies detected by binlog dump threads. If - * binlog dump threads wait too long, transactions will timeout and continue. - */ - packet_len = my_net_read(net); - - if (trc_level & kTraceNetWait) - { - int wait_time = getWaitTime(start_ts); - if (wait_time < 0) - { - sql_print_error("Semi-sync master wait for reply " - "fail to get wait time."); - rpl_semi_sync_master_timefunc_fails++; - } - else - { - rpl_semi_sync_master_net_wait_num++; - rpl_semi_sync_master_net_wait_time += wait_time; - } - } - - if (packet_len == packet_error || packet_len < REPLY_BINLOG_NAME_OFFSET) - { - if (packet_len == packet_error) - sql_print_error("Read semi-sync reply network error: %s (errno: %d)", - net->last_error, net->last_errno); - else - sql_print_error("Read semi-sync reply length error: %s (errno: %d)", - net->last_error, net->last_errno); - goto l_end; - } - - packet = net->read_pos; - if (packet[REPLY_MAGIC_NUM_OFFSET] != ReplSemiSyncMaster::kPacketMagicNum) - { - sql_print_error("Read semi-sync reply magic number error"); - goto l_end; - } - - log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET); - log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET; - if (log_file_len >= FN_REFLEN) - { - sql_print_error("Read semi-sync reply binlog file length too large"); - goto l_end; - } - strncpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len); - log_file_name[log_file_len] = 0; - - if (trc_level & kTraceDetail) - sql_print_information("%s: Got reply (%s, %lu)", - kWho, log_file_name, (ulong)log_file_pos); - - result = reportReplyBinlog(server_id, log_file_name, log_file_pos); - - l_end: - return function_exit(kWho, result); -} - - -int ReplSemiSyncMaster::resetMaster() -{ - const char *kWho = "ReplSemiSyncMaster::resetMaster"; - int result = 0; - - function_enter(kWho); - - - lock(); - - state_ = getMasterEnabled()? 1 : 0; - - wait_file_name_inited_ = false; - reply_file_name_inited_ = false; - commit_file_name_inited_ = false; - - rpl_semi_sync_master_yes_transactions = 0; - rpl_semi_sync_master_no_transactions = 0; - rpl_semi_sync_master_off_times = 0; - rpl_semi_sync_master_timefunc_fails = 0; - rpl_semi_sync_master_wait_sessions = 0; - rpl_semi_sync_master_wait_pos_backtraverse = 0; - rpl_semi_sync_master_trx_wait_num = 0; - rpl_semi_sync_master_trx_wait_time = 0; - rpl_semi_sync_master_net_wait_num = 0; - rpl_semi_sync_master_net_wait_time = 0; - - unlock(); - - return function_exit(kWho, result); -} - -void ReplSemiSyncMaster::setExportStats() -{ - lock(); - - rpl_semi_sync_master_status = state_; - rpl_semi_sync_master_avg_trx_wait_time= - ((rpl_semi_sync_master_trx_wait_num) ? - (unsigned long)((double)rpl_semi_sync_master_trx_wait_time / - ((double)rpl_semi_sync_master_trx_wait_num)) : 0); - rpl_semi_sync_master_avg_net_wait_time= - ((rpl_semi_sync_master_net_wait_num) ? - (unsigned long)((double)rpl_semi_sync_master_net_wait_time / - ((double)rpl_semi_sync_master_net_wait_num)) : 0); - - unlock(); -} - -/* Get the waiting time given the wait's staring time. - * - * Return: - * >= 0: the waiting time in microsecons(us) - * < 0: error in get time or time back traverse - */ -static int getWaitTime(const struct timespec& start_ts) -{ - unsigned long long start_usecs, end_usecs; - struct timespec end_ts; - - /* Starting time in microseconds(us). */ - start_usecs = timespec_to_usec(&start_ts); - - /* Get the wait time interval. */ - set_timespec(end_ts, 0); - - /* Ending time in microseconds(us). */ - end_usecs = timespec_to_usec(&end_ts); - - if (end_usecs < start_usecs) - return -1; - - return (int)(end_usecs - start_usecs); -} diff --git a/plugin/semisync/semisync_master.h b/plugin/semisync/semisync_master.h deleted file mode 100644 index c2862476ec8..00000000000 --- a/plugin/semisync/semisync_master.h +++ /dev/null @@ -1,632 +0,0 @@ -/* Copyright (C) 2007 Google Inc. - Copyright (c) 2008 MySQL AB, 2009 Sun Microsystems, Inc. - Use is subject to license terms. - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; version 2 of the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ - - -#ifndef SEMISYNC_MASTER_H -#define SEMISYNC_MASTER_H - -#include "semisync.h" - -#ifdef HAVE_PSI_INTERFACE -extern PSI_mutex_key key_ss_mutex_LOCK_binlog_; -extern PSI_cond_key key_ss_cond_COND_binlog_send_; -#endif - -extern PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave; - -struct TranxNode { - char log_name_[FN_REFLEN]; - my_off_t log_pos_; - struct TranxNode *next_; /* the next node in the sorted list */ - struct TranxNode *hash_next_; /* the next node during hash collision */ -}; - -/** - @class TranxNodeAllocator - - This class provides memory allocating and freeing methods for - TranxNode. The main target is performance. - - @section ALLOCATE How to allocate a node - The pointer of the first node after 'last_node' in current_block is - returned. current_block will move to the next free Block when all nodes of - it are in use. A new Block is allocated and is put into the rear of the - Block link table if no Block is free. - - The list starts up empty (ie, there is no allocated Block). - - After some nodes are freed, there probably are some free nodes before - the sequence of the allocated nodes, but we do not reuse it. It is better - to keep the allocated nodes are in the sequence, for it is more efficient - for allocating and freeing TranxNode. - - @section FREENODE How to free nodes - There are two methods for freeing nodes. They are free_all_nodes and - free_nodes_before. - - 'A Block is free' means all of its nodes are free. - @subsection free_nodes_before - As all allocated nodes are in the sequence, 'Before one node' means all - nodes before given node in the same Block and all Blocks before the Block - which containing the given node. As such, all Blocks before the given one - ('node') are free Block and moved into the rear of the Block link table. - The Block containing the given 'node', however, is not. For at least the - given 'node' is still in use. This will waste at most one Block, but it is - more efficient. - */ -#define BLOCK_TRANX_NODES 16 -class TranxNodeAllocator -{ -public: - /** - @param reserved_nodes - The number of reserved TranxNodes. It is used to set 'reserved_blocks' - which can contain at least 'reserved_nodes' number of TranxNodes. When - freeing memory, we will reserve at least reserved_blocks of Blocks not - freed. - */ - TranxNodeAllocator(uint reserved_nodes) : - reserved_blocks(reserved_nodes/BLOCK_TRANX_NODES + - (reserved_nodes%BLOCK_TRANX_NODES > 1 ? 2 : 1)), - first_block(NULL), last_block(NULL), - current_block(NULL), last_node(-1), block_num(0) {} - - ~TranxNodeAllocator() - { - Block *block= first_block; - while (block != NULL) - { - Block *next= block->next; - free_block(block); - block= next; - } - } - - /** - The pointer of the first node after 'last_node' in current_block is - returned. current_block will move to the next free Block when all nodes of - it are in use. A new Block is allocated and is put into the rear of the - Block link table if no Block is free. - - @return Return a TranxNode *, or NULL if an error occurred. - */ - TranxNode *allocate_node() - { - TranxNode *trx_node; - Block *block= current_block; - - if (last_node == BLOCK_TRANX_NODES-1) - { - current_block= current_block->next; - last_node= -1; - } - - if (current_block == NULL && allocate_block()) - { - current_block= block; - if (current_block) - last_node= BLOCK_TRANX_NODES-1; - return NULL; - } - - trx_node= &(current_block->nodes[++last_node]); - trx_node->log_name_[0] = '\0'; - trx_node->log_pos_= 0; - trx_node->next_= 0; - trx_node->hash_next_= 0; - return trx_node; - } - - /** - All nodes are freed. - - @return Return 0, or 1 if an error occurred. - */ - int free_all_nodes() - { - current_block= first_block; - last_node= -1; - free_blocks(); - return 0; - } - - /** - All Blocks before the given 'node' are free Block and moved into the rear - of the Block link table. - - @param node All nodes before 'node' will be freed - - @return Return 0, or 1 if an error occurred. - */ - int free_nodes_before(TranxNode* node) - { - Block *block; - Block *prev_block= NULL; - - block= first_block; - while (block != current_block->next) - { - /* Find the Block containing the given node */ - if (&(block->nodes[0]) <= node && &(block->nodes[BLOCK_TRANX_NODES]) >= node) - { - /* All Blocks before the given node are put into the rear */ - if (first_block != block) - { - last_block->next= first_block; - first_block= block; - last_block= prev_block; - last_block->next= NULL; - free_blocks(); - } - return 0; - } - prev_block= block; - block= block->next; - } - - /* Node does not find should never happen */ - DBUG_ASSERT(0); - return 1; - } - -private: - uint reserved_blocks; - - /** - A sequence memory which contains BLOCK_TRANX_NODES TranxNodes. - - BLOCK_TRANX_NODES The number of TranxNodes which are in a Block. - - next Every Block has a 'next' pointer which points to the next Block. - These linking Blocks constitute a Block link table. - */ - struct Block { - Block *next; - TranxNode nodes[BLOCK_TRANX_NODES]; - }; - - /** - The 'first_block' is the head of the Block link table; - */ - Block *first_block; - /** - The 'last_block' is the rear of the Block link table; - */ - Block *last_block; - - /** - current_block always points the Block in the Block link table in - which the last allocated node is. The Blocks before it are all in use - and the Blocks after it are all free. - */ - Block *current_block; - - /** - It always points to the last node which has been allocated in the - current_block. - */ - int last_node; - - /** - How many Blocks are in the Block link table. - */ - uint block_num; - - /** - Allocate a block and then assign it to current_block. - */ - int allocate_block() - { - Block *block= (Block *)my_malloc(sizeof(Block), MYF(0)); - if (block) - { - block->next= NULL; - - if (first_block == NULL) - first_block= block; - else - last_block->next= block; - - /* New Block is always put into the rear */ - last_block= block; - /* New Block is always the current_block */ - current_block= block; - ++block_num; - return 0; - } - return 1; - } - - /** - Free a given Block. - @param block The Block will be freed. - */ - void free_block(Block *block) - { - my_free(block); - --block_num; - } - - - /** - If there are some free Blocks and the total number of the Blocks in the - Block link table is larger than the 'reserved_blocks', Some free Blocks - will be freed until the total number of the Blocks is equal to the - 'reserved_blocks' or there is only one free Block behind the - 'current_block'. - */ - void free_blocks() - { - if (current_block == NULL || current_block->next == NULL) - return; - - /* One free Block is always kept behind the current block */ - Block *block= current_block->next->next; - while (block_num > reserved_blocks && block != NULL) - { - Block *next= block->next; - free_block(block); - block= next; - } - current_block->next->next= block; - if (block == NULL) - last_block= current_block->next; - } -}; - -/** - This class manages memory for active transaction list. - - We record each active transaction with a TranxNode, each session - can have only one open transaction. Because of EVENT, the total - active transaction nodes can exceed the maximum allowed - connections. -*/ -class ActiveTranx - :public Trace { -private: - - TranxNodeAllocator allocator_; - /* These two record the active transaction list in sort order. */ - TranxNode *trx_front_, *trx_rear_; - - TranxNode **trx_htb_; /* A hash table on active transactions. */ - - int num_entries_; /* maximum hash table entries */ - mysql_mutex_t *lock_; /* mutex lock */ - - inline void assert_lock_owner(); - - inline unsigned int calc_hash(const unsigned char *key,unsigned int length); - unsigned int get_hash_value(const char *log_file_name, my_off_t log_file_pos); - - int compare(const char *log_file_name1, my_off_t log_file_pos1, - const TranxNode *node2) { - return compare(log_file_name1, log_file_pos1, - node2->log_name_, node2->log_pos_); - } - int compare(const TranxNode *node1, - const char *log_file_name2, my_off_t log_file_pos2) { - return compare(node1->log_name_, node1->log_pos_, - log_file_name2, log_file_pos2); - } - int compare(const TranxNode *node1, const TranxNode *node2) { - return compare(node1->log_name_, node1->log_pos_, - node2->log_name_, node2->log_pos_); - } - -public: - ActiveTranx(mysql_mutex_t *lock, unsigned long trace_level); - ~ActiveTranx(); - - /* Insert an active transaction node with the specified position. - * - * Return: - * 0: success; non-zero: error - */ - int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos); - - /* Clear the active transaction nodes until(inclusive) the specified - * position. - * If log_file_name is NULL, everything will be cleared: the sorted - * list and the hash table will be reset to empty. - * - * Return: - * 0: success; non-zero: error - */ - int clear_active_tranx_nodes(const char *log_file_name, - my_off_t log_file_pos); - - /* Given a position, check to see whether the position is an active - * transaction's ending position by probing the hash table. - */ - bool is_tranx_end_pos(const char *log_file_name, my_off_t log_file_pos); - - /* Given two binlog positions, compare which one is bigger based on - * (file_name, file_position). - */ - static int compare(const char *log_file_name1, my_off_t log_file_pos1, - const char *log_file_name2, my_off_t log_file_pos2); - -}; - -/** - The extension class for the master of semi-synchronous replication -*/ -class ReplSemiSyncMaster - :public ReplSemiSyncBase { - private: - ActiveTranx *active_tranxs_; /* active transaction list: the list will - be cleared when semi-sync switches off. */ - - /* True when initObject has been called */ - bool init_done_; - - /* This cond variable is signaled when enough binlog has been sent to slave, - * so that a waiting trx can return the 'ok' to the client for a commit. - */ - mysql_cond_t COND_binlog_send_; - - /* Mutex that protects the following state variables and the active - * transaction list. - * Under no cirumstances we can acquire mysql_bin_log.LOCK_log if we are - * already holding LOCK_binlog_ because it can cause deadlocks. - */ - mysql_mutex_t LOCK_binlog_; - - /* This is set to true when reply_file_name_ contains meaningful data. */ - bool reply_file_name_inited_; - - /* The binlog name up to which we have received replies from any slaves. */ - char reply_file_name_[FN_REFLEN]; - - /* The position in that file up to which we have the reply from any slaves. */ - my_off_t reply_file_pos_; - - /* This is set to true when we know the 'smallest' wait position. */ - bool wait_file_name_inited_; - - /* NULL, or the 'smallest' filename that a transaction is waiting for - * slave replies. - */ - char wait_file_name_[FN_REFLEN]; - - /* The smallest position in that file that a trx is waiting for: the trx - * can proceed and send an 'ok' to the client when the master has got the - * reply from the slave indicating that it already got the binlog events. - */ - my_off_t wait_file_pos_; - - /* This is set to true when we know the 'largest' transaction commit - * position in the binlog file. - * We always maintain the position no matter whether semi-sync is switched - * on switched off. When a transaction wait timeout occurs, semi-sync will - * switch off. Binlog-dump thread can use the three fields to detect when - * slaves catch up on replication so that semi-sync can switch on again. - */ - bool commit_file_name_inited_; - - /* The 'largest' binlog filename that a commit transaction is seeing. */ - char commit_file_name_[FN_REFLEN]; - - /* The 'largest' position in that file that a commit transaction is seeing. */ - my_off_t commit_file_pos_; - - /* All global variables which can be set by parameters. */ - volatile bool master_enabled_; /* semi-sync is enabled on the master */ - unsigned long wait_timeout_; /* timeout period(ms) during tranx wait */ - - bool state_; /* whether semi-sync is switched */ - - void lock(); - void unlock(); - void cond_broadcast(); - int cond_timewait(struct timespec *wait_time); - - /* Is semi-sync replication on? */ - bool is_on() { - return (state_); - } - - void set_master_enabled(bool enabled) { - master_enabled_ = enabled; - } - - /* Switch semi-sync off because of timeout in transaction waiting. */ - int switch_off(); - - /* Switch semi-sync on when slaves catch up. */ - int try_switch_on(int server_id, - const char *log_file_name, my_off_t log_file_pos); - - public: - ReplSemiSyncMaster(); - ~ReplSemiSyncMaster() {} - - void cleanup(); - - bool getMasterEnabled() { - return master_enabled_; - } - void setTraceLevel(unsigned long trace_level) { - trace_level_ = trace_level; - if (active_tranxs_) - active_tranxs_->trace_level_ = trace_level; - } - - /* Set the transaction wait timeout period, in milliseconds. */ - void setWaitTimeout(unsigned long wait_timeout) { - wait_timeout_ = wait_timeout; - } - - /* Initialize this class after MySQL parameters are initialized. this - * function should be called once at bootstrap time. - */ - int initObject(); - - /* Enable the object to enable semi-sync replication inside the master. */ - int enableMaster(); - - /* Enable the object to enable semi-sync replication inside the master. */ - int disableMaster(); - - /* Add a semi-sync replication slave */ - void add_slave(); - - /* Remove a semi-sync replication slave */ - void remove_slave(); - - /* Is the slave servered by the thread requested semi-sync */ - bool is_semi_sync_slave(); - - /* In semi-sync replication, reports up to which binlog position we have - * received replies from the slave indicating that it already get the events. - * - * Input: - * server_id - (IN) master server id number - * log_file_name - (IN) binlog file name - * end_offset - (IN) the offset in the binlog file up to which we have - * the replies from the slave - * - * Return: - * 0: success; non-zero: error - */ - int reportReplyBinlog(uint32 server_id, - const char* log_file_name, - my_off_t end_offset); - - /* Commit a transaction in the final step. This function is called from - * InnoDB before returning from the low commit. If semi-sync is switch on, - * the function will wait to see whether binlog-dump thread get the reply for - * the events of the transaction. Remember that this is not a direct wait, - * instead, it waits to see whether the binlog-dump thread has reached the - * point. If the wait times out, semi-sync status will be switched off and - * all other transaction would not wait either. - * - * Input: (the transaction events' ending binlog position) - * trx_wait_binlog_name - (IN) ending position's file name - * trx_wait_binlog_pos - (IN) ending position's file offset - * - * Return: - * 0: success; non-zero: error - */ - int commitTrx(const char* trx_wait_binlog_name, - my_off_t trx_wait_binlog_pos); - - /* Reserve space in the replication event packet header: - * . slave semi-sync off: 1 byte - (0) - * . slave semi-sync on: 3 byte - (0, 0xef, 0/1} - * - * Input: - * header - (IN) the header buffer - * size - (IN) size of the header buffer - * - * Return: - * size of the bytes reserved for header - */ - int reserveSyncHeader(unsigned char *header, unsigned long size); - - /* Update the sync bit in the packet header to indicate to the slave whether - * the master will wait for the reply of the event. If semi-sync is switched - * off and we detect that the slave is catching up, we switch semi-sync on. - * - * Input: - * packet - (IN) the packet containing the replication event - * log_file_name - (IN) the event ending position's file name - * log_file_pos - (IN) the event ending position's file offset - * server_id - (IN) master server id number - * - * Return: - * 0: success; non-zero: error - */ - int updateSyncHeader(unsigned char *packet, - const char *log_file_name, - my_off_t log_file_pos, - uint32 server_id); - - /* Called when a transaction finished writing binlog events. - * . update the 'largest' transactions' binlog event position - * . insert the ending position in the active transaction list if - * semi-sync is on - * - * Input: (the transaction events' ending binlog position) - * log_file_name - (IN) transaction ending position's file name - * log_file_pos - (IN) transaction ending position's file offset - * - * Return: - * 0: success; non-zero: error - */ - int writeTranxInBinlog(const char* log_file_name, my_off_t log_file_pos); - - /* Read the slave's reply so that we know how much progress the slave makes - * on receive replication events. - * - * Input: - * net - (IN) the connection to master - * server_id - (IN) master server id number - * event_buf - (IN) pointer to the event packet - * - * Return: - * 0: success; non-zero: error - */ - int readSlaveReply(NET *net, uint32 server_id, const char *event_buf); - - /* Export internal statistics for semi-sync replication. */ - void setExportStats(); - - /* 'reset master' command is issued from the user and semi-sync need to - * go off for that. - */ - int resetMaster(); -}; - -enum rpl_semi_sync_master_wait_point_t { - SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC, - SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT, -}; - -/* System and status variables for the master component */ -extern char rpl_semi_sync_master_enabled; -extern char rpl_semi_sync_master_status; -extern unsigned long rpl_semi_sync_master_wait_point; -extern unsigned long rpl_semi_sync_master_clients; -extern unsigned long rpl_semi_sync_master_timeout; -extern unsigned long rpl_semi_sync_master_trace_level; -extern unsigned long rpl_semi_sync_master_yes_transactions; -extern unsigned long rpl_semi_sync_master_no_transactions; -extern unsigned long rpl_semi_sync_master_off_times; -extern unsigned long rpl_semi_sync_master_wait_timeouts; -extern unsigned long rpl_semi_sync_master_timefunc_fails; -extern unsigned long rpl_semi_sync_master_num_timeouts; -extern unsigned long rpl_semi_sync_master_wait_sessions; -extern unsigned long rpl_semi_sync_master_wait_pos_backtraverse; -extern unsigned long rpl_semi_sync_master_avg_trx_wait_time; -extern unsigned long rpl_semi_sync_master_avg_net_wait_time; -extern unsigned long long rpl_semi_sync_master_net_wait_num; -extern unsigned long long rpl_semi_sync_master_trx_wait_num; -extern unsigned long long rpl_semi_sync_master_net_wait_time; -extern unsigned long long rpl_semi_sync_master_trx_wait_time; - -/* - This indicates whether we should keep waiting if no semi-sync slave - is available. - 0 : stop waiting if detected no avaialable semi-sync slave. - 1 (default) : keep waiting until timeout even no available semi-sync slave. -*/ -extern char rpl_semi_sync_master_wait_no_slave; - -#endif /* SEMISYNC_MASTER_H */ diff --git a/plugin/semisync/semisync_master_plugin.cc b/plugin/semisync/semisync_master_plugin.cc deleted file mode 100644 index b46cf5d79cb..00000000000 --- a/plugin/semisync/semisync_master_plugin.cc +++ /dev/null @@ -1,496 +0,0 @@ -/* Copyright (C) 2007 Google Inc. - Copyright (c) 2008 MySQL AB, 2008-2009 Sun Microsystems, Inc. - Use is subject to license terms. - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; version 2 of the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ - - -#include <my_global.h> -#include "semisync_master.h" -#include "sql_class.h" // THD - -static ReplSemiSyncMaster repl_semisync; - -C_MODE_START - -int repl_semi_report_binlog_update(Binlog_storage_param *param, - const char *log_file, - my_off_t log_pos, uint32 flags) -{ - int error= 0; - - if (repl_semisync.getMasterEnabled()) - { - /* - Let us store the binlog file name and the position, so that - we know how long to wait for the binlog to the replicated to - the slave in synchronous replication. - */ - error= repl_semisync.writeTranxInBinlog(log_file, - log_pos); - } - - return error; -} - -int repl_semi_request_commit(Trans_param *param) -{ - return 0; -} - -int repl_semi_report_binlog_sync(Binlog_storage_param *param, - const char *log_file, - my_off_t log_pos, uint32 flags) -{ - int error= 0; - if (rpl_semi_sync_master_wait_point == - SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC) - { - error = repl_semisync.commitTrx(log_file, log_pos); - } - - return error; -} - -int repl_semi_report_commit(Trans_param *param) -{ - if (rpl_semi_sync_master_wait_point != - SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT) - { - return 0; - } - - bool is_real_trans= param->flags & TRANS_IS_REAL_TRANS; - - if (is_real_trans && param->log_pos) - { - const char *binlog_name= param->log_file; - return repl_semisync.commitTrx(binlog_name, param->log_pos); - } - return 0; -} - -int repl_semi_report_rollback(Trans_param *param) -{ - return repl_semi_report_commit(param); -} - -int repl_semi_binlog_dump_start(Binlog_transmit_param *param, - const char *log_file, - my_off_t log_pos) -{ - bool semi_sync_slave= repl_semisync.is_semi_sync_slave(); - - if (semi_sync_slave) - { - /* One more semi-sync slave */ - repl_semisync.add_slave(); - - /* - Let's assume this semi-sync slave has already received all - binlog events before the filename and position it requests. - */ - repl_semisync.reportReplyBinlog(param->server_id, log_file, log_pos); - } - sql_print_information("Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)", - semi_sync_slave ? "semi-sync" : "asynchronous", - param->server_id, log_file, (unsigned long)log_pos); - - return 0; -} - -int repl_semi_binlog_dump_end(Binlog_transmit_param *param) -{ - bool semi_sync_slave= repl_semisync.is_semi_sync_slave(); - - sql_print_information("Stop %s binlog_dump to slave (server_id: %d)", - semi_sync_slave ? "semi-sync" : "asynchronous", - param->server_id); - if (semi_sync_slave) - { - /* One less semi-sync slave */ - repl_semisync.remove_slave(); - } - return 0; -} - -int repl_semi_reserve_header(Binlog_transmit_param *param, - unsigned char *header, - unsigned long size, unsigned long *len) -{ - *len += repl_semisync.reserveSyncHeader(header, size); - return 0; -} - -int repl_semi_before_send_event(Binlog_transmit_param *param, - unsigned char *packet, unsigned long len, - const char *log_file, my_off_t log_pos) -{ - return repl_semisync.updateSyncHeader(packet, - log_file, - log_pos, - param->server_id); -} - -int repl_semi_after_send_event(Binlog_transmit_param *param, - const char *event_buf, unsigned long len) -{ - if (repl_semisync.is_semi_sync_slave()) - { - THD *thd= current_thd; - /* - Possible errors in reading slave reply are ignored deliberately - because we do not want dump thread to quit on this. Error - messages are already reported. - */ - (void) repl_semisync.readSlaveReply(&thd->net, - param->server_id, event_buf); - thd->clear_error(); - } - return 0; -} - -int repl_semi_reset_master(Binlog_transmit_param *param) -{ - if (repl_semisync.resetMaster()) - return 1; - return 0; -} - -C_MODE_END - -/* - semisync system variables - */ -static void fix_rpl_semi_sync_master_timeout(MYSQL_THD thd, - SYS_VAR *var, - void *ptr, - const void *val); - -static void fix_rpl_semi_sync_master_trace_level(MYSQL_THD thd, - SYS_VAR *var, - void *ptr, - const void *val); - -static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd, - SYS_VAR *var, - void *ptr, - const void *val); - -static MYSQL_SYSVAR_BOOL(enabled, rpl_semi_sync_master_enabled, - PLUGIN_VAR_OPCMDARG, - "Enable semi-synchronous replication master (disabled by default). ", - NULL, // check - &fix_rpl_semi_sync_master_enabled, // update - 0); - -/* NOTE: must match order of rpl_semi_sync_master_wait_point_t */ -static const char *rpl_semi_sync_master_wait_point_names[] = -{ - "AFTER_SYNC", - "AFTER_COMMIT", - NullS -}; - -static TYPELIB rpl_semi_sync_master_wait_point_typelib = -{ - array_elements(rpl_semi_sync_master_wait_point_names) - 1, - "", - rpl_semi_sync_master_wait_point_names, - NULL -}; - -static MYSQL_SYSVAR_ENUM( - wait_point, - rpl_semi_sync_master_wait_point, - PLUGIN_VAR_RQCMDARG, - "Should transaction wait for semi-sync ack after having synced binlog, " - "or after having committed in storeage engine.", - NULL, // check - NULL, // update - SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT, - &rpl_semi_sync_master_wait_point_typelib); - -static MYSQL_SYSVAR_ULONG(timeout, rpl_semi_sync_master_timeout, - PLUGIN_VAR_OPCMDARG, - "The timeout value (in ms) for semi-synchronous replication in the master", - NULL, // check - fix_rpl_semi_sync_master_timeout, // update - 10000, 0, ~0UL, 1); - -static MYSQL_SYSVAR_BOOL(wait_no_slave, rpl_semi_sync_master_wait_no_slave, - PLUGIN_VAR_OPCMDARG, - "Wait until timeout when no semi-synchronous replication slave available (enabled by default). ", - NULL, // check - NULL, // update - 1); - -static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_master_trace_level, - PLUGIN_VAR_OPCMDARG, - "The tracing level for semi-sync replication.", - NULL, // check - &fix_rpl_semi_sync_master_trace_level, // update - 32, 0, ~0UL, 1); - -static SYS_VAR* semi_sync_master_system_vars[]= { - MYSQL_SYSVAR(enabled), - MYSQL_SYSVAR(wait_point), - MYSQL_SYSVAR(timeout), - MYSQL_SYSVAR(wait_no_slave), - MYSQL_SYSVAR(trace_level), - NULL, -}; - - -static void fix_rpl_semi_sync_master_timeout(MYSQL_THD thd, - SYS_VAR *var, - void *ptr, - const void *val) -{ - *(unsigned long *)ptr= *(unsigned long *)val; - repl_semisync.setWaitTimeout(rpl_semi_sync_master_timeout); - return; -} - -static void fix_rpl_semi_sync_master_trace_level(MYSQL_THD thd, - SYS_VAR *var, - void *ptr, - const void *val) -{ - *(unsigned long *)ptr= *(unsigned long *)val; - repl_semisync.setTraceLevel(rpl_semi_sync_master_trace_level); - return; -} - -static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd, - SYS_VAR *var, - void *ptr, - const void *val) -{ - *(char *)ptr= *(char *)val; - if (rpl_semi_sync_master_enabled) - { - if (repl_semisync.enableMaster() != 0) - rpl_semi_sync_master_enabled = false; - } - else - { - if (repl_semisync.disableMaster() != 0) - rpl_semi_sync_master_enabled = true; - } - - return; -} - -Trans_observer trans_observer = { - sizeof(Trans_observer), // len - - repl_semi_report_commit, // after_commit - repl_semi_report_rollback, // after_rollback -}; - -Binlog_storage_observer storage_observer = { - sizeof(Binlog_storage_observer), // len - - repl_semi_report_binlog_update, // report_update - repl_semi_report_binlog_sync, // after_sync -}; - -Binlog_transmit_observer transmit_observer = { - sizeof(Binlog_transmit_observer), // len - - repl_semi_binlog_dump_start, // start - repl_semi_binlog_dump_end, // stop - repl_semi_reserve_header, // reserve_header - repl_semi_before_send_event, // before_send_event - repl_semi_after_send_event, // after_send_event - repl_semi_reset_master, // reset -}; - - -#define SHOW_FNAME(name) \ - rpl_semi_sync_master_show_##name - -#define DEF_SHOW_FUNC(name, show_type) \ - static int SHOW_FNAME(name)(MYSQL_THD thd, SHOW_VAR *var, char *buff) \ - { \ - repl_semisync.setExportStats(); \ - var->type= show_type; \ - var->value= (char *)&rpl_semi_sync_master_##name; \ - return 0; \ - } - -DEF_SHOW_FUNC(status, SHOW_BOOL) -DEF_SHOW_FUNC(clients, SHOW_LONG) -DEF_SHOW_FUNC(wait_sessions, SHOW_LONG) -DEF_SHOW_FUNC(trx_wait_time, SHOW_LONGLONG) -DEF_SHOW_FUNC(trx_wait_num, SHOW_LONGLONG) -DEF_SHOW_FUNC(net_wait_time, SHOW_LONGLONG) -DEF_SHOW_FUNC(net_wait_num, SHOW_LONGLONG) -DEF_SHOW_FUNC(avg_net_wait_time, SHOW_LONG) -DEF_SHOW_FUNC(avg_trx_wait_time, SHOW_LONG) - - -/* plugin status variables */ -static SHOW_VAR semi_sync_master_status_vars[]= { - {"Rpl_semi_sync_master_status", - (char*) &SHOW_FNAME(status), - SHOW_SIMPLE_FUNC}, - {"Rpl_semi_sync_master_clients", - (char*) &SHOW_FNAME(clients), - SHOW_SIMPLE_FUNC}, - {"Rpl_semi_sync_master_yes_tx", - (char*) &rpl_semi_sync_master_yes_transactions, - SHOW_LONG}, - {"Rpl_semi_sync_master_no_tx", - (char*) &rpl_semi_sync_master_no_transactions, - SHOW_LONG}, - {"Rpl_semi_sync_master_wait_sessions", - (char*) &SHOW_FNAME(wait_sessions), - SHOW_SIMPLE_FUNC}, - {"Rpl_semi_sync_master_no_times", - (char*) &rpl_semi_sync_master_off_times, - SHOW_LONG}, - {"Rpl_semi_sync_master_timefunc_failures", - (char*) &rpl_semi_sync_master_timefunc_fails, - SHOW_LONG}, - {"Rpl_semi_sync_master_wait_pos_backtraverse", - (char*) &rpl_semi_sync_master_wait_pos_backtraverse, - SHOW_LONG}, - {"Rpl_semi_sync_master_tx_wait_time", - (char*) &SHOW_FNAME(trx_wait_time), - SHOW_SIMPLE_FUNC}, - {"Rpl_semi_sync_master_tx_waits", - (char*) &SHOW_FNAME(trx_wait_num), - SHOW_SIMPLE_FUNC}, - {"Rpl_semi_sync_master_tx_avg_wait_time", - (char*) &SHOW_FNAME(avg_trx_wait_time), - SHOW_SIMPLE_FUNC}, - {"Rpl_semi_sync_master_net_wait_time", - (char*) &SHOW_FNAME(net_wait_time), - SHOW_SIMPLE_FUNC}, - {"Rpl_semi_sync_master_net_waits", - (char*) &SHOW_FNAME(net_wait_num), - SHOW_SIMPLE_FUNC}, - {"Rpl_semi_sync_master_net_avg_wait_time", - (char*) &SHOW_FNAME(avg_net_wait_time), - SHOW_SIMPLE_FUNC}, - {NULL, NULL, SHOW_LONG}, -}; - -#ifdef HAVE_PSI_INTERFACE -PSI_mutex_key key_ss_mutex_LOCK_binlog_; - -static PSI_mutex_info all_semisync_mutexes[]= -{ - { &key_ss_mutex_LOCK_binlog_, "LOCK_binlog_", 0} -}; - -PSI_cond_key key_ss_cond_COND_binlog_send_; - -static PSI_cond_info all_semisync_conds[]= -{ - { &key_ss_cond_COND_binlog_send_, "COND_binlog_send_", 0} -}; -#endif /* HAVE_PSI_INTERFACE */ - -PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave= -{ 0, "Waiting for semi-sync ACK from slave", 0}; - -#ifdef HAVE_PSI_INTERFACE -PSI_stage_info *all_semisync_stages[]= -{ - & stage_waiting_for_semi_sync_ack_from_slave -}; - -static void init_semisync_psi_keys(void) -{ - const char* category= "semisync"; - int count; - - count= array_elements(all_semisync_mutexes); - mysql_mutex_register(category, all_semisync_mutexes, count); - - count= array_elements(all_semisync_conds); - mysql_cond_register(category, all_semisync_conds, count); - - count= array_elements(all_semisync_stages); - mysql_stage_register(category, all_semisync_stages, count); -} -#endif /* HAVE_PSI_INTERFACE */ - -static int semi_sync_master_plugin_init(void *p) -{ -#ifdef HAVE_PSI_INTERFACE - init_semisync_psi_keys(); -#endif - - if (repl_semisync.initObject()) - return 1; - if (register_trans_observer(&trans_observer, p)) - return 1; - if (register_binlog_storage_observer(&storage_observer, p)) - return 1; - if (register_binlog_transmit_observer(&transmit_observer, p)) - return 1; - return 0; -} - -static int semi_sync_master_plugin_deinit(void *p) -{ - if (unregister_trans_observer(&trans_observer, p)) - { - sql_print_error("unregister_trans_observer failed"); - return 1; - } - if (unregister_binlog_storage_observer(&storage_observer, p)) - { - sql_print_error("unregister_binlog_storage_observer failed"); - return 1; - } - if (unregister_binlog_transmit_observer(&transmit_observer, p)) - { - sql_print_error("unregister_binlog_transmit_observer failed"); - return 1; - } - repl_semisync.cleanup(); - sql_print_information("unregister_replicator OK"); - return 0; -} - -struct Mysql_replication semi_sync_master_plugin= { - MYSQL_REPLICATION_INTERFACE_VERSION -}; - -/* - Plugin library descriptor -*/ -maria_declare_plugin(semisync_master) -{ - MYSQL_REPLICATION_PLUGIN, - &semi_sync_master_plugin, - "rpl_semi_sync_master", - "He Zhenxing", - "Semi-synchronous replication master", - PLUGIN_LICENSE_GPL, - semi_sync_master_plugin_init, /* Plugin Init */ - semi_sync_master_plugin_deinit, /* Plugin Deinit */ - 0x0100 /* 1.0 */, - semi_sync_master_status_vars, /* status variables */ - semi_sync_master_system_vars, /* system variables */ - "1.0", - MariaDB_PLUGIN_MATURITY_STABLE -} -maria_declare_plugin_end; - diff --git a/plugin/semisync/semisync_slave.cc b/plugin/semisync/semisync_slave.cc deleted file mode 100644 index ff8a40aafac..00000000000 --- a/plugin/semisync/semisync_slave.cc +++ /dev/null @@ -1,140 +0,0 @@ -/* Copyright (c) 2008 MySQL AB, 2009 Sun Microsystems, Inc. - Use is subject to license terms. - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; version 2 of the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ - - -#include <my_global.h> -#include "semisync_slave.h" - -char rpl_semi_sync_slave_enabled; -char rpl_semi_sync_slave_status= 0; -unsigned long rpl_semi_sync_slave_trace_level; - -int ReplSemiSyncSlave::initObject() -{ - int result= 0; - const char *kWho = "ReplSemiSyncSlave::initObject"; - - if (init_done_) - { - fprintf(stderr, "%s called twice\n", kWho); - return 1; - } - init_done_ = true; - - /* References to the parameter works after set_options(). */ - setSlaveEnabled(rpl_semi_sync_slave_enabled); - setTraceLevel(rpl_semi_sync_slave_trace_level); - - return result; -} - -int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header, - unsigned long total_len, - bool *need_reply, - const char **payload, - unsigned long *payload_len) -{ - const char *kWho = "ReplSemiSyncSlave::slaveReadSyncHeader"; - int read_res = 0; - function_enter(kWho); - - if ((unsigned char)(header[0]) == kPacketMagicNum) - { - *need_reply = (header[1] & kPacketFlagSync); - *payload_len = total_len - 2; - *payload = header + 2; - - if (trace_level_ & kTraceDetail) - sql_print_information("%s: reply - %d", kWho, *need_reply); - } - else - { - sql_print_error("Missing magic number for semi-sync packet, packet " - "len: %lu", total_len); - read_res = -1; - } - - return function_exit(kWho, read_res); -} - -int ReplSemiSyncSlave::slaveStart(Binlog_relay_IO_param *param) -{ - bool semi_sync= getSlaveEnabled(); - - sql_print_information("Slave I/O thread: Start %s replication to\ - master '%s@%s:%d' in log '%s' at position %lu", - semi_sync ? "semi-sync" : "asynchronous", - param->user, param->host, param->port, - param->master_log_name[0] ? param->master_log_name : "FIRST", - (unsigned long)param->master_log_pos); - - if (semi_sync && !rpl_semi_sync_slave_status) - rpl_semi_sync_slave_status= 1; - return 0; -} - -int ReplSemiSyncSlave::slaveStop(Binlog_relay_IO_param *param) -{ - if (rpl_semi_sync_slave_status) - rpl_semi_sync_slave_status= 0; - if (mysql_reply) - mysql_close(mysql_reply); - mysql_reply= 0; - return 0; -} - -int ReplSemiSyncSlave::slaveReply(MYSQL *mysql, - const char *binlog_filename, - my_off_t binlog_filepos) -{ - const char *kWho = "ReplSemiSyncSlave::slaveReply"; - NET *net= &mysql->net; - uchar reply_buffer[REPLY_MAGIC_NUM_LEN - + REPLY_BINLOG_POS_LEN - + REPLY_BINLOG_NAME_LEN]; - int reply_res, name_len = strlen(binlog_filename); - - function_enter(kWho); - - /* Prepare the buffer of the reply. */ - reply_buffer[REPLY_MAGIC_NUM_OFFSET] = kPacketMagicNum; - int8store(reply_buffer + REPLY_BINLOG_POS_OFFSET, binlog_filepos); - memcpy(reply_buffer + REPLY_BINLOG_NAME_OFFSET, - binlog_filename, - name_len + 1 /* including trailing '\0' */); - - if (trace_level_ & kTraceDetail) - sql_print_information("%s: reply (%s, %lu)", kWho, - binlog_filename, (ulong)binlog_filepos); - - net_clear(net, 0); - /* Send the reply. */ - reply_res = my_net_write(net, reply_buffer, - name_len + REPLY_BINLOG_NAME_OFFSET); - if (!reply_res) - { - reply_res = net_flush(net); - if (reply_res) - sql_print_error("Semi-sync slave net_flush() reply failed"); - } - else - { - sql_print_error("Semi-sync slave send reply failed: %s (%d)", - net->last_error, net->last_errno); - } - - return function_exit(kWho, reply_res); -} diff --git a/plugin/semisync/semisync_slave.h b/plugin/semisync/semisync_slave.h deleted file mode 100644 index 1bf8cf31972..00000000000 --- a/plugin/semisync/semisync_slave.h +++ /dev/null @@ -1,97 +0,0 @@ -/* Copyright (c) 2006 MySQL AB, 2009 Sun Microsystems, Inc. - Use is subject to license terms. - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; version 2 of the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ - - -#ifndef SEMISYNC_SLAVE_H -#define SEMISYNC_SLAVE_H - -#include "semisync.h" - -/** - The extension class for the slave of semi-synchronous replication -*/ -class ReplSemiSyncSlave - :public ReplSemiSyncBase { -public: - ReplSemiSyncSlave() - :slave_enabled_(false) - {} - ~ReplSemiSyncSlave() {} - - void setTraceLevel(unsigned long trace_level) { - trace_level_ = trace_level; - } - - /* Initialize this class after MySQL parameters are initialized. this - * function should be called once at bootstrap time. - */ - int initObject(); - - bool getSlaveEnabled() { - return slave_enabled_; - } - void setSlaveEnabled(bool enabled) { - slave_enabled_ = enabled; - } - - /* A slave reads the semi-sync packet header and separate the metadata - * from the payload data. - * - * Input: - * header - (IN) packet header pointer - * total_len - (IN) total packet length: metadata + payload - * need_reply - (IN) whether the master is waiting for the reply - * payload - (IN) payload: the replication event - * payload_len - (IN) payload length - * - * Return: - * 0: success; non-zero: error - */ - int slaveReadSyncHeader(const char *header, unsigned long total_len, bool *need_reply, - const char **payload, unsigned long *payload_len); - - /* A slave replies to the master indicating its replication process. It - * indicates that the slave has received all events before the specified - * binlog position. - * - * Input: - * mysql - (IN) the mysql network connection - * binlog_filename - (IN) the reply point's binlog file name - * binlog_filepos - (IN) the reply point's binlog file offset - * - * Return: - * 0: success; non-zero: error - */ - int slaveReply(MYSQL *mysql, const char *binlog_filename, - my_off_t binlog_filepos); - - int slaveStart(Binlog_relay_IO_param *param); - int slaveStop(Binlog_relay_IO_param *param); - -private: - /* True when initObject has been called */ - bool init_done_; - bool slave_enabled_; /* semi-sycn is enabled on the slave */ - MYSQL *mysql_reply; /* connection to send reply */ -}; - - -/* System and status variables for the slave component */ -extern char rpl_semi_sync_slave_enabled; -extern unsigned long rpl_semi_sync_slave_trace_level; -extern char rpl_semi_sync_slave_status; - -#endif /* SEMISYNC_SLAVE_H */ diff --git a/plugin/semisync/semisync_slave_plugin.cc b/plugin/semisync/semisync_slave_plugin.cc deleted file mode 100644 index df9e8e10429..00000000000 --- a/plugin/semisync/semisync_slave_plugin.cc +++ /dev/null @@ -1,234 +0,0 @@ -/* Copyright (C) 2007 Google Inc. - Copyright (C) 2008 MySQL AB - Use is subject to license terms - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; version 2 of the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ - - -#include <my_global.h> -#include "semisync_slave.h" -#include <mysql.h> - -static ReplSemiSyncSlave repl_semisync; - -/* - indicate whether or not the slave should send a reply to the master. - - This is set to true in repl_semi_slave_read_event if the current - event read is the last event of a transaction. And the value is - checked in repl_semi_slave_queue_event. -*/ -bool semi_sync_need_reply= false; - -C_MODE_START - -int repl_semi_reset_slave(Binlog_relay_IO_param *param) -{ - // TODO: reset semi-sync slave status here - return 0; -} - -int repl_semi_slave_request_dump(Binlog_relay_IO_param *param, - uint32 flags) -{ - MYSQL *mysql= param->mysql; - MYSQL_RES *res= 0; - MYSQL_ROW row; - const char *query; - - if (!repl_semisync.getSlaveEnabled()) - return 0; - - /* Check if master server has semi-sync plugin installed */ - query= "SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'"; - if (mysql_real_query(mysql, query, strlen(query)) || - !(res= mysql_store_result(mysql))) - { - sql_print_error("Execution failed on master: %s", query); - return 1; - } - - row= mysql_fetch_row(res); - if (!row) - { - /* Master does not support semi-sync */ - sql_print_warning("Master server does not support semi-sync, " - "fallback to asynchronous replication"); - rpl_semi_sync_slave_status= 0; - mysql_free_result(res); - return 0; - } - mysql_free_result(res); - - /* - Tell master dump thread that we want to do semi-sync - replication - */ - query= "SET @rpl_semi_sync_slave= 1"; - if (mysql_real_query(mysql, query, strlen(query))) - { - sql_print_error("Set 'rpl_semi_sync_slave=1' on master failed"); - return 1; - } - mysql_free_result(mysql_store_result(mysql)); - rpl_semi_sync_slave_status= 1; - return 0; -} - -int repl_semi_slave_read_event(Binlog_relay_IO_param *param, - const char *packet, unsigned long len, - const char **event_buf, unsigned long *event_len) -{ - if (rpl_semi_sync_slave_status) - return repl_semisync.slaveReadSyncHeader(packet, len, - &semi_sync_need_reply, - event_buf, event_len); - *event_buf= packet; - *event_len= len; - return 0; -} - -int repl_semi_slave_queue_event(Binlog_relay_IO_param *param, - const char *event_buf, - unsigned long event_len, - uint32 flags) -{ - if (rpl_semi_sync_slave_status && semi_sync_need_reply) - { - /* - We deliberately ignore the error in slaveReply, such error - should not cause the slave IO thread to stop, and the error - messages are already reported. - */ - (void) repl_semisync.slaveReply(param->mysql, - param->master_log_name, - param->master_log_pos); - } - return 0; -} - -int repl_semi_slave_io_start(Binlog_relay_IO_param *param) -{ - return repl_semisync.slaveStart(param); -} - -int repl_semi_slave_io_end(Binlog_relay_IO_param *param) -{ - return repl_semisync.slaveStop(param); -} - -C_MODE_END - -static void fix_rpl_semi_sync_slave_enabled(MYSQL_THD thd, - SYS_VAR *var, - void *ptr, - const void *val) -{ - *(char *)ptr= *(char *)val; - repl_semisync.setSlaveEnabled(rpl_semi_sync_slave_enabled != 0); - return; -} - -static void fix_rpl_semi_sync_trace_level(MYSQL_THD thd, - SYS_VAR *var, - void *ptr, - const void *val) -{ - *(unsigned long *)ptr= *(unsigned long *)val; - repl_semisync.setTraceLevel(rpl_semi_sync_slave_trace_level); - return; -} - -/* plugin system variables */ -static MYSQL_SYSVAR_BOOL(enabled, rpl_semi_sync_slave_enabled, - PLUGIN_VAR_OPCMDARG, - "Enable semi-synchronous replication slave (disabled by default). ", - NULL, // check - &fix_rpl_semi_sync_slave_enabled, // update - 0); - -static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_slave_trace_level, - PLUGIN_VAR_OPCMDARG, - "The tracing level for semi-sync replication.", - NULL, // check - &fix_rpl_semi_sync_trace_level, // update - 32, 0, ~0UL, 1); - -static SYS_VAR* semi_sync_slave_system_vars[]= { - MYSQL_SYSVAR(enabled), - MYSQL_SYSVAR(trace_level), - NULL, -}; - - -/* plugin status variables */ -static SHOW_VAR semi_sync_slave_status_vars[]= { - {"Rpl_semi_sync_slave_status", - (char*) &rpl_semi_sync_slave_status, SHOW_BOOL}, - {NULL, NULL, SHOW_BOOL}, -}; - -Binlog_relay_IO_observer relay_io_observer = { - sizeof(Binlog_relay_IO_observer), // len - - repl_semi_slave_io_start, // start - repl_semi_slave_io_end, // stop - repl_semi_slave_request_dump, // request_transmit - repl_semi_slave_read_event, // after_read_event - repl_semi_slave_queue_event, // after_queue_event - repl_semi_reset_slave, // reset -}; - -static int semi_sync_slave_plugin_init(void *p) -{ - if (repl_semisync.initObject()) - return 1; - if (register_binlog_relay_io_observer(&relay_io_observer, p)) - return 1; - return 0; -} - -static int semi_sync_slave_plugin_deinit(void *p) -{ - if (unregister_binlog_relay_io_observer(&relay_io_observer, p)) - return 1; - return 0; -} - - -struct Mysql_replication semi_sync_slave_plugin= { - MYSQL_REPLICATION_INTERFACE_VERSION -}; - -/* - Plugin library descriptor -*/ -maria_declare_plugin(semisync_slave) -{ - MYSQL_REPLICATION_PLUGIN, - &semi_sync_slave_plugin, - "rpl_semi_sync_slave", - "He Zhenxing", - "Semi-synchronous replication slave", - PLUGIN_LICENSE_GPL, - semi_sync_slave_plugin_init, /* Plugin Init */ - semi_sync_slave_plugin_deinit, /* Plugin Deinit */ - 0x0100 /* 1.0 */, - semi_sync_slave_status_vars, /* status variables */ - semi_sync_slave_system_vars, /* system variables */ - "1.0", - MariaDB_PLUGIN_MATURITY_STABLE -} -maria_declare_plugin_end; - |