diff options
| author | Andrei Elkin <andrei.elkin@mariadb.com> | 2019-05-10 23:26:56 +0300 |
|---|---|---|
| committer | Andrei Elkin <andrei.elkin@mariadb.com> | 2019-05-15 19:06:24 +0300 |
| commit | 3ae642bc77dafa0984f7541ffc19fd5dbb8d15fc (patch) | |
| tree | f352c50ece80033389dabc120609e7ba32ed2a6c | |
| parent | 0de38e461f8864a916837f4e9808ffbd33d9b741 (diff) | |
| download | mariadb-git-3ae642bc77dafa0984f7541ffc19fd5dbb8d15fc.tar.gz | |
MDEV-7974
Almost ultimate merge with the XA refactoring (to cover is_binlogged).
todo: test out the master side; etc.
| -rw-r--r-- | sql/handler.h | 10 | ||||
| -rw-r--r-- | sql/log.cc | 86 | ||||
| -rw-r--r-- | sql/log_event.cc | 151 | ||||
| -rw-r--r-- | sql/log_event.h | 119 | ||||
| -rw-r--r-- | sql/transaction.cc | 121 | ||||
| -rw-r--r-- | sql/transaction.h | 1 | ||||
| -rw-r--r-- | sql/xa.cc | 296 | ||||
| -rw-r--r-- | sql/xa.h | 26 |
8 files changed, 491 insertions, 319 deletions
diff --git a/sql/handler.h b/sql/handler.h index 16cc7c70c0f..dce2a8a2c88 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -831,16 +831,6 @@ struct xid_t { long gtrid_length; long bqual_length; char data[XIDDATASIZE]; // not \0-terminated ! - /* - The size of the string containing serialized Xid representation - is computed as a sum of - eight as the number of formatting symbols (X'',X'',) - plus 2 x XIDDATASIZE (2 due to hex format), - plus space for decimal digits of XID::formatID, - plus one for 0x0. - */ - static const uint ser_buf_size= - 8 + 2 * XIDDATASIZE + 4 * sizeof(long) + 1; xid_t() {} /* Remove gcc warning */ bool eq(struct xid_t *xid) diff --git a/sql/log.cc b/sql/log.cc index d6cee2bc39e..84dba2a41f2 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -91,6 +91,9 @@ static bool binlog_savepoint_rollback_can_release_mdl(handlerton *hton, static int binlog_commit(handlerton *hton, THD *thd, bool all); static int binlog_rollback(handlerton *hton, THD *thd, bool all); static int binlog_prepare(handlerton *hton, THD *thd, bool all); +static int binlog_xa_recover(handlerton *hton, XID *xid_list, uint len); +static int binlog_commit_by_xid(handlerton *hton, XID *xid); +static int binlog_rollback_by_xid(handlerton *hton, XID *xid); static int binlog_start_consistent_snapshot(handlerton *hton, THD *thd); static int binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr, Log_event *end_ev, bool all, bool using_stmt, @@ -1695,6 +1698,9 @@ int binlog_init(void *p) binlog_hton->commit= binlog_commit; binlog_hton->rollback= binlog_rollback; binlog_hton->prepare= binlog_prepare; + binlog_hton->recover= binlog_xa_recover; + binlog_hton->commit_by_xid= binlog_commit_by_xid; + binlog_hton->rollback_by_xid= binlog_rollback_by_xid; binlog_hton->start_consistent_snapshot= binlog_start_consistent_snapshot; binlog_hton->flags= HTON_NOT_USER_SELECTABLE | HTON_HIDDEN; return 0; @@ -1967,24 +1973,58 @@ static int binlog_prepare(handlerton *hton, THD *thd, bool all) } -static int serialize_xid(XID *xid, char *buf) +static int binlog_xa_recover(handlerton *hton __attribute__((unused)), + XID *xid_list __attribute__((unused)), + uint len __attribute__((unused))) { - size_t size; - buf[0]= '\''; - memcpy(buf+1, xid->data, xid->gtrid_length); - size= xid->gtrid_length + 2; - buf[size-1]= '\''; - if (xid->bqual_length == 0 && xid->formatID == 1) - return size; + /* Does nothing. */ + return 0; +} + +inline int binlog_write_by_xid(THD *thd, XID *xid, char *buf, + const char *query, size_t q_len) +{ + int res= 0; + + if ((WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) && + thd->transaction.xid_state.is_binlogged()) + { + size_t buflen; + + memcpy(buf, query, q_len); + buflen= q_len + strlen(static_cast<event_xid_t*>(xid)->serialize(buf+q_len)); + res= thd->binlog_query(THD::THD::STMT_QUERY_TYPE, buf, buflen, + FALSE, TRUE, TRUE, 0); + + DBUG_ASSERT(!res || thd->is_error()); + } + + return res; +} - memcpy(buf+size, ", '", 3); - memcpy(buf+size+3, xid->data+xid->gtrid_length, xid->bqual_length); - size+= 3 + xid->bqual_length; - buf[size]= '\''; - size++; - if (xid->formatID != 1) - size+= sprintf(buf+size, ", %ld", xid->formatID); - return size; +static int binlog_commit_by_xid(handlerton *hton, XID *xid) +{ + THD *thd= current_thd; + const char query[]= "XA COMMIT "; + const size_t q_len= sizeof(query) - 1; // do not count trailing 0 + char buf[q_len + ser_buf_size]; + + DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_COMMIT); + + return binlog_write_by_xid(thd, xid, buf, query, q_len); +} + + +static int binlog_rollback_by_xid(handlerton *hton, XID *xid) +{ + THD *thd= current_thd; + const char query[]= "XA ROLLBACK "; + const size_t q_len= sizeof(query) - 1; // do not count trailing 0 + char buf[q_len + ser_buf_size]; + + DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_ROLLBACK); + + return binlog_write_by_xid(thd, xid, buf, query, q_len); } @@ -9942,20 +9982,26 @@ int TC_LOG_BINLOG::log_xa_prepare(THD *thd, bool all) binlog_cache_mngr *cache_mngr= thd->binlog_setup_trx_data(); XID *xid= &thd->transaction.xid_state.xid_cache_element->xid; { + // todo assert wsrep_simulate || is_open() + /* Log the XA END event first. We don't do that in trans_xa_end() as XA COMMIT ONE PHASE is logged as simple BEGIN/COMMIT so the XA END should not get to the log. */ - const size_t xc_len= sizeof("XA END ") - 1; // do not count trailing 0 - char buf[xc_len + xid_t::ser_buf_size]; + const char query[]= "XA END "; + const size_t q_len= sizeof(query) - 1; // do not count trailing 0 + char buf[q_len + ser_buf_size]; size_t buflen; binlog_cache_data *cache_data; IO_CACHE *file; - memcpy(buf, "XA END ", xc_len); - buflen= xc_len + serialize_xid(xid, buf+xc_len); + // TODO binlog_query + + memcpy(buf, query, q_len); + buflen= q_len + + strlen(static_cast<event_xid_t*>(xid)->serialize(buf + q_len)); cache_data= cache_mngr->get_binlog_cache_data(true); file= &cache_data->cache_log; thd->lex->sql_command= SQLCOM_XA_END; diff --git a/sql/log_event.cc b/sql/log_event.cc index 57ec9ec2eb7..5cd69d037b1 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -7978,11 +7978,25 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg, /* Preserve any DDL or WAITED flag in the slave's binlog. */ if (thd_arg->rgi_slave) flags2|= (thd_arg->rgi_slave->gtid_ev_flags2 & (FL_DDL|FL_WAITED)); - if (thd->transaction.xid_state.xid_cache_element->xa_state == XA_IDLE && + if (thd->transaction.xid_state.xid_cache_element && thd->lex->xa_opt != XA_ONE_PHASE) { + DBUG_ASSERT(thd->transaction.xid_state.xid_cache_element->xa_state == XA_IDLE); + flags2|= FL_PREPARED_XA; - xid= thd->transaction.xid_state.xid_cache_element->xid; + + // No array assignment: xid.data= thd->transaction.xid_state.xid_cache_element->xid.data; + //xid= * static_cast<event_xid_t*> + // (&thd->transaction.xid_state.xid_cache_element->xid); + + xid.formatID= thd->transaction.xid_state.xid_cache_element->xid.formatID; + xid.gtrid_length= thd->transaction.xid_state.xid_cache_element->xid.gtrid_length; + xid.bqual_length= thd->transaction.xid_state.xid_cache_element->xid.bqual_length; + if (xid.formatID != -1) // TODO: -1 why + { + long data_length= xid.bqual_length + xid.gtrid_length; + memcpy(xid.data, thd->transaction.xid_state.xid_cache_element->xid.data, data_length); + } } } @@ -8099,9 +8113,14 @@ Gtid_log_event::make_compatible_event(String *packet, bool *need_dummy_event, void Gtid_log_event::pack_info(Protocol *protocol) { - char buf[6+5+10+1+10+1+20+1+4+20+1+5+128]; + char buf[6+5+10+1+10+1+20+1+4+20+1+5+128 /* todo: const:s */]; char *p; - p = strmov(buf, (flags2 & FL_STANDALONE ? "GTID " : "BEGIN GTID ")); + p = strmov(buf, (flags2 & FL_STANDALONE ? "GTID " : + flags2 & FL_PREPARED_XA ? "XA START " : "BEGIN GTID ")); + if (flags2 & FL_PREPARED_XA) + { + p += sprintf(p, "%s GTID ", xid.serialize()); + } p= longlong10_to_str(domain_id, p, 10); *p++= '-'; p= longlong10_to_str(server_id, p, 10); @@ -8113,12 +8132,6 @@ Gtid_log_event::pack_info(Protocol *protocol) p= longlong10_to_str(commit_id, p, 10); } - if (flags2 & FL_PREPARED_XA) - { - p= strmov(p, " XID :"); - p= strnmov(p, xid.data, xid.bqual_length + xid.gtrid_length); - } - protocol->store(buf, p-buf, &my_charset_bin); } @@ -8310,18 +8323,8 @@ Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info) } if ((flags2 & FL_PREPARED_XA) && !is_flashback) { - my_b_write_string(&cache, "XA START '"); - my_b_write(&cache, (uchar *) xid.data, xid.gtrid_length); - my_b_write_string(&cache, "'"); - if (xid.bqual_length > 0 || xid.formatID != 1) - { - my_b_write_string(&cache, ", '"); - my_b_write(&cache, (uchar *) xid.data+xid.gtrid_length, xid.bqual_length); - my_b_write_string(&cache, "'"); - if (xid.formatID != 1) - if (my_b_printf(&cache, ", %d", xid.formatID)) - goto err; - } + my_b_write_string(&cache, "XA START "); + my_b_write(&cache, (uchar*) xid.serialize(), strlen(xid.buf)); if (my_b_printf(&cache, "%s\n", print_event_info->delimiter)) goto err; } @@ -9170,88 +9173,6 @@ int Xid_log_event::do_commit() #endif /* !MYSQL_CLIENT */ -#ifdef TODO7974 -/** - Function serializes XID which is characterized by by four last arguments - of the function. - Serialized XID is presented in valid hex format and is returned to - the caller in a buffer pointed by the first argument. - The buffer size provived by the caller must be not less than - 8 + 2 * XIDDATASIZE + 4 * sizeof(XID::formatID) + 1, see - XID::serialize_xid() that is a caller and plugin.h for XID declaration. - - @param buf pointer to a buffer allocated for storing serialized data - - @return the value of the buffer pointer -*/ - -char *XA_prepare_log_event::event_xid_t::serialize(char *buf) const -{ - int i; - char *c= buf; - /* - Build a string like following pattern: - X'hex11hex12...hex1m',X'hex21hex22...hex2n',11 - and store it into buf. - Here hex1i and hex2k are hexadecimals representing XID's internal - raw bytes (1 <= i <= m, 1 <= k <= n), and `m' and `n' even numbers - half of which corresponding to the lengths of XID's components. - */ - c[0]= 'X'; - c[1]= '\''; - c+= 2; - for (i= 0; i < gtrid_length; i++) - { - c[0]=_dig_vec_lower[((uchar*) data)[i] >> 4]; - c[1]=_dig_vec_lower[((uchar*) data)[i] & 0x0f]; - c+= 2; - } - c[0]= '\''; - c[1]= ','; - c[2]= 'X'; - c[3]= '\''; - c+= 4; - - for (; i < gtrid_length + bqual_length; i++) - { - c[0]=_dig_vec_lower[((uchar*) data)[i] >> 4]; - c[1]=_dig_vec_lower[((uchar*) data)[i] & 0x0f]; - c+= 2; - } - c[0]= '\''; - sprintf(c+1, ",%lu", formatID); - - return buf; -} -#endif /*TODO7974*/ - -char *XA_prepare_log_event::event_xid_t::serialize(char *buf) const -{ - char *c= buf; - - c[0]= '\''; - memcpy(c+1, data, gtrid_length); - c[gtrid_length+1]= '\''; - c+= gtrid_length + 2; - - if (bqual_length) - { - c[0]= ','; - c[1]= '\''; - memcpy(c+2, data+gtrid_length, bqual_length); - c[bqual_length+2]= '\''; - c+= bqual_length+3; - } - - if (formatID != 1) - sprintf(c, ",%lu", formatID); - else - c[0]=0; - - return buf; -} - - /************************************************************************** XA_prepare_log_event methods **************************************************************************/ @@ -9295,15 +9216,15 @@ XA_prepare_log_event(const char* buf, #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) void XA_prepare_log_event::pack_info(Protocol *protocol) { - char buf[ser_buf_size]; - char query[sizeof("XA COMMIT ONE PHASE") + 1 + sizeof(buf)]; + //char buf[ser_buf_size]; + char query[sizeof("XA COMMIT ONE PHASE") + 1 + ser_buf_size]; //sizeof(buf)]; /* RHS of the following assert is unknown to client sources */ - compile_time_assert(ser_buf_size == XID::ser_buf_size); - m_xid.serialize(buf); + // TODO: why ? compile_time_assert(ser_buf_size == XID::ser_buf_size); + //m_xid.serialize(); sprintf(query, (one_phase ? "XA COMMIT %s ONE PHASE" : "XA PREPARE %s"), - buf); + m_xid.serialize()); protocol->store(query, strlen(query), &my_charset_bin); } @@ -9321,9 +9242,9 @@ bool XA_prepare_log_event::write() int4store(data+(1+4), static_cast<XID*>(xid)->gtrid_length); int4store(data+(1+4+4), static_cast<XID*>(xid)->bqual_length); - DBUG_ASSERT(xid_bufs_size == sizeof(data) - 1); + DBUG_ASSERT(xid_subheader_no_data == sizeof(data) - 1); - return write_header(sizeof(one_phase_byte) + xid_bufs_size + + return write_header(sizeof(one_phase_byte) + xid_subheader_no_data + static_cast<XID*>(xid)->gtrid_length + static_cast<XID*>(xid)->bqual_length) || write_data(data, sizeof(data)) || @@ -9340,19 +9261,19 @@ bool XA_prepare_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { Write_on_release_cache cache(&print_event_info->head_cache, file, Write_on_release_cache::FLUSH_F, this); - char buf[ser_buf_size]; + //char buf[ser_buf_size]; - m_xid.serialize(buf); + m_xid.serialize(); if (!print_event_info->short_form) { print_header(&cache, print_event_info, FALSE); - if (my_b_printf(&cache, "\tXID = %s\n", buf)) + if (my_b_printf(&cache, "\tXID = %s\n", m_xid.buf)) goto error; } if (my_b_printf(&cache, "XA PREPARE %s\n%s\n", - buf, print_event_info->delimiter)) + m_xid.buf, print_event_info->delimiter)) goto error; return cache.flush_data(); diff --git a/sql/log_event.h b/sql/log_event.h index 89507d40581..53b36b857cd 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -3107,26 +3107,107 @@ private: with other servers. */ -class XA_prepare_log_event: public Xid_apply_log_event +/** + Function serializes XID which is characterized by by four last arguments + of the function. + Serialized XID is presented in valid hex format and is returned to + the caller in a buffer pointed by the first argument. + The buffer size provived by the caller must be not less than + 8 + 2 * XIDDATASIZE + 4 * sizeof(XID::formatID) + 1, see + MYSQL_{,XID} definitions. + + @param buf pointer to a buffer allocated for storing serialized data + @param fmt formatID value + @param gln gtrid_length value + @param bln bqual_length value + @param dat data value + + @return the value of the buffer pointer +*/ + +inline char *serialize_xid(char *buf, long fmt, long gln, long bln, + const char *dat) { -protected: - /* The event_xid_t members were copied from handler.h */ - struct event_xid_t + int i; + char *c= buf; + /* + Build a string like following pattern: + X'hex11hex12...hex1m',X'hex21hex22...hex2n',11 + and store it into buf. + Here hex1i and hex2k are hexadecimals representing XID's internal + raw bytes (1 <= i <= m, 1 <= k <= n), and `m' and `n' even numbers + half of which corresponding to the lengths of XID's components. + */ + c[0]= 'X'; + c[1]= '\''; + c+= 2; + for (i= 0; i < gln; i++) { - long formatID; - long gtrid_length; - long bqual_length; - char data[MYSQL_XIDDATASIZE]; // not \0-terminated ! - char *serialize(char *buf) const; - }; + c[0]=_dig_vec_lower[((uchar*) dat)[i] >> 4]; + c[1]=_dig_vec_lower[((uchar*) dat)[i] & 0x0f]; + c+= 2; + } + c[0]= '\''; + c[1]= ','; + c[2]= 'X'; + c[3]= '\''; + c+= 4; - /* size of serialization buffer is explained in $MYSQL/sql/xa.h. */ - static const uint ser_buf_size= - 8 + 2 * MYSQL_XIDDATASIZE + 4 * sizeof(long) + 1; + for (; i < gln + bln; i++) + { + c[0]=_dig_vec_lower[((uchar*) dat)[i] >> 4]; + c[1]=_dig_vec_lower[((uchar*) dat)[i] & 0x0f]; + c+= 2; + } + c[0]= '\''; + sprintf(c+1, ",%lu", fmt); + + return buf; +} + +/* + The size of the string containing serialized Xid representation + is computed as a sum of + eight as the number of formatting symbols (X'',X'',) + plus 2 x XIDDATASIZE (2 due to hex format), + plus space for decimal digits of XID::formatID, + plus one for 0x0. +*/ +static const uint ser_buf_size= + 8 + 2 * MYSQL_XIDDATASIZE + 4 * sizeof(long) + 1; + +struct event_mysql_xid_t : MYSQL_XID +{ + char buf[ser_buf_size]; + char *serialize() + { + return serialize_xid(buf, formatID, gtrid_length, bqual_length, data); + } +}; + +#ifndef MYSQL_CLIENT +struct event_xid_t : XID +{ + char buf[ser_buf_size]; + + char *serialize(char *buf_arg) + { + return serialize_xid(buf_arg, formatID, gtrid_length, bqual_length, data); + } + char *serialize() + { + return serialize(buf); + } +}; +#endif + +class XA_prepare_log_event: public Xid_apply_log_event +{ +protected: - /* Total size of buffers to hold serialized members of XID struct */ - static const int xid_bufs_size= 12; - event_xid_t m_xid; + /* Constant contribution to subheader in write() by members of XID struct. */ + static const int xid_subheader_no_data= 12; + event_mysql_xid_t m_xid; void *xid; bool one_phase; @@ -3149,7 +3230,7 @@ public: Log_event_type get_type_code() { return XA_PREPARE_LOG_EVENT; } int get_data_size() { - return xid_bufs_size + m_xid.gtrid_length + m_xid.bqual_length; + return xid_subheader_no_data + m_xid.gtrid_length + m_xid.bqual_length; } #ifdef MYSQL_SERVER @@ -3476,9 +3557,9 @@ public: uint64 commit_id; uint32 domain_id; #ifdef MYSQL_SERVER - XID xid; + event_xid_t xid; #else - struct st_mysql_xid xid; + event_mysql_xid_t xid; #endif uchar flags2; diff --git a/sql/transaction.cc b/sql/transaction.cc index ef560fbecbf..2887ae763df 100644 --- a/sql/transaction.cc +++ b/sql/transaction.cc @@ -700,124 +700,3 @@ bool trans_release_savepoint(THD *thd, LEX_CSTRING name) DBUG_RETURN(MY_TEST(res)); } - - -void attach_native_trx(THD *thd); -/** - This is a specific to "slave" applier collection of standard cleanup - actions to reset XA transaction states at the end of XA prepare rather than - to do it at the transaction commit, see @c ha_commit_one_phase. - THD of the slave applier is dissociated from a transaction object in engine - that continues to exist there. - - @param THD current thread - @return the value of is_error() -*/ - -bool applier_reset_xa_trans(THD *thd) -{ - thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG); - thd->server_status&= - ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY); - DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS")); - //TODO: review - //xid_cache_delete(thd, xid_s); - //if (xid_cache_insert(&xid_s->xid_cache_element->xid, XA_PREPARED, xid_s->is_binlogged)) - // return true; - thd->transaction.xid_state.xid_cache_element->acquired_to_recovered(); - thd->transaction.xid_state.xid_cache_element= 0; - - attach_native_trx(thd); - thd->transaction.cleanup(); - //TODO: thd->transaction.xid_state.xid_cache_element->xa_state= XA_NOTR; - thd->mdl_context.release_transactional_locks(); - - return thd->is_error(); -#ifdef p7974 - Transaction_ctx *trn_ctx= thd->get_transaction(); - XID_STATE *xid_state= trn_ctx->xid_state(); - /* - In the following the server transaction state gets reset for - a slave applier thread similarly to xa_commit logics - except commit does not run. - */ - thd->variables.option_bits&= ~OPTION_BEGIN; - trn_ctx->reset_unsafe_rollback_flags(Transaction_ctx::STMT); - thd->server_status&= ~SERVER_STATUS_IN_TRANS; - /* Server transaction ctx is detached from THD */ - transaction_cache_detach(trn_ctx); - xid_state->reset(); - /* - The current engine transactions is detached from THD, and - previously saved is restored. - */ - attach_native_trx(thd); - trn_ctx->set_ha_trx_info(Transaction_ctx::SESSION, NULL); - trn_ctx->set_no_2pc(Transaction_ctx::SESSION, false); - trn_ctx->cleanup(); -#ifdef HAVE_PSI_TRANSACTION_INTERFACE - MYSQL_COMMIT_TRANSACTION(thd->m_transaction_psi); - thd->m_transaction_psi= NULL; -#endif - -#endif /*p7974*/ - return thd->is_error(); -} - - -/** - The function detaches existing storage engines transaction - context from thd. Backup area to save it is provided to low level - storage engine function. - - is invoked by plugin_foreach() after - trans_xa_start() for each storage engine. - - @param[in,out] thd Thread context - @param plugin Reference to handlerton - - @return FALSE on success, TRUE otherwise. -*/ - -my_bool detach_native_trx(THD *thd, plugin_ref plugin, void *unused) -{ - handlerton *hton= plugin_hton(plugin); - if (hton->replace_native_transaction_in_thd) - hton->replace_native_transaction_in_thd(thd, NULL, - thd_ha_data_backup(thd, hton)); - - return FALSE; - -} - -/** - The function restores previously saved storage engine transaction context. - - @param thd Thread context -*/ -void attach_native_trx(THD *thd) -{ - Ha_trx_info *ha_info= thd->transaction.all.ha_list; - Ha_trx_info *ha_info_next; - - if (ha_info) - { - for (; ha_info; ha_info= ha_info_next) - { - handlerton *hton= ha_info->ht(); - if (hton->replace_native_transaction_in_thd) - { - /* restore the saved original engine transaction's link with thd */ - void **trx_backup= thd_ha_data_backup(thd, hton); - - hton-> - replace_native_transaction_in_thd(thd, *trx_backup, NULL); - *trx_backup= NULL; - } - ha_info_next= ha_info->next(); - ha_info->reset(); - } - } - thd->transaction.all.ha_list= 0; - thd->transaction.all.no_2pc= 0; -} diff --git a/sql/transaction.h b/sql/transaction.h index 5eaa2b00027..eaf2a4da919 100644 --- a/sql/transaction.h +++ b/sql/transaction.h @@ -40,5 +40,4 @@ bool trans_rollback_to_savepoint(THD *thd, LEX_CSTRING name); bool trans_release_savepoint(THD *thd, LEX_CSTRING name); void trans_reset_one_shot_chistics(THD *thd); - #endif /* TRANSACTION_H */ diff --git a/sql/xa.cc b/sql/xa.cc index 2aa798fae23..dbd0fc272e4 100644 --- a/sql/xa.cc +++ b/sql/xa.cc @@ -286,7 +286,7 @@ static bool xa_trans_force_rollback(THD *thd) bool trans_xa_start(THD *thd) { DBUG_ENTER("trans_xa_start"); - + // TODO/FIXME: s/if is_explicit_XA()/assert/ if (thd->transaction.xid_state.is_explicit_XA() && thd->transaction.xid_state.xid_cache_element->xa_state == XA_IDLE && thd->lex->xa_opt == XA_RESUME) @@ -314,6 +314,17 @@ bool trans_xa_start(THD *thd) trans_rollback(thd); DBUG_RETURN(true); } + + if (thd->variables.pseudo_slave_mode || thd->slave_thread) + { + /* + In case of slave thread applier or processing binlog by client, + detach the "native" thd's trx in favor of dynamically created. + */ + plugin_foreach(thd, detach_native_trx, + MYSQL_STORAGE_ENGINE_PLUGIN, NULL); + } + DBUG_RETURN(FALSE); } @@ -361,6 +372,8 @@ bool trans_xa_end(THD *thd) bool trans_xa_prepare(THD *thd) { + int res= 1; + DBUG_ENTER("trans_xa_prepare"); if (!thd->transaction.xid_state.is_explicit_XA() || @@ -368,16 +381,41 @@ bool trans_xa_prepare(THD *thd) thd->transaction.xid_state.er_xaer_rmfail(); else if (!thd->transaction.xid_state.xid_cache_element->xid.eq(thd->lex->xid)) my_error(ER_XAER_NOTA, MYF(0)); - else if (ha_prepare(thd)) + else { - xid_cache_delete(thd, &thd->transaction.xid_state); - my_error(ER_XA_RBROLLBACK, MYF(0)); + /* + Acquire metadata lock which will ensure that COMMIT is blocked + by active FLUSH TABLES WITH READ LOCK (and vice versa COMMIT in + progress blocks FTWRL). + + We allow FLUSHer to COMMIT; we assume FLUSHer knows what it does. + */ + MDL_request mdl_request; + mdl_request.init(MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT, + MDL_STATEMENT); + if (thd->mdl_context.acquire_lock(&mdl_request, + thd->variables.lock_wait_timeout) || + ha_prepare(thd)) + { + if (!mdl_request.ticket) + ha_rollback_trans(thd, TRUE); + thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG); + thd->transaction.all.reset(); + thd->server_status&= + ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY); + xid_cache_delete(thd, &thd->transaction.xid_state); + my_error(ER_XA_RBROLLBACK, MYF(0)); + } + else + { + res= 0; + thd->transaction.xid_state.xid_cache_element->xa_state= XA_PREPARED; + if (thd->variables.pseudo_slave_mode) + res= applier_reset_xa_trans(thd); + } } - else - thd->transaction.xid_state.xid_cache_element->xa_state= XA_PREPARED; - DBUG_RETURN(thd->is_error() || - thd->transaction.xid_state.xid_cache_element->xa_state != XA_PREPARED); + DBUG_RETURN(res); } @@ -392,11 +430,13 @@ bool trans_xa_prepare(THD *thd) bool trans_xa_commit(THD *thd) { - bool res= TRUE; + bool res= true; + XID_STATE &xid_state= thd->transaction.xid_state; + DBUG_ENTER("trans_xa_commit"); - if (!thd->transaction.xid_state.is_explicit_XA() || - !thd->transaction.xid_state.xid_cache_element->xid.eq(thd->lex->xid)) + if (!xid_state.is_explicit_XA() || + !xid_state.xid_cache_element->xid.eq(thd->lex->xid)) { if (thd->fix_xid_hash_pins()) { @@ -406,28 +446,62 @@ bool trans_xa_commit(THD *thd) if (auto xs= xid_cache_search(thd, thd->lex->xid)) { - res= xa_trans_rolled_back(xs); - ha_commit_or_rollback_by_xid(thd->lex->xid, !res); - xid_cache_delete(thd, xs); + if (thd->in_multi_stmt_transaction_mode()) + { + xid_state.er_xaer_rmfail(); + } + else + { + res= xa_trans_rolled_back(xs); + /* + Acquire metadata lock which will ensure that COMMIT is blocked + by active FLUSH TABLES WITH READ LOCK (and vice versa COMMIT in + progress blocks FTWRL). + + We allow FLUSHer to COMMIT; we assume FLUSHer knows what it does. + */ + MDL_request mdl_request; + mdl_request.init(MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT, + MDL_STATEMENT); + if (thd->mdl_context.acquire_lock(&mdl_request, + thd->variables.lock_wait_timeout)) + { + /* + We can't rollback an XA transaction on lock failure due to + Innodb redo log and bin log update is involved in rollback. + Return error to user for a retry. + */ + xid_state.er_xaer_rmfail(); + DBUG_RETURN(res); + } + DBUG_ASSERT(!xid_state.xid_cache_element); + + xid_state.xid_cache_element= xs; + ha_commit_or_rollback_by_xid(thd->lex->xid, !res); + xid_state.xid_cache_element= 0; + + res= res || thd->is_error(); + xid_cache_delete(thd, xs); + } } else my_error(ER_XAER_NOTA, MYF(0)); DBUG_RETURN(res); } - if (xa_trans_rolled_back(thd->transaction.xid_state.xid_cache_element)) + if (xa_trans_rolled_back(xid_state.xid_cache_element)) { xa_trans_force_rollback(thd); DBUG_RETURN(thd->is_error()); } - else if (thd->transaction.xid_state.xid_cache_element->xa_state == XA_IDLE && + else if (xid_state.xid_cache_element->xa_state == XA_IDLE && thd->lex->xa_opt == XA_ONE_PHASE) { int r= ha_commit_trans(thd, TRUE); if ((res= MY_TEST(r))) my_error(r == 1 ? ER_XA_RBROLLBACK : ER_XAER_RMERR, MYF(0)); } - else if (thd->transaction.xid_state.xid_cache_element->xa_state == XA_PREPARED && + else if (xid_state.xid_cache_element->xa_state == XA_PREPARED && thd->lex->xa_opt == XA_NONE) { MDL_request mdl_request; @@ -440,26 +514,40 @@ bool trans_xa_commit(THD *thd) We allow FLUSHer to COMMIT; we assume FLUSHer knows what it does. */ mdl_request.init(MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT, - MDL_TRANSACTION); + MDL_STATEMENT); if (thd->mdl_context.acquire_lock(&mdl_request, thd->variables.lock_wait_timeout)) { - ha_rollback_trans(thd, TRUE); + /* + We can't rollback an XA transaction on lock failure due to + Innodb redo log and bin log update is involved in rollback. + Return error to user for a retry. + */ my_error(ER_XAER_RMERR, MYF(0)); + DBUG_RETURN(true); } else { DEBUG_SYNC(thd, "trans_xa_commit_after_acquire_commit_lock"); - res= MY_TEST(ha_commit_one_phase(thd, 1)); - if (res) + if((WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) && + xid_state.is_binlogged()) + { + res= thd->binlog_query(THD::THD::STMT_QUERY_TYPE, + thd->query(), thd->query_length(), + FALSE, FALSE, FALSE, 0); + } + else + res= 0; + + if (res || (res= MY_TEST(ha_commit_one_phase(thd, 1)))) my_error(ER_XAER_RMERR, MYF(0)); } } else { - thd->transaction.xid_state.er_xaer_rmfail(); + xid_state.er_xaer_rmfail(); DBUG_RETURN(TRUE); } @@ -468,7 +556,7 @@ bool trans_xa_commit(THD *thd) thd->server_status&= ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY); DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS")); - xid_cache_delete(thd, &thd->transaction.xid_state); + xid_cache_delete(thd, &xid_state); trans_track_end_trx(thd); @@ -487,10 +575,13 @@ bool trans_xa_commit(THD *thd) bool trans_xa_rollback(THD *thd) { + bool res= false; + XID_STATE &xid_state= thd->transaction.xid_state; + DBUG_ENTER("trans_xa_rollback"); - if (!thd->transaction.xid_state.is_explicit_XA() || - !thd->transaction.xid_state.xid_cache_element->xid.eq(thd->lex->xid)) + if (!xid_state.is_explicit_XA() || + !xid_state.xid_cache_element->xid.eq(thd->lex->xid)) { if (thd->fix_xid_hash_pins()) { @@ -500,8 +591,26 @@ bool trans_xa_rollback(THD *thd) if (auto xs= xid_cache_search(thd, thd->lex->xid)) { + MDL_request mdl_request; + mdl_request.init(MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT, + MDL_STATEMENT); + if (thd->mdl_context.acquire_lock(&mdl_request, + thd->variables.lock_wait_timeout)) + { + /* + We can't rollback an XA transaction on lock failure due to + Innodb redo log and bin log update is involved in rollback. + Return error to user for a retry. + */ + xid_state.er_xaer_rmfail(); + DBUG_RETURN(true); + } xa_trans_rolled_back(xs); + DBUG_ASSERT(!xid_state.xid_cache_element); + + xid_state.xid_cache_element= xs; ha_commit_or_rollback_by_xid(thd->lex->xid, 0); + xid_state.xid_cache_element= 0; xid_cache_delete(thd, xs); } else @@ -509,19 +618,28 @@ bool trans_xa_rollback(THD *thd) DBUG_RETURN(thd->get_stmt_da()->is_error()); } - if (thd->transaction.xid_state.xid_cache_element->xa_state == XA_ACTIVE) + if (xid_state.xid_cache_element->xa_state == XA_ACTIVE) { - thd->transaction.xid_state.er_xaer_rmfail(); + xid_state.er_xaer_rmfail(); DBUG_RETURN(TRUE); } - DBUG_RETURN(xa_trans_force_rollback(thd)); + + if (xid_state.xid_cache_element->xa_state == XA_PREPARED && + xid_state.is_binlogged() && + (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())) + { + res= thd->binlog_query(THD::THD::STMT_QUERY_TYPE, + thd->query(), thd->query_length(), + FALSE, FALSE, FALSE, 0); + } + DBUG_RETURN(res != 0 || xa_trans_force_rollback(thd)); } bool trans_xa_detach(THD *thd) { DBUG_ASSERT(thd->transaction.xid_state.is_explicit_XA()); -#if 1 +#if 0 return xa_trans_force_rollback(thd); #else if (thd->transaction.xid_state.xid_cache_element->xa_state != XA_PREPARED) @@ -736,3 +854,123 @@ bool mysql_xa_recover(THD *thd) my_eof(thd); DBUG_RETURN(0); } + + +/** + This is a specific to "slave" applier collection of standard cleanup + actions to reset XA transaction states at the end of XA prepare rather than + to do it at the transaction commit, see @c ha_commit_one_phase. + THD of the slave applier is dissociated from a transaction object in engine + that continues to exist there. + + @param THD current thread + @return the value of is_error() +*/ + +bool applier_reset_xa_trans(THD *thd) +{ + thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG); + thd->server_status&= + ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY); + DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS")); + //TODO: review + //xid_cache_delete(thd, xid_s); + //if (xid_cache_insert(&xid_s->xid_cache_element->xid, XA_PREPARED, xid_s->is_binlogged)) + // return true; + thd->transaction.xid_state.xid_cache_element->acquired_to_recovered(); + thd->transaction.xid_state.xid_cache_element= 0; + + attach_native_trx(thd); + thd->transaction.cleanup(); + //TODO: thd->transaction.xid_state.xid_cache_element->xa_state= XA_NOTR; + thd->mdl_context.release_transactional_locks(); + + return thd->is_error(); +#ifdef p7974 + Transaction_ctx *trn_ctx= thd->get_transaction(); + XID_STATE *xid_state= trn_ctx->xid_state(); + /* + In the following the server transaction state gets reset for + a slave applier thread similarly to xa_commit logics + except commit does not run. + */ + thd->variables.option_bits&= ~OPTION_BEGIN; + trn_ctx->reset_unsafe_rollback_flags(Transaction_ctx::STMT); + thd->server_status&= ~SERVER_STATUS_IN_TRANS; + /* Server transaction ctx is detached from THD */ + transaction_cache_detach(trn_ctx); + xid_state->reset(); + /* + The current engine transactions is detached from THD, and + previously saved is restored. + */ + attach_native_trx(thd); + trn_ctx->set_ha_trx_info(Transaction_ctx::SESSION, NULL); + trn_ctx->set_no_2pc(Transaction_ctx::SESSION, false); + trn_ctx->cleanup(); +#ifdef HAVE_PSI_TRANSACTION_INTERFACE + MYSQL_COMMIT_TRANSACTION(thd->m_transaction_psi); + thd->m_transaction_psi= NULL; +#endif + +#endif /*p7974*/ + return thd->is_error(); +} + + +/** + The function detaches existing storage engines transaction + context from thd. Backup area to save it is provided to low level + storage engine function. + + is invoked by plugin_foreach() after + trans_xa_start() for each storage engine. + + @param[in,out] thd Thread context + @param plugin Reference to handlerton + + @return FALSE on success, TRUE otherwise. +*/ + +my_bool detach_native_trx(THD *thd, plugin_ref plugin, void *unused) +{ + handlerton *hton= plugin_hton(plugin); + if (hton->replace_native_transaction_in_thd) + hton->replace_native_transaction_in_thd(thd, NULL, + thd_ha_data_backup(thd, hton)); + + return FALSE; + +} + +/** + The function restores previously saved storage engine transaction context. + + @param thd Thread context +*/ +void attach_native_trx(THD *thd) +{ + Ha_trx_info *ha_info= thd->transaction.all.ha_list; + Ha_trx_info *ha_info_next; + + if (ha_info) + { + for (; ha_info; ha_info= ha_info_next) + { + handlerton *hton= ha_info->ht(); + if (hton->replace_native_transaction_in_thd) + { + /* restore the saved original engine transaction's link with thd */ + void **trx_backup= thd_ha_data_backup(thd, hton); + + hton-> + replace_native_transaction_in_thd(thd, *trx_backup, NULL); + *trx_backup= NULL; + } + ha_info_next= ha_info->next(); + ha_info->reset(); + } + } + thd->transaction.all.ha_list= 0; + thd->transaction.all.no_2pc= 0; +} @@ -1,3 +1,5 @@ +#ifndef XA_INCLUDED +#define XA_INCLUDED /* Copyright (c) 2000, 2016, Oracle and/or its affiliates. Copyright (c) 2009, 2019, MariaDB Corporation. @@ -67,6 +69,7 @@ class XID_cache_element public: static const int32 ACQUIRED= 1 << 30; static const int32 RECOVERED= 1 << 29; + static const int32 BINLOGGED= 1 << 28; /* Error reported by the Resource Manager (RM) to the Transaction Manager. */ uint rm_error; enum xa_states xa_state; @@ -160,7 +163,11 @@ struct XID_STATE { The recovered transaction after server restart sets it to TRUE always. That can cause inconsistencies (shoud be fixed?). */ - bool is_binlogged; + bool is_binlogged() + { + return xid_cache_element && + xid_cache_element->is_set(XID_cache_element::BINLOGGED); + } bool check_has_uncommitted_xa() const; bool is_explicit_XA() const { return xid_cache_element != 0; } @@ -171,12 +178,19 @@ struct XID_STATE { { //TODO: what's an equivalent //xid.null(); - is_binlogged= false; + //is_binlogged= false; + unset_binlogged(); } void set_binlogged() - { is_binlogged= true; } + { + if (xid_cache_element) + xid_cache_element->set(XID_cache_element::BINLOGGED); + } void unset_binlogged() - { is_binlogged= false; } + { + if (xid_cache_element) + xid_cache_element->set(~XID_cache_element::BINLOGGED); + } }; void xid_cache_init(void); @@ -193,3 +207,7 @@ bool trans_xa_rollback(THD *thd); bool trans_xa_detach(THD *thd); bool mysql_xa_recover(THD *thd); bool applier_reset_xa_trans(THD *thd); +void attach_native_trx(THD *thd); +my_bool detach_native_trx(THD *thd, plugin_ref plugin, void *unused); + +#endif /* XA_INCLUDED */ |
