summaryrefslogtreecommitdiff
path: root/sql/wsrep_mysqld.cc
diff options
context:
space:
mode:
authorNirbhay Choubey <nirbhay@mariadb.com>2014-01-09 14:54:57 -0500
committerNirbhay Choubey <nirbhay@mariadb.com>2014-01-09 14:54:57 -0500
commit31eaa90a6ef767b4f846bfe454892979200003a1 (patch)
treeac63b8be86cc6d9bfc148407fc5631bd342741c8 /sql/wsrep_mysqld.cc
parent088c069462d4a5cf26e97690e045fc3e737453a3 (diff)
downloadmariadb-git-31eaa90a6ef767b4f846bfe454892979200003a1.tar.gz
Merging revision 3839..3932 from codership-mysql/5.5.
Diffstat (limited to 'sql/wsrep_mysqld.cc')
-rw-r--r--sql/wsrep_mysqld.cc368
1 files changed, 228 insertions, 140 deletions
diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc
index 2181054a34c..666952e6f52 100644
--- a/sql/wsrep_mysqld.cc
+++ b/sql/wsrep_mysqld.cc
@@ -1,4 +1,4 @@
-/* Copyright 2008 Codership Oy <http://www.codership.com>
+/* Copyright 2008-2013 Codership Oy <http://www.codership.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -17,11 +17,17 @@
#include <sql_class.h>
#include <sql_parse.h>
#include "wsrep_priv.h"
+#include "wsrep_thd.h"
+#include "wsrep_sst.h"
+#include "wsrep_utils.h"
+#include "wsrep_var.h"
+#include "wsrep_binlog.h"
+#include "wsrep_applier.h"
#include <cstdio>
#include <cstdlib>
#include "log_event.h"
-extern Format_description_log_event *wsrep_format_desc;
+Format_description_log_event *wsrep_format_desc = NULL;
wsrep_t *wsrep = NULL;
my_bool wsrep_emulate_bin_log = FALSE; // activating parts of binlog interface
@@ -33,24 +39,26 @@ const char* wsrep_data_home_dir = NULL;
const char* wsrep_dbug_option = "";
long wsrep_slave_threads = 1; // # of slave action appliers wanted
+int wsrep_slave_count_change = 0; // # of appliers to stop or start
my_bool wsrep_debug = 0; // enable debug level logging
my_bool wsrep_convert_LOCK_to_trx = 1; // convert locking sessions to trx
ulong wsrep_retry_autocommit = 5; // retry aborted autocommit trx
my_bool wsrep_auto_increment_control = 1; // control auto increment variables
my_bool wsrep_drupal_282555_workaround = 1; // retry autoinc insert after dupkey
my_bool wsrep_incremental_data_collection = 0; // incremental data collection
-long long wsrep_max_ws_size = 1073741824LL; //max ws (RBR buffer) size
-long wsrep_max_ws_rows = 65536; // max number of rows in ws
+ulong wsrep_max_ws_size = 1073741824UL;//max ws (RBR buffer) size
+ulong wsrep_max_ws_rows = 65536; // max number of rows in ws
int wsrep_to_isolation = 0; // # of active TO isolation threads
my_bool wsrep_certify_nonPK = 1; // certify, even when no primary key
long wsrep_max_protocol_version = 2; // maximum protocol version to use
ulong wsrep_forced_binlog_format = BINLOG_FORMAT_UNSPEC;
my_bool wsrep_recovery = 0; // recovery
my_bool wsrep_replicate_myisam = 0; // enable myisam replication
-my_bool wsrep_log_conflicts = 0; //
+my_bool wsrep_log_conflicts = 0;
ulong wsrep_mysql_replication_bundle = 0;
+my_bool wsrep_desync = 0; // desynchronize the node from the
+ // cluster
my_bool wsrep_load_data_splitting = 1; // commit load data every 10K intervals
-my_bool wsrep_desync = 0; // desynchronize the node from the cluster
/*
* End configuration options
@@ -88,12 +96,12 @@ long long wsrep_cluster_conf_id = WSREP_SEQNO_UNDEFINED;
const char* wsrep_cluster_status = cluster_status_str[WSREP_VIEW_DISCONNECTED];
long wsrep_cluster_size = 0;
long wsrep_local_index = -1;
+long long wsrep_local_bf_aborts = 0;
const char* wsrep_provider_name = provider_name;
const char* wsrep_provider_version = provider_version;
const char* wsrep_provider_vendor = provider_vendor;
/* End wsrep status variables */
-
wsrep_uuid_t local_uuid = WSREP_UUID_UNDEFINED;
wsrep_seqno_t local_seqno = WSREP_SEQNO_UNDEFINED;
wsp::node_status local_status;
@@ -104,14 +112,6 @@ long wsrep_protocol_version = 2;
// if there was no state gap on receiving first view event.
static my_bool wsrep_startup = TRUE;
-// action execute callback
-extern wsrep_status_t wsrep_apply_cb(void *ctx,
- const void* buf, size_t buf_len,
- wsrep_seqno_t global_seqno);
-
-extern wsrep_status_t wsrep_commit_cb (void *ctx,
- wsrep_seqno_t global_seqno,
- bool commit);
static void wsrep_log_cb(wsrep_log_level_t level, const char *msg) {
switch (level) {
@@ -195,19 +195,25 @@ void wsrep_get_SE_checkpoint(XID* xid)
plugin_foreach(NULL, get_SE_checkpoint, MYSQL_STORAGE_ENGINE_PLUGIN, xid);
}
-static void wsrep_view_handler_cb (void* app_ctx,
- void* recv_ctx,
- const wsrep_view_info_t* view,
- const char* state,
- size_t state_len,
- void** sst_req,
- ssize_t* sst_req_len)
+static wsrep_cb_status_t
+wsrep_view_handler_cb (void* app_ctx,
+ void* recv_ctx,
+ const wsrep_view_info_t* view,
+ const char* state,
+ size_t state_len,
+ void** sst_req,
+ size_t* sst_req_len)
{
+ *sst_req = NULL;
+ *sst_req_len = 0;
+
wsrep_member_status_t new_status= local_status.get();
- if (memcmp(&cluster_uuid, &view->uuid, sizeof(wsrep_uuid_t)))
+ if (memcmp(&cluster_uuid, &view->state_id.uuid, sizeof(wsrep_uuid_t)))
{
- memcpy((wsrep_uuid_t*)&cluster_uuid, &view->uuid, sizeof(cluster_uuid));
+ memcpy((wsrep_uuid_t*)&cluster_uuid, &view->state_id.uuid,
+ sizeof(cluster_uuid));
+
wsrep_uuid_print (&cluster_uuid, cluster_uuid_str,
sizeof(cluster_uuid_str));
}
@@ -219,7 +225,7 @@ static void wsrep_view_handler_cb (void* app_ctx,
WSREP_INFO("New cluster view: global state: %s:%lld, view# %lld: %s, "
"number of nodes: %ld, my index: %ld, protocol version %d",
- wsrep_cluster_state_uuid, (long long)view->seqno,
+ wsrep_cluster_state_uuid, (long long)view->state_id.seqno,
(long long)wsrep_cluster_conf_id, wsrep_cluster_status,
wsrep_cluster_size, wsrep_local_index, view->proto_ver);
@@ -274,16 +280,18 @@ static void wsrep_view_handler_cb (void* app_ctx,
WSREP_DEBUG("[debug]: closing client connections for PRIM");
wsrep_close_client_connections(TRUE);
- *sst_req_len= wsrep_sst_prepare (sst_req);
+ ssize_t const req_len= wsrep_sst_prepare (sst_req);
- if (*sst_req_len < 0)
+ if (req_len < 0)
{
- int err = *sst_req_len;
- WSREP_ERROR("SST preparation failed: %d (%s)", -err, strerror(-err));
+ WSREP_ERROR("SST preparation failed: %zd (%s)", -req_len,
+ strerror(-req_len));
new_status= WSREP_MEMBER_UNDEFINED;
}
else
{
+ assert(sst_req != NULL);
+ *sst_req_len= req_len;
new_status= WSREP_MEMBER_JOINER;
}
}
@@ -299,14 +307,14 @@ static void wsrep_view_handler_cb (void* app_ctx,
{
wsrep_SE_init_grab();
// Signal mysqld init thread to continue
- wsrep_sst_complete (&cluster_uuid, view->seqno, false);
+ wsrep_sst_complete (&cluster_uuid, view->state_id.seqno, false);
// and wait for SE initialization
wsrep_SE_init_wait();
}
else
{
local_uuid= cluster_uuid;
- local_seqno= view->seqno;
+ local_seqno= view->state_id.seqno;
}
/* Init storage engine XIDs from first view */
XID xid;
@@ -319,7 +327,7 @@ static void wsrep_view_handler_cb (void* app_ctx,
if (memcmp (&local_uuid, &cluster_uuid, sizeof (wsrep_uuid_t)))
{
WSREP_ERROR("Undetected state gap. Can't continue.");
- wsrep_log_states(WSREP_LOG_FATAL, &cluster_uuid, view->seqno,
+ wsrep_log_states(WSREP_LOG_FATAL, &cluster_uuid, view->state_id.seqno,
&local_uuid, -1);
unireg_abort(1);
}
@@ -331,9 +339,23 @@ static void wsrep_view_handler_cb (void* app_ctx,
global_system_variables.auto_increment_increment= view->memb_num;
}
+ { /* capabilities may be updated on new configuration */
+ uint64_t const caps(wsrep->capabilities (wsrep));
+
+ my_bool const idc((caps & WSREP_CAP_INCREMENTAL_WRITESET) != 0);
+ if (TRUE == wsrep_incremental_data_collection && FALSE == idc)
+ {
+ WSREP_WARN("Unsupported protocol downgrade: "
+ "incremental data collection disabled. Expect abort.");
+ }
+ wsrep_incremental_data_collection = idc;
+ }
+
out:
wsrep_startup= FALSE;
local_status.set(new_status, view);
+
+ return WSREP_CB_SUCCESS;
}
void wsrep_ready_set (my_bool x)
@@ -364,14 +386,26 @@ void wsrep_ready_wait ()
static void wsrep_synced_cb(void* app_ctx)
{
WSREP_INFO("Synchronized with group, ready for connections");
+ bool signal_main= false;
if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort();
if (!wsrep_ready)
{
wsrep_ready= TRUE;
mysql_cond_signal (&COND_wsrep_ready);
+ signal_main= true;
+
}
local_status.set(WSREP_MEMBER_SYNCED);
mysql_mutex_unlock (&LOCK_wsrep_ready);
+
+ if (signal_main)
+ {
+ wsrep_SE_init_grab();
+ // Signal mysqld init thread to continue
+ wsrep_sst_complete (&local_uuid, local_seqno, false);
+ // and wait for SE initialization
+ wsrep_SE_init_wait();
+ }
}
static void wsrep_init_position()
@@ -416,6 +450,8 @@ static void wsrep_init_position()
}
}
+extern const char* my_bind_addr_str;
+
int wsrep_init()
{
int rcode= -1;
@@ -470,7 +506,7 @@ int wsrep_init()
size_t const node_addr_max= sizeof(node_addr) - 1;
if (!wsrep_node_address || !strcmp(wsrep_node_address, ""))
{
- size_t const ret= guess_ip(node_addr, node_addr_max);
+ size_t const ret= wsrep_guess_ip(node_addr, node_addr_max);
if (!(ret > 0 && ret < node_addr_max))
{
WSREP_WARN("Failed to guess base node address. Set it explicitly via "
@@ -488,38 +524,57 @@ int wsrep_init()
if ((!wsrep_node_incoming_address ||
!strcmp (wsrep_node_incoming_address, WSREP_NODE_INCOMING_AUTO)))
{
- size_t const node_addr_len= strlen(node_addr);
- if (node_addr_len > 0)
+ unsigned int my_bind_ip= INADDR_ANY; // default if not set
+ if (my_bind_addr_str && strlen(my_bind_addr_str))
+ {
+ my_bind_ip= wsrep_check_ip(my_bind_addr_str);
+ }
+
+ if (INADDR_ANY != my_bind_ip)
{
- const char* const colon= strrchr(node_addr, ':');
- if (strchr(node_addr, ':') == colon) // 1 or 0 ':'
+ if (INADDR_NONE != my_bind_ip && INADDR_LOOPBACK != my_bind_ip)
{
- size_t const ip_len= colon ? colon - node_addr : node_addr_len;
- if (ip_len + 7 /* :55555\0 */ < inc_addr_max)
+ snprintf(inc_addr, inc_addr_max, "%s:%u",
+ my_bind_addr_str, (int)mysqld_port);
+ } // else leave inc_addr an empty string - mysqld is not listening for
+ // client connections on network interfaces.
+ }
+ else // mysqld binds to 0.0.0.0, take IP from wsrep_node_address if possible
+ {
+ size_t const node_addr_len= strlen(node_addr);
+ if (node_addr_len > 0)
+ {
+ const char* const colon= strrchr(node_addr, ':');
+ if (strchr(node_addr, ':') == colon) // 1 or 0 ':'
{
- memcpy (inc_addr, node_addr, ip_len);
- snprintf(inc_addr + ip_len, inc_addr_max - ip_len, ":%u",mysqld_port);
+ size_t const ip_len= colon ? colon - node_addr : node_addr_len;
+ if (ip_len + 7 /* :55555\0 */ < inc_addr_max)
+ {
+ memcpy (inc_addr, node_addr, ip_len);
+ snprintf(inc_addr + ip_len, inc_addr_max - ip_len, ":%u",
+ (int)mysqld_port);
+ }
+ else
+ {
+ WSREP_WARN("Guessing address for incoming client connections: "
+ "address too long.");
+ inc_addr[0]= '\0';
+ }
}
else
{
WSREP_WARN("Guessing address for incoming client connections: "
- "address too long.");
+ "too many colons :) .");
inc_addr[0]= '\0';
}
}
- else
+
+ if (!strlen(inc_addr))
{
- WSREP_WARN("Guessing address for incoming client connections: "
- "too many colons :) .");
- inc_addr[0]= '\0';
+ WSREP_WARN("Guessing address for incoming client connections failed. "
+ "Try setting wsrep_node_incoming_address explicitly.");
}
}
-
- if (!strlen(inc_addr))
- {
- WSREP_WARN("Guessing address for incoming client connections failed. "
- "Try setting wsrep_node_incoming_address explicitly.");
- }
}
else if (!strchr(wsrep_node_incoming_address, ':')) // no port included
{
@@ -546,6 +601,8 @@ int wsrep_init()
struct wsrep_init_args wsrep_args;
+ struct wsrep_gtid const state_id = { local_uuid, local_seqno };
+
wsrep_args.data_dir = wsrep_data_home_dir;
wsrep_args.node_name = (wsrep_node_name) ? wsrep_node_name : "";
wsrep_args.node_address = node_addr;
@@ -554,13 +611,13 @@ int wsrep_init()
wsrep_provider_options : "";
wsrep_args.proto_ver = wsrep_max_protocol_version;
- wsrep_args.state_uuid = &local_uuid;
- wsrep_args.state_seqno = local_seqno;
+ wsrep_args.state_id = &state_id;
wsrep_args.logger_cb = wsrep_log_cb;
wsrep_args.view_handler_cb = wsrep_view_handler_cb;
wsrep_args.apply_cb = wsrep_apply_cb;
wsrep_args.commit_cb = wsrep_commit_cb;
+ wsrep_args.unordered_cb = wsrep_unordered_cb;
wsrep_args.sst_donate_cb = wsrep_sst_donate_cb;
wsrep_args.synced_cb = wsrep_synced_cb;
@@ -661,8 +718,36 @@ void wsrep_stop_replication(THD *thd)
return;
}
-
-extern my_bool wsrep_new_cluster;
+/* This one is set to true when --wsrep-new-cluster is found in the command
+ * line arguments */
+static my_bool wsrep_new_cluster= FALSE;
+#define WSREP_NEW_CLUSTER "--wsrep-new-cluster"
+/* Finds and hides --wsrep-new-cluster from the arguments list
+ * by moving it to the end of the list and decrementing argument count */
+void wsrep_filter_new_cluster (int* argc, char* argv[])
+{
+ int i;
+ for (i= *argc - 1; i > 0; i--)
+ {
+ /* make a copy of the argument to convert possible underscores to hyphens.
+ * the copy need not to be longer than WSREP_NEW_CLUSTER option */
+ char arg[sizeof(WSREP_NEW_CLUSTER) + 2]= { 0, };
+ strncpy(arg, argv[i], sizeof(arg) - 1);
+ char* underscore;
+ while (NULL != (underscore= strchr(arg, '_'))) *underscore= '-';
+
+ if (!strcmp(arg, WSREP_NEW_CLUSTER))
+ {
+ wsrep_new_cluster= TRUE;
+ *argc -= 1;
+ /* preserve the order of remaining arguments AND
+ * preserve the original argument pointers - just in case */
+ char* wnc= argv[i];
+ memmove(&argv[i], &argv[i + 1], (*argc - i)*sizeof(argv[i]));
+ argv[*argc]= wnc; /* this will be invisible to the rest of the program */
+ }
+ }
+}
bool wsrep_start_replication()
{
@@ -686,20 +771,16 @@ bool wsrep_start_replication()
return true;
}
- /* Note 'bootstrap' address is not officially supported in wsrep API #23
- but it can be back ported from #24 provider to get sneak preview of
- bootstrap command
- */
- const char* cluster_address =
- wsrep_new_cluster ? "bootstrap" : wsrep_cluster_address;
+ bool const bootstrap(TRUE == wsrep_new_cluster);
wsrep_new_cluster= FALSE;
WSREP_INFO("Start replication");
if ((rcode = wsrep->connect(wsrep,
wsrep_cluster_name,
- cluster_address,
- wsrep_sst_donor)))
+ wsrep_cluster_address,
+ wsrep_sst_donor,
+ bootstrap)))
{
if (-ESOCKTNOSUPPORT == rcode)
{
@@ -720,11 +801,6 @@ bool wsrep_start_replication()
{
wsrep_connected= TRUE;
- uint64_t caps = wsrep->capabilities (wsrep);
-
- wsrep_incremental_data_collection =
- !!(caps & WSREP_CAP_WRITE_SET_INCREMENTS);
-
char* opts= wsrep->options_get(wsrep);
if (opts)
{
@@ -749,8 +825,8 @@ wsrep_causal_wait (THD* thd)
{
// This allows autocommit SELECTs and a first SELECT after SET AUTOCOMMIT=0
// TODO: modify to check if thd has locked any rows.
- wsrep_seqno_t seqno;
- wsrep_status_t ret= wsrep->causal_read (wsrep, &seqno);
+ wsrep_gtid_t gtid;
+ wsrep_status_t ret= wsrep->causal_read (wsrep, &gtid);
if (unlikely(WSREP_OK != ret))
{
@@ -798,7 +874,7 @@ static void wsrep_keys_free(wsrep_key_arr_t* key_arr)
{
for (size_t i= 0; i < key_arr->keys_len; ++i)
{
- my_free((wsrep_key_part_t*)key_arr->keys[i].key_parts);
+ my_free((void*)key_arr->keys[i].key_parts);
}
my_free(key_arr->keys);
key_arr->keys= 0;
@@ -818,7 +894,7 @@ static void wsrep_keys_free(wsrep_key_arr_t* key_arr)
static bool wsrep_prepare_key_for_isolation(const char* db,
const char* table,
- wsrep_key_part_t* key,
+ wsrep_buf_t* key,
size_t* key_len)
{
if (*key_len < 2) return false;
@@ -837,13 +913,13 @@ static bool wsrep_prepare_key_for_isolation(const char* db,
// sql_print_information("%s.%s", db, table);
if (db)
{
- key[*key_len].buf= db;
- key[*key_len].buf_len= strlen(db);
+ key[*key_len].ptr= db;
+ key[*key_len].len= strlen(db);
++(*key_len);
if (table)
{
- key[*key_len].buf= table;
- key[*key_len].buf_len= strlen(table);
+ key[*key_len].ptr= table;
+ key[*key_len].len= strlen(table);
++(*key_len);
}
}
@@ -879,23 +955,23 @@ static bool wsrep_prepare_keys_for_isolation(THD* thd,
{
if (!(ka->keys= (wsrep_key_t*)my_malloc(sizeof(wsrep_key_t), MYF(0))))
{
- sql_print_error("Can't allocate memory for key_array");
+ WSREP_ERROR("Can't allocate memory for key_array");
goto err;
}
ka->keys_len= 1;
- if (!(ka->keys[0].key_parts= (wsrep_key_part_t*)
- my_malloc(sizeof(wsrep_key_part_t)*2, MYF(0))))
+ if (!(ka->keys[0].key_parts= (wsrep_buf_t*)
+ my_malloc(sizeof(wsrep_buf_t)*2, MYF(0))))
{
- sql_print_error("Can't allocate memory for key_parts");
+ WSREP_ERROR("Can't allocate memory for key_parts");
goto err;
}
- ka->keys[0].key_parts_len= 2;
+ ka->keys[0].key_parts_num= 2;
if (!wsrep_prepare_key_for_isolation(
db, table,
- (wsrep_key_part_t*)ka->keys[0].key_parts,
- &ka->keys[0].key_parts_len))
+ (wsrep_buf_t*)ka->keys[0].key_parts,
+ &ka->keys[0].key_parts_num))
{
- sql_print_error("Preparing keys for isolation failed");
+ WSREP_ERROR("Preparing keys for isolation failed");
goto err;
}
}
@@ -910,24 +986,24 @@ static bool wsrep_prepare_keys_for_isolation(THD* thd,
ka->keys, (ka->keys_len + 1) * sizeof(wsrep_key_t), MYF(0));
if (!tmp)
{
- sql_print_error("Can't allocate memory for key_array");
+ WSREP_ERROR("Can't allocate memory for key_array");
goto err;
}
ka->keys= tmp;
- if (!(ka->keys[ka->keys_len].key_parts= (wsrep_key_part_t*)
- my_malloc(sizeof(wsrep_key_part_t)*2, MYF(0))))
+ if (!(ka->keys[ka->keys_len].key_parts= (wsrep_buf_t*)
+ my_malloc(sizeof(wsrep_buf_t)*2, MYF(0))))
{
- sql_print_error("Can't allocate memory for key_parts");
+ WSREP_ERROR("Can't allocate memory for key_parts");
goto err;
}
- ka->keys[ka->keys_len].key_parts_len= 2;
+ ka->keys[ka->keys_len].key_parts_num= 2;
++ka->keys_len;
if (!wsrep_prepare_key_for_isolation(
table->db, table->table_name,
- (wsrep_key_part_t*)ka->keys[ka->keys_len - 1].key_parts,
- &ka->keys[ka->keys_len - 1].key_parts_len))
+ (wsrep_buf_t*)ka->keys[ka->keys_len - 1].key_parts,
+ &ka->keys[ka->keys_len - 1].key_parts_num))
{
- sql_print_error("Preparing keys for isolation failed");
+ WSREP_ERROR("Preparing keys for isolation failed");
goto err;
}
}
@@ -939,12 +1015,11 @@ err:
}
-
bool wsrep_prepare_key_for_innodb(const uchar* cache_key,
- size_t cache_key_len,
+ size_t cache_key_len,
const uchar* row_id,
size_t row_id_len,
- wsrep_key_part_t* key,
+ wsrep_buf_t* key,
size_t* key_len)
{
if (*key_len < 3) return false;
@@ -954,33 +1029,36 @@ bool wsrep_prepare_key_for_innodb(const uchar* cache_key,
{
case 0:
{
- key[*key_len].buf = cache_key;
- key[*key_len].buf_len = cache_key_len;
- ++(*key_len);
+ key[0].ptr = cache_key;
+ key[0].len = cache_key_len;
+
+ *key_len = 1;
break;
}
case 1:
case 2:
{
- key[*key_len].buf = cache_key;
- key[*key_len].buf_len = strlen( (char*)cache_key );
- ++(*key_len);
- key[*key_len].buf = cache_key + strlen( (char*)cache_key ) + 1;
- key[*key_len].buf_len = strlen( (char*)(key[*key_len].buf) );
- ++(*key_len);
+ key[0].ptr = cache_key;
+ key[0].len = strlen( (char*)cache_key );
+
+ key[1].ptr = cache_key + strlen( (char*)cache_key ) + 1;
+ key[1].len = strlen( (char*)(key[1].ptr) );
+
+ *key_len = 2;
break;
}
default:
return false;
}
- key[*key_len].buf = row_id;
- key[*key_len].buf_len = row_id_len;
+ key[*key_len].ptr = row_id;
+ key[*key_len].len = row_id_len;
++(*key_len);
return true;
}
+
/*
* Construct Query_log_Event from thd query and serialize it
* into buffer.
@@ -988,7 +1066,7 @@ bool wsrep_prepare_key_for_innodb(const uchar* cache_key,
* Return 0 in case of success, 1 in case of error.
*/
int wsrep_to_buf_helper(
- THD* thd, const char *query, uint query_len, uchar** buf, uint* buf_len)
+ THD* thd, const char *query, uint query_len, uchar** buf, size_t* buf_len)
{
IO_CACHE tmp_io_cache;
if (open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX,
@@ -999,9 +1077,9 @@ int wsrep_to_buf_helper(
/* if there is prepare query, add event for it */
if (thd->wsrep_TOI_pre_query)
{
- Query_log_event ev(thd, thd->wsrep_TOI_pre_query,
- thd->wsrep_TOI_pre_query_len,
- FALSE, FALSE, FALSE, 0);
+ Query_log_event ev(thd, thd->wsrep_TOI_pre_query,
+ thd->wsrep_TOI_pre_query_len,
+ FALSE, FALSE, FALSE, 0);
if (ev.write(&tmp_io_cache)) ret= 1;
}
@@ -1009,7 +1087,7 @@ int wsrep_to_buf_helper(
Query_log_event ev(thd, query, query_len, FALSE, FALSE, FALSE, 0);
if (ev.write(&tmp_io_cache)) ret= 1;
- if (!ret && wsrep_write_cache(&tmp_io_cache, buf, buf_len)) ret= 1;
+ if (!ret && wsrep_write_cache_buf(&tmp_io_cache, buf, buf_len)) ret= 1;
close_cached_file(&tmp_io_cache);
return ret;
@@ -1017,7 +1095,7 @@ int wsrep_to_buf_helper(
#include "sql_show.h"
static int
-create_view_query(THD *thd, uchar** buf, uint* buf_len)
+create_view_query(THD *thd, uchar** buf, size_t* buf_len)
{
LEX *lex= thd->lex;
SELECT_LEX *select_lex= &lex->select_lex;
@@ -1036,15 +1114,15 @@ create_view_query(THD *thd, uchar** buf, uint* buf_len)
if (!lex->definer)
{
/*
- DEFINER-clause is missing; we have to create default definer in
- persistent arena to be PS/SP friendly.
- If this is an ALTER VIEW then the current user should be set as
- the definer.
+ DEFINER-clause is missing; we have to create default definer in
+ persistent arena to be PS/SP friendly.
+ If this is an ALTER VIEW then the current user should be set as
+ the definer.
*/
if (!(lex->definer= create_default_definer(thd)))
{
- WSREP_WARN("view default definer issue");
+ WSREP_WARN("view default definer issue");
}
}
@@ -1071,7 +1149,7 @@ create_view_query(THD *thd, uchar** buf, uint* buf_len)
List_iterator_fast<LEX_STRING> names(lex->view_list);
LEX_STRING *name;
int i;
-
+
for (i= 0; (name= names++); i++)
{
buff.append(i ? ", " : "(");
@@ -1082,7 +1160,7 @@ create_view_query(THD *thd, uchar** buf, uint* buf_len)
buff.append(STRING_WITH_LEN(" AS "));
//buff.append(views->source.str, views->source.length);
buff.append(thd->lex->create_view_select.str,
- thd->lex->create_view_select.length);
+ thd->lex->create_view_select.length);
//int errcode= query_error_code(thd, TRUE);
//if (thd->binlog_query(THD::STMT_QUERY_TYPE,
// buff.ptr(), buff.length(), FALSE, FALSE, FALSE, errcod
@@ -1094,11 +1172,11 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_,
{
wsrep_status_t ret(WSREP_WARNING);
uchar* buf(0);
- uint buf_len(0);
+ size_t buf_len(0);
int buf_err;
- WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)thd->wsrep_trx_seqno,
- thd->wsrep_exec_mode, thd->query() );
+ WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
+ thd->wsrep_exec_mode, thd->query() );
switch (thd->lex->sql_command)
{
case SQLCOM_CREATE_VIEW:
@@ -1121,19 +1199,20 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_,
}
wsrep_key_arr_t key_arr= {0, 0};
+ struct wsrep_buf buff = { buf, buf_len };
if (!buf_err &&
wsrep_prepare_keys_for_isolation(thd, db_, table_, table_list, &key_arr)&&
WSREP_OK == (ret = wsrep->to_execute_start(wsrep, thd->thread_id,
key_arr.keys, key_arr.keys_len,
- buf, buf_len,
- &thd->wsrep_trx_seqno)))
+ &buff, 1,
+ &thd->wsrep_trx_meta)))
{
thd->wsrep_exec_mode= TOTAL_ORDER;
wsrep_to_isolation++;
if (buf) my_free(buf);
wsrep_keys_free(&key_arr);
- WSREP_DEBUG("TO BEGIN: %lld, %d",(long long)thd->wsrep_trx_seqno,
- thd->wsrep_exec_mode);
+ WSREP_DEBUG("TO BEGIN: %lld, %d",(long long)wsrep_thd_trx_seqno(thd),
+ thd->wsrep_exec_mode);
}
else {
/* jump to error handler in mysql_execute_command() */
@@ -1152,10 +1231,11 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_,
static void wsrep_TOI_end(THD *thd) {
wsrep_status_t ret;
wsrep_to_isolation--;
- WSREP_DEBUG("TO END: %lld, %d : %s", (long long)thd->wsrep_trx_seqno,
+
+ WSREP_DEBUG("TO END: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
thd->wsrep_exec_mode, (thd->query()) ? thd->query() : "void");
if (WSREP_OK == (ret = wsrep->to_execute_end(wsrep, thd->thread_id))) {
- WSREP_DEBUG("TO END: %lld", (long long)thd->wsrep_trx_seqno);
+ WSREP_DEBUG("TO END: %lld", (long long)wsrep_thd_trx_seqno(thd));
}
else {
WSREP_WARN("TO isolation end failed for: %d, sql: %s",
@@ -1166,7 +1246,7 @@ static void wsrep_TOI_end(THD *thd) {
static int wsrep_RSU_begin(THD *thd, char *db_, char *table_)
{
wsrep_status_t ret(WSREP_WARNING);
- WSREP_DEBUG("RSU BEGIN: %lld, %d : %s", (long long)thd->wsrep_trx_seqno,
+ WSREP_DEBUG("RSU BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
thd->wsrep_exec_mode, thd->query() );
ret = wsrep->desync(wsrep);
@@ -1211,7 +1291,7 @@ static int wsrep_RSU_begin(THD *thd, char *db_, char *table_)
static void wsrep_RSU_end(THD *thd)
{
wsrep_status_t ret(WSREP_WARNING);
- WSREP_DEBUG("RSU END: %lld, %d : %s", (long long)thd->wsrep_trx_seqno,
+ WSREP_DEBUG("RSU END: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
thd->wsrep_exec_mode, thd->query() );
@@ -1255,13 +1335,27 @@ int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_,
mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
DBUG_ASSERT(thd->wsrep_exec_mode == LOCAL_STATE);
- DBUG_ASSERT(thd->wsrep_trx_seqno == WSREP_SEQNO_UNDEFINED);
+ DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno == WSREP_SEQNO_UNDEFINED);
if (wsrep_debug && thd->mdl_context.has_locks())
{
WSREP_DEBUG("thread holds MDL locks at TI begin: %s %lu",
thd->query(), thd->thread_id);
}
+
+ /*
+ It makes sense to set auto_increment_* to defaults in TOI operations.
+ Must be done before wsrep_TOI_begin() since Query_log_event encapsulating
+ TOI statement and auto inc variables for wsrep replication is constructed
+ there. Variables are reset back in THD::reset_for_next_command() before
+ processing of next command.
+ */
+ if (wsrep_auto_increment_control)
+ {
+ thd->variables.auto_increment_offset = 1;
+ thd->variables.auto_increment_increment = 1;
+ }
+
if (thd->variables.wsrep_on && thd->wsrep_exec_mode==LOCAL_STATE)
{
switch (wsrep_OSU_method_options) {
@@ -1272,12 +1366,6 @@ int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_,
if (!ret)
{
thd->wsrep_exec_mode= TOTAL_ORDER;
- /* It makes sense to set auto_increment_* to defaults in TOI operations */
- if (wsrep_auto_increment_control)
- {
- thd->variables.auto_increment_offset = 1;
- thd->variables.auto_increment_increment = 1;
- }
}
}
return ret;
@@ -1302,10 +1390,10 @@ void wsrep_to_isolation_end(THD *thd)
"request: (%lu \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)\n" \
"granted: (%lu \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)", \
msg, \
- req->thread_id, (long long)req->wsrep_trx_seqno, \
+ req->thread_id, (long long)wsrep_thd_trx_seqno(req), \
req->wsrep_exec_mode, req->wsrep_query_state, req->wsrep_conflict_state, \
req->command, req->lex->sql_command, req->query(), \
- gra->thread_id, (long long)gra->wsrep_trx_seqno, \
+ gra->thread_id, (long long)wsrep_thd_trx_seqno(gra), \
gra->wsrep_exec_mode, gra->wsrep_query_state, gra->wsrep_conflict_state, \
gra->command, gra->lex->sql_command, gra->query());