summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
authorOleksandr Byelkin <sanja@mariadb.com>2021-02-02 17:55:53 +0100
committerOleksandr Byelkin <sanja@mariadb.com>2021-02-02 17:55:53 +0100
commit0f810b58faf487c7cd59f2ee520f62deb0f712eb (patch)
treedfb52d41ffd83d0dbffb969c091fe190b9457a24 /sql/slave.cc
parent542d769ea1a22a7a6a87c9fe76ff911a162ade44 (diff)
parent251b52190070095e4c65ffb0ae545d49330a02b2 (diff)
downloadmariadb-git-10.4-merge-attempt.tar.gz
Merge branch 'bb-10.3-release' into bb-10.4-release10.4-merge-attempt
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc242
1 files changed, 197 insertions, 45 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index 9d4049c6452..7e261dc0233 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -64,6 +64,7 @@
#include "rpl_parallel.h"
#include "sql_show.h"
#include "semisync_slave.h"
+#include "sql_manager.h"
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
@@ -363,9 +364,9 @@ end:
}
-static void
-handle_gtid_pos_auto_create_request(THD *thd, void *hton)
+static void bg_gtid_pos_auto_create(void *hton)
{
+ THD *thd= NULL;
int UNINIT_VAR(err);
plugin_ref engine= NULL, *auto_engines;
rpl_slave_state::gtid_pos_table *entry;
@@ -377,7 +378,6 @@ handle_gtid_pos_auto_create_request(THD *thd, void *hton)
it.
*/
mysql_mutex_lock(&LOCK_global_system_variables);
- engine= NULL;
for (auto_engines= opt_gtid_pos_auto_plugins;
auto_engines && *auto_engines;
++auto_engines)
@@ -422,6 +422,13 @@ handle_gtid_pos_auto_create_request(THD *thd, void *hton)
table_name.str= loc_table_name.c_ptr_safe();
table_name.length= loc_table_name.length();
+ thd= new THD(next_thread_id());
+ thd->thread_stack= (char*) &thd; /* Set approximate stack start */
+ thd->system_thread = SYSTEM_THREAD_SLAVE_BACKGROUND;
+ thd->store_globals();
+ thd->security_ctx->skip_grants();
+ thd->set_command(COM_DAEMON);
+ thd->variables.wsrep_on= 0;
err= gtid_pos_table_creation(thd, engine, &table_name);
if (err)
{
@@ -449,15 +456,15 @@ handle_gtid_pos_auto_create_request(THD *thd, void *hton)
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
end:
+ delete thd;
if (engine)
plugin_unlock(NULL, engine);
}
-static bool slave_background_thread_running;
-static bool slave_background_thread_stop;
static bool slave_background_thread_gtid_loaded;
+<<<<<<< HEAD
static struct slave_background_kill_t {
slave_background_kill_t *next;
THD *to_kill;
@@ -473,21 +480,31 @@ static volatile bool slave_background_gtid_pending_delete_flag;
pthread_handler_t
handle_slave_background(void *arg __attribute__((unused)))
-{
- THD *thd;
- PSI_stage_info old_stage;
- bool stop;
+||||||| 75538f94ca0
+static struct slave_background_kill_t {
+ slave_background_kill_t *next;
+ THD *to_kill;
+} *slave_background_kill_list;
- my_thread_init();
- thd= new THD(next_thread_id());
+static struct slave_background_gtid_pos_create_t {
+ slave_background_gtid_pos_create_t *next;
+ void *hton;
+} *slave_background_gtid_pos_create_list;
+
+
+pthread_handler_t
+handle_slave_background(void *arg __attribute__((unused)))
+=======
+static void bg_rpl_load_gtid_slave_state(void *)
+>>>>>>> bb-10.3-release
+{
+ THD *thd= new THD(next_thread_id());
thd->thread_stack= (char*) &thd; /* Set approximate stack start */
thd->system_thread = SYSTEM_THREAD_SLAVE_BACKGROUND;
thd->store_globals();
thd->security_ctx->skip_grants();
thd->set_command(COM_DAEMON);
-#ifdef WITH_WSREP
thd->variables.wsrep_on= 0;
-#endif
thd_proc_info(thd, "Loading slave GTID position from table");
if (rpl_load_gtid_slave_state(thd))
@@ -497,8 +514,10 @@ handle_slave_background(void *arg __attribute__((unused)))
thd->get_stmt_da()->sql_errno(),
thd->get_stmt_da()->message());
- mysql_mutex_lock(&LOCK_slave_background);
+ // hijacking global_rpl_thread_pool cond here - it's only once on startup
+ mysql_mutex_lock(&global_rpl_thread_pool.LOCK_rpl_thread_pool);
slave_background_thread_gtid_loaded= true;
+<<<<<<< HEAD
mysql_cond_broadcast(&COND_slave_background);
THD_STAGE_INFO(thd, stage_slave_background_process_request);
@@ -569,35 +588,100 @@ handle_slave_background(void *arg __attribute__((unused)))
mysql_cond_broadcast(&COND_slave_background);
mysql_mutex_unlock(&LOCK_slave_background);
+||||||| 75538f94ca0
+ mysql_cond_broadcast(&COND_slave_background);
+
+ THD_STAGE_INFO(thd, stage_slave_background_process_request);
+ do
+ {
+ slave_background_kill_t *kill_list;
+ slave_background_gtid_pos_create_t *create_list;
+
+ thd->ENTER_COND(&COND_slave_background, &LOCK_slave_background,
+ &stage_slave_background_wait_request,
+ &old_stage);
+ for (;;)
+ {
+ stop= abort_loop || thd->killed || slave_background_thread_stop;
+ kill_list= slave_background_kill_list;
+ create_list= slave_background_gtid_pos_create_list;
+ if (stop || kill_list || create_list)
+ break;
+ mysql_cond_wait(&COND_slave_background, &LOCK_slave_background);
+ }
+
+ slave_background_kill_list= NULL;
+ slave_background_gtid_pos_create_list= NULL;
+ thd->EXIT_COND(&old_stage);
+
+ while (kill_list)
+ {
+ slave_background_kill_t *p = kill_list;
+ THD *to_kill= p->to_kill;
+ kill_list= p->next;
+
+ to_kill->awake(KILL_CONNECTION);
+ mysql_mutex_lock(&to_kill->LOCK_wakeup_ready);
+ to_kill->rgi_slave->killed_for_retry=
+ rpl_group_info::RETRY_KILL_KILLED;
+ mysql_cond_broadcast(&to_kill->COND_wakeup_ready);
+ mysql_mutex_unlock(&to_kill->LOCK_wakeup_ready);
+ my_free(p);
+ }
+
+ while (create_list)
+ {
+ slave_background_gtid_pos_create_t *next= create_list->next;
+ void *hton= create_list->hton;
+ handle_gtid_pos_auto_create_request(thd, hton);
+ my_free(create_list);
+ create_list= next;
+ }
+
+ mysql_mutex_lock(&LOCK_slave_background);
+ } while (!stop);
+
+ slave_background_thread_running= false;
+ mysql_cond_broadcast(&COND_slave_background);
+ mysql_mutex_unlock(&LOCK_slave_background);
+
+=======
+ mysql_cond_signal(&global_rpl_thread_pool.COND_rpl_thread_pool);
+ mysql_mutex_unlock(&global_rpl_thread_pool.LOCK_rpl_thread_pool);
+>>>>>>> bb-10.3-release
delete thd;
+<<<<<<< HEAD
my_thread_end();
return 0;
-}
+||||||| 75538f94ca0
+ thread_safe_decrement32(&service_thread_count);
+ signal_thd_deleted();
+ my_thread_end();
+ return 0;
+=======
+>>>>>>> bb-10.3-release
+}
+static void bg_slave_kill(void *victim)
+{
+ THD *to_kill= (THD *)victim;
+ to_kill->awake(KILL_CONNECTION);
+ mysql_mutex_lock(&to_kill->LOCK_wakeup_ready);
+ to_kill->rgi_slave->killed_for_retry= rpl_group_info::RETRY_KILL_KILLED;
+ mysql_cond_broadcast(&to_kill->COND_wakeup_ready);
+ mysql_mutex_unlock(&to_kill->LOCK_wakeup_ready);
+}
-void
-slave_background_kill_request(THD *to_kill)
+void slave_background_kill_request(THD *to_kill)
{
if (to_kill->rgi_slave->killed_for_retry)
return; // Already deadlock killed.
- slave_background_kill_t *p=
- (slave_background_kill_t *)my_malloc(sizeof(*p), MYF(MY_WME));
- if (p)
- {
- p->to_kill= to_kill;
- to_kill->rgi_slave->killed_for_retry=
- rpl_group_info::RETRY_KILL_PENDING;
- mysql_mutex_lock(&LOCK_slave_background);
- p->next= slave_background_kill_list;
- slave_background_kill_list= p;
- mysql_cond_signal(&COND_slave_background);
- mysql_mutex_unlock(&LOCK_slave_background);
- }
+ to_kill->rgi_slave->killed_for_retry= rpl_group_info::RETRY_KILL_PENDING;
+ mysql_manager_submit(bg_slave_kill, to_kill);
}
-
/*
This function must only be called from a slave SQL thread (or worker thread),
to ensure that the table_entry will not go away before we can lock the
@@ -607,23 +691,18 @@ void
slave_background_gtid_pos_create_request(
rpl_slave_state::gtid_pos_table *table_entry)
{
- slave_background_gtid_pos_create_t *p;
-
if (table_entry->state != rpl_slave_state::GTID_POS_AUTO_CREATE)
return;
- p= (slave_background_gtid_pos_create_t *)my_malloc(sizeof(*p), MYF(MY_WME));
- if (!p)
- return;
mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
if (table_entry->state != rpl_slave_state::GTID_POS_AUTO_CREATE)
{
- my_free(p);
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
return;
}
table_entry->state= rpl_slave_state::GTID_POS_CREATE_REQUESTED;
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+<<<<<<< HEAD
p->hton= table_entry->table_hton;
mysql_mutex_lock(&LOCK_slave_background);
p->next= slave_background_gtid_pos_create_list;
@@ -698,6 +777,67 @@ stop_slave_background_thread()
while (slave_background_thread_running)
mysql_cond_wait(&COND_slave_background, &LOCK_slave_background);
mysql_mutex_unlock(&LOCK_slave_background);
+||||||| 75538f94ca0
+ p->hton= table_entry->table_hton;
+ mysql_mutex_lock(&LOCK_slave_background);
+ p->next= slave_background_gtid_pos_create_list;
+ slave_background_gtid_pos_create_list= p;
+ mysql_cond_signal(&COND_slave_background);
+ mysql_mutex_unlock(&LOCK_slave_background);
+}
+
+
+/*
+ Start the slave background thread.
+
+ This thread is currently used for two purposes:
+
+ 1. To load the GTID state from mysql.gtid_slave_pos at server start; reading
+ from table requires valid THD, which is otherwise not available during
+ server init.
+
+ 2. To kill worker thread transactions during parallel replication, when a
+ storage engine attempts to take an errorneous conflicting lock that would
+ cause a deadlock. Killing is done asynchroneously, as the kill may not
+ be safe within the context of a callback from inside storage engine
+ locking code.
+*/
+static int
+start_slave_background_thread()
+{
+ pthread_t th;
+
+ slave_background_thread_running= true;
+ slave_background_thread_stop= false;
+ slave_background_thread_gtid_loaded= false;
+ if (mysql_thread_create(key_thread_slave_background,
+ &th, &connection_attrib, handle_slave_background,
+ NULL))
+ {
+ sql_print_error("Failed to create thread while initialising slave");
+ return 1;
+ }
+ mysql_mutex_lock(&LOCK_slave_background);
+ while (!slave_background_thread_gtid_loaded)
+ mysql_cond_wait(&COND_slave_background, &LOCK_slave_background);
+ mysql_mutex_unlock(&LOCK_slave_background);
+
+ return 0;
+}
+
+
+static void
+stop_slave_background_thread()
+{
+ mysql_mutex_lock(&LOCK_slave_background);
+ slave_background_thread_stop= true;
+ mysql_cond_broadcast(&COND_slave_background);
+ while (slave_background_thread_running)
+ mysql_cond_wait(&COND_slave_background, &LOCK_slave_background);
+ mysql_mutex_unlock(&LOCK_slave_background);
+=======
+ mysql_manager_submit(bg_gtid_pos_auto_create, table_entry->table_hton);
+>>>>>>> bb-10.3-release
}
@@ -712,12 +852,19 @@ int init_slave()
init_slave_psi_keys();
#endif
- if (start_slave_background_thread())
- return 1;
-
if (global_rpl_thread_pool.init(opt_slave_parallel_threads))
return 1;
+ slave_background_thread_gtid_loaded= false;
+ mysql_manager_submit(bg_rpl_load_gtid_slave_state, NULL);
+
+ // hijacking global_rpl_thread_pool cond here - it's only once on startup
+ mysql_mutex_lock(&global_rpl_thread_pool.LOCK_rpl_thread_pool);
+ while (!slave_background_thread_gtid_loaded)
+ mysql_cond_wait(&global_rpl_thread_pool.COND_rpl_thread_pool,
+ &global_rpl_thread_pool.LOCK_rpl_thread_pool);
+ mysql_mutex_unlock(&global_rpl_thread_pool.LOCK_rpl_thread_pool);
+
/*
This is called when mysqld starts. Before client connections are
accepted. However bootstrap may conflict with us if it does START SLAVE.
@@ -1443,10 +1590,15 @@ void slave_prepare_for_shutdown()
mysql_mutex_lock(&LOCK_active_mi);
master_info_index->free_connections();
mysql_mutex_unlock(&LOCK_active_mi);
+<<<<<<< HEAD
// It's safe to destruct worker pool now when
// all driver threads are gone.
global_rpl_thread_pool.deactivate();
stop_slave_background_thread();
+||||||| 75538f94ca0
+ stop_slave_background_thread();
+=======
+>>>>>>> bb-10.3-release
}
/*
@@ -1477,8 +1629,6 @@ void end_slave()
active_mi= 0;
mysql_mutex_unlock(&LOCK_active_mi);
- stop_slave_background_thread();
-
global_rpl_thread_pool.destroy();
free_all_rpl_filters();
DBUG_VOID_RETURN;
@@ -4737,10 +4887,7 @@ pthread_handler_t handle_slave_io(void *arg)
goto err;
}
-
-#ifdef WITH_WSREP
thd->variables.wsrep_on= 0;
-#endif
if (DBUG_EVALUATE_IF("failed_slave_start", 1, 0)
|| repl_semisync_slave.slave_start(mi))
{
@@ -5060,8 +5207,11 @@ log space");
err:
// print the current replication position
if (mi->using_gtid == Master_info::USE_GTID_NO)
+ {
sql_print_information("Slave I/O thread exiting, read up to log '%s', "
"position %llu", IO_RPL_LOG_NAME, mi->master_log_pos);
+ sql_print_information("master was %s:%d", mi->host, mi->port);
+ }
else
{
StringBuffer<100> tmp;
@@ -5070,6 +5220,7 @@ err:
"position %llu; GTID position %s",
IO_RPL_LOG_NAME, mi->master_log_pos,
tmp.c_ptr_safe());
+ sql_print_information("master was %s:%d", mi->host, mi->port);
}
repl_semisync_slave.slave_stop(mi);
thd->reset_query();
@@ -5672,6 +5823,7 @@ pthread_handler_t handle_slave_sql(void *arg)
sql_print_information("Slave SQL thread exiting, replication stopped in "
"log '%s' at position %llu%s", RPL_LOG_NAME,
rli->group_master_log_pos, tmp.c_ptr_safe());
+ sql_print_information("master was %s:%d", mi->host, mi->port);
}
#ifdef WITH_WSREP
wsrep_after_command_before_result(thd);