summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/log.cc31
-rw-r--r--sql/log.h1
-rw-r--r--sql/wsrep_binlog.cc36
-rw-r--r--sql/wsrep_binlog.h2
-rw-r--r--sql/wsrep_mysqld.h5
-rw-r--r--sql/wsrep_trans_observer.h23
6 files changed, 31 insertions, 67 deletions
diff --git a/sql/log.cc b/sql/log.cc
index ffeb3661783..1da73ab25df 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -10702,7 +10702,6 @@ maria_declare_plugin(binlog)
maria_declare_plugin_end;
#ifdef WITH_WSREP
-#include "wsrep_trans_observer.h"
#include "wsrep_mysqld.h"
IO_CACHE *wsrep_get_trans_cache(THD * thd)
@@ -10725,33 +10724,33 @@ void wsrep_thd_binlog_trx_reset(THD * thd)
/*
todo: fix autocommit select to not call the caller
*/
- if (thd_get_ha_data(thd, binlog_hton) != NULL)
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+ if (cache_mngr)
{
- binlog_cache_mngr *const cache_mngr=
- (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
- if (cache_mngr)
+ cache_mngr->reset(false, true);
+ if (!cache_mngr->stmt_cache.empty())
{
- cache_mngr->reset(false, true);
- if (!cache_mngr->stmt_cache.empty())
- {
- WSREP_DEBUG("pending events in stmt cache, sql: %s", thd->query());
- cache_mngr->stmt_cache.reset();
- }
+ WSREP_DEBUG("pending events in stmt cache, sql: %s", thd->query());
+ cache_mngr->stmt_cache.reset();
}
}
thd->clear_binlog_table_maps();
DBUG_VOID_RETURN;
}
-
-void thd_binlog_rollback_stmt(THD * thd)
+void wsrep_thd_binlog_stmt_rollback(THD * thd)
{
- WSREP_DEBUG("thd_binlog_rollback_stmt connection: %llu",
- thd->thread_id);
+ DBUG_ENTER("wsrep_thd_binlog_stmt_rollback");
+ WSREP_DEBUG("wsrep_thd_binlog_stmt_rollback");
binlog_cache_mngr *const cache_mngr=
(binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
if (cache_mngr)
- cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF);
+ {
+ thd->binlog_remove_pending_rows_event(TRUE, TRUE);
+ cache_mngr->stmt_cache.reset();
+ }
+ DBUG_VOID_RETURN;
}
bool wsrep_stmt_rollback_is_safe(THD* thd)
diff --git a/sql/log.h b/sql/log.h
index eef81c46ac4..4b80bdfd81f 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -1222,6 +1222,7 @@ static inline TC_LOG *get_tc_log_implementation()
#ifdef WITH_WSREP
IO_CACHE* wsrep_get_trans_cache(THD *);
void wsrep_thd_binlog_trx_reset(THD * thd);
+void wsrep_thd_binlog_stmt_rollback(THD * thd);
#endif /* WITH_WSREP */
class Gtid_list_log_event;
diff --git a/sql/wsrep_binlog.cc b/sql/wsrep_binlog.cc
index 84fce7c2b2d..787ebc042ae 100644
--- a/sql/wsrep_binlog.cc
+++ b/sql/wsrep_binlog.cc
@@ -228,40 +228,6 @@ void wsrep_dump_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len)
free(filename);
}
-/*
- wsrep exploits binlog's caches even if binlogging itself is not
- activated. In such case connection close needs calling
- actual binlog's method.
- Todo: split binlog hton from its caches to use ones by wsrep
- without referring to binlog's stuff.
-*/
-int wsrep_binlog_close_connection(THD* thd)
-{
- DBUG_ENTER("wsrep_binlog_close_connection");
- if (thd_get_ha_data(thd, binlog_hton) != NULL)
- binlog_hton->close_connection (binlog_hton, thd);
- DBUG_RETURN(0);
-}
-
-int wsrep_binlog_savepoint_set(THD *thd, void *sv)
-{
- if (!wsrep_emulate_bin_log) return 0;
- int rcode= binlog_hton->savepoint_set(binlog_hton, thd, sv);
- return rcode;
-}
-
-int wsrep_binlog_savepoint_rollback(THD *thd, void *sv)
-{
- if (!wsrep_emulate_bin_log) return 0;
- int rcode= binlog_hton->savepoint_rollback(binlog_hton, thd, sv);
- return rcode;
-}
-
-void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end)
-{
- thd->binlog_flush_pending_rows_event(stmt_end);
-}
-
/* Dump replication buffer along with header to a file. */
void wsrep_dump_rbr_buf_with_header(THD *thd, const void *rbr_buf,
size_t buf_len)
@@ -343,8 +309,6 @@ cleanup1:
DBUG_VOID_RETURN;
}
-#include "log_event.h"
-
int wsrep_write_skip_event(THD* thd)
{
DBUG_ENTER("wsrep_write_skip_event");
diff --git a/sql/wsrep_binlog.h b/sql/wsrep_binlog.h
index 4e29b30baca..252fbe602d2 100644
--- a/sql/wsrep_binlog.h
+++ b/sql/wsrep_binlog.h
@@ -50,8 +50,6 @@ void wsrep_dump_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len);
void wsrep_dump_rbr_buf_with_header(THD *thd, const void *rbr_buf,
size_t buf_len);
-int wsrep_binlog_close_connection(THD* thd);
-
/**
Write a skip event into binlog.
diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h
index 71cbc875b91..a5da8e3bc44 100644
--- a/sql/wsrep_mysqld.h
+++ b/sql/wsrep_mysqld.h
@@ -405,11 +405,6 @@ extern void
wsrep_handle_mdl_conflict(MDL_context *requestor_ctx,
MDL_ticket *ticket,
const MDL_key *key);
-IO_CACHE * get_trans_log(THD * thd);
-bool wsrep_trans_cache_is_empty(THD *thd);
-void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end);
-void thd_binlog_rollback_stmt(THD * thd);
-void thd_binlog_trx_reset(THD * thd);
enum wsrep_thread_type {
WSREP_APPLIER_THREAD=1,
diff --git a/sql/wsrep_trans_observer.h b/sql/wsrep_trans_observer.h
index 6bb26c40064..98fc63cf783 100644
--- a/sql/wsrep_trans_observer.h
+++ b/sql/wsrep_trans_observer.h
@@ -335,15 +335,22 @@ static inline int wsrep_before_rollback(THD* thd, bool all)
int ret= 0;
if (wsrep_is_active(thd))
{
- if (!all && thd->in_active_multi_stmt_transaction() &&
- thd->wsrep_trx().is_streaming() &&
- !wsrep_stmt_rollback_is_safe(thd))
+ if (!all && thd->in_active_multi_stmt_transaction())
{
- /* Non-safe statement rollback during SR multi statement
- transasction. Self abort the transaction, the actual rollback
- and error handling will be done in after statement phase. */
- wsrep_thd_self_abort(thd);
- ret= 0;
+ if (wsrep_emulate_bin_log)
+ {
+ wsrep_thd_binlog_stmt_rollback(thd);
+ }
+
+ if (thd->wsrep_trx().is_streaming() &&
+ !wsrep_stmt_rollback_is_safe(thd))
+ {
+ /* Non-safe statement rollback during SR multi statement
+ transasction. Self abort the transaction, the actual rollback
+ and error handling will be done in after statement phase. */
+ wsrep_thd_self_abort(thd);
+ ret= 0;
+ }
}
else if (wsrep_is_real(thd, all) &&
thd->wsrep_trx().state() != wsrep::transaction::s_aborted)