diff options
Diffstat (limited to 'sql')
-rw-r--r-- | sql/log.cc | 31 | ||||
-rw-r--r-- | sql/log.h | 1 | ||||
-rw-r--r-- | sql/wsrep_binlog.cc | 36 | ||||
-rw-r--r-- | sql/wsrep_binlog.h | 2 | ||||
-rw-r--r-- | sql/wsrep_mysqld.h | 5 | ||||
-rw-r--r-- | sql/wsrep_trans_observer.h | 23 |
6 files changed, 31 insertions, 67 deletions
diff --git a/sql/log.cc b/sql/log.cc index ffeb3661783..1da73ab25df 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -10702,7 +10702,6 @@ maria_declare_plugin(binlog) maria_declare_plugin_end; #ifdef WITH_WSREP -#include "wsrep_trans_observer.h" #include "wsrep_mysqld.h" IO_CACHE *wsrep_get_trans_cache(THD * thd) @@ -10725,33 +10724,33 @@ void wsrep_thd_binlog_trx_reset(THD * thd) /* todo: fix autocommit select to not call the caller */ - if (thd_get_ha_data(thd, binlog_hton) != NULL) + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + if (cache_mngr) { - binlog_cache_mngr *const cache_mngr= - (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); - if (cache_mngr) + cache_mngr->reset(false, true); + if (!cache_mngr->stmt_cache.empty()) { - cache_mngr->reset(false, true); - if (!cache_mngr->stmt_cache.empty()) - { - WSREP_DEBUG("pending events in stmt cache, sql: %s", thd->query()); - cache_mngr->stmt_cache.reset(); - } + WSREP_DEBUG("pending events in stmt cache, sql: %s", thd->query()); + cache_mngr->stmt_cache.reset(); } } thd->clear_binlog_table_maps(); DBUG_VOID_RETURN; } - -void thd_binlog_rollback_stmt(THD * thd) +void wsrep_thd_binlog_stmt_rollback(THD * thd) { - WSREP_DEBUG("thd_binlog_rollback_stmt connection: %llu", - thd->thread_id); + DBUG_ENTER("wsrep_thd_binlog_stmt_rollback"); + WSREP_DEBUG("wsrep_thd_binlog_stmt_rollback"); binlog_cache_mngr *const cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); if (cache_mngr) - cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF); + { + thd->binlog_remove_pending_rows_event(TRUE, TRUE); + cache_mngr->stmt_cache.reset(); + } + DBUG_VOID_RETURN; } bool wsrep_stmt_rollback_is_safe(THD* thd) diff --git a/sql/log.h b/sql/log.h index eef81c46ac4..4b80bdfd81f 100644 --- a/sql/log.h +++ b/sql/log.h @@ -1222,6 +1222,7 @@ static inline TC_LOG *get_tc_log_implementation() #ifdef WITH_WSREP IO_CACHE* wsrep_get_trans_cache(THD *); void wsrep_thd_binlog_trx_reset(THD * thd); +void wsrep_thd_binlog_stmt_rollback(THD * thd); #endif /* WITH_WSREP */ class Gtid_list_log_event; diff --git a/sql/wsrep_binlog.cc b/sql/wsrep_binlog.cc index 84fce7c2b2d..787ebc042ae 100644 --- a/sql/wsrep_binlog.cc +++ b/sql/wsrep_binlog.cc @@ -228,40 +228,6 @@ void wsrep_dump_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len) free(filename); } -/* - wsrep exploits binlog's caches even if binlogging itself is not - activated. In such case connection close needs calling - actual binlog's method. - Todo: split binlog hton from its caches to use ones by wsrep - without referring to binlog's stuff. -*/ -int wsrep_binlog_close_connection(THD* thd) -{ - DBUG_ENTER("wsrep_binlog_close_connection"); - if (thd_get_ha_data(thd, binlog_hton) != NULL) - binlog_hton->close_connection (binlog_hton, thd); - DBUG_RETURN(0); -} - -int wsrep_binlog_savepoint_set(THD *thd, void *sv) -{ - if (!wsrep_emulate_bin_log) return 0; - int rcode= binlog_hton->savepoint_set(binlog_hton, thd, sv); - return rcode; -} - -int wsrep_binlog_savepoint_rollback(THD *thd, void *sv) -{ - if (!wsrep_emulate_bin_log) return 0; - int rcode= binlog_hton->savepoint_rollback(binlog_hton, thd, sv); - return rcode; -} - -void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end) -{ - thd->binlog_flush_pending_rows_event(stmt_end); -} - /* Dump replication buffer along with header to a file. */ void wsrep_dump_rbr_buf_with_header(THD *thd, const void *rbr_buf, size_t buf_len) @@ -343,8 +309,6 @@ cleanup1: DBUG_VOID_RETURN; } -#include "log_event.h" - int wsrep_write_skip_event(THD* thd) { DBUG_ENTER("wsrep_write_skip_event"); diff --git a/sql/wsrep_binlog.h b/sql/wsrep_binlog.h index 4e29b30baca..252fbe602d2 100644 --- a/sql/wsrep_binlog.h +++ b/sql/wsrep_binlog.h @@ -50,8 +50,6 @@ void wsrep_dump_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len); void wsrep_dump_rbr_buf_with_header(THD *thd, const void *rbr_buf, size_t buf_len); -int wsrep_binlog_close_connection(THD* thd); - /** Write a skip event into binlog. diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h index 71cbc875b91..a5da8e3bc44 100644 --- a/sql/wsrep_mysqld.h +++ b/sql/wsrep_mysqld.h @@ -405,11 +405,6 @@ extern void wsrep_handle_mdl_conflict(MDL_context *requestor_ctx, MDL_ticket *ticket, const MDL_key *key); -IO_CACHE * get_trans_log(THD * thd); -bool wsrep_trans_cache_is_empty(THD *thd); -void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end); -void thd_binlog_rollback_stmt(THD * thd); -void thd_binlog_trx_reset(THD * thd); enum wsrep_thread_type { WSREP_APPLIER_THREAD=1, diff --git a/sql/wsrep_trans_observer.h b/sql/wsrep_trans_observer.h index 6bb26c40064..98fc63cf783 100644 --- a/sql/wsrep_trans_observer.h +++ b/sql/wsrep_trans_observer.h @@ -335,15 +335,22 @@ static inline int wsrep_before_rollback(THD* thd, bool all) int ret= 0; if (wsrep_is_active(thd)) { - if (!all && thd->in_active_multi_stmt_transaction() && - thd->wsrep_trx().is_streaming() && - !wsrep_stmt_rollback_is_safe(thd)) + if (!all && thd->in_active_multi_stmt_transaction()) { - /* Non-safe statement rollback during SR multi statement - transasction. Self abort the transaction, the actual rollback - and error handling will be done in after statement phase. */ - wsrep_thd_self_abort(thd); - ret= 0; + if (wsrep_emulate_bin_log) + { + wsrep_thd_binlog_stmt_rollback(thd); + } + + if (thd->wsrep_trx().is_streaming() && + !wsrep_stmt_rollback_is_safe(thd)) + { + /* Non-safe statement rollback during SR multi statement + transasction. Self abort the transaction, the actual rollback + and error handling will be done in after statement phase. */ + wsrep_thd_self_abort(thd); + ret= 0; + } } else if (wsrep_is_real(thd, all) && thd->wsrep_trx().state() != wsrep::transaction::s_aborted) |