summaryrefslogtreecommitdiff
path: root/sql/semisync_master_ack_receiver.cc
diff options
context:
space:
mode:
authorAndrei Elkin <andrei.elkin@mariadb.com>2017-11-22 17:10:34 +0200
committerMonty <monty@mariadb.org>2017-12-18 13:43:37 +0200
commite972125f11d8f37bc263b113e85ed064257a92ee (patch)
treef705082eec5bdb4f274b3e7ceee4714b86c96075 /sql/semisync_master_ack_receiver.cc
parentabceaa75428f9b2d64ce64629d010af9aa6eae1f (diff)
downloadmariadb-git-e972125f11d8f37bc263b113e85ed064257a92ee.tar.gz
MDEV-13073 This part merges the Ali semisync related changes
and specifically the ack receiving functionality. Semisync is turned to be static instead of plugin so its functions are invoked at the same points as RUN_HOOKS. The RUN_HOOKS and the observer interface remain to be removed by later patch. Todo: React on killed status by repl_semisync_master.wait_after_sync(). Currently Repl_semi_sync_master::commit_trx does not check the killed status. There were few bugfixes found that are present in mysql and its unclear whether/how they are covered. Those include: Bug#15985893: GTID SKIPPED EVENTS ON MASTER CAUSE SEMI SYNC TIME-OUTS Bug#17932935 CALLING IS_SEMI_SYNC_SLAVE() IN EACH FUNCTION CALL HAS BAD PERFORMANCE Bug#20574628: SEMI-SYNC REPLICATION PERFORMANCE DEGRADES WITH A HIGH NUMBER OF THREADS
Diffstat (limited to 'sql/semisync_master_ack_receiver.cc')
-rw-r--r--sql/semisync_master_ack_receiver.cc308
1 files changed, 308 insertions, 0 deletions
diff --git a/sql/semisync_master_ack_receiver.cc b/sql/semisync_master_ack_receiver.cc
new file mode 100644
index 00000000000..eee35cc122f
--- /dev/null
+++ b/sql/semisync_master_ack_receiver.cc
@@ -0,0 +1,308 @@
+/* Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+#include <my_global.h>
+#include "semisync_master.h"
+#include "semisync_master_ack_receiver.h"
+
+extern PSI_mutex_key key_ss_mutex_Ack_receiver_mutex;
+extern PSI_cond_key key_ss_cond_Ack_receiver_cond;
+extern PSI_thread_key key_ss_thread_Ack_receiver_thread;
+extern ReplSemiSyncMaster repl_semisync;
+
+/* Callback function of ack receive thread */
+pthread_handler_t ack_receive_handler(void *arg)
+{
+ Ack_receiver *recv= reinterpret_cast<Ack_receiver *>(arg);
+
+ my_thread_init();
+ recv->run();
+ my_thread_end();
+
+ return NULL;
+}
+
+Ack_receiver::Ack_receiver()
+{
+ const char *kWho = "Ack_receiver::Ack_receiver";
+ function_enter(kWho);
+
+ m_status= ST_DOWN;
+ mysql_mutex_init(key_ss_mutex_Ack_receiver_mutex, &m_mutex,
+ MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_ss_cond_Ack_receiver_cond, &m_cond, NULL);
+ m_pid= 0;
+
+ function_exit(kWho);
+}
+
+void Ack_receiver::cleanup()
+{
+ const char *kWho = "Ack_receiver::~Ack_receiver";
+ function_enter(kWho);
+
+ stop();
+ mysql_mutex_destroy(&m_mutex);
+ mysql_cond_destroy(&m_cond);
+
+ function_exit(kWho);
+}
+
+bool Ack_receiver::start()
+{
+ const char *kWho = "Ack_receiver::start";
+ function_enter(kWho);
+
+ mysql_mutex_lock(&m_mutex);
+ if(m_status == ST_DOWN)
+ {
+ pthread_attr_t attr;
+
+ m_status= ST_UP;
+
+ if (DBUG_EVALUATE_IF("rpl_semisync_simulate_create_thread_failure", 1, 0) ||
+ pthread_attr_init(&attr) != 0 ||
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) != 0 ||
+#ifndef _WIN32
+ pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM) != 0 ||
+#endif
+ mysql_thread_create(key_ss_thread_Ack_receiver_thread, &m_pid,
+ &attr, ack_receive_handler, this))
+ {
+ sql_print_error("Failed to start semi-sync ACK receiver thread, "
+ " could not create thread(errno:%d)", errno);
+
+ m_status= ST_DOWN;
+ mysql_mutex_unlock(&m_mutex);
+
+ return function_exit(kWho, true);
+ }
+ (void) pthread_attr_destroy(&attr);
+ }
+ mysql_mutex_unlock(&m_mutex);
+
+ return function_exit(kWho, false);
+}
+
+void Ack_receiver::stop()
+{
+ const char *kWho = "Ack_receiver::stop";
+ function_enter(kWho);
+
+ mysql_mutex_lock(&m_mutex);
+ if (m_status == ST_UP)
+ {
+ m_status= ST_STOPPING;
+ mysql_cond_broadcast(&m_cond);
+
+ while (m_status == ST_STOPPING)
+ mysql_cond_wait(&m_cond, &m_mutex);
+
+ DBUG_ASSERT(m_status == ST_DOWN);
+
+ m_pid= 0;
+ }
+ mysql_mutex_unlock(&m_mutex);
+
+ function_exit(kWho);
+}
+
+bool Ack_receiver::add_slave(THD *thd)
+{
+ Slave *slave;
+ const char *kWho = "Ack_receiver::add_slave";
+ function_enter(kWho);
+
+ if (!(slave= new Slave))
+ return function_exit(kWho, true);
+
+ slave->thd= thd;
+ slave->vio= *thd->net.vio;
+ slave->vio.mysql_socket.m_psi= NULL;
+ slave->vio.read_timeout= 1;
+
+ mysql_mutex_lock(&m_mutex);
+ m_slaves.push_back(slave);
+ m_slaves_changed= true;
+ mysql_cond_broadcast(&m_cond);
+ mysql_mutex_unlock(&m_mutex);
+
+ return function_exit(kWho, false);
+}
+
+void Ack_receiver::remove_slave(THD *thd)
+{
+ I_List_iterator<Slave> it(m_slaves);
+ Slave *slave;
+ const char *kWho = "Ack_receiver::remove_slave";
+ function_enter(kWho);
+
+ mysql_mutex_lock(&m_mutex);
+
+ while ((slave= it++))
+ {
+ if (slave->thd == thd)
+ {
+ delete slave;
+ m_slaves_changed= true;
+ break;
+ }
+ }
+ mysql_mutex_unlock(&m_mutex);
+ function_exit(kWho);
+}
+
+inline void Ack_receiver::set_stage_info(const PSI_stage_info &stage)
+{
+ MYSQL_SET_STAGE(stage.m_key, __FILE__, __LINE__);
+}
+
+inline void Ack_receiver::wait_for_slave_connection()
+{
+ set_stage_info(stage_waiting_for_semi_sync_slave);
+ mysql_cond_wait(&m_cond, &m_mutex);
+}
+
+my_socket Ack_receiver::get_slave_sockets(fd_set *fds, uint *count)
+{
+ my_socket max_fd= INVALID_SOCKET;
+ Slave *slave;
+ I_List_iterator<Slave> it(m_slaves);
+
+ *count= 0;
+ FD_ZERO(fds);
+ while ((slave= it++))
+ {
+ (*count)++;
+ my_socket fd= slave->sock_fd();
+ max_fd= (fd > max_fd ? fd : max_fd);
+ FD_SET(fd, fds);
+ }
+
+ return max_fd;
+}
+
+/* Auxilary function to initialize a NET object with given net buffer. */
+static void init_net(NET *net, unsigned char *buff, unsigned int buff_len)
+{
+ memset(net, 0, sizeof(NET));
+ net->max_packet= buff_len;
+ net->buff= buff;
+ net->buff_end= buff + buff_len;
+ net->read_pos= net->buff;
+}
+
+void Ack_receiver::run()
+{
+ // skip LOCK_global_system_variables due to the 3rd arg
+ THD *thd= new THD(next_thread_id(), false, true);
+ NET net;
+ unsigned char net_buff[REPLY_MESSAGE_MAX_LENGTH];
+ fd_set read_fds;
+ my_socket max_fd= INVALID_SOCKET;
+ Slave *slave;
+
+ my_thread_init();
+
+ DBUG_ENTER("Ack_receiver::run");
+
+ sql_print_information("Starting ack receiver thread");
+ thd->system_thread= SYSTEM_THREAD_SEMISYNC_MASTER_BACKGROUND;
+ thd->thread_stack= (char*) &thd;
+ thd->store_globals();
+ thd->security_ctx->skip_grants();
+ thread_safe_increment32(&service_thread_count);
+ thd->set_command(COM_DAEMON);
+ init_net(&net, net_buff, REPLY_MESSAGE_MAX_LENGTH);
+
+ mysql_mutex_lock(&m_mutex);
+ m_slaves_changed= true;
+ mysql_mutex_unlock(&m_mutex);
+
+ while (1)
+ {
+ fd_set fds;
+ int ret;
+ uint slave_count;
+
+ mysql_mutex_lock(&m_mutex);
+ if (unlikely(m_status == ST_STOPPING))
+ goto end;
+
+ set_stage_info(stage_waiting_for_semi_sync_ack_from_slave);
+ if (unlikely(m_slaves_changed))
+ {
+ if (unlikely(m_slaves.is_empty()))
+ {
+ wait_for_slave_connection();
+ mysql_mutex_unlock(&m_mutex);
+ continue;
+ }
+
+ max_fd= get_slave_sockets(&read_fds, &slave_count);
+ m_slaves_changed= false;
+ DBUG_PRINT("info", ("fd count %u, max_fd %d", slave_count, max_fd));
+ }
+
+ struct timeval tv= {1, 0};
+ fds= read_fds;
+ /* select requires max fd + 1 for the first argument */
+ ret= select(max_fd+1, &fds, NULL, NULL, &tv);
+ if (ret <= 0)
+ {
+ mysql_mutex_unlock(&m_mutex);
+
+ ret= DBUG_EVALUATE_IF("rpl_semisync_simulate_select_error", -1, ret);
+
+ if (ret == -1)
+ sql_print_information("Failed to select() on semi-sync dump sockets, "
+ "error: errno=%d", socket_errno);
+ /* Sleep 1us, so other threads can catch the m_mutex easily. */
+ my_sleep(1);
+ continue;
+ }
+
+ set_stage_info(stage_reading_semi_sync_ack);
+ I_List_iterator<Slave> it(m_slaves);
+
+ while ((slave= it++))
+ {
+ if (FD_ISSET(slave->sock_fd(), &fds))
+ {
+ ulong len;
+
+ net_clear(&net, 0);
+ net.vio= &slave->vio;
+
+ len= my_net_read(&net);
+ if (likely(len != packet_error))
+ repl_semisync_master.reportReplyPacket(slave->server_id(),
+ net.read_pos, len);
+ else if (net.last_errno == ER_NET_READ_ERROR)
+ FD_CLR(slave->sock_fd(), &read_fds);
+ }
+ }
+ mysql_mutex_unlock(&m_mutex);
+ }
+end:
+ sql_print_information("Stopping ack receiver thread");
+ m_status= ST_DOWN;
+ delete thd;
+ thread_safe_decrement32(&service_thread_count);
+ signal_thd_deleted();
+ mysql_cond_broadcast(&m_cond);
+ mysql_mutex_unlock(&m_mutex);
+ DBUG_VOID_RETURN;
+}