summaryrefslogtreecommitdiff
path: root/sql/sql_repl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r--sql/sql_repl.cc411
1 files changed, 274 insertions, 137 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index cff36eaa388..32c5f0bfdab 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -17,14 +17,12 @@
// Sasha Pachev <sasha@mysql.com> is currently in charge of this file
#include "mysql_priv.h"
+#ifdef HAVE_REPLICATION
+
#include "sql_repl.h"
#include "sql_acl.h"
#include "log_event.h"
-#include "mini_client.h"
#include <my_dir.h>
-#include <assert.h>
-
-extern const char* any_db;
int max_binlog_dump_events = 0; // unlimited
my_bool opt_sporadic_binlog_dump_fail = 0;
@@ -262,44 +260,56 @@ bool log_in_use(const char* log_name)
return result;
}
-
-int purge_master_logs(THD* thd, const char* to_log)
+int purge_error_message(THD* thd, int res)
{
- char search_file_name[FN_REFLEN];
- const char* errmsg = 0;
- int res;
-
- if (!mysql_bin_log.is_open())
- goto end;
+ const char *errmsg= 0;
- mysql_bin_log.make_log_name(search_file_name, to_log);
- res = mysql_bin_log.purge_logs(thd, search_file_name);
-
- switch(res) {
+ switch (res) {
case 0: break;
- case LOG_INFO_EOF: errmsg = "Target log not found in binlog index"; break;
- case LOG_INFO_IO: errmsg = "I/O error reading log index file"; break;
- case LOG_INFO_INVALID: errmsg = "Server configuration does not permit \
-binlog purge"; break;
- case LOG_INFO_SEEK: errmsg = "Failed on fseek()"; break;
- case LOG_INFO_MEM: errmsg = "Out of memory"; break;
- case LOG_INFO_FATAL: errmsg = "Fatal error during purge"; break;
- case LOG_INFO_IN_USE: errmsg = "A purgeable log is in use, will not purge";
+ case LOG_INFO_EOF: errmsg= "Target log not found in binlog index"; break;
+ case LOG_INFO_IO: errmsg= "I/O error reading log index file"; break;
+ case LOG_INFO_INVALID:
+ errmsg= "Server configuration does not permit binlog purge"; break;
+ case LOG_INFO_SEEK: errmsg= "Failed on fseek()"; break;
+ case LOG_INFO_MEM: errmsg= "Out of memory"; break;
+ case LOG_INFO_FATAL: errmsg= "Fatal error during purge"; break;
+ case LOG_INFO_IN_USE: errmsg= "A purgeable log is in use, will not purge";
break;
- default: errmsg = "Unknown error during purge"; break;
+ default: errmsg= "Unknown error during purge"; break;
}
if (errmsg)
{
- send_error(&thd->net, 0, errmsg);
+ send_error(thd, 0, errmsg);
return 1;
}
-
-end:
- send_ok(&thd->net);
+ send_ok(thd);
return 0;
}
+
+int purge_master_logs(THD* thd, const char* to_log)
+{
+ char search_file_name[FN_REFLEN];
+ if (!mysql_bin_log.is_open())
+ {
+ send_ok(current_thd);
+ return 0;
+ }
+
+ mysql_bin_log.make_log_name(search_file_name, to_log);
+ return purge_error_message(thd,
+ mysql_bin_log.purge_logs(search_file_name, 0, 1,
+ 1, NULL));
+}
+
+
+int purge_master_logs_before_date(THD* thd, time_t purge_time)
+{
+ int res = mysql_bin_log.purge_logs_before_date(purge_time);
+ return purge_error_message(thd ,res);
+}
+
/*
TODO: Clean up loop to only have one call to send_file()
*/
@@ -380,32 +390,33 @@ impossible position";
We need to start a packet with something other than 255
to distiquish it from error
*/
- packet->set("\0", 1);
+ packet->set("\0", 1, &my_charset_bin);
/*
Before 4.0.14 we called fake_rotate_event below only if
(pos == BIN_LOG_HEADER_SIZE), because if this is false then the slave
already knows the binlog's name.
Now we always call fake_rotate_event; if the slave already knew the log's
- name (ex: CHANGE MASTER TO MASTER_LOG_FILE=...) this is useless but does not
- harm much. It is nice for 3.23 (>=.58) slaves which test Rotate events
+ name (ex: CHANGE MASTER TO MASTER_LOG_FILE=...) this is useless but does
+ not harm much. It is nice for 3.23 (>=.58) slaves which test Rotate events
to see if the master is 4.0 (then they choose to stop because they can't
- replicate 4.0); by always calling fake_rotate_event we are sure that 3.23.58
- and newer will detect the problem as soon as replication starts (BUG#198).
+ replicate 4.0); by always calling fake_rotate_event we are sure that
+ 3.23.58 and newer will detect the problem as soon as replication starts
+ (BUG#198).
Always calling fake_rotate_event makes sending of normal
- (=from-binlog) Rotate events a priori unneeded, but it is not so simple: the
- 2 Rotate events are not equivalent, the normal one is before the Stop event,
- the fake one is after. If we don't send the normal one, then the Stop event
- will be interpreted (by existing 4.0 slaves) as "the master stopped", which
- is wrong. So for safety, given that we want minimum modification of 4.0, we
- send the normal and fake Rotates.
+ (=from-binlog) Rotate events a priori unneeded, but it is not so simple:
+ the 2 Rotate events are not equivalent, the normal one is before the Stop
+ event, the fake one is after. If we don't send the normal one, then the
+ Stop event will be interpreted (by existing 4.0 slaves) as "the master
+ stopped", which is wrong. So for safety, given that we want minimum
+ modification of 4.0, we send the normal and fake Rotates.
*/
if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err;
}
- packet->set("\0", 1);
+ packet->set("\0", 1, &my_charset_bin);
while (!net->error && net->vio != 0 && !thd->killed)
{
@@ -439,7 +450,7 @@ impossible position";
goto err;
}
}
- packet->set("\0", 1);
+ packet->set("\0", 1, &my_charset_bin);
}
/*
TODO: now that we are logging the offset, check to make sure
@@ -564,7 +575,7 @@ Increase max_allowed_packet on master";
goto err;
}
}
- packet->set("\0", 1);
+ packet->set("\0", 1, &my_charset_bin);
/*
No need to net_flush because we will get to flush later when
we hit EOF pretty quick
@@ -614,7 +625,7 @@ Increase max_allowed_packet on master";
goto err;
}
packet->length(0);
- packet->append("\0",1);
+ packet->append('\0');
}
}
@@ -622,7 +633,7 @@ end:
end_io_cache(&log);
(void)my_close(file, MYF(MY_WME));
- send_eof(&thd->net);
+ send_eof(thd);
thd->proc_info = "Waiting to finalize termination";
pthread_mutex_lock(&LOCK_thread_count);
thd->current_linfo = 0;
@@ -644,82 +655,177 @@ err:
pthread_mutex_unlock(&LOCK_thread_count);
if (file >= 0)
(void) my_close(file, MYF(MY_WME));
- send_error(&thd->net, my_errno, errmsg);
+ send_error(thd, my_errno, errmsg);
DBUG_VOID_RETURN;
}
int start_slave(THD* thd , MASTER_INFO* mi, bool net_report)
{
- int slave_errno = 0;
- if (!thd) thd = current_thd;
- NET* net = &thd->net;
+ int slave_errno= 0;
int thread_mask;
DBUG_ENTER("start_slave");
- if (check_access(thd, SUPER_ACL, any_db))
+ if (check_access(thd, SUPER_ACL, any_db,0,0,0))
DBUG_RETURN(1);
lock_slave_threads(mi); // this allows us to cleanly read slave_running
+ // Get a mask of _stopped_ threads
init_thread_mask(&thread_mask,mi,1 /* inverse */);
- if (thd->lex.slave_thd_opt)
- thread_mask &= thd->lex.slave_thd_opt;
- if (thread_mask)
+ /*
+ Below we will start all stopped threads.
+ But if the user wants to start only one thread, do as if the other thread
+ was running (as we don't wan't to touch the other thread), so set the
+ bit to 0 for the other thread
+ */
+ if (thd->lex->slave_thd_opt)
+ thread_mask&= thd->lex->slave_thd_opt;
+ if (thread_mask) //some threads are stopped, start them
{
if (init_master_info(mi,master_info_file,relay_log_info_file, 0))
slave_errno=ER_MASTER_INFO;
else if (server_id_supplied && *mi->host)
- slave_errno = start_slave_threads(0 /*no mutex */,
+ {
+ /*
+ If we will start SQL thread we will care about UNTIL options
+ If not and they are specified we will ignore them and warn user
+ about this fact.
+ */
+ if (thread_mask & SLAVE_SQL)
+ {
+ pthread_mutex_lock(&mi->rli.data_lock);
+
+ if (thd->lex->mi.pos)
+ {
+ mi->rli.until_condition= RELAY_LOG_INFO::UNTIL_MASTER_POS;
+ mi->rli.until_log_pos= thd->lex->mi.pos;
+ /*
+ We don't check thd->lex->mi.log_file_name for NULL here
+ since it is checked in sql_yacc.yy
+ */
+ strmake(mi->rli.until_log_name, thd->lex->mi.log_file_name,
+ sizeof(mi->rli.until_log_name)-1);
+ }
+ else if (thd->lex->mi.relay_log_pos)
+ {
+ mi->rli.until_condition= RELAY_LOG_INFO::UNTIL_RELAY_POS;
+ mi->rli.until_log_pos= thd->lex->mi.relay_log_pos;
+ strmake(mi->rli.until_log_name, thd->lex->mi.relay_log_name,
+ sizeof(mi->rli.until_log_name)-1);
+ }
+ else
+ clear_until_condition(&mi->rli);
+
+ if (mi->rli.until_condition != RELAY_LOG_INFO::UNTIL_NONE)
+ {
+ /* Preparing members for effective until condition checking */
+ const char *p= fn_ext(mi->rli.until_log_name);
+ char *p_end;
+ if (*p)
+ {
+ //p points to '.'
+ mi->rli.until_log_name_extension= strtoul(++p,&p_end, 10);
+ /*
+ p_end points to the first invalid character. If it equals
+ to p, no digits were found, error. If it contains '\0' it
+ means conversion went ok.
+ */
+ if (p_end==p || *p_end)
+ slave_errno=ER_BAD_SLAVE_UNTIL_COND;
+ }
+ else
+ slave_errno=ER_BAD_SLAVE_UNTIL_COND;
+
+ /* mark the cached result of the UNTIL comparison as "undefined" */
+ mi->rli.until_log_names_cmp_result=
+ RELAY_LOG_INFO::UNTIL_LOG_NAMES_CMP_UNKNOWN;
+
+ /* Issuing warning then started without --skip-slave-start */
+ if (!opt_skip_slave_start)
+ push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_MISSING_SKIP_SLAVE,
+ ER(ER_MISSING_SKIP_SLAVE));
+ }
+
+ pthread_mutex_unlock(&mi->rli.data_lock);
+ }
+ else if (thd->lex->mi.pos || thd->lex->mi.relay_log_pos)
+ push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
+ ER(ER_UNTIL_COND_IGNORED));
+
+
+ if (!slave_errno)
+ slave_errno = start_slave_threads(0 /*no mutex */,
1 /* wait for start */,
mi,
master_info_file,relay_log_info_file,
thread_mask);
+ }
else
slave_errno = ER_BAD_SLAVE;
}
else
- slave_errno = ER_SLAVE_MUST_STOP;
+ //no error if all threads are already started, only a warning
+ push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING,
+ ER(ER_SLAVE_WAS_RUNNING));
unlock_slave_threads(mi);
if (slave_errno)
{
if (net_report)
- send_error(net, slave_errno);
+ send_error(thd, slave_errno);
DBUG_RETURN(1);
}
else if (net_report)
- send_ok(net);
+ send_ok(thd);
DBUG_RETURN(0);
}
+
int stop_slave(THD* thd, MASTER_INFO* mi, bool net_report )
{
- int slave_errno = 0;
- if (!thd) thd = current_thd;
- NET* net = &thd->net;
+ int slave_errno;
+ if (!thd)
+ thd = current_thd;
- if (check_access(thd, SUPER_ACL, any_db))
+ if (check_access(thd, SUPER_ACL, any_db,0,0,0))
return 1;
thd->proc_info = "Killing slave";
int thread_mask;
lock_slave_threads(mi);
+ // Get a mask of _running_ threads
init_thread_mask(&thread_mask,mi,0 /* not inverse*/);
- if (thd->lex.slave_thd_opt)
- thread_mask &= thd->lex.slave_thd_opt;
- slave_errno = (thread_mask) ?
- terminate_slave_threads(mi,thread_mask,
- 1 /*skip lock */) : ER_SLAVE_NOT_RUNNING;
+ /*
+ Below we will stop all running threads.
+ But if the user wants to stop only one thread, do as if the other thread
+ was stopped (as we don't wan't to touch the other thread), so set the
+ bit to 0 for the other thread
+ */
+ if (thd->lex->slave_thd_opt)
+ thread_mask &= thd->lex->slave_thd_opt;
+
+ if (thread_mask)
+ {
+ slave_errno= terminate_slave_threads(mi,thread_mask,
+ 1 /*skip lock */);
+ }
+ else
+ {
+ //no error if both threads are already stopped, only a warning
+ slave_errno= 0;
+ push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
+ ER(ER_SLAVE_WAS_NOT_RUNNING));
+ }
unlock_slave_threads(mi);
thd->proc_info = 0;
if (slave_errno)
{
if (net_report)
- send_error(net, slave_errno);
+ send_error(thd, slave_errno);
return 1;
}
else if (net_report)
- send_ok(net);
+ send_ok(thd);
return 0;
}
@@ -733,16 +839,9 @@ int stop_slave(THD* thd, MASTER_INFO* mi, bool net_report )
thd Thread handler
mi Master info for the slave
-
- NOTES
- We don't send ok in this functions as this is called from
- reload_acl_and_cache() which may have done other tasks, which may
- have failed for which we want to send and error.
-
RETURN
0 ok
1 error
- In this case error is sent to the client with send_error()
*/
@@ -763,7 +862,7 @@ int reset_slave(THD *thd, MASTER_INFO* mi)
error=1;
goto err;
}
- //delete relay logs, clear relay log coordinates
+ // delete relay logs, clear relay log coordinates
if ((error= purge_relay_logs(&mi->rli, thd,
1 /* just reset */,
&errmsg)))
@@ -777,16 +876,23 @@ int reset_slave(THD *thd, MASTER_INFO* mi)
STATUS; before doing START SLAVE;
*/
init_master_info_with_options(mi);
- clear_last_slave_error(&mi->rli);
- //close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
+ /*
+ Reset errors, and master timestamp (the idea is that we forget about the
+ old master).
+ */
+ clear_slave_error_timestamp(&mi->rli);
+ clear_until_condition(&mi->rli);
+
+ // close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
end_master_info(mi);
- //and delete these two files
+ // and delete these two files
fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME)))
{
error=1;
goto err;
}
+ // delete relay_log_info_file
fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32);
if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME)))
{
@@ -796,8 +902,8 @@ int reset_slave(THD *thd, MASTER_INFO* mi)
err:
unlock_slave_threads(mi);
- if (thd && error)
- send_error(&thd->net, sql_errno, errmsg);
+ if (error)
+ my_error(sql_errno, MYF(0), errmsg);
DBUG_RETURN(error);
}
@@ -861,17 +967,17 @@ int change_master(THD* thd, MASTER_INFO* mi)
init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
if (thread_mask) // We refuse if any slave thread is running
{
- net_printf(&thd->net,ER_SLAVE_MUST_STOP);
+ net_printf(thd,ER_SLAVE_MUST_STOP);
unlock_slave_threads(mi);
DBUG_RETURN(1);
}
thd->proc_info = "Changing master";
- LEX_MASTER_INFO* lex_mi = &thd->lex.mi;
+ LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
// TODO: see if needs re-write
if (init_master_info(mi, master_info_file, relay_log_info_file, 0))
{
- send_error(&thd->net, ER_MASTER_INFO);
+ send_error(thd, ER_MASTER_INFO);
unlock_slave_threads(mi);
DBUG_RETURN(1);
}
@@ -891,7 +997,6 @@ int change_master(THD* thd, MASTER_INFO* mi)
{
mi->master_log_name[0] = 0;
mi->master_log_pos= BIN_LOG_HEADER_SIZE;
- mi->rli.pending = 0;
}
if (lex_mi->log_file_name)
@@ -900,7 +1005,6 @@ int change_master(THD* thd, MASTER_INFO* mi)
if (lex_mi->pos)
{
mi->master_log_pos= lex_mi->pos;
- mi->rli.pending = 0;
}
DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos));
@@ -914,18 +1018,39 @@ int change_master(THD* thd, MASTER_INFO* mi)
mi->port = lex_mi->port;
if (lex_mi->connect_retry)
mi->connect_retry = lex_mi->connect_retry;
+
+ if (lex_mi->ssl != LEX_MASTER_INFO::SSL_UNCHANGED)
+ mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::SSL_ENABLE);
+ if (lex_mi->ssl_ca)
+ strmake(mi->ssl_ca, lex_mi->ssl_ca, sizeof(mi->ssl_ca)-1);
+ if (lex_mi->ssl_capath)
+ strmake(mi->ssl_capath, lex_mi->ssl_capath, sizeof(mi->ssl_capath)-1);
+ if (lex_mi->ssl_cert)
+ strmake(mi->ssl_cert, lex_mi->ssl_cert, sizeof(mi->ssl_cert)-1);
+ if (lex_mi->ssl_cipher)
+ strmake(mi->ssl_cipher, lex_mi->ssl_cipher, sizeof(mi->ssl_cipher)-1);
+ if (lex_mi->ssl_key)
+ strmake(mi->ssl_key, lex_mi->ssl_key, sizeof(mi->ssl_key)-1);
+#ifndef HAVE_OPENSSL
+ if (lex_mi->ssl || lex_mi->ssl_ca || lex_mi->ssl_capath ||
+ lex_mi->ssl_cert || lex_mi->ssl_cipher || lex_mi->ssl_key )
+ push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
+ ER_SLAVE_IGNORED_SSL_PARAMS, ER(ER_SLAVE_IGNORED_SSL_PARAMS));
+#endif
if (lex_mi->relay_log_name)
{
need_relay_log_purge= 0;
- strmake(mi->rli.relay_log_name,lex_mi->relay_log_name,
- sizeof(mi->rli.relay_log_name)-1);
+ strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name,
+ sizeof(mi->rli.group_relay_log_name)-1);
+ strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name,
+ sizeof(mi->rli.event_relay_log_name)-1);
}
if (lex_mi->relay_log_pos)
{
need_relay_log_purge= 0;
- mi->rli.relay_log_pos=lex_mi->relay_log_pos;
+ mi->rli.group_relay_log_pos= mi->rli.event_relay_log_pos= lex_mi->relay_log_pos;
}
/*
@@ -953,21 +1078,25 @@ int change_master(THD* thd, MASTER_INFO* mi)
of replication is not 100% clear, so we guard against problems using
max().
*/
- mi->master_log_pos = max(BIN_LOG_HEADER_SIZE, mi->rli.master_log_pos);
- strmake(mi->master_log_name,mi->rli.master_log_name,
+ mi->master_log_pos = max(BIN_LOG_HEADER_SIZE,
+ mi->rli.group_master_log_pos);
+ strmake(mi->master_log_name, mi->rli.group_master_log_name,
sizeof(mi->master_log_name)-1);
}
-
+ /*
+ Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never
+ a slave before).
+ */
flush_master_info(mi, 0);
if (need_relay_log_purge)
{
- mi->rli.skip_log_purge= 0;
+ relay_log_purge= 1;
thd->proc_info="Purging old relay logs";
if (purge_relay_logs(&mi->rli, thd,
0 /* not only reset, but also reinit */,
&errmsg))
{
- net_printf(&thd->net, 0, "Failed purging old relay logs: %s",errmsg);
+ net_printf(thd, 0, "Failed purging old relay logs: %s",errmsg);
unlock_slave_threads(mi);
DBUG_RETURN(1);
}
@@ -975,19 +1104,20 @@ int change_master(THD* thd, MASTER_INFO* mi)
else
{
const char* msg;
- mi->rli.skip_log_purge= 1;
+ relay_log_purge= 0;
/* Relay log is already initialized */
if (init_relay_log_pos(&mi->rli,
- mi->rli.relay_log_name,
- mi->rli.relay_log_pos,
+ mi->rli.group_relay_log_name,
+ mi->rli.group_relay_log_pos,
0 /*no data lock*/,
&msg))
{
- net_printf(&thd->net,0,"Failed initializing relay log position: %s",msg);
+ net_printf(thd,0,"Failed initializing relay log position: %s",msg);
unlock_slave_threads(mi);
DBUG_RETURN(1);
}
}
+ mi->rli.group_master_log_pos = mi->master_log_pos;
DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos));
/*
@@ -1000,17 +1130,18 @@ int change_master(THD* thd, MASTER_INFO* mi)
''/0: we have lost all copies of the original good coordinates.
That's why we always save good coords in rli.
*/
- mi->rli.master_log_pos = mi->master_log_pos;
- strmake(mi->rli.master_log_name,mi->master_log_name,
- sizeof(mi->rli.master_log_name)-1);
+ mi->rli.group_master_log_pos= mi->master_log_pos;
+ strmake(mi->rli.group_master_log_name,mi->master_log_name,
+ sizeof(mi->rli.group_master_log_name)-1);
- if (!mi->rli.master_log_name[0]) // uninitialized case
- mi->rli.master_log_pos=0;
+ if (!mi->rli.group_master_log_name[0]) // uninitialized case
+ mi->rli.group_master_log_pos=0;
pthread_mutex_lock(&mi->rli.data_lock);
mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */
- /* Clear the error, for a clean start. */
- clear_last_slave_error(&mi->rli);
+ /* Clear the errors, for a clean start, and master timestamp */
+ clear_slave_error_timestamp(&mi->rli);
+ clear_until_condition(&mi->rli);
/*
If we don't write new coordinates to disk now, then old will remain in
relay-log.info until START SLAVE is issued; but if mysqld is shutdown
@@ -1018,13 +1149,13 @@ int change_master(THD* thd, MASTER_INFO* mi)
in-memory value at restart (thus causing errors, as the old relay log does
not exist anymore).
*/
- flush_relay_log_info(&mi->rli);
+ flush_relay_log_info(&mi->rli);
pthread_cond_broadcast(&mi->data_cond);
pthread_mutex_unlock(&mi->rli.data_lock);
unlock_slave_threads(mi);
thd->proc_info = 0;
- send_ok(&thd->net);
+ send_ok(thd);
DBUG_RETURN(0);
}
@@ -1058,19 +1189,20 @@ int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
int show_binlog_events(THD* thd)
{
+ Protocol *protocol= thd->protocol;
DBUG_ENTER("show_binlog_events");
List<Item> field_list;
- const char* errmsg = 0;
+ const char *errmsg = 0;
IO_CACHE log;
File file = -1;
Log_event::init_show_field_list(&field_list);
- if (send_fields(thd, field_list, 1))
+ if (protocol-> send_fields(&field_list, 1))
DBUG_RETURN(-1);
if (mysql_bin_log.is_open())
{
- LEX_MASTER_INFO *lex_mi = &thd->lex.mi;
+ LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
ha_rows event_count, limit_start, limit_end;
my_off_t pos = max(BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
char search_file_name[FN_REFLEN], *name;
@@ -1079,8 +1211,8 @@ int show_binlog_events(THD* thd)
LOG_INFO linfo;
Log_event* ev;
- limit_start = thd->lex.select->offset_limit;
- limit_end = thd->lex.select->select_limit + limit_start;
+ limit_start= thd->lex->current_select->offset_limit;
+ limit_end= thd->lex->current_select->select_limit + limit_start;
name= search_file_name;
if (log_file_name)
@@ -1107,7 +1239,7 @@ int show_binlog_events(THD* thd)
(ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,0)); )
{
if (event_count >= limit_start &&
- ev->net_send(thd, linfo.log_file_name, pos))
+ ev->net_send(protocol, linfo.log_file_name, pos))
{
errmsg = "Net error";
delete ev;
@@ -1146,7 +1278,7 @@ err:
DBUG_RETURN(-1);
}
- send_eof(&thd->net);
+ send_eof(thd);
pthread_mutex_lock(&LOCK_thread_count);
thd->current_linfo = 0;
pthread_mutex_unlock(&LOCK_thread_count);
@@ -1156,31 +1288,32 @@ err:
int show_binlog_info(THD* thd)
{
+ Protocol *protocol= thd->protocol;
DBUG_ENTER("show_binlog_info");
List<Item> field_list;
field_list.push_back(new Item_empty_string("File", FN_REFLEN));
- field_list.push_back(new Item_empty_string("Position",20));
- field_list.push_back(new Item_empty_string("Binlog_do_db",20));
- field_list.push_back(new Item_empty_string("Binlog_ignore_db",20));
+ field_list.push_back(new Item_return_int("Position",20,
+ MYSQL_TYPE_LONGLONG));
+ field_list.push_back(new Item_empty_string("Binlog_Do_DB",255));
+ field_list.push_back(new Item_empty_string("Binlog_Ignore_DB",255));
- if (send_fields(thd, field_list, 1))
+ if (protocol->send_fields(&field_list, 1))
DBUG_RETURN(-1);
- String* packet = &thd->packet;
- packet->length(0);
+ protocol->prepare_for_resend();
if (mysql_bin_log.is_open())
{
LOG_INFO li;
mysql_bin_log.get_current_log(&li);
int dir_len = dirname_length(li.log_file_name);
- net_store_data(packet, li.log_file_name + dir_len);
- net_store_data(packet, (longlong)li.pos);
- net_store_data(packet, &binlog_do_db);
- net_store_data(packet, &binlog_ignore_db);
- if (my_net_write(&thd->net, (char*)thd->packet.ptr(), packet->length()))
+ protocol->store(li.log_file_name + dir_len, &my_charset_bin);
+ protocol->store((ulonglong) li.pos);
+ protocol->store(&binlog_do_db);
+ protocol->store(&binlog_ignore_db);
+ if (protocol->write())
DBUG_RETURN(-1);
}
- send_eof(&thd->net);
+ send_eof(thd);
DBUG_RETURN(0);
}
@@ -1201,21 +1334,21 @@ int show_binlogs(THD* thd)
{
IO_CACHE *index_file;
char fname[FN_REFLEN];
- NET* net = &thd->net;
List<Item> field_list;
- String *packet = &thd->packet;
uint length;
+ Protocol *protocol= thd->protocol;
+ DBUG_ENTER("show_binlogs");
if (!mysql_bin_log.is_open())
{
//TODO: Replace with ER() error message
- send_error(net, 0, "You are not using binary logging");
+ send_error(thd, 0, "You are not using binary logging");
return 1;
}
field_list.push_back(new Item_empty_string("Log_name", 255));
- if (send_fields(thd, field_list, 1))
- return 1;
+ if (protocol->send_fields(&field_list, 1))
+ DBUG_RETURN(1);
mysql_bin_log.lock_index();
index_file=mysql_bin_log.get_index_file();
@@ -1224,20 +1357,20 @@ int show_binlogs(THD* thd)
/* The file ends with EOF or empty line */
while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
{
+ protocol->prepare_for_resend();
int dir_len = dirname_length(fname);
- packet->length(0);
/* The -1 is for removing newline from fname */
- net_store_data(packet, fname + dir_len, length-1-dir_len);
- if (my_net_write(net, (char*) packet->ptr(), packet->length()))
+ protocol->store(fname + dir_len, length-1-dir_len, &my_charset_bin);
+ if (protocol->write())
goto err;
}
mysql_bin_log.unlock_index();
- send_eof(net);
- return 0;
+ send_eof(thd);
+ DBUG_RETURN(0);
err:
mysql_bin_log.unlock_index();
- return 1;
+ DBUG_RETURN(1);
}
@@ -1273,3 +1406,7 @@ int log_loaded_block(IO_CACHE* file)
}
return 0;
}
+
+#endif /* HAVE_REPLICATION */
+
+