diff options
Diffstat (limited to 'sql/wsrep_mysqld.cc')
-rw-r--r-- | sql/wsrep_mysqld.cc | 344 |
1 files changed, 174 insertions, 170 deletions
diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index 33d1d2a0966..d72d9fdf151 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -1,4 +1,4 @@ -/* Copyright 2008-2014 Codership Oy <http://www.codership.com> +/* Copyright 2008-2015 Codership Oy <http://www.codership.com> This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -31,6 +31,7 @@ #include "wsrep_var.h" #include "wsrep_binlog.h" #include "wsrep_applier.h" +#include "wsrep_xid.h" #include <cstdio> #include <cstdlib> #include "log_event.h" @@ -164,10 +165,6 @@ static PSI_file_info wsrep_files[]= { { &key_file_wsrep_gra_log, "wsrep_gra_log", 0} }; - -#else -#define mysql_mutex_register(X,Y,Z) -#define mysql_cond_register(X,Y,Z) #endif my_bool wsrep_inited = 0; // initialized ? @@ -252,62 +249,22 @@ static void wsrep_log_states (wsrep_log_level_t const level, wsrep_log_cb (level, msg); } -static my_bool set_SE_checkpoint(THD* unused, plugin_ref plugin, void* arg) -{ - XID* xid= reinterpret_cast<XID*>(arg); - handlerton* hton= plugin_data(plugin, handlerton *); - if (hton->set_checkpoint) - { - const wsrep_uuid_t* uuid(wsrep_xid_uuid(xid)); - char uuid_str[40] = {0, }; - wsrep_uuid_print(uuid, uuid_str, sizeof(uuid_str)); - WSREP_DEBUG("Set WSREPXid for InnoDB: %s:%lld", - uuid_str, (long long)wsrep_xid_seqno(xid)); - hton->set_checkpoint(hton, xid); - } - return FALSE; -} - -void wsrep_set_SE_checkpoint(XID* xid) -{ - plugin_foreach(NULL, set_SE_checkpoint, MYSQL_STORAGE_ENGINE_PLUGIN, xid); -} - -static my_bool get_SE_checkpoint(THD* unused, plugin_ref plugin, void* arg) -{ - XID* xid= reinterpret_cast<XID*>(arg); - handlerton* hton= plugin_data(plugin, handlerton *); - if (hton->get_checkpoint) - { - hton->get_checkpoint(hton, xid); - const wsrep_uuid_t* uuid(wsrep_xid_uuid(xid)); - char uuid_str[40] = {0, }; - wsrep_uuid_print(uuid, uuid_str, sizeof(uuid_str)); - WSREP_DEBUG("Read WSREPXid from InnoDB: %s:%lld", - uuid_str, (long long)wsrep_xid_seqno(xid)); - } - return FALSE; -} - -void wsrep_get_SE_checkpoint(XID* xid) -{ - plugin_foreach(NULL, get_SE_checkpoint, MYSQL_STORAGE_ENGINE_PLUGIN, xid); -} - #ifdef GTID_SUPPORT -void wsrep_init_sidno(const wsrep_uuid_t& uuid) +void wsrep_init_sidno(const wsrep_uuid_t& wsrep_uuid) { /* generate new Sid map entry from inverted uuid */ rpl_sid sid; wsrep_uuid_t ltid_uuid; + for (size_t i= 0; i < sizeof(ltid_uuid.data); ++i) { - ltid_uuid.data[i] = ~local_uuid.data[i]; + ltid_uuid.data[i] = ~wsrep_uuid.data[i]; } + sid.copy_from(ltid_uuid.data); global_sid_lock->wrlock(); wsrep_sidno= global_sid_map->add_sid(sid); - WSREP_INFO("inited wsrep sidno %d", wsrep_sidno); + WSREP_INFO("Initialized wsrep sidno %d", wsrep_sidno); global_sid_lock->unlock(); } #endif /* GTID_SUPPORT */ @@ -448,13 +405,11 @@ wsrep_view_handler_cb (void* app_ctx, local_seqno= view->state_id.seqno; } /* Init storage engine XIDs from first view */ - XID xid; - wsrep_xid_init(&xid, &local_uuid, local_seqno); - wsrep_set_SE_checkpoint(&xid); - memb_status= WSREP_MEMBER_JOINED; + wsrep_set_SE_checkpoint(local_uuid, local_seqno); #ifdef GTID_SUPPORT wsrep_init_sidno(local_uuid); #endif /* GTID_SUPPORT */ + memb_status= WSREP_MEMBER_JOINED; } // just some sanity check @@ -547,12 +502,13 @@ static void wsrep_synced_cb(void* app_ctx) wsrep_restart_slave_activated= FALSE; mysql_mutex_lock(&LOCK_active_mi); - if ((rcode = start_slave_threads(1 /* need mutex */, - 0 /* no wait for start*/, - active_mi, - master_info_file, - relay_log_info_file, - SLAVE_SQL))) + if ((rcode = start_slave_threads(0, + 1 /* need mutex */, + 0 /* no wait for start*/, + active_mi, + master_info_file, + relay_log_info_file, + SLAVE_SQL))) { WSREP_WARN("Failed to create slave threads: %d", rcode); } @@ -564,38 +520,28 @@ static void wsrep_synced_cb(void* app_ctx) static void wsrep_init_position() { /* read XIDs from storage engines */ - XID xid; - memset(&xid, 0, sizeof(xid)); - xid.formatID= -1; - wsrep_get_SE_checkpoint(&xid); + wsrep_uuid_t uuid; + wsrep_seqno_t seqno; + wsrep_get_SE_checkpoint(uuid, seqno); - if (xid.formatID == -1) + if (!memcmp(&uuid, &WSREP_UUID_UNDEFINED, sizeof(wsrep_uuid_t))) { WSREP_INFO("Read nil XID from storage engines, skipping position init"); return; } - else if (!wsrep_is_wsrep_xid(&xid)) - { - WSREP_WARN("Read non-wsrep XID from storage engines, skipping position init"); - return; - } - - const wsrep_uuid_t* uuid= wsrep_xid_uuid(&xid); - const wsrep_seqno_t seqno= wsrep_xid_seqno(&xid); char uuid_str[40] = {0, }; - wsrep_uuid_print(uuid, uuid_str, sizeof(uuid_str)); + wsrep_uuid_print(&uuid, uuid_str, sizeof(uuid_str)); WSREP_INFO("Initial position: %s:%lld", uuid_str, (long long)seqno); - if (!memcmp(&local_uuid, &WSREP_UUID_UNDEFINED, sizeof(local_uuid)) && local_seqno == WSREP_SEQNO_UNDEFINED) { // Initial state - local_uuid= *uuid; + local_uuid= uuid; local_seqno= seqno; } - else if (memcmp(&local_uuid, uuid, sizeof(local_uuid)) || + else if (memcmp(&local_uuid, &uuid, sizeof(local_uuid)) || local_seqno != seqno) { WSREP_WARN("Initial position was provided by configuration or SST, " @@ -672,6 +618,7 @@ int wsrep_init() wsrep->provider_vendor, sizeof(provider_vendor) - 1); } + /* Initialize node address */ char node_addr[512]= { 0, }; size_t const node_addr_max= sizeof(node_addr) - 1; if (!wsrep_node_address || !strcmp(wsrep_node_address, "")) @@ -689,86 +636,81 @@ int wsrep_init() strncpy(node_addr, wsrep_node_address, node_addr_max); } + /* Initialize node's incoming address */ char inc_addr[512]= { 0, }; size_t const inc_addr_max= sizeof (inc_addr); + + /* + In case wsrep_node_incoming_address is either not set or set to AUTO, + we need to use mysqld's my_bind_addr_str:mysqld_port, lastly fallback + to wsrep_node_address' value if mysqld's bind-address is not set either. + */ if ((!wsrep_node_incoming_address || !strcmp (wsrep_node_incoming_address, WSREP_NODE_INCOMING_AUTO))) { + bool is_ipv6= false; unsigned int my_bind_ip= INADDR_ANY; // default if not set + if (my_bind_addr_str && strlen(my_bind_addr_str)) { - my_bind_ip= wsrep_check_ip(my_bind_addr_str); + my_bind_ip= wsrep_check_ip(my_bind_addr_str, &is_ipv6); } if (INADDR_ANY != my_bind_ip) { + /* + If its a not a valid address, leave inc_addr as empty string. mysqld + is not listening for client connections on network interfaces. + */ if (INADDR_NONE != my_bind_ip && INADDR_LOOPBACK != my_bind_ip) { - snprintf(inc_addr, inc_addr_max, "%s:%u", - my_bind_addr_str, (int)mysqld_port); - } // else leave inc_addr an empty string - mysqld is not listening for - // client connections on network interfaces. + const char *fmt= (is_ipv6) ? "[%s]:%u" : "%s:%u"; + snprintf(inc_addr, inc_addr_max, fmt, my_bind_addr_str, mysqld_port); + } } - else // mysqld binds to 0.0.0.0, take IP from wsrep_node_address if possible + else /* mysqld binds to 0.0.0.0, try taking IP from wsrep_node_address. */ { size_t const node_addr_len= strlen(node_addr); if (node_addr_len > 0) { - const char* const colon= strrchr(node_addr, ':'); - if (strchr(node_addr, ':') == colon) // 1 or 0 ':' - { - size_t const ip_len= colon ? colon - node_addr : node_addr_len; - if (ip_len + 7 /* :55555\0 */ < inc_addr_max) - { - memcpy (inc_addr, node_addr, ip_len); - snprintf(inc_addr + ip_len, inc_addr_max - ip_len, ":%u", - (int)mysqld_port); - } - else - { - WSREP_WARN("Guessing address for incoming client connections: " - "address too long."); - inc_addr[0]= '\0'; - } - } - else - { - WSREP_WARN("Guessing address for incoming client connections: " - "too many colons :) ."); - inc_addr[0]= '\0'; - } - } + wsp::Address addr(node_addr); - if (!strlen(inc_addr)) - { + if (!addr.is_valid()) + { + WSREP_DEBUG("Could not parse node address : %s", node_addr); WSREP_WARN("Guessing address for incoming client connections failed. " "Try setting wsrep_node_incoming_address explicitly."); + goto done; + } + + const char *fmt= (addr.is_ipv6()) ? "[%s]:%u" : "%s:%u"; + snprintf(inc_addr, inc_addr_max, fmt, addr.get_address(), + (int) mysqld_port); } } } - else if (!strchr(wsrep_node_incoming_address, ':')) // no port included - { - if ((int)inc_addr_max <= - snprintf(inc_addr, inc_addr_max, "%s:%u", - wsrep_node_incoming_address,(int)mysqld_port)) - { - WSREP_WARN("Guessing address for incoming client connections: " - "address too long."); - inc_addr[0]= '\0'; - } - } else { - size_t const need = strlen (wsrep_node_incoming_address); - if (need >= inc_addr_max) { - WSREP_WARN("wsrep_node_incoming_address too long: %zu", need); - inc_addr[0]= '\0'; - } - else { - memcpy (inc_addr, wsrep_node_incoming_address, need); + wsp::Address addr(wsrep_node_incoming_address); + + if (!addr.is_valid()) + { + WSREP_WARN("Could not parse wsrep_node_incoming_address : %s", + wsrep_node_incoming_address); + goto done; } + + /* + In case port is not specified in wsrep_node_incoming_address, we use + mysqld_port. + */ + int port= (addr.get_port() > 0) ? addr.get_port() : (int) mysqld_port; + const char *fmt= (addr.is_ipv6()) ? "[%s]:%u" : "%s:%u"; + + snprintf(inc_addr, inc_addr_max, fmt, addr.get_address(), port); } +done: struct wsrep_init_args wsrep_args; struct wsrep_gtid const state_id = { local_uuid, local_seqno }; @@ -811,8 +753,11 @@ int wsrep_init() /* Initialize wsrep thread LOCKs and CONDs */ void wsrep_thr_init() { +#ifdef HAVE_PSI_INTERFACE mysql_mutex_register("sql", wsrep_mutexes, array_elements(wsrep_mutexes)); mysql_cond_register("sql", wsrep_conds, array_elements(wsrep_conds)); + mysql_file_register("sql", wsrep_files, array_elements(wsrep_files)); +#endif mysql_mutex_init(key_LOCK_wsrep_ready, &LOCK_wsrep_ready, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_wsrep_ready, &COND_wsrep_ready, NULL); @@ -827,8 +772,6 @@ void wsrep_thr_init() mysql_mutex_init(key_LOCK_wsrep_slave_threads, &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_wsrep_desync, &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_wsrep_config_state, &LOCK_wsrep_config_state, MY_MUTEX_INIT_FAST); - - mysql_file_register("sql", wsrep_files, array_elements(wsrep_files)); } @@ -864,6 +807,7 @@ void wsrep_deinit(bool free_options) provider_name[0]= '\0'; provider_version[0]= '\0'; provider_vendor[0]= '\0'; + wsrep_inited= 0; if (free_options) @@ -902,13 +846,11 @@ void wsrep_recover() uuid_str, (long long)local_seqno); return; } - XID xid; - memset(&xid, 0, sizeof(xid)); - xid.formatID= -1; - wsrep_get_SE_checkpoint(&xid); - wsrep_uuid_print(wsrep_xid_uuid(&xid), uuid_str, sizeof(uuid_str)); - WSREP_INFO("Recovered position: %s:%lld", uuid_str, - (long long)wsrep_xid_seqno(&xid)); + wsrep_uuid_t uuid; + wsrep_seqno_t seqno; + wsrep_get_SE_checkpoint(uuid, seqno); + wsrep_uuid_print(&uuid, uuid_str, sizeof(uuid_str)); + WSREP_INFO("Recovered position: %s:%lld", uuid_str, (long long)seqno); } @@ -1269,17 +1211,23 @@ int wsrep_to_buf_helper( THD* thd, const char *query, uint query_len, uchar** buf, size_t* buf_len) { IO_CACHE tmp_io_cache; + Log_event_writer writer(&tmp_io_cache); if (open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX, 65536, MYF(MY_WME))) return 1; int ret(0); + Format_description_log_event *tmp_fd= new Format_description_log_event(4); + tmp_fd->checksum_alg= (enum_binlog_checksum_alg)binlog_checksum_options; + writer.write(tmp_fd); + delete tmp_fd; + #ifdef GTID_SUPPORT if (thd->variables.gtid_next.type == GTID_GROUP) { Gtid_log_event gtid_ev(thd, FALSE, &thd->variables.gtid_next); if (!gtid_ev.is_valid()) ret= 0; - if (!ret && gtid_ev.write(&tmp_io_cache)) ret= 1; + if (!ret && writer.write(>id_ev)) ret= 1; } #endif /* GTID_SUPPORT */ @@ -1289,12 +1237,12 @@ int wsrep_to_buf_helper( Query_log_event ev(thd, thd->wsrep_TOI_pre_query, thd->wsrep_TOI_pre_query_len, FALSE, FALSE, FALSE, 0); - if (ev.write(&tmp_io_cache)) ret= 1; + if (writer.write(&ev)) ret= 1; } /* continue to append the actual query */ Query_log_event ev(thd, query, query_len, FALSE, FALSE, FALSE, 0); - if (!ret && ev.write(&tmp_io_cache)) ret= 1; + if (!ret && writer.write(&ev)) ret= 1; if (!ret && wsrep_write_cache_buf(&tmp_io_cache, buf, buf_len)) ret= 1; close_cached_file(&tmp_io_cache); return ret; @@ -1347,7 +1295,13 @@ create_view_query(THD *thd, uchar** buf, size_t* buf_len) buff.append(command[thd->lex->create_view_mode].str, command[thd->lex->create_view_mode].length); - if (!lex->definer) + LEX_USER *definer; + + if (lex->definer) + { + definer= get_current_user(thd, lex->definer); + } + else { /* DEFINER-clause is missing; we have to create default definer in @@ -1355,16 +1309,19 @@ create_view_query(THD *thd, uchar** buf, size_t* buf_len) If this is an ALTER VIEW then the current user should be set as the definer. */ + definer= create_default_definer(thd, false); + } - if (!(lex->definer= create_default_definer(thd, false))) - { - WSREP_WARN("view default definer issue"); - } + if (definer) + { + views->definer.user = definer->user; + views->definer.host = definer->host; + } else { + WSREP_ERROR("Failed to get DEFINER for VIEW."); + return 1; } views->algorithm = lex->create_view_algorithm; - views->definer.user = lex->definer->user; - views->definer.host = lex->definer->host; views->view_suid = lex->create_view_suid; views->with_check = lex->create_view_check; @@ -1403,6 +1360,12 @@ create_view_query(THD *thd, uchar** buf, size_t* buf_len) return wsrep_to_buf_helper(thd, buff.ptr(), buff.length(), buf, buf_len); } +/* + returns: + 0: statement was replicated as TOI + 1: TOI replication was skipped + -1: TOI replication failed + */ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_, const TABLE_LIST* table_list) { @@ -1410,6 +1373,7 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_, uchar* buf(0); size_t buf_len(0); int buf_err; + int rc= 0; WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd), thd->wsrep_exec_mode, thd->query() ); @@ -1439,32 +1403,39 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_, wsrep_key_arr_t key_arr= {0, 0}; struct wsrep_buf buff = { buf, buf_len }; - if (!buf_err && + if (!buf_err && wsrep_prepare_keys_for_isolation(thd, db_, table_, table_list, &key_arr)&& + key_arr.keys_len > 0 && WSREP_OK == (ret = wsrep->to_execute_start(wsrep, thd->thread_id, - key_arr.keys, key_arr.keys_len, - &buff, 1, - &thd->wsrep_trx_meta))) + key_arr.keys, key_arr.keys_len, + &buff, 1, + &thd->wsrep_trx_meta))) { thd->wsrep_exec_mode= TOTAL_ORDER; wsrep_to_isolation++; - my_free(buf); wsrep_keys_free(&key_arr); WSREP_DEBUG("TO BEGIN: %lld, %d",(long long)wsrep_thd_trx_seqno(thd), - thd->wsrep_exec_mode); + thd->wsrep_exec_mode); } - else { + else if (key_arr.keys_len > 0) { /* jump to error handler in mysql_execute_command() */ WSREP_WARN("TO isolation failed for: %d, sql: %s. Check wsrep " "connection state and retry the query.", ret, (thd->query()) ? thd->query() : "void"); my_error(ER_LOCK_DEADLOCK, MYF(0), "WSREP replication failed. Check " - "your wsrep connection state and retry the query."); - if (buf) my_free(buf); + "your wsrep connection state and retry the query."); wsrep_keys_free(&key_arr); - return -1; + rc= -1; } - return 0; + else { + /* non replicated DDL, affecting temporary tables only */ + WSREP_DEBUG("TO isolation skipped for: %d, sql: %s." + "Only temporary tables affected.", + ret, (thd->query()) ? thd->query() : "void"); + rc= 1; + } + if (buf) my_free(buf); + return rc; } static void wsrep_TOI_end(THD *thd) { @@ -1473,11 +1444,9 @@ static void wsrep_TOI_end(THD *thd) { WSREP_DEBUG("TO END: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd), thd->wsrep_exec_mode, (thd->query()) ? thd->query() : "void"); - - XID xid; - wsrep_xid_init(&xid, &thd->wsrep_trx_meta.gtid.uuid, - thd->wsrep_trx_meta.gtid.seqno); - wsrep_set_SE_checkpoint(&xid); + + wsrep_set_SE_checkpoint(thd->wsrep_trx_meta.gtid.uuid, + thd->wsrep_trx_meta.gtid.seqno); WSREP_DEBUG("TO END: %lld, update seqno", (long long)wsrep_thd_trx_seqno(thd)); @@ -1613,14 +1582,25 @@ int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_, if (thd->variables.wsrep_on && thd->wsrep_exec_mode==LOCAL_STATE) { - switch (wsrep_OSU_method_options) { + switch (thd->variables.wsrep_OSU_method) { case WSREP_OSU_TOI: ret = wsrep_TOI_begin(thd, db_, table_, table_list); break; case WSREP_OSU_RSU: ret = wsrep_RSU_begin(thd, db_, table_); break; + default: + WSREP_ERROR("Unsupported OSU method: %lu", + thd->variables.wsrep_OSU_method); + ret= -1; + break; } - if (!ret) - { - thd->wsrep_exec_mode= TOTAL_ORDER; + switch (ret) { + case 0: thd->wsrep_exec_mode= TOTAL_ORDER; break; + case 1: + /* TOI replication skipped, treat as success */ + ret = 0; + break; + case -1: + /* TOI replication failed, treat as error */ + break; } } return ret; @@ -1630,10 +1610,14 @@ void wsrep_to_isolation_end(THD *thd) { if (thd->wsrep_exec_mode == TOTAL_ORDER) { - switch(wsrep_OSU_method_options) + switch(thd->variables.wsrep_OSU_method) { case WSREP_OSU_TOI: wsrep_TOI_end(thd); break; case WSREP_OSU_RSU: wsrep_RSU_end(thd); break; + default: + WSREP_WARN("Unsupported wsrep OSU method at isolation end: %lu", + thd->variables.wsrep_OSU_method); + break; } wsrep_cleanup_transaction(thd); } @@ -1652,10 +1636,21 @@ void wsrep_to_isolation_end(THD *thd) gra->wsrep_exec_mode, gra->wsrep_query_state, gra->wsrep_conflict_state, \ gra->get_command(), gra->lex->sql_command, gra->query()); +/** + Check if request for the metadata lock should be granted to the requester. + + @param requestor_ctx The MDL context of the requestor + @param ticket MDL ticket for the requested lock + + @retval TRUE Lock request can be granted + @retval FALSE Lock request cannot be granted +*/ + bool wsrep_grant_mdl_exception(MDL_context *requestor_ctx, MDL_ticket *ticket ) { + /* Fallback to the non-wsrep behaviour */ if (!WSREP_ON) return FALSE; THD *request_thd = requestor_ctx->get_thd(); @@ -1748,7 +1743,7 @@ pthread_handler_t start_wsrep_THD(void *arg) thread_created++; threads.append(thd); - my_net_init(&thd->net,(st_vio*) 0, MYF(0)); + my_net_init(&thd->net,(st_vio*) 0, thd, MYF(0)); DBUG_PRINT("wsrep",(("creating thread %lld"), (long long)thd->thread_id)); thd->prior_thr_create_utime= thd->start_utime= microsecond_interval_timer(); @@ -2477,9 +2472,13 @@ int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len) if (lex->definer) { /* SUID trigger. */ + LEX_USER *d= get_current_user(thd, lex->definer); + + if (!d) + return 1; - definer_user= lex->definer->user; - definer_host= lex->definer->host; + definer_user= d->user; + definer_host= d->host; } else { @@ -2530,6 +2529,11 @@ my_bool get_wsrep_drupal_282555_workaround() return wsrep_drupal_282555_workaround; } +my_bool get_wsrep_recovery() +{ + return wsrep_recovery; +} + my_bool get_wsrep_log_conflicts() { return wsrep_log_conflicts; |