diff options
Diffstat (limited to 'sql/log.cc')
| -rw-r--r-- | sql/log.cc | 122 |
1 files changed, 106 insertions, 16 deletions
diff --git a/sql/log.cc b/sql/log.cc index a56117a4ac1..68e34513d40 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -55,10 +55,14 @@ #include "sql_show.h" #include "my_pthread.h" #include "semisync_master.h" -#include "wsrep_mysqld.h" #include "sp_rcontext.h" #include "sp_head.h" +#include "wsrep_mysqld.h" +#ifdef WITH_WSREP +#include "wsrep_trans_observer.h" +#endif /* WITH_WSREP */ + /* max size of the log message */ #define MAX_LOG_BUFFER_SIZE 1024 #define MAX_TIME_SIZE 32 @@ -1703,7 +1707,7 @@ static int binlog_close_connection(handlerton *hton, THD *thd) (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); #ifdef WITH_WSREP if (cache_mngr && !cache_mngr->trx_cache.empty()) { - IO_CACHE* cache= get_trans_log(thd); + IO_CACHE* cache= cache_mngr->get_binlog_cache_log(true); uchar *buf; size_t len=0; wsrep_write_cache_buf(cache, &buf, &len); @@ -2297,8 +2301,17 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv) non-transactional table. Otherwise, truncate the binlog cache starting from the SAVEPOINT command. */ +#ifdef WITH_WSREP + /* for streaming replication, we must replicate savepoint rollback so that + slaves can maintain SR transactions + */ + if (unlikely(thd->wsrep_trx().is_streaming() || + (trans_has_updated_non_trans_table(thd)) || + (thd->variables.option_bits & OPTION_KEEP_LOG))) +#else if (unlikely(trans_has_updated_non_trans_table(thd) || (thd->variables.option_bits & OPTION_KEEP_LOG))) +#endif /* WITH_WSREP */ { char buf[1024]; String log_query(buf, sizeof(buf), &my_charset_bin); @@ -5970,7 +5983,9 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone, DBUG_PRINT("enter", ("standalone: %d", standalone)); #ifdef WITH_WSREP - if (WSREP(thd) && thd->wsrep_trx_meta.gtid.seqno != -1 && wsrep_gtid_mode && !thd->variables.gtid_seq_no) + if (WSREP(thd) && + (wsrep_thd_trx_seqno(thd) > 0) && + wsrep_gtid_mode && !thd->variables.gtid_seq_no) { domain_id= wsrep_gtid_domain_id; } else { @@ -6287,7 +6302,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) */ /* applier and replayer can skip writing binlog events */ if ((WSREP_EMULATE_BINLOG(thd) && - IF_WSREP(thd->wsrep_exec_mode != REPL_RECV, 0)) || is_open()) + IF_WSREP(thd->wsrep_cs().mode() == wsrep::client_state::m_local, 0)) || is_open()) { my_off_t UNINIT_VAR(my_org_b_tell); #ifdef HAVE_REPLICATION @@ -7670,7 +7685,11 @@ bool MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) { int is_leader= queue_for_group_commit(entry); - +#ifdef WITH_WSREP + if (is_leader >= 0 && + wsrep_ordered_commit(entry->thd, entry->all, wsrep_apply_error())) + return true; +#endif /* WITH_WSREP */ /* The first in the queue handles group commit for all; the others just wait to be signalled when group commit is done. @@ -10592,7 +10611,10 @@ maria_declare_plugin(binlog) maria_declare_plugin_end; #ifdef WITH_WSREP -IO_CACHE * get_trans_log(THD * thd) +#include "wsrep_trans_observer.h" +#include "wsrep_mysqld.h" + +IO_CACHE *wsrep_get_trans_cache(THD * thd) { DBUG_ASSERT(binlog_hton->slot != HA_SLOT_UNDEF); binlog_cache_mngr *cache_mngr = (binlog_cache_mngr*) @@ -10605,17 +10627,10 @@ IO_CACHE * get_trans_log(THD * thd) return NULL; } - -bool wsrep_trans_cache_is_empty(THD *thd) -{ - binlog_cache_mngr *const cache_mngr= - (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); - return (!cache_mngr || cache_mngr->trx_cache.empty()); -} - - -void thd_binlog_trx_reset(THD * thd) +void wsrep_thd_binlog_trx_reset(THD * thd) { + DBUG_ENTER("wsrep_thd_binlog_trx_reset"); + WSREP_DEBUG("wsrep_thd_binlog_reset"); /* todo: fix autocommit select to not call the caller */ @@ -10634,6 +10649,7 @@ void thd_binlog_trx_reset(THD * thd) } } thd->clear_binlog_table_maps(); + DBUG_VOID_RETURN; } @@ -10646,4 +10662,78 @@ void thd_binlog_rollback_stmt(THD * thd) if (cache_mngr) cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF); } + +bool wsrep_stmt_rollback_is_safe(THD* thd) +{ + bool ret(true); + + DBUG_ENTER("wsrep_binlog_stmt_rollback_is_safe"); + + binlog_cache_mngr *cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + + + if (binlog_hton && cache_mngr) + { + binlog_cache_data * trx_cache = &cache_mngr->trx_cache; + if (thd->wsrep_sr().fragments_certified() > 0 && + (trx_cache->get_prev_position() == MY_OFF_T_UNDEF || + trx_cache->get_prev_position() < thd->wsrep_sr().bytes_certified())) + { + WSREP_DEBUG("statement rollback is not safe for streaming replication" + " pre-stmt_pos: %llu, frag repl pos: %lu\n" + "Thread: %llu, SQL: %s", + trx_cache->get_prev_position(), + thd->wsrep_sr().bytes_certified(), + thd->thread_id, thd->query()); + ret = false; + } + } + DBUG_RETURN(ret); +} + +void wsrep_register_binlog_handler(THD *thd, bool trx) +{ + DBUG_ENTER("register_binlog_handler"); + /* + If this is the first call to this function while processing a statement, + the transactional cache does not have a savepoint defined. So, in what + follows: + . an implicit savepoint is defined; + . callbacks are registered; + . binary log is set as read/write. + + The savepoint allows for truncating the trx-cache transactional changes + fail. Callbacks are necessary to flush caches upon committing or rolling + back a statement or a transaction. However, notifications do not happen + if the binary log is set as read/write. + */ + //binlog_cache_mngr *cache_mngr= thd_get_cache_mngr(thd); + binlog_cache_mngr *cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + /* cache_mngr may be missing e.g. in mtr test ev51914.test */ + if (cache_mngr && cache_mngr->trx_cache.get_prev_position() == MY_OFF_T_UNDEF) + { + /* + Set an implicit savepoint in order to be able to truncate a trx-cache. + */ + my_off_t pos= 0; + binlog_trans_log_savepos(thd, &pos); + cache_mngr->trx_cache.set_prev_position(pos); + + /* + Set callbacks in order to be able to call commmit or rollback. + */ + if (trx) + trans_register_ha(thd, TRUE, binlog_hton); + trans_register_ha(thd, FALSE, binlog_hton); + + /* + Set the binary log as read/write otherwise callbacks are not called. + */ + thd->ha_data[binlog_hton->slot].ha_info[0].set_trx_read_write(); + } + DBUG_VOID_RETURN; +} + #endif /* WITH_WSREP */ |
