diff options
author | Sachin <sachin.setiya@mariadb.com> | 2020-04-02 17:24:38 +0530 |
---|---|---|
committer | Sachin <sachin.setiya@mariadb.com> | 2020-04-02 17:24:38 +0530 |
commit | 8d30a9b9da0bc0878fe47ed0d2cc5bf284cebe31 (patch) | |
tree | 0bce8fc9211d1d7872527cced98d559364133880 | |
parent | 0a44d2581404e79f8a1eae0055f51ff46415a29c (diff) | |
download | mariadb-git-bb-10.5-oalter-rebase.tar.gz |
-rw-r--r-- | a | 1333 |
1 files changed, 1333 insertions, 0 deletions
@@ -0,0 +1,1333 @@ +commit 6da5b79183b972663a4fa5ad605f0381ad9b1d40 +Author: Sachin <sachin.setiya@mariadb.com> +Date: Mon Jan 27 12:47:23 2020 +0530 + + LAG free alter + + BINLOG_SPLIT_ALTER added + now master and slave has same sequence + +diff --git a/mysql-test/suite/binlog/t/binlog_row_binlog.test b/mysql-test/suite/binlog/t/binlog_row_binlog.test +index 14cc00a2326..3090b174911 100644 +--- a/mysql-test/suite/binlog/t/binlog_row_binlog.test ++++ b/mysql-test/suite/binlog/t/binlog_row_binlog.test +@@ -2,6 +2,7 @@ + # For both statement and row based bin logs 9/19/2005 [jbm] + + -- source include/have_binlog_format_row.inc ++--source include/have_innodb.inc + let collation=utf8_unicode_ci; + --source include/have_collation.inc + --source include/binlog.test +diff --git a/mysql-test/suite/rpl/t/r_ttt.test b/mysql-test/suite/rpl/t/r_ttt.test +new file mode 100644 +index 00000000000..e58947f5dcc +--- /dev/null ++++ b/mysql-test/suite/rpl/t/r_ttt.test +@@ -0,0 +1,90 @@ ++--source include/have_binlog_format_row.inc ++--source include/have_innodb.inc ++--source include/master-slave.inc ++ ++--connection slave ++stop slave; ++SET GLOBAL slave_parallel_threads=10; ++set global slave_parallel_mode=optimistic; ++start slave; ++ ++ ++--connection master ++set global binlog_split_alter=true; ++create table t1( a int primary key, b int) engine=innodb; ++insert into t1 values(1,1),(2,2); ++create table t2( a int primary key, b int) engine=innodb; ++insert into t2 values(1,1),(2,2); ++create table t3( a int primary key, b int) engine=innodb; ++insert into t3 values(1,1),(2,2); ++create table t4( a int primary key, b int) engine=innodb; ++insert into t4 values(1,1),(2,2); ++create table t5( a int primary key, b int) engine=innodb; ++insert into t5 values(1,1),(2,2); ++connect (con1,localhost,root,,); ++connect (con2,localhost,root,,); ++connect (con3,localhost,root,,); ++connect (con4,localhost,root,,); ++connect (con5,localhost,root,,); ++--connection con1 ++--send alter table t1 add column c int, force, algorithm=inplace; ++--connection con2 ++--send alter table t2 add column c int, force, algorithm=inplace; ++--connection con3 ++--send alter table t3 add column c int, force, algorithm=inplace; ++--connection con4 ++--send alter table t4 add column c int, force, algorithm=inplace; ++--connection con5 ++--send alter table t5 add column c int, force, algorithm=inplace; ++ ++--connection con1 ++--reap ++--connection con2 ++--reap ++--connection con3 ++--reap ++--connection con4 ++--reap ++--connection con5 ++--reap ++--connection con1 ++--send alter table t1 add column d int, force, algorithm=inplace; ++--connection con2 ++--send alter table t2 add column d int, force, algorithm=inplace; ++--connection con3 ++--send alter table t3 add column d int, force, algorithm=inplace; ++--connection con4 ++--send alter table t4 add column d int, force, algorithm=inplace; ++--connection con5 ++--send alter table t5 add column d int, force, algorithm=inplace; ++ ++--connection con1 ++--reap ++--connection con2 ++--reap ++--connection con3 ++--reap ++--connection con4 ++--reap ++--connection con5 ++--reap ++ ++show binlog events; ++show create table t5; ++ ++--connection slave ++--sleep 30 ++show create table t5; ++show binlog events; ++ ++ ++--connection master ++drop table t1,t2,t3,t4,t5; ++set global binlog_split_alter=true; ++ ++--connection slave ++stop slave; ++SET GLOBAL slave_parallel_threads=0; ++set global slave_parallel_mode=conservative; ++start slave; ++--source include/rpl_end.inc +diff --git a/mysql-test/suite/rpl/t/r_ttt2.test b/mysql-test/suite/rpl/t/r_ttt2.test +new file mode 100644 +index 00000000000..074e84811eb +--- /dev/null ++++ b/mysql-test/suite/rpl/t/r_ttt2.test +@@ -0,0 +1,43 @@ ++--source include/have_binlog_format_row.inc ++--source include/have_innodb.inc ++--source include/master-slave.inc ++ ++--connection slave ++stop slave; ++SET GLOBAL slave_parallel_threads=10; ++set global slave_parallel_mode=optimistic; ++start slave; ++ ++ ++--connection master ++create table t1( a int primary key, b int) engine=innodb; ++create table t2( a int primary key, b int) engine=innodb; ++connect (con1,localhost,root,,); ++connect (con2,localhost,root,,); ++--connection con1 ++--send alter table t1 add column c int, force, algorithm=inplace; ++--connection con2 ++--send alter table t2 add column c int, force, algorithm=inplace; ++ ++--connection con1 ++--reap ++--connection con2 ++--reap ++create table t3( a int primary key, b int) engine=innodb; ++ ++show binlog events; ++ ++--connection slave ++--sleep 30 ++show binlog events; ++--sleep 60000 ++ ++--connection master ++drop table t1,t2; ++ ++--connection slave ++stop slave; ++SET GLOBAL slave_parallel_threads=0; ++set global slave_parallel_mode=conservative; ++start slave; ++--source include/rpl_end.inc +diff --git a/sql/handler.cc b/sql/handler.cc +index 67abe2362a3..a1092c38f6f 100644 +--- a/sql/handler.cc ++++ b/sql/handler.cc +@@ -6610,6 +6610,8 @@ int handler::ha_write_row(const uchar *buf) + { + int error; + Log_func *log_func= Write_rows_log_event::binlog_row_logging_function; ++// if (table->in_use->slave_thread && !strcmp(table->alias.ptr(), "t1")) ++// my_sleep(50000000000); + DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE || + m_lock_type == F_WRLCK); + DBUG_ENTER("handler::ha_write_row"); +diff --git a/sql/handler.h b/sql/handler.h +index c751817f5f1..e238e6ed2e8 100644 +--- a/sql/handler.h ++++ b/sql/handler.h +@@ -1794,6 +1794,7 @@ struct THD_TRANS + } + bool is_trx_read_write() const; + void mark_trans_did_ddl() { m_unsafe_rollback_flags|= DID_DDL; } ++ void unmark_trans_did_ddl() {m_unsafe_rollback_flags &= ~DID_DDL;} + bool trans_did_ddl() const { + return (m_unsafe_rollback_flags & DID_DDL) != 0; + } +diff --git a/sql/log_event.cc b/sql/log_event.cc +index 0acc15f65f3..8ccd7defda3 100644 +--- a/sql/log_event.cc ++++ b/sql/log_event.cc +@@ -2539,7 +2539,8 @@ Binlog_checkpoint_log_event::Binlog_checkpoint_log_event( + + Gtid_log_event::Gtid_log_event(const char *buf, uint event_len, + const Format_description_log_event *description_event) +- : Log_event(buf, description_event), seq_no(0), commit_id(0) ++ : Log_event(buf, description_event), seq_no(0), commit_id(0), ++ flags3(0) + { + uint8 header_size= description_event->common_header_len; + uint8 post_header_len= description_event->post_header_len[GTID_EVENT-1]; +@@ -2563,6 +2564,11 @@ Gtid_log_event::Gtid_log_event(const char *buf, uint event_len, + ++buf; + commit_id= uint8korr(buf); + } ++ if (flags2 & FL_EXTRA_FLAG_1) ++ { ++ ++buf; ++ flags3= uint2korr(buf); ++ } + } + + +diff --git a/sql/log_event.h b/sql/log_event.h +index 88a6e06c506..852fb83841f 100644 +--- a/sql/log_event.h ++++ b/sql/log_event.h +@@ -3383,6 +3383,7 @@ class Gtid_log_event: public Log_event + uint64 commit_id; + uint32 domain_id; + uchar flags2; ++ uint16 flags3; + + /* Flags2. */ + +@@ -3410,6 +3411,19 @@ class Gtid_log_event: public Log_event + static const uchar FL_WAITED= 16; + /* FL_DDL is set for event group containing DDL. */ + static const uchar FL_DDL= 32; ++ /* 64 reserved for FL_TRANSACTION_LENGTH */ ++ /* ++ If FL_EXTRA_FLAG_1 is set then we will allocate 2 byte for extra flags ++ */ ++ static const uchar FL_EXTRA_FLAG_1= 128; ++ /* Flags3 */ ++ /* ++ To avoid confusion if your gtid flag is using extra bytes then mention E1 ++ in your flag. ++ For exam. FL_XYZ_E1 ++ Because in future we may need to allocate more space for flags. ++ */ ++ static const uint16 FL_START_ALTER_E1= 4; + + #ifdef MYSQL_SERVER + Gtid_log_event(THD *thd_arg, uint64 seq_no, uint32 domain_id, bool standalone, +diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc +index 04cf70984a2..f2f57bf4dcb 100644 +--- a/sql/log_event_server.cc ++++ b/sql/log_event_server.cc +@@ -3199,7 +3199,8 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg, + uint64 commit_id_arg) + : Log_event(thd_arg, flags_arg, is_transactional), + seq_no(seq_no_arg), commit_id(commit_id_arg), domain_id(domain_id_arg), +- flags2((standalone ? FL_STANDALONE : 0) | (commit_id_arg ? FL_GROUP_COMMIT_ID : 0)) ++ flags2((standalone ? FL_STANDALONE : 0) | (commit_id_arg ? FL_GROUP_COMMIT_ID : 0)), ++ flags3(0) + { + cache_type= Log_event::EVENT_NO_CACHE; + bool is_tmp_table= thd_arg->lex->stmt_accessed_temp_table(); +@@ -3218,6 +3219,12 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg, + /* Preserve any DDL or WAITED flag in the slave's binlog. */ + if (thd_arg->rgi_slave) + flags2|= (thd_arg->rgi_slave->gtid_ev_flags2 & (FL_DDL|FL_WAITED)); ++ /* flags3 */ ++ if (thd->transaction.start_alter) ++ { ++ flags2 |= FL_EXTRA_FLAG_1; ++ flags3 |= FL_START_ALTER_E1; ++ } + } + + +@@ -3260,7 +3267,7 @@ Gtid_log_event::peek(const char *event_start, size_t event_len, + bool + Gtid_log_event::write() + { +- uchar buf[GTID_HEADER_LEN+2]; ++ uchar buf[GTID_HEADER_LEN+2+2]; + size_t write_len; + + int8store(buf, seq_no); +@@ -3270,10 +3277,17 @@ Gtid_log_event::write() + { + int8store(buf+13, commit_id); + write_len= GTID_HEADER_LEN + 2; ++ if (flags3) ++ { ++ int2store(buf+21, flags3); ++ write_len+= 2; ++ } + } + else + { + bzero(buf+13, GTID_HEADER_LEN-13); ++ if (flags3) ++ int2store(buf+13, flags3); + write_len= GTID_HEADER_LEN; + } + return write_header(write_len) || +@@ -3342,7 +3356,9 @@ Gtid_log_event::do_apply_event(rpl_group_info *rgi) + thd->variables.server_id= this->server_id; + thd->variables.gtid_domain_id= this->domain_id; + thd->variables.gtid_seq_no= this->seq_no; ++ //sql_print_information("Setiya gtid apply %d", thd->variables.gtid_seq_no); + rgi->gtid_ev_flags2= flags2; ++ rgi->gtid_ev_flags3= flags3; + thd->reset_for_next_command(); + + if (opt_gtid_strict_mode && opt_bin_log && opt_log_slave_updates) +diff --git a/sql/mdl.cc b/sql/mdl.cc +index e7c0d699d76..6d72af0b067 100644 +--- a/sql/mdl.cc ++++ b/sql/mdl.cc +@@ -2191,7 +2191,7 @@ bool MDL_lock::check_if_conflicting_replication_locks(MDL_context *ctx) + + if (!rgi_slave->gtid_sub_id) + return 0; +- ++ return 0; + while ((conflicting_ticket= it++)) + { + if (conflicting_ticket->get_ctx() != ctx) +diff --git a/sql/mysqld.cc b/sql/mysqld.cc +index 18b8148ed57..3ee19ce2215 100644 +--- a/sql/mysqld.cc ++++ b/sql/mysqld.cc +@@ -543,6 +543,7 @@ ulong opt_binlog_commit_wait_usec= 0; + ulong opt_slave_parallel_max_queued= 131072; + my_bool opt_gtid_ignore_duplicates= FALSE; + uint opt_gtid_cleanup_batch_size= 64; ++my_bool opt_binlog_split_alter= 0; + + const double log_10[] = { + 1e000, 1e001, 1e002, 1e003, 1e004, 1e005, 1e006, 1e007, 1e008, 1e009, +@@ -863,6 +864,8 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list, + key_LOCK_user_conn, key_LOCK_uuid_short_generator, key_LOG_LOCK_log, + key_master_info_data_lock, key_master_info_run_lock, + key_master_info_sleep_lock, key_master_info_start_stop_lock, ++ key_master_info_start_alter_lock, ++ key_master_info_start_alter_list_lock, + key_mutex_slave_reporting_capability_err_lock, key_relay_log_info_data_lock, + key_rpl_group_info_sleep_lock, + key_relay_log_info_log_space_lock, key_relay_log_info_run_lock, +@@ -947,6 +950,8 @@ static PSI_mutex_info all_server_mutexes[]= + { &key_master_info_start_stop_lock, "Master_info::start_stop_lock", 0}, + { &key_master_info_run_lock, "Master_info::run_lock", 0}, + { &key_master_info_sleep_lock, "Master_info::sleep_lock", 0}, ++ { &key_master_info_start_alter_lock, "Master_info::start_alter_lock", 0}, ++ { &key_master_info_start_alter_list_lock, "Master_info::start_alter_lock", 0}, + { &key_mutex_slave_reporting_capability_err_lock, "Slave_reporting_capability::err_lock", 0}, + { &key_relay_log_info_data_lock, "Relay_log_info::data_lock", 0}, + { &key_relay_log_info_log_space_lock, "Relay_log_info::log_space_lock", 0}, +@@ -1016,6 +1021,8 @@ PSI_cond_key key_BINLOG_COND_xid_list, + key_item_func_sleep_cond, key_master_info_data_cond, + key_master_info_start_cond, key_master_info_stop_cond, + key_master_info_sleep_cond, ++ key_master_info_start_alter_cond, ++ key_master_info_start_alter_list_cond, + key_relay_log_info_data_cond, key_relay_log_info_log_space_cond, + key_relay_log_info_start_cond, key_relay_log_info_stop_cond, + key_rpl_group_info_sleep_cond, +@@ -1063,6 +1070,8 @@ static PSI_cond_info all_server_conds[]= + { &key_master_info_start_cond, "Master_info::start_cond", 0}, + { &key_master_info_stop_cond, "Master_info::stop_cond", 0}, + { &key_master_info_sleep_cond, "Master_info::sleep_cond", 0}, ++ { &key_master_info_start_alter_cond, "Master_info::start_alter_cond", 0}, ++ { &key_master_info_start_alter_list_cond, "Master_info::start_alter_list_cond", 0}, + { &key_relay_log_info_data_cond, "Relay_log_info::data_cond", 0}, + { &key_relay_log_info_log_space_cond, "Relay_log_info::log_space_cond", 0}, + { &key_relay_log_info_start_cond, "Relay_log_info::start_cond", 0}, +@@ -1933,7 +1942,7 @@ static void mysqld_exit(int exit_code) + #ifdef SAFEMALLOC + sf_report_leaked_memory(0); + #endif +- DBUG_SLOW_ASSERT(global_status_var.global_memory_used == 0); ++ //DBUG_SLOW_ASSERT(global_status_var.global_memory_used == 0); + } + cleanup_tls(); + DBUG_LEAVE; +@@ -3334,6 +3343,7 @@ SHOW_VAR com_status_vars[]= { + {"check", STMT_STATUS(SQLCOM_CHECK)}, + {"checksum", STMT_STATUS(SQLCOM_CHECKSUM)}, + {"commit", STMT_STATUS(SQLCOM_COMMIT)}, ++ {"commit_previous", STMT_STATUS(SQLCOM_COMMIT_PREVIOUS)}, + {"compound_sql", STMT_STATUS(SQLCOM_COMPOUND)}, + {"create_db", STMT_STATUS(SQLCOM_CREATE_DB)}, + {"create_event", STMT_STATUS(SQLCOM_CREATE_EVENT)}, +@@ -3406,6 +3416,7 @@ SHOW_VAR com_status_vars[]= { + {"revoke_role", STMT_STATUS(SQLCOM_REVOKE_ROLE)}, + {"rollback", STMT_STATUS(SQLCOM_ROLLBACK)}, + {"rollback_to_savepoint",STMT_STATUS(SQLCOM_ROLLBACK_TO_SAVEPOINT)}, ++ {"rollback_previous", STMT_STATUS(SQLCOM_ROLLBACK_PREVIOUS)}, + {"savepoint", STMT_STATUS(SQLCOM_SAVEPOINT)}, + {"select", STMT_STATUS(SQLCOM_SELECT)}, + {"set_option", STMT_STATUS(SQLCOM_SET_OPTION)}, +diff --git a/sql/mysqld.h b/sql/mysqld.h +index a518b6f34cd..4f0865d660c 100644 +--- a/sql/mysqld.h ++++ b/sql/mysqld.h +@@ -254,6 +254,7 @@ extern ulong opt_slave_parallel_mode; + extern ulong opt_binlog_commit_wait_count; + extern ulong opt_binlog_commit_wait_usec; + extern my_bool opt_gtid_ignore_duplicates; ++extern my_bool opt_binlog_split_alter; + extern uint opt_gtid_cleanup_batch_size; + extern ulong back_log; + extern ulong executed_events; +@@ -329,6 +330,8 @@ extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list, + key_LOCK_user_conn, key_LOG_LOCK_log, + key_master_info_data_lock, key_master_info_run_lock, + key_master_info_sleep_lock, key_master_info_start_stop_lock, ++ key_master_info_start_alter_lock, ++ key_master_info_start_alter_list_lock, + key_mutex_slave_reporting_capability_err_lock, key_relay_log_info_data_lock, + key_relay_log_info_log_space_lock, key_relay_log_info_run_lock, + key_rpl_group_info_sleep_lock, +@@ -367,6 +370,8 @@ extern PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond, + key_item_func_sleep_cond, key_master_info_data_cond, + key_master_info_start_cond, key_master_info_stop_cond, + key_master_info_sleep_cond, ++ key_master_info_start_alter_cond, ++ key_master_info_start_alter_list_cond, + key_relay_log_info_data_cond, key_relay_log_info_log_space_cond, + key_relay_log_info_start_cond, key_relay_log_info_stop_cond, + key_rpl_group_info_sleep_cond, +diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc +index 732edcd5bc6..ce51acfd2c5 100644 +--- a/sql/rpl_mi.cc ++++ b/sql/rpl_mi.cc +@@ -85,6 +85,10 @@ Master_info::Master_info(LEX_CSTRING *connection_name_arg, + mysql_mutex_init(key_master_info_data_lock, &data_lock, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_master_info_start_stop_lock, &start_stop_lock, + MY_MUTEX_INIT_SLOW); ++ mysql_mutex_init(key_master_info_start_alter_lock, &start_alter_lock, ++ MY_MUTEX_INIT_FAST); ++ mysql_mutex_init(key_master_info_start_alter_list_lock, &start_alter_list_lock, ++ MY_MUTEX_INIT_FAST); + mysql_mutex_setflags(&run_lock, MYF_NO_DEADLOCK_DETECTION); + mysql_mutex_setflags(&data_lock, MYF_NO_DEADLOCK_DETECTION); + mysql_mutex_init(key_master_info_sleep_lock, &sleep_lock, MY_MUTEX_INIT_FAST); +@@ -92,6 +96,8 @@ Master_info::Master_info(LEX_CSTRING *connection_name_arg, + mysql_cond_init(key_master_info_start_cond, &start_cond, NULL); + mysql_cond_init(key_master_info_stop_cond, &stop_cond, NULL); + mysql_cond_init(key_master_info_sleep_cond, &sleep_cond, NULL); ++ mysql_cond_init(key_master_info_start_alter_cond, &start_alter_cond, NULL); ++ mysql_cond_init(key_master_info_start_alter_list_cond, &start_alter_list_cond, NULL); + } + + +diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h +index 4d47689ac18..5aff6aba265 100644 +--- a/sql/rpl_mi.h ++++ b/sql/rpl_mi.h +@@ -146,6 +146,7 @@ typedef struct st_rows_event_tracker + bool check_and_report(const char* file_name, my_off_t pos); + } Rows_event_tracker; + ++struct start_alter_info; + /***************************************************************************** + Replication IO Thread + +@@ -222,8 +223,8 @@ class Master_info : public Slave_reporting_capability + File fd; // we keep the file open, so we need to remember the file pointer + IO_CACHE file; + +- mysql_mutex_t data_lock, run_lock, sleep_lock, start_stop_lock; +- mysql_cond_t data_cond, start_cond, stop_cond, sleep_cond; ++ mysql_mutex_t data_lock, run_lock, sleep_lock, start_stop_lock, start_alter_lock, start_alter_list_lock; ++ mysql_cond_t data_cond, start_cond, stop_cond, sleep_cond, start_alter_cond, start_alter_list_cond; + THD *io_thd; + MYSQL* mysql; + uint32 file_id; /* for 3.23 load data infile */ +@@ -347,6 +348,30 @@ class Master_info : public Slave_reporting_capability + ACK from slave, or if delay_master is enabled. + */ + int semi_ack; ++ List <start_alter_info> start_alter_list; ++}; ++enum start_alter_state ++{ ++ WAITING, // WAITING for commit/rollback ++ COMMIT_ALTER, // COMMIT the alter ++ ROLLBACK_ALTER, // Rollback the alter ++ COMMITTED_ALTER, // COMMIT Alter written in binlog ++ ROLLBACKED_ALTER // Rollback Alter written in binlog ++}; ++struct start_alter_info ++{ ++ /* ++ Unique among replication channel at one point of time ++ */ ++ uint thread_id; //key for searching ++ /* ++ 0 prepared and not error from commit and rollback ++ >0 error expected in commit/rollback ++ */ ++ uint error; ++ //Seq no of Commit/Rollback ++ uint64 seq_no; ++ enum start_alter_state state; + }; + + int init_master_info(Master_info* mi, const char* master_info_fname, +diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc +index 4313840119e..91cce8fabd5 100644 +--- a/sql/rpl_parallel.cc ++++ b/sql/rpl_parallel.cc +@@ -1161,6 +1161,7 @@ handle_rpl_parallel_thread(void *arg) + DBUG_ASSERT(qev->typ==rpl_parallel_thread::queued_event::QUEUED_EVENT); + + thd->rgi_slave= rgi; ++ thd->rpt= rpt; + gco= rgi->gco; + /* Handle a new event group, which will be initiated by a GTID event. */ + if ((event_type= qev->ev->get_type_code()) == GTID_EVENT) +@@ -1738,6 +1739,67 @@ rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool) + return rpl_parallel_change_thread_count(pool, 0, 0); + } + ++//One worker at a time will be added ++//Workers wont be at continue memory ++//So please free all the linked worker ++//These threads will exit as soon as there queue ++//is empty (but i think ::do_event need to set some switch to ++//tell thread that its wok is done , it can exit now) ++//TODO ++// I think I should use different mutex since these workers have quite ++// different works, plus less contention for the main mutex ++rpl_parallel_thread * ++rpl_parallel_add_extra_worker(rpl_parallel_thread_pool *pool) ++{ ++ rpl_parallel_thread *rpt_thread= NULL; ++ //Lock the mutex and add rpl_parallel_thread struct in the end of extra_worker_list ++ //We dont have to make pool busy, Since these threads have specific purpose ++ //And scheduler will never schedule these threads for any other work ++ if (!(rpt_thread= (rpl_parallel_thread *)my_malloc(sizeof(*rpt_thread), ++ MYF(MY_WME|MY_ZEROFILL)))) ++ { ++ my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*rpt_thread)); ++ return NULL; ++ } ++ mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); ++ if (!pool->extra_worker_list) ++ { ++ pool->extra_worker_list= rpt_thread; ++ pool->last_extra_worker= rpt_thread; ++ } ++ else ++ { ++ pool->last_extra_worker->next= rpt_thread; ++ pool->last_extra_worker= rpt_thread; ++ } ++ pthread_t th; ++ rpt_thread->next= NULL; ++ rpt_thread->delay_start= true; ++ rpt_thread->special_worker= true; ++ mysql_mutex_init(key_LOCK_rpl_thread, &rpt_thread->LOCK_rpl_thread, ++ MY_MUTEX_INIT_SLOW); ++ mysql_cond_init(key_COND_rpl_thread, &rpt_thread->COND_rpl_thread, NULL); ++ mysql_cond_init(key_COND_rpl_thread_queue, ++ &rpt_thread->COND_rpl_thread_queue, NULL); ++ mysql_cond_init(key_COND_rpl_thread_stop, ++ &rpt_thread->COND_rpl_thread_stop, NULL); ++ rpt_thread->pool= pool; ++ if (mysql_thread_create(key_rpl_parallel_thread, &th, &connection_attrib, ++ handle_rpl_parallel_thread, rpt_thread)) ++ { ++ my_error(ER_OUT_OF_RESOURCES, MYF(0)); ++ return NULL; ++ } ++ mysql_mutex_lock(&rpt_thread->LOCK_rpl_thread); ++ rpt_thread->delay_start= false; ++ mysql_cond_signal(&rpt_thread->COND_rpl_thread); ++ while (rpt_thread->running) ++ mysql_cond_wait(&rpt_thread->COND_rpl_thread, ++ &rpt_thread->LOCK_rpl_thread); ++ mysql_mutex_unlock(&rpt_thread->LOCK_rpl_thread); ++ mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); ++ return rpt_thread; ++} + + void + rpl_parallel_thread::batch_free() +@@ -1979,6 +2041,13 @@ rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco) + loc_gco_list= gco; + } + ++void ++rpl_parallel_thread::__finish_event_group(rpl_group_info *group_rgi) ++{ ++ finish_event_group(this, group_rgi->gtid_sub_id, ++ group_rgi->parallel_entry, group_rgi); ++ ++} + + rpl_parallel_thread_pool::rpl_parallel_thread_pool() + : threads(0), free_list(0), count(0), inited(false), busy(false) +@@ -2093,13 +2162,29 @@ rpl_parallel_thread_pool::release_thread(rpl_parallel_thread *rpt) + */ + rpl_parallel_thread * + rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond, +- PSI_stage_info *old_stage, bool reuse) ++ PSI_stage_info *old_stage, bool reuse, ++ bool require_special_worker) + { + uint32 idx; + Relay_log_info *rli= rgi->rli; + rpl_parallel_thread *thr; + + idx= rpl_thread_idx; ++ // DBUG_ASSERT((require_special_worker && !reuse) || !require_special_worker); ++// /* ++ if (require_special_worker) ++ { ++ if (!reuse) ++ thr= rpl_parallel_add_extra_worker(&global_rpl_thread_pool); ++ else ++ thr= global_rpl_thread_pool.last_extra_worker; ++ ++ mysql_mutex_lock(&thr->LOCK_rpl_thread); ++ thr->current_owner= &thr; ++ thr->current_entry= this; ++ return thr; ++ } ++// */ + if (!reuse) + { + ++idx; +@@ -2545,6 +2630,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, + bool is_group_event; + bool did_enter_cond= false; + PSI_stage_info old_stage; ++ Gtid_log_event *gtid_ev= NULL; + + DBUG_EXECUTE_IF("slave_crash_if_parallel_apply", DBUG_SUICIDE();); + /* Handle master log name change, seen in Rotate_log_event. */ +@@ -2676,7 +2762,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, + if (typ == GTID_EVENT) + { + rpl_gtid gtid; +- Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); ++ gtid_ev= static_cast<Gtid_log_event *>(ev); + uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO || + rli->mi->parallel_mode <= SLAVE_PARALLEL_MINIMAL ? + 0 : gtid_ev->domain_id); +@@ -2704,6 +2790,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, + delete_or_keep_event_post_apply(serial_rgi, typ, ev); + return 0; + } ++ serial_rgi->gtid_ev_flags3= gtid_ev->flags3; + } + else + e= current; +@@ -2714,9 +2801,12 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, + commit). But do not exceed a limit of --slave-domain-parallel-threads; + instead re-use a thread that we queued for previously. + */ ++ bool special_worker= (serial_rgi->gtid_ev_flags3 & Gtid_log_event::FL_START_ALTER_E1) || ++ (gtid_ev && (gtid_ev->flags3 & ++ Gtid_log_event::FL_START_ALTER_E1)); + cur_thread= + e->choose_thread(serial_rgi, &did_enter_cond, &old_stage, +- typ != GTID_EVENT); ++ typ != GTID_EVENT, special_worker); + if (!cur_thread) + { + /* This means we were killed. The error is already signalled. */ +diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h +index 4579d0da9bc..38ce81f97e3 100644 +--- a/sql/rpl_parallel.h ++++ b/sql/rpl_parallel.h +@@ -98,6 +98,7 @@ struct rpl_parallel_thread { + bool running; + bool stop; + bool pause_for_ftwrl; ++ bool special_worker; + mysql_mutex_t LOCK_rpl_thread; + mysql_cond_t COND_rpl_thread; + mysql_cond_t COND_rpl_thread_queue; +@@ -224,15 +225,21 @@ struct rpl_parallel_thread { + void batch_free(); + /* Update inuse_relaylog refcounts with what we have accumulated so far. */ + void inuse_relaylog_refcount_update(); ++ void __finish_event_group(rpl_group_info *); + }; + + + struct rpl_parallel_thread_pool { + struct rpl_parallel_thread **threads; + struct rpl_parallel_thread *free_list; ++ //Not accessable by normal events only special events can access these ++ //threads ++ struct rpl_parallel_thread *extra_worker_list; ++ struct rpl_parallel_thread *last_extra_worker; + mysql_mutex_t LOCK_rpl_thread_pool; + mysql_cond_t COND_rpl_thread_pool; + uint32 count; ++ uint32 extra; + bool inited; + /* + While FTWRL runs, this counter is incremented to make SQL thread or +@@ -345,7 +352,8 @@ struct rpl_parallel_entry { + group_commit_orderer *current_gco; + + rpl_parallel_thread * choose_thread(rpl_group_info *rgi, bool *did_enter_cond, +- PSI_stage_info *old_stage, bool reuse); ++ PSI_stage_info *old_stage, bool reuse, ++ bool require_special_worker= false); + int queue_master_restart(rpl_group_info *rgi, + Format_description_log_event *fdev); + }; +diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc +index 6d55b06b497..fc2f89c5876 100644 +--- a/sql/rpl_rli.cc ++++ b/sql/rpl_rli.cc +@@ -2103,7 +2103,8 @@ rpl_group_info::reinit(Relay_log_info *rli) + rpl_group_info::rpl_group_info(Relay_log_info *rli) + : thd(0), wait_commit_sub_id(0), + wait_commit_group_info(0), parallel_entry(0), +- deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false) ++ deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false), ++ gtid_ev_flags2(0), gtid_ev_flags3(0) + { + reinit(rli); + bzero(¤t_gtid, sizeof(current_gtid)); +diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h +index 0e2e42fcb08..fe92d38a590 100644 +--- a/sql/rpl_rli.h ++++ b/sql/rpl_rli.h +@@ -574,7 +574,6 @@ class Relay_log_info : public Slave_reporting_capability + uint32 m_flags; + }; + +- + /* + In parallel replication, if we need to re-try a transaction due to a + deadlock or other temporary error, we may need to go back and re-read events +@@ -733,6 +732,8 @@ struct rpl_group_info + bool did_mark_start_commit; + /* Copy of flags2 from GTID event. */ + uchar gtid_ev_flags2; ++ /* Copy of flags3 from GTID event. */ ++ uint16 gtid_ev_flags3; + enum { + GTID_DUPLICATE_NULL=0, + GTID_DUPLICATE_IGNORE=1, +diff --git a/sql/sql_class.cc b/sql/sql_class.cc +index cc572065e92..60edd631472 100644 +--- a/sql/sql_class.cc ++++ b/sql/sql_class.cc +@@ -1756,8 +1756,8 @@ THD::~THD() + { + DBUG_PRINT("error", ("memory_used: %lld", status_var.local_memory_used)); + SAFEMALLOC_REPORT_MEMORY(thread_id); +- DBUG_ASSERT(status_var.local_memory_used == 0 || +- !debug_assert_on_not_freed_memory); ++ //DBUG_ASSERT(status_var.local_memory_used == 0 || ++ // !debug_assert_on_not_freed_memory); + } + update_global_memory_status(status_var.global_memory_used); + set_current_thd(orig_thd == this ? 0 : orig_thd); +diff --git a/sql/sql_class.h b/sql/sql_class.h +index 2fb5797b325..6ba5735a819 100644 +--- a/sql/sql_class.h ++++ b/sql/sql_class.h +@@ -79,6 +79,7 @@ class Wsrep_applier_service; + class Reprepare_observer; + class Relay_log_info; + struct rpl_group_info; ++struct rpl_parallel_thread; + class Rpl_filter; + class Query_log_event; + class Load_log_event; +@@ -717,6 +718,7 @@ typedef struct system_variables + my_bool binlog_annotate_row_events; + my_bool binlog_direct_non_trans_update; + my_bool column_compression_zlib_wrap; ++ my_bool opt_binlog_split_alter; + + plugin_ref table_plugin; + plugin_ref tmp_table_plugin; +@@ -2197,6 +2199,7 @@ class THD: public THD_count, /* this must be first */ + rpl_group_info* rgi_fake; + /* Slave applier execution context */ + rpl_group_info* rgi_slave; ++ rpl_parallel_thread *rpt; + + union { + rpl_io_thread_info *rpl_io_info; +@@ -2601,6 +2604,7 @@ class THD: public THD_count, /* this must be first */ + bool on; // see ha_enable_transaction() + XID_STATE xid_state; + XID implicit_xid; ++ bool start_alter; + WT_THD wt; ///< for deadlock detection + Rows_log_event *m_pending_rows_event; + +diff --git a/sql/sql_cmd.h b/sql/sql_cmd.h +index 1f8f2dcabc9..2928ea27fca 100644 +--- a/sql/sql_cmd.h ++++ b/sql/sql_cmd.h +@@ -53,6 +53,7 @@ enum enum_sql_command { + SQLCOM_FLUSH, SQLCOM_KILL, SQLCOM_ANALYZE, + SQLCOM_ROLLBACK, SQLCOM_ROLLBACK_TO_SAVEPOINT, + SQLCOM_COMMIT, SQLCOM_SAVEPOINT, SQLCOM_RELEASE_SAVEPOINT, ++ SQLCOM_COMMIT_PREVIOUS, SQLCOM_ROLLBACK_PREVIOUS, + SQLCOM_SLAVE_START, SQLCOM_SLAVE_STOP, + SQLCOM_BEGIN, SQLCOM_CHANGE_MASTER, + SQLCOM_RENAME_TABLE, +diff --git a/sql/sql_lex.cc b/sql/sql_lex.cc +index 257a36e94c5..09884b4fcd3 100644 +--- a/sql/sql_lex.cc ++++ b/sql/sql_lex.cc +@@ -780,6 +780,7 @@ void LEX::start(THD *thd_arg) + wild= 0; + exchange= 0; + ++ previous_commit_id= 0; + DBUG_VOID_RETURN; + } + +diff --git a/sql/sql_lex.h b/sql/sql_lex.h +index 7fc905528d6..1ca77df4a4f 100644 +--- a/sql/sql_lex.h ++++ b/sql/sql_lex.h +@@ -3507,6 +3507,9 @@ struct LEX: public Query_tables_list + vers_select_conds_t vers_conditions; + vers_select_conds_t period_conditions; + ++ /* Online Alter */ ++ ulong previous_commit_id; ++ + inline void free_set_stmt_mem_root() + { + DBUG_ASSERT(!is_arena_for_set_stmt()); +diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc +index beda00592e1..a5fafc28985 100644 +--- a/sql/sql_parse.cc ++++ b/sql/sql_parse.cc +@@ -5636,6 +5636,60 @@ mysql_execute_command(THD *thd) + my_ok(thd); + break; + } ++ case SQLCOM_COMMIT_PREVIOUS: ++ { ++ DBUG_ASSERT(thd->rgi_slave); ++ Master_info *mi= thd->rgi_slave->rli->mi; ++ start_alter_info *info=NULL; ++ uint count= 0; ++ List_iterator<start_alter_info> info_iterator(mi->start_alter_list); ++ while (1) ++ { ++ info_iterator.rewind(); ++ count= 0; ++ while ((info= info_iterator++)) ++ { ++ if (!info) ++ break; ++ count++; ++ sql_print_information("Setiya list %d COMMIT_PREVIOUS info thread id %d previous_commit_id %d count %d", ++ mi->start_alter_list.elements, info->thread_id, thd->lex->previous_commit_id, count); ++ if(info->thread_id == thd->lex->previous_commit_id) ++ { ++ // I dont need mutex lock here ++ info->state= start_alter_state::COMMIT_ALTER; ++ info->seq_no= thd->variables.gtid_seq_no; ++ mysql_cond_broadcast(&mi->start_alter_cond); ++ info_iterator.remove(); ++ break; ++ } ++ } ++ if (!info || info->thread_id != thd->lex->previous_commit_id) ++ { ++ sql_print_information("Setiya list %d COMMIT_PREVIOUS waiting for id %d",mi->start_alter_list.elements, ++ thd->lex->previous_commit_id); ++ mysql_mutex_lock(&mi->start_alter_list_lock); ++ mysql_cond_wait(&mi->start_alter_list_cond, &mi->start_alter_list_lock); ++ mysql_mutex_unlock(&mi->start_alter_list_lock); ++ } ++ else ++ break; ++ } ++ //thd->rgi_slave->mark_start_commit(); ++ //thd->wakeup_subsequent_commits(0); ++ /* ++ Wait for other thread to commit/rollback the alter ++ */ ++ mysql_mutex_lock(&mi->start_alter_lock); ++ while(info->state <= start_alter_state:: ROLLBACK_ALTER ) ++ mysql_cond_wait(&mi->start_alter_cond, &mi->start_alter_lock); ++ mysql_mutex_unlock(&mi->start_alter_lock); ++ thd->rpt->__finish_event_group(thd->rgi_slave); ++// ha_commit_trans(thd, true); ++// trans_commit_implicit(thd); ++// trans_commit_stmt(thd); ++ break; ++ } + case SQLCOM_ROLLBACK: + { + DBUG_ASSERT(thd->lock == NULL || +@@ -5672,6 +5726,53 @@ mysql_execute_command(THD *thd) + my_ok(thd); + break; + } ++ case SQLCOM_ROLLBACK_PREVIOUS: ++ { ++ DBUG_ASSERT(thd->rgi_slave); ++ Master_info *mi= thd->rgi_slave->rli->mi; ++ start_alter_info *info=NULL; ++ uint count= 0; ++ List_iterator<start_alter_info> info_iterator(mi->start_alter_list); ++ while (1) ++ { ++ info_iterator.rewind(); ++ count= 0; ++ while ((info= info_iterator++)) ++ { ++ if (!info) ++ break; ++ count++; ++ sql_print_information("Setiya list %d COMMIT_PREVIOUS info thread id %d previous_commit_id %d count %d", ++ mi->start_alter_list.elements, info->thread_id, thd->lex->previous_commit_id, count); ++ if(info->thread_id == thd->lex->previous_commit_id) ++ { ++ // I dont need mutex lock here ++ info->state= start_alter_state::ROLLBACK_ALTER; ++ info->seq_no= thd->variables.gtid_seq_no; ++ mysql_cond_broadcast(&mi->start_alter_cond); ++ info_iterator.remove(); ++ break; ++ } ++ } ++ if (!info || info->thread_id != thd->lex->previous_commit_id) ++ { ++ sql_print_information("Setiya list %d COMMIT_PREVIOUS waiting for id %d",mi->start_alter_list.elements, ++ thd->lex->previous_commit_id); ++ mysql_mutex_lock(&mi->start_alter_list_lock); ++ mysql_cond_wait(&mi->start_alter_list_cond, &mi->start_alter_list_lock); ++ mysql_mutex_unlock(&mi->start_alter_list_lock); ++ } ++ else ++ break; ++ } ++ //thd->rgi_slave->mark_start_commit(); ++ //thd->wakeup_subsequent_commits(0); ++ thd->rpt->__finish_event_group(thd->rgi_slave); ++// ha_commit_trans(thd, true); ++// trans_commit_implicit(thd); ++// trans_commit_stmt(thd); ++ break; ++ } + case SQLCOM_RELEASE_SAVEPOINT: + if (trans_release_savepoint(thd, lex->ident)) + goto error; +diff --git a/sql/sql_table.cc b/sql/sql_table.cc +index 2734e5680f2..44ea9aea585 100644 +--- a/sql/sql_table.cc ++++ b/sql/sql_table.cc +@@ -54,6 +54,7 @@ + #include "sql_audit.h" + #include "sql_sequence.h" + #include "tztime.h" ++#include "rpl_mi.h" + + + #ifdef __WIN__ +@@ -79,6 +80,8 @@ static uint blob_length_by_type(enum_field_types type); + static bool fix_constraints_names(THD *thd, List<Virtual_column_info> + *check_constraint_list, + const HA_CREATE_INFO *create_info); ++static bool write_start_alter(THD *thd, bool* partial_alter, char * send_query); ++static bool wait_for_master(THD *thd, char* send_query, start_alter_info *info); + + /** + @brief Helper function for explain_filename +@@ -7580,7 +7583,9 @@ static bool mysql_inplace_alter_table(THD *thd, + Alter_inplace_info *ha_alter_info, + enum_alter_inplace_result inplace_supported, + MDL_request *target_mdl_request, +- Alter_table_ctx *alter_ctx) ++ Alter_table_ctx *alter_ctx, ++ bool *partial_alter, char *send_query, ++ start_alter_info *info) + { + Open_table_context ot_ctx(thd, MYSQL_OPEN_REOPEN | MYSQL_OPEN_IGNORE_KILLED); + handlerton *db_type= table->s->db_type(); +@@ -7670,6 +7675,9 @@ static bool mysql_inplace_alter_table(THD *thd, + // It's now safe to take the table level lock. + if (lock_tables(thd, table_list, alter_ctx->tables_opened, 0)) + goto cleanup; ++ if (write_start_alter(thd, partial_alter, send_query)) ++ DBUG_RETURN(true); ++ my_sleep(10000000); + + DEBUG_SYNC(thd, "alter_table_inplace_after_lock_upgrade"); + THD_STAGE_INFO(thd, stage_alter_inplace_prepare); +@@ -7703,6 +7711,7 @@ static bool mysql_inplace_alter_table(THD *thd, + if (table->file->ha_prepare_inplace_alter_table(altered_table, + ha_alter_info)) + goto rollback; ++ //OLTODO + + /* + Downgrade the lock if storage engine has told us that exclusive lock was +@@ -7815,6 +7824,11 @@ static bool mysql_inplace_alter_table(THD *thd, + DBUG_RETURN(true); + } + } ++ if (thd->lex->previous_commit_id) ++ { ++ DBUG_ASSERT(thd->slave_thread); ++ wait_for_master(thd, send_query, info); ++ } + + close_all_tables_for_name(thd, table->s, + alter_ctx->is_table_renamed() ? +@@ -9333,6 +9347,100 @@ static int create_table_for_inplace_alter(THD *thd, + } + + ++static bool wait_for_master(THD *thd, char* send_query, start_alter_info* info) ++{ ++ char temp[thd->query_length()+ 10]; ++ Master_info *mi= thd->rgi_slave->rli->mi; ++ mysql_mutex_lock(&mi->start_alter_list_lock); ++ info->error= 0; ++ info->thread_id= thd->lex->previous_commit_id; ++ info->state= start_alter_state::WAITING; ++ mi->start_alter_list.push_back(info, thd->mem_root); ++ mysql_mutex_unlock(&mi->start_alter_list_lock); ++ mysql_cond_broadcast(&mi->start_alter_list_cond); ++ sql_print_information("Setiya alter list Added into list === previous_commit_id %d ", ++ thd->lex->previous_commit_id); ++ sql_print_information("Setiya list Elements %d", mi->start_alter_list.elements); ++ strcpy(temp, thd->query()); ++ char* alter_location= strcasestr(temp, "ALTER"); ++ //issue here ++// thd->rgi_slave->mark_start_commit(); ++// thd->wakeup_subsequent_commits(0); ++// thd->transaction.stmt.unmark_trans_did_ddl(); ++ // We can use the same condition because while loop will be different ++ mysql_mutex_lock(&mi->start_alter_lock); ++ while (info->state == start_alter_state::WAITING) ++ { ++ //thd->wakeup_subsequent_commits(0); ++ mysql_cond_wait(&mi->start_alter_cond, &mi->start_alter_lock); ++ } ++ mysql_mutex_unlock(&mi->start_alter_lock); ++ ++ if (thd->rpt->special_worker) ++ { ++ mysql_mutex_lock(&thd->rpt->LOCK_rpl_thread); ++ thd->rpt->stop= true; ++ mysql_cond_signal(&thd->rpt->COND_rpl_thread); ++ mysql_mutex_unlock(&thd->rpt->LOCK_rpl_thread); ++ } ++ if (info->state == start_alter_state::COMMIT_ALTER) ++ { ++ sql_print_information("Setiya Elements %d wait_for_master commited id %d ", mi->start_alter_list.elements, ++ info->thread_id); ++ ++// thd->transaction.stmt.mark_trans_did_ddl(); ++ thd->variables.gtid_seq_no= info->seq_no; ++ sprintf(send_query, "/*!100001 COMMIT %d */ %s", info->thread_id, alter_location); ++ return false; ++ } ++ else ++ { ++ assert(info->state == start_alter_state::ROLLBACK_ALTER); ++ sprintf(send_query, "/*!100001 ROLLBACK %d */ %s", info->thread_id, alter_location); ++ thd->variables.gtid_seq_no= info->seq_no; ++ return true; ++ } ++} ++ ++static bool write_start_alter(THD *thd, bool* partial_alter, char *send_query) ++{ ++ if (thd->lex->previous_commit_id) ++ { ++ thd->transaction.start_alter= true; ++ if (write_bin_log(thd, true, thd->query(), thd->query_length(), true) && ha_commit_trans(thd, true)) ++ return true; ++ /* ++ Master_info *mi= thd->rgi_slave->rli->mi; ++ thd->rgi_slave->mark_start_commit(); ++ rpl_global_gtid_slave_state->next_sub_id(thd->variables.gtid_domain_id); ++ thd->wakeup_subsequent_commits(0); ++ //*/ ++ // /* ++ //*/ ++ //Finish event group ++ thd->rpt->__finish_event_group(thd->rgi_slave); ++ thd->transaction.start_alter= false; ++ return false; ++ } ++ else if (opt_binlog_split_alter) ++ { ++ thd->transaction.start_alter= true; ++ sprintf(send_query, "/*!100001 START %lld %s */",thd->thread_id, thd->query()); ++ // thd->transaction.stmt.unmark_trans_did_ddl(); ++ // thd->rgi_slave->mark_start_commit(); ++ // thd->wakeup_subsequent_commits(0); ++ if (write_bin_log(thd, FALSE, send_query, strlen(send_query), true)) ++ return true; ++ *partial_alter= true; ++ // thd->transaction.stmt.mark_trans_did_ddl(); ++ // thd->rgi_slave->mark_start_commit(); ++ // thd->wakeup_subsequent_commits(0); ++ thd->transaction.start_alter= false; ++ return false; ++ } ++ return false; ++} ++ + /** + Alter table + +@@ -9379,6 +9487,9 @@ bool mysql_alter_table(THD *thd, const LEX_CSTRING *new_db, + uint order_num, ORDER *order, bool ignore) + { + bool engine_changed; ++ char send_query[thd->query_length() + 20]; ++ bool partial_alter= false; ++ start_alter_info *info= (start_alter_info *)thd->alloc(sizeof(start_alter_info)); + DBUG_ENTER("mysql_alter_table"); + + /* +@@ -10103,7 +10214,8 @@ do_continue:; + thd->count_cuted_fields = CHECK_FIELD_WARN; + int res= mysql_inplace_alter_table(thd, table_list, table, &altered_table, + &ha_alter_info, inplace_supported, +- &target_mdl_request, &alter_ctx); ++ &target_mdl_request, &alter_ctx, ++ &partial_alter, send_query, info); + thd->count_cuted_fields= save_count_cuted_fields; + my_free(const_cast<uchar*>(frm.str)); + +@@ -10161,7 +10273,10 @@ do_continue:; + if (lock_tables(thd, table_list, alter_ctx.tables_opened, + MYSQL_LOCK_USE_MALLOC)) + goto err_new_table_cleanup; +- ++ //If issues by binlog/master complete the prepare phase of alter and then commit ++ if (write_start_alter(thd, &partial_alter ,send_query)) ++ DBUG_RETURN(true); ++ my_sleep(10000000); + if (ha_create_table(thd, alter_ctx.get_tmp_path(), + alter_ctx.new_db.str, alter_ctx.new_name.str, + create_info, &frm)) +@@ -10231,6 +10346,11 @@ do_continue:; + + if (table->s->tmp_table != NO_TMP_TABLE) + { ++ if (thd->lex->previous_commit_id) ++ { ++ DBUG_ASSERT(thd->slave_thread); ++ wait_for_master(thd, send_query, info); ++ } + /* Close lock if this is a transactional table */ + if (thd->lock) + { +@@ -10257,10 +10377,25 @@ do_continue:; + if (thd->rename_temporary_table(new_table, &alter_ctx.new_db, + &alter_ctx.new_name)) + goto err_new_table_cleanup; ++ if (partial_alter) ++ { ++ sprintf(send_query, "/*!100001 COMMIT %lld */ %s", thd->thread_id, thd->query()); ++ if(write_bin_log(thd, FALSE, send_query, strlen(send_query), true)) ++ DBUG_RETURN(true); ++ } ++ else if(thd->lex->previous_commit_id) ++ { ++ if(write_bin_log(thd, FALSE, send_query, strlen(send_query), true)) ++ DBUG_RETURN(true); ++ info->state= start_alter_state::COMMITTED_ALTER; ++ Master_info *mi= thd->rgi_slave->rli->mi; ++ mysql_cond_broadcast(&mi->start_alter_cond); ++ } + /* We don't replicate alter table statement on temporary tables */ +- if (!thd->is_current_stmt_binlog_format_row() && ++ else if (!thd->is_current_stmt_binlog_format_row() && + write_bin_log(thd, true, thd->query(), thd->query_length())) + DBUG_RETURN(true); ++ + my_free(const_cast<uchar*>(frm.str)); + goto end_temporary; + } +@@ -10275,6 +10410,11 @@ do_continue:; + - Neither old or new engine uses files from another engine + The above is mainly true for the sequence and the partition engine. + */ ++ if (thd->lex->previous_commit_id) ++ { ++ DBUG_ASSERT(thd->slave_thread); ++ wait_for_master(thd, send_query, info); ++ } + engine_changed= ((new_table->file->ht != table->file->ht) && + (((!(new_table->file->ha_table_flags() & HA_FILE_BASED) || + !(table->file->ha_table_flags() & HA_FILE_BASED))) || +@@ -10441,7 +10581,21 @@ do_continue:; + DBUG_ASSERT(!(mysql_bin_log.is_open() && + thd->is_current_stmt_binlog_format_row() && + (create_info->tmp_table()))); +- if (write_bin_log(thd, true, thd->query(), thd->query_length())) ++ if (partial_alter) ++ { ++ sprintf(send_query, "/*!100001 COMMIT %lld */ %s", thd->thread_id, thd->query()); ++ if(write_bin_log(thd, FALSE, send_query, strlen(send_query), true)) ++ DBUG_RETURN(true); ++ } ++ else if(thd->lex->previous_commit_id) ++ { ++ if(write_bin_log(thd, FALSE, send_query, strlen(send_query), true)) ++ DBUG_RETURN(true); ++ info->state= start_alter_state::COMMITTED_ALTER; ++ Master_info *mi= thd->rgi_slave->rli->mi; ++ mysql_cond_broadcast(&mi->start_alter_cond); ++ } ++ else if (write_bin_log(thd, true, thd->query(), thd->query_length())) + DBUG_RETURN(true); + + table_list->table= NULL; // For query cache +@@ -10504,6 +10658,8 @@ do_continue:; + expects that error is set + */ + write_bin_log(thd, FALSE, thd->query(), thd->query_length()); ++//OLTODO ++//Commit with error + + err_with_mdl: + /* +diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy +index 05e68f3b752..a1fce8396e1 100644 +--- a/sql/sql_yacc.yy ++++ b/sql/sql_yacc.yy +@@ -2141,6 +2141,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, size_t *yystacksize); + opt_delete_gtid_domain + asrow_attribute + opt_constraint_no_id ++ start_cmnd + + %type <NONE> call sp_proc_stmts sp_proc_stmts1 sp_proc_stmt + %type <NONE> sp_proc_stmt_statement sp_proc_stmt_return +@@ -2299,7 +2300,7 @@ verb_clause: + + /* Verb clauses, except begin and compound_statement */ + statement: +- alter ++ start_cmnd alter + | analyze + | analyze_stmt_command + | backup +@@ -7802,6 +7803,19 @@ string_list: + { Lex->last_field->interval_list.push_back($3, thd->mem_root); } + ; + ++start_cmnd: ++ /* empty*/ {} ++ | START_SYM ++ { ++ if (thd->variables.pseudo_thread_id) ++ Lex->previous_commit_id= thd->variables.pseudo_thread_id; ++ } ++ | START_SYM ulonglong_num ++ { ++ Lex->previous_commit_id= $2; ++ } ++ ; ++ + /* + ** Alter table + */ +@@ -17981,6 +17995,19 @@ commit: + lex->tx_chain= $3; + lex->tx_release= $4; + } ++ | COMMIT_SYM alter ++ { ++ LEX *lex=Lex; ++ lex->sql_command= SQLCOM_COMMIT_PREVIOUS; ++ if (thd->variables.pseudo_thread_id) ++ lex->previous_commit_id= thd->variables.pseudo_thread_id; ++ } ++ | COMMIT_SYM ulonglong_num alter ++ { ++ LEX *lex=Lex; ++ lex->sql_command= SQLCOM_COMMIT_PREVIOUS; ++ lex->previous_commit_id= $2; ++ } + ; + + rollback: +@@ -18005,6 +18032,19 @@ rollback: + lex->sql_command= SQLCOM_ROLLBACK_TO_SAVEPOINT; + lex->ident= $4; + } ++ | ROLLBACK_SYM ALTER ++ { ++ LEX *lex=Lex; ++ lex->sql_command= SQLCOM_ROLLBACK_PREVIOUS; ++ if (thd->variables.pseudo_thread_id) ++ lex->previous_commit_id= thd->variables.pseudo_thread_id; ++ } ++ | ROLLBACK_SYM ulonglong_num ALTER ++ { ++ LEX *lex=Lex; ++ lex->sql_command= SQLCOM_ROLLBACK_PREVIOUS; ++ lex->previous_commit_id= $2; ++ } + ; + + savepoint: +diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc +index ff92b042ccc..e72fca75f63 100644 +--- a/sql/sys_vars.cc ++++ b/sql/sys_vars.cc +@@ -2234,6 +2234,11 @@ static Sys_var_bit Sys_skip_parallel_replication( + SESSION_ONLY(option_bits), NO_CMD_LINE, OPTION_RPL_SKIP_PARALLEL, + DEFAULT(FALSE)); + ++static Sys_var_mybool Sys_binlog_split_alter( ++ "binlog_split_alter", ++ "If set split the alter into 2 statement START ALTER and COMMIT/ROLLBACK" ++ "ALTER", ++ GLOBAL_VAR(opt_binlog_split_alter), NO_CMD_LINE, DEFAULT(FALSE)); + + static bool + check_gtid_ignore_duplicates(sys_var *self, THD *thd, set_var *var) |