summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/handler.cc5
-rw-r--r--sql/log.h1
-rw-r--r--sql/log_event.cc18
-rw-r--r--sql/mysqld.cc53
-rw-r--r--sql/mysqld.h6
-rw-r--r--sql/rpl_gtid.cc274
-rw-r--r--sql/rpl_gtid.h64
-rw-r--r--sql/rpl_mi.cc29
-rw-r--r--sql/rpl_mi.h2
-rw-r--r--sql/rpl_parallel.cc4
-rw-r--r--sql/rpl_rli.cc483
-rw-r--r--sql/rpl_rli.h1
-rw-r--r--sql/set_var.cc219
-rw-r--r--sql/set_var.h7
-rw-r--r--sql/slave.cc226
-rw-r--r--sql/slave.h3
-rw-r--r--sql/sql_plugin.cc30
-rw-r--r--sql/sys_vars.cc39
-rw-r--r--sql/sys_vars.ic110
19 files changed, 1459 insertions, 115 deletions
diff --git a/sql/handler.cc b/sql/handler.cc
index c74675ed113..7a1cec3b6f2 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -1546,6 +1546,7 @@ static int
commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans)
{
int error= 0;
+ uint count= 0;
Ha_trx_info *ha_info= trans->ha_list, *ha_info_next;
DBUG_ENTER("commit_one_phase_2");
if (is_real_trans)
@@ -1563,6 +1564,8 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans)
}
/* Should this be done only if is_real_trans is set ? */
status_var_increment(thd->status_var.ha_commit_count);
+ if (is_real_trans && ht != binlog_hton && ha_info->is_trx_read_write())
+ ++count;
ha_info_next= ha_info->next();
ha_info->reset(); /* keep it conveniently zero-filled */
}
@@ -1581,6 +1584,8 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans)
{
thd->has_waiter= false;
thd->transaction.cleanup();
+ if (count >= 2)
+ statistic_increment(transactions_multi_engine, LOCK_status);
}
DBUG_RETURN(error);
diff --git a/sql/log.h b/sql/log.h
index eaa63d4072d..f57693f9d2a 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -1096,6 +1096,7 @@ void make_default_log_name(char **out, const char* log_ext, bool once);
void binlog_reset_cache(THD *thd);
extern MYSQL_PLUGIN_IMPORT MYSQL_BIN_LOG mysql_bin_log;
+extern handlerton *binlog_hton;
extern LOGGER logger;
extern const char *log_bin_index;
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 44a2bf782fd..6aaa9a657fa 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -5027,6 +5027,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
int expected_error,actual_error= 0;
Schema_specification_st db_options;
uint64 sub_id= 0;
+ void *hton= NULL;
rpl_gtid gtid;
Relay_log_info const *rli= rgi->rli;
Rpl_filter *rpl_filter= rli->mi->rpl_filter;
@@ -5197,7 +5198,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
gtid= rgi->current_gtid;
if (rpl_global_gtid_slave_state->record_gtid(thd, &gtid, sub_id,
- true, false))
+ true, false, &hton))
{
int errcode= thd->get_stmt_da()->sql_errno();
if (!is_parallel_retry_error(rgi, errcode))
@@ -5418,7 +5419,7 @@ compare_errors:
end:
if (sub_id && !thd->is_slave_error)
- rpl_global_gtid_slave_state->update_state_hash(sub_id, &gtid, rgi);
+ rpl_global_gtid_slave_state->update_state_hash(sub_id, &gtid, hton, rgi);
/*
Probably we have set thd->query, thd->db, thd->catalog to point to places
@@ -7901,15 +7902,17 @@ Gtid_list_log_event::do_apply_event(rpl_group_info *rgi)
int ret;
if (gl_flags & FLAG_IGN_GTIDS)
{
+ void *hton= NULL;
uint32 i;
+
for (i= 0; i < count; ++i)
{
if ((ret= rpl_global_gtid_slave_state->record_gtid(thd, &list[i],
- sub_id_list[i],
- false, false)))
+ sub_id_list[i],
+ false, false, &hton)))
return ret;
rpl_global_gtid_slave_state->update_state_hash(sub_id_list[i], &list[i],
- NULL);
+ hton, NULL);
}
}
ret= Log_event::do_apply_event(rgi);
@@ -8390,6 +8393,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
rpl_gtid gtid;
uint64 sub_id= 0;
Relay_log_info const *rli= rgi->rli;
+ void *hton= NULL;
/*
XID_EVENT works like a COMMIT statement. And it also updates the
@@ -8414,7 +8418,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
gtid= rgi->current_gtid;
err= rpl_global_gtid_slave_state->record_gtid(thd, &gtid, sub_id, true,
- false);
+ false, &hton);
if (err)
{
int ec= thd->get_stmt_da()->sql_errno();
@@ -8447,7 +8451,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
thd->mdl_context.release_transactional_locks();
if (!res && sub_id)
- rpl_global_gtid_slave_state->update_state_hash(sub_id, &gtid, rgi);
+ rpl_global_gtid_slave_state->update_state_hash(sub_id, &gtid, hton, rgi);
/*
Increment the global status commit count variable
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index b8e0b44843c..c66f1c45c43 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -372,6 +372,8 @@ char *my_bind_addr_str;
static char *default_collation_name;
char *default_storage_engine, *default_tmp_storage_engine;
char *enforced_storage_engine=NULL;
+char *gtid_pos_auto_engines;
+plugin_ref *opt_gtid_pos_auto_plugins;
static char compiled_default_collation_name[]= MYSQL_DEFAULT_COLLATION_NAME;
static I_List<CONNECT> thread_cache;
static bool binlog_format_used= false;
@@ -523,6 +525,9 @@ ulong max_connections, max_connect_errors;
ulong extra_max_connections;
uint max_digest_length= 0;
ulong slave_retried_transactions;
+ulong transactions_multi_engine;
+ulong rpl_transactions_multi_engine;
+ulong transactions_gtid_foreign_engine;
ulonglong slave_skipped_errors;
ulong feature_files_opened_with_delayed_keys= 0, feature_check_constraint= 0;
ulonglong denied_connections;
@@ -4258,6 +4263,7 @@ static int init_common_variables()
default_storage_engine= const_cast<char *>("MyISAM");
#endif
default_tmp_storage_engine= NULL;
+ gtid_pos_auto_engines= const_cast<char *>("");
/*
Add server status variables to the dynamic list of
@@ -4937,6 +4943,34 @@ static int init_default_storage_engine_impl(const char *opt_name,
return 0;
}
+
+static int
+init_gtid_pos_auto_engines(void)
+{
+ plugin_ref *plugins;
+
+ /*
+ For the command-line option --gtid_pos_auto_engines, we allow (and ignore)
+ engines that are unknown. This is convenient, since it allows to set
+ default auto-create engines that might not be used by particular users.
+ The option sets a list of storage engines that will have gtid position
+ table auto-created for them if needed. And if the engine is not available,
+ then it will certainly not be needed.
+ */
+ if (gtid_pos_auto_engines)
+ plugins= resolve_engine_list(NULL, gtid_pos_auto_engines,
+ strlen(gtid_pos_auto_engines), false, false);
+ else
+ plugins= resolve_engine_list(NULL, "", 0, false, false);
+ if (!plugins)
+ return 1;
+ mysql_mutex_lock(&LOCK_global_system_variables);
+ opt_gtid_pos_auto_plugins= plugins;
+ mysql_mutex_unlock(&LOCK_global_system_variables);
+ return 0;
+}
+
+
static int init_server_components()
{
DBUG_ENTER("init_server_components");
@@ -5374,6 +5408,9 @@ static int init_server_components()
if (init_default_storage_engine(enforced_storage_engine, enforced_table_plugin))
unireg_abort(1);
+ if (init_gtid_pos_auto_engines())
+ unireg_abort(1);
+
#ifdef USE_ARIA_FOR_TMP_TABLES
if (!ha_storage_engine_is_enabled(maria_hton) && !opt_bootstrap)
{
@@ -7367,6 +7404,14 @@ struct my_option my_long_options[]=
"Set up signals usable for debugging. Deprecated, use --debug-gdb instead.",
&opt_debugging, &opt_debugging,
0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
+ {"gtid-pos-auto-engines", 0,
+ "List of engines for which to automatically create a "
+ "mysql.gtid_slave_pos_ENGINE table, if a transaction using that engine "
+ "is replicated. This can be used to avoid introducing cross-engine "
+ "transactions, if engines are used different from that used by table "
+ "mysql.gtid_slave_pos",
+ &gtid_pos_auto_engines, 0, 0, GET_STR, REQUIRED_ARG,
+ 0, 0, 0, 0, 0, 0 },
#ifdef HAVE_LARGE_PAGE_OPTION
{"super-large-pages", 0, "Enable support for super large pages.",
&opt_super_large_pages, &opt_super_large_pages, 0,
@@ -7764,7 +7809,7 @@ static int show_slaves_running(THD *thd, SHOW_VAR *var, char *buff)
var->type= SHOW_LONGLONG;
var->value= buff;
- *((longlong *)buff)= any_slave_sql_running();
+ *((longlong *)buff)= any_slave_sql_running(false);
return 0;
}
@@ -8539,6 +8584,9 @@ SHOW_VAR status_vars[]= {
{"Threads_connected", (char*) &connection_count, SHOW_INT},
{"Threads_created", (char*) &thread_created, SHOW_LONG_NOFLUSH},
{"Threads_running", (char*) &thread_running, SHOW_INT},
+ {"Transactions_multi_engine", (char*) &transactions_multi_engine, SHOW_LONG},
+ {"Rpl_transactions_multi_engine", (char*) &rpl_transactions_multi_engine, SHOW_LONG},
+ {"Transactions_gtid_foreign_engine", (char*) &transactions_gtid_foreign_engine, SHOW_LONG},
{"Update_scan", (char*) offsetof(STATUS_VAR, update_scan_count), SHOW_LONG_STATUS},
{"Uptime", (char*) &show_starttime, SHOW_SIMPLE_FUNC},
#ifdef ENABLED_PROFILING
@@ -8782,6 +8830,9 @@ static int mysql_init_variables(void)
report_user= report_password = report_host= 0; /* TO BE DELETED */
opt_relay_logname= opt_relaylog_index_name= 0;
slave_retried_transactions= 0;
+ transactions_multi_engine= 0;
+ rpl_transactions_multi_engine= 0;
+ transactions_gtid_foreign_engine= 0;
log_bin_basename= NULL;
log_bin_index= NULL;
diff --git a/sql/mysqld.h b/sql/mysqld.h
index 38e42dd61f1..6cf5a3776a0 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -19,6 +19,7 @@
#include <my_global.h> /* MYSQL_PLUGIN_IMPORT, FN_REFLEN, FN_EXTLEN */
#include "sql_basic_types.h" /* query_id_t */
+#include "sql_plugin.h"
#include "sql_bitmap.h" /* Bitmap */
#include "my_decimal.h" /* my_decimal */
#include "mysql_com.h" /* SERVER_VERSION_LENGTH */
@@ -130,6 +131,9 @@ extern my_bool opt_safe_show_db, opt_local_infile, opt_myisam_use_mmap;
extern my_bool opt_slave_compressed_protocol, use_temp_pool;
extern ulong slave_exec_mode_options, slave_ddl_exec_mode_options;
extern ulong slave_retried_transactions;
+extern ulong transactions_multi_engine;
+extern ulong rpl_transactions_multi_engine;
+extern ulong transactions_gtid_foreign_engine;
extern ulong slave_run_triggers_for_rbr;
extern ulonglong slave_type_conversions_options;
extern my_bool read_only, opt_readonly;
@@ -153,6 +157,8 @@ extern char *default_tz_name;
extern Time_zone *default_tz;
extern char *default_storage_engine, *default_tmp_storage_engine;
extern char *enforced_storage_engine;
+extern char *gtid_pos_auto_engines;
+extern plugin_ref *opt_gtid_pos_auto_plugins;
extern bool opt_endinfo, using_udf_functions;
extern my_bool locked_in_memory;
extern bool opt_using_transactions;
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index c385434e41e..fb57babd50f 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -26,6 +26,7 @@
#include "key.h"
#include "rpl_gtid.h"
#include "rpl_rli.h"
+#include "slave.h"
const LEX_STRING rpl_gtid_slave_state_table_name=
@@ -33,7 +34,7 @@ const LEX_STRING rpl_gtid_slave_state_table_name=
void
-rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid,
+rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid, void *hton,
rpl_group_info *rgi)
{
int err;
@@ -45,7 +46,7 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid,
it is even committed.
*/
mysql_mutex_lock(&LOCK_slave_state);
- err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, rgi);
+ err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, hton, rgi);
mysql_mutex_unlock(&LOCK_slave_state);
if (err)
{
@@ -74,12 +75,14 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
if (rgi->gtid_pending)
{
uint64 sub_id= rgi->gtid_sub_id;
+ void *hton= NULL;
+
rgi->gtid_pending= false;
if (rgi->gtid_ignore_duplicate_state!=rpl_group_info::GTID_DUPLICATE_IGNORE)
{
- if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false))
+ if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false, &hton))
DBUG_RETURN(1);
- update_state_hash(sub_id, &rgi->current_gtid, rgi);
+ update_state_hash(sub_id, &rgi->current_gtid, hton, rgi);
}
rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
}
@@ -243,7 +246,7 @@ rpl_slave_state_free_element(void *arg)
rpl_slave_state::rpl_slave_state()
- : last_sub_id(0), loaded(false)
+ : last_sub_id(0), gtid_pos_tables(0), loaded(false)
{
mysql_mutex_init(key_LOCK_slave_state, &LOCK_slave_state,
MY_MUTEX_INIT_SLOW);
@@ -255,6 +258,7 @@ rpl_slave_state::rpl_slave_state()
rpl_slave_state::~rpl_slave_state()
{
+ free_gtid_pos_tables((struct gtid_pos_table *)gtid_pos_tables);
truncate_hash();
my_hash_free(&hash);
delete_dynamic(&gtid_sort_array);
@@ -286,11 +290,12 @@ rpl_slave_state::truncate_hash()
int
rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
- uint64 seq_no, rpl_group_info *rgi)
+ uint64 seq_no, void *hton, rpl_group_info *rgi)
{
element *elem= NULL;
list_element *list_elem= NULL;
+ DBUG_ASSERT(hton || !loaded);
if (!(elem= get_element(domain_id)))
return 1;
@@ -335,6 +340,7 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
list_elem->server_id= server_id;
list_elem->sub_id= sub_id;
list_elem->seq_no= seq_no;
+ list_elem->hton= hton;
elem->add(list_elem);
if (last_sub_id < sub_id)
@@ -466,6 +472,94 @@ gtid_check_rpl_slave_state_table(TABLE *table)
/*
+ Attempt to find a mysql.gtid_slave_posXXX table that has a storage engine
+ that is already in use by the current transaction, if any.
+*/
+void
+rpl_slave_state::select_gtid_pos_table(THD *thd, LEX_STRING *out_tablename)
+{
+ struct gtid_pos_table *list, *table_entry, *default_entry;
+
+ /*
+ See comments on rpl_slave_state::gtid_pos_tables for rules around proper
+ access to the list.
+ */
+ list= (struct gtid_pos_table *)
+ my_atomic_loadptr_explicit(&gtid_pos_tables, MY_MEMORY_ORDER_ACQUIRE);
+
+ Ha_trx_info *ha_info;
+ uint count = 0;
+ for (ha_info= thd->transaction.all.ha_list; ha_info; ha_info= ha_info->next())
+ {
+ void *trx_hton= ha_info->ht();
+ table_entry= list;
+
+ if (!ha_info->is_trx_read_write() || trx_hton == binlog_hton)
+ continue;
+ while (table_entry)
+ {
+ if (table_entry->table_hton == trx_hton)
+ {
+ if (likely(table_entry->state == GTID_POS_AVAILABLE))
+ {
+ *out_tablename= table_entry->table_name;
+ /*
+ Check if this is a cross-engine transaction, so we can correctly
+ maintain the rpl_transactions_multi_engine status variable.
+ */
+ if (count >= 1)
+ statistic_increment(rpl_transactions_multi_engine, LOCK_status);
+ else
+ {
+ for (;;)
+ {
+ ha_info= ha_info->next();
+ if (!ha_info)
+ break;
+ if (ha_info->is_trx_read_write() && ha_info->ht() != binlog_hton)
+ {
+ statistic_increment(rpl_transactions_multi_engine, LOCK_status);
+ break;
+ }
+ }
+ }
+ return;
+ }
+ /*
+ This engine is marked to automatically create the table.
+ We cannot easily do this here (possibly in the middle of a
+ transaction). But we can request the slave background thread
+ to create it, and in a short while it should become available
+ for following transactions.
+ */
+#ifdef HAVE_REPLICATION
+ slave_background_gtid_pos_create_request(table_entry);
+#endif
+ break;
+ }
+ table_entry= table_entry->next;
+ }
+ ++count;
+ }
+ /*
+ If we cannot find any table whose engine matches an engine that is
+ already active in the transaction, or if there is no current transaction
+ engines available, we return the default gtid_slave_pos table.
+ */
+ default_entry= (struct gtid_pos_table *)
+ my_atomic_loadptr_explicit(&default_gtid_pos_table, MY_MEMORY_ORDER_ACQUIRE);
+ *out_tablename= default_entry->table_name;
+ /* Record in status that we failed to find a suitable gtid_pos table. */
+ if (count > 0)
+ {
+ statistic_increment(transactions_gtid_foreign_engine, LOCK_status);
+ if (count > 1)
+ statistic_increment(rpl_transactions_multi_engine, LOCK_status);
+ }
+}
+
+
+/*
Write a gtid to the replication slave state table.
Do it as part of the transaction, to get slave crash safety, or as a separate
@@ -481,19 +575,24 @@ gtid_check_rpl_slave_state_table(TABLE *table)
*/
int
rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
- bool in_transaction, bool in_statement)
+ bool in_transaction, bool in_statement,
+ void **out_hton)
{
TABLE_LIST tlist;
int err= 0;
bool table_opened= false;
TABLE *table;
- list_element *elist= 0, *next;
+ list_element *delete_list= 0, *next, *cur, **next_ptr_ptr, **best_ptr_ptr;
+ uint64_t best_sub_id;
element *elem;
ulonglong thd_saved_option= thd->variables.option_bits;
Query_tables_list lex_backup;
wait_for_commit* suspended_wfc;
+ void *hton= NULL;
+ LEX_STRING gtid_pos_table_name;
DBUG_ENTER("record_gtid");
+ *out_hton= NULL;
if (unlikely(!loaded))
{
/*
@@ -508,6 +607,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
if (!in_statement)
thd->reset_for_next_command();
+ select_gtid_pos_table(thd, &gtid_pos_table_name);
DBUG_EXECUTE_IF("gtid_inject_record_gtid",
{
@@ -538,14 +638,13 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
*/
suspended_wfc= thd->suspend_subsequent_commits();
thd->lex->reset_n_backup_query_tables_list(&lex_backup);
- tlist.init_one_table(STRING_WITH_LEN("mysql"),
- rpl_gtid_slave_state_table_name.str,
- rpl_gtid_slave_state_table_name.length,
- NULL, TL_WRITE);
+ tlist.init_one_table(STRING_WITH_LEN("mysql"), gtid_pos_table_name.str,
+ gtid_pos_table_name.length, NULL, TL_WRITE);
if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
goto end;
table_opened= true;
table= tlist.table;
+ hton= table->s->db_type();
if ((err= gtid_check_rpl_slave_state_table(table)))
goto end;
@@ -581,6 +680,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
table->file->print_error(err, MYF(0));
goto end;
}
+ *out_hton= hton;
if(opt_bin_log &&
(err= mysql_bin_log.bump_seq_no_counter_if_needed(gtid->domain_id,
@@ -598,36 +698,62 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
err= 1;
goto end;
}
- if ((elist= elem->grab_list()) != NULL)
+
+ /* Now pull out all GTIDs that were recorded in this engine. */
+ delete_list = NULL;
+ next_ptr_ptr= &elem->list;
+ cur= elem->list;
+ best_sub_id= 0;
+ best_ptr_ptr= NULL;
+ while (cur)
{
- /* Delete any old stuff, but keep around the most recent one. */
- list_element *cur= elist;
- uint64 best_sub_id= cur->sub_id;
- list_element **best_ptr_ptr= &elist;
- while ((next= cur->next))
+ list_element *next= cur->next;
+ if (cur->hton == hton)
{
- if (next->sub_id > best_sub_id)
+ /* Belongs to same engine, so move it to the delete list. */
+ cur->next= delete_list;
+ delete_list= cur;
+ if (cur->sub_id > best_sub_id)
{
- best_sub_id= next->sub_id;
+ best_sub_id= cur->sub_id;
+ best_ptr_ptr= &delete_list;
+ }
+ else if (best_ptr_ptr == &delete_list)
best_ptr_ptr= &cur->next;
+ }
+ else
+ {
+ /* Another engine, leave it in the list. */
+ if (cur->sub_id > best_sub_id)
+ {
+ best_sub_id= cur->sub_id;
+ /* Current best is not on the delete list. */
+ best_ptr_ptr= NULL;
}
- cur= next;
+ *next_ptr_ptr= cur;
+ next_ptr_ptr= &cur->next;
}
- /*
- Delete the highest sub_id element from the old list, and put it back as
- the single-element new list.
- */
+ cur= next;
+ }
+ *next_ptr_ptr= NULL;
+ /*
+ If the highest sub_id element is on the delete list, put it back on the
+ original list, to preserve the highest sub_id element in the table for
+ GTID position recovery.
+ */
+ if (best_ptr_ptr)
+ {
cur= *best_ptr_ptr;
*best_ptr_ptr= cur->next;
- cur->next= NULL;
+ cur->next= elem->list;
elem->list= cur;
}
mysql_mutex_unlock(&LOCK_slave_state);
- if (!elist)
+ if (!delete_list)
goto end;
- /* Now delete any already committed rows. */
+ /* Now delete any already committed GTIDs. */
bitmap_set_bit(table->read_set, table->field[0]->field_index);
bitmap_set_bit(table->read_set, table->field[1]->field_index);
@@ -636,7 +762,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
table->file->print_error(err, MYF(0));
goto end;
}
- while (elist)
+ while (delete_list)
{
uchar key_buffer[4+8];
@@ -646,9 +772,9 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
/* `break' does not work inside DBUG_EXECUTE_IF */
goto dbug_break; });
- next= elist->next;
+ next= delete_list->next;
- table->field[1]->store(elist->sub_id, true);
+ table->field[1]->store(delete_list->sub_id, true);
/* domain_id is already set in table->record[0] from write_row() above. */
key_copy(key_buffer, table->record[0], &table->key_info[0], 0, false);
if (table->file->ha_index_read_map(table->record[1], key_buffer,
@@ -662,8 +788,8 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
not want to endlessly error on the same element in case of table
corruption or such.
*/
- my_free(elist);
- elist= next;
+ my_free(delete_list);
+ delete_list= next;
if (err)
break;
}
@@ -681,13 +807,13 @@ end:
if (err || (err= ha_commit_trans(thd, FALSE)))
{
/*
- If error, we need to put any remaining elist back into the HASH so we
- can do another delete attempt later.
+ If error, we need to put any remaining delete_list back into the HASH
+ so we can do another delete attempt later.
*/
- if (elist)
+ if (delete_list)
{
mysql_mutex_lock(&LOCK_slave_state);
- put_back_list(gtid->domain_id, elist);
+ put_back_list(gtid->domain_id, delete_list);
mysql_mutex_unlock(&LOCK_slave_state);
}
@@ -1077,11 +1203,12 @@ rpl_slave_state::load(THD *thd, const char *state_from_master, size_t len,
{
rpl_gtid gtid;
uint64 sub_id;
+ void *hton= NULL;
if (gtid_parser_helper(&state_from_master, end, &gtid) ||
!(sub_id= next_sub_id(gtid.domain_id)) ||
- record_gtid(thd, &gtid, sub_id, false, in_statement) ||
- update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, NULL))
+ record_gtid(thd, &gtid, sub_id, false, in_statement, &hton) ||
+ update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, hton, NULL))
return 1;
if (state_from_master == end)
break;
@@ -1115,6 +1242,75 @@ rpl_slave_state::is_empty()
}
+void
+rpl_slave_state::free_gtid_pos_tables(struct rpl_slave_state::gtid_pos_table *list)
+{
+ struct gtid_pos_table *cur, *next;
+
+ cur= list;
+ while (cur)
+ {
+ next= cur->next;
+ my_free(cur);
+ cur= next;
+ }
+}
+
+
+/*
+ Replace the list of available mysql.gtid_slave_posXXX tables with a new list.
+ The caller must be holding LOCK_slave_state. Additionally, this function
+ must only be called while all SQL threads are stopped.
+*/
+void
+rpl_slave_state::set_gtid_pos_tables_list(rpl_slave_state::gtid_pos_table *new_list,
+ rpl_slave_state::gtid_pos_table *default_entry)
+{
+ gtid_pos_table *old_list;
+
+ mysql_mutex_assert_owner(&LOCK_slave_state);
+ old_list= (struct gtid_pos_table *)gtid_pos_tables;
+ my_atomic_storeptr_explicit(&gtid_pos_tables, new_list, MY_MEMORY_ORDER_RELEASE);
+ my_atomic_storeptr_explicit(&default_gtid_pos_table, default_entry,
+ MY_MEMORY_ORDER_RELEASE);
+ free_gtid_pos_tables(old_list);
+}
+
+
+void
+rpl_slave_state::add_gtid_pos_table(rpl_slave_state::gtid_pos_table *entry)
+{
+ mysql_mutex_assert_owner(&LOCK_slave_state);
+ entry->next= (struct gtid_pos_table *)gtid_pos_tables;
+ my_atomic_storeptr_explicit(&gtid_pos_tables, entry, MY_MEMORY_ORDER_RELEASE);
+}
+
+
+struct rpl_slave_state::gtid_pos_table *
+rpl_slave_state::alloc_gtid_pos_table(LEX_STRING *table_name, void *hton,
+ rpl_slave_state::gtid_pos_table_state state)
+{
+ struct gtid_pos_table *p;
+ char *allocated_str;
+
+ if (!my_multi_malloc(MYF(MY_WME),
+ &p, sizeof(*p),
+ &allocated_str, table_name->length+1,
+ NULL))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*p) + table_name->length+1));
+ return NULL;
+ }
+ memcpy(allocated_str, table_name->str, table_name->length+1); // Also copy '\0'
+ p->next = NULL;
+ p->table_hton= hton;
+ p->table_name.str= allocated_str;
+ p->table_name.length= table_name->length;
+ p->state= state;
+ return p;
+}
+
+
void rpl_binlog_state::init()
{
my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),
diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h
index 5dfac7a3c6f..4a7661e1c52 100644
--- a/sql/rpl_gtid.h
+++ b/sql/rpl_gtid.h
@@ -112,6 +112,12 @@ struct rpl_slave_state
uint64 sub_id;
uint64 seq_no;
uint32 server_id;
+ /*
+ hton of mysql.gtid_slave_pos* table used to record this GTID.
+ Can be NULL if the gtid table failed to load (eg. missing
+ mysql.gtid_slave_pos table following an upgrade).
+ */
+ void *hton;
};
/* Elements in the HASH that hold the state for one domain_id. */
@@ -155,6 +161,26 @@ struct rpl_slave_state
}
};
+ /* Descriptor for mysql.gtid_slave_posXXX table in specific engine. */
+ enum gtid_pos_table_state {
+ GTID_POS_AUTO_CREATE,
+ GTID_POS_CREATE_REQUESTED,
+ GTID_POS_CREATE_IN_PROGRESS,
+ GTID_POS_AVAILABLE
+ };
+ struct gtid_pos_table {
+ struct gtid_pos_table *next;
+ /*
+ Use a void * here, rather than handlerton *, to make explicit that we
+ are not using the value to access any functionality in the engine. It
+ is just used as an opaque value to identify which engine we are using
+ for each GTID row.
+ */
+ void *table_hton;
+ LEX_STRING table_name;
+ uint8 state;
+ };
+
/* Mapping from domain_id to its element. */
HASH hash;
/* Mutex protecting access to the state. */
@@ -163,6 +189,30 @@ struct rpl_slave_state
DYNAMIC_ARRAY gtid_sort_array;
uint64 last_sub_id;
+ /*
+ List of tables available for durably storing the slave GTID position.
+
+ Accesses to this table is protected by LOCK_slave_state. However for
+ efficiency, there is also a provision for read access to it from a running
+ slave without lock.
+
+ An element can be added at the head of a list by storing the new
+ gtid_pos_tables pointer atomically with release semantics, to ensure that
+ the next pointer of the new element is visible to readers of the new list.
+ Other changes (like deleting or replacing elements) must happen only while
+ all SQL driver threads are stopped. LOCK_slave_state must be held in any
+ case.
+
+ The list can be read without lock by an SQL driver thread or worker thread
+ by reading the gtid_pos_tables pointer atomically with acquire semantics,
+ to ensure that it will see the correct next pointer of a new head element.
+
+ The type is struct gtid_pos_table *, but needs to be void * to allow using
+ my_atomic operations without violating C strict aliasing semantics.
+ */
+ void * volatile gtid_pos_tables;
+ /* The default entry in gtid_pos_tables, mysql.gtid_slave_pos. */
+ void * volatile default_gtid_pos_table;
bool loaded;
rpl_slave_state();
@@ -171,10 +221,11 @@ struct rpl_slave_state
void truncate_hash();
ulong count() const { return hash.records; }
int update(uint32 domain_id, uint32 server_id, uint64 sub_id,
- uint64 seq_no, rpl_group_info *rgi);
+ uint64 seq_no, void *hton, rpl_group_info *rgi);
int truncate_state_table(THD *thd);
+ void select_gtid_pos_table(THD *thd, LEX_STRING *out_tablename);
int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
- bool in_transaction, bool in_statement);
+ bool in_transaction, bool in_statement, void **out_hton);
uint64 next_sub_id(uint32 domain_id);
int iterate(int (*cb)(rpl_gtid *, void *), void *data,
rpl_gtid *extra_gtids, uint32 num_extra,
@@ -188,10 +239,17 @@ struct rpl_slave_state
element *get_element(uint32 domain_id);
int put_back_list(uint32 domain_id, list_element *list);
- void update_state_hash(uint64 sub_id, rpl_gtid *gtid, rpl_group_info *rgi);
+ void update_state_hash(uint64 sub_id, rpl_gtid *gtid, void *hton,
+ rpl_group_info *rgi);
int record_and_update_gtid(THD *thd, struct rpl_group_info *rgi);
int check_duplicate_gtid(rpl_gtid *gtid, rpl_group_info *rgi);
void release_domain_owner(rpl_group_info *rgi);
+ void set_gtid_pos_tables_list(gtid_pos_table *new_list,
+ gtid_pos_table *default_entry);
+ void add_gtid_pos_table(gtid_pos_table *entry);
+ struct gtid_pos_table *alloc_gtid_pos_table(LEX_STRING *table_name,
+ void *hton, rpl_slave_state::gtid_pos_table_state state);
+ void free_gtid_pos_tables(struct gtid_pos_table *list);
};
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc
index e90557efd0d..43c5eceecf4 100644
--- a/sql/rpl_mi.cc
+++ b/sql/rpl_mi.cc
@@ -1557,6 +1557,9 @@ bool give_error_if_slave_running(bool already_locked)
/**
any_slave_sql_running()
+ @param
+ already_locked 0 if we need to lock, 1 if we have LOCK_active_mi_locked
+
@return
0 No Slave SQL thread is running
# Number of slave SQL thread running
@@ -1567,26 +1570,28 @@ bool give_error_if_slave_running(bool already_locked)
hash entries can't be accessed.
*/
-uint any_slave_sql_running()
+uint any_slave_sql_running(bool already_locked)
{
uint count= 0;
HASH *hash;
DBUG_ENTER("any_slave_sql_running");
- mysql_mutex_lock(&LOCK_active_mi);
+ if (!already_locked)
+ mysql_mutex_lock(&LOCK_active_mi);
if (unlikely(shutdown_in_progress || !master_info_index))
+ count= 1;
+ else
{
- mysql_mutex_unlock(&LOCK_active_mi);
- DBUG_RETURN(1);
- }
- hash= &master_info_index->master_info_hash;
- for (uint i= 0; i< hash->records; ++i)
- {
- Master_info *mi= (Master_info *)my_hash_element(hash, i);
- if (mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN)
- count++;
+ hash= &master_info_index->master_info_hash;
+ for (uint i= 0; i< hash->records; ++i)
+ {
+ Master_info *mi= (Master_info *)my_hash_element(hash, i);
+ if (mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN)
+ count++;
+ }
}
- mysql_mutex_unlock(&LOCK_active_mi);
+ if (!already_locked)
+ mysql_mutex_unlock(&LOCK_active_mi);
DBUG_RETURN(count);
}
diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h
index ccc1be6e5ce..58333ad1aa8 100644
--- a/sql/rpl_mi.h
+++ b/sql/rpl_mi.h
@@ -379,7 +379,7 @@ void create_logfile_name_with_suffix(char *res_file_name, size_t length,
uchar *get_key_master_info(Master_info *mi, size_t *length,
my_bool not_used __attribute__((unused)));
void free_key_master_info(Master_info *mi);
-uint any_slave_sql_running();
+uint any_slave_sql_running(bool already_locked);
bool give_error_if_slave_running(bool already_lock);
#endif /* HAVE_REPLICATION */
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index aaa72da29db..0994bce4f7a 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -1466,7 +1466,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
*/
if (!new_count && !force)
{
- if (any_slave_sql_running())
+ if (any_slave_sql_running(false))
{
DBUG_PRINT("warning",
("SQL threads running while trying to reset parallel pool"));
@@ -1621,7 +1621,7 @@ err:
int rpl_parallel_resize_pool_if_no_slaves(void)
{
/* master_info_index is set to NULL on shutdown */
- if (opt_slave_parallel_threads > 0 && !any_slave_sql_running())
+ if (opt_slave_parallel_threads > 0 && !any_slave_sql_running(false))
return rpl_parallel_inactivate_pool(&global_rpl_thread_pool);
return 0;
}
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index fda922c3e12..63179f5d8e9 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -32,6 +32,8 @@
#include "slave.h"
#include <mysql/plugin.h>
#include <mysql/service_thd_wait.h>
+#include "lock.h"
+#include "sql_table.h"
static int count_relay_log_space(Relay_log_info* rli);
@@ -1466,41 +1468,22 @@ Relay_log_info::update_relay_log_state(rpl_gtid *gtid_list, uint32 count)
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
-int
-rpl_load_gtid_slave_state(THD *thd)
+struct gtid_pos_element { uint64 sub_id; rpl_gtid gtid; void *hton; };
+
+static int
+scan_one_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array,
+ LEX_STRING *tablename, void **out_hton)
{
TABLE_LIST tlist;
TABLE *table;
bool table_opened= false;
bool table_scanned= false;
- bool array_inited= false;
- struct local_element { uint64 sub_id; rpl_gtid gtid; };
- struct local_element tmp_entry, *entry;
- HASH hash;
- DYNAMIC_ARRAY array;
+ struct gtid_pos_element tmp_entry, *entry;
int err= 0;
- uint32 i;
- DBUG_ENTER("rpl_load_gtid_slave_state");
-
- mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
- bool loaded= rpl_global_gtid_slave_state->loaded;
- mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
- if (loaded)
- DBUG_RETURN(0);
-
- my_hash_init(&hash, &my_charset_bin, 32,
- offsetof(local_element, gtid) + offsetof(rpl_gtid, domain_id),
- sizeof(uint32), NULL, my_free, HASH_UNIQUE);
- if ((err= my_init_dynamic_array(&array, sizeof(local_element), 0, 0, MYF(0))))
- goto end;
- array_inited= true;
thd->reset_for_next_command();
-
- tlist.init_one_table(STRING_WITH_LEN("mysql"),
- rpl_gtid_slave_state_table_name.str,
- rpl_gtid_slave_state_table_name.length,
- NULL, TL_READ);
+ tlist.init_one_table(STRING_WITH_LEN("mysql"), tablename->str,
+ tablename->length, NULL, TL_READ);
if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
goto end;
table_opened= true;
@@ -1546,26 +1529,28 @@ rpl_load_gtid_slave_state(THD *thd)
tmp_entry.gtid.domain_id= domain_id;
tmp_entry.gtid.server_id= server_id;
tmp_entry.gtid.seq_no= seq_no;
- if ((err= insert_dynamic(&array, (uchar *)&tmp_entry)))
+ tmp_entry.hton= table->s->db_type();
+ if ((err= insert_dynamic(array, (uchar *)&tmp_entry)))
{
my_error(ER_OUT_OF_RESOURCES, MYF(0));
goto end;
}
- if ((rec= my_hash_search(&hash, (const uchar *)&domain_id, 0)))
+ if ((rec= my_hash_search(hash, (const uchar *)&domain_id, 0)))
{
- entry= (struct local_element *)rec;
+ entry= (struct gtid_pos_element *)rec;
if (entry->sub_id >= sub_id)
continue;
entry->sub_id= sub_id;
DBUG_ASSERT(entry->gtid.domain_id == domain_id);
entry->gtid.server_id= server_id;
entry->gtid.seq_no= seq_no;
+ entry->hton= table->s->db_type();
}
else
{
- if (!(entry= (struct local_element *)my_malloc(sizeof(*entry),
- MYF(MY_WME))))
+ if (!(entry= (struct gtid_pos_element *)my_malloc(sizeof(*entry),
+ MYF(MY_WME))))
{
my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*entry));
err= 1;
@@ -1575,7 +1560,8 @@ rpl_load_gtid_slave_state(THD *thd)
entry->gtid.domain_id= domain_id;
entry->gtid.server_id= server_id;
entry->gtid.seq_no= seq_no;
- if ((err= my_hash_insert(&hash, (uchar *)entry)))
+ entry->hton= table->s->db_type();
+ if ((err= my_hash_insert(hash, (uchar *)entry)))
{
my_free(entry);
my_error(ER_OUT_OF_RESOURCES, MYF(0));
@@ -1583,6 +1569,249 @@ rpl_load_gtid_slave_state(THD *thd)
}
}
}
+ err= 0; /* Clear HA_ERR_END_OF_FILE */
+
+end:
+ if (table_scanned)
+ {
+ table->file->ha_index_or_rnd_end();
+ ha_commit_trans(thd, FALSE);
+ ha_commit_trans(thd, TRUE);
+ }
+ if (table_opened)
+ {
+ *out_hton= table->s->db_type();
+ close_thread_tables(thd);
+ thd->mdl_context.release_transactional_locks();
+ }
+ return err;
+}
+
+
+/*
+ Look for all tables mysql.gtid_slave_pos*. Read all rows from each such
+ table found into ARRAY. For each domain id, put the row with highest sub_id
+ into HASH.
+*/
+static int
+scan_all_gtid_slave_pos_table(THD *thd, int (*cb)(THD *, LEX_STRING *, void *),
+ void *cb_data)
+{
+ static LEX_STRING mysql_db_name= {C_STRING_WITH_LEN("mysql")};
+ char path[FN_REFLEN];
+ MY_DIR *dirp;
+
+ thd->reset_for_next_command();
+ if (lock_schema_name(thd, mysql_db_name.str))
+ return 1;
+
+ build_table_filename(path, sizeof(path) - 1, mysql_db_name.str, "", "", 0);
+ if (!(dirp= my_dir(path, MYF(MY_DONT_SORT))))
+ {
+ my_error(ER_FILE_NOT_FOUND, MYF(0), path, my_errno);
+ close_thread_tables(thd);
+ thd->mdl_context.release_transactional_locks();
+ return 1;
+ }
+ else
+ {
+ size_t i;
+ Dynamic_array<LEX_STRING*> files(dirp->number_of_files);
+ Discovered_table_list tl(thd, &files);
+ int err;
+
+ err= ha_discover_table_names(thd, &mysql_db_name, dirp, &tl, false);
+ my_dirend(dirp);
+ close_thread_tables(thd);
+ thd->mdl_context.release_transactional_locks();
+ if (err)
+ return err;
+
+ for (i = 0; i < files.elements(); ++i)
+ {
+ if (strncmp(files.at(i)->str,
+ rpl_gtid_slave_state_table_name.str,
+ rpl_gtid_slave_state_table_name.length) == 0)
+ {
+ if ((err= (*cb)(thd, files.at(i), cb_data)))
+ return err;
+ }
+ }
+ }
+
+ return 0;
+}
+
+
+struct load_gtid_state_cb_data {
+ HASH *hash;
+ DYNAMIC_ARRAY *array;
+ struct rpl_slave_state::gtid_pos_table *table_list;
+ struct rpl_slave_state::gtid_pos_table *default_entry;
+};
+
+static int
+process_gtid_pos_table(THD *thd, LEX_STRING *table_name, void *hton,
+ struct load_gtid_state_cb_data *data)
+{
+ struct rpl_slave_state::gtid_pos_table *p, *entry, **next_ptr;
+ bool is_default=
+ (strcmp(table_name->str, rpl_gtid_slave_state_table_name.str) == 0);
+
+ /*
+ Ignore tables with duplicate storage engine, with a warning.
+ Prefer the default mysql.gtid_slave_pos over another table
+ mysql.gtid_slave_posXXX with the same storage engine.
+ */
+ next_ptr= &data->table_list;
+ entry= data->table_list;
+ while (entry)
+ {
+ if (entry->table_hton == hton)
+ {
+ static const char *warning_msg= "Ignoring redundant table mysql.%s "
+ "since mysql.%s has the same storage engine";
+ if (!is_default)
+ {
+ /* Ignore the redundant table. */
+ sql_print_warning(warning_msg, table_name->str, entry->table_name);
+ return 0;
+ }
+ else
+ {
+ sql_print_warning(warning_msg, entry->table_name, table_name->str);
+ /* Delete the redundant table, and proceed to add this one instead. */
+ *next_ptr= entry->next;
+ my_free(entry);
+ break;
+ }
+ }
+ next_ptr= &entry->next;
+ entry= entry->next;
+ }
+
+ p= rpl_global_gtid_slave_state->alloc_gtid_pos_table(table_name,
+ hton, rpl_slave_state::GTID_POS_AVAILABLE);
+ if (!p)
+ return 1;
+ p->next= data->table_list;
+ data->table_list= p;
+ if (is_default)
+ data->default_entry= p;
+ return 0;
+}
+
+
+/*
+ Put tables corresponding to @@gtid_pos_auto_engines at the end of the list,
+ marked to be auto-created if needed.
+*/
+static int
+gtid_pos_auto_create_tables(rpl_slave_state::gtid_pos_table **list_ptr)
+{
+ plugin_ref *auto_engines;
+ int err= 0;
+ mysql_mutex_lock(&LOCK_global_system_variables);
+ for (auto_engines= opt_gtid_pos_auto_plugins;
+ !err && auto_engines && *auto_engines;
+ ++auto_engines)
+ {
+ void *hton= plugin_hton(*auto_engines);
+ char buf[FN_REFLEN+1];
+ LEX_STRING table_name;
+ char *p;
+ rpl_slave_state::gtid_pos_table *entry, **next_ptr;
+
+ /* See if this engine is already in the list. */
+ next_ptr= list_ptr;
+ entry= *list_ptr;
+ while (entry)
+ {
+ if (entry->table_hton == hton)
+ break;
+ next_ptr= &entry->next;
+ entry= entry->next;
+ }
+ if (entry)
+ continue;
+
+ /* Add an auto-create entry for this engine at end of list. */
+ p= strmake(buf, rpl_gtid_slave_state_table_name.str, FN_REFLEN);
+ p= strmake(p, "_", FN_REFLEN - (p - buf));
+ p= strmake(p, plugin_name(*auto_engines)->str, FN_REFLEN - (p - buf));
+ table_name.str= buf;
+ table_name.length= p - buf;
+ entry= rpl_global_gtid_slave_state->alloc_gtid_pos_table
+ (&table_name, hton, rpl_slave_state::GTID_POS_AUTO_CREATE);
+ if (!entry)
+ {
+ err= 1;
+ break;
+ }
+ *next_ptr= entry;
+ }
+ mysql_mutex_unlock(&LOCK_global_system_variables);
+ return err;
+}
+
+
+static int
+load_gtid_state_cb(THD *thd, LEX_STRING *table_name, void *arg)
+{
+ int err;
+ load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg);
+ void *hton;
+
+ if ((err= scan_one_gtid_slave_pos_table(thd, data->hash, data->array,
+ table_name, &hton)))
+ return err;
+ return process_gtid_pos_table(thd, table_name, hton, data);
+}
+
+
+int
+rpl_load_gtid_slave_state(THD *thd)
+{
+ bool array_inited= false;
+ struct gtid_pos_element tmp_entry, *entry;
+ HASH hash;
+ DYNAMIC_ARRAY array;
+ int err= 0;
+ uint32 i;
+ load_gtid_state_cb_data cb_data;
+ DBUG_ENTER("rpl_load_gtid_slave_state");
+
+ mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ bool loaded= rpl_global_gtid_slave_state->loaded;
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ if (loaded)
+ DBUG_RETURN(0);
+
+ cb_data.table_list= NULL;
+ cb_data.default_entry= NULL;
+ my_hash_init(&hash, &my_charset_bin, 32,
+ offsetof(gtid_pos_element, gtid) + offsetof(rpl_gtid, domain_id),
+ sizeof(uint32), NULL, my_free, HASH_UNIQUE);
+ if ((err= my_init_dynamic_array(&array, sizeof(gtid_pos_element), 0, 0, MYF(0))))
+ goto end;
+ array_inited= true;
+
+ cb_data.hash = &hash;
+ cb_data.array = &array;
+ if ((err= scan_all_gtid_slave_pos_table(thd, load_gtid_state_cb, &cb_data)))
+ goto end;
+
+ if (!cb_data.default_entry)
+ {
+ /*
+ If the mysql.gtid_slave_pos table does not exist, but at least one other
+ table is available, arbitrarily pick the first in the list to use as
+ default.
+ */
+ cb_data.default_entry= cb_data.table_list;
+ }
+ if ((err= gtid_pos_auto_create_tables(&cb_data.table_list)))
+ goto end;
mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
if (rpl_global_gtid_slave_state->loaded)
@@ -1591,14 +1820,24 @@ rpl_load_gtid_slave_state(THD *thd)
goto end;
}
+ if (!cb_data.table_list)
+ {
+ my_error(ER_NO_SUCH_TABLE, MYF(0), "mysql",
+ rpl_gtid_slave_state_table_name.str);
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ err= 1;
+ goto end;
+ }
+
for (i= 0; i < array.elements; ++i)
{
get_dynamic(&array, (uchar *)&tmp_entry, i);
if ((err= rpl_global_gtid_slave_state->update(tmp_entry.gtid.domain_id,
- tmp_entry.gtid.server_id,
- tmp_entry.sub_id,
- tmp_entry.gtid.seq_no,
- NULL)))
+ tmp_entry.gtid.server_id,
+ tmp_entry.sub_id,
+ tmp_entry.gtid.seq_no,
+ tmp_entry.hton,
+ NULL)))
{
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
my_error(ER_OUT_OF_RESOURCES, MYF(0));
@@ -1608,7 +1847,7 @@ rpl_load_gtid_slave_state(THD *thd)
for (i= 0; i < hash.records; ++i)
{
- entry= (struct local_element *)my_hash_element(&hash, i);
+ entry= (struct gtid_pos_element *)my_hash_element(&hash, i);
if (opt_bin_log &&
mysql_bin_log.bump_seq_no_counter_if_needed(entry->gtid.domain_id,
entry->gtid.seq_no))
@@ -1619,27 +1858,175 @@ rpl_load_gtid_slave_state(THD *thd)
}
}
+ rpl_global_gtid_slave_state->set_gtid_pos_tables_list(cb_data.table_list,
+ cb_data.default_entry);
+ cb_data.table_list= NULL;
rpl_global_gtid_slave_state->loaded= true;
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
- err= 0; /* Clear HA_ERR_END_OF_FILE */
+end:
+ if (array_inited)
+ delete_dynamic(&array);
+ my_hash_free(&hash);
+ if (cb_data.table_list)
+ rpl_global_gtid_slave_state->free_gtid_pos_tables(cb_data.table_list);
+ DBUG_RETURN(err);
+}
+
+
+static int
+find_gtid_pos_tables_cb(THD *thd, LEX_STRING *table_name, void *arg)
+{
+ load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg);
+ TABLE_LIST tlist;
+ TABLE *table= NULL;
+ int err;
+
+ thd->reset_for_next_command();
+ tlist.init_one_table(STRING_WITH_LEN("mysql"), table_name->str,
+ table_name->length, NULL, TL_READ);
+ if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
+ goto end;
+ table= tlist.table;
+
+ if ((err= gtid_check_rpl_slave_state_table(table)))
+ goto end;
+ err= process_gtid_pos_table(thd, table_name, table->s->db_type(), data);
end:
- if (table_scanned)
+ if (table)
{
- table->file->ha_index_or_rnd_end();
ha_commit_trans(thd, FALSE);
ha_commit_trans(thd, TRUE);
- }
- if (table_opened)
- {
close_thread_tables(thd);
thd->mdl_context.release_transactional_locks();
}
- if (array_inited)
- delete_dynamic(&array);
- my_hash_free(&hash);
- DBUG_RETURN(err);
+
+ return err;
+}
+
+
+/*
+ Re-compute the list of available mysql.gtid_slave_posXXX tables.
+
+ This is done at START SLAVE to pick up any newly created tables without
+ requiring server restart.
+*/
+int
+find_gtid_slave_pos_tables(THD *thd)
+{
+ int err= 0;
+ load_gtid_state_cb_data cb_data;
+ uint num_running;
+
+ mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ bool loaded= rpl_global_gtid_slave_state->loaded;
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ if (!loaded)
+ return 0;
+
+ cb_data.table_list= NULL;
+ cb_data.default_entry= NULL;
+ if ((err= scan_all_gtid_slave_pos_table(thd, find_gtid_pos_tables_cb, &cb_data)))
+ goto end;
+
+ if (!cb_data.table_list)
+ {
+ my_error(ER_NO_SUCH_TABLE, MYF(0), "mysql",
+ rpl_gtid_slave_state_table_name.str);
+ err= 1;
+ goto end;
+ }
+ if (!cb_data.default_entry)
+ {
+ /*
+ If the mysql.gtid_slave_pos table does not exist, but at least one other
+ table is available, arbitrarily pick the first in the list to use as
+ default.
+ */
+ cb_data.default_entry= cb_data.table_list;
+ }
+ if ((err= gtid_pos_auto_create_tables(&cb_data.table_list)))
+ goto end;
+
+ mysql_mutex_lock(&LOCK_active_mi);
+ num_running= any_slave_sql_running(true);
+ mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ if (num_running <= 1)
+ {
+ /*
+ If no slave is running now, the count will be 1, since this SQL thread
+ which is starting is included in the count. In this case, we can safely
+ replace the list, no-one can be trying to read it without lock.
+ */
+ DBUG_ASSERT(num_running == 1);
+ rpl_global_gtid_slave_state->set_gtid_pos_tables_list(cb_data.table_list,
+ cb_data.default_entry);
+ cb_data.table_list= NULL;
+ }
+ else
+ {
+ /*
+ If there are SQL threads running, we cannot safely remove the old list.
+ However we can add new entries, and warn about any tables that
+ disappeared, but may still be visible to running SQL threads.
+ */
+ rpl_slave_state::gtid_pos_table *old_entry, *new_entry, **next_ptr_ptr;
+
+ old_entry= (rpl_slave_state::gtid_pos_table *)
+ rpl_global_gtid_slave_state->gtid_pos_tables;
+ while (old_entry)
+ {
+ new_entry= cb_data.table_list;
+ while (new_entry)
+ {
+ if (new_entry->table_hton == old_entry->table_hton)
+ break;
+ new_entry= new_entry->next;
+ }
+ if (!new_entry)
+ sql_print_warning("The table mysql.%s was removed. "
+ "This change will not take full effect "
+ "until all SQL threads have been restarted",
+ old_entry->table_name.str);
+ old_entry= old_entry->next;
+ }
+ next_ptr_ptr= &cb_data.table_list;
+ new_entry= cb_data.table_list;
+ while (new_entry)
+ {
+ /* Check if we already have a table with this storage engine. */
+ old_entry= (rpl_slave_state::gtid_pos_table *)
+ rpl_global_gtid_slave_state->gtid_pos_tables;
+ while (old_entry)
+ {
+ if (new_entry->table_hton == old_entry->table_hton)
+ break;
+ old_entry= old_entry->next;
+ }
+ if (old_entry)
+ {
+ /* This new_entry is already available in the list. */
+ next_ptr_ptr= &new_entry->next;
+ new_entry= new_entry->next;
+ }
+ else
+ {
+ /* Move this new_entry to the list. */
+ rpl_slave_state::gtid_pos_table *next= new_entry->next;
+ rpl_global_gtid_slave_state->add_gtid_pos_table(new_entry);
+ *next_ptr_ptr= next;
+ new_entry= next;
+ }
+ }
+ }
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ mysql_mutex_unlock(&LOCK_active_mi);
+
+end:
+ if (cb_data.table_list)
+ rpl_global_gtid_slave_state->free_gtid_pos_tables(cb_data.table_list);
+ return err;
}
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 448fc231b2b..e293d681034 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -959,6 +959,7 @@ extern struct rpl_slave_state *rpl_global_gtid_slave_state;
extern gtid_waiting rpl_global_gtid_waiting;
int rpl_load_gtid_slave_state(THD *thd);
+int find_gtid_slave_pos_tables(THD *thd);
int event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev);
void delete_or_keep_event_post_apply(rpl_group_info *rgi,
Log_event_type typ, Log_event *ev);
diff --git a/sql/set_var.cc b/sql/set_var.cc
index 15f6bbdafc5..55587796281 100644
--- a/sql/set_var.cc
+++ b/sql/set_var.cc
@@ -1293,3 +1293,222 @@ enum sys_var::where get_sys_var_value_origin(void *ptr)
return sys_var::CONFIG;
}
+
+/*
+ Find the next item in string of comma-separated items.
+ END_POS points at the end of the string.
+ ITEM_START and ITEM_END return the limits of the next item.
+ Returns true while items are available, false at the end.
+*/
+static bool
+engine_list_next_item(const char **pos, const char *end_pos,
+ const char **item_start, const char **item_end)
+{
+ if (*pos >= end_pos)
+ return false;
+ *item_start= *pos;
+ while (*pos < end_pos && **pos != ',')
+ ++*pos;
+ *item_end= *pos;
+ ++*pos;
+ return true;
+}
+
+
+static bool
+resolve_engine_list_item(THD *thd, plugin_ref *list, uint32 *idx,
+ const char *pos, const char *pos_end,
+ bool error_on_unknown_engine, bool temp_copy)
+{
+ LEX_STRING item_str;
+ plugin_ref ref;
+ uint32_t i;
+ THD *thd_or_null = (temp_copy ? thd : NULL);
+
+ item_str.str= const_cast<char*>(pos);
+ item_str.length= pos_end-pos;
+ ref= ha_resolve_by_name(thd_or_null, &item_str, false);
+ if (!ref)
+ {
+ if (error_on_unknown_engine)
+ {
+ ErrConvString err(pos, pos_end-pos, system_charset_info);
+ my_error(ER_UNKNOWN_STORAGE_ENGINE, MYF(0), err.ptr());
+ return true;
+ }
+ return false;
+ }
+ /* Ignore duplicates, like --plugin-load does. */
+ for (i= 0; i < *idx; ++i)
+ {
+ if (plugin_hton(list[i]) == plugin_hton(ref))
+ {
+ if (!temp_copy)
+ plugin_unlock(NULL, ref);
+ return false;
+ }
+ }
+ list[*idx]= ref;
+ ++*idx;
+ return false;
+}
+
+
+/*
+ Helper for class Sys_var_pluginlist.
+ Resolve a comma-separated list of storage engine names to a null-terminated
+ array of plugin_ref.
+
+ If TEMP_COPY is true, a THD must be given as well. In this case, the
+ allocated memory and locked plugins are registered in the THD and will
+ be freed / unlocked automatically. If TEMP_COPY is true, THD can be
+ passed as NULL, and resources must be freed explicitly later with
+ free_engine_list().
+*/
+plugin_ref *
+resolve_engine_list(THD *thd, const char *str_arg, size_t str_arg_len,
+ bool error_on_unknown_engine, bool temp_copy)
+{
+ uint32 count, idx;
+ const char *pos, *item_start, *item_end;
+ const char *str_arg_end= str_arg + str_arg_len;
+ plugin_ref *res;
+
+ count= 0;
+ pos= str_arg;
+ for (;;)
+ {
+ if (!engine_list_next_item(&pos, str_arg_end, &item_start, &item_end))
+ break;
+ ++count;
+ }
+
+ if (temp_copy)
+ res= (plugin_ref *)thd->calloc((count+1)*sizeof(*res));
+ else
+ res= (plugin_ref *)my_malloc((count+1)*sizeof(*res), MYF(MY_ZEROFILL|MY_WME));
+ if (!res)
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), (int)((count+1)*sizeof(*res)));
+ goto err;
+ }
+
+ idx= 0;
+ pos= str_arg;
+ for (;;)
+ {
+ if (!engine_list_next_item(&pos, str_arg_end, &item_start, &item_end))
+ break;
+ DBUG_ASSERT(idx < count);
+ if (idx >= count)
+ break;
+ if (resolve_engine_list_item(thd, res, &idx, item_start, item_end,
+ error_on_unknown_engine, temp_copy))
+ goto err;
+ }
+
+ return res;
+
+err:
+ if (!temp_copy)
+ free_engine_list(res);
+ return NULL;
+}
+
+
+void
+free_engine_list(plugin_ref *list)
+{
+ plugin_ref *p;
+
+ if (!list)
+ return;
+ for (p= list; *p; ++p)
+ plugin_unlock(NULL, *p);
+ my_free(list);
+}
+
+
+plugin_ref *
+copy_engine_list(plugin_ref *list)
+{
+ plugin_ref *p;
+ uint32 count, i;
+
+ for (p= list, count= 0; *p; ++p, ++count)
+ ;
+ p= (plugin_ref *)my_malloc((count+1)*sizeof(*p), MYF(0));
+ if (!p)
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), (int)((count+1)*sizeof(*p)));
+ return NULL;
+ }
+ for (i= 0; i < count; ++i)
+ p[i]= my_plugin_lock(NULL, list[i]);
+ p[i] = NULL;
+ return p;
+}
+
+
+/*
+ Create a temporary copy of an engine list. The memory will be freed
+ (and the plugins unlocked) automatically, on the passed THD.
+*/
+plugin_ref *
+temp_copy_engine_list(THD *thd, plugin_ref *list)
+{
+ plugin_ref *p;
+ uint32 count, i;
+
+ for (p= list, count= 0; *p; ++p, ++count)
+ ;
+ p= (plugin_ref *)thd->alloc((count+1)*sizeof(*p));
+ if (!p)
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), (int)((count+1)*sizeof(*p)));
+ return NULL;
+ }
+ for (i= 0; i < count; ++i)
+ p[i]= my_plugin_lock(thd, list[i]);
+ p[i] = NULL;
+ return p;
+}
+
+
+char *
+pretty_print_engine_list(THD *thd, plugin_ref *list)
+{
+ plugin_ref *p;
+ size_t size;
+ char *buf, *pos;
+
+ if (!list)
+ return thd->strmake("", 0);
+
+ size= 0;
+ for (p= list; *p; ++p)
+ size+= plugin_name(*p)->length + 1;
+ buf= static_cast<char *>(thd->alloc(size));
+ if (!buf)
+ return NULL;
+ pos= buf;
+ for (p= list; *p; ++p)
+ {
+ LEX_STRING *name;
+ size_t remain;
+
+ remain= buf + size - pos;
+ DBUG_ASSERT(remain > 0);
+ if (remain <= 1)
+ break;
+ if (pos != buf)
+ {
+ pos= strmake(pos, ",", remain-1);
+ --remain;
+ }
+ name= plugin_name(*p);
+ pos= strmake(pos, name->str, MY_MIN(name->length, remain-1));
+ }
+ *pos= '\0';
+ return buf;
+}
diff --git a/sql/set_var.h b/sql/set_var.h
index 17d1ff93ebc..8d39854a744 100644
--- a/sql/set_var.h
+++ b/sql/set_var.h
@@ -286,6 +286,7 @@ public:
longlong longlong_value; ///< for signed integer
double double_value; ///< for Sys_var_double
plugin_ref plugin; ///< for Sys_var_plugin
+ plugin_ref *plugins; ///< for Sys_var_pluginlist
Time_zone *time_zone; ///< for Sys_var_tz
LEX_STRING string_value; ///< for Sys_var_charptr and others
const void *ptr; ///< for Sys_var_struct
@@ -424,6 +425,12 @@ int sys_var_init();
uint sys_var_elements();
int sys_var_add_options(DYNAMIC_ARRAY *long_options, int parse_flags);
void sys_var_end(void);
+plugin_ref *resolve_engine_list(THD *thd, const char *str_arg, size_t str_arg_len,
+ bool error_on_unknown_engine, bool temp_copy);
+void free_engine_list(plugin_ref *list);
+plugin_ref *copy_engine_list(plugin_ref *list);
+plugin_ref *temp_copy_engine_list(THD *thd, plugin_ref *list);
+char *pretty_print_engine_list(THD *thd, plugin_ref *list);
#endif
diff --git a/sql/slave.cc b/sql/slave.cc
index a7f0f003e5c..67785be6255 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -60,6 +60,7 @@
#include "rpl_tblmap.h"
#include "debug_sync.h"
#include "rpl_parallel.h"
+#include "sql_show.h"
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
@@ -279,15 +280,180 @@ static void init_slave_psi_keys(void)
#endif /* HAVE_PSI_INTERFACE */
+/*
+ Note: This definition needs to be kept in sync with the one in
+ mysql_system_tables.sql which is used by mysql_create_db.
+*/
+static const char gtid_pos_table_definition1[]=
+ "CREATE TABLE ";
+static const char gtid_pos_table_definition2[]=
+ " (domain_id INT UNSIGNED NOT NULL, "
+ "sub_id BIGINT UNSIGNED NOT NULL, "
+ "server_id INT UNSIGNED NOT NULL, "
+ "seq_no BIGINT UNSIGNED NOT NULL, "
+ "PRIMARY KEY (domain_id, sub_id)) CHARSET=latin1 "
+ "COMMENT='Replication slave GTID position' "
+ "ENGINE=";
+
+/*
+ Build a query string
+ CREATE TABLE mysql.gtid_slave_pos_<engine> ... ENGINE=<engine>
+*/
+static bool
+build_gtid_pos_create_query(THD *thd, String *query,
+ LEX_STRING *table_name,
+ LEX_STRING *engine_name)
+{
+ bool err= false;
+ err|= query->append(gtid_pos_table_definition1);
+ err|= append_identifier(thd, query, table_name->str, table_name->length);
+ err|= query->append(gtid_pos_table_definition2);
+ err|= append_identifier(thd, query, engine_name->str, engine_name->length);
+ return err;
+}
+
+
+static int
+gtid_pos_table_creation(THD *thd, plugin_ref engine, LEX_STRING *table_name)
+{
+ int err;
+ StringBuffer<sizeof(gtid_pos_table_definition1) +
+ sizeof(gtid_pos_table_definition1) +
+ 2*FN_REFLEN> query;
+
+ if (build_gtid_pos_create_query(thd, &query, table_name, plugin_name(engine)))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return 1;
+ }
+
+ thd->set_db("mysql", 5);
+ thd->clear_error();
+ ulonglong thd_saved_option= thd->variables.option_bits;
+ /* This query shuold not be binlogged. */
+ thd->variables.option_bits&= ~(ulonglong)OPTION_BIN_LOG;
+ thd->set_query_and_id(query.c_ptr(), query.length(), thd->charset(),
+ next_query_id());
+ Parser_state parser_state;
+ err= parser_state.init(thd, thd->query(), thd->query_length());
+ if (err)
+ goto end;
+ mysql_parse(thd, thd->query(), thd->query_length(), &parser_state,
+ FALSE, FALSE);
+ if (thd->is_error())
+ err= 1;
+end:
+ thd->variables.option_bits= thd_saved_option;
+ thd->reset_query();
+ return err;
+}
+
+
+static void
+handle_gtid_pos_auto_create_request(THD *thd, void *hton)
+{
+ int err;
+ plugin_ref engine= NULL, *auto_engines;
+ rpl_slave_state::gtid_pos_table *entry;
+ StringBuffer<FN_REFLEN> loc_table_name;
+ LEX_STRING table_name;
+
+ /*
+ Check that the plugin is still in @@gtid_pos_auto_engines, and lock
+ it.
+ */
+ mysql_mutex_lock(&LOCK_global_system_variables);
+ engine= NULL;
+ for (auto_engines= opt_gtid_pos_auto_plugins;
+ auto_engines && *auto_engines;
+ ++auto_engines)
+ {
+ if (plugin_hton(*auto_engines) == hton)
+ {
+ engine= my_plugin_lock(NULL, *auto_engines);
+ break;
+ }
+ }
+ mysql_mutex_unlock(&LOCK_global_system_variables);
+ if (!engine)
+ {
+ /* The engine is gone from @@gtid_pos_auto_engines, so no action. */
+ goto end;
+ }
+
+ /* Find the entry for the table to auto-create. */
+ mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ entry= (rpl_slave_state::gtid_pos_table *)
+ rpl_global_gtid_slave_state->gtid_pos_tables;
+ while (entry)
+ {
+ if (entry->table_hton == hton &&
+ entry->state == rpl_slave_state::GTID_POS_CREATE_REQUESTED)
+ break;
+ entry= entry->next;
+ }
+ if (entry)
+ {
+ entry->state = rpl_slave_state::GTID_POS_CREATE_IN_PROGRESS;
+ err= loc_table_name.append(entry->table_name.str, entry->table_name.length);
+ }
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ if (!entry)
+ goto end;
+ if (err)
+ {
+ sql_print_error("Out of memory while trying to auto-create GTID position table");
+ goto end;
+ }
+ table_name.str= loc_table_name.c_ptr_safe();
+ table_name.length= loc_table_name.length();
+
+ err= gtid_pos_table_creation(thd, engine, &table_name);
+ if (err)
+ {
+ sql_print_error("Error auto-creating GTID position table `mysql.%s`: %s Error_code: %d",
+ table_name.str, thd->get_stmt_da()->message(),
+ thd->get_stmt_da()->sql_errno());
+ thd->clear_error();
+ goto end;
+ }
+
+ /* Now enable the entry for the auto-created table. */
+ mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ entry= (rpl_slave_state::gtid_pos_table *)
+ rpl_global_gtid_slave_state->gtid_pos_tables;
+ while (entry)
+ {
+ if (entry->table_hton == hton &&
+ entry->state == rpl_slave_state::GTID_POS_CREATE_IN_PROGRESS)
+ {
+ entry->state= rpl_slave_state::GTID_POS_AVAILABLE;
+ break;
+ }
+ entry= entry->next;
+ }
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+
+end:
+ if (engine)
+ plugin_unlock(NULL, engine);
+}
+
+
static bool slave_background_thread_running;
static bool slave_background_thread_stop;
static bool slave_background_thread_gtid_loaded;
-struct slave_background_kill_t {
+static struct slave_background_kill_t {
slave_background_kill_t *next;
THD *to_kill;
} *slave_background_kill_list;
+static struct slave_background_gtid_pos_create_t {
+ slave_background_gtid_pos_create_t *next;
+ void *hton;
+} *slave_background_gtid_pos_create_list;
+
pthread_handler_t
handle_slave_background(void *arg __attribute__((unused)))
@@ -321,6 +487,7 @@ handle_slave_background(void *arg __attribute__((unused)))
do
{
slave_background_kill_t *kill_list;
+ slave_background_gtid_pos_create_t *create_list;
thd->ENTER_COND(&COND_slave_background, &LOCK_slave_background,
&stage_slave_background_wait_request,
@@ -329,12 +496,14 @@ handle_slave_background(void *arg __attribute__((unused)))
{
stop= abort_loop || thd->killed || slave_background_thread_stop;
kill_list= slave_background_kill_list;
- if (stop || kill_list)
+ create_list= slave_background_gtid_pos_create_list;
+ if (stop || kill_list || create_list)
break;
mysql_cond_wait(&COND_slave_background, &LOCK_slave_background);
}
slave_background_kill_list= NULL;
+ slave_background_gtid_pos_create_list= NULL;
thd->EXIT_COND(&old_stage);
while (kill_list)
@@ -353,6 +522,16 @@ handle_slave_background(void *arg __attribute__((unused)))
mysql_mutex_unlock(&to_kill->LOCK_wakeup_ready);
my_free(p);
}
+
+ while (create_list)
+ {
+ slave_background_gtid_pos_create_t *next= create_list->next;
+ void *hton= create_list->hton;
+ handle_gtid_pos_auto_create_request(thd, hton);
+ my_free(create_list);
+ create_list= next;
+ }
+
mysql_mutex_lock(&LOCK_slave_background);
} while (!stop);
@@ -392,6 +571,41 @@ slave_background_kill_request(THD *to_kill)
/*
+ This function must only be called from a slave SQL thread (or worker thread),
+ to ensure that the table_entry will not go away before we can lock the
+ LOCK_slave_state.
+*/
+void
+slave_background_gtid_pos_create_request(
+ rpl_slave_state::gtid_pos_table *table_entry)
+{
+ slave_background_gtid_pos_create_t *p;
+
+ if (table_entry->state != rpl_slave_state::GTID_POS_AUTO_CREATE)
+ return;
+ p= (slave_background_gtid_pos_create_t *)my_malloc(sizeof(*p), MYF(MY_WME));
+ if (!p)
+ return;
+ mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ if (table_entry->state != rpl_slave_state::GTID_POS_AUTO_CREATE)
+ {
+ my_free(p);
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ return;
+ }
+ table_entry->state= rpl_slave_state::GTID_POS_CREATE_REQUESTED;
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+
+ p->hton= table_entry->table_hton;
+ mysql_mutex_lock(&LOCK_slave_background);
+ p->next= slave_background_gtid_pos_create_list;
+ slave_background_gtid_pos_create_list= p;
+ mysql_cond_signal(&COND_slave_background);
+ mysql_mutex_unlock(&LOCK_slave_background);
+}
+
+
+/*
Start the slave background thread.
This thread is currently used for two purposes:
@@ -5065,6 +5279,14 @@ pthread_handler_t handle_slave_sql(void *arg)
if (mi->using_gtid != Master_info::USE_GTID_NO || opt_gtid_strict_mode)
goto err;
}
+ /* Re-load the set of mysql.gtid_slave_posXXX tables available. */
+ if (find_gtid_slave_pos_tables(thd))
+ {
+ rli->report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(), NULL,
+ "Error processing replication GTID position tables: %s",
+ thd->get_stmt_da()->message());
+ goto err;
+ }
/* execute init_slave variable */
if (opt_init_slave.length)
diff --git a/sql/slave.h b/sql/slave.h
index 431e6847abe..c856a6989ed 100644
--- a/sql/slave.h
+++ b/sql/slave.h
@@ -48,6 +48,7 @@
#include "my_list.h"
#include "rpl_filter.h"
#include "rpl_tblmap.h"
+#include "rpl_gtid.h"
#define SLAVE_NET_TIMEOUT 60
@@ -268,6 +269,8 @@ void slave_output_error_info(rpl_group_info *rgi, THD *thd);
pthread_handler_t handle_slave_sql(void *arg);
bool net_request_file(NET* net, const char* fname);
void slave_background_kill_request(THD *to_kill);
+void slave_background_gtid_pos_create_request
+ (rpl_slave_state::gtid_pos_table *table_entry);
extern bool volatile abort_loop;
extern Master_info *active_mi; /* active_mi for multi-master */
diff --git a/sql/sql_plugin.cc b/sql/sql_plugin.cc
index 92b9c94e84c..c73229a2ee0 100644
--- a/sql/sql_plugin.cc
+++ b/sql/sql_plugin.cc
@@ -941,6 +941,10 @@ SHOW_COMP_OPTION plugin_status(const char *name, size_t len, int type)
}
+/*
+ If LEX is passed non-NULL, an automatic unlock of the plugin will happen
+ in the LEX destructor.
+*/
static plugin_ref intern_plugin_lock(LEX *lex, plugin_ref rc)
{
st_plugin_int *pi= plugin_ref_to_int(rc);
@@ -984,6 +988,16 @@ static plugin_ref intern_plugin_lock(LEX *lex, plugin_ref rc)
}
+/*
+ Notes on lifetime:
+
+ If THD is passed as non-NULL (and with a non-NULL thd->lex), an entry is made
+ in the thd->lex which will cause an automatic unlock of the plugin in the LEX
+ destructor. In this case, no manual unlock must be done.
+
+ Otherwise, when passing a NULL THD, the caller must arrange that plugin
+ unlock happens later.
+*/
plugin_ref plugin_lock(THD *thd, plugin_ref ptr)
{
LEX *lex= thd ? thd->lex : 0;
@@ -1020,6 +1034,16 @@ plugin_ref plugin_lock(THD *thd, plugin_ref ptr)
}
+/*
+ Notes on lifetime:
+
+ If THD is passed as non-NULL (and with a non-NULL thd->lex), an entry is made
+ in the thd->lex which will cause an automatic unlock of the plugin in the LEX
+ destructor. In this case, no manual unlock must be done.
+
+ Otherwise, when passing a NULL THD, the caller must arrange that plugin
+ unlock happens later.
+*/
plugin_ref plugin_lock_by_name(THD *thd, const LEX_CSTRING *name, int type)
{
LEX *lex= thd ? thd->lex : 0;
@@ -1935,6 +1959,12 @@ void plugin_shutdown(void)
if (initialized)
{
+ if (opt_gtid_pos_auto_plugins)
+ {
+ free_engine_list(opt_gtid_pos_auto_plugins);
+ opt_gtid_pos_auto_plugins= NULL;
+ }
+
mysql_mutex_lock(&LOCK_plugin);
reap_needed= true;
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index 28665098d4d..de054f31afa 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -3556,6 +3556,45 @@ static Sys_var_plugin Sys_enforce_storage_engine(
NO_CMD_LINE, MYSQL_STORAGE_ENGINE_PLUGIN,
DEFAULT(&enforced_storage_engine), NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(check_has_super));
+
+#ifdef HAVE_REPLICATION
+/*
+ Check
+ 1. Value for gtid_pos_auto_engines is not NULL.
+ 2. No slave SQL thread is running.
+*/
+static bool
+check_gtid_pos_auto_engines(sys_var *self, THD *thd, set_var *var)
+{
+ bool running;
+ bool err= false;
+
+ DBUG_ASSERT(var->type == OPT_GLOBAL);
+ if (var->value && var->value->is_null())
+ err= true;
+ else
+ {
+ running= give_error_if_slave_running(false);
+ if (running)
+ err= true;
+ }
+ return err;
+}
+
+
+static Sys_var_pluginlist Sys_gtid_pos_auto_engines(
+ "gtid_pos_auto_engines",
+ "List of engines for which to automatically create a "
+ "mysql.gtid_slave_pos_ENGINE table, if a transaction using that engine "
+ "is replicated. This can be used to avoid introducing cross-engine "
+ "transactions, if engines are used different from that used by table "
+ "mysql.gtid_slave_pos",
+ GLOBAL_VAR(opt_gtid_pos_auto_plugins), NO_CMD_LINE,
+ DEFAULT(&gtid_pos_auto_engines),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(check_gtid_pos_auto_engines));
+#endif
+
+
#if defined(ENABLED_DEBUG_SYNC)
/*
Variable can be set for the session only.
diff --git a/sql/sys_vars.ic b/sql/sys_vars.ic
index f9acfb3b657..9b0553015a5 100644
--- a/sql/sys_vars.ic
+++ b/sql/sys_vars.ic
@@ -1535,6 +1535,116 @@ public:
{ return valptr(thd, get_default(thd)); }
};
+/**
+ Class for variables that containg a list of plugins.
+ Currently this is used only for @@gtid_pos_auto_create_engines
+
+ Backing store: plugin_ref
+
+ @note
+ Currently this is only used for storage engine type plugins, and thus only
+ storage engine type plugin is implemented. It could be extended to other
+ plugin types later if needed, similar to Sys_var_plugin.
+
+ These variables don't support command-line equivalents, any such
+ command-line options should be added manually to my_long_options in mysqld.cc
+
+ Note on lifetimes of resources allocated: We allocate a zero-terminated array
+ of plugin_ref*, and lock the contained plugins. The list in the global
+ variable must be freed (with free_engine_list()). However, the way Sys_var
+ works, there is no place to explicitly free other lists, like the one
+ returned from get_default().
+
+ Therefore, the code needs to work with temporary lists, which are
+ registered in the THD to be automatically freed (and plugins similarly
+ automatically unlocked). This is why do_check() allocates a temporary
+ list, from which do_update() then makes a permanent copy.
+*/
+class Sys_var_pluginlist: public sys_var
+{
+ int plugin_type;
+public:
+ Sys_var_pluginlist(const char *name_arg,
+ const char *comment, int flag_args, ptrdiff_t off, size_t size,
+ CMD_LINE getopt,
+ char **def_val, PolyLock *lock=0,
+ enum binlog_status_enum binlog_status_arg=VARIABLE_NOT_IN_BINLOG,
+ on_check_function on_check_func=0,
+ on_update_function on_update_func=0,
+ const char *substitute=0)
+ : sys_var(&all_sys_vars, name_arg, comment, flag_args, off, getopt.id,
+ getopt.arg_type, SHOW_CHAR, (intptr)def_val,
+ lock, binlog_status_arg, on_check_func, on_update_func,
+ substitute)
+ {
+ option.var_type|= GET_STR;
+ SYSVAR_ASSERT(size == sizeof(plugin_ref));
+ SYSVAR_ASSERT(getopt.id < 0); // force NO_CMD_LINE
+ }
+ bool do_check(THD *thd, set_var *var)
+ {
+ char buff[STRING_BUFFER_USUAL_SIZE];
+ String str(buff,sizeof(buff), system_charset_info), *res;
+ plugin_ref *plugins;
+
+ if (!(res=var->value->val_str(&str)))
+ plugins= resolve_engine_list(thd, "", 0, true, true);
+ else
+ plugins= resolve_engine_list(thd, res->ptr(), res->length(), true, true);
+ if (!plugins)
+ return true;
+ var->save_result.plugins= plugins;
+ return false;
+ }
+ void do_update(plugin_ref **valptr, plugin_ref* newval)
+ {
+ plugin_ref *oldval= *valptr;
+ *valptr= copy_engine_list(newval);
+ free_engine_list(oldval);
+ }
+ bool session_update(THD *thd, set_var *var)
+ {
+ do_update((plugin_ref**)session_var_ptr(thd),
+ var->save_result.plugins);
+ return false;
+ }
+ bool global_update(THD *thd, set_var *var)
+ {
+ do_update((plugin_ref**)global_var_ptr(),
+ var->save_result.plugins);
+ return false;
+ }
+ void session_save_default(THD *thd, set_var *var)
+ {
+ plugin_ref* plugins= global_var(plugin_ref *);
+ var->save_result.plugins= plugins ? temp_copy_engine_list(thd, plugins) : 0;
+ }
+ plugin_ref *get_default(THD *thd)
+ {
+ char *default_value= *reinterpret_cast<char**>(option.def_value);
+ if (!default_value)
+ return 0;
+ return resolve_engine_list(thd, default_value, strlen(default_value),
+ false, true);
+ }
+
+ void global_save_default(THD *thd, set_var *var)
+ {
+ var->save_result.plugins= get_default(thd);
+ }
+
+ uchar *valptr(THD *thd, plugin_ref *plugins)
+ {
+ return (uchar*)pretty_print_engine_list(thd, plugins);
+ }
+ uchar *session_value_ptr(THD *thd, const LEX_STRING *base)
+ { return valptr(thd, session_var(thd, plugin_ref*)); }
+ uchar *global_value_ptr(THD *thd, const LEX_STRING *base)
+ { return valptr(thd, global_var(plugin_ref*)); }
+ uchar *default_value_ptr(THD *thd)
+ { return valptr(thd, get_default(thd)); }
+};
+
#if defined(ENABLED_DEBUG_SYNC)
#include "debug_sync.h"