diff options
author | Nirbhay Choubey <nirbhay@mariadb.com> | 2014-01-09 14:54:57 -0500 |
---|---|---|
committer | Nirbhay Choubey <nirbhay@mariadb.com> | 2014-01-09 14:54:57 -0500 |
commit | 31eaa90a6ef767b4f846bfe454892979200003a1 (patch) | |
tree | ac63b8be86cc6d9bfc148407fc5631bd342741c8 /sql/wsrep_mysqld.cc | |
parent | 088c069462d4a5cf26e97690e045fc3e737453a3 (diff) | |
download | mariadb-git-31eaa90a6ef767b4f846bfe454892979200003a1.tar.gz |
Merging revision 3839..3932 from codership-mysql/5.5.
Diffstat (limited to 'sql/wsrep_mysqld.cc')
-rw-r--r-- | sql/wsrep_mysqld.cc | 368 |
1 files changed, 228 insertions, 140 deletions
diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index 2181054a34c..666952e6f52 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -1,4 +1,4 @@ -/* Copyright 2008 Codership Oy <http://www.codership.com> +/* Copyright 2008-2013 Codership Oy <http://www.codership.com> This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -17,11 +17,17 @@ #include <sql_class.h> #include <sql_parse.h> #include "wsrep_priv.h" +#include "wsrep_thd.h" +#include "wsrep_sst.h" +#include "wsrep_utils.h" +#include "wsrep_var.h" +#include "wsrep_binlog.h" +#include "wsrep_applier.h" #include <cstdio> #include <cstdlib> #include "log_event.h" -extern Format_description_log_event *wsrep_format_desc; +Format_description_log_event *wsrep_format_desc = NULL; wsrep_t *wsrep = NULL; my_bool wsrep_emulate_bin_log = FALSE; // activating parts of binlog interface @@ -33,24 +39,26 @@ const char* wsrep_data_home_dir = NULL; const char* wsrep_dbug_option = ""; long wsrep_slave_threads = 1; // # of slave action appliers wanted +int wsrep_slave_count_change = 0; // # of appliers to stop or start my_bool wsrep_debug = 0; // enable debug level logging my_bool wsrep_convert_LOCK_to_trx = 1; // convert locking sessions to trx ulong wsrep_retry_autocommit = 5; // retry aborted autocommit trx my_bool wsrep_auto_increment_control = 1; // control auto increment variables my_bool wsrep_drupal_282555_workaround = 1; // retry autoinc insert after dupkey my_bool wsrep_incremental_data_collection = 0; // incremental data collection -long long wsrep_max_ws_size = 1073741824LL; //max ws (RBR buffer) size -long wsrep_max_ws_rows = 65536; // max number of rows in ws +ulong wsrep_max_ws_size = 1073741824UL;//max ws (RBR buffer) size +ulong wsrep_max_ws_rows = 65536; // max number of rows in ws int wsrep_to_isolation = 0; // # of active TO isolation threads my_bool wsrep_certify_nonPK = 1; // certify, even when no primary key long wsrep_max_protocol_version = 2; // maximum protocol version to use ulong wsrep_forced_binlog_format = BINLOG_FORMAT_UNSPEC; my_bool wsrep_recovery = 0; // recovery my_bool wsrep_replicate_myisam = 0; // enable myisam replication -my_bool wsrep_log_conflicts = 0; // +my_bool wsrep_log_conflicts = 0; ulong wsrep_mysql_replication_bundle = 0; +my_bool wsrep_desync = 0; // desynchronize the node from the + // cluster my_bool wsrep_load_data_splitting = 1; // commit load data every 10K intervals -my_bool wsrep_desync = 0; // desynchronize the node from the cluster /* * End configuration options @@ -88,12 +96,12 @@ long long wsrep_cluster_conf_id = WSREP_SEQNO_UNDEFINED; const char* wsrep_cluster_status = cluster_status_str[WSREP_VIEW_DISCONNECTED]; long wsrep_cluster_size = 0; long wsrep_local_index = -1; +long long wsrep_local_bf_aborts = 0; const char* wsrep_provider_name = provider_name; const char* wsrep_provider_version = provider_version; const char* wsrep_provider_vendor = provider_vendor; /* End wsrep status variables */ - wsrep_uuid_t local_uuid = WSREP_UUID_UNDEFINED; wsrep_seqno_t local_seqno = WSREP_SEQNO_UNDEFINED; wsp::node_status local_status; @@ -104,14 +112,6 @@ long wsrep_protocol_version = 2; // if there was no state gap on receiving first view event. static my_bool wsrep_startup = TRUE; -// action execute callback -extern wsrep_status_t wsrep_apply_cb(void *ctx, - const void* buf, size_t buf_len, - wsrep_seqno_t global_seqno); - -extern wsrep_status_t wsrep_commit_cb (void *ctx, - wsrep_seqno_t global_seqno, - bool commit); static void wsrep_log_cb(wsrep_log_level_t level, const char *msg) { switch (level) { @@ -195,19 +195,25 @@ void wsrep_get_SE_checkpoint(XID* xid) plugin_foreach(NULL, get_SE_checkpoint, MYSQL_STORAGE_ENGINE_PLUGIN, xid); } -static void wsrep_view_handler_cb (void* app_ctx, - void* recv_ctx, - const wsrep_view_info_t* view, - const char* state, - size_t state_len, - void** sst_req, - ssize_t* sst_req_len) +static wsrep_cb_status_t +wsrep_view_handler_cb (void* app_ctx, + void* recv_ctx, + const wsrep_view_info_t* view, + const char* state, + size_t state_len, + void** sst_req, + size_t* sst_req_len) { + *sst_req = NULL; + *sst_req_len = 0; + wsrep_member_status_t new_status= local_status.get(); - if (memcmp(&cluster_uuid, &view->uuid, sizeof(wsrep_uuid_t))) + if (memcmp(&cluster_uuid, &view->state_id.uuid, sizeof(wsrep_uuid_t))) { - memcpy((wsrep_uuid_t*)&cluster_uuid, &view->uuid, sizeof(cluster_uuid)); + memcpy((wsrep_uuid_t*)&cluster_uuid, &view->state_id.uuid, + sizeof(cluster_uuid)); + wsrep_uuid_print (&cluster_uuid, cluster_uuid_str, sizeof(cluster_uuid_str)); } @@ -219,7 +225,7 @@ static void wsrep_view_handler_cb (void* app_ctx, WSREP_INFO("New cluster view: global state: %s:%lld, view# %lld: %s, " "number of nodes: %ld, my index: %ld, protocol version %d", - wsrep_cluster_state_uuid, (long long)view->seqno, + wsrep_cluster_state_uuid, (long long)view->state_id.seqno, (long long)wsrep_cluster_conf_id, wsrep_cluster_status, wsrep_cluster_size, wsrep_local_index, view->proto_ver); @@ -274,16 +280,18 @@ static void wsrep_view_handler_cb (void* app_ctx, WSREP_DEBUG("[debug]: closing client connections for PRIM"); wsrep_close_client_connections(TRUE); - *sst_req_len= wsrep_sst_prepare (sst_req); + ssize_t const req_len= wsrep_sst_prepare (sst_req); - if (*sst_req_len < 0) + if (req_len < 0) { - int err = *sst_req_len; - WSREP_ERROR("SST preparation failed: %d (%s)", -err, strerror(-err)); + WSREP_ERROR("SST preparation failed: %zd (%s)", -req_len, + strerror(-req_len)); new_status= WSREP_MEMBER_UNDEFINED; } else { + assert(sst_req != NULL); + *sst_req_len= req_len; new_status= WSREP_MEMBER_JOINER; } } @@ -299,14 +307,14 @@ static void wsrep_view_handler_cb (void* app_ctx, { wsrep_SE_init_grab(); // Signal mysqld init thread to continue - wsrep_sst_complete (&cluster_uuid, view->seqno, false); + wsrep_sst_complete (&cluster_uuid, view->state_id.seqno, false); // and wait for SE initialization wsrep_SE_init_wait(); } else { local_uuid= cluster_uuid; - local_seqno= view->seqno; + local_seqno= view->state_id.seqno; } /* Init storage engine XIDs from first view */ XID xid; @@ -319,7 +327,7 @@ static void wsrep_view_handler_cb (void* app_ctx, if (memcmp (&local_uuid, &cluster_uuid, sizeof (wsrep_uuid_t))) { WSREP_ERROR("Undetected state gap. Can't continue."); - wsrep_log_states(WSREP_LOG_FATAL, &cluster_uuid, view->seqno, + wsrep_log_states(WSREP_LOG_FATAL, &cluster_uuid, view->state_id.seqno, &local_uuid, -1); unireg_abort(1); } @@ -331,9 +339,23 @@ static void wsrep_view_handler_cb (void* app_ctx, global_system_variables.auto_increment_increment= view->memb_num; } + { /* capabilities may be updated on new configuration */ + uint64_t const caps(wsrep->capabilities (wsrep)); + + my_bool const idc((caps & WSREP_CAP_INCREMENTAL_WRITESET) != 0); + if (TRUE == wsrep_incremental_data_collection && FALSE == idc) + { + WSREP_WARN("Unsupported protocol downgrade: " + "incremental data collection disabled. Expect abort."); + } + wsrep_incremental_data_collection = idc; + } + out: wsrep_startup= FALSE; local_status.set(new_status, view); + + return WSREP_CB_SUCCESS; } void wsrep_ready_set (my_bool x) @@ -364,14 +386,26 @@ void wsrep_ready_wait () static void wsrep_synced_cb(void* app_ctx) { WSREP_INFO("Synchronized with group, ready for connections"); + bool signal_main= false; if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort(); if (!wsrep_ready) { wsrep_ready= TRUE; mysql_cond_signal (&COND_wsrep_ready); + signal_main= true; + } local_status.set(WSREP_MEMBER_SYNCED); mysql_mutex_unlock (&LOCK_wsrep_ready); + + if (signal_main) + { + wsrep_SE_init_grab(); + // Signal mysqld init thread to continue + wsrep_sst_complete (&local_uuid, local_seqno, false); + // and wait for SE initialization + wsrep_SE_init_wait(); + } } static void wsrep_init_position() @@ -416,6 +450,8 @@ static void wsrep_init_position() } } +extern const char* my_bind_addr_str; + int wsrep_init() { int rcode= -1; @@ -470,7 +506,7 @@ int wsrep_init() size_t const node_addr_max= sizeof(node_addr) - 1; if (!wsrep_node_address || !strcmp(wsrep_node_address, "")) { - size_t const ret= guess_ip(node_addr, node_addr_max); + size_t const ret= wsrep_guess_ip(node_addr, node_addr_max); if (!(ret > 0 && ret < node_addr_max)) { WSREP_WARN("Failed to guess base node address. Set it explicitly via " @@ -488,38 +524,57 @@ int wsrep_init() if ((!wsrep_node_incoming_address || !strcmp (wsrep_node_incoming_address, WSREP_NODE_INCOMING_AUTO))) { - size_t const node_addr_len= strlen(node_addr); - if (node_addr_len > 0) + unsigned int my_bind_ip= INADDR_ANY; // default if not set + if (my_bind_addr_str && strlen(my_bind_addr_str)) + { + my_bind_ip= wsrep_check_ip(my_bind_addr_str); + } + + if (INADDR_ANY != my_bind_ip) { - const char* const colon= strrchr(node_addr, ':'); - if (strchr(node_addr, ':') == colon) // 1 or 0 ':' + if (INADDR_NONE != my_bind_ip && INADDR_LOOPBACK != my_bind_ip) { - size_t const ip_len= colon ? colon - node_addr : node_addr_len; - if (ip_len + 7 /* :55555\0 */ < inc_addr_max) + snprintf(inc_addr, inc_addr_max, "%s:%u", + my_bind_addr_str, (int)mysqld_port); + } // else leave inc_addr an empty string - mysqld is not listening for + // client connections on network interfaces. + } + else // mysqld binds to 0.0.0.0, take IP from wsrep_node_address if possible + { + size_t const node_addr_len= strlen(node_addr); + if (node_addr_len > 0) + { + const char* const colon= strrchr(node_addr, ':'); + if (strchr(node_addr, ':') == colon) // 1 or 0 ':' { - memcpy (inc_addr, node_addr, ip_len); - snprintf(inc_addr + ip_len, inc_addr_max - ip_len, ":%u",mysqld_port); + size_t const ip_len= colon ? colon - node_addr : node_addr_len; + if (ip_len + 7 /* :55555\0 */ < inc_addr_max) + { + memcpy (inc_addr, node_addr, ip_len); + snprintf(inc_addr + ip_len, inc_addr_max - ip_len, ":%u", + (int)mysqld_port); + } + else + { + WSREP_WARN("Guessing address for incoming client connections: " + "address too long."); + inc_addr[0]= '\0'; + } } else { WSREP_WARN("Guessing address for incoming client connections: " - "address too long."); + "too many colons :) ."); inc_addr[0]= '\0'; } } - else + + if (!strlen(inc_addr)) { - WSREP_WARN("Guessing address for incoming client connections: " - "too many colons :) ."); - inc_addr[0]= '\0'; + WSREP_WARN("Guessing address for incoming client connections failed. " + "Try setting wsrep_node_incoming_address explicitly."); } } - - if (!strlen(inc_addr)) - { - WSREP_WARN("Guessing address for incoming client connections failed. " - "Try setting wsrep_node_incoming_address explicitly."); - } } else if (!strchr(wsrep_node_incoming_address, ':')) // no port included { @@ -546,6 +601,8 @@ int wsrep_init() struct wsrep_init_args wsrep_args; + struct wsrep_gtid const state_id = { local_uuid, local_seqno }; + wsrep_args.data_dir = wsrep_data_home_dir; wsrep_args.node_name = (wsrep_node_name) ? wsrep_node_name : ""; wsrep_args.node_address = node_addr; @@ -554,13 +611,13 @@ int wsrep_init() wsrep_provider_options : ""; wsrep_args.proto_ver = wsrep_max_protocol_version; - wsrep_args.state_uuid = &local_uuid; - wsrep_args.state_seqno = local_seqno; + wsrep_args.state_id = &state_id; wsrep_args.logger_cb = wsrep_log_cb; wsrep_args.view_handler_cb = wsrep_view_handler_cb; wsrep_args.apply_cb = wsrep_apply_cb; wsrep_args.commit_cb = wsrep_commit_cb; + wsrep_args.unordered_cb = wsrep_unordered_cb; wsrep_args.sst_donate_cb = wsrep_sst_donate_cb; wsrep_args.synced_cb = wsrep_synced_cb; @@ -661,8 +718,36 @@ void wsrep_stop_replication(THD *thd) return; } - -extern my_bool wsrep_new_cluster; +/* This one is set to true when --wsrep-new-cluster is found in the command + * line arguments */ +static my_bool wsrep_new_cluster= FALSE; +#define WSREP_NEW_CLUSTER "--wsrep-new-cluster" +/* Finds and hides --wsrep-new-cluster from the arguments list + * by moving it to the end of the list and decrementing argument count */ +void wsrep_filter_new_cluster (int* argc, char* argv[]) +{ + int i; + for (i= *argc - 1; i > 0; i--) + { + /* make a copy of the argument to convert possible underscores to hyphens. + * the copy need not to be longer than WSREP_NEW_CLUSTER option */ + char arg[sizeof(WSREP_NEW_CLUSTER) + 2]= { 0, }; + strncpy(arg, argv[i], sizeof(arg) - 1); + char* underscore; + while (NULL != (underscore= strchr(arg, '_'))) *underscore= '-'; + + if (!strcmp(arg, WSREP_NEW_CLUSTER)) + { + wsrep_new_cluster= TRUE; + *argc -= 1; + /* preserve the order of remaining arguments AND + * preserve the original argument pointers - just in case */ + char* wnc= argv[i]; + memmove(&argv[i], &argv[i + 1], (*argc - i)*sizeof(argv[i])); + argv[*argc]= wnc; /* this will be invisible to the rest of the program */ + } + } +} bool wsrep_start_replication() { @@ -686,20 +771,16 @@ bool wsrep_start_replication() return true; } - /* Note 'bootstrap' address is not officially supported in wsrep API #23 - but it can be back ported from #24 provider to get sneak preview of - bootstrap command - */ - const char* cluster_address = - wsrep_new_cluster ? "bootstrap" : wsrep_cluster_address; + bool const bootstrap(TRUE == wsrep_new_cluster); wsrep_new_cluster= FALSE; WSREP_INFO("Start replication"); if ((rcode = wsrep->connect(wsrep, wsrep_cluster_name, - cluster_address, - wsrep_sst_donor))) + wsrep_cluster_address, + wsrep_sst_donor, + bootstrap))) { if (-ESOCKTNOSUPPORT == rcode) { @@ -720,11 +801,6 @@ bool wsrep_start_replication() { wsrep_connected= TRUE; - uint64_t caps = wsrep->capabilities (wsrep); - - wsrep_incremental_data_collection = - !!(caps & WSREP_CAP_WRITE_SET_INCREMENTS); - char* opts= wsrep->options_get(wsrep); if (opts) { @@ -749,8 +825,8 @@ wsrep_causal_wait (THD* thd) { // This allows autocommit SELECTs and a first SELECT after SET AUTOCOMMIT=0 // TODO: modify to check if thd has locked any rows. - wsrep_seqno_t seqno; - wsrep_status_t ret= wsrep->causal_read (wsrep, &seqno); + wsrep_gtid_t gtid; + wsrep_status_t ret= wsrep->causal_read (wsrep, >id); if (unlikely(WSREP_OK != ret)) { @@ -798,7 +874,7 @@ static void wsrep_keys_free(wsrep_key_arr_t* key_arr) { for (size_t i= 0; i < key_arr->keys_len; ++i) { - my_free((wsrep_key_part_t*)key_arr->keys[i].key_parts); + my_free((void*)key_arr->keys[i].key_parts); } my_free(key_arr->keys); key_arr->keys= 0; @@ -818,7 +894,7 @@ static void wsrep_keys_free(wsrep_key_arr_t* key_arr) static bool wsrep_prepare_key_for_isolation(const char* db, const char* table, - wsrep_key_part_t* key, + wsrep_buf_t* key, size_t* key_len) { if (*key_len < 2) return false; @@ -837,13 +913,13 @@ static bool wsrep_prepare_key_for_isolation(const char* db, // sql_print_information("%s.%s", db, table); if (db) { - key[*key_len].buf= db; - key[*key_len].buf_len= strlen(db); + key[*key_len].ptr= db; + key[*key_len].len= strlen(db); ++(*key_len); if (table) { - key[*key_len].buf= table; - key[*key_len].buf_len= strlen(table); + key[*key_len].ptr= table; + key[*key_len].len= strlen(table); ++(*key_len); } } @@ -879,23 +955,23 @@ static bool wsrep_prepare_keys_for_isolation(THD* thd, { if (!(ka->keys= (wsrep_key_t*)my_malloc(sizeof(wsrep_key_t), MYF(0)))) { - sql_print_error("Can't allocate memory for key_array"); + WSREP_ERROR("Can't allocate memory for key_array"); goto err; } ka->keys_len= 1; - if (!(ka->keys[0].key_parts= (wsrep_key_part_t*) - my_malloc(sizeof(wsrep_key_part_t)*2, MYF(0)))) + if (!(ka->keys[0].key_parts= (wsrep_buf_t*) + my_malloc(sizeof(wsrep_buf_t)*2, MYF(0)))) { - sql_print_error("Can't allocate memory for key_parts"); + WSREP_ERROR("Can't allocate memory for key_parts"); goto err; } - ka->keys[0].key_parts_len= 2; + ka->keys[0].key_parts_num= 2; if (!wsrep_prepare_key_for_isolation( db, table, - (wsrep_key_part_t*)ka->keys[0].key_parts, - &ka->keys[0].key_parts_len)) + (wsrep_buf_t*)ka->keys[0].key_parts, + &ka->keys[0].key_parts_num)) { - sql_print_error("Preparing keys for isolation failed"); + WSREP_ERROR("Preparing keys for isolation failed"); goto err; } } @@ -910,24 +986,24 @@ static bool wsrep_prepare_keys_for_isolation(THD* thd, ka->keys, (ka->keys_len + 1) * sizeof(wsrep_key_t), MYF(0)); if (!tmp) { - sql_print_error("Can't allocate memory for key_array"); + WSREP_ERROR("Can't allocate memory for key_array"); goto err; } ka->keys= tmp; - if (!(ka->keys[ka->keys_len].key_parts= (wsrep_key_part_t*) - my_malloc(sizeof(wsrep_key_part_t)*2, MYF(0)))) + if (!(ka->keys[ka->keys_len].key_parts= (wsrep_buf_t*) + my_malloc(sizeof(wsrep_buf_t)*2, MYF(0)))) { - sql_print_error("Can't allocate memory for key_parts"); + WSREP_ERROR("Can't allocate memory for key_parts"); goto err; } - ka->keys[ka->keys_len].key_parts_len= 2; + ka->keys[ka->keys_len].key_parts_num= 2; ++ka->keys_len; if (!wsrep_prepare_key_for_isolation( table->db, table->table_name, - (wsrep_key_part_t*)ka->keys[ka->keys_len - 1].key_parts, - &ka->keys[ka->keys_len - 1].key_parts_len)) + (wsrep_buf_t*)ka->keys[ka->keys_len - 1].key_parts, + &ka->keys[ka->keys_len - 1].key_parts_num)) { - sql_print_error("Preparing keys for isolation failed"); + WSREP_ERROR("Preparing keys for isolation failed"); goto err; } } @@ -939,12 +1015,11 @@ err: } - bool wsrep_prepare_key_for_innodb(const uchar* cache_key, - size_t cache_key_len, + size_t cache_key_len, const uchar* row_id, size_t row_id_len, - wsrep_key_part_t* key, + wsrep_buf_t* key, size_t* key_len) { if (*key_len < 3) return false; @@ -954,33 +1029,36 @@ bool wsrep_prepare_key_for_innodb(const uchar* cache_key, { case 0: { - key[*key_len].buf = cache_key; - key[*key_len].buf_len = cache_key_len; - ++(*key_len); + key[0].ptr = cache_key; + key[0].len = cache_key_len; + + *key_len = 1; break; } case 1: case 2: { - key[*key_len].buf = cache_key; - key[*key_len].buf_len = strlen( (char*)cache_key ); - ++(*key_len); - key[*key_len].buf = cache_key + strlen( (char*)cache_key ) + 1; - key[*key_len].buf_len = strlen( (char*)(key[*key_len].buf) ); - ++(*key_len); + key[0].ptr = cache_key; + key[0].len = strlen( (char*)cache_key ); + + key[1].ptr = cache_key + strlen( (char*)cache_key ) + 1; + key[1].len = strlen( (char*)(key[1].ptr) ); + + *key_len = 2; break; } default: return false; } - key[*key_len].buf = row_id; - key[*key_len].buf_len = row_id_len; + key[*key_len].ptr = row_id; + key[*key_len].len = row_id_len; ++(*key_len); return true; } + /* * Construct Query_log_Event from thd query and serialize it * into buffer. @@ -988,7 +1066,7 @@ bool wsrep_prepare_key_for_innodb(const uchar* cache_key, * Return 0 in case of success, 1 in case of error. */ int wsrep_to_buf_helper( - THD* thd, const char *query, uint query_len, uchar** buf, uint* buf_len) + THD* thd, const char *query, uint query_len, uchar** buf, size_t* buf_len) { IO_CACHE tmp_io_cache; if (open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX, @@ -999,9 +1077,9 @@ int wsrep_to_buf_helper( /* if there is prepare query, add event for it */ if (thd->wsrep_TOI_pre_query) { - Query_log_event ev(thd, thd->wsrep_TOI_pre_query, - thd->wsrep_TOI_pre_query_len, - FALSE, FALSE, FALSE, 0); + Query_log_event ev(thd, thd->wsrep_TOI_pre_query, + thd->wsrep_TOI_pre_query_len, + FALSE, FALSE, FALSE, 0); if (ev.write(&tmp_io_cache)) ret= 1; } @@ -1009,7 +1087,7 @@ int wsrep_to_buf_helper( Query_log_event ev(thd, query, query_len, FALSE, FALSE, FALSE, 0); if (ev.write(&tmp_io_cache)) ret= 1; - if (!ret && wsrep_write_cache(&tmp_io_cache, buf, buf_len)) ret= 1; + if (!ret && wsrep_write_cache_buf(&tmp_io_cache, buf, buf_len)) ret= 1; close_cached_file(&tmp_io_cache); return ret; @@ -1017,7 +1095,7 @@ int wsrep_to_buf_helper( #include "sql_show.h" static int -create_view_query(THD *thd, uchar** buf, uint* buf_len) +create_view_query(THD *thd, uchar** buf, size_t* buf_len) { LEX *lex= thd->lex; SELECT_LEX *select_lex= &lex->select_lex; @@ -1036,15 +1114,15 @@ create_view_query(THD *thd, uchar** buf, uint* buf_len) if (!lex->definer) { /* - DEFINER-clause is missing; we have to create default definer in - persistent arena to be PS/SP friendly. - If this is an ALTER VIEW then the current user should be set as - the definer. + DEFINER-clause is missing; we have to create default definer in + persistent arena to be PS/SP friendly. + If this is an ALTER VIEW then the current user should be set as + the definer. */ if (!(lex->definer= create_default_definer(thd))) { - WSREP_WARN("view default definer issue"); + WSREP_WARN("view default definer issue"); } } @@ -1071,7 +1149,7 @@ create_view_query(THD *thd, uchar** buf, uint* buf_len) List_iterator_fast<LEX_STRING> names(lex->view_list); LEX_STRING *name; int i; - + for (i= 0; (name= names++); i++) { buff.append(i ? ", " : "("); @@ -1082,7 +1160,7 @@ create_view_query(THD *thd, uchar** buf, uint* buf_len) buff.append(STRING_WITH_LEN(" AS ")); //buff.append(views->source.str, views->source.length); buff.append(thd->lex->create_view_select.str, - thd->lex->create_view_select.length); + thd->lex->create_view_select.length); //int errcode= query_error_code(thd, TRUE); //if (thd->binlog_query(THD::STMT_QUERY_TYPE, // buff.ptr(), buff.length(), FALSE, FALSE, FALSE, errcod @@ -1094,11 +1172,11 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_, { wsrep_status_t ret(WSREP_WARNING); uchar* buf(0); - uint buf_len(0); + size_t buf_len(0); int buf_err; - WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)thd->wsrep_trx_seqno, - thd->wsrep_exec_mode, thd->query() ); + WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd), + thd->wsrep_exec_mode, thd->query() ); switch (thd->lex->sql_command) { case SQLCOM_CREATE_VIEW: @@ -1121,19 +1199,20 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_, } wsrep_key_arr_t key_arr= {0, 0}; + struct wsrep_buf buff = { buf, buf_len }; if (!buf_err && wsrep_prepare_keys_for_isolation(thd, db_, table_, table_list, &key_arr)&& WSREP_OK == (ret = wsrep->to_execute_start(wsrep, thd->thread_id, key_arr.keys, key_arr.keys_len, - buf, buf_len, - &thd->wsrep_trx_seqno))) + &buff, 1, + &thd->wsrep_trx_meta))) { thd->wsrep_exec_mode= TOTAL_ORDER; wsrep_to_isolation++; if (buf) my_free(buf); wsrep_keys_free(&key_arr); - WSREP_DEBUG("TO BEGIN: %lld, %d",(long long)thd->wsrep_trx_seqno, - thd->wsrep_exec_mode); + WSREP_DEBUG("TO BEGIN: %lld, %d",(long long)wsrep_thd_trx_seqno(thd), + thd->wsrep_exec_mode); } else { /* jump to error handler in mysql_execute_command() */ @@ -1152,10 +1231,11 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_, static void wsrep_TOI_end(THD *thd) { wsrep_status_t ret; wsrep_to_isolation--; - WSREP_DEBUG("TO END: %lld, %d : %s", (long long)thd->wsrep_trx_seqno, + + WSREP_DEBUG("TO END: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd), thd->wsrep_exec_mode, (thd->query()) ? thd->query() : "void"); if (WSREP_OK == (ret = wsrep->to_execute_end(wsrep, thd->thread_id))) { - WSREP_DEBUG("TO END: %lld", (long long)thd->wsrep_trx_seqno); + WSREP_DEBUG("TO END: %lld", (long long)wsrep_thd_trx_seqno(thd)); } else { WSREP_WARN("TO isolation end failed for: %d, sql: %s", @@ -1166,7 +1246,7 @@ static void wsrep_TOI_end(THD *thd) { static int wsrep_RSU_begin(THD *thd, char *db_, char *table_) { wsrep_status_t ret(WSREP_WARNING); - WSREP_DEBUG("RSU BEGIN: %lld, %d : %s", (long long)thd->wsrep_trx_seqno, + WSREP_DEBUG("RSU BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd), thd->wsrep_exec_mode, thd->query() ); ret = wsrep->desync(wsrep); @@ -1211,7 +1291,7 @@ static int wsrep_RSU_begin(THD *thd, char *db_, char *table_) static void wsrep_RSU_end(THD *thd) { wsrep_status_t ret(WSREP_WARNING); - WSREP_DEBUG("RSU END: %lld, %d : %s", (long long)thd->wsrep_trx_seqno, + WSREP_DEBUG("RSU END: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd), thd->wsrep_exec_mode, thd->query() ); @@ -1255,13 +1335,27 @@ int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_, mysql_mutex_unlock(&thd->LOCK_wsrep_thd); DBUG_ASSERT(thd->wsrep_exec_mode == LOCAL_STATE); - DBUG_ASSERT(thd->wsrep_trx_seqno == WSREP_SEQNO_UNDEFINED); + DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno == WSREP_SEQNO_UNDEFINED); if (wsrep_debug && thd->mdl_context.has_locks()) { WSREP_DEBUG("thread holds MDL locks at TI begin: %s %lu", thd->query(), thd->thread_id); } + + /* + It makes sense to set auto_increment_* to defaults in TOI operations. + Must be done before wsrep_TOI_begin() since Query_log_event encapsulating + TOI statement and auto inc variables for wsrep replication is constructed + there. Variables are reset back in THD::reset_for_next_command() before + processing of next command. + */ + if (wsrep_auto_increment_control) + { + thd->variables.auto_increment_offset = 1; + thd->variables.auto_increment_increment = 1; + } + if (thd->variables.wsrep_on && thd->wsrep_exec_mode==LOCAL_STATE) { switch (wsrep_OSU_method_options) { @@ -1272,12 +1366,6 @@ int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_, if (!ret) { thd->wsrep_exec_mode= TOTAL_ORDER; - /* It makes sense to set auto_increment_* to defaults in TOI operations */ - if (wsrep_auto_increment_control) - { - thd->variables.auto_increment_offset = 1; - thd->variables.auto_increment_increment = 1; - } } } return ret; @@ -1302,10 +1390,10 @@ void wsrep_to_isolation_end(THD *thd) "request: (%lu \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)\n" \ "granted: (%lu \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)", \ msg, \ - req->thread_id, (long long)req->wsrep_trx_seqno, \ + req->thread_id, (long long)wsrep_thd_trx_seqno(req), \ req->wsrep_exec_mode, req->wsrep_query_state, req->wsrep_conflict_state, \ req->command, req->lex->sql_command, req->query(), \ - gra->thread_id, (long long)gra->wsrep_trx_seqno, \ + gra->thread_id, (long long)wsrep_thd_trx_seqno(gra), \ gra->wsrep_exec_mode, gra->wsrep_query_state, gra->wsrep_conflict_state, \ gra->command, gra->lex->sql_command, gra->query()); |