From 12c760ef71167a1ce6e1adaa084fb196b88e2e55 Mon Sep 17 00:00:00 2001 From: unknown Date: Tue, 8 Oct 2013 14:36:06 +0200 Subject: MDEV-4506: Parallel replication. Improve STOP SLAVE in parallel mode. Now, the parallel part will queue the current event group to the end, and then stop queing any more events. Each worker will complete the current event group, and then just skip any further queued events. --- sql/rpl_parallel.cc | 56 ++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 49 insertions(+), 7 deletions(-) (limited to 'sql/rpl_parallel.cc') diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 6c8c5b5c3fa..e80512a3580 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -77,6 +77,28 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, } +static bool +sql_worker_killed(THD *thd, rpl_group_info *rgi, bool in_event_group) +{ + if (!rgi->rli->abort_slave && !abort_loop) + return false; + + /* + Do not abort in the middle of an event group that cannot be rolled back. + */ + if ((thd->transaction.all.modified_non_trans_table || + (thd->variables.option_bits & OPTION_KEEP_LOG)) + && in_event_group) + return false; + /* ToDo: should we add some timeout like in sql_slave_killed? + if (rgi->last_event_start_time == 0) + rgi->last_event_start_time= my_time(0); + */ + + return true; +} + + pthread_handler_t handle_rpl_parallel_thread(void *arg) { @@ -131,7 +153,6 @@ handle_rpl_parallel_thread(void *arg) "Waiting for work from SQL thread"); while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed) mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread); - /* Mark that this thread is now executing */ rpt->event_queue= rpt->last_in_queue= NULL; thd->exit_cond(old_msg); @@ -159,7 +180,7 @@ handle_rpl_parallel_thread(void *arg) (0 != (static_cast(events->ev)->flags2 & Gtid_log_event::FL_STANDALONE)); - /* Save this, as it gets cleared once event group commits. */ + /* Save this, as it gets cleared when the event group commits. */ event_gtid_sub_id= rgi->gtid_sub_id; rgi->thd= thd; @@ -197,7 +218,16 @@ handle_rpl_parallel_thread(void *arg) thd->wait_for_commit_ptr= &rgi->commit_orderer; } - rpt_handle_event(events, rpt); + /* + If the SQL thread is stopping, we just skip execution of all the + following event groups. We still do all the normal waiting and wakeup + processing between the event groups as a simple way to ensure that + everything is stopped and cleaned up correctly. + */ + if (!sql_worker_killed(thd, rgi, in_event_group)) + rpt_handle_event(events, rpt); + else + thd->wait_for_prior_commit(); end_of_group= in_event_group && @@ -207,7 +237,6 @@ handle_rpl_parallel_thread(void *arg) (!strcmp("COMMIT", ((Query_log_event *)events->ev)->query) || !strcmp("ROLLBACK", ((Query_log_event *)events->ev)->query)))); - /* ToDo: must use rgi here, not rli, for thread safety. */ delete_or_keep_event_post_apply(rgi, event_type, events->ev); my_free(events); @@ -516,7 +545,7 @@ free_rpl_parallel_entry(void *element) rpl_parallel::rpl_parallel() : - current(NULL) + current(NULL), sql_thread_stopping(false) { my_hash_init(&domain_hash, &my_charset_bin, 32, offsetof(rpl_parallel_entry, domain_id), sizeof(uint32), @@ -529,6 +558,7 @@ rpl_parallel::reset() { my_hash_reset(&domain_hash); current= NULL; + sql_thread_stopping= false; } @@ -591,10 +621,22 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev) rpl_group_info *rgi= NULL; Relay_log_info *rli= serial_rgi->rli; enum Log_event_type typ; + bool is_group_event; /* ToDo: what to do with this lock?!? */ mysql_mutex_unlock(&rli->data_lock); + /* + Stop queueing additional event groups once the SQL thread is requested to + stop. + */ + if (((typ= ev->get_type_code()) == GTID_EVENT || + !(is_group_event= Log_event::is_group_event(typ))) && + rli->abort_slave) + sql_thread_stopping= true; + if (sql_thread_stopping) + return false; + if (!(qev= (rpl_parallel_thread::queued_event *)my_malloc(sizeof(*qev), MYF(0)))) { @@ -604,7 +646,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev) qev->ev= ev; qev->next= NULL; - if ((typ= ev->get_type_code()) == GTID_EVENT) + if (typ == GTID_EVENT) { Gtid_log_event *gtid_ev= static_cast(ev); @@ -714,7 +756,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev) e->current_sub_id= rgi->gtid_sub_id; current= rgi->parallel_entry= e; } - else if (!Log_event::is_group_event(typ) || !current) + else if (!is_group_event || !current) { /* Events like ROTATE and FORMAT_DESCRIPTION. Do not run in worker thread. -- cgit v1.2.1