diff options
Diffstat (limited to 'sql')
-rw-r--r-- | sql/handler.cc | 9 | ||||
-rw-r--r-- | sql/log.cc | 2 | ||||
-rw-r--r-- | sql/service_wsrep.cc | 10 | ||||
-rw-r--r-- | sql/sql_plugin_services.ic | 4 | ||||
-rw-r--r-- | sql/wsrep_applier.cc | 42 | ||||
-rw-r--r-- | sql/wsrep_applier.h | 37 | ||||
-rw-r--r-- | sql/wsrep_binlog.cc | 4 | ||||
-rw-r--r-- | sql/wsrep_client_service.cc | 1 | ||||
-rw-r--r-- | sql/wsrep_dummy.cc | 6 | ||||
-rw-r--r-- | sql/wsrep_high_priority_service.cc | 71 | ||||
-rw-r--r-- | sql/wsrep_high_priority_service.h | 5 | ||||
-rw-r--r-- | sql/wsrep_mysqld.cc | 18 | ||||
-rw-r--r-- | sql/wsrep_schema.cc | 14 | ||||
-rw-r--r-- | sql/wsrep_trans_observer.h | 4 |
14 files changed, 113 insertions, 114 deletions
diff --git a/sql/handler.cc b/sql/handler.cc index 94cffd69b75..5f2a1a573ba 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -1537,8 +1537,9 @@ int ha_commit_trans(THD *thd, bool all) #endif /* WITH_WSREP */ error= ha_commit_one_phase(thd, all); #ifdef WITH_WSREP - if (run_wsrep_hooks) - error= error || wsrep_after_commit(thd, all); + // Here in case of error we must return 2 for inconsistency + if (run_wsrep_hooks && !error) + error= wsrep_after_commit(thd, all) ? 2 : 0; #endif /* WITH_WSREP */ goto done; } @@ -1607,8 +1608,10 @@ int ha_commit_trans(THD *thd, bool all) error= commit_one_phase_2(thd, all, trans, is_real_trans) ? 2 : 0; #ifdef WITH_WSREP - if (run_wsrep_hooks && (error || (error = wsrep_after_commit(thd, all)))) + if (run_wsrep_hooks && + (error || (error = wsrep_after_commit(thd, all)))) { + error = 2; mysql_mutex_lock(&thd->LOCK_thd_data); if (wsrep_must_abort(thd)) { diff --git a/sql/log.cc b/sql/log.cc index d5d879a3409..4f51a9a9c17 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -7735,7 +7735,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) Release commit order and if leader, wait for prior commit to complete. This establishes total order for group leaders. */ - if (wsrep_ordered_commit(entry->thd, entry->all, wsrep_apply_error())) + if (wsrep_ordered_commit(entry->thd, entry->all)) { entry->thd->wakeup_subsequent_commits(1); return 1; diff --git a/sql/service_wsrep.cc b/sql/service_wsrep.cc index 8583897e064..c47ba9d9d37 100644 --- a/sql/service_wsrep.cc +++ b/sql/service_wsrep.cc @@ -270,3 +270,13 @@ extern "C" void wsrep_commit_ordered(THD *thd) thd->wsrep_cs().ordered_commit(); } } + +extern "C" my_bool wsrep_thd_has_ignored_error(const THD *thd) +{ + return thd->wsrep_has_ignored_error; +} + +extern "C" void wsrep_thd_set_ignored_error(THD *thd, my_bool val) +{ + thd->wsrep_has_ignored_error= val; +} diff --git a/sql/sql_plugin_services.ic b/sql/sql_plugin_services.ic index c7ecfcd482e..c3b0088c9bd 100644 --- a/sql/sql_plugin_services.ic +++ b/sql/sql_plugin_services.ic @@ -173,7 +173,9 @@ static struct wsrep_service_st wsrep_handler = { wsrep_get_sr_table_name, wsrep_get_debug, wsrep_commit_ordered, - wsrep_thd_is_applying + wsrep_thd_is_applying, + wsrep_thd_has_ignored_error, + wsrep_thd_set_ignored_error }; static struct thd_specifics_service_st thd_specifics_handler= diff --git a/sql/wsrep_applier.cc b/sql/wsrep_applier.cc index fd51dbf9439..1ab65df1ca3 100644 --- a/sql/wsrep_applier.cc +++ b/sql/wsrep_applier.cc @@ -1,4 +1,4 @@ -/* Copyright (C) 2013-2015 Codership Oy <info@codership.com> +/* Copyright (C) 2013-2019 Codership Oy <info@codership.com> This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -24,7 +24,6 @@ #include "wsrep_trans_observer.h" #include "slave.h" // opt_log_slave_updates -#include "log_event.h" // class THD, EVENT_LEN_OFFSET, etc. #include "debug_sync.h" /* @@ -60,7 +59,6 @@ static Log_event* wsrep_read_log_event( } #include "transaction.h" // trans_commit(), trans_rollback() -#include "rpl_rli.h" // class Relay_log_info; void wsrep_set_apply_format(THD* thd, Format_description_log_event* ev) { @@ -84,7 +82,7 @@ wsrep_get_apply_format(THD* thd) return thd->wsrep_rgi->rli->relay_log.description_event_for_exec; } -void wsrep_apply_error::store(const THD* const thd) +void wsrep_store_error(const THD* const thd, wsrep::mutable_buffer& dst) { Diagnostics_area::Sql_condition_iterator it= thd->get_stmt_da()->sql_conditions(); @@ -92,27 +90,10 @@ void wsrep_apply_error::store(const THD* const thd) static size_t const max_len= 2*MAX_SLAVE_ERRMSG; // 2x so that we have enough - if (NULL == str_) - { - // this must be freeable by standard free() - str_= static_cast<char*>(malloc(max_len)); - if (NULL == str_) - { - WSREP_ERROR("Failed to allocate %zu bytes for error buffer.", max_len); - len_= 0; - return; - } - } - else - { - /* This is possible when we invoke rollback after failed applying. - * In this situation DA should not be reset yet and should contain - * all previous errors from applying and new ones from rollbacking, - * so we just overwrite is from scratch */ - } + dst.resize(max_len); - char* slider= str_; - const char* const buf_end= str_ + max_len - 1; // -1: leave space for \0 + char* slider= dst.data(); + const char* const buf_end= slider + max_len - 1; // -1: leave space for \0 for (cond= it++; cond && slider < buf_end; cond= it++) { @@ -123,12 +104,17 @@ void wsrep_apply_error::store(const THD* const thd) err_str, err_code); } - *slider= '\0'; - len_= slider - str_ + 1; // +1: add \0 + if (slider != dst.data()) + { + *slider= '\0'; + slider++; + } + + dst.resize(slider - dst.data()); - WSREP_DEBUG("Error buffer for thd %llu seqno %lld, %zu bytes: %s", + WSREP_DEBUG("Error buffer for thd %llu seqno %lld, %zu bytes: '%s'", thd->thread_id, (long long)wsrep_thd_trx_seqno(thd), - len_, str_ ? str_ : "(null)"); + dst.size(), dst.size() ? dst.data() : "(null)"); } int wsrep_apply_events(THD* thd, diff --git a/sql/wsrep_applier.h b/sql/wsrep_applier.h index 70361987cc7..fefca306a70 100644 --- a/sql/wsrep_applier.h +++ b/sql/wsrep_applier.h @@ -1,4 +1,4 @@ -/* Copyright 2013-2015 Codership Oy <http://www.codership.com> +/* Copyright 2013-2019 Codership Oy <http://www.codership.com> This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -16,16 +16,15 @@ #ifndef WSREP_APPLIER_H #define WSREP_APPLIER_H -#include <my_config.h> - #include "sql_class.h" // THD class +#include "rpl_rli.h" // Relay_log_info +#include "log_event.h" // Format_description_log_event int wsrep_apply_events(THD* thd, Relay_log_info* rli, const void* events_buf, size_t buf_len); - /* Applier error codes, when nothing better is available. */ #define WSREP_RET_SUCCESS 0 // Success #define WSREP_ERR_GENERIC 1 // When in doubt (MySQL default error code) @@ -36,38 +35,10 @@ int wsrep_apply_events(THD* thd, #define WSREP_ERR_FAILED 6 // Operation failed for some internal reason #define WSREP_ERR_ABORTED 7 // Operation was aborted externally -class wsrep_apply_error -{ -public: - wsrep_apply_error() : str_(NULL), len_(0) {}; - ~wsrep_apply_error() { ::free(str_); } - /* stores the current THD error info from the diagnostic area. Works only - * once, subsequent invocations are ignored in order to preserve the original - * condition. */ - void store(const THD* thd); - const char* c_str() const { return str_; } - size_t length() const { return len_; } - bool is_null() const { return (c_str() == NULL && length() == 0); } - wsrep_buf_t get_buf() const - { - wsrep_buf_t ret= { c_str(), length() }; - return ret; - } -private: - char* str_; - size_t len_; -}; +void wsrep_store_error(const THD* thd, wsrep::mutable_buffer& buf); class Format_description_log_event; void wsrep_set_apply_format(THD*, Format_description_log_event*); Format_description_log_event* wsrep_get_apply_format(THD* thd); -int wsrep_apply(void* ctx, - uint32_t flags, - const wsrep_buf_t* buf, - const wsrep_trx_meta_t* meta, - wsrep_apply_error& err); - -wsrep_cb_status_t wsrep_unordered_cb(void* ctx, - const wsrep_buf_t* data); #endif /* WSREP_APPLIER_H */ diff --git a/sql/wsrep_binlog.cc b/sql/wsrep_binlog.cc index ecab4664d7b..cfa709b1055 100644 --- a/sql/wsrep_binlog.cc +++ b/sql/wsrep_binlog.cc @@ -417,7 +417,9 @@ void wsrep_register_for_group_commit(THD *thd) void wsrep_unregister_from_group_commit(THD *thd) { - DBUG_ASSERT(thd->wsrep_trx().state() == wsrep::transaction::s_ordered_commit); + DBUG_ASSERT(thd->wsrep_trx().state() == wsrep::transaction::s_ordered_commit|| + // ordered_commit() failure results in s_aborting state + thd->wsrep_trx().state() == wsrep::transaction::s_aborting); wait_for_commit *wfc= thd->wait_for_commit_ptr; if (wfc) diff --git a/sql/wsrep_client_service.cc b/sql/wsrep_client_service.cc index b182691c593..0fa10c1c9ea 100644 --- a/sql/wsrep_client_service.cc +++ b/sql/wsrep_client_service.cc @@ -15,7 +15,6 @@ #include "wsrep_client_service.h" #include "wsrep_high_priority_service.h" -#include "wsrep_applier.h" /* wsrep_apply_events() */ #include "wsrep_binlog.h" /* wsrep_dump_rbr_buf() */ #include "wsrep_schema.h" /* remove_fragments() */ #include "wsrep_thd.h" diff --git a/sql/wsrep_dummy.cc b/sql/wsrep_dummy.cc index 75ee9b04cdf..2ea434c092b 100644 --- a/sql/wsrep_dummy.cc +++ b/sql/wsrep_dummy.cc @@ -138,3 +138,9 @@ void wsrep_commit_ordered(THD* ) my_bool wsrep_thd_is_applying(const THD*) { return 0;} + +my_bool wsrep_thd_has_ignored_error(const THD*) +{ return 0;} + +void wsrep_thd_set_ignored_error(THD*, my_bool) +{ } diff --git a/sql/wsrep_high_priority_service.cc b/sql/wsrep_high_priority_service.cc index 68cf0d1877b..581ecfc8d34 100644 --- a/sql/wsrep_high_priority_service.cc +++ b/sql/wsrep_high_priority_service.cc @@ -119,6 +119,23 @@ static void wsrep_setup_uk_and_fk_checks(THD* thd) thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS; } +static int apply_events(THD* thd, + Relay_log_info* rli, + const wsrep::const_buffer& data, + wsrep::mutable_buffer& err) +{ + int const ret= wsrep_apply_events(thd, rli, data.data(), data.size()); + if (ret || wsrep_thd_has_ignored_error(thd)) + { + if (ret) + { + wsrep_store_error(thd, err); + } + wsrep_dump_rbr_buf_with_header(thd, data.data(), data.size()); + } + return ret; +} + /**************************************************************************** High priority service *****************************************************************************/ @@ -247,8 +264,8 @@ int Wsrep_high_priority_service::append_fragment_and_commit( common utility function to deal with commit. */ const bool do_binlog_commit= (opt_log_slave_updates && - wsrep_gtid_mode && - m_thd->variables.gtid_seq_no); + wsrep_gtid_mode && + m_thd->variables.gtid_seq_no); /* Write skip event into binlog if gtid_mode is on. This is to maintain gtid continuity. @@ -265,8 +282,7 @@ int Wsrep_high_priority_service::append_fragment_and_commit( } ret= ret || trans_commit(m_thd); - - m_thd->wsrep_cs().after_applying(); + ret= ret || (m_thd->wsrep_cs().after_applying(), 0); m_thd->mdl_context.release_transactional_locks(); thd_proc_info(m_thd, "wsrep applier committed"); @@ -335,7 +351,15 @@ int Wsrep_high_priority_service::rollback(const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta) { DBUG_ENTER("Wsrep_high_priority_service::rollback"); - m_thd->wsrep_cs().prepare_for_ordering(ws_handle, ws_meta, false); + if (ws_meta.ordered()) + { + m_thd->wsrep_cs().prepare_for_ordering(ws_handle, ws_meta, false); + } + else + { + assert(ws_meta == wsrep::ws_meta()); + assert(ws_handle == wsrep::ws_handle()); + } int ret= (trans_rollback_stmt(m_thd) || trans_rollback(m_thd)); m_thd->mdl_context.release_transactional_locks(); m_thd->mdl_context.release_explicit_locks(); @@ -344,7 +368,7 @@ int Wsrep_high_priority_service::rollback(const wsrep::ws_handle& ws_handle, int Wsrep_high_priority_service::apply_toi(const wsrep::ws_meta& ws_meta, const wsrep::const_buffer& data, - wsrep::mutable_buffer&) + wsrep::mutable_buffer& err) { DBUG_ENTER("Wsrep_high_priority_service::apply_toi"); THD* thd= m_thd; @@ -358,13 +382,8 @@ int Wsrep_high_priority_service::apply_toi(const wsrep::ws_meta& ws_meta, WSREP_DEBUG("Wsrep_high_priority_service::apply_toi: %lld", client_state.toi_meta().seqno().get()); - int ret= wsrep_apply_events(thd, m_rli, data.data(), data.size()); - if (ret != 0 || thd->wsrep_has_ignored_error) - { - wsrep_dump_rbr_buf_with_header(thd, data.data(), data.size()); - thd->wsrep_has_ignored_error= false; - /* todo: error voting */ - } + int ret= apply_events(thd, m_rli, data, err); + wsrep_thd_set_ignored_error(thd, false); trans_commit(thd); thd->close_temporary_tables(); @@ -435,6 +454,11 @@ int Wsrep_high_priority_service::log_dummy_write_set(const wsrep::ws_handle& ws_ DBUG_RETURN(ret); } +void Wsrep_high_priority_service::adopt_apply_error(wsrep::mutable_buffer& err) +{ + m_thd->wsrep_cs().adopt_apply_error(err); +} + void Wsrep_high_priority_service::debug_crash(const char* crash_point) { DBUG_ASSERT(m_thd == current_thd); @@ -466,7 +490,7 @@ Wsrep_applier_service::~Wsrep_applier_service() int Wsrep_applier_service::apply_write_set(const wsrep::ws_meta& ws_meta, const wsrep::const_buffer& data, - wsrep::mutable_buffer&) + wsrep::mutable_buffer& err) { DBUG_ENTER("Wsrep_applier_service::apply_write_set"); THD* thd= m_thd; @@ -492,13 +516,7 @@ int Wsrep_applier_service::apply_write_set(const wsrep::ws_meta& ws_meta, };); wsrep_setup_uk_and_fk_checks(thd); - - int ret= wsrep_apply_events(thd, m_rli, data.data(), data.size()); - - if (ret || thd->wsrep_has_ignored_error) - { - wsrep_dump_rbr_buf_with_header(thd, data.data(), data.size()); - } + int ret= apply_events(thd, m_rli, data, err); thd->close_temporary_tables(); if (!ret && !(ws_meta.flags() & wsrep::provider::flag::commit)) @@ -621,7 +639,7 @@ Wsrep_replayer_service::~Wsrep_replayer_service() int Wsrep_replayer_service::apply_write_set(const wsrep::ws_meta& ws_meta, const wsrep::const_buffer& data, - wsrep::mutable_buffer&) + wsrep::mutable_buffer& err) { DBUG_ENTER("Wsrep_replayer_service::apply_write_set"); THD* thd= m_thd; @@ -640,14 +658,7 @@ int Wsrep_replayer_service::apply_write_set(const wsrep::ws_meta& ws_meta, ws_meta, thd->wsrep_sr().fragments()); } - - ret= ret || wsrep_apply_events(thd, m_rli, data.data(), data.size()); - - if (ret || thd->wsrep_has_ignored_error) - { - wsrep_dump_rbr_buf_with_header(thd, data.data(), data.size()); - } - + ret= ret || apply_events(thd, m_rli, data, err); thd->close_temporary_tables(); if (!ret && !(ws_meta.flags() & wsrep::provider::flag::commit)) { diff --git a/sql/wsrep_high_priority_service.h b/sql/wsrep_high_priority_service.h index c8c5eb87f44..5657a2e82fc 100644 --- a/sql/wsrep_high_priority_service.h +++ b/sql/wsrep_high_priority_service.h @@ -17,7 +17,6 @@ #define WSREP_HIGH_PRIORITY_SERVICE_H #include "wsrep/high_priority_service.hpp" -#include "wsrep/client_state.hpp" #include "my_global.h" #include "sql_error.h" /* Diagnostics area */ #include "sql_class.h" /* rpl_group_info */ @@ -53,7 +52,7 @@ public: int log_dummy_write_set(const wsrep::ws_handle&, const wsrep::ws_meta&, wsrep::mutable_buffer&); - void adopt_apply_error(wsrep::mutable_buffer& err) {} + void adopt_apply_error(wsrep::mutable_buffer&); virtual bool check_exit_status() const = 0; void debug_crash(const char*); @@ -74,7 +73,7 @@ protected: my_hrtime_t user_time; longlong row_count_func; bool wsrep_applier; -} m_shadow; + } m_shadow; }; class Wsrep_applier_service : public Wsrep_high_priority_service diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index 6b64915ada2..ad4203490f2 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -281,7 +281,7 @@ static void wsrep_log_cb(wsrep::log::level level, const char *msg) sql_print_warning("WSREP: %s", msg); break; case wsrep::log::error: - sql_print_error("WSREP: %s", msg); + sql_print_error("WSREP: %s", msg); break; case wsrep::log::debug: if (wsrep_debug) sql_print_information ("[Debug] WSREP: %s", msg); @@ -1800,13 +1800,16 @@ static void wsrep_TOI_begin_failed(THD* thd, const wsrep_buf_t* /* const err */) if (wsrep_emulate_bin_log) wsrep_thd_binlog_trx_reset(thd); if (wsrep_write_dummy_event(thd, "TOI begin failed")) { goto fail; } wsrep::client_state& cs(thd->wsrep_cs()); - int const ret= cs.leave_toi_local(wsrep::mutable_buffer()); + std::string const err(wsrep::to_c_string(cs.current_error())); + wsrep::mutable_buffer err_buf; + err_buf.push_back(err); + int const ret= cs.leave_toi_local(err_buf); if (ret) { WSREP_ERROR("Leaving critical section for failed TOI failed: thd: %lld, " "schema: %s, SQL: %s, rcode: %d wsrep_error: %s", (long long)thd->real_id, thd->db.str, - thd->query(), ret, wsrep::to_c_string(cs.current_error())); + thd->query(), ret, err.c_str()); goto fail; } } @@ -1927,7 +1930,12 @@ static void wsrep_TOI_end(THD *thd) { if (wsrep_thd_is_local_toi(thd)) { wsrep_set_SE_checkpoint(client_state.toi_meta().gtid()); - int ret= client_state.leave_toi_local(wsrep::mutable_buffer()); + wsrep::mutable_buffer err; + if (thd->is_error() && !wsrep_must_ignore_error(thd)) + { + wsrep_store_error(thd, err); + } + int const ret= client_state.leave_toi_local(err); if (!ret) { WSREP_DEBUG("TO END: %lld", client_state.toi_meta().seqno().get()); @@ -2418,7 +2426,7 @@ int wsrep_must_ignore_error(THD* thd) const uint flags= sql_command_flags[thd->lex->sql_command]; DBUG_ASSERT(error); - DBUG_ASSERT(wsrep_thd_is_toi(thd) || wsrep_thd_is_applying(thd)); + DBUG_ASSERT(wsrep_thd_is_toi(thd)); if ((wsrep_ignore_apply_errors & WSREP_IGNORE_ERRORS_ON_DDL)) goto ignore_error; diff --git a/sql/wsrep_schema.cc b/sql/wsrep_schema.cc index 0fb0adca2bb..ee0fc8f12cc 100644 --- a/sql/wsrep_schema.cc +++ b/sql/wsrep_schema.cc @@ -1289,7 +1289,7 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd) goto out; } - while (true) + while (0 == error) { if ((error= Wsrep_schema_impl::next_record(frag_table)) == 0) { @@ -1344,19 +1344,23 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd) } applier->store_globals(); wsrep::mutable_buffer unused; - applier->apply_write_set(ws_meta, data, unused); - applier->after_apply(); + if ((ret= applier->apply_write_set(ws_meta, data, unused)) != 0) + { + WSREP_ERROR("SR trx recovery applying returned %d", ret); + } + else + { + applier->after_apply(); + } storage_service.store_globals(); } else if (error == HA_ERR_END_OF_FILE) { ret= 0; - break; } else { WSREP_ERROR("SR table scan returned error %d", error); - break; } } Wsrep_schema_impl::end_scan(frag_table); diff --git a/sql/wsrep_trans_observer.h b/sql/wsrep_trans_observer.h index 69756786dd2..118525bb908 100644 --- a/sql/wsrep_trans_observer.h +++ b/sql/wsrep_trans_observer.h @@ -292,9 +292,7 @@ static inline int wsrep_before_commit(THD* thd, bool all) Return zero on succes, non-zero on failure. */ -static inline int wsrep_ordered_commit(THD* thd, - bool all, - const wsrep_apply_error&) +static inline int wsrep_ordered_commit(THD* thd, bool all) { DBUG_ENTER("wsrep_ordered_commit"); WSREP_DEBUG("wsrep_ordered_commit: %d", wsrep_is_real(thd, all)); |