diff options
author | Seppo Jaakola <seppo.jaakola@codership.com> | 2012-04-13 01:33:24 +0300 |
---|---|---|
committer | Seppo Jaakola <seppo.jaakola@codership.com> | 2012-04-13 01:33:24 +0300 |
commit | 2fc1ec43560b453b4694adbc1aac11f3f23b1761 (patch) | |
tree | 880c3875dae28574a90bf891ef901027723d21f6 /sql | |
parent | 51c77ec5d406843bb8c8131f0687f4f75839d045 (diff) | |
download | mariadb-git-2fc1ec43560b453b4694adbc1aac11f3f23b1761.tar.gz |
Initial push of codership-wsrep API implementation for MariaDB.
Merge of:
lp:maria/5.5, #3334: http://bazaar.launchpad.net/~maria-captains/maria/5.5/revision/3334
lp:codership-mysql/5.5, #3725: http://bazaar.launchpad.net/~codership/codership-mysql/wsrep-5.5/revision/3725
Diffstat (limited to 'sql')
36 files changed, 3359 insertions, 48 deletions
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index 22c51f3ce23..f74931388bb 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -13,6 +13,10 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +IF(WITH_WSREP) + SET(WSREP_INCLUDES ${CMAKE_SOURCE_DIR}/wsrep) +ENDIF() + INCLUDE_DIRECTORIES( ${CMAKE_SOURCE_DIR}/include ${CMAKE_SOURCE_DIR}/sql @@ -20,6 +24,7 @@ ${CMAKE_SOURCE_DIR}/regex ${ZLIB_INCLUDE_DIR} ${SSL_INCLUDE_DIRS} ${CMAKE_BINARY_DIR}/sql +${WSREP_INCLUDES} ) SET(GEN_SOURCES @@ -35,6 +40,18 @@ ADD_DEFINITIONS(-DMYSQL_SERVER -DHAVE_EVENT_SCHEDULER -DHAVE_POOL_OF_THREADS) IF(SSL_DEFINES) ADD_DEFINITIONS(${SSL_DEFINES}) ENDIF() +IF(WITH_WSREP) + SET(WSREP_SOURCES + wsrep_check_opts.cc + wsrep_hton.cc + wsrep_mysqld.cc + wsrep_notify.cc + wsrep_sst.cc + wsrep_utils.cc + wsrep_var.cc + ) + SET(WSREP_LIB wsrep) +ENDIF() SET (SQL_SOURCE ../sql-common/client.c derror.cc des_key_file.cc @@ -86,6 +103,7 @@ SET (SQL_SOURCE threadpool_common.cc ../sql-common/mysql_async.c sql_logger.cc + ${WSREP_SOURCES} ${GEN_SOURCES} ${MYSYS_LIBWRAP_SOURCE} ) @@ -105,6 +123,7 @@ DTRACE_INSTRUMENT(sql) TARGET_LINK_LIBRARIES(sql ${MYSQLD_STATIC_PLUGIN_LIBS} mysys dbug strings vio regex ${LIBWRAP} ${LIBCRYPT} ${LIBDL} + ${WSREP_LIB} ${SSL_LIBRARIES}) IF(WIN32) @@ -200,7 +219,6 @@ IF (NOT ${CMAKE_CURRENT_SOURCE_DIR} STREQUAL ${CMAKE_CURRENT_BINARY_DIR}) ENDIF() ENDIF() - INCLUDE(${CMAKE_SOURCE_DIR}/cmake/bison.cmake) RUN_BISON( ${CMAKE_CURRENT_SOURCE_DIR}/sql_yacc.yy diff --git a/sql/events.cc b/sql/events.cc index 8b4bab9e3a6..208914a4e6d 100644 --- a/sql/events.cc +++ b/sql/events.cc @@ -1145,7 +1145,19 @@ end: close_mysql_tables(thd); DBUG_RETURN(ret); } +#ifdef WITH_WSREP +int wsrep_create_event_query(THD *thd, uchar** buf, uint* buf_len) +{ + String log_query; + if (create_query_string(thd, &log_query)) + { + WSREP_WARN("events create string failed: %s", thd->query()); + return 1; + } + return wsrep_to_buf_helper(thd, log_query.ptr(), log_query.length(), buf, buf_len); +} +#endif /* WITH_WSREP */ /** @} (End of group Event_Scheduler) */ diff --git a/sql/handler.cc b/sql/handler.cc index 0310180759c..c04759b196f 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -52,6 +52,9 @@ #include "../storage/maria/ha_maria.h" #endif +#ifdef WITH_WSREP +#include "wsrep_mysqld.h" +#endif /* While we have legacy_db_type, we have this array to check for dups and to find handlerton from legacy_db_type. @@ -1192,7 +1195,11 @@ int ha_commit_trans(THD *thd, bool all) { /* Free resources and perform other cleanup even for 'empty' transactions. */ if (is_real_trans) - thd->transaction.cleanup(); +#ifdef WITH_WSREP + thd->transaction.cleanup(thd); +#else + thd->transaction.cleanup(); +#endif /* WITH_WSREP */ DBUG_RETURN(0); } @@ -1220,7 +1227,12 @@ int ha_commit_trans(THD *thd, bool all) mdl_request.init(MDL_key::COMMIT, "", "", MDL_INTENTION_EXCLUSIVE, MDL_EXPLICIT); +#ifdef WITH_WSREP + if (!WSREP(thd) && + thd->mdl_context.acquire_lock(&mdl_request, +#else if (thd->mdl_context.acquire_lock(&mdl_request, +#endif /* WITH_WSREP */ thd->variables.lock_wait_timeout)) { ha_rollback_trans(thd, all); @@ -1267,6 +1279,19 @@ int ha_commit_trans(THD *thd, bool all) err= ht->prepare(ht, thd, all); status_var_increment(thd->status_var.ha_prepare_count); if (err) +#ifdef WITH_WSREP + if (WSREP(thd) && ht->db_type== DB_TYPE_WSREP) + { + error= 1; + /* avoid sending error, if we need to replay */ + if (thd->wsrep_conflict_state!= MUST_REPLAY) + { + my_error(ER_LOCK_DEADLOCK, MYF(0), err); + } + } + else + /* not wsrep hton, bail to native mysql behavior */ +#endif my_error(ER_ERROR_DURING_COMMIT, MYF(0), err); if (err) @@ -1277,6 +1302,13 @@ int ha_commit_trans(THD *thd, bool all) } DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_SUICIDE();); +#ifdef WITH_WSREP + if (!error && wsrep_is_wsrep_xid(&thd->transaction.xid_state.xid)) + { + // xid was rewritten by wsrep + xid= wsrep_xid_seqno(&thd->transaction.xid_state.xid); + } +#endif // WITH_WSREP if (!is_real_trans) { error= commit_one_phase_2(thd, all, trans, is_real_trans); @@ -1363,6 +1395,18 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans) int error= 0; Ha_trx_info *ha_info= trans->ha_list, *ha_info_next; DBUG_ENTER("commit_one_phase_2"); +#ifdef WITH_WSREP +#ifdef WSREP_PROC_INFO + char info[64]= { 0, }; + snprintf (info, sizeof(info) - 1, "ha_commit_one_phase(%lld)", + (long long)thd->wsrep_trx_seqno); +#else + const char info[]="ha_commit_one_phase()"; +#endif /* WSREP_PROC_INFO */ + char* tmp_info= NULL; + if (WSREP(thd)) tmp_info= (char *)thd_proc_info(thd, info); +#endif /* WITH_WSREP */ + if (ha_info) { for (; ha_info; ha_info= ha_info_next) @@ -1391,7 +1435,14 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans) } /* Free resources and perform other cleanup even for 'empty' transactions. */ if (is_real_trans) +#ifdef WITH_WSREP + thd->transaction.cleanup(thd); +#else thd->transaction.cleanup(); +#endif /* WITH_WSREP */ +#ifdef WITH_WSREP + if (WSREP(thd)) thd_proc_info(thd, tmp_info); +#endif /* WITH_WSREP */ DBUG_RETURN(error); } @@ -1466,7 +1517,11 @@ int ha_rollback_trans(THD *thd, bool all) } /* Always cleanup. Even if nht==0. There may be savepoints. */ if (is_real_trans) +#ifdef WITH_WSREP + thd->transaction.cleanup(thd); +#else thd->transaction.cleanup(); +#endif /* WITH_WSREP */ if (all) thd->transaction_rollback_request= FALSE; @@ -1631,7 +1686,13 @@ static my_bool xarecover_handlerton(THD *unused, plugin_ref plugin, got, hton_name(hton)->str); for (int i=0; i < got; i ++) { +#ifdef WITH_WSREP + my_xid x=(wsrep_is_wsrep_xid(&info->list[i]) ? + wsrep_xid_seqno(&info->list[i]) : + info->list[i].get_my_xid()); +#else my_xid x=info->list[i].get_my_xid(); +#endif /* WITH_WSREP */ if (!x) // not "mine" - that is generated by external TM { #ifndef DBUG_OFF @@ -2635,7 +2696,12 @@ int handler::update_auto_increment() variables->auto_increment_increment); auto_inc_intervals_count++; /* Row-based replication does not need to store intervals in binlog */ +#ifdef WITH_WSREP + if (((WSREP(thd) && wsrep_emulate_bin_log) || mysql_bin_log.is_open()) && + !thd->is_current_stmt_binlog_format_row()) +#else if (mysql_bin_log.is_open() && !thd->is_current_stmt_binlog_format_row()) +#endif /* WITH_WSREP */ thd->auto_inc_intervals_in_cur_stmt_for_binlog.append(auto_inc_interval_for_cur_row.minimum(), auto_inc_interval_for_cur_row.values(), variables->auto_increment_increment); @@ -4821,7 +4887,11 @@ static bool check_table_binlog_row_based(THD *thd, TABLE *table) return (thd->is_current_stmt_binlog_format_row() && table->s->cached_row_logging_check && (thd->variables.option_bits & OPTION_BIN_LOG) && +#ifdef WITH_WSREP + ((WSREP(thd) && wsrep_emulate_bin_log) || mysql_bin_log.is_open())); +#else mysql_bin_log.is_open()); +#endif } @@ -5142,6 +5212,41 @@ void signal_log_not_needed(struct handlerton, char *log_file) } +#ifdef WITH_WSREP +/** + @details + This function makes the storage engine to force the victim transaction + to abort. Currently, only innodb has this functionality, but any SE + implementing the wsrep API should provide this service to support + multi-master operation. + + @param bf_thd brute force THD asking for the abort + @param victim_thd victim THD to be aborted + + @return + always 0 +*/ + +int ha_wsrep_abort_transaction(THD *bf_thd, THD *victim_thd, my_bool signal) +{ + DBUG_ENTER("ha_wsrep_abort_transaction"); + if (!WSREP(bf_thd)) { + DBUG_RETURN(0); + } + + handlerton *hton= installed_htons[DB_TYPE_INNODB]; + if (hton && hton->wsrep_abort_transaction) + { + hton->wsrep_abort_transaction(hton, bf_thd, victim_thd, signal); + } + else + { + WSREP_WARN("cannot abort InnoDB transaction"); + } + + DBUG_RETURN(0); +} +#endif /* WITH_WSREP */ #ifdef TRANS_LOG_MGM_EXAMPLE_CODE /* Example of transaction log management functions based on assumption that logs diff --git a/sql/handler.h b/sql/handler.h index d56e3242ddd..4ac7202587e 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -355,6 +355,7 @@ enum legacy_db_type DB_TYPE_MARIA, /** Performance schema engine. */ DB_TYPE_PERFORMANCE_SCHEMA, + DB_TYPE_WSREP, DB_TYPE_FIRST_DYNAMIC=42, DB_TYPE_DEFAULT=127 // Must be last }; @@ -1042,6 +1043,10 @@ struct handlerton const char *wild, bool dir, List<LEX_STRING> *files); int (*table_exists_in_engine)(handlerton *hton, THD* thd, const char *db, const char *name); + int (*wsrep_abort_transaction)(handlerton *hton, THD *bf_thd, + THD *victim_thd, my_bool signal); + int (*wsrep_set_checkpoint)(handlerton *hton, const XID* xid); + int (*wsrep_get_checkpoint)(handlerton *hton, XID* xid); uint32 license; /* Flag for Engine License */ /* Optional clauses in the CREATE/ALTER TABLE @@ -2885,6 +2890,9 @@ bool key_uses_partial_cols(TABLE *table, uint keyno); extern const char *ha_row_type[]; extern MYSQL_PLUGIN_IMPORT const char *tx_isolation_names[]; extern MYSQL_PLUGIN_IMPORT const char *binlog_format_names[]; +#ifdef WITH_WSREP +extern MYSQL_PLUGIN_IMPORT const char *wsrep_binlog_format_names[]; +#endif /* WITH_WSREP */ extern TYPELIB tx_isolation_typelib; extern const char *myisam_stats_method_names[]; extern ulong total_ha, total_ha_2pc; @@ -2980,6 +2988,9 @@ int ha_enable_transaction(THD *thd, bool on); int ha_rollback_to_savepoint(THD *thd, SAVEPOINT *sv); int ha_savepoint(THD *thd, SAVEPOINT *sv); int ha_release_savepoint(THD *thd, SAVEPOINT *sv); +#ifdef WITH_WSREP +int ha_wsrep_abort_transaction(THD *bf_thd, THD *victim_thd, my_bool signal); +#endif /* WITH_WSREP */ /* these are called by storage engines */ void trans_register_ha(THD *thd, bool all, handlerton *ht); @@ -3010,6 +3021,9 @@ int ha_binlog_end(THD *thd); #define ha_binlog_wait(a) do {} while (0) #define ha_binlog_end(a) do {} while (0) #endif +#ifdef WITH_WSREP +void wsrep_brute_force_aborts(); +#endif const char *get_canonical_filename(handler *file, const char *path, char *tmp_path); diff --git a/sql/item_func.cc b/sql/item_func.cc index 92431a552c4..c56909d4335 100644 --- a/sql/item_func.cc +++ b/sql/item_func.cc @@ -2583,7 +2583,19 @@ void Item_func_rand::seed_random(Item *arg) TODO: do not do reinit 'rand' for every execute of PS/SP if args[0] is a constant. */ +#ifdef WITH_WSREP + uint32 tmp; + if (WSREP(current_thd)) + { + if (current_thd->wsrep_exec_mode==REPL_RECV) + tmp= current_thd->wsrep_rand; + else + tmp= current_thd->wsrep_rand= (uint32) arg->val_int(); + } else + tmp= (uint32) arg->val_int(); +#else uint32 tmp= (uint32) arg->val_int(); +#endif /* WITH_WSREP */ my_rnd_init(rand, (uint32) (tmp*0x10001L+55555555L), (uint32) (tmp*0x10000001L)); } diff --git a/sql/lock.cc b/sql/lock.cc index a7029548493..bbaa51dbbae 100644 --- a/sql/lock.cc +++ b/sql/lock.cc @@ -84,6 +84,10 @@ #include <hash.h> #include <assert.h> +#ifdef WITH_WSREP +#include "wsrep_mysqld.h" +#endif /* WITH_WSREP */ + /** @defgroup Locking Locking @{ @@ -314,6 +318,9 @@ bool mysql_lock_tables(THD *thd, MYSQL_LOCK *sql_lock, uint flags) /* Copy the lock data array. thr_multi_lock() reorders its contents. */ memcpy(sql_lock->locks + sql_lock->lock_count, sql_lock->locks, sql_lock->lock_count * sizeof(*sql_lock->locks)); +#ifdef WITH_WSREP + thd->lock_info.in_lock_tables= thd->in_lock_tables; +#endif /* Lock on the copied half of the lock data array. */ /* Lock on the copied half of the lock data array. */ rc= thr_lock_errno_to_mysql[(int) thr_multi_lock(sql_lock->locks + sql_lock->lock_count, @@ -323,7 +330,11 @@ bool mysql_lock_tables(THD *thd, MYSQL_LOCK *sql_lock, uint flags) (void) unlock_external(thd, sql_lock->table, sql_lock->table_count); end: +#ifdef WITH_WSREP + thd_proc_info(thd, "mysql_lock_tables(): unlocking tables II"); +#else /* WITH_WSREP */ thd_proc_info(thd, 0); +#endif /* WITH_WSREP */ if (thd->killed) { @@ -336,6 +347,9 @@ end: my_error(rc, MYF(0)); thd->set_time_after_lock(); +#ifdef WITH_WSREP + thd_proc_info(thd, "exit mysqld_lock_tables()"); +#endif /* WITH_WSREP */ DBUG_RETURN(rc); } @@ -1045,11 +1059,15 @@ void Global_read_lock::unlock_global_read_lock(THD *thd) { thd->mdl_context.release_lock(m_mdl_blocks_commits_lock); m_mdl_blocks_commits_lock= NULL; +#ifdef WITH_WSREP + wsrep_locked_seqno= WSREP_SEQNO_UNDEFINED; + wsrep->resume(wsrep); +#endif /* WITH_WSREP */ } thd->mdl_context.release_lock(m_mdl_global_shared_lock); m_mdl_global_shared_lock= NULL; m_state= GRL_NONE; - + DBUG_VOID_RETURN; } @@ -1077,9 +1095,39 @@ bool Global_read_lock::make_global_read_lock_block_commit(THD *thd) If we didn't succeed lock_global_read_lock(), or if we already suceeded make_global_read_lock_block_commit(), do nothing. */ + +#ifdef WITH_WSREP + if (m_mdl_blocks_commits_lock) + { + WSREP_DEBUG("GRL was in block commit mode when entering " + "make_global_read_lock_block_commit"); + thd->mdl_context.release_lock(m_mdl_blocks_commits_lock); + m_mdl_blocks_commits_lock= NULL; + wsrep_locked_seqno= WSREP_SEQNO_UNDEFINED; + wsrep->resume(wsrep); + m_state= GRL_ACQUIRED; + } +#endif /* WITH_WSREP */ + if (m_state != GRL_ACQUIRED) DBUG_RETURN(0); +#ifdef WITH_WSREP + long long ret = wsrep->pause(wsrep); + if (ret >= 0) + { + wsrep_locked_seqno= ret; + } + else if (ret != -ENOSYS) /* -ENOSYS - no provider */ + { + WSREP_ERROR("Failed to pause provider: %lld (%s)", -ret, strerror(-ret)); + + /* m_mdl_blocks_commits_lock is always NULL here */ + wsrep_locked_seqno= WSREP_SEQNO_UNDEFINED; + my_error(ER_LOCK_DEADLOCK, MYF(0)); + DBUG_RETURN(TRUE); + } +#endif /* WITH_WSREP */ mdl_request.init(MDL_key::COMMIT, "", "", MDL_SHARED, MDL_EXPLICIT); if (thd->mdl_context.acquire_lock(&mdl_request, diff --git a/sql/log.cc b/sql/log.cc index 577297fa1a4..ee7d548d81a 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -51,6 +51,9 @@ #include "sql_plugin.h" #include "rpl_handler.h" +#ifdef WITH_WSREP +#include "wsrep_mysqld.h" +#endif /* WITH_WSREP */ #include "debug_sync.h" /* max size of the log message */ @@ -486,6 +489,9 @@ private: }; handlerton *binlog_hton; +#ifdef WITH_WSREP +extern handlerton *wsrep_hton; +#endif bool LOGGER::is_log_table_enabled(uint log_table_type) { @@ -500,6 +506,134 @@ bool LOGGER::is_log_table_enabled(uint log_table_type) } } +#ifdef WITH_WSREP +IO_CACHE * get_trans_log(THD * thd) +{ + binlog_cache_mngr *cache_mngr = (binlog_cache_mngr*) + thd_get_ha_data(thd, binlog_hton); + if (cache_mngr) + { + return cache_mngr->get_binlog_cache_log(true); + } + else + { + WSREP_DEBUG("binlog cache not initialized, conn :%ld", thd->thread_id); + return NULL; + } +} + + +bool wsrep_trans_cache_is_empty(THD *thd) +{ + bool res= TRUE; + + if (thd_sql_command((const THD*) thd) != SQLCOM_SELECT) + res= FALSE; + else + { + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + if (cache_mngr) + { + res= cache_mngr->trx_cache.empty(); + } + } + return res; +} + +void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end) +{ + thd->binlog_flush_pending_rows_event(stmt_end); +} +void thd_binlog_trx_reset(THD * thd) +{ + /* + todo: fix autocommit select to not call the caller + */ + if (thd_get_ha_data(thd, binlog_hton) != NULL) + { + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + if (cache_mngr) cache_mngr->reset(TRUE, TRUE); + } + thd->clear_binlog_table_maps(); +} + +void thd_binlog_rollback_stmt(THD * thd) +{ + WSREP_DEBUG("thd_binlog_rollback_stmt :%ld", thd->thread_id); + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + if (cache_mngr) cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF); +} +/* + Write the contents of a cache to memory buffer. + + This function quite the same as MYSQL_BIN_LOG::write_cache(), + with the exception that here we write in buffer instead of log file. + */ + +int wsrep_write_cache(IO_CACHE *cache, uchar **buf, uint *buf_len) +{ + + if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) + return ER_ERROR_ON_WRITE; + uint length= my_b_bytes_in_cache(cache); + long long total_length = 0; + uchar *buf_ptr = NULL; + + do + { + /* bail out if buffer grows too large + This is a temporary fix to avoid flooding replication + TODO: remove this check for 0.7.4 release + */ + if (total_length > wsrep_max_ws_size) + { + WSREP_WARN("transaction size limit (%lld) exceeded: %lld", + wsrep_max_ws_size, total_length); + if (reinit_io_cache(cache, WRITE_CACHE, 0, 0, 0)) + { + WSREP_WARN("failed to initialize io-cache"); + } + if (buf_ptr) my_free(*buf); + *buf_len = 0; + return ER_ERROR_ON_WRITE; + } + if (total_length > 0) + { + *buf_len += length; + *buf = (uchar *)my_realloc(*buf, total_length+length, MYF(0)); + if (!*buf) + { + WSREP_ERROR("io cache write problem: %d %d", *buf_len, length); + return ER_ERROR_ON_WRITE; + } + buf_ptr = *buf+total_length; + } + else + { + if (buf_ptr != NULL) + { + WSREP_ERROR("io cache alloc error: %d %d", *buf_len, length); + my_free(*buf); + } + if (length > 0) + { + *buf = (uchar *) my_malloc(length, MYF(0)); + buf_ptr = *buf; + *buf_len = length; + } + } + total_length += length; + + memcpy(buf_ptr, cache->read_pos, length); + cache->read_pos=cache->read_end; + } while ((cache->file >= 0) && (length= my_b_fill(cache))); + + return 0; +} +#endif /* Check if a given table is opened log table */ int check_if_log_table(size_t db_len, const char *db, size_t table_name_len, @@ -1536,7 +1670,11 @@ binlog_trans_log_savepos(THD *thd, my_off_t *pos) thd->binlog_setup_trx_data(); binlog_cache_mngr *const cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); +#ifdef WITH_WSREP + DBUG_ASSERT((WSREP(thd) && wsrep_emulate_bin_log) || mysql_bin_log.is_open()); +#else DBUG_ASSERT(mysql_bin_log.is_open()); +#endif *pos= cache_mngr->trx_cache.get_byte_position(); DBUG_PRINT("return", ("*pos: %lu", (ulong) *pos)); DBUG_VOID_RETURN; @@ -1584,7 +1722,16 @@ binlog_trans_log_truncate(THD *thd, my_off_t pos) int binlog_init(void *p) { binlog_hton= (handlerton *)p; +#ifdef WITH_WSREP + if (WSREP_ON) + binlog_hton->state= SHOW_OPTION_YES; + else + { +#endif /* WITH_WSREP */ binlog_hton->state=opt_bin_log ? SHOW_OPTION_YES : SHOW_OPTION_NO; +#ifdef WITH_WSREP + } +#endif /* WITH_WSREP */ binlog_hton->db_type=DB_TYPE_BINLOG; binlog_hton->savepoint_offset= sizeof(my_off_t); binlog_hton->close_connection= binlog_close_connection; @@ -1840,6 +1987,9 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) DBUG_ENTER("binlog_commit"); binlog_cache_mngr *const cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); +#ifdef WITH_WSREP + if (!cache_mngr) DBUG_RETURN(0); +#endif /* WITH_WSREP */ DBUG_PRINT("debug", ("all: %d, in_transaction: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s", @@ -1896,6 +2046,9 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) int error= 0; binlog_cache_mngr *const cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); +#ifdef WITH_WSREP + if (!cache_mngr) DBUG_RETURN(0); +#endif /* WITH_WSREP */ DBUG_PRINT("debug", ("all: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s", YESNO(all), @@ -1924,8 +2077,12 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) cache_mngr->reset(false, true); DBUG_RETURN(error); } - +#ifdef WITH_WSREP + if (!wsrep_emulate_bin_log && + mysql_bin_log.check_write_error(thd)) +#else if (mysql_bin_log.check_write_error(thd)) +#endif { /* "all == true" means that a "rollback statement" triggered the error and @@ -1955,12 +2112,12 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) if (ending_trans(thd, all) && ((thd->variables.option_bits & OPTION_KEEP_LOG) || (trans_has_updated_non_trans_table(thd) && - thd->variables.binlog_format == BINLOG_FORMAT_STMT) || + WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_STMT) || (cache_mngr->trx_cache.changes_to_non_trans_temp_table() && - thd->variables.binlog_format == BINLOG_FORMAT_MIXED) || + WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_MIXED) || (trans_has_updated_non_trans_table(thd) && ending_single_stmt_trans(thd,all) && - thd->variables.binlog_format == BINLOG_FORMAT_MIXED))) + WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_MIXED))) error= binlog_rollback_flush_trx_cache(thd, all, cache_mngr); /* Truncate the cache if: @@ -1974,9 +2131,9 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) else if (ending_trans(thd, all) || (!(thd->variables.option_bits & OPTION_KEEP_LOG) && (!stmt_has_updated_non_trans_table(thd) || - thd->variables.binlog_format != BINLOG_FORMAT_STMT) && + WSREP_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_STMT) && (!cache_mngr->trx_cache.changes_to_non_trans_temp_table() || - thd->variables.binlog_format != BINLOG_FORMAT_MIXED))) + WSREP_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_MIXED))) error= binlog_truncate_trx_cache(thd, cache_mngr, all); } @@ -2070,7 +2227,9 @@ static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv) binlog_trans_log_savepos(thd, (my_off_t*) sv); /* Write it to the binary log */ - +#ifdef WITH_WSREP + if (wsrep_emulate_bin_log) DBUG_RETURN(0); +#endif /* WITH_WSREP */ String log_query; if (log_query.append(STRING_WITH_LEN("SAVEPOINT ")) || log_query.append("`") || @@ -2092,7 +2251,12 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv) non-transactional table. Otherwise, truncate the binlog cache starting from the SAVEPOINT command. */ +#ifdef WITH_WSREP + if (!wsrep_emulate_bin_log && + unlikely(trans_has_updated_non_trans_table(thd) || +#else if (unlikely(trans_has_updated_non_trans_table(thd) || +#endif (thd->variables.option_bits & OPTION_KEEP_LOG))) { String log_query; @@ -4720,6 +4884,7 @@ int THD::binlog_setup_trx_data() DBUG_RETURN(0); } + /* Function to start a statement and optionally a transaction for the binary log. @@ -4839,7 +5004,12 @@ int THD::binlog_write_table_map(TABLE *table, bool is_transactional, table->s->table_map_id)); /* Pre-conditions */ +#ifdef WITH_WSREP + DBUG_ASSERT(is_current_stmt_binlog_format_row() && + (WSREP_EMULATE_BINLOG(this) || mysql_bin_log.is_open())); +#else DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open()); +#endif DBUG_ASSERT(table->s->table_map_id != ULONG_MAX); Table_map_log_event @@ -4976,7 +5146,11 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, bool is_transactional) { DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)"); +#ifdef WITH_WSREP + DBUG_ASSERT(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()); +#else DBUG_ASSERT(mysql_bin_log.is_open()); +#endif DBUG_PRINT("enter", ("event: 0x%lx", (long) event)); int error= 0; @@ -5056,7 +5230,11 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) mostly called if is_open() *was* true a few instructions before, but it could have changed since. */ +#ifdef WITH_WSREP + if ((WSREP(thd) && wsrep_emulate_bin_log) || is_open()) +#else if (likely(is_open())) +#endif { my_off_t UNINIT_VAR(my_org_b_tell); #ifdef HAVE_REPLICATION @@ -5237,6 +5415,35 @@ err: } } +#ifdef WITH_WSREP + if (WSREP(thd) && wsrep_incremental_data_collection && + (wsrep_emulate_bin_log || mysql_bin_log.is_open())) + { + DBUG_ASSERT(thd->wsrep_trx_handle.trx_id != (unsigned long)-1); + if (!error) + { + IO_CACHE* cache= get_trans_log(thd); + uchar* buf= NULL; + uint buf_len= 0; + + if (wsrep_emulate_bin_log) + thd->binlog_flush_pending_rows_event(false); + error= wsrep_write_cache(cache, &buf, &buf_len); + if (!error && buf_len > 0) + { + wsrep_status_t rc= wsrep->append_data(wsrep, + &thd->wsrep_trx_handle, + buf, buf_len); + if (rc != WSREP_OK) + { + sql_print_warning("WSREP: append_data() returned %d", rc); + error= 1; + } + } + if (buf_len) my_free(buf); + } + } +#endif /* WITH_WSREP */ DBUG_RETURN(error); } @@ -5329,6 +5536,14 @@ int MYSQL_BIN_LOG::rotate(bool force_rotate, bool* check_purge) { int error= 0; DBUG_ENTER("MYSQL_BIN_LOG::rotate"); +#ifdef WITH_WSREP + if (WSREP_ON && wsrep_to_isolation) + { + WSREP_DEBUG("avoiding binlog rotate due to TO isolation: %d", + wsrep_to_isolation); + DBUG_RETURN(0); + } +#endif //todo: fix the macro def and restore safe_mutex_assert_owner(&LOCK_log); *check_purge= false; @@ -5797,6 +6012,9 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, group_commit_entry entry; DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_to_binlog"); +#ifdef WITH_WSREP + if (wsrep_emulate_bin_log) DBUG_RETURN(0); +#endif /* WITH_WSREP */ entry.thd= thd; entry.cache_mngr= cache_mngr; entry.error= 0; @@ -7426,7 +7644,13 @@ TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all, binlog_cache_mngr *cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); - +#ifdef WITH_WSREP + if (!cache_mngr) + { + WSREP_DEBUG("Skipping empty log_xid: %s", thd->query()); + DBUG_RETURN(1); + } +#endif /* WITH_WSREP */ cache_mngr->using_xa= TRUE; cache_mngr->xa_xid= xid; err= binlog_commit_flush_xid_caches(thd, cache_mngr, all, xid); diff --git a/sql/log.h b/sql/log.h index e8f59801683..2bc4d0e49d7 100644 --- a/sql/log.h +++ b/sql/log.h @@ -94,7 +94,7 @@ public: int log_and_order(THD *thd, my_xid xid, bool all, bool need_prepare_ordered, bool need_commit_ordered) { - DBUG_ASSERT(0 /* Internal error - TC_LOG_DUMMY::log_and_order() called */); + //DBUG_ASSERT(0 /* Internal error - TC_LOG_DUMMY::log_and_order() called */); return 1; } int unlog(ulong cookie, my_xid xid) { return 0; } @@ -267,6 +267,12 @@ enum enum_log_state { LOG_OPENED, LOG_CLOSED, LOG_TO_BE_OPENED }; (mmap+fsync is two times faster than write+fsync) */ +#ifdef WITH_WSREP +extern my_bool wsrep_emulate_bin_log; +Log_event* wsrep_read_log_event( + char **arg_buf, size_t *arg_buf_len, + const Format_description_log_event *description_event); +#endif class MYSQL_LOG { public: @@ -846,12 +852,30 @@ public: }; enum enum_binlog_format { + /* + statement-based except for cases where only row-based can work (UUID() + etc): + */ BINLOG_FORMAT_MIXED= 0, ///< statement if safe, otherwise row - autodetected BINLOG_FORMAT_STMT= 1, ///< statement-based BINLOG_FORMAT_ROW= 2, ///< row-based BINLOG_FORMAT_UNSPEC=3 ///< thd_binlog_format() returns it when binlog is closed }; +#ifdef WITH_WSREP +IO_CACHE * get_trans_log(THD * thd); +bool wsrep_trans_cache_is_empty(THD *thd); +void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end); +void thd_binlog_trx_reset(THD * thd); +void thd_binlog_rollback_stmt(THD * thd); +int wsrep_write_cache(IO_CACHE *cache, uchar **buf, uint *buf_len); + +#define WSREP_FORMAT(my_format) \ + ((wsrep_forced_binlog_format != BINLOG_FORMAT_UNSPEC) ? \ + wsrep_forced_binlog_format : my_format) +#else +#define WSREP_FORMAT(my_format) my_format +#endif int query_error_code(THD *thd, bool not_killed); uint purge_log_get_error_code(int res); diff --git a/sql/log_event.cc b/sql/log_event.cc index cec0785a088..05340fff03a 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -46,6 +46,9 @@ #include "transaction.h" #include <my_dir.h> +#if WITH_WSREP +#include "wsrep_mysqld.h" +#endif #endif /* MYSQL_CLIENT */ #include <base64.h> @@ -2769,7 +2772,9 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, master_data_written(0) { time_t end_time; - +#ifdef WITH_WSREP + thd->wsrep_PA_safe= false; +#endif /* WITH_WSREP */ memset(&user, 0, sizeof(user)); memset(&host, 0, sizeof(host)); @@ -6983,7 +6988,14 @@ err: end_io_cache(&file); if (fd >= 0) mysql_file_close(fd, MYF(0)); +#ifdef WITH_WSREP + if (WSREP(thd)) + thd_proc_info(thd, "exit Create_file_log_event::do_apply_event()"); + else + thd_proc_info(thd, 0); +#else /* WITH_WSREP */ thd_proc_info(thd, 0); +#endif /* WITH_WSREP */ return error != 0; } #endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */ @@ -7154,7 +7166,14 @@ int Append_block_log_event::do_apply_event(Relay_log_info const *rli) err: if (fd >= 0) mysql_file_close(fd, MYF(0)); +#ifdef WITH_WSREP + if (WSREP(thd)) + thd_proc_info(thd, "exit Append_block_log_event::do_apply_event()"); + else + thd_proc_info(thd, 0); +#else /* WITH_WSREP */ thd_proc_info(thd, 0); +#endif /* WITH_WSREP */ DBUG_RETURN(error); } #endif @@ -8097,7 +8116,17 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) if (open_and_lock_tables(thd, rli->tables_to_lock, FALSE, 0)) { +#ifdef WITH_WSREP + uint actual_error= ER_SERVER_SHUTDOWN; + if (WSREP(thd) && !thd->is_fatal_error) + { + sql_print_information("WSREP, BF applier interrupted in log_event.cc"); + } + else + actual_error= thd->stmt_da->sql_errno(); +#else uint actual_error= thd->stmt_da->sql_errno(); +#endif if (thd->is_slave_error || thd->is_fatal_error) { /* @@ -9842,8 +9871,23 @@ int Write_rows_log_event::do_exec_row(const Relay_log_info *const rli) { DBUG_ASSERT(m_table != NULL); +#ifdef WITH_WSREP +#ifdef WSREP_PROC_INFO + char info[64]; + info[sizeof(info) - 1] = '\0'; + snprintf(info, sizeof(info) - 1, "Write_rows_log_event::write_row(%lld)", + (long long) thd->wsrep_trx_seqno); + const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL; +#else + const char* tmp = (WSREP(thd)) ? + thd_proc_info(thd,"Write_rows_log_event::write_row()") : NULL; +#endif /* WSREP_PROC_INFO */ +#endif /* WITH_WSREP */ int error= write_row(rli, slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT); +#ifdef WITH_WSREP + if (WSREP(thd)) thd_proc_info(thd, tmp); +#endif /* WITH_WSREP */ if (error && !thd->is_error()) { DBUG_ASSERT(0); @@ -10496,14 +10540,39 @@ int Delete_rows_log_event::do_exec_row(const Relay_log_info *const rli) int error; DBUG_ASSERT(m_table != NULL); +#ifdef WITH_WSREP +#ifdef WSREP_PROC_INFO + char info[64]; + info[sizeof(info) - 1] = '\0'; + snprintf(info, sizeof(info) - 1, "Delete_rows_log_event::find_row(%lld)", + (long long) thd->wsrep_trx_seqno); + const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL; +#else + const char* tmp = (WSREP(thd)) ? + thd_proc_info(thd,"Delete_rows_log_event::find_row()") : NULL; +#endif /* WSREP_PROC_INFO */ +#endif /* WITH_WSREP */ if (!(error= find_row(rli))) { /* Delete the record found, located in record[0] */ +#ifdef WITH_WSREP +#ifdef WSREP_PROC_INFO + snprintf(info, sizeof(info) - 1, + "Delete_rows_log_event::ha_delete_row(%lld)", + (long long) thd->wsrep_trx_seqno); + if (WSREP(thd)) thd_proc_info(thd, info); +#else + if (WSREP(thd)) thd_proc_info(thd,"Delete_rows_log_event::ha_delete_row()"); +#endif /* WSREP_PROC_INFO */ +#endif /* WITH_WSREP */ error= m_table->file->ha_delete_row(m_table->record[0]); m_table->file->ha_index_or_rnd_end(); } +#ifdef WITH_WSREP + if (WSREP(thd)) thd_proc_info(thd, tmp); +#endif /* WITH_WSREP */ return error; } @@ -10617,6 +10686,18 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli) { DBUG_ASSERT(m_table != NULL); +#ifdef WITH_WSREP +#ifdef WSREP_PROC_INFO + char info[64]; + info[sizeof(info) - 1] = '\0'; + snprintf(info, sizeof(info) - 1, "Update_rows_log_event::find_row(%lld)", + (long long) thd->wsrep_trx_seqno); + const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL; +#else + const char* tmp = (WSREP(thd)) ? + thd_proc_info(thd,"Update_rows_log_event::find_row()") : NULL; +#endif /* WSREP_PROC_INFO */ +#endif /* WITH_WSREP */ int error= find_row(rli); if (error) { @@ -10643,6 +10724,17 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli) store_record(m_table,record[1]); m_curr_row= m_curr_row_end; +#ifdef WITH_WSREP +#ifdef WSREP_PROC_INFO + snprintf(info, sizeof(info) - 1, + "Update_rows_log_event::unpack_current_row(%lld)", + (long long) thd->wsrep_trx_seqno); + if (WSREP(thd)) thd_proc_info(thd, info); +#else + if (WSREP(thd)) + thd_proc_info(thd,"Update_rows_log_event::unpack_current_row()"); +#endif /* WSREP_PROC_INFO */ +#endif /* WITH_WSREP */ /* this also updates m_curr_row_end */ if ((error= unpack_current_row(rli))) goto err; @@ -10661,10 +10753,23 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli) DBUG_DUMP("new values", m_table->record[0], m_table->s->reclength); #endif +#ifdef WITH_WSREP +#ifdef WSREP_PROC_INFO + snprintf(info, sizeof(info) - 1, + "Update_rows_log_event::ha_update_row(%lld)", + (long long) thd->wsrep_trx_seqno); + if (WSREP(thd)) thd_proc_info(thd, info); +#else + if (WSREP(thd)) thd_proc_info(thd,"Update_rows_log_event::ha_update_row()"); +#endif /* WSREP_PROC_INFO */ +#endif /* WITH_WSREP */ error= m_table->file->ha_update_row(m_table->record[1], m_table->record[0]); if (error == HA_ERR_RECORD_IS_THE_SAME) error= 0; +#ifdef WITH_WSREP + if (WSREP(thd)) thd_proc_info(thd, tmp); +#endif /* WITH_WSREP */ err: m_table->file->ha_index_or_rnd_end(); return error; @@ -10749,6 +10854,50 @@ void Incident_log_event::pack_info(Protocol *protocol) protocol->store(buf, bytes, &my_charset_bin); } #endif +#if WITH_WSREP && !defined(MYSQL_CLIENT) +Format_description_log_event *wsrep_format_desc; // TODO: free them at the end +/* + read the first event from (*buf). The size of the (*buf) is (*buf_len). + At the end (*buf) is shitfed to point to the following event or NULL and + (*buf_len) will be changed to account just being read bytes of the 1st event. +*/ +Log_event* wsrep_read_log_event( + char **arg_buf, size_t *arg_buf_len, + const Format_description_log_event *description_event) +{ + DBUG_ENTER("wsrep_read_log_event"); + char *head= (*arg_buf); + + uint data_len = uint4korr(head + EVENT_LEN_OFFSET); + char *buf= (*arg_buf); + const char *error= 0; + Log_event *res= 0; +#ifndef max_allowed_packet + THD *thd=current_thd; + uint max_allowed_packet= thd ? thd->variables.max_allowed_packet : ~(ulong)0; +#endif + + if (data_len > max_allowed_packet) + { + error = "Event too big"; + goto err; + } + + res= Log_event::read_log_event(buf, data_len, &error, description_event, FALSE); + +err: + if (!res) + { + DBUG_ASSERT(error != 0); + sql_print_error("Error in Log_event::read_log_event(): " + "'%s', data_len: %d, event_type: %d", + error,data_len,head[EVENT_TYPE_OFFSET]); + } + (*arg_buf)+= data_len; + (*arg_buf_len)-= data_len; + DBUG_RETURN(res); +} +#endif #ifdef MYSQL_CLIENT diff --git a/sql/mdl.cc b/sql/mdl.cc index ca552a540b9..8ff420e4f50 100644 --- a/sql/mdl.cc +++ b/sql/mdl.cc @@ -20,7 +20,17 @@ #include <mysqld_error.h> #include <mysql/plugin.h> #include <mysql/service_thd_wait.h> - +#ifdef WITH_WSREP +#include "wsrep_mysqld.h" +extern "C" my_thread_id wsrep_thd_thread_id(THD *thd); +extern "C" char *wsrep_thd_query(THD *thd); +void sql_print_information(const char *format, ...) + ATTRIBUTE_FORMAT(printf, 1, 2); + +extern bool +wsrep_grant_mdl_exception(MDL_context *requestor_ctx, + MDL_ticket *ticket); +#endif /* WITH_WSREP */ #ifdef HAVE_PSI_INTERFACE static PSI_mutex_key key_MDL_map_mutex; static PSI_mutex_key key_MDL_wait_LOCK_wait_status; @@ -1222,11 +1232,54 @@ void MDL_lock::Ticket_list::add_ticket(MDL_ticket *ticket) called by other threads. */ DBUG_ASSERT(ticket->get_lock()); +#ifdef WITH_WSREP + if ((this == &(ticket->get_lock()->m_waiting)) && + wsrep_thd_is_brute_force((void *)(ticket->get_ctx()->get_thd()))) + { + Ticket_iterator itw(ticket->get_lock()->m_waiting); + Ticket_iterator itg(ticket->get_lock()->m_granted); + + MDL_ticket *waiting, *granted; + MDL_ticket *prev=NULL; + bool added= false; + + while ((waiting= itw++) && !added) + { + if (!wsrep_thd_is_brute_force((void *)(waiting->get_ctx()->get_thd()))) + { + WSREP_DEBUG("MDL add_ticket inserted before: %lu %s", + wsrep_thd_thread_id(waiting->get_ctx()->get_thd()), + wsrep_thd_query(waiting->get_ctx()->get_thd())); + m_list.insert_after(prev, ticket); + added= true; + } + prev= waiting; + } + if (!added) m_list.push_back(ticket); + + while ((granted= itg++)) + { + if (granted->get_ctx() != ticket->get_ctx() && + granted->is_incompatible_when_granted(ticket->get_type())) + { + if (!wsrep_grant_mdl_exception(ticket->get_ctx(), granted)) + { + WSREP_DEBUG("MDL victim killed at add_ticket"); + } + } + } + } + else + { +#endif /* WITH_WSREP */ /* Add ticket to the *back* of the queue to ensure fairness among requests with the same priority. */ m_list.push_back(ticket); +#ifdef WITH_WSREP + } +#endif /* WITH_WSREP */ m_bitmap|= MDL_BIT(ticket->get_type()); } @@ -1463,7 +1516,6 @@ MDL_object_lock::m_waiting_incompatible[MDL_TYPE_END] = 0 }; - /** Check if request for the metadata lock can be satisfied given its current state. @@ -1486,6 +1538,9 @@ MDL_lock::can_grant_lock(enum_mdl_type type_arg, bool can_grant= FALSE; bitmap_t waiting_incompat_map= incompatible_waiting_types_bitmap()[type_arg]; bitmap_t granted_incompat_map= incompatible_granted_types_bitmap()[type_arg]; +#ifdef WITH_WSREP + bool wsrep_can_grant= TRUE; +#endif /* WITH_WSREP */ /* New lock request can be satisfied iff: - There are no incompatible types of satisfied requests @@ -1507,12 +1562,51 @@ MDL_lock::can_grant_lock(enum_mdl_type type_arg, { if (ticket->get_ctx() != requestor_ctx && ticket->is_incompatible_when_granted(type_arg)) +#ifdef WITH_WSREP + { + if (wsrep_thd_is_brute_force((void *)(requestor_ctx->get_thd())) && + key.mdl_namespace() == MDL_key::GLOBAL) + { + WSREP_DEBUG("global lock granted for BF: %lu %s", + wsrep_thd_thread_id(requestor_ctx->get_thd()), + wsrep_thd_query(requestor_ctx->get_thd())); + can_grant = true; + } + else if (!wsrep_grant_mdl_exception(requestor_ctx, ticket)) + { + wsrep_can_grant= FALSE; + } + else + { + can_grant= TRUE; + } + } +#else break; +#endif /* WITH_WSREP */ } +#ifdef WITH_WSREP + if ((ticket == NULL) && wsrep_can_grant) +#else if (ticket == NULL) /* Incompatible locks are our own. */ +#endif /* WITH_WSREP */ + can_grant= TRUE; } } +#ifdef WITH_WSREP + else + { + if (wsrep_thd_is_brute_force((void *)(requestor_ctx->get_thd())) && + key.mdl_namespace() == MDL_key::GLOBAL) + { + WSREP_DEBUG("global lock granted for BF (waiting queue): %lu %s", + wsrep_thd_thread_id(requestor_ctx->get_thd()), + wsrep_thd_query(requestor_ctx->get_thd())); + can_grant = true; + } + } +#endif /* WITH_WSREP */ return can_grant; } diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 7394ce5c931..2251663c94a 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -72,6 +72,10 @@ #include "scheduler.h" #include <waiting_threads.h> #include "debug_sync.h" +#ifdef WITH_WSREP +#include "wsrep_mysqld.h" +ulong wsrep_running_threads = 0; // # of currently running wsrep threads +#endif #include "sql_callback.h" #include "threadpool.h" @@ -360,6 +364,9 @@ static DYNAMIC_ARRAY all_options; /* Global variables */ +#ifdef WITH_WSREP +ulong my_bind_addr; +#endif /* WITH_WSREP */ bool opt_bin_log, opt_ignore_builtin_innodb= 0; my_bool opt_log, opt_slow_log, debug_assert_if_crashed_table= 0, opt_help= 0, opt_abort; ulonglong log_output_options; @@ -452,6 +459,10 @@ ulong opt_binlog_rows_event_max_size; my_bool opt_master_verify_checksum= 0; my_bool opt_slave_sql_verify_checksum= 1; const char *binlog_format_names[]= {"MIXED", "STATEMENT", "ROW", NullS}; +#ifdef WITH_WSREP +const char *wsrep_binlog_format_names[]= + {"MIXED", "STATEMENT", "ROW", "NONE", NullS}; +#endif /*WITH_WSREP */ #ifdef HAVE_INITGROUPS volatile sig_atomic_t calling_initgroups= 0; /**< Used in SIGSEGV handler. */ #endif @@ -671,6 +682,21 @@ pthread_attr_t connection_attrib; mysql_mutex_t LOCK_server_started; mysql_cond_t COND_server_started; +#ifdef WITH_WSREP +mysql_mutex_t LOCK_wsrep_ready; +mysql_cond_t COND_wsrep_ready; +mysql_mutex_t LOCK_wsrep_sst; +mysql_cond_t COND_wsrep_sst; +mysql_mutex_t LOCK_wsrep_sst_init; +mysql_cond_t COND_wsrep_sst_init; +mysql_mutex_t LOCK_wsrep_rollback; +mysql_cond_t COND_wsrep_rollback; +wsrep_aborting_thd_t wsrep_aborting_thd= NULL; +mysql_mutex_t LOCK_wsrep_replaying; +mysql_cond_t COND_wsrep_replaying; +int wsrep_replaying= 0; +static void wsrep_close_threads(THD* thd); +#endif int mysqld_server_started= 0; File_parser_dummy_hook file_parser_dummy_hook; @@ -740,6 +766,11 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_prep_xids, key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data, key_LOCK_error_messages, key_LOG_INFO_lock, key_LOCK_thread_count, key_PARTITION_LOCK_auto_inc; +#ifdef WITH_WSREP +PSI_mutex_key key_LOCK_wsrep_rollback, key_LOCK_wsrep_thd, + key_LOCK_wsrep_replaying, key_LOCK_wsrep_ready, key_LOCK_wsrep_sst, + key_LOCK_wsrep_sst_thread, key_LOCK_wsrep_sst_init; +#endif PSI_mutex_key key_RELAYLOG_LOCK_index; PSI_mutex_key key_LOCK_stats, @@ -810,6 +841,16 @@ static PSI_mutex_info all_server_mutexes[]= { &key_LOCK_thread_count, "LOCK_thread_count", PSI_FLAG_GLOBAL}, { &key_LOCK_logger_service, "logger_service_file_st::lock", PSI_FLAG_GLOBAL}, +#ifdef WITH_WSREP + { &key_LOCK_wsrep_ready, "LOCK_wsrep_ready", PSI_FLAG_GLOBAL}, + { &key_LOCK_wsrep_sst, "LOCK_wsrep_sst", PSI_FLAG_GLOBAL}, + { &key_LOCK_wsrep_sst_thread, "wsrep_sst_thread", 0}, + { &key_LOCK_wsrep_sst_init, "LOCK_wsrep_sst_init", PSI_FLAG_GLOBAL}, + { &key_LOCK_wsrep_sst, "LOCK_wsrep_sst", PSI_FLAG_GLOBAL}, + { &key_LOCK_wsrep_rollback, "LOCK_wsrep_rollback", PSI_FLAG_GLOBAL}, + { &key_LOCK_wsrep_thd, "THD::LOCK_wsrep_thd", 0}, + { &key_LOCK_wsrep_replaying, "LOCK_wsrep_replaying", PSI_FLAG_GLOBAL}, +#endif { &key_PARTITION_LOCK_auto_inc, "HA_DATA_PARTITION::LOCK_auto_inc", 0} }; @@ -847,6 +888,11 @@ PSI_cond_key key_BINLOG_COND_prep_xids, key_BINLOG_update_cond, key_TABLE_SHARE_cond, key_user_level_lock_cond, key_COND_thread_count, key_COND_thread_cache, key_COND_flush_thread_cache, key_BINLOG_COND_queue_busy; +#ifdef WITH_WSREP +PSI_cond_key key_COND_wsrep_rollback, key_COND_wsrep_thd, + key_COND_wsrep_replaying, key_COND_wsrep_ready, key_COND_wsrep_sst, + key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread; +#endif /* WITH_WSREP */ PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready; PSI_cond_key key_RELAYLOG_COND_queue_busy; PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy; @@ -888,6 +934,15 @@ static PSI_cond_info all_server_conds[]= { &key_user_level_lock_cond, "User_level_lock::cond", 0}, { &key_COND_thread_count, "COND_thread_count", PSI_FLAG_GLOBAL}, { &key_COND_thread_cache, "COND_thread_cache", PSI_FLAG_GLOBAL}, +#ifdef WITH_WSREP + { &key_COND_wsrep_ready, "COND_wsrep_ready", PSI_FLAG_GLOBAL}, + { &key_COND_wsrep_sst, "COND_wsrep_sst", PSI_FLAG_GLOBAL}, + { &key_COND_wsrep_sst_init, "COND_wsrep_sst_init", PSI_FLAG_GLOBAL}, + { &key_COND_wsrep_sst_thread, "wsrep_sst_thread", 0}, + { &key_COND_wsrep_rollback, "COND_wsrep_rollback", PSI_FLAG_GLOBAL}, + { &key_COND_wsrep_thd, "THD::COND_wsrep_thd", 0}, + { &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL}, +#endif { &key_COND_flush_thread_cache, "COND_flush_thread_cache", PSI_FLAG_GLOBAL} }; @@ -1413,6 +1468,11 @@ static void close_connections(void) if (tmp->slave_thread) continue; +#ifdef WITH_WSREP + /* skip wsrep system threads as well */ + if (WSREP(tmp) && (tmp->wsrep_exec_mode==REPL_RECV || tmp->wsrep_applier)) + continue; +#endif tmp->killed= KILL_SERVER_HARD; MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (tmp)); mysql_mutex_lock(&tmp->LOCK_thd_data); @@ -1476,6 +1536,33 @@ static void close_connections(void) close_connection(tmp,ER_SERVER_SHUTDOWN); } #endif +#ifdef WITH_WSREP + /* + * TODO: this code block may turn out redundant. wsrep->disconnect() + * should terminate slave threads gracefully, and we don't need + * to signal them here. + * The code here makes sure mysqld will not hang during shutdown + * even if wsrep provider has problems in shutting down. + */ + if (WSREP(tmp) && tmp->wsrep_exec_mode==REPL_RECV) + { + sql_print_information("closing wsrep system thread"); + tmp->killed= KILL_CONNECTION; + MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (tmp)); + if (tmp->mysys_var) + { + tmp->mysys_var->abort=1; + mysql_mutex_lock(&tmp->mysys_var->mutex); + if (tmp->mysys_var->current_cond) + { + mysql_mutex_lock(tmp->mysys_var->current_mutex); + mysql_cond_broadcast(tmp->mysys_var->current_cond); + mysql_mutex_unlock(tmp->mysys_var->current_mutex); + } + mysql_mutex_unlock(&tmp->mysys_var->mutex); + } + } +#endif DBUG_PRINT("quit",("Unlocking LOCK_thread_count")); mysql_mutex_unlock(&LOCK_thread_count); } @@ -1636,8 +1723,14 @@ static void __cdecl kill_server(int sig_ptr) } } #endif +#ifdef WITH_WSREP + if (WSREP_ON) wsrep_stop_replication(NULL); +#endif close_connections(); +#ifdef WITH_WSREP + if (WSREP_ON) wsrep_deinit(); +#endif if (sig != MYSQL_KILL_SIGNAL && sig != 0) unireg_abort(1); /* purecov: inspected */ @@ -1732,6 +1825,21 @@ extern "C" void unireg_abort(int exit_code) usage(); if (exit_code) sql_print_error("Aborting\n"); +#ifdef WITH_WSREP + if (wsrep) + { + /* This is an abort situation, we cannot expect to gracefully close all + * wsrep threads here, we can only diconnect from service */ + wsrep_close_client_connections(FALSE); + shutdown_in_progress= 1; + THD* thd(0); + wsrep->disconnect(wsrep); + WSREP_INFO("Service disconnected."); + wsrep_close_threads(thd); /* this won't close all threads */ + sleep(1); /* so give some time to exit for those which can */ + WSREP_INFO("Some threads may fail to exit."); + } +#endif // WITH_WSREP clean_up(!opt_abort && (exit_code || !opt_bootstrap)); /* purecov: inspected */ DBUG_PRINT("quit",("done with cleanup in unireg_abort")); mysqld_exit(exit_code); @@ -1932,6 +2040,18 @@ static void clean_up_mutexes() mysql_cond_destroy(&COND_thread_count); mysql_cond_destroy(&COND_thread_cache); mysql_cond_destroy(&COND_flush_thread_cache); +#ifdef WITH_WSREP + (void) mysql_mutex_destroy(&LOCK_wsrep_ready); + (void) mysql_cond_destroy(&COND_wsrep_ready); + (void) mysql_mutex_destroy(&LOCK_wsrep_sst); + (void) mysql_cond_destroy(&COND_wsrep_sst); + (void) mysql_mutex_destroy(&LOCK_wsrep_sst_init); + (void) mysql_cond_destroy(&COND_wsrep_sst_init); + (void) mysql_mutex_destroy(&LOCK_wsrep_rollback); + (void) mysql_cond_destroy(&COND_wsrep_rollback); + (void) mysql_mutex_destroy(&LOCK_wsrep_replaying); + (void) mysql_cond_destroy(&COND_wsrep_replaying); +#endif mysql_mutex_destroy(&LOCK_server_started); mysql_cond_destroy(&COND_server_started); mysql_mutex_destroy(&LOCK_prepare_ordered); @@ -2350,7 +2470,11 @@ static void network_init(void) @note For the connection that is doing shutdown, this is called twice */ +#ifdef WITH_WSREP +void close_connection(THD *thd, uint sql_errno, bool lock) +#else void close_connection(THD *thd, uint sql_errno) +#endif { DBUG_ENTER("close_connection"); @@ -3477,7 +3601,11 @@ static int init_common_variables() compile_time_assert(sizeof(com_status_vars)/sizeof(com_status_vars[0]) - 1 == SQLCOM_END + 8); #endif - +#ifdef WITH_WSREP + /* This is a protection against mutually incompatible option values. */ + if (WSREP_ON && wsrep_check_opts (remaining_argc, remaining_argv)) + return 1; +#endif /* WITH_WSREP */ if (get_options(&remaining_argc, &remaining_argv)) return 1; set_server_version(); @@ -3864,6 +3992,23 @@ static int init_thread_environment() sql_print_error("Can't create thread-keys"); return 1; } +#ifdef WITH_WSREP + mysql_mutex_init(key_LOCK_wsrep_ready, + &LOCK_wsrep_ready, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_wsrep_ready, &COND_wsrep_ready, NULL); + mysql_mutex_init(key_LOCK_wsrep_sst, + &LOCK_wsrep_sst, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_wsrep_sst, &COND_wsrep_sst, NULL); + mysql_mutex_init(key_LOCK_wsrep_sst_init, + &LOCK_wsrep_sst_init, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_wsrep_sst_init, &COND_wsrep_sst_init, NULL); + mysql_mutex_init(key_LOCK_wsrep_rollback, + &LOCK_wsrep_rollback, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_wsrep_rollback, &COND_wsrep_rollback, NULL); + mysql_mutex_init(key_LOCK_wsrep_replaying, + &LOCK_wsrep_replaying, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_wsrep_replaying, &COND_wsrep_replaying, NULL); +#endif return 0; } @@ -4106,7 +4251,12 @@ static int init_server_components() sql_print_warning("You need to use --log-bin to make " "--log-slave-updates work."); } + +#ifdef WITH_WSREP + if (!WSREP_ON && !opt_bin_log && binlog_format_used) +#else if (!opt_bin_log && binlog_format_used) +#endif sql_print_warning("You need to use --log-bin to make " "--binlog-format work."); @@ -4131,6 +4281,31 @@ will be ignored as the --log-bin option is not defined."); } #endif +/* WSREP BEFORE */ +#ifdef WITH_WSREP + // add basedir/bin to PATH to resolve wsrep script names + char* const tmp_path((char*)alloca(strlen(mysql_home) + strlen("/bin") + 1)); + if (tmp_path) + { + strcpy(tmp_path, mysql_home); + strcat(tmp_path, "/bin"); + wsrep_prepend_PATH(tmp_path); + } + else + { + WSREP_ERROR("Could not append %s/bin to PATH", mysql_home); + } + + if (opt_bootstrap) + { + wsrep_provider_init(WSREP_NONE); + if (wsrep_init()) unireg_abort(1); + } + else if (!wsrep_recovery && wsrep_init_first()) + { + wsrep_init_startup(true); + } +#endif /* WITH_WSREP */ if (opt_bin_log) { /* Reports an error and aborts, if the --log-bin's path @@ -4336,11 +4511,30 @@ a file name for --log-bin-index option", opt_binlog_index_name); internal_tmp_table_max_key_segments= myisam_max_key_segments(); #endif +#ifdef WITH_WSREP + if (!opt_bin_log) + { + wsrep_emulate_bin_log= 1; + } +#endif + tc_log= (total_ha_2pc > 1 ? (opt_bin_log ? (TC_LOG *) &mysql_bin_log : +#ifdef WITH_WSREP + (WSREP_ON ? + (TC_LOG *) &tc_log_dummy : + (TC_LOG *) &tc_log_mmap)) : +#else (TC_LOG *) &tc_log_mmap) : - (TC_LOG *) &tc_log_dummy); - +#endif + (TC_LOG *) &tc_log_dummy); +#ifdef WITH_WSREP + WSREP_DEBUG("Initial TC log open: %s", + (tc_log == &mysql_bin_log) ? "binlog" : + (tc_log == &tc_log_mmap) ? "mmap" : + (tc_log == &tc_log_dummy) ? "dummy" : "unknown" + ); +#endif if (tc_log->open(opt_bin_log ? opt_bin_logname : opt_tc_log_file)) { sql_print_error("Can't init tc log"); @@ -4397,8 +4591,6 @@ a file name for --log-bin-index option", opt_binlog_index_name); init_global_client_stats(); DBUG_RETURN(0); } - - #ifndef EMBEDDED_LIBRARY static void create_shutdown_thread() @@ -4417,6 +4609,397 @@ static void create_shutdown_thread() #endif /* EMBEDDED_LIBRARY */ +#ifdef WITH_WSREP +typedef void (*wsrep_thd_processor_fun)(THD *); + +pthread_handler_t start_wsrep_THD(void *arg) +{ + THD *thd; + wsrep_thd_processor_fun processor= (wsrep_thd_processor_fun)arg; + + DBUG_ENTER("start_wsrep_THD"); + if (my_thread_init()) + { + WSREP_ERROR("Could not initialize thread"); + DBUG_RETURN(NULL); + } + + if (!(thd= new THD(true))) + { + DBUG_RETURN(NULL); + } + mysql_mutex_lock(&LOCK_thread_count); + thd->thread_id=thread_id++; + + thd->real_id=pthread_self(); // Keep purify happy + thread_count++; + thread_created++; + threads.append(thd); + + my_net_init(&thd->net,(st_vio*) 0); + + DBUG_PRINT("wsrep",(("creating thread %lld"), (long long)thd->thread_id)); + thd->prior_thr_create_utime= thd->start_utime= microsecond_interval_timer(); + (void) mysql_mutex_unlock(&LOCK_thread_count); + + /* from bootstrap()... */ + thd->bootstrap=1; + thd->max_client_packet_length= thd->net.max_packet; + thd->security_ctx->master_access= ~(ulong)0; + + /* from handle_one_connection... */ + pthread_detach_this_thread(); + + mysql_thread_set_psi_id(thd->thread_id); + thd->thr_create_utime= microsecond_interval_timer(); + if (MYSQL_CALLBACK_ELSE(thread_scheduler, init_new_connection_thread, (), 0)) + { + close_connection(thd, ER_OUT_OF_RESOURCES, 1); + statistic_increment(aborted_connects,&LOCK_status); + MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0)); + + DBUG_RETURN(NULL); + } + +// </5.1.17> + /* + handle_one_connection() is normally the only way a thread would + start and would always be on the very high end of the stack , + therefore, the thread stack always starts at the address of the + first local variable of handle_one_connection, which is thd. We + need to know the start of the stack so that we could check for + stack overruns. + */ + DBUG_PRINT("wsrep", ("handle_one_connection called by thread %lld\n", + (long long)thd->thread_id)); + /* now that we've called my_thread_init(), it is safe to call DBUG_* */ + + thd->thread_stack= (char*) &thd; + if (thd->store_globals()) + { + close_connection(thd, ER_OUT_OF_RESOURCES, 1); + statistic_increment(aborted_connects,&LOCK_status); + MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0)); + delete thd; + + DBUG_RETURN(NULL); + } + + thd->system_thread= SYSTEM_THREAD_SLAVE_SQL; + thd->security_ctx->skip_grants(); + + /* handle_one_connection() again... */ + //thd->version= refresh_version; + thd->proc_info= 0; + thd->command= COM_SLEEP; + thd->set_time(); + thd->init_for_queries(); + + mysql_mutex_lock(&LOCK_connection_count); + ++connection_count; + mysql_mutex_unlock(&LOCK_connection_count); + + mysql_mutex_lock(&LOCK_thread_count); + wsrep_running_threads++; + mysql_cond_signal(&COND_thread_count); + mysql_mutex_unlock(&LOCK_thread_count); + + processor(thd); + + close_connection(thd, 0, 1); + + mysql_mutex_lock(&LOCK_thread_count); + wsrep_running_threads--; + mysql_cond_signal(&COND_thread_count); + mysql_mutex_unlock(&LOCK_thread_count); + + // Note: We can't call THD destructor without crashing + // if plugins have not been initialized. However, in most of the + // cases this means that pre SE initialization SST failed and + // we are going to exit anyway. + if (plugins_are_initialized) + { + net_end(&thd->net); + MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 1)); + } + else + { + // TODO: lightweight cleanup to get rid of: + // 'Error in my_thread_global_end(): 2 threads didn't exit' + // at server shutdown + } + DBUG_RETURN(NULL); +} + +void wsrep_create_rollbacker() +{ + if (wsrep_provider && strcasecmp(wsrep_provider, "none")) + { + pthread_t hThread; + /* create rollbacker */ + if (pthread_create( &hThread, &connection_attrib, + start_wsrep_THD, (void*)wsrep_rollback_process)) + WSREP_WARN("Can't create thread to manage wsrep rollback"); + } +} + +void wsrep_create_appliers(long threads) +{ + if (!wsrep_connected) + { + /* see wsrep_replication_start() for the logic */ + if (wsrep_cluster_address && strlen(wsrep_cluster_address) && + wsrep_provider && strcasecmp(wsrep_provider, "none")) + { + WSREP_ERROR("Trying to launch slave threads before creating " + "connection at '%s'", wsrep_cluster_address); + assert(0); + } + return; + } + + long wsrep_threads=0; + pthread_t hThread; + while (wsrep_threads++ < threads) { + if (pthread_create( + &hThread, &connection_attrib, + start_wsrep_THD, (void*)wsrep_replication_process)) + WSREP_WARN("Can't create thread to manage wsrep replication"); + } +} +/**/ +static bool abort_replicated(THD *thd) +{ + bool ret_code= false; + if (thd->wsrep_query_state== QUERY_COMMITTING) + { + if (wsrep_debug) WSREP_INFO("aborting replicated trx: %lu", thd->real_id); + + (void)wsrep_abort_thd(thd, thd, TRUE); + ret_code= true; + } + return ret_code; +} +/**/ +static inline bool is_client_connection(THD *thd) +{ +#if REMOVE +// REMOVE THIS LATER (lp:777201). Below we had to add an explicit check for +// wsrep_applier since wsrep_exec_mode didn't seem to always work +if (thd->wsrep_applier && thd->wsrep_exec_mode != REPL_RECV) +WSREP_WARN("applier has wsrep_exec_mode = %d", thd->wsrep_exec_mode); + + if ( thd->slave_thread || /* declared as mysql slave */ + thd->system_thread || /* declared as system thread */ + !thd->vio_ok() || /* server internal thread */ + thd->wsrep_exec_mode==REPL_RECV || /* applier or replaying thread */ + thd->wsrep_applier || /* wsrep slave applier */ + !thd->variables.wsrep_on) /* client, but fenced outside wsrep */ + return false; + + return true; +#else + return (thd->wsrep_client_thread && thd->variables.wsrep_on); +#endif /* REMOVE */ +} + +static bool have_client_connections() +{ + THD *tmp; + + I_List_iterator<THD> it(threads); + while ((tmp=it++)) + { + DBUG_PRINT("quit",("Informing thread %ld that it's time to die", + tmp->thread_id)); + if (is_client_connection(tmp) && tmp->killed == KILL_CONNECTION) + { + (void)abort_replicated(tmp); + return true; + } + } + return false; +} + +/* + returns the number of wsrep appliers running. + However, the caller (thd parameter) is not taken in account + */ +static int have_wsrep_appliers(THD *thd) +{ + int ret= 0; + THD *tmp; + + I_List_iterator<THD> it(threads); + while ((tmp=it++)) + { + ret+= (tmp != thd && tmp->wsrep_applier); + } + return ret; +} + +static void wsrep_close_thread(THD *thd) +{ + thd->killed= KILL_CONNECTION; + MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd)); + if (thd->mysys_var) + { + thd->mysys_var->abort=1; + mysql_mutex_lock(&thd->mysys_var->mutex); + if (thd->mysys_var->current_cond) + { + mysql_mutex_lock(thd->mysys_var->current_mutex); + mysql_cond_broadcast(thd->mysys_var->current_cond); + mysql_mutex_unlock(thd->mysys_var->current_mutex); + } + mysql_mutex_unlock(&thd->mysys_var->mutex); + } +} + +void wsrep_close_client_connections(my_bool wait_to_end) +{ + /* + First signal all threads that it's time to die + */ + + THD *tmp; + mysql_mutex_lock(&LOCK_thread_count); // For unlink from list + + bool kill_cached_threads_saved= kill_cached_threads; + kill_cached_threads= true; // prevent future threads caching + mysql_cond_broadcast(&COND_thread_cache); // tell cached threads to die + + I_List_iterator<THD> it(threads); + while ((tmp=it++)) + { + DBUG_PRINT("quit",("Informing thread %ld that it's time to die", + tmp->thread_id)); + /* We skip slave threads & scheduler on this first loop through. */ + if (!is_client_connection(tmp)) + continue; + + /* replicated transactions must be skipped */ + if (abort_replicated(tmp)) + continue; + + WSREP_DEBUG("closing connection %ld", tmp->thread_id); + wsrep_close_thread(tmp); + } + mysql_mutex_unlock(&LOCK_thread_count); + + if (thread_count) + sleep(2); // Give threads time to die + + mysql_mutex_lock(&LOCK_thread_count); + /* + Force remaining threads to die by closing the connection to the client + */ + + I_List_iterator<THD> it2(threads); + while ((tmp=it2++)) + { +#ifndef __bsdi__ // Bug in BSDI kernel + if (is_client_connection(tmp) && !abort_replicated(tmp)) + { + WSREP_INFO("SST kill local trx: %ld",tmp->thread_id); + close_connection(tmp,0,0); + } +#endif + } + + DBUG_PRINT("quit",("Waiting for threads to die (count=%u)",thread_count)); + if (wsrep_debug) + WSREP_INFO("waiting for client connections to close: %u", thread_count); + + while (wait_to_end && have_client_connections()) + { + mysql_cond_wait(&COND_thread_count, &LOCK_thread_count); + DBUG_PRINT("quit",("One thread died (count=%u)", thread_count)); + } + + kill_cached_threads= kill_cached_threads_saved; + + mysql_mutex_unlock(&LOCK_thread_count); + + /* All client connection threads have now been aborted */ +} + +void wsrep_close_applier(THD *thd) +{ + if (wsrep_debug) + WSREP_INFO("closing applier %ld", thd->thread_id); + + wsrep_close_thread(thd); +} + +static void wsrep_close_threads(THD *thd) +{ + THD *tmp; + mysql_mutex_lock(&LOCK_thread_count); // For unlink from list + + I_List_iterator<THD> it(threads); + while ((tmp=it++)) + { + DBUG_PRINT("quit",("Informing thread %ld that it's time to die", + tmp->thread_id)); + /* We skip slave threads & scheduler on this first loop through. */ + if (tmp->wsrep_applier && tmp != thd) + { + if (wsrep_debug) + WSREP_INFO("closing wsrep thread %ld", tmp->thread_id); + wsrep_close_thread (tmp); + + } + } + + mysql_mutex_unlock(&LOCK_thread_count); +} + +void wsrep_wait_appliers_close(THD *thd) +{ + /* Wait for wsrep appliers to gracefully exit */ + mysql_mutex_lock(&LOCK_thread_count); + while (have_wsrep_appliers(thd) > 1) + // 1 is for rollbacker thread which needs to be killed explicitly. + // This gotta be fixed in a more elegant manner if we gonna have arbitrary + // number of non-applier wsrep threads. + { + mysql_cond_wait(&COND_thread_count,&LOCK_thread_count); + DBUG_PRINT("quit",("One applier died (count=%u)",thread_count)); + } + mysql_mutex_unlock(&LOCK_thread_count); + /* Now kill remaining wsrep threads: rollbacker */ + wsrep_close_threads (thd); + /* and wait for them to die */ + mysql_mutex_lock(&LOCK_thread_count); + while (have_wsrep_appliers(thd) > 0) + { + mysql_cond_wait(&COND_thread_count,&LOCK_thread_count); + DBUG_PRINT("quit",("One thread died (count=%u)",thread_count)); + } + mysql_mutex_unlock(&LOCK_thread_count); + + /* All wsrep applier threads have now been aborted. However, if this thread + is also applier, we are still running... + */ +} + +void wsrep_kill_mysql(THD *thd) +{ + if (mysqld_server_started) + { + if (!shutdown_in_progress) + { + WSREP_INFO("starting shutdown"); + kill_mysql(); + } + } + else + { + unireg_abort(1); + } +} +#endif /* WITH_WSREP */ #if (defined(_WIN32) || defined(HAVE_SMEM)) && !defined(EMBEDDED_LIBRARY) static void handle_connections_methods() @@ -4873,6 +5456,33 @@ int mysqld_main(int argc, char **argv) if (Events::init(opt_noacl || opt_bootstrap)) unireg_abort(1); +/* WSREP AFTER */ +#ifdef WITH_WSREP + wsrep_SE_initialized(); + if (opt_bootstrap) + { + /*! bootstrap wsrep init was taken care of above */ + } + else if (wsrep_recovery) + { + select_thread_in_use= 0; + wsrep_recover(); + unireg_abort(0); + } + else if (wsrep_init_first()) + { + /*! in case of no SST wsrep waits in view handler callback */ + wsrep_SE_init_grab(); + wsrep_SE_init_done(); + /*! in case of SST wsrep waits for wsrep->sst_received */ + wsrep_sst_continue(); + } + else + { + wsrep_init_startup (false); + } + wsrep_create_appliers(wsrep_slave_threads - 1); +#endif /* WITH_WSREP */ if (opt_bootstrap) { select_thread_in_use= 0; // Allow 'kill' to work @@ -4930,6 +5540,9 @@ int mysqld_main(int argc, char **argv) #ifdef EXTRA_DEBUG2 sql_print_error("Before Lock_thread_count"); #endif +#ifdef WITH_WSREP + WSREP_DEBUG("Before Lock_thread_count"); +#endif mysql_mutex_lock(&LOCK_thread_count); DBUG_PRINT("quit", ("Got thread_count mutex")); select_thread_in_use=0; // For close_connections @@ -5196,6 +5809,9 @@ static void bootstrap(MYSQL_FILE *file) DBUG_ENTER("bootstrap"); THD *thd= new THD; +#ifdef WITH_WSREP + thd->variables.wsrep_on= 0; +#endif thd->bootstrap=1; my_net_init(&thd->net,(st_vio*) 0); thd->max_client_packet_length= thd->net.max_packet; @@ -5320,7 +5936,11 @@ void create_thread_to_handle_connection(THD *thd) my_snprintf(error_message_buff, sizeof(error_message_buff), ER_THD(thd, ER_CANT_CREATE_THREAD), error); net_send_error(thd, ER_CANT_CREATE_THREAD, error_message_buff, NULL); +#ifdef WITH_WSREP + close_connection(thd, ER_OUT_OF_RESOURCES ,0); +#else close_connection(thd, ER_OUT_OF_RESOURCES); +#endif /* WITH_WSREP */ mysql_mutex_lock(&LOCK_thread_count); delete thd; mysql_mutex_unlock(&LOCK_thread_count); @@ -5363,7 +5983,11 @@ static void create_new_thread(THD *thd) mysql_mutex_unlock(&LOCK_connection_count); DBUG_PRINT("error",("Too many connections")); + #ifdef WITH_WSREP + close_connection(thd, ER_CON_COUNT_ERROR, 1); +#else close_connection(thd, ER_CON_COUNT_ERROR); +#endif /* WITH_WSREP */ statistic_increment(denied_connections, &LOCK_status); delete thd; DBUG_VOID_RETURN; @@ -5744,7 +6368,11 @@ pthread_handler_t handle_connections_namedpipes(void *arg) if (!(thd->net.vio= vio_new_win32pipe(hConnectedPipe)) || my_net_init(&thd->net, thd->net.vio)) { +#ifdef WITH_WSREP + close_connection(thd, ER_OUT_OF_RESOURCES, 1); +#else close_connection(thd, ER_OUT_OF_RESOURCES); +#endif delete thd; continue; } @@ -5939,7 +6567,11 @@ pthread_handler_t handle_connections_shared_memory(void *arg) event_conn_closed)) || my_net_init(&thd->net, thd->net.vio)) { +#ifdef WITH_WSREP + close_connection(thd, ER_OUT_OF_RESOURCES, 1); +#else close_connection(thd, ER_OUT_OF_RESOURCES); +#endif errmsg= 0; goto errorconn; } @@ -7022,6 +7654,19 @@ SHOW_VAR status_vars[]= { #ifdef ENABLED_PROFILING {"Uptime_since_flush_status",(char*) &show_flushstatustime, SHOW_FUNC}, #endif +#ifdef WITH_WSREP + {"wsrep_connected", (char*) &wsrep_connected, SHOW_BOOL}, + {"wsrep_ready", (char*) &wsrep_ready, SHOW_BOOL}, + {"wsrep_cluster_state_uuid", (char*) &wsrep_cluster_state_uuid,SHOW_CHAR_PTR}, + {"wsrep_cluster_conf_id", (char*) &wsrep_cluster_conf_id, SHOW_LONGLONG}, + {"wsrep_cluster_status", (char*) &wsrep_cluster_status, SHOW_CHAR_PTR}, + {"wsrep_cluster_size", (char*) &wsrep_cluster_size, SHOW_LONG}, + {"wsrep_local_index", (char*) &wsrep_local_index, SHOW_LONG}, + {"wsrep_provider_name", (char*) &wsrep_provider_name, SHOW_CHAR_PTR}, + {"wsrep_provider_version", (char*) &wsrep_provider_version, SHOW_CHAR_PTR}, + {"wsrep_provider_vendor", (char*) &wsrep_provider_vendor, SHOW_CHAR_PTR}, + {"wsrep", (char*) &wsrep_show_status, SHOW_FUNC}, +#endif {NullS, NullS, SHOW_LONG} }; @@ -7345,6 +7990,10 @@ static int mysql_init_variables(void) tmpenv = DEFAULT_MYSQL_HOME; (void) strmake(mysql_home, tmpenv, sizeof(mysql_home)-1); #endif +#ifdef WITH_WSREP + if (WSREP_ON && wsrep_init_vars()) + return 1; +#endif return 0; } @@ -7620,6 +8269,23 @@ mysqld_get_one_option(int optid, case OPT_LOWER_CASE_TABLE_NAMES: lower_case_table_names_used= 1; break; +#ifdef WITH_WSREP + case OPT_WSREP_PROVIDER: + wsrep_provider_init (argument); + break; + case OPT_WSREP_PROVIDER_OPTIONS: + wsrep_provider_options_init (argument); + break; + case OPT_WSREP_CLUSTER_ADDRESS: + wsrep_cluster_address_init (argument); + break; + case OPT_WSREP_START_POSITION: + wsrep_start_position_init (argument); + break; + case OPT_WSREP_SST_AUTH: + wsrep_sst_auth_init (argument); + break; +#endif #if defined(ENABLED_DEBUG_SYNC) case OPT_DEBUG_SYNC_TIMEOUT: /* diff --git a/sql/mysqld.h b/sql/mysqld.h index 988584f779a..99093e1e83b 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -53,7 +53,11 @@ typedef Bitmap<((MAX_INDEXES+7)/8*8)> key_map; /* Used for finding keys */ some places */ /* Function prototypes */ void kill_mysql(void); +#ifdef WITH_WSREP +void close_connection(THD *thd, uint sql_errno= 0, bool lock=1); +#else void close_connection(THD *thd, uint sql_errno= 0); +#endif void handle_connection_in_main_thread(THD *thd); void create_thread_to_handle_connection(THD *thd); void unlink_thd(THD *thd); @@ -219,6 +223,10 @@ extern pthread_key(MEM_ROOT**,THR_MALLOC); extern PSI_mutex_key key_PAGE_lock, key_LOCK_sync, key_LOCK_active, key_LOCK_pool; #endif /* HAVE_MMAP */ +#ifdef WITH_WSREP +extern PSI_mutex_key key_LOCK_wsrep_thd; +extern PSI_cond_key key_COND_wsrep_thd; +#endif /* HAVE_MMAP */ #ifdef HAVE_OPENSSL extern PSI_mutex_key key_LOCK_des_key_file; @@ -406,6 +414,14 @@ enum options_mysqld OPT_WANT_CORE, OPT_ENGINE_CONDITION_PUSHDOWN, OPT_LOG_ERROR, +#ifdef WITH_WSREP + OPT_WSREP_PROVIDER, + OPT_WSREP_PROVIDER_OPTIONS, + OPT_WSREP_CLUSTER_ADDRESS, + OPT_WSREP_START_POSITION, + OPT_WSREP_SST_AUTH, + OPT_WSREP_RECOVER, +#endif OPT_MAX_LONG_DATA_SIZE }; #endif @@ -554,4 +570,5 @@ extern uint internal_tmp_table_max_key_segments; extern uint volatile global_disable_checkpoint; extern my_bool opt_help; + #endif /* MYSQLD_INCLUDED */ diff --git a/sql/protocol.cc b/sql/protocol.cc index 63b945f7078..4a7ea88c402 100644 --- a/sql/protocol.cc +++ b/sql/protocol.cc @@ -485,6 +485,14 @@ static uchar *net_store_length_fast(uchar *packet, uint length) void Protocol::end_statement() { +#ifdef WITH_WSREP + /*sanity check, can be removed before 1.0 release */ + if (WSREP(thd) && thd->wsrep_conflict_state== REPLAYING) + { + WSREP_ERROR("attempting net_end_statement while replaying"); + return; + } +#endif DBUG_ENTER("Protocol::end_statement"); DBUG_ASSERT(! thd->stmt_da->is_sent); bool error= FALSE; diff --git a/sql/set_var.h b/sql/set_var.h index c074f3f4399..85faaaadd73 100644 --- a/sql/set_var.h +++ b/sql/set_var.h @@ -235,6 +235,9 @@ public: int check(THD *thd); int update(THD *thd); int light_check(THD *thd); +#ifdef WITH_WSREP + int wsrep_store_variable(THD *thd); +#endif }; @@ -315,6 +318,9 @@ extern sys_var *Sys_autocommit_ptr; CHARSET_INFO *get_old_charset_by_name(const char *old_name); +#ifdef WITH_WSREP +int sql_set_wsrep_variables(THD *thd, List<set_var_base> *var_list); +#endif int sys_var_init(); int sys_var_add_options(DYNAMIC_ARRAY *long_options, int parse_flags); void sys_var_end(void); diff --git a/sql/slave.cc b/sql/slave.cc index 56f9c14703c..833820203b0 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -53,6 +53,9 @@ // Create_file_log_event, // Format_description_log_event +#ifdef WITH_WSREP +#include "wsrep_mysqld.h" +#endif #ifdef HAVE_REPLICATION #include "rpl_tblmap.h" @@ -3487,6 +3490,11 @@ pthread_handler_t handle_slave_sql(void *arg) #endif DBUG_ASSERT(rli->sql_thd == thd); +#ifdef WITH_WSREP + thd->wsrep_exec_mode= LOCAL_STATE; + /* synchronize with wsrep replication */ + wsrep_ready_wait (); +#endif DBUG_PRINT("master_info",("log_file_name: %s position: %s", rli->group_master_log_name, llstr(rli->group_master_log_pos,llbuff))); diff --git a/sql/sp.cc b/sql/sp.cc index 29195234a5a..700a8c54fbf 100644 --- a/sql/sp.cc +++ b/sql/sp.cc @@ -2277,3 +2277,37 @@ sp_load_for_information_schema(THD *thd, TABLE *proc_table, String *db, thd->lex= old_lex; return sp; } +#ifdef WITH_WSREP +int wsrep_create_sp(THD *thd, uchar** buf, uint* buf_len) +{ + String log_query; + sp_head *sp = thd->lex->sphead; + ulong saved_mode= thd->variables.sql_mode; + String retstr(64); + retstr.set_charset(system_charset_info); + + log_query.set_charset(system_charset_info); + + if (sp->m_type == TYPE_ENUM_FUNCTION) + { + sp_returns_type(thd, retstr, sp); + } + + if (!create_string(thd, &log_query, + sp->m_type, + (sp->m_explicit_name ? sp->m_db.str : NULL), + (sp->m_explicit_name ? sp->m_db.length : 0), + sp->m_name.str, sp->m_name.length, + sp->m_params.str, sp->m_params.length, + retstr.c_ptr(), retstr.length(), + sp->m_body.str, sp->m_body.length, + sp->m_chistics, &(thd->lex->definer->user), + &(thd->lex->definer->host), + saved_mode)) + { + WSREP_WARN("SP create string failed: %s", thd->query()); + return 1; + } + return wsrep_to_buf_helper(thd, log_query.ptr(), log_query.length(), buf, buf_len); +} +#endif /* WITH_WSREP */ diff --git a/sql/sql_alter.cc b/sql/sql_alter.cc index c6c02773286..c4468ee8793 100644 --- a/sql/sql_alter.cc +++ b/sql/sql_alter.cc @@ -17,7 +17,9 @@ #include "sql_table.h" // mysql_alter_table, // mysql_exchange_partition #include "sql_alter.h" - +#ifdef WITH_WSREP +#include "wsrep_mysqld.h" +#endif /* WITH_WSREP */ bool Alter_table_statement::execute(THD *thd) { LEX *lex= thd->lex; @@ -97,6 +99,17 @@ bool Alter_table_statement::execute(THD *thd) thd->enable_slow_log= opt_log_slow_admin_statements; +#ifdef WITH_WSREP +TABLE *find_temporary_table(THD *thd, const TABLE_LIST *tl); + + if ((!thd->is_current_stmt_binlog_format_row() || + !find_temporary_table(thd, first_table)) && + wsrep_to_isolation_begin(thd, first_table->db, first_table->table_name)) + { + WSREP_WARN("ALTER TABLE isolation failure"); + DBUG_RETURN(TRUE); + } +#endif /* WITH_WSREP */ result= mysql_alter_table(thd, select_lex->db, lex->name.str, &create_info, first_table, @@ -105,5 +118,7 @@ bool Alter_table_statement::execute(THD *thd) select_lex->order_list.first, lex->ignore, lex->online); +#ifdef WITH_WSREP +#endif /* WITH_WSREP */ DBUG_RETURN(result); } diff --git a/sql/sql_base.cc b/sql/sql_base.cc index 9298d3ccb72..d0404693405 100644 --- a/sql/sql_base.cc +++ b/sql/sql_base.cc @@ -60,6 +60,9 @@ #include <io.h> #endif +#ifdef WITH_WSREP +#include "wsrep_mysqld.h" +#endif // WITH_WSREP bool No_such_table_error_handler::handle_condition(THD *, @@ -4218,7 +4221,7 @@ thr_lock_type read_lock_type_for_table(THD *thd, */ bool log_on= mysql_bin_log.is_open() && thd->variables.sql_log_bin; ulong binlog_format= thd->variables.binlog_format; - if ((log_on == FALSE) || (binlog_format == BINLOG_FORMAT_ROW) || + if ((log_on == FALSE) || (WSREP_FORMAT(binlog_format) == BINLOG_FORMAT_ROW) || (table_list->table->s->table_category == TABLE_CATEGORY_LOG) || (table_list->table->s->table_category == TABLE_CATEGORY_PERFORMANCE) || !(is_update_query(prelocking_ctx->sql_command) || @@ -5075,7 +5078,14 @@ restart: } err: +#ifdef WITH_WSREP + if (WSREP(thd)) + thd_proc_info(thd, "exit open_tables()"); + else + thd_proc_info(thd, 0); +#else /* WITH_WSREP */ thd_proc_info(thd, 0); +#endif /* WITH_WSREP */ free_root(&new_frm_mem, MYF(0)); // Free pre-alloced block if (error && *table_to_open) @@ -5518,7 +5528,14 @@ end: trans_rollback_stmt(thd); close_thread_tables(thd); } +#ifdef WITH_WSREP + if (WSREP(thd)) + thd_proc_info(thd, "End opening table"); + else + thd_proc_info(thd, 0); +#else /* WITH_WSREP */ thd_proc_info(thd, 0); +#endif /* WITH_WSREP */ DBUG_RETURN(table); } @@ -5738,7 +5755,7 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, We can solve these problems in mixed mode by switching to binlogging if at least one updated table is used by sub-statement */ - if (thd->variables.binlog_format != BINLOG_FORMAT_ROW && tables && + if (WSREP_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_ROW && tables && has_write_table_with_auto_increment(thd->lex->first_not_own_table())) thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_AUTOINC_COLUMNS); } @@ -9169,7 +9186,19 @@ bool mysql_notify_thread_having_shared_lock(THD *thd, THD *in_use, (e.g. see partitioning code). */ if (!thd_table->needs_reopen()) +#ifdef WITH_WSREP + { + signalled|= mysql_lock_abort_for_thread(thd, thd_table); + if (thd && WSREP(thd) && wsrep_thd_is_brute_force((void *)thd)) + { + WSREP_DEBUG("remove_table_from_cache: %llu", + (unsigned long long) thd->real_id); + wsrep_abort_thd((void *)thd, (void *)in_use, FALSE); + } + } +#else signalled|= mysql_lock_abort_for_thread(thd, thd_table); +#endif } mysql_mutex_unlock(&in_use->LOCK_thd_data); } diff --git a/sql/sql_builtin.cc.in b/sql/sql_builtin.cc.in index 63850650ac9..2de475b0a76 100644 --- a/sql/sql_builtin.cc.in +++ b/sql/sql_builtin.cc.in @@ -25,7 +25,11 @@ extern #endif builtin_maria_plugin @mysql_mandatory_plugins@ @mysql_optional_plugins@ - builtin_maria_binlog_plugin, builtin_maria_mysql_password_plugin; + builtin_maria_binlog_plugin, +#ifdef WITH_WSREP + builtin_wsrep_plugin@mysql_plugin_defs@, +#endif /* WITH_WSREP */ + builtin_maria_mysql_password_plugin; struct st_maria_plugin *mysql_optional_plugins[]= { @@ -35,5 +39,8 @@ struct st_maria_plugin *mysql_optional_plugins[]= struct st_maria_plugin *mysql_mandatory_plugins[]= { builtin_maria_binlog_plugin, builtin_maria_mysql_password_plugin, +#ifdef WITH_WSREP + builtin_wsrep_plugin@mysql_plugin_defs@, +#endif /* WITH_WSREP */ @mysql_mandatory_plugins@ 0 }; diff --git a/sql/sql_class.cc b/sql/sql_class.cc index d7d0c8d3f68..5b63201a910 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -62,6 +62,9 @@ #include "debug_sync.h" #include "sql_parse.h" // is_update_query #include "sql_callback.h" +#ifdef WITH_WSREP +#include "wsrep_mysqld.h" +#endif #include "sql_connect.h" /* @@ -695,6 +698,137 @@ char *thd_security_context(THD *thd, char *buffer, unsigned int length, return buffer; } +#ifdef WITH_WSREP +extern "C" int wsrep_on(void *thd) +{ + return (int)(WSREP(((THD*)thd))); +} +extern "C" bool wsrep_thd_is_wsrep_on(THD *thd) +{ + return thd->variables.wsrep_on; +} + +extern "C" bool wsrep_consistency_check(void *thd) +{ + return ((THD*)thd)->wsrep_consistency_check; +} + +extern "C" void wsrep_thd_set_exec_mode(THD *thd, enum wsrep_exec_mode mode) +{ + thd->wsrep_exec_mode= mode; +} +extern "C" void wsrep_thd_set_query_state( + THD *thd, enum wsrep_query_state state) +{ + thd->wsrep_query_state= state; +} +extern "C" void wsrep_thd_set_conflict_state( + THD *thd, enum wsrep_conflict_state state) +{ + thd->wsrep_conflict_state= state; +} + + +extern "C" enum wsrep_exec_mode wsrep_thd_exec_mode(THD *thd) +{ + return thd->wsrep_exec_mode; +} +extern "C" enum wsrep_query_state wsrep_thd_query_state(THD *thd) +{ + return thd->wsrep_query_state; +} +extern "C" enum wsrep_conflict_state wsrep_thd_conflict_state(THD *thd) +{ + return thd->wsrep_conflict_state; +} + +extern "C" wsrep_trx_handle_t* wsrep_thd_trx_handle(THD *thd) +{ + return &thd->wsrep_trx_handle; +} + +extern "C"void wsrep_thd_LOCK(THD *thd) +{ + mysql_mutex_lock(&thd->LOCK_wsrep_thd); +} +extern "C"void wsrep_thd_UNLOCK(THD *thd) +{ + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); +} +extern "C" time_t wsrep_thd_query_start(THD *thd) +{ + return thd->query_start(); +} +extern "C" uint32 wsrep_thd_wsrep_rand(THD *thd) +{ + return thd->wsrep_rand; +} +extern "C" my_thread_id wsrep_thd_thread_id(THD *thd) +{ + return thd->thread_id; +} +extern "C" wsrep_seqno_t wsrep_thd_trx_seqno(THD *thd) +{ + return thd->wsrep_trx_seqno; +} +extern "C" query_id_t wsrep_thd_query_id(THD *thd) +{ + return thd->query_id; +} +extern "C" char *wsrep_thd_query(THD *thd) +{ + return thd->query(); +} +extern "C" query_id_t wsrep_thd_wsrep_last_query_id(THD *thd) +{ + return thd->wsrep_last_query_id; +} +extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id) +{ + thd->wsrep_last_query_id= id; +} +extern "C" void wsrep_thd_awake(THD *thd, my_bool signal) +{ + if (signal) + { + mysql_mutex_lock(&thd->LOCK_thd_data); + thd->awake(KILL_QUERY); + mysql_mutex_unlock(&thd->LOCK_thd_data); + } + else + { + mysql_mutex_lock(&LOCK_wsrep_replaying); + mysql_cond_broadcast(&COND_wsrep_replaying); + mysql_mutex_unlock(&LOCK_wsrep_replaying); + } +} + +extern "C" int +wsrep_trx_order_before(void *thd1, void *thd2) +{ + if (((THD*)thd1)->wsrep_trx_seqno < ((THD*)thd2)->wsrep_trx_seqno) { + WSREP_DEBUG("BF conflict, order: %lld %lld\n", + (long long)((THD*)thd1)->wsrep_trx_seqno, + (long long)((THD*)thd2)->wsrep_trx_seqno); + return 1; + } + WSREP_DEBUG("waiting for BF, trx order: %lld %lld\n", + (long long)((THD*)thd1)->wsrep_trx_seqno, + (long long)((THD*)thd2)->wsrep_trx_seqno); + return 0; +} +extern "C" int +wsrep_trx_is_aborting(void *thd_ptr) +{ + if (thd_ptr) { + if ((((THD *)thd_ptr)->wsrep_conflict_state == MUST_ABORT) || + (((THD *)thd_ptr)->wsrep_conflict_state == ABORTING)) { + return 1; + } + } + return 0; +} +#endif /** Implementation of Drop_table_error_handler::handle_condition(). @@ -723,7 +857,11 @@ bool Drop_table_error_handler::handle_condition(THD *thd, } +#ifdef WITH_WSREP +THD::THD(bool is_applier) +#else THD::THD() +#endif :Statement(&main_lex, &main_mem_root, STMT_CONVENTIONAL_EXECUTION, /* statement id */ 0), rli_fake(0), @@ -750,6 +888,10 @@ THD::THD() bootstrap(0), derived_tables_processing(FALSE), spcont(NULL), +#ifdef WITH_WSREP + wsrep_applier(is_applier), + wsrep_client_thread(0), +#endif m_parser_state(NULL), #if defined(ENABLED_DEBUG_SYNC) debug_sync_control(0), @@ -849,6 +991,20 @@ THD::THD() command=COM_CONNECT; *scramble= '\0'; +#ifdef WITH_WSREP + mysql_mutex_init(key_LOCK_wsrep_thd, &LOCK_wsrep_thd, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_wsrep_thd, &COND_wsrep_thd, NULL); + wsrep_trx_handle.trx_id= -1; + wsrep_trx_handle.opaque= NULL; + //wsrep_retry_autocommit= ::wsrep_retry_autocommit; + wsrep_retry_counter= 0; + wsrep_PA_safe = true; + wsrep_seqno_changed= false; + wsrep_retry_query = NULL; + wsrep_retry_query_len = 0; + wsrep_retry_command = COM_CONNECT; + wsrep_consistency_check = false; +#endif /* Call to init() below requires fully initialized Open_tables_state. */ reset_open_tables_state(this); @@ -881,6 +1037,13 @@ THD::THD() my_rnd_init(&rand, tmp + (ulong) &rand, tmp + (ulong) ::global_query_id); substitute_null_with_insert_id = FALSE; thr_lock_info_init(&lock_info); /* safety: will be reset after start */ +#ifdef WITH_WSREP + lock_info.mysql_thd= (void *)this; + lock_info.in_lock_tables= false; +#ifdef WSREP_PROC_INFO + wsrep_info[sizeof(wsrep_info) - 1] = '\0'; /* make sure it is 0-terminated */ +#endif /* WSREP_PROC_INFO */ +#endif /* WITH_WSREP */ m_internal_handler= NULL; m_binlog_invoker= FALSE; @@ -1182,7 +1345,19 @@ void THD::init(void) reset_current_stmt_binlog_format_row(); bzero((char *) &status_var, sizeof(status_var)); bzero((char *) &org_status_var, sizeof(org_status_var)); - +#ifdef WITH_WSREP + wsrep_exec_mode= wsrep_applier ? REPL_RECV : LOCAL_STATE; + wsrep_conflict_state= NO_CONFLICT; + wsrep_query_state= QUERY_IDLE; + wsrep_last_query_id= 0; + wsrep_trx_seqno= 0; + wsrep_converted_lock_session= false; + wsrep_retry_counter= 0; + wsrep_rli= NULL; + wsrep_PA_safe= true; + wsrep_seqno_changed= false; + wsrep_consistency_check = false; +#endif if (variables.sql_log_bin) variables.option_bits|= OPTION_BIN_LOG; else @@ -1376,6 +1551,12 @@ THD::~THD() mysql_mutex_unlock(&LOCK_thd_data); add_to_status(&global_status_var, &status_var); +#ifdef WITH_WSREP + mysql_mutex_lock(&LOCK_wsrep_thd); + mysql_mutex_unlock(&LOCK_wsrep_thd); + mysql_mutex_destroy(&LOCK_wsrep_thd); + if (wsrep_rli) delete wsrep_rli; +#endif /* Close connection */ #ifndef EMBEDDED_LIBRARY if (net.vio) @@ -1790,6 +1971,13 @@ void THD::cleanup_after_query() /* reset table map for multi-table update */ table_map_for_update= 0; m_binlog_invoker= FALSE; +#ifdef WITH_WSREP + if (TOTAL_ORDER == wsrep_exec_mode) + { + wsrep_exec_mode = LOCAL_STATE; + } + //wsrep_trx_seqno = 0; +#endif /* WITH_WSREP */ DBUG_VOID_RETURN; } @@ -2205,6 +2393,13 @@ bool sql_exchange::escaped_given(void) bool select_send::send_result_set_metadata(List<Item> &list, uint flags) { bool res; +#ifdef WITH_WSREP + if (WSREP(thd) && thd->wsrep_retry_query) + { + WSREP_DEBUG("skipping select metadata"); + return FALSE; + } +#endif /* WITH_WSREP */ if (!(res= thd->protocol->send_result_set_metadata(&list, flags))) is_result_set_started= 1; return res; @@ -3911,8 +4106,13 @@ extern "C" int thd_non_transactional_update(const MYSQL_THD thd) extern "C" int thd_binlog_format(const MYSQL_THD thd) { +#ifdef WITH_WSREP + if (((WSREP(thd) && wsrep_emulate_bin_log) || mysql_bin_log.is_open()) && + (thd->variables.option_bits & OPTION_BIN_LOG)) +#else if (mysql_bin_log.is_open() && (thd->variables.option_bits & OPTION_BIN_LOG)) - return (int) thd->variables.binlog_format; +#endif + return (int) WSREP_FORMAT(thd->variables.binlog_format); else return BINLOG_FORMAT_UNSPEC; } @@ -4467,7 +4667,7 @@ int THD::decide_logging_format(TABLE_LIST *tables) binlog by filtering rules. */ if (mysql_bin_log.is_open() && (variables.option_bits & OPTION_BIN_LOG) && - !(variables.binlog_format == BINLOG_FORMAT_STMT && + !(WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_STMT && !binlog_filter->db_ok(db))) { /* @@ -4631,7 +4831,7 @@ int THD::decide_logging_format(TABLE_LIST *tables) */ my_error((error= ER_BINLOG_ROW_INJECTION_AND_STMT_ENGINE), MYF(0)); } - else if (variables.binlog_format == BINLOG_FORMAT_ROW && + else if (WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_ROW && sqlcom_can_generate_row_events(this)) { /* @@ -4660,7 +4860,7 @@ int THD::decide_logging_format(TABLE_LIST *tables) else { /* binlog_format = STATEMENT */ - if (variables.binlog_format == BINLOG_FORMAT_STMT) + if (WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_STMT) { if (lex->is_stmt_row_injection()) { @@ -4677,7 +4877,14 @@ int THD::decide_logging_format(TABLE_LIST *tables) 5. Error: Cannot modify table that uses a storage engine limited to row-logging when binlog_format = STATEMENT */ +#ifdef WITH_WSREP + if (!WSREP(this) || wsrep_exec_mode == LOCAL_STATE) + { +#endif /* WITH_WSREP */ my_error((error= ER_BINLOG_STMT_MODE_AND_ROW_ENGINE), MYF(0), ""); +#ifdef WITH_WSREP + } +#endif /* WITH_WSREP */ } else if (is_write && (unsafe_flags= lex->get_stmt_unsafe_flags()) != 0) { @@ -4725,7 +4932,7 @@ int THD::decide_logging_format(TABLE_LIST *tables) "and binlog_filter->db_ok(db) = %d", mysql_bin_log.is_open(), (variables.option_bits & OPTION_BIN_LOG), - variables.binlog_format, + WSREP_FORMAT(variables.binlog_format), binlog_filter->db_ok(db))); #endif @@ -4979,7 +5186,13 @@ int THD::binlog_write_row(TABLE* table, bool is_trans, MY_BITMAP const* cols, size_t colcnt, uchar const *record) { +#ifdef WITH_WSREP + DBUG_ASSERT(is_current_stmt_binlog_format_row() && + ((WSREP(this) && wsrep_emulate_bin_log) || + mysql_bin_log.is_open())); +#else DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open()); +#endif /* Pack records into format for transfer. We are allocating more @@ -5009,7 +5222,13 @@ int THD::binlog_update_row(TABLE* table, bool is_trans, const uchar *before_record, const uchar *after_record) { +#ifdef WITH_WSREP + DBUG_ASSERT(is_current_stmt_binlog_format_row() && + ((WSREP(this) && wsrep_emulate_bin_log) + || mysql_bin_log.is_open())); +#else DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open()); +#endif size_t const before_maxlen = max_row_length(table, before_record); size_t const after_maxlen = max_row_length(table, after_record); @@ -5054,7 +5273,13 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans, MY_BITMAP const* cols, size_t colcnt, uchar const *record) { +#ifdef WITH_WSREP + DBUG_ASSERT(is_current_stmt_binlog_format_row() && + ((WSREP(this) && wsrep_emulate_bin_log) + || mysql_bin_log.is_open())); +#else DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open()); +#endif /* Pack records into format for transfer. We are allocating more @@ -5085,7 +5310,11 @@ int THD::binlog_remove_pending_rows_event(bool clear_maps, { DBUG_ENTER("THD::binlog_remove_pending_rows_event"); +#ifdef WITH_WSREP + if (!(WSREP_EMULATE_BINLOG(this) || mysql_bin_log.is_open())) +#else if (!mysql_bin_log.is_open()) +#endif DBUG_RETURN(0); mysql_bin_log.remove_pending_rows_event(this, is_transactional); @@ -5104,7 +5333,11 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional) mode: it might be the case that we left row-based mode before flushing anything (e.g., if we have explicitly locked tables). */ +#ifdef WITH_WSREP + if (!(WSREP_EMULATE_BINLOG(this) || mysql_bin_log.is_open())) +#else if (!mysql_bin_log.is_open()) +#endif DBUG_RETURN(0); /* @@ -5224,8 +5457,12 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg, DBUG_ENTER("THD::binlog_query"); DBUG_PRINT("enter", ("qtype: %s query: '%-.*s'", show_query_type(qtype), (int) query_len, query_arg)); +#ifdef WITH_WSREP + DBUG_ASSERT(query_arg && (WSREP_EMULATE_BINLOG(this) + || mysql_bin_log.is_open())); +#else DBUG_ASSERT(query_arg && mysql_bin_log.is_open()); - +#endif /* If we are not in prelocked mode, mysql_unlock_tables() will be called after this binlog_query(), so we have to flush the pending diff --git a/sql/sql_class.h b/sql/sql_class.h index c6c46975076..96a67fccaef 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -20,6 +20,32 @@ #define SQL_CLASS_INCLUDED /* Classes in mysql */ +#ifdef WITH_WSREP +#include "../wsrep/wsrep_api.h" +//#include "wsrep_mysqld.h" + enum wsrep_exec_mode { + LOCAL_STATE, + REPL_RECV, + TOTAL_ORDER, + LOCAL_COMMIT, + }; + enum wsrep_query_state { + QUERY_IDLE, + QUERY_EXEC, + QUERY_COMMITTING, + QUERY_EXITING, + QUERY_ROLLINGBACK, + }; + enum wsrep_conflict_state { + NO_CONFLICT, + MUST_ABORT, + ABORTING, + ABORTED, + MUST_REPLAY, + REPLAYING, + RETRY_AUTOCOMMIT, + }; +#endif #ifdef USE_PRAGMA_INTERFACE #pragma interface /* gcc class implementation */ @@ -45,6 +71,15 @@ THR_LOCK_INFO */ +#ifdef WITH_WSREP +#include "wsrep_mysqld.h" +struct wsrep_thd_shadow { + ulonglong options; + enum wsrep_exec_mode wsrep_exec_mode; + Vio *vio; + ulong tx_isolation; +}; +#endif class Reprepare_observer; class Relay_log_info; @@ -572,6 +607,11 @@ typedef struct system_variables ulong wt_timeout_short, wt_deadlock_search_depth_short; ulong wt_timeout_long, wt_deadlock_search_depth_long; +#ifdef WITH_WSREP + my_bool wsrep_on; + my_bool wsrep_causal_reads; + ulong wsrep_retry_autocommit; +#endif double long_query_time_double; } SV; @@ -965,6 +1005,9 @@ struct st_savepoint { /** State of metadata locks before this savepoint was set. */ MDL_savepoint mdl_savepoint; }; +#ifdef WITH_WSREP +void wsrep_cleanup_transaction(THD *thd); // THD.transactions.cleanup calls it +#endif enum xa_states {XA_NOTR=0, XA_ACTIVE, XA_IDLE, XA_PREPARED, XA_ROLLBACK_ONLY}; extern const char *xa_state_names[]; @@ -1781,7 +1824,7 @@ public: int is_current_stmt_binlog_format_row() const { DBUG_ASSERT(current_stmt_binlog_format == BINLOG_FORMAT_STMT || current_stmt_binlog_format == BINLOG_FORMAT_ROW); - return current_stmt_binlog_format == BINLOG_FORMAT_ROW; + return (WSREP_FORMAT((ulong)current_stmt_binlog_format) == BINLOG_FORMAT_ROW); } private: @@ -1839,7 +1882,11 @@ public: */ CHANGED_TABLE_LIST* changed_tables; MEM_ROOT mem_root; // Transaction-life memory allocation pool +#ifdef WITH_WSREP + void cleanup(THD *thd) +#else void cleanup() +#endif { changed_tables= 0; savepoints= 0; @@ -1852,6 +1899,11 @@ public: if (!xid_state.rm_error) xid_state.xid.null(); free_root(&mem_root,MYF(MY_KEEP_PREALLOC)); +#ifdef WITH_WSREP + // Todo: convert into a plugin method + // wsrep's post-commit. LOCAL_COMMIT designates wsrep's commit was ok + if (WSREP(thd)) wsrep_cleanup_transaction(thd); +#endif /* WITH_WSREP */ } my_bool is_active() { @@ -2314,6 +2366,31 @@ public: query_id_t first_query_id; } binlog_evt_union; +#ifdef WITH_WSREP + const bool wsrep_applier; /* dedicated slave applier thread */ + bool wsrep_client_thread; /* to identify client threads*/ + enum wsrep_exec_mode wsrep_exec_mode; + query_id_t wsrep_last_query_id; + enum wsrep_query_state wsrep_query_state; + enum wsrep_conflict_state wsrep_conflict_state; + mysql_mutex_t LOCK_wsrep_thd; + mysql_cond_t COND_wsrep_thd; + wsrep_seqno_t wsrep_trx_seqno; + uint32 wsrep_rand; + Relay_log_info* wsrep_rli; + bool wsrep_converted_lock_session; + wsrep_trx_handle_t wsrep_trx_handle; + bool wsrep_seqno_changed; +#ifdef WSREP_PROC_INFO + char wsrep_info[128]; /* string for dynamic proc info */ +#endif /* WSREP_PROC_INFO */ + ulong wsrep_retry_counter; // of autocommit + bool wsrep_PA_safe; + char* wsrep_retry_query; + size_t wsrep_retry_query_len; + enum enum_server_command wsrep_retry_command; + bool wsrep_consistency_check; +#endif /* WITH_WSREP */ /** Internal parser state. Note that since the parser is not re-entrant, we keep only one parser @@ -2345,7 +2422,11 @@ public: /* Debug Sync facility. See debug_sync.cc. */ struct st_debug_sync_control *debug_sync_control; #endif /* defined(ENABLED_DEBUG_SYNC) */ +#ifdef WITH_WSREP + THD(bool is_applier = false); +#else THD(); +#endif ~THD(); void init(void); @@ -2741,7 +2822,7 @@ public: tests fail and so force them to propagate the lex->binlog_row_based_if_mixed upwards to the caller. */ - if ((variables.binlog_format == BINLOG_FORMAT_MIXED) && + if ((WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_MIXED) && (in_sub_stmt == 0)) set_current_stmt_binlog_format_row(); @@ -2783,7 +2864,7 @@ public: show_system_thread(system_thread))); if (in_sub_stmt == 0) { - if (variables.binlog_format == BINLOG_FORMAT_ROW) + if (WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_ROW) set_current_stmt_binlog_format_row(); else if (temporary_tables == NULL) clear_current_stmt_binlog_format_row(); diff --git a/sql/sql_connect.cc b/sql/sql_connect.cc index 7157b1b109f..e96068484eb 100644 --- a/sql/sql_connect.cc +++ b/sql/sql_connect.cc @@ -44,6 +44,9 @@ HASH global_index_stats; extern mysql_mutex_t LOCK_global_user_client_stats; extern mysql_mutex_t LOCK_global_table_stats; extern mysql_mutex_t LOCK_global_index_stats; +#ifdef WITH_WSREP +#include "wsrep_mysqld.h" +#endif /* Get structure for logging connection data for the current user @@ -97,6 +100,9 @@ int get_or_create_user_conn(THD *thd, const char *user, } thd->user_connect=uc; uc->connections++; +#ifdef WITH_WSREP + thd->wsrep_client_thread= 1; +#endif /* WITH_WSREP */ end: mysql_mutex_unlock(&LOCK_user_conn); return return_val; @@ -977,7 +983,11 @@ bool setup_connection_thread_globals(THD *thd) { if (thd->store_globals()) { +#ifdef WITH_WSREP + close_connection(thd, ER_OUT_OF_RESOURCES, 1); +#else close_connection(thd, ER_OUT_OF_RESOURCES); +#endif statistic_increment(aborted_connects,&LOCK_status); MYSQL_CALLBACK(thd->scheduler, end_thread, (thd, 0)); return 1; // Error @@ -1050,6 +1060,17 @@ bool login_connection(THD *thd) void end_connection(THD *thd) { NET *net= &thd->net; +#ifdef WITH_WSREP + if (WSREP(thd)) + { + wsrep_status_t rcode= wsrep->free_connection(wsrep, thd->thread_id); + if (rcode) { + WSREP_WARN("wsrep failed to free connection context: %lu, code: %d", + thd->thread_id, rcode); + } + } + thd->wsrep_client_thread= 0; +#endif plugin_thdvar_cleanup(thd); if (thd->user_connect) @@ -1205,7 +1226,11 @@ void do_handle_one_connection(THD *thd_arg) if (MYSQL_CALLBACK_ELSE(thd->scheduler, init_new_connection_thread, (), 0)) { +#ifdef WITH_WSREP + close_connection(thd, ER_OUT_OF_RESOURCES, 1); +#else close_connection(thd, ER_OUT_OF_RESOURCES); +#endif statistic_increment(aborted_connects,&LOCK_status); MYSQL_CALLBACK(thd->scheduler, end_thread, (thd, 0)); return; @@ -1254,9 +1279,21 @@ void do_handle_one_connection(THD *thd_arg) break; } end_connection(thd); - + +#ifdef WITH_WSREP + if (WSREP(thd)) + { + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + thd->wsrep_query_state= QUERY_EXITING; + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + } +#endif end_thread: +#ifdef WITH_WSREP + close_connection(thd, 0, 1); +#else close_connection(thd); +#endif if (thd->userstat_running) update_global_user_stats(thd, create_user, time(NULL)); diff --git a/sql/sql_delete.cc b/sql/sql_delete.cc index ff88bf7c0f8..7fc623daf88 100644 --- a/sql/sql_delete.cc +++ b/sql/sql_delete.cc @@ -410,7 +410,11 @@ cleanup: /* See similar binlogging code in sql_update.cc, for comments */ if ((error < 0) || thd->transaction.stmt.modified_non_trans_table) { +#ifdef WITH_WSREP + if ((WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())) +#else if (mysql_bin_log.is_open()) +#endif { int errcode= 0; if (error < 0) @@ -861,7 +865,11 @@ void multi_delete::abort_result_set() /* there is only side effects; to binlog with the error */ +#ifdef WITH_WSREP + if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) +#else if (mysql_bin_log.is_open()) +#endif { int errcode= query_error_code(thd, thd->killed == NOT_KILLED); /* possible error of writing binary log is ignored deliberately */ @@ -1037,7 +1045,11 @@ bool multi_delete::send_eof() } if ((local_error == 0) || thd->transaction.stmt.modified_non_trans_table) { +#ifdef WITH_WSREP + if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) +#else if (mysql_bin_log.is_open()) +#endif { int errcode= 0; if (local_error == 0) diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index 54f94ce78c1..a4eb1e8996f 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -1019,7 +1019,11 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, thd->transaction.stmt.modified_non_trans_table || was_insert_delayed) { +#ifdef WITH_WSREP + if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) +#else if (mysql_bin_log.is_open()) +#endif { int errcode= 0; if (error <= 0) @@ -3113,6 +3117,11 @@ bool Delayed_insert::handle_inserts(void) mysql_cond_broadcast(&cond_client); // If waiting clients } } +#ifdef WITH_WSREP + if (WSREP((&thd))) + thd_proc_info(&thd, "insert done"); + else +#endif /* WITH_WSREP */ thd_proc_info(&thd, 0); mysql_mutex_unlock(&mutex); @@ -3597,8 +3606,13 @@ bool select_insert::send_eof() events are in the transaction cache and will be written when ha_autocommit_or_rollback() is issued below. */ +#ifdef WITH_WSREP + if ((WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) && + (!error || thd->transaction.stmt.modified_non_trans_table)) +#else if (mysql_bin_log.is_open() && (!error || thd->transaction.stmt.modified_non_trans_table)) +#endif { int errcode= 0; if (!error) @@ -3681,7 +3695,11 @@ void select_insert::abort_result_set() { if (!can_rollback_data()) thd->transaction.all.modified_non_trans_table= TRUE; +#ifdef WITH_WSREP + if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) +#else if (mysql_bin_log.is_open()) +#endif { int errcode= query_error_code(thd, thd->killed == NOT_KILLED); /* error of writing binary log is ignored */ @@ -4072,7 +4090,11 @@ select_create::binlog_show_create_table(TABLE **tables, uint count) /* show_database */ TRUE); DBUG_ASSERT(result == 0); /* store_create_info() always return 0 */ +#ifdef WITH_WSREP + if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) +#else if (mysql_bin_log.is_open()) +#endif { int errcode= query_error_code(thd, thd->killed == NOT_KILLED); result= thd->binlog_query(THD::STMT_QUERY_TYPE, diff --git a/sql/sql_lex.cc b/sql/sql_lex.cc index b2dfae5ded4..070bcdfcfd4 100644 --- a/sql/sql_lex.cc +++ b/sql/sql_lex.cc @@ -1548,6 +1548,17 @@ int lex_one_token(void *arg, void *yythd) } else { +#ifdef WITH_WSREP + if (version == 99997 && thd->wsrep_exec_mode == LOCAL_STATE) + { + WSREP_DEBUG("consistency check: %s", thd->query()); + thd->wsrep_consistency_check= TRUE; + lip->yySkipn(5); + lip->set_echo(TRUE); + state=MY_LEX_START; + break; /* Do not treat contents as a comment. */ + } +#endif /* WITH_WSREP */ /* Patch and skip the conditional comment to avoid it being propagated infinitely (eg. to a slave). diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index e0b2acd199d..a955b434319 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -103,6 +103,24 @@ #include "../storage/maria/ha_maria.h" #endif +#ifdef WITH_WSREP +#include "wsrep_mysqld.h" +#include "rpl_rli.h" + +extern Format_description_log_event *wsrep_format_desc; +#define WSREP_MYSQL_DB (char *)"mysql" + +#define WSREP_TO_ISOLATION_BEGIN(db_, table_) \ + if (WSREP(thd) && wsrep_to_isolation_begin(thd, db_, table_)) goto error; + +#define WSREP_TO_ISOLATION_END \ + if (WSREP(thd) || (thd && thd->wsrep_exec_mode==TOTAL_ORDER)) \ + wsrep_to_isolation_end(thd); + +#else +#define WSREP_TO_ISOLATION_BEGIN(db_, table_) +#define WSREP_TO_ISOLATION_END +#endif /* WITH_WSREP */ /** @defgroup Runtime_Environment Runtime Environment @{ @@ -436,6 +454,13 @@ bool is_log_table_write_query(enum enum_sql_command command) return (sql_command_flags[command] & CF_WRITE_LOGS_COMMAND) != 0; } +#ifdef WITH_WSREP +bool is_show_query(enum enum_sql_command command) +{ + DBUG_ASSERT(command >= 0 && command <= SQLCOM_END); + return (sql_command_flags[command] & CF_STATUS_COMMAND) != 0; +} +#endif void execute_init_command(THD *thd, LEX_STRING *init_command, mysql_rwlock_t *var_lock) { @@ -617,8 +642,12 @@ void do_handle_bootstrap(THD *thd) if (my_thread_init() || thd->store_globals()) { #ifndef EMBEDDED_LIBRARY +#ifdef WITH_WSREP + close_connection(thd, ER_OUT_OF_RESOURCES, 1); +#else close_connection(thd, ER_OUT_OF_RESOURCES); #endif +#endif thd->fatal_error(); goto end; } @@ -692,7 +721,26 @@ bool do_command(THD *thd) NET *net= &thd->net; enum enum_server_command command; DBUG_ENTER("do_command"); +#ifdef WITH_WSREP + if (WSREP(thd)) + { + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + thd->wsrep_query_state= QUERY_IDLE; + if (thd->wsrep_conflict_state==MUST_ABORT) + { + thd->wsrep_conflict_state= ABORTING; + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + trans_rollback(thd); + thd->locked_tables_list.unlock_locked_tables(thd); + /* Release transactional metadata locks. */ + thd->mdl_context.release_transactional_locks(); + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + thd->wsrep_conflict_state= ABORTED; + } + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + } +#endif /* indicator of uninitialized lex => normal flow of errors handling (see my_message_sql) @@ -738,11 +786,57 @@ bool do_command(THD *thd) */ DEBUG_SYNC(thd, "before_do_command_net_read"); +#ifdef WITH_WSREP + if (WSREP(thd)) { + packet_length= my_net_read(net); + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + /* these THD's are aborted or are aborting during being idle */ + if (thd->wsrep_conflict_state == ABORTING) + { + while (thd->wsrep_conflict_state == ABORTING) { + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + my_sleep(1000); + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + } + thd->store_globals(); + } + else if (thd->wsrep_conflict_state == ABORTED) + { + thd->store_globals(); + } + + thd->wsrep_query_state= QUERY_EXEC; + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + } + if ((WSREP(thd) && packet_length == packet_error) || + (!WSREP(thd) && (packet_length= my_net_read(net)) == packet_error)) +#else if ((packet_length= my_net_read(net)) == packet_error) +#endif { DBUG_PRINT("info",("Got error %d reading command from socket %s", net->error, vio_description(net->vio))); +#ifdef WITH_WSREP + if (WSREP(thd)) { + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + if (thd->wsrep_conflict_state == MUST_ABORT) + { + DBUG_PRINT("wsrep",("aborted for wsrep rollback: %lu", thd->real_id)); + thd->wsrep_conflict_state= ABORTING; + + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + trans_rollback(thd); + thd->locked_tables_list.unlock_locked_tables(thd); + /* Release transactional metadata locks. */ + thd->mdl_context.release_transactional_locks(); + + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + thd->wsrep_conflict_state= ABORTED; + } + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + } +#endif /* Check if we can continue without closing the connection */ @@ -788,12 +882,54 @@ bool do_command(THD *thd) vio_description(net->vio), command, command_name[command].str)); +#ifdef WITH_WSREP + if (WSREP(thd)) { + /* + * bail out if DB snapshot has not been installed. We however, + * allow queries "SET" and "SHOW", they are trapped later in execute_command + */ + if (thd->variables.wsrep_on && !thd->wsrep_applier && !wsrep_ready && + command != COM_QUERY && + command != COM_PING && + command != COM_QUIT && + command != COM_PROCESS_INFO && + command != COM_PROCESS_KILL && + command != COM_SET_OPTION && + command != COM_SHUTDOWN && + command != COM_SLEEP && + command != COM_STATISTICS && + command != COM_TIME && + command != COM_END + ) { + my_error(ER_UNKNOWN_COM_ERROR, MYF(0), + "WSREP has not yet prepared node for application use"); + thd->protocol->end_statement(); + return_value= FALSE; + goto out; + } + } +#endif /* Restore read timeout value */ my_net_set_read_timeout(net, thd->variables.net_read_timeout); DBUG_ASSERT(packet_length); return_value= dispatch_command(command, thd, packet+1, (uint) (packet_length-1)); - +#ifdef WITH_WSREP + if (WSREP(thd)) { + while (thd->wsrep_conflict_state== RETRY_AUTOCOMMIT) + { + return_value= dispatch_command(command, thd, thd->wsrep_retry_query, + thd->wsrep_retry_query_len); + } + } + if (thd->wsrep_retry_query) + { + my_free(thd->wsrep_retry_query); + thd->wsrep_retry_query = NULL; + thd->wsrep_retry_query_len = 0; + thd->wsrep_retry_command = COM_CONNECT; + } +#endif out: DBUG_RETURN(return_value); } @@ -867,6 +1003,34 @@ static my_bool deny_updates_if_read_only_option(THD *thd, DBUG_RETURN(FALSE); } +#ifdef WITH_WSREP +static my_bool wsrep_read_only_option(THD *thd, TABLE_LIST *all_tables) +{ + int opt_readonly_saved = opt_readonly; + ulong flag_saved = (ulong)(thd->security_ctx->master_access & SUPER_ACL); + + opt_readonly = 0; + thd->security_ctx->master_access &= ~SUPER_ACL; + + my_bool ret = !deny_updates_if_read_only_option(thd, all_tables); + + opt_readonly = opt_readonly_saved; + thd->security_ctx->master_access |= flag_saved; + + return ret; +} + +static void wsrep_copy_query(THD *thd) +{ + thd->wsrep_retry_command = thd->command; + thd->wsrep_retry_query_len = thd->query_length(); + thd->wsrep_retry_query = (char *)my_malloc( + thd->wsrep_retry_query_len + 1, MYF(0)); + thd->wsrep_retry_command = thd->command; + strcpy(thd->wsrep_retry_query, thd->query()); + thd->wsrep_retry_query[thd->wsrep_retry_query_len] = '\0'; +} +#endif /* WITH_WSREP */ /** Perform one connection-level (COM_XXXX) command. @@ -896,6 +1060,50 @@ bool dispatch_command(enum enum_server_command command, THD *thd, DBUG_ENTER("dispatch_command"); DBUG_PRINT("info", ("command: %d", command)); +#ifdef WITH_WSREP + bool is_autocommit= false; + + if (WSREP(thd)) { + if (!thd->in_multi_stmt_transaction_mode()) + { + thd->wsrep_PA_safe= true; + } + + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + thd->wsrep_query_state= QUERY_EXEC; + if (thd->wsrep_conflict_state== RETRY_AUTOCOMMIT) + { + thd->wsrep_conflict_state= NO_CONFLICT; + } + + is_autocommit= !thd->in_multi_stmt_transaction_mode() && + thd->wsrep_conflict_state == NO_CONFLICT && + !thd->wsrep_applier && + wsrep_read_only_option(thd, thd->lex->query_tables); + + if (thd->wsrep_conflict_state== MUST_ABORT) + { + thd->wsrep_conflict_state= ABORTING; + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + trans_rollback(thd); + thd->locked_tables_list.unlock_locked_tables(thd); + /* Release transactional metadata locks. */ + thd->mdl_context.release_transactional_locks(); + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + thd->wsrep_conflict_state= ABORTED; + } + if (thd->wsrep_conflict_state== ABORTED) + { + my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction"); + WSREP_DEBUG("Deadlock error for: %s", thd->query()); + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + thd->killed= NOT_KILLED; + thd->mysys_var->abort= 0; + goto dispatch_end; + } + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + } +#endif /* WITH_WSREP */ #if defined(ENABLED_PROFILING) thd->profiling.start_new_query(); #endif @@ -1104,7 +1312,12 @@ bool dispatch_command(enum enum_server_command command, THD *thd, Count each statement from the client. */ statistic_increment(thd->status_var.questions, &LOCK_status); +#ifdef WITH_WSREP + if (!WSREP(thd)) + thd->set_time(); /* Reset the query start time. */ +#else thd->set_time(); /* Reset the query start time. */ +#endif parser_state.reset(beginning_of_next_stmt, length); /* TODO: set thd->lex->sql_command to SQLCOM_END here */ mysql_parse(thd, beginning_of_next_stmt, length, &parser_state); @@ -1422,6 +1635,156 @@ bool dispatch_command(enum enum_server_command command, THD *thd, my_message(ER_UNKNOWN_COM_ERROR, ER(ER_UNKNOWN_COM_ERROR), MYF(0)); break; } +#ifdef WITH_WSREP + dispatch_end: + + if (WSREP(thd)) { + /* wsrep BF abort in query exec phase */ + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + if (thd->wsrep_conflict_state == MUST_ABORT) { + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + ha_rollback_trans(thd, 0); + thd->locked_tables_list.unlock_locked_tables(thd); + /* Release transactional metadata locks. */ + thd->mdl_context.release_transactional_locks(); + thd->transaction.stmt.reset(); + WSREP_DEBUG("abort in exec query state, avoiding autocommit"); + goto wsrep_must_abort; + } + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + } +#endif /* WITH_WSREP */ + +#ifdef WITH_WSREP + wsrep_must_abort: + if (WSREP(thd)) { + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + if (thd->wsrep_conflict_state == MUST_ABORT) { + thd->wsrep_conflict_state= ABORTING; + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + + WSREP_DEBUG("in dispatch_command, aborting %s", + (thd->query()) ? thd->query() : "void"); + trans_rollback(thd); + thd->locked_tables_list.unlock_locked_tables(thd); + /* Release transactional metadata locks. */ + thd->mdl_context.release_transactional_locks(); + + if (thd->get_binlog_table_maps()) { + thd->clear_binlog_table_maps(); + } + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + thd->wsrep_conflict_state= ABORTED; + } + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + /* checking if BF trx must be replayed */ + if (thd->wsrep_conflict_state== MUST_REPLAY) { + if (thd->wsrep_exec_mode!= REPL_RECV) { + if (thd->stmt_da->is_sent) { + WSREP_ERROR("replay issue, thd has reported status already"); + } + thd->stmt_da->reset_diagnostics_area(); + + thd->wsrep_conflict_state= REPLAYING; + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + + mysql_reset_thd_for_next_command(thd, opt_userstat_running); + thd->killed= NOT_KILLED; + close_thread_tables(thd); + + thd_proc_info(thd, "wsrep replaying trx"); + WSREP_DEBUG("replay trx: %s %lld", + thd->query() ? thd->query() : "void", + (long long)thd->wsrep_trx_seqno); + struct wsrep_thd_shadow shadow; + wsrep_prepare_bf_thd(thd, &shadow); + int rcode = wsrep->replay_trx(wsrep, + &thd->wsrep_trx_handle, + (void *)thd); + + wsrep_return_from_bf_mode(thd, &shadow); + if (thd->wsrep_conflict_state!= REPLAYING) + WSREP_WARN("lost replaying mode: %d", thd->wsrep_conflict_state ); + + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + + switch (rcode) { + case WSREP_OK: + thd->wsrep_conflict_state= NO_CONFLICT; + wsrep->post_commit(wsrep, &thd->wsrep_trx_handle); + WSREP_DEBUG("trx_replay successful for: %ld %llu", + thd->thread_id, (long long)thd->real_id); + break; + case WSREP_TRX_FAIL: + if (thd->stmt_da->is_sent) { + WSREP_ERROR("replay failed, thd has reported status"); + } + else + { + WSREP_DEBUG("replay failed, rolling back"); + my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction"); + } + thd->wsrep_conflict_state= ABORTED; + wsrep->post_rollback(wsrep, &thd->wsrep_trx_handle); + break; + default: + WSREP_ERROR("trx_replay failed for: %d, query: %s", + rcode, thd->query() ? thd->query() : "void"); + /* we're now in inconsistent state, must abort */ + unireg_abort(1); + break; + } + mysql_mutex_lock(&LOCK_wsrep_replaying); + wsrep_replaying--; + WSREP_DEBUG("replaying decreased: %d, thd: %lu", + wsrep_replaying, thd->thread_id); + mysql_cond_broadcast(&COND_wsrep_replaying); + mysql_mutex_unlock(&LOCK_wsrep_replaying); + } + } + /* setting error code for BF aborted trxs */ + if (thd->wsrep_conflict_state == ABORTED) + { + mysql_reset_thd_for_next_command(thd, opt_userstat_running); + thd->killed= NOT_KILLED; + if (is_autocommit && + (thd->wsrep_retry_counter < thd->variables.wsrep_retry_autocommit)) + { + WSREP_DEBUG("wsrep retrying AC query: %s", + (thd->query()) ? thd->query() : "void"); + thd->wsrep_conflict_state= RETRY_AUTOCOMMIT; + thd->wsrep_retry_counter++; // grow + wsrep_copy_query(thd); + } + else + { + WSREP_DEBUG("BF aborted, thd: %lu is_AC: %d, retry: %lu - %lu SQL: %s", + thd->thread_id, is_autocommit, thd->wsrep_retry_counter, + thd->variables.wsrep_retry_autocommit, thd->query()); + my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction"); + thd->killed= NOT_KILLED; + thd->wsrep_conflict_state= NO_CONFLICT; + thd->wsrep_retry_counter= 0; // reset + } + } + else + { + set_if_smaller(thd->wsrep_retry_counter, 0); // reset; eventually ok + } + if ((thd->wsrep_conflict_state != REPLAYING) && + (thd->wsrep_conflict_state != RETRY_AUTOCOMMIT)) { + + thd->update_server_status(); + thd->protocol->end_statement(); + query_cache_end_of_result(thd); + } + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + + } else { /* if (WSREP(thd))... */ + +#endif /* WITH_WSREP */ DBUG_ASSERT(thd->derived_tables == NULL && (thd->open_tables == NULL || (thd->locked_tables_mode == LTM_LOCK_TABLES))); @@ -1430,6 +1793,9 @@ bool dispatch_command(enum enum_server_command command, THD *thd, thd->update_server_status(); thd->protocol->end_statement(); query_cache_end_of_result(thd); +#ifdef WITH_WSREP + } +#endif /* WITH_WSREP */ if (!thd->is_error() && !thd->killed_errno()) mysql_audit_general(thd, MYSQL_AUDIT_GENERAL_RESULT, 0, 0); @@ -1446,7 +1812,16 @@ bool dispatch_command(enum enum_server_command command, THD *thd, thd->reset_query(); thd->command=COM_SLEEP; dec_thread_running(); +#ifdef WITH_WSREP + if (WSREP(thd)) { + thd_proc_info(thd, "sleeping"); + } else { +#endif /* WITH_WSREP */ thd_proc_info(thd, 0); +#ifdef WITH_WSREP + } +#endif /* WITH_WSREP */ + thd->packet.shrink(thd->variables.net_buffer_length); // Reclaim some memory free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC)); @@ -2091,7 +2466,45 @@ mysql_execute_command(THD *thd) #ifdef HAVE_REPLICATION } /* endif unlikely slave */ #endif +#ifdef WITH_WSREP + if (WSREP(thd)) { + /* + change LOCK TABLE WRITE to transaction + */ + if (lex->sql_command== SQLCOM_LOCK_TABLES && wsrep_convert_LOCK_to_trx) + { + for (TABLE_LIST *table= all_tables; table; table= table->next_global) + { + if (table->lock_type >= TL_WRITE_ALLOW_WRITE) + { + lex->sql_command= SQLCOM_BEGIN; + thd->wsrep_converted_lock_session= true; + break; + } + } + } + if (lex->sql_command== SQLCOM_UNLOCK_TABLES && + thd->wsrep_converted_lock_session) + { + thd->wsrep_converted_lock_session= false; + lex->sql_command= SQLCOM_COMMIT; + lex->tx_release= TVL_NO; + } + /* + * bail out if DB snapshot has not been installed. We however, + * allow SET and SHOW queries + */ + if (thd->variables.wsrep_on && !thd->wsrep_applier && !wsrep_ready && + lex->sql_command != SQLCOM_SET_OPTION && + !is_show_query(lex->sql_command)) + { + my_error(ER_UNKNOWN_COM_ERROR, MYF(0), + "WSREP has not yet prepared node for application use"); + goto error; + } + } +#endif /* WITH_WSREP */ status_var_increment(thd->status_var.com_stat[lex->sql_command]); thd->progress.report_to_client= test(sql_command_flags[lex->sql_command] & CF_REPORT_PROGRESS); @@ -2134,6 +2547,9 @@ mysql_execute_command(THD *thd) #endif case SQLCOM_SHOW_STATUS_PROC: case SQLCOM_SHOW_STATUS_FUNC: +#ifdef WITH_WSREP + if (WSREP(thd) && wsrep_causal_wait(thd)) goto error; +#endif /* WITH_WSREP */ if ((res= check_table_access(thd, SELECT_ACL, all_tables, FALSE, UINT_MAX, FALSE))) goto error; @@ -2141,6 +2557,9 @@ mysql_execute_command(THD *thd) break; case SQLCOM_SHOW_STATUS: { +#ifdef WITH_WSREP + if (WSREP(thd) && wsrep_causal_wait(thd)) goto error; +#endif /* WITH_WSREP */ execute_show_status(thd, all_tables); break; } @@ -2152,17 +2571,27 @@ mysql_execute_command(THD *thd) case SQLCOM_SHOW_PLUGINS: case SQLCOM_SHOW_FIELDS: case SQLCOM_SHOW_KEYS: +#ifndef WITH_WSREP case SQLCOM_SHOW_VARIABLES: case SQLCOM_SHOW_CHARSETS: case SQLCOM_SHOW_COLLATIONS: case SQLCOM_SHOW_STORAGE_ENGINES: case SQLCOM_SHOW_PROFILE: +#endif /* WITH_WSREP */ case SQLCOM_SHOW_CLIENT_STATS: case SQLCOM_SHOW_USER_STATS: case SQLCOM_SHOW_TABLE_STATS: case SQLCOM_SHOW_INDEX_STATS: case SQLCOM_SELECT: - { +#ifdef WITH_WSREP + if (WSREP(thd) && wsrep_causal_wait(thd)) goto error; + case SQLCOM_SHOW_VARIABLES: + case SQLCOM_SHOW_CHARSETS: + case SQLCOM_SHOW_COLLATIONS: + case SQLCOM_SHOW_STORAGE_ENGINES: + case SQLCOM_SHOW_PROFILE: +#endif /* WITH_WSREP */ + { thd->status_var.last_query_cost= 0.0; /* @@ -2472,7 +2901,7 @@ case SQLCOM_PREPARE: */ if (thd->query_name_consts && mysql_bin_log.is_open() && - thd->variables.binlog_format == BINLOG_FORMAT_STMT && + WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_STMT && !mysql_bin_log.is_query_in_union(thd, thd->query_id)) { List_iterator_fast<Item> it(select_lex->item_list); @@ -2576,6 +3005,11 @@ case SQLCOM_PREPARE: if (create_info.options & HA_LEX_CREATE_TMP_TABLE) thd->variables.option_bits|= OPTION_KEEP_LOG; /* regular create */ +#ifdef WITH_WSREP + if (!thd->is_current_stmt_binlog_format_row() || + !(create_info.options & HA_LEX_CREATE_TMP_TABLE)) + WSREP_TO_ISOLATION_BEGIN(create_table->db, create_table->table_name) +#endif /* WITH_WSREP */ if (create_info.options & HA_LEX_CREATE_TABLE_LIKE) { /* CREATE TABLE ... LIKE ... */ @@ -2617,6 +3051,7 @@ end_with_restore_list: DBUG_ASSERT(first_table == all_tables && first_table != 0); if (check_one_table_access(thd, INDEX_ACL, all_tables)) goto error; /* purecov: inspected */ + WSREP_TO_ISOLATION_BEGIN(first_table->db, first_table->table_name) /* Currently CREATE INDEX or DROP INDEX cause a full table rebuild and thus classify as slow administrative statements just like @@ -2674,6 +3109,7 @@ end_with_restore_list: case SQLCOM_RENAME_TABLE: { + WSREP_TO_ISOLATION_BEGIN(first_table->db, first_table->table_name) if (execute_rename_table(thd, first_table, all_tables)) goto error; break; @@ -2701,6 +3137,10 @@ end_with_restore_list: goto error; #else { +#ifdef WITH_WSREP + if (WSREP(thd) && wsrep_causal_wait(thd)) goto error; +#endif /* WITH_WSREP */ + /* Access check: SHOW CREATE TABLE require any privileges on the table level (ie @@ -2756,6 +3196,10 @@ end_with_restore_list: case SQLCOM_CHECKSUM: { DBUG_ASSERT(first_table == all_tables && first_table != 0); +#ifdef WITH_WSREP + if (WSREP(thd) && wsrep_causal_wait(thd)) goto error; +#endif /* WITH_WSREP */ + if (check_table_access(thd, SELECT_ACL, all_tables, FALSE, UINT_MAX, FALSE)) goto error; /* purecov: inspected */ @@ -2950,6 +3394,14 @@ end_with_restore_list: DBUG_ASSERT(first_table == all_tables && first_table != 0); if ((res= insert_precheck(thd, all_tables))) break; +#ifdef WITH_WSREP + if (lex->sql_command == SQLCOM_INSERT_SELECT && + thd->wsrep_consistency_check) + { + WSREP_TO_ISOLATION_BEGIN(first_table->db, first_table->table_name); + } + +#endif /* INSERT...SELECT...ON DUPLICATE KEY UPDATE/REPLACE SELECT/ INSERT...IGNORE...SELECT can be unsafe, unless ORDER BY PRIMARY KEY @@ -3113,6 +3565,17 @@ end_with_restore_list: /* So that DROP TEMPORARY TABLE gets to binlog at commit/rollback */ thd->variables.option_bits|= OPTION_KEEP_LOG; } +#ifdef WITH_WSREP + for (TABLE_LIST *table= all_tables; table; table= table->next_global) + { + if (!thd->is_current_stmt_binlog_format_row() || + !find_temporary_table(thd, table)) + { + WSREP_TO_ISOLATION_BEGIN(table->db, table->table_name); + break; + } + } +#endif /* WITH_WSREP */ /* DDL and binlog write order are protected by metadata locks. */ res= mysql_rm_table(thd, first_table, lex->drop_if_exists, lex->drop_temporary); @@ -3156,7 +3619,6 @@ end_with_restore_list: if (!mysql_change_db(thd, &db_str, FALSE)) my_ok(thd); - break; } @@ -3299,6 +3761,7 @@ end_with_restore_list: #endif if (check_access(thd, CREATE_ACL, lex->name.str, NULL, NULL, 1, 0)) break; + WSREP_TO_ISOLATION_BEGIN(lex->name.str, NULL) res= mysql_create_db(thd,(lower_case_table_names == 2 ? alias : lex->name.str), &create_info, 0); break; @@ -3328,6 +3791,7 @@ end_with_restore_list: #endif if (check_access(thd, DROP_ACL, lex->name.str, NULL, NULL, 1, 0)) break; + WSREP_TO_ISOLATION_BEGIN(lex->name.str, NULL) res= mysql_rm_db(thd, lex->name.str, lex->drop_if_exists, 0); break; } @@ -3356,6 +3820,7 @@ end_with_restore_list: res= 1; break; } + WSREP_TO_ISOLATION_BEGIN(db->str, NULL) res= mysql_upgrade_db(thd, db); if (!res) my_ok(thd); @@ -3388,6 +3853,7 @@ end_with_restore_list: #endif if (check_access(thd, ALTER_ACL, db->str, NULL, NULL, 1, 0)) break; + WSREP_TO_ISOLATION_BEGIN(db->str, NULL) res= mysql_alter_db(thd, db->str, &create_info); break; } @@ -3420,6 +3886,7 @@ end_with_restore_list: if (res) break; + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL) switch (lex->sql_command) { case SQLCOM_CREATE_EVENT: { @@ -3454,6 +3921,7 @@ end_with_restore_list: lex->spname->m_name); break; case SQLCOM_DROP_EVENT: + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL) if (!(res= Events::drop_event(thd, lex->spname->m_db, lex->spname->m_name, lex->drop_if_exists))) @@ -3468,6 +3936,7 @@ end_with_restore_list: if (check_access(thd, INSERT_ACL, "mysql", NULL, NULL, 1, 0)) break; #ifdef HAVE_DLOPEN + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL) if (!(res = mysql_create_function(thd, &lex->udf))) my_ok(thd); #else @@ -3482,6 +3951,7 @@ end_with_restore_list: if (check_access(thd, INSERT_ACL, "mysql", NULL, NULL, 1, 1) && check_global_access(thd,CREATE_USER_ACL)) break; + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL) /* Conditionally writes to binlog */ if (!(res= mysql_create_user(thd, lex->users_list))) my_ok(thd); @@ -3493,6 +3963,7 @@ end_with_restore_list: check_global_access(thd,CREATE_USER_ACL)) break; /* Conditionally writes to binlog */ + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL) if (!(res= mysql_drop_user(thd, lex->users_list))) my_ok(thd); break; @@ -3503,6 +3974,7 @@ end_with_restore_list: check_global_access(thd,CREATE_USER_ACL)) break; /* Conditionally writes to binlog */ + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL) if (!(res= mysql_rename_user(thd, lex->users_list))) my_ok(thd); break; @@ -3517,6 +3989,7 @@ end_with_restore_list: thd->binlog_invoker(); /* Conditionally writes to binlog */ + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL) if (!(res = mysql_revoke_all(thd, lex->users_list))) my_ok(thd); break; @@ -3583,6 +4056,7 @@ end_with_restore_list: lex->type == TYPE_ENUM_PROCEDURE, 0)) goto error; /* Conditionally writes to binlog */ + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL) res= mysql_routine_grant(thd, all_tables, lex->type == TYPE_ENUM_PROCEDURE, lex->users_list, grants, @@ -3596,6 +4070,7 @@ end_with_restore_list: all_tables, FALSE, UINT_MAX, FALSE)) goto error; /* Conditionally writes to binlog */ + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL) res= mysql_table_grant(thd, all_tables, lex->users_list, lex->columns, lex->grant, lex->sql_command == SQLCOM_REVOKE); @@ -3611,6 +4086,7 @@ end_with_restore_list: } else { + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL) /* Conditionally writes to binlog */ res = mysql_grant(thd, select_lex->db, lex->users_list, lex->grant, lex->sql_command == SQLCOM_REVOKE, @@ -3749,9 +4225,17 @@ end_with_restore_list: able to open it (with SQLCOM_HA_OPEN) in the first place. */ unit->set_limit(select_lex); +#ifdef WITH_WSREP + { char* tmp_info= NULL; + if (WSREP(thd)) tmp_info = (char *)thd_proc_info(thd, "mysql_ha_read()"); +#endif /* WITH_WSREP */ res= mysql_ha_read(thd, first_table, lex->ha_read_mode, lex->ident.str, lex->insert_list, lex->ha_rkey_mode, select_lex->where, unit->select_limit_cnt, unit->offset_limit_cnt); +#ifdef WITH_WSREP + if (WSREP(thd)) thd_proc_info(thd, tmp_info); + } +#endif /* WITH_WSREP */ break; case SQLCOM_BEGIN: @@ -3819,8 +4303,20 @@ end_with_restore_list: /* Disconnect the current client connection. */ if (tx_release) thd->killed= KILL_CONNECTION; - my_ok(thd); - break; + #ifdef WITH_WSREP + if (WSREP(thd)) { + if (thd->wsrep_conflict_state == NO_CONFLICT || + thd->wsrep_conflict_state == REPLAYING) + { + my_ok(thd); + } + } else { +#endif /* WITH_WSREP */ + my_ok(thd); + #ifdef WITH_WSREP + } +#endif /* WITH_WSREP */ + break; } case SQLCOM_RELEASE_SAVEPOINT: if (trans_release_savepoint(thd, lex->ident)) @@ -3888,6 +4384,7 @@ end_with_restore_list: if (sp_process_definer(thd)) goto create_sp_error; + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL) res= (sp_result= sp_create_routine(thd, lex->sphead->m_type, lex->sphead)); switch (sp_result) { case SP_OK: { @@ -4099,6 +4596,7 @@ create_sp_error: already puts on CREATE FUNCTION. */ /* Conditionally writes to binlog */ + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL) sp_result= sp_update_routine(thd, type, lex->spname, &lex->sp_chistics); switch (sp_result) { @@ -4170,6 +4668,7 @@ create_sp_error: if (check_routine_access(thd, ALTER_PROC_ACL, db, name, lex->sql_command == SQLCOM_DROP_PROCEDURE, 0)) goto error; + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL) /* Conditionally writes to binlog */ sp_result= sp_drop_routine(thd, type, lex->spname); @@ -4287,6 +4786,7 @@ create_sp_error: Note: SQLCOM_CREATE_VIEW also handles 'ALTER VIEW' commands as specified through the thd->lex->create_view_mode flag. */ + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL) res= mysql_create_view(thd, first_table, thd->lex->create_view_mode); break; } @@ -4295,12 +4795,14 @@ create_sp_error: if (check_table_access(thd, DROP_ACL, all_tables, FALSE, UINT_MAX, FALSE)) goto error; /* Conditionally writes to binlog. */ + WSREP_TO_ISOLATION_BEGIN(NULL, NULL) res= mysql_drop_view(thd, first_table, thd->lex->drop_mode); break; } case SQLCOM_CREATE_TRIGGER: { /* Conditionally writes to binlog. */ + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL) res= mysql_create_or_drop_trigger(thd, all_tables, 1); break; @@ -4308,6 +4810,7 @@ create_sp_error: case SQLCOM_DROP_TRIGGER: { /* Conditionally writes to binlog. */ + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL) res= mysql_create_or_drop_trigger(thd, all_tables, 0); break; } @@ -4358,11 +4861,13 @@ create_sp_error: my_ok(thd); break; case SQLCOM_INSTALL_PLUGIN: + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL) if (! (res= mysql_install_plugin(thd, &thd->lex->comment, &thd->lex->ident))) my_ok(thd); break; case SQLCOM_UNINSTALL_PLUGIN: + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL) if (! (res= mysql_uninstall_plugin(thd, &thd->lex->comment, &thd->lex->ident))) my_ok(thd); @@ -4519,6 +5024,10 @@ finish: /* Free tables */ thd_proc_info(thd, "closing tables"); close_thread_tables(thd); +#ifdef WITH_WSREP + WSREP_TO_ISOLATION_END + thd->wsrep_consistency_check= FALSE; +#endif /* WITH_WSREP */ thd_proc_info(thd, 0); #ifndef DBUG_OFF @@ -5429,6 +5938,21 @@ void THD::reset_for_next_command(bool calculate_userstat) thd->auto_inc_intervals_in_cur_stmt_for_binlog.empty(); thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0; +#ifdef WITH_WSREP + if (WSREP(thd)) { + if (wsrep_auto_increment_control) + { + if (thd->variables.auto_increment_offset != + global_system_variables.auto_increment_offset) + thd->variables.auto_increment_offset= + global_system_variables.auto_increment_offset; + if (thd->variables.auto_increment_increment != + global_system_variables.auto_increment_increment) + thd->variables.auto_increment_increment= + global_system_variables.auto_increment_increment; + } + } +#endif /* WITH_WSREP */ thd->query_start_used= 0; thd->query_start_sec_part_used= 0; thd->is_fatal_error= thd->time_zone_used= 0; @@ -7381,6 +7905,545 @@ LEX_USER *create_definer(THD *thd, LEX_STRING *user_name, LEX_STRING *host_name) return definer; } +#ifdef WITH_WSREP +static enum wsrep_status wsrep_apply_sql( + THD *thd, const char *sql, size_t sql_len, time_t timeval, uint32 randseed) +{ + int error; + enum wsrep_status ret_code= WSREP_OK; + + DBUG_ENTER("wsrep_bf_execute_cb"); + thd->wsrep_exec_mode= REPL_RECV; + thd->net.vio= 0; + thd->start_time= timeval; + thd->wsrep_rand= randseed; + + thd->variables.option_bits |= OPTION_NOT_AUTOCOMMIT; + + DBUG_PRINT("wsrep", ("SQL: %s", sql)); + + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + thd->wsrep_query_state= QUERY_EXEC; + /* preserve replaying mode */ + if (thd->wsrep_conflict_state!= REPLAYING) + thd->wsrep_conflict_state= NO_CONFLICT; + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + + if ((error= dispatch_command(COM_QUERY, thd, (char*)sql, sql_len))) { + WSREP_WARN("BF SQL apply failed: %d, %lld", + thd->wsrep_conflict_state, (long long)thd->wsrep_trx_seqno); + DBUG_RETURN(WSREP_FATAL); + } + + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + if (thd->wsrep_conflict_state!= NO_CONFLICT && + thd->wsrep_conflict_state!= REPLAYING) { + ret_code= WSREP_FATAL; + WSREP_DEBUG("BF thd ending, with: %d, %lld", + thd->wsrep_conflict_state, (long long)thd->wsrep_trx_seqno); + } + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + + assert(thd->wsrep_exec_mode== REPL_RECV); + DBUG_RETURN(ret_code); +} + +void wsrep_write_rbr_buf( + THD *thd, const void* rbr_buf, size_t buf_len) +{ + char filename[PATH_MAX]= {0}; + int len= snprintf(filename, PATH_MAX, "%s/GRA_%ld_%lld.log", + wsrep_data_home_dir, thd->thread_id, + (long long)thd->wsrep_trx_seqno); + if (len >= PATH_MAX) + { + WSREP_ERROR("RBR dump path too long: %d, skipping dump.", len); + return; + } + + FILE *of= fopen(filename, "wb"); + if (of) + { + fwrite (rbr_buf, buf_len, 1, of); + fclose(of); + } + else + { + WSREP_ERROR("Failed to open file '%s': %d (%s)", + filename, errno, strerror(errno)); + } +} + +static inline wsrep_status_t wsrep_apply_rbr( + THD *thd, const uchar *rbr_buf, size_t buf_len) +{ + char *buf= (char *)rbr_buf; + int rcode= 0; + int event= 1; + + DBUG_ENTER("wsrep_apply_rbr"); + + if (thd->killed == KILL_CONNECTION) + { + WSREP_INFO("applier has been aborted, skipping apply_rbr: %lld", + (long long) thd->wsrep_trx_seqno); + DBUG_RETURN(WSREP_FATAL); + } + + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + thd->wsrep_query_state= QUERY_EXEC; + if (thd->wsrep_conflict_state!= REPLAYING) + thd->wsrep_conflict_state= NO_CONFLICT; + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + + if (!buf_len) WSREP_DEBUG("empty rbr buffer to apply: %lld", + (long long) thd->wsrep_trx_seqno); + + if ((rcode= trans_begin(thd))) + WSREP_WARN("begin for rbr apply failed: %lld, code: %d", + (long long) thd->wsrep_trx_seqno, rcode); + + while(buf_len) + { + int exec_res; + int error = 0; + Log_event* ev= wsrep_read_log_event(&buf, &buf_len, wsrep_format_desc); + + switch (ev->get_type_code()) { + case WRITE_ROWS_EVENT: + case UPDATE_ROWS_EVENT: + case DELETE_ROWS_EVENT: + DBUG_ASSERT(buf_len != 0 || + ((Rows_log_event *) ev)->get_flags(Rows_log_event::STMT_END_F)); + break; + default: + break; + } + + thd->server_id = ev->server_id; // use the original server id for logging + thd->set_time(); // time the query + wsrep_xid_init(&thd->transaction.xid_state.xid, + wsrep_cluster_uuid(), + thd->wsrep_trx_seqno); + thd->lex->current_select= 0; + if (!ev->when) + ev->when = time(NULL); + ev->thd = thd; + exec_res = ev->apply_event(thd->wsrep_rli); + DBUG_PRINT("info", ("exec_event result: %d", exec_res)); + + if (exec_res) + { + WSREP_WARN("RBR event %d %s apply warning: %d, %lld", + event, ev->get_type_str(), exec_res, (long long) thd->wsrep_trx_seqno); + rcode= exec_res; + /* stop processing for the first error */ + delete ev; + goto error; + } + event++; + + if (thd->wsrep_conflict_state!= NO_CONFLICT && + thd->wsrep_conflict_state!= REPLAYING) + WSREP_WARN("conflict state after RBR event applying: %d, %lld", + thd->wsrep_query_state, (long long)thd->wsrep_trx_seqno); + + if (thd->wsrep_conflict_state == MUST_ABORT) { + WSREP_WARN("RBR event apply failed, rolling back: %lld", + (long long) thd->wsrep_trx_seqno); + trans_rollback(thd); + thd->locked_tables_list.unlock_locked_tables(thd); + /* Release transactional metadata locks. */ + thd->mdl_context.release_transactional_locks(); + thd->wsrep_conflict_state= NO_CONFLICT; + DBUG_RETURN(WSREP_FATAL); + } + + if (ev->get_type_code() != TABLE_MAP_EVENT && + ((Rows_log_event *) ev)->get_flags(Rows_log_event::STMT_END_F)) + { + // TODO: combine with commit on higher level common for the query ws + + thd->wsrep_rli->cleanup_context(thd, 0); + + if (error == 0) + { + thd->clear_error(); + } + else + WSREP_ERROR("Error in %s event: commit of row events failed: %lld", + ev->get_type_str(), (long long)thd->wsrep_trx_seqno); + } + delete ev; + } + + error: + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + thd->wsrep_query_state= QUERY_IDLE; + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + + assert(thd->wsrep_exec_mode== REPL_RECV); + + if (thd->killed == KILL_CONNECTION) + WSREP_INFO("applier aborted: %lld", (long long)thd->wsrep_trx_seqno); + + if (rcode) DBUG_RETURN(WSREP_FATAL); + DBUG_RETURN(WSREP_OK); +} + +wsrep_status_t wsrep_apply_cb(void* const ctx, + const void* const buf, size_t const buf_len, + wsrep_seqno_t const global_seqno) +{ + THD* const thd((THD*)ctx); + + thd->wsrep_trx_seqno= global_seqno; + +#ifdef WSREP_PROC_INFO + snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, + "applying write set %lld: %p, %zu", + (long long)thd->wsrep_trx_seqno, buf, buf_len); + thd_proc_info(thd, thd->wsrep_info); +#else + thd_proc_info(thd, "applying write set"); +#endif /* WSREP_PROC_INFO */ + + wsrep_status_t const rcode(wsrep_apply_rbr(thd, (const uchar*)buf, buf_len)); + +#ifdef WSREP_PROC_INFO + snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, + "applied write set %lld", (long long)thd->wsrep_trx_seqno); + thd_proc_info(thd, thd->wsrep_info); +#else + thd_proc_info(thd, "applied write set"); +#endif /* WSREP_PROC_INFO */ + + if (WSREP_OK != rcode) wsrep_write_rbr_buf(thd, buf, buf_len); + + return rcode; +} + +#if DELETE // this does not work in 5.5 +/* a common wrapper for end_trans() function - to put all necessary stuff */ +static inline wsrep_status_t +wsrep_end_trans (THD* const thd, enum enum_mysql_completiontype const end) +{ + if (0 == end_trans(thd, end)) + { + return WSREP_OK; + } + else + { + return WSREP_FATAL; + } +} +#endif + +wsrep_status_t wsrep_commit(THD* const thd, wsrep_seqno_t const global_seqno) +{ +#ifdef WSREP_PROC_INFO + snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, + "committing %lld", (long long)thd->wsrep_trx_seqno); + thd_proc_info(thd, thd->wsrep_info); +#else + thd_proc_info(thd, "committing"); +#endif /* WSREP_PROC_INFO */ + + wsrep_status_t const rcode(wsrep_apply_sql(thd, "COMMIT", 6, 0, 0)); +// wsrep_status_t const rcode(wsrep_end_trans (thd, COMMIT)); + +#ifdef WSREP_PROC_INFO + snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, + "committed %lld", (long long)thd->wsrep_trx_seqno); + thd_proc_info(thd, thd->wsrep_info); +#else + thd_proc_info(thd, "committed"); +#endif /* WSREP_PROC_INFO */ + + if (WSREP_OK == rcode) + { + // TODO: mark snapshot with global_seqno. + } + + return rcode; +} + +wsrep_status_t wsrep_rollback(THD* const thd, wsrep_seqno_t const global_seqno) +{ +#ifdef WSREP_PROC_INFO + snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, + "rolling back %lld", (long long)thd->wsrep_trx_seqno); + thd_proc_info(thd, thd->wsrep_info); +#else + thd_proc_info(thd, "rolling back"); +#endif /* WSREP_PROC_INFO */ + + wsrep_status_t const rcode(wsrep_apply_sql(thd, "ROLLBACK", 8, 0, 0)); +// wsrep_status_t const rcode(wsrep_end_trans (thd, ROLLBACK)); + +#ifdef WSREP_PROC_INFO + snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, + "rolled back %lld", (long long)thd->wsrep_trx_seqno); + thd_proc_info(thd, thd->wsrep_info); +#else + thd_proc_info(thd, "rolled back"); +#endif /* WSREP_PROC_INFO */ + + return rcode; +} + +wsrep_status_t wsrep_commit_cb(void* const ctx, + wsrep_seqno_t const global_seqno, + bool const commit) +{ + THD* const thd((THD*)ctx); + + assert(global_seqno == thd->wsrep_trx_seqno); + + if (commit) + return wsrep_commit(thd, global_seqno); + else + return wsrep_rollback(thd, global_seqno); +} + +Relay_log_info* wsrep_relay_log_init(const char* log_fname) +{ + Relay_log_info* rli= new Relay_log_info(false); + + rli->no_storage= true; + if (!rli->relay_log.description_event_for_exec) + { + rli->relay_log.description_event_for_exec= + new Format_description_log_event(4); + } + + rli->sql_thd= current_thd; + return rli; +} + +void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow* shadow) +{ + shadow->options = thd->variables.option_bits; + shadow->wsrep_exec_mode = thd->wsrep_exec_mode; + shadow->vio = thd->net.vio; + + if (opt_log_slave_updates) + thd->variables.option_bits|= OPTION_BIN_LOG; + else + thd->variables.option_bits&= ~(OPTION_BIN_LOG); + + if (!thd->wsrep_rli) thd->wsrep_rli= wsrep_relay_log_init("wsrep_relay"); + + thd->wsrep_exec_mode= REPL_RECV; + thd->net.vio= 0; + thd->clear_error(); + + thd->variables.option_bits|= OPTION_NOT_AUTOCOMMIT; + + shadow->tx_isolation = thd->variables.tx_isolation; + thd->variables.tx_isolation = ISO_READ_COMMITTED; + thd->tx_isolation = ISO_READ_COMMITTED; +} + +void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow* shadow) +{ + thd->variables.option_bits = shadow->options; + thd->wsrep_exec_mode = shadow->wsrep_exec_mode; + thd->net.vio = shadow->vio; + thd->variables.tx_isolation = shadow->tx_isolation; +} + +void wsrep_replication_process(THD *thd) +{ + int rcode; + DBUG_ENTER("wsrep_replication_process"); + + struct wsrep_thd_shadow shadow; + wsrep_prepare_bf_thd(thd, &shadow); + + wsrep_format_desc= new Format_description_log_event(4); + + rcode = wsrep->recv(wsrep, (void *)thd); + DBUG_PRINT("wsrep",("wsrep_repl returned: %d", rcode)); + + WSREP_INFO("applier thread exiting (code:%d)", rcode); + + switch (rcode) { + case WSREP_OK: + case WSREP_NOT_IMPLEMENTED: + case WSREP_CONN_FAIL: + /* provider does not support slave operations / disconnected from group, + * just close applier thread */ + break; + case WSREP_NODE_FAIL: + /* data inconsistency => SST is needed */ + /* Note: we cannot just blindly restart replication here, + * SST might require server restart if storage engines must be + * initialized after SST */ + WSREP_ERROR("node consistency compromised, aborting"); + wsrep_kill_mysql(thd); + break; + case WSREP_WARNING: + case WSREP_TRX_FAIL: + case WSREP_TRX_MISSING: + /* these suggests a bug in provider code */ + WSREP_WARN("bad return from recv() call: %d", rcode); + /* fall through to node shutdown */ + case WSREP_FATAL: + /* Cluster connectivity is lost. + * + * If applier was killed on purpose (KILL_CONNECTION), we + * avoid mysql shutdown. This is because the killer will then handle + * shutdown processing (or replication restarting) + */ + if (thd->killed != KILL_CONNECTION) + { + wsrep_kill_mysql(thd); + } + break; + } + + if (thd->killed != KILL_CONNECTION) + { + mysql_mutex_lock(&LOCK_thread_count); + wsrep_close_applier(thd); + mysql_cond_broadcast(&COND_thread_count); + mysql_mutex_unlock(&LOCK_thread_count); + } + + if (thd->killed != KILL_CONNECTION) + { + mysql_mutex_lock(&LOCK_thread_count); + wsrep_close_applier(thd); + mysql_cond_broadcast(&COND_thread_count); + mysql_mutex_unlock(&LOCK_thread_count); + } + wsrep_return_from_bf_mode(thd, &shadow); + DBUG_VOID_RETURN; +} + +void wsrep_rollback_process(THD *thd) +{ + DBUG_ENTER("wsrep_rollback_process"); + + mysql_mutex_lock(&LOCK_wsrep_rollback); + wsrep_aborting_thd= NULL; + + while (thd->killed == NOT_KILLED) { + thd_proc_info(thd, "wsrep aborter idle"); + thd->mysys_var->current_mutex= &LOCK_wsrep_rollback; + thd->mysys_var->current_cond= &COND_wsrep_rollback; + + mysql_cond_wait(&COND_wsrep_rollback,&LOCK_wsrep_rollback); + + WSREP_DEBUG("WSREP rollback thread wakes for signal"); + + mysql_mutex_lock(&thd->mysys_var->mutex); + thd_proc_info(thd, "wsrep aborter active"); + thd->mysys_var->current_mutex= 0; + thd->mysys_var->current_cond= 0; + mysql_mutex_unlock(&thd->mysys_var->mutex); + + /* check for false alarms */ + if (!wsrep_aborting_thd) + { + WSREP_DEBUG("WSREP rollback thread has empty abort queue"); + } + /* process all entries in the queue */ + while (wsrep_aborting_thd) { + THD *aborting; + wsrep_aborting_thd_t next = wsrep_aborting_thd->next; + aborting = wsrep_aborting_thd->aborting_thd; + my_free(wsrep_aborting_thd); + wsrep_aborting_thd= next; + /* + * must release mutex, appliers my want to add more + * aborting thds in our work queue, while we rollback + */ + mysql_mutex_unlock(&LOCK_wsrep_rollback); + + mysql_mutex_lock(&aborting->LOCK_wsrep_thd); + if (aborting->wsrep_conflict_state== ABORTED) + { + WSREP_DEBUG("WSREP, thd already aborted: %llu state: %d", + (long long)aborting->real_id, + aborting->wsrep_conflict_state); + + mysql_mutex_unlock(&aborting->LOCK_wsrep_thd); + mysql_mutex_lock(&LOCK_wsrep_rollback); + continue; + } + aborting->wsrep_conflict_state= ABORTING; + + mysql_mutex_unlock(&aborting->LOCK_wsrep_thd); + + aborting->store_globals(); + + trans_rollback(aborting); + aborting->locked_tables_list.unlock_locked_tables(thd); + /* Release transactional metadata locks. */ + aborting->mdl_context.release_transactional_locks(); + + mysql_mutex_lock(&aborting->LOCK_wsrep_thd); + aborting->wsrep_conflict_state= ABORTED; + WSREP_DEBUG("WSREP rollbacker aborted thd: %llu", + (long long)aborting->real_id); + mysql_mutex_unlock(&aborting->LOCK_wsrep_thd); + mysql_mutex_lock(&LOCK_wsrep_rollback); + } + } + + mysql_mutex_unlock(&LOCK_wsrep_rollback); + sql_print_information("WSREP: rollbacker thread exiting"); + + DBUG_PRINT("wsrep",("wsrep rollbacker thread exiting")); + DBUG_VOID_RETURN; +} +extern "C" +int wsrep_thd_is_brute_force(void *thd_ptr) +{ + if (thd_ptr) { + switch (((THD *)thd_ptr)->wsrep_exec_mode) { + case LOCAL_STATE: + { + if (((THD *)thd_ptr)->wsrep_conflict_state== REPLAYING) + { + return 1; + } + return 0; + } + case REPL_RECV: return 1; + case TOTAL_ORDER: return 2; + case LOCAL_COMMIT: return 3; + } + } + return 0; +} +extern "C" +int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal) +{ + THD *victim_thd = (THD *) victim_thd_ptr; + THD *bf_thd = (THD *) bf_thd_ptr; + DBUG_ENTER("wsrep_abort_thd"); + + if (WSREP(bf_thd) && victim_thd) + { + WSREP_DEBUG("wsrep_abort_thd, by: %llu, victim: %llu", (bf_thd) ? + (long long)bf_thd->real_id : 0, (long long)victim_thd->real_id); + ha_wsrep_abort_transaction(bf_thd, victim_thd, signal); + } + + DBUG_RETURN(1); +} +extern "C" +int wsrep_thd_in_locking_session(void *thd_ptr) +{ + if (thd_ptr && ((THD *)thd_ptr)->in_lock_tables) { + return 1; + } + return 0; +} +#endif /** Retuns information about user or current user. diff --git a/sql/sql_plugin.cc b/sql/sql_plugin.cc index 39f69b2656d..6a1a9221064 100644 --- a/sql/sql_plugin.cc +++ b/sql/sql_plugin.cc @@ -2954,11 +2954,17 @@ void plugin_thdvar_init(THD *thd) thd->variables.dynamic_variables_size= 0; thd->variables.dynamic_variables_ptr= 0; +#ifdef WITH_WSREP + if (!WSREP(thd) || !thd->wsrep_applier) { +#endif mysql_mutex_lock(&LOCK_plugin); thd->variables.table_plugin= my_intern_plugin_lock(NULL, global_system_variables.table_plugin); intern_plugin_unlock(NULL, old_table_plugin); mysql_mutex_unlock(&LOCK_plugin); +#ifdef WITH_WSREP + } +#endif DBUG_VOID_RETURN; } diff --git a/sql/sql_reload.cc b/sql/sql_reload.cc index d2f118b62c9..cb33110d9eb 100644 --- a/sql/sql_reload.cc +++ b/sql/sql_reload.cc @@ -224,7 +224,18 @@ bool reload_acl_and_cache(THD *thd, unsigned long options, } if (options & REFRESH_CHECKPOINT) disable_checkpoints(thd); - } +#ifdef WITH_WSREP + /* + We need to do it second time after wsrep appliers were blocked in + make_global_read_lock_block_commit(thd) above since they could have + modified the tables too. + */ + if (WSREP(thd) && + close_cached_tables(thd, tables, (options & REFRESH_FAST) ? + FALSE : TRUE, TRUE)) + result= 1; +#endif /* WITH_WSREP */ + } else { if (thd && thd->locked_tables_mode) diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index eb17ef0812c..ecb47324552 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -1399,7 +1399,14 @@ int stop_slave(THD* thd, Master_info* mi, bool net_report ) ER(ER_SLAVE_WAS_NOT_RUNNING)); } unlock_slave_threads(mi); +#ifdef WITH_WSREP + if (WSREP(thd)) + thd_proc_info(thd, "exit stop_slave()"); + else + thd_proc_info(thd, 0); +#else /* WITH_WSREP */ thd_proc_info(thd, 0); +#endif /* WITH_WSREP */ if (slave_errno) { @@ -1832,7 +1839,14 @@ bool change_master(THD* thd, Master_info* mi) err: unlock_slave_threads(mi); +#ifdef WITH_WSREP + if (WSREP(thd)) + thd_proc_info(thd, "exit change_master()"); + else + thd_proc_info(thd, 0); +#else /* WITH_WSREP */ thd_proc_info(thd, 0); +#endif /* WITH_WSREP */ if (ret == FALSE) my_ok(thd); DBUG_RETURN(ret); diff --git a/sql/sql_show.cc b/sql/sql_show.cc index d64c7a6df52..da48c7cdc63 100644 --- a/sql/sql_show.cc +++ b/sql/sql_show.cc @@ -59,6 +59,9 @@ #include "datadict.h" // dd_frm_type() #include "keycaches.h" +#if !defined(MYSQL_MAX_VARIABLE_VALUE_LEN) +#define MYSQL_MAX_VARIABLE_VALUE_LEN 1024 +#endif // !defined(MYSQL_MAX_VARIABLE_VALUE_LEN) #define STR_OR_NIL(S) ((S) ? (S) : "<nil>") #ifdef WITH_PARTITION_STORAGE_ENGINE @@ -8088,7 +8091,8 @@ ST_FIELD_INFO variables_fields_info[]= { {"VARIABLE_NAME", 64, MYSQL_TYPE_STRING, 0, 0, "Variable_name", SKIP_OPEN_TABLE}, - {"VARIABLE_VALUE", 1024, MYSQL_TYPE_STRING, 0, 1, "Value", SKIP_OPEN_TABLE}, + {"VARIABLE_VALUE", MYSQL_MAX_VARIABLE_VALUE_LEN, MYSQL_TYPE_STRING, 0, 1, + "Value", SKIP_OPEN_TABLE}, {0, 0, MYSQL_TYPE_STRING, 0, 0, 0, SKIP_OPEN_TABLE} }; diff --git a/sql/sql_table.cc b/sql/sql_table.cc index 01832036701..1fdd81de089 100644 --- a/sql/sql_table.cc +++ b/sql/sql_table.cc @@ -6158,12 +6158,18 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name, error= 0; break; } - if (error == HA_ERR_WRONG_COMMAND) +#ifdef WITH_WSREP + bool do_log_write(true); +#endif /* WITH_WSREP */ + if (error == HA_ERR_WRONG_COMMAND) { error= 0; push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_ILLEGAL_HA, ER(ER_ILLEGAL_HA), table->alias.c_ptr()); +#ifdef WITH_WSREP + WSREP_DEBUG("ignoring DDL failure: %d %s", error, thd->query()); +#endif /* WITH_WSREP */ } if (!error && (new_name != table_name || new_db != db)) @@ -6215,6 +6221,9 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name, push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_ILLEGAL_HA, ER(ER_ILLEGAL_HA), table->alias.c_ptr()); +#ifdef WITH_WSREP + WSREP_DEBUG("ignoring DDL failure: %d %s", error, thd->query()); +#endif /* WITH_WSREP */ } if (!error) diff --git a/sql/sql_trigger.cc b/sql/sql_trigger.cc index 1ac1d7bbb5e..149c2431324 100644 --- a/sql/sql_trigger.cc +++ b/sql/sql_trigger.cc @@ -2452,3 +2452,55 @@ bool load_table_name_for_trigger(THD *thd, DBUG_RETURN(FALSE); } +#ifdef WITH_WSREP +int wsrep_create_trigger_query(THD *thd, uchar** buf, uint* buf_len) +{ + LEX *lex= thd->lex; + String stmt_query; + + LEX_STRING definer_user; + LEX_STRING definer_host; + + if (!lex->definer) + { + if (!thd->slave_thread) + { + if (!(lex->definer= create_default_definer(thd))) + return 1; + } + } + + if (lex->definer) + { + /* SUID trigger. */ + + definer_user= lex->definer->user; + definer_host= lex->definer->host; + } + else + { + /* non-SUID trigger. */ + + definer_user.str= 0; + definer_user.length= 0; + + definer_host.str= 0; + definer_host.length= 0; + } + + stmt_query.append(STRING_WITH_LEN("CREATE ")); + + append_definer(thd, &stmt_query, &definer_user, &definer_host); + + LEX_STRING stmt_definition; + stmt_definition.str= (char*) thd->lex->stmt_definition_begin; + stmt_definition.length= thd->lex->stmt_definition_end + - thd->lex->stmt_definition_begin; + trim_whitespace(thd->charset(), & stmt_definition); + + stmt_query.append(stmt_definition.str, stmt_definition.length); + + return wsrep_to_buf_helper(thd, stmt_query.c_ptr(), stmt_query.length(), + buf, buf_len); +} +#endif /* WITH_WSREP */ diff --git a/sql/sql_truncate.cc b/sql/sql_truncate.cc index 67ed608f114..a66b2377fcb 100644 --- a/sql/sql_truncate.cc +++ b/sql/sql_truncate.cc @@ -24,6 +24,9 @@ #include "sql_acl.h" // DROP_ACL #include "sql_parse.h" // check_one_table_access() #include "sql_truncate.h" +#ifdef WITH_WSREP +#include "wsrep_mysqld.h" +#endif /* WITH_WSREP */ /** @@ -515,9 +518,14 @@ bool Truncate_statement::execute(THD *thd) if (check_one_table_access(thd, DROP_ACL, first_table)) DBUG_RETURN(res); +#ifdef WITH_WSREP + if (WSREP(thd) && wsrep_to_isolation_begin(thd, + first_table->db, + first_table->table_name)) + DBUG_RETURN(TRUE); +#endif /* WITH_WSREP */ if (! (res= truncate_table(thd, first_table))) my_ok(thd); - DBUG_RETURN(res); } diff --git a/sql/sql_update.cc b/sql/sql_update.cc index 5fa30c91417..ddb95f09ce2 100644 --- a/sql/sql_update.cc +++ b/sql/sql_update.cc @@ -902,7 +902,11 @@ int mysql_update(THD *thd, */ if ((error < 0) || thd->transaction.stmt.modified_non_trans_table) { +#ifdef WITH_WSREP + if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) +#else if (mysql_bin_log.is_open()) +#endif { int errcode= 0; if (error < 0) @@ -2045,7 +2049,11 @@ void multi_update::abort_result_set() The query has to binlog because there's a modified non-transactional table either from the query's list or via a stored routine: bug#13270,23333 */ +#ifdef WITH_WSREP + if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) +#else if (mysql_bin_log.is_open()) +#endif { /* THD::killed status might not have been set ON at time of an error @@ -2304,7 +2312,11 @@ bool multi_update::send_eof() if (local_error == 0 || thd->transaction.stmt.modified_non_trans_table) { +#ifdef WITH_WSREP + if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) +#else if (mysql_bin_log.is_open()) +#endif { int errcode= 0; if (local_error == 0) diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 72e9525db72..dd11f51b212 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -3437,6 +3437,206 @@ static Sys_var_tz Sys_time_zone( "time_zone", "time_zone", SESSION_VAR(time_zone), NO_CMD_LINE, DEFAULT(&default_tz), NO_MUTEX_GUARD, IN_BINLOG); +#ifdef WITH_WSREP +#include "wsrep_mysqld.h" + +static Sys_var_charptr Sys_wsrep_provider( + "wsrep_provider", "Path to replication provider library", + GLOBAL_VAR(wsrep_provider), CMD_LINE(REQUIRED_ARG, OPT_WSREP_PROVIDER), + IN_FS_CHARSET, DEFAULT(wsrep_provider), + // IN_FS_CHARSET, DEFAULT(wsrep_provider_default), + NO_MUTEX_GUARD, NOT_IN_BINLOG, + ON_CHECK(wsrep_provider_check), ON_UPDATE(wsrep_provider_update)); + +static Sys_var_charptr Sys_wsrep_provider_options( + "wsrep_provider_options", "provider specific options", + GLOBAL_VAR(wsrep_provider_options), + CMD_LINE(REQUIRED_ARG, OPT_WSREP_PROVIDER_OPTIONS), + IN_FS_CHARSET, DEFAULT(wsrep_provider_options), + NO_MUTEX_GUARD, NOT_IN_BINLOG, + ON_CHECK(wsrep_provider_options_check), + ON_UPDATE(wsrep_provider_options_update)); + +static Sys_var_charptr Sys_wsrep_data_home_dir( + "wsrep_data_home_dir", "home directory for wsrep provider", + READ_ONLY GLOBAL_VAR(wsrep_data_home_dir), CMD_LINE(REQUIRED_ARG), + IN_FS_CHARSET, DEFAULT(""), + NO_MUTEX_GUARD, NOT_IN_BINLOG); + +static Sys_var_charptr Sys_wsrep_cluster_name( + "wsrep_cluster_name", "Name for the cluster", + GLOBAL_VAR(wsrep_cluster_name), CMD_LINE(REQUIRED_ARG), + IN_FS_CHARSET, DEFAULT(wsrep_cluster_name), + NO_MUTEX_GUARD, NOT_IN_BINLOG, + ON_CHECK(wsrep_cluster_name_check), + ON_UPDATE(wsrep_cluster_name_update)); + +static Sys_var_charptr Sys_wsrep_cluster_address ( + "wsrep_cluster_address", "Address to initially connect to cluster", + GLOBAL_VAR(wsrep_cluster_address), + CMD_LINE(REQUIRED_ARG, OPT_WSREP_CLUSTER_ADDRESS), + IN_FS_CHARSET, DEFAULT(wsrep_cluster_address), + NO_MUTEX_GUARD, NOT_IN_BINLOG, + ON_CHECK(wsrep_cluster_address_check), + ON_UPDATE(wsrep_cluster_address_update)); + +static Sys_var_charptr Sys_wsrep_node_name ( + "wsrep_node_name", "Node name", + GLOBAL_VAR(wsrep_node_name), CMD_LINE(REQUIRED_ARG), + IN_FS_CHARSET, DEFAULT(glob_hostname), + NO_MUTEX_GUARD, NOT_IN_BINLOG); + +static Sys_var_charptr Sys_wsrep_node_address ( + "wsrep_node_address", "Node address", + GLOBAL_VAR(wsrep_node_address), CMD_LINE(REQUIRED_ARG), + IN_FS_CHARSET, DEFAULT(wsrep_node_address), + NO_MUTEX_GUARD, NOT_IN_BINLOG, + ON_CHECK(wsrep_node_address_check), + ON_UPDATE(wsrep_node_address_update)); + +static Sys_var_charptr Sys_wsrep_node_incoming_address( + "wsrep_node_incoming_address", "Client connection address", + GLOBAL_VAR(wsrep_node_incoming_address),CMD_LINE(REQUIRED_ARG), + IN_FS_CHARSET, DEFAULT(wsrep_node_incoming_address), + NO_MUTEX_GUARD, NOT_IN_BINLOG, + ON_CHECK(wsrep_node_name_check), + ON_UPDATE(wsrep_node_name_update)); + +static Sys_var_ulong Sys_wsrep_slave_threads( + "wsrep_slave_threads", "Number of slave appliers to launch", + GLOBAL_VAR(wsrep_slave_threads), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(1, 512), DEFAULT(1), BLOCK_SIZE(1)); + +static Sys_var_charptr Sys_wsrep_dbug_option( + "wsrep_dbug_option", "DBUG options to provider library", + GLOBAL_VAR(wsrep_dbug_option),CMD_LINE(REQUIRED_ARG), + IN_FS_CHARSET, DEFAULT(""), + NO_MUTEX_GUARD, NOT_IN_BINLOG); + +static Sys_var_mybool Sys_wsrep_debug( + "wsrep_debug", "To enable debug level logging", + GLOBAL_VAR(wsrep_debug), CMD_LINE(OPT_ARG), DEFAULT(FALSE)); + +static Sys_var_mybool Sys_wsrep_convert_LOCK_to_trx( + "wsrep_convert_LOCK_to_trx", "To convert locking sessions " + "into transactions", + GLOBAL_VAR(wsrep_convert_LOCK_to_trx), + CMD_LINE(OPT_ARG), DEFAULT(FALSE)); + +static Sys_var_ulong Sys_wsrep_retry_autocommit( + "wsrep_retry_autocommit", "Max number of times to retry " + "a failed autocommit statement", + SESSION_VAR(wsrep_retry_autocommit), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(0, 10000), DEFAULT(1), BLOCK_SIZE(1)); + +static Sys_var_mybool Sys_wsrep_auto_increment_control( + "wsrep_auto_increment_control", "To automatically control the " + "assignment of autoincrement variables", + GLOBAL_VAR(wsrep_auto_increment_control), + CMD_LINE(OPT_ARG), DEFAULT(TRUE)); + +static Sys_var_mybool Sys_wsrep_drupal_282555_workaround( + "wsrep_drupal_282555_workaround", "To use a workaround for" + "bad autoincrement value", + GLOBAL_VAR(wsrep_drupal_282555_workaround), + CMD_LINE(OPT_ARG), DEFAULT(FALSE)); + +static Sys_var_charptr sys_wsrep_sst_method( + "wsrep_sst_method", "Snapshot transfer method", + GLOBAL_VAR(wsrep_sst_method),CMD_LINE(REQUIRED_ARG), + IN_FS_CHARSET, DEFAULT(wsrep_sst_method), NO_MUTEX_GUARD, NOT_IN_BINLOG, + ON_CHECK(wsrep_sst_method_check), + ON_UPDATE(wsrep_sst_method_update)); + +static Sys_var_charptr Sys_wsrep_sst_receive_address( + "wsrep_sst_receive_address", "Address where node is waiting for " + "SST contact", + GLOBAL_VAR(wsrep_sst_receive_address),CMD_LINE(REQUIRED_ARG), + IN_FS_CHARSET, DEFAULT(wsrep_sst_receive_address), NO_MUTEX_GUARD, + NOT_IN_BINLOG, + ON_CHECK(wsrep_sst_receive_address_check), + ON_UPDATE(wsrep_sst_receive_address_update)); + +static Sys_var_charptr Sys_wsrep_sst_auth( + "wsrep_sst_auth", "Authentication for SST connection", + GLOBAL_VAR(wsrep_sst_auth), CMD_LINE(REQUIRED_ARG, OPT_WSREP_SST_AUTH), + IN_FS_CHARSET, DEFAULT(wsrep_sst_auth), NO_MUTEX_GUARD, + NOT_IN_BINLOG, + ON_CHECK(wsrep_sst_auth_check), + ON_UPDATE(wsrep_sst_auth_update)); + +static Sys_var_charptr Sys_wsrep_sst_donor( + "wsrep_sst_donor", "preferred donor node for the SST", + GLOBAL_VAR(wsrep_sst_donor),CMD_LINE(REQUIRED_ARG), + IN_FS_CHARSET, DEFAULT(""), NO_MUTEX_GUARD, NOT_IN_BINLOG, + ON_CHECK(wsrep_sst_donor_check), + ON_UPDATE(wsrep_sst_donor_update)); + +static Sys_var_mybool Sys_wsrep_on ( + "wsrep_on", "To enable wsrep replication ", + SESSION_VAR(wsrep_on), + CMD_LINE(OPT_ARG), DEFAULT(TRUE), + NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), + ON_UPDATE(wsrep_on_update)); + +static Sys_var_charptr Sys_wsrep_start_position ( + "wsrep_start_position", "global transaction position to start from ", + GLOBAL_VAR(wsrep_start_position), + CMD_LINE(REQUIRED_ARG, OPT_WSREP_START_POSITION), + IN_FS_CHARSET, DEFAULT(wsrep_start_position), + NO_MUTEX_GUARD, NOT_IN_BINLOG, + ON_CHECK(wsrep_start_position_check), + ON_UPDATE(wsrep_start_position_update)); + +static Sys_var_ulonglong Sys_wsrep_max_ws_size ( + "wsrep_max_ws_size", "Max write set size (bytes)", + GLOBAL_VAR(wsrep_max_ws_size), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(1024, 4294967296ULL), DEFAULT(1073741824ULL), BLOCK_SIZE(1)); + +static Sys_var_ulong Sys_wsrep_max_ws_rows ( + "wsrep_max_ws_rows", "Max number of rows in write set", + GLOBAL_VAR(wsrep_max_ws_rows), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(1, 1048576), DEFAULT(131072), BLOCK_SIZE(1)); + +static Sys_var_charptr Sys_wsrep_notify_cmd( + "wsrep_notify_cmd", "", + GLOBAL_VAR(wsrep_notify_cmd),CMD_LINE(REQUIRED_ARG), + IN_FS_CHARSET, DEFAULT(""), NO_MUTEX_GUARD, NOT_IN_BINLOG); + +static Sys_var_mybool Sys_wsrep_certify_nonPK( + "wsrep_certify_nonPK", "Certify tables with no primary key", + GLOBAL_VAR(wsrep_certify_nonPK), + CMD_LINE(OPT_ARG), DEFAULT(TRUE)); + +static Sys_var_mybool Sys_wsrep_causal_reads( + "wsrep_causal_reads", "Enable \"strictly synchronous\" semantics for read operations", + SESSION_VAR(wsrep_causal_reads), + CMD_LINE(OPT_ARG), DEFAULT(FALSE)); + // ON_UPDATE(wsrep_causal_reads_update)); + +static const char *wsrep_OSU_method_names[]= { "TOI", "RSU", NullS }; +static Sys_var_enum Sys_wsrep_OSU_method( + "wsrep_OSU_method", "Method for Online Schema Upgrade", + GLOBAL_VAR(wsrep_OSU_method_options), CMD_LINE(OPT_ARG), + wsrep_OSU_method_names, DEFAULT(WSREP_OSU_TOI), + NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), + ON_UPDATE(0)); + +static Sys_var_enum Sys_wsrep_forced_binlog_format( + "wsrep_forced_binlog_format", "binlog format to take effect over user's choice", + GLOBAL_VAR(wsrep_forced_binlog_format), + CMD_LINE(REQUIRED_ARG, OPT_BINLOG_FORMAT), + wsrep_binlog_format_names, DEFAULT(BINLOG_FORMAT_UNSPEC), + NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), + ON_UPDATE(0)); + +static Sys_var_mybool Sys_wsrep_recover_datadir( + "wsrep_recover", "Recover database state after crash and exit", + READ_ONLY GLOBAL_VAR(wsrep_recovery), + CMD_LINE(OPT_ARG, OPT_WSREP_RECOVER), DEFAULT(FALSE)); + + +#endif /* WITH_WSREP */ static Sys_var_ulong Sys_sp_cache_size( "stored_program_cache", diff --git a/sql/transaction.cc b/sql/transaction.cc index 94a32200274..2b23b5e19f1 100644 --- a/sql/transaction.cc +++ b/sql/transaction.cc @@ -96,6 +96,9 @@ static bool xa_trans_force_rollback(THD *thd) by ha_rollback()/THD::transaction::cleanup(). */ thd->transaction.xid_state.rm_error= 0; +#ifdef WITH_WSREP + wsrep_register_hton(thd, TRUE); +#endif /* WITH_WSREP */ if (ha_rollback_trans(thd, true)) { my_error(ER_XAER_RMERR, MYF(0)); @@ -134,6 +137,9 @@ bool trans_begin(THD *thd, uint flags) (thd->variables.option_bits & OPTION_TABLE_LOCK)) { thd->variables.option_bits&= ~OPTION_TABLE_LOCK; +#ifdef WITH_WSREP + wsrep_register_hton(thd, TRUE); +#endif /* WITH_WSREP */ thd->server_status&= ~SERVER_STATUS_IN_TRANS; res= test(ha_commit_trans(thd, TRUE)); } @@ -150,6 +156,12 @@ bool trans_begin(THD *thd, uint flags) */ thd->mdl_context.release_transactional_locks(); +#ifdef WITH_WSREP + thd->wsrep_PA_safe= true; + if (thd->wsrep_client_thread && wsrep_causal_wait(thd)) + DBUG_RETURN(TRUE); +#endif /* WITH_WSREP */ + thd->variables.option_bits|= OPTION_BEGIN; thd->server_status|= SERVER_STATUS_IN_TRANS; @@ -177,6 +189,9 @@ bool trans_commit(THD *thd) if (trans_check(thd)) DBUG_RETURN(TRUE); +#ifdef WITH_WSREP + wsrep_register_hton(thd, TRUE); +#endif /* WITH_WSREP */ thd->server_status&= ~SERVER_STATUS_IN_TRANS; res= ha_commit_trans(thd, TRUE); if (res) @@ -220,6 +235,9 @@ bool trans_commit_implicit(THD *thd) /* Safety if one did "drop table" on locked tables */ if (!thd->locked_tables_mode) thd->variables.option_bits&= ~OPTION_TABLE_LOCK; +#ifdef WITH_WSREP + wsrep_register_hton(thd, TRUE); +#endif /* WITH_WSREP */ thd->server_status&= ~SERVER_STATUS_IN_TRANS; res= test(ha_commit_trans(thd, TRUE)); } @@ -251,11 +269,16 @@ bool trans_commit_implicit(THD *thd) bool trans_rollback(THD *thd) { int res; - DBUG_ENTER("trans_rollback"); - - if (trans_check(thd)) + DBUG_ENTER("trans_rollback"); +#ifdef WITH_WSREP + thd->wsrep_PA_safe= true; +#endif /* WITH_WSREP */ + if (trans_check(thd)) DBUG_RETURN(TRUE); +#ifdef WITH_WSREP + wsrep_register_hton(thd, TRUE); +#endif /* WITH_WSREP */ thd->server_status&= ~SERVER_STATUS_IN_TRANS; res= ha_rollback_trans(thd, TRUE); RUN_HOOK(transaction, after_rollback, (thd, FALSE)); @@ -296,6 +319,9 @@ bool trans_commit_stmt(THD *thd) if (thd->transaction.stmt.ha_list) { +#ifdef WITH_WSREP + wsrep_register_hton(thd, FALSE); +#endif /* WITH_WSREP */ res= ha_commit_trans(thd, FALSE); if (! thd->in_active_multi_stmt_transaction()) thd->tx_isolation= (enum_tx_isolation) thd->variables.tx_isolation; @@ -338,9 +364,19 @@ bool trans_rollback_stmt(THD *thd) if (thd->transaction.stmt.ha_list) { +#ifdef WITH_WSREP + wsrep_register_hton(thd, FALSE); +#endif /* WITH_WSREP */ ha_rollback_trans(thd, FALSE); if (thd->transaction_rollback_request && !thd->in_sub_stmt) +#ifdef WITH_WSREP + { + wsrep_register_hton(thd, TRUE); +#endif /* WITH_WSREP */ ha_rollback_trans(thd, TRUE); +#ifdef WITH_WSREP + } +#endif /* WITH_WSREP */ if (! thd->in_active_multi_stmt_transaction()) thd->tx_isolation= (enum_tx_isolation) thd->variables.tx_isolation; } @@ -681,6 +717,9 @@ bool trans_xa_commit(THD *thd) } else if (xa_state == XA_IDLE && thd->lex->xa_opt == XA_ONE_PHASE) { +#ifdef WITH_WSREP + wsrep_register_hton(thd, TRUE); +#endif /* WITH_WSREP */ int r= ha_commit_trans(thd, TRUE); if ((res= test(r))) my_error(r == 1 ? ER_XA_RBROLLBACK : ER_XAER_RMERR, MYF(0)); @@ -702,6 +741,9 @@ bool trans_xa_commit(THD *thd) if (thd->mdl_context.acquire_lock(&mdl_request, thd->variables.lock_wait_timeout)) { +#ifdef WITH_WSREP + wsrep_register_hton(thd, TRUE); +#endif /* WITH_WSREP */ ha_rollback_trans(thd, TRUE); my_error(ER_XAER_RMERR, MYF(0)); } |