summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc274
1 files changed, 212 insertions, 62 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index 6c9cfc250c5..99bddb7b9b0 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -16,10 +16,9 @@
#include "mysql_priv.h"
-#ifdef HAVE_REPLICATION
-
#include <mysql.h>
#include <myisam.h>
+#include "rpl_rli.h"
#include "slave.h"
#include "sql_repl.h"
#include "rpl_filter.h"
@@ -28,6 +27,10 @@
#include <my_dir.h>
#include <sql_common.h>
+#ifdef HAVE_REPLICATION
+
+#include "rpl_tblmap.h"
+
#define MAX_SLAVE_RETRY_PAUSE 5
bool use_slave_mask = 0;
MY_BITMAP slave_error_mask;
@@ -48,8 +51,6 @@ ulonglong relay_log_space_limit = 0;
*/
int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
-int events_till_abort = -1;
-static int events_till_disconnect = -1;
typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
@@ -860,19 +861,48 @@ static bool sql_slave_killed(THD* thd, RELAY_LOG_INFO* rli)
{
DBUG_ASSERT(rli->sql_thd == thd);
DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun
- return rli->abort_slave || abort_loop || thd->killed;
+ if (abort_loop || thd->killed || rli->abort_slave)
+ {
+ /*
+ If we are in an unsafe situation (stopping could corrupt replication),
+ we give the slave SQL thread one minute of grace before really
+ terminating, in the hope that it will be able to read more events and
+ soon leave the unsafe situation. Note that this one minute starts
+ from the last time anything happened in the slave SQL thread. So it is
+ really one minute of idleness; we do not time out while the slave SQL
+ thread is actively working.
+ */
+ if (!rli->unsafe_to_stop_at)
+ return 1;
+ DBUG_PRINT("info", ("Slave SQL thread is in an unsafe situation, giving "
+ "it some grace period"));
+ if (difftime(time(0), rli->unsafe_to_stop_at) > 60)
+ {
+ slave_print_msg(ERROR_LEVEL, rli, 0,
+ "SQL thread had to stop in an unsafe situation, in "
+ "the middle of applying updates to a "
+ "non-transactional table without any primary key. "
+ "There is a risk of duplicate updates when the slave "
+ "SQL thread is restarted. Please check your tables' "
+ "contents after restart.");
+ return 1;
+ }
+ }
+ return 0;
}
/*
- Writes an error message to rli->last_slave_error and rli->last_slave_errno
- (which will be displayed by SHOW SLAVE STATUS), and prints it to stderr.
+ Writes a message to stderr, and if it's an error message, to
+ rli->last_slave_error and rli->last_slave_errno (which will be displayed by
+ SHOW SLAVE STATUS).
SYNOPSIS
- slave_print_error()
- rli
+ slave_print_msg()
+ level The severity level
+ rli
err_code The error code
- msg The error message (usually related to the error code, but can
+ msg The message (usually related to the error code, but can
contain more information).
... (this is printf-like format, with % symbols in msg)
@@ -880,22 +910,47 @@ static bool sql_slave_killed(THD* thd, RELAY_LOG_INFO* rli)
void
*/
-void slave_print_error(RELAY_LOG_INFO* rli, int err_code, const char* msg, ...)
+void slave_print_msg(enum loglevel level, RELAY_LOG_INFO* rli,
+                     int err_code, const char* msg, ...)
{
+  /* Logging routine selected from 'level' in the switch below */
+  void (*report_function)(const char *, ...);
+  /*
+    Warnings and informational messages are formatted into the local
+    buffer; errors are redirected to rli->last_slave_error below.
+  */
+  char buff[MAX_SLAVE_ERRMSG], *pbuff= buff;
+  uint pbuffsize= sizeof(buff);
va_list args;
va_start(args,msg);
- my_vsnprintf(rli->last_slave_error,
- sizeof(rli->last_slave_error), msg, args);
- rli->last_slave_errno = err_code;
- /* If the error string ends with '.', do not add a ',' it would be ugly */
- if (rli->last_slave_error[0] &&
- (*(strend(rli->last_slave_error)-1) == '.'))
- sql_print_error("Slave: %s Error_code: %d", rli->last_slave_error,
- err_code);
+  switch (level)
+  {
+  case ERROR_LEVEL:
+    /*
+      This my_error call only has effect in client threads.
+      Slave threads do nothing in my_error().
+    */
+    my_error(ER_UNKNOWN_ERROR, MYF(0), msg);
+    /*
+      It's an error, it must be reported in Last_error and Last_errno in SHOW
+      SLAVE STATUS.
+    */
+    pbuff= rli->last_slave_error;
+    pbuffsize= sizeof(rli->last_slave_error);
+    rli->last_slave_errno = err_code;
+    report_function= sql_print_error;
+    break;
+  case WARNING_LEVEL:
+    report_function= sql_print_warning;
+    break;
+  case INFORMATION_LEVEL:
+    report_function= sql_print_information;
+    break;
+  default:
+    DBUG_ASSERT(0);                // should not come here
+    va_end(args);                  // va_start() must always be paired
+    return;          // don't crash production builds, just do nothing
+  }
+  my_vsnprintf(pbuff, pbuffsize, msg, args);
+  /* The argument list is fully consumed; release it before reporting */
+  va_end(args);
+  /* If the msg string ends with '.', do not add a ',' it would be ugly */
+  if (pbuff[0] && (*(strend(pbuff)-1) == '.'))
+    (*report_function)("Slave: %s Error_code: %d", pbuff, err_code);
else
- sql_print_error("Slave: %s, Error_code: %d", rli->last_slave_error,
- err_code);
-
+    (*report_function)("Slave: %s, Error_code: %d", pbuff, err_code);
}
/*
@@ -919,7 +974,6 @@ bool net_request_file(NET* net, const char* fname)
DBUG_RETURN(net_write_command(net, 251, fname, strlen(fname), "", 0));
}
-
/*
From other comments and tests in code, it looks like
sometimes Query_log_event and Load_log_event can have db == 0
@@ -932,7 +986,6 @@ const char *print_slave_db_safe(const char* db)
return (db ? db : "");
}
-
static int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
const char *default_val)
{
@@ -1379,6 +1432,7 @@ static int init_relay_log_info(RELAY_LOG_INFO* rli,
const char* msg = 0;
int error = 0;
DBUG_ENTER("init_relay_log_info");
+ DBUG_ASSERT(!rli->no_storage); // Don't init if there is no storage
if (rli->inited) // Set if this function called
DBUG_RETURN(0);
@@ -1674,7 +1728,7 @@ static void write_ignored_events_info_to_relay_log(THD *thd, MASTER_INFO *mi)
if (rli->ign_master_log_name_end[0])
{
DBUG_PRINT("info",("writing a Rotate event to track down ignored events"));
- Rotate_log_event *ev= new Rotate_log_event(thd, rli->ign_master_log_name_end,
+ Rotate_log_event *ev= new Rotate_log_event(rli->ign_master_log_name_end,
0, rli->ign_master_log_pos_end,
Rotate_log_event::DUP_NAME);
rli->ign_master_log_name_end[0]= 0;
@@ -2241,17 +2295,17 @@ bool flush_master_info(MASTER_INFO* mi, bool flush_relay_log_cache)
st_relay_log_info::st_relay_log_info()
- :info_fd(-1), cur_log_fd(-1), save_temporary_tables(0),
+ :no_storage(FALSE), info_fd(-1), cur_log_fd(-1), save_temporary_tables(0),
cur_log_old_open_count(0), group_master_log_pos(0), log_space_total(0),
ignore_log_space_limit(0), last_master_timestamp(0), slave_skip_counter(0),
abort_pos_wait(0), slave_run_id(0), sql_thd(0), last_slave_errno(0),
inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
- until_log_pos(0), retried_trans(0)
+ until_log_pos(0), retried_trans(0), m_reload_flags(RELOAD_NONE_F),
+ unsafe_to_stop_at(0)
{
group_relay_log_name[0]= event_relay_log_name[0]=
group_master_log_name[0]= 0;
last_slave_error[0]= until_log_name[0]= ign_master_log_name_end[0]= 0;
-
bzero((char*) &info_file, sizeof(info_file));
bzero((char*) &cache_buf, sizeof(cache_buf));
cached_charset_invalidate();
@@ -2671,11 +2725,9 @@ static ulong read_event(MYSQL* mysql, MASTER_INFO *mi, bool* suppress_warnings)
/*
my_real_read() will time us out
We check if we were told to die, and if not, try reading again
-
- TODO: Move 'events_till_disconnect' to the MASTER_INFO structure
*/
#ifndef DBUG_OFF
- if (disconnect_slave_event_count && !(events_till_disconnect--))
+ if (disconnect_slave_event_count && !(mi->events_till_disconnect--))
return packet_error;
#endif
@@ -2950,7 +3002,7 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
thd->lex->current_select= 0;
if (!ev->when)
ev->when = time(NULL);
- ev->thd = thd;
+ ev->thd = thd; // because up to this point, ev->thd == 0
exec_res = ev->exec_event(rli);
DBUG_ASSERT(rli->sql_thd==thd);
/*
@@ -3022,7 +3074,7 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
else
{
pthread_mutex_unlock(&rli->data_lock);
- slave_print_error(rli, 0, "\
+ slave_print_msg(ERROR_LEVEL, rli, 0, "\
Could not parse relay log event entry. The possible reasons are: the master's \
binary log is corrupted (you can check this by running 'mysqlbinlog' on the \
binary log), the slave's relay log is corrupted (you can check this by running \
@@ -3051,9 +3103,6 @@ pthread_handler_t handle_slave_io(void *arg)
my_thread_init();
DBUG_ENTER("handle_slave_io");
-#ifndef DBUG_OFF
-slave_begin:
-#endif
DBUG_ASSERT(mi->inited);
mysql= NULL ;
retry_count= 0;
@@ -3063,7 +3112,7 @@ slave_begin:
mi->slave_run_id++;
#ifndef DBUG_OFF
- mi->events_till_abort = abort_slave_event_count;
+ mi->events_till_disconnect = disconnect_slave_event_count;
#endif
thd= new THD; // note that contructor of THD uses DBUG_ !
@@ -3301,14 +3350,6 @@ ignore_log_space_limit=%d",
log space");
goto err;
}
- // TODO: check debugging abort code
-#ifndef DBUG_OFF
- if (abort_slave_event_count && !--events_till_abort)
- {
- sql_print_error("Slave I/O thread: debugging abort");
- goto err;
- }
-#endif
}
}
@@ -3347,10 +3388,6 @@ err:
pthread_mutex_unlock(&LOCK_thread_count);
pthread_cond_broadcast(&mi->stop_cond); // tell the world we are done
pthread_mutex_unlock(&mi->run_lock);
-#ifndef DBUG_OFF
- if (abort_slave_event_count && !events_till_abort)
- goto slave_begin;
-#endif
my_thread_end();
pthread_exit(0);
DBUG_RETURN(0); // Can't return anything here
@@ -3370,10 +3407,6 @@ pthread_handler_t handle_slave_sql(void *arg)
my_thread_init();
DBUG_ENTER("handle_slave_sql");
-#ifndef DBUG_OFF
-slave_begin:
-#endif
-
DBUG_ASSERT(rli->inited);
pthread_mutex_lock(&rli->run_lock);
DBUG_ASSERT(!rli->slave_running);
@@ -3520,6 +3553,14 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
RPL_LOG_NAME, llstr(rli->group_master_log_pos,llbuff));
err:
+
+ /*
+ Some events set some playgrounds, which won't be cleared because thread
+ stops. Stopping of this thread may not be known to these events ("stop"
+ request is detected only by the present function, not by events), so we
+ must "proactively" clear playgrounds:
+ */
+ rli->cleanup_context(thd, 1);
VOID(pthread_mutex_lock(&LOCK_thread_count));
/*
Some extra safety, which should not been needed (normally, event deletion
@@ -3565,10 +3606,6 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
pthread_cond_broadcast(&rli->stop_cond);
// tell the world we are done
pthread_mutex_unlock(&rli->run_lock);
-#ifndef DBUG_OFF // TODO: reconsider the code below
- if (abort_slave_event_count && !rli->events_till_abort)
- goto slave_begin;
-#endif
my_thread_end();
pthread_exit(0);
DBUG_RETURN(0); // Can't return anything here
@@ -3721,7 +3758,7 @@ static int process_io_rotate(MASTER_INFO *mi, Rotate_log_event *rev)
rotate event forever, so we need to not disconnect after one.
*/
if (disconnect_slave_event_count)
- events_till_disconnect++;
+ mi->events_till_disconnect++;
#endif
/*
@@ -4177,7 +4214,7 @@ static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
DBUG_ENTER("connect_to_master");
#ifndef DBUG_OFF
- events_till_disconnect = disconnect_slave_event_count;
+ mi->events_till_disconnect = disconnect_slave_event_count;
#endif
ulong client_flag= CLIENT_REMEMBER_OPTIONS;
if (opt_slave_compressed_protocol)
@@ -4311,6 +4348,10 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
bool flush_relay_log_info(RELAY_LOG_INFO* rli)
{
bool error=0;
+
+ if (unlikely(rli->no_storage))
+ return 0;
+
IO_CACHE *file = &rli->info_file;
char buff[FN_REFLEN*2+22*2+4], *pos;
@@ -4327,6 +4368,7 @@ bool flush_relay_log_info(RELAY_LOG_INFO* rli)
error=1;
if (flush_io_cache(file))
error=1;
+
/* Flushing the relay log is done by the slave I/O thread */
return error;
}
@@ -4357,7 +4399,7 @@ static IO_CACHE *reopen_relay_log(RELAY_LOG_INFO *rli, const char **errmsg)
}
-Log_event* next_event(RELAY_LOG_INFO* rli)
+static Log_event* next_event(RELAY_LOG_INFO* rli)
{
Log_event* ev;
IO_CACHE* cur_log = rli->cur_log;
@@ -4368,6 +4410,11 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
DBUG_ENTER("next_event");
DBUG_ASSERT(thd != 0);
+#ifndef DBUG_OFF
+ if (abort_slave_event_count && !rli->events_till_abort--)
+ DBUG_RETURN(0);
+#endif
+
/*
For most operations we need to protect rli members with data_lock,
so we assume calling function acquired this mutex for us and we will
@@ -4489,7 +4536,7 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
{
/* We generate and return a Rotate, to make our positions advance */
DBUG_PRINT("info",("seeing an ignored end segment"));
- ev= new Rotate_log_event(thd, rli->ign_master_log_name_end,
+ ev= new Rotate_log_event(rli->ign_master_log_name_end,
0, rli->ign_master_log_pos_end,
Rotate_log_event::DUP_NAME);
rli->ign_master_log_name_end[0]= 0;
@@ -4737,11 +4784,114 @@ end:
DBUG_VOID_RETURN;
}
+/*
+  Some system tables need to be re-read by the MySQL server after it has
+  updated them. In statement-based replication, GRANT and similar commands
+  are sent verbatim to the slave, which then reloads. In row-based
+  replication, changes to these tables are done through ordinary Rows binlog
+  events, so the master must add some flag for the slave to know that it has
+  to reload the tables.
+
+  An st_reload_entry associates one table name with the reload flag to set.
+*/
+struct st_reload_entry
+{
+  const char *table;                          /* table name */
+  st_relay_log_info::enum_reload_flag flag;   /* reload flag for that table */
+};
+
+/*
+  Table names and the reload flag each one maps to. The entries must stay
+  sorted by table name, because the array is searched with bsearch() below.
+*/
+static st_reload_entry s_mysql_tables[] =
+{
+  { "columns_priv", st_relay_log_info::RELOAD_GRANT_F  },
+  { "db",           st_relay_log_info::RELOAD_ACCESS_F },
+  { "host",         st_relay_log_info::RELOAD_ACCESS_F },
+  { "procs_priv",   st_relay_log_info::RELOAD_GRANT_F  },
+  { "tables_priv",  st_relay_log_info::RELOAD_GRANT_F  },
+  { "user",         st_relay_log_info::RELOAD_ACCESS_F }
+};
+
+/* Number of entries in s_mysql_tables */
+static const my_size_t s_mysql_tables_size =
+  sizeof(s_mysql_tables) / sizeof(s_mysql_tables[0]);
+
+/*
+  Comparison callback for looking a table name up in s_mysql_tables.
+
+  lhs  the searched key: a NUL-terminated table name
+  rhs  the array element: an st_reload_entry
+
+  Returns the strcmp() result of the key against the entry's table name.
+*/
+static int reload_entry_compare(const void *lhs, const void *rhs)
+{
+  st_reload_entry const *const candidate=
+    static_cast<st_reload_entry const*>(rhs);
+  return strcmp(static_cast<char const*>(lhs), candidate->table);
+}
+
+/*
+  Notes that the table db.table is being touched. If the table belongs to
+  the "mysql" database and is listed in s_mysql_tables, the matching reload
+  flag is OR-ed into m_reload_flags, which transaction_end() later acts on.
+
+  db        database name, compared against "mysql"
+  table     table name, looked up in s_mysql_tables
+  table_id  not used by this function
+*/
+void st_relay_log_info::touching_table(char const* db, char const* table,
+                                       ulong table_id)
+{
+  if (strcmp(db,"mysql") == 0)
+  {
+#if defined(HAVE_BSEARCH) && defined(HAVE_SIZE_T)
+    /* bsearch() returns NULL when the table is not in the array */
+    void *const ptr= bsearch(table, s_mysql_tables,
+                             s_mysql_tables_size,
+                             sizeof(*s_mysql_tables), reload_entry_compare);
+    st_reload_entry const *const entry= static_cast<st_reload_entry*>(ptr);
+#else
+    /*
+      Fall back to full scan, there are few rows anyway and updating the
+      "mysql" database is rare.
+
+      'entry' stays NULL unless a match is found. Leaving the scan cursor
+      in 'entry' would make it point one past the end of the array on a
+      miss, and the check below would then read out of bounds.
+    */
+    st_reload_entry const *entry= 0;
+    st_reload_entry const *const end= s_mysql_tables + s_mysql_tables_size;
+    for (st_reload_entry const *cur= s_mysql_tables ; cur < end ; cur++)
+    {
+      if (reload_entry_compare(table, cur) == 0)
+      {
+        entry= cur;
+        break;
+      }
+    }
+#endif
+    if (entry)
+      m_reload_flags|= entry->flag;
+  }
+}
+
+/*
+  Carries out the reloads recorded in m_reload_flags: acl_reload() when
+  RELOAD_ACCESS_F is set, grant_reload() when RELOAD_GRANT_F is set. The
+  flags are then reset to RELOAD_NONE_F.
+*/
+void st_relay_log_info::transaction_end(THD* thd)
+{
+  /* No reload pending: nothing to do */
+  if (m_reload_flags == RELOAD_NONE_F)
+    return;
+
+  if (m_reload_flags & RELOAD_ACCESS_F)
+    acl_reload(thd);
+  if (m_reload_flags & RELOAD_GRANT_F)
+    grant_reload(thd);
+
+  m_reload_flags= RELOAD_NONE_F;
+}
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+/*
+ Cleans up the execution context of the slave SQL thread.
+
+ thd must be this object's sql_thd (asserted)
+ error if true, the statement and the transaction are rolled back first
+
+ In all cases the table mappings are cleared, the thread's tables are
+ closed and unsafe_to_stop_at is reset to 0.
+*/
+void st_relay_log_info::cleanup_context(THD *thd, bool error)
+{
+ DBUG_ASSERT(sql_thd == thd);
+ /*
+ 1) Instances of Table_map_log_event, if ::exec_event() was called on them,
+ may have opened tables, which we cannot be sure have been closed (because
+ maybe the Rows_log_event has not been found or will not be, because the
+ slave SQL thread is stopping, or the relay log has a missing tail etc). So
+ we close all of the thread's tables. And so the table mappings have to be
+ cancelled.
+ 2) Rows_log_event::exec_event() may even have started statements or
+ transactions on them, which we need to roll back in case of error.
+ 3) If finding a Format_description_log_event after a BEGIN, we also need
+ to roll back before continuing with the next events.
+ 4) So we need this "context cleanup" function.
+ */
+ if (error)
+ {
+ ha_autocommit_or_rollback(thd, 1); // if a "statement transaction"
+ end_trans(thd, ROLLBACK); // if a "real transaction"
+ }
+ m_table_map.clear_tables();
+ close_thread_tables(thd);
+ /*
+ A zero unsafe_to_stop_at makes sql_slave_killed() return at once on a stop
+ request instead of applying its grace period.
+ */
+ unsafe_to_stop_at= 0;
+}
+#endif
+
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
template class I_List_iterator<i_string>;
template class I_List_iterator<i_string_pair>;
#endif
-
#endif /* HAVE_REPLICATION */
+