diff options
author | Sachin <sachin.setiya@mariadb.com> | 2020-05-12 17:00:26 +0530 |
---|---|---|
committer | Sachin <sachin.setiya@mariadb.com> | 2020-05-12 17:00:26 +0530 |
commit | 7c6fe2289098566b607891cf831bf3affeedeffb (patch) | |
tree | 7965d2e78fced89d829887f9a30d32e61d65b138 | |
parent | 423283350fb94f8fe8b704a7a028f251df401a4f (diff) | |
download | mariadb-git-10.5-olter-v3.tar.gz |
completed10.5-olter-v3
-rw-r--r-- | mysql-test/suite/rpl/t/r_ttt.test | 10 | ||||
-rw-r--r-- | mysql-test/suite/rpl/t/r_ttt3.test | 49 | ||||
-rw-r--r-- | mysql-test/suite/rpl/t/rpl_start_alter_shutdown_1.test | 47 | ||||
-rw-r--r-- | mysql-test/suite/rpl/t/rpl_start_alter_shutdown_2.test | 69 | ||||
-rw-r--r-- | sql/rpl_mi.h | 1 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 180 | ||||
-rw-r--r-- | sql/rpl_parallel.h | 12 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 4 | ||||
-rw-r--r-- | sql/rpl_rli.h | 2 | ||||
-rw-r--r-- | sql/slave.cc | 52 | ||||
-rw-r--r-- | sql/sql_alter.cc | 12 | ||||
-rw-r--r-- | sql/sql_table.cc | 49 |
12 files changed, 338 insertions, 149 deletions
diff --git a/mysql-test/suite/rpl/t/r_ttt.test b/mysql-test/suite/rpl/t/r_ttt.test index 761b30627f7..eb54108a1fd 100644 --- a/mysql-test/suite/rpl/t/r_ttt.test +++ b/mysql-test/suite/rpl/t/r_ttt.test @@ -3,10 +3,9 @@ --source include/master-slave.inc --connection slave -call mtr.add_suppression("Warning: 72 bytes lost at .*"); set global debug_dbug="+d,start_alter_delay_slave"; stop slave; -SET GLOBAL slave_parallel_threads=10; +SET GLOBAL slave_parallel_threads=2; set global slave_parallel_mode=optimistic; change master to master_use_gtid=slave_pos; start slave; @@ -61,13 +60,13 @@ set gtid_domain_id=22; set gtid_domain_id=22; --send alter table t3 add column c int, force, algorithm=inplace; --connection con4 -set gtid_domain_id=24; +set gtid_domain_id=23; --send alter table t4 add column c int, force, algorithm=inplace; --connection con5 -set gtid_domain_id=24; +set gtid_domain_id=23; --send alter table t5 add column c int, force, algorithm=inplace; --connection con6 -set gtid_domain_id=24; +set gtid_domain_id=23; --send alter table t6 add column c int, force, algorithm=inplace; --connection con1 @@ -115,6 +114,7 @@ select @@gtid_slave_pos; --connection master --sync_slave_with_master select @@gtid_slave_pos; +show binlog events; --connection master drop table t1,t2,t3,t4,t5,t6; diff --git a/mysql-test/suite/rpl/t/r_ttt3.test b/mysql-test/suite/rpl/t/r_ttt3.test new file mode 100644 index 00000000000..994e3807499 --- /dev/null +++ b/mysql-test/suite/rpl/t/r_ttt3.test @@ -0,0 +1,49 @@ +--source include/have_binlog_format_row.inc +--source include/have_innodb.inc +--source include/master-slave.inc + +--connection slave +stop slave; +SET GLOBAL slave_parallel_threads=1; +set global slave_parallel_mode=optimistic; +set global debug_dbug="+d,start_alter_delay_slave"; +change master to master_use_gtid= current_pos; +start slave; + + +--connection master +set global debug_dbug="+d,start_alter_delay_master"; +set global binlog_split_alter=true; +create table t1( a int primary key, b int) engine=innodb; +connect (con1,localhost,root,,); +connect (con2,localhost,root,,); +connect (con3,localhost,root,,); +connect (con4,localhost,root,,); +alter table t1 add column c int, force, algorithm=inplace; + +#--source include/kill_mysqld.inc +#--let $rpl_server_number= 1 +#--source include/rpl_stop_server.inc +create table t3( a int primary key, b int) engine=innodb; + +show binlog events; + +--sleep 5 +--connection slave +show binlog events; + +--connection master +--sync_slave_with_master +show binlog events; +show create table t1; + +--connection master +drop table t1,t3; +set global binlog_split_alter= false; + +--connection slave +stop slave; +SET GLOBAL slave_parallel_threads=0; +set global slave_parallel_mode=conservative; +start slave; +--source include/rpl_end.inc diff --git a/mysql-test/suite/rpl/t/rpl_start_alter_shutdown_1.test b/mysql-test/suite/rpl/t/rpl_start_alter_shutdown_1.test index 7df2ea41fd2..f576f7376f0 100644 --- a/mysql-test/suite/rpl/t/rpl_start_alter_shutdown_1.test +++ b/mysql-test/suite/rpl/t/rpl_start_alter_shutdown_1.test @@ -7,7 +7,7 @@ SET @old_debug_slave= @@global.debug; set global debug_dbug="+d,start_alter_delay_slave"; stop slave; -SET GLOBAL slave_parallel_threads=10; +SET GLOBAL slave_parallel_threads=4; set global slave_parallel_mode=optimistic; start slave; @@ -19,6 +19,7 @@ start slave; SET @old_debug_master= @@global.debug; set global debug_dbug="+d,start_alter_delay_master"; set global binlog_split_alter=true; +--disable_parsing create table t1( a int primary key, b int) engine=myisam; create table t2( a int primary key, b int) engine=myisam; connect (con1,localhost,root,,); @@ -28,29 +29,59 @@ connect (con2,localhost,root,,); --connection con2 --send alter table t2 add column c int; + #Shutdown Slave ---connection slave -select master_gtid_wait('0-1-4'); -select @@gtid_binlog_state; -select @@gtid_slave_pos; ---source include/stop_slave.inc +#--connection slave +#select master_gtid_wait('0-1-4'); +#select @@gtid_binlog_state; +#select @@gtid_slave_pos; +#--source include/stop_slave.inc --connection con1 --reap --connection con2 --reap +--enable_parsing create table t3( a int primary key, b int) engine=innodb; +#--connection slave +#--source include/start_slave.inc +--connection master +--sync_slave_with_master +show create table t3; +--connection slave +--source include/stop_slave.inc + + +--connection master +--exec echo "restart" > $MYSQLTEST_VARDIR/tmp/mysqld.1.expect +SET SESSION debug_dbug="d,start_alter_kill"; +--error 2013 +alter table t3 add column d int; + +--let $rpl_server_number= 1 +--source include/rpl_reconnect.inc +set global binlog_split_alter= true; +alter table t3 add column d int; +show create table t3; +flush logs; +show binlog events; +show binlog events in 'master-bin.000002'; +show binlog events in 'master-bin.000003'; + --connection slave --source include/start_slave.inc + --connection master --sync_slave_with_master -show create table t1; +show binlog events; + --connection master SET GLOBAL debug_dbug= @old_debug_master; -drop table t1,t2,t3; +#drop table t1,t2,t3; +drop table t3; set global binlog_split_alter= false; --sync_slave_with_master diff --git a/mysql-test/suite/rpl/t/rpl_start_alter_shutdown_2.test b/mysql-test/suite/rpl/t/rpl_start_alter_shutdown_2.test new file mode 100644 index 00000000000..0b19f9520e2 --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_start_alter_shutdown_2.test @@ -0,0 +1,69 @@ +--source include/have_log_bin.inc +--source include/have_innodb.inc +--source include/master-slave.inc +--source include/have_debug.inc + +--connection slave +SET @old_debug_slave= @@global.debug; +set global debug_dbug="+d,rpl_slave_stop_CA"; +stop slave; +SET GLOBAL slave_parallel_threads=4; +set global slave_parallel_mode=optimistic; +start slave; + +# +# SLAVE Shutdown +# +# +--connection master +SET @old_debug_master= @@global.debug; +set global debug_dbug="+d,start_alter_delay_master"; +set global binlog_split_alter=true; +create table t1( a int primary key, b int) engine=myisam; +create table t2( a int primary key, b int) engine=myisam; +connect (con1,localhost,root,,); +connect (con2,localhost,root,,); +--connection con1 +--send alter table t1 add column c int; +--connection con2 +--send alter table t2 add column c int; + + + +--connection con1 +--reap +--connection con2 +--reap +create table t3( a int primary key, b int) engine=innodb; +show binlog events; + +#Stop Slave +# Master binlog SA SA CA CA +# lets stop at first CA processing (in process_commit_alter) +--connection slave +set debug_sync="now wait_for CA_1_processing"; +connect(extra_slave,127.0.0.1,root,,test,$SLAVE_MYPORT); +--send stop slave; +--connection slave +set debug_sync="now signal proceed_CA_1"; +--connection extra_slave +--reap +SET GLOBAL debug_dbug= @old_debug_slave; +show binlog events; + +--connection slave +--source include/start_slave.inc +--connection master +--sync_slave_with_master +show binlog events; + +--connection master +drop table t1,t2,t3; +set global binlog_split_alter= false; + +--sync_slave_with_master +stop slave; +SET GLOBAL slave_parallel_threads=0; +set global slave_parallel_mode=conservative; +start slave; +--source include/rpl_end.inc diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h index e43b76569b6..025e1bd7e96 100644 --- a/sql/rpl_mi.h +++ b/sql/rpl_mi.h @@ -165,6 +165,7 @@ struct start_alter_info TODO maybe used later ? */ uint error; + char* table_name; enum start_alter_state state; /* We are not using mysql_cond_t because we do not need PSI */ mysql_cond_t start_alter_cond; diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 407787bde37..9878c3d9346 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -1377,10 +1377,13 @@ handle_rpl_parallel_thread(void *arg) if (end_of_group ) { in_event_group= false; - if (!(rgi->gtid_ev_flags3 & Gtid_log_event::FL_START_ALTER_E1) || - //TODO Do I need more strict check - (thd->rpt == thd->rgi_slave->parallel_entry->rpl_threads[0])) + //TODO + if (1) + //if (!rgi->finish_event_group_called) + { finish_event_group(rpt, event_gtid_sub_id, entry, rgi); + rgi->finish_event_group_called= false; + } rpt->loc_free_rgi(rgi); thd->rgi_slave= group_rgi= rgi= NULL; skip_event_group= false; @@ -1430,7 +1433,7 @@ handle_rpl_parallel_thread(void *arg) thd->rgi_slave= group_rgi= NULL; skip_event_group= false; } - if (!in_event_group && !rpt->current_start_alter_id) + if (!in_event_group) { /* If we are in a FLUSH TABLES FOR READ LOCK, wait for it */ while (rpt->current_entry && rpt->pause_for_ftwrl) @@ -1662,6 +1665,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, mysql_mutex_lock(&pool->threads[i]->LOCK_rpl_thread); pool->threads[i]->delay_start= false; pool->threads[i]->current_start_alter_id= 0; + pool->threads[i]->last_SA_rgi= NULL; mysql_cond_signal(&pool->threads[i]->COND_rpl_thread); while (!pool->threads[i]->running) mysql_cond_wait(&pool->threads[i]->COND_rpl_thread, @@ -1992,7 +1996,8 @@ void rpl_parallel_thread::__finish_event_group(rpl_group_info *group_rgi) } rpl_parallel_thread_pool::rpl_parallel_thread_pool() - : threads(0), free_list(0), count(0), inited(false), busy(false) + : threads(0), free_list(0), count(0), inited(false), current_start_alters(0), + busy(false) { } @@ -2041,13 +2046,24 @@ struct rpl_parallel_thread * rpl_parallel_thread_pool::get_thread(rpl_parallel_thread **owner, rpl_parallel_entry *entry) { + rpl_parallel_thread *rpt; + + DBUG_ASSERT(count > 0); mysql_mutex_lock(&LOCK_rpl_thread_pool); - return get_thread_locked(owner, entry); + while (unlikely(busy) || !(rpt= free_list)) + mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool); + free_list= rpt->next; + mysql_mutex_unlock(&LOCK_rpl_thread_pool); + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + rpt->current_owner= owner; + rpt->current_entry= entry; + + return rpt; } //We call this when we already have acquired LOCK_rpl_thread_pool mutex struct rpl_parallel_thread * -rpl_parallel_thread_pool::get_thread_locked(rpl_parallel_thread **owner, +rpl_parallel_thread_pool::get_thread_split_alter(rpl_parallel_thread **owner, rpl_parallel_entry *entry) { rpl_parallel_thread *rpt; @@ -2056,7 +2072,6 @@ rpl_parallel_thread_pool::get_thread_locked(rpl_parallel_thread **owner, while (unlikely(busy) || !(rpt= free_list)) mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool); free_list= rpt->next; - mysql_mutex_unlock(&LOCK_rpl_thread_pool); mysql_mutex_lock(&rpt->LOCK_rpl_thread); rpt->current_owner= owner; rpt->current_entry= entry; @@ -2087,35 +2102,25 @@ rpl_parallel_thread_pool::release_thread(rpl_parallel_thread *rpt) static void get_split_alter_thread_id_part_1(rpl_parallel_thread **rpl_threads, uint16 flags3, uint32 *idx, - uint32 rpl_thread_max) + uint32 rpl_thread_max, rpl_parallel_entry *e) { - /* - Commit alter/Rollback alter will always go to worker 1, to avoid deadlocks - */ - if (flags3 & (Gtid_log_event::FL_COMMIT_ALTER_E1 | - Gtid_log_event::FL_ROLLBACK_ALTER_E1 )) + for(uint i= 0; i < rpl_thread_max; i++) { - *idx= 0; - } - else - { - //Start with idx (> 0 ) or 1 (idx = 0) - for(uint i= *idx ? *idx : 1; i < rpl_thread_max; i++) + if (!rpl_threads[i] || rpl_threads[i]->current_owner != &rpl_threads[i] + || !rpl_threads[i]->current_start_alter_id) { - if (!rpl_threads[i] || rpl_threads[i]->current_owner != &rpl_threads[i] - || !rpl_threads[i]->current_start_alter_id) - { - *idx= i; - return; - } + //This condition will hit atleast one time no matter what happens + //worker per Domain id can have issues + //But easier to fix We will use some logic like count for rpl_entry + *idx= i; + return; } - //No worker is free , treat as a just a binlog start alter - *idx= 0; } return; } //TODO Change Signature -static void get_split_alter_thread_id_part_2(rpl_parallel_thread **rpl_threads, +//will return corrosponding SA id for CA/RA +static int get_split_alter_thread_id_part_2(rpl_parallel_thread **rpl_threads, uint16 flags3, uint32 rpl_thread_max, uint32 rpt_index, uint64 thread_id, rpl_parallel_entry *e) @@ -2135,32 +2140,30 @@ static void get_split_alter_thread_id_part_2(rpl_parallel_thread **rpl_threads, Gtid_log_event::FL_ROLLBACK_ALTER_E1 )) { //Free the corrosponding rpt current_start_alter_id - for(uint i= 1; i < rpl_thread_max; i++) + for(uint i= 0; i < rpl_thread_max; i++) { - //If we fail in this condition(after whole for) that means start alter - //is assigned to worker 1 if(rpl_threads[i] && rpl_threads[i]->current_start_alter_id == (int64)thread_id) { mysql_mutex_lock(&rpl_threads[i]->LOCK_rpl_thread); rpl_threads[i]->current_start_alter_id= 0; mysql_mutex_unlock(&rpl_threads[i]->LOCK_rpl_thread); - e->pending_start_alter--; - break; + sql_print_information("Setiya part 2 inside Commit alter pool_worker_id= %d alter_id= %d, entry->rpl_threads_index= %d Commit ", + rpl_threads[rpt_index] - *global_rpl_thread_pool.threads, thread_id, rpt_index); + return i; } } - sql_print_information("Setiya part 2 inside Commit alter pool_worker_id= %d alter_id= %d, entry->rpl_threads_index= %d Commit \n", + sql_print_information("Setiya part 2 inside Commit alter pool_worker_id= %d alter_id= %d, entry->rpl_threads_index= %d Commit ", rpl_threads[rpt_index] - *global_rpl_thread_pool.threads, thread_id, rpt_index); } - else if ((flags3 & Gtid_log_event::FL_START_ALTER_E1) && rpt_index) + else if ((flags3 & Gtid_log_event::FL_START_ALTER_E1)) { mysql_mutex_lock(&rpl_threads[rpt_index]->LOCK_rpl_thread); rpl_threads[rpt_index]->current_start_alter_id= thread_id; mysql_mutex_unlock(&rpl_threads[rpt_index]->LOCK_rpl_thread); - sql_print_information("Setiya part 2 inside Start alter pool_worker_id= %d alter_id= %d, entry->rpl_threads_index= %d Start \n", + sql_print_information("Setiya part 2 inside Start alter pool_worker_id= %d alter_id= %d, entry->rpl_threads_index= %d Start ", rpl_threads[rpt_index] - *global_rpl_thread_pool.threads, thread_id, rpt_index); } - - return; + return -1; } @@ -2185,18 +2188,18 @@ static void get_split_alter_thread_id_part_2(rpl_parallel_thread **rpl_threads, and the LOCK_rpl_thread must be released with THD::EXIT_COND() instead of mysql_mutex_unlock. - If the flag `reuse' is set, the last worker thread will be returned again, + If typ != GTID_EVENT, the last worker thread will be returned again, if it is still available. Otherwise a new worker thread is allocated. */ rpl_parallel_thread * rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond, - PSI_stage_info *old_stage, bool reuse, uint64 - thread_id) + PSI_stage_info *old_stage, enum Log_event_type typ, + uint64 thread_id) { uint32 idx; idx= rpl_thread_idx; - if (!reuse) + if (typ == GTID_EVENT) { ++idx; if (idx >= rpl_thread_max) @@ -2207,67 +2210,39 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond, if (rgi->gtid_ev_flags3 & (Gtid_log_event::FL_START_ALTER_E1 | Gtid_log_event::FL_COMMIT_ALTER_E1 | Gtid_log_event::FL_ROLLBACK_ALTER_E1 )) - { get_split_alter_thread_id_part_1(rpl_threads, rgi->gtid_ev_flags3, - &idx, rpl_thread_max); - //Reserve worker for Commit/Rollback - if (idx != 0) + &idx, rpl_thread_max, this); + rpl_thread_idx= idx; + } + else if (typ == QUERY_EVENT && rgi->gtid_ev_flags3 & + (Gtid_log_event::FL_START_ALTER_E1 | + Gtid_log_event::FL_COMMIT_ALTER_E1 | + Gtid_log_event::FL_ROLLBACK_ALTER_E1 )) + { + int SA_idx= get_split_alter_thread_id_part_2(rpl_threads, rgi->gtid_ev_flags3, + rpl_thread_max, idx, thread_id, this); + + mysql_mutex_lock(&global_rpl_thread_pool.LOCK_rpl_thread_pool); + if (rgi->gtid_ev_flags3 & Gtid_log_event::FL_START_ALTER_E1) + { + if (pending_start_alters < rpl_thread_max - 1 && + global_rpl_thread_pool.current_start_alters < global_rpl_thread_pool.count - 1) { - //TODO without mutex lock , maybe wrong ? - rpl_parallel_thread *t= rpl_threads[0]; - //Will happen in the case binlog Start Alter only - if (!t || t->current_owner != &rpl_threads[0]) - { - t= choose_thread_internal(rgi, did_enter_cond, old_stage, 0); - mysql_mutex_unlock(&t->LOCK_rpl_thread); - } - t->current_start_alter_id= -1; - pending_start_alter++; + pending_start_alters++; + global_rpl_thread_pool.current_start_alters++; } - else if (rgi->gtid_ev_flags3 & (Gtid_log_event::FL_COMMIT_ALTER_E1 | - Gtid_log_event::FL_ROLLBACK_ALTER_E1 )) + else { - //TODO without mutex lock , maybe wrong ? - rpl_parallel_thread *t= rpl_threads[0]; - //Will happen in the case binlog Start Alter only - if (!t || t->current_owner != &rpl_threads[0]) - t= choose_thread_internal(rgi, did_enter_cond, old_stage, 0); - else - //If it is allocated by Start alter mutex is released - mysql_mutex_lock(&t->LOCK_rpl_thread); - rpl_thread_idx= 0; - return t; + rpl_threads[idx]->last_SA_rgi->reserved_start_alter_thread= true; + rpl_threads[idx]->current_start_alter_id= 0; } } - else if(pending_start_alter) + else if(SA_idx >= 0) { - //Interleaved DML, No special treatment need either just get the non SA - //Worker or Worker_0 - get_split_alter_thread_id_part_1(rpl_threads, rgi->gtid_ev_flags3, - &idx, rpl_thread_max); - sql_print_information("Setiya DML pool_worker_id= %d, entry->rpl_threads_index= %d DML \n", - rpl_threads[idx] - *global_rpl_thread_pool.threads, idx); + pending_start_alters--; + global_rpl_thread_pool.current_start_alters--; } - sql_print_information("\n"); - sql_print_information("Setiya POOL INFO pending_start_alter = %d \n", pending_start_alter); - for(uint k=0; k < rpl_thread_max; k++) - { - sql_print_information("Setiya POOL INFO SA ID pool_worker_id= %d, current_start_alter_id= %d \n", - k, global_rpl_thread_pool.threads[k]->current_start_alter_id); - - } - sql_print_information("\n"); - - rpl_thread_idx= idx; - } - else if (rgi->gtid_ev_flags3 & (Gtid_log_event::FL_START_ALTER_E1 | - Gtid_log_event::FL_COMMIT_ALTER_E1 | - Gtid_log_event::FL_ROLLBACK_ALTER_E1 )) - { - get_split_alter_thread_id_part_2(rpl_threads, rgi->gtid_ev_flags3, - rpl_thread_max, idx, thread_id, this); - if (!pending_start_alter) - rpl_threads[idx]->current_start_alter_id= 0; //Free for reuse for start alter + mysql_mutex_unlock(&global_rpl_thread_pool.LOCK_rpl_thread_pool); } return choose_thread_internal(rgi, did_enter_cond, old_stage, idx); } @@ -2417,7 +2392,8 @@ rpl_parallel::find(uint32 domain_id) e->domain_id= domain_id; e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX; e->pause_sub_id= (uint64)ULONGLONG_MAX; - e->pending_start_alter= 0; + e->pending_start_alters= 0; + e->start_alter_reserved= 0; if (my_hash_insert(&domain_hash, (uchar *)e)) { my_free(e); @@ -2503,7 +2479,11 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli) if ((rpt= e->rpl_threads[j])) { mysql_mutex_lock(&rpt->LOCK_rpl_thread); - while (rpt->current_owner == &e->rpl_threads[j]) + //Dont wait for SA workers , But wait for CA/RA workers + //If CA/RA is executed that means corresponding SA is also executed + //And remaning SA will never recieve CA/RA so we have to manualy send it + while (rpt->current_owner == &e->rpl_threads[j] && + !(rpt->thd->rgi_slave->gtid_ev_flags3 & Gtid_log_event::FL_START_ALTER_E1)) mysql_cond_wait(&rpt->COND_rpl_thread_stop, &rpt->LOCK_rpl_thread); mysql_mutex_unlock(&rpt->LOCK_rpl_thread); } @@ -2878,6 +2858,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, delete_or_keep_event_post_apply(serial_rgi, typ, ev); return 0; } + sql_print_information("Setiya Domain=%d Seq_no= %d ",(int)gtid_ev->domain_id, (int)gtid_ev->seq_no); } else e= current; @@ -2897,7 +2878,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, */ cur_thread= e->choose_thread(serial_rgi, &did_enter_cond, &old_stage, - typ != GTID_EVENT, thread_id); + typ, thread_id); if (!cur_thread) { /* This means we were killed. The error is already signalled. */ @@ -2947,6 +2928,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, */ rgi->wait_commit_sub_id= e->current_sub_id; rgi->wait_commit_group_info= e->current_group_info; + cur_thread->last_SA_rgi= rgi; speculation= rpl_group_info::SPECULATE_NO; new_gco= true; diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index f52f28ae5e9..2bca3f5ff77 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -144,6 +144,8 @@ struct rpl_parallel_thread { size_t event_size; } *event_queue, *last_in_queue; uint64 queued_size; + // SA rgi + rpl_group_info *last_SA_rgi; /* These free lists are protected by LOCK_rpl_thread. */ queued_event *qev_free_list; rpl_group_info *rgi_free_list; @@ -244,6 +246,9 @@ struct rpl_parallel_thread_pool { mysql_cond_t COND_rpl_thread_pool; uint32 count; bool inited; + //Please lock first LOCK_rpl_thread_pool and then LOCK_rpl_thread to + //update this variable. + uint32 current_start_alters; /* While FTWRL runs, this counter is incremented to make SQL thread or STOP/START slave not try to start new activity while that operation @@ -256,7 +261,7 @@ struct rpl_parallel_thread_pool { void destroy(); struct rpl_parallel_thread *get_thread(rpl_parallel_thread **owner, rpl_parallel_entry *entry); - struct rpl_parallel_thread *get_thread_locked(rpl_parallel_thread **owner, + struct rpl_parallel_thread *get_thread_split_alter(rpl_parallel_thread **owner, rpl_parallel_entry *entry); void release_thread(rpl_parallel_thread *rpt); }; @@ -273,7 +278,8 @@ struct rpl_parallel_entry { */ uint32 need_sub_id_signal; uint64 last_commit_id; - uint32 pending_start_alter; + uint32 pending_start_alters; + uint32 start_alter_reserved; bool active; /* Set when SQL thread is shutting down, and no more events can be processed, @@ -360,7 +366,7 @@ struct rpl_parallel_entry { group_commit_orderer *current_gco; rpl_parallel_thread * choose_thread(rpl_group_info *rgi, bool *did_enter_cond, - PSI_stage_info *old_stage, bool reuse, + PSI_stage_info *old_stage, enum Log_event_type typ, uint64 thread_id); rpl_parallel_thread * choose_thread_internal(rpl_group_info *rgi, bool *did_enter_cond, PSI_stage_info *old_stage, uint32 idx); diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index fc2f89c5876..21d2b645eff 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -2094,6 +2094,7 @@ rpl_group_info::reinit(Relay_log_info *rli) long_find_row_note_printed= false; did_mark_start_commit= false; gtid_ev_flags2= 0; + gtid_ev_flags3= 0; last_master_timestamp = 0; gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL; speculation= SPECULATE_NO; @@ -2104,7 +2105,8 @@ rpl_group_info::rpl_group_info(Relay_log_info *rli) : thd(0), wait_commit_sub_id(0), wait_commit_group_info(0), parallel_entry(0), deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false), - gtid_ev_flags2(0), gtid_ev_flags3(0) + gtid_ev_flags2(0), gtid_ev_flags3(0), reserved_start_alter_thread(0), + finish_event_group_called(0) { reinit(rli); bzero(¤t_gtid, sizeof(current_gtid)); diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 06f40a8df97..af8997b9830 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -809,6 +809,8 @@ struct rpl_group_info RETRY_KILL_KILLED }; uchar killed_for_retry; + bool reserved_start_alter_thread; + bool finish_event_group_called; rpl_group_info(Relay_log_info *rli_); ~rpl_group_info(); diff --git a/sql/slave.cc b/sql/slave.cc index 54717536dfd..ed1e3d11a90 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -5657,7 +5657,34 @@ pthread_handler_t handle_slave_sql(void *arg) err: if (mi->using_parallel()) + { rli->parallel.wait_for_done(thd, rli); + /* + shutdown the alter threads waiting on C/R ALter + What if worker thread has not registered its start alter in alter_list ? + In that case rpt->stop + TODO sideeffects + */ + start_alter_info *info=NULL; + mysql_mutex_lock(&mi->start_alter_list_lock); + List_iterator<start_alter_info> info_iterator(mi->start_alter_list); + while ((info= info_iterator++)) + { + mysql_mutex_lock(&mi->start_alter_lock); + info->state= start_alter_state::ROLLBACK_ALTER; + mysql_cond_broadcast(&info->start_alter_cond); + mysql_mutex_unlock(&mi->start_alter_lock); + mysql_mutex_lock(&mi->start_alter_lock); + while(info->state == start_alter_state::ROLLBACK_ALTER) + mysql_cond_wait(&info->start_alter_cond, &mi->start_alter_lock); + mysql_mutex_unlock(&mi->start_alter_lock); + DBUG_ASSERT(info->state == start_alter_state::COMMITTED); + info_iterator.remove(); + mysql_cond_destroy(&info->start_alter_cond); + my_free(info); + } + mysql_mutex_unlock(&mi->start_alter_list_lock); + } /* Thread stopped. Print the current replication position to the log */ { @@ -5756,35 +5783,12 @@ err_during_init: /* We need data_lock, at least to wake up any waiting master_pos_wait() */ mysql_mutex_lock(&rli->data_lock); DBUG_ASSERT(rli->slave_running == MYSQL_SLAVE_RUN_NOT_CONNECT); // tracking buffer overrun - /* - shutdown the alter threads waiting on C/R ALter - What if worker thread has not registered its start alter in alter_list ? - In that case rpt->stop - TODO sideeffects - */ - start_alter_info *info=NULL; - List_iterator<start_alter_info> info_iterator(mi->start_alter_list); - while ((info= info_iterator++)) - { - mysql_mutex_lock(&mi->start_alter_lock); - info->state= start_alter_state::ROLLBACK_ALTER; - mysql_cond_broadcast(&info->start_alter_cond); - mysql_mutex_unlock(&mi->start_alter_lock); - mysql_mutex_lock(&mi->start_alter_lock); - while(info->state == start_alter_state::ROLLBACK_ALTER) - mysql_cond_wait(&info->start_alter_cond, &mi->start_alter_lock); - mysql_mutex_unlock(&mi->start_alter_lock); - DBUG_ASSERT(info->state == start_alter_state::COMMITTED); - info_iterator.remove(); - mysql_cond_destroy(&info->start_alter_cond); - my_free(info); - } /* When master_pos_wait() wakes up it will check this and terminate */ rli->slave_running= MYSQL_SLAVE_NOT_RUN; /* Forget the relay log's format */ delete rli->relay_log.description_event_for_exec; rli->relay_log.description_event_for_exec= 0; - rli->reset_inuse_relaylog(); + //rli->reset_inuse_relaylog(); /* Wake up master_pos_wait() */ mysql_mutex_unlock(&rli->data_lock); DBUG_PRINT("info",("Signaling possibly waiting master_pos_wait() functions")); diff --git a/sql/sql_alter.cc b/sql/sql_alter.cc index 6cd8b0a5dae..6c7ac45d204 100644 --- a/sql/sql_alter.cc +++ b/sql/sql_alter.cc @@ -21,6 +21,7 @@ #include "sql_alter.h" #include "rpl_mi.h" #include "slave.h" +#include "debug_sync.h" #include "wsrep_mysqld.h" Alter_info::Alter_info(const Alter_info &rhs, MEM_ROOT *mem_root) @@ -377,15 +378,14 @@ void Alter_table_ctx::report_implicit_default_value_error(THD *thd, static int process_start_alter(THD *thd, uint64 thread_id) { //No Slave, Normal Slave, Start Alter under Worker 1 will simple binlog and exit - if(!thd->slave_thread || !thd->rpt - ||((thd->rpt == thd->rgi_slave->parallel_entry->rpl_threads[0]) && - thd->rpt->current_owner && (thd->rpt->current_owner == - thd->rgi_slave->parallel_entry->rpl_threads))) + if(!thd->slave_thread || !thd->rpt || thd->rgi_slave->reserved_start_alter_thread) { /* We will just write the binlog and move to next event , because COMMIT Alter will take care of actual work */ + if (thd->rgi_slave) + thd->rgi_slave->reserved_start_alter_thread= false; if (write_bin_log(thd, false, thd->query(), thd->query_length())) return START_ALTER_ERROR; return START_ALTER_SKIP; @@ -397,6 +397,10 @@ static int process_start_alter(THD *thd, uint64 thread_id) static int process_commit_alter(THD *thd, uint64 thread_id) { DBUG_ASSERT(thd->rgi_slave); + DBUG_EXECUTE_IF("rpl_slave_stop_CA", { + debug_sync_set_action(thd, + STRING_WITH_LEN("now signal CA_1_processing WAIT_FOR proceed_CA_1")); + }); thd->gtid_flags3|= Gtid_log_event::FL_START_ALTER_E1; Master_info *mi= thd->rgi_slave->rli->mi; start_alter_info *info=NULL; diff --git a/sql/sql_table.cc b/sql/sql_table.cc index a922d35e99d..7b554572dfe 100644 --- a/sql/sql_table.cc +++ b/sql/sql_table.cc @@ -7651,9 +7651,12 @@ static int mysql_inplace_alter_table(THD *thd, } if (write_start_alter(thd, partial_alter, info)) DBUG_RETURN(true); - DBUG_EXECUTE_IF("start_alter_delay_slave", { + DBUG_EXECUTE_IF("start_alter_delay_slave", { my_sleep(5000000); }); + DBUG_EXECUTE_IF("start_alter_kill", { + DBUG_SUICIDE(); + }); /* Upgrade to SHARED_NO_WRITE lock if: @@ -7672,7 +7675,7 @@ static int mysql_inplace_alter_table(THD *thd, // It's now safe to take the table level lock. if (lock_tables(thd, table_list, alter_ctx->tables_opened, 0)) goto cleanup; - DBUG_EXECUTE_IF("start_alter_delay_master", { + DBUG_EXECUTE_IF("start_alter_delay_master", { my_sleep(5000000); }); DEBUG_SYNC(thd, "alter_table_inplace_after_lock_upgrade"); @@ -9422,10 +9425,12 @@ static bool write_start_alter(THD *thd, bool* partial_alter, start_alter_info *i mysql_mutex_unlock(&mi->start_alter_list_lock); mysql_cond_broadcast(&mi->start_alter_list_cond); thd->start_alter_ev->update_pos(thd->rgi_slave); + thd->rgi_slave->commit_orderer.wait_for_prior_commit(thd); thd->rgi_slave->mark_start_commit(); thd->wakeup_subsequent_commits(0); //Finish event group thd->rpt->__finish_event_group(thd->rgi_slave); + thd->rgi_slave->finish_event_group_called= true; if (thd->slave_shutdown) return true; return false; @@ -9461,6 +9466,34 @@ start_alter_info *get_new_start_alter_info(THD *thd) // info->start_alter_cond.m_psi= NULL; return info; } +//Will happen if there is restart in between +void check_and_remove_duplicate_alter(Master_info *mi, char *table_name) +{ + start_alter_info *info; + mysql_mutex_lock(&mi->start_alter_list_lock); + List_iterator<start_alter_info> info_iterator(mi->start_alter_list); + while ((info= info_iterator++)) + { + if(info->state == start_alter_state::REGISTERED && + !my_strcasecmp(system_charset_info, info->table_name, table_name)) + { + info_iterator.remove(); + mysql_mutex_lock(&mi->start_alter_lock); + info->state= start_alter_state::ROLLBACK_ALTER; + mysql_mutex_unlock(&mi->start_alter_lock); + mysql_cond_broadcast(&info->start_alter_cond); + mysql_mutex_lock(&mi->start_alter_lock); + while(info->state != start_alter_state::COMMITTED ) + mysql_cond_wait(&info->start_alter_cond, &mi->start_alter_lock); + mysql_mutex_unlock(&mi->start_alter_lock); + mysql_cond_destroy(&info->start_alter_cond); + my_free(info); + break; + } + } + mysql_mutex_unlock(&mi->start_alter_list_lock); + return; +} /** Alter table @@ -9569,6 +9602,12 @@ bool mysql_alter_table(THD *thd, const LEX_CSTRING *new_db, thd->open_options|= HA_OPEN_FOR_ALTER; thd->mdl_backup_ticket= 0; + char *complete_table_name= (char *)thd->alloc(table_list->table_name.length + + new_db->length + 2); + sprintf(complete_table_name,"%s.%s",table_list->table_name.str, new_db->str); + info->table_name= complete_table_name; + if (mi) + check_and_remove_duplicate_alter(mi, complete_table_name); bool error= open_tables(thd, &table_list, &tables_opened, 0, &alter_prelocking_strategy); thd->open_options&= ~HA_OPEN_FOR_ALTER; @@ -10305,7 +10344,7 @@ do_continue:; //If issues by binlog/master complete the prepare phase of alter and then commit if (write_start_alter(thd, &partial_alter, info)) DBUG_RETURN(true); - DBUG_EXECUTE_IF("start_alter_delay_master", { + DBUG_EXECUTE_IF("start_alter_delay_master", { my_sleep(5000000); }); // It's now safe to take the table level lock. @@ -10422,7 +10461,7 @@ do_continue:; if(write_bin_log(thd, false, send_query, strlen(send_query))) DBUG_RETURN(true); } - else if(start_alter_id) + else if(start_alter_id && !thd->direct_commit_alter) { //if(write_bin_log(thd, FALSE, send_query, strlen(send_query), true)) // DBUG_RETURN(true); @@ -10631,7 +10670,7 @@ end_inplace: if(write_bin_log(thd, false, send_query, strlen(send_query))) DBUG_RETURN(true); } - else if(start_alter_id) + else if(start_alter_id && !thd->direct_commit_alter) { //if(write_bin_log(thd, FALSE, send_query, strlen(send_query), true)) // DBUG_RETURN(true); |