summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorSeppo Jaakola <seppo.jaakola@codership.com>2012-04-13 01:33:24 +0300
committerSeppo Jaakola <seppo.jaakola@codership.com>2012-04-13 01:33:24 +0300
commit2fc1ec43560b453b4694adbc1aac11f3f23b1761 (patch)
tree880c3875dae28574a90bf891ef901027723d21f6 /sql
parent51c77ec5d406843bb8c8131f0687f4f75839d045 (diff)
downloadmariadb-git-2fc1ec43560b453b4694adbc1aac11f3f23b1761.tar.gz
Initial push of codership-wsrep API implementation for MariaDB.
Merge of: lp:maria/5.5, #3334: http://bazaar.launchpad.net/~maria-captains/maria/5.5/revision/3334 lp:codership-mysql/5.5, #3725: http://bazaar.launchpad.net/~codership/codership-mysql/wsrep-5.5/revision/3725
Diffstat (limited to 'sql')
-rw-r--r--sql/CMakeLists.txt20
-rw-r--r--sql/events.cc12
-rw-r--r--sql/handler.cc107
-rw-r--r--sql/handler.h14
-rw-r--r--sql/item_func.cc12
-rw-r--r--sql/lock.cc50
-rw-r--r--sql/log.cc240
-rw-r--r--sql/log.h26
-rw-r--r--sql/log_event.cc151
-rw-r--r--sql/mdl.cc98
-rw-r--r--sql/mysqld.cc676
-rw-r--r--sql/mysqld.h17
-rw-r--r--sql/protocol.cc8
-rw-r--r--sql/set_var.h6
-rw-r--r--sql/slave.cc8
-rw-r--r--sql/sp.cc34
-rw-r--r--sql/sql_alter.cc17
-rw-r--r--sql/sql_base.cc33
-rw-r--r--sql/sql_builtin.cc.in9
-rw-r--r--sql/sql_class.cc251
-rw-r--r--sql/sql_class.h87
-rw-r--r--sql/sql_connect.cc39
-rw-r--r--sql/sql_delete.cc12
-rw-r--r--sql/sql_insert.cc22
-rw-r--r--sql/sql_lex.cc11
-rw-r--r--sql/sql_parse.cc1075
-rw-r--r--sql/sql_plugin.cc6
-rw-r--r--sql/sql_reload.cc13
-rw-r--r--sql/sql_repl.cc14
-rw-r--r--sql/sql_show.cc6
-rw-r--r--sql/sql_table.cc11
-rw-r--r--sql/sql_trigger.cc52
-rw-r--r--sql/sql_truncate.cc10
-rw-r--r--sql/sql_update.cc12
-rw-r--r--sql/sys_vars.cc200
-rw-r--r--sql/transaction.cc48
36 files changed, 3359 insertions, 48 deletions
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt
index 22c51f3ce23..f74931388bb 100644
--- a/sql/CMakeLists.txt
+++ b/sql/CMakeLists.txt
@@ -13,6 +13,10 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+IF(WITH_WSREP)
+ SET(WSREP_INCLUDES ${CMAKE_SOURCE_DIR}/wsrep)
+ENDIF()
+
INCLUDE_DIRECTORIES(
${CMAKE_SOURCE_DIR}/include
${CMAKE_SOURCE_DIR}/sql
@@ -20,6 +24,7 @@ ${CMAKE_SOURCE_DIR}/regex
${ZLIB_INCLUDE_DIR}
${SSL_INCLUDE_DIRS}
${CMAKE_BINARY_DIR}/sql
+${WSREP_INCLUDES}
)
SET(GEN_SOURCES
@@ -35,6 +40,18 @@ ADD_DEFINITIONS(-DMYSQL_SERVER -DHAVE_EVENT_SCHEDULER -DHAVE_POOL_OF_THREADS)
IF(SSL_DEFINES)
ADD_DEFINITIONS(${SSL_DEFINES})
ENDIF()
+IF(WITH_WSREP)
+ SET(WSREP_SOURCES
+ wsrep_check_opts.cc
+ wsrep_hton.cc
+ wsrep_mysqld.cc
+ wsrep_notify.cc
+ wsrep_sst.cc
+ wsrep_utils.cc
+ wsrep_var.cc
+ )
+ SET(WSREP_LIB wsrep)
+ENDIF()
SET (SQL_SOURCE
../sql-common/client.c derror.cc des_key_file.cc
@@ -86,6 +103,7 @@ SET (SQL_SOURCE
threadpool_common.cc
../sql-common/mysql_async.c
sql_logger.cc
+ ${WSREP_SOURCES}
${GEN_SOURCES}
${MYSYS_LIBWRAP_SOURCE}
)
@@ -105,6 +123,7 @@ DTRACE_INSTRUMENT(sql)
TARGET_LINK_LIBRARIES(sql ${MYSQLD_STATIC_PLUGIN_LIBS}
mysys dbug strings vio regex
${LIBWRAP} ${LIBCRYPT} ${LIBDL}
+ ${WSREP_LIB}
${SSL_LIBRARIES})
IF(WIN32)
@@ -200,7 +219,6 @@ IF (NOT ${CMAKE_CURRENT_SOURCE_DIR} STREQUAL ${CMAKE_CURRENT_BINARY_DIR})
ENDIF()
ENDIF()
-
INCLUDE(${CMAKE_SOURCE_DIR}/cmake/bison.cmake)
RUN_BISON(
${CMAKE_CURRENT_SOURCE_DIR}/sql_yacc.yy
diff --git a/sql/events.cc b/sql/events.cc
index 8b4bab9e3a6..208914a4e6d 100644
--- a/sql/events.cc
+++ b/sql/events.cc
@@ -1145,7 +1145,19 @@ end:
close_mysql_tables(thd);
DBUG_RETURN(ret);
}
+#ifdef WITH_WSREP
+int wsrep_create_event_query(THD *thd, uchar** buf, uint* buf_len)
+{
+ String log_query;
+ if (create_query_string(thd, &log_query))
+ {
+ WSREP_WARN("events create string failed: %s", thd->query());
+ return 1;
+ }
+ return wsrep_to_buf_helper(thd, log_query.ptr(), log_query.length(), buf, buf_len);
+}
+#endif /* WITH_WSREP */
/**
@} (End of group Event_Scheduler)
*/
diff --git a/sql/handler.cc b/sql/handler.cc
index 0310180759c..c04759b196f 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -52,6 +52,9 @@
#include "../storage/maria/ha_maria.h"
#endif
+#ifdef WITH_WSREP
+#include "wsrep_mysqld.h"
+#endif
/*
While we have legacy_db_type, we have this array to
check for dups and to find handlerton from legacy_db_type.
@@ -1192,7 +1195,11 @@ int ha_commit_trans(THD *thd, bool all)
{
/* Free resources and perform other cleanup even for 'empty' transactions. */
if (is_real_trans)
- thd->transaction.cleanup();
+#ifdef WITH_WSREP
+ thd->transaction.cleanup(thd);
+#else
+ thd->transaction.cleanup();
+#endif /* WITH_WSREP */
DBUG_RETURN(0);
}
@@ -1220,7 +1227,12 @@ int ha_commit_trans(THD *thd, bool all)
mdl_request.init(MDL_key::COMMIT, "", "", MDL_INTENTION_EXCLUSIVE,
MDL_EXPLICIT);
+#ifdef WITH_WSREP
+ if (!WSREP(thd) &&
+ thd->mdl_context.acquire_lock(&mdl_request,
+#else
if (thd->mdl_context.acquire_lock(&mdl_request,
+#endif /* WITH_WSREP */
thd->variables.lock_wait_timeout))
{
ha_rollback_trans(thd, all);
@@ -1267,6 +1279,19 @@ int ha_commit_trans(THD *thd, bool all)
err= ht->prepare(ht, thd, all);
status_var_increment(thd->status_var.ha_prepare_count);
if (err)
+#ifdef WITH_WSREP
+ if (WSREP(thd) && ht->db_type== DB_TYPE_WSREP)
+ {
+ error= 1;
+ /* avoid sending error, if we need to replay */
+ if (thd->wsrep_conflict_state!= MUST_REPLAY)
+ {
+ my_error(ER_LOCK_DEADLOCK, MYF(0), err);
+ }
+ }
+ else
+ /* not wsrep hton, bail to native mysql behavior */
+#endif
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
if (err)
@@ -1277,6 +1302,13 @@ int ha_commit_trans(THD *thd, bool all)
}
DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_SUICIDE(););
+#ifdef WITH_WSREP
+ if (!error && wsrep_is_wsrep_xid(&thd->transaction.xid_state.xid))
+ {
+ // xid was rewritten by wsrep
+ xid= wsrep_xid_seqno(&thd->transaction.xid_state.xid);
+ }
+#endif // WITH_WSREP
if (!is_real_trans)
{
error= commit_one_phase_2(thd, all, trans, is_real_trans);
@@ -1363,6 +1395,18 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans)
int error= 0;
Ha_trx_info *ha_info= trans->ha_list, *ha_info_next;
DBUG_ENTER("commit_one_phase_2");
+#ifdef WITH_WSREP
+#ifdef WSREP_PROC_INFO
+ char info[64]= { 0, };
+ snprintf (info, sizeof(info) - 1, "ha_commit_one_phase(%lld)",
+ (long long)thd->wsrep_trx_seqno);
+#else
+ const char info[]="ha_commit_one_phase()";
+#endif /* WSREP_PROC_INFO */
+ char* tmp_info= NULL;
+ if (WSREP(thd)) tmp_info= (char *)thd_proc_info(thd, info);
+#endif /* WITH_WSREP */
+
if (ha_info)
{
for (; ha_info; ha_info= ha_info_next)
@@ -1391,7 +1435,14 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans)
}
/* Free resources and perform other cleanup even for 'empty' transactions. */
if (is_real_trans)
+#ifdef WITH_WSREP
+ thd->transaction.cleanup(thd);
+#else
thd->transaction.cleanup();
+#endif /* WITH_WSREP */
+#ifdef WITH_WSREP
+ if (WSREP(thd)) thd_proc_info(thd, tmp_info);
+#endif /* WITH_WSREP */
DBUG_RETURN(error);
}
@@ -1466,7 +1517,11 @@ int ha_rollback_trans(THD *thd, bool all)
}
/* Always cleanup. Even if nht==0. There may be savepoints. */
if (is_real_trans)
+#ifdef WITH_WSREP
+ thd->transaction.cleanup(thd);
+#else
thd->transaction.cleanup();
+#endif /* WITH_WSREP */
if (all)
thd->transaction_rollback_request= FALSE;
@@ -1631,7 +1686,13 @@ static my_bool xarecover_handlerton(THD *unused, plugin_ref plugin,
got, hton_name(hton)->str);
for (int i=0; i < got; i ++)
{
+#ifdef WITH_WSREP
+ my_xid x=(wsrep_is_wsrep_xid(&info->list[i]) ?
+ wsrep_xid_seqno(&info->list[i]) :
+ info->list[i].get_my_xid());
+#else
my_xid x=info->list[i].get_my_xid();
+#endif /* WITH_WSREP */
if (!x) // not "mine" - that is generated by external TM
{
#ifndef DBUG_OFF
@@ -2635,7 +2696,12 @@ int handler::update_auto_increment()
variables->auto_increment_increment);
auto_inc_intervals_count++;
/* Row-based replication does not need to store intervals in binlog */
+#ifdef WITH_WSREP
+ if (((WSREP(thd) && wsrep_emulate_bin_log) || mysql_bin_log.is_open()) &&
+ !thd->is_current_stmt_binlog_format_row())
+#else
if (mysql_bin_log.is_open() && !thd->is_current_stmt_binlog_format_row())
+#endif /* WITH_WSREP */
thd->auto_inc_intervals_in_cur_stmt_for_binlog.append(auto_inc_interval_for_cur_row.minimum(),
auto_inc_interval_for_cur_row.values(),
variables->auto_increment_increment);
@@ -4821,7 +4887,11 @@ static bool check_table_binlog_row_based(THD *thd, TABLE *table)
return (thd->is_current_stmt_binlog_format_row() &&
table->s->cached_row_logging_check &&
(thd->variables.option_bits & OPTION_BIN_LOG) &&
+#ifdef WITH_WSREP
+ ((WSREP(thd) && wsrep_emulate_bin_log) || mysql_bin_log.is_open()));
+#else
mysql_bin_log.is_open());
+#endif
}
@@ -5142,6 +5212,41 @@ void signal_log_not_needed(struct handlerton, char *log_file)
}
+#ifdef WITH_WSREP
+/**
+ @details
+ This function makes the storage engine to force the victim transaction
+ to abort. Currently, only innodb has this functionality, but any SE
+ implementing the wsrep API should provide this service to support
+ multi-master operation.
+
+ @param bf_thd brute force THD asking for the abort
+ @param victim_thd victim THD to be aborted
+
+ @return
+ always 0
+*/
+
+int ha_wsrep_abort_transaction(THD *bf_thd, THD *victim_thd, my_bool signal)
+{
+ DBUG_ENTER("ha_wsrep_abort_transaction");
+ if (!WSREP(bf_thd)) {
+ DBUG_RETURN(0);
+ }
+
+ handlerton *hton= installed_htons[DB_TYPE_INNODB];
+ if (hton && hton->wsrep_abort_transaction)
+ {
+ hton->wsrep_abort_transaction(hton, bf_thd, victim_thd, signal);
+ }
+ else
+ {
+ WSREP_WARN("cannot abort InnoDB transaction");
+ }
+
+ DBUG_RETURN(0);
+}
+#endif /* WITH_WSREP */
#ifdef TRANS_LOG_MGM_EXAMPLE_CODE
/*
Example of transaction log management functions based on assumption that logs
diff --git a/sql/handler.h b/sql/handler.h
index d56e3242ddd..4ac7202587e 100644
--- a/sql/handler.h
+++ b/sql/handler.h
@@ -355,6 +355,7 @@ enum legacy_db_type
DB_TYPE_MARIA,
/** Performance schema engine. */
DB_TYPE_PERFORMANCE_SCHEMA,
+ DB_TYPE_WSREP,
DB_TYPE_FIRST_DYNAMIC=42,
DB_TYPE_DEFAULT=127 // Must be last
};
@@ -1042,6 +1043,10 @@ struct handlerton
const char *wild, bool dir, List<LEX_STRING> *files);
int (*table_exists_in_engine)(handlerton *hton, THD* thd, const char *db,
const char *name);
+ int (*wsrep_abort_transaction)(handlerton *hton, THD *bf_thd,
+ THD *victim_thd, my_bool signal);
+ int (*wsrep_set_checkpoint)(handlerton *hton, const XID* xid);
+ int (*wsrep_get_checkpoint)(handlerton *hton, XID* xid);
uint32 license; /* Flag for Engine License */
/*
Optional clauses in the CREATE/ALTER TABLE
@@ -2885,6 +2890,9 @@ bool key_uses_partial_cols(TABLE *table, uint keyno);
extern const char *ha_row_type[];
extern MYSQL_PLUGIN_IMPORT const char *tx_isolation_names[];
extern MYSQL_PLUGIN_IMPORT const char *binlog_format_names[];
+#ifdef WITH_WSREP
+extern MYSQL_PLUGIN_IMPORT const char *wsrep_binlog_format_names[];
+#endif /* WITH_WSREP */
extern TYPELIB tx_isolation_typelib;
extern const char *myisam_stats_method_names[];
extern ulong total_ha, total_ha_2pc;
@@ -2980,6 +2988,9 @@ int ha_enable_transaction(THD *thd, bool on);
int ha_rollback_to_savepoint(THD *thd, SAVEPOINT *sv);
int ha_savepoint(THD *thd, SAVEPOINT *sv);
int ha_release_savepoint(THD *thd, SAVEPOINT *sv);
+#ifdef WITH_WSREP
+int ha_wsrep_abort_transaction(THD *bf_thd, THD *victim_thd, my_bool signal);
+#endif /* WITH_WSREP */
/* these are called by storage engines */
void trans_register_ha(THD *thd, bool all, handlerton *ht);
@@ -3010,6 +3021,9 @@ int ha_binlog_end(THD *thd);
#define ha_binlog_wait(a) do {} while (0)
#define ha_binlog_end(a) do {} while (0)
#endif
+#ifdef WITH_WSREP
+void wsrep_brute_force_aborts();
+#endif
const char *get_canonical_filename(handler *file, const char *path,
char *tmp_path);
diff --git a/sql/item_func.cc b/sql/item_func.cc
index 92431a552c4..c56909d4335 100644
--- a/sql/item_func.cc
+++ b/sql/item_func.cc
@@ -2583,7 +2583,19 @@ void Item_func_rand::seed_random(Item *arg)
TODO: do not do reinit 'rand' for every execute of PS/SP if
args[0] is a constant.
*/
+#ifdef WITH_WSREP
+ uint32 tmp;
+ if (WSREP(current_thd))
+ {
+ if (current_thd->wsrep_exec_mode==REPL_RECV)
+ tmp= current_thd->wsrep_rand;
+ else
+ tmp= current_thd->wsrep_rand= (uint32) arg->val_int();
+ } else
+ tmp= (uint32) arg->val_int();
+#else
uint32 tmp= (uint32) arg->val_int();
+#endif /* WITH_WSREP */
my_rnd_init(rand, (uint32) (tmp*0x10001L+55555555L),
(uint32) (tmp*0x10000001L));
}
diff --git a/sql/lock.cc b/sql/lock.cc
index a7029548493..bbaa51dbbae 100644
--- a/sql/lock.cc
+++ b/sql/lock.cc
@@ -84,6 +84,10 @@
#include <hash.h>
#include <assert.h>
+#ifdef WITH_WSREP
+#include "wsrep_mysqld.h"
+#endif /* WITH_WSREP */
+
/**
@defgroup Locking Locking
@{
@@ -314,6 +318,9 @@ bool mysql_lock_tables(THD *thd, MYSQL_LOCK *sql_lock, uint flags)
/* Copy the lock data array. thr_multi_lock() reorders its contents. */
memcpy(sql_lock->locks + sql_lock->lock_count, sql_lock->locks,
sql_lock->lock_count * sizeof(*sql_lock->locks));
+#ifdef WITH_WSREP
+ thd->lock_info.in_lock_tables= thd->in_lock_tables;
+#endif /* Lock on the copied half of the lock data array. */
/* Lock on the copied half of the lock data array. */
rc= thr_lock_errno_to_mysql[(int) thr_multi_lock(sql_lock->locks +
sql_lock->lock_count,
@@ -323,7 +330,11 @@ bool mysql_lock_tables(THD *thd, MYSQL_LOCK *sql_lock, uint flags)
(void) unlock_external(thd, sql_lock->table, sql_lock->table_count);
end:
+#ifdef WITH_WSREP
+ thd_proc_info(thd, "mysql_lock_tables(): unlocking tables II");
+#else /* WITH_WSREP */
thd_proc_info(thd, 0);
+#endif /* WITH_WSREP */
if (thd->killed)
{
@@ -336,6 +347,9 @@ end:
my_error(rc, MYF(0));
thd->set_time_after_lock();
+#ifdef WITH_WSREP
+ thd_proc_info(thd, "exit mysqld_lock_tables()");
+#endif /* WITH_WSREP */
DBUG_RETURN(rc);
}
@@ -1045,11 +1059,15 @@ void Global_read_lock::unlock_global_read_lock(THD *thd)
{
thd->mdl_context.release_lock(m_mdl_blocks_commits_lock);
m_mdl_blocks_commits_lock= NULL;
+#ifdef WITH_WSREP
+ wsrep_locked_seqno= WSREP_SEQNO_UNDEFINED;
+ wsrep->resume(wsrep);
+#endif /* WITH_WSREP */
}
thd->mdl_context.release_lock(m_mdl_global_shared_lock);
m_mdl_global_shared_lock= NULL;
m_state= GRL_NONE;
-
+
DBUG_VOID_RETURN;
}
@@ -1077,9 +1095,39 @@ bool Global_read_lock::make_global_read_lock_block_commit(THD *thd)
If we didn't succeed lock_global_read_lock(), or if we already suceeded
make_global_read_lock_block_commit(), do nothing.
*/
+
+#ifdef WITH_WSREP
+ if (m_mdl_blocks_commits_lock)
+ {
+ WSREP_DEBUG("GRL was in block commit mode when entering "
+ "make_global_read_lock_block_commit");
+ thd->mdl_context.release_lock(m_mdl_blocks_commits_lock);
+ m_mdl_blocks_commits_lock= NULL;
+ wsrep_locked_seqno= WSREP_SEQNO_UNDEFINED;
+ wsrep->resume(wsrep);
+ m_state= GRL_ACQUIRED;
+ }
+#endif /* WITH_WSREP */
+
if (m_state != GRL_ACQUIRED)
DBUG_RETURN(0);
+#ifdef WITH_WSREP
+ long long ret = wsrep->pause(wsrep);
+ if (ret >= 0)
+ {
+ wsrep_locked_seqno= ret;
+ }
+ else if (ret != -ENOSYS) /* -ENOSYS - no provider */
+ {
+ WSREP_ERROR("Failed to pause provider: %lld (%s)", -ret, strerror(-ret));
+
+ /* m_mdl_blocks_commits_lock is always NULL here */
+ wsrep_locked_seqno= WSREP_SEQNO_UNDEFINED;
+ my_error(ER_LOCK_DEADLOCK, MYF(0));
+ DBUG_RETURN(TRUE);
+ }
+#endif /* WITH_WSREP */
mdl_request.init(MDL_key::COMMIT, "", "", MDL_SHARED, MDL_EXPLICIT);
if (thd->mdl_context.acquire_lock(&mdl_request,
diff --git a/sql/log.cc b/sql/log.cc
index 577297fa1a4..ee7d548d81a 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -51,6 +51,9 @@
#include "sql_plugin.h"
#include "rpl_handler.h"
+#ifdef WITH_WSREP
+#include "wsrep_mysqld.h"
+#endif /* WITH_WSREP */
#include "debug_sync.h"
/* max size of the log message */
@@ -486,6 +489,9 @@ private:
};
handlerton *binlog_hton;
+#ifdef WITH_WSREP
+extern handlerton *wsrep_hton;
+#endif
bool LOGGER::is_log_table_enabled(uint log_table_type)
{
@@ -500,6 +506,134 @@ bool LOGGER::is_log_table_enabled(uint log_table_type)
}
}
+#ifdef WITH_WSREP
+IO_CACHE * get_trans_log(THD * thd)
+{
+ binlog_cache_mngr *cache_mngr = (binlog_cache_mngr*)
+ thd_get_ha_data(thd, binlog_hton);
+ if (cache_mngr)
+ {
+ return cache_mngr->get_binlog_cache_log(true);
+ }
+ else
+ {
+ WSREP_DEBUG("binlog cache not initialized, conn :%ld", thd->thread_id);
+ return NULL;
+ }
+}
+
+
+bool wsrep_trans_cache_is_empty(THD *thd)
+{
+ bool res= TRUE;
+
+ if (thd_sql_command((const THD*) thd) != SQLCOM_SELECT)
+ res= FALSE;
+ else
+ {
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+ if (cache_mngr)
+ {
+ res= cache_mngr->trx_cache.empty();
+ }
+ }
+ return res;
+}
+
+void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end)
+{
+ thd->binlog_flush_pending_rows_event(stmt_end);
+}
+void thd_binlog_trx_reset(THD * thd)
+{
+ /*
+ todo: fix autocommit select to not call the caller
+ */
+ if (thd_get_ha_data(thd, binlog_hton) != NULL)
+ {
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+ if (cache_mngr) cache_mngr->reset(TRUE, TRUE);
+ }
+ thd->clear_binlog_table_maps();
+}
+
+void thd_binlog_rollback_stmt(THD * thd)
+{
+ WSREP_DEBUG("thd_binlog_rollback_stmt :%ld", thd->thread_id);
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+ if (cache_mngr) cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF);
+}
+/*
+ Write the contents of a cache to memory buffer.
+
+ This function quite the same as MYSQL_BIN_LOG::write_cache(),
+ with the exception that here we write in buffer instead of log file.
+ */
+
+int wsrep_write_cache(IO_CACHE *cache, uchar **buf, uint *buf_len)
+{
+
+ if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
+ return ER_ERROR_ON_WRITE;
+ uint length= my_b_bytes_in_cache(cache);
+ long long total_length = 0;
+ uchar *buf_ptr = NULL;
+
+ do
+ {
+ /* bail out if buffer grows too large
+ This is a temporary fix to avoid flooding replication
+ TODO: remove this check for 0.7.4 release
+ */
+ if (total_length > wsrep_max_ws_size)
+ {
+ WSREP_WARN("transaction size limit (%lld) exceeded: %lld",
+ wsrep_max_ws_size, total_length);
+ if (reinit_io_cache(cache, WRITE_CACHE, 0, 0, 0))
+ {
+ WSREP_WARN("failed to initialize io-cache");
+ }
+ if (buf_ptr) my_free(*buf);
+ *buf_len = 0;
+ return ER_ERROR_ON_WRITE;
+ }
+ if (total_length > 0)
+ {
+ *buf_len += length;
+ *buf = (uchar *)my_realloc(*buf, total_length+length, MYF(0));
+ if (!*buf)
+ {
+ WSREP_ERROR("io cache write problem: %d %d", *buf_len, length);
+ return ER_ERROR_ON_WRITE;
+ }
+ buf_ptr = *buf+total_length;
+ }
+ else
+ {
+ if (buf_ptr != NULL)
+ {
+ WSREP_ERROR("io cache alloc error: %d %d", *buf_len, length);
+ my_free(*buf);
+ }
+ if (length > 0)
+ {
+ *buf = (uchar *) my_malloc(length, MYF(0));
+ buf_ptr = *buf;
+ *buf_len = length;
+ }
+ }
+ total_length += length;
+
+ memcpy(buf_ptr, cache->read_pos, length);
+ cache->read_pos=cache->read_end;
+ } while ((cache->file >= 0) && (length= my_b_fill(cache)));
+
+ return 0;
+}
+#endif
/* Check if a given table is opened log table */
int check_if_log_table(size_t db_len, const char *db, size_t table_name_len,
@@ -1536,7 +1670,11 @@ binlog_trans_log_savepos(THD *thd, my_off_t *pos)
thd->binlog_setup_trx_data();
binlog_cache_mngr *const cache_mngr=
(binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+#ifdef WITH_WSREP
+ DBUG_ASSERT((WSREP(thd) && wsrep_emulate_bin_log) || mysql_bin_log.is_open());
+#else
DBUG_ASSERT(mysql_bin_log.is_open());
+#endif
*pos= cache_mngr->trx_cache.get_byte_position();
DBUG_PRINT("return", ("*pos: %lu", (ulong) *pos));
DBUG_VOID_RETURN;
@@ -1584,7 +1722,16 @@ binlog_trans_log_truncate(THD *thd, my_off_t pos)
int binlog_init(void *p)
{
binlog_hton= (handlerton *)p;
+#ifdef WITH_WSREP
+ if (WSREP_ON)
+ binlog_hton->state= SHOW_OPTION_YES;
+ else
+ {
+#endif /* WITH_WSREP */
binlog_hton->state=opt_bin_log ? SHOW_OPTION_YES : SHOW_OPTION_NO;
+#ifdef WITH_WSREP
+ }
+#endif /* WITH_WSREP */
binlog_hton->db_type=DB_TYPE_BINLOG;
binlog_hton->savepoint_offset= sizeof(my_off_t);
binlog_hton->close_connection= binlog_close_connection;
@@ -1840,6 +1987,9 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all)
DBUG_ENTER("binlog_commit");
binlog_cache_mngr *const cache_mngr=
(binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+#ifdef WITH_WSREP
+ if (!cache_mngr) DBUG_RETURN(0);
+#endif /* WITH_WSREP */
DBUG_PRINT("debug",
("all: %d, in_transaction: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s",
@@ -1896,6 +2046,9 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
int error= 0;
binlog_cache_mngr *const cache_mngr=
(binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+#ifdef WITH_WSREP
+ if (!cache_mngr) DBUG_RETURN(0);
+#endif /* WITH_WSREP */
DBUG_PRINT("debug", ("all: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s",
YESNO(all),
@@ -1924,8 +2077,12 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
cache_mngr->reset(false, true);
DBUG_RETURN(error);
}
-
+#ifdef WITH_WSREP
+ if (!wsrep_emulate_bin_log &&
+ mysql_bin_log.check_write_error(thd))
+#else
if (mysql_bin_log.check_write_error(thd))
+#endif
{
/*
"all == true" means that a "rollback statement" triggered the error and
@@ -1955,12 +2112,12 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
if (ending_trans(thd, all) &&
((thd->variables.option_bits & OPTION_KEEP_LOG) ||
(trans_has_updated_non_trans_table(thd) &&
- thd->variables.binlog_format == BINLOG_FORMAT_STMT) ||
+ WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_STMT) ||
(cache_mngr->trx_cache.changes_to_non_trans_temp_table() &&
- thd->variables.binlog_format == BINLOG_FORMAT_MIXED) ||
+ WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_MIXED) ||
(trans_has_updated_non_trans_table(thd) &&
ending_single_stmt_trans(thd,all) &&
- thd->variables.binlog_format == BINLOG_FORMAT_MIXED)))
+ WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_MIXED)))
error= binlog_rollback_flush_trx_cache(thd, all, cache_mngr);
/*
Truncate the cache if:
@@ -1974,9 +2131,9 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
else if (ending_trans(thd, all) ||
(!(thd->variables.option_bits & OPTION_KEEP_LOG) &&
(!stmt_has_updated_non_trans_table(thd) ||
- thd->variables.binlog_format != BINLOG_FORMAT_STMT) &&
+ WSREP_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_STMT) &&
(!cache_mngr->trx_cache.changes_to_non_trans_temp_table() ||
- thd->variables.binlog_format != BINLOG_FORMAT_MIXED)))
+ WSREP_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_MIXED)))
error= binlog_truncate_trx_cache(thd, cache_mngr, all);
}
@@ -2070,7 +2227,9 @@ static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv)
binlog_trans_log_savepos(thd, (my_off_t*) sv);
/* Write it to the binary log */
-
+#ifdef WITH_WSREP
+ if (wsrep_emulate_bin_log) DBUG_RETURN(0);
+#endif /* WITH_WSREP */
String log_query;
if (log_query.append(STRING_WITH_LEN("SAVEPOINT ")) ||
log_query.append("`") ||
@@ -2092,7 +2251,12 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
non-transactional table. Otherwise, truncate the binlog cache starting
from the SAVEPOINT command.
*/
+#ifdef WITH_WSREP
+ if (!wsrep_emulate_bin_log &&
+ unlikely(trans_has_updated_non_trans_table(thd) ||
+#else
if (unlikely(trans_has_updated_non_trans_table(thd) ||
+#endif
(thd->variables.option_bits & OPTION_KEEP_LOG)))
{
String log_query;
@@ -4720,6 +4884,7 @@ int THD::binlog_setup_trx_data()
DBUG_RETURN(0);
}
+
/*
Function to start a statement and optionally a transaction for the
binary log.
@@ -4839,7 +5004,12 @@ int THD::binlog_write_table_map(TABLE *table, bool is_transactional,
table->s->table_map_id));
/* Pre-conditions */
+#ifdef WITH_WSREP
+ DBUG_ASSERT(is_current_stmt_binlog_format_row() &&
+ (WSREP_EMULATE_BINLOG(this) || mysql_bin_log.is_open()));
+#else
DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open());
+#endif
DBUG_ASSERT(table->s->table_map_id != ULONG_MAX);
Table_map_log_event
@@ -4976,7 +5146,11 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
bool is_transactional)
{
DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)");
+#ifdef WITH_WSREP
+ DBUG_ASSERT(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open());
+#else
DBUG_ASSERT(mysql_bin_log.is_open());
+#endif
DBUG_PRINT("enter", ("event: 0x%lx", (long) event));
int error= 0;
@@ -5056,7 +5230,11 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
mostly called if is_open() *was* true a few instructions before, but it
could have changed since.
*/
+#ifdef WITH_WSREP
+ if ((WSREP(thd) && wsrep_emulate_bin_log) || is_open())
+#else
if (likely(is_open()))
+#endif
{
my_off_t UNINIT_VAR(my_org_b_tell);
#ifdef HAVE_REPLICATION
@@ -5237,6 +5415,35 @@ err:
}
}
+#ifdef WITH_WSREP
+ if (WSREP(thd) && wsrep_incremental_data_collection &&
+ (wsrep_emulate_bin_log || mysql_bin_log.is_open()))
+ {
+ DBUG_ASSERT(thd->wsrep_trx_handle.trx_id != (unsigned long)-1);
+ if (!error)
+ {
+ IO_CACHE* cache= get_trans_log(thd);
+ uchar* buf= NULL;
+ uint buf_len= 0;
+
+ if (wsrep_emulate_bin_log)
+ thd->binlog_flush_pending_rows_event(false);
+ error= wsrep_write_cache(cache, &buf, &buf_len);
+ if (!error && buf_len > 0)
+ {
+ wsrep_status_t rc= wsrep->append_data(wsrep,
+ &thd->wsrep_trx_handle,
+ buf, buf_len);
+ if (rc != WSREP_OK)
+ {
+ sql_print_warning("WSREP: append_data() returned %d", rc);
+ error= 1;
+ }
+ }
+ if (buf_len) my_free(buf);
+ }
+ }
+#endif /* WITH_WSREP */
DBUG_RETURN(error);
}
@@ -5329,6 +5536,14 @@ int MYSQL_BIN_LOG::rotate(bool force_rotate, bool* check_purge)
{
int error= 0;
DBUG_ENTER("MYSQL_BIN_LOG::rotate");
+#ifdef WITH_WSREP
+ if (WSREP_ON && wsrep_to_isolation)
+ {
+ WSREP_DEBUG("avoiding binlog rotate due to TO isolation: %d",
+ wsrep_to_isolation);
+ DBUG_RETURN(0);
+ }
+#endif
//todo: fix the macro def and restore safe_mutex_assert_owner(&LOCK_log);
*check_purge= false;
@@ -5797,6 +6012,9 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
group_commit_entry entry;
DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_to_binlog");
+#ifdef WITH_WSREP
+ if (wsrep_emulate_bin_log) DBUG_RETURN(0);
+#endif /* WITH_WSREP */
entry.thd= thd;
entry.cache_mngr= cache_mngr;
entry.error= 0;
@@ -7426,7 +7644,13 @@ TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all,
binlog_cache_mngr *cache_mngr=
(binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
-
+#ifdef WITH_WSREP
+ if (!cache_mngr)
+ {
+ WSREP_DEBUG("Skipping empty log_xid: %s", thd->query());
+ DBUG_RETURN(1);
+ }
+#endif /* WITH_WSREP */
cache_mngr->using_xa= TRUE;
cache_mngr->xa_xid= xid;
err= binlog_commit_flush_xid_caches(thd, cache_mngr, all, xid);
diff --git a/sql/log.h b/sql/log.h
index e8f59801683..2bc4d0e49d7 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -94,7 +94,7 @@ public:
int log_and_order(THD *thd, my_xid xid, bool all,
bool need_prepare_ordered, bool need_commit_ordered)
{
- DBUG_ASSERT(0 /* Internal error - TC_LOG_DUMMY::log_and_order() called */);
+ //DBUG_ASSERT(0 /* Internal error - TC_LOG_DUMMY::log_and_order() called */);
return 1;
}
int unlog(ulong cookie, my_xid xid) { return 0; }
@@ -267,6 +267,12 @@ enum enum_log_state { LOG_OPENED, LOG_CLOSED, LOG_TO_BE_OPENED };
(mmap+fsync is two times faster than write+fsync)
*/
+#ifdef WITH_WSREP
+extern my_bool wsrep_emulate_bin_log;
+Log_event* wsrep_read_log_event(
+ char **arg_buf, size_t *arg_buf_len,
+ const Format_description_log_event *description_event);
+#endif
class MYSQL_LOG
{
public:
@@ -846,12 +852,30 @@ public:
};
enum enum_binlog_format {
+ /*
+ statement-based except for cases where only row-based can work (UUID()
+ etc):
+ */
BINLOG_FORMAT_MIXED= 0, ///< statement if safe, otherwise row - autodetected
BINLOG_FORMAT_STMT= 1, ///< statement-based
BINLOG_FORMAT_ROW= 2, ///< row-based
BINLOG_FORMAT_UNSPEC=3 ///< thd_binlog_format() returns it when binlog is closed
};
+#ifdef WITH_WSREP
+IO_CACHE * get_trans_log(THD * thd);
+bool wsrep_trans_cache_is_empty(THD *thd);
+void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end);
+void thd_binlog_trx_reset(THD * thd);
+void thd_binlog_rollback_stmt(THD * thd);
+int wsrep_write_cache(IO_CACHE *cache, uchar **buf, uint *buf_len);
+
+#define WSREP_FORMAT(my_format) \
+ ((wsrep_forced_binlog_format != BINLOG_FORMAT_UNSPEC) ? \
+ wsrep_forced_binlog_format : my_format)
+#else
+#define WSREP_FORMAT(my_format) my_format
+#endif
int query_error_code(THD *thd, bool not_killed);
uint purge_log_get_error_code(int res);
diff --git a/sql/log_event.cc b/sql/log_event.cc
index cec0785a088..05340fff03a 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -46,6 +46,9 @@
#include "transaction.h"
#include <my_dir.h>
+#if WITH_WSREP
+#include "wsrep_mysqld.h"
+#endif
#endif /* MYSQL_CLIENT */
#include <base64.h>
@@ -2769,7 +2772,9 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
master_data_written(0)
{
time_t end_time;
-
+#ifdef WITH_WSREP
+ thd->wsrep_PA_safe= false;
+#endif /* WITH_WSREP */
memset(&user, 0, sizeof(user));
memset(&host, 0, sizeof(host));
@@ -6983,7 +6988,14 @@ err:
end_io_cache(&file);
if (fd >= 0)
mysql_file_close(fd, MYF(0));
+#ifdef WITH_WSREP
+ if (WSREP(thd))
+ thd_proc_info(thd, "exit Create_file_log_event::do_apply_event()");
+ else
+ thd_proc_info(thd, 0);
+#else /* WITH_WSREP */
thd_proc_info(thd, 0);
+#endif /* WITH_WSREP */
return error != 0;
}
#endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */
@@ -7154,7 +7166,14 @@ int Append_block_log_event::do_apply_event(Relay_log_info const *rli)
err:
if (fd >= 0)
mysql_file_close(fd, MYF(0));
+#ifdef WITH_WSREP
+ if (WSREP(thd))
+ thd_proc_info(thd, "exit Append_block_log_event::do_apply_event()");
+ else
+ thd_proc_info(thd, 0);
+#else /* WITH_WSREP */
thd_proc_info(thd, 0);
+#endif /* WITH_WSREP */
DBUG_RETURN(error);
}
#endif
@@ -8097,7 +8116,17 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
if (open_and_lock_tables(thd, rli->tables_to_lock, FALSE, 0))
{
+#ifdef WITH_WSREP
+ uint actual_error= ER_SERVER_SHUTDOWN;
+ if (WSREP(thd) && !thd->is_fatal_error)
+ {
+ sql_print_information("WSREP, BF applier interrupted in log_event.cc");
+ }
+ else
+ actual_error= thd->stmt_da->sql_errno();
+#else
uint actual_error= thd->stmt_da->sql_errno();
+#endif
if (thd->is_slave_error || thd->is_fatal_error)
{
/*
@@ -9842,8 +9871,23 @@ int
Write_rows_log_event::do_exec_row(const Relay_log_info *const rli)
{
DBUG_ASSERT(m_table != NULL);
+#ifdef WITH_WSREP
+#ifdef WSREP_PROC_INFO
+ char info[64];
+ info[sizeof(info) - 1] = '\0';
+ snprintf(info, sizeof(info) - 1, "Write_rows_log_event::write_row(%lld)",
+ (long long) thd->wsrep_trx_seqno);
+ const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL;
+#else
+ const char* tmp = (WSREP(thd)) ?
+ thd_proc_info(thd,"Write_rows_log_event::write_row()") : NULL;
+#endif /* WSREP_PROC_INFO */
+#endif /* WITH_WSREP */
int error= write_row(rli, slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT);
+#ifdef WITH_WSREP
+ if (WSREP(thd)) thd_proc_info(thd, tmp);
+#endif /* WITH_WSREP */
if (error && !thd->is_error())
{
DBUG_ASSERT(0);
@@ -10496,14 +10540,39 @@ int Delete_rows_log_event::do_exec_row(const Relay_log_info *const rli)
int error;
DBUG_ASSERT(m_table != NULL);
+#ifdef WITH_WSREP
+#ifdef WSREP_PROC_INFO
+ char info[64];
+ info[sizeof(info) - 1] = '\0';
+ snprintf(info, sizeof(info) - 1, "Delete_rows_log_event::find_row(%lld)",
+ (long long) thd->wsrep_trx_seqno);
+ const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL;
+#else
+ const char* tmp = (WSREP(thd)) ?
+ thd_proc_info(thd,"Delete_rows_log_event::find_row()") : NULL;
+#endif /* WSREP_PROC_INFO */
+#endif /* WITH_WSREP */
if (!(error= find_row(rli)))
{
/*
Delete the record found, located in record[0]
*/
+#ifdef WITH_WSREP
+#ifdef WSREP_PROC_INFO
+ snprintf(info, sizeof(info) - 1,
+ "Delete_rows_log_event::ha_delete_row(%lld)",
+ (long long) thd->wsrep_trx_seqno);
+ if (WSREP(thd)) thd_proc_info(thd, info);
+#else
+ if (WSREP(thd)) thd_proc_info(thd,"Delete_rows_log_event::ha_delete_row()");
+#endif /* WSREP_PROC_INFO */
+#endif /* WITH_WSREP */
error= m_table->file->ha_delete_row(m_table->record[0]);
m_table->file->ha_index_or_rnd_end();
}
+#ifdef WITH_WSREP
+ if (WSREP(thd)) thd_proc_info(thd, tmp);
+#endif /* WITH_WSREP */
return error;
}
@@ -10617,6 +10686,18 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
{
DBUG_ASSERT(m_table != NULL);
+#ifdef WITH_WSREP
+#ifdef WSREP_PROC_INFO
+ char info[64];
+ info[sizeof(info) - 1] = '\0';
+ snprintf(info, sizeof(info) - 1, "Update_rows_log_event::find_row(%lld)",
+ (long long) thd->wsrep_trx_seqno);
+ const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL;
+#else
+ const char* tmp = (WSREP(thd)) ?
+ thd_proc_info(thd,"Update_rows_log_event::find_row()") : NULL;
+#endif /* WSREP_PROC_INFO */
+#endif /* WITH_WSREP */
int error= find_row(rli);
if (error)
{
@@ -10643,6 +10724,17 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
store_record(m_table,record[1]);
m_curr_row= m_curr_row_end;
+#ifdef WITH_WSREP
+#ifdef WSREP_PROC_INFO
+ snprintf(info, sizeof(info) - 1,
+ "Update_rows_log_event::unpack_current_row(%lld)",
+ (long long) thd->wsrep_trx_seqno);
+ if (WSREP(thd)) thd_proc_info(thd, info);
+#else
+ if (WSREP(thd))
+ thd_proc_info(thd,"Update_rows_log_event::unpack_current_row()");
+#endif /* WSREP_PROC_INFO */
+#endif /* WITH_WSREP */
/* this also updates m_curr_row_end */
if ((error= unpack_current_row(rli)))
goto err;
@@ -10661,10 +10753,23 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
DBUG_DUMP("new values", m_table->record[0], m_table->s->reclength);
#endif
+#ifdef WITH_WSREP
+#ifdef WSREP_PROC_INFO
+ snprintf(info, sizeof(info) - 1,
+ "Update_rows_log_event::ha_update_row(%lld)",
+ (long long) thd->wsrep_trx_seqno);
+ if (WSREP(thd)) thd_proc_info(thd, info);
+#else
+ if (WSREP(thd)) thd_proc_info(thd,"Update_rows_log_event::ha_update_row()");
+#endif /* WSREP_PROC_INFO */
+#endif /* WITH_WSREP */
error= m_table->file->ha_update_row(m_table->record[1], m_table->record[0]);
if (error == HA_ERR_RECORD_IS_THE_SAME)
error= 0;
+#ifdef WITH_WSREP
+ if (WSREP(thd)) thd_proc_info(thd, tmp);
+#endif /* WITH_WSREP */
err:
m_table->file->ha_index_or_rnd_end();
return error;
@@ -10749,6 +10854,50 @@ void Incident_log_event::pack_info(Protocol *protocol)
protocol->store(buf, bytes, &my_charset_bin);
}
#endif
+#if WITH_WSREP && !defined(MYSQL_CLIENT)
+Format_description_log_event *wsrep_format_desc; // TODO: free them at the end
+/*
+ read the first event from (*buf). The size of the (*buf) is (*buf_len).
+ At the end (*buf) is shitfed to point to the following event or NULL and
+ (*buf_len) will be changed to account just being read bytes of the 1st event.
+*/
+Log_event* wsrep_read_log_event(
+ char **arg_buf, size_t *arg_buf_len,
+ const Format_description_log_event *description_event)
+{
+ DBUG_ENTER("wsrep_read_log_event");
+ char *head= (*arg_buf);
+
+ uint data_len = uint4korr(head + EVENT_LEN_OFFSET);
+ char *buf= (*arg_buf);
+ const char *error= 0;
+ Log_event *res= 0;
+#ifndef max_allowed_packet
+ THD *thd=current_thd;
+ uint max_allowed_packet= thd ? thd->variables.max_allowed_packet : ~(ulong)0;
+#endif
+
+ if (data_len > max_allowed_packet)
+ {
+ error = "Event too big";
+ goto err;
+ }
+
+ res= Log_event::read_log_event(buf, data_len, &error, description_event, FALSE);
+
+err:
+ if (!res)
+ {
+ DBUG_ASSERT(error != 0);
+ sql_print_error("Error in Log_event::read_log_event(): "
+ "'%s', data_len: %d, event_type: %d",
+ error,data_len,head[EVENT_TYPE_OFFSET]);
+ }
+ (*arg_buf)+= data_len;
+ (*arg_buf_len)-= data_len;
+ DBUG_RETURN(res);
+}
+#endif
#ifdef MYSQL_CLIENT
diff --git a/sql/mdl.cc b/sql/mdl.cc
index ca552a540b9..8ff420e4f50 100644
--- a/sql/mdl.cc
+++ b/sql/mdl.cc
@@ -20,7 +20,17 @@
#include <mysqld_error.h>
#include <mysql/plugin.h>
#include <mysql/service_thd_wait.h>
-
+#ifdef WITH_WSREP
+#include "wsrep_mysqld.h"
+extern "C" my_thread_id wsrep_thd_thread_id(THD *thd);
+extern "C" char *wsrep_thd_query(THD *thd);
+void sql_print_information(const char *format, ...)
+ ATTRIBUTE_FORMAT(printf, 1, 2);
+
+extern bool
+wsrep_grant_mdl_exception(MDL_context *requestor_ctx,
+ MDL_ticket *ticket);
+#endif /* WITH_WSREP */
#ifdef HAVE_PSI_INTERFACE
static PSI_mutex_key key_MDL_map_mutex;
static PSI_mutex_key key_MDL_wait_LOCK_wait_status;
@@ -1222,11 +1232,54 @@ void MDL_lock::Ticket_list::add_ticket(MDL_ticket *ticket)
called by other threads.
*/
DBUG_ASSERT(ticket->get_lock());
+#ifdef WITH_WSREP
+ if ((this == &(ticket->get_lock()->m_waiting)) &&
+ wsrep_thd_is_brute_force((void *)(ticket->get_ctx()->get_thd())))
+ {
+ Ticket_iterator itw(ticket->get_lock()->m_waiting);
+ Ticket_iterator itg(ticket->get_lock()->m_granted);
+
+ MDL_ticket *waiting, *granted;
+ MDL_ticket *prev=NULL;
+ bool added= false;
+
+ while ((waiting= itw++) && !added)
+ {
+ if (!wsrep_thd_is_brute_force((void *)(waiting->get_ctx()->get_thd())))
+ {
+ WSREP_DEBUG("MDL add_ticket inserted before: %lu %s",
+ wsrep_thd_thread_id(waiting->get_ctx()->get_thd()),
+ wsrep_thd_query(waiting->get_ctx()->get_thd()));
+ m_list.insert_after(prev, ticket);
+ added= true;
+ }
+ prev= waiting;
+ }
+ if (!added) m_list.push_back(ticket);
+
+ while ((granted= itg++))
+ {
+ if (granted->get_ctx() != ticket->get_ctx() &&
+ granted->is_incompatible_when_granted(ticket->get_type()))
+ {
+ if (!wsrep_grant_mdl_exception(ticket->get_ctx(), granted))
+ {
+ WSREP_DEBUG("MDL victim killed at add_ticket");
+ }
+ }
+ }
+ }
+ else
+ {
+#endif /* WITH_WSREP */
/*
Add ticket to the *back* of the queue to ensure fairness
among requests with the same priority.
*/
m_list.push_back(ticket);
+#ifdef WITH_WSREP
+ }
+#endif /* WITH_WSREP */
m_bitmap|= MDL_BIT(ticket->get_type());
}
@@ -1463,7 +1516,6 @@ MDL_object_lock::m_waiting_incompatible[MDL_TYPE_END] =
0
};
-
/**
Check if request for the metadata lock can be satisfied given its
current state.
@@ -1486,6 +1538,9 @@ MDL_lock::can_grant_lock(enum_mdl_type type_arg,
bool can_grant= FALSE;
bitmap_t waiting_incompat_map= incompatible_waiting_types_bitmap()[type_arg];
bitmap_t granted_incompat_map= incompatible_granted_types_bitmap()[type_arg];
+#ifdef WITH_WSREP
+ bool wsrep_can_grant= TRUE;
+#endif /* WITH_WSREP */
/*
New lock request can be satisfied iff:
- There are no incompatible types of satisfied requests
@@ -1507,12 +1562,51 @@ MDL_lock::can_grant_lock(enum_mdl_type type_arg,
{
if (ticket->get_ctx() != requestor_ctx &&
ticket->is_incompatible_when_granted(type_arg))
+#ifdef WITH_WSREP
+ {
+ if (wsrep_thd_is_brute_force((void *)(requestor_ctx->get_thd())) &&
+ key.mdl_namespace() == MDL_key::GLOBAL)
+ {
+ WSREP_DEBUG("global lock granted for BF: %lu %s",
+ wsrep_thd_thread_id(requestor_ctx->get_thd()),
+ wsrep_thd_query(requestor_ctx->get_thd()));
+ can_grant = true;
+ }
+ else if (!wsrep_grant_mdl_exception(requestor_ctx, ticket))
+ {
+ wsrep_can_grant= FALSE;
+ }
+ else
+ {
+ can_grant= TRUE;
+ }
+ }
+#else
break;
+#endif /* WITH_WSREP */
}
+#ifdef WITH_WSREP
+ if ((ticket == NULL) && wsrep_can_grant)
+#else
if (ticket == NULL) /* Incompatible locks are our own. */
+#endif /* WITH_WSREP */
+
can_grant= TRUE;
}
}
+#ifdef WITH_WSREP
+ else
+ {
+ if (wsrep_thd_is_brute_force((void *)(requestor_ctx->get_thd())) &&
+ key.mdl_namespace() == MDL_key::GLOBAL)
+ {
+ WSREP_DEBUG("global lock granted for BF (waiting queue): %lu %s",
+ wsrep_thd_thread_id(requestor_ctx->get_thd()),
+ wsrep_thd_query(requestor_ctx->get_thd()));
+ can_grant = true;
+ }
+ }
+#endif /* WITH_WSREP */
return can_grant;
}
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 7394ce5c931..2251663c94a 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -72,6 +72,10 @@
#include "scheduler.h"
#include <waiting_threads.h>
#include "debug_sync.h"
+#ifdef WITH_WSREP
+#include "wsrep_mysqld.h"
+ulong wsrep_running_threads = 0; // # of currently running wsrep threads
+#endif
#include "sql_callback.h"
#include "threadpool.h"
@@ -360,6 +364,9 @@ static DYNAMIC_ARRAY all_options;
/* Global variables */
+#ifdef WITH_WSREP
+ulong my_bind_addr;
+#endif /* WITH_WSREP */
bool opt_bin_log, opt_ignore_builtin_innodb= 0;
my_bool opt_log, opt_slow_log, debug_assert_if_crashed_table= 0, opt_help= 0, opt_abort;
ulonglong log_output_options;
@@ -452,6 +459,10 @@ ulong opt_binlog_rows_event_max_size;
my_bool opt_master_verify_checksum= 0;
my_bool opt_slave_sql_verify_checksum= 1;
const char *binlog_format_names[]= {"MIXED", "STATEMENT", "ROW", NullS};
+#ifdef WITH_WSREP
+const char *wsrep_binlog_format_names[]=
+ {"MIXED", "STATEMENT", "ROW", "NONE", NullS};
+#endif /*WITH_WSREP */
#ifdef HAVE_INITGROUPS
volatile sig_atomic_t calling_initgroups= 0; /**< Used in SIGSEGV handler. */
#endif
@@ -671,6 +682,21 @@ pthread_attr_t connection_attrib;
mysql_mutex_t LOCK_server_started;
mysql_cond_t COND_server_started;
+#ifdef WITH_WSREP
+mysql_mutex_t LOCK_wsrep_ready;
+mysql_cond_t COND_wsrep_ready;
+mysql_mutex_t LOCK_wsrep_sst;
+mysql_cond_t COND_wsrep_sst;
+mysql_mutex_t LOCK_wsrep_sst_init;
+mysql_cond_t COND_wsrep_sst_init;
+mysql_mutex_t LOCK_wsrep_rollback;
+mysql_cond_t COND_wsrep_rollback;
+wsrep_aborting_thd_t wsrep_aborting_thd= NULL;
+mysql_mutex_t LOCK_wsrep_replaying;
+mysql_cond_t COND_wsrep_replaying;
+int wsrep_replaying= 0;
+static void wsrep_close_threads(THD* thd);
+#endif
int mysqld_server_started= 0;
File_parser_dummy_hook file_parser_dummy_hook;
@@ -740,6 +766,11 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_prep_xids,
key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data,
key_LOCK_error_messages, key_LOG_INFO_lock, key_LOCK_thread_count,
key_PARTITION_LOCK_auto_inc;
+#ifdef WITH_WSREP
+PSI_mutex_key key_LOCK_wsrep_rollback, key_LOCK_wsrep_thd,
+ key_LOCK_wsrep_replaying, key_LOCK_wsrep_ready, key_LOCK_wsrep_sst,
+ key_LOCK_wsrep_sst_thread, key_LOCK_wsrep_sst_init;
+#endif
PSI_mutex_key key_RELAYLOG_LOCK_index;
PSI_mutex_key key_LOCK_stats,
@@ -810,6 +841,16 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_thread_count, "LOCK_thread_count", PSI_FLAG_GLOBAL},
{ &key_LOCK_logger_service, "logger_service_file_st::lock",
PSI_FLAG_GLOBAL},
+#ifdef WITH_WSREP
+ { &key_LOCK_wsrep_ready, "LOCK_wsrep_ready", PSI_FLAG_GLOBAL},
+ { &key_LOCK_wsrep_sst, "LOCK_wsrep_sst", PSI_FLAG_GLOBAL},
+ { &key_LOCK_wsrep_sst_thread, "wsrep_sst_thread", 0},
+ { &key_LOCK_wsrep_sst_init, "LOCK_wsrep_sst_init", PSI_FLAG_GLOBAL},
+ { &key_LOCK_wsrep_sst, "LOCK_wsrep_sst", PSI_FLAG_GLOBAL},
+ { &key_LOCK_wsrep_rollback, "LOCK_wsrep_rollback", PSI_FLAG_GLOBAL},
+ { &key_LOCK_wsrep_thd, "THD::LOCK_wsrep_thd", 0},
+ { &key_LOCK_wsrep_replaying, "LOCK_wsrep_replaying", PSI_FLAG_GLOBAL},
+#endif
{ &key_PARTITION_LOCK_auto_inc, "HA_DATA_PARTITION::LOCK_auto_inc", 0}
};
@@ -847,6 +888,11 @@ PSI_cond_key key_BINLOG_COND_prep_xids, key_BINLOG_update_cond,
key_TABLE_SHARE_cond, key_user_level_lock_cond,
key_COND_thread_count, key_COND_thread_cache, key_COND_flush_thread_cache,
key_BINLOG_COND_queue_busy;
+#ifdef WITH_WSREP
+PSI_cond_key key_COND_wsrep_rollback, key_COND_wsrep_thd,
+ key_COND_wsrep_replaying, key_COND_wsrep_ready, key_COND_wsrep_sst,
+ key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread;
+#endif /* WITH_WSREP */
PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready;
PSI_cond_key key_RELAYLOG_COND_queue_busy;
PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
@@ -888,6 +934,15 @@ static PSI_cond_info all_server_conds[]=
{ &key_user_level_lock_cond, "User_level_lock::cond", 0},
{ &key_COND_thread_count, "COND_thread_count", PSI_FLAG_GLOBAL},
{ &key_COND_thread_cache, "COND_thread_cache", PSI_FLAG_GLOBAL},
+#ifdef WITH_WSREP
+ { &key_COND_wsrep_ready, "COND_wsrep_ready", PSI_FLAG_GLOBAL},
+ { &key_COND_wsrep_sst, "COND_wsrep_sst", PSI_FLAG_GLOBAL},
+ { &key_COND_wsrep_sst_init, "COND_wsrep_sst_init", PSI_FLAG_GLOBAL},
+ { &key_COND_wsrep_sst_thread, "wsrep_sst_thread", 0},
+ { &key_COND_wsrep_rollback, "COND_wsrep_rollback", PSI_FLAG_GLOBAL},
+ { &key_COND_wsrep_thd, "THD::COND_wsrep_thd", 0},
+ { &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL},
+#endif
{ &key_COND_flush_thread_cache, "COND_flush_thread_cache", PSI_FLAG_GLOBAL}
};
@@ -1413,6 +1468,11 @@ static void close_connections(void)
if (tmp->slave_thread)
continue;
+#ifdef WITH_WSREP
+ /* skip wsrep system threads as well */
+ if (WSREP(tmp) && (tmp->wsrep_exec_mode==REPL_RECV || tmp->wsrep_applier))
+ continue;
+#endif
tmp->killed= KILL_SERVER_HARD;
MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (tmp));
mysql_mutex_lock(&tmp->LOCK_thd_data);
@@ -1476,6 +1536,33 @@ static void close_connections(void)
close_connection(tmp,ER_SERVER_SHUTDOWN);
}
#endif
+#ifdef WITH_WSREP
+ /*
+ * TODO: this code block may turn out redundant. wsrep->disconnect()
+ * should terminate slave threads gracefully, and we don't need
+ * to signal them here.
+ * The code here makes sure mysqld will not hang during shutdown
+ * even if wsrep provider has problems in shutting down.
+ */
+ if (WSREP(tmp) && tmp->wsrep_exec_mode==REPL_RECV)
+ {
+ sql_print_information("closing wsrep system thread");
+ tmp->killed= KILL_CONNECTION;
+ MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (tmp));
+ if (tmp->mysys_var)
+ {
+ tmp->mysys_var->abort=1;
+ mysql_mutex_lock(&tmp->mysys_var->mutex);
+ if (tmp->mysys_var->current_cond)
+ {
+ mysql_mutex_lock(tmp->mysys_var->current_mutex);
+ mysql_cond_broadcast(tmp->mysys_var->current_cond);
+ mysql_mutex_unlock(tmp->mysys_var->current_mutex);
+ }
+ mysql_mutex_unlock(&tmp->mysys_var->mutex);
+ }
+ }
+#endif
DBUG_PRINT("quit",("Unlocking LOCK_thread_count"));
mysql_mutex_unlock(&LOCK_thread_count);
}
@@ -1636,8 +1723,14 @@ static void __cdecl kill_server(int sig_ptr)
}
}
#endif
+#ifdef WITH_WSREP
+ if (WSREP_ON) wsrep_stop_replication(NULL);
+#endif
close_connections();
+#ifdef WITH_WSREP
+ if (WSREP_ON) wsrep_deinit();
+#endif
if (sig != MYSQL_KILL_SIGNAL &&
sig != 0)
unireg_abort(1); /* purecov: inspected */
@@ -1732,6 +1825,21 @@ extern "C" void unireg_abort(int exit_code)
usage();
if (exit_code)
sql_print_error("Aborting\n");
+#ifdef WITH_WSREP
+ if (wsrep)
+ {
+ /* This is an abort situation, we cannot expect to gracefully close all
+ * wsrep threads here, we can only diconnect from service */
+ wsrep_close_client_connections(FALSE);
+ shutdown_in_progress= 1;
+ THD* thd(0);
+ wsrep->disconnect(wsrep);
+ WSREP_INFO("Service disconnected.");
+ wsrep_close_threads(thd); /* this won't close all threads */
+ sleep(1); /* so give some time to exit for those which can */
+ WSREP_INFO("Some threads may fail to exit.");
+ }
+#endif // WITH_WSREP
clean_up(!opt_abort && (exit_code || !opt_bootstrap)); /* purecov: inspected */
DBUG_PRINT("quit",("done with cleanup in unireg_abort"));
mysqld_exit(exit_code);
@@ -1932,6 +2040,18 @@ static void clean_up_mutexes()
mysql_cond_destroy(&COND_thread_count);
mysql_cond_destroy(&COND_thread_cache);
mysql_cond_destroy(&COND_flush_thread_cache);
+#ifdef WITH_WSREP
+ (void) mysql_mutex_destroy(&LOCK_wsrep_ready);
+ (void) mysql_cond_destroy(&COND_wsrep_ready);
+ (void) mysql_mutex_destroy(&LOCK_wsrep_sst);
+ (void) mysql_cond_destroy(&COND_wsrep_sst);
+ (void) mysql_mutex_destroy(&LOCK_wsrep_sst_init);
+ (void) mysql_cond_destroy(&COND_wsrep_sst_init);
+ (void) mysql_mutex_destroy(&LOCK_wsrep_rollback);
+ (void) mysql_cond_destroy(&COND_wsrep_rollback);
+ (void) mysql_mutex_destroy(&LOCK_wsrep_replaying);
+ (void) mysql_cond_destroy(&COND_wsrep_replaying);
+#endif
mysql_mutex_destroy(&LOCK_server_started);
mysql_cond_destroy(&COND_server_started);
mysql_mutex_destroy(&LOCK_prepare_ordered);
@@ -2350,7 +2470,11 @@ static void network_init(void)
@note
For the connection that is doing shutdown, this is called twice
*/
+#ifdef WITH_WSREP
+void close_connection(THD *thd, uint sql_errno, bool lock)
+#else
void close_connection(THD *thd, uint sql_errno)
+#endif
{
DBUG_ENTER("close_connection");
@@ -3477,7 +3601,11 @@ static int init_common_variables()
compile_time_assert(sizeof(com_status_vars)/sizeof(com_status_vars[0]) - 1 ==
SQLCOM_END + 8);
#endif
-
+#ifdef WITH_WSREP
+ /* This is a protection against mutually incompatible option values. */
+ if (WSREP_ON && wsrep_check_opts (remaining_argc, remaining_argv))
+ return 1;
+#endif /* WITH_WSREP */
if (get_options(&remaining_argc, &remaining_argv))
return 1;
set_server_version();
@@ -3864,6 +3992,23 @@ static int init_thread_environment()
sql_print_error("Can't create thread-keys");
return 1;
}
+#ifdef WITH_WSREP
+ mysql_mutex_init(key_LOCK_wsrep_ready,
+ &LOCK_wsrep_ready, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_COND_wsrep_ready, &COND_wsrep_ready, NULL);
+ mysql_mutex_init(key_LOCK_wsrep_sst,
+ &LOCK_wsrep_sst, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_COND_wsrep_sst, &COND_wsrep_sst, NULL);
+ mysql_mutex_init(key_LOCK_wsrep_sst_init,
+ &LOCK_wsrep_sst_init, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_COND_wsrep_sst_init, &COND_wsrep_sst_init, NULL);
+ mysql_mutex_init(key_LOCK_wsrep_rollback,
+ &LOCK_wsrep_rollback, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_COND_wsrep_rollback, &COND_wsrep_rollback, NULL);
+ mysql_mutex_init(key_LOCK_wsrep_replaying,
+ &LOCK_wsrep_replaying, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_COND_wsrep_replaying, &COND_wsrep_replaying, NULL);
+#endif
return 0;
}
@@ -4106,7 +4251,12 @@ static int init_server_components()
sql_print_warning("You need to use --log-bin to make "
"--log-slave-updates work.");
}
+
+#ifdef WITH_WSREP
+ if (!WSREP_ON && !opt_bin_log && binlog_format_used)
+#else
if (!opt_bin_log && binlog_format_used)
+#endif
sql_print_warning("You need to use --log-bin to make "
"--binlog-format work.");
@@ -4131,6 +4281,31 @@ will be ignored as the --log-bin option is not defined.");
}
#endif
+/* WSREP BEFORE */
+#ifdef WITH_WSREP
+ // add basedir/bin to PATH to resolve wsrep script names
+ char* const tmp_path((char*)alloca(strlen(mysql_home) + strlen("/bin") + 1));
+ if (tmp_path)
+ {
+ strcpy(tmp_path, mysql_home);
+ strcat(tmp_path, "/bin");
+ wsrep_prepend_PATH(tmp_path);
+ }
+ else
+ {
+ WSREP_ERROR("Could not append %s/bin to PATH", mysql_home);
+ }
+
+ if (opt_bootstrap)
+ {
+ wsrep_provider_init(WSREP_NONE);
+ if (wsrep_init()) unireg_abort(1);
+ }
+ else if (!wsrep_recovery && wsrep_init_first())
+ {
+ wsrep_init_startup(true);
+ }
+#endif /* WITH_WSREP */
if (opt_bin_log)
{
/* Reports an error and aborts, if the --log-bin's path
@@ -4336,11 +4511,30 @@ a file name for --log-bin-index option", opt_binlog_index_name);
internal_tmp_table_max_key_segments= myisam_max_key_segments();
#endif
+#ifdef WITH_WSREP
+ if (!opt_bin_log)
+ {
+ wsrep_emulate_bin_log= 1;
+ }
+#endif
+
tc_log= (total_ha_2pc > 1 ? (opt_bin_log ?
(TC_LOG *) &mysql_bin_log :
+#ifdef WITH_WSREP
+ (WSREP_ON ?
+ (TC_LOG *) &tc_log_dummy :
+ (TC_LOG *) &tc_log_mmap)) :
+#else
(TC_LOG *) &tc_log_mmap) :
- (TC_LOG *) &tc_log_dummy);
-
+#endif
+ (TC_LOG *) &tc_log_dummy);
+#ifdef WITH_WSREP
+ WSREP_DEBUG("Initial TC log open: %s",
+ (tc_log == &mysql_bin_log) ? "binlog" :
+ (tc_log == &tc_log_mmap) ? "mmap" :
+ (tc_log == &tc_log_dummy) ? "dummy" : "unknown"
+ );
+#endif
if (tc_log->open(opt_bin_log ? opt_bin_logname : opt_tc_log_file))
{
sql_print_error("Can't init tc log");
@@ -4397,8 +4591,6 @@ a file name for --log-bin-index option", opt_binlog_index_name);
init_global_client_stats();
DBUG_RETURN(0);
}
-
-
#ifndef EMBEDDED_LIBRARY
static void create_shutdown_thread()
@@ -4417,6 +4609,397 @@ static void create_shutdown_thread()
#endif /* EMBEDDED_LIBRARY */
+#ifdef WITH_WSREP
+typedef void (*wsrep_thd_processor_fun)(THD *);
+
+pthread_handler_t start_wsrep_THD(void *arg)
+{
+ THD *thd;
+ wsrep_thd_processor_fun processor= (wsrep_thd_processor_fun)arg;
+
+ DBUG_ENTER("start_wsrep_THD");
+ if (my_thread_init())
+ {
+ WSREP_ERROR("Could not initialize thread");
+ DBUG_RETURN(NULL);
+ }
+
+ if (!(thd= new THD(true)))
+ {
+ DBUG_RETURN(NULL);
+ }
+ mysql_mutex_lock(&LOCK_thread_count);
+ thd->thread_id=thread_id++;
+
+ thd->real_id=pthread_self(); // Keep purify happy
+ thread_count++;
+ thread_created++;
+ threads.append(thd);
+
+ my_net_init(&thd->net,(st_vio*) 0);
+
+ DBUG_PRINT("wsrep",(("creating thread %lld"), (long long)thd->thread_id));
+ thd->prior_thr_create_utime= thd->start_utime= microsecond_interval_timer();
+ (void) mysql_mutex_unlock(&LOCK_thread_count);
+
+ /* from bootstrap()... */
+ thd->bootstrap=1;
+ thd->max_client_packet_length= thd->net.max_packet;
+ thd->security_ctx->master_access= ~(ulong)0;
+
+ /* from handle_one_connection... */
+ pthread_detach_this_thread();
+
+ mysql_thread_set_psi_id(thd->thread_id);
+ thd->thr_create_utime= microsecond_interval_timer();
+ if (MYSQL_CALLBACK_ELSE(thread_scheduler, init_new_connection_thread, (), 0))
+ {
+ close_connection(thd, ER_OUT_OF_RESOURCES, 1);
+ statistic_increment(aborted_connects,&LOCK_status);
+ MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0));
+
+ DBUG_RETURN(NULL);
+ }
+
+// </5.1.17>
+ /*
+ handle_one_connection() is normally the only way a thread would
+ start and would always be on the very high end of the stack ,
+ therefore, the thread stack always starts at the address of the
+ first local variable of handle_one_connection, which is thd. We
+ need to know the start of the stack so that we could check for
+ stack overruns.
+ */
+ DBUG_PRINT("wsrep", ("handle_one_connection called by thread %lld\n",
+ (long long)thd->thread_id));
+ /* now that we've called my_thread_init(), it is safe to call DBUG_* */
+
+ thd->thread_stack= (char*) &thd;
+ if (thd->store_globals())
+ {
+ close_connection(thd, ER_OUT_OF_RESOURCES, 1);
+ statistic_increment(aborted_connects,&LOCK_status);
+ MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0));
+ delete thd;
+
+ DBUG_RETURN(NULL);
+ }
+
+ thd->system_thread= SYSTEM_THREAD_SLAVE_SQL;
+ thd->security_ctx->skip_grants();
+
+ /* handle_one_connection() again... */
+ //thd->version= refresh_version;
+ thd->proc_info= 0;
+ thd->command= COM_SLEEP;
+ thd->set_time();
+ thd->init_for_queries();
+
+ mysql_mutex_lock(&LOCK_connection_count);
+ ++connection_count;
+ mysql_mutex_unlock(&LOCK_connection_count);
+
+ mysql_mutex_lock(&LOCK_thread_count);
+ wsrep_running_threads++;
+ mysql_cond_signal(&COND_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
+
+ processor(thd);
+
+ close_connection(thd, 0, 1);
+
+ mysql_mutex_lock(&LOCK_thread_count);
+ wsrep_running_threads--;
+ mysql_cond_signal(&COND_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
+
+ // Note: We can't call THD destructor without crashing
+ // if plugins have not been initialized. However, in most of the
+ // cases this means that pre SE initialization SST failed and
+ // we are going to exit anyway.
+ if (plugins_are_initialized)
+ {
+ net_end(&thd->net);
+ MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 1));
+ }
+ else
+ {
+ // TODO: lightweight cleanup to get rid of:
+ // 'Error in my_thread_global_end(): 2 threads didn't exit'
+ // at server shutdown
+ }
+ DBUG_RETURN(NULL);
+}
+
+void wsrep_create_rollbacker()
+{
+ if (wsrep_provider && strcasecmp(wsrep_provider, "none"))
+ {
+ pthread_t hThread;
+ /* create rollbacker */
+ if (pthread_create( &hThread, &connection_attrib,
+ start_wsrep_THD, (void*)wsrep_rollback_process))
+ WSREP_WARN("Can't create thread to manage wsrep rollback");
+ }
+}
+
+void wsrep_create_appliers(long threads)
+{
+ if (!wsrep_connected)
+ {
+ /* see wsrep_replication_start() for the logic */
+ if (wsrep_cluster_address && strlen(wsrep_cluster_address) &&
+ wsrep_provider && strcasecmp(wsrep_provider, "none"))
+ {
+ WSREP_ERROR("Trying to launch slave threads before creating "
+ "connection at '%s'", wsrep_cluster_address);
+ assert(0);
+ }
+ return;
+ }
+
+ long wsrep_threads=0;
+ pthread_t hThread;
+ while (wsrep_threads++ < threads) {
+ if (pthread_create(
+ &hThread, &connection_attrib,
+ start_wsrep_THD, (void*)wsrep_replication_process))
+ WSREP_WARN("Can't create thread to manage wsrep replication");
+ }
+}
+/**/
+static bool abort_replicated(THD *thd)
+{
+ bool ret_code= false;
+ if (thd->wsrep_query_state== QUERY_COMMITTING)
+ {
+ if (wsrep_debug) WSREP_INFO("aborting replicated trx: %lu", thd->real_id);
+
+ (void)wsrep_abort_thd(thd, thd, TRUE);
+ ret_code= true;
+ }
+ return ret_code;
+}
+/**/
+static inline bool is_client_connection(THD *thd)
+{
+#if REMOVE
+// REMOVE THIS LATER (lp:777201). Below we had to add an explicit check for
+// wsrep_applier since wsrep_exec_mode didn't seem to always work
+if (thd->wsrep_applier && thd->wsrep_exec_mode != REPL_RECV)
+WSREP_WARN("applier has wsrep_exec_mode = %d", thd->wsrep_exec_mode);
+
+ if ( thd->slave_thread || /* declared as mysql slave */
+ thd->system_thread || /* declared as system thread */
+ !thd->vio_ok() || /* server internal thread */
+ thd->wsrep_exec_mode==REPL_RECV || /* applier or replaying thread */
+ thd->wsrep_applier || /* wsrep slave applier */
+ !thd->variables.wsrep_on) /* client, but fenced outside wsrep */
+ return false;
+
+ return true;
+#else
+ return (thd->wsrep_client_thread && thd->variables.wsrep_on);
+#endif /* REMOVE */
+}
+
+static bool have_client_connections()
+{
+ THD *tmp;
+
+ I_List_iterator<THD> it(threads);
+ while ((tmp=it++))
+ {
+ DBUG_PRINT("quit",("Informing thread %ld that it's time to die",
+ tmp->thread_id));
+ if (is_client_connection(tmp) && tmp->killed == KILL_CONNECTION)
+ {
+ (void)abort_replicated(tmp);
+ return true;
+ }
+ }
+ return false;
+}
+
+/*
+ returns the number of wsrep appliers running.
+ However, the caller (thd parameter) is not taken in account
+ */
+static int have_wsrep_appliers(THD *thd)
+{
+ int ret= 0;
+ THD *tmp;
+
+ I_List_iterator<THD> it(threads);
+ while ((tmp=it++))
+ {
+ ret+= (tmp != thd && tmp->wsrep_applier);
+ }
+ return ret;
+}
+
+static void wsrep_close_thread(THD *thd)
+{
+ thd->killed= KILL_CONNECTION;
+ MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd));
+ if (thd->mysys_var)
+ {
+ thd->mysys_var->abort=1;
+ mysql_mutex_lock(&thd->mysys_var->mutex);
+ if (thd->mysys_var->current_cond)
+ {
+ mysql_mutex_lock(thd->mysys_var->current_mutex);
+ mysql_cond_broadcast(thd->mysys_var->current_cond);
+ mysql_mutex_unlock(thd->mysys_var->current_mutex);
+ }
+ mysql_mutex_unlock(&thd->mysys_var->mutex);
+ }
+}
+
+void wsrep_close_client_connections(my_bool wait_to_end)
+{
+ /*
+ First signal all threads that it's time to die
+ */
+
+ THD *tmp;
+ mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
+
+ bool kill_cached_threads_saved= kill_cached_threads;
+ kill_cached_threads= true; // prevent future threads caching
+ mysql_cond_broadcast(&COND_thread_cache); // tell cached threads to die
+
+ I_List_iterator<THD> it(threads);
+ while ((tmp=it++))
+ {
+ DBUG_PRINT("quit",("Informing thread %ld that it's time to die",
+ tmp->thread_id));
+ /* We skip slave threads & scheduler on this first loop through. */
+ if (!is_client_connection(tmp))
+ continue;
+
+ /* replicated transactions must be skipped */
+ if (abort_replicated(tmp))
+ continue;
+
+ WSREP_DEBUG("closing connection %ld", tmp->thread_id);
+ wsrep_close_thread(tmp);
+ }
+ mysql_mutex_unlock(&LOCK_thread_count);
+
+ if (thread_count)
+ sleep(2); // Give threads time to die
+
+ mysql_mutex_lock(&LOCK_thread_count);
+ /*
+ Force remaining threads to die by closing the connection to the client
+ */
+
+ I_List_iterator<THD> it2(threads);
+ while ((tmp=it2++))
+ {
+#ifndef __bsdi__ // Bug in BSDI kernel
+ if (is_client_connection(tmp) && !abort_replicated(tmp))
+ {
+ WSREP_INFO("SST kill local trx: %ld",tmp->thread_id);
+ close_connection(tmp,0,0);
+ }
+#endif
+ }
+
+ DBUG_PRINT("quit",("Waiting for threads to die (count=%u)",thread_count));
+ if (wsrep_debug)
+ WSREP_INFO("waiting for client connections to close: %u", thread_count);
+
+ while (wait_to_end && have_client_connections())
+ {
+ mysql_cond_wait(&COND_thread_count, &LOCK_thread_count);
+ DBUG_PRINT("quit",("One thread died (count=%u)", thread_count));
+ }
+
+ kill_cached_threads= kill_cached_threads_saved;
+
+ mysql_mutex_unlock(&LOCK_thread_count);
+
+ /* All client connection threads have now been aborted */
+}
+
+void wsrep_close_applier(THD *thd)
+{
+ if (wsrep_debug)
+ WSREP_INFO("closing applier %ld", thd->thread_id);
+
+ wsrep_close_thread(thd);
+}
+
+static void wsrep_close_threads(THD *thd)
+{
+ THD *tmp;
+ mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
+
+ I_List_iterator<THD> it(threads);
+ while ((tmp=it++))
+ {
+ DBUG_PRINT("quit",("Informing thread %ld that it's time to die",
+ tmp->thread_id));
+ /* We skip slave threads & scheduler on this first loop through. */
+ if (tmp->wsrep_applier && tmp != thd)
+ {
+ if (wsrep_debug)
+ WSREP_INFO("closing wsrep thread %ld", tmp->thread_id);
+ wsrep_close_thread (tmp);
+
+ }
+ }
+
+ mysql_mutex_unlock(&LOCK_thread_count);
+}
+
+void wsrep_wait_appliers_close(THD *thd)
+{
+ /* Wait for wsrep appliers to gracefully exit */
+ mysql_mutex_lock(&LOCK_thread_count);
+ while (have_wsrep_appliers(thd) > 1)
+ // 1 is for rollbacker thread which needs to be killed explicitly.
+ // This gotta be fixed in a more elegant manner if we gonna have arbitrary
+ // number of non-applier wsrep threads.
+ {
+ mysql_cond_wait(&COND_thread_count,&LOCK_thread_count);
+ DBUG_PRINT("quit",("One applier died (count=%u)",thread_count));
+ }
+ mysql_mutex_unlock(&LOCK_thread_count);
+ /* Now kill remaining wsrep threads: rollbacker */
+ wsrep_close_threads (thd);
+ /* and wait for them to die */
+ mysql_mutex_lock(&LOCK_thread_count);
+ while (have_wsrep_appliers(thd) > 0)
+ {
+ mysql_cond_wait(&COND_thread_count,&LOCK_thread_count);
+ DBUG_PRINT("quit",("One thread died (count=%u)",thread_count));
+ }
+ mysql_mutex_unlock(&LOCK_thread_count);
+
+ /* All wsrep applier threads have now been aborted. However, if this thread
+ is also applier, we are still running...
+ */
+}
+
+void wsrep_kill_mysql(THD *thd)
+{
+ if (mysqld_server_started)
+ {
+ if (!shutdown_in_progress)
+ {
+ WSREP_INFO("starting shutdown");
+ kill_mysql();
+ }
+ }
+ else
+ {
+ unireg_abort(1);
+ }
+}
+#endif /* WITH_WSREP */
#if (defined(_WIN32) || defined(HAVE_SMEM)) && !defined(EMBEDDED_LIBRARY)
static void handle_connections_methods()
@@ -4873,6 +5456,33 @@ int mysqld_main(int argc, char **argv)
if (Events::init(opt_noacl || opt_bootstrap))
unireg_abort(1);
+/* WSREP AFTER */
+#ifdef WITH_WSREP
+ wsrep_SE_initialized();
+ if (opt_bootstrap)
+ {
+ /*! bootstrap wsrep init was taken care of above */
+ }
+ else if (wsrep_recovery)
+ {
+ select_thread_in_use= 0;
+ wsrep_recover();
+ unireg_abort(0);
+ }
+ else if (wsrep_init_first())
+ {
+ /*! in case of no SST wsrep waits in view handler callback */
+ wsrep_SE_init_grab();
+ wsrep_SE_init_done();
+ /*! in case of SST wsrep waits for wsrep->sst_received */
+ wsrep_sst_continue();
+ }
+ else
+ {
+ wsrep_init_startup (false);
+ }
+ wsrep_create_appliers(wsrep_slave_threads - 1);
+#endif /* WITH_WSREP */
if (opt_bootstrap)
{
select_thread_in_use= 0; // Allow 'kill' to work
@@ -4930,6 +5540,9 @@ int mysqld_main(int argc, char **argv)
#ifdef EXTRA_DEBUG2
sql_print_error("Before Lock_thread_count");
#endif
+#ifdef WITH_WSREP
+ WSREP_DEBUG("Before Lock_thread_count");
+#endif
mysql_mutex_lock(&LOCK_thread_count);
DBUG_PRINT("quit", ("Got thread_count mutex"));
select_thread_in_use=0; // For close_connections
@@ -5196,6 +5809,9 @@ static void bootstrap(MYSQL_FILE *file)
DBUG_ENTER("bootstrap");
THD *thd= new THD;
+#ifdef WITH_WSREP
+ thd->variables.wsrep_on= 0;
+#endif
thd->bootstrap=1;
my_net_init(&thd->net,(st_vio*) 0);
thd->max_client_packet_length= thd->net.max_packet;
@@ -5320,7 +5936,11 @@ void create_thread_to_handle_connection(THD *thd)
my_snprintf(error_message_buff, sizeof(error_message_buff),
ER_THD(thd, ER_CANT_CREATE_THREAD), error);
net_send_error(thd, ER_CANT_CREATE_THREAD, error_message_buff, NULL);
+#ifdef WITH_WSREP
+ close_connection(thd, ER_OUT_OF_RESOURCES ,0);
+#else
close_connection(thd, ER_OUT_OF_RESOURCES);
+#endif /* WITH_WSREP */
mysql_mutex_lock(&LOCK_thread_count);
delete thd;
mysql_mutex_unlock(&LOCK_thread_count);
@@ -5363,7 +5983,11 @@ static void create_new_thread(THD *thd)
mysql_mutex_unlock(&LOCK_connection_count);
DBUG_PRINT("error",("Too many connections"));
+ #ifdef WITH_WSREP
+ close_connection(thd, ER_CON_COUNT_ERROR, 1);
+#else
close_connection(thd, ER_CON_COUNT_ERROR);
+#endif /* WITH_WSREP */
statistic_increment(denied_connections, &LOCK_status);
delete thd;
DBUG_VOID_RETURN;
@@ -5744,7 +6368,11 @@ pthread_handler_t handle_connections_namedpipes(void *arg)
if (!(thd->net.vio= vio_new_win32pipe(hConnectedPipe)) ||
my_net_init(&thd->net, thd->net.vio))
{
+#ifdef WITH_WSREP
+ close_connection(thd, ER_OUT_OF_RESOURCES, 1);
+#else
close_connection(thd, ER_OUT_OF_RESOURCES);
+#endif
delete thd;
continue;
}
@@ -5939,7 +6567,11 @@ pthread_handler_t handle_connections_shared_memory(void *arg)
event_conn_closed)) ||
my_net_init(&thd->net, thd->net.vio))
{
+#ifdef WITH_WSREP
+ close_connection(thd, ER_OUT_OF_RESOURCES, 1);
+#else
close_connection(thd, ER_OUT_OF_RESOURCES);
+#endif
errmsg= 0;
goto errorconn;
}
@@ -7022,6 +7654,19 @@ SHOW_VAR status_vars[]= {
#ifdef ENABLED_PROFILING
{"Uptime_since_flush_status",(char*) &show_flushstatustime, SHOW_FUNC},
#endif
+#ifdef WITH_WSREP
+ {"wsrep_connected", (char*) &wsrep_connected, SHOW_BOOL},
+ {"wsrep_ready", (char*) &wsrep_ready, SHOW_BOOL},
+ {"wsrep_cluster_state_uuid", (char*) &wsrep_cluster_state_uuid,SHOW_CHAR_PTR},
+ {"wsrep_cluster_conf_id", (char*) &wsrep_cluster_conf_id, SHOW_LONGLONG},
+ {"wsrep_cluster_status", (char*) &wsrep_cluster_status, SHOW_CHAR_PTR},
+ {"wsrep_cluster_size", (char*) &wsrep_cluster_size, SHOW_LONG},
+ {"wsrep_local_index", (char*) &wsrep_local_index, SHOW_LONG},
+ {"wsrep_provider_name", (char*) &wsrep_provider_name, SHOW_CHAR_PTR},
+ {"wsrep_provider_version", (char*) &wsrep_provider_version, SHOW_CHAR_PTR},
+ {"wsrep_provider_vendor", (char*) &wsrep_provider_vendor, SHOW_CHAR_PTR},
+ {"wsrep", (char*) &wsrep_show_status, SHOW_FUNC},
+#endif
{NullS, NullS, SHOW_LONG}
};
@@ -7345,6 +7990,10 @@ static int mysql_init_variables(void)
tmpenv = DEFAULT_MYSQL_HOME;
(void) strmake(mysql_home, tmpenv, sizeof(mysql_home)-1);
#endif
+#ifdef WITH_WSREP
+ if (WSREP_ON && wsrep_init_vars())
+ return 1;
+#endif
return 0;
}
@@ -7620,6 +8269,23 @@ mysqld_get_one_option(int optid,
case OPT_LOWER_CASE_TABLE_NAMES:
lower_case_table_names_used= 1;
break;
+#ifdef WITH_WSREP
+ case OPT_WSREP_PROVIDER:
+ wsrep_provider_init (argument);
+ break;
+ case OPT_WSREP_PROVIDER_OPTIONS:
+ wsrep_provider_options_init (argument);
+ break;
+ case OPT_WSREP_CLUSTER_ADDRESS:
+ wsrep_cluster_address_init (argument);
+ break;
+ case OPT_WSREP_START_POSITION:
+ wsrep_start_position_init (argument);
+ break;
+ case OPT_WSREP_SST_AUTH:
+ wsrep_sst_auth_init (argument);
+ break;
+#endif
#if defined(ENABLED_DEBUG_SYNC)
case OPT_DEBUG_SYNC_TIMEOUT:
/*
diff --git a/sql/mysqld.h b/sql/mysqld.h
index 988584f779a..99093e1e83b 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -53,7 +53,11 @@ typedef Bitmap<((MAX_INDEXES+7)/8*8)> key_map; /* Used for finding keys */
some places */
/* Function prototypes */
void kill_mysql(void);
+#ifdef WITH_WSREP
+void close_connection(THD *thd, uint sql_errno= 0, bool lock=1);
+#else
void close_connection(THD *thd, uint sql_errno= 0);
+#endif
void handle_connection_in_main_thread(THD *thd);
void create_thread_to_handle_connection(THD *thd);
void unlink_thd(THD *thd);
@@ -219,6 +223,10 @@ extern pthread_key(MEM_ROOT**,THR_MALLOC);
extern PSI_mutex_key key_PAGE_lock, key_LOCK_sync, key_LOCK_active,
key_LOCK_pool;
#endif /* HAVE_MMAP */
+#ifdef WITH_WSREP
+extern PSI_mutex_key key_LOCK_wsrep_thd;
+extern PSI_cond_key key_COND_wsrep_thd;
+#endif /* HAVE_MMAP */
#ifdef HAVE_OPENSSL
extern PSI_mutex_key key_LOCK_des_key_file;
@@ -406,6 +414,14 @@ enum options_mysqld
OPT_WANT_CORE,
OPT_ENGINE_CONDITION_PUSHDOWN,
OPT_LOG_ERROR,
+#ifdef WITH_WSREP
+ OPT_WSREP_PROVIDER,
+ OPT_WSREP_PROVIDER_OPTIONS,
+ OPT_WSREP_CLUSTER_ADDRESS,
+ OPT_WSREP_START_POSITION,
+ OPT_WSREP_SST_AUTH,
+ OPT_WSREP_RECOVER,
+#endif
OPT_MAX_LONG_DATA_SIZE
};
#endif
@@ -554,4 +570,5 @@ extern uint internal_tmp_table_max_key_segments;
extern uint volatile global_disable_checkpoint;
extern my_bool opt_help;
+
#endif /* MYSQLD_INCLUDED */
diff --git a/sql/protocol.cc b/sql/protocol.cc
index 63b945f7078..4a7ea88c402 100644
--- a/sql/protocol.cc
+++ b/sql/protocol.cc
@@ -485,6 +485,14 @@ static uchar *net_store_length_fast(uchar *packet, uint length)
void Protocol::end_statement()
{
+#ifdef WITH_WSREP
+ /*sanity check, can be removed before 1.0 release */
+ if (WSREP(thd) && thd->wsrep_conflict_state== REPLAYING)
+ {
+ WSREP_ERROR("attempting net_end_statement while replaying");
+ return;
+ }
+#endif
DBUG_ENTER("Protocol::end_statement");
DBUG_ASSERT(! thd->stmt_da->is_sent);
bool error= FALSE;
diff --git a/sql/set_var.h b/sql/set_var.h
index c074f3f4399..85faaaadd73 100644
--- a/sql/set_var.h
+++ b/sql/set_var.h
@@ -235,6 +235,9 @@ public:
int check(THD *thd);
int update(THD *thd);
int light_check(THD *thd);
+#ifdef WITH_WSREP
+ int wsrep_store_variable(THD *thd);
+#endif
};
@@ -315,6 +318,9 @@ extern sys_var *Sys_autocommit_ptr;
CHARSET_INFO *get_old_charset_by_name(const char *old_name);
+#ifdef WITH_WSREP
+int sql_set_wsrep_variables(THD *thd, List<set_var_base> *var_list);
+#endif
int sys_var_init();
int sys_var_add_options(DYNAMIC_ARRAY *long_options, int parse_flags);
void sys_var_end(void);
diff --git a/sql/slave.cc b/sql/slave.cc
index 56f9c14703c..833820203b0 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -53,6 +53,9 @@
// Create_file_log_event,
// Format_description_log_event
+#ifdef WITH_WSREP
+#include "wsrep_mysqld.h"
+#endif
#ifdef HAVE_REPLICATION
#include "rpl_tblmap.h"
@@ -3487,6 +3490,11 @@ pthread_handler_t handle_slave_sql(void *arg)
#endif
DBUG_ASSERT(rli->sql_thd == thd);
+#ifdef WITH_WSREP
+ thd->wsrep_exec_mode= LOCAL_STATE;
+ /* synchronize with wsrep replication */
+ wsrep_ready_wait ();
+#endif
DBUG_PRINT("master_info",("log_file_name: %s position: %s",
rli->group_master_log_name,
llstr(rli->group_master_log_pos,llbuff)));
diff --git a/sql/sp.cc b/sql/sp.cc
index 29195234a5a..700a8c54fbf 100644
--- a/sql/sp.cc
+++ b/sql/sp.cc
@@ -2277,3 +2277,37 @@ sp_load_for_information_schema(THD *thd, TABLE *proc_table, String *db,
thd->lex= old_lex;
return sp;
}
+#ifdef WITH_WSREP
+int wsrep_create_sp(THD *thd, uchar** buf, uint* buf_len)
+{
+ String log_query;
+ sp_head *sp = thd->lex->sphead;
+ ulong saved_mode= thd->variables.sql_mode;
+ String retstr(64);
+ retstr.set_charset(system_charset_info);
+
+ log_query.set_charset(system_charset_info);
+
+ if (sp->m_type == TYPE_ENUM_FUNCTION)
+ {
+ sp_returns_type(thd, retstr, sp);
+ }
+
+ if (!create_string(thd, &log_query,
+ sp->m_type,
+ (sp->m_explicit_name ? sp->m_db.str : NULL),
+ (sp->m_explicit_name ? sp->m_db.length : 0),
+ sp->m_name.str, sp->m_name.length,
+ sp->m_params.str, sp->m_params.length,
+ retstr.c_ptr(), retstr.length(),
+ sp->m_body.str, sp->m_body.length,
+ sp->m_chistics, &(thd->lex->definer->user),
+ &(thd->lex->definer->host),
+ saved_mode))
+ {
+ WSREP_WARN("SP create string failed: %s", thd->query());
+ return 1;
+ }
+ return wsrep_to_buf_helper(thd, log_query.ptr(), log_query.length(), buf, buf_len);
+}
+#endif /* WITH_WSREP */
diff --git a/sql/sql_alter.cc b/sql/sql_alter.cc
index c6c02773286..c4468ee8793 100644
--- a/sql/sql_alter.cc
+++ b/sql/sql_alter.cc
@@ -17,7 +17,9 @@
#include "sql_table.h" // mysql_alter_table,
// mysql_exchange_partition
#include "sql_alter.h"
-
+#ifdef WITH_WSREP
+#include "wsrep_mysqld.h"
+#endif /* WITH_WSREP */
bool Alter_table_statement::execute(THD *thd)
{
LEX *lex= thd->lex;
@@ -97,6 +99,17 @@ bool Alter_table_statement::execute(THD *thd)
thd->enable_slow_log= opt_log_slow_admin_statements;
+#ifdef WITH_WSREP
+TABLE *find_temporary_table(THD *thd, const TABLE_LIST *tl);
+
+ if ((!thd->is_current_stmt_binlog_format_row() ||
+ !find_temporary_table(thd, first_table)) &&
+ wsrep_to_isolation_begin(thd, first_table->db, first_table->table_name))
+ {
+ WSREP_WARN("ALTER TABLE isolation failure");
+ DBUG_RETURN(TRUE);
+ }
+#endif /* WITH_WSREP */
result= mysql_alter_table(thd, select_lex->db, lex->name.str,
&create_info,
first_table,
@@ -105,5 +118,7 @@ bool Alter_table_statement::execute(THD *thd)
select_lex->order_list.first,
lex->ignore, lex->online);
+#ifdef WITH_WSREP
+#endif /* WITH_WSREP */
DBUG_RETURN(result);
}
diff --git a/sql/sql_base.cc b/sql/sql_base.cc
index 9298d3ccb72..d0404693405 100644
--- a/sql/sql_base.cc
+++ b/sql/sql_base.cc
@@ -60,6 +60,9 @@
#include <io.h>
#endif
+#ifdef WITH_WSREP
+#include "wsrep_mysqld.h"
+#endif // WITH_WSREP
bool
No_such_table_error_handler::handle_condition(THD *,
@@ -4218,7 +4221,7 @@ thr_lock_type read_lock_type_for_table(THD *thd,
*/
bool log_on= mysql_bin_log.is_open() && thd->variables.sql_log_bin;
ulong binlog_format= thd->variables.binlog_format;
- if ((log_on == FALSE) || (binlog_format == BINLOG_FORMAT_ROW) ||
+ if ((log_on == FALSE) || (WSREP_FORMAT(binlog_format) == BINLOG_FORMAT_ROW) ||
(table_list->table->s->table_category == TABLE_CATEGORY_LOG) ||
(table_list->table->s->table_category == TABLE_CATEGORY_PERFORMANCE) ||
!(is_update_query(prelocking_ctx->sql_command) ||
@@ -5075,7 +5078,14 @@ restart:
}
err:
+#ifdef WITH_WSREP
+ if (WSREP(thd))
+ thd_proc_info(thd, "exit open_tables()");
+ else
+ thd_proc_info(thd, 0);
+#else /* WITH_WSREP */
thd_proc_info(thd, 0);
+#endif /* WITH_WSREP */
free_root(&new_frm_mem, MYF(0)); // Free pre-alloced block
if (error && *table_to_open)
@@ -5518,7 +5528,14 @@ end:
trans_rollback_stmt(thd);
close_thread_tables(thd);
}
+#ifdef WITH_WSREP
+ if (WSREP(thd))
+ thd_proc_info(thd, "End opening table");
+ else
+ thd_proc_info(thd, 0);
+#else /* WITH_WSREP */
thd_proc_info(thd, 0);
+#endif /* WITH_WSREP */
DBUG_RETURN(table);
}
@@ -5738,7 +5755,7 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count,
We can solve these problems in mixed mode by switching to binlogging
if at least one updated table is used by sub-statement
*/
- if (thd->variables.binlog_format != BINLOG_FORMAT_ROW && tables &&
+ if (WSREP_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_ROW && tables &&
has_write_table_with_auto_increment(thd->lex->first_not_own_table()))
thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_AUTOINC_COLUMNS);
}
@@ -9169,7 +9186,19 @@ bool mysql_notify_thread_having_shared_lock(THD *thd, THD *in_use,
(e.g. see partitioning code).
*/
if (!thd_table->needs_reopen())
+#ifdef WITH_WSREP
+ {
+ signalled|= mysql_lock_abort_for_thread(thd, thd_table);
+ if (thd && WSREP(thd) && wsrep_thd_is_brute_force((void *)thd))
+ {
+ WSREP_DEBUG("remove_table_from_cache: %llu",
+ (unsigned long long) thd->real_id);
+ wsrep_abort_thd((void *)thd, (void *)in_use, FALSE);
+ }
+ }
+#else
signalled|= mysql_lock_abort_for_thread(thd, thd_table);
+#endif
}
mysql_mutex_unlock(&in_use->LOCK_thd_data);
}
diff --git a/sql/sql_builtin.cc.in b/sql/sql_builtin.cc.in
index 63850650ac9..2de475b0a76 100644
--- a/sql/sql_builtin.cc.in
+++ b/sql/sql_builtin.cc.in
@@ -25,7 +25,11 @@ extern
#endif
builtin_maria_plugin
@mysql_mandatory_plugins@ @mysql_optional_plugins@
- builtin_maria_binlog_plugin, builtin_maria_mysql_password_plugin;
+ builtin_maria_binlog_plugin,
+#ifdef WITH_WSREP
+ builtin_wsrep_plugin@mysql_plugin_defs@,
+#endif /* WITH_WSREP */
+ builtin_maria_mysql_password_plugin;
struct st_maria_plugin *mysql_optional_plugins[]=
{
@@ -35,5 +39,8 @@ struct st_maria_plugin *mysql_optional_plugins[]=
struct st_maria_plugin *mysql_mandatory_plugins[]=
{
builtin_maria_binlog_plugin, builtin_maria_mysql_password_plugin,
+#ifdef WITH_WSREP
+ builtin_wsrep_plugin@mysql_plugin_defs@,
+#endif /* WITH_WSREP */
@mysql_mandatory_plugins@ 0
};
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index d7d0c8d3f68..5b63201a910 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -62,6 +62,9 @@
#include "debug_sync.h"
#include "sql_parse.h" // is_update_query
#include "sql_callback.h"
+#ifdef WITH_WSREP
+#include "wsrep_mysqld.h"
+#endif
#include "sql_connect.h"
/*
@@ -695,6 +698,137 @@ char *thd_security_context(THD *thd, char *buffer, unsigned int length,
return buffer;
}
+#ifdef WITH_WSREP
+extern "C" int wsrep_on(void *thd)
+{
+ return (int)(WSREP(((THD*)thd)));
+}
+extern "C" bool wsrep_thd_is_wsrep_on(THD *thd)
+{
+ return thd->variables.wsrep_on;
+}
+
+extern "C" bool wsrep_consistency_check(void *thd)
+{
+ return ((THD*)thd)->wsrep_consistency_check;
+}
+
+extern "C" void wsrep_thd_set_exec_mode(THD *thd, enum wsrep_exec_mode mode)
+{
+ thd->wsrep_exec_mode= mode;
+}
+extern "C" void wsrep_thd_set_query_state(
+ THD *thd, enum wsrep_query_state state)
+{
+ thd->wsrep_query_state= state;
+}
+extern "C" void wsrep_thd_set_conflict_state(
+ THD *thd, enum wsrep_conflict_state state)
+{
+ thd->wsrep_conflict_state= state;
+}
+
+
+extern "C" enum wsrep_exec_mode wsrep_thd_exec_mode(THD *thd)
+{
+ return thd->wsrep_exec_mode;
+}
+extern "C" enum wsrep_query_state wsrep_thd_query_state(THD *thd)
+{
+ return thd->wsrep_query_state;
+}
+extern "C" enum wsrep_conflict_state wsrep_thd_conflict_state(THD *thd)
+{
+ return thd->wsrep_conflict_state;
+}
+
+extern "C" wsrep_trx_handle_t* wsrep_thd_trx_handle(THD *thd)
+{
+ return &thd->wsrep_trx_handle;
+}
+
+extern "C"void wsrep_thd_LOCK(THD *thd)
+{
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+}
+extern "C"void wsrep_thd_UNLOCK(THD *thd)
+{
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+}
+extern "C" time_t wsrep_thd_query_start(THD *thd)
+{
+ return thd->query_start();
+}
+extern "C" uint32 wsrep_thd_wsrep_rand(THD *thd)
+{
+ return thd->wsrep_rand;
+}
+extern "C" my_thread_id wsrep_thd_thread_id(THD *thd)
+{
+ return thd->thread_id;
+}
+extern "C" wsrep_seqno_t wsrep_thd_trx_seqno(THD *thd)
+{
+ return thd->wsrep_trx_seqno;
+}
+extern "C" query_id_t wsrep_thd_query_id(THD *thd)
+{
+ return thd->query_id;
+}
+extern "C" char *wsrep_thd_query(THD *thd)
+{
+ return thd->query();
+}
+extern "C" query_id_t wsrep_thd_wsrep_last_query_id(THD *thd)
+{
+ return thd->wsrep_last_query_id;
+}
+extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id)
+{
+ thd->wsrep_last_query_id= id;
+}
+extern "C" void wsrep_thd_awake(THD *thd, my_bool signal)
+{
+ if (signal)
+ {
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ thd->awake(KILL_QUERY);
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ }
+ else
+ {
+ mysql_mutex_lock(&LOCK_wsrep_replaying);
+ mysql_cond_broadcast(&COND_wsrep_replaying);
+ mysql_mutex_unlock(&LOCK_wsrep_replaying);
+ }
+}
+
+extern "C" int
+wsrep_trx_order_before(void *thd1, void *thd2)
+{
+ if (((THD*)thd1)->wsrep_trx_seqno < ((THD*)thd2)->wsrep_trx_seqno) {
+ WSREP_DEBUG("BF conflict, order: %lld %lld\n",
+ (long long)((THD*)thd1)->wsrep_trx_seqno,
+ (long long)((THD*)thd2)->wsrep_trx_seqno);
+ return 1;
+ }
+ WSREP_DEBUG("waiting for BF, trx order: %lld %lld\n",
+ (long long)((THD*)thd1)->wsrep_trx_seqno,
+ (long long)((THD*)thd2)->wsrep_trx_seqno);
+ return 0;
+}
+extern "C" int
+wsrep_trx_is_aborting(void *thd_ptr)
+{
+ if (thd_ptr) {
+ if ((((THD *)thd_ptr)->wsrep_conflict_state == MUST_ABORT) ||
+ (((THD *)thd_ptr)->wsrep_conflict_state == ABORTING)) {
+ return 1;
+ }
+ }
+ return 0;
+}
+#endif
/**
Implementation of Drop_table_error_handler::handle_condition().
@@ -723,7 +857,11 @@ bool Drop_table_error_handler::handle_condition(THD *thd,
}
+#ifdef WITH_WSREP
+THD::THD(bool is_applier)
+#else
THD::THD()
+#endif
:Statement(&main_lex, &main_mem_root, STMT_CONVENTIONAL_EXECUTION,
/* statement id */ 0),
rli_fake(0),
@@ -750,6 +888,10 @@ THD::THD()
bootstrap(0),
derived_tables_processing(FALSE),
spcont(NULL),
+#ifdef WITH_WSREP
+ wsrep_applier(is_applier),
+ wsrep_client_thread(0),
+#endif
m_parser_state(NULL),
#if defined(ENABLED_DEBUG_SYNC)
debug_sync_control(0),
@@ -849,6 +991,20 @@ THD::THD()
command=COM_CONNECT;
*scramble= '\0';
+#ifdef WITH_WSREP
+ mysql_mutex_init(key_LOCK_wsrep_thd, &LOCK_wsrep_thd, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_COND_wsrep_thd, &COND_wsrep_thd, NULL);
+ wsrep_trx_handle.trx_id= -1;
+ wsrep_trx_handle.opaque= NULL;
+ //wsrep_retry_autocommit= ::wsrep_retry_autocommit;
+ wsrep_retry_counter= 0;
+ wsrep_PA_safe = true;
+ wsrep_seqno_changed= false;
+ wsrep_retry_query = NULL;
+ wsrep_retry_query_len = 0;
+ wsrep_retry_command = COM_CONNECT;
+ wsrep_consistency_check = false;
+#endif
/* Call to init() below requires fully initialized Open_tables_state. */
reset_open_tables_state(this);
@@ -881,6 +1037,13 @@ THD::THD()
my_rnd_init(&rand, tmp + (ulong) &rand, tmp + (ulong) ::global_query_id);
substitute_null_with_insert_id = FALSE;
thr_lock_info_init(&lock_info); /* safety: will be reset after start */
+#ifdef WITH_WSREP
+ lock_info.mysql_thd= (void *)this;
+ lock_info.in_lock_tables= false;
+#ifdef WSREP_PROC_INFO
+ wsrep_info[sizeof(wsrep_info) - 1] = '\0'; /* make sure it is 0-terminated */
+#endif /* WSREP_PROC_INFO */
+#endif /* WITH_WSREP */
m_internal_handler= NULL;
m_binlog_invoker= FALSE;
@@ -1182,7 +1345,19 @@ void THD::init(void)
reset_current_stmt_binlog_format_row();
bzero((char *) &status_var, sizeof(status_var));
bzero((char *) &org_status_var, sizeof(org_status_var));
-
+#ifdef WITH_WSREP
+ wsrep_exec_mode= wsrep_applier ? REPL_RECV : LOCAL_STATE;
+ wsrep_conflict_state= NO_CONFLICT;
+ wsrep_query_state= QUERY_IDLE;
+ wsrep_last_query_id= 0;
+ wsrep_trx_seqno= 0;
+ wsrep_converted_lock_session= false;
+ wsrep_retry_counter= 0;
+ wsrep_rli= NULL;
+ wsrep_PA_safe= true;
+ wsrep_seqno_changed= false;
+ wsrep_consistency_check = false;
+#endif
if (variables.sql_log_bin)
variables.option_bits|= OPTION_BIN_LOG;
else
@@ -1376,6 +1551,12 @@ THD::~THD()
mysql_mutex_unlock(&LOCK_thd_data);
add_to_status(&global_status_var, &status_var);
+#ifdef WITH_WSREP
+ mysql_mutex_lock(&LOCK_wsrep_thd);
+ mysql_mutex_unlock(&LOCK_wsrep_thd);
+ mysql_mutex_destroy(&LOCK_wsrep_thd);
+ if (wsrep_rli) delete wsrep_rli;
+#endif
/* Close connection */
#ifndef EMBEDDED_LIBRARY
if (net.vio)
@@ -1790,6 +1971,13 @@ void THD::cleanup_after_query()
/* reset table map for multi-table update */
table_map_for_update= 0;
m_binlog_invoker= FALSE;
+#ifdef WITH_WSREP
+ if (TOTAL_ORDER == wsrep_exec_mode)
+ {
+ wsrep_exec_mode = LOCAL_STATE;
+ }
+ //wsrep_trx_seqno = 0;
+#endif /* WITH_WSREP */
DBUG_VOID_RETURN;
}
@@ -2205,6 +2393,13 @@ bool sql_exchange::escaped_given(void)
bool select_send::send_result_set_metadata(List<Item> &list, uint flags)
{
bool res;
+#ifdef WITH_WSREP
+ if (WSREP(thd) && thd->wsrep_retry_query)
+ {
+ WSREP_DEBUG("skipping select metadata");
+ return FALSE;
+ }
+#endif /* WITH_WSREP */
if (!(res= thd->protocol->send_result_set_metadata(&list, flags)))
is_result_set_started= 1;
return res;
@@ -3911,8 +4106,13 @@ extern "C" int thd_non_transactional_update(const MYSQL_THD thd)
extern "C" int thd_binlog_format(const MYSQL_THD thd)
{
+#ifdef WITH_WSREP
+ if (((WSREP(thd) && wsrep_emulate_bin_log) || mysql_bin_log.is_open()) &&
+ (thd->variables.option_bits & OPTION_BIN_LOG))
+#else
if (mysql_bin_log.is_open() && (thd->variables.option_bits & OPTION_BIN_LOG))
- return (int) thd->variables.binlog_format;
+#endif
+ return (int) WSREP_FORMAT(thd->variables.binlog_format);
else
return BINLOG_FORMAT_UNSPEC;
}
@@ -4467,7 +4667,7 @@ int THD::decide_logging_format(TABLE_LIST *tables)
binlog by filtering rules.
*/
if (mysql_bin_log.is_open() && (variables.option_bits & OPTION_BIN_LOG) &&
- !(variables.binlog_format == BINLOG_FORMAT_STMT &&
+ !(WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_STMT &&
!binlog_filter->db_ok(db)))
{
/*
@@ -4631,7 +4831,7 @@ int THD::decide_logging_format(TABLE_LIST *tables)
*/
my_error((error= ER_BINLOG_ROW_INJECTION_AND_STMT_ENGINE), MYF(0));
}
- else if (variables.binlog_format == BINLOG_FORMAT_ROW &&
+ else if (WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_ROW &&
sqlcom_can_generate_row_events(this))
{
/*
@@ -4660,7 +4860,7 @@ int THD::decide_logging_format(TABLE_LIST *tables)
else
{
/* binlog_format = STATEMENT */
- if (variables.binlog_format == BINLOG_FORMAT_STMT)
+ if (WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_STMT)
{
if (lex->is_stmt_row_injection())
{
@@ -4677,7 +4877,14 @@ int THD::decide_logging_format(TABLE_LIST *tables)
5. Error: Cannot modify table that uses a storage engine
limited to row-logging when binlog_format = STATEMENT
*/
+#ifdef WITH_WSREP
+ if (!WSREP(this) || wsrep_exec_mode == LOCAL_STATE)
+ {
+#endif /* WITH_WSREP */
my_error((error= ER_BINLOG_STMT_MODE_AND_ROW_ENGINE), MYF(0), "");
+#ifdef WITH_WSREP
+ }
+#endif /* WITH_WSREP */
}
else if (is_write && (unsafe_flags= lex->get_stmt_unsafe_flags()) != 0)
{
@@ -4725,7 +4932,7 @@ int THD::decide_logging_format(TABLE_LIST *tables)
"and binlog_filter->db_ok(db) = %d",
mysql_bin_log.is_open(),
(variables.option_bits & OPTION_BIN_LOG),
- variables.binlog_format,
+ WSREP_FORMAT(variables.binlog_format),
binlog_filter->db_ok(db)));
#endif
@@ -4979,7 +5186,13 @@ int THD::binlog_write_row(TABLE* table, bool is_trans,
MY_BITMAP const* cols, size_t colcnt,
uchar const *record)
{
+#ifdef WITH_WSREP
+ DBUG_ASSERT(is_current_stmt_binlog_format_row() &&
+ ((WSREP(this) && wsrep_emulate_bin_log) ||
+ mysql_bin_log.is_open()));
+#else
DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open());
+#endif
/*
Pack records into format for transfer. We are allocating more
@@ -5009,7 +5222,13 @@ int THD::binlog_update_row(TABLE* table, bool is_trans,
const uchar *before_record,
const uchar *after_record)
{
+#ifdef WITH_WSREP
+ DBUG_ASSERT(is_current_stmt_binlog_format_row() &&
+ ((WSREP(this) && wsrep_emulate_bin_log)
+ || mysql_bin_log.is_open()));
+#else
DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open());
+#endif
size_t const before_maxlen = max_row_length(table, before_record);
size_t const after_maxlen = max_row_length(table, after_record);
@@ -5054,7 +5273,13 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans,
MY_BITMAP const* cols, size_t colcnt,
uchar const *record)
{
+#ifdef WITH_WSREP
+ DBUG_ASSERT(is_current_stmt_binlog_format_row() &&
+ ((WSREP(this) && wsrep_emulate_bin_log)
+ || mysql_bin_log.is_open()));
+#else
DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open());
+#endif
/*
Pack records into format for transfer. We are allocating more
@@ -5085,7 +5310,11 @@ int THD::binlog_remove_pending_rows_event(bool clear_maps,
{
DBUG_ENTER("THD::binlog_remove_pending_rows_event");
+#ifdef WITH_WSREP
+ if (!(WSREP_EMULATE_BINLOG(this) || mysql_bin_log.is_open()))
+#else
if (!mysql_bin_log.is_open())
+#endif
DBUG_RETURN(0);
mysql_bin_log.remove_pending_rows_event(this, is_transactional);
@@ -5104,7 +5333,11 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional)
mode: it might be the case that we left row-based mode before
flushing anything (e.g., if we have explicitly locked tables).
*/
+#ifdef WITH_WSREP
+ if (!(WSREP_EMULATE_BINLOG(this) || mysql_bin_log.is_open()))
+#else
if (!mysql_bin_log.is_open())
+#endif
DBUG_RETURN(0);
/*
@@ -5224,8 +5457,12 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg,
DBUG_ENTER("THD::binlog_query");
DBUG_PRINT("enter", ("qtype: %s query: '%-.*s'",
show_query_type(qtype), (int) query_len, query_arg));
+#ifdef WITH_WSREP
+ DBUG_ASSERT(query_arg && (WSREP_EMULATE_BINLOG(this)
+ || mysql_bin_log.is_open()));
+#else
DBUG_ASSERT(query_arg && mysql_bin_log.is_open());
-
+#endif
/*
If we are not in prelocked mode, mysql_unlock_tables() will be
called after this binlog_query(), so we have to flush the pending
diff --git a/sql/sql_class.h b/sql/sql_class.h
index c6c46975076..96a67fccaef 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -20,6 +20,32 @@
#define SQL_CLASS_INCLUDED
/* Classes in mysql */
+#ifdef WITH_WSREP
+#include "../wsrep/wsrep_api.h"
+//#include "wsrep_mysqld.h"
+ enum wsrep_exec_mode {
+ LOCAL_STATE,
+ REPL_RECV,
+ TOTAL_ORDER,
+ LOCAL_COMMIT,
+ };
+ enum wsrep_query_state {
+ QUERY_IDLE,
+ QUERY_EXEC,
+ QUERY_COMMITTING,
+ QUERY_EXITING,
+ QUERY_ROLLINGBACK,
+ };
+ enum wsrep_conflict_state {
+ NO_CONFLICT,
+ MUST_ABORT,
+ ABORTING,
+ ABORTED,
+ MUST_REPLAY,
+ REPLAYING,
+ RETRY_AUTOCOMMIT,
+ };
+#endif
#ifdef USE_PRAGMA_INTERFACE
#pragma interface /* gcc class implementation */
@@ -45,6 +71,15 @@
THR_LOCK_INFO */
+#ifdef WITH_WSREP
+#include "wsrep_mysqld.h"
+struct wsrep_thd_shadow {
+ ulonglong options;
+ enum wsrep_exec_mode wsrep_exec_mode;
+ Vio *vio;
+ ulong tx_isolation;
+};
+#endif
class Reprepare_observer;
class Relay_log_info;
@@ -572,6 +607,11 @@ typedef struct system_variables
ulong wt_timeout_short, wt_deadlock_search_depth_short;
ulong wt_timeout_long, wt_deadlock_search_depth_long;
+#ifdef WITH_WSREP
+ my_bool wsrep_on;
+ my_bool wsrep_causal_reads;
+ ulong wsrep_retry_autocommit;
+#endif
double long_query_time_double;
} SV;
@@ -965,6 +1005,9 @@ struct st_savepoint {
/** State of metadata locks before this savepoint was set. */
MDL_savepoint mdl_savepoint;
};
+#ifdef WITH_WSREP
+void wsrep_cleanup_transaction(THD *thd); // THD.transactions.cleanup calls it
+#endif
enum xa_states {XA_NOTR=0, XA_ACTIVE, XA_IDLE, XA_PREPARED, XA_ROLLBACK_ONLY};
extern const char *xa_state_names[];
@@ -1781,7 +1824,7 @@ public:
int is_current_stmt_binlog_format_row() const {
DBUG_ASSERT(current_stmt_binlog_format == BINLOG_FORMAT_STMT ||
current_stmt_binlog_format == BINLOG_FORMAT_ROW);
- return current_stmt_binlog_format == BINLOG_FORMAT_ROW;
+ return (WSREP_FORMAT((ulong)current_stmt_binlog_format) == BINLOG_FORMAT_ROW);
}
private:
@@ -1839,7 +1882,11 @@ public:
*/
CHANGED_TABLE_LIST* changed_tables;
MEM_ROOT mem_root; // Transaction-life memory allocation pool
+#ifdef WITH_WSREP
+ void cleanup(THD *thd)
+#else
void cleanup()
+#endif
{
changed_tables= 0;
savepoints= 0;
@@ -1852,6 +1899,11 @@ public:
if (!xid_state.rm_error)
xid_state.xid.null();
free_root(&mem_root,MYF(MY_KEEP_PREALLOC));
+#ifdef WITH_WSREP
+ // Todo: convert into a plugin method
+ // wsrep's post-commit. LOCAL_COMMIT designates wsrep's commit was ok
+ if (WSREP(thd)) wsrep_cleanup_transaction(thd);
+#endif /* WITH_WSREP */
}
my_bool is_active()
{
@@ -2314,6 +2366,31 @@ public:
query_id_t first_query_id;
} binlog_evt_union;
+#ifdef WITH_WSREP
+ const bool wsrep_applier; /* dedicated slave applier thread */
+ bool wsrep_client_thread; /* to identify client threads*/
+ enum wsrep_exec_mode wsrep_exec_mode;
+ query_id_t wsrep_last_query_id;
+ enum wsrep_query_state wsrep_query_state;
+ enum wsrep_conflict_state wsrep_conflict_state;
+ mysql_mutex_t LOCK_wsrep_thd;
+ mysql_cond_t COND_wsrep_thd;
+ wsrep_seqno_t wsrep_trx_seqno;
+ uint32 wsrep_rand;
+ Relay_log_info* wsrep_rli;
+ bool wsrep_converted_lock_session;
+ wsrep_trx_handle_t wsrep_trx_handle;
+ bool wsrep_seqno_changed;
+#ifdef WSREP_PROC_INFO
+ char wsrep_info[128]; /* string for dynamic proc info */
+#endif /* WSREP_PROC_INFO */
+ ulong wsrep_retry_counter; // of autocommit
+ bool wsrep_PA_safe;
+ char* wsrep_retry_query;
+ size_t wsrep_retry_query_len;
+ enum enum_server_command wsrep_retry_command;
+ bool wsrep_consistency_check;
+#endif /* WITH_WSREP */
/**
Internal parser state.
Note that since the parser is not re-entrant, we keep only one parser
@@ -2345,7 +2422,11 @@ public:
/* Debug Sync facility. See debug_sync.cc. */
struct st_debug_sync_control *debug_sync_control;
#endif /* defined(ENABLED_DEBUG_SYNC) */
+#ifdef WITH_WSREP
+ THD(bool is_applier = false);
+#else
THD();
+#endif
~THD();
void init(void);
@@ -2741,7 +2822,7 @@ public:
tests fail and so force them to propagate the
lex->binlog_row_based_if_mixed upwards to the caller.
*/
- if ((variables.binlog_format == BINLOG_FORMAT_MIXED) &&
+ if ((WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_MIXED) &&
(in_sub_stmt == 0))
set_current_stmt_binlog_format_row();
@@ -2783,7 +2864,7 @@ public:
show_system_thread(system_thread)));
if (in_sub_stmt == 0)
{
- if (variables.binlog_format == BINLOG_FORMAT_ROW)
+ if (WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_ROW)
set_current_stmt_binlog_format_row();
else if (temporary_tables == NULL)
clear_current_stmt_binlog_format_row();
diff --git a/sql/sql_connect.cc b/sql/sql_connect.cc
index 7157b1b109f..e96068484eb 100644
--- a/sql/sql_connect.cc
+++ b/sql/sql_connect.cc
@@ -44,6 +44,9 @@ HASH global_index_stats;
extern mysql_mutex_t LOCK_global_user_client_stats;
extern mysql_mutex_t LOCK_global_table_stats;
extern mysql_mutex_t LOCK_global_index_stats;
+#ifdef WITH_WSREP
+#include "wsrep_mysqld.h"
+#endif
/*
Get structure for logging connection data for the current user
@@ -97,6 +100,9 @@ int get_or_create_user_conn(THD *thd, const char *user,
}
thd->user_connect=uc;
uc->connections++;
+#ifdef WITH_WSREP
+ thd->wsrep_client_thread= 1;
+#endif /* WITH_WSREP */
end:
mysql_mutex_unlock(&LOCK_user_conn);
return return_val;
@@ -977,7 +983,11 @@ bool setup_connection_thread_globals(THD *thd)
{
if (thd->store_globals())
{
+#ifdef WITH_WSREP
+ close_connection(thd, ER_OUT_OF_RESOURCES, 1);
+#else
close_connection(thd, ER_OUT_OF_RESOURCES);
+#endif
statistic_increment(aborted_connects,&LOCK_status);
MYSQL_CALLBACK(thd->scheduler, end_thread, (thd, 0));
return 1; // Error
@@ -1050,6 +1060,17 @@ bool login_connection(THD *thd)
void end_connection(THD *thd)
{
NET *net= &thd->net;
+#ifdef WITH_WSREP
+ if (WSREP(thd))
+ {
+ wsrep_status_t rcode= wsrep->free_connection(wsrep, thd->thread_id);
+ if (rcode) {
+ WSREP_WARN("wsrep failed to free connection context: %lu, code: %d",
+ thd->thread_id, rcode);
+ }
+ }
+ thd->wsrep_client_thread= 0;
+#endif
plugin_thdvar_cleanup(thd);
if (thd->user_connect)
@@ -1205,7 +1226,11 @@ void do_handle_one_connection(THD *thd_arg)
if (MYSQL_CALLBACK_ELSE(thd->scheduler, init_new_connection_thread, (), 0))
{
+#ifdef WITH_WSREP
+ close_connection(thd, ER_OUT_OF_RESOURCES, 1);
+#else
close_connection(thd, ER_OUT_OF_RESOURCES);
+#endif
statistic_increment(aborted_connects,&LOCK_status);
MYSQL_CALLBACK(thd->scheduler, end_thread, (thd, 0));
return;
@@ -1254,9 +1279,21 @@ void do_handle_one_connection(THD *thd_arg)
break;
}
end_connection(thd);
-
+
+#ifdef WITH_WSREP
+ if (WSREP(thd))
+ {
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ thd->wsrep_query_state= QUERY_EXITING;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ }
+#endif
end_thread:
+#ifdef WITH_WSREP
+ close_connection(thd, 0, 1);
+#else
close_connection(thd);
+#endif
if (thd->userstat_running)
update_global_user_stats(thd, create_user, time(NULL));
diff --git a/sql/sql_delete.cc b/sql/sql_delete.cc
index ff88bf7c0f8..7fc623daf88 100644
--- a/sql/sql_delete.cc
+++ b/sql/sql_delete.cc
@@ -410,7 +410,11 @@ cleanup:
/* See similar binlogging code in sql_update.cc, for comments */
if ((error < 0) || thd->transaction.stmt.modified_non_trans_table)
{
+#ifdef WITH_WSREP
+ if ((WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()))
+#else
if (mysql_bin_log.is_open())
+#endif
{
int errcode= 0;
if (error < 0)
@@ -861,7 +865,11 @@ void multi_delete::abort_result_set()
/*
there is only side effects; to binlog with the error
*/
+#ifdef WITH_WSREP
+ if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
+#else
if (mysql_bin_log.is_open())
+#endif
{
int errcode= query_error_code(thd, thd->killed == NOT_KILLED);
/* possible error of writing binary log is ignored deliberately */
@@ -1037,7 +1045,11 @@ bool multi_delete::send_eof()
}
if ((local_error == 0) || thd->transaction.stmt.modified_non_trans_table)
{
+#ifdef WITH_WSREP
+ if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
+#else
if (mysql_bin_log.is_open())
+#endif
{
int errcode= 0;
if (local_error == 0)
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc
index 54f94ce78c1..a4eb1e8996f 100644
--- a/sql/sql_insert.cc
+++ b/sql/sql_insert.cc
@@ -1019,7 +1019,11 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
thd->transaction.stmt.modified_non_trans_table ||
was_insert_delayed)
{
+#ifdef WITH_WSREP
+ if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
+#else
if (mysql_bin_log.is_open())
+#endif
{
int errcode= 0;
if (error <= 0)
@@ -3113,6 +3117,11 @@ bool Delayed_insert::handle_inserts(void)
mysql_cond_broadcast(&cond_client); // If waiting clients
}
}
+#ifdef WITH_WSREP
+ if (WSREP((&thd)))
+ thd_proc_info(&thd, "insert done");
+ else
+#endif /* WITH_WSREP */
thd_proc_info(&thd, 0);
mysql_mutex_unlock(&mutex);
@@ -3597,8 +3606,13 @@ bool select_insert::send_eof()
events are in the transaction cache and will be written when
ha_autocommit_or_rollback() is issued below.
*/
+#ifdef WITH_WSREP
+ if ((WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) &&
+ (!error || thd->transaction.stmt.modified_non_trans_table))
+#else
if (mysql_bin_log.is_open() &&
(!error || thd->transaction.stmt.modified_non_trans_table))
+#endif
{
int errcode= 0;
if (!error)
@@ -3681,7 +3695,11 @@ void select_insert::abort_result_set() {
if (!can_rollback_data())
thd->transaction.all.modified_non_trans_table= TRUE;
+#ifdef WITH_WSREP
+ if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
+#else
if (mysql_bin_log.is_open())
+#endif
{
int errcode= query_error_code(thd, thd->killed == NOT_KILLED);
/* error of writing binary log is ignored */
@@ -4072,7 +4090,11 @@ select_create::binlog_show_create_table(TABLE **tables, uint count)
/* show_database */ TRUE);
DBUG_ASSERT(result == 0); /* store_create_info() always return 0 */
+#ifdef WITH_WSREP
+ if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
+#else
if (mysql_bin_log.is_open())
+#endif
{
int errcode= query_error_code(thd, thd->killed == NOT_KILLED);
result= thd->binlog_query(THD::STMT_QUERY_TYPE,
diff --git a/sql/sql_lex.cc b/sql/sql_lex.cc
index b2dfae5ded4..070bcdfcfd4 100644
--- a/sql/sql_lex.cc
+++ b/sql/sql_lex.cc
@@ -1548,6 +1548,17 @@ int lex_one_token(void *arg, void *yythd)
}
else
{
+#ifdef WITH_WSREP
+ if (version == 99997 && thd->wsrep_exec_mode == LOCAL_STATE)
+ {
+ WSREP_DEBUG("consistency check: %s", thd->query());
+ thd->wsrep_consistency_check= TRUE;
+ lip->yySkipn(5);
+ lip->set_echo(TRUE);
+ state=MY_LEX_START;
+ break; /* Do not treat contents as a comment. */
+ }
+#endif /* WITH_WSREP */
/*
Patch and skip the conditional comment to avoid it
being propagated infinitely (eg. to a slave).
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index e0b2acd199d..a955b434319 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -103,6 +103,24 @@
#include "../storage/maria/ha_maria.h"
#endif
+#ifdef WITH_WSREP
+#include "wsrep_mysqld.h"
+#include "rpl_rli.h"
+
+extern Format_description_log_event *wsrep_format_desc;
+#define WSREP_MYSQL_DB (char *)"mysql"
+
+#define WSREP_TO_ISOLATION_BEGIN(db_, table_) \
+ if (WSREP(thd) && wsrep_to_isolation_begin(thd, db_, table_)) goto error;
+
+#define WSREP_TO_ISOLATION_END \
+ if (WSREP(thd) || (thd && thd->wsrep_exec_mode==TOTAL_ORDER)) \
+ wsrep_to_isolation_end(thd);
+
+#else
+#define WSREP_TO_ISOLATION_BEGIN(db_, table_)
+#define WSREP_TO_ISOLATION_END
+#endif /* WITH_WSREP */
/**
@defgroup Runtime_Environment Runtime Environment
@{
@@ -436,6 +454,13 @@ bool is_log_table_write_query(enum enum_sql_command command)
return (sql_command_flags[command] & CF_WRITE_LOGS_COMMAND) != 0;
}
+#ifdef WITH_WSREP
+bool is_show_query(enum enum_sql_command command)
+{
+ DBUG_ASSERT(command >= 0 && command <= SQLCOM_END);
+ return (sql_command_flags[command] & CF_STATUS_COMMAND) != 0;
+}
+#endif
void execute_init_command(THD *thd, LEX_STRING *init_command,
mysql_rwlock_t *var_lock)
{
@@ -617,8 +642,12 @@ void do_handle_bootstrap(THD *thd)
if (my_thread_init() || thd->store_globals())
{
#ifndef EMBEDDED_LIBRARY
+#ifdef WITH_WSREP
+ close_connection(thd, ER_OUT_OF_RESOURCES, 1);
+#else
close_connection(thd, ER_OUT_OF_RESOURCES);
#endif
+#endif
thd->fatal_error();
goto end;
}
@@ -692,7 +721,26 @@ bool do_command(THD *thd)
NET *net= &thd->net;
enum enum_server_command command;
DBUG_ENTER("do_command");
+#ifdef WITH_WSREP
+ if (WSREP(thd))
+ {
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ thd->wsrep_query_state= QUERY_IDLE;
+ if (thd->wsrep_conflict_state==MUST_ABORT)
+ {
+ thd->wsrep_conflict_state= ABORTING;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ trans_rollback(thd);
+ thd->locked_tables_list.unlock_locked_tables(thd);
+ /* Release transactional metadata locks. */
+ thd->mdl_context.release_transactional_locks();
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ thd->wsrep_conflict_state= ABORTED;
+ }
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ }
+#endif
/*
indicator of uninitialized lex => normal flow of errors handling
(see my_message_sql)
@@ -738,11 +786,57 @@ bool do_command(THD *thd)
*/
DEBUG_SYNC(thd, "before_do_command_net_read");
+#ifdef WITH_WSREP
+ if (WSREP(thd)) {
+ packet_length= my_net_read(net);
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ /* these THD's are aborted or are aborting during being idle */
+ if (thd->wsrep_conflict_state == ABORTING)
+ {
+ while (thd->wsrep_conflict_state == ABORTING) {
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ my_sleep(1000);
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ }
+ thd->store_globals();
+ }
+ else if (thd->wsrep_conflict_state == ABORTED)
+ {
+ thd->store_globals();
+ }
+
+ thd->wsrep_query_state= QUERY_EXEC;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ }
+ if ((WSREP(thd) && packet_length == packet_error) ||
+ (!WSREP(thd) && (packet_length= my_net_read(net)) == packet_error))
+#else
if ((packet_length= my_net_read(net)) == packet_error)
+#endif
{
DBUG_PRINT("info",("Got error %d reading command from socket %s",
net->error,
vio_description(net->vio)));
+#ifdef WITH_WSREP
+ if (WSREP(thd)) {
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ if (thd->wsrep_conflict_state == MUST_ABORT)
+ {
+ DBUG_PRINT("wsrep",("aborted for wsrep rollback: %lu", thd->real_id));
+ thd->wsrep_conflict_state= ABORTING;
+
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ trans_rollback(thd);
+ thd->locked_tables_list.unlock_locked_tables(thd);
+ /* Release transactional metadata locks. */
+ thd->mdl_context.release_transactional_locks();
+
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ thd->wsrep_conflict_state= ABORTED;
+ }
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ }
+#endif
/* Check if we can continue without closing the connection */
@@ -788,12 +882,54 @@ bool do_command(THD *thd)
vio_description(net->vio), command,
command_name[command].str));
+#ifdef WITH_WSREP
+ if (WSREP(thd)) {
+ /*
+ * bail out if DB snapshot has not been installed. We however,
+ * allow queries "SET" and "SHOW", they are trapped later in execute_command
+ */
+ if (thd->variables.wsrep_on && !thd->wsrep_applier && !wsrep_ready &&
+ command != COM_QUERY &&
+ command != COM_PING &&
+ command != COM_QUIT &&
+ command != COM_PROCESS_INFO &&
+ command != COM_PROCESS_KILL &&
+ command != COM_SET_OPTION &&
+ command != COM_SHUTDOWN &&
+ command != COM_SLEEP &&
+ command != COM_STATISTICS &&
+ command != COM_TIME &&
+ command != COM_END
+ ) {
+ my_error(ER_UNKNOWN_COM_ERROR, MYF(0),
+ "WSREP has not yet prepared node for application use");
+ thd->protocol->end_statement();
+ return_value= FALSE;
+ goto out;
+ }
+ }
+#endif
/* Restore read timeout value */
my_net_set_read_timeout(net, thd->variables.net_read_timeout);
DBUG_ASSERT(packet_length);
return_value= dispatch_command(command, thd, packet+1, (uint) (packet_length-1));
-
+#ifdef WITH_WSREP
+ if (WSREP(thd)) {
+ while (thd->wsrep_conflict_state== RETRY_AUTOCOMMIT)
+ {
+ return_value= dispatch_command(command, thd, thd->wsrep_retry_query,
+ thd->wsrep_retry_query_len);
+ }
+ }
+ if (thd->wsrep_retry_query)
+ {
+ my_free(thd->wsrep_retry_query);
+ thd->wsrep_retry_query = NULL;
+ thd->wsrep_retry_query_len = 0;
+ thd->wsrep_retry_command = COM_CONNECT;
+ }
+#endif
out:
DBUG_RETURN(return_value);
}
@@ -867,6 +1003,34 @@ static my_bool deny_updates_if_read_only_option(THD *thd,
DBUG_RETURN(FALSE);
}
+#ifdef WITH_WSREP
+static my_bool wsrep_read_only_option(THD *thd, TABLE_LIST *all_tables)
+{
+ int opt_readonly_saved = opt_readonly;
+ ulong flag_saved = (ulong)(thd->security_ctx->master_access & SUPER_ACL);
+
+ opt_readonly = 0;
+ thd->security_ctx->master_access &= ~SUPER_ACL;
+
+ my_bool ret = !deny_updates_if_read_only_option(thd, all_tables);
+
+ opt_readonly = opt_readonly_saved;
+ thd->security_ctx->master_access |= flag_saved;
+
+ return ret;
+}
+
+static void wsrep_copy_query(THD *thd)
+{
+ thd->wsrep_retry_command = thd->command;
+ thd->wsrep_retry_query_len = thd->query_length();
+ thd->wsrep_retry_query = (char *)my_malloc(
+ thd->wsrep_retry_query_len + 1, MYF(0));
+ thd->wsrep_retry_command = thd->command;
+ strcpy(thd->wsrep_retry_query, thd->query());
+ thd->wsrep_retry_query[thd->wsrep_retry_query_len] = '\0';
+}
+#endif /* WITH_WSREP */
/**
Perform one connection-level (COM_XXXX) command.
@@ -896,6 +1060,50 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
DBUG_ENTER("dispatch_command");
DBUG_PRINT("info", ("command: %d", command));
+#ifdef WITH_WSREP
+ bool is_autocommit= false;
+
+ if (WSREP(thd)) {
+ if (!thd->in_multi_stmt_transaction_mode())
+ {
+ thd->wsrep_PA_safe= true;
+ }
+
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ thd->wsrep_query_state= QUERY_EXEC;
+ if (thd->wsrep_conflict_state== RETRY_AUTOCOMMIT)
+ {
+ thd->wsrep_conflict_state= NO_CONFLICT;
+ }
+
+ is_autocommit= !thd->in_multi_stmt_transaction_mode() &&
+ thd->wsrep_conflict_state == NO_CONFLICT &&
+ !thd->wsrep_applier &&
+ wsrep_read_only_option(thd, thd->lex->query_tables);
+
+ if (thd->wsrep_conflict_state== MUST_ABORT)
+ {
+ thd->wsrep_conflict_state= ABORTING;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ trans_rollback(thd);
+ thd->locked_tables_list.unlock_locked_tables(thd);
+ /* Release transactional metadata locks. */
+ thd->mdl_context.release_transactional_locks();
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ thd->wsrep_conflict_state= ABORTED;
+ }
+ if (thd->wsrep_conflict_state== ABORTED)
+ {
+ my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction");
+ WSREP_DEBUG("Deadlock error for: %s", thd->query());
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ thd->killed= NOT_KILLED;
+ thd->mysys_var->abort= 0;
+ goto dispatch_end;
+ }
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ }
+#endif /* WITH_WSREP */
#if defined(ENABLED_PROFILING)
thd->profiling.start_new_query();
#endif
@@ -1104,7 +1312,12 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
Count each statement from the client.
*/
statistic_increment(thd->status_var.questions, &LOCK_status);
+#ifdef WITH_WSREP
+ if (!WSREP(thd))
+ thd->set_time(); /* Reset the query start time. */
+#else
thd->set_time(); /* Reset the query start time. */
+#endif
parser_state.reset(beginning_of_next_stmt, length);
/* TODO: set thd->lex->sql_command to SQLCOM_END here */
mysql_parse(thd, beginning_of_next_stmt, length, &parser_state);
@@ -1422,6 +1635,156 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
my_message(ER_UNKNOWN_COM_ERROR, ER(ER_UNKNOWN_COM_ERROR), MYF(0));
break;
}
+#ifdef WITH_WSREP
+ dispatch_end:
+
+ if (WSREP(thd)) {
+ /* wsrep BF abort in query exec phase */
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ if (thd->wsrep_conflict_state == MUST_ABORT) {
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ ha_rollback_trans(thd, 0);
+ thd->locked_tables_list.unlock_locked_tables(thd);
+ /* Release transactional metadata locks. */
+ thd->mdl_context.release_transactional_locks();
+ thd->transaction.stmt.reset();
+ WSREP_DEBUG("abort in exec query state, avoiding autocommit");
+ goto wsrep_must_abort;
+ }
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ }
+#endif /* WITH_WSREP */
+
+#ifdef WITH_WSREP
+ wsrep_must_abort:
+ if (WSREP(thd)) {
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ if (thd->wsrep_conflict_state == MUST_ABORT) {
+ thd->wsrep_conflict_state= ABORTING;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+
+ WSREP_DEBUG("in dispatch_command, aborting %s",
+ (thd->query()) ? thd->query() : "void");
+ trans_rollback(thd);
+ thd->locked_tables_list.unlock_locked_tables(thd);
+ /* Release transactional metadata locks. */
+ thd->mdl_context.release_transactional_locks();
+
+ if (thd->get_binlog_table_maps()) {
+ thd->clear_binlog_table_maps();
+ }
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ thd->wsrep_conflict_state= ABORTED;
+ }
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ /* checking if BF trx must be replayed */
+ if (thd->wsrep_conflict_state== MUST_REPLAY) {
+ if (thd->wsrep_exec_mode!= REPL_RECV) {
+ if (thd->stmt_da->is_sent) {
+ WSREP_ERROR("replay issue, thd has reported status already");
+ }
+ thd->stmt_da->reset_diagnostics_area();
+
+ thd->wsrep_conflict_state= REPLAYING;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+
+ mysql_reset_thd_for_next_command(thd, opt_userstat_running);
+ thd->killed= NOT_KILLED;
+ close_thread_tables(thd);
+
+ thd_proc_info(thd, "wsrep replaying trx");
+ WSREP_DEBUG("replay trx: %s %lld",
+ thd->query() ? thd->query() : "void",
+ (long long)thd->wsrep_trx_seqno);
+ struct wsrep_thd_shadow shadow;
+ wsrep_prepare_bf_thd(thd, &shadow);
+ int rcode = wsrep->replay_trx(wsrep,
+ &thd->wsrep_trx_handle,
+ (void *)thd);
+
+ wsrep_return_from_bf_mode(thd, &shadow);
+ if (thd->wsrep_conflict_state!= REPLAYING)
+ WSREP_WARN("lost replaying mode: %d", thd->wsrep_conflict_state );
+
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+
+ switch (rcode) {
+ case WSREP_OK:
+ thd->wsrep_conflict_state= NO_CONFLICT;
+ wsrep->post_commit(wsrep, &thd->wsrep_trx_handle);
+ WSREP_DEBUG("trx_replay successful for: %ld %llu",
+ thd->thread_id, (long long)thd->real_id);
+ break;
+ case WSREP_TRX_FAIL:
+ if (thd->stmt_da->is_sent) {
+ WSREP_ERROR("replay failed, thd has reported status");
+ }
+ else
+ {
+ WSREP_DEBUG("replay failed, rolling back");
+ my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction");
+ }
+ thd->wsrep_conflict_state= ABORTED;
+ wsrep->post_rollback(wsrep, &thd->wsrep_trx_handle);
+ break;
+ default:
+ WSREP_ERROR("trx_replay failed for: %d, query: %s",
+ rcode, thd->query() ? thd->query() : "void");
+ /* we're now in inconsistent state, must abort */
+ unireg_abort(1);
+ break;
+ }
+ mysql_mutex_lock(&LOCK_wsrep_replaying);
+ wsrep_replaying--;
+ WSREP_DEBUG("replaying decreased: %d, thd: %lu",
+ wsrep_replaying, thd->thread_id);
+ mysql_cond_broadcast(&COND_wsrep_replaying);
+ mysql_mutex_unlock(&LOCK_wsrep_replaying);
+ }
+ }
+ /* setting error code for BF aborted trxs */
+ if (thd->wsrep_conflict_state == ABORTED)
+ {
+ mysql_reset_thd_for_next_command(thd, opt_userstat_running);
+ thd->killed= NOT_KILLED;
+ if (is_autocommit &&
+ (thd->wsrep_retry_counter < thd->variables.wsrep_retry_autocommit))
+ {
+ WSREP_DEBUG("wsrep retrying AC query: %s",
+ (thd->query()) ? thd->query() : "void");
+ thd->wsrep_conflict_state= RETRY_AUTOCOMMIT;
+ thd->wsrep_retry_counter++; // grow
+ wsrep_copy_query(thd);
+ }
+ else
+ {
+ WSREP_DEBUG("BF aborted, thd: %lu is_AC: %d, retry: %lu - %lu SQL: %s",
+ thd->thread_id, is_autocommit, thd->wsrep_retry_counter,
+ thd->variables.wsrep_retry_autocommit, thd->query());
+ my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction");
+ thd->killed= NOT_KILLED;
+ thd->wsrep_conflict_state= NO_CONFLICT;
+ thd->wsrep_retry_counter= 0; // reset
+ }
+ }
+ else
+ {
+ set_if_smaller(thd->wsrep_retry_counter, 0); // reset; eventually ok
+ }
+ if ((thd->wsrep_conflict_state != REPLAYING) &&
+ (thd->wsrep_conflict_state != RETRY_AUTOCOMMIT)) {
+
+ thd->update_server_status();
+ thd->protocol->end_statement();
+ query_cache_end_of_result(thd);
+ }
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+
+ } else { /* if (WSREP(thd))... */
+
+#endif /* WITH_WSREP */
DBUG_ASSERT(thd->derived_tables == NULL &&
(thd->open_tables == NULL ||
(thd->locked_tables_mode == LTM_LOCK_TABLES)));
@@ -1430,6 +1793,9 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
thd->update_server_status();
thd->protocol->end_statement();
query_cache_end_of_result(thd);
+#ifdef WITH_WSREP
+ }
+#endif /* WITH_WSREP */
if (!thd->is_error() && !thd->killed_errno())
mysql_audit_general(thd, MYSQL_AUDIT_GENERAL_RESULT, 0, 0);
@@ -1446,7 +1812,16 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
thd->reset_query();
thd->command=COM_SLEEP;
dec_thread_running();
+#ifdef WITH_WSREP
+ if (WSREP(thd)) {
+ thd_proc_info(thd, "sleeping");
+ } else {
+#endif /* WITH_WSREP */
thd_proc_info(thd, 0);
+#ifdef WITH_WSREP
+ }
+#endif /* WITH_WSREP */
+
thd->packet.shrink(thd->variables.net_buffer_length); // Reclaim some memory
free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
@@ -2091,7 +2466,45 @@ mysql_execute_command(THD *thd)
#ifdef HAVE_REPLICATION
} /* endif unlikely slave */
#endif
+#ifdef WITH_WSREP
+ if (WSREP(thd)) {
+ /*
+ change LOCK TABLE WRITE to transaction
+ */
+ if (lex->sql_command== SQLCOM_LOCK_TABLES && wsrep_convert_LOCK_to_trx)
+ {
+ for (TABLE_LIST *table= all_tables; table; table= table->next_global)
+ {
+ if (table->lock_type >= TL_WRITE_ALLOW_WRITE)
+ {
+ lex->sql_command= SQLCOM_BEGIN;
+ thd->wsrep_converted_lock_session= true;
+ break;
+ }
+ }
+ }
+ if (lex->sql_command== SQLCOM_UNLOCK_TABLES &&
+ thd->wsrep_converted_lock_session)
+ {
+ thd->wsrep_converted_lock_session= false;
+ lex->sql_command= SQLCOM_COMMIT;
+ lex->tx_release= TVL_NO;
+ }
+ /*
+ * bail out if DB snapshot has not been installed. We however,
+ * allow SET and SHOW queries
+ */
+ if (thd->variables.wsrep_on && !thd->wsrep_applier && !wsrep_ready &&
+ lex->sql_command != SQLCOM_SET_OPTION &&
+ !is_show_query(lex->sql_command))
+ {
+ my_error(ER_UNKNOWN_COM_ERROR, MYF(0),
+ "WSREP has not yet prepared node for application use");
+ goto error;
+ }
+ }
+#endif /* WITH_WSREP */
status_var_increment(thd->status_var.com_stat[lex->sql_command]);
thd->progress.report_to_client= test(sql_command_flags[lex->sql_command] &
CF_REPORT_PROGRESS);
@@ -2134,6 +2547,9 @@ mysql_execute_command(THD *thd)
#endif
case SQLCOM_SHOW_STATUS_PROC:
case SQLCOM_SHOW_STATUS_FUNC:
+#ifdef WITH_WSREP
+ if (WSREP(thd) && wsrep_causal_wait(thd)) goto error;
+#endif /* WITH_WSREP */
if ((res= check_table_access(thd, SELECT_ACL, all_tables, FALSE,
UINT_MAX, FALSE)))
goto error;
@@ -2141,6 +2557,9 @@ mysql_execute_command(THD *thd)
break;
case SQLCOM_SHOW_STATUS:
{
+#ifdef WITH_WSREP
+ if (WSREP(thd) && wsrep_causal_wait(thd)) goto error;
+#endif /* WITH_WSREP */
execute_show_status(thd, all_tables);
break;
}
@@ -2152,17 +2571,27 @@ mysql_execute_command(THD *thd)
case SQLCOM_SHOW_PLUGINS:
case SQLCOM_SHOW_FIELDS:
case SQLCOM_SHOW_KEYS:
+#ifndef WITH_WSREP
case SQLCOM_SHOW_VARIABLES:
case SQLCOM_SHOW_CHARSETS:
case SQLCOM_SHOW_COLLATIONS:
case SQLCOM_SHOW_STORAGE_ENGINES:
case SQLCOM_SHOW_PROFILE:
+#endif /* WITH_WSREP */
case SQLCOM_SHOW_CLIENT_STATS:
case SQLCOM_SHOW_USER_STATS:
case SQLCOM_SHOW_TABLE_STATS:
case SQLCOM_SHOW_INDEX_STATS:
case SQLCOM_SELECT:
- {
+#ifdef WITH_WSREP
+ if (WSREP(thd) && wsrep_causal_wait(thd)) goto error;
+ case SQLCOM_SHOW_VARIABLES:
+ case SQLCOM_SHOW_CHARSETS:
+ case SQLCOM_SHOW_COLLATIONS:
+ case SQLCOM_SHOW_STORAGE_ENGINES:
+ case SQLCOM_SHOW_PROFILE:
+#endif /* WITH_WSREP */
+ {
thd->status_var.last_query_cost= 0.0;
/*
@@ -2472,7 +2901,7 @@ case SQLCOM_PREPARE:
*/
if (thd->query_name_consts &&
mysql_bin_log.is_open() &&
- thd->variables.binlog_format == BINLOG_FORMAT_STMT &&
+ WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_STMT &&
!mysql_bin_log.is_query_in_union(thd, thd->query_id))
{
List_iterator_fast<Item> it(select_lex->item_list);
@@ -2576,6 +3005,11 @@ case SQLCOM_PREPARE:
if (create_info.options & HA_LEX_CREATE_TMP_TABLE)
thd->variables.option_bits|= OPTION_KEEP_LOG;
/* regular create */
+#ifdef WITH_WSREP
+ if (!thd->is_current_stmt_binlog_format_row() ||
+ !(create_info.options & HA_LEX_CREATE_TMP_TABLE))
+ WSREP_TO_ISOLATION_BEGIN(create_table->db, create_table->table_name)
+#endif /* WITH_WSREP */
if (create_info.options & HA_LEX_CREATE_TABLE_LIKE)
{
/* CREATE TABLE ... LIKE ... */
@@ -2617,6 +3051,7 @@ end_with_restore_list:
DBUG_ASSERT(first_table == all_tables && first_table != 0);
if (check_one_table_access(thd, INDEX_ACL, all_tables))
goto error; /* purecov: inspected */
+ WSREP_TO_ISOLATION_BEGIN(first_table->db, first_table->table_name)
/*
Currently CREATE INDEX or DROP INDEX cause a full table rebuild
and thus classify as slow administrative statements just like
@@ -2674,6 +3109,7 @@ end_with_restore_list:
case SQLCOM_RENAME_TABLE:
{
+ WSREP_TO_ISOLATION_BEGIN(first_table->db, first_table->table_name)
if (execute_rename_table(thd, first_table, all_tables))
goto error;
break;
@@ -2701,6 +3137,10 @@ end_with_restore_list:
goto error;
#else
{
+#ifdef WITH_WSREP
+ if (WSREP(thd) && wsrep_causal_wait(thd)) goto error;
+#endif /* WITH_WSREP */
+
/*
Access check:
SHOW CREATE TABLE require any privileges on the table level (ie
@@ -2756,6 +3196,10 @@ end_with_restore_list:
case SQLCOM_CHECKSUM:
{
DBUG_ASSERT(first_table == all_tables && first_table != 0);
+#ifdef WITH_WSREP
+ if (WSREP(thd) && wsrep_causal_wait(thd)) goto error;
+#endif /* WITH_WSREP */
+
if (check_table_access(thd, SELECT_ACL, all_tables,
FALSE, UINT_MAX, FALSE))
goto error; /* purecov: inspected */
@@ -2950,6 +3394,14 @@ end_with_restore_list:
DBUG_ASSERT(first_table == all_tables && first_table != 0);
if ((res= insert_precheck(thd, all_tables)))
break;
+#ifdef WITH_WSREP
+ if (lex->sql_command == SQLCOM_INSERT_SELECT &&
+ thd->wsrep_consistency_check)
+ {
+ WSREP_TO_ISOLATION_BEGIN(first_table->db, first_table->table_name);
+ }
+
+#endif
/*
INSERT...SELECT...ON DUPLICATE KEY UPDATE/REPLACE SELECT/
INSERT...IGNORE...SELECT can be unsafe, unless ORDER BY PRIMARY KEY
@@ -3113,6 +3565,17 @@ end_with_restore_list:
/* So that DROP TEMPORARY TABLE gets to binlog at commit/rollback */
thd->variables.option_bits|= OPTION_KEEP_LOG;
}
+#ifdef WITH_WSREP
+ for (TABLE_LIST *table= all_tables; table; table= table->next_global)
+ {
+ if (!thd->is_current_stmt_binlog_format_row() ||
+ !find_temporary_table(thd, table))
+ {
+ WSREP_TO_ISOLATION_BEGIN(table->db, table->table_name);
+ break;
+ }
+ }
+#endif /* WITH_WSREP */
/* DDL and binlog write order are protected by metadata locks. */
res= mysql_rm_table(thd, first_table, lex->drop_if_exists,
lex->drop_temporary);
@@ -3156,7 +3619,6 @@ end_with_restore_list:
if (!mysql_change_db(thd, &db_str, FALSE))
my_ok(thd);
-
break;
}
@@ -3299,6 +3761,7 @@ end_with_restore_list:
#endif
if (check_access(thd, CREATE_ACL, lex->name.str, NULL, NULL, 1, 0))
break;
+ WSREP_TO_ISOLATION_BEGIN(lex->name.str, NULL)
res= mysql_create_db(thd,(lower_case_table_names == 2 ? alias :
lex->name.str), &create_info, 0);
break;
@@ -3328,6 +3791,7 @@ end_with_restore_list:
#endif
if (check_access(thd, DROP_ACL, lex->name.str, NULL, NULL, 1, 0))
break;
+ WSREP_TO_ISOLATION_BEGIN(lex->name.str, NULL)
res= mysql_rm_db(thd, lex->name.str, lex->drop_if_exists, 0);
break;
}
@@ -3356,6 +3820,7 @@ end_with_restore_list:
res= 1;
break;
}
+ WSREP_TO_ISOLATION_BEGIN(db->str, NULL)
res= mysql_upgrade_db(thd, db);
if (!res)
my_ok(thd);
@@ -3388,6 +3853,7 @@ end_with_restore_list:
#endif
if (check_access(thd, ALTER_ACL, db->str, NULL, NULL, 1, 0))
break;
+ WSREP_TO_ISOLATION_BEGIN(db->str, NULL)
res= mysql_alter_db(thd, db->str, &create_info);
break;
}
@@ -3420,6 +3886,7 @@ end_with_restore_list:
if (res)
break;
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL)
switch (lex->sql_command) {
case SQLCOM_CREATE_EVENT:
{
@@ -3454,6 +3921,7 @@ end_with_restore_list:
lex->spname->m_name);
break;
case SQLCOM_DROP_EVENT:
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL)
if (!(res= Events::drop_event(thd,
lex->spname->m_db, lex->spname->m_name,
lex->drop_if_exists)))
@@ -3468,6 +3936,7 @@ end_with_restore_list:
if (check_access(thd, INSERT_ACL, "mysql", NULL, NULL, 1, 0))
break;
#ifdef HAVE_DLOPEN
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL)
if (!(res = mysql_create_function(thd, &lex->udf)))
my_ok(thd);
#else
@@ -3482,6 +3951,7 @@ end_with_restore_list:
if (check_access(thd, INSERT_ACL, "mysql", NULL, NULL, 1, 1) &&
check_global_access(thd,CREATE_USER_ACL))
break;
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL)
/* Conditionally writes to binlog */
if (!(res= mysql_create_user(thd, lex->users_list)))
my_ok(thd);
@@ -3493,6 +3963,7 @@ end_with_restore_list:
check_global_access(thd,CREATE_USER_ACL))
break;
/* Conditionally writes to binlog */
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL)
if (!(res= mysql_drop_user(thd, lex->users_list)))
my_ok(thd);
break;
@@ -3503,6 +3974,7 @@ end_with_restore_list:
check_global_access(thd,CREATE_USER_ACL))
break;
/* Conditionally writes to binlog */
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL)
if (!(res= mysql_rename_user(thd, lex->users_list)))
my_ok(thd);
break;
@@ -3517,6 +3989,7 @@ end_with_restore_list:
thd->binlog_invoker();
/* Conditionally writes to binlog */
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL)
if (!(res = mysql_revoke_all(thd, lex->users_list)))
my_ok(thd);
break;
@@ -3583,6 +4056,7 @@ end_with_restore_list:
lex->type == TYPE_ENUM_PROCEDURE, 0))
goto error;
/* Conditionally writes to binlog */
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL)
res= mysql_routine_grant(thd, all_tables,
lex->type == TYPE_ENUM_PROCEDURE,
lex->users_list, grants,
@@ -3596,6 +4070,7 @@ end_with_restore_list:
all_tables, FALSE, UINT_MAX, FALSE))
goto error;
/* Conditionally writes to binlog */
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL)
res= mysql_table_grant(thd, all_tables, lex->users_list,
lex->columns, lex->grant,
lex->sql_command == SQLCOM_REVOKE);
@@ -3611,6 +4086,7 @@ end_with_restore_list:
}
else
{
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL)
/* Conditionally writes to binlog */
res = mysql_grant(thd, select_lex->db, lex->users_list, lex->grant,
lex->sql_command == SQLCOM_REVOKE,
@@ -3749,9 +4225,17 @@ end_with_restore_list:
able to open it (with SQLCOM_HA_OPEN) in the first place.
*/
unit->set_limit(select_lex);
+#ifdef WITH_WSREP
+ { char* tmp_info= NULL;
+ if (WSREP(thd)) tmp_info = (char *)thd_proc_info(thd, "mysql_ha_read()");
+#endif /* WITH_WSREP */
res= mysql_ha_read(thd, first_table, lex->ha_read_mode, lex->ident.str,
lex->insert_list, lex->ha_rkey_mode, select_lex->where,
unit->select_limit_cnt, unit->offset_limit_cnt);
+#ifdef WITH_WSREP
+ if (WSREP(thd)) thd_proc_info(thd, tmp_info);
+ }
+#endif /* WITH_WSREP */
break;
case SQLCOM_BEGIN:
@@ -3819,8 +4303,20 @@ end_with_restore_list:
/* Disconnect the current client connection. */
if (tx_release)
thd->killed= KILL_CONNECTION;
- my_ok(thd);
- break;
+ #ifdef WITH_WSREP
+ if (WSREP(thd)) {
+ if (thd->wsrep_conflict_state == NO_CONFLICT ||
+ thd->wsrep_conflict_state == REPLAYING)
+ {
+ my_ok(thd);
+ }
+ } else {
+#endif /* WITH_WSREP */
+ my_ok(thd);
+ #ifdef WITH_WSREP
+ }
+#endif /* WITH_WSREP */
+ break;
}
case SQLCOM_RELEASE_SAVEPOINT:
if (trans_release_savepoint(thd, lex->ident))
@@ -3888,6 +4384,7 @@ end_with_restore_list:
if (sp_process_definer(thd))
goto create_sp_error;
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL)
res= (sp_result= sp_create_routine(thd, lex->sphead->m_type, lex->sphead));
switch (sp_result) {
case SP_OK: {
@@ -4099,6 +4596,7 @@ create_sp_error:
already puts on CREATE FUNCTION.
*/
/* Conditionally writes to binlog */
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL)
sp_result= sp_update_routine(thd, type, lex->spname, &lex->sp_chistics);
switch (sp_result)
{
@@ -4170,6 +4668,7 @@ create_sp_error:
if (check_routine_access(thd, ALTER_PROC_ACL, db, name,
lex->sql_command == SQLCOM_DROP_PROCEDURE, 0))
goto error;
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL)
/* Conditionally writes to binlog */
sp_result= sp_drop_routine(thd, type, lex->spname);
@@ -4287,6 +4786,7 @@ create_sp_error:
Note: SQLCOM_CREATE_VIEW also handles 'ALTER VIEW' commands
as specified through the thd->lex->create_view_mode flag.
*/
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL)
res= mysql_create_view(thd, first_table, thd->lex->create_view_mode);
break;
}
@@ -4295,12 +4795,14 @@ create_sp_error:
if (check_table_access(thd, DROP_ACL, all_tables, FALSE, UINT_MAX, FALSE))
goto error;
/* Conditionally writes to binlog. */
+ WSREP_TO_ISOLATION_BEGIN(NULL, NULL)
res= mysql_drop_view(thd, first_table, thd->lex->drop_mode);
break;
}
case SQLCOM_CREATE_TRIGGER:
{
/* Conditionally writes to binlog. */
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL)
res= mysql_create_or_drop_trigger(thd, all_tables, 1);
break;
@@ -4308,6 +4810,7 @@ create_sp_error:
case SQLCOM_DROP_TRIGGER:
{
/* Conditionally writes to binlog. */
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL)
res= mysql_create_or_drop_trigger(thd, all_tables, 0);
break;
}
@@ -4358,11 +4861,13 @@ create_sp_error:
my_ok(thd);
break;
case SQLCOM_INSTALL_PLUGIN:
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL)
if (! (res= mysql_install_plugin(thd, &thd->lex->comment,
&thd->lex->ident)))
my_ok(thd);
break;
case SQLCOM_UNINSTALL_PLUGIN:
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL)
if (! (res= mysql_uninstall_plugin(thd, &thd->lex->comment,
&thd->lex->ident)))
my_ok(thd);
@@ -4519,6 +5024,10 @@ finish:
/* Free tables */
thd_proc_info(thd, "closing tables");
close_thread_tables(thd);
+#ifdef WITH_WSREP
+ WSREP_TO_ISOLATION_END
+ thd->wsrep_consistency_check= FALSE;
+#endif /* WITH_WSREP */
thd_proc_info(thd, 0);
#ifndef DBUG_OFF
@@ -5429,6 +5938,21 @@ void THD::reset_for_next_command(bool calculate_userstat)
thd->auto_inc_intervals_in_cur_stmt_for_binlog.empty();
thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0;
+#ifdef WITH_WSREP
+ if (WSREP(thd)) {
+ if (wsrep_auto_increment_control)
+ {
+ if (thd->variables.auto_increment_offset !=
+ global_system_variables.auto_increment_offset)
+ thd->variables.auto_increment_offset=
+ global_system_variables.auto_increment_offset;
+ if (thd->variables.auto_increment_increment !=
+ global_system_variables.auto_increment_increment)
+ thd->variables.auto_increment_increment=
+ global_system_variables.auto_increment_increment;
+ }
+ }
+#endif /* WITH_WSREP */
thd->query_start_used= 0;
thd->query_start_sec_part_used= 0;
thd->is_fatal_error= thd->time_zone_used= 0;
@@ -7381,6 +7905,545 @@ LEX_USER *create_definer(THD *thd, LEX_STRING *user_name, LEX_STRING *host_name)
return definer;
}
+#ifdef WITH_WSREP
+static enum wsrep_status wsrep_apply_sql(
+ THD *thd, const char *sql, size_t sql_len, time_t timeval, uint32 randseed)
+{
+ int error;
+ enum wsrep_status ret_code= WSREP_OK;
+
+ DBUG_ENTER("wsrep_bf_execute_cb");
+ thd->wsrep_exec_mode= REPL_RECV;
+ thd->net.vio= 0;
+ thd->start_time= timeval;
+ thd->wsrep_rand= randseed;
+
+ thd->variables.option_bits |= OPTION_NOT_AUTOCOMMIT;
+
+ DBUG_PRINT("wsrep", ("SQL: %s", sql));
+
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ thd->wsrep_query_state= QUERY_EXEC;
+ /* preserve replaying mode */
+ if (thd->wsrep_conflict_state!= REPLAYING)
+ thd->wsrep_conflict_state= NO_CONFLICT;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+
+ if ((error= dispatch_command(COM_QUERY, thd, (char*)sql, sql_len))) {
+ WSREP_WARN("BF SQL apply failed: %d, %lld",
+ thd->wsrep_conflict_state, (long long)thd->wsrep_trx_seqno);
+ DBUG_RETURN(WSREP_FATAL);
+ }
+
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ if (thd->wsrep_conflict_state!= NO_CONFLICT &&
+ thd->wsrep_conflict_state!= REPLAYING) {
+ ret_code= WSREP_FATAL;
+ WSREP_DEBUG("BF thd ending, with: %d, %lld",
+ thd->wsrep_conflict_state, (long long)thd->wsrep_trx_seqno);
+ }
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+
+ assert(thd->wsrep_exec_mode== REPL_RECV);
+ DBUG_RETURN(ret_code);
+}
+
+void wsrep_write_rbr_buf(
+ THD *thd, const void* rbr_buf, size_t buf_len)
+{
+ char filename[PATH_MAX]= {0};
+ int len= snprintf(filename, PATH_MAX, "%s/GRA_%ld_%lld.log",
+ wsrep_data_home_dir, thd->thread_id,
+ (long long)thd->wsrep_trx_seqno);
+ if (len >= PATH_MAX)
+ {
+ WSREP_ERROR("RBR dump path too long: %d, skipping dump.", len);
+ return;
+ }
+
+ FILE *of= fopen(filename, "wb");
+ if (of)
+ {
+ fwrite (rbr_buf, buf_len, 1, of);
+ fclose(of);
+ }
+ else
+ {
+ WSREP_ERROR("Failed to open file '%s': %d (%s)",
+ filename, errno, strerror(errno));
+ }
+}
+
+static inline wsrep_status_t wsrep_apply_rbr(
+ THD *thd, const uchar *rbr_buf, size_t buf_len)
+{
+ char *buf= (char *)rbr_buf;
+ int rcode= 0;
+ int event= 1;
+
+ DBUG_ENTER("wsrep_apply_rbr");
+
+ if (thd->killed == KILL_CONNECTION)
+ {
+ WSREP_INFO("applier has been aborted, skipping apply_rbr: %lld",
+ (long long) thd->wsrep_trx_seqno);
+ DBUG_RETURN(WSREP_FATAL);
+ }
+
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ thd->wsrep_query_state= QUERY_EXEC;
+ if (thd->wsrep_conflict_state!= REPLAYING)
+ thd->wsrep_conflict_state= NO_CONFLICT;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+
+ if (!buf_len) WSREP_DEBUG("empty rbr buffer to apply: %lld",
+ (long long) thd->wsrep_trx_seqno);
+
+ if ((rcode= trans_begin(thd)))
+ WSREP_WARN("begin for rbr apply failed: %lld, code: %d",
+ (long long) thd->wsrep_trx_seqno, rcode);
+
+ while(buf_len)
+ {
+ int exec_res;
+ int error = 0;
+ Log_event* ev= wsrep_read_log_event(&buf, &buf_len, wsrep_format_desc);
+
+ switch (ev->get_type_code()) {
+ case WRITE_ROWS_EVENT:
+ case UPDATE_ROWS_EVENT:
+ case DELETE_ROWS_EVENT:
+ DBUG_ASSERT(buf_len != 0 ||
+ ((Rows_log_event *) ev)->get_flags(Rows_log_event::STMT_END_F));
+ break;
+ default:
+ break;
+ }
+
+ thd->server_id = ev->server_id; // use the original server id for logging
+ thd->set_time(); // time the query
+ wsrep_xid_init(&thd->transaction.xid_state.xid,
+ wsrep_cluster_uuid(),
+ thd->wsrep_trx_seqno);
+ thd->lex->current_select= 0;
+ if (!ev->when)
+ ev->when = time(NULL);
+ ev->thd = thd;
+ exec_res = ev->apply_event(thd->wsrep_rli);
+ DBUG_PRINT("info", ("exec_event result: %d", exec_res));
+
+ if (exec_res)
+ {
+ WSREP_WARN("RBR event %d %s apply warning: %d, %lld",
+ event, ev->get_type_str(), exec_res, (long long) thd->wsrep_trx_seqno);
+ rcode= exec_res;
+ /* stop processing for the first error */
+ delete ev;
+ goto error;
+ }
+ event++;
+
+ if (thd->wsrep_conflict_state!= NO_CONFLICT &&
+ thd->wsrep_conflict_state!= REPLAYING)
+ WSREP_WARN("conflict state after RBR event applying: %d, %lld",
+ thd->wsrep_query_state, (long long)thd->wsrep_trx_seqno);
+
+ if (thd->wsrep_conflict_state == MUST_ABORT) {
+ WSREP_WARN("RBR event apply failed, rolling back: %lld",
+ (long long) thd->wsrep_trx_seqno);
+ trans_rollback(thd);
+ thd->locked_tables_list.unlock_locked_tables(thd);
+ /* Release transactional metadata locks. */
+ thd->mdl_context.release_transactional_locks();
+ thd->wsrep_conflict_state= NO_CONFLICT;
+ DBUG_RETURN(WSREP_FATAL);
+ }
+
+ if (ev->get_type_code() != TABLE_MAP_EVENT &&
+ ((Rows_log_event *) ev)->get_flags(Rows_log_event::STMT_END_F))
+ {
+ // TODO: combine with commit on higher level common for the query ws
+
+ thd->wsrep_rli->cleanup_context(thd, 0);
+
+ if (error == 0)
+ {
+ thd->clear_error();
+ }
+ else
+ WSREP_ERROR("Error in %s event: commit of row events failed: %lld",
+ ev->get_type_str(), (long long)thd->wsrep_trx_seqno);
+ }
+ delete ev;
+ }
+
+ error:
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ thd->wsrep_query_state= QUERY_IDLE;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+
+ assert(thd->wsrep_exec_mode== REPL_RECV);
+
+ if (thd->killed == KILL_CONNECTION)
+ WSREP_INFO("applier aborted: %lld", (long long)thd->wsrep_trx_seqno);
+
+ if (rcode) DBUG_RETURN(WSREP_FATAL);
+ DBUG_RETURN(WSREP_OK);
+}
+
+wsrep_status_t wsrep_apply_cb(void* const ctx,
+ const void* const buf, size_t const buf_len,
+ wsrep_seqno_t const global_seqno)
+{
+ THD* const thd((THD*)ctx);
+
+ thd->wsrep_trx_seqno= global_seqno;
+
+#ifdef WSREP_PROC_INFO
+ snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
+ "applying write set %lld: %p, %zu",
+ (long long)thd->wsrep_trx_seqno, buf, buf_len);
+ thd_proc_info(thd, thd->wsrep_info);
+#else
+ thd_proc_info(thd, "applying write set");
+#endif /* WSREP_PROC_INFO */
+
+ wsrep_status_t const rcode(wsrep_apply_rbr(thd, (const uchar*)buf, buf_len));
+
+#ifdef WSREP_PROC_INFO
+ snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
+ "applied write set %lld", (long long)thd->wsrep_trx_seqno);
+ thd_proc_info(thd, thd->wsrep_info);
+#else
+ thd_proc_info(thd, "applied write set");
+#endif /* WSREP_PROC_INFO */
+
+ if (WSREP_OK != rcode) wsrep_write_rbr_buf(thd, buf, buf_len);
+
+ return rcode;
+}
+
+#if DELETE // this does not work in 5.5
+/* a common wrapper for end_trans() function - to put all necessary stuff */
+static inline wsrep_status_t
+wsrep_end_trans (THD* const thd, enum enum_mysql_completiontype const end)
+{
+ if (0 == end_trans(thd, end))
+ {
+ return WSREP_OK;
+ }
+ else
+ {
+ return WSREP_FATAL;
+ }
+}
+#endif
+
+wsrep_status_t wsrep_commit(THD* const thd, wsrep_seqno_t const global_seqno)
+{
+#ifdef WSREP_PROC_INFO
+ snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
+ "committing %lld", (long long)thd->wsrep_trx_seqno);
+ thd_proc_info(thd, thd->wsrep_info);
+#else
+ thd_proc_info(thd, "committing");
+#endif /* WSREP_PROC_INFO */
+
+ wsrep_status_t const rcode(wsrep_apply_sql(thd, "COMMIT", 6, 0, 0));
+// wsrep_status_t const rcode(wsrep_end_trans (thd, COMMIT));
+
+#ifdef WSREP_PROC_INFO
+ snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
+ "committed %lld", (long long)thd->wsrep_trx_seqno);
+ thd_proc_info(thd, thd->wsrep_info);
+#else
+ thd_proc_info(thd, "committed");
+#endif /* WSREP_PROC_INFO */
+
+ if (WSREP_OK == rcode)
+ {
+ // TODO: mark snapshot with global_seqno.
+ }
+
+ return rcode;
+}
+
+wsrep_status_t wsrep_rollback(THD* const thd, wsrep_seqno_t const global_seqno)
+{
+#ifdef WSREP_PROC_INFO
+ snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
+ "rolling back %lld", (long long)thd->wsrep_trx_seqno);
+ thd_proc_info(thd, thd->wsrep_info);
+#else
+ thd_proc_info(thd, "rolling back");
+#endif /* WSREP_PROC_INFO */
+
+ wsrep_status_t const rcode(wsrep_apply_sql(thd, "ROLLBACK", 8, 0, 0));
+// wsrep_status_t const rcode(wsrep_end_trans (thd, ROLLBACK));
+
+#ifdef WSREP_PROC_INFO
+ snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
+ "rolled back %lld", (long long)thd->wsrep_trx_seqno);
+ thd_proc_info(thd, thd->wsrep_info);
+#else
+ thd_proc_info(thd, "rolled back");
+#endif /* WSREP_PROC_INFO */
+
+ return rcode;
+}
+
+wsrep_status_t wsrep_commit_cb(void* const ctx,
+ wsrep_seqno_t const global_seqno,
+ bool const commit)
+{
+ THD* const thd((THD*)ctx);
+
+ assert(global_seqno == thd->wsrep_trx_seqno);
+
+ if (commit)
+ return wsrep_commit(thd, global_seqno);
+ else
+ return wsrep_rollback(thd, global_seqno);
+}
+
+Relay_log_info* wsrep_relay_log_init(const char* log_fname)
+{
+ Relay_log_info* rli= new Relay_log_info(false);
+
+ rli->no_storage= true;
+ if (!rli->relay_log.description_event_for_exec)
+ {
+ rli->relay_log.description_event_for_exec=
+ new Format_description_log_event(4);
+ }
+
+ rli->sql_thd= current_thd;
+ return rli;
+}
+
+void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow* shadow)
+{
+ shadow->options = thd->variables.option_bits;
+ shadow->wsrep_exec_mode = thd->wsrep_exec_mode;
+ shadow->vio = thd->net.vio;
+
+ if (opt_log_slave_updates)
+ thd->variables.option_bits|= OPTION_BIN_LOG;
+ else
+ thd->variables.option_bits&= ~(OPTION_BIN_LOG);
+
+ if (!thd->wsrep_rli) thd->wsrep_rli= wsrep_relay_log_init("wsrep_relay");
+
+ thd->wsrep_exec_mode= REPL_RECV;
+ thd->net.vio= 0;
+ thd->clear_error();
+
+ thd->variables.option_bits|= OPTION_NOT_AUTOCOMMIT;
+
+ shadow->tx_isolation = thd->variables.tx_isolation;
+ thd->variables.tx_isolation = ISO_READ_COMMITTED;
+ thd->tx_isolation = ISO_READ_COMMITTED;
+}
+
+void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow* shadow)
+{
+ thd->variables.option_bits = shadow->options;
+ thd->wsrep_exec_mode = shadow->wsrep_exec_mode;
+ thd->net.vio = shadow->vio;
+ thd->variables.tx_isolation = shadow->tx_isolation;
+}
+
+void wsrep_replication_process(THD *thd)
+{
+ int rcode;
+ DBUG_ENTER("wsrep_replication_process");
+
+ struct wsrep_thd_shadow shadow;
+ wsrep_prepare_bf_thd(thd, &shadow);
+
+ wsrep_format_desc= new Format_description_log_event(4);
+
+ rcode = wsrep->recv(wsrep, (void *)thd);
+ DBUG_PRINT("wsrep",("wsrep_repl returned: %d", rcode));
+
+ WSREP_INFO("applier thread exiting (code:%d)", rcode);
+
+ switch (rcode) {
+ case WSREP_OK:
+ case WSREP_NOT_IMPLEMENTED:
+ case WSREP_CONN_FAIL:
+ /* provider does not support slave operations / disconnected from group,
+ * just close applier thread */
+ break;
+ case WSREP_NODE_FAIL:
+ /* data inconsistency => SST is needed */
+ /* Note: we cannot just blindly restart replication here,
+ * SST might require server restart if storage engines must be
+ * initialized after SST */
+ WSREP_ERROR("node consistency compromised, aborting");
+ wsrep_kill_mysql(thd);
+ break;
+ case WSREP_WARNING:
+ case WSREP_TRX_FAIL:
+ case WSREP_TRX_MISSING:
+ /* these suggests a bug in provider code */
+ WSREP_WARN("bad return from recv() call: %d", rcode);
+ /* fall through to node shutdown */
+ case WSREP_FATAL:
+ /* Cluster connectivity is lost.
+ *
+ * If applier was killed on purpose (KILL_CONNECTION), we
+ * avoid mysql shutdown. This is because the killer will then handle
+ * shutdown processing (or replication restarting)
+ */
+ if (thd->killed != KILL_CONNECTION)
+ {
+ wsrep_kill_mysql(thd);
+ }
+ break;
+ }
+
+ if (thd->killed != KILL_CONNECTION)
+ {
+ mysql_mutex_lock(&LOCK_thread_count);
+ wsrep_close_applier(thd);
+ mysql_cond_broadcast(&COND_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
+ }
+
+ if (thd->killed != KILL_CONNECTION)
+ {
+ mysql_mutex_lock(&LOCK_thread_count);
+ wsrep_close_applier(thd);
+ mysql_cond_broadcast(&COND_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
+ }
+ wsrep_return_from_bf_mode(thd, &shadow);
+ DBUG_VOID_RETURN;
+}
+
+void wsrep_rollback_process(THD *thd)
+{
+ DBUG_ENTER("wsrep_rollback_process");
+
+ mysql_mutex_lock(&LOCK_wsrep_rollback);
+ wsrep_aborting_thd= NULL;
+
+ while (thd->killed == NOT_KILLED) {
+ thd_proc_info(thd, "wsrep aborter idle");
+ thd->mysys_var->current_mutex= &LOCK_wsrep_rollback;
+ thd->mysys_var->current_cond= &COND_wsrep_rollback;
+
+ mysql_cond_wait(&COND_wsrep_rollback,&LOCK_wsrep_rollback);
+
+ WSREP_DEBUG("WSREP rollback thread wakes for signal");
+
+ mysql_mutex_lock(&thd->mysys_var->mutex);
+ thd_proc_info(thd, "wsrep aborter active");
+ thd->mysys_var->current_mutex= 0;
+ thd->mysys_var->current_cond= 0;
+ mysql_mutex_unlock(&thd->mysys_var->mutex);
+
+ /* check for false alarms */
+ if (!wsrep_aborting_thd)
+ {
+ WSREP_DEBUG("WSREP rollback thread has empty abort queue");
+ }
+ /* process all entries in the queue */
+ while (wsrep_aborting_thd) {
+ THD *aborting;
+ wsrep_aborting_thd_t next = wsrep_aborting_thd->next;
+ aborting = wsrep_aborting_thd->aborting_thd;
+ my_free(wsrep_aborting_thd);
+ wsrep_aborting_thd= next;
+ /*
+ * must release mutex, appliers my want to add more
+ * aborting thds in our work queue, while we rollback
+ */
+ mysql_mutex_unlock(&LOCK_wsrep_rollback);
+
+ mysql_mutex_lock(&aborting->LOCK_wsrep_thd);
+ if (aborting->wsrep_conflict_state== ABORTED)
+ {
+ WSREP_DEBUG("WSREP, thd already aborted: %llu state: %d",
+ (long long)aborting->real_id,
+ aborting->wsrep_conflict_state);
+
+ mysql_mutex_unlock(&aborting->LOCK_wsrep_thd);
+ mysql_mutex_lock(&LOCK_wsrep_rollback);
+ continue;
+ }
+ aborting->wsrep_conflict_state= ABORTING;
+
+ mysql_mutex_unlock(&aborting->LOCK_wsrep_thd);
+
+ aborting->store_globals();
+
+ trans_rollback(aborting);
+ aborting->locked_tables_list.unlock_locked_tables(thd);
+ /* Release transactional metadata locks. */
+ aborting->mdl_context.release_transactional_locks();
+
+ mysql_mutex_lock(&aborting->LOCK_wsrep_thd);
+ aborting->wsrep_conflict_state= ABORTED;
+ WSREP_DEBUG("WSREP rollbacker aborted thd: %llu",
+ (long long)aborting->real_id);
+ mysql_mutex_unlock(&aborting->LOCK_wsrep_thd);
+ mysql_mutex_lock(&LOCK_wsrep_rollback);
+ }
+ }
+
+ mysql_mutex_unlock(&LOCK_wsrep_rollback);
+ sql_print_information("WSREP: rollbacker thread exiting");
+
+ DBUG_PRINT("wsrep",("wsrep rollbacker thread exiting"));
+ DBUG_VOID_RETURN;
+}
+extern "C"
+int wsrep_thd_is_brute_force(void *thd_ptr)
+{
+ if (thd_ptr) {
+ switch (((THD *)thd_ptr)->wsrep_exec_mode) {
+ case LOCAL_STATE:
+ {
+ if (((THD *)thd_ptr)->wsrep_conflict_state== REPLAYING)
+ {
+ return 1;
+ }
+ return 0;
+ }
+ case REPL_RECV: return 1;
+ case TOTAL_ORDER: return 2;
+ case LOCAL_COMMIT: return 3;
+ }
+ }
+ return 0;
+}
+extern "C"
+int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal)
+{
+ THD *victim_thd = (THD *) victim_thd_ptr;
+ THD *bf_thd = (THD *) bf_thd_ptr;
+ DBUG_ENTER("wsrep_abort_thd");
+
+ if (WSREP(bf_thd) && victim_thd)
+ {
+ WSREP_DEBUG("wsrep_abort_thd, by: %llu, victim: %llu", (bf_thd) ?
+ (long long)bf_thd->real_id : 0, (long long)victim_thd->real_id);
+ ha_wsrep_abort_transaction(bf_thd, victim_thd, signal);
+ }
+
+ DBUG_RETURN(1);
+}
+extern "C"
+int wsrep_thd_in_locking_session(void *thd_ptr)
+{
+ if (thd_ptr && ((THD *)thd_ptr)->in_lock_tables) {
+ return 1;
+ }
+ return 0;
+}
+#endif
/**
Retuns information about user or current user.
diff --git a/sql/sql_plugin.cc b/sql/sql_plugin.cc
index 39f69b2656d..6a1a9221064 100644
--- a/sql/sql_plugin.cc
+++ b/sql/sql_plugin.cc
@@ -2954,11 +2954,17 @@ void plugin_thdvar_init(THD *thd)
thd->variables.dynamic_variables_size= 0;
thd->variables.dynamic_variables_ptr= 0;
+#ifdef WITH_WSREP
+ if (!WSREP(thd) || !thd->wsrep_applier) {
+#endif
mysql_mutex_lock(&LOCK_plugin);
thd->variables.table_plugin=
my_intern_plugin_lock(NULL, global_system_variables.table_plugin);
intern_plugin_unlock(NULL, old_table_plugin);
mysql_mutex_unlock(&LOCK_plugin);
+#ifdef WITH_WSREP
+ }
+#endif
DBUG_VOID_RETURN;
}
diff --git a/sql/sql_reload.cc b/sql/sql_reload.cc
index d2f118b62c9..cb33110d9eb 100644
--- a/sql/sql_reload.cc
+++ b/sql/sql_reload.cc
@@ -224,7 +224,18 @@ bool reload_acl_and_cache(THD *thd, unsigned long options,
}
if (options & REFRESH_CHECKPOINT)
disable_checkpoints(thd);
- }
+#ifdef WITH_WSREP
+ /*
+ We need to do it second time after wsrep appliers were blocked in
+ make_global_read_lock_block_commit(thd) above since they could have
+ modified the tables too.
+ */
+ if (WSREP(thd) &&
+ close_cached_tables(thd, tables, (options & REFRESH_FAST) ?
+ FALSE : TRUE, TRUE))
+ result= 1;
+#endif /* WITH_WSREP */
+ }
else
{
if (thd && thd->locked_tables_mode)
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index eb17ef0812c..ecb47324552 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -1399,7 +1399,14 @@ int stop_slave(THD* thd, Master_info* mi, bool net_report )
ER(ER_SLAVE_WAS_NOT_RUNNING));
}
unlock_slave_threads(mi);
+#ifdef WITH_WSREP
+ if (WSREP(thd))
+ thd_proc_info(thd, "exit stop_slave()");
+ else
+ thd_proc_info(thd, 0);
+#else /* WITH_WSREP */
thd_proc_info(thd, 0);
+#endif /* WITH_WSREP */
if (slave_errno)
{
@@ -1832,7 +1839,14 @@ bool change_master(THD* thd, Master_info* mi)
err:
unlock_slave_threads(mi);
+#ifdef WITH_WSREP
+ if (WSREP(thd))
+ thd_proc_info(thd, "exit change_master()");
+ else
+ thd_proc_info(thd, 0);
+#else /* WITH_WSREP */
thd_proc_info(thd, 0);
+#endif /* WITH_WSREP */
if (ret == FALSE)
my_ok(thd);
DBUG_RETURN(ret);
diff --git a/sql/sql_show.cc b/sql/sql_show.cc
index d64c7a6df52..da48c7cdc63 100644
--- a/sql/sql_show.cc
+++ b/sql/sql_show.cc
@@ -59,6 +59,9 @@
#include "datadict.h" // dd_frm_type()
#include "keycaches.h"
+#if !defined(MYSQL_MAX_VARIABLE_VALUE_LEN)
+#define MYSQL_MAX_VARIABLE_VALUE_LEN 1024
+#endif // !defined(MYSQL_MAX_VARIABLE_VALUE_LEN)
#define STR_OR_NIL(S) ((S) ? (S) : "<nil>")
#ifdef WITH_PARTITION_STORAGE_ENGINE
@@ -8088,7 +8091,8 @@ ST_FIELD_INFO variables_fields_info[]=
{
{"VARIABLE_NAME", 64, MYSQL_TYPE_STRING, 0, 0, "Variable_name",
SKIP_OPEN_TABLE},
- {"VARIABLE_VALUE", 1024, MYSQL_TYPE_STRING, 0, 1, "Value", SKIP_OPEN_TABLE},
+ {"VARIABLE_VALUE", MYSQL_MAX_VARIABLE_VALUE_LEN, MYSQL_TYPE_STRING, 0, 1,
+ "Value", SKIP_OPEN_TABLE},
{0, 0, MYSQL_TYPE_STRING, 0, 0, 0, SKIP_OPEN_TABLE}
};
diff --git a/sql/sql_table.cc b/sql/sql_table.cc
index 01832036701..1fdd81de089 100644
--- a/sql/sql_table.cc
+++ b/sql/sql_table.cc
@@ -6158,12 +6158,18 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
error= 0;
break;
}
- if (error == HA_ERR_WRONG_COMMAND)
+#ifdef WITH_WSREP
+ bool do_log_write(true);
+#endif /* WITH_WSREP */
+ if (error == HA_ERR_WRONG_COMMAND)
{
error= 0;
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
ER_ILLEGAL_HA, ER(ER_ILLEGAL_HA),
table->alias.c_ptr());
+#ifdef WITH_WSREP
+ WSREP_DEBUG("ignoring DDL failure: %d %s", error, thd->query());
+#endif /* WITH_WSREP */
}
if (!error && (new_name != table_name || new_db != db))
@@ -6215,6 +6221,9 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
ER_ILLEGAL_HA, ER(ER_ILLEGAL_HA),
table->alias.c_ptr());
+#ifdef WITH_WSREP
+ WSREP_DEBUG("ignoring DDL failure: %d %s", error, thd->query());
+#endif /* WITH_WSREP */
}
if (!error)
diff --git a/sql/sql_trigger.cc b/sql/sql_trigger.cc
index 1ac1d7bbb5e..149c2431324 100644
--- a/sql/sql_trigger.cc
+++ b/sql/sql_trigger.cc
@@ -2452,3 +2452,55 @@ bool load_table_name_for_trigger(THD *thd,
DBUG_RETURN(FALSE);
}
+#ifdef WITH_WSREP
+int wsrep_create_trigger_query(THD *thd, uchar** buf, uint* buf_len)
+{
+ LEX *lex= thd->lex;
+ String stmt_query;
+
+ LEX_STRING definer_user;
+ LEX_STRING definer_host;
+
+ if (!lex->definer)
+ {
+ if (!thd->slave_thread)
+ {
+ if (!(lex->definer= create_default_definer(thd)))
+ return 1;
+ }
+ }
+
+ if (lex->definer)
+ {
+ /* SUID trigger. */
+
+ definer_user= lex->definer->user;
+ definer_host= lex->definer->host;
+ }
+ else
+ {
+ /* non-SUID trigger. */
+
+ definer_user.str= 0;
+ definer_user.length= 0;
+
+ definer_host.str= 0;
+ definer_host.length= 0;
+ }
+
+ stmt_query.append(STRING_WITH_LEN("CREATE "));
+
+ append_definer(thd, &stmt_query, &definer_user, &definer_host);
+
+ LEX_STRING stmt_definition;
+ stmt_definition.str= (char*) thd->lex->stmt_definition_begin;
+ stmt_definition.length= thd->lex->stmt_definition_end
+ - thd->lex->stmt_definition_begin;
+ trim_whitespace(thd->charset(), & stmt_definition);
+
+ stmt_query.append(stmt_definition.str, stmt_definition.length);
+
+ return wsrep_to_buf_helper(thd, stmt_query.c_ptr(), stmt_query.length(),
+ buf, buf_len);
+}
+#endif /* WITH_WSREP */
diff --git a/sql/sql_truncate.cc b/sql/sql_truncate.cc
index 67ed608f114..a66b2377fcb 100644
--- a/sql/sql_truncate.cc
+++ b/sql/sql_truncate.cc
@@ -24,6 +24,9 @@
#include "sql_acl.h" // DROP_ACL
#include "sql_parse.h" // check_one_table_access()
#include "sql_truncate.h"
+#ifdef WITH_WSREP
+#include "wsrep_mysqld.h"
+#endif /* WITH_WSREP */
/**
@@ -515,9 +518,14 @@ bool Truncate_statement::execute(THD *thd)
if (check_one_table_access(thd, DROP_ACL, first_table))
DBUG_RETURN(res);
+#ifdef WITH_WSREP
+ if (WSREP(thd) && wsrep_to_isolation_begin(thd,
+ first_table->db,
+ first_table->table_name))
+ DBUG_RETURN(TRUE);
+#endif /* WITH_WSREP */
if (! (res= truncate_table(thd, first_table)))
my_ok(thd);
-
DBUG_RETURN(res);
}
diff --git a/sql/sql_update.cc b/sql/sql_update.cc
index 5fa30c91417..ddb95f09ce2 100644
--- a/sql/sql_update.cc
+++ b/sql/sql_update.cc
@@ -902,7 +902,11 @@ int mysql_update(THD *thd,
*/
if ((error < 0) || thd->transaction.stmt.modified_non_trans_table)
{
+#ifdef WITH_WSREP
+ if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
+#else
if (mysql_bin_log.is_open())
+#endif
{
int errcode= 0;
if (error < 0)
@@ -2045,7 +2049,11 @@ void multi_update::abort_result_set()
The query has to binlog because there's a modified non-transactional table
either from the query's list or via a stored routine: bug#13270,23333
*/
+#ifdef WITH_WSREP
+ if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
+#else
if (mysql_bin_log.is_open())
+#endif
{
/*
THD::killed status might not have been set ON at time of an error
@@ -2304,7 +2312,11 @@ bool multi_update::send_eof()
if (local_error == 0 || thd->transaction.stmt.modified_non_trans_table)
{
+#ifdef WITH_WSREP
+ if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
+#else
if (mysql_bin_log.is_open())
+#endif
{
int errcode= 0;
if (local_error == 0)
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index 72e9525db72..dd11f51b212 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -3437,6 +3437,206 @@ static Sys_var_tz Sys_time_zone(
"time_zone", "time_zone",
SESSION_VAR(time_zone), NO_CMD_LINE,
DEFAULT(&default_tz), NO_MUTEX_GUARD, IN_BINLOG);
+#ifdef WITH_WSREP
+#include "wsrep_mysqld.h"
+
+static Sys_var_charptr Sys_wsrep_provider(
+ "wsrep_provider", "Path to replication provider library",
+ GLOBAL_VAR(wsrep_provider), CMD_LINE(REQUIRED_ARG, OPT_WSREP_PROVIDER),
+ IN_FS_CHARSET, DEFAULT(wsrep_provider),
+ // IN_FS_CHARSET, DEFAULT(wsrep_provider_default),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG,
+ ON_CHECK(wsrep_provider_check), ON_UPDATE(wsrep_provider_update));
+
+static Sys_var_charptr Sys_wsrep_provider_options(
+ "wsrep_provider_options", "provider specific options",
+ GLOBAL_VAR(wsrep_provider_options),
+ CMD_LINE(REQUIRED_ARG, OPT_WSREP_PROVIDER_OPTIONS),
+ IN_FS_CHARSET, DEFAULT(wsrep_provider_options),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG,
+ ON_CHECK(wsrep_provider_options_check),
+ ON_UPDATE(wsrep_provider_options_update));
+
+static Sys_var_charptr Sys_wsrep_data_home_dir(
+ "wsrep_data_home_dir", "home directory for wsrep provider",
+ READ_ONLY GLOBAL_VAR(wsrep_data_home_dir), CMD_LINE(REQUIRED_ARG),
+ IN_FS_CHARSET, DEFAULT(""),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG);
+
+static Sys_var_charptr Sys_wsrep_cluster_name(
+ "wsrep_cluster_name", "Name for the cluster",
+ GLOBAL_VAR(wsrep_cluster_name), CMD_LINE(REQUIRED_ARG),
+ IN_FS_CHARSET, DEFAULT(wsrep_cluster_name),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG,
+ ON_CHECK(wsrep_cluster_name_check),
+ ON_UPDATE(wsrep_cluster_name_update));
+
+static Sys_var_charptr Sys_wsrep_cluster_address (
+ "wsrep_cluster_address", "Address to initially connect to cluster",
+ GLOBAL_VAR(wsrep_cluster_address),
+ CMD_LINE(REQUIRED_ARG, OPT_WSREP_CLUSTER_ADDRESS),
+ IN_FS_CHARSET, DEFAULT(wsrep_cluster_address),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG,
+ ON_CHECK(wsrep_cluster_address_check),
+ ON_UPDATE(wsrep_cluster_address_update));
+
+static Sys_var_charptr Sys_wsrep_node_name (
+ "wsrep_node_name", "Node name",
+ GLOBAL_VAR(wsrep_node_name), CMD_LINE(REQUIRED_ARG),
+ IN_FS_CHARSET, DEFAULT(glob_hostname),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG);
+
+static Sys_var_charptr Sys_wsrep_node_address (
+ "wsrep_node_address", "Node address",
+ GLOBAL_VAR(wsrep_node_address), CMD_LINE(REQUIRED_ARG),
+ IN_FS_CHARSET, DEFAULT(wsrep_node_address),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG,
+ ON_CHECK(wsrep_node_address_check),
+ ON_UPDATE(wsrep_node_address_update));
+
+static Sys_var_charptr Sys_wsrep_node_incoming_address(
+ "wsrep_node_incoming_address", "Client connection address",
+ GLOBAL_VAR(wsrep_node_incoming_address),CMD_LINE(REQUIRED_ARG),
+ IN_FS_CHARSET, DEFAULT(wsrep_node_incoming_address),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG,
+ ON_CHECK(wsrep_node_name_check),
+ ON_UPDATE(wsrep_node_name_update));
+
+static Sys_var_ulong Sys_wsrep_slave_threads(
+ "wsrep_slave_threads", "Number of slave appliers to launch",
+ GLOBAL_VAR(wsrep_slave_threads), CMD_LINE(REQUIRED_ARG),
+ VALID_RANGE(1, 512), DEFAULT(1), BLOCK_SIZE(1));
+
+static Sys_var_charptr Sys_wsrep_dbug_option(
+ "wsrep_dbug_option", "DBUG options to provider library",
+ GLOBAL_VAR(wsrep_dbug_option),CMD_LINE(REQUIRED_ARG),
+ IN_FS_CHARSET, DEFAULT(""),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG);
+
+static Sys_var_mybool Sys_wsrep_debug(
+ "wsrep_debug", "To enable debug level logging",
+ GLOBAL_VAR(wsrep_debug), CMD_LINE(OPT_ARG), DEFAULT(FALSE));
+
+static Sys_var_mybool Sys_wsrep_convert_LOCK_to_trx(
+ "wsrep_convert_LOCK_to_trx", "To convert locking sessions "
+ "into transactions",
+ GLOBAL_VAR(wsrep_convert_LOCK_to_trx),
+ CMD_LINE(OPT_ARG), DEFAULT(FALSE));
+
+static Sys_var_ulong Sys_wsrep_retry_autocommit(
+ "wsrep_retry_autocommit", "Max number of times to retry "
+ "a failed autocommit statement",
+ SESSION_VAR(wsrep_retry_autocommit), CMD_LINE(REQUIRED_ARG),
+ VALID_RANGE(0, 10000), DEFAULT(1), BLOCK_SIZE(1));
+
+static Sys_var_mybool Sys_wsrep_auto_increment_control(
+ "wsrep_auto_increment_control", "To automatically control the "
+ "assignment of autoincrement variables",
+ GLOBAL_VAR(wsrep_auto_increment_control),
+ CMD_LINE(OPT_ARG), DEFAULT(TRUE));
+
+static Sys_var_mybool Sys_wsrep_drupal_282555_workaround(
+ "wsrep_drupal_282555_workaround", "To use a workaround for"
+ "bad autoincrement value",
+ GLOBAL_VAR(wsrep_drupal_282555_workaround),
+ CMD_LINE(OPT_ARG), DEFAULT(FALSE));
+
+static Sys_var_charptr sys_wsrep_sst_method(
+ "wsrep_sst_method", "Snapshot transfer method",
+ GLOBAL_VAR(wsrep_sst_method),CMD_LINE(REQUIRED_ARG),
+ IN_FS_CHARSET, DEFAULT(wsrep_sst_method), NO_MUTEX_GUARD, NOT_IN_BINLOG,
+ ON_CHECK(wsrep_sst_method_check),
+ ON_UPDATE(wsrep_sst_method_update));
+
+static Sys_var_charptr Sys_wsrep_sst_receive_address(
+ "wsrep_sst_receive_address", "Address where node is waiting for "
+ "SST contact",
+ GLOBAL_VAR(wsrep_sst_receive_address),CMD_LINE(REQUIRED_ARG),
+ IN_FS_CHARSET, DEFAULT(wsrep_sst_receive_address), NO_MUTEX_GUARD,
+ NOT_IN_BINLOG,
+ ON_CHECK(wsrep_sst_receive_address_check),
+ ON_UPDATE(wsrep_sst_receive_address_update));
+
+static Sys_var_charptr Sys_wsrep_sst_auth(
+ "wsrep_sst_auth", "Authentication for SST connection",
+ GLOBAL_VAR(wsrep_sst_auth), CMD_LINE(REQUIRED_ARG, OPT_WSREP_SST_AUTH),
+ IN_FS_CHARSET, DEFAULT(wsrep_sst_auth), NO_MUTEX_GUARD,
+ NOT_IN_BINLOG,
+ ON_CHECK(wsrep_sst_auth_check),
+ ON_UPDATE(wsrep_sst_auth_update));
+
+static Sys_var_charptr Sys_wsrep_sst_donor(
+ "wsrep_sst_donor", "preferred donor node for the SST",
+ GLOBAL_VAR(wsrep_sst_donor),CMD_LINE(REQUIRED_ARG),
+ IN_FS_CHARSET, DEFAULT(""), NO_MUTEX_GUARD, NOT_IN_BINLOG,
+ ON_CHECK(wsrep_sst_donor_check),
+ ON_UPDATE(wsrep_sst_donor_update));
+
+static Sys_var_mybool Sys_wsrep_on (
+ "wsrep_on", "To enable wsrep replication ",
+ SESSION_VAR(wsrep_on),
+ CMD_LINE(OPT_ARG), DEFAULT(TRUE),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
+ ON_UPDATE(wsrep_on_update));
+
+static Sys_var_charptr Sys_wsrep_start_position (
+ "wsrep_start_position", "global transaction position to start from ",
+ GLOBAL_VAR(wsrep_start_position),
+ CMD_LINE(REQUIRED_ARG, OPT_WSREP_START_POSITION),
+ IN_FS_CHARSET, DEFAULT(wsrep_start_position),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG,
+ ON_CHECK(wsrep_start_position_check),
+ ON_UPDATE(wsrep_start_position_update));
+
+static Sys_var_ulonglong Sys_wsrep_max_ws_size (
+ "wsrep_max_ws_size", "Max write set size (bytes)",
+ GLOBAL_VAR(wsrep_max_ws_size), CMD_LINE(REQUIRED_ARG),
+ VALID_RANGE(1024, 4294967296ULL), DEFAULT(1073741824ULL), BLOCK_SIZE(1));
+
+static Sys_var_ulong Sys_wsrep_max_ws_rows (
+ "wsrep_max_ws_rows", "Max number of rows in write set",
+ GLOBAL_VAR(wsrep_max_ws_rows), CMD_LINE(REQUIRED_ARG),
+ VALID_RANGE(1, 1048576), DEFAULT(131072), BLOCK_SIZE(1));
+
+static Sys_var_charptr Sys_wsrep_notify_cmd(
+ "wsrep_notify_cmd", "",
+ GLOBAL_VAR(wsrep_notify_cmd),CMD_LINE(REQUIRED_ARG),
+ IN_FS_CHARSET, DEFAULT(""), NO_MUTEX_GUARD, NOT_IN_BINLOG);
+
+static Sys_var_mybool Sys_wsrep_certify_nonPK(
+ "wsrep_certify_nonPK", "Certify tables with no primary key",
+ GLOBAL_VAR(wsrep_certify_nonPK),
+ CMD_LINE(OPT_ARG), DEFAULT(TRUE));
+
+static Sys_var_mybool Sys_wsrep_causal_reads(
+ "wsrep_causal_reads", "Enable \"strictly synchronous\" semantics for read operations",
+ SESSION_VAR(wsrep_causal_reads),
+ CMD_LINE(OPT_ARG), DEFAULT(FALSE));
+ // ON_UPDATE(wsrep_causal_reads_update));
+
+static const char *wsrep_OSU_method_names[]= { "TOI", "RSU", NullS };
+static Sys_var_enum Sys_wsrep_OSU_method(
+ "wsrep_OSU_method", "Method for Online Schema Upgrade",
+ GLOBAL_VAR(wsrep_OSU_method_options), CMD_LINE(OPT_ARG),
+ wsrep_OSU_method_names, DEFAULT(WSREP_OSU_TOI),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
+ ON_UPDATE(0));
+
+static Sys_var_enum Sys_wsrep_forced_binlog_format(
+ "wsrep_forced_binlog_format", "binlog format to take effect over user's choice",
+ GLOBAL_VAR(wsrep_forced_binlog_format),
+ CMD_LINE(REQUIRED_ARG, OPT_BINLOG_FORMAT),
+ wsrep_binlog_format_names, DEFAULT(BINLOG_FORMAT_UNSPEC),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
+ ON_UPDATE(0));
+
+static Sys_var_mybool Sys_wsrep_recover_datadir(
+ "wsrep_recover", "Recover database state after crash and exit",
+ READ_ONLY GLOBAL_VAR(wsrep_recovery),
+ CMD_LINE(OPT_ARG, OPT_WSREP_RECOVER), DEFAULT(FALSE));
+
+
+#endif /* WITH_WSREP */
static Sys_var_ulong Sys_sp_cache_size(
"stored_program_cache",
diff --git a/sql/transaction.cc b/sql/transaction.cc
index 94a32200274..2b23b5e19f1 100644
--- a/sql/transaction.cc
+++ b/sql/transaction.cc
@@ -96,6 +96,9 @@ static bool xa_trans_force_rollback(THD *thd)
by ha_rollback()/THD::transaction::cleanup().
*/
thd->transaction.xid_state.rm_error= 0;
+#ifdef WITH_WSREP
+ wsrep_register_hton(thd, TRUE);
+#endif /* WITH_WSREP */
if (ha_rollback_trans(thd, true))
{
my_error(ER_XAER_RMERR, MYF(0));
@@ -134,6 +137,9 @@ bool trans_begin(THD *thd, uint flags)
(thd->variables.option_bits & OPTION_TABLE_LOCK))
{
thd->variables.option_bits&= ~OPTION_TABLE_LOCK;
+#ifdef WITH_WSREP
+ wsrep_register_hton(thd, TRUE);
+#endif /* WITH_WSREP */
thd->server_status&= ~SERVER_STATUS_IN_TRANS;
res= test(ha_commit_trans(thd, TRUE));
}
@@ -150,6 +156,12 @@ bool trans_begin(THD *thd, uint flags)
*/
thd->mdl_context.release_transactional_locks();
+#ifdef WITH_WSREP
+ thd->wsrep_PA_safe= true;
+ if (thd->wsrep_client_thread && wsrep_causal_wait(thd))
+ DBUG_RETURN(TRUE);
+#endif /* WITH_WSREP */
+
thd->variables.option_bits|= OPTION_BEGIN;
thd->server_status|= SERVER_STATUS_IN_TRANS;
@@ -177,6 +189,9 @@ bool trans_commit(THD *thd)
if (trans_check(thd))
DBUG_RETURN(TRUE);
+#ifdef WITH_WSREP
+ wsrep_register_hton(thd, TRUE);
+#endif /* WITH_WSREP */
thd->server_status&= ~SERVER_STATUS_IN_TRANS;
res= ha_commit_trans(thd, TRUE);
if (res)
@@ -220,6 +235,9 @@ bool trans_commit_implicit(THD *thd)
/* Safety if one did "drop table" on locked tables */
if (!thd->locked_tables_mode)
thd->variables.option_bits&= ~OPTION_TABLE_LOCK;
+#ifdef WITH_WSREP
+ wsrep_register_hton(thd, TRUE);
+#endif /* WITH_WSREP */
thd->server_status&= ~SERVER_STATUS_IN_TRANS;
res= test(ha_commit_trans(thd, TRUE));
}
@@ -251,11 +269,16 @@ bool trans_commit_implicit(THD *thd)
bool trans_rollback(THD *thd)
{
int res;
- DBUG_ENTER("trans_rollback");
-
- if (trans_check(thd))
+ DBUG_ENTER("trans_rollback");
+#ifdef WITH_WSREP
+ thd->wsrep_PA_safe= true;
+#endif /* WITH_WSREP */
+ if (trans_check(thd))
DBUG_RETURN(TRUE);
+#ifdef WITH_WSREP
+ wsrep_register_hton(thd, TRUE);
+#endif /* WITH_WSREP */
thd->server_status&= ~SERVER_STATUS_IN_TRANS;
res= ha_rollback_trans(thd, TRUE);
RUN_HOOK(transaction, after_rollback, (thd, FALSE));
@@ -296,6 +319,9 @@ bool trans_commit_stmt(THD *thd)
if (thd->transaction.stmt.ha_list)
{
+#ifdef WITH_WSREP
+ wsrep_register_hton(thd, FALSE);
+#endif /* WITH_WSREP */
res= ha_commit_trans(thd, FALSE);
if (! thd->in_active_multi_stmt_transaction())
thd->tx_isolation= (enum_tx_isolation) thd->variables.tx_isolation;
@@ -338,9 +364,19 @@ bool trans_rollback_stmt(THD *thd)
if (thd->transaction.stmt.ha_list)
{
+#ifdef WITH_WSREP
+ wsrep_register_hton(thd, FALSE);
+#endif /* WITH_WSREP */
ha_rollback_trans(thd, FALSE);
if (thd->transaction_rollback_request && !thd->in_sub_stmt)
+#ifdef WITH_WSREP
+ {
+ wsrep_register_hton(thd, TRUE);
+#endif /* WITH_WSREP */
ha_rollback_trans(thd, TRUE);
+#ifdef WITH_WSREP
+ }
+#endif /* WITH_WSREP */
if (! thd->in_active_multi_stmt_transaction())
thd->tx_isolation= (enum_tx_isolation) thd->variables.tx_isolation;
}
@@ -681,6 +717,9 @@ bool trans_xa_commit(THD *thd)
}
else if (xa_state == XA_IDLE && thd->lex->xa_opt == XA_ONE_PHASE)
{
+#ifdef WITH_WSREP
+ wsrep_register_hton(thd, TRUE);
+#endif /* WITH_WSREP */
int r= ha_commit_trans(thd, TRUE);
if ((res= test(r)))
my_error(r == 1 ? ER_XA_RBROLLBACK : ER_XAER_RMERR, MYF(0));
@@ -702,6 +741,9 @@ bool trans_xa_commit(THD *thd)
if (thd->mdl_context.acquire_lock(&mdl_request,
thd->variables.lock_wait_timeout))
{
+#ifdef WITH_WSREP
+ wsrep_register_hton(thd, TRUE);
+#endif /* WITH_WSREP */
ha_rollback_trans(thd, TRUE);
my_error(ER_XAER_RMERR, MYF(0));
}