summaryrefslogtreecommitdiff
path: root/sql/wsrep_sst.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/wsrep_sst.cc')
-rw-r--r--sql/wsrep_sst.cc121
1 files changed, 72 insertions, 49 deletions
diff --git a/sql/wsrep_sst.cc b/sql/wsrep_sst.cc
index d37c6c0e96c..b31d433de08 100644
--- a/sql/wsrep_sst.cc
+++ b/sql/wsrep_sst.cc
@@ -1,4 +1,4 @@
-/* Copyright 2008-2012 Codership Oy <http://www.codership.com>
+/* Copyright 2008-2015 Codership Oy <http://www.codership.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -26,6 +26,7 @@
#include <sql_parse.h>
#include "wsrep_priv.h"
#include "wsrep_utils.h"
+#include "wsrep_xid.h"
#include <cstdio>
#include <cstdlib>
@@ -264,19 +265,41 @@ void wsrep_sst_complete (const wsrep_uuid_t* sst_uuid,
}
void wsrep_sst_received (wsrep_t* const wsrep,
- const wsrep_uuid_t* const uuid,
+ const wsrep_uuid_t& uuid,
wsrep_seqno_t const seqno,
const void* const state,
size_t const state_len)
{
- int const rcode(seqno < 0 ? seqno : 0);
- wsrep_gtid_t const state_id = {
- *uuid, (rcode ? WSREP_SEQNO_UNDEFINED : seqno)
- };
+ wsrep_get_SE_checkpoint(local_uuid, local_seqno);
+
+ if (memcmp(&local_uuid, &uuid, sizeof(wsrep_uuid_t)) ||
+ local_seqno < seqno || seqno < 0)
+ {
+ wsrep_set_SE_checkpoint(uuid, seqno);
+ local_uuid = uuid;
+ local_seqno = seqno;
+ }
+ else if (local_seqno > seqno)
+ {
+ WSREP_WARN("SST postion is in the past: %lld, current: %lld. "
+ "Can't continue.",
+ (long long)seqno, (long long)local_seqno);
+ unireg_abort(1);
+ }
+
#ifdef GTID_SUPPORT
- wsrep_init_sidno(state_id.uuid);
+ wsrep_init_sidno(uuid);
#endif /* GTID_SUPPORT */
- wsrep->sst_received(wsrep, &state_id, state, state_len, rcode);
+
+ if (wsrep)
+ {
+ int const rcode(seqno < 0 ? seqno : 0);
+ wsrep_gtid_t const state_id = {
+ uuid, (rcode ? WSREP_SEQNO_UNDEFINED : seqno)
+ };
+
+ wsrep->sst_received(wsrep, &state_id, state, state_len, rcode);
+ }
}
// Let applier threads to continue
@@ -285,7 +308,7 @@ void wsrep_sst_continue ()
if (sst_needed)
{
WSREP_INFO("Signalling provider to continue.");
- wsrep_sst_received (wsrep, &local_uuid, local_seqno, NULL, 0);
+ wsrep_sst_received (wsrep, local_uuid, local_seqno, NULL, 0);
}
}
@@ -606,8 +629,6 @@ static bool SE_initialized = false;
ssize_t wsrep_sst_prepare (void** msg)
{
- const ssize_t ip_max= 256;
- char ip_buf[ip_max];
const char* addr_in= NULL;
const char* addr_out= NULL;
@@ -623,27 +644,34 @@ ssize_t wsrep_sst_prepare (void** msg)
return ret;
}
- // Figure out SST address. Common for all SST methods
+ /*
+ Figure out SST receive address. Common for all SST methods.
+ */
+ char ip_buf[256];
+ const ssize_t ip_max= sizeof(ip_buf);
+
+ // Attempt 1: wsrep_sst_receive_address
if (wsrep_sst_receive_address &&
strcmp (wsrep_sst_receive_address, WSREP_SST_ADDRESS_AUTO))
{
addr_in= wsrep_sst_receive_address;
}
+
+ //Attempt 2: wsrep_node_address
else if (wsrep_node_address && strlen(wsrep_node_address))
{
- const char* const colon= strchr (wsrep_node_address, ':');
- if (colon)
- {
- ptrdiff_t const len= colon - wsrep_node_address;
- strncpy (ip_buf, wsrep_node_address, len);
- ip_buf[len]= '\0';
- addr_in= ip_buf;
- }
- else
+ wsp::Address addr(wsrep_node_address);
+
+ if (!addr.is_valid())
{
- addr_in= wsrep_node_address;
+ WSREP_ERROR("Could not parse wsrep_node_address : %s",
+ wsrep_node_address);
+ unireg_abort(1);
}
+ memcpy(ip_buf, addr.get_address(), addr.get_address_len());
+ addr_in= ip_buf;
}
+ // Attempt 3: Try to get the IP from the list of available interfaces.
else
{
ssize_t ret= wsrep_guess_ip (ip_buf, ip_max);
@@ -654,8 +682,7 @@ ssize_t wsrep_sst_prepare (void** msg)
}
else
{
- WSREP_ERROR("Could not prepare state transfer request: "
- "failed to guess address to accept state transfer at. "
+ WSREP_ERROR("Failed to guess address to accept state transfer. "
"wsrep_sst_receive_address must be set manually.");
unireg_abort(1);
}
@@ -755,8 +782,10 @@ static void sst_reject_queries(my_bool close_conn)
if (TRUE == close_conn) wsrep_close_client_connections(FALSE);
}
-static int sst_mysqldump_check_addr (const char* user, const char* pswd,
- const char* host, const char* port)
+static int sst_mysqldump_check_addr (const char* user,
+ const char* pswd,
+ const char* host,
+ int port)
{
return 0;
}
@@ -767,25 +796,17 @@ static int sst_donate_mysqldump (const char* addr,
wsrep_seqno_t seqno,
bool bypass)
{
- size_t host_len;
- const char* port = strchr (addr, ':');
+ char host[256];
+ wsp::Address address(addr);
- if (port)
+ if (!address.is_valid())
{
- port += 1;
- host_len = port - addr;
- }
- else
- {
- port = "";
- host_len = strlen (addr) + 1;
+ WSREP_ERROR("Could not parse SST address : %s", addr);
+ return 0;
}
- char *host=(char*)alloca(host_len);
-
- strncpy (host, addr, host_len - 1);
- host[host_len - 1] = '\0';
-
+ memcpy(host, address.get_address(), address.get_address_len());
+ int port= address.get_port();
const char* auth = sst_auth_real;
const char* pswd = (auth) ? strchr (auth, ':') : NULL;
size_t user_len;
@@ -801,7 +822,7 @@ static int sst_donate_mysqldump (const char* addr,
user_len = (auth) ? strlen (auth) + 1 : 1;
}
- char *user=(char*)alloca(user_len);
+ char *user= (char *) alloca(user_len);
strncpy (user, (auth) ? auth : "", user_len - 1);
user[user_len - 1] = '\0';
@@ -820,7 +841,7 @@ static int sst_donate_mysqldump (const char* addr,
WSREP_SST_OPT_USER" '%s' "
WSREP_SST_OPT_PSWD" '%s' "
WSREP_SST_OPT_HOST" '%s' "
- WSREP_SST_OPT_PORT" '%s' "
+ WSREP_SST_OPT_PORT" '%d' "
WSREP_SST_OPT_LPORT" '%u' "
WSREP_SST_OPT_SOCKET" '%s' "
" %s "
@@ -912,11 +933,13 @@ static int sst_flush_tables(THD* thd)
{
WSREP_INFO("Tables flushed.");
const char base_name[]= "tables_flushed";
+
ssize_t const full_len= strlen(mysql_real_data_home) + strlen(base_name)+2;
- char *real_name=(char*)alloca(full_len);
- sprintf(real_name, "%s/%s", mysql_real_data_home, base_name);
- char *tmp_name=(char*)alloca(full_len + 4);
- sprintf(tmp_name, "%s.tmp", real_name);
+ char *real_name= (char *) alloca(full_len);
+ snprintf(real_name, (size_t) full_len, "%s/%s", mysql_real_data_home,
+ base_name);
+ char *tmp_name= (char *) alloca(full_len + 4);
+ snprintf(tmp_name, (size_t) full_len + 4, "%s.tmp", real_name);
FILE* file= fopen(tmp_name, "w+");
if (0 == file)
@@ -1078,7 +1101,7 @@ static int sst_donate_other (const char* method,
wsrep_seqno_t seqno,
bool bypass)
{
- char cmd_str[4096];
+ char cmd_str[4096];
const char* binlog_opt= "";
char* binlog_opt_val= NULL;
@@ -1111,7 +1134,7 @@ static int sst_donate_other (const char* method,
bypass ? " "WSREP_SST_OPT_BYPASS : "");
my_free(binlog_opt_val);
- if (ret < 0 || ret >= (int)sizeof(cmd_str))
+ if (ret < 0 || ret >= (int) sizeof(cmd_str))
{
WSREP_ERROR("sst_donate_other(): snprintf() failed: %d", ret);
return (ret < 0 ? ret : -EMSGSIZE);