summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSachin <sachin.setiya@mariadb.com>2020-05-12 17:00:26 +0530
committerSachin <sachin.setiya@mariadb.com>2020-05-12 17:00:26 +0530
commit7c6fe2289098566b607891cf831bf3affeedeffb (patch)
tree7965d2e78fced89d829887f9a30d32e61d65b138
parent423283350fb94f8fe8b704a7a028f251df401a4f (diff)
downloadmariadb-git-10.5-olter-v3.tar.gz
completed10.5-olter-v3
-rw-r--r--mysql-test/suite/rpl/t/r_ttt.test10
-rw-r--r--mysql-test/suite/rpl/t/r_ttt3.test49
-rw-r--r--mysql-test/suite/rpl/t/rpl_start_alter_shutdown_1.test47
-rw-r--r--mysql-test/suite/rpl/t/rpl_start_alter_shutdown_2.test69
-rw-r--r--sql/rpl_mi.h1
-rw-r--r--sql/rpl_parallel.cc180
-rw-r--r--sql/rpl_parallel.h12
-rw-r--r--sql/rpl_rli.cc4
-rw-r--r--sql/rpl_rli.h2
-rw-r--r--sql/slave.cc52
-rw-r--r--sql/sql_alter.cc12
-rw-r--r--sql/sql_table.cc49
12 files changed, 338 insertions, 149 deletions
diff --git a/mysql-test/suite/rpl/t/r_ttt.test b/mysql-test/suite/rpl/t/r_ttt.test
index 761b30627f7..eb54108a1fd 100644
--- a/mysql-test/suite/rpl/t/r_ttt.test
+++ b/mysql-test/suite/rpl/t/r_ttt.test
@@ -3,10 +3,9 @@
--source include/master-slave.inc
--connection slave
-call mtr.add_suppression("Warning: 72 bytes lost at .*");
set global debug_dbug="+d,start_alter_delay_slave";
stop slave;
-SET GLOBAL slave_parallel_threads=10;
+SET GLOBAL slave_parallel_threads=2;
set global slave_parallel_mode=optimistic;
change master to master_use_gtid=slave_pos;
start slave;
@@ -61,13 +60,13 @@ set gtid_domain_id=22;
set gtid_domain_id=22;
--send alter table t3 add column c int, force, algorithm=inplace;
--connection con4
-set gtid_domain_id=24;
+set gtid_domain_id=23;
--send alter table t4 add column c int, force, algorithm=inplace;
--connection con5
-set gtid_domain_id=24;
+set gtid_domain_id=23;
--send alter table t5 add column c int, force, algorithm=inplace;
--connection con6
-set gtid_domain_id=24;
+set gtid_domain_id=23;
--send alter table t6 add column c int, force, algorithm=inplace;
--connection con1
@@ -115,6 +114,7 @@ select @@gtid_slave_pos;
--connection master
--sync_slave_with_master
select @@gtid_slave_pos;
+show binlog events;
--connection master
drop table t1,t2,t3,t4,t5,t6;
diff --git a/mysql-test/suite/rpl/t/r_ttt3.test b/mysql-test/suite/rpl/t/r_ttt3.test
new file mode 100644
index 00000000000..994e3807499
--- /dev/null
+++ b/mysql-test/suite/rpl/t/r_ttt3.test
@@ -0,0 +1,49 @@
+--source include/have_binlog_format_row.inc
+--source include/have_innodb.inc
+--source include/master-slave.inc
+
+--connection slave
+stop slave;
+SET GLOBAL slave_parallel_threads=1;
+set global slave_parallel_mode=optimistic;
+set global debug_dbug="+d,start_alter_delay_slave";
+change master to master_use_gtid= current_pos;
+start slave;
+
+
+--connection master
+set global debug_dbug="+d,start_alter_delay_master";
+set global binlog_split_alter=true;
+create table t1( a int primary key, b int) engine=innodb;
+connect (con1,localhost,root,,);
+connect (con2,localhost,root,,);
+connect (con3,localhost,root,,);
+connect (con4,localhost,root,,);
+alter table t1 add column c int, force, algorithm=inplace;
+
+#--source include/kill_mysqld.inc
+#--let $rpl_server_number= 1
+#--source include/rpl_stop_server.inc
+create table t3( a int primary key, b int) engine=innodb;
+
+show binlog events;
+
+--sleep 5
+--connection slave
+show binlog events;
+
+--connection master
+--sync_slave_with_master
+show binlog events;
+show create table t1;
+
+--connection master
+drop table t1,t3;
+set global binlog_split_alter= false;
+
+--connection slave
+stop slave;
+SET GLOBAL slave_parallel_threads=0;
+set global slave_parallel_mode=conservative;
+start slave;
+--source include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/t/rpl_start_alter_shutdown_1.test b/mysql-test/suite/rpl/t/rpl_start_alter_shutdown_1.test
index 7df2ea41fd2..f576f7376f0 100644
--- a/mysql-test/suite/rpl/t/rpl_start_alter_shutdown_1.test
+++ b/mysql-test/suite/rpl/t/rpl_start_alter_shutdown_1.test
@@ -7,7 +7,7 @@
SET @old_debug_slave= @@global.debug;
set global debug_dbug="+d,start_alter_delay_slave";
stop slave;
-SET GLOBAL slave_parallel_threads=10;
+SET GLOBAL slave_parallel_threads=4;
set global slave_parallel_mode=optimistic;
start slave;
@@ -19,6 +19,7 @@ start slave;
SET @old_debug_master= @@global.debug;
set global debug_dbug="+d,start_alter_delay_master";
set global binlog_split_alter=true;
+--disable_parsing
create table t1( a int primary key, b int) engine=myisam;
create table t2( a int primary key, b int) engine=myisam;
connect (con1,localhost,root,,);
@@ -28,29 +29,59 @@ connect (con2,localhost,root,,);
--connection con2
--send alter table t2 add column c int;
+
#Shutdown Slave
---connection slave
-select master_gtid_wait('0-1-4');
-select @@gtid_binlog_state;
-select @@gtid_slave_pos;
---source include/stop_slave.inc
+#--connection slave
+#select master_gtid_wait('0-1-4');
+#select @@gtid_binlog_state;
+#select @@gtid_slave_pos;
+#--source include/stop_slave.inc
--connection con1
--reap
--connection con2
--reap
+--enable_parsing
create table t3( a int primary key, b int) engine=innodb;
+#--connection slave
+#--source include/start_slave.inc
+--connection master
+--sync_slave_with_master
+show create table t3;
+--connection slave
+--source include/stop_slave.inc
+
+
+--connection master
+--exec echo "restart" > $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
+SET SESSION debug_dbug="d,start_alter_kill";
+--error 2013
+alter table t3 add column d int;
+
+--let $rpl_server_number= 1
+--source include/rpl_reconnect.inc
+set global binlog_split_alter= true;
+alter table t3 add column d int;
+show create table t3;
+flush logs;
+show binlog events;
+show binlog events in 'master-bin.000002';
+show binlog events in 'master-bin.000003';
+
--connection slave
--source include/start_slave.inc
+
--connection master
--sync_slave_with_master
-show create table t1;
+show binlog events;
+
--connection master
SET GLOBAL debug_dbug= @old_debug_master;
-drop table t1,t2,t3;
+#drop table t1,t2,t3;
+drop table t3;
set global binlog_split_alter= false;
--sync_slave_with_master
diff --git a/mysql-test/suite/rpl/t/rpl_start_alter_shutdown_2.test b/mysql-test/suite/rpl/t/rpl_start_alter_shutdown_2.test
new file mode 100644
index 00000000000..0b19f9520e2
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_start_alter_shutdown_2.test
@@ -0,0 +1,69 @@
+--source include/have_log_bin.inc
+--source include/have_innodb.inc
+--source include/master-slave.inc
+--source include/have_debug.inc
+
+--connection slave
+SET @old_debug_slave= @@global.debug;
+set global debug_dbug="+d,rpl_slave_stop_CA";
+stop slave;
+SET GLOBAL slave_parallel_threads=4;
+set global slave_parallel_mode=optimistic;
+start slave;
+
+#
+# SLAVE Shutdown
+#
+#
+--connection master
+SET @old_debug_master= @@global.debug;
+set global debug_dbug="+d,start_alter_delay_master";
+set global binlog_split_alter=true;
+create table t1( a int primary key, b int) engine=myisam;
+create table t2( a int primary key, b int) engine=myisam;
+connect (con1,localhost,root,,);
+connect (con2,localhost,root,,);
+--connection con1
+--send alter table t1 add column c int;
+--connection con2
+--send alter table t2 add column c int;
+
+
+
+--connection con1
+--reap
+--connection con2
+--reap
+create table t3( a int primary key, b int) engine=innodb;
+show binlog events;
+
+#Stop Slave
+# Master binlog SA SA CA CA
+# lets stop at first CA processing (in process_commit_alter)
+--connection slave
+set debug_sync="now wait_for CA_1_processing";
+connect(extra_slave,127.0.0.1,root,,test,$SLAVE_MYPORT);
+--send stop slave;
+--connection slave
+set debug_sync="now signal proceed_CA_1";
+--connection extra_slave
+--reap
+SET GLOBAL debug_dbug= @old_debug_slave;
+show binlog events;
+
+--connection slave
+--source include/start_slave.inc
+--connection master
+--sync_slave_with_master
+show binlog events;
+
+--connection master
+drop table t1,t2,t3;
+set global binlog_split_alter= false;
+
+--sync_slave_with_master
+stop slave;
+SET GLOBAL slave_parallel_threads=0;
+set global slave_parallel_mode=conservative;
+start slave;
+--source include/rpl_end.inc
diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h
index e43b76569b6..025e1bd7e96 100644
--- a/sql/rpl_mi.h
+++ b/sql/rpl_mi.h
@@ -165,6 +165,7 @@ struct start_alter_info
TODO maybe used later ?
*/
uint error;
+ char* table_name;
enum start_alter_state state;
/* We are not using mysql_cond_t because we do not need PSI */
mysql_cond_t start_alter_cond;
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 407787bde37..9878c3d9346 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -1377,10 +1377,13 @@ handle_rpl_parallel_thread(void *arg)
if (end_of_group )
{
in_event_group= false;
- if (!(rgi->gtid_ev_flags3 & Gtid_log_event::FL_START_ALTER_E1) ||
- //TODO Do I need more strict check
- (thd->rpt == thd->rgi_slave->parallel_entry->rpl_threads[0]))
+ //TODO
+ if (1)
+ //if (!rgi->finish_event_group_called)
+ {
finish_event_group(rpt, event_gtid_sub_id, entry, rgi);
+ rgi->finish_event_group_called= false;
+ }
rpt->loc_free_rgi(rgi);
thd->rgi_slave= group_rgi= rgi= NULL;
skip_event_group= false;
@@ -1430,7 +1433,7 @@ handle_rpl_parallel_thread(void *arg)
thd->rgi_slave= group_rgi= NULL;
skip_event_group= false;
}
- if (!in_event_group && !rpt->current_start_alter_id)
+ if (!in_event_group)
{
/* If we are in a FLUSH TABLES FOR READ LOCK, wait for it */
while (rpt->current_entry && rpt->pause_for_ftwrl)
@@ -1662,6 +1665,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
mysql_mutex_lock(&pool->threads[i]->LOCK_rpl_thread);
pool->threads[i]->delay_start= false;
pool->threads[i]->current_start_alter_id= 0;
+ pool->threads[i]->last_SA_rgi= NULL;
mysql_cond_signal(&pool->threads[i]->COND_rpl_thread);
while (!pool->threads[i]->running)
mysql_cond_wait(&pool->threads[i]->COND_rpl_thread,
@@ -1992,7 +1996,8 @@ void rpl_parallel_thread::__finish_event_group(rpl_group_info *group_rgi)
}
rpl_parallel_thread_pool::rpl_parallel_thread_pool()
- : threads(0), free_list(0), count(0), inited(false), busy(false)
+ : threads(0), free_list(0), count(0), inited(false), current_start_alters(0),
+ busy(false)
{
}
@@ -2041,13 +2046,24 @@ struct rpl_parallel_thread *
rpl_parallel_thread_pool::get_thread(rpl_parallel_thread **owner,
rpl_parallel_entry *entry)
{
+ rpl_parallel_thread *rpt;
+
+ DBUG_ASSERT(count > 0);
mysql_mutex_lock(&LOCK_rpl_thread_pool);
- return get_thread_locked(owner, entry);
+ while (unlikely(busy) || !(rpt= free_list))
+ mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool);
+ free_list= rpt->next;
+ mysql_mutex_unlock(&LOCK_rpl_thread_pool);
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ rpt->current_owner= owner;
+ rpt->current_entry= entry;
+
+ return rpt;
}
//We call this when we already have acquired LOCK_rpl_thread_pool mutex
struct rpl_parallel_thread *
-rpl_parallel_thread_pool::get_thread_locked(rpl_parallel_thread **owner,
+rpl_parallel_thread_pool::get_thread_split_alter(rpl_parallel_thread **owner,
rpl_parallel_entry *entry)
{
rpl_parallel_thread *rpt;
@@ -2056,7 +2072,6 @@ rpl_parallel_thread_pool::get_thread_locked(rpl_parallel_thread **owner,
while (unlikely(busy) || !(rpt= free_list))
mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool);
free_list= rpt->next;
- mysql_mutex_unlock(&LOCK_rpl_thread_pool);
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
rpt->current_owner= owner;
rpt->current_entry= entry;
@@ -2087,35 +2102,25 @@ rpl_parallel_thread_pool::release_thread(rpl_parallel_thread *rpt)
static void get_split_alter_thread_id_part_1(rpl_parallel_thread **rpl_threads,
uint16 flags3, uint32 *idx,
- uint32 rpl_thread_max)
+ uint32 rpl_thread_max, rpl_parallel_entry *e)
{
- /*
- Commit alter/Rollback alter will always go to worker 1, to avoid deadlocks
- */
- if (flags3 & (Gtid_log_event::FL_COMMIT_ALTER_E1 |
- Gtid_log_event::FL_ROLLBACK_ALTER_E1 ))
+ for(uint i= 0; i < rpl_thread_max; i++)
{
- *idx= 0;
- }
- else
- {
- //Start with idx (> 0 ) or 1 (idx = 0)
- for(uint i= *idx ? *idx : 1; i < rpl_thread_max; i++)
+ if (!rpl_threads[i] || rpl_threads[i]->current_owner != &rpl_threads[i]
+ || !rpl_threads[i]->current_start_alter_id)
{
- if (!rpl_threads[i] || rpl_threads[i]->current_owner != &rpl_threads[i]
- || !rpl_threads[i]->current_start_alter_id)
- {
- *idx= i;
- return;
- }
+ //This condition will hit atleast one time no matter what happens
+ //worker per Domain id can have issues
+ //But easier to fix We will use some logic like count for rpl_entry
+ *idx= i;
+ return;
}
- //No worker is free , treat as a just a binlog start alter
- *idx= 0;
}
return;
}
//TODO Change Signature
-static void get_split_alter_thread_id_part_2(rpl_parallel_thread **rpl_threads,
+//will return corrosponding SA id for CA/RA
+static int get_split_alter_thread_id_part_2(rpl_parallel_thread **rpl_threads,
uint16 flags3, uint32 rpl_thread_max,
uint32 rpt_index, uint64 thread_id,
rpl_parallel_entry *e)
@@ -2135,32 +2140,30 @@ static void get_split_alter_thread_id_part_2(rpl_parallel_thread **rpl_threads,
Gtid_log_event::FL_ROLLBACK_ALTER_E1 ))
{
//Free the corrosponding rpt current_start_alter_id
- for(uint i= 1; i < rpl_thread_max; i++)
+ for(uint i= 0; i < rpl_thread_max; i++)
{
- //If we fail in this condition(after whole for) that means start alter
- //is assigned to worker 1
if(rpl_threads[i] && rpl_threads[i]->current_start_alter_id == (int64)thread_id)
{
mysql_mutex_lock(&rpl_threads[i]->LOCK_rpl_thread);
rpl_threads[i]->current_start_alter_id= 0;
mysql_mutex_unlock(&rpl_threads[i]->LOCK_rpl_thread);
- e->pending_start_alter--;
- break;
+ sql_print_information("Setiya part 2 inside Commit alter pool_worker_id= %d alter_id= %d, entry->rpl_threads_index= %d Commit ",
+ rpl_threads[rpt_index] - *global_rpl_thread_pool.threads, thread_id, rpt_index);
+ return i;
}
}
- sql_print_information("Setiya part 2 inside Commit alter pool_worker_id= %d alter_id= %d, entry->rpl_threads_index= %d Commit \n",
+ sql_print_information("Setiya part 2 inside Commit alter pool_worker_id= %d alter_id= %d, entry->rpl_threads_index= %d Commit ",
rpl_threads[rpt_index] - *global_rpl_thread_pool.threads, thread_id, rpt_index);
}
- else if ((flags3 & Gtid_log_event::FL_START_ALTER_E1) && rpt_index)
+ else if ((flags3 & Gtid_log_event::FL_START_ALTER_E1))
{
mysql_mutex_lock(&rpl_threads[rpt_index]->LOCK_rpl_thread);
rpl_threads[rpt_index]->current_start_alter_id= thread_id;
mysql_mutex_unlock(&rpl_threads[rpt_index]->LOCK_rpl_thread);
- sql_print_information("Setiya part 2 inside Start alter pool_worker_id= %d alter_id= %d, entry->rpl_threads_index= %d Start \n",
+ sql_print_information("Setiya part 2 inside Start alter pool_worker_id= %d alter_id= %d, entry->rpl_threads_index= %d Start ",
rpl_threads[rpt_index] - *global_rpl_thread_pool.threads, thread_id, rpt_index);
}
-
- return;
+ return -1;
}
@@ -2185,18 +2188,18 @@ static void get_split_alter_thread_id_part_2(rpl_parallel_thread **rpl_threads,
and the LOCK_rpl_thread must be released with THD::EXIT_COND() instead
of mysql_mutex_unlock.
- If the flag `reuse' is set, the last worker thread will be returned again,
+ If typ != GTID_EVENT, the last worker thread will be returned again,
if it is still available. Otherwise a new worker thread is allocated.
*/
rpl_parallel_thread *
rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
- PSI_stage_info *old_stage, bool reuse, uint64
- thread_id)
+ PSI_stage_info *old_stage, enum Log_event_type typ,
+ uint64 thread_id)
{
uint32 idx;
idx= rpl_thread_idx;
- if (!reuse)
+ if (typ == GTID_EVENT)
{
++idx;
if (idx >= rpl_thread_max)
@@ -2207,67 +2210,39 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
if (rgi->gtid_ev_flags3 & (Gtid_log_event::FL_START_ALTER_E1 |
Gtid_log_event::FL_COMMIT_ALTER_E1 |
Gtid_log_event::FL_ROLLBACK_ALTER_E1 ))
- {
get_split_alter_thread_id_part_1(rpl_threads, rgi->gtid_ev_flags3,
- &idx, rpl_thread_max);
- //Reserve worker for Commit/Rollback
- if (idx != 0)
+ &idx, rpl_thread_max, this);
+ rpl_thread_idx= idx;
+ }
+ else if (typ == QUERY_EVENT && rgi->gtid_ev_flags3 &
+ (Gtid_log_event::FL_START_ALTER_E1 |
+ Gtid_log_event::FL_COMMIT_ALTER_E1 |
+ Gtid_log_event::FL_ROLLBACK_ALTER_E1 ))
+ {
+ int SA_idx= get_split_alter_thread_id_part_2(rpl_threads, rgi->gtid_ev_flags3,
+ rpl_thread_max, idx, thread_id, this);
+
+ mysql_mutex_lock(&global_rpl_thread_pool.LOCK_rpl_thread_pool);
+ if (rgi->gtid_ev_flags3 & Gtid_log_event::FL_START_ALTER_E1)
+ {
+ if (pending_start_alters < rpl_thread_max - 1 &&
+ global_rpl_thread_pool.current_start_alters < global_rpl_thread_pool.count - 1)
{
- //TODO without mutex lock , maybe wrong ?
- rpl_parallel_thread *t= rpl_threads[0];
- //Will happen in the case binlog Start Alter only
- if (!t || t->current_owner != &rpl_threads[0])
- {
- t= choose_thread_internal(rgi, did_enter_cond, old_stage, 0);
- mysql_mutex_unlock(&t->LOCK_rpl_thread);
- }
- t->current_start_alter_id= -1;
- pending_start_alter++;
+ pending_start_alters++;
+ global_rpl_thread_pool.current_start_alters++;
}
- else if (rgi->gtid_ev_flags3 & (Gtid_log_event::FL_COMMIT_ALTER_E1 |
- Gtid_log_event::FL_ROLLBACK_ALTER_E1 ))
+ else
{
- //TODO without mutex lock , maybe wrong ?
- rpl_parallel_thread *t= rpl_threads[0];
- //Will happen in the case binlog Start Alter only
- if (!t || t->current_owner != &rpl_threads[0])
- t= choose_thread_internal(rgi, did_enter_cond, old_stage, 0);
- else
- //If it is allocated by Start alter mutex is released
- mysql_mutex_lock(&t->LOCK_rpl_thread);
- rpl_thread_idx= 0;
- return t;
+ rpl_threads[idx]->last_SA_rgi->reserved_start_alter_thread= true;
+ rpl_threads[idx]->current_start_alter_id= 0;
}
}
- else if(pending_start_alter)
+ else if(SA_idx >= 0)
{
- //Interleaved DML, No special treatment need either just get the non SA
- //Worker or Worker_0
- get_split_alter_thread_id_part_1(rpl_threads, rgi->gtid_ev_flags3,
- &idx, rpl_thread_max);
- sql_print_information("Setiya DML pool_worker_id= %d, entry->rpl_threads_index= %d DML \n",
- rpl_threads[idx] - *global_rpl_thread_pool.threads, idx);
+ pending_start_alters--;
+ global_rpl_thread_pool.current_start_alters--;
}
- sql_print_information("\n");
- sql_print_information("Setiya POOL INFO pending_start_alter = %d \n", pending_start_alter);
- for(uint k=0; k < rpl_thread_max; k++)
- {
- sql_print_information("Setiya POOL INFO SA ID pool_worker_id= %d, current_start_alter_id= %d \n",
- k, global_rpl_thread_pool.threads[k]->current_start_alter_id);
-
- }
- sql_print_information("\n");
-
- rpl_thread_idx= idx;
- }
- else if (rgi->gtid_ev_flags3 & (Gtid_log_event::FL_START_ALTER_E1 |
- Gtid_log_event::FL_COMMIT_ALTER_E1 |
- Gtid_log_event::FL_ROLLBACK_ALTER_E1 ))
- {
- get_split_alter_thread_id_part_2(rpl_threads, rgi->gtid_ev_flags3,
- rpl_thread_max, idx, thread_id, this);
- if (!pending_start_alter)
- rpl_threads[idx]->current_start_alter_id= 0; //Free for reuse for start alter
+ mysql_mutex_unlock(&global_rpl_thread_pool.LOCK_rpl_thread_pool);
}
return choose_thread_internal(rgi, did_enter_cond, old_stage, idx);
}
@@ -2417,7 +2392,8 @@ rpl_parallel::find(uint32 domain_id)
e->domain_id= domain_id;
e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX;
e->pause_sub_id= (uint64)ULONGLONG_MAX;
- e->pending_start_alter= 0;
+ e->pending_start_alters= 0;
+ e->start_alter_reserved= 0;
if (my_hash_insert(&domain_hash, (uchar *)e))
{
my_free(e);
@@ -2503,7 +2479,11 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli)
if ((rpt= e->rpl_threads[j]))
{
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
- while (rpt->current_owner == &e->rpl_threads[j])
+ //Dont wait for SA workers , But wait for CA/RA workers
+ //If CA/RA is executed that means corresponding SA is also executed
+ //And remaning SA will never recieve CA/RA so we have to manualy send it
+ while (rpt->current_owner == &e->rpl_threads[j] &&
+ !(rpt->thd->rgi_slave->gtid_ev_flags3 & Gtid_log_event::FL_START_ALTER_E1))
mysql_cond_wait(&rpt->COND_rpl_thread_stop, &rpt->LOCK_rpl_thread);
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
}
@@ -2878,6 +2858,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
delete_or_keep_event_post_apply(serial_rgi, typ, ev);
return 0;
}
+ sql_print_information("Setiya Domain=%d Seq_no= %d ",(int)gtid_ev->domain_id, (int)gtid_ev->seq_no);
}
else
e= current;
@@ -2897,7 +2878,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
*/
cur_thread=
e->choose_thread(serial_rgi, &did_enter_cond, &old_stage,
- typ != GTID_EVENT, thread_id);
+ typ, thread_id);
if (!cur_thread)
{
/* This means we were killed. The error is already signalled. */
@@ -2947,6 +2928,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
*/
rgi->wait_commit_sub_id= e->current_sub_id;
rgi->wait_commit_group_info= e->current_group_info;
+ cur_thread->last_SA_rgi= rgi;
speculation= rpl_group_info::SPECULATE_NO;
new_gco= true;
diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
index f52f28ae5e9..2bca3f5ff77 100644
--- a/sql/rpl_parallel.h
+++ b/sql/rpl_parallel.h
@@ -144,6 +144,8 @@ struct rpl_parallel_thread {
size_t event_size;
} *event_queue, *last_in_queue;
uint64 queued_size;
+ // SA rgi
+ rpl_group_info *last_SA_rgi;
/* These free lists are protected by LOCK_rpl_thread. */
queued_event *qev_free_list;
rpl_group_info *rgi_free_list;
@@ -244,6 +246,9 @@ struct rpl_parallel_thread_pool {
mysql_cond_t COND_rpl_thread_pool;
uint32 count;
bool inited;
+ //Please lock first LOCK_rpl_thread_pool and then LOCK_rpl_thread to
+ //update this variable.
+ uint32 current_start_alters;
/*
While FTWRL runs, this counter is incremented to make SQL thread or
STOP/START slave not try to start new activity while that operation
@@ -256,7 +261,7 @@ struct rpl_parallel_thread_pool {
void destroy();
struct rpl_parallel_thread *get_thread(rpl_parallel_thread **owner,
rpl_parallel_entry *entry);
- struct rpl_parallel_thread *get_thread_locked(rpl_parallel_thread **owner,
+ struct rpl_parallel_thread *get_thread_split_alter(rpl_parallel_thread **owner,
rpl_parallel_entry *entry);
void release_thread(rpl_parallel_thread *rpt);
};
@@ -273,7 +278,8 @@ struct rpl_parallel_entry {
*/
uint32 need_sub_id_signal;
uint64 last_commit_id;
- uint32 pending_start_alter;
+ uint32 pending_start_alters;
+ uint32 start_alter_reserved;
bool active;
/*
Set when SQL thread is shutting down, and no more events can be processed,
@@ -360,7 +366,7 @@ struct rpl_parallel_entry {
group_commit_orderer *current_gco;
rpl_parallel_thread * choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
- PSI_stage_info *old_stage, bool reuse,
+ PSI_stage_info *old_stage, enum Log_event_type typ,
uint64 thread_id);
rpl_parallel_thread * choose_thread_internal(rpl_group_info *rgi, bool *did_enter_cond,
PSI_stage_info *old_stage, uint32 idx);
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index fc2f89c5876..21d2b645eff 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -2094,6 +2094,7 @@ rpl_group_info::reinit(Relay_log_info *rli)
long_find_row_note_printed= false;
did_mark_start_commit= false;
gtid_ev_flags2= 0;
+ gtid_ev_flags3= 0;
last_master_timestamp = 0;
gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL;
speculation= SPECULATE_NO;
@@ -2104,7 +2105,8 @@ rpl_group_info::rpl_group_info(Relay_log_info *rli)
: thd(0), wait_commit_sub_id(0),
wait_commit_group_info(0), parallel_entry(0),
deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false),
- gtid_ev_flags2(0), gtid_ev_flags3(0)
+ gtid_ev_flags2(0), gtid_ev_flags3(0), reserved_start_alter_thread(0),
+ finish_event_group_called(0)
{
reinit(rli);
bzero(&current_gtid, sizeof(current_gtid));
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 06f40a8df97..af8997b9830 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -809,6 +809,8 @@ struct rpl_group_info
RETRY_KILL_KILLED
};
uchar killed_for_retry;
+ bool reserved_start_alter_thread;
+ bool finish_event_group_called;
rpl_group_info(Relay_log_info *rli_);
~rpl_group_info();
diff --git a/sql/slave.cc b/sql/slave.cc
index 54717536dfd..ed1e3d11a90 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -5657,7 +5657,34 @@ pthread_handler_t handle_slave_sql(void *arg)
err:
if (mi->using_parallel())
+ {
rli->parallel.wait_for_done(thd, rli);
+ /*
+ shutdown the alter threads waiting on C/R ALter
+ What if worker thread has not registered its start alter in alter_list ?
+ In that case rpt->stop
+ TODO sideeffects
+ */
+ start_alter_info *info=NULL;
+ mysql_mutex_lock(&mi->start_alter_list_lock);
+ List_iterator<start_alter_info> info_iterator(mi->start_alter_list);
+ while ((info= info_iterator++))
+ {
+ mysql_mutex_lock(&mi->start_alter_lock);
+ info->state= start_alter_state::ROLLBACK_ALTER;
+ mysql_cond_broadcast(&info->start_alter_cond);
+ mysql_mutex_unlock(&mi->start_alter_lock);
+ mysql_mutex_lock(&mi->start_alter_lock);
+ while(info->state == start_alter_state::ROLLBACK_ALTER)
+ mysql_cond_wait(&info->start_alter_cond, &mi->start_alter_lock);
+ mysql_mutex_unlock(&mi->start_alter_lock);
+ DBUG_ASSERT(info->state == start_alter_state::COMMITTED);
+ info_iterator.remove();
+ mysql_cond_destroy(&info->start_alter_cond);
+ my_free(info);
+ }
+ mysql_mutex_unlock(&mi->start_alter_list_lock);
+ }
/* Thread stopped. Print the current replication position to the log */
{
@@ -5756,35 +5783,12 @@ err_during_init:
/* We need data_lock, at least to wake up any waiting master_pos_wait() */
mysql_mutex_lock(&rli->data_lock);
DBUG_ASSERT(rli->slave_running == MYSQL_SLAVE_RUN_NOT_CONNECT); // tracking buffer overrun
- /*
- shutdown the alter threads waiting on C/R ALter
- What if worker thread has not registered its start alter in alter_list ?
- In that case rpt->stop
- TODO sideeffects
- */
- start_alter_info *info=NULL;
- List_iterator<start_alter_info> info_iterator(mi->start_alter_list);
- while ((info= info_iterator++))
- {
- mysql_mutex_lock(&mi->start_alter_lock);
- info->state= start_alter_state::ROLLBACK_ALTER;
- mysql_cond_broadcast(&info->start_alter_cond);
- mysql_mutex_unlock(&mi->start_alter_lock);
- mysql_mutex_lock(&mi->start_alter_lock);
- while(info->state == start_alter_state::ROLLBACK_ALTER)
- mysql_cond_wait(&info->start_alter_cond, &mi->start_alter_lock);
- mysql_mutex_unlock(&mi->start_alter_lock);
- DBUG_ASSERT(info->state == start_alter_state::COMMITTED);
- info_iterator.remove();
- mysql_cond_destroy(&info->start_alter_cond);
- my_free(info);
- }
/* When master_pos_wait() wakes up it will check this and terminate */
rli->slave_running= MYSQL_SLAVE_NOT_RUN;
/* Forget the relay log's format */
delete rli->relay_log.description_event_for_exec;
rli->relay_log.description_event_for_exec= 0;
- rli->reset_inuse_relaylog();
+ //rli->reset_inuse_relaylog();
/* Wake up master_pos_wait() */
mysql_mutex_unlock(&rli->data_lock);
DBUG_PRINT("info",("Signaling possibly waiting master_pos_wait() functions"));
diff --git a/sql/sql_alter.cc b/sql/sql_alter.cc
index 6cd8b0a5dae..6c7ac45d204 100644
--- a/sql/sql_alter.cc
+++ b/sql/sql_alter.cc
@@ -21,6 +21,7 @@
#include "sql_alter.h"
#include "rpl_mi.h"
#include "slave.h"
+#include "debug_sync.h"
#include "wsrep_mysqld.h"
Alter_info::Alter_info(const Alter_info &rhs, MEM_ROOT *mem_root)
@@ -377,15 +378,14 @@ void Alter_table_ctx::report_implicit_default_value_error(THD *thd,
static int process_start_alter(THD *thd, uint64 thread_id)
{
//No Slave, Normal Slave, Start Alter under Worker 1 will simple binlog and exit
- if(!thd->slave_thread || !thd->rpt
- ||((thd->rpt == thd->rgi_slave->parallel_entry->rpl_threads[0]) &&
- thd->rpt->current_owner && (thd->rpt->current_owner ==
- thd->rgi_slave->parallel_entry->rpl_threads)))
+ if(!thd->slave_thread || !thd->rpt || thd->rgi_slave->reserved_start_alter_thread)
{
/*
We will just write the binlog and move to next event , because COMMIT
Alter will take care of actual work
*/
+ if (thd->rgi_slave)
+ thd->rgi_slave->reserved_start_alter_thread= false;
if (write_bin_log(thd, false, thd->query(), thd->query_length()))
return START_ALTER_ERROR;
return START_ALTER_SKIP;
@@ -397,6 +397,10 @@ static int process_start_alter(THD *thd, uint64 thread_id)
static int process_commit_alter(THD *thd, uint64 thread_id)
{
DBUG_ASSERT(thd->rgi_slave);
+ DBUG_EXECUTE_IF("rpl_slave_stop_CA", {
+ debug_sync_set_action(thd,
+ STRING_WITH_LEN("now signal CA_1_processing WAIT_FOR proceed_CA_1"));
+ });
thd->gtid_flags3|= Gtid_log_event::FL_START_ALTER_E1;
Master_info *mi= thd->rgi_slave->rli->mi;
start_alter_info *info=NULL;
diff --git a/sql/sql_table.cc b/sql/sql_table.cc
index a922d35e99d..7b554572dfe 100644
--- a/sql/sql_table.cc
+++ b/sql/sql_table.cc
@@ -7651,9 +7651,12 @@ static int mysql_inplace_alter_table(THD *thd,
}
if (write_start_alter(thd, partial_alter, info))
DBUG_RETURN(true);
- DBUG_EXECUTE_IF("start_alter_delay_slave", {
+ DBUG_EXECUTE_IF("start_alter_delay_slave", {
my_sleep(5000000);
});
+ DBUG_EXECUTE_IF("start_alter_kill", {
+ DBUG_SUICIDE();
+ });
/*
Upgrade to SHARED_NO_WRITE lock if:
@@ -7672,7 +7675,7 @@ static int mysql_inplace_alter_table(THD *thd,
// It's now safe to take the table level lock.
if (lock_tables(thd, table_list, alter_ctx->tables_opened, 0))
goto cleanup;
- DBUG_EXECUTE_IF("start_alter_delay_master", {
+ DBUG_EXECUTE_IF("start_alter_delay_master", {
my_sleep(5000000);
});
DEBUG_SYNC(thd, "alter_table_inplace_after_lock_upgrade");
@@ -9422,10 +9425,12 @@ static bool write_start_alter(THD *thd, bool* partial_alter, start_alter_info *i
mysql_mutex_unlock(&mi->start_alter_list_lock);
mysql_cond_broadcast(&mi->start_alter_list_cond);
thd->start_alter_ev->update_pos(thd->rgi_slave);
+ thd->rgi_slave->commit_orderer.wait_for_prior_commit(thd);
thd->rgi_slave->mark_start_commit();
thd->wakeup_subsequent_commits(0);
//Finish event group
thd->rpt->__finish_event_group(thd->rgi_slave);
+ thd->rgi_slave->finish_event_group_called= true;
if (thd->slave_shutdown)
return true;
return false;
@@ -9461,6 +9466,34 @@ start_alter_info *get_new_start_alter_info(THD *thd)
// info->start_alter_cond.m_psi= NULL;
return info;
}
+//Will happen if there is restart in between
+void check_and_remove_duplicate_alter(Master_info *mi, char *table_name)
+{
+ start_alter_info *info;
+ mysql_mutex_lock(&mi->start_alter_list_lock);
+ List_iterator<start_alter_info> info_iterator(mi->start_alter_list);
+ while ((info= info_iterator++))
+ {
+ if(info->state == start_alter_state::REGISTERED &&
+ !my_strcasecmp(system_charset_info, info->table_name, table_name))
+ {
+ info_iterator.remove();
+ mysql_mutex_lock(&mi->start_alter_lock);
+ info->state= start_alter_state::ROLLBACK_ALTER;
+ mysql_mutex_unlock(&mi->start_alter_lock);
+ mysql_cond_broadcast(&info->start_alter_cond);
+ mysql_mutex_lock(&mi->start_alter_lock);
+ while(info->state != start_alter_state::COMMITTED )
+ mysql_cond_wait(&info->start_alter_cond, &mi->start_alter_lock);
+ mysql_mutex_unlock(&mi->start_alter_lock);
+ mysql_cond_destroy(&info->start_alter_cond);
+ my_free(info);
+ break;
+ }
+ }
+ mysql_mutex_unlock(&mi->start_alter_list_lock);
+ return;
+}
/**
Alter table
@@ -9569,6 +9602,12 @@ bool mysql_alter_table(THD *thd, const LEX_CSTRING *new_db,
thd->open_options|= HA_OPEN_FOR_ALTER;
thd->mdl_backup_ticket= 0;
+ char *complete_table_name= (char *)thd->alloc(table_list->table_name.length +
+ new_db->length + 2);
+ sprintf(complete_table_name,"%s.%s",table_list->table_name.str, new_db->str);
+ info->table_name= complete_table_name;
+ if (mi)
+ check_and_remove_duplicate_alter(mi, complete_table_name);
bool error= open_tables(thd, &table_list, &tables_opened, 0,
&alter_prelocking_strategy);
thd->open_options&= ~HA_OPEN_FOR_ALTER;
@@ -10305,7 +10344,7 @@ do_continue:;
//If issues by binlog/master complete the prepare phase of alter and then commit
if (write_start_alter(thd, &partial_alter, info))
DBUG_RETURN(true);
- DBUG_EXECUTE_IF("start_alter_delay_master", {
+ DBUG_EXECUTE_IF("start_alter_delay_master", {
my_sleep(5000000);
});
// It's now safe to take the table level lock.
@@ -10422,7 +10461,7 @@ do_continue:;
if(write_bin_log(thd, false, send_query, strlen(send_query)))
DBUG_RETURN(true);
}
- else if(start_alter_id)
+ else if(start_alter_id && !thd->direct_commit_alter)
{
//if(write_bin_log(thd, FALSE, send_query, strlen(send_query), true))
// DBUG_RETURN(true);
@@ -10631,7 +10670,7 @@ end_inplace:
if(write_bin_log(thd, false, send_query, strlen(send_query)))
DBUG_RETURN(true);
}
- else if(start_alter_id)
+ else if(start_alter_id && !thd->direct_commit_alter)
{
//if(write_bin_log(thd, FALSE, send_query, strlen(send_query), true))
// DBUG_RETURN(true);