diff options
Diffstat (limited to 'sql/semisync_master_ack_receiver.cc')
-rw-r--r-- | sql/semisync_master_ack_receiver.cc | 308 |
1 files changed, 308 insertions, 0 deletions
diff --git a/sql/semisync_master_ack_receiver.cc b/sql/semisync_master_ack_receiver.cc new file mode 100644 index 00000000000..eee35cc122f --- /dev/null +++ b/sql/semisync_master_ack_receiver.cc @@ -0,0 +1,308 @@ +/* Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#include <my_global.h> +#include "semisync_master.h" +#include "semisync_master_ack_receiver.h" + +extern PSI_mutex_key key_ss_mutex_Ack_receiver_mutex; +extern PSI_cond_key key_ss_cond_Ack_receiver_cond; +extern PSI_thread_key key_ss_thread_Ack_receiver_thread; +extern ReplSemiSyncMaster repl_semisync; + +/* Callback function of ack receive thread */ +pthread_handler_t ack_receive_handler(void *arg) +{ + Ack_receiver *recv= reinterpret_cast<Ack_receiver *>(arg); + + my_thread_init(); + recv->run(); + my_thread_end(); + + return NULL; +} + +Ack_receiver::Ack_receiver() +{ + const char *kWho = "Ack_receiver::Ack_receiver"; + function_enter(kWho); + + m_status= ST_DOWN; + mysql_mutex_init(key_ss_mutex_Ack_receiver_mutex, &m_mutex, + MY_MUTEX_INIT_FAST); + mysql_cond_init(key_ss_cond_Ack_receiver_cond, &m_cond, NULL); + m_pid= 0; + + function_exit(kWho); +} + +void Ack_receiver::cleanup() +{ + const char *kWho = "Ack_receiver::~Ack_receiver"; + function_enter(kWho); + + stop(); + mysql_mutex_destroy(&m_mutex); + mysql_cond_destroy(&m_cond); + + function_exit(kWho); +} + +bool Ack_receiver::start() +{ + const char *kWho = "Ack_receiver::start"; + function_enter(kWho); + + mysql_mutex_lock(&m_mutex); + if(m_status == ST_DOWN) + { + pthread_attr_t attr; + + m_status= ST_UP; + + if (DBUG_EVALUATE_IF("rpl_semisync_simulate_create_thread_failure", 1, 0) || + pthread_attr_init(&attr) != 0 || + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) != 0 || +#ifndef _WIN32 + pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM) != 0 || +#endif + mysql_thread_create(key_ss_thread_Ack_receiver_thread, &m_pid, + &attr, ack_receive_handler, this)) + { + sql_print_error("Failed to start semi-sync ACK receiver thread, " + " could not create thread(errno:%d)", errno); + + m_status= ST_DOWN; + mysql_mutex_unlock(&m_mutex); + + return function_exit(kWho, true); + } + (void) pthread_attr_destroy(&attr); + } + mysql_mutex_unlock(&m_mutex); + + return function_exit(kWho, false); +} + +void Ack_receiver::stop() +{ + const char *kWho = "Ack_receiver::stop"; + function_enter(kWho); + + mysql_mutex_lock(&m_mutex); + if (m_status == ST_UP) + { + m_status= ST_STOPPING; + mysql_cond_broadcast(&m_cond); + + while (m_status == ST_STOPPING) + mysql_cond_wait(&m_cond, &m_mutex); + + DBUG_ASSERT(m_status == ST_DOWN); + + m_pid= 0; + } + mysql_mutex_unlock(&m_mutex); + + function_exit(kWho); +} + +bool Ack_receiver::add_slave(THD *thd) +{ + Slave *slave; + const char *kWho = "Ack_receiver::add_slave"; + function_enter(kWho); + + if (!(slave= new Slave)) + return function_exit(kWho, true); + + slave->thd= thd; + slave->vio= *thd->net.vio; + slave->vio.mysql_socket.m_psi= NULL; + slave->vio.read_timeout= 1; + + mysql_mutex_lock(&m_mutex); + m_slaves.push_back(slave); + m_slaves_changed= true; + mysql_cond_broadcast(&m_cond); + mysql_mutex_unlock(&m_mutex); + + return function_exit(kWho, false); +} + +void Ack_receiver::remove_slave(THD *thd) +{ + I_List_iterator<Slave> it(m_slaves); + Slave *slave; + const char *kWho = "Ack_receiver::remove_slave"; + function_enter(kWho); + + mysql_mutex_lock(&m_mutex); + + while ((slave= it++)) + { + if (slave->thd == thd) + { + delete slave; + m_slaves_changed= true; + break; + } + } + mysql_mutex_unlock(&m_mutex); + function_exit(kWho); +} + +inline void Ack_receiver::set_stage_info(const PSI_stage_info &stage) +{ + MYSQL_SET_STAGE(stage.m_key, __FILE__, __LINE__); +} + +inline void Ack_receiver::wait_for_slave_connection() +{ + set_stage_info(stage_waiting_for_semi_sync_slave); + mysql_cond_wait(&m_cond, &m_mutex); +} + +my_socket Ack_receiver::get_slave_sockets(fd_set *fds, uint *count) +{ + my_socket max_fd= INVALID_SOCKET; + Slave *slave; + I_List_iterator<Slave> it(m_slaves); + + *count= 0; + FD_ZERO(fds); + while ((slave= it++)) + { + (*count)++; + my_socket fd= slave->sock_fd(); + max_fd= (fd > max_fd ? fd : max_fd); + FD_SET(fd, fds); + } + + return max_fd; +} + +/* Auxilary function to initialize a NET object with given net buffer. */ +static void init_net(NET *net, unsigned char *buff, unsigned int buff_len) +{ + memset(net, 0, sizeof(NET)); + net->max_packet= buff_len; + net->buff= buff; + net->buff_end= buff + buff_len; + net->read_pos= net->buff; +} + +void Ack_receiver::run() +{ + // skip LOCK_global_system_variables due to the 3rd arg + THD *thd= new THD(next_thread_id(), false, true); + NET net; + unsigned char net_buff[REPLY_MESSAGE_MAX_LENGTH]; + fd_set read_fds; + my_socket max_fd= INVALID_SOCKET; + Slave *slave; + + my_thread_init(); + + DBUG_ENTER("Ack_receiver::run"); + + sql_print_information("Starting ack receiver thread"); + thd->system_thread= SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND; + thd->thread_stack= (char*) &thd; + thd->store_globals(); + thd->security_ctx->skip_grants(); + thread_safe_increment32(&service_thread_count); + thd->set_command(COM_DAEMON); + init_net(&net, net_buff, REPLY_MESSAGE_MAX_LENGTH); + + mysql_mutex_lock(&m_mutex); + m_slaves_changed= true; + mysql_mutex_unlock(&m_mutex); + + while (1) + { + fd_set fds; + int ret; + uint slave_count; + + mysql_mutex_lock(&m_mutex); + if (unlikely(m_status == ST_STOPPING)) + goto end; + + set_stage_info(stage_waiting_for_semi_sync_ack_from_slave); + if (unlikely(m_slaves_changed)) + { + if (unlikely(m_slaves.is_empty())) + { + wait_for_slave_connection(); + mysql_mutex_unlock(&m_mutex); + continue; + } + + max_fd= get_slave_sockets(&read_fds, &slave_count); + m_slaves_changed= false; + DBUG_PRINT("info", ("fd count %u, max_fd %d", slave_count, max_fd)); + } + + struct timeval tv= {1, 0}; + fds= read_fds; + /* select requires max fd + 1 for the first argument */ + ret= select(max_fd+1, &fds, NULL, NULL, &tv); + if (ret <= 0) + { + mysql_mutex_unlock(&m_mutex); + + ret= DBUG_EVALUATE_IF("rpl_semisync_simulate_select_error", -1, ret); + + if (ret == -1) + sql_print_information("Failed to select() on semi-sync dump sockets, " + "error: errno=%d", socket_errno); + /* Sleep 1us, so other threads can catch the m_mutex easily. */ + my_sleep(1); + continue; + } + + set_stage_info(stage_reading_semi_sync_ack); + I_List_iterator<Slave> it(m_slaves); + + while ((slave= it++)) + { + if (FD_ISSET(slave->sock_fd(), &fds)) + { + ulong len; + + net_clear(&net, 0); + net.vio= &slave->vio; + + len= my_net_read(&net); + if (likely(len != packet_error)) + repl_semisync_master.reportReplyPacket(slave->server_id(), + net.read_pos, len); + else if (net.last_errno == ER_NET_READ_ERROR) + FD_CLR(slave->sock_fd(), &read_fds); + } + } + mysql_mutex_unlock(&m_mutex); + } +end: + sql_print_information("Stopping ack receiver thread"); + m_status= ST_DOWN; + delete thd; + thread_safe_decrement32(&service_thread_count); + signal_thd_deleted(); + mysql_cond_broadcast(&m_cond); + mysql_mutex_unlock(&m_mutex); + DBUG_VOID_RETURN; +} |