diff options
Diffstat (limited to 'sql/wsrep_applier.cc')
-rw-r--r-- | sql/wsrep_applier.cc | 84 |
1 files changed, 46 insertions, 38 deletions
diff --git a/sql/wsrep_applier.cc b/sql/wsrep_applier.cc index fd51dbf9439..4005de22e72 100644 --- a/sql/wsrep_applier.cc +++ b/sql/wsrep_applier.cc @@ -1,4 +1,4 @@ -/* Copyright (C) 2013-2015 Codership Oy <info@codership.com> +/* Copyright (C) 2013-2019 Codership Oy <info@codership.com> This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -24,7 +24,6 @@ #include "wsrep_trans_observer.h" #include "slave.h" // opt_log_slave_updates -#include "log_event.h" // class THD, EVENT_LEN_OFFSET, etc. #include "debug_sync.h" /* @@ -60,7 +59,6 @@ static Log_event* wsrep_read_log_event( } #include "transaction.h" // trans_commit(), trans_rollback() -#include "rpl_rli.h" // class Relay_log_info; void wsrep_set_apply_format(THD* thd, Format_description_log_event* ev) { @@ -84,7 +82,7 @@ wsrep_get_apply_format(THD* thd) return thd->wsrep_rgi->rli->relay_log.description_event_for_exec; } -void wsrep_apply_error::store(const THD* const thd) +void wsrep_store_error(const THD* const thd, wsrep::mutable_buffer& dst) { Diagnostics_area::Sql_condition_iterator it= thd->get_stmt_da()->sql_conditions(); @@ -92,27 +90,10 @@ void wsrep_apply_error::store(const THD* const thd) static size_t const max_len= 2*MAX_SLAVE_ERRMSG; // 2x so that we have enough - if (NULL == str_) - { - // this must be freeable by standard free() - str_= static_cast<char*>(malloc(max_len)); - if (NULL == str_) - { - WSREP_ERROR("Failed to allocate %zu bytes for error buffer.", max_len); - len_= 0; - return; - } - } - else - { - /* This is possible when we invoke rollback after failed applying. - * In this situation DA should not be reset yet and should contain - * all previous errors from applying and new ones from rollbacking, - * so we just overwrite is from scratch */ - } + dst.resize(max_len); - char* slider= str_; - const char* const buf_end= str_ + max_len - 1; // -1: leave space for \0 + char* slider= dst.data(); + const char* const buf_end= slider + max_len - 1; // -1: leave space for \0 for (cond= it++; cond && slider < buf_end; cond= it++) { @@ -123,12 +104,17 @@ void wsrep_apply_error::store(const THD* const thd) err_str, err_code); } - *slider= '\0'; - len_= slider - str_ + 1; // +1: add \0 + if (slider != dst.data()) + { + *slider= '\0'; + slider++; + } + + dst.resize(slider - dst.data()); - WSREP_DEBUG("Error buffer for thd %llu seqno %lld, %zu bytes: %s", + WSREP_DEBUG("Error buffer for thd %llu seqno %lld, %zu bytes: '%s'", thd->thread_id, (long long)wsrep_thd_trx_seqno(thd), - len_, str_ ? str_ : "(null)"); + dst.size(), dst.size() ? dst.data() : "(null)"); } int wsrep_apply_events(THD* thd, @@ -145,6 +131,12 @@ int wsrep_apply_events(THD* thd, if (!buf_len) WSREP_DEBUG("empty rbr buffer to apply: %lld", (long long) wsrep_thd_trx_seqno(thd)); + thd->variables.gtid_seq_no= 0; + if (wsrep_gtid_mode) + thd->variables.gtid_domain_id= wsrep_gtid_server.domain_id; + else + thd->variables.gtid_domain_id= global_system_variables.gtid_domain_id; + while (buf_len) { int exec_res; @@ -164,26 +156,42 @@ int wsrep_apply_events(THD* thd, case FORMAT_DESCRIPTION_EVENT: wsrep_set_apply_format(thd, (Format_description_log_event*)ev); continue; -#ifdef GTID_SUPPORT - case GTID_LOG_EVENT: - { - Gtid_log_event* gev= (Gtid_log_event*)ev; - if (gev->get_gno() == 0) + case GTID_EVENT: { - /* Skip GTID log event to make binlog to generate LTID on commit */ + Gtid_log_event *gtid_ev= (Gtid_log_event*)ev; + thd->variables.server_id= gtid_ev->server_id; + thd->variables.gtid_domain_id= gtid_ev->domain_id; + if ((gtid_ev->server_id == wsrep_gtid_server.server_id) && + (gtid_ev->domain_id == wsrep_gtid_server.domain_id)) + { + thd->variables.wsrep_gtid_seq_no= gtid_ev->seq_no; + } + else + { + thd->variables.gtid_seq_no= gtid_ev->seq_no; + } delete ev; - continue; } - } -#endif /* GTID_SUPPORT */ + continue; default: break; } + + if (!thd->variables.gtid_seq_no && wsrep_thd_is_toi(thd) && + (ev->get_type_code() == QUERY_EVENT)) + { + uint64 seqno= wsrep_gtid_server.seqno_inc(); + thd->wsrep_current_gtid_seqno= seqno; + if (mysql_bin_log.is_open() && wsrep_gtid_mode) + { + thd->variables.gtid_seq_no= seqno; + } + } /* Use the original server id for logging. */ thd->set_server_id(ev->server_id); thd->set_time(); // time the query - thd->transaction.start_time.reset(thd); + thd->transaction->start_time.reset(thd); thd->lex->current_select= 0; if (!ev->when) { |