summaryrefslogtreecommitdiff
path: root/sql/wsrep_applier.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/wsrep_applier.cc')
-rw-r--r--sql/wsrep_applier.cc84
1 files changed, 46 insertions, 38 deletions
diff --git a/sql/wsrep_applier.cc b/sql/wsrep_applier.cc
index fd51dbf9439..4005de22e72 100644
--- a/sql/wsrep_applier.cc
+++ b/sql/wsrep_applier.cc
@@ -1,4 +1,4 @@
-/* Copyright (C) 2013-2015 Codership Oy <info@codership.com>
+/* Copyright (C) 2013-2019 Codership Oy <info@codership.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -24,7 +24,6 @@
#include "wsrep_trans_observer.h"
#include "slave.h" // opt_log_slave_updates
-#include "log_event.h" // class THD, EVENT_LEN_OFFSET, etc.
#include "debug_sync.h"
/*
@@ -60,7 +59,6 @@ static Log_event* wsrep_read_log_event(
}
#include "transaction.h" // trans_commit(), trans_rollback()
-#include "rpl_rli.h" // class Relay_log_info;
void wsrep_set_apply_format(THD* thd, Format_description_log_event* ev)
{
@@ -84,7 +82,7 @@ wsrep_get_apply_format(THD* thd)
return thd->wsrep_rgi->rli->relay_log.description_event_for_exec;
}
-void wsrep_apply_error::store(const THD* const thd)
+void wsrep_store_error(const THD* const thd, wsrep::mutable_buffer& dst)
{
Diagnostics_area::Sql_condition_iterator it=
thd->get_stmt_da()->sql_conditions();
@@ -92,27 +90,10 @@ void wsrep_apply_error::store(const THD* const thd)
static size_t const max_len= 2*MAX_SLAVE_ERRMSG; // 2x so that we have enough
- if (NULL == str_)
- {
- // this must be freeable by standard free()
- str_= static_cast<char*>(malloc(max_len));
- if (NULL == str_)
- {
- WSREP_ERROR("Failed to allocate %zu bytes for error buffer.", max_len);
- len_= 0;
- return;
- }
- }
- else
- {
- /* This is possible when we invoke rollback after failed applying.
- * In this situation DA should not be reset yet and should contain
- * all previous errors from applying and new ones from rollbacking,
- * so we just overwrite is from scratch */
- }
+ dst.resize(max_len);
- char* slider= str_;
- const char* const buf_end= str_ + max_len - 1; // -1: leave space for \0
+ char* slider= dst.data();
+ const char* const buf_end= slider + max_len - 1; // -1: leave space for \0
for (cond= it++; cond && slider < buf_end; cond= it++)
{
@@ -123,12 +104,17 @@ void wsrep_apply_error::store(const THD* const thd)
err_str, err_code);
}
- *slider= '\0';
- len_= slider - str_ + 1; // +1: add \0
+ if (slider != dst.data())
+ {
+ *slider= '\0';
+ slider++;
+ }
+
+ dst.resize(slider - dst.data());
- WSREP_DEBUG("Error buffer for thd %llu seqno %lld, %zu bytes: %s",
+ WSREP_DEBUG("Error buffer for thd %llu seqno %lld, %zu bytes: '%s'",
thd->thread_id, (long long)wsrep_thd_trx_seqno(thd),
- len_, str_ ? str_ : "(null)");
+ dst.size(), dst.size() ? dst.data() : "(null)");
}
int wsrep_apply_events(THD* thd,
@@ -145,6 +131,12 @@ int wsrep_apply_events(THD* thd,
if (!buf_len) WSREP_DEBUG("empty rbr buffer to apply: %lld",
(long long) wsrep_thd_trx_seqno(thd));
+ thd->variables.gtid_seq_no= 0;
+ if (wsrep_gtid_mode)
+ thd->variables.gtid_domain_id= wsrep_gtid_server.domain_id;
+ else
+ thd->variables.gtid_domain_id= global_system_variables.gtid_domain_id;
+
while (buf_len)
{
int exec_res;
@@ -164,26 +156,42 @@ int wsrep_apply_events(THD* thd,
case FORMAT_DESCRIPTION_EVENT:
wsrep_set_apply_format(thd, (Format_description_log_event*)ev);
continue;
-#ifdef GTID_SUPPORT
- case GTID_LOG_EVENT:
- {
- Gtid_log_event* gev= (Gtid_log_event*)ev;
- if (gev->get_gno() == 0)
+ case GTID_EVENT:
{
- /* Skip GTID log event to make binlog to generate LTID on commit */
+ Gtid_log_event *gtid_ev= (Gtid_log_event*)ev;
+ thd->variables.server_id= gtid_ev->server_id;
+ thd->variables.gtid_domain_id= gtid_ev->domain_id;
+ if ((gtid_ev->server_id == wsrep_gtid_server.server_id) &&
+ (gtid_ev->domain_id == wsrep_gtid_server.domain_id))
+ {
+ thd->variables.wsrep_gtid_seq_no= gtid_ev->seq_no;
+ }
+ else
+ {
+ thd->variables.gtid_seq_no= gtid_ev->seq_no;
+ }
delete ev;
- continue;
}
- }
-#endif /* GTID_SUPPORT */
+ continue;
default:
break;
}
+
+ if (!thd->variables.gtid_seq_no && wsrep_thd_is_toi(thd) &&
+ (ev->get_type_code() == QUERY_EVENT))
+ {
+ uint64 seqno= wsrep_gtid_server.seqno_inc();
+ thd->wsrep_current_gtid_seqno= seqno;
+ if (mysql_bin_log.is_open() && wsrep_gtid_mode)
+ {
+ thd->variables.gtid_seq_no= seqno;
+ }
+ }
/* Use the original server id for logging. */
thd->set_server_id(ev->server_id);
thd->set_time(); // time the query
- thd->transaction.start_time.reset(thd);
+ thd->transaction->start_time.reset(thd);
thd->lex->current_select= 0;
if (!ev->when)
{