summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/handler.cc9
-rw-r--r--sql/log.cc2
-rw-r--r--sql/service_wsrep.cc10
-rw-r--r--sql/sql_plugin_services.ic4
-rw-r--r--sql/wsrep_applier.cc42
-rw-r--r--sql/wsrep_applier.h37
-rw-r--r--sql/wsrep_binlog.cc4
-rw-r--r--sql/wsrep_client_service.cc1
-rw-r--r--sql/wsrep_dummy.cc6
-rw-r--r--sql/wsrep_high_priority_service.cc71
-rw-r--r--sql/wsrep_high_priority_service.h5
-rw-r--r--sql/wsrep_mysqld.cc18
-rw-r--r--sql/wsrep_schema.cc14
-rw-r--r--sql/wsrep_trans_observer.h4
14 files changed, 113 insertions, 114 deletions
diff --git a/sql/handler.cc b/sql/handler.cc
index 94cffd69b75..5f2a1a573ba 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -1537,8 +1537,9 @@ int ha_commit_trans(THD *thd, bool all)
#endif /* WITH_WSREP */
error= ha_commit_one_phase(thd, all);
#ifdef WITH_WSREP
- if (run_wsrep_hooks)
- error= error || wsrep_after_commit(thd, all);
+ // Here in case of error we must return 2 for inconsistency
+ if (run_wsrep_hooks && !error)
+ error= wsrep_after_commit(thd, all) ? 2 : 0;
#endif /* WITH_WSREP */
goto done;
}
@@ -1607,8 +1608,10 @@ int ha_commit_trans(THD *thd, bool all)
error= commit_one_phase_2(thd, all, trans, is_real_trans) ? 2 : 0;
#ifdef WITH_WSREP
- if (run_wsrep_hooks && (error || (error = wsrep_after_commit(thd, all))))
+ if (run_wsrep_hooks &&
+ (error || (error = wsrep_after_commit(thd, all))))
{
+ error = 2;
mysql_mutex_lock(&thd->LOCK_thd_data);
if (wsrep_must_abort(thd))
{
diff --git a/sql/log.cc b/sql/log.cc
index d5d879a3409..4f51a9a9c17 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -7735,7 +7735,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
Release commit order and if leader, wait for prior commit to
complete. This establishes total order for group leaders.
*/
- if (wsrep_ordered_commit(entry->thd, entry->all, wsrep_apply_error()))
+ if (wsrep_ordered_commit(entry->thd, entry->all))
{
entry->thd->wakeup_subsequent_commits(1);
return 1;
diff --git a/sql/service_wsrep.cc b/sql/service_wsrep.cc
index 8583897e064..c47ba9d9d37 100644
--- a/sql/service_wsrep.cc
+++ b/sql/service_wsrep.cc
@@ -270,3 +270,13 @@ extern "C" void wsrep_commit_ordered(THD *thd)
thd->wsrep_cs().ordered_commit();
}
}
+
+extern "C" my_bool wsrep_thd_has_ignored_error(const THD *thd)
+{
+ return thd->wsrep_has_ignored_error;
+}
+
+extern "C" void wsrep_thd_set_ignored_error(THD *thd, my_bool val)
+{
+ thd->wsrep_has_ignored_error= val;
+}
diff --git a/sql/sql_plugin_services.ic b/sql/sql_plugin_services.ic
index c7ecfcd482e..c3b0088c9bd 100644
--- a/sql/sql_plugin_services.ic
+++ b/sql/sql_plugin_services.ic
@@ -173,7 +173,9 @@ static struct wsrep_service_st wsrep_handler = {
wsrep_get_sr_table_name,
wsrep_get_debug,
wsrep_commit_ordered,
- wsrep_thd_is_applying
+ wsrep_thd_is_applying,
+ wsrep_thd_has_ignored_error,
+ wsrep_thd_set_ignored_error
};
static struct thd_specifics_service_st thd_specifics_handler=
diff --git a/sql/wsrep_applier.cc b/sql/wsrep_applier.cc
index fd51dbf9439..1ab65df1ca3 100644
--- a/sql/wsrep_applier.cc
+++ b/sql/wsrep_applier.cc
@@ -1,4 +1,4 @@
-/* Copyright (C) 2013-2015 Codership Oy <info@codership.com>
+/* Copyright (C) 2013-2019 Codership Oy <info@codership.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -24,7 +24,6 @@
#include "wsrep_trans_observer.h"
#include "slave.h" // opt_log_slave_updates
-#include "log_event.h" // class THD, EVENT_LEN_OFFSET, etc.
#include "debug_sync.h"
/*
@@ -60,7 +59,6 @@ static Log_event* wsrep_read_log_event(
}
#include "transaction.h" // trans_commit(), trans_rollback()
-#include "rpl_rli.h" // class Relay_log_info;
void wsrep_set_apply_format(THD* thd, Format_description_log_event* ev)
{
@@ -84,7 +82,7 @@ wsrep_get_apply_format(THD* thd)
return thd->wsrep_rgi->rli->relay_log.description_event_for_exec;
}
-void wsrep_apply_error::store(const THD* const thd)
+void wsrep_store_error(const THD* const thd, wsrep::mutable_buffer& dst)
{
Diagnostics_area::Sql_condition_iterator it=
thd->get_stmt_da()->sql_conditions();
@@ -92,27 +90,10 @@ void wsrep_apply_error::store(const THD* const thd)
static size_t const max_len= 2*MAX_SLAVE_ERRMSG; // 2x so that we have enough
- if (NULL == str_)
- {
- // this must be freeable by standard free()
- str_= static_cast<char*>(malloc(max_len));
- if (NULL == str_)
- {
- WSREP_ERROR("Failed to allocate %zu bytes for error buffer.", max_len);
- len_= 0;
- return;
- }
- }
- else
- {
- /* This is possible when we invoke rollback after failed applying.
- * In this situation DA should not be reset yet and should contain
- * all previous errors from applying and new ones from rollbacking,
- * so we just overwrite is from scratch */
- }
+ dst.resize(max_len);
- char* slider= str_;
- const char* const buf_end= str_ + max_len - 1; // -1: leave space for \0
+ char* slider= dst.data();
+ const char* const buf_end= slider + max_len - 1; // -1: leave space for \0
for (cond= it++; cond && slider < buf_end; cond= it++)
{
@@ -123,12 +104,17 @@ void wsrep_apply_error::store(const THD* const thd)
err_str, err_code);
}
- *slider= '\0';
- len_= slider - str_ + 1; // +1: add \0
+ if (slider != dst.data())
+ {
+ *slider= '\0';
+ slider++;
+ }
+
+ dst.resize(slider - dst.data());
- WSREP_DEBUG("Error buffer for thd %llu seqno %lld, %zu bytes: %s",
+ WSREP_DEBUG("Error buffer for thd %llu seqno %lld, %zu bytes: '%s'",
thd->thread_id, (long long)wsrep_thd_trx_seqno(thd),
- len_, str_ ? str_ : "(null)");
+ dst.size(), dst.size() ? dst.data() : "(null)");
}
int wsrep_apply_events(THD* thd,
diff --git a/sql/wsrep_applier.h b/sql/wsrep_applier.h
index 70361987cc7..fefca306a70 100644
--- a/sql/wsrep_applier.h
+++ b/sql/wsrep_applier.h
@@ -1,4 +1,4 @@
-/* Copyright 2013-2015 Codership Oy <http://www.codership.com>
+/* Copyright 2013-2019 Codership Oy <http://www.codership.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -16,16 +16,15 @@
#ifndef WSREP_APPLIER_H
#define WSREP_APPLIER_H
-#include <my_config.h>
-
#include "sql_class.h" // THD class
+#include "rpl_rli.h" // Relay_log_info
+#include "log_event.h" // Format_description_log_event
int wsrep_apply_events(THD* thd,
Relay_log_info* rli,
const void* events_buf,
size_t buf_len);
-
/* Applier error codes, when nothing better is available. */
#define WSREP_RET_SUCCESS 0 // Success
#define WSREP_ERR_GENERIC 1 // When in doubt (MySQL default error code)
@@ -36,38 +35,10 @@ int wsrep_apply_events(THD* thd,
#define WSREP_ERR_FAILED 6 // Operation failed for some internal reason
#define WSREP_ERR_ABORTED 7 // Operation was aborted externally
-class wsrep_apply_error
-{
-public:
- wsrep_apply_error() : str_(NULL), len_(0) {};
- ~wsrep_apply_error() { ::free(str_); }
- /* stores the current THD error info from the diagnostic area. Works only
- * once, subsequent invocations are ignored in order to preserve the original
- * condition. */
- void store(const THD* thd);
- const char* c_str() const { return str_; }
- size_t length() const { return len_; }
- bool is_null() const { return (c_str() == NULL && length() == 0); }
- wsrep_buf_t get_buf() const
- {
- wsrep_buf_t ret= { c_str(), length() };
- return ret;
- }
-private:
- char* str_;
- size_t len_;
-};
+void wsrep_store_error(const THD* thd, wsrep::mutable_buffer& buf);
class Format_description_log_event;
void wsrep_set_apply_format(THD*, Format_description_log_event*);
Format_description_log_event* wsrep_get_apply_format(THD* thd);
-int wsrep_apply(void* ctx,
- uint32_t flags,
- const wsrep_buf_t* buf,
- const wsrep_trx_meta_t* meta,
- wsrep_apply_error& err);
-
-wsrep_cb_status_t wsrep_unordered_cb(void* ctx,
- const wsrep_buf_t* data);
#endif /* WSREP_APPLIER_H */
diff --git a/sql/wsrep_binlog.cc b/sql/wsrep_binlog.cc
index ecab4664d7b..cfa709b1055 100644
--- a/sql/wsrep_binlog.cc
+++ b/sql/wsrep_binlog.cc
@@ -417,7 +417,9 @@ void wsrep_register_for_group_commit(THD *thd)
void wsrep_unregister_from_group_commit(THD *thd)
{
- DBUG_ASSERT(thd->wsrep_trx().state() == wsrep::transaction::s_ordered_commit);
+ DBUG_ASSERT(thd->wsrep_trx().state() == wsrep::transaction::s_ordered_commit||
+ // ordered_commit() failure results in s_aborting state
+ thd->wsrep_trx().state() == wsrep::transaction::s_aborting);
wait_for_commit *wfc= thd->wait_for_commit_ptr;
if (wfc)
diff --git a/sql/wsrep_client_service.cc b/sql/wsrep_client_service.cc
index b182691c593..0fa10c1c9ea 100644
--- a/sql/wsrep_client_service.cc
+++ b/sql/wsrep_client_service.cc
@@ -15,7 +15,6 @@
#include "wsrep_client_service.h"
#include "wsrep_high_priority_service.h"
-#include "wsrep_applier.h" /* wsrep_apply_events() */
#include "wsrep_binlog.h" /* wsrep_dump_rbr_buf() */
#include "wsrep_schema.h" /* remove_fragments() */
#include "wsrep_thd.h"
diff --git a/sql/wsrep_dummy.cc b/sql/wsrep_dummy.cc
index 75ee9b04cdf..2ea434c092b 100644
--- a/sql/wsrep_dummy.cc
+++ b/sql/wsrep_dummy.cc
@@ -138,3 +138,9 @@ void wsrep_commit_ordered(THD* )
my_bool wsrep_thd_is_applying(const THD*)
{ return 0;}
+
+my_bool wsrep_thd_has_ignored_error(const THD*)
+{ return 0;}
+
+void wsrep_thd_set_ignored_error(THD*, my_bool)
+{ }
diff --git a/sql/wsrep_high_priority_service.cc b/sql/wsrep_high_priority_service.cc
index 68cf0d1877b..581ecfc8d34 100644
--- a/sql/wsrep_high_priority_service.cc
+++ b/sql/wsrep_high_priority_service.cc
@@ -119,6 +119,23 @@ static void wsrep_setup_uk_and_fk_checks(THD* thd)
thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
}
+static int apply_events(THD* thd,
+ Relay_log_info* rli,
+ const wsrep::const_buffer& data,
+ wsrep::mutable_buffer& err)
+{
+ int const ret= wsrep_apply_events(thd, rli, data.data(), data.size());
+ if (ret || wsrep_thd_has_ignored_error(thd))
+ {
+ if (ret)
+ {
+ wsrep_store_error(thd, err);
+ }
+ wsrep_dump_rbr_buf_with_header(thd, data.data(), data.size());
+ }
+ return ret;
+}
+
/****************************************************************************
High priority service
*****************************************************************************/
@@ -247,8 +264,8 @@ int Wsrep_high_priority_service::append_fragment_and_commit(
common utility function to deal with commit.
*/
const bool do_binlog_commit= (opt_log_slave_updates &&
- wsrep_gtid_mode &&
- m_thd->variables.gtid_seq_no);
+ wsrep_gtid_mode &&
+ m_thd->variables.gtid_seq_no);
/*
Write skip event into binlog if gtid_mode is on. This is to
maintain gtid continuity.
@@ -265,8 +282,7 @@ int Wsrep_high_priority_service::append_fragment_and_commit(
}
ret= ret || trans_commit(m_thd);
-
- m_thd->wsrep_cs().after_applying();
+ ret= ret || (m_thd->wsrep_cs().after_applying(), 0);
m_thd->mdl_context.release_transactional_locks();
thd_proc_info(m_thd, "wsrep applier committed");
@@ -335,7 +351,15 @@ int Wsrep_high_priority_service::rollback(const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta)
{
DBUG_ENTER("Wsrep_high_priority_service::rollback");
- m_thd->wsrep_cs().prepare_for_ordering(ws_handle, ws_meta, false);
+ if (ws_meta.ordered())
+ {
+ m_thd->wsrep_cs().prepare_for_ordering(ws_handle, ws_meta, false);
+ }
+ else
+ {
+ assert(ws_meta == wsrep::ws_meta());
+ assert(ws_handle == wsrep::ws_handle());
+ }
int ret= (trans_rollback_stmt(m_thd) || trans_rollback(m_thd));
m_thd->mdl_context.release_transactional_locks();
m_thd->mdl_context.release_explicit_locks();
@@ -344,7 +368,7 @@ int Wsrep_high_priority_service::rollback(const wsrep::ws_handle& ws_handle,
int Wsrep_high_priority_service::apply_toi(const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data,
- wsrep::mutable_buffer&)
+ wsrep::mutable_buffer& err)
{
DBUG_ENTER("Wsrep_high_priority_service::apply_toi");
THD* thd= m_thd;
@@ -358,13 +382,8 @@ int Wsrep_high_priority_service::apply_toi(const wsrep::ws_meta& ws_meta,
WSREP_DEBUG("Wsrep_high_priority_service::apply_toi: %lld",
client_state.toi_meta().seqno().get());
- int ret= wsrep_apply_events(thd, m_rli, data.data(), data.size());
- if (ret != 0 || thd->wsrep_has_ignored_error)
- {
- wsrep_dump_rbr_buf_with_header(thd, data.data(), data.size());
- thd->wsrep_has_ignored_error= false;
- /* todo: error voting */
- }
+ int ret= apply_events(thd, m_rli, data, err);
+ wsrep_thd_set_ignored_error(thd, false);
trans_commit(thd);
thd->close_temporary_tables();
@@ -435,6 +454,11 @@ int Wsrep_high_priority_service::log_dummy_write_set(const wsrep::ws_handle& ws_
DBUG_RETURN(ret);
}
+void Wsrep_high_priority_service::adopt_apply_error(wsrep::mutable_buffer& err)
+{
+ m_thd->wsrep_cs().adopt_apply_error(err);
+}
+
void Wsrep_high_priority_service::debug_crash(const char* crash_point)
{
DBUG_ASSERT(m_thd == current_thd);
@@ -466,7 +490,7 @@ Wsrep_applier_service::~Wsrep_applier_service()
int Wsrep_applier_service::apply_write_set(const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data,
- wsrep::mutable_buffer&)
+ wsrep::mutable_buffer& err)
{
DBUG_ENTER("Wsrep_applier_service::apply_write_set");
THD* thd= m_thd;
@@ -492,13 +516,7 @@ int Wsrep_applier_service::apply_write_set(const wsrep::ws_meta& ws_meta,
};);
wsrep_setup_uk_and_fk_checks(thd);
-
- int ret= wsrep_apply_events(thd, m_rli, data.data(), data.size());
-
- if (ret || thd->wsrep_has_ignored_error)
- {
- wsrep_dump_rbr_buf_with_header(thd, data.data(), data.size());
- }
+ int ret= apply_events(thd, m_rli, data, err);
thd->close_temporary_tables();
if (!ret && !(ws_meta.flags() & wsrep::provider::flag::commit))
@@ -621,7 +639,7 @@ Wsrep_replayer_service::~Wsrep_replayer_service()
int Wsrep_replayer_service::apply_write_set(const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data,
- wsrep::mutable_buffer&)
+ wsrep::mutable_buffer& err)
{
DBUG_ENTER("Wsrep_replayer_service::apply_write_set");
THD* thd= m_thd;
@@ -640,14 +658,7 @@ int Wsrep_replayer_service::apply_write_set(const wsrep::ws_meta& ws_meta,
ws_meta,
thd->wsrep_sr().fragments());
}
-
- ret= ret || wsrep_apply_events(thd, m_rli, data.data(), data.size());
-
- if (ret || thd->wsrep_has_ignored_error)
- {
- wsrep_dump_rbr_buf_with_header(thd, data.data(), data.size());
- }
-
+ ret= ret || apply_events(thd, m_rli, data, err);
thd->close_temporary_tables();
if (!ret && !(ws_meta.flags() & wsrep::provider::flag::commit))
{
diff --git a/sql/wsrep_high_priority_service.h b/sql/wsrep_high_priority_service.h
index c8c5eb87f44..5657a2e82fc 100644
--- a/sql/wsrep_high_priority_service.h
+++ b/sql/wsrep_high_priority_service.h
@@ -17,7 +17,6 @@
#define WSREP_HIGH_PRIORITY_SERVICE_H
#include "wsrep/high_priority_service.hpp"
-#include "wsrep/client_state.hpp"
#include "my_global.h"
#include "sql_error.h" /* Diagnostics area */
#include "sql_class.h" /* rpl_group_info */
@@ -53,7 +52,7 @@ public:
int log_dummy_write_set(const wsrep::ws_handle&,
const wsrep::ws_meta&,
wsrep::mutable_buffer&);
- void adopt_apply_error(wsrep::mutable_buffer& err) {}
+ void adopt_apply_error(wsrep::mutable_buffer&);
virtual bool check_exit_status() const = 0;
void debug_crash(const char*);
@@ -74,7 +73,7 @@ protected:
my_hrtime_t user_time;
longlong row_count_func;
bool wsrep_applier;
-} m_shadow;
+ } m_shadow;
};
class Wsrep_applier_service : public Wsrep_high_priority_service
diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc
index 6b64915ada2..ad4203490f2 100644
--- a/sql/wsrep_mysqld.cc
+++ b/sql/wsrep_mysqld.cc
@@ -281,7 +281,7 @@ static void wsrep_log_cb(wsrep::log::level level, const char *msg)
sql_print_warning("WSREP: %s", msg);
break;
case wsrep::log::error:
- sql_print_error("WSREP: %s", msg);
+ sql_print_error("WSREP: %s", msg);
break;
case wsrep::log::debug:
if (wsrep_debug) sql_print_information ("[Debug] WSREP: %s", msg);
@@ -1800,13 +1800,16 @@ static void wsrep_TOI_begin_failed(THD* thd, const wsrep_buf_t* /* const err */)
if (wsrep_emulate_bin_log) wsrep_thd_binlog_trx_reset(thd);
if (wsrep_write_dummy_event(thd, "TOI begin failed")) { goto fail; }
wsrep::client_state& cs(thd->wsrep_cs());
- int const ret= cs.leave_toi_local(wsrep::mutable_buffer());
+ std::string const err(wsrep::to_c_string(cs.current_error()));
+ wsrep::mutable_buffer err_buf;
+ err_buf.push_back(err);
+ int const ret= cs.leave_toi_local(err_buf);
if (ret)
{
WSREP_ERROR("Leaving critical section for failed TOI failed: thd: %lld, "
"schema: %s, SQL: %s, rcode: %d wsrep_error: %s",
(long long)thd->real_id, thd->db.str,
- thd->query(), ret, wsrep::to_c_string(cs.current_error()));
+ thd->query(), ret, err.c_str());
goto fail;
}
}
@@ -1927,7 +1930,12 @@ static void wsrep_TOI_end(THD *thd) {
if (wsrep_thd_is_local_toi(thd))
{
wsrep_set_SE_checkpoint(client_state.toi_meta().gtid());
- int ret= client_state.leave_toi_local(wsrep::mutable_buffer());
+ wsrep::mutable_buffer err;
+ if (thd->is_error() && !wsrep_must_ignore_error(thd))
+ {
+ wsrep_store_error(thd, err);
+ }
+ int const ret= client_state.leave_toi_local(err);
if (!ret)
{
WSREP_DEBUG("TO END: %lld", client_state.toi_meta().seqno().get());
@@ -2418,7 +2426,7 @@ int wsrep_must_ignore_error(THD* thd)
const uint flags= sql_command_flags[thd->lex->sql_command];
DBUG_ASSERT(error);
- DBUG_ASSERT(wsrep_thd_is_toi(thd) || wsrep_thd_is_applying(thd));
+ DBUG_ASSERT(wsrep_thd_is_toi(thd));
if ((wsrep_ignore_apply_errors & WSREP_IGNORE_ERRORS_ON_DDL))
goto ignore_error;
diff --git a/sql/wsrep_schema.cc b/sql/wsrep_schema.cc
index 0fb0adca2bb..ee0fc8f12cc 100644
--- a/sql/wsrep_schema.cc
+++ b/sql/wsrep_schema.cc
@@ -1289,7 +1289,7 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
goto out;
}
- while (true)
+ while (0 == error)
{
if ((error= Wsrep_schema_impl::next_record(frag_table)) == 0)
{
@@ -1344,19 +1344,23 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
}
applier->store_globals();
wsrep::mutable_buffer unused;
- applier->apply_write_set(ws_meta, data, unused);
- applier->after_apply();
+ if ((ret= applier->apply_write_set(ws_meta, data, unused)) != 0)
+ {
+ WSREP_ERROR("SR trx recovery applying returned %d", ret);
+ }
+ else
+ {
+ applier->after_apply();
+ }
storage_service.store_globals();
}
else if (error == HA_ERR_END_OF_FILE)
{
ret= 0;
- break;
}
else
{
WSREP_ERROR("SR table scan returned error %d", error);
- break;
}
}
Wsrep_schema_impl::end_scan(frag_table);
diff --git a/sql/wsrep_trans_observer.h b/sql/wsrep_trans_observer.h
index 69756786dd2..118525bb908 100644
--- a/sql/wsrep_trans_observer.h
+++ b/sql/wsrep_trans_observer.h
@@ -292,9 +292,7 @@ static inline int wsrep_before_commit(THD* thd, bool all)
Return zero on succes, non-zero on failure.
*/
-static inline int wsrep_ordered_commit(THD* thd,
- bool all,
- const wsrep_apply_error&)
+static inline int wsrep_ordered_commit(THD* thd, bool all)
{
DBUG_ENTER("wsrep_ordered_commit");
WSREP_DEBUG("wsrep_ordered_commit: %d", wsrep_is_real(thd, all));