summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorSachin <sachinsetia1001@gmail.com>2021-01-29 11:59:14 +0000
committerAndrei <andrei.elkin@mariadb.com>2022-01-27 21:25:07 +0200
commit0c5d1342ae6b5ab3256848be7a83e5c3b1f21566 (patch)
treee44e3d801abfcd6e269c099d6d05b1716b399ac6 /sql
parentc64e507fade9f79de3331bef1c5e04da975737c5 (diff)
downloadmariadb-git-0c5d1342ae6b5ab3256848be7a83e5c3b1f21566.tar.gz
MDEV-11675 Lag Free Alter On Slave
This commit implements two phase binloggable ALTER. When a new @@session.binlog_alter_two_phase = YES ALTER query gets logged in two parts, the START ALTER and the COMMIT or ROLLBACK ALTER. START Alter is written in binlog as soon as necessary locks have been acquired for the table. The timing is such that any concurrent DML:s that update the same table are either committed, thus logged into binary log having done work on the old version of the table, or will be queued for execution on its new version. The "COMPLETE" COMMIT or ROLLBACK ALTER are written at the very point of a normal "single-piece" ALTER that is after the most of the query work is done. When its result is positive COMMIT ALTER is written, otherwise ROLLBACK ALTER is written with specific error happened after START ALTER phase. Replication of two-phase binloggable ALTER is cross-version safe. Specifically the OLD slave merely does not recognized the start alter part, still being able to process and memorize its gtid. Two phase logged ALTER is read from binlog by mysqlbinlog to produce BINLOG 'string', where 'string' contains base64 encoded Query_log_event containing either the start part of ALTER, or a completion part. The Query details can be displayed with `-v` flag, similarly to ROW format events. Notice, mysqlbinlog output containing parts of two-phase binloggable ALTER is processable correctly only by binlog_alter_two_phase server. @@log_warnings > 2 can reveal details of binlogging and slave side processing of the ALTER parts. The current commit also carries fixes to the following list of reported bugs: MDEV-27511, MDEV-27471, MDEV-27349, MDEV-27628, MDEV-27528. Thanks to all people involved into early discussion of the feature including Kristian Nielsen, those who helped to design, implement and test: Sergei Golubchik, Andrei Elkin who took the burden of the implemenation completion, Sujatha Sivakumar, Brandon Nesterenko, Alice Sherepa, Ramesh Sivaraman, Jan Lindstrom.
Diffstat (limited to 'sql')
-rw-r--r--sql/log.cc162
-rw-r--r--sql/log.h3
-rw-r--r--sql/log_event.cc42
-rw-r--r--sql/log_event.h43
-rw-r--r--sql/log_event_client.cc62
-rw-r--r--sql/log_event_server.cc431
-rw-r--r--sql/mdl.cc8
-rw-r--r--sql/mysqld.cc4
-rw-r--r--sql/mysqld.h2
-rw-r--r--sql/rpl_gtid.cc35
-rw-r--r--sql/rpl_mi.cc15
-rw-r--r--sql/rpl_mi.h21
-rw-r--r--sql/rpl_parallel.cc263
-rw-r--r--sql/rpl_parallel.h26
-rw-r--r--sql/rpl_rli.cc12
-rw-r--r--sql/rpl_rli.h52
-rw-r--r--sql/share/errmsg-utf8.txt2
-rw-r--r--sql/slave.cc125
-rw-r--r--sql/sql_alter.cc3
-rw-r--r--sql/sql_binlog.cc78
-rw-r--r--sql/sql_class.cc10
-rw-r--r--sql/sql_class.h40
-rw-r--r--sql/sql_table.cc394
-rw-r--r--sql/sql_table.h3
-rw-r--r--sql/sys_vars.cc9
-rw-r--r--sql/temporary_tables.cc19
26 files changed, 1716 insertions, 148 deletions
diff --git a/sql/log.cc b/sql/log.cc
index da7b2dbda71..8a7ac7890d3 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -57,6 +57,7 @@
#include "semisync_master.h"
#include "sp_rcontext.h"
#include "sp_head.h"
+#include "sql_table.h"
#include "wsrep_mysqld.h"
#ifdef WITH_WSREP
@@ -576,13 +577,137 @@ public:
ulong binlog_id;
/* Set if we get an error during commit that must be returned from unlog(). */
bool delayed_error;
-
+ //Will be reset when gtid is written into binlog
+ uchar gtid_flags3;
+ decltype (rpl_gtid::seq_no) sa_seq_no;
private:
binlog_cache_mngr& operator=(const binlog_cache_mngr& info);
binlog_cache_mngr(const binlog_cache_mngr& info);
};
+/**
+ The function handles the first phase of two-phase binlogged ALTER.
+ On master binlogs START ALTER when that is configured to do so.
+ On slave START ALTER gets binlogged and its gtid committed into gtid slave pos
+ table.
+
+ @param thd Thread handle.
+ @param start_alter_id Start Alter identifier or zero.
+ @param[out]
+ partial_alter Is set to true when Start Alter phase is completed.
+ @param if_exists True indicates the binary logging of the query
+ should be done with "if exists" option.
+
+ @return false on success, true on failure
+ @return @c partial_alter set to @c true when START ALTER phase
+ has been completed
+*/
+bool write_bin_log_start_alter(THD *thd, bool& partial_alter,
+ uint64 start_alter_id, bool if_exists)
+{
+#if defined(HAVE_REPLICATION)
+ if (thd->variables.option_bits & OPTION_BIN_TMP_LOG_OFF)
+ return false;
+
+ if (start_alter_id)
+ {
+ if (thd->rgi_slave->get_finish_event_group_called())
+ return false; // can get here through retrying
+
+ DBUG_EXECUTE_IF("at_write_start_alter", {
+ debug_sync_set_action(thd,
+ STRING_WITH_LEN("now wait_for alter_cont"));
+ });
+
+ Master_info *mi= thd->rgi_slave->rli->mi;
+ start_alter_info *info= thd->rgi_slave->sa_info;
+ bool is_shutdown= false;
+
+ info->sa_seq_no= start_alter_id;
+ info->domain_id= thd->variables.gtid_domain_id;
+ mysql_mutex_lock(&mi->start_alter_list_lock);
+ // possible stop-slave's marking of the whole alter state list is checked
+ is_shutdown= mi->is_shutdown;
+ mi->start_alter_list.push_back(info, &mi->mem_root);
+ mysql_mutex_unlock(&mi->start_alter_list_lock);
+ info->state= start_alter_state::REGISTERED;
+ thd->rgi_slave->commit_orderer.wait_for_prior_commit(thd);
+ thd->rgi_slave->start_alter_ev->update_pos(thd->rgi_slave);
+ if (mysql_bin_log.is_open())
+ {
+ Write_log_with_flags wlwf (thd, Gtid_log_event::FL_START_ALTER_E1);
+ if (write_bin_log(thd, true, thd->query(), thd->query_length()))
+ {
+ DBUG_ASSERT(thd->is_error());
+ return true;
+ }
+ }
+ thd->rgi_slave->mark_start_commit();
+ thd->wakeup_subsequent_commits(0);
+ thd->rgi_slave->finish_start_alter_event_group();
+
+ if (is_shutdown)
+ {
+ /* SA exists abruptly and will notify any CA|RA waiter. */
+ mysql_mutex_lock(&mi->start_alter_lock);
+ /*
+ If there is (or will be) unlikely any CA it will execute
+ the whole query before to stop itself.
+ */
+ info->direct_commit_alter= true;
+ info->state= start_alter_state::ROLLBACK_ALTER;
+ mysql_mutex_unlock(&mi->start_alter_lock);
+
+ return true;
+ }
+
+ return false;
+ }
+#endif
+
+#ifndef WITH_WSREP
+ rpl_group_info *rgi= thd->rgi_slave ? thd->rgi_slave : thd->rgi_fake;
+#else
+ rpl_group_info *rgi= thd->slave_thread ? thd->rgi_slave :
+ WSREP(thd) ? (thd->wsrep_rgi ? thd->wsrep_rgi : thd->rgi_fake) :
+ thd->rgi_fake;
+#endif
+
+ if (!rgi && thd->variables.binlog_alter_two_phase)
+ {
+ /* slave applier can handle here only regular ALTER */
+ DBUG_ASSERT(!rgi || !(rgi->gtid_ev_flags_extra &
+ (Gtid_log_event::FL_START_ALTER_E1 |
+ Gtid_log_event::FL_COMMIT_ALTER_E1 |
+ Gtid_log_event::FL_ROLLBACK_ALTER_E1)));
+
+ /*
+ After logging binlog state stays flagged with SA flags3 an seq_no.
+ The state is not reset after write_bin_log() is done which is
+ deferred for the second logging phase.
+ */
+ thd->set_binlog_flags_for_alter(Gtid_log_event::FL_START_ALTER_E1);
+ if(write_bin_log_with_if_exists(thd, false, false, if_exists, false))
+ {
+ DBUG_ASSERT(thd->is_error());
+
+ thd->set_binlog_flags_for_alter(0);
+ return true;
+ }
+ partial_alter= true;
+ }
+ else if (rgi && rgi->direct_commit_alter)
+ {
+ DBUG_ASSERT(rgi->gtid_ev_flags_extra &
+ Gtid_log_event::FL_COMMIT_ALTER_E1);
+
+ partial_alter= true;
+ }
+
+ return false;
+}
+
bool LOGGER::is_log_table_enabled(uint log_table_type)
{
switch (log_table_type) {
@@ -5752,6 +5877,39 @@ binlog_cache_mngr *THD::binlog_setup_trx_data()
DBUG_RETURN(cache_mngr);
}
+
+/*
+ Two phase logged ALTER getter and setter methods.
+*/
+uchar THD::get_binlog_flags_for_alter()
+{
+ return mysql_bin_log.is_open() ? binlog_setup_trx_data()->gtid_flags3 : 0;
+}
+
+void THD::set_binlog_flags_for_alter(uchar flags)
+{
+ if (mysql_bin_log.is_open())
+ {
+ // SA must find the flag set empty
+ DBUG_ASSERT(flags != Gtid_log_event::FL_START_ALTER_E1 ||
+ binlog_setup_trx_data()->gtid_flags3 == 0);
+
+ binlog_setup_trx_data()->gtid_flags3= flags;
+ }
+}
+
+uint64 THD::get_binlog_start_alter_seq_no()
+{
+ return mysql_bin_log.is_open() ? binlog_setup_trx_data()->sa_seq_no : 0;
+}
+
+void THD::set_binlog_start_alter_seq_no(uint64 s_no)
+{
+ if (mysql_bin_log.is_open())
+ binlog_setup_trx_data()->sa_seq_no= s_no;
+}
+
+
/*
Function to start a statement and optionally a transaction for the
binary log.
@@ -6264,6 +6422,8 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
DBUG_RETURN(true);
thd->set_last_commit_gtid(gtid);
+ if (thd->get_binlog_flags_for_alter() & Gtid_log_event::FL_START_ALTER_E1)
+ thd->set_binlog_start_alter_seq_no(gtid.seq_no);
Gtid_log_event gtid_event(thd, seq_no, domain_id, standalone,
LOG_EVENT_SUPPRESS_USE_F, is_transactional,
diff --git a/sql/log.h b/sql/log.h
index b55ed0f31f6..b4c82e0aed4 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -1266,5 +1266,8 @@ get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list);
int binlog_commit(THD *thd, bool all, bool is_ro_1pc= false);
int binlog_commit_by_xid(handlerton *hton, XID *xid);
int binlog_rollback_by_xid(handlerton *hton, XID *xid);
+bool write_bin_log_start_alter(THD *thd, bool& partial_alter,
+ uint64 start_alter_id, bool log_if_exists);
+
#endif /* LOG_H */
diff --git a/sql/log_event.cc b/sql/log_event.cc
index afca79b008a..d3d44e8dc30 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -1375,6 +1375,7 @@ code_name(int code)
case Q_MASTER_DATA_WRITTEN_CODE: return "Q_MASTER_DATA_WRITTEN_CODE";
case Q_HRNOW: return "Q_HRNOW";
case Q_XID: return "XID";
+ case Q_GTID_FLAGS3: return "Q_GTID_FLAGS3";
}
sprintf(buf, "CODE#%d", code);
return buf;
@@ -1423,7 +1424,8 @@ Query_log_event::Query_log_event(const uchar *buf, uint event_len,
flags2_inited(0), sql_mode_inited(0), charset_inited(0), flags2(0),
auto_increment_increment(1), auto_increment_offset(1),
time_zone_len(0), lc_time_names_number(0), charset_database_number(0),
- table_map_for_update(0), xid(0), master_data_written(0)
+ table_map_for_update(0), xid(0), master_data_written(0), gtid_flags_extra(0),
+ sa_seq_no(0)
{
ulong data_len;
uint32 tmp;
@@ -1439,28 +1441,28 @@ Query_log_event::Query_log_event(const uchar *buf, uint event_len,
post_header_len= description_event->post_header_len[event_type-1];
DBUG_PRINT("info",("event_len: %u common_header_len: %d post_header_len: %d",
event_len, common_header_len, post_header_len));
-
+
/*
We test if the event's length is sensible, and if so we compute data_len.
We cannot rely on QUERY_HEADER_LEN here as it would not be format-tolerant.
We use QUERY_HEADER_MINIMAL_LEN which is the same for 3.23, 4.0 & 5.0.
*/
if (event_len < (uint)(common_header_len + post_header_len))
- DBUG_VOID_RETURN;
+ DBUG_VOID_RETURN;
data_len= event_len - (common_header_len + post_header_len);
buf+= common_header_len;
-
- thread_id= slave_proxy_id= uint4korr(buf + Q_THREAD_ID_OFFSET);
- exec_time= uint4korr(buf + Q_EXEC_TIME_OFFSET);
- db_len= buf[Q_DB_LEN_OFFSET]; // TODO: add a check of all *_len vars
- error_code= uint2korr(buf + Q_ERR_CODE_OFFSET);
+
+ thread_id = slave_proxy_id = uint4korr(buf + Q_THREAD_ID_OFFSET);
+ exec_time = uint4korr(buf + Q_EXEC_TIME_OFFSET);
+ db_len = (uchar)buf[Q_DB_LEN_OFFSET]; // TODO: add a check of all *_len vars
+ error_code = uint2korr(buf + Q_ERR_CODE_OFFSET);
/*
5.0 format starts here.
Depending on the format, we may or not have affected/warnings etc
The remnent post-header to be parsed has length:
*/
- tmp= post_header_len - QUERY_HEADER_MINIMAL_LEN;
+ tmp= post_header_len - QUERY_HEADER_MINIMAL_LEN;
if (tmp)
{
status_vars_len= uint2korr(buf + Q_STATUS_VARS_LEN_OFFSET);
@@ -1607,13 +1609,26 @@ Query_log_event::Query_log_event(const uchar *buf, uint event_len,
pos+= 3;
break;
}
- case Q_XID:
+ case Q_XID:
{
CHECK_SPACE(pos, end, 8);
xid= uint8korr(pos);
pos+= 8;
break;
}
+ case Q_GTID_FLAGS3:
+ {
+ CHECK_SPACE(pos, end, 1);
+ gtid_flags_extra= *pos++;
+ if (gtid_flags_extra & (Gtid_log_event::FL_COMMIT_ALTER_E1 |
+ Gtid_log_event::FL_ROLLBACK_ALTER_E1))
+ {
+ CHECK_SPACE(pos, end, 8);
+ sa_seq_no = uint8korr(pos);
+ pos+= 8;
+ }
+ break;
+ }
default:
/* That's why you must write status vars in growing order of code */
DBUG_PRINT("info",("Query_log_event has unknown status vars (first has\
@@ -2609,7 +2624,7 @@ Gtid_log_event::Gtid_log_event(const uchar *buf, uint event_len,
extra engines flags presence is identifed by non-zero byte value
at this point
*/
- if (flags_extra & FL_EXTRA_MULTI_ENGINE)
+ if (flags_extra & FL_EXTRA_MULTI_ENGINE_E1)
{
DBUG_ASSERT(static_cast<uint>(buf - buf_0) < event_len);
@@ -2617,6 +2632,11 @@ Gtid_log_event::Gtid_log_event(const uchar *buf, uint event_len,
DBUG_ASSERT(extra_engines > 0);
}
+ if (flags_extra & (FL_COMMIT_ALTER_E1 | FL_ROLLBACK_ALTER_E1))
+ {
+ sa_seq_no= uint8korr(buf);
+ buf+= 8;
+ }
}
/*
the strict '<' part of the assert corresponds to extra zero-padded
diff --git a/sql/log_event.h b/sql/log_event.h
index db77be2d99b..db80ca841a2 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -240,7 +240,8 @@ class String;
1 + 8 /* type, table_map_for_update */ + \
1 + 4 /* type, master_data_written */ + \
1 + 3 /* type, sec_part of NOW() */ + \
- 1 + 16 + 1 + 60/* type, user_len, user, host_len, host */)
+ 1 + 16 + 1 + 60/* type, user_len, user, host_len, host */ + \
+ 1 + 2 + 8 /* type, flags3, seq_no */)
#define MAX_LOG_EVENT_HEADER ( /* in order of Query_log_event::write */ \
LOG_EVENT_HEADER_LEN + /* write_header */ \
QUERY_HEADER_LEN + /* write_data */ \
@@ -321,6 +322,7 @@ class String;
#define Q_HRNOW 128
#define Q_XID 129
+#define Q_GTID_FLAGS3 130
/* Intvar event post-header */
/* Intvar event data */
@@ -2196,6 +2198,12 @@ public:
Q_MASTER_DATA_WRITTEN_CODE to the slave's server binlog.
*/
uint32 master_data_written;
+ /*
+ A copy of Gtid event's extra flags that is relevant for two-phase
+ logged ALTER.
+ */
+ uchar gtid_flags_extra;
+ decltype(rpl_gtid::seq_no) sa_seq_no; /* data part for CA/RA flags */
#ifdef MYSQL_SERVER
@@ -2207,6 +2215,7 @@ public:
#endif /* HAVE_REPLICATION */
#else
bool print_query_header(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info);
+ bool print_verbose(IO_CACHE* cache, PRINT_EVENT_INFO* print_event_info);
bool print(FILE* file, PRINT_EVENT_INFO* print_event_info);
#endif
@@ -2220,8 +2229,10 @@ public:
my_free(data_buf);
}
Log_event_type get_type_code() { return QUERY_EVENT; }
- static int dummy_event(String *packet, ulong ev_offset, enum enum_binlog_checksum_alg checksum_alg);
- static int begin_event(String *packet, ulong ev_offset, enum enum_binlog_checksum_alg checksum_alg);
+ static int dummy_event(String *packet, ulong ev_offset,
+ enum enum_binlog_checksum_alg checksum_alg);
+ static int begin_event(String *packet, ulong ev_offset,
+ enum enum_binlog_checksum_alg checksum_alg);
#ifdef MYSQL_SERVER
bool write();
virtual bool write_post_header_for_derived() { return FALSE; }
@@ -2247,6 +2258,9 @@ public: /* !!! Public in this patch to allow old usage */
size_t event_len,
enum enum_binlog_checksum_alg
checksum_alg);
+ int handle_split_alter_query_log_event(rpl_group_info *rgi,
+ bool &skip_error_check);
+
#endif /* HAVE_REPLICATION */
/*
If true, the event always be applied by slave SQL thread or be printed by
@@ -3654,13 +3668,19 @@ public:
uint64 seq_no;
uint64 commit_id;
uint32 domain_id;
+ uint64 sa_seq_no; // start alter identifier for CA/RA
#ifdef MYSQL_SERVER
event_xid_t xid;
#else
event_mysql_xid_t xid;
#endif
uchar flags2;
- uint flags_extra; // more flags area placed after the regular flags2's one
+ /*
+ More flags area placed after the regular flags2's area. The type
+ is declared to be in agreement with Query_log_event's member that
+ may copy the flags_extra value.
+ */
+ decltype(Query_log_event::gtid_flags_extra) flags_extra;
/*
Number of engine participants in transaction minus 1.
When zero the event does not contain that information.
@@ -3698,14 +3718,19 @@ public:
/* FL_"COMMITTED or ROLLED-BACK"_XA is set for XA transaction. */
static const uchar FL_COMPLETED_XA= 128;
- /* Flags_extra. */
-
/*
- FL_EXTRA_MULTI_ENGINE is set for event group comprising a transaction
+ flags_extra 's bit values.
+ _E1 suffix below stands for Extra to infer the extra flags,
+ their "1st" generation (more *generations* can come when necessary).
+
+ FL_EXTRA_MULTI_ENGINE_E1 is set for event group comprising a transaction
involving multiple storage engines. No flag and extra data are added
to the event when the transaction involves only one engine.
*/
- static const uchar FL_EXTRA_MULTI_ENGINE= 1;
+ static const uchar FL_EXTRA_MULTI_ENGINE_E1= 1;
+ static const uchar FL_START_ALTER_E1= 2;
+ static const uchar FL_COMMIT_ALTER_E1= 4;
+ static const uchar FL_ROLLBACK_ALTER_E1= 8;
#ifdef MYSQL_SERVER
Gtid_log_event(THD *thd_arg, uint64 seq_no, uint32 domain_id, bool standalone,
@@ -5875,4 +5900,6 @@ int row_log_event_uncompress(const Format_description_log_event
uchar* buf, ulong buf_size, bool *is_malloc,
uchar **dst, ulong *newlen);
+bool is_parallel_retry_error(rpl_group_info *rgi, int err);
+
#endif /* _log_event_h */
diff --git a/sql/log_event_client.cc b/sql/log_event_client.cc
index 66f2753d96b..9e75d3f4c4c 100644
--- a/sql/log_event_client.cc
+++ b/sql/log_event_client.cc
@@ -17,6 +17,7 @@
*/
+#include "log_event.h"
#ifndef MYSQL_CLIENT
#error MYSQL_CLIENT must be defined here
#endif
@@ -2005,6 +2006,19 @@ err:
return 1;
}
+bool Query_log_event::print_verbose(IO_CACHE* cache, PRINT_EVENT_INFO* print_event_info)
+{
+ if (my_b_printf(cache, "### ") ||
+ my_b_write(cache, (uchar *) query, q_len) ||
+ my_b_printf(cache, "\n"))
+ {
+ goto err;
+ }
+ return 0;
+
+err:
+ return 1;
+}
bool Query_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
{
@@ -2020,9 +2034,42 @@ bool Query_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
goto err;
if (!is_flashback)
{
- if (my_b_write(&cache, (uchar*) query, q_len) ||
- my_b_printf(&cache, "\n%s\n", print_event_info->delimiter))
- goto err;
+ if (gtid_flags_extra & (Gtid_log_event::FL_START_ALTER_E1 |
+ Gtid_log_event::FL_COMMIT_ALTER_E1 |
+ Gtid_log_event::FL_ROLLBACK_ALTER_E1))
+ {
+ bool do_print_encoded=
+ print_event_info->base64_output_mode != BASE64_OUTPUT_NEVER &&
+ print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS &&
+ !print_event_info->short_form;
+ bool comment_mode= do_print_encoded &&
+ gtid_flags_extra & (Gtid_log_event::FL_START_ALTER_E1 |
+ Gtid_log_event::FL_ROLLBACK_ALTER_E1);
+
+ if(comment_mode)
+ my_b_printf(&cache, "/*!100600 ");
+ if (do_print_encoded)
+ my_b_printf(&cache, "BINLOG '\n");
+ if (print_base64(&cache, print_event_info, do_print_encoded))
+ goto err;
+ if (do_print_encoded)
+ {
+ if(comment_mode)
+ my_b_printf(&cache, "' */%s\n", print_event_info->delimiter);
+ else
+ my_b_printf(&cache, "'%s\n", print_event_info->delimiter);
+ }
+ if (print_event_info->verbose && print_verbose(&cache, print_event_info))
+ {
+ goto err;
+ }
+ }
+ else
+ {
+ if (my_b_write(&cache, (uchar*) query, q_len) ||
+ my_b_printf(&cache, "\n%s\n", print_event_info->delimiter))
+ goto err;
+ }
}
else // is_flashback == 1
{
@@ -3856,6 +3903,15 @@ Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
if (flags2 & FL_WAITED)
if (my_b_write_string(&cache, " waited"))
goto err;
+ if (flags_extra & FL_START_ALTER_E1)
+ if (my_b_write_string(&cache, " START ALTER"))
+ goto err;
+ if (flags_extra & FL_COMMIT_ALTER_E1)
+ if (my_b_printf(&cache, " COMMIT ALTER id= %lu", sa_seq_no))
+ goto err;
+ if (flags_extra & FL_ROLLBACK_ALTER_E1)
+ if (my_b_printf(&cache, " ROLLBACK ALTER id= %lu", sa_seq_no))
+ goto err;
if (my_b_printf(&cache, "\n"))
goto err;
diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc
index cc103751b13..cf16d538744 100644
--- a/sql/log_event_server.cc
+++ b/sql/log_event_server.cc
@@ -52,6 +52,7 @@
#include "compat56.h"
#include "wsrep_mysqld.h"
#include "sql_insert.h"
+#include "sql_table.h"
#include <my_bitmap.h>
#include "rpl_utility.h"
@@ -138,7 +139,7 @@ static const char *HA_ERR(int i)
deadlocks; such errors are handled automatically by rolling back re-trying
the transactions, so should not pollute the error log.
*/
-static bool
+bool
is_parallel_retry_error(rpl_group_info *rgi, int err)
{
if (!rgi->is_parallel_exec)
@@ -1303,11 +1304,25 @@ bool Query_log_event::write()
start+= 8;
}
+ if (gtid_flags_extra)
+ {
+ *start++= Q_GTID_FLAGS3;
+ *start++= gtid_flags_extra;
+ if (gtid_flags_extra &
+ (Gtid_log_event::FL_COMMIT_ALTER_E1 |
+ Gtid_log_event::FL_ROLLBACK_ALTER_E1))
+ {
+ int8store(start, sa_seq_no);
+ start+= 8;
+ }
+ }
+
+
/*
NOTE: When adding new status vars, please don't forget to update
the MAX_SIZE_LOG_EVENT_STATUS in log_event.h and update the function
code_name() in this file.
-
+
Here there could be code like
if (command-line-option-which-says-"log_this_variable" && inited)
{
@@ -1415,7 +1430,9 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
lc_time_names_number(thd_arg->variables.lc_time_names->number),
charset_database_number(0),
table_map_for_update((ulonglong)thd_arg->table_map_for_update),
- master_data_written(0)
+ master_data_written(0),
+ gtid_flags_extra(thd_arg->get_binlog_flags_for_alter()),
+ sa_seq_no(0)
{
/* status_vars_len is set just before writing the event */
@@ -1551,11 +1568,15 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
use_cache= trx_cache= TRUE;
break;
default:
- use_cache= sqlcom_can_generate_row_events(thd);
+ use_cache= (gtid_flags_extra) ? false : sqlcom_can_generate_row_events(thd);
break;
}
}
+ if (gtid_flags_extra & (Gtid_log_event::FL_COMMIT_ALTER_E1 |
+ Gtid_log_event::FL_ROLLBACK_ALTER_E1))
+ sa_seq_no= thd_arg->get_binlog_start_alter_seq_no();
+
if (!use_cache || direct)
{
cache_type= Log_event::EVENT_NO_CACHE;
@@ -1627,6 +1648,223 @@ bool test_if_equal_repl_errors(int expected_error, int actual_error)
}
+static start_alter_info *get_new_start_alter_info(THD *thd)
+{
+ /*
+ Why on global memory ?- So that process_commit/rollback_alter should not get
+ error when spawned threads exits too early.
+ */
+ start_alter_info *info;
+ if (!(info= (start_alter_info *)my_malloc(PSI_INSTRUMENT_ME,
+ sizeof(start_alter_info), MYF(MY_WME))))
+ {
+ sql_print_error("Failed to allocate memory for ddl log free list");
+ return 0;
+ }
+ info->sa_seq_no= 0;
+ info->domain_id= 0;
+ info->direct_commit_alter= false;
+ info->state= start_alter_state::INVALID;
+ mysql_cond_init(0, &info->start_alter_cond, NULL);
+ info->error= 0;
+
+ return info;
+}
+
+
+/*
+ Perform necessary actions for two-phase-logged ALTER parts, to
+ return
+
+ 0 when the event's query proceeds normal parsing and execution
+ 1 when the event skips parsing and execution
+ -1 as error.
+*/
+int Query_log_event::handle_split_alter_query_log_event(rpl_group_info *rgi,
+ bool &skip_error_check)
+{
+ int rc= 0;
+
+ rgi->gtid_ev_flags_extra= gtid_flags_extra;
+ if (gtid_flags_extra & Gtid_log_event::FL_START_ALTER_E1)
+ {
+ //No Slave, Normal Slave, Start Alter under Worker 1 will simple binlog and exit
+ if(!rgi->rpt || rgi->reserved_start_alter_thread || WSREP(thd))
+ {
+ rc= 1;
+ /*
+ We will just write the binlog and move to next event , because COMMIT
+ Alter will take care of actual work
+ */
+ rgi->reserved_start_alter_thread= false;
+ thd->lex->sql_command= SQLCOM_ALTER_TABLE;
+ Write_log_with_flags wlwf(thd, Gtid_log_event::FL_START_ALTER_E1,
+ true /* wsrep to isolation end */);
+#ifdef WITH_WSREP
+ if (WSREP(thd) && wsrep_thd_is_local(thd) &&
+ // no need to supply other than db in this case
+ wsrep_to_isolation_begin(thd, db, NULL,NULL,NULL,NULL,NULL))
+ return -1;
+#endif
+ if (write_bin_log(thd, false, thd->query(), thd->query_length()))
+ return -1;
+
+ my_ok(thd);
+ return rc;
+ }
+ if (!rgi->sa_info)
+ rgi->sa_info= get_new_start_alter_info(thd);
+ else
+ {
+ /* Not send Start-Alter into query execution when it's to rollback */
+ mysql_mutex_lock(&rgi->rli->mi->start_alter_lock);
+ if (rgi->sa_info->state == start_alter_state::ROLLBACK_ALTER)
+ mysql_cond_broadcast(&rgi->sa_info->start_alter_cond);
+ mysql_mutex_unlock(&rgi->rli->mi->start_alter_lock);
+ }
+
+ return rc;
+ }
+
+ bool is_CA= (gtid_flags_extra & Gtid_log_event::FL_COMMIT_ALTER_E1) ? true : false;
+ if (is_CA)
+ {
+ DBUG_EXECUTE_IF("rpl_slave_stop_CA_before_binlog",
+ {
+ // the awake comes from STOP-SLAVE running driver (sql) thread
+ debug_sync_set_action(thd,
+ STRING_WITH_LEN("now WAIT_FOR proceed_CA_1"));
+ });
+ }
+ start_alter_info *info=NULL;
+ Master_info *mi= NULL;
+
+ rgi->gtid_ev_sa_seq_no= sa_seq_no;
+ // is set for both the direct execution and the write to binlog
+ thd->set_binlog_start_alter_seq_no(sa_seq_no);
+ mi= rgi->rli->mi;
+ mysql_mutex_lock(&mi->start_alter_list_lock);
+ {
+ List_iterator<start_alter_info> info_iterator(mi->start_alter_list);
+ while ((info= info_iterator++))
+ {
+ if(info->sa_seq_no == rgi->gtid_ev_sa_seq_no &&
+ info->domain_id == rgi->current_gtid.domain_id)
+ {
+ info_iterator.remove();
+ break;
+ }
+ }
+ }
+ mysql_mutex_unlock(&mi->start_alter_list_lock);
+
+ if (!info)
+ {
+ if (is_CA)
+ {
+ /*
+ error handeling, direct_commit_alter is turned on, so that we dont
+ wait for master reply in mysql_alter_table (in wait_for_master)
+ */
+ rgi->direct_commit_alter= true;
+#ifdef WITH_WSREP
+ if (WSREP(thd))
+ thd->set_binlog_flags_for_alter(Gtid_log_event::FL_COMMIT_ALTER_E1);
+#endif
+ goto cleanup;
+ }
+ else
+ {
+ //Just write the binlog because there is nothing to be done
+ goto write_binlog;
+ }
+ }
+
+ mysql_mutex_lock(&mi->start_alter_lock);
+ if (info->state != start_alter_state::COMPLETED)
+ {
+ if (is_CA)
+ info->state= start_alter_state::COMMIT_ALTER;
+ else
+ info->state= start_alter_state::ROLLBACK_ALTER;
+ mysql_cond_broadcast(&info->start_alter_cond);
+ mysql_mutex_unlock(&mi->start_alter_lock);
+ /*
+ Wait till Start Alter worker has changed the state to ::COMPLETED
+ when start alter worker reaches the old code write_bin_log(), it will
+ change state to COMMITTED.
+ COMMITTED and `direct_commit_alter == true` at the same time indicates
+ the query needs re-execution by the CA running thread.
+ */
+ mysql_mutex_lock(&mi->start_alter_lock);
+
+ DBUG_ASSERT(info->state == start_alter_state::COMPLETED ||
+ !info->direct_commit_alter);
+
+ while(info->state != start_alter_state::COMPLETED)
+ mysql_cond_wait(&info->start_alter_cond, &mi->start_alter_lock);
+ }
+ else
+ {
+ // SA has completed and left being kicked out by deadlock or ftwrl
+ DBUG_ASSERT(info->direct_commit_alter);
+ }
+ mysql_mutex_unlock(&mi->start_alter_lock);
+
+ if (info->direct_commit_alter)
+ {
+ rgi->direct_commit_alter= true; // execute the query as if there was no SA
+ if (is_CA)
+ goto cleanup;
+ }
+
+write_binlog:
+ rc= 1;
+
+ if(!is_CA)
+ {
+ if(((info && info->error) || error_code) &&
+ global_system_variables.log_warnings > 2)
+ {
+ sql_print_information("Query '%s' having %d error code on master "
+ "is rolled back%s", query, error_code,
+ !(info && info->error) ? "." : ";");
+ if (info && info->error)
+ sql_print_information("its execution on slave %sproduced %d error.",
+ info->error == error_code ? "re":"", info->error);
+ }
+ }
+ {
+ thd->lex->sql_command= SQLCOM_ALTER_TABLE;
+ Write_log_with_flags wlwf(thd, is_CA ? Gtid_log_event::FL_COMMIT_ALTER_E1 :
+ Gtid_log_event::FL_ROLLBACK_ALTER_E1,
+ true);
+#ifdef WITH_WSREP
+ if (WSREP(thd) && wsrep_thd_is_local(thd) &&
+ wsrep_to_isolation_begin(thd, db, NULL,NULL,NULL,NULL,NULL))
+ rc= -1;
+#endif
+ if (rc != -1 &&
+ write_bin_log(thd, false, thd->query(), thd->query_length()))
+ rc= -1;
+ }
+
+ if (!thd->is_error())
+ {
+ skip_error_check= true;
+ my_ok(thd);
+ }
+
+cleanup:
+ if (info)
+ {
+ mysql_cond_destroy(&info->start_alter_cond);
+ my_free(info);
+ }
+ return rc;
+}
+
+
/**
@todo
Compare the values of "affected rows" around here. Something
@@ -1655,6 +1893,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
Relay_log_info const *rli= rgi->rli;
Rpl_filter *rpl_filter= rli->mi->rpl_filter;
bool current_stmt_is_commit;
+ bool skip_error_check= false;
DBUG_ENTER("Query_log_event::do_apply_event");
/*
@@ -1665,6 +1904,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
you.
*/
thd->catalog= catalog_len ? (char *) catalog : (char *)"";
+ rgi->start_alter_ev= this;
size_t valid_len= Well_formed_prefix(system_charset_info,
db, db_len, NAME_LEN).length();
@@ -1709,13 +1949,15 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
*/
if (is_trans_keyword() || rpl_filter->db_ok(thd->db.str))
{
+ bool is_rb_alter= gtid_flags_extra & Gtid_log_event::FL_ROLLBACK_ALTER_E1;
+
thd->set_time(when, when_sec_part);
thd->set_query_and_id((char*)query_arg, q_len_arg,
thd->charset(), next_query_id());
thd->variables.pseudo_thread_id= thread_id; // for temp tables
DBUG_PRINT("query",("%s", thd->query()));
- if (unlikely(!(expected_error= error_code)) ||
+ if (unlikely(!(expected_error= !is_rb_alter ? error_code : 0)) ||
ignored_error_code(expected_error) ||
!unexpected_error_code(expected_error))
{
@@ -1749,7 +1991,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
if (charset_inited)
{
rpl_sql_thread_info *sql_info= thd->system_thread_info.rpl_sql_info;
- if (sql_info->cached_charset_compare(charset))
+ if (thd->slave_thread && sql_info->cached_charset_compare(charset))
{
/* Verify that we support the charsets found in the event. */
if (!(thd->variables.character_set_client=
@@ -1887,40 +2129,62 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
thd->variables.option_bits|= OPTION_MASTER_SQL_ERROR;
thd->variables.option_bits&= ~OPTION_GTID_BEGIN;
}
- /* Execute the query (note that we bypass dispatch_command()) */
- Parser_state parser_state;
- if (!parser_state.init(thd, thd->query(), thd->query_length()))
+
+ int sa_result= 0;
+ bool is_2p_alter= gtid_flags_extra &
+ (Gtid_log_event::FL_START_ALTER_E1 |
+ Gtid_log_event::FL_COMMIT_ALTER_E1 |
+ Gtid_log_event::FL_ROLLBACK_ALTER_E1);
+ if (is_2p_alter)
+ sa_result= handle_split_alter_query_log_event(rgi, skip_error_check);
+ if (sa_result == 0)
{
- DBUG_ASSERT(thd->m_digest == NULL);
- thd->m_digest= & thd->m_digest_state;
- DBUG_ASSERT(thd->m_statement_psi == NULL);
- thd->m_statement_psi= MYSQL_START_STATEMENT(&thd->m_statement_state,
- stmt_info_rpl.m_key,
- thd->db.str, thd->db.length,
- thd->charset(), NULL);
- THD_STAGE_INFO(thd, stage_starting);
- MYSQL_SET_STATEMENT_TEXT(thd->m_statement_psi, thd->query(), thd->query_length());
- if (thd->m_digest != NULL)
- thd->m_digest->reset(thd->m_token_array, max_digest_length);
-
- if (thd->slave_thread)
- {
- /*
- To be compatible with previous releases, the slave thread uses the global
- log_slow_disabled_statements value, wich can be changed dynamically, so we
- have to set the sql_log_slow respectively.
- */
- thd->variables.sql_log_slow= !MY_TEST(global_system_variables.log_slow_disabled_statements & LOG_SLOW_DISABLE_SLAVE);
- }
-
- mysql_parse(thd, thd->query(), thd->query_length(), &parser_state);
- /* Finalize server status flags after executing a statement. */
- thd->update_server_status();
- log_slow_statement(thd);
- thd->lex->restore_set_statement_var();
+ /* Execute the query (note that we bypass dispatch_command()) */
+ Parser_state parser_state;
+ if (!parser_state.init(thd, thd->query(), thd->query_length()))
+ {
+ DBUG_ASSERT(thd->m_digest == NULL);
+ thd->m_digest= & thd->m_digest_state;
+ DBUG_ASSERT(thd->m_statement_psi == NULL);
+ thd->m_statement_psi= MYSQL_START_STATEMENT(&thd->m_statement_state,
+ stmt_info_rpl.m_key,
+ thd->db.str, thd->db.length,
+ thd->charset(), NULL);
+ THD_STAGE_INFO(thd, stage_starting);
+ MYSQL_SET_STATEMENT_TEXT(thd->m_statement_psi, thd->query(), thd->query_length());
+ if (thd->m_digest != NULL)
+ thd->m_digest->reset(thd->m_token_array, max_digest_length);
+
+ if (thd->slave_thread)
+ {
+ /*
+ To be compatible with previous releases, the slave thread uses the global
+ log_slow_disabled_statements value, wich can be changed dynamically, so we
+ have to set the sql_log_slow respectively.
+ */
+ thd->variables.sql_log_slow= !MY_TEST(global_system_variables.log_slow_disabled_statements & LOG_SLOW_DISABLE_SLAVE);
+ }
+ mysql_parse(thd, thd->query(), thd->query_length(), &parser_state);
+ /* Finalize server status flags after executing a statement. */
+ thd->update_server_status();
+ log_slow_statement(thd);
+ thd->lex->restore_set_statement_var();
+ }
+ }
+ else if(sa_result == -1)
+ {
+ rli->report(ERROR_LEVEL, expected_error, rgi->gtid_info(),
+ "TODO start alter error");
+ thd->is_slave_error= 1;
+ goto end;
}
-
thd->variables.option_bits&= ~OPTION_MASTER_SQL_ERROR;
+ if (is_2p_alter && !rgi->is_parallel_exec)
+ {
+ rgi->gtid_ev_flags_extra= 0;
+ rgi->direct_commit_alter= 0;
+ rgi->gtid_ev_sa_seq_no= 0;
+ }
}
else
{
@@ -1983,7 +2247,8 @@ compare_errors:
If we expected a non-zero error code, and we don't get the same error
code, and it should be ignored or is related to a concurrency issue.
*/
- actual_error= thd->is_error() ? thd->get_stmt_da()->sql_errno() : 0;
+ actual_error= thd->is_error() ? thd->get_stmt_da()->sql_errno() :
+ skip_error_check? expected_error : 0;
DBUG_PRINT("info",("expected_error: %d sql_errno: %d",
expected_error, actual_error));
@@ -2379,6 +2644,39 @@ bool Format_description_log_event::write()
}
#if defined(HAVE_REPLICATION)
+/*
+ Auxiliary function to conduct cleanup of unfinished two-phase logged ALTERs.
+*/
+static void check_and_remove_stale_alter(Relay_log_info *rli)
+{
+ Master_info *mi= rli->mi;
+ start_alter_info *info=NULL;
+
+ mysql_mutex_lock(&mi->start_alter_list_lock);
+ List_iterator<start_alter_info> info_iterator(mi->start_alter_list);
+ while ((info= info_iterator++))
+ {
+ DBUG_ASSERT(info->state == start_alter_state::REGISTERED);
+
+ sql_print_warning("ALTER query started at %u-%u-%llu could not "
+ "be completed because of unexpected master server "
+ "or its binlog change", info->sa_seq_no, // todo:gtid
+ 0, 0);
+ info_iterator.remove();
+ mysql_mutex_lock(&mi->start_alter_lock);
+ info->state= start_alter_state::ROLLBACK_ALTER;
+ mysql_mutex_unlock(&mi->start_alter_lock);
+ mysql_cond_broadcast(&info->start_alter_cond);
+ mysql_mutex_lock(&mi->start_alter_lock);
+ while(info->state != start_alter_state::COMPLETED)
+ mysql_cond_wait(&info->start_alter_cond, &mi->start_alter_lock);
+ mysql_mutex_unlock(&mi->start_alter_lock);
+ mysql_cond_destroy(&info->start_alter_cond);
+ my_free(info);
+ }
+ mysql_mutex_unlock(&mi->start_alter_list_lock);
+}
+
int Format_description_log_event::do_apply_event(rpl_group_info *rgi)
{
int ret= 0;
@@ -2396,16 +2694,23 @@ int Format_description_log_event::do_apply_event(rpl_group_info *rgi)
original place when it comes to us; we'll know this by checking
log_pos ("artificial" events have log_pos == 0).
*/
- if (!is_artificial_event() && created && thd->transaction->all.ha_list)
+ if (!is_artificial_event() && created && !thd->rgi_fake)
{
- /* This is not an error (XA is safe), just an information */
- rli->report(INFORMATION_LEVEL, 0, NULL,
- "Rolling back unfinished transaction (no COMMIT "
- "or ROLLBACK in relay log). A probable cause is that "
- "the master died while writing the transaction to "
- "its binary log, thus rolled back too.");
- rgi->cleanup_context(thd, 1);
+ // check_and_remove stale Start Alter:s
+ if (flags & LOG_EVENT_BINLOG_IN_USE_F)
+ check_and_remove_stale_alter(rli);
+ if (thd->transaction->all.ha_list)
+ {
+ /* This is not an error (XA is safe), just an information */
+ rli->report(INFORMATION_LEVEL, 0, NULL,
+ "Rolling back unfinished transaction (no COMMIT "
+ "or ROLLBACK in relay log). A probable cause is that "
+ "the master died while writing the transaction to "
+ "its binary log, thus rolled back too.");
+ rgi->cleanup_context(thd, 1);
+ }
}
+ DBUG_ASSERT(!thd->rgi_fake || !thd->transaction->all.ha_list);
/*
If this event comes from ourselves, there is no cleaning task to
@@ -3324,7 +3629,14 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg,
extra_engines= count > 1 ? 0 : UCHAR_MAX;
}
if (extra_engines > 0)
- flags_extra|= FL_EXTRA_MULTI_ENGINE;
+ flags_extra|= FL_EXTRA_MULTI_ENGINE_E1;
+ }
+ if (thd->get_binlog_flags_for_alter())
+ {
+ flags_extra |= thd->get_binlog_flags_for_alter();
+ if (flags_extra & (FL_COMMIT_ALTER_E1 | FL_ROLLBACK_ALTER_E1))
+ sa_seq_no= thd->get_binlog_start_alter_seq_no();
+ flags2|= FL_DDL;
}
}
@@ -3397,12 +3709,18 @@ Gtid_log_event::write()
buf[write_len]= flags_extra;
write_len++;
}
- if (flags_extra & FL_EXTRA_MULTI_ENGINE)
+ if (flags_extra & FL_EXTRA_MULTI_ENGINE_E1)
{
buf[write_len]= extra_engines;
write_len++;
}
+ if (flags_extra & (FL_COMMIT_ALTER_E1 | FL_ROLLBACK_ALTER_E1))
+ {
+ int8store(buf + write_len, sa_seq_no);
+ write_len+= 8;
+ }
+
if (write_len < GTID_HEADER_LEN)
{
bzero(buf+write_len, GTID_HEADER_LEN-write_len);
@@ -3466,6 +3784,20 @@ Gtid_log_event::pack_info(Protocol *protocol)
p= strmov(p, " cid=");
p= longlong10_to_str(commit_id, p, 10);
}
+ if (flags_extra & FL_START_ALTER_E1)
+ {
+ p= strmov(p, " START ALTER");
+ }
+ if (flags_extra & FL_COMMIT_ALTER_E1)
+ {
+ p= strmov(p, " COMMIT ALTER id=");
+ p= longlong10_to_str(sa_seq_no, p, 10);
+ }
+ if (flags_extra & FL_ROLLBACK_ALTER_E1)
+ {
+ p= strmov(p, " ROLLBACK ALTER id=");
+ p= longlong10_to_str(sa_seq_no, p, 10);
+ }
protocol->store(buf, p-buf, &my_charset_bin);
}
@@ -3480,6 +3812,9 @@ Gtid_log_event::do_apply_event(rpl_group_info *rgi)
thd->variables.gtid_domain_id= this->domain_id;
thd->variables.gtid_seq_no= this->seq_no;
rgi->gtid_ev_flags2= flags2;
+
+ rgi->gtid_ev_flags_extra= flags_extra;
+ rgi->gtid_ev_sa_seq_no= sa_seq_no;
thd->reset_for_next_command();
if (opt_gtid_strict_mode && opt_bin_log && opt_log_slave_updates)
diff --git a/sql/mdl.cc b/sql/mdl.cc
index c2475bb3b91..bb764d04a42 100644
--- a/sql/mdl.cc
+++ b/sql/mdl.cc
@@ -2232,8 +2232,8 @@ bool MDL_lock::check_if_conflicting_replication_locks(MDL_context *ctx)
/*
If the conflicting thread is another parallel replication
- thread for the same master and it's not in commit stage, then
- the current transaction has started too early and something is
+ thread for the same master and it's not in commit or post-commit stages,
+ then the current transaction has started too early and something is
seriously wrong.
*/
if (conflicting_rgi_slave &&
@@ -2241,7 +2241,9 @@ bool MDL_lock::check_if_conflicting_replication_locks(MDL_context *ctx)
conflicting_rgi_slave->rli == rgi_slave->rli &&
conflicting_rgi_slave->current_gtid.domain_id ==
rgi_slave->current_gtid.domain_id &&
- !conflicting_rgi_slave->did_mark_start_commit)
+ !((conflicting_rgi_slave->did_mark_start_commit ||
+ conflicting_rgi_slave->worker_error) ||
+ conflicting_rgi_slave->finish_event_group_called))
return 1; // Fatal error
}
}
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index f23f485b9a6..3c3424e1350 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -911,6 +911,8 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_LOCK_user_conn, key_LOCK_uuid_short_generator, key_LOG_LOCK_log,
key_master_info_data_lock, key_master_info_run_lock,
key_master_info_sleep_lock, key_master_info_start_stop_lock,
+ key_master_info_start_alter_lock,
+ key_master_info_start_alter_list_lock,
key_mutex_slave_reporting_capability_err_lock, key_relay_log_info_data_lock,
key_rpl_group_info_sleep_lock,
key_relay_log_info_log_space_lock, key_relay_log_info_run_lock,
@@ -995,6 +997,8 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_master_info_start_stop_lock, "Master_info::start_stop_lock", 0},
{ &key_master_info_run_lock, "Master_info::run_lock", 0},
{ &key_master_info_sleep_lock, "Master_info::sleep_lock", 0},
+ { &key_master_info_start_alter_lock, "Master_info::start_alter_lock", 0},
+ { &key_master_info_start_alter_list_lock, "Master_info::start_alter_lock", 0},
{ &key_mutex_slave_reporting_capability_err_lock, "Slave_reporting_capability::err_lock", 0},
{ &key_relay_log_info_data_lock, "Relay_log_info::data_lock", 0},
{ &key_relay_log_info_log_space_lock, "Relay_log_info::log_space_lock", 0},
diff --git a/sql/mysqld.h b/sql/mysqld.h
index 8c0b92c6446..3a8710f9a85 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -332,6 +332,8 @@ extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_LOCK_user_conn, key_LOG_LOCK_log,
key_master_info_data_lock, key_master_info_run_lock,
key_master_info_sleep_lock, key_master_info_start_stop_lock,
+ key_master_info_start_alter_lock,
+ key_master_info_start_alter_list_lock,
key_mutex_slave_reporting_capability_err_lock, key_relay_log_info_data_lock,
key_relay_log_info_log_space_lock, key_relay_log_info_run_lock,
key_rpl_group_info_sleep_lock,
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index c1031133530..7b0e1e4a363 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -607,6 +607,8 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
wait_for_commit* suspended_wfc;
void *hton= NULL;
LEX_CSTRING gtid_pos_table_name;
+ TABLE *tbl= nullptr;
+ MDL_savepoint m_start_of_statement_svp(thd->mdl_context.mdl_savepoint());
DBUG_ENTER("record_gtid");
*out_hton= NULL;
@@ -625,6 +627,18 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
if (!in_statement)
thd->reset_for_next_command();
+ if (thd->rgi_slave && (thd->rgi_slave->gtid_ev_flags_extra &
+ Gtid_log_event::FL_START_ALTER_E1))
+ {
+ /*
+ store the open table table list in ptr, so that is close_thread_tables
+ is called start alter tables are not closed
+ */
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ tbl= thd->open_tables;
+ thd->open_tables= nullptr;
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ }
/*
Only the SQL thread can call select_gtid_pos_table without a mutex
Other threads needs to use a mutex and take into account that the
@@ -734,12 +748,23 @@ end:
{
if (err || (err= ha_commit_trans(thd, FALSE)))
ha_rollback_trans(thd, FALSE);
-
close_thread_tables(thd);
- if (in_transaction)
- thd->mdl_context.release_statement_locks();
- else
- thd->release_transactional_locks();
+ if (!thd->rgi_slave || !(thd->rgi_slave->gtid_ev_flags_extra &
+ Gtid_log_event::FL_START_ALTER_E1))
+ {
+ if (in_transaction)
+ thd->mdl_context.release_statement_locks();
+ else
+ thd->release_transactional_locks();
+ }
+ }
+ if (thd->rgi_slave &&
+ thd->rgi_slave->gtid_ev_flags_extra & Gtid_log_event::FL_START_ALTER_E1)
+ {
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ thd->open_tables= tbl;
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ thd->mdl_context.rollback_to_savepoint(m_start_of_statement_svp);
}
thd->lex->restore_backup_query_tables_list(&lex_backup);
thd->variables.option_bits= thd_saved_option;
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc
index 4fd36891a4b..2283f91ba02 100644
--- a/sql/rpl_mi.cc
+++ b/sql/rpl_mi.cc
@@ -43,7 +43,8 @@ Master_info::Master_info(LEX_CSTRING *connection_name_arg,
gtid_reconnect_event_skip_count(0), gtid_event_seen(false),
in_start_all_slaves(0), in_stop_all_slaves(0), in_flush_all_relay_logs(0),
users(0), killed(0),
- total_ddl_groups(0), total_non_trans_groups(0), total_trans_groups(0)
+ total_ddl_groups(0), total_non_trans_groups(0), total_trans_groups(0),
+ is_shutdown(false)
{
char *tmp;
host[0] = 0; user[0] = 0; password[0] = 0;
@@ -85,6 +86,14 @@ Master_info::Master_info(LEX_CSTRING *connection_name_arg,
mysql_mutex_init(key_master_info_data_lock, &data_lock, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_master_info_start_stop_lock, &start_stop_lock,
MY_MUTEX_INIT_SLOW);
+ /*
+ start_alter_lock will protect individual start_alter_info while
+ start_alter_list_lock is for list insertion and deletion operations
+ */
+ mysql_mutex_init(key_master_info_start_alter_lock, &start_alter_lock,
+ MY_MUTEX_INIT_FAST);
+ mysql_mutex_init(key_master_info_start_alter_list_lock, &start_alter_list_lock,
+ MY_MUTEX_INIT_FAST);
mysql_mutex_setflags(&run_lock, MYF_NO_DEADLOCK_DETECTION);
mysql_mutex_setflags(&data_lock, MYF_NO_DEADLOCK_DETECTION);
mysql_mutex_init(key_master_info_sleep_lock, &sleep_lock, MY_MUTEX_INIT_FAST);
@@ -92,6 +101,7 @@ Master_info::Master_info(LEX_CSTRING *connection_name_arg,
mysql_cond_init(key_master_info_start_cond, &start_cond, NULL);
mysql_cond_init(key_master_info_stop_cond, &stop_cond, NULL);
mysql_cond_init(key_master_info_sleep_cond, &sleep_cond, NULL);
+ init_sql_alloc(PSI_INSTRUMENT_ME, &mem_root, MEM_ROOT_BLOCK_SIZE, 0, MYF(0));
}
@@ -121,10 +131,13 @@ Master_info::~Master_info()
mysql_mutex_destroy(&data_lock);
mysql_mutex_destroy(&sleep_lock);
mysql_mutex_destroy(&start_stop_lock);
+ mysql_mutex_destroy(&start_alter_lock);
+ mysql_mutex_destroy(&start_alter_list_lock);
mysql_cond_destroy(&data_cond);
mysql_cond_destroy(&start_cond);
mysql_cond_destroy(&stop_cond);
mysql_cond_destroy(&sleep_cond);
+ free_root(&mem_root, MYF(0));
}
/**
diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h
index a4a06d42a5c..b8135b8b55f 100644
--- a/sql/rpl_mi.h
+++ b/sql/rpl_mi.h
@@ -227,7 +227,7 @@ class Master_info : public Slave_reporting_capability
File fd; // we keep the file open, so we need to remember the file pointer
IO_CACHE file;
- mysql_mutex_t data_lock, run_lock, sleep_lock, start_stop_lock;
+ mysql_mutex_t data_lock, run_lock, sleep_lock, start_stop_lock, start_alter_lock, start_alter_list_lock;
mysql_cond_t data_cond, start_cond, stop_cond, sleep_cond;
THD *io_thd;
MYSQL* mysql;
@@ -352,6 +352,25 @@ class Master_info : public Slave_reporting_capability
ACK from slave, or if delay_master is enabled.
*/
int semi_ack;
+ List <start_alter_info> start_alter_list;
+ MEM_ROOT mem_root;
+ /*
+ Flag is raised at the parallel worker slave stop. Its purpose
+ is to mark the whole start_alter_list when slave stops.
+ The flag is read by Start Alter event to self-mark its state accordingly
+ at time its alter info struct is about to be appened to the list.
+ */
+ bool is_shutdown;
+};
+
+struct start_alter_thd_args
+{
+ rpl_group_info *rgi;
+ LEX_CSTRING query;
+ LEX_CSTRING *db;
+ char *catalog;
+ bool shutdown;
+ CHARSET_INFO *cs;
};
int init_master_info(Master_info* mi, const char* master_info_fname,
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 49ec08a9cea..4b305e9a57a 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -151,6 +151,9 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
wait_for_commit *wfc= &rgi->commit_orderer;
int err;
+ if (rgi->get_finish_event_group_called())
+ return;
+
thd->get_stmt_da()->set_overwrite_status(true);
/*
Remove any left-over registration to wait for a prior commit to
@@ -272,6 +275,8 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
*/
thd->get_stmt_da()->reset_diagnostics_area();
wfc->wakeup_subsequent_commits(rgi->worker_error);
+ rgi->did_mark_start_commit= false;
+ rgi->set_finish_event_group_called(true);
}
@@ -571,6 +576,7 @@ rpl_pause_for_ftwrl(THD *thd)
uint32 i;
rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
int err;
+ Dynamic_array<Master_info*> mi_arr(4, 4); // array of replication source mi:s
DBUG_ENTER("rpl_pause_for_ftwrl");
/*
@@ -622,9 +628,36 @@ rpl_pause_for_ftwrl(THD *thd)
mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
};
--e->need_sub_id_signal;
+
thd->EXIT_COND(&old_stage);
if (err)
break;
+ /*
+ Notify any source any domain waiting-for-master Start-Alter to give way.
+ */
+ Master_info *mi= e->rli->mi;
+ bool found= false;
+ for (uint i= 0; i < mi_arr.elements() && !found; i++)
+ found= mi_arr.at(i) == mi;
+ if (!found)
+ {
+ mi_arr.append(mi);
+ start_alter_info *info=NULL;
+ mysql_mutex_lock(&mi->start_alter_list_lock);
+ List_iterator<start_alter_info> info_iterator(mi->start_alter_list);
+ while ((info= info_iterator++))
+ {
+ mysql_mutex_lock(&mi->start_alter_lock);
+
+ DBUG_ASSERT(info->state == start_alter_state::REGISTERED);
+
+ info->state= start_alter_state::ROLLBACK_ALTER;
+ info->direct_commit_alter= true;
+ mysql_cond_broadcast(&info->start_alter_cond);
+ mysql_mutex_unlock(&mi->start_alter_lock);
+ }
+ mysql_mutex_unlock(&mi->start_alter_list_lock);
+ }
}
if (err)
@@ -1690,6 +1723,9 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
{
mysql_mutex_lock(&pool->threads[i]->LOCK_rpl_thread);
pool->threads[i]->delay_start= false;
+ pool->threads[i]->current_start_alter_id= 0;
+ pool->threads[i]->current_start_alter_domain_id= 0;
+ pool->threads[i]->reserved_start_alter_thread= false;
mysql_cond_signal(&pool->threads[i]->COND_rpl_thread);
while (!pool->threads[i]->running)
mysql_cond_wait(&pool->threads[i]->COND_rpl_thread,
@@ -1969,7 +2005,19 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
rgi->retry_start_offset= rli->future_event_relay_log_pos-event_size;
rgi->retry_event_count= 0;
rgi->killed_for_retry= rpl_group_info::RETRY_KILL_NONE;
+ /* rgi is transaction specific so we need to move this value to rgi */
+ rgi->reserved_start_alter_thread= reserved_start_alter_thread;
+ rgi->rpt= this;
+ rgi->direct_commit_alter= false;
+ rgi->finish_event_group_called= false;
+ DBUG_ASSERT(!rgi->sa_info);
+ /*
+ We can remove the reserved_start_alter_thread flag.
+ If we get more concurrent alter handle_split_alter will
+ automatically set this flag again.
+ */
+ reserved_start_alter_thread= false;
return rgi;
}
@@ -2033,6 +2081,10 @@ rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco)
loc_gco_list= gco;
}
+void rpl_group_info::finish_start_alter_event_group()
+{
+ finish_event_group(rpt, this->gtid_sub_id, this->parallel_entry, this);
+}
rpl_parallel_thread::rpl_parallel_thread()
: channel_name_length(0), last_error_number(0), last_error_timestamp(0),
@@ -2042,7 +2094,7 @@ rpl_parallel_thread::rpl_parallel_thread()
rpl_parallel_thread_pool::rpl_parallel_thread_pool()
- : threads(0), free_list(0), count(0), inited(false), busy(false),
+ : threads(0), free_list(0), count(0), inited(false),current_start_alters(0), busy(false),
pfs_bkp{0, false, NULL}
{
}
@@ -2175,6 +2227,129 @@ rpl_parallel_thread_pool::copy_pool_for_pfs(Relay_log_info *rli)
}
}
+/*
+ START ALTER , COMMIT ALTER / ROLLBACK ALTER scheduling
+
+ Steps:-
+ 1. (For Gtid_log_event SA). Get the worker thread which is either
+ e->rpl_threads[i] is NULL means worker from poll has not been assigned yet
+ e->rpl_threads[i]->current_owner != &e->rpl_threads[i]
+ Thread has been released, or about to //same as choose_thread logic
+ !e->rpl_threads[i]->current_start_alter_id is 0 , safe to schedule.
+ We dont want to schedule on worker which already have been scheduled SA
+ but CA/RA has not been scheduled yet. current_start_alter_id will indicate
+ this. If we dont do this we will get deadlock.
+ 2. (For Gtid_log_event SA)
+ call choose_thread_internal so that e->rpl_threads[idx] is not null
+ update the current_start_alter_id
+ 3. (For Gtid_log_event SA)
+ update local e->pending_start_alters(local) variable and
+ pool->current_start_alters(global)
+ We need 2 status variable (global and local) because we can have
+ slave_domain_parallel_threads != pool->threads.
+ 4. (For CA/RA Gtid_log_event)
+ Update e->pending_start_alters and pool->current_start_alters
+ while holding mutex lock on pool (if SA is not assigned to
+ reserved thread)
+
+
+ @returns
+ true Worker allocated (choose_thread_internal called)
+ false Worker not allocated (choose_thread_internal not called)
+*/
+static bool handle_split_alter(rpl_parallel_entry *e,
+ Gtid_log_event *gtid_ev, uint32 *idx,
+ //choose_thread_internal specific
+ bool *did_enter_cond, rpl_group_info* rgi,
+ PSI_stage_info *old_stage)
+{
+ uint16 flags_extra= gtid_ev->flags_extra;
+ bool thread_allocated= false;
+ //Step 1
+ if (flags_extra & Gtid_log_event::FL_START_ALTER_E1 ||
+ //This will arrange finding threads for CA/RA as well
+ //as concurrent DDL
+ e->pending_start_alters)
+ {
+ /*
+ j is needed for round robin scheduling, we will start with rpl_thread_idx
+ go till rpl_thread_max and then start with 0 to rpl_thread_idx
+ */
+ int j= e->rpl_thread_idx;
+ for(uint i= 0; i < e->rpl_thread_max; i++)
+ {
+ if (!e->rpl_threads[j] || e->rpl_threads[j]->current_owner
+ != &e->rpl_threads[j] || !e->rpl_threads[j]->current_start_alter_id)
+ {
+ //This condition will hit atleast one time no matter what happens
+ *idx= j;
+ DBUG_PRINT("info", ("Start alter id %d", j));
+ goto idx_found;
+ }
+ j++;
+ j= j % e->rpl_thread_max;
+ }
+ //We did not find and idx
+ DBUG_ASSERT(0);
+ return false;
+idx_found:
+ e->rpl_thread_idx= *idx;
+ e->choose_thread_internal(*idx, did_enter_cond, rgi, old_stage);
+ thread_allocated= true;
+ if (flags_extra & Gtid_log_event::FL_START_ALTER_E1)
+ {
+ mysql_mutex_assert_owner(&e->rpl_threads[*idx]->LOCK_rpl_thread);
+ e->rpl_threads[e->rpl_thread_idx]->current_start_alter_id= gtid_ev->seq_no;
+ e->rpl_threads[e->rpl_thread_idx]->current_start_alter_domain_id=
+ gtid_ev->domain_id;
+ /*
+ We are locking LOCK_rpl_thread_pool becuase we are going to update
+ current_start_alters
+ */
+ mysql_mutex_lock(&global_rpl_thread_pool.LOCK_rpl_thread_pool);
+ if (e->pending_start_alters < e->rpl_thread_max - 1 &&
+ global_rpl_thread_pool.current_start_alters
+ < global_rpl_thread_pool.count - 1)
+ {
+ e->pending_start_alters++;
+ global_rpl_thread_pool.current_start_alters++;
+ }
+ else
+ {
+ e->rpl_threads[*idx]->reserved_start_alter_thread= true;
+ e->rpl_threads[*idx]->current_start_alter_id= 0;
+ e->rpl_threads[*idx]->current_start_alter_domain_id= 0;
+ }
+ mysql_mutex_unlock(&global_rpl_thread_pool.LOCK_rpl_thread_pool);
+ }
+ }
+ if(flags_extra & (Gtid_log_event::FL_COMMIT_ALTER_E1 |
+ Gtid_log_event::FL_ROLLBACK_ALTER_E1 ))
+ {
+ //Free the corrosponding rpt current_start_alter_id
+ for(uint i= 0; i < e->rpl_thread_max; i++)
+ {
+ if(e->rpl_threads[i] &&
+ e->rpl_threads[i]->current_start_alter_id == gtid_ev->sa_seq_no &&
+ e->rpl_threads[i]->current_start_alter_domain_id == gtid_ev->domain_id)
+ {
+ mysql_mutex_lock(&global_rpl_thread_pool.LOCK_rpl_thread_pool);
+ e->rpl_threads[i]->current_start_alter_id= 0;
+ e->rpl_threads[i]->current_start_alter_domain_id= 0;
+ global_rpl_thread_pool.current_start_alters--;
+ e->pending_start_alters--;
+ DBUG_PRINT("info", ("Commit/Rollback alter id %d", i));
+ mysql_mutex_unlock(&global_rpl_thread_pool.LOCK_rpl_thread_pool);
+ break;
+ }
+ }
+ }
+
+ return thread_allocated;
+
+}
+
+
/*
Obtain a worker thread that we can queue an event to.
@@ -2208,25 +2383,32 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
Gtid_log_event *gtid_ev)
{
uint32 idx;
- Relay_log_info *rli= rgi->rli;
- rpl_parallel_thread *thr;
idx= rpl_thread_idx;
if (gtid_ev)
{
+ if (++idx >= rpl_thread_max)
+ idx= 0;
+ //rpl_thread_idx will be updated handle_split_alter
+ if (handle_split_alter(this, gtid_ev, &idx, did_enter_cond, rgi, old_stage))
+ return rpl_threads[idx];
if (gtid_ev->flags2 &
(Gtid_log_event::FL_COMPLETED_XA | Gtid_log_event::FL_PREPARED_XA))
+ {
idx= my_hash_sort(&my_charset_bin, gtid_ev->xid.key(),
gtid_ev->xid.key_length()) % rpl_thread_max;
- else
- {
- ++idx;
- if (idx >= rpl_thread_max)
- idx= 0;
}
rpl_thread_idx= idx;
}
- thr= rpl_threads[idx];
+ return choose_thread_internal(idx, did_enter_cond, rgi, old_stage);
+}
+
+rpl_parallel_thread * rpl_parallel_entry::choose_thread_internal(uint idx,
+ bool *did_enter_cond, rpl_group_info *rgi,
+ PSI_stage_info *old_stage)
+{
+ rpl_parallel_thread* thr= rpl_threads[idx];
+ Relay_log_info *rli= rgi->rli;
if (thr)
{
*did_enter_cond= false;
@@ -2340,7 +2522,7 @@ rpl_parallel::~rpl_parallel()
rpl_parallel_entry *
-rpl_parallel::find(uint32 domain_id)
+rpl_parallel::find(uint32 domain_id, Relay_log_info *rli)
{
struct rpl_parallel_entry *e;
@@ -2366,6 +2548,8 @@ rpl_parallel::find(uint32 domain_id)
e->domain_id= domain_id;
e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX;
e->pause_sub_id= (uint64)ULONGLONG_MAX;
+ e->pending_start_alters= 0;
+ e->rli= rli;
if (my_hash_insert(&domain_hash, (uchar *)e))
{
my_free(e);
@@ -2376,7 +2560,11 @@ rpl_parallel::find(uint32 domain_id)
mysql_cond_init(key_COND_parallel_entry, &e->COND_parallel_entry, NULL);
}
else
+ {
+ DBUG_ASSERT(rli == e->rli);
+
e->force_abort= false;
+ }
return e;
}
@@ -2393,7 +2581,7 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli)
struct rpl_parallel_entry *e;
rpl_parallel_thread *rpt;
uint32 i, j;
-
+ Master_info *mi= rli->mi;
/*
First signal all workers that they must force quit; no more events will
be queued to complete any partial event groups executed.
@@ -2444,6 +2632,45 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli)
};);
global_rpl_thread_pool.copy_pool_for_pfs(rli);
+ /*
+ Shutdown SA alter threads through marking their execution states
+ to force their early post-SA execution exit. Upon that the affected SA threads
+ change their state to COMPLETED, notify any waiting CA|RA and this thread.
+ */
+ start_alter_info *info=NULL;
+ mysql_mutex_lock(&mi->start_alter_list_lock);
+ List_iterator<start_alter_info> info_iterator(mi->start_alter_list);
+ mi->is_shutdown= true; // a sign to stop in concurrently coming in new SA:s
+ while ((info= info_iterator++))
+ {
+ mysql_mutex_lock(&mi->start_alter_lock);
+ if (info->state == start_alter_state::COMPLETED)
+ {
+ mysql_mutex_unlock(&mi->start_alter_lock);
+ continue;
+ }
+ info->state= start_alter_state::ROLLBACK_ALTER;
+ // Any possible CA that is (will be) waiting will complete this ALTER instance
+ info->direct_commit_alter= true;
+ mysql_cond_broadcast(&info->start_alter_cond); // notify SA:s
+ mysql_mutex_unlock(&mi->start_alter_lock);
+
+ // await SA in the COMPLETED state
+ mysql_mutex_lock(&mi->start_alter_lock);
+ while(info->state == start_alter_state::ROLLBACK_ALTER)
+ mysql_cond_wait(&info->start_alter_cond, &mi->start_alter_lock);
+
+ DBUG_ASSERT(info->state == start_alter_state::COMPLETED);
+
+ mysql_mutex_unlock(&mi->start_alter_lock);
+ }
+ mysql_mutex_unlock(&mi->start_alter_list_lock);
+
+ DBUG_EXECUTE_IF("rpl_slave_stop_CA_before_binlog",
+ {
+ debug_sync_set_action(thd, STRING_WITH_LEN("now signal proceed_CA_1"));
+ });
+
for (i= 0; i < domain_hash.records; ++i)
{
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
@@ -2458,6 +2685,17 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli)
}
}
}
+ // Now that all threads are docked, remained alter states are safe to destroy
+ mysql_mutex_lock(&mi->start_alter_list_lock);
+ info_iterator.rewind();
+ while ((info= info_iterator++))
+ {
+ info_iterator.remove();
+ mysql_cond_destroy(&info->start_alter_cond);
+ my_free(info);
+ }
+ mi->is_shutdown= false;
+ mysql_mutex_unlock(&mi->start_alter_list_lock);
}
@@ -2802,7 +3040,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ||
rli->mi->parallel_mode <= SLAVE_PARALLEL_MINIMAL ?
0 : gtid_ev->domain_id);
- if (!(e= find(domain_id)))
+ if (!(e= find(domain_id, rli)))
{
my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
delete ev;
@@ -2814,6 +3052,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
gtid.server_id= gtid_ev->server_id;
gtid.seq_no= gtid_ev->seq_no;
rli->update_relay_log_state(&gtid, 1);
+ serial_rgi->gtid_ev_flags_extra= gtid_ev->flags_extra;
if (process_gtid_for_restart_pos(rli, &gtid))
{
/*
diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
index d3c46301ff8..66c7fc9f316 100644
--- a/sql/rpl_parallel.h
+++ b/sql/rpl_parallel.h
@@ -99,6 +99,18 @@ struct rpl_parallel_thread {
bool running;
bool stop;
bool pause_for_ftwrl;
+ /*
+ 0 = No start alter assigned
+ >0 = Start alter assigned
+ */
+ uint64 current_start_alter_id;
+ uint32 current_start_alter_domain_id;
+ /*
+ This flag is true when Start Alter just needs to be binlogged only.
+ This scenario will happens when there is congestion , and we can not
+ allocate independent worker to start alter.
+ */
+ bool reserved_start_alter_thread;
mysql_mutex_t LOCK_rpl_thread;
mysql_cond_t COND_rpl_thread;
mysql_cond_t COND_rpl_thread_queue;
@@ -297,6 +309,12 @@ struct rpl_parallel_thread_pool {
mysql_cond_t COND_rpl_thread_pool;
uint32 count;
bool inited;
+
+ /*
+ Lock first LOCK_rpl_thread_pool and then LOCK_rpl_thread to
+ update this variable.
+ */
+ uint32 current_start_alters;
/*
While FTWRL runs, this counter is incremented to make SQL thread or
STOP/START slave not try to start new activity while that operation
@@ -328,6 +346,7 @@ struct rpl_parallel_entry {
*/
uint32 need_sub_id_signal;
uint64 last_commit_id;
+ uint32 pending_start_alters;
bool active;
/*
Set when SQL thread is shutting down, and no more events can be processed,
@@ -410,10 +429,15 @@ struct rpl_parallel_entry {
uint64 count_committing_event_groups;
/* The group_commit_orderer object for the events currently being queued. */
group_commit_orderer *current_gco;
+ /* Relay log info of replication source for this entry. */
+ Relay_log_info *rli;
rpl_parallel_thread * choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
PSI_stage_info *old_stage,
Gtid_log_event *gtid_ev);
+ rpl_parallel_thread *
+ choose_thread_internal(uint idx, bool *did_enter_cond, rpl_group_info *rgi,
+ PSI_stage_info *old_stage);
int queue_master_restart(rpl_group_info *rgi,
Format_description_log_event *fdev);
};
@@ -425,7 +449,7 @@ struct rpl_parallel {
rpl_parallel();
~rpl_parallel();
void reset();
- rpl_parallel_entry *find(uint32 domain_id);
+ rpl_parallel_entry *find(uint32 domain_id, Relay_log_info *rli);
void wait_for_done(THD *thd, Relay_log_info *rli);
void stop_during_until();
bool workers_idle();
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 4ba843a51ab..94b1db74637 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -2152,16 +2152,24 @@ rpl_group_info::reinit(Relay_log_info *rli)
long_find_row_note_printed= false;
did_mark_start_commit= false;
gtid_ev_flags2= 0;
+ gtid_ev_flags_extra= 0;
+ gtid_ev_sa_seq_no= 0;
last_master_timestamp = 0;
gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL;
speculation= SPECULATE_NO;
+ rpt= NULL;
+ start_alter_ev= NULL;
+ direct_commit_alter= false;
commit_orderer.reinit();
}
rpl_group_info::rpl_group_info(Relay_log_info *rli)
: thd(0), wait_commit_sub_id(0),
wait_commit_group_info(0), parallel_entry(0),
- deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false)
+ deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false),
+ gtid_ev_flags2(0), gtid_ev_flags_extra(0), gtid_ev_sa_seq_no(0),
+ reserved_start_alter_thread(0), finish_event_group_called(0), rpt(NULL),
+ start_alter_ev(NULL), direct_commit_alter(false), sa_info(NULL)
{
reinit(rli);
bzero(&current_gtid, sizeof(current_gtid));
@@ -2170,7 +2178,6 @@ rpl_group_info::rpl_group_info(Relay_log_info *rli)
mysql_cond_init(key_rpl_group_info_sleep_cond, &sleep_cond, NULL);
}
-
rpl_group_info::~rpl_group_info()
{
free_annotate_event();
@@ -2195,6 +2202,7 @@ event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev)
rgi->current_gtid.seq_no= gev->seq_no;
rgi->commit_id= gev->commit_id;
rgi->gtid_pending= true;
+ rgi->sa_info= NULL;
return 0;
}
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index cc807852bf2..80ee143a8e8 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -641,6 +641,33 @@ struct inuse_relaylog {
}
};
+enum start_alter_state
+{
+ INVALID= 0,
+ REGISTERED, // Start Alter exist, Default state
+ COMMIT_ALTER, // COMMIT the alter
+ ROLLBACK_ALTER, // Rollback the alter
+ COMPLETED // COMMIT/ROLLBACK Alter written in binlog
+};
+
+struct start_alter_info
+{
+ /*
+ ALTER id is defined as a pair of GTID's seq_no and domain_id.
+ */
+ decltype(rpl_gtid::seq_no) sa_seq_no; // key for searching (SA's id)
+ uint32 domain_id;
+ bool direct_commit_alter; // when true CA thread executes the whole query
+ /*
+ 0 prepared and not error from commit and rollback
+ >0 error expected in commit/rollback
+ Rollback can be logged with 0 error if master is killed
+ */
+ uint error;
+ enum start_alter_state state;
+ /* We are not using mysql_cond_t because we do not need PSI */
+ mysql_cond_t start_alter_cond;
+};
/*
This is data for various state needed to be kept for the processing of
@@ -760,6 +787,9 @@ struct rpl_group_info
bool did_mark_start_commit;
/* Copy of flags2 from GTID event. */
uchar gtid_ev_flags2;
+ /* Copy of flags3 from GTID event. */
+ uint16 gtid_ev_flags_extra;
+ uint64 gtid_ev_sa_seq_no;
enum {
GTID_DUPLICATE_NULL=0,
GTID_DUPLICATE_IGNORE=1,
@@ -834,6 +864,15 @@ struct rpl_group_info
RETRY_KILL_KILLED
};
uchar killed_for_retry;
+ bool reserved_start_alter_thread;
+ bool finish_event_group_called;
+ /*
+ Used for two phase alter table
+ */
+ rpl_parallel_thread *rpt;
+ Query_log_event *start_alter_ev;
+ bool direct_commit_alter;
+ start_alter_info *sa_info;
rpl_group_info(Relay_log_info *rli_);
~rpl_group_info();
@@ -961,6 +1000,19 @@ struct rpl_group_info
if (!is_parallel_exec)
rli->event_relay_log_pos= future_event_relay_log_pos;
}
+
+ void finish_start_alter_event_group();
+
+ bool get_finish_event_group_called()
+ {
+ return finish_event_group_called;
+ }
+
+ void set_finish_event_group_called(bool value)
+ {
+ finish_event_group_called= value;
+ }
+
};
diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt
index c9599f283bb..eb3d85b34f0 100644
--- a/sql/share/errmsg-utf8.txt
+++ b/sql/share/errmsg-utf8.txt
@@ -8917,3 +8917,5 @@ ER_JSON_HISTOGRAM_PARSE_FAILED
eng "Failed to parse histogram for table %s.%s: %s at offset %d."
ER_SF_OUT_INOUT_ARG_NOT_ALLOWED
eng "OUT or INOUT argument %d for function %s is not allowed here"
+ER_INCONSISTENT_SLAVE_TEMP_TABLE
+ eng "Replicated query '%s' table `%s.%s` can not be temporary"
diff --git a/sql/slave.cc b/sql/slave.cc
index 3c5b830fbe2..38d282a5ac5 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -3930,6 +3930,10 @@ apply_event_and_update_pos_apply(Log_event* ev, THD* thd, rpl_group_info *rgi,
DBUG_PRINT("info", ("apply_event error = %d", exec_res));
if (exec_res == 0)
{
+ if (thd->rgi_slave && (thd->rgi_slave->gtid_ev_flags_extra &
+ Gtid_log_event::FL_START_ALTER_E1) &&
+ thd->rgi_slave->get_finish_event_group_called())
+ DBUG_RETURN(exec_res ? 1 : 0);
int error= ev->update_pos(rgi);
#ifndef DBUG_OFF
DBUG_PRINT("info", ("update_pos error = %d", error));
@@ -4053,6 +4057,11 @@ int
apply_event_and_update_pos_for_parallel(Log_event* ev, THD* thd,
rpl_group_info *rgi)
{
+ int rc= 0;
+ ulong retries= 0;
+ bool is_sa= rgi->gtid_ev_flags_extra == Gtid_log_event::FL_START_ALTER_E1;
+ bool is_sa_temp_err= false;
+
mysql_mutex_assert_not_owner(&rgi->rli->data_lock);
int reason= apply_event_and_update_pos_setup(ev, thd, rgi);
/*
@@ -4064,7 +4073,51 @@ apply_event_and_update_pos_for_parallel(Log_event* ev, THD* thd,
Calling sql_delay_event() was handled in the SQL driver thread when
doing parallel replication.
*/
- return apply_event_and_update_pos_apply(ev, thd, rgi, reason);
+ do
+ {
+ rc= apply_event_and_update_pos_apply(ev, thd, rgi, reason);
+ if (rc && is_sa)
+ {
+ is_sa_temp_err=
+ is_parallel_retry_error(rgi, thd->get_stmt_da()->sql_errno());
+ }
+ }
+ while(is_sa_temp_err && retries++ < slave_trans_retries);
+
+ if (is_sa_temp_err)
+ {
+ Master_info *mi= rgi->rli->mi;
+ mysql_mutex_lock(&mi->start_alter_lock);
+
+ DBUG_ASSERT(!rgi->sa_info->direct_commit_alter);
+ /*
+ Give up retrying to hand the whole ALTER execution over to
+ the "Complete" ALTER.
+ */
+ rgi->sa_info->direct_commit_alter= true;
+ rgi->sa_info->state= start_alter_state::COMPLETED;
+ mysql_cond_broadcast(&rgi->sa_info->start_alter_cond);
+ mysql_mutex_unlock(&mi->start_alter_lock);
+ if (global_system_variables.log_warnings > 2)
+ {
+ rpl_gtid *gtid= &rgi->current_gtid;
+ sql_print_information("Start Alter Query '%s' "
+ "GTID %u-%u-%llu having a temporary error %d code "
+ "has been unsuccessfully retried %lu times; its "
+ "parallel optimistic execution now proceeds in "
+ "legacy mode",
+ static_cast<Query_log_event*>(ev)->query,
+ gtid->domain_id, gtid->server_id, gtid->seq_no,
+ thd->get_stmt_da()->sql_errno(), retries - 1);
+ }
+ thd->clear_error();
+ thd->reset_killed();
+ rgi->killed_for_retry = rpl_group_info::RETRY_KILL_NONE;
+
+ rc= false;
+ }
+
+ return rc;
}
@@ -5570,8 +5623,10 @@ pthread_handler_t handle_slave_sql(void *arg)
err:
if (mi->using_parallel())
+ {
rli->parallel.wait_for_done(thd, rli);
- /* Gtid_list_log_event::do_apply_event has already reported the GTID until */
+ };
+ /* Gtid_list_log_event::do_apply_event has already reported the GTID until */
if (rli->stop_for_until && rli->until_condition != Relay_log_info::UNTIL_GTID)
{
if (global_system_variables.log_warnings > 2)
@@ -6294,12 +6349,19 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len)
Rotate_log_event rev(buf, checksum_alg != BINLOG_CHECKSUM_ALG_OFF ?
event_len - BINLOG_CHECKSUM_LEN : event_len,
mi->rli.relay_log.description_event_for_queue);
-
- if (unlikely(mi->gtid_reconnect_event_skip_count) &&
- unlikely(!mi->gtid_event_seen) &&
- rev.is_artificial_event() &&
- (mi->prev_master_id != mi->master_id ||
- strcmp(rev.new_log_ident, mi->master_log_name) != 0))
+ bool master_changed= false;
+ bool maybe_crashed= false;
+ // Exclude server start scenario
+ if ((mi->prev_master_id && mi->master_id) &&
+ (mi->prev_master_id != mi->master_id))
+ master_changed= true;
+ if ((mi->master_log_name[0]!='\0') &&
+ (strcmp(rev.new_log_ident, mi->master_log_name) != 0))
+ maybe_crashed= true;
+
+ if (unlikely((mi->gtid_reconnect_event_skip_count && master_changed) ||
+ maybe_crashed) &&
+ unlikely(!mi->gtid_event_seen) && rev.is_artificial_event())
{
/*
Artificial Rotate_log_event is the first event we receive at the start
@@ -6335,26 +6397,37 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len)
case likewise rollback the partially received event group.
*/
Format_description_log_event fdle(4);
+ fdle.checksum_alg= checksum_alg;
- if (mi->prev_master_id != mi->master_id)
- sql_print_warning("The server_id of master server changed in the "
- "middle of GTID %u-%u-%llu. Assuming a change of "
- "master server, so rolling back the previously "
- "received partial transaction. Expected: %lu, "
- "received: %lu", mi->last_queued_gtid.domain_id,
- mi->last_queued_gtid.server_id,
- mi->last_queued_gtid.seq_no,
- mi->prev_master_id, mi->master_id);
- else if (strcmp(rev.new_log_ident, mi->master_log_name) != 0)
- sql_print_warning("Unexpected change of master binlog file name in the "
- "middle of GTID %u-%u-%llu, assuming that master has "
- "crashed and rolling back the transaction. Expected: "
- "'%s', received: '%s'",
- mi->last_queued_gtid.domain_id,
- mi->last_queued_gtid.server_id,
- mi->last_queued_gtid.seq_no,
- mi->master_log_name, rev.new_log_ident);
+ /*
+ Possible crash is flagged in being created FD' common header
+ to conduct any necessary cleanup by the slave applier.
+ */
+ if (maybe_crashed)
+ fdle.flags |= LOG_EVENT_BINLOG_IN_USE_F;
+
+ if (mi->gtid_reconnect_event_skip_count)
+ {
+ if (master_changed)
+ sql_print_warning("The server_id of master server changed in the "
+ "middle of GTID %u-%u-%llu. Assuming a change of "
+ "master server, so rolling back the previously "
+ "received partial transaction. Expected: %lu, "
+ "received: %lu", mi->last_queued_gtid.domain_id,
+ mi->last_queued_gtid.server_id,
+ mi->last_queued_gtid.seq_no,
+ mi->prev_master_id, mi->master_id);
+ else
+ sql_print_warning("Unexpected change of master binlog file name in "
+ "the middle of GTID %u-%u-%llu, assuming that "
+ "master has crashed and rolling back the "
+ "transaction. Expected: '%s', received: '%s'",
+ mi->last_queued_gtid.domain_id,
+ mi->last_queued_gtid.server_id,
+ mi->last_queued_gtid.seq_no, mi->master_log_name,
+ rev.new_log_ident);
+ }
mysql_mutex_lock(log_lock);
if (likely(!rli->relay_log.write_event(&fdle) &&
!rli->relay_log.flush_and_sync(NULL)))
diff --git a/sql/sql_alter.cc b/sql/sql_alter.cc
index e28308b4ff6..86c6e9a27f8 100644
--- a/sql/sql_alter.cc
+++ b/sql/sql_alter.cc
@@ -19,6 +19,9 @@
#include "sql_table.h" // mysql_alter_table,
// mysql_exchange_partition
#include "sql_alter.h"
+#include "rpl_mi.h"
+#include "slave.h"
+#include "debug_sync.h"
#include "wsrep_mysqld.h"
Alter_info::Alter_info(const Alter_info &rhs, MEM_ROOT *mem_root)
diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc
index bab2afb957a..baba60e9b6a 100644
--- a/sql/sql_binlog.cc
+++ b/sql/sql_binlog.cc
@@ -20,6 +20,7 @@
#include "sql_parse.h"
#include "sql_acl.h"
#include "rpl_rli.h"
+#include "rpl_mi.h"
#include "slave.h"
#include "log_event.h"
@@ -70,7 +71,8 @@ static int check_event_type(int type, Relay_log_info *rli)
/* It is always allowed to execute FD events. */
return 0;
-
+
+ case QUERY_EVENT:
case TABLE_MAP_EVENT:
case WRITE_ROWS_EVENT_V1:
case UPDATE_ROWS_EVENT_V1:
@@ -167,6 +169,69 @@ int binlog_defragment(THD *thd)
return 0;
}
+/**
+ Wraps Log_event::apply_event to save and restore
+ session context in case of Query_log_event.
+
+ @param ev replication event
+ @param rgi execution context for the event
+
+ @return
+ 0 on success,
+ non-zero otherwise.
+*/
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+int save_restore_context_apply_event(Log_event *ev, rpl_group_info *rgi)
+{
+ int err= 0;
+ THD *thd= rgi->thd;
+ Relay_log_info *rli= thd->rli_fake;
+ sql_digest_state *m_digest;
+ PSI_statement_locker *m_statement_psi;
+ LEX_CSTRING save_db;
+ my_thread_id m_thread_id= 0;
+ LEX_CSTRING connection_name= { STRING_WITH_LEN("BINLOG_BASE64_EVENT") };
+
+ DBUG_ASSERT(!rli->mi);
+
+ if (ev->get_type_code() == QUERY_EVENT)
+ {
+ m_digest= thd->m_digest;
+ m_statement_psi= thd->m_statement_psi;
+ m_thread_id= thd->variables.pseudo_thread_id;
+ thd->system_thread_info.rpl_sql_info= NULL;
+
+ if ((rli->mi= new Master_info(&connection_name, false)))
+ {
+ save_db= thd->db;
+ thd->reset_db(&null_clex_str);
+ }
+ else
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ err= -1;
+ goto end;
+ }
+ thd->m_digest= NULL;
+ thd->m_statement_psi= NULL;
+ }
+
+ err= ev->apply_event(rgi);
+
+end:
+ if (ev->get_type_code() == QUERY_EVENT)
+ {
+ thd->m_digest= m_digest;
+ thd->m_statement_psi= m_statement_psi;
+ thd->variables.pseudo_thread_id= m_thread_id;
+ thd->reset_db(&save_db);
+ delete rli->mi;
+ rli->mi= NULL;
+ }
+
+ return err;
+}
+#endif
/**
Execute a BINLOG statement.
@@ -216,11 +281,9 @@ void mysql_client_binlog_statement(THD* thd)
if (!(rgi= thd->rgi_fake))
rgi= thd->rgi_fake= new rpl_group_info(rli);
rgi->thd= thd;
-
const char *error= 0;
Log_event *ev = 0;
my_bool is_fragmented= FALSE;
-
/*
Out of memory check
*/
@@ -373,7 +436,7 @@ void mysql_client_binlog_statement(THD* thd)
LEX *backup_lex;
thd->backup_and_reset_current_lex(&backup_lex);
- err= ev->apply_event(rgi);
+ err= save_restore_context_apply_event(ev, rgi);
thd->restore_current_lex(backup_lex);
}
thd->variables.option_bits=
@@ -389,7 +452,7 @@ void mysql_client_binlog_statement(THD* thd)
i.e. when this thread terminates.
*/
if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
- delete ev;
+ delete ev;
ev= 0;
if (err)
{
@@ -397,7 +460,8 @@ void mysql_client_binlog_statement(THD* thd)
TODO: Maybe a better error message since the BINLOG statement
now contains several events.
*/
- my_error(ER_UNKNOWN_ERROR, MYF(0));
+ if (!thd->is_error())
+ my_error(ER_UNKNOWN_ERROR, MYF(0));
goto end;
}
}
@@ -413,5 +477,7 @@ end:
thd->variables.option_bits= thd_options;
rgi->slave_close_thread_tables(thd);
my_free(buf);
+ delete rgi;
+ rgi= thd->rgi_fake= NULL;
DBUG_VOID_RETURN;
}
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 29824301e9d..d3b4ead40a0 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -5286,6 +5286,16 @@ thd_rpl_deadlock_check(MYSQL_THD thd, MYSQL_THD other_thd)
return 0;
if (rgi->gtid_sub_id > other_rgi->gtid_sub_id)
return 0;
+ if (rgi->finish_event_group_called || other_rgi->finish_event_group_called)
+ {
+ /*
+ If either of two transactions has already performed commit
+ (e.g split ALTER, asserted below) there won't be any deadlock.
+ */
+ DBUG_ASSERT(rgi->sa_info || other_rgi->sa_info);
+
+ return 0;
+ }
/*
This transaction is about to wait for another transaction that is required
by replication binlog order to commit after. This would cause a deadlock.
diff --git a/sql/sql_class.h b/sql/sql_class.h
index f998b21856b..af424d100a8 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -81,6 +81,7 @@ class Wsrep_applier_service;
class Reprepare_observer;
class Relay_log_info;
struct rpl_group_info;
+struct rpl_parallel_thread;
class Rpl_filter;
class Query_log_event;
class Load_log_event;
@@ -853,6 +854,7 @@ typedef struct system_variables
vers_asof_timestamp_t vers_asof_timestamp;
ulong vers_alter_history;
+ ulong binlog_alter_two_phase;
} SV;
/**
@@ -2996,6 +2998,11 @@ public:
}
bool binlog_table_should_be_logged(const LEX_CSTRING *db);
+ // Accessors and setters of two-phase loggable ALTER binlog properties
+ uchar get_binlog_flags_for_alter();
+ void set_binlog_flags_for_alter(uchar);
+ uint64 get_binlog_start_alter_seq_no();
+ void set_binlog_start_alter_seq_no(uint64);
#endif /* MYSQL_CLIENT */
public:
@@ -7777,5 +7784,38 @@ extern THD_list server_threads;
void setup_tmp_table_column_bitmaps(TABLE *table, uchar *bitmaps,
uint field_count);
+/*
+ RAII utility class to ease binlogging with temporary setting
+ THD etc context and restoring the original one upon logger execution.
+*/
+class Write_log_with_flags
+{
+ THD* m_thd;
+#ifdef WITH_WSREP
+ bool wsrep_to_isolation;
+#endif
+
+public:
+~Write_log_with_flags()
+ {
+ m_thd->set_binlog_flags_for_alter(0);
+ m_thd->set_binlog_start_alter_seq_no(0);
+#ifdef WITH_WSREP
+ if (wsrep_to_isolation)
+ wsrep_to_isolation_end(m_thd);
+#endif
+ }
+
+ Write_log_with_flags(THD *thd, uchar flags,
+ bool do_wsrep_iso __attribute__((unused))= false) :
+ m_thd(thd)
+ {
+ m_thd->set_binlog_flags_for_alter(flags);
+#ifdef WITH_WSREP
+ wsrep_to_isolation= do_wsrep_iso && WSREP(m_thd);
+#endif
+ }
+};
+
#endif /* MYSQL_SERVER */
#endif /* SQL_CLASS_INCLUDED */
diff --git a/sql/sql_table.cc b/sql/sql_table.cc
index 03f46bdc5ea..6930808ce19 100644
--- a/sql/sql_table.cc
+++ b/sql/sql_table.cc
@@ -19,6 +19,7 @@
/* drop and alter of tables */
#include "mariadb.h"
+#include "sql_class.h"
#include "sql_priv.h"
#include "unireg.h"
#include "debug_sync.h"
@@ -58,6 +59,10 @@
#include "ddl_log.h"
#include "debug.h" // debug_crash_here()
#include <algorithm>
+#include "rpl_mi.h"
+#include "rpl_rli.h"
+#include "log.h"
+
#ifdef _WIN32
#include <io.h>
@@ -89,6 +94,12 @@ static int mysql_prepare_create_table(THD *, HA_CREATE_INFO *, Alter_info *,
static uint blob_length_by_type(enum_field_types type);
static bool fix_constraints_names(THD *, List<Virtual_column_info> *,
const HA_CREATE_INFO *);
+static bool wait_for_master(THD *thd);
+static int process_master_state(THD *thd, int alter_result,
+ uint64 &start_alter_id, bool if_exists);
+static bool
+write_bin_log_start_alter_rollback(THD *thd, uint64 &start_alter_id,
+ bool &partial_alter, bool if_exists);
/**
@brief Helper function for explain_filename
@@ -992,7 +1003,16 @@ int write_bin_log(THD *thd, bool clear_error,
int errcode= 0;
thd_proc_info(thd, "Writing to binlog");
if (clear_error)
+ {
+ if (global_system_variables.log_warnings > 2)
+ {
+ uint err_clear= thd->is_error() ? thd->get_stmt_da()->sql_errno() : 0;
+ if (err_clear)
+ sql_print_warning("Error code %d of query '%s' is cleared at its "
+ "binary logging.", err_clear, query);
+ }
thd->clear_error();
+ }
else
errcode= query_error_code(thd, TRUE);
error= thd->binlog_query(THD::STMT_QUERY_TYPE,
@@ -1004,21 +1024,40 @@ int write_bin_log(THD *thd, bool clear_error,
}
-/*
- Write to binary log with optional adding "IF EXISTS"
+/**
+ Write to binary log the query event whose text is taken from thd->query().
- The query is taken from thd->query()
+ @param thd Thread handle.
+ @param clear_error Whether to clear out any error from
+ execution context and binlog event.
+ @param is_trans Whether the query changed transactional data.
+ @param add_if_exists True indicates the binary logging of the query
+ should be done with "if exists" option.
+ @param commit_alter Whether the query should be binlogged as
+ Commit Alter.
+
+ @return false on Success
+ @return true otherwise
*/
int write_bin_log_with_if_exists(THD *thd, bool clear_error,
- bool is_trans, bool add_if_exists)
+ bool is_trans, bool add_if_exists,
+ bool commit_alter)
{
int result;
ulonglong save_option_bits= thd->variables.option_bits;
if (add_if_exists)
thd->variables.option_bits|= OPTION_IF_EXISTS;
+ if (commit_alter)
+ thd->set_binlog_flags_for_alter(Gtid_log_event::FL_COMMIT_ALTER_E1);
+
result= write_bin_log(thd, clear_error, thd->query(), thd->query_length(),
is_trans);
+ if (commit_alter)
+ {
+ thd->set_binlog_flags_for_alter(0);
+ thd->set_binlog_start_alter_seq_no(0);
+ }
thd->variables.option_bits= save_option_bits;
return result;
}
@@ -7296,6 +7335,86 @@ static bool notify_tabledef_changed(TABLE_LIST *table_list)
DBUG_RETURN(0);
}
+/**
+ The function is invoked in error branches of ALTER processing.
+ Write Rollback alter in case of partial_alter is true else
+ call process_master_state.
+
+ @param thd Thread handle.
+ @param[in/out]
+ start_alter_id Start Alter identifier or zero,
+ it is reset to zero.
+ @param[in/out]
+ partial_alter When is true at the function enter
+ that indicates Start Alter phase completed;
+ it then is reset to false.
+ @param if_exists True indicates the binary logging of the query
+ should be done with "if exists" option.
+
+ @return false on Success
+ @return true otherwise
+*/
+static bool
+write_bin_log_start_alter_rollback(THD *thd, uint64 &start_alter_id,
+ bool &partial_alter, bool if_exists)
+{
+#if defined(HAVE_REPLICATION)
+ if (start_alter_id)
+ {
+ start_alter_info *info= thd->rgi_slave->sa_info;
+ Master_info *mi= thd->rgi_slave->rli->mi;
+
+ if (info->sa_seq_no == 0)
+ {
+ /*
+ Error occurred before SA got to processing incl its binlogging.
+ So it's a failure to apply and thus no need to wait for master's
+ completion result.
+ */
+ return true;
+ }
+ mysql_mutex_lock(&mi->start_alter_lock);
+ if (info->direct_commit_alter)
+ {
+ DBUG_ASSERT(info->state == start_alter_state::ROLLBACK_ALTER);
+
+ /*
+ SA may end up in the rollback state through FTWRL that breaks
+ SA's waiting for a master decision.
+ Then it completes "officially", and `direct_commit_alter` true status
+ will affect the future of CA to re-execute the whole query.
+ */
+ info->state= start_alter_state::COMPLETED;
+ if (info->direct_commit_alter)
+ mysql_cond_broadcast(&info->start_alter_cond);
+ mysql_mutex_unlock(&mi->start_alter_lock);
+
+ return true; // not really an error to be handled by caller specifically
+ }
+ mysql_mutex_unlock(&mi->start_alter_lock);
+ /*
+ We have to call wait for master here because in main calculation
+ we can error out before calling wait for master
+ (for example if copy_data_between_tables fails)
+ */
+ if (info->state == start_alter_state::REGISTERED)
+ wait_for_master(thd);
+ if(process_master_state(thd, 1, start_alter_id, if_exists))
+ return true;
+ }
+ else
+#endif
+ if (partial_alter) // Write only if SA written
+ {
+ // Send the rollback message
+ Write_log_with_flags wlwf(thd, Gtid_log_event::FL_ROLLBACK_ALTER_E1);
+ if(write_bin_log_with_if_exists(thd, false, false, if_exists, false))
+ return true;
+ partial_alter= false;
+ }
+ return false;
+}
+
/**
Perform in-place alter table.
@@ -7309,9 +7428,17 @@ static bool notify_tabledef_changed(TABLE_LIST *table_list)
used during different phases.
@param target_mdl_request Metadata request/lock on the target table name.
@param alter_ctx ALTER TABLE runtime context.
-
- @retval true Error
- @retval false Success
+ @param partial_alter Is set to true to return the fact of the first
+ "START ALTER" binlogging phase is done.
+ @param[in/out]
+ start_alter_id Gtid seq_no of START ALTER or zero otherwise;
+ it may get changed to return to the caller.
+ @param if_exists True indicates the binary logging of the query
+ should be done with "if exists" option.
+
+ @retval >=1 Error{ 1= ROLLBACK recieved from master , 2= error
+ in alter so no ROLLBACK in binlog }
+ @retval 0 Success
@note
If mysql_alter_table does not need to copy the table, it is
@@ -7334,13 +7461,16 @@ static bool mysql_inplace_alter_table(THD *thd,
MDL_request *target_mdl_request,
DDL_LOG_STATE *ddl_log_state,
TRIGGER_RENAME_PARAM *trigger_param,
- Alter_table_ctx *alter_ctx)
+ Alter_table_ctx *alter_ctx,
+ bool &partial_alter,
+ uint64 &start_alter_id, bool if_exists)
{
Open_table_context ot_ctx(thd, MYSQL_OPEN_REOPEN | MYSQL_OPEN_IGNORE_KILLED);
handlerton *db_type= table->s->db_type();
Alter_info *alter_info= ha_alter_info->alter_info;
bool reopen_tables= false;
bool res, commit_succeded_with_error= 0;
+
const enum_alter_inplace_result inplace_supported=
ha_alter_info->inplace_supported;
DBUG_ENTER("mysql_inplace_alter_table");
@@ -7421,10 +7551,31 @@ static bool mysql_inplace_alter_table(THD *thd,
thd->variables.lock_wait_timeout))
goto cleanup;
+ DBUG_ASSERT(table->s->tmp_table == NO_TMP_TABLE || start_alter_id == 0);
+
+ if (table->s->tmp_table == NO_TMP_TABLE)
+ {
+ if (write_bin_log_start_alter(thd, partial_alter, start_alter_id,
+ if_exists))
+ goto cleanup;
+ }
+ else if (start_alter_id)
+ {
+ DBUG_ASSERT(thd->rgi_slave);
+
+ my_error(ER_INCONSISTENT_SLAVE_TEMP_TABLE, MYF(0), thd->query(),
+ table_list->db.str, table_list->table_name.str);
+ goto cleanup;
+ }
+
+ DBUG_EXECUTE_IF("start_alter_kill_after_binlog", {
+ DBUG_SUICIDE();
+ });
+
+
// It's now safe to take the table level lock.
if (lock_tables(thd, table_list, alter_ctx->tables_opened, 0))
goto cleanup;
-
DEBUG_SYNC(thd, "alter_table_inplace_after_lock_upgrade");
THD_STAGE_INFO(thd, stage_alter_inplace_prepare);
@@ -7503,14 +7654,23 @@ static bool mysql_inplace_alter_table(THD *thd,
DEBUG_SYNC(thd, "alter_table_inplace_after_lock_downgrade");
THD_STAGE_INFO(thd, stage_alter_inplace);
+ DBUG_EXECUTE_IF("start_alter_delay_master", {
+ debug_sync_set_action(thd,
+ STRING_WITH_LEN("now wait_for alter_cont"));
+ });
/* We can abort alter table for any table type */
thd->abort_on_warning= !ha_alter_info->ignore && thd->is_strict_mode();
res= table->file->ha_inplace_alter_table(altered_table, ha_alter_info);
thd->abort_on_warning= false;
+
+ if (start_alter_id && wait_for_master(thd))
+ goto rollback;
+
if (res)
goto rollback;
+
DEBUG_SYNC(thd, "alter_table_inplace_before_lock_upgrade");
// Upgrade to EXCLUSIVE before commit.
if (wait_while_table_is_used(thd, table, HA_EXTRA_PREPARE_FOR_RENAME))
@@ -7619,7 +7779,7 @@ static bool mysql_inplace_alter_table(THD *thd,
thd->is_error())
{
// Since changes were done in-place, we can't revert them.
- DBUG_RETURN(true);
+ goto err;
}
debug_crash_here("ddl_log_alter_after_rename_frm");
@@ -7636,7 +7796,7 @@ static bool mysql_inplace_alter_table(THD *thd,
If the rename fails we will still have a working table
with the old name, but with other changes applied.
*/
- DBUG_RETURN(true);
+ goto err;
}
debug_crash_here("ddl_log_alter_before_rename_triggers");
if (Table_triggers_list::change_table_name(thd, trigger_param,
@@ -7681,6 +7841,8 @@ static bool mysql_inplace_alter_table(THD *thd,
if (thd->locked_tables_list.reopen_tables(thd, false))
thd->locked_tables_list.unlink_all_closed_tables(thd, NULL, 0);
}
+
+err:
DBUG_RETURN(true);
}
@@ -9383,6 +9545,153 @@ static bool log_and_ok(THD *thd)
return(0);
}
+/*
+ Wait for master to send result of Alter table.
+ Returns
+ true when Rollback is decided
+ false otherwise
+*/
+static bool wait_for_master(THD *thd)
+{
+#ifdef HAVE_REPLICATION
+ start_alter_info* info= thd->rgi_slave->sa_info;
+ Master_info *mi= thd->rgi_slave->rli->mi;
+
+ DBUG_ASSERT(info);
+ DBUG_ASSERT(info->state != start_alter_state::INVALID);
+ DBUG_ASSERT(mi);
+
+ mysql_mutex_lock(&mi->start_alter_lock);
+
+ DBUG_ASSERT(!info->direct_commit_alter ||
+ info->state == start_alter_state::ROLLBACK_ALTER);
+
+ while (info->state == start_alter_state::REGISTERED)
+ {
+ mysql_cond_wait(&info->start_alter_cond, &mi->start_alter_lock);
+ }
+ if (info->state == start_alter_state::ROLLBACK_ALTER)
+ {
+ /*
+ SA thread will not give error , We will set the error in info->error
+ and then RA worker will output the error
+ We can modify the info->error without taking mutex, because CA worker
+ will be waiting on ::COMPLETED wait condition
+ */
+ if(thd->is_error())
+ {
+ info->error= thd->get_stmt_da()->sql_errno();
+ thd->clear_error();
+ thd->reset_killed();
+ }
+ }
+ mysql_mutex_unlock(&mi->start_alter_lock);
+
+ return info->state == start_alter_state::ROLLBACK_ALTER;
+#else
+ return 0;
+#endif
+}
+
+#ifdef HAVE_REPLICATION
+/**
+ In this function, we are going to change info->state to ::COMPLETED.
+ This means we are messaging CA/RA worker that we have binlogged, so our
+ here is finished.
+
+ @param thd Thread handle
+ @param start_alter_state ALTER replicaton execution context
+ @param mi Master_info of the replication source
+*/
+static void alter_committed(THD *thd, start_alter_info* info, Master_info *mi)
+{
+ start_alter_state tmp= info->state;
+ mysql_mutex_lock(&mi->start_alter_lock);
+ info->state= start_alter_state::COMPLETED;
+ mysql_cond_broadcast(&info->start_alter_cond);
+ mysql_mutex_unlock(&mi->start_alter_lock);
+ if (tmp == start_alter_state::ROLLBACK_ALTER)
+ {
+ thd->clear_error();
+ thd->reset_killed();
+ }
+}
+#endif
+
+/**
+ process_master_state:- process the info->state recieved from master
+ We will comapre master state with alter_result
+ In the case of ROLLBACK_ALTER alter_result > 0
+ In the case of COMMIT_ALTER alter_result == 0
+ if the condition is not satisfied we will report error and
+ Return 1. Make sure wait_for_master is called before calling this function
+ This function should be called only at the write binlog time of commit/
+ rollback alter. We will alter_committed if everything is fine.
+
+ @param thd Thread handle.
+ @param alter_result Result of execution.
+ @param[in/out]
+ start_alter_id Start Alter identifier or zero,
+ it is reset to zero.
+ @param if_exists True indicates the binary logging of the query
+ should be done with "if exists" option.
+ @retval 1 error
+ @retval 0 Ok
+*/
+static int process_master_state(THD *thd, int alter_result,
+ uint64 &start_alter_id, bool if_exists)
+{
+#ifdef HAVE_REPLICATION
+ start_alter_info *info= thd->rgi_slave->sa_info;
+ bool partial_alter= false;
+
+ if (info->state == start_alter_state::INVALID)
+ {
+ /* the caller has not yet called SA logging nor wait for master decision */
+ if (!write_bin_log_start_alter(thd, partial_alter, start_alter_id,
+ if_exists))
+ wait_for_master(thd);
+
+ DBUG_ASSERT(!partial_alter);
+ }
+
+ /* this function shouldn't be called twice */
+ DBUG_ASSERT(start_alter_id);
+
+ start_alter_id= 0;
+ if ((info->state == start_alter_state::ROLLBACK_ALTER && alter_result >= 0)
+ || (info->state == start_alter_state::COMMIT_ALTER && !alter_result))
+ {
+ alter_committed(thd, info, thd->rgi_slave->rli->mi);
+ return 0;
+ }
+ else
+ {
+ thd->is_slave_error= 1;
+ return 1;
+ }
+#else
+ return 0;
+#endif
+}
+
+/*
+ Returns a (global unique) identifier of START Alter when slave applier
+ executes mysql_alter_table().
+ In non-slave context it is zero.
+*/
+static uint64 get_start_alter_id(THD *thd)
+{
+ DBUG_ASSERT(!(thd->rgi_slave &&
+ (thd->rgi_slave->gtid_ev_flags_extra &
+ Gtid_log_event::FL_START_ALTER_E1)) ||
+ !thd->rgi_slave->direct_commit_alter);
+ return
+ thd->rgi_slave &&
+ (thd->rgi_slave->gtid_ev_flags_extra & Gtid_log_event::FL_START_ALTER_E1) ?
+ thd->variables.gtid_seq_no : 0;
+}
+
/**
Alter table
@@ -9440,6 +9749,14 @@ bool mysql_alter_table(THD *thd, const LEX_CSTRING *new_db,
bool partition_changed= false;
bool fast_alter_partition= false;
#endif
+ bool partial_alter= false;
+ /*
+ start_alter_id is the gtid seq no of the START Alter - the 1st part
+ of two-phase loggable ALTER. The variable is meaningful only
+ on slave execution.
+ */
+ uint64 start_alter_id= get_start_alter_id(thd);
+
/*
Create .FRM for new version of table with a temporary name.
We don't log the statement, it will be logged later.
@@ -9578,6 +9895,7 @@ bool mysql_alter_table(THD *thd, const LEX_CSTRING *new_db,
}
table= table_list->table;
+ bool is_reg_table= table->s->tmp_table == NO_TMP_TABLE;
#ifdef WITH_WSREP
if (WSREP(thd) &&
@@ -10358,7 +10676,8 @@ do_continue:;
&ha_alter_info,
&target_mdl_request, &ddl_log_state,
&trigger_param,
- &alter_ctx);
+ &alter_ctx, partial_alter,
+ start_alter_id, if_exists);
thd->count_cuted_fields= org_count_cuted_fields;
inplace_alter_table_committed= ha_alter_info.inplace_alter_table_committed;
inplace_alter_table_committed_argument=
@@ -10413,6 +10732,25 @@ do_continue:;
else
thd->close_unused_temporary_table_instances(table_list);
+ if (table->s->tmp_table == NO_TMP_TABLE)
+ {
+ if (write_bin_log_start_alter(thd, partial_alter, start_alter_id,
+ if_exists))
+ goto err_new_table_cleanup;
+ }
+ else if (start_alter_id)
+ {
+ DBUG_ASSERT(thd->rgi_slave);
+
+ my_error(ER_INCONSISTENT_SLAVE_TEMP_TABLE, MYF(0), thd->query(),
+ table_list->db.str, table_list->table_name.str);
+ goto err_new_table_cleanup;
+ }
+
+ DBUG_EXECUTE_IF("start_alter_delay_master", {
+ debug_sync_set_action(thd,
+ STRING_WITH_LEN("now wait_for alter_cont"));
+ });
// It's now safe to take the table level lock.
if (lock_tables(thd, table_list, alter_ctx.tables_opened,
MYSQL_LOCK_USE_MALLOC))
@@ -10525,6 +10863,13 @@ do_continue:;
}
thd->count_cuted_fields= CHECK_FIELD_IGNORE;
+ if (start_alter_id)
+ {
+ DBUG_ASSERT(thd->slave_thread);
+
+ if (wait_for_master(thd))
+ goto err_new_table_cleanup;
+ }
if (table->s->tmp_table != NO_TMP_TABLE)
{
/* Release lock if this is a transactional temporary table */
@@ -10568,6 +10913,9 @@ do_continue:;
binlog_commit(thd, true);
}
+ DBUG_ASSERT(!start_alter_id); // no 2 phase logging for
+ DBUG_ASSERT(!partial_alter); // temporary table alter
+
/* We don't replicate alter table statement on temporary tables */
if (!thd->is_current_stmt_binlog_format_row() &&
table_creation_was_logged &&
@@ -10806,12 +11154,26 @@ end_inplace:
DBUG_ASSERT(!(mysql_bin_log.is_open() &&
thd->is_current_stmt_binlog_format_row() &&
(create_info->tmp_table())));
- if (!binlog_as_create_select)
+
+ if(start_alter_id)
+ {
+ if (!is_reg_table)
+ {
+ my_error(ER_INCONSISTENT_SLAVE_TEMP_TABLE, MYF(0), thd->query(),
+ table_list->db.str, table_list->table_name.str);
+ DBUG_RETURN(true);
+ }
+
+ if (process_master_state(thd, 0, start_alter_id, if_exists))
+ DBUG_RETURN(true);
+ }
+ else if (!binlog_as_create_select)
{
int tmp_error;
thd->binlog_xid= thd->query_id;
ddl_log_update_xid(&ddl_log_state, thd->binlog_xid);
- tmp_error= write_bin_log_with_if_exists(thd, true, false, log_if_exists);
+ tmp_error= write_bin_log_with_if_exists(thd, true, false, log_if_exists,
+ partial_alter);
thd->binlog_xid= 0;
if (tmp_error)
goto err_cleanup;
@@ -10908,6 +11270,10 @@ err_cleanup:
/* Signal to storage engine that ddl log is committed */
(*inplace_alter_table_committed)(inplace_alter_table_committed_argument);
}
+ DEBUG_SYNC(thd, "alter_table_after_temp_table_drop");
+ if (partial_alter || start_alter_id)
+ write_bin_log_start_alter_rollback(thd, start_alter_id, partial_alter,
+ if_exists);
DBUG_RETURN(true);
err_with_mdl:
diff --git a/sql/sql_table.h b/sql/sql_table.h
index eaa03bfaf8c..e3cfec9ca93 100644
--- a/sql/sql_table.h
+++ b/sql/sql_table.h
@@ -211,7 +211,8 @@ int write_bin_log(THD *thd, bool clear_error,
char const *query, ulong query_length,
bool is_trans= FALSE);
int write_bin_log_with_if_exists(THD *thd, bool clear_error,
- bool is_trans, bool add_if_exists);
+ bool is_trans, bool add_if_exists,
+ bool commit_alter= false);
void promote_first_timestamp_column(List<Create_field> *column_definitions);
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index 7f2c34bae3e..a0c4c5763ed 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -2409,6 +2409,15 @@ static Sys_var_bit Sys_skip_parallel_replication(
SESSION_ONLY(option_bits), NO_CMD_LINE, OPTION_RPL_SKIP_PARALLEL,
DEFAULT(FALSE));
+static const char *binlog_alter_two_phase_enum[]=
+ {"No","Yes", NullS};
+static Sys_var_enum Sys_binlog_alter_two_phase(
+ "binlog_alter_two_phase",
+ "When set, split ALTER at binary logging into 2 statements: "
+ "START ALTER and COMMIT/ROLLBACK ALTER",
+ SESSION_VAR(binlog_alter_two_phase), CMD_LINE(REQUIRED_ARG),
+ binlog_alter_two_phase_enum,
+ DEFAULT(0));
static bool
check_gtid_ignore_duplicates(sys_var *self, THD *thd, set_var *var)
diff --git a/sql/temporary_tables.cc b/sql/temporary_tables.cc
index 981958abdb2..c582bf70e30 100644
--- a/sql/temporary_tables.cc
+++ b/sql/temporary_tables.cc
@@ -340,7 +340,8 @@ bool THD::open_temporary_table(TABLE_LIST *tl)
*/
DBUG_ASSERT(!tl->derived);
DBUG_ASSERT(!tl->schema_table);
- DBUG_ASSERT(has_temporary_tables());
+ DBUG_ASSERT(has_temporary_tables() ||
+ (rgi_slave && rgi_slave->is_parallel_exec));
if (tl->open_type == OT_BASE_ONLY)
{
@@ -872,12 +873,20 @@ void THD::restore_tmp_table_share(TMP_TABLE_SHARE *share)
bool THD::has_temporary_tables()
{
DBUG_ENTER("THD::has_temporary_tables");
- bool result=
+ bool result;
#ifdef HAVE_REPLICATION
- rgi_slave ? (rgi_slave->rli->save_temporary_tables &&
- !rgi_slave->rli->save_temporary_tables->is_empty()) :
+ if (rgi_slave)
+ {
+ mysql_mutex_lock(&rgi_slave->rli->data_lock);
+ result= rgi_slave->rli->save_temporary_tables &&
+ !rgi_slave->rli->save_temporary_tables->is_empty();
+ mysql_mutex_unlock(&rgi_slave->rli->data_lock);
+ }
+ else
#endif
- has_thd_temporary_tables();
+ {
+ result= has_thd_temporary_tables();
+ }
DBUG_RETURN(result);
}