summaryrefslogtreecommitdiff
path: root/sql/sql_repl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r--sql/sql_repl.cc804
1 files changed, 478 insertions, 326 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index c15382c669e..15d0d5c90d5 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -1,5 +1,4 @@
-/*
- Copyright (c) 2000, 2011, Oracle and/or its affiliates. All rights reserved.
+/* Copyright (c) 2000, 2012, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -12,17 +11,20 @@
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
- Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
-*/
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
-#include "mysql_priv.h"
+#include "sql_priv.h"
+#include "unireg.h"
+#include "sql_parse.h" // check_access
#ifdef HAVE_REPLICATION
#include "rpl_mi.h"
#include "sql_repl.h"
+#include "sql_acl.h" // SUPER_ACL
#include "log_event.h"
#include "rpl_filter.h"
#include <my_dir.h>
+#include "rpl_handler.h"
#include "debug_sync.h"
int max_binlog_dump_events = 0; // unlimited
@@ -31,6 +33,14 @@ my_bool opt_sporadic_binlog_dump_fail = 0;
static int binlog_dump_count = 0;
#endif
+/**
+ a copy of active_mi->rli->slave_skip_counter, for showing in SHOW VARIABLES,
+ INFORMATION_SCHEMA.GLOBAL_VARIABLES and @@sql_slave_skip_counter without
+ taking all the mutexes needed to access active_mi->rli->slave_skip_counter
+ properly.
+*/
+uint sql_slave_skip_counter;
+
/*
fake_rotate_event() builds a fake (=which does not exist physically in any
binlog) Rotate event, which contains the name of the binlog we are going to
@@ -83,6 +93,32 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
DBUG_RETURN(0);
}
+/*
+ Reset thread transmit packet buffer for event sending
+
+ This function allocates header bytes for event transmission, and
+ should be called before store the event data to the packet buffer.
+*/
+static int reset_transmit_packet(THD *thd, ushort flags,
+ ulong *ev_offset, const char **errmsg)
+{
+ int ret= 0;
+ String *packet= &thd->packet;
+
+ /* reserve and set default header */
+ packet->length(0);
+ packet->set("\0", 1, &my_charset_bin);
+
+ if (RUN_HOOK(binlog_transmit, reserve_header, (thd, flags, packet)))
+ {
+ *errmsg= "Failed to run hook 'reserve_header'";
+ my_errno= ER_UNKNOWN_ERROR;
+ ret= 1;
+ }
+ *ev_offset= packet->length();
+ return ret;
+}
+
static int send_file(THD *thd)
{
NET* net = &thd->net;
@@ -119,13 +155,14 @@ static int send_file(THD *thd)
if (!strcmp(fname,"/dev/null"))
goto end;
- if ((fd = my_open(fname, O_RDONLY, MYF(0))) < 0)
+ if ((fd= mysql_file_open(key_file_send_file,
+ fname, O_RDONLY, MYF(0))) < 0)
{
errmsg = "on open of file";
goto err;
}
- while ((long) (bytes= my_read(fd, buf, IO_SIZE, MYF(0))) > 0)
+ while ((long) (bytes= mysql_file_read(fd, buf, IO_SIZE, MYF(0))) > 0)
{
if (my_net_write(net, buf, bytes))
{
@@ -146,7 +183,7 @@ static int send_file(THD *thd)
err:
my_net_set_read_timeout(net, old_timeout);
if (fd >= 0)
- (void) my_close(fd, MYF(0));
+ mysql_file_close(fd, MYF(0));
if (errmsg)
{
sql_print_error("Failed in send_file() %s", errmsg);
@@ -181,7 +218,7 @@ void adjust_linfo_offsets(my_off_t purge_offset)
{
THD *tmp;
- pthread_mutex_lock(&LOCK_thread_count);
+ mysql_mutex_lock(&LOCK_thread_count);
I_List_iterator<THD> it(threads);
while ((tmp=it++))
@@ -189,7 +226,7 @@ void adjust_linfo_offsets(my_off_t purge_offset)
LOG_INFO* linfo;
if ((linfo = tmp->current_linfo))
{
- pthread_mutex_lock(&linfo->lock);
+ mysql_mutex_lock(&linfo->lock);
/*
Index file offset can be less that purge offset only if
we just started reading the index file. In that case
@@ -199,10 +236,10 @@ void adjust_linfo_offsets(my_off_t purge_offset)
linfo->fatal = (linfo->index_file_offset != 0);
else
linfo->index_file_offset -= purge_offset;
- pthread_mutex_unlock(&linfo->lock);
+ mysql_mutex_unlock(&linfo->lock);
}
}
- pthread_mutex_unlock(&LOCK_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
}
@@ -212,7 +249,7 @@ bool log_in_use(const char* log_name)
THD *tmp;
bool result = 0;
- pthread_mutex_lock(&LOCK_thread_count);
+ mysql_mutex_lock(&LOCK_thread_count);
I_List_iterator<THD> it(threads);
while ((tmp=it++))
@@ -220,38 +257,25 @@ bool log_in_use(const char* log_name)
LOG_INFO* linfo;
if ((linfo = tmp->current_linfo))
{
- pthread_mutex_lock(&linfo->lock);
+ mysql_mutex_lock(&linfo->lock);
result = !memcmp(log_name, linfo->log_file_name, log_name_len);
- pthread_mutex_unlock(&linfo->lock);
+ mysql_mutex_unlock(&linfo->lock);
if (result)
break;
}
}
- pthread_mutex_unlock(&LOCK_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
return result;
}
bool purge_error_message(THD* thd, int res)
{
- uint errmsg= 0;
-
- switch (res) {
- case 0: break;
- case LOG_INFO_EOF: errmsg= ER_UNKNOWN_TARGET_BINLOG; break;
- case LOG_INFO_IO: errmsg= ER_IO_ERR_LOG_INDEX_READ; break;
- case LOG_INFO_INVALID:errmsg= ER_BINLOG_PURGE_PROHIBITED; break;
- case LOG_INFO_SEEK: errmsg= ER_FSEEK_FAIL; break;
- case LOG_INFO_MEM: errmsg= ER_OUT_OF_RESOURCES; break;
- case LOG_INFO_FATAL: errmsg= ER_BINLOG_PURGE_FATAL_ERR; break;
- case LOG_INFO_IN_USE: errmsg= ER_LOG_IN_USE; break;
- case LOG_INFO_EMFILE: errmsg= ER_BINLOG_PURGE_EMFILE; break;
- default: errmsg= ER_LOG_PURGE_UNKNOWN_ERR; break;
- }
+ uint errcode;
- if (errmsg)
+ if ((errcode= purge_log_get_error_code(res)) != 0)
{
- my_message(errmsg, ER(errmsg), MYF(0));
+ my_message(errcode, ER(errcode), MYF(0));
return TRUE;
}
my_ok(thd);
@@ -328,7 +352,7 @@ Increase max_allowed_packet on master";
*errmsg = "memory allocation failed reading log event";
break;
case LOG_READ_TRUNC:
- *errmsg = "binlog truncated in the middle of event";
+ *errmsg = "binlog truncated in the middle of event; consider out of disk space on master";
break;
default:
*errmsg = "unknown error reading log event on the master";
@@ -338,6 +362,73 @@ Increase max_allowed_packet on master";
}
+/**
+ An auxiliary function for calling in mysql_binlog_send
+ to initialize the heartbeat timeout in waiting for a binlogged event.
+
+ @param[in] thd THD to access a user variable
+
+ @return heartbeat period an ulonglong of nanoseconds
+ or zero if heartbeat was not demanded by slave
+*/
+static ulonglong get_heartbeat_period(THD * thd)
+{
+ my_bool null_value;
+ LEX_STRING name= { C_STRING_WITH_LEN("master_heartbeat_period")};
+ user_var_entry *entry=
+ (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
+ name.length);
+ return entry? entry->val_int(&null_value) : 0;
+}
+
+/*
+ Function prepares and sends repliation heartbeat event.
+
+ @param net net object of THD
+ @param packet buffer to store the heartbeat instance
+ @param event_coordinates binlog file name and position of the last
+ real event master sent from binlog
+
+ @note
+ Among three essential pieces of heartbeat data Log_event::when
+ is computed locally.
+ The error to send is serious and should force terminating
+ the dump thread.
+*/
+static int send_heartbeat_event(NET* net, String* packet,
+ const struct event_coordinates *coord)
+{
+ DBUG_ENTER("send_heartbeat_event");
+ char header[LOG_EVENT_HEADER_LEN];
+ /*
+ 'when' (the timestamp) is set to 0 so that slave could distinguish between
+ real and fake Rotate events (if necessary)
+ */
+ memset(header, 0, 4); // when
+
+ header[EVENT_TYPE_OFFSET] = HEARTBEAT_LOG_EVENT;
+
+ char* p= coord->file_name + dirname_length(coord->file_name);
+
+ uint ident_len = strlen(p);
+ ulong event_len = ident_len + LOG_EVENT_HEADER_LEN;
+ int4store(header + SERVER_ID_OFFSET, server_id);
+ int4store(header + EVENT_LEN_OFFSET, event_len);
+ int2store(header + FLAGS_OFFSET, 0);
+
+ int4store(header + LOG_POS_OFFSET, coord->pos); // log_pos
+
+ packet->append(header, sizeof(header));
+ packet->append(p, ident_len); // log_file_name
+
+ if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
+ net_flush(net))
+ {
+ DBUG_RETURN(-1);
+ }
+ DBUG_RETURN(0);
+}
+
/*
TODO: Clean up loop to only have one call to send_file()
*/
@@ -348,14 +439,19 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
LOG_INFO linfo;
char *log_file_name = linfo.log_file_name;
char search_file_name[FN_REFLEN], *name;
+
+ ulong ev_offset;
+
IO_CACHE log;
File file = -1;
String* packet = &thd->packet;
int error;
const char *errmsg = "Unknown error";
+ char error_text[MAX_SLAVE_ERRMSG]; // to be send to slave via my_message()
NET* net = &thd->net;
- pthread_mutex_t *log_lock;
- bool binlog_can_be_corrupted= FALSE;
+ mysql_mutex_t *log_lock;
+ mysql_cond_t *log_cond;
+
#ifndef DBUG_OFF
int left_events = max_binlog_dump_events;
#endif
@@ -364,6 +460,30 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos));
bzero((char*) &log,sizeof(log));
+ /*
+ heartbeat_period from @master_heartbeat_period user variable
+ */
+ ulonglong heartbeat_period= get_heartbeat_period(thd);
+ struct timespec heartbeat_buf;
+ struct timespec *heartbeat_ts= NULL;
+ const LOG_POS_COORD start_coord= { log_ident, pos },
+ *p_start_coord= &start_coord;
+ LOG_POS_COORD coord_buf= { log_file_name, BIN_LOG_HEADER_SIZE },
+ *p_coord= &coord_buf;
+ if (heartbeat_period != LL(0))
+ {
+ heartbeat_ts= &heartbeat_buf;
+ set_timespec_nsec(*heartbeat_ts, 0);
+ }
+ if (global_system_variables.log_warnings > 1)
+ sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)",
+ thd->server_id, log_ident, (ulong)pos);
+ if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos)))
+ {
+ errmsg= "Failed to run hook 'transmit_start'";
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
#ifndef DBUG_OFF
if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
@@ -402,9 +522,9 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
goto err;
}
- pthread_mutex_lock(&LOCK_thread_count);
+ mysql_mutex_lock(&LOCK_thread_count);
thd->current_linfo = &linfo;
- pthread_mutex_unlock(&LOCK_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0)
{
@@ -419,11 +539,9 @@ impossible position";
goto err;
}
- /*
- We need to start a packet with something other than 255
- to distinguish it from error
- */
- packet->set("\0", 1, &my_charset_bin); /* This is the start of a new packet */
+ /* reset transmit packet for the fake rotate event below */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
/*
Tell the client about the log name with a fake Rotate event;
@@ -463,7 +581,7 @@ impossible position";
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err;
}
- packet->set("\0", 1, &my_charset_bin);
+
/*
Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become
this larger than the corresponding packet (query) sent
@@ -476,9 +594,16 @@ impossible position";
mysql_bin_log, and it's already inited, and it will be destroyed
only at shutdown).
*/
- log_lock = mysql_bin_log.get_log_lock();
+ p_coord->pos= pos; // the first hb matches the slave's last seen value
+ log_lock= mysql_bin_log.get_log_lock();
+ log_cond= mysql_bin_log.get_log_cond();
if (pos > BIN_LOG_HEADER_SIZE)
{
+ /* reset transmit packet for the event read from binary log
+ file */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
+
/*
Try to find a Format_description_log_event at the beginning of
the binlog
@@ -486,29 +611,28 @@ impossible position";
if (!(error = Log_event::read_log_event(&log, packet, log_lock)))
{
/*
- The packet has offsets equal to the normal offsets in a binlog
- event +1 (the first character is \0).
+ The packet has offsets equal to the normal offsets in a
+ binlog event + ev_offset (the first ev_offset characters are
+ the header (default \0)).
*/
DBUG_PRINT("info",
("Looked for a Format_description_log_event, found event type %d",
- (*packet)[EVENT_TYPE_OFFSET+1]));
- if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
+ (*packet)[EVENT_TYPE_OFFSET+ev_offset]));
+ if ((*packet)[EVENT_TYPE_OFFSET+ev_offset] == FORMAT_DESCRIPTION_EVENT)
{
- binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
- LOG_EVENT_BINLOG_IN_USE_F);
- (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
+ (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
/*
mark that this event with "log_pos=0", so the slave
should not increment master's binlog position
(rli->group_master_log_pos)
*/
- int4store((char*) packet->ptr()+LOG_POS_OFFSET+1, 0);
+ int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, 0);
/*
if reconnect master sends FD event with `created' as 0
to avoid destroying temp tables.
*/
int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
- ST_CREATED_OFFSET+1, (ulong) 0);
+ ST_CREATED_OFFSET+ev_offset, (ulong) 0);
/* send it */
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
{
@@ -534,8 +658,6 @@ impossible position";
Format_description_log_event will be found naturally if it is written.
*/
}
- /* reset the packet as we wrote to it in any case */
- packet->set("\0", 1, &my_charset_bin);
} /* end of if (pos > BIN_LOG_HEADER_SIZE); */
else
{
@@ -547,10 +669,15 @@ impossible position";
while (!net->error && net->vio != 0 && !thd->killed)
{
- my_off_t prev_pos= pos;
- while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
+ Log_event_type event_type= UNKNOWN_EVENT;
+
+ /* reset the transmit packet for the event read from binary log
+ file */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
+
+ while (!(error= Log_event::read_log_event(&log, packet, log_lock)))
{
- prev_pos= my_b_tell(&log);
#ifndef DBUG_OFF
if (max_binlog_dump_events && !left_events--)
{
@@ -560,10 +687,15 @@ impossible position";
goto err;
}
#endif
+ /*
+ log's filename does not change while it's active
+ */
+ p_coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET);
+ event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
{
- if ((*packet)[EVENT_TYPE_OFFSET+1] == XID_EVENT)
+ if (event_type == XID_EVENT)
{
net_flush(net);
const char act[]=
@@ -574,15 +706,18 @@ impossible position";
STRING_WITH_LEN(act)));
}
});
-
- if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
+ if (event_type == FORMAT_DESCRIPTION_EVENT)
+ {
+ (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
+ }
+ pos = my_b_tell(&log);
+ if (RUN_HOOK(binlog_transmit, before_send_event,
+ (thd, flags, packet, log_file_name, pos)))
{
- binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
- LOG_EVENT_BINLOG_IN_USE_F);
- (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
+ my_errno= ER_UNKNOWN_ERROR;
+ errmsg= "run 'before_send_event' hook failed";
+ goto err;
}
- else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
- binlog_can_be_corrupted= FALSE;
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
{
@@ -593,15 +728,14 @@ impossible position";
DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
{
- if ((*packet)[EVENT_TYPE_OFFSET+1] == XID_EVENT)
+ if (event_type == XID_EVENT)
{
net_flush(net);
}
});
- DBUG_PRINT("info", ("log event code %d",
- (*packet)[LOG_EVENT_OFFSET+1] ));
- if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
+ DBUG_PRINT("info", ("log event code %d", event_type));
+ if (event_type == LOAD_EVENT)
{
if (send_file(thd))
{
@@ -610,18 +744,17 @@ impossible position";
goto err;
}
}
- packet->set("\0", 1, &my_charset_bin);
- }
- /*
- here we were reading binlog that was not closed properly (as a result
- of a crash ?). treat any corruption as EOF
- */
- if (binlog_can_be_corrupted &&
- error != LOG_READ_MEM && error != LOG_READ_EOF)
- {
- my_b_seek(&log, prev_pos);
- error=LOG_READ_EOF;
+ if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
+ {
+ errmsg= "Failed to run hook 'after_send_event'";
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
+
+ /* reset transmit packet for next loop */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
}
/*
@@ -666,6 +799,11 @@ impossible position";
}
#endif
+ /* reset the transmit packet for the event read from binary log
+ file */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
+
/*
No one will update the log while we are reading
now, but we'll be quick and just read one record
@@ -676,40 +814,96 @@ impossible position";
has not been updated since last read.
*/
- pthread_mutex_lock(log_lock);
- switch (error= Log_event::read_log_event(&log, packet, (pthread_mutex_t*) 0)) {
+ mysql_mutex_lock(log_lock);
+ switch (error= Log_event::read_log_event(&log, packet, (mysql_mutex_t*) 0)) {
case 0:
/* we read successfully, so we'll need to send it to the slave */
- pthread_mutex_unlock(log_lock);
+ mysql_mutex_unlock(log_lock);
read_packet = 1;
+ p_coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET);
+ event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
break;
case LOG_READ_EOF:
+ {
+ int ret;
+ ulong signal_cnt;
DBUG_PRINT("wait",("waiting for data in binary log"));
if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0)
{
- pthread_mutex_unlock(log_lock);
+ mysql_mutex_unlock(log_lock);
goto end;
}
- if (!thd->killed)
- {
- /* Note that the following call unlocks lock_log */
- mysql_bin_log.wait_for_update(thd, 0);
- }
- else
- pthread_mutex_unlock(log_lock);
- DBUG_PRINT("wait",("binary log received update"));
- break;
- default:
- pthread_mutex_unlock(log_lock);
+#ifndef DBUG_OFF
+ ulong hb_info_counter= 0;
+#endif
+ const char* old_msg= thd->proc_info;
+ signal_cnt= mysql_bin_log.signal_cnt;
+ do
+ {
+ if (heartbeat_period != 0)
+ {
+ DBUG_ASSERT(heartbeat_ts);
+ set_timespec_nsec(*heartbeat_ts, heartbeat_period);
+ }
+ thd->enter_cond(log_cond, log_lock,
+ "Master has sent all binlog to slave; "
+ "waiting for binlog to be updated");
+ ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts);
+ DBUG_ASSERT(ret == 0 || (heartbeat_period != 0));
+ if (ret == ETIMEDOUT || ret == ETIME)
+ {
+#ifndef DBUG_OFF
+ if (hb_info_counter < 3)
+ {
+ sql_print_information("master sends heartbeat message");
+ hb_info_counter++;
+ if (hb_info_counter == 3)
+ sql_print_information("the rest of heartbeat info skipped ...");
+ }
+#endif
+ /* reset transmit packet for the heartbeat event */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ {
+ thd->exit_cond(old_msg);
+ goto err;
+ }
+ if (send_heartbeat_event(net, packet, p_coord))
+ {
+ errmsg = "Failed on my_net_write()";
+ my_errno= ER_UNKNOWN_ERROR;
+ thd->exit_cond(old_msg);
+ goto err;
+ }
+ }
+ else
+ {
+ DBUG_PRINT("wait",("binary log received update or a broadcast signal caught"));
+ }
+ } while (signal_cnt == mysql_bin_log.signal_cnt && !thd->killed);
+ thd->exit_cond(old_msg);
+ }
+ break;
+
+ default:
+ mysql_mutex_unlock(log_lock);
test_for_non_eof_log_read_errors(error, &errmsg);
goto err;
}
if (read_packet)
- {
- thd_proc_info(thd, "Sending binlog event to slave");
+ {
+ thd_proc_info(thd, "Sending binlog event to slave");
+ pos = my_b_tell(&log);
+ if (RUN_HOOK(binlog_transmit, before_send_event,
+ (thd, flags, packet, log_file_name, pos)))
+ {
+ my_errno= ER_UNKNOWN_ERROR;
+ errmsg= "run 'before_send_event' hook failed";
+ goto err;
+ }
+
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
{
errmsg = "Failed on my_net_write()";
@@ -717,7 +911,7 @@ impossible position";
goto err;
}
- if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
+ if (event_type == LOAD_EVENT)
{
if (send_file(thd))
{
@@ -726,11 +920,13 @@ impossible position";
goto err;
}
}
- packet->set("\0", 1, &my_charset_bin);
- /*
- No need to net_flush because we will get to flush later when
- we hit EOF pretty quick
- */
+
+ if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
+ {
+ my_errno= ER_UNKNOWN_ERROR;
+ errmsg= "Failed to run hook 'after_send_event'";
+ goto err;
+ }
}
log.error=0;
@@ -761,8 +957,12 @@ impossible position";
break;
end_io_cache(&log);
- (void) my_close(file, MYF(MY_WME));
+ mysql_file_close(file, MYF(MY_WME));
+ /* reset transmit packet for the possible fake rotate event */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
+
/*
Call fake_rotate_event() in case the previous log (the one which
we have just finished reading) did not contain a Rotate event
@@ -780,26 +980,44 @@ impossible position";
goto err;
}
- packet->length(0);
- packet->append('\0');
+ p_coord->file_name= log_file_name; // reset to the next
}
}
end:
end_io_cache(&log);
- (void)my_close(file, MYF(MY_WME));
+ mysql_file_close(file, MYF(MY_WME));
+ RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
my_eof(thd);
thd_proc_info(thd, "Waiting to finalize termination");
- pthread_mutex_lock(&LOCK_thread_count);
+ mysql_mutex_lock(&LOCK_thread_count);
thd->current_linfo = 0;
- pthread_mutex_unlock(&LOCK_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
thd->variables.max_allowed_packet= old_max_allowed_packet;
DBUG_VOID_RETURN;
err:
thd_proc_info(thd, "Waiting to finalize termination");
+ if (my_errno == ER_MASTER_FATAL_ERROR_READING_BINLOG && my_b_inited(&log))
+ {
+ /*
+ detailing the fatal error message with coordinates
+ of the last position read.
+ */
+ my_snprintf(error_text, sizeof(error_text),
+ "%s; the first event '%s' at %lld, "
+ "the last event read from '%s' at %lld, "
+ "the last byte read from '%s' at %lld.",
+ errmsg,
+ p_start_coord->file_name, p_start_coord->pos,
+ p_coord->file_name, p_coord->pos,
+ log_file_name, my_b_tell(&log));
+ }
+ else
+ strcpy(error_text, errmsg);
end_io_cache(&log);
+ RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
/*
Exclude iteration through thread list
this is needed for purge_logs() - it will iterate through
@@ -807,14 +1025,14 @@ err:
this mutex will make sure that it never tried to update our linfo
after we return from this stack frame
*/
- pthread_mutex_lock(&LOCK_thread_count);
+ mysql_mutex_lock(&LOCK_thread_count);
thd->current_linfo = 0;
- pthread_mutex_unlock(&LOCK_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
if (file >= 0)
- (void) my_close(file, MYF(MY_WME));
+ mysql_file_close(file, MYF(MY_WME));
thd->variables.max_allowed_packet= old_max_allowed_packet;
- my_message(my_errno, errmsg, MYF(0));
+ my_message(my_errno, error_text, MYF(0));
DBUG_VOID_RETURN;
}
@@ -827,7 +1045,7 @@ err:
@param mi Pointer to Master_info object for the slave's IO thread.
- @param net_report If true, saves the exit status into thd->main_da.
+ @param net_report If true, saves the exit status into thd->stmt_da.
@retval 0 success
@retval 1 error
@@ -838,7 +1056,7 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
int thread_mask;
DBUG_ENTER("start_slave");
- if (check_access(thd, SUPER_ACL, any_db,0,0,0,0))
+ if (check_access(thd, SUPER_ACL, any_db, NULL, NULL, 0, 0))
DBUG_RETURN(1);
lock_slave_threads(mi); // this allows us to cleanly read slave_running
// Get a mask of _stopped_ threads
@@ -865,7 +1083,7 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
*/
if (thread_mask & SLAVE_SQL)
{
- pthread_mutex_lock(&mi->rli.data_lock);
+ mysql_mutex_lock(&mi->rli.data_lock);
if (thd->lex->mi.pos)
{
@@ -923,7 +1141,7 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
ER(ER_MISSING_SKIP_SLAVE));
}
- pthread_mutex_unlock(&mi->rli.data_lock);
+ mysql_mutex_unlock(&mi->rli.data_lock);
}
else if (thd->lex->mi.pos || thd->lex->mi.relay_log_pos)
push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
@@ -969,7 +1187,7 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
@param mi Pointer to Master_info object for the slave's IO thread.
- @param net_report If true, saves the exit status into thd->main_da.
+ @param net_report If true, saves the exit status into thd->stmt_da.
@retval 0 success
@retval 1 error
@@ -982,7 +1200,7 @@ int stop_slave(THD* thd, Master_info* mi, bool net_report )
if (!thd)
thd = current_thd;
- if (check_access(thd, SUPER_ACL, any_db,0,0,0,0))
+ if (check_access(thd, SUPER_ACL, any_db, NULL, NULL, 0, 0))
DBUG_RETURN(1);
thd_proc_info(thd, "Killing slave");
int thread_mask;
@@ -1066,14 +1284,9 @@ int reset_slave(THD *thd, Master_info* mi)
goto err;
}
- /*
- Clear master's log coordinates and reset host/user/etc to the values
- specified in mysqld's options (only for good display of SHOW SLAVE STATUS;
- next init_master_info() (in start_slave() for example) would have set them
- the same way; but here this is for the case where the user does SHOW SLAVE
- STATUS; before doing START SLAVE;
- */
- init_master_info_with_options(mi);
+ /* Clear master's log coordinates and associated information */
+ mi->clear_in_memory_info(thd->lex->reset_slave_info.all);
+
/*
Reset errors (the idea is that we forget about the
old master).
@@ -1086,19 +1299,22 @@ int reset_slave(THD *thd, Master_info* mi)
end_master_info(mi);
// and delete these two files
fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
- if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME)))
+ if (mysql_file_stat(key_file_master_info, fname, &stat_area, MYF(0)) &&
+ mysql_file_delete(key_file_master_info, fname, MYF(MY_WME)))
{
error=1;
goto err;
}
// delete relay_log_info_file
fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32);
- if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME)))
+ if (mysql_file_stat(key_file_relay_log_info, fname, &stat_area, MYF(0)) &&
+ mysql_file_delete(key_file_relay_log_info, fname, MYF(MY_WME)))
{
error=1;
goto err;
}
+ RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi));
err:
unlock_slave_threads(mi);
if (error)
@@ -1110,7 +1326,7 @@ err:
Kill all Binlog_dump threads which previously talked to the same slave
("same" means with the same server id). Indeed, if the slave stops, if the
- Binlog_dump thread is waiting (pthread_cond_wait) for binlog update, then it
+ Binlog_dump thread is waiting (mysql_cond_wait) for binlog update, then it
will keep existing until a query is written to the binlog. If the master is
idle, then this could last long, and if the slave reconnects, we could have 2
Binlog_dump threads in SHOW PROCESSLIST, until a query is written to the
@@ -1128,7 +1344,7 @@ err:
void kill_zombie_dump_threads(uint32 slave_server_id)
{
- pthread_mutex_lock(&LOCK_thread_count);
+ mysql_mutex_lock(&LOCK_thread_count);
I_List_iterator<THD> it(threads);
THD *tmp;
@@ -1137,11 +1353,11 @@ void kill_zombie_dump_threads(uint32 slave_server_id)
if (tmp->command == COM_BINLOG_DUMP &&
tmp->server_id == slave_server_id)
{
- pthread_mutex_lock(&tmp->LOCK_thd_kill); // Lock from delete
+ mysql_mutex_lock(&tmp->LOCK_thd_data); // Lock from delete
break;
}
}
- pthread_mutex_unlock(&LOCK_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
if (tmp)
{
/*
@@ -1150,7 +1366,7 @@ void kill_zombie_dump_threads(uint32 slave_server_id)
again. We just to do kill the thread ourselves.
*/
tmp->awake(THD::KILL_QUERY);
- pthread_mutex_unlock(&tmp->LOCK_thd_kill);
+ mysql_mutex_unlock(&tmp->LOCK_thd_data);
}
}
@@ -1172,6 +1388,7 @@ bool change_master(THD* thd, Master_info* mi)
int thread_mask;
const char* errmsg= 0;
bool need_relay_log_purge= 1;
+ bool ret= FALSE;
char saved_host[HOSTNAME_LENGTH + 1];
uint saved_port;
char saved_log_name[FN_REFLEN];
@@ -1180,22 +1397,35 @@ bool change_master(THD* thd, Master_info* mi)
lock_slave_threads(mi);
init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
+ LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
if (thread_mask) // We refuse if any slave thread is running
{
my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
- unlock_slave_threads(mi);
- DBUG_RETURN(TRUE);
+ ret= TRUE;
+ goto err;
}
thd_proc_info(thd, "Changing master");
- LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
+ /*
+ We need to check if there is an empty master_host. Otherwise
+ change master succeeds, a master.info file is created containing
+ empty master_host string and when issuing: start slave; an error
+ is thrown stating that the server is not configured as slave.
+ (See BUG#28796).
+ */
+ if(lex_mi->host && !*lex_mi->host)
+ {
+ my_error(ER_WRONG_ARGUMENTS, MYF(0), "MASTER_HOST");
+ unlock_slave_threads(mi);
+ DBUG_RETURN(TRUE);
+ }
// TODO: see if needs re-write
if (init_master_info(mi, master_info_file, relay_log_info_file, 0,
thread_mask))
{
my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
- unlock_slave_threads(mi);
- DBUG_RETURN(TRUE);
+ ret= TRUE;
+ goto err;
}
/*
@@ -1242,13 +1472,46 @@ bool change_master(THD* thd, Master_info* mi)
mi->port = lex_mi->port;
if (lex_mi->connect_retry)
mi->connect_retry = lex_mi->connect_retry;
+ if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
+ mi->heartbeat_period = lex_mi->heartbeat_period;
+ else
+ mi->heartbeat_period= (float) min(SLAVE_MAX_HEARTBEAT_PERIOD,
+ (slave_net_timeout/2.0));
+ mi->received_heartbeats= LL(0); // counter lives until master is CHANGEd
+ /*
+ reset the last time server_id list if the current CHANGE MASTER
+ is mentioning IGNORE_SERVER_IDS= (...)
+ */
+ if (lex_mi->repl_ignore_server_ids_opt == LEX_MASTER_INFO::LEX_MI_ENABLE)
+ reset_dynamic(&mi->ignore_server_ids);
+ for (uint i= 0; i < lex_mi->repl_ignore_server_ids.elements; i++)
+ {
+ ulong s_id;
+ get_dynamic(&lex_mi->repl_ignore_server_ids, (uchar*) &s_id, i);
+ if (s_id == ::server_id && replicate_same_server_id)
+ {
+ my_error(ER_SLAVE_IGNORE_SERVER_IDS, MYF(0), static_cast<int>(s_id));
+ ret= TRUE;
+ goto err;
+ }
+ else
+ {
+ if (bsearch((const ulong *) &s_id,
+ mi->ignore_server_ids.buffer,
+ mi->ignore_server_ids.elements, sizeof(ulong),
+ (int (*) (const void*, const void*))
+ change_master_server_id_cmp) == NULL)
+ insert_dynamic(&mi->ignore_server_ids, (uchar*) &s_id);
+ }
+ }
+ sort_dynamic(&mi->ignore_server_ids, (qsort_cmp) change_master_server_id_cmp);
- if (lex_mi->ssl != LEX_MASTER_INFO::SSL_UNCHANGED)
- mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::SSL_ENABLE);
+ if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
+ mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
- if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::SSL_UNCHANGED)
+ if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
mi->ssl_verify_server_cert=
- (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::SSL_ENABLE);
+ (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE);
if (lex_mi->ssl_ca)
strmake(mi->ssl_ca, lex_mi->ssl_ca, sizeof(mi->ssl_ca)-1);
@@ -1271,9 +1534,11 @@ bool change_master(THD* thd, Master_info* mi)
if (lex_mi->relay_log_name)
{
need_relay_log_purge= 0;
- strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name,
+ char relay_log_name[FN_REFLEN];
+ mi->rli.relay_log.make_log_name(relay_log_name, lex_mi->relay_log_name);
+ strmake(mi->rli.group_relay_log_name, relay_log_name,
sizeof(mi->rli.group_relay_log_name)-1);
- strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name,
+ strmake(mi->rli.event_relay_log_name, relay_log_name,
sizeof(mi->rli.event_relay_log_name)-1);
}
@@ -1320,8 +1585,8 @@ bool change_master(THD* thd, Master_info* mi)
if (flush_master_info(mi, FALSE, FALSE))
{
my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
- unlock_slave_threads(mi);
- DBUG_RETURN(TRUE);
+ ret= TRUE;
+ goto err;
}
if (need_relay_log_purge)
{
@@ -1332,8 +1597,8 @@ bool change_master(THD* thd, Master_info* mi)
&errmsg))
{
my_error(ER_RELAY_LOG_FAIL, MYF(0), errmsg);
- unlock_slave_threads(mi);
- DBUG_RETURN(TRUE);
+ ret= TRUE;
+ goto err;
}
}
else
@@ -1348,8 +1613,8 @@ bool change_master(THD* thd, Master_info* mi)
&msg, 0))
{
my_error(ER_RELAY_LOG_INIT, MYF(0), msg);
- unlock_slave_threads(mi);
- DBUG_RETURN(TRUE);
+ ret= TRUE;
+ goto err;
}
}
/*
@@ -1370,7 +1635,7 @@ bool change_master(THD* thd, Master_info* mi)
if (!mi->rli.group_master_log_name[0]) // uninitialized case
mi->rli.group_master_log_pos=0;
- pthread_mutex_lock(&mi->rli.data_lock);
+ mysql_mutex_lock(&mi->rli.data_lock);
mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */
/* Clear the errors, for a clean start */
mi->rli.clear_error();
@@ -1392,13 +1657,15 @@ bool change_master(THD* thd, Master_info* mi)
not exist anymore).
*/
flush_relay_log_info(&mi->rli);
- pthread_cond_broadcast(&mi->data_cond);
- pthread_mutex_unlock(&mi->rli.data_lock);
+ mysql_cond_broadcast(&mi->data_cond);
+ mysql_mutex_unlock(&mi->rli.data_lock);
+err:
unlock_slave_threads(mi);
thd_proc_info(thd, 0);
- my_ok(thd);
- DBUG_RETURN(FALSE);
+ if (ret == FALSE)
+ my_ok(thd);
+ DBUG_RETURN(ret);
}
@@ -1419,24 +1686,11 @@ int reset_master(THD* thd)
ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
return 1;
}
- return mysql_bin_log.reset_logs(thd);
-}
-
-int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
- const char* log_file_name2, ulonglong log_pos2)
-{
- int res;
- size_t log_file_name1_len= strlen(log_file_name1);
- size_t log_file_name2_len= strlen(log_file_name2);
- // We assume that both log names match up to '.'
- if (log_file_name1_len == log_file_name2_len)
- {
- if ((res= strcmp(log_file_name1, log_file_name2)))
- return res;
- return (log_pos1 < log_pos2) ? -1 : (log_pos1 == log_pos2) ? 0 : 1;
- }
- return ((log_file_name1_len < log_file_name2_len) ? -1 : 1);
+ if (mysql_bin_log.reset_logs(thd))
+ return 1;
+ RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */));
+ return 0;
}
@@ -1457,27 +1711,44 @@ bool mysql_show_binlog_events(THD* thd)
bool ret = TRUE;
IO_CACHE log;
File file = -1;
+ MYSQL_BIN_LOG *binary_log= NULL;
int old_max_allowed_packet= thd->variables.max_allowed_packet;
LOG_INFO linfo;
DBUG_ENTER("mysql_show_binlog_events");
Log_event::init_show_field_list(&field_list);
- if (protocol->send_fields(&field_list,
+ if (protocol->send_result_set_metadata(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(TRUE);
Format_description_log_event *description_event= new
Format_description_log_event(3); /* MySQL 4.0 by default */
- /*
- Wait for handlers to insert any pending information
- into the binlog. For e.g. ndb which updates the binlog asynchronously
- this is needed so that the uses sees all its own commands in the binlog
- */
- ha_binlog_wait(thd);
+ DBUG_ASSERT(thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS ||
+ thd->lex->sql_command == SQLCOM_SHOW_RELAYLOG_EVENTS);
- if (mysql_bin_log.is_open())
+ /* select wich binary log to use: binlog or relay */
+ if ( thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS )
+ {
+ /*
+ Wait for handlers to insert any pending information
+ into the binlog. For e.g. ndb which updates the binlog asynchronously
+ this is needed so that the uses sees all its own commands in the binlog
+ */
+ ha_binlog_wait(thd);
+
+ binary_log= &mysql_bin_log;
+ }
+ else /* showing relay log contents */
+ {
+ if (!active_mi)
+ DBUG_RETURN(TRUE);
+
+ binary_log= &(active_mi->rli.relay_log);
+ }
+
+ if (binary_log->is_open())
{
LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
SELECT_LEX_UNIT *unit= &thd->lex->unit;
@@ -1485,7 +1756,7 @@ bool mysql_show_binlog_events(THD* thd)
my_off_t pos = max(BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
char search_file_name[FN_REFLEN], *name;
const char *log_file_name = lex_mi->log_file_name;
- pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
+ mysql_mutex_t *log_lock = binary_log->get_log_lock();
Log_event* ev;
unit->set_limit(thd->lex->current_select);
@@ -1494,21 +1765,21 @@ bool mysql_show_binlog_events(THD* thd)
name= search_file_name;
if (log_file_name)
- mysql_bin_log.make_log_name(search_file_name, log_file_name);
+ binary_log->make_log_name(search_file_name, log_file_name);
else
name=0; // Find first log
linfo.index_file_offset = 0;
- if (mysql_bin_log.find_log_pos(&linfo, name, 1))
+ if (binary_log->find_log_pos(&linfo, name, 1))
{
errmsg = "Could not find target log";
goto err;
}
- pthread_mutex_lock(&LOCK_thread_count);
+ mysql_mutex_lock(&LOCK_thread_count);
thd->current_linfo = &linfo;
- pthread_mutex_unlock(&LOCK_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0)
goto err;
@@ -1518,7 +1789,7 @@ bool mysql_show_binlog_events(THD* thd)
*/
thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER;
- pthread_mutex_lock(log_lock);
+ mysql_mutex_lock(log_lock);
/*
open_binlog() sought to position 4.
@@ -1528,7 +1799,7 @@ bool mysql_show_binlog_events(THD* thd)
This code will fail on a mixed relay log (one which has Format_desc then
Rotate then Format_desc).
*/
- ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event);
+ ev= Log_event::read_log_event(&log, (mysql_mutex_t*)0, description_event);
if (ev)
{
if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
@@ -1549,7 +1820,7 @@ bool mysql_show_binlog_events(THD* thd)
}
for (event_count = 0;
- (ev = Log_event::read_log_event(&log,(pthread_mutex_t*) 0,
+ (ev = Log_event::read_log_event(&log, (mysql_mutex_t*) 0,
description_event)); )
{
if (event_count >= limit_start &&
@@ -1557,7 +1828,7 @@ bool mysql_show_binlog_events(THD* thd)
{
errmsg = "Net error";
delete ev;
- pthread_mutex_unlock(log_lock);
+ mysql_mutex_unlock(log_lock);
goto err;
}
@@ -1571,11 +1842,11 @@ bool mysql_show_binlog_events(THD* thd)
if (event_count < limit_end && log.error)
{
errmsg = "Wrong offset or I/O error";
- pthread_mutex_unlock(log_lock);
+ mysql_mutex_unlock(log_lock);
goto err;
}
- pthread_mutex_unlock(log_lock);
+ mysql_mutex_unlock(log_lock);
}
// Check that linfo is still on the function scope.
DEBUG_SYNC(thd, "after_show_binlog_events");
@@ -1587,7 +1858,7 @@ err:
if (file >= 0)
{
end_io_cache(&log);
- (void) my_close(file, MYF(MY_WME));
+ mysql_file_close(file, MYF(MY_WME));
}
if (errmsg)
@@ -1596,9 +1867,9 @@ err:
else
my_eof(thd);
- pthread_mutex_lock(&LOCK_thread_count);
+ mysql_mutex_lock(&LOCK_thread_count);
thd->current_linfo = 0;
- pthread_mutex_unlock(&LOCK_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
thd->variables.max_allowed_packet= old_max_allowed_packet;
DBUG_RETURN(ret);
}
@@ -1624,7 +1895,7 @@ bool show_binlog_info(THD* thd)
field_list.push_back(new Item_empty_string("Binlog_Do_DB",255));
field_list.push_back(new Item_empty_string("Binlog_Ignore_DB",255));
- if (protocol->send_fields(&field_list,
+ if (protocol->send_result_set_metadata(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(TRUE);
protocol->prepare_for_resend();
@@ -1669,23 +1940,23 @@ bool show_binlogs(THD* thd)
if (!mysql_bin_log.is_open())
{
- my_message(ER_NO_BINARY_LOGGING, ER(ER_NO_BINARY_LOGGING), MYF(0));
+ my_error(ER_NO_BINARY_LOGGING, MYF(0));
DBUG_RETURN(TRUE);
}
field_list.push_back(new Item_empty_string("Log_name", 255));
field_list.push_back(new Item_return_int("File_size", 20,
MYSQL_TYPE_LONGLONG));
- if (protocol->send_fields(&field_list,
+ if (protocol->send_result_set_metadata(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(TRUE);
- pthread_mutex_lock(mysql_bin_log.get_log_lock());
+ mysql_mutex_lock(mysql_bin_log.get_log_lock());
mysql_bin_log.lock_index();
index_file=mysql_bin_log.get_index_file();
mysql_bin_log.raw_get_current_log(&cur); // dont take mutex
- pthread_mutex_unlock(mysql_bin_log.get_log_lock()); // lockdep, OK
+ mysql_mutex_unlock(mysql_bin_log.get_log_lock()); // lockdep, OK
cur_dir_len= dirname_length(cur.log_file_name);
@@ -1708,11 +1979,12 @@ bool show_binlogs(THD* thd)
else
{
/* this is an old log, open it and find the size */
- if ((file= my_open(fname, O_RDONLY | O_SHARE | O_BINARY,
- MYF(0))) >= 0)
+ if ((file= mysql_file_open(key_file_binlog,
+ fname, O_RDONLY | O_SHARE | O_BINARY,
+ MYF(0))) >= 0)
{
- file_length= (ulonglong) my_seek(file, 0L, MY_SEEK_END, MYF(0));
- my_close(file, MYF(0));
+ file_length= (ulonglong) mysql_file_seek(file, 0L, MY_SEEK_END, MYF(0));
+ mysql_file_close(file, MYF(0));
}
}
protocol->store(file_length);
@@ -1747,7 +2019,7 @@ int log_loaded_block(IO_CACHE* file)
uchar* buffer= (uchar*) my_b_get_buffer_start(file);
uint max_event_size= current_thd->variables.max_allowed_packet;
lf_info= (LOAD_FILE_INFO*) file->arg;
- if (lf_info->thd->current_stmt_binlog_row_based)
+ if (lf_info->thd->is_current_stmt_binlog_format_row())
DBUG_RETURN(0);
if (lf_info->last_pos_in_file != HA_POS_ERROR &&
lf_info->last_pos_in_file >= my_b_get_pos_in_file(file))
@@ -1780,124 +2052,4 @@ int log_loaded_block(IO_CACHE* file)
DBUG_RETURN(0);
}
-/*
- Replication System Variables
-*/
-
-class sys_var_slave_skip_counter :public sys_var
-{
-public:
- sys_var_slave_skip_counter(sys_var_chain *chain, const char *name_arg)
- :sys_var(name_arg)
- { chain_sys_var(chain); }
- bool check(THD *thd, set_var *var);
- bool update(THD *thd, set_var *var);
- bool check_type(enum_var_type type) { return type != OPT_GLOBAL; }
- /*
- We can't retrieve the value of this, so we don't have to define
- type() or value_ptr()
- */
-};
-
-class sys_var_sync_binlog_period :public sys_var_long_ptr
-{
-public:
- sys_var_sync_binlog_period(sys_var_chain *chain, const char *name_arg,
- ulong *value_ptr)
- :sys_var_long_ptr(chain, name_arg,value_ptr) {}
- bool update(THD *thd, set_var *var);
-};
-
-static sys_var_chain vars = { NULL, NULL };
-
-static sys_var_const sys_log_slave_updates(&vars, "log_slave_updates",
- OPT_GLOBAL, SHOW_MY_BOOL,
- (uchar*) &opt_log_slave_updates);
-static sys_var_const sys_relay_log(&vars, "relay_log",
- OPT_GLOBAL, SHOW_CHAR_PTR,
- (uchar*) &opt_relay_logname);
-static sys_var_const sys_relay_log_index(&vars, "relay_log_index",
- OPT_GLOBAL, SHOW_CHAR_PTR,
- (uchar*) &opt_relaylog_index_name);
-static sys_var_const sys_relay_log_info_file(&vars, "relay_log_info_file",
- OPT_GLOBAL, SHOW_CHAR_PTR,
- (uchar*) &relay_log_info_file);
-static sys_var_bool_ptr sys_relay_log_purge(&vars, "relay_log_purge",
- &relay_log_purge);
-static sys_var_const sys_relay_log_space_limit(&vars,
- "relay_log_space_limit",
- OPT_GLOBAL, SHOW_LONGLONG,
- (uchar*)
- &relay_log_space_limit);
-static sys_var_const sys_slave_load_tmpdir(&vars, "slave_load_tmpdir",
- OPT_GLOBAL, SHOW_CHAR_PTR,
- (uchar*) &slave_load_tmpdir);
-static sys_var_long_ptr sys_slave_net_timeout(&vars, "slave_net_timeout",
- &slave_net_timeout);
-static sys_var_const sys_slave_skip_errors(&vars, "slave_skip_errors",
- OPT_GLOBAL, SHOW_CHAR,
- (uchar*) slave_skip_error_names);
-static sys_var_long_ptr sys_slave_trans_retries(&vars, "slave_transaction_retries",
- &slave_trans_retries);
-static sys_var_sync_binlog_period sys_sync_binlog_period(&vars, "sync_binlog", &sync_binlog_period);
-static sys_var_slave_skip_counter sys_slave_skip_counter(&vars, "sql_slave_skip_counter");
-
-
-bool sys_var_slave_skip_counter::check(THD *thd, set_var *var)
-{
- int result= 0;
- pthread_mutex_lock(&LOCK_active_mi);
- pthread_mutex_lock(&active_mi->rli.run_lock);
- if (active_mi->rli.slave_running)
- {
- my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
- result=1;
- }
- pthread_mutex_unlock(&active_mi->rli.run_lock);
- pthread_mutex_unlock(&LOCK_active_mi);
- var->save_result.ulong_value= (ulong) var->value->val_int();
- return result;
-}
-
-
-bool sys_var_slave_skip_counter::update(THD *thd, set_var *var)
-{
- pthread_mutex_lock(&LOCK_active_mi);
- pthread_mutex_lock(&active_mi->rli.run_lock);
- /*
- The following test should normally never be true as we test this
- in the check function; To be safe against multiple
- SQL_SLAVE_SKIP_COUNTER request, we do the check anyway
- */
- if (!active_mi->rli.slave_running)
- {
- pthread_mutex_lock(&active_mi->rli.data_lock);
- active_mi->rli.slave_skip_counter= var->save_result.ulong_value;
- pthread_mutex_unlock(&active_mi->rli.data_lock);
- }
- pthread_mutex_unlock(&active_mi->rli.run_lock);
- pthread_mutex_unlock(&LOCK_active_mi);
- return 0;
-}
-
-
-bool sys_var_sync_binlog_period::update(THD *thd, set_var *var)
-{
- sync_binlog_period= (ulong) var->save_result.ulonglong_value;
- return 0;
-}
-
-int init_replication_sys_vars()
-{
- if (mysql_add_sys_var_chain(vars.first, my_long_options))
- {
- /* should not happen */
- fprintf(stderr, "failed to initialize replication system variables");
- unireg_abort(1);
- }
- return 0;
-}
-
#endif /* HAVE_REPLICATION */
-
-