diff options
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r-- | sql/sql_repl.cc | 804 |
1 files changed, 478 insertions, 326 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index c15382c669e..15d0d5c90d5 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -1,5 +1,4 @@ -/* - Copyright (c) 2000, 2011, Oracle and/or its affiliates. All rights reserved. +/* Copyright (c) 2000, 2012, Oracle and/or its affiliates. All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -12,17 +11,20 @@ You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA -*/ + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ -#include "mysql_priv.h" +#include "sql_priv.h" +#include "unireg.h" +#include "sql_parse.h" // check_access #ifdef HAVE_REPLICATION #include "rpl_mi.h" #include "sql_repl.h" +#include "sql_acl.h" // SUPER_ACL #include "log_event.h" #include "rpl_filter.h" #include <my_dir.h> +#include "rpl_handler.h" #include "debug_sync.h" int max_binlog_dump_events = 0; // unlimited @@ -31,6 +33,14 @@ my_bool opt_sporadic_binlog_dump_fail = 0; static int binlog_dump_count = 0; #endif +/** + a copy of active_mi->rli->slave_skip_counter, for showing in SHOW VARIABLES, + INFORMATION_SCHEMA.GLOBAL_VARIABLES and @@sql_slave_skip_counter without + taking all the mutexes needed to access active_mi->rli->slave_skip_counter + properly. +*/ +uint sql_slave_skip_counter; + /* fake_rotate_event() builds a fake (=which does not exist physically in any binlog) Rotate event, which contains the name of the binlog we are going to @@ -83,6 +93,32 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name, DBUG_RETURN(0); } +/* + Reset thread transmit packet buffer for event sending + + This function allocates header bytes for event transmission, and + should be called before store the event data to the packet buffer. +*/ +static int reset_transmit_packet(THD *thd, ushort flags, + ulong *ev_offset, const char **errmsg) +{ + int ret= 0; + String *packet= &thd->packet; + + /* reserve and set default header */ + packet->length(0); + packet->set("\0", 1, &my_charset_bin); + + if (RUN_HOOK(binlog_transmit, reserve_header, (thd, flags, packet))) + { + *errmsg= "Failed to run hook 'reserve_header'"; + my_errno= ER_UNKNOWN_ERROR; + ret= 1; + } + *ev_offset= packet->length(); + return ret; +} + static int send_file(THD *thd) { NET* net = &thd->net; @@ -119,13 +155,14 @@ static int send_file(THD *thd) if (!strcmp(fname,"/dev/null")) goto end; - if ((fd = my_open(fname, O_RDONLY, MYF(0))) < 0) + if ((fd= mysql_file_open(key_file_send_file, + fname, O_RDONLY, MYF(0))) < 0) { errmsg = "on open of file"; goto err; } - while ((long) (bytes= my_read(fd, buf, IO_SIZE, MYF(0))) > 0) + while ((long) (bytes= mysql_file_read(fd, buf, IO_SIZE, MYF(0))) > 0) { if (my_net_write(net, buf, bytes)) { @@ -146,7 +183,7 @@ static int send_file(THD *thd) err: my_net_set_read_timeout(net, old_timeout); if (fd >= 0) - (void) my_close(fd, MYF(0)); + mysql_file_close(fd, MYF(0)); if (errmsg) { sql_print_error("Failed in send_file() %s", errmsg); @@ -181,7 +218,7 @@ void adjust_linfo_offsets(my_off_t purge_offset) { THD *tmp; - pthread_mutex_lock(&LOCK_thread_count); + mysql_mutex_lock(&LOCK_thread_count); I_List_iterator<THD> it(threads); while ((tmp=it++)) @@ -189,7 +226,7 @@ void adjust_linfo_offsets(my_off_t purge_offset) LOG_INFO* linfo; if ((linfo = tmp->current_linfo)) { - pthread_mutex_lock(&linfo->lock); + mysql_mutex_lock(&linfo->lock); /* Index file offset can be less that purge offset only if we just started reading the index file. In that case @@ -199,10 +236,10 @@ void adjust_linfo_offsets(my_off_t purge_offset) linfo->fatal = (linfo->index_file_offset != 0); else linfo->index_file_offset -= purge_offset; - pthread_mutex_unlock(&linfo->lock); + mysql_mutex_unlock(&linfo->lock); } } - pthread_mutex_unlock(&LOCK_thread_count); + mysql_mutex_unlock(&LOCK_thread_count); } @@ -212,7 +249,7 @@ bool log_in_use(const char* log_name) THD *tmp; bool result = 0; - pthread_mutex_lock(&LOCK_thread_count); + mysql_mutex_lock(&LOCK_thread_count); I_List_iterator<THD> it(threads); while ((tmp=it++)) @@ -220,38 +257,25 @@ bool log_in_use(const char* log_name) LOG_INFO* linfo; if ((linfo = tmp->current_linfo)) { - pthread_mutex_lock(&linfo->lock); + mysql_mutex_lock(&linfo->lock); result = !memcmp(log_name, linfo->log_file_name, log_name_len); - pthread_mutex_unlock(&linfo->lock); + mysql_mutex_unlock(&linfo->lock); if (result) break; } } - pthread_mutex_unlock(&LOCK_thread_count); + mysql_mutex_unlock(&LOCK_thread_count); return result; } bool purge_error_message(THD* thd, int res) { - uint errmsg= 0; - - switch (res) { - case 0: break; - case LOG_INFO_EOF: errmsg= ER_UNKNOWN_TARGET_BINLOG; break; - case LOG_INFO_IO: errmsg= ER_IO_ERR_LOG_INDEX_READ; break; - case LOG_INFO_INVALID:errmsg= ER_BINLOG_PURGE_PROHIBITED; break; - case LOG_INFO_SEEK: errmsg= ER_FSEEK_FAIL; break; - case LOG_INFO_MEM: errmsg= ER_OUT_OF_RESOURCES; break; - case LOG_INFO_FATAL: errmsg= ER_BINLOG_PURGE_FATAL_ERR; break; - case LOG_INFO_IN_USE: errmsg= ER_LOG_IN_USE; break; - case LOG_INFO_EMFILE: errmsg= ER_BINLOG_PURGE_EMFILE; break; - default: errmsg= ER_LOG_PURGE_UNKNOWN_ERR; break; - } + uint errcode; - if (errmsg) + if ((errcode= purge_log_get_error_code(res)) != 0) { - my_message(errmsg, ER(errmsg), MYF(0)); + my_message(errcode, ER(errcode), MYF(0)); return TRUE; } my_ok(thd); @@ -328,7 +352,7 @@ Increase max_allowed_packet on master"; *errmsg = "memory allocation failed reading log event"; break; case LOG_READ_TRUNC: - *errmsg = "binlog truncated in the middle of event"; + *errmsg = "binlog truncated in the middle of event; consider out of disk space on master"; break; default: *errmsg = "unknown error reading log event on the master"; @@ -338,6 +362,73 @@ Increase max_allowed_packet on master"; } +/** + An auxiliary function for calling in mysql_binlog_send + to initialize the heartbeat timeout in waiting for a binlogged event. + + @param[in] thd THD to access a user variable + + @return heartbeat period an ulonglong of nanoseconds + or zero if heartbeat was not demanded by slave +*/ +static ulonglong get_heartbeat_period(THD * thd) +{ + my_bool null_value; + LEX_STRING name= { C_STRING_WITH_LEN("master_heartbeat_period")}; + user_var_entry *entry= + (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str, + name.length); + return entry? entry->val_int(&null_value) : 0; +} + +/* + Function prepares and sends repliation heartbeat event. + + @param net net object of THD + @param packet buffer to store the heartbeat instance + @param event_coordinates binlog file name and position of the last + real event master sent from binlog + + @note + Among three essential pieces of heartbeat data Log_event::when + is computed locally. + The error to send is serious and should force terminating + the dump thread. +*/ +static int send_heartbeat_event(NET* net, String* packet, + const struct event_coordinates *coord) +{ + DBUG_ENTER("send_heartbeat_event"); + char header[LOG_EVENT_HEADER_LEN]; + /* + 'when' (the timestamp) is set to 0 so that slave could distinguish between + real and fake Rotate events (if necessary) + */ + memset(header, 0, 4); // when + + header[EVENT_TYPE_OFFSET] = HEARTBEAT_LOG_EVENT; + + char* p= coord->file_name + dirname_length(coord->file_name); + + uint ident_len = strlen(p); + ulong event_len = ident_len + LOG_EVENT_HEADER_LEN; + int4store(header + SERVER_ID_OFFSET, server_id); + int4store(header + EVENT_LEN_OFFSET, event_len); + int2store(header + FLAGS_OFFSET, 0); + + int4store(header + LOG_POS_OFFSET, coord->pos); // log_pos + + packet->append(header, sizeof(header)); + packet->append(p, ident_len); // log_file_name + + if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) || + net_flush(net)) + { + DBUG_RETURN(-1); + } + DBUG_RETURN(0); +} + /* TODO: Clean up loop to only have one call to send_file() */ @@ -348,14 +439,19 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, LOG_INFO linfo; char *log_file_name = linfo.log_file_name; char search_file_name[FN_REFLEN], *name; + + ulong ev_offset; + IO_CACHE log; File file = -1; String* packet = &thd->packet; int error; const char *errmsg = "Unknown error"; + char error_text[MAX_SLAVE_ERRMSG]; // to be send to slave via my_message() NET* net = &thd->net; - pthread_mutex_t *log_lock; - bool binlog_can_be_corrupted= FALSE; + mysql_mutex_t *log_lock; + mysql_cond_t *log_cond; + #ifndef DBUG_OFF int left_events = max_binlog_dump_events; #endif @@ -364,6 +460,30 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos)); bzero((char*) &log,sizeof(log)); + /* + heartbeat_period from @master_heartbeat_period user variable + */ + ulonglong heartbeat_period= get_heartbeat_period(thd); + struct timespec heartbeat_buf; + struct timespec *heartbeat_ts= NULL; + const LOG_POS_COORD start_coord= { log_ident, pos }, + *p_start_coord= &start_coord; + LOG_POS_COORD coord_buf= { log_file_name, BIN_LOG_HEADER_SIZE }, + *p_coord= &coord_buf; + if (heartbeat_period != LL(0)) + { + heartbeat_ts= &heartbeat_buf; + set_timespec_nsec(*heartbeat_ts, 0); + } + if (global_system_variables.log_warnings > 1) + sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)", + thd->server_id, log_ident, (ulong)pos); + if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos))) + { + errmsg= "Failed to run hook 'transmit_start'"; + my_errno= ER_UNKNOWN_ERROR; + goto err; + } #ifndef DBUG_OFF if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2)) @@ -402,9 +522,9 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, goto err; } - pthread_mutex_lock(&LOCK_thread_count); + mysql_mutex_lock(&LOCK_thread_count); thd->current_linfo = &linfo; - pthread_mutex_unlock(&LOCK_thread_count); + mysql_mutex_unlock(&LOCK_thread_count); if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0) { @@ -419,11 +539,9 @@ impossible position"; goto err; } - /* - We need to start a packet with something other than 255 - to distinguish it from error - */ - packet->set("\0", 1, &my_charset_bin); /* This is the start of a new packet */ + /* reset transmit packet for the fake rotate event below */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + goto err; /* Tell the client about the log name with a fake Rotate event; @@ -463,7 +581,7 @@ impossible position"; my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; goto err; } - packet->set("\0", 1, &my_charset_bin); + /* Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become this larger than the corresponding packet (query) sent @@ -476,9 +594,16 @@ impossible position"; mysql_bin_log, and it's already inited, and it will be destroyed only at shutdown). */ - log_lock = mysql_bin_log.get_log_lock(); + p_coord->pos= pos; // the first hb matches the slave's last seen value + log_lock= mysql_bin_log.get_log_lock(); + log_cond= mysql_bin_log.get_log_cond(); if (pos > BIN_LOG_HEADER_SIZE) { + /* reset transmit packet for the event read from binary log + file */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + goto err; + /* Try to find a Format_description_log_event at the beginning of the binlog @@ -486,29 +611,28 @@ impossible position"; if (!(error = Log_event::read_log_event(&log, packet, log_lock))) { /* - The packet has offsets equal to the normal offsets in a binlog - event +1 (the first character is \0). + The packet has offsets equal to the normal offsets in a + binlog event + ev_offset (the first ev_offset characters are + the header (default \0)). */ DBUG_PRINT("info", ("Looked for a Format_description_log_event, found event type %d", - (*packet)[EVENT_TYPE_OFFSET+1])); - if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT) + (*packet)[EVENT_TYPE_OFFSET+ev_offset])); + if ((*packet)[EVENT_TYPE_OFFSET+ev_offset] == FORMAT_DESCRIPTION_EVENT) { - binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] & - LOG_EVENT_BINLOG_IN_USE_F); - (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F; + (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F; /* mark that this event with "log_pos=0", so the slave should not increment master's binlog position (rli->group_master_log_pos) */ - int4store((char*) packet->ptr()+LOG_POS_OFFSET+1, 0); + int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, 0); /* if reconnect master sends FD event with `created' as 0 to avoid destroying temp tables. */ int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+ - ST_CREATED_OFFSET+1, (ulong) 0); + ST_CREATED_OFFSET+ev_offset, (ulong) 0); /* send it */ if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) { @@ -534,8 +658,6 @@ impossible position"; Format_description_log_event will be found naturally if it is written. */ } - /* reset the packet as we wrote to it in any case */ - packet->set("\0", 1, &my_charset_bin); } /* end of if (pos > BIN_LOG_HEADER_SIZE); */ else { @@ -547,10 +669,15 @@ impossible position"; while (!net->error && net->vio != 0 && !thd->killed) { - my_off_t prev_pos= pos; - while (!(error = Log_event::read_log_event(&log, packet, log_lock))) + Log_event_type event_type= UNKNOWN_EVENT; + + /* reset the transmit packet for the event read from binary log + file */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + goto err; + + while (!(error= Log_event::read_log_event(&log, packet, log_lock))) { - prev_pos= my_b_tell(&log); #ifndef DBUG_OFF if (max_binlog_dump_events && !left_events--) { @@ -560,10 +687,15 @@ impossible position"; goto err; } #endif + /* + log's filename does not change while it's active + */ + p_coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET); + event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]); DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid", { - if ((*packet)[EVENT_TYPE_OFFSET+1] == XID_EVENT) + if (event_type == XID_EVENT) { net_flush(net); const char act[]= @@ -574,15 +706,18 @@ impossible position"; STRING_WITH_LEN(act))); } }); - - if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT) + if (event_type == FORMAT_DESCRIPTION_EVENT) + { + (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F; + } + pos = my_b_tell(&log); + if (RUN_HOOK(binlog_transmit, before_send_event, + (thd, flags, packet, log_file_name, pos))) { - binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] & - LOG_EVENT_BINLOG_IN_USE_F); - (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F; + my_errno= ER_UNKNOWN_ERROR; + errmsg= "run 'before_send_event' hook failed"; + goto err; } - else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT) - binlog_can_be_corrupted= FALSE; if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) { @@ -593,15 +728,14 @@ impossible position"; DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid", { - if ((*packet)[EVENT_TYPE_OFFSET+1] == XID_EVENT) + if (event_type == XID_EVENT) { net_flush(net); } }); - DBUG_PRINT("info", ("log event code %d", - (*packet)[LOG_EVENT_OFFSET+1] )); - if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT) + DBUG_PRINT("info", ("log event code %d", event_type)); + if (event_type == LOAD_EVENT) { if (send_file(thd)) { @@ -610,18 +744,17 @@ impossible position"; goto err; } } - packet->set("\0", 1, &my_charset_bin); - } - /* - here we were reading binlog that was not closed properly (as a result - of a crash ?). treat any corruption as EOF - */ - if (binlog_can_be_corrupted && - error != LOG_READ_MEM && error != LOG_READ_EOF) - { - my_b_seek(&log, prev_pos); - error=LOG_READ_EOF; + if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet))) + { + errmsg= "Failed to run hook 'after_send_event'"; + my_errno= ER_UNKNOWN_ERROR; + goto err; + } + + /* reset transmit packet for next loop */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + goto err; } /* @@ -666,6 +799,11 @@ impossible position"; } #endif + /* reset the transmit packet for the event read from binary log + file */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + goto err; + /* No one will update the log while we are reading now, but we'll be quick and just read one record @@ -676,40 +814,96 @@ impossible position"; has not been updated since last read. */ - pthread_mutex_lock(log_lock); - switch (error= Log_event::read_log_event(&log, packet, (pthread_mutex_t*) 0)) { + mysql_mutex_lock(log_lock); + switch (error= Log_event::read_log_event(&log, packet, (mysql_mutex_t*) 0)) { case 0: /* we read successfully, so we'll need to send it to the slave */ - pthread_mutex_unlock(log_lock); + mysql_mutex_unlock(log_lock); read_packet = 1; + p_coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET); + event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]); break; case LOG_READ_EOF: + { + int ret; + ulong signal_cnt; DBUG_PRINT("wait",("waiting for data in binary log")); if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0) { - pthread_mutex_unlock(log_lock); + mysql_mutex_unlock(log_lock); goto end; } - if (!thd->killed) - { - /* Note that the following call unlocks lock_log */ - mysql_bin_log.wait_for_update(thd, 0); - } - else - pthread_mutex_unlock(log_lock); - DBUG_PRINT("wait",("binary log received update")); - break; - default: - pthread_mutex_unlock(log_lock); +#ifndef DBUG_OFF + ulong hb_info_counter= 0; +#endif + const char* old_msg= thd->proc_info; + signal_cnt= mysql_bin_log.signal_cnt; + do + { + if (heartbeat_period != 0) + { + DBUG_ASSERT(heartbeat_ts); + set_timespec_nsec(*heartbeat_ts, heartbeat_period); + } + thd->enter_cond(log_cond, log_lock, + "Master has sent all binlog to slave; " + "waiting for binlog to be updated"); + ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts); + DBUG_ASSERT(ret == 0 || (heartbeat_period != 0)); + if (ret == ETIMEDOUT || ret == ETIME) + { +#ifndef DBUG_OFF + if (hb_info_counter < 3) + { + sql_print_information("master sends heartbeat message"); + hb_info_counter++; + if (hb_info_counter == 3) + sql_print_information("the rest of heartbeat info skipped ..."); + } +#endif + /* reset transmit packet for the heartbeat event */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + { + thd->exit_cond(old_msg); + goto err; + } + if (send_heartbeat_event(net, packet, p_coord)) + { + errmsg = "Failed on my_net_write()"; + my_errno= ER_UNKNOWN_ERROR; + thd->exit_cond(old_msg); + goto err; + } + } + else + { + DBUG_PRINT("wait",("binary log received update or a broadcast signal caught")); + } + } while (signal_cnt == mysql_bin_log.signal_cnt && !thd->killed); + thd->exit_cond(old_msg); + } + break; + + default: + mysql_mutex_unlock(log_lock); test_for_non_eof_log_read_errors(error, &errmsg); goto err; } if (read_packet) - { - thd_proc_info(thd, "Sending binlog event to slave"); + { + thd_proc_info(thd, "Sending binlog event to slave"); + pos = my_b_tell(&log); + if (RUN_HOOK(binlog_transmit, before_send_event, + (thd, flags, packet, log_file_name, pos))) + { + my_errno= ER_UNKNOWN_ERROR; + errmsg= "run 'before_send_event' hook failed"; + goto err; + } + if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ) { errmsg = "Failed on my_net_write()"; @@ -717,7 +911,7 @@ impossible position"; goto err; } - if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT) + if (event_type == LOAD_EVENT) { if (send_file(thd)) { @@ -726,11 +920,13 @@ impossible position"; goto err; } } - packet->set("\0", 1, &my_charset_bin); - /* - No need to net_flush because we will get to flush later when - we hit EOF pretty quick - */ + + if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet))) + { + my_errno= ER_UNKNOWN_ERROR; + errmsg= "Failed to run hook 'after_send_event'"; + goto err; + } } log.error=0; @@ -761,8 +957,12 @@ impossible position"; break; end_io_cache(&log); - (void) my_close(file, MYF(MY_WME)); + mysql_file_close(file, MYF(MY_WME)); + /* reset transmit packet for the possible fake rotate event */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + goto err; + /* Call fake_rotate_event() in case the previous log (the one which we have just finished reading) did not contain a Rotate event @@ -780,26 +980,44 @@ impossible position"; goto err; } - packet->length(0); - packet->append('\0'); + p_coord->file_name= log_file_name; // reset to the next } } end: end_io_cache(&log); - (void)my_close(file, MYF(MY_WME)); + mysql_file_close(file, MYF(MY_WME)); + RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags)); my_eof(thd); thd_proc_info(thd, "Waiting to finalize termination"); - pthread_mutex_lock(&LOCK_thread_count); + mysql_mutex_lock(&LOCK_thread_count); thd->current_linfo = 0; - pthread_mutex_unlock(&LOCK_thread_count); + mysql_mutex_unlock(&LOCK_thread_count); thd->variables.max_allowed_packet= old_max_allowed_packet; DBUG_VOID_RETURN; err: thd_proc_info(thd, "Waiting to finalize termination"); + if (my_errno == ER_MASTER_FATAL_ERROR_READING_BINLOG && my_b_inited(&log)) + { + /* + detailing the fatal error message with coordinates + of the last position read. + */ + my_snprintf(error_text, sizeof(error_text), + "%s; the first event '%s' at %lld, " + "the last event read from '%s' at %lld, " + "the last byte read from '%s' at %lld.", + errmsg, + p_start_coord->file_name, p_start_coord->pos, + p_coord->file_name, p_coord->pos, + log_file_name, my_b_tell(&log)); + } + else + strcpy(error_text, errmsg); end_io_cache(&log); + RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags)); /* Exclude iteration through thread list this is needed for purge_logs() - it will iterate through @@ -807,14 +1025,14 @@ err: this mutex will make sure that it never tried to update our linfo after we return from this stack frame */ - pthread_mutex_lock(&LOCK_thread_count); + mysql_mutex_lock(&LOCK_thread_count); thd->current_linfo = 0; - pthread_mutex_unlock(&LOCK_thread_count); + mysql_mutex_unlock(&LOCK_thread_count); if (file >= 0) - (void) my_close(file, MYF(MY_WME)); + mysql_file_close(file, MYF(MY_WME)); thd->variables.max_allowed_packet= old_max_allowed_packet; - my_message(my_errno, errmsg, MYF(0)); + my_message(my_errno, error_text, MYF(0)); DBUG_VOID_RETURN; } @@ -827,7 +1045,7 @@ err: @param mi Pointer to Master_info object for the slave's IO thread. - @param net_report If true, saves the exit status into thd->main_da. + @param net_report If true, saves the exit status into thd->stmt_da. @retval 0 success @retval 1 error @@ -838,7 +1056,7 @@ int start_slave(THD* thd , Master_info* mi, bool net_report) int thread_mask; DBUG_ENTER("start_slave"); - if (check_access(thd, SUPER_ACL, any_db,0,0,0,0)) + if (check_access(thd, SUPER_ACL, any_db, NULL, NULL, 0, 0)) DBUG_RETURN(1); lock_slave_threads(mi); // this allows us to cleanly read slave_running // Get a mask of _stopped_ threads @@ -865,7 +1083,7 @@ int start_slave(THD* thd , Master_info* mi, bool net_report) */ if (thread_mask & SLAVE_SQL) { - pthread_mutex_lock(&mi->rli.data_lock); + mysql_mutex_lock(&mi->rli.data_lock); if (thd->lex->mi.pos) { @@ -923,7 +1141,7 @@ int start_slave(THD* thd , Master_info* mi, bool net_report) ER(ER_MISSING_SKIP_SLAVE)); } - pthread_mutex_unlock(&mi->rli.data_lock); + mysql_mutex_unlock(&mi->rli.data_lock); } else if (thd->lex->mi.pos || thd->lex->mi.relay_log_pos) push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED, @@ -969,7 +1187,7 @@ int start_slave(THD* thd , Master_info* mi, bool net_report) @param mi Pointer to Master_info object for the slave's IO thread. - @param net_report If true, saves the exit status into thd->main_da. + @param net_report If true, saves the exit status into thd->stmt_da. @retval 0 success @retval 1 error @@ -982,7 +1200,7 @@ int stop_slave(THD* thd, Master_info* mi, bool net_report ) if (!thd) thd = current_thd; - if (check_access(thd, SUPER_ACL, any_db,0,0,0,0)) + if (check_access(thd, SUPER_ACL, any_db, NULL, NULL, 0, 0)) DBUG_RETURN(1); thd_proc_info(thd, "Killing slave"); int thread_mask; @@ -1066,14 +1284,9 @@ int reset_slave(THD *thd, Master_info* mi) goto err; } - /* - Clear master's log coordinates and reset host/user/etc to the values - specified in mysqld's options (only for good display of SHOW SLAVE STATUS; - next init_master_info() (in start_slave() for example) would have set them - the same way; but here this is for the case where the user does SHOW SLAVE - STATUS; before doing START SLAVE; - */ - init_master_info_with_options(mi); + /* Clear master's log coordinates and associated information */ + mi->clear_in_memory_info(thd->lex->reset_slave_info.all); + /* Reset errors (the idea is that we forget about the old master). @@ -1086,19 +1299,22 @@ int reset_slave(THD *thd, Master_info* mi) end_master_info(mi); // and delete these two files fn_format(fname, master_info_file, mysql_data_home, "", 4+32); - if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME))) + if (mysql_file_stat(key_file_master_info, fname, &stat_area, MYF(0)) && + mysql_file_delete(key_file_master_info, fname, MYF(MY_WME))) { error=1; goto err; } // delete relay_log_info_file fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32); - if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME))) + if (mysql_file_stat(key_file_relay_log_info, fname, &stat_area, MYF(0)) && + mysql_file_delete(key_file_relay_log_info, fname, MYF(MY_WME))) { error=1; goto err; } + RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi)); err: unlock_slave_threads(mi); if (error) @@ -1110,7 +1326,7 @@ err: Kill all Binlog_dump threads which previously talked to the same slave ("same" means with the same server id). Indeed, if the slave stops, if the - Binlog_dump thread is waiting (pthread_cond_wait) for binlog update, then it + Binlog_dump thread is waiting (mysql_cond_wait) for binlog update, then it will keep existing until a query is written to the binlog. If the master is idle, then this could last long, and if the slave reconnects, we could have 2 Binlog_dump threads in SHOW PROCESSLIST, until a query is written to the @@ -1128,7 +1344,7 @@ err: void kill_zombie_dump_threads(uint32 slave_server_id) { - pthread_mutex_lock(&LOCK_thread_count); + mysql_mutex_lock(&LOCK_thread_count); I_List_iterator<THD> it(threads); THD *tmp; @@ -1137,11 +1353,11 @@ void kill_zombie_dump_threads(uint32 slave_server_id) if (tmp->command == COM_BINLOG_DUMP && tmp->server_id == slave_server_id) { - pthread_mutex_lock(&tmp->LOCK_thd_kill); // Lock from delete + mysql_mutex_lock(&tmp->LOCK_thd_data); // Lock from delete break; } } - pthread_mutex_unlock(&LOCK_thread_count); + mysql_mutex_unlock(&LOCK_thread_count); if (tmp) { /* @@ -1150,7 +1366,7 @@ void kill_zombie_dump_threads(uint32 slave_server_id) again. We just to do kill the thread ourselves. */ tmp->awake(THD::KILL_QUERY); - pthread_mutex_unlock(&tmp->LOCK_thd_kill); + mysql_mutex_unlock(&tmp->LOCK_thd_data); } } @@ -1172,6 +1388,7 @@ bool change_master(THD* thd, Master_info* mi) int thread_mask; const char* errmsg= 0; bool need_relay_log_purge= 1; + bool ret= FALSE; char saved_host[HOSTNAME_LENGTH + 1]; uint saved_port; char saved_log_name[FN_REFLEN]; @@ -1180,22 +1397,35 @@ bool change_master(THD* thd, Master_info* mi) lock_slave_threads(mi); init_thread_mask(&thread_mask,mi,0 /*not inverse*/); + LEX_MASTER_INFO* lex_mi= &thd->lex->mi; if (thread_mask) // We refuse if any slave thread is running { my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0)); - unlock_slave_threads(mi); - DBUG_RETURN(TRUE); + ret= TRUE; + goto err; } thd_proc_info(thd, "Changing master"); - LEX_MASTER_INFO* lex_mi= &thd->lex->mi; + /* + We need to check if there is an empty master_host. Otherwise + change master succeeds, a master.info file is created containing + empty master_host string and when issuing: start slave; an error + is thrown stating that the server is not configured as slave. + (See BUG#28796). + */ + if(lex_mi->host && !*lex_mi->host) + { + my_error(ER_WRONG_ARGUMENTS, MYF(0), "MASTER_HOST"); + unlock_slave_threads(mi); + DBUG_RETURN(TRUE); + } // TODO: see if needs re-write if (init_master_info(mi, master_info_file, relay_log_info_file, 0, thread_mask)) { my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0)); - unlock_slave_threads(mi); - DBUG_RETURN(TRUE); + ret= TRUE; + goto err; } /* @@ -1242,13 +1472,46 @@ bool change_master(THD* thd, Master_info* mi) mi->port = lex_mi->port; if (lex_mi->connect_retry) mi->connect_retry = lex_mi->connect_retry; + if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED) + mi->heartbeat_period = lex_mi->heartbeat_period; + else + mi->heartbeat_period= (float) min(SLAVE_MAX_HEARTBEAT_PERIOD, + (slave_net_timeout/2.0)); + mi->received_heartbeats= LL(0); // counter lives until master is CHANGEd + /* + reset the last time server_id list if the current CHANGE MASTER + is mentioning IGNORE_SERVER_IDS= (...) + */ + if (lex_mi->repl_ignore_server_ids_opt == LEX_MASTER_INFO::LEX_MI_ENABLE) + reset_dynamic(&mi->ignore_server_ids); + for (uint i= 0; i < lex_mi->repl_ignore_server_ids.elements; i++) + { + ulong s_id; + get_dynamic(&lex_mi->repl_ignore_server_ids, (uchar*) &s_id, i); + if (s_id == ::server_id && replicate_same_server_id) + { + my_error(ER_SLAVE_IGNORE_SERVER_IDS, MYF(0), static_cast<int>(s_id)); + ret= TRUE; + goto err; + } + else + { + if (bsearch((const ulong *) &s_id, + mi->ignore_server_ids.buffer, + mi->ignore_server_ids.elements, sizeof(ulong), + (int (*) (const void*, const void*)) + change_master_server_id_cmp) == NULL) + insert_dynamic(&mi->ignore_server_ids, (uchar*) &s_id); + } + } + sort_dynamic(&mi->ignore_server_ids, (qsort_cmp) change_master_server_id_cmp); - if (lex_mi->ssl != LEX_MASTER_INFO::SSL_UNCHANGED) - mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::SSL_ENABLE); + if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED) + mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE); - if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::SSL_UNCHANGED) + if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED) mi->ssl_verify_server_cert= - (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::SSL_ENABLE); + (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE); if (lex_mi->ssl_ca) strmake(mi->ssl_ca, lex_mi->ssl_ca, sizeof(mi->ssl_ca)-1); @@ -1271,9 +1534,11 @@ bool change_master(THD* thd, Master_info* mi) if (lex_mi->relay_log_name) { need_relay_log_purge= 0; - strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name, + char relay_log_name[FN_REFLEN]; + mi->rli.relay_log.make_log_name(relay_log_name, lex_mi->relay_log_name); + strmake(mi->rli.group_relay_log_name, relay_log_name, sizeof(mi->rli.group_relay_log_name)-1); - strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name, + strmake(mi->rli.event_relay_log_name, relay_log_name, sizeof(mi->rli.event_relay_log_name)-1); } @@ -1320,8 +1585,8 @@ bool change_master(THD* thd, Master_info* mi) if (flush_master_info(mi, FALSE, FALSE)) { my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file"); - unlock_slave_threads(mi); - DBUG_RETURN(TRUE); + ret= TRUE; + goto err; } if (need_relay_log_purge) { @@ -1332,8 +1597,8 @@ bool change_master(THD* thd, Master_info* mi) &errmsg)) { my_error(ER_RELAY_LOG_FAIL, MYF(0), errmsg); - unlock_slave_threads(mi); - DBUG_RETURN(TRUE); + ret= TRUE; + goto err; } } else @@ -1348,8 +1613,8 @@ bool change_master(THD* thd, Master_info* mi) &msg, 0)) { my_error(ER_RELAY_LOG_INIT, MYF(0), msg); - unlock_slave_threads(mi); - DBUG_RETURN(TRUE); + ret= TRUE; + goto err; } } /* @@ -1370,7 +1635,7 @@ bool change_master(THD* thd, Master_info* mi) if (!mi->rli.group_master_log_name[0]) // uninitialized case mi->rli.group_master_log_pos=0; - pthread_mutex_lock(&mi->rli.data_lock); + mysql_mutex_lock(&mi->rli.data_lock); mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */ /* Clear the errors, for a clean start */ mi->rli.clear_error(); @@ -1392,13 +1657,15 @@ bool change_master(THD* thd, Master_info* mi) not exist anymore). */ flush_relay_log_info(&mi->rli); - pthread_cond_broadcast(&mi->data_cond); - pthread_mutex_unlock(&mi->rli.data_lock); + mysql_cond_broadcast(&mi->data_cond); + mysql_mutex_unlock(&mi->rli.data_lock); +err: unlock_slave_threads(mi); thd_proc_info(thd, 0); - my_ok(thd); - DBUG_RETURN(FALSE); + if (ret == FALSE) + my_ok(thd); + DBUG_RETURN(ret); } @@ -1419,24 +1686,11 @@ int reset_master(THD* thd) ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG)); return 1; } - return mysql_bin_log.reset_logs(thd); -} - -int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1, - const char* log_file_name2, ulonglong log_pos2) -{ - int res; - size_t log_file_name1_len= strlen(log_file_name1); - size_t log_file_name2_len= strlen(log_file_name2); - // We assume that both log names match up to '.' - if (log_file_name1_len == log_file_name2_len) - { - if ((res= strcmp(log_file_name1, log_file_name2))) - return res; - return (log_pos1 < log_pos2) ? -1 : (log_pos1 == log_pos2) ? 0 : 1; - } - return ((log_file_name1_len < log_file_name2_len) ? -1 : 1); + if (mysql_bin_log.reset_logs(thd)) + return 1; + RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */)); + return 0; } @@ -1457,27 +1711,44 @@ bool mysql_show_binlog_events(THD* thd) bool ret = TRUE; IO_CACHE log; File file = -1; + MYSQL_BIN_LOG *binary_log= NULL; int old_max_allowed_packet= thd->variables.max_allowed_packet; LOG_INFO linfo; DBUG_ENTER("mysql_show_binlog_events"); Log_event::init_show_field_list(&field_list); - if (protocol->send_fields(&field_list, + if (protocol->send_result_set_metadata(&field_list, Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(TRUE); Format_description_log_event *description_event= new Format_description_log_event(3); /* MySQL 4.0 by default */ - /* - Wait for handlers to insert any pending information - into the binlog. For e.g. ndb which updates the binlog asynchronously - this is needed so that the uses sees all its own commands in the binlog - */ - ha_binlog_wait(thd); + DBUG_ASSERT(thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS || + thd->lex->sql_command == SQLCOM_SHOW_RELAYLOG_EVENTS); - if (mysql_bin_log.is_open()) + /* select wich binary log to use: binlog or relay */ + if ( thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS ) + { + /* + Wait for handlers to insert any pending information + into the binlog. For e.g. ndb which updates the binlog asynchronously + this is needed so that the uses sees all its own commands in the binlog + */ + ha_binlog_wait(thd); + + binary_log= &mysql_bin_log; + } + else /* showing relay log contents */ + { + if (!active_mi) + DBUG_RETURN(TRUE); + + binary_log= &(active_mi->rli.relay_log); + } + + if (binary_log->is_open()) { LEX_MASTER_INFO *lex_mi= &thd->lex->mi; SELECT_LEX_UNIT *unit= &thd->lex->unit; @@ -1485,7 +1756,7 @@ bool mysql_show_binlog_events(THD* thd) my_off_t pos = max(BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly char search_file_name[FN_REFLEN], *name; const char *log_file_name = lex_mi->log_file_name; - pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock(); + mysql_mutex_t *log_lock = binary_log->get_log_lock(); Log_event* ev; unit->set_limit(thd->lex->current_select); @@ -1494,21 +1765,21 @@ bool mysql_show_binlog_events(THD* thd) name= search_file_name; if (log_file_name) - mysql_bin_log.make_log_name(search_file_name, log_file_name); + binary_log->make_log_name(search_file_name, log_file_name); else name=0; // Find first log linfo.index_file_offset = 0; - if (mysql_bin_log.find_log_pos(&linfo, name, 1)) + if (binary_log->find_log_pos(&linfo, name, 1)) { errmsg = "Could not find target log"; goto err; } - pthread_mutex_lock(&LOCK_thread_count); + mysql_mutex_lock(&LOCK_thread_count); thd->current_linfo = &linfo; - pthread_mutex_unlock(&LOCK_thread_count); + mysql_mutex_unlock(&LOCK_thread_count); if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0) goto err; @@ -1518,7 +1789,7 @@ bool mysql_show_binlog_events(THD* thd) */ thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER; - pthread_mutex_lock(log_lock); + mysql_mutex_lock(log_lock); /* open_binlog() sought to position 4. @@ -1528,7 +1799,7 @@ bool mysql_show_binlog_events(THD* thd) This code will fail on a mixed relay log (one which has Format_desc then Rotate then Format_desc). */ - ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event); + ev= Log_event::read_log_event(&log, (mysql_mutex_t*)0, description_event); if (ev) { if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT) @@ -1549,7 +1820,7 @@ bool mysql_show_binlog_events(THD* thd) } for (event_count = 0; - (ev = Log_event::read_log_event(&log,(pthread_mutex_t*) 0, + (ev = Log_event::read_log_event(&log, (mysql_mutex_t*) 0, description_event)); ) { if (event_count >= limit_start && @@ -1557,7 +1828,7 @@ bool mysql_show_binlog_events(THD* thd) { errmsg = "Net error"; delete ev; - pthread_mutex_unlock(log_lock); + mysql_mutex_unlock(log_lock); goto err; } @@ -1571,11 +1842,11 @@ bool mysql_show_binlog_events(THD* thd) if (event_count < limit_end && log.error) { errmsg = "Wrong offset or I/O error"; - pthread_mutex_unlock(log_lock); + mysql_mutex_unlock(log_lock); goto err; } - pthread_mutex_unlock(log_lock); + mysql_mutex_unlock(log_lock); } // Check that linfo is still on the function scope. DEBUG_SYNC(thd, "after_show_binlog_events"); @@ -1587,7 +1858,7 @@ err: if (file >= 0) { end_io_cache(&log); - (void) my_close(file, MYF(MY_WME)); + mysql_file_close(file, MYF(MY_WME)); } if (errmsg) @@ -1596,9 +1867,9 @@ err: else my_eof(thd); - pthread_mutex_lock(&LOCK_thread_count); + mysql_mutex_lock(&LOCK_thread_count); thd->current_linfo = 0; - pthread_mutex_unlock(&LOCK_thread_count); + mysql_mutex_unlock(&LOCK_thread_count); thd->variables.max_allowed_packet= old_max_allowed_packet; DBUG_RETURN(ret); } @@ -1624,7 +1895,7 @@ bool show_binlog_info(THD* thd) field_list.push_back(new Item_empty_string("Binlog_Do_DB",255)); field_list.push_back(new Item_empty_string("Binlog_Ignore_DB",255)); - if (protocol->send_fields(&field_list, + if (protocol->send_result_set_metadata(&field_list, Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(TRUE); protocol->prepare_for_resend(); @@ -1669,23 +1940,23 @@ bool show_binlogs(THD* thd) if (!mysql_bin_log.is_open()) { - my_message(ER_NO_BINARY_LOGGING, ER(ER_NO_BINARY_LOGGING), MYF(0)); + my_error(ER_NO_BINARY_LOGGING, MYF(0)); DBUG_RETURN(TRUE); } field_list.push_back(new Item_empty_string("Log_name", 255)); field_list.push_back(new Item_return_int("File_size", 20, MYSQL_TYPE_LONGLONG)); - if (protocol->send_fields(&field_list, + if (protocol->send_result_set_metadata(&field_list, Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(TRUE); - pthread_mutex_lock(mysql_bin_log.get_log_lock()); + mysql_mutex_lock(mysql_bin_log.get_log_lock()); mysql_bin_log.lock_index(); index_file=mysql_bin_log.get_index_file(); mysql_bin_log.raw_get_current_log(&cur); // dont take mutex - pthread_mutex_unlock(mysql_bin_log.get_log_lock()); // lockdep, OK + mysql_mutex_unlock(mysql_bin_log.get_log_lock()); // lockdep, OK cur_dir_len= dirname_length(cur.log_file_name); @@ -1708,11 +1979,12 @@ bool show_binlogs(THD* thd) else { /* this is an old log, open it and find the size */ - if ((file= my_open(fname, O_RDONLY | O_SHARE | O_BINARY, - MYF(0))) >= 0) + if ((file= mysql_file_open(key_file_binlog, + fname, O_RDONLY | O_SHARE | O_BINARY, + MYF(0))) >= 0) { - file_length= (ulonglong) my_seek(file, 0L, MY_SEEK_END, MYF(0)); - my_close(file, MYF(0)); + file_length= (ulonglong) mysql_file_seek(file, 0L, MY_SEEK_END, MYF(0)); + mysql_file_close(file, MYF(0)); } } protocol->store(file_length); @@ -1747,7 +2019,7 @@ int log_loaded_block(IO_CACHE* file) uchar* buffer= (uchar*) my_b_get_buffer_start(file); uint max_event_size= current_thd->variables.max_allowed_packet; lf_info= (LOAD_FILE_INFO*) file->arg; - if (lf_info->thd->current_stmt_binlog_row_based) + if (lf_info->thd->is_current_stmt_binlog_format_row()) DBUG_RETURN(0); if (lf_info->last_pos_in_file != HA_POS_ERROR && lf_info->last_pos_in_file >= my_b_get_pos_in_file(file)) @@ -1780,124 +2052,4 @@ int log_loaded_block(IO_CACHE* file) DBUG_RETURN(0); } -/* - Replication System Variables -*/ - -class sys_var_slave_skip_counter :public sys_var -{ -public: - sys_var_slave_skip_counter(sys_var_chain *chain, const char *name_arg) - :sys_var(name_arg) - { chain_sys_var(chain); } - bool check(THD *thd, set_var *var); - bool update(THD *thd, set_var *var); - bool check_type(enum_var_type type) { return type != OPT_GLOBAL; } - /* - We can't retrieve the value of this, so we don't have to define - type() or value_ptr() - */ -}; - -class sys_var_sync_binlog_period :public sys_var_long_ptr -{ -public: - sys_var_sync_binlog_period(sys_var_chain *chain, const char *name_arg, - ulong *value_ptr) - :sys_var_long_ptr(chain, name_arg,value_ptr) {} - bool update(THD *thd, set_var *var); -}; - -static sys_var_chain vars = { NULL, NULL }; - -static sys_var_const sys_log_slave_updates(&vars, "log_slave_updates", - OPT_GLOBAL, SHOW_MY_BOOL, - (uchar*) &opt_log_slave_updates); -static sys_var_const sys_relay_log(&vars, "relay_log", - OPT_GLOBAL, SHOW_CHAR_PTR, - (uchar*) &opt_relay_logname); -static sys_var_const sys_relay_log_index(&vars, "relay_log_index", - OPT_GLOBAL, SHOW_CHAR_PTR, - (uchar*) &opt_relaylog_index_name); -static sys_var_const sys_relay_log_info_file(&vars, "relay_log_info_file", - OPT_GLOBAL, SHOW_CHAR_PTR, - (uchar*) &relay_log_info_file); -static sys_var_bool_ptr sys_relay_log_purge(&vars, "relay_log_purge", - &relay_log_purge); -static sys_var_const sys_relay_log_space_limit(&vars, - "relay_log_space_limit", - OPT_GLOBAL, SHOW_LONGLONG, - (uchar*) - &relay_log_space_limit); -static sys_var_const sys_slave_load_tmpdir(&vars, "slave_load_tmpdir", - OPT_GLOBAL, SHOW_CHAR_PTR, - (uchar*) &slave_load_tmpdir); -static sys_var_long_ptr sys_slave_net_timeout(&vars, "slave_net_timeout", - &slave_net_timeout); -static sys_var_const sys_slave_skip_errors(&vars, "slave_skip_errors", - OPT_GLOBAL, SHOW_CHAR, - (uchar*) slave_skip_error_names); -static sys_var_long_ptr sys_slave_trans_retries(&vars, "slave_transaction_retries", - &slave_trans_retries); -static sys_var_sync_binlog_period sys_sync_binlog_period(&vars, "sync_binlog", &sync_binlog_period); -static sys_var_slave_skip_counter sys_slave_skip_counter(&vars, "sql_slave_skip_counter"); - - -bool sys_var_slave_skip_counter::check(THD *thd, set_var *var) -{ - int result= 0; - pthread_mutex_lock(&LOCK_active_mi); - pthread_mutex_lock(&active_mi->rli.run_lock); - if (active_mi->rli.slave_running) - { - my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0)); - result=1; - } - pthread_mutex_unlock(&active_mi->rli.run_lock); - pthread_mutex_unlock(&LOCK_active_mi); - var->save_result.ulong_value= (ulong) var->value->val_int(); - return result; -} - - -bool sys_var_slave_skip_counter::update(THD *thd, set_var *var) -{ - pthread_mutex_lock(&LOCK_active_mi); - pthread_mutex_lock(&active_mi->rli.run_lock); - /* - The following test should normally never be true as we test this - in the check function; To be safe against multiple - SQL_SLAVE_SKIP_COUNTER request, we do the check anyway - */ - if (!active_mi->rli.slave_running) - { - pthread_mutex_lock(&active_mi->rli.data_lock); - active_mi->rli.slave_skip_counter= var->save_result.ulong_value; - pthread_mutex_unlock(&active_mi->rli.data_lock); - } - pthread_mutex_unlock(&active_mi->rli.run_lock); - pthread_mutex_unlock(&LOCK_active_mi); - return 0; -} - - -bool sys_var_sync_binlog_period::update(THD *thd, set_var *var) -{ - sync_binlog_period= (ulong) var->save_result.ulonglong_value; - return 0; -} - -int init_replication_sys_vars() -{ - if (mysql_add_sys_var_chain(vars.first, my_long_options)) - { - /* should not happen */ - fprintf(stderr, "failed to initialize replication system variables"); - unireg_abort(1); - } - return 0; -} - #endif /* HAVE_REPLICATION */ - - |