summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
authorunknown <monty@mashka.mysql.fi>2002-08-08 03:12:02 +0300
committerunknown <monty@mashka.mysql.fi>2002-08-08 03:12:02 +0300
commitf01f49916b7a0ea6eaf9f0e4e1dfad911584f8a2 (patch)
treec44ce89903320c9d19cd6fe9767f75d997d2a1d0 /sql/slave.cc
parent6ba1aefe909df2651fcace1fe184e3f093d07ab4 (diff)
downloadmariadb-git-f01f49916b7a0ea6eaf9f0e4e1dfad911584f8a2.tar.gz
Lots of code fixes to the replication code (especially the binary logging and index log file handling)
Fixed bugs in my last changeset that made MySQL hard to compile. Added mutex around some data that could cause table cache corruptions when using OPTIMIZE TABLE / REPAIR TABLE or automatic repair of MyISAM tables. Added mutex around some data in the slave start/stop code that could cause THD linked list corruptions Extended my_chsize() to allow one to specify a filler character. Extend vio_blocking to return the old state (This made some usage of this function much simpler) Added testing for some functions that they caller have got the required mutexes before calling the function. Use setrlimit() to ensure that we can write core file if one specifies --core-file. Added --slave-compressed-protocol Made 2 the minimum length for ft_min_word_len Added variables foreign_key_checks & unique_checks. Less logging from replication code (if not started with --log-warnings) Changed that SHOW INNODB STATUS requre the SUPER privilege More DBUG statements and a lot of new code comments BitKeeper/deleted/.del-rpl_compat.result~c950bc346b12c61a: Delete: mysql-test/r/rpl_compat.result BitKeeper/deleted/.del-rpl_compat.test~5f6ba955e02aa95f: Delete: mysql-test/t/rpl_compat.test Docs/manual.texi: Updated manual with fixes in this changeset client/mysqltest.c: Indentation cleanup Better error messages for some error conditions. include/my_pthread.h: Added 'safe_mutex_assert_owner()' to check that the thread really owns the mutex. include/my_sys.h: Extended my_chsize() to allow one to specify a filler character. (For MySQL index logs) include/raid.h: New my_chsize() include/violite.h: Extend vio_blocking to return the old state innobase/include/dyn0dyn.h: Merge with 3.23 (AIX DYN_ARRAY_DATA_SIZE) innobase/include/dyn0dyn.ic: Merge with 3.23 isam/create.c: Fix for new my_chsize() isam/isamchk.c: Fix for new my_chsize() isam/pack_isam.c: Fix for new my_chsize() libmysql/manager.c: Fix for new vio_blocking() libmysqld/lib_sql.cc: Fix for new open_log() myisam/mi_cache.c: Fix typo from previous checking myisam/mi_check.c: Fix for new my_chsize() myisam/mi_create.c: Fix for new my_chsize() myisam/mi_delete_all.c: Fix for new my_chsize() myisam/myisampack.c: Fix for new my_chsize() mysql-test/include/master-slave.inc: Better initialization for replication tests mysql-test/mysql-test-run.sh: Added option --log-warnings mysql-test/r/insert.result: More tests if INSERT ...(DEFAULT) mysql-test/r/rpl000001.result: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/r/rpl000002.result: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/r/rpl000003.result: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/r/rpl000004.result: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/r/rpl000005.result: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/r/rpl000006.result: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/r/rpl000007.result: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/r/rpl000008.result: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/r/rpl000009.result: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/r/rpl000010.result: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/r/rpl000011.result: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/r/rpl000012.result: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/r/rpl000013.result: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/r/rpl000014.result: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/r/rpl_alter.result: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/r/rpl_empty_master_crash.result: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/r/rpl_get_lock.result: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/r/rpl_log.result: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/r/rpl_magic.result: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/r/rpl_mystery22.result: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/r/rpl_skip_error.result: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/r/rpl_sporadic_master.result: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/t/insert.test: More tests if INSERT ...(DEFAULT) mysql-test/t/rpl000001.test: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/t/rpl000002.test: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/t/rpl000003.test: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/t/rpl000004.test: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/t/rpl000005.test: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/t/rpl000006.test: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/t/rpl000007.test: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/t/rpl000009.test: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/t/rpl000011.test: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/t/rpl000013.test: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/t/rpl000014.test: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/t/rpl_alter.test: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/t/rpl_empty_master_crash.test: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/t/rpl_get_lock.test: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/t/rpl_magic.test: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/t/rpl_mystery22.test: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/t/rpl_skip_error.test: Clean up tests for new master-slave.inc Remove 'use database' mysql-test/t/rpl_sporadic_master.test: Clean up tests for new master-slave.inc Remove 'use database' mysys/mf_iocache.c: More debug info Force seek after reinit_io_cache() mysys/mf_iocache2.c: Added my_b_filelength() mysys/my_chsize.c: Extended my_chsize() to allow one to specify a filler character. (For MySQL index logs) mysys/raid.cc: Extended my_chsize() to allow one to specify a filler character. (For MySQL index logs) sql/field.h: Fix for INSERT ... (DEFAULT) sql/ha_berkeley.h: Fix for dynamic variables sql/ha_innodb.cc: Change sprintf() to my_sprintf() to make code portable. Fix after sync with 3.23 (We still need to fix the storage of the replication position in innodb) sql/ha_innodb.h: Fix for dynamic variables sql/handler.cc: Remove writting of COMMIT to the binary log. (Now done in MYSQL_LOG::write()) sql/item_func.cc: Query_log_event() now always takes query length. sql/item_func.h: Indentation cleanup sql/item_strfunc.h: Indentation cleanup sql/item_timefunc.h: Indentation cleanup sql/lock.cc: Check that we own critical mutexes. sql/log.cc: Big code cleanup / rewrite / optimize. - The index log file has its own IO_CACHE object. - Many functions totally rewritten to make them smaller and faster. - New handling of index log files - Lots of new comments sql/log_event.cc: Code cleanup New comments sql/log_event.h: Query_log_event() now always takes query length. sql/mini_client.cc: Better error messages on reconnect. Fixed wrong variable usage from last commit. sql/mysql_priv.h: New arguments to open_log() sql/mysqld.cc: Use setrlimit() to ensure that we can write core file if one specifies --core-file Added index file name as parameter to openlog(). Added --slave-compressed-protocol Made 2 the minimum length for ft_min_word_len sql/net_serv.cc: Use new vio_blocking() (The vio_blocking() change was done to make this code more readable) sql/repl_failsafe.cc: Minor code cleanup sql/set_var.cc: Added variables slave_compressed_protocol, foreign_key_checks & unique_checks. sql/set_var.h: Generalization sql/slave.cc: Code cleanup & rewrite. Dont call SELECT VERSION() on check_master_version() New init_slave() code. Ensure that all threads create a THD early. Add locks around manipulation of critical structures Don't retry a command more than master_retry_count times. Write less warnings to the log file (if not started with --log-warnings) Faster flush_relay_log_info() sql/slave.h: More comments Added new arguments to some functions. sql/sql_acl.cc: More DBUG info New parameter to Query_log_event() sql/sql_base.cc: Added some mutex checking. sql/sql_cache.cc: Less not critical debug info sql/sql_class.h: Fix for new log handling. sql/sql_db.cc: Added mutex around remove_db_from_cache() sql/sql_delete.cc: Added missing parameters to changed functions sql/sql_insert.cc: Added missing parameters to changed functions sql/sql_parse.cc: Do an 'end_active_trans()' before 'load_master_data' Changed that SHOW INNODB STATUS requre the SUPER privilege Added new function parameters to new functions sql/sql_rename.cc: Added missing parameters to changed functions sql/sql_repl.cc: Code cleanups / new comments Fix for new find_first_log() calling standard. More DBUG statements. Show binlogs updated to use new IO_CACHE:d index log file. sql/sql_repl.h: New function arguments sql/sql_select.cc: Indentation changes sql/sql_table.cc: Added missing parameters to changed functions Added checking of mutex Added mutex around critical regions. sql/sql_test.cc: Don't use THR_ALARM if the configuration doesn't support it. sql/sql_update.cc: Added missing parameters to changed functions sql/table.cc: Added missing parameters to changed functions vio/vio.c: Extend vio_blocking to return the old state vio/viosocket.c: Extend vio_blocking to return the old state vio/viossl.c: Extend vio_blocking to return the old state
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc560
1 files changed, 338 insertions, 222 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index 7871f536320..84e0f6dd236 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -23,7 +23,6 @@
#include "sql_repl.h"
#include "repl_failsafe.h"
#include <thr_alarm.h>
-#include <my_dir.h>
#include <assert.h>
bool use_slave_mask = 0;
@@ -80,9 +79,15 @@ static int request_table_dump(MYSQL* mysql, const char* db, const char* table);
static int create_table_from_dump(THD* thd, NET* net, const char* db,
const char* table_name);
static int check_master_version(MYSQL* mysql, MASTER_INFO* mi);
-
char* rewrite_db(char* db);
+
+/*
+ Get a bit mask for which threads are running so that we later can
+ restart these threads
+*/
+
+
void init_thread_mask(int* mask,MASTER_INFO* mi,bool inverse)
{
bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
@@ -104,6 +109,7 @@ void init_thread_mask(int* mask,MASTER_INFO* mi,bool inverse)
*mask = tmp_mask;
}
+
void lock_slave_threads(MASTER_INFO* mi)
{
//TODO: see if we can do this without dual mutex
@@ -118,46 +124,45 @@ void unlock_slave_threads(MASTER_INFO* mi)
pthread_mutex_unlock(&mi->run_lock);
}
+
int init_slave()
{
DBUG_ENTER("init_slave");
- // TODO (multi-master): replace this with list initialization
+
+ /*
+ TODO: re-write this to interate through the list of files
+ for multi-master
+ */
active_mi = &main_mi;
- // TODO: the code below is a copy-paste mess - clean it up
/*
- make sure slave thread gets started if server_id is set,
- valid master.info is present, and master_host has not been specified
+ If master_host is not specified, try to read it from the master_info file.
+ If master_host is specified, create the master_info file if it doesn't
+ exists.
*/
- if (server_id && !master_host)
+ if (init_master_info(active_mi,master_info_file,relay_log_info_file,
+ !master_host))
{
- /*
- TODO: re-write this to interate through the list of files
- for multi-master
- */
- char fname[FN_REFLEN+128];
- MY_STAT stat_area;
- fn_format(fname, master_info_file, mysql_data_home, "", 4+16+32);
- if (my_stat(fname, &stat_area, MYF(0)) &&
- !init_master_info(active_mi,master_info_file,relay_log_info_file))
- master_host = active_mi->host;
+ sql_print_error("Warning: failed to initialized master info");
+ DBUG_RETURN(0);
}
- // slave thread
- if (master_host)
+
+ /*
+ make sure slave thread gets started if server_id is set,
+ valid master.info is present, and master_host has not been specified
+ */
+ if (server_id && !master_host && active_mi->host[0])
+ master_host= active_mi->host;
+
+ if (master_host && !opt_skip_slave_start)
{
- if (!opt_skip_slave_start)
- {
- if (start_slave_threads(1 /* need mutex */,
- 0 /* no wait for start*/,
- active_mi,
- master_info_file,
- relay_log_info_file,
- SLAVE_IO | SLAVE_SQL))
- sql_print_error("Warning: Can't create threads to handle slave");
- }
- else if (init_master_info(active_mi, master_info_file,
- relay_log_info_file))
- sql_print_error("Warning: failed to initialized master info");
+ if (start_slave_threads(1 /* need mutex */,
+ 0 /* no wait for start*/,
+ active_mi,
+ master_info_file,
+ relay_log_info_file,
+ SLAVE_IO | SLAVE_SQL))
+ sql_print_error("Warning: Can't create threads to handle slave");
}
DBUG_RETURN(0);
}
@@ -175,7 +180,35 @@ static byte* get_table_key(TABLE_RULE_ENT* e, uint* len,
return (byte*)e->db;
}
-// TODO: check proper initialization of master_log_name/master_log_pos
+
+/*
+ Open the given relay log
+
+ SYNOPSIS
+ init_relay_log_pos()
+ rli Relay information (will be initialized)
+ log Name of relay log file to read from. NULL = First log
+ pos Position in relay log file
+ need_data_lock Set to 1 if this functions should do mutex locks
+ errmsg Store pointer to error message here
+
+ DESCRIPTION
+ - Close old open relay log files.
+ - If we are using the same relay log as the running IO-thread, then set
+ rli->cur_log to point to the same IO_CACHE entry.
+ - If not, open the 'log' binary file.
+
+ TODO
+ - check proper initialization of master_log_name/master_log_pos
+ - We may always want to delete all logs before 'log'.
+ Currently if we are not calling this with 'log' as NULL or the first
+ log we will never delete relay logs.
+ If we want this we should not set skip_log_purge to 1.
+
+ RETURN VALUES
+ 0 ok
+ 1 error. errmsg is set to point to the error message
+*/
int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
ulonglong pos, bool need_data_lock,
@@ -184,13 +217,14 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
DBUG_ENTER("init_relay_log_pos");
*errmsg=0;
- if (rli->log_pos_current)
+ if (rli->log_pos_current) // TODO: When can this happen ?
DBUG_RETURN(0);
pthread_mutex_t *log_lock=rli->relay_log.get_log_lock();
pthread_mutex_lock(log_lock);
if (need_data_lock)
pthread_mutex_lock(&rli->data_lock);
+ /* Close log file and free buffers if it's already open */
if (rli->cur_log_fd >= 0)
{
end_io_cache(&rli->cache_buf);
@@ -198,37 +232,37 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
rli->cur_log_fd = -1;
}
- if (!log)
- log = rli->relay_log_name; // already inited
- if (!pos)
- pos = rli->relay_log_pos; // already inited
- else
- rli->relay_log_pos = pos;
+ rli->relay_log_pos = pos;
/*
Test to see if the previous run was with the skip of purging
If yes, we do not purge when we restart
*/
- if (rli->relay_log.find_first_log(&rli->linfo,""))
+ if (rli->relay_log.find_log_pos(&rli->linfo,NullS))
{
*errmsg="Could not find first log during relay log initialization";
goto err;
}
- if (strcmp(log,rli->linfo.log_file_name))
- rli->skip_log_purge=1;
-
- if (rli->relay_log.find_first_log(&rli->linfo,log))
+
+ if (log) // If not first log
{
- *errmsg="Could not find target log during relay log initialization";
- goto err;
+ if (strcmp(log, rli->linfo.log_file_name))
+ rli->skip_log_purge=1; // Different name; Don't purge
+ if (rli->relay_log.find_log_pos(&rli->linfo, log))
+ {
+ *errmsg="Could not find target log during relay log initialization";
+ goto err;
+ }
}
strmake(rli->relay_log_name,rli->linfo.log_file_name,
sizeof(rli->relay_log_name)-1);
- // to make end_io_cache(&rli->cache_buf) safe in all cases
- if (!rli->inited)
- bzero((char*) &rli->cache_buf, sizeof(IO_CACHE));
if (rli->relay_log.is_active(rli->linfo.log_file_name))
{
+ /*
+ The IO thread is using this log file.
+ In this case, we will use the same IO_CACHE pointer to
+ read data as the IO thread is using to write data.
+ */
if (my_b_tell((rli->cur_log=rli->relay_log.get_log_file())) == 0 &&
check_binlog_magic(rli->cur_log,errmsg))
goto err;
@@ -236,10 +270,9 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
}
else
{
- if (rli->inited)
- end_io_cache(&rli->cache_buf);
- if (rli->cur_log_fd >= 0)
- my_close(rli->cur_log_fd,MYF(MY_WME));
+ /*
+ Open the relay log and set rli->cur_log to point at this one
+ */
if ((rli->cur_log_fd=open_binlog(&rli->cache_buf,
rli->linfo.log_file_name,errmsg)) < 0)
goto err;
@@ -257,6 +290,7 @@ err:
DBUG_RETURN ((*errmsg) ? 1 : 0);
}
+
/* called from get_options() in mysqld.cc on start-up */
void init_slave_skip_errors(const char* arg)
@@ -294,12 +328,13 @@ void init_slave_skip_errors(const char* arg)
are not running
*/
-int purge_relay_logs(RELAY_LOG_INFO* rli, bool just_reset, const char** errmsg)
+int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset,
+ const char** errmsg)
{
+ int error=0;
DBUG_ENTER("purge_relay_logs");
if (!rli->inited)
DBUG_RETURN(0); /* successfully do nothing */
- int error=0;
DBUG_ASSERT(rli->slave_running == 0);
DBUG_ASSERT(rli->mi->slave_running == 0);
@@ -309,14 +344,14 @@ int purge_relay_logs(RELAY_LOG_INFO* rli, bool just_reset, const char** errmsg)
rli->pending=0;
rli->master_log_name[0]=0;
rli->master_log_pos=0; // 0 means uninitialized
- if (rli->relay_log.reset_logs(rli->sql_thd) ||
- rli->relay_log.find_first_log(&rli->linfo,""))
+ if (rli->relay_log.reset_logs(thd))
{
*errmsg = "Failed during log reset";
error=1;
goto err;
}
- strmake(rli->relay_log_name,rli->linfo.log_file_name,
+ /* Save name of used relay log file */
+ strmake(rli->relay_log_name, rli->relay_log.get_log_fname(),
sizeof(rli->relay_log_name)-1);
// Just first log with magic number and nothing else
rli->log_space_total= BIN_LOG_HEADER_SIZE;
@@ -324,7 +359,8 @@ int purge_relay_logs(RELAY_LOG_INFO* rli, bool just_reset, const char** errmsg)
rli->relay_log.reset_bytes_written();
rli->log_pos_current=0;
if (!just_reset)
- error = init_relay_log_pos(rli,0,0,0 /* do not need data lock */,errmsg);
+ error= init_relay_log_pos(rli, rli->relay_log_name, rli->relay_log_pos,
+ 0 /* do not need data lock */, errmsg);
err:
#ifndef DBUG_OFF
@@ -335,6 +371,7 @@ err:
DBUG_RETURN(error);
}
+
int terminate_slave_threads(MASTER_INFO* mi,int thread_mask,bool skip_lock)
{
if (!mi->inited)
@@ -374,6 +411,7 @@ int terminate_slave_threads(MASTER_INFO* mi,int thread_mask,bool skip_lock)
return 0;
}
+
int terminate_slave_thread(THD* thd, pthread_mutex_t* term_lock,
pthread_mutex_t *cond_lock,
pthread_cond_t* term_cond,
@@ -417,6 +455,7 @@ int terminate_slave_thread(THD* thd, pthread_mutex_t* term_lock,
return 0;
}
+
int start_slave_thread(pthread_handler h_func, pthread_mutex_t* start_lock,
pthread_mutex_t *cond_lock,
pthread_cond_t* start_cond,
@@ -438,13 +477,13 @@ int start_slave_thread(pthread_handler h_func, pthread_mutex_t* start_lock,
}
if (*slave_running)
- {
- if (start_cond)
- pthread_cond_broadcast(start_cond);
- if (start_lock)
- pthread_mutex_unlock(start_lock);
- return ER_SLAVE_MUST_STOP;
- }
+ {
+ if (start_cond)
+ pthread_cond_broadcast(start_cond);
+ if (start_lock)
+ pthread_mutex_unlock(start_lock);
+ return ER_SLAVE_MUST_STOP;
+ }
if (pthread_create(&th, &connection_attrib, h_func, (void*)mi))
{
if (start_lock)
@@ -457,7 +496,7 @@ int start_slave_thread(pthread_handler h_func, pthread_mutex_t* start_lock,
while (!*slave_running)
{
const char* old_msg = thd->enter_cond(start_cond,cond_lock,
- "Waiting for slave thread to start");
+ "Waiting for slave thread to start");
pthread_cond_wait(start_cond,cond_lock);
thd->exit_cond(old_msg);
/*
@@ -506,8 +545,6 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
lock_cond_io = &mi->run_lock;
lock_cond_sql = &mi->rli.run_lock;
}
- if (init_master_info(mi,master_info_fname,slave_info_fname))
- DBUG_RETURN(ER_MASTER_INFO);
if (thread_mask & SLAVE_IO)
error=start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
@@ -660,6 +697,7 @@ void end_slave()
free_string_array(&replicate_wild_ignore_table);
}
+
static bool io_slave_killed(THD* thd, MASTER_INFO* mi)
{
DBUG_ASSERT(mi->io_thd == thd);
@@ -667,6 +705,7 @@ static bool io_slave_killed(THD* thd, MASTER_INFO* mi)
return mi->abort_slave || abort_loop || thd->killed;
}
+
static bool sql_slave_killed(THD* thd, RELAY_LOG_INFO* rli)
{
DBUG_ASSERT(rli->sql_thd == thd);
@@ -674,6 +713,7 @@ static bool sql_slave_killed(THD* thd, RELAY_LOG_INFO* rli)
return rli->abort_slave || abort_loop || thd->killed;
}
+
void slave_print_error(RELAY_LOG_INFO* rli, int err_code, const char* msg, ...)
{
va_list args;
@@ -685,6 +725,7 @@ void slave_print_error(RELAY_LOG_INFO* rli, int err_code, const char* msg, ...)
rli->last_slave_errno = err_code;
}
+
void skip_load_data_infile(NET* net)
{
(void)my_net_write(net, "\xfb/dev/null", 10);
@@ -693,6 +734,7 @@ void skip_load_data_infile(NET* net)
send_ok(net); // the master expects it
}
+
char* rewrite_db(char* db)
{
if (replicate_rewrite_db.is_empty() || !db)
@@ -744,11 +786,11 @@ int db_ok(const char* db, I_List<i_string> &do_list,
if (!strcmp(tmp->ptr, db))
return 0; // match
}
-
return 1;
}
}
+
static int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
const char *default_val)
{
@@ -777,6 +819,7 @@ static int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
return 1;
}
+
static int init_intvar_from_file(int* var, IO_CACHE* f, int default_val)
{
char buf[32];
@@ -794,32 +837,12 @@ static int init_intvar_from_file(int* var, IO_CACHE* f, int default_val)
return 1;
}
+
static int check_master_version(MYSQL* mysql, MASTER_INFO* mi)
{
- MYSQL_RES* res;
- MYSQL_ROW row;
- const char* version;
- const char* errmsg = 0;
+ const char* errmsg= 0;
- if (mc_mysql_query(mysql, "SELECT VERSION()", 0)
- || !(res = mc_mysql_store_result(mysql)))
- {
- sql_print_error("Error checking master version: %s",
- mc_mysql_error(mysql));
- return 1;
- }
- if (!(row = mc_mysql_fetch_row(res)))
- {
- errmsg = "Master returned no rows for SELECT VERSION()";
- goto err;
- }
- if (!(version = row[0]))
- {
- errmsg = "Master reported NULL for the version";
- goto err;
- }
-
- switch (*version) {
+ switch (*mysql->server_version) {
case '3':
mi->old_format = 1;
break;
@@ -829,11 +852,9 @@ static int check_master_version(MYSQL* mysql, MASTER_INFO* mi)
break;
default:
errmsg = "Master reported unrecognized MySQL version";
- goto err;
+ break;
}
err:
- if (res)
- mc_mysql_free_result(res);
if (errmsg)
{
sql_print_error(errmsg);
@@ -986,30 +1007,36 @@ int fetch_master_table(THD* thd, const char* db_name, const char* table_name,
return error;
}
+
void end_master_info(MASTER_INFO* mi)
{
+ DBUG_ENTER("end_master_info");
+
if (!mi->inited)
- return;
+ DBUG_VOID_RETURN;
end_relay_log_info(&mi->rli);
if (mi->fd >= 0)
- {
- end_io_cache(&mi->file);
- (void)my_close(mi->fd, MYF(MY_WME));
- mi->fd = -1;
- }
+ {
+ end_io_cache(&mi->file);
+ (void)my_close(mi->fd, MYF(MY_WME));
+ mi->fd = -1;
+ }
mi->inited = 0;
+
+ DBUG_VOID_RETURN;
}
+
int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
{
- DBUG_ENTER("init_relay_log_info");
MY_STAT stat_area;
char fname[FN_REFLEN+128];
int info_fd;
const char* msg = 0;
int error = 0;
+ DBUG_ENTER("init_relay_log_info");
- if (rli->inited)
+ if (rli->inited) // Set if this function called
DBUG_RETURN(0);
fn_format(fname, info_fname, mysql_data_home, "", 4+32);
pthread_mutex_lock(&rli->data_lock);
@@ -1022,6 +1049,7 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
rli->skip_log_purge=0;
rli->log_space_limit = relay_log_space_limit;
rli->log_space_total = 0;
+
// TODO: make this work with multi-master
if (!opt_relay_logname)
{
@@ -1034,11 +1062,12 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
strmov(strcend(tmp,'.'),"-relay-bin");
opt_relay_logname=my_strdup(tmp,MYF(MY_WME));
}
- rli->relay_log.set_index_file_name(opt_relaylog_index_name);
- open_log(&rli->relay_log, glob_hostname, opt_relay_logname, "-relay-bin",
- LOG_BIN, 1 /* read_append cache */,
- 1 /* no auto events */);
-
+ if (open_log(&rli->relay_log, glob_hostname, opt_relay_logname,
+ "-relay-bin", opt_relaylog_index_name,
+ LOG_BIN, 1 /* read_append cache */,
+ 1 /* no auto events */))
+ DBUG_RETURN(1);
+
/* if file does not exist */
if (!my_stat(fname, &stat_area, MYF(0)))
{
@@ -1052,13 +1081,12 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
MYF(MY_WME)))
{
- if (info_fd >= 0)
- my_close(info_fd, MYF(0));
- rli->info_fd= -1;
- pthread_mutex_unlock(&rli->data_lock);
- DBUG_RETURN(1);
+ msg= current_thd->net.last_error;
+ goto err;
}
- if (init_relay_log_pos(rli,"",BIN_LOG_HEADER_SIZE,0 /*no data mutex*/,
+
+ /* Init relay log with first entry in the relay index file */
+ if (init_relay_log_pos(rli,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
&msg))
goto err;
rli->master_log_pos = 0; // uninitialized
@@ -1075,6 +1103,7 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
if (info_fd >= 0)
my_close(info_fd, MYF(0));
rli->info_fd= -1;
+ rli->relay_log.close(1);
pthread_mutex_unlock(&rli->data_lock);
DBUG_RETURN(1);
}
@@ -1097,38 +1126,42 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
rli->relay_log_pos= relay_log_pos;
rli->master_log_pos= master_log_pos;
- if (init_relay_log_pos(rli,0 /* log already inited */,
- 0 /* pos already inited */,
+ if (init_relay_log_pos(rli,
+ rli->relay_log_name,
+ rli->relay_log_pos,
0 /* no data lock*/,
&msg))
- goto err;
+ goto err;
}
DBUG_ASSERT(rli->relay_log_pos >= BIN_LOG_HEADER_SIZE);
DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos);
- rli->inited = 1;
/*
Now change the cache from READ to WRITE - must do this
before flush_relay_log_info
*/
reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1);
- error=test(flush_relay_log_info(rli));
+ error= flush_relay_log_info(rli);
if (count_relay_log_space(rli))
{
msg="Error counting relay log space";
goto err;
}
+ rli->inited= 1;
pthread_mutex_unlock(&rli->data_lock);
DBUG_RETURN(error);
err:
sql_print_error(msg);
end_io_cache(&rli->info_file);
- my_close(info_fd, MYF(0));
+ if (info_fd >= 0)
+ my_close(info_fd, MYF(0));
rli->info_fd= -1;
+ rli->relay_log.close(1);
pthread_mutex_unlock(&rli->data_lock);
DBUG_RETURN(1);
}
+
static inline int add_relay_log(RELAY_LOG_INFO* rli,LOG_INFO* linfo)
{
MY_STAT s;
@@ -1142,11 +1175,12 @@ static inline int add_relay_log(RELAY_LOG_INFO* rli,LOG_INFO* linfo)
rli->log_space_total += s.st_size;
#ifndef DBUG_OFF
char buf[22];
-#endif
DBUG_PRINT("info",("log_space_total: %s", llstr(rli->log_space_total,buf)));
+#endif
DBUG_RETURN(0);
}
+
static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli)
{
bool slave_killed=0;
@@ -1174,7 +1208,7 @@ static int count_relay_log_space(RELAY_LOG_INFO* rli)
LOG_INFO linfo;
DBUG_ENTER("count_relay_log_space");
rli->log_space_total = 0;
- if (rli->relay_log.find_first_log(&linfo,""))
+ if (rli->relay_log.find_log_pos(&linfo, NullS))
{
sql_print_error("Could not find first log while counting relay log space");
DBUG_RETURN(1);
@@ -1189,18 +1223,15 @@ static int count_relay_log_space(RELAY_LOG_INFO* rli)
int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
- const char* slave_info_fname)
+ const char* slave_info_fname,
+ bool abort_if_no_master_info_file)
{
int fd,error;
- MY_STAT stat_area;
char fname[FN_REFLEN+128];
DBUG_ENTER("init_master_info");
if (mi->inited)
DBUG_RETURN(0);
- if (init_relay_log_info(&mi->rli, slave_info_fname))
- DBUG_RETURN(1);
- mi->rli.mi = mi;
mi->mysql=0;
mi->file_id=1;
mi->ignore_stop_event=0;
@@ -1214,9 +1245,13 @@ int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
pthread_mutex_lock(&mi->data_lock);
fd = mi->fd;
- // we do not want any messages if the file does not exist
- if (!my_stat(fname, &stat_area, MYF(0)))
+ if (access(fname,F_OK))
{
+ if (abort_if_no_master_info_file)
+ {
+ pthread_mutex_unlock(&mi->data_lock);
+ DBUG_RETURN(0);
+ }
/*
if someone removed the file from underneath our feet, just close
the old descriptor and re-create the old file
@@ -1281,6 +1316,11 @@ int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
DBUG_PRINT("master_info",("log_file_name: %s position: %ld",
mi->master_log_name,
(ulong) mi->master_log_pos));
+
+ if (init_relay_log_info(&mi->rli, slave_info_fname))
+ goto err;
+ mi->rli.mi = mi;
+
mi->inited = 1;
// now change cache READ -> WRITE - must do this before flush_master_info
reinit_io_cache(&mi->file, WRITE_CACHE,0L,0,1);
@@ -1289,7 +1329,6 @@ int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
DBUG_RETURN(error);
err:
- end_relay_log_info(&mi->rli);
if (fd >= 0)
{
my_close(fd, MYF(0));
@@ -1300,6 +1339,7 @@ err:
DBUG_RETURN(1);
}
+
int register_slave_on_master(MYSQL* mysql)
{
String packet;
@@ -1405,7 +1445,8 @@ int show_master_info(THD* thd, MASTER_INFO* mi)
DBUG_RETURN(0);
}
-int flush_master_info(MASTER_INFO* mi)
+
+bool flush_master_info(MASTER_INFO* mi)
{
IO_CACHE* file = &mi->file;
char lbuf[22];
@@ -1422,14 +1463,17 @@ int flush_master_info(MASTER_INFO* mi)
DBUG_RETURN(0);
}
+
int st_relay_log_info::wait_for_pos(THD* thd, String* log_name,
ulonglong log_pos)
{
- if (!inited) return -1;
+ if (!inited)
+ return -1;
bool pos_reached = 0;
int event_count = 0;
pthread_mutex_lock(&data_lock);
abort_pos_wait=0; // abort only if master info changes during wait
+
while (!thd->killed || !abort_pos_wait)
{
int cmp_result;
@@ -1471,6 +1515,7 @@ int st_relay_log_info::wait_for_pos(THD* thd, String* log_name,
return thd->killed ? -1 : event_count;
}
+
static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
{
DBUG_ENTER("init_slave_thread");
@@ -1529,7 +1574,7 @@ static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
{
int nap_time = (int) (end_time - start_time);
/*
- the only reason we are asking for alarm is so that
+ The only reason we are asking for alarm is so that
we will be woken up in case of murder, so if we do not get killed,
set the alarm so it goes off after we wake up naturally
*/
@@ -1580,6 +1625,7 @@ static int request_dump(MYSQL* mysql, MASTER_INFO* mi,
return 0;
}
+
static int request_table_dump(MYSQL* mysql, const char* db, const char* table)
{
char buf[1024];
@@ -1608,6 +1654,7 @@ command");
return 0;
}
+
/*
read one event from the master
@@ -1619,6 +1666,7 @@ command");
try a reconnect. We do not want to print anything to
the error log in this case because this a anormal
event in an idle server.
+
RETURN VALUES
'packet_error' Error
number Length of packet
@@ -1627,20 +1675,21 @@ command");
static ulong read_event(MYSQL* mysql, MASTER_INFO *mi, bool* suppress_warnings)
{
- ulong len = packet_error;
+ ulong len;
+ *suppress_warnings= 0;
/*
my_real_read() will time us out
We check if we were told to die, and if not, try reading again
+
+ TODO: Move 'events_till_disconnect' to the MASTER_INFO structure
*/
#ifndef DBUG_OFF
if (disconnect_slave_event_count && !(events_till_disconnect--))
return packet_error;
#endif
- *suppress_warnings= 0;
len = mc_net_safe_read(mysql);
-
if (len == packet_error || (long) len < 1)
{
if (mc_mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
@@ -1694,6 +1743,7 @@ point. If you are sure that your master is ok, run this query manually on the\
}
}
+
static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
{
DBUG_ASSERT(rli->sql_thd==thd);
@@ -1706,7 +1756,15 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
int type_code = ev->get_type_code();
int exec_res;
pthread_mutex_lock(&rli->data_lock);
- if (ev->server_id == ::server_id ||
+
+ /*
+ Skip queries originating from this server or number of
+ queries specified by the user in slave_skip_counter
+ We can't however skip event's that has something to do with the
+ log files themselves.
+ */
+
+ if (ev->server_id == (uint32) ::server_id ||
(rli->slave_skip_counter && type_code != ROTATE_EVENT))
{
/* TODO: I/O thread should not even log events with the same server id */
@@ -1750,6 +1808,7 @@ This may also be a network problem, or just a bug in the master or slave code.\
}
}
+
/* slave I/O thread */
pthread_handler_decl(handle_slave_io,arg)
{
@@ -1760,7 +1819,7 @@ slave_begin:
MYSQL *mysql = NULL ;
MASTER_INFO* mi = (MASTER_INFO*)arg;
char llbuff[22];
- bool retried_once = 0;
+ uint retry_count= 0;
ulonglong last_failed_pos = 0; // TODO: see if last_failed_pos is needed
DBUG_ASSERT(mi->inited);
@@ -1771,7 +1830,7 @@ slave_begin:
// needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
my_thread_init();
- thd = new THD; // note that contructor of THD uses DBUG_ !
+ thd= new THD; // note that contructor of THD uses DBUG_ !
DBUG_ENTER("handle_slave_io");
THD_CHECK_SENTRY(thd);
@@ -1785,12 +1844,13 @@ slave_begin:
}
mi->io_thd = thd;
thd->thread_stack = (char*)&thd; // remember where our stack is
- thd->store_globals();
+ pthread_mutex_lock(&LOCK_thread_count);
threads.append(thd);
+ pthread_mutex_unlock(&LOCK_thread_count);
mi->slave_running = 1;
mi->abort_slave = 0;
- pthread_cond_broadcast(&mi->start_cond);
pthread_mutex_unlock(&mi->run_lock);
+ pthread_cond_broadcast(&mi->start_cond);
DBUG_PRINT("master_info",("log_file_name: '%s' position: %s",
mi->master_log_name,
@@ -1804,9 +1864,6 @@ slave_begin:
thd->proc_info = "connecting to master";
-#ifndef DBUG_OFF
- sql_print_error("Slave I/O thread initialized");
-#endif
// we can get killed during safe_connect
if (!safe_connect(thd, mysql, mi))
sql_print_error("Slave I/O thread: connected to master '%s@%s:%d',\
@@ -1859,12 +1916,13 @@ dump");
right away - if first time fails, sleep between re-tries
hopefuly the admin can fix the problem sometime
*/
- if (retried_once)
+ if (retry_count++)
+ {
+ if (retry_count > master_retry_count)
+ goto err; // Don't retry forever
safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
(void*)mi);
- else
- retried_once = 1;
-
+ }
if (io_slave_killed(thd,mi))
{
sql_print_error("Slave I/O thread killed while retrying master \
@@ -1895,7 +1953,8 @@ after reconnect");
ulong event_len = read_event(mysql, mi, &suppress_warnings);
if (io_slave_killed(thd,mi))
{
- sql_print_error("Slave I/O thread killed while reading event");
+ if (global_system_variables.log_warnings)
+ sql_print_error("Slave I/O thread killed while reading event");
goto err;
}
@@ -1903,24 +1962,27 @@ after reconnect");
{
if (mc_mysql_errno(mysql) == ER_NET_PACKET_TOO_LARGE)
{
- sql_print_error("Log entry on master is longer than \
-max_allowed_packet (%ld) on slave. Slave thread will be aborted. If the entry \
-is correct, restart the server with a higher value of max_allowed_packet",
+ sql_print_error("\
+Log entry on master is longer than max_allowed_packet (%ld) on \
+slave. If the entry is correct, restart the server with a higher value of \
+max_allowed_packet",
thd->variables.max_allowed_packet);
goto err;
}
thd->proc_info = "Waiting to reconnect after a failed read";
mc_end_server(mysql);
- if (retried_once) // punish repeat offender with sleep
+ if (retry_count++)
+ {
+ if (retry_count > master_retry_count)
+ goto err; // Don't retry forever
safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
- (void*)mi);
- else
- retried_once = 1;
-
+ (void*) mi);
+ }
if (io_slave_killed(thd,mi))
{
- sql_print_error("Slave I/O thread killed while waiting to \
+ if (global_system_variables.log_warnings)
+ sql_print_error("Slave I/O thread killed while waiting to \
reconnect after a failed read");
goto err;
}
@@ -1932,19 +1994,20 @@ reconnecting to retry, log '%s' position %s", IO_RPL_LOG_NAME,
if (safe_reconnect(thd, mysql, mi, suppress_warnings) ||
io_slave_killed(thd,mi))
{
- sql_print_error("Slave I/O thread killed during or after a \
+ if (global_system_variables.log_warnings)
+ sql_print_error("Slave I/O thread killed during or after a \
reconnect done to recover from failed read");
goto err;
}
goto connected;
} // if (event_len == packet_error)
+ retry_count=0; // ok event, reset retry counter
thd->proc_info = "Queueing event from master";
if (queue_event(mi,(const char*)mysql->net.read_pos + 1,
event_len))
{
- sql_print_error("Slave I/O thread could not queue event \
-from master");
+ sql_print_error("Slave I/O thread could not queue event from master");
goto err;
}
flush_master_info(mi);
@@ -1963,9 +2026,9 @@ log space");
sql_print_error("Slave I/O thread: debugging abort");
goto err;
}
-#endif
+#endif
}
- }
+ }
// error = 0;
err:
@@ -2010,7 +2073,7 @@ pthread_handler_decl(handle_slave_sql,arg)
#ifndef DBUG_OFF
slave_begin:
#endif
- THD *thd; /* needs to be first for thread_stack */
+ THD *thd; /* needs to be first for thread_stack */
MYSQL *mysql = NULL ;
bool retried_once = 0;
ulonglong last_failed_pos = 0; // TODO: see if this can be removed
@@ -2024,13 +2087,12 @@ slave_begin:
rli->events_till_abort = abort_slave_event_count;
#endif
-
// needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
my_thread_init();
thd = new THD; // note that contructor of THD uses DBUG_ !
DBUG_ENTER("handle_slave_sql");
+
THD_CHECK_SENTRY(thd);
-
pthread_detach_this_thread();
if (init_slave_thread(thd, SLAVE_THD_SQL))
{
@@ -2043,24 +2105,28 @@ slave_begin:
sql_print_error("Failed during slave thread initialization");
goto err;
}
- THD_CHECK_SENTRY(thd);
- thd->thread_stack = (char*)&thd; // remember where our stack is
+ rli->sql_thd= thd;
thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
- thd->store_globals();
+ thd->thread_stack = (char*)&thd; // remember where our stack is
+ pthread_mutex_lock(&LOCK_thread_count);
threads.append(thd);
- rli->sql_thd = thd;
+ pthread_mutex_unlock(&LOCK_thread_count);
rli->slave_running = 1;
rli->abort_slave = 0;
- pthread_cond_broadcast(&rli->start_cond);
pthread_mutex_unlock(&rli->run_lock);
+ pthread_cond_broadcast(&rli->start_cond);
// This should always be set to 0 when the slave thread is started
rli->pending = 0;
- if (init_relay_log_pos(rli,0,0,1 /*need data lock*/, &errmsg))
+ if (init_relay_log_pos(rli,
+ rli->relay_log_name,
+ rli->relay_log_pos,
+ 1 /*need data lock*/, &errmsg))
{
sql_print_error("Error initializing relay log position: %s",
errmsg);
goto err;
}
+ THD_CHECK_SENTRY(thd);
DBUG_ASSERT(rli->relay_log_pos >= BIN_LOG_HEADER_SIZE);
DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos);
DBUG_ASSERT(rli->sql_thd == thd);
@@ -2068,11 +2134,14 @@ slave_begin:
DBUG_PRINT("master_info",("log_file_name: %s position: %s",
rli->master_log_name,
llstr(rli->master_log_pos,llbuff)));
-
- sql_print_error("Slave SQL thread initialized, starting replication in \
+ if (global_system_variables.log_warnings)
+ sql_print_error("Slave SQL thread initialized, starting replication in \
log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
- llstr(rli->master_log_pos,llbuff),rli->relay_log_name,
- llstr(rli->relay_log_pos,llbuff1));
+ llstr(rli->master_log_pos,llbuff),rli->relay_log_name,
+ llstr(rli->relay_log_pos,llbuff1));
+
+ /* Read queries from the IO/THREAD until this thread is killed */
+
while (!sql_slave_killed(thd,rli))
{
thd->proc_info = "Processing master log event";
@@ -2089,14 +2158,14 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
RPL_LOG_NAME, llstr(rli->master_log_pos, llbuff));
goto err;
}
- } // while (!sql_slave_killed(thd,rli)) - read/exec loop
+ }
- // error = 0;
- err:
- // print the current replication position
+ /* Thread stopped. Print the current replication position to the log */
sql_print_error("Slave SQL thread exiting, replication stopped in log \
'%s' at position %s",
RPL_LOG_NAME, llstr(rli->master_log_pos,llbuff));
+
+ err:
thd->query = thd->db = 0; // extra safety
thd->proc_info = "Waiting for slave mutex on exit";
pthread_mutex_lock(&rli->run_lock);
@@ -2114,7 +2183,7 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
net_end(&thd->net); // destructor will not free it, because we are weird
DBUG_ASSERT(rli->sql_thd == thd);
THD_CHECK_SENTRY(thd);
- rli->sql_thd = 0;
+ rli->sql_thd= 0;
pthread_mutex_lock(&LOCK_thread_count);
THD_CHECK_SENTRY(thd);
delete thd;
@@ -2403,12 +2472,14 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
void end_relay_log_info(RELAY_LOG_INFO* rli)
{
+ DBUG_ENTER("end_relay_log_info");
+
if (!rli->inited)
- return;
+ DBUG_VOID_RETURN;
if (rli->info_fd >= 0)
{
end_io_cache(&rli->info_file);
- (void)my_close(rli->info_fd, MYF(MY_WME));
+ (void) my_close(rli->info_fd, MYF(MY_WME));
rli->info_fd = -1;
}
if (rli->cur_log_fd >= 0)
@@ -2420,6 +2491,7 @@ void end_relay_log_info(RELAY_LOG_INFO* rli)
rli->inited = 0;
rli->log_pos_current=0;
rli->relay_log.close(1);
+ DBUG_VOID_RETURN;
}
/* try to connect until successful or slave killed */
@@ -2441,48 +2513,54 @@ static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
int last_errno= -2; // impossible error
ulong err_count=0;
char llbuff[22];
+ DBUG_ENTER("connect_to_master");
#ifndef DBUG_OFF
events_till_disconnect = disconnect_slave_event_count;
#endif
+ uint client_flag=0;
+ if (opt_slave_compressed_protocol)
+ client_flag=CLIENT_COMPRESS; /* We will use compression */
+
while (!(slave_was_killed = io_slave_killed(thd,mi)) &&
- (reconnect ? mc_mysql_reconnect(mysql) != 0 :
+ (reconnect ? mc_mysql_reconnect(mysql) != 0:
!mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
- mi->port, 0, 0,
+ mi->port, 0, client_flag,
thd->variables.net_read_timeout)))
{
/* Don't repeat last error */
if (mc_mysql_errno(mysql) != last_errno)
{
+ last_errno=mc_mysql_errno(mysql);
suppress_warnings= 0;
sql_print_error("Slave I/O thread: error connecting to master \
'%s@%s:%d': \
-%s, last_errno=%d, retry in %d sec",mi->user,mi->host,mi->port,
- mc_mysql_error(mysql), last_errno=mc_mysql_errno(mysql),
+Error: '%s' errno: %d retry-time: %d",mi->user,mi->host,mi->port,
+ mc_mysql_error(mysql), last_errno,
mi->connect_retry);
}
- safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
- (void*)mi);
/*
By default we try forever. The reason is that failure will trigger
master election, so if the user did not set master_retry_count we
- do not want to have electioin triggered on the first failure to
+ do not want to have election triggered on the first failure to
connect
*/
- if (master_retry_count && err_count++ == master_retry_count)
+ if (++err_count == master_retry_count)
{
slave_was_killed=1;
if (reconnect)
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER);
break;
}
+ safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
+ (void*)mi);
}
if (!slave_was_killed)
{
if (reconnect)
{
- if (!suppress_warnings)
+ if (!suppress_warnings && global_system_variables.log_warnings)
sql_print_error("Slave: connected to master '%s@%s:%d',\
replication resumed in log '%s' at position %s", mi->user,
mi->host, mi->port,
@@ -2499,8 +2577,8 @@ replication resumed in log '%s' at position %s", mi->user,
thd->set_active_vio(mysql->net.vio);
#endif
}
-
- return slave_was_killed;
+ DBUG_PRINT("exit",("slave_was_killed: %d", slave_was_killed));
+ DBUG_RETURN(slave_was_killed);
}
@@ -2516,19 +2594,61 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
}
-int flush_relay_log_info(RELAY_LOG_INFO* rli)
+/*
+ Store the file and position where the execute-slave thread are in the
+ relay log.
+
+ SYNOPSIS
+ flush_relay_log_info()
+ rli Relay log information
+
+ NOTES
+ - As this is only called by the slave thread, we don't need to
+ have a lock on this.
+ - If there is an active transaction, then we don't update the position
+ in the relay log. This is to ensure that we re-execute statements
+ if we die in the middle of an transaction that was rolled back.
+ - As a transaction never spans binary logs, we don't have to handle the
+ case where we do a relay-log-rotation in the middle of the transaction.
+ If this would not be the case, we would have to ensure that we
+ don't delete the relay log file where the transaction started when
+ we switch to a new relay log file.
+
+ TODO
+ - Change the log file information to a binary format to avoid calling
+ longlong2str.
+
+ RETURN VALUES
+ 0 ok
+ 1 write error
+*/
+
+bool flush_relay_log_info(RELAY_LOG_INFO* rli)
{
- register IO_CACHE* file = &rli->info_file;
- char lbuf[22],lbuf1[22];
-
+ bool error=0;
+ IO_CACHE *file = &rli->info_file;
+ char buff[FN_REFLEN*2+22*2+4], *pos;
+
+ /* sql_thd is not set when calling from init_slave() */
+ if ((rli->sql_thd && rli->sql_thd->options & OPTION_BEGIN))
+ return 0; // Wait for COMMIT
+
my_b_seek(file, 0L);
- my_b_printf(file, "%s\n%s\n%s\n%s\n",
- rli->relay_log_name, llstr(rli->relay_log_pos, lbuf),
- rli->master_log_name, llstr(rli->master_log_pos, lbuf1)
- );
- flush_io_cache(file);
- flush_io_cache(rli->cur_log);
- return 0;
+ pos=strmov(buff, rli->relay_log_name);
+ *pos++='\n';
+ pos=longlong2str(rli->relay_log_pos, pos, 10);
+ *pos++='\n';
+ pos=strmov(pos, rli->master_log_name);
+ *pos++='\n';
+ pos=longlong2str(rli->master_log_pos, pos, 10);
+ *pos='\n';
+ if (my_b_write(file, buff, (ulong) (pos-buff)+1))
+ error=1;
+ if (flush_io_cache(file))
+ error=1;
+ if (flush_io_cache(rli->cur_log)) // QQ Why this call ?
+ error=1;
+ return error;
}
@@ -2644,12 +2764,8 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
update. If we do not, show slave status will block
*/
pthread_mutex_unlock(&rli->data_lock);
-
- /*
- IMPORTANT: note that wait_for_update will unlock lock_log, but
- expects the caller to lock it
- */
rli->relay_log.wait_for_update(rli->sql_thd);
+ pthread_mutex_unlock(log_lock);
// re-acquire data lock since we released it earlier
pthread_mutex_lock(&rli->data_lock);
@@ -2681,13 +2797,13 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
else
{
/*
- TODO: verify that no lock is ok here. At this point, if we
- get this wrong, this is actually no big deal - the only time
- this code will ever be executed is if we are recovering from
- a bug when a full reload of the slave is not feasible or
- desirable.
+ If hot_log is set, then we already have a lock on
+ LOCK_log. If not, we have to get the lock.
+
+ According to Sasha, the only time this code will ever be executed
+ is if we are recovering from a bug.
*/
- if (rli->relay_log.find_next_log(&rli->linfo,0/*no lock*/))
+ if (rli->relay_log.find_next_log(&rli->linfo, !hot_log))
{
errmsg = "error switching to the next log";
goto err;
@@ -2746,16 +2862,16 @@ event(errno: %d cur_log->error: %d)",
my_b_seek(cur_log,rli->relay_log_pos+rli->pending);
/* otherwise, we have had a partial read */
errmsg = "Aborting slave SQL thread because of partial event read";
- /* TODO; see if there is a way to do this without this goto */
- goto err;
+ break; // To end of function
}
}
- if (!errmsg && was_killed)
+ if (!errmsg && global_system_variables.log_warnings)
errmsg = "slave SQL thread was killed";
err:
pthread_mutex_unlock(&rli->data_lock);
- sql_print_error("Error reading relay log event: %s", errmsg);
+ if (errmsg)
+ sql_print_error("Error reading relay log event: %s", errmsg);
DBUG_RETURN(0);
}