diff options
author | unknown <knielsen@knielsen-hq.org> | 2013-10-08 14:36:06 +0200 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2013-10-08 14:36:06 +0200 |
commit | 12c760ef71167a1ce6e1adaa084fb196b88e2e55 (patch) | |
tree | efd36c70ae7510e9ba220af9e0ba7dc38497025a /sql/rpl_parallel.cc | |
parent | 45c3c71513b68b8de79f3e0a5e9779e7e8021716 (diff) | |
download | mariadb-git-12c760ef71167a1ce6e1adaa084fb196b88e2e55.tar.gz |
MDEV-4506: Parallel replication.
Improve STOP SLAVE in parallel mode.
Now, the parallel part will queue the current event group to the
end, and then stop queing any more events. Each worker will
complete the current event group, and then just skip any further
queued events.
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r-- | sql/rpl_parallel.cc | 56 |
1 files changed, 49 insertions, 7 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 6c8c5b5c3fa..e80512a3580 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -77,6 +77,28 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, } +static bool +sql_worker_killed(THD *thd, rpl_group_info *rgi, bool in_event_group) +{ + if (!rgi->rli->abort_slave && !abort_loop) + return false; + + /* + Do not abort in the middle of an event group that cannot be rolled back. + */ + if ((thd->transaction.all.modified_non_trans_table || + (thd->variables.option_bits & OPTION_KEEP_LOG)) + && in_event_group) + return false; + /* ToDo: should we add some timeout like in sql_slave_killed? + if (rgi->last_event_start_time == 0) + rgi->last_event_start_time= my_time(0); + */ + + return true; +} + + pthread_handler_t handle_rpl_parallel_thread(void *arg) { @@ -131,7 +153,6 @@ handle_rpl_parallel_thread(void *arg) "Waiting for work from SQL thread"); while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed) mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread); - /* Mark that this thread is now executing */ rpt->event_queue= rpt->last_in_queue= NULL; thd->exit_cond(old_msg); @@ -159,7 +180,7 @@ handle_rpl_parallel_thread(void *arg) (0 != (static_cast<Gtid_log_event *>(events->ev)->flags2 & Gtid_log_event::FL_STANDALONE)); - /* Save this, as it gets cleared once event group commits. */ + /* Save this, as it gets cleared when the event group commits. */ event_gtid_sub_id= rgi->gtid_sub_id; rgi->thd= thd; @@ -197,7 +218,16 @@ handle_rpl_parallel_thread(void *arg) thd->wait_for_commit_ptr= &rgi->commit_orderer; } - rpt_handle_event(events, rpt); + /* + If the SQL thread is stopping, we just skip execution of all the + following event groups. We still do all the normal waiting and wakeup + processing between the event groups as a simple way to ensure that + everything is stopped and cleaned up correctly. + */ + if (!sql_worker_killed(thd, rgi, in_event_group)) + rpt_handle_event(events, rpt); + else + thd->wait_for_prior_commit(); end_of_group= in_event_group && @@ -207,7 +237,6 @@ handle_rpl_parallel_thread(void *arg) (!strcmp("COMMIT", ((Query_log_event *)events->ev)->query) || !strcmp("ROLLBACK", ((Query_log_event *)events->ev)->query)))); - /* ToDo: must use rgi here, not rli, for thread safety. */ delete_or_keep_event_post_apply(rgi, event_type, events->ev); my_free(events); @@ -516,7 +545,7 @@ free_rpl_parallel_entry(void *element) rpl_parallel::rpl_parallel() : - current(NULL) + current(NULL), sql_thread_stopping(false) { my_hash_init(&domain_hash, &my_charset_bin, 32, offsetof(rpl_parallel_entry, domain_id), sizeof(uint32), @@ -529,6 +558,7 @@ rpl_parallel::reset() { my_hash_reset(&domain_hash); current= NULL; + sql_thread_stopping= false; } @@ -591,10 +621,22 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev) rpl_group_info *rgi= NULL; Relay_log_info *rli= serial_rgi->rli; enum Log_event_type typ; + bool is_group_event; /* ToDo: what to do with this lock?!? */ mysql_mutex_unlock(&rli->data_lock); + /* + Stop queueing additional event groups once the SQL thread is requested to + stop. + */ + if (((typ= ev->get_type_code()) == GTID_EVENT || + !(is_group_event= Log_event::is_group_event(typ))) && + rli->abort_slave) + sql_thread_stopping= true; + if (sql_thread_stopping) + return false; + if (!(qev= (rpl_parallel_thread::queued_event *)my_malloc(sizeof(*qev), MYF(0)))) { @@ -604,7 +646,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev) qev->ev= ev; qev->next= NULL; - if ((typ= ev->get_type_code()) == GTID_EVENT) + if (typ == GTID_EVENT) { Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); @@ -714,7 +756,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev) e->current_sub_id= rgi->gtid_sub_id; current= rgi->parallel_entry= e; } - else if (!Log_event::is_group_event(typ) || !current) + else if (!is_group_event || !current) { /* Events like ROTATE and FORMAT_DESCRIPTION. Do not run in worker thread. |