summaryrefslogtreecommitdiff
path: root/storage/xtradb
diff options
context:
space:
mode:
authorJan Lindström <jan.lindstrom@skysql.com>2014-08-06 15:39:15 +0300
committerJan Lindström <jan.lindstrom@skysql.com>2014-08-26 15:43:46 +0300
commitdf4dd593f29aec8e2116aec1775ad4b8833d8c93 (patch)
treebecae67f02054e15ead58e929e91c044f0b7aa15 /storage/xtradb
parente974b564389af8251c2ba51060e6129e45431586 (diff)
downloadmariadb-git-df4dd593f29aec8e2116aec1775ad4b8833d8c93.tar.gz
MDEV-6247: Merge 10.0-galera to 10.1.
Merged lp:maria/maria-10.0-galera up to revision 3879. Added a new functions to handler API to forcefully abort_transaction, producing fake_trx_id, get_checkpoint and set_checkpoint for XA. These were added for future possiblity to add more storage engines that could use galera replication.
Diffstat (limited to 'storage/xtradb')
-rw-r--r--storage/xtradb/CMakeLists.txt25
-rw-r--r--storage/xtradb/dict/dict0dict.cc24
-rw-r--r--storage/xtradb/fil/fil0fil.cc32
-rw-r--r--storage/xtradb/fil/fil0pagecompress.cc2
-rw-r--r--storage/xtradb/handler/ha_innodb.cc1481
-rw-r--r--storage/xtradb/handler/ha_innodb.h41
-rw-r--r--storage/xtradb/handler/handler0alter.cc4
-rw-r--r--storage/xtradb/include/dict0mem.h3
-rw-r--r--storage/xtradb/include/ha_prototypes.h17
-rw-r--r--storage/xtradb/include/hash0hash.h27
-rw-r--r--storage/xtradb/include/rem0rec.h9
-rw-r--r--storage/xtradb/include/srv0srv.h13
-rw-r--r--storage/xtradb/include/sync0sync.ic3
-rw-r--r--storage/xtradb/include/trx0sys.h33
-rw-r--r--storage/xtradb/include/trx0sys.ic3
-rw-r--r--storage/xtradb/include/trx0trx.h3
-rw-r--r--storage/xtradb/lock/lock0lock.cc490
-rw-r--r--storage/xtradb/lock/lock0wait.cc35
-rw-r--r--storage/xtradb/os/os0file.cc24
-rw-r--r--storage/xtradb/rem/rem0rec.cc139
-rw-r--r--storage/xtradb/row/row0ins.cc31
-rw-r--r--storage/xtradb/row/row0upd.cc288
-rw-r--r--storage/xtradb/srv/srv0conc.cc93
-rw-r--r--storage/xtradb/srv/srv0srv.cc33
-rw-r--r--storage/xtradb/trx/trx0roll.cc16
-rw-r--r--storage/xtradb/trx/trx0sys.cc134
-rw-r--r--storage/xtradb/trx/trx0trx.cc43
-rw-r--r--storage/xtradb/wsrep/md5.cc74
-rw-r--r--storage/xtradb/wsrep/wsrep_md5.h26
29 files changed, 3084 insertions, 62 deletions
diff --git a/storage/xtradb/CMakeLists.txt b/storage/xtradb/CMakeLists.txt
index 528c6f87fcc..6d9bd1a500a 100644
--- a/storage/xtradb/CMakeLists.txt
+++ b/storage/xtradb/CMakeLists.txt
@@ -268,6 +268,27 @@ ENDIF()
INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/storage/xtradb/include
${CMAKE_SOURCE_DIR}/storage/xtradb/handler)
+IF(WITH_WSREP)
+ # ssl include directory
+ INCLUDE_DIRECTORIES(${SSL_INCLUDE_DIRS}
+ ${CMAKE_SOURCE_DIR}/storage/xtradb/wsrep)
+
+ IF(SSL_DEFINES)
+ ADD_DEFINITIONS(${SSL_DEFINES})
+ ENDIF()
+
+ LINK_LIBRARIES(${SSL_LIBRARIES})
+
+ # We do RESTRICT_SYMBOL_EXPORTS(yassl) elsewhere.
+ # In order to get correct symbol visibility, these files
+ # must be compiled with "-fvisibility=hidden"
+ IF(WITH_SSL STREQUAL "bundled" AND HAVE_VISIBILITY_HIDDEN)
+ SET_SOURCE_FILES_PROPERTIES(
+ {CMAKE_CURRENT_SOURCE_DIR}/wsrep/md5.cc
+ PROPERTIES COMPILE_FLAGS "-fvisibility=hidden")
+ ENDIF()
+ENDIF()
+
# Sun Studio bug with -xO2
IF(CMAKE_CXX_COMPILER_ID MATCHES "SunPro"
AND CMAKE_CXX_FLAGS_RELEASE MATCHES "O2"
@@ -407,6 +428,10 @@ SET(INNOBASE_SOURCES
ut/ut0vec.cc
ut/ut0wqueue.cc)
+IF(WITH_WSREP)
+ SET(INNOBASE_SOURCES ${INNOBASE_SOURCES} wsrep/md5.cc)
+ENDIF()
+
IF(NOT XTRADB_OK)
MESSAGE(FATAL_ERROR "Percona XtraDB is not supported on this platform")
ENDIF()
diff --git a/storage/xtradb/dict/dict0dict.cc b/storage/xtradb/dict/dict0dict.cc
index af3518337d4..c340590b4bb 100644
--- a/storage/xtradb/dict/dict0dict.cc
+++ b/storage/xtradb/dict/dict0dict.cc
@@ -3241,7 +3241,29 @@ dict_foreign_find_index(
return(NULL);
}
-
+#ifdef WITH_WSREP
+dict_index_t*
+wsrep_dict_foreign_find_index(
+/*====================*/
+ dict_table_t* table, /*!< in: table */
+ const char** col_names, /*!< in: column names, or NULL
+ to use table->col_names */
+ const char** columns,/*!< in: array of column names */
+ ulint n_cols, /*!< in: number of columns */
+ dict_index_t* types_idx, /*!< in: NULL or an index to whose types the
+ column types must match */
+ ibool check_charsets,
+ /*!< in: whether to check charsets.
+ only has an effect if types_idx != NULL */
+ ulint check_null)
+ /*!< in: nonzero if none of the columns must
+ be declared NOT NULL */
+{
+ return dict_foreign_find_index(
+ table, col_names, columns, n_cols, types_idx, check_charsets,
+ check_null);
+}
+#endif /* WITH_WSREP */
/**********************************************************************//**
Report an error in a foreign key definition. */
static
diff --git a/storage/xtradb/fil/fil0fil.cc b/storage/xtradb/fil/fil0fil.cc
index d40cfa57bd5..343cc0f47c4 100644
--- a/storage/xtradb/fil/fil0fil.cc
+++ b/storage/xtradb/fil/fil0fil.cc
@@ -6833,36 +6833,36 @@ fil_get_page_type_name(
{
switch(page_type) {
case FIL_PAGE_PAGE_COMPRESSED:
- return "PAGE_COMPRESSED";
+ return (char *)"PAGE_COMPRESSED";
case FIL_PAGE_INDEX:
- return "INDEX";
+ return (char *)"INDEX";
case FIL_PAGE_UNDO_LOG:
- return "UNDO LOG";
+ return (char *)"UNDO LOG";
case FIL_PAGE_INODE:
- return "INODE";
+ return (char *)"INODE";
case FIL_PAGE_IBUF_FREE_LIST:
- return "IBUF_FREE_LIST";
+ return (char *)"IBUF_FREE_LIST";
case FIL_PAGE_TYPE_ALLOCATED:
- return "ALLOCATED";
+ return (char *)"ALLOCATED";
case FIL_PAGE_IBUF_BITMAP:
- return "IBUF_BITMAP";
+ return (char *)"IBUF_BITMAP";
case FIL_PAGE_TYPE_SYS:
- return "SYS";
+ return (char *)"SYS";
case FIL_PAGE_TYPE_TRX_SYS:
- return "TRX_SYS";
+ return (char *)"TRX_SYS";
case FIL_PAGE_TYPE_FSP_HDR:
- return "FSP_HDR";
+ return (char *)"FSP_HDR";
case FIL_PAGE_TYPE_XDES:
- return "XDES";
+ return (char *)"XDES";
case FIL_PAGE_TYPE_BLOB:
- return "BLOB";
+ return (char *)"BLOB";
case FIL_PAGE_TYPE_ZBLOB:
- return "ZBLOB";
+ return (char *)"ZBLOB";
case FIL_PAGE_TYPE_ZBLOB2:
- return "ZBLOB2";
+ return (char *)"ZBLOB2";
case FIL_PAGE_TYPE_COMPRESSED:
- return "ORACLE PAGE COMPRESSED";
+ return (char *)"ORACLE PAGE COMPRESSED";
default:
- return "PAGE TYPE CORRUPTED";
+ return (char *)"PAGE TYPE CORRUPTED";
}
}
diff --git a/storage/xtradb/fil/fil0pagecompress.cc b/storage/xtradb/fil/fil0pagecompress.cc
index 854b094ea81..ec19a4ef4b7 100644
--- a/storage/xtradb/fil/fil0pagecompress.cc
+++ b/storage/xtradb/fil/fil0pagecompress.cc
@@ -487,7 +487,9 @@ fil_decompress_page(
ulint actual_size = 0;
ulint compression_alg = 0;
byte *in_buf;
+#ifdef HAVE_LZO
ulint olen=0;
+#endif
ulint ptype;
ut_ad(buf);
diff --git a/storage/xtradb/handler/ha_innodb.cc b/storage/xtradb/handler/ha_innodb.cc
index fb3e097491d..4c4bccd15e1 100644
--- a/storage/xtradb/handler/ha_innodb.cc
+++ b/storage/xtradb/handler/ha_innodb.cc
@@ -120,6 +120,44 @@ this program; if not, write to the Free Software Foundation, Inc.,
# define MYSQL_PLUGIN_IMPORT /* nothing */
# endif /* MYSQL_PLUGIN_IMPORT */
+#ifdef WITH_WSREP
+#include "dict0priv.h"
+#include "../storage/innobase/include/ut0byte.h"
+#include <wsrep_mysqld.h>
+#include <wsrep_md5.h>
+
+extern my_bool wsrep_certify_nonPK;
+class binlog_trx_data;
+extern handlerton *binlog_hton;
+
+extern MYSQL_PLUGIN_IMPORT mysql_mutex_t LOCK_wsrep_rollback;
+extern MYSQL_PLUGIN_IMPORT mysql_cond_t COND_wsrep_rollback;
+extern MYSQL_PLUGIN_IMPORT wsrep_aborting_thd_t wsrep_aborting_thd;
+
+static inline wsrep_ws_handle_t*
+wsrep_ws_handle(THD* thd, const trx_t* trx) {
+ return wsrep_ws_handle_for_trx(wsrep_thd_ws_handle(thd),
+ (wsrep_trx_id_t)trx->id);
+}
+
+extern bool wsrep_prepare_key_for_innodb(const uchar *cache_key,
+ size_t cache_key_len,
+ const uchar* row_id,
+ size_t row_id_len,
+ wsrep_buf_t* key,
+ size_t* key_len);
+
+extern handlerton * wsrep_hton;
+extern TC_LOG* tc_log;
+extern void wsrep_cleanup_transaction(THD *thd);
+static int
+wsrep_abort_transaction(handlerton* hton, THD *bf_thd, THD *victim_thd,
+ my_bool signal);
+static void
+wsrep_fake_trx_id(handlerton* hton, THD *thd);
+static int innobase_wsrep_set_checkpoint(handlerton* hton, const XID* xid);
+static int innobase_wsrep_get_checkpoint(handlerton* hton, XID* xid);
+#endif /* WITH_WSREP */
/** to protect innobase_open_files */
static mysql_mutex_t innobase_share_mutex;
/** to force correct commit order in binlog */
@@ -1483,6 +1521,10 @@ innobase_srv_conc_enter_innodb(
/*===========================*/
trx_t* trx) /*!< in: transaction handle */
{
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) &&
+ wsrep_thd_is_BF(trx->mysql_thd, FALSE)) return;
+#endif /* WITH_WSREP */
if (srv_thread_concurrency) {
if (trx->n_tickets_to_enter_innodb > 0) {
@@ -1517,6 +1559,10 @@ innobase_srv_conc_exit_innodb(
#ifdef UNIV_SYNC_DEBUG
ut_ad(!sync_thread_levels_nonempty_trx(trx->has_search_latch));
#endif /* UNIV_SYNC_DEBUG */
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) &&
+ wsrep_thd_is_BF(trx->mysql_thd, FALSE)) return;
+#endif /* WITH_WSREP */
/* This is to avoid making an unnecessary function call. */
if (trx->declared_to_be_inside_innodb
@@ -1649,6 +1695,16 @@ thd_to_trx(
return(*(trx_t**) thd_ha_data(thd, innodb_hton_ptr));
}
+#ifdef WITH_WSREP
+ulonglong
+thd_to_trx_id(
+/*=======*/
+ THD* thd) /*!< in: MySQL thread */
+{
+ return(thd_to_trx(thd)->id);
+}
+#endif /* WITH_WSREP */
+
my_bool
ha_innobase::is_fake_change_enabled(THD* thd)
{
@@ -2149,6 +2205,9 @@ int
innobase_mysql_tmpfile(void)
/*========================*/
{
+#ifdef WITH_INNODB_DISALLOW_WRITES
+ os_event_wait(srv_allow_writes_event);
+#endif /* WITH_INNODB_DISALLOW_WRITES */
int fd2 = -1;
File fd;
@@ -3313,6 +3372,12 @@ innobase_init(
innobase_hton->tablefile_extensions = ha_innobase_exts;
innobase_hton->table_options = innodb_table_option_list;
+#ifdef WITH_WSREP
+ innobase_hton->abort_transaction=wsrep_abort_transaction;
+ innobase_hton->set_checkpoint=innobase_wsrep_set_checkpoint;
+ innobase_hton->get_checkpoint=innobase_wsrep_get_checkpoint;
+ innobase_hton->fake_trx_id=wsrep_fake_trx_id;
+#endif /* WITH_WSREP */
ut_a(DATA_MYSQL_TRUE_VARCHAR == (ulint)MYSQL_TYPE_VARCHAR);
@@ -4050,10 +4115,30 @@ innobase_commit_low(
/*================*/
trx_t* trx) /*!< in: transaction handle */
{
+#ifdef WITH_WSREP
+ THD* thd = (THD*)trx->mysql_thd;
+ const char* tmp = 0;
+ if (wsrep_on((void*)thd)) {
+#ifdef WSREP_PROC_INFO
+ char info[64];
+ info[sizeof(info) - 1] = '\0';
+ snprintf(info, sizeof(info) - 1,
+ "innobase_commit_low():trx_commit_for_mysql(%lld)",
+ (long long) wsrep_thd_trx_seqno(thd));
+ tmp = thd_proc_info(thd, info);
+
+#else
+ tmp = thd_proc_info(thd, "innobase_commit_low()");
+#endif /* WSREP_PROC_INFO */
+ }
+#endif /* WITH_WSREP */
if (trx_is_started(trx)) {
trx_commit_for_mysql(trx);
}
+#ifdef WITH_WSREP
+ if (wsrep_on((void*)thd)) { thd_proc_info(thd, tmp); }
+#endif /* WITH_WSREP */
}
/*****************************************************************//**
@@ -4828,22 +4913,32 @@ innobase_kill_connection(
DBUG_ENTER("innobase_kill_connection");
DBUG_ASSERT(hton == innodb_hton_ptr);
- lock_mutex_enter();
-
+#ifdef WITH_WSREP
+ wsrep_thd_LOCK(thd);
+ if (wsrep_thd_conflict_state(thd) != NO_CONFLICT) {
+ /* if victim has been signaled by BF thread and/or aborting
+ is already progressing, following query aborting is not necessary
+ any more.
+ Also, BF thread should own trx mutex for the victim, which would
+ conflict with trx_mutex_enter() below
+ */
+ wsrep_thd_UNLOCK(thd);
+ DBUG_VOID_RETURN;
+ }
+ wsrep_thd_UNLOCK(thd);
+#endif /* WITH_WSREP */
trx = thd_to_trx(thd);
if (trx)
{
- trx_mutex_enter(trx);
-
- /* Cancel a pending lock request. */
- if (trx->lock.wait_lock)
- lock_cancel_waiting_and_release(trx->lock.wait_lock);
-
- trx_mutex_exit(trx);
- }
-
- lock_mutex_exit();
+ /* Cancel a pending lock request. */
+ lock_mutex_enter();
+ trx_mutex_enter(trx);
+ if (trx->lock.wait_lock)
+ lock_cancel_waiting_and_release(trx->lock.wait_lock);
+ trx_mutex_exit(trx);
+ lock_mutex_exit();
+ }
DBUG_VOID_RETURN;
}
@@ -4956,7 +5051,11 @@ ha_innobase::max_supported_key_length() const
case 8192:
return(1536);
default:
+#ifdef WITH_WSREP
+ return(3500);
+#else
return(3500);
+#endif
}
}
@@ -6072,6 +6171,117 @@ get_field_offset(
return((uint) (field->ptr - table->record[0]));
}
+#ifdef WITH_WSREP
+UNIV_INTERN
+int
+wsrep_innobase_mysql_sort(
+/*===============*/
+ /* out: str contains sort string */
+ int mysql_type, /* in: MySQL type */
+ uint charset_number, /* in: number of the charset */
+ unsigned char* str, /* in: data field */
+ unsigned int str_length, /* in: data field length,
+ not UNIV_SQL_NULL */
+ unsigned int buf_length) /* in: total str buffer length */
+
+{
+ CHARSET_INFO* charset;
+ enum_field_types mysql_tp;
+ int ret_length = str_length;
+
+ DBUG_ASSERT(str_length != UNIV_SQL_NULL);
+
+ mysql_tp = (enum_field_types) mysql_type;
+
+ switch (mysql_tp) {
+
+ case MYSQL_TYPE_BIT:
+ case MYSQL_TYPE_STRING:
+ case MYSQL_TYPE_VAR_STRING:
+ case MYSQL_TYPE_TINY_BLOB:
+ case MYSQL_TYPE_MEDIUM_BLOB:
+ case MYSQL_TYPE_BLOB:
+ case MYSQL_TYPE_LONG_BLOB:
+ case MYSQL_TYPE_VARCHAR:
+ {
+ uchar tmp_str[REC_VERSION_56_MAX_INDEX_COL_LEN] = {'\0'};
+ uint tmp_length = REC_VERSION_56_MAX_INDEX_COL_LEN;
+
+ /* Use the charset number to pick the right charset struct for
+ the comparison. Since the MySQL function get_charset may be
+ slow before Bar removes the mutex operation there, we first
+ look at 2 common charsets directly. */
+
+ if (charset_number == default_charset_info->number) {
+ charset = default_charset_info;
+ } else if (charset_number == my_charset_latin1.number) {
+ charset = &my_charset_latin1;
+ } else {
+ charset = get_charset(charset_number, MYF(MY_WME));
+
+ if (charset == NULL) {
+ sql_print_error("InnoDB needs charset %lu for doing "
+ "a comparison, but MySQL cannot "
+ "find that charset.",
+ (ulong) charset_number);
+ ut_a(0);
+ }
+ }
+
+ ut_a(str_length <= tmp_length);
+ memcpy(tmp_str, str, str_length);
+
+ tmp_length = charset->coll->strnxfrm(charset, str, str_length,
+ str_length, tmp_str,
+ tmp_length, 0);
+ DBUG_ASSERT(tmp_length <= str_length);
+ if (wsrep_protocol_version < 3) {
+ tmp_length = charset->coll->strnxfrm(
+ charset, str, str_length,
+ str_length, tmp_str, tmp_length, 0);
+ DBUG_ASSERT(tmp_length <= str_length);
+ } else {
+ /* strnxfrm will expand the destination string,
+ protocols < 3 truncated the sorted sring
+ protocols >= 3 gets full sorted sring
+ */
+ tmp_length = charset->coll->strnxfrm(
+ charset, str, buf_length,
+ str_length, tmp_str, str_length, 0);
+ DBUG_ASSERT(tmp_length <= buf_length);
+ ret_length = tmp_length;
+ }
+
+ break;
+ }
+ case MYSQL_TYPE_DECIMAL :
+ case MYSQL_TYPE_TINY :
+ case MYSQL_TYPE_SHORT :
+ case MYSQL_TYPE_LONG :
+ case MYSQL_TYPE_FLOAT :
+ case MYSQL_TYPE_DOUBLE :
+ case MYSQL_TYPE_NULL :
+ case MYSQL_TYPE_TIMESTAMP :
+ case MYSQL_TYPE_LONGLONG :
+ case MYSQL_TYPE_INT24 :
+ case MYSQL_TYPE_DATE :
+ case MYSQL_TYPE_TIME :
+ case MYSQL_TYPE_DATETIME :
+ case MYSQL_TYPE_YEAR :
+ case MYSQL_TYPE_NEWDATE :
+ case MYSQL_TYPE_NEWDECIMAL :
+ case MYSQL_TYPE_ENUM :
+ case MYSQL_TYPE_SET :
+ case MYSQL_TYPE_GEOMETRY :
+ break;
+ default:
+ break;
+ }
+
+ return ret_length;
+}
+#endif /* WITH_WSREP */
+
/*************************************************************//**
InnoDB uses this function to compare two data fields for which the data type
is such that we must use MySQL code to compare them. NOTE that the prototype
@@ -6575,6 +6785,312 @@ innobase_read_from_2_little_endian(
/*******************************************************************//**
Stores a key value for a row to a buffer.
@return key value length as stored in buff */
+#ifdef WITH_WSREP
+UNIV_INTERN
+uint
+wsrep_store_key_val_for_row(
+/*===============================*/
+ THD* thd,
+ TABLE* table,
+ uint keynr, /*!< in: key number */
+ char* buff, /*!< in/out: buffer for the key value (in MySQL
+ format) */
+ uint buff_len,/*!< in: buffer length */
+ const uchar* record,
+ ibool* key_is_null)/*!< out: full key was null */
+{
+ KEY* key_info = table->key_info + keynr;
+ KEY_PART_INFO* key_part = key_info->key_part;
+ KEY_PART_INFO* end = key_part + key_info->user_defined_key_parts;
+ char* buff_start = buff;
+ enum_field_types mysql_type;
+ Field* field;
+ uint buff_space = buff_len;
+
+ DBUG_ENTER("wsrep_store_key_val_for_row");
+
+ memset(buff, 0, buff_len);
+ *key_is_null = TRUE;
+
+ for (; key_part != end; key_part++) {
+
+ uchar sorted[REC_VERSION_56_MAX_INDEX_COL_LEN] = {'\0'};
+ ibool part_is_null = FALSE;
+
+ if (key_part->null_bit) {
+ if (buff_space > 0) {
+ if (record[key_part->null_offset]
+ & key_part->null_bit) {
+ *buff = 1;
+ part_is_null = TRUE;
+ } else {
+ *buff = 0;
+ }
+ buff++;
+ buff_space--;
+ } else {
+ fprintf (stderr, "WSREP: key truncated: %s\n",
+ wsrep_thd_query(thd));
+ }
+ }
+ if (!part_is_null) *key_is_null = FALSE;
+
+ field = key_part->field;
+ mysql_type = field->type();
+
+ if (mysql_type == MYSQL_TYPE_VARCHAR) {
+ /* >= 5.0.3 true VARCHAR */
+ ulint lenlen;
+ ulint len;
+ const byte* data;
+ ulint key_len;
+ ulint true_len;
+ const CHARSET_INFO* cs;
+ int error=0;
+
+ key_len = key_part->length;
+
+ if (part_is_null) {
+ true_len = key_len + 2;
+ if (true_len > buff_space) {
+ fprintf (stderr,
+ "WSREP: key truncated: %s\n",
+ wsrep_thd_query(thd));
+ true_len = buff_space;
+ }
+ buff += true_len;
+ buff_space -= true_len;
+ continue;
+ }
+ cs = field->charset();
+
+ lenlen = (ulint)
+ (((Field_varstring*)field)->length_bytes);
+
+ data = row_mysql_read_true_varchar(&len,
+ (byte*) (record
+ + (ulint)get_field_offset(table, field)),
+ lenlen);
+
+ true_len = len;
+
+ /* For multi byte character sets we need to calculate
+ the true length of the key */
+
+ if (len > 0 && cs->mbmaxlen > 1) {
+ true_len = (ulint) cs->cset->well_formed_len(cs,
+ (const char *) data,
+ (const char *) data + len,
+ (uint) (key_len /
+ cs->mbmaxlen),
+ &error);
+ }
+
+ /* In a column prefix index, we may need to truncate
+ the stored value: */
+
+ if (true_len > key_len) {
+ true_len = key_len;
+ }
+
+ memcpy(sorted, data, true_len);
+ true_len = wsrep_innobase_mysql_sort(
+ mysql_type, cs->number, sorted, true_len,
+ REC_VERSION_56_MAX_INDEX_COL_LEN);
+
+ if (wsrep_protocol_version > 1) {
+ /* Note that we always reserve the maximum possible
+ length of the true VARCHAR in the key value, though
+ only len first bytes after the 2 length bytes contain
+ actual data. The rest of the space was reset to zero
+ in the bzero() call above. */
+ if (true_len > buff_space) {
+ fprintf (stderr,
+ "WSREP: key truncated: %s\n",
+ wsrep_thd_query(thd));
+ true_len = buff_space;
+ }
+ memcpy(buff, sorted, true_len);
+ buff += true_len;
+ buff_space -= true_len;
+ } else {
+ buff += key_len;
+ }
+ } else if (mysql_type == MYSQL_TYPE_TINY_BLOB
+ || mysql_type == MYSQL_TYPE_MEDIUM_BLOB
+ || mysql_type == MYSQL_TYPE_BLOB
+ || mysql_type == MYSQL_TYPE_LONG_BLOB
+ /* MYSQL_TYPE_GEOMETRY data is treated
+ as BLOB data in innodb. */
+ || mysql_type == MYSQL_TYPE_GEOMETRY) {
+
+ const CHARSET_INFO* cs;
+ ulint key_len;
+ ulint true_len;
+ int error=0;
+ ulint blob_len;
+ const byte* blob_data;
+
+ ut_a(key_part->key_part_flag & HA_PART_KEY_SEG);
+
+ key_len = key_part->length;
+
+ if (part_is_null) {
+ true_len = key_len + 2;
+ if (true_len > buff_space) {
+ fprintf (stderr,
+ "WSREP: key truncated: %s\n",
+ wsrep_thd_query(thd));
+ true_len = buff_space;
+ }
+ buff += true_len;
+ buff_space -= true_len;
+
+ continue;
+ }
+
+ cs = field->charset();
+
+ blob_data = row_mysql_read_blob_ref(&blob_len,
+ (byte*) (record
+ + (ulint)get_field_offset(table, field)),
+ (ulint) field->pack_length());
+
+ true_len = blob_len;
+
+ ut_a(get_field_offset(table, field)
+ == key_part->offset);
+
+ /* For multi byte character sets we need to calculate
+ the true length of the key */
+
+ if (blob_len > 0 && cs->mbmaxlen > 1) {
+ true_len = (ulint) cs->cset->well_formed_len(cs,
+ (const char *) blob_data,
+ (const char *) blob_data
+ + blob_len,
+ (uint) (key_len /
+ cs->mbmaxlen),
+ &error);
+ }
+
+ /* All indexes on BLOB and TEXT are column prefix
+ indexes, and we may need to truncate the data to be
+ stored in the key value: */
+
+ if (true_len > key_len) {
+ true_len = key_len;
+ }
+
+ memcpy(sorted, blob_data, true_len);
+ true_len = wsrep_innobase_mysql_sort(
+ mysql_type, cs->number, sorted, true_len,
+ REC_VERSION_56_MAX_INDEX_COL_LEN);
+
+
+ /* Note that we always reserve the maximum possible
+ length of the BLOB prefix in the key value. */
+ if (wsrep_protocol_version > 1) {
+ if (true_len > buff_space) {
+ fprintf (stderr,
+ "WSREP: key truncated: %s\n",
+ wsrep_thd_query(thd));
+ true_len = buff_space;
+ }
+ buff += true_len;
+ buff_space -= true_len;
+ } else {
+ buff += key_len;
+ }
+ memcpy(buff, sorted, true_len);
+ } else {
+ /* Here we handle all other data types except the
+ true VARCHAR, BLOB and TEXT. Note that the column
+ value we store may be also in a column prefix
+ index. */
+
+ const CHARSET_INFO* cs = NULL;
+ ulint true_len;
+ ulint key_len;
+ const uchar* src_start;
+ int error=0;
+ enum_field_types real_type;
+
+ key_len = key_part->length;
+
+ if (part_is_null) {
+ true_len = key_len;
+ if (true_len > buff_space) {
+ fprintf (stderr,
+ "WSREP: key truncated: %s\n",
+ wsrep_thd_query(thd));
+ true_len = buff_space;
+ }
+ buff += true_len;
+ buff_space -= true_len;
+
+ continue;
+ }
+
+ src_start = record + key_part->offset;
+ real_type = field->real_type();
+ true_len = key_len;
+
+ /* Character set for the field is defined only
+ to fields whose type is string and real field
+ type is not enum or set. For these fields check
+ if character set is multi byte. */
+
+ if (real_type != MYSQL_TYPE_ENUM
+ && real_type != MYSQL_TYPE_SET
+ && ( mysql_type == MYSQL_TYPE_VAR_STRING
+ || mysql_type == MYSQL_TYPE_STRING)) {
+
+ cs = field->charset();
+
+ /* For multi byte character sets we need to
+ calculate the true length of the key */
+
+ if (key_len > 0 && cs->mbmaxlen > 1) {
+
+ true_len = (ulint)
+ cs->cset->well_formed_len(cs,
+ (const char *)src_start,
+ (const char *)src_start
+ + key_len,
+ (uint) (key_len /
+ cs->mbmaxlen),
+ &error);
+ }
+ memcpy(sorted, src_start, true_len);
+ true_len = wsrep_innobase_mysql_sort(
+ mysql_type, cs->number, sorted, true_len,
+ REC_VERSION_56_MAX_INDEX_COL_LEN);
+
+ if (true_len > buff_space) {
+ fprintf (stderr,
+ "WSREP: key truncated: %s\n",
+ wsrep_thd_query(thd));
+ true_len = buff_space;
+ }
+ memcpy(buff, sorted, true_len);
+ } else {
+ memcpy(buff, src_start, true_len);
+ }
+ buff += true_len;
+ buff_space -= true_len;
+ }
+ }
+
+ ut_a(buff <= buff_start + buff_len);
+
+ DBUG_RETURN((uint)(buff - buff_start));
+}
+#endif /* WITH_WSREP */
+
+/*******************************************************************//**
+Stores a key value for a row to a buffer.
+@return key value length as stored in buff */
UNIV_INTERN
uint
ha_innobase::store_key_val_for_row(
@@ -7419,6 +7935,9 @@ ha_innobase::write_row(
ibool auto_inc_used= FALSE;
ulint sql_command;
trx_t* trx = thd_to_trx(user_thd);
+#ifdef WITH_WSREP
+ ibool auto_inc_inserted= FALSE; /* if NULL was inserted */
+#endif
DBUG_ENTER("ha_innobase::write_row");
@@ -7454,8 +7973,18 @@ ha_innobase::write_row(
if ((sql_command == SQLCOM_ALTER_TABLE
|| sql_command == SQLCOM_OPTIMIZE
|| sql_command == SQLCOM_CREATE_INDEX
+#ifdef WITH_WSREP
+ || (wsrep_on(user_thd) && wsrep_load_data_splitting &&
+ sql_command == SQLCOM_LOAD)
+#endif /* WITH_WSREP */
|| sql_command == SQLCOM_DROP_INDEX)
&& num_write_row >= 10000) {
+#ifdef WITH_WSREP
+ if (wsrep_on(user_thd) && sql_command == SQLCOM_LOAD) {
+ WSREP_DEBUG("forced trx split for LOAD: %s",
+ wsrep_thd_query(user_thd));
+ }
+#endif /* WITH_WSREP */
/* ALTER TABLE is COMMITted at every 10000 copied rows.
The IX table lock for the original table has to be re-issued.
As this method will be called on a temporary table where the
@@ -7489,6 +8018,21 @@ no_commit:
*/
;
} else if (src_table == prebuilt->table) {
+#ifdef WITH_WSREP
+ switch (wsrep_run_wsrep_commit(user_thd, wsrep_hton, 1))
+ {
+ case WSREP_TRX_OK:
+ break;
+ case WSREP_TRX_SIZE_EXCEEDED:
+ case WSREP_TRX_CERT_FAIL:
+ case WSREP_TRX_ERROR:
+ DBUG_RETURN(1);
+ }
+
+ if (binlog_hton->commit(binlog_hton, user_thd, 1))
+ DBUG_RETURN(1);
+ wsrep_post_commit(user_thd, TRUE);
+#endif /* WITH_WSREP */
/* Source table is not in InnoDB format:
no need to re-acquire locks on it. */
@@ -7499,6 +8043,21 @@ no_commit:
/* We will need an IX lock on the destination table. */
prebuilt->sql_stat_start = TRUE;
} else {
+#ifdef WITH_WSREP
+ switch (wsrep_run_wsrep_commit(user_thd, wsrep_hton, 1))
+ {
+ case WSREP_TRX_OK:
+ break;
+ case WSREP_TRX_SIZE_EXCEEDED:
+ case WSREP_TRX_CERT_FAIL:
+ case WSREP_TRX_ERROR:
+ DBUG_RETURN(1);
+ }
+
+ if (binlog_hton->commit(binlog_hton, user_thd, 1))
+ DBUG_RETURN(1);
+ wsrep_post_commit(user_thd, TRUE);
+#endif /* WITH_WSREP */
/* Ensure that there are no other table locks than
LOCK_IX and LOCK_AUTO_INC on the destination table. */
@@ -7528,6 +8087,10 @@ no_commit:
innobase_get_auto_increment(). */
prebuilt->autoinc_error = DB_SUCCESS;
+#ifdef WITH_WSREP
+ auto_inc_inserted= (table->next_number_field->val_int() == 0);
+#endif
+
if ((error_result = update_auto_increment())) {
/* We don't want to mask autoinc overflow errors. */
@@ -7615,6 +8178,32 @@ no_commit:
case SQLCOM_REPLACE_SELECT:
goto set_max_autoinc;
+#ifdef WITH_WSREP
+ /* workaround for LP bug #355000, retrying the insert */
+ case SQLCOM_INSERT:
+ if (wsrep_on(current_thd) &&
+ auto_inc_inserted &&
+ wsrep_drupal_282555_workaround &&
+ wsrep_thd_retry_counter(current_thd) == 0 &&
+ !thd_test_options(current_thd,
+ OPTION_NOT_AUTOCOMMIT |
+ OPTION_BEGIN)) {
+ WSREP_DEBUG(
+ "retrying insert: %s",
+ (*wsrep_thd_query(current_thd)) ?
+ wsrep_thd_query(current_thd) :
+ (char *)"void");
+ error= DB_SUCCESS;
+ wsrep_thd_set_conflict_state(
+ current_thd, MUST_ABORT);
+ innobase_srv_conc_exit_innodb(prebuilt->trx);
+ /* jump straight to func exit over
+ * later wsrep hooks */
+ goto func_exit;
+ }
+ break;
+#endif /* WITH_WSREP */
+
default:
break;
}
@@ -7674,6 +8263,21 @@ report_error:
prebuilt->table->flags,
user_thd);
+#ifdef WITH_WSREP
+ if (!error_result && wsrep_thd_exec_mode(user_thd) == LOCAL_STATE &&
+ wsrep_on(user_thd) && !wsrep_consistency_check(user_thd) &&
+ (sql_command != SQLCOM_LOAD ||
+ thd_binlog_format(user_thd) == BINLOG_FORMAT_ROW)) {
+
+ if (wsrep_append_keys(user_thd, false, record, NULL)) {
+ DBUG_PRINT("wsrep", ("row key failed"));
+ error_result = HA_ERR_INTERNAL_ERROR;
+ goto wsrep_error;
+ }
+ }
+wsrep_error:
+#endif /* WITH_WSREP */
+
if (error_result == HA_FTS_INVALID_DOCID) {
my_error(HA_FTS_INVALID_DOCID, MYF(0));
}
@@ -7963,6 +8567,87 @@ calc_row_difference(
return(DB_SUCCESS);
}
+#ifdef WITH_WSREP
+static
+int
+wsrep_calc_row_hash(
+/*================*/
+ byte* digest, /*!< in/out: md5 sum */
+ const uchar* row, /*!< in: row in MySQL format */
+ TABLE* table, /*!< in: table in MySQL data
+ dictionary */
+ row_prebuilt_t* prebuilt, /*!< in: InnoDB prebuilt struct */
+ THD* thd) /*!< in: user thread */
+{
+ Field* field;
+ enum_field_types field_mysql_type;
+ uint n_fields;
+ ulint len;
+ const byte* ptr;
+ ulint col_type;
+ uint i;
+
+ void *ctx = wsrep_md5_init();
+
+ n_fields = table->s->fields;
+
+ for (i = 0; i < n_fields; i++) {
+ byte null_byte=0;
+ byte true_byte=1;
+
+ field = table->field[i];
+
+ ptr = (const byte*) row + get_field_offset(table, field);
+ len = field->pack_length();
+
+ field_mysql_type = field->type();
+
+ col_type = prebuilt->table->cols[i].mtype;
+
+ switch (col_type) {
+
+ case DATA_BLOB:
+ ptr = row_mysql_read_blob_ref(&len, ptr, len);
+
+ break;
+
+ case DATA_VARCHAR:
+ case DATA_BINARY:
+ case DATA_VARMYSQL:
+ if (field_mysql_type == MYSQL_TYPE_VARCHAR) {
+ /* This is a >= 5.0.3 type true VARCHAR where
+ the real payload data length is stored in
+ 1 or 2 bytes */
+
+ ptr = row_mysql_read_true_varchar(
+ &len, ptr,
+ (ulint)
+ (((Field_varstring*)field)->length_bytes));
+
+ }
+
+ break;
+ default:
+ ;
+ }
+ /*
+ if (field->null_ptr &&
+ field_in_record_is_null(table, field, (char*) row)) {
+ */
+
+ if (field->is_null_in_record(row)) {
+ wsrep_md5_update(ctx, (char*)&null_byte, 1);
+ } else {
+ wsrep_md5_update(ctx, (char*)&true_byte, 1);
+ wsrep_md5_update(ctx, (char*)ptr, len);
+ }
+ }
+
+ wsrep_compute_md5_hash((char*)digest, ctx);
+
+ return(0);
+}
+#endif /* WITH_WSREP */
/**********************************************************************//**
Updates a row given as a parameter to a new value. Note that we are given
whole rows, not just the fields which are updated: this incurs some
@@ -8109,6 +8794,23 @@ func_exit:
innobase_active_small();
+#ifdef WITH_WSREP
+ if (error == DB_SUCCESS &&
+ wsrep_thd_exec_mode(user_thd) == LOCAL_STATE &&
+ wsrep_on(user_thd)) {
+
+ DBUG_PRINT("wsrep", ("update row key"));
+
+ if (wsrep_append_keys(user_thd, false, old_row, new_row)) {
+ WSREP_DEBUG("WSREP: UPDATE_ROW_KEY FAILED");
+ DBUG_PRINT("wsrep", ("row key failed"));
+ err = HA_ERR_INTERNAL_ERROR;
+ goto wsrep_error;
+ }
+ }
+wsrep_error:
+#endif /* WITH_WSREP */
+
if (UNIV_UNLIKELY(share->ib_table->is_corrupt)) {
DBUG_RETURN(HA_ERR_CRASHED);
}
@@ -8171,6 +8873,19 @@ ha_innobase::delete_row(
innobase_active_small();
+#ifdef WITH_WSREP
+ if (error == DB_SUCCESS && wsrep_thd_exec_mode(user_thd) == LOCAL_STATE &&
+ wsrep_on(user_thd)) {
+
+ if (wsrep_append_keys(user_thd, false, record, NULL)) {
+ DBUG_PRINT("wsrep", ("delete fail"));
+ error = DB_ERROR;
+ goto wsrep_error;
+ }
+ }
+wsrep_error:
+#endif /* WITH_WSREP */
+
if (UNIV_UNLIKELY(share && share->ib_table
&& share->ib_table->is_corrupt)) {
DBUG_RETURN(HA_ERR_CRASHED);
@@ -9360,6 +10075,394 @@ ha_innobase::ft_end()
rnd_end();
}
+#ifdef WITH_WSREP
+extern dict_index_t*
+wsrep_dict_foreign_find_index(
+ dict_table_t* table,
+ const char** col_names,
+ const char** columns,
+ ulint n_cols,
+ dict_index_t* types_idx,
+ ibool check_charsets,
+ ulint check_null);
+
+
+extern dberr_t
+wsrep_append_foreign_key(
+/*===========================*/
+ trx_t* trx, /*!< in: trx */
+ dict_foreign_t* foreign, /*!< in: foreign key constraint */
+ const rec_t* rec, /*!<in: clustered index record */
+ dict_index_t* index, /*!<in: clustered index */
+ ibool referenced, /*!<in: is check for referenced table */
+ ibool shared) /*!<in: is shared access */
+{
+ ut_a(trx);
+ THD* thd = (THD*)trx->mysql_thd;
+ ulint rcode = DB_SUCCESS;
+ char cache_key[513] = {'\0'};
+ int cache_key_len;
+ bool const copy = true;
+
+ if (!wsrep_on(trx->mysql_thd) ||
+ wsrep_thd_exec_mode(thd) != LOCAL_STATE)
+ return DB_SUCCESS;
+
+ if (!thd || !foreign ||
+ (!foreign->referenced_table && !foreign->foreign_table))
+ {
+ WSREP_INFO("FK: %s missing in: %s",
+ (!thd) ? "thread" :
+ ((!foreign) ? "constraint" :
+ ((!foreign->referenced_table) ?
+ "referenced table" : "foreign table")),
+ (thd && wsrep_thd_query(thd)) ?
+ wsrep_thd_query(thd) : "void");
+ return DB_ERROR;
+ }
+
+ if ( !((referenced) ?
+ foreign->referenced_table : foreign->foreign_table))
+ {
+ WSREP_DEBUG("pulling %s table into cache",
+ (referenced) ? "referenced" : "foreign");
+ mutex_enter(&(dict_sys->mutex));
+ if (referenced)
+ {
+ foreign->referenced_table =
+ dict_table_get_low(
+ foreign->referenced_table_name_lookup);
+ if (foreign->referenced_table)
+ {
+ foreign->referenced_index =
+ wsrep_dict_foreign_find_index(
+ foreign->referenced_table, NULL,
+ foreign->referenced_col_names,
+ foreign->n_fields,
+ foreign->foreign_index,
+ TRUE, FALSE);
+ }
+ }
+ else
+ {
+ foreign->foreign_table =
+ dict_table_get_low(
+ foreign->foreign_table_name_lookup);
+ if (foreign->foreign_table)
+ {
+ foreign->foreign_index =
+ wsrep_dict_foreign_find_index(
+ foreign->foreign_table, NULL,
+ foreign->foreign_col_names,
+ foreign->n_fields,
+ foreign->referenced_index,
+ TRUE, FALSE);
+ }
+ }
+ mutex_exit(&(dict_sys->mutex));
+ }
+
+ if ( !((referenced) ?
+ foreign->referenced_table : foreign->foreign_table))
+ {
+ WSREP_WARN("FK: %s missing in query: %s",
+ (!foreign->referenced_table) ?
+ "referenced table" : "foreign table",
+ (wsrep_thd_query(thd)) ?
+ wsrep_thd_query(thd) : "void");
+ return DB_ERROR;
+ }
+ byte key[WSREP_MAX_SUPPORTED_KEY_LENGTH+1] = {'\0'};
+ ulint len = WSREP_MAX_SUPPORTED_KEY_LENGTH;
+
+ dict_index_t *idx_target = (referenced) ?
+ foreign->referenced_index : index;
+ dict_index_t *idx = (referenced) ?
+ UT_LIST_GET_FIRST(foreign->referenced_table->indexes) :
+ UT_LIST_GET_FIRST(foreign->foreign_table->indexes);
+ int i = 0;
+ while (idx != NULL && idx != idx_target) {
+ if (innobase_strcasecmp (idx->name, innobase_index_reserve_name) != 0) {
+ i++;
+ }
+ idx = UT_LIST_GET_NEXT(indexes, idx);
+ }
+ ut_a(idx);
+ key[0] = (char)i;
+
+ rcode = wsrep_rec_get_foreign_key(
+ &key[1], &len, rec, index, idx,
+ wsrep_protocol_version > 1);
+ if (rcode != DB_SUCCESS) {
+ WSREP_ERROR(
+ "FK key set failed: %lu (%lu %lu), index: %s %s, %s",
+ rcode, referenced, shared,
+ (index && index->name) ? index->name :
+ "void index",
+ (index && index->table_name) ? index->table_name :
+ "void table",
+ wsrep_thd_query(thd));
+ return DB_ERROR;
+ }
+ strncpy(cache_key,
+ (wsrep_protocol_version > 1) ?
+ ((referenced) ?
+ foreign->referenced_table->name :
+ foreign->foreign_table->name) :
+ foreign->foreign_table->name, sizeof(cache_key) - 1);
+ cache_key_len = strlen(cache_key);
+#ifdef WSREP_DEBUG_PRINT
+ ulint j;
+ fprintf(stderr, "FK parent key, table: %s %s len: %lu ",
+ cache_key, (shared) ? "shared" : "exclusive", len+1);
+ for (j=0; j<len+1; j++) {
+ fprintf(stderr, " %hhX, ", key[j]);
+ }
+ fprintf(stderr, "\n");
+#endif
+ char *p = strchr(cache_key, '/');
+ if (p) {
+ *p = '\0';
+ } else {
+ WSREP_WARN("unexpected foreign key table %s %s",
+ foreign->referenced_table->name,
+ foreign->foreign_table->name);
+ }
+
+ wsrep_buf_t wkey_part[3];
+ wsrep_key_t wkey = {wkey_part, 3};
+ if (!wsrep_prepare_key_for_innodb(
+ (const uchar*)cache_key,
+ cache_key_len + 1,
+ (const uchar*)key, len+1,
+ wkey_part,
+ (size_t*)&wkey.key_parts_num)) {
+ WSREP_WARN("key prepare failed for cascaded FK: %s",
+ (wsrep_thd_query(thd)) ?
+ wsrep_thd_query(thd) : "void");
+ return DB_ERROR;
+ }
+ rcode = (int)wsrep->append_key(
+ wsrep,
+ wsrep_ws_handle(thd, trx),
+ &wkey,
+ 1,
+ shared ? WSREP_KEY_SHARED : WSREP_KEY_EXCLUSIVE,
+ copy);
+ if (rcode) {
+ DBUG_PRINT("wsrep", ("row key failed: %lu", rcode));
+ WSREP_ERROR("Appending cascaded fk row key failed: %s, %lu",
+ (wsrep_thd_query(thd)) ?
+ wsrep_thd_query(thd) : "void", rcode);
+ return DB_ERROR;
+ }
+
+ return DB_SUCCESS;
+}
+
+static int
+wsrep_append_key(
+/*==================*/
+ THD *thd,
+ trx_t *trx,
+ TABLE_SHARE *table_share,
+ TABLE *table,
+ const char* key,
+ uint16_t key_len,
+ bool shared
+)
+{
+ DBUG_ENTER("wsrep_append_key");
+ bool const copy = true;
+#ifdef WSREP_DEBUG_PRINT
+ fprintf(stderr, "%s conn %ld, trx %lu, keylen %d, table %s ",
+ (shared) ? "Shared" : "Exclusive",
+ wsrep_thd_thread_id(thd), (long long)trx->id, key_len,
+ table_share->table_name.str);
+ for (int i=0; i<key_len; i++) {
+ fprintf(stderr, "%hhX, ", key[i]);
+ }
+ fprintf(stderr, "\n");
+#endif
+ wsrep_buf_t wkey_part[3];
+ wsrep_key_t wkey = {wkey_part, 3};
+ if (!wsrep_prepare_key_for_innodb(
+ (const uchar*)table_share->table_cache_key.str,
+ table_share->table_cache_key.length,
+ (const uchar*)key, key_len,
+ wkey_part,
+ (size_t*)&wkey.key_parts_num)) {
+ WSREP_WARN("key prepare failed for: %s",
+ (wsrep_thd_query(thd)) ?
+ wsrep_thd_query(thd) : "void");
+ DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
+ }
+
+ int rcode = (int)wsrep->append_key(
+ wsrep,
+ wsrep_ws_handle(thd, trx),
+ &wkey,
+ 1,
+ shared ? WSREP_KEY_SHARED : WSREP_KEY_EXCLUSIVE,
+ copy);
+ if (rcode) {
+ DBUG_PRINT("wsrep", ("row key failed: %d", rcode));
+ WSREP_WARN("Appending row key failed: %s, %d",
+ (wsrep_thd_query(thd)) ?
+ wsrep_thd_query(thd) : "void", rcode);
+ DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
+ }
+ DBUG_RETURN(0);
+}
+
+extern void compute_md5_hash(char *digest, const char *buf, int len);
+#define MD5_HASH compute_md5_hash
+
+int
+ha_innobase::wsrep_append_keys(
+/*==================*/
+ THD *thd,
+ bool shared,
+ const uchar* record0, /* in: row in MySQL format */
+ const uchar* record1) /* in: row in MySQL format */
+{
+ int rcode;
+ DBUG_ENTER("wsrep_append_keys");
+
+ bool key_appended = false;
+ trx_t *trx = thd_to_trx(thd);
+
+ if (table_share && table_share->tmp_table != NO_TMP_TABLE) {
+ WSREP_DEBUG("skipping tmp table DML: THD: %lu tmp: %d SQL: %s",
+ wsrep_thd_thread_id(thd),
+ table_share->tmp_table,
+ (wsrep_thd_query(thd)) ?
+ wsrep_thd_query(thd) : "void");
+ DBUG_RETURN(0);
+ }
+
+ if (wsrep_protocol_version == 0) {
+ uint len;
+ char keyval[WSREP_MAX_SUPPORTED_KEY_LENGTH+1] = {'\0'};
+ char *key = &keyval[0];
+ ibool is_null;
+
+ len = wsrep_store_key_val_for_row(
+ thd, table, 0, key, WSREP_MAX_SUPPORTED_KEY_LENGTH,
+ record0, &is_null);
+
+ if (!is_null) {
+ rcode = wsrep_append_key(
+ thd, trx, table_share, table, keyval,
+ len, shared);
+ if (rcode) DBUG_RETURN(rcode);
+ }
+ else
+ {
+ WSREP_DEBUG("NULL key skipped (proto 0): %s",
+ wsrep_thd_query(thd));
+ }
+ } else {
+ ut_a(table->s->keys <= 256);
+ uint i;
+ bool hasPK= false;
+
+ for (i=0; i<table->s->keys; ++i) {
+ KEY* key_info = table->key_info + i;
+ if (key_info->flags & HA_NOSAME) {
+ hasPK = true;
+ }
+ }
+
+ for (i=0; i<table->s->keys; ++i) {
+ uint len;
+ char keyval0[WSREP_MAX_SUPPORTED_KEY_LENGTH+1] = {'\0'};
+ char keyval1[WSREP_MAX_SUPPORTED_KEY_LENGTH+1] = {'\0'};
+ char* key0 = &keyval0[1];
+ char* key1 = &keyval1[1];
+ KEY* key_info = table->key_info + i;
+ ibool is_null;
+
+ dict_index_t* idx = innobase_get_index(i);
+ dict_table_t* tab = (idx) ? idx->table : NULL;
+
+ keyval0[0] = (char)i;
+ keyval1[0] = (char)i;
+
+ if (!tab) {
+ WSREP_WARN("MySQL-InnoDB key mismatch %s %s",
+ table->s->table_name.str,
+ key_info->name);
+ }
+ /* !hasPK == table with no PK, must append all non-unique keys */
+ if (!hasPK || key_info->flags & HA_NOSAME ||
+ ((tab &&
+ dict_table_get_referenced_constraint(tab, idx)) ||
+ (!tab && referenced_by_foreign_key()))) {
+
+ len = wsrep_store_key_val_for_row(
+ thd, table, i, key0,
+ WSREP_MAX_SUPPORTED_KEY_LENGTH,
+ record0, &is_null);
+ if (!is_null) {
+ rcode = wsrep_append_key(
+ thd, trx, table_share, table,
+ keyval0, len+1, shared);
+ if (rcode) DBUG_RETURN(rcode);
+
+ if (key_info->flags & HA_NOSAME || shared)
+ key_appended = true;
+ }
+ else
+ {
+ WSREP_DEBUG("NULL key skipped: %s",
+ wsrep_thd_query(thd));
+ }
+ if (record1) {
+ len = wsrep_store_key_val_for_row(
+ thd, table, i, key1,
+ WSREP_MAX_SUPPORTED_KEY_LENGTH,
+ record1, &is_null);
+ if (!is_null && memcmp(key0, key1, len)) {
+ rcode = wsrep_append_key(
+ thd, trx, table_share,
+ table,
+ keyval1, len+1, shared);
+ if (rcode) DBUG_RETURN(rcode);
+ }
+ }
+ }
+ }
+ }
+
+ /* if no PK, calculate hash of full row, to be the key value */
+ if (!key_appended && wsrep_certify_nonPK) {
+ uchar digest[16];
+ int rcode;
+
+ wsrep_calc_row_hash(digest, record0, table, prebuilt, thd);
+ if ((rcode = wsrep_append_key(thd, trx, table_share, table,
+ (const char*) digest, 16,
+ shared))) {
+ DBUG_RETURN(rcode);
+ }
+
+ if (record1) {
+ wsrep_calc_row_hash(
+ digest, record1, table, prebuilt, thd);
+ if ((rcode = wsrep_append_key(thd, trx, table_share,
+ table,
+ (const char*) digest,
+ 16, shared))) {
+ DBUG_RETURN(rcode);
+ }
+ }
+ DBUG_RETURN(0);
+ }
+
+ DBUG_RETURN(0);
+}
+#endif /* WITH_WSREP */
/*********************************************************************//**
Stores a reference to the current row to 'ref' field of the handle. Note
@@ -13276,11 +14379,18 @@ ha_innobase::external_lock(
/* used by test case */
DBUG_EXECUTE_IF("no_innodb_binlog_errors", skip = true;);
if (!skip) {
- my_error(ER_BINLOG_STMT_MODE_AND_ROW_ENGINE, MYF(0),
- " InnoDB is limited to row-logging when "
- "transaction isolation level is "
- "READ COMMITTED or READ UNCOMMITTED.");
- DBUG_RETURN(HA_ERR_LOGGING_IMPOSSIBLE);
+#ifdef WITH_WSREP
+ if (!wsrep_on(thd)
+ || wsrep_thd_exec_mode(thd) == LOCAL_STATE) {
+#endif /* WITH_WSREP */
+ my_error(ER_BINLOG_STMT_MODE_AND_ROW_ENGINE, MYF(0),
+ " InnoDB is limited to row-logging when "
+ "transaction isolation level is "
+ "READ COMMITTED or READ UNCOMMITTED.");
+ DBUG_RETURN(HA_ERR_LOGGING_IMPOSSIBLE);
+#ifdef WITH_WSREP
+ }
+#endif /* WITH_WSREP */
}
}
@@ -14754,6 +15864,9 @@ innobase_xa_prepare(
time, not the current session variable value. Any possible changes
to the session variable take effect only in the next transaction */
if (!trx->support_xa) {
+#ifdef WITH_WSREP
+ thd_get_xid(thd, (MYSQL_XID*) &trx->xid);
+#endif // WITH_WSREP
return(0);
}
@@ -17155,6 +18268,303 @@ static SHOW_VAR innodb_status_variables_export[]= {
static struct st_mysql_storage_engine innobase_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
+#ifdef WITH_WSREP
+void
+wsrep_abort_slave_trx(wsrep_seqno_t bf_seqno, wsrep_seqno_t victim_seqno)
+{
+ WSREP_ERROR("Trx %lld tries to abort slave trx %lld. This could be "
+ "caused by:\n\t"
+ "1) unsupported configuration options combination, please check documentation.\n\t"
+ "2) a bug in the code.\n\t"
+ "3) a database corruption.\n Node consistency compromized, "
+ "need to abort. Restart the node to resync with cluster.",
+ (long long)bf_seqno, (long long)victim_seqno);
+ abort();
+}
+/*******************************************************************//**
+This function is used to kill one transaction in BF. */
+
+int
+wsrep_innobase_kill_one_trx(void * const bf_thd_ptr,
+ const trx_t * const bf_trx,
+ trx_t *victim_trx, ibool signal)
+{
+ ut_ad(lock_mutex_own());
+ ut_ad(trx_mutex_own(victim_trx));
+ ut_ad(bf_thd_ptr);
+ ut_ad(victim_trx);
+
+ DBUG_ENTER("wsrep_innobase_kill_one_trx");
+ THD *bf_thd = bf_thd_ptr ? (THD*) bf_thd_ptr : NULL;
+ THD *thd = (THD *) victim_trx->mysql_thd;
+ int64_t bf_seqno = (bf_thd) ? wsrep_thd_trx_seqno(bf_thd) : 0;
+
+ if (!thd) {
+ DBUG_PRINT("wsrep", ("no thd for conflicting lock"));
+ WSREP_WARN("no THD for trx: %lu", victim_trx->id);
+ DBUG_RETURN(1);
+ }
+ if (!bf_thd) {
+ DBUG_PRINT("wsrep", ("no BF thd for conflicting lock"));
+ WSREP_WARN("no BF THD for trx: %lu", (bf_trx) ? bf_trx->id : 0);
+ DBUG_RETURN(1);
+ }
+
+ WSREP_LOG_CONFLICT(bf_thd, thd, TRUE);
+
+ WSREP_DEBUG("BF kill (%lu, seqno: %lld), victim: (%lu) trx: %lu",
+ signal, (long long)bf_seqno,
+ wsrep_thd_thread_id(thd),
+ victim_trx->id);
+
+ WSREP_DEBUG("Aborting query: %s",
+ (thd && wsrep_thd_query(thd)) ? wsrep_thd_query(thd) : "void");
+
+ wsrep_thd_LOCK(thd);
+
+ if (wsrep_thd_query_state(thd) == QUERY_EXITING) {
+ WSREP_DEBUG("kill trx EXITING for %lu", victim_trx->id);
+ wsrep_thd_UNLOCK(thd);
+ DBUG_RETURN(0);
+ }
+ if(wsrep_thd_exec_mode(thd) != LOCAL_STATE) {
+ WSREP_DEBUG("withdraw for BF trx: %lu, state: %d",
+ victim_trx->id,
+ wsrep_thd_conflict_state(thd));
+ }
+
+ switch (wsrep_thd_conflict_state(thd)) {
+ case NO_CONFLICT:
+ wsrep_thd_set_conflict_state(thd, MUST_ABORT);
+ break;
+ case MUST_ABORT:
+ WSREP_DEBUG("victim %lu in MUST ABORT state",
+ victim_trx->id);
+ wsrep_thd_UNLOCK(thd);
+ wsrep_thd_awake(thd, signal);
+ DBUG_RETURN(0);
+ break;
+ case ABORTED:
+ case ABORTING: // fall through
+ default:
+ WSREP_DEBUG("victim %lu in state %d",
+ victim_trx->id, wsrep_thd_conflict_state(thd));
+ wsrep_thd_UNLOCK(thd);
+ DBUG_RETURN(0);
+ break;
+ }
+
+ switch (wsrep_thd_query_state(thd)) {
+ case QUERY_COMMITTING:
+ enum wsrep_status rcode;
+
+ WSREP_DEBUG("kill query for: %ld",
+ wsrep_thd_thread_id(thd));
+ WSREP_DEBUG("kill trx QUERY_COMMITTING for %lu",
+ victim_trx->id);
+
+ if (wsrep_thd_exec_mode(thd) == REPL_RECV) {
+ wsrep_abort_slave_trx(bf_seqno,
+ wsrep_thd_trx_seqno(thd));
+ } else {
+ rcode = wsrep->abort_pre_commit(
+ wsrep, bf_seqno,
+ (wsrep_trx_id_t)victim_trx->id
+ );
+
+ switch (rcode) {
+ case WSREP_WARNING:
+ WSREP_DEBUG("cancel commit warning: %lu",
+ victim_trx->id);
+ wsrep_thd_UNLOCK(thd);
+ wsrep_thd_awake(thd, signal);
+ DBUG_RETURN(1);
+ break;
+ case WSREP_OK:
+ break;
+ default:
+ WSREP_ERROR(
+ "cancel commit bad exit: %d %lu",
+ rcode,
+ victim_trx->id);
+ /* unable to interrupt, must abort */
+ /* note: kill_mysql() will block, if we cannot.
+ * kill the lock holder first.
+ */
+ abort();
+ break;
+ }
+ }
+ wsrep_thd_UNLOCK(thd);
+ wsrep_thd_awake(thd, signal);
+ break;
+ case QUERY_EXEC:
+ /* it is possible that victim trx is itself waiting for some
+ * other lock. We need to cancel this waiting
+ */
+ WSREP_DEBUG("kill trx QUERY_EXEC for %lu", victim_trx->id);
+
+ victim_trx->lock.was_chosen_as_deadlock_victim= TRUE;
+ if (victim_trx->lock.wait_lock) {
+ WSREP_DEBUG("victim has wait flag: %ld",
+ wsrep_thd_thread_id(thd));
+ lock_t* wait_lock = victim_trx->lock.wait_lock;
+ if (wait_lock) {
+ WSREP_DEBUG("canceling wait lock");
+ victim_trx->lock.was_chosen_as_deadlock_victim= TRUE;
+ lock_cancel_waiting_and_release(wait_lock);
+ }
+
+ wsrep_thd_UNLOCK(thd);
+ wsrep_thd_awake(thd, signal);
+ } else {
+ /* abort currently executing query */
+ DBUG_PRINT("wsrep",("sending KILL_QUERY to: %ld",
+ wsrep_thd_thread_id(thd)));
+ WSREP_DEBUG("kill query for: %ld",
+ wsrep_thd_thread_id(thd));
+ /* Note that innobase_kill_connection will take lock_mutex
+ and trx_mutex */
+ wsrep_thd_UNLOCK(thd);
+ wsrep_thd_awake(thd, signal);
+
+ /* for BF thd, we need to prevent him from committing */
+ if (wsrep_thd_exec_mode(thd) == REPL_RECV) {
+ wsrep_abort_slave_trx(bf_seqno,
+ wsrep_thd_trx_seqno(thd));
+ }
+ }
+ break;
+ case QUERY_IDLE:
+ {
+ bool skip_abort= false;
+ wsrep_aborting_thd_t abortees;
+
+ WSREP_DEBUG("kill IDLE for %lu", victim_trx->id);
+
+ if (wsrep_thd_exec_mode(thd) == REPL_RECV) {
+ WSREP_DEBUG("kill BF IDLE, seqno: %lld",
+ (long long)wsrep_thd_trx_seqno(thd));
+ wsrep_thd_UNLOCK(thd);
+ wsrep_abort_slave_trx(bf_seqno,
+ wsrep_thd_trx_seqno(thd));
+ DBUG_RETURN(0);
+ }
+ /* This will lock thd from proceeding after net_read() */
+ wsrep_thd_set_conflict_state(thd, ABORTING);
+
+ mysql_mutex_lock(&LOCK_wsrep_rollback);
+
+ abortees = wsrep_aborting_thd;
+ while (abortees && !skip_abort) {
+ /* check if we have a kill message for this already */
+ if (abortees->aborting_thd == thd) {
+ skip_abort = true;
+ WSREP_WARN("duplicate thd aborter %lu",
+ wsrep_thd_thread_id(thd));
+ }
+ abortees = abortees->next;
+ }
+ if (!skip_abort) {
+ wsrep_aborting_thd_t aborting = (wsrep_aborting_thd_t)
+ my_malloc(sizeof(struct wsrep_aborting_thd),
+ MYF(0));
+ aborting->aborting_thd = thd;
+ aborting->next = wsrep_aborting_thd;
+ wsrep_aborting_thd = aborting;
+ DBUG_PRINT("wsrep",("enqueuing trx abort for %lu",
+ wsrep_thd_thread_id(thd)));
+ WSREP_DEBUG("enqueuing trx abort for (%lu)",
+ wsrep_thd_thread_id(thd));
+ }
+
+ DBUG_PRINT("wsrep",("signalling wsrep rollbacker"));
+ WSREP_DEBUG("signaling aborter");
+ mysql_cond_signal(&COND_wsrep_rollback);
+ mysql_mutex_unlock(&LOCK_wsrep_rollback);
+ wsrep_thd_UNLOCK(thd);
+
+ break;
+ }
+ default:
+ WSREP_WARN("bad wsrep query state: %d",
+ wsrep_thd_query_state(thd));
+ wsrep_thd_UNLOCK(thd);
+ break;
+ }
+
+ DBUG_RETURN(0);
+}
+static int
+wsrep_abort_transaction(handlerton* hton, THD *bf_thd, THD *victim_thd,
+ my_bool signal)
+{
+ DBUG_ENTER("wsrep_innobase_abort_thd");
+ trx_t* victim_trx = thd_to_trx(victim_thd);
+ trx_t* bf_trx = (bf_thd) ? thd_to_trx(bf_thd) : NULL;
+ WSREP_DEBUG("abort transaction: BF: %s victim: %s",
+ wsrep_thd_query(bf_thd),
+ wsrep_thd_query(victim_thd));
+
+ if (victim_trx)
+ {
+ lock_mutex_enter();
+ trx_mutex_enter(victim_trx);
+ int rcode = wsrep_innobase_kill_one_trx(bf_thd, bf_trx,
+ victim_trx, signal);
+ trx_mutex_exit(victim_trx);
+ lock_mutex_exit();
+ wsrep_srv_conc_cancel_wait(victim_trx);
+
+ DBUG_RETURN(rcode);
+ } else {
+ WSREP_DEBUG("victim does not have transaction");
+ wsrep_thd_LOCK(victim_thd);
+ wsrep_thd_set_conflict_state(victim_thd, MUST_ABORT);
+ wsrep_thd_UNLOCK(victim_thd);
+ wsrep_thd_awake(victim_thd, signal);
+ }
+ DBUG_RETURN(-1);
+}
+
+static int innobase_wsrep_set_checkpoint(handlerton* hton, const XID* xid)
+{
+ DBUG_ASSERT(hton == innodb_hton_ptr);
+ if (wsrep_is_wsrep_xid((const void *)xid)) {
+ mtr_t mtr;
+ mtr_start(&mtr);
+ trx_sysf_t* sys_header = trx_sysf_get(&mtr);
+ trx_sys_update_wsrep_checkpoint(xid, sys_header, &mtr);
+ mtr_commit(&mtr);
+ innobase_flush_logs(hton);
+ return 0;
+ } else {
+ return 1;
+ }
+}
+
+static int innobase_wsrep_get_checkpoint(handlerton* hton, XID* xid)
+{
+ DBUG_ASSERT(hton == innodb_hton_ptr);
+ trx_sys_read_wsrep_checkpoint(xid);
+ return 0;
+}
+
+static void
+wsrep_fake_trx_id(
+/*==================*/
+ handlerton *hton,
+ THD *thd) /*!< in: user thread handle */
+{
+ mutex_enter(&trx_sys->mutex);
+ trx_id_t trx_id = trx_sys_get_new_trx_id();
+ mutex_exit(&trx_sys->mutex);
+
+ (void *)wsrep_ws_handle_for_trx(wsrep_thd_ws_handle(thd), trx_id);
+}
+
+#endif /* WITH_WSREP */
+
/* plugin options */
static MYSQL_SYSVAR_ENUM(checksum_algorithm, srv_checksum_algorithm,
@@ -18092,6 +19502,40 @@ static MYSQL_SYSVAR_BOOL(disable_background_merge,
NULL, NULL, FALSE);
#endif /* UNIV_DEBUG || UNIV_IBUF_DEBUG */
+#ifdef WITH_INNODB_DISALLOW_WRITES
+/*******************************************************
+ * innobase_disallow_writes variable definition *
+ *******************************************************/
+
+/* Must always init to FALSE. */
+static my_bool innobase_disallow_writes = FALSE;
+
+/**************************************************************************
+An "update" method for innobase_disallow_writes variable. */
+static
+void
+innobase_disallow_writes_update(
+/*============================*/
+ THD* thd, /* in: thread handle */
+ st_mysql_sys_var* var, /* in: pointer to system
+ variable */
+ void* var_ptr, /* out: pointer to dynamic
+ variable */
+ const void* save) /* in: temporary storage */
+{
+ *(my_bool*)var_ptr = *(my_bool*)save;
+ ut_a(srv_allow_writes_event);
+ if (*(my_bool*)var_ptr)
+ os_event_reset(srv_allow_writes_event);
+ else
+ os_event_set(srv_allow_writes_event);
+}
+
+static MYSQL_SYSVAR_BOOL(disallow_writes, innobase_disallow_writes,
+ PLUGIN_VAR_NOCMDOPT,
+ "Tell InnoDB to stop any writes to disk",
+ NULL, innobase_disallow_writes_update, FALSE);
+#endif /* WITH_INNODB_DISALLOW_WRITES */
static MYSQL_SYSVAR_BOOL(random_read_ahead, srv_random_read_ahead,
PLUGIN_VAR_NOCMDARG,
"Whether to use read ahead for random access within an extent.",
@@ -18403,6 +19847,9 @@ static struct st_mysql_sys_var* innobase_system_variables[]= {
MYSQL_SYSVAR(change_buffering_debug),
MYSQL_SYSVAR(disable_background_merge),
#endif /* UNIV_DEBUG || UNIV_IBUF_DEBUG */
+#ifdef WITH_INNODB_DISALLOW_WRITES
+ MYSQL_SYSVAR(disallow_writes),
+#endif /* WITH_INNODB_DISALLOW_WRITES */
MYSQL_SYSVAR(random_read_ahead),
MYSQL_SYSVAR(read_ahead_threshold),
MYSQL_SYSVAR(read_only),
diff --git a/storage/xtradb/handler/ha_innodb.h b/storage/xtradb/handler/ha_innodb.h
index b4df711356c..ff4ab88335a 100644
--- a/storage/xtradb/handler/ha_innodb.h
+++ b/storage/xtradb/handler/ha_innodb.h
@@ -119,6 +119,10 @@ class ha_innobase: public handler
void innobase_initialize_autoinc();
dict_index_t* innobase_get_index(uint keynr);
+#ifdef WITH_WSREP
+ int wsrep_append_keys(THD *thd, bool shared,
+ const uchar* record0, const uchar* record1);
+#endif
/* Init values for the class: */
public:
ha_innobase(handlerton *hton, TABLE_SHARE *table_arg);
@@ -472,6 +476,40 @@ __attribute__((nonnull));
extern void mysql_bin_log_commit_pos(THD *thd, ulonglong *out_pos, const char **out_file);
struct trx_t;
+#ifdef WITH_WSREP
+#include <wsrep_mysqld.h>
+//extern "C" int wsrep_trx_order_before(void *thd1, void *thd2);
+
+extern "C" bool wsrep_thd_is_wsrep_on(THD *thd);
+
+extern "C" enum wsrep_exec_mode wsrep_thd_exec_mode(THD *thd);
+extern "C" enum wsrep_conflict_state wsrep_thd_conflict_state(THD *thd);
+extern "C" enum wsrep_query_state wsrep_thd_query_state(THD *thd);
+extern "C" const char * wsrep_thd_exec_mode_str(THD *thd);
+extern "C" const char * wsrep_thd_conflict_state_str(THD *thd);
+extern "C" const char * wsrep_thd_query_state_str(THD *thd);
+extern "C" wsrep_ws_handle_t* wsrep_thd_ws_handle(THD *thd);
+
+extern "C" void wsrep_thd_set_exec_mode(THD *thd, enum wsrep_exec_mode mode);
+extern "C" void wsrep_thd_set_query_state(
+ THD *thd, enum wsrep_query_state state);
+extern "C" void wsrep_thd_set_conflict_state(
+ THD *thd, enum wsrep_conflict_state state);
+
+extern "C" void wsrep_thd_set_trx_to_replay(THD *thd, uint64 trx_id);
+
+extern "C"void wsrep_thd_LOCK(THD *thd);
+extern "C"void wsrep_thd_UNLOCK(THD *thd);
+extern "C" uint32 wsrep_thd_wsrep_rand(THD *thd);
+extern "C" time_t wsrep_thd_query_start(THD *thd);
+extern "C" my_thread_id wsrep_thd_thread_id(THD *thd);
+extern "C" int64_t wsrep_thd_trx_seqno(THD *thd);
+extern "C" query_id_t wsrep_thd_query_id(THD *thd);
+extern "C" char * wsrep_thd_query(THD *thd);
+extern "C" query_id_t wsrep_thd_wsrep_last_query_id(THD *thd);
+extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id);
+extern "C" void wsrep_thd_awake(THD* thd, my_bool signal);
+#endif
extern const struct _ft_vft ft_vft_result;
@@ -509,6 +547,9 @@ innobase_index_name_is_reserved(
__attribute__((nonnull, warn_unused_result));
/*****************************************************************//**
+#ifdef WITH_WSREP
+extern "C" int wsrep_trx_is_aborting(void *thd_ptr);
+#endif
Determines InnoDB table flags.
@retval true if successful, false if error */
UNIV_INTERN
diff --git a/storage/xtradb/handler/handler0alter.cc b/storage/xtradb/handler/handler0alter.cc
index 34048e7c987..2cd0500007a 100644
--- a/storage/xtradb/handler/handler0alter.cc
+++ b/storage/xtradb/handler/handler0alter.cc
@@ -49,6 +49,10 @@ Smart ALTER TABLE
#include "pars0pars.h"
#include "row0sel.h"
#include "ha_innodb.h"
+#ifdef WITH_WSREP
+//#include "wsrep_api.h"
+#include <sql_acl.h> // PROCESS_ACL
+#endif
/** Operations for creating secondary indexes (no rebuild needed) */
static const Alter_inplace_info::HA_ALTER_FLAGS INNOBASE_ONLINE_CREATE
diff --git a/storage/xtradb/include/dict0mem.h b/storage/xtradb/include/dict0mem.h
index a347a75ea42..93fe2efa830 100644
--- a/storage/xtradb/include/dict0mem.h
+++ b/storage/xtradb/include/dict0mem.h
@@ -527,6 +527,9 @@ be REC_VERSION_56_MAX_INDEX_COL_LEN (3072) bytes */
/** Defines the maximum fixed length column size */
#define DICT_MAX_FIXED_COL_LEN DICT_ANTELOPE_MAX_INDEX_COL_LEN
+#ifdef WITH_WSREP
+#define WSREP_MAX_SUPPORTED_KEY_LENGTH 3500
+#endif /* WITH_WSREP */
/** Data structure for a field in an index */
struct dict_field_t{
diff --git a/storage/xtradb/include/ha_prototypes.h b/storage/xtradb/include/ha_prototypes.h
index 66a96282b69..4ebaee3c89e 100644
--- a/storage/xtradb/include/ha_prototypes.h
+++ b/storage/xtradb/include/ha_prototypes.h
@@ -285,6 +285,23 @@ innobase_casedn_str(
/*================*/
char* a); /*!< in/out: string to put in lower case */
+#ifdef WITH_WSREP
+UNIV_INTERN
+int
+wsrep_innobase_kill_one_trx(void *thd_ptr,
+ const trx_t *bf_trx, trx_t *victim_trx, ibool signal);
+my_bool wsrep_thd_set_PA_safe(void *thd_ptr, my_bool safe);
+int wsrep_thd_conflict_state(void *thd_ptr, my_bool sync);
+my_bool wsrep_thd_is_BF(void *thd_ptr, my_bool sync);
+my_bool wsrep_thd_is_wsrep(void *thd_ptr);
+int wsrep_trx_order_before(void *thd1, void *thd2);
+int wsrep_innobase_mysql_sort(int mysql_type, uint charset_number,
+ unsigned char* str, unsigned int str_length,
+ unsigned int buf_length);
+int
+wsrep_on(void *thd_ptr);
+extern "C" int wsrep_is_wsrep_xid(const void*);
+#endif /* WITH_WSREP */
/**********************************************************************//**
Determines the connection character set.
@return connection character set */
diff --git a/storage/xtradb/include/hash0hash.h b/storage/xtradb/include/hash0hash.h
index a6fe4e680a1..68d3c6ace4e 100644
--- a/storage/xtradb/include/hash0hash.h
+++ b/storage/xtradb/include/hash0hash.h
@@ -144,6 +144,33 @@ do {\
}\
} while (0)
+#ifdef WITH_WSREP
+/*******************************************************************//**
+Inserts a struct to the head of hash table. */
+
+#define HASH_PREPEND(TYPE, NAME, TABLE, FOLD, DATA) \
+do { \
+ hash_cell_t* cell3333; \
+ TYPE* struct3333; \
+ \
+ HASH_ASSERT_OWN(TABLE, FOLD) \
+ \
+ (DATA)->NAME = NULL; \
+ \
+ cell3333 = hash_get_nth_cell(TABLE, hash_calc_hash(FOLD, TABLE));\
+ \
+ if (cell3333->node == NULL) { \
+ cell3333->node = DATA; \
+ DATA->NAME = NULL; \
+ } else { \
+ struct3333 = (TYPE*) cell3333->node; \
+ \
+ DATA->NAME = struct3333; \
+ \
+ cell3333->node = DATA; \
+ } \
+} while (0)
+#endif /*WITH_WSREP */
#ifdef UNIV_HASH_DEBUG
# define HASH_ASSERT_VALID(DATA) ut_a((void*) (DATA) != (void*) -1)
# define HASH_INVALIDATE(DATA, NAME) *(void**) (&DATA->NAME) = (void*) -1
diff --git a/storage/xtradb/include/rem0rec.h b/storage/xtradb/include/rem0rec.h
index 8e7d5ff2d48..238cb04e1f8 100644
--- a/storage/xtradb/include/rem0rec.h
+++ b/storage/xtradb/include/rem0rec.h
@@ -981,6 +981,15 @@ are given in one byte (resp. two byte) format. */
two upmost bits in a two byte offset for special purposes */
#define REC_MAX_DATA_SIZE (16 * 1024)
+#ifdef WITH_WSREP
+int wsrep_rec_get_foreign_key(
+ byte *buf, /* out: extracted key */
+ ulint *buf_len, /* in/out: length of buf */
+ const rec_t* rec, /* in: physical record */
+ dict_index_t* index_for, /* in: index for foreign table */
+ dict_index_t* index_ref, /* in: index for referenced table */
+ ibool new_protocol); /* in: protocol > 1 */
+#endif /* WITH_WSREP */
#ifndef UNIV_NONINL
#include "rem0rec.ic"
#endif
diff --git a/storage/xtradb/include/srv0srv.h b/storage/xtradb/include/srv0srv.h
index a02c8a96e1a..a54198f4e00 100644
--- a/storage/xtradb/include/srv0srv.h
+++ b/storage/xtradb/include/srv0srv.h
@@ -323,6 +323,10 @@ extern uint srv_flush_log_at_timeout;
extern char srv_use_global_flush_log_at_trx_commit;
extern char srv_adaptive_flushing;
+#ifdef WITH_INNODB_DISALLOW_WRITES
+/* When this event is reset we do not allow any file writes to take place. */
+extern os_event_t srv_allow_writes_event;
+#endif /* WITH_INNODB_DISALLOW_WRITES */
/* If this flag is TRUE, then we will load the indexes' (and tables') metadata
even if they are marked as "corrupted". Mostly it is for DBA to process
corrupted index and table */
@@ -1167,4 +1171,13 @@ struct srv_slot_t{
# define srv_file_per_table 1
#endif /* !UNIV_HOTBACKUP */
+#ifdef WITH_WSREP
+UNIV_INTERN
+void
+wsrep_srv_conc_cancel_wait(
+/*==================*/
+ trx_t* trx); /*!< in: transaction object associated with the
+ thread */
+#endif /* WITH_WSREP */
+
#endif
diff --git a/storage/xtradb/include/sync0sync.ic b/storage/xtradb/include/sync0sync.ic
index c3529af5262..a302e1473a5 100644
--- a/storage/xtradb/include/sync0sync.ic
+++ b/storage/xtradb/include/sync0sync.ic
@@ -255,7 +255,10 @@ mutex_enter_func(
ulint line) /*!< in: line where locked */
{
ut_ad(mutex_validate(mutex));
+#ifndef WITH_WSREP
+ /* this cannot be be granted when BF trx kills a trx in lock wait state */
ut_ad(!mutex_own(mutex));
+#endif /* WITH_WSREP */
/* Note that we do not peek at the value of lock_word before trying
the atomic test_and_set; we could peek, and possibly save time. */
diff --git a/storage/xtradb/include/trx0sys.h b/storage/xtradb/include/trx0sys.h
index 7b97c6e99cd..7892fdd0bf1 100644
--- a/storage/xtradb/include/trx0sys.h
+++ b/storage/xtradb/include/trx0sys.h
@@ -42,6 +42,9 @@ Created 3/26/1996 Heikki Tuuri
#include "read0types.h"
#include "page0types.h"
#include "ut0bh.h"
+#ifdef WITH_WSREP
+#include "trx0xa.h"
+#endif /* WITH_WSREP */
typedef UT_LIST_BASE_NODE_T(trx_t) trx_list_t;
@@ -314,6 +317,9 @@ trx_sys_update_mysql_binlog_offset(
ib_int64_t offset, /*!< in: position in that log file */
ulint field, /*!< in: offset of the MySQL log info field in
the trx sys header */
+#ifdef WITH_WSREP
+ trx_sysf_t* sys_header, /*!< in: trx sys header */
+#endif /* WITH_WSREP */
mtr_t* mtr); /*!< in: mtr */
/*****************************************************************//**
Prints to stderr the MySQL binlog offset info in the trx system header if
@@ -322,6 +328,19 @@ UNIV_INTERN
void
trx_sys_print_mysql_binlog_offset(void);
/*===================================*/
+#ifdef WITH_WSREP
+/** Update WSREP checkpoint XID in sys header. */
+void
+trx_sys_update_wsrep_checkpoint(
+ const XID* xid, /*!< in: WSREP XID */
+ trx_sysf_t* sys_header, /*!< in: sys_header */
+ mtr_t* mtr); /*!< in: mtr */
+
+void
+/** Read WSREP checkpoint XID from sys header. */
+trx_sys_read_wsrep_checkpoint(
+ XID* xid); /*!< out: WSREP XID */
+#endif /* WITH_WSREP */
/*****************************************************************//**
Prints to stderr the MySQL master log offset info in the trx system header if
the magic number shows it valid. */
@@ -550,6 +569,20 @@ this contains the same fields as TRX_SYS_MYSQL_LOG_INFO below */
within that file */
#define TRX_SYS_MYSQL_LOG_NAME 12 /*!< MySQL log file name */
+#ifdef WITH_WSREP
+/* The offset to WSREP XID headers */
+#define TRX_SYS_WSREP_XID_INFO (UNIV_PAGE_SIZE - 3500)
+#define TRX_SYS_WSREP_XID_MAGIC_N_FLD 0
+#define TRX_SYS_WSREP_XID_MAGIC_N 0x77737265
+
+/* XID field: formatID, gtrid_len, bqual_len, xid_data */
+#define TRX_SYS_WSREP_XID_LEN (4 + 4 + 4 + XIDDATASIZE)
+#define TRX_SYS_WSREP_XID_FORMAT 4
+#define TRX_SYS_WSREP_XID_GTRID_LEN 8
+#define TRX_SYS_WSREP_XID_BQUAL_LEN 12
+#define TRX_SYS_WSREP_XID_DATA 16
+#endif /* WITH_WSREP*/
+
/** Doublewrite buffer */
/* @{ */
/** The offset of the doublewrite buffer header on the trx system header page */
diff --git a/storage/xtradb/include/trx0sys.ic b/storage/xtradb/include/trx0sys.ic
index 699148cff6d..6024c1dc94e 100644
--- a/storage/xtradb/include/trx0sys.ic
+++ b/storage/xtradb/include/trx0sys.ic
@@ -474,7 +474,10 @@ trx_id_t
trx_sys_get_new_trx_id(void)
/*========================*/
{
+#ifndef WITH_WSREP
+ /* wsrep_fake_trx_id violates this assert */
ut_ad(mutex_own(&trx_sys->mutex));
+#endif /* WITH_WSREP */
/* VERY important: after the database is started, max_trx_id value is
divisible by TRX_SYS_TRX_ID_WRITE_MARGIN, and the following if
diff --git a/storage/xtradb/include/trx0trx.h b/storage/xtradb/include/trx0trx.h
index aaa74724a14..be13c48fdfc 100644
--- a/storage/xtradb/include/trx0trx.h
+++ b/storage/xtradb/include/trx0trx.h
@@ -1031,6 +1031,9 @@ struct trx_t{
/*------------------------------*/
char detailed_error[256]; /*!< detailed error message for last
error, or empty. */
+#ifdef WITH_WSREP
+ os_event_t wsrep_event; /* event waited for in srv_conc_slot */
+#endif /* WITH_WSREP */
/*------------------------------*/
ulint io_reads;
ib_uint64_t io_read;
diff --git a/storage/xtradb/lock/lock0lock.cc b/storage/xtradb/lock/lock0lock.cc
index 4f9395e27d8..8f7724daad9 100644
--- a/storage/xtradb/lock/lock0lock.cc
+++ b/storage/xtradb/lock/lock0lock.cc
@@ -50,6 +50,11 @@ Created 5/7/1996 Heikki Tuuri
#include "dict0boot.h"
#include <set>
+#ifdef WITH_WSREP
+extern my_bool wsrep_debug;
+extern my_bool wsrep_log_conflicts;
+#include "ha_prototypes.h"
+#endif
/* Restricts the length of search we will do in the waits-for
graph of transactions */
#define LOCK_MAX_N_STEPS_IN_DEADLOCK_CHECK 1000000
@@ -946,6 +951,9 @@ UNIV_INLINE
ibool
lock_rec_has_to_wait(
/*=================*/
+#ifdef WITH_WSREP
+ ibool for_locking, /*!< is caller locking or releasing */
+#endif /* WITH_WSREP */
const trx_t* trx, /*!< in: trx of new lock */
ulint type_mode,/*!< in: precise mode of the new lock
to set: LOCK_S or LOCK_X, possibly
@@ -1016,6 +1024,50 @@ lock_rec_has_to_wait(
return(FALSE);
}
+#ifdef WITH_WSREP
+ /* if BF thread is locking and has conflict with another BF
+ thread, we need to look at trx ordering and lock types */
+ if (for_locking &&
+ wsrep_thd_is_BF(trx->mysql_thd, FALSE) &&
+ wsrep_thd_is_BF(lock2->trx->mysql_thd, TRUE)) {
+
+ if (wsrep_debug) {
+ fprintf(stderr, "\n BF-BF lock conflict \n");
+ lock_rec_print(stderr, lock2);
+ }
+
+ if (wsrep_trx_order_before(trx->mysql_thd,
+ lock2->trx->mysql_thd) &&
+ (type_mode & LOCK_MODE_MASK) == LOCK_X &&
+ (lock2->type_mode & LOCK_MODE_MASK) == LOCK_X)
+ {
+ /* exclusive lock conflicts are not accepted */
+ fprintf(stderr, "BF-BF X lock conflict,"
+ "type_mode: %lu supremum: %lu\n",
+ type_mode, lock_is_on_supremum);
+ fprintf(stderr, "conflicts states: my %d locked %d\n",
+ wsrep_thd_conflict_state(trx->mysql_thd, FALSE),
+ wsrep_thd_conflict_state(lock2->trx->mysql_thd, FALSE) );
+ lock_rec_print(stderr, lock2);
+ return FALSE;
+ //abort();
+ } else {
+ /* if lock2->index->n_uniq <=
+ lock2->index->n_user_defined_cols
+ operation is on uniq index
+ */
+ if (wsrep_debug) fprintf(stderr,
+ "BF conflict, modes: %lu %lu, "
+ "idx: %s-%s n_uniq %u n_user %u\n",
+ type_mode, lock2->type_mode,
+ lock2->index->name,
+ lock2->index->table_name,
+ lock2->index->n_uniq,
+ lock2->index->n_user_defined_cols);
+ return FALSE;
+ }
+ }
+#endif /* WITH_WSREP */
return(TRUE);
}
@@ -1046,7 +1098,11 @@ lock_has_to_wait(
/* If this lock request is for a supremum record
then the second bit on the lock bitmap is set */
+#ifdef WITH_WSREP
+ return(lock_rec_has_to_wait(FALSE, lock1->trx,
+#else
return(lock_rec_has_to_wait(lock1->trx,
+#endif /* WITH_WSREP */
lock1->type_mode, lock2,
lock_rec_get_nth_bit(
lock1, 1)));
@@ -1515,6 +1571,11 @@ lock_rec_has_expl(
return(NULL);
}
+#ifdef WITH_WSREP
+static
+void
+lock_rec_discard(lock_t* in_lock);
+#endif
#ifdef UNIV_DEBUG
/*********************************************************************//**
Checks if some other transaction has a lock request in the queue.
@@ -1561,6 +1622,69 @@ lock_rec_other_has_expl_req(
}
#endif /* UNIV_DEBUG */
+#ifdef WITH_WSREP
+static
+void
+wsrep_kill_victim(
+ const trx_t * const trx,
+ const lock_t *lock)
+{
+ ut_ad(lock_mutex_own());
+ ut_ad(trx_mutex_own(lock->trx));
+ my_bool bf_this = wsrep_thd_is_BF(trx->mysql_thd, FALSE);
+ my_bool bf_other = wsrep_thd_is_BF(lock->trx->mysql_thd, TRUE);
+
+ if ((bf_this && !bf_other) ||
+ (bf_this && bf_other && wsrep_trx_order_before(
+ trx->mysql_thd, lock->trx->mysql_thd))) {
+
+ if (lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
+ if (wsrep_debug) {
+ fprintf(stderr, "WSREP: BF victim waiting\n");
+ }
+ /* cannot release lock, until our lock
+ is in the queue*/
+ } else if (lock->trx != trx) {
+ if (wsrep_log_conflicts) {
+ mutex_enter(&trx_sys->mutex);
+ if (bf_this) {
+ fputs("\n*** Priority TRANSACTION:\n",
+ stderr);
+ } else {
+ fputs("\n*** Victim TRANSACTION:\n",
+ stderr);
+ }
+
+ trx_print_latched(stderr, trx, 3000);
+
+ if (bf_other) {
+ fputs("\n*** Priority TRANSACTION:\n",
+ stderr);
+ } else {
+ fputs("\n*** Victim TRANSACTION:\n",
+ stderr);
+ }
+
+ trx_print_latched(stderr, lock->trx, 3000);
+
+ mutex_exit(&trx_sys->mutex);
+
+ fputs("*** WAITING FOR THIS LOCK TO BE GRANTED:\n",
+ stderr);
+
+ if (lock_get_type(lock) == LOCK_REC) {
+ lock_rec_print(stderr, lock);
+ } else {
+ lock_table_print(stderr, lock);
+ }
+ }
+
+ wsrep_innobase_kill_one_trx(trx->mysql_thd,
+ (const trx_t*) trx, lock->trx, TRUE);
+ }
+ }
+}
+#endif
/*********************************************************************//**
Checks if some other transaction has a conflicting explicit lock request
in the queue, so that we have to wait.
@@ -1589,7 +1713,15 @@ lock_rec_other_has_conflicting(
lock != NULL;
lock = lock_rec_get_next_const(heap_no, lock)) {
- if (lock_rec_has_to_wait(trx, mode, lock, is_supremum)) {
+#ifdef WITH_WSREP
+ if (lock_rec_has_to_wait(TRUE, trx, mode, lock, is_supremum)) {
+ trx_mutex_enter(lock->trx);
+ wsrep_kill_victim((trx_t *)trx, (lock_t *)lock);
+ trx_mutex_exit(lock->trx);
+#else
+ if (lock_rec_has_to_wait(trx, mode, lock, is_supremum)) {
+#endif /* WITH_WSREP */
+
return(lock);
}
}
@@ -1782,6 +1914,28 @@ lock_number_of_rows_locked(
/*============== RECORD LOCK CREATION AND QUEUE MANAGEMENT =============*/
+#ifdef WITH_WSREP
+static
+void
+wsrep_print_wait_locks(
+/*============*/
+ lock_t* c_lock) /* conflicting lock to print */
+{
+ if (wsrep_debug && c_lock->trx->lock.wait_lock != c_lock) {
+ fprintf(stderr, "WSREP: c_lock != wait lock\n");
+ if (lock_get_type_low(c_lock) & LOCK_TABLE)
+ lock_table_print(stderr, c_lock);
+ else
+ lock_rec_print(stderr, c_lock);
+
+ if (lock_get_type_low(c_lock->trx->lock.wait_lock) & LOCK_TABLE)
+ lock_table_print(stderr, c_lock->trx->lock.wait_lock);
+ else
+ lock_rec_print(stderr, c_lock->trx->lock.wait_lock);
+ }
+}
+#endif /* WITH_WSREP */
+
/*********************************************************************//**
Creates a new record lock and inserts it to the lock queue. Does NOT check
for deadlocks or lock compatibility!
@@ -1790,6 +1944,10 @@ static
lock_t*
lock_rec_create(
/*============*/
+#ifdef WITH_WSREP
+ lock_t* const c_lock, /* conflicting lock */
+ que_thr_t* thr,
+#endif
ulint type_mode,/*!< in: lock mode and wait
flag, type is ignored and
replaced by LOCK_REC */
@@ -1861,8 +2019,91 @@ lock_rec_create(
ut_ad(index->table->n_ref_count > 0 || !index->table->can_be_evicted);
+#ifdef WITH_WSREP
+ if (c_lock && wsrep_thd_is_BF(trx->mysql_thd, FALSE)) {
+ lock_t *hash = (lock_t *)c_lock->hash;
+ lock_t *prev = NULL;
+
+ while (hash &&
+ wsrep_thd_is_BF(((lock_t *)hash)->trx->mysql_thd, TRUE) &&
+ wsrep_trx_order_before(
+ ((lock_t *)hash)->trx->mysql_thd,
+ trx->mysql_thd)) {
+ prev = hash;
+ hash = (lock_t *)hash->hash;
+ }
+ lock->hash = hash;
+ if (prev) {
+ prev->hash = lock;
+ } else {
+ c_lock->hash = lock;
+ }
+ /*
+ * delayed conflict resolution '...kill_one_trx' was not called,
+ * if victim was waiting for some other lock
+ */
+ trx_mutex_enter(c_lock->trx);
+ if (c_lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
+
+ c_lock->trx->lock.was_chosen_as_deadlock_victim = TRUE;
+
+ if (wsrep_debug) {
+ wsrep_print_wait_locks(c_lock);
+ }
+
+ trx->lock.que_state = TRX_QUE_LOCK_WAIT;
+ lock_set_lock_and_trx_wait(lock, trx);
+ UT_LIST_ADD_LAST(trx_locks, trx->lock.trx_locks, lock);
+
+ ut_ad(thr != NULL);
+ trx->lock.wait_thr = thr;
+ thr->state = QUE_THR_LOCK_WAIT;
+
+ /* have to release trx mutex for the duration of
+ victim lock release. This will eventually call
+ lock_grant, which wants to grant trx mutex again
+ */
+ if (caller_owns_trx_mutex) {
+ trx_mutex_exit(trx);
+ }
+ lock_cancel_waiting_and_release(
+ c_lock->trx->lock.wait_lock);
+
+ if (caller_owns_trx_mutex) {
+ trx_mutex_enter(trx);
+ }
+
+ /* trx might not wait for c_lock, but some other lock
+ does not matter if wait_lock was released above
+ */
+ if (c_lock->trx->lock.wait_lock == c_lock) {
+ lock_reset_lock_and_trx_wait(lock);
+ }
+
+ trx_mutex_exit(c_lock->trx);
+
+ if (wsrep_debug) {
+ fprintf(
+ stderr,
+ "WSREP: c_lock canceled %llu\n",
+ (ulonglong) c_lock->trx->id);
+ }
+
+ /* have to bail out here to avoid lock_set_lock... */
+ return(lock);
+ }
+ trx_mutex_exit(c_lock->trx);
+ } else if (wsrep_thd_is_BF(trx->mysql_thd, FALSE)) {
+ HASH_PREPEND(lock_t, hash, lock_sys->rec_hash,
+ lock_rec_fold(space, page_no), lock);
+ } else {
+ HASH_INSERT(lock_t, hash, lock_sys->rec_hash,
+ lock_rec_fold(space, page_no), lock);
+ }
+#else
HASH_INSERT(lock_t, hash, lock_sys->rec_hash,
lock_rec_fold(space, page_no), lock);
+#endif /* WITH_WSREP */
lock_sys->rec_num++;
@@ -1872,7 +2113,6 @@ lock_rec_create(
ut_ad(trx_mutex_own(trx));
if (type_mode & LOCK_WAIT) {
-
lock_set_lock_and_trx_wait(lock, trx);
}
@@ -1884,7 +2124,6 @@ lock_rec_create(
MONITOR_INC(MONITOR_RECLOCK_CREATED);
MONITOR_INC(MONITOR_NUM_RECLOCK);
-
return(lock);
}
@@ -1899,6 +2138,9 @@ static
dberr_t
lock_rec_enqueue_waiting(
/*=====================*/
+#ifdef WITH_WSREP
+ lock_t* c_lock, /* conflicting lock */
+#endif
ulint type_mode,/*!< in: lock mode this
transaction is requesting:
LOCK_S or LOCK_X, possibly
@@ -1959,7 +2201,10 @@ lock_rec_enqueue_waiting(
/* Enqueue the lock request that will wait to be granted, note that
we already own the trx mutex. */
lock = lock_rec_create(
- type_mode | LOCK_WAIT, block, heap_no, index, trx, TRUE);
+#ifdef WITH_WSREP
+ c_lock, thr,
+#endif /* WITH_WSREP */
+ type_mode | LOCK_WAIT, block, heap_no, index, trx, TRUE);
/* Release the mutex to obey the latching order.
This is safe, because lock_deadlock_check_and_resolve()
@@ -2064,7 +2309,19 @@ lock_rec_add_to_queue(
const lock_t* other_lock
= lock_rec_other_has_expl_req(mode, 0, LOCK_WAIT,
block, heap_no, trx->id);
+#ifdef WITH_WSREP
+ /* this can potentionally assert with wsrep */
+ if (wsrep_thd_is_wsrep(trx->mysql_thd)) {
+ if (wsrep_debug && other_lock) {
+ fprintf(stderr,
+ "WSREP: InnoDB assert ignored\n");
+ }
+ } else {
+ ut_a(!other_lock);
+ }
+#else
ut_a(!other_lock);
+#endif /* WITH_WSREP */
}
#endif /* UNIV_DEBUG */
@@ -2092,7 +2349,16 @@ lock_rec_add_to_queue(
if (lock_get_wait(lock)
&& lock_rec_get_nth_bit(lock, heap_no)) {
-
+#ifdef WITH_WSREP
+ if (wsrep_thd_is_BF(trx->mysql_thd, FALSE)) {
+ if (wsrep_debug) {
+ fprintf(stderr,
+ "BF skipping wait: %lu\n",
+ trx->id);
+ lock_rec_print(stderr, lock);
+ }
+ } else
+#endif
goto somebody_waits;
}
}
@@ -2115,9 +2381,15 @@ lock_rec_add_to_queue(
}
somebody_waits:
+#ifdef WITH_WSREP
+ return(lock_rec_create(NULL, NULL,
+ type_mode, block, heap_no, index, trx,
+ caller_owns_trx_mutex));
+#else
return(lock_rec_create(
type_mode, block, heap_no, index, trx,
caller_owns_trx_mutex));
+#endif /* WITH_WSREP */
}
/** Record locking request status */
@@ -2180,8 +2452,13 @@ lock_rec_lock_fast(
if (lock == NULL) {
if (!impl) {
/* Note that we don't own the trx mutex. */
+#ifdef WITH_WSREP
+ lock = lock_rec_create(NULL, thr,
+ mode, block, heap_no, index, trx, FALSE);
+#else
lock = lock_rec_create(
mode, block, heap_no, index, trx, FALSE);
+#endif
}
status = LOCK_REC_SUCCESS_CREATED;
@@ -2235,6 +2512,9 @@ lock_rec_lock_slow(
que_thr_t* thr) /*!< in: query thread */
{
trx_t* trx;
+#ifdef WITH_WSREP
+ lock_t* c_lock(NULL);
+#endif
dberr_t err = DB_SUCCESS;
ut_ad(lock_mutex_own());
@@ -2259,17 +2539,31 @@ lock_rec_lock_slow(
/* The trx already has a strong enough lock on rec: do
nothing */
+#ifdef WITH_WSREP
+ } else if ((c_lock = (lock_t *)lock_rec_other_has_conflicting(
+ static_cast<enum lock_mode>(mode),
+ block, heap_no, trx))) {
+#else
} else if (lock_rec_other_has_conflicting(
static_cast<enum lock_mode>(mode),
block, heap_no, trx)) {
+#endif /* WITH_WSREP */
/* If another transaction has a non-gap conflicting
request in the queue, as this transaction does not
have a lock strong enough already granted on the
record, we have to wait. */
+#ifdef WITH_WSREP
+ /* c_lock is NULL here if jump to enqueue_waiting happened
+ but it's ok because lock is not NULL in that case and c_lock
+ is not used. */
+ err = lock_rec_enqueue_waiting(c_lock,
+ mode, block, heap_no, index, thr);
+#else
err = lock_rec_enqueue_waiting(
mode, block, heap_no, index, thr);
+#endif /* WITH_WSREP */
} else if (!impl) {
/* Set the requested lock on the record, note that
@@ -2321,6 +2615,7 @@ lock_rec_lock(
ut_ad(mode - (LOCK_MODE_MASK & mode) == LOCK_GAP
|| mode - (LOCK_MODE_MASK & mode) == LOCK_REC_NOT_GAP
|| mode - (LOCK_MODE_MASK & mode) == 0);
+
ut_ad(dict_index_is_clust(index) || !dict_index_is_online_ddl(index));
/* We try a simplified and faster subroutine for the most
@@ -2375,7 +2670,13 @@ lock_rec_has_to_wait_in_queue(
if (heap_no < lock_rec_get_n_bits(lock)
&& (p[bit_offset] & bit_mask)
&& lock_has_to_wait(wait_lock, lock)) {
-
+#ifdef WITH_WSREP
+ if (wsrep_thd_is_BF(wait_lock->trx->mysql_thd, FALSE) &&
+ wsrep_thd_is_BF(lock->trx->mysql_thd, TRUE)) {
+ /* don't wait for another BF lock */
+ continue;
+ }
+#endif
return(lock);
}
}
@@ -3761,10 +4062,22 @@ lock_deadlock_select_victim(
/* The joining transaction is 'smaller',
choose it as the victim and roll it back. */
- return(ctx->start);
+#ifdef WITH_WSREP
+ if (wsrep_thd_is_BF(ctx->start->mysql_thd, TRUE)) {
+ return(ctx->wait_lock->trx);
+ }
+ else
+#endif /* WITH_WSREP */
+ return(ctx->start);
}
- return(ctx->wait_lock->trx);
+#ifdef WITH_WSREP
+ if (wsrep_thd_is_BF(ctx->wait_lock->trx->mysql_thd, TRUE)) {
+ return(ctx->start);
+ }
+ else
+#endif /* WITH_WSREP */
+ return(ctx->wait_lock->trx);
}
/********************************************************************//**
@@ -3893,8 +4206,14 @@ lock_deadlock_search(
ctx->too_deep = TRUE;
+#ifdef WITH_WSREP
+ if (wsrep_thd_is_BF(ctx->start->mysql_thd, TRUE)) {
+ return(ctx->wait_lock->trx->id);
+ }
+ else
+#endif /* WITH_WSREP */
/* Select the joining transaction as the victim. */
- return(ctx->start->id);
+ return(ctx->start->id);
} else if (lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
@@ -3911,7 +4230,12 @@ lock_deadlock_search(
ctx->too_deep = TRUE;
- return(ctx->start->id);
+#ifdef WITH_WSREP
+ if (wsrep_thd_is_BF(ctx->start->mysql_thd, TRUE))
+ return(lock->trx->id);
+ else
+#endif /* WITH_WSREP */
+ return(ctx->start->id);
}
ctx->wait_lock = lock->trx->lock.wait_lock;
@@ -4029,9 +4353,18 @@ lock_deadlock_check_and_resolve(
ut_a(trx == ctx.start);
ut_a(victim_trx_id == trx->id);
- if (!srv_read_only_mode) {
- lock_deadlock_joining_trx_print(trx, lock);
+#ifdef WITH_WSREP
+ if (!wsrep_thd_is_BF(ctx.start->mysql_thd, TRUE))
+ {
+#endif /* WITH_WSREP */
+ if (!srv_read_only_mode) {
+ lock_deadlock_joining_trx_print(trx, lock);
+ }
+#ifdef WITH_WSREP
+ } else {
+ /* BF processor */;
}
+#endif /* WITH_WSREP */
MONITOR_INC(MONITOR_DEADLOCK);
@@ -4071,6 +4404,9 @@ UNIV_INLINE
lock_t*
lock_table_create(
/*==============*/
+#ifdef WITH_WSREP
+ lock_t* c_lock, /*!< in: conflicting lock */
+#endif
dict_table_t* table, /*!< in/out: database table
in dictionary cache */
ulint type_mode,/*!< in: lock mode possibly ORed with
@@ -4114,7 +4450,59 @@ lock_table_create(
ut_ad(table->n_ref_count > 0 || !table->can_be_evicted);
UT_LIST_ADD_LAST(trx_locks, trx->lock.trx_locks, lock);
+
+#ifdef WITH_WSREP
+ if (wsrep_thd_is_wsrep(trx->mysql_thd)) {
+ if (c_lock && wsrep_thd_is_BF(trx->mysql_thd, FALSE)) {
+ UT_LIST_INSERT_AFTER(
+ un_member.tab_lock.locks, table->locks, c_lock, lock);
+ } else {
+ UT_LIST_ADD_LAST(un_member.tab_lock.locks, table->locks, lock);
+ }
+
+ if (c_lock) {
+ trx_mutex_enter(c_lock->trx);
+ }
+
+ if (c_lock && c_lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
+
+ c_lock->trx->lock.was_chosen_as_deadlock_victim = TRUE;
+
+ if (wsrep_debug) {
+ wsrep_print_wait_locks(c_lock);
+ wsrep_print_wait_locks(c_lock->trx->lock.wait_lock);
+ }
+
+ /* have to release trx mutex for the duration of
+ victim lock release. This will eventually call
+ lock_grant, which wants to grant trx mutex again
+ */
+ /* caller has trx_mutex, have to release for lock cancel */
+ trx_mutex_exit(trx);
+ lock_cancel_waiting_and_release(c_lock->trx->lock.wait_lock);
+ trx_mutex_enter(trx);
+
+ /* trx might not wait for c_lock, but some other lock
+ does not matter if wait_lock was released above
+ */
+ if (c_lock->trx->lock.wait_lock == c_lock) {
+ lock_reset_lock_and_trx_wait(lock);
+ }
+
+ if (wsrep_debug) {
+ fprintf(stderr, "WSREP: c_lock canceled %llu\n",
+ (ulonglong) c_lock->trx->id);
+ }
+ }
+ if (c_lock) {
+ trx_mutex_exit(c_lock->trx);
+ }
+ } else {
+ UT_LIST_ADD_LAST(un_member.tab_lock.locks, table->locks, lock);
+ }
+#else
UT_LIST_ADD_LAST(un_member.tab_lock.locks, table->locks, lock);
+#endif /* WITH_WSREP */
if (UNIV_UNLIKELY(type_mode & LOCK_WAIT)) {
@@ -4271,6 +4659,9 @@ static
dberr_t
lock_table_enqueue_waiting(
/*=======================*/
+#ifdef WITH_WSREP
+ lock_t* c_lock, /*!< in: conflicting lock */
+#endif
ulint mode, /*!< in: lock mode this transaction is
requesting */
dict_table_t* table, /*!< in/out: table */
@@ -4317,7 +4708,14 @@ lock_table_enqueue_waiting(
/* Enqueue the lock request that will wait to be granted */
+#ifdef WITH_WSREP
+ if (trx->lock.was_chosen_as_deadlock_victim) {
+ return(DB_DEADLOCK);
+ }
+ lock = lock_table_create(c_lock, table, mode | LOCK_WAIT, trx);
+#else
lock = lock_table_create(table, mode | LOCK_WAIT, trx);
+#endif /* WITH_WSREP */
/* Release the mutex to obey the latching order.
This is safe, because lock_deadlock_check_and_resolve()
@@ -4394,6 +4792,18 @@ lock_table_other_has_incompatible(
&& !lock_mode_compatible(lock_get_mode(lock), mode)
&& (wait || !lock_get_wait(lock))) {
+#ifdef WITH_WSREP
+ if(wsrep_thd_is_wsrep(trx->mysql_thd)) {
+ if (wsrep_debug) {
+ fprintf(stderr, "WSREP: trx %ld table lock abort\n",
+ trx->id);
+ }
+ trx_mutex_enter(lock->trx);
+ wsrep_kill_victim((trx_t *)trx, (lock_t *)lock);
+ trx_mutex_exit(lock->trx);
+ }
+#endif
+
return(lock);
}
}
@@ -4416,6 +4826,9 @@ lock_table(
enum lock_mode mode, /*!< in: lock mode */
que_thr_t* thr) /*!< in: query thread */
{
+#ifdef WITH_WSREP
+ lock_t *c_lock = NULL;
+#endif
trx_t* trx;
dberr_t err;
const lock_t* wait_for;
@@ -4450,8 +4863,13 @@ lock_table(
/* We have to check if the new lock is compatible with any locks
other transactions have in the table lock queue. */
+#ifdef WITH_WSREP
+ wait_for = lock_table_other_has_incompatible(
+ trx, LOCK_WAIT, table, mode);
+#else
wait_for = lock_table_other_has_incompatible(
trx, LOCK_WAIT, table, mode);
+#endif
trx_mutex_enter(trx);
@@ -4459,9 +4877,17 @@ lock_table(
mode: this trx may have to wait */
if (wait_for != NULL) {
+#ifdef WITH_WSREP
+ err = lock_table_enqueue_waiting((ib_lock_t*)wait_for, mode | flags, table, thr);
+#else
err = lock_table_enqueue_waiting(mode | flags, table, thr);
+#endif
} else {
+#ifdef WITH_WSREP
+ lock_table_create(c_lock, table, mode | flags, trx);
+#else
lock_table_create(table, mode | flags, trx);
+#endif
ut_a(!flags || mode == LOCK_S || mode == LOCK_X);
@@ -4499,7 +4925,11 @@ lock_table_ix_resurrect(
trx, LOCK_WAIT, table, LOCK_IX));
trx_mutex_enter(trx);
+#ifdef WITH_WSREP
+ lock_table_create(NULL, table, LOCK_IX, trx);
+#else
lock_table_create(table, LOCK_IX, trx);
+#endif
lock_mutex_exit();
trx_mutex_exit(trx);
}
@@ -5640,16 +6070,20 @@ lock_rec_queue_validate(
if (!lock_rec_get_gap(lock) && !lock_get_wait(lock)) {
- enum lock_mode mode;
+#ifndef WITH_WSREP
+ if (wsrep_thd_is_wsrep(lock->trx->mysql_thd)) {
+ enum lock_mode mode;
- if (lock_get_mode(lock) == LOCK_S) {
- mode = LOCK_X;
- } else {
- mode = LOCK_S;
+
+ if (lock_get_mode(lock) == LOCK_S) {
+ mode = LOCK_X;
+ } else {
+ mode = LOCK_S;
+ }
+ ut_a(!lock_rec_other_has_expl_req(
+ mode, 0, 0, block, heap_no, lock->trx->id));
}
- ut_a(!lock_rec_other_has_expl_req(
- mode, 0, 0, block, heap_no,
- lock->trx->id));
+#endif /* WITH_WSREP */
} else if (lock_get_wait(lock) && !lock_rec_get_gap(lock)) {
@@ -5953,6 +6387,9 @@ lock_rec_insert_check_and_lock(
lock_t* lock;
dberr_t err;
ulint next_rec_heap_no;
+#ifdef WITH_WSREP
+ lock_t* c_lock=NULL;
+#endif
ut_ad(block->frame == page_align(rec));
ut_ad(!dict_index_is_online_ddl(index)
@@ -6014,17 +6451,30 @@ lock_rec_insert_check_and_lock(
had to wait for their insert. Both had waiting gap type lock requests
on the successor, which produced an unnecessary deadlock. */
+#ifdef WITH_WSREP
+ if ((c_lock = (ib_lock_t*)lock_rec_other_has_conflicting(
+ static_cast<enum lock_mode>(
+ LOCK_X | LOCK_GAP | LOCK_INSERT_INTENTION),
+ block, next_rec_heap_no, trx))) {
+#else
if (lock_rec_other_has_conflicting(
static_cast<enum lock_mode>(
LOCK_X | LOCK_GAP | LOCK_INSERT_INTENTION),
block, next_rec_heap_no, trx)) {
+#endif /* WITH_WSREP */
/* Note that we may get DB_SUCCESS also here! */
trx_mutex_enter(trx);
+#ifdef WITH_WSREP
+ err = lock_rec_enqueue_waiting(c_lock,
+ LOCK_X | LOCK_GAP | LOCK_INSERT_INTENTION,
+ block, next_rec_heap_no, index, thr);
+#else
err = lock_rec_enqueue_waiting(
LOCK_X | LOCK_GAP | LOCK_INSERT_INTENTION,
block, next_rec_heap_no, index, thr);
+#endif /* WITH_WSREP */
trx_mutex_exit(trx);
} else {
diff --git a/storage/xtradb/lock/lock0wait.cc b/storage/xtradb/lock/lock0wait.cc
index a1c35e20ead..388c847f580 100644
--- a/storage/xtradb/lock/lock0wait.cc
+++ b/storage/xtradb/lock/lock0wait.cc
@@ -184,6 +184,28 @@ lock_wait_table_reserve_slot(
return(NULL);
}
+#ifdef WITH_WSREP
+/*********************************************************************//**
+check if lock timeout was for priority thread,
+as a side effect trigger lock monitor
+@return false for regular lock timeout */
+static ibool
+wsrep_is_BF_lock_timeout(
+/*====================*/
+ trx_t* trx) /* in: trx to check for lock priority */
+{
+ if (wsrep_on(trx->mysql_thd) &&
+ wsrep_thd_is_BF(trx->mysql_thd, FALSE)) {
+ fprintf(stderr, "WSREP: BF lock wait long\n");
+ srv_print_innodb_monitor = TRUE;
+ srv_print_innodb_lock_monitor = TRUE;
+ os_event_set(srv_monitor_event);
+ return TRUE;
+ }
+ return FALSE;
+ }
+#endif /* WITH_WSREP */
+
/***************************************************************//**
Puts a user OS thread to wait for a lock to be released. If an error
occurs during the wait trx->error_state associated with thr is
@@ -375,9 +397,15 @@ lock_wait_suspend_thread(
if (lock_wait_timeout < 100000000
&& wait_time > (double) lock_wait_timeout) {
+#ifdef WITH_WSREP
+ if (!wsrep_is_BF_lock_timeout(trx)) {
+#endif /* WITH_WSREP */
trx->error_state = DB_LOCK_WAIT_TIMEOUT;
+#ifdef WITH_WSREP
+ }
+#endif /* WITH_WSREP */
MONITOR_INC(MONITOR_TIMEOUT);
}
@@ -461,8 +489,13 @@ lock_wait_check_and_cancel(
if (trx->lock.wait_lock) {
ut_a(trx->lock.que_state == TRX_QUE_LOCK_WAIT);
-
+#ifdef WITH_WSREP
+ if (!wsrep_is_BF_lock_timeout(trx)) {
+#endif /* WITH_WSREP */
lock_cancel_waiting_and_release(trx->lock.wait_lock);
+#ifdef WITH_WSREP
+ }
+#endif /* WITH_WSREP */
}
lock_mutex_exit();
diff --git a/storage/xtradb/os/os0file.cc b/storage/xtradb/os/os0file.cc
index 394c3199f76..edcb6662fe9 100644
--- a/storage/xtradb/os/os0file.cc
+++ b/storage/xtradb/os/os0file.cc
@@ -110,6 +110,12 @@ UNIV_INTERN os_ib_mutex_t os_file_seek_mutexes[OS_FILE_N_SEEK_MUTEXES];
/* In simulated aio, merge at most this many consecutive i/os */
#define OS_AIO_MERGE_N_CONSECUTIVE 64
+#ifdef WITH_INNODB_DISALLOW_WRITES
+#define WAIT_ALLOW_WRITES() os_event_wait(srv_allow_writes_event)
+#else
+#define WAIT_ALLOW_WRITES() do { } while (0)
+#endif /* WITH_INNODB_DISALLOW_WRITES */
+
/**********************************************************************
InnoDB AIO Implementation:
@@ -999,7 +1005,9 @@ os_file_create_tmpfile(void)
/*========================*/
{
FILE* file = NULL;
- int fd = innobase_mysql_tmpfile();
+ int fd;
+ WAIT_ALLOW_WRITES();
+ fd = innobase_mysql_tmpfile();
ut_ad(!srv_read_only_mode);
@@ -1325,6 +1333,7 @@ os_file_create_directory(
return(TRUE);
#else
int rcode;
+ WAIT_ALLOW_WRITES();
rcode = mkdir(pathname, 0770);
@@ -1451,6 +1460,8 @@ os_file_create_simple_func(
#else /* __WIN__ */
int create_flag;
+ if (create_mode != OS_FILE_OPEN && create_mode != OS_FILE_OPEN_RAW)
+ WAIT_ALLOW_WRITES();
ut_a(!(create_mode & OS_FILE_ON_ERROR_SILENT));
ut_a(!(create_mode & OS_FILE_ON_ERROR_NO_EXIT));
@@ -1638,6 +1649,8 @@ os_file_create_simple_no_error_handling_func(
int create_flag;
ut_a(name);
+ if (create_mode != OS_FILE_OPEN && create_mode != OS_FILE_OPEN_RAW)
+ WAIT_ALLOW_WRITES();
ut_a(!(create_mode & OS_FILE_ON_ERROR_SILENT));
ut_a(!(create_mode & OS_FILE_ON_ERROR_NO_EXIT));
@@ -2013,6 +2026,8 @@ os_file_create_func(
#else /* __WIN__ */
int create_flag;
const char* mode_str = NULL;
+ if (create_mode != OS_FILE_OPEN && create_mode != OS_FILE_OPEN_RAW)
+ WAIT_ALLOW_WRITES();
on_error_no_exit = create_mode & OS_FILE_ON_ERROR_NO_EXIT
? TRUE : FALSE;
@@ -2212,6 +2227,7 @@ loop:
goto loop;
#else
int ret;
+ WAIT_ALLOW_WRITES();
ret = unlink(name);
@@ -2276,6 +2292,7 @@ loop:
goto loop;
#else
int ret;
+ WAIT_ALLOW_WRITES();
ret = unlink(name);
@@ -2329,6 +2346,7 @@ os_file_rename_func(
return(FALSE);
#else
int ret;
+ WAIT_ALLOW_WRITES();
ret = rename(oldpath, newpath);
@@ -2562,6 +2580,7 @@ os_file_set_eof(
HANDLE h = (HANDLE) _get_osfhandle(fileno(file));
return(SetEndOfFile(h));
#else /* __WIN__ */
+ WAIT_ALLOW_WRITES();
return(!ftruncate(fileno(file), ftell(file)));
#endif /* __WIN__ */
}
@@ -2581,6 +2600,7 @@ os_file_set_eof_at(
return(SetFilePointerEx(file, li, &li2,FILE_BEGIN)
&& SetEndOfFile(file));
#else
+ WAIT_ALLOW_WRITES();
/* TODO: works only with -D_FILE_OFFSET_BITS=64 ? */
return(!ftruncate(file, new_len));
#endif
@@ -2680,6 +2700,7 @@ os_file_flush_func(
return(FALSE);
#else
int ret;
+ WAIT_ALLOW_WRITES();
#if defined(HAVE_DARWIN_THREADS)
# ifndef F_FULLFSYNC
@@ -3382,6 +3403,7 @@ retry:
return(FALSE);
#else
ssize_t ret;
+ WAIT_ALLOW_WRITES();
ret = os_file_pwrite(file, buf, n, offset);
diff --git a/storage/xtradb/rem/rem0rec.cc b/storage/xtradb/rem/rem0rec.cc
index 0d7b7c16785..ca9b0a6b841 100644
--- a/storage/xtradb/rem/rem0rec.cc
+++ b/storage/xtradb/rem/rem0rec.cc
@@ -33,6 +33,9 @@ Created 5/30/1994 Heikki Tuuri
#include "mtr0mtr.h"
#include "mtr0log.h"
#include "fts0fts.h"
+#ifdef WITH_WSREP
+#include <ha_prototypes.h>
+#endif /* WITH_WSREP */
/* PHYSICAL RECORD (OLD STYLE)
===========================
@@ -1917,6 +1920,138 @@ rec_print(
}
}
}
+#endif /* !UNIV_HOTBACKUP */
+
+#ifdef WITH_WSREP
+int
+wsrep_rec_get_foreign_key(
+ byte *buf, /* out: extracted key */
+ ulint *buf_len, /* in/out: length of buf */
+ const rec_t* rec, /* in: physical record */
+ dict_index_t* index_for, /* in: index in foreign table */
+ dict_index_t* index_ref, /* in: index in referenced table */
+ ibool new_protocol) /* in: protocol > 1 */
+{
+ const byte* data;
+ ulint len;
+ ulint key_len = 0;
+ ulint i;
+ uint key_parts;
+ mem_heap_t* heap = NULL;
+ ulint offsets_[REC_OFFS_NORMAL_SIZE];
+ const ulint* offsets;
+
+ ut_ad(index_for);
+ ut_ad(index_ref);
+
+ rec_offs_init(offsets_);
+ offsets = rec_get_offsets(rec, index_for, offsets_,
+ ULINT_UNDEFINED, &heap);
+
+ ut_ad(rec_offs_validate(rec, NULL, offsets));
+
+ ut_ad(rec);
+
+ key_parts = dict_index_get_n_unique_in_tree(index_for);
+ for (i = 0;
+ i < key_parts &&
+ (index_for->type & DICT_CLUSTERED || i < key_parts - 1);
+ i++) {
+ dict_field_t* field_f =
+ dict_index_get_nth_field(index_for, i);
+ const dict_col_t* col_f = dict_field_get_col(field_f);
+ dict_field_t* field_r =
+ dict_index_get_nth_field(index_ref, i);
+ const dict_col_t* col_r = dict_field_get_col(field_r);
+
+ data = rec_get_nth_field(rec, offsets, i, &len);
+ if (key_len + ((len != UNIV_SQL_NULL) ? len + 1 : 1) >
+ *buf_len) {
+ fprintf (stderr,
+ "WSREP: FK key len exceeded %lu %lu %lu\n",
+ key_len, len, *buf_len);
+ goto err_out;
+ }
+
+ if (len == UNIV_SQL_NULL) {
+ ut_a(!(col_f->prtype & DATA_NOT_NULL));
+ *buf++ = 1;
+ key_len++;
+ } else if (!new_protocol) {
+ if (!(col_r->prtype & DATA_NOT_NULL)) {
+ *buf++ = 0;
+ key_len++;
+ }
+ memcpy(buf, data, len);
+ *buf_len = wsrep_innobase_mysql_sort(
+ (int)(col_f->prtype & DATA_MYSQL_TYPE_MASK),
+ (uint)dtype_get_charset_coll(col_f->prtype),
+ buf, len, *buf_len);
+ } else { /* new protocol */
+ if (!(col_r->prtype & DATA_NOT_NULL)) {
+ *buf++ = 0;
+ key_len++;
+ }
+ switch (col_f->mtype) {
+ case DATA_INT: {
+ byte* ptr = buf+len;
+ for (;;) {
+ ptr--;
+ *ptr = *data;
+ if (ptr == buf) {
+ break;
+ }
+ data++;
+ }
+
+ if (!(col_f->prtype & DATA_UNSIGNED)) {
+ buf[len-1] = (byte) (buf[len-1] ^ 128);
+ }
+
+ break;
+ }
+ case DATA_VARCHAR:
+ case DATA_VARMYSQL:
+ case DATA_CHAR:
+ case DATA_MYSQL:
+ /* Copy the actual data */
+ ut_memcpy(buf, data, len);
+ len = wsrep_innobase_mysql_sort(
+ (int)
+ (col_f->prtype & DATA_MYSQL_TYPE_MASK),
+ (uint)
+ dtype_get_charset_coll(col_f->prtype),
+ buf, len, *buf_len);
+ break;
+ case DATA_BLOB:
+ case DATA_BINARY:
+ memcpy(buf, data, len);
+ break;
+ default:
+ break;
+ }
+
+ key_len += len;
+ buf += len;
+ }
+ }
+
+ rec_validate(rec, offsets);
+
+ if (UNIV_LIKELY_NULL(heap)) {
+ mem_heap_free(heap);
+ }
+
+ *buf_len = key_len;
+ return DB_SUCCESS;
+
+ err_out:
+ if (UNIV_LIKELY_NULL(heap)) {
+ mem_heap_free(heap);
+ }
+ return DB_ERROR;
+}
+#endif /* WITH_WSREP */
# ifdef UNIV_DEBUG
/************************************************************//**
@@ -1959,5 +2094,5 @@ rec_get_trx_id(
return(trx_read_trx_id(trx_id));
}
-# endif /* UNIV_DEBUG */
-#endif /* !UNIV_HOTBACKUP */
+#endif /* UNIV_DEBUG */
+
diff --git a/storage/xtradb/row/row0ins.cc b/storage/xtradb/row/row0ins.cc
index f8ca40fac12..444fac87842 100644
--- a/storage/xtradb/row/row0ins.cc
+++ b/storage/xtradb/row/row0ins.cc
@@ -918,6 +918,14 @@ row_ins_invalidate_query_cache(
innobase_invalidate_query_cache(thr_get_trx(thr), buf, len);
mem_free(buf);
}
+#ifdef WITH_WSREP
+dberr_t wsrep_append_foreign_key(trx_t *trx,
+ dict_foreign_t* foreign,
+ const rec_t* clust_rec,
+ dict_index_t* clust_index,
+ ibool referenced,
+ ibool shared);
+#endif /* WITH_WSREP */
/*********************************************************************//**
Perform referential actions or checks when a parent row is deleted or updated
@@ -1269,7 +1277,19 @@ row_ins_foreign_check_on_constraint(
cascade->state = UPD_NODE_UPDATE_CLUSTERED;
- err = row_update_cascade_for_mysql(thr, cascade,
+#ifdef WITH_WSREP
+ err = wsrep_append_foreign_key(
+ thr_get_trx(thr),
+ foreign,
+ clust_rec,
+ clust_index,
+ FALSE, FALSE);
+ if (err != DB_SUCCESS) {
+ fprintf(stderr,
+ "WSREP: foreign key append failed: %d\n", err);
+ } else
+#endif /* WITH_WSREP */
+ err = row_update_cascade_for_mysql(thr, cascade,
foreign->foreign_table);
if (foreign->foreign_table->n_foreign_key_checks_running == 0) {
@@ -1607,7 +1627,14 @@ run_again:
if (check_ref) {
err = DB_SUCCESS;
-
+#ifdef WITH_WSREP
+ err = wsrep_append_foreign_key(
+ thr_get_trx(thr),
+ foreign,
+ rec,
+ check_index,
+ check_ref, TRUE);
+#endif /* WITH_WSREP */
goto end_scan;
} else if (foreign->type != 0) {
/* There is an ON UPDATE or ON DELETE
diff --git a/storage/xtradb/row/row0upd.cc b/storage/xtradb/row/row0upd.cc
index 3ead385c2cd..bdd2a691368 100644
--- a/storage/xtradb/row/row0upd.cc
+++ b/storage/xtradb/row/row0upd.cc
@@ -53,6 +53,9 @@ Created 12/27/1996 Heikki Tuuri
#include "pars0sym.h"
#include "eval0eval.h"
#include "buf0lru.h"
+#ifdef WITH_WSREP
+extern my_bool wsrep_debug;
+#endif
/* What kind of latch and lock can we assume when the control comes to
@@ -172,6 +175,50 @@ func_exit:
return(is_referenced);
}
+#ifdef WITH_WSREP
+static
+ibool
+wsrep_row_upd_index_is_foreign(
+/*========================*/
+ dict_index_t* index, /*!< in: index */
+ trx_t* trx) /*!< in: transaction */
+{
+ dict_table_t* table = index->table;
+ dict_foreign_t* foreign;
+ ibool froze_data_dict = FALSE;
+ ibool is_referenced = FALSE;
+
+ if (!UT_LIST_GET_FIRST(table->foreign_list)) {
+
+ return(FALSE);
+ }
+
+ if (trx->dict_operation_lock_mode == 0) {
+ row_mysql_freeze_data_dictionary(trx);
+ froze_data_dict = TRUE;
+ }
+
+ foreign = UT_LIST_GET_FIRST(table->foreign_list);
+
+ while (foreign) {
+ if (foreign->foreign_index == index) {
+
+ is_referenced = TRUE;
+ goto func_exit;
+ }
+
+ foreign = UT_LIST_GET_NEXT(foreign_list, foreign);
+ }
+
+func_exit:
+ if (froze_data_dict) {
+ row_mysql_unfreeze_data_dictionary(trx);
+ }
+
+ return(is_referenced);
+}
+#endif /* WITH_WSREP */
+
/*********************************************************************//**
Checks if possible foreign key constraints hold after a delete of the record
under pcur.
@@ -289,7 +336,123 @@ run_again:
}
err = DB_SUCCESS;
+func_exit:
+ if (got_s_lock) {
+ row_mysql_unfreeze_data_dictionary(trx);
+ }
+
+ mem_heap_free(heap);
+ return(err);
+}
+#ifdef WITH_WSREP
+static
+dberr_t
+wsrep_row_upd_check_foreign_constraints(
+/*=================================*/
+ upd_node_t* node, /*!< in: row update node */
+ btr_pcur_t* pcur, /*!< in: cursor positioned on a record; NOTE: the
+ cursor position is lost in this function! */
+ dict_table_t* table, /*!< in: table in question */
+ dict_index_t* index, /*!< in: index of the cursor */
+ ulint* offsets,/*!< in/out: rec_get_offsets(pcur.rec, index) */
+ que_thr_t* thr, /*!< in: query thread */
+ mtr_t* mtr) /*!< in: mtr */
+{
+ dict_foreign_t* foreign;
+ mem_heap_t* heap;
+ dtuple_t* entry;
+ trx_t* trx;
+ const rec_t* rec;
+ ulint n_ext;
+ dberr_t err;
+ ibool got_s_lock = FALSE;
+ ibool opened = FALSE;
+
+ if (UT_LIST_GET_FIRST(table->foreign_list) == NULL) {
+
+ return(DB_SUCCESS);
+ }
+
+ trx = thr_get_trx(thr);
+
+ /* TODO: make native slave thread bail out here */
+
+ rec = btr_pcur_get_rec(pcur);
+ ut_ad(rec_offs_validate(rec, index, offsets));
+
+ heap = mem_heap_create(500);
+
+ entry = row_rec_to_index_entry(rec, index, offsets,
+ &n_ext, heap);
+
+ mtr_commit(mtr);
+
+ mtr_start(mtr);
+
+ if (trx->dict_operation_lock_mode == 0) {
+ got_s_lock = TRUE;
+
+ row_mysql_freeze_data_dictionary(trx);
+ }
+
+ foreign = UT_LIST_GET_FIRST(table->foreign_list);
+
+ while (foreign) {
+ /* Note that we may have an update which updates the index
+ record, but does NOT update the first fields which are
+ referenced in a foreign key constraint. Then the update does
+ NOT break the constraint. */
+
+ if (foreign->foreign_index == index
+ && (node->is_delete
+ || row_upd_changes_first_fields_binary(
+ entry, index, node->update,
+ foreign->n_fields))) {
+
+ if (foreign->referenced_table == NULL) {
+ foreign->referenced_table =
+ dict_table_open_on_name(
+ foreign->referenced_table_name_lookup,
+ FALSE, FALSE, DICT_ERR_IGNORE_NONE);
+ opened = TRUE;
+ }
+
+ if (foreign->referenced_table) {
+ os_inc_counter(dict_sys->mutex,
+ foreign->referenced_table
+ ->n_foreign_key_checks_running);
+ }
+
+ /* NOTE that if the thread ends up waiting for a lock
+ we will release dict_operation_lock temporarily!
+ But the counter on the table protects 'foreign' from
+ being dropped while the check is running. */
+
+ err = row_ins_check_foreign_constraint(
+ TRUE, foreign, table, entry, thr);
+
+ if (foreign->referenced_table) {
+ os_dec_counter(dict_sys->mutex,
+ foreign->referenced_table
+ ->n_foreign_key_checks_running);
+
+ if (opened == TRUE) {
+ dict_table_close(foreign->referenced_table, TRUE, FALSE);
+ opened = FALSE;
+ }
+ }
+
+ if (err != DB_SUCCESS) {
+
+ goto func_exit;
+ }
+ }
+
+ foreign = UT_LIST_GET_NEXT(foreign_list, foreign);
+ }
+
+ err = DB_SUCCESS;
func_exit:
if (got_s_lock) {
row_mysql_unfreeze_data_dictionary(trx);
@@ -301,6 +464,7 @@ func_exit:
return(err);
}
+#endif /* WITH_WSREP */
/*********************************************************************//**
Creates an update node for a query graph.
@@ -1675,6 +1839,9 @@ row_upd_sec_index_entry(
index = node->index;
referenced = row_upd_index_is_referenced(index, trx);
+#ifdef WITH_WSREP
+ ibool foreign = wsrep_row_upd_index_is_foreign(index, trx);
+#endif /* WITH_WSREP */
heap = mem_heap_create(1024);
@@ -1805,6 +1972,9 @@ row_upd_sec_index_entry(
row_ins_sec_index_entry() below */
if (!rec_get_deleted_flag(
rec, dict_table_is_comp(index->table))) {
+#ifdef WITH_WSREP
+ que_node_t *parent = que_node_get_parent(node);
+#endif /* WITH_WSREP */
err = btr_cur_del_mark_set_sec_rec(
0, btr_cur, TRUE, thr, &mtr);
@@ -1822,6 +1992,37 @@ row_upd_sec_index_entry(
node, &pcur, index->table,
index, offsets, thr, &mtr);
}
+#ifdef WITH_WSREP
+ if (err == DB_SUCCESS && !referenced &&
+ !(parent && que_node_get_type(parent) ==
+ QUE_NODE_UPDATE &&
+ ((upd_node_t*)parent)->cascade_node == node) &&
+ foreign
+ ) {
+ ulint* offsets =
+ rec_get_offsets(
+ rec, index, NULL, ULINT_UNDEFINED,
+ &heap);
+ err = wsrep_row_upd_check_foreign_constraints(
+ node, &pcur, index->table,
+ index, offsets, thr, &mtr);
+ switch (err) {
+ case DB_SUCCESS:
+ case DB_NO_REFERENCED_ROW:
+ err = DB_SUCCESS;
+ break;
+ case DB_DEADLOCK:
+ if (wsrep_debug) fprintf (stderr,
+ "WSREP: sec index FK check fail for deadlock");
+ break;
+ default:
+ fprintf (stderr,
+ "WSREP: referenced FK check fail: %d",
+ (int)err);
+ break;
+ }
+ }
+#endif /* WITH_WSREP */
}
break;
}
@@ -1976,6 +2177,9 @@ row_upd_clust_rec_by_insert(
que_thr_t* thr, /*!< in: query thread */
ibool referenced,/*!< in: TRUE if index may be referenced in
a foreign key constraint */
+#ifdef WITH_WSREP
+ ibool foreign, /*!< in: TRUE if index is foreign key index */
+#endif /* WITH_WSREP */
mtr_t* mtr) /*!< in/out: mtr; gets committed here */
{
mem_heap_t* heap;
@@ -1989,6 +2193,9 @@ row_upd_clust_rec_by_insert(
rec_t* rec;
ulint* offsets = NULL;
+#ifdef WITH_WSREP
+ que_node_t *parent = que_node_get_parent(node);
+#endif /* WITH_WSREP */
ut_ad(node);
ut_ad(dict_index_is_clust(index));
@@ -2074,6 +2281,34 @@ err_exit:
goto err_exit;
}
}
+#ifdef WITH_WSREP
+ if (!referenced &&
+ !(parent && que_node_get_type(parent) == QUE_NODE_UPDATE &&
+ ((upd_node_t*)parent)->cascade_node == node) &&
+ foreign
+ ) {
+ err = wsrep_row_upd_check_foreign_constraints(
+ node, pcur, table, index, offsets, thr, mtr);
+ switch (err) {
+ case DB_SUCCESS:
+ case DB_NO_REFERENCED_ROW:
+ err = DB_SUCCESS;
+ break;
+ case DB_DEADLOCK:
+ if (wsrep_debug) fprintf (stderr,
+ "WSREP: insert FK check fail for deadlock");
+ break;
+ default:
+ fprintf (stderr,
+ "WSREP: referenced FK check fail: %d",
+ (int)err);
+ break;
+ }
+ if (err != DB_SUCCESS) {
+ goto err_exit;
+ }
+ }
+#endif /* WITH_WSREP */
}
mtr_commit(mtr);
@@ -2269,11 +2504,18 @@ row_upd_del_mark_clust_rec(
ibool referenced,
/*!< in: TRUE if index may be referenced in
a foreign key constraint */
+#ifdef WITH_WSREP
+ ibool foreign,/*!< in: TRUE if index is foreign key index */
+#endif /* WITH_WSREP */
mtr_t* mtr) /*!< in: mtr; gets committed here */
{
btr_pcur_t* pcur;
btr_cur_t* btr_cur;
dberr_t err;
+#ifdef WITH_WSREP
+ rec_t* rec;
+ que_node_t *parent = que_node_get_parent(node);
+#endif /* WITH_WSREP */
ut_ad(node);
ut_ad(dict_index_is_clust(index));
@@ -2290,8 +2532,16 @@ row_upd_del_mark_clust_rec(
/* Mark the clustered index record deleted; we do not have to check
locks, because we assume that we have an x-lock on the record */
+#ifdef WITH_WSREP
+ rec = btr_cur_get_rec(btr_cur);
+#endif /* WITH_WSREP */
+
err = btr_cur_del_mark_set_clust_rec(
+#ifdef WITH_WSREP
+ btr_cur_get_block(btr_cur), rec,
+#else
btr_cur_get_block(btr_cur), btr_cur_get_rec(btr_cur),
+#endif /* WITH_WSREP */
index, offsets, thr, mtr);
if (err == DB_SUCCESS && referenced) {
/* NOTE that the following call loses the position of pcur ! */
@@ -2299,6 +2549,32 @@ row_upd_del_mark_clust_rec(
err = row_upd_check_references_constraints(
node, pcur, index->table, index, offsets, thr, mtr);
}
+#ifdef WITH_WSREP
+ if (err == DB_SUCCESS && !referenced &&
+ !(parent && que_node_get_type(parent) == QUE_NODE_UPDATE &&
+ ((upd_node_t*)parent)->cascade_node == node) &&
+ thr_get_trx(thr) &&
+ foreign
+ ) {
+ err = wsrep_row_upd_check_foreign_constraints(
+ node, pcur, index->table, index, offsets, thr, mtr);
+ switch (err) {
+ case DB_SUCCESS:
+ case DB_NO_REFERENCED_ROW:
+ err = DB_SUCCESS;
+ break;
+ case DB_DEADLOCK:
+ if (wsrep_debug) fprintf (stderr,
+ "WSREP: clust rec FK check fail for deadlock");
+ break;
+ default:
+ fprintf (stderr,
+ "WSREP: clust rec referenced FK check fail: %d",
+ (int)err);
+ break;
+ }
+ }
+#endif /* WITH_WSREP */
mtr_commit(mtr);
@@ -2331,6 +2607,10 @@ row_upd_clust_step(
index = dict_table_get_first_index(node->table);
referenced = row_upd_index_is_referenced(index, thr_get_trx(thr));
+#ifdef WITH_WSREP
+ ibool foreign = wsrep_row_upd_index_is_foreign(
+ index, thr_get_trx(thr));
+#endif /* WITH_WSREP */
pcur = node->pcur;
@@ -2427,7 +2707,11 @@ row_upd_clust_step(
if (node->is_delete) {
err = row_upd_del_mark_clust_rec(
+#ifdef WITH_WSREP
+ node, index, offsets, thr, referenced, foreign, &mtr);
+#else
node, index, offsets, thr, referenced, &mtr);
+#endif /* WITH_WSREP */
if (err == DB_SUCCESS) {
node->state = UPD_NODE_UPDATE_ALL_SEC;
@@ -2472,7 +2756,11 @@ row_upd_clust_step(
externally! */
err = row_upd_clust_rec_by_insert(
+#ifdef WITH_WSREP
+ node, index, thr, referenced, foreign, &mtr);
+#else
node, index, thr, referenced, &mtr);
+#endif /* WITH_WSREP */
if (err != DB_SUCCESS) {
diff --git a/storage/xtradb/srv/srv0conc.cc b/storage/xtradb/srv/srv0conc.cc
index 6c15753246a..5868e4e012b 100644
--- a/storage/xtradb/srv/srv0conc.cc
+++ b/storage/xtradb/srv/srv0conc.cc
@@ -43,6 +43,10 @@ Created 2011/04/18 Sunny Bains
#include "trx0trx.h"
#include "mysql/plugin.h"
+#ifdef WITH_WSREP
+extern "C" int wsrep_trx_is_aborting(void *thd_ptr);
+extern my_bool wsrep_debug;
+#endif
/** Number of times a thread is allowed to enter InnoDB within the same
SQL query after it has once got the ticket. */
@@ -87,6 +91,9 @@ struct srv_conc_slot_t{
reserved may still be TRUE at that
point */
srv_conc_node_t srv_conc_queue; /*!< queue node */
+#ifdef WITH_WSREP
+ void *thd; /*!< to see priority */
+#endif
};
/** Queue of threads waiting to get in */
@@ -146,6 +153,9 @@ srv_conc_init(void)
conc_slot->event = os_event_create();
ut_a(conc_slot->event);
+#ifdef WITH_WSREP
+ conc_slot->thd = NULL;
+#endif /* WITH_WSREP */
}
#endif /* !HAVE_ATOMIC_BUILTINS */
}
@@ -203,6 +213,16 @@ srv_conc_enter_innodb_with_atomics(
for (;;) {
ulint sleep_in_us;
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) &&
+ wsrep_trx_is_aborting(trx->mysql_thd)) {
+ if (wsrep_debug)
+ fprintf(stderr,
+ "srv_conc_enter due to MUST_ABORT");
+ srv_conc_force_enter_innodb(trx);
+ return;
+ }
+#endif /* WITH_WSREP */
if (srv_conc.n_active < (lint) srv_thread_concurrency) {
ulint n_active;
@@ -321,6 +341,9 @@ srv_conc_exit_innodb_without_atomics(
slot = NULL;
if (srv_conc.n_active < (lint) srv_thread_concurrency) {
+#ifdef WITH_WSREP
+ srv_conc_slot_t* wsrep_slot;
+#endif
/* Look for a slot where a thread is waiting and no other
thread has yet released the thread */
@@ -331,6 +354,19 @@ srv_conc_exit_innodb_without_atomics(
/* No op */
}
+#ifdef WITH_WSREP
+ /* look for aborting trx, they must be released asap */
+ wsrep_slot= slot;
+ while (wsrep_slot && (wsrep_slot->wait_ended == TRUE ||
+ !wsrep_trx_is_aborting(wsrep_slot->thd))) {
+ wsrep_slot = UT_LIST_GET_NEXT(srv_conc_queue, wsrep_slot);
+ }
+ if (wsrep_slot) {
+ slot = wsrep_slot;
+ if (wsrep_debug)
+ fprintf(stderr, "WSREP: releasing aborting thd\n");
+ }
+#endif
if (slot != NULL) {
slot->wait_ended = TRUE;
@@ -390,6 +426,13 @@ retry:
return;
}
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) &&
+ wsrep_thd_is_brute_force(trx->mysql_thd)) {
+ srv_conc_force_enter_innodb(trx);
+ return;
+ }
+#endif
/* If the transaction is not holding resources, let it sleep
for srv_thread_sleep_delay microseconds, and try again then */
@@ -457,6 +500,9 @@ retry:
/* Add to the queue */
slot->reserved = TRUE;
slot->wait_ended = FALSE;
+#ifdef WITH_WSREP
+ slot->thd = trx->mysql_thd;
+#endif
UT_LIST_ADD_LAST(srv_conc_queue, srv_conc_queue, slot);
@@ -464,6 +510,18 @@ retry:
srv_conc.n_waiting++;
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) &&
+ wsrep_trx_is_aborting(trx->mysql_thd)) {
+ os_fast_mutex_unlock(&srv_conc_mutex);
+ if (wsrep_debug)
+ fprintf(stderr, "srv_conc_enter due to MUST_ABORT");
+ trx->declared_to_be_inside_innodb = TRUE;
+ trx->n_tickets_to_enter_innodb = srv_n_free_tickets_to_enter;
+ return;
+ }
+ trx->wsrep_event = slot->event;
+#endif /* WITH_WSREP */
os_fast_mutex_unlock(&srv_conc_mutex);
/* Go to wait for the event; when a thread leaves InnoDB it will
@@ -487,6 +545,9 @@ retry:
os_event_wait(slot->event);
thd_wait_end(trx->mysql_thd);
+#ifdef WITH_WSREP
+ trx->wsrep_event = NULL;
+#endif /* WITH_WSREP */
trx->op_info = "";
@@ -504,6 +565,9 @@ retry:
incremented the thread counter on behalf of this thread */
slot->reserved = FALSE;
+#ifdef WITH_WSREP
+ slot->thd = NULL;
+#endif
UT_LIST_REMOVE(srv_conc_queue, srv_conc_queue, slot);
@@ -614,5 +678,32 @@ srv_conc_get_active_threads(void)
/*==============================*/
{
return(srv_conc.n_active);
- }
+}
+
+#ifdef WITH_WSREP
+UNIV_INTERN
+void
+wsrep_srv_conc_cancel_wait(
+/*==================*/
+ trx_t* trx) /*!< in: transaction object associated with the
+ thread */
+{
+#ifdef HAVE_ATOMIC_BUILTINS
+ /* aborting transactions will enter innodb by force in
+ srv_conc_enter_innodb_with_atomics(). No need to cancel here,
+ thr will wake up after os_sleep and let to enter innodb
+ */
+ if (wsrep_debug)
+ fprintf(stderr, "WSREP: conc slot cancel, no atomics\n");
+#else
+ os_fast_mutex_lock(&srv_conc_mutex);
+ if (trx->wsrep_event) {
+ if (wsrep_debug)
+ fprintf(stderr, "WSREP: conc slot cancel\n");
+ os_event_set(trx->wsrep_event);
+ }
+ os_fast_mutex_unlock(&srv_conc_mutex);
+#endif
+}
+#endif /* WITH_WSREP */
diff --git a/storage/xtradb/srv/srv0srv.cc b/storage/xtradb/srv/srv0srv.cc
index 8e01ea7402e..d421980d41a 100644
--- a/storage/xtradb/srv/srv0srv.cc
+++ b/storage/xtradb/srv/srv0srv.cc
@@ -84,6 +84,14 @@ ulong innobase_thd_get_thread_id(const void* thd);
/* prototypes for new functions added to ha_innodb.cc */
ibool innobase_get_slow_log();
+#ifdef WITH_WSREP
+extern int wsrep_debug;
+extern int wsrep_trx_is_aborting(void *thd_ptr);
+#endif
+/* The following counter is incremented whenever there is some user activity
+in the server */
+UNIV_INTERN ulint srv_activity_count = 0;
+
/* The following is the maximum allowed duration of a lock wait. */
UNIV_INTERN ulint srv_fatal_semaphore_wait_threshold = 600;
@@ -254,6 +262,10 @@ srv_printf_innodb_monitor() will request mutex acquisition
with mutex_enter(), which will wait until it gets the mutex. */
#define MUTEX_NOWAIT(mutex_skipped) ((mutex_skipped) < MAX_MUTEX_NOWAIT)
+#ifdef WITH_INNODB_DISALLOW_WRITES
+UNIV_INTERN os_event_t srv_allow_writes_event;
+#endif /* WITH_INNODB_DISALLOW_WRITES */
+
/** The sort order table of the MySQL latin1_swedish_ci character set
collation */
UNIV_INTERN const byte* srv_latin1_ordering;
@@ -1150,6 +1162,14 @@ srv_init(void)
dict_ind_init();
srv_conc_init();
+#ifdef WITH_INNODB_DISALLOW_WRITES
+ /* Writes have to be enabled on init or else we hang. Thus, we
+ always set the event here regardless of innobase_disallow_writes.
+ That flag will always be 0 at this point because it isn't settable
+ via my.cnf or command line arg. */
+ srv_allow_writes_event = os_event_create();
+ os_event_set(srv_allow_writes_event);
+#endif /* WITH_INNODB_DISALLOW_WRITES */
/* Initialize some INFORMATION SCHEMA internal structures */
trx_i_s_cache_init(trx_i_s_cache);
@@ -2158,7 +2178,20 @@ loop:
if (sync_array_print_long_waits(&waiter, &sema)
&& sema == old_sema && os_thread_eq(waiter, old_waiter)) {
+#if defined(WITH_WSREP) && defined(WITH_INNODB_DISALLOW_WRITES)
+ if (srv_allow_writes_event->is_set) {
+#endif /* WITH_WSREP */
fatal_cnt++;
+#if defined(WITH_WSREP) && defined(WITH_INNODB_DISALLOW_WRITES)
+ } else {
+ fprintf(stderr,
+ "WSREP: avoiding InnoDB self crash due to long "
+ "semaphore wait of > %lu seconds\n"
+ "Server is processing SST donor operation, "
+ "fatal_cnt now: %lu",
+ (ulong) srv_fatal_semaphore_wait_threshold, fatal_cnt);
+ }
+#endif /* WITH_WSREP */
if (fatal_cnt > 10) {
fprintf(stderr,
diff --git a/storage/xtradb/trx/trx0roll.cc b/storage/xtradb/trx/trx0roll.cc
index 1089607c6d1..eb2af877a6d 100644
--- a/storage/xtradb/trx/trx0roll.cc
+++ b/storage/xtradb/trx/trx0roll.cc
@@ -45,6 +45,9 @@ Created 3/26/1996 Heikki Tuuri
#include "pars0pars.h"
#include "srv0mon.h"
#include "trx0sys.h"
+#ifdef WITH_WSREP
+#include "ha_prototypes.h"
+#endif /* WITH_WSREP */
/** This many pages must be undone before a truncate is tried within
rollback */
@@ -382,6 +385,13 @@ trx_rollback_to_savepoint_for_mysql_low(
trx->op_info = "";
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd) &&
+ trx->lock.was_chosen_as_deadlock_victim) {
+ trx->lock.was_chosen_as_deadlock_victim = FALSE;
+ }
+#endif
+
return(err);
}
@@ -1020,6 +1030,12 @@ trx_roll_try_truncate(
if (trx->update_undo) {
trx_undo_truncate_end(trx, trx->update_undo, limit);
}
+
+#ifdef WITH_WSREP_OUT
+ if (wsrep_on(trx->mysql_thd)) {
+ trx->lock.was_chosen_as_deadlock_victim = FALSE;
+ }
+#endif /* WITH_WSREP */
}
/***********************************************************************//**
diff --git a/storage/xtradb/trx/trx0sys.cc b/storage/xtradb/trx/trx0sys.cc
index aab99a793f6..19d14bf6f38 100644
--- a/storage/xtradb/trx/trx0sys.cc
+++ b/storage/xtradb/trx/trx0sys.cc
@@ -44,6 +44,10 @@ Created 3/26/1996 Heikki Tuuri
#include "os0file.h"
#include "read0read.h"
+#ifdef WITH_WSREP
+#include "ha_prototypes.h" /* wsrep_is_wsrep_xid() */
+#endif /* */
+
/** The file format tag structure with id and name. */
struct file_format_t {
ulint id; /*!< id of the file format */
@@ -174,7 +178,12 @@ trx_sys_flush_max_trx_id(void)
mtr_t mtr;
trx_sysf_t* sys_header;
+#ifndef WITH_WSREP
+ /* wsrep_fake_trx_id violates this assert
+ * Copied from trx_sys_get_new_trx_id
+ */
ut_ad(mutex_own(&trx_sys->mutex));
+#endif /* WITH_WSREP */
if (!srv_read_only_mode) {
mtr_start(&mtr);
@@ -202,9 +211,14 @@ trx_sys_update_mysql_binlog_offset(
ib_int64_t offset, /*!< in: position in that log file */
ulint field, /*!< in: offset of the MySQL log info field in
the trx sys header */
+#ifdef WITH_WSREP
+ trx_sysf_t* sys_header, /*!< in: trx sys header */
+#endif /* WITH_WSREP */
mtr_t* mtr) /*!< in: mtr */
{
+#ifndef WITH_WSREP
trx_sysf_t* sys_header;
+#endif /* !WITH_WSREP */
if (ut_strlen(file_name) >= TRX_SYS_MYSQL_LOG_NAME_LEN) {
@@ -213,7 +227,9 @@ trx_sys_update_mysql_binlog_offset(
return;
}
+#ifndef WITH_WSREP
sys_header = trx_sysf_get(mtr);
+#endif /* !WITH_WSREP */
if (mach_read_from_4(sys_header + field
+ TRX_SYS_MYSQL_LOG_MAGIC_N_FLD)
@@ -300,6 +316,124 @@ trx_sys_print_mysql_binlog_offset(void)
mtr_commit(&mtr);
}
+#ifdef WITH_WSREP
+
+#ifdef UNIV_DEBUG
+static long long trx_sys_cur_xid_seqno = -1;
+static unsigned char trx_sys_cur_xid_uuid[16];
+
+long long read_wsrep_xid_seqno(const XID* xid)
+{
+ long long seqno;
+ memcpy(&seqno, xid->data + 24, sizeof(long long));
+ return seqno;
+}
+
+void read_wsrep_xid_uuid(const XID* xid, unsigned char* buf)
+{
+ memcpy(buf, xid->data + 8, 16);
+}
+
+#endif /* UNIV_DEBUG */
+
+void
+trx_sys_update_wsrep_checkpoint(
+ const XID* xid, /*!< in: transaction XID */
+ trx_sysf_t* sys_header, /*!< in: sys_header */
+ mtr_t* mtr) /*!< in: mtr */
+{
+#ifdef UNIV_DEBUG
+ {
+ /* Check that seqno is monotonically increasing */
+ unsigned char xid_uuid[16];
+ long long xid_seqno = read_wsrep_xid_seqno(xid);
+ read_wsrep_xid_uuid(xid, xid_uuid);
+ if (!memcmp(xid_uuid, trx_sys_cur_xid_uuid, 8))
+ {
+ ut_ad(xid_seqno > trx_sys_cur_xid_seqno);
+ trx_sys_cur_xid_seqno = xid_seqno;
+ }
+ else
+ {
+ memcpy(trx_sys_cur_xid_uuid, xid_uuid, 16);
+ }
+ trx_sys_cur_xid_seqno = xid_seqno;
+ }
+#endif /* UNIV_DEBUG */
+
+ ut_ad(xid && mtr);
+ ut_a(xid->formatID == -1 || wsrep_is_wsrep_xid((const void *)xid));
+
+ if (mach_read_from_4(sys_header + TRX_SYS_WSREP_XID_INFO
+ + TRX_SYS_WSREP_XID_MAGIC_N_FLD)
+ != TRX_SYS_WSREP_XID_MAGIC_N) {
+ mlog_write_ulint(sys_header + TRX_SYS_WSREP_XID_INFO
+ + TRX_SYS_WSREP_XID_MAGIC_N_FLD,
+ TRX_SYS_WSREP_XID_MAGIC_N,
+ MLOG_4BYTES, mtr);
+ }
+
+ mlog_write_ulint(sys_header + TRX_SYS_WSREP_XID_INFO
+ + TRX_SYS_WSREP_XID_FORMAT,
+ (int)xid->formatID,
+ MLOG_4BYTES, mtr);
+ mlog_write_ulint(sys_header + TRX_SYS_WSREP_XID_INFO
+ + TRX_SYS_WSREP_XID_GTRID_LEN,
+ (int)xid->gtrid_length,
+ MLOG_4BYTES, mtr);
+ mlog_write_ulint(sys_header + TRX_SYS_WSREP_XID_INFO
+ + TRX_SYS_WSREP_XID_BQUAL_LEN,
+ (int)xid->bqual_length,
+ MLOG_4BYTES, mtr);
+ mlog_write_string(sys_header + TRX_SYS_WSREP_XID_INFO
+ + TRX_SYS_WSREP_XID_DATA,
+ (const unsigned char*) xid->data,
+ XIDDATASIZE, mtr);
+
+}
+
+void
+trx_sys_read_wsrep_checkpoint(XID* xid)
+/*===================================*/
+{
+ trx_sysf_t* sys_header;
+ mtr_t mtr;
+ ulint magic;
+
+ ut_ad(xid);
+
+ mtr_start(&mtr);
+
+ sys_header = trx_sysf_get(&mtr);
+
+ if ((magic = mach_read_from_4(sys_header + TRX_SYS_WSREP_XID_INFO
+ + TRX_SYS_WSREP_XID_MAGIC_N_FLD))
+ != TRX_SYS_WSREP_XID_MAGIC_N) {
+ memset(xid, 0, sizeof(*xid));
+ xid->formatID = -1;
+ trx_sys_update_wsrep_checkpoint(xid, sys_header, &mtr);
+ mtr_commit(&mtr);
+ return;
+ }
+
+ xid->formatID = (int)mach_read_from_4(
+ sys_header
+ + TRX_SYS_WSREP_XID_INFO + TRX_SYS_WSREP_XID_FORMAT);
+ xid->gtrid_length = (int)mach_read_from_4(
+ sys_header
+ + TRX_SYS_WSREP_XID_INFO + TRX_SYS_WSREP_XID_GTRID_LEN);
+ xid->bqual_length = (int)mach_read_from_4(
+ sys_header
+ + TRX_SYS_WSREP_XID_INFO + TRX_SYS_WSREP_XID_BQUAL_LEN);
+ ut_memcpy(xid->data,
+ sys_header + TRX_SYS_WSREP_XID_INFO + TRX_SYS_WSREP_XID_DATA,
+ XIDDATASIZE);
+
+ mtr_commit(&mtr);
+}
+
+#endif /* WITH_WSREP */
+
/*****************************************************************//**
Prints to stderr the MySQL master log offset info in the trx system header if
the magic number shows it valid. */
diff --git a/storage/xtradb/trx/trx0trx.cc b/storage/xtradb/trx/trx0trx.cc
index f2c78bafd86..d26517637cd 100644
--- a/storage/xtradb/trx/trx0trx.cc
+++ b/storage/xtradb/trx/trx0trx.cc
@@ -294,6 +294,9 @@ trx_create(void)
trx->lock.table_locks = ib_vector_create(
heap_alloc, sizeof(void**), 32);
+#ifdef WITH_WSREP
+ trx->wsrep_event = NULL;
+#endif /* WITH_WSREP */
return(trx);
}
@@ -1041,6 +1044,11 @@ trx_start_low(
srv_undo_logs, srv_undo_tablespaces);
}
+#ifdef WITH_WSREP
+ memset(&trx->xid, 0, sizeof(trx->xid));
+ trx->xid.formatID = -1;
+#endif /* WITH_WSREP */
+
/* The initial value for trx->no: TRX_ID_MAX is used in
read_view_open_now: */
@@ -1166,6 +1174,9 @@ trx_write_serialisation_history(
trx_t* trx, /*!< in/out: transaction */
mtr_t* mtr) /*!< in/out: mini-transaction */
{
+#ifdef WITH_WSREP
+ trx_sysf_t* sys_header;
+#endif /* WITH_WSREP */
trx_rseg_t* rseg;
rseg = trx->rseg;
@@ -1212,6 +1223,15 @@ trx_write_serialisation_history(
MONITOR_INC(MONITOR_TRX_COMMIT_UNDO);
+#ifdef WITH_WSREP
+ sys_header = trx_sysf_get(mtr);
+ /* Update latest MySQL wsrep XID in trx sys header. */
+ if (wsrep_is_wsrep_xid((const void *)&trx->xid))
+ {
+ trx_sys_update_wsrep_checkpoint(&trx->xid, sys_header, mtr);
+ }
+#endif /* WITH_WSREP */
+
/* Update the latest MySQL binlog name and offset info
in trx sys header if MySQL binlogging is on or the database
server is a MySQL replication slave */
@@ -1222,7 +1242,11 @@ trx_write_serialisation_history(
trx_sys_update_mysql_binlog_offset(
trx->mysql_log_file_name,
trx->mysql_log_offset,
- TRX_SYS_MYSQL_LOG_INFO, mtr);
+ TRX_SYS_MYSQL_LOG_INFO,
+#ifdef WITH_WSREP
+ sys_header,
+#endif /* WITH_WSREP */
+ mtr);
trx->mysql_log_file_name = NULL;
}
@@ -1527,6 +1551,11 @@ trx_commit_in_memory(
ut_ad(!trx->in_ro_trx_list);
ut_ad(!trx->in_rw_trx_list);
+#ifdef WITH_WSREP
+ if (wsrep_on(trx->mysql_thd)) {
+ trx->lock.was_chosen_as_deadlock_victim = FALSE;
+ }
+#endif
trx->dict_operation = TRX_DICT_OP_NONE;
trx->error_state = DB_SUCCESS;
@@ -1709,6 +1738,10 @@ trx_commit_or_rollback_prepare(
switch (trx->state) {
case TRX_STATE_NOT_STARTED:
+#ifdef WITH_WSREP
+ ut_d(trx->start_file = __FILE__);
+ ut_d(trx->start_line = __LINE__);
+#endif /* WITH_WSREP */
trx_start_low(trx);
/* fall through */
case TRX_STATE_ACTIVE:
@@ -2480,6 +2513,10 @@ trx_start_if_not_started_low(
{
switch (trx->state) {
case TRX_STATE_NOT_STARTED:
+#ifdef WITH_WSREP
+ ut_d(trx->start_file = __FILE__);
+ ut_d(trx->start_line = __LINE__);
+#endif /* WITH_WSREP */
trx_start_low(trx);
/* fall through */
case TRX_STATE_ACTIVE:
@@ -2514,6 +2551,10 @@ trx_start_for_ddl_low(
trx->ddl = true;
+#ifdef WITH_WSREP
+ ut_d(trx->start_file = __FILE__);
+ ut_d(trx->start_line = __LINE__);
+#endif /* WITH_WSREP */
trx_start_low(trx);
return;
diff --git a/storage/xtradb/wsrep/md5.cc b/storage/xtradb/wsrep/md5.cc
new file mode 100644
index 00000000000..30be6ab7dd3
--- /dev/null
+++ b/storage/xtradb/wsrep/md5.cc
@@ -0,0 +1,74 @@
+/*
+ Copyright (c) 2014 SkySQL AB.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+
+#ifdef WITH_WSREP
+
+#if defined(HAVE_YASSL)
+#include "my_config.h"
+#include "md5.hpp"
+#elif defined(HAVE_OPENSSL)
+#include <openssl/md5.h>
+#endif /* HAVE_YASSL */
+
+/* Initialize md5 object. */
+void *wsrep_md5_init()
+{
+#if defined(HAVE_YASSL)
+ TaoCrypt::MD5 *hasher= new TaoCrypt::MD5;
+ return (void*)hasher;
+#elif defined(HAVE_OPENSSL)
+ MD5_CTX *ctx = new MD5_CTX();
+ MD5_Init (ctx);
+ return (void *)ctx;
+#endif /* HAVE_YASSL */
+}
+
+/**
+ Supply message to be hashed.
+
+ @param ctx [IN] Pointer to MD5 context
+ @param buf [IN] Message to be computed.
+ @param len [IN] Length of the message.
+*/
+void wsrep_md5_update(void *ctx, char* buf, int len)
+{
+#if defined(HAVE_YASSL)
+ ((TaoCrypt::MD5 *)ctx)->Update((TaoCrypt::byte *) buf, len);
+#elif defined(HAVE_OPENSSL)
+ MD5_Update((MD5_CTX*)(ctx), buf, len);
+#endif /* HAVE_YASSL */
+}
+
+/**
+ Place computed MD5 digest into the given buffer.
+
+ @param digest [OUT] Computed MD5 digest
+ @param ctx [IN] Pointer to MD5 context
+*/
+void wsrep_compute_md5_hash(char *digest, void *ctx)
+{
+#if defined(HAVE_YASSL)
+ ((TaoCrypt::MD5*)ctx)->Final((TaoCrypt::byte *) digest);
+ delete (TaoCrypt::MD5*)ctx;
+#elif defined(HAVE_OPENSSL)
+ MD5_Final ((unsigned char*)digest, (MD5_CTX*)ctx);
+ delete (MD5_CTX*)ctx;
+#endif /* HAVE_YASSL */
+}
+
+#endif /* WITH_WSREP */
+
diff --git a/storage/xtradb/wsrep/wsrep_md5.h b/storage/xtradb/wsrep/wsrep_md5.h
new file mode 100644
index 00000000000..1339f7f4b86
--- /dev/null
+++ b/storage/xtradb/wsrep/wsrep_md5.h
@@ -0,0 +1,26 @@
+#ifndef WSREP_MD5_INCLUDED
+#define WSREP_MD5_INCLUDED
+
+/* Copyright (c) 2014 SkySQL AB.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
+*/
+
+#ifdef WITH_WSREP
+void *wsrep_md5_init();
+void wsrep_md5_update(void *ctx, char* buf, int len);
+void wsrep_compute_md5_hash(char *digest, void *ctx);
+#endif /* WITH_WSREP */
+
+#endif /* WSREP_MD5_INCLUDED */