diff options
Diffstat (limited to 'sql/wsrep_hton.cc')
-rw-r--r-- | sql/wsrep_hton.cc | 173 |
1 files changed, 81 insertions, 92 deletions
diff --git a/sql/wsrep_hton.cc b/sql/wsrep_hton.cc index d4bb77c9e6f..8eb5340dd58 100644 --- a/sql/wsrep_hton.cc +++ b/sql/wsrep_hton.cc @@ -18,7 +18,7 @@ #include "rpl_filter.h" #include <sql_class.h> #include "wsrep_mysqld.h" -#include "wsrep_priv.h" +#include "wsrep_binlog.h" #include <cstdio> #include <cstdlib> @@ -26,10 +26,11 @@ extern handlerton *binlog_hton; extern int binlog_close_connection(handlerton *hton, THD *thd); extern ulonglong thd_to_trx_id(THD *thd); -extern "C" int thd_binlog_format(const MYSQL_THD thd); -// todo: share interface with ha_innodb.c +extern "C" int thd_binlog_format(const MYSQL_THD thd); +// todo: share interface with ha_innodb.c -enum wsrep_trx_status wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all); +enum wsrep_trx_status wsrep_run_wsrep_commit(THD *thd, handlerton *hton, + bool all); /* Cleanup after local transaction commit/rollback, replay or TOI. @@ -37,8 +38,9 @@ enum wsrep_trx_status wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool al void wsrep_cleanup_transaction(THD *thd) { if (wsrep_emulate_bin_log) thd_binlog_trx_reset(thd); - thd->wsrep_trx_handle.trx_id= WSREP_UNDEFINED_TRX_ID; - thd->wsrep_trx_seqno= WSREP_SEQNO_UNDEFINED; + thd->wsrep_ws_handle.trx_id= WSREP_UNDEFINED_TRX_ID; + thd->wsrep_trx_meta.gtid= WSREP_GTID_UNDEFINED; + thd->wsrep_trx_meta.depends_on= WSREP_SEQNO_UNDEFINED; thd->wsrep_exec_mode= LOCAL_STATE; return; } @@ -66,7 +68,7 @@ handlerton *wsrep_hton; */ void wsrep_register_hton(THD* thd, bool all) { - if (thd->wsrep_exec_mode != TOTAL_ORDER) + if (thd->wsrep_exec_mode != TOTAL_ORDER && !thd->wsrep_apply_toi) { THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt; for (Ha_trx_info *i= trans->ha_list; WSREP(thd) && i; i = i->next()) @@ -94,8 +96,8 @@ void wsrep_post_commit(THD* thd, bool all) { if (thd->wsrep_exec_mode == LOCAL_COMMIT) { - DBUG_ASSERT(thd->wsrep_trx_seqno != WSREP_SEQNO_UNDEFINED); - if (wsrep->post_commit(wsrep, &thd->wsrep_trx_handle)) + DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno != WSREP_SEQNO_UNDEFINED); + if (wsrep->post_commit(wsrep, &thd->wsrep_ws_handle)) { DBUG_PRINT("wsrep", ("set committed fail")); WSREP_WARN("set committed fail: %llu %d", @@ -106,7 +108,7 @@ void wsrep_post_commit(THD* thd, bool all) } /* - wsrep exploits binlog's caches even if binlogging itself is not + wsrep exploits binlog's caches even if binlogging itself is not activated. In such case connection close needs calling actual binlog's method. Todo: split binlog hton from its caches to use ones by wsrep @@ -125,7 +127,7 @@ wsrep_close_connection(handlerton* hton, THD* thd) if (wsrep_emulate_bin_log && thd_get_ha_data(thd, binlog_hton) != NULL) binlog_hton->close_connection (binlog_hton, thd); DBUG_RETURN(0); -} +} /* prepare/wsrep_run_wsrep_commit can fail in two ways @@ -147,18 +149,15 @@ static int wsrep_prepare(handlerton *hton, THD *thd, bool all) DBUG_ASSERT(thd->ha_data[wsrep_hton->slot].ha_info[all].is_trx_read_write()); DBUG_ASSERT(thd->wsrep_exec_mode == LOCAL_STATE); - DBUG_ASSERT(thd->wsrep_trx_seqno == WSREP_SEQNO_UNDEFINED); + DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno == WSREP_SEQNO_UNDEFINED); - if ((all || + if ((all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) && (thd->variables.wsrep_on && !wsrep_trans_cache_is_empty(thd))) { switch (wsrep_run_wsrep_commit(thd, hton, all)) { case WSREP_TRX_OK: - // DBUG_ASSERT(thd->wsrep_trx_seqno > old || - // thd->wsrep_exec_mode == REPL_RECV || - // thd->wsrep_exec_mode == TOTAL_ORDER); break; case WSREP_TRX_ROLLBACK: case WSREP_TRX_ERROR: @@ -208,10 +207,10 @@ static int wsrep_rollback(handlerton *hton, THD *thd, bool all) if ((all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) && (thd->variables.wsrep_on && thd->wsrep_conflict_state != MUST_REPLAY)) { - if (wsrep->post_rollback(wsrep, &thd->wsrep_trx_handle)) + if (wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle)) { DBUG_PRINT("wsrep", ("setting rollback fail")); - WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s", + WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s", (long long)thd->real_id, thd->query()); } wsrep_cleanup_transaction(thd); @@ -249,12 +248,12 @@ int wsrep_commit(handlerton *hton, THD *thd, bool all) possible changes to clean state. */ if (WSREP_PROVIDER_EXISTS) { - if (wsrep->post_rollback(wsrep, &thd->wsrep_trx_handle)) - { - DBUG_PRINT("wsrep", ("setting rollback fail")); - WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s", - (long long)thd->real_id, thd->query()); - } + if (wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle)) + { + DBUG_PRINT("wsrep", ("setting rollback fail")); + WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s", + (long long)thd->real_id, thd->query()); + } } wsrep_cleanup_transaction(thd); } @@ -266,26 +265,24 @@ int wsrep_commit(handlerton *hton, THD *thd, bool all) extern Rpl_filter* binlog_filter; extern my_bool opt_log_slave_updates; -extern void wsrep_write_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len); + enum wsrep_trx_status -wsrep_run_wsrep_commit( - THD *thd, handlerton *hton, bool all) +wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all) { - int rcode = -1; - uint data_len = 0; - uchar *rbr_data = NULL; + int rcode= -1; + size_t data_len= 0; IO_CACHE *cache; int replay_round= 0; if (thd->stmt_da->is_error()) { - WSREP_ERROR("commit issue, error: %d %s", + WSREP_ERROR("commit issue, error: %d %s", thd->stmt_da->sql_errno(), thd->stmt_da->message()); } DBUG_ENTER("wsrep_run_wsrep_commit"); - if (thd->slave_thread && !opt_log_slave_updates) { - DBUG_RETURN(WSREP_TRX_OK); - } + + if (thd->slave_thread && !opt_log_slave_updates) DBUG_RETURN(WSREP_TRX_OK); + if (thd->wsrep_exec_mode == REPL_RECV) { mysql_mutex_lock(&thd->LOCK_wsrep_thd); @@ -303,9 +300,9 @@ wsrep_run_wsrep_commit( } mysql_mutex_unlock(&thd->LOCK_wsrep_thd); } - if (thd->wsrep_exec_mode != LOCAL_STATE) { - DBUG_RETURN(WSREP_TRX_OK); - } + + if (thd->wsrep_exec_mode != LOCAL_STATE) DBUG_RETURN(WSREP_TRX_OK); + if (thd->wsrep_consistency_check == CONSISTENCY_CHECK_RUNNING) { WSREP_DEBUG("commit for consistency check: %s", thd->query()); DBUG_RETURN(WSREP_TRX_OK); @@ -327,10 +324,10 @@ wsrep_run_wsrep_commit( mysql_mutex_lock(&LOCK_wsrep_replaying); - while (wsrep_replaying > 0 && + while (wsrep_replaying > 0 && thd->wsrep_conflict_state == NO_CONFLICT && thd->killed == NOT_KILLED && - !shutdown_in_progress) + !shutdown_in_progress) { mysql_mutex_unlock(&LOCK_wsrep_replaying); @@ -348,9 +345,12 @@ wsrep_run_wsrep_commit( struct timespec wtime = {0, 1000000}; mysql_cond_timedwait(&COND_wsrep_replaying, &LOCK_wsrep_replaying, &wtime); + if (replay_round++ % 100000 == 0) - WSREP_DEBUG("commit waiting for replaying: replayers %d, thd: (%lu) conflict: %d (round: %d)", - wsrep_replaying, thd->thread_id, thd->wsrep_conflict_state, replay_round); + WSREP_DEBUG("commit waiting for replaying: replayers %d, thd: (%lu) " + "conflict: %d (round: %d)", + wsrep_replaying, thd->thread_id, + thd->wsrep_conflict_state, replay_round); mysql_mutex_unlock(&LOCK_wsrep_replaying); @@ -371,7 +371,8 @@ wsrep_run_wsrep_commit( WSREP_DEBUG("innobase_commit abort after replaying wait %s", (thd->query()) ? thd->query() : "void"); DBUG_RETURN(WSREP_TRX_ROLLBACK); - } + } + thd->wsrep_query_state = QUERY_COMMITTING; mysql_mutex_unlock(&thd->LOCK_wsrep_thd); @@ -379,28 +380,28 @@ wsrep_run_wsrep_commit( rcode = 0; if (cache) { thd->binlog_flush_pending_rows_event(true); - rcode = wsrep_write_cache(cache, &rbr_data, &data_len); - if (rcode) { - WSREP_ERROR("rbr write fail, data_len: %d, %d", data_len, rcode); - if (data_len) my_free(rbr_data); + rcode = wsrep_write_cache(wsrep, thd, cache, &data_len); + if (WSREP_OK != rcode) { + WSREP_ERROR("rbr write fail, data_len: %zu, %d", data_len, rcode); DBUG_RETURN(WSREP_TRX_ROLLBACK); } } - if (data_len == 0) + + if (data_len == 0) { - if (thd->stmt_da->is_ok() && + if (thd->stmt_da->is_ok() && thd->stmt_da->affected_rows() > 0 && !binlog_filter->is_on()) { WSREP_DEBUG("empty rbr buffer, query: %s, " - "affected rows: %llu, " - "changed tables: %d, " + "affected rows: %llu, " + "changed tables: %d, " "sql_log_bin: %d, " - "wsrep status (%d %d %d)", + "wsrep status (%d %d %d)", thd->query(), thd->stmt_da->affected_rows(), stmt_has_updated_trans_table(thd), thd->variables.sql_log_bin, - thd->wsrep_exec_mode, thd->wsrep_query_state, - thd->wsrep_conflict_state); + thd->wsrep_exec_mode, thd->wsrep_query_state, + thd->wsrep_conflict_state); } else { @@ -409,38 +410,33 @@ wsrep_run_wsrep_commit( thd->wsrep_query_state= QUERY_EXEC; DBUG_RETURN(WSREP_TRX_OK); } - if (WSREP_UNDEFINED_TRX_ID == thd->wsrep_trx_handle.trx_id) + + if (WSREP_UNDEFINED_TRX_ID == thd->wsrep_ws_handle.trx_id) { - WSREP_WARN("SQL statement was ineffective, THD: %lu, buf: %d\n" - "QUERY: %s\n" - " => Skipping replication", - thd->thread_id, data_len, thd->query()); - if (wsrep_debug) - { - wsrep_write_rbr_buf(thd, rbr_data, data_len); - } + WSREP_WARN("SQL statement was ineffective, THD: %lu, buf: %zu\n" + "QUERY: %s\n" + " => Skipping replication", + thd->thread_id, data_len, thd->query()); rcode = WSREP_TRX_FAIL; } else if (!rcode) { - rcode = wsrep->pre_commit( - wsrep, - (wsrep_conn_id_t)thd->thread_id, - &thd->wsrep_trx_handle, - rbr_data, - data_len, - (thd->wsrep_PA_safe) ? WSREP_FLAG_PA_SAFE : 0ULL, - &thd->wsrep_trx_seqno); - switch (rcode) { - case WSREP_TRX_MISSING: + if (WSREP_OK == rcode) + rcode = wsrep->pre_commit(wsrep, + (wsrep_conn_id_t)thd->thread_id, + &thd->wsrep_ws_handle, + WSREP_FLAG_COMMIT | + ((thd->wsrep_PA_safe) ? + 0ULL : WSREP_FLAG_PA_UNSAFE), + &thd->wsrep_trx_meta); + + if (rcode == WSREP_TRX_MISSING) { WSREP_WARN("Transaction missing in provider, thd: %ld, SQL: %s", thd->thread_id, thd->query()); - wsrep_write_rbr_buf(thd, rbr_data, data_len); rcode = WSREP_TRX_FAIL; - break; - case WSREP_BF_ABORT: + } else if (rcode == WSREP_BF_ABORT) { WSREP_DEBUG("thd %lu seqno %lld BF aborted by provider, will replay", - thd->thread_id, (long long)thd->wsrep_trx_seqno); + thd->thread_id, (long long)thd->wsrep_trx_meta.gtid.seqno); mysql_mutex_lock(&thd->LOCK_wsrep_thd); thd->wsrep_conflict_state = MUST_REPLAY; mysql_mutex_unlock(&thd->LOCK_wsrep_thd); @@ -449,22 +445,14 @@ wsrep_run_wsrep_commit( WSREP_DEBUG("replaying increased: %d, thd: %lu", wsrep_replaying, thd->thread_id); mysql_mutex_unlock(&LOCK_wsrep_replaying); - break; - default: - break; } } else { WSREP_ERROR("I/O error reading from thd's binlog iocache: " "errno=%d, io cache code=%d", my_errno, cache->error); - if (data_len) my_free(rbr_data); DBUG_ASSERT(0); // failure like this can not normally happen DBUG_RETURN(WSREP_TRX_ERROR); } - if (data_len) { - my_free(rbr_data); - } - mysql_mutex_lock(&thd->LOCK_wsrep_thd); switch(rcode) { case 0: @@ -481,22 +469,22 @@ wsrep_run_wsrep_commit( { WSREP_WARN("thd %lu seqno %lld: conflict state %d after post commit", thd->thread_id, - (long long)thd->wsrep_trx_seqno, + (long long)thd->wsrep_trx_meta.gtid.seqno, thd->wsrep_conflict_state); } thd->wsrep_exec_mode= LOCAL_COMMIT; - DBUG_ASSERT(thd->wsrep_trx_seqno != WSREP_SEQNO_UNDEFINED); + DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno != WSREP_SEQNO_UNDEFINED); /* Override XID iff it was generated by mysql */ if (thd->transaction.xid_state.xid.get_my_xid()) { wsrep_xid_init(&thd->transaction.xid_state.xid, - wsrep_cluster_uuid(), - thd->wsrep_trx_seqno); + &thd->wsrep_trx_meta.gtid.uuid, + thd->wsrep_trx_meta.gtid.seqno); } DBUG_PRINT("wsrep", ("replicating commit success")); break; case WSREP_BF_ABORT: - DBUG_ASSERT(thd->wsrep_trx_seqno != WSREP_SEQNO_UNDEFINED); + DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno != WSREP_SEQNO_UNDEFINED); case WSREP_TRX_FAIL: WSREP_DEBUG("commit failed for reason: %d", rcode); DBUG_PRINT("wsrep", ("replicating commit fail")); @@ -505,7 +493,7 @@ wsrep_run_wsrep_commit( if (thd->wsrep_conflict_state == MUST_ABORT) { thd->wsrep_conflict_state= ABORTED; - } + } else { WSREP_DEBUG("conflict state: %d", thd->wsrep_conflict_state); @@ -563,14 +551,15 @@ mysql_declare_plugin(wsrep) &wsrep_storage_engine, "wsrep", "Codership Oy", - "A pseudo storage engine to represent transactions in multi-master synchornous replication", + "A pseudo storage engine to represent transactions in multi-master " + "synchornous replication", PLUGIN_LICENSE_GPL, wsrep_hton_init, /* Plugin Init */ NULL, /* Plugin Deinit */ 0x0100 /* 1.0 */, NULL, /* status variables */ NULL, /* system variables */ - NULL, /* config options */ + NULL, /* config options */ 0, /* flags */ } mysql_declare_plugin_end; |