diff options
| author | Kristian Nielsen <knielsen@knielsen-hq.org> | 2017-03-14 12:54:10 +0100 |
|---|---|---|
| committer | Kristian Nielsen <knielsen@knielsen-hq.org> | 2017-04-21 10:30:14 +0200 |
| commit | 6a84473c28af10e072267bc811f280a49bdc8ca8 (patch) | |
| tree | a2c5db3ee0997a1f5a4ee7e8771ecc739a850d0c /sql | |
| parent | 3501a5356e7358c0accfd3465a0c2f66f9b87b70 (diff) | |
| download | mariadb-git-6a84473c28af10e072267bc811f280a49bdc8ca8.tar.gz | |
MDEV-12179: Per-engine mysql.gtid_slave_pos table
Intermediate commit.
This commit implements that record_gtid() selects a gtid_slave_posXXX table
with a storage engine already in use by current transaction, if any.
The default table mysql.gtid_slave_pos is used if no match can be found on
storage engine, or for GTID position updates with no specific storage
engine.
Table discovery of mysql.gtid_slave_pos* happens on initial GTID state load
as well as on every START SLAVE. Some effort is made to make this possible
without additional locking. New tables are added using lock-free atomics.
Removing tables requires stopping all slaves first. A warning is given in
the error log when a table is removed but a non-stopped slave still has a
reference to it.
If multiple mysql.gtid_slave_posXXX tables with same storage engine exist,
one is chosen arbitrarily to be used, with a warning in the error log. GTID
data from all tables is still read, but only one among redundant tables with
same storage engine will be updated.
Diffstat (limited to 'sql')
| -rw-r--r-- | sql/rpl_gtid.cc | 73 | ||||
| -rw-r--r-- | sql/rpl_gtid.h | 25 | ||||
| -rw-r--r-- | sql/rpl_rli.cc | 184 |
3 files changed, 249 insertions, 33 deletions
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc index 9e6f6ec7009..ac3c621a5c1 100644 --- a/sql/rpl_gtid.cc +++ b/sql/rpl_gtid.cc @@ -471,6 +471,48 @@ gtid_check_rpl_slave_state_table(TABLE *table) /* + Attempt to find a mysql.gtid_slave_posXXX table that has a storage engine + that is already in use by the current transaction, if any. +*/ +void +rpl_slave_state::select_gtid_pos_table(THD *thd, LEX_STRING *out_tablename) +{ + struct gtid_pos_table *list, *table_entry, *default_entry; + + /* + See comments on rpl_slave_state::gtid_pos_tables for rules around proper + access to the list. + */ + list= my_atomic_loadptr_explicit(>id_pos_tables, MY_MEMORY_ORDER_ACQUIRE); + + Ha_trx_info *ha_info= thd->transaction.all.ha_list; + while (ha_info) + { + void *trx_hton= ha_info->ht(); + table_entry= list; + while (table_entry) + { + if (table_entry->table_hton == trx_hton) + { + *out_tablename= table_entry->table_name; + return; + } + table_entry= table_entry->next; + } + ha_info= ha_info->next(); + } + /* + If we cannot find any table whose engine matches an engine that is + already active in the transaction, or if there is no current transaction + engines available, we return the default gtid_slave_pos table. + */ + default_entry= my_atomic_loadptr_explicit(&default_gtid_pos_table, + MY_MEMORY_ORDER_ACQUIRE); + *out_tablename= default_entry->table_name; +} + + +/* Write a gtid to the replication slave state table. Do it as part of the transaction, to get slave crash safety, or as a separate @@ -500,6 +542,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, Query_tables_list lex_backup; wait_for_commit* suspended_wfc; void *hton= NULL; + LEX_STRING gtid_pos_table_name; DBUG_ENTER("record_gtid"); *out_hton= NULL; @@ -517,6 +560,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, if (!in_statement) thd->reset_for_next_command(); + select_gtid_pos_table(thd, >id_pos_table_name); DBUG_EXECUTE_IF("gtid_inject_record_gtid", { @@ -547,10 +591,8 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, */ suspended_wfc= thd->suspend_subsequent_commits(); thd->lex->reset_n_backup_query_tables_list(&lex_backup); - tlist.init_one_table(STRING_WITH_LEN("mysql"), - rpl_gtid_slave_state_table_name.str, - rpl_gtid_slave_state_table_name.length, - NULL, TL_WRITE); + tlist.init_one_table(STRING_WITH_LEN("mysql"), gtid_pos_table_name.str, + gtid_pos_table_name.length, NULL, TL_WRITE); if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0))) goto end; table_opened= true; @@ -1168,18 +1210,35 @@ rpl_slave_state::free_gtid_pos_tables(struct rpl_slave_state::gtid_pos_table *li } +/* + Replace the list of available mysql.gtid_slave_posXXX tables with a new list. + The caller must be holding LOCK_slave_state. Additionally, this function + must only be called while all SQL threads are stopped. +*/ void -rpl_slave_state::set_gtid_pos_tables_list(struct rpl_slave_state::gtid_pos_table *new_list) +rpl_slave_state::set_gtid_pos_tables_list(rpl_slave_state::gtid_pos_table *new_list, + rpl_slave_state::gtid_pos_table *default_entry) { - struct gtid_pos_table *old_list; + gtid_pos_table *old_list; mysql_mutex_assert_owner(&LOCK_slave_state); old_list= gtid_pos_tables; - gtid_pos_tables= new_list; + my_atomic_storeptr_explicit(>id_pos_tables, new_list, MY_MEMORY_ORDER_RELEASE); + my_atomic_storeptr_explicit(&default_gtid_pos_table, default_entry, + MY_MEMORY_ORDER_RELEASE); free_gtid_pos_tables(old_list); } +void +rpl_slave_state::add_gtid_pos_table(rpl_slave_state::gtid_pos_table *entry) +{ + mysql_mutex_assert_owner(&LOCK_slave_state); + entry->next= gtid_pos_tables; + my_atomic_storeptr_explicit(>id_pos_tables, entry, MY_MEMORY_ORDER_RELEASE); +} + + struct rpl_slave_state::gtid_pos_table * rpl_slave_state::alloc_gtid_pos_table(LEX_STRING *table_name, void *hton) { diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h index ea278427061..b4f80fe3ddf 100644 --- a/sql/rpl_gtid.h +++ b/sql/rpl_gtid.h @@ -177,7 +177,27 @@ struct rpl_slave_state DYNAMIC_ARRAY gtid_sort_array; uint64 last_sub_id; + /* + List of tables available for durably storing the slave GTID position. + + Accesses to this table is protected by LOCK_slave_state. However for + efficiency, there is also a provision for read access to it from a running + slave without lock. + + An element can be added at the head of a list by storing the new + gtid_pos_tables pointer atomically with release semantics, to ensure that + the next pointer of the new element is visible to readers of the new list. + Other changes (like deleting or replacing elements) must happen only while + all SQL driver threads are stopped. LOCK_slave_state must be held in any + case. + + The list can be read without lock by an SQL driver thread or worker thread + by reading the gtid_pos_tables pointer atomically with acquire semantics, + to ensure that it will see the correct next pointer of a new head element. + */ struct gtid_pos_table *gtid_pos_tables; + /* The default entry in gtid_pos_tables, mysql.gtid_slave_pos. */ + struct gtid_pos_table *default_gtid_pos_table; bool loaded; rpl_slave_state(); @@ -188,6 +208,7 @@ struct rpl_slave_state int update(uint32 domain_id, uint32 server_id, uint64 sub_id, uint64 seq_no, void *hton, rpl_group_info *rgi); int truncate_state_table(THD *thd); + void select_gtid_pos_table(THD *thd, LEX_STRING *out_tablename); int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, bool in_transaction, bool in_statement, void **out_hton); uint64 next_sub_id(uint32 domain_id); @@ -208,7 +229,9 @@ struct rpl_slave_state int record_and_update_gtid(THD *thd, struct rpl_group_info *rgi); int check_duplicate_gtid(rpl_gtid *gtid, rpl_group_info *rgi); void release_domain_owner(rpl_group_info *rgi); - void set_gtid_pos_tables_list(struct gtid_pos_table *new_list); + void set_gtid_pos_tables_list(gtid_pos_table *new_list, + gtid_pos_table *default_entry); + void add_gtid_pos_table(gtid_pos_table *entry); struct gtid_pos_table *alloc_gtid_pos_table(LEX_STRING *table_name, void *hton); void free_gtid_pos_tables(struct gtid_pos_table *list); }; diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 74d831db6dc..74e4be524b9 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -1647,24 +1647,70 @@ struct load_gtid_state_cb_data { HASH *hash; DYNAMIC_ARRAY *array; struct rpl_slave_state::gtid_pos_table *table_list; + struct rpl_slave_state::gtid_pos_table *default_entry; }; static int +process_gtid_pos_table(THD *thd, LEX_STRING *table_name, void *hton, + struct load_gtid_state_cb_data *data) +{ + struct rpl_slave_state::gtid_pos_table *p, *entry, **next_ptr; + bool is_default= + (strcmp(table_name->str, rpl_gtid_slave_state_table_name.str) == 0); + + /* + Ignore tables with duplicate storage engine, with a warning. + Prefer the default mysql.gtid_slave_pos over another table + mysql.gtid_slave_posXXX with the same storage engine. + */ + next_ptr= &data->table_list; + entry= data->table_list; + while (entry) + { + if (entry->table_hton == hton) + { + static const char *warning_msg= "Ignoring redundant table mysql.%s " + "since mysql.%s has the same storage engine"; + if (!is_default) + { + /* Ignore the redundant table. */ + sql_print_warning(warning_msg, table_name->str, entry->table_name); + return 0; + } + else + { + sql_print_warning(warning_msg, entry->table_name, table_name->str); + /* Delete the redundant table, and proceed to add this one instead. */ + *next_ptr= entry->next; + my_free(entry); + break; + } + } + next_ptr= &entry->next; + entry= entry->next; + } + + if (!(p= rpl_global_gtid_slave_state->alloc_gtid_pos_table(table_name, hton))) + return 1; + p->next= data->table_list; + data->table_list= p; + if (is_default) + data->default_entry= p; + return 0; +} + + +static int load_gtid_state_cb(THD *thd, LEX_STRING *table_name, void *arg) { int err; load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg); - struct rpl_slave_state::gtid_pos_table *p; void *hton; if ((err= scan_one_gtid_slave_pos_table(thd, data->hash, data->array, table_name, &hton))) return err; - if (!(p= rpl_global_gtid_slave_state->alloc_gtid_pos_table(table_name, hton))) - return 1; - p->next= data->table_list; - data->table_list= p; - return 0; + return process_gtid_pos_table(thd, table_name, hton, data); } @@ -1687,6 +1733,7 @@ rpl_load_gtid_slave_state(THD *thd) DBUG_RETURN(0); cb_data.table_list= NULL; + cb_data.default_entry= NULL; my_hash_init(&hash, &my_charset_bin, 32, offsetof(gtid_pos_element, gtid) + offsetof(rpl_gtid, domain_id), sizeof(uint32), NULL, my_free, HASH_UNIQUE); @@ -1706,6 +1753,23 @@ rpl_load_gtid_slave_state(THD *thd) goto end; } + if (!cb_data.table_list) + { + my_error(ER_NO_SUCH_TABLE, MYF(0), "mysql", + rpl_gtid_slave_state_table_name.str); + err= 1; + goto end; + } + else if (!cb_data.default_entry) + { + /* + If the mysql.gtid_slave_pos table does not exist, but at least one other + table is available, arbitrarily pick the first in the list to use as + default. + */ + cb_data.default_entry= cb_data.table_list; + } + for (i= 0; i < array.elements; ++i) { get_dynamic(&array, (uchar *)&tmp_entry, i); @@ -1735,7 +1799,8 @@ rpl_load_gtid_slave_state(THD *thd) } } - rpl_global_gtid_slave_state->set_gtid_pos_tables_list(cb_data.table_list); + rpl_global_gtid_slave_state->set_gtid_pos_tables_list(cb_data.table_list, + cb_data.default_entry); cb_data.table_list= NULL; rpl_global_gtid_slave_state->loaded= true; mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); @@ -1753,12 +1818,10 @@ end: static int find_gtid_pos_tables_cb(THD *thd, LEX_STRING *table_name, void *arg) { - struct rpl_slave_state::gtid_pos_table **table_list_ptr= - static_cast<struct rpl_slave_state::gtid_pos_table **>(arg); + load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg); TABLE_LIST tlist; TABLE *table= NULL; int err; - struct rpl_slave_state::gtid_pos_table *p; thd->reset_for_next_command(); tlist.init_one_table(STRING_WITH_LEN("mysql"), table_name->str, @@ -1769,14 +1832,7 @@ find_gtid_pos_tables_cb(THD *thd, LEX_STRING *table_name, void *arg) if ((err= gtid_check_rpl_slave_state_table(table))) goto end; - - if (!(p= rpl_global_gtid_slave_state->alloc_gtid_pos_table(table_name, table->s->db_type()))) - err= 1; - else - { - p->next= *table_list_ptr; - *table_list_ptr= p; - } + err= process_gtid_pos_table(thd, table_name, table->s->db_type(), data); end: if (table) @@ -1801,7 +1857,8 @@ int find_gtid_slave_pos_tables(THD *thd) { int err= 0; - struct rpl_slave_state::gtid_pos_table *table_list; + load_gtid_state_cb_data cb_data; + bool any_running; mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); bool loaded= rpl_global_gtid_slave_state->loaded; @@ -1809,18 +1866,95 @@ find_gtid_slave_pos_tables(THD *thd) if (!loaded) return 0; - table_list= NULL; - if ((err= scan_all_gtid_slave_pos_table(thd, find_gtid_pos_tables_cb, &table_list))) + cb_data.table_list= NULL; + cb_data.default_entry= NULL; + if ((err= scan_all_gtid_slave_pos_table(thd, find_gtid_pos_tables_cb, &cb_data))) + goto end; + + if (!cb_data.table_list) + { + my_error(ER_NO_SUCH_TABLE, MYF(0), "mysql", + rpl_gtid_slave_state_table_name.str); + err= 1; goto end; + } + else if (!cb_data.default_entry) + { + /* + If the mysql.gtid_slave_pos table does not exist, but at least one other + table is available, arbitrarily pick the first in the list to use as + default. + */ + cb_data.default_entry= cb_data.table_list; + } + any_running= any_slave_sql_running(); mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); - rpl_global_gtid_slave_state->set_gtid_pos_tables_list(table_list); - table_list= NULL; + if (!any_running) + { + rpl_global_gtid_slave_state->set_gtid_pos_tables_list(cb_data.table_list, + cb_data.default_entry); + cb_data.table_list= NULL; + } + else + { + /* + If there are SQL threads running, we cannot safely remove the old list. + However we can add new entries, and warn about any tables that + disappeared, but may still be visible to running SQL threads. + */ + rpl_slave_state::gtid_pos_table *old_entry, *new_entry, **next_ptr_ptr; + + old_entry= rpl_global_gtid_slave_state->gtid_pos_tables; + while (old_entry) + { + new_entry= cb_data.table_list; + while (new_entry) + { + if (new_entry->table_hton == old_entry->table_hton) + break; + new_entry= new_entry->next; + } + if (!new_entry) + sql_print_warning("The table mysql.%s was removed. " + "This change will not take full effect " + "until all SQL threads have been restarted", + old_entry->table_name.str); + old_entry= old_entry->next; + } + next_ptr_ptr= &cb_data.table_list; + new_entry= cb_data.table_list; + while (new_entry) + { + /* Check if we already have a table with this storage engine. */ + old_entry= rpl_global_gtid_slave_state->gtid_pos_tables; + while (old_entry) + { + if (new_entry->table_hton == old_entry->table_hton) + break; + old_entry= old_entry->next; + } + if (old_entry) + { + /* This new_entry is already available in the list. */ + next_ptr_ptr= &new_entry->next; + new_entry= new_entry->next; + } + else + { + /* Move this new_entry to the list. */ + rpl_slave_state::gtid_pos_table *next= new_entry->next; + rpl_global_gtid_slave_state->add_gtid_pos_table(new_entry); + *next_ptr_ptr= next; + new_entry= next; + } + } + } mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); end: - if (table_list) - rpl_global_gtid_slave_state->free_gtid_pos_tables(table_list); + if (cb_data.table_list) + rpl_global_gtid_slave_state->free_gtid_pos_tables(cb_data.table_list); return err; } |
