summaryrefslogtreecommitdiff
path: root/sql/semisync_master_ack_receiver.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/semisync_master_ack_receiver.cc')
-rw-r--r--sql/semisync_master_ack_receiver.cc308
1 files changed, 308 insertions, 0 deletions
diff --git a/sql/semisync_master_ack_receiver.cc b/sql/semisync_master_ack_receiver.cc
new file mode 100644
index 00000000000..eee35cc122f
--- /dev/null
+++ b/sql/semisync_master_ack_receiver.cc
@@ -0,0 +1,308 @@
+/* Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+#include <my_global.h>
+#include "semisync_master.h"
+#include "semisync_master_ack_receiver.h"
+
+extern PSI_mutex_key key_ss_mutex_Ack_receiver_mutex;
+extern PSI_cond_key key_ss_cond_Ack_receiver_cond;
+extern PSI_thread_key key_ss_thread_Ack_receiver_thread;
+extern ReplSemiSyncMaster repl_semisync;
+
+/* Callback function of ack receive thread */
+pthread_handler_t ack_receive_handler(void *arg)
+{
+ Ack_receiver *recv= reinterpret_cast<Ack_receiver *>(arg);
+
+ my_thread_init();
+ recv->run();
+ my_thread_end();
+
+ return NULL;
+}
+
+Ack_receiver::Ack_receiver()
+{
+ const char *kWho = "Ack_receiver::Ack_receiver";
+ function_enter(kWho);
+
+ m_status= ST_DOWN;
+ mysql_mutex_init(key_ss_mutex_Ack_receiver_mutex, &m_mutex,
+ MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_ss_cond_Ack_receiver_cond, &m_cond, NULL);
+ m_pid= 0;
+
+ function_exit(kWho);
+}
+
+void Ack_receiver::cleanup()
+{
+ const char *kWho = "Ack_receiver::~Ack_receiver";
+ function_enter(kWho);
+
+ stop();
+ mysql_mutex_destroy(&m_mutex);
+ mysql_cond_destroy(&m_cond);
+
+ function_exit(kWho);
+}
+
+bool Ack_receiver::start()
+{
+ const char *kWho = "Ack_receiver::start";
+ function_enter(kWho);
+
+ mysql_mutex_lock(&m_mutex);
+ if(m_status == ST_DOWN)
+ {
+ pthread_attr_t attr;
+
+ m_status= ST_UP;
+
+ if (DBUG_EVALUATE_IF("rpl_semisync_simulate_create_thread_failure", 1, 0) ||
+ pthread_attr_init(&attr) != 0 ||
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) != 0 ||
+#ifndef _WIN32
+ pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM) != 0 ||
+#endif
+ mysql_thread_create(key_ss_thread_Ack_receiver_thread, &m_pid,
+ &attr, ack_receive_handler, this))
+ {
+ sql_print_error("Failed to start semi-sync ACK receiver thread, "
+ " could not create thread(errno:%d)", errno);
+
+ m_status= ST_DOWN;
+ mysql_mutex_unlock(&m_mutex);
+
+ return function_exit(kWho, true);
+ }
+ (void) pthread_attr_destroy(&attr);
+ }
+ mysql_mutex_unlock(&m_mutex);
+
+ return function_exit(kWho, false);
+}
+
+void Ack_receiver::stop()
+{
+ const char *kWho = "Ack_receiver::stop";
+ function_enter(kWho);
+
+ mysql_mutex_lock(&m_mutex);
+ if (m_status == ST_UP)
+ {
+ m_status= ST_STOPPING;
+ mysql_cond_broadcast(&m_cond);
+
+ while (m_status == ST_STOPPING)
+ mysql_cond_wait(&m_cond, &m_mutex);
+
+ DBUG_ASSERT(m_status == ST_DOWN);
+
+ m_pid= 0;
+ }
+ mysql_mutex_unlock(&m_mutex);
+
+ function_exit(kWho);
+}
+
+bool Ack_receiver::add_slave(THD *thd)
+{
+ Slave *slave;
+ const char *kWho = "Ack_receiver::add_slave";
+ function_enter(kWho);
+
+ if (!(slave= new Slave))
+ return function_exit(kWho, true);
+
+ slave->thd= thd;
+ slave->vio= *thd->net.vio;
+ slave->vio.mysql_socket.m_psi= NULL;
+ slave->vio.read_timeout= 1;
+
+ mysql_mutex_lock(&m_mutex);
+ m_slaves.push_back(slave);
+ m_slaves_changed= true;
+ mysql_cond_broadcast(&m_cond);
+ mysql_mutex_unlock(&m_mutex);
+
+ return function_exit(kWho, false);
+}
+
+void Ack_receiver::remove_slave(THD *thd)
+{
+ I_List_iterator<Slave> it(m_slaves);
+ Slave *slave;
+ const char *kWho = "Ack_receiver::remove_slave";
+ function_enter(kWho);
+
+ mysql_mutex_lock(&m_mutex);
+
+ while ((slave= it++))
+ {
+ if (slave->thd == thd)
+ {
+ delete slave;
+ m_slaves_changed= true;
+ break;
+ }
+ }
+ mysql_mutex_unlock(&m_mutex);
+ function_exit(kWho);
+}
+
+inline void Ack_receiver::set_stage_info(const PSI_stage_info &stage)
+{
+ MYSQL_SET_STAGE(stage.m_key, __FILE__, __LINE__);
+}
+
+inline void Ack_receiver::wait_for_slave_connection()
+{
+ set_stage_info(stage_waiting_for_semi_sync_slave);
+ mysql_cond_wait(&m_cond, &m_mutex);
+}
+
+my_socket Ack_receiver::get_slave_sockets(fd_set *fds, uint *count)
+{
+ my_socket max_fd= INVALID_SOCKET;
+ Slave *slave;
+ I_List_iterator<Slave> it(m_slaves);
+
+ *count= 0;
+ FD_ZERO(fds);
+ while ((slave= it++))
+ {
+ (*count)++;
+ my_socket fd= slave->sock_fd();
+ max_fd= (fd > max_fd ? fd : max_fd);
+ FD_SET(fd, fds);
+ }
+
+ return max_fd;
+}
+
+/* Auxilary function to initialize a NET object with given net buffer. */
+static void init_net(NET *net, unsigned char *buff, unsigned int buff_len)
+{
+ memset(net, 0, sizeof(NET));
+ net->max_packet= buff_len;
+ net->buff= buff;
+ net->buff_end= buff + buff_len;
+ net->read_pos= net->buff;
+}
+
+void Ack_receiver::run()
+{
+ // skip LOCK_global_system_variables due to the 3rd arg
+ THD *thd= new THD(next_thread_id(), false, true);
+ NET net;
+ unsigned char net_buff[REPLY_MESSAGE_MAX_LENGTH];
+ fd_set read_fds;
+ my_socket max_fd= INVALID_SOCKET;
+ Slave *slave;
+
+ my_thread_init();
+
+ DBUG_ENTER("Ack_receiver::run");
+
+ sql_print_information("Starting ack receiver thread");
+ thd->system_thread= SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND;
+ thd->thread_stack= (char*) &thd;
+ thd->store_globals();
+ thd->security_ctx->skip_grants();
+ thread_safe_increment32(&service_thread_count);
+ thd->set_command(COM_DAEMON);
+ init_net(&net, net_buff, REPLY_MESSAGE_MAX_LENGTH);
+
+ mysql_mutex_lock(&m_mutex);
+ m_slaves_changed= true;
+ mysql_mutex_unlock(&m_mutex);
+
+ while (1)
+ {
+ fd_set fds;
+ int ret;
+ uint slave_count;
+
+ mysql_mutex_lock(&m_mutex);
+ if (unlikely(m_status == ST_STOPPING))
+ goto end;
+
+ set_stage_info(stage_waiting_for_semi_sync_ack_from_slave);
+ if (unlikely(m_slaves_changed))
+ {
+ if (unlikely(m_slaves.is_empty()))
+ {
+ wait_for_slave_connection();
+ mysql_mutex_unlock(&m_mutex);
+ continue;
+ }
+
+ max_fd= get_slave_sockets(&read_fds, &slave_count);
+ m_slaves_changed= false;
+ DBUG_PRINT("info", ("fd count %u, max_fd %d", slave_count, max_fd));
+ }
+
+ struct timeval tv= {1, 0};
+ fds= read_fds;
+ /* select requires max fd + 1 for the first argument */
+ ret= select(max_fd+1, &fds, NULL, NULL, &tv);
+ if (ret <= 0)
+ {
+ mysql_mutex_unlock(&m_mutex);
+
+ ret= DBUG_EVALUATE_IF("rpl_semisync_simulate_select_error", -1, ret);
+
+ if (ret == -1)
+ sql_print_information("Failed to select() on semi-sync dump sockets, "
+ "error: errno=%d", socket_errno);
+ /* Sleep 1us, so other threads can catch the m_mutex easily. */
+ my_sleep(1);
+ continue;
+ }
+
+ set_stage_info(stage_reading_semi_sync_ack);
+ I_List_iterator<Slave> it(m_slaves);
+
+ while ((slave= it++))
+ {
+ if (FD_ISSET(slave->sock_fd(), &fds))
+ {
+ ulong len;
+
+ net_clear(&net, 0);
+ net.vio= &slave->vio;
+
+ len= my_net_read(&net);
+ if (likely(len != packet_error))
+ repl_semisync_master.reportReplyPacket(slave->server_id(),
+ net.read_pos, len);
+ else if (net.last_errno == ER_NET_READ_ERROR)
+ FD_CLR(slave->sock_fd(), &read_fds);
+ }
+ }
+ mysql_mutex_unlock(&m_mutex);
+ }
+end:
+ sql_print_information("Stopping ack receiver thread");
+ m_status= ST_DOWN;
+ delete thd;
+ thread_safe_decrement32(&service_thread_count);
+ signal_thd_deleted();
+ mysql_cond_broadcast(&m_cond);
+ mysql_mutex_unlock(&m_mutex);
+ DBUG_VOID_RETURN;
+}