diff options
Diffstat (limited to 'sql/wsrep_sst.cc')
-rw-r--r-- | sql/wsrep_sst.cc | 121 |
1 files changed, 72 insertions, 49 deletions
diff --git a/sql/wsrep_sst.cc b/sql/wsrep_sst.cc index d37c6c0e96c..b31d433de08 100644 --- a/sql/wsrep_sst.cc +++ b/sql/wsrep_sst.cc @@ -1,4 +1,4 @@ -/* Copyright 2008-2012 Codership Oy <http://www.codership.com> +/* Copyright 2008-2015 Codership Oy <http://www.codership.com> This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -26,6 +26,7 @@ #include <sql_parse.h> #include "wsrep_priv.h" #include "wsrep_utils.h" +#include "wsrep_xid.h" #include <cstdio> #include <cstdlib> @@ -264,19 +265,41 @@ void wsrep_sst_complete (const wsrep_uuid_t* sst_uuid, } void wsrep_sst_received (wsrep_t* const wsrep, - const wsrep_uuid_t* const uuid, + const wsrep_uuid_t& uuid, wsrep_seqno_t const seqno, const void* const state, size_t const state_len) { - int const rcode(seqno < 0 ? seqno : 0); - wsrep_gtid_t const state_id = { - *uuid, (rcode ? WSREP_SEQNO_UNDEFINED : seqno) - }; + wsrep_get_SE_checkpoint(local_uuid, local_seqno); + + if (memcmp(&local_uuid, &uuid, sizeof(wsrep_uuid_t)) || + local_seqno < seqno || seqno < 0) + { + wsrep_set_SE_checkpoint(uuid, seqno); + local_uuid = uuid; + local_seqno = seqno; + } + else if (local_seqno > seqno) + { + WSREP_WARN("SST postion is in the past: %lld, current: %lld. " + "Can't continue.", + (long long)seqno, (long long)local_seqno); + unireg_abort(1); + } + #ifdef GTID_SUPPORT - wsrep_init_sidno(state_id.uuid); + wsrep_init_sidno(uuid); #endif /* GTID_SUPPORT */ - wsrep->sst_received(wsrep, &state_id, state, state_len, rcode); + + if (wsrep) + { + int const rcode(seqno < 0 ? seqno : 0); + wsrep_gtid_t const state_id = { + uuid, (rcode ? WSREP_SEQNO_UNDEFINED : seqno) + }; + + wsrep->sst_received(wsrep, &state_id, state, state_len, rcode); + } } // Let applier threads to continue @@ -285,7 +308,7 @@ void wsrep_sst_continue () if (sst_needed) { WSREP_INFO("Signalling provider to continue."); - wsrep_sst_received (wsrep, &local_uuid, local_seqno, NULL, 0); + wsrep_sst_received (wsrep, local_uuid, local_seqno, NULL, 0); } } @@ -606,8 +629,6 @@ static bool SE_initialized = false; ssize_t wsrep_sst_prepare (void** msg) { - const ssize_t ip_max= 256; - char ip_buf[ip_max]; const char* addr_in= NULL; const char* addr_out= NULL; @@ -623,27 +644,34 @@ ssize_t wsrep_sst_prepare (void** msg) return ret; } - // Figure out SST address. Common for all SST methods + /* + Figure out SST receive address. Common for all SST methods. + */ + char ip_buf[256]; + const ssize_t ip_max= sizeof(ip_buf); + + // Attempt 1: wsrep_sst_receive_address if (wsrep_sst_receive_address && strcmp (wsrep_sst_receive_address, WSREP_SST_ADDRESS_AUTO)) { addr_in= wsrep_sst_receive_address; } + + //Attempt 2: wsrep_node_address else if (wsrep_node_address && strlen(wsrep_node_address)) { - const char* const colon= strchr (wsrep_node_address, ':'); - if (colon) - { - ptrdiff_t const len= colon - wsrep_node_address; - strncpy (ip_buf, wsrep_node_address, len); - ip_buf[len]= '\0'; - addr_in= ip_buf; - } - else + wsp::Address addr(wsrep_node_address); + + if (!addr.is_valid()) { - addr_in= wsrep_node_address; + WSREP_ERROR("Could not parse wsrep_node_address : %s", + wsrep_node_address); + unireg_abort(1); } + memcpy(ip_buf, addr.get_address(), addr.get_address_len()); + addr_in= ip_buf; } + // Attempt 3: Try to get the IP from the list of available interfaces. else { ssize_t ret= wsrep_guess_ip (ip_buf, ip_max); @@ -654,8 +682,7 @@ ssize_t wsrep_sst_prepare (void** msg) } else { - WSREP_ERROR("Could not prepare state transfer request: " - "failed to guess address to accept state transfer at. " + WSREP_ERROR("Failed to guess address to accept state transfer. " "wsrep_sst_receive_address must be set manually."); unireg_abort(1); } @@ -755,8 +782,10 @@ static void sst_reject_queries(my_bool close_conn) if (TRUE == close_conn) wsrep_close_client_connections(FALSE); } -static int sst_mysqldump_check_addr (const char* user, const char* pswd, - const char* host, const char* port) +static int sst_mysqldump_check_addr (const char* user, + const char* pswd, + const char* host, + int port) { return 0; } @@ -767,25 +796,17 @@ static int sst_donate_mysqldump (const char* addr, wsrep_seqno_t seqno, bool bypass) { - size_t host_len; - const char* port = strchr (addr, ':'); + char host[256]; + wsp::Address address(addr); - if (port) + if (!address.is_valid()) { - port += 1; - host_len = port - addr; - } - else - { - port = ""; - host_len = strlen (addr) + 1; + WSREP_ERROR("Could not parse SST address : %s", addr); + return 0; } - char *host=(char*)alloca(host_len); - - strncpy (host, addr, host_len - 1); - host[host_len - 1] = '\0'; - + memcpy(host, address.get_address(), address.get_address_len()); + int port= address.get_port(); const char* auth = sst_auth_real; const char* pswd = (auth) ? strchr (auth, ':') : NULL; size_t user_len; @@ -801,7 +822,7 @@ static int sst_donate_mysqldump (const char* addr, user_len = (auth) ? strlen (auth) + 1 : 1; } - char *user=(char*)alloca(user_len); + char *user= (char *) alloca(user_len); strncpy (user, (auth) ? auth : "", user_len - 1); user[user_len - 1] = '\0'; @@ -820,7 +841,7 @@ static int sst_donate_mysqldump (const char* addr, WSREP_SST_OPT_USER" '%s' " WSREP_SST_OPT_PSWD" '%s' " WSREP_SST_OPT_HOST" '%s' " - WSREP_SST_OPT_PORT" '%s' " + WSREP_SST_OPT_PORT" '%d' " WSREP_SST_OPT_LPORT" '%u' " WSREP_SST_OPT_SOCKET" '%s' " " %s " @@ -912,11 +933,13 @@ static int sst_flush_tables(THD* thd) { WSREP_INFO("Tables flushed."); const char base_name[]= "tables_flushed"; + ssize_t const full_len= strlen(mysql_real_data_home) + strlen(base_name)+2; - char *real_name=(char*)alloca(full_len); - sprintf(real_name, "%s/%s", mysql_real_data_home, base_name); - char *tmp_name=(char*)alloca(full_len + 4); - sprintf(tmp_name, "%s.tmp", real_name); + char *real_name= (char *) alloca(full_len); + snprintf(real_name, (size_t) full_len, "%s/%s", mysql_real_data_home, + base_name); + char *tmp_name= (char *) alloca(full_len + 4); + snprintf(tmp_name, (size_t) full_len + 4, "%s.tmp", real_name); FILE* file= fopen(tmp_name, "w+"); if (0 == file) @@ -1078,7 +1101,7 @@ static int sst_donate_other (const char* method, wsrep_seqno_t seqno, bool bypass) { - char cmd_str[4096]; + char cmd_str[4096]; const char* binlog_opt= ""; char* binlog_opt_val= NULL; @@ -1111,7 +1134,7 @@ static int sst_donate_other (const char* method, bypass ? " "WSREP_SST_OPT_BYPASS : ""); my_free(binlog_opt_val); - if (ret < 0 || ret >= (int)sizeof(cmd_str)) + if (ret < 0 || ret >= (int) sizeof(cmd_str)) { WSREP_ERROR("sst_donate_other(): snprintf() failed: %d", ret); return (ret < 0 ? ret : -EMSGSIZE); |