summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorKristian Nielsen <knielsen@knielsen-hq.org>2017-03-14 12:54:10 +0100
committerKristian Nielsen <knielsen@knielsen-hq.org>2017-04-21 10:30:14 +0200
commit6a84473c28af10e072267bc811f280a49bdc8ca8 (patch)
treea2c5db3ee0997a1f5a4ee7e8771ecc739a850d0c /sql
parent3501a5356e7358c0accfd3465a0c2f66f9b87b70 (diff)
downloadmariadb-git-6a84473c28af10e072267bc811f280a49bdc8ca8.tar.gz
MDEV-12179: Per-engine mysql.gtid_slave_pos table
Intermediate commit. This commit implements that record_gtid() selects a gtid_slave_posXXX table with a storage engine already in use by current transaction, if any. The default table mysql.gtid_slave_pos is used if no match can be found on storage engine, or for GTID position updates with no specific storage engine. Table discovery of mysql.gtid_slave_pos* happens on initial GTID state load as well as on every START SLAVE. Some effort is made to make this possible without additional locking. New tables are added using lock-free atomics. Removing tables requires stopping all slaves first. A warning is given in the error log when a table is removed but a non-stopped slave still has a reference to it. If multiple mysql.gtid_slave_posXXX tables with same storage engine exist, one is chosen arbitrarily to be used, with a warning in the error log. GTID data from all tables is still read, but only one among redundant tables with same storage engine will be updated.
Diffstat (limited to 'sql')
-rw-r--r--sql/rpl_gtid.cc73
-rw-r--r--sql/rpl_gtid.h25
-rw-r--r--sql/rpl_rli.cc184
3 files changed, 249 insertions, 33 deletions
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index 9e6f6ec7009..ac3c621a5c1 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -471,6 +471,48 @@ gtid_check_rpl_slave_state_table(TABLE *table)
/*
+ Attempt to find a mysql.gtid_slave_posXXX table that has a storage engine
+ that is already in use by the current transaction, if any.
+*/
+void
+rpl_slave_state::select_gtid_pos_table(THD *thd, LEX_STRING *out_tablename)
+{
+ struct gtid_pos_table *list, *table_entry, *default_entry;
+
+ /*
+ See comments on rpl_slave_state::gtid_pos_tables for rules around proper
+ access to the list.
+ */
+ list= my_atomic_loadptr_explicit(&gtid_pos_tables, MY_MEMORY_ORDER_ACQUIRE);
+
+ Ha_trx_info *ha_info= thd->transaction.all.ha_list;
+ while (ha_info)
+ {
+ void *trx_hton= ha_info->ht();
+ table_entry= list;
+ while (table_entry)
+ {
+ if (table_entry->table_hton == trx_hton)
+ {
+ *out_tablename= table_entry->table_name;
+ return;
+ }
+ table_entry= table_entry->next;
+ }
+ ha_info= ha_info->next();
+ }
+ /*
+ If we cannot find any table whose engine matches an engine that is
+ already active in the transaction, or if there is no current transaction
+ engines available, we return the default gtid_slave_pos table.
+ */
+ default_entry= my_atomic_loadptr_explicit(&default_gtid_pos_table,
+ MY_MEMORY_ORDER_ACQUIRE);
+ *out_tablename= default_entry->table_name;
+}
+
+
+/*
Write a gtid to the replication slave state table.
Do it as part of the transaction, to get slave crash safety, or as a separate
@@ -500,6 +542,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
Query_tables_list lex_backup;
wait_for_commit* suspended_wfc;
void *hton= NULL;
+ LEX_STRING gtid_pos_table_name;
DBUG_ENTER("record_gtid");
*out_hton= NULL;
@@ -517,6 +560,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
if (!in_statement)
thd->reset_for_next_command();
+ select_gtid_pos_table(thd, &gtid_pos_table_name);
DBUG_EXECUTE_IF("gtid_inject_record_gtid",
{
@@ -547,10 +591,8 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
*/
suspended_wfc= thd->suspend_subsequent_commits();
thd->lex->reset_n_backup_query_tables_list(&lex_backup);
- tlist.init_one_table(STRING_WITH_LEN("mysql"),
- rpl_gtid_slave_state_table_name.str,
- rpl_gtid_slave_state_table_name.length,
- NULL, TL_WRITE);
+ tlist.init_one_table(STRING_WITH_LEN("mysql"), gtid_pos_table_name.str,
+ gtid_pos_table_name.length, NULL, TL_WRITE);
if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
goto end;
table_opened= true;
@@ -1168,18 +1210,35 @@ rpl_slave_state::free_gtid_pos_tables(struct rpl_slave_state::gtid_pos_table *li
}
+/*
+ Replace the list of available mysql.gtid_slave_posXXX tables with a new list.
+ The caller must be holding LOCK_slave_state. Additionally, this function
+ must only be called while all SQL threads are stopped.
+*/
void
-rpl_slave_state::set_gtid_pos_tables_list(struct rpl_slave_state::gtid_pos_table *new_list)
+rpl_slave_state::set_gtid_pos_tables_list(rpl_slave_state::gtid_pos_table *new_list,
+ rpl_slave_state::gtid_pos_table *default_entry)
{
- struct gtid_pos_table *old_list;
+ gtid_pos_table *old_list;
mysql_mutex_assert_owner(&LOCK_slave_state);
old_list= gtid_pos_tables;
- gtid_pos_tables= new_list;
+ my_atomic_storeptr_explicit(&gtid_pos_tables, new_list, MY_MEMORY_ORDER_RELEASE);
+ my_atomic_storeptr_explicit(&default_gtid_pos_table, default_entry,
+ MY_MEMORY_ORDER_RELEASE);
free_gtid_pos_tables(old_list);
}
+void
+rpl_slave_state::add_gtid_pos_table(rpl_slave_state::gtid_pos_table *entry)
+{
+ mysql_mutex_assert_owner(&LOCK_slave_state);
+ entry->next= gtid_pos_tables;
+ my_atomic_storeptr_explicit(&gtid_pos_tables, entry, MY_MEMORY_ORDER_RELEASE);
+}
+
+
struct rpl_slave_state::gtid_pos_table *
rpl_slave_state::alloc_gtid_pos_table(LEX_STRING *table_name, void *hton)
{
diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h
index ea278427061..b4f80fe3ddf 100644
--- a/sql/rpl_gtid.h
+++ b/sql/rpl_gtid.h
@@ -177,7 +177,27 @@ struct rpl_slave_state
DYNAMIC_ARRAY gtid_sort_array;
uint64 last_sub_id;
+ /*
+ List of tables available for durably storing the slave GTID position.
+
+ Accesses to this table is protected by LOCK_slave_state. However for
+ efficiency, there is also a provision for read access to it from a running
+ slave without lock.
+
+ An element can be added at the head of a list by storing the new
+ gtid_pos_tables pointer atomically with release semantics, to ensure that
+ the next pointer of the new element is visible to readers of the new list.
+ Other changes (like deleting or replacing elements) must happen only while
+ all SQL driver threads are stopped. LOCK_slave_state must be held in any
+ case.
+
+ The list can be read without lock by an SQL driver thread or worker thread
+ by reading the gtid_pos_tables pointer atomically with acquire semantics,
+ to ensure that it will see the correct next pointer of a new head element.
+ */
struct gtid_pos_table *gtid_pos_tables;
+ /* The default entry in gtid_pos_tables, mysql.gtid_slave_pos. */
+ struct gtid_pos_table *default_gtid_pos_table;
bool loaded;
rpl_slave_state();
@@ -188,6 +208,7 @@ struct rpl_slave_state
int update(uint32 domain_id, uint32 server_id, uint64 sub_id,
uint64 seq_no, void *hton, rpl_group_info *rgi);
int truncate_state_table(THD *thd);
+ void select_gtid_pos_table(THD *thd, LEX_STRING *out_tablename);
int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
bool in_transaction, bool in_statement, void **out_hton);
uint64 next_sub_id(uint32 domain_id);
@@ -208,7 +229,9 @@ struct rpl_slave_state
int record_and_update_gtid(THD *thd, struct rpl_group_info *rgi);
int check_duplicate_gtid(rpl_gtid *gtid, rpl_group_info *rgi);
void release_domain_owner(rpl_group_info *rgi);
- void set_gtid_pos_tables_list(struct gtid_pos_table *new_list);
+ void set_gtid_pos_tables_list(gtid_pos_table *new_list,
+ gtid_pos_table *default_entry);
+ void add_gtid_pos_table(gtid_pos_table *entry);
struct gtid_pos_table *alloc_gtid_pos_table(LEX_STRING *table_name, void *hton);
void free_gtid_pos_tables(struct gtid_pos_table *list);
};
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 74d831db6dc..74e4be524b9 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -1647,24 +1647,70 @@ struct load_gtid_state_cb_data {
HASH *hash;
DYNAMIC_ARRAY *array;
struct rpl_slave_state::gtid_pos_table *table_list;
+ struct rpl_slave_state::gtid_pos_table *default_entry;
};
static int
+process_gtid_pos_table(THD *thd, LEX_STRING *table_name, void *hton,
+ struct load_gtid_state_cb_data *data)
+{
+ struct rpl_slave_state::gtid_pos_table *p, *entry, **next_ptr;
+ bool is_default=
+ (strcmp(table_name->str, rpl_gtid_slave_state_table_name.str) == 0);
+
+ /*
+ Ignore tables with duplicate storage engine, with a warning.
+ Prefer the default mysql.gtid_slave_pos over another table
+ mysql.gtid_slave_posXXX with the same storage engine.
+ */
+ next_ptr= &data->table_list;
+ entry= data->table_list;
+ while (entry)
+ {
+ if (entry->table_hton == hton)
+ {
+ static const char *warning_msg= "Ignoring redundant table mysql.%s "
+ "since mysql.%s has the same storage engine";
+ if (!is_default)
+ {
+ /* Ignore the redundant table. */
+ sql_print_warning(warning_msg, table_name->str, entry->table_name);
+ return 0;
+ }
+ else
+ {
+ sql_print_warning(warning_msg, entry->table_name, table_name->str);
+ /* Delete the redundant table, and proceed to add this one instead. */
+ *next_ptr= entry->next;
+ my_free(entry);
+ break;
+ }
+ }
+ next_ptr= &entry->next;
+ entry= entry->next;
+ }
+
+ if (!(p= rpl_global_gtid_slave_state->alloc_gtid_pos_table(table_name, hton)))
+ return 1;
+ p->next= data->table_list;
+ data->table_list= p;
+ if (is_default)
+ data->default_entry= p;
+ return 0;
+}
+
+
+static int
load_gtid_state_cb(THD *thd, LEX_STRING *table_name, void *arg)
{
int err;
load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg);
- struct rpl_slave_state::gtid_pos_table *p;
void *hton;
if ((err= scan_one_gtid_slave_pos_table(thd, data->hash, data->array,
table_name, &hton)))
return err;
- if (!(p= rpl_global_gtid_slave_state->alloc_gtid_pos_table(table_name, hton)))
- return 1;
- p->next= data->table_list;
- data->table_list= p;
- return 0;
+ return process_gtid_pos_table(thd, table_name, hton, data);
}
@@ -1687,6 +1733,7 @@ rpl_load_gtid_slave_state(THD *thd)
DBUG_RETURN(0);
cb_data.table_list= NULL;
+ cb_data.default_entry= NULL;
my_hash_init(&hash, &my_charset_bin, 32,
offsetof(gtid_pos_element, gtid) + offsetof(rpl_gtid, domain_id),
sizeof(uint32), NULL, my_free, HASH_UNIQUE);
@@ -1706,6 +1753,23 @@ rpl_load_gtid_slave_state(THD *thd)
goto end;
}
+ if (!cb_data.table_list)
+ {
+ my_error(ER_NO_SUCH_TABLE, MYF(0), "mysql",
+ rpl_gtid_slave_state_table_name.str);
+ err= 1;
+ goto end;
+ }
+ else if (!cb_data.default_entry)
+ {
+ /*
+ If the mysql.gtid_slave_pos table does not exist, but at least one other
+ table is available, arbitrarily pick the first in the list to use as
+ default.
+ */
+ cb_data.default_entry= cb_data.table_list;
+ }
+
for (i= 0; i < array.elements; ++i)
{
get_dynamic(&array, (uchar *)&tmp_entry, i);
@@ -1735,7 +1799,8 @@ rpl_load_gtid_slave_state(THD *thd)
}
}
- rpl_global_gtid_slave_state->set_gtid_pos_tables_list(cb_data.table_list);
+ rpl_global_gtid_slave_state->set_gtid_pos_tables_list(cb_data.table_list,
+ cb_data.default_entry);
cb_data.table_list= NULL;
rpl_global_gtid_slave_state->loaded= true;
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
@@ -1753,12 +1818,10 @@ end:
static int
find_gtid_pos_tables_cb(THD *thd, LEX_STRING *table_name, void *arg)
{
- struct rpl_slave_state::gtid_pos_table **table_list_ptr=
- static_cast<struct rpl_slave_state::gtid_pos_table **>(arg);
+ load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg);
TABLE_LIST tlist;
TABLE *table= NULL;
int err;
- struct rpl_slave_state::gtid_pos_table *p;
thd->reset_for_next_command();
tlist.init_one_table(STRING_WITH_LEN("mysql"), table_name->str,
@@ -1769,14 +1832,7 @@ find_gtid_pos_tables_cb(THD *thd, LEX_STRING *table_name, void *arg)
if ((err= gtid_check_rpl_slave_state_table(table)))
goto end;
-
- if (!(p= rpl_global_gtid_slave_state->alloc_gtid_pos_table(table_name, table->s->db_type())))
- err= 1;
- else
- {
- p->next= *table_list_ptr;
- *table_list_ptr= p;
- }
+ err= process_gtid_pos_table(thd, table_name, table->s->db_type(), data);
end:
if (table)
@@ -1801,7 +1857,8 @@ int
find_gtid_slave_pos_tables(THD *thd)
{
int err= 0;
- struct rpl_slave_state::gtid_pos_table *table_list;
+ load_gtid_state_cb_data cb_data;
+ bool any_running;
mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
bool loaded= rpl_global_gtid_slave_state->loaded;
@@ -1809,18 +1866,95 @@ find_gtid_slave_pos_tables(THD *thd)
if (!loaded)
return 0;
- table_list= NULL;
- if ((err= scan_all_gtid_slave_pos_table(thd, find_gtid_pos_tables_cb, &table_list)))
+ cb_data.table_list= NULL;
+ cb_data.default_entry= NULL;
+ if ((err= scan_all_gtid_slave_pos_table(thd, find_gtid_pos_tables_cb, &cb_data)))
+ goto end;
+
+ if (!cb_data.table_list)
+ {
+ my_error(ER_NO_SUCH_TABLE, MYF(0), "mysql",
+ rpl_gtid_slave_state_table_name.str);
+ err= 1;
goto end;
+ }
+ else if (!cb_data.default_entry)
+ {
+ /*
+ If the mysql.gtid_slave_pos table does not exist, but at least one other
+ table is available, arbitrarily pick the first in the list to use as
+ default.
+ */
+ cb_data.default_entry= cb_data.table_list;
+ }
+ any_running= any_slave_sql_running();
mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
- rpl_global_gtid_slave_state->set_gtid_pos_tables_list(table_list);
- table_list= NULL;
+ if (!any_running)
+ {
+ rpl_global_gtid_slave_state->set_gtid_pos_tables_list(cb_data.table_list,
+ cb_data.default_entry);
+ cb_data.table_list= NULL;
+ }
+ else
+ {
+ /*
+ If there are SQL threads running, we cannot safely remove the old list.
+ However we can add new entries, and warn about any tables that
+ disappeared, but may still be visible to running SQL threads.
+ */
+ rpl_slave_state::gtid_pos_table *old_entry, *new_entry, **next_ptr_ptr;
+
+ old_entry= rpl_global_gtid_slave_state->gtid_pos_tables;
+ while (old_entry)
+ {
+ new_entry= cb_data.table_list;
+ while (new_entry)
+ {
+ if (new_entry->table_hton == old_entry->table_hton)
+ break;
+ new_entry= new_entry->next;
+ }
+ if (!new_entry)
+ sql_print_warning("The table mysql.%s was removed. "
+ "This change will not take full effect "
+ "until all SQL threads have been restarted",
+ old_entry->table_name.str);
+ old_entry= old_entry->next;
+ }
+ next_ptr_ptr= &cb_data.table_list;
+ new_entry= cb_data.table_list;
+ while (new_entry)
+ {
+ /* Check if we already have a table with this storage engine. */
+ old_entry= rpl_global_gtid_slave_state->gtid_pos_tables;
+ while (old_entry)
+ {
+ if (new_entry->table_hton == old_entry->table_hton)
+ break;
+ old_entry= old_entry->next;
+ }
+ if (old_entry)
+ {
+ /* This new_entry is already available in the list. */
+ next_ptr_ptr= &new_entry->next;
+ new_entry= new_entry->next;
+ }
+ else
+ {
+ /* Move this new_entry to the list. */
+ rpl_slave_state::gtid_pos_table *next= new_entry->next;
+ rpl_global_gtid_slave_state->add_gtid_pos_table(new_entry);
+ *next_ptr_ptr= next;
+ new_entry= next;
+ }
+ }
+ }
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
end:
- if (table_list)
- rpl_global_gtid_slave_state->free_gtid_pos_tables(table_list);
+ if (cb_data.table_list)
+ rpl_global_gtid_slave_state->free_gtid_pos_tables(cb_data.table_list);
return err;
}