summaryrefslogtreecommitdiff
path: root/sql/wsrep_hton.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/wsrep_hton.cc')
-rw-r--r--sql/wsrep_hton.cc173
1 files changed, 81 insertions, 92 deletions
diff --git a/sql/wsrep_hton.cc b/sql/wsrep_hton.cc
index d4bb77c9e6f..8eb5340dd58 100644
--- a/sql/wsrep_hton.cc
+++ b/sql/wsrep_hton.cc
@@ -18,7 +18,7 @@
#include "rpl_filter.h"
#include <sql_class.h>
#include "wsrep_mysqld.h"
-#include "wsrep_priv.h"
+#include "wsrep_binlog.h"
#include <cstdio>
#include <cstdlib>
@@ -26,10 +26,11 @@ extern handlerton *binlog_hton;
extern int binlog_close_connection(handlerton *hton, THD *thd);
extern ulonglong thd_to_trx_id(THD *thd);
-extern "C" int thd_binlog_format(const MYSQL_THD thd);
-// todo: share interface with ha_innodb.c
+extern "C" int thd_binlog_format(const MYSQL_THD thd);
+// todo: share interface with ha_innodb.c
-enum wsrep_trx_status wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all);
+enum wsrep_trx_status wsrep_run_wsrep_commit(THD *thd, handlerton *hton,
+ bool all);
/*
Cleanup after local transaction commit/rollback, replay or TOI.
@@ -37,8 +38,9 @@ enum wsrep_trx_status wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool al
void wsrep_cleanup_transaction(THD *thd)
{
if (wsrep_emulate_bin_log) thd_binlog_trx_reset(thd);
- thd->wsrep_trx_handle.trx_id= WSREP_UNDEFINED_TRX_ID;
- thd->wsrep_trx_seqno= WSREP_SEQNO_UNDEFINED;
+ thd->wsrep_ws_handle.trx_id= WSREP_UNDEFINED_TRX_ID;
+ thd->wsrep_trx_meta.gtid= WSREP_GTID_UNDEFINED;
+ thd->wsrep_trx_meta.depends_on= WSREP_SEQNO_UNDEFINED;
thd->wsrep_exec_mode= LOCAL_STATE;
return;
}
@@ -66,7 +68,7 @@ handlerton *wsrep_hton;
*/
void wsrep_register_hton(THD* thd, bool all)
{
- if (thd->wsrep_exec_mode != TOTAL_ORDER)
+ if (thd->wsrep_exec_mode != TOTAL_ORDER && !thd->wsrep_apply_toi)
{
THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt;
for (Ha_trx_info *i= trans->ha_list; WSREP(thd) && i; i = i->next())
@@ -94,8 +96,8 @@ void wsrep_post_commit(THD* thd, bool all)
{
if (thd->wsrep_exec_mode == LOCAL_COMMIT)
{
- DBUG_ASSERT(thd->wsrep_trx_seqno != WSREP_SEQNO_UNDEFINED);
- if (wsrep->post_commit(wsrep, &thd->wsrep_trx_handle))
+ DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno != WSREP_SEQNO_UNDEFINED);
+ if (wsrep->post_commit(wsrep, &thd->wsrep_ws_handle))
{
DBUG_PRINT("wsrep", ("set committed fail"));
WSREP_WARN("set committed fail: %llu %d",
@@ -106,7 +108,7 @@ void wsrep_post_commit(THD* thd, bool all)
}
/*
- wsrep exploits binlog's caches even if binlogging itself is not
+ wsrep exploits binlog's caches even if binlogging itself is not
activated. In such case connection close needs calling
actual binlog's method.
Todo: split binlog hton from its caches to use ones by wsrep
@@ -125,7 +127,7 @@ wsrep_close_connection(handlerton* hton, THD* thd)
if (wsrep_emulate_bin_log && thd_get_ha_data(thd, binlog_hton) != NULL)
binlog_hton->close_connection (binlog_hton, thd);
DBUG_RETURN(0);
-}
+}
/*
prepare/wsrep_run_wsrep_commit can fail in two ways
@@ -147,18 +149,15 @@ static int wsrep_prepare(handlerton *hton, THD *thd, bool all)
DBUG_ASSERT(thd->ha_data[wsrep_hton->slot].ha_info[all].is_trx_read_write());
DBUG_ASSERT(thd->wsrep_exec_mode == LOCAL_STATE);
- DBUG_ASSERT(thd->wsrep_trx_seqno == WSREP_SEQNO_UNDEFINED);
+ DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno == WSREP_SEQNO_UNDEFINED);
- if ((all ||
+ if ((all ||
!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) &&
(thd->variables.wsrep_on && !wsrep_trans_cache_is_empty(thd)))
{
switch (wsrep_run_wsrep_commit(thd, hton, all))
{
case WSREP_TRX_OK:
- // DBUG_ASSERT(thd->wsrep_trx_seqno > old ||
- // thd->wsrep_exec_mode == REPL_RECV ||
- // thd->wsrep_exec_mode == TOTAL_ORDER);
break;
case WSREP_TRX_ROLLBACK:
case WSREP_TRX_ERROR:
@@ -208,10 +207,10 @@ static int wsrep_rollback(handlerton *hton, THD *thd, bool all)
if ((all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) &&
(thd->variables.wsrep_on && thd->wsrep_conflict_state != MUST_REPLAY))
{
- if (wsrep->post_rollback(wsrep, &thd->wsrep_trx_handle))
+ if (wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle))
{
DBUG_PRINT("wsrep", ("setting rollback fail"));
- WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s",
+ WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s",
(long long)thd->real_id, thd->query());
}
wsrep_cleanup_transaction(thd);
@@ -249,12 +248,12 @@ int wsrep_commit(handlerton *hton, THD *thd, bool all)
possible changes to clean state.
*/
if (WSREP_PROVIDER_EXISTS) {
- if (wsrep->post_rollback(wsrep, &thd->wsrep_trx_handle))
- {
- DBUG_PRINT("wsrep", ("setting rollback fail"));
- WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s",
- (long long)thd->real_id, thd->query());
- }
+ if (wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle))
+ {
+ DBUG_PRINT("wsrep", ("setting rollback fail"));
+ WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s",
+ (long long)thd->real_id, thd->query());
+ }
}
wsrep_cleanup_transaction(thd);
}
@@ -266,26 +265,24 @@ int wsrep_commit(handlerton *hton, THD *thd, bool all)
extern Rpl_filter* binlog_filter;
extern my_bool opt_log_slave_updates;
-extern void wsrep_write_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len);
+
enum wsrep_trx_status
-wsrep_run_wsrep_commit(
- THD *thd, handlerton *hton, bool all)
+wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all)
{
- int rcode = -1;
- uint data_len = 0;
- uchar *rbr_data = NULL;
+ int rcode= -1;
+ size_t data_len= 0;
IO_CACHE *cache;
int replay_round= 0;
if (thd->stmt_da->is_error()) {
- WSREP_ERROR("commit issue, error: %d %s",
+ WSREP_ERROR("commit issue, error: %d %s",
thd->stmt_da->sql_errno(), thd->stmt_da->message());
}
DBUG_ENTER("wsrep_run_wsrep_commit");
- if (thd->slave_thread && !opt_log_slave_updates) {
- DBUG_RETURN(WSREP_TRX_OK);
- }
+
+ if (thd->slave_thread && !opt_log_slave_updates) DBUG_RETURN(WSREP_TRX_OK);
+
if (thd->wsrep_exec_mode == REPL_RECV) {
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
@@ -303,9 +300,9 @@ wsrep_run_wsrep_commit(
}
mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
}
- if (thd->wsrep_exec_mode != LOCAL_STATE) {
- DBUG_RETURN(WSREP_TRX_OK);
- }
+
+ if (thd->wsrep_exec_mode != LOCAL_STATE) DBUG_RETURN(WSREP_TRX_OK);
+
if (thd->wsrep_consistency_check == CONSISTENCY_CHECK_RUNNING) {
WSREP_DEBUG("commit for consistency check: %s", thd->query());
DBUG_RETURN(WSREP_TRX_OK);
@@ -327,10 +324,10 @@ wsrep_run_wsrep_commit(
mysql_mutex_lock(&LOCK_wsrep_replaying);
- while (wsrep_replaying > 0 &&
+ while (wsrep_replaying > 0 &&
thd->wsrep_conflict_state == NO_CONFLICT &&
thd->killed == NOT_KILLED &&
- !shutdown_in_progress)
+ !shutdown_in_progress)
{
mysql_mutex_unlock(&LOCK_wsrep_replaying);
@@ -348,9 +345,12 @@ wsrep_run_wsrep_commit(
struct timespec wtime = {0, 1000000};
mysql_cond_timedwait(&COND_wsrep_replaying, &LOCK_wsrep_replaying,
&wtime);
+
if (replay_round++ % 100000 == 0)
- WSREP_DEBUG("commit waiting for replaying: replayers %d, thd: (%lu) conflict: %d (round: %d)",
- wsrep_replaying, thd->thread_id, thd->wsrep_conflict_state, replay_round);
+ WSREP_DEBUG("commit waiting for replaying: replayers %d, thd: (%lu) "
+ "conflict: %d (round: %d)",
+ wsrep_replaying, thd->thread_id,
+ thd->wsrep_conflict_state, replay_round);
mysql_mutex_unlock(&LOCK_wsrep_replaying);
@@ -371,7 +371,8 @@ wsrep_run_wsrep_commit(
WSREP_DEBUG("innobase_commit abort after replaying wait %s",
(thd->query()) ? thd->query() : "void");
DBUG_RETURN(WSREP_TRX_ROLLBACK);
- }
+ }
+
thd->wsrep_query_state = QUERY_COMMITTING;
mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
@@ -379,28 +380,28 @@ wsrep_run_wsrep_commit(
rcode = 0;
if (cache) {
thd->binlog_flush_pending_rows_event(true);
- rcode = wsrep_write_cache(cache, &rbr_data, &data_len);
- if (rcode) {
- WSREP_ERROR("rbr write fail, data_len: %d, %d", data_len, rcode);
- if (data_len) my_free(rbr_data);
+ rcode = wsrep_write_cache(wsrep, thd, cache, &data_len);
+ if (WSREP_OK != rcode) {
+ WSREP_ERROR("rbr write fail, data_len: %zu, %d", data_len, rcode);
DBUG_RETURN(WSREP_TRX_ROLLBACK);
}
}
- if (data_len == 0)
+
+ if (data_len == 0)
{
- if (thd->stmt_da->is_ok() &&
+ if (thd->stmt_da->is_ok() &&
thd->stmt_da->affected_rows() > 0 &&
!binlog_filter->is_on())
{
WSREP_DEBUG("empty rbr buffer, query: %s, "
- "affected rows: %llu, "
- "changed tables: %d, "
+ "affected rows: %llu, "
+ "changed tables: %d, "
"sql_log_bin: %d, "
- "wsrep status (%d %d %d)",
+ "wsrep status (%d %d %d)",
thd->query(), thd->stmt_da->affected_rows(),
stmt_has_updated_trans_table(thd), thd->variables.sql_log_bin,
- thd->wsrep_exec_mode, thd->wsrep_query_state,
- thd->wsrep_conflict_state);
+ thd->wsrep_exec_mode, thd->wsrep_query_state,
+ thd->wsrep_conflict_state);
}
else
{
@@ -409,38 +410,33 @@ wsrep_run_wsrep_commit(
thd->wsrep_query_state= QUERY_EXEC;
DBUG_RETURN(WSREP_TRX_OK);
}
- if (WSREP_UNDEFINED_TRX_ID == thd->wsrep_trx_handle.trx_id)
+
+ if (WSREP_UNDEFINED_TRX_ID == thd->wsrep_ws_handle.trx_id)
{
- WSREP_WARN("SQL statement was ineffective, THD: %lu, buf: %d\n"
- "QUERY: %s\n"
- " => Skipping replication",
- thd->thread_id, data_len, thd->query());
- if (wsrep_debug)
- {
- wsrep_write_rbr_buf(thd, rbr_data, data_len);
- }
+ WSREP_WARN("SQL statement was ineffective, THD: %lu, buf: %zu\n"
+ "QUERY: %s\n"
+ " => Skipping replication",
+ thd->thread_id, data_len, thd->query());
rcode = WSREP_TRX_FAIL;
}
else if (!rcode)
{
- rcode = wsrep->pre_commit(
- wsrep,
- (wsrep_conn_id_t)thd->thread_id,
- &thd->wsrep_trx_handle,
- rbr_data,
- data_len,
- (thd->wsrep_PA_safe) ? WSREP_FLAG_PA_SAFE : 0ULL,
- &thd->wsrep_trx_seqno);
- switch (rcode) {
- case WSREP_TRX_MISSING:
+ if (WSREP_OK == rcode)
+ rcode = wsrep->pre_commit(wsrep,
+ (wsrep_conn_id_t)thd->thread_id,
+ &thd->wsrep_ws_handle,
+ WSREP_FLAG_COMMIT |
+ ((thd->wsrep_PA_safe) ?
+ 0ULL : WSREP_FLAG_PA_UNSAFE),
+ &thd->wsrep_trx_meta);
+
+ if (rcode == WSREP_TRX_MISSING) {
WSREP_WARN("Transaction missing in provider, thd: %ld, SQL: %s",
thd->thread_id, thd->query());
- wsrep_write_rbr_buf(thd, rbr_data, data_len);
rcode = WSREP_TRX_FAIL;
- break;
- case WSREP_BF_ABORT:
+ } else if (rcode == WSREP_BF_ABORT) {
WSREP_DEBUG("thd %lu seqno %lld BF aborted by provider, will replay",
- thd->thread_id, (long long)thd->wsrep_trx_seqno);
+ thd->thread_id, (long long)thd->wsrep_trx_meta.gtid.seqno);
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
thd->wsrep_conflict_state = MUST_REPLAY;
mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
@@ -449,22 +445,14 @@ wsrep_run_wsrep_commit(
WSREP_DEBUG("replaying increased: %d, thd: %lu",
wsrep_replaying, thd->thread_id);
mysql_mutex_unlock(&LOCK_wsrep_replaying);
- break;
- default:
- break;
}
} else {
WSREP_ERROR("I/O error reading from thd's binlog iocache: "
"errno=%d, io cache code=%d", my_errno, cache->error);
- if (data_len) my_free(rbr_data);
DBUG_ASSERT(0); // failure like this can not normally happen
DBUG_RETURN(WSREP_TRX_ERROR);
}
- if (data_len) {
- my_free(rbr_data);
- }
-
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
switch(rcode) {
case 0:
@@ -481,22 +469,22 @@ wsrep_run_wsrep_commit(
{
WSREP_WARN("thd %lu seqno %lld: conflict state %d after post commit",
thd->thread_id,
- (long long)thd->wsrep_trx_seqno,
+ (long long)thd->wsrep_trx_meta.gtid.seqno,
thd->wsrep_conflict_state);
}
thd->wsrep_exec_mode= LOCAL_COMMIT;
- DBUG_ASSERT(thd->wsrep_trx_seqno != WSREP_SEQNO_UNDEFINED);
+ DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno != WSREP_SEQNO_UNDEFINED);
/* Override XID iff it was generated by mysql */
if (thd->transaction.xid_state.xid.get_my_xid())
{
wsrep_xid_init(&thd->transaction.xid_state.xid,
- wsrep_cluster_uuid(),
- thd->wsrep_trx_seqno);
+ &thd->wsrep_trx_meta.gtid.uuid,
+ thd->wsrep_trx_meta.gtid.seqno);
}
DBUG_PRINT("wsrep", ("replicating commit success"));
break;
case WSREP_BF_ABORT:
- DBUG_ASSERT(thd->wsrep_trx_seqno != WSREP_SEQNO_UNDEFINED);
+ DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno != WSREP_SEQNO_UNDEFINED);
case WSREP_TRX_FAIL:
WSREP_DEBUG("commit failed for reason: %d", rcode);
DBUG_PRINT("wsrep", ("replicating commit fail"));
@@ -505,7 +493,7 @@ wsrep_run_wsrep_commit(
if (thd->wsrep_conflict_state == MUST_ABORT) {
thd->wsrep_conflict_state= ABORTED;
- }
+ }
else
{
WSREP_DEBUG("conflict state: %d", thd->wsrep_conflict_state);
@@ -563,14 +551,15 @@ mysql_declare_plugin(wsrep)
&wsrep_storage_engine,
"wsrep",
"Codership Oy",
- "A pseudo storage engine to represent transactions in multi-master synchornous replication",
+ "A pseudo storage engine to represent transactions in multi-master "
+ "synchornous replication",
PLUGIN_LICENSE_GPL,
wsrep_hton_init, /* Plugin Init */
NULL, /* Plugin Deinit */
0x0100 /* 1.0 */,
NULL, /* status variables */
NULL, /* system variables */
- NULL, /* config options */
+ NULL, /* config options */
0, /* flags */
}
mysql_declare_plugin_end;