summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc226
1 files changed, 224 insertions, 2 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index a7f0f003e5c..67785be6255 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -60,6 +60,7 @@
#include "rpl_tblmap.h"
#include "debug_sync.h"
#include "rpl_parallel.h"
+#include "sql_show.h"
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
@@ -279,15 +280,180 @@ static void init_slave_psi_keys(void)
#endif /* HAVE_PSI_INTERFACE */
+/*
+ Note: This definition needs to be kept in sync with the one in
+ mysql_system_tables.sql which is used by mysql_create_db.
+*/
+static const char gtid_pos_table_definition1[]=
+ "CREATE TABLE ";
+static const char gtid_pos_table_definition2[]=
+ " (domain_id INT UNSIGNED NOT NULL, "
+ "sub_id BIGINT UNSIGNED NOT NULL, "
+ "server_id INT UNSIGNED NOT NULL, "
+ "seq_no BIGINT UNSIGNED NOT NULL, "
+ "PRIMARY KEY (domain_id, sub_id)) CHARSET=latin1 "
+ "COMMENT='Replication slave GTID position' "
+ "ENGINE=";
+
+/*
+ Build a query string
+ CREATE TABLE mysql.gtid_slave_pos_<engine> ... ENGINE=<engine>
+*/
+static bool
+build_gtid_pos_create_query(THD *thd, String *query,
+ LEX_STRING *table_name,
+ LEX_STRING *engine_name)
+{
+ bool err= false;
+ err|= query->append(gtid_pos_table_definition1);
+ err|= append_identifier(thd, query, table_name->str, table_name->length);
+ err|= query->append(gtid_pos_table_definition2);
+ err|= append_identifier(thd, query, engine_name->str, engine_name->length);
+ return err;
+}
+
+
+static int
+gtid_pos_table_creation(THD *thd, plugin_ref engine, LEX_STRING *table_name)
+{
+ int err;
+ StringBuffer<sizeof(gtid_pos_table_definition1) +
+ sizeof(gtid_pos_table_definition1) +
+ 2*FN_REFLEN> query;
+
+ if (build_gtid_pos_create_query(thd, &query, table_name, plugin_name(engine)))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return 1;
+ }
+
+ thd->set_db("mysql", 5);
+ thd->clear_error();
+ ulonglong thd_saved_option= thd->variables.option_bits;
+ /* This query shuold not be binlogged. */
+ thd->variables.option_bits&= ~(ulonglong)OPTION_BIN_LOG;
+ thd->set_query_and_id(query.c_ptr(), query.length(), thd->charset(),
+ next_query_id());
+ Parser_state parser_state;
+ err= parser_state.init(thd, thd->query(), thd->query_length());
+ if (err)
+ goto end;
+ mysql_parse(thd, thd->query(), thd->query_length(), &parser_state,
+ FALSE, FALSE);
+ if (thd->is_error())
+ err= 1;
+end:
+ thd->variables.option_bits= thd_saved_option;
+ thd->reset_query();
+ return err;
+}
+
+
+static void
+handle_gtid_pos_auto_create_request(THD *thd, void *hton)
+{
+ int err;
+ plugin_ref engine= NULL, *auto_engines;
+ rpl_slave_state::gtid_pos_table *entry;
+ StringBuffer<FN_REFLEN> loc_table_name;
+ LEX_STRING table_name;
+
+ /*
+ Check that the plugin is still in @@gtid_pos_auto_engines, and lock
+ it.
+ */
+ mysql_mutex_lock(&LOCK_global_system_variables);
+ engine= NULL;
+ for (auto_engines= opt_gtid_pos_auto_plugins;
+ auto_engines && *auto_engines;
+ ++auto_engines)
+ {
+ if (plugin_hton(*auto_engines) == hton)
+ {
+ engine= my_plugin_lock(NULL, *auto_engines);
+ break;
+ }
+ }
+ mysql_mutex_unlock(&LOCK_global_system_variables);
+ if (!engine)
+ {
+ /* The engine is gone from @@gtid_pos_auto_engines, so no action. */
+ goto end;
+ }
+
+ /* Find the entry for the table to auto-create. */
+ mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ entry= (rpl_slave_state::gtid_pos_table *)
+ rpl_global_gtid_slave_state->gtid_pos_tables;
+ while (entry)
+ {
+ if (entry->table_hton == hton &&
+ entry->state == rpl_slave_state::GTID_POS_CREATE_REQUESTED)
+ break;
+ entry= entry->next;
+ }
+ if (entry)
+ {
+ entry->state = rpl_slave_state::GTID_POS_CREATE_IN_PROGRESS;
+ err= loc_table_name.append(entry->table_name.str, entry->table_name.length);
+ }
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ if (!entry)
+ goto end;
+ if (err)
+ {
+ sql_print_error("Out of memory while trying to auto-create GTID position table");
+ goto end;
+ }
+ table_name.str= loc_table_name.c_ptr_safe();
+ table_name.length= loc_table_name.length();
+
+ err= gtid_pos_table_creation(thd, engine, &table_name);
+ if (err)
+ {
+ sql_print_error("Error auto-creating GTID position table `mysql.%s`: %s Error_code: %d",
+ table_name.str, thd->get_stmt_da()->message(),
+ thd->get_stmt_da()->sql_errno());
+ thd->clear_error();
+ goto end;
+ }
+
+ /* Now enable the entry for the auto-created table. */
+ mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ entry= (rpl_slave_state::gtid_pos_table *)
+ rpl_global_gtid_slave_state->gtid_pos_tables;
+ while (entry)
+ {
+ if (entry->table_hton == hton &&
+ entry->state == rpl_slave_state::GTID_POS_CREATE_IN_PROGRESS)
+ {
+ entry->state= rpl_slave_state::GTID_POS_AVAILABLE;
+ break;
+ }
+ entry= entry->next;
+ }
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+
+end:
+ if (engine)
+ plugin_unlock(NULL, engine);
+}
+
+
static bool slave_background_thread_running;
static bool slave_background_thread_stop;
static bool slave_background_thread_gtid_loaded;
-struct slave_background_kill_t {
+static struct slave_background_kill_t {
slave_background_kill_t *next;
THD *to_kill;
} *slave_background_kill_list;
+static struct slave_background_gtid_pos_create_t {
+ slave_background_gtid_pos_create_t *next;
+ void *hton;
+} *slave_background_gtid_pos_create_list;
+
pthread_handler_t
handle_slave_background(void *arg __attribute__((unused)))
@@ -321,6 +487,7 @@ handle_slave_background(void *arg __attribute__((unused)))
do
{
slave_background_kill_t *kill_list;
+ slave_background_gtid_pos_create_t *create_list;
thd->ENTER_COND(&COND_slave_background, &LOCK_slave_background,
&stage_slave_background_wait_request,
@@ -329,12 +496,14 @@ handle_slave_background(void *arg __attribute__((unused)))
{
stop= abort_loop || thd->killed || slave_background_thread_stop;
kill_list= slave_background_kill_list;
- if (stop || kill_list)
+ create_list= slave_background_gtid_pos_create_list;
+ if (stop || kill_list || create_list)
break;
mysql_cond_wait(&COND_slave_background, &LOCK_slave_background);
}
slave_background_kill_list= NULL;
+ slave_background_gtid_pos_create_list= NULL;
thd->EXIT_COND(&old_stage);
while (kill_list)
@@ -353,6 +522,16 @@ handle_slave_background(void *arg __attribute__((unused)))
mysql_mutex_unlock(&to_kill->LOCK_wakeup_ready);
my_free(p);
}
+
+ while (create_list)
+ {
+ slave_background_gtid_pos_create_t *next= create_list->next;
+ void *hton= create_list->hton;
+ handle_gtid_pos_auto_create_request(thd, hton);
+ my_free(create_list);
+ create_list= next;
+ }
+
mysql_mutex_lock(&LOCK_slave_background);
} while (!stop);
@@ -392,6 +571,41 @@ slave_background_kill_request(THD *to_kill)
/*
+ This function must only be called from a slave SQL thread (or worker thread),
+ to ensure that the table_entry will not go away before we can lock the
+ LOCK_slave_state.
+*/
+void
+slave_background_gtid_pos_create_request(
+ rpl_slave_state::gtid_pos_table *table_entry)
+{
+ slave_background_gtid_pos_create_t *p;
+
+ if (table_entry->state != rpl_slave_state::GTID_POS_AUTO_CREATE)
+ return;
+ p= (slave_background_gtid_pos_create_t *)my_malloc(sizeof(*p), MYF(MY_WME));
+ if (!p)
+ return;
+ mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ if (table_entry->state != rpl_slave_state::GTID_POS_AUTO_CREATE)
+ {
+ my_free(p);
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ return;
+ }
+ table_entry->state= rpl_slave_state::GTID_POS_CREATE_REQUESTED;
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+
+ p->hton= table_entry->table_hton;
+ mysql_mutex_lock(&LOCK_slave_background);
+ p->next= slave_background_gtid_pos_create_list;
+ slave_background_gtid_pos_create_list= p;
+ mysql_cond_signal(&COND_slave_background);
+ mysql_mutex_unlock(&LOCK_slave_background);
+}
+
+
+/*
Start the slave background thread.
This thread is currently used for two purposes:
@@ -5065,6 +5279,14 @@ pthread_handler_t handle_slave_sql(void *arg)
if (mi->using_gtid != Master_info::USE_GTID_NO || opt_gtid_strict_mode)
goto err;
}
+ /* Re-load the set of mysql.gtid_slave_posXXX tables available. */
+ if (find_gtid_slave_pos_tables(thd))
+ {
+ rli->report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(), NULL,
+ "Error processing replication GTID position tables: %s",
+ thd->get_stmt_da()->message());
+ goto err;
+ }
/* execute init_slave variable */
if (opt_init_slave.length)