summaryrefslogtreecommitdiff
path: root/sql/rpl_mi.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/rpl_mi.cc')
-rw-r--r--sql/rpl_mi.cc138
1 files changed, 125 insertions, 13 deletions
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc
index 5e46837e948..e83e0ad0ba9 100644
--- a/sql/rpl_mi.cc
+++ b/sql/rpl_mi.cc
@@ -26,17 +26,21 @@
int init_intvar_from_file(int* var, IO_CACHE* f, int default_val);
int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
const char *default_val);
+int init_floatvar_from_file(float* var, IO_CACHE* f, float default_val);
+int init_dynarray_intvar_from_file(DYNAMIC_ARRAY* arr, IO_CACHE* f);
-Master_info::Master_info()
+Master_info::Master_info(bool is_slave_recovery)
:Slave_reporting_capability("I/O"),
ssl(0), ssl_verify_server_cert(0), fd(-1), io_thd(0), inited(0),
- abort_slave(0),slave_running(0),
- slave_run_id(0)
+ rli(is_slave_recovery), abort_slave(0), slave_running(0),
+ slave_run_id(0), sync_counter(0),
+ heartbeat_period(0), received_heartbeats(0), master_id(0)
{
host[0] = 0; user[0] = 0; password[0] = 0;
ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0;
ssl_cipher[0]= 0; ssl_key[0]= 0;
+ my_init_dynamic_array(&ignore_server_ids, sizeof(::server_id), 16, 16);
bzero((char*) &file, sizeof(file));
pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
@@ -47,6 +51,7 @@ Master_info::Master_info()
Master_info::~Master_info()
{
+ delete_dynamic(&ignore_server_ids);
pthread_mutex_destroy(&run_lock);
pthread_mutex_destroy(&data_lock);
pthread_cond_destroy(&data_cond);
@@ -54,6 +59,43 @@ Master_info::~Master_info()
pthread_cond_destroy(&stop_cond);
}
+/**
+ A comparison function to be supplied as argument to @c sort_dynamic()
+ and @c bsearch()
+
+ @return -1 if first argument is less, 0 if it equal to, 1 if it is greater
+ than the second
+*/
+int change_master_server_id_cmp(ulong *id1, ulong *id2)
+{
+ return *id1 < *id2? -1 : (*id1 > *id2? 1 : 0);
+}
+
+
+/**
+ Reports if the s_id server has been configured to ignore events
+ it generates with
+
+ CHANGE MASTER IGNORE_SERVER_IDS= ( list of server ids )
+
+ Method is called from the io thread event receiver filtering.
+
+ @param s_id the master server identifier
+
+ @retval TRUE if s_id is in the list of ignored master servers,
+ @retval FALSE otherwise.
+ */
+bool Master_info::shall_ignore_server_id(ulong s_id)
+{
+ if (likely(ignore_server_ids.elements == 1))
+ return (* (ulong*) dynamic_array_ptr(&ignore_server_ids, 0)) == s_id;
+ else
+ return bsearch((const ulong *) &s_id,
+ ignore_server_ids.buffer,
+ ignore_server_ids.elements, sizeof(ulong),
+ (int (*) (const void*, const void*)) change_master_server_id_cmp)
+ != NULL;
+}
void init_master_info_with_options(Master_info* mi)
{
@@ -84,6 +126,17 @@ void init_master_info_with_options(Master_info* mi)
strmake(mi->ssl_key, master_ssl_key, sizeof(mi->ssl_key)-1);
/* Intentionally init ssl_verify_server_cert to 0, no option available */
mi->ssl_verify_server_cert= 0;
+ /*
+ always request heartbeat unless master_heartbeat_period is set
+ explicitly zero. Here is the default value for heartbeat period
+ if CHANGE MASTER did not specify it. (no data loss in conversion
+ as hb period has a max)
+ */
+ mi->heartbeat_period= (float) min(SLAVE_MAX_HEARTBEAT_PERIOD,
+ (slave_net_timeout/2.0));
+ DBUG_ASSERT(mi->heartbeat_period > (float) 0.001
+ || mi->heartbeat_period == 0);
+
DBUG_VOID_RETURN;
}
@@ -93,9 +146,12 @@ enum {
/* 5.1.16 added value of master_ssl_verify_server_cert */
LINE_FOR_MASTER_SSL_VERIFY_SERVER_CERT= 15,
-
+ /* 6.0 added value of master_heartbeat_period */
+ LINE_FOR_MASTER_HEARTBEAT_PERIOD= 16,
+ /* 6.0 added value of master_ignore_server_id */
+ LINE_FOR_REPLICATE_IGNORE_SERVER_IDS= 17,
/* Number of lines currently used when saving master info file */
- LINES_IN_MASTER_INFO= LINE_FOR_MASTER_SSL_VERIFY_SERVER_CERT
+ LINES_IN_MASTER_INFO= LINE_FOR_REPLICATE_IGNORE_SERVER_IDS
};
int init_master_info(Master_info* mi, const char* master_info_fname,
@@ -197,6 +253,7 @@ file '%s')", fname);
mi->fd = fd;
int port, connect_retry, master_log_pos, lines;
int ssl= 0, ssl_verify_server_cert= 0;
+ float master_heartbeat_period= 0.0;
char *first_non_digit;
/*
@@ -281,7 +338,23 @@ file '%s')", fname);
if (lines >= LINE_FOR_MASTER_SSL_VERIFY_SERVER_CERT &&
init_intvar_from_file(&ssl_verify_server_cert, &mi->file, 0))
goto errwithmsg;
-
+ /*
+ Starting from 6.0 master_heartbeat_period might be
+ in the file
+ */
+ if (lines >= LINE_FOR_MASTER_HEARTBEAT_PERIOD &&
+ init_floatvar_from_file(&master_heartbeat_period, &mi->file, 0.0))
+ goto errwithmsg;
+ /*
+ Starting from 6.0 list of server_id of ignorable servers might be
+ in the file
+ */
+ if (lines >= LINE_FOR_REPLICATE_IGNORE_SERVER_IDS &&
+ init_dynarray_intvar_from_file(&mi->ignore_server_ids, &mi->file))
+ {
+ sql_print_error("Failed to initialize master info ignore_server_ids");
+ goto errwithmsg;
+ }
}
#ifndef HAVE_OPENSSL
@@ -300,6 +373,7 @@ file '%s')", fname);
mi->connect_retry= (uint) connect_retry;
mi->ssl= (my_bool) ssl;
mi->ssl_verify_server_cert= ssl_verify_server_cert;
+ mi->heartbeat_period= master_heartbeat_period;
}
DBUG_PRINT("master_info",("log_file_name: %s position: %ld",
mi->master_log_name,
@@ -310,6 +384,7 @@ file '%s')", fname);
goto err;
mi->inited = 1;
+ mi->rli.is_relay_log_recovery= FALSE;
// now change cache READ -> WRITE - must do this before flush_master_info
reinit_io_cache(&mi->file, WRITE_CACHE, 0L, 0, 1);
if ((error=test(flush_master_info(mi, 1))))
@@ -342,6 +417,7 @@ int flush_master_info(Master_info* mi, bool flush_relay_log_cache)
{
IO_CACHE* file = &mi->file;
char lbuf[22];
+ int err= 0;
DBUG_ENTER("flush_master_info");
DBUG_PRINT("enter",("master_pos: %ld", (long) mi->master_log_pos));
@@ -358,9 +434,35 @@ int flush_master_info(Master_info* mi, bool flush_relay_log_cache)
When we come to this place in code, relay log may or not be initialized;
the caller is responsible for setting 'flush_relay_log_cache' accordingly.
*/
- if (flush_relay_log_cache &&
- flush_io_cache(mi->rli.relay_log.get_log_file()))
- DBUG_RETURN(2);
+ if (flush_relay_log_cache)
+ {
+ IO_CACHE *log_file= mi->rli.relay_log.get_log_file();
+ if (flush_io_cache(log_file))
+ DBUG_RETURN(2);
+ }
+
+ /*
+ produce a line listing the total number and all the ignored server_id:s
+ */
+ char* ignore_server_ids_buf;
+ {
+ ignore_server_ids_buf=
+ (char *) my_malloc((sizeof(::server_id) * 3 + 1) *
+ (1 + mi->ignore_server_ids.elements), MYF(MY_WME));
+ if (!ignore_server_ids_buf)
+ DBUG_RETURN(1);
+ for (ulong i= 0, cur_len= my_sprintf(ignore_server_ids_buf,
+ (ignore_server_ids_buf, "%u",
+ mi->ignore_server_ids.elements));
+ i < mi->ignore_server_ids.elements; i++)
+ {
+ ulong s_id;
+ get_dynamic(&mi->ignore_server_ids, (uchar*) &s_id, i);
+ cur_len +=my_sprintf(ignore_server_ids_buf + cur_len,
+ (ignore_server_ids_buf + cur_len,
+ " %lu", s_id));
+ }
+ }
/*
We flushed the relay log BEFORE the master.info file, because if we crash
@@ -378,17 +480,27 @@ int flush_master_info(Master_info* mi, bool flush_relay_log_cache)
contents of file). But because of number of lines in the first line
of file we don't care about this garbage.
*/
-
+ char heartbeat_buf[sizeof(mi->heartbeat_period) * 4]; // buffer to suffice always
+ my_sprintf(heartbeat_buf, (heartbeat_buf, "%.3f", mi->heartbeat_period));
my_b_seek(file, 0L);
my_b_printf(file,
- "%u\n%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n%s\n%s\n%s\n%s\n%s\n%d\n",
+ "%u\n%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n%s\n%s\n%s\n%s\n%s\n%d\n%s\n%s\n",
LINES_IN_MASTER_INFO,
mi->master_log_name, llstr(mi->master_log_pos, lbuf),
mi->host, mi->user,
mi->password, mi->port, mi->connect_retry,
(int)(mi->ssl), mi->ssl_ca, mi->ssl_capath, mi->ssl_cert,
- mi->ssl_cipher, mi->ssl_key, mi->ssl_verify_server_cert);
- DBUG_RETURN(-flush_io_cache(file));
+ mi->ssl_cipher, mi->ssl_key, mi->ssl_verify_server_cert,
+ heartbeat_buf, ignore_server_ids_buf);
+ my_free(ignore_server_ids_buf, MYF(0));
+ err= flush_io_cache(file);
+ if (sync_masterinfo_period && !err &&
+ ++(mi->sync_counter) >= sync_masterinfo_period)
+ {
+ err= my_sync(mi->fd, MYF(MY_WME));
+ mi->sync_counter= 0;
+ }
+ DBUG_RETURN(-err);
}