summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrei Elkin <andrei.elkin@mariadb.com>2019-05-10 23:26:56 +0300
committerAndrei Elkin <andrei.elkin@mariadb.com>2019-05-15 19:06:24 +0300
commit3ae642bc77dafa0984f7541ffc19fd5dbb8d15fc (patch)
treef352c50ece80033389dabc120609e7ba32ed2a6c
parent0de38e461f8864a916837f4e9808ffbd33d9b741 (diff)
downloadmariadb-git-3ae642bc77dafa0984f7541ffc19fd5dbb8d15fc.tar.gz
MDEV-7974
Almost ultimate merge with the XA refactoring (to cover is_binlogged). todo: test out the master side; etc.
-rw-r--r--sql/handler.h10
-rw-r--r--sql/log.cc86
-rw-r--r--sql/log_event.cc151
-rw-r--r--sql/log_event.h119
-rw-r--r--sql/transaction.cc121
-rw-r--r--sql/transaction.h1
-rw-r--r--sql/xa.cc296
-rw-r--r--sql/xa.h26
8 files changed, 491 insertions, 319 deletions
diff --git a/sql/handler.h b/sql/handler.h
index 16cc7c70c0f..dce2a8a2c88 100644
--- a/sql/handler.h
+++ b/sql/handler.h
@@ -831,16 +831,6 @@ struct xid_t {
long gtrid_length;
long bqual_length;
char data[XIDDATASIZE]; // not \0-terminated !
- /*
- The size of the string containing serialized Xid representation
- is computed as a sum of
- eight as the number of formatting symbols (X'',X'',)
- plus 2 x XIDDATASIZE (2 due to hex format),
- plus space for decimal digits of XID::formatID,
- plus one for 0x0.
- */
- static const uint ser_buf_size=
- 8 + 2 * XIDDATASIZE + 4 * sizeof(long) + 1;
xid_t() {} /* Remove gcc warning */
bool eq(struct xid_t *xid)
diff --git a/sql/log.cc b/sql/log.cc
index d6cee2bc39e..84dba2a41f2 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -91,6 +91,9 @@ static bool binlog_savepoint_rollback_can_release_mdl(handlerton *hton,
static int binlog_commit(handlerton *hton, THD *thd, bool all);
static int binlog_rollback(handlerton *hton, THD *thd, bool all);
static int binlog_prepare(handlerton *hton, THD *thd, bool all);
+static int binlog_xa_recover(handlerton *hton, XID *xid_list, uint len);
+static int binlog_commit_by_xid(handlerton *hton, XID *xid);
+static int binlog_rollback_by_xid(handlerton *hton, XID *xid);
static int binlog_start_consistent_snapshot(handlerton *hton, THD *thd);
static int binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr,
Log_event *end_ev, bool all, bool using_stmt,
@@ -1695,6 +1698,9 @@ int binlog_init(void *p)
binlog_hton->commit= binlog_commit;
binlog_hton->rollback= binlog_rollback;
binlog_hton->prepare= binlog_prepare;
+ binlog_hton->recover= binlog_xa_recover;
+ binlog_hton->commit_by_xid= binlog_commit_by_xid;
+ binlog_hton->rollback_by_xid= binlog_rollback_by_xid;
binlog_hton->start_consistent_snapshot= binlog_start_consistent_snapshot;
binlog_hton->flags= HTON_NOT_USER_SELECTABLE | HTON_HIDDEN;
return 0;
@@ -1967,24 +1973,58 @@ static int binlog_prepare(handlerton *hton, THD *thd, bool all)
}
-static int serialize_xid(XID *xid, char *buf)
+static int binlog_xa_recover(handlerton *hton __attribute__((unused)),
+ XID *xid_list __attribute__((unused)),
+ uint len __attribute__((unused)))
{
- size_t size;
- buf[0]= '\'';
- memcpy(buf+1, xid->data, xid->gtrid_length);
- size= xid->gtrid_length + 2;
- buf[size-1]= '\'';
- if (xid->bqual_length == 0 && xid->formatID == 1)
- return size;
+ /* Does nothing. */
+ return 0;
+}
+
+inline int binlog_write_by_xid(THD *thd, XID *xid, char *buf,
+ const char *query, size_t q_len)
+{
+ int res= 0;
+
+ if ((WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) &&
+ thd->transaction.xid_state.is_binlogged())
+ {
+ size_t buflen;
+
+ memcpy(buf, query, q_len);
+ buflen= q_len + strlen(static_cast<event_xid_t*>(xid)->serialize(buf+q_len));
+ res= thd->binlog_query(THD::THD::STMT_QUERY_TYPE, buf, buflen,
+ FALSE, TRUE, TRUE, 0);
+
+ DBUG_ASSERT(!res || thd->is_error());
+ }
+
+ return res;
+}
- memcpy(buf+size, ", '", 3);
- memcpy(buf+size+3, xid->data+xid->gtrid_length, xid->bqual_length);
- size+= 3 + xid->bqual_length;
- buf[size]= '\'';
- size++;
- if (xid->formatID != 1)
- size+= sprintf(buf+size, ", %ld", xid->formatID);
- return size;
+static int binlog_commit_by_xid(handlerton *hton, XID *xid)
+{
+ THD *thd= current_thd;
+ const char query[]= "XA COMMIT ";
+ const size_t q_len= sizeof(query) - 1; // do not count trailing 0
+ char buf[q_len + ser_buf_size];
+
+ DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_COMMIT);
+
+ return binlog_write_by_xid(thd, xid, buf, query, q_len);
+}
+
+
+static int binlog_rollback_by_xid(handlerton *hton, XID *xid)
+{
+ THD *thd= current_thd;
+ const char query[]= "XA ROLLBACK ";
+ const size_t q_len= sizeof(query) - 1; // do not count trailing 0
+ char buf[q_len + ser_buf_size];
+
+ DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_ROLLBACK);
+
+ return binlog_write_by_xid(thd, xid, buf, query, q_len);
}
@@ -9942,20 +9982,26 @@ int TC_LOG_BINLOG::log_xa_prepare(THD *thd, bool all)
binlog_cache_mngr *cache_mngr= thd->binlog_setup_trx_data();
XID *xid= &thd->transaction.xid_state.xid_cache_element->xid;
{
+ // todo assert wsrep_simulate || is_open()
+
/*
Log the XA END event first.
We don't do that in trans_xa_end() as XA COMMIT ONE PHASE
is logged as simple BEGIN/COMMIT so the XA END should
not get to the log.
*/
- const size_t xc_len= sizeof("XA END ") - 1; // do not count trailing 0
- char buf[xc_len + xid_t::ser_buf_size];
+ const char query[]= "XA END ";
+ const size_t q_len= sizeof(query) - 1; // do not count trailing 0
+ char buf[q_len + ser_buf_size];
size_t buflen;
binlog_cache_data *cache_data;
IO_CACHE *file;
- memcpy(buf, "XA END ", xc_len);
- buflen= xc_len + serialize_xid(xid, buf+xc_len);
+ // TODO binlog_query
+
+ memcpy(buf, query, q_len);
+ buflen= q_len +
+ strlen(static_cast<event_xid_t*>(xid)->serialize(buf + q_len));
cache_data= cache_mngr->get_binlog_cache_data(true);
file= &cache_data->cache_log;
thd->lex->sql_command= SQLCOM_XA_END;
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 57ec9ec2eb7..5cd69d037b1 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -7978,11 +7978,25 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg,
/* Preserve any DDL or WAITED flag in the slave's binlog. */
if (thd_arg->rgi_slave)
flags2|= (thd_arg->rgi_slave->gtid_ev_flags2 & (FL_DDL|FL_WAITED));
- if (thd->transaction.xid_state.xid_cache_element->xa_state == XA_IDLE &&
+ if (thd->transaction.xid_state.xid_cache_element &&
thd->lex->xa_opt != XA_ONE_PHASE)
{
+ DBUG_ASSERT(thd->transaction.xid_state.xid_cache_element->xa_state == XA_IDLE);
+
flags2|= FL_PREPARED_XA;
- xid= thd->transaction.xid_state.xid_cache_element->xid;
+
+ // No array assignment: xid.data= thd->transaction.xid_state.xid_cache_element->xid.data;
+ //xid= * static_cast<event_xid_t*>
+ // (&thd->transaction.xid_state.xid_cache_element->xid);
+
+ xid.formatID= thd->transaction.xid_state.xid_cache_element->xid.formatID;
+ xid.gtrid_length= thd->transaction.xid_state.xid_cache_element->xid.gtrid_length;
+ xid.bqual_length= thd->transaction.xid_state.xid_cache_element->xid.bqual_length;
+ if (xid.formatID != -1) // TODO: -1 why
+ {
+ long data_length= xid.bqual_length + xid.gtrid_length;
+ memcpy(xid.data, thd->transaction.xid_state.xid_cache_element->xid.data, data_length);
+ }
}
}
@@ -8099,9 +8113,14 @@ Gtid_log_event::make_compatible_event(String *packet, bool *need_dummy_event,
void
Gtid_log_event::pack_info(Protocol *protocol)
{
- char buf[6+5+10+1+10+1+20+1+4+20+1+5+128];
+ char buf[6+5+10+1+10+1+20+1+4+20+1+5+128 /* todo: const:s */];
char *p;
- p = strmov(buf, (flags2 & FL_STANDALONE ? "GTID " : "BEGIN GTID "));
+ p = strmov(buf, (flags2 & FL_STANDALONE ? "GTID " :
+ flags2 & FL_PREPARED_XA ? "XA START " : "BEGIN GTID "));
+ if (flags2 & FL_PREPARED_XA)
+ {
+ p += sprintf(p, "%s GTID ", xid.serialize());
+ }
p= longlong10_to_str(domain_id, p, 10);
*p++= '-';
p= longlong10_to_str(server_id, p, 10);
@@ -8113,12 +8132,6 @@ Gtid_log_event::pack_info(Protocol *protocol)
p= longlong10_to_str(commit_id, p, 10);
}
- if (flags2 & FL_PREPARED_XA)
- {
- p= strmov(p, " XID :");
- p= strnmov(p, xid.data, xid.bqual_length + xid.gtrid_length);
- }
-
protocol->store(buf, p-buf, &my_charset_bin);
}
@@ -8310,18 +8323,8 @@ Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
}
if ((flags2 & FL_PREPARED_XA) && !is_flashback)
{
- my_b_write_string(&cache, "XA START '");
- my_b_write(&cache, (uchar *) xid.data, xid.gtrid_length);
- my_b_write_string(&cache, "'");
- if (xid.bqual_length > 0 || xid.formatID != 1)
- {
- my_b_write_string(&cache, ", '");
- my_b_write(&cache, (uchar *) xid.data+xid.gtrid_length, xid.bqual_length);
- my_b_write_string(&cache, "'");
- if (xid.formatID != 1)
- if (my_b_printf(&cache, ", %d", xid.formatID))
- goto err;
- }
+ my_b_write_string(&cache, "XA START ");
+ my_b_write(&cache, (uchar*) xid.serialize(), strlen(xid.buf));
if (my_b_printf(&cache, "%s\n", print_event_info->delimiter))
goto err;
}
@@ -9170,88 +9173,6 @@ int Xid_log_event::do_commit()
#endif /* !MYSQL_CLIENT */
-#ifdef TODO7974
-/**
- Function serializes XID which is characterized by by four last arguments
- of the function.
- Serialized XID is presented in valid hex format and is returned to
- the caller in a buffer pointed by the first argument.
- The buffer size provived by the caller must be not less than
- 8 + 2 * XIDDATASIZE + 4 * sizeof(XID::formatID) + 1, see
- XID::serialize_xid() that is a caller and plugin.h for XID declaration.
-
- @param buf pointer to a buffer allocated for storing serialized data
-
- @return the value of the buffer pointer
-*/
-
-char *XA_prepare_log_event::event_xid_t::serialize(char *buf) const
-{
- int i;
- char *c= buf;
- /*
- Build a string like following pattern:
- X'hex11hex12...hex1m',X'hex21hex22...hex2n',11
- and store it into buf.
- Here hex1i and hex2k are hexadecimals representing XID's internal
- raw bytes (1 <= i <= m, 1 <= k <= n), and `m' and `n' even numbers
- half of which corresponding to the lengths of XID's components.
- */
- c[0]= 'X';
- c[1]= '\'';
- c+= 2;
- for (i= 0; i < gtrid_length; i++)
- {
- c[0]=_dig_vec_lower[((uchar*) data)[i] >> 4];
- c[1]=_dig_vec_lower[((uchar*) data)[i] & 0x0f];
- c+= 2;
- }
- c[0]= '\'';
- c[1]= ',';
- c[2]= 'X';
- c[3]= '\'';
- c+= 4;
-
- for (; i < gtrid_length + bqual_length; i++)
- {
- c[0]=_dig_vec_lower[((uchar*) data)[i] >> 4];
- c[1]=_dig_vec_lower[((uchar*) data)[i] & 0x0f];
- c+= 2;
- }
- c[0]= '\'';
- sprintf(c+1, ",%lu", formatID);
-
- return buf;
-}
-#endif /*TODO7974*/
-
-char *XA_prepare_log_event::event_xid_t::serialize(char *buf) const
-{
- char *c= buf;
-
- c[0]= '\'';
- memcpy(c+1, data, gtrid_length);
- c[gtrid_length+1]= '\'';
- c+= gtrid_length + 2;
-
- if (bqual_length)
- {
- c[0]= ',';
- c[1]= '\'';
- memcpy(c+2, data+gtrid_length, bqual_length);
- c[bqual_length+2]= '\'';
- c+= bqual_length+3;
- }
-
- if (formatID != 1)
- sprintf(c, ",%lu", formatID);
- else
- c[0]=0;
-
- return buf;
-}
-
-
/**************************************************************************
XA_prepare_log_event methods
**************************************************************************/
@@ -9295,15 +9216,15 @@ XA_prepare_log_event(const char* buf,
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
void XA_prepare_log_event::pack_info(Protocol *protocol)
{
- char buf[ser_buf_size];
- char query[sizeof("XA COMMIT ONE PHASE") + 1 + sizeof(buf)];
+ //char buf[ser_buf_size];
+ char query[sizeof("XA COMMIT ONE PHASE") + 1 + ser_buf_size]; //sizeof(buf)];
/* RHS of the following assert is unknown to client sources */
- compile_time_assert(ser_buf_size == XID::ser_buf_size);
- m_xid.serialize(buf);
+ // TODO: why ? compile_time_assert(ser_buf_size == XID::ser_buf_size);
+ //m_xid.serialize();
sprintf(query,
(one_phase ? "XA COMMIT %s ONE PHASE" : "XA PREPARE %s"),
- buf);
+ m_xid.serialize());
protocol->store(query, strlen(query), &my_charset_bin);
}
@@ -9321,9 +9242,9 @@ bool XA_prepare_log_event::write()
int4store(data+(1+4), static_cast<XID*>(xid)->gtrid_length);
int4store(data+(1+4+4), static_cast<XID*>(xid)->bqual_length);
- DBUG_ASSERT(xid_bufs_size == sizeof(data) - 1);
+ DBUG_ASSERT(xid_subheader_no_data == sizeof(data) - 1);
- return write_header(sizeof(one_phase_byte) + xid_bufs_size +
+ return write_header(sizeof(one_phase_byte) + xid_subheader_no_data +
static_cast<XID*>(xid)->gtrid_length +
static_cast<XID*>(xid)->bqual_length) ||
write_data(data, sizeof(data)) ||
@@ -9340,19 +9261,19 @@ bool XA_prepare_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
{
Write_on_release_cache cache(&print_event_info->head_cache, file,
Write_on_release_cache::FLUSH_F, this);
- char buf[ser_buf_size];
+ //char buf[ser_buf_size];
- m_xid.serialize(buf);
+ m_xid.serialize();
if (!print_event_info->short_form)
{
print_header(&cache, print_event_info, FALSE);
- if (my_b_printf(&cache, "\tXID = %s\n", buf))
+ if (my_b_printf(&cache, "\tXID = %s\n", m_xid.buf))
goto error;
}
if (my_b_printf(&cache, "XA PREPARE %s\n%s\n",
- buf, print_event_info->delimiter))
+ m_xid.buf, print_event_info->delimiter))
goto error;
return cache.flush_data();
diff --git a/sql/log_event.h b/sql/log_event.h
index 89507d40581..53b36b857cd 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -3107,26 +3107,107 @@ private:
with other servers.
*/
-class XA_prepare_log_event: public Xid_apply_log_event
+/**
+ Function serializes XID which is characterized by by four last arguments
+ of the function.
+ Serialized XID is presented in valid hex format and is returned to
+ the caller in a buffer pointed by the first argument.
+ The buffer size provived by the caller must be not less than
+ 8 + 2 * XIDDATASIZE + 4 * sizeof(XID::formatID) + 1, see
+ MYSQL_{,XID} definitions.
+
+ @param buf pointer to a buffer allocated for storing serialized data
+ @param fmt formatID value
+ @param gln gtrid_length value
+ @param bln bqual_length value
+ @param dat data value
+
+ @return the value of the buffer pointer
+*/
+
+inline char *serialize_xid(char *buf, long fmt, long gln, long bln,
+ const char *dat)
{
-protected:
- /* The event_xid_t members were copied from handler.h */
- struct event_xid_t
+ int i;
+ char *c= buf;
+ /*
+ Build a string like following pattern:
+ X'hex11hex12...hex1m',X'hex21hex22...hex2n',11
+ and store it into buf.
+ Here hex1i and hex2k are hexadecimals representing XID's internal
+ raw bytes (1 <= i <= m, 1 <= k <= n), and `m' and `n' even numbers
+ half of which corresponding to the lengths of XID's components.
+ */
+ c[0]= 'X';
+ c[1]= '\'';
+ c+= 2;
+ for (i= 0; i < gln; i++)
{
- long formatID;
- long gtrid_length;
- long bqual_length;
- char data[MYSQL_XIDDATASIZE]; // not \0-terminated !
- char *serialize(char *buf) const;
- };
+ c[0]=_dig_vec_lower[((uchar*) dat)[i] >> 4];
+ c[1]=_dig_vec_lower[((uchar*) dat)[i] & 0x0f];
+ c+= 2;
+ }
+ c[0]= '\'';
+ c[1]= ',';
+ c[2]= 'X';
+ c[3]= '\'';
+ c+= 4;
- /* size of serialization buffer is explained in $MYSQL/sql/xa.h. */
- static const uint ser_buf_size=
- 8 + 2 * MYSQL_XIDDATASIZE + 4 * sizeof(long) + 1;
+ for (; i < gln + bln; i++)
+ {
+ c[0]=_dig_vec_lower[((uchar*) dat)[i] >> 4];
+ c[1]=_dig_vec_lower[((uchar*) dat)[i] & 0x0f];
+ c+= 2;
+ }
+ c[0]= '\'';
+ sprintf(c+1, ",%lu", fmt);
+
+ return buf;
+}
+
+/*
+ The size of the string containing serialized Xid representation
+ is computed as a sum of
+ eight as the number of formatting symbols (X'',X'',)
+ plus 2 x XIDDATASIZE (2 due to hex format),
+ plus space for decimal digits of XID::formatID,
+ plus one for 0x0.
+*/
+static const uint ser_buf_size=
+ 8 + 2 * MYSQL_XIDDATASIZE + 4 * sizeof(long) + 1;
+
+struct event_mysql_xid_t : MYSQL_XID
+{
+ char buf[ser_buf_size];
+ char *serialize()
+ {
+ return serialize_xid(buf, formatID, gtrid_length, bqual_length, data);
+ }
+};
+
+#ifndef MYSQL_CLIENT
+struct event_xid_t : XID
+{
+ char buf[ser_buf_size];
+
+ char *serialize(char *buf_arg)
+ {
+ return serialize_xid(buf_arg, formatID, gtrid_length, bqual_length, data);
+ }
+ char *serialize()
+ {
+ return serialize(buf);
+ }
+};
+#endif
+
+class XA_prepare_log_event: public Xid_apply_log_event
+{
+protected:
- /* Total size of buffers to hold serialized members of XID struct */
- static const int xid_bufs_size= 12;
- event_xid_t m_xid;
+ /* Constant contribution to subheader in write() by members of XID struct. */
+ static const int xid_subheader_no_data= 12;
+ event_mysql_xid_t m_xid;
void *xid;
bool one_phase;
@@ -3149,7 +3230,7 @@ public:
Log_event_type get_type_code() { return XA_PREPARE_LOG_EVENT; }
int get_data_size()
{
- return xid_bufs_size + m_xid.gtrid_length + m_xid.bqual_length;
+ return xid_subheader_no_data + m_xid.gtrid_length + m_xid.bqual_length;
}
#ifdef MYSQL_SERVER
@@ -3476,9 +3557,9 @@ public:
uint64 commit_id;
uint32 domain_id;
#ifdef MYSQL_SERVER
- XID xid;
+ event_xid_t xid;
#else
- struct st_mysql_xid xid;
+ event_mysql_xid_t xid;
#endif
uchar flags2;
diff --git a/sql/transaction.cc b/sql/transaction.cc
index ef560fbecbf..2887ae763df 100644
--- a/sql/transaction.cc
+++ b/sql/transaction.cc
@@ -700,124 +700,3 @@ bool trans_release_savepoint(THD *thd, LEX_CSTRING name)
DBUG_RETURN(MY_TEST(res));
}
-
-
-void attach_native_trx(THD *thd);
-/**
- This is a specific to "slave" applier collection of standard cleanup
- actions to reset XA transaction states at the end of XA prepare rather than
- to do it at the transaction commit, see @c ha_commit_one_phase.
- THD of the slave applier is dissociated from a transaction object in engine
- that continues to exist there.
-
- @param THD current thread
- @return the value of is_error()
-*/
-
-bool applier_reset_xa_trans(THD *thd)
-{
- thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
- thd->server_status&=
- ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
- DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
- //TODO: review
- //xid_cache_delete(thd, xid_s);
- //if (xid_cache_insert(&xid_s->xid_cache_element->xid, XA_PREPARED, xid_s->is_binlogged))
- // return true;
- thd->transaction.xid_state.xid_cache_element->acquired_to_recovered();
- thd->transaction.xid_state.xid_cache_element= 0;
-
- attach_native_trx(thd);
- thd->transaction.cleanup();
- //TODO: thd->transaction.xid_state.xid_cache_element->xa_state= XA_NOTR;
- thd->mdl_context.release_transactional_locks();
-
- return thd->is_error();
-#ifdef p7974
- Transaction_ctx *trn_ctx= thd->get_transaction();
- XID_STATE *xid_state= trn_ctx->xid_state();
- /*
- In the following the server transaction state gets reset for
- a slave applier thread similarly to xa_commit logics
- except commit does not run.
- */
- thd->variables.option_bits&= ~OPTION_BEGIN;
- trn_ctx->reset_unsafe_rollback_flags(Transaction_ctx::STMT);
- thd->server_status&= ~SERVER_STATUS_IN_TRANS;
- /* Server transaction ctx is detached from THD */
- transaction_cache_detach(trn_ctx);
- xid_state->reset();
- /*
- The current engine transactions is detached from THD, and
- previously saved is restored.
- */
- attach_native_trx(thd);
- trn_ctx->set_ha_trx_info(Transaction_ctx::SESSION, NULL);
- trn_ctx->set_no_2pc(Transaction_ctx::SESSION, false);
- trn_ctx->cleanup();
-#ifdef HAVE_PSI_TRANSACTION_INTERFACE
- MYSQL_COMMIT_TRANSACTION(thd->m_transaction_psi);
- thd->m_transaction_psi= NULL;
-#endif
-
-#endif /*p7974*/
- return thd->is_error();
-}
-
-
-/**
- The function detaches existing storage engines transaction
- context from thd. Backup area to save it is provided to low level
- storage engine function.
-
- is invoked by plugin_foreach() after
- trans_xa_start() for each storage engine.
-
- @param[in,out] thd Thread context
- @param plugin Reference to handlerton
-
- @return FALSE on success, TRUE otherwise.
-*/
-
-my_bool detach_native_trx(THD *thd, plugin_ref plugin, void *unused)
-{
- handlerton *hton= plugin_hton(plugin);
- if (hton->replace_native_transaction_in_thd)
- hton->replace_native_transaction_in_thd(thd, NULL,
- thd_ha_data_backup(thd, hton));
-
- return FALSE;
-
-}
-
-/**
- The function restores previously saved storage engine transaction context.
-
- @param thd Thread context
-*/
-void attach_native_trx(THD *thd)
-{
- Ha_trx_info *ha_info= thd->transaction.all.ha_list;
- Ha_trx_info *ha_info_next;
-
- if (ha_info)
- {
- for (; ha_info; ha_info= ha_info_next)
- {
- handlerton *hton= ha_info->ht();
- if (hton->replace_native_transaction_in_thd)
- {
- /* restore the saved original engine transaction's link with thd */
- void **trx_backup= thd_ha_data_backup(thd, hton);
-
- hton->
- replace_native_transaction_in_thd(thd, *trx_backup, NULL);
- *trx_backup= NULL;
- }
- ha_info_next= ha_info->next();
- ha_info->reset();
- }
- }
- thd->transaction.all.ha_list= 0;
- thd->transaction.all.no_2pc= 0;
-}
diff --git a/sql/transaction.h b/sql/transaction.h
index 5eaa2b00027..eaf2a4da919 100644
--- a/sql/transaction.h
+++ b/sql/transaction.h
@@ -40,5 +40,4 @@ bool trans_rollback_to_savepoint(THD *thd, LEX_CSTRING name);
bool trans_release_savepoint(THD *thd, LEX_CSTRING name);
void trans_reset_one_shot_chistics(THD *thd);
-
#endif /* TRANSACTION_H */
diff --git a/sql/xa.cc b/sql/xa.cc
index 2aa798fae23..dbd0fc272e4 100644
--- a/sql/xa.cc
+++ b/sql/xa.cc
@@ -286,7 +286,7 @@ static bool xa_trans_force_rollback(THD *thd)
bool trans_xa_start(THD *thd)
{
DBUG_ENTER("trans_xa_start");
-
+ // TODO/FIXME: s/if is_explicit_XA()/assert/
if (thd->transaction.xid_state.is_explicit_XA() &&
thd->transaction.xid_state.xid_cache_element->xa_state == XA_IDLE &&
thd->lex->xa_opt == XA_RESUME)
@@ -314,6 +314,17 @@ bool trans_xa_start(THD *thd)
trans_rollback(thd);
DBUG_RETURN(true);
}
+
+ if (thd->variables.pseudo_slave_mode || thd->slave_thread)
+ {
+ /*
+ In case of slave thread applier or processing binlog by client,
+ detach the "native" thd's trx in favor of dynamically created.
+ */
+ plugin_foreach(thd, detach_native_trx,
+ MYSQL_STORAGE_ENGINE_PLUGIN, NULL);
+ }
+
DBUG_RETURN(FALSE);
}
@@ -361,6 +372,8 @@ bool trans_xa_end(THD *thd)
bool trans_xa_prepare(THD *thd)
{
+ int res= 1;
+
DBUG_ENTER("trans_xa_prepare");
if (!thd->transaction.xid_state.is_explicit_XA() ||
@@ -368,16 +381,41 @@ bool trans_xa_prepare(THD *thd)
thd->transaction.xid_state.er_xaer_rmfail();
else if (!thd->transaction.xid_state.xid_cache_element->xid.eq(thd->lex->xid))
my_error(ER_XAER_NOTA, MYF(0));
- else if (ha_prepare(thd))
+ else
{
- xid_cache_delete(thd, &thd->transaction.xid_state);
- my_error(ER_XA_RBROLLBACK, MYF(0));
+ /*
+ Acquire metadata lock which will ensure that COMMIT is blocked
+ by active FLUSH TABLES WITH READ LOCK (and vice versa COMMIT in
+ progress blocks FTWRL).
+
+ We allow FLUSHer to COMMIT; we assume FLUSHer knows what it does.
+ */
+ MDL_request mdl_request;
+ mdl_request.init(MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT,
+ MDL_STATEMENT);
+ if (thd->mdl_context.acquire_lock(&mdl_request,
+ thd->variables.lock_wait_timeout) ||
+ ha_prepare(thd))
+ {
+ if (!mdl_request.ticket)
+ ha_rollback_trans(thd, TRUE);
+ thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
+ thd->transaction.all.reset();
+ thd->server_status&=
+ ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
+ xid_cache_delete(thd, &thd->transaction.xid_state);
+ my_error(ER_XA_RBROLLBACK, MYF(0));
+ }
+ else
+ {
+ res= 0;
+ thd->transaction.xid_state.xid_cache_element->xa_state= XA_PREPARED;
+ if (thd->variables.pseudo_slave_mode)
+ res= applier_reset_xa_trans(thd);
+ }
}
- else
- thd->transaction.xid_state.xid_cache_element->xa_state= XA_PREPARED;
- DBUG_RETURN(thd->is_error() ||
- thd->transaction.xid_state.xid_cache_element->xa_state != XA_PREPARED);
+ DBUG_RETURN(res);
}
@@ -392,11 +430,13 @@ bool trans_xa_prepare(THD *thd)
bool trans_xa_commit(THD *thd)
{
- bool res= TRUE;
+ bool res= true;
+ XID_STATE &xid_state= thd->transaction.xid_state;
+
DBUG_ENTER("trans_xa_commit");
- if (!thd->transaction.xid_state.is_explicit_XA() ||
- !thd->transaction.xid_state.xid_cache_element->xid.eq(thd->lex->xid))
+ if (!xid_state.is_explicit_XA() ||
+ !xid_state.xid_cache_element->xid.eq(thd->lex->xid))
{
if (thd->fix_xid_hash_pins())
{
@@ -406,28 +446,62 @@ bool trans_xa_commit(THD *thd)
if (auto xs= xid_cache_search(thd, thd->lex->xid))
{
- res= xa_trans_rolled_back(xs);
- ha_commit_or_rollback_by_xid(thd->lex->xid, !res);
- xid_cache_delete(thd, xs);
+ if (thd->in_multi_stmt_transaction_mode())
+ {
+ xid_state.er_xaer_rmfail();
+ }
+ else
+ {
+ res= xa_trans_rolled_back(xs);
+ /*
+ Acquire metadata lock which will ensure that COMMIT is blocked
+ by active FLUSH TABLES WITH READ LOCK (and vice versa COMMIT in
+ progress blocks FTWRL).
+
+ We allow FLUSHer to COMMIT; we assume FLUSHer knows what it does.
+ */
+ MDL_request mdl_request;
+ mdl_request.init(MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT,
+ MDL_STATEMENT);
+ if (thd->mdl_context.acquire_lock(&mdl_request,
+ thd->variables.lock_wait_timeout))
+ {
+ /*
+ We can't rollback an XA transaction on lock failure due to
+ Innodb redo log and bin log update is involved in rollback.
+ Return error to user for a retry.
+ */
+ xid_state.er_xaer_rmfail();
+ DBUG_RETURN(res);
+ }
+ DBUG_ASSERT(!xid_state.xid_cache_element);
+
+ xid_state.xid_cache_element= xs;
+ ha_commit_or_rollback_by_xid(thd->lex->xid, !res);
+ xid_state.xid_cache_element= 0;
+
+ res= res || thd->is_error();
+ xid_cache_delete(thd, xs);
+ }
}
else
my_error(ER_XAER_NOTA, MYF(0));
DBUG_RETURN(res);
}
- if (xa_trans_rolled_back(thd->transaction.xid_state.xid_cache_element))
+ if (xa_trans_rolled_back(xid_state.xid_cache_element))
{
xa_trans_force_rollback(thd);
DBUG_RETURN(thd->is_error());
}
- else if (thd->transaction.xid_state.xid_cache_element->xa_state == XA_IDLE &&
+ else if (xid_state.xid_cache_element->xa_state == XA_IDLE &&
thd->lex->xa_opt == XA_ONE_PHASE)
{
int r= ha_commit_trans(thd, TRUE);
if ((res= MY_TEST(r)))
my_error(r == 1 ? ER_XA_RBROLLBACK : ER_XAER_RMERR, MYF(0));
}
- else if (thd->transaction.xid_state.xid_cache_element->xa_state == XA_PREPARED &&
+ else if (xid_state.xid_cache_element->xa_state == XA_PREPARED &&
thd->lex->xa_opt == XA_NONE)
{
MDL_request mdl_request;
@@ -440,26 +514,40 @@ bool trans_xa_commit(THD *thd)
We allow FLUSHer to COMMIT; we assume FLUSHer knows what it does.
*/
mdl_request.init(MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT,
- MDL_TRANSACTION);
+ MDL_STATEMENT);
if (thd->mdl_context.acquire_lock(&mdl_request,
thd->variables.lock_wait_timeout))
{
- ha_rollback_trans(thd, TRUE);
+ /*
+ We can't rollback an XA transaction on lock failure due to
+ Innodb redo log and bin log update is involved in rollback.
+ Return error to user for a retry.
+ */
my_error(ER_XAER_RMERR, MYF(0));
+ DBUG_RETURN(true);
}
else
{
DEBUG_SYNC(thd, "trans_xa_commit_after_acquire_commit_lock");
- res= MY_TEST(ha_commit_one_phase(thd, 1));
- if (res)
+ if((WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) &&
+ xid_state.is_binlogged())
+ {
+ res= thd->binlog_query(THD::THD::STMT_QUERY_TYPE,
+ thd->query(), thd->query_length(),
+ FALSE, FALSE, FALSE, 0);
+ }
+ else
+ res= 0;
+
+ if (res || (res= MY_TEST(ha_commit_one_phase(thd, 1))))
my_error(ER_XAER_RMERR, MYF(0));
}
}
else
{
- thd->transaction.xid_state.er_xaer_rmfail();
+ xid_state.er_xaer_rmfail();
DBUG_RETURN(TRUE);
}
@@ -468,7 +556,7 @@ bool trans_xa_commit(THD *thd)
thd->server_status&=
~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
- xid_cache_delete(thd, &thd->transaction.xid_state);
+ xid_cache_delete(thd, &xid_state);
trans_track_end_trx(thd);
@@ -487,10 +575,13 @@ bool trans_xa_commit(THD *thd)
bool trans_xa_rollback(THD *thd)
{
+ bool res= false;
+ XID_STATE &xid_state= thd->transaction.xid_state;
+
DBUG_ENTER("trans_xa_rollback");
- if (!thd->transaction.xid_state.is_explicit_XA() ||
- !thd->transaction.xid_state.xid_cache_element->xid.eq(thd->lex->xid))
+ if (!xid_state.is_explicit_XA() ||
+ !xid_state.xid_cache_element->xid.eq(thd->lex->xid))
{
if (thd->fix_xid_hash_pins())
{
@@ -500,8 +591,26 @@ bool trans_xa_rollback(THD *thd)
if (auto xs= xid_cache_search(thd, thd->lex->xid))
{
+ MDL_request mdl_request;
+ mdl_request.init(MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT,
+ MDL_STATEMENT);
+ if (thd->mdl_context.acquire_lock(&mdl_request,
+ thd->variables.lock_wait_timeout))
+ {
+ /*
+ We can't rollback an XA transaction on lock failure due to
+ Innodb redo log and bin log update is involved in rollback.
+ Return error to user for a retry.
+ */
+ xid_state.er_xaer_rmfail();
+ DBUG_RETURN(true);
+ }
xa_trans_rolled_back(xs);
+ DBUG_ASSERT(!xid_state.xid_cache_element);
+
+ xid_state.xid_cache_element= xs;
ha_commit_or_rollback_by_xid(thd->lex->xid, 0);
+ xid_state.xid_cache_element= 0;
xid_cache_delete(thd, xs);
}
else
@@ -509,19 +618,28 @@ bool trans_xa_rollback(THD *thd)
DBUG_RETURN(thd->get_stmt_da()->is_error());
}
- if (thd->transaction.xid_state.xid_cache_element->xa_state == XA_ACTIVE)
+ if (xid_state.xid_cache_element->xa_state == XA_ACTIVE)
{
- thd->transaction.xid_state.er_xaer_rmfail();
+ xid_state.er_xaer_rmfail();
DBUG_RETURN(TRUE);
}
- DBUG_RETURN(xa_trans_force_rollback(thd));
+
+ if (xid_state.xid_cache_element->xa_state == XA_PREPARED &&
+ xid_state.is_binlogged() &&
+ (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()))
+ {
+ res= thd->binlog_query(THD::THD::STMT_QUERY_TYPE,
+ thd->query(), thd->query_length(),
+ FALSE, FALSE, FALSE, 0);
+ }
+ DBUG_RETURN(res != 0 || xa_trans_force_rollback(thd));
}
bool trans_xa_detach(THD *thd)
{
DBUG_ASSERT(thd->transaction.xid_state.is_explicit_XA());
-#if 1
+#if 0
return xa_trans_force_rollback(thd);
#else
if (thd->transaction.xid_state.xid_cache_element->xa_state != XA_PREPARED)
@@ -736,3 +854,123 @@ bool mysql_xa_recover(THD *thd)
my_eof(thd);
DBUG_RETURN(0);
}
+
+
+/**
+ This is a specific to "slave" applier collection of standard cleanup
+ actions to reset XA transaction states at the end of XA prepare rather than
+ to do it at the transaction commit, see @c ha_commit_one_phase.
+ THD of the slave applier is dissociated from a transaction object in engine
+ that continues to exist there.
+
+ @param THD current thread
+ @return the value of is_error()
+*/
+
+bool applier_reset_xa_trans(THD *thd)
+{
+ thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
+ thd->server_status&=
+ ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
+ DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
+ //TODO: review
+ //xid_cache_delete(thd, xid_s);
+ //if (xid_cache_insert(&xid_s->xid_cache_element->xid, XA_PREPARED, xid_s->is_binlogged))
+ // return true;
+ thd->transaction.xid_state.xid_cache_element->acquired_to_recovered();
+ thd->transaction.xid_state.xid_cache_element= 0;
+
+ attach_native_trx(thd);
+ thd->transaction.cleanup();
+ //TODO: thd->transaction.xid_state.xid_cache_element->xa_state= XA_NOTR;
+ thd->mdl_context.release_transactional_locks();
+
+ return thd->is_error();
+#ifdef p7974
+ Transaction_ctx *trn_ctx= thd->get_transaction();
+ XID_STATE *xid_state= trn_ctx->xid_state();
+ /*
+ In the following the server transaction state gets reset for
+ a slave applier thread similarly to xa_commit logics
+ except commit does not run.
+ */
+ thd->variables.option_bits&= ~OPTION_BEGIN;
+ trn_ctx->reset_unsafe_rollback_flags(Transaction_ctx::STMT);
+ thd->server_status&= ~SERVER_STATUS_IN_TRANS;
+ /* Server transaction ctx is detached from THD */
+ transaction_cache_detach(trn_ctx);
+ xid_state->reset();
+ /*
+ The current engine transactions is detached from THD, and
+ previously saved is restored.
+ */
+ attach_native_trx(thd);
+ trn_ctx->set_ha_trx_info(Transaction_ctx::SESSION, NULL);
+ trn_ctx->set_no_2pc(Transaction_ctx::SESSION, false);
+ trn_ctx->cleanup();
+#ifdef HAVE_PSI_TRANSACTION_INTERFACE
+ MYSQL_COMMIT_TRANSACTION(thd->m_transaction_psi);
+ thd->m_transaction_psi= NULL;
+#endif
+
+#endif /*p7974*/
+ return thd->is_error();
+}
+
+
+/**
+ The function detaches existing storage engines transaction
+ context from thd. Backup area to save it is provided to low level
+ storage engine function.
+
+ is invoked by plugin_foreach() after
+ trans_xa_start() for each storage engine.
+
+ @param[in,out] thd Thread context
+ @param plugin Reference to handlerton
+
+ @return FALSE on success, TRUE otherwise.
+*/
+
+my_bool detach_native_trx(THD *thd, plugin_ref plugin, void *unused)
+{
+ handlerton *hton= plugin_hton(plugin);
+ if (hton->replace_native_transaction_in_thd)
+ hton->replace_native_transaction_in_thd(thd, NULL,
+ thd_ha_data_backup(thd, hton));
+
+ return FALSE;
+
+}
+
+/**
+ The function restores previously saved storage engine transaction context.
+
+ @param thd Thread context
+*/
+void attach_native_trx(THD *thd)
+{
+ Ha_trx_info *ha_info= thd->transaction.all.ha_list;
+ Ha_trx_info *ha_info_next;
+
+ if (ha_info)
+ {
+ for (; ha_info; ha_info= ha_info_next)
+ {
+ handlerton *hton= ha_info->ht();
+ if (hton->replace_native_transaction_in_thd)
+ {
+ /* restore the saved original engine transaction's link with thd */
+ void **trx_backup= thd_ha_data_backup(thd, hton);
+
+ hton->
+ replace_native_transaction_in_thd(thd, *trx_backup, NULL);
+ *trx_backup= NULL;
+ }
+ ha_info_next= ha_info->next();
+ ha_info->reset();
+ }
+ }
+ thd->transaction.all.ha_list= 0;
+ thd->transaction.all.no_2pc= 0;
+}
diff --git a/sql/xa.h b/sql/xa.h
index 85acb6f4958..256b2c239be 100644
--- a/sql/xa.h
+++ b/sql/xa.h
@@ -1,3 +1,5 @@
+#ifndef XA_INCLUDED
+#define XA_INCLUDED
/*
Copyright (c) 2000, 2016, Oracle and/or its affiliates.
Copyright (c) 2009, 2019, MariaDB Corporation.
@@ -67,6 +69,7 @@ class XID_cache_element
public:
static const int32 ACQUIRED= 1 << 30;
static const int32 RECOVERED= 1 << 29;
+ static const int32 BINLOGGED= 1 << 28;
/* Error reported by the Resource Manager (RM) to the Transaction Manager. */
uint rm_error;
enum xa_states xa_state;
@@ -160,7 +163,11 @@ struct XID_STATE {
The recovered transaction after server restart sets it to TRUE always.
That can cause inconsistencies (shoud be fixed?).
*/
- bool is_binlogged;
+ bool is_binlogged()
+ {
+ return xid_cache_element &&
+ xid_cache_element->is_set(XID_cache_element::BINLOGGED);
+ }
bool check_has_uncommitted_xa() const;
bool is_explicit_XA() const { return xid_cache_element != 0; }
@@ -171,12 +178,19 @@ struct XID_STATE {
{
//TODO: what's an equivalent
//xid.null();
- is_binlogged= false;
+ //is_binlogged= false;
+ unset_binlogged();
}
void set_binlogged()
- { is_binlogged= true; }
+ {
+ if (xid_cache_element)
+ xid_cache_element->set(XID_cache_element::BINLOGGED);
+ }
void unset_binlogged()
- { is_binlogged= false; }
+ {
+ if (xid_cache_element)
+ xid_cache_element->set(~XID_cache_element::BINLOGGED);
+ }
};
void xid_cache_init(void);
@@ -193,3 +207,7 @@ bool trans_xa_rollback(THD *thd);
bool trans_xa_detach(THD *thd);
bool mysql_xa_recover(THD *thd);
bool applier_reset_xa_trans(THD *thd);
+void attach_native_trx(THD *thd);
+my_bool detach_native_trx(THD *thd, plugin_ref plugin, void *unused);
+
+#endif /* XA_INCLUDED */